| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::{ |
| errors::FxfsError, |
| lsm_tree::{ |
| merge::{Merger, MergerIterator}, |
| types::{ItemRef, LayerIterator}, |
| }, |
| object_store::{ |
| current_time, |
| record::{ |
| ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue, |
| }, |
| transaction::{Mutation, Options, Transaction}, |
| ObjectStore, |
| }, |
| trace_duration, |
| }, |
| anyhow::{bail, Context, Error}, |
| fuchsia_async::{self as fasync}, |
| std::{ |
| ops::Bound, |
| sync::{Arc, Mutex}, |
| }, |
| }; |
| |
/// Tracks the state of the background reaping task.
enum Reaper {
    /// No reap task is running; `reap_async` may spawn one.
    Idle,
    /// A previously running task was taken by `wait_for_reap`; `reap_async` will not spawn a new
    /// task while in this state.
    Paused,
    /// A reap task is currently running (or has been spawned).
    Task(fasync::Task<()>),
}
| |
/// A graveyard exists as a place to park objects that should be deleted when they are no longer in
/// use. How objects enter and leave the graveyard is up to the caller to decide. The intention is
/// that at mount time, any objects in the graveyard will get removed.
pub struct Graveyard {
    // The store in which the graveyard object (and its entries) live.
    store: Arc<ObjectStore>,
    // The object ID of the graveyard object itself; entries are keyed under this ID.
    object_id: u64,
    // State of the background reaping task; guarded by a mutex since `reap_async` and
    // `wait_for_reap` can race.
    reaper_task: Mutex<Reaper>,
}
| |
| impl Graveyard { |
| pub fn store(&self) -> &Arc<ObjectStore> { |
| &self.store |
| } |
| |
| pub fn object_id(&self) -> u64 { |
| self.object_id |
| } |
| |
| /// Creates a graveyard object in `store`. |
| pub async fn create( |
| transaction: &mut Transaction<'_>, |
| store: &Arc<ObjectStore>, |
| ) -> Result<Arc<Graveyard>, Error> { |
| store.ensure_open().await?; |
| let object_id = store.get_next_object_id(); |
| let now = current_time(); |
| transaction.add( |
| store.store_object_id, |
| Mutation::insert_object( |
| ObjectKey::object(object_id), |
| ObjectValue::Object { |
| kind: ObjectKind::Graveyard, |
| attributes: ObjectAttributes { |
| creation_time: now.clone(), |
| modification_time: now, |
| }, |
| }, |
| ), |
| ); |
| Ok(Arc::new(Graveyard { |
| store: store.clone(), |
| object_id, |
| reaper_task: Mutex::new(Reaper::Idle), |
| })) |
| } |
| |
| /// Starts an asynchronous task to reap the graveyard for all entries older than |
| /// |journal_offset| (exclusive). |
| /// If a task is already started, this has no effect, even if that task was targeting an older |
| /// |journal_offset|. |
| pub fn reap_async(self: Arc<Self>, journal_offset: u64) { |
| let mut reaper_task = self.reaper_task.lock().unwrap(); |
| if let Reaper::Idle = &*reaper_task { |
| *reaper_task = |
| Reaper::Task(fasync::Task::spawn(self.clone().reap_task(journal_offset))); |
| } |
| } |
| |
| /// Returns a future which completes when the ongoing reap task (if it exists) completes. |
| pub async fn wait_for_reap(&self) { |
| let task = std::mem::replace(&mut *self.reaper_task.lock().unwrap(), Reaper::Paused); |
| if let Reaper::Task(task) = task { |
| task.await; |
| } |
| } |
| |
| async fn reap_task(self: Arc<Self>, journal_offset: u64) { |
| log::info!("Reaping graveyard starting, gen: {}", journal_offset); |
| trace_duration!("Graveyard::reap"); |
| match self.reap_task_inner(journal_offset).await { |
| Ok(deleted) => log::info!("Reaping graveyard done, removed {} elements", deleted), |
| Err(e) => log::error!("Reaping graveyard encountered error: {:?}", e), |
| }; |
| *self.reaper_task.lock().unwrap() = Reaper::Idle; |
| } |
| |
| async fn reap_task_inner(self: &Arc<Self>, journal_offset: u64) -> Result<usize, Error> { |
| let purge_items = { |
| let mut purge_items = vec![]; |
| let layer_set = self.store().tree().layer_set(); |
| let mut merger = layer_set.merger(); |
| let mut iter = self.iter(&mut merger).await.expect("iter failed"); |
| while let Some((store_id, id, offset)) = iter.get() { |
| if offset < journal_offset { |
| purge_items.push((store_id, id)); |
| } |
| iter.advance().await?; |
| } |
| purge_items |
| }; |
| let num_purged = purge_items.len(); |
| let fs = self.store().filesystem(); |
| let object_manager = fs.object_manager(); |
| for (store_id, id) in purge_items { |
| // Since the reaping might be happening early in the mount, some stores might not be |
| // open yet. |
| let store = object_manager |
| .open_store(store_id) |
| .await |
| .context(format!("Failed to open store {}", store_id))?; |
| // TODO(csuter): we shouldn't assume that all objects in the root stores use the |
| // metadata reservation. |
| let options = if store_id == object_manager.root_parent_store_object_id() |
| || store_id == object_manager.root_store_object_id() |
| { |
| Options { |
| skip_journal_checks: true, |
| skip_space_checks: true, |
| allocator_reservation: Some(fs.flush_reservation()), |
| ..Default::default() |
| } |
| } else { |
| Options { skip_journal_checks: true, skip_space_checks: true, ..Default::default() } |
| }; |
| store.tombstone(id, options).await.context("Failed to tombstone object")?; |
| } |
| Ok(num_purged) |
| } |
| |
| /// Opens a graveyard object in `store`. |
| pub async fn open(store: &Arc<ObjectStore>, object_id: u64) -> Result<Arc<Graveyard>, Error> { |
| store.ensure_open().await?; |
| if let ObjectItem { |
| value: ObjectValue::Object { kind: ObjectKind::Graveyard, .. }, .. |
| } = store.tree.find(&ObjectKey::object(object_id)).await?.ok_or(FxfsError::NotFound)? |
| { |
| Ok(Arc::new(Graveyard { |
| store: store.clone(), |
| object_id, |
| reaper_task: Mutex::new(Reaper::Idle), |
| })) |
| } else { |
| bail!("Found an object, but it's not a graveyard"); |
| } |
| } |
| |
| /// Adds an object to the graveyard. |
| pub fn add(&self, transaction: &mut Transaction<'_>, store_object_id: u64, object_id: u64) { |
| transaction.add( |
| self.store.store_object_id(), |
| Mutation::replace_or_insert_object( |
| ObjectKey::graveyard_entry(self.object_id, store_object_id, object_id), |
| ObjectValue::Some, |
| ), |
| ); |
| } |
| |
| /// Removes an object from the graveyard. |
| pub fn remove(&self, transaction: &mut Transaction<'_>, store_object_id: u64, object_id: u64) { |
| transaction.add( |
| self.store.store_object_id(), |
| Mutation::replace_or_insert_object( |
| ObjectKey::graveyard_entry(self.object_id, store_object_id, object_id), |
| ObjectValue::None, |
| ), |
| ); |
| } |
| |
| /// Returns an iterator that will return graveyard entries skipping deleted ones. Example |
| /// usage: |
| /// |
| /// let layer_set = graveyard.store().tree().layer_set(); |
| /// let mut merger = layer_set.merger(); |
| /// let mut iter = graveyard.iter(&mut merger).await?; |
| /// |
| pub async fn iter<'a, 'b>( |
| &self, |
| merger: &'a mut Merger<'b, ObjectKey, ObjectValue>, |
| ) -> Result<GraveyardIterator<'a, 'b>, Error> { |
| self.iter_from(merger, (0, 0)).await |
| } |
| |
| /// Like "iter", but seeks from a specific (store-id, object-id) tuple. Example usage: |
| /// |
| /// let layer_set = graveyard.store().tree().layer_set(); |
| /// let mut merger = layer_set.merger(); |
| /// let mut iter = graveyard.iter_from(&mut merger, (2, 3)).await?; |
| /// |
| pub async fn iter_from<'a, 'b>( |
| &self, |
| merger: &'a mut Merger<'b, ObjectKey, ObjectValue>, |
| from: (u64, u64), |
| ) -> Result<GraveyardIterator<'a, 'b>, Error> { |
| let mut iter = merger |
| .seek(Bound::Included(&ObjectKey::graveyard_entry(self.object_id, from.0, from.1))) |
| .await?; |
| // Skip deleted entries. |
| // TODO(csuter): Remove this once we've developed a filtering iterator. |
| loop { |
| match iter.get() { |
| Some(ItemRef { |
| key: ObjectKey { object_id, .. }, |
| value: ObjectValue::None, |
| .. |
| }) if *object_id == self.object_id => {} |
| _ => break, |
| } |
| iter.advance().await?; |
| } |
| Ok(GraveyardIterator { object_id: self.object_id, iter }) |
| } |
| } |
| |
/// Iterates over the entries of a single graveyard, skipping deleted (ObjectValue::None) entries.
/// Obtain one via `Graveyard::iter` or `Graveyard::iter_from`.
pub struct GraveyardIterator<'a, 'b> {
    // The object ID of the graveyard whose entries this iterator yields; items keyed under other
    // object IDs terminate iteration.
    object_id: u64,
    // The underlying merged LSM-tree iterator, positioned at (or past) the first entry.
    iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
}
| |
| impl GraveyardIterator<'_, '_> { |
| /// Returns a tuple (store_object_id, object_id, sequence). |
| pub fn get(&self) -> Option<(u64, u64, u64)> { |
| match self.iter.get() { |
| Some(ItemRef { |
| key: |
| ObjectKey { |
| object_id: oid, |
| data: ObjectKeyData::GraveyardEntry { store_object_id, object_id }, |
| }, |
| sequence, |
| .. |
| }) if *oid == self.object_id => Some((*store_object_id, *object_id, sequence)), |
| _ => None, |
| } |
| } |
| |
| pub async fn advance(&mut self) -> Result<(), Error> { |
| loop { |
| self.iter.advance().await?; |
| // Skip deleted entries. |
| match self.iter.get() { |
| Some(ItemRef { |
| key: ObjectKey { object_id, .. }, |
| value: ObjectValue::None, |
| .. |
| }) if *object_id == self.object_id => {} |
| _ => return Ok(()), |
| } |
| } |
| } |
| } |
| |
#[cfg(test)]
mod tests {
    use {
        super::Graveyard,
        crate::object_store::{
            filesystem::{Filesystem, FxFilesystem, SyncOptions},
            transaction::{Options, TransactionHandler},
        },
        fuchsia_async as fasync,
        matches::assert_matches,
        storage_device::{fake_device::FakeDevice, DeviceHolder},
    };

    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    // Exercises create/open/add/remove and both iterator entry points against a fresh filesystem
    // on a fake in-memory device.
    #[fasync::run_singlethreaded(test)]
    async fn test_graveyard() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();

        // Create and add two objects to the graveyard.
        let mut transaction = fs
            .clone()
            .new_transaction(&[], Options::default())
            .await
            .expect("new_transaction failed");
        let graveyard =
            Graveyard::create(&mut transaction, &root_store).await.expect("create failed");
        graveyard.add(&mut transaction, 2, 3);
        graveyard.add(&mut transaction, 3, 4);
        transaction.commit().await;

        // Reopen the graveyard and check that we see the objects we added.
        let graveyard =
            Graveyard::open(&root_store, graveyard.object_id()).await.expect("open failed");
        {
            let layer_set = graveyard.store().tree().layer_set();
            let mut merger = layer_set.merger();
            let mut iter = graveyard.iter(&mut merger).await.expect("iter failed");
            // Entries come back ordered by (store-id, object-id).
            assert_matches!(iter.get().expect("missing entry"), (2, 3, _));
            iter.advance().await.expect("advance failed");
            assert_matches!(iter.get().expect("missing entry"), (3, 4, _));
            iter.advance().await.expect("advance failed");
            assert_eq!(iter.get(), None);
        }

        // Remove one of the objects.
        let mut transaction = fs
            .clone()
            .new_transaction(&[], Options::default())
            .await
            .expect("new_transaction failed");
        graveyard.remove(&mut transaction, 3, 4);
        transaction.commit().await;

        // Check that the graveyard has been updated as expected. Seeking from (2, 3) should find
        // the remaining entry and nothing after it (the removed one is skipped by the iterator).
        let layer_set = graveyard.store().tree().layer_set();
        let mut merger = layer_set.merger();
        let mut iter = graveyard.iter_from(&mut merger, (2, 3)).await.expect("iter failed");
        assert_matches!(iter.get().expect("missing entry"), (2, 3, _));
        iter.advance().await.expect("advance failed");
        assert_eq!(iter.get(), None);
    }

    // Verifies that entries committed in separate transactions carry monotonically increasing
    // sequence numbers, and that reap_async purges only entries older than the given offset.
    #[fasync::run_singlethreaded(test)]
    async fn test_graveyard_sequences() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();

        let mut transaction = fs
            .clone()
            .new_transaction(&[], Options::default())
            .await
            .expect("new_transaction failed");
        let child_store = root_store
            .create_child_store(&mut transaction)
            .await
            .expect("create_child_store failed");
        transaction.commit().await;

        let graveyard = fs.object_manager().graveyard().unwrap();

        // Create and add two objects to the graveyard, syncing in between.
        let mut transaction = fs
            .clone()
            .new_transaction(&[], Options::default())
            .await
            .expect("new_transaction failed");
        graveyard.add(&mut transaction, root_store.store_object_id(), 1234);
        transaction.commit().await;

        // The sync forces a journal flush so the second entry lands at a later sequence.
        fs.sync(SyncOptions::default()).await.expect("sync failed");

        let graveyard =
            Graveyard::open(&root_store, graveyard.object_id()).await.expect("open failed");

        let mut transaction = fs
            .clone()
            .new_transaction(&[], Options::default())
            .await
            .expect("new_transaction failed");
        graveyard.add(&mut transaction, child_store.store_object_id(), 5678);
        transaction.commit().await;

        // Ensure the objects have a monotonically increasing sequence.
        let graveyard =
            Graveyard::open(&root_store, graveyard.object_id()).await.expect("open failed");
        let sequence = {
            let layer_set = graveyard.store().tree().layer_set();
            let mut merger = layer_set.merger();
            let mut iter = graveyard.iter(&mut merger).await.expect("iter failed");
            let (store_id, id, sequence1) = iter.get().expect("Missing entry");
            assert_eq!(store_id, root_store.store_object_id());
            assert_eq!(id, 1234);
            iter.advance().await.expect("advance failed");
            let (store_id, id, sequence2) = iter.get().expect("Missing entry");
            assert_eq!(store_id, child_store.store_object_id());
            assert_eq!(id, 5678);
            iter.advance().await.expect("advance failed");
            assert_eq!(iter.get(), None);

            assert!(sequence1 < sequence2, "sequence1: {}, sequence2: {}", sequence1, sequence2);

            sequence2
        };

        // Reap the graveyard of entries with offset < sequence, which should leave just the second
        // entry.
        graveyard.clone().reap_async(sequence);
        graveyard.wait_for_reap().await;

        fs.sync(SyncOptions::default()).await.expect("sync failed");
        let layer_set = graveyard.store().tree().layer_set();
        let mut merger = layer_set.merger();
        // NOTE(review): set_trace(true) looks like leftover debug output — presumably kept to
        // diagnose flakes; confirm whether it can be removed.
        merger.set_trace(true);
        let mut iter = graveyard.iter(&mut merger).await.expect("iter failed");
        let mut items = vec![];
        while let Some((store_id, id, _)) = iter.get() {
            items.push((store_id, id));
            iter.advance().await.expect("advance failed");
        }
        assert_eq!(items, [(child_store.store_object_id(), 5678)]);
    }
}