| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::{ |
| checksum::Checksum, |
| debug_assert_not_too_long, |
| filesystem::TxnGuard, |
| log::*, |
| lsm_tree::types::Item, |
| object_handle::INVALID_OBJECT_ID, |
| object_store::{ |
| allocator::{AllocatorItem, Reservation}, |
| object_manager::{reserved_space_from_journal_usage, ObjectManager}, |
| object_record::{ |
| ObjectItem, ObjectItemV32, ObjectItemV33, ObjectItemV37, ObjectItemV38, ObjectKey, |
| ObjectKeyData, ObjectValue, ProjectProperty, |
| }, |
| }, |
| serialized_types::{migrate_nodefault, migrate_to_version, Migrate, Versioned}, |
| }, |
| anyhow::Error, |
| either::{Either, Left, Right}, |
| fprint::TypeFingerprint, |
| futures::{future::poll_fn, pin_mut}, |
| fxfs_crypto::{WrappedKey, WrappedKeyV32}, |
| rustc_hash::FxHashMap as HashMap, |
| scopeguard::ScopeGuard, |
| serde::{Deserialize, Serialize}, |
| std::{ |
| cell::UnsafeCell, |
| cmp::Ordering, |
| collections::{hash_map::Entry, BTreeSet}, |
| fmt, |
| marker::PhantomPinned, |
| mem, |
| ops::{Deref, DerefMut, Range}, |
| sync::Mutex, |
| task::{Poll, Waker}, |
| }, |
| }; |
| |
| /// Options for creating a transaction. These allow for special handling of certain |
| /// transactions, such as deletes and the extension of journal extents. For most other use |
| /// cases it is appropriate to use `default()` here. |
| #[derive(Clone, Copy, Default)] |
| pub struct Options<'a> { |
| /// If true, don't check for low journal space. This should be true for any transactions that |
| /// might alleviate journal space (e.g. compaction). |
| pub skip_journal_checks: bool, |
| |
| /// If true, borrow metadata space from the metadata reservation. This should be set to true |
| /// for any transaction that will either not affect space usage after compaction (e.g. setting |
| /// attributes), or that will reduce space usage (e.g. unlinking). Otherwise, a transaction |
| /// might fail with an out-of-space error. |
| pub borrow_metadata_space: bool, |
| |
| /// If specified, a reservation to be used with the transaction. If not set, any allocations |
| /// that are part of this transaction will have to take their chances, and will fail if there is |
| /// no free space. The intention is that this should be used for things like the journal which |
| /// require guaranteed space. |
| pub allocator_reservation: Option<&'a Reservation>, |
| |
| /// An existing transaction guard to be used. |
| pub txn_guard: Option<&'a TxnGuard<'a>>, |
| } |
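| |
| // A minimal usage sketch (illustrative only; the right options depend entirely on the call |
| // site, and `fs` is assumed to be a filesystem exposing `new_transaction`, as in the tests at |
| // the bottom of this file): |
| // |
| //     let options = Options { borrow_metadata_space: true, ..Default::default() }; |
| //     let mut transaction = fs.clone().new_transaction(lock_keys![], options).await?; |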
| |
| // This is the amount of space that we reserve for metadata when we are creating a new |
| // transaction, expressed in terms of space occupied in the journal. No transaction may take up |
| // more journal space than this, so the value must be large enough for the largest possible |
| // transaction. It follows that transactions always need to be bounded, which might involve |
| // splitting an operation up into smaller transactions. |
| pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576; |
| pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 = |
| reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE); |
| |
| #[must_use] |
| pub struct TransactionLocks<'a>(pub WriteGuard<'a>); |
| |
| /// The journal consists of these records, which are replayed at mount time. Within a |
| /// transaction, they are stored as a set, which allows some mutations to be deduplicated and |
| /// looked up (hence the custom comparison functions below). For example, we need to be able to |
| /// find object size changes. |
| pub type Mutation = MutationV38; |
| |
| #[derive( |
| Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned, |
| )] |
| #[cfg_attr(fuzz, derive(arbitrary::Arbitrary))] |
| pub enum MutationV38 { |
| ObjectStore(ObjectStoreMutationV38), |
| EncryptedObjectStore(Box<[u8]>), |
| Allocator(AllocatorMutationV32), |
| // Indicates the beginning of a flush. This would typically involve sealing a tree. |
| BeginFlush, |
| // Indicates the end of a flush. This would typically involve replacing the immutable layers |
| // with compacted ones. |
| EndFlush, |
| // Volume has been deleted. Requires that we remove it from the set of managed ObjectStores. |
| DeleteVolume, |
| UpdateBorrowed(u64), |
| UpdateMutationsKey(UpdateMutationsKeyV32), |
| CreateInternalDir(u64), |
| } |
| |
| #[derive(Debug, Deserialize, Migrate, Serialize, Versioned, TypeFingerprint)] |
| pub enum MutationV37 { |
| ObjectStore(ObjectStoreMutationV37), |
| EncryptedObjectStore(Box<[u8]>), |
| Allocator(AllocatorMutationV32), |
| BeginFlush, |
| EndFlush, |
| DeleteVolume, |
| UpdateBorrowed(u64), |
| UpdateMutationsKey(UpdateMutationsKeyV32), |
| CreateInternalDir(u64), |
| } |
| |
| #[derive(Debug, Deserialize, Migrate, Serialize, Versioned, TypeFingerprint)] |
| #[migrate_to_version(MutationV37)] |
| pub enum MutationV33 { |
| ObjectStore(ObjectStoreMutationV33), |
| EncryptedObjectStore(Box<[u8]>), |
| Allocator(AllocatorMutationV32), |
| BeginFlush, |
| EndFlush, |
| DeleteVolume, |
| UpdateBorrowed(u64), |
| UpdateMutationsKey(UpdateMutationsKeyV32), |
| CreateInternalDir(u64), |
| } |
| |
| #[derive(Debug, Deserialize, Migrate, Serialize, Versioned, TypeFingerprint)] |
| #[migrate_to_version(MutationV33)] |
| pub enum MutationV32 { |
| ObjectStore(ObjectStoreMutationV32), |
| EncryptedObjectStore(Box<[u8]>), |
| Allocator(AllocatorMutationV32), |
| BeginFlush, |
| EndFlush, |
| DeleteVolume, |
| UpdateBorrowed(u64), |
| UpdateMutationsKey(UpdateMutationsKeyV32), |
| } |
| |
| impl Mutation { |
| pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self { |
| Mutation::ObjectStore(ObjectStoreMutation { |
| item: Item::new(key, value), |
| op: Operation::Insert, |
| }) |
| } |
| |
| pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self { |
| Mutation::ObjectStore(ObjectStoreMutation { |
| item: Item::new(key, value), |
| op: Operation::ReplaceOrInsert, |
| }) |
| } |
| |
| pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self { |
| Mutation::ObjectStore(ObjectStoreMutation { |
| item: Item::new(key, value), |
| op: Operation::Merge, |
| }) |
| } |
| |
| pub fn update_mutations_key(key: WrappedKey) -> Self { |
| Mutation::UpdateMutationsKey(key.into()) |
| } |
| } |
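| |
| // Illustrative sketch of staging one of these mutations in a transaction (`key` and `value` |
| // are hypothetical, built via the `ObjectKey`/`ObjectValue` APIs elsewhere in this crate): |
| // |
| //     transaction.add(store_object_id, Mutation::merge_object(key, value)); |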
| |
| // We have custom comparison functions for mutations that just use the key, rather than the key |
| // and value that would be used by default, so that we can deduplicate and find mutations (see |
| // `get_object_mutation` below). |
| pub type ObjectStoreMutation = ObjectStoreMutationV38; |
| |
| #[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)] |
| #[cfg_attr(fuzz, derive(arbitrary::Arbitrary))] |
| pub struct ObjectStoreMutationV38 { |
| pub item: ObjectItemV38, |
| pub op: OperationV32, |
| } |
| |
| #[derive(Debug, Deserialize, Migrate, Serialize, TypeFingerprint)] |
| #[migrate_nodefault] |
| pub struct ObjectStoreMutationV37 { |
| item: ObjectItemV37, |
| op: OperationV32, |
| } |
| |
| #[derive(Debug, Deserialize, Migrate, Serialize, TypeFingerprint)] |
| #[migrate_nodefault] |
| #[migrate_to_version(ObjectStoreMutationV37)] |
| pub struct ObjectStoreMutationV33 { |
| item: ObjectItemV33, |
| op: OperationV32, |
| } |
| |
| #[derive(Debug, Deserialize, Migrate, Serialize, TypeFingerprint)] |
| #[migrate_nodefault] |
| #[migrate_to_version(ObjectStoreMutationV33)] |
| pub struct ObjectStoreMutationV32 { |
| item: ObjectItemV32, |
| op: OperationV32, |
| } |
| |
| /// The different LSM tree operations that can be performed as part of a mutation. |
| pub type Operation = OperationV32; |
| |
| #[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)] |
| #[cfg_attr(fuzz, derive(arbitrary::Arbitrary))] |
| pub enum OperationV32 { |
| Insert, |
| ReplaceOrInsert, |
| Merge, |
| } |
| |
| impl Ord for ObjectStoreMutation { |
| fn cmp(&self, other: &Self) -> Ordering { |
| self.item.key.cmp(&other.item.key) |
| } |
| } |
| |
| impl PartialOrd for ObjectStoreMutation { |
| fn partial_cmp(&self, other: &Self) -> Option<Ordering> { |
| Some(self.cmp(other)) |
| } |
| } |
| |
| impl PartialEq for ObjectStoreMutation { |
| fn eq(&self, other: &Self) -> bool { |
| self.item.key.eq(&other.item.key) |
| } |
| } |
| |
| impl Eq for ObjectStoreMutation {} |
| |
| impl Ord for AllocatorItem { |
| fn cmp(&self, other: &Self) -> Ordering { |
| self.key.cmp(&other.key) |
| } |
| } |
| |
| impl PartialOrd for AllocatorItem { |
| fn partial_cmp(&self, other: &Self) -> Option<Ordering> { |
| Some(self.cmp(other)) |
| } |
| } |
| |
| /// Same as std::ops::Range but with Ord and PartialOrd support, sorted first by start of the range, |
| /// then by the end. |
| pub type DeviceRange = DeviceRangeV32; |
| |
| #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)] |
| #[cfg_attr(fuzz, derive(arbitrary::Arbitrary))] |
| pub struct DeviceRangeV32(pub Range<u64>); |
| |
| impl Deref for DeviceRange { |
| type Target = Range<u64>; |
| |
| fn deref(&self) -> &Self::Target { |
| &self.0 |
| } |
| } |
| |
| impl DerefMut for DeviceRange { |
| fn deref_mut(&mut self) -> &mut Self::Target { |
| &mut self.0 |
| } |
| } |
| |
| impl From<Range<u64>> for DeviceRange { |
| fn from(range: Range<u64>) -> Self { |
| Self(range) |
| } |
| } |
| |
| impl From<DeviceRange> for Range<u64> { |
| fn from(range: DeviceRange) -> Self { |
| range.0 |
| } |
| } |
| |
| impl Ord for DeviceRange { |
| fn cmp(&self, other: &Self) -> Ordering { |
| self.start.cmp(&other.start).then(self.end.cmp(&other.end)) |
| } |
| } |
| |
| impl PartialOrd for DeviceRange { |
| fn partial_cmp(&self, other: &Self) -> Option<Ordering> { |
| Some(self.cmp(other)) |
| } |
| } |
| |
| pub type AllocatorMutation = AllocatorMutationV32; |
| |
| #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)] |
| #[cfg_attr(fuzz, derive(arbitrary::Arbitrary))] |
| pub enum AllocatorMutationV32 { |
| Allocate { |
| device_range: DeviceRangeV32, |
| owner_object_id: u64, |
| }, |
| Deallocate { |
| device_range: DeviceRangeV32, |
| owner_object_id: u64, |
| }, |
| SetLimit { |
| owner_object_id: u64, |
| bytes: u64, |
| }, |
| /// Marks all extents with a given owner_object_id for deletion. |
| /// Used to free space allocated to an encrypted ObjectStore where we may not have the key. |
| /// Note that the actual deletion time is undefined, so this should never be used while an |
| /// ObjectStore is still in use, due to a high risk of corruption. For the same reason, |
| /// owner_object_id should never be reused. |
| MarkForDeletion(u64), |
| } |
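| |
| // Illustrative sketch (hypothetical values) of recording an allocation against an owning |
| // store; `DeviceRange` implements `From<Range<u64>>`, hence the `.into()`: |
| // |
| //     Mutation::Allocator(AllocatorMutation::Allocate { |
| //         device_range: (0..4096).into(), |
| //         owner_object_id: store_object_id, |
| //     }) |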
| |
| pub type UpdateMutationsKey = UpdateMutationsKeyV32; |
| |
| #[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)] |
| pub struct UpdateMutationsKeyV32(pub WrappedKeyV32); |
| |
| impl From<UpdateMutationsKey> for WrappedKey { |
| fn from(outer: UpdateMutationsKey) -> Self { |
| outer.0 |
| } |
| } |
| |
| impl From<WrappedKey> for UpdateMutationsKey { |
| fn from(inner: WrappedKey) -> Self { |
| Self(inner) |
| } |
| } |
| |
| #[cfg(fuzz)] |
| impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey { |
| fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> { |
| <u64>::arbitrary(u).map(|wrapping_key_id| { |
| UpdateMutationsKey::from(WrappedKey { |
| wrapping_key_id, |
| // There doesn't seem to be much point in randomly generating crypto keys. |
| key: fxfs_crypto::WrappedKeyBytes::default(), |
| }) |
| }) |
| } |
| } |
| |
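| // Ordering and equality for `UpdateMutationsKey` are defined on the address of the value |
| // itself rather than its contents, so each instance is treated as distinct within a |
| // transaction's mutation set. |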
| impl Ord for UpdateMutationsKey { |
| fn cmp(&self, other: &Self) -> Ordering { |
| (self as *const UpdateMutationsKey).cmp(&(other as *const _)) |
| } |
| } |
| |
| impl PartialOrd for UpdateMutationsKey { |
| fn partial_cmp(&self, other: &Self) -> Option<Ordering> { |
| Some(self.cmp(other)) |
| } |
| } |
| |
| impl Eq for UpdateMutationsKey {} |
| |
| impl PartialEq for UpdateMutationsKey { |
| fn eq(&self, other: &Self) -> bool { |
| std::ptr::eq(self, other) |
| } |
| } |
| |
| /// When creating a transaction, locks typically need to be held to prevent two or more writers |
| /// trying to make conflicting mutations at the same time. LockKeys are used for this. |
| #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)] |
| pub enum LockKey { |
| /// Used to lock changes to a particular object attribute (e.g. writes). |
| ObjectAttribute { |
| store_object_id: u64, |
| object_id: u64, |
| attribute_id: u64, |
| }, |
| |
| /// Used to lock changes to a particular object (e.g. adding a child to a directory). |
| Object { |
| store_object_id: u64, |
| object_id: u64, |
| }, |
| |
| /// Locks the entire filesystem. |
| Filesystem, |
| |
| ProjectId { |
| store_object_id: u64, |
| project_id: u64, |
| }, |
| |
| /// Used to lock flushing an object. |
| Flush { |
| object_id: u64, |
| }, |
| |
| /// Used to lock any truncate operations for a file. |
| Truncate { |
| store_object_id: u64, |
| object_id: u64, |
| }, |
| } |
| |
| impl LockKey { |
| pub const fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self { |
| LockKey::ObjectAttribute { store_object_id, object_id, attribute_id } |
| } |
| |
| pub const fn object(store_object_id: u64, object_id: u64) -> Self { |
| LockKey::Object { store_object_id, object_id } |
| } |
| |
| pub const fn flush(object_id: u64) -> Self { |
| LockKey::Flush { object_id } |
| } |
| |
| pub const fn truncate(store_object_id: u64, object_id: u64) -> Self { |
| LockKey::Truncate { store_object_id, object_id } |
| } |
| } |
| |
| /// A container for holding `LockKey` objects. Can store a single `LockKey` inline. |
| #[derive(Clone, Debug)] |
| pub enum LockKeys { |
| None, |
| Inline(LockKey), |
| Vec(Vec<LockKey>), |
| } |
| |
| impl LockKeys { |
| pub fn with_capacity(capacity: usize) -> Self { |
| if capacity > 1 { |
| LockKeys::Vec(Vec::with_capacity(capacity)) |
| } else { |
| LockKeys::None |
| } |
| } |
| |
| pub fn push(&mut self, key: LockKey) { |
| match self { |
| Self::None => *self = LockKeys::Inline(key), |
| Self::Inline(inline) => { |
| *self = LockKeys::Vec(vec![*inline, key]); |
| } |
| Self::Vec(vec) => vec.push(key), |
| } |
| } |
| |
| pub fn truncate(&mut self, len: usize) { |
| match self { |
| Self::None => {} |
| Self::Inline(_) => { |
| if len == 0 { |
| *self = Self::None; |
| } |
| } |
| Self::Vec(vec) => vec.truncate(len), |
| } |
| } |
| |
| fn len(&self) -> usize { |
| match self { |
| Self::None => 0, |
| Self::Inline(_) => 1, |
| Self::Vec(vec) => vec.len(), |
| } |
| } |
| |
| fn contains(&self, key: &LockKey) -> bool { |
| match self { |
| Self::None => false, |
| Self::Inline(single) => single == key, |
| Self::Vec(vec) => vec.contains(key), |
| } |
| } |
| |
| fn sort_unstable(&mut self) { |
| match self { |
| Self::Vec(vec) => vec.sort_unstable(), |
| _ => {} |
| } |
| } |
| |
| fn dedup(&mut self) { |
| match self { |
| Self::Vec(vec) => vec.dedup(), |
| _ => {} |
| } |
| } |
| |
| fn iter(&self) -> LockKeysIter<'_> { |
| match self { |
| LockKeys::None => LockKeysIter::None, |
| LockKeys::Inline(key) => LockKeysIter::Inline(key), |
| LockKeys::Vec(keys) => LockKeysIter::Vec(keys.into_iter()), |
| } |
| } |
| } |
| |
| enum LockKeysIter<'a> { |
| None, |
| Inline(&'a LockKey), |
| Vec(<&'a Vec<LockKey> as IntoIterator>::IntoIter), |
| } |
| |
| impl<'a> Iterator for LockKeysIter<'a> { |
| type Item = &'a LockKey; |
| fn next(&mut self) -> Option<Self::Item> { |
| match self { |
| Self::None => None, |
| Self::Inline(inline) => { |
| let next = *inline; |
| *self = Self::None; |
| Some(next) |
| } |
| Self::Vec(vec) => vec.next(), |
| } |
| } |
| } |
| |
| impl Default for LockKeys { |
| fn default() -> Self { |
| LockKeys::None |
| } |
| } |
| |
| #[macro_export] |
| macro_rules! lock_keys { |
| () => { |
| $crate::object_store::transaction::LockKeys::None |
| }; |
| ($lock_key:expr $(,)?) => { |
| $crate::object_store::transaction::LockKeys::Inline($lock_key) |
| }; |
| ($($lock_keys:expr),+ $(,)?) => { |
| $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+]) |
| }; |
| } |
| pub use lock_keys; |
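| |
| // Illustrative expansions, one per macro arm: |
| // |
| //     lock_keys![]        // => LockKeys::None |
| //     lock_keys![a]       // => LockKeys::Inline(a) |
| //     lock_keys![a, b]    // => LockKeys::Vec(vec![a, b]) |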
| |
| /// Mutations in a transaction can be associated with an object so that when mutations are applied, |
| /// updates can be applied to in-memory structures. For example, we cache object sizes, so when a |
| /// size change is applied, we can update the cached object size. |
| pub trait AssociatedObject: Send + Sync { |
| fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) { |
| } |
| } |
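| |
| // A minimal sketch of an implementor (hypothetical type; real implementations live alongside |
| // the objects whose in-memory state they keep up to date): |
| // |
| //     struct CachedSize(std::sync::atomic::AtomicU64); |
| // |
| //     impl AssociatedObject for CachedSize { |
| //         fn will_apply_mutation(&self, mutation: &Mutation, _id: u64, _mgr: &ObjectManager) { |
| //             // Inspect `mutation` and, if it carries a new size, update the cached value. |
| //         } |
| //     } |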
| |
| pub enum AssocObj<'a> { |
| None, |
| Borrowed(&'a (dyn AssociatedObject)), |
| Owned(Box<dyn AssociatedObject>), |
| } |
| |
| impl AssocObj<'_> { |
| pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> { |
| match self { |
| AssocObj::None => None, |
| AssocObj::Borrowed(ref b) => Some(f(*b)), |
| AssocObj::Owned(ref o) => Some(f(o.as_ref())), |
| } |
| } |
| } |
| |
| pub struct TxnMutation<'a> { |
| // This, at the time of writing, is either the object ID of an object store, or the object ID |
| // of the allocator. In the case of an object mutation, there's another object ID in the |
| // mutation record for the object actually being changed. |
| pub object_id: u64, |
| |
| // The actual mutation. This gets serialized to the journal. |
| pub mutation: Mutation, |
| |
| // An optional associated object for the mutation. During replay, there will always be no |
| // associated object. |
| pub associated_object: AssocObj<'a>, |
| } |
| |
| // We store TxnMutation in a set, and for that we compare only `object_id` and `mutation`, not |
| // the associated object. |
| impl Ord for TxnMutation<'_> { |
| fn cmp(&self, other: &Self) -> Ordering { |
| self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation)) |
| } |
| } |
| |
| impl PartialOrd for TxnMutation<'_> { |
| fn partial_cmp(&self, other: &Self) -> Option<Ordering> { |
| Some(self.cmp(other)) |
| } |
| } |
| |
| impl PartialEq for TxnMutation<'_> { |
| fn eq(&self, other: &Self) -> bool { |
| self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation) |
| } |
| } |
| |
| impl Eq for TxnMutation<'_> {} |
| |
| impl std::fmt::Debug for TxnMutation<'_> { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| f.debug_struct("TxnMutation") |
| .field("object_id", &self.object_id) |
| .field("mutation", &self.mutation) |
| .finish() |
| } |
| } |
| |
| pub enum MetadataReservation { |
| // The state after a transaction has been dropped. |
| None, |
| |
| // Metadata space for this transaction is being borrowed from ObjectManager's metadata |
| // reservation. |
| Borrowed, |
| |
| // A metadata reservation was made when the transaction was created. |
| Reservation(Reservation), |
| |
| // The metadata space is being _held_ within `allocator_reservation`. |
| Hold(u64), |
| } |
| |
| /// A transaction groups mutation records to be committed as a group. |
| pub struct Transaction<'a> { |
| txn_guard: TxnGuard<'a>, |
| |
| // The mutations that make up this transaction. |
| mutations: BTreeSet<TxnMutation<'a>>, |
| |
| // The locks that this transaction currently holds. |
| txn_locks: LockKeys, |
| |
| /// If set, an allocator reservation that should be used for allocations. |
| pub allocator_reservation: Option<&'a Reservation>, |
| |
| /// The reservation for the metadata for this transaction. |
| pub metadata_reservation: MetadataReservation, |
| |
| // Keep track of objects explicitly created by this transaction. No locks are required for them. |
| // Addressed by (owner_object_id, object_id). |
| new_objects: BTreeSet<(u64, u64)>, |
| |
| /// Any data checksums which should be evaluated when replaying this transaction. |
| checksums: Vec<(Range<u64>, Vec<Checksum>)>, |
| } |
| |
| impl<'a> Transaction<'a> { |
| /// Creates a new transaction. `txn_locks` are transaction locks (see `LockManager`) that block |
| /// other writers but not readers, and that are upgraded to write locks at commit time. |
| pub async fn new( |
| txn_guard: TxnGuard<'a>, |
| options: Options<'a>, |
| txn_locks: LockKeys, |
| ) -> Result<Transaction<'a>, Error> { |
| txn_guard.fs.add_transaction(options.skip_journal_checks).await; |
| let fs = txn_guard.fs.clone(); |
| let guard = scopeguard::guard((), |_| fs.sub_transaction()); |
| let (metadata_reservation, allocator_reservation, hold) = |
| txn_guard.fs.reservation_for_transaction(options).await?; |
| |
| let txn_locks = { |
| let lock_manager = txn_guard.fs.lock_manager(); |
| let mut write_guard = lock_manager.txn_lock(txn_locks).await; |
| std::mem::take(&mut write_guard.0.lock_keys) |
| }; |
| let mut transaction = Transaction { |
| txn_guard, |
| mutations: BTreeSet::new(), |
| txn_locks, |
| allocator_reservation: None, |
| metadata_reservation, |
| new_objects: BTreeSet::new(), |
| checksums: Vec::new(), |
| }; |
| |
| ScopeGuard::into_inner(guard); |
| hold.map(|h| h.forget()); // Transaction takes ownership from here on. |
| transaction.allocator_reservation = allocator_reservation; |
| Ok(transaction) |
| } |
| |
| pub fn txn_guard(&self) -> &TxnGuard<'_> { |
| &self.txn_guard |
| } |
| |
| pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> { |
| &self.mutations |
| } |
| |
| pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> { |
| self.new_objects.clear(); |
| mem::take(&mut self.mutations) |
| } |
| |
| /// Adds a mutation to this transaction. If the mutation already exists, it is replaced and the |
| /// old mutation is returned. |
| pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> { |
| self.add_with_object(object_id, mutation, AssocObj::None) |
| } |
| |
| /// Removes a mutation that matches `mutation`. |
| pub fn remove(&mut self, object_id: u64, mutation: Mutation) { |
| let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None }; |
| if self.mutations.remove(&txn_mutation) { |
| if let Mutation::ObjectStore(ObjectStoreMutation { |
| item: |
| ObjectItem { |
| key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object }, |
| .. |
| }, |
| op: Operation::Insert, |
| }) = txn_mutation.mutation |
| { |
| self.new_objects.remove(&(object_id, new_object_id)); |
| } |
| } |
| } |
| |
| /// Adds a mutation with an associated object. If the mutation already exists, it is replaced |
| /// and the old mutation is returned. |
| pub fn add_with_object( |
| &mut self, |
| object_id: u64, |
| mutation: Mutation, |
| associated_object: AssocObj<'a>, |
| ) -> Option<Mutation> { |
| assert!(object_id != INVALID_OBJECT_ID); |
| let txn_mutation = TxnMutation { object_id, mutation, associated_object }; |
| self.verify_locks(&txn_mutation); |
| self.mutations.replace(txn_mutation).map(|m| m.mutation) |
| } |
| |
| pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>) { |
| self.checksums.push((range, checksums)); |
| } |
| |
| pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>)> { |
| std::mem::replace(&mut self.checksums, Vec::new()) |
| } |
| |
| fn verify_locks(&mut self, mutation: &TxnMutation<'_>) { |
| // We considered changing the locks from Vec to BTreeSet, since we now search through them, but |
| // given how small these lock sets usually are, it probably isn't worth it. |
| if let TxnMutation { |
| mutation: |
| Mutation::ObjectStore(ObjectStoreMutation { item: ObjectItem { key, .. }, op }), |
| object_id: store_object_id, |
| .. |
| } = mutation |
| { |
| match &key.data { |
| ObjectKeyData::Attribute(..) => { |
| // TODO(https://fxbug.dev/42073914): Check lock requirements. |
| } |
| ObjectKeyData::Child { .. } => { |
| let id = key.object_id; |
| if !self.txn_locks.contains(&LockKey::object(*store_object_id, id)) |
| && !self.new_objects.contains(&(*store_object_id, id)) |
| { |
| debug_assert!( |
| false, |
| "Not holding required lock for object {id} \ |
| in store {store_object_id}" |
| ); |
| error!( |
| "Not holding required lock for object {id} in store \ |
| {store_object_id}" |
| ) |
| } |
| } |
| ObjectKeyData::GraveyardEntry { .. } => { |
| // TODO(https://fxbug.dev/42073911): Check lock requirements. |
| } |
| ObjectKeyData::GraveyardAttributeEntry { .. } => { |
| // TODO(https://fxbug.dev/122974): Check lock requirements. |
| } |
| ObjectKeyData::Keys => { |
| let id = key.object_id; |
| if !self.txn_locks.contains(&LockKey::object(*store_object_id, id)) |
| && !self.new_objects.contains(&(*store_object_id, id)) |
| { |
| debug_assert!( |
| false, |
| "Not holding required lock for object {id} \ |
| in store {store_object_id}" |
| ); |
| error!( |
| "Not holding required lock for object {id} in store \ |
| {store_object_id}" |
| ) |
| } |
| } |
| ObjectKeyData::Object => match op { |
| // Insert implies that the caller expects no existing object with which to race. |
| Operation::Insert => { |
| self.new_objects.insert((*store_object_id, key.object_id)); |
| } |
| Operation::Merge | Operation::ReplaceOrInsert => { |
| let id = key.object_id; |
| if !self.txn_locks.contains(&LockKey::object(*store_object_id, id)) |
| && !self.new_objects.contains(&(*store_object_id, id)) |
| { |
| debug_assert!( |
| false, |
| "Not holding required lock for object {id} \ |
| in store {store_object_id}" |
| ); |
| error!( |
| "Not holding required lock for object {id} in store \ |
| {store_object_id}" |
| ) |
| } |
| } |
| }, |
| ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => { |
| if !self.txn_locks.contains(&LockKey::ProjectId { |
| store_object_id: *store_object_id, |
| project_id: *project_id, |
| }) { |
| debug_assert!( |
| false, |
| "Not holding required lock for project limit id {project_id} \ |
| in store {store_object_id}" |
| ); |
| error!( |
| "Not holding required lock for project limit id {project_id} in \ |
| store {store_object_id}" |
| ) |
| } |
| } |
| ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op { |
| Operation::Insert | Operation::ReplaceOrInsert => { |
| panic!( |
| "Project usage is all handled by merging deltas, no inserts or \ |
| replacements should be used" |
| ); |
| } |
| // Merges are all handled like atomic +/- and serialized by the tree locks. |
| Operation::Merge => {} |
| }, |
| ObjectKeyData::ExtendedAttribute { .. } => { |
| let id = key.object_id; |
| if !self.txn_locks.contains(&LockKey::object(*store_object_id, id)) |
| && !self.new_objects.contains(&(*store_object_id, id)) |
| { |
| debug_assert!( |
| false, |
| "Not holding required lock for object {id} \ |
| in store {store_object_id} while mutating extended attribute" |
| ); |
| error!( |
| "Not holding required lock for object {id} in store \ |
| {store_object_id} while mutating extended attribute" |
| ) |
| } |
| } |
| } |
| } |
| } |
| |
| /// Returns true if this transaction has no mutations. |
| pub fn is_empty(&self) -> bool { |
| self.mutations.is_empty() |
| } |
| |
| /// Searches for an existing object mutation within the transaction that has the given key and |
| /// returns it if found. |
| pub fn get_object_mutation( |
| &self, |
| store_object_id: u64, |
| key: ObjectKey, |
| ) -> Option<&ObjectStoreMutation> { |
| if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) = |
| self.mutations.get(&TxnMutation { |
| object_id: store_object_id, |
| mutation: Mutation::insert_object(key, ObjectValue::None), |
| associated_object: AssocObj::None, |
| }) |
| { |
| Some(mutation) |
| } else { |
| None |
| } |
| } |
| |
| /// Commits a transaction. If successful, returns the journal offset of the transaction. |
| pub async fn commit(mut self) -> Result<u64, Error> { |
| debug!(txn = ?&self, "Commit"); |
| self.txn_guard.fs.clone().commit_transaction(&mut self, &mut |_| {}).await |
| } |
| |
| /// Commits and then runs the callback whilst locks are held. The callback accepts a single |
| /// parameter which is the journal offset of the transaction. |
| pub async fn commit_with_callback<R: Send>( |
| mut self, |
| f: impl FnOnce(u64) -> R + Send, |
| ) -> Result<R, Error> { |
| debug!(txn = ?&self, "Commit"); |
| // It's not possible to pass an FnOnce via a trait without boxing it, but we don't want to |
| // do that (for performance reasons), hence the following. |
| let mut f = Some(f); |
| let mut result = None; |
| self.txn_guard |
| .fs |
| .clone() |
| .commit_transaction(&mut self, &mut |offset| { |
| result = Some(f.take().unwrap()(offset)); |
| }) |
| .await?; |
| Ok(result.unwrap()) |
| } |
| |
| /// Commits the transaction, but allows the transaction to be used again. The locks are not |
| /// dropped (but write locks are downgraded back to transaction locks). |
| pub async fn commit_and_continue(&mut self) -> Result<(), Error> { |
| debug!(txn = ?self, "Commit"); |
| self.txn_guard.fs.clone().commit_transaction(self, &mut |_| {}).await?; |
| assert!(self.mutations.is_empty()); |
| self.txn_guard.fs.lock_manager().downgrade_locks(&self.txn_locks); |
| Ok(()) |
| } |
| } |
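| |
| // Illustrative end-to-end sketch, mirroring the tests at the bottom of this file (error |
| // handling elided; `fs`, `store_id` and `object_id` are hypothetical): |
| // |
| //     let mut transaction = fs |
| //         .clone() |
| //         .new_transaction( |
| //             lock_keys![LockKey::object(store_id, object_id)], |
| //             Options::default(), |
| //         ) |
| //         .await?; |
| //     transaction.add(store_id, Mutation::BeginFlush); |
| //     let journal_offset = transaction.commit().await?; |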
| |
| impl Drop for Transaction<'_> { |
| fn drop(&mut self) { |
| // Call the filesystem implementation of drop_transaction which should, as a minimum, call |
| // LockManager's drop_transaction to ensure the locks are released. |
| debug!(txn = ?&self, "Drop"); |
| self.txn_guard.fs.clone().drop_transaction(self); |
| } |
| } |
| |
| impl std::fmt::Debug for Transaction<'_> { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| f.debug_struct("Transaction") |
| .field("mutations", &self.mutations) |
| .field("txn_locks", &self.txn_locks) |
| .field("reservation", &self.allocator_reservation) |
| .finish() |
| } |
| } |
| |
| pub enum BorrowedOrOwned<'a, T> { |
| Borrowed(&'a T), |
| Owned(T), |
| } |
| |
| impl<T> Deref for BorrowedOrOwned<'_, T> { |
| type Target = T; |
| |
| fn deref(&self) -> &Self::Target { |
| match self { |
| BorrowedOrOwned::Borrowed(b) => b, |
| BorrowedOrOwned::Owned(o) => o, |
| } |
| } |
| } |
| |
| impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> { |
| fn from(value: &'a T) -> Self { |
| BorrowedOrOwned::Borrowed(value) |
| } |
| } |
| |
| impl<T> From<T> for BorrowedOrOwned<'_, T> { |
| fn from(value: T) -> Self { |
| BorrowedOrOwned::Owned(value) |
| } |
| } |
| |
| /// LockManager holds the locks that transactions might have taken. A TransactionManager |
| /// implementation would typically have one of these. Three different kinds of locks are supported. |
| /// There are read locks and write locks, which are as one would expect. The third kind of lock is |
| /// a _transaction_ lock. When first acquired, these block other writes but do not block reads. |
| /// When it is time to commit a transaction, these locks are upgraded to full write locks and then |
| /// dropped after committing (unless commit_and_continue is used). This way, reads are only blocked |
| /// for the shortest possible time. It follows that write locks should be used sparingly. Locks |
| /// are granted in order with one exception: when a lock is in the initial _transaction_ lock state |
| /// (LockState::Locked), all read locks are allowed even if there are other tasks waiting for the |
| /// lock. This is because we allow read locks to be taken by tasks that have already taken a |
| /// _transaction_ lock (i.e. recursion is allowed). In other cases, such as when a writer is |
| /// waiting and there are only readers, readers will queue up behind the writer. |
| pub struct LockManager { |
| locks: Mutex<Locks>, |
| } |
| |
| struct Locks { |
| keys: HashMap<LockKey, LockEntry>, |
| } |
| |
| impl Locks { |
| fn drop_lock(&mut self, key: LockKey, state: LockState) { |
| if let Entry::Occupied(mut occupied) = self.keys.entry(key) { |
| let entry = occupied.get_mut(); |
| let wake = match state { |
| LockState::ReadLock => { |
| entry.read_count -= 1; |
| entry.read_count == 0 |
| } |
| // drop_write_locks currently depends on us treating Locked and WriteLock the same. |
| LockState::Locked | LockState::WriteLock => { |
| entry.state = LockState::ReadLock; |
| true |
| } |
| }; |
| if wake { |
| // SAFETY: The lock in `LockManager::locks` is held. |
| unsafe { |
| entry.wake(); |
| } |
| if entry.can_remove() { |
| occupied.remove_entry(); |
| } |
| } |
| } else { |
| unreachable!(); |
| } |
| } |
| |
| fn drop_read_locks(&mut self, lock_keys: LockKeys) { |
| for lock in lock_keys.iter() { |
| self.drop_lock(*lock, LockState::ReadLock); |
| } |
| } |
| |
| fn drop_write_locks(&mut self, lock_keys: LockKeys) { |
| for lock in lock_keys.iter() { |
| // This is a bit hacky, but it works for locks in either the Locked or WriteLock |
| // states. |
| self.drop_lock(*lock, LockState::WriteLock); |
| } |
| } |
| |
| // Downgrades locks from WriteLock to Locked. |
| fn downgrade_locks(&mut self, lock_keys: &LockKeys) { |
| for lock in lock_keys.iter() { |
| // SAFETY: The lock in `LockManager::locks` is held. |
| unsafe { |
| self.keys.get_mut(lock).unwrap().downgrade_lock(); |
| } |
| } |
| } |
| } |
| |
| #[derive(Debug)] |
| struct LockEntry { |
| // In the states that allow readers (ReadLock, Locked), this count can be non-zero |
| // to indicate the number of active readers. |
| read_count: u64, |
| |
| // The state of the lock (see below). |
| state: LockState, |
| |
| // A doubly-linked list of wakers that should be woken when they have been granted the lock. |
| // New wakers are usually chained onto the tail; the exception is when a lock in the Locked |
| // state is to be upgraded to WriteLock but can't be because there are readers, in which case |
| // the waker goes at the head. It might be possible to use intrusive-collections in the future. |
| head: *const LockWaker, |
| tail: *const LockWaker, |
| } |
| |
| unsafe impl Send for LockEntry {} |
| |
| // Represents a node in the waker list. It is only safe to access the members wrapped by UnsafeCell |
| // when LockManager's `locks` member is locked. |
| struct LockWaker { |
| // The next and previous pointers in the doubly-linked list. |
| next: UnsafeCell<*const LockWaker>, |
| prev: UnsafeCell<*const LockWaker>, |
| |
| // Holds the lock key for this waker. This is required so that we can find the associated |
| // `LockEntry`. |
| key: LockKey, |
| |
| // The underlying waker that should be used to wake the task. |
| waker: UnsafeCell<WakerState>, |
| |
| // The target state for this waker. |
| target_state: LockState, |
| |
| // True if this is an upgrade. |
| is_upgrade: bool, |
| |
| // We need to be pinned because these form part of the linked list. |
| _pin: PhantomPinned, |
| } |
| |
| enum WakerState { |
| // This is the initial state before the waker has been first polled. |
| Pending, |
| |
| // Once polled, this contains the actual waker. |
| Registered(Waker), |
| |
| // The waker has been woken and has been granted the lock. |
| Woken, |
| } |
| |
| impl WakerState { |
| fn is_woken(&self) -> bool { |
| matches!(self, WakerState::Woken) |
| } |
| } |
| |
| unsafe impl Send for LockWaker {} |
| unsafe impl Sync for LockWaker {} |
| |
| impl LockWaker { |
| // Waits for the waker to be woken. |
| async fn wait(&self, manager: &LockManager) { |
| // We must guard against the future being dropped. |
| let waker_guard = scopeguard::guard((), |_| { |
| let mut locks = manager.locks.lock().unwrap(); |
| // SAFETY: We've acquired the lock. |
| unsafe { |
| if (*self.waker.get()).is_woken() { |
| // We were woken, but didn't actually run, so we must drop the lock. |
| if self.is_upgrade { |
| locks.keys.get_mut(&self.key).unwrap().downgrade_lock(); |
| } else { |
| locks.drop_lock(self.key, self.target_state); |
| } |
| } else { |
| // We haven't been woken but we've been dropped, so we must remove ourselves |
| // from the waker list. |
| locks.keys.get_mut(&self.key).unwrap().remove_waker(self); |
| } |
| } |
| }); |
| |
| poll_fn(|cx| { |
| let _locks = manager.locks.lock().unwrap(); |
| // SAFETY: We've acquired the lock. |
| unsafe { |
| if (*self.waker.get()).is_woken() { |
| Poll::Ready(()) |
| } else { |
| *self.waker.get() = WakerState::Registered(cx.waker().clone()); |
| Poll::Pending |
| } |
| } |
| }) |
| .await; |
| |
| ScopeGuard::into_inner(waker_guard); |
| } |
| } |
| |
| #[derive(Copy, Clone, Debug, PartialEq)] |
| enum LockState { |
| // In this state, there are only readers. |
| ReadLock, |
| |
| // This state is used for transactions to lock other writers, but it still allows readers. |
| Locked, |
| |
| // A writer has exclusive access; all other readers and writers are blocked. |
| WriteLock, |
| } |
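| |
| // Summary of the compatibility rules encoded in `LockEntry::is_allowed` below (rows are the |
| // current state, columns the requested state); see that function for the precise conditions |
| // around queued waiters: |
| // |
| //     current \ requested | ReadLock | Locked | WriteLock |
| //     --------------------+----------+--------+------------------------ |
| //     ReadLock            | yes*     | yes*   | only if no readers* |
| //     Locked              | yes**    | no     | no (upgrade via commit) |
| //     WriteLock           | no       | no     | no |
| // |
| //     *  only when nothing else is already queued ahead. |
| //     ** unless a queued upgrade is waiting at the head of the list. |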
| |
| impl LockManager { |
| pub fn new() -> Self { |
| LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) } |
| } |
| |
| /// Acquires the locks. It is the caller's responsibility to ensure that drop_transaction is |
| /// called when a transaction is dropped i.e. the filesystem's drop_transaction method should |
| /// call LockManager's drop_transaction method. |
| pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> { |
| TransactionLocks( |
| debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(), |
| ) |
| } |
| |
| // `state` indicates the kind of lock required. ReadLock means acquire a read lock. Locked |
| // means lock other writers, but still allow readers. WriteLock means acquire a write lock. |
| async fn lock<'a>( |
| &'a self, |
| mut lock_keys: LockKeys, |
| target_state: LockState, |
| ) -> Either<ReadGuard<'a>, WriteGuard<'a>> { |
| let mut guard = match &target_state { |
| LockState::ReadLock => Left(ReadGuard { |
| manager: self, |
| lock_keys: LockKeys::with_capacity(lock_keys.len()), |
| }), |
| LockState::Locked | LockState::WriteLock => Right(WriteGuard { |
| manager: self, |
| lock_keys: LockKeys::with_capacity(lock_keys.len()), |
| }), |
| }; |
| let guard_keys = match &mut guard { |
| Left(g) => &mut g.lock_keys, |
| Right(g) => &mut g.lock_keys, |
| }; |
| lock_keys.sort_unstable(); |
| lock_keys.dedup(); |
| for lock in lock_keys.iter() { |
| let lock_waker = None; |
| pin_mut!(lock_waker); |
| { |
| let mut locks = self.locks.lock().unwrap(); |
| match locks.keys.entry(*lock) { |
| Entry::Vacant(vacant) => { |
| guard_keys.push(*lock); |
| vacant.insert(LockEntry { |
| read_count: if let LockState::ReadLock = target_state { 1 } else { 0 }, |
| state: target_state, |
| head: std::ptr::null(), |
| tail: std::ptr::null(), |
| }); |
| } |
| Entry::Occupied(mut occupied) => { |
| let entry = occupied.get_mut(); |
| // SAFETY: We've acquired the lock. |
| if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } { |
| if let LockState::ReadLock = target_state { |
| entry.read_count += 1; |
| guard_keys.push(*lock); |
| } else { |
| entry.state = target_state; |
| guard_keys.push(*lock); |
| } |
| } else { |
| // Initialise a waker and push it on the tail of the list. |
| // SAFETY: `lock_waker` isn't used prior to this point. |
| unsafe { |
| *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker { |
| next: UnsafeCell::new(std::ptr::null()), |
| prev: UnsafeCell::new(entry.tail), |
| key: *lock, |
| waker: UnsafeCell::new(WakerState::Pending), |
| target_state, |
| is_upgrade: false, |
| _pin: PhantomPinned, |
| }); |
| } |
| let waker = (*lock_waker).as_ref().unwrap(); |
| if entry.tail.is_null() { |
| entry.head = waker; |
| } else { |
| // SAFETY: We've acquired the lock. |
| unsafe { |
| *(*entry.tail).next.get() = waker; |
| } |
| } |
| entry.tail = waker; |
| } |
| } |
| } |
| } |
| if let Some(waker) = &*lock_waker { |
| waker.wait(self).await; |
| guard_keys.push(*lock); |
| } |
| } |
| guard |
| } |
| |
| /// This should be called by the filesystem's drop_transaction implementation. |
| pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) { |
| let mut locks = self.locks.lock().unwrap(); |
| locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks)); |
| } |
| |
| /// Prepares to commit by waiting for readers to finish. |
| pub async fn commit_prepare(&self, transaction: &Transaction<'_>) { |
| self.commit_prepare_keys(&transaction.txn_locks).await; |
| } |
| |
| async fn commit_prepare_keys(&self, lock_keys: &LockKeys) { |
| for lock in lock_keys.iter() { |
| let lock_waker = None; |
| pin_mut!(lock_waker); |
| { |
| let mut locks = self.locks.lock().unwrap(); |
| let entry = locks.keys.get_mut(lock).unwrap(); |
| assert_eq!(entry.state, LockState::Locked); |
| |
| if entry.read_count == 0 { |
| entry.state = LockState::WriteLock; |
| } else { |
| // Initialise a waker and push it on the head of the list. |
| // SAFETY: `lock_waker` isn't used prior to this point. |
| unsafe { |
| *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker { |
| next: UnsafeCell::new(entry.head), |
| prev: UnsafeCell::new(std::ptr::null()), |
| key: *lock, |
| waker: UnsafeCell::new(WakerState::Pending), |
| target_state: LockState::WriteLock, |
| is_upgrade: true, |
| _pin: PhantomPinned, |
| }); |
| } |
| let waker = (*lock_waker).as_ref().unwrap(); |
| if entry.head.is_null() { |
| entry.tail = waker; |
| } else { |
| // SAFETY: We've acquired the lock. |
| unsafe { |
| *(*entry.head).prev.get() = waker; |
| } |
| } |
| entry.head = waker; |
| } |
| } |
| |
| if let Some(waker) = &*lock_waker { |
| waker.wait(self).await; |
| } |
| } |
| } |
| |
| /// Acquires a read lock for the given keys. Read locks are only blocked whilst a transaction |
| /// is being committed for the same locks. They are only necessary where consistency is |
| /// required between different mutations within a transaction. For example, a write might |
| /// change the size and extents for an object, in which case a read lock is required so that |
| /// observed size and extents are seen together or not at all. |
| pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> { |
| debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap() |
| } |
| |
| /// Acquires a write lock for the given keys. Write locks provide exclusive access to the |
| /// requested lock keys. |
| pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> { |
| debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap() |
| } |
| |
| /// Downgrades locks from the WriteLock state to Locked state. This will panic if the locks are |
| /// not in the WriteLock state. |
| pub fn downgrade_locks(&self, lock_keys: &LockKeys) { |
| self.locks.lock().unwrap().downgrade_locks(lock_keys); |
| } |
| } |
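| |
| // Illustrative sketch of the three lock kinds, each taken independently (holding more than one |
| // kind on the same key from a single task would deadlock): |
| // |
| //     let read = manager.read_lock(lock_keys![key]).await;   // blocks only committing writers |
| //     let txn = manager.txn_lock(lock_keys![key]).await;     // blocks other writers, not readers |
| //     let write = manager.write_lock(lock_keys![key]).await; // exclusive |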
| |
| // These unsafe functions require that `locks` in LockManager is locked. |
| impl LockEntry { |
| unsafe fn wake(&mut self) { |
| // If the lock's state is WriteLock, or there's nothing waiting, return early. |
| if self.head.is_null() || self.state == LockState::WriteLock { |
| return; |
| } |
| |
| let waker = &*self.head; |
| |
| if waker.is_upgrade { |
| if self.read_count > 0 { |
| return; |
| } |
| } else if !self.is_allowed(waker.target_state, true) { |
| return; |
| } |
| |
| self.pop_and_wake(); |
| |
| // If the waker we just woke wanted a write lock, we can't wake up any more; otherwise, we |
| // can keep waking up readers. |
| if waker.target_state == LockState::WriteLock { |
| return; |
| } |
| |
| while !self.head.is_null() && (*self.head).target_state == LockState::ReadLock { |
| self.pop_and_wake(); |
| } |
| } |
| |
| unsafe fn pop_and_wake(&mut self) { |
| let waker = &*self.head; |
| |
| // Pop the waker. |
| self.head = *waker.next.get(); |
| if self.head.is_null() { |
| self.tail = std::ptr::null() |
| } else { |
| *(*self.head).prev.get() = std::ptr::null(); |
| } |
| |
| // Adjust our state accordingly. |
| if waker.target_state == LockState::ReadLock { |
| self.read_count += 1; |
| } else { |
| self.state = waker.target_state; |
| } |
| |
| // Now wake the task. |
| if let WakerState::Registered(waker) = |
| std::mem::replace(&mut *waker.waker.get(), WakerState::Woken) |
| { |
| waker.wake(); |
| } |
| } |
| |
| fn can_remove(&self) -> bool { |
| self.state == LockState::ReadLock && self.read_count == 0 |
| } |
| |
| unsafe fn remove_waker(&mut self, waker: &LockWaker) { |
| let is_first = (*waker.prev.get()).is_null(); |
| if is_first { |
| self.head = *waker.next.get(); |
| } else { |
| *(**waker.prev.get()).next.get() = *waker.next.get(); |
| } |
| if (*waker.next.get()).is_null() { |
| self.tail = *waker.prev.get(); |
| } else { |
| *(**waker.next.get()).prev.get() = *waker.prev.get(); |
| } |
| if is_first { |
| // We must call wake in case we erased a pending write lock and readers can now proceed. |
| self.wake(); |
| } |
| } |
| |
| // Returns whether or not a lock with the given `target_state` can proceed. `is_head` should be |
| // true if the request is at the head of the waker list (or the waker list is empty) and false |
| // if there are other items ahead of it on the waker list. |
| unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool { |
| match self.state { |
| LockState::ReadLock => { |
| // Allow ReadLock and Locked so long as nothing else is waiting. |
| (self.read_count == 0 |
| || target_state == LockState::Locked |
| || target_state == LockState::ReadLock) |
| && is_head |
| } |
| LockState::Locked => { |
| // Always allow reads unless there's an upgrade waiting. We have to |
| // always allow reads in this state because tasks that have locks in |
| // the Locked state can later try and acquire ReadLock. |
| target_state == LockState::ReadLock && (is_head || !(*self.head).is_upgrade) |
| } |
| LockState::WriteLock => false, |
| } |
| } |
| |
| unsafe fn downgrade_lock(&mut self) { |
| assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock); |
| self.wake(); |
| } |
| } |
| |
| #[must_use] |
| pub struct ReadGuard<'a> { |
| manager: &'a LockManager, |
| lock_keys: LockKeys, |
| } |
| |
| impl Drop for ReadGuard<'_> { |
| fn drop(&mut self) { |
| let mut locks = self.manager.locks.lock().unwrap(); |
| locks.drop_read_locks(std::mem::take(&mut self.lock_keys)); |
| } |
| } |
| |
| impl fmt::Debug for ReadGuard<'_> { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.debug_struct("ReadGuard") |
| .field("manager", &(&self.manager as *const _)) |
| .field("lock_keys", &self.lock_keys) |
| .finish() |
| } |
| } |
| |
| #[must_use] |
| pub struct WriteGuard<'a> { |
| manager: &'a LockManager, |
| lock_keys: LockKeys, |
| } |
| |
| impl Drop for WriteGuard<'_> { |
| fn drop(&mut self) { |
| let mut locks = self.manager.locks.lock().unwrap(); |
| locks.drop_write_locks(std::mem::take(&mut self.lock_keys)); |
| } |
| } |
| |
| impl fmt::Debug for WriteGuard<'_> { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.debug_struct("WriteGuard") |
| .field("manager", &(&self.manager as *const _)) |
| .field("lock_keys", &self.lock_keys) |
| .finish() |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| super::{LockKey, LockKeys, LockManager, LockState, Mutation, Options}, |
| crate::filesystem::FxFilesystem, |
| fuchsia_async as fasync, |
| futures::{ |
| channel::oneshot::channel, future::FutureExt, join, pin_mut, stream::FuturesUnordered, |
| StreamExt, |
| }, |
| std::{sync::Mutex, task::Poll, time::Duration}, |
| storage_device::{fake_device::FakeDevice, DeviceHolder}, |
| }; |
| |
| #[fuchsia::test] |
| async fn test_simple() { |
| let device = DeviceHolder::new(FakeDevice::new(4096, 1024)); |
| let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed"); |
| let mut t = fs |
| .clone() |
| .new_transaction(lock_keys![], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| t.add(1, Mutation::BeginFlush); |
| assert!(!t.is_empty()); |
| } |
| |
| #[fuchsia::test] |
| async fn test_locks() { |
| let device = DeviceHolder::new(FakeDevice::new(4096, 1024)); |
| let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed"); |
| let (send1, recv1) = channel(); |
| let (send2, recv2) = channel(); |
| let (send3, recv3) = channel(); |
| let done = Mutex::new(false); |
| let mut futures = FuturesUnordered::new(); |
| futures.push( |
| async { |
| let _t = fs |
| .clone() |
| .new_transaction( |
| lock_keys![LockKey::object_attribute(1, 2, 3)], |
| Options::default(), |
| ) |
| .await |
| .expect("new_transaction failed"); |
| send1.send(()).unwrap(); // Tell the next future to continue. |
| send3.send(()).unwrap(); // Tell the last future to continue. |
| recv2.await.unwrap(); |
| // This is a halting problem, so all we can do is sleep. |
| fasync::Timer::new(Duration::from_millis(100)).await; |
| assert!(!*done.lock().unwrap()); |
| } |
| .boxed(), |
| ); |
| futures.push( |
| async { |
| recv1.await.unwrap(); |
| // This should not block since it is a different key. |
| let _t = fs |
| .clone() |
| .new_transaction( |
| lock_keys![LockKey::object_attribute(2, 2, 3)], |
| Options::default(), |
| ) |
| .await |
| .expect("new_transaction failed"); |
| // Tell the first future to continue. |
| send2.send(()).unwrap(); |
| } |
| .boxed(), |
| ); |
| futures.push( |
| async { |
| // This should block until the first future has completed. |
| recv3.await.unwrap(); |
| let _t = fs |
| .clone() |
| .new_transaction( |
| lock_keys![LockKey::object_attribute(1, 2, 3)], |
| Options::default(), |
| ) |
| .await; |
| *done.lock().unwrap() = true; |
| } |
| .boxed(), |
| ); |
| while let Some(()) = futures.next().await {} |
| } |
| |
| #[fuchsia::test] |
| async fn test_read_lock_after_write_lock() { |
| let device = DeviceHolder::new(FakeDevice::new(4096, 1024)); |
| let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed"); |
| let (send1, recv1) = channel(); |
| let (send2, recv2) = channel(); |
| let done = Mutex::new(false); |
| join!( |
| async { |
| let t = fs |
| .clone() |
| .new_transaction( |
| lock_keys![LockKey::object_attribute(1, 2, 3)], |
| Options::default(), |
| ) |
| .await |
| .expect("new_transaction failed"); |
| send1.send(()).unwrap(); // Tell the next future to continue. |
| recv2.await.unwrap(); |
| t.commit().await.expect("commit failed"); |
| *done.lock().unwrap() = true; |
| }, |
| async { |
| recv1.await.unwrap(); |
| // Reads should not be blocked until the transaction is committed. |
| let _guard = fs |
| .lock_manager() |
| .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)]) |
| .await; |
| // Tell the first future to continue. |
| send2.send(()).unwrap(); |
| // It shouldn't proceed until we release our read lock, but it's a halting |
| // problem, so sleep. |
| fasync::Timer::new(Duration::from_millis(100)).await; |
| assert!(!*done.lock().unwrap()); |
| }, |
| ); |
| } |
| |
| #[fuchsia::test] |
| async fn test_write_lock_after_read_lock() { |
| let device = DeviceHolder::new(FakeDevice::new(4096, 1024)); |
| let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed"); |
| let (send1, recv1) = channel(); |
| let (send2, recv2) = channel(); |
| let done = Mutex::new(false); |
| join!( |
| async { |
| // Reads should not be blocked until the transaction is committed. |
| let _guard = fs |
| .lock_manager() |
| .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)]) |
| .await; |
| // Tell the next future to continue and then wait. |
| send1.send(()).unwrap(); |
| recv2.await.unwrap(); |
| // It shouldn't proceed until we release our read lock, but it's a halting |
| // problem, so sleep. |
| fasync::Timer::new(Duration::from_millis(100)).await; |
| assert!(!*done.lock().unwrap()); |
| }, |
| async { |
| recv1.await.unwrap(); |
| let t = fs |
| .clone() |
| .new_transaction( |
| lock_keys![LockKey::object_attribute(1, 2, 3)], |
| Options::default(), |
| ) |
| .await |
| .expect("new_transaction failed"); |
| send2.send(()).unwrap(); // Tell the first future to continue. |
| t.commit().await.expect("commit failed"); |
| *done.lock().unwrap() = true; |
| }, |
| ); |
| } |
| |
| #[fuchsia::test] |
| async fn test_drop_uncommitted_transaction() { |
| let device = DeviceHolder::new(FakeDevice::new(4096, 1024)); |
| let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed"); |
| let key = lock_keys![LockKey::object(1, 1)]; |
| |
| // Dropping while there's a reader. |
| { |
| let _write_lock = fs |
| .clone() |
| .new_transaction(key.clone(), Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| let _read_lock = fs.lock_manager().read_lock(key.clone()).await; |
| } |
| // Dropping while there's no reader. |
| { |
| let _write_lock = fs |
| .clone() |
| .new_transaction(key.clone(), Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| } |
| // Make sure we can take the lock again (i.e. it was actually released). |
| fs.clone() |
| .new_transaction(key.clone(), Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| } |
| |
| #[fuchsia::test] |
| async fn test_drop_waiting_write_lock() { |
| let manager = LockManager::new(); |
| let keys = lock_keys![LockKey::object(1, 1)]; |
| { |
| let _guard = manager.lock(keys.clone(), LockState::ReadLock).await; |
| if let Poll::Ready(_) = |
| futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed()) |
| { |
| assert!(false); |
| } |
| } |
| let _ = manager.lock(keys, LockState::WriteLock).await; |
| } |
| |
| #[fuchsia::test] |
| async fn test_write_lock_blocks_everything() { |
| let manager = LockManager::new(); |
| let keys = lock_keys![LockKey::object(1, 1)]; |
| { |
| let _guard = manager.lock(keys.clone(), LockState::WriteLock).await; |
| if let Poll::Ready(_) = |
| futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed()) |
| { |
| assert!(false); |
| } |
| if let Poll::Ready(_) = |
| futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed()) |
| { |
| assert!(false); |
| } |
| } |
| { |
| let _guard = manager.lock(keys.clone(), LockState::WriteLock).await; |
| } |
| { |
| let _guard = manager.lock(keys, LockState::ReadLock).await; |
| } |
| } |
| |
| #[fuchsia::test] |
| async fn test_downgrade_locks() { |
| let manager = LockManager::new(); |
| let keys = lock_keys![LockKey::object(1, 1)]; |
| let _guard = manager.txn_lock(keys.clone()).await; |
| manager.commit_prepare_keys(&keys).await; |
| |
| // Use FuturesUnordered so that we can check that the waker is woken. |
| let mut read_lock: FuturesUnordered<_> = |
| std::iter::once(manager.read_lock(keys.clone())).collect(); |
| |
| // Trying to acquire a read lock now should be blocked. |
| assert!(futures::poll!(read_lock.next()).is_pending()); |
| |
| manager.downgrade_locks(&keys); |
| |
| // After downgrading, it should be possible to take a read lock. |
| assert!(futures::poll!(read_lock.next()).is_ready()); |
| } |
| |
| #[fuchsia::test] |
| async fn test_dropped_write_lock_wakes() { |
| let manager = LockManager::new(); |
| let keys = lock_keys![LockKey::object(1, 1)]; |
| let _guard = manager.lock(keys.clone(), LockState::ReadLock).await; |
| let mut read_lock = FuturesUnordered::new(); |
| read_lock.push(manager.lock(keys.clone(), LockState::ReadLock)); |
| |
| { |
| let write_lock = manager.lock(keys, LockState::WriteLock); |
| pin_mut!(write_lock); |
| |
| // The write lock should be blocked because of the read lock. |
| assert!(futures::poll!(write_lock).is_pending()); |
| |
| // Another read lock should be blocked because of the write lock. |
| assert!(futures::poll!(read_lock.next()).is_pending()); |
| } |
| |
| // Dropping the write lock should allow the read lock to proceed. |
| assert!(futures::poll!(read_lock.next()).is_ready()); |
| } |
| |
| #[fuchsia::test] |
| async fn test_drop_upgrade() { |
| let manager = LockManager::new(); |
| let keys = lock_keys![LockKey::object(1, 1)]; |
| let _guard = manager.lock(keys.clone(), LockState::Locked).await; |
| |
| { |
| let commit_prepare = manager.commit_prepare_keys(&keys); |
| pin_mut!(commit_prepare); |
| let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await; |
| assert!(futures::poll!(commit_prepare).is_pending()); |
| |
| // Now we test dropping read_guard which should wake commit_prepare and |
| // then dropping commit_prepare. |
| } |
| |
| // We should be able to still commit_prepare. |
| manager.commit_prepare_keys(&keys).await; |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_woken_upgrade_blocks_reads() { |
| let manager = LockManager::new(); |
| let keys = lock_keys![LockKey::object(1, 1)]; |
| // Start with a transaction lock. |
| let guard = manager.lock(keys.clone(), LockState::Locked).await; |
| |
| // Take a read lock. |
| let read1 = manager.lock(keys.clone(), LockState::ReadLock).await; |
| |
| // Try and upgrade the transaction lock, which should not be possible because of the read. |
| let commit_prepare = manager.commit_prepare_keys(&keys); |
| pin_mut!(commit_prepare); |
| assert!(futures::poll!(commit_prepare.as_mut()).is_pending()); |
| |
| // Taking another read should also be blocked. |
| let read2 = manager.lock(keys.clone(), LockState::ReadLock); |
| pin_mut!(read2); |
| assert!(futures::poll!(read2.as_mut()).is_pending()); |
| |
| // Drop the first read and the upgrade should complete. |
| std::mem::drop(read1); |
| assert!(futures::poll!(commit_prepare).is_ready()); |
| |
| // But the second read should still be blocked. |
| assert!(futures::poll!(read2.as_mut()).is_pending()); |
| |
| // If we drop the write lock now, the read should be unblocked. |
| std::mem::drop(guard); |
| assert!(futures::poll!(read2).is_ready()); |
| } |
| |
| static LOCK_KEY_1: LockKey = LockKey::flush(1); |
| static LOCK_KEY_2: LockKey = LockKey::flush(2); |
| static LOCK_KEY_3: LockKey = LockKey::flush(3); |
| |
| // The keys, storage method, and capacity must all match. |
| fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) { |
| match (value, expected) { |
| (LockKeys::None, LockKeys::None) => {} |
| (LockKeys::Inline(key1), LockKeys::Inline(key2)) => { |
| if key1 != key2 { |
| panic!("{key1:?} != {key2:?}"); |
| } |
| } |
| (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => { |
| if vec1 != vec2 { |
| panic!("{vec1:?} != {vec2:?}"); |
| } |
| if vec1.capacity() != vec2.capacity() { |
| panic!( |
| "LockKeys have different capacity: {} != {}", |
| vec1.capacity(), |
| vec2.capacity() |
| ); |
| } |
| } |
| (_, _) => panic!("{value:?} != {expected:?}"), |
| } |
| } |
| |
| // Only the keys must match. Storage method and capacity don't matter. |
| fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) { |
| let value: Vec<_> = value.iter().collect(); |
| let expected: Vec<_> = expected.iter().collect(); |
| assert_eq!(value, expected); |
| } |
| |
| #[test] |
| fn test_lock_keys_macro() { |
| assert_lock_keys_equal(&lock_keys![], &LockKeys::None); |
| assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1)); |
| assert_lock_keys_equal( |
| &lock_keys![LOCK_KEY_1, LOCK_KEY_2], |
| &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]), |
| ); |
| } |
| |
| #[test] |
| fn test_lock_keys_with_capacity() { |
| assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None); |
| assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None); |
| assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2))); |
| } |
| |
| #[test] |
| fn test_lock_keys_len() { |
| assert_eq!(lock_keys![].len(), 0); |
| assert_eq!(lock_keys![LOCK_KEY_1].len(), 1); |
| assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2); |
| } |
| |
| #[test] |
| fn test_lock_keys_contains() { |
| assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false); |
| assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true); |
| assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false); |
| assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true); |
| assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true); |
| assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false); |
| } |
| |
| #[test] |
| fn test_lock_keys_push() { |
| let mut keys = lock_keys![]; |
| keys.push(LOCK_KEY_1); |
| assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1)); |
| keys.push(LOCK_KEY_2); |
| assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2])); |
| keys.push(LOCK_KEY_3); |
| assert_lock_keys_equivalent( |
| &keys, |
| &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]), |
| ); |
| } |
| |
| #[test] |
| fn test_lock_keys_sort_unstable() { |
| let mut keys = lock_keys![]; |
| keys.sort_unstable(); |
| assert_lock_keys_equal(&keys, &lock_keys![]); |
| |
| let mut keys = lock_keys![LOCK_KEY_1]; |
| keys.sort_unstable(); |
| assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]); |
| |
| let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1]; |
| keys.sort_unstable(); |
| assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]); |
| } |
| |
| #[test] |
| fn test_lock_keys_dedup() { |
| let mut keys = lock_keys![]; |
| keys.dedup(); |
| assert_lock_keys_equal(&keys, &lock_keys![]); |
| |
| let mut keys = lock_keys![LOCK_KEY_1]; |
| keys.dedup(); |
| assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]); |
| |
| let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1]; |
| keys.dedup(); |
| assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]); |
| } |
| |
| #[test] |
| fn test_lock_keys_truncate() { |
| let mut keys = lock_keys![]; |
| keys.truncate(5); |
| assert_lock_keys_equal(&keys, &lock_keys![]); |
| keys.truncate(0); |
| assert_lock_keys_equal(&keys, &lock_keys![]); |
| |
| let mut keys = lock_keys![LOCK_KEY_1]; |
| keys.truncate(5); |
| assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]); |
| keys.truncate(0); |
| assert_lock_keys_equal(&keys, &lock_keys![]); |
| |
| let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2]; |
| keys.truncate(5); |
| assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]); |
| keys.truncate(1); |
| // Although there's only 1 key after the truncate, the key is not stored inline. |
| assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]); |
| } |
| |
| #[test] |
| fn test_lock_keys_iter() { |
| assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new()); |
| |
| assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]); |
| |
| assert_eq!( |
| lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(), |
| vec![&LOCK_KEY_1, &LOCK_KEY_2] |
| ); |
| } |
| } |