blob: 8c7de35014774106205010d8f9d900159155ac91 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
pub mod merge;
use {
BoxedLayerIterator, Item, ItemRef, Layer, LayerIterator, MutableLayer, NextKey,
OrdLowerBound, OrdUpperBound,
object_handle::{ObjectHandle, ObjectHandleExt, Writer},
filesystem::{Filesystem, Mutations},
transaction::{AllocatorMutation, AssocObj, Mutation, Options, Transaction},
HandleOptions, ObjectStore,
anyhow::{anyhow, bail, ensure, Error},
bincode::{deserialize_from, serialize_into},
serde::{Deserialize, Serialize},
ops::{Bound, Range},
sync::{Arc, Mutex, Weak},
/// Allocators must implement this. An allocator is responsible for allocating ranges on behalf of
/// an object-store.
pub trait Allocator: Send + Sync {
/// Returns the object ID for the allocator.
fn object_id(&self) -> u64;
/// Tries to allocate `len` bytes and returns the device range allocated. Implementations may
/// return a range shorter than `len` (SimpleAllocator does when the first free gap is smaller).
/// TODO(csuter): We need to think about how to deal with fragmentation e.g. returning an array
/// of allocations, or returning a partial allocation request for situations where that makes
/// sense.
async fn allocate(
transaction: &mut Transaction<'_>,
len: u64,
) -> Result<Range<u64>, Error>;
/// Deallocates the given device range for the specified object.
/// Returns a byte count; presumably the number of bytes actually freed (as opposed to bytes
/// whose reference count was merely decremented). Confirm against implementations.
async fn deallocate(
transaction: &mut Transaction<'_>,
device_range: Range<u64>,
) -> Result<u64, Error>;
/// Marks the given device range as allocated. The main use case for this at this time is for
/// the super-block which needs to be at a fixed location on the device.
async fn mark_allocated(
transaction: &mut Transaction<'_>,
device_range: Range<u64>,
) -> Result<(), Error>;
/// Adds a reference to the given device range which must already be allocated.
fn add_ref(&self, transaction: &mut Transaction<'_>, device_range: Range<u64>);
/// Cast to super-trait.
fn as_mutations(self: Arc<Self>) -> Arc<dyn Mutations>;
/// Cast to `Any`, e.g. so that callers can downcast to a concrete allocator type.
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
/// Called when the device has been flushed and indicates what the journal log offset was when
/// that happened.
async fn did_flush_device(&self, flush_log_offset: u64);
/// Returns a reservation that can be used later, or None if there is insufficient space.
fn reserve(self: Arc<Self>, amount: u64) -> Option<Reservation>;
/// Like reserve, but returns as much as available if not all of amount is available, which
/// could be zero bytes.
fn reserve_at_most(self: Arc<Self>, amount: u64) -> Reservation;
/// Releases `amount` bytes of previously reserved space back to the allocator.
fn release_reservation(&self, amount: u64);
/// Returns the number of allocated bytes.
fn get_allocated_bytes(&self) -> u64;
/// Used during replay to validate a mutation. This should return false if the mutation is not
/// valid and should not be applied. This could be for benign reasons: e.g. the device flushed
/// data out-of-order, or because of a malicious actor.
async fn validate_mutation(
journal_offset: u64,
mutation: &Mutation,
checksum_list: &mut ChecksumList,
) -> Result<bool, Error>;
/// A reservation guarantees that when it comes time to actually allocate, it will not fail due to
/// lack of space. A hold can be placed on some of the reservation, which can later be committed.
pub struct Reservation {
// The allocator the reserved space was taken from.
allocator: Arc<dyn Allocator>,
// Accounting state. It sits behind a mutex because every method takes `&self`.
inner: Mutex<ReservationInner>,
#[derive(Debug, Default)]
struct ReservationInner {
// Total amount of this reservation, including any part of it that is held.
amount: u64,
// The amount within this reservation that is held for some purpose. Kept <= `amount`.
held: u64,
impl std::fmt::Debug for Reservation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl Reservation {
/// Creates a reservation of `amount` bytes backed by `allocator`, with nothing held yet. This
/// does not itself adjust the allocator's counters; `SimpleAllocator::reserve` increases
/// `reserved_bytes` before calling this.
pub fn new(allocator: Arc<dyn Allocator>, amount: u64) -> Self {
Self { allocator, inner: Mutex::new(ReservationInner { amount, held: 0 }) }
/// Returns the total amount of the reservation, not accounting for anything that might be held.
pub fn amount(&self) -> u64 {
/// Returns the amount available after accounting for space that is held.
pub fn avail(&self) -> u64 {
let inner = self.inner.lock().unwrap();
// Relies on the invariant `held <= amount` (checked in `hold`, asserted in `give_back`).
inner.amount - inner.held
/// Adds more to the reservation. Only this reservation's local total is updated here.
pub fn add(&self, amount: u64) {
self.inner.lock().unwrap().amount += amount;
/// Places a hold on `amount` from the reservation. Presumably returns an error if `amount`
/// exceeds `avail()`.
pub fn hold(&self, amount: u64) -> Result<(), Error> {
let mut inner = self.inner.lock().unwrap();
// NOTE(review): as shown, the body of this branch only increments `held`. Confirm that the
// insufficient-space case returns an error before `held` is incremented.
if amount > inner.amount - inner.held {
inner.held += amount;
/// Releases some previously held amount, making it available again.
pub fn release(&self, amount: u64) {
self.inner.lock().unwrap().held -= amount;
/// Commits a previously held amount, removing it from both the held amount and the total.
pub fn commit(&self, amount: u64) {
let mut inner = self.inner.lock().unwrap();
inner.amount -= amount;
inner.held -= amount;
/// Returns the entire amount of the reservation and resets both the amount and the held amount
/// to zero. The caller is responsible for maintaining consistency, i.e. updating counters, etc.
pub fn take(&self) -> u64 {
std::mem::take(&mut *self.inner.lock().unwrap()).amount
/// Returns some of the reservation back to the allocator. Asserts that the amount with a hold
/// is still valid afterwards, i.e. panics if `held` would exceed the reduced amount.
pub fn give_back(&self, amount: u64) {
let mut inner = self.inner.lock().unwrap();
inner.amount -= amount;
assert!(inner.held <= inner.amount);
// Dropping a reservation should hand whatever it still has back to its allocator.
// NOTE(review): the body is not visible here; presumably it uses `take()` together with
// `allocator.release_reservation`. Confirm.
impl Drop for Reservation {
fn drop(&mut self) {
// Our allocator implementation tracks extents with a reference count. At time of writing, these
// reference counts should never exceed 1, but that might change with snapshots and clones.
/// The key of an allocator record: a byte range on the device.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AllocatorKey {
/// The device byte range the record applies to.
pub device_range: Range<u64>,
impl AllocatorKey {
/// Returns a new key that is a lower bound suitable for use with merge_into. Its range is
/// `0..self.device_range.start`, i.e. it ends exactly where this key's range begins.
pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
AllocatorKey { device_range: 0..self.device_range.start }
impl NextKey for AllocatorKey {}
impl OrdUpperBound for AllocatorKey {
fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
impl OrdLowerBound for AllocatorKey {
fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
impl Ord for AllocatorKey {
fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
impl PartialOrd for AllocatorKey {
fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
/// The value of an allocator record.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AllocatorValue {
// This is the delta on a reference count for the extent.
pub delta: i64,
/// An allocator record: a device range paired with the reference-count delta applied to it.
pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;
/// Allocator metadata stored in the allocator's own object. `flush` writes it with bincode's
/// `serialize_into`, and `ensure_open` reads it back with `deserialize_from`.
#[derive(Debug, Default, Deserialize, Serialize)]
struct AllocatorInfo {
// Object IDs, in the root store, of the files holding the tree's persisted layers.
layers: Vec<u64>,
// The count of allocated bytes as of the last flush.
allocated_bytes: u64,
// For now this just implements a first-fit strategy. This is a very naive implementation.
pub struct SimpleAllocator {
// Weak handle to the filesystem; upgraded on use (see `ensure_open` and `flush`).
filesystem: Weak<dyn Filesystem>,
// Captured from the device in `new`. Allocation lengths must be multiples of `block_size`.
block_size: u32,
device_size: u64,
object_id: u64,
// When true, `ensure_open` creates the allocator object instead of loading an existing one.
empty: bool,
tree: LSMTree<AllocatorKey, AllocatorValue>,
// Extra layer that `allocate` searches together with the tree, so that ranges recorded here are
// not handed out. It presumably holds uncommitted allocations and committed deallocations that
// have not yet been flushed to the device (see `dropped_allocations` and `did_flush_device`).
reserved_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
inner: Mutex<Inner>,
// Serializes `allocate`, `ensure_open` and the application of `BeginFlush` with each other.
allocation_lock: futures::lock::Mutex<()>,
struct Inner {
// Allocator info as loaded from, or about to be written to, the allocator object.
info: AllocatorInfo,
// The allocator can only be opened if there have been no allocations and it has not already
// been opened or initialized.
opened: bool,
// When a transaction is dropped, we need to release the reservation, but that requires the use
// of async methods which we can't use when called from drop. To workaround that, we keep an
// array of dropped_allocations and update reserved_allocations the next time we try to
// allocate.
dropped_allocations: Vec<AllocatorItem>,
// This value is the up-to-date count of the number of allocated bytes whereas the value in
// `info` is the value as it was when we last flushed. This is i64 because it can be negative
// during replay.
allocated_bytes: i64,
// This value is the number of bytes allocated to either uncommitted allocations, or
// reservations.
reserved_bytes: u64,
// Committed deallocations that we cannot use until they are flushed to the device. Each entry
// is the journal log offset at which a deallocation was committed, together with the device
// range it freed. Entries are kept in log-offset order.
committed_deallocations: VecDeque<(u64, Range<u64>)>,
impl SimpleAllocator {
/// Creates an allocator whose state lives in object `object_id`. The device's block size and
/// size are captured now. Nothing is read or written until `ensure_open` is called. `empty`
/// selects whether `ensure_open` creates the allocator object or loads an existing one.
pub fn new(filesystem: Arc<dyn Filesystem>, object_id: u64, empty: bool) -> SimpleAllocator {
SimpleAllocator {
filesystem: Arc::downgrade(&filesystem),
block_size: filesystem.device().block_size(),
device_size: filesystem.device().size(),
tree: LSMTree::new(merge),
reserved_allocations: SkipListLayer::new(1024), // TODO(csuter): magic numbers
inner: Mutex::new(Inner {
info: AllocatorInfo::default(),
opened: false,
dropped_allocations: Vec::new(),
allocated_bytes: 0,
reserved_bytes: 0,
committed_deallocations: VecDeque::new(),
allocation_lock: futures::lock::Mutex::new(()),
/// Returns the LSM tree that holds the allocator's records.
pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
// Ensures the allocator is open. If empty, create the object in the root object store,
// otherwise load and initialise the LSM tree.
pub async fn ensure_open(&self) -> Result<(), Error> {
// Cheap check without taking the allocation lock.
if self.inner.lock().unwrap().opened {
return Ok(());
let _guard = self.allocation_lock.lock().await;
// Check again now that the allocation lock is held.
if self.inner.lock().unwrap().opened {
// We lost a race.
return Ok(());
let filesystem = self.filesystem.upgrade().unwrap();
let root_store = filesystem.root_store();
if self.empty {
let mut transaction = filesystem
.new_transaction(&[], Options { skip_journal_checks: true, ..Default::default() })
&mut transaction,
} else {
let handle =
ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default())
// A zero-length allocator object has no AllocatorInfo to load.
if handle.get_size() > 0 {
let serialized_info = handle.contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE).await?;
let info: AllocatorInfo = deserialize_from(&serialized_info[..])?;
let mut handles = Vec::new();
let mut total_size = 0;
// Open every persisted layer file and total up their sizes.
for object_id in &info.layers {
let handle =
ObjectStore::open_object(&root_store, *object_id, HandleOptions::default())
total_size += handle.get_size();
let mut inner = self.inner.lock().unwrap();
// After replaying, allocated_bytes should include all the deltas since the time
// the allocator was last flushed, so here we just need to add whatever is
// recorded in info.
let amount: i64 =
info.allocated_bytes.try_into().map_err(|_| FxfsError::Inconsistent)?;
inner.allocated_bytes += amount;
// The combined count must lie within [0, device_size].
// NOTE(review): the error return for this check is not visible here. Confirm that it
// bails (e.g. with FxfsError::Inconsistent).
if inner.allocated_bytes < 0 || inner.allocated_bytes as u64 > self.device_size
} = info;
.update_reservation(self.object_id, total_size);
self.inner.lock().unwrap().opened = true;
/// Returns all objects that exist in the parent store that pertain to this allocator.
pub fn parent_objects(&self) -> Vec<u64> {
// The allocator tree needs to store a file for each of the layers in the tree, so we return
// those, since nothing else references them.
impl Allocator for SimpleAllocator {
fn object_id(&self) -> u64 {
// First-fit allocation. It walks the tree merged with `reserved_allocations`, starting from
// offset 0, and takes the first gap. If that gap is smaller than `len`, the range is truncated
// to the gap.
async fn allocate(
transaction: &mut Transaction<'_>,
len: u64,
) -> Result<Range<u64>, Error> {
// Allocations must be a whole number of blocks.
ensure!(len % self.block_size as u64 == 0);
let _guard = self.allocation_lock.lock().await;
// With a reservation, the reservation itself must be able to cover `len`.
if let Some(reservation) = transaction.allocator_reservation {
reservation.avail() >= len,
anyhow!(FxfsError::NoSpace).context("Insufficient space in reservation")
let dropped_allocations = {
let mut inner = self.inner.lock().unwrap();
// Without a reservation, check the unreserved free space instead.
// NOTE(review): this subtraction underflows if allocated + reserved bytes ever exceed
// device_size. `allocated_bytes as u64` also wraps if allocated_bytes is negative.
if transaction.allocator_reservation.is_none()
&& self.device_size - inner.allocated_bytes as u64 - inner.reserved_bytes < len
std::mem::take(&mut inner.dropped_allocations)
// Update reserved_allocations using dropped_allocations.
for item in dropped_allocations {
let result = {
let tree = &self.tree;
let mut layer_set = tree.empty_layer_set();
.push((self.reserved_allocations.clone() as Arc<dyn Layer<_, _>>).into());
tree.add_all_layers_to_layer_set(&mut layer_set);
let mut merger = layer_set.merger();
let mut iter =;
// End of the previous allocated extent. Any gap before the next extent's start is free.
let mut last_offset = 0;
loop {
// TODO(csuter): This is inconsistent; here we return no-space, but otherwise we
// return less than requested.
// NOTE(review): `>=` rejects a range ending exactly at device_size, which
// mark_allocated accepts (`end <= device_size`). `>` may have been intended.
if last_offset + len >= self.device_size as u64 {
bail!(anyhow!(FxfsError::NoSpace).context("no space after search"));
match iter.get() {
None => {
break last_offset..last_offset + len;
Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
if device_range.start > last_offset {
break last_offset..min(last_offset + len, device_range.start);
last_offset = device_range.end;
log::debug!("allocate {:?}", result);
self.mark_allocated(transaction, result.clone()).await?;
// Records a +1 allocation of `device_range` in `transaction`. If the transaction has a
// reservation, the space is taken from it (the call is not visible here). Otherwise
// `reserved_bytes` grows by the range's length. Fails with NoSpace if the range extends past
// the device.
async fn mark_allocated(
transaction: &mut Transaction<'_>,
device_range: Range<u64>,
) -> Result<(), Error> {
ensure!(device_range.end <= self.device_size, FxfsError::NoSpace);
if let Some(reservation) = &mut transaction.allocator_reservation {
// This shouldn't fail because we checked the reservation had enough space at the
// beginning of allocate, after we took the lock and the lock should still be held.
} else {
self.inner.lock().unwrap().reserved_bytes += device_range.length();
let item = AllocatorItem::new(AllocatorKey { device_range }, AllocatorValue { delta: 1 });
transaction.add(self.object_id(), Mutation::allocation(item));
// Records a +1 reference-count delta for `device_range` in `transaction`.
fn add_ref(&self, transaction: &mut Transaction<'_>, device_range: Range<u64>) {
AllocatorKey { device_range },
AllocatorValue { delta: 1 },
// Applies a -1 reference-count delta across `dealloc_range`. Pieces whose count is 1 (and are
// therefore freed) become `Allocator` mutations. Pieces with other references become
// `AllocatorRef` mutations. `deallocated` accumulates the number of bytes freed.
async fn deallocate(
transaction: &mut Transaction<'_>,
mut dealloc_range: Range<u64>,
) -> Result<u64, Error> {
log::debug!("deallocate {:?}", dealloc_range);
// We need to determine whether this deallocation actually frees the range or is just a
// reference count adjustment. We separate the two kinds into two different mutation types
// so that we can adjust our counts correctly at commit time.
let layer_set = self.tree.layer_set();
let mut merger = layer_set.merger();
// The precise search key that we choose here is important. We need to perform a full merge
// across all layers because we want the precise value of delta, so we must ensure that we
// query all layers, which is done by setting the lower bound to zero (the merger consults
// iterators until it encounters a key whose lower-bound is not greater than the search
// key). The upper bound is used to search each individual layer, and we want to start with
// an extent that covers the first byte of the range we're deallocating.
let mut iter = merger
.seek(Bound::Included(&AllocatorKey { device_range: 0..dealloc_range.start + 1 }))
let mut deallocated = 0;
// The pending mutation. Contiguous pieces of the same kind are coalesced into it, and it is
// added to the transaction when the kind changes or at the end.
let mut mutation = None;
while let Some(ItemRef {
key: AllocatorKey { device_range, .. },
value: AllocatorValue { delta, .. },
}) = iter.get()
if device_range.start > dealloc_range.start {
// We expect the entire range to be allocated.
let end = std::cmp::min(device_range.end, dealloc_range.end);
if *delta == 1 {
// In this branch, we know that we're freeing data, so we want an Allocator
// mutation.
if let Some(Mutation::AllocatorRef(_)) = mutation {
transaction.add(self.object_id(), mutation.take().unwrap());
match &mut mutation {
None => {
mutation = Some(Mutation::allocation(Item::new(
AllocatorKey { device_range: dealloc_range.start..end },
AllocatorValue { delta: -1 },
Some(Mutation::Allocator(AllocatorMutation(AllocatorItem { key, .. }))) => {
key.device_range.end = end;
_ => unreachable!(),
deallocated += end - dealloc_range.start;
} else {
// In this branch, we know that we're not freeing data, so we want an AllocatorRef
// mutation.
if let Some(Mutation::Allocator(_)) = mutation {
transaction.add(self.object_id(), mutation.take().unwrap());
match &mut mutation {
None => {
mutation = Some(Mutation::allocation_ref(Item::new(
AllocatorKey { device_range: dealloc_range.start..end },
AllocatorValue { delta: -1 },
Some(Mutation::AllocatorRef(AllocatorMutation(AllocatorItem {
key, ..
}))) => {
key.device_range.end = end;
_ => unreachable!(),
if end == dealloc_range.end {
dealloc_range.start = end;
if let Some(mutation) = mutation {
transaction.add(self.object_id(), mutation);
fn as_mutations(self: Arc<Self>) -> Arc<dyn Mutations> {
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
// Deallocations committed at or before `flush_log_offset` are now durable, so their ranges
// can be removed from reserved_allocations and become reusable.
async fn did_flush_device(&self, flush_log_offset: u64) {
// First take out the deallocations that we now know to be flushed. The list is maintained
// in order, so we can stop on the first entry that we find that should not be unreserved
// yet.
let deallocs = {
let mut inner = self.inner.lock().unwrap();
if let Some((index, _)) = inner
.find(|(_, (dealloc_log_offset, _))| *dealloc_log_offset > flush_log_offset)
let mut deallocs = inner.committed_deallocations.split_off(index);
// Swap because we want the opposite of what split_off does.
std::mem::swap(&mut inner.committed_deallocations, &mut deallocs);
} else {
std::mem::take(&mut inner.committed_deallocations)
// Now we can erase those elements from reserved_allocations (whilst we're not holding the
// lock on inner).
for (_, device_range) in deallocs {
Item::new(AllocatorKey { device_range }, AllocatorValue { delta: 0 })
// Succeeds only if the unreserved free space (device_size - allocated - reserved) covers
// `amount`. On success that amount is added to `reserved_bytes`.
fn reserve(self: Arc<Self>, amount: u64) -> Option<Reservation> {
let mut inner = self.inner.lock().unwrap();
if self.device_size - inner.allocated_bytes as u64 - inner.reserved_bytes < amount {
return None;
inner.reserved_bytes += amount;
Some(Reservation::new(self, amount))
// Clamps `amount` to the unreserved free space, reserves that, and never fails.
fn reserve_at_most(self: Arc<Self>, mut amount: u64) -> Reservation {
let mut inner = self.inner.lock().unwrap();
amount = std::cmp::min(
self.device_size - inner.allocated_bytes as u64 - inner.reserved_bytes,
inner.reserved_bytes += amount;
Reservation::new(self, amount)
fn release_reservation(&self, amount: u64) {
self.inner.lock().unwrap().reserved_bytes -= amount;
// `allocated_bytes` is an i64. A negative value (possible during replay) would wrap on this
// cast.
fn get_allocated_bytes(&self) -> u64 {
self.inner.lock().unwrap().allocated_bytes as u64
// Deallocations (negative delta) are recorded in `checksum_list` at `journal_offset`. Other
// mutations are not inspected.
async fn validate_mutation(
journal_offset: u64,
mutation: &Mutation,
checksum_list: &mut ChecksumList,
) -> Result<bool, Error> {
match mutation {
Mutation::Allocator(AllocatorMutation(AllocatorItem {
key: AllocatorKey { device_range },
value: AllocatorValue { delta },
})) if *delta < 0 => {
checksum_list.mark_deallocated(journal_offset, device_range.clone());
_ => {}
impl Mutations for SimpleAllocator {
// Applies an allocator mutation to the tree and updates the byte counters. `transaction` is
// Some for a live commit. None presumably means journal replay (compare the replay notes in
// `ensure_open` and the EndFlush arm below).
async fn apply_mutation(
mutation: Mutation,
transaction: Option<&Transaction<'_>>,
log_offset: u64,
_assoc_obj: AssocObj<'_>,
) {
match mutation {
Mutation::Allocator(AllocatorMutation(mut item)) => {
item.sequence = log_offset;
// We currently rely on barriers here between inserting/removing from reserved
// allocations and merging into the tree. These barriers are present whilst we use
// skip_list_layer's commit_and_wait method, rather than just commit.
// For a live deallocation, remember the range with its log offset, and keep it
// reserved (a copy with delta forced to 1) until `did_flush_device` sees the device
// flushed past this offset.
if transaction.is_some() && < 0 {
.push_back((log_offset, item.key.device_range.clone()));
let mut item = item.clone(); = 1;
let lower_bound = item.key.lower_bound_for_merge_into();
self.tree.merge_into(item.clone(), &lower_bound).await;
let len = item.key.device_range.length();
if > 0 {
if transaction.is_some() {
let mut inner = self.inner.lock().unwrap();
inner.allocated_bytes = inner.allocated_bytes.saturating_add(len as i64);
if let Some(transaction) = transaction {
inner.reserved_bytes -= len;
if let Some(reservation) = transaction.allocator_reservation {
} else {
let mut inner = self.inner.lock().unwrap();
inner.allocated_bytes = inner.allocated_bytes.saturating_sub(len as i64);
if let Some(Transaction { allocator_reservation: Some(reservation), .. }) =
inner.reserved_bytes += len;
Mutation::AllocatorRef(AllocatorMutation(mut item)) => {
// Reference-count-only adjustments touch the tree but none of the byte counters.
item.sequence = log_offset;
let lower_bound = item.key.lower_bound_for_merge_into();
self.tree.merge_into(item, &lower_bound).await;
Mutation::BeginFlush => {
// After we seal the tree, we will start adding mutations to the new mutable
// layer, but we cannot safely do that whilst we are attempting to allocate
// because there is a chance it might miss an allocation and also not see the
// allocation in reserved_allocations.
let _guard = self.allocation_lock.lock().await;
// Transfer our running count for allocated_bytes so that it gets written to the new
// info file when flush completes.
let mut inner = self.inner.lock().unwrap(); = inner.allocated_bytes as u64;
Mutation::EndFlush => {
if transaction.is_none() {
// AllocatorInfo is written in the same transaction and will contain the count
// at the point BeginFlush was applied, so we need to adjust allocated_bytes so
// that it just covers the delta from that point. Later, when we properly open
// the allocator, we'll add this back.
let mut inner = self.inner.lock().unwrap();
inner.allocated_bytes -= as i64;
} else {
// Sum the on-disk sizes of the immutable layers' files (0 for layers without a
// handle).
let layers = self.tree.immutable_layer_set();
.map(|l| l.handle().map(|h| h.get_size()).unwrap_or(0))
_ => panic!("unexpected mutation! {:?}", mutation), // TODO(csuter): This can't panic
// Called for mutations of a transaction that is dropped without committing. For an uncommitted
// allocation (delta > 0), the space is given back either to the transaction's reservation (the
// call is not visible here) or to the unreserved pool via `reserved_bytes`.
fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
match mutation {
Mutation::Allocator(AllocatorMutation(item)) => {
if > 0 {
let mut inner = self.inner.lock().unwrap();
if let Some(reservation) = transaction.allocator_reservation {
} else {
inner.reserved_bytes -= item.key.device_range.length();
_ => {}
// Writes the tree's immutable layers out as one new layer file, then records that file as
// the sole entry of AllocatorInfo::layers. The previous layer files are moved to the
// graveyard and tombstoned. Does nothing if the object manager reports no flush is needed.
async fn flush(&self) -> Result<(), Error> {
let filesystem = self.filesystem.upgrade().unwrap();
let object_manager = filesystem.object_manager();
if !object_manager.needs_flush(self.object_id()) {
return Ok(());
let graveyard = object_manager.graveyard().ok_or(anyhow!("Missing graveyard!"))?;
// TODO(csuter): This all needs to be atomic somehow. We'll need to use different
// transactions for each stage, but we need make sure objects are cleaned up if there's a
// failure.
let reservation = object_manager.metadata_reservation();
let txn_options = Options {
skip_journal_checks: true,
borrow_metadata_space: true,
allocator_reservation: Some(reservation),
let mut transaction = filesystem.clone().new_transaction(&[], txn_options).await?;
let root_store = self.filesystem.upgrade().unwrap().root_store();
let layer_object_handle = ObjectStore::create_object(
&mut transaction,
HandleOptions { skip_journal_checks: true, ..Default::default() },
let object_id = layer_object_handle.object_id();
// The new layer file starts out in the graveyard and is only removed from it in the final
// transaction below. Presumably this is so it gets cleaned up if the flush never completes.
graveyard.add(&mut transaction, root_store.store_object_id(), object_id);
// It's important that this transaction does not include any allocations because we use
// BeginFlush as a snapshot point for mutations to the tree: other allocator mutations
// within this transaction might get applied before seal (which would be OK), but they could
// equally get applied afterwards (since Transaction makes no guarantees about the order in
// which mutations are applied whilst committing), in which case they'd get lost on replay
// because the journal will only send mutations that follow this transaction.
transaction.add(self.object_id(), Mutation::BeginFlush);
// Write the merged immutable layers into the new layer file.
let layer_set = self.tree.immutable_layer_set();
let mut merger = layer_set.merger();
Writer::new(&layer_object_handle, txn_options),
log::debug!("using {} for allocator layer file", object_id);
let object_handle =
ObjectStore::open_object(&root_store, self.object_id(), HandleOptions::default())
// TODO(jfsulliv): Can we preallocate the buffer instead of doing a bounce? Do we know the
// size up front?
let mut transaction = filesystem.clone().new_transaction(&[], txn_options).await?;
let mut serialized_info = Vec::new();
let mut inner = self.inner.lock().unwrap();
// Move all the existing layers to the graveyard.
for object_id in & {
graveyard.add(&mut transaction, root_store.store_object_id(), *object_id);
} = vec![object_id];
serialize_into(&mut serialized_info, &;
let mut buf = object_handle.allocate_buffer(serialized_info.len());
object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;
// It's important that EndFlush is in the same transaction that we write AllocatorInfo,
// because we use EndFlush to make the required adjustments to allocated_bytes.
transaction.add(self.object_id(), Mutation::EndFlush);
graveyard.remove(&mut transaction, root_store.store_object_id(), object_id);
// TODO(csuter): what if this fails.
// Now close the layers and purge them.
for layer in layer_set.layers {
let object_id = layer.handle().map(|h| h.object_id());
if let Some(object_id) = object_id {
root_store.tombstone(object_id, txn_options).await?;
// The merger is unable to merge extents that exist like the following:
// |----- +1 -----|
// |----- -1 -----|
// |----- +2 -----|
// It cannot coalesce them because it has to emit the +1 record so that it can move on and merge the
// -1 and +2 records. To address this, we add another stage that applies after merging which
// coalesces records after they have been emitted. This is a bit simpler than merging because the
// records cannot overlap, so it's just a question of merging adjacent records if they happen to
// have the same delta.
/// Iterator over allocator records that joins adjacent records with equal delta into one record.
pub struct CoalescingIterator<'a> {
// The underlying (already merged) iterator.
iter: BoxedLayerIterator<'a, AllocatorKey, AllocatorValue>,
// The current coalesced record, or None once the underlying iterator is exhausted.
item: Option<AllocatorItem>,
impl<'a> CoalescingIterator<'a> {
/// Wraps `iter`. The returned iterator is already positioned on the first coalesced record
/// (the tests call `get()` straight after `new`).
pub async fn new(
iter: BoxedLayerIterator<'a, AllocatorKey, AllocatorValue>,
) -> Result<CoalescingIterator<'a>, Error> {
let mut iter = Self { iter, item: None };
impl LayerIterator<AllocatorKey, AllocatorValue> for CoalescingIterator<'_> {
// Loads the record currently under the underlying iterator. It then extends that record over
// each following record that starts exactly where it ends and has the same delta.
async fn advance(&mut self) -> Result<(), Error> {
self.item = self.iter.get().map(|x| x.cloned());
if self.item.is_none() {
return Ok(());
let left = self.item.as_mut().unwrap();
loop {
match self.iter.get() {
None => return Ok(()),
Some(right) => {
// The two records cannot overlap.
assert!(left.key.device_range.end <= right.key.device_range.start);
// We can only coalesce the records if they are touching and have the same
// delta.
if left.key.device_range.end < right.key.device_range.start
|| !=
return Ok(());
left.key.device_range.end = right.key.device_range.end;
// Returns the current coalesced record without consuming it.
fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
self.item.as_ref().map(|x| x.as_item_ref())
mod tests {
use {
types::{Item, ItemRef, Layer, LayerIterator, MutableLayer},
merge::merge, Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
filesystem::{Filesystem, Mutations},
transaction::{Options, TransactionHandler},
fuchsia_async as fasync,
cmp::{max, min},
ops::{Bound, Range},
storage_device::{fake_device::FakeDevice, DeviceHolder},
// Two touching extents with the same delta should come back as a single 0..200 extent.
async fn test_coalescing_iterator() {
let skip_list = SkipListLayer::new(100);
let items = [
Item::new(AllocatorKey { device_range: 0..100 }, AllocatorValue { delta: 1 }),
Item::new(AllocatorKey { device_range: 100..200 }, AllocatorValue { delta: 1 }),
let mut iter =
CoalescingIterator::new("seek failed"))
.expect("new failed");
let ItemRef { key, value, .. } = iter.get().expect("get failed");
(key, value),
(&AllocatorKey { device_range: 0..200 }, &AllocatorValue { delta: 1 })
iter.advance().await.expect("advance failed");
// +2 and then -1 on 100..200 should merge to +1. That +1 should then coalesce with the +1 on
// 0..100 into a single 0..200 extent.
async fn test_merge_and_coalesce_across_three_layers() {
let lsm_tree = LSMTree::new(merge);
.insert(Item::new(AllocatorKey { device_range: 100..200 }, AllocatorValue { delta: 2 }))
AllocatorKey { device_range: 100..200 },
AllocatorValue { delta: -1 },
.insert(Item::new(AllocatorKey { device_range: 0..100 }, AllocatorValue { delta: 1 }))
let layer_set = lsm_tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = CoalescingIterator::new(Box::new("seek failed"),
.expect("new failed");
let ItemRef { key, value, .. } = iter.get().expect("get failed");
(key, value),
(&AllocatorKey { device_range: 0..200 }, &AllocatorValue { delta: 1 })
iter.advance().await.expect("advance failed");
// Returns the number of bytes shared by ranges `a` and `b` (presumably 0 when they are
// disjoint).
fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
if a.end > b.start && a.start < b.end {
min(a.end, b.end) - max(a.start, b.start)
} else {
// Asserts that the records in the allocator's tree (not reserved_allocations) cover exactly
// `expected_allocations`. Every found byte must lie in some expected range, and the total found
// must equal the total expected.
async fn check_allocations(allocator: &SimpleAllocator, expected_allocations: &[Range<u64>]) {
let layer_set = allocator.tree.layer_set();
let mut merger = layer_set.merger();
let mut iter ="seek failed");
let mut found = 0;
while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
let mut l = device_range.length();
found += l;
// Make sure that the entire range we have found completely overlaps with all the
// allocations we expect to find.
for range in expected_allocations {
l -= overlap(range, device_range);
if l == 0 {
assert_eq!(l, 0);
iter.advance().await.expect("advance failed");
// Make sure the total we found adds up to what we expect.
assert_eq!(found, expected_allocations.iter().map(|r| r.length()).sum());
// Allocations made within one transaction, and across transactions, must not overlap.
async fn test_allocations() {
let device = DeviceHolder::new(FakeDevice::new(4096, 512));
let fs = FakeFilesystem::new(device);
let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
let store = ObjectStore::new_empty(None, 2, fs.clone());
let mut transaction =
fs.clone().new_transaction(&[], Options::default()).await.expect("new failed");
let mut device_ranges = Vec::new();
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
assert!(device_ranges.last().unwrap().length() == 512);
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
assert!(device_ranges.last().unwrap().length() == 512);
assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
let mut transaction =
fs.clone().new_transaction(&[], Options::default()).await.expect("new failed");
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
assert!(device_ranges[2].length() == 512);
assert_eq!(overlap(&device_ranges[0], &device_ranges[2]), 0);
assert_eq!(overlap(&device_ranges[1], &device_ranges[2]), 0);
check_allocations(&allocator, &device_ranges).await;
// Deallocating the only allocation should leave no allocated extents in the tree.
async fn test_deallocations() {
let device = DeviceHolder::new(FakeDevice::new(4096, 512));
let fs = FakeFilesystem::new(device);
let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
let store = ObjectStore::new_empty(None, 2, fs.clone());
let mut transaction =
fs.clone().new_transaction(&[], Options::default()).await.expect("new failed");
let device_range1 =
allocator.allocate(&mut transaction, 512).await.expect("allocate failed");
assert!(device_range1.length() == 512);
let mut transaction =
fs.clone().new_transaction(&[], Options::default()).await.expect("new failed");
allocator.deallocate(&mut transaction, device_range1).await.expect("deallocate failed");
check_allocations(&allocator, &[]).await;
// A regular allocation must not overlap a range that was claimed with mark_allocated.
async fn test_mark_allocated() {
let device = DeviceHolder::new(FakeDevice::new(4096, 512));
let fs = FakeFilesystem::new(device);
let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
let store = ObjectStore::new_empty(None, 2, fs.clone());
let mut transaction =
fs.clone().new_transaction(&[], Options::default()).await.expect("new failed");
let mut device_ranges = Vec::new();
.mark_allocated(&mut transaction, device_ranges.last().unwrap().clone())
.expect("mark_allocated failed");
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
assert!(device_ranges.last().unwrap().length() == 512);
assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
check_allocations(&allocator, &device_ranges).await;
// Allocations should survive a flush followed by reopening the allocator from disk.
async fn test_flush() {
let device = DeviceHolder::new(FakeDevice::new(4096, 512));
let fs = FakeFilesystem::new(device);
let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
let store = ObjectStore::new_empty(None, 2, fs.clone());
allocator.ensure_open().await.expect("ensure_open failed");
let mut transaction =
fs.clone().new_transaction(&[], Options::default()).await.expect("new failed");
let graveyard = Graveyard::create(&mut transaction, &store).await.expect("create failed");
let mut device_ranges = Vec::new();
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
allocator.flush().await.expect("flush failed");
// Reopen with empty = false so that the state is loaded from what flush wrote.
let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, false));
// When we flushed the allocator, it would have been written to the device somewhere but
// without a journal, we will be missing those records, so this next allocation will likely
// be on top of those objects. That won't matter for the purposes of this test, since we
// are not writing anything to these ranges.
let mut transaction =
fs.clone().new_transaction(&[], Options::default()).await.expect("new failed");
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
for r in &device_ranges[..3] {
assert_eq!(overlap(r, device_ranges.last().unwrap()), 0);
check_allocations(&allocator, &device_ranges).await;
// An allocation made in a transaction that is dropped without committing must be released.
async fn test_dropped_transaction() {
let device = DeviceHolder::new(FakeDevice::new(4096, 512));
let fs = FakeFilesystem::new(device);
let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
let store = ObjectStore::new_empty(None, 2, fs.clone());
let allocated_range = {
let mut transaction = fs
.new_transaction(&[], Options::default())
.expect("new_transaction failed");
allocator.allocate(&mut transaction, 512).await.expect("allocate failed")
// After dropping the transaction and attempting to allocate again, we should end up with
// the same range because the reservation should have been released.
let mut transaction = fs
.new_transaction(&[], Options::default())
.expect("new_transaction failed");
allocator.allocate(&mut transaction, 512).await.expect("allocate failed"),
// get_allocated_bytes should change only when allocations or deallocations commit.
async fn test_allocated_bytes() {
const BLOCK_COUNT: u32 = 4096;
const BLOCK_SIZE: u32 = 512;
let device = DeviceHolder::new(FakeDevice::new(BLOCK_COUNT.into(), BLOCK_SIZE));
let fs = FakeFilesystem::new(device);
let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
let store = ObjectStore::new_empty(None, 2, fs.clone());
assert_eq!(allocator.get_allocated_bytes(), 0);
// Verify allocated_bytes reflects allocation changes.
const ALLOCATED_BYTES: u64 = 512;
let allocated_range = {
let mut transaction = fs
.new_transaction(&[], Options::default())
.expect("new_transaction failed");
let range = allocator
.allocate(&mut transaction, ALLOCATED_BYTES)
.expect("allocate failed");
assert_eq!(allocator.get_allocated_bytes(), ALLOCATED_BYTES);
let mut transaction = fs
.new_transaction(&[], Options::default())
.expect("new_transaction failed");
allocator.allocate(&mut transaction, 512).await.expect("allocate failed");
// Prior to committing, the count of allocated bytes shouldn't change.
assert_eq!(allocator.get_allocated_bytes(), ALLOCATED_BYTES);
// After dropping the prior transaction, the allocated bytes still shouldn't have changed.
assert_eq!(allocator.get_allocated_bytes(), ALLOCATED_BYTES);
// Verify allocated_bytes reflects deallocations.
let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
let mut transaction =
fs.clone().new_transaction(&[], Options::default()).await.expect("new failed");
allocator.deallocate(&mut transaction, deallocate_range).await.expect("deallocate failed");
// Before committing, there should be no change.
assert_eq!(allocator.get_allocated_bytes(), ALLOCATED_BYTES);
// After committing, only 40 bytes (20 at each end of the original range) should remain
// allocated.
assert_eq!(allocator.get_allocated_bytes(), 40);