// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
debug_assert_not_too_long,
errors::FxfsError,
fsck::{fsck_volume_with_options, fsck_with_options, FsckOptions},
log::*,
metrics,
object_store::{
allocator::{Allocator, Hold, Reservation},
directory::Directory,
graveyard::Graveyard,
journal::{
self, super_block::SuperBlockHeader, Journal, JournalCheckpoint, JournalOptions,
},
object_manager::ObjectManager,
transaction::{
self, lock_keys, AssocObj, LockKey, LockKeys, LockManager, MetadataReservation,
Mutation, ReadGuard, Transaction, TRANSACTION_METADATA_MAX_AMOUNT,
},
volume::{root_volume, VOLUMES_DIRECTORY},
ObjectStore,
},
range::RangeExt,
serialized_types::Version,
},
anyhow::{bail, Context, Error},
async_trait::async_trait,
event_listener::Event,
fuchsia_async as fasync,
fuchsia_inspect::{NumericProperty as _, UintProperty},
futures::FutureExt,
fxfs_crypto::Crypt,
once_cell::sync::OnceCell,
static_assertions::const_assert,
std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex, Weak,
},
storage_device::{Device, DeviceHolder},
};
pub const MIN_BLOCK_SIZE: u64 = 4096;
pub const MAX_BLOCK_SIZE: u64 = u16::MAX as u64 + 1;
// Whilst Fxfs could support files up to u64::MAX in size, off_t is i64, so files larger than that
// become difficult to deal with via the POSIX APIs. Additionally, PagedObjectHandle only sees data
// get modified in page-sized chunks, so to prevent writes at i64::MAX, the entire page containing
// i64::MAX needs to be excluded. MAX_FILE_SIZE is therefore 2^63 - 4096, i.e. the start of the
// page that contains i64::MAX.
pub const MAX_FILE_SIZE: u64 = i64::MAX as u64 - 4095;
const_assert!(9223372036854771712 == MAX_FILE_SIZE);
// The maximum number of transactions that can be in-flight at any time.
const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;
// Start trimming 1 hour after boot. The idea here is to wait until the initial flurry of
// activity during boot is finished. This is a rough heuristic and may need to change later if
// performance is affected.
const TRIM_AFTER_BOOT_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60);
// After the initial trim, perform another trim every 24 hours.
const TRIM_INTERVAL_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 24);
/// Holds information on an Fxfs Filesystem
pub struct Info {
pub total_bytes: u64,
pub used_bytes: u64,
}
pub type PostCommitHook =
Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;
pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;
pub struct Options {
/// True if the filesystem is read-only.
pub read_only: bool,
/// The metadata keys will be rolled after this many bytes. This must be large enough such that
/// we can't end up with more than two live keys (so it must be bigger than the maximum possible
/// size of unflushed journal contents). This is exposed for testing purposes.
pub roll_metadata_key_byte_count: u64,
/// A callback that runs before every transaction is committed. If this callback returns an
/// error then the transaction is failed with that error.
pub pre_commit_hook: PreCommitHook,
/// A callback that runs after every transaction has been committed. This will be called whilst
/// a lock is held, which will block further transactions from being committed.
pub post_commit_hook: PostCommitHook,
/// If true, don't do an initial reap of the graveyard at mount time. This is useful for
/// testing.
pub skip_initial_reap: bool,
// The first duration is how long after the filesystem has been mounted to perform an initial
// trim. The second is the interval to repeat trimming thereafter. If set to None, no trimming
// is done.
// Default values are (1 hour, 24 hours); see `TRIM_AFTER_BOOT_TIMER` and `TRIM_INTERVAL_TIMER`.
pub trim_config: Option<(std::time::Duration, std::time::Duration)>,
}
impl Default for Options {
fn default() -> Self {
Options {
roll_metadata_key_byte_count: 128 * 1024 * 1024,
read_only: false,
pre_commit_hook: None,
post_commit_hook: None,
skip_initial_reap: false,
trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
}
}
}
/// The context in which a transaction is being applied.
pub struct ApplyContext<'a, 'b> {
/// The mode indicates whether the transaction is being replayed.
pub mode: ApplyMode<'a, 'b>,
/// The transaction checkpoint for this mutation.
pub checkpoint: JournalCheckpoint,
}
/// A transaction can be applied during replay or on a live running system (in which case a
/// transaction object will be available).
pub enum ApplyMode<'a, 'b> {
Replay,
Live(&'a Transaction<'b>),
}
impl ApplyMode<'_, '_> {
pub fn is_replay(&self) -> bool {
matches!(self, ApplyMode::Replay)
}
pub fn is_live(&self) -> bool {
matches!(self, ApplyMode::Live(_))
}
}
/// Objects that use journaling to track mutations (`Allocator` and `ObjectStore`) implement this.
/// This is primarily used by `ObjectManager` and `SuperBlock`; `flush` calls are also made directly in a few tests.
#[async_trait]
pub trait JournalingObject: Send + Sync {
/// This method gets called when a transaction commits, which can either be during live
/// operation (see `ObjectManager::apply_mutation`) or during journal replay, in which case no
/// transaction is available (see `super_block::read`).
fn apply_mutation(
&self,
mutation: Mutation,
context: &ApplyContext<'_, '_>,
assoc_obj: AssocObj<'_>,
) -> Result<(), Error>;
/// Called when a transaction fails to commit.
fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);
/// Flushes in-memory changes to the device (to allow journal space to be freed).
///
/// Also returns the earliest version of a struct in the filesystem.
async fn flush(&self) -> Result<Version, Error>;
/// Writes a mutation to the journal. This allows objects to encrypt or otherwise modify what
/// gets written to the journal.
fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
writer.write(mutation.clone());
}
}
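/// Options for syncing the journal; see `FxFilesystem::sync`.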
#[derive(Default)]
pub struct SyncOptions<'a> {
/// If set, the journal will be flushed, as well as the underlying block device. This is much
/// more expensive, but ensures the contents of the journal are persisted (which also acts as a
/// barrier, ensuring all previous journal writes are observable by future operations).
/// Note that when this is not set, the journal is *not* synchronously flushed by the sync call,
/// and it will return before the journal flush completes. In other words, some journal
/// mutations may still be buffered in memory after this call returns.
pub flush_device: bool,
// A precondition that is evaluated whilst a lock is held; it determines whether or not the
// sync needs to proceed.
pub precondition: Option<Box<dyn FnOnce() -> bool + 'a + Send>>,
}
pub struct OpenFxFilesystem(Arc<FxFilesystem>);
impl OpenFxFilesystem {
/// Waits for the filesystem to be dropped (so callers should ensure all direct and indirect
/// references are dropped) and returns the device. No attempt is made at a graceful shutdown.
pub async fn take_device(self) -> DeviceHolder {
let fut = self.device.take_when_dropped();
std::mem::drop(self);
debug_assert_not_too_long!(fut)
}
}
impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
fn from(fs: Arc<FxFilesystem>) -> Self {
Self(fs)
}
}
impl Drop for OpenFxFilesystem {
fn drop(&mut self) {
if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
}
}
}
impl std::ops::Deref for OpenFxFilesystem {
type Target = Arc<FxFilesystem>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
pub struct FxFilesystemBuilder {
format: bool,
trace: bool,
options: Options,
journal_options: JournalOptions,
on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
fsck_after_every_transaction: bool,
}
impl FxFilesystemBuilder {
pub fn new() -> Self {
Self {
format: false,
trace: false,
options: Options::default(),
journal_options: JournalOptions::default(),
on_new_allocator: None,
on_new_store: None,
fsck_after_every_transaction: false,
}
}
/// Sets whether the block device should be formatted when opened. Defaults to `false`.
pub fn format(mut self, format: bool) -> Self {
self.format = format;
self
}
/// Enables or disables trace level logging. Defaults to `false`.
pub fn trace(mut self, trace: bool) -> Self {
self.trace = trace;
self
}
/// Sets whether the filesystem will be opened in read-only mode. Defaults to `false`.
/// Incompatible with `format`.
pub fn read_only(mut self, read_only: bool) -> Self {
self.options.read_only = read_only;
self
}
/// Sets how often the metadata keys are rolled. See `Options::roll_metadata_key_byte_count`.
pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
self
}
/// Sets a callback that runs before every transaction. See `Options::pre_commit_hook`.
pub fn pre_commit_hook(
mut self,
hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
) -> Self {
self.options.pre_commit_hook = Some(Box::new(hook));
self
}
/// Sets a callback that runs after every transaction has been committed. See
/// `Options::post_commit_hook`.
pub fn post_commit_hook(
mut self,
hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
) -> Self {
self.options.post_commit_hook = Some(Box::new(hook));
self
}
/// Sets whether to do an initial reap of the graveyard at mount time. See
/// `Options::skip_initial_reap`. Defaults to `false`.
pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
self.options.skip_initial_reap = skip_initial_reap;
self
}
/// Sets the options for the journal.
pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
self.journal_options = journal_options;
self
}
/// Sets a method to be called immediately after creating the allocator.
pub fn on_new_allocator(
mut self,
on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
) -> Self {
self.on_new_allocator = Some(Box::new(on_new_allocator));
self
}
/// Sets a method to be called each time a new store is registered with `ObjectManager`.
pub fn on_new_store(
mut self,
on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
) -> Self {
self.on_new_store = Some(Box::new(on_new_store));
self
}
/// Enables or disables running fsck after every transaction. Defaults to `false`.
pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
self.fsck_after_every_transaction = fsck_after_every_transaction;
self
}
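/// Sets the delay before the initial trim and the interval between subsequent trims. See
/// `Options::trim_config`. Passing `None` disables trimming.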
pub fn trim_config(
mut self,
delay_and_interval: Option<(std::time::Duration, std::time::Duration)>,
) -> Self {
self.options.trim_config = delay_and_interval;
self
}
/// Constructs an `FxFilesystem` object with the specified settings.
pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
let read_only = self.options.read_only;
if self.format && read_only {
bail!("Cannot initialize a filesystem as read-only");
}
let objects = Arc::new(ObjectManager::new(self.on_new_store));
let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));
let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");
let mut fsck_after_every_transaction = None;
let mut filesystem_options = self.options;
if self.fsck_after_every_transaction {
let instance =
FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
fsck_after_every_transaction = Some(instance.clone());
filesystem_options.post_commit_hook =
Some(Box::new(move || instance.clone().run().boxed()));
}
if !read_only && !self.format {
// See comment in JournalRecord::DidFlushDevice for why we need to flush the device
// before replay.
device.flush().await.context("Device flush failed")?;
}
let filesystem = Arc::new(FxFilesystem {
device,
block_size,
objects: objects.clone(),
journal,
commit_mutex: futures::lock::Mutex::new(()),
lock_manager: LockManager::new(),
flush_task: Mutex::new(None),
trim_task: Mutex::new(None),
closed: AtomicBool::new(true),
shutdown_event: Event::new(),
trace: self.trace,
graveyard: Graveyard::new(objects.clone()),
completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
options: filesystem_options,
in_flight_transactions: AtomicU64::new(0),
transaction_limit_event: Event::new(),
});
filesystem.journal.set_trace(self.trace);
if self.format {
filesystem.journal.init_empty(filesystem.clone()).await?;
// Start the graveyard's background reaping task.
filesystem.graveyard.clone().reap_async();
// Create the root volume directory.
let root_store = filesystem.root_store();
root_store.set_trace(self.trace);
let root_directory =
Directory::open(&root_store, root_store.root_directory_object_id())
.await
.context("Unable to open root volume directory")?;
let mut transaction = filesystem
.clone()
.new_transaction(
lock_keys![LockKey::object(
root_store.store_object_id(),
root_directory.object_id()
)],
transaction::Options::default(),
)
.await?;
let volume_directory = root_directory
.create_child_dir(&mut transaction, VOLUMES_DIRECTORY, Default::default())
.await?;
transaction.commit().await?;
objects.set_volume_directory(volume_directory);
} else {
filesystem
.journal
.replay(filesystem.clone(), self.on_new_allocator)
.await
.context("Journal replay failed")?;
filesystem.root_store().set_trace(self.trace);
if !read_only {
// Queue all purged entries for tombstoning. Don't start the reaper yet because
// that can trigger a flush which can add more entries to the graveyard which might
// get caught in the initial reap and cause objects to be prematurely tombstoned.
for store in objects.unlocked_stores() {
filesystem.graveyard.initial_reap(&store).await?;
}
}
}
// This must be after we've formatted the filesystem; it will fail during format otherwise.
if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
fsck_after_every_transaction
.fs
.set(Arc::downgrade(&filesystem))
.unwrap_or_else(|_| unreachable!());
}
filesystem.closed.store(false, Ordering::SeqCst);
if !read_only {
// Start the background tasks.
filesystem.graveyard.clone().reap_async();
if let Some((delay, interval)) = filesystem.options.trim_config.clone() {
filesystem.start_trim_task(delay, interval);
}
}
Ok(filesystem.into())
}
}
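/// The top-level object for an open Fxfs instance. Instances are created via
/// `FxFilesystemBuilder` (or the `new_empty`/`open` shortcuts below) and should be closed with
/// `close` before being dropped (see `OpenFxFilesystem`'s `Drop` impl).
///
/// A minimal usage sketch (`device` is assumed to be a `DeviceHolder` supplied by the caller;
/// error handling elided):
///
/// ```ignore
/// let fs = FxFilesystemBuilder::new().format(true).open(device).await?;
/// // ... use `fs` ...
/// fs.close().await?;
/// ```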
pub struct FxFilesystem {
block_size: u64,
objects: Arc<ObjectManager>,
journal: Arc<Journal>,
commit_mutex: futures::lock::Mutex<()>,
lock_manager: LockManager,
flush_task: Mutex<Option<fasync::Task<()>>>,
trim_task: Mutex<Option<fasync::Task<()>>>,
closed: AtomicBool,
// An event that is signalled when the filesystem starts to shut down.
shutdown_event: Event,
trace: bool,
graveyard: Arc<Graveyard>,
completed_transactions: UintProperty,
options: Options,
// The number of in-flight transactions which we will limit to MAX_IN_FLIGHT_TRANSACTIONS.
in_flight_transactions: AtomicU64,
// An event that is used to wake up tasks that are blocked due to the in-flight transaction
// limit.
transaction_limit_event: Event,
// NOTE: This *must* go last so that when users take the device from a closed filesystem, the
// filesystem has dropped all other members first (Rust drops members in declaration order).
device: DeviceHolder,
}
#[fxfs_trace::trace]
impl FxFilesystem {
pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
FxFilesystemBuilder::new().format(true).open(device).await
}
pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
FxFilesystemBuilder::new().open(device).await
}
pub fn root_parent_store(&self) -> Arc<ObjectStore> {
self.objects.root_parent_store()
}
pub async fn close(&self) -> Result<(), Error> {
assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
self.shutdown_event.notify(usize::MAX);
debug_assert_not_too_long!(self.graveyard.wait_for_reap());
let trim_task = self.trim_task.lock().unwrap().take();
if let Some(task) = trim_task {
debug_assert_not_too_long!(task);
}
self.journal.stop_compactions().await;
let sync_status =
self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await;
match &sync_status {
Ok(checkpoint) => info!(
"Filesystem closed (checkpoint: {})",
checkpoint.as_ref().unwrap().0.file_offset
),
Err(e) => error!(error = ?e, "Failed to sync filesystem; data may be lost"),
}
self.journal.terminate();
let flush_task = self.flush_task.lock().unwrap().take();
if let Some(task) = flush_task {
debug_assert_not_too_long!(task);
}
// Regardless of whether sync succeeds, we should close the device, since otherwise we will
// crash instead of exiting gracefully.
self.device().close().await.context("Failed to close device")?;
sync_status.map(|_| ())
}
pub fn device(&self) -> Arc<dyn Device> {
Arc::clone(&self.device)
}
pub fn root_store(&self) -> Arc<ObjectStore> {
self.objects.root_store()
}
pub fn allocator(&self) -> Arc<Allocator> {
self.objects.allocator()
}
pub fn object_manager(&self) -> &Arc<ObjectManager> {
&self.objects
}
pub fn journal(&self) -> &Arc<Journal> {
&self.journal
}
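/// Syncs the journal according to `options`; see `SyncOptions::flush_device` for the durability
/// guarantees. A minimal sketch of a durable sync (the same form that `close` uses above):
///
/// ```ignore
/// fs.sync(SyncOptions { flush_device: true, ..Default::default() }).await?;
/// ```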
pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
self.journal.sync(options).await.map(|_| ())
}
pub fn block_size(&self) -> u64 {
self.block_size
}
pub fn get_info(&self) -> Info {
Info {
total_bytes: self.device.size(),
used_bytes: self.object_manager().allocator().get_used_bytes(),
}
}
pub fn super_block_header(&self) -> SuperBlockHeader {
self.journal.super_block_header()
}
pub fn graveyard(&self) -> &Arc<Graveyard> {
&self.graveyard
}
pub fn trace(&self) -> bool {
self.trace
}
pub fn options(&self) -> &Options {
&self.options
}
/// Returns a guard that must be taken before any transaction can commence. This guard takes a
/// shared lock on the filesystem. `fsck` will take an exclusive lock so that it can get a
/// consistent picture of the filesystem that it can verify. It is important that this lock is
/// acquired before *all* other locks. It is also important that this lock is not taken twice
/// by the same task since that can lead to deadlocks if another task tries to take a write
/// lock.
pub async fn txn_guard(self: Arc<Self>) -> TxnGuard<'static> {
unsafe fn extend_lifetime(guard: ReadGuard<'_>) -> ReadGuard<'static> {
std::mem::transmute(guard)
}
let guard = self.lock_manager.read_lock(lock_keys!(LockKey::Filesystem)).await;
// SAFETY: This is safe because we keep a reference to the filesystem until
// the guard is dropped. See `TxnGuard`.
let guard = unsafe { extend_lifetime(guard) };
TxnGuard { fs: self, _guard: Some(guard) }
}
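/// Creates a new transaction that takes the given lock keys. A minimal sketch of the usual flow
/// (`store_id` and `object_id` are placeholders for real identifiers; see the tests at the
/// bottom of this file for complete examples):
///
/// ```ignore
/// let mut transaction = fs
///     .clone()
///     .new_transaction(
///         lock_keys![LockKey::object(store_id, object_id)],
///         transaction::Options::default(),
///     )
///     .await?;
/// // ... stage mutations against `transaction` ...
/// transaction.commit().await?;
/// ```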
pub async fn new_transaction<'a>(
self: Arc<Self>,
locks: LockKeys,
options: transaction::Options<'a>,
) -> Result<Transaction<'a>, Error> {
let guard = if options.txn_guard.is_some() {
// We can just pass None for the guard. The 'a lifetime on Options and Transaction means
// that the guard will remain in place until after the new Transaction is dropped.
TxnGuard { _guard: None, fs: self }
} else {
self.txn_guard().await
};
Transaction::new(guard, options, locks).await
}
#[trace]
pub async fn commit_transaction(
&self,
transaction: &mut Transaction<'_>,
callback: &mut (dyn FnMut(u64) + Send),
) -> Result<u64, Error> {
if let Some(hook) = self.options.pre_commit_hook.as_ref() {
hook(transaction)?;
}
debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));
self.maybe_start_flush_task();
let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
let journal_offset = self.journal.commit(transaction).await?;
self.completed_transactions.add(1);
// For now, call the callback whilst holding the lock. Technically, we don't need to do
// that except if there's a post-commit-hook (which there usually won't be). We can
// consider changing this if we need to for performance, but we'd need to double check that
// callers don't depend on this.
callback(journal_offset);
if let Some(hook) = self.options.post_commit_hook.as_ref() {
hook().await;
}
Ok(journal_offset)
}
pub fn lock_manager(&self) -> &LockManager {
&self.lock_manager
}
pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
self.sub_transaction();
}
// If we placed a hold for metadata space, return it now.
if let MetadataReservation::Hold(hold_amount) =
std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
{
let hold = transaction
.allocator_reservation
.unwrap()
.reserve(0)
.expect("Zero should always succeed.");
hold.add(hold_amount);
}
self.objects.drop_transaction(transaction);
self.lock_manager.drop_transaction(transaction);
}
fn maybe_start_flush_task(&self) {
let mut flush_task = self.flush_task.lock().unwrap();
if flush_task.is_none() {
let journal = self.journal.clone();
*flush_task = Some(fasync::Task::spawn(journal.flush_task()));
}
}
// Returns the number of bytes trimmed.
async fn do_trim(&self) -> Result<usize, Error> {
const MAX_EXTENTS_PER_BATCH: usize = 8;
const MAX_EXTENT_SIZE: usize = 256 * 1024;
let mut offset = 0;
let mut bytes_trimmed = 0;
loop {
if self.closed.load(Ordering::Relaxed) {
info!("Filesystem is closed, nothing to trim");
return Ok(bytes_trimmed);
}
let allocator = self.allocator();
let trimmable_extents =
allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
for device_range in trimmable_extents.extents() {
self.device.trim(device_range.clone()).await?;
bytes_trimmed += device_range.length()? as usize;
}
if let Some(device_range) = trimmable_extents.extents().last() {
offset = device_range.end;
} else {
break;
}
}
Ok(bytes_trimmed)
}
fn start_trim_task(
self: &Arc<Self>,
delay: std::time::Duration,
interval: std::time::Duration,
) {
if !self.device.supports_trim() {
info!("Device does not support trim; not scheduling trimming");
return;
}
let this = self.clone();
let mut next_timer = delay;
*self.trim_task.lock().unwrap() = Some(fasync::Task::spawn(async move {
loop {
let shutdown_listener = this.shutdown_event.listen();
// Note that we need to check if the filesystem was closed after we start listening
// to the shutdown event, but before we start waiting on `timer`, because otherwise
// we might start listening on `shutdown_event` *after* the event was signaled, and
// so `shutdown_listener` will never fire, and this task will get stuck until
// `timer` expires.
if this.closed.load(Ordering::SeqCst) {
return;
}
futures::select!(
() = fasync::Timer::new(next_timer.clone()).fuse() => {},
() = shutdown_listener.fuse() => return,
);
let start_time = std::time::Instant::now();
info!("Starting trim...");
let res = this.do_trim().await;
let duration = std::time::Instant::now() - start_time;
match res {
Ok(bytes_trimmed) => info!("Trimmed {bytes_trimmed} bytes in {duration:?}"),
Err(e) => error!(?e, "Failed to trim"),
}
next_timer = interval.clone();
info!("Scheduled next trim after {:?}", next_timer);
}
}));
}
pub(crate) async fn reservation_for_transaction<'a>(
self: &Arc<Self>,
options: transaction::Options<'a>,
) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
if !options.skip_journal_checks {
self.maybe_start_flush_task();
self.journal.check_journal_space().await?;
}
// We support three options for metadata space reservation:
//
// 1. We can borrow from the filesystem's metadata reservation. This should only be
// used on the understanding that eventually, potentially after a full compaction,
// there should be no net increase in space used. For example, unlinking an object
// should eventually decrease the amount of space used and setting most attributes
// should not result in any change.
//
// 2. A reservation is provided in which case we'll place a hold on some of it for
// metadata.
//
// 3. No reservation is supplied, so we try and reserve space with the allocator now,
// and will return NoSpace if that fails.
let mut hold = None;
let metadata_reservation = if options.borrow_metadata_space {
MetadataReservation::Borrowed
} else {
match options.allocator_reservation {
Some(reservation) => {
hold = Some(
reservation
.reserve(TRANSACTION_METADATA_MAX_AMOUNT)
.ok_or(FxfsError::NoSpace)?,
);
MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
}
None => {
let reservation = self
.allocator()
.reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
.ok_or(FxfsError::NoSpace)?;
MetadataReservation::Reservation(reservation)
}
}
};
Ok((metadata_reservation, options.allocator_reservation, hold))
}
pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
if skip_journal_checks {
self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
} else {
let inc = || {
let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
match self.in_flight_transactions.compare_exchange_weak(
in_flights,
in_flights + 1,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => return true,
Err(x) => in_flights = x,
}
}
return false;
};
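// Listen-then-recheck: register the listener first, then retry the increment before
// waiting, so a notification sent between the failed attempt and `listen()` isn't missed.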
while !inc() {
let listener = self.transaction_limit_event.listen();
if inc() {
break;
}
listener.await;
}
}
}
pub(crate) fn sub_transaction(&self) {
let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
assert!(old != 0);
if old <= MAX_IN_FLIGHT_TRANSACTIONS {
self.transaction_limit_event.notify(usize::MAX);
}
}
}
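/// Holds the shared filesystem lock for the duration of a transaction; see
/// `FxFilesystem::txn_guard`.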
pub struct TxnGuard<'a> {
// Elsewhere we rely on _guard being dropped before `fs`: see the `txn_guard` function above.
_guard: Option<ReadGuard<'a>>,
pub fs: Arc<FxFilesystem>,
}
/// Helper function for making a new filesystem.
pub async fn mkfs(device: DeviceHolder) -> Result<(), Error> {
let fs = FxFilesystem::new_empty(device).await?;
fs.close().await
}
/// Helper function for making a new filesystem with a single named volume.
/// This shouldn't be used in production; instead volumes should be created with the Volumes
/// protocol.
pub async fn mkfs_with_volume(
device: DeviceHolder,
volume_name: &str,
crypt: Option<Arc<dyn Crypt>>,
) -> Result<(), Error> {
let fs = FxFilesystem::new_empty(device).await?;
{
// expect instead of propagating errors here, since otherwise we could drop |fs| before
// close is called, which leads to confusing and unrelated error messages.
let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
root_volume.new_volume(volume_name, crypt).await.expect("Create volume failed");
}
fs.close().await?;
Ok(())
}
struct FsckAfterEveryTransaction {
fs: OnceCell<Weak<FxFilesystem>>,
old_hook: PostCommitHook,
}
impl FsckAfterEveryTransaction {
fn new(old_hook: PostCommitHook) -> Arc<Self> {
Arc::new(Self { fs: OnceCell::new(), old_hook })
}
async fn run(self: Arc<Self>) {
if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
let options = FsckOptions {
fail_on_warning: true,
no_lock: true,
quiet: true,
..Default::default()
};
fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
let object_manager = fs.object_manager();
for store in object_manager.unlocked_stores() {
let store_id = store.store_object_id();
if !object_manager.is_system_store(store_id) {
fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
.await
.expect("fsck_volume_with_options failed");
}
}
}
if let Some(old_hook) = self.old_hook.as_ref() {
old_hook().await;
}
}
}
#[cfg(test)]
mod tests {
use {
super::{FxFilesystem, FxFilesystemBuilder, SyncOptions},
crate::{
fsck::{fsck, fsck_volume},
log::*,
lsm_tree::{types::Item, Operation},
object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle, INVALID_OBJECT_ID},
object_store::{
directory::replace_child,
directory::Directory,
journal::JournalOptions,
transaction::{lock_keys, LockKey, Options},
volume::root_volume,
HandleOptions, ObjectStore,
},
},
fuchsia_async as fasync,
futures::{
future::join_all,
stream::{FuturesUnordered, TryStreamExt},
},
fxfs_insecure_crypto::InsecureCrypt,
rustc_hash::FxHashMap as HashMap,
std::{
sync::{Arc, Mutex},
time::Duration,
},
storage_device::{fake_device::FakeDevice, DeviceHolder},
};
const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
#[fuchsia::test(threads = 10)]
async fn test_compaction() {
let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
// If compaction is not working correctly, this test will run out of space.
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let root_store = fs.root_store();
let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
.await
.expect("open failed");
let mut tasks = Vec::new();
for i in 0..2 {
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
root_store.store_object_id(),
root_directory.object_id()
)],
Options::default(),
)
.await
.expect("new_transaction failed");
let handle = root_directory
.create_child_file(&mut transaction, &format!("{}", i), None)
.await
.expect("create_child_file failed");
transaction.commit().await.expect("commit failed");
tasks.push(fasync::Task::spawn(async move {
const TEST_DATA: &[u8] = b"hello";
let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
buf.as_mut_slice().copy_from_slice(TEST_DATA);
for _ in 0..1500 {
handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
}
}));
}
join_all(tasks).await;
fs.sync(SyncOptions::default()).await.expect("sync failed");
fsck(fs.clone()).await.expect("fsck failed");
fs.close().await.expect("Close failed");
}
#[fuchsia::test(threads = 10)]
async fn test_replay_is_identical() {
let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
// Reopen the filesystem, but set the reclaim size to a very large value, which effectively
// stops the journal from flushing and allows us to track all the mutations to the store.
fs.close().await.expect("close failed");
let device = fs.take_device().await;
device.reopen(false);
struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);
impl<K: Clone, V: Clone> Mutations<K, V> {
fn new() -> Self {
Mutations(Mutex::new(Vec::new()))
}
fn push(&self, operation: Operation, item: &Item<K, V>) {
self.0.lock().unwrap().push((operation, item.clone()));
}
}
let open_fs = |device,
object_mutations: Arc<Mutex<HashMap<_, _>>>,
allocator_mutations: Arc<Mutations<_, _>>| async {
FxFilesystemBuilder::new()
.journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
.on_new_allocator(move |allocator| {
let allocator_mutations = allocator_mutations.clone();
allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
allocator_mutations.push(op, item)
})));
})
.on_new_store(move |store| {
let mutations = Arc::new(Mutations::new());
object_mutations
.lock()
.unwrap()
.insert(store.store_object_id(), mutations.clone());
store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
mutations.push(op, item)
})));
})
.open(device)
.await
.expect("open failed")
};
let allocator_mutations = Arc::new(Mutations::new());
let object_mutations = Arc::new(Mutex::new(HashMap::default()));
let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;
let root_store = fs.root_store();
let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
.await
.expect("open failed");
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
root_store.store_object_id(),
root_directory.object_id()
)],
Options::default(),
)
.await
.expect("new_transaction failed");
let object = root_directory
.create_child_file(&mut transaction, "test", None)
.await
.expect("create_child_file failed");
transaction.commit().await.expect("commit failed");
// Append some data.
let buf = object.allocate_buffer(10000).await;
object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
// Overwrite some data.
object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");
// Truncate.
object.truncate(3000).await.expect("truncate failed");
// Delete the object.
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![
LockKey::object(root_store.store_object_id(), root_directory.object_id()),
LockKey::object(root_store.store_object_id(), object.object_id()),
],
Options::default(),
)
.await
.expect("new_transaction failed");
replace_child(&mut transaction, None, (&root_directory, "test"))
.await
.expect("replace_child failed");
transaction.commit().await.expect("commit failed");
// Finally tombstone the object.
root_store
.tombstone_object(object.object_id(), Options::default())
.await
.expect("tombstone failed");
// Now reopen and check that replay produces the same set of mutations.
fs.close().await.expect("close failed");
let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();
let device = fs.take_device().await;
device.reopen(false);
let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
let replayed_allocator_mutations = Arc::new(Mutations::new());
let fs = open_fs(
device,
replayed_object_mutations.clone(),
replayed_allocator_mutations.clone(),
)
.await;
let m1 = object_mutations.lock().unwrap();
let m2 = replayed_object_mutations.lock().unwrap();
assert_eq!(m1.len(), m2.len());
for (store_id, mutations) in &*m1 {
let mutations = mutations.0.lock().unwrap();
let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock().unwrap();
assert_eq!(mutations.len(), replayed.len());
for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
assert_eq!(op1, op2);
assert_eq!(i1.key, i2.key);
assert_eq!(i1.value, i2.value);
assert_eq!(i1.sequence, i2.sequence);
}
}
let a1 = allocator_mutations.0.lock().unwrap();
let a2 = replayed_allocator_mutations.0.lock().unwrap();
assert_eq!(a1.len(), a2.len());
for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
assert_eq!(op1, op2);
assert_eq!(i1.key, i2.key);
assert_eq!(i1.value, i2.value);
assert_eq!(i1.sequence, i2.sequence);
}
assert_eq!(
fs.object_manager().metadata_reservation().amount(),
metadata_reservation_amount
);
}
#[fuchsia::test]
async fn test_max_in_flight_transactions() {
let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let transactions = FuturesUnordered::new();
for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
transactions.push(fs.clone().new_transaction(lock_keys![], Options::default()));
}
let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();
// Trying to create another one should be blocked.
let mut fut = std::pin::pin!(fs.clone().new_transaction(lock_keys![], Options::default()));
assert!(futures::poll!(&mut fut).is_pending());
// Dropping one should allow it to proceed.
transactions.pop();
assert!(futures::poll!(&mut fut).is_ready());
}
// If run on a single thread, the trim task starves out other work.
#[fuchsia::test(threads = 10)]
async fn test_continuously_trim() {
let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
let fs = FxFilesystemBuilder::new()
.trim_config(Some((Duration::ZERO, Duration::ZERO)))
.format(true)
.open(device)
.await
.expect("open failed");
// Do a small sleep so trim has time to get going.
fasync::Timer::new(Duration::from_millis(10)).await;
// Create and delete a bunch of files whilst trim is ongoing. This just ensures that
// regular usage isn't affected by trim.
let root_store = fs.root_store();
let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
.await
.expect("open failed");
for _ in 0..100 {
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
root_store.store_object_id(),
root_directory.object_id()
)],
Options::default(),
)
.await
.expect("new_transaction failed");
let object = root_directory
.create_child_file(&mut transaction, "test", None)
.await
.expect("create_child_file failed");
transaction.commit().await.expect("commit failed");
{
let buf = object.allocate_buffer(1024).await;
object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
}
std::mem::drop(object);
let mut transaction = root_directory
.acquire_context_for_replace(None, "test", true)
.await
.expect("acquire_context_for_replace failed")
.transaction;
replace_child(&mut transaction, None, (&root_directory, "test"))
.await
.expect("replace_child failed");
transaction.commit().await.expect("commit failed");
}
fs.close().await.expect("close failed");
}
#[fuchsia::test]
async fn test_power_fail() {
// This test randomly discards blocks, so we run it a few times to increase the chances
// of catching an issue in a single run.
for _ in 0..10 {
let (store_id, device, test_file_object_id) = {
let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
.await
.expect("sync failed");
let store = root_volume
.new_volume("test", Some(Arc::new(InsecureCrypt::new())))
.await
.expect("new_volume failed");
let root_directory = Directory::open(&store, store.root_directory_object_id())
.await
.expect("open failed");
// Create a number of files with the goal of using up more than one journal block.
async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
let fs = store.filesystem();
let root_directory = Directory::open(store, store.root_directory_object_id())
.await
.expect("open failed");
for i in 0..100 {
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
store.store_object_id(),
store.root_directory_object_id()
)],
Options::default(),
)
.await
.expect("new_transaction failed");
root_directory
.create_child_file(&mut transaction, &format!("{prefix} {i}"), None)
.await
.expect("create_child_file failed");
transaction.commit().await.expect("commit failed");
}
}
// Create one batch of files.
create_files(&store, "A").await;
// Create a file and write something to it. This will make sure there's a
// transaction present that includes a checksum.
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
store.store_object_id(),
store.root_directory_object_id()
)],
Options::default(),
)
.await
.expect("new_transaction failed");
let object = root_directory
.create_child_file(&mut transaction, "test", None)
.await
.expect("create_child_file failed");
transaction.commit().await.expect("commit failed");
let mut transaction =
object.new_transaction().await.expect("new_transaction failed");
let mut buffer = object.allocate_buffer(4096).await;
buffer.as_mut_slice().fill(0xed);
object
.txn_write(&mut transaction, 0, buffer.as_ref())
.await
.expect("txn_write failed");
transaction.commit().await.expect("commit failed");
// Create another batch of files.
create_files(&store, "B").await;
// Sync the filesystem, but don't flush the device. We want to do this so we can
// randomly discard blocks below.
fs.sync(SyncOptions::default()).await.expect("sync failed");
// When we call `sync` above on the filesystem, it will pad the journal so that it
// will get written, but it doesn't wait for the write to occur. We wait for a
// short time here to allow time for the journal to be written. Adding timers
// isn't great, but this test already isn't deterministic since we randomly discard
// blocks.
fasync::Timer::new(Duration::from_millis(10)).await;
(
store.store_object_id(),
fs.device().snapshot().expect("snapshot failed"),
object.object_id(),
)
};
// Randomly discard blocks since the last flush. This simulates what might happen in
// the case of power-loss. This will be an uncontrolled unmount.
device
.discard_random_since_last_flush()
.expect("discard_random_since_last_flush failed");
let fs = FxFilesystem::open(device).await.expect("open failed");
fsck(fs.clone()).await.expect("fsck failed");
let mut check_test_file = false;
// If we replayed and the store exists (i.e. the transaction that created the store
// made it out), start by running fsck on it.
let object_id = if fs.object_manager().store(store_id).is_some() {
fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
.await
.expect("fsck_volume failed");
// Now we want to create another file, unmount cleanly, and then finally check that
// the new file exists. This checks that we can continue to use the filesystem
// after an unclean unmount.
let store = root_volume(fs.clone())
.await
.expect("root_volume failed")
.volume("test", Some(Arc::new(InsecureCrypt::new())))
.await
.expect("volume failed");
let root_directory = Directory::open(&store, store.root_directory_object_id())
.await
.expect("open failed");
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
store.store_object_id(),
store.root_directory_object_id()
)],
Options::default(),
)
.await
.expect("new_transaction failed");
let object = root_directory
.create_child_file(&mut transaction, &format!("C"), None)
.await
.expect("create_child_file failed");
transaction.commit().await.expect("commit failed");
// Write again to the test file if it exists.
if let Ok(test_file) = ObjectStore::open_object(
&store,
test_file_object_id,
HandleOptions::default(),
None,
)
.await
{
// Check it has the contents we expect.
let mut buffer = test_file.allocate_buffer(4096).await;
let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
if bytes == 4096 {
let expected = [0xed; 4096];
assert_eq!(buffer.as_slice(), &expected);
} else {
// If the write didn't make it, the file should have zero bytes.
assert_eq!(bytes, 0);
}
// Modify the test file.
let mut transaction =
test_file.new_transaction().await.expect("new_transaction failed");
buffer.as_mut_slice().fill(0x37);
test_file
.txn_write(&mut transaction, 0, buffer.as_ref())
.await
.expect("txn_write failed");
transaction.commit().await.expect("commit failed");
check_test_file = true;
}
object.object_id()
} else {
INVALID_OBJECT_ID
};
// This will do a controlled unmount.
fs.close().await.expect("close failed");
let device = fs.take_device().await;
device.reopen(false);
let fs = FxFilesystem::open(device).await.expect("open failed");
fsck(fs.clone()).await.expect("fsck failed");
// As mentioned above, make sure that the object we created before the clean unmount
// exists.
if object_id != INVALID_OBJECT_ID {
fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
.await
.expect("fsck_volume failed");
let store = root_volume(fs.clone())
.await
.expect("root_volume failed")
.volume("test", Some(Arc::new(InsecureCrypt::new())))
.await
.expect("volume failed");
// We should be able to open the C object.
ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
.await
.expect("open_object failed");
// If we made the modification to the test file, check it.
if check_test_file {
info!("Checking test file for modification");
let test_file = ObjectStore::open_object(
&store,
test_file_object_id,
HandleOptions::default(),
None,
)
.await
.expect("open_object failed");
let mut buffer = test_file.allocate_buffer(4096).await;
assert_eq!(
test_file.read(0, buffer.as_mut()).await.expect("read failed"),
4096
);
let expected = [0x37; 4096];
assert_eq!(buffer.as_slice(), &expected);
}
}
fs.close().await.expect("close failed");
}
}
}