// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
pub mod merge;
use {
crate::{
errors::FxfsError,
lsm_tree::{
layers_from_handles,
skip_list_layer::SkipListLayer,
types::{
BoxedLayerIterator, Item, ItemRef, Layer, LayerIterator, MutableLayer, NextKey,
OrdLowerBound, OrdUpperBound,
},
LSMTree,
},
object_handle::{ObjectHandle, ObjectHandleExt, Writer},
object_store::{
filesystem::{Filesystem, Mutations},
journal::checksum_list::ChecksumList,
object_manager::ObjectFlush,
transaction::{AllocatorMutation, AssocObj, Mutation, Options, Transaction},
HandleOptions, ObjectStore,
},
trace_duration,
},
anyhow::{anyhow, bail, ensure, Error},
async_trait::async_trait,
bincode::{deserialize_from, serialize_into},
interval_tree::utils::RangeOps,
merge::merge,
serde::{Deserialize, Serialize},
std::{
any::Any,
cmp::min,
collections::VecDeque,
convert::TryInto,
ops::{Bound, Range},
sync::{Arc, Mutex, Weak},
},
};
/// Allocators must implement this. An allocator is responsible for allocating ranges on behalf of
/// an object-store.
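///
/// A typical allocation flow, sketched from the tests at the bottom of this file (`fs` and
/// `allocator` are assumed to already exist):
///
/// ```ignore
/// let mut transaction = fs.clone().new_transaction(&[], Options::default()).await?;
/// let device_range = allocator.allocate(&mut transaction, 512).await?;
/// transaction.commit().await;
/// ```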
#[async_trait]
pub trait Allocator: Send + Sync {
/// Returns the object ID for the allocator.
fn object_id(&self) -> u64;
/// Tries to allocate |len| bytes and returns the device range that was allocated.
/// TODO(csuter): We need to think about how to deal with fragmentation e.g. returning an array
/// of allocations, or returning a partial allocation request for situations where that makes
/// sense.
async fn allocate(
&self,
transaction: &mut Transaction<'_>,
len: u64,
) -> Result<Range<u64>, Error>;
/// Deallocates the given device range and returns the number of bytes that were actually freed.
async fn deallocate(
&self,
transaction: &mut Transaction<'_>,
device_range: Range<u64>,
) -> Result<u64, Error>;
/// Marks the given device range as allocated. The main use case for this at present is the
/// super-block, which needs to be at a fixed location on the device.
async fn mark_allocated(
&self,
transaction: &mut Transaction<'_>,
device_range: Range<u64>,
) -> Result<(), Error>;
/// Adds a reference to the given device range which must already be allocated.
fn add_ref(&self, transaction: &mut Transaction<'_>, device_range: Range<u64>);
/// Cast to super-trait.
fn as_mutations(self: Arc<Self>) -> Arc<dyn Mutations>;
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
/// Called when the device has been flushed; |flush_log_offset| indicates the journal log offset
/// at the time the flush occurred.
async fn did_flush_device(&self, flush_log_offset: u64);
/// Returns a reservation that can be used later, or None if there is insufficient space.
fn reserve(self: Arc<Self>, amount: u64) -> Option<Reservation>;
/// Like reserve, but returns as much as is available (which could be zero bytes) if the full
/// amount cannot be reserved.
fn reserve_at_most(self: Arc<Self>, amount: u64) -> Reservation;
/// Releases the reservation.
fn release_reservation(&self, reservation: &mut Reservation);
/// Returns the number of allocated bytes.
fn get_allocated_bytes(&self) -> u64;
/// Used during replay to validate a mutation. This should return false if the mutation is not
/// valid and should not be applied. This could be for benign reasons: e.g. the device flushed
/// data out-of-order, or because of a malicious actor.
async fn validate_mutation(
&self,
journal_offset: u64,
mutation: &Mutation,
checksum_list: &mut ChecksumList,
) -> Result<bool, Error>;
}
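/// Tracks an amount of space that has been reserved with an allocator but not yet allocated.
/// Dropping a `Reservation` returns whatever remains of the reserved amount to the allocator via
/// `release_reservation`.
///
/// A minimal usage sketch (assuming `allocator` is an `Arc<dyn Allocator>`):
///
/// ```ignore
/// if let Some(reservation) = allocator.clone().reserve(4096) {
///     assert_eq!(reservation.amount(), 4096);
///     // ... hand the reservation to a transaction, or consume it with `sub`/`take` ...
/// } // Dropping the reservation releases any remaining reserved bytes.
/// ```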
pub struct Reservation {
allocator: Arc<dyn Allocator>,
amount: Mutex<u64>,
}
impl std::fmt::Debug for Reservation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Reservation").field("amount", &*self.amount.lock().unwrap()).finish()
}
}
impl Reservation {
pub fn new(allocator: Arc<dyn Allocator>, amount: u64) -> Self {
Self { allocator, amount: Mutex::new(amount) }
}
pub fn amount(&self) -> u64 {
*self.amount.lock().unwrap()
}
pub fn add(&self, amount: u64) {
*self.amount.lock().unwrap() += amount;
}
pub fn sub(&self, delta: u64) -> Result<(), Error> {
let mut amount = self.amount.lock().unwrap();
*amount = amount.checked_sub(delta).ok_or(FxfsError::NoSpace)?;
Ok(())
}
pub fn take(&self) -> u64 {
std::mem::take(&mut self.amount.lock().unwrap())
}
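/// Tries to grow this reservation to at least `target` bytes by reserving the shortfall from the
/// allocator. Returns true if the reservation then holds at least `target` bytes.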
pub fn try_top_up(&self, target: u64) -> bool {
let mut amount = self.amount.lock().unwrap();
if *amount < target {
*amount += self.allocator.clone().reserve_at_most(target - *amount).take();
}
*amount >= target
}
}
impl Drop for Reservation {
fn drop(&mut self) {
self.allocator.clone().release_reservation(self);
}
}
// Our allocator implementation tracks extents with a reference count. At time of writing, these
// reference counts should never exceed 1, but that might change with snapshots and clones.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AllocatorKey {
pub device_range: Range<u64>,
}
impl AllocatorKey {
/// Returns a new key that is a lower bound suitable for use with merge_into. For example, for
/// the key 100..200, the returned lower bound is 0..100.
pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
AllocatorKey { device_range: 0..self.device_range.start }
}
}
impl NextKey for AllocatorKey {}
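// The ordering traits below all order keys by their range bounds: cmp_upper_bound compares range
// ends (used when searching within an individual layer), cmp_lower_bound compares range starts
// (used by the merger when deciding which layers to consult; see the comment in deallocate), and
// the total order compares the start and then the end.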
impl OrdUpperBound for AllocatorKey {
fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
self.device_range.end.cmp(&other.device_range.end)
}
}
impl OrdLowerBound for AllocatorKey {
fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
self.device_range.start.cmp(&other.device_range.start)
}
}
impl Ord for AllocatorKey {
fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
self.device_range
.start
.cmp(&other.device_range.start)
.then(self.device_range.end.cmp(&other.device_range.end))
}
}
impl PartialOrd for AllocatorKey {
fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AllocatorValue {
// This is the delta on a reference count for the extent.
pub delta: i64,
}
pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;
#[derive(Debug, Default, Deserialize, Serialize)]
struct AllocatorInfo {
layers: Vec<u64>,
allocated_bytes: u64,
}
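// The maximum size (128 KiB) of serialized AllocatorInfo that we are prepared to read back when
// opening the allocator.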
const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131072;
// For now this just implements a first-fit strategy, which is a very naive implementation.
pub struct SimpleAllocator {
filesystem: Weak<dyn Filesystem>,
block_size: u32,
device_size: u64,
object_id: u64,
empty: bool,
tree: LSMTree<AllocatorKey, AllocatorValue>,
reserved_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
inner: Mutex<Inner>,
allocation_lock: futures::lock::Mutex<()>,
}
struct Inner {
info: AllocatorInfo,
// The allocator can only be opened if there have been no allocations and it has not already
// been opened or initialized.
opened: bool,
// When a transaction is dropped, we need to release the reservation, but that requires the use
// of async methods which we can't call from drop. To work around that, we keep a list of
// dropped allocations and update reserved_allocations the next time we try to allocate.
dropped_allocations: Vec<AllocatorItem>,
// This value is the up-to-date count of the number of allocated bytes whereas the value in
// `info` is the value as it was when we last flushed. This is i64 because it can be negative
// during replay.
allocated_bytes: i64,
// This value is the number of bytes allocated to either uncommitted allocations, or
// reservations.
reserved_bytes: u64,
// Committed deallocations that we cannot reuse until they have been flushed to the device. Each
// entry in this list is the journal log offset at which the deallocation was committed, together
// with the deallocated range.
committed_deallocations: VecDeque<(u64, Range<u64>)>,
}
impl SimpleAllocator {
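/// Creates a new SimpleAllocator. If `empty` is true, ensure_open will create the allocator's
/// object in the root object store; otherwise, ensure_open will load the allocator's state from
/// the existing object with `object_id`.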
pub fn new(filesystem: Arc<dyn Filesystem>, object_id: u64, empty: bool) -> SimpleAllocator {
SimpleAllocator {
filesystem: Arc::downgrade(&filesystem),
block_size: filesystem.device().block_size(),
device_size: filesystem.device().size(),
object_id,
empty,
tree: LSMTree::new(merge),
reserved_allocations: SkipListLayer::new(1024), // TODO(csuter): magic numbers
inner: Mutex::new(Inner {
info: AllocatorInfo::default(),
opened: false,
dropped_allocations: Vec::new(),
allocated_bytes: 0,
reserved_bytes: 0,
committed_deallocations: VecDeque::new(),
}),
allocation_lock: futures::lock::Mutex::new(()),
}
}
pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
assert!(self.inner.lock().unwrap().opened);
&self.tree
}
// Ensures the allocator is open. If empty, create the object in the root object store,
// otherwise load and initialise the LSM tree.
pub async fn ensure_open(&self) -> Result<(), Error> {
{
if self.inner.lock().unwrap().opened {
return Ok(());
}
}
let _guard = self.allocation_lock.lock().await;
{
if self.inner.lock().unwrap().opened {
// We lost a race.
return Ok(());
}
}
let filesystem = self.filesystem.upgrade().unwrap();
let root_store = filesystem.root_store();
if self.empty {
let mut transaction = filesystem
.clone()
.new_transaction(&[], Options { skip_journal_checks: true, ..Default::default() })
.await?;
ObjectStore::create_object_with_id(
&root_store,
&mut transaction,
self.object_id(),
HandleOptions::default(),
)
.await?;
transaction.commit().await;
} else {
let handle =
ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default())
.await?;
if handle.get_size() > 0 {
let serialized_info = handle.contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE).await?;
let info: AllocatorInfo = deserialize_from(&serialized_info[..])?;
let mut handles = Vec::new();
for object_id in &info.layers {
handles.push(
ObjectStore::open_object(&root_store, *object_id, HandleOptions::default())
.await?,
);
}
{
let mut inner = self.inner.lock().unwrap();
// After replaying, allocated_bytes should include all the deltas since the time
// the allocator was last flushed, so here we just need to add whatever is
// recorded in info.
let amount: i64 =
info.allocated_bytes.try_into().map_err(|_| FxfsError::Inconsistent)?;
inner.allocated_bytes += amount;
if inner.allocated_bytes < 0 || inner.allocated_bytes as u64 > self.device_size
{
bail!(FxfsError::Inconsistent);
}
inner.info = info;
}
self.tree.append_layers(handles.into_boxed_slice()).await?;
}
}
self.inner.lock().unwrap().opened = true;
Ok(())
}
/// Returns all objects that exist in the parent store that pertain to this allocator.
pub fn parent_objects(&self) -> Vec<u64> {
// The allocator tree needs to store a file for each of the layers in the tree, so we return
// those, since nothing else references them.
self.inner.lock().unwrap().info.layers.clone()
}
}
#[async_trait]
impl Allocator for SimpleAllocator {
fn object_id(&self) -> u64 {
self.object_id
}
async fn allocate(
&self,
transaction: &mut Transaction<'_>,
len: u64,
) -> Result<Range<u64>, Error> {
ensure!(len % self.block_size as u64 == 0);
self.ensure_open().await?;
let _guard = self.allocation_lock.lock().await;
if let Some(reservation) = transaction.allocator_reservation {
ensure!(
reservation.amount() >= len,
anyhow!(FxfsError::NoSpace).context("Insufficient space in reservation")
);
}
let dropped_allocations = {
let mut inner = self.inner.lock().unwrap();
if transaction.allocator_reservation.is_none()
&& self.device_size - inner.allocated_bytes as u64 - inner.reserved_bytes < len
{
bail!(FxfsError::NoSpace);
}
std::mem::take(&mut inner.dropped_allocations)
};
// Update reserved_allocations using dropped_allocations.
for item in dropped_allocations {
self.reserved_allocations.erase(item.as_item_ref()).await;
}
let result = {
let tree = &self.tree;
let mut layer_set = tree.empty_layer_set();
layer_set
.layers
.push((self.reserved_allocations.clone() as Arc<dyn Layer<_, _>>).into());
tree.add_all_layers_to_layer_set(&mut layer_set);
let mut merger = layer_set.merger();
let mut iter = merger.seek(Bound::Unbounded).await?;
let mut last_offset = 0;
loop {
if last_offset + len >= self.device_size as u64 {
bail!(anyhow!(FxfsError::NoSpace).context("no space after search"));
}
match iter.get() {
None => {
break last_offset..last_offset + len;
}
Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
if device_range.start > last_offset {
break last_offset..min(last_offset + len, device_range.start);
}
last_offset = device_range.end;
}
}
iter.advance().await?;
}
};
log::debug!("allocate {:?}", result);
self.mark_allocated(transaction, result.clone()).await?;
Ok(result)
}
async fn mark_allocated(
&self,
transaction: &mut Transaction<'_>,
device_range: Range<u64>,
) -> Result<(), Error> {
ensure!(device_range.end <= self.device_size, FxfsError::NoSpace);
if let Some(reservation) = &mut transaction.allocator_reservation {
// This shouldn't fail because we checked that the reservation had enough space at the
// beginning of allocate (after taking the lock), and the lock should still be held.
reservation.sub(device_range.length()).unwrap();
} else {
self.inner.lock().unwrap().reserved_bytes += device_range.length();
}
let item = AllocatorItem::new(AllocatorKey { device_range }, AllocatorValue { delta: 1 });
self.reserved_allocations.insert(item.clone()).await;
transaction.add(self.object_id(), Mutation::allocation(item));
Ok(())
}
fn add_ref(&self, transaction: &mut Transaction<'_>, device_range: Range<u64>) {
transaction.add(
self.object_id(),
Mutation::allocation_ref(AllocatorItem::new(
AllocatorKey { device_range },
AllocatorValue { delta: 1 },
)),
);
}
async fn deallocate(
&self,
transaction: &mut Transaction<'_>,
mut dealloc_range: Range<u64>,
) -> Result<u64, Error> {
log::debug!("deallocate {:?}", dealloc_range);
trace_duration!("SimpleAllocator::deallocate");
// We need to determine whether this deallocation actually frees the range or is just a
// reference count adjustment. We separate the two kinds into two different mutation types
// so that we can adjust our counts correctly at commit time.
let layer_set = self.tree.layer_set();
let mut merger = layer_set.merger();
// The precise search key that we choose here is important. We need to perform a full merge
// across all layers because we want the precise value of delta, so we must ensure that we
// query all layers, which is done by setting the lower bound to zero (the merger consults
// iterators until it encounters a key whose lower-bound is not greater than the search
// key). The upper bound is used to search each individual layer, and we want to start with
// an extent that covers the first byte of the range we're deallocating.
let mut iter = merger
.seek(Bound::Included(&AllocatorKey { device_range: 0..dealloc_range.start + 1 }))
.await
.unwrap();
let mut deallocated = 0;
let mut mutation = None;
while let Some(ItemRef {
key: AllocatorKey { device_range, .. },
value: AllocatorValue { delta, .. },
..
}) = iter.get()
{
if device_range.start > dealloc_range.start {
// We expect the entire range to be allocated.
bail!(FxfsError::Inconsistent);
}
let end = std::cmp::min(device_range.end, dealloc_range.end);
if *delta == 1 {
// In this branch, we know that we're freeing data, so we want an Allocator
// mutation.
if let Some(Mutation::AllocatorRef(_)) = mutation {
transaction.add(self.object_id(), mutation.take().unwrap());
}
match &mut mutation {
None => {
mutation = Some(Mutation::allocation(Item::new(
AllocatorKey { device_range: dealloc_range.start..end },
AllocatorValue { delta: -1 },
)));
}
Some(Mutation::Allocator(AllocatorMutation(AllocatorItem { key, .. }))) => {
key.device_range.end = end;
}
_ => unreachable!(),
}
deallocated += end - dealloc_range.start;
} else {
// In this branch, we know that we're not freeing data, so we want an AllocatorRef
// mutation.
if let Some(Mutation::Allocator(_)) = mutation {
transaction.add(self.object_id(), mutation.take().unwrap());
}
match &mut mutation {
None => {
mutation = Some(Mutation::allocation_ref(Item::new(
AllocatorKey { device_range: dealloc_range.start..end },
AllocatorValue { delta: -1 },
)));
}
Some(Mutation::AllocatorRef(AllocatorMutation(AllocatorItem {
key, ..
}))) => {
key.device_range.end = end;
}
_ => unreachable!(),
}
}
if end == dealloc_range.end {
break;
}
dealloc_range.start = end;
iter.advance().await?;
}
if let Some(mutation) = mutation {
transaction.add(self.object_id(), mutation);
}
Ok(deallocated)
}
fn as_mutations(self: Arc<Self>) -> Arc<dyn Mutations> {
self
}
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
self
}
async fn did_flush_device(&self, flush_log_offset: u64) {
// First take out the deallocations that we now know to be flushed. The list is maintained
// in order, so we can stop on the first entry that we find that should not be unreserved
// yet.
let deallocs = {
let mut inner = self.inner.lock().unwrap();
if let Some((index, _)) = inner
.committed_deallocations
.iter()
.enumerate()
.find(|(_, (dealloc_log_offset, _))| *dealloc_log_offset > flush_log_offset)
{
let mut deallocs = inner.committed_deallocations.split_off(index);
// Swap because we want the opposite of what split_off does.
std::mem::swap(&mut inner.committed_deallocations, &mut deallocs);
deallocs
} else {
std::mem::take(&mut inner.committed_deallocations)
}
};
// Now we can erase those elements from reserved_allocations (whilst we're not holding the
// lock on inner).
for (_, device_range) in deallocs {
self.reserved_allocations
.erase(
Item::new(AllocatorKey { device_range }, AllocatorValue { delta: 0 })
.as_item_ref(),
)
.await;
}
}
fn reserve(self: Arc<Self>, amount: u64) -> Option<Reservation> {
{
let mut inner = self.inner.lock().unwrap();
if self.device_size - inner.allocated_bytes as u64 - inner.reserved_bytes < amount {
return None;
}
inner.reserved_bytes += amount;
}
Some(Reservation::new(self, amount))
}
fn reserve_at_most(self: Arc<Self>, mut amount: u64) -> Reservation {
{
let mut inner = self.inner.lock().unwrap();
amount = std::cmp::min(
self.device_size - inner.allocated_bytes as u64 - inner.reserved_bytes,
amount,
);
inner.reserved_bytes += amount;
}
Reservation::new(self, amount)
}
fn release_reservation(&self, reservation: &mut Reservation) {
self.inner.lock().unwrap().reserved_bytes -= reservation.take();
}
fn get_allocated_bytes(&self) -> u64 {
self.inner.lock().unwrap().allocated_bytes as u64
}
async fn validate_mutation(
&self,
journal_offset: u64,
mutation: &Mutation,
checksum_list: &mut ChecksumList,
) -> Result<bool, Error> {
match mutation {
Mutation::Allocator(AllocatorMutation(AllocatorItem {
key: AllocatorKey { device_range },
value: AllocatorValue { delta },
..
})) if *delta < 0 => {
checksum_list.mark_deallocated(journal_offset, device_range.clone());
}
_ => {}
}
Ok(true)
}
}
#[async_trait]
impl Mutations for SimpleAllocator {
async fn apply_mutation(
&self,
mutation: Mutation,
transaction: Option<&Transaction<'_>>,
log_offset: u64,
_assoc_obj: AssocObj<'_>,
) {
match mutation {
Mutation::Allocator(AllocatorMutation(mut item)) => {
item.sequence = log_offset;
// We currently rely on barriers here between inserting/removing from reserved
// allocations and merging into the tree. These barriers are present whilst we use
// skip_list_layer's commit_and_wait method, rather than just commit.
if transaction.is_some() && item.value.delta < 0 {
self.inner
.lock()
.unwrap()
.committed_deallocations
.push_back((log_offset, item.key.device_range.clone()));
let mut item = item.clone();
item.value.delta = 1;
self.reserved_allocations.insert(item).await;
}
let lower_bound = item.key.lower_bound_for_merge_into();
self.tree.merge_into(item.clone(), &lower_bound).await;
let len = item.key.device_range.length();
if item.value.delta > 0 {
if transaction.is_some() {
self.reserved_allocations.erase(item.as_item_ref()).await;
}
let mut inner = self.inner.lock().unwrap();
inner.allocated_bytes = inner.allocated_bytes.saturating_add(len as i64);
if transaction.is_some() {
inner.reserved_bytes -= len;
}
} else {
let mut inner = self.inner.lock().unwrap();
inner.allocated_bytes = inner.allocated_bytes.saturating_sub(len as i64);
if let Some(Transaction { allocator_reservation: Some(reservation), .. }) =
transaction
{
inner.reserved_bytes += len;
reservation.add(len);
}
}
}
Mutation::AllocatorRef(AllocatorMutation(mut item)) => {
item.sequence = log_offset;
let lower_bound = item.key.lower_bound_for_merge_into();
self.tree.merge_into(item, &lower_bound).await;
}
// TODO(csuter): Since Seal and Compact are no longer being used for just trees, we
// should consider changing the names to something else, maybe FlushBegin and
// FlushCommit to match ObjectFlush, and maybe ObjectFlush::commit should be responsible
// for adding it to a transaction.
Mutation::TreeSeal => {
{
// After we seal the tree, we will start adding mutations to the new mutable
// layer, but we cannot safely do that whilst we are attempting to allocate
// because there is a chance it might miss an allocation and also not see the
// allocation in reserved_allocations.
let _guard = self.allocation_lock.lock().await;
self.tree.seal().await;
}
// Transfer our running count for allocated_bytes so that it gets written to the new
// info file when flush completes.
let mut inner = self.inner.lock().unwrap();
inner.info.allocated_bytes = inner.allocated_bytes as u64;
}
Mutation::TreeCompact => {
if transaction.is_none() {
self.tree.reset_immutable_layers();
// AllocatorInfo is written in the same transaction and will contain the count
// at the point TreeSeal was applied, so we need to adjust allocated_bytes so
// that it just covers the delta from that point. Later, when we properly open
// the allocator, we'll add this back.
let mut inner = self.inner.lock().unwrap();
inner.allocated_bytes -= inner.info.allocated_bytes as i64;
}
}
_ => panic!("unexpected mutation! {:?}", mutation), // TODO(csuter): This can't panic
}
}
fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
match mutation {
Mutation::Allocator(AllocatorMutation(item)) => {
if item.value.delta > 0 {
let mut inner = self.inner.lock().unwrap();
if let Some(reservation) = transaction.allocator_reservation {
reservation.add(item.key.device_range.length());
} else {
inner.reserved_bytes -= item.key.device_range.length();
}
inner.dropped_allocations.push(item);
}
}
_ => {}
}
}
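// Flushes the allocator: seals the in-memory tree, compacts the immutable layers into a new
// layer file, persists AllocatorInfo pointing at that file, and then tombstones the old layer
// files.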
async fn flush(&self) -> Result<(), Error> {
self.ensure_open().await?;
let filesystem = self.filesystem.upgrade().unwrap();
let object_manager = filesystem.object_manager();
if !object_manager.needs_flush(self.object_id()) {
return Ok(());
}
let graveyard = object_manager.graveyard().ok_or(anyhow!("Missing graveyard!"))?;
let object_sync = ObjectFlush::new(object_manager, self.object_id());
// TODO(csuter): This all needs to be atomic somehow. We'll need to use different
// transactions for each stage, but we need to make sure objects are cleaned up if there's a
// failure.
let reservation = filesystem.flush_reservation();
let txn_options = Options {
skip_journal_checks: true,
allocator_reservation: Some(reservation),
..Default::default()
};
let mut transaction = filesystem.clone().new_transaction(&[], txn_options).await?;
let root_store = self.filesystem.upgrade().unwrap().root_store();
let layer_object_handle = ObjectStore::create_object(
&root_store,
&mut transaction,
HandleOptions { skip_journal_checks: true, ..Default::default() },
)
.await?;
let object_id = layer_object_handle.object_id();
graveyard.add(&mut transaction, root_store.store_object_id(), object_id);
// It's important that this transaction does not include any allocations because we use
// TreeSeal as a snapshot point for mutations to the tree: other allocator mutations within
// this transaction might get applied before seal (which would be OK), but they could
// equally get applied afterwards (since Transaction makes no guarantees about the order in
// which mutations are applied whilst committing), in which case they'd get lost on replay
// because the journal will only send mutations that follow this transaction.
transaction.add_with_object(
self.object_id(),
Mutation::TreeSeal,
AssocObj::Borrowed(&object_sync),
);
transaction.commit().await;
let layer_set = self.tree.immutable_layer_set();
{
let mut merger = layer_set.merger();
self.tree
.compact_with_iterator(
CoalescingIterator::new(Box::new(merger.seek(Bound::Unbounded).await?)).await?,
Writer::new(&layer_object_handle, txn_options),
)
.await?;
}
log::debug!("using {} for allocator layer file", object_id);
let object_handle =
ObjectStore::open_object(&root_store, self.object_id(), HandleOptions::default())
.await?;
// TODO(jfsulliv): Can we preallocate the buffer instead of doing a bounce? Do we know the
// size up front?
let mut transaction = filesystem.clone().new_transaction(&[], txn_options).await?;
let mut serialized_info = Vec::new();
{
let mut inner = self.inner.lock().unwrap();
// Move all the existing layers to the graveyard.
for object_id in &inner.info.layers {
graveyard.add(&mut transaction, root_store.store_object_id(), *object_id);
}
inner.info.layers = vec![object_id];
serialize_into(&mut serialized_info, &inner.info)?;
}
let mut buf = object_handle.allocate_buffer(serialized_info.len());
buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;
// It's important that TreeCompact is in the same transaction that we write AllocatorInfo,
// because we use TreeCompact to make the required adjustments to allocated_bytes.
transaction.add(self.object_id(), Mutation::TreeCompact);
graveyard.remove(&mut transaction, root_store.store_object_id(), object_id);
transaction.commit().await;
// TODO(csuter): what if this fails.
self.tree.set_layers(layers_from_handles(Box::new([layer_object_handle])).await?);
object_sync.commit();
// Now close the layers and purge them.
for layer in layer_set.layers {
let object_id = layer.handle().map(|h| h.object_id());
layer.close_layer().await;
if let Some(object_id) = object_id {
root_store.tombstone(object_id, txn_options).await?;
}
}
Ok(())
}
}
// The merger is unable to merge extents that exist like the following:
//
// |----- +1 -----|
// |----- -1 -----|
// |----- +2 -----|
//
// It cannot coalesce them because it has to emit the +1 record so that it can move on and merge the
// -1 and +2 records. To address this, we add another stage that applies after merging which
// coalesces records after they have been emitted. This is a bit simpler than merging because the
// records cannot overlap, so it's just a question of merging adjacent records if they happen to
// have the same delta.
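// For example, two emitted records 0..100 (+1) and 100..200 (+1) coalesce into a single
// 0..200 (+1) record (see test_coalescing_iterator below).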
pub struct CoalescingIterator<'a> {
iter: BoxedLayerIterator<'a, AllocatorKey, AllocatorValue>,
item: Option<AllocatorItem>,
}
impl<'a> CoalescingIterator<'a> {
pub async fn new(
iter: BoxedLayerIterator<'a, AllocatorKey, AllocatorValue>,
) -> Result<CoalescingIterator<'a>, Error> {
let mut iter = Self { iter, item: None };
iter.advance().await?;
Ok(iter)
}
}
#[async_trait]
impl LayerIterator<AllocatorKey, AllocatorValue> for CoalescingIterator<'_> {
async fn advance(&mut self) -> Result<(), Error> {
self.item = self.iter.get().map(|x| x.cloned());
if self.item.is_none() {
return Ok(());
}
let left = self.item.as_mut().unwrap();
loop {
self.iter.advance().await?;
match self.iter.get() {
None => return Ok(()),
Some(right) => {
// The two records cannot overlap.
assert!(left.key.device_range.end <= right.key.device_range.start);
// We can only coalesce the records if they are touching and have the same
// delta.
if left.key.device_range.end < right.key.device_range.start
|| left.value.delta != right.value.delta
{
return Ok(());
}
left.key.device_range.end = right.key.device_range.end;
}
}
}
}
fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
self.item.as_ref().map(|x| x.as_item_ref())
}
}
#[cfg(test)]
mod tests {
use {
crate::{
lsm_tree::{
skip_list_layer::SkipListLayer,
types::{Item, ItemRef, Layer, LayerIterator, MutableLayer},
LSMTree,
},
object_store::{
allocator::{
merge::merge, Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
SimpleAllocator,
},
filesystem::{Filesystem, Mutations},
graveyard::Graveyard,
testing::fake_filesystem::FakeFilesystem,
transaction::{Options, TransactionHandler},
ObjectStore,
},
},
fuchsia_async as fasync,
interval_tree::utils::RangeOps,
std::{
cmp::{max, min},
ops::{Bound, Range},
sync::Arc,
},
storage_device::{fake_device::FakeDevice, DeviceHolder},
};
#[fasync::run_singlethreaded(test)]
async fn test_coalescing_iterator() {
let skip_list = SkipListLayer::new(100);
let items = [
Item::new(AllocatorKey { device_range: 0..100 }, AllocatorValue { delta: 1 }),
Item::new(AllocatorKey { device_range: 100..200 }, AllocatorValue { delta: 1 }),
];
skip_list.insert(items[1].clone()).await;
skip_list.insert(items[0].clone()).await;
let mut iter =
CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
.await
.expect("new failed");
let ItemRef { key, value, .. } = iter.get().expect("get failed");
assert_eq!(
(key, value),
(&AllocatorKey { device_range: 0..200 }, &AllocatorValue { delta: 1 })
);
iter.advance().await.expect("advance failed");
assert!(iter.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_merge_and_coalesce_across_three_layers() {
let lsm_tree = LSMTree::new(merge);
lsm_tree
.insert(Item::new(AllocatorKey { device_range: 100..200 }, AllocatorValue { delta: 2 }))
.await;
lsm_tree.seal().await;
lsm_tree
.insert(Item::new(
AllocatorKey { device_range: 100..200 },
AllocatorValue { delta: -1 },
))
.await;
lsm_tree.seal().await;
lsm_tree
.insert(Item::new(AllocatorKey { device_range: 0..100 }, AllocatorValue { delta: 1 }))
.await;
let layer_set = lsm_tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = CoalescingIterator::new(Box::new(
merger.seek(Bound::Unbounded).await.expect("seek failed"),
))
.await
.expect("new failed");
let ItemRef { key, value, .. } = iter.get().expect("get failed");
assert_eq!(
(key, value),
(&AllocatorKey { device_range: 0..200 }, &AllocatorValue { delta: 1 })
);
iter.advance().await.expect("advance failed");
assert!(iter.get().is_none());
}
fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
if a.end > b.start && a.start < b.end {
min(a.end, b.end) - max(a.start, b.start)
} else {
0
}
}
async fn check_allocations(allocator: &SimpleAllocator, expected_allocations: &[Range<u64>]) {
let layer_set = allocator.tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
let mut found = 0;
while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
let mut l = device_range.length();
found += l;
// Make sure that the entire range we have found completely overlaps with all the
// allocations we expect to find.
for range in expected_allocations {
l -= overlap(range, device_range);
if l == 0 {
break;
}
}
assert_eq!(l, 0);
iter.advance().await.expect("advance failed");
}
// Make sure the total we found adds up to what we expect.
assert_eq!(found, expected_allocations.iter().map(|r| r.length()).sum());
}
#[fasync::run_singlethreaded(test)]
async fn test_allocations() {
let device = DeviceHolder::new(FakeDevice::new(1024, 512));
let fs = FakeFilesystem::new(device);
let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
fs.object_manager().set_allocator(allocator.clone());
let store = ObjectStore::new_empty(None, 2, fs.clone());
fs.object_manager().set_root_store(store.clone());
let mut transaction =
fs.clone().new_transaction(&[], Options::default()).await.expect("new failed");
let mut device_ranges = Vec::new();
device_ranges
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
assert!(device_ranges.last().unwrap().length() == 512);
device_ranges
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
assert!(device_ranges.last().unwrap().length() == 512);
assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
transaction.commit().await;
let mut transaction =
fs.clone().new_transaction(&[], Options::default()).await.expect("new failed");
device_ranges
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
assert!(device_ranges[2].length() == 512);
assert_eq!(overlap(&device_ranges[0], &device_ranges[2]), 0);
assert_eq!(overlap(&device_ranges[1], &device_ranges[2]), 0);
transaction.commit().await;
check_allocations(&allocator, &device_ranges).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_deallocations() {
let device = DeviceHolder::new(FakeDevice::new(1024, 512));
let fs = FakeFilesystem::new(device);
let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
fs.object_manager().set_allocator(allocator.clone());
let store = ObjectStore::new_empty(None, 2, fs.clone());
fs.object_manager().set_root_store(store.clone());
let mut transaction =
fs.clone().new_transaction(&[], Options::default()).await.expect("new failed");
let device_range1 =
allocator.allocate(&mut transaction, 512).await.expect("allocate failed");
assert!(device_range1.length() == 512);
transaction.commit().await;
let mut transaction =
fs.clone().new_transaction(&[], Options::default()).await.expect("new failed");
allocator.deallocate(&mut transaction, device_range1).await.expect("deallocate failed");
transaction.commit().await;
check_allocations(&allocator, &[]).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_mark_allocated() {
let device = DeviceHolder::new(FakeDevice::new(1024, 512));
let fs = FakeFilesystem::new(device);
let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
fs.object_manager().set_allocator(allocator.clone());
let store = ObjectStore::new_empty(None, 2, fs.clone());
fs.object_manager().set_root_store(store.clone());
let mut transaction =
fs.clone().new_transaction(&[], Options::default()).await.expect("new failed");
let mut device_ranges = Vec::new();
device_ranges.push(0..512);
allocator
.mark_allocated(&mut transaction, device_ranges.last().unwrap().clone())
.await
.expect("mark_allocated failed");
device_ranges
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
assert!(device_ranges.last().unwrap().length() == 512);
assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
transaction.commit().await;
check_allocations(&allocator, &device_ranges).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_flush() {
let device = DeviceHolder::new(FakeDevice::new(1024, 512));
let fs = FakeFilesystem::new(device);
let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
fs.object_manager().set_allocator(allocator.clone());
let store = ObjectStore::new_empty(None, 2, fs.clone());
fs.object_manager().set_root_store(store.clone());
allocator.ensure_open().await.expect("ensure_open failed");
let mut transaction =
fs.clone().new_transaction(&[], Options::default()).await.expect("new failed");
let graveyard = Graveyard::create(&mut transaction, &store).await.expect("create failed");
fs.object_manager().register_graveyard(graveyard);
let mut device_ranges = Vec::new();
device_ranges
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
device_ranges
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
device_ranges
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
transaction.commit().await;
allocator.flush().await.expect("flush failed");
let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, false));
fs.object_manager().set_allocator(allocator.clone());
// When we flushed the allocator, it would have been written to the device somewhere but
// without a journal, we will be missing those records, so this next allocation will likely
// be on top of those objects. That won't matter for the purposes of this test, since we
// are not writing anything to these ranges.
let mut transaction =
fs.clone().new_transaction(&[], Options::default()).await.expect("new failed");
device_ranges
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
for r in &device_ranges[..3] {
assert_eq!(overlap(r, device_ranges.last().unwrap()), 0);
}
transaction.commit().await;
check_allocations(&allocator, &device_ranges).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_dropped_transaction() {
let device = DeviceHolder::new(FakeDevice::new(1024, 512));
let fs = FakeFilesystem::new(device);
let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
fs.object_manager().set_allocator(allocator.clone());
let store = ObjectStore::new_empty(None, 2, fs.clone());
fs.object_manager().set_root_store(store.clone());
let allocated_range = {
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
allocator.allocate(&mut transaction, 512).await.expect("allocate failed")
};
// After dropping the transaction and attempting to allocate again, we should end up with
// the same range because the reservation should have been released.
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
assert_eq!(
allocator.allocate(&mut transaction, 512).await.expect("allocate failed"),
allocated_range
);
}
#[fasync::run_singlethreaded(test)]
async fn test_allocated_bytes() {
const BLOCK_COUNT: u32 = 1024;
const BLOCK_SIZE: u32 = 512;
let device = DeviceHolder::new(FakeDevice::new(BLOCK_COUNT.into(), BLOCK_SIZE));
let fs = FakeFilesystem::new(device);
let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
fs.object_manager().set_allocator(allocator.clone());
let store = ObjectStore::new_empty(None, 2, fs.clone());
fs.object_manager().set_root_store(store.clone());
assert_eq!(allocator.get_allocated_bytes(), 0);
// Verify allocated_bytes reflects allocation changes.
const ALLOCATED_BYTES: u64 = 512;
let allocated_range = {
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let range = allocator
.allocate(&mut transaction, ALLOCATED_BYTES)
.await
.expect("allocate failed");
transaction.commit().await;
assert_eq!(allocator.get_allocated_bytes(), ALLOCATED_BYTES);
range
};
{
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
allocator.allocate(&mut transaction, 512).await.expect("allocate failed");
// Prior to committing, the count of allocated bytes shouldn't change.
assert_eq!(allocator.get_allocated_bytes(), ALLOCATED_BYTES);
}
// After dropping the prior transaction, the allocated bytes still shouldn't have changed.
assert_eq!(allocator.get_allocated_bytes(), ALLOCATED_BYTES);
// Verify allocated_bytes reflects deallocations.
let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
let mut transaction =
fs.clone().new_transaction(&[], Options::default()).await.expect("new failed");
allocator.deallocate(&mut transaction, deallocate_range).await.expect("deallocate failed");
// Before committing, there should be no change.
assert_eq!(allocator.get_allocated_bytes(), ALLOCATED_BYTES);
transaction.commit().await;
// After committing, only the 40 bytes not covered by the deallocated range should remain
// allocated.
assert_eq!(allocator.get_allocated_bytes(), 40);
}
}