| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::{ |
| async_enter, |
| log::*, |
| lsm_tree::{ |
| merge::{Merger, MergerIterator}, |
| types::{ItemRef, LayerIterator}, |
| }, |
| object_handle::INVALID_OBJECT_ID, |
| object_store::{ |
| object_manager::ObjectManager, |
| object_record::{ |
| ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue, Timestamp, |
| }, |
| transaction::{Mutation, Options, Transaction}, |
| ObjectStore, |
| }, |
| }, |
| anyhow::{Context, Error}, |
| fuchsia_async::{self as fasync}, |
| futures::{ |
| channel::{ |
| mpsc::{unbounded, UnboundedReceiver, UnboundedSender}, |
| oneshot, |
| }, |
| StreamExt, |
| }, |
| std::{ |
| ops::Bound, |
| sync::{Arc, Mutex}, |
| }, |
| }; |
| |
| enum ReaperTask { |
| None, |
| Pending(UnboundedReceiver<Message>), |
| Running(fasync::Task<()>), |
| } |
| |
/// A graveyard exists as a place to park objects that should be deleted when they are no longer
/// in use. How objects enter and leave the graveyard is up to the caller to decide. The intention
/// is that at mount time, any objects in the graveyard will get removed. Each object store has a
/// directory-like object that contains a list of the objects within that store that are part of
/// the graveyard. A single instance of this Graveyard struct manages *all* stores.
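///
/// A minimal usage sketch (illustrative; `object_manager`, `store_id` and `object_id` are
/// assumed to exist in the caller's context):
///
/// ```ignore
/// let graveyard = Graveyard::new(object_manager);
/// // Start the background reaper task.
/// graveyard.clone().reap_async();
/// // Ask the reaper to tombstone a specific object.
/// graveyard.queue_tombstone(store_id, object_id);
/// // Wait for all queued tombstones to be processed, then shut the reaper down.
/// graveyard.flush().await;
/// graveyard.wait_for_reap().await;
/// ```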
| pub struct Graveyard { |
| object_manager: Arc<ObjectManager>, |
| reaper_task: Mutex<ReaperTask>, |
| channel: UnboundedSender<Message>, |
| } |
| |
| enum Message { |
| // Tombstone the object identified by <store-id>, <object-id>. |
| Tombstone(u64, u64), |
| |
// When the flush message is processed, notifies the sender. This allows the receiver to know
// that all preceding tombstone messages have been processed.
| Flush(oneshot::Sender<()>), |
| } |
| |
| impl Graveyard { |
| /// Creates a new instance of the graveyard manager. |
| pub fn new(object_manager: Arc<ObjectManager>) -> Arc<Self> { |
| let (sender, receiver) = unbounded(); |
| Arc::new(Graveyard { |
| object_manager, |
| reaper_task: Mutex::new(ReaperTask::Pending(receiver)), |
| channel: sender, |
| }) |
| } |
| |
| /// Creates a graveyard object in `store`. Returns the object ID for the graveyard object. |
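///
/// A minimal sketch (illustrative; `fs` and `store` are assumed to exist in the caller's
/// context; in practice this is called whilst a new store is being created):
///
/// ```ignore
/// let mut transaction =
///     fs.clone().new_transaction(&[], Options::default()).await?;
/// let graveyard_object_id = Graveyard::create(&mut transaction, &store);
/// transaction.commit().await?;
/// ```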
| pub fn create(transaction: &mut Transaction<'_>, store: &ObjectStore) -> u64 { |
| let object_id = store.maybe_get_next_object_id(); |
| // This is OK because we only ever create a graveyard as we are creating a new store so |
| // maybe_get_next_object_id will never fail here due to a lack of an object ID cipher. |
| assert_ne!(object_id, INVALID_OBJECT_ID); |
| let now = Timestamp::now(); |
| transaction.add( |
store.store_object_id(),
| Mutation::insert_object( |
| ObjectKey::object(object_id), |
| ObjectValue::Object { |
| kind: ObjectKind::Graveyard, |
| attributes: ObjectAttributes { |
| creation_time: now.clone(), |
| modification_time: now, |
| }, |
| }, |
| ), |
| ); |
| object_id |
| } |
| |
/// Starts the asynchronous reaper task which processes queued tombstone requests.
/// If the task has already been started, this has no effect.
| pub fn reap_async(self: Arc<Self>) { |
| let mut reaper_task = self.reaper_task.lock().unwrap(); |
| if let ReaperTask::Pending(_) = &*reaper_task { |
| if let ReaperTask::Pending(receiver) = |
| std::mem::replace(&mut *reaper_task, ReaperTask::None) |
| { |
| *reaper_task = |
| ReaperTask::Running(fasync::Task::spawn(self.clone().reap_task(receiver))); |
| } else { |
| unreachable!(); |
| } |
| } |
| } |
| |
/// Closes the reaper's message channel and returns a future which completes when the ongoing
/// reap task (if any) has finished.
| pub async fn wait_for_reap(&self) { |
| self.channel.close_channel(); |
| let task = std::mem::replace(&mut *self.reaper_task.lock().unwrap(), ReaperTask::None); |
| if let ReaperTask::Running(task) = task { |
| task.await; |
| } |
| } |
| |
| async fn reap_task(self: Arc<Self>, mut receiver: UnboundedReceiver<Message>) { |
| // Wait and process reap requests. |
| while let Some(message) = receiver.next().await { |
| match message { |
| Message::Tombstone(store_id, object_id) => { |
| if let Err(e) = self.tombstone(store_id, object_id).await { |
| error!(error = e.as_value(), store_id, oid = object_id, "Tombstone error"); |
| } |
| } |
| Message::Flush(sender) => { |
| let _ = sender.send(()); |
| } |
| } |
| } |
| } |
| |
/// Performs the initial mount-time reap for the given store, queueing tombstones for all
/// graveyard entries whose journal offset is strictly less than `end_journal_offset`. Returns
/// the number of entries queued.
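///
/// A minimal sketch of the mount-time flow (illustrative; `fs` is assumed to be an open
/// filesystem and `journal_offset` an offset chosen by the caller, as in the tests below):
///
/// ```ignore
/// let graveyard = fs.graveyard();
/// graveyard.clone().reap_async();
/// let count = graveyard.initial_reap(&fs.root_store(), journal_offset).await?;
/// graveyard.wait_for_reap().await;
/// ```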
| pub async fn initial_reap( |
| self: &Arc<Self>, |
| store: &Arc<ObjectStore>, |
| end_journal_offset: u64, |
| ) -> Result<usize, Error> { |
| async_enter!("Graveyard::initial_reap"); |
| let mut count = 0; |
| let layer_set = store.tree().layer_set(); |
| let mut merger = layer_set.merger(); |
| let graveyard_object_id = store.graveyard_directory_object_id(); |
| let mut iter = Self::iter(graveyard_object_id, &mut merger).await?; |
| while let Some((object_id, journal_offset)) = iter.get() { |
| if journal_offset >= end_journal_offset { |
| break; |
| } |
| self.queue_tombstone(store.store_object_id(), object_id); |
| count += 1; |
| iter.advance().await?; |
| } |
| Ok(count) |
| } |
| |
| /// Queues an object for tombstoning. |
| pub fn queue_tombstone(&self, store_id: u64, object_id: u64) { |
| let _ = self.channel.unbounded_send(Message::Tombstone(store_id, object_id)); |
| } |
| |
| /// Waits for all preceding queued tombstones to finish. |
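///
/// A minimal sketch (illustrative; `store_id` and `object_id` are assumed to exist in the
/// caller's context):
///
/// ```ignore
/// graveyard.queue_tombstone(store_id, object_id);
/// // Returns once the reaper has processed the tombstone request above.
/// graveyard.flush().await;
/// ```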
| pub async fn flush(&self) { |
| let (sender, receiver) = oneshot::channel::<()>(); |
| self.channel.unbounded_send(Message::Flush(sender)).unwrap(); |
| receiver.await.unwrap(); |
| } |
| |
| async fn tombstone(&self, store_id: u64, object_id: u64) -> Result<(), Error> { |
| let store = self |
| .object_manager |
| .store(store_id) |
| .context(format!("Failed to get store {}", store_id))?; |
| // For now, it's safe to assume that all objects in the root parent and root store should |
| // return space to the metadata reservation, but we might have to revisit that if we end up |
| // with objects that are in other stores. |
| let options = if store_id == self.object_manager.root_parent_store_object_id() |
| || store_id == self.object_manager.root_store_object_id() |
| { |
| Options { |
| skip_journal_checks: true, |
| borrow_metadata_space: true, |
| allocator_reservation: Some(self.object_manager.metadata_reservation()), |
| ..Default::default() |
| } |
| } else { |
| Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() } |
| }; |
| store.tombstone(object_id, options).await.context("Failed to tombstone object") |
| } |
| |
/// Returns an iterator that will return graveyard entries, skipping deleted ones. Example
/// usage:
///
///   let layer_set = store.tree().layer_set();
///   let mut merger = layer_set.merger();
///   let mut iter =
///       Graveyard::iter(store.graveyard_directory_object_id(), &mut merger).await?;
///
| pub async fn iter<'a, 'b>( |
| graveyard_object_id: u64, |
| merger: &'a mut Merger<'b, ObjectKey, ObjectValue>, |
| ) -> Result<GraveyardIterator<'a, 'b>, Error> { |
| Self::iter_from(merger, graveyard_object_id, 0).await |
| } |
| |
/// Like `iter`, but seeks to entries at or after the given object ID within the graveyard.
/// Example usage:
///
///   let layer_set = store.tree().layer_set();
///   let mut merger = layer_set.merger();
///   let mut iter =
///       Graveyard::iter_from(&mut merger, store.graveyard_directory_object_id(), 3).await?;
///
| async fn iter_from<'a, 'b>( |
| merger: &'a mut Merger<'b, ObjectKey, ObjectValue>, |
| graveyard_object_id: u64, |
| from: u64, |
| ) -> Result<GraveyardIterator<'a, 'b>, Error> { |
| GraveyardIterator::new( |
| graveyard_object_id, |
| merger |
| .seek(Bound::Included(&ObjectKey::graveyard_entry(graveyard_object_id, from))) |
| .await?, |
| ) |
| .await |
| } |
| } |
| |
| pub struct GraveyardIterator<'a, 'b> { |
| object_id: u64, |
| iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>, |
| } |
| |
| impl<'a, 'b> GraveyardIterator<'a, 'b> { |
| async fn new( |
| object_id: u64, |
| iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>, |
| ) -> Result<GraveyardIterator<'a, 'b>, Error> { |
| let mut iter = GraveyardIterator { object_id, iter }; |
| iter.skip_deleted_entries().await?; |
| Ok(iter) |
| } |
| |
| async fn skip_deleted_entries(&mut self) -> Result<(), Error> { |
| loop { |
| match self.iter.get() { |
| Some(ItemRef { |
| key: ObjectKey { object_id, .. }, |
| value: ObjectValue::None, |
| .. |
| }) if *object_id == self.object_id => {} |
| _ => return Ok(()), |
| } |
| self.iter.advance().await?; |
| } |
| } |
| |
| /// Returns a tuple (object_id, sequence). |
| pub fn get(&self) -> Option<(u64, u64)> { |
| match self.iter.get() { |
| Some(ItemRef { |
| key: ObjectKey { object_id: oid, data: ObjectKeyData::GraveyardEntry { object_id } }, |
| sequence, |
| .. |
| }) if *oid == self.object_id => Some((*object_id, sequence)), |
| _ => None, |
| } |
| } |
| |
| pub async fn advance(&mut self) -> Result<(), Error> { |
| self.iter.advance().await?; |
| self.skip_deleted_entries().await |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| super::Graveyard, |
| crate::{ |
| filesystem::{Filesystem, FxFilesystem, SyncOptions}, |
| object_store::transaction::{Options, TransactionHandler}, |
| }, |
| assert_matches::assert_matches, |
| fuchsia_async as fasync, |
| storage_device::{fake_device::FakeDevice, DeviceHolder}, |
| }; |
| |
| const TEST_DEVICE_BLOCK_SIZE: u32 = 512; |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_graveyard() { |
| let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE)); |
| let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed"); |
| let root_store = fs.root_store(); |
| |
| // Create and add two objects to the graveyard. |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| |
| root_store.add_to_graveyard(&mut transaction, 3); |
| root_store.add_to_graveyard(&mut transaction, 4); |
| transaction.commit().await.expect("commit failed"); |
| |
| // Check that we see the objects we added. |
| { |
| let layer_set = root_store.tree().layer_set(); |
| let mut merger = layer_set.merger(); |
| let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger) |
| .await |
| .expect("iter failed"); |
| assert_matches!(iter.get().expect("missing entry"), (3, _)); |
| iter.advance().await.expect("advance failed"); |
| assert_matches!(iter.get().expect("missing entry"), (4, _)); |
| iter.advance().await.expect("advance failed"); |
| assert_eq!(iter.get(), None); |
| } |
| |
| // Remove one of the objects. |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| root_store.remove_from_graveyard(&mut transaction, 4); |
| transaction.commit().await.expect("commit failed"); |
| |
| // Check that the graveyard has been updated as expected. |
| let layer_set = root_store.tree().layer_set(); |
| let mut merger = layer_set.merger(); |
| let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger) |
| .await |
| .expect("iter failed"); |
| assert_matches!(iter.get().expect("missing entry"), (3, _)); |
| iter.advance().await.expect("advance failed"); |
| assert_eq!(iter.get(), None); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_graveyard_sequences() { |
| let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE)); |
| let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed"); |
| let root_store = fs.root_store(); |
| |
| // Create and add two objects to the graveyard, syncing in between. |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| root_store.add_to_graveyard(&mut transaction, 1234); |
| transaction.commit().await.expect("commit failed"); |
| |
| fs.sync(SyncOptions::default()).await.expect("sync failed"); |
| |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| root_store.add_to_graveyard(&mut transaction, 5678); |
| transaction.commit().await.expect("commit failed"); |
| |
| // Ensure the objects have a monotonically increasing sequence. |
| let sequence = { |
| let layer_set = root_store.tree().layer_set(); |
| let mut merger = layer_set.merger(); |
| let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger) |
| .await |
| .expect("iter failed"); |
| let (id, sequence1) = iter.get().expect("Missing entry"); |
| assert_eq!(id, 1234); |
| iter.advance().await.expect("advance failed"); |
| let (id, sequence2) = iter.get().expect("Missing entry"); |
| assert_eq!(id, 5678); |
| iter.advance().await.expect("advance failed"); |
| assert_eq!(iter.get(), None); |
| |
| assert!(sequence1 < sequence2, "sequence1: {}, sequence2: {}", sequence1, sequence2); |
| |
| sequence2 |
| }; |
| |
| // Reap the graveyard of entries with offset < sequence, which should leave just the second |
| // entry. |
| let graveyard = fs.graveyard(); |
| graveyard.clone().reap_async(); |
| graveyard.initial_reap(&root_store, sequence).await.expect("initial_reap failed"); |
| graveyard.wait_for_reap().await; |
| |
| fs.sync(SyncOptions::default()).await.expect("sync failed"); |
| let layer_set = root_store.tree().layer_set(); |
| let mut merger = layer_set.merger(); |
| merger.set_trace(true); |
| let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger) |
| .await |
| .expect("iter failed"); |
| let mut items = vec![]; |
| while let Some((id, _)) = iter.get() { |
| items.push(id); |
| iter.advance().await.expect("advance failed"); |
| } |
| assert_eq!(items, [5678]); |
| } |
| } |