// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// There are many optimisations that could be considered here to improve performance and possibly
// memory usage.
use {
crate::{
log::*,
lsm_tree::{
merge::{self, MergeFn},
types::{
BoxedLayerIterator, Item, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut,
MutableLayer, OrdLowerBound, OrdUpperBound, Value,
},
},
serialized_types::{Version, LATEST_VERSION},
},
anyhow::Error,
async_trait::async_trait,
async_utils::event::Event,
futures::future::poll_fn,
std::{
cell::UnsafeCell,
cmp::{min, Ordering},
ops::{Bound, Range},
sync::{
atomic::{self, AtomicPtr, AtomicU32},
Arc, Mutex,
},
task::{Poll, Waker},
},
};
// Each skip list node contains a variable sized pointer list. The head pointers also exist in the
// form of a pointer list. Index 0 in the pointer list is the chain with the most elements, i.e. it
// contains every element in the list.
struct PointerList<K, V>(Box<[AtomicPtr<SkipListNode<K, V>>]>);
impl<K, V> PointerList<K, V> {
fn new(count: usize) -> PointerList<K, V> {
let mut pointers = Vec::new();
for _ in 0..count {
pointers.push(AtomicPtr::new(std::ptr::null_mut()));
}
PointerList(pointers.into_boxed_slice())
}
fn len(&self) -> usize {
self.0.len()
}
    // Returns a mutable reference to the node at the given index, if any. Note that the returned
    // lifetime `'a` is unconstrained, so callers must ensure the node outlives the reference.
fn get_mut<'a>(&self, index: usize) -> Option<&'a mut SkipListNode<K, V>> {
unsafe { self.0[index].load(atomic::Ordering::SeqCst).as_mut() }
}
// Same as previous, but returns an immutable reference.
fn get<'a>(&self, index: usize) -> Option<&'a SkipListNode<K, V>> {
unsafe { self.0[index].load(atomic::Ordering::SeqCst).as_ref() }
}
// Sets the pointer at the given index.
fn set(&self, index: usize, node: Option<&SkipListNode<K, V>>) {
self.0[index].store(
match node {
None => std::ptr::null_mut(),
Some(node) => {
// https://github.com/rust-lang/rust/issues/66136#issuecomment-550003651
// suggests that the following is the best way to cast from const* to mut*.
unsafe {
(&*(node as *const SkipListNode<K, V>
as *const UnsafeCell<SkipListNode<K, V>>))
.get()
}
}
},
atomic::Ordering::SeqCst,
);
}
fn get_ptr(&self, index: usize) -> *mut SkipListNode<K, V> {
self.0[index].load(atomic::Ordering::SeqCst)
}
}
struct SkipListNode<K, V> {
item: Item<K, V>,
pointers: PointerList<K, V>,
}
pub struct SkipListLayer<K, V> {
// These are the head pointers for the list.
pointers: PointerList<K, V>,
inner: std::sync::Mutex<Inner<K, V>>,
    // Writes are serialized by this lock.
writer_lock: futures::lock::Mutex<()>,
// The number of nodes that have been allocated. This is only used for debugging purposes.
allocated: AtomicU32,
close_event: Mutex<Option<Event>>,
}
// The writer needs to synchronize with the readers, and this is done by keeping track of read
// counts. We could, in theory, remove the mutex and make the read counts atomic (and thus make
// reads truly lock free), but it's simpler and easier to reason about with a mutex. What matters
// most is that we avoid using a futures::lock::Mutex for readers, because that can be blocked for
// relatively long periods of time.
struct Inner<K, V> {
    // After a write, if there are nodes that need to be freed and there are existing readers, the
    // epoch changes and new readers will be in the new epoch. When all the old readers finish,
    // the nodes can be freed.
epoch: u64,
// We only allow two epochs to be live at any point in time. These are the counts of active
// readers for each of those two epochs, and this is indexed by the lowest bit of epoch.
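    // For example, a reader that starts in epoch 4 increments counts[0]; if a write then needs to
    // free nodes while that reader is still live, the epoch advances to 5 and subsequent readers
    // increment counts[1]. When counts[0] drops back to zero, the erase list can be freed.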
counts: [u16; 2],
// A waker waiting for the read count to drop to zero.
waker: Option<Waker>,
// A list of nodes to be freed once the read counts have reached zero.
erase_list: Range<*mut SkipListNode<K, V>>,
// The number of items in the skip-list.
item_count: usize,
}
// Required because of `erase_list` which holds pointers.
unsafe impl<K, V> Send for Inner<K, V> {}
impl<K, V> Inner<K, V> {
fn new() -> Self {
Inner {
epoch: 0,
counts: [0, 0],
waker: None,
erase_list: std::ptr::null_mut()..std::ptr::null_mut(),
item_count: 0,
}
}
fn free_erase_list(&mut self, owner: &SkipListLayer<K, V>) {
let mut maybe_node = unsafe { self.erase_list.start.as_mut() };
loop {
match maybe_node {
Some(node) if node as *const _ != self.erase_list.end => {
maybe_node = owner.free_node(node);
}
_ => break,
}
}
}
}
impl<K, V> SkipListLayer<K, V> {
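    // The number of levels is log2(max_item_count), truncated, plus one, so that in expectation
    // the top level holds only a couple of nodes. For example, with max_item_count = 100,
    // (100f32).log2() as usize == 6, giving 7 pointer levels.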
pub fn new(max_item_count: usize) -> Arc<SkipListLayer<K, V>> {
Arc::new(SkipListLayer {
pointers: PointerList::new((max_item_count as f32).log2() as usize + 1),
inner: std::sync::Mutex::new(Inner::new()),
writer_lock: futures::lock::Mutex::new(()),
allocated: AtomicU32::new(0),
close_event: Mutex::new(Some(Event::new())),
})
}
fn alloc_node(&self, item: Item<K, V>, pointer_count: usize) -> Box<SkipListNode<K, V>> {
self.allocated.fetch_add(1, atomic::Ordering::Relaxed);
Box::new(SkipListNode { item, pointers: PointerList::new(pointer_count) })
}
    // Frees the given node and returns the next node in the chain. The node must have been
    // allocated via `alloc_node` and must no longer be reachable by readers.
fn free_node(&self, node: &mut SkipListNode<K, V>) -> Option<&mut SkipListNode<K, V>> {
self.allocated.fetch_sub(1, atomic::Ordering::Relaxed);
unsafe { Box::from_raw(node).pointers.get_mut(0) }
}
}
impl<K: Key, V: Value> SkipListLayer<K, V> {
// Erases the given item. Does nothing if the item doesn't exist.
pub async fn erase(&self, key: &K)
where
K: std::cmp::Eq,
{
let mut iter = SkipListLayerIterMut::new(self, Bound::Included(key)).await;
if let Some(ItemRef { key: k, .. }) = iter.get() {
if k == key {
iter.erase();
} else {
warn!("Attempt to erase key not present!");
}
}
iter.commit_and_wait().await;
}
}
// We have to manually manage memory.
impl<K, V> Drop for SkipListLayer<K, V> {
fn drop(&mut self) {
let mut next = self.pointers.get_mut(0);
while let Some(node) = next {
next = self.free_node(node);
}
assert_eq!(self.allocated.load(atomic::Ordering::Relaxed), 0);
}
}
#[async_trait]
impl<K: Key, V: Value> Layer<K, V> for SkipListLayer<K, V> {
async fn seek<'a>(
&'a self,
bound: std::ops::Bound<&K>,
) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
Ok(Box::new(SkipListLayerIter::new(self, bound)))
}
fn lock(&self) -> Option<Event> {
self.close_event.lock().unwrap().clone()
}
async fn close(&self) {
let _ = {
let event = self.close_event.lock().unwrap().take().expect("close already called");
event.wait_or_dropped()
}
.await;
}
fn get_version(&self) -> Version {
        // The SkipListLayer is stored in RAM and written to disk as a SimplePersistentLayer, so
        // it is always at the latest version.
        LATEST_VERSION
}
}
// The methods here all use commit_and_wait (rather than just using commit via drop) for now because
// it provides a barrier such that upon return, callers can assume that all other threads will see
// the mutation.
#[async_trait]
impl<K: Eq + Key + OrdLowerBound, V: Value> MutableLayer<K, V> for SkipListLayer<K, V> {
fn as_layer(self: Arc<Self>) -> Arc<dyn Layer<K, V>> {
self
}
// Inserts the given item.
async fn insert(&self, item: Item<K, V>) {
let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key)).await;
if let Some(found_item) = iter.get() {
// TODO(fxbug.dev/96084): This assertion will eventually have to go since it would be
// possible to trip this when replaying a malformed journal.
assert_ne!(found_item.key, &item.key);
}
iter.insert(item);
iter.commit_and_wait().await;
}
// Replaces or inserts the given item.
async fn replace_or_insert(&self, item: Item<K, V>) {
let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key)).await;
if let Some(found_item) = iter.get() {
if found_item.key == &item.key {
iter.erase();
}
}
iter.insert(item);
iter.commit_and_wait().await;
}
async fn merge_into(&self, item: Item<K, V>, lower_bound: &K, merge_fn: MergeFn<K, V>) {
merge::merge_into(
Box::new(SkipListLayerIterMut::new(self, Bound::Included(lower_bound)).await),
item,
merge_fn,
)
.await
.unwrap();
}
fn len(&self) -> usize {
self.inner.lock().unwrap().item_count
}
}
// -- SkipListLayerIter --
struct SkipListLayerIter<'a, K, V> {
skip_list: &'a SkipListLayer<K, V>,
// The epoch for this reader.
epoch: u64,
// The current node.
node: Option<&'a SkipListNode<K, V>>,
}
impl<'a, K: OrdUpperBound, V> SkipListLayerIter<'a, K, V> {
fn new(skip_list: &'a SkipListLayer<K, V>, bound: Bound<&K>) -> Self {
let epoch = {
let mut inner = skip_list.inner.lock().unwrap();
let index = (inner.epoch & 1) as usize;
inner.counts[index] += 1;
inner.epoch
};
let (included, key) = match bound {
Bound::Unbounded => {
return SkipListLayerIter { skip_list, epoch, node: skip_list.pointers.get(0) };
}
Bound::Included(key) => (true, key),
Bound::Excluded(key) => (false, key),
};
let mut last_pointers = &skip_list.pointers;
// Some care needs to be taken here because new elements can be inserted atomically, so it
// is important that the node we return in the iterator is the same node that we performed
// the last comparison on.
let mut node = None;
for index in (0..skip_list.pointers.len()).rev() {
// Keep iterating along this level until we encounter a key that's >= our search key.
loop {
node = last_pointers.get(index);
if let Some(node) = node {
match &node.item.key.cmp_upper_bound(key) {
Ordering::Equal if included => break,
Ordering::Greater => break,
_ => {}
}
last_pointers = &node.pointers;
} else {
break;
}
}
}
SkipListLayerIter { skip_list, epoch, node }
}
}
impl<K, V> Drop for SkipListLayerIter<'_, K, V> {
fn drop(&mut self) {
let mut inner = self.skip_list.inner.lock().unwrap();
let index = (self.epoch & 1) as usize;
inner.counts[index] -= 1;
if inner.counts[index] == 0 && inner.epoch != self.epoch {
inner.free_erase_list(self.skip_list);
if let Some(waker) = inner.waker.take() {
waker.wake();
}
}
}
}
#[async_trait]
impl<K: Key, V: Value> LayerIterator<K, V> for SkipListLayerIter<'_, K, V> {
async fn advance(&mut self) -> Result<(), Error> {
match self.node {
None => {}
Some(node) => self.node = node.pointers.get(0),
}
Ok(())
}
fn get(&self) -> Option<ItemRef<'_, K, V>> {
self.node.map(|node| node.item.as_item_ref())
}
}
type PointerListRefArray<'a, K, V> = Box<[&'a PointerList<K, V>]>;
// -- SkipListLayerIterMut --
// This works by building an insertion chain. When that chain is committed, it is done atomically
// so that readers are not interrupted. When the existing readers are finished, it is then safe to
// release memory for any nodes that might have been erased. In the case that we are only erasing
// elements, there will be no insertion chain, in which case we just atomically remove the elements
// from the chain.
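// For example, inserting a new node X in front of D (in the diagram in `new` below) first links X
// to D in a detached insertion chain; committing then atomically repoints C's level-0 pointer at
// X, so concurrent readers observe either the old chain or the new one, never a broken one.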
struct SkipListLayerIterMut<'a, K, V> {
skip_list: &'a SkipListLayer<K, V>,
// Since this is a mutable iterator, we need to keep pointers to all the nodes that precede the
// current position at every level, so that we can update them when inserting or erasing
// elements.
prev_pointers: PointerListRefArray<'a, K, V>,
// When we first insert or erase an element, we take a copy of prev_pointers so that
// we know which pointers need to be updated when we commit.
insertion_point: Option<PointerListRefArray<'a, K, V>>,
// These are the nodes that we should point to when we commit.
insertion_nodes: PointerList<K, V>,
// Only one write can proceed at a time. We only need a place to keep the mutex guard, which is
// why Rust thinks this is unused.
#[allow(dead_code)]
write_guard: futures::lock::MutexGuard<'a, ()>,
// The change in item count as a result of this mutation.
item_delta: isize,
}
impl<K: OrdUpperBound, V> SkipListLayerIterMut<'_, K, V> {
async fn new<'a>(
skip_list: &'a SkipListLayer<K, V>,
bound: std::ops::Bound<&K>,
) -> SkipListLayerIterMut<'a, K, V> {
let len = skip_list.pointers.len();
let write_guard = skip_list.writer_lock.lock().await;
// Before we proceed, we should wait for any old readers on the other epoch to finish.
poll_fn(|cx| {
let mut inner = skip_list.inner.lock().unwrap();
if inner.counts[(inner.epoch & 1 ^ 1) as usize] == 0 {
Poll::Ready(())
} else {
inner.waker = Some(cx.waker().clone());
Poll::Pending
}
})
.await;
// Start by setting all the previous pointers to the head.
//
// To understand how the previous pointers work, imagine the list looks something like the
// following:
//
// 2 |--->|
// 1 |--->|--|------->|
// 0 |--->|--|--|--|->|
// HEAD A B C D E F
//
// Now imagine that the iterator is pointing at element D. In that case, the previous
// pointers will point at C for index 0, B for index 1 and A for index 2. With that
// information, it will be possible to insert an element immediately prior to D and
// correctly update as many pointers as required (remember a new element will be given a
// random number of levels).
let mut prev_pointers = vec![&skip_list.pointers; len].into_boxed_slice();
match bound {
Bound::Unbounded => {}
Bound::Included(key) => {
let pointers = &mut prev_pointers;
for index in (0..len).rev() {
while let Some(node) = pointers[index].get(index) {
// Keep iterating along this level until we encounter a key that's >= our
// search key.
match &(node.item.key).cmp_upper_bound(key) {
Ordering::Equal | Ordering::Greater => break,
Ordering::Less => {}
}
pointers[index] = &node.pointers;
}
if index > 0 {
pointers[index - 1] = pointers[index];
}
}
}
Bound::Excluded(_) => panic!("Excluded bounds not supported"),
}
SkipListLayerIterMut {
skip_list,
prev_pointers,
insertion_point: None,
insertion_nodes: PointerList::new(len),
write_guard,
item_delta: 0,
}
}
}
impl<K, V> SkipListLayerIterMut<'_, K, V> {
// Commits the changes but doesn't wait for readers to finish. Returns the new epoch if the
// epoch changed. If `force_epoch_change` is true, the epoch will be changed if there are
// existing readers even if it is not necessary (because no elements need to be freed); this
// gives callers a mechanism to wait for existing readers to finish.
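    // For example, if the only mutation was erasing D and E (from the diagram in `new`) while
    // readers are live, commit stores F into C's level-0 pointer in a single atomic store, parks
    // D and E on the erase list, and bumps the epoch; the last reader from the old epoch to drop
    // then frees them (see SkipListLayerIter::drop).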
fn commit(&mut self, force_epoch_change: bool) -> Option<u64> {
// Splice the changes into the list.
let prev_pointers = match self.insertion_point.take() {
Some(prev_pointers) => prev_pointers,
None => return None,
};
// Keep track of the first node that we might need to erase later.
let maybe_erase = prev_pointers[0].get_mut(0);
// If there are no insertion nodes, then it means that we're only erasing nodes.
if self.insertion_nodes.get(0).is_none() {
            // Erase all elements between the insertion point and the current element. The
            // pointers for levels > 0 should already have been updated, so it's only level 0 we
            // need to worry about.
prev_pointers[0].set(0, self.prev_pointers[0].get(0));
} else {
// Switch the pointers over so that the insertion chain is spliced in. This is safe
// so long as the bottom pointer is done first because that guarantees the new nodes
// will be found, just maybe not as efficiently.
for i in 0..self.insertion_nodes.len() {
if let Some(node) = self.insertion_nodes.get_mut(i) {
prev_pointers[i].set(i, Some(node));
}
}
}
// Switch the epoch so that we can track when existing readers have finished.
let mut inner = self.skip_list.inner.lock().unwrap();
inner.item_count = inner.item_count.wrapping_add(self.item_delta as usize);
let has_readers = inner.counts[(inner.epoch & 1) as usize] > 0;
if let Some(start) = maybe_erase {
let end = self.prev_pointers[0].get_ptr(0);
if start as *mut _ != end {
inner.erase_list = start..end;
if has_readers {
inner.epoch = inner.epoch.wrapping_add(1);
return Some(inner.epoch);
} else {
inner.free_erase_list(self.skip_list);
}
}
}
if force_epoch_change && has_readers {
inner.erase_list = std::ptr::null_mut()..std::ptr::null_mut();
inner.epoch = inner.epoch.wrapping_add(1);
return Some(inner.epoch);
}
None
}
}
impl<K, V> Drop for SkipListLayerIterMut<'_, K, V> {
fn drop(&mut self) {
self.commit(false);
}
}
#[async_trait]
impl<K: Key + Clone, V: Value + Clone> LayerIterator<K, V> for SkipListLayerIterMut<'_, K, V> {
async fn advance(&mut self) -> Result<(), Error> {
if self.insertion_point.is_some() {
if let Some(item) = self.get() {
// Copy the current item into the insertion chain.
let copy = item.cloned();
self.insert(copy);
self.erase();
}
} else {
let pointers = &mut self.prev_pointers;
if let Some(next) = pointers[0].get_mut(0) {
for i in 0..next.pointers.len() {
pointers[i] = &next.pointers;
}
}
}
Ok(())
}
fn get(&self) -> Option<ItemRef<'_, K, V>> {
self.prev_pointers[0].get(0).map(|node| node.item.as_item_ref())
}
}
#[async_trait]
impl<K: Key + Clone, V: Value + Clone> LayerIteratorMut<K, V> for SkipListLayerIterMut<'_, K, V> {
fn as_iterator_mut(&mut self) -> &mut dyn LayerIterator<K, V> {
self
}
fn as_iterator(&self) -> &dyn LayerIterator<K, V> {
self
}
fn insert(&mut self, item: Item<K, V>) {
use rand::Rng;
let mut rng = rand::thread_rng();
let max_pointers = self.skip_list.pointers.len();
// This chooses a random number of pointers such that each level has half the number of
// pointers of the previous one.
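        // For example, a sketch with max_pointers = 7: gen_range yields a value in 0..=126;
        // roughly half of those values (64..=126) have a truncated log2 of 6 and so get
        // pointer_count 1, a quarter (32..=63) get pointer_count 2, and so on up to 7.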
let pointer_count = max_pointers
- min(
(rng.gen_range(0..2u32.pow(max_pointers as u32) - 1) as f32).log2() as usize,
max_pointers - 1,
);
let node = Box::leak(self.skip_list.alloc_node(item, pointer_count));
if self.insertion_point.is_none() {
self.insertion_point = Some(self.prev_pointers.clone());
}
for i in 0..pointer_count {
let pointers = self.prev_pointers[i];
node.pointers.set(i, pointers.get(i));
if self.insertion_nodes.get(i).is_none() {
// If there's no insertion node at this level, record this node as the node to
// switch in when we commit.
self.insertion_nodes.set(i, Some(node));
} else {
// There's already an insertion node at this level which means that it's part of the
// insertion chain, so we can just update the pointers now.
pointers.set(i, Some(&node));
}
// The iterator should point at the node following the new node i.e. the existing node.
self.prev_pointers[i] = &node.pointers;
}
self.item_delta += 1;
}
fn erase(&mut self) {
let pointers = &mut self.prev_pointers;
if let Some(next) = pointers[0].get_mut(0) {
if self.insertion_point.is_none() {
self.insertion_point = Some(pointers.clone());
}
if self.insertion_nodes.get(0).is_none() {
// If there's no insertion node, then just update the iterator position to point to
// the next node, and then when we commit, it'll get erased.
pointers[0] = &next.pointers;
} else {
// There's an insertion node, so the current element must be part of the insertion
// chain and so we can update the pointers immediately. There will be another node
// that isn't part of the insertion chain that will still point at this node, but it
// will disappear when we commit.
pointers[0].set(0, next.pointers.get(0));
}
// Fix up all the pointers except the bottom one. Readers will still find this node,
// just not as efficiently.
for i in 1..next.pointers.len() {
pointers[i].set(i, next.pointers.get(i));
}
}
self.item_delta -= 1;
}
async fn commit_and_wait(&mut self) {
if let Some(epoch) = self.commit(true) {
poll_fn(|cx| {
let mut inner = self.skip_list.inner.lock().unwrap();
if inner.counts[(epoch & 1 ^ 1) as usize] > 0 {
inner.waker = Some(cx.waker().clone());
Poll::Pending
} else {
Poll::Ready(())
}
})
.await;
}
}
}
#[cfg(test)]
mod tests {
use {
super::{SkipListLayer, SkipListLayerIterMut},
crate::{
lsm_tree::{
merge::{
ItemOp::{Discard, Replace},
MergeLayerIterator, MergeResult,
},
types::{
DefaultOrdLowerBound, DefaultOrdUpperBound, Item, ItemRef, Layer,
LayerIterator, LayerIteratorMut, MutableLayer,
},
},
serialized_types::{
versioned_type, Version, Versioned, VersionedLatest, LATEST_VERSION,
},
},
fuchsia_async as fasync,
futures::{channel::oneshot::channel, future::join_all, join},
std::{
ops::Bound,
sync::Mutex,
time::{Duration, Instant},
},
};
#[derive(
Clone,
Eq,
PartialEq,
PartialOrd,
Ord,
Debug,
serde::Serialize,
serde::Deserialize,
Versioned,
)]
struct TestKey(i32);
versioned_type! { 1.. => TestKey }
impl DefaultOrdLowerBound for TestKey {}
impl DefaultOrdUpperBound for TestKey {}
#[fasync::run_singlethreaded(test)]
async fn test_iteration() {
// Insert two items and make sure we can iterate back in the correct order.
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
skip_list.insert(items[1].clone()).await;
skip_list.insert(items[0].clone()).await;
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[0].key, &items[0].value));
iter.advance().await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[1].key, &items[1].value));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_seek_exact() {
// Seek for an exact match.
let skip_list = SkipListLayer::new(100);
for i in (0..100).rev() {
skip_list.insert(Item::new(TestKey(i), i)).await;
}
let mut iter = skip_list.seek(Bound::Included(&TestKey(57))).await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(57), &57));
// And check the next item is correct.
iter.advance().await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(58), &58));
}
#[fasync::run_singlethreaded(test)]
async fn test_seek_lower_bound() {
// Seek for a non-exact match.
let skip_list = SkipListLayer::new(100);
for i in (0..100).rev() {
skip_list.insert(Item::new(TestKey(i * 3), i * 3)).await;
}
let mut expected_index = 57 * 3;
let mut iter = skip_list.seek(Bound::Included(&TestKey(expected_index - 1))).await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(expected_index), &expected_index));
// And check the next item is correct.
expected_index += 3;
iter.advance().await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(expected_index), &expected_index));
}
#[fasync::run_singlethreaded(test)]
async fn test_replace_or_insert_replaces() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
skip_list.insert(items[1].clone()).await;
skip_list.insert(items[0].clone()).await;
let replacement_value = 3;
skip_list.replace_or_insert(Item::new(items[1].key.clone(), replacement_value)).await;
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[0].key, &items[0].value));
iter.advance().await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[1].key, &replacement_value));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_replace_or_insert_inserts() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
skip_list.insert(items[2].clone()).await;
skip_list.insert(items[0].clone()).await;
skip_list.replace_or_insert(items[1].clone()).await;
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[0].key, &items[0].value));
iter.advance().await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[1].key, &items[1].value));
iter.advance().await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[2].key, &items[2].value));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_erase() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
skip_list.insert(items[1].clone()).await;
skip_list.insert(items[0].clone()).await;
assert_eq!(skip_list.len(), 2);
skip_list.erase(&items[1].key).await;
assert_eq!(skip_list.len(), 1);
{
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[0].key, &items[0].value));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
skip_list.erase(&items[0].key).await;
assert_eq!(skip_list.len(), 0);
{
let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
assert!(iter.get().is_none());
}
}
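    // An added minimal sketch: `len()` should reflect the net item count across mutations;
    // replace_or_insert of an existing key erases then re-inserts, so the count is unchanged,
    // while replace_or_insert of a new key adds one.
    #[fasync::run_singlethreaded(test)]
    async fn test_len_tracks_replace_or_insert() {
        let skip_list = SkipListLayer::new(100);
        skip_list.insert(Item::new(TestKey(1), 1)).await;
        assert_eq!(skip_list.len(), 1);
        // Replacing an existing key is count-neutral.
        skip_list.replace_or_insert(Item::new(TestKey(1), 2)).await;
        assert_eq!(skip_list.len(), 1);
        // Inserting a key that isn't present yet adds one.
        skip_list.replace_or_insert(Item::new(TestKey(2), 2)).await;
        assert_eq!(skip_list.len(), 2);
    }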
// This test ends up being flaky on CQ. It is left here as it might be useful in case
// significant changes are made.
#[fasync::run_singlethreaded(test)]
#[ignore]
async fn test_seek_is_log_n_complexity() {
// Keep doubling up the number of items until it takes about 500ms to search and then go
// back and measure something that should, in theory, take about half that time.
let mut n = 100;
let mut loops = 0;
const TARGET_TIME: Duration = Duration::from_millis(500);
let time = loop {
let skip_list = SkipListLayer::new(n as usize);
for i in 0..n {
skip_list.insert(Item::new(TestKey(i), i)).await;
}
let start = Instant::now();
for i in 0..n {
skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
}
let elapsed = Instant::now() - start;
if elapsed > TARGET_TIME {
break elapsed;
}
n *= 2;
loops += 1;
};
let seek_count = n;
n >>= loops / 2; // This should, in theory, result in 50% seek time.
let skip_list = SkipListLayer::new(n as usize);
for i in 0..n {
skip_list.insert(Item::new(TestKey(i), i)).await;
}
let start = Instant::now();
for i in 0..seek_count {
skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
}
let elapsed = Instant::now() - start;
eprintln!(
"{} items: {}ms, {} items: {}ms",
seek_count,
time.as_millis(),
n,
elapsed.as_millis()
);
        // Experimental results show that typically we do a bit better than log(n), but here we
        // just check that the time we just measured is above 25% of the time we first measured;
        // the theory suggests it should be around 50%.
assert!(elapsed * 4 > time);
}
#[fasync::run_singlethreaded(test)]
async fn test_large_number_of_items() {
let item_count = 1000;
let skip_list = SkipListLayer::new(1000);
for i in 1..item_count {
skip_list.insert(Item::new(TestKey(i), 1)).await;
}
let mut iter = skip_list.seek(Bound::Included(&TestKey(item_count - 10))).await.unwrap();
for i in item_count - 10..item_count {
assert_eq!(iter.get().expect("missing item").key, &TestKey(i));
iter.advance().await.unwrap();
}
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
    async fn test_multiple_readers_allowed() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
skip_list.insert(items[1].clone()).await;
skip_list.insert(items[0].clone()).await;
// Create the first iterator and check the first item.
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[0].key, &items[0].value));
// Create a second iterator and check the first item.
let iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value, .. } = iter2.get().expect("missing item");
assert_eq!((key, value), (&items[0].key, &items[0].value));
// Now go back to the first iterator and check the second item.
iter.advance().await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[1].key, &items[1].value));
}
fn merge(
left: &'_ MergeLayerIterator<'_, TestKey, i32>,
right: &'_ MergeLayerIterator<'_, TestKey, i32>,
) -> MergeResult<TestKey, i32> {
MergeResult::Other {
emit: None,
left: Replace(Item::new((*left.key()).clone(), *left.value() + *right.value())),
right: Discard,
}
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_into() {
let skip_list = SkipListLayer::new(100);
skip_list.insert(Item::new(TestKey(1), 1)).await;
skip_list.merge_into(Item::new(TestKey(2), 2), &TestKey(1), merge).await;
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&TestKey(1), &3));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_two_inserts() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
{
let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded).await;
iter.insert(items[0].clone());
iter.insert(items[1].clone());
}
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[0].key, &items[0].value));
iter.advance().await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[1].key, &items[1].value));
}
#[fasync::run_singlethreaded(test)]
async fn test_erase_after_insert() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
skip_list.insert(items[1].clone()).await;
{
let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded).await;
iter.insert(items[0].clone());
iter.erase();
}
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[0].key, &items[0].value));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_insert_after_erase() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
skip_list.insert(items[1].clone()).await;
{
let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded).await;
iter.erase();
iter.insert(items[0].clone());
}
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[0].key, &items[0].value));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_insert_erase_insert() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
skip_list.insert(items[0].clone()).await;
{
let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded).await;
iter.insert(items[1].clone());
iter.erase();
iter.insert(items[2].clone());
}
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[1].key, &items[1].value));
iter.advance().await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[2].key, &items[2].value));
}
#[fasync::run_singlethreaded(test)]
async fn test_two_erase_erases() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
skip_list.insert(items[0].clone()).await;
skip_list.insert(items[1].clone()).await;
skip_list.insert(items[2].clone()).await;
{
let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded).await;
iter.erase();
iter.erase();
}
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[2].key, &items[2].value));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_readers_not_blocked_by_writers() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
skip_list.insert(items[1].clone()).await;
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[1].key, &items[1].value));
let mut iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter2.get().expect("missing item");
assert_eq!((key, value), (&items[1].key, &items[1].value));
join!(skip_list.insert(items[0].clone()), async {
loop {
let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, .. } = iter.get().expect("missing item");
if key == &items[0].key {
break;
}
}
iter.advance().await.unwrap();
assert!(iter.get().is_none());
std::mem::drop(iter);
iter2.advance().await.unwrap();
assert!(iter2.get().is_none());
std::mem::drop(iter2);
});
}
#[fasync::run(20, test)]
async fn test_many_readers_and_writers() {
let skip_list = SkipListLayer::new(100);
join_all(
(0..10)
.map(|i| {
let skip_list_clone = skip_list.clone();
fasync::Task::spawn(async move {
for j in 0..10 {
skip_list_clone.insert(Item::new(TestKey(i * 100 + j), i)).await;
}
})
})
.chain((0..10).map(|_| {
let skip_list_clone = skip_list.clone();
fasync::Task::spawn(async move {
for _ in 0..300 {
let mut iter =
skip_list_clone.seek(Bound::Unbounded).await.expect("seek failed");
let mut last_item: Option<TestKey> = None;
while let Some(item) = iter.get() {
if let Some(last) = last_item {
assert!(item.key > &last);
}
last_item = Some(item.key.clone());
iter.advance().await.expect("advance failed");
}
}
})
})),
)
.await;
}
#[fasync::run_singlethreaded(test)]
async fn test_insert_advance_erase() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
skip_list.insert(items[1].clone()).await;
skip_list.insert(items[2].clone()).await;
assert_eq!(skip_list.len(), 2);
{
let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded).await;
iter.insert(items[0].clone());
iter.advance().await.expect("advance failed");
iter.erase();
}
assert_eq!(skip_list.len(), 2);
let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[0].key, &items[0].value));
iter.advance().await.unwrap();
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[1].key, &items[1].value));
iter.advance().await.unwrap();
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_seek_excluded() {
let skip_list = SkipListLayer::new(100);
let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
skip_list.insert(items[0].clone()).await;
skip_list.insert(items[1].clone()).await;
let iter = skip_list.seek(Bound::Excluded(&items[0].key)).await.expect("seek failed");
let ItemRef { key, value, .. } = iter.get().expect("missing item");
assert_eq!((key, value), (&items[1].key, &items[1].value));
}
#[fasync::run(10, test)]
async fn test_insert_race() {
for _ in 0..1000 {
let skip_list = SkipListLayer::new(100);
skip_list.insert(Item::new(TestKey(2), 2)).await;
let skip_list_clone = skip_list.clone();
join!(
fasync::Task::spawn(async move {
skip_list_clone.insert(Item::new(TestKey(1), 1)).await;
}),
fasync::Task::spawn(async move {
let iter =
skip_list.seek(Bound::Included(&TestKey(2))).await.expect("seek failed");
match iter.get() {
Some(ItemRef { key: TestKey(2), .. }) => {}
                        result => panic!("unexpected item: {:?}", result),
}
})
);
}
}
#[fasync::run(10, test)]
async fn test_commit_and_wait_waits() {
let skip_list = SkipListLayer::new(100);
let (send, recv) = channel();
let writer_done = Mutex::new(false);
join!(
async {
recv.await.unwrap();
let mut iter =
SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded).await;
iter.insert(Item::new(TestKey(1), 2));
iter.commit_and_wait().await;
*writer_done.lock().unwrap() = true;
},
async {
let _iter = skip_list.seek(Bound::Unbounded).await.unwrap();
send.send(()).unwrap();
// This is a halting problem so all we can do is sleep.
fasync::Timer::new(Duration::from_millis(100)).await;
assert_eq!(*writer_done.lock().unwrap(), false);
}
);
}
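    // An added minimal sketch exercising the not-present path of `erase`: erasing a key that
    // sorts before an existing key takes the warn path and must leave the list untouched.
    #[fasync::run_singlethreaded(test)]
    async fn test_erase_missing_key_is_noop() {
        let skip_list = SkipListLayer::new(100);
        skip_list.insert(Item::new(TestKey(2), 2)).await;
        // The iterator seeks to TestKey(2), which doesn't match TestKey(1), so nothing is erased.
        skip_list.erase(&TestKey(1)).await;
        assert_eq!(skip_list.len(), 1);
        let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(2), &2));
    }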
}