blob: d322612219568ab6da3973c70603a0c177f1c3aa [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
pub mod merge;
pub mod simple_persistent_layer;
pub mod skip_list_layer;
pub mod types;
use {
crate::{
log::*,
object_handle::{ReadObjectHandle, WriteBytes, WriteObjectHandle, Writer},
serialized_types::{Version, LATEST_VERSION},
trace_duration,
},
anyhow::Error,
async_utils::event::Event,
simple_persistent_layer::SimplePersistentLayerWriter,
std::{
fmt,
ops::Bound,
sync::{Arc, RwLock},
},
types::{
IntoLayerRefs, Item, ItemRef, Key, Layer, LayerIterator, LayerWriter, MergeableKey,
MutableLayer, NextKey, OrdLowerBound, Value,
},
};
// Argument passed to `SkipListLayer::new` whenever `LSMTree::new`, `LSMTree::open` or
// `LSMTree::seal` creates a fresh in-memory mutable layer.
// NOTE(review): presumably the item count the skip list is sized for -- confirm against
// `skip_list_layer`.
const SKIP_LIST_LAYER_ITEMS: usize = 512;
// For serialization.
pub use simple_persistent_layer::LayerInfo;
/// Opens every handle as a `SimplePersistentLayer` and returns the resulting layers in the same
/// order as `handles`.
///
/// # Errors
///
/// Returns the first error produced while opening a handle; handles after the failing one are
/// not opened.
pub async fn layers_from_handles<K: Key, V: Value>(
    handles: Box<[impl ReadObjectHandle + 'static]>,
) -> Result<Vec<Arc<dyn Layer<K, V>>>, Error> {
    let mut opened = Vec::new();
    for handle in handles.into_vec() {
        let layer = simple_persistent_layer::SimplePersistentLayer::open(handle).await?
            as Arc<dyn Layer<K, V>>;
        opened.push(layer);
    }
    Ok(opened)
}
/// The kind of mutation being applied to the tree, as reported to a `MutationCallback`.
///
/// The enum is fieldless, so it is `Copy`: a callback that records the sequence of mutations
/// can store the value it was handed without any conversion.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Operation {
    /// Reported by `LSMTree::insert`.
    Insert,
    /// Reported by `LSMTree::replace_or_insert`.
    ReplaceOrInsert,
    /// Reported by `LSMTree::merge_into`.
    MergeInto,
}
/// Optional hook that `LSMTree::insert`, `LSMTree::replace_or_insert` and `LSMTree::merge_into`
/// call with the kind of operation and the item, before the item is handed to the mutable layer.
/// `None` means no callback is installed.
pub type MutationCallback<K, V> = Option<Box<dyn Fn(Operation, &Item<K, V>) + Send + Sync>>;
// The state of an `LSMTree` that is kept behind the tree's `RwLock`.
struct Inner<K, V> {
// The Event allows us to wait for any impending mutations to complete. See the seal method
// below.
mutable_layer: (Event, Arc<dyn MutableLayer<K, V>>),
// The immutable layers. `seal` inserts the layer it has just sealed at index 0 and
// `append_layers` adds base layers at the end.
layers: Vec<Arc<dyn Layer<K, V>>>,
// Callback invoked by the mutating methods; replaced via `set_mutation_callback`.
mutation_callback: MutationCallback<K, V>,
}
/// LSMTree manages a tree of layers to provide a key/value store. Each layer contains deltas on
/// the preceding layer. The top layer is an in-memory mutable layer. Layers can be compacted to
/// form a new combined layer.
pub struct LSMTree<K, V> {
// The mutable layer, the immutable layers and the mutation callback, all behind one lock.
data: RwLock<Inner<K, V>>,
// Merge function handed to the mutable layer in `merge_into` and copied into every `LayerSet`
// created from this tree.
merge_fn: merge::MergeFn<K, V>,
}
impl<'tree, K: MergeableKey, V: Value> LSMTree<K, V> {
/// Creates a new empty tree.
pub fn new(merge_fn: merge::MergeFn<K, V>) -> Self {
LSMTree {
data: RwLock::new(Inner {
mutable_layer: (
Event::new(),
skip_list_layer::SkipListLayer::new(SKIP_LIST_LAYER_ITEMS),
),
layers: Vec::new(),
mutation_callback: None,
}),
merge_fn,
}
}
/// Opens an existing tree from the provided handles to the layer objects.
pub async fn open(
merge_fn: merge::MergeFn<K, V>,
handles: Box<[impl ReadObjectHandle + 'static]>,
) -> Result<Self, Error> {
Ok(LSMTree {
data: RwLock::new(Inner {
mutable_layer: (
Event::new(),
skip_list_layer::SkipListLayer::new(SKIP_LIST_LAYER_ITEMS),
),
layers: layers_from_handles(handles).await?,
mutation_callback: None,
}),
merge_fn,
})
}
/// Swaps the tree's immutable layers for `layers`; the mutable layer is left alone.
///
/// # Panics
///
/// Panics if the tree's lock is poisoned.
pub fn set_layers(&self, layers: Vec<Arc<dyn Layer<K, V>>>) {
    let mut inner = self.data.write().unwrap();
    inner.layers = layers;
}
/// Opens `handles` as layers and adds them after the existing immutable layers, i.e. they are
/// treated as base layers. Intended for use after replay, once the base layers of a tree being
/// opened have been discovered.
///
/// # Errors
///
/// Fails, leaving the tree unchanged, if any handle cannot be opened as a layer.
pub async fn append_layers(
    &self,
    handles: Box<[impl ReadObjectHandle + 'static]>,
) -> Result<(), Error> {
    let base_layers = layers_from_handles(handles).await?;
    // The lock is only taken once all the layers have been opened.
    let mut inner = self.data.write().unwrap();
    inner.layers.extend(base_layers);
    Ok(())
}
/// Discards all immutable layers, leaving only the mutable layer in the tree.
///
/// # Panics
///
/// Panics if the tree's lock is poisoned.
pub fn reset_immutable_layers(&self) {
    let mut inner = self.data.write().unwrap();
    inner.layers = Vec::new();
}
/// Seals the current mutable layer and creates a new one.
///
/// The old mutable layer becomes the first entry (index 0) of the immutable layers and a fresh
/// event/skip-list pair takes its place. The future then waits on the old layer's event before
/// completing.
///
/// # Panics
///
/// Panics if the tree's lock is poisoned, or if waiting on the event yields `Ok` instead of the
/// expected `Dropped` error.
pub async fn seal(&self) {
// The write guard lives only inside this block, so it is released before the `.await` below;
// the block evaluates to the future returned by `wait_or_dropped`.
{
let mut data = self.data.write().unwrap();
let (event, layer) = std::mem::replace(
&mut data.mutable_layer,
(Event::new(), skip_list_layer::SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)),
);
data.layers.insert(0, layer.as_layer());
// Before we return, we must wait for any mutations to the old mutable layer to complete
// and that's done by waiting for the event to be dropped and ensuring that the event is
// cloned whenever we plan to mutate the layer.
event.wait_or_dropped()
}
.await
.unwrap_err(); // wait_or_dropped returns Result<(), Dropped>
}
/// Drains `iterator`, writing each item it yields to `writer` as a simple persistent layer that
/// uses `block_size`, and flushes the layer writer at the end.
///
/// # Errors
///
/// Returns the first error from creating the layer writer, writing an item, advancing the
/// iterator or flushing.
pub async fn compact_with_iterator<W: WriteBytes + Send>(
    &self,
    mut iterator: impl LayerIterator<K, V>,
    writer: W,
    block_size: u64,
) -> Result<(), Error> {
    trace_duration!("LSMTree::compact_with_iterator");
    let mut layer_writer =
        SimplePersistentLayerWriter::<W, K, V>::new(writer, block_size).await?;
    loop {
        match iterator.get() {
            Some(item_ref) => {
                debug!(?item_ref, "compact: writing");
                layer_writer.write(item_ref).await?;
            }
            // The iterator is exhausted.
            None => break,
        }
        iterator.advance().await?;
    }
    layer_writer.flush().await
}
/// Merges all of the immutable layers and writes the merged items to `object_handle`, using the
/// handle's block size. The tree's own list of layers is not changed by this call.
///
/// # Errors
///
/// Returns any error from seeking the merger or from `compact_with_iterator`.
pub async fn compact(&self, object_handle: &impl WriteObjectHandle) -> Result<(), Error> {
    let immutable_layers = self.immutable_layer_set();
    let mut merger = immutable_layers.merger();
    let all_items = merger.seek(Bound::Unbounded).await?;
    let writer = Writer::new(object_handle);
    let block_size = object_handle.block_size();
    self.compact_with_iterator(all_items, writer, block_size).await
}
/// Creates a layer-set that holds no layers but carries this tree's merge function.
pub fn empty_layer_set(&self) -> LayerSet<K, V> {
    let merge_fn = self.merge_fn;
    LayerSet { layers: Vec::new(), merge_fn }
}
/// Adds all the layers (including the mutable layer) to `layer_set`.
pub fn add_all_layers_to_layer_set(&self, layer_set: &mut LayerSet<K, V>) {
let data = self.data.read().unwrap();
layer_set.layers.push(data.mutable_layer.1.clone().as_layer().into());
for layer in &data.layers {
layer_set.layers.push(layer.clone().into());
}
}
/// Returns a clone of the current set of layers (including the mutable layer), after which one
/// can get an iterator.
pub fn layer_set(&self) -> LayerSet<K, V> {
let mut layer_set = self.empty_layer_set();
self.add_all_layers_to_layer_set(&mut layer_set);
layer_set
}
/// Returns the current set of immutable layers after which one can get an iterator (for e.g.
/// compacting). Since these layers are immutable, getting an iterator should not block
/// anything else.
pub fn immutable_layer_set(&self) -> LayerSet<K, V> {
let mut layers = Vec::new();
{
let data = self.data.read().unwrap();
for layer in &data.layers {
layers.push(layer.clone().into());
}
}
LayerSet { layers, merge_fn: self.merge_fn }
}
/// Adds `item` to the mutable layer, first reporting `Operation::Insert` to the mutation
/// callback if one is set. Behaviour is undefined if the item already exists.
pub async fn insert(&self, item: Item<K, V>) {
    // The cloned event is held until the insertion has finished; `seal` waits for it to go away.
    let (_pending_mutation, target_layer) = {
        let inner = self.data.read().unwrap();
        if let Some(notify) = &inner.mutation_callback {
            notify(Operation::Insert, &item);
        }
        inner.mutable_layer.clone()
    };
    target_layer.insert(item).await;
}
/// Replaces or inserts `item` in the mutable layer, first reporting
/// `Operation::ReplaceOrInsert` to the mutation callback if one is set.
pub async fn replace_or_insert(&self, item: Item<K, V>) {
    // The cloned event is held until the mutation has finished; `seal` waits for it to go away.
    let (_pending_mutation, target_layer) = {
        let inner = self.data.read().unwrap();
        if let Some(notify) = &inner.mutation_callback {
            notify(Operation::ReplaceOrInsert, &item);
        }
        inner.mutable_layer.clone()
    };
    target_layer.replace_or_insert(item).await;
}
/// Merges `item` into the mutable layer using the tree's merge function, first reporting
/// `Operation::MergeInto` to the mutation callback if one is set. `lower_bound` is forwarded
/// unchanged to the mutable layer.
pub async fn merge_into(&self, item: Item<K, V>, lower_bound: &K) {
    // The cloned event is held until the merge has finished; `seal` waits for it to go away.
    let (_pending_mutation, target_layer) = {
        let inner = self.data.read().unwrap();
        if let Some(notify) = &inner.mutation_callback {
            notify(Operation::MergeInto, &item);
        }
        inner.mutable_layer.clone()
    };
    target_layer.merge_into(item, lower_bound, self.merge_fn).await
}
/// Looks up `search_key` across all layers (the mutable layer included).
///
/// Returns a clone of the item when the first item at or after `search_key` has a key equal to
/// it, and `None` otherwise.
///
/// # Errors
///
/// Returns any error produced while seeking the merger.
pub async fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error>
where
    K: Eq,
{
    let snapshot = self.layer_set();
    let mut merger = snapshot.merger();
    let iter = merger.seek(Bound::Included(search_key)).await?;
    let found = iter
        .get()
        .filter(|candidate| candidate.key == search_key)
        .map(|ItemRef { key, value, sequence }| Item {
            key: key.clone(),
            value: value.clone(),
            sequence,
        });
    Ok(found)
}
/// Returns a new reference to the tree's current mutable layer.
///
/// # Panics
///
/// Panics if the tree's lock is poisoned.
pub fn mutable_layer(&self) -> Arc<dyn MutableLayer<K, V>> {
    let inner = self.data.read().unwrap();
    Arc::clone(&inner.mutable_layer.1)
}
/// Installs (or, with `None`, removes) the mutation callback, which is triggered whenever a
/// mutation is applied to the tree. This can be useful for tests that want to record the
/// precise sequence of mutations applied to the tree.
pub fn set_mutation_callback(&self, mutation_callback: MutationCallback<K, V>) {
    let mut inner = self.data.write().unwrap();
    inner.mutation_callback = mutation_callback;
}
/// Returns the lowest version reported by any layer in the tree (the mutable layer included),
/// or `LATEST_VERSION` when no layer reports an earlier one.
pub fn get_earliest_version(&self) -> Version {
    self.layer_set()
        .layers
        .into_iter()
        .map(|layer| layer.get_version())
        .fold(LATEST_VERSION, |earliest, version| {
            if version < earliest {
                version
            } else {
                earliest
            }
        })
}
}
/// This is an RAII wrapper for a layer which holds a lock on the layer (via the Layer::lock
/// method).
///
/// Field 0 is the `Event` returned by `Layer::lock`; field 1 is the layer itself.
pub struct LockedLayer<K, V>(Event, Arc<dyn Layer<K, V>>);
impl<K, V> LockedLayer<K, V> {
pub async fn close_layer(self) {
let layer = self.1;
std::mem::drop(self.0);
layer.close().await;
}
}
impl<K, V> From<Arc<dyn Layer<K, V>>> for LockedLayer<K, V> {
fn from(layer: Arc<dyn Layer<K, V>>) -> Self {
let event = layer.lock().unwrap();
Self(event, layer)
}
}
impl<K, V> std::ops::Deref for LockedLayer<K, V> {
type Target = Arc<dyn Layer<K, V>>;
fn deref(&self) -> &Self::Target {
&self.1
}
}
impl<K, V> AsRef<dyn Layer<K, V>> for LockedLayer<K, V> {
    /// Borrows the wrapped layer as a trait object.
    fn as_ref(&self) -> &(dyn Layer<K, V> + 'static) {
        &*self.1
    }
}
/// A LayerSet provides a snapshot of the layers at a particular point in time, and allows you to
/// get an iterator. Iterators borrow the layers so something needs to hold reference count.
pub struct LayerSet<K, V> {
// The locked layers in the snapshot. `LSMTree::add_all_layers_to_layer_set` puts the mutable
// layer first, followed by the immutable layers.
pub layers: Vec<LockedLayer<K, V>>,
// Merge function passed to the `Merger` created by `merger`.
merge_fn: merge::MergeFn<K, V>,
}
impl<K: Key + NextKey + OrdLowerBound, V: Value> LayerSet<K, V> {
    /// Creates a merger over the layers in this set, using the set's merge function.
    pub fn merger(&self) -> merge::Merger<'_, K, V> {
        let layer_refs = self.layers.as_slice().into_layer_refs();
        merge::Merger::new(&layer_refs, self.merge_fn)
    }
}
impl<K, V> fmt::Debug for LayerSet<K, V> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_list()
.entries(self.layers.iter().map(|l| {
if let Some(handle) = l.handle() {
format!("{}", handle.object_id())
} else {
format!("{:?}", Arc::as_ptr(l))
}
}))
.finish()
}
}
#[cfg(test)]
mod tests {
use {
super::LSMTree,
crate::{
lsm_tree::{
layers_from_handles,
merge::{MergeLayerIterator, MergeResult},
types::{
Item, ItemRef, LayerIterator, LayerIteratorFilter, NextKey, OrdLowerBound,
OrdUpperBound,
},
},
serialized_types::{
versioned_type, Version, Versioned, VersionedLatest, LATEST_VERSION,
},
testing::fake_object::{FakeObject, FakeObjectHandle},
},
fuchsia_async as fasync,
std::{ops::Bound, sync::Arc},
};
// Key type used by the tests in this module: a newtype around a `u64` range.
#[derive(Clone, Eq, PartialEq, Debug, serde::Serialize, serde::Deserialize, Versioned)]
struct TestKey(std::ops::Range<u64>);
versioned_type! { 1.. => TestKey }
// `NextKey` is implemented with an empty body, i.e. nothing is overridden.
impl NextKey for TestKey {}
impl OrdUpperBound for TestKey {
    // Orders keys by the `end` of the wrapped range.
    fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
        let (mine, theirs) = (self.0.end, other.0.end);
        mine.cmp(&theirs)
    }
}
impl OrdLowerBound for TestKey {
    // Orders keys by the `start` of the wrapped range.
    fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
        let (mine, theirs) = (self.0.start, other.0.start);
        mine.cmp(&theirs)
    }
}
// Merge function used by every test in this module: it ignores both iterators and always
// returns `MergeResult::EmitLeft`.
fn emit_left_merge_fn(
_left: &MergeLayerIterator<'_, TestKey, u64>,
_right: &MergeLayerIterator<'_, TestKey, u64>,
) -> MergeResult<TestKey, u64> {
MergeResult::EmitLeft
}
// Inserts two items and checks that iterating over the whole tree yields them in key order and
// then ends.
#[fasync::run_singlethreaded(test)]
async fn test_iteration() {
    let tree = LSMTree::new(emit_left_merge_fn);
    let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
    for item in &items {
        tree.insert(item.clone()).await;
    }
    let layers = tree.layer_set();
    let mut merger = layers.merger();
    let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
    for expected in &items {
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&expected.key, &expected.value));
        iter.advance().await.expect("advance failed");
    }
    assert!(iter.get().is_none());
}
// Seals two layers of two items each, compacts them into one object, then reopens a tree from
// that object and checks that all four items come back in order.
#[fasync::run_singlethreaded(test)]
async fn test_compact() {
    let tree = LSMTree::new(emit_left_merge_fn);
    let items = [
        Item::new(TestKey(1..1), 1),
        Item::new(TestKey(2..2), 2),
        Item::new(TestKey(3..3), 3),
        Item::new(TestKey(4..4), 4),
    ];
    // Each pair of items ends up in its own sealed layer.
    for batch in items.chunks(2) {
        for item in batch {
            tree.insert(item.clone()).await;
        }
        tree.seal().await;
    }
    let object = Arc::new(FakeObject::new());
    let handle = FakeObjectHandle::new(object.clone());
    tree.compact(&handle).await.expect("compact failed");
    let compacted =
        layers_from_handles(Box::new([handle])).await.expect("layers_from_handles failed");
    tree.set_layers(compacted);
    let handle = FakeObjectHandle::new(object.clone());
    let reopened =
        LSMTree::open(emit_left_merge_fn, [handle].into()).await.expect("open failed");
    let layers = reopened.layer_set();
    let mut merger = layers.merger();
    let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
    for expected in 1..=4 {
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(expected..expected), &expected));
        iter.advance().await.expect("advance failed");
    }
    assert!(iter.get().is_none());
}
// Checks that `find` returns an item that lives in a sealed layer and returns `None` for a key
// that was never inserted.
#[fasync::run_singlethreaded(test)]
async fn test_find() {
    let items = [
        Item::new(TestKey(1..1), 1),
        Item::new(TestKey(2..2), 2),
        Item::new(TestKey(3..3), 3),
        Item::new(TestKey(4..4), 4),
    ];
    let tree = LSMTree::new(emit_left_merge_fn);
    // The first two items go into a layer that is then sealed; the rest stay in the mutable
    // layer.
    for item in &items[..2] {
        tree.insert(item.clone()).await;
    }
    tree.seal().await;
    for item in &items[2..] {
        tree.insert(item.clone()).await;
    }
    let lookup = tree.find(&items[1].key).await.expect("find failed");
    assert_eq!(lookup.expect("not found"), items[1]);
    let absent = tree.find(&TestKey(100..100)).await.expect("find failed");
    assert!(absent.is_none());
}
// Seals an empty mutable layer, compacts it, and checks that an item inserted after the seal
// can still be found while an absent key cannot.
#[fasync::run_singlethreaded(test)]
async fn test_empty_seal() {
    let tree = LSMTree::new(emit_left_merge_fn);
    // Nothing has been inserted yet, so the sealed layer is empty.
    tree.seal().await;
    let only_item = Item::new(TestKey(1..1), 1);
    tree.insert(only_item.clone()).await;
    let object = Arc::new(FakeObject::new());
    let handle = FakeObjectHandle::new(object.clone());
    tree.compact(&handle).await.expect("compact failed");
    let compacted =
        layers_from_handles(Box::new([handle])).await.expect("layers_from_handles failed");
    tree.set_layers(compacted);
    let lookup = tree.find(&only_item.key).await.expect("find failed");
    assert_eq!(lookup.expect("not found"), only_item);
    let absent = tree.find(&TestKey(2..2)).await.expect("find failed");
    assert!(absent.is_none());
}
// Checks that a filtered iterator yields only the items accepted by the predicate.
#[fasync::run_singlethreaded(test)]
async fn test_filter() {
    let items = [
        Item::new(TestKey(1..1), 1),
        Item::new(TestKey(2..2), 2),
        Item::new(TestKey(3..3), 3),
        Item::new(TestKey(4..4), 4),
    ];
    let tree = LSMTree::new(emit_left_merge_fn);
    for item in &items {
        tree.insert(item.clone()).await;
    }
    let layers = tree.layer_set();
    let mut merger = layers.merger();
    // Filter out odd keys (which also guarantees we skip the first key which is an edge case).
    let unfiltered = Box::new(merger.seek(Bound::Unbounded).await.expect("seek failed"))
        as Box<dyn LayerIterator<_, _>>;
    let mut iter = unfiltered
        .filter(|item: ItemRef<'_, TestKey, u64>| item.key.0.start % 2 == 0)
        .await
        .expect("filter failed");
    // Only the second and fourth items (keys 2 and 4) should remain.
    for expected in items.iter().skip(1).step_by(2) {
        assert_eq!(iter.get(), Some(expected.as_item_ref()));
        iter.advance().await.expect("advance failed");
    }
    assert!(iter.get().is_none());
}
}