// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
pub mod allocator;
pub mod caching_object_handle;
mod data_object_handle;
pub mod directory;
mod extent_record;
mod flush;
pub mod graveyard;
pub mod journal;
mod key_manager;
mod merge;
pub mod object_manager;
pub mod object_record;
pub mod project_id;
mod store_object_handle;
pub mod transaction;
mod tree;
mod tree_cache;
pub mod volume;
pub use data_object_handle::{DataObjectHandle, DirectWriter, FsverityState, FsverityStateInner};
pub use directory::Directory;
pub use object_record::{ChildValue, ObjectDescriptor, PosixAttributes, Timestamp};
pub use store_object_handle::{
SetExtendedAttributeMode, StoreObjectHandle, EXTENDED_ATTRIBUTE_RANGE_END,
EXTENDED_ATTRIBUTE_RANGE_START,
};
use {
crate::{
errors::FxfsError,
filesystem::{
ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions, MAX_FILE_SIZE,
},
log::*,
lsm_tree::{
cache::{NullCache, ObjectCache},
types::{Item, ItemRef, LayerIterator},
LSMTree,
},
object_handle::{ObjectHandle, ReadObjectHandle, INVALID_OBJECT_ID},
object_store::{
allocator::Allocator,
graveyard::Graveyard,
journal::{JournalCheckpoint, JournaledTransaction},
key_manager::KeyManager,
transaction::{
lock_keys, AssocObj, AssociatedObject, LockKey, ObjectStoreMutation, Operation,
Options, Transaction,
},
},
range::RangeExt,
round::round_up,
serialized_types::{migrate_to_version, Migrate, Version, Versioned, VersionedLatest},
},
anyhow::{anyhow, bail, ensure, Context, Error},
assert_matches::assert_matches,
async_trait::async_trait,
fidl_fuchsia_io as fio,
fprint::TypeFingerprint,
fuchsia_inspect::ArrayProperty,
fxfs_crypto::{
ff1::Ff1, Crypt, KeyPurpose, StreamCipher, WrappedKey, WrappedKeyV32, WrappedKeys,
},
once_cell::sync::OnceCell,
scopeguard::ScopeGuard,
serde::{Deserialize, Serialize},
std::{
collections::VecDeque,
fmt,
ops::Bound,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex, OnceLock, Weak,
},
},
storage_device::Device,
uuid::Uuid,
};
pub use extent_record::{
ExtentKey, ExtentValue, BLOB_MERKLE_ATTRIBUTE_ID, DEFAULT_DATA_ATTRIBUTE_ID,
FSVERITY_MERKLE_ATTRIBUTE_ID,
};
pub use object_record::{
AttributeKey, EncryptionKeys, ExtendedAttributeValue, FsverityMetadata, ObjectAttributes,
ObjectKey, ObjectKeyData, ObjectKind, ObjectValue, ProjectProperty, RootDigest,
};
pub use transaction::Mutation;
// For encrypted stores, the lower 32 bits of the object ID are encrypted to make side-channel
// attacks more difficult. This mask can be used to extract the hi part of the object ID.
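// For example (see `LastObjectId::get_next_object_id` below), an encrypted ID is composed as
// `(id & OBJECT_ID_HI_MASK) | cipher.encrypt(id as u32) as u64`, and
// `object_id & OBJECT_ID_HI_MASK` recovers the high 32 bits.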
const OBJECT_ID_HI_MASK: u64 = 0xffffffff00000000;
// At time of writing, this threshold limits transactions that delete extents to about 10,000 bytes.
const TRANSACTION_MUTATION_THRESHOLD: usize = 200;
/// DataObjectHandle stores an owner that must implement this trait, which allows the handle to get
/// back to an ObjectStore.
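///
/// A minimal sketch of an owner type (hypothetical, for illustration only):
///
/// ```ignore
/// struct MyOwner {
///     store: Arc<ObjectStore>,
/// }
///
/// impl AsRef<ObjectStore> for MyOwner {
///     fn as_ref(&self) -> &ObjectStore {
///         &self.store
///     }
/// }
///
/// // `MyOwner` is Send + Sync + 'static, so the trait's bounds are satisfied.
/// impl HandleOwner for MyOwner {}
/// ```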
pub trait HandleOwner: AsRef<ObjectStore> + Send + Sync + 'static {}
/// StoreInfo stores information about the object store. This is stored within the parent object
/// store, and is used, for example, to get the persistent layer objects.
pub type StoreInfo = StoreInfoV36;
#[derive(Clone, Debug, Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV36 {
/// The globally unique identifier for the associated object store. If unset, will be all zero.
guid: [u8; 16],
/// The last used object ID. Note that this field is not accurate in memory; ObjectStore's
/// last_object_id field is the one to use in that case. Technically, this might not be the last
/// object ID used by the latest transaction that created an object, because we record it at the
/// point of creating the object, before the transaction is committed, and transactions can then
/// be committed in an arbitrary order (or not at all).
last_object_id: u64,
/// Object ids for layers. TODO(https://fxbug.dev/42178036): need a layer of indirection here so we can
/// support snapshots.
pub layers: Vec<u64>,
/// The object ID for the root directory.
root_directory_object_id: u64,
/// The object ID for the graveyard.
graveyard_directory_object_id: u64,
/// The number of live objects in the store.
object_count: u64,
/// The (wrapped) key that encrypted mutations should use.
mutations_key: Option<WrappedKeyV32>,
/// Mutations for the store are encrypted using a stream cipher. To decrypt the mutations, we
/// need to know the offset in the cipher stream to start it.
mutations_cipher_offset: u64,
/// If we have to flush the store whilst we do not have the key, we need to write the encrypted
/// mutations to an object. This is the object ID of that file if it exists.
pub encrypted_mutations_object_id: u64,
/// Object IDs are encrypted to reduce the amount of information that sequential object IDs
/// reveal (such as the number of files in the system and the ordering of their creation in
/// time). Only the bottom 32 bits of the object ID are encrypted; the top 32 bits increment
/// after 2^32 object IDs have been used, which allows us to roll the key.
object_id_key: Option<WrappedKeyV32>,
/// A directory for storing internal files in a directory structure. Holds INVALID_OBJECT_ID
/// when the directory doesn't yet exist.
internal_directory_object_id: u64,
}
#[derive(Clone, Debug, Default, Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV36)]
pub struct StoreInfoV32 {
guid: [u8; 16],
last_object_id: u64,
pub layers: Vec<u64>,
root_directory_object_id: u64,
graveyard_directory_object_id: u64,
object_count: u64,
mutations_key: Option<WrappedKeyV32>,
mutations_cipher_offset: u64,
pub encrypted_mutations_object_id: u64,
object_id_key: Option<WrappedKeyV32>,
}
impl StoreInfo {
/// Create a new/default [`StoreInfo`] but with a newly generated GUID.
fn new_with_guid() -> Self {
let guid = Uuid::new_v4();
Self { guid: *guid.as_bytes(), ..Default::default() }
}
/// Returns the parent objects for this store.
pub fn parent_objects(&self) -> Vec<u64> {
// We should not include the ID of the store itself, since that should be referred to in the
// volume directory.
let mut objects = self.layers.to_vec();
if self.encrypted_mutations_object_id != INVALID_OBJECT_ID {
objects.push(self.encrypted_mutations_object_id);
}
objects
}
}
// TODO(https://fxbug.dev/42178037): We should test or put checks in place to ensure this limit isn't exceeded.
// It will likely involve placing limits on the maximum number of layers.
pub const MAX_STORE_INFO_SERIALIZED_SIZE: usize = 131072;
// This needs to be large enough to accommodate the maximum amount of unflushed data (data that is
// in the journal but hasn't yet been written to layer files) for a store. We set a limit because
// we want to limit the amount of memory use in the case the filesystem is corrupt or under attack.
pub const MAX_ENCRYPTED_MUTATIONS_SIZE: usize = 8 * journal::DEFAULT_RECLAIM_SIZE as usize;
#[derive(Default)]
pub struct HandleOptions {
/// If true, transactions used by this handle will skip journal space checks.
pub skip_journal_checks: bool,
/// If true, data written to any attribute of this handle will not have per-block checksums
/// computed.
pub skip_checksums: bool,
}
#[derive(Default)]
pub struct NewChildStoreOptions {
/// The store is unencrypted if `crypt` is None.
pub crypt: Option<Arc<dyn Crypt>>,
/// Specifies the object ID in the root store to be used for the store. If set to
/// INVALID_OBJECT_ID (the default and typical case), a suitable ID will be chosen.
pub object_id: u64,
}
pub type EncryptedMutations = EncryptedMutationsV32;
#[derive(Clone, Default, Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV32 {
// Information about the mutations is held here, but the actual encrypted data is held within
// `data`. For each transaction, we record the checkpoint and the count of mutations within the
// transaction. The checkpoint is required for the log file offset (which we need to apply the
// mutations), and the version so that we can correctly decode the mutation after it has been
// decrypted. The count specifies the number of serialized mutations encoded in |data|.
transactions: Vec<(JournalCheckpoint, u64)>,
// The encrypted mutations.
data: Vec<u8>,
// If the mutations key was rolled, this holds the offset in `data` where the new key should
// apply.
mutations_key_roll: Vec<(usize, WrappedKey)>,
}
impl std::fmt::Debug for EncryptedMutations {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("EncryptedMutations")
.field("transactions", &self.transactions)
.field("len", &self.data.len())
.field(
"mutations_key_roll",
&self.mutations_key_roll.iter().map(|k| k.0).collect::<Vec<usize>>(),
)
.finish()
}
}
impl Versioned for EncryptedMutations {
fn max_serialized_size() -> u64 {
MAX_ENCRYPTED_MUTATIONS_SIZE as u64
}
}
impl EncryptedMutations {
fn from_replayed_mutations(
store_object_id: u64,
transactions: Vec<JournaledTransaction>,
) -> Self {
let mut this = Self::default();
for JournaledTransaction { checkpoint, mutations, .. } in transactions {
for (object_id, mutation) in mutations {
if store_object_id == object_id {
if let Mutation::EncryptedObjectStore(data) = mutation {
this.push(&checkpoint, data);
} else if let Mutation::UpdateMutationsKey(key) = mutation {
this.mutations_key_roll.push((this.data.len(), key.into()));
}
}
}
}
this
}
fn extend(&mut self, other: &EncryptedMutations) {
self.transactions.extend_from_slice(&other.transactions[..]);
self.mutations_key_roll.extend(
other
.mutations_key_roll
.iter()
.map(|(offset, key)| (offset + self.data.len(), key.clone())),
);
self.data.extend_from_slice(&other.data[..]);
}
fn push(&mut self, checkpoint: &JournalCheckpoint, data: Box<[u8]>) {
self.data.append(&mut data.into());
// If the checkpoint is the same as the last mutation we pushed, increment the count.
if let Some((last_checkpoint, count)) = self.transactions.last_mut() {
if last_checkpoint.file_offset == checkpoint.file_offset {
*count += 1;
return;
}
}
self.transactions.push((checkpoint.clone(), 1));
}
}
// Whilst we are replaying the store, we need to keep track of changes to StoreInfo that arise from
// mutations in the journal stream that don't include all the fields in StoreInfo. After replay has
// finished, we load the full store information and merge it with the deltas here.
// NOTE: While changing this struct, make sure to also update fsck::Fsck::check_child_store if
// needed, which currently doesn't bother replaying this information.
#[derive(Debug, Default)]
struct ReplayInfo {
object_count_delta: i64,
internal_directory_object_id: Option<u64>,
}
impl ReplayInfo {
fn new() -> VecDeque<ReplayInfo> {
let mut info = VecDeque::new();
info.push_back(ReplayInfo::default());
info
}
}
#[derive(Debug)]
enum StoreOrReplayInfo {
Info(StoreInfo),
// When we flush a store, we take a snapshot of store information when we begin flushing, and
// that snapshot gets committed when we end flushing. In the intervening period, we need to
// record any changes made to store information. If during replay we don't get around to ending
// the flush, we need to hang on to the deltas that were applied before we started flushing.
// This is why the information is stored in a VecDeque. The frontmost element is always the
// most recent.
Replay(VecDeque<ReplayInfo>),
// Used when a store is locked.
Locked,
}
impl StoreOrReplayInfo {
fn info(&self) -> Option<&StoreInfo> {
match self {
StoreOrReplayInfo::Info(info) => Some(info),
_ => None,
}
}
fn info_mut(&mut self) -> Option<&mut StoreInfo> {
match self {
StoreOrReplayInfo::Info(info) => Some(info),
_ => None,
}
}
fn replay_info_mut(&mut self) -> Option<&mut VecDeque<ReplayInfo>> {
match self {
StoreOrReplayInfo::Replay(info) => Some(info),
_ => None,
}
}
fn adjust_object_count(&mut self, delta: i64) {
match self {
StoreOrReplayInfo::Info(StoreInfo { object_count, .. }) => {
if delta < 0 {
*object_count = object_count.saturating_sub(-delta as u64);
} else {
*object_count = object_count.saturating_add(delta as u64);
}
}
StoreOrReplayInfo::Replay(replay_info) => {
replay_info.front_mut().unwrap().object_count_delta += delta;
}
StoreOrReplayInfo::Locked => unreachable!(),
}
}
fn set_internal_dir(&mut self, dir_id: u64) {
match self {
StoreOrReplayInfo::Info(StoreInfo { internal_directory_object_id, .. }) => {
*internal_directory_object_id = dir_id;
}
StoreOrReplayInfo::Replay(replay_info) => {
replay_info.front_mut().unwrap().internal_directory_object_id = Some(dir_id);
}
StoreOrReplayInfo::Locked => unreachable!(),
}
}
fn begin_flush(&mut self) {
if let StoreOrReplayInfo::Replay(replay_info) = self {
// Push a new record on the front keeping the old one. We'll remove the old one when
// `end_flush` is called.
replay_info.push_front(ReplayInfo::default());
}
}
fn end_flush(&mut self) {
if let StoreOrReplayInfo::Replay(replay_info) = self {
replay_info.truncate(1);
}
}
}
pub enum LockState {
Locked,
Unencrypted,
Unlocked(Arc<dyn Crypt>),
// The store is unlocked, but in a read-only state, and no flushes or other operations will be
// performed on the store.
UnlockedReadOnly(Arc<dyn Crypt>),
// The store is encrypted but is now in an unusable state (due to a failure to sync the journal
// after locking the store). The store cannot be unlocked.
Invalid,
// Before we've read the StoreInfo we might not know whether the store is Locked or Unencrypted.
// This can happen when lazily opening stores (ObjectManager::lazy_open_store).
Unknown,
// The store is in the process of being locked. Whilst the store is being locked, the store
// isn't usable; assertions will trip if any mutations are applied.
Locking,
// Whilst we're unlocking, we will replay encrypted mutations. The store isn't usable until
// it's in the Unlocked state.
Unlocking,
}
impl fmt::Debug for LockState {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.write_str(match self {
LockState::Locked => "Locked",
LockState::Unencrypted => "Unencrypted",
LockState::Unlocked(_) => "Unlocked",
LockState::UnlockedReadOnly(..) => "UnlockedReadOnly",
LockState::Invalid => "Invalid",
LockState::Unknown => "Unknown",
LockState::Locking => "Locking",
LockState::Unlocking => "Unlocking",
})
}
}
#[derive(Default)]
struct LastObjectId {
// The *unencrypted* value of the last object ID.
id: u64,
// Encrypted stores will use a cipher to obfuscate the object ID.
cipher: Option<Ff1>,
}
impl LastObjectId {
// Returns true if a cipher is needed to generate new object IDs.
fn should_create_cipher(&self) -> bool {
self.id as u32 == u32::MAX
}
fn get_next_object_id(&mut self) -> u64 {
self.id += 1;
if let Some(cipher) = &self.cipher {
let hi = self.id & OBJECT_ID_HI_MASK;
assert_ne!(hi, INVALID_OBJECT_ID);
assert_ne!(self.id as u32, 0); // This would indicate the ID wrapped.
hi | cipher.encrypt(self.id as u32) as u64
} else {
self.id
}
}
}
/// An object store supports a file-like interface for objects. Objects are keyed by a 64-bit
/// identifier. An object store has to be backed by a parent object store (which stores metadata
/// for the object store). The top-level object store (a.k.a. the root parent object store) is
/// in-memory only.
pub struct ObjectStore {
parent_store: Option<Arc<ObjectStore>>,
store_object_id: u64,
device: Arc<dyn Device>,
block_size: u64,
filesystem: Weak<FxFilesystem>,
// Lock ordering: This must be taken before `lock_state`.
store_info: Mutex<StoreOrReplayInfo>,
tree: LSMTree<ObjectKey, ObjectValue>,
// When replaying the journal, the store cannot read StoreInfo until the whole journal
// has been replayed, so during that time, store_info_handle will be None and records
// just get sent to the tree. Once the journal has been replayed, we can open the store
// and load all the other layer information.
store_info_handle: OnceCell<DataObjectHandle<ObjectStore>>,
// The cipher to use for encrypted mutations, if this store is encrypted.
mutations_cipher: Mutex<Option<StreamCipher>>,
// Current lock state of the store.
// Lock ordering: This must be taken after `store_info`.
lock_state: Mutex<LockState>,
key_manager: KeyManager,
// Enable/disable tracing.
trace: AtomicBool,
// Informational counters for events occurring within the store.
counters: Mutex<ObjectStoreCounters>,
// These are updated in performance-sensitive code paths, so we use atomics instead of the
// `counters` mutex.
device_read_ops: AtomicU64,
device_write_ops: AtomicU64,
logical_read_ops: AtomicU64,
logical_write_ops: AtomicU64,
// Contains the last object ID and, optionally, a cipher to be used when generating new object
// IDs.
last_object_id: Mutex<LastObjectId>,
// An optional callback to be invoked each time the ObjectStore flushes. The callback is
// invoked at the end of flush, while the write lock is still held.
flush_callback: OnceLock<Box<dyn Fn(&ObjectStore) + Send + Sync + 'static>>,
}
#[derive(Clone, Default)]
struct ObjectStoreCounters {
mutations_applied: u64,
mutations_dropped: u64,
num_flushes: u64,
last_flush_time: Option<std::time::SystemTime>,
persistent_layer_file_sizes: Vec<u64>,
}
impl ObjectStore {
fn new(
parent_store: Option<Arc<ObjectStore>>,
store_object_id: u64,
filesystem: Arc<FxFilesystem>,
store_info: Option<StoreInfo>,
object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
mutations_cipher: Option<StreamCipher>,
lock_state: LockState,
last_object_id: LastObjectId,
) -> Arc<ObjectStore> {
let device = filesystem.device();
let block_size = filesystem.block_size();
Arc::new(ObjectStore {
parent_store,
store_object_id,
device,
block_size,
filesystem: Arc::downgrade(&filesystem),
store_info: Mutex::new(match store_info {
Some(info) => StoreOrReplayInfo::Info(info),
None => StoreOrReplayInfo::Replay(ReplayInfo::new()),
}),
tree: LSMTree::new(merge::merge, object_cache),
store_info_handle: OnceCell::new(),
mutations_cipher: Mutex::new(mutations_cipher),
lock_state: Mutex::new(lock_state),
key_manager: KeyManager::new(),
trace: AtomicBool::new(false),
counters: Mutex::new(ObjectStoreCounters::default()),
device_read_ops: AtomicU64::new(0),
device_write_ops: AtomicU64::new(0),
logical_read_ops: AtomicU64::new(0),
logical_write_ops: AtomicU64::new(0),
last_object_id: Mutex::new(last_object_id),
flush_callback: OnceLock::new(),
})
}
fn new_empty(
parent_store: Option<Arc<ObjectStore>>,
store_object_id: u64,
filesystem: Arc<FxFilesystem>,
object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
) -> Arc<Self> {
Self::new(
parent_store,
store_object_id,
filesystem,
Some(StoreInfo::default()),
object_cache,
None,
LockState::Unencrypted,
LastObjectId::default(),
)
}
/// Cycle breaker constructor that returns an ObjectStore without a filesystem.
/// This should only be used from super block code.
pub fn new_root_parent(device: Arc<dyn Device>, block_size: u64, store_object_id: u64) -> Self {
ObjectStore {
parent_store: None,
store_object_id,
device,
block_size,
filesystem: Weak::<FxFilesystem>::new(),
store_info: Mutex::new(StoreOrReplayInfo::Info(StoreInfo::default())),
tree: LSMTree::new(merge::merge, Box::new(NullCache {})),
store_info_handle: OnceCell::new(),
mutations_cipher: Mutex::new(None),
lock_state: Mutex::new(LockState::Unencrypted),
key_manager: KeyManager::new(),
trace: AtomicBool::new(false),
counters: Mutex::new(ObjectStoreCounters::default()),
device_read_ops: AtomicU64::new(0),
device_write_ops: AtomicU64::new(0),
logical_read_ops: AtomicU64::new(0),
logical_write_ops: AtomicU64::new(0),
last_object_id: Mutex::new(LastObjectId::default()),
flush_callback: OnceLock::new(),
}
}
/// Used to set filesystem on root_parent stores at bootstrap time after the filesystem has
/// been created.
pub fn attach_filesystem(mut this: ObjectStore, filesystem: Arc<FxFilesystem>) -> ObjectStore {
this.filesystem = Arc::downgrade(&filesystem);
this
}
/// Create a child store. It is a multi-step process:
///
/// 1. Call `ObjectStore::new_child_store`.
/// 2. Register the store with the object-manager.
/// 3. Call `ObjectStore::create` to write the store-info.
///
/// If the procedure fails, care must be taken to unregister the store with the object-manager.
///
/// The steps have to be separate because of lifetime issues when working with a transaction.
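///
/// A rough sketch of the sequence (illustrative only, not a doctest; the transaction, cache and
/// error handling are assumed to exist, and the object-manager registration call is elided):
///
/// ```ignore
/// // Step 1: create the store object.
/// let store = root_store
///     .new_child_store(&mut transaction, NewChildStoreOptions::default(), Box::new(NullCache {}))
///     .await?;
/// // Step 2: register `store` with the object-manager (not shown).
/// // Step 3: write the store-info; this also creates the root and graveyard directories.
/// store.create(&mut transaction).await?;
/// transaction.commit().await?;
/// ```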
async fn new_child_store(
self: &Arc<Self>,
transaction: &mut Transaction<'_>,
options: NewChildStoreOptions,
object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
) -> Result<Arc<Self>, Error> {
let handle = if options.object_id != INVALID_OBJECT_ID {
ObjectStore::create_object_with_id(
self,
transaction,
options.object_id,
HandleOptions::default(),
None,
None,
)
.await?
} else {
ObjectStore::create_object(self, transaction, HandleOptions::default(), None, None)
.await?
};
let filesystem = self.filesystem();
let store = if let Some(crypt) = options.crypt {
let (wrapped_key, unwrapped_key) =
crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
let (object_id_wrapped, object_id_unwrapped) =
crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
Self::new(
Some(self.clone()),
handle.object_id(),
filesystem.clone(),
Some(StoreInfo {
mutations_key: Some(wrapped_key),
object_id_key: Some(object_id_wrapped),
..StoreInfo::new_with_guid()
}),
object_cache,
Some(StreamCipher::new(&unwrapped_key, 0)),
LockState::Unlocked(crypt),
LastObjectId {
// We need to avoid accidentally getting INVALID_OBJECT_ID, so we set
// the top 32 bits to a non-zero value.
id: 1 << 32,
cipher: Some(Ff1::new(&object_id_unwrapped)),
},
)
} else {
Self::new(
Some(self.clone()),
handle.object_id(),
filesystem.clone(),
Some(StoreInfo::new_with_guid()),
object_cache,
None,
LockState::Unencrypted,
LastObjectId::default(),
)
};
assert!(store.store_info_handle.set(handle).is_ok());
Ok(store)
}
/// Actually creates the store in a transaction. This will also create a root directory and
/// graveyard directory for the store. See `new_child_store` above.
async fn create<'a>(
self: &'a Arc<Self>,
transaction: &mut Transaction<'a>,
) -> Result<(), Error> {
let buf = {
// Create a root directory and graveyard directory.
let graveyard_directory_object_id = Graveyard::create(transaction, &self);
let root_directory = Directory::create(transaction, &self, Default::default()).await?;
let serialized_info = {
let mut store_info = self.store_info.lock().unwrap();
let store_info = store_info.info_mut().unwrap();
store_info.graveyard_directory_object_id = graveyard_directory_object_id;
store_info.root_directory_object_id = root_directory.object_id();
let mut serialized_info = Vec::new();
store_info.serialize_with_version(&mut serialized_info)?;
serialized_info
};
let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
buf
};
self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
}
pub fn set_trace(&self, trace: bool) {
let old_value = self.trace.swap(trace, Ordering::Relaxed);
if trace != old_value {
info!(store_id = self.store_object_id(), trace, "OS: trace");
}
}
/// Sets a callback to be invoked each time the ObjectStore flushes. The callback is invoked at
/// the end of flush, while the write lock is still held.
/// Note that this can only be set once per ObjectStore; repeated calls will panic.
pub fn set_flush_callback<F: Fn(&ObjectStore) + Send + Sync + 'static>(&self, callback: F) {
self.flush_callback.set(Box::new(callback)).ok().unwrap();
}
pub fn is_root(&self) -> bool {
if let Some(parent) = &self.parent_store {
parent.parent_store.is_none()
} else {
// The root parent store isn't the root store.
false
}
}
/// Populates an inspect node with store statistics.
pub fn record_data(self: &Arc<Self>, root: &fuchsia_inspect::Node) {
// TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
let counters = self.counters.lock().unwrap();
if let Some(store_info) = self.store_info() {
root.record_string("guid", Uuid::from_bytes(store_info.guid).to_string());
} else {
warn!("Can't access store_info; store is locked.");
};
root.record_uint("store_object_id", self.store_object_id);
root.record_uint("mutations_applied", counters.mutations_applied);
root.record_uint("mutations_dropped", counters.mutations_dropped);
root.record_uint("num_flushes", counters.num_flushes);
if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
root.record_uint(
"last_flush_time_ms",
last_flush_time
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or(std::time::Duration::ZERO)
.as_millis()
.try_into()
.unwrap_or(0u64),
);
}
let sizes = root.create_uint_array(
"persistent_layer_file_sizes",
counters.persistent_layer_file_sizes.len(),
);
for i in 0..counters.persistent_layer_file_sizes.len() {
sizes.set(i, counters.persistent_layer_file_sizes[i]);
}
root.record_uint("device_read_ops", self.device_read_ops.load(Ordering::Relaxed));
root.record_uint("device_write_ops", self.device_write_ops.load(Ordering::Relaxed));
root.record_uint("logical_read_ops", self.logical_read_ops.load(Ordering::Relaxed));
root.record_uint("logical_write_ops", self.logical_write_ops.load(Ordering::Relaxed));
root.record(sizes);
}
pub fn device(&self) -> &Arc<dyn Device> {
&self.device
}
pub fn block_size(&self) -> u64 {
self.block_size
}
pub fn filesystem(&self) -> Arc<FxFilesystem> {
self.filesystem.upgrade().unwrap()
}
pub fn store_object_id(&self) -> u64 {
self.store_object_id
}
pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> {
&self.tree
}
pub fn root_directory_object_id(&self) -> u64 {
self.store_info.lock().unwrap().info().unwrap().root_directory_object_id
}
pub fn graveyard_directory_object_id(&self) -> u64 {
self.store_info.lock().unwrap().info().unwrap().graveyard_directory_object_id
}
fn set_graveyard_directory_object_id(&self, oid: u64) {
assert_eq!(
std::mem::replace(
&mut self
.store_info
.lock()
.unwrap()
.info_mut()
.unwrap()
.graveyard_directory_object_id,
oid
),
INVALID_OBJECT_ID
);
}
pub fn object_count(&self) -> u64 {
self.store_info.lock().unwrap().info().unwrap().object_count
}
/// Returns the crypt object for the store. Returns None if the store is unencrypted. This will
/// panic if the store is locked.
pub fn crypt(&self) -> Option<Arc<dyn Crypt>> {
match &*self.lock_state.lock().unwrap() {
LockState::Locked => panic!("Store is locked"),
LockState::Invalid
| LockState::Unencrypted
| LockState::Locking
| LockState::Unlocking => None,
LockState::Unlocked(crypt) => Some(crypt.clone()),
LockState::UnlockedReadOnly(crypt) => Some(crypt.clone()),
LockState::Unknown => {
panic!("Store is of unknown lock state; has the journal been replayed yet?")
}
}
}
pub async fn get_or_create_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
// Create the transaction first to use the object store lock.
let mut transaction = self
.filesystem()
.new_transaction(
lock_keys![LockKey::object(
self.parent_store.as_ref().unwrap().store_object_id,
self.store_object_id,
)],
Options::default(),
)
.await?;
let obj_id = self.store_info.lock().unwrap().info().unwrap().internal_directory_object_id;
if obj_id != INVALID_OBJECT_ID {
return Ok(obj_id);
}
// Need to create an internal directory.
let directory = Directory::create(&mut transaction, self, Default::default()).await?;
transaction.add(self.store_object_id, Mutation::CreateInternalDir(directory.object_id()));
transaction.commit().await?;
Ok(directory.object_id())
}
/// Returns the file size for the object without opening the object.
async fn get_file_size(&self, object_id: u64) -> Result<u64, Error> {
let item = self
.tree
.find(&ObjectKey::attribute(
object_id,
DEFAULT_DATA_ATTRIBUTE_ID,
AttributeKey::Attribute,
))
.await?
.ok_or(FxfsError::NotFound)?;
if let ObjectValue::Attribute { size } = item.value {
Ok(size)
} else {
bail!(FxfsError::NotFile);
}
}
/// `crypt` can be provided if the crypt service should be different to the default; see the
/// comment on create_object. Users should avoid having more than one handle open for the same
/// object at the same time because they might get out-of-sync; there is no code that will
/// prevent this. One example where this can cause an issue is if the object ends up using a
/// permanent key (which is the case if a value is passed for `crypt`), the permanent key is
/// dropped when a handle is dropped, which will impact any other handles for the same object.
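///
/// A minimal usage sketch (illustrative only, not a doctest; `owner` and `object_id` are assumed
/// to exist, and passing `None` for `crypt` uses the store's own crypt service):
///
/// ```ignore
/// let handle =
///     ObjectStore::open_object(&owner, object_id, HandleOptions::default(), None).await?;
/// ```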
pub async fn open_object<S: HandleOwner>(
owner: &Arc<S>,
object_id: u64,
options: HandleOptions,
crypt: Option<Arc<dyn Crypt>>,
) -> Result<DataObjectHandle<S>, Error> {
let store = owner.as_ref().as_ref();
let mut fsverity_descriptor = None;
let item = store
.tree
.find(&ObjectKey::attribute(
object_id,
DEFAULT_DATA_ATTRIBUTE_ID,
AttributeKey::Attribute,
))
.await?
.ok_or(FxfsError::NotFound)?;
let size = match item.value {
ObjectValue::Attribute { size } => size,
ObjectValue::VerifiedAttribute { size, fsverity_metadata } => {
fsverity_descriptor = Some(fsverity_metadata);
size
}
_ => bail!(anyhow!(FxfsError::Inconsistent).context("open_object: Expected attribute")),
};
ensure!(size <= MAX_FILE_SIZE, FxfsError::Inconsistent);
// If a crypt service has been specified, it needs to be a permanent key because cached
// keys can only use the store's crypt service.
let permanent = if let Some(crypt) = crypt {
store
.key_manager
.get_or_insert(
object_id,
crypt,
store.get_keys(object_id),
/* permanent: */ true,
)
.await?;
true
} else {
false
};
let data_object_handle = DataObjectHandle::new(
owner.clone(),
object_id,
permanent,
DEFAULT_DATA_ATTRIBUTE_ID,
size,
FsverityState::None,
options,
false,
);
if let Some(descriptor) = fsverity_descriptor {
match data_object_handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await? {
None => {
return Err(anyhow!(FxfsError::NotFound));
}
Some(data) => {
data_object_handle.set_fsverity_state_some(descriptor, data);
}
}
}
Ok(data_object_handle)
}
// See the comment on create_object for the semantics of the `crypt` argument. If object_id ==
// INVALID_OBJECT_ID (which should usually be the case), an object ID will be chosen.
async fn create_object_with_id<S: HandleOwner>(
owner: &Arc<S>,
transaction: &mut Transaction<'_>,
mut object_id: u64,
options: HandleOptions,
mut crypt: Option<&dyn Crypt>,
create_attributes: Option<&fio::MutableNodeAttributes>,
) -> Result<DataObjectHandle<S>, Error> {
let store = owner.as_ref().as_ref();
if object_id == INVALID_OBJECT_ID {
object_id = store.get_next_object_id(transaction).await?;
} else {
store.update_last_object_id(object_id);
}
let store_crypt;
// See comment for equivalent in open_object.
let permanent = crypt.is_some();
if crypt.is_none() {
store_crypt = store.crypt();
crypt = store_crypt.as_deref();
}
let now = Timestamp::now();
let creation_time = create_attributes
.and_then(|a| a.creation_time)
.map(Timestamp::from_nanos)
.unwrap_or_else(|| now.clone());
let modification_time = create_attributes
.and_then(|a| a.modification_time)
.map(Timestamp::from_nanos)
.unwrap_or_else(|| now.clone());
let access_time = create_attributes
.and_then(|a| a.access_time)
.map(Timestamp::from_nanos)
.unwrap_or_else(|| now.clone());
let change_time = now;
let posix_attributes = create_attributes.and_then(|a| {
(a.mode.is_some() || a.uid.is_some() || a.gid.is_some() || a.rdev.is_some()).then_some(
PosixAttributes {
mode: a.mode.unwrap_or_default(),
uid: a.uid.unwrap_or_default(),
gid: a.gid.unwrap_or_default(),
rdev: a.rdev.unwrap_or_default(),
},
)
});
transaction.add(
store.store_object_id(),
Mutation::insert_object(
ObjectKey::object(object_id),
ObjectValue::file(
1,
0,
creation_time,
modification_time,
access_time,
change_time,
0,
posix_attributes,
),
),
);
if let Some(crypt) = crypt {
let (key, unwrapped_key) = crypt.create_key(object_id, KeyPurpose::Data).await?;
transaction.add(
store.store_object_id(),
Mutation::insert_object(
ObjectKey::keys(object_id),
ObjectValue::keys(EncryptionKeys::AES256XTS(WrappedKeys::from(vec![(0, key)]))),
),
);
store.key_manager.insert(object_id, &vec![(0, unwrapped_key)], permanent);
}
transaction.add(
store.store_object_id(),
Mutation::insert_object(
ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute),
ObjectValue::attribute(0),
),
);
Ok(DataObjectHandle::new(
owner.clone(),
object_id,
permanent,
DEFAULT_DATA_ATTRIBUTE_ID,
0,
FsverityState::None,
options,
false,
))
}
/// There are instances where a store might not be an encrypted store, but the object should
/// still be encrypted. For example, the layer files for child stores should be encrypted using
/// the crypt service of the child store even though the root store doesn't have encryption. If
/// `crypt` is None, the default for the store is used (and prefer to pass None over passing the
/// store's crypt service directly because the latter will end up using a permanent key that
/// isn't purged when inactive).
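///
/// A minimal usage sketch (illustrative only, not a doctest; the owner and transaction are
/// assumed to exist, and `None` is passed for `crypt` so the store's default is used):
///
/// ```ignore
/// let handle = ObjectStore::create_object(
///     &owner,
///     &mut transaction,
///     HandleOptions::default(),
///     None,
///     None,
/// )
/// .await?;
/// transaction.commit().await?;
/// ```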
pub async fn create_object<S: HandleOwner>(
owner: &Arc<S>,
mut transaction: &mut Transaction<'_>,
options: HandleOptions,
crypt: Option<&dyn Crypt>,
create_attributes: Option<&fio::MutableNodeAttributes>,
) -> Result<DataObjectHandle<S>, Error> {
ObjectStore::create_object_with_id(
owner,
&mut transaction,
INVALID_OBJECT_ID,
options,
crypt,
create_attributes,
)
.await
}
/// Adjusts the reference count for a given object. If the reference count reaches zero, the
/// object is moved into the graveyard and true is returned.
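///
/// A minimal usage sketch (illustrative only, not a doctest; `store`, `transaction` and `oid`
/// are assumed):
///
/// ```ignore
/// if store.adjust_refs(&mut transaction, oid, -1).await? {
///     // The last reference was dropped; the object is now in the graveyard.
/// }
/// ```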
pub async fn adjust_refs(
&self,
transaction: &mut Transaction<'_>,
oid: u64,
delta: i64,
) -> Result<bool, Error> {
let mut mutation = self.txn_get_object_mutation(transaction, oid).await?;
let refs = if let ObjectValue::Object {
kind: ObjectKind::File { refs, .. } | ObjectKind::Symlink { refs, .. },
..
} = &mut mutation.item.value
{
*refs = refs.checked_add_signed(delta).ok_or(anyhow!("refs underflow/overflow"))?;
refs
} else {
bail!(FxfsError::NotFile);
};
if *refs == 0 {
self.add_to_graveyard(transaction, oid);
// We might still need to adjust the reference count if delta was something other than
// -1.
if delta != -1 {
*refs = 1;
transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
}
Ok(true)
} else {
transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
Ok(false)
}
}
// Purges an object that is in the graveyard.
pub async fn tombstone_object(
&self,
object_id: u64,
txn_options: Options<'_>,
) -> Result<(), Error> {
self.trim_or_tombstone(object_id, true, txn_options).await
}
/// Trim extents beyond the end of a file for all attributes. This will remove the entry from
/// the graveyard when done.
pub async fn trim(&self, object_id: u64) -> Result<(), Error> {
// For the root and root parent store, we would need to use the metadata reservation which
// we don't currently support, so assert that we're not those stores.
assert!(self.parent_store.as_ref().unwrap().parent_store.is_some());
self.trim_or_tombstone(
object_id,
false,
Options { borrow_metadata_space: true, ..Default::default() },
)
.await
}
async fn trim_or_tombstone(
&self,
object_id: u64,
for_tombstone: bool,
txn_options: Options<'_>,
) -> Result<(), Error> {
let fs = self.filesystem();
let mut next_attribute = Some(0);
while let Some(attribute_id) = next_attribute.take() {
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![
LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
LockKey::object(self.store_object_id, object_id),
],
txn_options,
)
.await?;
match self
.trim_some(
&mut transaction,
object_id,
attribute_id,
if for_tombstone {
TrimMode::Tombstone(TombstoneMode::Object)
} else {
TrimMode::UseSize
},
)
.await?
{
TrimResult::Incomplete => next_attribute = Some(attribute_id),
TrimResult::Done(None) => {
if for_tombstone
|| matches!(
self.tree
.find(&ObjectKey::graveyard_entry(
self.graveyard_directory_object_id(),
object_id,
))
.await?,
Some(Item { value: ObjectValue::Trim, .. })
)
{
self.remove_from_graveyard(&mut transaction, object_id);
}
}
TrimResult::Done(id) => next_attribute = id,
}
if !transaction.mutations().is_empty() {
transaction.commit().await?;
}
}
Ok(())
}
// Purges an object's attribute that is in the graveyard.
pub async fn tombstone_attribute(
&self,
object_id: u64,
attribute_id: u64,
txn_options: Options<'_>,
) -> Result<(), Error> {
let fs = self.filesystem();
let mut trim_result = TrimResult::Incomplete;
while matches!(trim_result, TrimResult::Incomplete) {
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![
LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
LockKey::object(self.store_object_id, object_id),
],
txn_options,
)
.await?;
trim_result = self
.trim_some(
&mut transaction,
object_id,
attribute_id,
TrimMode::Tombstone(TombstoneMode::Attribute),
)
.await?;
if let TrimResult::Done(..) = trim_result {
self.remove_attribute_from_graveyard(&mut transaction, object_id, attribute_id)
}
if !transaction.mutations().is_empty() {
transaction.commit().await?;
}
}
Ok(())
}
/// Deletes extents for attribute `attribute_id` in object `object_id`. Also see the comments
/// for TrimMode and TrimResult. The caller should hold a lock on the attribute and on the object,
/// as this performs a read-modify-write on the sizes.
pub async fn trim_some(
&self,
transaction: &mut Transaction<'_>,
object_id: u64,
attribute_id: u64,
mode: TrimMode,
) -> Result<TrimResult, Error> {
let layer_set = self.tree.layer_set();
let mut merger = layer_set.merger();
let aligned_offset = match mode {
TrimMode::FromOffset(offset) => {
round_up(offset, self.block_size).ok_or(FxfsError::Inconsistent)?
}
TrimMode::Tombstone(..) => 0,
TrimMode::UseSize => {
let iter = merger
.seek(Bound::Included(&ObjectKey::attribute(
object_id,
attribute_id,
AttributeKey::Attribute,
)))
.await?;
if let Some(item_ref) = iter.get() {
if item_ref.key.object_id != object_id {
return Ok(TrimResult::Done(None));
}
if let ItemRef {
key:
ObjectKey {
data:
ObjectKeyData::Attribute(size_attribute_id, AttributeKey::Attribute),
..
},
value: ObjectValue::Attribute { size },
..
} = item_ref
{
// If we found a different attribute_id, return so we can get the
// right lock.
if *size_attribute_id != attribute_id {
return Ok(TrimResult::Done(Some(*size_attribute_id)));
}
round_up(*size, self.block_size).ok_or(FxfsError::Inconsistent)?
} else {
// At time of writing, we should always see a size record or None here, but
// asserting here would be brittle so just skip to the next attribute
// instead.
return Ok(TrimResult::Done(Some(attribute_id + 1)));
}
} else {
// End of the tree.
return Ok(TrimResult::Done(None));
}
}
};
// Loop over the extents and deallocate them.
let mut iter = merger
.seek(Bound::Included(&ObjectKey::from_extent(
object_id,
attribute_id,
ExtentKey::search_key_from_offset(aligned_offset),
)))
.await?;
let mut end = 0;
let allocator = self.allocator();
let mut result = TrimResult::Done(None);
let mut deallocated = 0;
let block_size = self.block_size;
while let Some(item_ref) = iter.get() {
if item_ref.key.object_id != object_id {
break;
}
if let ObjectKey {
data: ObjectKeyData::Attribute(extent_attribute_id, attribute_key),
..
} = item_ref.key
{
if *extent_attribute_id != attribute_id {
result = TrimResult::Done(Some(*extent_attribute_id));
break;
}
if let (
AttributeKey::Extent(ExtentKey { range }),
ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
) = (attribute_key, item_ref.value)
{
let start = std::cmp::max(range.start, aligned_offset);
ensure!(start < range.end, FxfsError::Inconsistent);
let device_offset = device_offset
.checked_add(start - range.start)
.ok_or(FxfsError::Inconsistent)?;
end = range.end;
let len = end - start;
let device_range = device_offset..device_offset + len;
ensure!(device_range.is_aligned(block_size), FxfsError::Inconsistent);
allocator.deallocate(transaction, self.store_object_id, device_range).await?;
deallocated += len;
// Stop if the transaction is getting too big.
if transaction.mutations().len() >= TRANSACTION_MUTATION_THRESHOLD {
result = TrimResult::Incomplete;
break;
}
}
}
iter.advance().await?;
}
let finished_tombstone_object = matches!(mode, TrimMode::Tombstone(TombstoneMode::Object))
&& matches!(result, TrimResult::Done(None));
let finished_tombstone_attribute =
matches!(mode, TrimMode::Tombstone(TombstoneMode::Attribute))
&& !matches!(result, TrimResult::Incomplete);
let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
if let ObjectValue::Object { attributes: ObjectAttributes { project_id, .. }, .. } =
mutation.item.value
{
let nodes = if finished_tombstone_object { -1 } else { 0 };
if project_id != 0 && (deallocated != 0 || nodes != 0) {
transaction.add(
self.store_object_id,
Mutation::merge_object(
ObjectKey::project_usage(self.root_directory_object_id(), project_id),
ObjectValue::BytesAndNodes {
bytes: -i64::try_from(deallocated).unwrap(),
nodes,
},
),
);
}
}
// Deletion marker records *must* be merged so as to consume all other records for the
// object.
if finished_tombstone_object {
transaction.add(
self.store_object_id,
Mutation::merge_object(ObjectKey::object(object_id), ObjectValue::None),
);
} else {
if finished_tombstone_attribute {
transaction.add(
self.store_object_id,
Mutation::merge_object(
ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute),
ObjectValue::None,
),
);
}
if deallocated > 0 {
transaction.add(
self.store_object_id,
Mutation::merge_object(
ObjectKey::extent(object_id, attribute_id, aligned_offset..end),
ObjectValue::deleted_extent(),
),
);
// Update allocated size.
if let ObjectValue::Object {
attributes: ObjectAttributes { allocated_size, .. },
..
} = &mut mutation.item.value
{
// The only way for this to fail is if the volume is inconsistent.
*allocated_size = allocated_size.checked_sub(deallocated).ok_or_else(|| {
anyhow!(FxfsError::Inconsistent).context("Allocated size overflow")
})?;
} else {
panic!("Unexpected object value");
}
transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
}
}
Ok(result)
}
/// Returns all objects that exist in the parent store that pertain to this object store.
/// Note that this doesn't include the object_id of the store itself which is generally
/// referenced externally.
pub fn parent_objects(&self) -> Vec<u64> {
assert!(self.store_info_handle.get().is_some());
self.store_info.lock().unwrap().info().unwrap().parent_objects()
}
/// Returns root objects for this store.
pub fn root_objects(&self) -> Vec<u64> {
let mut objects = Vec::new();
let store_info = self.store_info.lock().unwrap();
let info = store_info.info().unwrap();
if info.root_directory_object_id != INVALID_OBJECT_ID {
objects.push(info.root_directory_object_id);
}
if info.graveyard_directory_object_id != INVALID_OBJECT_ID {
objects.push(info.graveyard_directory_object_id);
}
if info.internal_directory_object_id != INVALID_OBJECT_ID {
objects.push(info.internal_directory_object_id);
}
objects
}
pub fn store_info(&self) -> Option<StoreInfo> {
self.store_info.lock().unwrap().info().cloned()
}
/// Returns None if called during journal replay.
pub fn store_info_handle_object_id(&self) -> Option<u64> {
self.store_info_handle.get().map(|h| h.object_id())
}
/// Called when replay for a store has completed.
async fn on_replay_complete(&self) -> Result<(), Error> {
if self.parent_store.is_none() || self.store_info_handle.get().is_some() {
return Ok(());
}
let parent_store = self.parent_store.as_ref().unwrap();
let handle = ObjectStore::open_object(
&parent_store,
self.store_object_id,
HandleOptions::default(),
None,
)
.await?;
let object_tree_layer_object_ids;
let encrypted;
{
let mut info = self.load_store_info().await?;
if info.object_id_key.is_none() {
self.update_last_object_id(info.last_object_id);
}
// Merge the replay information.
let mut store_info = self.store_info.lock().unwrap();
// The frontmost element of the replay information is the most recent so we must apply
// that last.
for replay_info in store_info.replay_info_mut().unwrap().iter_mut().rev() {
if replay_info.object_count_delta < 0 {
info.object_count =
info.object_count.saturating_sub(-replay_info.object_count_delta as u64);
} else {
info.object_count =
info.object_count.saturating_add(replay_info.object_count_delta as u64);
}
if let Some(dir_id) = replay_info.internal_directory_object_id {
info.internal_directory_object_id = dir_id;
}
}
object_tree_layer_object_ids = info.layers.clone();
encrypted = if info.mutations_key.is_some() {
Some(info.encrypted_mutations_object_id)
} else {
None
};
if encrypted.is_some() {
*store_info = StoreOrReplayInfo::Locked;
} else {
*store_info = StoreOrReplayInfo::Info(info);
}
}
if encrypted.is_some() {
*self.lock_state.lock().unwrap() = LockState::Locked;
} else {
*self.lock_state.lock().unwrap() = LockState::Unencrypted;
}
// TODO(https://fxbug.dev/42178043): the layer size here could be bad and cause overflow.
// If the store is encrypted, we can't open the object tree layers now, but we need to
// compute the size of the layers.
let total_size: u64 = if let Some(encrypted_mutations_object_id) = encrypted {
let mut size = 0;
let parent_store = self.parent_store.as_ref().unwrap();
for oid in object_tree_layer_object_ids.into_iter() {
size += parent_store.get_file_size(oid).await?;
}
if encrypted_mutations_object_id != INVALID_OBJECT_ID {
size += layer_size_from_encrypted_mutations_size(
parent_store.get_file_size(encrypted_mutations_object_id).await?,
);
}
size
} else {
let object_layers = self.open_layers(object_tree_layer_object_ids, None).await?;
let size: u64 = object_layers.iter().map(|h| h.get_size()).sum();
self.tree
.append_layers(object_layers)
.await
.context("Failed to read object store layers")?;
*self.lock_state.lock().unwrap() = LockState::Unencrypted;
size
};
assert!(self.store_info_handle.set(handle).is_ok(), "Failed to set store_info_handle!");
self.filesystem().object_manager().update_reservation(
self.store_object_id,
tree::reservation_amount_from_layer_size(total_size),
);
Ok(())
}
async fn load_store_info(&self) -> Result<StoreInfo, Error> {
load_store_info(self.parent_store.as_ref().unwrap(), self.store_object_id).await
}
async fn open_layers(
&self,
object_ids: impl std::iter::IntoIterator<Item = u64>,
crypt: Option<Arc<dyn Crypt>>,
) -> Result<Vec<DataObjectHandle<ObjectStore>>, Error> {
let parent_store = self.parent_store.as_ref().unwrap();
let mut handles = Vec::new();
let mut sizes = Vec::new();
for object_id in object_ids {
let handle = ObjectStore::open_object(
&parent_store,
object_id,
HandleOptions::default(),
crypt.clone(),
)
.await
.with_context(|| format!("Failed to open layer file {}", object_id))?;
sizes.push(handle.get_size());
handles.push(handle);
}
self.counters.lock().unwrap().persistent_layer_file_sizes = sizes;
Ok(handles)
}
/// Unlocks a store so that it is ready to be used.
/// This is not thread-safe.
pub async fn unlock(self: &Arc<Self>, crypt: Arc<dyn Crypt>) -> Result<(), Error> {
self.unlock_inner(crypt, /*read_only=*/ false).await
}
/// Unlocks a store so that it is ready to be read from.
/// The store will generally behave like it is still locked: when flushed, the store will
/// write out its mutations into the encrypted mutations file, rather than directly updating
/// the layer files of the object store.
/// Re-locking the store (which *must* be done with `Self::lock_read_only`) will not trigger a
/// flush, although the store might still be flushed during other operations.
/// This is not thread-safe.
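///
/// A minimal usage sketch (illustrative only, not a doctest; `store` and `crypt` are assumed):
///
/// ```ignore
/// store.unlock_read_only(crypt.clone()).await?;
/// // ... read-only access to the store ...
/// store.lock_read_only();
/// ```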
pub async fn unlock_read_only(self: &Arc<Self>, crypt: Arc<dyn Crypt>) -> Result<(), Error> {
self.unlock_inner(crypt, /*read_only=*/ true).await
}
async fn unlock_inner(
self: &Arc<Self>,
crypt: Arc<dyn Crypt>,
read_only: bool,
) -> Result<(), Error> {
match &*self.lock_state.lock().unwrap() {
LockState::Locked => {}
LockState::Unencrypted => bail!(FxfsError::InvalidArgs),
LockState::Invalid => bail!(FxfsError::Internal),
LockState::Unlocked(_) | LockState::UnlockedReadOnly(..) => {
bail!(FxfsError::AlreadyBound)
}
LockState::Unknown => panic!("Store was unlocked before replay"),
LockState::Locking => panic!("Store is being locked"),
LockState::Unlocking => panic!("Store is being unlocked"),
}
// We must lock flushing since that can modify store_info and the encrypted mutations file.
let keys = lock_keys![LockKey::flush(self.store_object_id())];
let fs = self.filesystem();
let guard = fs.lock_manager().write_lock(keys).await;
let store_info = self.load_store_info().await?;
self.tree
.append_layers(
self.open_layers(store_info.layers.iter().cloned(), Some(crypt.clone())).await?,
)
.await
.context("Failed to read object tree layer file contents")?;
let unwrapped_key = crypt
.unwrap_key(store_info.mutations_key.as_ref().unwrap(), self.store_object_id)
.await
.context("Failed to unwrap mutations keys")?;
// The ChaCha20 stream cipher we use supports up to 64 GiB. By default we'll roll the key
// after every 128 MiB. Here we just need to pick a number that won't cause issues if it
// wraps, so we just use u32::MAX (the offset is u64).
ensure!(store_info.mutations_cipher_offset <= u32::MAX as u64, FxfsError::Inconsistent);
let mut mutations_cipher =
StreamCipher::new(&unwrapped_key, store_info.mutations_cipher_offset);
let wrapped_key = store_info.object_id_key.as_ref().ok_or(FxfsError::Inconsistent)?;
let object_id_cipher =
Ff1::new(&crypt.unwrap_key(wrapped_key, self.store_object_id).await?);
{
let mut last_object_id = self.last_object_id.lock().unwrap();
last_object_id.cipher = Some(object_id_cipher);
}
self.update_last_object_id(store_info.last_object_id);
// Apply the encrypted mutations.
let mut mutations = {
if store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
EncryptedMutations::default()
} else {
let parent_store = self.parent_store.as_ref().unwrap();
let handle = ObjectStore::open_object(
&parent_store,
store_info.encrypted_mutations_object_id,
HandleOptions::default(),
None,
)
.await?;
let mut cursor = std::io::Cursor::new(
handle
.contents(MAX_ENCRYPTED_MUTATIONS_SIZE)
.await
.context(FxfsError::Inconsistent)?,
);
let mut mutations = EncryptedMutations::deserialize_with_version(&mut cursor)
.context("Failed to deserialize EncryptedMutations")?
.0;
let len = cursor.get_ref().len() as u64;
while cursor.position() < len {
mutations.extend(
&EncryptedMutations::deserialize_with_version(&mut cursor)
.context("Failed to deserialize EncryptedMutations")?
.0,
);
}
mutations
}
};
// This assumes that the journal has no buffered mutations for this store (see Self::lock).
let journaled = EncryptedMutations::from_replayed_mutations(
self.store_object_id,
fs.journal()
.read_transactions_for_object(self.store_object_id)
.await
.context("Failed to read encrypted mutations from journal")?,
);
mutations.extend(&journaled);
let _ = std::mem::replace(&mut *self.lock_state.lock().unwrap(), LockState::Unlocking);
*self.store_info.lock().unwrap() = StoreOrReplayInfo::Info(store_info);
// If we fail, clean up.
let clean_up = scopeguard::guard((), |_| {
*self.lock_state.lock().unwrap() = LockState::Locked;
*self.store_info.lock().unwrap() = StoreOrReplayInfo::Locked;
// Make sure we don't leave unencrypted data lying around in memory.
self.tree.reset();
});
let EncryptedMutations { transactions, mut data, mutations_key_roll } = mutations;
let mut slice = &mut data[..];
let mut last_offset = 0;
for (offset, key) in mutations_key_roll {
let split_offset = offset
.checked_sub(last_offset)
.ok_or(FxfsError::Inconsistent)
.context("Invalid mutation key roll offset")?;
last_offset = offset;
ensure!(split_offset <= slice.len(), FxfsError::Inconsistent);
let (old, new) = slice.split_at_mut(split_offset);
mutations_cipher.decrypt(old);
let unwrapped_key = crypt
.unwrap_key(&key, self.store_object_id)
.await
.context("Failed to unwrap mutations keys")?;
mutations_cipher = StreamCipher::new(&unwrapped_key, 0);
slice = new;
}
mutations_cipher.decrypt(slice);
// Always roll the mutations key when we unlock which guarantees we won't reuse a
// previous key and nonce.
self.roll_mutations_key(crypt.as_ref()).await?;
let mut cursor = std::io::Cursor::new(data);
for (checkpoint, count) in transactions {
let context = ApplyContext { mode: ApplyMode::Replay, checkpoint };
for _ in 0..count {
let mutation =
Mutation::deserialize_from_version(&mut cursor, context.checkpoint.version)
.context("failed to deserialize encrypted mutation")?;
self.apply_mutation(mutation, &context, AssocObj::None)
.context("failed to apply encrypted mutation")?;
}
}
*self.lock_state.lock().unwrap() =
if read_only { LockState::UnlockedReadOnly(crypt) } else { LockState::Unlocked(crypt) };
// To avoid unbounded memory growth, we should flush the encrypted mutations now. Otherwise
// it's possible for more writes to be queued and for the store to be locked before we can
// flush anything and that can repeat.
std::mem::drop(guard);
if !read_only && !self.filesystem().options().read_only {
self.flush_with_reason(flush::Reason::Unlock).await?;
// Reap purged files within this store.
let _ = self.filesystem().graveyard().initial_reap(&self).await?;
}
// Return and cancel the clean up.
Ok(ScopeGuard::into_inner(clean_up))
}
pub fn is_locked(&self) -> bool {
matches!(
*self.lock_state.lock().unwrap(),
LockState::Locked | LockState::Locking | LockState::Unknown
)
}
pub fn is_unknown(&self) -> bool {
matches!(*self.lock_state.lock().unwrap(), LockState::Unknown)
}
pub fn is_encrypted(&self) -> bool {
self.store_info.lock().unwrap().info().unwrap().mutations_key.is_some()
}
// Locks a store. This assumes no other concurrent access to the store (other than flushing,
// which is guarded against here). Whilst this can return an error, the store will be placed into
// an unusable but safe state (i.e. no lingering unencrypted data) if an error is encountered.
pub async fn lock(&self) -> Result<(), Error> {
// We must lock flushing since it is not safe for that to be happening whilst we are locking
// the store.
let keys = lock_keys![LockKey::flush(self.store_object_id())];
let fs = self.filesystem();
let _guard = fs.lock_manager().write_lock(keys).await;
{
let mut lock_state = self.lock_state.lock().unwrap();
if let LockState::Unlocked(_) = &*lock_state {
*lock_state = LockState::Locking;
} else {
panic!("Unexpected lock state: {:?}", &*lock_state);
}
}
// Sync the journal now to ensure that any buffered mutations for this store make it out to
// disk. This is necessary to be able to unlock the store again.
// We need to establish a barrier at this point (so that the journaled writes are observable
// by any future attempts to unlock the store), hence the flush_device.
let sync_result =
self.filesystem().sync(SyncOptions { flush_device: true, ..Default::default() }).await;
*self.lock_state.lock().unwrap() = if let Err(error) = &sync_result {
error!(?error, "Failed to sync journal; store will no longer be usable");
LockState::Invalid
} else {
LockState::Locked
};
self.key_manager.clear();
*self.store_info.lock().unwrap() = StoreOrReplayInfo::Locked;
self.tree.reset();
sync_result
}
// Locks a store which was previously unlocked read-only (see `Self::unlock_read_only`). Data
// is not flushed, and instead any journaled mutations are buffered back into the ObjectStore
// and will be replayed next time the store is unlocked.
pub fn lock_read_only(&self) {
*self.lock_state.lock().unwrap() = LockState::Locked;
*self.store_info.lock().unwrap() = StoreOrReplayInfo::Locked;
self.tree.reset();
}
// Returns INVALID_OBJECT_ID if the object ID cipher needs to be created or rolled.
fn maybe_get_next_object_id(&self) -> u64 {
let mut last_object_id = self.last_object_id.lock().unwrap();
if last_object_id.should_create_cipher() {
INVALID_OBJECT_ID
} else {
last_object_id.get_next_object_id()
}
}
// Returns a new object ID that can be used. This will create an object ID cipher if necessary.
// We require a transaction to guarantee that a guard is held because if we create a transaction
// to roll the object ID key, we skip taking the filesystem lock.
pub async fn get_next_object_id(&self, transaction: &Transaction<'_>) -> Result<u64, Error> {
let object_id = self.maybe_get_next_object_id();
if object_id != INVALID_OBJECT_ID {
return Ok(object_id);
}
// Create a transaction (which has a lock) and then check again.
let mut transaction = self
.filesystem()
.new_transaction(
lock_keys![LockKey::object(
self.parent_store.as_ref().unwrap().store_object_id,
self.store_object_id,
)],
Options {
// We must skip journal checks because this transaction might be needed to
// compact.
skip_journal_checks: true,
borrow_metadata_space: true,
txn_guard: Some(transaction.txn_guard()),
..Default::default()
},
)
.await?;
{
let mut last_object_id = self.last_object_id.lock().unwrap();
if !last_object_id.should_create_cipher() {
// We lost a race.
return Ok(last_object_id.get_next_object_id());
}
// It shouldn't be possible for last_object_id to wrap within our lifetime, so if this
// happens, it's most likely due to corruption.
ensure!(
last_object_id.id & OBJECT_ID_HI_MASK != OBJECT_ID_HI_MASK,
FxfsError::Inconsistent
);
}
// Create a key.
let (object_id_wrapped, object_id_unwrapped) =
self.crypt().unwrap().create_key(self.store_object_id, KeyPurpose::Metadata).await?;
// Update StoreInfo.
let buf = {
let serialized_info = {
let mut store_info = self.store_info.lock().unwrap();
let store_info = store_info.info_mut().unwrap();
store_info.object_id_key = Some(object_id_wrapped);
let mut serialized_info = Vec::new();
store_info.serialize_with_version(&mut serialized_info)?;
serialized_info
};
let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
buf
};
self.store_info_handle
.get()
.unwrap()
.txn_write(&mut transaction, 0u64, buf.as_ref())
.await?;
transaction.commit().await?;
let mut last_object_id = self.last_object_id.lock().unwrap();
last_object_id.cipher = Some(Ff1::new(&object_id_unwrapped));
last_object_id.id = (last_object_id.id + (1 << 32)) & OBJECT_ID_HI_MASK;
Ok((last_object_id.id & OBJECT_ID_HI_MASK)
| last_object_id.cipher.as_ref().unwrap().encrypt(last_object_id.id as u32) as u64)
}
fn allocator(&self) -> Arc<Allocator> {
self.filesystem().allocator()
}
// If |transaction| has a pending mutation for the underlying object, returns that. Otherwise,
// looks up the object from the tree and returns a suitable mutation for it. The mutation is
// returned here rather than the item because the mutation includes the operation, which has
// significance: inserting an object implies it's the first of its kind, unlike replacing an
// object.
async fn txn_get_object_mutation(
&self,
transaction: &Transaction<'_>,
object_id: u64,
) -> Result<ObjectStoreMutation, Error> {
if let Some(mutation) =
transaction.get_object_mutation(self.store_object_id, ObjectKey::object(object_id))
{
Ok(mutation.clone())
} else {
Ok(ObjectStoreMutation {
item: self
.tree
.find(&ObjectKey::object(object_id))
.await?
.ok_or(FxfsError::NotFound)?,
op: Operation::ReplaceOrInsert,
})
}
}
fn update_last_object_id(&self, mut object_id: u64) {
let mut last_object_id = self.last_object_id.lock().unwrap();
// For encrypted stores, object_id will be encrypted here, so we must decrypt first.
if let Some(cipher) = &last_object_id.cipher {
// If the object ID cipher has been rolled, then it's possible we might see object IDs
// that were generated using a different cipher so the decrypt here will return the
// wrong value, but that won't matter because the hi part of the object ID should still
// discriminate.
object_id = object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64;
}
if object_id > last_object_id.id {
last_object_id.id = object_id;
}
}
/// Adds the specified object to the graveyard.
pub fn add_to_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
let graveyard_id = self.graveyard_directory_object_id();
assert_ne!(graveyard_id, INVALID_OBJECT_ID);
transaction.add(
self.store_object_id,
Mutation::replace_or_insert_object(
ObjectKey::graveyard_entry(graveyard_id, object_id),
ObjectValue::Some,
),
);
}
/// Removes the specified object from the graveyard. NB: Care should be taken when calling
/// this because graveyard entries are used for purging deleted files *and* for trimming
/// extents. For example, consider the following sequence:
///
/// 1. Add Trim graveyard entry.
/// 2. Replace with Some graveyard entry (see above).
/// 3. Remove graveyard entry.
///
/// If the desire in #3 is just to cancel the effect of the Some entry, then #3 should
/// actually be:
///
/// 3. Replace with Trim graveyard entry.
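//
// A sketch of the replacement in step #3 (assuming `ObjectValue::Trim` marks a trim entry, as
// referenced in step #1 above):
//
//     transaction.add(
//         store.store_object_id(),
//         Mutation::replace_or_insert_object(
//             ObjectKey::graveyard_entry(store.graveyard_directory_object_id(), object_id),
//             ObjectValue::Trim,
//         ),
//     );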
pub fn remove_from_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
transaction.add(
self.store_object_id,
Mutation::replace_or_insert_object(
ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id),
ObjectValue::None,
),
);
}
/// Removes the specified attribute from the graveyard. Unlike object graveyard entries,
/// attribute graveyard entries serve only one purpose (purging deleted attributes), so the
/// caller does not need to be concerned about replacing the graveyard attribute entry with its
/// prior state when cancelling it. See the comment on `remove_from_graveyard()`.
pub fn remove_attribute_from_graveyard(
&self,
transaction: &mut Transaction<'_>,
object_id: u64,
attribute_id: u64,
) {
transaction.add(
self.store_object_id,
Mutation::replace_or_insert_object(
ObjectKey::graveyard_attribute_entry(
self.graveyard_directory_object_id(),
object_id,
attribute_id,
),
ObjectValue::None,
),
);
}
// Roll the mutations key. The new key will be written for the next encrypted mutation.
async fn roll_mutations_key(&self, crypt: &dyn Crypt) -> Result<(), Error> {
let (wrapped_key, unwrapped_key) =
crypt.create_key(self.store_object_id, KeyPurpose::Metadata).await?;
// The mutations_cipher lock must be held for the duration so that mutations_cipher and
// store_info are updated atomically. Otherwise, write_mutation could find a new cipher but
// end up writing the wrong wrapped key.
let mut cipher = self.mutations_cipher.lock().unwrap();
*cipher = Some(StreamCipher::new(&unwrapped_key, 0));
self.store_info.lock().unwrap().info_mut().unwrap().mutations_key = Some(wrapped_key);
// mutations_cipher_offset is updated by flush.
Ok(())
}
/// Returns the link of a symlink object.
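// Example (sketch):
//
//     let target: Vec<u8> = store.read_symlink(symlink_object_id).await?;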
pub async fn read_symlink(&self, object_id: u64) -> Result<Vec<u8>, Error> {
match self.tree.find(&ObjectKey::object(object_id)).await? {
None | Some(Item { value: ObjectValue::None, .. }) => bail!(FxfsError::NotFound),
Some(Item {
value: ObjectValue::Object { kind: ObjectKind::Symlink { link, .. }, .. },
..
}) => Ok(link),
Some(item) => Err(anyhow!(FxfsError::Inconsistent)
.context(format!("Unexpected item in lookup: {item:?}"))),
}
}
/// Retrieves the wrapped keys for the given object.
async fn get_keys(&self, object_id: u64) -> Result<WrappedKeys, Error> {
match self.tree.find(&ObjectKey::keys(object_id)).await?.ok_or(FxfsError::NotFound)? {
Item { value: ObjectValue::Keys(EncryptionKeys::AES256XTS(keys)), .. } => Ok(keys),
_ => Err(anyhow!(FxfsError::Inconsistent).context("open_object: Expected keys")),
}
}
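// Updates an object's mutable node attributes and/or its change time within `transaction`; the
// caller is responsible for committing. A minimal sketch (field values are illustrative):
//
//     let attrs = fio::MutableNodeAttributes { uid: Some(0), gid: Some(0), ..Default::default() };
//     store.update_attributes(&mut transaction, object_id, Some(&attrs), None).await?;
//     transaction.commit().await?;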
pub async fn update_attributes<'a>(
&self,
transaction: &mut Transaction<'a>,
object_id: u64,
node_attributes: Option<&fio::MutableNodeAttributes>,
change_time: Option<Timestamp>,
) -> Result<(), Error> {
if change_time.is_none() {
if let Some(attributes) = node_attributes {
let empty_attributes = fio::MutableNodeAttributes { ..Default::default() };
if *attributes == empty_attributes {
return Ok(());
}
} else {
return Ok(());
}
}
let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
if let ObjectValue::Object { ref mut attributes, .. } = mutation.item.value {
if let Some(time) = change_time {
attributes.change_time = time;
}
if let Some(node_attributes) = node_attributes {
if let Some(time) = node_attributes.creation_time {
attributes.creation_time = Timestamp::from_nanos(time);
}
if let Some(time) = node_attributes.modification_time {
attributes.modification_time = Timestamp::from_nanos(time);
}
if let Some(time) = node_attributes.access_time {
attributes.access_time = Timestamp::from_nanos(time);
}
if node_attributes.mode.is_some()
|| node_attributes.uid.is_some()
|| node_attributes.gid.is_some()
|| node_attributes.rdev.is_some()
{
if let Some(a) = &mut attributes.posix_attributes {
if let Some(mode) = node_attributes.mode {
a.mode = mode;
}
if let Some(uid) = node_attributes.uid {
a.uid = uid;
}
if let Some(gid) = node_attributes.gid {
a.gid = gid;
}
if let Some(rdev) = node_attributes.rdev {
a.rdev = rdev;
}
} else {
attributes.posix_attributes = Some(PosixAttributes {
mode: node_attributes.mode.unwrap_or_default(),
uid: node_attributes.uid.unwrap_or_default(),
gid: node_attributes.gid.unwrap_or_default(),
rdev: node_attributes.rdev.unwrap_or_default(),
});
}
}
}
} else {
bail!(anyhow!(FxfsError::Inconsistent)
.context("ObjectStore.update_attributes: Expected object value"));
};
transaction.add(self.store_object_id(), Mutation::ObjectStore(mutation));
Ok(())
}
}
#[async_trait]
impl JournalingObject for ObjectStore {
fn apply_mutation(
&self,
mutation: Mutation,
context: &ApplyContext<'_, '_>,
_assoc_obj: AssocObj<'_>,
) -> Result<(), Error> {
if context.mode.is_live() {
let lock_state = self.lock_state.lock().unwrap();
match &*lock_state {
LockState::Locked | LockState::Locking => {
assert_matches!(mutation, Mutation::BeginFlush | Mutation::EndFlush)
}
LockState::Invalid
| LockState::Unlocking
| LockState::Unencrypted
| LockState::Unlocked(_)
| LockState::UnlockedReadOnly(..) => {}
_ => panic!("Unexpected lock state: {:?}", &*lock_state),
}
}
match mutation {
Mutation::ObjectStore(ObjectStoreMutation { mut item, op }) => {
item.sequence = context.checkpoint.file_offset;
match op {
Operation::Insert => {
// If we are inserting an object record for the first time, it signifies the
// birth of the object so we need to adjust the object count.
if matches!(item.value, ObjectValue::Object { .. }) {
self.store_info.lock().unwrap().adjust_object_count(1);
if context.mode.is_replay() {
self.update_last_object_id(item.key.object_id);
}
}
self.tree.insert(item)?;
}
Operation::ReplaceOrInsert => {
self.tree.replace_or_insert(item);
}
Operation::Merge => {
if item.is_tombstone() {
self.store_info.lock().unwrap().adjust_object_count(-1);
}
let lower_bound = item.key.key_for_merge_into();
self.tree.merge_into(item, &lower_bound);
}
}
}
Mutation::BeginFlush => {
ensure!(self.parent_store.is_some(), FxfsError::Inconsistent);
self.tree.seal();
self.store_info.lock().unwrap().begin_flush();
}
Mutation::EndFlush => {
ensure!(self.parent_store.is_some(), FxfsError::Inconsistent);
if context.mode.is_replay() {
self.tree.reset_immutable_layers();
self.store_info.lock().unwrap().end_flush();
}
}
Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
// We will process these during Self::unlock.
ensure!(
!matches!(&*self.lock_state.lock().unwrap(), LockState::Unencrypted),
FxfsError::Inconsistent
);
}
Mutation::CreateInternalDir(object_id) => {
ensure!(object_id != INVALID_OBJECT_ID, FxfsError::Inconsistent);
self.store_info.lock().unwrap().set_internal_dir(object_id);
}
_ => bail!("unexpected mutation: {:?}", mutation),
}
self.counters.lock().unwrap().mutations_applied += 1;
Ok(())
}
fn drop_mutation(&self, _mutation: Mutation, _transaction: &Transaction<'_>) {
self.counters.lock().unwrap().mutations_dropped += 1;
}
/// Push all in-memory structures to the device. This is not necessary for sync since the
/// journal will take care of it. This is supposed to be called when there is either memory or
/// space pressure (flushing the store will persist in-memory data and allow the journal file to
/// be trimmed).
///
/// Also returns the earliest version of a struct in the filesystem (when known).
async fn flush(&self) -> Result<Version, Error> {
self.flush_with_reason(flush::Reason::Journal).await
}
fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
// Intentionally enumerating all variants to force a decision on any new variants. Encrypt
// all mutations that could affect an encrypted object store's contents or the `StoreInfo` of
// the encrypted object store. During `unlock()`, any mutations which haven't been encrypted
// won't be replayed after reading `StoreInfo`.
match mutation {
// Whilst CreateInternalDir is a mutation for `StoreInfo`, which isn't encrypted, we
// still choose to encrypt the mutation because it makes it easier to deal with replay.
// When we replay mutations for an encrypted store, the only things we keep in memory are
// the encrypted mutations; we don't keep `StoreInfo` or changes to it in memory. So, by
// encrypting the CreateInternalDir mutation here, we don't have to track both
// encrypted mutations bound for the LSM tree and unencrypted mutations for `StoreInfo`
// to use in `unlock()`. Instead, CreateInternalDir mutations are bundled with the other
// encrypted mutations and handled in sequence during `unlock()`.
Mutation::ObjectStore(_) | Mutation::CreateInternalDir(_) => {
let mut cipher = self.mutations_cipher.lock().unwrap();
if let Some(cipher) = cipher.as_mut() {
// If this is the first time we've used this key, we must write the key out.
if cipher.offset() == 0 {
writer.write(Mutation::update_mutations_key(
self.store_info
.lock()
.unwrap()
.info()
.unwrap()
.mutations_key
.as_ref()
.unwrap()
.clone(),
));
}
let mut buffer = Vec::new();
mutation.serialize_into(&mut buffer).unwrap();
cipher.encrypt(&mut buffer);
writer.write(Mutation::EncryptedObjectStore(buffer.into()));
return;
}
}
// `EncryptedObjectStore` and `UpdateMutationsKey` are both obviously associated with
// encrypted object stores, but are either the encrypted mutation data itself or
// metadata governing how the data will be encrypted. They should only be produced here.
Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
debug_assert!(false, "Only this method should generate encrypted mutations");
}
// `BeginFlush` and `EndFlush` are not needed during `unlock()` but are needed during
// the initial journal replay, so they should not be encrypted. `Allocator`, `DeleteVolume`
// and `UpdateBorrowed` mutations are never associated with an encrypted store (we do not
// encrypt the allocator or the root/root-parent stores), so we can avoid the locking.
Mutation::Allocator(_)
| Mutation::BeginFlush
| Mutation::EndFlush
| Mutation::DeleteVolume
| Mutation::UpdateBorrowed(_) => {}
}
writer.write(mutation.clone());
}
}
impl HandleOwner for ObjectStore {}
impl AsRef<ObjectStore> for ObjectStore {
fn as_ref(&self) -> &ObjectStore {
self
}
}
fn layer_size_from_encrypted_mutations_size(size: u64) -> u64 {
// This is similar to reserved_space_from_journal_usage. It needs to be a worst-case estimate of
// the amount of metadata space that might need to be reserved to allow the encrypted mutations
// to be written to layer files. It needs to be >= what reservation_amount_from_layer_size will
// return once the data has been written to layer files and <= what
// reserved_space_from_journal_usage would use. We can't just use
// reserved_space_from_journal_usage because the encrypted mutations file includes some extra
// data (it includes the checkpoints) that isn't written in the same way to the journal.
size * 3
}
impl AssociatedObject for ObjectStore {}
/// Argument to the trim_some method.
#[derive(Debug)]
pub enum TrimMode {
/// Trim extents beyond the current size.
UseSize,
/// Trim extents beyond the supplied offset.
FromOffset(u64),
/// Remove the object (or attribute) from the store once it is fully trimmed.
Tombstone(TombstoneMode),
}
/// Sets the mode for tombstoning (either at the object or attribute level).
#[derive(Debug)]
pub enum TombstoneMode {
Object,
Attribute,
}
/// Result of the trim_some method.
#[derive(Debug)]
pub enum TrimResult {
/// We reached the limit of the transaction and more extents might follow.
Incomplete,
/// We finished this attribute. Returns the ID of the next attribute for the same object if
/// there is one.
Done(Option<u64>),
}
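// Typical caller pattern (sketch; `trim_some` is defined elsewhere in this module, so its exact
// argument list is elided here):
//
//     loop {
//         match store.trim_some(/* transaction, object_id, attribute_id, */ TrimMode::UseSize).await? {
//             TrimResult::Incomplete => { /* Commit the transaction and keep trimming. */ }
//             TrimResult::Done(_) => break,
//         }
//     }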
/// Loads store info.
pub async fn load_store_info(
parent: &Arc<ObjectStore>,
store_object_id: u64,
) -> Result<StoreInfo, Error> {
let handle =
ObjectStore::open_object(parent, store_object_id, HandleOptions::default(), None).await?;
Ok(if handle.get_size() > 0 {
let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
let mut cursor = std::io::Cursor::new(serialized_info);
let (store_info, _) = StoreInfo::deserialize_with_version(&mut cursor)
.context("Failed to deserialize StoreInfo")?;
store_info
} else {
// The store_info will be absent for a newly created and empty object store.
StoreInfo::default()
})
}
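// Example (sketch; `parent` is typically the root store and `store_object_id` identifies one of
// its child stores):
//
//     let info = load_store_info(&parent, store_object_id).await?;
//     let layer_object_ids = info.layers.clone();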
#[cfg(test)]
mod tests {
use {
super::{
StoreInfo, DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID,
MAX_STORE_INFO_SERIALIZED_SIZE, OBJECT_ID_HI_MASK,
},
crate::{
errors::FxfsError,
filesystem::{FxFilesystem, JournalingObject, OpenFxFilesystem, SyncOptions},
fsck::fsck,
lsm_tree::types::{Item, ItemRef, LayerIterator},
object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle, INVALID_OBJECT_ID},
object_store::{
directory::Directory,
object_record::{AttributeKey, ObjectKey, ObjectKind, ObjectValue},
transaction::{lock_keys, Options},
volume::root_volume,
FsverityMetadata, HandleOptions, LockKey, Mutation, ObjectStore, RootDigest,
},
serialized_types::VersionedLatest,
},
assert_matches::assert_matches,
fuchsia_async as fasync,
futures::join,
fxfs_crypto::{Crypt, WrappedKey, WrappedKeyBytes, WRAPPED_KEY_SIZE},
fxfs_insecure_crypto::InsecureCrypt,
std::{
ops::Bound,
sync::{Arc, Mutex},
time::Duration,
},
storage_device::{fake_device::FakeDevice, DeviceHolder},
};
const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
async fn test_filesystem() -> OpenFxFilesystem {
let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
FxFilesystem::new_empty(device).await.expect("new_empty failed")
}
#[fuchsia::test]
async fn test_item_sequences() {
let fs = test_filesystem().await;
let object1;
let object2;
let object3;
let mut transaction = fs
.clone()
.new_transaction(lock_keys![], Options::default())
.await
.expect("new_transaction failed");
let store = fs.root_store();
object1 = Arc::new(
ObjectStore::create_object(
&store,
&mut transaction,
HandleOptions::default(),
None,
None,
)
.await
.expect("create_object failed"),
);
transaction.commit().await.expect("commit failed");
let mut transaction = fs
.clone()
.new_transaction(lock_keys![], Options::default())
.await
.expect("new_transaction failed");
object2 = Arc::new(
ObjectStore::create_object(
&store,
&mut transaction,
HandleOptions::default(),
None,
None,
)
.await
.expect("create_object failed"),
);
transaction.commit().await.expect("commit failed");
fs.sync(SyncOptions::default()).await.expect("sync failed");
let mut transaction = fs
.clone()
.new_transaction(lock_keys![], Options::default())
.await
.expect("new_transaction failed");
object3 = Arc::new(
ObjectStore::create_object(
&store,
&mut transaction,
HandleOptions::default(),
None,
None,
)
.await
.expect("create_object failed"),
);
transaction.commit().await.expect("commit failed");
let layer_set = store.tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
let mut sequences = [0u64; 3];
while let Some(ItemRef { key: ObjectKey { object_id, .. }, sequence, .. }) = iter.get() {
if *object_id == object1.object_id() {
sequences[0] = sequence;
} else if *object_id == object2.object_id() {
sequences[1] = sequence;
} else if *object_id == object3.object_id() {
sequences[2] = sequence;
}
iter.advance().await.expect("advance failed");
}
assert!(sequences[0] <= sequences[1], "sequences: {:?}", sequences);
// The last item came after a sync, so should be strictly greater.
assert!(sequences[1] < sequences[2], "sequences: {:?}", sequences);
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_verified_file_with_verified_attribute() {
let fs: OpenFxFilesystem = test_filesystem().await;
let mut transaction = fs
.clone()
.new_transaction(lock_keys![], Options::default())
.await
.expect("new_transaction failed");
let store = fs.root_store();
let object = Arc::new(
ObjectStore::create_object(
&store,
&mut transaction,
HandleOptions::default(),
None,
None,
)
.await
.expect("create_object failed"),
);
transaction.add(
store.store_object_id(),
Mutation::replace_or_insert_object(
ObjectKey::attribute(
object.object_id(),
DEFAULT_DATA_ATTRIBUTE_ID,
AttributeKey::Attribute,
),
ObjectValue::verified_attribute(
0,
FsverityMetadata { root_digest: RootDigest::Sha256([0; 32]), salt: vec![] },
),
),
);
transaction.add(
store.store_object_id(),
Mutation::replace_or_insert_object(
ObjectKey::attribute(
object.object_id(),
FSVERITY_MERKLE_ATTRIBUTE_ID,
AttributeKey::Attribute,
),
ObjectValue::attribute(0),
),
);
transaction.commit().await.unwrap();
let handle =
ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
.await
.expect("open_object failed");
assert!(handle.verified_file());
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_verified_file_without_verified_attribute() {
let fs: OpenFxFilesystem = test_filesystem().await;
let mut transaction = fs
.clone()
.new_transaction(lock_keys![], Options::default())
.await
.expect("new_transaction failed");
let store = fs.root_store();
let object = Arc::new(
ObjectStore::create_object(
&store,
&mut transaction,
HandleOptions::default(),
None,
None,
)
.await
.expect("create_object failed"),
);
transaction.commit().await.unwrap();
let handle =
ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
.await
.expect("open_object failed");
assert!(!handle.verified_file());
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_create_and_open_store() {
let fs = test_filesystem().await;
let store_id = {
let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
root_volume
.new_volume("test", Some(Arc::new(InsecureCrypt::new())))
.await
.expect("new_volume failed")
.store_object_id()
};
fs.close().await.expect("close failed");
let device = fs.take_device().await;
device.reopen(false);
let fs = FxFilesystem::open(device).await.expect("open failed");
{
let store = fs.object_manager().store(store_id).expect("store not found");
store.unlock(Arc::new(InsecureCrypt::new())).await.expect("unlock failed");
}
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_create_and_open_internal_dir() {
let fs = test_filesystem().await;
let dir_id;
let store_id;
{
let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
let store = root_volume
.new_volume("test", Some(Arc::new(InsecureCrypt::new())))
.await
.expect("new_volume failed");
dir_id =
store.get_or_create_internal_directory_id().await.expect("Create internal dir");
store_id = store.store_object_id();
}
fs.close().await.expect("close failed");
let device = fs.take_device().await;
device.reopen(false);
let fs = FxFilesystem::open(device).await.expect("open failed");
{
let store = fs.object_manager().store(store_id).expect("store not found");
store.unlock(Arc::new(InsecureCrypt::new())).await.expect("unlock failed");
assert_eq!(
dir_id,
store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
);
let obj = store
.tree()
.find(&ObjectKey::object(dir_id))
.await
.expect("Searching tree for dir")
.unwrap();
assert_matches!(
obj.value,
ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
);
}
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_create_and_open_internal_dir_unencrypted() {
let fs = test_filesystem().await;
let dir_id;
let store_id;
{
let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
let store = root_volume.new_volume("test", None).await.expect("new_volume failed");
dir_id =
store.get_or_create_internal_directory_id().await.expect("Create internal dir");
store_id = store.store_object_id();
}
fs.close().await.expect("close failed");
let device = fs.take_device().await;
device.reopen(false);
let fs = FxFilesystem::open(device).await.expect("open failed");
{
let store = fs.object_manager().store(store_id).expect("store not found");
assert_eq!(
dir_id,
store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
);
let obj = store
.tree()
.find(&ObjectKey::object(dir_id))
.await
.expect("Searching tree for dir")
.unwrap();
assert_matches!(
obj.value,
ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
);
}
fs.close().await.expect("Close failed");
}
#[fuchsia::test(threads = 10)]
async fn test_old_layers_are_purged() {
let fs = test_filesystem().await;
let store = fs.root_store();
let mut transaction = fs
.clone()
.new_transaction(lock_keys![], Options::default())
.await
.expect("new_transaction failed");
let object = Arc::new(
ObjectStore::create_object(
&store,
&mut transaction,
HandleOptions::default(),
None,
None,
)
.await
.expect("create_object failed"),
);
transaction.commit().await.expect("commit failed");
store.flush().await.expect("flush failed");
let mut buf = object.allocate_buffer(5).await;
buf.as_mut_slice().copy_from_slice(b"hello");
object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
// Getting the layer-set should cause the flush to stall.
let layer_set = store.tree().layer_set();
let done = Mutex::new(false);
let mut object_id = 0;
join!(
async {
store.flush().await.expect("flush failed");
assert!(*done.lock().unwrap());
},
async {
// This is a halting problem so all we can do is sleep.
fasync::Timer::new(Duration::from_secs(1)).await;
*done.lock().unwrap() = true;
object_id = layer_set.layers.last().unwrap().handle().unwrap().object_id();
std::mem::drop(layer_set);
}
);
if let Err(e) = ObjectStore::open_object(
&store.parent_store.as_ref().unwrap(),
object_id,
HandleOptions::default(),
store.crypt(),
)
.await
{
assert!(FxfsError::NotFound.matches(&e));
} else {
panic!("open_object succeeded");
}
}
#[fuchsia::test]
async fn test_tombstone_deletes_data() {
let fs = test_filesystem().await;
let root_store = fs.root_store();
let child_id = {
let mut transaction = fs
.clone()
.new_transaction(lock_keys![], Options::default())
.await
.expect("new_transaction failed");
let child = ObjectStore::create_object(
&root_store,
&mut transaction,
HandleOptions::default(),
None,
None,
)
.await
.expect("create_object failed");
transaction.commit().await.expect("commit failed");
// Allocate an extent in the file.
let mut buffer = child.allocate_buffer(8192).await;
buffer.as_mut_slice().fill(0xaa);
child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
child.object_id()
};
root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
// Let fsck check allocations.
fsck(fs.clone()).await.expect("fsck failed");
}
#[fuchsia::test]
async fn test_major_compaction_discards_unnecessary_records() {
let fs = test_filesystem().await;
let root_store = fs.root_store();
let child_id = {
let mut transaction = fs
.clone()
.new_transaction(lock_keys![], Options::default())
.await
.expect("new_transaction failed");
let child = ObjectStore::create_object(
&root_store,
&mut transaction,
HandleOptions::default(),
None,
None,
)
.await
.expect("create_object failed");
transaction.commit().await.expect("commit failed");
// Allocate an extent in the file.
let mut buffer = child.allocate_buffer(8192).await;
buffer.as_mut_slice().fill(0xaa);
child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
child.object_id()
};
root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
assert_matches!(
root_store.tree.find(&ObjectKey::object(child_id)).await.expect("find failed"),
Some(Item { value: ObjectValue::None, .. })
);
root_store.flush().await.expect("flush failed");
// There should be no records for the object.
let layers = root_store.tree.layer_set();
let mut merger = layers.merger();
let iter =
merger.seek(Bound::Included(&ObjectKey::object(child_id))).await.expect("seek failed");
match iter.get() {
None => {}
Some(ItemRef { key: ObjectKey { object_id, .. }, .. }) => {
assert_ne!(*object_id, child_id)
}
}
}
#[fuchsia::test]
async fn test_overlapping_extents_in_different_layers() {
let fs = test_filesystem().await;
let store = fs.root_store();
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
store.store_object_id(),
store.root_directory_object_id()
)],
Options::default(),
)
.await
.expect("new_transaction failed");
let root_directory =
Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
let object = root_directory
.create_child_file(&mut transaction, "test", None)
.await
.expect("create_child_file failed");
transaction.commit().await.expect("commit failed");
let buf = object.allocate_buffer(16384).await;
object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
store.flush().await.expect("flush failed");
object.write_or_append(Some(0), buf.subslice(0..4096)).await.expect("write failed");
// At this point, we should have an extent for 0..16384 in a layer that has been flushed,
// and an extent for 0..4096 that partially overwrites it. Writing to 0..16384 should
// overwrite both of those extents.
object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
fsck(fs.clone()).await.expect("fsck failed");
}
#[fuchsia::test(threads = 10)]
async fn test_encrypted_mutations() {
async fn one_iteration(
fs: OpenFxFilesystem,
crypt: Arc<dyn Crypt>,
iteration: u64,
) -> OpenFxFilesystem {
async fn reopen(fs: OpenFxFilesystem) -> OpenFxFilesystem {
fs.close().await.expect("Close failed");
let device = fs.take_device().await;
device.reopen(false);
FxFilesystem::open(device).await.expect("FS open failed")
}
let fs = reopen(fs).await;
let (store_object_id, object_id) = {
let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
let store =
root_volume.volume("test", Some(crypt.clone())).await.expect("volume failed");
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
store.store_object_id(),
store.root_directory_object_id(),
)],
Options::default(),
)
.await
.expect("new_transaction failed");
let root_directory = Directory::open(&store, store.root_directory_object_id())
.await
.expect("open failed");
let object = root_directory
.create_child_file(&mut transaction, &format!("test {}", iteration), None)
.await
.expect("create_child_file failed");
transaction.commit().await.expect("commit failed");
let mut buf = object.allocate_buffer(1000).await;
for i in 0..buf.len() {
buf.as_mut_slice()[i] = i as u8;
}
object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
(store.store_object_id(), object.object_id())
};
let fs = reopen(fs).await;
let check_object = |fs: Arc<FxFilesystem>| {
let crypt = crypt.clone();
async move {
let root_volume = root_volume(fs).await.expect("root_volume failed");
let volume =
root_volume.volume("test", Some(crypt)).await.expect("volume failed");
let object = ObjectStore::open_object(
&volume,
object_id,
HandleOptions::default(),
None,
)
.await
.expect("open_object failed");
let mut buf = object.allocate_buffer(1000).await;
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 1000);
for i in 0..buf.len() {
assert_eq!(buf.as_slice()[i], i as u8);
}
}
};
check_object(fs.clone()).await;
let fs = reopen(fs).await;
// At this point the "test" volume is locked. Before checking the object, flush the
// filesystem. This should leave a file with encrypted mutations.
fs.object_manager().flush().await.expect("flush failed");
assert_ne!(
fs.object_manager()
.store(store_object_id)
.unwrap()
.load_store_info()
.await
.expect("load_store_info failed")
.encrypted_mutations_object_id,
INVALID_OBJECT_ID
);
check_object(fs.clone()).await;
// Checking the object should have triggered a flush and so now there should be no
// encrypted mutations object.
assert_eq!(
fs.object_manager()
.store(store_object_id)
.unwrap()
.load_store_info()
.await
.expect("load_store_info failed")
.encrypted_mutations_object_id,
INVALID_OBJECT_ID
);
let fs = reopen(fs).await;
fsck(fs.clone()).await.expect("fsck failed");
let fs = reopen(fs).await;
check_object(fs.clone()).await;
fs
}
let mut fs = test_filesystem().await;
let crypt = Arc::new(InsecureCrypt::new());
{
let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
let _store = root_volume
.new_volume("test", Some(crypt.clone()))
.await
.expect("new_volume failed");
}
// Run a few iterations so that we test changes with the stream cipher offset.
for i in 0..5 {
fs = one_iteration(fs, crypt.clone(), i).await;
}
}
#[fuchsia::test(threads = 10)]
async fn test_object_id_cipher_roll() {
let fs = test_filesystem().await;
let crypt = Arc::new(InsecureCrypt::new());
{
let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
let store = root_volume
.new_volume("test", Some(crypt.clone()))
.await
.expect("new_volume failed");
let store_info = store.store_info().unwrap();
// Hack the last object ID to force a roll of the object ID cipher.
{
let mut last_object_id = store.last_object_id.lock().unwrap();
assert_eq!(last_object_id.id & OBJECT_ID_HI_MASK, 1u64 << 32);
last_object_id.id |= 0xffffffff;
}
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
store.store_object_id(),
store.root_directory_object_id()
)],
Options::default(),
)
.await
.expect("new_transaction failed");
let root_directory = Directory::open(&store, store.root_directory_object_id())
.await
.expect("open failed");
let object = root_directory
.create_child_file(&mut transaction, "test", None)
.await
.expect("create_child_file failed");
transaction.commit().await.expect("commit failed");
assert_eq!(object.object_id() & OBJECT_ID_HI_MASK, 2u64 << 32);
// Check that the key has been changed.
assert_ne!(store.store_info().unwrap().object_id_key, store_info.object_id_key);
assert_eq!(store.last_object_id.lock().unwrap().id, 2u64 << 32);
};
fs.close().await.expect("Close failed");
let device = fs.take_device().await;
device.reopen(false);
let fs = FxFilesystem::open(device).await.expect("open failed");
let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
let store = root_volume.volume("test", Some(crypt.clone())).await.expect("volume failed");
assert_eq!(store.last_object_id.lock().unwrap().id, 2u64 << 32);
}
#[fuchsia::test(threads = 10)]
async fn test_lock_store() {
let fs = test_filesystem().await;
let crypt = Arc::new(InsecureCrypt::new());
let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
let store =
root_volume.new_volume("test", Some(crypt.clone())).await.expect("new_volume failed");
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
store.store_object_id(),
store.root_directory_object_id()
)],
Options::default(),
)
.await
.expect("new_transaction failed");
let root_directory =
Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
root_directory
.create_child_file(&mut transaction, "test", None)
.await
.expect("create_child_file failed");
transaction.commit().await.expect("commit failed");
store.lock().await.expect("lock failed");
store.unlock(crypt).await.expect("unlock failed");
root_directory.lookup("test").await.expect("lookup failed").expect("not found");
}
#[fuchsia::test(threads = 10)]
async fn test_unlock_read_only() {
let fs = test_filesystem().await;
let crypt = Arc::new(InsecureCrypt::new());
let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
let store =
root_volume.new_volume("test", Some(crypt.clone())).await.expect("new_volume failed");
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
store.store_object_id(),
store.root_directory_object_id()
)],
Options::default(),
)
.await
.expect("new_transaction failed");
let root_directory =
Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
root_directory
.create_child_file(&mut transaction, "test", None)
.await
.expect("create_child_file failed");
transaction.commit().await.expect("commit failed");
store.lock().await.expect("lock failed");
store.unlock_read_only(crypt.clone()).await.expect("unlock failed");
root_directory.lookup("test").await.expect("lookup failed").expect("not found");
store.lock_read_only();
store.unlock_read_only(crypt).await.expect("unlock failed");
root_directory.lookup("test").await.expect("lookup failed").expect("not found");
}
#[fuchsia::test(threads = 10)]
async fn test_key_rolled_when_unlocked() {
let fs = test_filesystem().await;
let crypt = Arc::new(InsecureCrypt::new());
let object_id;
{
let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
let store = root_volume
.new_volume("test", Some(crypt.clone()))
.await
.expect("new_volume failed");
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
store.store_object_id(),
store.root_directory_object_id()
)],
Options::default(),
)
.await
.expect("new_transaction failed");
let root_directory = Directory::open(&store, store.root_directory_object_id())
.await
.expect("open failed");
object_id = root_directory
.create_child_file(&mut transaction, "test", None)
.await
.expect("create_child_file failed")
.object_id();
transaction.commit().await.expect("commit failed");
}
fs.close().await.expect("Close failed");
let mut device = fs.take_device().await;
// Repeatedly remount so that we can be sure that we can remount when there are many
// mutations keys.
for _ in 0..100 {
device.reopen(false);
let fs = FxFilesystem::open(device).await.expect("open failed");
{
let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
let store = root_volume
.volume("test", Some(crypt.clone()))
.await
.expect("open_volume failed");
// The key should get rolled every time we unlock.
assert_eq!(store.mutations_cipher.lock().unwrap().as_ref().unwrap().offset(), 0);
// Make sure there's an encrypted mutation.
let handle =
ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
.await
.expect("open_object failed");
let buffer = handle.allocate_buffer(100).await;
handle
.write_or_append(Some(0), buffer.as_ref())
.await
.expect("write_or_append failed");
}
fs.close().await.expect("Close failed");
device = fs.take_device().await;
}
}
#[test]
fn test_store_info_max_serialized_size() {
let info = StoreInfo {
guid: [0xff; 16],
last_object_id: 0x1234567812345678,
// Worst case, each layer should be 3/4 the size of the layer below it (because of the
// compaction policy we're using). If the smallest layer is 8,192 bytes, then 120
// layers would take up a size that exceeds a 64 bit unsigned integer, so if this fits,
// any size should fit.
layers: vec![0x1234567812345678; 120],
root_directory_object_id: 0x1234567812345678,
graveyard_directory_object_id: 0x1234567812345678,
object_count: 0x1234567812345678,
mutations_key: Some(WrappedKey {
wrapping_key_id: 0x1234567812345678,
key: WrappedKeyBytes::from([0xff; WRAPPED_KEY_SIZE]),
}),
mutations_cipher_offset: 0x1234567812345678,
encrypted_mutations_object_id: 0x1234567812345678,
object_id_key: Some(WrappedKey {
wrapping_key_id: 0x1234567812345678,
key: WrappedKeyBytes::from([0xff; WRAPPED_KEY_SIZE]),
}),
internal_directory_object_id: INVALID_OBJECT_ID,
};
let mut serialized_info = Vec::new();
info.serialize_with_version(&mut serialized_info).unwrap();
assert!(
serialized_info.len() <= MAX_STORE_INFO_SERIALIZED_SIZE,
"{}",
serialized_info.len()
);
}
async fn reopen_after_crypt_failure_inner(read_only: bool) {
let fs = test_filesystem().await;
let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
let store = {
let crypt = Arc::new(InsecureCrypt::new());
let store = root_volume
.new_volume("vol", Some(crypt.clone()))
.await
.expect("new_volume failed");
let root_directory = Directory::open(&store, store.root_directory_object_id())
.await
.expect("open failed");
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
store.store_object_id(),
root_directory.object_id()
)],
Options::default(),
)
.await
.expect("new_transaction failed");
root_directory
.create_child_file(&mut transaction, "test", None)
.await
.expect("create_child_file failed");
transaction.commit().await.expect("commit failed");
crypt.shutdown();
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
store.store_object_id(),
root_directory.object_id()
)],
Options::default(),
)
.await
.expect("new_transaction failed");
root_directory
.create_child_file(&mut transaction, "test2", None)
.await
.map(|_| ())
.expect_err("create_child_file should fail");
store.lock().await.expect("lock failed");
store
};
let crypt = Arc::new(InsecureCrypt::new());
if read_only {
store.unlock_read_only(crypt).await.expect("unlock failed");
} else {
store.unlock(crypt).await.expect("unlock failed");
}
let root_directory =
Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
root_directory.lookup("test").await.expect("lookup failed").expect("not found");
}
#[fuchsia::test(threads = 10)]
async fn test_reopen_after_crypt_failure() {
reopen_after_crypt_failure_inner(false).await;
}
#[fuchsia::test(threads = 10)]
async fn test_reopen_read_only_after_crypt_failure() {
reopen_after_crypt_failure_inner(true).await;
}
#[fuchsia::test(threads = 10)]
#[should_panic(expected = "Insufficient reservation space")]
#[cfg(debug_assertions)]
async fn large_transaction_causes_panic_in_debug_builds() {
let fs = test_filesystem().await;
let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
let store = root_volume.new_volume("vol", None).await.expect("new_volume failed");
let root_directory =
Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(store.store_object_id(), root_directory.object_id())],
Options::default(),
)
.await
.expect("transaction");
for i in 0..500 {
root_directory
.create_symlink(&mut transaction, b"link", &format!("{}", i))
.await
.expect("symlink");
}
assert_eq!(transaction.commit().await.expect("commit"), 0);
}
}