blob: 4a29a737be92309a636e442822050b4dbd246146 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
lsm_tree::{
merge::{
ItemOp::{Discard, Keep, Replace},
MergeLayerIterator, MergeResult,
},
types::{Item, LayerIterator},
},
object_store::allocator::{AllocatorKey, AllocatorValue},
},
anyhow::Error,
std::collections::HashSet,
};
pub fn merge(
left: &MergeLayerIterator<'_, AllocatorKey, AllocatorValue>,
right: &MergeLayerIterator<'_, AllocatorKey, AllocatorValue>,
) -> MergeResult<AllocatorKey, AllocatorValue> {
// Wherever Replace is used below, it must not extend the *end* of the range for whichever item
// is returned i.e. if replacing the left item, replacement.end <= left.end because otherwise we
// might not merge records that come after that end point because the merger won't merge records
// in the same layer
/* Case 1: Disjoint
* L: |------------|
* R: |-----------|
*/
if left.key().device_range.end < right.key().device_range.start {
return MergeResult::EmitLeft;
}
/* Case 2: Touching
* L: |------------|
* R: |-----------|
*/
if left.key().device_range.end == right.key().device_range.start {
// We can only merge the range if the values are an exact match.
if *left.value() == *right.value() {
return MergeResult::Other {
emit: None,
left: Discard,
right: Replace(Item {
key: AllocatorKey {
device_range: left.key().device_range.start..right.key().device_range.end,
},
value: left.value().clone(),
sequence: std::cmp::min(left.sequence(), right.sequence()),
}),
};
} else {
return MergeResult::EmitLeft;
}
}
if left.key().device_range.start == right.key().device_range.start {
/* Case 3: Overlap with same start
* L: |------------|
* R: |-----------------|
*/
if left.key().device_range.end < right.key().device_range.end {
// The newer value eclipses the older.
if left.layer_index < right.layer_index {
return MergeResult::Other {
emit: None,
left: Keep,
right: if left.key().device_range.end == right.key().device_range.end {
Discard
} else {
Replace(Item {
key: AllocatorKey {
device_range: left.key().device_range.end
..right.key().device_range.end,
},
value: right.value().clone(),
sequence: right.sequence(),
})
},
};
} else {
// right is a newer Abs/None than left
return MergeResult::Other { emit: None, left: Discard, right: Keep };
}
/* Case 4: Overlap with same start
* L: |-----------------|
* R: |------------|
*/
} else {
// The newer value eclipses the older.
if right.layer_index < left.layer_index {
return MergeResult::Other {
emit: None,
left: if right.key().device_range.end == left.key().device_range.end {
Discard
} else {
Replace(Item {
key: AllocatorKey {
device_range: right.key().device_range.end
..left.key().device_range.end,
},
value: left.value().clone(),
sequence: left.sequence(),
})
},
right: Keep,
};
} else {
// right is a newer Abs/None than left
return MergeResult::Other { emit: None, left: Keep, right: Discard };
}
}
}
/* Case 5: Split off left prefix
* L: |-----...
* R: |-----...
*/
debug_assert!(left.key().device_range.end >= right.key().device_range.start);
MergeResult::Other {
emit: Some(Item {
key: AllocatorKey {
device_range: left.key().device_range.start..right.key().device_range.start,
},
value: left.value().clone(),
sequence: left.sequence(),
}),
left: Replace(Item {
key: AllocatorKey {
device_range: right.key().device_range.start..left.key().device_range.end,
},
value: left.value().clone(),
sequence: left.sequence(),
}),
right: Keep,
}
}
/// Wraps `iter` in a filter that skips every item whose value is `AllocatorValue::None`.
///
/// Returns the filtered iterator, or the error reported while setting the filter up.
pub async fn filter_tombstones(
    iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
    let without_tombstones = iter.filter(|item| *item.value != AllocatorValue::None).await?;
    Ok(without_tombstones)
}
/// Wraps `iter` in a filter that skips `AllocatorValue::Abs` items whose `owner_object_id` is
/// contained in `marked_for_deletion`. Items with any other kind of value pass through.
///
/// Returns the filtered iterator, or the error reported while setting the filter up.
pub async fn filter_marked_for_deletion(
    iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
    marked_for_deletion: HashSet<u64>,
) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
    let surviving = iter
        .filter(move |item| match item.value {
            // Only allocations carry an owner; drop those whose owner is being deleted.
            AllocatorValue::Abs { owner_object_id, .. } => {
                !marked_for_deletion.contains(owner_object_id)
            }
            _ => true,
        })
        .await?;
    Ok(surviving)
}
#[cfg(test)]
mod tests {
use {
crate::{
lsm_tree::{
cache::NullCache,
types::{Item, ItemRef, LayerIterator},
LSMTree,
},
object_handle::INVALID_OBJECT_ID,
object_store::allocator::{
merge::{filter_tombstones, merge},
AllocatorKey, AllocatorValue,
},
},
std::ops::{Bound, Range},
};
// Tests merge logic given (range, delta and object_id) for left, right and expected output.
async fn test_merge(
left: (Range<u64>, AllocatorValue),
right: (Range<u64>, AllocatorValue),
expected: &[(Range<u64>, AllocatorValue)],
) {
let tree = LSMTree::new(merge, Box::new(NullCache {}));
tree.insert(Item::new(AllocatorKey { device_range: right.0 }, right.1))
.expect("insert error");
tree.seal();
tree.insert(Item::new(AllocatorKey { device_range: left.0 }, left.1))
.expect("insert error");
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = filter_tombstones(merger.seek(Bound::Unbounded).await.expect("seek failed"))
.await
.expect("filter failed");
for e in expected {
let ItemRef { key, value, .. } = iter.get().expect("get failed");
assert_eq!((key, value), (&AllocatorKey { device_range: e.0.clone() }, &e.1));
iter.advance().await.expect("advance failed");
}
assert!(iter.get().is_none());
}
#[fuchsia::test]
async fn test_no_overlap() {
test_merge(
(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
(200..300, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
&[
(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
(200..300, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
],
)
.await;
}
#[fuchsia::test]
async fn test_touching() {
test_merge(
(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
(100..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
&[(0..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 })],
)
.await;
}
#[fuchsia::test]
async fn test_identical() {
test_merge(
(0..100, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
&[(0..100, AllocatorValue::Abs { count: 2, owner_object_id: 1 })],
)
.await;
test_merge(
(0..100, AllocatorValue::None),
(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
&[],
)
.await;
}
#[fuchsia::test]
async fn test_left_smaller_than_right_with_same_start() {
test_merge(
(0..100, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
(0..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
&[
(0..100, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
(100..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
],
)
.await;
test_merge(
(0..100, AllocatorValue::None),
(0..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
&[(100..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 })],
)
.await;
}
#[fuchsia::test]
async fn test_left_starts_before_right_with_overlap() {
test_merge(
(0..200, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
(100..150, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
&[
(0..100, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
(100..200, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
],
)
.await;
}
#[fuchsia::test]
async fn test_different_object_id() {
// Case 1
test_merge(
(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
(200..300, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
&[
(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
(200..300, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
],
)
.await;
// Case 2
test_merge(
(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
(100..200, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
&[
(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
(100..200, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
],
)
.await;
// Case 3
test_merge(
(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
&[(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 })],
)
.await;
// Case 4
test_merge(
(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
(0..200, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
&[
(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
(100..200, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
],
)
.await;
// Case 5
test_merge(
(0..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
&[(0..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 })],
)
.await;
// Case 6
test_merge(
(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
(50..150, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
&[
(0..50, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
(50..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
(100..150, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
],
)
.await;
}
#[fuchsia::test]
async fn test_tombstones() {
// We have to make sure we don't prematurely discard records. seal() may be called at
// any time and the resulting layer tree must remain valid.
// Here we test absolute allocation counts and reuse of allocated space.
//
// 1. Alloc object_id A, write layer file.
// 2. Dealloc object_id A, Alloc object_id B, write layer file.
// 3. Dealloc object_id B, Alloc object_id A.
let key = AllocatorKey { device_range: 0..100 };
let lower_bound = AllocatorKey::lower_bound_for_merge_into(&key);
let tree = LSMTree::new(merge, Box::new(NullCache {}));
tree.merge_into(
Item::new(key.clone(), AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
&lower_bound,
);
tree.seal();
tree.merge_into(
Item::new(key.clone(), AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
&lower_bound,
);
tree.seal();
tree.merge_into(Item::new(key.clone(), AllocatorValue::None), &lower_bound);
tree.merge_into(
Item::new(key.clone(), AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
&lower_bound,
);
tree.seal();
tree.merge_into(Item::new(key.clone(), AllocatorValue::None), &lower_bound);
tree.merge_into(
Item::new(key.clone(), AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
&lower_bound,
);
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
let ItemRef { key: k, value, .. } = iter.get().expect("get failed");
assert_eq!((k, value), (&key, &AllocatorValue::Abs { count: 1, owner_object_id: 1 }));
iter.advance().await.expect("advance failed");
assert!(iter.get().is_none());
}
#[fuchsia::test]
async fn test_merge_preserves_sequences() {
let tree = LSMTree::new(merge, Box::new(NullCache {}));
// |1-1-1-1|
tree.insert(Item {
key: AllocatorKey { device_range: 0..100 },
value: AllocatorValue::Abs { count: 1, owner_object_id: INVALID_OBJECT_ID },
sequence: 1u64,
})
.expect("insert error");
tree.seal();
// |1|0|1-1|
tree.insert(Item {
key: AllocatorKey { device_range: 25..50 },
value: AllocatorValue::None,
sequence: 2u64,
})
.expect("insert error");
// |1|0|1|2|
tree.insert(Item {
key: AllocatorKey { device_range: 75..100 },
value: AllocatorValue::Abs { count: 2, owner_object_id: INVALID_OBJECT_ID },
sequence: 3u64,
})
.expect("insert error");
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = filter_tombstones(merger.seek(Bound::Unbounded).await.expect("seek failed"))
.await
.expect("filter failed");
assert_eq!(iter.get().unwrap().key, &AllocatorKey { device_range: 0..25 });
assert_eq!(
iter.get().unwrap().value,
&AllocatorValue::Abs { count: 1, owner_object_id: INVALID_OBJECT_ID }
);
assert_eq!(iter.get().unwrap().sequence, 1u64);
iter.advance().await.expect("advance failed");
assert_eq!(iter.get().unwrap().key, &AllocatorKey { device_range: 50..75 });
assert_eq!(
iter.get().unwrap().value,
&AllocatorValue::Abs { count: 1, owner_object_id: INVALID_OBJECT_ID }
);
assert_eq!(iter.get().unwrap().sequence, 1u64);
iter.advance().await.expect("advance failed");
assert_eq!(iter.get().unwrap().key, &AllocatorKey { device_range: 75..100 });
assert_eq!(
iter.get().unwrap().value,
&AllocatorValue::Abs { count: 2, owner_object_id: INVALID_OBJECT_ID }
);
assert_eq!(iter.get().unwrap().sequence, 3u64);
iter.advance().await.expect("advance failed");
assert!(iter.get().is_none());
}
}