[fxfs] Add read locks

This adds support for read locks, which are necessary in some cases.
The primary example is a write to an object that changes its size: a
reader should observe the size change together with the data that was
written as part of it, or not at all.
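
A rough, std-only analogy for that guarantee (fxfs cannot simply use an
RwLock here, because reads should only be blocked while a commit of the
same keys is in flight, which is what the LockManager changes below
provide; the Attribute type and main() are illustrative only):

  use std::sync::{Arc, RwLock};
  use std::thread;

  #[derive(Default)]
  struct Attribute {
      size: u64,
      data: Vec<u8>,
  }

  fn main() {
      let attr = Arc::new(RwLock::new(Attribute::default()));
      let writer = {
          let attr = attr.clone();
          thread::spawn(move || {
              // Size and data are published in one critical section.
              let mut a = attr.write().unwrap();
              a.data = vec![123; 10];
              a.size = 10;
          })
      };
      let reader = {
          let attr = attr.clone();
          thread::spawn(move || {
              let a = attr.read().unwrap();
              // If the size change is visible, the data must be too.
              if a.size != 0 {
                  assert_eq!(a.data, vec![123u8; 10]);
              }
          })
      };
      writer.join().unwrap();
      reader.join().unwrap();
  }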

Change-Id: I2eaefb743f5051a93a0b52e0237340296a81abf4
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/514221
Commit-Queue: Chris Suter <csuter@google.com>
Reviewed-by: James Sullivan <jfsulliv@google.com>
diff --git a/src/storage/fxfs/BUILD.gn b/src/storage/fxfs/BUILD.gn
index 5030cb8..29bf874 100644
--- a/src/storage/fxfs/BUILD.gn
+++ b/src/storage/fxfs/BUILD.gn
@@ -12,6 +12,7 @@
   "//third_party/rust_crates:async-trait",
   "//third_party/rust_crates:bincode",
   "//third_party/rust_crates:byteorder",
+  "//third_party/rust_crates:either-v1_5_0",
   "//third_party/rust_crates:futures",
   "//third_party/rust_crates:log",
   "//third_party/rust_crates:num",
diff --git a/src/storage/fxfs/src/object_store.rs b/src/storage/fxfs/src/object_store.rs
index 4c44922..7d0e9ed 100644
--- a/src/storage/fxfs/src/object_store.rs
+++ b/src/storage/fxfs/src/object_store.rs
@@ -498,7 +498,7 @@
     /// Extend the file with the given extent.  The only use case for this right now is for files
     /// that must exist at certain offsets on the device, such as super-blocks.
     pub async fn extend<'a>(&'a self, transaction: &mut Transaction<'a>, device_range: Range<u64>) {
-        let old_size = self.get_size();
+        let old_size = self.txn_get_size(transaction);
         let new_size = old_size + device_range.end - device_range.start;
         self.store.allocator().reserve(transaction, device_range.clone()).await;
         transaction.add_with_object(
@@ -738,6 +738,14 @@
         if buf.len() == 0 {
             return Ok(0);
         }
+        let fs = self.store.filesystem();
+        let _guard = fs
+            .read_lock(&[LockKey::object_attribute(
+                self.store.store_object_id,
+                self.object_id,
+                self.attribute_id,
+            )])
+            .await;
         let size = self.get_size();
         if offset >= size {
             return Ok(0);
@@ -1039,6 +1047,7 @@
         },
         fuchsia_async as fasync,
         futures::{channel::oneshot::channel, join},
+        rand::Rng,
         std::{
             ops::Bound,
             sync::{Arc, Mutex},
@@ -1441,6 +1450,50 @@
             }
         );
     }
+
+    #[fasync::run(10, test)]
+    async fn test_racy_reads() {
+        let (fs, _allocator, store) = test_filesystem_and_store().await;
+        let object;
+        let mut transaction =
+            fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
+        object = Arc::new(
+            store
+                .create_object(&mut transaction, HandleOptions::default())
+                .await
+                .expect("create_object failed"),
+        );
+        transaction.commit().await;
+        for _ in 0..100 {
+            let cloned_object = object.clone();
+            let writer = fasync::Task::spawn(async move {
+                let mut buf = cloned_object.allocate_buffer(10);
+                buf.as_mut_slice().fill(123);
+                cloned_object.write(0, buf.as_ref()).await.expect("write failed");
+            });
+            let cloned_object = object.clone();
+            let reader = fasync::Task::spawn(async move {
+                let wait_time = rand::thread_rng().gen_range(0, 5);
+                fasync::Timer::new(Duration::from_millis(wait_time)).await;
+                let mut buf = cloned_object.allocate_buffer(10);
+                buf.as_mut_slice().fill(23);
+                let amount = cloned_object.read(0, buf.as_mut()).await.expect("read failed");
+                // If we succeed in reading data, it must include the write; i.e. if we see the size
+                // change, we should see the data too.  For this to hold, the read path must take a
+                // lock so that when we read the size, we also see the extents changed in that same
+                // transaction.
+                if amount != 0 {
+                    assert_eq!(amount, 10);
+                    assert_eq!(buf.as_slice(), &[123; 10]);
+                }
+            });
+            writer.await;
+            reader.await;
+            let mut transaction = object.new_transaction().await.expect("new_transaction failed");
+            object.truncate(&mut transaction, 0).await.expect("truncate failed");
+            transaction.commit().await;
+        }
+    }
 }
 
 // TODO(csuter): validation of all deserialized structs.
diff --git a/src/storage/fxfs/src/object_store/filesystem.rs b/src/storage/fxfs/src/object_store/filesystem.rs
index ce6d8c5..722196f 100644
--- a/src/storage/fxfs/src/object_store/filesystem.rs
+++ b/src/storage/fxfs/src/object_store/filesystem.rs
@@ -10,8 +10,8 @@
             constants::INVALID_OBJECT_ID,
             journal::{Journal, JournalCheckpoint},
             transaction::{
-                AssociatedObject, LockKey, LockManager, Mutation, Transaction, TransactionHandler,
-                TxnMutation,
+                AssociatedObject, LockKey, LockManager, Mutation, ReadGuard, Transaction,
+                TransactionHandler, TxnMutation,
             },
             ObjectStore,
         },
@@ -352,6 +352,7 @@
     }
 
     async fn commit_transaction(&self, transaction: Transaction<'_>) {
+        self.lock_manager.commit_prepare(&transaction).await;
         self.journal.commit(transaction).await;
     }
 
@@ -359,6 +360,10 @@
         self.objects.drop_transaction(transaction);
         self.lock_manager.drop_transaction(transaction);
     }
+
+    async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a> {
+        self.lock_manager.read_lock(lock_keys).await
+    }
 }
 
 // TODO(csuter): How do we ensure sync prior to drop?
diff --git a/src/storage/fxfs/src/object_store/testing/fake_filesystem.rs b/src/storage/fxfs/src/object_store/testing/fake_filesystem.rs
index f9c4ada..1266af2 100644
--- a/src/storage/fxfs/src/object_store/testing/fake_filesystem.rs
+++ b/src/storage/fxfs/src/object_store/testing/fake_filesystem.rs
@@ -9,7 +9,9 @@
             allocator::Allocator,
             filesystem::{Filesystem, ObjectManager, ObjectSync},
             journal::JournalCheckpoint,
-            transaction::{LockKey, LockManager, Transaction, TransactionHandler, TxnMutation},
+            transaction::{
+                LockKey, LockManager, ReadGuard, Transaction, TransactionHandler, TxnMutation,
+            },
             ObjectStore,
         },
     },
@@ -71,6 +73,7 @@
     }
 
     async fn commit_transaction(&self, mut transaction: Transaction<'_>) {
+        self.lock_manager.commit_prepare(&transaction).await;
         for TxnMutation { object_id, mutation, associated_object } in
             std::mem::take(&mut transaction.mutations)
         {
@@ -90,4 +93,8 @@
         self.object_manager.drop_transaction(transaction);
         self.lock_manager.drop_transaction(transaction);
     }
+
+    async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a> {
+        self.lock_manager.read_lock(lock_keys).await
+    }
 }
diff --git a/src/storage/fxfs/src/object_store/transaction.rs b/src/storage/fxfs/src/object_store/transaction.rs
index 01b6454..aabb9f4 100644
--- a/src/storage/fxfs/src/object_store/transaction.rs
+++ b/src/storage/fxfs/src/object_store/transaction.rs
@@ -47,6 +47,14 @@
     /// removed the mutations.  This is called automatically when Transaction is dropped, which is
     /// why this isn't async.
     fn drop_transaction(&self, transaction: &mut Transaction<'_>);
+
+    /// Acquires a read lock for the given keys.  Read locks are only blocked whilst a transaction
+    /// is being committed for the same locks.  They are only necessary where consistency is
+    /// required between different mutations within a transaction.  For example, a write might
+    /// change the size and extents for an object, in which case a read lock is required so that
+    /// the size and extents are observed together, or not at all.  Implementations should call
+    /// through to LockManager's read_lock implementation.
+    async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a>;
 }
 
 /// The journal consists of these records which will be replayed at mount time.  Within a a
@@ -306,7 +314,20 @@
 
 struct Locks {
     sequence: u64,
-    keys: HashMap<LockKey, (u64, Vec<Waker>)>,
+    keys: HashMap<LockKey, LockEntry>,
+}
+
+struct LockEntry {
+    sequence: u64,
+    read_count: u64,
+    state: LockState,
+    wakers: Vec<Waker>,
+}
+
+enum LockState {
+    Unlocked,
+    Locked,
+    Committing(Waker),
 }
 
 impl LockManager {
@@ -328,19 +349,29 @@
                 match keys.entry(lock.clone()) {
                     Entry::Vacant(vacant) => {
                         *sequence += 1;
-                        vacant.insert((*sequence, Vec::new()));
+                        vacant.insert(LockEntry {
+                            sequence: *sequence,
+                            read_count: 0,
+                            state: LockState::Locked,
+                            wakers: Vec::new(),
+                        });
                         Poll::Ready(())
                     }
                     Entry::Occupied(mut occupied) => {
-                        let (sequence, wakers) = occupied.get_mut();
-                        if *sequence == waker_sequence {
-                            wakers[waker_index] = cx.waker().clone();
+                        let entry = occupied.get_mut();
+                        if let LockState::Unlocked = entry.state {
+                            entry.state = LockState::Locked;
+                            Poll::Ready(())
                         } else {
-                            waker_index = wakers.len();
-                            waker_sequence = *sequence;
-                            wakers.push(cx.waker().clone());
+                            if entry.sequence == waker_sequence {
+                                entry.wakers[waker_index] = cx.waker().clone();
+                            } else {
+                                waker_index = entry.wakers.len();
+                                waker_sequence = *sequence;
+                                entry.wakers.push(cx.waker().clone());
+                            }
+                            Poll::Pending
                         }
-                        Poll::Pending
                     }
                 }
             })
@@ -352,12 +383,101 @@
     pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
         let mut locks = self.locks.lock().unwrap();
         for lock in transaction.lock_keys.drain(..) {
-            let (_, (_, wakers)) = locks.keys.remove_entry(&lock).unwrap();
-            for waker in wakers {
+            let (_, entry) = locks.keys.remove_entry(&lock).unwrap();
+            for waker in entry.wakers {
                 waker.wake();
             }
         }
     }
+
+    /// Prepares to commit by waiting for readers to finish.
+    pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
+        for lock in &transaction.lock_keys {
+            poll_fn(|cx| {
+                let mut locks = self.locks.lock().unwrap();
+                let entry = locks.keys.get_mut(&lock).expect("key missing!");
+                entry.state = LockState::Committing(cx.waker().clone());
+                if entry.read_count > 0 {
+                    Poll::Pending
+                } else {
+                    Poll::Ready(())
+                }
+            })
+            .await;
+        }
+    }
+
+    pub async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a> {
+        let mut lock_keys: Vec<_> = lock_keys.iter().cloned().collect();
+        lock_keys.sort_unstable();
+        for lock in &lock_keys {
+            let mut waker_sequence = 0;
+            let mut waker_index = 0;
+            poll_fn(|cx| {
+                let mut locks = self.locks.lock().unwrap();
+                let Locks { sequence, keys } = &mut *locks;
+                match keys.entry(lock.clone()) {
+                    Entry::Vacant(vacant) => {
+                        *sequence += 1;
+                        vacant.insert(LockEntry {
+                            sequence: *sequence,
+                            read_count: 1,
+                            state: LockState::Unlocked,
+                            wakers: Vec::new(),
+                        });
+                        Poll::Ready(())
+                    }
+                    Entry::Occupied(mut occupied) => {
+                        let entry = occupied.get_mut();
+                        if let LockState::Committing(_) = entry.state {
+                            if entry.sequence == waker_sequence {
+                                entry.wakers[waker_index] = cx.waker().clone();
+                            } else {
+                                waker_index = entry.wakers.len();
+                                waker_sequence = *sequence;
+                                entry.wakers.push(cx.waker().clone());
+                            }
+                            Poll::Pending
+                        } else {
+                            entry.read_count += 1;
+                            Poll::Ready(())
+                        }
+                    }
+                }
+            })
+            .await;
+        }
+        ReadGuard { manager: self, lock_keys }
+    }
+}
+
+#[must_use]
+pub struct ReadGuard<'a> {
+    manager: &'a LockManager,
+    lock_keys: Vec<LockKey>,
+}
+
+impl Drop for ReadGuard<'_> {
+    fn drop(&mut self) {
+        let mut locks = self.manager.locks.lock().unwrap();
+        for lock in std::mem::take(&mut self.lock_keys) {
+            if let Entry::Occupied(mut occupied) = locks.keys.entry(lock) {
+                let entry = occupied.get_mut();
+                entry.read_count -= 1;
+                if entry.read_count == 0 {
+                    match entry.state {
+                        LockState::Unlocked => {
+                            occupied.remove_entry();
+                        }
+                        LockState::Locked => {}
+                        LockState::Committing(ref waker) => waker.wake_by_ref(),
+                    }
+                }
+            } else {
+                unreachable!(); // The entry *must* be in the HashMap.
+            }
+        }
+    }
 }
 
 #[cfg(test)]
@@ -426,4 +546,70 @@
             }
         );
     }
+
+    #[fasync::run_singlethreaded(test)]
+    async fn test_read_lock_after_write_lock() {
+        let device = Arc::new(FakeDevice::new(1024, 1024));
+        let fs = FakeFilesystem::new(device);
+        let (send1, recv1) = channel();
+        let (send2, recv2) = channel();
+        let done = Mutex::new(false);
+        join!(
+            async {
+                let t = fs
+                    .clone()
+                    .new_transaction(&[LockKey::object_attribute(1, 2, 3)])
+                    .await
+                    .expect("new_transaction failed");
+                send1.send(()).unwrap(); // Tell the next future to continue.
+                recv2.await.unwrap();
+                t.commit().await;
+                *done.lock().unwrap() = true;
+            },
+            async {
+                recv1.await.unwrap();
+                // Reads should not be blocked until the transaction is committed.
+                let _guard = fs.read_lock(&[LockKey::object_attribute(1, 2, 3)]).await;
+                // Tell the first future to continue.
+                send2.send(()).unwrap();
+                // It shouldn't proceed until we release our read lock, but it's a halting
+                // problem, so sleep.
+                fasync::Timer::new(Duration::from_millis(100)).await;
+                assert!(!*done.lock().unwrap());
+            },
+        );
+    }
+
+    #[fasync::run_singlethreaded(test)]
+    async fn test_write_lock_after_read_lock() {
+        let device = Arc::new(FakeDevice::new(1024, 1024));
+        let fs = FakeFilesystem::new(device);
+        let (send1, recv1) = channel();
+        let (send2, recv2) = channel();
+        let done = Mutex::new(false);
+        join!(
+            async {
+                // Take the read lock first; the commit in the other future must wait for it.
+                let _guard = fs.read_lock(&[LockKey::object_attribute(1, 2, 3)]).await;
+                // Tell the next future to continue and then wait.
+                send1.send(()).unwrap();
+                recv2.await.unwrap();
+                // It shouldn't proceed until we release our read lock, but it's a halting
+                // problem, so sleep.
+                fasync::Timer::new(Duration::from_millis(100)).await;
+                assert!(!*done.lock().unwrap());
+            },
+            async {
+                recv1.await.unwrap();
+                let t = fs
+                    .clone()
+                    .new_transaction(&[LockKey::object_attribute(1, 2, 3)])
+                    .await
+                    .expect("new_transaction failed");
+                send2.send(()).unwrap(); // Tell the first future to continue;
+                t.commit().await;
+                *done.lock().unwrap() = true;
+            },
+        );
+    }
 }
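
The LockManager changes above implement this reader/committer protocol
asynchronously, per lock key, alongside the existing write-lock state.
A compressed, synchronous sketch of just the read-lock half (std-only;
CommitLock, Entry and commit_done are illustrative names, not fxfs
types):

  use std::sync::{Condvar, Mutex};

  #[derive(Default)]
  struct Entry {
      read_count: u64,
      committing: bool,
  }

  #[derive(Default)]
  struct CommitLock {
      entry: Mutex<Entry>,
      cv: Condvar,
  }

  impl CommitLock {
      // Analogue of read_lock(): readers only block while a commit is in progress.
      fn read_lock(&self) {
          let mut e = self.entry.lock().unwrap();
          while e.committing {
              e = self.cv.wait(e).unwrap();
          }
          e.read_count += 1;
      }

      // Analogue of dropping a ReadGuard: the last reader wakes a waiting committer.
      fn read_unlock(&self) {
          let mut e = self.entry.lock().unwrap();
          e.read_count -= 1;
          if e.read_count == 0 {
              self.cv.notify_all();
          }
      }

      // Analogue of commit_prepare(): stop new readers, then wait for existing ones.
      fn commit_prepare(&self) {
          let mut e = self.entry.lock().unwrap();
          e.committing = true;
          while e.read_count > 0 {
              e = self.cv.wait(e).unwrap();
          }
      }

      // Analogue of drop_transaction(): let readers in again.
      fn commit_done(&self) {
          self.entry.lock().unwrap().committing = false;
          self.cv.notify_all();
      }
  }

A committer calls commit_prepare() before applying its mutations and
commit_done() once they are visible; in the change above,
commit_transaction() does the former and drop_transaction() plays the
latter role when the Transaction is dropped.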
diff --git a/src/storage/fxfs/src/server/directory.rs b/src/storage/fxfs/src/server/directory.rs
index 38fd936..e367d99 100644
--- a/src/storage/fxfs/src/server/directory.rs
+++ b/src/storage/fxfs/src/server/directory.rs
@@ -14,6 +14,7 @@
     },
     anyhow::{bail, Error},
     async_trait::async_trait,
+    either::{Left, Right},
     fidl::endpoints::ServerEnd,
     fidl_fuchsia_io::{
         self as fio, NodeAttributes, NodeMarker, MODE_TYPE_DIRECTORY, OPEN_FLAG_CREATE,
@@ -58,6 +59,7 @@
         // writers, but the readers don't have any locks, so there needs to be checks that nodes
         // still exist at some point.
         let mut current_node = FxNode::Dir(self.clone());
+        let fs = self.volume.store().filesystem();
         while !path.is_empty() {
             let last_segment = path.is_single_component();
             let current_dir = match current_node {
@@ -68,10 +70,9 @@
             // Create the transaction here if we might need to create the object so that we have a
             // lock in place.
             let store = self.volume.store();
-            let transaction = if last_segment && flags & OPEN_FLAG_CREATE != 0 {
-                Some(
-                    store
-                        .filesystem()
+            let transaction_or_guard = if last_segment && flags & OPEN_FLAG_CREATE != 0 {
+                Left(
+                    fs.clone()
                         .new_transaction(&[LockKey::object(
                             store.store_object_id(),
                             current_dir.directory.object_id(),
@@ -79,7 +80,21 @@
                         .await?,
                 )
             } else {
-                None
+                // When child objects are created, the object is created along with the directory
+                // entry in the same transaction, and so we need to hold a read lock over the lookup
+                // and open calls.
+
+                // TODO(csuter): I think that this cannot be tested easily at the moment because it
+                // would only result in open_or_load_node returning NotFound, which is what we would
+                // want to return anyway.  When we add unlink support, we might be able to use this
+                // same lock to guard against the race mentioned above.
+                Right(
+                    fs.read_lock(&[LockKey::object(
+                        store.store_object_id(),
+                        current_dir.directory.object_id(),
+                    )])
+                    .await,
+                )
             };
             match current_dir.directory.lookup(name).await {
                 Ok((object_id, object_descriptor)) => {
@@ -93,10 +108,10 @@
                         self.volume.open_or_load_node(object_id, object_descriptor).await?;
                 }
                 Err(e) if FxfsError::NotFound.matches(&e) => {
-                    if let Some(transaction) = transaction {
+                    if let Left(transaction) = transaction_or_guard {
                         return self.create_child(transaction, &current_dir, name, mode).await;
                     } else {
-                        bail!(FxfsError::NotFound);
+                        return Err(e);
                     }
                 }
                 Err(e) => return Err(e),
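
The Left/Right handling above lets a single binding hold either the
write transaction (create path) or the read guard (plain lookup) for
the duration of the directory lookup, which is why the either crate is
added to BUILD.gn.  A standalone sketch of that shape (WriteTxn and
ReadGuard here are stand-ins, not the fxfs types):

  use either::Either::{self, Left, Right};

  struct WriteTxn;
  struct ReadGuard;

  // Take the write lock up front when we may need to create the child;
  // otherwise a read lock keeps the lookup consistent with concurrent commits.
  fn lock_for_open(may_create: bool) -> Either<WriteTxn, ReadGuard> {
      if may_create {
          Left(WriteTxn)
      } else {
          Right(ReadGuard)
      }
  }

  fn open(may_create: bool, found: bool) -> Result<(), &'static str> {
      let guard = lock_for_open(may_create);
      // ... perform the lookup while either guard is held ...
      let result = if found {
          Ok(()) // Open the existing node.
      } else if guard.is_left() {
          Ok(()) // Create the child within the write transaction.
      } else {
          Err("not found")
      };
      // Whichever guard was taken is dropped here, releasing the lock.
      drop(guard);
      result
  }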
diff --git a/src/storage/fxfs/src/testing/fake_object.rs b/src/storage/fxfs/src/testing/fake_object.rs
index d5935ba..5a5bb44 100644
--- a/src/storage/fxfs/src/testing/fake_object.rs
+++ b/src/storage/fxfs/src/testing/fake_object.rs
@@ -9,7 +9,9 @@
             buffer_allocator::{BufferAllocator, MemBufferSource},
         },
         object_handle::ObjectHandle,
-        object_store::transaction::{LockKey, LockManager, Transaction, TransactionHandler},
+        object_store::transaction::{
+            LockKey, LockManager, ReadGuard, Transaction, TransactionHandler,
+        },
     },
     anyhow::Error,
     async_trait::async_trait,
@@ -66,11 +68,17 @@
         Ok(Transaction::new(self, locks))
     }
 
-    async fn commit_transaction(&self, _transaction: Transaction<'_>) {}
+    async fn commit_transaction(&self, mut transaction: Transaction<'_>) {
+        std::mem::take(&mut transaction.mutations);
+    }
 
     fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
         self.lock_manager.drop_transaction(transaction);
     }
+
+    async fn read_lock<'a>(&'a self, lock_keys: &[LockKey]) -> ReadGuard<'a> {
+        self.lock_manager.read_lock(lock_keys).await
+    }
 }
 
 impl AsRef<LockManager> for FakeObject {