[fxfs] Fix volume creation.
Object stores need to be registered with the store manager so that we
can find them when replaying the log. While the log is being replayed,
we also need a way to lazily open stores.
Volume information is stored in a new reserved object (object ID 0)
that exists in every object store. This volume information points at
the root directory for the volume.
Change-Id: I13b055f9f0df8d95da977a09661ba9d2e0162986
diff --git a/src/storage/fxfs/src/lsm_tree.rs b/src/storage/fxfs/src/lsm_tree.rs
index 56da729..94eb950 100644
--- a/src/storage/fxfs/src/lsm_tree.rs
+++ b/src/storage/fxfs/src/lsm_tree.rs
@@ -32,7 +32,13 @@
{
}
pub trait Value:
- std::fmt::Debug + Send + Sync + serde::de::DeserializeOwned + serde::Serialize + std::marker::Unpin + 'static
+ std::fmt::Debug
+ + Send
+ + Sync
+ + serde::de::DeserializeOwned
+ + serde::Serialize
+ + std::marker::Unpin
+ + 'static
{
}
@@ -49,7 +55,13 @@
{
}
impl<V> Value for V where
- V: std::fmt::Debug + Send + Sync + std::marker::Unpin + serde::de::DeserializeOwned + serde::Serialize + 'static
+ V: std::fmt::Debug
+ + Send
+ + Sync
+ + std::marker::Unpin
+ + serde::de::DeserializeOwned
+ + serde::Serialize
+ + 'static
{
}
@@ -253,6 +265,7 @@
let mut merger = merge::Merger::new(iterators, self.merge_fn);
merger.advance()?;
while let Some(item_ref) = merger.get() {
+ eprintln!("{:?}", item_ref);
writer.write(item_ref)?;
merger.advance()?;
}
diff --git a/src/storage/fxfs/src/object_store.rs b/src/storage/fxfs/src/object_store.rs
index e6f8e7a..6e37c02 100644
--- a/src/storage/fxfs/src/object_store.rs
+++ b/src/storage/fxfs/src/object_store.rs
@@ -18,11 +18,11 @@
crate::{
lsm_tree::{ItemRef, LSMTree},
object_handle::{ObjectHandle, ObjectHandleCursor},
- object_store::constants::INVALID_OBJECT_ID,
},
allocator::Allocator,
anyhow::Error,
bincode::{deserialize_from, serialize_into},
+ constants::{LOWEST_SPECIAL_OBJECT_ID, RESERVED_OBJECT_ID},
log::{Log, Mutation},
record::{decode_extent, ExtentKey, ObjectItem, ObjectKey, ObjectKeyData, ObjectValue},
serde::{Deserialize, Serialize},
@@ -42,6 +42,7 @@
#[derive(Default)]
pub struct StoreOptions {
+ // TODO: This is horrible, and we should revisit this to see if it's necessary.
use_parent_to_allocate_object_ids: bool,
}
@@ -401,9 +402,6 @@
#[derive(Clone, Default, Serialize, Deserialize)]
pub struct StoreInfo {
- // The root object ID.
- root_object_id: u64,
-
// The last used object ID.
last_object_id: u64,
@@ -414,7 +412,7 @@
impl StoreInfo {
fn new() -> StoreInfo {
- StoreInfo { root_object_id: INVALID_OBJECT_ID, last_object_id: 0, layers: Vec::new() }
+ StoreInfo { last_object_id: RESERVED_OBJECT_ID, layers: Vec::new() }
}
}
@@ -428,6 +426,12 @@
options: StoreOptions,
store_info: Mutex<StoreInfo>,
tree: LSMTree<ObjectKey, ObjectValue>,
+
+ // When replaying the log, the store cannot read StoreInfo until the whole log
+ // has been replayed, so during that time, opened will be false and records
+ // just get sent to the tree. Once the log has been replayed, we can open the store
+ // and load all the other layer information.
+ opened: Mutex<bool>,
}
impl ObjectStore {
@@ -440,6 +444,7 @@
store_info: StoreInfo,
tree: LSMTree<ObjectKey, ObjectValue>,
options: StoreOptions,
+ opened: bool,
) -> Arc<ObjectStore> {
Arc::new(ObjectStore {
parent_store,
@@ -451,6 +456,7 @@
options,
store_info: Mutex::new(store_info),
tree,
+ opened: Mutex::new(opened),
})
}
@@ -471,6 +477,7 @@
StoreInfo::new(),
LSMTree::new(merge::merge),
options,
+ true,
)
}
@@ -482,19 +489,73 @@
self: &Arc<ObjectStore>,
options: StoreOptions,
) -> Result<Arc<ObjectStore>, Error> {
+ self.ensure_open()?;
// TODO: This should probably all be in a transaction. There should probably be a log
// record to create a store.
let mut transaction = Transaction::new();
let handle = self.clone().create_object(&mut transaction, HandleOptions::default())?;
- self.log.upgrade().unwrap().commit(transaction);
- Ok(Self::new_empty(
+ let log = self.log.upgrade().unwrap();
+ log.commit(transaction);
+
+ // Write a default StoreInfo file. TODO: this should be part of a bigger transaction i.e.
+ // this function should take transaction as an arg.
+ let mut writer = BufWriter::new(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0));
+ serialize_into(&mut writer, &StoreInfo::default())?;
+ writer.flush()?;
+
+ let new_store = Self::new_empty(
Some(self.clone()),
handle.object_id(),
self.device.clone(),
&self.allocator.upgrade().unwrap(),
&self.log.upgrade().unwrap(),
options,
- ))
+ );
+ log.register_store(&new_store);
+ Ok(new_store)
+ }
+
+ pub fn lazy_open_store(
+ self: &Arc<ObjectStore>,
+ store_object_id: u64,
+ options: StoreOptions,
+ ) -> Arc<ObjectStore> {
+ Self::new(
+ Some(self.clone()),
+ store_object_id,
+ self.device.clone(),
+ &self.allocator.upgrade().unwrap(),
+ &self.log.upgrade().unwrap(),
+ StoreInfo::default(),
+ LSMTree::new(merge::merge),
+ options,
+ false,
+ )
+ }
+
+ // TODO: find a way to make sure this is always called at the right time. Any time we add
+ // mutation records to a transaction, this needs to be called prior to that.
+ fn ensure_open(&self) -> Result<(), Error> {
+ let mut opened = self.opened.lock().unwrap();
+ if *opened {
+ return Ok(());
+ }
+
+ let parent_store = self.parent_store.as_ref().unwrap();
+ let handle = parent_store.open_object(self.store_object_id, HandleOptions::default())?;
+ let store_info: StoreInfo =
+ deserialize_from(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0))?;
+ let mut handles = Vec::new();
+ for object_id in &store_info.layers {
+ handles.push(parent_store.open_object(*object_id, HandleOptions::default())?);
+ }
+ self.tree.set_layers(handles.into());
+ let mut current_store_info = self.store_info_for_last_object_id().lock().unwrap();
+ if store_info.last_object_id > current_store_info.last_object_id {
+ current_store_info.last_object_id = store_info.last_object_id
+ }
+ *opened = true;
+ Ok(())
}
pub fn open_store(
@@ -502,51 +563,21 @@
store_object_id: u64,
options: StoreOptions,
) -> Result<Arc<ObjectStore>, Error> {
- println!("opening handle");
- let handle = self.clone().open_object(store_object_id, HandleOptions::default())?;
- println!("deserializing");
- let store_info: StoreInfo =
- deserialize_from(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0))?;
- println!("opening handles");
- let mut handles = Vec::new();
- for object_id in &store_info.layers {
- handles.push(self.clone().open_object(*object_id, HandleOptions::default())?);
- }
- if options.use_parent_to_allocate_object_ids {
- if store_info.last_object_id > self.store_info.lock().unwrap().last_object_id {
- self.store_info.lock().unwrap().last_object_id = store_info.last_object_id;
- }
- }
- Ok(Self::new(
- Some(self.clone()),
- store_object_id,
- self.device.clone(),
- &self.allocator.upgrade().unwrap(),
- &self.log.upgrade().unwrap(),
- store_info,
- LSMTree::open(merge::merge, handles.into_boxed_slice()),
- StoreOptions::default(),
- ))
+ let store = self.lazy_open_store(store_object_id, options);
+ store.ensure_open()?;
+ Ok(store)
}
pub fn store_object_id(&self) -> u64 {
self.store_object_id
}
- pub fn root_object_id(&self) -> u64 {
- self.store_info.lock().unwrap().root_object_id
- }
-
- pub fn set_root_object_id(&self, root_object_id: u64) {
- let mut store_info = self.store_info.lock().unwrap();
- store_info.root_object_id = root_object_id;
- }
-
pub fn open_object(
self: &Arc<Self>,
object_id: u64,
options: HandleOptions,
) -> std::io::Result<StoreObjectHandle> {
+ self.ensure_open().map_err(map_to_io_error)?;
let item = self
.tree
.find(&ObjectKey::attribute(object_id, 0))
@@ -572,6 +603,7 @@
object_id: u64,
options: HandleOptions,
) -> std::io::Result<StoreObjectHandle> {
+ self.ensure_open().map_err(map_to_io_error)?;
transaction.add(
self.store_object_id,
Mutation::Insert {
@@ -601,6 +633,7 @@
}
pub fn create_directory(self: &Arc<Self>) -> std::io::Result<directory::Directory> {
+ self.ensure_open().map_err(map_to_io_error)?;
let object_id = self.get_next_object_id();
let mut transaction = Transaction::new();
transaction.add(
@@ -662,6 +695,7 @@
// Push all in-memory structures to the device. This is not necessary for sync since the log
// will take care of it. This will panic if called on the root parent store.
pub fn flush(&self, force: bool) -> Result<(), Error> {
+ self.ensure_open()?;
let log = self.log();
let mut object_sync = log.begin_object_sync(self.store_object_id);
if !force && !object_sync.needs_sync() {
@@ -694,7 +728,9 @@
// -- Methods only to be called by Log --
pub fn insert(&self, item: ObjectItem) {
let store_info = self.store_info_for_last_object_id();
- if item.key.object_id > store_info.lock().unwrap().last_object_id {
+ if item.key.object_id < LOWEST_SPECIAL_OBJECT_ID
+ && item.key.object_id > store_info.lock().unwrap().last_object_id
+ {
store_info.lock().unwrap().last_object_id = item.key.object_id;
}
self.tree.insert(item);
diff --git a/src/storage/fxfs/src/object_store/constants.rs b/src/storage/fxfs/src/object_store/constants.rs
index 4e855c8..7bb96bd 100644
--- a/src/storage/fxfs/src/object_store/constants.rs
+++ b/src/storage/fxfs/src/object_store/constants.rs
@@ -1,6 +1,11 @@
pub const INVALID_OBJECT_ID: u64 = 0xffffffffffffffff;
pub const ROOT_PARENT_STORE_OBJECT_ID: u64 = 0xfffffffffffffffe;
-pub const SUPER_BLOCK_OBJECT_ID: u64 = 0;
+// This only exists in the root parent store.
+pub const SUPER_BLOCK_OBJECT_ID: u64 = 0xfffffffffffffffd;
+
+pub const LOWEST_SPECIAL_OBJECT_ID: u64 = SUPER_BLOCK_OBJECT_ID;
+
+pub const RESERVED_OBJECT_ID: u64 = 0;
// The first two blocks on the disk are reserved for the super block.
pub const MIN_SUPER_BLOCK_SIZE: u64 = 8192;
diff --git a/src/storage/fxfs/src/object_store/filesystem.rs b/src/storage/fxfs/src/object_store/filesystem.rs
index 675d677..5ff567a 100644
--- a/src/storage/fxfs/src/object_store/filesystem.rs
+++ b/src/storage/fxfs/src/object_store/filesystem.rs
@@ -1,25 +1,26 @@
use {
crate::{
+ object_handle::{ObjectHandle, ObjectHandleCursor},
object_store::{
allocator::{Allocator, SimpleAllocator},
- constants::{INVALID_OBJECT_ID, ROOT_PARENT_STORE_OBJECT_ID},
+ constants::{INVALID_OBJECT_ID, RESERVED_OBJECT_ID, ROOT_PARENT_STORE_OBJECT_ID},
log::Log,
record::ObjectType,
- Device, Directory, ObjectStore, StoreOptions,
+ Device, Directory, HandleOptions, ObjectStore, StoreOptions, Transaction,
},
},
anyhow::Error,
+ bincode::{deserialize_from, serialize_into},
+ serde::{Deserialize, Serialize},
std::{
collections::HashMap,
+ io::{BufWriter, Write},
sync::{Arc, RwLock},
},
};
#[cfg(test)]
-use {
- anyhow::anyhow,
- crate::testing::fake_device::FakeDevice,
-};
+use {crate::testing::fake_device::FakeDevice, anyhow::anyhow};
#[derive(Default)]
pub struct SyncOptions {
@@ -49,8 +50,8 @@
self.store(ROOT_PARENT_STORE_OBJECT_ID).unwrap()
}
- pub fn new_store(&self, store: Arc<ObjectStore>) {
- self.stores.write().unwrap().stores.insert(store.store_object_id(), store);
+ pub fn new_store(&self, store: &Arc<ObjectStore>) {
+ self.stores.write().unwrap().stores.insert(store.store_object_id(), store.clone());
}
pub fn store(&self, store_object_id: u64) -> Option<Arc<ObjectStore>> {
@@ -69,51 +70,16 @@
}
}
-pub struct VolumeManager {
- // We store volumes as entries in a directory.
- storage: Arc<RwLock<Directory>>,
-}
-
-impl VolumeManager {
- pub fn new(storage: Directory) -> Self {
- Self { storage: Arc::new(RwLock::new(storage)) }
- }
-
- pub fn new_volume(&self, volume_name: &str) -> Result<Directory, Error> {
- // TODO this should be transactional.
-
- let volume_store = self.storage.write().unwrap().create_volume_store(volume_name)?;
- println!("added volume ({:?}) object id {:?}", volume_name, volume_store.store_object_id());
-
- // Add the root directory.
- let initial_dir_handle = volume_store.create_directory()?;
- volume_store.set_root_object_id(initial_dir_handle.object_id());
- // TODO do we need to force?
- volume_store.flush(true)?;
- Ok(initial_dir_handle)
- }
-
- pub fn volume(&self, volume_name: &str) -> Option<Directory> {
- let storage = self.storage.read().unwrap();
- let (object_id, object_type) = storage.lookup(volume_name).ok()?;
- // TODO better error handling; this panics if the child's type is unexpected, or the
- // load fails
- if let ObjectType::Volume = object_type {
- // nop
- } else {
- panic!("Unexpceted non-volume child")
- }
- let root_store = storage.object_store();
- let volume_store = root_store.open_store(object_id, StoreOptions::default()).ok()?;
- volume_store.open_directory(volume_store.root_object_id()).ok()
- }
+#[derive(Clone, Default, Serialize, Deserialize)]
+struct VolumeInfo {
+ root_directory_object_id: u64,
}
pub struct Filesystem {
device: Arc<dyn Device>,
log: Arc<Log>,
stores: Arc<StoreManager>,
- volumes: Arc<VolumeManager>,
+ volume_directory: Arc<RwLock<Directory>>,
}
impl Filesystem {
@@ -121,7 +87,7 @@
let log = Arc::new(Log::new());
let allocator = Arc::new(SimpleAllocator::new(&log));
let stores = Arc::new(StoreManager::new());
- stores.new_store(ObjectStore::new_empty(
+ stores.new_store(&ObjectStore::new_empty(
None,
ROOT_PARENT_STORE_OBJECT_ID,
device.clone(),
@@ -133,12 +99,25 @@
// Add a directory as the root of all volumes. The root store's root object ID will be
// this directory.
+ let mut transaction = Transaction::new();
+ let handle = stores.root_store().create_object_with_id(
+ &mut transaction,
+ RESERVED_OBJECT_ID,
+ HandleOptions::default(),
+ )?;
+ log.commit(transaction);
+ let mut writer = BufWriter::new(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0));
let volume_manager_handle = stores.root_store().create_directory()?;
- stores.root_store().set_root_object_id(volume_manager_handle.object_id());
- println!("volume manager object id {:?}", volume_manager_handle.object_id());
- let volumes = Arc::new(VolumeManager::new(volume_manager_handle));
+ let info = VolumeInfo { root_directory_object_id: volume_manager_handle.object_id() };
+ serialize_into(&mut writer, &info)?;
+ writer.flush()?;
- Ok(Filesystem { device, log, stores, volumes })
+ Ok(Filesystem {
+ device,
+ log,
+ stores,
+ volume_directory: Arc::new(RwLock::new(volume_manager_handle)),
+ })
}
pub fn open(device: Arc<dyn Device>) -> Result<Filesystem, Error> {
@@ -147,20 +126,19 @@
let stores = Arc::new(StoreManager::new());
log.replay(device.clone(), stores.clone(), allocator.clone())?;
+ let handle =
+ stores.root_store().open_object(RESERVED_OBJECT_ID, HandleOptions::default())?;
+ let info: VolumeInfo =
+ deserialize_from(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0))?;
let volume_manager_handle =
- stores.root_store().open_directory(stores.root_store().root_object_id())?;
- let volumes = Arc::new(VolumeManager::new(volume_manager_handle));
+ stores.root_store().open_directory(info.root_directory_object_id)?;
- Ok(Filesystem { device, log, stores, volumes })
- }
-
- pub fn new_volume(&mut self, name: &str) -> Result<Directory, Error> {
- self.volumes.new_volume(name)
- }
-
- // TODO is Directory the best type to return? Wrapper type, maybe.
- pub fn volume(&self, name: &str) -> Option<Directory> {
- self.volumes.volume(name)
+ Ok(Filesystem {
+ device,
+ log,
+ stores,
+ volume_directory: Arc::new(RwLock::new(volume_manager_handle)),
+ })
}
pub fn root_parent_store(&self) -> Arc<ObjectStore> {
@@ -178,6 +156,49 @@
pub fn device(&self) -> &Arc<dyn Device> {
return &self.device;
}
+
+ pub fn new_volume(&self, volume_name: &str) -> Result<Directory, Error> {
+ // TODO this should be transactional.
+
+ let volume_store =
+ self.volume_directory.write().unwrap().create_volume_store(volume_name)?;
+
+ // Add the root directory.
+ let mut transaction = Transaction::new();
+ let handle = volume_store.create_object_with_id(
+ &mut transaction,
+ RESERVED_OBJECT_ID,
+ HandleOptions::default(),
+ )?;
+ self.log.commit(transaction);
+ let mut writer = BufWriter::new(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0));
+ let initial_dir_handle = volume_store.create_directory()?;
+ let info = VolumeInfo { root_directory_object_id: initial_dir_handle.object_id() };
+ serialize_into(&mut writer, &info)?;
+ writer.flush()?;
+
+ Ok(initial_dir_handle)
+ }
+
+ // TODO is Directory the best type to return? Wrapper type, maybe.
+ pub fn volume(&self, volume_name: &str) -> Option<Directory> {
+ let volume_directory = self.volume_directory.read().unwrap();
+ let (object_id, object_type) = volume_directory.lookup(volume_name).ok()?;
+ // TODO better error handling; this panics if the child's type is unexpected, or the
+ // load fails
+ if let ObjectType::Volume = object_type {
+ // nop
+ } else {
+ panic!("Unexpected non-volume child")
+ }
+ let volume_store = self.stores.store(object_id).unwrap_or(
+ self.stores.root_store().open_store(object_id, StoreOptions::default()).ok()?,
+ );
+ let handle = volume_store.open_object(RESERVED_OBJECT_ID, HandleOptions::default()).ok()?;
+ let info: VolumeInfo =
+ deserialize_from(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0)).ok()?;
+ volume_store.open_directory(info.root_directory_object_id).ok()
+ }
}
// TODO: Consider sync on drop
@@ -193,8 +214,6 @@
}
}
-// TODO fix me
-/*
#[test]
fn test_add_volume() -> Result<(), Error> {
let device = Arc::new(FakeDevice::new(512));
@@ -213,4 +232,3 @@
};
Ok(())
}
-*/
\ No newline at end of file
diff --git a/src/storage/fxfs/src/object_store/log.rs b/src/storage/fxfs/src/object_store/log.rs
index 4ccb3b21..ad7fb84 100644
--- a/src/storage/fxfs/src/object_store/log.rs
+++ b/src/storage/fxfs/src/object_store/log.rs
@@ -314,7 +314,7 @@
}));
let super_block: SuperBlock = deserialize_from(&mut reader)?;
println!("super-block: {:?}", super_block);
- stores.new_store(ObjectStore::new_empty(
+ stores.new_store(&ObjectStore::new_empty(
None,
ROOT_PARENT_STORE_OBJECT_ID,
device,
@@ -386,7 +386,6 @@
}
if !found_end_of_super_block {
let root_parent = stores.root_parent_store();
- println!("opening object store");
stores.set_root_store(root_parent.open_store(
super_block.root_store_object_id,
StoreOptions {
@@ -398,7 +397,6 @@
reader.read_offset = super_block.log_checkpoint.file_offset;
reader.last_check_sum = super_block.log_checkpoint.check_sum;
reader.last_read_check_sum = super_block.log_checkpoint.check_sum;
- println!("opening log file");
reader.handle = Box::new(stores.root_store().open_object(
super_block.log_object_id,
log_handle_options(),
@@ -543,6 +541,10 @@
pub fn commit(&self, transaction: Transaction) {
self.log().as_mut().unwrap().commit(transaction, None);
}
+
+ pub fn register_store(&self, store: &Arc<ObjectStore>) {
+ self.log().as_ref().unwrap().stores.new_store(store);
+ }
}
pub struct InitializedLog {
@@ -635,6 +637,16 @@
return true;
}
+ fn get_store(&self, store_object_id: u64) -> Arc<ObjectStore> {
+ if let Some(store) = self.stores.store(store_object_id) {
+ return store;
+ }
+ let store =
+ self.stores.root_store().lazy_open_store(store_object_id, StoreOptions::default());
+ self.stores.new_store(&store);
+ store
+ }
+
fn apply_mutation(
&mut self,
object_id: u64,
@@ -642,12 +654,10 @@
mutation: Mutation,
log_file_offsets: Option<&HashMap<u64, u64>>,
) {
- // If a store isn't found below, that's OK --- it means it must have been deleted in a
- // later log message. TODO: is there a nicer way?
match mutation {
Mutation::Insert { item } => {
if self.should_apply(object_id, log_file_checkpoint, log_file_offsets) {
- self.stores.store(object_id).map(|s| s.insert(item));
+ self.get_store(object_id).insert(item);
}
}
Mutation::ReplaceExtent { item } => {
@@ -668,12 +678,12 @@
);
}
if self.should_apply(object_id, log_file_checkpoint, log_file_offsets) {
- self.stores.store(object_id).map(|s| s.replace_extent(item));
+ self.get_store(object_id).replace_extent(item);
}
}
Mutation::ReplaceOrInsert { item } => {
if self.should_apply(object_id, log_file_checkpoint, log_file_offsets) {
- self.stores.store(object_id).map(|s| s.replace_or_insert(item));
+ self.get_store(object_id).replace_or_insert(item);
}
}
Mutation::Deallocate(item) => {