[fxfs] Fixed volume creation.

Object stores need to be registered with the store manager so that we can
find them when replaying the log; during replay we also need a way to
lazily open stores, since their StoreInfo cannot be read until replay completes.

Volume information is stored in a new reserved object (object ID 0) that
exists for all object stores. This volume information points at the root
directory for the volume.

Change-Id: I13b055f9f0df8d95da977a09661ba9d2e0162986
diff --git a/src/storage/fxfs/src/lsm_tree.rs b/src/storage/fxfs/src/lsm_tree.rs
index 56da729..94eb950 100644
--- a/src/storage/fxfs/src/lsm_tree.rs
+++ b/src/storage/fxfs/src/lsm_tree.rs
@@ -32,7 +32,13 @@
 {
 }
 pub trait Value:
-    std::fmt::Debug + Send + Sync + serde::de::DeserializeOwned + serde::Serialize + std::marker::Unpin + 'static
+    std::fmt::Debug
+    + Send
+    + Sync
+    + serde::de::DeserializeOwned
+    + serde::Serialize
+    + std::marker::Unpin
+    + 'static
 {
 }
 
@@ -49,7 +55,13 @@
 {
 }
 impl<V> Value for V where
-    V: std::fmt::Debug + Send + Sync + std::marker::Unpin + serde::de::DeserializeOwned + serde::Serialize + 'static
+    V: std::fmt::Debug
+        + Send
+        + Sync
+        + std::marker::Unpin
+        + serde::de::DeserializeOwned
+        + serde::Serialize
+        + 'static
 {
 }
 
@@ -253,6 +265,7 @@
             let mut merger = merge::Merger::new(iterators, self.merge_fn);
             merger.advance()?;
             while let Some(item_ref) = merger.get() {
+                eprintln!("{:?}", item_ref);
                 writer.write(item_ref)?;
                 merger.advance()?;
             }
diff --git a/src/storage/fxfs/src/object_store.rs b/src/storage/fxfs/src/object_store.rs
index e6f8e7a..6e37c02 100644
--- a/src/storage/fxfs/src/object_store.rs
+++ b/src/storage/fxfs/src/object_store.rs
@@ -18,11 +18,11 @@
     crate::{
         lsm_tree::{ItemRef, LSMTree},
         object_handle::{ObjectHandle, ObjectHandleCursor},
-        object_store::constants::INVALID_OBJECT_ID,
     },
     allocator::Allocator,
     anyhow::Error,
     bincode::{deserialize_from, serialize_into},
+    constants::{LOWEST_SPECIAL_OBJECT_ID, RESERVED_OBJECT_ID},
     log::{Log, Mutation},
     record::{decode_extent, ExtentKey, ObjectItem, ObjectKey, ObjectKeyData, ObjectValue},
     serde::{Deserialize, Serialize},
@@ -42,6 +42,7 @@
 
 #[derive(Default)]
 pub struct StoreOptions {
+    // TODO: This is horrible, and we should revisit this to see if it's necessary.
     use_parent_to_allocate_object_ids: bool,
 }
 
@@ -401,9 +402,6 @@
 
 #[derive(Clone, Default, Serialize, Deserialize)]
 pub struct StoreInfo {
-    // The root object ID.
-    root_object_id: u64,
-
     // The last used object ID.
     last_object_id: u64,
 
@@ -414,7 +412,7 @@
 
 impl StoreInfo {
     fn new() -> StoreInfo {
-        StoreInfo { root_object_id: INVALID_OBJECT_ID, last_object_id: 0, layers: Vec::new() }
+        StoreInfo { last_object_id: RESERVED_OBJECT_ID, layers: Vec::new() }
     }
 }
 
@@ -428,6 +426,12 @@
     options: StoreOptions,
     store_info: Mutex<StoreInfo>,
     tree: LSMTree<ObjectKey, ObjectValue>,
+
+    // When replaying the log, the store cannot read StoreInfo until the whole log
+    // has been replayed, so during that time, opened will be false and records
+    // just get sent to the tree. Once the log has been replayed, we can open the store
+    // and load all the other layer information.
+    opened: Mutex<bool>,
 }
 
 impl ObjectStore {
@@ -440,6 +444,7 @@
         store_info: StoreInfo,
         tree: LSMTree<ObjectKey, ObjectValue>,
         options: StoreOptions,
+        opened: bool,
     ) -> Arc<ObjectStore> {
         Arc::new(ObjectStore {
             parent_store,
@@ -451,6 +456,7 @@
             options,
             store_info: Mutex::new(store_info),
             tree,
+            opened: Mutex::new(opened),
         })
     }
 
@@ -471,6 +477,7 @@
             StoreInfo::new(),
             LSMTree::new(merge::merge),
             options,
+            true,
         )
     }
 
@@ -482,19 +489,73 @@
         self: &Arc<ObjectStore>,
         options: StoreOptions,
     ) -> Result<Arc<ObjectStore>, Error> {
+        self.ensure_open()?;
         // TODO: This should probably all be in a transaction. There should probably be a log
         // record to create a store.
         let mut transaction = Transaction::new();
         let handle = self.clone().create_object(&mut transaction, HandleOptions::default())?;
-        self.log.upgrade().unwrap().commit(transaction);
-        Ok(Self::new_empty(
+        let log = self.log.upgrade().unwrap();
+        log.commit(transaction);
+
+        // Write a default StoreInfo file.  TODO: this should be part of a bigger transaction i.e.
+        // this function should take transaction as an arg.
+        let mut writer = BufWriter::new(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0));
+        serialize_into(&mut writer, &StoreInfo::default())?;
+        writer.flush()?;
+
+        let new_store = Self::new_empty(
             Some(self.clone()),
             handle.object_id(),
             self.device.clone(),
             &self.allocator.upgrade().unwrap(),
             &self.log.upgrade().unwrap(),
             options,
-        ))
+        );
+        log.register_store(&new_store);
+        Ok(new_store)
+    }
+
+    pub fn lazy_open_store(
+        self: &Arc<ObjectStore>,
+        store_object_id: u64,
+        options: StoreOptions,
+    ) -> Arc<ObjectStore> {
+        Self::new(
+            Some(self.clone()),
+            store_object_id,
+            self.device.clone(),
+            &self.allocator.upgrade().unwrap(),
+            &self.log.upgrade().unwrap(),
+            StoreInfo::default(),
+            LSMTree::new(merge::merge),
+            options,
+            false,
+        )
+    }
+
+    // TODO: find a way to make sure this is always called at the right time. Any time we add
+    // mutation records to a transaction, this needs to be called prior to that.
+    fn ensure_open(&self) -> Result<(), Error> {
+        let mut opened = self.opened.lock().unwrap();
+        if *opened {
+            return Ok(());
+        }
+
+        let parent_store = self.parent_store.as_ref().unwrap();
+        let handle = parent_store.open_object(self.store_object_id, HandleOptions::default())?;
+        let store_info: StoreInfo =
+            deserialize_from(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0))?;
+        let mut handles = Vec::new();
+        for object_id in &store_info.layers {
+            handles.push(parent_store.open_object(*object_id, HandleOptions::default())?);
+        }
+        self.tree.set_layers(handles.into());
+        let mut current_store_info = self.store_info_for_last_object_id().lock().unwrap();
+        if store_info.last_object_id > current_store_info.last_object_id {
+            current_store_info.last_object_id = store_info.last_object_id
+        }
+        *opened = true;
+        Ok(())
     }
 
     pub fn open_store(
@@ -502,51 +563,21 @@
         store_object_id: u64,
         options: StoreOptions,
     ) -> Result<Arc<ObjectStore>, Error> {
-        println!("opening handle");
-        let handle = self.clone().open_object(store_object_id, HandleOptions::default())?;
-        println!("deserializing");
-        let store_info: StoreInfo =
-            deserialize_from(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0))?;
-        println!("opening handles");
-        let mut handles = Vec::new();
-        for object_id in &store_info.layers {
-            handles.push(self.clone().open_object(*object_id, HandleOptions::default())?);
-        }
-        if options.use_parent_to_allocate_object_ids {
-            if store_info.last_object_id > self.store_info.lock().unwrap().last_object_id {
-                self.store_info.lock().unwrap().last_object_id = store_info.last_object_id;
-            }
-        }
-        Ok(Self::new(
-            Some(self.clone()),
-            store_object_id,
-            self.device.clone(),
-            &self.allocator.upgrade().unwrap(),
-            &self.log.upgrade().unwrap(),
-            store_info,
-            LSMTree::open(merge::merge, handles.into_boxed_slice()),
-            StoreOptions::default(),
-        ))
+        let store = self.lazy_open_store(store_object_id, options);
+        store.ensure_open()?;
+        Ok(store)
     }
 
     pub fn store_object_id(&self) -> u64 {
         self.store_object_id
     }
 
-    pub fn root_object_id(&self) -> u64 {
-        self.store_info.lock().unwrap().root_object_id
-    }
-
-    pub fn set_root_object_id(&self, root_object_id: u64) {
-        let mut store_info = self.store_info.lock().unwrap();
-        store_info.root_object_id = root_object_id;
-    }
-
     pub fn open_object(
         self: &Arc<Self>,
         object_id: u64,
         options: HandleOptions,
     ) -> std::io::Result<StoreObjectHandle> {
+        self.ensure_open().map_err(map_to_io_error)?;
         let item = self
             .tree
             .find(&ObjectKey::attribute(object_id, 0))
@@ -572,6 +603,7 @@
         object_id: u64,
         options: HandleOptions,
     ) -> std::io::Result<StoreObjectHandle> {
+        self.ensure_open().map_err(map_to_io_error)?;
         transaction.add(
             self.store_object_id,
             Mutation::Insert {
@@ -601,6 +633,7 @@
     }
 
     pub fn create_directory(self: &Arc<Self>) -> std::io::Result<directory::Directory> {
+        self.ensure_open().map_err(map_to_io_error)?;
         let object_id = self.get_next_object_id();
         let mut transaction = Transaction::new();
         transaction.add(
@@ -662,6 +695,7 @@
     // Push all in-memory structures to the device. This is not necessary for sync since the log
     // will take care of it. This will panic if called on the root parent store.
     pub fn flush(&self, force: bool) -> Result<(), Error> {
+        self.ensure_open()?;
         let log = self.log();
         let mut object_sync = log.begin_object_sync(self.store_object_id);
         if !force && !object_sync.needs_sync() {
@@ -694,7 +728,9 @@
     // -- Methods only to be called by Log --
     pub fn insert(&self, item: ObjectItem) {
         let store_info = self.store_info_for_last_object_id();
-        if item.key.object_id > store_info.lock().unwrap().last_object_id {
+        if item.key.object_id < LOWEST_SPECIAL_OBJECT_ID
+            && item.key.object_id > store_info.lock().unwrap().last_object_id
+        {
             store_info.lock().unwrap().last_object_id = item.key.object_id;
         }
         self.tree.insert(item);
diff --git a/src/storage/fxfs/src/object_store/constants.rs b/src/storage/fxfs/src/object_store/constants.rs
index 4e855c8..7bb96bd 100644
--- a/src/storage/fxfs/src/object_store/constants.rs
+++ b/src/storage/fxfs/src/object_store/constants.rs
@@ -1,6 +1,11 @@
 pub const INVALID_OBJECT_ID: u64 = 0xffffffffffffffff;
 pub const ROOT_PARENT_STORE_OBJECT_ID: u64 = 0xfffffffffffffffe;
-pub const SUPER_BLOCK_OBJECT_ID: u64 = 0;
+// This only exists in the root parent store.
+pub const SUPER_BLOCK_OBJECT_ID: u64 = 0xfffffffffffffffd;
+
+pub const LOWEST_SPECIAL_OBJECT_ID: u64 = SUPER_BLOCK_OBJECT_ID;
+
+pub const RESERVED_OBJECT_ID: u64 = 0;
 
 // The first two blocks on the disk are reserved for the super block.
 pub const MIN_SUPER_BLOCK_SIZE: u64 = 8192;
diff --git a/src/storage/fxfs/src/object_store/filesystem.rs b/src/storage/fxfs/src/object_store/filesystem.rs
index 675d677..5ff567a 100644
--- a/src/storage/fxfs/src/object_store/filesystem.rs
+++ b/src/storage/fxfs/src/object_store/filesystem.rs
@@ -1,25 +1,26 @@
 use {
     crate::{
+        object_handle::{ObjectHandle, ObjectHandleCursor},
         object_store::{
             allocator::{Allocator, SimpleAllocator},
-            constants::{INVALID_OBJECT_ID, ROOT_PARENT_STORE_OBJECT_ID},
+            constants::{INVALID_OBJECT_ID, RESERVED_OBJECT_ID, ROOT_PARENT_STORE_OBJECT_ID},
             log::Log,
             record::ObjectType,
-            Device, Directory, ObjectStore, StoreOptions,
+            Device, Directory, HandleOptions, ObjectStore, StoreOptions, Transaction,
         },
     },
     anyhow::Error,
+    bincode::{deserialize_from, serialize_into},
+    serde::{Deserialize, Serialize},
     std::{
         collections::HashMap,
+        io::{BufWriter, Write},
         sync::{Arc, RwLock},
     },
 };
 
 #[cfg(test)]
-use {
-    anyhow::anyhow,
-    crate::testing::fake_device::FakeDevice,
-};
+use {crate::testing::fake_device::FakeDevice, anyhow::anyhow};
 
 #[derive(Default)]
 pub struct SyncOptions {
@@ -49,8 +50,8 @@
         self.store(ROOT_PARENT_STORE_OBJECT_ID).unwrap()
     }
 
-    pub fn new_store(&self, store: Arc<ObjectStore>) {
-        self.stores.write().unwrap().stores.insert(store.store_object_id(), store);
+    pub fn new_store(&self, store: &Arc<ObjectStore>) {
+        self.stores.write().unwrap().stores.insert(store.store_object_id(), store.clone());
     }
 
     pub fn store(&self, store_object_id: u64) -> Option<Arc<ObjectStore>> {
@@ -69,51 +70,16 @@
     }
 }
 
-pub struct VolumeManager {
-    // We store volumes as entries in a directory.
-    storage: Arc<RwLock<Directory>>,
-}
-
-impl VolumeManager {
-    pub fn new(storage: Directory) -> Self {
-        Self { storage: Arc::new(RwLock::new(storage)) }
-    }
-
-    pub fn new_volume(&self, volume_name: &str) -> Result<Directory, Error> {
-        // TODO this should be transactional.
-
-        let volume_store = self.storage.write().unwrap().create_volume_store(volume_name)?;
-        println!("added volume ({:?}) object id {:?}", volume_name, volume_store.store_object_id());
-
-        // Add the root directory.
-        let initial_dir_handle = volume_store.create_directory()?;
-        volume_store.set_root_object_id(initial_dir_handle.object_id());
-        // TODO do we need to force?
-        volume_store.flush(true)?;
-        Ok(initial_dir_handle)
-    }
-
-    pub fn volume(&self, volume_name: &str) -> Option<Directory> {
-        let storage = self.storage.read().unwrap();
-        let (object_id, object_type) = storage.lookup(volume_name).ok()?;
-        // TODO better error handling; this panics if the child's type is unexpected, or the
-        // load fails
-        if let ObjectType::Volume = object_type {
-            // nop
-        } else {
-            panic!("Unexpceted non-volume child")
-        }
-        let root_store = storage.object_store();
-        let volume_store = root_store.open_store(object_id, StoreOptions::default()).ok()?;
-        volume_store.open_directory(volume_store.root_object_id()).ok()
-    }
+#[derive(Clone, Default, Serialize, Deserialize)]
+struct VolumeInfo {
+    root_directory_object_id: u64,
 }
 
 pub struct Filesystem {
     device: Arc<dyn Device>,
     log: Arc<Log>,
     stores: Arc<StoreManager>,
-    volumes: Arc<VolumeManager>,
+    volume_directory: Arc<RwLock<Directory>>,
 }
 
 impl Filesystem {
@@ -121,7 +87,7 @@
         let log = Arc::new(Log::new());
         let allocator = Arc::new(SimpleAllocator::new(&log));
         let stores = Arc::new(StoreManager::new());
-        stores.new_store(ObjectStore::new_empty(
+        stores.new_store(&ObjectStore::new_empty(
             None,
             ROOT_PARENT_STORE_OBJECT_ID,
             device.clone(),
@@ -133,12 +99,25 @@
 
         // Add a directory as the root of all volumes. The root store's root object ID will be
         // this directory.
+        let mut transaction = Transaction::new();
+        let handle = stores.root_store().create_object_with_id(
+            &mut transaction,
+            RESERVED_OBJECT_ID,
+            HandleOptions::default(),
+        )?;
+        log.commit(transaction);
+        let mut writer = BufWriter::new(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0));
         let volume_manager_handle = stores.root_store().create_directory()?;
-        stores.root_store().set_root_object_id(volume_manager_handle.object_id());
-        println!("volume manager object id {:?}", volume_manager_handle.object_id());
-        let volumes = Arc::new(VolumeManager::new(volume_manager_handle));
+        let info = VolumeInfo { root_directory_object_id: volume_manager_handle.object_id() };
+        serialize_into(&mut writer, &info)?;
+        writer.flush()?;
 
-        Ok(Filesystem { device, log, stores, volumes })
+        Ok(Filesystem {
+            device,
+            log,
+            stores,
+            volume_directory: Arc::new(RwLock::new(volume_manager_handle)),
+        })
     }
 
     pub fn open(device: Arc<dyn Device>) -> Result<Filesystem, Error> {
@@ -147,20 +126,19 @@
         let stores = Arc::new(StoreManager::new());
         log.replay(device.clone(), stores.clone(), allocator.clone())?;
 
+        let handle =
+            stores.root_store().open_object(RESERVED_OBJECT_ID, HandleOptions::default())?;
+        let info: VolumeInfo =
+            deserialize_from(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0))?;
         let volume_manager_handle =
-            stores.root_store().open_directory(stores.root_store().root_object_id())?;
-        let volumes = Arc::new(VolumeManager::new(volume_manager_handle));
+            stores.root_store().open_directory(info.root_directory_object_id)?;
 
-        Ok(Filesystem { device, log, stores, volumes })
-    }
-
-    pub fn new_volume(&mut self, name: &str) -> Result<Directory, Error> {
-        self.volumes.new_volume(name)
-    }
-
-    // TODO is Directory the best type to return? Wrapper type, maybe.
-    pub fn volume(&self, name: &str) -> Option<Directory> {
-        self.volumes.volume(name)
+        Ok(Filesystem {
+            device,
+            log,
+            stores,
+            volume_directory: Arc::new(RwLock::new(volume_manager_handle)),
+        })
     }
 
     pub fn root_parent_store(&self) -> Arc<ObjectStore> {
@@ -178,6 +156,49 @@
     pub fn device(&self) -> &Arc<dyn Device> {
         return &self.device;
     }
+
+    pub fn new_volume(&self, volume_name: &str) -> Result<Directory, Error> {
+        // TODO this should be transactional.
+
+        let volume_store =
+            self.volume_directory.write().unwrap().create_volume_store(volume_name)?;
+
+        // Add the root directory.
+        let mut transaction = Transaction::new();
+        let handle = volume_store.create_object_with_id(
+            &mut transaction,
+            RESERVED_OBJECT_ID,
+            HandleOptions::default(),
+        )?;
+        self.log.commit(transaction);
+        let mut writer = BufWriter::new(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0));
+        let initial_dir_handle = volume_store.create_directory()?;
+        let info = VolumeInfo { root_directory_object_id: initial_dir_handle.object_id() };
+        serialize_into(&mut writer, &info)?;
+        writer.flush()?;
+
+        Ok(initial_dir_handle)
+    }
+
+    // TODO is Directory the best type to return? Wrapper type, maybe.
+    pub fn volume(&self, volume_name: &str) -> Option<Directory> {
+        let volume_directory = self.volume_directory.read().unwrap();
+        let (object_id, object_type) = volume_directory.lookup(volume_name).ok()?;
+        // TODO better error handling; this panics if the child's type is unexpected, or the
+        // load fails
+        if let ObjectType::Volume = object_type {
+            // nop
+        } else {
+            panic!("Unexpected non-volume child")
+        }
+        let volume_store = self.stores.store(object_id).unwrap_or(
+            self.stores.root_store().open_store(object_id, StoreOptions::default()).ok()?,
+        );
+        let handle = volume_store.open_object(RESERVED_OBJECT_ID, HandleOptions::default()).ok()?;
+        let info: VolumeInfo =
+            deserialize_from(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0)).ok()?;
+        volume_store.open_directory(info.root_directory_object_id).ok()
+    }
 }
 
 // TODO: Consider sync on drop
@@ -193,8 +214,6 @@
     }
 }
 
-// TODO fix me
-/*
 #[test]
 fn test_add_volume() -> Result<(), Error> {
     let device = Arc::new(FakeDevice::new(512));
@@ -213,4 +232,3 @@
     };
     Ok(())
 }
-*/
\ No newline at end of file
diff --git a/src/storage/fxfs/src/object_store/log.rs b/src/storage/fxfs/src/object_store/log.rs
index 4ccb3b21..ad7fb84 100644
--- a/src/storage/fxfs/src/object_store/log.rs
+++ b/src/storage/fxfs/src/object_store/log.rs
@@ -314,7 +314,7 @@
         }));
         let super_block: SuperBlock = deserialize_from(&mut reader)?;
         println!("super-block: {:?}", super_block);
-        stores.new_store(ObjectStore::new_empty(
+        stores.new_store(&ObjectStore::new_empty(
             None,
             ROOT_PARENT_STORE_OBJECT_ID,
             device,
@@ -386,7 +386,6 @@
                             }
                             if !found_end_of_super_block {
                                 let root_parent = stores.root_parent_store();
-                                println!("opening object store");
                                 stores.set_root_store(root_parent.open_store(
                                     super_block.root_store_object_id,
                                     StoreOptions {
@@ -398,7 +397,6 @@
                                 reader.read_offset = super_block.log_checkpoint.file_offset;
                                 reader.last_check_sum = super_block.log_checkpoint.check_sum;
                                 reader.last_read_check_sum = super_block.log_checkpoint.check_sum;
-                                println!("opening log file");
                                 reader.handle = Box::new(stores.root_store().open_object(
                                     super_block.log_object_id,
                                     log_handle_options(),
@@ -543,6 +541,10 @@
     pub fn commit(&self, transaction: Transaction) {
         self.log().as_mut().unwrap().commit(transaction, None);
     }
+
+    pub fn register_store(&self, store: &Arc<ObjectStore>) {
+        self.log().as_ref().unwrap().stores.new_store(store);
+    }
 }
 
 pub struct InitializedLog {
@@ -635,6 +637,16 @@
         return true;
     }
 
+    fn get_store(&self, store_object_id: u64) -> Arc<ObjectStore> {
+        if let Some(store) = self.stores.store(store_object_id) {
+            return store;
+        }
+        let store =
+            self.stores.root_store().lazy_open_store(store_object_id, StoreOptions::default());
+        self.stores.new_store(&store);
+        store
+    }
+
     fn apply_mutation(
         &mut self,
         object_id: u64,
@@ -642,12 +654,10 @@
         mutation: Mutation,
         log_file_offsets: Option<&HashMap<u64, u64>>,
     ) {
-        // If a store isn't found below, that's OK --- it means it must have been deleted in a
-        // later log message. TODO: is there a nicer way?
         match mutation {
             Mutation::Insert { item } => {
                 if self.should_apply(object_id, log_file_checkpoint, log_file_offsets) {
-                    self.stores.store(object_id).map(|s| s.insert(item));
+                    self.get_store(object_id).insert(item);
                 }
             }
             Mutation::ReplaceExtent { item } => {
@@ -668,12 +678,12 @@
                     );
                 }
                 if self.should_apply(object_id, log_file_checkpoint, log_file_offsets) {
-                    self.stores.store(object_id).map(|s| s.replace_extent(item));
+                    self.get_store(object_id).replace_extent(item);
                 }
             }
             Mutation::ReplaceOrInsert { item } => {
                 if self.should_apply(object_id, log_file_checkpoint, log_file_offsets) {
-                    self.stores.store(object_id).map(|s| s.replace_or_insert(item));
+                    self.get_store(object_id).replace_or_insert(item);
                 }
             }
             Mutation::Deallocate(item) => {