| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::{ |
| device::{Device, DeviceHolder}, |
| object_handle::INVALID_OBJECT_ID, |
| object_store::{ |
| allocator::Allocator, |
| graveyard::Graveyard, |
| journal::{super_block::SuperBlock, Journal, JournalCheckpoint}, |
| transaction::{ |
| AssociatedObject, LockKey, LockManager, Mutation, ReadGuard, Transaction, |
| TransactionHandler, TxnMutation, WriteGuard, |
| }, |
| ObjectStore, |
| }, |
| }, |
| anyhow::Error, |
| async_trait::async_trait, |
| fuchsia_async as fasync, |
| futures::channel::oneshot::{channel, Sender}, |
| once_cell::sync::OnceCell, |
| std::{ |
| collections::HashMap, |
| sync::{Arc, Mutex, RwLock}, |
| }, |
| }; |
| |
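/// `Filesystem` ties together the object stores, the allocator and (via TransactionHandler) the
/// journaling system, and is the entry point for creating transactions. A brief usage sketch; the
/// function and its body are purely illustrative:
///
/// ```ignore
/// async fn example(fs: Arc<FxFilesystem>) -> Result<(), Error> {
///     let mut transaction = fs.clone().new_transaction(&[]).await?;
///     // ... add mutations to the transaction here ...
///     transaction.commit().await;
///     Ok(())
/// }
/// ```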
| #[async_trait] |
| pub trait Filesystem: TransactionHandler { |
| /// Informs the journaling system that a new store has been created so that when a transaction |
| /// is committed or replayed, mutations can be routed to the correct store. |
| fn register_store(&self, store: &Arc<ObjectStore>); |
| |
    /// Returns access to the underlying device.
| fn device(&self) -> Arc<dyn Device>; |
| |
| /// Returns the root store or panics if it is not available. |
| fn root_store(&self) -> Arc<ObjectStore>; |
| |
| /// Returns the allocator or panics if it is not available. |
| fn allocator(&self) -> Arc<dyn Allocator>; |
| |
| /// Returns the object manager for the filesystem. |
| fn object_manager(&self) -> Arc<ObjectManager>; |
| } |
| |
| pub struct ObjectManager { |
| objects: RwLock<Objects>, |
| } |
| |
// We currently maintain strong references to all stores that have been opened, but there's
// currently no mechanism for releasing stores that aren't being used.
| struct Objects { |
| stores: HashMap<u64, Arc<ObjectStore>>, |
| root_parent_store_object_id: u64, |
| root_store_object_id: u64, |
| allocator_object_id: u64, |
| allocator: Option<Arc<dyn Allocator>>, |
| |
    // Records dependencies on the journal for objects, i.e. an entry for object ID 1 would mean
    // that object has a dependency on journal records from the stored checkpoint's offset onwards.
| journal_file_checkpoints: HashMap<u64, JournalCheckpoint>, |
| |
| graveyard: Option<Arc<Graveyard>>, |
| } |
| |
| impl ObjectManager { |
| pub fn new() -> ObjectManager { |
| ObjectManager { |
| objects: RwLock::new(Objects { |
| stores: HashMap::new(), |
| root_parent_store_object_id: INVALID_OBJECT_ID, |
| root_store_object_id: INVALID_OBJECT_ID, |
| allocator_object_id: INVALID_OBJECT_ID, |
| allocator: None, |
| journal_file_checkpoints: HashMap::new(), |
| graveyard: None, |
| }), |
| } |
| } |
| |
| pub fn store_object_ids(&self) -> Vec<u64> { |
| self.objects.read().unwrap().stores.keys().cloned().collect() |
| } |
| |
| pub fn root_parent_store(&self) -> Arc<ObjectStore> { |
| let objects = self.objects.read().unwrap(); |
| objects.stores.get(&objects.root_parent_store_object_id).unwrap().clone() |
| } |
| |
| pub fn set_root_parent_store_object_id(&self, object_id: u64) { |
| let mut objects = self.objects.write().unwrap(); |
| assert!(objects.stores.contains_key(&object_id)); |
| objects.root_parent_store_object_id = object_id; |
| } |
| |
| pub fn register_store(&self, store: &Arc<ObjectStore>) { |
| let mut objects = self.objects.write().unwrap(); |
| assert_ne!(store.store_object_id(), objects.allocator_object_id); |
| assert!(objects.stores.insert(store.store_object_id(), store.clone()).is_none()); |
| } |
| |
| pub fn store(&self, store_object_id: u64) -> Option<Arc<ObjectStore>> { |
| self.objects.read().unwrap().stores.get(&store_object_id).cloned() |
| } |
| |
| pub fn set_root_store_object_id(&self, object_id: u64) { |
| let mut objects = self.objects.write().unwrap(); |
| assert!(objects.stores.contains_key(&object_id)); |
| objects.root_store_object_id = object_id; |
| } |
| |
| pub fn root_store(&self) -> Arc<ObjectStore> { |
| let objects = self.objects.read().unwrap(); |
| objects.stores.get(&objects.root_store_object_id).unwrap().clone() |
| } |
| |
| pub fn set_allocator(&self, allocator: Arc<dyn Allocator>) { |
| let mut objects = self.objects.write().unwrap(); |
| assert!(!objects.stores.contains_key(&allocator.object_id())); |
| objects.allocator_object_id = allocator.object_id(); |
| objects.allocator = Some(allocator.clone()); |
| } |
| |
| pub fn allocator(&self) -> Arc<dyn Allocator> { |
| self.objects.read().unwrap().allocator.clone().unwrap() |
| } |
| |
| /// The journaling system should call this when a mutation needs to be applied. |replay| |
| /// indicates whether this is for replay. |checkpoint| indicates the location in the journal |
| /// file for this mutation and is used to keep track of each object's dependencies on the |
| /// journal. |
| pub async fn apply_mutation( |
| &self, |
| object_id: u64, |
| mutation: Mutation, |
| replay: bool, |
| checkpoint: &JournalCheckpoint, |
| associated_object: Option<&dyn AssociatedObject>, |
| ) { |
| let object = { |
| let mut objects = self.objects.write().unwrap(); |
| objects.journal_file_checkpoints.entry(object_id).or_insert_with(|| checkpoint.clone()); |
| if object_id == objects.allocator_object_id { |
| Some(objects.allocator.clone().unwrap().as_mutations()) |
| } else { |
| objects.stores.get(&object_id).map(|x| x.clone() as Arc<dyn Mutations>) |
| } |
| } |
| .unwrap_or_else(|| self.root_store().lazy_open_store(object_id)); |
        // It is intentional that we call will_apply_mutation _after_ we have updated
        // journal_file_checkpoints above, because ObjectFlush::will_apply_mutation might reset the
        // entry in journal_file_checkpoints.
| if let Some(associated_object) = associated_object { |
| associated_object.will_apply_mutation(&mutation); |
| } |
| object.apply_mutation(mutation, replay).await; |
| } |
| |
    /// Drops a transaction. This is called automatically when a transaction is dropped. If the
    /// transaction has been committed, it should contain no mutations, so nothing will get rolled
    /// back. For each remaining mutation, drop_mutation is called to allow for roll back (e.g. the
    /// allocator will unreserve allocations).
| pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) { |
| for TxnMutation { object_id, mutation, .. } in std::mem::take(&mut transaction.mutations) { |
| self.object(object_id).map(|o| o.drop_mutation(mutation)); |
| } |
| } |
| |
| /// Returns the journal file offsets that each object depends on and the checkpoint for the |
| /// minimum offset. |
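    ///
    /// For example (with purely illustrative values): if the only tracked dependencies are object
    /// 3 at journal offset 1000 and object 7 at offset 500, this returns a map of
    /// {3 => 1000, 7 => 500} together with the checkpoint recorded for offset 500.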
| pub fn journal_file_offsets(&self) -> (HashMap<u64, u64>, Option<JournalCheckpoint>) { |
| let objects = self.objects.read().unwrap(); |
| let mut min_checkpoint = None; |
| let mut offsets = HashMap::new(); |
| for (&object_id, checkpoint) in &objects.journal_file_checkpoints { |
| match &mut min_checkpoint { |
| None => min_checkpoint = Some(checkpoint), |
| Some(ref mut min_checkpoint) => { |
| if checkpoint.file_offset < min_checkpoint.file_offset { |
| *min_checkpoint = checkpoint; |
| } |
| } |
| } |
| offsets.insert(object_id, checkpoint.file_offset); |
| } |
| (offsets, min_checkpoint.cloned()) |
| } |
| |
| /// Returns true if the object identified by `object_id` is known to have updates recorded in |
| /// the journal that the object depends upon. |
| pub fn needs_flush(&self, object_id: u64) -> bool { |
| self.objects.read().unwrap().journal_file_checkpoints.contains_key(&object_id) |
| } |
| |
| pub fn graveyard(&self) -> Option<Arc<Graveyard>> { |
| self.objects.read().unwrap().graveyard.clone() |
| } |
| |
| pub fn register_graveyard(&self, graveyard: Arc<Graveyard>) { |
| self.objects.write().unwrap().graveyard = Some(graveyard); |
| } |
| |
| /// Flushes all known objects. This will then allow the journal space to be freed. |
| pub async fn flush(&self) -> Result<(), Error> { |
| let object_ids: Vec<_> = |
| self.objects.read().unwrap().journal_file_checkpoints.keys().cloned().collect(); |
| for object_id in object_ids { |
| self.object(object_id).unwrap().flush().await?; |
| } |
| Ok(()) |
| } |
| |
| fn object(&self, object_id: u64) -> Option<Arc<dyn Mutations>> { |
| let objects = self.objects.read().unwrap(); |
| if object_id == objects.allocator_object_id { |
| Some(objects.allocator.clone().unwrap().as_mutations()) |
| } else { |
| objects.stores.get(&object_id).map(|x| x.clone() as Arc<dyn Mutations>) |
| } |
| } |
| } |
| |
/// ObjectFlush is used by objects to mark an event that, if successful, means existing mutation
/// records are no longer required from the journal. For example, object stores use it when the
/// in-memory layer is persisted, since once that is done the records in the journal are no longer
/// required. Clients must make sure to call the commit function upon success; the default is to
/// roll back.
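///
/// A minimal sketch of the intended lifecycle (the object ID and the persist step are
/// illustrative):
///
/// ```ignore
/// let flush = ObjectFlush::new(object_manager.clone(), store_object_id);
/// flush.begin();                 // Or let a transaction call it via AssociatedObject below.
/// persist_in_memory_layer()?;    // Hypothetical work that makes the journal records redundant.
/// flush.commit();                // Without this, dropping `flush` restores the checkpoint.
/// ```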
| #[must_use] |
| pub struct ObjectFlush { |
| object_manager: Arc<ObjectManager>, |
| object_id: u64, |
| old_journal_file_checkpoint: OnceCell<JournalCheckpoint>, |
| } |
| |
| impl ObjectFlush { |
| pub fn new(object_manager: Arc<ObjectManager>, object_id: u64) -> Self { |
| Self { object_manager, object_id, old_journal_file_checkpoint: OnceCell::new() } |
| } |
| |
    /// Marks the point at which the flush is beginning. This begins a commitment (in the absence
    /// of errors) to flush _all_ mutations that were made to the object prior to this point, so it
    /// should be called when appropriate locks are held (see the AssociatedObject implementation
    /// below). Mutations that come after this will be preserved in the journal until the next
    /// flush. This can panic if called more than once; it shouldn't be called directly when being
    /// used as an AssociatedObject since will_apply_mutation (below) calls it.
| pub fn begin(&self) { |
| if let Some(checkpoint) = self |
| .object_manager |
| .objects |
| .write() |
| .unwrap() |
| .journal_file_checkpoints |
| .remove(&self.object_id) |
| { |
| self.old_journal_file_checkpoint.set(checkpoint).unwrap(); |
| } |
| } |
| |
| pub fn commit(mut self) { |
| self.old_journal_file_checkpoint.take(); |
| } |
| } |
| |
| impl Drop for ObjectFlush { |
| fn drop(&mut self) { |
| if let Some(checkpoint) = self.old_journal_file_checkpoint.take() { |
| self.object_manager |
| .objects |
| .write() |
| .unwrap() |
| .journal_file_checkpoints |
| .insert(self.object_id, checkpoint); |
| } |
| } |
| } |
| |
| /// ObjectFlush can be used as an associated object in a transaction such that we begin the flush at |
| /// the appropriate time (whilst a lock is held on the journal). |
| impl AssociatedObject for ObjectFlush { |
| fn will_apply_mutation(&self, _: &Mutation) { |
| self.begin(); |
| } |
| } |
| |
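/// Objects that use the journaling system to track mutations should implement this trait; see
/// apply_mutation below. A skeletal implementation might look like the following (`MyStore` and
/// the comments describing its internals are illustrative):
///
/// ```ignore
/// #[async_trait]
/// impl Mutations for MyStore {
///     async fn apply_mutation(&self, mutation: Mutation, replay: bool) {
///         // Apply the mutation to in-memory state; during replay this rebuilds that state.
///     }
///
///     fn drop_mutation(&self, mutation: Mutation) {
///         // Undo anything reserved when the mutation was added to the transaction.
///     }
///
///     async fn flush(&self) -> Result<(), Error> {
///         // Persist in-memory state so that the journal records are no longer needed.
///         Ok(())
///     }
/// }
/// ```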
| #[async_trait] |
| pub trait Mutations: Send + Sync { |
    /// Called to apply the given mutation. This will get called when a transaction commits, which
    /// can either be during live operation or during journal replay, in which case |replay| will
    /// be true. Also see ObjectManager's apply_mutation method.
| async fn apply_mutation(&self, mutation: Mutation, replay: bool); |
| |
| /// Called when a transaction fails to commit. |
| fn drop_mutation(&self, mutation: Mutation); |
| |
| /// Flushes in-memory changes to the device (to allow journal space to be freed). |
| async fn flush(&self) -> Result<(), Error>; |
| } |
| |
| #[derive(Default)] |
| pub struct SyncOptions {} |
| |
| pub struct FxFilesystem { |
| device: OnceCell<DeviceHolder>, |
| objects: Arc<ObjectManager>, |
| journal: Journal, |
| lock_manager: LockManager, |
| compaction_task: Mutex<Option<fasync::Task<()>>>, |
| device_sender: OnceCell<Sender<DeviceHolder>>, |
| } |
| |
| impl FxFilesystem { |
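    /// Creates a new, empty filesystem on `device`. A minimal usage sketch, mirroring the test at
    /// the bottom of this file (FakeDevice is a test-only helper):
    ///
    /// ```ignore
    /// let device = DeviceHolder::new(FakeDevice::new(4096, 512));
    /// let fs = FxFilesystem::new_empty(device).await?;
    /// let root_store = fs.root_store();
    /// ```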
| pub async fn new_empty(device: DeviceHolder) -> Result<Arc<FxFilesystem>, Error> { |
| let objects = Arc::new(ObjectManager::new()); |
| let journal = Journal::new(objects.clone()); |
| let filesystem = Arc::new(FxFilesystem { |
| device: OnceCell::new(), |
| objects: objects.clone(), |
| journal, |
| lock_manager: LockManager::new(), |
| compaction_task: Mutex::new(None), |
| device_sender: OnceCell::new(), |
| }); |
| filesystem.device.set(device).unwrap_or_else(|_| unreachable!()); |
| filesystem.journal.init_empty(filesystem.clone()).await?; |
| Ok(filesystem) |
| } |
| |
| pub async fn open_with_trace( |
| device: DeviceHolder, |
| trace: bool, |
| ) -> Result<Arc<FxFilesystem>, Error> { |
| let objects = Arc::new(ObjectManager::new()); |
| let journal = Journal::new(objects.clone()); |
| journal.set_trace(trace); |
| let filesystem = Arc::new(FxFilesystem { |
| device: OnceCell::new(), |
| objects: objects.clone(), |
| journal, |
| lock_manager: LockManager::new(), |
| compaction_task: Mutex::new(None), |
| device_sender: OnceCell::new(), |
| }); |
| filesystem.device.set(device).unwrap_or_else(|_| unreachable!()); |
| filesystem.journal.replay(filesystem.clone()).await?; |
| Ok(filesystem) |
| } |
| |
| pub fn set_trace(&self, v: bool) { |
| self.journal.set_trace(v); |
| } |
| |
| pub async fn open(device: DeviceHolder) -> Result<Arc<FxFilesystem>, Error> { |
| Self::open_with_trace(device, false).await |
| } |
| |
| pub fn root_parent_store(&self) -> Arc<ObjectStore> { |
| self.objects.root_parent_store() |
| } |
| |
| pub fn root_store(&self) -> Arc<ObjectStore> { |
| self.objects.root_store() |
| } |
| |
| pub fn store(&self, object_id: u64) -> Option<Arc<ObjectStore>> { |
| self.objects.store(object_id) |
| } |
| |
| pub async fn sync(&self, options: SyncOptions) -> Result<(), Error> { |
| self.journal.sync(options).await |
| } |
| |
| pub async fn close(&self) -> Result<(), Error> { |
| self.wait_for_compaction_to_finish().await; |
| // Regardless of whether sync succeeds, we should close the device, since otherwise we will |
| // crash instead of exiting gracefully. |
| let sync_status = self.journal.sync(SyncOptions::default()).await; |
| if sync_status.is_err() { |
| log::error!("Failed to sync filesystem; data may be lost: {:?}", sync_status); |
| } |
| self.device().close().await.expect("Failed to close device"); |
| sync_status |
| } |
| |
| async fn compact(self: Arc<Self>) { |
| log::debug!("Compaction starting"); |
| if let Err(e) = self.objects.flush().await { |
| log::error!("Compaction ecnountered error: {}", e); |
| return; |
| } |
| if let Err(e) = self.journal.write_super_block().await { |
| log::error!("Error writing journal super-block: {}", e); |
| return; |
| } |
| *self.compaction_task.lock().unwrap() = None; |
| } |
| |
| /// Acquires a write lock for the given keys. |
| pub async fn write_lock(&self, lock_keys: &[LockKey]) -> WriteGuard<'_> { |
| self.lock_manager.write_lock(lock_keys).await |
| } |
| |
    /// Waits for the filesystem to be dropped (so callers should ensure all direct and indirect
    /// references are dropped) and returns the device. No attempt is made at a graceful shutdown.
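    ///
    /// A minimal sketch of the intended usage (assumes `fs` is the last remaining reference):
    ///
    /// ```ignore
    /// fs.close().await?;                    // Optional: sync and close the device first.
    /// let device = fs.take_device().await;  // Resolves once every Arc<FxFilesystem> is dropped.
    /// ```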
| pub async fn take_device(self: Arc<FxFilesystem>) -> DeviceHolder { |
| let (sender, receiver) = channel::<DeviceHolder>(); |
| self.device_sender |
| .set(sender) |
| .unwrap_or_else(|_| panic!("take_device should only be called once")); |
| std::mem::drop(self); |
| receiver.await.unwrap() |
| } |
| |
| pub fn super_block(&self) -> SuperBlock { |
| self.journal.super_block() |
| } |
| |
| async fn wait_for_compaction_to_finish(&self) { |
| let compaction_task = self.compaction_task.lock().unwrap().take(); |
| if let Some(compaction_task) = compaction_task { |
| compaction_task.await; |
| } |
| } |
| } |
| |
| impl Drop for FxFilesystem { |
| fn drop(&mut self) { |
| if let Some(sender) = self.device_sender.take() { |
| // We don't care if this fails to send. |
| let _ = sender.send(self.device.take().unwrap()); |
| } |
| } |
| } |
| |
| #[async_trait] |
| impl Filesystem for FxFilesystem { |
| fn register_store(&self, store: &Arc<ObjectStore>) { |
| self.objects.register_store(store); |
| } |
| |
| fn device(&self) -> Arc<dyn Device> { |
| Arc::clone(self.device.get().unwrap()) |
| } |
| |
| fn root_store(&self) -> Arc<ObjectStore> { |
| self.objects.root_store() |
| } |
| |
| fn allocator(&self) -> Arc<dyn Allocator> { |
| self.objects.allocator() |
| } |
| |
| fn object_manager(&self) -> Arc<ObjectManager> { |
| self.objects.clone() |
| } |
| } |
| |
| #[async_trait] |
| impl TransactionHandler for FxFilesystem { |
| async fn new_transaction<'a>( |
| self: Arc<Self>, |
| locks: &[LockKey], |
| ) -> Result<Transaction<'a>, Error> { |
| Ok(Transaction::new(self, &[LockKey::Filesystem], locks).await) |
| } |
| |
| async fn commit_transaction(self: Arc<Self>, transaction: Transaction<'_>) { |
| self.lock_manager.commit_prepare(&transaction).await; |
| self.journal.commit(transaction).await; |
| let mut compaction_task = self.compaction_task.lock().unwrap(); |
| if compaction_task.is_none() && self.journal.should_flush() { |
| *compaction_task = Some(fasync::Task::spawn(self.clone().compact())); |
| } |
| } |
| |
| fn drop_transaction(&self, transaction: &mut Transaction<'_>) { |
| self.objects.drop_transaction(transaction); |
| self.lock_manager.drop_transaction(transaction); |
| } |
| |
| async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a> { |
| self.lock_manager.read_lock(lock_keys).await |
| } |
| } |
| |
| impl AsRef<LockManager> for FxFilesystem { |
| fn as_ref(&self) -> &LockManager { |
| &self.lock_manager |
| } |
| } |
| |
| // TODO(csuter): How do we ensure sync prior to drop? |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| crate::{ |
| device::DeviceHolder, |
| object_handle::{ObjectHandle, ObjectHandleExt}, |
| object_store::{ |
| directory::Directory, |
| filesystem::{FxFilesystem, SyncOptions}, |
| fsck::fsck, |
| transaction::TransactionHandler, |
| }, |
| testing::fake_device::FakeDevice, |
| }, |
| fuchsia_async as fasync, |
| futures::future::join_all, |
| }; |
| |
| const TEST_DEVICE_BLOCK_SIZE: u32 = 512; |
| |
| #[fasync::run(10, test)] |
| async fn test_compaction() { |
| let device = DeviceHolder::new(FakeDevice::new(4096, TEST_DEVICE_BLOCK_SIZE)); |
| |
| // If compaction is not working correctly, this test will run out of space. |
| let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed"); |
| let root_store = fs.root_store(); |
| let root_directory = Directory::open(&root_store, root_store.root_directory_object_id()) |
| .await |
| .expect("open failed"); |
| |
| let mut tasks = Vec::new(); |
| for i in 0..2 { |
| let mut transaction = |
| fs.clone().new_transaction(&[]).await.expect("new_transaction failed"); |
| let handle = root_directory |
| .create_child_file(&mut transaction, &format!("{}", i)) |
| .await |
| .expect("create_child_file failed"); |
| transaction.commit().await; |
| tasks.push(fasync::Task::spawn(async move { |
| const TEST_DATA: &[u8] = b"hello"; |
| let mut buf = handle.allocate_buffer(TEST_DATA.len()); |
| buf.as_mut_slice().copy_from_slice(TEST_DATA); |
| for _ in 0..5000 { |
| handle.write(0, buf.as_ref()).await.expect("write failed"); |
| } |
| })); |
| } |
| join_all(tasks).await; |
| fs.sync(SyncOptions::default()).await.expect("sync failed"); |
| |
| fsck(&fs).await.expect("fsck failed"); |
| } |
| } |