blob: eea0847d35ea84238302ba3235a9fbc1e459917b [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
super::extent_record::{ExtentKey, ExtentValue},
super::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue},
crate::lsm_tree::{
merge::{
ItemOp::{Discard, Keep, Replace},
MergeLayerIterator, MergeResult,
},
types::Item,
},
};
fn merge_extents(
object_id: u64,
attribute_id: u64,
left: &MergeLayerIterator<'_, ObjectKey, ObjectValue>,
right: &MergeLayerIterator<'_, ObjectKey, ObjectValue>,
left_key: &ExtentKey,
right_key: &ExtentKey,
left_value: &ExtentValue,
right_value: &ExtentValue,
) -> MergeResult<ObjectKey, ObjectValue> {
// For now, we don't support/expect two extents with the same key in one layer.
// One reason you can't merge deleted extents in non-adjacent layers is because you
// can't decide which layer the merge should end up in.
//
// Consider this scenario:
// |X-X-X|
// |a-a-a|
// |X-X-X|
// If you merge the deleted extents here, you might end up with:
// |X-X-X-X-X-X|
// |a-a-a|
// which is clearly incorrect.
debug_assert!(right.layer_index != left.layer_index);
if let (ExtentValue::None, ExtentValue::None) = (left_value, right_value) {
if (left.layer_index as i32 - right.layer_index as i32).abs() == 1 {
// Two deletions in adjacent layers can be merged.
return merge_deleted_extents(
object_id,
attribute_id,
left_key,
right_key,
std::cmp::min(left.sequence(), right.sequence()),
);
}
}
if left_key.range.end <= right_key.range.start {
// Extents don't overlap.
return MergeResult::EmitLeft;
}
// The start of the left extent is <= the start of the right extent, due to merge key ordering.
//
// One of the extents has to win. The way we break this tie is by picking the extent from the
// newest layer (i.e. the layer with the lowest index).
//
// Generally, we'll be doing the following:
//
// Old |----------|
// New |----------|
//
// Turns into
//
// Emit |------|
// Old |--|
// New |----------|
if right.layer_index < left.layer_index {
// Right layer is newer.
debug_assert!(left_key.range.start < right_key.range.start);
return MergeResult::Other {
emit: Some(Item::new_with_sequence(
ObjectKey::extent(
object_id,
attribute_id,
left_key.range.start..right_key.range.start,
),
ObjectValue::Extent(left_value.shrunk(
left_key.range.end - left_key.range.start,
right_key.range.start - left_key.range.start,
)),
std::cmp::min(left.sequence(), right.sequence()),
)),
left: Replace(Item::new_with_sequence(
ObjectKey::extent(
object_id,
attribute_id,
right_key.range.start..left_key.range.end,
),
ObjectValue::Extent(left_value.offset_by(
right_key.range.start - left_key.range.start,
left_key.range.end - left_key.range.start,
)),
std::cmp::min(left.sequence(), right.sequence()),
)),
right: Keep,
};
}
// Left layer is newer.
if left_key.range.end >= right_key.range.end {
// The left key entirely contains the right key.
return MergeResult::Other { emit: None, left: Keep, right: Discard };
}
MergeResult::Other {
emit: None,
left: Keep,
right: Replace(Item::new_with_sequence(
ObjectKey::extent(object_id, attribute_id, left_key.range.end..right_key.range.end),
ObjectValue::Extent(right_value.offset_by(
left_key.range.end - right_key.range.start,
right_key.range.end - right_key.range.start,
)),
std::cmp::min(left.sequence(), right.sequence()),
)),
}
}
// Assumes that the two extents to be merged are on adjacent layers (i.e. layers N, N+1).
fn merge_deleted_extents(
object_id: u64,
attribute_id: u64,
left_key: &ExtentKey,
right_key: &ExtentKey,
sequence: u64,
) -> MergeResult<ObjectKey, ObjectValue> {
if left_key.range.end < right_key.range.start {
// The extents are not adjacent or overlapping.
return MergeResult::EmitLeft;
}
// Both of these are deleted extents which are either adjacent or overlapping, which means
// we can coalece the records.
if left_key.range.end >= right_key.range.end {
// The left deletion eclipses the right, so just keep the left.
return MergeResult::Other { emit: None, left: Keep, right: Discard };
}
MergeResult::Other {
emit: None,
left: Discard,
right: Replace(Item::new_with_sequence(
ObjectKey::extent(object_id, attribute_id, left_key.range.start..right_key.range.end),
ObjectValue::deleted_extent(),
sequence,
)),
}
}
/// Merge function for items in the object store.
///
/// The most interesting behaviour in this merge function is how extents are handled. Since extents
/// can overlap and replace one another, the merge function generally builds up the most
/// recent view of the extents in the tree, so that the output of a full merge contains no
/// overlapping extents. You can imagine looking down at the extents from the top-most layer.
///
/// A brief example:
///
/// Layer 0 |a-a-a-a| |b-b-b|
/// Layer 1 |c-c-c-c-c|
/// Layer 2 |d-d-d-d|
///
/// Merged |a-a-a-a|c| |b-b-b|d-d-d|
///
/// Adjacent or overlapping extent deletions in two adjacent layers can be merged into single
/// records (since they do not have a physical offset, so there's no need to keep the physical
/// extents contiguous). We can't merge deletions from non-adjacent layers, since that would
/// cause issues in situations like this:
///
/// Layer 0 |X-X-X|
/// Layer 1 |a-a-a-a-a-a|
/// Layer 2 |X-X-X|
///
/// Merging the two deletions in layers 0 and 2 would either result in the middle extent being
/// fully occluded or not at all (depending on whether we replaced on the left or right layer).
pub fn merge(
left: &MergeLayerIterator<'_, ObjectKey, ObjectValue>,
right: &MergeLayerIterator<'_, ObjectKey, ObjectValue>,
) -> MergeResult<ObjectKey, ObjectValue> {
if left.key().object_id != right.key().object_id {
return MergeResult::EmitLeft;
}
match (left.key(), right.key(), left.value(), right.value()) {
(
ObjectKey {
object_id,
data: ObjectKeyData::Attribute(left_attr_id, AttributeKey::Extent(left_extent_key)),
},
ObjectKey {
object_id: _,
data:
ObjectKeyData::Attribute(right_attr_id, AttributeKey::Extent(right_extent_key)),
},
ObjectValue::Extent(left_extent),
ObjectValue::Extent(right_extent),
) if left_attr_id == right_attr_id => {
return merge_extents(
*object_id,
*left_attr_id,
left,
right,
left_extent_key,
right_extent_key,
left_extent,
right_extent,
);
}
// Tombstones (ObjectKeyData::Object) compare before others, so always appear on left.
(ObjectKey { data: ObjectKeyData::Object, .. }, _, ObjectValue::None, _) => {
debug_assert!(left.layer_index < right.layer_index);
MergeResult::Other { emit: None, left: Keep, right: Discard }
}
// Note that identical keys are sorted by layer_index, so left is always newer.
(left_key, right_key, _, _) if left_key == right_key => {
debug_assert!(left.layer_index < right.layer_index);
MergeResult::Other { emit: None, left: Keep, right: Discard }
}
_ => MergeResult::EmitLeft,
}
}
#[cfg(test)]
mod tests {
use {
super::merge,
crate::{
lsm_tree::{
types::{Item, LayerIterator, MergeableKey, Value},
LSMTree,
},
object_store::extent_record::{Checksums, ExtentValue},
object_store::object_record::{AttributeKey, ObjectKey, ObjectValue, Timestamp},
},
anyhow::Error,
fuchsia_async as fasync,
std::ops::Bound,
};
async fn test_merge<K: MergeableKey, V: Value + PartialEq>(
tree: &LSMTree<K, V>,
layer0: &[Item<K, V>],
layer1: &[Item<K, V>],
expected: &[Item<K, V>],
) {
for item in layer1 {
tree.insert(item.clone()).await;
}
tree.seal().await;
for item in layer0 {
tree.insert(item.clone()).await;
}
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
for e in expected {
assert_eq!(iter.get().expect("get failed"), e.as_item_ref());
iter.advance().await.expect("advance failed");
}
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_extents_non_overlapping() -> Result<(), Error> {
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..512),
ObjectValue::Extent(ExtentValue::new(0)),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 512..1024),
ObjectValue::Extent(ExtentValue::new(16384)),
))
.await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
iter.advance().await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..1024));
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_extents_rewrite_right() -> Result<(), Error> {
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..1024),
ObjectValue::Extent(ExtentValue::new(0)),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 512..1024),
ObjectValue::Extent(ExtentValue::new(16384)),
))
.await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
assert_eq!(iter.get().unwrap().value, &ObjectValue::Extent(ExtentValue::new(0)));
iter.advance().await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..1024));
assert_eq!(iter.get().unwrap().value, &ObjectValue::Extent(ExtentValue::new(16384)));
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_extents_rewrite_left() -> Result<(), Error> {
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..1024),
ObjectValue::Extent(ExtentValue::with_checksum(0, Checksums::Fletcher(vec![1, 2]))),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..512),
ObjectValue::Extent(ExtentValue::with_checksum(16384, Checksums::Fletcher(vec![3]))),
))
.await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
assert_eq!(
iter.get().unwrap().value,
&ObjectValue::Extent(ExtentValue::with_checksum(16384, Checksums::Fletcher(vec![3])))
);
iter.advance().await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..1024));
assert_eq!(
iter.get().unwrap().value,
&ObjectValue::Extent(ExtentValue::with_checksum(512, Checksums::Fletcher(vec![2])))
);
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_extents_rewrite_middle() -> Result<(), Error> {
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..2048),
ObjectValue::Extent(ExtentValue::with_checksum(
0,
Checksums::Fletcher(vec![1, 2, 3, 4]),
)),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 1024..1536),
ObjectValue::Extent(ExtentValue::with_checksum(16384, Checksums::Fletcher(vec![5]))),
))
.await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..1024));
assert_eq!(
iter.get().unwrap().value,
&ObjectValue::Extent(ExtentValue::with_checksum(0, Checksums::Fletcher(vec![1, 2])))
);
iter.advance().await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 1024..1536));
assert_eq!(
iter.get().unwrap().value,
&ObjectValue::Extent(ExtentValue::with_checksum(16384, Checksums::Fletcher(vec![5])))
);
iter.advance().await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 1536..2048));
assert_eq!(
iter.get().unwrap().value,
&ObjectValue::Extent(ExtentValue::with_checksum(1536, Checksums::Fletcher(vec![4])))
);
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_extents_rewrite_eclipses() -> Result<(), Error> {
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 1024..1536),
ObjectValue::Extent(ExtentValue::new(0)),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..2048),
ObjectValue::Extent(ExtentValue::new(16384)),
))
.await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..2048));
assert_eq!(iter.get().unwrap().value, &ObjectValue::Extent(ExtentValue::new(16384)));
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_extents_delete_left() -> Result<(), Error> {
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..1024),
ObjectValue::Extent(ExtentValue::new(0)),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..512),
ObjectValue::deleted_extent(),
))
.await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
iter.advance().await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..1024));
assert_eq!(iter.get().unwrap().value, &ObjectValue::Extent(ExtentValue::new(512)));
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_extents_delete_right() -> Result<(), Error> {
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..1024),
ObjectValue::Extent(ExtentValue::new(0)),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 512..1024),
ObjectValue::deleted_extent(),
))
.await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
assert_eq!(iter.get().unwrap().value, &ObjectValue::Extent(ExtentValue::new(0)));
iter.advance().await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..1024));
assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_extents_delete_middle() -> Result<(), Error> {
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..2048),
ObjectValue::Extent(ExtentValue::new(0)),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 1024..1536),
ObjectValue::deleted_extent(),
))
.await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..1024));
assert_eq!(iter.get().unwrap().value, &ObjectValue::Extent(ExtentValue::new(0)));
iter.advance().await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 1024..1536));
assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
iter.advance().await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 1536..2048));
assert_eq!(iter.get().unwrap().value, &ObjectValue::Extent(ExtentValue::new(1536)));
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_extents_delete_eclipses() -> Result<(), Error> {
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 1024..1536),
ObjectValue::Extent(ExtentValue::new(0)),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..2048),
ObjectValue::deleted_extent(),
))
.await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..2048));
assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_deleted_extents_new_layer_joins_two_deletions() -> Result<(), Error> {
// Old layer: [----] [----]
// New layer: [----]
// Merged: [--------------]
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..512),
ObjectValue::deleted_extent(),
))
.await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 1024..1536),
ObjectValue::deleted_extent(),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 512..1024),
ObjectValue::deleted_extent(),
))
.await;
tree.seal().await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..1536));
assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_deleted_extents_new_layer_joined_by_old_deletion() -> Result<(), Error> {
// Old layer: [----]
// New layer: [----] [----]
// Merged: [--------------]
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 512..1024),
ObjectValue::deleted_extent(),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..512),
ObjectValue::deleted_extent(),
))
.await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 1024..1536),
ObjectValue::deleted_extent(),
))
.await;
tree.seal().await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..1536));
assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_deleted_extents_overlapping_newest_on_right() -> Result<(), Error> {
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..1024),
ObjectValue::deleted_extent(),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 512..1536),
ObjectValue::deleted_extent(),
))
.await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..1536));
assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_deleted_extents_overlapping_newest_on_left() -> Result<(), Error> {
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 512..1536),
ObjectValue::deleted_extent(),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..1024),
ObjectValue::deleted_extent(),
))
.await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..1536));
assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_deleted_extents_new_layer_contained_in_old() -> Result<(), Error> {
// Old layer: [--------------]
// New layer: [----]
// Merged: [--------------]
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..1536),
ObjectValue::deleted_extent(),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 512..1024),
ObjectValue::deleted_extent(),
))
.await;
tree.seal().await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..1536));
assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_deleted_extents_new_layer_eclipses_old() -> Result<(), Error> {
// Old layer: [----]
// New layer: [--------------]
// Merged: [--------------]
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 512..1024),
ObjectValue::deleted_extent(),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..1536),
ObjectValue::deleted_extent(),
))
.await;
tree.seal().await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..1536));
assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_deleted_extents_does_not_coalesce_if_not_adjacent_layers(
) -> Result<(), Error> {
// Layer 0: [XXXXX]
// Layer 1: [--------------]
// Layer 2: [XXXXXXXX]
// Merged: [XXXXX|--------]
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 512..1024),
ObjectValue::deleted_extent(),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..1024),
ObjectValue::Extent(ExtentValue::new(0)),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..512),
ObjectValue::deleted_extent(),
))
.await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
iter.advance().await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..1024));
assert_eq!(iter.get().unwrap().value, &ObjectValue::Extent(ExtentValue::new(512)));
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_deleted_extents_does_not_coalesce_if_not_adjacent_deletions(
) -> Result<(), Error> {
// Layer 0: [XXXXX|--------]
// Layer 1: [XXXXX]
// Merged: [XXXXX|--------]
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 1024..1536),
ObjectValue::Extent(ExtentValue::new(0)),
))
.await;
tree.seal().await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..512),
ObjectValue::deleted_extent(),
))
.await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 512..1536),
ObjectValue::Extent(ExtentValue::new(0)),
))
.await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
iter.advance().await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..1536));
assert_eq!(iter.get().unwrap().value, &ObjectValue::Extent(ExtentValue::new(0)));
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_deleted_extent_into_overwrites_extents() -> Result<(), Error> {
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..1024),
ObjectValue::Extent(ExtentValue::new(0)),
))
.await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 1024..2048),
ObjectValue::Extent(ExtentValue::new(16384)),
))
.await;
let key = ObjectKey::extent(object_id, attr_id, 512..1536);
tree.merge_into(
Item::new(key.clone(), ObjectValue::deleted_extent()),
&key.key_for_merge_into(),
)
.await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
assert_eq!(iter.get().unwrap().value, &ObjectValue::Extent(ExtentValue::new(0)));
iter.advance().await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..1536));
assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
iter.advance().await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 1536..2048));
assert_eq!(iter.get().unwrap().value, &ObjectValue::Extent(ExtentValue::new(16896)));
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_deleted_extent_into_merges_with_other_deletions() -> Result<(), Error> {
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge);
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 0..1024),
ObjectValue::deleted_extent(),
))
.await;
tree.insert(Item::new(
ObjectKey::extent(object_id, attr_id, 1024..2048),
ObjectValue::deleted_extent(),
))
.await;
let key = ObjectKey::extent(object_id, attr_id, 512..1536);
tree.merge_into(
Item::new(key.clone(), ObjectValue::deleted_extent()),
&key.key_for_merge_into(),
)
.await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..2048));
assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_size_records() {
let left =
&[Item::new(ObjectKey::attribute(1, 0, AttributeKey::Size), ObjectValue::attribute(5))];
let right = &[Item::new(
ObjectKey::attribute(1, 0, AttributeKey::Size),
ObjectValue::attribute(10),
)];
let tree = LSMTree::new(merge);
test_merge(&tree, left, right, left).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_different_attributes_not_merged() {
let left =
Item::new(ObjectKey::attribute(1, 0, AttributeKey::Size), ObjectValue::attribute(5));
let right =
Item::new(ObjectKey::attribute(1, 1, AttributeKey::Size), ObjectValue::attribute(10));
let tree = LSMTree::new(merge);
test_merge(&tree, &[left.clone()], &[right.clone()], &[left, right]).await;
let left =
Item::new(ObjectKey::extent(1, 0, 0..100), ObjectValue::Extent(ExtentValue::new(0)));
let right =
Item::new(ObjectKey::extent(1, 1, 0..100), ObjectValue::Extent(ExtentValue::new(1)));
let tree = LSMTree::new(merge);
test_merge(&tree, &[left.clone()], &[right.clone()], &[left, right]).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_tombstone_discards_all_other_records() {
let tombstone = Item::new(ObjectKey::object(1), ObjectValue::None);
let other_object = Item::new(
ObjectKey::object(2),
ObjectValue::file(1, 0, Timestamp::default(), Timestamp::default()),
);
let tree = LSMTree::new(merge);
test_merge(
&tree,
&[tombstone.clone()],
&[
Item::new(
ObjectKey::object(1),
ObjectValue::file(1, 100, Timestamp::default(), Timestamp::default()),
),
Item::new(
ObjectKey::attribute(1, 0, AttributeKey::Size),
ObjectValue::attribute(100),
),
other_object.clone(),
],
&[tombstone, other_object],
)
.await;
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_preserves_sequences() -> Result<(), Error> {
let object_id = 0;
let attr_id = 0;
let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge);
tree.insert(Item {
key: ObjectKey::extent(object_id, attr_id, 0..1024),
value: ObjectValue::Extent(ExtentValue::new(0u64)),
sequence: 1u64,
})
.await;
tree.seal().await;
tree.insert(Item {
key: ObjectKey::extent(object_id, attr_id, 0..512),
value: ObjectValue::deleted_extent(),
sequence: 2u64,
})
.await;
tree.insert(Item {
key: ObjectKey::extent(object_id, attr_id, 1536..2048),
value: ObjectValue::Extent(ExtentValue::new(1536)),
sequence: 3u64,
})
.await;
tree.insert(Item {
key: ObjectKey::extent(object_id, attr_id, 768..1024),
value: ObjectValue::Extent(ExtentValue::new(12345)),
sequence: 4u64,
})
.await;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(std::ops::Bound::Unbounded).await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
assert_eq!(iter.get().unwrap().sequence, 2u64);
iter.advance().await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..768));
assert_eq!(iter.get().unwrap().value, &ObjectValue::Extent(ExtentValue::new(512)));
assert_eq!(iter.get().unwrap().sequence, 1u64);
iter.advance().await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 768..1024));
assert_eq!(iter.get().unwrap().value, &ObjectValue::Extent(ExtentValue::new(12345)));
assert_eq!(iter.get().unwrap().sequence, 4u64);
iter.advance().await?;
assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 1536..2048));
assert_eq!(iter.get().unwrap().value, &ObjectValue::Extent(ExtentValue::new(1536)));
assert_eq!(iter.get().unwrap().sequence, 3u64);
iter.advance().await?;
assert!(iter.get().is_none());
Ok(())
}
}