| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| pub mod cache; |
| pub mod merge; |
| pub mod simple_persistent_layer; |
| pub mod skip_list_layer; |
| pub mod types; |
| |
| use { |
| crate::{ |
| drop_event::DropEvent, |
| log::*, |
| object_handle::{ReadObjectHandle, WriteBytes}, |
| serialized_types::{Version, LATEST_VERSION}, |
| }, |
| anyhow::Error, |
| cache::{ObjectCache, ObjectCacheResult}, |
| simple_persistent_layer::SimplePersistentLayerWriter, |
| skip_list_layer::SkipListLayer, |
| std::{ |
| fmt, |
| ops::Bound, |
| sync::{Arc, RwLock}, |
| }, |
| types::{ |
| Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeableKey, |
| OrdLowerBound, Value, |
| }, |
| }; |
| |
/// Value handed to `SkipListLayer::new` every time a fresh mutable layer is created (see
/// `LSMTree::new_mutable_layer`).
// NOTE(review): presumably the number of items a mutable layer is sized for -- confirm against
// `SkipListLayer::new`.
const SKIP_LIST_LAYER_ITEMS: usize = 512;
| |
| // For serialization. |
| pub use simple_persistent_layer::LayerInfo; |
| |
| pub async fn layers_from_handles<K: Key, V: Value>( |
| handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>, |
| ) -> Result<Vec<Arc<dyn Layer<K, V>>>, Error> { |
| let mut layers = Vec::new(); |
| for handle in handles { |
| layers.push( |
| simple_persistent_layer::SimplePersistentLayer::open(handle).await? |
| as Arc<dyn Layer<K, V>>, |
| ); |
| } |
| Ok(layers) |
| } |
| |
/// Identifies which kind of mutation is being reported to a [`MutationCallback`].
#[derive(Eq, PartialEq, Debug)]
pub enum Operation {
    /// Reported by `LSMTree::insert`.
    Insert,
    /// Reported by `LSMTree::replace_or_insert`.
    ReplaceOrInsert,
    /// Reported by `LSMTree::merge_into`.
    MergeInto,
}

/// Optional hook that `LSMTree` calls with the operation kind and the item just before the item
/// is applied to the mutable layer. `None` means no callback is installed.
pub type MutationCallback<K, V> = Option<Box<dyn Fn(Operation, &Item<K, V>) + Send + Sync>>;
| |
/// The state of an `LSMTree` that is guarded by `LSMTree::data`.
struct Inner<K, V> {
    /// The in-memory layer that all mutations are applied to.
    mutable_layer: Arc<SkipListLayer<K, V>>,
    /// The immutable layers. `seal` inserts the sealed mutable layer at index 0 and
    /// `append_layers` adds base layers at the end.
    layers: Vec<Arc<dyn Layer<K, V>>>,
    /// Hook invoked by `insert`, `replace_or_insert` and `merge_into`; see
    /// `set_mutation_callback`.
    mutation_callback: MutationCallback<K, V>,
}

/// LSMTree manages a tree of layers to provide a key/value store.  Each layer contains deltas on
/// the preceding layer.  The top layer is an in-memory mutable layer.  Layers can be compacted to
/// form a new combined layer.
pub struct LSMTree<K, V> {
    /// Mutations take this lock for reading; operations that swap layers (e.g. `seal`) take it
    /// for writing.
    data: RwLock<Inner<K, V>>,
    /// Merge function handed to every `LayerSet` and to `merge_into`.
    merge_fn: merge::MergeFn<K, V>,
    /// Cache consulted by `find` and invalidated after each mutation.
    cache: Box<dyn ObjectCache<K, V>>,
}
| |
| #[fxfs_trace::trace] |
| impl<'tree, K: MergeableKey, V: Value> LSMTree<K, V> { |
| /// Creates a new empty tree. |
| pub fn new(merge_fn: merge::MergeFn<K, V>, cache: Box<dyn ObjectCache<K, V>>) -> Self { |
| LSMTree { |
| data: RwLock::new(Inner { |
| mutable_layer: Self::new_mutable_layer(), |
| layers: Vec::new(), |
| mutation_callback: None, |
| }), |
| merge_fn, |
| cache, |
| } |
| } |
| |
| /// Opens an existing tree from the provided handles to the layer objects. |
| pub async fn open( |
| merge_fn: merge::MergeFn<K, V>, |
| handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>, |
| cache: Box<dyn ObjectCache<K, V>>, |
| ) -> Result<Self, Error> { |
| Ok(LSMTree { |
| data: RwLock::new(Inner { |
| mutable_layer: Self::new_mutable_layer(), |
| layers: layers_from_handles(handles).await?, |
| mutation_callback: None, |
| }), |
| merge_fn, |
| cache, |
| }) |
| } |
| |
| /// Replaces the immutable layers. |
| pub fn set_layers(&self, layers: Vec<Arc<dyn Layer<K, V>>>) { |
| self.data.write().unwrap().layers = layers; |
| } |
| |
| /// Appends to the given layers at the end i.e. they should be base layers. This is supposed |
| /// to be used after replay when we are opening a tree and we have discovered the base layers. |
| pub async fn append_layers( |
| &self, |
| handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>, |
| ) -> Result<(), Error> { |
| let mut layers = layers_from_handles(handles).await?; |
| self.data.write().unwrap().layers.append(&mut layers); |
| Ok(()) |
| } |
| |
| /// Resets the immutable layers. |
| pub fn reset_immutable_layers(&self) { |
| self.data.write().unwrap().layers = Vec::new(); |
| } |
| |
| /// Seals the current mutable layer and creates a new one. |
| pub fn seal(&self) { |
| // We need to be sure there are no mutations currently in-progress. This is currently |
| // guaranteed by ensuring that all mutations take a read lock on `data`. |
| let mut data = self.data.write().unwrap(); |
| let layer = std::mem::replace(&mut data.mutable_layer, Self::new_mutable_layer()); |
| data.layers.insert(0, layer); |
| } |
| |
| /// Resets the tree to an empty state. |
| pub fn reset(&self) { |
| let mut data = self.data.write().unwrap(); |
| data.layers = Vec::new(); |
| data.mutable_layer = Self::new_mutable_layer(); |
| } |
| |
| /// Writes the items yielded by the iterator into the supplied object. |
| #[trace] |
| pub async fn compact_with_iterator<W: WriteBytes + Send>( |
| &self, |
| mut iterator: impl LayerIterator<K, V>, |
| writer: W, |
| block_size: u64, |
| ) -> Result<(), Error> { |
| let mut writer = SimplePersistentLayerWriter::<W, K, V>::new(writer, block_size).await?; |
| while let Some(item_ref) = iterator.get() { |
| debug!(?item_ref, "compact: writing"); |
| writer.write(item_ref).await?; |
| iterator.advance().await?; |
| } |
| writer.flush().await |
| } |
| |
| /// Returns an empty layer-set for this tree. |
| pub fn empty_layer_set(&self) -> LayerSet<K, V> { |
| LayerSet { layers: Vec::new(), merge_fn: self.merge_fn } |
| } |
| |
| /// Adds all the layers (including the mutable layer) to `layer_set`. |
| pub fn add_all_layers_to_layer_set(&self, layer_set: &mut LayerSet<K, V>) { |
| let data = self.data.read().unwrap(); |
| layer_set.layers.reserve_exact(data.layers.len() + 1); |
| layer_set |
| .layers |
| .push(LockedLayer::from(data.mutable_layer.clone() as Arc<dyn Layer<K, V>>)); |
| for layer in &data.layers { |
| layer_set.layers.push(layer.clone().into()); |
| } |
| } |
| |
| /// Returns a clone of the current set of layers (including the mutable layer), after which one |
| /// can get an iterator. |
| pub fn layer_set(&self) -> LayerSet<K, V> { |
| let mut layer_set = self.empty_layer_set(); |
| self.add_all_layers_to_layer_set(&mut layer_set); |
| layer_set |
| } |
| |
| /// Returns the current set of immutable layers after which one can get an iterator (for e.g. |
| /// compacting). Since these layers are immutable, getting an iterator should not block |
| /// anything else. |
| pub fn immutable_layer_set(&self) -> LayerSet<K, V> { |
| let data = self.data.read().unwrap(); |
| let mut layers = Vec::with_capacity(data.layers.len()); |
| for layer in &data.layers { |
| layers.push(layer.clone().into()); |
| } |
| LayerSet { layers, merge_fn: self.merge_fn } |
| } |
| |
| /// Inserts an item into the mutable layer. |
| /// Returns error if item already exists. |
| pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> { |
| let key = item.key.clone(); |
| let val = item.value.clone(); |
| { |
| // `seal` below relies on us holding a read lock whilst we do the mutation. |
| let data = self.data.read().unwrap(); |
| if let Some(mutation_callback) = data.mutation_callback.as_ref() { |
| mutation_callback(Operation::Insert, &item); |
| } |
| data.mutable_layer.insert(item)?; |
| } |
| self.cache.invalidate(key, Some(val)); |
| Ok(()) |
| } |
| |
| /// Replaces or inserts an item into the mutable layer. |
| pub fn replace_or_insert(&self, item: Item<K, V>) { |
| let key = item.key.clone(); |
| let val = item.value.clone(); |
| { |
| // `seal` below relies on us holding a read lock whilst we do the mutation. |
| let data = self.data.read().unwrap(); |
| if let Some(mutation_callback) = data.mutation_callback.as_ref() { |
| mutation_callback(Operation::ReplaceOrInsert, &item); |
| } |
| data.mutable_layer.replace_or_insert(item); |
| } |
| self.cache.invalidate(key, Some(val)); |
| } |
| |
| /// Merges the given item into the mutable layer. |
| pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K) { |
| let key = item.key.clone(); |
| { |
| // `seal` below relies on us holding a read lock whilst we do the mutation. |
| let data = self.data.read().unwrap(); |
| if let Some(mutation_callback) = data.mutation_callback.as_ref() { |
| mutation_callback(Operation::MergeInto, &item); |
| } |
| data.mutable_layer.merge_into(item, lower_bound, self.merge_fn); |
| } |
| self.cache.invalidate(key, None); |
| } |
| |
| /// Searches for an exact match for the given key. |
| pub async fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error> |
| where |
| K: Eq, |
| { |
| // It is important that the cache lookup is done prior to fetching the layer set as the |
| // placeholder returned acts as a sort of lock for the validity of the item that may be |
| // inserted later via that placeholder. |
| let token = match self.cache.lookup_or_reserve(search_key) { |
| ObjectCacheResult::Value(value) => { |
| return Ok(Some(Item::new(search_key.clone(), value))) |
| } |
| ObjectCacheResult::Placeholder(token) => Some(token), |
| ObjectCacheResult::NoCache => None, |
| }; |
| let layer_set = self.layer_set(); |
| let mut merger = layer_set.merger(); |
| |
| Ok(match merger.seek(Bound::Included(search_key)).await?.get() { |
| Some(ItemRef { key, value, sequence }) if key == search_key => { |
| if let Some(token) = token { |
| token.complete(Some(value)); |
| } |
| Some(Item { key: key.clone(), value: value.clone(), sequence }) |
| } |
| _ => None, |
| }) |
| } |
| |
| pub fn mutable_layer(&self) -> Arc<SkipListLayer<K, V>> { |
| self.data.read().unwrap().mutable_layer.clone() |
| } |
| |
| /// Sets a mutation callback which is a callback that is triggered whenever any mutations are |
| /// applied to the tree. This might be useful for tests that want to record the precise |
| /// sequence of mutations that are applied to the tree. |
| pub fn set_mutation_callback(&self, mutation_callback: MutationCallback<K, V>) { |
| self.data.write().unwrap().mutation_callback = mutation_callback; |
| } |
| |
| /// Returns the earliest version used by a layer in the tree. |
| pub fn get_earliest_version(&self) -> Version { |
| let mut earliest_version = LATEST_VERSION; |
| for layer in self.layer_set().layers { |
| let layer_version = layer.get_version(); |
| if layer_version < earliest_version { |
| earliest_version = layer_version; |
| } |
| } |
| return earliest_version; |
| } |
| |
| /// Returns a new mutable layer. |
| pub fn new_mutable_layer() -> Arc<SkipListLayer<K, V>> { |
| SkipListLayer::new(SKIP_LIST_LAYER_ITEMS) |
| } |
| |
| /// Replaces the mutable layer. |
| pub fn set_mutable_layer(&self, layer: Arc<SkipListLayer<K, V>>) { |
| self.data.write().unwrap().mutable_layer = layer; |
| } |
| } |
| |
| /// This is an RAII wrapper for a layer which holds a lock on the layer (via the Layer::lock |
| /// method). |
| pub struct LockedLayer<K, V>(Arc<DropEvent>, Arc<dyn Layer<K, V>>); |
| |
| impl<K, V> LockedLayer<K, V> { |
| pub async fn close_layer(self) { |
| let layer = self.1; |
| std::mem::drop(self.0); |
| layer.close().await; |
| } |
| } |
| |
| impl<K, V> From<Arc<dyn Layer<K, V>>> for LockedLayer<K, V> { |
| fn from(layer: Arc<dyn Layer<K, V>>) -> Self { |
| let event = layer.lock().unwrap(); |
| Self(event, layer) |
| } |
| } |
| |
| impl<K, V> std::ops::Deref for LockedLayer<K, V> { |
| type Target = Arc<dyn Layer<K, V>>; |
| |
| fn deref(&self) -> &Self::Target { |
| &self.1 |
| } |
| } |
| |
| impl<K, V> AsRef<dyn Layer<K, V>> for LockedLayer<K, V> { |
| fn as_ref(&self) -> &(dyn Layer<K, V> + 'static) { |
| self.1.as_ref() |
| } |
| } |
| |
| /// A LayerSet provides a snapshot of the layers at a particular point in time, and allows you to |
| /// get an iterator. Iterators borrow the layers so something needs to hold reference count. |
| pub struct LayerSet<K, V> { |
| pub layers: Vec<LockedLayer<K, V>>, |
| merge_fn: merge::MergeFn<K, V>, |
| } |
| |
| impl<K: Key + LayerKey + OrdLowerBound, V: Value> LayerSet<K, V> { |
| pub fn merger(&self) -> merge::Merger<'_, K, V> { |
| merge::Merger::new(self.layers.iter().map(|x| x.as_ref()), self.merge_fn) |
| } |
| } |
| |
| impl<K, V> fmt::Debug for LayerSet<K, V> { |
| fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
| fmt.debug_list() |
| .entries(self.layers.iter().map(|l| { |
| if let Some(handle) = l.handle() { |
| format!("{}", handle.object_id()) |
| } else { |
| format!("{:?}", Arc::as_ptr(l)) |
| } |
| })) |
| .finish() |
| } |
| } |
| |
#[cfg(test)]
mod tests {
    use {
        super::LSMTree,
        crate::{
            drop_event::DropEvent,
            lsm_tree::{
                cache::{NullCache, ObjectCache, ObjectCachePlaceholder, ObjectCacheResult},
                layers_from_handles,
                merge::{MergeLayerIterator, MergeResult},
                types::{
                    BoxedLayerIterator, Item, ItemRef, Key, Layer, LayerIterator, LayerKey,
                    OrdLowerBound, OrdUpperBound, SortByU64, Value,
                },
            },
            object_handle::ObjectHandle,
            serialized_types::{
                versioned_type, Version, Versioned, VersionedLatest, LATEST_VERSION,
            },
            testing::{
                fake_object::{FakeObject, FakeObjectHandle},
                writer::Writer,
            },
        },
        anyhow::{anyhow, Error},
        async_trait::async_trait,
        fprint::TypeFingerprint,
        rand::{seq::SliceRandom, thread_rng},
        std::{
            hash::Hash,
            ops::Bound,
            sync::{Arc, Mutex},
        },
    };

    /// Key type used by the tests: a range of `u64`.
    #[derive(
        Clone,
        Eq,
        PartialEq,
        Debug,
        Hash,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    versioned_type! { 1.. => TestKey }

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }

    impl LayerKey for TestKey {}

    // Upper bounds compare the range ends.
    impl OrdUpperBound for TestKey {
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            self.0.end.cmp(&other.0.end)
        }
    }

    // Lower bounds compare the range starts.
    impl OrdLowerBound for TestKey {
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start)
        }
    }

    /// Merge function that always emits the left item unchanged.
    fn emit_left_merge_fn(
        _left: &MergeLayerIterator<'_, TestKey, u64>,
        _right: &MergeLayerIterator<'_, TestKey, u64>,
    ) -> MergeResult<TestKey, u64> {
        MergeResult::EmitLeft
    }

    #[fuchsia::test]
    async fn test_iteration() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        let layers = tree.layer_set();
        let mut merger = layers.merger();
        let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.expect("advance failed");
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_compact() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        // Spread the items over two sealed layers.
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.seal();
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");
        tree.seal();
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        {
            let layer_set = tree.immutable_layer_set();
            let mut merger = layer_set.merger();
            let iter = merger.seek(Bound::Unbounded).await.expect("create merger");
            tree.compact_with_iterator(iter, Writer::new(&handle).await, handle.block_size())
                .await
                .expect("compact failed");
        }
        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
        // Re-open the compacted object in a brand new tree and check its contents.
        let handle = FakeObjectHandle::new(object.clone());
        let tree = LSMTree::open(emit_left_merge_fn, [handle], Box::new(NullCache {}))
            .await
            .expect("open failed");

        let layers = tree.layer_set();
        let mut merger = layers.merger();
        let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
        for i in 1..5 {
            let ItemRef { key, value, .. } = iter.get().expect("missing item");
            assert_eq!((key, value), (&TestKey(i..i), &i));
            iter.advance().await.expect("advance failed");
        }
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_find() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.seal();
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");

        let item = tree.find(&items[1].key).await.expect("find failed").expect("not found");
        assert_eq!(item, items[1]);
        assert!(tree.find(&TestKey(100..100)).await.expect("find failed").is_none());
    }

    #[fuchsia::test]
    async fn test_empty_seal() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        // Seal while the mutable layer is still empty.
        tree.seal();
        let item = Item::new(TestKey(1..1), 1);
        tree.insert(item.clone()).expect("insert error");
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        {
            let layer_set = tree.immutable_layer_set();
            let mut merger = layer_set.merger();
            let iter = merger.seek(Bound::Unbounded).await.expect("create merger");
            tree.compact_with_iterator(iter, Writer::new(&handle).await, handle.block_size())
                .await
                .expect("compact failed");
        }
        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
        let found_item = tree.find(&item.key).await.expect("find failed").expect("not found");
        assert_eq!(found_item, item);
        assert!(tree.find(&TestKey(2..2)).await.expect("find failed").is_none());
    }

    #[fuchsia::test]
    async fn test_filter() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");

        let layers = tree.layer_set();
        let mut merger = layers.merger();

        // Filter out odd keys (which also guarantees we skip the first key which is an edge case).
        let mut iter = merger
            .seek(Bound::Unbounded)
            .await
            .expect("seek failed")
            .filter(|item: ItemRef<'_, TestKey, u64>| item.key.0.start % 2 == 0)
            .await
            .expect("filter failed");

        assert_eq!(iter.get(), Some(items[1].as_item_ref()));
        iter.advance().await.expect("advance failed");
        assert_eq!(iter.get(), Some(items[3].as_item_ref()));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_insert_order_agnostic() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
            Item::new(TestKey(5..5), 5),
            Item::new(TestKey(6..6), 6),
        ];
        // Tree `a` receives the items in order, tree `b` in a random order.
        let a = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for item in &items {
            a.insert(item.clone()).expect("insert error");
        }
        let b = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let mut shuffled = items.clone();
        shuffled.shuffle(&mut thread_rng());
        for item in &shuffled {
            b.insert(item.clone()).expect("insert error");
        }
        let layers = a.layer_set();
        let mut merger = layers.merger();
        let mut iter_a = merger.seek(Bound::Unbounded).await.expect("seek failed");
        let layers = b.layer_set();
        let mut merger = layers.merger();
        let mut iter_b = merger.seek(Bound::Unbounded).await.expect("seek failed");

        // Both trees must yield the items in sorted order.
        for item in items {
            assert_eq!(Some(item.as_item_ref()), iter_a.get());
            assert_eq!(Some(item.as_item_ref()), iter_b.get());
            iter_a.advance().await.expect("advance failed");
            iter_b.advance().await.expect("advance failed");
        }
        assert!(iter_a.get().is_none());
        assert!(iter_b.get().is_none());
    }

    /// Counters shared between an `AuditCache` and the placeholders it hands out.
    struct AuditCacheInner<'a, V: Value> {
        // Number of `lookup_or_reserve` calls.
        lookups: u64,
        // Number of placeholders that were completed.
        completions: u64,
        // Number of `invalidate` calls.
        invalidations: u64,
        // Number of placeholders dropped without being completed.
        drops: u64,
        // When set, returned (and cleared) by the next lookup instead of a placeholder.
        result: Option<ObjectCacheResult<'a, V>>,
    }

    impl<V: Value> AuditCacheInner<'_, V> {
        /// Returns `(lookups, completions, invalidations, drops)`.
        fn stats(&self) -> (u64, u64, u64, u64) {
            (self.lookups, self.completions, self.invalidations, self.drops)
        }
    }

    /// An `ObjectCache` that stores nothing and only records how it is used.
    struct AuditCache<'a, V: Value> {
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
    }

    impl<V: Value> AuditCache<'_, V> {
        /// Creates a cache with all counters at zero and no canned lookup result.
        fn new() -> Self {
            Self {
                inner: Arc::new(Mutex::new(AuditCacheInner {
                    lookups: 0,
                    completions: 0,
                    invalidations: 0,
                    drops: 0,
                    result: None,
                })),
            }
        }
    }

    /// Placeholder that reports its completion, or its drop without completion, to the counters.
    struct AuditPlaceholder<'a, V: Value> {
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
        completed: Mutex<bool>,
    }

    impl<V: Value> ObjectCachePlaceholder<V> for AuditPlaceholder<'_, V> {
        fn complete(self: Box<Self>, _: Option<&V>) {
            self.inner.lock().unwrap().completions += 1;
            // Stops the `Drop` impl, which runs when this function returns, from counting a drop.
            *self.completed.lock().unwrap() = true;
        }
    }

    impl<V: Value> Drop for AuditPlaceholder<'_, V> {
        fn drop(&mut self) {
            if !*self.completed.lock().unwrap() {
                self.inner.lock().unwrap().drops += 1;
            }
        }
    }

    impl<K: Key + std::cmp::PartialEq, V: Value> ObjectCache<K, V> for AuditCache<'_, V> {
        fn lookup_or_reserve(&self, _key: &K) -> ObjectCacheResult<'_, V> {
            {
                let mut inner = self.inner.lock().unwrap();
                inner.lookups += 1;
                // A canned result takes precedence and is consumed by this lookup.
                if let Some(result) = inner.result.take() {
                    return result;
                }
            }
            ObjectCacheResult::Placeholder(Box::new(AuditPlaceholder {
                inner: self.inner.clone(),
                completed: Mutex::new(false),
            }))
        }

        fn invalidate(&self, _key: K, _value: Option<V>) {
            self.inner.lock().unwrap().invalidations += 1;
        }
    }

    #[fuchsia::test]
    async fn test_cache_handling() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        // Zero counters.
        assert_eq!(inner.lock().unwrap().stats(), (0, 0, 0, 0));

        // Look for an item, but don't find it. So no insertion. It is dropped.
        assert!(a.find(&item.key).await.expect("Failed find").is_none());
        assert_eq!(inner.lock().unwrap().stats(), (1, 0, 0, 1));

        // Insert attempts to invalidate.
        a.insert(item.clone()).expect("insert error");
        assert_eq!(inner.lock().unwrap().stats(), (1, 0, 1, 1));

        // Look for item, find it and insert into the cache.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().unwrap().stats(), (2, 1, 1, 1));

        // Insert or replace attempts to invalidate as well.
        a.replace_or_insert(item.clone());
        assert_eq!(inner.lock().unwrap().stats(), (2, 1, 2, 1));
    }

    #[fuchsia::test]
    async fn test_cache_hit() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        // Zero counters.
        assert_eq!(inner.lock().unwrap().stats(), (0, 0, 0, 0));

        // Insert attempts to invalidate.
        a.insert(item.clone()).expect("insert error");
        assert_eq!(inner.lock().unwrap().stats(), (0, 0, 1, 0));

        // Set up the item to find in the cache.
        inner.lock().unwrap().result = Some(ObjectCacheResult::Value(item.value.clone()));

        // Look for item, find it in cache, so no insert.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().unwrap().stats(), (1, 0, 1, 0));
    }

    #[fuchsia::test]
    async fn test_cache_says_uncacheable() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);
        a.insert(item.clone()).expect("insert error");

        // One invalidation from the insert.
        assert_eq!(inner.lock().unwrap().stats(), (0, 0, 1, 0));

        // Set up the NoCache response to find in the cache.
        inner.lock().unwrap().result = Some(ObjectCacheResult::NoCache);

        // Look for item, it is uncacheable, so no insert.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Should find item").value,
            item.value
        );
        assert_eq!(inner.lock().unwrap().stats(), (1, 0, 1, 0));
    }

    /// A layer whose `seek` always fails.
    struct FailLayer {
        // Handed out by `lock`; taken (leaving `None`) by `close`.
        drop_event: Mutex<Option<Arc<DropEvent>>>,
    }

    impl FailLayer {
        fn new() -> Self {
            Self { drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
        }
    }

    #[async_trait]
    impl<K: Key, V: Value> Layer<K, V> for FailLayer {
        async fn seek(
            &self,
            _bound: std::ops::Bound<&K>,
        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
            Err(anyhow!("Purposely failed seek"))
        }

        fn lock(&self) -> Option<Arc<DropEvent>> {
            self.drop_event.lock().unwrap().clone()
        }

        async fn close(&self) {
            // Take the event out so that later `lock` calls return `None`.  The mutex guard is a
            // temporary of this statement, so it is released before the await below.
            let listener = match self.drop_event.lock().unwrap().take() {
                Some(drop_event) => drop_event.listen(),
                None => return,
            };
            listener.await;
        }

        fn get_version(&self) -> Version {
            LATEST_VERSION
        }
    }

    #[fuchsia::test]
    async fn test_failed_lookup() {
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);
        a.set_layers(vec![Arc::new(FailLayer::new())]);

        // Zero counters.
        assert_eq!(inner.lock().unwrap().stats(), (0, 0, 0, 0));

        // Lookup should fail and drop the placeholder.
        assert!(a.find(&TestKey(1..1)).await.is_err());
        assert_eq!(inner.lock().unwrap().stats(), (1, 0, 0, 1));
    }
}
| |
#[cfg(fuzz)]
mod fuzz {
    use {
        crate::{
            lsm_tree::types::{Item, LayerKey, OrdLowerBound, OrdUpperBound, SortByU64},
            serialized_types::{
                versioned_type, Version, Versioned, VersionedLatest, LATEST_VERSION,
            },
        },
        arbitrary::Arbitrary,
        fprint::TypeFingerprint,
        fuzz::fuzz,
        std::hash::Hash,
    };

    /// Key type driven by the fuzzer: a range of `u64`.
    #[derive(
        Arbitrary,
        Clone,
        Eq,
        Hash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    versioned_type! { 1.. => TestKey }

    impl Versioned for u64 {}
    versioned_type! { 1.. => u64 }

    impl LayerKey for TestKey {}

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            let TestKey(range) = self;
            range.start
        }
    }

    // Lower bounds are ordered by the start of the range.
    impl OrdLowerBound for TestKey {
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            Ord::cmp(&self.0.start, &other.0.start)
        }
    }

    // Upper bounds are ordered by the end of the range.
    impl OrdUpperBound for TestKey {
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            Ord::cmp(&self.0.end, &other.0.end)
        }
    }

    /// One step of a fuzzed scenario; each variant corresponds to an `LSMTree` method.
    #[allow(dead_code)] // TODO(https://fxbug.dev/318827209)
    #[derive(Arbitrary)]
    enum FuzzAction {
        Insert(Item<TestKey, u64>),
        ReplaceOrInsert(Item<TestKey, u64>),
        MergeInto(Item<TestKey, u64>, TestKey),
        Find(TestKey),
        Seal,
    }

    /// Replays the fuzzer-provided actions, in order, against a fresh tree that has no cache.
    #[fuzz]
    fn fuzz_lsm_tree_actions(actions: Vec<FuzzAction>) {
        use {
            super::{cache::NullCache, LSMTree},
            crate::lsm_tree::merge::{MergeLayerIterator, MergeResult},
            futures::executor::block_on,
        };

        // Merge function that always emits the left item unchanged.
        fn emit_left_merge_fn(
            _left: &MergeLayerIterator<'_, TestKey, u64>,
            _right: &MergeLayerIterator<'_, TestKey, u64>,
        ) -> MergeResult<TestKey, u64> {
            MergeResult::EmitLeft
        }

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        actions.into_iter().for_each(|step| match step {
            FuzzAction::Insert(item) => {
                // A failed insert is deliberately ignored.
                let _ = tree.insert(item);
            }
            FuzzAction::ReplaceOrInsert(item) => tree.replace_or_insert(item),
            FuzzAction::MergeInto(item, lower_bound) => tree.merge_into(item, &lower_bound),
            FuzzAction::Find(key) => {
                block_on(tree.find(&key)).expect("find failed");
            }
            FuzzAction::Seal => tree.seal(),
        });
    }
}