| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::{ |
| errors::FxfsError, |
| filesystem::{ApplyContext, ApplyMode, JournalingObject}, |
| log::*, |
| metrics, |
| object_handle::INVALID_OBJECT_ID, |
| object_store::{ |
| allocator::{Allocator, Reservation}, |
| directory::Directory, |
| journal::{self, JournalCheckpoint}, |
| transaction::{ |
| AssocObj, AssociatedObject, MetadataReservation, Mutation, Transaction, TxnMutation, |
| }, |
| tree_cache::TreeCache, |
| volume::{list_volumes, VOLUMES_DIRECTORY}, |
| LastObjectId, LockState, ObjectDescriptor, ObjectStore, |
| }, |
| round::round_div, |
| serialized_types::{Version, LATEST_VERSION}, |
| }, |
| anyhow::{anyhow, bail, ensure, Context, Error}, |
| fuchsia_inspect::{Property as _, UintProperty}, |
| futures::FutureExt as _, |
| once_cell::sync::OnceCell, |
| rustc_hash::FxHashMap as HashMap, |
| std::{ |
| collections::hash_map::Entry, |
| sync::{Arc, RwLock}, |
| }, |
| }; |
| |
| // Data written to the journal eventually needs to be flushed somewhere (typically into layer |
| // files). Here we conservatively assume that could take up to four times as much space as it does |
| // in the journal. In the layer file, it'll take up at least as much, but we must reserve the same |
| // again so that there's enough space for compactions, and then we need some spare for overheads. |
| // |
| // TODO(https://fxbug.dev/42178158): We should come up with a better way of determining what the multiplier |
| // should be here. 2x was too low, as it didn't cover any space for metadata. 4x might be too |
| // much. |
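| // |
| // For example, a transaction that occupies 64 KiB of journal space results in a 256 KiB |
| // reservation. |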
| pub const fn reserved_space_from_journal_usage(journal_usage: u64) -> u64 { |
| journal_usage * 4 |
| } |
| |
| /// ObjectManager is a global loading cache for object stores and other special objects. |
| pub struct ObjectManager { |
| inner: RwLock<Inner>, |
| metadata_reservation: OnceCell<Reservation>, |
| volume_directory: OnceCell<Directory<ObjectStore>>, |
| on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>, |
| } |
| |
| // Whilst we are flushing we need to keep track of the old checkpoint that we are hoping to flush, |
| // and a new one that should apply if we successfully finish the flush. |
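| // |
| // Roughly: a BeginFlush mutation moves Current (or Both) to Old, a subsequent mutation for the |
| // same object moves Old to Both, and an EndFlush mutation either removes the entry (Old) or |
| // moves Both back to Current. See `apply_mutation` below. |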
| #[derive(Debug)] |
| enum Checkpoints { |
| Current(JournalCheckpoint), |
| Old(JournalCheckpoint), |
| Both(/* old: */ JournalCheckpoint, /* current: */ JournalCheckpoint), |
| } |
| |
| impl Checkpoints { |
| // Returns the earliest checkpoint (which will always be the old one if present). |
| fn earliest(&self) -> &JournalCheckpoint { |
| match self { |
| Checkpoints::Old(x) | Checkpoints::Both(x, _) | Checkpoints::Current(x) => x, |
| } |
| } |
| } |
| |
| // We currently maintain strong references to all stores that have been opened, but there's |
| // currently no mechanism for releasing stores that aren't being used. |
| struct Inner { |
| stores: HashMap<u64, Arc<ObjectStore>>, |
| root_parent_store_object_id: u64, |
| root_store_object_id: u64, |
| allocator_object_id: u64, |
| allocator: Option<Arc<Allocator>>, |
| |
| // Records dependencies on the journal for objects, i.e. an entry for object ID 1 would mean it |
| // has a dependency on journal records from the stored checkpoint's offset onwards. |
| journal_checkpoints: HashMap<u64, Checkpoints>, |
| |
| // Mappings from object-id to a target reservation amount. The object IDs here are from the |
| // root store namespace, so a reservation can be associated with any object in the root store. A |
| // reservation will be made to cover the *maximum* in this map, since it is assumed that any |
| // requirement is only temporary, for the duration of a compaction, and that once compaction has |
| // finished for a particular object, the space will be recovered. |
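| // Entries are set via `update_reservation`, typically through the `ReservationUpdate` associated |
| // object defined at the bottom of this file. |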
| reservations: HashMap<u64, u64>, |
| |
| // The last journal end offset for a transaction that has been applied. This is not necessarily |
| // the same as the start offset for the next transaction because of padding. |
| last_end_offset: u64, |
| |
| // A running counter that tracks metadata space that has been borrowed on the understanding that |
| // eventually it will be recovered (potentially after a full compaction). |
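| // See `required_reservation` below for the invariant relating this to the metadata reservation |
| // (also checked at the end of `did_commit_transaction`). |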
| borrowed_metadata_space: u64, |
| |
| // The maximum transaction size that has been encountered so far. |
| max_transaction_size: (u64, UintProperty), |
| } |
| |
| impl Inner { |
| fn earliest_journal_offset(&self) -> Option<u64> { |
| self.journal_checkpoints.values().map(|c| c.earliest().file_offset).min() |
| } |
| |
| // Returns the required size of the metadata reservation assuming that no space has been |
| // borrowed. The invariant is: reservation-size + borrowed-space = required. |
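| // |
| // Illustrative numbers: with a single outstanding compaction reservation of 1 MiB and 64 KiB of |
| // journal data that has not yet been flushed, this returns |
| // 1 MiB + reserved_space_from_journal_usage(64 KiB) + journal::RESERVED_SPACE. |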
| fn required_reservation(&self) -> u64 { |
| // Start with the maximum amount of temporary space we might need during compactions. |
| self.reservations.values().max().unwrap_or(&0) |
| |
| // Account for data that has been written to the journal that will need to be written |
| // to layer files when flushed. |
| + self.earliest_journal_offset() |
| .map(|min| reserved_space_from_journal_usage(self.last_end_offset - min)) |
| .unwrap_or(0) |
| |
| // Add extra for temporary space that might be tied up in the journal that hasn't yet been |
| // deallocated. |
| + journal::RESERVED_SPACE |
| } |
| } |
| |
| impl ObjectManager { |
| pub fn new(on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>) -> ObjectManager { |
| ObjectManager { |
| inner: RwLock::new(Inner { |
| stores: HashMap::default(), |
| root_parent_store_object_id: INVALID_OBJECT_ID, |
| root_store_object_id: INVALID_OBJECT_ID, |
| allocator_object_id: INVALID_OBJECT_ID, |
| allocator: None, |
| journal_checkpoints: HashMap::default(), |
| reservations: HashMap::default(), |
| last_end_offset: 0, |
| borrowed_metadata_space: 0, |
| max_transaction_size: (0, metrics::detail().create_uint("max_transaction_size", 0)), |
| }), |
| metadata_reservation: OnceCell::new(), |
| volume_directory: OnceCell::new(), |
| on_new_store, |
| } |
| } |
| |
| pub fn root_parent_store_object_id(&self) -> u64 { |
| self.inner.read().unwrap().root_parent_store_object_id |
| } |
| |
| pub fn root_parent_store(&self) -> Arc<ObjectStore> { |
| let inner = self.inner.read().unwrap(); |
| inner.stores.get(&inner.root_parent_store_object_id).unwrap().clone() |
| } |
| |
| pub fn set_root_parent_store(&self, store: Arc<ObjectStore>) { |
| if let Some(on_new_store) = &self.on_new_store { |
| on_new_store(&store); |
| } |
| let mut inner = self.inner.write().unwrap(); |
| let store_id = store.store_object_id(); |
| inner.stores.insert(store_id, store); |
| inner.root_parent_store_object_id = store_id; |
| } |
| |
| pub fn root_store_object_id(&self) -> u64 { |
| self.inner.read().unwrap().root_store_object_id |
| } |
| |
| pub fn root_store(&self) -> Arc<ObjectStore> { |
| let inner = self.inner.read().unwrap(); |
| inner.stores.get(&inner.root_store_object_id).unwrap().clone() |
| } |
| |
| pub fn set_root_store(&self, store: Arc<ObjectStore>) { |
| if let Some(on_new_store) = &self.on_new_store { |
| on_new_store(&store); |
| } |
| let mut inner = self.inner.write().unwrap(); |
| let store_id = store.store_object_id(); |
| inner.stores.insert(store_id, store); |
| inner.root_store_object_id = store_id; |
| } |
| |
| pub fn is_system_store(&self, store_id: u64) -> bool { |
| let inner = self.inner.read().unwrap(); |
| store_id == inner.root_store_object_id || store_id == inner.root_parent_store_object_id |
| } |
| |
| /// When replaying the journal, we need to replay mutation records into the LSM tree, but we |
| /// cannot properly open the store until all the records have been replayed since some of the |
| /// records we replay might affect how we open, e.g. they might pertain to new layer files |
| /// backing this store. The stores will get properly opened once we finish replaying the |
| /// journal. This should *only* be called during replay. At any other time, `open_store` |
| /// should be used. |
| fn lazy_open_store(&self, store_object_id: u64) -> Arc<ObjectStore> { |
| let mut inner = self.inner.write().unwrap(); |
| assert_ne!(store_object_id, inner.allocator_object_id); |
| let root_parent_store_object_id = inner.root_parent_store_object_id; |
| let root_store = inner.stores.get(&inner.root_store_object_id).unwrap().clone(); |
| let fs = root_store.filesystem(); |
| inner |
| .stores |
| .entry(store_object_id) |
| .or_insert_with(|| { |
| // This assumes that all stores are children of the root store. |
| assert_ne!(store_object_id, root_parent_store_object_id); |
| assert_ne!(store_object_id, root_store.store_object_id()); |
| let store = ObjectStore::new( |
| Some(root_store), |
| store_object_id, |
| fs, |
| None, |
| Box::new(TreeCache::new()), |
| None, |
| LockState::Unknown, |
| LastObjectId::default(), |
| ); |
| if let Some(on_new_store) = &self.on_new_store { |
| on_new_store(&store); |
| } |
| store |
| }) |
| .clone() |
| } |
| |
| /// Returns the store which might or might not be locked. |
| pub fn store(&self, store_object_id: u64) -> Option<Arc<ObjectStore>> { |
| self.inner.read().unwrap().stores.get(&store_object_id).cloned() |
| } |
| |
| /// This is not thread-safe: it assumes that a store won't be forgotten whilst the loop is |
| /// running. This is to be used after replaying the journal. |
| pub async fn on_replay_complete(&self) -> Result<(), Error> { |
| let root_store = self.root_store(); |
| |
| let root_directory = Directory::open(&root_store, root_store.root_directory_object_id()) |
| .await |
| .context("Unable to open root volume directory")?; |
| |
| match root_directory.lookup(VOLUMES_DIRECTORY).await? { |
| None => bail!("Root directory not found"), |
| Some((object_id, ObjectDescriptor::Directory)) => { |
| let volume_directory = Directory::open(&root_store, object_id) |
| .await |
| .context("Unable to open volumes directory")?; |
| self.volume_directory.set(volume_directory).unwrap(); |
| } |
| _ => { |
| bail!(anyhow!(FxfsError::Inconsistent) |
| .context("Unexpected type for volumes directory")) |
| } |
| } |
| |
| let object_ids = list_volumes(self.volume_directory.get().unwrap()).await?; |
| |
| for store_id in object_ids { |
| let store = self.lazy_open_store(store_id); |
| store |
| .on_replay_complete() |
| .await |
| .with_context(|| format!("Store {} failed to load after replay", store_id))?; |
| } |
| |
| ensure!( |
| !self.inner.read().unwrap().stores.iter().any(|(_, store)| store.is_unknown()), |
| FxfsError::Inconsistent |
| ); |
| |
| self.init_metadata_reservation()?; |
| |
| Ok(()) |
| } |
| |
| pub fn volume_directory(&self) -> &Directory<ObjectStore> { |
| self.volume_directory.get().unwrap() |
| } |
| |
| pub fn set_volume_directory(&self, volume_directory: Directory<ObjectStore>) { |
| self.volume_directory.set(volume_directory).unwrap(); |
| } |
| |
| pub fn add_store(&self, store: Arc<ObjectStore>) { |
| if let Some(on_new_store) = &self.on_new_store { |
| on_new_store(&store); |
| } |
| let mut inner = self.inner.write().unwrap(); |
| let store_object_id = store.store_object_id(); |
| assert_ne!(store_object_id, inner.root_parent_store_object_id); |
| assert_ne!(store_object_id, inner.root_store_object_id); |
| assert_ne!(store_object_id, inner.allocator_object_id); |
| inner.stores.insert(store_object_id, store); |
| } |
| |
| pub fn forget_store(&self, store_object_id: u64) { |
| let mut inner = self.inner.write().unwrap(); |
| assert_ne!(store_object_id, inner.allocator_object_id); |
| inner.stores.remove(&store_object_id); |
| inner.reservations.remove(&store_object_id); |
| } |
| |
| pub fn set_allocator(&self, allocator: Arc<Allocator>) { |
| let mut inner = self.inner.write().unwrap(); |
| assert!(!inner.stores.contains_key(&allocator.object_id())); |
| inner.allocator_object_id = allocator.object_id(); |
| inner.allocator = Some(allocator.clone()); |
| } |
| |
| pub fn allocator(&self) -> Arc<Allocator> { |
| self.inner.read().unwrap().allocator.clone().unwrap() |
| } |
| |
| fn apply_mutation( |
| &self, |
| object_id: u64, |
| mutation: Mutation, |
| context: &ApplyContext<'_, '_>, |
| associated_object: AssocObj<'_>, |
| ) -> Result<(), Error> { |
| debug!(oid = object_id, ?mutation, "applying mutation"); |
| let object = { |
| let mut inner = self.inner.write().unwrap(); |
| match mutation { |
| Mutation::BeginFlush => { |
| if let Some(entry) = inner.journal_checkpoints.get_mut(&object_id) { |
| match entry { |
| Checkpoints::Current(x) | Checkpoints::Both(x, _) => { |
| *entry = Checkpoints::Old(x.clone()); |
| } |
| _ => {} |
| } |
| } |
| } |
| Mutation::EndFlush => { |
| if let Entry::Occupied(mut o) = inner.journal_checkpoints.entry(object_id) { |
| let entry = o.get_mut(); |
| match entry { |
| Checkpoints::Old(_) => { |
| o.remove(); |
| } |
| Checkpoints::Both(_, x) => { |
| *entry = Checkpoints::Current(x.clone()); |
| } |
| _ => {} |
| } |
| } |
| } |
| Mutation::DeleteVolume => { |
| inner.stores.remove(&object_id); |
| inner.reservations.remove(&object_id); |
| inner.journal_checkpoints.remove(&object_id); |
| return Ok(()); |
| } |
| _ => { |
| if object_id != inner.root_parent_store_object_id { |
| inner |
| .journal_checkpoints |
| .entry(object_id) |
| .and_modify(|entry| { |
| if let Checkpoints::Old(x) = entry { |
| *entry = |
| Checkpoints::Both(x.clone(), context.checkpoint.clone()); |
| } |
| }) |
| .or_insert_with(|| Checkpoints::Current(context.checkpoint.clone())); |
| } |
| } |
| } |
| if object_id == inner.allocator_object_id { |
| Some(inner.allocator.clone().unwrap() as Arc<dyn JournalingObject>) |
| } else { |
| inner.stores.get(&object_id).map(|x| x.clone() as Arc<dyn JournalingObject>) |
| } |
| } |
| .unwrap_or_else(|| { |
| assert!(context.mode.is_replay()); |
| self.lazy_open_store(object_id) |
| }); |
| associated_object.map(|o| o.will_apply_mutation(&mutation, object_id, self)); |
| object.apply_mutation(mutation, context, associated_object) |
| } |
| |
| /// Called by the journaling system to replay the given mutations. The checkpoint in `context` |
| /// indicates the location in the journal file for this transaction, and `end_offset` is the |
| /// ending journal offset. |
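| /// |
| /// `Mutation::UpdateBorrowed` records are handled here directly (updating |
| /// `borrowed_metadata_space`) rather than being applied to an object. |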
| pub fn replay_mutations( |
| &self, |
| mutations: Vec<(u64, Mutation)>, |
| context: &ApplyContext<'_, '_>, |
| end_offset: u64, |
| ) -> Result<(), Error> { |
| debug!(checkpoint = context.checkpoint.file_offset, "REPLAY"); |
| let txn_size = { |
| let mut inner = self.inner.write().unwrap(); |
| if end_offset > inner.last_end_offset { |
| Some(end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset)) |
| } else { |
| None |
| } |
| }; |
| for (object_id, mutation) in mutations { |
| if let Mutation::UpdateBorrowed(borrowed) = mutation { |
| if let Some(txn_size) = txn_size { |
| self.inner.write().unwrap().borrowed_metadata_space = borrowed |
| .checked_add(reserved_space_from_journal_usage(txn_size)) |
| .ok_or(FxfsError::Inconsistent)?; |
| } |
| continue; |
| } |
| self.apply_mutation(object_id, mutation, context, AssocObj::None)?; |
| } |
| Ok(()) |
| } |
| |
| /// Called by the journaling system to apply a transaction. `checkpoint` indicates the location |
| /// in the journal file for this transaction. Returns an optional mutation to be included with |
| /// the transaction. |
| pub fn apply_transaction( |
| &self, |
| transaction: &mut Transaction<'_>, |
| checkpoint: &JournalCheckpoint, |
| ) -> Result<Option<Mutation>, Error> { |
| // Record old values so we can see what changes as a result of this transaction. |
| let old_amount = self.metadata_reservation().amount(); |
| let old_required = self.inner.read().unwrap().required_reservation(); |
| |
| debug!(checkpoint = checkpoint.file_offset, "BEGIN TXN"); |
| let mutations = transaction.take_mutations(); |
| let context = |
| ApplyContext { mode: ApplyMode::Live(transaction), checkpoint: checkpoint.clone() }; |
| for TxnMutation { object_id, mutation, associated_object, .. } in mutations { |
| self.apply_mutation(object_id, mutation, &context, associated_object)?; |
| } |
| debug!("END TXN"); |
| |
| Ok(if let MetadataReservation::Borrowed = transaction.metadata_reservation { |
| // If this transaction is borrowing metadata, figure out what has changed and return a |
| // mutation with the updated value for borrowed. The transaction might have allocated |
| // or deallocated some data from the metadata reservation, or it might have made a |
| // change that means we need to reserve more or less space (e.g. we compacted). |
| let new_amount = self.metadata_reservation().amount(); |
| let mut inner = self.inner.write().unwrap(); |
| let new_required = inner.required_reservation(); |
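| // Compute the change in borrowed space, i.e. |
| // (new_required - old_required) + (old_amount - new_amount), using only additions and a single |
| // subtraction so the intermediate values cannot underflow. |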
| let add = old_amount + new_required; |
| let sub = new_amount + old_required; |
| if add >= sub { |
| inner.borrowed_metadata_space += add - sub; |
| } else { |
| inner.borrowed_metadata_space = |
| inner.borrowed_metadata_space.saturating_sub(sub - add); |
| } |
| Some(Mutation::UpdateBorrowed(inner.borrowed_metadata_space)) |
| } else { |
| // This transaction should have had no impact on the metadata reservation or the amount |
| // we need to reserve. |
| debug_assert_eq!(self.metadata_reservation().amount(), old_amount); |
| debug_assert_eq!(self.inner.read().unwrap().required_reservation(), old_required); |
| None |
| }) |
| } |
| |
| /// Called by the journaling system after a transaction has been written providing the end |
| /// offset for the transaction so that we can adjust borrowed metadata space accordingly. |
| pub fn did_commit_transaction( |
| &self, |
| transaction: &mut Transaction<'_>, |
| _checkpoint: &JournalCheckpoint, |
| end_offset: u64, |
| ) { |
| let reservation = self.metadata_reservation(); |
| let mut inner = self.inner.write().unwrap(); |
| let journal_usage = end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset); |
| |
| if journal_usage > inner.max_transaction_size.0 { |
| inner.max_transaction_size.0 = journal_usage; |
| inner.max_transaction_size.1.set(journal_usage); |
| } |
| |
| let txn_space = reserved_space_from_journal_usage(journal_usage); |
| match &mut transaction.metadata_reservation { |
| MetadataReservation::None => unreachable!(), |
| MetadataReservation::Borrowed => { |
| // Account for the amount we need to borrow for the transaction itself now that we |
| // know the transaction size. |
| inner.borrowed_metadata_space += txn_space; |
| |
| // This transaction borrowed metadata space, but it might have returned space to the |
| // metadata reservation that we can now give back to the allocator. |
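| // Giving back the excess restores the invariant checked by the debug_assert at the end of |
| // this function: reservation.amount() + borrowed_metadata_space == required_reservation(). |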
| let to_give_back = (reservation.amount() + inner.borrowed_metadata_space) |
| .saturating_sub(inner.required_reservation()); |
| if to_give_back > 0 { |
| reservation.give_back(to_give_back); |
| } |
| } |
| MetadataReservation::Hold(hold_amount) => { |
| // Transfer reserved space into the metadata reservation. |
| let txn_reservation = transaction.allocator_reservation.unwrap(); |
| assert_ne!( |
| txn_reservation as *const _, reservation as *const _, |
| "MetadataReservation::Borrowed should be used." |
| ); |
| txn_reservation.commit(txn_space); |
| if txn_reservation.owner_object_id() != reservation.owner_object_id() { |
| assert_eq!( |
| reservation.owner_object_id(), |
| None, |
| "Should not be mixing attributed owners." |
| ); |
| inner |
| .allocator |
| .as_ref() |
| .unwrap() |
| .disown_reservation(txn_reservation.owner_object_id(), txn_space); |
| } |
| if let Some(amount) = hold_amount.checked_sub(txn_space) { |
| *hold_amount = amount; |
| } else { |
| panic!("Transaction was larger than metadata reservation"); |
| } |
| reservation.add(txn_space); |
| } |
| MetadataReservation::Reservation(txn_reservation) => { |
| // Transfer reserved space into the metadata reservation. |
| txn_reservation.move_to(reservation, txn_space); |
| } |
| } |
| // Check that our invariant holds true. |
| debug_assert_eq!( |
| reservation.amount() + inner.borrowed_metadata_space, |
| inner.required_reservation(), |
| "txn_space: {}, reservation_amount: {}, borrowed: {}, required: {}", |
| txn_space, |
| reservation.amount(), |
| inner.borrowed_metadata_space, |
| inner.required_reservation(), |
| ); |
| } |
| |
| /// Drops a transaction. This is called automatically when a transaction is dropped. If the |
| /// transaction has been committed, it should contain no mutations and so nothing will get rolled |
| /// back. For each mutation, drop_mutation is called to allow for roll back (e.g. the allocator |
| /// will unreserve allocations). |
| pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) { |
| for TxnMutation { object_id, mutation, .. } in transaction.take_mutations() { |
| self.object(object_id).map(|o| o.drop_mutation(mutation, transaction)); |
| } |
| } |
| |
| /// Returns the journal file offsets that each object depends on and the checkpoint for the |
| /// minimum offset. |
| pub fn journal_file_offsets(&self) -> (HashMap<u64, u64>, Option<JournalCheckpoint>) { |
| let inner = self.inner.read().unwrap(); |
| let mut min_checkpoint = None; |
| let mut offsets = HashMap::default(); |
| for (&object_id, checkpoint) in &inner.journal_checkpoints { |
| let checkpoint = checkpoint.earliest(); |
| match &mut min_checkpoint { |
| None => min_checkpoint = Some(checkpoint), |
| Some(ref mut min_checkpoint) => { |
| if checkpoint.file_offset < min_checkpoint.file_offset { |
| *min_checkpoint = checkpoint; |
| } |
| } |
| } |
| offsets.insert(object_id, checkpoint.file_offset); |
| } |
| (offsets, min_checkpoint.cloned()) |
| } |
| |
| /// Returns the checkpoint into the journal that the object depends on, or None if the object |
| /// has no journaled updates. |
| pub fn journal_checkpoint(&self, object_id: u64) -> Option<JournalCheckpoint> { |
| self.inner |
| .read() |
| .unwrap() |
| .journal_checkpoints |
| .get(&object_id) |
| .map(|checkpoints| checkpoints.earliest().clone()) |
| } |
| |
| /// Returns true if the object identified by `object_id` is known to have updates recorded in |
| /// the journal that the object depends upon. |
| pub fn needs_flush(&self, object_id: u64) -> bool { |
| self.inner.read().unwrap().journal_checkpoints.contains_key(&object_id) |
| } |
| |
| /// Flushes all known objects. This will then allow the journal space to be freed. |
| /// |
| /// Also returns the earliest known version of a struct on the filesystem. |
| pub async fn flush(&self) -> Result<Version, Error> { |
| let mut object_ids: Vec<_> = |
| self.inner.read().unwrap().journal_checkpoints.keys().cloned().collect(); |
| // Process objects in reverse sorted order because that will mean we compact the root object |
| // store last, which ensures we include the metadata from the compactions of other objects. |
| object_ids.sort_unstable(); |
| |
| // As we iterate, keep track of the earliest version used by structs in these objects |
| let mut earliest_version: Version = LATEST_VERSION; |
| for &object_id in object_ids.iter().rev() { |
| let object_earliest_version = self.object(object_id).unwrap().flush().await?; |
| if object_earliest_version < earliest_version { |
| earliest_version = object_earliest_version; |
| } |
| } |
| |
| Ok(earliest_version) |
| } |
| |
| fn object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> { |
| let inner = self.inner.read().unwrap(); |
| if object_id == inner.allocator_object_id { |
| Some(inner.allocator.clone().unwrap() as Arc<dyn JournalingObject>) |
| } else { |
| inner.stores.get(&object_id).map(|x| x.clone() as Arc<dyn JournalingObject>) |
| } |
| } |
| |
| pub fn init_metadata_reservation(&self) -> Result<(), Error> { |
| let inner = self.inner.read().unwrap(); |
| let required = inner.required_reservation(); |
| ensure!(required >= inner.borrowed_metadata_space, FxfsError::Inconsistent); |
| self.metadata_reservation |
| .set( |
| inner |
| .allocator |
| .as_ref() |
| .cloned() |
| .unwrap() |
| .reserve(None, inner.required_reservation() - inner.borrowed_metadata_space) |
| .with_context(|| { |
| format!( |
| "Failed to reserve {} - {} = {} bytes", |
| inner.required_reservation(), |
| inner.borrowed_metadata_space, |
| inner.required_reservation() - inner.borrowed_metadata_space |
| ) |
| })?, |
| ) |
| .unwrap(); |
| Ok(()) |
| } |
| |
| pub fn metadata_reservation(&self) -> &Reservation { |
| self.metadata_reservation.get().unwrap() |
| } |
| |
| pub fn update_reservation(&self, object_id: u64, amount: u64) { |
| self.inner.write().unwrap().reservations.insert(object_id, amount); |
| } |
| |
| pub fn last_end_offset(&self) -> u64 { |
| self.inner.read().unwrap().last_end_offset |
| } |
| |
| pub fn set_last_end_offset(&self, v: u64) { |
| self.inner.write().unwrap().last_end_offset = v; |
| } |
| |
| pub fn borrowed_metadata_space(&self) -> u64 { |
| self.inner.read().unwrap().borrowed_metadata_space |
| } |
| |
| pub fn set_borrowed_metadata_space(&self, v: u64) { |
| self.inner.write().unwrap().borrowed_metadata_space = v; |
| } |
| |
| pub fn write_mutation(&self, object_id: u64, mutation: &Mutation, writer: journal::Writer<'_>) { |
| self.object(object_id).unwrap().write_mutation(mutation, writer); |
| } |
| |
| pub fn unlocked_stores(&self) -> Vec<Arc<ObjectStore>> { |
| let inner = self.inner.read().unwrap(); |
| let mut stores = Vec::new(); |
| for store in inner.stores.values() { |
| if !store.is_locked() { |
| stores.push(store.clone()); |
| } |
| } |
| stores |
| } |
| |
| /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the |
| /// object manager when queried. |
| pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) { |
| let this = Arc::downgrade(self); |
| parent.record_lazy_child(name, move || { |
| let this_clone = this.clone(); |
| async move { |
| let inspector = fuchsia_inspect::Inspector::default(); |
| if let Some(this) = this_clone.upgrade() { |
| let (required, borrowed, earliest_checkpoint) = { |
| // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS. |
| let inner = this.inner.read().unwrap(); |
| ( |
| inner.required_reservation(), |
| inner.borrowed_metadata_space, |
| inner.earliest_journal_offset(), |
| ) |
| }; |
| let root = inspector.root(); |
| root.record_uint("metadata_reservation", this.metadata_reservation().amount()); |
| root.record_uint("required_reservation", required); |
| root.record_uint("borrowed_reservation", borrowed); |
| if let Some(earliest_checkpoint) = earliest_checkpoint { |
| root.record_uint("earliest_checkpoint", earliest_checkpoint); |
| } |
| |
| // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics. |
| if let Some(x) = round_div(100 * borrowed, required) { |
| root.record_uint("borrowed_to_required_reservation_percent", x); |
| } |
| } |
| Ok(inspector) |
| } |
| .boxed() |
| }); |
| } |
| |
| /// Normally, we make new transactions pay for overheads incurred by the journal, such as |
| /// checksums and padding, but if the journal has discarded a significant amount after a replay, |
| /// we run the risk of there not being enough reserved. To handle this, if the amount is |
| /// significant, we force the journal to borrow the space (using a journal-created transaction). |
| pub fn needs_borrow_for_journal(&self, checkpoint: u64) -> bool { |
| checkpoint.checked_sub(self.inner.read().unwrap().last_end_offset).unwrap() > 256 |
| } |
| } |
| |
| /// ReservationUpdate is an associated object that sets the amount reserved for an object |
| /// (overwriting any previous amount). Updates must be applied as part of a transaction before |
| /// did_commit_transaction runs, because that is where the accounting for reserved metadata space |
| /// is reconciled. |
| pub struct ReservationUpdate(u64); |
| |
| impl ReservationUpdate { |
| pub fn new(amount: u64) -> Self { |
| Self(amount) |
| } |
| } |
| |
| impl AssociatedObject for ReservationUpdate { |
| fn will_apply_mutation(&self, _mutation: &Mutation, object_id: u64, manager: &ObjectManager) { |
| manager.update_reservation(object_id, self.0); |
| } |
| } |