// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
debug_assert_not_too_long,
lsm_tree::types::Item,
object_handle::INVALID_OBJECT_ID,
object_store::{
allocator::{AllocatorItem, Reservation},
record::{ObjectItem, ObjectKey, ObjectValue},
StoreInfo,
},
},
anyhow::Error,
async_trait::async_trait,
either::{Either, Left, Right},
futures::future::poll_fn,
serde::{Deserialize, Serialize},
std::{
any::Any,
cmp::Ordering,
collections::{
hash_map::{Entry, HashMap},
BTreeSet,
},
sync::{Arc, Mutex},
task::{Poll, Waker},
vec::Vec,
},
};
#[derive(Clone, Copy, Default)]
pub struct Options<'a> {
    /// If true, don't check for low journal space. This should be true for any transaction
    /// that might alleviate journal space (e.g. compaction).
    pub skip_journal_checks: bool,
    /// If true, borrow metadata space from the metadata reservation. This should be set to true
    /// for any transaction that will either not affect space usage after compaction
    /// (e.g. setting attributes), or reduce space usage (e.g. unlinking). Otherwise, a
    /// transaction might fail with an out-of-space error.
pub borrow_metadata_space: bool,
/// If specified, a reservation to be used with the transaction. If not set, any allocations
/// that are part of this transaction will have to take their chances, and will fail if there is
/// no free space. The intention is that this should be used for things like the journal which
/// require guaranteed space.
pub allocator_reservation: Option<&'a Reservation>,
}
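// A minimal sketch of how these options might be supplied when creating a transaction. `fs` is
// assumed to be an Arc of some TransactionHandler implementation, and the lock key and object
// IDs are illustrative:
//
//     let mut transaction = fs
//         .clone()
//         .new_transaction(
//             &[LockKey::object(store_object_id, object_id)],
//             Options { borrow_metadata_space: true, ..Default::default() },
//         )
//         .await?;
//
// Since Options derives Default, unspecified fields fall back to their defaults (journal checks
// enabled, no borrowing, no reservation).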
#[async_trait]
pub trait TransactionHandler: Send + Sync {
/// Initiates a new transaction. Implementations should check to see that a transaction can be
/// created (for example, by checking to see that the journaling system can accept more
/// transactions), and then call Transaction::new.
async fn new_transaction<'a>(
self: Arc<Self>,
lock_keys: &[LockKey],
options: Options<'a>,
) -> Result<Transaction<'a>, Error>;
/// Implementations should perform any required journaling and then apply the mutations via
/// ObjectManager's apply_mutation method. Any mutations within the transaction should be
/// removed so that drop_transaction can tell that the transaction was committed.
async fn commit_transaction(self: Arc<Self>, transaction: &mut Transaction<'_>);
/// Drops a transaction (rolling back if not committed). Committing a transaction should have
/// removed the mutations. This is called automatically when Transaction is dropped, which is
/// why this isn't async.
fn drop_transaction(&self, transaction: &mut Transaction<'_>);
/// Acquires a read lock for the given keys. Read locks are only blocked whilst a transaction
/// is being committed for the same locks. They are only necessary where consistency is
/// required between different mutations within a transaction. For example, a write might
/// change the size and extents for an object, in which case a read lock is required so that
    /// the size and extents are observed together or not at all. Implementations should call
/// through to LockManager's read_lock implementation.
async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a>;
/// Acquires a write lock for the given keys. Write locks provide exclusive access to the
/// requested lock keys.
async fn write_lock<'a>(&'a self, lock_keys: &[LockKey]) -> WriteGuard<'a>;
}
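// A sketch of the expected lifecycle, assuming `handler` is an Arc of some TransactionHandler
// implementation and `store_object_id`, `object_id` and `mutation` are illustrative:
//
//     let mut transaction = handler
//         .clone()
//         .new_transaction(&[LockKey::object(store_object_id, object_id)], Options::default())
//         .await?;
//     transaction.add(store_object_id, mutation);
//     transaction.commit().await;
//
// If the transaction were dropped instead of committed, Drop would route through
// drop_transaction and the pending mutations would be rolled back.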
/// The journal consists of these records, which will be replayed at mount time. Within a
/// transaction, these are stored as a set, which allows some mutations to be deduplicated and
/// found (hence the custom comparison functions below). For example, we need to be able to find
/// object size changes.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
pub enum Mutation {
ObjectStore(ObjectStoreMutation),
ObjectStoreInfo(StoreInfoMutation),
Allocator(AllocatorMutation),
// Like an Allocator mutation, but without any change in allocated counts.
AllocatorRef(AllocatorMutation),
// Indicates the beginning of a flush. This would typically involve sealing a tree.
BeginFlush,
// Indicates the end of a flush. This would typically involve replacing the immutable layers
// with compacted ones.
EndFlush,
UpdateBorrowed(u64),
}
impl Mutation {
pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
Mutation::ObjectStore(ObjectStoreMutation {
item: Item::new(key, value),
op: Operation::Insert,
})
}
pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
Mutation::ObjectStore(ObjectStoreMutation {
item: Item::new(key, value),
op: Operation::ReplaceOrInsert,
})
}
pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
Mutation::ObjectStore(ObjectStoreMutation {
item: Item::new(key, value),
op: Operation::Merge,
})
}
pub fn store_info(store_info: StoreInfo) -> Self {
Mutation::ObjectStoreInfo(StoreInfoMutation(store_info))
}
pub fn allocation(item: AllocatorItem) -> Self {
Mutation::Allocator(AllocatorMutation(item))
}
pub fn allocation_ref(item: AllocatorItem) -> Self {
Mutation::AllocatorRef(AllocatorMutation(item))
}
}
// We have custom comparison functions for mutations that use just the key, rather than the key
// and value that would be used by default, so that we can deduplicate and find mutations (see
// get_object_mutation below).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ObjectStoreMutation {
pub item: ObjectItem,
pub op: Operation,
}
// The different LSM tree operations that can be performed as part of a mutation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Operation {
Insert,
ReplaceOrInsert,
Merge,
}
impl Ord for ObjectStoreMutation {
fn cmp(&self, other: &Self) -> Ordering {
self.item.key.cmp(&other.item.key)
}
}
impl PartialOrd for ObjectStoreMutation {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for ObjectStoreMutation {
fn eq(&self, other: &Self) -> bool {
self.item.key.eq(&other.item.key)
}
}
impl Eq for ObjectStoreMutation {}
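// For example (a sketch; `store_id`, `key` and the values are illustrative), two mutations with
// the same ObjectKey compare equal regardless of their values, so BTreeSet::replace in
// Transaction::add keeps only the most recent one:
//
//     transaction.add(store_id, Mutation::replace_or_insert_object(key.clone(), old_value));
//     transaction.add(store_id, Mutation::replace_or_insert_object(key, new_value));
//     // The transaction now holds a single mutation for `key`, carrying `new_value`.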
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StoreInfoMutation(pub StoreInfo);
impl Ord for StoreInfoMutation {
fn cmp(&self, _other: &Self) -> Ordering {
Ordering::Equal
}
}
impl PartialOrd for StoreInfoMutation {
fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
Some(Ordering::Equal)
}
}
impl PartialEq for StoreInfoMutation {
fn eq(&self, _other: &Self) -> bool {
true
}
}
impl Eq for StoreInfoMutation {}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AllocatorMutation(pub AllocatorItem);
impl Ord for AllocatorMutation {
fn cmp(&self, other: &Self) -> Ordering {
self.0.key.cmp(&other.0.key)
}
}
impl PartialOrd for AllocatorMutation {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for AllocatorMutation {
fn eq(&self, other: &Self) -> bool {
self.0.key.eq(&other.0.key)
}
}
impl Eq for AllocatorMutation {}
/// When creating a transaction, locks typically need to be held to prevent two or more writers
/// trying to make conflicting mutations at the same time. LockKeys are used for this.
/// TODO(csuter): At the moment, these keys only apply to writers, but there needs to be some
/// support for readers, since there are races that can occur whilst a transaction is being
/// committed.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum LockKey {
/// Used to lock changes to a particular object attribute (e.g. writes).
ObjectAttribute { store_object_id: u64, object_id: u64, attribute_id: u64 },
/// Used to lock changes to a particular object (e.g. adding a child to a directory).
Object { store_object_id: u64, object_id: u64 },
/// Used to lock changes to the root volume (e.g. adding or removing a volume).
RootVolume,
/// Locks the entire filesystem.
Filesystem,
}
impl LockKey {
pub fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
}
pub fn object(store_object_id: u64, object_id: u64) -> Self {
LockKey::Object { store_object_id, object_id }
}
}
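// For example, unlinking a file might need to lock both the directory (whose children change)
// and the object being removed. A sketch, with hypothetical object IDs:
//
//     let keys = [
//         LockKey::object(store_object_id, directory_object_id),
//         LockKey::object(store_object_id, file_object_id),
//     ];
//     let mut transaction = fs.clone().new_transaction(&keys, Options::default()).await?;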
pub trait AsAny: Any {
fn as_any(&self) -> &dyn Any;
fn as_any_box(self: Box<Self>) -> Box<dyn Any>;
}
/// Mutations can be associated with an object so that when mutations are applied, updates can be
/// applied to in-memory structures. For example, we cache object sizes, so when a size change is
/// applied, we can update the cached object size.
pub trait AssociatedObject: AsAny + Send + Sync {
fn will_apply_mutation(&self, _mutation: &Mutation) {}
}
impl<T: AssociatedObject + 'static> AsAny for T {
fn as_any(&self) -> &dyn Any {
self
}
fn as_any_box(self: Box<Self>) -> Box<dyn Any> {
self
}
}
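// A sketch of a hypothetical associated object that mirrors a size change into an in-memory
// cache; will_apply_mutation is called with the mutation the object was registered against:
//
//     struct CachedSize(std::sync::atomic::AtomicU64);
//
//     impl AssociatedObject for CachedSize {
//         fn will_apply_mutation(&self, mutation: &Mutation) {
//             // Inspect `mutation` and, if it carries a new size, store it in self.0.
//         }
//     }
//
// The blanket AsAny impl above means no extra boilerplate is needed for CachedSize to be
// recovered later via AssocObj::downcast_ref.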
pub enum AssocObj<'a> {
None,
Borrowed(&'a (dyn AssociatedObject)),
Owned(Box<dyn AssociatedObject>),
}
impl AssocObj<'_> {
pub fn downcast_ref<T: AssociatedObject + 'static>(&self) -> Option<&T> {
match self {
AssocObj::None => None,
AssocObj::Borrowed(b) => b.as_any().downcast_ref(),
AssocObj::Owned(o) => o.as_any().downcast_ref(),
}
}
pub fn will_apply_mutation(&self, mutation: &Mutation) {
match self {
AssocObj::None => {}
AssocObj::Borrowed(ref b) => b.will_apply_mutation(mutation),
AssocObj::Owned(ref o) => o.will_apply_mutation(mutation),
}
}
pub fn take<T: AssociatedObject + 'static>(&mut self) -> Option<Box<T>> {
if let AssocObj::Owned(o) = self {
if o.as_any().is::<T>() {
if let AssocObj::Owned(o) = std::mem::replace(self, AssocObj::None) {
return Some(o.as_any_box().downcast().unwrap());
} else {
unreachable!();
}
}
}
None
}
}
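// A sketch of how an associated object travels with a mutation (CachedSize is the hypothetical
// type from the example above):
//
//     transaction.add_with_object(
//         store_id,
//         mutation,
//         AssocObj::Owned(Box::new(CachedSize(std::sync::atomic::AtomicU64::new(0)))),
//     );
//
// and of how it might be recovered when the mutation is applied:
//
//     if let Some(cached) = assoc_obj.downcast_ref::<CachedSize>() {
//         // Use the cached size.
//     }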
pub struct TxnMutation<'a> {
// This, at time of writing, is either the object ID of an object store, or the object ID of the
// allocator. In the case of an object mutation, there's another object ID in the mutation
// record that would be for the object actually being changed.
pub object_id: u64,
// The actual mutation. This gets serialized to the journal.
pub mutation: Mutation,
    // An optional associated object for the mutation. During replay, there is never an
    // associated object.
pub associated_object: AssocObj<'a>,
}
// We store TxnMutation in a set, and for that, we only use object_id and mutation and not the
// associated object.
impl Ord for TxnMutation<'_> {
fn cmp(&self, other: &Self) -> Ordering {
self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
}
}
impl PartialOrd for TxnMutation<'_> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for TxnMutation<'_> {
fn eq(&self, other: &Self) -> bool {
self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
}
}
impl Eq for TxnMutation<'_> {}
impl std::fmt::Debug for TxnMutation<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TxnMutation")
.field("object_id", &self.object_id)
.field("mutation", &self.mutation)
.finish()
}
}
pub enum MetadataReservation {
// Metadata space for this transaction is being borrowed from ObjectManager's metadata
// reservation.
Borrowed,
// A metadata reservation was made when the transaction was created.
Reservation(Reservation),
// The metadata space is being _held_ within `allocator_reservation`.
Hold(u64),
}
/// A transaction collects mutation records to be committed as a group.
pub struct Transaction<'a> {
handler: Arc<dyn TransactionHandler>,
/// The mutations that make up this transaction.
pub mutations: BTreeSet<TxnMutation<'a>>,
// The locks that this transaction currently holds.
txn_locks: Vec<LockKey>,
// The read locks that this transaction currently holds.
read_locks: Vec<LockKey>,
/// If set, an allocator reservation that should be used for allocations.
pub allocator_reservation: Option<&'a Reservation>,
/// The reservation for the metadata for this transaction.
pub metadata_reservation: MetadataReservation,
}
impl<'a> Transaction<'a> {
/// Creates a new transaction. This should typically be called by a TransactionHandler's
/// implementation of new_transaction. The read locks are acquired before the transaction
/// locks (see LockManager for the semantics of the different kinds of locks).
pub async fn new<H: TransactionHandler + AsRef<LockManager> + 'static>(
handler: Arc<H>,
metadata_reservation: MetadataReservation,
read_locks: &[LockKey],
txn_locks: &[LockKey],
) -> Transaction<'a> {
let (read_locks, txn_locks) = {
let lock_manager: &LockManager = handler.as_ref().as_ref();
let mut read_guard = debug_assert_not_too_long!(lock_manager.read_lock(read_locks));
let mut write_guard = debug_assert_not_too_long!(lock_manager.txn_lock(txn_locks));
(std::mem::take(&mut read_guard.lock_keys), std::mem::take(&mut write_guard.lock_keys))
};
Transaction {
handler,
mutations: BTreeSet::new(),
txn_locks,
read_locks,
allocator_reservation: None,
metadata_reservation,
}
}
/// Adds a mutation to this transaction.
pub fn add(&mut self, object_id: u64, mutation: Mutation) {
assert!(object_id != INVALID_OBJECT_ID);
self.mutations.replace(TxnMutation {
object_id,
mutation,
associated_object: AssocObj::None,
});
}
/// Removes a mutation that matches `mutation`.
pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
self.mutations.remove(&TxnMutation {
object_id,
mutation,
associated_object: AssocObj::None,
});
}
/// Adds a mutation with an associated object.
pub fn add_with_object(
&mut self,
object_id: u64,
mutation: Mutation,
associated_object: AssocObj<'a>,
) {
assert!(object_id != INVALID_OBJECT_ID);
self.mutations.replace(TxnMutation { object_id, mutation, associated_object });
}
/// Returns true if this transaction has no mutations.
pub fn is_empty(&self) -> bool {
self.mutations.is_empty()
}
/// Searches for an existing object mutation within the transaction that has the given key and
/// returns it if found.
pub fn get_object_mutation(
&self,
object_id: u64,
key: ObjectKey,
) -> Option<&ObjectStoreMutation> {
if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
self.mutations.get(&TxnMutation {
object_id,
mutation: Mutation::insert_object(key, ObjectValue::None),
associated_object: AssocObj::None,
})
{
Some(mutation)
} else {
None
}
}
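// A sketch of the intended use: because TxnMutation ordering ignores values, a probe built with
// any value (ObjectValue::None above) finds whatever mutation is pending for `key`:
//
//     if let Some(ObjectStoreMutation { item, .. }) =
//         transaction.get_object_mutation(store_id, key)
//     {
//         // `item.value` holds the pending, not-yet-committed value.
//     }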
    /// Searches for an existing store info mutation within the transaction and returns it if
    /// found.
pub fn get_store_info(&self, object_id: u64) -> Option<&StoreInfo> {
if let Some(TxnMutation {
mutation: Mutation::ObjectStoreInfo(StoreInfoMutation(store_info)),
..
}) = self.mutations.get(&TxnMutation {
object_id,
mutation: Mutation::store_info(StoreInfo::default()),
associated_object: AssocObj::None,
}) {
Some(store_info)
} else {
None
}
}
/// Commits a transaction.
pub async fn commit(mut self) {
log::debug!("Commit {:?}", &self);
self.handler.clone().commit_transaction(&mut self).await;
}
/// Commits and then runs the callback whilst locks are held.
pub async fn commit_with_callback<R>(mut self, f: impl FnOnce() -> R) -> R {
self.handler.clone().commit_transaction(&mut self).await;
f()
}
}
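// A sketch of commit_with_callback: the callback runs after the commit but before the
// transaction (and hence its locks) is dropped, so in-memory state can be updated while other
// writers are still excluded (`cache` and `object_id` are hypothetical):
//
//     transaction.commit_with_callback(|| cache.update(object_id)).await;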
impl Drop for Transaction<'_> {
fn drop(&mut self) {
// Call the TransactionHandler implementation of drop_transaction which should, as a
// minimum, call LockManager's drop_transaction to ensure the locks are released.
log::debug!("Drop {:?}", &self);
self.handler.clone().drop_transaction(self);
}
}
impl std::fmt::Debug for Transaction<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Transaction")
.field("mutations", &self.mutations)
.field("txn_locks", &self.txn_locks)
.field("read_locks", &self.read_locks)
.finish()
}
}
/// LockManager holds the locks that transactions might have taken. A TransactionHandler
/// implementation would typically have one of these. Three different kinds of locks are supported.
/// There are read locks and write locks, which are as one would expect. The third kind of lock is
/// a _transaction_ lock. When first acquired, these block other writes but do not block reads.
/// When it is time to commit a transaction, these locks are upgraded to full write locks and then
/// dropped after committing. This way, reads are only blocked for the shortest possible time. It
/// follows that write locks should be used sparingly.
pub struct LockManager {
locks: Mutex<Locks>,
}
struct Locks {
sequence: u64,
keys: HashMap<LockKey, LockEntry>,
}
impl Locks {
fn drop_read_locks(&mut self, lock_keys: Vec<LockKey>) {
for lock in lock_keys {
match self.keys.entry(lock) {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut occupied) => {
let entry = occupied.get_mut();
entry.read_count -= 1;
if entry.read_count == 0 {
match entry.state {
LockState::ReadLock => {
occupied.remove_entry();
}
LockState::Locked => {}
LockState::WantWrite(ref waker) => waker.wake_by_ref(),
LockState::WriteLock => unreachable!(),
}
}
}
}
}
}
fn drop_write_locks(&mut self, lock_keys: Vec<LockKey>) {
for lock in lock_keys {
match self.keys.entry(lock) {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut occupied) => {
let entry = occupied.get_mut();
let wakers = std::mem::take(&mut entry.wakers);
match entry.state {
LockState::WriteLock => {
occupied.remove_entry();
}
LockState::Locked | LockState::WantWrite(_) => {
                            // There might be active readers referencing the same lock key, so we
                            // shouldn't remove it from the lock-set yet. The last reader will
                            // remove the entry (see drop_read_locks).
if entry.read_count == 0 {
occupied.remove_entry();
} else {
entry.state = LockState::ReadLock;
self.sequence += 1;
entry.sequence = self.sequence;
}
}
LockState::ReadLock => unreachable!(),
}
for waker in wakers {
waker.wake();
}
}
}
}
}
}
#[derive(Debug)]
struct LockEntry {
sequence: u64,
read_count: u64,
state: LockState,
wakers: Vec<Waker>,
}
#[derive(Clone, Debug)]
enum LockState {
// In this state, there are only readers.
ReadLock,
// This state is used for transactions to lock other writers, but it still allows readers.
Locked,
// This state is used to block new readers. When all existing readers are done, the lock
// should be promoted to a write lock.
WantWrite(Waker),
// A writer has exclusive access; all other readers and writers are blocked.
WriteLock,
}
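// How a held state interacts with an incoming request (derived from LockManager::lock below):
//
//     held \ wanted | ReadLock | Locked | WriteLock
//     --------------+----------+--------+----------
//     ReadLock      | grant    | grant  | wait
//     Locked        | grant    | wait   | wait
//     WantWrite(_)  | wait     | wait   | wait
//     WriteLock     | wait     | wait   | wait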
impl LockManager {
pub fn new() -> Self {
LockManager { locks: Mutex::new(Locks { sequence: 0, keys: HashMap::new() }) }
}
/// Acquires the locks. It is the caller's responsibility to ensure that drop_transaction is
    /// called when a transaction is dropped, i.e. implementers of TransactionHandler's
/// drop_transaction method should call LockManager's drop_transaction method.
pub async fn txn_lock<'a>(&'a self, lock_keys: &[LockKey]) -> WriteGuard<'a> {
self.lock(lock_keys, LockState::Locked).await.right().unwrap()
}
    // `target_state` indicates the kind of lock required. ReadLock means acquire a read lock.
    // Locked means lock other writers, but still allow readers. WriteLock means acquire a write
    // lock.
async fn lock<'a>(
&'a self,
lock_keys: &[LockKey],
target_state: LockState,
) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
let mut guard = match &target_state {
LockState::ReadLock => Left(ReadGuard { manager: self, lock_keys: Vec::new() }),
LockState::Locked | LockState::WriteLock => {
Right(WriteGuard { manager: self, lock_keys: Vec::new() })
}
            LockState::WantWrite(_) => panic!("Use LockState::WriteLock"),
};
let guard_keys = match &mut guard {
Left(g) => &mut g.lock_keys,
Right(g) => &mut g.lock_keys,
};
let mut lock_keys = lock_keys.to_vec();
lock_keys.sort_unstable();
lock_keys.dedup();
for lock in lock_keys {
let mut waker_sequence = 0;
let mut waker_index = 0;
let mut want_write = false;
poll_fn(|cx| {
let mut locks = self.locks.lock().unwrap();
let Locks { sequence, keys } = &mut *locks;
match keys.entry(lock.clone()) {
                    Entry::Vacant(vacant) => {
                        *sequence += 1;
                        guard_keys.push(lock.clone());
                        vacant.insert(LockEntry {
                            sequence: *sequence,
                            read_count: if let LockState::ReadLock = target_state { 1 } else { 0 },
                            state: target_state.clone(),
                            wakers: Vec::new(),
                        });
Poll::Ready(())
}
Entry::Occupied(mut occupied) => {
let entry = occupied.get_mut();
if want_write {
if entry.read_count == 0 {
entry.state = LockState::WriteLock;
Poll::Ready(())
} else {
entry.state = LockState::WantWrite(cx.waker().clone());
Poll::Pending
}
} else {
match (&entry.state, &target_state) {
(LockState::ReadLock, LockState::WriteLock) => {
entry.state = LockState::WantWrite(cx.waker().clone());
want_write = true;
guard_keys.push(lock.clone());
Poll::Pending
}
(LockState::ReadLock, _)
| (LockState::Locked, LockState::ReadLock) => {
if let LockState::ReadLock = target_state {
entry.read_count += 1;
guard_keys.push(lock.clone());
} else {
entry.state = target_state.clone();
guard_keys.push(lock.clone());
}
Poll::Ready(())
}
_ => {
if entry.sequence == waker_sequence {
entry.wakers[waker_index] = cx.waker().clone();
} else {
waker_index = entry.wakers.len();
waker_sequence = *sequence;
entry.wakers.push(cx.waker().clone());
}
Poll::Pending
}
}
}
}
}
})
.await;
}
guard
}
/// This should be called by a TransactionHandler drop_transaction implementation.
pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
let mut locks = self.locks.lock().unwrap();
locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
locks.drop_read_locks(std::mem::take(&mut transaction.read_locks));
}
/// Prepares to commit by waiting for readers to finish.
pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
for lock in &transaction.txn_locks {
poll_fn(|cx| {
let mut locks = self.locks.lock().unwrap();
                let entry = locks.keys.get_mut(lock).expect("key missing!");
if entry.read_count > 0 {
entry.state = LockState::WantWrite(cx.waker().clone());
Poll::Pending
} else {
entry.state = LockState::WriteLock;
Poll::Ready(())
}
})
.await;
}
}
pub async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a> {
self.lock(lock_keys, LockState::ReadLock).await.left().unwrap()
}
pub async fn write_lock<'a>(&'a self, lock_keys: &[LockKey]) -> WriteGuard<'a> {
self.lock(lock_keys, LockState::WriteLock).await.right().unwrap()
}
}
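// A sketch of guard usage: the lock is held for the lifetime of the returned guard and released
// when the guard is dropped (the lock key is illustrative):
//
//     let _guard = manager.read_lock(&[LockKey::object(store_object_id, object_id)]).await;
//     // ... perform reads that must observe a consistent view ...
//     // The read lock is released here, when `_guard` goes out of scope.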
#[must_use]
pub struct ReadGuard<'a> {
manager: &'a LockManager,
lock_keys: Vec<LockKey>,
}
impl Drop for ReadGuard<'_> {
fn drop(&mut self) {
let mut locks = self.manager.locks.lock().unwrap();
locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
}
}
#[must_use]
pub struct WriteGuard<'a> {
manager: &'a LockManager,
lock_keys: Vec<LockKey>,
}
impl Drop for WriteGuard<'_> {
fn drop(&mut self) {
let mut locks = self.manager.locks.lock().unwrap();
locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
}
}
#[cfg(test)]
mod tests {
use {
super::{LockKey, LockManager, LockState, Mutation, Options, TransactionHandler},
crate::object_store::filesystem::FxFilesystem,
fuchsia_async as fasync,
futures::{channel::oneshot::channel, future::FutureExt, join},
std::{sync::Mutex, task::Poll, time::Duration},
storage_device::{fake_device::FakeDevice, DeviceHolder},
};
#[fasync::run_singlethreaded(test)]
async fn test_simple() {
let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let mut t = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
t.add(1, Mutation::BeginFlush);
assert!(!t.is_empty());
}
#[fasync::run_singlethreaded(test)]
async fn test_locks() {
let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let (send1, recv1) = channel();
let (send2, recv2) = channel();
let (send3, recv3) = channel();
let done = Mutex::new(false);
join!(
async {
let _t = fs
.clone()
.new_transaction(&[LockKey::object_attribute(1, 2, 3)], Options::default())
.await
.expect("new_transaction failed");
send1.send(()).unwrap(); // Tell the next future to continue.
send3.send(()).unwrap(); // Tell the last future to continue.
recv2.await.unwrap();
                // We can't wait for a future that should never complete (halting problem), so
                // all we can do is sleep briefly and then check that nothing happened.
fasync::Timer::new(Duration::from_millis(100)).await;
assert!(!*done.lock().unwrap());
},
async {
recv1.await.unwrap();
// This should not block since it is a different key.
let _t = fs
.clone()
.new_transaction(&[LockKey::object_attribute(2, 2, 3)], Options::default())
.await
.expect("new_transaction failed");
// Tell the first future to continue.
send2.send(()).unwrap();
},
async {
                recv3.await.unwrap();
                // This should block until the first future has completed.
let _t = fs
.clone()
.new_transaction(&[LockKey::object_attribute(1, 2, 3)], Options::default())
.await;
*done.lock().unwrap() = true;
}
);
}
#[fasync::run_singlethreaded(test)]
async fn test_read_lock_after_write_lock() {
let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let (send1, recv1) = channel();
let (send2, recv2) = channel();
let done = Mutex::new(false);
join!(
async {
let t = fs
.clone()
.new_transaction(&[LockKey::object_attribute(1, 2, 3)], Options::default())
.await
.expect("new_transaction failed");
send1.send(()).unwrap(); // Tell the next future to continue.
recv2.await.unwrap();
t.commit().await;
*done.lock().unwrap() = true;
},
async {
recv1.await.unwrap();
// Reads should not be blocked until the transaction is committed.
let _guard = fs.read_lock(&[LockKey::object_attribute(1, 2, 3)]).await;
// Tell the first future to continue.
send2.send(()).unwrap();
// It shouldn't proceed until we release our read lock, but it's a halting
// problem, so sleep.
fasync::Timer::new(Duration::from_millis(100)).await;
assert!(!*done.lock().unwrap());
},
);
}
#[fasync::run_singlethreaded(test)]
async fn test_write_lock_after_read_lock() {
let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let (send1, recv1) = channel();
let (send2, recv2) = channel();
let done = Mutex::new(false);
join!(
async {
// Reads should not be blocked until the transaction is committed.
let _guard = fs.read_lock(&[LockKey::object_attribute(1, 2, 3)]).await;
                // Tell the next future to continue and then wait.
send1.send(()).unwrap();
recv2.await.unwrap();
// It shouldn't proceed until we release our read lock, but it's a halting
// problem, so sleep.
fasync::Timer::new(Duration::from_millis(100)).await;
assert!(!*done.lock().unwrap());
},
async {
recv1.await.unwrap();
let t = fs
.clone()
.new_transaction(&[LockKey::object_attribute(1, 2, 3)], Options::default())
.await
.expect("new_transaction failed");
                send2.send(()).unwrap(); // Tell the first future to continue.
t.commit().await;
*done.lock().unwrap() = true;
},
);
}
#[fasync::run_singlethreaded(test)]
async fn test_drop_uncommitted_transaction() {
let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let key = LockKey::object(1, 1);
// Dropping while there's a reader.
{
let mut write_lock = fs
.clone()
.new_transaction(&[key.clone()], Options::default())
.await
.expect("new_transaction failed");
let _read_lock = fs.read_lock(&[key.clone()]).await;
fs.clone().drop_transaction(&mut write_lock);
}
// Dropping while there's no reader.
let mut write_lock = fs
.clone()
.new_transaction(&[key.clone()], Options::default())
.await
.expect("new_transaction failed");
fs.clone().drop_transaction(&mut write_lock);
// Make sure we can take the lock again (i.e. it was actually released).
fs.clone()
.new_transaction(&[key.clone()], Options::default())
.await
.expect("new_transaction failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_drop_waiting_write_lock() {
let manager = LockManager::new();
let keys = &[LockKey::object(1, 1)];
{
let _guard = manager.lock(keys, LockState::ReadLock).await;
            assert!(matches!(
                futures::poll!(manager.lock(keys, LockState::WriteLock).boxed()),
                Poll::Pending
            ));
}
let _ = manager.lock(keys, LockState::WriteLock).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_write_lock_blocks_everything() {
let manager = LockManager::new();
let keys = &[LockKey::object(1, 1)];
{
let _guard = manager.lock(keys, LockState::WriteLock).await;
            assert!(matches!(
                futures::poll!(manager.lock(keys, LockState::WriteLock).boxed()),
                Poll::Pending
            ));
            assert!(matches!(
                futures::poll!(manager.lock(keys, LockState::ReadLock).boxed()),
                Poll::Pending
            ));
}
{
let _guard = manager.lock(keys, LockState::WriteLock).await;
}
{
let _guard = manager.lock(keys, LockState::ReadLock).await;
}
}
}