// blob: d0586b93e8dc3004298d7da976af2613b784f256 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
super::record::{
AttributeKey, ExtentKey, ExtentValue, ObjectItem, ObjectKey, ObjectKeyData, ObjectValue,
},
crate::lsm_tree::merge::{
ItemOp::{Discard, Keep, Replace},
MergeLayerIterator, MergeResult,
},
};
/// Merges two extent records of the same object attribute that come from different layers.
///
/// `left_layer` and `right_layer` are the indices of the layers holding each record; the layer
/// with the lowest index is the newest, and the newest record wins wherever the two overlap.
/// The possible outcomes are:
///
///  * The records do not overlap: the left record is emitted unchanged.
///  * The right record is newer: the part of the left record that precedes the right record is
///    emitted, the left record is replaced with its remainder (`right.start..left.end`), and the
///    right record is kept.
///  * The left record is newer: the left record is kept, and the right record is discarded if the
///    left record covers it entirely, or otherwise replaced with the part that extends past the
///    end of the left record (`left.end..right.end`).
///
/// Whenever the front of a record is trimmed, its value is advanced by the number of bytes
/// trimmed via `ExtentValue::offset_by`.
fn merge_extents(
    object_id: u64,
    attribute_id: u64,
    left_layer: u16,
    right_layer: u16,
    left_key: &ExtentKey,
    right_key: &ExtentKey,
    left_value: &ExtentValue,
    right_value: &ExtentValue,
) -> MergeResult<ObjectKey, ObjectValue> {
    // For now, we don't support/expect two extents with the same key in one layer.
    debug_assert!(right_layer != left_layer);

    // TODO(jfsulliv): once we are at the base layer, we should be deleting records mapping to
    // deleted extents. Otherwise they'll stick around forever.

    let left_range = &left_key.range;
    let right_range = &right_key.range;

    if left_range.end <= right_range.start {
        // Extents don't overlap.
        return MergeResult::EmitLeft;
    }

    // The start of the left extent is <= the start of the right extent, due to merge key ordering.
    //
    // One of the extents has to win. The way we break this tie is by picking the extent from the
    // newest layer (i.e. the layer with the lowest index).
    //
    // Generally, we'll be doing the following:
    //
    //   Old  |----------|
    //   New        |----------|
    //
    // Turns into
    //
    //   Emit |-----|
    //   Old        |----|
    //   New        |----------|
    if right_layer < left_layer {
        // Right layer is newer: split the left extent where the right extent begins.
        debug_assert!(left_range.start < right_range.start);
        let split = right_range.start;
        return MergeResult::Other {
            emit: Some(ObjectItem::new(
                ObjectKey::extent(object_id, attribute_id, left_range.start..split),
                ObjectValue::Extent(*left_value),
            )),
            left: Replace(ObjectItem::new(
                ObjectKey::extent(object_id, attribute_id, split..left_range.end),
                // The remainder starts `split - left.start` bytes into the original extent.
                ObjectValue::Extent(left_value.offset_by(split - left_range.start)),
            )),
            right: Keep,
        };
    }

    // Left layer is newer.
    let right = if left_range.end >= right_range.end {
        // The left key entirely contains the right key.
        Discard
    } else {
        // Only the tail of the right extent, past the end of the left extent, survives.
        Replace(ObjectItem::new(
            ObjectKey::extent(object_id, attribute_id, left_range.end..right_range.end),
            ObjectValue::Extent(right_value.offset_by(left_range.end - right_range.start)),
        ))
    };
    MergeResult::Other { emit: None, left: Keep, right }
}
/// Coalesces two deleted-extent records of the same object attribute where possible.
///
/// Assumes that the two extents to be merged are on adjacent layers (i.e. layers N, N+1).
///
/// The possible outcomes are:
///
///  * There is a gap between the two records: the left record is emitted unchanged.
///  * The left record reaches at least as far as the right record: the left record is kept and the
///    right record is discarded.
///  * Otherwise the left record is discarded and the right record is replaced with a single
///    deleted extent spanning `left.start..right.end`.
fn merge_deleted_extents(
    object_id: u64,
    attribute_id: u64,
    left_key: &ExtentKey,
    right_key: &ExtentKey,
) -> MergeResult<ObjectKey, ObjectValue> {
    let (left_range, right_range) = (&left_key.range, &right_key.range);

    if left_range.end < right_range.start {
        // The extents are not adjacent or overlapping.
        return MergeResult::EmitLeft;
    }

    // Both of these are deleted extents which are either adjacent or overlapping, which means
    // we can coalesce the records.
    if left_range.end >= right_range.end {
        // The left deletion eclipses the right, so just keep the left.
        return MergeResult::Other { emit: None, left: Keep, right: Discard };
    }

    // The right deletion extends past the left one; fold the left into the right.
    MergeResult::Other {
        emit: None,
        left: Discard,
        right: Replace(ObjectItem::new(
            ObjectKey::extent(object_id, attribute_id, left_range.start..right_range.end),
            ObjectValue::deleted_extent(),
        )),
    }
}
/// Merge function for items in the object store.
///
/// Items that belong to different objects are never combined; the left item is simply emitted.
/// For two items of the same object:
///
///  * Two extents of the same attribute are resolved against each other (see below).
///  * A tombstone on the left discards the item on the right.
///  * If both keys are equal, the left item is kept and the right item is discarded.
///  * In every other case the left item is emitted unchanged.
///
/// The most interesting behaviour is how extents are handled. Since extents can overlap and
/// replace one another, the merge function builds up the most recent view of the extents in the
/// tree, so that the output of a full merge contains no overlapping extents. You can imagine
/// looking down at the extents from the top-most layer.
///
/// A brief example:
///
/// ```text
/// Layer 0  |a-a-a-a|     |b-b-b|
/// Layer 1        |c-c-c-c-c|
/// Layer 2                    |d-d-d-d|
///
/// Merged   |a-a-a-a|c-c-c|b-b-b|d-d-d|
/// ```
///
/// Adjacent or overlapping extent deletions in two adjacent layers can be merged into single
/// records (since they do not have a physical offset, so there's no need to keep the physical
/// extents contiguous). We can't merge deletions from non-adjacent layers, since that would
/// cause issues in situations like this:
///
/// ```text
/// Layer 0  |X-X-X|
/// Layer 1  |a-a-a-a-a-a|
/// Layer 2        |X-X-X|
/// ```
///
/// Merging the two deletions in layers 0 and 2 would either result in the middle extent being
/// fully occluded or not at all (depending on whether we replaced on the left or right layer).
///
/// TODO(jfsulliv): At the base layer, we should prune deleted extents completely.
pub fn merge(
    left: &MergeLayerIterator<'_, ObjectKey, ObjectValue>,
    right: &MergeLayerIterator<'_, ObjectKey, ObjectValue>,
) -> MergeResult<ObjectKey, ObjectValue> {
    // Items of different objects do not interact.
    if left.key().object_id != right.key().object_id {
        return MergeResult::EmitLeft;
    }

    match (left.key(), right.key(), left.value(), right.value()) {
        // Two extents belonging to the same attribute.
        (
            ObjectKey {
                object_id,
                data: ObjectKeyData::Attribute(left_attr_id, AttributeKey::Extent(left_extent_key)),
            },
            ObjectKey {
                data:
                    ObjectKeyData::Attribute(right_attr_id, AttributeKey::Extent(right_extent_key)),
                ..
            },
            ObjectValue::Extent(left_extent),
            ObjectValue::Extent(right_extent),
        ) if left_attr_id == right_attr_id => {
            // An extent without a device offset is a deletion.
            let both_deleted =
                left_extent.device_offset.is_none() && right_extent.device_offset.is_none();
            let layer_gap = (i32::from(left.layer_index) - i32::from(right.layer_index)).abs();

            if both_deleted && layer_gap == 1 {
                // Two deletions in adjacent layers can be merged.
                merge_deleted_extents(*object_id, *left_attr_id, left_extent_key, right_extent_key)
            } else {
                merge_extents(
                    *object_id,
                    *left_attr_id,
                    left.layer_index,
                    right.layer_index,
                    left_extent_key,
                    right_extent_key,
                    left_extent,
                    right_extent,
                )
            }
        }
        // A tombstone on the left discards whatever is on the right.
        (ObjectKey { data: ObjectKeyData::Tombstone, .. }, ..) => {
            MergeResult::Other { emit: None, left: Keep, right: Discard }
        }
        // Identical keys: the left item replaces the right one.
        (left_key, right_key, ..) if left_key == right_key => {
            MergeResult::Other { emit: None, left: Keep, right: Discard }
        }
        // Unrelated items: nothing to merge.
        _ => MergeResult::EmitLeft,
    }
}
#[cfg(test)]
mod tests {
    use {
        super::merge,
        crate::{
            lsm_tree::{
                types::{Item, LayerIterator},
                LSMTree,
            },
            object_store::record::{ObjectDescriptor, ObjectItem, ObjectKey, ObjectValue},
        },
        fuchsia_async as fasync,
        std::ops::{Bound, Range},
    };

    // All of the extent tests operate on a single attribute of a single object.
    const OBJECT_ID: u64 = 0;
    const ATTR_ID: u64 = 0;

    // NOTE(review): the two helpers below assume `ObjectKey::extent` takes a `Range<u64>` and
    // `ObjectValue::extent` takes a `u64` -- confirm against record.rs.

    /// Returns an extent record for `range` whose value is `ObjectValue::extent(device_offset)`.
    fn extent(range: Range<u64>, device_offset: u64) -> ObjectItem {
        Item::new(ObjectKey::extent(OBJECT_ID, ATTR_ID, range), ObjectValue::extent(device_offset))
    }

    /// Returns a deleted-extent record for `range`.
    fn deleted(range: Range<u64>) -> ObjectItem {
        Item::new(ObjectKey::extent(OBJECT_ID, ATTR_ID, range), ObjectValue::deleted_extent())
    }

    /// Seeks a merger over all of `tree`'s layers to the start and asserts that it yields exactly
    /// the items in `expected` (keys and values), in order, and nothing more.
    async fn check_merged(tree: &LSMTree<ObjectKey, ObjectValue>, expected: &[ObjectItem]) {
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
        for e in expected {
            assert_eq!(iter.get().expect("get failed"), e.as_item_ref());
            iter.advance().await.expect("advance failed");
        }
        assert!(iter.get().is_none());
    }

    /// Builds a tree in which `layer1` is the older (sealed) layer and `layer0` is the newer
    /// layer, then asserts that merging the layers yields `expected`.
    async fn test_merge(layer0: &[ObjectItem], layer1: &[ObjectItem], expected: &[ObjectItem]) {
        let tree = LSMTree::new(merge);
        for item in layer1 {
            tree.insert(item.clone()).await;
        }
        tree.seal();
        for item in layer0 {
            tree.insert(item.clone()).await;
        }
        check_merged(&tree, expected).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_extents_non_overlapping() {
        test_merge(
            &[extent(512..1024, 16384)],
            &[extent(0..512, 0)],
            &[extent(0..512, 0), extent(512..1024, 16384)],
        )
        .await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_extents_rewrite_right() {
        test_merge(
            &[extent(512..1024, 16384)],
            &[extent(0..1024, 0)],
            &[extent(0..512, 0), extent(512..1024, 16384)],
        )
        .await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_extents_rewrite_left() {
        test_merge(
            &[extent(0..512, 16384)],
            &[extent(0..1024, 0)],
            &[extent(0..512, 16384), extent(512..1024, 512)],
        )
        .await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_extents_rewrite_middle() {
        test_merge(
            &[extent(1024..1536, 16384)],
            &[extent(0..2048, 0)],
            &[extent(0..1024, 0), extent(1024..1536, 16384), extent(1536..2048, 1536)],
        )
        .await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_extents_rewrite_eclipses() {
        test_merge(&[extent(0..2048, 16384)], &[extent(1024..1536, 0)], &[extent(0..2048, 16384)])
            .await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_extents_delete_left() {
        test_merge(
            &[deleted(0..512)],
            &[extent(0..1024, 0)],
            &[deleted(0..512), extent(512..1024, 512)],
        )
        .await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_extents_delete_right() {
        test_merge(
            &[deleted(512..1024)],
            &[extent(0..1024, 0)],
            &[extent(0..512, 0), deleted(512..1024)],
        )
        .await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_extents_delete_middle() {
        test_merge(
            &[deleted(1024..1536)],
            &[extent(0..2048, 0)],
            &[extent(0..1024, 0), deleted(1024..1536), extent(1536..2048, 1536)],
        )
        .await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_extents_delete_eclipses() {
        test_merge(&[deleted(0..2048)], &[extent(1024..1536, 0)], &[deleted(0..2048)]).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_deleted_extents_new_layer_joins_two_deletions() {
        // Old layer: [----]      [----]
        // New layer:       [----]
        // Merged:    [----------------]
        let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge);
        tree.insert(deleted(0..512)).await;
        tree.insert(deleted(1024..1536)).await;
        tree.seal();
        tree.insert(deleted(512..1024)).await;
        tree.seal();
        check_merged(&tree, &[deleted(0..1536)]).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_deleted_extents_new_layer_joined_by_old_deletion() {
        // Old layer:       [----]
        // New layer: [----]      [----]
        // Merged:    [----------------]
        let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge);
        tree.insert(deleted(512..1024)).await;
        tree.seal();
        tree.insert(deleted(0..512)).await;
        tree.insert(deleted(1024..1536)).await;
        tree.seal();
        check_merged(&tree, &[deleted(0..1536)]).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_deleted_extents_overlapping_newest_on_right() {
        test_merge(&[deleted(512..1536)], &[deleted(0..1024)], &[deleted(0..1536)]).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_deleted_extents_overlapping_newest_on_left() {
        test_merge(&[deleted(0..1024)], &[deleted(512..1536)], &[deleted(0..1536)]).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_deleted_extents_new_layer_contained_in_old() {
        // Old layer: [----------------]
        // New layer:       [----]
        // Merged:    [----------------]
        let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge);
        tree.insert(deleted(0..1536)).await;
        tree.seal();
        tree.insert(deleted(512..1024)).await;
        tree.seal();
        check_merged(&tree, &[deleted(0..1536)]).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_deleted_extents_new_layer_eclipses_old() {
        // Old layer:       [----]
        // New layer: [----------------]
        // Merged:    [----------------]
        let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge);
        tree.insert(deleted(512..1024)).await;
        tree.seal();
        tree.insert(deleted(0..1536)).await;
        tree.seal();
        check_merged(&tree, &[deleted(0..1536)]).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_deleted_extents_does_not_coalesce_if_not_adjacent_layers() {
        // X = deleted extent, - = extent with data.
        //
        // Layer 0:  [XXXXX]
        // Layer 1:  [-----------]
        // Layer 2:        [XXXXX]
        // Merged:   [XXXXX|-----]
        let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge);
        tree.insert(deleted(512..1024)).await;
        tree.seal();
        tree.insert(extent(0..1024, 0)).await;
        tree.seal();
        tree.insert(deleted(0..512)).await;
        check_merged(&tree, &[deleted(0..512), extent(512..1024, 512)]).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_deleted_extents_does_not_coalesce_if_not_adjacent_deletions() {
        // X = deleted extent, - = extent with data.
        //
        // Layer 0:  [XXXXX|-----------]
        // Layer 1:              [XXXXX]
        // Merged:   [XXXXX|-----------]
        //
        // The old layer must hold a *deletion* so that the test involves two deletions that are
        // neither adjacent nor overlapping.
        test_merge(
            &[deleted(0..512), extent(512..1536, 0)],
            &[deleted(1024..1536)],
            &[deleted(0..512), extent(512..1536, 0)],
        )
        .await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_deleted_extent_into_overwrites_extents() {
        let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge);
        tree.insert(extent(0..1024, 0)).await;
        tree.insert(extent(1024..2048, 16384)).await;

        let key = ObjectKey::extent(OBJECT_ID, ATTR_ID, 512..1536);
        tree.merge_into(
            Item::new(key.clone(), ObjectValue::deleted_extent()),
            &key.key_for_merge_into(),
        )
        .await;

        check_merged(&tree, &[extent(0..512, 0), deleted(512..1536), extent(1536..2048, 16896)])
            .await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_deleted_extent_into_merges_with_other_deletions() {
        let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge);
        tree.insert(deleted(0..1024)).await;
        tree.insert(deleted(1024..2048)).await;

        let key = ObjectKey::extent(OBJECT_ID, ATTR_ID, 512..1536);
        tree.merge_into(
            Item::new(key.clone(), ObjectValue::deleted_extent()),
            &key.key_for_merge_into(),
        )
        .await;

        check_merged(&tree, &[deleted(0..2048)]).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_size_records() {
        let left = &[Item::new(ObjectKey::attribute(1, 0), ObjectValue::attribute(5))];
        let right = &[Item::new(ObjectKey::attribute(1, 0), ObjectValue::attribute(10))];
        test_merge(left, right, left).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_different_attributes_not_merged() {
        let left = Item::new(ObjectKey::attribute(1, 0), ObjectValue::attribute(5));
        let right = Item::new(ObjectKey::attribute(1, 1), ObjectValue::attribute(10));
        test_merge(&[left.clone()], &[right.clone()], &[left, right]).await;

        let left = Item::new(ObjectKey::extent(1, 0, 0..100), ObjectValue::extent(0));
        let right = Item::new(ObjectKey::extent(1, 1, 0..100), ObjectValue::extent(1));
        test_merge(&[left.clone()], &[right.clone()], &[left, right]).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_tombstone_discards_all_other_records() {
        let tombstone = Item::new(ObjectKey::tombstone(1), ObjectValue::None);
        let other_object =
            Item::new(ObjectKey::object(2), ObjectValue::object(ObjectDescriptor::File, 1));
        test_merge(
            &[tombstone.clone()],
            &[
                Item::new(ObjectKey::object(1), ObjectValue::object(ObjectDescriptor::File, 1)),
                Item::new(ObjectKey::attribute(1, 0), ObjectValue::attribute(100)),
                Item::new(ObjectKey::extent(1, 0, 0..100), ObjectValue::extent(5000)),
                other_object.clone(),
            ],
            &[tombstone, other_object],
        )
        .await;
    }
}