// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
lsm_tree::types::Item,
object_store::{
allocator::AllocatorItem,
constants::INVALID_OBJECT_ID,
record::{ObjectItem, ObjectKey, ObjectValue},
},
},
anyhow::Error,
async_trait::async_trait,
futures::future::poll_fn,
serde::{Deserialize, Serialize},
std::{
any::Any,
cmp::Ordering,
collections::{
hash_map::{Entry, HashMap},
BTreeSet,
},
sync::{Arc, Mutex},
task::{Poll, Waker},
vec::Vec,
},
};
#[async_trait]
pub trait TransactionHandler: Send + Sync {
/// Initiates a new transaction. Implementations should check to see that a transaction can be
/// created (for example, by checking to see that the journaling system can accept more
/// transactions), and then call Transaction::new.
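    ///
    /// A minimal usage sketch (not a compiled doc-test; `fs` is assumed to be an Arc of some
    /// TransactionHandler implementation, and `key` and `value` are hypothetical):
    ///
    /// ```ignore
    /// let mut transaction =
    ///     fs.clone().new_transaction(&[LockKey::object(store_object_id, object_id)]).await?;
    /// transaction.add(store_object_id, Mutation::insert_object(key, value));
    /// transaction.commit().await;
    /// ```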
async fn new_transaction<'a>(
self: Arc<Self>,
lock_keys: &[LockKey],
) -> Result<Transaction<'a>, Error>;
/// Implementations should perform any required journaling and then apply the mutations via
/// ObjectManager's apply_mutation method. Any mutations within the transaction should be
/// removed so that drop_transaction can tell that the transaction was committed.
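    ///
    /// A rough sketch of the expected shape (hypothetical `journal` and `objects` fields; the
    /// exact apply_mutation signature is elided):
    ///
    /// ```ignore
    /// async fn commit_transaction(&self, mut transaction: Transaction<'_>) {
    ///     self.journal.append(&transaction).await;
    ///     for m in std::mem::take(&mut transaction.mutations) {
    ///         self.objects.apply_mutation(m.object_id, m.mutation, m.associated_object);
    ///     }
    /// }
    /// ```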
async fn commit_transaction(&self, transaction: Transaction<'_>);
/// Drops a transaction (rolling back if not committed). Committing a transaction should have
/// removed the mutations. This is called automatically when Transaction is dropped, which is
/// why this isn't async.
fn drop_transaction(&self, transaction: &mut Transaction<'_>);
/// Acquires a read lock for the given keys. Read locks are only blocked whilst a transaction
/// is being committed for the same locks. They are only necessary where consistency is
/// required between different mutations within a transaction. For example, a write might
/// change the size and extents for an object, in which case a read lock is required so that
/// observed size and extents are seen together or not at all. Implementations should call
/// through to LockManager's read_lock implementation.
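    ///
    /// A sketch of the intended pattern (hypothetical `store_object_id` and `object_id`):
    ///
    /// ```ignore
    /// let _guard = fs.read_lock(&[LockKey::object(store_object_id, object_id)]).await;
    /// // Until _guard is dropped, a transaction for the same key cannot commit, so the
    /// // values read here are consistent with one another.
    /// ```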
async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a>;
}
/// The journal consists of these records, which are replayed at mount time. Within a
/// transaction, they are stored as a set, which allows some mutations to be deduplicated and
/// found (hence the custom comparison functions below). For example, we need to be able to find
/// object size changes.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
pub enum Mutation {
ObjectStore(ObjectStoreMutation),
Allocator(AllocatorMutation),
// Seal the mutable layer and create a new one.
TreeSeal,
// Discards all non-mutable layers.
TreeCompact,
}
impl Mutation {
pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
Mutation::ObjectStore(ObjectStoreMutation {
item: Item::new(key, value),
op: Operation::Insert,
})
}
pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
Mutation::ObjectStore(ObjectStoreMutation {
item: Item::new(key, value),
op: Operation::ReplaceOrInsert,
})
}
pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
Mutation::ObjectStore(ObjectStoreMutation {
item: Item::new(key, value),
op: Operation::Merge,
})
}
pub fn allocation(item: AllocatorItem) -> Self {
Mutation::Allocator(AllocatorMutation(item))
}
}
// We have custom comparison functions for mutations that use just the key, rather than the key
// and value that would be used by default, so that we can deduplicate and find mutations (see
// get_object_mutation below).
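//
// For example (a sketch; `key` is a hypothetical ObjectKey and `other_value` a hypothetical
// ObjectValue), two mutations for the same key compare equal even though their values and
// operations differ, so inserting the second into a set replaces the first:
//
//     let a = Mutation::insert_object(key.clone(), ObjectValue::None);
//     let b = Mutation::replace_or_insert_object(key, other_value);
//     assert_eq!(a, b);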
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ObjectStoreMutation {
pub item: ObjectItem,
pub op: Operation,
}
/// The different LSM tree operations that can be performed as part of a mutation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Operation {
Insert,
ReplaceOrInsert,
Merge,
}
impl Ord for ObjectStoreMutation {
fn cmp(&self, other: &Self) -> Ordering {
self.item.key.cmp(&other.item.key)
}
}
impl PartialOrd for ObjectStoreMutation {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for ObjectStoreMutation {
fn eq(&self, other: &Self) -> bool {
self.item.key.eq(&other.item.key)
}
}
impl Eq for ObjectStoreMutation {}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AllocatorMutation(pub AllocatorItem);
impl Ord for AllocatorMutation {
fn cmp(&self, other: &Self) -> Ordering {
self.0.key.cmp(&other.0.key)
}
}
impl PartialOrd for AllocatorMutation {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for AllocatorMutation {
fn eq(&self, other: &Self) -> bool {
self.0.key.eq(&other.0.key)
}
}
impl Eq for AllocatorMutation {}
/// When creating a transaction, locks typically need to be held to prevent two or more writers
/// trying to make conflicting mutations at the same time. LockKeys are used for this.
/// TODO(csuter): At the moment, these keys only apply to writers, but there needs to be some
/// support for readers, since there are races that can occur whilst a transaction is being
/// committed.
#[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum LockKey {
/// Used to lock changes to a particular object attribute (e.g. writes).
ObjectAttribute { store_object_id: u64, object_id: u64, attribute_id: u64 },
/// Used to lock changes to a particular object (e.g. adding a child to a directory).
Object { store_object_id: u64, object_id: u64 },
/// Used to lock changes to the volume directory.
VolumeDirectory,
}
impl LockKey {
pub fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
}
pub fn object(store_object_id: u64, object_id: u64) -> Self {
LockKey::Object { store_object_id, object_id }
}
}
// Mutations can be associated with an object so that when mutations are applied, updates can be
// applied to in-memory structures. For example, we cache object sizes, so when a size change is
// applied, we can update the cached object size.
pub type AssociatedObject<'a> = &'a (dyn Any + Send + Sync);
#[derive(Clone)]
pub struct TxnMutation<'a> {
// This, at time of writing, is either the object ID of an object store, or the object ID of the
// allocator. In the case of an object mutation, there's another object ID in the mutation
// record that would be for the object actually being changed.
pub object_id: u64,
// The actual mutation. This gets serialized to the journal.
pub mutation: Mutation,
// An optional associated object for the mutation. During replay, there will always be no
// associated object.
pub associated_object: Option<AssociatedObject<'a>>,
}
// We store TxnMutation in a set, and for that, we only use object_id and mutation and not the
// associated object.
impl Ord for TxnMutation<'_> {
fn cmp(&self, other: &Self) -> Ordering {
self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
}
}
impl PartialOrd for TxnMutation<'_> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for TxnMutation<'_> {
fn eq(&self, other: &Self) -> bool {
self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
}
}
impl Eq for TxnMutation<'_> {}
/// A transaction groups mutation records to be committed as a unit.
pub struct Transaction<'a> {
handler: Arc<dyn TransactionHandler>,
/// The mutations that make up this transaction.
pub mutations: BTreeSet<TxnMutation<'a>>,
/// The locks that this transaction currently holds.
pub lock_keys: Vec<LockKey>,
}
impl<'a> Transaction<'a> {
/// Creates a new transaction. This should typically be called by a TransactionHandler's
/// implementation of new_transaction.
pub fn new(handler: Arc<dyn TransactionHandler>, lock_keys: Vec<LockKey>) -> Transaction<'a> {
Transaction { handler, mutations: BTreeSet::new(), lock_keys }
}
/// Adds a mutation to this transaction.
pub fn add(&mut self, object_id: u64, mutation: Mutation) {
assert!(object_id != INVALID_OBJECT_ID);
self.mutations.replace(TxnMutation { object_id, mutation, associated_object: None });
}
    /// Adds a mutation with an associated object.
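    ///
    /// A sketch (the associated object is anything that should be updated when the mutation is
    /// applied, e.g. a hypothetical holder of a cached size):
    ///
    /// ```ignore
    /// transaction.add_with_object(store_object_id, mutation, &cached_size_holder);
    /// ```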
pub fn add_with_object(
&mut self,
object_id: u64,
mutation: Mutation,
associated_object: AssociatedObject<'a>,
) {
assert!(object_id != INVALID_OBJECT_ID);
self.mutations.replace(TxnMutation {
object_id,
mutation,
associated_object: Some(associated_object),
});
}
/// Returns true if this transaction has no mutations.
pub fn is_empty(&self) -> bool {
self.mutations.is_empty()
}
/// Searches for an existing object mutation within the transaction that has the given key and
/// returns it if found.
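    ///
    /// A sketch (`key` is a hypothetical ObjectKey, e.g. one identifying an object's size
    /// record):
    ///
    /// ```ignore
    /// if let Some(m) = transaction.get_object_mutation(store_object_id, key) {
    ///     // m.item.value holds the value pending for that key within this transaction.
    /// }
    /// ```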
pub fn get_object_mutation(
&self,
object_id: u64,
key: ObjectKey,
) -> Option<&ObjectStoreMutation> {
if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
self.mutations.get(&TxnMutation {
object_id,
mutation: Mutation::insert_object(key, ObjectValue::None),
associated_object: None,
})
{
Some(mutation)
} else {
None
}
}
/// Commits a transaction.
pub async fn commit(self) {
self.handler.clone().commit_transaction(self).await;
}
}
impl Drop for Transaction<'_> {
fn drop(&mut self) {
// Call the TransactionHandler implementation of drop_transaction which should, as a
// minimum, call LockManager's drop_transaction to ensure the locks are released.
self.handler.clone().drop_transaction(self);
}
}
/// LockManager holds the locks that transactions might have taken. A TransactionHandler
/// implementation would typically have one of these.
pub struct LockManager {
locks: Mutex<Locks>,
}
struct Locks {
sequence: u64,
keys: HashMap<LockKey, LockEntry>,
}
struct LockEntry {
sequence: u64,
read_count: u64,
state: LockState,
wakers: Vec<Waker>,
}
enum LockState {
Unlocked,
Locked,
Committing(Waker),
}
impl LockManager {
pub fn new() -> Self {
LockManager { locks: Mutex::new(Locks { sequence: 0, keys: HashMap::new() }) }
}
    /// Acquires the locks. To avoid deadlocks, the locks should be sorted. It is the caller's
    /// responsibility to ensure that drop_transaction is called when a transaction is dropped,
    /// i.e. implementers of TransactionHandler's drop_transaction method should call
    /// LockManager's drop_transaction method.
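    ///
    /// A sketch of the expected call pattern (keys sorted by the caller to avoid deadlocks):
    ///
    /// ```ignore
    /// let mut keys = lock_keys.to_vec();
    /// keys.sort_unstable();
    /// manager.lock(&keys).await;
    /// ```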
pub async fn lock(&self, lock_keys: &[LockKey]) {
for lock in lock_keys {
let mut waker_sequence = 0;
let mut waker_index = 0;
poll_fn(|cx| {
let mut locks = self.locks.lock().unwrap();
let Locks { sequence, keys } = &mut *locks;
match keys.entry(lock.clone()) {
Entry::Vacant(vacant) => {
*sequence += 1;
vacant.insert(LockEntry {
sequence: *sequence,
read_count: 0,
state: LockState::Locked,
wakers: Vec::new(),
});
Poll::Ready(())
}
Entry::Occupied(mut occupied) => {
let entry = occupied.get_mut();
if let LockState::Unlocked = entry.state {
entry.state = LockState::Locked;
Poll::Ready(())
} else {
if entry.sequence == waker_sequence {
entry.wakers[waker_index] = cx.waker().clone();
} else {
                                // Remember the sequence of the entry in which we stored our
                                // waker: the entry can be removed and recreated between polls,
                                // and comparing against entry.sequence detects that.
                                waker_index = entry.wakers.len();
                                waker_sequence = entry.sequence;
                                entry.wakers.push(cx.waker().clone());
}
Poll::Pending
}
}
}
})
.await;
}
}
/// This should be called by a TransactionHandler drop_transaction implementation.
pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
let mut locks = self.locks.lock().unwrap();
for lock in transaction.lock_keys.drain(..) {
let (_, entry) = locks.keys.remove_entry(&lock).unwrap();
for waker in entry.wakers {
waker.wake();
}
}
}
/// Prepares to commit by waiting for readers to finish.
pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
for lock in &transaction.lock_keys {
poll_fn(|cx| {
let mut locks = self.locks.lock().unwrap();
                let entry = locks.keys.get_mut(lock).expect("key missing!");
entry.state = LockState::Committing(cx.waker().clone());
if entry.read_count > 0 {
Poll::Pending
} else {
Poll::Ready(())
}
})
.await;
}
}
pub async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a> {
        let mut lock_keys = lock_keys.to_vec();
lock_keys.sort_unstable();
for lock in &lock_keys {
let mut waker_sequence = 0;
let mut waker_index = 0;
poll_fn(|cx| {
let mut locks = self.locks.lock().unwrap();
let Locks { sequence, keys } = &mut *locks;
match keys.entry(lock.clone()) {
Entry::Vacant(vacant) => {
*sequence += 1;
vacant.insert(LockEntry {
sequence: *sequence,
read_count: 1,
state: LockState::Unlocked,
wakers: Vec::new(),
});
Poll::Ready(())
}
Entry::Occupied(mut occupied) => {
let entry = occupied.get_mut();
if let LockState::Committing(_) = entry.state {
if entry.sequence == waker_sequence {
entry.wakers[waker_index] = cx.waker().clone();
} else {
                                waker_index = entry.wakers.len();
                                // As in lock() above, record the entry's own sequence so that
                                // a recreated entry is not mistaken for this one.
                                waker_sequence = entry.sequence;
                                entry.wakers.push(cx.waker().clone());
}
Poll::Pending
} else {
entry.read_count += 1;
Poll::Ready(())
}
}
}
})
.await;
}
ReadGuard { manager: self, lock_keys }
}
}
#[must_use]
pub struct ReadGuard<'a> {
manager: &'a LockManager,
lock_keys: Vec<LockKey>,
}
impl Drop for ReadGuard<'_> {
fn drop(&mut self) {
let mut locks = self.manager.locks.lock().unwrap();
for lock in std::mem::take(&mut self.lock_keys) {
if let Entry::Occupied(mut occupied) = locks.keys.entry(lock) {
let entry = occupied.get_mut();
entry.read_count -= 1;
if entry.read_count == 0 {
match entry.state {
LockState::Unlocked => {
occupied.remove_entry();
}
LockState::Locked => {}
LockState::Committing(ref waker) => waker.wake_by_ref(),
}
}
} else {
unreachable!(); // The entry *must* be in the HashMap.
}
}
}
}
#[cfg(test)]
mod tests {
use {
super::{LockKey, Mutation, TransactionHandler},
crate::{
object_store::testing::fake_filesystem::FakeFilesystem,
testing::fake_device::FakeDevice,
},
fuchsia_async as fasync,
futures::{channel::oneshot::channel, join},
std::{
sync::{Arc, Mutex},
time::Duration,
},
};
#[fasync::run_singlethreaded(test)]
async fn test_simple() {
let device = Arc::new(FakeDevice::new(1024, 1024));
let fs = FakeFilesystem::new(device);
let mut t = fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
t.add(1, Mutation::TreeSeal);
assert!(!t.is_empty());
}
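    #[fasync::run_singlethreaded(test)]
    async fn test_mutations_are_deduplicated() {
        // A small check of the set semantics described on Mutation: mutations that compare
        // equal (same object ID and key) are deduplicated within a transaction, with the most
        // recently added one kept.
        let device = Arc::new(FakeDevice::new(1024, 1024));
        let fs = FakeFilesystem::new(device);
        let mut t = fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
        t.add(1, Mutation::TreeSeal);
        t.add(1, Mutation::TreeSeal);
        assert_eq!(t.mutations.len(), 1);
    }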
#[fasync::run_singlethreaded(test)]
async fn test_locks() {
let device = Arc::new(FakeDevice::new(1024, 1024));
let fs = FakeFilesystem::new(device);
let (send1, recv1) = channel();
let (send2, recv2) = channel();
let (send3, recv3) = channel();
let done = Mutex::new(false);
join!(
async {
let _t = fs
.clone()
.new_transaction(&[LockKey::object_attribute(1, 2, 3)])
.await
.expect("new_transaction failed");
send1.send(()).unwrap(); // Tell the next future to continue.
send3.send(()).unwrap(); // Tell the last future to continue.
recv2.await.unwrap();
// This is a halting problem so all we can do is sleep.
fasync::Timer::new(Duration::from_millis(100)).await;
assert!(!*done.lock().unwrap());
},
async {
recv1.await.unwrap();
// This should not block since it is a different key.
let _t = fs
.clone()
.new_transaction(&[LockKey::object_attribute(2, 2, 3)])
.await
.expect("new_transaction failed");
// Tell the first future to continue.
send2.send(()).unwrap();
},
async {
// This should block until the first future has completed.
recv3.await.unwrap();
let _t = fs.clone().new_transaction(&[LockKey::object_attribute(1, 2, 3)]).await;
*done.lock().unwrap() = true;
}
);
}
#[fasync::run_singlethreaded(test)]
async fn test_read_lock_after_write_lock() {
let device = Arc::new(FakeDevice::new(1024, 1024));
let fs = FakeFilesystem::new(device);
let (send1, recv1) = channel();
let (send2, recv2) = channel();
let done = Mutex::new(false);
join!(
async {
let t = fs
.clone()
.new_transaction(&[LockKey::object_attribute(1, 2, 3)])
.await
.expect("new_transaction failed");
send1.send(()).unwrap(); // Tell the next future to continue.
recv2.await.unwrap();
t.commit().await;
*done.lock().unwrap() = true;
},
async {
recv1.await.unwrap();
// Reads should not be blocked until the transaction is committed.
let _guard = fs.read_lock(&[LockKey::object_attribute(1, 2, 3)]).await;
// Tell the first future to continue.
send2.send(()).unwrap();
// It shouldn't proceed until we release our read lock, but it's a halting
// problem, so sleep.
fasync::Timer::new(Duration::from_millis(100)).await;
assert!(!*done.lock().unwrap());
},
);
}
#[fasync::run_singlethreaded(test)]
async fn test_write_lock_after_read_lock() {
let device = Arc::new(FakeDevice::new(1024, 1024));
let fs = FakeFilesystem::new(device);
let (send1, recv1) = channel();
let (send2, recv2) = channel();
let done = Mutex::new(false);
join!(
async {
// Reads should not be blocked until the transaction is committed.
let _guard = fs.read_lock(&[LockKey::object_attribute(1, 2, 3)]).await;
                // Tell the next future to continue and then wait.
send1.send(()).unwrap();
recv2.await.unwrap();
// It shouldn't proceed until we release our read lock, but it's a halting
// problem, so sleep.
fasync::Timer::new(Duration::from_millis(100)).await;
assert!(!*done.lock().unwrap());
},
async {
recv1.await.unwrap();
let t = fs
.clone()
.new_transaction(&[LockKey::object_attribute(1, 2, 3)])
.await
.expect("new_transaction failed");
                send2.send(()).unwrap(); // Tell the first future to continue.
t.commit().await;
*done.lock().unwrap() = true;
},
);
}
}