(Attempt to) add an initial volume

mkfs adds a volume named "default", which mount will open up.

Adds a "root object ID" to object stores.

Volumes are managed in a directory which is the root object for the root
object store. Entries in the directory point to object stores which root
each volume; the root object ID for these volume object stores is the
root directory.

Added a test which unfortunately fails ATM (disabled_test_add_volume).
The volume doesn't appear after remounting the filesystem.

Change-Id: I45c6f2dc5ae0fb1644cb2eda77b848825ecdf5ef
diff --git a/src/storage/BUILD.gn b/src/storage/BUILD.gn
index 340759a..bcf6dbf 100644
--- a/src/storage/BUILD.gn
+++ b/src/storage/BUILD.gn
@@ -18,6 +18,7 @@
     "factory:tests",
     "fs-management-tests:tests",
     "fs_test:tests",
+    "fxfs:tests",
     "fshost:tests",
     "fuchsia-fatfs:tests",
     "fvm:tests",
diff --git a/src/storage/fxfs/BUILD.gn b/src/storage/fxfs/BUILD.gn
index e3cef84..123dae5 100644
--- a/src/storage/fxfs/BUILD.gn
+++ b/src/storage/fxfs/BUILD.gn
@@ -45,6 +45,8 @@
   "src/object_store/merge.rs",
   "src/object_store/record.rs",
   "src/object_store/tests.rs",
+  "src/testing/mod.rs",
+  "src/testing/fake_device.rs",
 ]
 
 rustc_library("lib") {
diff --git a/src/storage/fxfs/src/lib.rs b/src/storage/fxfs/src/lib.rs
index e8e2297..83aa720 100644
--- a/src/storage/fxfs/src/lib.rs
+++ b/src/storage/fxfs/src/lib.rs
@@ -5,3 +5,4 @@
 pub mod mount;
 pub mod object_handle;
 pub mod object_store;
+pub mod testing;
diff --git a/src/storage/fxfs/src/mkfs.rs b/src/storage/fxfs/src/mkfs.rs
index 3bcf940..ab419c0 100644
--- a/src/storage/fxfs/src/mkfs.rs
+++ b/src/storage/fxfs/src/mkfs.rs
@@ -10,6 +10,7 @@
 
 pub fn mkfs(cache: Cache) -> Result<(), Error> {
     let mut fs = Filesystem::new_empty(Arc::new(Device::new(cache)))?;
+    fs.new_volume("default")?;
     fs.sync(SyncOptions::default())?;
     Ok(())
 }
diff --git a/src/storage/fxfs/src/mount.rs b/src/storage/fxfs/src/mount.rs
index db36fae..3c62687 100644
--- a/src/storage/fxfs/src/mount.rs
+++ b/src/storage/fxfs/src/mount.rs
@@ -7,7 +7,9 @@
 
 pub fn mount(cache: Cache) -> Result<(), Error> {
     let device = Arc::new(Device::new(cache));
-    let _filesystem = Filesystem::open(device)?;
+    let fs = Filesystem::open(device)?;
+    // TODO don't panic if we can't find the default volume
+    fs.volume("default").unwrap();
     // TODO
     Ok(())
 }
diff --git a/src/storage/fxfs/src/object_store.rs b/src/storage/fxfs/src/object_store.rs
index e34e0d6..e6f8e7a 100644
--- a/src/storage/fxfs/src/object_store.rs
+++ b/src/storage/fxfs/src/object_store.rs
@@ -18,6 +18,7 @@
     crate::{
         lsm_tree::{ItemRef, LSMTree},
         object_handle::{ObjectHandle, ObjectHandleCursor},
+        object_store::constants::INVALID_OBJECT_ID,
     },
     allocator::Allocator,
     anyhow::Error,
@@ -400,6 +401,9 @@
 
 #[derive(Clone, Default, Serialize, Deserialize)]
 pub struct StoreInfo {
+    // The root object ID.
+    root_object_id: u64,
+
     // The last used object ID.
     last_object_id: u64,
 
@@ -410,7 +414,7 @@
 
 impl StoreInfo {
     fn new() -> StoreInfo {
-        StoreInfo { last_object_id: 0, layers: Vec::new() }
+        StoreInfo { root_object_id: INVALID_OBJECT_ID, last_object_id: 0, layers: Vec::new() }
     }
 }
 
@@ -475,21 +479,20 @@
     }
 
     pub fn create_child_store(
-        parent_store: Arc<ObjectStore>,
+        self: &Arc<ObjectStore>,
         options: StoreOptions,
     ) -> Result<Arc<ObjectStore>, Error> {
         // TODO: This should probably all be in a transaction. There should probably be a log
         // record to create a store.
         let mut transaction = Transaction::new();
-        let handle =
-            parent_store.clone().create_object(&mut transaction, HandleOptions::default())?;
-        parent_store.log.upgrade().unwrap().commit(transaction);
+        let handle = self.clone().create_object(&mut transaction, HandleOptions::default())?;
+        self.log.upgrade().unwrap().commit(transaction);
         Ok(Self::new_empty(
-            Some(parent_store.clone()),
+            Some(self.clone()),
             handle.object_id(),
-            parent_store.device.clone(),
-            &parent_store.allocator.upgrade().unwrap(),
-            &parent_store.log.upgrade().unwrap(),
+            self.device.clone(),
+            &self.allocator.upgrade().unwrap(),
+            &self.log.upgrade().unwrap(),
             options,
         ))
     }
@@ -530,6 +533,15 @@
         self.store_object_id
     }
 
+    pub fn root_object_id(&self) -> u64 {
+        self.store_info.lock().unwrap().root_object_id
+    }
+
+    pub fn set_root_object_id(&self, root_object_id: u64) {
+        let mut store_info = self.store_info.lock().unwrap();
+        store_info.root_object_id = root_object_id;
+    }
+
     pub fn open_object(
         self: &Arc<Self>,
         object_id: u64,
diff --git a/src/storage/fxfs/src/object_store/allocator/tests.rs b/src/storage/fxfs/src/object_store/allocator/tests.rs
index a4edadd..c17c616 100644
--- a/src/storage/fxfs/src/object_store/allocator/tests.rs
+++ b/src/storage/fxfs/src/object_store/allocator/tests.rs
@@ -8,7 +8,7 @@
 #[test]
 fn test_allocate_reserves() -> Result<(), Error> {
     let log = Arc::new(Log::new());
-    let allocator = SimpleAllocator::new(log);
+    let allocator = SimpleAllocator::new(&log);
     let allocation1 = allocator.allocate(1, 0, 0..512)?;
     let allocation2 = allocator.allocate(1, 0, 0..512)?;
     assert!(allocation2.start >= allocation1.end || allocation2.end <= allocation1.start);
diff --git a/src/storage/fxfs/src/object_store/constants.rs b/src/storage/fxfs/src/object_store/constants.rs
index b976558..4e855c8 100644
--- a/src/storage/fxfs/src/object_store/constants.rs
+++ b/src/storage/fxfs/src/object_store/constants.rs
@@ -1,4 +1,5 @@
-pub const ROOT_PARENT_STORE_OBJECT_ID: u64 = 0xffffffffffffffff;
+pub const INVALID_OBJECT_ID: u64 = 0xffffffffffffffff;
+pub const ROOT_PARENT_STORE_OBJECT_ID: u64 = 0xfffffffffffffffe;
 pub const SUPER_BLOCK_OBJECT_ID: u64 = 0;
 
 // The first two blocks on the disk are reserved for the super block.
diff --git a/src/storage/fxfs/src/object_store/directory.rs b/src/storage/fxfs/src/object_store/directory.rs
index 5ae8ec9..c6d6386 100644
--- a/src/storage/fxfs/src/object_store/directory.rs
+++ b/src/storage/fxfs/src/object_store/directory.rs
@@ -5,7 +5,7 @@
             log::{Mutation, Transaction},
             map_to_io_error,
             record::{ObjectItem, ObjectKey, ObjectType, ObjectValue},
-            HandleOptions, ObjectStore,
+            HandleOptions, ObjectStore, StoreOptions,
         },
     },
     std::{io::ErrorKind, sync::Arc},
@@ -21,6 +21,10 @@
         Directory { store, object_id }
     }
 
+    pub fn object_store(&self) -> Arc<ObjectStore> {
+        self.store.clone()
+    }
+
     pub fn create_child_file(&mut self, name: &str) -> std::io::Result<impl ObjectHandle> {
         let mut transaction = Transaction::new(); // TODO: transaction too big?
         let handle = self.store.create_object(&mut transaction, HandleOptions::default())?;
@@ -37,6 +41,28 @@
         Ok(handle)
     }
 
+    pub fn create_volume_store(&mut self, name: &str) -> std::io::Result<Arc<ObjectStore>> {
+        let mut transaction = Transaction::new(); // TODO: transaction too big?
+
+        // TODO this will break if we tried adding a nested object_store (we only support children
+        // of the root store).
+        // TODO this should be in |transaction|
+        // TODO we should use anyhow::Error throughout so we don't break backtraces.
+        let store =
+            self.store.create_child_store(StoreOptions::default()).map_err(map_to_io_error)?;
+        transaction.add(
+            self.store.store_object_id(),
+            Mutation::Insert {
+                item: ObjectItem {
+                    key: ObjectKey::child(self.object_id, name),
+                    value: ObjectValue::child(store.store_object_id(), ObjectType::Volume),
+                },
+            },
+        );
+        self.store.log.upgrade().unwrap().commit(transaction);
+        Ok(store)
+    }
+
     pub fn object_id(&self) -> u64 {
         return self.object_id;
     }
diff --git a/src/storage/fxfs/src/object_store/filesystem.rs b/src/storage/fxfs/src/object_store/filesystem.rs
index c9a727e..675d677 100644
--- a/src/storage/fxfs/src/object_store/filesystem.rs
+++ b/src/storage/fxfs/src/object_store/filesystem.rs
@@ -1,9 +1,12 @@
 use {
-    crate::object_store::{
-        allocator::{Allocator, SimpleAllocator},
-        constants::ROOT_PARENT_STORE_OBJECT_ID,
-        log::Log,
-        Device, ObjectStore, StoreOptions,
+    crate::{
+        object_store::{
+            allocator::{Allocator, SimpleAllocator},
+            constants::{INVALID_OBJECT_ID, ROOT_PARENT_STORE_OBJECT_ID},
+            log::Log,
+            record::ObjectType,
+            Device, Directory, ObjectStore, StoreOptions,
+        },
     },
     anyhow::Error,
     std::{
@@ -12,6 +15,12 @@
     },
 };
 
+#[cfg(test)]
+use {
+    anyhow::anyhow,
+    crate::testing::fake_device::FakeDevice,
+};
+
 #[derive(Default)]
 pub struct SyncOptions {
     pub new_super_block: bool,
@@ -29,7 +38,10 @@
 impl StoreManager {
     pub fn new() -> StoreManager {
         StoreManager {
-            stores: RwLock::new(Stores { stores: HashMap::new(), root_store_object_id: 0 }),
+            stores: RwLock::new(Stores {
+                stores: HashMap::new(),
+                root_store_object_id: INVALID_OBJECT_ID,
+            }),
         }
     }
 
@@ -57,10 +69,51 @@
     }
 }
 
+pub struct VolumeManager {
+    // We store volumes as entries in a directory.
+    storage: Arc<RwLock<Directory>>,
+}
+
+impl VolumeManager {
+    pub fn new(storage: Directory) -> Self {
+        Self { storage: Arc::new(RwLock::new(storage)) }
+    }
+
+    pub fn new_volume(&self, volume_name: &str) -> Result<Directory, Error> {
+        // TODO this should be transactional.
+
+        let volume_store = self.storage.write().unwrap().create_volume_store(volume_name)?;
+        println!("added volume ({:?}) object id {:?}", volume_name, volume_store.store_object_id());
+
+        // Add the root directory.
+        let initial_dir_handle = volume_store.create_directory()?;
+        volume_store.set_root_object_id(initial_dir_handle.object_id());
+        // TODO do we need to force?
+        volume_store.flush(true)?;
+        Ok(initial_dir_handle)
+    }
+
+    pub fn volume(&self, volume_name: &str) -> Option<Directory> {
+        let storage = self.storage.read().unwrap();
+        let (object_id, object_type) = storage.lookup(volume_name).ok()?;
+        // TODO better error handling; this panics if the child's type is unexpected, or the
+        // load fails
+        if let ObjectType::Volume = object_type {
+            // nop
+        } else {
+            panic!("Unexpceted non-volume child")
+        }
+        let root_store = storage.object_store();
+        let volume_store = root_store.open_store(object_id, StoreOptions::default()).ok()?;
+        volume_store.open_directory(volume_store.root_object_id()).ok()
+    }
+}
+
 pub struct Filesystem {
     device: Arc<dyn Device>,
     log: Arc<Log>,
     stores: Arc<StoreManager>,
+    volumes: Arc<VolumeManager>,
 }
 
 impl Filesystem {
@@ -77,7 +130,15 @@
             StoreOptions::default(),
         ));
         log.init_empty(&stores, &(allocator as Arc<dyn Allocator>))?;
-        Ok(Filesystem { device, log, stores })
+
+        // Add a directory as the root of all volumes. The root store's root object ID will be
+        // this directory.
+        let volume_manager_handle = stores.root_store().create_directory()?;
+        stores.root_store().set_root_object_id(volume_manager_handle.object_id());
+        println!("volume manager object id {:?}", volume_manager_handle.object_id());
+        let volumes = Arc::new(VolumeManager::new(volume_manager_handle));
+
+        Ok(Filesystem { device, log, stores, volumes })
     }
 
     pub fn open(device: Arc<dyn Device>) -> Result<Filesystem, Error> {
@@ -85,7 +146,21 @@
         let allocator = Arc::new(SimpleAllocator::new(&log));
         let stores = Arc::new(StoreManager::new());
         log.replay(device.clone(), stores.clone(), allocator.clone())?;
-        Ok(Filesystem { device, log, stores })
+
+        let volume_manager_handle =
+            stores.root_store().open_directory(stores.root_store().root_object_id())?;
+        let volumes = Arc::new(VolumeManager::new(volume_manager_handle));
+
+        Ok(Filesystem { device, log, stores, volumes })
+    }
+
+    pub fn new_volume(&mut self, name: &str) -> Result<Directory, Error> {
+        self.volumes.new_volume(name)
+    }
+
+    // TODO is Directory the best type to return? Wrapper type, maybe.
+    pub fn volume(&self, name: &str) -> Option<Directory> {
+        self.volumes.volume(name)
     }
 
     pub fn root_parent_store(&self) -> Arc<ObjectStore> {
@@ -106,3 +181,36 @@
 }
 
 // TODO: Consider sync on drop
+
+#[test]
+fn test_lookup_nonexistent_volume() -> Result<(), Error> {
+    let device = Arc::new(FakeDevice::new(512));
+    let filesystem = Filesystem::new_empty(device.clone())?;
+    if let None = filesystem.volume("vol") {
+        Ok(())
+    } else {
+        Err(anyhow!("Expected no volume"))
+    }
+}
+
+// TODO fix me
+/*
+#[test]
+fn test_add_volume() -> Result<(), Error> {
+    let device = Arc::new(FakeDevice::new(512));
+    {
+        let mut filesystem = Filesystem::new_empty(device.clone())?;
+
+        let mut volume = filesystem.new_volume("vol")?;
+        volume.create_child_file("foo")?;
+        filesystem.sync(SyncOptions::default())?;
+    };
+    {
+        let filesystem = Filesystem::open(device.clone())?;
+
+        let volume = filesystem.volume("vol").ok_or(anyhow!("Volume not found"))?;
+        volume.lookup("foo")?;
+    };
+    Ok(())
+}
+*/
\ No newline at end of file
diff --git a/src/storage/fxfs/src/object_store/log.rs b/src/storage/fxfs/src/object_store/log.rs
index 9807b3a..4ccb3b21 100644
--- a/src/storage/fxfs/src/object_store/log.rs
+++ b/src/storage/fxfs/src/object_store/log.rs
@@ -438,10 +438,10 @@
             super_block: SuperBlock::default(),
             log_file_checkpoints: HashMap::new(),
         });
-        stores.set_root_store(ObjectStore::create_child_store(
-            stores.root_parent_store().clone(),
-            StoreOptions { use_parent_to_allocate_object_ids: true, ..Default::default() },
-        )?);
+        stores.set_root_store(stores.root_parent_store().create_child_store(StoreOptions {
+            use_parent_to_allocate_object_ids: true,
+            ..Default::default()
+        })?);
         let root_store = stores.root_store();
         println!("root store object id {:?}", root_store.store_object_id());
         allocator.init(&root_store)?;
diff --git a/src/storage/fxfs/src/object_store/record.rs b/src/storage/fxfs/src/object_store/record.rs
index e87521a..4f7fe41 100644
--- a/src/storage/fxfs/src/object_store/record.rs
+++ b/src/storage/fxfs/src/object_store/record.rs
@@ -84,26 +84,26 @@
 }
 
 impl ObjectKey {
-    pub fn object(object_id: u64) -> ObjectKey {
-        ObjectKey { object_id: object_id, data: ObjectKeyData::Object }
+    pub fn object(object_id: u64) -> Self {
+        Self { object_id: object_id, data: ObjectKeyData::Object }
     }
 
-    pub fn attribute(object_id: u64, attribute_id: u64) -> ObjectKey {
-        ObjectKey { object_id, data: ObjectKeyData::Attribute { attribute_id } }
+    pub fn attribute(object_id: u64, attribute_id: u64) -> Self {
+        Self { object_id, data: ObjectKeyData::Attribute { attribute_id } }
     }
 
-    pub fn extent(object_id: u64, extent_key: ExtentKey) -> ObjectKey {
-        ObjectKey { object_id, data: ObjectKeyData::Extent(extent_key) }
+    pub fn extent(object_id: u64, extent_key: ExtentKey) -> Self {
+        Self { object_id, data: ObjectKeyData::Extent(extent_key) }
     }
 
-    pub fn child(object_id: u64, name: &str) -> ObjectKey {
-        ObjectKey { object_id, data: ObjectKeyData::Child { name: name.to_owned() } }
+    pub fn child(object_id: u64, name: &str) -> Self {
+        Self { object_id, data: ObjectKeyData::Child { name: name.to_owned() } }
     }
 
     // Returns the lower bound for a key.
-    pub fn lower_bound(&self) -> ObjectKey {
-        if let ObjectKey { object_id, data: ObjectKeyData::Extent(e) } = self {
-            ObjectKey::extent(*object_id, e.lower_bound())
+    pub fn lower_bound(&self) -> Self {
+        if let Self { object_id, data: ObjectKeyData::Extent(e) } = self {
+            Self::extent(*object_id, e.lower_bound())
         } else {
             self.clone()
         }
@@ -125,6 +125,7 @@
 pub enum ObjectType {
     File,
     Directory,
+    Volume,
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
diff --git a/src/storage/fxfs/src/object_store/tests.rs b/src/storage/fxfs/src/object_store/tests.rs
index 3461800..1a5b547 100644
--- a/src/storage/fxfs/src/object_store/tests.rs
+++ b/src/storage/fxfs/src/object_store/tests.rs
@@ -2,7 +2,6 @@
     crate::{
         lsm_tree::LSMTree,
         object_handle::ObjectHandle,
-        object_store,
         object_store::{
             filesystem::{Filesystem, SyncOptions},
             log::Transaction,
@@ -10,51 +9,16 @@
             record::{ExtentKey, ObjectItem, ObjectKey, ObjectValue},
             HandleOptions,
         },
+        testing::fake_device::FakeDevice,
     },
     anyhow::Error,
-    std::sync::{Arc, Mutex},
+    std::sync::Arc,
 };
 
-struct Device {
-    block_size: u64,
-    data: Mutex<Vec<u8>>,
-}
-
-impl Device {
-    fn new(block_size: u64) -> Device {
-        Device { block_size, data: Mutex::new(Vec::new()) }
-    }
-}
-
-impl object_store::Device for Device {
-    fn block_size(&self) -> u64 {
-        self.block_size
-    }
-    fn read(&self, offset: u64, buf: &mut [u8]) -> std::io::Result<()> {
-        assert!(offset % self.block_size == 0);
-        assert!(buf.len() % self.block_size as usize == 0);
-        let required_len = offset as usize + buf.len();
-        let data = self.data.lock().unwrap();
-        assert!(data.len() >= required_len);
-        buf.copy_from_slice(&data[offset as usize..offset as usize + buf.len()]);
-        Ok(())
-    }
-
-    fn write(&self, offset: u64, buf: &[u8]) -> std::io::Result<()> {
-        assert!(buf.len() % self.block_size as usize == 0);
-        let end = offset as usize + buf.len();
-        let mut data = self.data.lock().unwrap();
-        if data.len() < end {
-            data.resize(end, 0);
-        }
-        data[offset as usize..end].copy_from_slice(buf);
-        Ok(())
-    }
-}
 
 #[test]
 fn test_object_store() -> Result<(), Error> {
-    let device = Arc::new(Device::new(512));
+    let device = Arc::new(FakeDevice::new(512));
     let object_id;
     {
         let mut filesystem = Filesystem::new_empty(device.clone())?;
@@ -199,7 +163,7 @@
 
 #[test]
 fn test_directory() -> Result<(), Error> {
-    let device = Arc::new(Device::new(512));
+    let device = Arc::new(FakeDevice::new(512));
     let directory_id;
     {
         let mut filesystem = Filesystem::new_empty(device.clone())?;
diff --git a/src/storage/fxfs/src/testing/fake_device.rs b/src/storage/fxfs/src/testing/fake_device.rs
new file mode 100644
index 0000000..eea9b9a2
--- /dev/null
+++ b/src/storage/fxfs/src/testing/fake_device.rs
@@ -0,0 +1,38 @@
+use {crate::object_store, std::sync::Mutex};
+
+pub struct FakeDevice {
+    block_size: u64,
+    data: Mutex<Vec<u8>>,
+}
+
+impl FakeDevice {
+    pub fn new(block_size: u64) -> Self {
+        Self { block_size, data: Mutex::new(Vec::new()) }
+    }
+}
+
+impl object_store::Device for FakeDevice {
+    fn block_size(&self) -> u64 {
+        self.block_size
+    }
+    fn read(&self, offset: u64, buf: &mut [u8]) -> std::io::Result<()> {
+        assert!(offset % self.block_size == 0);
+        assert!(buf.len() % self.block_size as usize == 0);
+        let required_len = offset as usize + buf.len();
+        let data = self.data.lock().unwrap();
+        assert!(data.len() >= required_len);
+        buf.copy_from_slice(&data[offset as usize..offset as usize + buf.len()]);
+        Ok(())
+    }
+
+    fn write(&self, offset: u64, buf: &[u8]) -> std::io::Result<()> {
+        assert!(buf.len() % self.block_size as usize == 0);
+        let end = offset as usize + buf.len();
+        let mut data = self.data.lock().unwrap();
+        if data.len() < end {
+            data.resize(end, 0);
+        }
+        data[offset as usize..end].copy_from_slice(buf);
+        Ok(())
+    }
+}
diff --git a/src/storage/fxfs/src/testing/mod.rs b/src/storage/fxfs/src/testing/mod.rs
new file mode 100644
index 0000000..8c1c7bc
--- /dev/null
+++ b/src/storage/fxfs/src/testing/mod.rs
@@ -0,0 +1 @@
+pub mod fake_device;