| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| // This module is responsible for flushing (a.k.a. compacting) the object store trees. |
| |
| use { |
| crate::{ |
| crypt::{KeyPurpose, StreamCipher, UnwrappedKey}, |
| debug_assert_not_too_long, |
| log::*, |
| lsm_tree::{ |
| layers_from_handles, |
| types::{BoxedLayerIterator, ItemRef, LayerIteratorFilter}, |
| LSMTree, |
| }, |
| object_handle::{ObjectHandle, WriteObjectHandle, INVALID_OBJECT_ID}, |
| object_store::{ |
| extent_record::ExtentValue, |
| layer_size_from_encrypted_mutations_size, |
| object_manager::{ObjectManager, ReservationUpdate}, |
| object_record::{ObjectKey, ObjectValue}, |
| store_object_handle::DirectWriter, |
| transaction::{AssociatedObject, LockKey, Mutation}, |
| tree, AssocObj, CachingObjectHandle, HandleOptions, ObjectStore, Options, StoreInfo, |
| MAX_ENCRYPTED_MUTATIONS_SIZE, |
| }, |
| serialized_types::{Version, VersionedLatest}, |
| trace_duration, |
| }, |
| anyhow::Error, |
| async_trait::async_trait, |
| once_cell::sync::OnceCell, |
| std::sync::atomic::Ordering, |
| }; |
| |
/// The reason a flush (compaction) of an object store is being requested.  The reason determines
/// whether the flush can be skipped early (see `ObjectStore::flush_with_reason`).
pub enum Reason {
    /// Journal memory or space pressure.
    Journal,

    /// After unlock and replay of encrypted mutations.
    Unlock,

    /// Just prior to locking a store.
    Lock,
}
| |
impl ObjectStore {
    /// Flushes the in-memory state of this store to the device.
    ///
    /// If the store is locked, the buffered encrypted mutations are serialized and appended to an
    /// encrypted-mutations file (created on first use), since the tree contents cannot be written
    /// whilst locked.  Otherwise, a new layer file is written out (compacting existing layers),
    /// and the replaced layer files plus any old encrypted-mutations file are tombstoned
    /// afterwards.
    ///
    /// Depending on `reason`, the flush may be skipped entirely if there is nothing that needs
    /// flushing.  In all cases, the return value on success is the earliest version used by a
    /// struct in the tree.
    ///
    /// # Errors
    ///
    /// Returns an error if any transaction, serialization or I/O step fails.  Object creation is
    /// staged through the graveyard (see below), so a failure part-way through should be cleaned
    /// up on replay.
    pub async fn flush_with_reason(&self, reason: Reason) -> Result<Version, Error> {
        trace_duration!("ObjectStore::flush", "store_object_id" => self.store_object_id);
        // A store without a parent has nowhere to write layer files (they are created in the
        // parent store below), so there's nothing to do here.
        if self.parent_store.is_none() {
            // Early exit, but still return the earliest version used by a struct in the tree.
            return Ok(self.tree.get_earliest_version());
        }
        let filesystem = self.filesystem();
        let object_manager = filesystem.object_manager();

        // Serialize with any other flush of this same store.
        let keys = [LockKey::flush(self.store_object_id())];
        let _guard = debug_assert_not_too_long!(filesystem.write_lock(&keys));

        match reason {
            Reason::Unlock => {
                // If we're unlocking, only flush if there are encrypted mutations currently stored
                // in a file. We don't worry if they're in memory because a flush should get
                // triggered when the journal gets full.
                if self.store_info().encrypted_mutations_object_id == INVALID_OBJECT_ID {
                    // TODO(fxbug.dev/97078): Add earliest_version support for encrypted mutations.
                    // Early exit, but still return the earliest version used by a struct in the
                    // tree.
                    return Ok(self.tree.get_earliest_version());
                }
            }
            Reason::Journal | Reason::Lock => {
                if !object_manager.needs_flush(self.store_object_id) {
                    // Early exit, but still return the earliest version used by a struct in the
                    // tree.
                    return Ok(self.tree.get_earliest_version());
                }
            }
        }

        let trace = self.trace.load(Ordering::Relaxed);
        if trace {
            info!(store_id = self.store_object_id(), "OS: begin flush");
        }

        let parent_store = self.parent_store.as_ref().unwrap();

        // All transactions in this flush borrow from the metadata reservation rather than taking
        // new space, and skip journal checks (a flush is what relieves journal pressure).
        let reservation = object_manager.metadata_reservation();
        let txn_options = Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(reservation),
            ..Default::default()
        };

        // Roll the metadata key if the mutations cipher has consumed at least
        // `roll_metadata_key_byte_count` bytes of its stream.  (No cipher means an unencrypted
        // store, in which case there is never anything to roll.)
        let roll_metadata_key = self
            .mutations_cipher
            .lock()
            .unwrap()
            .as_ref()
            .map(|cipher| {
                cipher.offset() >= self.filesystem().options().roll_metadata_key_byte_count
            })
            .unwrap_or(false);
        if roll_metadata_key {
            let (wrapped_key, unwrapped_key) = self
                .crypt()
                .unwrap()
                .create_key(self.store_object_id, KeyPurpose::Metadata)
                .await?;
            // Swaps in a fresh stream cipher (offset 0, using the newly created key) at the
            // moment the update-mutations-key mutation is applied, so the in-memory cipher
            // changes in lock-step with the journalled mutation.
            struct SetMutationsCipherCallback<'a>(&'a ObjectStore, UnwrappedKey);
            impl AssociatedObject for SetMutationsCipherCallback<'_> {
                fn will_apply_mutation(
                    &self,
                    _mutation: &Mutation,
                    _object_id: u64,
                    _manager: &ObjectManager,
                ) {
                    *self.0.mutations_cipher.lock().unwrap() = Some(StreamCipher::new(&self.1, 0));
                }
            }
            let set_cipher_callback = SetMutationsCipherCallback(self, unwrapped_key);
            let mut transaction = filesystem.clone().new_transaction(&[], txn_options).await?;
            transaction.add_with_object(
                self.store_object_id(),
                Mutation::update_mutations_key(wrapped_key),
                AssocObj::Borrowed(&set_cipher_callback),
            );
            transaction.commit().await?;
        }

        // Captures a snapshot of StoreInfo at the precise point the BeginFlush mutation is
        // applied, so the snapshot is consistent with everything that precedes BeginFlush and
        // nothing that follows it.
        struct StoreInfoSnapshot<'a> {
            store: &'a ObjectStore,
            store_info: OnceCell<StoreInfo>,
        }
        impl AssociatedObject for StoreInfoSnapshot<'_> {
            fn will_apply_mutation(
                &self,
                _mutation: &Mutation,
                _object_id: u64,
                _manager: &ObjectManager,
            ) {
                let mut store_info = self.store.store_info();

                // Capture the offset in the cipher stream.
                let mutations_cipher = self.store.mutations_cipher.lock().unwrap();
                if let Some(cipher) = mutations_cipher.as_ref() {
                    store_info.mutations_cipher_offset = cipher.offset();
                }

                // This will capture object IDs that might be in transactions not yet committed. In
                // theory, we could do better than this but it's not worth the effort.
                store_info.last_object_id = self.store.last_object_id.lock().unwrap().id;

                self.store_info.set(store_info).unwrap();
            }
        }

        let store_info_snapshot = StoreInfoSnapshot { store: self, store_info: OnceCell::new() };

        // The BeginFlush mutation must be within a transaction that has no impact on StoreInfo
        // since we want to get an accurate snapshot of StoreInfo.
        let mut transaction = filesystem.clone().new_transaction(&[], txn_options).await?;
        transaction.add_with_object(
            self.store_object_id(),
            Mutation::BeginFlush,
            AssocObj::Borrowed(&store_info_snapshot),
        );
        transaction.commit().await?;

        // Test hook: lets tests inject behaviour (e.g. a sync, or a forced failure) right after
        // a metadata key roll has been committed.
        #[cfg(test)]
        if roll_metadata_key {
            let fs = self.filesystem();
            if let Some(hook) = &fs.options().after_metadata_key_roll_hook {
                hook(fs.clone()).await?;
            }
        }

        // There is a transaction to create objects at the start and then another transaction at the
        // end. Between those two transactions, there are transactions that write to the files. In
        // the first transaction, objects are created in the graveyard. Upon success, the objects
        // are removed from the graveyard.
        let mut transaction = filesystem.clone().new_transaction(&[], txn_options).await?;

        let reservation_update: ReservationUpdate; // Must live longer than end_transaction.
        let mut end_transaction = filesystem.clone().new_transaction(&[], txn_options).await?;

        // A major compaction covers the whole tree, so records that exist only to shadow older
        // records — object tombstones and deleted extents — can be dropped from the output.
        #[async_trait]
        impl tree::MajorCompactable<ObjectKey, ObjectValue> for LSMTree<ObjectKey, ObjectValue> {
            async fn major_iter(
                iter: BoxedLayerIterator<'_, ObjectKey, ObjectValue>,
            ) -> Result<BoxedLayerIterator<'_, ObjectKey, ObjectValue>, Error> {
                Ok(Box::new(
                    iter.filter(|item: ItemRef<'_, _, _>| match item {
                        // Object Tombstone.
                        ItemRef { value: ObjectValue::None, .. } => false,
                        // Deleted extent.
                        ItemRef { value: ObjectValue::Extent(ExtentValue::None), .. } => false,
                        _ => true,
                    })
                    .await?,
                ))
            }
        }

        let mut new_store_info = store_info_snapshot.store_info.into_inner().unwrap();
        // Accumulates the total size of everything flushed; used to size the metadata
        // reservation at the end.
        let mut total_layer_size = 0;

        let mut old_encrypted_mutations_object_id = INVALID_OBJECT_ID;

        let (old_layers, new_layers) = if self.is_locked() {
            // The store is locked so we need to either write our encrypted mutations to a new file,
            // or append them to an existing one.
            let handle = if new_store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
                let handle = ObjectStore::create_object(
                    parent_store,
                    &mut transaction,
                    HandleOptions { skip_journal_checks: true, ..Default::default() },
                    None,
                )
                .await?;
                let oid = handle.object_id();
                new_store_info.encrypted_mutations_object_id = oid;
                // Stage via the graveyard: if we crash before end_transaction commits, the new
                // object will be reaped; on success, the removal below takes effect.
                parent_store.add_to_graveyard(&mut transaction, oid);
                parent_store.remove_from_graveyard(&mut end_transaction, oid);
                handle
            } else {
                ObjectStore::open_object(
                    parent_store,
                    new_store_info.encrypted_mutations_object_id,
                    HandleOptions { skip_journal_checks: true, ..Default::default() },
                    None,
                )
                .await?
            };
            transaction.commit().await?;

            // Append the encrypted mutations.
            let mut buffer = handle.allocate_buffer(MAX_ENCRYPTED_MUTATIONS_SIZE);
            let mut cursor = std::io::Cursor::new(buffer.as_mut_slice());
            self.lock_state
                .lock()
                .unwrap()
                .encrypted_mutations()
                .unwrap()
                .serialize_with_version(&mut cursor)?;
            let len = cursor.position() as usize;
            handle.write_or_append(None, buffer.subslice(..len)).await?;

            // The encrypted mutations will eventually be replayed into layers, so account for
            // their projected layer size plus the existing immutable layers.
            total_layer_size += layer_size_from_encrypted_mutations_size(handle.get_size())
                + self
                    .tree
                    .immutable_layer_set()
                    .layers
                    .iter()
                    .map(|l| l.handle().map(ObjectHandle::get_size).unwrap_or(0))
                    .sum::<u64>();

            // There are no changes to the layers in this case.
            (Vec::new(), None)
        } else {
            // Create and write a new layer, compacting existing layers.
            let new_object_tree_layer = ObjectStore::create_object(
                parent_store,
                &mut transaction,
                HandleOptions { skip_journal_checks: true, ..Default::default() },
                self.crypt().as_deref(),
            )
            .await?;
            let writer = DirectWriter::new(&new_object_tree_layer, txn_options);
            let new_object_tree_layer_object_id = new_object_tree_layer.object_id();
            // Stage the new layer file via the graveyard, as above, for crash-safety.
            parent_store.add_to_graveyard(&mut transaction, new_object_tree_layer_object_id);
            parent_store
                .remove_from_graveyard(&mut end_transaction, new_object_tree_layer_object_id);

            transaction.commit().await?;
            // `tree::flush` writes the compacted contents through `writer` and tells us which
            // existing layers survive and which are superseded.
            let (layers_to_keep, old_layers) = tree::flush(&self.tree, writer).await?;

            // The new layer goes first (newest data), followed by the layers being kept.
            let mut new_layers =
                layers_from_handles(Box::new([CachingObjectHandle::new(new_object_tree_layer)]))
                    .await?;
            new_layers.extend(layers_to_keep.iter().map(|l| (*l).clone()));

            new_store_info.layers = Vec::new();
            for layer in &new_layers {
                if let Some(handle) = layer.handle() {
                    new_store_info.layers.push(handle.object_id());
                }
            }

            // Move the existing layers we're compacting to the graveyard at the end.
            for layer in &old_layers {
                if let Some(handle) = layer.handle() {
                    parent_store.add_to_graveyard(&mut end_transaction, handle.object_id());
                }
            }

            let object_tree_handles = new_layers.iter().map(|l| l.handle());
            total_layer_size += object_tree_handles
                .map(|h| h.map(ObjectHandle::get_size).unwrap_or(0))
                .sum::<u64>();

            // Any existing encrypted-mutations file is now fully represented in the layers, so
            // clear its ID from store-info and arrange for the old file to be reaped.
            old_encrypted_mutations_object_id = std::mem::replace(
                &mut new_store_info.encrypted_mutations_object_id,
                INVALID_OBJECT_ID,
            );
            if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
                parent_store
                    .add_to_graveyard(&mut end_transaction, old_encrypted_mutations_object_id);
            }

            (old_layers, Some(new_layers))
        };

        // Persist the updated store-info.
        let mut serialized_info = Vec::new();
        new_store_info.serialize_with_version(&mut serialized_info)?;
        let mut buf = self.device.allocate_buffer(serialized_info.len());
        buf.as_mut_slice().copy_from_slice(&serialized_info[..]);

        self.store_info_handle
            .get()
            .unwrap()
            .txn_write(&mut end_transaction, 0u64, buf.as_ref())
            .await?;

        // Update the metadata reservation to reflect the new total layer size; applied as part
        // of the EndFlush mutation below.
        reservation_update =
            ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));

        end_transaction.add_with_object(
            self.store_object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );

        if trace {
            info!(
                store_id = self.store_object_id(),
                old_layer_count = old_layers.len(),
                new_layer_count = new_layers.as_ref().map(|v| v.len()).unwrap_or(0),
                total_layer_size,
                "OS: compacting"
            );
        }
        // Swap the in-memory store-info, layer set and buffered encrypted mutations under the
        // commit callback so they change atomically with the transaction's commit.
        end_transaction
            .commit_with_callback(|_| {
                let mut store_info = self.store_info.lock().unwrap();
                let info = store_info.info_mut().unwrap();
                info.layers = new_store_info.layers;
                info.encrypted_mutations_object_id = new_store_info.encrypted_mutations_object_id;
                info.mutations_cipher_offset = new_store_info.mutations_cipher_offset;

                if let Some(layers) = new_layers {
                    self.tree.set_layers(layers);
                }
                // The encrypted mutations are now on disk, so the in-memory copy can be dropped.
                if let Some(m) = self.lock_state.lock().unwrap().encrypted_mutations_mut() {
                    std::mem::take(m);
                }
            })
            .await?;

        // Now close the layers and purge them.
        for layer in old_layers {
            let object_id = layer.handle().map(|h| h.object_id());
            layer.close_layer().await;
            if let Some(object_id) = object_id {
                parent_store.tombstone(object_id, txn_options).await?;
            }
        }

        if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
            parent_store.tombstone(old_encrypted_mutations_object_id, txn_options).await?;
        }

        if trace {
            info!(store_id = self.store_object_id(), "OS: end flush");
        }

        // Return the earliest version used by a struct in the tree.
        Ok(self.tree.get_earliest_version())
    }
}
| |
#[cfg(test)]
mod tests {
    use {
        crate::{
            crypt::insecure::InsecureCrypt,
            errors::FxfsError,
            filesystem::{self, Filesystem, FxFilesystem, SyncOptions},
            object_store::{
                directory::Directory,
                transaction::{Options, TransactionHandler},
                volume::root_volume,
            },
        },
        anyhow::bail,
        fuchsia_async as fasync,
        futures::FutureExt,
        std::sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        storage_device::{fake_device::FakeDevice, DeviceHolder},
    };

    /// Exercises metadata key rolling.
    ///
    /// Creates an encrypted volume, then reopens the filesystem with a small
    /// `roll_metadata_key_byte_count` plus a hook that fires after the key rolls (and, when
    /// `with_failure` is true, fails the compaction at that point).  Files are created until the
    /// mutations-cipher offset is observed to go backwards — proof that the key rolled.
    /// Finally, the filesystem is reopened so replay runs, and the store is unlocked again;
    /// when `flush_before_unlock` is true, a flush is done first whilst the store is still
    /// locked, which pushes the encrypted mutations out to a file.
    async fn run_key_roll_test(with_failure: bool, flush_before_unlock: bool) {
        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let store_id = {
            let root_volume = root_volume(&fs).await.expect("root_volume failed");
            root_volume
                .new_volume("test", Some(Arc::new(InsecureCrypt::new())))
                .await
                .expect("new_volume failed")
                .store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen();

        // Set by the key-roll hook to tell the writer loop below to finish up.
        let stop = Arc::new(AtomicBool::new(false));

        let stop_clone = stop.clone();

        let filesystem_options = filesystem::Options {
            // Small threshold so a key roll happens quickly under the write load below.
            roll_metadata_key_byte_count: 512 * 1024,
            after_metadata_key_roll_hook: Some(Box::new(move |fs: Arc<dyn Filesystem>| {
                stop_clone.store(true, Ordering::SeqCst);
                async move {
                    // Sync the filesystem so any new children that were created with the new cipher
                    // will be there when the journal is replayed.
                    fs.sync(SyncOptions::default()).await.expect("sync failed");
                    if with_failure {
                        // Fail the compaction so that the updated store-info file doesn't make it.
                        bail!(FxfsError::Internal);
                    } else {
                        Ok(())
                    }
                }
                .boxed()
            })),
            ..Default::default()
        };

        let fs = FxFilesystem::open_with_options(
            device,
            filesystem::OpenOptions { filesystem_options, ..Default::default() },
        )
        .await
        .expect("open failed");

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(Arc::new(InsecureCrypt::new())).await.expect("unlock failed");

            // Keep writing until we're told to stop.
            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            let mut last_mutations_cipher_offset = 0;
            let mut cipher_rolled = false;
            let mut i = 0;
            loop {
                // These can fail if with_failure is true.
                // Padded 200-byte file names, presumably to advance the cipher offset faster.
                if let Ok(mut transaction) =
                    fs.clone().new_transaction(&[], Options::default()).await
                {
                    if let Ok(_) =
                        root_dir.create_child_file(&mut transaction, &format!("{:<200}", i)).await
                    {
                        i += 1;
                        let _ = transaction.commit().await;
                    }
                }
                // NOTE: `stop` is sampled before the cipher offset so that a roll triggered by
                // the hook is still detected on this final iteration.
                let done = stop.load(Ordering::SeqCst);
                let cipher_offset =
                    store.mutations_cipher.lock().unwrap().as_ref().unwrap().offset();
                // A decrease in offset means a fresh cipher was installed, i.e. the key rolled.
                if cipher_offset < last_mutations_cipher_offset {
                    cipher_rolled = true;
                }
                if done {
                    break;
                }
                last_mutations_cipher_offset = cipher_offset;
            }
            assert!(cipher_rolled);
        }
        // When the hook failed the compaction, close is allowed to report an error.
        match fs.close().await {
            Err(_) if with_failure => {}
            Err(e) => panic!("close failed: {:?}", e),
            Ok(()) => {}
        }

        // Reopen and make sure replay succeeds.
        let device = fs.take_device().await;
        device.reopen();
        let fs = FxFilesystem::open_with_options(
            device,
            filesystem::OpenOptions {
                filesystem_options: filesystem::Options {
                    roll_metadata_key_byte_count: 512 * 1024,
                    ..Default::default()
                },
                ..Default::default()
            },
        )
        .await
        .expect("open failed");

        if flush_before_unlock {
            // Flush before unlocking the store which will see that the encrypted mutations get
            // written to a file.
            fs.object_manager().flush().await.expect("flush failed");
        }

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(Arc::new(InsecureCrypt::new())).await.expect("unlock failed");
        }
    }

    #[fasync::run(10, test)]
    async fn test_metadata_key_roll() {
        run_key_roll_test(/* with_failure: */ false, /* flush_before_unlock: */ false).await;
    }

    #[fasync::run(10, test)]
    async fn test_metadata_key_roll_with_compaction_failure() {
        run_key_roll_test(/* with_failure: */ true, /* flush_before_unlock: */ false).await;
    }

    #[fasync::run(10, test)]
    async fn test_metadata_key_roll_with_flush_before_unlock() {
        run_key_roll_test(/* with_failure: */ false, /* flush_before_unlock: */ true).await;
    }

    #[fasync::run(10, test)]
    async fn test_metadata_key_roll_with_compaction_failure_and_flush_before_unlock() {
        run_key_roll_test(/* with_failure: */ true, /* flush_before_unlock: */ true).await;
    }
}