blob: 79f066a463a2fc74a004a731d13c645ae12fd13d [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::lsm_tree::types::{
Item, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut, OrdLowerBound, Value,
},
anyhow::Error,
async_trait::async_trait,
futures::try_join,
std::{cmp::Ordering, collections::BinaryHeap, convert::From, fmt::Debug, ops::Bound},
};
/// Describes what the merger should do with one of the two items it presented to a [`MergeFn`].
/// Used for the `left` and `right` fields of [`MergeResult::Other`].
#[derive(Debug, Eq, PartialEq)]
pub enum ItemOp<K, V> {
    /// Keeps the item to be presented to the merger subsequently with a new merge pair.
    Keep,
    /// Discards the item and moves on to the next item in the respective layer.
    Discard,
    /// Replaces the item with something new which will be presented to the merger subsequently with
    /// a new pair.
    Replace(Item<K, V>),
}
/// The decision a [`MergeFn`] returns for a (left, right) pair of items.
#[derive(Debug, Eq, PartialEq)]
pub enum MergeResult<K, V> {
    /// Emits the left item unchanged. Keeps the right item. This is the common case. Once an item
    /// has been emitted, it will never be seen again by the merge function.
    EmitLeft,
    /// All other merge results are covered by the following. Take care when replacing items
    /// that you replace the correct item. The merger will never merge two items together from
    /// the same layer. Consider the following scenario:
    ///
    /// ```text
    ///     +-----------+              +-----------+
    /// 0:  |     A     |              |     C     |
    ///     +-----------+--------------+-----------+
    /// 1:              |      B       |
    ///                 +--------------+
    /// ```
    ///
    /// Let's say that all three items can be merged together. The merge function will first be
    /// presented with items A and B, at which point it has the option of replacing the left item
    /// (i.e. A, in layer 0) or the right item (i.e. B in layer 1). However, if you replace the left
    /// item, the merge function will not then be given the opportunity to merge it with C, so the
    /// correct thing to do in this case is to replace the right item B in layer 1, and discard the
    /// left item. A rule you can use is that you should avoid replacing an item with another item
    /// whose upper bound exceeds that of the item you are replacing.
    ///
    /// There are some combinations that might lead to infinite loops and should obviously be
    /// avoided, e.g. `emit: None, left: Keep, right: Keep`, which puts both items back unchanged so
    /// the same pair is presented again.
    Other { emit: Option<Item<K, V>>, left: ItemOp<K, V>, right: ItemOp<K, V> },
}
/// Users must provide a merge function which will take pairs of items, left and right, and return a
/// merge result. The left item's key will either be less than the right item's key (compared with
/// `OrdLowerBound::cmp_lower_bound`), or if they are the same, then the left item will be in a
/// lower layer index (lower layer indexes indicate more recent entries). The last remaining item is
/// always emitted.
///
/// Both arguments are [`MergeLayerIterator`]s positioned at the items being merged; inspect them
/// with `key()`, `value()`, `item()` and `layer_index`.
pub type MergeFn<K, V> =
    fn(&MergeLayerIterator<'_, K, V>, &MergeLayerIterator<'_, K, V>) -> MergeResult<K, V>;
/// The item a [`MergeLayerIterator`] is currently positioned at.
pub enum MergeItem<K, V> {
    /// There is no current item (e.g. the underlying iterator's `get()` returned `None`).
    None,
    /// An owned item held directly by the merge iterator, such as a replacement produced by the
    /// merge function or the item passed to `merge_into`.
    Item(Item<K, V>),
    /// The current item is whatever the underlying layer iterator's `get()` returns.
    Iter,
}
// The underlying layer iterator (if any) backing a `MergeLayerIterator`.
enum RawIterator<'a, K, V> {
    // No iterator: the layer has not been sought yet, or the `MergeLayerIterator` only holds an
    // owned item.
    None,
    // A read-only iterator, as returned by `Layer::seek`.
    Const(Box<dyn LayerIterator<K, V> + 'a>),
    // A mutable iterator; `merge_into` uses this to erase, insert and commit items.
    Mut(Box<dyn LayerIteratorMut<K, V> + 'a>),
}
/// An iterator that keeps track of where we are for each of the layers. We push these onto a
/// min-heap (a `BinaryHeap` with a reversed `Ord`).
pub struct MergeLayerIterator<'a, K, V> {
    // The layer to seek when the merger first consults it. `None` for iterators that are not
    // backed by a `Layer` (see `new_with_item` and `merge_into`).
    layer: Option<&'a dyn Layer<K, V>>,
    // The underlying iterator.
    iter: RawIterator<'a, K, V>,
    /// The index of the layer this is for. Lower indexes hold more recent entries and sort first
    /// when keys tie.
    pub layer_index: u16,
    // The item we are currently pointing at.
    item: MergeItem<K, V>,
}
impl<'a, K, V> MergeLayerIterator<'a, K, V> {
fn new(layer_index: u16, layer: &'a dyn Layer<K, V>) -> Self {
MergeLayerIterator {
layer: Some(layer),
iter: RawIterator::None,
layer_index,
item: MergeItem::None,
}
}
pub fn new_with_item(layer_index: u16, item: MergeItem<K, V>) -> Self {
MergeLayerIterator { layer: None, iter: RawIterator::None, layer_index, item }
}
pub fn item(&self) -> ItemRef<'_, K, V> {
match &self.item {
MergeItem::None => panic!("No item!"),
MergeItem::Item(ref item) => ItemRef::from(item),
MergeItem::Iter => self.iter().get().unwrap(),
}
}
pub fn key(&self) -> &K {
return self.item().key;
}
pub fn value(&self) -> &V {
return self.item().value;
}
fn iter(&self) -> &dyn LayerIterator<K, V> {
match &self.iter {
RawIterator::None => panic!("No iterator!"),
RawIterator::Const(iter) => iter.as_ref(),
RawIterator::Mut(iter) => iter.as_iterator(),
}
}
fn iter_mut(&mut self) -> &mut dyn LayerIterator<K, V> {
match &mut self.iter {
RawIterator::None => panic!("No iterator!"),
RawIterator::Const(iter) => iter.as_mut(),
RawIterator::Mut(iter) => iter.as_iterator_mut(),
}
}
fn set_item_from_iter(&mut self) {
self.item = {
if self.iter().get().is_none() {
MergeItem::None
} else {
match self.iter {
RawIterator::None => unreachable!(),
RawIterator::Const(_) => MergeItem::Iter,
RawIterator::Mut(_) => MergeItem::Iter,
}
}
}
}
fn take_item(&mut self) -> Option<Item<K, V>> {
if let MergeItem::Item(_) = self.item {
let mut item = MergeItem::None;
std::mem::swap(&mut self.item, &mut item);
if let MergeItem::Item(item) = item {
Some(item)
} else {
unreachable!();
}
} else {
None
}
}
async fn advance(&mut self) -> Result<(), Error> {
self.iter_mut().advance().await?;
self.set_item_from_iter();
Ok(())
}
fn replace(&mut self, item: Item<K, V>) {
self.item = MergeItem::Item(item);
}
fn is_some(&self) -> bool {
match self.item {
MergeItem::None => false,
_ => true,
}
}
// This function exists so that we can advance multiple iterators concurrently using, say,
// try_join!.
async fn maybe_discard(&mut self, op: &ItemOp<K, V>) -> Result<(), Error> {
if let ItemOp::Discard = op {
self.advance().await?;
}
Ok(())
}
fn erase(&mut self) {
if let RawIterator::Mut(iter) = &mut self.iter {
iter.erase();
} else {
panic!("No iterator!");
}
}
fn insert(&mut self, item: Item<K, V>) {
if let RawIterator::Mut(iter) = &mut self.iter {
iter.insert(item);
} else {
panic!("No iterator!");
}
}
async fn commit(&mut self) {
if let RawIterator::Mut(iter) = &mut self.iter {
iter.commit().await;
} else {
panic!("No iterator!");
}
}
}
// -- Ord and friends --
// Iterators are ordered by the lower bound of their current key, then by layer index (a lower
// index means a more recent layer). The comparison is reversed so that `BinaryHeap`, which is a
// max-heap, pops the smallest iterator first.
impl<K: OrdLowerBound, V> Ord for MergeLayerIterator<'_, K, V> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reverse ordering because we want min-heap not max-heap.
        other
            .key()
            .cmp_lower_bound(self.key())
            .then_with(|| other.layer_index.cmp(&self.layer_index))
    }
}

impl<K: OrdLowerBound, V> PartialOrd for MergeLayerIterator<'_, K, V> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<K: OrdLowerBound, V> PartialEq for MergeLayerIterator<'_, K, V> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl<K: OrdLowerBound, V> Eq for MergeLayerIterator<'_, K, V> {}
// As we merge items, the current item can be an item emitted by the merge function, an item
// referenced by an iterator, or nothing.
enum CurrentItem<'a, 'b, K, V> {
    // No current item: the merge is exhausted or has not produced an item yet.
    None,
    // An owned item emitted by the merge function via `MergeResult::Other { emit: Some(..), .. }`.
    Item(Item<K, V>),
    // The current item is the one the referenced layer iterator is positioned at.
    Iterator(&'a mut MergeLayerIterator<'b, K, V>),
}
impl<'a, 'b, K, V> CurrentItem<'a, 'b, K, V> {
    // Moves the iterator out of `self`, leaving `CurrentItem::None` behind. When `self` does not
    // hold an iterator it is restored exactly as it was and `None` is returned.
    fn take_iterator(&mut self) -> Option<&'a mut MergeLayerIterator<'b, K, V>> {
        match std::mem::replace(self, CurrentItem::None) {
            CurrentItem::Iterator(merge_iter) => Some(merge_iter),
            untouched => {
                *self = untouched;
                None
            }
        }
    }
}
impl<'a, K, V> From<&'a CurrentItem<'_, '_, K, V>> for Option<ItemRef<'a, K, V>> {
fn from(iter: &'a CurrentItem<'_, '_, K, V>) -> Option<ItemRef<'a, K, V>> {
match iter {
CurrentItem::None => None,
CurrentItem::Iterator(iterator) => Some(iterator.item()),
CurrentItem::Item(item) => Some(item.into()),
}
}
}
/// Merger is the main entry point to merging. It holds one [`MergeLayerIterator`] per layer; call
/// `seek` to obtain a [`MergerIterator`] over the merged items.
pub struct Merger<'a, K, V> {
    // A buffer containing all the MergeLayerIterator objects, one per layer, in layer order.
    iterators: Vec<MergeLayerIterator<'a, K, V>>,
    // The function to be used for merging items.
    merge_fn: MergeFn<K, V>,
}
impl<'a, K: Debug + Ord + OrdLowerBound + Unpin + 'static, V: Debug + Unpin + 'static>
    Merger<'a, K, V>
{
    /// Creates a merger over `layers`. A layer's position in the slice becomes its layer index;
    /// lower indexes indicate more recent entries and come first when keys tie (see [`MergeFn`]).
    ///
    /// # Panics
    ///
    /// Panics if there are more layers than can be indexed by a `u16`.
    pub(super) fn new(layers: &[&'a dyn Layer<K, V>], merge_fn: MergeFn<K, V>) -> Merger<'a, K, V> {
        let iterators = layers
            .iter()
            .enumerate()
            .map(|(index, layer)| {
                // A truncating `as` cast would give distinct layers the same index and silently
                // break the tie-break ordering between them, so fail loudly instead.
                let layer_index = <u16 as std::convert::TryFrom<usize>>::try_from(index)
                    .expect("too many layers to merge");
                MergeLayerIterator::new(layer_index, *layer)
            })
            .collect();
        Merger { iterators, merge_fn }
    }

    /// Seek searches for |bound|. If |bound| is Bound::Unbounded, the iterator is positioned on
    /// the first item. If |bound| is Bound::Included(key), the iterator is positioned on an item
    /// such that item.key >= key. In the latter case, a full merge might not occur; only the
    /// layers that need to be consulted to satisfy the query will be, and afterwards,
    /// advance_with_hint must be used rather than advance if there's a need to move on to the next
    /// element.
    ///
    /// # Panics
    ///
    /// Panics if |bound| is Bound::Excluded, which is not supported.
    pub async fn seek(&mut self, bound: Bound<&K>) -> Result<MergerIterator<'_, 'a, K, V>, Error> {
        // Reversed so that popping from the back yields the lowest-index (most recent) layer.
        let pending_iterators = self.iterators.iter_mut().rev().collect();
        let mut merger_iter = MergerIterator {
            merge_fn: self.merge_fn,
            pending_iterators,
            heap: BinaryHeap::new(),
            item: CurrentItem::None,
            must_use_advance_with_hint: false,
        };
        merger_iter.seek(bound).await?;
        Ok(merger_iter)
    }
}
/// This is an iterator that will allow iteration over merged layers. The primary interface is via
/// the LayerIterator trait.
pub struct MergerIterator<'a, 'b, K, V> {
    // The function used to merge pairs of items (copied from the `Merger`).
    merge_fn: MergeFn<K, V>,
    // Iterators that we have not yet pushed onto the heap. They are stored in reverse layer order,
    // so `pop()` yields the lowest-index (most recent) layer that has not been consulted yet.
    pending_iterators: Vec<&'a mut MergeLayerIterator<'b, K, V>>,
    // A heap with the merge iterators. `MergeLayerIterator`'s reversed `Ord` makes this max-heap
    // pop the iterator with the smallest key first.
    heap: BinaryHeap<&'a mut MergeLayerIterator<'b, K, V>>,
    // The current item.
    item: CurrentItem<'a, 'b, K, V>,
    // If seek(Bound::Included(_)) is used, then advance_with_hint should be used rather than
    // advance, since that will be more performant. For now, we assert that this is the case so
    // that users don't unintentionally use advance.
    must_use_advance_with_hint: bool,
}
impl<
'a,
'b,
K: Debug + std::marker::Unpin + OrdLowerBound + 'static,
V: Debug + std::marker::Unpin + 'static,
> MergerIterator<'a, 'b, K, V>
{
async fn seek(&mut self, bound: Bound<&K>) -> Result<(), Error> {
match bound {
Bound::Unbounded => {
// Push all the iterators on.
for iter in self.pending_iterators.drain(..) {
iter.iter = RawIterator::Const(
iter.layer.as_ref().unwrap().seek(std::ops::Bound::Unbounded).await?,
);
iter.set_item_from_iter();
if iter.is_some() {
self.heap.push(iter);
}
}
self.advance_impl().await
}
Bound::Included(key) => {
self.must_use_advance_with_hint = true;
self.advance_with_hint(key).await
}
Bound::Excluded(_) => panic!("Excluded bounds not supported!"),
}
}
/// Advances the iterator to the next item, but will stop querying iterators when a key is
/// encountered that is <= |hint|, so it will not necessarily perform a merge with all base
/// layers. This function exists to allow more efficient point and range queries; if only the
/// top layer needs to be consulted, you will not pay the price of seeking in lower layers. If
/// new iterators need to be consulted, a search is done using std::cmp::Ord, so the hint should
/// be set accordingly i.e. if your keys are range based and you want to search for a key that
/// covers, say, 100..200, the hint should be ?..101 so that you find a key that is, say,
/// 50..101. Calling advance after calling advance_with_hint is undefined.
pub async fn advance_with_hint(&mut self, hint: &K) -> Result<(), Error> {
// Push the iterator for the current item (if we have one) onto the heap.
if let Some(iterator) = self.item.take_iterator() {
iterator.advance().await?;
if iterator.is_some() {
self.heap.push(iterator);
}
}
// If the lower bound of the next item is > hint, add more iterators.
while !self.pending_iterators.is_empty()
&& (self.heap.is_empty()
|| self.heap.peek().unwrap().key().cmp_lower_bound(&hint) == Ordering::Greater)
{
let iter = self.pending_iterators.pop().unwrap();
iter.iter = RawIterator::Const(
iter.layer.as_ref().unwrap().seek(std::ops::Bound::Included(hint)).await?,
);
iter.set_item_from_iter();
if iter.is_some() {
self.heap.push(iter);
}
}
// Call advance to do the merge.
self.advance_impl().await
}
// Merges items from an array of layers using the provided merge function. The merge function
// is repeatedly provided the lowest and the second lowest element, if one exists. In cases
// where the two lowest elements compare equal, the element with the lowest layer
// (i.e. whichever comes first in the layers array) will come first.
async fn advance_impl(&mut self) -> Result<(), Error> {
// Push the iterator for the current item (if we have one) onto the heap.
if let Some(iterator) = self.item.take_iterator() {
iterator.advance().await?;
if iterator.is_some() {
self.heap.push(iterator);
}
}
while !self.heap.is_empty() {
let lowest = self.heap.pop().unwrap();
let maybe_second_lowest = self.heap.pop();
if let Some(second_lowest) = maybe_second_lowest {
let result = (self.merge_fn)(&lowest, &second_lowest);
match result {
MergeResult::EmitLeft => {
self.heap.push(second_lowest);
self.item = CurrentItem::Iterator(lowest);
return Ok(());
}
MergeResult::Other { emit, left, right } => {
try_join!(
lowest.maybe_discard(&left),
second_lowest.maybe_discard(&right)
)?;
self.update_item(lowest, left);
self.update_item(second_lowest, right);
if let Some(emit) = emit {
self.item = CurrentItem::Item(emit);
return Ok(());
}
}
}
} else {
self.item = CurrentItem::Iterator(lowest);
return Ok(());
}
}
self.item = CurrentItem::None;
Ok(())
}
// Updates the merge iterator depending on |op|. If discarding, the iterator should have already
// been advanced.
fn update_item(&mut self, item: &'a mut MergeLayerIterator<'b, K, V>, op: ItemOp<K, V>) {
match op {
ItemOp::Keep => self.heap.push(item),
ItemOp::Discard => {
// The iterator should have already been advanced.
if item.is_some() {
self.heap.push(item);
}
}
ItemOp::Replace(replacement) => {
item.replace(replacement);
self.heap.push(item);
}
}
}
}
#[async_trait]
impl<'a, K: Key + OrdLowerBound, V: Value> LayerIterator<K, V> for MergerIterator<'a, '_, K, V> {
    /// Moves on to the next merged item.
    ///
    /// Only valid when the iterator was positioned with `seek(Bound::Unbounded)`; after
    /// `seek(Bound::Included(_))` this asserts, and `advance_with_hint` must be used instead.
    async fn advance(&mut self) -> Result<(), Error> {
        assert!(!self.must_use_advance_with_hint);
        self.advance_impl().await
    }

    /// Returns the current merged item, or `None` once the merge is exhausted.
    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        Option::from(&self.item)
    }
}
// Merges the given item into a mutable layer.
pub(super) async fn merge_into<K: Debug + OrdLowerBound, V: Debug>(
mut_iter: Box<dyn LayerIteratorMut<K, V> + '_>,
item: Item<K, V>,
merge_fn: MergeFn<K, V>,
) -> Result<(), Error> {
let merge_item = if mut_iter.get().is_some() { MergeItem::Iter } else { MergeItem::None };
let mut mut_merge_iter = MergeLayerIterator {
layer: None,
iter: RawIterator::Mut(mut_iter),
layer_index: 1,
item: merge_item,
};
let mut item_merge_iter = MergeLayerIterator::new_with_item(0, MergeItem::Item(item));
while mut_merge_iter.is_some() && item_merge_iter.is_some() {
if mut_merge_iter > item_merge_iter {
// In this branch the mutable layer is left and the item we're merging-in is right.
let merge_result = merge_fn(&mut_merge_iter, &item_merge_iter);
log::debug!(
"(1) merge for {:?} {:?} -> {:?}",
mut_merge_iter.key(),
item_merge_iter.key(),
merge_result
);
match merge_result {
MergeResult::EmitLeft => {
if let Some(item) = mut_merge_iter.take_item() {
mut_merge_iter.insert(item);
mut_merge_iter.set_item_from_iter();
} else {
mut_merge_iter.advance().await?;
}
}
MergeResult::Other { emit, left, right } => {
if let Some(emit) = emit {
mut_merge_iter.insert(emit);
}
match left {
ItemOp::Keep => {}
ItemOp::Discard => {
if let MergeItem::Iter = mut_merge_iter.item {
mut_merge_iter.erase();
}
mut_merge_iter.set_item_from_iter();
}
ItemOp::Replace(item) => {
if let MergeItem::Iter = mut_merge_iter.item {
mut_merge_iter.erase();
}
mut_merge_iter.item = MergeItem::Item(item)
}
}
match right {
ItemOp::Keep => {}
ItemOp::Discard => item_merge_iter.item = MergeItem::None,
ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
}
}
}
} else {
// In this branch, the item we're merging-in is left and the mutable layer is right.
let merge_result = merge_fn(&item_merge_iter, &mut_merge_iter);
log::debug!(
"(2) merge for {:?} {:?} -> {:?}",
item_merge_iter.key(),
mut_merge_iter.key(),
merge_result
);
match merge_result {
MergeResult::EmitLeft => break, // Item is inserted outside the loop
MergeResult::Other { emit, left, right } => {
if let Some(emit) = emit {
mut_merge_iter.insert(emit);
}
match left {
ItemOp::Keep => {}
ItemOp::Discard => item_merge_iter.item = MergeItem::None,
ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
}
match right {
ItemOp::Keep => {}
ItemOp::Discard => {
if let MergeItem::Iter = mut_merge_iter.item {
mut_merge_iter.erase();
}
mut_merge_iter.set_item_from_iter();
}
ItemOp::Replace(item) => {
if let MergeItem::Iter = mut_merge_iter.item {
mut_merge_iter.erase();
}
mut_merge_iter.item = MergeItem::Item(item)
}
}
}
}
}
} // while ...
// The only way we could get here with both items is via the break above, so we know the
// correct order required here.
if let MergeItem::Item(item) = item_merge_iter.item {
mut_merge_iter.insert(item);
}
if let Some(item) = mut_merge_iter.take_item() {
mut_merge_iter.insert(item);
}
mut_merge_iter.commit().await;
Ok(())
}
#[cfg(test)]
mod tests {
use {
super::{
ItemOp::{Discard, Keep, Replace},
MergeResult, Merger,
},
crate::lsm_tree::{
skip_list_layer::SkipListLayer,
types::{
IntoLayerRefs, Item, ItemRef, Layer, LayerIterator, MutableLayer, OrdLowerBound,
},
},
fuchsia_async as fasync,
rand::Rng,
std::ops::Bound,
};
#[derive(Clone, Eq, PartialEq, Debug, serde::Serialize, serde::Deserialize)]
struct TestKey(std::ops::Range<u64>);
impl Ord for TestKey {
fn cmp(&self, other: &TestKey) -> std::cmp::Ordering {
self.0.end.cmp(&other.0.end)
}
}
impl PartialOrd for TestKey {
fn partial_cmp(&self, other: &TestKey) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl OrdLowerBound for TestKey {
fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
self.0.start.cmp(&other.0.start)
}
}
#[fasync::run_singlethreaded(test)]
async fn test_emit_left() {
let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
skip_lists[0].insert(items[1].clone()).await;
skip_lists[1].insert(items[0].clone()).await;
let mut merger =
Merger::new(&skip_lists.into_layer_refs(), |_left, _right| MergeResult::EmitLeft);
let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[0].key, &items[0].value));
iter.advance().await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[1].key, &items[1].value));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_other_emit() {
let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
skip_lists[0].insert(items[1].clone()).await;
skip_lists[1].insert(items[0].clone()).await;
let mut merger =
Merger::new(&skip_lists.into_layer_refs(), |_left, _right| MergeResult::Other {
emit: Some(Item::new(TestKey(3..3), 3)),
left: Discard,
right: Discard,
});
let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(3..3), &3));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_replace_left() {
let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
skip_lists[0].insert(items[1].clone()).await;
skip_lists[1].insert(items[0].clone()).await;
let mut merger =
Merger::new(&skip_lists.into_layer_refs(), |_left, _right| MergeResult::Other {
emit: None,
left: Replace(Item::new(TestKey(3..3), 3)),
right: Discard,
});
let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
// The merger should replace the left item and then after discarding the right item, it
// should emit the replacement.
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(3..3), &3));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_replace_right() {
let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
skip_lists[0].insert(items[1].clone()).await;
skip_lists[1].insert(items[0].clone()).await;
let mut merger =
Merger::new(&skip_lists.into_layer_refs(), |_left, _right| MergeResult::Other {
emit: None,
left: Discard,
right: Replace(Item::new(TestKey(3..3), 3)),
});
let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
// The merger should replace the right item and then after discarding the left item, it
// should emit the replacement.
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(3..3), &3));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_left_less_than_right() {
let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
skip_lists[0].insert(items[1].clone()).await;
skip_lists[1].insert(items[0].clone()).await;
let mut merger = Merger::new(&skip_lists.into_layer_refs(), |left, right| {
assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
assert_eq!((right.key(), right.value()), (&TestKey(2..2), &2));
MergeResult::EmitLeft
});
merger.seek(Bound::Unbounded).await.expect("seek failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_left_equals_right() {
let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
let item = Item::new(TestKey(1..1), 1);
skip_lists[0].insert(item.clone()).await;
skip_lists[1].insert(item.clone()).await;
let mut merger = Merger::new(&skip_lists.into_layer_refs(), |left, right| {
assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
assert_eq!(left.layer_index, 0);
assert_eq!(right.layer_index, 1);
MergeResult::EmitLeft
});
merger.seek(Bound::Unbounded).await.expect("seek failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_keep() {
let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
skip_lists[0].insert(items[1].clone()).await;
skip_lists[1].insert(items[0].clone()).await;
let mut merger = Merger::new(&skip_lists.into_layer_refs(), |left, right| {
if left.key() == &TestKey(1..1) {
MergeResult::Other {
emit: None,
left: Replace(Item::new(TestKey(3..3), 3)),
right: Keep,
}
} else {
assert_eq!(left.key(), &TestKey(2..2));
assert_eq!(right.key(), &TestKey(3..3));
MergeResult::Other { emit: None, left: Discard, right: Keep }
}
});
let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
// The merger should first replace left and then it should call the merger again with 2 & 3
// and end up just keeping 3.
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(3..3), &3));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_10_layers() {
let skip_lists: Vec<_> = (0..10).map(|_| SkipListLayer::new(100)).collect();
let mut rng = rand::thread_rng();
for i in 0..100 {
skip_lists[rng.gen_range(0, 10) as usize].insert(Item::new(TestKey(i..i), i)).await;
}
let mut merger =
Merger::new(&skip_lists.into_layer_refs(), |_left, _right| MergeResult::EmitLeft);
let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
for i in 0..100 {
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(i..i), &i));
iter.advance().await.unwrap();
}
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_uses_cmp_lower_bound() {
let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
let items = [Item::new(TestKey(1..10), 1), Item::new(TestKey(2..3), 2)];
skip_lists[0].insert(items[1].clone()).await;
skip_lists[1].insert(items[0].clone()).await;
let mut merger =
Merger::new(&skip_lists.into_layer_refs(), |_left, _right| MergeResult::EmitLeft);
let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[0].key, &items[0].value));
iter.advance().await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[1].key, &items[1].value));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_into_emit_left() {
let skip_list = SkipListLayer::new(100);
let items =
[Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2), Item::new(TestKey(3..3), 3)];
skip_list.insert(items[0].clone()).await;
skip_list.insert(items[2].clone()).await;
skip_list
.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::EmitLeft)
.await;
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[0].key, &items[0].value));
iter.advance().await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[1].key, &items[1].value));
iter.advance().await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[2].key, &items[2].value));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_into_emit_last_after_replacing() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
skip_list.insert(items[0].clone()).await;
skip_list
.merge_into(items[1].clone(), &items[0].key, |left, right| {
if left.key() == &TestKey(1..1) {
assert_eq!(right.key(), &TestKey(2..2));
MergeResult::Other {
emit: None,
left: Replace(Item::new(TestKey(3..3), 3)),
right: Keep,
}
} else {
assert_eq!(left.key(), &TestKey(2..2));
assert_eq!(right.key(), &TestKey(3..3));
MergeResult::EmitLeft
}
})
.await;
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[1].key, &items[1].value));
iter.advance().await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(3..3), &3));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_into_emit_left_after_replacing() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
skip_list.insert(items[0].clone()).await;
skip_list
.merge_into(items[1].clone(), &items[0].key, |left, right| {
if left.key() == &TestKey(1..1) {
assert_eq!(right.key(), &TestKey(3..3));
MergeResult::Other {
emit: None,
left: Replace(Item::new(TestKey(2..2), 2)),
right: Keep,
}
} else {
assert_eq!(left.key(), &TestKey(2..2));
assert_eq!(right.key(), &TestKey(3..3));
MergeResult::EmitLeft
}
})
.await;
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(2..2), &2));
iter.advance().await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[1].key, &items[1].value));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
// This tests emitting in both branches of merge_into, and most of the discard paths.
#[fasync::run_singlethreaded(test)]
async fn test_merge_into_emit_other_and_discard() {
let skip_list = SkipListLayer::new(100);
let items =
[Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 3)];
skip_list.insert(items[0].clone()).await;
skip_list.insert(items[2].clone()).await;
skip_list
.merge_into(items[1].clone(), &items[0].key, |left, right| {
if left.key() == &TestKey(1..1) {
// This tests the top branch in merge_into.
assert_eq!(right.key(), &TestKey(3..3));
MergeResult::Other {
emit: Some(Item::new(TestKey(2..2), 2)),
left: Discard,
right: Keep,
}
} else {
// This tests the bottom branch in merge_into.
assert_eq!(left.key(), &TestKey(3..3));
assert_eq!(right.key(), &TestKey(5..5));
MergeResult::Other {
emit: Some(Item::new(TestKey(4..4), 4)),
left: Discard,
right: Discard,
}
}
})
.await;
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(2..2), &2));
iter.advance().await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(4..4), &4));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
// This tests replacing the item and discarding the right item (the one remaining untested
// discard path) in the top branch in merge_into.
#[fasync::run_singlethreaded(test)]
async fn test_merge_into_replace_and_discard() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
skip_list.insert(items[0].clone()).await;
skip_list
.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
emit: Some(Item::new(TestKey(2..2), 2)),
left: Replace(Item::new(TestKey(4..4), 4)),
right: Discard,
})
.await;
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(2..2), &2));
iter.advance().await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(4..4), &4));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
// This tests replacing the right item in the top branch of merge_into and the left item in the
// bottom branch of merge_into.
#[fasync::run_singlethreaded(test)]
async fn test_merge_into_replace_merge_item() {
let skip_list = SkipListLayer::new(100);
let items =
[Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 5)];
skip_list.insert(items[0].clone()).await;
skip_list.insert(items[2].clone()).await;
skip_list
.merge_into(items[1].clone(), &items[0].key, |_left, right| {
if right.key() == &TestKey(3..3) {
MergeResult::Other {
emit: None,
left: Discard,
right: Replace(Item::new(TestKey(2..2), 2)),
}
} else {
assert_eq!(right.key(), &TestKey(5..5));
MergeResult::Other {
emit: None,
left: Replace(Item::new(TestKey(4..4), 4)),
right: Discard,
}
}
})
.await;
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(4..4), &4));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
// This tests replacing the right item in the bottom branch of merge_into.
#[fasync::run_singlethreaded(test)]
async fn test_merge_into_replace_existing() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
skip_list.insert(items[1].clone()).await;
skip_list
.merge_into(items[0].clone(), &items[0].key, |_left, right| {
if right.key() == &TestKey(3..3) {
MergeResult::Other {
emit: None,
left: Keep,
right: Replace(Item::new(TestKey(2..2), 2)),
}
} else {
MergeResult::EmitLeft
}
})
.await;
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[0].key, &items[0].value));
iter.advance().await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(2..2), &2));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
// Merging 2..2 into a layer holding only 1..1, with a merge function that always discards the
// left item and keeps the right, should leave just the merged-in 2..2 item.
#[fasync::run_singlethreaded(test)]
async fn test_merge_into_discard_last() {
    let layer = SkipListLayer::new(100);
    let [existing, incoming] = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
    layer.insert(existing.clone()).await;
    layer
        .merge_into(incoming.clone(), &existing.key, |_left, _right| MergeResult::Other {
            emit: None,
            left: Discard,
            right: Keep,
        })
        .await;
    let mut cursor = layer.seek(Bound::Unbounded).await.unwrap();
    let ItemRef { key: found_key, value: found_value } = cursor.get().expect("missing item");
    assert_eq!((found_key, found_value), (&incoming.key, &incoming.value));
    cursor.advance().await.unwrap();
    assert!(cursor.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_into_empty() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1..1), 1)];
skip_list
.merge_into(items[0].clone(), &items[0].key, |_left, _right| {
panic!("Unexpected merge!");
})
.await;
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[0].key, &items[0].value));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_seek_uses_minimum_number_of_iterators() {
    let layers = [SkipListLayer::new(100), SkipListLayer::new(100)];
    // Same key in both layers, distinguished only by value.
    let first_layer_item = Item::new(TestKey(1..1), 1);
    let second_layer_item = Item::new(TestKey(1..1), 2);
    layers[0].insert(first_layer_item.clone()).await;
    layers[1].insert(second_layer_item).await;
    let layer_refs = layers.into_layer_refs();
    let mut merger = Merger::new(&layer_refs, |_left, _right| MergeResult::Other {
        emit: None,
        left: Discard,
        right: Keep,
    });
    let cursor =
        merger.seek(Bound::Included(&first_layer_item.key)).await.expect("seek failed");
    // The seek should only need the first layer. Had a merge happened, this merge function would
    // have discarded value 1 and surfaced value 2 from the second layer instead.
    let ItemRef { key, value } = cursor.get().expect("missing item");
    assert_eq!((key, value), (&first_layer_item.key, &first_layer_item.value));
}
#[fasync::run_singlethreaded(test)]
async fn test_advance_with_hint() {
let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
let items =
[Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2), Item::new(TestKey(3..3), 3)];
skip_lists[0].insert(items[0].clone()).await;
skip_lists[0].insert(items[1].clone()).await;
skip_lists[1].insert(items[2].clone()).await;
let mut merger =
Merger::new(&skip_lists.into_layer_refs(), |_left, _right| MergeResult::EmitLeft);
let mut iter = merger.seek(Bound::Included(&items[0].key)).await.expect("seek failed");
// This should still find the 2..2 key.
iter.advance_with_hint(&items[2].key).await.expect("advance_with_hint failed");
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[1].key, &items[1].value));
iter.advance_with_hint(&items[2].key).await.expect("advance_with_hint failed");
let ItemRef { key, value } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[2].key, &items[2].value));
iter.advance_with_hint(&TestKey(4..4)).await.expect("advance_with_hint failed");
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_advance_with_hint_no_more() {
    let layers = [SkipListLayer::new(100), SkipListLayer::new(100)];
    let start = Item::new(TestKey(1..1), 1);
    layers[0].insert(start.clone()).await;
    layers[1].insert(Item::new(TestKey(2..2), 2)).await;
    let layer_refs = layers.into_layer_refs();
    let mut merger = Merger::new(&layer_refs, |_left, _right| MergeResult::EmitLeft);
    let mut cursor = merger.seek(Bound::Included(&start.key)).await.expect("seek failed");
    // A hint of 100..100 should skip over 2..2 entirely and leave the cursor exhausted.
    cursor.advance_with_hint(&TestKey(100..100)).await.expect("advance_with_hint failed");
    assert!(cursor.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_seek_less_than() {
    let layers = [SkipListLayer::new(100), SkipListLayer::new(100)];
    let [lower, upper] = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
    layers[0].insert(lower).await;
    layers[1].insert(upper.clone()).await;
    let mut merger = Merger::new(&layers.into_layer_refs(), |_left, _right| {
        MergeResult::Other { emit: None, left: Discard, right: Keep }
    });
    // Seek to a key before 1..1. Because the merge function discards the left item (1..1) and
    // keeps the right (2..2), the first item surfaced should be 2..2.
    let cursor = merger.seek(Bound::Included(&TestKey(0..0))).await.expect("seek failed");
    let ItemRef { key, value } = cursor.get().expect("missing item");
    assert_eq!((key, value), (&upper.key, &upper.value));
}
#[fasync::run_singlethreaded(test)]
async fn test_seek_to_end() {
    let layers = [SkipListLayer::new(100), SkipListLayer::new(100)];
    layers[0].insert(Item::new(TestKey(1..1), 1)).await;
    layers[1].insert(Item::new(TestKey(2..2), 2)).await;
    let layer_refs = layers.into_layer_refs();
    let mut merger = Merger::new(&layer_refs, |_left, _right| MergeResult::Other {
        emit: None,
        left: Discard,
        right: Keep,
    });
    // Seeking to 3..3, past both stored keys, should yield nothing.
    let cursor = merger.seek(Bound::Included(&TestKey(3..3))).await.expect("seek failed");
    assert!(cursor.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_all_discarded() {
    let layers = [SkipListLayer::new(100), SkipListLayer::new(100)];
    // Deliberately put the larger key in the first layer and the smaller key in the second.
    layers[0].insert(Item::new(TestKey(2..2), 2)).await;
    layers[1].insert(Item::new(TestKey(1..1), 1)).await;
    let mut merger = Merger::new(&layers.into_layer_refs(), |_left, _right| {
        MergeResult::Other { emit: None, left: Discard, right: Discard }
    });
    // Every merge drops both sides, so nothing should be left to iterate.
    let cursor = merger.seek(Bound::Unbounded).await.expect("seek failed");
    assert!(cursor.get().is_none());
}
}