[fxfs] Add read locks
This adds support for read locks, which are necessary in some cases. The
primary example is a write to an object that involves a size change: the
size change should be observable together with the data whose write caused
it, or not at all.
Change-Id: I2eaefb743f5051a93a0b52e0237340296a81abf4
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/514221
Commit-Queue: Chris Suter <csuter@google.com>
Reviewed-by: James Sullivan <jfsulliv@google.com>
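In outline: readers take a read lock over every read that must observe the
size and the data together, and committing a transaction now waits for
readers holding the same keys; taking a write lock by itself does not block
readers. A toy, self-contained model of that protocol (plain std
synchronization on a single key, rather than the async, keyed fxfs types;
ToyLock and its methods are illustrative names, not fxfs API):

    use std::sync::{Condvar, Mutex};

    #[derive(Default)]
    struct State {
        readers: usize,
        committing: bool,
    }

    // Toy model: readers are blocked only while a commit is draining them.
    #[derive(Default)]
    struct ToyLock {
        state: Mutex<State>,
        cv: Condvar,
    }

    impl ToyLock {
        fn read_lock(&self) {
            let mut s = self.state.lock().unwrap();
            // Mirrors LockState::Committing below: only an in-flight commit
            // blocks a reader.
            while s.committing {
                s = self.cv.wait(s).unwrap();
            }
            s.readers += 1;
        }

        fn read_unlock(&self) {
            let mut s = self.state.lock().unwrap();
            s.readers -= 1;
            if s.readers == 0 {
                // Mirrors ReadGuard::drop waking the committer's waker.
                self.cv.notify_all();
            }
        }

        // Mirrors commit_prepare: shut out new readers, wait out existing ones.
        fn commit_prepare(&self) {
            let mut s = self.state.lock().unwrap();
            s.committing = true;
            while s.readers > 0 {
                s = self.cv.wait(s).unwrap();
            }
        }

        // Mirrors drop_transaction releasing the key once the commit is done.
        fn commit_done(&self) {
            let mut s = self.state.lock().unwrap();
            s.committing = false;
            self.cv.notify_all();
        }
    }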
diff --git a/src/storage/fxfs/BUILD.gn b/src/storage/fxfs/BUILD.gn
index 5030cb8..29bf874 100644
--- a/src/storage/fxfs/BUILD.gn
+++ b/src/storage/fxfs/BUILD.gn
@@ -12,6 +12,7 @@
"//third_party/rust_crates:async-trait",
"//third_party/rust_crates:bincode",
"//third_party/rust_crates:byteorder",
+ "//third_party/rust_crates:either-v1_5_0",
"//third_party/rust_crates:futures",
"//third_party/rust_crates:log",
"//third_party/rust_crates:num",
diff --git a/src/storage/fxfs/src/object_store.rs b/src/storage/fxfs/src/object_store.rs
index 4c44922..7d0e9ed 100644
--- a/src/storage/fxfs/src/object_store.rs
+++ b/src/storage/fxfs/src/object_store.rs
@@ -498,7 +498,7 @@
/// Extend the file with the given extent. The only use case for this right now is for files
/// that must exist at certain offsets on the device, such as super-blocks.
pub async fn extend<'a>(&'a self, transaction: &mut Transaction<'a>, device_range: Range<u64>) {
- let old_size = self.get_size();
+ let old_size = self.txn_get_size(transaction);
let new_size = old_size + device_range.end - device_range.start;
self.store.allocator().reserve(transaction, device_range.clone()).await;
transaction.add_with_object(
@@ -738,6 +738,14 @@
if buf.len() == 0 {
return Ok(0);
}
+ let fs = self.store.filesystem();
+ let _guard = fs
+ .read_lock(&[LockKey::object_attribute(
+ self.store.store_object_id,
+ self.object_id,
+ self.attribute_id,
+ )])
+ .await;
let size = self.get_size();
if offset >= size {
return Ok(0);
@@ -1039,6 +1047,7 @@
},
fuchsia_async as fasync,
futures::{channel::oneshot::channel, join},
+ rand::Rng,
std::{
ops::Bound,
sync::{Arc, Mutex},
@@ -1441,6 +1450,50 @@
}
);
}
+
+ #[fasync::run(10, test)]
+ async fn test_racy_reads() {
+ let (fs, _allocator, store) = test_filesystem_and_store().await;
+ let object;
+ let mut transaction =
+ fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
+ object = Arc::new(
+ store
+ .create_object(&mut transaction, HandleOptions::default())
+ .await
+ .expect("create_object failed"),
+ );
+ transaction.commit().await;
+ for _ in 0..100 {
+ let cloned_object = object.clone();
+ let writer = fasync::Task::spawn(async move {
+ let mut buf = cloned_object.allocate_buffer(10);
+ buf.as_mut_slice().fill(123);
+ cloned_object.write(0, buf.as_ref()).await.expect("write failed");
+ });
+ let cloned_object = object.clone();
+ let reader = fasync::Task::spawn(async move {
+ let wait_time = rand::thread_rng().gen_range(0, 5);
+ fasync::Timer::new(Duration::from_millis(wait_time)).await;
+ let mut buf = cloned_object.allocate_buffer(10);
+ buf.as_mut_slice().fill(23);
+ let amount = cloned_object.read(0, buf.as_mut()).await.expect("read failed");
+ // If we succeed in reading data, it must include the write; i.e. if we see the size
+ // change, we should see the data too. For this to hold, the read path must take a
+ // lock so that when we read the size, we also see the extents changed in the same
+ // transaction.
+ if amount != 0 {
+ assert_eq!(amount, 10);
+ assert_eq!(buf.as_slice(), &[123; 10]);
+ }
+ });
+ writer.await;
+ reader.await;
+ let mut transaction = object.new_transaction().await.expect("new_transaction failed");
+ object.truncate(&mut transaction, 0).await.expect("truncate failed");
+ transaction.commit().await;
+ }
+ }
}
// TODO(csuter): validation of all deserialized structs.
diff --git a/src/storage/fxfs/src/object_store/filesystem.rs b/src/storage/fxfs/src/object_store/filesystem.rs
index ce6d8c5..722196f 100644
--- a/src/storage/fxfs/src/object_store/filesystem.rs
+++ b/src/storage/fxfs/src/object_store/filesystem.rs
@@ -10,8 +10,8 @@
constants::INVALID_OBJECT_ID,
journal::{Journal, JournalCheckpoint},
transaction::{
- AssociatedObject, LockKey, LockManager, Mutation, Transaction, TransactionHandler,
- TxnMutation,
+ AssociatedObject, LockKey, LockManager, Mutation, ReadGuard, Transaction,
+ TransactionHandler, TxnMutation,
},
ObjectStore,
},
@@ -352,6 +352,7 @@
}
async fn commit_transaction(&self, transaction: Transaction<'_>) {
+ self.lock_manager.commit_prepare(&transaction).await;
self.journal.commit(transaction).await;
}
@@ -359,6 +360,10 @@
self.objects.drop_transaction(transaction);
self.lock_manager.drop_transaction(transaction);
}
+
+ async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a> {
+ self.lock_manager.read_lock(lock_keys).await
+ }
}
// TODO(csuter): How do we ensure sync prior to drop?
diff --git a/src/storage/fxfs/src/object_store/testing/fake_filesystem.rs b/src/storage/fxfs/src/object_store/testing/fake_filesystem.rs
index f9c4ada..1266af2 100644
--- a/src/storage/fxfs/src/object_store/testing/fake_filesystem.rs
+++ b/src/storage/fxfs/src/object_store/testing/fake_filesystem.rs
@@ -9,7 +9,9 @@
allocator::Allocator,
filesystem::{Filesystem, ObjectManager, ObjectSync},
journal::JournalCheckpoint,
- transaction::{LockKey, LockManager, Transaction, TransactionHandler, TxnMutation},
+ transaction::{
+ LockKey, LockManager, ReadGuard, Transaction, TransactionHandler, TxnMutation,
+ },
ObjectStore,
},
},
@@ -71,6 +73,7 @@
}
async fn commit_transaction(&self, mut transaction: Transaction<'_>) {
+ self.lock_manager.commit_prepare(&transaction).await;
for TxnMutation { object_id, mutation, associated_object } in
std::mem::take(&mut transaction.mutations)
{
@@ -90,4 +93,8 @@
self.object_manager.drop_transaction(transaction);
self.lock_manager.drop_transaction(transaction);
}
+
+ async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a> {
+ self.lock_manager.read_lock(lock_keys).await
+ }
}
diff --git a/src/storage/fxfs/src/object_store/transaction.rs b/src/storage/fxfs/src/object_store/transaction.rs
index 01b6454..aabb9f4 100644
--- a/src/storage/fxfs/src/object_store/transaction.rs
+++ b/src/storage/fxfs/src/object_store/transaction.rs
@@ -47,6 +47,14 @@
/// removed the mutations. This is called automatically when Transaction is dropped, which is
/// why this isn't async.
fn drop_transaction(&self, transaction: &mut Transaction<'_>);
+
+ /// Acquires a read lock for the given keys. Read locks are only blocked whilst a transaction
+ /// is being committed for the same locks. They are only necessary where consistency is
+ /// required between different mutations within a transaction. For example, a write might
+ /// change the size and extents for an object, in which case a read lock is required so that
+ /// the size and extents are observed together or not at all. Implementations should call
+ /// through to LockManager's read_lock implementation.
+ async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a>;
}
/// The journal consists of these records which will be replayed at mount time. Within a
@@ -306,7 +314,20 @@
struct Locks {
sequence: u64,
- keys: HashMap<LockKey, (u64, Vec<Waker>)>,
+ keys: HashMap<LockKey, LockEntry>,
+}
+
+struct LockEntry {
+ sequence: u64,
+ read_count: u64,
+ state: LockState,
+ wakers: Vec<Waker>,
+}
+
+enum LockState {
+ Unlocked,
+ Locked,
+ Committing(Waker),
}
impl LockManager {
@@ -328,19 +349,29 @@
match keys.entry(lock.clone()) {
Entry::Vacant(vacant) => {
*sequence += 1;
- vacant.insert((*sequence, Vec::new()));
+ vacant.insert(LockEntry {
+ sequence: *sequence,
+ read_count: 0,
+ state: LockState::Locked,
+ wakers: Vec::new(),
+ });
Poll::Ready(())
}
Entry::Occupied(mut occupied) => {
- let (sequence, wakers) = occupied.get_mut();
- if *sequence == waker_sequence {
- wakers[waker_index] = cx.waker().clone();
+ let entry = occupied.get_mut();
+ if let LockState::Unlocked = entry.state {
+ entry.state = LockState::Locked;
+ Poll::Ready(())
} else {
- waker_index = wakers.len();
- waker_sequence = *sequence;
- wakers.push(cx.waker().clone());
+ if entry.sequence == waker_sequence {
+ entry.wakers[waker_index] = cx.waker().clone();
+ } else {
+ waker_index = entry.wakers.len();
+ waker_sequence = entry.sequence;
+ entry.wakers.push(cx.waker().clone());
+ }
+ Poll::Pending
}
- Poll::Pending
}
}
})
@@ -352,12 +383,101 @@
pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
let mut locks = self.locks.lock().unwrap();
for lock in transaction.lock_keys.drain(..) {
- let (_, (_, wakers)) = locks.keys.remove_entry(&lock).unwrap();
- for waker in wakers {
+ let (_, entry) = locks.keys.remove_entry(&lock).unwrap();
+ for waker in entry.wakers {
waker.wake();
}
}
}
+
+ /// Prepares to commit by waiting for readers to finish.
+ pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
+ for lock in &transaction.lock_keys {
+ poll_fn(|cx| {
+ let mut locks = self.locks.lock().unwrap();
+ let entry = locks.keys.get_mut(lock).expect("key missing!");
+ entry.state = LockState::Committing(cx.waker().clone());
+ if entry.read_count > 0 {
+ Poll::Pending
+ } else {
+ Poll::Ready(())
+ }
+ })
+ .await;
+ }
+ }
+
+ pub async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a> {
+ let mut lock_keys: Vec<_> = lock_keys.iter().cloned().collect();
+ lock_keys.sort_unstable();
+ for lock in &lock_keys {
+ let mut waker_sequence = 0;
+ let mut waker_index = 0;
+ poll_fn(|cx| {
+ let mut locks = self.locks.lock().unwrap();
+ let Locks { sequence, keys } = &mut *locks;
+ match keys.entry(lock.clone()) {
+ Entry::Vacant(vacant) => {
+ *sequence += 1;
+ vacant.insert(LockEntry {
+ sequence: *sequence,
+ read_count: 1,
+ state: LockState::Unlocked,
+ wakers: Vec::new(),
+ });
+ Poll::Ready(())
+ }
+ Entry::Occupied(mut occupied) => {
+ let entry = occupied.get_mut();
+ if let LockState::Committing(_) = entry.state {
+ if entry.sequence == waker_sequence {
+ entry.wakers[waker_index] = cx.waker().clone();
+ } else {
+ waker_index = entry.wakers.len();
+ waker_sequence = entry.sequence;
+ entry.wakers.push(cx.waker().clone());
+ }
+ Poll::Pending
+ } else {
+ entry.read_count += 1;
+ Poll::Ready(())
+ }
+ }
+ }
+ })
+ .await;
+ }
+ ReadGuard { manager: self, lock_keys }
+ }
+}
+
+#[must_use]
+pub struct ReadGuard<'a> {
+ manager: &'a LockManager,
+ lock_keys: Vec<LockKey>,
+}
+
+impl Drop for ReadGuard<'_> {
+ fn drop(&mut self) {
+ let mut locks = self.manager.locks.lock().unwrap();
+ for lock in std::mem::take(&mut self.lock_keys) {
+ if let Entry::Occupied(mut occupied) = locks.keys.entry(lock) {
+ let entry = occupied.get_mut();
+ entry.read_count -= 1;
+ if entry.read_count == 0 {
+ match entry.state {
+ LockState::Unlocked => {
+ occupied.remove_entry();
+ }
+ LockState::Locked => {}
+ LockState::Committing(ref waker) => waker.wake_by_ref(),
+ }
+ }
+ } else {
+ unreachable!(); // The entry *must* be in the HashMap.
+ }
+ }
+ }
}
#[cfg(test)]
@@ -426,4 +546,70 @@
}
);
}
+
+ #[fasync::run_singlethreaded(test)]
+ async fn test_read_lock_after_write_lock() {
+ let device = Arc::new(FakeDevice::new(1024, 1024));
+ let fs = FakeFilesystem::new(device);
+ let (send1, recv1) = channel();
+ let (send2, recv2) = channel();
+ let done = Mutex::new(false);
+ join!(
+ async {
+ let t = fs
+ .clone()
+ .new_transaction(&[LockKey::object_attribute(1, 2, 3)])
+ .await
+ .expect("new_transaction failed");
+ send1.send(()).unwrap(); // Tell the next future to continue.
+ recv2.await.unwrap();
+ t.commit().await;
+ *done.lock().unwrap() = true;
+ },
+ async {
+ recv1.await.unwrap();
+ // Reads should not be blocked until the transaction is committed.
+ let _guard = fs.read_lock(&[LockKey::object_attribute(1, 2, 3)]).await;
+ // Tell the first future to continue.
+ send2.send(()).unwrap();
+ // It shouldn't proceed until we release our read lock, but detecting that it's
+ // blocked amounts to the halting problem, so sleep and then check.
+ fasync::Timer::new(Duration::from_millis(100)).await;
+ assert!(!*done.lock().unwrap());
+ },
+ );
+ }
+
+ #[fasync::run_singlethreaded(test)]
+ async fn test_write_lock_after_read_lock() {
+ let device = Arc::new(FakeDevice::new(1024, 1024));
+ let fs = FakeFilesystem::new(device);
+ let (send1, recv1) = channel();
+ let (send2, recv2) = channel();
+ let done = Mutex::new(false);
+ join!(
+ async {
+ // Reads should not be blocked until the transaction is committed.
+ let _guard = fs.read_lock(&[LockKey::object_attribute(1, 2, 3)]).await;
+ // Tell the next future to continue and then wait.
+ send1.send(()).unwrap();
+ recv2.await.unwrap();
+ // It shouldn't proceed until we release our read lock, but detecting that it's
+ // blocked amounts to the halting problem, so sleep and then check.
+ fasync::Timer::new(Duration::from_millis(100)).await;
+ assert!(!*done.lock().unwrap());
+ },
+ async {
+ recv1.await.unwrap();
+ let t = fs
+ .clone()
+ .new_transaction(&[LockKey::object_attribute(1, 2, 3)])
+ .await
+ .expect("new_transaction failed");
+ send2.send(()).unwrap(); // Tell the first future to continue.
+ t.commit().await;
+ *done.lock().unwrap() = true;
+ },
+ );
+ }
}
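Putting the pieces above together, a caller uses the new surface like this
(a sketch; fs, handle and the key arguments stand in for real values, much
as in the tests above):

    // ReadGuard is #[must_use] and RAII: a commit for these keys cannot
    // complete until the guard is dropped, so bind it to a variable.
    let _guard = fs
        .read_lock(&[LockKey::object_attribute(store_id, object_id, attr_id)])
        .await;
    let size = handle.get_size();
    // ... read data that must be consistent with `size` ...
    // Dropping the guard wakes any transaction waiting in commit_prepare.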
diff --git a/src/storage/fxfs/src/server/directory.rs b/src/storage/fxfs/src/server/directory.rs
index 38fd936..e367d99 100644
--- a/src/storage/fxfs/src/server/directory.rs
+++ b/src/storage/fxfs/src/server/directory.rs
@@ -14,6 +14,7 @@
},
anyhow::{bail, Error},
async_trait::async_trait,
+ either::{Left, Right},
fidl::endpoints::ServerEnd,
fidl_fuchsia_io::{
self as fio, NodeAttributes, NodeMarker, MODE_TYPE_DIRECTORY, OPEN_FLAG_CREATE,
@@ -58,6 +59,7 @@
// writers, but the readers don't have any locks, so there need to be checks that nodes
// still exist at some point.
let mut current_node = FxNode::Dir(self.clone());
+ let fs = self.volume.store().filesystem();
while !path.is_empty() {
let last_segment = path.is_single_component();
let current_dir = match current_node {
@@ -68,10 +70,9 @@
// Create the transaction here if we might need to create the object so that we have a
// lock in place.
let store = self.volume.store();
- let transaction = if last_segment && flags & OPEN_FLAG_CREATE != 0 {
- Some(
- store
- .filesystem()
+ let transaction_or_guard = if last_segment && flags & OPEN_FLAG_CREATE != 0 {
+ Left(
+ fs.clone()
.new_transaction(&[LockKey::object(
store.store_object_id(),
current_dir.directory.object_id(),
@@ -79,7 +80,21 @@
.await?,
)
} else {
- None
+ // When a child object is created, the object and its directory entry are created in
+ // the same transaction, and so we need to hold a read lock over the lookup and open
+ // calls.
+
+ // TODO(csuter): I think that this cannot be tested easily at the moment because it
+ // would only result in open_or_load_node returning NotFound, which is what we would
+ // want to return anyway. When we add unlink support, we might be able to use this
+ // same lock to guard against the race mentioned above.
+ Right(
+ fs.read_lock(&[LockKey::object(
+ store.store_object_id(),
+ current_dir.directory.object_id(),
+ )])
+ .await,
+ )
};
match current_dir.directory.lookup(name).await {
Ok((object_id, object_descriptor)) => {
@@ -93,10 +108,10 @@
self.volume.open_or_load_node(object_id, object_descriptor).await?;
}
Err(e) if FxfsError::NotFound.matches(&e) => {
- if let Some(transaction) = transaction {
+ if let Left(transaction) = transaction_or_guard {
return self.create_child(transaction, &current_dir, name, mode).await;
} else {
- bail!(FxfsError::NotFound);
+ return Err(e);
}
}
Err(e) => return Err(e),
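The guarded lookup path above has the following shape (a sketch with the
surrounding loop and error handling elided; the names are those used in the
diff):

    // Without the guard, a child created in the window between lookup() and
    // open_or_load_node() could make the two calls disagree; the read lock
    // spans both so they observe the same directory state.
    let _guard = fs
        .read_lock(&[LockKey::object(
            store.store_object_id(),
            current_dir.directory.object_id(),
        )])
        .await;
    let (object_id, object_descriptor) = current_dir.directory.lookup(name).await?;
    let node = self.volume.open_or_load_node(object_id, object_descriptor).await?;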
diff --git a/src/storage/fxfs/src/testing/fake_object.rs b/src/storage/fxfs/src/testing/fake_object.rs
index d5935ba..5a5bb44 100644
--- a/src/storage/fxfs/src/testing/fake_object.rs
+++ b/src/storage/fxfs/src/testing/fake_object.rs
@@ -9,7 +9,9 @@
buffer_allocator::{BufferAllocator, MemBufferSource},
},
object_handle::ObjectHandle,
- object_store::transaction::{LockKey, LockManager, Transaction, TransactionHandler},
+ object_store::transaction::{
+ LockKey, LockManager, ReadGuard, Transaction, TransactionHandler,
+ },
},
anyhow::Error,
async_trait::async_trait,
@@ -66,11 +68,17 @@
Ok(Transaction::new(self, locks))
}
- async fn commit_transaction(&self, _transaction: Transaction<'_>) {}
+ async fn commit_transaction(&self, mut transaction: Transaction<'_>) {
+ std::mem::take(&mut transaction.mutations);
+ }
fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
self.lock_manager.drop_transaction(transaction);
}
+
+ async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a> {
+ self.lock_manager.read_lock(lock_keys).await
+ }
}
impl AsRef<LockManager> for FakeObject {