| pub mod merge; |
| mod simple_persistent_layer; |
| pub mod skip_list_layer; |
| #[cfg(test)] |
| mod tests; |
| |
| use { |
| crate::object_handle::ObjectHandle, |
| anyhow::Error, |
| serde::{Deserialize, Serialize}, |
| simple_persistent_layer::SimplePersistentLayerWriter, |
| std::{ |
| ops::Bound, |
| ptr::NonNull, |
| sync::{Arc, RwLock}, |
| }, |
| }; |
| |
/// Value handed to `SkipListLayer::new` every time a fresh in-memory (mutable) layer is created.
const SKIP_LIST_LAYER_ITEMS: usize = 512;
| |
| // Use trait_alias when available. |
| pub trait Key: |
| std::cmp::Ord |
| + OrdLowerBound |
| + std::fmt::Debug |
| + Send |
| + Sync |
| + std::marker::Unpin |
| + serde::de::DeserializeOwned |
| + serde::Serialize |
| + 'static |
| { |
| } |
| pub trait Value: |
| std::fmt::Debug |
| + Send |
| + Sync |
| + serde::de::DeserializeOwned |
| + serde::Serialize |
| + std::marker::Unpin |
| + 'static |
| { |
| } |
| |
| impl<K> Key for K where |
| K: std::cmp::Ord |
| + OrdLowerBound |
| + std::fmt::Debug |
| + Send |
| + Sync |
| + std::marker::Unpin |
| + serde::de::DeserializeOwned |
| + serde::Serialize |
| + 'static |
| { |
| } |
| impl<V> Value for V where |
| V: std::fmt::Debug |
| + Send |
| + Sync |
| + std::marker::Unpin |
| + serde::de::DeserializeOwned |
| + serde::Serialize |
| + 'static |
| { |
| } |
| |
| #[derive(Debug, Serialize)] |
| pub struct ItemRef<'a, K, V> { |
| pub key: &'a K, |
| pub value: &'a V, |
| } |
| |
| impl<'a, K, V> Clone for ItemRef<'a, K, V> { |
| fn clone(&self) -> Self { |
| ItemRef { key: self.key, value: self.value } |
| } |
| } |
| impl<'a, K, V> Copy for ItemRef<'a, K, V> {} |
| |
| #[derive(Clone, Debug, Serialize, Deserialize)] |
| pub struct Item<K, V> { |
| pub key: K, |
| pub value: V, |
| } |
| |
| impl<K, V> Item<K, V> { |
| #[cfg(test)] |
| pub fn new(key: K, value: V) -> Item<K, V> { |
| Item { key, value } |
| } |
| |
| pub fn as_item_ref(&self) -> ItemRef<'_, K, V> { |
| self.into() |
| } |
| } |
| |
| impl<'a, K, V> From<&'a Item<K, V>> for ItemRef<'a, K, V> { |
| fn from(item: &'a Item<K, V>) -> ItemRef<'a, K, V> { |
| ItemRef { key: &item.key, value: &item.value } |
| } |
| } |
| |
/// An ordering that keys provide in addition to `Ord`.
///
/// NOTE(review): judging by the name this orders keys by the lower bound of the range they
/// cover; confirm against the implementors.
pub trait OrdLowerBound {
    /// Compares `self` with `other` under the lower-bound ordering.
    fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering;
}
| |
| // TODO: make this private. |
| pub trait LayerIterator<K, V>: Send { |
| fn seek(&mut self, bound: std::ops::Bound<&K>) -> Result<(), Error>; |
| fn advance(&mut self) -> Result<(), Error>; |
| fn get(&self) -> Option<ItemRef<'_, K, V>>; |
| // TODO: remove this |
| fn discard_or_advance(&mut self) -> Result<(), Error>; |
| } |
| |
| pub trait LayerIteratorMut<K, V>: LayerIterator<K, V> { |
| fn as_iterator_mut(&mut self) -> &mut dyn LayerIterator<K, V>; |
| fn as_iterator(&self) -> &dyn LayerIterator<K, V>; |
| fn erase(&mut self); |
| fn insert_before(&mut self, item: Item<K, V>); |
| } |
| |
| type BoxedLayerIterator<'iter, K, V> = Box<dyn LayerIterator<K, V> + 'iter>; |
| |
| pub trait Layer<K, V>: Send + Sync { |
| fn get_iterator(&self) -> BoxedLayerIterator<'_, K, V>; |
| } |
| |
| pub trait MutableLayer<K, V>: Layer<K, V> { |
| fn as_layer(self: Arc<Self>) -> Arc<dyn Layer<K, V>>; |
| |
| fn dump(&self); |
| |
| fn insert(&self, item: Item<K, V>); |
| |
| fn replace_range(&self, item: Item<K, V>, lower_bound: &K, merge_fn: merge::MergeFn<K, V>); |
| |
| fn replace_or_insert(&self, item: Item<K, V>); |
| } |
| |
| struct Data<K, V> { |
| mutable_layer: Arc<dyn MutableLayer<K, V>>, |
| layers: Vec<Arc<dyn Layer<K, V>>>, |
| } |
| |
| pub struct LSMTree<K, V> { |
| data: RwLock<Data<K, V>>, |
| merge_fn: merge::MergeFn<K, V>, |
| } |
| |
| pub struct LSMTreeIter<'iter, K, V> { |
| _layers: Box<[Arc<dyn Layer<K, V> + 'iter>]>, |
| merger: merge::Merger<'iter, K, V>, |
| } |
| |
| impl< |
| 'iter, |
| K: std::fmt::Debug + OrdLowerBound + Unpin + 'static, |
| V: std::fmt::Debug + Unpin + 'static, |
| > LSMTreeIter<'iter, K, V> |
| { |
| fn new(layers: Box<[Arc<dyn Layer<K, V> + 'iter>]>, merge_fn: merge::MergeFn<K, V>) -> Self { |
| let iterators: Box<[BoxedLayerIterator<'_, K, V>]> = layers |
| .iter() |
| .map(|x| { |
| let ptr = NonNull::from(x); |
| unsafe { &*ptr.as_ptr() }.get_iterator() |
| }) |
| .collect(); |
| LSMTreeIter { _layers: layers, merger: merge::Merger::new(iterators, merge_fn) } |
| } |
| |
| pub fn get(&self) -> Option<ItemRef<'_, K, V>> { |
| self.merger.get() |
| } |
| |
| pub fn advance(&mut self) -> Result<(), Error> { |
| self.merger.advance() |
| } |
| |
| pub fn advance_to(&mut self, key: &K) -> Result<(), Error> { |
| self.merger.advance_to(key) |
| } |
| } |
| |
| impl<'tree, K: Key, V: Value> LSMTree<K, V> |
| where |
| for<'de> K: serde::Deserialize<'de>, |
| for<'de> V: serde::Deserialize<'de>, |
| { |
| pub fn new(merge_fn: merge::MergeFn<K, V>) -> Self { |
| LSMTree { |
| data: RwLock::new(Data { |
| mutable_layer: Arc::new(skip_list_layer::SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)), |
| layers: Vec::new(), |
| }), |
| merge_fn, |
| } |
| } |
| |
| fn layers_from_handles( |
| handles: Box<[impl ObjectHandle + 'static]>, |
| ) -> Vec<Arc<dyn Layer<K, V>>> { |
| handles |
| .into_vec() |
| .drain(..) |
| .map(|h| { |
| Arc::new(simple_persistent_layer::SimplePersistentLayer::new(h, 512)) |
| as Arc<dyn Layer<K, V>> |
| }) |
| .collect() |
| } |
| |
| pub fn open( |
| merge_fn: merge::MergeFn<K, V>, |
| handles: Box<[impl ObjectHandle + 'static]>, |
| ) -> Self { |
| LSMTree { |
| data: RwLock::new(Data { |
| mutable_layer: Arc::new(skip_list_layer::SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)), |
| layers: Self::layers_from_handles(handles), |
| }), |
| merge_fn, |
| } |
| } |
| |
| pub fn set_layers(&self, handles: Box<[impl ObjectHandle + 'static]>) { |
| let mut data = self.data.write().unwrap(); |
| data.layers = Self::layers_from_handles(handles); |
| } |
| |
| fn add_all_layers<'a>(&'a self, all_layers: &mut Vec<Arc<dyn Layer<K, V>>>) { |
| let data = self.data.read().unwrap(); |
| all_layers.push(data.mutable_layer.clone().as_layer().into()); |
| for layer in &data.layers { |
| all_layers.push(layer.clone().into()); |
| } |
| } |
| |
| pub fn dump_mutable_layer(&self) { |
| let data = self.data.read().unwrap(); |
| data.mutable_layer.dump(); |
| } |
| |
| // TODO(csuter): This should maybe run on a different thread. Depends when it's |
| // called. |
| // Writes the current memory layer to the given object handle and creates a new one. |
| pub fn commit<'a>( |
| &'a self, |
| mut object_handle: impl ObjectHandle + 'static, |
| ) -> Result<(), Error> { |
| let mut layers = Vec::new(); |
| self.add_all_layers(&mut layers); |
| { |
| let mut data = self.data.write().unwrap(); |
| data.mutable_layer = |
| Arc::new(skip_list_layer::SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)); |
| data.layers = layers.clone(); |
| } |
| // TODO: optimize for the case where the mutable layer is empty. |
| { |
| let mut writer = SimplePersistentLayerWriter::new(&mut object_handle, 512); |
| let iterators = layers.iter().map(|x| x.get_iterator()).collect(); |
| let mut merger = merge::Merger::new(iterators, self.merge_fn); |
| merger.advance()?; |
| while let Some(item_ref) = merger.get() { |
| eprintln!("{:?}", item_ref); |
| writer.write(item_ref)?; |
| merger.advance()?; |
| } |
| writer.close()?; |
| } |
| { |
| let mut data = self.data.write().unwrap(); |
| data.layers = vec![Arc::new(simple_persistent_layer::SimplePersistentLayer::new( |
| object_handle, |
| 512, |
| ))]; |
| } |
| Ok(()) |
| } |
| |
| pub fn iter(&self) -> LSMTreeIter<'tree, K, V> { |
| let mut layers = Vec::new(); |
| self.add_all_layers(&mut layers); |
| LSMTreeIter::new(layers.into_boxed_slice(), self.merge_fn) |
| } |
| |
| pub fn iter_with_layers( |
| &self, |
| mut layers: Vec<Arc<dyn Layer<K, V>>>, |
| ) -> LSMTreeIter<'tree, K, V> { |
| self.add_all_layers(&mut layers); |
| LSMTreeIter::new(layers.into_boxed_slice(), self.merge_fn) |
| } |
| |
| pub fn insert(&self, item: Item<K, V>) { |
| let mut data = self.data.write().unwrap(); // TODO: probably should be read and clone. |
| Arc::get_mut(&mut data.mutable_layer).unwrap().insert(item); // TODO: is get_mut call always safe? |
| } |
| |
| pub fn replace_or_insert(&self, item: Item<K, V>) { |
| let mut data = self.data.write().unwrap(); |
| Arc::get_mut(&mut data.mutable_layer).unwrap().replace_or_insert(item); // TODO |
| } |
| |
| pub fn replace_range(&self, item: Item<K, V>, lower_bound: &K) { |
| let mut data = self.data.write().unwrap(); |
| Arc::get_mut(&mut data.mutable_layer).unwrap().replace_range( |
| item, |
| lower_bound, |
| self.merge_fn, |
| ); |
| } |
| |
| pub fn range_from( |
| &self, |
| bound: std::ops::Bound<&K>, |
| ) -> Result<LSMTreeIter<'tree, K, V>, Error> { |
| let mut layers = Vec::new(); |
| self.add_all_layers(&mut layers); |
| let mut iter = LSMTreeIter::new(layers.into_boxed_slice(), self.merge_fn); |
| match bound { |
| Bound::Unbounded => iter.merger.advance()?, |
| Bound::Included(key) => iter.merger.advance_to(key)?, |
| Bound::Excluded(_) => panic!("Excluded bounds not supported!"), |
| }; |
| Ok(iter) |
| } |
| |
| pub fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error> |
| where |
| K: Clone, |
| V: Clone, |
| { |
| let mut layers = Vec::new(); |
| self.add_all_layers(&mut layers); |
| let iterators = layers.iter().map(|x| x.get_iterator()).collect(); |
| let mut merger = merge::Merger::new(iterators, self.merge_fn); |
| merger.advance_to(search_key)?; |
| Ok(match merger.get() { |
| Some(ItemRef { key, value }) => { |
| if key == search_key { |
| Some(Item { key: key.clone(), value: value.clone() }) |
| } else { |
| None |
| } |
| } |
| _ => None, |
| }) |
| } |
| } |