// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
crypt::WrappedKey,
debug_assert_not_too_long,
log::*,
lsm_tree::types::Item,
object_handle::INVALID_OBJECT_ID,
object_store::{
allocator::{AllocatorItem, Reservation},
object_manager::{reserved_space_from_journal_usage, ObjectManager},
object_record::{ObjectItem, ObjectKey, ObjectValue},
},
serialized_types::Versioned,
},
anyhow::Error,
async_trait::async_trait,
either::{Either, Left, Right},
futures::future::poll_fn,
serde::{Deserialize, Serialize},
std::{
cmp::Ordering,
collections::{
hash_map::{Entry, HashMap},
BTreeSet,
},
ops::Range,
sync::{Arc, Mutex},
task::{Poll, Waker},
vec::Vec,
},
};
/// `Options` are provided to types that expose the `TransactionHandler` trait.
///
/// This allows for special handling of certain transactions such as deletes and the
/// extension of Journal extents. For most other use cases it is appropriate to use
/// `default()` here.
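///
/// For example, a compaction-style transaction might look like this (a sketch; `handler` is
/// assumed to be something implementing `TransactionHandler`, and `lock_keys` is illustrative):
///
/// ```ignore
/// let mut transaction = handler
///     .new_transaction(
///         lock_keys,
///         Options {
///             // Compaction frees journal space, so don't refuse it when the journal is low.
///             skip_journal_checks: true,
///             // Compaction reduces space usage after it completes, so borrowing is safe.
///             borrow_metadata_space: true,
///             ..Default::default()
///         },
///     )
///     .await?;
/// ```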
#[derive(Clone, Copy, Default)]
pub struct Options<'a> {
/// If true, don't check for low journal space. This should be true for any transactions that
/// might alleviate journal space (i.e. compaction).
pub skip_journal_checks: bool,
/// If true, borrow metadata space from the metadata reservation. This setting should be set to
/// true for any transaction that will either not affect space usage after compaction
/// (e.g. setting attributes), or reduce space usage (e.g. unlinking). Otherwise, a transaction
/// might fail with an out-of-space error.
pub borrow_metadata_space: bool,
/// If specified, a reservation to be used with the transaction. If not set, any allocations
/// that are part of this transaction will have to take their chances, and will fail if there is
/// no free space. The intention is that this should be used for things like the journal which
/// require guaranteed space.
pub allocator_reservation: Option<&'a Reservation>,
}
// This is the amount of space that we reserve for metadata when we are creating a new transaction,
// expressed in terms of space occupied in the journal; no transaction may take up more space in the
// journal than this. The amount chosen here must be large enough for the maximum possible
// transaction that can be created, so transactions always need to be bounded, which might involve
// splitting an operation up into smaller transactions.
pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;
pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
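/// Holds transaction locks taken via `TransactionHandler::transaction_lock` ahead of creating a
/// transaction.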
#[must_use]
pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
impl TransactionLocks<'_> {
pub async fn commit_prepare(&self) {
self.0.manager.commit_prepare_keys(&self.0.lock_keys).await;
}
}
#[async_trait]
pub trait TransactionHandler: Send + Sync {
/// Initiates a new transaction. Implementations should check to see that a transaction can be
/// created (for example, by checking to see that the journaling system can accept more
/// transactions), and then call Transaction::new.
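    ///
    /// A typical call looks something like this (a sketch; `fs` is assumed to implement
    /// `TransactionHandler`, and `store_object_id`/`object_id` identify the object being
    /// changed):
    ///
    /// ```ignore
    /// let mut transaction = fs
    ///     .clone()
    ///     .new_transaction(&[LockKey::object(store_object_id, object_id)], Options::default())
    ///     .await?;
    /// transaction.add(store_object_id, mutation);
    /// transaction.commit().await?;
    /// ```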
async fn new_transaction<'a>(
self: Arc<Self>,
lock_keys: &[LockKey],
options: Options<'a>,
) -> Result<Transaction<'a>, Error>;
/// Acquires transaction locks for |lock_keys| which can later be put into a transaction via
/// new_transaction_with_locks.
/// This is useful in situations where the lock needs to be held before the transaction options
/// can be determined, e.g. to take the allocator reservation.
async fn transaction_lock<'a>(&'a self, lock_keys: &[LockKey]) -> TransactionLocks<'a>;
/// Implementations should perform any required journaling and then apply the mutations via
/// ObjectManager's apply_mutation method. Any mutations within the transaction should be
/// removed so that drop_transaction can tell that the transaction was committed. If
/// successful, returns the journal offset that the transaction was written to.
async fn commit_transaction(
self: Arc<Self>,
transaction: &mut Transaction<'_>,
) -> Result<u64, Error>;
/// Drops a transaction (rolling back if not committed). Committing a transaction should have
/// removed the mutations. This is called automatically when Transaction is dropped, which is
/// why this isn't async.
fn drop_transaction(&self, transaction: &mut Transaction<'_>);
/// Acquires a read lock for the given keys. Read locks are only blocked whilst a transaction
/// is being committed for the same locks. They are only necessary where consistency is
/// required between different mutations within a transaction. For example, a write might
/// change the size and extents for an object, in which case a read lock is required so that
/// observed size and extents are seen together or not at all. Implementations should call
/// through to LockManager's read_lock implementation.
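    ///
    /// For example (a sketch; the attribute ID `0` is illustrative):
    ///
    /// ```ignore
    /// let _guard = fs.read_lock(&[LockKey::object_attribute(store_object_id, object_id, 0)]).await;
    /// // The size and extents observed under the guard are mutually consistent.
    /// ```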
async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a>;
/// Acquires a write lock for the given keys. Write locks provide exclusive access to the
/// requested lock keys.
async fn write_lock<'a>(&'a self, lock_keys: &[LockKey]) -> WriteGuard<'a>;
}
/// The journal consists of these records which will be replayed at mount time. Within a
/// transaction, these are stored as a set, which allows some mutations to be deduplicated and
/// found (hence the custom comparison functions below). For example, we need to be able to find
/// object size changes.
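///
/// For example, recording a new object entry might look like this (a sketch; `transaction` is
/// assumed to be an open `Transaction`):
///
/// ```ignore
/// transaction.add(store_object_id, Mutation::insert_object(object_key, object_value));
/// ```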
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, Versioned)]
pub enum Mutation {
ObjectStore(ObjectStoreMutation),
EncryptedObjectStore(Box<[u8]>),
Allocator(AllocatorMutation),
// Indicates the beginning of a flush. This would typically involve sealing a tree.
BeginFlush,
// Indicates the end of a flush. This would typically involve replacing the immutable layers
// with compacted ones.
EndFlush,
    // Volume has been deleted. Requires that we remove it from the set of managed ObjectStores.
DeleteVolume,
UpdateBorrowed(u64),
UpdateMutationsKey(UpdateMutationsKey),
}
impl Mutation {
pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
Mutation::ObjectStore(ObjectStoreMutation {
item: Item::new(key, value),
op: Operation::Insert,
})
}
pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
Mutation::ObjectStore(ObjectStoreMutation {
item: Item::new(key, value),
op: Operation::ReplaceOrInsert,
})
}
pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
Mutation::ObjectStore(ObjectStoreMutation {
item: Item::new(key, value),
op: Operation::Merge,
})
}
pub fn update_mutations_key(key: WrappedKey) -> Self {
Mutation::UpdateMutationsKey(UpdateMutationsKey(key))
}
}
// We have custom comparison functions for mutations that just use the key, rather than the key and
// value that would be used by default so that we can deduplicate and find mutations (see
// get_object_mutation below).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ObjectStoreMutation {
pub item: ObjectItem,
pub op: Operation,
}
// The different LSM tree operations that can be performed as part of a mutation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Operation {
Insert,
ReplaceOrInsert,
Merge,
}
impl Ord for ObjectStoreMutation {
fn cmp(&self, other: &Self) -> Ordering {
self.item.key.cmp(&other.item.key)
}
}
impl PartialOrd for ObjectStoreMutation {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for ObjectStoreMutation {
fn eq(&self, other: &Self) -> bool {
self.item.key.eq(&other.item.key)
}
}
impl Eq for ObjectStoreMutation {}
impl Ord for AllocatorItem {
fn cmp(&self, other: &Self) -> Ordering {
self.key.cmp(&other.key)
}
}
impl PartialOrd for AllocatorItem {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum AllocatorMutation {
Allocate {
device_range: Range<u64>,
owner_object_id: u64,
},
Deallocate {
device_range: Range<u64>,
owner_object_id: u64,
},
/// Marks all extents with a given owner_object_id for deletion.
    /// Used to free space allocated to an encrypted ObjectStore for which we may not have the key.
/// Note that the actual deletion time is undefined so this should never be used where an
/// ObjectStore is still in use due to a high risk of corruption. Similarly, owner_object_id
/// should never be reused for the same reasons.
MarkForDeletion(u64),
}
/// We need Ord and PartialOrd to support deduplication in the associative map used in handling
/// transactions. Range<> doesn't provide a default for this.
impl Ord for AllocatorMutation {
fn cmp(&self, other: &Self) -> Ordering {
match self {
Self::Allocate { device_range, owner_object_id } => match other {
Self::Allocate {
device_range: device_range2,
owner_object_id: owner_object_id2,
} => device_range
.start
.cmp(&device_range2.start)
.then(device_range.end.cmp(&device_range2.end))
.then(owner_object_id.cmp(owner_object_id2)),
_ => Ordering::Less,
},
Self::Deallocate { device_range, owner_object_id } => match other {
Self::Allocate { .. } => Ordering::Greater,
Self::Deallocate {
device_range: device_range2,
owner_object_id: owner_object_id2,
} => device_range
.start
.cmp(&device_range2.start)
.then(device_range.end.cmp(&device_range2.end))
.then(owner_object_id.cmp(owner_object_id2)),
_ => Ordering::Less,
},
Self::MarkForDeletion(owner_object_id) => match other {
Self::MarkForDeletion(owner_object_id2) => owner_object_id.cmp(owner_object_id2),
_ => Ordering::Greater,
},
}
}
}
impl PartialOrd for AllocatorMutation {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UpdateMutationsKey(pub WrappedKey);
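// UpdateMutationsKey is ordered (and compared) by address rather than by content, so two
// instances never compare equal and each occupies its own slot in the transaction's mutation
// set rather than being deduplicated.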
impl Ord for UpdateMutationsKey {
fn cmp(&self, other: &Self) -> Ordering {
(self as *const UpdateMutationsKey).cmp(&(other as *const _))
}
}
impl PartialOrd for UpdateMutationsKey {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Eq for UpdateMutationsKey {}
impl PartialEq for UpdateMutationsKey {
fn eq(&self, other: &Self) -> bool {
std::ptr::eq(self, other)
}
}
/// When creating a transaction, locks typically need to be held to prevent two or more writers
/// trying to make conflicting mutations at the same time. LockKeys are used for this.
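///
/// For example, a rename might lock both the source and destination directories (a sketch; the
/// identifiers here are illustrative):
///
/// ```ignore
/// let keys = [
///     LockKey::object(store_object_id, source_dir_object_id),
///     LockKey::object(store_object_id, dest_dir_object_id),
/// ];
/// let mut transaction = fs.clone().new_transaction(&keys, Options::default()).await?;
/// ```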
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum LockKey {
/// Used to lock changes to a particular object attribute (e.g. writes).
ObjectAttribute { store_object_id: u64, object_id: u64, attribute_id: u64 },
/// Used to lock changes to a particular object (e.g. adding a child to a directory).
Object { store_object_id: u64, object_id: u64 },
/// Used to lock changes to the root volume (e.g. adding or removing a volume).
RootVolume,
/// Locks the entire filesystem.
Filesystem,
/// Used to lock cached writes to an object attribute.
CachedWrite { store_object_id: u64, object_id: u64, attribute_id: u64 },
/// Used to lock flushing an object.
Flush { object_id: u64 },
}
impl LockKey {
pub fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
}
pub fn object(store_object_id: u64, object_id: u64) -> Self {
LockKey::Object { store_object_id, object_id }
}
pub fn cached_write(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
LockKey::CachedWrite { store_object_id, object_id, attribute_id }
}
pub fn flush(object_id: u64) -> Self {
LockKey::Flush { object_id }
}
}
/// Mutations can be associated with an object so that when they are applied, updates can be
/// made to in-memory structures. For example, we cache object sizes, so when a size change is
/// applied, we can update the cached object size.
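///
/// A minimal sketch of an implementation (the `CachedSize` type here is hypothetical, not part
/// of this module):
///
/// ```ignore
/// struct CachedSize(std::sync::atomic::AtomicU64);
///
/// impl AssociatedObject for CachedSize {
///     fn will_apply_mutation(&self, mutation: &Mutation, _object_id: u64, _: &ObjectManager) {
///         // A real implementation would pull the new size out of the mutation's item and
///         // store it, e.g. via self.0.store(new_size, Ordering::Relaxed).
///         if let Mutation::ObjectStore(_m) = mutation { /* ... */ }
///     }
/// }
/// ```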
pub trait AssociatedObject: Send + Sync {
fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
}
}
pub enum AssocObj<'a> {
None,
Borrowed(&'a (dyn AssociatedObject)),
Owned(Box<dyn AssociatedObject>),
}
impl AssocObj<'_> {
pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
match self {
AssocObj::None => None,
AssocObj::Borrowed(ref b) => Some(f(*b)),
AssocObj::Owned(ref o) => Some(f(o.as_ref())),
}
}
}
pub struct TxnMutation<'a> {
    // This, at the time of writing, is either the object ID of an object store, or the object ID of the
// allocator. In the case of an object mutation, there's another object ID in the mutation
// record that would be for the object actually being changed.
pub object_id: u64,
// The actual mutation. This gets serialized to the journal.
pub mutation: Mutation,
    // An optional associated object for the mutation. During replay, there is never an
    // associated object.
pub associated_object: AssocObj<'a>,
}
// We store TxnMutation in a set, and for that, we only use object_id and mutation and not the
// associated object.
impl Ord for TxnMutation<'_> {
fn cmp(&self, other: &Self) -> Ordering {
self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
}
}
impl PartialOrd for TxnMutation<'_> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for TxnMutation<'_> {
fn eq(&self, other: &Self) -> bool {
self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
}
}
impl Eq for TxnMutation<'_> {}
impl std::fmt::Debug for TxnMutation<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TxnMutation")
.field("object_id", &self.object_id)
.field("mutation", &self.mutation)
.finish()
}
}
pub enum MetadataReservation {
// Metadata space for this transaction is being borrowed from ObjectManager's metadata
// reservation.
Borrowed,
// A metadata reservation was made when the transaction was created.
Reservation(Reservation),
// The metadata space is being _held_ within `allocator_reservation`.
Hold(u64),
}
/// A transaction groups mutation records to be committed as a group.
pub struct Transaction<'a> {
handler: Arc<dyn TransactionHandler>,
/// The mutations that make up this transaction.
pub mutations: BTreeSet<TxnMutation<'a>>,
// The locks that this transaction currently holds.
txn_locks: Vec<LockKey>,
// The read locks that this transaction currently holds.
read_locks: Vec<LockKey>,
/// If set, an allocator reservation that should be used for allocations.
pub allocator_reservation: Option<&'a Reservation>,
/// The reservation for the metadata for this transaction.
pub metadata_reservation: MetadataReservation,
}
impl<'a> Transaction<'a> {
/// Creates a new transaction. This should typically be called by a TransactionHandler's
/// implementation of new_transaction. The read locks are acquired before the transaction
/// locks (see LockManager for the semantics of the different kinds of locks).
pub async fn new<H: TransactionHandler + AsRef<LockManager> + 'static>(
handler: Arc<H>,
metadata_reservation: MetadataReservation,
read_locks: &[LockKey],
txn_locks: &[LockKey],
) -> Transaction<'a> {
let (read_locks, txn_locks) = {
let lock_manager: &LockManager = handler.as_ref().as_ref();
let mut read_guard = debug_assert_not_too_long!(lock_manager.read_lock(read_locks));
let mut write_guard = debug_assert_not_too_long!(lock_manager.txn_lock(txn_locks));
(std::mem::take(&mut read_guard.lock_keys), std::mem::take(&mut write_guard.lock_keys))
};
Transaction {
handler,
mutations: BTreeSet::new(),
txn_locks,
read_locks,
allocator_reservation: None,
metadata_reservation,
}
}
/// Adds a mutation to this transaction. If the mutation already exists, it is replaced and the
/// old mutation is returned.
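    ///
    /// Since mutations are compared by key (see `ObjectStoreMutation`'s comparators), adding a
    /// second mutation for the same key replaces the first and returns it (a sketch):
    ///
    /// ```ignore
    /// assert!(txn.add(store_id, Mutation::insert_object(key.clone(), value1)).is_none());
    /// let old = txn.add(store_id, Mutation::insert_object(key, value2));
    /// assert!(old.is_some()); // The first mutation was replaced and handed back.
    /// ```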
pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
assert!(object_id != INVALID_OBJECT_ID);
self.mutations
.replace(TxnMutation { object_id, mutation, associated_object: AssocObj::None })
.map(|m| m.mutation)
}
/// Removes a mutation that matches `mutation`.
pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
self.mutations.remove(&TxnMutation {
object_id,
mutation,
associated_object: AssocObj::None,
});
}
/// Adds a mutation with an associated object.
pub fn add_with_object(
&mut self,
object_id: u64,
mutation: Mutation,
associated_object: AssocObj<'a>,
) {
assert!(object_id != INVALID_OBJECT_ID);
self.mutations.replace(TxnMutation { object_id, mutation, associated_object });
}
/// Returns true if this transaction has no mutations.
pub fn is_empty(&self) -> bool {
self.mutations.is_empty()
}
/// Searches for an existing object mutation within the transaction that has the given key and
/// returns it if found.
pub fn get_object_mutation(
&self,
object_id: u64,
key: ObjectKey,
) -> Option<&ObjectStoreMutation> {
if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
self.mutations.get(&TxnMutation {
object_id,
mutation: Mutation::insert_object(key, ObjectValue::None),
associated_object: AssocObj::None,
})
{
Some(mutation)
} else {
None
}
}
/// Commits a transaction. If successful, returns the journal offset of the transaction.
pub async fn commit(mut self) -> Result<u64, Error> {
debug!(txn = ?&self, "Commit");
self.handler.clone().commit_transaction(&mut self).await
}
/// Commits and then runs the callback whilst locks are held. Specifically, any write locks and
/// upgraded transaction locks will be held, but other transactions can still be committed
/// whilst the callback is running. The callback accepts a single parameter which is the
/// journal offset of the transaction.
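    ///
    /// For example (a sketch):
    ///
    /// ```ignore
    /// let journal_offset = transaction
    ///     .commit_with_callback(|offset| {
    ///         // Runs whilst the locks are still held.
    ///         offset
    ///     })
    ///     .await?;
    /// ```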
pub async fn commit_with_callback<R>(mut self, f: impl FnOnce(u64) -> R) -> Result<R, Error> {
debug!(txn = ?&self, "Commit");
Ok(f(self.handler.clone().commit_transaction(&mut self).await?))
}
}
impl Drop for Transaction<'_> {
fn drop(&mut self) {
// Call the TransactionHandler implementation of drop_transaction which should, as a
// minimum, call LockManager's drop_transaction to ensure the locks are released.
debug!(txn = ?&self, "Drop");
self.handler.clone().drop_transaction(self);
}
}
impl std::fmt::Debug for Transaction<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Transaction")
.field("mutations", &self.mutations)
.field("txn_locks", &self.txn_locks)
.field("read_locks", &self.read_locks)
.field("reservation", &self.allocator_reservation)
.finish()
}
}
/// LockManager holds the locks that transactions might have taken. A TransactionHandler
/// implementation would typically have one of these. Three different kinds of locks are supported.
/// There are read locks and write locks, which are as one would expect. The third kind of lock is
/// a _transaction_ lock. When first acquired, these block other writes but do not block reads.
/// When it is time to commit a transaction, these locks are upgraded to full write locks and then
/// dropped after committing. This way, reads are only blocked for the shortest possible time. It
/// follows that write locks should be used sparingly.
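///
/// The life cycle of a transaction lock therefore looks something like this (a sketch; `fs` and
/// `lock_manager` are assumed handles):
///
/// ```ignore
/// // Taking transaction locks blocks other writers, but readers proceed.
/// let mut transaction = fs.clone().new_transaction(lock_keys, Options::default()).await?;
/// // ...build up the transaction...
/// // At commit time the locks are upgraded: wait for readers, block new ones.
/// lock_manager.commit_prepare(&transaction).await;
/// // ...journal and apply the mutations...
/// drop(transaction); // drop_transaction releases the locks and wakes waiters.
/// ```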
pub struct LockManager {
locks: Mutex<Locks>,
}
struct Locks {
sequence: u64,
keys: HashMap<LockKey, LockEntry>,
}
impl Locks {
fn drop_read_locks(&mut self, lock_keys: Vec<LockKey>) {
for lock in lock_keys {
match self.keys.entry(lock) {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut occupied) => {
let entry = occupied.get_mut();
entry.read_count -= 1;
if entry.read_count == 0 {
match entry.state {
LockState::ReadLock => {
occupied.remove_entry();
}
LockState::Locked => {}
LockState::WantWrite(ref waker) => waker.wake_by_ref(),
LockState::WriteLock => unreachable!(),
}
}
}
}
}
}
fn drop_write_locks(&mut self, lock_keys: Vec<LockKey>) {
for lock in lock_keys {
match self.keys.entry(lock) {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut occupied) => {
let entry = occupied.get_mut();
let wakers = std::mem::take(&mut entry.wakers);
match entry.state {
LockState::WriteLock => {
occupied.remove_entry();
}
LockState::Locked | LockState::WantWrite(_) => {
// There might be active readers referencing the same lock key, so we
// shouldn't remove it from the lock-set yet. The last reader will
                            // remove the entry (see ReadGuard::drop).
if entry.read_count == 0 {
occupied.remove_entry();
} else {
entry.state = LockState::ReadLock;
self.sequence += 1;
entry.sequence = self.sequence;
}
}
LockState::ReadLock => unreachable!(),
}
for waker in wakers {
waker.wake();
}
}
}
}
}
}
#[derive(Debug)]
struct LockEntry {
sequence: u64,
read_count: u64,
state: LockState,
wakers: Vec<Waker>,
}
#[derive(Clone, Debug)]
enum LockState {
// In this state, there are only readers.
ReadLock,
// This state is used for transactions to lock other writers, but it still allows readers.
Locked,
// This state is used to block new readers. When all existing readers are done, the lock
// should be promoted to a write lock.
WantWrite(Waker),
// A writer has exclusive access; all other readers and writers are blocked.
WriteLock,
}
impl LockManager {
pub fn new() -> Self {
LockManager { locks: Mutex::new(Locks { sequence: 0, keys: HashMap::new() }) }
}
/// Acquires the locks. It is the caller's responsibility to ensure that drop_transaction is
    /// called when a transaction is dropped, i.e. implementers of TransactionHandler's
/// drop_transaction method should call LockManager's drop_transaction method.
pub async fn txn_lock<'a>(&'a self, lock_keys: &[LockKey]) -> WriteGuard<'a> {
self.lock(lock_keys, LockState::Locked).await.right().unwrap()
}
// `state` indicates the kind of lock required. ReadLock means acquire a read lock. Locked
// means lock other writers, but still allow readers. WriteLock means acquire a write lock.
async fn lock<'a>(
&'a self,
lock_keys: &[LockKey],
target_state: LockState,
) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
let mut guard = match &target_state {
LockState::ReadLock => Left(ReadGuard { manager: self, lock_keys: Vec::new() }),
LockState::Locked | LockState::WriteLock => {
Right(WriteGuard { manager: self, lock_keys: Vec::new() })
}
            LockState::WantWrite(_) => panic!("Use LockState::WriteLock"),
};
let guard_keys = match &mut guard {
Left(g) => &mut g.lock_keys,
Right(g) => &mut g.lock_keys,
};
let mut lock_keys = lock_keys.to_vec();
lock_keys.sort_unstable();
lock_keys.dedup();
for lock in lock_keys {
let mut waker_sequence = 0;
let mut waker_index = 0;
let mut want_write = false;
poll_fn(|cx| {
let mut locks = self.locks.lock().unwrap();
let Locks { sequence, keys } = &mut *locks;
match keys.entry(lock.clone()) {
                    Entry::Vacant(vacant) => {
                        *sequence += 1;
                        guard_keys.push(lock.clone());
                        vacant.insert(LockEntry {
                            sequence: *sequence,
                            read_count: if let LockState::ReadLock = target_state { 1 } else { 0 },
                            state: target_state.clone(),
                            wakers: Vec::new(),
                        });
                        Poll::Ready(())
                    }
Entry::Occupied(mut occupied) => {
let entry = occupied.get_mut();
if want_write {
if entry.read_count == 0 {
entry.state = LockState::WriteLock;
Poll::Ready(())
} else {
entry.state = LockState::WantWrite(cx.waker().clone());
Poll::Pending
}
} else {
match (&entry.state, &target_state) {
(LockState::ReadLock, LockState::WriteLock) => {
entry.state = LockState::WantWrite(cx.waker().clone());
want_write = true;
guard_keys.push(lock.clone());
Poll::Pending
}
                                (LockState::ReadLock, _)
                                | (LockState::Locked, LockState::ReadLock) => {
                                    if let LockState::ReadLock = target_state {
                                        entry.read_count += 1;
                                    } else {
                                        entry.state = target_state.clone();
                                    }
                                    guard_keys.push(lock.clone());
                                    Poll::Ready(())
                                }
_ => {
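                                    // The lock is held in an incompatible state, so register a
                                    // waker. The sequence numbers tell us whether the waker we
                                    // registered on an earlier poll still occupies `waker_index`
                                    // in this entry; if so, refresh it in place rather than
                                    // pushing a duplicate.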
if entry.sequence == waker_sequence {
entry.wakers[waker_index] = cx.waker().clone();
} else {
waker_index = entry.wakers.len();
waker_sequence = *sequence;
entry.wakers.push(cx.waker().clone());
}
Poll::Pending
}
}
}
}
}
})
.await;
}
guard
}
/// This should be called by a TransactionHandler drop_transaction implementation.
pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
let mut locks = self.locks.lock().unwrap();
locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
locks.drop_read_locks(std::mem::take(&mut transaction.read_locks));
}
/// Prepares to commit by waiting for readers to finish.
pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
self.commit_prepare_keys(&transaction.txn_locks).await;
}
async fn commit_prepare_keys(&self, lock_keys: &[LockKey]) {
for lock in lock_keys {
poll_fn(|cx| {
let mut locks = self.locks.lock().unwrap();
let entry = locks.keys.get_mut(&lock).expect("key missing!");
if entry.read_count > 0 {
entry.state = LockState::WantWrite(cx.waker().clone());
Poll::Pending
} else {
entry.state = LockState::WriteLock;
Poll::Ready(())
}
})
.await;
}
}
pub async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a> {
self.lock(lock_keys, LockState::ReadLock).await.left().unwrap()
}
pub async fn write_lock<'a>(&'a self, lock_keys: &[LockKey]) -> WriteGuard<'a> {
self.lock(lock_keys, LockState::WriteLock).await.right().unwrap()
}
}
#[must_use]
pub struct ReadGuard<'a> {
manager: &'a LockManager,
lock_keys: Vec<LockKey>,
}
impl Drop for ReadGuard<'_> {
fn drop(&mut self) {
let mut locks = self.manager.locks.lock().unwrap();
locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
}
}
#[must_use]
pub struct WriteGuard<'a> {
manager: &'a LockManager,
lock_keys: Vec<LockKey>,
}
impl Drop for WriteGuard<'_> {
fn drop(&mut self) {
let mut locks = self.manager.locks.lock().unwrap();
locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
}
}
#[cfg(test)]
mod tests {
use {
super::{LockKey, LockManager, LockState, Mutation, Options, TransactionHandler},
crate::filesystem::FxFilesystem,
fuchsia_async as fasync,
futures::{channel::oneshot::channel, future::FutureExt, join},
std::{sync::Mutex, task::Poll, time::Duration},
storage_device::{fake_device::FakeDevice, DeviceHolder},
};
#[fasync::run_singlethreaded(test)]
async fn test_simple() {
let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let mut t = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
t.add(1, Mutation::BeginFlush);
assert!(!t.is_empty());
}
#[fasync::run_singlethreaded(test)]
async fn test_locks() {
let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let (send1, recv1) = channel();
let (send2, recv2) = channel();
let (send3, recv3) = channel();
let done = Mutex::new(false);
join!(
async {
let _t = fs
.clone()
.new_transaction(&[LockKey::object_attribute(1, 2, 3)], Options::default())
.await
.expect("new_transaction failed");
send1.send(()).unwrap(); // Tell the next future to continue.
send3.send(()).unwrap(); // Tell the last future to continue.
recv2.await.unwrap();
// This is a halting problem so all we can do is sleep.
fasync::Timer::new(Duration::from_millis(100)).await;
assert!(!*done.lock().unwrap());
},
async {
recv1.await.unwrap();
// This should not block since it is a different key.
let _t = fs
.clone()
.new_transaction(&[LockKey::object_attribute(2, 2, 3)], Options::default())
.await
.expect("new_transaction failed");
// Tell the first future to continue.
send2.send(()).unwrap();
},
async {
// This should block until the first future has completed.
recv3.await.unwrap();
let _t = fs
.clone()
.new_transaction(&[LockKey::object_attribute(1, 2, 3)], Options::default())
.await;
*done.lock().unwrap() = true;
}
);
}
#[fasync::run_singlethreaded(test)]
async fn test_read_lock_after_write_lock() {
let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let (send1, recv1) = channel();
let (send2, recv2) = channel();
let done = Mutex::new(false);
join!(
async {
let t = fs
.clone()
.new_transaction(&[LockKey::object_attribute(1, 2, 3)], Options::default())
.await
.expect("new_transaction failed");
send1.send(()).unwrap(); // Tell the next future to continue.
recv2.await.unwrap();
t.commit().await.expect("commit failed");
*done.lock().unwrap() = true;
},
async {
recv1.await.unwrap();
// Reads should not be blocked until the transaction is committed.
let _guard = fs.read_lock(&[LockKey::object_attribute(1, 2, 3)]).await;
// Tell the first future to continue.
send2.send(()).unwrap();
// It shouldn't proceed until we release our read lock, but it's a halting
// problem, so sleep.
fasync::Timer::new(Duration::from_millis(100)).await;
assert!(!*done.lock().unwrap());
},
);
}
#[fasync::run_singlethreaded(test)]
async fn test_write_lock_after_read_lock() {
let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let (send1, recv1) = channel();
let (send2, recv2) = channel();
let done = Mutex::new(false);
join!(
async {
// Reads should not be blocked until the transaction is committed.
let _guard = fs.read_lock(&[LockKey::object_attribute(1, 2, 3)]).await;
                // Tell the next future to continue and then wait.
send1.send(()).unwrap();
recv2.await.unwrap();
// It shouldn't proceed until we release our read lock, but it's a halting
// problem, so sleep.
fasync::Timer::new(Duration::from_millis(100)).await;
assert!(!*done.lock().unwrap());
},
async {
recv1.await.unwrap();
let t = fs
.clone()
.new_transaction(&[LockKey::object_attribute(1, 2, 3)], Options::default())
.await
.expect("new_transaction failed");
                send2.send(()).unwrap(); // Tell the first future to continue.
t.commit().await.expect("commit failed");
*done.lock().unwrap() = true;
},
);
}
#[fasync::run_singlethreaded(test)]
async fn test_drop_uncommitted_transaction() {
let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let key = LockKey::object(1, 1);
// Dropping while there's a reader.
{
let mut write_lock = fs
.clone()
.new_transaction(&[key.clone()], Options::default())
.await
.expect("new_transaction failed");
let _read_lock = fs.read_lock(&[key.clone()]).await;
fs.clone().drop_transaction(&mut write_lock);
}
// Dropping while there's no reader.
let mut write_lock = fs
.clone()
.new_transaction(&[key.clone()], Options::default())
.await
.expect("new_transaction failed");
fs.clone().drop_transaction(&mut write_lock);
// Make sure we can take the lock again (i.e. it was actually released).
fs.clone()
.new_transaction(&[key.clone()], Options::default())
.await
.expect("new_transaction failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_drop_waiting_write_lock() {
let manager = LockManager::new();
let keys = &[LockKey::object(1, 1)];
{
let _guard = manager.lock(keys, LockState::ReadLock).await;
if let Poll::Ready(_) = futures::poll!(manager.lock(keys, LockState::WriteLock).boxed())
{
assert!(false);
}
}
let _ = manager.lock(keys, LockState::WriteLock).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_write_lock_blocks_everything() {
let manager = LockManager::new();
let keys = &[LockKey::object(1, 1)];
{
let _guard = manager.lock(keys, LockState::WriteLock).await;
if let Poll::Ready(_) = futures::poll!(manager.lock(keys, LockState::WriteLock).boxed())
{
assert!(false);
}
if let Poll::Ready(_) = futures::poll!(manager.lock(keys, LockState::ReadLock).boxed())
{
assert!(false);
}
}
{
let _guard = manager.lock(keys, LockState::WriteLock).await;
}
{
let _guard = manager.lock(keys, LockState::ReadLock).await;
}
}
}