[fxfs] Rework metadata space reservations

This includes:

 1. Switch to having just a single reservation for all metadata.

 2. Metadata space either needs to be reserved up-front or it can be
    borrowed.  When it is borrowed, we have to keep track of it so that
    upon remount, we can set up reservations correctly.

 3. Separate out the overwrite mode so that it has its own method that
    doesn't take a transaction.  This avoids locking issues (which
    allows us to just use Transaction::commit() in the journal) and
    makes clear that overwrite involves no metadata changes.

 4. Rename TreeSeal and TreeCompact to BeginFlush and EndFlush, as those
    names more accurately describe what is happening.

 5. Tracking dependencies on the journal needs to be done whilst holding
    transaction locks, and BeginFlush and EndFlush can be used for that,
    which obviates the need for ObjectFlush, so it has been removed.

Change-Id: I84ef7981e1055c1893e0b0fefc579c7f86123115
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/540065
Reviewed-by: James Sullivan <jfsulliv@google.com>
Commit-Queue: Chris Suter <csuter@google.com>
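
To make (1) and (2) concrete, here is a minimal, self-contained sketch
(illustrative only, not fxfs code) of how borrowed metadata space can be
tracked so that a remount can rebuild the reservation; the diff below persists
the borrowed total as borrowed_metadata_space in the super-block:

    use std::sync::Mutex;

    #[derive(Default)]
    struct MetadataSpace {
        reserved: u64, // space reserved up-front with the allocator
        borrowed: u64, // space taken on the promise of no net increase
    }

    #[derive(Default)]
    struct Tracker(Mutex<MetadataSpace>);

    impl Tracker {
        // A borrowing transaction consumes journal space now but is expected
        // to be net-zero after compaction.
        fn borrow(&self, amount: u64) {
            self.0.lock().unwrap().borrowed += amount;
        }

        // On remount, whatever was borrowed (as recorded in the super-block)
        // must be covered by a fresh reservation before new transactions are
        // admitted.
        fn on_remount(&self, persisted_borrowed: u64) {
            let mut inner = self.0.lock().unwrap();
            inner.borrowed = persisted_borrowed;
            inner.reserved = persisted_borrowed;
        }
    }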
diff --git a/src/storage/fxfs/src/object_handle.rs b/src/storage/fxfs/src/object_handle.rs
index b7ea1dd..ca2b264 100644
--- a/src/storage/fxfs/src/object_handle.rs
+++ b/src/storage/fxfs/src/object_handle.rs
@@ -59,6 +59,10 @@
         buf: BufferRef<'_>,
     ) -> Result<(), Error>;
 
+    /// Writes |buf| to the device at |offset|.  The ranges involved *must* already be allocated
+    /// and the buffer must be aligned to whole blocks.
+    async fn overwrite(&self, offset: u64, buf: BufferRef<'_>) -> Result<(), Error>;
+
     // Returns the size of the object.
     fn get_size(&self) -> u64;
 
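
As a caller-side illustration of the overwrite contract above, here is a toy
handle (illustrative only, not fxfs code) that enforces the same preconditions
before writing:

    // Toy stand-in: the range must already be allocated and the write must
    // cover whole, aligned blocks.
    struct ToyHandle {
        block_size: u64,
        allocated: std::ops::Range<u64>, // preallocated byte range
    }

    impl ToyHandle {
        fn overwrite(&self, offset: u64, buf: &[u8]) -> Result<(), String> {
            let end = offset + buf.len() as u64;
            if offset % self.block_size != 0 || buf.len() as u64 % self.block_size != 0 {
                return Err("overwrite must be block-aligned".to_string());
            }
            if offset < self.allocated.start || end > self.allocated.end {
                return Err(format!("offset {} not allocated", offset));
            }
            // No transaction is taken: overwrite never changes metadata, so it
            // is safe to call from contexts (such as the journal) that must
            // not take transaction locks.
            Ok(())
        }
    }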
diff --git a/src/storage/fxfs/src/object_store.rs b/src/storage/fxfs/src/object_store.rs
index 9f790b6..3506b14 100644
--- a/src/storage/fxfs/src/object_store.rs
+++ b/src/storage/fxfs/src/object_store.rs
@@ -30,7 +30,6 @@
         object_store::{
             filesystem::{Filesystem, Mutations},
             journal::{checksum_list::ChecksumList, fletcher64},
-            object_manager::ObjectFlush,
             record::{
                 Checksums, ExtentKey, ExtentValue, ObjectAttributes, ObjectItem, ObjectKey,
                 ObjectKind, ObjectValue, DEFAULT_DATA_ATTRIBUTE_ID,
@@ -97,9 +96,6 @@
 
 #[derive(Default)]
 pub struct HandleOptions {
-    /// If true, don't COW, write to blocks that are already allocated.
-    pub overwrite: bool,
-
     /// If true, transactions used by this handle will skip journal space checks.
     pub skip_journal_checks: bool,
 }
@@ -520,14 +516,17 @@
                 store_info.as_ref().unwrap().layers.clone()
             };
             let mut handles = Vec::new();
+            let mut total_size = 0;
             for object_id in layer_object_ids {
-                handles.push(
+                let handle =
                     ObjectStore::open_object(&parent_store, object_id, HandleOptions::default())
-                        .await?,
-                );
+                        .await?;
+                total_size += handle.get_size();
+                handles.push(handle);
             }
             self.tree.append_layers(handles.into()).await?;
             let _ = self.store_info_handle.set(handle);
+            self.filesystem().object_manager().update_reservation(self.store_object_id, total_size);
             Ok(())
         }
         .boxed()
@@ -648,10 +647,20 @@
             Mutation::ObjectStoreInfo(StoreInfoMutation(store_info)) => {
                 *self.store_info.lock().unwrap() = Some(store_info);
             }
-            Mutation::TreeSeal => self.tree.seal().await,
-            Mutation::TreeCompact => {
+            Mutation::BeginFlush => self.tree.seal().await,
+            Mutation::EndFlush => {
                 if transaction.is_none() {
                     self.tree.reset_immutable_layers();
+                } else {
+                    let layers = self.tree.immutable_layer_set();
+                    self.filesystem().object_manager().update_reservation(
+                        self.store_object_id,
+                        layers
+                            .layers
+                            .iter()
+                            .map(|l| l.handle().map(|h| h.get_size()).unwrap_or(0))
+                            .sum(),
+                    );
                 }
             }
             _ => panic!("unexpected mutation: {:?}", mutation), // TODO(csuter): can't panic
@@ -663,15 +672,15 @@
     /// Push all in-memory structures to the device. This is not necessary for sync since the
     /// journal will take care of it.  This is supposed to be called when there is either memory or
     /// space pressure (flushing the store will persist in-memory data and allow the journal file to
-    /// be trimmed).
+    /// be trimmed).  This is not thread-safe: flush must not be called concurrently from
+    /// multiple threads.
     async fn flush(&self) -> Result<(), Error> {
         trace_duration!("ObjectStore::flush", "store_object_id" => self.store_object_id);
         if self.parent_store.is_none() {
             return Ok(());
         }
         self.ensure_open().await?;
-        // TODO(csuter): This whole process needs to be within a transaction, or otherwise safe in
-        // the event of power loss.
+
         let filesystem = self.filesystem();
         let object_manager = filesystem.object_manager();
         if !object_manager.needs_flush(self.store_object_id) {
@@ -681,10 +690,10 @@
         let parent_store = self.parent_store.as_ref().unwrap();
         let graveyard = object_manager.graveyard().ok_or(anyhow!("Missing graveyard!"))?;
 
-        let object_sync = ObjectFlush::new(object_manager, self.store_object_id);
-        let reservation = filesystem.flush_reservation();
+        let reservation = object_manager.metadata_reservation();
         let txn_options = Options {
             skip_journal_checks: true,
+            borrow_metadata_space: true,
             allocator_reservation: Some(reservation),
             ..Default::default()
         };
@@ -697,11 +706,7 @@
         .await?;
         let object_id = object_handle.object_id();
         graveyard.add(&mut transaction, parent_store.store_object_id(), object_id);
-        transaction.add_with_object(
-            self.store_object_id(),
-            Mutation::TreeSeal,
-            AssocObj::Borrowed(&object_sync),
-        );
+        transaction.add(self.store_object_id(), Mutation::BeginFlush);
         transaction.commit().await;
 
         // This size can't be too big because we've been called to flush in-memory data in order to
@@ -769,15 +774,12 @@
             .unwrap()
             .txn_write(&mut transaction, 0u64, buf.as_ref())
             .await?;
-        transaction.add(self.store_object_id(), Mutation::TreeCompact);
+        transaction.add(self.store_object_id(), Mutation::EndFlush);
         graveyard.remove(&mut transaction, parent_store.store_object_id(), object_id);
-        // TODO(csuter): This isn't thread-safe.
         *self.store_info.lock().unwrap() = Some(new_store_info);
-        transaction.commit().await;
 
         self.tree.set_layers(layers);
-
-        object_sync.commit();
+        transaction.commit().await;
 
         // Now close the layers and purge them.
         for layer in layer_set.layers {
@@ -897,114 +899,6 @@
         self.update_allocated_size(transaction, device_range.end - device_range.start, 0).await
     }
 
-    async fn write_cow<'a>(
-        &'a self,
-        transaction: &mut Transaction<'a>,
-        mut offset: u64,
-        buf: BufferRef<'_>,
-    ) -> Result<(), Error> {
-        let aligned = round_down(offset, self.block_size)
-            ..round_up(offset + buf.len() as u64, self.block_size).ok_or(FxfsError::TooBig)?;
-        let mut buf_offset = 0;
-        let store = self.store();
-        let store_id = store.store_object_id;
-        if offset + buf.len() as u64 > self.txn_get_size(transaction) {
-            transaction.add_with_object(
-                store_id,
-                Mutation::replace_or_insert_object(
-                    ObjectKey::attribute(self.object_id, self.attribute_id),
-                    ObjectValue::attribute(offset + buf.len() as u64),
-                ),
-                AssocObj::Borrowed(self),
-            );
-        }
-        let mut allocated = 0;
-        let allocator = store.allocator();
-        let trace = self.trace.load(atomic::Ordering::Relaxed);
-        let futures = FuturesUnordered::new();
-        let mut aligned_offset = aligned.start;
-        while buf_offset < buf.len() {
-            let device_range = allocator
-                .allocate(transaction, aligned.end - aligned_offset)
-                .await
-                .context("allocation failed")?;
-            if trace {
-                log::info!("{}.{} A {:?}", store_id, self.object_id, device_range);
-            }
-            allocated += device_range.end - device_range.start;
-            let end = aligned_offset + device_range.end - device_range.start;
-            let len = min(buf.len() - buf_offset, (end - offset) as usize);
-            assert!(len > 0);
-            futures.push(async move {
-                let checksum = self
-                    .write_at(
-                        offset,
-                        buf.subslice(buf_offset..buf_offset + len),
-                        device_range.start + offset % self.block_size,
-                        true,
-                    )
-                    .await?;
-                Ok(Mutation::merge_object(
-                    ObjectKey::extent(self.object_id, self.attribute_id, aligned_offset..end),
-                    ObjectValue::extent_with_checksum(device_range.start, checksum),
-                ))
-            });
-            aligned_offset = end;
-            buf_offset += len;
-            offset += len as u64;
-        }
-        let (mutations, _): (Vec<_>, _) = try_join!(futures.try_collect(), async {
-            let deallocated = self.deallocate_old_extents(transaction, aligned.clone()).await?;
-            self.update_allocated_size(transaction, allocated, deallocated).await
-        })?;
-        for m in mutations {
-            transaction.add(store_id, m);
-        }
-        Ok(())
-    }
-
-    // All the extents for the range must have been preallocated using preallocate_range or from
-    // existing writes.
-    async fn overwrite(&self, mut offset: u64, buf: BufferRef<'_>) -> Result<(), Error> {
-        let tree = &self.store().tree;
-        let layer_set = tree.layer_set();
-        let mut merger = layer_set.merger();
-        let end = offset + buf.len() as u64;
-        let mut iter = merger
-            .seek(Bound::Included(
-                &ObjectKey::extent(self.object_id, self.attribute_id, offset..end).search_key(),
-            ))
-            .await?;
-        let mut pos = 0;
-        loop {
-            let (device_offset, to_do) = match iter.get().and_then(Into::into) {
-                Some((
-                    object_id,
-                    attribute_id,
-                    ExtentKey { range },
-                    ExtentValue { device_offset: Some((device_offset, _)) },
-                )) if object_id == self.object_id
-                    && attribute_id == self.attribute_id
-                    && range.start <= offset =>
-                {
-                    (
-                        device_offset + (offset - range.start),
-                        min(buf.len() - pos, (range.end - offset) as usize),
-                    )
-                }
-                _ => bail!("offset {} not allocated", offset),
-            };
-            self.write_at(offset, buf.subslice(pos..pos + to_do), device_offset, false).await?;
-            pos += to_do;
-            if pos == buf.len() {
-                break;
-            }
-            offset += to_do as u64;
-            iter.advance().await?;
-        }
-        Ok(())
-    }
-
     async fn write_at(
         &self,
         offset: u64,
@@ -1190,6 +1084,8 @@
             .unwrap_or_else(|| self.get_size())
     }
 
+    // TODO(csuter): remove #[cfg(test)] once this is used outside of tests
+    #[cfg(test)]
     async fn get_allocated_size(&self) -> Result<u64, Error> {
         self.store().ensure_open().await?;
         if let ObjectItem {
@@ -1387,25 +1283,118 @@
         Ok(to_do)
     }
 
+    // This function has some alignment requirements: any whole blocks to be written must be
+    // aligned; writes that only partially cover the head and tail blocks are fine.
     async fn txn_write<'a>(
         &'a self,
         transaction: &mut Transaction<'a>,
-        offset: u64,
+        mut offset: u64,
         buf: BufferRef<'_>,
     ) -> Result<(), Error> {
         if buf.is_empty() {
             return Ok(());
         }
-        if offset % self.block_size() as u64 != buf.range().start as u64 % self.block_size() as u64
-        {
-            panic!("Unaligned write off: {} buf.range: {:?}", offset, buf.range());
-        }
         self.apply_pending_properties(transaction).await?;
-        if self.options.overwrite {
-            self.overwrite(offset, buf).await
-        } else {
-            self.write_cow(transaction, offset, buf).await
+        let aligned = round_down(offset, self.block_size)
+            ..round_up(offset + buf.len() as u64, self.block_size).ok_or(FxfsError::TooBig)?;
+        let mut buf_offset = 0;
+        let store = self.store();
+        let store_id = store.store_object_id;
+        if offset + buf.len() as u64 > self.txn_get_size(transaction) {
+            transaction.add_with_object(
+                store_id,
+                Mutation::replace_or_insert_object(
+                    ObjectKey::attribute(self.object_id, self.attribute_id),
+                    ObjectValue::attribute(offset + buf.len() as u64),
+                ),
+                AssocObj::Borrowed(self),
+            );
         }
+        let mut allocated = 0;
+        let allocator = store.allocator();
+        let trace = self.trace.load(atomic::Ordering::Relaxed);
+        let futures = FuturesUnordered::new();
+        let mut aligned_offset = aligned.start;
+        while buf_offset < buf.len() {
+            let device_range = allocator
+                .allocate(transaction, aligned.end - aligned_offset)
+                .await
+                .context("allocation failed")?;
+            if trace {
+                log::info!("{}.{} A {:?}", store_id, self.object_id, device_range);
+            }
+            allocated += device_range.end - device_range.start;
+            let end = aligned_offset + device_range.end - device_range.start;
+            let len = min(buf.len() - buf_offset, (end - offset) as usize);
+            assert!(len > 0);
+            futures.push(async move {
+                let checksum = self
+                    .write_at(
+                        offset,
+                        buf.subslice(buf_offset..buf_offset + len),
+                        device_range.start + offset % self.block_size,
+                        true,
+                    )
+                    .await?;
+                Ok(Mutation::merge_object(
+                    ObjectKey::extent(self.object_id, self.attribute_id, aligned_offset..end),
+                    ObjectValue::extent_with_checksum(device_range.start, checksum),
+                ))
+            });
+            aligned_offset = end;
+            buf_offset += len;
+            offset += len as u64;
+        }
+        let (mutations, _): (Vec<_>, _) = try_join!(futures.try_collect(), async {
+            let deallocated = self.deallocate_old_extents(transaction, aligned.clone()).await?;
+            self.update_allocated_size(transaction, allocated, deallocated).await
+        })?;
+        for m in mutations {
+            transaction.add(store_id, m);
+        }
+        Ok(())
+    }
+
+    // All the extents for the range must have been preallocated using preallocate_range or from
+    // existing writes.
+    async fn overwrite(&self, mut offset: u64, buf: BufferRef<'_>) -> Result<(), Error> {
+        let tree = &self.store().tree;
+        let layer_set = tree.layer_set();
+        let mut merger = layer_set.merger();
+        let end = offset + buf.len() as u64;
+        let mut iter = merger
+            .seek(Bound::Included(
+                &ObjectKey::extent(self.object_id, self.attribute_id, offset..end).search_key(),
+            ))
+            .await?;
+        let mut pos = 0;
+        loop {
+            let (device_offset, to_do) = match iter.get().and_then(Into::into) {
+                Some((
+                    object_id,
+                    attribute_id,
+                    ExtentKey { range },
+                    ExtentValue { device_offset: Some((device_offset, Checksums::None)) },
+                )) if object_id == self.object_id
+                    && attribute_id == self.attribute_id
+                    && range.start <= offset =>
+                {
+                    (
+                        device_offset + (offset - range.start),
+                        min(buf.len() - pos, (range.end - offset) as usize),
+                    )
+                }
+                _ => bail!("offset {} not allocated/has checksums", offset),
+            };
+            self.write_at(offset, buf.subslice(pos..pos + to_do), device_offset, false).await?;
+            pos += to_do;
+            if pos == buf.len() {
+                break;
+            }
+            offset += to_do as u64;
+            iter.advance().await?;
+        }
+        Ok(())
     }
 
     fn get_size(&self) -> u64 {
@@ -1436,7 +1425,7 @@
                 // Is there a better way?
                 let mut buf = self.store().device.allocate_buffer(to_zero as usize);
                 buf.as_mut_slice().fill(0);
-                self.write_cow(transaction, size, buf.as_ref()).await?;
+                self.txn_write(transaction, size, buf.as_ref()).await?;
             }
         }
         transaction.add_with_object(
@@ -1873,13 +1862,10 @@
         assert_eq!(allocated_after - allocated_before, 1048576 - TEST_DEVICE_BLOCK_SIZE as u64);
 
         // Reopen the object in overwrite mode.
-        let object = ObjectStore::open_object(
-            &object.owner,
-            object.object_id(),
-            HandleOptions { overwrite: true, ..Default::default() },
-        )
-        .await
-        .expect("open_object failed");
+        let object =
+            ObjectStore::open_object(&object.owner, object.object_id(), HandleOptions::default())
+                .await
+                .expect("open_object failed");
         let mut buf = object.allocate_buffer(2048);
         buf.as_mut_slice().fill(47);
         object.write(0, buf.subslice(..TEST_DATA_OFFSET as usize)).await.expect("write failed");
@@ -1939,17 +1925,14 @@
     async fn test_overwrite_fails_if_not_preallocated() {
         let (fs, object) = test_filesystem_and_object().await;
 
-        let object = ObjectStore::open_object(
-            &object.owner,
-            object.object_id(),
-            HandleOptions { overwrite: true, ..Default::default() },
-        )
-        .await
-        .expect("open_object failed");
+        let object =
+            ObjectStore::open_object(&object.owner, object.object_id(), HandleOptions::default())
+                .await
+                .expect("open_object failed");
         let mut buf = object.allocate_buffer(2048);
         buf.as_mut_slice().fill(95);
         let offset = round_up(TEST_OBJECT_SIZE, TEST_DEVICE_BLOCK_SIZE).unwrap();
-        object.write(offset, buf.as_ref()).await.expect_err("write suceceded");
+        object.overwrite(offset, buf.as_ref()).await.expect_err("write succeeded");
         fs.close().await.expect("Close failed");
     }
 
@@ -1963,13 +1946,9 @@
             .await
             .expect("new_transaction failed");
         let store = fs.root_store();
-        handle = ObjectStore::create_object(
-            &store,
-            &mut transaction,
-            HandleOptions { overwrite: true, ..Default::default() },
-        )
-        .await
-        .expect("create_object failed");
+        handle = ObjectStore::create_object(&store, &mut transaction, HandleOptions::default())
+            .await
+            .expect("create_object failed");
         handle
             .extend(&mut transaction, 0..5 * TEST_DEVICE_BLOCK_SIZE as u64)
             .await
@@ -2050,7 +2029,10 @@
         assert_eq!(allocator.get_allocated_bytes(), allocated_before);
 
         store
-            .tombstone(object.object_id, Options { skip_space_checks: true, ..Default::default() })
+            .tombstone(
+                object.object_id,
+                Options { borrow_metadata_space: true, ..Default::default() },
+            )
             .await
             .expect("purge failed");
 
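
To summarise the flush protocol that replaces ObjectFlush, here is a condensed,
self-contained model (toy types, illustrative only) of the two transactions
involved:

    #[derive(Debug)]
    enum Mutation {
        BeginFlush,
        EndFlush,
    }

    #[derive(Default)]
    struct Transaction {
        mutations: Vec<(u64, Mutation)>,
    }

    impl Transaction {
        fn add(&mut self, store_object_id: u64, mutation: Mutation) {
            self.mutations.push((store_object_id, mutation));
        }
        fn commit(self) {
            // In fxfs this journals and applies the mutations; here we just
            // log them.
            for (id, m) in &self.mutations {
                println!("store {}: {:?}", id, m);
            }
        }
    }

    fn flush(store_object_id: u64) {
        // Txn 1: BeginFlush seals the mutable layer; subsequent mutations go
        // to a fresh in-memory layer.
        let mut txn = Transaction::default();
        txn.add(store_object_id, Mutation::BeginFlush);
        txn.commit();

        // ... write the sealed layers out to a new layer file ...

        // Txn 2: EndFlush lands in the same transaction as the new store
        // info, so on replay either both take effect or neither does.
        let mut txn = Transaction::default();
        txn.add(store_object_id, Mutation::EndFlush);
        txn.commit();
    }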
diff --git a/src/storage/fxfs/src/object_store/allocator.rs b/src/storage/fxfs/src/object_store/allocator.rs
index 8a2a4bb..8c7de35 100644
--- a/src/storage/fxfs/src/object_store/allocator.rs
+++ b/src/storage/fxfs/src/object_store/allocator.rs
@@ -20,7 +20,6 @@
         object_store::{
             filesystem::{Filesystem, Mutations},
             journal::checksum_list::ChecksumList,
-            object_manager::ObjectFlush,
             transaction::{AllocatorMutation, AssocObj, Mutation, Options, Transaction},
             HandleOptions, ObjectStore,
         },
@@ -95,7 +94,7 @@
     fn reserve_at_most(self: Arc<Self>, amount: u64) -> Reservation;
 
     /// Releases the reservation.
-    fn release_reservation(&self, reservation: &mut Reservation);
+    fn release_reservation(&self, amount: u64);
 
     /// Returns the number of allocated bytes.
     fn get_allocated_bytes(&self) -> u64;
@@ -111,52 +110,90 @@
     ) -> Result<bool, Error>;
 }
 
+/// A reservation guarantees that when it comes time to actually allocate, it will not fail due to
+/// lack of space.  A hold can be placed on some of the reservation, which can later be committed.
 pub struct Reservation {
     allocator: Arc<dyn Allocator>,
-    amount: Mutex<u64>,
+    inner: Mutex<ReservationInner>,
+}
+
+#[derive(Debug, Default)]
+struct ReservationInner {
+    // Amount currently held by this reservation.
+    amount: u64,
+
+    // The amount within this reservation that is held for some purpose.
+    held: u64,
 }
 
 impl std::fmt::Debug for Reservation {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("Reservation").field("amount", &*self.amount.lock().unwrap()).finish()
+        self.inner.lock().unwrap().fmt(f)
     }
 }
 
 impl Reservation {
     pub fn new(allocator: Arc<dyn Allocator>, amount: u64) -> Self {
-        Self { allocator, amount: Mutex::new(amount) }
+        Self { allocator, inner: Mutex::new(ReservationInner { amount, held: 0 }) }
     }
 
+    /// Returns the total amount of the reservation, not accounting for anything that might be held.
     pub fn amount(&self) -> u64 {
-        *self.amount.lock().unwrap()
+        self.inner.lock().unwrap().amount
     }
 
+    /// Returns the amount available after accounting for space that is held.
+    pub fn avail(&self) -> u64 {
+        let inner = self.inner.lock().unwrap();
+        inner.amount - inner.held
+    }
+
+    /// Adds more to the reservation.
     pub fn add(&self, amount: u64) {
-        *self.amount.lock().unwrap() += amount;
+        self.inner.lock().unwrap().amount += amount;
     }
 
-    pub fn sub(&self, delta: u64) -> Result<(), Error> {
-        let mut amount = self.amount.lock().unwrap();
-        *amount = amount.checked_sub(delta).ok_or(FxfsError::NoSpace)?;
+    /// Places a hold on `amount` from the reservation.
+    pub fn hold(&self, amount: u64) -> Result<(), Error> {
+        let mut inner = self.inner.lock().unwrap();
+        if amount > inner.amount - inner.held {
+            bail!(FxfsError::NoSpace);
+        }
+        inner.held += amount;
         Ok(())
     }
 
-    pub fn take(&self) -> u64 {
-        std::mem::take(&mut self.amount.lock().unwrap())
+    /// Releases some previously held amount.
+    pub fn release(&self, amount: u64) {
+        self.inner.lock().unwrap().held -= amount;
     }
 
-    pub fn try_top_up(&self, target: u64) -> bool {
-        let mut amount = self.amount.lock().unwrap();
-        if *amount < target {
-            *amount += self.allocator.clone().reserve_at_most(target - *amount).take();
-        }
-        *amount >= target
+    /// Commits a previously held amount.
+    pub fn commit(&self, amount: u64) {
+        let mut inner = self.inner.lock().unwrap();
+        inner.amount -= amount;
+        inner.held -= amount;
+    }
+
+    /// Returns the entire amount of the reservation.  The caller is responsible for maintaining
+    /// consistency, i.e. updating counters, etc.
+    pub fn take(&self) -> u64 {
+        std::mem::take(&mut *self.inner.lock().unwrap()).amount
+    }
+
+    /// Returns some of the reservation back to the allocator.  Asserts that the amount still
+    /// held remains covered by the reservation afterwards.
+    pub fn give_back(&self, amount: u64) {
+        self.allocator.release_reservation(amount);
+        let mut inner = self.inner.lock().unwrap();
+        inner.amount -= amount;
+        assert!(inner.held <= inner.amount);
     }
 }
 
 impl Drop for Reservation {
     fn drop(&mut self) {
-        self.allocator.clone().release_reservation(self);
+        self.allocator.release_reservation(self.inner.get_mut().unwrap().amount);
     }
 }
 
@@ -325,11 +362,13 @@
                 let serialized_info = handle.contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE).await?;
                 let info: AllocatorInfo = deserialize_from(&serialized_info[..])?;
                 let mut handles = Vec::new();
+                let mut total_size = 0;
                 for object_id in &info.layers {
-                    handles.push(
+                    let handle =
                         ObjectStore::open_object(&root_store, *object_id, HandleOptions::default())
-                            .await?,
-                    );
+                            .await?;
+                    total_size += handle.get_size();
+                    handles.push(handle);
                 }
                 {
                     let mut inner = self.inner.lock().unwrap();
@@ -346,6 +385,11 @@
                     inner.info = info;
                 }
                 self.tree.append_layers(handles.into_boxed_slice()).await?;
+                self.filesystem
+                    .upgrade()
+                    .unwrap()
+                    .object_manager()
+                    .update_reservation(self.object_id, total_size);
             }
         }
 
@@ -380,7 +424,7 @@
 
         if let Some(reservation) = transaction.allocator_reservation {
             ensure!(
-                reservation.amount() >= len,
+                reservation.avail() >= len,
                 anyhow!(FxfsError::NoSpace).context("Insufficient space in reservation")
             );
         }
@@ -411,6 +455,8 @@
             let mut iter = merger.seek(Bound::Unbounded).await?;
             let mut last_offset = 0;
             loop {
+                // TODO(csuter): This is inconsistent; here we return no-space, but otherwise we
+                // return less than requested.
                 if last_offset + len >= self.device_size as u64 {
                     bail!(anyhow!(FxfsError::NoSpace).context("no space after search"));
                 }
@@ -443,7 +489,7 @@
         if let Some(reservation) = &mut transaction.allocator_reservation {
             // This shouldn't fail because we checked the reservation had enough space at the
             // beginning of allocate (after we took the lock), and the lock should still be held.
-            reservation.sub(device_range.length()).unwrap();
+            reservation.hold(device_range.length()).unwrap();
         } else {
             self.inner.lock().unwrap().reserved_bytes += device_range.length();
         }
@@ -614,8 +660,8 @@
         Reservation::new(self, amount)
     }
 
-    fn release_reservation(&self, reservation: &mut Reservation) {
-        self.inner.lock().unwrap().reserved_bytes -= reservation.take();
+    fn release_reservation(&self, amount: u64) {
+        self.inner.lock().unwrap().reserved_bytes -= amount;
     }
 
     fn get_allocated_bytes(&self) -> u64 {
@@ -676,8 +722,11 @@
                     }
                     let mut inner = self.inner.lock().unwrap();
                     inner.allocated_bytes = inner.allocated_bytes.saturating_add(len as i64);
-                    if transaction.is_some() {
+                    if let Some(transaction) = transaction {
                         inner.reserved_bytes -= len;
+                        if let Some(reservation) = transaction.allocator_reservation {
+                            reservation.commit(len);
+                        }
                     }
                 } else {
                     let mut inner = self.inner.lock().unwrap();
@@ -695,11 +744,7 @@
                 let lower_bound = item.key.lower_bound_for_merge_into();
                 self.tree.merge_into(item, &lower_bound).await;
             }
-            // TODO(csuter): Since Seal and Compact are no longer being used for just trees, we
-            // should consider changing the names to something else, maybe FlushBegin and
-            // FlushCommit to match ObjectFlush, and maybe ObjectFlush::commit should be responsible
-            // for adding it to a transaction.
-            Mutation::TreeSeal => {
+            Mutation::BeginFlush => {
                 {
                     // After we seal the tree, we will start adding mutations to the new mutable
                     // layer, but we cannot safely do that whilst we are attempting to allocate
@@ -713,15 +758,25 @@
                 let mut inner = self.inner.lock().unwrap();
                 inner.info.allocated_bytes = inner.allocated_bytes as u64;
             }
-            Mutation::TreeCompact => {
+            Mutation::EndFlush => {
                 if transaction.is_none() {
                     self.tree.reset_immutable_layers();
                     // AllocatorInfo is written in the same transaction and will contain the count
-                    // at the point TreeSeal was applied, so we need to adjust allocated_bytes so
+                    // at the point BeginFlush was applied, so we need to adjust allocated_bytes so
                     // that it just covers the delta from that point.  Later, when we properly open
                     // the allocator, we'll add this back.
                     let mut inner = self.inner.lock().unwrap();
                     inner.allocated_bytes -= inner.info.allocated_bytes as i64;
+                } else {
+                    let layers = self.tree.immutable_layer_set();
+                    self.filesystem.upgrade().unwrap().object_manager().update_reservation(
+                        self.object_id,
+                        layers
+                            .layers
+                            .iter()
+                            .map(|l| l.handle().map(|h| h.get_size()).unwrap_or(0))
+                            .sum(),
+                    );
                 }
             }
             _ => panic!("unexpected mutation! {:?}", mutation), // TODO(csuter): This can't panic
@@ -734,7 +789,7 @@
                 if item.value.delta > 0 {
                     let mut inner = self.inner.lock().unwrap();
                     if let Some(reservation) = transaction.allocator_reservation {
-                        reservation.add(item.key.device_range.length());
+                        reservation.release(item.key.device_range.length());
                     } else {
                         inner.reserved_bytes -= item.key.device_range.length();
                     }
@@ -754,13 +809,13 @@
             return Ok(());
         }
         let graveyard = object_manager.graveyard().ok_or(anyhow!("Missing graveyard!"))?;
-        let object_sync = ObjectFlush::new(object_manager, self.object_id());
         // TODO(csuter): This all needs to be atomic somehow. We'll need to use different
         // transactions for each stage, but we need to make sure objects are cleaned up if there's a
         // failure.
-        let reservation = filesystem.flush_reservation();
+        let reservation = object_manager.metadata_reservation();
         let txn_options = Options {
             skip_journal_checks: true,
+            borrow_metadata_space: true,
             allocator_reservation: Some(reservation),
             ..Default::default()
         };
@@ -776,16 +831,12 @@
         let object_id = layer_object_handle.object_id();
         graveyard.add(&mut transaction, root_store.store_object_id(), object_id);
         // It's important that this transaction does not include any allocations because we use
-        // TreeSeal as a snapshot point for mutations to the tree: other allocator mutations within
-        // this transaction might get applied before seal (which would be OK), but they could
+        // BeginFlush as a snapshot point for mutations to the tree: other allocator mutations
+        // within this transaction might get applied before seal (which would be OK), but they could
         // equally get applied afterwards (since Transaction makes no guarantees about the order in
         // which mutations are applied whilst committing), in which case they'd get lost on replay
         // because the journal will only send mutations that follow this transaction.
-        transaction.add_with_object(
-            self.object_id(),
-            Mutation::TreeSeal,
-            AssocObj::Borrowed(&object_sync),
-        );
+        transaction.add(self.object_id(), Mutation::BeginFlush);
         transaction.commit().await;
 
         let layer_set = self.tree.immutable_layer_set();
@@ -823,16 +874,14 @@
         buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
         object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;
 
-        // It's important that TreeCompact is in the same transaction that we write AllocatorInfo,
-        // because we use TreeCompact to make the required adjustments to allocated_bytes.
-        transaction.add(self.object_id(), Mutation::TreeCompact);
+        // It's important that EndFlush is in the same transaction that we write AllocatorInfo,
+        // because we use EndFlush to make the required adjustments to allocated_bytes.
+        transaction.add(self.object_id(), Mutation::EndFlush);
         graveyard.remove(&mut transaction, root_store.store_object_id(), object_id);
-        transaction.commit().await;
 
         // TODO(csuter): what if this fails.
         self.tree.set_layers(layers_from_handles(Box::new([layer_object_handle])).await?);
-
-        object_sync.commit();
+        transaction.commit().await;
 
         // Now close the layers and purge them.
         for layer in layer_set.layers {
@@ -1027,7 +1076,7 @@
 
     #[fasync::run_singlethreaded(test)]
     async fn test_allocations() {
-        let device = DeviceHolder::new(FakeDevice::new(1024, 512));
+        let device = DeviceHolder::new(FakeDevice::new(4096, 512));
         let fs = FakeFilesystem::new(device);
         let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
         fs.object_manager().set_allocator(allocator.clone());
@@ -1058,7 +1107,7 @@
 
     #[fasync::run_singlethreaded(test)]
     async fn test_deallocations() {
-        let device = DeviceHolder::new(FakeDevice::new(1024, 512));
+        let device = DeviceHolder::new(FakeDevice::new(4096, 512));
         let fs = FakeFilesystem::new(device);
         let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
         fs.object_manager().set_allocator(allocator.clone());
@@ -1081,7 +1130,7 @@
 
     #[fasync::run_singlethreaded(test)]
     async fn test_mark_allocated() {
-        let device = DeviceHolder::new(FakeDevice::new(1024, 512));
+        let device = DeviceHolder::new(FakeDevice::new(4096, 512));
         let fs = FakeFilesystem::new(device);
         let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
         fs.object_manager().set_allocator(allocator.clone());
@@ -1106,7 +1155,7 @@
 
     #[fasync::run_singlethreaded(test)]
     async fn test_flush() {
-        let device = DeviceHolder::new(FakeDevice::new(1024, 512));
+        let device = DeviceHolder::new(FakeDevice::new(4096, 512));
         let fs = FakeFilesystem::new(device);
         let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
         fs.object_manager().set_allocator(allocator.clone());
@@ -1147,7 +1196,7 @@
 
     #[fasync::run_singlethreaded(test)]
     async fn test_dropped_transaction() {
-        let device = DeviceHolder::new(FakeDevice::new(1024, 512));
+        let device = DeviceHolder::new(FakeDevice::new(4096, 512));
         let fs = FakeFilesystem::new(device);
         let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
         fs.object_manager().set_allocator(allocator.clone());
@@ -1176,7 +1225,7 @@
 
     #[fasync::run_singlethreaded(test)]
     async fn test_allocated_bytes() {
-        const BLOCK_COUNT: u32 = 1024;
+        const BLOCK_COUNT: u32 = 4096;
         const BLOCK_SIZE: u32 = 512;
         let device = DeviceHolder::new(FakeDevice::new(BLOCK_COUNT.into(), BLOCK_SIZE));
         let fs = FakeFilesystem::new(device);
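
The hold/commit accounting introduced above is small enough to mirror in full;
a self-contained model of ReservationInner (the Drop hook that returns space to
the allocator is omitted):

    use std::sync::Mutex;

    #[derive(Default)]
    struct Inner {
        amount: u64, // total space backing this reservation
        held: u64,   // portion on hold for in-flight allocations
    }

    #[derive(Default)]
    struct Reservation {
        inner: Mutex<Inner>,
    }

    impl Reservation {
        fn avail(&self) -> u64 {
            let inner = self.inner.lock().unwrap();
            inner.amount - inner.held
        }
        // hold fails rather than over-committing the reservation.
        fn hold(&self, amount: u64) -> Result<(), &'static str> {
            let mut inner = self.inner.lock().unwrap();
            if amount > inner.amount - inner.held {
                return Err("no space");
            }
            inner.held += amount;
            Ok(())
        }
        // release undoes a hold, e.g. when a transaction is dropped.
        fn release(&self, amount: u64) {
            self.inner.lock().unwrap().held -= amount;
        }
        // commit consumes held space for good once an allocation is applied.
        fn commit(&self, amount: u64) {
            let mut inner = self.inner.lock().unwrap();
            inner.amount -= amount;
            inner.held -= amount;
        }
    }

In the diff above, allocate places a hold, applying the allocation turns the
hold into a commit, and drop_transaction releases any hold that was never
committed.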
diff --git a/src/storage/fxfs/src/object_store/filesystem.rs b/src/storage/fxfs/src/object_store/filesystem.rs
index 7106713..e02a137 100644
--- a/src/storage/fxfs/src/object_store/filesystem.rs
+++ b/src/storage/fxfs/src/object_store/filesystem.rs
@@ -7,18 +7,18 @@
         debug_assert_not_too_long,
         errors::FxfsError,
         object_store::{
-            allocator::{Allocator, Reservation},
+            allocator::Allocator,
             journal::{super_block::SuperBlock, Journal},
             object_manager::ObjectManager,
             trace_duration,
             transaction::{
-                AssocObj, LockKey, LockManager, Mutation, Options, ReadGuard, Transaction,
-                TransactionHandler, WriteGuard,
+                AssocObj, LockKey, LockManager, MetadataReservation, Mutation, Options, ReadGuard,
+                Transaction, TransactionHandler, WriteGuard,
             },
             ObjectStore,
         },
     },
-    anyhow::{bail, Error},
+    anyhow::Error,
     async_trait::async_trait,
     fuchsia_async as fasync,
     futures::channel::oneshot::{channel, Sender},
@@ -30,8 +30,6 @@
     storage_device::{Device, DeviceHolder},
 };
 
-const FLUSH_RESERVATION_SIZE: u64 = 524288;
-
 #[async_trait]
 pub trait Filesystem: TransactionHandler {
     /// Returns access to the underlying device.
@@ -48,9 +46,6 @@
 
     /// Flushes buffered data to the underlying device.
     async fn sync(&self, options: SyncOptions) -> Result<(), Error>;
-
-    /// Returns a reservation to be used for flushing in-memory data.
-    fn flush_reservation(&self) -> &Reservation;
 }
 
 #[async_trait]
@@ -125,7 +120,6 @@
     lock_manager: LockManager,
     compaction_task: Mutex<Option<fasync::Task<()>>>,
     device_sender: OnceCell<Sender<DeviceHolder>>,
-    flush_reservation: OnceCell<Reservation>,
     closed: AtomicBool,
     read_only: bool,
 }
@@ -147,13 +141,11 @@
             lock_manager: LockManager::new(),
             compaction_task: Mutex::new(None),
             device_sender: OnceCell::new(),
-            flush_reservation: OnceCell::new(),
             closed: AtomicBool::new(false),
             read_only: false,
         });
         filesystem.device.set(device).unwrap_or_else(|_| unreachable!());
         filesystem.journal.init_empty(filesystem.clone()).await?;
-        let _ = filesystem.flush_reservation.set(filesystem.allocator().reserve(0).unwrap());
         Ok(filesystem.into())
     }
 
@@ -163,7 +155,6 @@
     ) -> Result<OpenFxFilesystem, Error> {
         let objects = Arc::new(ObjectManager::new());
         let journal = Journal::new(objects.clone());
-        journal.set_trace(options.trace);
         let filesystem = Arc::new(FxFilesystem {
             device: OnceCell::new(),
             objects,
@@ -171,13 +162,11 @@
             lock_manager: LockManager::new(),
             compaction_task: Mutex::new(None),
             device_sender: OnceCell::new(),
-            flush_reservation: OnceCell::new(),
             closed: AtomicBool::new(false),
             read_only: options.read_only,
         });
         filesystem.device.set(device).unwrap_or_else(|_| unreachable!());
         filesystem.journal.replay(filesystem.clone()).await?;
-        let _ = filesystem.flush_reservation.set(filesystem.allocator().reserve_at_most(0));
         if !options.read_only {
             if let Some(graveyard) = filesystem.objects.graveyard() {
                 // Purge the graveyard of old entries in a background task.
@@ -187,10 +176,6 @@
         Ok(filesystem.into())
     }
 
-    pub fn set_trace(&self, v: bool) {
-        self.journal.set_trace(v);
-    }
-
     pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
         Self::open_with_options(device, OpenOptions { trace: false, read_only: false }).await
     }
@@ -250,13 +235,6 @@
     pub fn super_block(&self) -> SuperBlock {
         self.journal.super_block()
     }
-
-    // Returns the reservation, and a bool where true means the reservation is at its target size
-    // and false means it's not (i.e. we are in a low space condition).
-    fn update_flush_reservation(&self) -> (&Reservation, bool) {
-        let flush_reservation = self.flush_reservation.get().unwrap();
-        (flush_reservation, flush_reservation.try_top_up(FLUSH_RESERVATION_SIZE))
-    }
 }
 
 impl Drop for FxFilesystem {
@@ -289,10 +267,6 @@
     async fn sync(&self, options: SyncOptions) -> Result<(), Error> {
         self.journal.sync(options).await
     }
-
-    fn flush_reservation(&self) -> &Reservation {
-        self.update_flush_reservation().0
-    }
 }
 
 #[async_trait]
@@ -307,13 +281,47 @@
             // not committed.  In theory, if there are a large number of them, it would be possible
             // to run out of journal space.  We should probably have an in-flight limit.
             self.journal.check_journal_space().await;
-            if options.allocator_reservation.is_none() {
-                if !self.update_flush_reservation().1 && !options.skip_space_checks {
-                    bail!(FxfsError::NoSpace);
+        }
+
+        // This is the amount of space that we reserve for metadata.  A transaction should not take
+        // more than this.  At time of writing, this means that a single transaction must not take
+        // any more than 16 KiB of space when written to the journal (see
+        // object_manager::reserved_space_from_journal_usage).
+        const METADATA_RESERVATION_AMOUNT: u64 = 32_768;
+
+        // We support three options for metadata space reservation:
+        //
+        //   1. We can borrow from the filesystem's metadata reservation.  This should only be
+        //      used on the understanding that eventually, potentially after a full compaction,
+        //      there should be no net increase in space used.  For example, unlinking an object
+        //      should eventually decrease the amount of space used and setting most attributes
+        //      should not result in any change.
+        //
+        //   2. A reservation is provided in which case we'll place a hold on some of it for
+        //      metadata.
+        //
+        //   3. No reservation is supplied, so we try to reserve space with the allocator now,
+        //      and will return NoSpace if that fails.
+        let metadata_reservation = if options.borrow_metadata_space {
+            MetadataReservation::Borrowed
+        } else {
+            match options.allocator_reservation {
+                Some(reservation) => {
+                    reservation.hold(METADATA_RESERVATION_AMOUNT)?;
+                    MetadataReservation::Hold(METADATA_RESERVATION_AMOUNT)
+                }
+                None => {
+                    let reservation = self
+                        .allocator()
+                        .reserve(METADATA_RESERVATION_AMOUNT)
+                        .ok_or(FxfsError::NoSpace)?;
+                    reservation.hold(METADATA_RESERVATION_AMOUNT).unwrap();
+                    MetadataReservation::Reservation(reservation)
                 }
             }
-        }
-        let mut transaction = Transaction::new(self, &[LockKey::Filesystem], locks).await;
+        };
+        let mut transaction =
+            Transaction::new(self, metadata_reservation, &[LockKey::Filesystem], locks).await;
         transaction.allocator_reservation = options.allocator_reservation;
         Ok(transaction)
     }
@@ -332,6 +340,11 @@
     }
 
     fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
+        // If we placed a hold for metadata space, return it now.
+        if let MetadataReservation::Hold(hold_amount) = &mut transaction.metadata_reservation {
+            transaction.allocator_reservation.unwrap().release(*hold_amount);
+            *hold_amount = 0;
+        }
         self.objects.drop_transaction(transaction);
         self.lock_manager.drop_transaction(transaction);
     }
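
A condensed model of how new_transaction picks between the three metadata-space
options above (toy types and standalone arithmetic; the real MetadataReservation
enum lives in transaction.rs, per the imports above):

    enum MetadataReservation {
        Borrowed,         // option 1: net-zero operations borrow space
        Hold(u64),        // option 2: hold placed on the caller's reservation
        Reservation(u64), // option 3: fresh reservation taken now
    }

    const METADATA_RESERVATION_AMOUNT: u64 = 32_768;

    // caller_avail models a caller-supplied reservation's available space;
    // allocator_free models the allocator's free space.
    fn metadata_reservation(
        borrow_metadata_space: bool,
        caller_avail: Option<u64>,
        allocator_free: u64,
    ) -> Result<MetadataReservation, &'static str> {
        if borrow_metadata_space {
            return Ok(MetadataReservation::Borrowed);
        }
        match caller_avail {
            Some(avail) if avail >= METADATA_RESERVATION_AMOUNT => {
                Ok(MetadataReservation::Hold(METADATA_RESERVATION_AMOUNT))
            }
            Some(_) => Err("no space in caller's reservation"),
            None if allocator_free >= METADATA_RESERVATION_AMOUNT => {
                Ok(MetadataReservation::Reservation(METADATA_RESERVATION_AMOUNT))
            }
            None => Err("no space"),
        }
    }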
diff --git a/src/storage/fxfs/src/object_store/graveyard.rs b/src/storage/fxfs/src/object_store/graveyard.rs
index 5232cf6..adebac9 100644
--- a/src/storage/fxfs/src/object_store/graveyard.rs
+++ b/src/storage/fxfs/src/object_store/graveyard.rs
@@ -140,12 +140,16 @@
             {
                 Options {
                     skip_journal_checks: true,
-                    skip_space_checks: true,
-                    allocator_reservation: Some(fs.flush_reservation()),
+                    borrow_metadata_space: true,
+                    allocator_reservation: Some(object_manager.metadata_reservation()),
                     ..Default::default()
                 }
             } else {
-                Options { skip_journal_checks: true, skip_space_checks: true, ..Default::default() }
+                Options {
+                    skip_journal_checks: true,
+                    borrow_metadata_space: true,
+                    ..Default::default()
+                }
             };
             store.tombstone(id, options).await.context("Failed to tombstone object")?;
         }
diff --git a/src/storage/fxfs/src/object_store/journal.rs b/src/storage/fxfs/src/object_store/journal.rs
index 16ab9f1..f375c05 100644
--- a/src/storage/fxfs/src/object_store/journal.rs
+++ b/src/storage/fxfs/src/object_store/journal.rs
@@ -28,7 +28,7 @@
         lsm_tree::LSMTree,
         object_handle::ObjectHandle,
         object_store::{
-            allocator::{Allocator, Reservation, SimpleAllocator},
+            allocator::{Allocator, SimpleAllocator},
             constants::{SUPER_BLOCK_A_OBJECT_ID, SUPER_BLOCK_B_OBJECT_ID},
             directory::Directory,
             filesystem::{Filesystem, Mutations, SyncOptions},
@@ -41,7 +41,7 @@
                 writer::JournalWriter,
             },
             merge::{self},
-            object_manager::{ObjectFlush, ObjectManager},
+            object_manager::ObjectManager,
             record::{ExtentKey, ObjectKey, DEFAULT_DATA_ATTRIBUTE_ID},
             round_down,
             transaction::{
@@ -61,12 +61,8 @@
     serde::{Deserialize, Serialize},
     std::{
         clone::Clone,
-        iter::IntoIterator,
         ops::Bound,
-        sync::{
-            atomic::{self, AtomicBool},
-            Arc, Mutex,
-        },
+        sync::{Arc, Mutex},
         vec::Vec,
     },
 };
@@ -82,19 +78,15 @@
 // written to the journal.
 const RECLAIM_SIZE: u64 = 262_144;
 
+// Temporary space that should be reserved for the journal.  For example: space that is currently
+// used in the journal file but cannot be deallocated yet because we are flushing.
+pub const RESERVED_SPACE: u64 = 1_048_576;
+
 // After replaying the journal, it's possible that the stream doesn't end cleanly, in which case the
 // next journal block needs to indicate this.  This is done by pretending the previous block's
 // checksum is xored with this value, and using that as the seed for the next journal block.
 const RESET_XOR: u64 = 0xffffffffffffffff;
 
-// This size needs to be chosen carefully such that we cannot run out of journal space when we are
-// compacting.  New transactions that are unrelated to compaction are paused when its live data hits
-// RECLAIM_SIZE.  During compaction, more is written to the journal before a new super-block is
-// written.  When we write a new super-block, we only free up whatever the previous super-block
-// allows (because we only flush the device once), so this needs to be at least 2 * (RECLAIM_SIZE +
-// buffer).
-const RESERVATION_SIZE: u64 = 4 * RECLAIM_SIZE;
-
 type Checksum = u64;
 
 // To keep track of offsets within a journal file, we need both the file offset and the check-sum of
@@ -141,15 +133,7 @@
 }
 
 pub(super) fn journal_handle_options() -> HandleOptions {
-    HandleOptions { overwrite: true, skip_journal_checks: true, ..Default::default() }
-}
-
-fn clone_mutations<'a>(transaction: &Transaction<'_>) -> Vec<(u64, Mutation)> {
-    transaction
-        .mutations
-        .iter()
-        .map(|TxnMutation { object_id, mutation, .. }| (*object_id, mutation.clone()))
-        .collect()
+    HandleOptions { skip_journal_checks: true, ..Default::default() }
 }
 
 /// The journal records a stream of mutations that are to be applied to other objects.  At mount
@@ -160,20 +144,13 @@
 pub struct Journal {
     objects: Arc<ObjectManager>,
     writer: futures::lock::Mutex<JournalWriter>,
-    handle_and_reservation: OnceCell<(StoreObjectHandle<ObjectStore>, Reservation)>,
+    handle: OnceCell<StoreObjectHandle<ObjectStore>>,
     inner: Mutex<Inner>,
-    trace: AtomicBool,
 }
 
 struct Inner {
     needs_super_block: bool,
 
-    // This is a cached copy of the journal-file-offset which is held under a regular mutex rather
-    // than an async one, which allows computations to be made in non-async contexts, such as
-    // whether or not a compaction is required and whether we should be pausing new non-compaction
-    // related transactions.
-    journal_file_offset: u64,
-
     super_block: SuperBlock,
     super_block_to_write: SuperBlockCopy,
 
@@ -193,23 +170,17 @@
                 BLOCK_SIZE as usize,
                 starting_checksum,
             )),
-            handle_and_reservation: OnceCell::new(),
+            handle: OnceCell::new(),
             inner: Mutex::new(Inner {
                 needs_super_block: true,
                 super_block: SuperBlock::default(),
                 super_block_to_write: SuperBlockCopy::A,
-                journal_file_offset: 0,
                 reclaim_event: None,
                 zero_offset: None,
             }),
-            trace: AtomicBool::new(false),
         }
     }
 
-    pub fn set_trace(&self, v: bool) {
-        self.trace.store(v, atomic::Ordering::Relaxed);
-    }
-
     pub fn journal_file_offset(&self) -> u64 {
         self.inner.lock().unwrap().super_block.super_block_journal_file_offset
     }
@@ -289,6 +260,8 @@
             false,
         ));
         self.objects.set_allocator(allocator.clone());
+        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
+        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
         {
             let mut inner = self.inner.lock().unwrap();
             inner.needs_super_block = false;
@@ -330,7 +303,7 @@
         let mut reader =
             JournalReader::new(handle, self.block_size(), &super_block.journal_checkpoint);
         let mut checksum_list = ChecksumList::new();
-        let mut mutations = Vec::new();
+        let mut transactions = Vec::new();
         let mut current_transaction = None;
         let mut end_block = false;
         loop {
@@ -339,7 +312,7 @@
                 ReadResult::Reset => {
                     if current_transaction.is_some() {
                         current_transaction = None;
-                        mutations.pop();
+                        transactions.pop();
                     }
                 }
                 ReadResult::Some(record) => {
@@ -350,41 +323,44 @@
                             end_block = true;
                         }
                         JournalRecord::Mutation { object_id, mutation } => {
-                            if current_transaction.is_none() {
-                                mutations.push((current_checkpoint, Vec::new()));
-                                current_transaction = mutations.last_mut();
+                            let current_transaction = match current_transaction.as_mut() {
+                                None => {
+                                    transactions.push((current_checkpoint, Vec::new(), 0));
+                                    current_transaction = transactions.last_mut();
+                                    current_transaction.as_mut().unwrap()
+                                }
+                                Some(transaction) => transaction,
+                            };
+                            if !self
+                                .objects
+                                .validate_mutation(
+                                    current_transaction.0.file_offset,
+                                    object_id,
+                                    &mutation,
+                                    &mut checksum_list,
+                                )
+                                .await?
+                            {
+                                log::debug!("Stopping replay at bad mutation: {:?}", mutation);
+                                break;
                             }
-                            current_transaction.as_mut().unwrap().1.push((object_id, mutation));
+                            // If this mutation doesn't need to be applied, don't bother adding it
+                            // to the transaction.
+                            if self.should_apply(object_id, &current_transaction.0) {
+                                current_transaction.1.push((object_id, mutation));
+                            }
                         }
                         JournalRecord::Commit => {
-                            if let Some((checkpoint, mutations)) = current_transaction.take() {
+                            if let Some((checkpoint, mutations, ref mut end_offset)) =
+                                current_transaction.take()
+                            {
                                 for (object_id, mutation) in mutations {
-                                    if !self.should_apply(*object_id, checkpoint) {
-                                        continue;
-                                    }
-                                    if !self
-                                        .objects
-                                        .validate_mutation(
-                                            checkpoint.file_offset,
-                                            *object_id,
-                                            &mutation,
-                                            &mut checksum_list,
-                                        )
-                                        .await?
-                                    {
-                                        if self.trace.load(atomic::Ordering::Relaxed) {
-                                            log::info!(
-                                                "Stopping replay at bad mutation: {:?}",
-                                                mutation
-                                            );
-                                        }
-                                        break;
-                                    }
-
                                     // Snoop the mutations for any that might apply to the journal
                                     // file so that we can pass them to the reader so that it can
                                     // read the journal file.
-                                    if *object_id == super_block.root_parent_store_object_id {
+                                    if *object_id == super_block.root_parent_store_object_id
+                                        && self.should_apply(*object_id, &checkpoint)
+                                    {
                                         if let Mutation::ObjectStore(ObjectStoreMutation {
                                             item,
                                             ..
@@ -396,6 +372,7 @@
                                         }
                                     }
                                 }
+                                *end_offset = reader.journal_file_checkpoint().file_offset;
                             }
                         }
                         JournalRecord::Discard(offset) => {
@@ -406,11 +383,11 @@
                                 }
                             }
                             current_transaction = None;
-                            while let Some(transaction) = mutations.last() {
+                            while let Some(transaction) = transactions.last() {
                                 if transaction.0.file_offset < offset {
                                     break;
                                 }
-                                mutations.pop();
+                                transactions.pop();
                             }
                         }
                     }
@@ -420,27 +397,26 @@
             }
         }
 
+        // Discard any uncommitted transaction.
+        if current_transaction.is_some() {
+            transactions.pop();
+        }
+
         // Validate the checksums.
         let journal_offset = checksum_list
             .verify(device.as_ref(), reader.journal_file_checkpoint().file_offset)
             .await?;
 
         // Apply the mutations.
-        let mut last_checkpoint = if mutations.is_empty() {
+        let mut last_checkpoint = if transactions.is_empty() {
             super_block.journal_checkpoint.clone()
         } else {
             'outer: loop {
-                for (checkpoint, mutations) in mutations {
+                for (checkpoint, mutations, end_offset) in transactions {
                     if checkpoint.file_offset >= journal_offset {
                         break 'outer checkpoint;
                     }
-                    if self.trace.load(atomic::Ordering::Relaxed) {
-                        log::info!("REPLAY {}", checkpoint.file_offset);
-                    }
-                    for (object_id, mutation) in mutations {
-                        self.apply_mutation(object_id, &checkpoint, mutation, None, AssocObj::None)
-                            .await;
-                    }
+                    self.objects.replay_mutations(mutations, checkpoint, end_offset).await;
                 }
                 break reader.journal_file_checkpoint();
             }
@@ -462,19 +438,13 @@
                 journal_handle_options(),
             )
             .await?;
-            let current_journal_size = handle.get_allocated_size().await.unwrap();
-            let allocator_reservation = allocator
-                .reserve(RESERVATION_SIZE.saturating_sub(current_journal_size))
-                .ok_or(FxfsError::NoSpace)
-                .context("unable to reserve space for the journal")?;
-            let _ = self.handle_and_reservation.set((handle, allocator_reservation));
+            let _ = self.handle.set(handle);
             let mut writer = self.writer.lock().await;
             // If the last entry wasn't an end_block, then we need to reset the stream.
             if !end_block {
                 last_checkpoint.checksum ^= RESET_XOR;
             }
             let offset = last_checkpoint.file_offset;
-            self.inner.lock().unwrap().journal_file_offset = offset;
             writer.seek_to_checkpoint(last_checkpoint);
             if offset < reader.journal_file_checkpoint().file_offset {
                 // TODO(csuter): We need to make sure that this is tested.  If a corruption test
@@ -539,7 +509,7 @@
             &root_store,
             &mut transaction,
             SUPER_BLOCK_A_OBJECT_ID,
-            HandleOptions { overwrite: true, ..Default::default() },
+            HandleOptions::default(),
         )
         .await
         .context("create super block")?;
@@ -551,7 +521,7 @@
             &root_store,
             &mut transaction,
             SUPER_BLOCK_B_OBJECT_ID,
-            HandleOptions { overwrite: true, ..Default::default() },
+            HandleOptions::default(),
         )
         .await
         .context("create super block")?;
@@ -596,15 +566,9 @@
         }
 
         allocator.ensure_open().await?;
-        let allocator_reservation = allocator
-            .reserve(
-                RESERVATION_SIZE.saturating_sub(journal_handle.get_allocated_size().await.unwrap()),
-            )
-            .ok_or(FxfsError::NoSpace)
-            .context("unable to reserve space for the journal")?;
 
         // Initialize the journal writer.
-        let _ = self.handle_and_reservation.set((journal_handle, allocator_reservation));
+        let _ = self.handle.set(journal_handle);
 
         Ok(())
     }
@@ -618,14 +582,9 @@
         // TODO(csuter): handle the case where we are unable to extend the journal file.
         self.maybe_extend_journal_file(&mut writer).await.unwrap();
         // TODO(csuter): writing to the journal here can be asynchronous.
-        let journal_file_checkpoint = writer.journal_file_checkpoint();
-        writer.write_mutations(
-            transaction
-                .mutations
-                .iter()
-                .map(|TxnMutation { object_id, mutation, .. }| (*object_id, mutation.clone())),
-        );
-        if let Some((handle, _)) = self.handle_and_reservation.get() {
+        self.write_and_apply_mutations(&mut *writer, transaction).await;
+
+        if let Some(handle) = self.handle.get() {
             // TODO(jfsulliv): We should separate writing to the journal buffer from flushing the
             // journal buffer (i.e. consider doing this in a background task). Flushing here is
             // prone to deadlock, since |flush_buffer| itself creates a transaction which locks the
@@ -637,15 +596,12 @@
                 log::warn!("journal write failed: {}", e);
             }
         }
-        self.apply_mutations(transaction, journal_file_checkpoint).await;
-        self.inner.lock().unwrap().journal_file_offset =
-            writer.journal_file_checkpoint().file_offset;
     }
 
     async fn maybe_extend_journal_file(&self, writer: &mut JournalWriter) -> Result<(), Error> {
         // TODO(csuter): this currently assumes that a transaction can fit in CHUNK_SIZE.
         let file_offset = writer.journal_file_checkpoint().file_offset;
-        let (handle, reservation) = match self.handle_and_reservation.get() {
+        let handle = match self.handle.get() {
             None => return Ok(()),
             Some(x) => x,
         };
@@ -660,7 +616,8 @@
         let mut transaction = handle
             .new_transaction_with_options(Options {
                 skip_journal_checks: true,
-                allocator_reservation: Some(reservation),
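+                // Extending the journal is metadata-only work, so borrow from the shared
+                // metadata reservation rather than holding a dedicated journal reservation.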
+                borrow_metadata_space: true,
+                allocator_reservation: Some(self.objects.metadata_reservation()),
                 ..Default::default()
             })
             .await?;
@@ -671,17 +628,7 @@
             handle.zero(&mut transaction, 0..zero_offset).await?;
         }
 
-        let journal_file_checkpoint = writer.journal_file_checkpoint();
-
-        // We have to apply the mutations before writing them because we borrowed the writer for the
-        // transaction.  First we clone the mutations without the associated objects since that's
-        // where the handle is borrowed.
-        let cloned_mutations = clone_mutations(&transaction);
-
-        self.apply_mutations(&mut transaction, journal_file_checkpoint).await;
-
-        std::mem::drop(transaction);
-        writer.write_mutations(cloned_mutations);
+        self.write_and_apply_mutations(writer, &mut transaction).await;
 
         // We need to be sure that any journal records that arose from preallocation can fit
         // within the old preallocated range.  If this situation arose (it shouldn't, so it would be
@@ -690,8 +637,7 @@
         if needs_extending {
             assert!(writer.journal_file_checkpoint().file_offset <= size);
             let file_offset = writer.journal_file_checkpoint().file_offset;
-            let (handle, _) = self.handle_and_reservation.get().unwrap();
-            assert!(file_offset + self.chunk_size() <= handle.get_size());
+            assert!(file_offset + self.chunk_size() <= self.handle.get().unwrap().get_size());
         }
 
         let mut inner = self.inner.lock().unwrap();
@@ -702,30 +648,6 @@
         Ok(())
     }
 
-    async fn apply_mutations(
-        &self,
-        transaction: &mut Transaction<'_>,
-        journal_file_checkpoint: JournalCheckpoint,
-    ) {
-        if self.trace.load(atomic::Ordering::Relaxed) {
-            log::info!("BEGIN TXN {}", journal_file_checkpoint.file_offset);
-        }
-        let mutations = std::mem::take(&mut transaction.mutations);
-        for TxnMutation { object_id, mutation, associated_object } in mutations {
-            self.apply_mutation(
-                object_id,
-                &journal_file_checkpoint,
-                mutation,
-                Some(transaction),
-                associated_object,
-            )
-            .await;
-        }
-        if self.trace.load(atomic::Ordering::Relaxed) {
-            log::info!("END TXN");
-        }
-    }
-
     // Determines whether a mutation at the given checkpoint should be applied.  During replay, not
     // all records should be applied because the object store or allocator might already contain the
     // mutation.  After replay, that obviously isn't the case and we want to apply all mutations.
@@ -740,50 +662,22 @@
         journal_file_checkpoint.file_offset >= offset
     }
 
-    async fn apply_mutation(
-        &self,
-        object_id: u64,
-        journal_file_checkpoint: &JournalCheckpoint,
-        mutation: Mutation,
-        transaction: Option<&Transaction<'_>>,
-        object: AssocObj<'_>,
-    ) {
-        if transaction.is_some() || self.should_apply(object_id, journal_file_checkpoint) {
-            if self.trace.load(atomic::Ordering::Relaxed) {
-                log::info!("applying mutation: {}: {:?}", object_id, mutation);
-            }
-            self.objects
-                .apply_mutation(object_id, mutation, transaction, journal_file_checkpoint, object)
-                .await;
-        } else {
-            if self.trace.load(atomic::Ordering::Relaxed) {
-                log::info!("ignoring mutation: {}, {:?}", object_id, mutation);
-            }
-        }
-    }
-
     pub async fn write_super_block(&self) -> Result<(), Error> {
         let root_parent_store = self.objects.root_parent_store();
 
         // First we must lock the root parent store so that no new entries are written to it.
-        let sync = ObjectFlush::new(self.objects.clone(), root_parent_store.store_object_id());
         let mutable_layer = root_parent_store.tree().mutable_layer();
         let _guard = mutable_layer.lock_writes();
 
         // After locking, we need to flush the journal because it might have records that a new
         // super-block would refer to.
-        let journal_file_checkpoint = {
+        let (journal_file_checkpoint, borrowed) = {
             let mut writer = self.writer.lock().await;
 
-            // We are holding the appropriate locks now (no new transaction can be applied whilst we
-            // are holding the writer lock, so we can call ObjectFlush::begin for the root parent
-            // object store.
-            sync.begin();
-
             serialize_into(&mut *writer, &JournalRecord::EndBlock)?;
             writer.pad_to_block()?;
-            writer.flush_buffer(&self.handle_and_reservation.get().unwrap().0).await?;
-            writer.journal_file_checkpoint()
+            writer.flush_buffer(self.handle.get().unwrap()).await?;
+            (writer.journal_file_checkpoint(), self.objects.borrowed_metadata_space())
         };
 
         // We need to flush previous writes to the device since the new super-block we are writing
@@ -807,6 +701,7 @@
         new_super_block.super_block_journal_file_offset = journal_file_checkpoint.file_offset;
         new_super_block.journal_checkpoint = min_checkpoint.unwrap_or(journal_file_checkpoint);
         new_super_block.journal_file_offsets = journal_file_offsets;
+        new_super_block.borrowed_metadata_space = borrowed;
 
         // TODO(csuter): the super-block needs space reserved for it.
         new_super_block
@@ -832,8 +727,6 @@
             }
         }
 
-        sync.commit();
-
         Ok(())
     }
 
@@ -848,7 +741,7 @@
         let mut writer = self.writer.lock().await;
         serialize_into(&mut *writer, &JournalRecord::EndBlock)?;
         writer.pad_to_block()?;
-        writer.flush_buffer(&self.handle_and_reservation.get().unwrap().0).await?;
+        writer.flush_buffer(self.handle.get().unwrap()).await?;
         Ok(())
     }
 
@@ -863,8 +756,8 @@
         // The / 2 is here because after compacting, we cannot reclaim the space until the
         // _next_ time we flush the device since the super-block is not guaranteed to persist
         // until then.
-        let inner = self.inner.lock().unwrap();
-        inner.journal_file_offset - inner.super_block.journal_checkpoint.file_offset
+        self.objects.last_end_offset()
+            - self.inner.lock().unwrap().super_block.journal_checkpoint.file_offset
             > RECLAIM_SIZE / 2
     }
 
@@ -873,7 +766,7 @@
         loop {
             debug_assert_not_too_long!({
                 let mut inner = self.inner.lock().unwrap();
-                if inner.journal_file_offset - inner.super_block.journal_checkpoint.file_offset
+                if self.objects.last_end_offset() - inner.super_block.journal_checkpoint.file_offset
                     < RECLAIM_SIZE
                 {
                     break;
@@ -891,15 +784,35 @@
     fn chunk_size(&self) -> u64 {
         CHUNK_SIZE
     }
+
+    async fn write_and_apply_mutations(
+        &self,
+        writer: &mut JournalWriter,
+        transaction: &mut Transaction<'_>,
+    ) {
+        let checkpoint = writer.journal_file_checkpoint();
+        writer.write_mutations(transaction);
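+        // apply_transaction may hand back an UpdateBorrowed mutation reflecting changes to
+        // borrowed metadata space; journal it before the Commit record so replay can rebuild the
+        // counter.  The object ID is irrelevant for this record (replay keys off the mutation
+        // type), so 0 is used as a placeholder.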
+        if let Some(mutation) = self.objects.apply_transaction(transaction, &checkpoint).await {
+            writer.write_record(&JournalRecord::Mutation { object_id: 0, mutation });
+        }
+        writer.write_record(&JournalRecord::Commit);
+        self.objects.did_commit_transaction(
+            transaction,
+            &checkpoint,
+            writer.journal_file_checkpoint().file_offset,
+        );
+    }
 }
 
 impl JournalWriter {
-    // Extends JournalWriter to write a transaction.
-    fn write_mutations<'a>(&mut self, mutations: impl IntoIterator<Item = (u64, Mutation)>) {
-        for (object_id, mutation) in mutations {
-            self.write_record(&JournalRecord::Mutation { object_id, mutation });
+    // Extends JournalWriter to write mutations.
+    fn write_mutations<'a>(&mut self, transaction: &Transaction<'_>) {
+        for TxnMutation { object_id, mutation, .. } in &transaction.mutations {
+            self.write_record(&JournalRecord::Mutation {
+                object_id: *object_id,
+                mutation: mutation.clone(),
+            });
         }
-        self.write_record(&JournalRecord::Commit);
     }
 }
 
diff --git a/src/storage/fxfs/src/object_store/journal/handle.rs b/src/storage/fxfs/src/object_store/journal/handle.rs
index 5557b29..0b5ab6e 100644
--- a/src/storage/fxfs/src/object_store/journal/handle.rs
+++ b/src/storage/fxfs/src/object_store/journal/handle.rs
@@ -122,6 +122,10 @@
         unreachable!();
     }
 
+    async fn overwrite(&self, _offset: u64, _buf: BufferRef<'_>) -> Result<(), Error> {
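+        // This bootstrap handle is only ever used to read the journal during replay, so, like
+        // the other write paths above, overwrite should never be reached.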
+        unreachable!();
+    }
+
     fn get_size(&self) -> u64 {
         self.size
     }
diff --git a/src/storage/fxfs/src/object_store/journal/reader.rs b/src/storage/fxfs/src/object_store/journal/reader.rs
index cc950a5..ca457fc 100644
--- a/src/storage/fxfs/src/object_store/journal/reader.rs
+++ b/src/storage/fxfs/src/object_store/journal/reader.rs
@@ -71,10 +71,6 @@
         JournalCheckpoint::new(self.buf_file_offset, self.checksums[0])
     }
 
-    pub fn read_offset(&self) -> u64 {
-        self.read_offset
-    }
-
     pub fn last_read_checksum(&self) -> Checksum {
         *self.checksums.last().unwrap()
     }
@@ -90,10 +86,6 @@
         }
     }
 
-    pub fn take_handle(self) -> OH {
-        self.handle
-    }
-
     pub fn handle(&mut self) -> &mut OH {
         &mut self.handle
     }
@@ -285,7 +277,7 @@
     }
 
     #[fasync::run_singlethreaded(test)]
-    async fn test_journal_file_checkpoint_and_take_handle() {
+    async fn test_journal_file_checkpoint() {
         let object = Arc::new(FakeObject::new());
         let mut reader = JournalReader::new(
             FakeObjectHandle::new(object.clone()),
@@ -306,31 +298,12 @@
         // If we take the checkpoint here and then create another reader, we should see the second
         // item.
         let checkpoint = reader.journal_file_checkpoint();
-        let mut reader = JournalReader::new(reader.take_handle(), TEST_BLOCK_SIZE, &checkpoint);
+        let mut reader =
+            JournalReader::new(FakeObjectHandle::new(object.clone()), TEST_BLOCK_SIZE, &checkpoint);
         assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(7u32));
     }
 
     #[fasync::run_singlethreaded(test)]
-    async fn test_read_offset() {
-        let object = Arc::new(FakeObject::new());
-        let mut reader = JournalReader::new(
-            FakeObjectHandle::new(object.clone()),
-            TEST_BLOCK_SIZE,
-            &JournalCheckpoint::default(),
-        );
-        assert_eq!(reader.read_offset(), 0);
-        // Make the journal file a minimum of two blocks since reading to EOF is an error.
-        let handle = FakeObjectHandle::new(object.clone());
-        let len = TEST_BLOCK_SIZE as usize * 2;
-        let mut buf = handle.allocate_buffer(len);
-        buf.as_mut_slice().fill(0u8);
-        handle.write(0, buf.as_ref()).await.expect("write failed");
-        write_items(FakeObjectHandle::new(object.clone()), &[4u32, 7u32]).await;
-        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(4u32));
-        assert_eq!(reader.read_offset(), TEST_BLOCK_SIZE);
-    }
-
-    #[fasync::run_singlethreaded(test)]
     async fn test_skip_to_end_of_block() {
         let object = Arc::new(FakeObject::new());
         // Make the journal file a minimum of two blocks since reading to EOF is an error.
@@ -444,7 +417,7 @@
         writer.flush_buffer(&handle).await.expect("flush_buffer failed");
 
         let mut reader = JournalReader::new(
-            reader.take_handle(),
+            FakeObjectHandle::new(object.clone()),
             TEST_BLOCK_SIZE,
             &JournalCheckpoint::default(),
         );
@@ -465,7 +438,8 @@
         );
 
         // Make sure a reader can start from the middle of a reset block.
-        let mut reader = JournalReader::new(reader.take_handle(), TEST_BLOCK_SIZE, &checkpoint);
+        let mut reader =
+            JournalReader::new(FakeObjectHandle::new(object.clone()), TEST_BLOCK_SIZE, &checkpoint);
         assert_eq!(
             reader.deserialize().await.expect("deserialize failed"),
             ReadResult::Some(78u32)
@@ -504,3 +478,7 @@
         }
     }
 }
+
+// TODO(csuter): Add a test that checks that the file offset *after* writing an entry that lies
+// *exactly* at the end of a journal block matches the file offset *after* reading that same
+// entry, i.e. it should be *after* the checksum.
diff --git a/src/storage/fxfs/src/object_store/journal/super_block.rs b/src/storage/fxfs/src/object_store/journal/super_block.rs
index 07b40aa..408c9d5 100644
--- a/src/storage/fxfs/src/object_store/journal/super_block.rs
+++ b/src/storage/fxfs/src/object_store/journal/super_block.rs
@@ -15,6 +15,7 @@
                 JournalCheckpoint,
             },
             record::ObjectItem,
+            transaction::Options,
             ObjectStore,
         },
     },
@@ -117,6 +118,10 @@
 
     // object id -> journal file offset. Indicates where each object has been flushed to.
     pub journal_file_offsets: HashMap<u64, u64>,
+
+    // Records the amount of borrowed metadata space as applicable at
+    // `super_block_journal_file_offset`.
+    pub borrowed_metadata_space: u64,
 }
 
 #[derive(Serialize, Deserialize)]
@@ -194,12 +199,21 @@
         let mut iter = merger.seek(Bound::Unbounded).await?;
 
         let mut next_extent_offset = MIN_SUPER_BLOCK_SIZE;
+        let object_manager = root_parent_store.filesystem().object_manager();
+        let reservation = object_manager.metadata_reservation();
 
         while let Some(item_ref) = iter.get() {
             if writer.journal_file_checkpoint().file_offset
                 >= next_extent_offset - SUPER_BLOCK_CHUNK_SIZE
             {
-                let mut transaction = handle.new_transaction().await?;
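+                // Extending the super-block file must not fail for lack of space, so borrow
+                // from the shared metadata reservation rather than reserving afresh.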
+                let mut transaction = handle
+                    .new_transaction_with_options(Options {
+                        skip_journal_checks: true,
+                        borrow_metadata_space: true,
+                        allocator_reservation: Some(reservation),
+                        ..Default::default()
+                    })
+                    .await?;
                 let allocated = handle
                     .preallocate_range(
                         &mut transaction,
diff --git a/src/storage/fxfs/src/object_store/journal/writer.rs b/src/storage/fxfs/src/object_store/journal/writer.rs
index 9e7968c0..166710d 100644
--- a/src/storage/fxfs/src/object_store/journal/writer.rs
+++ b/src/storage/fxfs/src/object_store/journal/writer.rs
@@ -74,15 +74,9 @@
             // buffer. Doing so will require picking an appropriate size up front, and forcing
             // flush as we fill it up.
             let mut buf = handle.allocate_buffer(to_do);
-            buf.as_mut_slice()[..to_do].copy_from_slice(self.buf.drain(..to_do).as_slice());
-            buf.as_mut_slice()[to_do..].fill(0u8);
-            let mut txn = handle.new_transaction().await?;
-            handle.txn_write(&mut txn, self.checkpoint.file_offset, buf.as_ref()).await?;
-            // Any mutations would rely on the journal to be applied, so the transaction must be
-            // empty. Writes are done in overwrite mode and the journal file is pre-allocated,
-            // so they should not cause any mutations to be added to the transaction.
-            assert!(txn.is_empty());
-            txn.commit().await;
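+            // The journal file is preallocated and writes are block aligned, so overwrite() can
+            // write directly without a transaction, and hence without generating any metadata
+            // mutations.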
+            buf.as_mut_slice()[..to_do].copy_from_slice(&self.buf[..to_do]);
+            handle.overwrite(self.checkpoint.file_offset, buf.as_ref()).await?;
+            self.buf.drain(..to_do);
             self.checkpoint.file_offset += to_do as u64;
             self.checkpoint.checksum = self.last_checksum;
         }
diff --git a/src/storage/fxfs/src/object_store/object_manager.rs b/src/storage/fxfs/src/object_store/object_manager.rs
index bd07f2e..388a497 100644
--- a/src/storage/fxfs/src/object_store/object_manager.rs
+++ b/src/storage/fxfs/src/object_store/object_manager.rs
@@ -7,31 +7,56 @@
         lsm_tree::LSMTree,
         object_handle::INVALID_OBJECT_ID,
         object_store::{
-            allocator::Allocator,
+            allocator::{Allocator, Reservation},
             filesystem::Mutations,
             graveyard::Graveyard,
-            journal::{checksum_list::ChecksumList, JournalCheckpoint},
+            journal::{self, checksum_list::ChecksumList, JournalCheckpoint},
             merge::{self},
-            transaction::{AssocObj, AssociatedObject, Mutation, Transaction, TxnMutation},
+            transaction::{AssocObj, MetadataReservation, Mutation, Transaction, TxnMutation},
             ObjectStore,
         },
     },
     anyhow::Error,
     once_cell::sync::OnceCell,
     std::{
-        collections::HashMap,
+        collections::{hash_map::Entry, HashMap},
         sync::{Arc, RwLock},
     },
 };
 
+// Data written to the journal eventually needs to be flushed somewhere (typically into layer
+// files).  Here we conservatively assume that it could take up to twice as much space as it does
+// in the journal.  In practice, it should be less than that.
+fn reserved_space_from_journal_usage(journal_usage: u64) -> u64 {
+    journal_usage * 2
+}
+
 /// ObjectManager is a global loading cache for object stores and other special objects.
 pub struct ObjectManager {
-    objects: RwLock<Objects>,
+    inner: RwLock<Inner>,
+    metadata_reservation: OnceCell<Reservation>,
+}
+
+// Whilst we are flushing, we need to keep track of the old checkpoint covering the data we hope
+// to flush, and a new one that should apply if we successfully finish the flush.
+enum Checkpoints {
+    Current(JournalCheckpoint),
+    Old(JournalCheckpoint),
+    Both(/* old: */ JournalCheckpoint, /* current: */ JournalCheckpoint),
+}
+
+impl Checkpoints {
+    // Returns the earliest checkpoint (which will always be the old one if present).
+    fn earliest(&self) -> &JournalCheckpoint {
+        match self {
+            Checkpoints::Old(x) | Checkpoints::Both(x, _) | Checkpoints::Current(x) => x,
+        }
+    }
 }
 
 // We currently maintain strong references to all stores that have been opened, but there's
 // currently no mechanism for releasing stores that aren't being used.
-struct Objects {
+struct Inner {
     stores: HashMap<u64, Arc<ObjectStore>>,
     root_parent_store_object_id: u64,
     root_store_object_id: u64,
@@ -40,15 +65,49 @@
 
     // Records dependencies on the journal for objects, i.e. an entry for object ID 1 would mean
     // it has a dependency on journal records from that offset.
-    journal_file_checkpoints: HashMap<u64, JournalCheckpoint>,
+    journal_file_checkpoints: HashMap<u64, Checkpoints>,
 
     graveyard: Option<Arc<Graveyard>>,
+
+    // Mappings from object-id to a target reservation amount.  The object IDs here are from the
+    // root store namespace, so they can be associated with any object in the root store.  A
+    // reservation will be made to cover the *maximum* in this map, since it is assumed that any
+    // requirement is only temporary, for the duration of a compaction, and that once compaction
+    // has finished for a particular object, the space will be recovered.
+    reservations: HashMap<u64, u64>,
+
+    // The last journal end offset for a transaction that has been applied.  This is not necessarily
+    // the same as the start offset for the next transaction because of padding.
+    last_end_offset: u64,
+
+    // A running counter that tracks metadata space that has been borrowed on the understanding that
+    // eventually it will be recovered (potentially after a full compaction).
+    borrowed_metadata_space: u64,
+}
+
+impl Inner {
+    // Returns the required size of the metadata reservation assuming that no space has been
+    // borrowed.  The invariant is: reservation-size + borrowed-space = required.
+    fn required_reservation(&self) -> u64 {
+        // Start with the maximum amount of temporary space we might need during compactions.
+        self.reservations.values().max().unwrap_or(&0)
+
+        // Account for data that has been written to the journal that will need to be written
+        // to layer files when flushed.
+            + self.journal_file_checkpoints.values().map(|c| c.earliest().file_offset).min()
+            .map(|min| reserved_space_from_journal_usage(self.last_end_offset - min))
+            .unwrap_or(0)
+
+        // Add extra for temporary space that might be tied up in the journal that hasn't yet been
+        // deallocated.
+            + journal::RESERVED_SPACE
+    }
 }
 
 impl ObjectManager {
     pub fn new() -> ObjectManager {
         ObjectManager {
-            objects: RwLock::new(Objects {
+            inner: RwLock::new(Inner {
                 stores: HashMap::new(),
                 root_parent_store_object_id: INVALID_OBJECT_ID,
                 root_store_object_id: INVALID_OBJECT_ID,
@@ -56,44 +115,48 @@
                 allocator: None,
                 journal_file_checkpoints: HashMap::new(),
                 graveyard: None,
+                reservations: HashMap::new(),
+                last_end_offset: 0,
+                borrowed_metadata_space: 0,
             }),
+            metadata_reservation: OnceCell::new(),
         }
     }
 
     pub fn store_object_ids(&self) -> Vec<u64> {
-        self.objects.read().unwrap().stores.keys().cloned().collect()
+        self.inner.read().unwrap().stores.keys().cloned().collect()
     }
 
     pub fn root_parent_store_object_id(&self) -> u64 {
-        self.objects.read().unwrap().root_parent_store_object_id
+        self.inner.read().unwrap().root_parent_store_object_id
     }
 
     pub fn root_parent_store(&self) -> Arc<ObjectStore> {
-        let objects = self.objects.read().unwrap();
-        objects.stores.get(&objects.root_parent_store_object_id).unwrap().clone()
+        let inner = self.inner.read().unwrap();
+        inner.stores.get(&inner.root_parent_store_object_id).unwrap().clone()
     }
 
     pub fn set_root_parent_store(&self, store: Arc<ObjectStore>) {
-        let mut objects = self.objects.write().unwrap();
+        let mut inner = self.inner.write().unwrap();
         let store_id = store.store_object_id();
-        objects.stores.insert(store_id, store);
-        objects.root_parent_store_object_id = store_id;
+        inner.stores.insert(store_id, store);
+        inner.root_parent_store_object_id = store_id;
     }
 
     pub fn root_store_object_id(&self) -> u64 {
-        self.objects.read().unwrap().root_store_object_id
+        self.inner.read().unwrap().root_store_object_id
     }
 
     pub fn root_store(&self) -> Arc<ObjectStore> {
-        let objects = self.objects.read().unwrap();
-        objects.stores.get(&objects.root_store_object_id).unwrap().clone()
+        let inner = self.inner.read().unwrap();
+        inner.stores.get(&inner.root_store_object_id).unwrap().clone()
     }
 
     pub fn set_root_store(&self, store: Arc<ObjectStore>) {
-        let mut objects = self.objects.write().unwrap();
+        let mut inner = self.inner.write().unwrap();
         let store_id = store.store_object_id();
-        objects.stores.insert(store_id, store);
-        objects.root_store_object_id = store_id;
+        inner.stores.insert(store_id, store);
+        inner.root_store_object_id = store_id;
     }
 
     /// When replaying the journal, we need to replay mutation records into the LSM tree, but we
@@ -102,12 +165,12 @@
     /// backing this store.  The store will get properly opened whenever an action is taken that
     /// needs the store to be opened (via ObjectStore::ensure_open).
     pub fn lazy_open_store(&self, store_object_id: u64) -> Arc<ObjectStore> {
-        let mut objects = self.objects.write().unwrap();
-        assert_ne!(store_object_id, objects.allocator_object_id);
-        let root_parent_store_object_id = objects.root_parent_store_object_id;
-        let root_store = objects.stores.get(&objects.root_store_object_id).unwrap().clone();
+        let mut inner = self.inner.write().unwrap();
+        assert_ne!(store_object_id, inner.allocator_object_id);
+        let root_parent_store_object_id = inner.root_parent_store_object_id;
+        let root_store = inner.stores.get(&inner.root_store_object_id).unwrap().clone();
         let fs = root_store.filesystem();
-        objects
+        inner
             .stores
             .entry(store_object_id)
             .or_insert_with(|| {
@@ -132,30 +195,31 @@
     }
 
     pub fn add_store(&self, store: Arc<ObjectStore>) {
-        let mut objects = self.objects.write().unwrap();
+        let mut inner = self.inner.write().unwrap();
         let store_object_id = store.store_object_id();
-        assert_ne!(store_object_id, objects.root_parent_store_object_id);
-        assert_ne!(store_object_id, objects.root_store_object_id);
-        assert_ne!(store_object_id, objects.allocator_object_id);
-        objects.stores.insert(store_object_id, store);
+        assert_ne!(store_object_id, inner.root_parent_store_object_id);
+        assert_ne!(store_object_id, inner.root_store_object_id);
+        assert_ne!(store_object_id, inner.allocator_object_id);
+        inner.stores.insert(store_object_id, store);
     }
 
     #[cfg(test)]
     pub fn forget_store(&self, store_object_id: u64) {
-        let mut objects = self.objects.write().unwrap();
-        assert_ne!(store_object_id, objects.allocator_object_id);
-        objects.stores.remove(&store_object_id);
+        let mut inner = self.inner.write().unwrap();
+        assert_ne!(store_object_id, inner.allocator_object_id);
+        inner.stores.remove(&store_object_id);
+        inner.reservations.remove(&store_object_id);
     }
 
     pub fn set_allocator(&self, allocator: Arc<dyn Allocator>) {
-        let mut objects = self.objects.write().unwrap();
-        assert!(!objects.stores.contains_key(&allocator.object_id()));
-        objects.allocator_object_id = allocator.object_id();
-        objects.allocator = Some(allocator.clone());
+        let mut inner = self.inner.write().unwrap();
+        assert!(!inner.stores.contains_key(&allocator.object_id()));
+        inner.allocator_object_id = allocator.object_id();
+        inner.allocator = Some(allocator.clone());
     }
 
     pub fn allocator(&self) -> Arc<dyn Allocator> {
-        self.objects.read().unwrap().allocator.clone().unwrap()
+        self.inner.read().unwrap().allocator.clone().unwrap()
     }
 
     /// Used during replay to validate a mutation.  This should return false if the mutation is not
@@ -171,9 +235,9 @@
         checksum_list: &mut ChecksumList,
     ) -> Result<bool, Error> {
         if let Some(allocator) = {
-            let objects = self.objects.read().unwrap();
-            if object_id == objects.allocator_object_id {
-                Some(objects.allocator.clone().unwrap())
+            let inner = self.inner.read().unwrap();
+            if object_id == inner.allocator_object_id {
+                Some(inner.allocator.clone().unwrap())
             } else {
                 None
             }
@@ -184,11 +248,7 @@
         }
     }
 
-    /// The journaling system should call this when a mutation needs to be applied. |replay|
-    /// indicates whether this is for replay.  |checkpoint| indicates the location in the journal
-    /// file for this mutation and is used to keep track of each object's dependencies on the
-    /// journal.
-    pub async fn apply_mutation(
+    async fn apply_mutation(
         &self,
         object_id: u64,
         mutation: Mutation,
@@ -196,13 +256,53 @@
         checkpoint: &JournalCheckpoint,
         associated_object: AssocObj<'_>,
     ) {
+        log::debug!("applying mutation: {}: {:?}", object_id, mutation);
         let object = {
-            let mut objects = self.objects.write().unwrap();
-            objects.journal_file_checkpoints.entry(object_id).or_insert_with(|| checkpoint.clone());
-            if object_id == objects.allocator_object_id {
-                Some(objects.allocator.clone().unwrap().as_mutations())
+            let mut inner = self.inner.write().unwrap();
+            match mutation {
+                Mutation::BeginFlush => {
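+                    // A flush is starting: demote the current checkpoint to Old so that, if the
+                    // flush fails, we still know which journal records the object depends on.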
+                    if let Some(entry) = inner.journal_file_checkpoints.get_mut(&object_id) {
+                        match entry {
+                            Checkpoints::Current(x) | Checkpoints::Both(x, _) => {
+                                *entry = Checkpoints::Old(x.clone());
+                            }
+                            _ => {}
+                        }
+                    }
+                }
+                Mutation::EndFlush => {
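+                    // The flush succeeded: the Old checkpoint is no longer needed, and any
+                    // dependency picked up during the flush (recorded in Both) becomes Current.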
+                    if let Entry::Occupied(mut o) = inner.journal_file_checkpoints.entry(object_id)
+                    {
+                        let entry = o.get_mut();
+                        match entry {
+                            Checkpoints::Old(_) => {
+                                o.remove();
+                            }
+                            Checkpoints::Both(_, x) => {
+                                *entry = Checkpoints::Current(x.clone());
+                            }
+                            _ => {}
+                        }
+                    }
+                }
+                _ => {
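+                    // Any other mutation introduces a dependency on the journal from this
+                    // checkpoint onwards.  The root parent store is excluded because it is backed
+                    // by the journal itself and is rebuilt from it on every replay.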
+                    if object_id != inner.root_parent_store_object_id {
+                        inner
+                            .journal_file_checkpoints
+                            .entry(object_id)
+                            .and_modify(|entry| {
+                                if let Checkpoints::Old(x) = entry {
+                                    *entry = Checkpoints::Both(x.clone(), checkpoint.clone());
+                                }
+                            })
+                            .or_insert_with(|| Checkpoints::Current(checkpoint.clone()));
+                    }
+                }
+            }
+            if object_id == inner.allocator_object_id {
+                Some(inner.allocator.clone().unwrap().as_mutations())
             } else {
-                objects.stores.get(&object_id).map(|x| x.clone() as Arc<dyn Mutations>)
+                inner.stores.get(&object_id).map(|x| x.clone() as Arc<dyn Mutations>)
             }
         }
         .unwrap_or_else(|| self.lazy_open_store(object_id));
@@ -212,6 +312,151 @@
             .await;
     }
 
+    /// Called by the journaling system to replay the given mutations.  `journal_file_checkpoint`
+    /// indicates the location in the journal file for this transaction and `end_offset` is the
+    /// journal offset at which the transaction ends.
+    pub async fn replay_mutations(
+        &self,
+        mutations: Vec<(u64, Mutation)>,
+        journal_file_checkpoint: JournalCheckpoint,
+        end_offset: u64,
+    ) {
+        log::debug!("REPLAY {}", journal_file_checkpoint.file_offset);
+        let txn_size = {
+            let mut inner = self.inner.write().unwrap();
+            if end_offset > inner.last_end_offset {
+                Some(end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset))
+            } else {
+                None
+            }
+        };
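+        // An UpdateBorrowed record carries the borrowed space before the transaction's own
+        // journal usage has been accounted for, so add that usage on top, mirroring
+        // did_commit_transaction.  If the end offset does not advance, the record is stale and
+        // is skipped.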
+        for (object_id, mutation) in mutations {
+            if let Mutation::UpdateBorrowed(borrowed) = mutation {
+                if let Some(txn_size) = txn_size {
+                    self.inner.write().unwrap().borrowed_metadata_space =
+                        borrowed + reserved_space_from_journal_usage(txn_size);
+                }
+                continue;
+            }
+            self.apply_mutation(
+                object_id,
+                mutation,
+                None,
+                &journal_file_checkpoint,
+                AssocObj::None,
+            )
+            .await;
+        }
+    }
+
+    /// Called by the journaling system to apply a transaction.  `checkpoint` indicates the
+    /// location in the journal file for this transaction.  Returns an optional mutation to be
+    /// included with the transaction when it is written to the journal.
+    pub async fn apply_transaction(
+        &self,
+        transaction: &mut Transaction<'_>,
+        checkpoint: &JournalCheckpoint,
+    ) -> Option<Mutation> {
+        // Record old values so we can see what changes as a result of this transaction.
+        let old_amount = self.metadata_reservation().amount();
+        let old_required = self.inner.read().unwrap().required_reservation();
+
+        log::debug!("BEGIN TXN {}", checkpoint.file_offset);
+        let mutations = std::mem::take(&mut transaction.mutations);
+        for TxnMutation { object_id, mutation, associated_object } in mutations {
+            self.apply_mutation(
+                object_id,
+                mutation,
+                Some(transaction),
+                &checkpoint,
+                associated_object,
+            )
+            .await;
+        }
+        log::debug!("END TXN");
+
+        if let MetadataReservation::Borrowed = transaction.metadata_reservation {
+            // If this transaction is borrowing metadata, figure out what has changed and return a
+            // mutation with the updated value for borrowed.  The transaction might have allocated
+            // or deallocated some data from the metadata reservation, or it might have made a
+            // change that means we need to reserve more or less space (e.g. we compacted).
+            let new_amount = self.metadata_reservation().amount();
+            let mut inner = self.inner.write().unwrap();
+            let new_required = inner.required_reservation();
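+            // From the invariant reservation + borrowed == required, the change in borrowed
+            // space is (new_required - new_amount) - (old_required - old_amount), rearranged
+            // below to avoid underflow.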
+            let add = old_amount + new_required;
+            let sub = new_amount + old_required;
+            if add >= sub {
+                inner.borrowed_metadata_space += add - sub;
+            } else {
+                inner.borrowed_metadata_space =
+                    inner.borrowed_metadata_space.saturating_sub(sub - add);
+            }
+            Some(Mutation::UpdateBorrowed(inner.borrowed_metadata_space))
+        } else {
+            // This transaction should have had no impact on the metadata reservation or the amount
+            // we need to reserve.
+            debug_assert_eq!(self.metadata_reservation().amount(), old_amount);
+            debug_assert_eq!(self.inner.read().unwrap().required_reservation(), old_required);
+            None
+        }
+    }
+
+    /// Called by the journaling system after a transaction has been written, providing the end
+    /// offset for the transaction so that we can adjust borrowed metadata space accordingly.
+    pub fn did_commit_transaction(
+        &self,
+        transaction: &mut Transaction<'_>,
+        _checkpoint: &JournalCheckpoint,
+        end_offset: u64,
+    ) {
+        let reservation = self.metadata_reservation();
+        let mut inner = self.inner.write().unwrap();
+        let txn_space = reserved_space_from_journal_usage(
+            end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset),
+        );
+        match &mut transaction.metadata_reservation {
+            MetadataReservation::Borrowed => {
+                // Account for the amount we need to borrow for the transaction itself now that we
+                // know the transaction size.
+                inner.borrowed_metadata_space += txn_space;
+
+                // This transaction borrowed metadata space, but it might have reduced the amount
+                // we are required to keep reserved, in which case the excess can be given back to
+                // the allocator.
+                let to_give_back = (reservation.amount() + inner.borrowed_metadata_space)
+                    .saturating_sub(inner.required_reservation());
+                if to_give_back > 0 {
+                    reservation.give_back(to_give_back);
+                }
+            }
+            MetadataReservation::Hold(hold_amount) => {
+                // Transfer reserved space into the metadata reservation.
+                let txn_reservation = transaction.allocator_reservation.unwrap();
+                assert_ne!(
+                    txn_reservation as *const _, reservation as *const _,
+                    "MetadataReservation::Borrowed should be used."
+                );
+                txn_reservation.commit(txn_space);
+                *hold_amount -= txn_space;
+                reservation.add(txn_space);
+            }
+            MetadataReservation::Reservation(txn_reservation) => {
+                // Transfer reserved space into the metadata reservation.
+                txn_reservation.commit(txn_space);
+                reservation.add(txn_space);
+            }
+        }
+        // Check that our invariant holds true.
+        debug_assert_eq!(
+            reservation.amount() + inner.borrowed_metadata_space,
+            inner.required_reservation(),
+            "txn_space: {}, reservation_amount: {}, borrowed: {}, required: {}",
+            txn_space,
+            reservation.amount(),
+            inner.borrowed_metadata_space,
+            inner.required_reservation(),
+        );
+    }
+
     /// Drops a transaction.  This is called automatically when a transaction is dropped.  If the
     /// transaction has been committed, it should contain no mutations and so nothing will get rolled
     /// back.  For each mutation, drop_mutation is called to allow for roll back (e.g. the allocator
@@ -225,10 +470,11 @@
     /// Returns the journal file offsets that each object depends on and the checkpoint for the
     /// minimum offset.
     pub fn journal_file_offsets(&self) -> (HashMap<u64, u64>, Option<JournalCheckpoint>) {
-        let objects = self.objects.read().unwrap();
+        let inner = self.inner.read().unwrap();
         let mut min_checkpoint = None;
         let mut offsets = HashMap::new();
-        for (&object_id, checkpoint) in &objects.journal_file_checkpoints {
+        for (&object_id, checkpoint) in &inner.journal_file_checkpoints {
+            let checkpoint = checkpoint.earliest();
             match &mut min_checkpoint {
                 None => min_checkpoint = Some(checkpoint),
                 Some(ref mut min_checkpoint) => {
@@ -245,21 +491,21 @@
     /// Returns true if the object identified by `object_id` is known to have updates recorded in
     /// the journal that the object depends upon.
     pub fn needs_flush(&self, object_id: u64) -> bool {
-        self.objects.read().unwrap().journal_file_checkpoints.contains_key(&object_id)
+        self.inner.read().unwrap().journal_file_checkpoints.contains_key(&object_id)
     }
 
     pub fn graveyard(&self) -> Option<Arc<Graveyard>> {
-        self.objects.read().unwrap().graveyard.clone()
+        self.inner.read().unwrap().graveyard.clone()
     }
 
     pub fn register_graveyard(&self, graveyard: Arc<Graveyard>) {
-        self.objects.write().unwrap().graveyard = Some(graveyard);
+        self.inner.write().unwrap().graveyard = Some(graveyard);
     }
 
     /// Flushes all known objects.  This will then allow the journal space to be freed.
     pub async fn flush(&self) -> Result<(), Error> {
         let object_ids: Vec<_> =
-            self.objects.read().unwrap().journal_file_checkpoints.keys().cloned().collect();
+            self.inner.read().unwrap().journal_file_checkpoints.keys().cloned().collect();
         for object_id in object_ids {
             self.object(object_id).unwrap().flush().await?;
         }
@@ -267,73 +513,45 @@
     }
 
     fn object(&self, object_id: u64) -> Option<Arc<dyn Mutations>> {
-        let objects = self.objects.read().unwrap();
-        if object_id == objects.allocator_object_id {
-            Some(objects.allocator.clone().unwrap().as_mutations())
+        let inner = self.inner.read().unwrap();
+        if object_id == inner.allocator_object_id {
+            Some(inner.allocator.clone().unwrap().as_mutations())
         } else {
-            objects.stores.get(&object_id).map(|x| x.clone() as Arc<dyn Mutations>)
-        }
-    }
-}
-
-/// ObjectFlush is used by objects to indicate some kind of event such that if successful, existing
-/// mutation records are no longer required from the journal.  For example, for object stores, it is
-/// used when the in-memory layer is persisted since once that is done the records in the journal
-/// are no longer required.  Clients must make sure to call the commit function upon success; the
-/// default is to roll back.
-#[must_use]
-pub struct ObjectFlush {
-    object_manager: Arc<ObjectManager>,
-    object_id: u64,
-    old_journal_file_checkpoint: OnceCell<JournalCheckpoint>,
-}
-
-impl ObjectFlush {
-    pub fn new(object_manager: Arc<ObjectManager>, object_id: u64) -> Self {
-        Self { object_manager, object_id, old_journal_file_checkpoint: OnceCell::new() }
-    }
-
-    /// This marks the point at which the flush is beginning.  This begins a commitment (in the
-    /// absence of errors) to flush _all_ mutations that were made to the object prior to this point
-    /// and should therefore be called when appropriate locks are held (see the AssociatedObject
-    /// implementation below).  Mutations that come after this will be preserved in the journal
-    /// until the next flush.  This can panic if called more than once; it shouldn't be called
-    /// directly if being used as an AssociatedObject since will_apply_mutation will call it below.
-    pub fn begin(&self) {
-        if let Some(checkpoint) = self
-            .object_manager
-            .objects
-            .write()
-            .unwrap()
-            .journal_file_checkpoints
-            .remove(&self.object_id)
-        {
-            self.old_journal_file_checkpoint.set(checkpoint).unwrap();
+            inner.stores.get(&object_id).map(|x| x.clone() as Arc<dyn Mutations>)
         }
     }
 
-    pub fn commit(mut self) {
-        self.old_journal_file_checkpoint.take();
-    }
-}
-
-impl Drop for ObjectFlush {
-    fn drop(&mut self) {
-        if let Some(checkpoint) = self.old_journal_file_checkpoint.take() {
-            self.object_manager
-                .objects
-                .write()
+    pub fn metadata_reservation(&self) -> &Reservation {
+        self.metadata_reservation.get_or_init(|| {
+            let inner = self.inner.read().unwrap();
+            // TODO(csuter): Find a way to gracefully recover here.
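+            // By the invariant reservation + borrowed == required, the initial reservation is
+            // whatever is required less the amount already borrowed.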
+            self.allocator()
+                .reserve(inner.required_reservation() - inner.borrowed_metadata_space)
                 .unwrap()
-                .journal_file_checkpoints
-                .insert(self.object_id, checkpoint);
-        }
+        })
     }
-}
 
-/// ObjectFlush can be used as an associated object in a transaction such that we begin the flush at
-/// the appropriate time (whilst a lock is held on the journal).
-impl AssociatedObject for ObjectFlush {
-    fn will_apply_mutation(&self, _: &Mutation) {
-        self.begin();
+    pub fn update_reservation(&self, object_id: u64, amount: u64) {
+        self.inner.write().unwrap().reservations.insert(object_id, amount);
+    }
+
+    pub fn last_end_offset(&self) -> u64 {
+        self.inner.read().unwrap().last_end_offset
+    }
+
+    pub fn set_last_end_offset(&self, v: u64) {
+        self.inner.write().unwrap().last_end_offset = v;
+    }
+
+    pub fn remove_journal_file_checkpoints(&self, object_id: u64) {
+        self.inner.write().unwrap().journal_file_checkpoints.remove(&object_id);
+    }
+
+    pub fn borrowed_metadata_space(&self) -> u64 {
+        self.inner.read().unwrap().borrowed_metadata_space
+    }
+
+    pub fn set_borrowed_metadata_space(&self, v: u64) {
+        self.inner.write().unwrap().borrowed_metadata_space = v;
     }
 }
diff --git a/src/storage/fxfs/src/object_store/testing/fake_allocator.rs b/src/storage/fxfs/src/object_store/testing/fake_allocator.rs
index ca95a5e..0444397 100644
--- a/src/storage/fxfs/src/object_store/testing/fake_allocator.rs
+++ b/src/storage/fxfs/src/object_store/testing/fake_allocator.rs
@@ -90,15 +90,15 @@
 
     async fn did_flush_device(&self, _flush_log_offset: u64) {}
 
-    fn reserve(self: Arc<Self>, _amount: u64) -> Option<Reservation> {
-        panic!("Not supported");
+    fn reserve(self: Arc<Self>, amount: u64) -> Option<Reservation> {
+        Some(Reservation::new(self, amount))
     }
 
-    fn reserve_at_most(self: Arc<Self>, _amount: u64) -> Reservation {
-        panic!("Not supported");
+    fn reserve_at_most(self: Arc<Self>, amount: u64) -> Reservation {
+        Reservation::new(self, amount)
     }
 
-    fn release_reservation(&self, _reservation: &mut Reservation) {}
+    fn release_reservation(&self, _amount: u64) {}
 
     fn get_allocated_bytes(&self) -> u64 {
         let inner = self.0.lock().unwrap();
diff --git a/src/storage/fxfs/src/object_store/testing/fake_filesystem.rs b/src/storage/fxfs/src/object_store/testing/fake_filesystem.rs
index 4c7e474f..fd44368 100644
--- a/src/storage/fxfs/src/object_store/testing/fake_filesystem.rs
+++ b/src/storage/fxfs/src/object_store/testing/fake_filesystem.rs
@@ -4,19 +4,18 @@
 
 use {
     crate::object_store::{
-        allocator::{Allocator, Reservation},
+        allocator::Allocator,
         filesystem::{Filesystem, SyncOptions},
         journal::JournalCheckpoint,
         object_manager::ObjectManager,
         transaction::{
-            LockKey, LockManager, Options, ReadGuard, Transaction, TransactionHandler, TxnMutation,
-            WriteGuard,
+            LockKey, LockManager, MetadataReservation, Options, ReadGuard, Transaction,
+            TransactionHandler, WriteGuard,
         },
         ObjectStore,
     },
     anyhow::Error,
     async_trait::async_trait,
-    once_cell::sync::OnceCell,
     std::sync::{
         atomic::{AtomicU64, Ordering},
         Arc,
@@ -29,7 +28,6 @@
     object_manager: Arc<ObjectManager>,
     lock_manager: LockManager,
     num_syncs: AtomicU64,
-    flush_reservation: OnceCell<Reservation>,
 }
 
 impl FakeFilesystem {
@@ -40,7 +38,6 @@
             object_manager,
             lock_manager: LockManager::new(),
             num_syncs: AtomicU64::new(0),
-            flush_reservation: OnceCell::new(),
         })
     }
 }
@@ -67,11 +64,6 @@
         self.num_syncs.fetch_add(1u64, Ordering::Relaxed);
         Ok(())
     }
-
-    fn flush_reservation(&self) -> &Reservation {
-        self.flush_reservation
-            .get_or_init(|| self.object_manager.allocator().reserve(262144).unwrap())
-    }
 }
 
 #[async_trait]
@@ -79,28 +71,21 @@
     async fn new_transaction<'a>(
         self: Arc<Self>,
         locks: &[LockKey],
-        _options: Options<'a>,
+        options: Options<'a>,
     ) -> Result<Transaction<'a>, Error> {
-        Ok(Transaction::new(self, &[], locks).await)
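+        // For tests, either borrow from the shared metadata reservation or take an ad-hoc
+        // reservation; 10000 bytes is an arbitrary amount that is ample for test transactions.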
+        let reservation = if options.borrow_metadata_space {
+            MetadataReservation::Borrowed
+        } else {
+            MetadataReservation::Reservation(self.allocator().reserve_at_most(10000))
+        };
+        Ok(Transaction::new(self, reservation, &[], locks).await)
     }
 
     async fn commit_transaction(self: Arc<Self>, transaction: &mut Transaction<'_>) {
         let checkpoint =
             JournalCheckpoint { file_offset: self.num_syncs.load(Ordering::Relaxed), checksum: 0 };
         self.lock_manager.commit_prepare(transaction).await;
-        for TxnMutation { object_id, mutation, associated_object } in
-            std::mem::take(&mut transaction.mutations)
-        {
-            self.object_manager
-                .apply_mutation(
-                    object_id,
-                    mutation,
-                    Some(transaction),
-                    &checkpoint,
-                    associated_object,
-                )
-                .await;
-        }
+        self.object_manager.apply_transaction(transaction, &checkpoint).await;
     }
 
     fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
diff --git a/src/storage/fxfs/src/object_store/transaction.rs b/src/storage/fxfs/src/object_store/transaction.rs
index 2c929ec..5484b2e 100644
--- a/src/storage/fxfs/src/object_store/transaction.rs
+++ b/src/storage/fxfs/src/object_store/transaction.rs
@@ -37,12 +37,11 @@
     /// might alleviate journal space (i.e. compaction).
     pub skip_journal_checks: bool,
 
-    /// If false (the default), check for free space and fail creating the transaction with a
-    /// NoSpace error when low on space.  If skip_journal_checks is true, this setting is implied
-    /// (setting it to false will be ignored).  This setting should be set to true for any
-    /// transaction that will either not affect space usage after compaction (e.g. setting
-    /// attributes), or reduce space usage (e.g. unlinking).
-    pub skip_space_checks: bool,
+    /// If true, borrow metadata space from the metadata reservation.  This should be set for any
+    /// transaction that will either not affect space usage after compaction (e.g. setting
+    /// attributes), or reduce space usage (e.g. unlinking).  Otherwise, a transaction might fail
+    /// with an out-of-space error.
+    pub borrow_metadata_space: bool,
 
     /// If specified, a reservation to be used with the transaction.  If not set, any allocations
     /// that are part of this transaction will have to take their chances, and will fail if there is
@@ -96,10 +95,12 @@
     Allocator(AllocatorMutation),
     // Like an Allocator mutation, but without any change in allocated counts.
     AllocatorRef(AllocatorMutation),
-    // Seal the mutable layer and create a new one.
-    TreeSeal,
-    // Discards all non-mutable layers.
-    TreeCompact,
+    // Indicates the beginning of a flush.  This would typically involve sealing a tree.
+    BeginFlush,
+    // Indicates the end of a flush.  This would typically involve replacing the immutable layers
+    // with compacted ones.
+    EndFlush,
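+    // Updates the amount of metadata space that has been borrowed.  This is journalled so that,
+    // upon remount, the metadata reservation can be set up correctly.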
+    UpdateBorrowed(u64),
 }
 
 impl Mutation {
@@ -356,6 +357,18 @@
     }
 }
 
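+/// Tracks the source of the metadata space for a transaction.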
+pub enum MetadataReservation {
+    // Metadata space for this transaction is being borrowed from ObjectManager's metadata
+    // reservation.
+    Borrowed,
+
+    // A metadata reservation was made when the transaction was created.
+    Reservation(Reservation),
+
+    // The metadata space is being _held_ within `allocator_reservation`; the u64 is the number of
+    // bytes held.
+    Hold(u64),
+}
+
 /// A transaction groups mutation records to be committed as a group.
 pub struct Transaction<'a> {
     handler: Arc<dyn TransactionHandler>,
@@ -371,6 +384,9 @@
 
     /// If set, an allocator reservation that should be used for allocations.
     pub allocator_reservation: Option<&'a Reservation>,
+
+    /// The reservation covering this transaction's metadata.
+    pub metadata_reservation: MetadataReservation,
 }
 
 impl<'a> Transaction<'a> {
@@ -379,6 +395,7 @@
     /// locks (see LockManager for the semantics of the different kinds of locks).
     pub async fn new<H: TransactionHandler + AsRef<LockManager> + 'static>(
         handler: Arc<H>,
+        metadata_reservation: MetadataReservation,
         read_locks: &[LockKey],
         txn_locks: &[LockKey],
     ) -> Transaction<'a> {
@@ -394,6 +411,7 @@
             txn_locks,
             read_locks,
             allocator_reservation: None,
+            metadata_reservation,
         }
     }
 
@@ -768,7 +786,7 @@
 mod tests {
     use {
         super::{LockKey, LockManager, LockState, Mutation, Options, TransactionHandler},
-        crate::object_store::testing::fake_filesystem::FakeFilesystem,
+        crate::object_store::filesystem::FxFilesystem,
         fuchsia_async as fasync,
         futures::{channel::oneshot::channel, future::FutureExt, join},
         std::{sync::Mutex, task::Poll, time::Duration},
@@ -777,21 +795,21 @@
 
     #[fasync::run_singlethreaded(test)]
     async fn test_simple() {
-        let device = DeviceHolder::new(FakeDevice::new(1024, 1024));
-        let fs = FakeFilesystem::new(device);
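+        // The device needs to be big enough to host a real FxFilesystem.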
+        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
+        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
         let mut t = fs
             .clone()
             .new_transaction(&[], Options::default())
             .await
             .expect("new_transaction failed");
-        t.add(1, Mutation::TreeSeal);
+        t.add(1, Mutation::BeginFlush);
         assert!(!t.is_empty());
     }
 
     #[fasync::run_singlethreaded(test)]
     async fn test_locks() {
-        let device = DeviceHolder::new(FakeDevice::new(1024, 1024));
-        let fs = FakeFilesystem::new(device);
+        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
+        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
         let (send1, recv1) = channel();
         let (send2, recv2) = channel();
         let (send3, recv3) = channel();
@@ -835,8 +853,8 @@
 
     #[fasync::run_singlethreaded(test)]
     async fn test_read_lock_after_write_lock() {
-        let device = DeviceHolder::new(FakeDevice::new(1024, 1024));
-        let fs = FakeFilesystem::new(device);
+        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
+        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
         let (send1, recv1) = channel();
         let (send2, recv2) = channel();
         let done = Mutex::new(false);
@@ -868,8 +886,8 @@
 
     #[fasync::run_singlethreaded(test)]
     async fn test_write_lock_after_read_lock() {
-        let device = DeviceHolder::new(FakeDevice::new(1024, 1024));
-        let fs = FakeFilesystem::new(device);
+        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
+        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
         let (send1, recv1) = channel();
         let (send2, recv2) = channel();
         let done = Mutex::new(false);
@@ -901,8 +919,8 @@
 
     #[fasync::run_singlethreaded(test)]
     async fn test_drop_uncommitted_transaction() {
-        let device = DeviceHolder::new(FakeDevice::new(1024, 1024));
-        let fs = FakeFilesystem::new(device);
+        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
+        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
         let key = LockKey::object(1, 1);
 
         // Dropping while there's a reader.
diff --git a/src/storage/fxfs/src/server/directory.rs b/src/storage/fxfs/src/server/directory.rs
index 7a3bcef..0a301e1 100644
--- a/src/storage/fxfs/src/server/directory.rs
+++ b/src/storage/fxfs/src/server/directory.rs
@@ -107,6 +107,7 @@
         self: &Arc<Self>,
         extra_keys: &[LockKey],
         name: &str,
+        borrow_metadata_space: bool,
     ) -> Result<(Transaction<'a>, u64, ObjectDescriptor), Error> {
         // Since we don't know the child object ID until we've looked up the child, we need to loop
         // until we have acquired a lock on a child whose ID is the same as it was in the last
@@ -131,7 +132,7 @@
             let transaction = fs
                 .new_transaction(
                     &lock_keys,
-                    Options { skip_space_checks: true, ..Default::default() },
+                    Options { borrow_metadata_space, ..Default::default() },
                 )
                 .await?;
 
@@ -353,7 +354,7 @@
     async fn unlink(&self, name: &str, must_be_directory: bool) -> Result<(), Status> {
         let this = self.as_strong().await;
         let (mut transaction, _object_id, object_descriptor) =
-            this.acquire_transaction_for_unlink(&[], name).await.map_err(map_to_status)?;
+            this.acquire_transaction_for_unlink(&[], name, true).await.map_err(map_to_status)?;
         if let ObjectDescriptor::Directory = object_descriptor {
         } else if must_be_directory {
             return Err(Status::NOT_DIR);
@@ -404,7 +405,7 @@
             .clone()
             .new_transaction(
                 &[LockKey::object(self.store().store_object_id(), self.directory.object_id())],
-                Options { skip_space_checks: true, ..Default::default() },
+                Options { borrow_metadata_space: true, ..Default::default() },
             )
             .await
             .map_err(map_to_status)?;
diff --git a/src/storage/fxfs/src/server/file.rs b/src/storage/fxfs/src/server/file.rs
index 226b75b..92b8312 100644
--- a/src/storage/fxfs/src/server/file.rs
+++ b/src/storage/fxfs/src/server/file.rs
@@ -164,7 +164,10 @@
         // actually use any data on disk (either for data or metadata).
         let mut transaction = self
             .handle
-            .new_transaction_with_options(Options { skip_space_checks: true, ..Default::default() })
+            .new_transaction_with_options(Options {
+                borrow_metadata_space: true,
+                ..Default::default()
+            })
             .await
             .map_err(map_to_status)?;
         self.handle.truncate(&mut transaction, length).await.map_err(map_to_status)?;
@@ -221,7 +224,7 @@
             Some(
                 self.handle
                     .new_transaction_with_options(Options {
-                        skip_space_checks: true,
+                        borrow_metadata_space: true,
                         ..Default::default()
                     })
                     .await
diff --git a/src/storage/fxfs/src/server/volume.rs b/src/storage/fxfs/src/server/volume.rs
index c47f780..068a2c9 100644
--- a/src/storage/fxfs/src/server/volume.rs
+++ b/src/storage/fxfs/src/server/volume.rs
@@ -108,7 +108,7 @@
             }
         }
         self.store
-            .tombstone(object_id, Options { skip_space_checks: true, ..Default::default() })
+            .tombstone(object_id, Options { borrow_metadata_space: true, ..Default::default() })
             .await?;
         Ok(())
     }
@@ -142,6 +142,7 @@
             .acquire_transaction_for_unlink(
                 &[LockKey::object(self.store.store_object_id(), src_dir.object_id())],
                 dst,
+                false,
             )
             .await
         {
@@ -153,9 +154,9 @@
                             LockKey::object(self.store.store_object_id(), src_dir.object_id()),
                             LockKey::object(self.store.store_object_id(), dst_dir.object_id()),
                         ],
-                        // It's ok to skip space checks here since after compaction, it should be a
-                        // wash.
-                        Options { skip_space_checks: true, ..Default::default() },
+                        // It's ok to borrow metadata space here since after compaction, it should
+                        // be a wash.
+                        Options { borrow_metadata_space: true, ..Default::default() },
                     )
                     .await
                     .map_err(map_to_status)?;
diff --git a/src/storage/fxfs/src/testing/fake_object.rs b/src/storage/fxfs/src/testing/fake_object.rs
index 3f824fe..e5a4269 100644
--- a/src/storage/fxfs/src/testing/fake_object.rs
+++ b/src/storage/fxfs/src/testing/fake_object.rs
@@ -7,8 +7,8 @@
         object_handle::{ObjectHandle, ObjectProperties},
         object_store::{
             transaction::{
-                LockKey, LockManager, Options, ReadGuard, Transaction, TransactionHandler,
-                WriteGuard,
+                LockKey, LockManager, MetadataReservation, Options, ReadGuard, Transaction,
+                TransactionHandler, WriteGuard,
             },
             Timestamp,
         },
@@ -67,7 +67,7 @@
         locks: &[LockKey],
         _options: Options<'a>,
     ) -> Result<Transaction<'a>, Error> {
-        Ok(Transaction::new(self, &[], locks).await)
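+        // This fake has no allocator, so metadata space is always treated as borrowed.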
+        Ok(Transaction::new(self, MetadataReservation::Borrowed, &[], locks).await)
     }
 
     async fn commit_transaction(self: Arc<Self>, transaction: &mut Transaction<'_>) {
@@ -133,6 +133,10 @@
         self.object.write(offset, buf)
     }
 
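+    // There is no allocation to respect in the fake object, so overwrite behaves just like write.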
+    async fn overwrite(&self, offset: u64, buf: BufferRef<'_>) -> Result<(), Error> {
+        self.object.write(offset, buf)
+    }
+
     fn get_size(&self) -> u64 {
         self.object.get_size()
     }