blob: aecf65e7f1379636f9d00ad4360358abe9fe54c2 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
lsm_tree::merge,
object_handle::ReadObjectHandle,
serialized_types::{Version, Versioned, VersionedLatest},
},
anyhow::Error,
async_trait::async_trait,
async_utils::event::Event,
serde::{Deserialize, Serialize},
std::{fmt::Debug, sync::Arc},
};
/// Keys and values need to implement the following traits. For merging, they need to implement
/// MergeableKey. TODO: Use trait_alias when available.
pub trait Key:
Clone
+ OrdUpperBound
+ Send
+ Sync
+ Versioned
+ VersionedLatest
+ Debug
+ std::marker::Unpin
+ 'static
{
}
pub trait RangeKey: Key {
/// Returns if two keys overlap.
fn overlaps(&self, other: &Self) -> bool;
}
impl<K> Key for K where
K: Clone
+ OrdUpperBound
+ Send
+ Sync
+ Versioned
+ VersionedLatest
+ Debug
+ std::marker::Unpin
+ 'static
{
}
pub trait MergeableKey: Key + Eq + NextKey + OrdLowerBound {}
impl<K> MergeableKey for K where K: Key + Eq + NextKey + OrdLowerBound {}
pub trait Value:
Clone + Send + Sync + Versioned + VersionedLatest + Debug + std::marker::Unpin + 'static
{
}
impl<V> Value for V where
V: Clone + Send + Sync + Versioned + VersionedLatest + Debug + std::marker::Unpin + 'static
{
}
/// ItemRef is a struct that contains references to key and value, which is useful since in many
/// cases since keys and values are stored separately so &Item is not possible.
#[derive(Debug, Serialize)]
pub struct ItemRef<'a, K, V> {
pub key: &'a K,
pub value: &'a V,
pub sequence: u64,
}
impl<K: PartialEq, V: PartialEq> PartialEq for ItemRef<'_, K, V> {
fn eq(&self, other: &Self) -> bool {
self.key == other.key && self.value == other.value
}
}
impl<K: PartialEq, V: PartialEq> Eq for ItemRef<'_, K, V> {}
impl<K: Clone, V: Clone> ItemRef<'_, K, V> {
pub fn cloned(&self) -> Item<K, V> {
Item { key: self.key.clone(), value: self.value.clone(), sequence: self.sequence }
}
}
impl<'a, K, V> Clone for ItemRef<'a, K, V> {
fn clone(&self) -> Self {
ItemRef { key: self.key, value: self.value, sequence: self.sequence }
}
}
impl<'a, K, V> Copy for ItemRef<'a, K, V> {}
/// Item is a struct that combines a key and a value.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Item<K, V> {
pub key: K,
pub value: V,
/// |sequence| is a monotonically increasing sequence number for the Item, which is set when the
/// Item is inserted into the tree. In practice, this is the journal file offset at the time of
/// committing the transaction containing the Item. Note that two or more Items may share the
/// same |sequence|.
pub sequence: u64,
}
impl<K: PartialEq, V: PartialEq> PartialEq for Item<K, V> {
fn eq(&self, other: &Self) -> bool {
self.key == other.key && self.value == other.value
}
}
impl<K: PartialEq, V: PartialEq> Eq for Item<K, V> {}
impl<K, V> Item<K, V> {
pub fn new(key: K, value: V) -> Item<K, V> {
Item { key, value, sequence: 0u64 }
}
pub fn new_with_sequence(key: K, value: V, sequence: u64) -> Item<K, V> {
Item { key, value, sequence }
}
pub fn as_item_ref(&self) -> ItemRef<'_, K, V> {
self.into()
}
}
impl<'a, K, V> From<&'a Item<K, V>> for ItemRef<'a, K, V> {
fn from(item: &'a Item<K, V>) -> ItemRef<'a, K, V> {
ItemRef { key: &item.key, value: &item.value, sequence: item.sequence }
}
}
/// The find functions will return items with keys that are greater-than or equal to the search key,
/// so for keys that are like extents, the keys should sort (via OrdUpperBound) using the end
/// of their ranges, and you should set the search key accordingly.
///
/// For example, let's say the tree holds extents 100..200, 200..250 and you want to perform a read
/// for range 150..250, you should search for 0..151 which will first return the extent 100..200
/// (and then the iterator can be advanced to 200..250 after). When merging, keys can overlap, so
/// consider the case where we want to merge an extent with range 100..300 with an existing extent
/// of 200..250. In that case, we want to treat the extent with range 100..300 as lower than the key
/// 200..250 because we'll likely want to split the extents (e.g. perhaps we want 100..200,
/// 200..250, 250..300), so for merging, we need to use a different comparison function and we deal
/// with that using the OrdLowerBound trait.
///
/// If your keys don't have overlapping ranges that need to be merged, then these can be the same as
/// std::cmp::Ord (use the DefaultOrdUpperBound and DefaultOrdLowerBound traits).
pub trait OrdUpperBound {
fn cmp_upper_bound(&self, other: &Self) -> std::cmp::Ordering;
}
pub trait DefaultOrdUpperBound: OrdUpperBound + Ord {}
impl<T: DefaultOrdUpperBound> OrdUpperBound for T {
fn cmp_upper_bound(&self, other: &Self) -> std::cmp::Ordering {
// Default to using cmp.
self.cmp(other)
}
}
pub trait OrdLowerBound {
fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering;
}
pub trait DefaultOrdLowerBound: OrdLowerBound + Ord {}
impl<T: DefaultOrdLowerBound> OrdLowerBound for T {
fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
// Default to using cmp.
self.cmp(other)
}
}
/// The NextKey trait allows for an optimisation which allows the merger to avoid querying a layer
/// if it knows it has found the next possible key. Consider the following example showing two
/// layers with range based keys.
///
/// +----------+------------+
/// 0 | 0..100 | 100..200 |
/// +----------+------------+
/// 1 | 100..200 |
/// +------------+
///
/// If you search and find the 0..100 key, then only layer 0 will be touched. If you then want to
/// advance to the 100..200 record, you can find it in layer 0 but unless you know that it
/// immediately follows the 0..100 key i.e. that there is no possible key, K, such that 0..100 < K <
/// 100..200, the merger has to consult all other layers to check. next_key should return a key, N,
/// such that if the merger encounters a key that is <= N (using OrdLowerBound), it can stop
/// touching more layers. The key N should also be the the key to search for in other layers if the
/// merger needs to do so. In the example above, it should be a key that is > 0..100 and 99..100,
/// but <= 100..200 (using OrdUpperBound). In practice, what this means is that for range based
/// keys, OrdUpperBound should use the end of the range, OrdLowerBound should use the start of the
/// range, and next_key should return end..end + 1. This is purely an optimisation; the default
/// will be correct but not performant.
pub trait NextKey: Clone {
fn next_key(&self) -> Option<Self> {
None
}
}
/// Layer is a trait that all layers need to implement (mutable and immutable).
#[async_trait]
pub trait Layer<K, V>: Send + Sync {
fn handle(&self) -> Option<&dyn ReadObjectHandle> {
None
}
/// Searches for a key. Bound::Excluded is not supported. Bound::Unbounded positions the
/// iterator on the first item in the layer.
async fn seek(&self, bound: std::ops::Bound<&K>)
-> Result<BoxedLayerIterator<'_, K, V>, Error>;
/// Locks the layer preventing it from being closed. This will never block i.e. there can be
/// many locks concurrently. The lock is purely advisory: seek will still work even if lock has
/// not been called; it merely causes close to wait until all locks are released. Returns None
/// if close has been called for the layer.
fn lock(&self) -> Option<Event>;
/// Waits for existing locks readers to finish and then returns. Subsequent calls to lock will
/// return None.
async fn close(&self);
/// Returns the version number used by structs in this layer
fn get_version(&self) -> Version;
}
/// MutableLayer is a trait that only mutable layers need to implement.
#[async_trait]
pub trait MutableLayer<K, V>: Layer<K, V> {
fn as_layer(self: Arc<Self>) -> Arc<dyn Layer<K, V>>;
/// Inserts the given item into the layer. The item *must* not already exist.
async fn insert(&self, item: Item<K, V>);
/// Merges the given item into the layer. `lower_bound` is the key to search for that should
/// provide the first potential item to be merged with.
async fn merge_into(&self, item: Item<K, V>, lower_bound: &K, merge_fn: merge::MergeFn<K, V>);
/// Inserts or replaces an item.
async fn replace_or_insert(&self, item: Item<K, V>);
/// Returns the number of items in the layer.
fn len(&self) -> usize;
}
/// Something that implements LayerIterator is returned by the seek function.
#[async_trait]
pub trait LayerIterator<K, V>: Send + Sync {
/// Advances the iterator.
async fn advance(&mut self) -> Result<(), Error>;
/// Returns the current item. This will be None if called when the iterator is first crated i.e.
/// before either seek or advance has been called, and None if the iterator has reached the end
/// of the layer.
fn get(&self) -> Option<ItemRef<'_, K, V>>;
}
pub type BoxedLayerIterator<'iter, K, V> = Box<dyn LayerIterator<K, V> + 'iter>;
#[async_trait]
pub trait LayerIteratorFilter<'a, K, V> {
/// Mirrors std::iter::Iterator::filter.
async fn filter<P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync>(
self,
predicate: P,
) -> Result<Filter<'a, K, V, P>, Error>;
}
#[async_trait]
impl<'a, K, V> LayerIteratorFilter<'a, K, V> for BoxedLayerIterator<'a, K, V> {
async fn filter<P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync>(
self,
predicate: P,
) -> Result<Filter<'a, K, V, P>, Error> {
Filter::new(self, predicate).await
}
}
#[async_trait]
impl<'iter, K, V> LayerIterator<K, V> for BoxedLayerIterator<'iter, K, V> {
async fn advance(&mut self) -> Result<(), Error> {
(**self).advance().await
}
fn get(&self) -> Option<ItemRef<'_, K, V>> {
(**self).get()
}
}
/// Mutable layers need an iterator that implements this in order to make merge_into work.
#[async_trait]
pub(super) trait LayerIteratorMut<K, V>: LayerIterator<K, V> {
/// Casts to super-traits.
fn as_iterator_mut(&mut self) -> &mut dyn LayerIterator<K, V>;
fn as_iterator(&self) -> &dyn LayerIterator<K, V>;
/// Erases the item that the iterator is currently pointing at. Afterwards, the iterator will
/// be pointing at the item that follows.
fn erase(&mut self);
/// Inserts the given item immediately prior to the item the iterator is currently pointing at.
fn insert(&mut self, item: Item<K, V>);
/// Commits changes and waits for any existing readers to finish.
async fn commit_and_wait(&mut self);
}
/// Trait for writing new layers.
#[async_trait]
pub trait LayerWriter<K, V>
where
K: Debug + Send + Versioned + Sync,
V: Debug + Send + Versioned + Sync,
{
/// Writes the given item to this layer.
async fn write(&mut self, item: ItemRef<'_, K, V>) -> Result<(), Error>;
/// Flushes any buffered items to the backing storage.
async fn flush(&mut self) -> Result<(), Error>;
}
/// A helper trait that converts arrays of layers into arrays of references to layers.
pub trait IntoLayerRefs<'a, K, V, T: AsRef<U> + 'a, U: ?Sized>
where
Self: IntoIterator<Item = &'a T>,
{
fn into_layer_refs(self) -> Box<[&'a dyn Layer<K, V>]>;
}
// Generic implementation where we need the cast to &dyn Layer.
impl<'a, K, V, T: AsRef<U>, U: Layer<K, V> + 'a> IntoLayerRefs<'a, K, V, T, U> for &'a [T] {
fn into_layer_refs(self) -> Box<[&'a dyn Layer<K, V>]> {
let refs: Vec<_> = self.iter().map(|x| x.as_ref() as &dyn Layer<K, V>).collect();
refs.into_boxed_slice()
}
}
// Generic implementation where we already have &dyn Layer.
impl<'a, K, V, T: AsRef<dyn Layer<K, V>>> IntoLayerRefs<'a, K, V, T, dyn Layer<K, V>>
for &'a [T]
{
fn into_layer_refs(self) -> Box<[&'a dyn Layer<K, V>]> {
let refs: Vec<_> = self.iter().map(|x| x.as_ref()).collect();
refs.into_boxed_slice()
}
}
pub struct Filter<'a, K, V, P> {
iter: BoxedLayerIterator<'a, K, V>,
predicate: P,
}
impl<'a, K, V, P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync> Filter<'a, K, V, P> {
async fn new(
iter: BoxedLayerIterator<'a, K, V>,
predicate: P,
) -> Result<Filter<'a, K, V, P>, Error> {
let mut filter = Filter { iter, predicate };
filter.skip_filtered().await?;
Ok(filter)
}
async fn skip_filtered(&mut self) -> Result<(), Error> {
loop {
match self.iter.get() {
Some(item) if !(self.predicate)(item) => {}
_ => return Ok(()),
}
self.iter.advance().await?;
}
}
}
#[async_trait]
impl<K: Send + Sync, V: Send + Sync, P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync>
LayerIterator<K, V> for Filter<'_, K, V, P>
{
async fn advance(&mut self) -> Result<(), Error> {
self.iter.advance().await?;
self.skip_filtered().await
}
fn get(&self) -> Option<ItemRef<'_, K, V>> {
self.iter.get()
}
}