From 233c15b20cd38faac6f55ea4add562d779537dae Mon Sep 17 00:00:00 2001 From: Julien Cretin Date: Mon, 2 Nov 2020 10:18:16 +0100 Subject: [PATCH] Add new store (without tests yet) --- libraries/persistent_store/src/format.rs | 3 - libraries/persistent_store/src/lib.rs | 4 +- libraries/persistent_store/src/store.rs | 1128 +++++++++++++++++++++- 3 files changed, 1130 insertions(+), 5 deletions(-) diff --git a/libraries/persistent_store/src/format.rs b/libraries/persistent_store/src/format.rs index a34e6ec..712d8cd 100644 --- a/libraries/persistent_store/src/format.rs +++ b/libraries/persistent_store/src/format.rs @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// TODO(ia0): Remove when the module is used. -#![allow(dead_code)] - #[macro_use] mod bitfield; diff --git a/libraries/persistent_store/src/lib.rs b/libraries/persistent_store/src/lib.rs index beb183b..27bdcd1 100644 --- a/libraries/persistent_store/src/lib.rs +++ b/libraries/persistent_store/src/lib.rs @@ -359,7 +359,9 @@ pub use self::buffer::{BufferCorruptFunction, BufferOptions, BufferStorage}; #[cfg(feature = "std")] pub use self::model::{StoreModel, StoreOperation}; pub use self::storage::{Storage, StorageError, StorageIndex, StorageResult}; -pub use self::store::{StoreError, StoreRatio, StoreResult, StoreUpdate}; +pub use self::store::{ + Store, StoreError, StoreHandle, StoreIter, StoreRatio, StoreResult, StoreUpdate, +}; /// Internal representation of natural numbers. /// diff --git a/libraries/persistent_store/src/store.rs b/libraries/persistent_store/src/store.rs index ab0d66f..ce34892 100644 --- a/libraries/persistent_store/src/store.rs +++ b/libraries/persistent_store/src/store.rs @@ -12,8 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{Nat, StorageError}; +use crate::format::*; +#[cfg(feature = "std")] +pub use crate::model::{StoreModel, StoreOperation}; +#[cfg(feature = "std")] +pub use crate::BufferStorage; +use crate::{usize_to_nat, Nat, Storage, StorageError, StorageIndex}; use alloc::vec::Vec; +use core::cmp::{max, min, Ordering}; +#[cfg(feature = "std")] +use std::collections::HashSet; /// Errors returned by store operations. #[derive(Debug, PartialEq, Eq)] @@ -101,6 +109,38 @@ impl StoreRatio { } } +/// Safe pointer to an entry. +/// +/// A store handle stays valid at least until the next mutable operation. Store operations taking a +/// handle as argument always verify that the handle is still valid. +#[derive(Clone, Debug)] +pub struct StoreHandle { + /// The key of the entry. + key: Nat, + + /// The position of the entry. + pos: Position, + + /// The length in bytes of the value. + len: Nat, +} + +impl StoreHandle { + /// Returns the key of the entry. + pub fn get_key(&self) -> usize { + self.key as usize + } + + /// Returns the value of the entry. + /// + /// # Errors + /// + /// Returns `InvalidArgument` if the entry has been deleted or compacted. + pub fn get_value(&self, store: &Store) -> StoreResult> { + store.get_value(self) + } +} + /// Represents an update to the store as part of a transaction. #[derive(Clone, Debug)] pub enum StoreUpdate { @@ -128,3 +168,1089 @@ impl StoreUpdate { } } } + +/// Implements a store with a map interface over a storage. +#[derive(Clone)] +pub struct Store { + /// The underlying storage. + storage: S, + + /// The storage configuration. + format: Format, +} + +impl Store { + /// Resumes or initializes a store for a given storage. + /// + /// If the storage is completely erased, it is initialized. Otherwise, a possible interrupted + /// operation is recovered by being either completed or rolled-back. In case of error, the + /// storage ownership is returned. + /// + /// # Errors + /// + /// Returns `InvalidArgument` if the storage is not supported. + pub fn new(storage: S) -> Result, (StoreError, S)> { + let format = match Format::new(&storage) { + None => return Err((StoreError::InvalidArgument, storage)), + Some(x) => x, + }; + let mut store = Store { storage, format }; + if let Err(error) = store.recover() { + return Err((error, store.storage)); + } + Ok(store) + } + + /// Iterates over the entries. + pub fn iter<'a>(&'a self) -> StoreResult> { + StoreIter::new(self) + } + + /// Returns the current capacity in words. + /// + /// The capacity represents the size of what is stored. + pub fn capacity(&self) -> StoreResult { + let total = self.format.total_capacity(); + let mut used = 0; + let mut pos = self.head()?; + let end = pos + self.format.virt_size(); + while pos < end { + let entry_pos = pos; + match self.parse_entry(&mut pos)? { + ParsedEntry::Tail => break, + ParsedEntry::Padding => (), + ParsedEntry::User(_) => used += pos - entry_pos, + _ => return Err(StoreError::InvalidStorage), + } + } + Ok(StoreRatio { used, total }) + } + + /// Returns the current lifetime in words. + /// + /// The lifetime represents the age of the storage. The limit is an over-approximation by at + /// most the maximum length of a value (the actual limit depends on the length of the prefix of + /// the first physical page once all its erase cycles have been used). + pub fn lifetime(&self) -> StoreResult { + let total = self.format.total_lifetime().get(); + let used = self.tail()?.get(); + Ok(StoreRatio { used, total }) + } + + /// Applies a sequence of updates as a single transaction. + /// + /// # Errors + /// + /// Returns `InvalidArgument` in the following circumstances: + /// - There are too many updates. + /// - The updates overlap, i.e. their keys are not disjoint. + /// - The updates are invalid, e.g. key out of bound or value too long. + pub fn transaction(&mut self, updates: &[StoreUpdate]) -> StoreResult<()> { + let count = usize_to_nat(updates.len()); + if count == 0 { + return Ok(()); + } + if count == 1 { + match updates[0] { + StoreUpdate::Insert { key, ref value } => return self.insert(key, value), + StoreUpdate::Remove { key } => return self.remove(key), + } + } + if count > self.format.max_updates() { + return Err(StoreError::InvalidArgument); + } + // Compute how much capacity (including transient) we need. + let mut sorted_keys = Vec::with_capacity(count as usize); + let mut word_capacity = 1 + count; + for update in updates { + let key = usize_to_nat(update.key()); + if key > self.format.max_key() { + return Err(StoreError::InvalidArgument); + } + if let Some(value) = update.value() { + let value_len = usize_to_nat(value.len()); + if value_len > self.format.max_value_len() { + return Err(StoreError::InvalidArgument); + } + word_capacity += self.format.bytes_to_words(value_len); + } + match sorted_keys.binary_search(&key) { + Ok(_) => return Err(StoreError::InvalidArgument), + Err(pos) => sorted_keys.insert(pos, key), + } + } + // Reserve the capacity. + self.reserve(word_capacity)?; + // Write the marker entry. + let marker = self.tail()?; + let entry = self.format.build_internal(InternalEntry::Marker { count }); + self.write_slice(marker, &entry)?; + self.init_page(marker, marker)?; + // Write the updates. + let mut tail = marker + 1; + for update in updates { + let length = match *update { + StoreUpdate::Insert { key, ref value } => { + let entry = self.format.build_user(usize_to_nat(key), value); + let word_size = self.format.word_size(); + let footer = usize_to_nat(entry.len()) / word_size - 1; + self.write_slice(tail, &entry[..(footer * word_size) as usize])?; + self.write_slice(tail + footer, &entry[(footer * word_size) as usize..])?; + footer + } + StoreUpdate::Remove { key } => { + let key = usize_to_nat(key); + let remove = self.format.build_internal(InternalEntry::Remove { key }); + self.write_slice(tail, &remove)?; + 0 + } + }; + self.init_page(tail, tail + length)?; + tail += 1 + length; + } + // Apply the transaction. + self.transaction_apply(&sorted_keys, marker) + } + + /// Removes multiple entries as part of a single transaction. + /// + /// Entries with a key larger or equal to `min_key` are deleted. + pub fn clear(&mut self, min_key: usize) -> StoreResult<()> { + let min_key = usize_to_nat(min_key); + if min_key > self.format.max_key() { + return Err(StoreError::InvalidArgument); + } + let clear = self.format.build_internal(InternalEntry::Clear { min_key }); + // We always have one word available. We can't use `reserve` because this is internal + // capacity, not user capacity. + while self.immediate_capacity()? < 1 { + self.compact()?; + } + let tail = self.tail()?; + self.write_slice(tail, &clear)?; + self.clear_delete(tail) + } + + /// Compacts the store once if needed. + /// + /// If the immediate capacity is at least `length` words, then nothing is modified. Otherwise, + /// one page is compacted. + pub fn prepare(&mut self, length: usize) -> Result<(), StoreError> { + if self.capacity()?.remaining() < length { + return Err(StoreError::NoCapacity); + } + if self.immediate_capacity()? < length as isize { + self.compact()?; + } + Ok(()) + } + + /// Recovers a possible interrupted operation. + /// + /// If the storage is completely erased, it is initialized. + pub fn recover(&mut self) -> StoreResult<()> { + self.recover_initialize()?; + self.recover_erase()?; + self.recover_compaction()?; + self.recover_operation()?; + Ok(()) + } + + /// Returns the value of an entry given its key. + pub fn find(&self, key: usize) -> StoreResult>> { + Ok(match self.find_handle(key)? { + None => None, + Some(handle) => Some(self.get_value(&handle)?), + }) + } + + /// Returns a handle to an entry given its key. + pub fn find_handle(&self, key: usize) -> StoreResult> { + let key = usize_to_nat(key); + for handle in self.iter()? { + let handle = handle?; + if handle.key == key { + return Ok(Some(handle)); + } + } + Ok(None) + } + + /// Inserts an entry in the store. + /// + /// If an entry for the same key is already present, it is replaced. + pub fn insert(&mut self, key: usize, value: &[u8]) -> StoreResult<()> { + // NOTE: This (and transaction) could take a position hint on the value to delete. + let key = usize_to_nat(key); + let value_len = usize_to_nat(value.len()); + if key > self.format.max_key() || value_len > self.format.max_value_len() { + return Err(StoreError::InvalidArgument); + } + let entry = self.format.build_user(key, value); + let entry_len = usize_to_nat(entry.len()); + self.reserve(entry_len / self.format.word_size())?; + let tail = self.tail()?; + let word_size = self.format.word_size(); + let footer = entry_len / word_size - 1; + self.write_slice(tail, &entry[..(footer * word_size) as usize])?; + self.write_slice(tail + footer, &entry[(footer * word_size) as usize..])?; + self.insert_init(tail, footer, key) + } + + /// Removes an entry given its key. + /// + /// This is not an error if there is no entry for this key. + pub fn remove(&mut self, key: usize) -> StoreResult<()> { + let key = usize_to_nat(key); + if key > self.format.max_key() { + return Err(StoreError::InvalidArgument); + } + self.delete_keys(&[key], self.tail()?) + } + + /// Removes an entry given a handle. + pub fn remove_handle(&mut self, handle: &StoreHandle) -> StoreResult<()> { + self.check_handle(handle)?; + self.delete_pos(handle.pos, self.format.bytes_to_words(handle.len)) + } + + /// Returns the maximum length in bytes of a value. + pub fn max_value_length(&self) -> usize { + self.format.max_value_len() as usize + } + + /// Returns the value of an entry given its handle. + fn get_value(&self, handle: &StoreHandle) -> StoreResult> { + self.check_handle(handle)?; + let mut pos = handle.pos; + match self.parse_entry(&mut pos)? { + ParsedEntry::User(header) => { + let mut result = self.read_slice(handle.pos + 1, header.length); + if header.flipped { + let last_byte = result.len() - 1; + result[last_byte] = 0xff; + } + Ok(result) + } + ParsedEntry::Padding => Err(StoreError::InvalidArgument), + _ => Err(StoreError::InvalidStorage), + } + } + + /// Initializes the storage if completely erased or partially initialized. + fn recover_initialize(&mut self) -> StoreResult<()> { + let word_size = self.format.word_size(); + for page in 0..self.format.num_pages() { + let (init, rest) = self.read_page(page).split_at(word_size as usize); + if (page > 0 && !is_erased(init)) || !is_erased(rest) { + return Ok(()); + } + } + let index = self.format.index_init(0); + let init_info = self.format.build_init(InitInfo { + cycle: 0, + prefix: 0, + }); + self.storage_write_slice(index, &init_info) + } + + /// Recovers a possible compaction interrupted while erasing the page. + fn recover_erase(&mut self) -> StoreResult<()> { + let mut pos = self.get_extremum_page_head(Ordering::Greater)?; + let end = pos.next_page(&self.format); + while pos < end { + let entry_pos = pos; + match self.parse_entry(&mut pos)? { + ParsedEntry::Internal(InternalEntry::Erase { .. }) => { + return self.compact_erase(entry_pos) + } + ParsedEntry::Padding | ParsedEntry::User(_) => (), + _ => break, + } + } + Ok(()) + } + + /// Recovers a possible compaction interrupted while copying the entries. + fn recover_compaction(&mut self) -> StoreResult<()> { + let head_page = self.head()?.page(&self.format); + match self.parse_compact(head_page)? { + WordState::Erased => Ok(()), + WordState::Partial => self.compact(), + WordState::Valid(_) => self.compact_copy(), + } + } + + /// Recover a possible interrupted operation which is not a compaction. + fn recover_operation(&mut self) -> StoreResult<()> { + let mut pos = self.head()?; + let mut prev_pos = pos; + let end = pos + self.format.virt_size(); + while pos < end { + let entry_pos = pos; + match self.parse_entry(&mut pos)? { + ParsedEntry::Tail => break, + ParsedEntry::User(_) => (), + ParsedEntry::Padding => { + self.wipe_span(entry_pos + 1, pos - entry_pos - 1)?; + } + ParsedEntry::Internal(InternalEntry::Erase { .. }) => { + return Err(StoreError::InvalidStorage); + } + ParsedEntry::Internal(InternalEntry::Clear { .. }) => { + return self.clear_delete(entry_pos); + } + ParsedEntry::Internal(InternalEntry::Marker { .. }) => { + return self.recover_transaction(entry_pos, end); + } + ParsedEntry::Internal(InternalEntry::Remove { .. }) => { + self.set_padding(entry_pos)?; + } + ParsedEntry::Partial => { + return self.recover_wipe_partial(entry_pos, pos - entry_pos - 1); + } + ParsedEntry::PartialUser => { + return self.recover_delete_user(entry_pos, pos - entry_pos - 1); + } + } + prev_pos = entry_pos; + } + pos = prev_pos; + if let ParsedEntry::User(header) = self.parse_entry(&mut pos)? { + self.insert_init(prev_pos, pos - prev_pos - 1, header.key)?; + } + Ok(()) + } + + /// Recovers a possible interrupted transaction. + fn recover_transaction(&mut self, marker: Position, end: Position) -> StoreResult<()> { + let mut pos = marker; + let count = match self.parse_entry(&mut pos)? { + ParsedEntry::Internal(InternalEntry::Marker { count }) => count, + _ => return Err(StoreError::InvalidStorage), + }; + let sorted_keys = self.recover_transaction_keys(count, pos, end)?; + match usize_to_nat(sorted_keys.len()).cmp(&count) { + Ordering::Less => (), + Ordering::Equal => return self.transaction_apply(&sorted_keys, marker), + Ordering::Greater => return Err(StoreError::InvalidStorage), + } + while pos < end { + let entry_pos = pos; + match self.parse_entry(&mut pos)? { + ParsedEntry::Tail => break, + ParsedEntry::Padding => (), + ParsedEntry::User(_) => { + self.delete_pos(entry_pos, pos - entry_pos - 1)?; + } + ParsedEntry::Internal(InternalEntry::Remove { .. }) => { + self.set_padding(entry_pos)?; + } + ParsedEntry::Partial => { + self.recover_wipe_partial(entry_pos, pos - entry_pos - 1)?; + break; + } + ParsedEntry::PartialUser => { + self.recover_delete_user(entry_pos, pos - entry_pos - 1)?; + break; + } + ParsedEntry::Internal(InternalEntry::Erase { .. }) + | ParsedEntry::Internal(InternalEntry::Clear { .. }) + | ParsedEntry::Internal(InternalEntry::Marker { .. }) => { + return Err(StoreError::InvalidStorage); + } + } + } + self.init_page(marker, marker)?; + self.set_padding(marker)?; + Ok(()) + } + + /// Returns the domain of a possible interrupted transaction. + /// + /// The domain is returned as a sorted list of keys. + fn recover_transaction_keys( + &mut self, + count: Nat, + mut pos: Position, + end: Position, + ) -> StoreResult> { + let mut sorted_keys = Vec::with_capacity(count as usize); + let mut prev_pos = pos; + while pos < end { + let entry_pos = pos; + let key = match self.parse_entry(&mut pos)? { + ParsedEntry::Tail + | ParsedEntry::Padding + | ParsedEntry::Partial + | ParsedEntry::PartialUser => break, + ParsedEntry::User(header) => header.key, + ParsedEntry::Internal(InternalEntry::Remove { key }) => key, + ParsedEntry::Internal(_) => return Err(StoreError::InvalidStorage), + }; + match sorted_keys.binary_search(&key) { + Ok(_) => return Err(StoreError::InvalidStorage), + Err(pos) => sorted_keys.insert(pos, key), + } + prev_pos = entry_pos; + } + pos = prev_pos; + match self.parse_entry(&mut pos)? { + ParsedEntry::User(_) | ParsedEntry::Internal(InternalEntry::Remove { .. }) => { + let length = pos - prev_pos - 1; + self.init_page(prev_pos, prev_pos + length)?; + } + _ => (), + } + Ok(sorted_keys) + } + + /// Completes a possible partial entry wipe. + fn recover_wipe_partial(&mut self, pos: Position, length: Nat) -> StoreResult<()> { + self.wipe_span(pos + 1, length)?; + self.init_page(pos, pos + length)?; + self.set_padding(pos)?; + Ok(()) + } + + /// Completes a possible partial entry deletion. + fn recover_delete_user(&mut self, pos: Position, length: Nat) -> StoreResult<()> { + self.init_page(pos, pos + length)?; + self.delete_pos(pos, length) + } + + /// Checks that a handle still points in the current window. + /// + /// In particular, the handle has not been compacted. + fn check_handle(&self, handle: &StoreHandle) -> StoreResult<()> { + if handle.pos < self.head()? { + Err(StoreError::InvalidArgument) + } else { + Ok(()) + } + } + + /// Compacts the store as needed. + /// + /// If there is at least `length` words of remaining capacity, pages are compacted until that + /// amount is immediately available. + fn reserve(&mut self, length: Nat) -> Result<(), StoreError> { + if self.capacity()?.remaining() < length as usize { + return Err(StoreError::NoCapacity); + } + while self.immediate_capacity()? < length as isize { + self.compact()?; + } + Ok(()) + } + + /// Continues an entry insertion after it has been written. + fn insert_init(&mut self, pos: Position, length: Nat, key: Nat) -> StoreResult<()> { + self.init_page(pos, pos + length)?; + self.delete_keys(&[key], pos)?; + Ok(()) + } + + /// Compacts one page. + fn compact(&mut self) -> StoreResult<()> { + let head = self.head()?; + if head.cycle(&self.format) >= self.format.max_page_erases() { + return Err(StoreError::NoLifetime); + } + let tail = max(self.tail()?, head.next_page(&self.format)); + let index = self.format.index_compact(head.page(&self.format)); + let compact_info = self.format.build_compact(CompactInfo { tail: tail - head }); + self.storage_write_slice(index, &compact_info)?; + self.compact_copy() + } + + /// Continues a compaction after its compact page info has been written. + fn compact_copy(&mut self) -> StoreResult<()> { + let mut head = self.head()?; + let page = head.page(&self.format); + let end = head.next_page(&self.format); + let mut tail = match self.parse_compact(page)? { + WordState::Valid(CompactInfo { tail }) => head + tail, + _ => return Err(StoreError::InvalidStorage), + }; + if tail < end { + return Err(StoreError::InvalidStorage); + } + while head < end { + let pos = head; + match self.parse_entry(&mut head)? { + ParsedEntry::Tail => break, + ParsedEntry::User(_) => (), + _ => continue, + }; + let length = head - pos; + // We have to copy the slice for 2 reasons: + // 1. We would need to work around the lifetime. This is possible using unsafe. + // 2. We can't pass a flash slice to the kernel. This should get fixed with + // https://github.com/tock/tock/issues/1274. + let entry = self.read_slice(pos, length * self.format.word_size()); + self.write_slice(tail, &entry)?; + self.init_page(tail, tail + (length - 1))?; + tail += length; + } + let erase = self.format.build_internal(InternalEntry::Erase { page }); + self.write_slice(tail, &erase)?; + self.init_page(tail, tail)?; + self.compact_erase(tail) + } + + /// Continues a compaction after its erase entry has been written. + fn compact_erase(&mut self, erase: Position) -> StoreResult<()> { + let page = match self.parse_entry(&mut erase.clone())? { + ParsedEntry::Internal(InternalEntry::Erase { page }) => page, + _ => return Err(StoreError::InvalidStorage), + }; + self.storage_erase_page(page)?; + let head = self.head()?; + let pos = head.page_begin(&self.format); + self.wipe_span(pos, head - pos)?; + self.set_padding(erase)?; + Ok(()) + } + + /// Continues a transaction after it has been written. + fn transaction_apply(&mut self, sorted_keys: &[Nat], marker: Position) -> StoreResult<()> { + self.delete_keys(&sorted_keys, marker)?; + self.set_padding(marker)?; + let end = self.head()? + self.format.virt_size(); + let mut pos = marker + 1; + while pos < end { + let entry_pos = pos; + match self.parse_entry(&mut pos)? { + ParsedEntry::Tail => break, + ParsedEntry::User(_) => (), + ParsedEntry::Internal(InternalEntry::Remove { .. }) => { + self.set_padding(entry_pos)? + } + _ => return Err(StoreError::InvalidStorage), + } + } + Ok(()) + } + + /// Continues a clear operation after its internal entry has been written. + fn clear_delete(&mut self, clear: Position) -> StoreResult<()> { + self.init_page(clear, clear)?; + let min_key = match self.parse_entry(&mut clear.clone())? { + ParsedEntry::Internal(InternalEntry::Clear { min_key }) => min_key, + _ => return Err(StoreError::InvalidStorage), + }; + let mut pos = self.head()?; + let end = pos + self.format.virt_size(); + while pos < end { + let entry_pos = pos; + match self.parse_entry(&mut pos)? { + ParsedEntry::Internal(InternalEntry::Clear { .. }) if entry_pos == clear => break, + ParsedEntry::User(header) if header.key >= min_key => { + self.delete_pos(entry_pos, pos - entry_pos - 1)?; + } + ParsedEntry::Padding | ParsedEntry::User(_) => (), + _ => return Err(StoreError::InvalidStorage), + } + } + self.set_padding(clear)?; + Ok(()) + } + + /// Deletes a set of entries up to a certain position. + fn delete_keys(&mut self, sorted_keys: &[Nat], end: Position) -> StoreResult<()> { + let mut pos = self.head()?; + while pos < end { + let entry_pos = pos; + match self.parse_entry(&mut pos)? { + ParsedEntry::Tail => break, + ParsedEntry::User(header) if sorted_keys.binary_search(&header.key).is_ok() => { + self.delete_pos(entry_pos, pos - entry_pos - 1)?; + } + ParsedEntry::Padding | ParsedEntry::User(_) => (), + _ => return Err(StoreError::InvalidStorage), + } + } + Ok(()) + } + + /// Deletes the entry at a given position. + fn delete_pos(&mut self, pos: Position, length: Nat) -> StoreResult<()> { + self.set_deleted(pos)?; + self.wipe_span(pos + 1, length)?; + Ok(()) + } + + /// Writes the init info of a page between 2 positions if needed. + /// + /// The positions should designate the first and last word of an entry. The init info of the + /// highest page is written in any of the following conditions: + /// - The entry starts at the beginning of a virtual page. + /// - The entry spans 2 pages. + fn init_page(&mut self, first: Position, last: Position) -> StoreResult<()> { + debug_assert!(first <= last); + debug_assert!(last - first < self.format.virt_page_size()); + let new_first = if first.word(&self.format) == 0 { + first + } else if first.page(&self.format) == last.page(&self.format) { + return Ok(()); + } else { + last + 1 + }; + let page = new_first.page(&self.format); + if let WordState::Valid(_) = self.parse_init(page)? { + return Ok(()); + } + let index = self.format.index_init(page); + let init_info = self.format.build_init(InitInfo { + cycle: new_first.cycle(&self.format), + prefix: new_first.word(&self.format), + }); + self.storage_write_slice(index, &init_info)?; + Ok(()) + } + + /// Sets the padding bit of a user header. + fn set_padding(&mut self, pos: Position) -> StoreResult<()> { + let mut word = Word::from_slice(self.read_word(pos)); + self.format.set_padding(&mut word); + self.write_slice(pos, &word.as_slice())?; + Ok(()) + } + + /// Sets the deleted bit of a user header. + fn set_deleted(&mut self, pos: Position) -> StoreResult<()> { + let mut word = Word::from_slice(self.read_word(pos)); + self.format.set_deleted(&mut word); + self.write_slice(pos, &word.as_slice())?; + Ok(()) + } + + /// Wipes a slice of words. + fn wipe_span(&mut self, pos: Position, length: Nat) -> StoreResult<()> { + let length = (length * self.format.word_size()) as usize; + self.write_slice(pos, &vec![0x00; length]) + } + + /// Returns an extremum page. + /// + /// With `Greater` returns the most recent page (or the tail). With `Less` returns the oldest + /// page (or the head). + fn get_extremum_page_head(&self, ordering: Ordering) -> StoreResult { + let mut best = None; + for page in 0..self.format.num_pages() { + let init = match self.parse_init(page)? { + WordState::Valid(x) => x, + _ => continue, + }; + let pos = self.format.page_head(init, page); + if best.map_or(true, |x| pos.cmp(&x) == ordering) { + best = Some(pos); + } + } + // There is always at least one initialized page. + best.ok_or(StoreError::InvalidStorage) + } + + /// Returns the number of words that can be written without compaction. + /// + /// This can be temporarily negative during compaction. + fn immediate_capacity(&self) -> StoreResult { + let tail = self.tail()?; + let end = self.head()? + self.format.virt_size(); + Ok(if tail > end { + -((tail - end) as isize) + } else { + (end - tail) as isize + }) + } + + /// Returns the position of the first word in the store. + pub(crate) fn head(&self) -> StoreResult { + self.get_extremum_page_head(Ordering::Less) + } + + /// Returns one past the position of the last word in the store. + pub(crate) fn tail(&self) -> StoreResult { + let mut pos = self.get_extremum_page_head(Ordering::Greater)?; + let end = pos.next_page(&self.format); + while pos < end { + if let ParsedEntry::Tail = self.parse_entry(&mut pos)? { + break; + } + } + Ok(pos) + } + + /// Parses the entry at a given position. + /// + /// The position is updated to point to the next entry. + fn parse_entry(&self, pos: &mut Position) -> StoreResult { + let valid = match self.parse_word(*pos)? { + WordState::Erased | WordState::Partial => return Ok(self.parse_partial(pos)), + WordState::Valid(x) => x, + }; + Ok(match valid { + ParsedWord::Padding(Padding { length }) => { + *pos += 1 + length; + ParsedEntry::Padding + } + ParsedWord::Header(header) if header.length > self.format.max_value_len() => { + self.parse_partial(pos) + } + ParsedWord::Header(header) => { + let length = self.format.bytes_to_words(header.length); + let footer = match length { + 0 => None, + _ => Some(self.read_word(*pos + length)), + }; + if header.check(footer) { + if header.key > self.format.max_key() { + return Err(StoreError::InvalidStorage); + } + *pos += 1 + length; + ParsedEntry::User(header) + } else if footer.map_or(true, |x| is_erased(x)) { + self.parse_partial(pos) + } else { + *pos += 1 + length; + ParsedEntry::PartialUser + } + } + ParsedWord::Internal(internal) => { + *pos += 1; + ParsedEntry::Internal(internal) + } + }) + } + + /// Parses a possible partial user entry. + /// + /// This does look ahead past the header and possible erased word in case words near the end of + /// the entry were written first. + fn parse_partial(&self, pos: &mut Position) -> ParsedEntry { + let mut length = None; + for i in 0..self.format.max_prefix_len() { + if !is_erased(self.read_word(*pos + i)) { + length = Some(i); + } + } + match length { + None => ParsedEntry::Tail, + Some(length) => { + *pos += 1 + length; + ParsedEntry::Partial + } + } + } + + /// Parses the init info of a page. + fn parse_init(&self, page: Nat) -> StoreResult> { + let index = self.format.index_init(page); + let word = self.storage_read_slice(index, self.format.word_size()); + self.format.parse_init(Word::from_slice(word)) + } + + /// Parses the compact info of a page. + fn parse_compact(&self, page: Nat) -> StoreResult> { + let index = self.format.index_compact(page); + let word = self.storage_read_slice(index, self.format.word_size()); + self.format.parse_compact(Word::from_slice(word)) + } + + /// Parses a word from the virtual storage. + fn parse_word(&self, pos: Position) -> StoreResult> { + self.format + .parse_word(Word::from_slice(self.read_word(pos))) + } + + /// Reads a slice from the virtual storage. + /// + /// The slice may span 2 pages. + fn read_slice(&self, pos: Position, length: Nat) -> Vec { + let mut result = Vec::with_capacity(length as usize); + let index = pos.index(&self.format); + let max_length = self.format.page_size() - usize_to_nat(index.byte); + result.extend_from_slice(self.storage_read_slice(index, min(length, max_length))); + if length > max_length { + // The slice spans the next page. + let index = pos.next_page(&self.format).index(&self.format); + result.extend_from_slice(self.storage_read_slice(index, length - max_length)); + } + result + } + + /// Reads a word from the virtual storage. + fn read_word(&self, pos: Position) -> &[u8] { + self.storage_read_slice(pos.index(&self.format), self.format.word_size()) + } + + /// Reads a physical page. + fn read_page(&self, page: Nat) -> &[u8] { + let index = StorageIndex { + page: page as usize, + byte: 0, + }; + self.storage_read_slice(index, self.format.page_size()) + } + + /// Reads a slice from the physical storage. + fn storage_read_slice(&self, index: StorageIndex, length: Nat) -> &[u8] { + // The only possible failures are if the slice spans multiple pages. + self.storage.read_slice(index, length as usize).unwrap() + } + + /// Writes a slice to the virtual storage. + /// + /// The slice may span 2 pages. + fn write_slice(&mut self, pos: Position, value: &[u8]) -> StoreResult<()> { + let index = pos.index(&self.format); + let max_length = (self.format.page_size() - usize_to_nat(index.byte)) as usize; + self.storage_write_slice(index, &value[..min(value.len(), max_length)])?; + if value.len() > max_length { + // The slice spans the next page. + let index = pos.next_page(&self.format).index(&self.format); + self.storage_write_slice(index, &value[max_length..])?; + } + Ok(()) + } + + /// Writes a slice to the physical storage. + /// + /// Only starts writing the slice from the first word that needs to be written (because it + /// differs from the current value). + fn storage_write_slice(&mut self, index: StorageIndex, value: &[u8]) -> StoreResult<()> { + let word_size = self.format.word_size(); + debug_assert!(usize_to_nat(value.len()) % word_size == 0); + let slice = self.storage.read_slice(index, value.len())?; + // Skip as many words that don't need to be written as possible. + for start in (0..usize_to_nat(value.len())).step_by(word_size as usize) { + if is_write_needed( + &slice[start as usize..][..word_size as usize], + &value[start as usize..][..word_size as usize], + )? { + // We must write the remaining slice. + let index = StorageIndex { + page: index.page, + byte: (usize_to_nat(index.byte) + start) as usize, + }; + let value = &value[start as usize..]; + self.storage.write_slice(index, value)?; + break; + } + } + // There is nothing remaining to write. + Ok(()) + } + + /// Erases a page if not already erased. + fn storage_erase_page(&mut self, page: Nat) -> StoreResult<()> { + if !is_erased(self.read_page(page)) { + self.storage.erase_page(page as usize)?; + } + Ok(()) + } +} + +// Those functions are not meant for production. +#[cfg(feature = "std")] +impl Store { + /// Returns the storage configuration. + pub fn format(&self) -> &Format { + &self.format + } + + /// Accesses the storage. + pub fn storage(&self) -> &BufferStorage { + &self.storage + } + + /// Accesses the storage mutably. + pub fn storage_mut(&mut self) -> &mut BufferStorage { + &mut self.storage + } + + /// Extracts the storage. + pub fn into_storage(self) -> BufferStorage { + self.storage + } + + /// Returns the value of a possibly deleted entry. + /// + /// If the value has been partially compacted, only return the non-compacted part. Returns an + /// empty value if it has been fully compacted. + pub fn inspect_value(&self, handle: &StoreHandle) -> Vec { + let head = self.head().unwrap(); + let length = self.format.bytes_to_words(handle.len); + if head <= handle.pos { + // The value has not been compacted. + self.read_slice(handle.pos + 1, handle.len) + } else if (handle.pos + length).page(&self.format) == head.page(&self.format) { + // The value has been partially compacted. + let next_page = handle.pos.next_page(&self.format); + let erased_len = (next_page - (handle.pos + 1)) * self.format.word_size(); + self.read_slice(next_page, handle.len - erased_len) + } else { + // The value has been fully compacted. + Vec::new() + } + } + + /// Applies an operation and returns the deleted entries. + /// + /// Note that the deleted entries are before any compaction, so they may point outside the + /// window. This is more expressive than returning the deleted entries after compaction since + /// compaction can be controlled independently. + pub fn apply(&mut self, operation: &StoreOperation) -> (Vec, StoreResult<()>) { + let deleted = |store: &Store, delete_key: &dyn Fn(usize) -> bool| { + store + .iter() + .unwrap() + .map(|x| x.unwrap()) + .filter(|x| delete_key(x.key as usize)) + .collect::>() + }; + match *operation { + StoreOperation::Transaction { ref updates } => { + let keys: HashSet = updates.iter().map(|x| x.key()).collect(); + let deleted = deleted(self, &|key| keys.contains(&key)); + (deleted, self.transaction(updates)) + } + StoreOperation::Clear { min_key } => { + let deleted = deleted(self, &|key| key >= min_key); + (deleted, self.clear(min_key)) + } + StoreOperation::Prepare { length } => (Vec::new(), self.prepare(length)), + } + } + + /// Initializes an erased storage as if it has been erased `cycle` times. + pub fn init_with_cycle(storage: &mut BufferStorage, cycle: usize) { + let format = Format::new(storage).unwrap(); + // Write the init info of the first page. + let mut index = format.index_init(0); + let init_info = format.build_init(InitInfo { + cycle: usize_to_nat(cycle), + prefix: 0, + }); + storage.write_slice(index, &init_info).unwrap(); + // Pad the first word of the page. This makes the store looks used, otherwise we may confuse + // it with a partially initialized store. + index.byte += 2 * format.word_size() as usize; + storage + .write_slice(index, &vec![0; format.word_size() as usize]) + .unwrap(); + // Inform the storage that the pages have been used. + for page in 0..storage.num_pages() { + storage.set_page_erases(page, cycle); + } + } +} + +/// Represents an entry in the store. +#[derive(Debug)] +enum ParsedEntry { + /// Padding entry. + /// + /// This can be any of the following: + /// - A deleted user entry. + /// - A completed internal entry. + /// - A wiped partial entry. + Padding, + + /// Non-deleted user entry. + User(Header), + + /// Internal entry. + Internal(InternalEntry), + + /// Partial user entry with non-erased footer. + /// + /// The fact that the footer is not erased and does not checksum, means that the header is + /// valid. In particular, the length is valid. We cannot wipe the entry because wiping the + /// footer may validate the checksum. So we mark the entry as deleted, which also wipes it. + PartialUser, + + /// Partial entry. + /// + /// This can be any of the following: + /// - A partial user entry with erased footer. + /// - A partial user entry with invalid length. + Partial, + + /// End of entries. + /// + /// In particular this is where the next entry will be written. + Tail, +} + +/// Iterates over the entries of a store. +pub struct StoreIter<'a, S: Storage> { + /// The store being iterated. + store: &'a Store, + + /// The position of the next entry. + pos: Position, + + /// Iteration stops when reaching this position. + end: Position, +} + +impl<'a, S: Storage> StoreIter<'a, S> { + /// Creates an iterator over the entries of a store. + fn new(store: &'a Store) -> StoreResult> { + let pos = store.head()?; + let end = pos + store.format.virt_size(); + Ok(StoreIter { store, pos, end }) + } +} + +impl<'a, S: Storage> StoreIter<'a, S> { + /// Returns the next entry and advances the iterator. + fn transposed_next(&mut self) -> StoreResult> { + if self.pos >= self.end { + return Ok(None); + } + while self.pos < self.end { + let entry_pos = self.pos; + match self.store.parse_entry(&mut self.pos)? { + ParsedEntry::Tail => break, + ParsedEntry::Padding => (), + ParsedEntry::User(header) => { + return Ok(Some(StoreHandle { + key: header.key, + pos: entry_pos, + len: header.length, + })) + } + _ => return Err(StoreError::InvalidStorage), + } + } + self.pos = self.end; + Ok(None) + } +} + +impl<'a, S: Storage> Iterator for StoreIter<'a, S> { + type Item = StoreResult; + + fn next(&mut self) -> Option> { + self.transposed_next().transpose() + } +} + +/// Returns whether 2 slices are different. +/// +/// Returns an error if `target` has a bit set to one for which `source` is set to zero. +fn is_write_needed(source: &[u8], target: &[u8]) -> StoreResult { + debug_assert_eq!(source.len(), target.len()); + for (&source, &target) in source.iter().zip(target.iter()) { + if source & target != target { + return Err(StoreError::InvalidStorage); + } + if source != target { + return Ok(true); + } + } + Ok(false) +}