| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| pub mod allocator; |
| pub mod caching_object_handle; |
| pub mod directory; |
| mod extent_record; |
| mod flush; |
| pub mod graveyard; |
| pub mod journal; |
| mod merge; |
| pub mod object_manager; |
| mod object_record; |
| mod store_object_handle; |
| #[cfg(test)] |
| mod testing; |
| pub mod transaction; |
| mod tree; |
| pub mod volume; |
| mod writeback_cache; |
| |
| pub use caching_object_handle::CachingObjectHandle; |
| pub use directory::Directory; |
| pub use object_record::{ObjectDescriptor, Timestamp}; |
| pub use store_object_handle::StoreObjectHandle; |
| |
| use { |
| crate::{ |
| crypt::{Crypt, KeyPurpose, StreamCipher, WrappedKey, WrappedKeys}, |
| data_buffer::{DataBuffer, MemDataBuffer}, |
| debug_assert_not_too_long, |
| errors::FxfsError, |
| ff1::Ff1, |
| filesystem::{ApplyContext, ApplyMode, Filesystem, FxFilesystem, JournalingObject}, |
| log::*, |
| lsm_tree::{ |
| types::{Item, ItemRef, LayerIterator}, |
| LSMTree, |
| }, |
| metrics::{traits::Metric, StringMetric, UintMetric}, |
| object_handle::{ObjectHandle, ObjectHandleExt, INVALID_OBJECT_ID}, |
| object_store::{ |
| graveyard::Graveyard, |
| journal::JournalCheckpoint, |
| transaction::{ |
| AssocObj, AssociatedObject, LockKey, ObjectStoreMutation, Operation, Options, |
| Transaction, UpdateMutationsKey, |
| }, |
| }, |
| serialized_types::{Version, Versioned, VersionedLatest}, |
| }, |
| allocator::Allocator, |
| anyhow::{anyhow, bail, ensure, Context, Error}, |
| async_trait::async_trait, |
| once_cell::sync::OnceCell, |
| serde::{Deserialize, Serialize}, |
| std::{ |
| collections::VecDeque, |
| ops::Bound, |
| sync::{ |
| atomic::{AtomicBool, Ordering}, |
| Arc, Mutex, Weak, |
| }, |
| }, |
| storage_device::Device, |
| uuid::Uuid, |
| }; |
| |
| // Exposed for serialized_types. |
| pub use allocator::{AllocatorInfo, AllocatorKey, AllocatorValue}; |
| pub use extent_record::{ExtentKey, ExtentValue, DEFAULT_DATA_ATTRIBUTE_ID}; |
| pub use journal::{JournalRecord, SuperBlock, SuperBlockRecord}; |
| pub use object_record::{ |
| AttributeKey, EncryptionKeys, ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, |
| ObjectValue, |
| }; |
| pub use transaction::Mutation; |
| |
| // For encrypted stores, the lower 32 bits of the object ID are encrypted to make side-channel |
| // attacks more difficult. This mask can be used to extract the hi part of the object ID. |
| const OBJECT_ID_HI_MASK: u64 = 0xffffffff00000000; |
| |
| /// StoreObjectHandle stores an owner that must implement this trait, which allows the handle to get |
| /// back to an ObjectStore and provides a callback for creating a data buffer for the handle. |
| pub trait HandleOwner: AsRef<ObjectStore> + Send + Sync + 'static { |
| type Buffer: DataBuffer; |
| |
| fn create_data_buffer(&self, object_id: u64, initial_size: u64) -> Self::Buffer; |
| } |
| |
| // StoreInfo stores information about the object store. This is stored within the parent object |
| // store, and is used, for example, to get the persistent layer objects. |
| #[derive(Clone, Debug, Default, Serialize, Deserialize, Versioned)] |
| pub struct StoreInfo { |
| /// The globally unique identifier for the associated object store. If unset, will be all zero. |
| guid: [u8; 16], |
| |
| /// The last used object ID. Note that this field is not accurate in memory; ObjectStore's |
| /// last_object_id field is the one to use in that case. Technically, this might not be the |
| /// last object ID used for the latest transaction that created an object because we use this at |
| /// the point of creating the object but before we commit the transaction. Transactions can |
| /// then get committed in an arbitrary order (or not at all). |
| last_object_id: u64, |
| |
| /// Object ids for layers. TODO(fxbug.dev/95971): need a layer of indirection here so we can |
| /// support snapshots. |
| pub layers: Vec<u64>, |
| |
| /// The object ID for the root directory. |
| root_directory_object_id: u64, |
| |
| /// The object ID for the graveyard. |
| graveyard_directory_object_id: u64, |
| |
| /// The number of live objects in the store. |
| object_count: u64, |
| |
| /// The (wrapped) key that encrypted mutations should use. |
| mutations_key: Option<WrappedKey>, |
| |
| /// Mutations for the store are encrypted using a stream cipher. To decrypt the mutations, we |
| /// need to know the offset in the cipher stream to start it. |
| mutations_cipher_offset: u64, |
| |
| /// If we have to flush the store whilst we do not have the key, we need to write the encrypted |
| /// mutations to an object. This is the object ID of that file if it exists. |
| encrypted_mutations_object_id: u64, |
| |
| /// Object IDs are encrypted to reduce the amount of information that sequential object IDs |
| /// reveal (such as the number of files in the system and the ordering of their creation in |
| /// time). Only the bottom 32 bits of the object ID are encrypted whilst the top 32 bits will |
| /// increment after 2^32 object IDs have been used and this allows us to roll the key. |
| object_id_key: Option<WrappedKey>, |
| } |
| |
| impl StoreInfo { |
| /// Create a new/default [`StoreInfo`] but with a newly generated GUID. |
| fn new_with_guid() -> Self { |
| let guid = Uuid::new_v4(); |
| Self { guid: *guid.as_bytes(), ..Default::default() } |
| } |
| } |
| |
| // TODO(fxbug.dev/95972): We should test or put checks in place to ensure this limit isn't exceeded. |
| // It will likely involve placing limits on the maximum number of layers. |
| pub const MAX_STORE_INFO_SERIALIZED_SIZE: usize = 131072; |
| |
| // This needs to be large enough to accommodate the maximum amount of unflushed data (data that is |
| // in the journal but hasn't yet been written to layer files) for a store. We set a limit because |
| // we want to limit the amount of memory use in the case the filesystem is corrupt or under attaack. |
| pub const MAX_ENCRYPTED_MUTATIONS_SIZE: usize = 8 * journal::DEFAULT_RECLAIM_SIZE as usize; |
| |
| #[derive(Default)] |
| pub struct HandleOptions { |
| /// If true, transactions used by this handle will skip journal space checks. |
| skip_journal_checks: bool, |
| } |
| |
| #[derive(Default)] |
| pub struct NewChildStoreOptions { |
| /// The store is unencrypted if store is none. |
| pub crypt: Option<Arc<dyn Crypt>>, |
| |
| /// Specifies the object ID in the root store to be used for the store. If set to |
| /// INVALID_OBJECT_ID (the default and typical case), a suitable ID will be chosen. |
| pub object_id: u64, |
| } |
| |
| #[derive(Debug, Default, Deserialize, Serialize)] |
| pub struct EncryptedMutations { |
| // Information about the mutations are held here, but the actual encrypted data is held within |
| // data. For each transaction, we record the checkpoint and the count of mutations within the |
| // transaction. The checkpoint is required for the log file offset (which we need to apply the |
| // mutations), and the version so that we can correctly decode the mutation after it has been |
| // decrypted. The count specifies the number of serialized mutations encoded in |data|. |
| transactions: Vec<(JournalCheckpoint, u64)>, |
| |
| // The encrypted mutations. |
| data: Vec<u8>, |
| |
| // If the mutations key was rolled, this holds the amount of data in `data` encrypted using the |
| // old key, and also stores the new key. |
| mutations_key_roll: Option<(usize, WrappedKey)>, |
| } |
| |
| impl Versioned for EncryptedMutations { |
| fn max_serialized_size() -> u64 { |
| MAX_ENCRYPTED_MUTATIONS_SIZE as u64 |
| } |
| } |
| |
| impl EncryptedMutations { |
| fn extend(&mut self, other: EncryptedMutations) -> Result<(), Error> { |
| ensure!( |
| self.mutations_key_roll.is_none() || other.mutations_key_roll.is_none(), |
| FxfsError::Inconsistent |
| ); |
| self.transactions.extend(other.transactions); |
| if let Some((offset, key)) = other.mutations_key_roll { |
| self.mutations_key_roll = Some((self.data.len() + offset, key)); |
| } |
| self.data.extend(other.data); |
| Ok(()) |
| } |
| |
| fn push(&mut self, checkpoint: &JournalCheckpoint, data: Box<[u8]>) { |
| self.data.append(&mut data.into()); |
| // If the checkpoint is the same as the last mutation we pushed, increment the count. |
| if let Some((last_checkpoint, count)) = self.transactions.last_mut() { |
| if last_checkpoint.file_offset == checkpoint.file_offset { |
| *count += 1; |
| return; |
| } |
| } |
| self.transactions.push((checkpoint.clone(), 1)); |
| } |
| } |
| |
| // Whilst we are replaying the store, we need to keep track of changes to StoreInfo that arise from |
| // mutations in the journal stream that don't include all the fields in StoreInfo. After replay has |
| // finished, we load the full store information and merge it with the deltas here. |
| // NOTE: While changing this struct, make sure to also update fsck::Fsck::check_child_store if |
| // needed, which currently doesn't bother replaying this information. |
| #[derive(Debug, Default)] |
| struct ReplayInfo { |
| object_count_delta: i64, |
| encrypted_mutations: EncryptedMutations, |
| } |
| |
| impl ReplayInfo { |
| fn new() -> VecDeque<ReplayInfo> { |
| let mut info = VecDeque::new(); |
| info.push_back(ReplayInfo::default()); |
| info |
| } |
| } |
| |
| #[derive(Debug)] |
| enum StoreOrReplayInfo { |
| Info(StoreInfo), |
| |
| // When we flush a store, we take a snapshot of store information when we begin flushing, and |
| // that snapshot gets committed when we end flushing. In the intervening period, we need to |
| // record any changes made to store information. If during replay we don't get around to ending |
| // the flush, we need to hang on to the deltas that were applied before we started flushing. |
| // This is why the information is stored in a VecDeque. The frontmost element is always the |
| // most recent. |
| Replay(VecDeque<ReplayInfo>), |
| } |
| |
| impl StoreOrReplayInfo { |
| fn info(&self) -> Option<&StoreInfo> { |
| match self { |
| StoreOrReplayInfo::Info(info) => Some(info), |
| _ => None, |
| } |
| } |
| |
| fn info_mut(&mut self) -> Option<&mut StoreInfo> { |
| match self { |
| StoreOrReplayInfo::Info(info) => Some(info), |
| _ => None, |
| } |
| } |
| |
| fn replay_info_mut(&mut self) -> Option<&mut VecDeque<ReplayInfo>> { |
| match self { |
| StoreOrReplayInfo::Replay(info) => Some(info), |
| _ => None, |
| } |
| } |
| |
| fn adjust_object_count(&mut self, delta: i64) { |
| match self { |
| StoreOrReplayInfo::Info(StoreInfo { object_count, .. }) => { |
| if delta < 0 { |
| *object_count = object_count.saturating_sub(-delta as u64); |
| } else { |
| *object_count = object_count.saturating_add(delta as u64); |
| } |
| } |
| StoreOrReplayInfo::Replay(replay_info) => { |
| replay_info.front_mut().unwrap().object_count_delta += delta; |
| } |
| } |
| } |
| |
| fn begin_flush(&mut self) { |
| if let StoreOrReplayInfo::Replay(replay_info) = self { |
| // Push a new record on the front keeping the old one. We'll remove the old one when |
| // `end_flush` is called. |
| replay_info.push_front(ReplayInfo::default()); |
| } |
| } |
| |
| fn end_flush(&mut self) { |
| if let StoreOrReplayInfo::Replay(replay_info) = self { |
| replay_info.truncate(1); |
| } |
| } |
| |
| fn push_encrypted_mutation(&mut self, checkpoint: &JournalCheckpoint, data: Box<[u8]>) { |
| if let StoreOrReplayInfo::Replay(replay_info) = self { |
| replay_info.front_mut().unwrap().encrypted_mutations.push(checkpoint, data); |
| } |
| } |
| |
| fn set_mutations_key(&mut self, key: WrappedKey) { |
| match self { |
| StoreOrReplayInfo::Info(StoreInfo { mutations_key, .. }) => *mutations_key = Some(key), |
| StoreOrReplayInfo::Replay(replay_info) => { |
| let mutations = &mut replay_info.front_mut().unwrap().encrypted_mutations; |
| mutations.mutations_key_roll = Some((mutations.data.len(), key)); |
| } |
| } |
| } |
| } |
| |
| pub enum LockState { |
| Locked(EncryptedMutations), |
| Unencrypted, |
| Unlocked(Arc<dyn Crypt>), |
| |
| // The store is encrypted but is now in an unusable state (either due to a failure to unlock, or |
| // a failure to lock). |
| Invalid, |
| |
| // Before we've read the StoreInfo we might not know whether the store is Locked or Unencrypted. |
| // This can happen when lazily opening stores (ObjectManager::lazy_open_store). |
| Unknown, |
| } |
| |
| impl LockState { |
| fn encrypted_mutations(&self) -> Option<&EncryptedMutations> { |
| if let LockState::Locked(m) = self { |
| Some(m) |
| } else { |
| None |
| } |
| } |
| |
| fn encrypted_mutations_mut(&mut self) -> Option<&mut EncryptedMutations> { |
| if let LockState::Locked(m) = self { |
| Some(m) |
| } else { |
| None |
| } |
| } |
| |
| fn is_unlocked(&self) -> bool { |
| matches!(self, LockState::Unlocked(_)) |
| } |
| } |
| |
| #[derive(Default)] |
| struct LastObjectId { |
| // The *unencrypted* value of the last object ID. |
| id: u64, |
| |
| // Encrypted stores will use a cipher to obfuscate the object ID. |
| cipher: Option<Ff1>, |
| } |
| |
| impl LastObjectId { |
| // Returns true if a cipher is needed to generate new object IDs. |
| fn should_create_cipher(&self) -> bool { |
| self.id as u32 == u32::MAX |
| } |
| |
| fn get_next_object_id(&mut self) -> u64 { |
| self.id += 1; |
| if let Some(cipher) = &self.cipher { |
| let hi = self.id & OBJECT_ID_HI_MASK; |
| assert_ne!(hi, INVALID_OBJECT_ID); |
| assert_ne!(self.id as u32, 0); // This would indicate the ID wrapped. |
| hi | cipher.encrypt(self.id as u32) as u64 |
| } else { |
| self.id |
| } |
| } |
| } |
| |
| /// An object store supports a file like interface for objects. Objects are keyed by a 64 bit |
| /// identifier. And object store has to be backed by a parent object store (which stores metadata |
| /// for the object store). The top-level object store (a.k.a. the root parent object store) is |
| /// in-memory only. |
| pub struct ObjectStore { |
| parent_store: Option<Arc<ObjectStore>>, |
| store_object_id: u64, |
| device: Arc<dyn Device>, |
| block_size: u64, |
| filesystem: Weak<dyn Filesystem>, |
| store_info: Mutex<StoreOrReplayInfo>, |
| tree: LSMTree<ObjectKey, ObjectValue>, |
| |
| // When replaying the journal, the store cannot read StoreInfo until the whole journal |
| // has been replayed, so during that time, store_info_handle will be None and records |
| // just get sent to the tree. Once the journal has been replayed, we can open the store |
| // and load all the other layer information. |
| store_info_handle: OnceCell<StoreObjectHandle<ObjectStore>>, |
| |
| // The cipher to use for encrypted mutations, if this store is encrypted. |
| mutations_cipher: Mutex<Option<StreamCipher>>, |
| |
| // Current lock state of the store. |
| lock_state: Mutex<LockState>, |
| |
| // Enable/disable tracing. |
| trace: AtomicBool, |
| |
| // Metrics associated with the store. Only set once the journal has been replayed. |
| metrics: OnceCell<ObjectStoreMetrics>, |
| |
| // Contains the last object ID and, optionally, a cipher to be used when generating new object |
| // IDs. |
| last_object_id: Mutex<LastObjectId>, |
| } |
| |
| struct ObjectStoreMetrics { |
| _guid: StringMetric, |
| _store_id: UintMetric, |
| } |
| |
| impl ObjectStoreMetrics { |
| pub fn new(store_name: impl AsRef<str>, store_id: u64, store_info: &StoreInfo) -> Self { |
| // TODO(fxbug.dev/94075): Support proper nesting of values. |
| let prefix = format!("obj_store_{}", store_name.as_ref()); |
| let guid_string = Uuid::from_bytes(store_info.guid).to_string(); |
| ObjectStoreMetrics { |
| _guid: StringMetric::new(format!("{}_guid", &prefix), guid_string), |
| _store_id: UintMetric::new(format!("{}_id", &prefix), store_id), |
| } |
| } |
| } |
| |
| impl ObjectStore { |
| fn new( |
| parent_store: Option<Arc<ObjectStore>>, |
| store_object_id: u64, |
| filesystem: Arc<dyn Filesystem>, |
| store_info: Option<StoreInfo>, |
| mutations_cipher: Option<StreamCipher>, |
| lock_state: LockState, |
| last_object_id: LastObjectId, |
| ) -> Arc<ObjectStore> { |
| let device = filesystem.device(); |
| let block_size = filesystem.block_size(); |
| Arc::new(ObjectStore { |
| parent_store, |
| store_object_id, |
| device, |
| block_size, |
| filesystem: Arc::downgrade(&filesystem), |
| store_info: Mutex::new(match store_info { |
| Some(info) => StoreOrReplayInfo::Info(info), |
| None => StoreOrReplayInfo::Replay(ReplayInfo::new()), |
| }), |
| tree: LSMTree::new(merge::merge), |
| store_info_handle: OnceCell::new(), |
| mutations_cipher: Mutex::new(mutations_cipher), |
| lock_state: Mutex::new(lock_state), |
| trace: AtomicBool::new(false), |
| metrics: OnceCell::new(), |
| last_object_id: Mutex::new(last_object_id), |
| }) |
| } |
| |
| fn new_empty( |
| parent_store: Option<Arc<ObjectStore>>, |
| store_object_id: u64, |
| filesystem: Arc<dyn Filesystem>, |
| ) -> Arc<Self> { |
| Self::new( |
| parent_store, |
| store_object_id, |
| filesystem, |
| Some(StoreInfo::default()), |
| None, |
| LockState::Unencrypted, |
| LastObjectId::default(), |
| ) |
| } |
| |
| /// Cycle breaker constructor that returns an ObjectStore without a filesystem. |
| /// This should only be used from SuperBlock code. |
| pub fn new_root_parent(device: Arc<dyn Device>, block_size: u64, store_object_id: u64) -> Self { |
| ObjectStore { |
| parent_store: None, |
| store_object_id, |
| device, |
| block_size, |
| filesystem: Weak::<FxFilesystem>::new(), |
| store_info: Mutex::new(StoreOrReplayInfo::Info(StoreInfo::default())), |
| tree: LSMTree::new(merge::merge), |
| store_info_handle: OnceCell::new(), |
| mutations_cipher: Mutex::new(None), |
| lock_state: Mutex::new(LockState::Unencrypted), |
| trace: AtomicBool::new(false), |
| metrics: OnceCell::new(), |
| last_object_id: Mutex::new(LastObjectId::default()), |
| } |
| } |
| |
| /// Used to set filesystem on root_parent stores at bootstrap time after the filesystem has |
| /// been created. |
| pub fn attach_filesystem( |
| mut this: ObjectStore, |
| filesystem: Arc<dyn Filesystem>, |
| ) -> ObjectStore { |
| this.filesystem = Arc::downgrade(&filesystem); |
| this |
| } |
| |
| /// Create a child store. It is a multi-step process: |
| /// |
| /// 1. Call `ObjectStore::new_child_store`. |
| /// 2. Register the store with the object-manager. |
| /// 3. Call `ObjectStore::create` to write the store-info. |
| /// |
| /// If the procedure fails, care must be taken to unregister store with the object-manager. |
| /// |
| /// The steps have to be separate because of lifetime issues when working with a transaction. |
| async fn new_child_store( |
| self: &Arc<Self>, |
| transaction: &mut Transaction<'_>, |
| options: NewChildStoreOptions, |
| ) -> Result<Arc<Self>, Error> { |
| let handle = if options.object_id != INVALID_OBJECT_ID { |
| ObjectStore::create_object_with_id( |
| self, |
| transaction, |
| options.object_id, |
| HandleOptions::default(), |
| None, |
| ) |
| .await? |
| } else { |
| ObjectStore::create_object(self, transaction, HandleOptions::default(), None).await? |
| }; |
| let filesystem = self.filesystem(); |
| let store = if let Some(crypt) = options.crypt { |
| let (wrapped_key, unwrapped_key) = |
| crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?; |
| let (object_id_wrapped, object_id_unwrapped) = |
| crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?; |
| Self::new( |
| Some(self.clone()), |
| handle.object_id(), |
| filesystem.clone(), |
| Some(StoreInfo { |
| mutations_key: Some(wrapped_key), |
| object_id_key: Some(object_id_wrapped), |
| ..StoreInfo::new_with_guid() |
| }), |
| Some(StreamCipher::new(&unwrapped_key, 0)), |
| LockState::Unlocked(crypt), |
| LastObjectId { |
| // We need to avoid accidentally getting INVALID_OBJECT_ID, so we set |
| // the top 32 bits to a non-zero value. |
| id: 1 << 32, |
| cipher: Some(Ff1::new(&object_id_unwrapped)), |
| }, |
| ) |
| } else { |
| Self::new( |
| Some(self.clone()), |
| handle.object_id(), |
| filesystem.clone(), |
| Some(StoreInfo::new_with_guid()), |
| None, |
| LockState::Unencrypted, |
| LastObjectId::default(), |
| ) |
| }; |
| assert!(store.store_info_handle.set(handle).is_ok()); |
| Ok(store) |
| } |
| |
| /// Actually creates the store in a transaction. This will also create a root directory and |
| /// graveyard directory for the store. See `new_child_store` above. |
| async fn create<'a>( |
| self: &'a Arc<Self>, |
| transaction: &mut Transaction<'a>, |
| ) -> Result<(), Error> { |
| let buf = { |
| // Create a root directory and graveyard directory. |
| let graveyard_directory_object_id = Graveyard::create(transaction, &self); |
| let root_directory = Directory::create(transaction, &self).await?; |
| |
| let mut store_info = self.store_info.lock().unwrap(); |
| let mut store_info = store_info.info_mut().unwrap(); |
| |
| store_info.graveyard_directory_object_id = graveyard_directory_object_id; |
| store_info.root_directory_object_id = root_directory.object_id(); |
| |
| let mut serialized_info = Vec::new(); |
| store_info.serialize_with_version(&mut serialized_info)?; |
| let mut buf = self.device.allocate_buffer(serialized_info.len()); |
| buf.as_mut_slice().copy_from_slice(&serialized_info[..]); |
| buf |
| }; |
| |
| self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await |
| } |
| |
| pub fn set_trace(&self, trace: bool) { |
| let old_value = self.trace.swap(trace, Ordering::Relaxed); |
| if trace != old_value { |
| info!(store_id = self.store_object_id(), trace, "OS: trace",); |
| } |
| } |
| |
| pub fn is_root(&self) -> bool { |
| if let Some(parent) = &self.parent_store { |
| parent.parent_store.is_none() |
| } else { |
| // The root parent store isn't the root store. |
| false |
| } |
| } |
| |
| pub fn device(&self) -> &Arc<dyn Device> { |
| &self.device |
| } |
| |
| pub fn block_size(&self) -> u64 { |
| self.block_size |
| } |
| |
| pub fn filesystem(&self) -> Arc<dyn Filesystem> { |
| self.filesystem.upgrade().unwrap() |
| } |
| |
| pub fn store_object_id(&self) -> u64 { |
| self.store_object_id |
| } |
| |
| pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> { |
| &self.tree |
| } |
| |
| pub fn root_directory_object_id(&self) -> u64 { |
| self.store_info.lock().unwrap().info().unwrap().root_directory_object_id |
| } |
| |
| pub fn graveyard_directory_object_id(&self) -> u64 { |
| self.store_info.lock().unwrap().info().unwrap().graveyard_directory_object_id |
| } |
| |
| fn set_graveyard_directory_object_id(&self, oid: u64) { |
| assert_eq!( |
| std::mem::replace( |
| &mut self |
| .store_info |
| .lock() |
| .unwrap() |
| .info_mut() |
| .unwrap() |
| .graveyard_directory_object_id, |
| oid |
| ), |
| INVALID_OBJECT_ID |
| ); |
| } |
| |
| pub fn object_count(&self) -> u64 { |
| self.store_info.lock().unwrap().info().unwrap().object_count |
| } |
| |
| /// Returns the crypt object for the store. Returns None if the store is unencrypted. This will |
| /// panic if the store is locked. |
| pub fn crypt(&self) -> Option<Arc<dyn Crypt>> { |
| match &*self.lock_state.lock().unwrap() { |
| LockState::Locked(_) => panic!("Store is locked"), |
| LockState::Invalid => None, |
| LockState::Unencrypted => None, |
| LockState::Unlocked(crypt) => Some(crypt.clone()), |
| LockState::Unknown => { |
| panic!("Store is of unknown lock state; has the journal been replayed yet?") |
| } |
| } |
| } |
| |
| /// Returns the file size for the object without opening the object. |
| async fn get_file_size(&self, object_id: u64) -> Result<u64, Error> { |
| let item = self |
| .tree |
| .find(&ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Size)) |
| .await? |
| .ok_or(FxfsError::NotFound)?; |
| if let ObjectValue::Attribute { size } = item.value { |
| Ok(size) |
| } else { |
| bail!(FxfsError::NotFile); |
| } |
| } |
| |
| /// `crypt` can be provided if the crypt service should be different to the default; see the |
| /// comment on create_object. |
| pub async fn open_object<S: HandleOwner>( |
| owner: &Arc<S>, |
| object_id: u64, |
| options: HandleOptions, |
| mut crypt: Option<&dyn Crypt>, |
| ) -> Result<StoreObjectHandle<S>, Error> { |
| let store = owner.as_ref().as_ref(); |
| let store_crypt = store.crypt(); |
| if crypt.is_none() { |
| crypt = store_crypt.as_deref(); |
| } |
| let keys = if let Some(crypt) = crypt { |
| match store.tree.find(&ObjectKey::keys(object_id)).await?.ok_or(FxfsError::NotFound)? { |
| Item { value: ObjectValue::Keys(EncryptionKeys::AES256XTS(keys)), .. } => { |
| Some(crypt.unwrap_keys(&keys, object_id).await?) |
| } |
| _ => { |
| bail!(anyhow!(FxfsError::Inconsistent).context("open_object: Expected keys")) |
| } |
| } |
| } else { |
| None |
| }; |
| |
| let item = store |
| .tree |
| .find(&ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Size)) |
| .await? |
| .ok_or(FxfsError::NotFound)?; |
| if let ObjectValue::Attribute { size } = item.value { |
| Ok(StoreObjectHandle::new( |
| owner.clone(), |
| object_id, |
| keys, |
| DEFAULT_DATA_ATTRIBUTE_ID, |
| size, |
| options, |
| false, |
| )) |
| } else { |
| bail!(anyhow!(FxfsError::Inconsistent).context("open_object: Expected attribute")); |
| } |
| } |
| |
| // See the comment on create_object for the semantics of the `crypt` argument. If object_id == |
| // INVALID_OBJECT_ID (which should usually be the case), an object ID will be chosen. |
| async fn create_object_with_id<S: HandleOwner>( |
| owner: &Arc<S>, |
| transaction: &mut Transaction<'_>, |
| mut object_id: u64, |
| options: HandleOptions, |
| mut crypt: Option<&dyn Crypt>, |
| ) -> Result<StoreObjectHandle<S>, Error> { |
| let store = owner.as_ref().as_ref(); |
| if object_id == INVALID_OBJECT_ID { |
| object_id = store.get_next_object_id().await?; |
| } else { |
| store.update_last_object_id(object_id); |
| } |
| let store_crypt; |
| if crypt.is_none() { |
| store_crypt = store.crypt(); |
| crypt = store_crypt.as_deref(); |
| } |
| let now = Timestamp::now(); |
| transaction.add( |
| store.store_object_id(), |
| Mutation::insert_object( |
| ObjectKey::object(object_id), |
| ObjectValue::file(1, 0, now.clone(), now), |
| ), |
| ); |
| let unwrapped_keys = if let Some(crypt) = crypt { |
| let (key, unwrapped_key) = crypt.create_key(object_id, KeyPurpose::Data).await?; |
| transaction.add( |
| store.store_object_id(), |
| Mutation::insert_object( |
| ObjectKey::keys(object_id), |
| ObjectValue::keys(EncryptionKeys::AES256XTS(WrappedKeys(vec![(0, key)]))), |
| ), |
| ); |
| Some(vec![(0, unwrapped_key)]) |
| } else { |
| None |
| }; |
| transaction.add( |
| store.store_object_id(), |
| Mutation::insert_object( |
| ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Size), |
| ObjectValue::attribute(0), |
| ), |
| ); |
| Ok(StoreObjectHandle::new( |
| owner.clone(), |
| object_id, |
| unwrapped_keys, |
| DEFAULT_DATA_ATTRIBUTE_ID, |
| 0, |
| options, |
| false, |
| )) |
| } |
| |
| /// There are instances where a store might not be an encrypted store, but the object should |
| /// still be encrypted. For example, the layer files for child stores should be encrypted using |
| /// the crypt service of the child store even though the root store doesn't have encryption. If |
| /// `crypt` is None, the default for the store is used. |
| pub async fn create_object<S: HandleOwner>( |
| owner: &Arc<S>, |
| mut transaction: &mut Transaction<'_>, |
| options: HandleOptions, |
| crypt: Option<&dyn Crypt>, |
| ) -> Result<StoreObjectHandle<S>, Error> { |
| ObjectStore::create_object_with_id( |
| owner, |
| &mut transaction, |
| INVALID_OBJECT_ID, |
| options, |
| crypt, |
| ) |
| .await |
| } |
| |
| /// Adjusts the reference count for a given object. If the reference count reaches zero, the |
| /// object is moved into the graveyard and true is returned. |
| pub async fn adjust_refs( |
| &self, |
| transaction: &mut Transaction<'_>, |
| oid: u64, |
| delta: i64, |
| ) -> Result<bool, Error> { |
| let mut mutation = self.txn_get_object_mutation(transaction, oid).await?; |
| let refs = if let ObjectValue::Object { kind: ObjectKind::File { refs, .. }, .. } = |
| &mut mutation.item.value |
| { |
| *refs = if delta < 0 { |
| refs.checked_sub((-delta) as u64) |
| } else { |
| refs.checked_add(delta as u64) |
| } |
| .ok_or(anyhow!("refs underflow/overflow"))?; |
| refs |
| } else { |
| bail!(FxfsError::NotFile); |
| }; |
| if *refs == 0 { |
| self.add_to_graveyard(transaction, oid); |
| |
| // We might still need to adjust the reference count if delta was something other than |
| // -1. |
| if delta != -1 { |
| *refs = 1; |
| transaction.add(self.store_object_id, Mutation::ObjectStore(mutation)); |
| } |
| Ok(true) |
| } else { |
| transaction.add(self.store_object_id, Mutation::ObjectStore(mutation)); |
| Ok(false) |
| } |
| } |
| |
| // Purges an object that is in the graveyard. This has no locking, so it's not safe to call |
| // this more than once simultaneously for a given object. |
| pub async fn tombstone(&self, object_id: u64, txn_options: Options<'_>) -> Result<(), Error> { |
| let fs = self.filesystem(); |
| let mut search_key = ObjectKey::extent(object_id, 0, 0..0); |
| // TODO(fxbug.dev/95976): There should be a test that runs fsck after each transaction. |
| loop { |
| let mut transaction = fs.clone().new_transaction(&[], txn_options).await?; |
| let next_key = self.delete_extents(&mut transaction, &search_key).await?; |
| if next_key.is_none() { |
| // Tombstone records *must* be merged so as to consume all other records for the |
| // object. |
| transaction.add( |
| self.store_object_id, |
| Mutation::merge_object( |
| ObjectKey::object(search_key.object_id), |
| ObjectValue::None, |
| ), |
| ); |
| |
| self.remove_from_graveyard(&mut transaction, search_key.object_id); |
| } |
| transaction.commit().await?; |
| search_key = if let Some(next_key) = next_key { |
| next_key |
| } else { |
| break; |
| }; |
| } |
| Ok(()) |
| } |
| |
| // Makes progress on deleting part of a file but stops before a transaction gets too big. |
| async fn delete_extents( |
| &self, |
| transaction: &mut Transaction<'_>, |
| search_key: &ObjectKey, |
| ) -> Result<Option<ObjectKey>, Error> { |
| let layer_set = self.tree.layer_set(); |
| let mut merger = layer_set.merger(); |
| let allocator = self.allocator(); |
| let mut iter = merger.seek(Bound::Included(search_key)).await?; |
| let mut delete_extent_mutation = None; |
| // Loop over the extents and deallocate them. |
| while let Some(item_ref) = iter.get() { |
| if item_ref.key.object_id != search_key.object_id { |
| break; |
| } |
| if let ItemRef { |
| key: |
| ObjectKey { |
| data: |
| ObjectKeyData::Attribute( |
| attribute_id, |
| AttributeKey::Extent(ExtentKey { range }), |
| ), |
| .. |
| }, |
| value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }), |
| .. |
| } = item_ref |
| { |
| let device_range = *device_offset..*device_offset + (range.end - range.start); |
| allocator.deallocate(transaction, self.store_object_id(), device_range).await?; |
| delete_extent_mutation = Some(Mutation::merge_object( |
| ObjectKey::extent(search_key.object_id, *attribute_id, 0..range.end), |
| ObjectValue::deleted_extent(), |
| )); |
| // Stop if the transaction is getting too big. At time of writing, this threshold |
| // limits transactions to about 10,000 bytes. |
| const TRANSACTION_MUTATION_THRESHOLD: usize = 200; |
| if transaction.mutations.len() >= TRANSACTION_MUTATION_THRESHOLD { |
| transaction.add(self.store_object_id, delete_extent_mutation.unwrap()); |
| return Ok(Some(ObjectKey::attribute( |
| search_key.object_id, |
| *attribute_id, |
| AttributeKey::Extent(ExtentKey::search_key_from_offset(range.end)), |
| ))); |
| } |
| } |
| iter.advance().await?; |
| } |
| if let Some(m) = delete_extent_mutation { |
| transaction.add(self.store_object_id, m); |
| } |
| Ok(None) |
| } |
| |
| /// Returns all objects that exist in the parent store that pertain to this object store. |
| /// Note that this doesn't include the object_id of the store itself which is generally |
| /// referenced externally. |
| pub fn parent_objects(&self) -> Vec<u64> { |
| assert!(self.store_info_handle.get().is_some()); |
| let mut objects = Vec::new(); |
| // We should not include the ID of the store itself, since that should be referred to in the |
| // volume directory. |
| let guard = self.store_info.lock().unwrap(); |
| let store_info = guard.info().unwrap(); |
| objects.extend_from_slice(&store_info.layers); |
| if store_info.encrypted_mutations_object_id != INVALID_OBJECT_ID { |
| objects.push(store_info.encrypted_mutations_object_id); |
| } |
| objects |
| } |
| |
| /// Returns root objects for this store. |
| pub fn root_objects(&self) -> Vec<u64> { |
| let mut objects = Vec::new(); |
| let store_info = self.store_info.lock().unwrap(); |
| let info = store_info.info().unwrap(); |
| if info.root_directory_object_id != INVALID_OBJECT_ID { |
| objects.push(info.root_directory_object_id); |
| } |
| if info.graveyard_directory_object_id != INVALID_OBJECT_ID { |
| objects.push(info.graveyard_directory_object_id); |
| } |
| objects |
| } |
| |
| pub fn store_info(&self) -> StoreInfo { |
| self.store_info.lock().unwrap().info().unwrap().clone() |
| } |
| |
| /// Returns None if called during journal replay. |
| pub fn store_info_handle_object_id(&self) -> Option<u64> { |
| self.store_info_handle.get().map(|h| h.object_id()) |
| } |
| |
| /// Called when replay for a store has completed. |
| async fn on_replay_complete(&self) -> Result<(), Error> { |
| if self.parent_store.is_none() || self.store_info_handle.get().is_some() { |
| return Ok(()); |
| } |
| |
| let parent_store = self.parent_store.as_ref().unwrap(); |
| let handle = ObjectStore::open_object( |
| &parent_store, |
| self.store_object_id, |
| HandleOptions::default(), |
| None, |
| ) |
| .await?; |
| |
| let mut encrypted_mutations = EncryptedMutations::default(); |
| |
| let (object_tree_layer_object_ids, encrypted) = { |
| let mut info = if handle.get_size() > 0 { |
| let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?; |
| let mut cursor = std::io::Cursor::new(&serialized_info[..]); |
| let (store_info, _) = StoreInfo::deserialize_with_version(&mut cursor) |
| .context("Failed to deserialize StoreInfo")?; |
| if store_info.object_id_key.is_none() { |
| self.update_last_object_id(store_info.last_object_id); |
| } |
| store_info |
| } else { |
| // The store_info will be absent for a newly created and empty object store. |
| StoreInfo::default() |
| }; |
| |
| // Merge the replay information. |
| let mut store_info = self.store_info.lock().unwrap(); |
| |
| // The frontmost element of the replay information is the most recent so we must apply |
| // that last. |
| for replay_info in store_info.replay_info_mut().unwrap().iter_mut().rev() { |
| if replay_info.object_count_delta < 0 { |
| info.object_count = |
| info.object_count.saturating_sub(-replay_info.object_count_delta as u64); |
| } else { |
| info.object_count = |
| info.object_count.saturating_add(replay_info.object_count_delta as u64); |
| } |
| encrypted_mutations.extend(std::mem::take(&mut replay_info.encrypted_mutations))?; |
| } |
| |
| let result = ( |
| info.layers.clone(), |
| if info.mutations_key.is_some() { |
| Some(info.encrypted_mutations_object_id) |
| } else { |
| None |
| }, |
| ); |
| *store_info = StoreOrReplayInfo::Info(info); |
| result |
| }; |
| |
| if encrypted.is_some() { |
| *self.lock_state.lock().unwrap() = LockState::Locked(encrypted_mutations); |
| } else { |
| *self.lock_state.lock().unwrap() = LockState::Unencrypted; |
| } |
| |
| // TODO(fxbug.dev/95978): the layer size here could be bad and cause overflow. |
| |
| // If the store is encrypted, we can't open the object tree layers now, but we need to |
| // compute the size of the layers. |
| let total_size: u64 = if let Some(encrypted_mutations_object_id) = encrypted { |
| let mut size = 0; |
| let parent_store = self.parent_store.as_ref().unwrap(); |
| for oid in object_tree_layer_object_ids.into_iter() { |
| size += parent_store.get_file_size(oid).await?; |
| } |
| if encrypted_mutations_object_id != INVALID_OBJECT_ID { |
| size += layer_size_from_encrypted_mutations_size( |
| parent_store.get_file_size(encrypted_mutations_object_id).await?, |
| ); |
| } |
| size |
| } else { |
| let object_layers = self.open_layers(object_tree_layer_object_ids, None).await?; |
| let size: u64 = object_layers.iter().map(|h| h.get_size()).sum(); |
| self.tree.append_layers(object_layers.into()).await?; |
| *self.lock_state.lock().unwrap() = LockState::Unencrypted; |
| size |
| }; |
| |
| assert!(self.store_info_handle.set(handle).is_ok(), "Failed to set store_info_handle!"); |
| self.filesystem().object_manager().update_reservation( |
| self.store_object_id, |
| tree::reservation_amount_from_layer_size(total_size), |
| ); |
| |
| Ok(()) |
| } |
| |
| /// Record metrics for this ObjectStore under the given `store_name`. |
| /// Acts as a no-op if called multiple times. |
| pub fn record_metrics(&self, store_name: impl AsRef<str>) { |
| self.metrics.get_or_init(|| { |
| ObjectStoreMetrics::new(store_name, self.store_object_id, &self.store_info()) |
| }); |
| } |
| |
| async fn open_layers( |
| &self, |
| object_ids: impl std::iter::IntoIterator<Item = u64>, |
| crypt: Option<&dyn Crypt>, |
| ) -> Result<Vec<CachingObjectHandle<ObjectStore>>, Error> { |
| let parent_store = self.parent_store.as_ref().unwrap(); |
| let mut handles = Vec::new(); |
| for object_id in object_ids { |
| let handle = CachingObjectHandle::new( |
| ObjectStore::open_object(&parent_store, object_id, HandleOptions::default(), crypt) |
| .await |
| .context(format!("Failed to open layer file {}", object_id))?, |
| ); |
| handles.push(handle); |
| } |
| Ok(handles) |
| } |
| |
| /// Unlocks a store so that it is ready to be used. |
| /// This is not thread-safe. |
| pub async fn unlock(&self, crypt: Arc<dyn Crypt>) -> Result<(), Error> { |
| match &*self.lock_state.lock().unwrap() { |
| LockState::Locked(_) => {} |
| LockState::Invalid => bail!(FxfsError::Inconsistent), |
| LockState::Unencrypted => bail!(FxfsError::InvalidArgs), |
| LockState::Unlocked(_) => bail!(FxfsError::AlreadyBound), |
| LockState::Unknown => panic!("Store was unlocked before replay"), |
| } |
| |
| // We must lock flushing since that can modify store_info and the encrypted mutations file. |
| let keys = [LockKey::flush(self.store_object_id())]; |
| let fs = self.filesystem(); |
| let guard = debug_assert_not_too_long!(fs.write_lock(&keys)); |
| |
| let store_info = self.store_info(); |
| self.tree |
| .append_layers( |
| self.open_layers(store_info.layers.iter().cloned(), Some(crypt.as_ref())) |
| .await? |
| .into(), |
| ) |
| .await |
| .context("Failed to read object tree layer file contents")?; |
| |
| let unwrapped_key = crypt |
| .unwrap_key(store_info.mutations_key.as_ref().unwrap(), self.store_object_id) |
| .await |
| .context("Failed to unwrap mutations keys")?; |
| let mut mutations_cipher = |
| StreamCipher::new(&unwrapped_key, store_info.mutations_cipher_offset); |
| |
| let wrapped_key = store_info.object_id_key.as_ref().ok_or(FxfsError::Inconsistent)?; |
| let object_id_cipher = |
| Ff1::new(&crypt.unwrap_key(wrapped_key, self.store_object_id).await?); |
| { |
| let mut last_object_id = self.last_object_id.lock().unwrap(); |
| last_object_id.cipher = Some(object_id_cipher); |
| } |
| self.update_last_object_id(store_info.last_object_id); |
| |
| // Apply the encrypted mutations. |
| let mut mutations = { |
| if store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID { |
| EncryptedMutations::default() |
| } else { |
| let parent_store = self.parent_store.as_ref().unwrap(); |
| let handle = ObjectStore::open_object( |
| &parent_store, |
| store_info.encrypted_mutations_object_id, |
| HandleOptions::default(), |
| None, |
| ) |
| .await?; |
| let mut cursor = std::io::Cursor::new( |
| handle |
| .contents(MAX_ENCRYPTED_MUTATIONS_SIZE) |
| .await |
| .context(FxfsError::Inconsistent)?, |
| ); |
| let mut mutations = EncryptedMutations::deserialize_with_version(&mut cursor)?.0; |
| let len = cursor.get_ref().len() as u64; |
| while cursor.position() < len { |
| mutations |
| .extend(EncryptedMutations::deserialize_with_version(&mut cursor)?.0)?; |
| } |
| mutations |
| } |
| }; |
| |
| if let LockState::Locked(m) = |
| std::mem::replace(&mut *self.lock_state.lock().unwrap(), LockState::Invalid) |
| { |
| mutations.extend(m)?; |
| } else { |
| unreachable!(); |
| } |
| |
| let EncryptedMutations { transactions, mut data, mutations_key_roll } = mutations; |
| if let Some((offset, key)) = mutations_key_roll { |
| let (old, new) = data.split_at_mut(offset); |
| mutations_cipher.decrypt(old); |
| let unwrapped_key = crypt |
| .unwrap_key(&key, self.store_object_id) |
| .await |
| .context("Failed to unwrap mutations keys")?; |
| mutations_cipher = StreamCipher::new(&unwrapped_key, 0); |
| mutations_cipher.decrypt(new); |
| } else { |
| mutations_cipher.decrypt(&mut data); |
| } |
| |
| let mut cursor = std::io::Cursor::new(data); |
| for (checkpoint, count) in transactions { |
| let context = ApplyContext { mode: ApplyMode::Replay, checkpoint }; |
| for _ in 0..count { |
| self.apply_mutation( |
| Mutation::deserialize_from_version(&mut cursor, context.checkpoint.version)?, |
| &context, |
| AssocObj::None, |
| ) |
| .await; |
| } |
| } |
| |
| *self.mutations_cipher.lock().unwrap() = Some(mutations_cipher); |
| *self.lock_state.lock().unwrap() = LockState::Unlocked(crypt); |
| |
| // To avoid unbounded memory growth, we should flush the encrypted mutations now. Otherwise |
| // it's possible for more writes to be queued and for the store to be locked before we can |
| // flush anything and that can repeat. |
| std::mem::drop(guard); |
| self.flush_with_reason(flush::Reason::Unlock).await?; |
| |
| Ok(()) |
| } |
| |
| pub fn is_locked(&self) -> bool { |
| matches!(*self.lock_state.lock().unwrap(), LockState::Locked(_)) |
| } |
| |
| // Locks a store. This assumes no other concurrent access to the store. Whilst this can return |
| // an error, the store will be placed into an unusable but safe state (i.e. no lingering |
| // unencrypted data) if an error is encountered. |
| pub async fn lock(&self) -> Result<(), Error> { |
| assert!(self.lock_state.lock().unwrap().is_unlocked()); |
| |
| // We must flush because we want to discard unencrypted data and we can't easily replay |
| // again later if we try and unlock this store again. |
| let result = self.flush_with_reason(flush::Reason::Lock).await; |
| |
| *self.lock_state.lock().unwrap() = if result.is_err() { |
| // Seal the mutable layer so that we dump unencrypted data when we reset the immutable |
| // layers below. |
| self.tree.seal().await; |
| LockState::Invalid |
| } else { |
| // There should have been no concurrent access with the store so there should be nothing |
| // to flush. |
| assert!(!self.filesystem().object_manager().needs_flush(self.store_object_id)); |
| LockState::Locked(EncryptedMutations::default()) |
| }; |
| |
| self.tree.reset_immutable_layers(); |
| |
| result?; |
| Ok(()) |
| } |
| |
| // Returns INVALID_OBJECT_ID if the object ID cipher needs to be created or rolled. |
| fn maybe_get_next_object_id(&self) -> u64 { |
| let mut last_object_id = self.last_object_id.lock().unwrap(); |
| if last_object_id.should_create_cipher() { |
| INVALID_OBJECT_ID |
| } else { |
| last_object_id.get_next_object_id() |
| } |
| } |
| |
| // Returns a new object ID that can be used. This will create an object ID cipher if necessary. |
| pub async fn get_next_object_id(&self) -> Result<u64, Error> { |
| let object_id = self.maybe_get_next_object_id(); |
| if object_id != INVALID_OBJECT_ID { |
| return Ok(object_id); |
| } |
| |
| // Create a transaction (which has a lock) and then check again. |
| let mut transaction = self |
| .filesystem() |
| .new_transaction( |
| &[LockKey::object( |
| self.parent_store.as_ref().unwrap().store_object_id, |
| self.store_object_id, |
| )], |
| Options { |
| // We must skip journal checks because this transaction might be needed to |
| // compact. |
| skip_journal_checks: true, |
| borrow_metadata_space: true, |
| ..Default::default() |
| }, |
| ) |
| .await?; |
| |
| { |
| let mut last_object_id = self.last_object_id.lock().unwrap(); |
| if !last_object_id.should_create_cipher() { |
| // We lost a race. |
| return Ok(last_object_id.get_next_object_id()); |
| } |
| } |
| |
| // Create a key. |
| let (object_id_wrapped, object_id_unwrapped) = |
| self.crypt().unwrap().create_key(self.store_object_id, KeyPurpose::Metadata).await?; |
| |
| // Update StoreInfo. |
| let buf = { |
| let mut store_info = self.store_info.lock().unwrap(); |
| let mut store_info = store_info.info_mut().unwrap(); |
| store_info.object_id_key = Some(object_id_wrapped); |
| let mut serialized_info = Vec::new(); |
| store_info.serialize_with_version(&mut serialized_info)?; |
| let mut buf = self.device.allocate_buffer(serialized_info.len()); |
| buf.as_mut_slice().copy_from_slice(&serialized_info[..]); |
| buf |
| }; |
| |
| self.store_info_handle |
| .get() |
| .unwrap() |
| .txn_write(&mut transaction, 0u64, buf.as_ref()) |
| .await?; |
| transaction.commit().await?; |
| |
| let mut last_object_id = self.last_object_id.lock().unwrap(); |
| last_object_id.cipher = Some(Ff1::new(&object_id_unwrapped)); |
| last_object_id.id = (last_object_id.id + (1 << 32)) & OBJECT_ID_HI_MASK; |
| |
| Ok((last_object_id.id & OBJECT_ID_HI_MASK) |
| | last_object_id.cipher.as_ref().unwrap().encrypt(last_object_id.id as u32) as u64) |
| } |
| |
| fn allocator(&self) -> Arc<dyn Allocator> { |
| self.filesystem().allocator() |
| } |
| |
| // If |transaction| has an impending mutation for the underlying object, returns that. |
| // Otherwise, looks up the object from the tree and returns a suitable mutation for it. The |
| // mutation is returned here rather than the item because the mutation includes the operation |
| // which has significance: inserting an object implies it's the first of its kind unlike |
| // replacing an object. |
| async fn txn_get_object_mutation( |
| &self, |
| transaction: &Transaction<'_>, |
| object_id: u64, |
| ) -> Result<ObjectStoreMutation, Error> { |
| if let Some(mutation) = |
| transaction.get_object_mutation(self.store_object_id, ObjectKey::object(object_id)) |
| { |
| Ok(mutation.clone()) |
| } else { |
| Ok(ObjectStoreMutation { |
| item: self |
| .tree |
| .find(&ObjectKey::object(object_id)) |
| .await? |
| .ok_or(anyhow!(FxfsError::NotFound))?, |
| op: Operation::ReplaceOrInsert, |
| }) |
| } |
| } |
| |
| fn update_last_object_id(&self, mut object_id: u64) { |
| let mut last_object_id = self.last_object_id.lock().unwrap(); |
| // For encrypted stores, object_id will be encrypted here, so we must decrypt first. |
| if let Some(cipher) = &last_object_id.cipher { |
| // If the object ID cipher has been rolled, then it's possible we might see object IDs |
| // that were generated using a different cipher so the decrypt here will return the |
| // wrong value, but that won't matter because the hi part of the object ID should still |
| // discriminate. |
| object_id = object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64; |
| } |
| if object_id > last_object_id.id { |
| last_object_id.id = object_id; |
| } |
| } |
| |
| /// Adds the specified object to the graveyard. |
| fn add_to_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) { |
| let graveyard_id = self.graveyard_directory_object_id(); |
| assert_ne!(graveyard_id, INVALID_OBJECT_ID); |
| transaction.add( |
| self.store_object_id, |
| Mutation::replace_or_insert_object( |
| ObjectKey::graveyard_entry(graveyard_id, object_id), |
| ObjectValue::Some, |
| ), |
| ); |
| } |
| |
| /// Removes the specified object from the graveyard. |
| fn remove_from_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) { |
| transaction.add( |
| self.store_object_id, |
| Mutation::replace_or_insert_object( |
| ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id), |
| ObjectValue::None, |
| ), |
| ); |
| } |
| } |
| |
| #[async_trait] |
| impl JournalingObject for ObjectStore { |
| async fn apply_mutation( |
| &self, |
| mutation: Mutation, |
| context: &ApplyContext<'_, '_>, |
| _assoc_obj: AssocObj<'_>, |
| ) { |
| // It's not safe to fully open a store until replay is fully complete (because subsequent |
| // mutations could render current records invalid). |
| |
| match mutation { |
| Mutation::ObjectStore(ObjectStoreMutation { mut item, op }) => { |
| item.sequence = context.checkpoint.file_offset; |
| match op { |
| Operation::Insert => { |
| // If we are inserting an object record for the first time, it signifies the |
| // birth of the object so we need to adjust the object count. |
| if matches!(item.value, ObjectValue::Object { .. }) { |
| self.store_info.lock().unwrap().adjust_object_count(1); |
| if context.mode.is_replay() { |
| self.update_last_object_id(item.key.object_id); |
| } |
| } |
| self.tree.insert(item).await; |
| } |
| Operation::ReplaceOrInsert => self.tree.replace_or_insert(item).await, |
| Operation::Merge => { |
| if item.is_tombstone() { |
| self.store_info.lock().unwrap().adjust_object_count(-1); |
| } |
| let lower_bound = item.key.key_for_merge_into(); |
| self.tree.merge_into(item, &lower_bound).await; |
| } |
| } |
| } |
| Mutation::BeginFlush => { |
| self.tree.seal().await; |
| self.store_info.lock().unwrap().begin_flush(); |
| } |
| Mutation::EndFlush => { |
| if context.mode.is_replay() { |
| self.tree.reset_immutable_layers(); |
| self.store_info.lock().unwrap().end_flush(); |
| } |
| } |
| Mutation::EncryptedObjectStore(data) => { |
| self.store_info.lock().unwrap().push_encrypted_mutation(&context.checkpoint, data); |
| } |
| Mutation::UpdateMutationsKey(UpdateMutationsKey(key)) => { |
| self.store_info.lock().unwrap().set_mutations_key(key); |
| } |
| // TODO(fxbug.dev/95979): ideally, we'd return an error here instead. This should only |
| // be possible with a bad mutation during replay. |
| _ => panic!("unexpected mutation: {:?}", mutation), |
| } |
| } |
| |
| fn drop_mutation(&self, _mutation: Mutation, _transaction: &Transaction<'_>) {} |
| |
| /// Push all in-memory structures to the device. This is not necessary for sync since the |
| /// journal will take care of it. This is supposed to be called when there is either memory or |
| /// space pressure (flushing the store will persist in-memory data and allow the journal file to |
| /// be trimmed). |
| /// |
| /// Also returns the earliest version of a struct in the filesystem (when known). |
| async fn flush(&self) -> Result<Version, Error> { |
| return self.flush_with_reason(flush::Reason::Journal).await; |
| } |
| |
| fn encrypt_mutation(&self, mutation: &Mutation) -> Option<Mutation> { |
| match mutation { |
| // Encrypt all object store mutations. |
| Mutation::ObjectStore(_) => self.mutations_cipher.lock().unwrap().as_mut().map(|c| { |
| let mut buffer = Vec::new(); |
| mutation.serialize_into(&mut buffer).unwrap(); |
| c.encrypt(&mut buffer); |
| Mutation::EncryptedObjectStore(buffer.into()) |
| }), |
| _ => None, |
| } |
| } |
| } |
| |
| // TODO(fxbug.dev/95980): MemDataBuffer has size limits so we should check sizes before we use it. |
| impl HandleOwner for ObjectStore { |
| type Buffer = MemDataBuffer; |
| |
| fn create_data_buffer(&self, _object_id: u64, initial_size: u64) -> Self::Buffer { |
| MemDataBuffer::new(initial_size) |
| } |
| } |
| |
| impl AsRef<ObjectStore> for ObjectStore { |
| fn as_ref(&self) -> &ObjectStore { |
| self |
| } |
| } |
| |
| fn layer_size_from_encrypted_mutations_size(size: u64) -> u64 { |
| // This is similar to reserved_space_from_journal_usage. It needs to be a worst case estimate of |
| // the amount of metadata space that might need to be reserved to allow the encrypted mutations |
| // to be written to layer files. It needs to be >= than reservation_amount_from_layer_size will |
| // return once the data has been written to layer files and <= than |
| // reserved_space_from_journal_usage would use. We can't just use |
| // reserved_space_from_journal_usage because the encrypted mutations file includes some extra |
| // data (it includes the checkpoints) that isn't written in the same way to the journal. |
| size * 3 |
| } |
| |
| impl AssociatedObject for ObjectStore {} |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| super::OBJECT_ID_HI_MASK, |
| crate::{ |
| crypt::{insecure::InsecureCrypt, Crypt}, |
| errors::FxfsError, |
| filesystem::{ |
| Filesystem, FxFilesystem, JournalingObject, OpenFxFilesystem, SyncOptions, |
| }, |
| fsck::fsck, |
| lsm_tree::types::{Item, ItemRef, LayerIterator}, |
| object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle, INVALID_OBJECT_ID}, |
| object_store::{ |
| directory::Directory, |
| extent_record::{ExtentKey, ExtentValue}, |
| object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue}, |
| transaction::{Options, TransactionHandler}, |
| volume::root_volume, |
| HandleOptions, ObjectStore, |
| }, |
| }, |
| assert_matches::assert_matches, |
| fuchsia_async as fasync, |
| futures::join, |
| std::{ |
| ops::Bound, |
| sync::{Arc, Mutex}, |
| time::Duration, |
| }, |
| storage_device::{fake_device::FakeDevice, DeviceHolder}, |
| }; |
| |
| const TEST_DEVICE_BLOCK_SIZE: u32 = 512; |
| |
| async fn test_filesystem() -> OpenFxFilesystem { |
| let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE)); |
| FxFilesystem::new_empty(device).await.expect("new_empty failed") |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_item_sequences() { |
| let fs = test_filesystem().await; |
| let object1; |
| let object2; |
| let object3; |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| let store = fs.root_store(); |
| object1 = Arc::new( |
| ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None) |
| .await |
| .expect("create_object failed"), |
| ); |
| transaction.commit().await.expect("commit failed"); |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| object2 = Arc::new( |
| ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None) |
| .await |
| .expect("create_object failed"), |
| ); |
| transaction.commit().await.expect("commit failed"); |
| |
| fs.sync(SyncOptions::default()).await.expect("sync failed"); |
| |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| object3 = Arc::new( |
| ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None) |
| .await |
| .expect("create_object failed"), |
| ); |
| transaction.commit().await.expect("commit failed"); |
| |
| let layer_set = store.tree.layer_set(); |
| let mut merger = layer_set.merger(); |
| let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed"); |
| let mut sequences = [0u64; 3]; |
| while let Some(ItemRef { key: ObjectKey { object_id, .. }, sequence, .. }) = iter.get() { |
| if *object_id == object1.object_id() { |
| sequences[0] = sequence; |
| } else if *object_id == object2.object_id() { |
| sequences[1] = sequence; |
| } else if *object_id == object3.object_id() { |
| sequences[2] = sequence; |
| } |
| iter.advance().await.expect("advance failed"); |
| } |
| |
| assert!(sequences[0] <= sequences[1], "sequences: {:?}", sequences); |
| // The last item came after a sync, so should be strictly greater. |
| assert!(sequences[1] < sequences[2], "sequences: {:?}", sequences); |
| fs.close().await.expect("Close failed"); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_create_and_open_store() { |
| let fs = test_filesystem().await; |
| let store_id = { |
| let root_volume = root_volume(&fs).await.expect("root_volume failed"); |
| root_volume |
| .new_volume("test", Some(Arc::new(InsecureCrypt::new()))) |
| .await |
| .expect("new_volume failed") |
| .store_object_id() |
| }; |
| |
| fs.close().await.expect("close failed"); |
| let device = fs.take_device().await; |
| device.reopen(); |
| let fs = FxFilesystem::open(device).await.expect("open failed"); |
| |
| { |
| let store = fs.object_manager().store(store_id).expect("store not found"); |
| store.unlock(Arc::new(InsecureCrypt::new())).await.expect("unlock failed"); |
| } |
| fs.close().await.expect("Close failed"); |
| } |
| |
| #[fasync::run(10, test)] |
| async fn test_old_layers_are_purged() { |
| let fs = test_filesystem().await; |
| |
| let store = fs.root_store(); |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| let object = Arc::new( |
| ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None) |
| .await |
| .expect("create_object failed"), |
| ); |
| transaction.commit().await.expect("commit failed"); |
| |
| store.flush().await.expect("flush failed"); |
| |
| let mut buf = object.allocate_buffer(5); |
| buf.as_mut_slice().copy_from_slice(b"hello"); |
| object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed"); |
| |
| // Getting the layer-set should cause the flush to stall. |
| let layer_set = store.tree().layer_set(); |
| |
| let done = Mutex::new(false); |
| let mut object_id = 0; |
| |
| join!( |
| async { |
| store.flush().await.expect("flush failed"); |
| assert!(*done.lock().unwrap()); |
| }, |
| async { |
| // This is a halting problem so all we can do is sleep. |
| fasync::Timer::new(Duration::from_secs(1)).await; |
| *done.lock().unwrap() = true; |
| object_id = layer_set.layers.last().unwrap().handle().unwrap().object_id(); |
| std::mem::drop(layer_set); |
| } |
| ); |
| |
| if let Err(e) = ObjectStore::open_object( |
| &store.parent_store.as_ref().unwrap(), |
| object_id, |
| HandleOptions::default(), |
| store.crypt().as_deref(), |
| ) |
| .await |
| { |
| assert!(FxfsError::NotFound.matches(&e)); |
| } else { |
| panic!("open_object succeeded"); |
| } |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_tombstone_deletes_data() { |
| let fs = test_filesystem().await; |
| let root_store = fs.root_store(); |
| let child_id = { |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| let child = ObjectStore::create_object( |
| &root_store, |
| &mut transaction, |
| HandleOptions::default(), |
| None, |
| ) |
| .await |
| .expect("create_object failed"); |
| transaction.commit().await.expect("commit failed"); |
| |
| // Allocate an extent in the file. |
| let mut buffer = child.allocate_buffer(8192); |
| buffer.as_mut_slice().fill(0xaa); |
| child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed"); |
| |
| child.object_id() |
| }; |
| |
| root_store.tombstone(child_id, Options::default()).await.expect("tombstone failed"); |
| |
| let layers = root_store.tree.layer_set(); |
| let search_key = ExtentKey::new(0..8192).search_key(); |
| let mut merger = layers.merger(); |
| let mut iter = merger |
| .seek(Bound::Included(&ObjectKey::extent(child_id, 0, search_key.range))) |
| .await |
| .expect("seek failed"); |
| assert_matches!( |
| iter.get(), |
| Some(ItemRef { value: ObjectValue::Extent(ExtentValue::None), .. }) |
| ); |
| iter.advance().await.expect("advance failed"); |
| assert_matches!(iter.get(), None); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_major_compaction_discards_unnecessary_records() { |
| let fs = test_filesystem().await; |
| let root_store = fs.root_store(); |
| let child_id = { |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| let child = ObjectStore::create_object( |
| &root_store, |
| &mut transaction, |
| HandleOptions::default(), |
| None, |
| ) |
| .await |
| .expect("create_object failed"); |
| transaction.commit().await.expect("commit failed"); |
| |
| // Allocate an extent in the file. |
| let mut buffer = child.allocate_buffer(8192); |
| buffer.as_mut_slice().fill(0xaa); |
| child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed"); |
| |
| child.object_id() |
| }; |
| |
| let has_deleted_extent_records = |root_store: Arc<ObjectStore>, child_id| async move { |
| let layers = root_store.tree.layer_set(); |
| let search_key = ExtentKey::new(0..1).search_key(); |
| let mut merger = layers.merger(); |
| let mut iter = merger |
| .seek(Bound::Included(&ObjectKey::extent(child_id, 0, search_key.range))) |
| .await |
| .expect("seek failed"); |
| loop { |
| match iter.get() { |
| None => return false, |
| Some(ItemRef { |
| key: |
| ObjectKey { |
| object_id, |
| data: |
| ObjectKeyData::Attribute(0, AttributeKey::Extent(ExtentKey { .. })), |
| }, |
| value: ObjectValue::Extent(ExtentValue::None), |
| .. |
| }) if *object_id == child_id => return true, |
| _ => {} |
| } |
| iter.advance().await.expect("advance failed"); |
| } |
| }; |
| |
| root_store.tombstone(child_id, Options::default()).await.expect("tombstone failed"); |
| assert_matches!( |
| root_store.tree.find(&ObjectKey::object(child_id)).await.expect("find failed"), |
| Some(Item { value: ObjectValue::None, .. }) |
| ); |
| assert!(has_deleted_extent_records(root_store.clone(), child_id).await); |
| |
| root_store.flush().await.expect("flush failed"); |
| assert_matches!( |
| root_store.tree.find(&ObjectKey::object(child_id)).await.expect("find failed"), |
| None |
| ); |
| assert!(!has_deleted_extent_records(root_store.clone(), child_id).await); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_overlapping_extents_in_different_layers() { |
| let fs = test_filesystem().await; |
| let store = fs.root_store(); |
| let root_directory = |
| Directory::open(&store, store.root_directory_object_id()).await.expect("open failed"); |
| |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| let object = root_directory |
| .create_child_file(&mut transaction, "test") |
| .await |
| .expect("create_child_file failed"); |
| transaction.commit().await.expect("commit failed"); |
| |
| let buf = object.allocate_buffer(16384); |
| object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed"); |
| |
| store.flush().await.expect("flush failed"); |
| |
| object.write_or_append(Some(0), buf.subslice(0..4096)).await.expect("write failed"); |
| |
| // At this point, we should have an extent for 0..16384 in a layer that has been flushed, |
| // and an extent for 0..4096 that partially overwrites it. Writing to 0..16384 should |
| // overwrite both of those extents. |
| object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed"); |
| |
| fsck(&fs, None).await.expect("fsck failed"); |
| } |
| |
| #[fasync::run(10, test)] |
| async fn test_encrypted_mutations() { |
| async fn one_iteration( |
| fs: OpenFxFilesystem, |
| crypt: Arc<dyn Crypt>, |
| iteration: u64, |
| ) -> OpenFxFilesystem { |
| async fn reopen(fs: OpenFxFilesystem) -> OpenFxFilesystem { |
| fs.close().await.expect("Close failed"); |
| let device = fs.take_device().await; |
| device.reopen(); |
| FxFilesystem::open(device).await.expect("FS open failed") |
| } |
| |
| let fs = reopen(fs).await; |
| |
| let (store_object_id, object_id) = { |
| let root_volume = root_volume(&fs).await.expect("root_volume failed"); |
| let store = |
| root_volume.volume("test", Some(crypt.clone())).await.expect("volume failed"); |
| let root_directory = Directory::open(&store, store.root_directory_object_id()) |
| .await |
| .expect("open failed"); |
| |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| let object = root_directory |
| .create_child_file(&mut transaction, &format!("test {}", iteration)) |
| .await |
| .expect("create_child_file failed"); |
| transaction.commit().await.expect("commit failed"); |
| |
| let mut buf = object.allocate_buffer(1000); |
| for i in 0..buf.len() { |
| buf.as_mut_slice()[i] = i as u8; |
| } |
| object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed"); |
| |
| (store.store_object_id(), object.object_id()) |
| }; |
| |
| let fs = reopen(fs).await; |
| |
| let check_object = |fs| { |
| let crypt = crypt.clone(); |
| async move { |
| let root_volume = root_volume(&fs).await.expect("root_volume failed"); |
| let volume = |
| root_volume.volume("test", Some(crypt)).await.expect("volume failed"); |
| |
| let object = ObjectStore::open_object( |
| &volume, |
| object_id, |
| HandleOptions::default(), |
| None, |
| ) |
| .await |
| .expect("open_object failed"); |
| let mut buf = object.allocate_buffer(1000); |
| assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 1000); |
| for i in 0..buf.len() { |
| assert_eq!(buf.as_slice()[i], i as u8); |
| } |
| } |
| }; |
| |
| check_object(fs.clone()).await; |
| |
| let fs = reopen(fs).await; |
| |
| // At this point the "test" volume is locked. Before checking the object, flush the |
| // filesystem. This should leave a file with encrypted mutations. |
| fs.object_manager().flush().await.expect("flush failed"); |
| |
| assert_ne!( |
| fs.object_manager() |
| .store(store_object_id) |
| .unwrap() |
| .store_info() |
| .encrypted_mutations_object_id, |
| INVALID_OBJECT_ID |
| ); |
| |
| check_object(fs.clone()).await; |
| |
| // Checking the object should have triggered a flush and so now there should be no |
| // encrypted mutations object. |
| assert_eq!( |
| fs.object_manager() |
| .store(store_object_id) |
| .unwrap() |
| .store_info() |
| .encrypted_mutations_object_id, |
| INVALID_OBJECT_ID |
| ); |
| |
| let fs = reopen(fs).await; |
| |
| fsck(&fs, Some(crypt.clone())).await.expect("fsck failed"); |
| |
| let fs = reopen(fs).await; |
| |
| check_object(fs.clone()).await; |
| |
| fs |
| } |
| |
| let mut fs = test_filesystem().await; |
| let crypt = Arc::new(InsecureCrypt::new()); |
| |
| { |
| let root_volume = root_volume(&fs).await.expect("root_volume failed"); |
| let _store = root_volume |
| .new_volume("test", Some(crypt.clone())) |
| .await |
| .expect("new_volume failed"); |
| } |
| |
| // Run a few iterations so that we test changes with the stream cipher offset. |
| for i in 0..5 { |
| fs = one_iteration(fs, crypt.clone(), i).await; |
| } |
| } |
| |
| #[fasync::run(10, test)] |
| async fn test_object_id_cipher_roll() { |
| let fs = test_filesystem().await; |
| let crypt = Arc::new(InsecureCrypt::new()); |
| |
| { |
| let root_volume = root_volume(&fs).await.expect("root_volume failed"); |
| let store = root_volume |
| .new_volume("test", Some(crypt.clone())) |
| .await |
| .expect("new_volume failed"); |
| |
| let store_info = store.store_info(); |
| |
| // Hack the last object ID to force a roll of the object ID cipher. |
| { |
| let mut last_object_id = store.last_object_id.lock().unwrap(); |
| assert_eq!(last_object_id.id & OBJECT_ID_HI_MASK, 1u64 << 32); |
| last_object_id.id |= 0xffffffff; |
| } |
| |
| let root_directory = Directory::open(&store, store.root_directory_object_id()) |
| .await |
| .expect("open failed"); |
| |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| let object = root_directory |
| .create_child_file(&mut transaction, "test") |
| .await |
| .expect("create_child_file failed"); |
| transaction.commit().await.expect("commit failed"); |
| |
| assert_eq!(object.object_id() & OBJECT_ID_HI_MASK, 2u64 << 32); |
| |
| // Check that the key has been changed. |
| assert_ne!(store.store_info().object_id_key, store_info.object_id_key); |
| |
| assert_eq!(store.last_object_id.lock().unwrap().id, 2u64 << 32); |
| }; |
| |
| fs.close().await.expect("Close failed"); |
| let device = fs.take_device().await; |
| device.reopen(); |
| let fs = FxFilesystem::open(device).await.expect("open failed"); |
| let root_volume = root_volume(&fs).await.expect("root_volume failed"); |
| let store = root_volume.volume("test", Some(crypt.clone())).await.expect("volume failed"); |
| |
| assert_eq!(store.last_object_id.lock().unwrap().id, 2u64 << 32); |
| } |
| |
| #[fasync::run(10, test)] |
| async fn test_lock_store() { |
| let fs = test_filesystem().await; |
| let crypt = Arc::new(InsecureCrypt::new()); |
| |
| let root_volume = root_volume(&fs).await.expect("root_volume failed"); |
| let store = |
| root_volume.new_volume("test", Some(crypt.clone())).await.expect("new_volume failed"); |
| let root_directory = |
| Directory::open(&store, store.root_directory_object_id()).await.expect("open failed"); |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| root_directory |
| .create_child_file(&mut transaction, "test") |
| .await |
| .expect("create_child_file failed"); |
| transaction.commit().await.expect("commit failed"); |
| store.lock().await.expect("lock failed"); |
| |
| store.unlock(crypt).await.expect("unlock failed"); |
| root_directory.lookup("test").await.expect("lookup failed").expect("not found"); |
| } |
| } |