// blob: 94eb950b61232b99e5ad9953773cee522bb2214e
pub mod merge;
mod simple_persistent_layer;
pub mod skip_list_layer;
#[cfg(test)]
mod tests;
use {
crate::object_handle::ObjectHandle,
anyhow::Error,
serde::{Deserialize, Serialize},
simple_persistent_layer::SimplePersistentLayerWriter,
std::{
ops::Bound,
ptr::NonNull,
sync::{Arc, RwLock},
},
};
/// Value passed to `SkipListLayer::new` every time a fresh mutable layer is created.
/// NOTE(review): presumably the maximum number of items the skip list holds — confirm against
/// `skip_list_layer`.
const SKIP_LIST_LAYER_ITEMS: usize = 512;
/// Marker trait bundling every bound a type must satisfy to be used as a key.
///
/// It declares no methods of its own; it exists only to give the bound list a name.
// TODO: switch to a trait alias once that language feature is available.
pub trait Key:
    Send
    + Sync
    + Unpin
    + Ord
    + OrdLowerBound
    + std::fmt::Debug
    + serde::Serialize
    + serde::de::DeserializeOwned
    + 'static
{
}
/// Marker trait bundling every bound a type must satisfy to be used as a value.
///
/// It declares no methods of its own; it exists only to give the bound list a name.
pub trait Value:
    Send
    + Sync
    + Unpin
    + std::fmt::Debug
    + serde::Serialize
    + serde::de::DeserializeOwned
    + 'static
{
}
/// Blanket implementation: every type meeting the listed bounds is automatically a `Key`.
impl<
        K: Send
            + Sync
            + Unpin
            + Ord
            + OrdLowerBound
            + std::fmt::Debug
            + serde::Serialize
            + serde::de::DeserializeOwned
            + 'static,
    > Key for K
{
}
/// Blanket implementation: every type meeting the listed bounds is automatically a `Value`.
impl<
        V: Send
            + Sync
            + Unpin
            + std::fmt::Debug
            + serde::Serialize
            + serde::de::DeserializeOwned
            + 'static,
    > Value for V
{
}
/// A borrowed view of one key/value pair.
///
/// Both fields are plain references with lifetime `'a`, so an `ItemRef` is cheap to pass
/// around by value.
#[derive(Debug, Serialize)]
pub struct ItemRef<'a, K, V> {
/// Reference to the key.
pub key: &'a K,
/// Reference to the value.
pub value: &'a V,
}
impl<'a, K, V> Clone for ItemRef<'a, K, V> {
fn clone(&self) -> Self {
ItemRef { key: self.key, value: self.value }
}
}
// An `ItemRef` holds only two shared references, so it is `Copy` for every `K` and `V`.
impl<'a, K, V> Copy for ItemRef<'a, K, V> {}
/// An owned key/value pair, the unit of data inserted into layers.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Item<K, V> {
/// The owned key.
pub key: K,
/// The owned value.
pub value: V,
}
impl<K, V> Item<K, V> {
#[cfg(test)]
pub fn new(key: K, value: V) -> Item<K, V> {
Item { key, value }
}
pub fn as_item_ref(&self) -> ItemRef<'_, K, V> {
self.into()
}
}
impl<'a, K, V> From<&'a Item<K, V>> for ItemRef<'a, K, V> {
fn from(item: &'a Item<K, V>) -> ItemRef<'a, K, V> {
ItemRef { key: &item.key, value: &item.value }
}
}
/// A second ordering on keys, required of every `Key` in addition to `Ord`.
/// NOTE(review): the name suggests ordering by a key's lower bound (e.g. for range-like keys) —
/// confirm against implementors and the `merge` module.
pub trait OrdLowerBound {
/// Compares `self` with `other` under this ordering.
fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering;
}
// TODO: make this private.
/// Cursor over the items of a single layer. Implementations must be `Send`.
pub trait LayerIterator<K, V>: Send {
/// Repositions the iterator according to `bound`.
/// NOTE(review): presumably lands on the first item satisfying the bound — confirm with
/// implementors.
fn seek(&mut self, bound: std::ops::Bound<&K>) -> Result<(), Error>;
/// Moves the iterator forward by one position.
fn advance(&mut self) -> Result<(), Error>;
/// Returns the item at the current position, or `None` if there is none
/// (presumably when the iterator is exhausted or not yet positioned — TODO confirm).
fn get(&self) -> Option<ItemRef<'_, K, V>>;
// TODO: remove this
fn discard_or_advance(&mut self) -> Result<(), Error>;
}
/// A `LayerIterator` that can additionally modify the layer it is iterating over.
pub trait LayerIteratorMut<K, V>: LayerIterator<K, V> {
/// Returns this iterator as a mutable `LayerIterator` trait object.
fn as_iterator_mut(&mut self) -> &mut dyn LayerIterator<K, V>;
/// Returns this iterator as a shared `LayerIterator` trait object.
fn as_iterator(&self) -> &dyn LayerIterator<K, V>;
/// Erases an item. NOTE(review): presumably the item at the current position — confirm.
fn erase(&mut self);
/// Inserts `item`. NOTE(review): presumably immediately before the current position — confirm.
fn insert_before(&mut self, item: Item<K, V>);
}
/// A boxed, dynamically dispatched `LayerIterator` that may hold borrows lasting `'iter`.
type BoxedLayerIterator<'iter, K, V> = Box<dyn LayerIterator<K, V> + 'iter>;
/// A collection of items that can be read through iterators. Implementations must be
/// `Send + Sync`, as layers are shared via `Arc`.
pub trait Layer<K, V>: Send + Sync {
/// Creates a new iterator over this layer; the iterator borrows the layer.
fn get_iterator(&self) -> BoxedLayerIterator<'_, K, V>;
}
/// A `Layer` that also accepts modifications.
///
/// Every mutating method takes `&self`, so implementations have to provide their own interior
/// mutability.
pub trait MutableLayer<K, V>: Layer<K, V> {
/// Converts a shared pointer to this layer into a shared pointer to a plain `Layer`.
fn as_layer(self: Arc<Self>) -> Arc<dyn Layer<K, V>>;
/// Dumps the layer. NOTE(review): presumably prints the contents for debugging — confirm.
fn dump(&self);
/// Inserts `item` into the layer.
fn insert(&self, item: Item<K, V>);
/// Replaces a range of items with `item`, using `merge_fn` to combine entries.
/// NOTE(review): exact semantics of `lower_bound` are not visible here — confirm with
/// implementors.
fn replace_range(&self, item: Item<K, V>, lower_bound: &K, merge_fn: merge::MergeFn<K, V>);
/// Inserts `item`. NOTE(review): presumably replaces an existing item with an equal key —
/// confirm.
fn replace_or_insert(&self, item: Item<K, V>);
}
/// The tree's layer state; `LSMTree` keeps it behind an `RwLock`.
struct Data<K, V> {
// The layer that receives all inserts and replacements.
mutable_layer: Arc<dyn MutableLayer<K, V>>,
// The remaining layers, consulted after the mutable layer when building iterators.
layers: Vec<Arc<dyn Layer<K, V>>>,
}
/// A tree made of one mutable layer plus a list of further layers, read through a merging
/// iterator.
pub struct LSMTree<K, V> {
// Layer state, guarded by a reader/writer lock.
data: RwLock<Data<K, V>>,
// Merge function handed to every `merge::Merger` (and to `replace_range`).
merge_fn: merge::MergeFn<K, V>,
}
/// Merging iterator over a fixed set of layers.
///
/// The iterator owns an `Arc` to every layer it reads, so those layers stay alive for as long
/// as the iterator does, even if the tree has since replaced them.
///
/// Field order matters: Rust drops struct fields in declaration order, and the layer iterators
/// held by `merger` borrow from the layers in `_layers` through a lifetime-erased pointer.
/// `merger` must therefore be declared (and dropped) before `_layers`.
pub struct LSMTreeIter<'iter, K, V> {
    // Dropped first: its layer iterators reference the layers owned below.
    merger: merge::Merger<'iter, K, V>,
    // Dropped last: keeps every layer alive until the iterators above are gone.
    _layers: Box<[Arc<dyn Layer<K, V> + 'iter>]>,
}
impl<
        'iter,
        K: std::fmt::Debug + OrdLowerBound + Unpin + 'static,
        V: std::fmt::Debug + Unpin + 'static,
    > LSMTreeIter<'iter, K, V>
{
    /// Builds a merging iterator over `layers`, taking ownership of them.
    fn new(layers: Box<[Arc<dyn Layer<K, V> + 'iter>]>, merge_fn: merge::MergeFn<K, V>) -> Self {
        let iterators: Box<[BoxedLayerIterator<'_, K, V>]> = layers
            .iter()
            .map(|layer| {
                let raw = NonNull::from(layer).as_ptr();
                // SAFETY: `raw` points at an `Arc` stored in the heap allocation of `layers`.
                // That allocation does not move when the `Box` is moved into the returned
                // struct, and the struct owns it for its whole lifetime, so the reference
                // stays valid while the struct is alive.
                // NOTE(review): this also relies on the iterators being dropped no later than
                // `_layers` — confirm the field declaration order of `LSMTreeIter`.
                let detached = unsafe { &*raw };
                detached.get_iterator()
            })
            .collect();
        let merger = merge::Merger::new(iterators, merge_fn);
        LSMTreeIter { _layers: layers, merger }
    }

    /// Returns the item the merger currently points at, if any.
    pub fn get(&self) -> Option<ItemRef<'_, K, V>> {
        let current = self.merger.get();
        current
    }

    /// Advances the underlying merger by one step.
    pub fn advance(&mut self) -> Result<(), Error> {
        let merger = &mut self.merger;
        merger.advance()
    }

    /// Advances the underlying merger to `key` (see `merge::Merger::advance_to`).
    pub fn advance_to(&mut self, key: &K) -> Result<(), Error> {
        let merger = &mut self.merger;
        merger.advance_to(key)
    }
}
impl<'tree, K: Key, V: Value> LSMTree<K, V>
where
for<'de> K: serde::Deserialize<'de>,
for<'de> V: serde::Deserialize<'de>,
{
pub fn new(merge_fn: merge::MergeFn<K, V>) -> Self {
LSMTree {
data: RwLock::new(Data {
mutable_layer: Arc::new(skip_list_layer::SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)),
layers: Vec::new(),
}),
merge_fn,
}
}
/// Wraps each object handle in a `SimplePersistentLayer` (constructed with the value 512),
/// keeping the handles' order.
fn layers_from_handles(
    handles: Box<[impl ObjectHandle + 'static]>,
) -> Vec<Arc<dyn Layer<K, V>>> {
    let mut layers = Vec::new();
    for handle in handles.into_vec() {
        layers.push(Arc::new(simple_persistent_layer::SimplePersistentLayer::new(handle, 512))
            as Arc<dyn Layer<K, V>>);
    }
    layers
}
pub fn open(
merge_fn: merge::MergeFn<K, V>,
handles: Box<[impl ObjectHandle + 'static]>,
) -> Self {
LSMTree {
data: RwLock::new(Data {
mutable_layer: Arc::new(skip_list_layer::SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)),
layers: Self::layers_from_handles(handles),
}),
merge_fn,
}
}
/// Replaces the tree's non-mutable layers with persistent layers built from `handles`.
/// The mutable layer is left untouched.
///
/// # Panics
/// Panics if the internal lock is poisoned.
pub fn set_layers(&self, handles: Box<[impl ObjectHandle + 'static]>) {
    let mut guard = self.data.write().unwrap();
    let slot = &mut guard.layers;
    *slot = Self::layers_from_handles(handles);
}
/// Appends the mutable layer, followed by every other layer of the tree, to `all_layers`.
/// Anything already in `all_layers` is kept ahead of them.
fn add_all_layers<'a>(&'a self, all_layers: &mut Vec<Arc<dyn Layer<K, V>>>) {
    let guard = self.data.read().unwrap();
    let mutable = guard.mutable_layer.clone().as_layer();
    all_layers.push(mutable);
    all_layers.extend(guard.layers.iter().cloned());
}
/// Calls `dump` on the current mutable layer while holding the read lock.
pub fn dump_mutable_layer(&self) {
    let guard = self.data.read().unwrap();
    let mutable = &guard.mutable_layer;
    mutable.dump();
}
// TODO(csuter): This should maybe run on a different thread. Depends when it's
// called.
/// Merges the current mutable layer and all other layers into one persistent layer written
/// to `object_handle`, and installs that layer as the tree's only non-mutable layer.
///
/// Steps:
/// 1. Under a single write lock, snapshot every current layer, install a fresh mutable layer
///    and demote the old mutable layer to the head of the layer list. Doing this under one
///    lock means no `set_layers` or other `commit` call can slip in between the snapshot and
///    the swap.
/// 2. Without holding the lock, merge the snapshot and write every resulting item to
///    `object_handle`.
/// 3. Under the write lock again, replace the layer list with a single persistent layer
///    backed by `object_handle`.
///
/// # Errors
/// Returns any error from advancing the merger or from writing to or closing the layer writer.
/// In that case step 3 is skipped and the tree keeps the layer list installed in step 1.
///
/// # Panics
/// Panics if the internal lock is poisoned.
pub fn commit<'a>(
    &'a self,
    mut object_handle: impl ObjectHandle + 'static,
) -> Result<(), Error> {
    let layers = {
        let mut data = self.data.write().unwrap();
        let mut layers = Vec::new();
        layers.push(data.mutable_layer.clone().as_layer());
        for layer in &data.layers {
            layers.push(layer.clone());
        }
        data.mutable_layer =
            Arc::new(skip_list_layer::SkipListLayer::new(SKIP_LIST_LAYER_ITEMS));
        data.layers = layers.clone();
        layers
    };
    // TODO: optimize for the case where the mutable layer is empty.
    {
        // The writer mutably borrows `object_handle`; this scope ends the borrow so the handle
        // can be moved into the new layer below.
        let mut writer = SimplePersistentLayerWriter::new(&mut object_handle, 512);
        let iterators = layers.iter().map(|x| x.get_iterator()).collect();
        let mut merger = merge::Merger::new(iterators, self.merge_fn);
        merger.advance()?;
        while let Some(item_ref) = merger.get() {
            writer.write(item_ref)?;
            merger.advance()?;
        }
        writer.close()?;
    }
    {
        let mut data = self.data.write().unwrap();
        data.layers = vec![Arc::new(simple_persistent_layer::SimplePersistentLayer::new(
            object_handle,
            512,
        ))];
    }
    Ok(())
}
/// Returns a merging iterator over the mutable layer followed by every other layer of the
/// tree. The iterator holds its own `Arc` to each layer.
pub fn iter(&self) -> LSMTreeIter<'tree, K, V> {
    // Same as `iter_with_layers` with no extra layers in front.
    self.iter_with_layers(Vec::new())
}
/// Returns a merging iterator over the caller-supplied `layers` followed by the tree's own
/// layers (mutable layer first, then the rest).
pub fn iter_with_layers(
    &self,
    mut layers: Vec<Arc<dyn Layer<K, V>>>,
) -> LSMTreeIter<'tree, K, V> {
    self.add_all_layers(&mut layers);
    let combined = layers.into_boxed_slice();
    LSMTreeIter::new(combined, self.merge_fn)
}
pub fn insert(&self, item: Item<K, V>) {
let mut data = self.data.write().unwrap(); // TODO: probably should be read and clone.
Arc::get_mut(&mut data.mutable_layer).unwrap().insert(item); // TODO: is get_mut call always safe?
}
pub fn replace_or_insert(&self, item: Item<K, V>) {
let mut data = self.data.write().unwrap();
Arc::get_mut(&mut data.mutable_layer).unwrap().replace_or_insert(item); // TODO
}
pub fn replace_range(&self, item: Item<K, V>, lower_bound: &K) {
let mut data = self.data.write().unwrap();
Arc::get_mut(&mut data.mutable_layer).unwrap().replace_range(
item,
lower_bound,
self.merge_fn,
);
}
/// Returns a merging iterator over all layers, already positioned according to `bound`:
/// `Unbounded` advances the merger once, `Included(key)` advances it to `key`.
///
/// # Errors
/// Returns any error raised while positioning the merger.
///
/// # Panics
/// Panics on `Bound::Excluded`, which is not supported, and if the internal lock is poisoned.
pub fn range_from(
    &self,
    bound: std::ops::Bound<&K>,
) -> Result<LSMTreeIter<'tree, K, V>, Error> {
    let mut iter = self.iter();
    match bound {
        Bound::Excluded(_) => panic!("Excluded bounds not supported!"),
        Bound::Included(key) => iter.merger.advance_to(key)?,
        Bound::Unbounded => iter.merger.advance()?,
    }
    Ok(iter)
}
/// Looks up `search_key` across all layers.
///
/// Advances a merger to `search_key` and returns a clone of the item found there, but only
/// if its key is equal to `search_key`; otherwise returns `None`.
///
/// # Errors
/// Returns any error raised while advancing the merger.
pub fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error>
where
    K: Clone,
    V: Clone,
{
    let mut layers = Vec::new();
    self.add_all_layers(&mut layers);
    let iterators = layers.iter().map(|layer| layer.get_iterator()).collect();
    let mut merger = merge::Merger::new(iterators, self.merge_fn);
    merger.advance_to(search_key)?;
    let hit = merger
        .get()
        .filter(|found| found.key == search_key)
        .map(|found| Item { key: found.key.clone(), value: found.value.clone() });
    Ok(hit)
}
}