// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
mod allocator;
mod constants;
pub mod directory;
pub mod filesystem;
pub mod fsck;
mod graveyard;
mod journal;
mod merge;
pub mod object_manager;
mod record;
pub mod store_object_handle;
#[cfg(test)]
mod testing;
pub mod transaction;
mod tree;
pub mod volume;
pub use directory::Directory;
pub use filesystem::FxFilesystem;
pub use record::{ObjectDescriptor, Timestamp};
pub use store_object_handle::{round_down, round_up, StoreObjectHandle};
use {
crate::{
errors::FxfsError,
lsm_tree::{
layers_from_handles,
types::{BoxedLayerIterator, Item, ItemRef, LayerIterator},
LSMTree,
},
object_handle::{ObjectHandle, ObjectHandleExt, Writer, INVALID_OBJECT_ID},
object_store::{
filesystem::{Filesystem, Mutations},
journal::checksum_list::ChecksumList,
record::{
Checksums, ExtentKey, ExtentValue, ObjectItem, ObjectKey, ObjectKeyData,
ObjectKind, ObjectValue, DEFAULT_DATA_ATTRIBUTE_ID,
},
transaction::{
AssocObj, AssociatedObject, ExtentMutation, LockKey, Mutation, ObjectStoreMutation,
Operation, Options, StoreInfoMutation, Transaction,
},
},
trace_duration,
},
allocator::Allocator,
anyhow::{anyhow, bail, Context, Error},
async_trait::async_trait,
bincode::{deserialize_from, serialize_into},
futures::{future::BoxFuture, FutureExt},
interval_tree::utils::RangeOps,
once_cell::sync::OnceCell,
serde::{Deserialize, Serialize},
std::{
convert::TryFrom,
ops::Bound,
sync::{
atomic::{self, AtomicU64},
Arc, Mutex, Weak,
},
time::{Duration, SystemTime, UNIX_EPOCH},
},
storage_device::Device,
};
// TODO(jfsulliv): This probably could have a better home.
pub fn current_time() -> Timestamp {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO).into()
}
// StoreInfo stores information about the object store. This is stored within the parent object
// store, and is used, for example, to get the persistent layer objects.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct StoreInfo {
    // The last used object ID. Note that this field is not kept up to date in memory;
    // ObjectStore's last_object_id field is the one to use in that case.
last_object_id: u64,
// Object ids for layers. TODO(csuter): need a layer of indirection here so we can
// support snapshots.
object_tree_layers: Vec<u64>,
extent_tree_layers: Vec<u64>,
// The object ID for the root directory.
root_directory_object_id: u64,
// The object ID for the graveyard.
// TODO(csuter): Move this out of here. This can probably be a child of the root directory.
graveyard_directory_object_id: u64,
}
// TODO(csuter): We should test or put checks in place to ensure this limit isn't exceeded. It
// will likely involve placing limits on the maximum number of layers.
const MAX_STORE_INFO_SERIALIZED_SIZE: usize = 131072;
#[derive(Default)]
pub struct HandleOptions {
/// If true, transactions used by this handle will skip journal space checks.
pub skip_journal_checks: bool,
}
/// An object store supports a file-like interface for objects. Objects are keyed by a 64-bit
/// identifier. An object store has to be backed by a parent object store (which stores metadata
/// for the object store). The top-level object store (a.k.a. the root parent object store) is
/// in-memory only.
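///
/// A minimal usage sketch (not compiled; assumes an already-opened filesystem `fs`, as in the
/// tests at the bottom of this file):
/// ```ignore
/// let store = fs.root_store();
/// let mut transaction =
///     fs.clone().new_transaction(&[], Options::default()).await.expect("new_transaction failed");
/// let handle = ObjectStore::create_object(&store, &mut transaction, HandleOptions::default())
///     .await
///     .expect("create_object failed");
/// transaction.commit().await.expect("commit failed");
/// // Objects can later be reopened by ID.
/// let reopened = ObjectStore::open_object(&store, handle.object_id(), HandleOptions::default())
///     .await
///     .expect("open_object failed");
/// ```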
pub struct ObjectStore {
parent_store: Option<Arc<ObjectStore>>,
store_object_id: u64,
device: Arc<dyn Device>,
block_size: u32,
filesystem: Weak<dyn Filesystem>,
last_object_id: AtomicU64,
store_info: Mutex<Option<StoreInfo>>,
tree: LSMTree<ObjectKey, ObjectValue>,
extent_tree: LSMTree<ExtentKey, ExtentValue>,
// When replaying the journal, the store cannot read StoreInfo until the whole journal
// has been replayed, so during that time, store_info_handle will be None and records
// just get sent to the tree. Once the journal has been replayed, we can open the store
// and load all the other layer information.
store_info_handle: OnceCell<StoreObjectHandle<ObjectStore>>,
}
impl ObjectStore {
fn new(
parent_store: Option<Arc<ObjectStore>>,
store_object_id: u64,
filesystem: Arc<dyn Filesystem>,
store_info: Option<StoreInfo>,
) -> Arc<ObjectStore> {
let device = filesystem.device();
let block_size = filesystem.block_size();
let store = Arc::new(ObjectStore {
parent_store,
store_object_id,
device,
block_size,
filesystem: Arc::downgrade(&filesystem),
last_object_id: AtomicU64::new(0),
store_info: Mutex::new(store_info),
tree: LSMTree::new(merge::merge),
extent_tree: LSMTree::new(merge::merge_extents),
store_info_handle: OnceCell::new(),
});
store
}
pub fn new_empty(
parent_store: Option<Arc<ObjectStore>>,
store_object_id: u64,
filesystem: Arc<dyn Filesystem>,
) -> Arc<Self> {
Self::new(parent_store, store_object_id, filesystem, Some(StoreInfo::default()))
}
pub fn device(&self) -> &Arc<dyn Device> {
&self.device
}
pub fn block_size(&self) -> u32 {
self.block_size
}
pub fn filesystem(&self) -> Arc<dyn Filesystem> {
self.filesystem.upgrade().unwrap()
}
pub fn store_object_id(&self) -> u64 {
self.store_object_id
}
pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> {
&self.tree
}
pub fn extent_tree(&self) -> &LSMTree<ExtentKey, ExtentValue> {
&self.extent_tree
}
pub fn root_directory_object_id(&self) -> u64 {
self.store_info.lock().unwrap().as_ref().unwrap().root_directory_object_id
}
pub fn set_root_directory_object_id<'a>(&'a self, transaction: &mut Transaction<'a>, oid: u64) {
let mut store_info = self.txn_get_store_info(transaction);
store_info.root_directory_object_id = oid;
transaction.add_with_object(
self.store_object_id,
Mutation::store_info(store_info),
AssocObj::Borrowed(self),
);
}
pub fn graveyard_directory_object_id(&self) -> u64 {
self.store_info.lock().unwrap().as_ref().unwrap().graveyard_directory_object_id
}
pub fn set_graveyard_directory_object_id<'a>(
&'a self,
transaction: &mut Transaction<'a>,
oid: u64,
) {
let mut store_info = self.txn_get_store_info(transaction);
store_info.graveyard_directory_object_id = oid;
transaction.add_with_object(
self.store_object_id,
Mutation::store_info(store_info),
AssocObj::Borrowed(self),
);
}
pub async fn create_child_store<'a>(
self: &'a Arc<ObjectStore>,
transaction: &mut Transaction<'a>,
) -> Result<Arc<ObjectStore>, Error> {
let object_id = self.get_next_object_id();
self.create_child_store_with_id(transaction, object_id).await
}
async fn create_child_store_with_id<'a>(
self: &'a Arc<Self>,
transaction: &mut Transaction<'a>,
object_id: u64,
) -> Result<Arc<ObjectStore>, Error> {
self.ensure_open().await?;
// TODO(csuter): if the transaction rolls back, we need to delete the store.
let handle = ObjectStore::create_object_with_id(
self,
transaction,
object_id,
HandleOptions::default(),
)
.await?;
let fs = self.filesystem.upgrade().unwrap();
let store = Self::new_empty(Some(self.clone()), handle.object_id(), fs.clone());
assert!(store.store_info_handle.set(handle).is_ok());
fs.object_manager().add_store(store.clone());
Ok(store)
}
pub async fn open_object<S: AsRef<ObjectStore>>(
owner: &Arc<S>,
object_id: u64,
options: HandleOptions,
) -> Result<StoreObjectHandle<S>, Error> {
let store = owner.as_ref().as_ref();
store.ensure_open().await?;
let item = store
.tree
.find(&ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID))
.await?
.ok_or(FxfsError::NotFound)?;
if let ObjectValue::Attribute { size } = item.value {
Ok(StoreObjectHandle::new(
owner.clone(),
object_id,
DEFAULT_DATA_ATTRIBUTE_ID,
size,
options,
false,
))
} else {
bail!(FxfsError::Inconsistent);
}
}
async fn create_object_with_id<S: AsRef<ObjectStore>>(
owner: &Arc<S>,
transaction: &mut Transaction<'_>,
object_id: u64,
options: HandleOptions,
) -> Result<StoreObjectHandle<S>, Error> {
let store = owner.as_ref().as_ref();
store.ensure_open().await?;
        // If the object ID was specified, i.e. this hasn't come via create_object, then we
        // should update last_object_id in case the caller wants to create more objects in
        // the same transaction.
store.update_last_object_id(object_id);
let now = current_time();
transaction.add(
store.store_object_id(),
Mutation::insert_object(
ObjectKey::object(object_id),
ObjectValue::file(1, 0, now.clone(), now),
),
);
transaction.add(
store.store_object_id(),
Mutation::insert_object(
ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID),
ObjectValue::attribute(0),
),
);
Ok(StoreObjectHandle::new(
owner.clone(),
object_id,
DEFAULT_DATA_ATTRIBUTE_ID,
0,
options,
false,
))
}
pub async fn create_object<S: AsRef<ObjectStore>>(
owner: &Arc<S>,
mut transaction: &mut Transaction<'_>,
options: HandleOptions,
) -> Result<StoreObjectHandle<S>, Error> {
let object_id = owner.as_ref().as_ref().get_next_object_id();
ObjectStore::create_object_with_id(owner, &mut transaction, object_id, options).await
}
/// Adjusts the reference count for a given object. If the reference count reaches zero, the
/// object is moved into the graveyard and true is returned.
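    ///
    /// A hedged call sketch (not compiled; `store`, `transaction` and `oid` are assumed to
    /// already exist):
    /// ```ignore
    /// // Drop one reference; if it was the last, the object is now in the graveyard and can
    /// // later be purged with `tombstone`.
    /// let moved_to_graveyard = store.adjust_refs(&mut transaction, oid, -1).await?;
    /// ```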
pub async fn adjust_refs(
&self,
transaction: &mut Transaction<'_>,
oid: u64,
delta: i64,
) -> Result<bool, Error> {
let mut item = self.txn_get_object(transaction, oid).await?;
let refs =
if let ObjectValue::Object { kind: ObjectKind::File { ref mut refs, .. }, .. } =
item.value
{
*refs = if delta < 0 {
refs.checked_sub((-delta) as u64)
} else {
refs.checked_add(delta as u64)
}
.ok_or(anyhow!("refs underflow/overflow"))?;
refs
} else {
bail!(FxfsError::NotFile);
};
if *refs == 0 {
// Move the object into the graveyard.
self.filesystem().object_manager().graveyard().unwrap().add(
transaction,
self.store_object_id,
oid,
);
// We might still need to adjust the reference count if delta was something other than
// -1.
if delta != -1 {
*refs = 1;
transaction.add(
self.store_object_id,
Mutation::replace_or_insert_object(item.key, item.value),
);
}
Ok(true)
} else {
transaction.add(
self.store_object_id,
Mutation::replace_or_insert_object(item.key, item.value),
);
Ok(false)
}
}
// Purges an object that is in the graveyard. This has no locking, so it's not safe to call
// this more than once simultaneously for a given object.
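    // A hedged usage sketch (mirroring the tests below), assuming the object was previously moved
    // to the graveyard by adjust_refs:
    //
    //     store.tombstone(object_id, Options::default()).await?;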
pub async fn tombstone(&self, object_id: u64, txn_options: Options<'_>) -> Result<(), Error> {
let fs = self.filesystem();
let mut search_key = ExtentKey::new(object_id, 0, 0..0);
// TODO(csuter): There should be a test that runs fsck after each transaction.
loop {
let mut transaction = fs.clone().new_transaction(&[], txn_options).await?;
let next_key = self.delete_extents(&mut transaction, &search_key).await?;
if next_key.is_none() {
transaction.add(
self.store_object_id,
Mutation::merge_object(
ObjectKey::tombstone(search_key.object_id),
ObjectValue::None,
),
);
// Remove the object from the graveyard.
self.filesystem().object_manager().graveyard().unwrap().remove(
&mut transaction,
self.store_object_id,
search_key.object_id,
);
}
transaction.commit().await?;
search_key = if let Some(next_key) = next_key {
next_key
} else {
break;
};
}
Ok(())
}
// Makes progress on deleting part of a file but stops before a transaction gets too big.
async fn delete_extents(
&self,
transaction: &mut Transaction<'_>,
search_key: &ExtentKey,
) -> Result<Option<ExtentKey>, Error> {
let layer_set = self.extent_tree.layer_set();
let mut merger = layer_set.merger();
let allocator = self.allocator();
let mut iter = merger.seek(Bound::Included(search_key)).await?;
let mut delete_extent_mutation = None;
// Loop over the extents and deallocate them.
while let Some(ItemRef {
key: ExtentKey { object_id, attribute_id, range },
value: ExtentValue { device_offset },
..
}) = iter.get()
{
if *object_id != search_key.object_id {
break;
}
if let Some((device_offset, _)) = device_offset {
let device_range = *device_offset..*device_offset + (range.end - range.start);
allocator.deallocate(transaction, device_range).await?;
delete_extent_mutation = Some(Mutation::extent(
ExtentKey::new(search_key.object_id, *attribute_id, 0..range.end),
ExtentValue::deleted_extent(),
));
// Stop if the transaction is getting too big. At time of writing, this threshold
// limits transactions to about 10,000 bytes.
const TRANSACTION_MUTATION_THRESHOLD: usize = 200;
if transaction.mutations.len() >= TRANSACTION_MUTATION_THRESHOLD {
transaction.add(self.store_object_id, delete_extent_mutation.unwrap());
return Ok(Some(ExtentKey::search_key_from_offset(
search_key.object_id,
*attribute_id,
range.end,
)));
}
}
iter.advance().await?;
}
if let Some(m) = delete_extent_mutation {
transaction.add(self.store_object_id, m);
}
Ok(None)
}
/// Returns all objects that exist in the parent store that pertain to this object store.
pub fn parent_objects(&self) -> Vec<u64> {
assert!(self.store_info_handle.get().is_some());
let mut objects = Vec::new();
// We should not include the ID of the store itself, since that should be referred to in the
// volume directory.
let guard = self.store_info.lock().unwrap();
let store_info = guard.as_ref().unwrap();
objects.extend_from_slice(&store_info.object_tree_layers);
objects.extend_from_slice(&store_info.extent_tree_layers);
objects
}
/// Returns root objects for this store.
pub fn root_objects(&self) -> Vec<u64> {
let mut objects = Vec::new();
let store_info = self.store_info.lock().unwrap();
if store_info.as_ref().unwrap().root_directory_object_id != INVALID_OBJECT_ID {
objects.push(store_info.as_ref().unwrap().root_directory_object_id);
}
if store_info.as_ref().unwrap().graveyard_directory_object_id != INVALID_OBJECT_ID {
objects.push(store_info.as_ref().unwrap().graveyard_directory_object_id);
}
objects
}
pub fn store_info(&self) -> StoreInfo {
self.store_info.lock().unwrap().as_ref().unwrap().clone()
}
pub async fn ensure_open(&self) -> Result<(), Error> {
if self.parent_store.is_none() || self.store_info_handle.get().is_some() {
return Ok(());
}
let fs = self.filesystem();
let _guard = fs
.write_lock(&[LockKey::object(
self.parent_store.as_ref().unwrap().store_object_id(),
self.store_object_id,
)])
.await;
if self.store_info_handle.get().is_some() {
// We lost the race.
Ok(())
} else {
self.open_impl().await
}
}
// This returns a BoxFuture because of the cycle: open_object -> ensure_open -> open_impl ->
// open_object.
fn open_impl<'a>(&'a self) -> BoxFuture<'a, Result<(), Error>> {
async move {
let parent_store = self.parent_store.as_ref().unwrap();
let handle = ObjectStore::open_object(
&parent_store,
self.store_object_id,
HandleOptions::default(),
)
.await?;
let (object_tree_layer_object_ids, extent_tree_layer_object_ids) = loop {
if let Some(store_info) = &*self.store_info.lock().unwrap() {
break (
store_info.object_tree_layers.clone(),
store_info.extent_tree_layers.clone(),
);
}
if handle.get_size() > 0 {
let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
let store_info: StoreInfo = deserialize_from(&serialized_info[..])
.context("Failed to deserialize StoreInfo")?;
let layer_object_ids = (
store_info.object_tree_layers.clone(),
store_info.extent_tree_layers.clone(),
);
self.update_last_object_id(store_info.last_object_id);
*self.store_info.lock().unwrap() = Some(store_info);
break layer_object_ids;
}
// The store_info will be absent for a newly created and empty object store, since
// there have been no StoreInfoMutations applied to it.
break (vec![], vec![]);
};
let mut handles = Vec::new();
let mut total_size = 0;
for object_id in object_tree_layer_object_ids {
let handle =
ObjectStore::open_object(&parent_store, object_id, HandleOptions::default())
.await?;
total_size += handle.get_size();
handles.push(handle);
}
self.tree.append_layers(handles.into()).await?;
let mut handles = Vec::new();
for object_id in extent_tree_layer_object_ids {
let handle =
ObjectStore::open_object(&parent_store, object_id, HandleOptions::default())
.await?;
total_size += handle.get_size();
handles.push(handle);
}
self.extent_tree.append_layers(handles.into()).await?;
let _ = self.store_info_handle.set(handle);
self.filesystem().object_manager().update_reservation(self.store_object_id, total_size);
Ok(())
}
.boxed()
}
fn get_next_object_id(&self) -> u64 {
self.last_object_id.fetch_add(1, atomic::Ordering::Relaxed) + 1
}
fn allocator(&self) -> Arc<dyn Allocator> {
self.filesystem().allocator()
}
fn txn_get_store_info(&self, transaction: &Transaction<'_>) -> StoreInfo {
match transaction.get_store_info(self.store_object_id) {
None => self.store_info(),
Some(store_info) => store_info.clone(),
}
}
    // If |transaction| has a pending mutation for the underlying object, returns that.
    // Otherwise, looks up the object from the tree.
async fn txn_get_object(
&self,
transaction: &Transaction<'_>,
object_id: u64,
) -> Result<ObjectItem, Error> {
if let Some(ObjectStoreMutation { item, .. }) =
transaction.get_object_mutation(self.store_object_id, ObjectKey::object(object_id))
{
Ok(item.clone())
} else {
self.tree.find(&ObjectKey::object(object_id)).await?.ok_or(anyhow!(FxfsError::NotFound))
}
}
fn update_last_object_id(&self, object_id: u64) {
let _ = self.last_object_id.fetch_update(
atomic::Ordering::Relaxed,
atomic::Ordering::Relaxed,
|oid| if object_id > oid { Some(object_id) } else { None },
);
}
async fn validate_mutation(
journal_offset: u64,
mutation: &Mutation,
checksum_list: &mut ChecksumList,
) -> Result<bool, Error> {
if let Mutation::Extent(ExtentMutation(
ExtentKey { range, .. },
ExtentValue { device_offset: Some((device_offset, Checksums::Fletcher(checksums))) },
)) = mutation
{
if checksums.len() == 0 {
return Ok(false);
}
let len = if let Ok(l) = usize::try_from(range.length()) {
l
} else {
return Ok(false);
};
if len % checksums.len() != 0 {
return Ok(false);
}
if (len / checksums.len()) % 4 != 0 {
return Ok(false);
}
checksum_list.push(
journal_offset,
*device_offset..*device_offset + range.length(),
checksums,
);
}
Ok(true)
}
}
// In a major compaction (i.e. a compaction which involves the base layer), we have an opportunity
// to apply a number of optimizations, such as removing tombstoned objects or deleted extents.
// These optimizations are only safe in that case, so we use an explicit iterator which discards
// such records as the compaction writes out the new layers.
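// A sketch of its use (see the tree::MajorCompactable impls in flush below): wrap the compaction's
// merge iterator with a predicate identifying discardable records, e.g. tombstones:
//
//     Box::new(MajorCompactionIterator::new(iter, |item: ItemRef<'_, _, _>| {
//         matches!(item.key, ObjectKey { data: ObjectKeyData::Tombstone, .. })
//     }))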
struct MajorCompactionIterator<'a, K, V, F> {
iter: BoxedLayerIterator<'a, K, V>,
can_discard: F,
}
impl<'a, K, V, F> MajorCompactionIterator<'a, K, V, F> {
pub fn new(iter: BoxedLayerIterator<'a, K, V>, can_discard: F) -> Self {
Self { iter, can_discard }
}
}
#[async_trait]
impl<K: Send + Sync, V: Send + Sync, F: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync>
LayerIterator<K, V> for MajorCompactionIterator<'_, K, V, F>
{
async fn advance(&mut self) -> Result<(), Error> {
self.iter.advance().await?;
loop {
match self.iter.get() {
Some(item) if (self.can_discard)(item) => self.iter.advance().await?,
_ => return Ok(()),
}
}
}
fn get(&self) -> Option<ItemRef<'_, K, V>> {
self.iter.get()
}
}
#[async_trait]
impl Mutations for ObjectStore {
async fn apply_mutation(
&self,
mutation: Mutation,
transaction: Option<&Transaction<'_>>,
log_offset: u64,
_assoc_obj: AssocObj<'_>,
) {
        // It's not safe to fully open a store until replay is fully complete (because
        // subsequent mutations could render current records invalid). The exception to
        // this is the root parent object store, which contains the extents for the journal
        // file: whilst we are replaying, we need to track new extents for the journal file
        // so that we can keep reading from it.
assert!(
transaction.is_some()
|| self.store_info_handle.get().is_none()
|| self.parent_store.is_none()
);
match mutation {
Mutation::ObjectStore(ObjectStoreMutation { mut item, op }) => {
item.sequence = log_offset;
self.update_last_object_id(item.key.object_id);
match op {
Operation::Insert => self.tree.insert(item).await,
Operation::ReplaceOrInsert => self.tree.replace_or_insert(item).await,
Operation::Merge => {
let lower_bound = item.key.key_for_merge_into();
self.tree.merge_into(item, &lower_bound).await;
}
}
}
Mutation::ObjectStoreInfo(StoreInfoMutation(store_info)) => {
*self.store_info.lock().unwrap() = Some(store_info);
}
Mutation::BeginFlush => {
self.tree.seal().await;
self.extent_tree.seal().await;
}
Mutation::EndFlush => {
if transaction.is_none() {
self.tree.reset_immutable_layers();
self.extent_tree.reset_immutable_layers();
// StoreInfo needs to be read from the store-info file.
*self.store_info.lock().unwrap() = None;
} else {
let object_tree_layer_set = self.tree.immutable_layer_set();
let object_tree_handles =
object_tree_layer_set.layers.iter().map(|l| l.handle());
let extent_tree_layer_set = self.extent_tree.immutable_layer_set();
let extent_tree_handles =
extent_tree_layer_set.layers.iter().map(|l| l.handle());
self.filesystem().object_manager().update_reservation(
self.store_object_id,
object_tree_handles
.chain(extent_tree_handles)
.map(|h| h.map(ObjectHandle::get_size).unwrap_or(0))
.sum(),
);
}
}
Mutation::Extent(ExtentMutation(key, value)) => {
let item = Item::new_with_sequence(key, value, log_offset);
let lower_bound = item.key.key_for_merge_into();
self.extent_tree.merge_into(item, &lower_bound).await;
}
_ => panic!("unexpected mutation: {:?}", mutation), // TODO(csuter): can't panic
}
}
fn drop_mutation(&self, _mutation: Mutation, _transaction: &Transaction<'_>) {}
    /// Push all in-memory structures to the device. This is not necessary for sync since the
    /// journal will take care of it. This should be called when there is either memory or space
    /// pressure (flushing the store will persist in-memory data and allow the journal file to be
    /// trimmed). This is not thread-safe: flush must not be called from multiple threads at the
    /// same time.
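    ///
    /// A minimal caller sketch (as in the tests below):
    /// ```ignore
    /// store.flush().await.expect("flush failed");
    /// ```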
async fn flush(&self) -> Result<(), Error> {
trace_duration!("ObjectStore::flush", "store_object_id" => self.store_object_id);
if self.parent_store.is_none() {
return Ok(());
}
self.ensure_open().await?;
let filesystem = self.filesystem();
let object_manager = filesystem.object_manager();
if !object_manager.needs_flush(self.store_object_id) {
return Ok(());
}
let parent_store = self.parent_store.as_ref().unwrap();
let graveyard = object_manager.graveyard().ok_or(anyhow!("Missing graveyard!"))?;
let reservation = object_manager.metadata_reservation();
let txn_options = Options {
skip_journal_checks: true,
borrow_metadata_space: true,
allocator_reservation: Some(reservation),
..Default::default()
};
let mut transaction = filesystem.clone().new_transaction(&[], txn_options).await?;
let new_object_tree_layer = ObjectStore::create_object(
parent_store,
&mut transaction,
HandleOptions { skip_journal_checks: true, ..Default::default() },
)
.await?;
let new_object_tree_layer_object_id = new_object_tree_layer.object_id();
graveyard.add(
&mut transaction,
parent_store.store_object_id(),
new_object_tree_layer_object_id,
);
let new_extent_tree_layer = ObjectStore::create_object(
parent_store,
&mut transaction,
HandleOptions { skip_journal_checks: true, ..Default::default() },
)
.await?;
let new_extent_tree_layer_object_id = new_extent_tree_layer.object_id();
graveyard.add(
&mut transaction,
parent_store.store_object_id(),
new_extent_tree_layer_object_id,
);
transaction.add(self.store_object_id(), Mutation::BeginFlush);
transaction.commit().await?;
impl tree::MajorCompactable<ObjectKey, ObjectValue> for LSMTree<ObjectKey, ObjectValue> {
fn major_iter(
iter: BoxedLayerIterator<'_, ObjectKey, ObjectValue>,
) -> BoxedLayerIterator<'_, ObjectKey, ObjectValue> {
Box::new(MajorCompactionIterator::new(iter, |item: ItemRef<'_, _, _>| {
match item.key {
ObjectKey { data: ObjectKeyData::Tombstone, .. } => true,
_ => false,
}
}))
}
}
impl tree::MajorCompactable<ExtentKey, ExtentValue> for LSMTree<ExtentKey, ExtentValue> {
fn major_iter(
iter: BoxedLayerIterator<'_, ExtentKey, ExtentValue>,
) -> BoxedLayerIterator<'_, ExtentKey, ExtentValue> {
Box::new(MajorCompactionIterator::new(iter, |item: ItemRef<'_, _, _>| {
match item.value {
ExtentValue { device_offset: None } => true,
_ => false,
}
}))
}
}
let (object_tree_layers_to_keep, old_object_tree_layers) =
tree::flush(&self.tree, Writer::new(&new_object_tree_layer, txn_options)).await?;
let (extent_tree_layers_to_keep, old_extent_tree_layers) =
tree::flush(&self.extent_tree, Writer::new(&new_extent_tree_layer, txn_options))
.await?;
let mut new_object_tree_layers =
layers_from_handles(Box::new([new_object_tree_layer])).await?;
new_object_tree_layers.extend(object_tree_layers_to_keep.iter().map(|l| (*l).clone()));
let mut new_extent_tree_layers =
layers_from_handles(Box::new([new_extent_tree_layer])).await?;
new_extent_tree_layers.extend(extent_tree_layers_to_keep.iter().map(|l| (*l).clone()));
let mut serialized_info = Vec::new();
let mut new_store_info = self.store_info();
let mut transaction = filesystem.clone().new_transaction(&[], txn_options).await?;
// Move the existing layers we're compacting to the graveyard.
for layer in &old_object_tree_layers {
if let Some(handle) = layer.handle() {
graveyard.add(&mut transaction, parent_store.store_object_id(), handle.object_id());
}
}
for layer in &old_extent_tree_layers {
if let Some(handle) = layer.handle() {
graveyard.add(&mut transaction, parent_store.store_object_id(), handle.object_id());
}
}
new_store_info.last_object_id = self.last_object_id.load(atomic::Ordering::Relaxed);
new_store_info.object_tree_layers = Vec::new();
for layer in &new_object_tree_layers {
if let Some(handle) = layer.handle() {
new_store_info.object_tree_layers.push(handle.object_id());
}
}
new_store_info.extent_tree_layers = Vec::new();
for layer in &new_extent_tree_layers {
if let Some(handle) = layer.handle() {
new_store_info.extent_tree_layers.push(handle.object_id());
}
}
serialize_into(&mut serialized_info, &new_store_info)?;
let mut buf = self.device.allocate_buffer(serialized_info.len());
buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
self.store_info_handle
.get()
.unwrap()
.txn_write(&mut transaction, 0u64, buf.as_ref())
.await?;
transaction.add(self.store_object_id(), Mutation::EndFlush);
graveyard.remove(
&mut transaction,
parent_store.store_object_id(),
new_object_tree_layer_object_id,
);
graveyard.remove(
&mut transaction,
parent_store.store_object_id(),
new_extent_tree_layer_object_id,
);
transaction
.commit_with_callback(|_| {
*self.store_info.lock().unwrap() = Some(new_store_info);
self.tree.set_layers(new_object_tree_layers);
self.extent_tree.set_layers(new_extent_tree_layers);
})
.await?;
// Now close the layers and purge them.
for layer in old_object_tree_layers {
let object_id = layer.handle().map(|h| h.object_id());
layer.close_layer().await;
if let Some(object_id) = object_id {
parent_store.tombstone(object_id, txn_options).await?;
}
}
for layer in old_extent_tree_layers {
let object_id = layer.handle().map(|h| h.object_id());
layer.close_layer().await;
if let Some(object_id) = object_id {
parent_store.tombstone(object_id, txn_options).await?;
}
}
Ok(())
}
}
impl AsRef<ObjectStore> for ObjectStore {
fn as_ref(&self) -> &ObjectStore {
self
}
}
impl AssociatedObject for ObjectStore {}
#[cfg(test)]
mod tests {
use {
crate::{
errors::FxfsError,
lsm_tree::types::{ItemRef, LayerIterator},
object_handle::{ObjectHandle, ObjectHandleExt},
object_store::{
directory::Directory,
filesystem::{Filesystem, FxFilesystem, Mutations, OpenFxFilesystem, SyncOptions},
fsck::fsck,
record::{ExtentKey, ExtentValue, ObjectKey},
transaction::{Options, TransactionHandler},
HandleOptions, ObjectStore,
},
},
fuchsia_async as fasync,
futures::{future::join_all, join},
matches::assert_matches,
std::{
ops::Bound,
sync::{Arc, Mutex},
time::Duration,
},
storage_device::{fake_device::FakeDevice, DeviceHolder},
};
const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
async fn test_filesystem() -> OpenFxFilesystem {
let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
FxFilesystem::new_empty(device).await.expect("new_empty failed")
}
#[fasync::run_singlethreaded(test)]
async fn test_item_sequences() {
let fs = test_filesystem().await;
let object1;
let object2;
let object3;
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let store = fs.root_store();
object1 = Arc::new(
ObjectStore::create_object(&store, &mut transaction, HandleOptions::default())
.await
.expect("create_object failed"),
);
transaction.commit().await.expect("commit failed");
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
object2 = Arc::new(
ObjectStore::create_object(&store, &mut transaction, HandleOptions::default())
.await
.expect("create_object failed"),
);
transaction.commit().await.expect("commit failed");
fs.sync(SyncOptions::default()).await.expect("sync failed");
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
object3 = Arc::new(
ObjectStore::create_object(&store, &mut transaction, HandleOptions::default())
.await
.expect("create_object failed"),
);
transaction.commit().await.expect("commit failed");
let layer_set = store.tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
let mut sequences = [0u64; 3];
while let Some(ItemRef { key: ObjectKey { object_id, .. }, sequence, .. }) = iter.get() {
if *object_id == object1.object_id() {
sequences[0] = sequence;
} else if *object_id == object2.object_id() {
sequences[1] = sequence;
} else if *object_id == object3.object_id() {
sequences[2] = sequence;
}
iter.advance().await.expect("advance failed");
}
assert!(sequences[0] <= sequences[1], "sequences: {:?}", sequences);
// The last item came after a sync, so should be strictly greater.
assert!(sequences[1] < sequences[2], "sequences: {:?}", sequences);
fs.close().await.expect("Close failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_create_and_open_store() {
let fs = test_filesystem().await;
let store_id = {
let root_store = fs.root_store();
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let child_store = root_store
.create_child_store(&mut transaction)
.await
.expect("create_child_store failed");
transaction.commit().await.expect("commit failed");
child_store.store_object_id()
};
fs.close().await.expect("close failed");
let device = fs.take_device().await;
device.reopen();
let fs = FxFilesystem::open(device).await.expect("open failed");
fs.object_manager().open_store(store_id).await.expect("open_store failed");
fs.close().await.expect("Close failed");
}
#[fasync::run(10, test)]
async fn test_concurrent_store_opening() {
let fs = test_filesystem().await;
let store_id = {
let store = fs.root_store();
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let child_store = store
.create_child_store_with_id(&mut transaction, 555u64)
.await
.expect("create_child_store failed");
transaction.commit().await.expect("commit failed");
child_store.store_object_id()
};
let mut fs = Some(fs);
for _ in 0..20 {
let device = {
let fs = fs.unwrap();
fs.close().await.expect("close failed");
let device = fs.take_device().await;
device.reopen();
device
};
fs = Some(FxFilesystem::open(device).await.expect("open failed"));
join_all((0..4).map(|_| {
let manager = fs.as_ref().unwrap().object_manager();
fasync::Task::spawn(async move {
manager.open_store(store_id).await.expect("open_store failed");
})
}))
.await;
}
fs.unwrap().close().await.expect("Close failed");
}
#[fasync::run(10, test)]
async fn test_old_layers_are_purged() {
let fs = test_filesystem().await;
let store = fs.root_store();
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let object = Arc::new(
ObjectStore::create_object(&store, &mut transaction, HandleOptions::default())
.await
.expect("create_object failed"),
);
transaction.commit().await.expect("commit failed");
store.flush().await.expect("flush failed");
let mut buf = object.allocate_buffer(5);
buf.as_mut_slice().copy_from_slice(b"hello");
object.write(0, buf.as_ref()).await.expect("write failed");
// Getting the layer-set should cause the flush to stall.
let layer_set = store.tree().layer_set();
let done = Mutex::new(false);
let mut object_id = 0;
join!(
async {
store.flush().await.expect("flush failed");
assert!(*done.lock().unwrap());
},
async {
// This is a halting problem so all we can do is sleep.
fasync::Timer::new(Duration::from_secs(1)).await;
*done.lock().unwrap() = true;
object_id = layer_set.layers.last().unwrap().handle().unwrap().object_id();
std::mem::drop(layer_set);
}
);
if let Err(e) = ObjectStore::open_object(
&store.parent_store.as_ref().unwrap(),
object_id,
HandleOptions::default(),
)
.await
{
assert!(FxfsError::NotFound.matches(&e));
} else {
panic!("open_object succeeded");
}
}
#[fasync::run_singlethreaded(test)]
async fn test_tombstone_deletes_data() {
let fs = test_filesystem().await;
let root_store = fs.root_store();
let child_id = {
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let child =
ObjectStore::create_object(&root_store, &mut transaction, HandleOptions::default())
.await
.expect("create_child failed");
transaction.commit().await.expect("commit failed");
// Allocate an extent in the file.
let mut buffer = child.allocate_buffer(8192);
buffer.as_mut_slice().fill(0xaa);
child.write(0, buffer.as_ref()).await.expect("write failed");
child.object_id()
};
root_store.tombstone(child_id, Options::default()).await.expect("tombstone failed");
let layers = root_store.extent_tree.layer_set();
let mut merger = layers.merger();
let mut iter = merger
.seek(Bound::Included(&ExtentKey::new(child_id, 0, 0..8192).search_key()))
.await
.expect("seek failed");
assert_matches!(
iter.get(),
Some(ItemRef { value: ExtentValue { device_offset: None }, .. })
);
iter.advance().await.expect("advance failed");
assert_matches!(iter.get(), None);
}
#[fasync::run_singlethreaded(test)]
async fn test_major_compaction_discards_unnecessary_records() {
let fs = test_filesystem().await;
let root_store = fs.root_store();
let child_id = {
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let child =
ObjectStore::create_object(&root_store, &mut transaction, HandleOptions::default())
.await
.expect("create_child failed");
transaction.commit().await.expect("commit failed");
// Allocate an extent in the file.
let mut buffer = child.allocate_buffer(8192);
buffer.as_mut_slice().fill(0xaa);
child.write(0, buffer.as_ref()).await.expect("write failed");
child.object_id()
};
let has_deleted_extent_records = |root_store: Arc<ObjectStore>, child_id| async move {
let layers = root_store.extent_tree.layer_set();
let mut merger = layers.merger();
let mut iter = merger
.seek(Bound::Included(&ExtentKey::new(child_id, 0, 0..1).search_key()))
.await
.expect("seek failed");
loop {
match iter.get() {
None => return false,
Some(ItemRef {
key: ExtentKey { object_id, .. },
value: ExtentValue { device_offset: None },
..
}) if *object_id == child_id => return true,
_ => {}
}
iter.advance().await.expect("advance failed");
}
};
root_store.tombstone(child_id, Options::default()).await.expect("tombstone failed");
assert_matches!(
root_store.tree.find(&ObjectKey::tombstone(child_id)).await.expect("find failed"),
Some(_)
);
assert!(has_deleted_extent_records(root_store.clone(), child_id).await);
root_store.flush().await.expect("flush failed");
assert_matches!(
root_store.tree.find(&ObjectKey::tombstone(child_id)).await.expect("find failed"),
None
);
assert!(!has_deleted_extent_records(root_store.clone(), child_id).await);
}
#[fasync::run_singlethreaded(test)]
async fn test_overlapping_extents_in_different_layers() {
let fs = test_filesystem().await;
let store = fs.root_store();
let root_directory =
Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let object = root_directory
.create_child_file(&mut transaction, "test")
.await
.expect("create_child_file failed");
transaction.commit().await.expect("commit failed");
let buf = object.allocate_buffer(16384);
object.write(0, buf.as_ref()).await.expect("write failed");
store.flush().await.expect("flush failed");
object.write(0, buf.subslice(0..4096)).await.expect("write failed");
// At this point, we should have an extent for 0..16384 in a layer that has been flushed,
// and an extent for 0..4096 that partially overwrites it. Writing to 0..16384 should
// overwrite both of those extents.
object.write(0, buf.as_ref()).await.expect("write failed");
fsck(&fs).await.expect("fsck failed");
}
}
// TODO(csuter): validation of all deserialized structs.
// TODO(csuter): check all panic! calls.