[fxfs] Make fsck check reference counts

And then fix up all test cases so that fsck passes.  Compactions should
no longer leak objects if they fail part-way through.
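
The leak fix relies on a simple ordering: any object created during a
compaction is recorded in the graveyard in the same transaction that
creates it, and is only removed from the graveyard in the transaction
that installs it as a live layer, so anything a failed compaction
leaves behind stays discoverable for later cleanup.  A minimal
standalone sketch of that ordering (toy types only, not the fxfs API):

    use std::collections::HashSet;

    #[derive(Default)]
    struct ToyStore {
        graveyard: HashSet<u64>, // objects parked for deletion if nothing claims them
        live_layers: Vec<u64>,   // objects referenced by the store's metadata
    }

    impl ToyStore {
        fn compact(&mut self, new_layer: u64) {
            // Txn 1: create the layer object and immediately park it in the graveyard.
            self.graveyard.insert(new_layer);

            // ... write the compacted data; a failure here leaves `new_layer` in the
            // graveyard rather than leaked ...

            // Txn 2: retire the old layers, install the new one and unpark it, atomically.
            self.graveyard.extend(self.live_layers.drain(..));
            self.live_layers.push(new_layer);
            self.graveyard.remove(&new_layer);
        }
    }

    fn main() {
        let mut store = ToyStore::default();
        store.compact(42);
        assert_eq!(store.live_layers, vec![42]);
        assert!(!store.graveyard.contains(&42));
    }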

Change-Id: I190bc49220cbae7353271a7c84351a6a4c6b4091
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/524682
Commit-Queue: Chris Suter <csuter@google.com>
Reviewed-by: James Sullivan <jfsulliv@google.com>
diff --git a/src/storage/fxfs/BUILD.gn b/src/storage/fxfs/BUILD.gn
index 1263eef..46aee1d 100644
--- a/src/storage/fxfs/BUILD.gn
+++ b/src/storage/fxfs/BUILD.gn
@@ -47,6 +47,7 @@
   "src/object_store/directory.rs",
   "src/object_store/filesystem.rs",
   "src/object_store/fsck.rs",
+  "src/object_store/graveyard.rs",
   "src/object_store/journal.rs",
   "src/object_store/journal/reader.rs",
   "src/object_store/journal/super_block.rs",
diff --git a/src/storage/fxfs/src/lsm_tree.rs b/src/storage/fxfs/src/lsm_tree.rs
index 7b47aea..2d902b6 100644
--- a/src/storage/fxfs/src/lsm_tree.rs
+++ b/src/storage/fxfs/src/lsm_tree.rs
@@ -83,6 +83,17 @@
         Ok(())
     }
 
+    /// Appends the given layers at the end, i.e. they should be base layers.  This is intended
+    /// to be used after replay, when we are opening a tree and have discovered the base layers.
+    pub async fn append_layers(
+        &self,
+        handles: Box<[impl ObjectHandle + 'static]>,
+    ) -> Result<(), Error> {
+        let mut layers = Self::layers_from_handles(handles).await?;
+        self.data.write().unwrap().layers.append(&mut layers);
+        Ok(())
+    }
+
     /// Resets the immutable layers.
     pub fn reset_immutable_layers(&self) {
         self.data.write().unwrap().layers = Vec::new();
diff --git a/src/storage/fxfs/src/object_store.rs b/src/storage/fxfs/src/object_store.rs
index 605846c..c594088 100644
--- a/src/storage/fxfs/src/object_store.rs
+++ b/src/storage/fxfs/src/object_store.rs
@@ -7,6 +7,7 @@
 pub mod directory;
 pub mod filesystem;
 pub mod fsck;
+mod graveyard;
 mod journal;
 mod merge;
 mod record;
@@ -26,7 +27,7 @@
         },
         errors::FxfsError,
         lsm_tree::{types::LayerIterator, LSMTree},
-        object_handle::{ObjectHandle, ObjectHandleExt},
+        object_handle::{ObjectHandle, ObjectHandleExt, INVALID_OBJECT_ID},
         object_store::{
             filesystem::{Filesystem, Mutations, ObjectFlush},
             record::{
@@ -50,7 +51,7 @@
         cmp::min,
         ops::{Bound, Range},
         sync::{
-            atomic::{self, AtomicBool},
+            atomic::{self, AtomicBool, AtomicU64},
             Arc, Mutex, Weak,
         },
     },
@@ -60,7 +61,8 @@
 // store, and is used, for example, to get the persistent layer objects.
 #[derive(Clone, Debug, Default, Serialize, Deserialize)]
 pub struct StoreInfo {
-    // The last used object ID.
+    // The last used object ID.  Note that this field is not kept up to date in memory;
+    // ObjectStore's last_object_id field should be used instead.
     last_object_id: u64,
 
     // Object ids for layers.  TODO(csuter): need a layer of indirection here so we can
@@ -71,6 +73,7 @@
     root_directory_object_id: u64,
 
     // The object ID for the graveyard.
+    // TODO(csuter): Move this out of here.  This can probably be a child of the root directory.
     graveyard_directory_object_id: u64,
 }
 
@@ -94,7 +97,8 @@
     device: Arc<dyn Device>,
     block_size: u32,
     filesystem: Weak<dyn Filesystem>,
-    store_info: Mutex<StoreInfo>,
+    last_object_id: AtomicU64,
+    store_info: Mutex<Option<StoreInfo>>,
     tree: LSMTree<ObjectKey, ObjectValue>,
 
     // When replaying the journal, the store cannot read StoreInfo until the whole journal
@@ -109,7 +113,7 @@
         parent_store: Option<Arc<ObjectStore>>,
         store_object_id: u64,
         filesystem: Arc<dyn Filesystem>,
-        store_info: StoreInfo,
+        store_info: Option<StoreInfo>,
         tree: LSMTree<ObjectKey, ObjectValue>,
     ) -> Arc<ObjectStore> {
         let device = filesystem.device();
@@ -120,6 +124,7 @@
             device,
             block_size,
             filesystem: Arc::downgrade(&filesystem),
+            last_object_id: AtomicU64::new(0),
             store_info: Mutex::new(store_info),
             tree,
             store_info_handle: OnceCell::new(),
@@ -137,7 +142,7 @@
             parent_store,
             store_object_id,
             filesystem,
-            StoreInfo::default(),
+            Some(StoreInfo::default()),
             LSMTree::new(merge::merge),
         )
     }
@@ -159,7 +164,7 @@
     }
 
     pub fn root_directory_object_id(&self) -> u64 {
-        self.store_info.lock().unwrap().root_directory_object_id
+        self.store_info.lock().unwrap().as_ref().unwrap().root_directory_object_id
     }
 
     pub fn set_root_directory_object_id<'a>(&'a self, transaction: &mut Transaction<'a>, oid: u64) {
@@ -169,7 +174,7 @@
     }
 
     pub fn graveyard_directory_object_id(&self) -> u64 {
-        self.store_info.lock().unwrap().graveyard_directory_object_id
+        self.store_info.lock().unwrap().as_ref().unwrap().graveyard_directory_object_id
     }
 
     pub fn set_graveyard_directory_object_id<'a>(
@@ -226,7 +231,7 @@
             Some(self.clone()),
             store_object_id,
             self.filesystem.upgrade().unwrap(),
-            StoreInfo::default(),
+            None,
             LSMTree::new(merge::merge),
         )
     }
@@ -275,6 +280,10 @@
     ) -> Result<StoreObjectHandle<S>, Error> {
         let store = owner.as_ref().as_ref();
         store.ensure_open().await?;
+        // If the object ID was specified, i.e. this hasn't come via create_object, then we
+        // should update last_object_id in case the caller wants to create more objects in the
+        // same transaction.
+        store.update_last_object_id(object_id);
         transaction.add(
             store.store_object_id(),
             Mutation::insert_object(ObjectKey::object(object_id), ObjectValue::file(1, 0)),
@@ -370,6 +379,33 @@
         Ok(())
     }
 
+    /// Returns all objects that exist in the parent store that pertain to this object store.
+    pub fn parent_objects(&self) -> Vec<u64> {
+        assert!(self.store_info_handle.get().is_some());
+        let mut objects = Vec::new();
+        // We should not include the ID of the store itself, since that should be referred to in the
+        // volume directory.
+        objects.extend_from_slice(&self.store_info.lock().unwrap().as_ref().unwrap().layers);
+        objects
+    }
+
+    /// Returns root objects for this store.
+    pub fn root_objects(&self) -> Vec<u64> {
+        let mut objects = Vec::new();
+        let store_info = self.store_info.lock().unwrap();
+        if store_info.as_ref().unwrap().root_directory_object_id != INVALID_OBJECT_ID {
+            objects.push(store_info.as_ref().unwrap().root_directory_object_id);
+        }
+        if store_info.as_ref().unwrap().graveyard_directory_object_id != INVALID_OBJECT_ID {
+            objects.push(store_info.as_ref().unwrap().graveyard_directory_object_id);
+        }
+        objects
+    }
+
+    pub fn store_info(&self) -> StoreInfo {
+        self.store_info.lock().unwrap().as_ref().unwrap().clone()
+    }
+
     async fn ensure_open(&self) -> Result<(), Error> {
         if self.parent_store.is_none() || self.store_info_handle.get().is_some() {
             return Ok(());
@@ -390,23 +426,25 @@
                 HandleOptions::default(),
             )
             .await?;
-            if handle.get_size() > 0 {
+            let need_store_info = self.store_info.lock().unwrap().is_none();
+            let layer_object_ids = if need_store_info && handle.get_size() > 0 {
                 let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
                 let store_info: StoreInfo = deserialize_from(&serialized_info[..])?;
-                let mut handles = Vec::new();
-                for object_id in &store_info.layers {
-                    handles.push(
-                        ObjectStore::open_object(
-                            &parent_store,
-                            *object_id,
-                            HandleOptions::default(),
-                        )
+                let layer_object_ids = store_info.layers.clone();
+                self.update_last_object_id(store_info.last_object_id);
+                *self.store_info.lock().unwrap() = Some(store_info);
+                layer_object_ids
+            } else {
+                self.store_info.lock().unwrap().as_ref().unwrap().layers.clone()
+            };
+            let mut handles = Vec::new();
+            for object_id in layer_object_ids {
+                handles.push(
+                    ObjectStore::open_object(&parent_store, object_id, HandleOptions::default())
                         .await?,
-                    );
-                }
-                self.tree.set_layers(handles.into()).await?;
-                self.update_store_info(store_info);
+                );
             }
+            self.tree.append_layers(handles.into()).await?;
             let _ = self.store_info_handle.set(handle);
             Ok(())
         }
@@ -414,9 +452,7 @@
     }
 
     fn get_next_object_id(&self) -> u64 {
-        let mut store_info = self.store_info.lock().unwrap();
-        store_info.last_object_id += 1;
-        store_info.last_object_id
+        self.last_object_id.fetch_add(1, atomic::Ordering::Relaxed) + 1
     }
 
     fn allocator(&self) -> Arc<dyn Allocator> {
@@ -425,19 +461,17 @@
 
     fn txn_get_store_info(&self, transaction: &Transaction<'_>) -> StoreInfo {
         match transaction.get_store_info(self.store_object_id) {
-            None => self.store_info.lock().unwrap().clone(),
+            None => self.store_info(),
             Some(store_info) => store_info.clone(),
         }
     }
 
-    // The last object ID is updated as we create new objects, so we need to always take the maximum
-    // rather than blindly overwriting the last_object_id field.
-    fn update_store_info(&self, mut new_store_info: StoreInfo) {
-        let mut store_info = self.store_info.lock().unwrap();
-        if store_info.last_object_id > new_store_info.last_object_id {
-            new_store_info.last_object_id = store_info.last_object_id;
-        }
-        *store_info = new_store_info;
+    fn update_last_object_id(&self, object_id: u64) {
+        let _ = self.last_object_id.fetch_update(
+            atomic::Ordering::Relaxed,
+            atomic::Ordering::Relaxed,
+            |oid| if object_id > oid { Some(object_id) } else { None },
+        );
     }
 }
 
@@ -453,12 +487,7 @@
 
         match mutation {
             Mutation::ObjectStore(ObjectStoreMutation { item, op }) => {
-                {
-                    let mut store_info = self.store_info.lock().unwrap();
-                    if item.key.object_id > store_info.last_object_id {
-                        store_info.last_object_id = item.key.object_id;
-                    }
-                }
+                self.update_last_object_id(item.key.object_id);
                 match op {
                     Operation::Insert => self.tree.insert(item).await,
                     Operation::ReplaceOrInsert => self.tree.replace_or_insert(item).await,
@@ -468,12 +497,8 @@
                     }
                 }
             }
-            Mutation::ObjectStoreInfo(StoreInfoMutation(mut store_info)) => {
-                let mut info = self.store_info.lock().unwrap();
-                if info.last_object_id > store_info.last_object_id {
-                    store_info.last_object_id = info.last_object_id;
-                }
-                *info = store_info;
+            Mutation::ObjectStoreInfo(StoreInfoMutation(store_info)) => {
+                *self.store_info.lock().unwrap() = Some(store_info);
             }
             Mutation::TreeSeal => self.tree.seal().await,
             Mutation::TreeCompact => {
@@ -503,33 +528,47 @@
         if !object_manager.needs_flush(self.store_object_id) {
             return Ok(());
         }
-        let object_sync = ObjectFlush::new(object_manager, self.store_object_id);
+
         let parent_store = self.parent_store.as_ref().unwrap();
+        let graveyard = object_manager.graveyard().ok_or(anyhow!("Missing graveyard!"))?;
+
+        let object_sync = ObjectFlush::new(object_manager, self.store_object_id);
         let mut transaction = filesystem.clone().new_transaction(&[]).await?;
         let object_handle =
             ObjectStore::create_object(parent_store, &mut transaction, HandleOptions::default())
                 .await?;
+        let object_id = object_handle.object_id();
+        graveyard.add(&mut transaction, parent_store.store_object_id(), object_id);
         transaction.add_with_object(self.store_object_id(), Mutation::TreeSeal, &object_sync);
         transaction.commit().await;
 
-        let object_id = object_handle.object_id();
         self.tree.compact(&object_handle).await?;
 
         let mut serialized_info = Vec::new();
-        let mut new_store_info = self.store_info.lock().unwrap().clone();
+        let mut new_store_info = self.store_info();
+
+        let mut transaction = filesystem.clone().new_transaction(&[]).await?;
+
+        // Move all the existing layers to the graveyard.
+        for object_id in new_store_info.layers {
+            graveyard.add(&mut transaction, parent_store.store_object_id(), object_id);
+        }
+
+        new_store_info.last_object_id = self.last_object_id.load(atomic::Ordering::Relaxed);
         new_store_info.layers = vec![object_id];
         serialize_into(&mut serialized_info, &new_store_info)?;
         let mut buf = self.device.allocate_buffer(serialized_info.len());
         buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
 
-        let mut transaction = filesystem.clone().new_transaction(&[]).await?;
         self.store_info_handle
             .get()
             .unwrap()
             .txn_write(&mut transaction, 0u64, buf.as_ref())
             .await?;
         transaction.add(self.store_object_id(), Mutation::TreeCompact);
-        self.update_store_info(new_store_info);
+        graveyard.remove(&mut transaction, parent_store.store_object_id(), object_id);
+        // TODO(csuter): This isn't thread-safe.
+        *self.store_info.lock().unwrap() = Some(new_store_info);
         transaction.commit().await;
 
         self.tree.set_layers(Box::new([object_handle])).await.expect("set_layers failed");
@@ -549,7 +588,7 @@
     fn will_apply_mutation(&self, mutation: &Mutation) {
         match mutation {
             Mutation::ObjectStoreInfo(StoreInfoMutation(store_info)) => {
-                self.update_store_info(store_info.clone());
+                *self.store_info.lock().unwrap() = Some(store_info.clone());
             }
             _ => {}
         }
@@ -1269,6 +1308,7 @@
             object_handle::{ObjectHandle, ObjectHandleExt},
             object_store::{
                 filesystem::{Filesystem, Mutations},
+                graveyard::Graveyard,
                 record::{ObjectKey, ObjectKeyData},
                 round_up,
                 testing::{fake_allocator::FakeAllocator, fake_filesystem::FakeFilesystem},
@@ -1308,6 +1348,9 @@
             .create_child_store_with_id(&mut transaction, 3)
             .await
             .expect("create_child_store failed");
+        let graveyard =
+            Arc::new(Graveyard::create(&mut transaction, &store).await.expect("create failed"));
+        fs.object_manager().register_graveyard(graveyard);
         transaction.commit().await;
         (fs.clone(), allocator, store)
     }
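
The per-store last object ID is now kept outside StoreInfo as an AtomicU64 that
is only ever raised (on replay, on explicit-ID creation and when allocating new
IDs), never lowered.  A standalone illustration of that monotonic-max pattern
(the function names mirror the store's methods, but the snippet is independent
of fxfs):

    use std::sync::atomic::{AtomicU64, Ordering};

    /// Raise `last` to at least `object_id`; returning None from the closure leaves it unchanged.
    fn update_last_object_id(last: &AtomicU64, object_id: u64) {
        let _ = last.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |oid| {
            if object_id > oid {
                Some(object_id)
            } else {
                None
            }
        });
    }

    /// Allocate the next ID; fetch_add returns the previous value, so add one for the new ID.
    fn get_next_object_id(last: &AtomicU64) -> u64 {
        last.fetch_add(1, Ordering::Relaxed) + 1
    }

    fn main() {
        let last = AtomicU64::new(0);
        update_last_object_id(&last, 10); // e.g. an ID seen during journal replay
        update_last_object_id(&last, 7);  // lower IDs never move the counter backwards
        assert_eq!(get_next_object_id(&last), 11);
    }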
diff --git a/src/storage/fxfs/src/object_store/allocator.rs b/src/storage/fxfs/src/object_store/allocator.rs
index 0f6478c..6f821d3 100644
--- a/src/storage/fxfs/src/object_store/allocator.rs
+++ b/src/storage/fxfs/src/object_store/allocator.rs
@@ -22,7 +22,7 @@
             HandleOptions, ObjectStore,
         },
     },
-    anyhow::{bail, ensure, Error},
+    anyhow::{anyhow, bail, ensure, Error},
     async_trait::async_trait,
     bincode::{deserialize_from, serialize_into},
     merge::merge,
@@ -220,13 +220,20 @@
                     );
                 }
                 self.inner.lock().unwrap().info = info;
-                self.tree.set_layers(handles.into_boxed_slice()).await?;
+                self.tree.append_layers(handles.into_boxed_slice()).await?;
             }
         }
 
         self.inner.lock().unwrap().opened = true;
         Ok(())
     }
+
+    /// Returns all objects that exist in the parent store that pertain to this allocator.
+    pub fn parent_objects(&self) -> Vec<u64> {
+        // The allocator tree needs to store a file for each of the layers in the tree, so we return
+        // those, since nothing else references them.
+        self.inner.lock().unwrap().info.layers.clone()
+    }
 }
 
 #[async_trait]
@@ -354,6 +361,7 @@
         if !object_manager.needs_flush(self.object_id()) {
             return Ok(());
         }
+        let graveyard = object_manager.graveyard().ok_or(anyhow!("Missing graveyard!"))?;
         let object_sync = ObjectFlush::new(object_manager, self.object_id());
         // TODO(csuter): This all needs to be atomic somehow. We'll need to use different
         // transactions for each stage, but we need to make sure objects are cleaned up if there's a
@@ -364,11 +372,11 @@
         let layer_object_handle =
             ObjectStore::create_object(&root_store, &mut transaction, HandleOptions::default())
                 .await?;
-
+        let object_id = layer_object_handle.object_id();
+        graveyard.add(&mut transaction, root_store.store_object_id(), object_id);
         transaction.add_with_object(self.object_id(), Mutation::TreeSeal, &object_sync);
         transaction.commit().await;
 
-        let object_id = layer_object_handle.object_id();
         let layer_set = self.tree.immutable_layer_set();
         let mut merger = layer_set.merger();
         self.tree
@@ -382,20 +390,28 @@
         let object_handle =
             ObjectStore::open_object(&root_store, self.object_id(), HandleOptions::default())
                 .await?;
+
         // TODO(jfsulliv): Can we preallocate the buffer instead of doing a bounce? Do we know the
         // size up front?
+        let mut transaction = filesystem.clone().new_transaction(&[]).await?;
         let mut serialized_info = Vec::new();
         {
             let mut inner = self.inner.lock().unwrap();
+
+            // Move all the existing layers to the graveyard.
+            for object_id in &inner.info.layers {
+                graveyard.add(&mut transaction, root_store.store_object_id(), *object_id);
+            }
+
             inner.info.layers = vec![object_id];
             serialize_into(&mut serialized_info, &inner.info)?;
         }
         let mut buf = object_handle.allocate_buffer(serialized_info.len());
         buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
-        let mut transaction = filesystem.clone().new_transaction(&[]).await?;
         object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;
 
         transaction.add(self.object_id(), Mutation::TreeCompact);
+        graveyard.remove(&mut transaction, root_store.store_object_id(), object_id);
         transaction.commit().await;
 
         // TODO(csuter): what if this fails.
@@ -482,6 +498,7 @@
                     SimpleAllocator,
                 },
                 filesystem::{Filesystem, Mutations},
+                graveyard::Graveyard,
                 testing::fake_filesystem::FakeFilesystem,
                 transaction::TransactionHandler,
                 ObjectStore,
@@ -665,9 +682,13 @@
         let fs = FakeFilesystem::new(device);
         let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
         fs.object_manager().set_allocator(allocator.clone());
-        let _store = ObjectStore::new_empty(None, 2, fs.clone());
+        let store = ObjectStore::new_empty(None, 2, fs.clone());
         fs.object_manager().set_root_store_object_id(2);
+        allocator.ensure_open().await.expect("ensure_open failed");
         let mut transaction = fs.clone().new_transaction(&[]).await.expect("new failed");
+        let graveyard =
+            Arc::new(Graveyard::create(&mut transaction, &store).await.expect("create failed"));
+        fs.object_manager().register_graveyard(graveyard);
         let mut device_ranges = Vec::new();
         device_ranges
             .push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
diff --git a/src/storage/fxfs/src/object_store/directory.rs b/src/storage/fxfs/src/object_store/directory.rs
index 46405a1..4ecf1cc 100644
--- a/src/storage/fxfs/src/object_store/directory.rs
+++ b/src/storage/fxfs/src/object_store/directory.rs
@@ -65,6 +65,7 @@
 
     pub async fn open(owner: &Arc<S>, object_id: u64) -> Result<Directory<S>, Error> {
         let store = owner.as_ref().as_ref();
+        store.ensure_open().await?;
         if let ObjectItem { value: ObjectValue::Object { kind: ObjectKind::Directory }, .. } =
             store.tree.find(&ObjectKey::object(object_id)).await?.ok_or(FxfsError::NotFound)?
         {
diff --git a/src/storage/fxfs/src/object_store/filesystem.rs b/src/storage/fxfs/src/object_store/filesystem.rs
index a121d95..165af6a 100644
--- a/src/storage/fxfs/src/object_store/filesystem.rs
+++ b/src/storage/fxfs/src/object_store/filesystem.rs
@@ -8,8 +8,8 @@
         object_handle::INVALID_OBJECT_ID,
         object_store::{
             allocator::Allocator,
-            directory::Directory,
-            journal::{Journal, JournalCheckpoint},
+            graveyard::Graveyard,
+            journal::{super_block::SuperBlock, Journal, JournalCheckpoint},
             transaction::{
                 AssociatedObject, LockKey, LockManager, Mutation, ReadGuard, Transaction,
                 TransactionHandler, TxnMutation, WriteGuard,
@@ -64,7 +64,7 @@
     // has a dependency on journal records from that offset.
     journal_file_checkpoints: HashMap<u64, JournalCheckpoint>,
 
-    graveyards: HashMap<u64, Arc<Directory<ObjectStore>>>,
+    graveyard: Option<Arc<Graveyard>>,
 }
 
 impl ObjectManager {
@@ -77,7 +77,7 @@
                 allocator_object_id: INVALID_OBJECT_ID,
                 allocator: None,
                 journal_file_checkpoints: HashMap::new(),
-                graveyards: HashMap::new(),
+                graveyard: None,
             }),
         }
     }
@@ -196,12 +196,12 @@
         self.objects.read().unwrap().journal_file_checkpoints.contains_key(&object_id)
     }
 
-    pub fn graveyard(&self, store_object_id: u64) -> Option<Arc<Directory<ObjectStore>>> {
-        self.objects.read().unwrap().graveyards.get(&store_object_id).cloned()
+    pub fn graveyard(&self) -> Option<Arc<Graveyard>> {
+        self.objects.read().unwrap().graveyard.clone()
     }
 
-    pub fn register_graveyard(&self, store_object_id: u64, directory: Arc<Directory<ObjectStore>>) {
-        self.objects.write().unwrap().graveyards.insert(store_object_id, directory);
+    pub fn register_graveyard(&self, graveyard: Arc<Graveyard>) {
+        self.objects.write().unwrap().graveyard = Some(graveyard);
     }
 
     /// Flushes all known objects.  This will then allow the journal space to be freed.
@@ -330,9 +330,13 @@
         Ok(filesystem)
     }
 
-    pub async fn open(device: DeviceHolder) -> Result<Arc<FxFilesystem>, Error> {
+    pub async fn open_with_trace(
+        device: DeviceHolder,
+        trace: bool,
+    ) -> Result<Arc<FxFilesystem>, Error> {
         let objects = Arc::new(ObjectManager::new());
         let journal = Journal::new(objects.clone());
+        journal.set_trace(trace);
         let filesystem = Arc::new(FxFilesystem {
             device: OnceCell::new(),
             objects: objects.clone(),
@@ -346,6 +350,14 @@
         Ok(filesystem)
     }
 
+    pub fn set_trace(&self, v: bool) {
+        self.journal.set_trace(v);
+    }
+
+    pub async fn open(device: DeviceHolder) -> Result<Arc<FxFilesystem>, Error> {
+        Self::open_with_trace(device, false).await
+    }
+
     pub fn root_parent_store(&self) -> Arc<ObjectStore> {
         self.objects.root_parent_store()
     }
@@ -403,6 +415,10 @@
         receiver.await.unwrap()
     }
 
+    pub fn super_block(&self) -> SuperBlock {
+        self.journal.super_block()
+    }
+
     async fn wait_for_compaction_to_finish(&self) {
         let compaction_task = self.compaction_task.lock().unwrap().take();
         if let Some(compaction_task) = compaction_task {
@@ -486,10 +502,10 @@
             device::DeviceHolder,
             object_handle::{ObjectHandle, ObjectHandleExt},
             object_store::{
+                directory::Directory,
                 filesystem::{FxFilesystem, SyncOptions},
                 fsck::fsck,
                 transaction::TransactionHandler,
-                HandleOptions, ObjectStore,
             },
             testing::fake_device::FakeDevice,
         },
@@ -505,17 +521,19 @@
 
         // If compaction is not working correctly, this test will run out of space.
         let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
+        let root_store = fs.root_store();
+        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
+            .await
+            .expect("open failed");
+
         let mut tasks = Vec::new();
-        for _ in 0..2 {
+        for i in 0..2 {
             let mut transaction =
                 fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
-            let handle = ObjectStore::create_object(
-                &fs.root_store(),
-                &mut transaction,
-                HandleOptions::default(),
-            )
-            .await
-            .expect("create_object failed");
+            let handle = root_directory
+                .create_child_file(&mut transaction, &format!("{}", i))
+                .await
+                .expect("create_child_file failed");
             transaction.commit().await;
             tasks.push(fasync::Task::spawn(async move {
                 const TEST_DATA: &[u8] = b"hello";
diff --git a/src/storage/fxfs/src/object_store/fsck.rs b/src/storage/fxfs/src/object_store/fsck.rs
index 61925e2..d5c6fe9 100644
--- a/src/storage/fxfs/src/object_store/fsck.rs
+++ b/src/storage/fxfs/src/object_store/fsck.rs
@@ -6,18 +6,27 @@
     crate::{
         lsm_tree::{
             skip_list_layer::SkipListLayer,
-            types::{Item, Layer, LayerIterator, MutableLayer},
+            types::{Item, ItemRef, Layer, LayerIterator, MutableLayer},
         },
         object_store::{
             allocator::{self, AllocatorKey, AllocatorValue, CoalescingIterator, SimpleAllocator},
+            constants::SUPER_BLOCK_OBJECT_ID,
             filesystem::{Filesystem, FxFilesystem},
-            record::ExtentValue,
+            graveyard::Graveyard,
+            record::{
+                AttributeKey, ExtentValue, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue,
+            },
             transaction::LockKey,
+            ObjectStore,
         },
     },
-    anyhow::{bail, Error},
+    anyhow::{anyhow, bail, Error},
     futures::try_join,
-    std::ops::Bound,
+    std::{
+        collections::hash_map::{Entry, HashMap},
+        ops::Bound,
+        sync::Arc,
+    },
 };
 
 // TODO(csuter): for now, this just checks allocations. We should think about adding checks for:
@@ -41,43 +50,51 @@
     let _guard = filesystem.write_lock(&[LockKey::Filesystem]).await;
 
     let object_manager = filesystem.object_manager();
-    let skip_list = SkipListLayer::new(2048); // TODO(csuter): fix magic number
+    let graveyard = object_manager.graveyard().ok_or(anyhow!("Missing graveyard!"))?;
+    let fsck = Fsck::new();
+    let super_block = filesystem.super_block();
+
+    // Scan the root parent object store.
+    let mut root_objects = vec![super_block.root_store_object_id, super_block.journal_object_id];
+    root_objects.append(&mut object_manager.root_store().parent_objects());
+    fsck.scan_store(&object_manager.root_parent_store(), &root_objects, &graveyard).await?;
+
+    let root_store = &object_manager.root_store();
+    let mut root_store_root_objects = Vec::new();
+    root_store_root_objects
+        .append(&mut vec![super_block.allocator_object_id, SUPER_BLOCK_OBJECT_ID]);
+    root_store_root_objects.append(&mut root_store.root_objects());
 
     // TODO(csuter): We could maybe iterate over stores concurrently.
     for store_id in object_manager.store_object_ids() {
+        if store_id == super_block.root_parent_store_object_id
+            || store_id == super_block.root_store_object_id
+        {
+            continue;
+        }
         let store = object_manager.store(store_id).expect("store disappeared!");
         store.ensure_open().await?;
-        let layer_set = store.tree.layer_set();
-        let mut merger = layer_set.merger();
-        let mut iter = merger.seek(Bound::Unbounded).await?;
-        while let Some(item_ref) = iter.get() {
-            match item_ref.into() {
-                Some((_, _, extent_key, ExtentValue { device_offset: Some(device_offset) })) => {
-                    let item = Item::new(
-                        AllocatorKey {
-                            device_range: *device_offset
-                                ..*device_offset + extent_key.range.end - extent_key.range.start,
-                        },
-                        AllocatorValue { delta: 1 },
-                    );
-                    let lower_bound = item.key.lower_bound_for_merge_into();
-                    skip_list.merge_into(item, &lower_bound, allocator::merge::merge).await;
-                }
-                _ => {}
-            }
-            iter.advance().await?;
-        }
+        fsck.scan_store(&store, &store.root_objects(), &graveyard).await?;
+        let mut parent_objects = store.parent_objects();
+        root_store_root_objects.append(&mut parent_objects);
     }
-    // Now compare our regenerated allocation map with what we actually have.
+
     // TODO(csuter): It's a bit crude how details of SimpleAllocator are leaking here. Is there
     // a better way?
     let allocator = filesystem.allocator().as_any().downcast::<SimpleAllocator>().unwrap();
     allocator.ensure_open().await?;
+    root_store_root_objects.append(&mut allocator.parent_objects());
+
+    // Finally scan the root object store.
+    fsck.scan_store(root_store, &root_store_root_objects, &graveyard).await?;
+
+    // Now compare our regenerated allocation map with what we actually have.
     let layer_set = allocator.tree().layer_set();
     let mut merger = layer_set.merger();
     let iter = merger.seek(Bound::Unbounded).await?;
     let mut actual = CoalescingIterator::new(Box::new(iter)).await?;
-    let mut expected = CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await?).await?;
+    let mut expected =
+        CoalescingIterator::new(fsck.allocations.seek(Bound::Unbounded).await?).await?;
     while let Some(actual_item) = actual.get() {
         match expected.get() {
             None => bail!("found extra allocation {:?}", actual_item),
@@ -95,6 +112,108 @@
     Ok(())
 }
 
+struct Fsck {
+    allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
+}
+
+impl Fsck {
+    fn new() -> Self {
+        Fsck { allocations: SkipListLayer::new(2048) } // TODO(csuter): fix magic number
+    }
+
+    pub async fn scan_store(
+        &self,
+        store: &ObjectStore,
+        root_objects: &[u64],
+        graveyard: &Graveyard,
+    ) -> Result<(), Error> {
+        let mut object_refs: HashMap<u64, (u64, u64)> = HashMap::new();
+
+        // Add all the graveyard references.
+        let layer_set = graveyard.store().tree().layer_set();
+        let mut merger = layer_set.merger();
+        let mut iter = graveyard.iter_from(&mut merger, (store.store_object_id(), 0)).await?;
+        while let Some((store_object_id, object_id)) = iter.get() {
+            if store_object_id != store.store_object_id() {
+                break;
+            }
+            object_refs.insert(object_id, (0, 1));
+            iter.advance().await?;
+        }
+
+        let layer_set = store.tree.layer_set();
+        let mut merger = layer_set.merger();
+        let mut iter = merger.seek(Bound::Unbounded).await?;
+        for root_object in root_objects {
+            object_refs.insert(*root_object, (0, 1));
+        }
+        while let Some(ItemRef { key, value }) = iter.get() {
+            match (key, value) {
+                (
+                    ObjectKey { object_id, data: ObjectKeyData::Object },
+                    ObjectValue::Object { kind },
+                ) => {
+                    let refs = match kind {
+                        ObjectKind::File { refs, .. } => *refs,
+                        ObjectKind::Directory | ObjectKind::Graveyard => 1,
+                    };
+                    match object_refs.entry(*object_id) {
+                        Entry::Occupied(mut occupied) => {
+                            occupied.get_mut().0 += refs;
+                        }
+                        Entry::Vacant(vacant) => {
+                            vacant.insert((refs, 0));
+                        }
+                    }
+                }
+                (
+                    ObjectKey {
+                        data: ObjectKeyData::Attribute(_, AttributeKey::Extent(extent_key)),
+                        ..
+                    },
+                    ObjectValue::Extent(ExtentValue { device_offset: Some(device_offset) }),
+                ) => {
+                    let item = Item::new(
+                        AllocatorKey {
+                            device_range: *device_offset
+                                ..*device_offset + extent_key.range.end - extent_key.range.start,
+                        },
+                        AllocatorValue { delta: 1 },
+                    );
+                    let lower_bound = item.key.lower_bound_for_merge_into();
+                    self.allocations.merge_into(item, &lower_bound, allocator::merge::merge).await;
+                }
+                (
+                    ObjectKey { data: ObjectKeyData::Child { .. }, .. },
+                    ObjectValue::Child { object_id, .. },
+                ) => match object_refs.entry(*object_id) {
+                    Entry::Occupied(mut occupied) => {
+                        occupied.get_mut().1 += 1;
+                    }
+                    Entry::Vacant(vacant) => {
+                        vacant.insert((0, 1));
+                    }
+                },
+                _ => {}
+            }
+            iter.advance().await?;
+        }
+        // Check object reference counts.
+        for (object_id, (count, references)) in object_refs {
+            if count != references {
+                bail!(
+                    "object {}.{} reference count mismatch: actual: {}, expected: {}",
+                    store.store_object_id(),
+                    object_id,
+                    count,
+                    references
+                );
+            }
+        }
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use {
@@ -102,12 +221,16 @@
         crate::{
             device::DeviceHolder,
             lsm_tree::types::{Item, ItemRef, LayerIterator},
+            object_handle::ObjectHandle,
             object_store::{
                 allocator::{
                     Allocator, AllocatorKey, AllocatorValue, CoalescingIterator, SimpleAllocator,
                 },
+                directory::Directory,
                 filesystem::{Filesystem, FxFilesystem},
+                record::ObjectDescriptor,
                 transaction::TransactionHandler,
+                HandleOptions, ObjectStore,
             },
             testing::fake_device::FakeDevice,
         },
@@ -190,4 +313,66 @@
         let error = format!("{}", fsck(&fs).await.expect_err("fsck succeeded"));
         assert!(error.contains("missing allocation"), "{}", error);
     }
+
+    #[fasync::run_singlethreaded(test)]
+    async fn test_too_many_object_refs() {
+        let fs = FxFilesystem::new_empty(DeviceHolder::new(FakeDevice::new(
+            2048,
+            TEST_DEVICE_BLOCK_SIZE,
+        )))
+        .await
+        .expect("new_empty failed");
+
+        let root_store = fs.root_store();
+        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
+            .await
+            .expect("open failed");
+
+        let mut transaction =
+            fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
+        let child_file = root_directory
+            .create_child_file(&mut transaction, "child_file")
+            .await
+            .expect("create_child_file failed");
+        let child_dir = root_directory
+            .create_child_dir(&mut transaction, "child_dir")
+            .await
+            .expect("create_child_directory failed");
+
+        // Add an extra reference to the child file.
+        child_dir.insert_child(
+            &mut transaction,
+            "test",
+            child_file.object_id(),
+            ObjectDescriptor::File,
+        );
+        transaction.commit().await;
+
+        let error = format!("{}", fsck(&fs).await.expect_err("fsck succeeded"));
+        assert!(error.contains("reference count mismatch"), "{}", error);
+    }
+
+    #[fasync::run_singlethreaded(test)]
+    async fn test_too_few_object_refs() {
+        let fs = FxFilesystem::new_empty(DeviceHolder::new(FakeDevice::new(
+            2048,
+            TEST_DEVICE_BLOCK_SIZE,
+        )))
+        .await
+        .expect("new_empty failed");
+
+        let root_store = fs.root_store();
+
+        // Create an object but no directory entry referencing that object, so it will end up with a
+        // reference count of one, but zero references.
+        let mut transaction =
+            fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
+        ObjectStore::create_object(&root_store, &mut transaction, HandleOptions::default())
+            .await
+            .expect("create_object failed");
+        transaction.commit().await;
+
+        let error = format!("{}", fsck(&fs).await.expect_err("fsck succeeded"));
+        assert!(error.contains("reference count mismatch"), "{}", error);
+    }
 }
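
The check that scan_store performs boils down to tallying, for each object, the
reference count recorded on the object itself against the number of references
actually found (directory child entries, graveyard entries, and an implicit
reference for each root object).  A simplified standalone model of that
bookkeeping (plain maps in place of the LSM tree; the names are illustrative):

    use std::collections::HashMap;

    /// `objects` maps object ID -> stored reference count; `links` lists (parent, child) entries.
    fn check_refs(objects: &HashMap<u64, u64>, links: &[(u64, u64)]) -> Result<(), String> {
        // For each object, track (stored count, references actually found).
        let mut object_refs: HashMap<u64, (u64, u64)> = HashMap::new();
        for (&object_id, &refs) in objects {
            object_refs.entry(object_id).or_insert((0, 0)).0 += refs;
        }
        for &(_parent, child) in links {
            object_refs.entry(child).or_insert((0, 0)).1 += 1;
        }
        for (object_id, (count, references)) in object_refs {
            if count != references {
                return Err(format!(
                    "object {} reference count mismatch: actual: {}, expected: {}",
                    object_id, count, references
                ));
            }
        }
        Ok(())
    }

    fn main() {
        // A file whose stored count is 1 but which has two directory entries, as in
        // test_too_many_object_refs above, fails the check.
        let objects = HashMap::from([(10, 1)]);
        let links = [(1, 10), (2, 10)];
        assert!(check_refs(&objects, &links).is_err());
    }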
diff --git a/src/storage/fxfs/src/object_store/graveyard.rs b/src/storage/fxfs/src/object_store/graveyard.rs
new file mode 100644
index 0000000..1db0c25
--- /dev/null
+++ b/src/storage/fxfs/src/object_store/graveyard.rs
@@ -0,0 +1,220 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    crate::{
+        errors::FxfsError,
+        lsm_tree::{
+            merge::{Merger, MergerIterator},
+            types::{ItemRef, LayerIterator},
+        },
+        object_store::{
+            record::{ObjectItem, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue},
+            transaction::{Mutation, Transaction},
+            ObjectStore,
+        },
+    },
+    anyhow::{bail, Error},
+    std::{ops::Bound, sync::Arc},
+};
+
+/// A graveyard exists as a place to park objects that should be deleted when they are no longer in
+/// use.  How objects enter and leave the graveyard is up to the caller to decide.  The intention is
+/// that at mount time, any objects in the graveyard will get removed.
+pub struct Graveyard {
+    store: Arc<ObjectStore>,
+    object_id: u64,
+}
+
+impl Graveyard {
+    pub fn store(&self) -> &Arc<ObjectStore> {
+        &self.store
+    }
+
+    pub fn object_id(&self) -> u64 {
+        self.object_id
+    }
+
+    /// Creates a graveyard object in `store`.
+    pub async fn create(
+        transaction: &mut Transaction<'_>,
+        store: &Arc<ObjectStore>,
+    ) -> Result<Graveyard, Error> {
+        store.ensure_open().await?;
+        let object_id = store.get_next_object_id();
+        transaction.add(
+            store.store_object_id,
+            Mutation::insert_object(
+                ObjectKey::object(object_id),
+                ObjectValue::Object { kind: ObjectKind::Graveyard },
+            ),
+        );
+        Ok(Graveyard { store: store.clone(), object_id })
+    }
+
+    /// Opens a graveyard object in `store`.
+    pub async fn open(store: &Arc<ObjectStore>, object_id: u64) -> Result<Graveyard, Error> {
+        store.ensure_open().await?;
+        if let ObjectItem { value: ObjectValue::Object { kind: ObjectKind::Graveyard }, .. } =
+            store.tree.find(&ObjectKey::object(object_id)).await?.ok_or(FxfsError::NotFound)?
+        {
+            Ok(Graveyard { store: store.clone(), object_id })
+        } else {
+            bail!("Found an object, but it's not a graveyard");
+        }
+    }
+
+    /// Adds an object to the graveyard.
+    pub fn add(&self, transaction: &mut Transaction<'_>, store_object_id: u64, object_id: u64) {
+        transaction.add(
+            self.store.store_object_id(),
+            Mutation::replace_or_insert_object(
+                ObjectKey::graveyard_entry(self.object_id, store_object_id, object_id),
+                ObjectValue::Some,
+            ),
+        );
+    }
+
+    /// Removes an object from the graveyard.
+    pub fn remove(&self, transaction: &mut Transaction<'_>, store_object_id: u64, object_id: u64) {
+        transaction.add(
+            self.store.store_object_id(),
+            Mutation::replace_or_insert_object(
+                ObjectKey::graveyard_entry(self.object_id, store_object_id, object_id),
+                ObjectValue::None,
+            ),
+        );
+    }
+
+    /// Returns an iterator that will return graveyard entries skipping deleted ones.  Example
+    /// usage:
+    ///
+    ///   let layer_set = graveyard.store().tree().layer_set();
+    ///   let mut merger = layer_set.merger();
+    ///   let mut iter = graveyard.iter(&mut merger).await?;
+    ///
+    pub async fn iter<'a, 'b>(
+        &self,
+        merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
+    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
+        self.iter_from(merger, (0, 0)).await
+    }
+
+    /// Like "iter", but seeks from a specific (store-id, object-id) tuple.  Example usage:
+    ///
+    ///   let layer_set = graveyard.store().tree().layer_set();
+    ///   let mut merger = layer_set.merger();
+    ///   let mut iter = graveyard.iter_from(&mut merger, (2, 3)).await?;
+    ///
+    pub async fn iter_from<'a, 'b>(
+        &self,
+        merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
+        from: (u64, u64),
+    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
+        let mut iter = merger
+            .seek(Bound::Included(&ObjectKey::graveyard_entry(self.object_id, from.0, from.1)))
+            .await?;
+        // Skip deleted entries.
+        // TODO(csuter): Remove this once we've developed a filtering iterator.
+        loop {
+            match iter.get() {
+                Some(ItemRef { key: ObjectKey { object_id, .. }, value: ObjectValue::None })
+                    if *object_id == self.object_id => {}
+                _ => break,
+            }
+            iter.advance().await?;
+        }
+        Ok(GraveyardIterator { object_id: self.object_id, iter })
+    }
+}
+
+pub struct GraveyardIterator<'a, 'b> {
+    object_id: u64,
+    iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
+}
+
+impl GraveyardIterator<'_, '_> {
+    pub fn get(&self) -> Option<(u64, u64)> {
+        match self.iter.get() {
+            Some(ItemRef {
+                key:
+                    ObjectKey {
+                        object_id: oid,
+                        data: ObjectKeyData::GraveyardEntry { store_object_id, object_id },
+                    },
+                ..
+            }) if *oid == self.object_id => Some((*store_object_id, *object_id)),
+            _ => None,
+        }
+    }
+
+    pub async fn advance(&mut self) -> Result<(), Error> {
+        loop {
+            self.iter.advance().await?;
+            // Skip deleted entries.
+            match self.iter.get() {
+                Some(ItemRef { key: ObjectKey { object_id, .. }, value: ObjectValue::None })
+                    if *object_id == self.object_id => {}
+                _ => return Ok(()),
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use {
+        super::Graveyard,
+        crate::{
+            device::DeviceHolder,
+            object_store::{filesystem::FxFilesystem, transaction::TransactionHandler},
+            testing::fake_device::FakeDevice,
+        },
+        fuchsia_async as fasync,
+    };
+
+    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
+
+    #[fasync::run_singlethreaded(test)]
+    async fn test_graveyard() {
+        let device = DeviceHolder::new(FakeDevice::new(2048, TEST_DEVICE_BLOCK_SIZE));
+        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
+        let root_store = fs.root_store();
+
+        // Create a graveyard and add two objects to it.
+        let mut transaction =
+            fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
+        let graveyard =
+            Graveyard::create(&mut transaction, &root_store).await.expect("create failed");
+        graveyard.add(&mut transaction, 2, 3);
+        graveyard.add(&mut transaction, 3, 4);
+        transaction.commit().await;
+
+        // Reopen the graveyard and check that we see the objects we added.
+        let graveyard =
+            Graveyard::open(&root_store, graveyard.object_id()).await.expect("open failed");
+        let layer_set = graveyard.store().tree().layer_set();
+        let mut merger = layer_set.merger();
+        let mut iter = graveyard.iter(&mut merger).await.expect("iter failed");
+        assert_eq!(iter.get().expect("missing entry"), (2, 3));
+        iter.advance().await.expect("advance failed");
+        assert_eq!(iter.get().expect("missing entry"), (3, 4));
+        iter.advance().await.expect("advance failed");
+        assert_eq!(iter.get(), None);
+
+        // Remove one of the objects.
+        let mut transaction =
+            fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
+        graveyard.remove(&mut transaction, 3, 4);
+        transaction.commit().await;
+
+        // Check that the graveyard has been updated as expected.
+        let layer_set = graveyard.store().tree().layer_set();
+        let mut merger = layer_set.merger();
+        let mut iter = graveyard.iter_from(&mut merger, (2, 3)).await.expect("iter failed");
+        assert_eq!(iter.get().expect("missing entry"), (2, 3));
+        iter.advance().await.expect("advance failed");
+        assert_eq!(iter.get(), None);
+    }
+}
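
Note that remove() writes a tombstone (ObjectValue::None) rather than deleting
the record outright, which is why both iterators above skip entries whose value
is None.  A standalone model of that behaviour using a BTreeMap in place of the
LSM tree:

    use std::collections::BTreeMap;

    fn main() {
        // Keys are (store_object_id, object_id); None plays the role of ObjectValue::None.
        let mut graveyard: BTreeMap<(u64, u64), Option<()>> = BTreeMap::new();
        graveyard.insert((2, 3), Some(())); // add(store 2, object 3)
        graveyard.insert((3, 4), Some(())); // add(store 3, object 4)
        graveyard.insert((3, 4), None);     // remove(store 3, object 4) leaves a tombstone

        // Iteration skips tombstones, mirroring GraveyardIterator::get/advance.
        let live: Vec<(u64, u64)> =
            graveyard.iter().filter(|(_, v)| v.is_some()).map(|(k, _)| *k).collect();
        assert_eq!(live, vec![(2, 3)]);
    }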
diff --git a/src/storage/fxfs/src/object_store/journal.rs b/src/storage/fxfs/src/object_store/journal.rs
index 8994c01..c5406e2 100644
--- a/src/storage/fxfs/src/object_store/journal.rs
+++ b/src/storage/fxfs/src/object_store/journal.rs
@@ -16,7 +16,7 @@
 // same per-block checksum that is used for the journal file.
 
 mod reader;
-mod super_block;
+pub mod super_block;
 mod writer;
 
 use {
@@ -28,6 +28,7 @@
             constants::SUPER_BLOCK_OBJECT_ID,
             directory::Directory,
             filesystem::{Filesystem, Mutations, ObjectFlush, ObjectManager, SyncOptions},
+            graveyard::Graveyard,
             journal::{
                 reader::{JournalReader, ReadResult},
                 super_block::SuperBlock,
@@ -48,7 +49,10 @@
     std::{
         clone::Clone,
         iter::IntoIterator,
-        sync::{Arc, Mutex},
+        sync::{
+            atomic::{self, AtomicBool},
+            Arc, Mutex,
+        },
         vec::Vec,
     },
 };
@@ -137,6 +141,7 @@
     objects: Arc<ObjectManager>,
     writer: futures::lock::Mutex<JournalWriter<StoreObjectHandle<ObjectStore>>>,
     inner: Mutex<Inner>,
+    trace: AtomicBool,
 }
 
 struct Inner {
@@ -160,9 +165,14 @@
                 super_block: SuperBlock::default(),
                 should_flush: false,
             }),
+            trace: AtomicBool::new(false),
         }
     }
 
+    pub fn set_trace(&self, v: bool) {
+        self.trace.store(v, atomic::Ordering::Relaxed);
+    }
+
     /// Reads a super-block and then replays journaled records.
     pub async fn replay(&self, filesystem: Arc<dyn Filesystem>) -> Result<(), Error> {
         let device = filesystem.device();
@@ -223,7 +233,9 @@
                         }
                         JournalRecord::Commit => {
                             if let Some(checkpoint) = journal_file_checkpoint.take() {
-                                log::debug!("REPLAY {}", checkpoint.file_offset);
+                                if self.trace.load(atomic::Ordering::Relaxed) {
+                                    log::info!("REPLAY {}", checkpoint.file_offset);
+                                }
                                 for (object_id, mutation) in mutations {
                                     // Snoop the mutations for any that might apply to the journal
                                     // file to ensure that we accurately track changes in size.
@@ -283,6 +295,18 @@
             }
             writer.seek_to_checkpoint(checkpoint);
         }
+
+        let root_store = self.objects.root_store();
+        root_store.ensure_open().await?;
+        self.objects.register_graveyard(Arc::new(
+            Graveyard::open(&self.objects.root_store(), root_store.graveyard_directory_object_id())
+                .await
+                .context(format!(
+                    "failed to open graveyard (object_id: {})",
+                    root_store.graveyard_directory_object_id()
+                ))?,
+        ));
+
         log::info!("replay done");
         Ok(())
     }
@@ -345,9 +369,9 @@
             .context("preallocate journal")?;
 
         // the root store's graveyard and root directory...
-        let graveyard = Arc::new(Directory::create(&mut transaction, &root_store).await?);
+        let graveyard = Arc::new(Graveyard::create(&mut transaction, &root_store).await?);
         root_store.set_graveyard_directory_object_id(&mut transaction, graveyard.object_id());
-        self.objects.register_graveyard(root_store.store_object_id(), graveyard);
+        self.objects.register_graveyard(graveyard);
 
         let root_directory = Directory::create(&mut transaction, &root_store)
             .await
@@ -442,7 +466,9 @@
         mutations: impl IntoIterator<Item = TxnMutation<'_>>,
         journal_file_checkpoint: JournalCheckpoint,
     ) {
-        log::debug!("BEGIN TXN {}", journal_file_checkpoint.file_offset);
+        if self.trace.load(atomic::Ordering::Relaxed) {
+            log::info!("BEGIN TXN {}", journal_file_checkpoint.file_offset);
+        }
         for TxnMutation { object_id, mutation, associated_object } in mutations {
             self.apply_mutation(
                 object_id,
@@ -453,7 +479,9 @@
             )
             .await;
         }
-        log::debug!("END TXN");
+        if self.trace.load(atomic::Ordering::Relaxed) {
+            log::info!("END TXN");
+        }
     }
 
     // Determines whether a mutation at the given checkpoint should be applied.  During replay, not
@@ -479,12 +507,16 @@
         object: Option<&dyn AssociatedObject>,
     ) {
         if !filter || self.should_apply(object_id, journal_file_checkpoint) {
-            log::debug!("applying mutation: {}: {:?}, filter: {}", object_id, mutation, filter);
+            if self.trace.load(atomic::Ordering::Relaxed) {
+                log::info!("applying mutation: {}: {:?}, filter: {}", object_id, mutation, filter);
+            }
             self.objects
                 .apply_mutation(object_id, mutation, filter, journal_file_checkpoint, object)
                 .await;
         } else {
-            log::debug!("ignoring mutation: {}, {:?}", object_id, mutation);
+            if self.trace.load(atomic::Ordering::Relaxed) {
+                log::info!("ignoring mutation: {}, {:?}", object_id, mutation);
+            }
         }
     }
 
@@ -574,6 +606,8 @@
         Ok(())
     }
 
+    /// Flushes any buffered journal data to the device.  Note that this does not flush the device
+    /// itself, so it does not guarantee that data has been persisted to lower layers.
     pub async fn sync(&self, _options: SyncOptions) -> Result<(), Error> {
         // TODO(csuter): There needs to be some kind of locking here.
         let needs_super_block = self.inner.lock().unwrap().needs_super_block;
@@ -587,6 +621,11 @@
         Ok(())
     }
 
+    /// Returns a copy of the super-block.
+    pub fn super_block(&self) -> SuperBlock {
+        self.inner.lock().unwrap().super_block.clone()
+    }
+
     /// Returns whether or not a flush should be performed.  This is only updated after committing a
     /// transaction.
     pub fn should_flush(&self) -> bool {
@@ -619,6 +658,7 @@
             device::DeviceHolder,
             object_handle::{ObjectHandle, ObjectHandleExt},
             object_store::{
+                directory::Directory,
                 filesystem::{FxFilesystem, SyncOptions},
                 fsck::fsck,
                 transaction::TransactionHandler,
@@ -638,16 +678,20 @@
         let device = DeviceHolder::new(FakeDevice::new(2048, TEST_DEVICE_BLOCK_SIZE));
 
         let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
+
         let object_id = {
+            let root_store = fs.root_store();
+            let root_directory =
+                Directory::open(&root_store, root_store.root_directory_object_id())
+                    .await
+                    .expect("open failed");
             let mut transaction =
                 fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
-            let handle = ObjectStore::create_object(
-                &fs.root_store(),
-                &mut transaction,
-                HandleOptions::default(),
-            )
-            .await
-            .expect("create_object failed");
+            let handle = root_directory
+                .create_child_file(&mut transaction, "test")
+                .await
+                .expect("create_child_file failed");
+
             transaction.commit().await;
             let mut buf = handle.allocate_buffer(TEST_DATA.len());
             buf.as_mut_slice().copy_from_slice(TEST_DATA);
@@ -663,7 +707,7 @@
             let handle =
                 ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default())
                     .await
-                    .expect("create_object failed");
+                    .expect("open_object failed");
             let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize);
             assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
             assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
@@ -675,21 +719,23 @@
     async fn test_reset() {
         const TEST_DATA: &[u8] = b"hello";
 
-        let device = DeviceHolder::new(FakeDevice::new(4096, TEST_DEVICE_BLOCK_SIZE));
+        let device = DeviceHolder::new(FakeDevice::new(6144, TEST_DEVICE_BLOCK_SIZE));
 
         let mut object_ids = Vec::new();
 
         let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
         {
+            let root_store = fs.root_store();
+            let root_directory =
+                Directory::open(&root_store, root_store.root_directory_object_id())
+                    .await
+                    .expect("open failed");
             let mut transaction =
                 fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
-            let handle = ObjectStore::create_object(
-                &fs.root_store(),
-                &mut transaction,
-                HandleOptions::default(),
-            )
-            .await
-            .expect("create_object failed");
+            let handle = root_directory
+                .create_child_file(&mut transaction, "test")
+                .await
+                .expect("create_child_file failed");
             transaction.commit().await;
             let mut buf = handle.allocate_buffer(TEST_DATA.len());
             buf.as_mut_slice().copy_from_slice(TEST_DATA);
@@ -699,16 +745,13 @@
 
             // Create a lot of objects but don't sync at the end. This should leave the filesystem
             // with a half finished transaction that cannot be replayed.
-            for _ in 0..1000 {
+            for i in 0..1000 {
                 let mut transaction =
                     fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
-                let handle = ObjectStore::create_object(
-                    &fs.root_store(),
-                    &mut transaction,
-                    HandleOptions::default(),
-                )
-                .await
-                .expect("create_object failed");
+                let handle = root_directory
+                    .create_child_file(&mut transaction, &format!("{}", i))
+                    .await
+                    .expect("create_child_file failed");
                 transaction.commit().await;
                 let mut buf = handle.allocate_buffer(TEST_DATA.len());
                 buf.as_mut_slice().copy_from_slice(TEST_DATA);
@@ -720,12 +763,13 @@
         let fs = FxFilesystem::open(fs.take_device().await).await.expect("open failed");
         fsck(&fs).await.expect("fsck failed");
         {
+            let root_store = fs.root_store();
             // Check the first two objects which should exist.
             for &object_id in &object_ids[0..1] {
                 let handle =
-                    ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default())
+                    ObjectStore::open_object(&root_store, object_id, HandleOptions::default())
                         .await
-                        .expect("create_object failed");
+                        .expect("open_object failed");
                 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize);
                 assert_eq!(
                     handle.read(0, buf.as_mut()).await.expect("read failed"),
@@ -735,15 +779,16 @@
             }
 
             // Write one more object and sync.
+            let root_directory =
+                Directory::open(&root_store, root_store.root_directory_object_id())
+                    .await
+                    .expect("open failed");
             let mut transaction =
                 fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
-            let handle = ObjectStore::create_object(
-                &fs.root_store(),
-                &mut transaction,
-                HandleOptions::default(),
-            )
-            .await
-            .expect("create_object failed");
+            let handle = root_directory
+                .create_child_file(&mut transaction, "test2")
+                .await
+                .expect("create_child_file failed");
             transaction.commit().await;
             let mut buf = handle.allocate_buffer(TEST_DATA.len());
             buf.as_mut_slice().copy_from_slice(TEST_DATA);
@@ -752,14 +797,18 @@
             object_ids.push(handle.object_id());
         }
 
-        let fs = FxFilesystem::open(fs.take_device().await).await.expect("open failed");
+        let fs = FxFilesystem::open_with_trace(fs.take_device().await, false)
+            .await
+            .expect("open failed");
         {
+            fsck(&fs).await.expect("fsck failed");
+
             // Check the first two and the last objects.
             for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
                 let handle =
                     ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default())
                         .await
-                        .expect("create_object failed");
+                        .expect(&format!("open_object failed (object_id: {})", object_id));
                 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize);
                 assert_eq!(
                     handle.read(0, buf.as_mut()).await.expect("read failed"),
@@ -767,8 +816,6 @@
                 );
                 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
             }
-
-            fsck(&fs).await.expect("fsck failed");
         }
     }
 }
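
The test updates above all follow one pattern: every test object is now created as a child of the
root directory rather than with ObjectStore::create_object, so it carries a directory reference and
passes the new fsck reference-count check. A minimal sketch of that pattern (identifiers are taken
from the hunks above; the "example" file name is only illustrative):

```rust
// Sketch of the creation pattern used by the updated tests.  `fs` is an FxFilesystem.
let root_store = fs.root_store();
let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
    .await
    .expect("open failed");

let mut transaction = fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
// Creating the object as a child of the root directory gives it a reference,
// which is what the fsck reference-count check introduced by this change verifies.
let handle = root_directory
    .create_child_file(&mut transaction, "example")
    .await
    .expect("create_child_file failed");
transaction.commit().await;
```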
diff --git a/src/storage/fxfs/src/object_store/journal/super_block.rs b/src/storage/fxfs/src/object_store/journal/super_block.rs
index a74c07f..6fd1fe3 100644
--- a/src/storage/fxfs/src/object_store/journal/super_block.rs
+++ b/src/storage/fxfs/src/object_store/journal/super_block.rs
@@ -40,7 +40,7 @@
 // A super-block consists of this header followed by records that are to be replayed into the root
 // parent object store.
 #[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
-pub(super) struct SuperBlock {
+pub struct SuperBlock {
     // TODO(csuter): version stuff
     // TODO(csuter): UUID
 
diff --git a/src/storage/fxfs/src/object_store/record.rs b/src/storage/fxfs/src/object_store/record.rs
index db938c8..ceb9232a 100644
--- a/src/storage/fxfs/src/object_store/record.rs
+++ b/src/storage/fxfs/src/object_store/record.rs
@@ -36,6 +36,8 @@
     Attribute(u64, AttributeKey),
     /// A child of a directory.
     Child { name: String }, // TODO(jfsulliv): Should this be a string or array of bytes?
+    /// A graveyard entry.
+    GraveyardEntry { store_object_id: u64, object_id: u64 },
 }
 
 #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
@@ -169,6 +171,14 @@
         Self { object_id, data: ObjectKeyData::Child { name: name.to_owned() } }
     }
 
+    /// Creates a graveyard entry.
+    pub fn graveyard_entry(graveyard_object_id: u64, store_object_id: u64, object_id: u64) -> Self {
+        Self {
+            object_id: graveyard_object_id,
+            data: ObjectKeyData::GraveyardEntry { store_object_id, object_id },
+        }
+    }
+
     pub fn tombstone(object_id: u64) -> Self {
         Self { object_id, data: ObjectKeyData::Tombstone }
     }
@@ -274,6 +284,7 @@
         allocated_size: u64,
     },
     Directory,
+    Graveyard,
 }
 
 /// ObjectValue is the value of an item in the object store.
@@ -283,6 +294,9 @@
 pub enum ObjectValue {
     /// Some keys (e.g. tombstones) have no value.
     None,
+    /// Some keys have no value but still need to distinguish between being present and absent
+    /// (None), i.e. the value is effectively a boolean: None => false, Some => true.
+    Some,
     /// The value for an ObjectKey::Object record.
     Object { kind: ObjectKind },
     /// An attribute associated with a file object. |size| is the size of the attribute in bytes.
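
Taken together, the record.rs additions describe how a graveyard entry is laid out: the key names
the graveyard object plus the (store, object) pair being parked, and the value is just a presence
marker. A rough sketch of composing such a record follows; graveyard.rs itself is not shown in this
part of the change, so treating the record as a bare key/value pair is an assumption:

```rust
// Illustrative only: a graveyard record built from the new variants above.
// How the graveyard actually inserts this into its tree is not shown here.
fn graveyard_record(
    graveyard_object_id: u64,
    store_object_id: u64,
    object_id: u64,
) -> (ObjectKey, ObjectValue) {
    (
        // Key: the graveyard object, qualified by the store and object being parked.
        ObjectKey::graveyard_entry(graveyard_object_id, store_object_id, object_id),
        // Value: a bare presence marker; ObjectValue::None (e.g. after a later
        // removal merges in) means the entry is gone.
        ObjectValue::Some,
    )
}
```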
diff --git a/src/storage/fxfs/src/server/directory.rs b/src/storage/fxfs/src/server/directory.rs
index bf2362e..47dd71e 100644
--- a/src/storage/fxfs/src/server/directory.rs
+++ b/src/storage/fxfs/src/server/directory.rs
@@ -291,17 +291,11 @@
         {
             let store = self.store();
             if let ObjectDescriptor::File = descriptor {
-                store
-                    .filesystem()
-                    .object_manager()
-                    .graveyard(store.store_object_id())
-                    .unwrap()
-                    .insert_child(
-                        transaction,
-                        &format!("{}", existing_oid),
-                        existing_oid,
-                        descriptor,
-                    );
+                store.filesystem().object_manager().graveyard().unwrap().add(
+                    transaction,
+                    store.store_object_id(),
+                    existing_oid,
+                );
             } else {
                 directory::remove(transaction, &store, existing_oid);
             }
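
On the server side, unlink now hands the store and object IDs to a single filesystem-wide graveyard
obtained from the object manager, instead of inserting a named child into a per-volume graveyard
directory. Graveyard::add is not visible in this section, so the following is only a plausible
sketch of what it might do with those arguments, based on the record types added above; the field
names and the Mutation helper are assumptions, not the actual implementation:

```rust
// Hypothetical sketch of Graveyard::add — not the actual implementation from graveyard.rs.
// `store_object_id` and `object_id` fields on Graveyard are assumed, and Mutation::insert_object
// is assumed to wrap an (ObjectKey, ObjectValue) insertion as a transaction mutation.
impl Graveyard {
    pub fn add(&self, transaction: &mut Transaction<'_>, store_object_id: u64, object_id: u64) {
        transaction.add(
            // Record the entry against the store that owns the graveyard's tree.
            self.store_object_id,
            Mutation::insert_object(
                ObjectKey::graveyard_entry(self.object_id, store_object_id, object_id),
                ObjectValue::Some,
            ),
        );
    }
}
```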
diff --git a/src/storage/fxfs/src/volume.rs b/src/storage/fxfs/src/volume.rs
index ca8b1e18..83b8bb5 100644
--- a/src/storage/fxfs/src/volume.rs
+++ b/src/storage/fxfs/src/volume.rs
@@ -7,7 +7,7 @@
         errors::FxfsError,
         object_store::{
             directory::Directory,
-            filesystem::{Filesystem, FxFilesystem},
+            filesystem::FxFilesystem,
             transaction::{LockKey, TransactionHandler},
             ObjectDescriptor, ObjectStore,
         },
@@ -44,9 +44,6 @@
         let mut transaction = self.filesystem.clone().new_transaction(&[]).await?;
         store = root_store.create_child_store(&mut transaction).await?;
 
-        let graveyard = Arc::new(Directory::create(&mut transaction, &store).await?);
-        store.set_graveyard_directory_object_id(&mut transaction, graveyard.object_id());
-
         let root_directory = Directory::create(&mut transaction, &store).await?;
         store.set_root_directory_object_id(&mut transaction, root_directory.object_id());
 
@@ -56,7 +53,6 @@
             store.store_object_id(),
         );
         transaction.commit().await;
-        self.filesystem.object_manager().register_graveyard(store.store_object_id(), graveyard);
 
         Ok(store)
     }
@@ -71,17 +67,7 @@
         Ok(if let Some(volume_store) = self.filesystem.store(object_id) {
             volume_store
         } else {
-            let store = self.filesystem.root_store().open_store(object_id).await?;
-
-            // Make sure the graveyard is registered.
-            if self.filesystem.object_manager().graveyard(object_id).is_none() {
-                self.filesystem.object_manager().register_graveyard(
-                    object_id,
-                    Arc::new(Directory::open(&store, store.graveyard_directory_object_id()).await?),
-                );
-            }
-
-            store
+            self.filesystem.root_store().open_store(object_id).await?
         })
     }