[fxfs] Make fsck check reference counts
Also fix up all test cases so that fsck passes. Compactions should no
longer leak objects if they fail part-way through.
Change-Id: I190bc49220cbae7353271a7c84351a6a4c6b4091
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/524682
Commit-Queue: Chris Suter <csuter@google.com>
Reviewed-by: James Sullivan <jfsulliv@google.com>
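For orientation before the patch: the new fsck pass tallies, per object, the
reference count recorded on the object itself against the references actually
found to it (directory children, graveyard entries, and root objects), and
fails on any mismatch. A minimal standalone sketch of that bookkeeping, using
plain std collections in place of fxfs's LSM-tree types (names here are
illustrative, not fxfs APIs):

    use std::collections::HashMap;

    // Per-object tally: (refcount stored on the object, references found to it).
    type Tally = HashMap<u64, (u64, u64)>;

    fn note_object(tally: &mut Tally, object_id: u64, refs: u64) {
        tally.entry(object_id).or_insert((0, 0)).0 += refs;
    }

    fn note_reference(tally: &mut Tally, object_id: u64) {
        tally.entry(object_id).or_insert((0, 0)).1 += 1;
    }

    fn check(tally: &Tally) -> Result<(), String> {
        for (object_id, (count, references)) in tally {
            if count != references {
                return Err(format!(
                    "object {} reference count mismatch: actual: {}, expected: {}",
                    object_id, count, references
                ));
            }
        }
        Ok(())
    }

    fn main() {
        let mut tally = Tally::new();
        note_object(&mut tally, 5, 1); // a file whose record claims one reference
        note_reference(&mut tally, 5); // one directory child actually points at it
        assert!(check(&tally).is_ok());
        note_reference(&mut tally, 5); // a second, unexpected reference
        assert!(check(&tally).is_err());
    }
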
diff --git a/src/storage/fxfs/BUILD.gn b/src/storage/fxfs/BUILD.gn
index 1263eef..46aee1d 100644
--- a/src/storage/fxfs/BUILD.gn
+++ b/src/storage/fxfs/BUILD.gn
@@ -47,6 +47,7 @@
"src/object_store/directory.rs",
"src/object_store/filesystem.rs",
"src/object_store/fsck.rs",
+ "src/object_store/graveyard.rs",
"src/object_store/journal.rs",
"src/object_store/journal/reader.rs",
"src/object_store/journal/super_block.rs",
diff --git a/src/storage/fxfs/src/lsm_tree.rs b/src/storage/fxfs/src/lsm_tree.rs
index 7b47aea..2d902b6 100644
--- a/src/storage/fxfs/src/lsm_tree.rs
+++ b/src/storage/fxfs/src/lsm_tree.rs
@@ -83,6 +83,17 @@
Ok(())
}
+ /// Appends the given layers at the end, i.e. they should be base layers. This is intended
+ /// to be used after replay, when we are opening a tree and have discovered the base layers.
+ pub async fn append_layers(
+ &self,
+ handles: Box<[impl ObjectHandle + 'static]>,
+ ) -> Result<(), Error> {
+ let mut layers = Self::layers_from_handles(handles).await?;
+ self.data.write().unwrap().layers.append(&mut layers);
+ Ok(())
+ }
+
/// Resets the immutable layers.
pub fn reset_immutable_layers(&self) {
self.data.write().unwrap().layers = Vec::new();
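
The difference between set_layers and the new append_layers above is ordering:
earlier layers in the vector shadow later ones, so layers discovered at replay
time must go on the end as base layers. A toy model of that shadowing, assuming
a newest-first Vec (illustrative only, not the real LSMTree types):

    // Toy LSM layer stack: index 0 is the newest layer; a lookup stops at the
    // first layer that knows the key, so appended (base) layers are shadowed.
    struct Tree {
        layers: Vec<Vec<(u64, &'static str)>>,
    }

    impl Tree {
        // Models append_layers: layers discovered at replay go on the end.
        fn append_layers(&mut self, mut base: Vec<Vec<(u64, &'static str)>>) {
            self.layers.append(&mut base);
        }

        fn find(&self, key: u64) -> Option<&'static str> {
            for layer in &self.layers {
                if let Some((_, v)) = layer.iter().find(|(k, _)| *k == key) {
                    return Some(*v);
                }
            }
            None
        }
    }

    fn main() {
        let mut tree = Tree { layers: vec![vec![(1, "new")]] };
        tree.append_layers(vec![vec![(1, "old"), (2, "base")]]);
        assert_eq!(tree.find(1), Some("new")); // the newer layer shadows the base
        assert_eq!(tree.find(2), Some("base"));
    }
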
diff --git a/src/storage/fxfs/src/object_store.rs b/src/storage/fxfs/src/object_store.rs
index 605846c..c594088 100644
--- a/src/storage/fxfs/src/object_store.rs
+++ b/src/storage/fxfs/src/object_store.rs
@@ -7,6 +7,7 @@
pub mod directory;
pub mod filesystem;
pub mod fsck;
+mod graveyard;
mod journal;
mod merge;
mod record;
@@ -26,7 +27,7 @@
},
errors::FxfsError,
lsm_tree::{types::LayerIterator, LSMTree},
- object_handle::{ObjectHandle, ObjectHandleExt},
+ object_handle::{ObjectHandle, ObjectHandleExt, INVALID_OBJECT_ID},
object_store::{
filesystem::{Filesystem, Mutations, ObjectFlush},
record::{
@@ -50,7 +51,7 @@
cmp::min,
ops::{Bound, Range},
sync::{
- atomic::{self, AtomicBool},
+ atomic::{self, AtomicBool, AtomicU64},
Arc, Mutex, Weak,
},
},
@@ -60,7 +61,8 @@
// store, and is used, for example, to get the persistent layer objects.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct StoreInfo {
- // The last used object ID.
+ // The last used object ID. Note that this field is not kept up-to-date in memory;
+ // ObjectStore's last_object_id field is the authoritative value there.
last_object_id: u64,
// Object ids for layers. TODO(csuter): need a layer of indirection here so we can
@@ -71,6 +73,7 @@
root_directory_object_id: u64,
// The object ID for the graveyard.
+ // TODO(csuter): Move this out of here. This can probably be a child of the root directory.
graveyard_directory_object_id: u64,
}
@@ -94,7 +97,8 @@
device: Arc<dyn Device>,
block_size: u32,
filesystem: Weak<dyn Filesystem>,
- store_info: Mutex<StoreInfo>,
+ last_object_id: AtomicU64,
+ store_info: Mutex<Option<StoreInfo>>,
tree: LSMTree<ObjectKey, ObjectValue>,
// When replaying the journal, the store cannot read StoreInfo until the whole journal
@@ -109,7 +113,7 @@
parent_store: Option<Arc<ObjectStore>>,
store_object_id: u64,
filesystem: Arc<dyn Filesystem>,
- store_info: StoreInfo,
+ store_info: Option<StoreInfo>,
tree: LSMTree<ObjectKey, ObjectValue>,
) -> Arc<ObjectStore> {
let device = filesystem.device();
@@ -120,6 +124,7 @@
device,
block_size,
filesystem: Arc::downgrade(&filesystem),
+ last_object_id: AtomicU64::new(0),
store_info: Mutex::new(store_info),
tree,
store_info_handle: OnceCell::new(),
@@ -137,7 +142,7 @@
parent_store,
store_object_id,
filesystem,
- StoreInfo::default(),
+ Some(StoreInfo::default()),
LSMTree::new(merge::merge),
)
}
@@ -159,7 +164,7 @@
}
pub fn root_directory_object_id(&self) -> u64 {
- self.store_info.lock().unwrap().root_directory_object_id
+ self.store_info.lock().unwrap().as_ref().unwrap().root_directory_object_id
}
pub fn set_root_directory_object_id<'a>(&'a self, transaction: &mut Transaction<'a>, oid: u64) {
@@ -169,7 +174,7 @@
}
pub fn graveyard_directory_object_id(&self) -> u64 {
- self.store_info.lock().unwrap().graveyard_directory_object_id
+ self.store_info.lock().unwrap().as_ref().unwrap().graveyard_directory_object_id
}
pub fn set_graveyard_directory_object_id<'a>(
@@ -226,7 +231,7 @@
Some(self.clone()),
store_object_id,
self.filesystem.upgrade().unwrap(),
- StoreInfo::default(),
+ None,
LSMTree::new(merge::merge),
)
}
@@ -275,6 +280,10 @@
) -> Result<StoreObjectHandle<S>, Error> {
let store = owner.as_ref().as_ref();
store.ensure_open().await?;
+ // If the object ID was specified, i.e. this hasn't come via create_object, then we
+ // should update last_object_id in case the caller wants to create more objects in
+ // the same transaction.
+ store.update_last_object_id(object_id);
transaction.add(
store.store_object_id(),
Mutation::insert_object(ObjectKey::object(object_id), ObjectValue::file(1, 0)),
@@ -370,6 +379,33 @@
Ok(())
}
+ /// Returns all objects that exist in the parent store that pertain to this object store.
+ pub fn parent_objects(&self) -> Vec<u64> {
+ assert!(self.store_info_handle.get().is_some());
+ let mut objects = Vec::new();
+ // We should not include the ID of the store itself, since that is referenced from the
+ // volume directory.
+ objects.extend_from_slice(&self.store_info.lock().unwrap().as_ref().unwrap().layers);
+ objects
+ }
+
+ /// Returns root objects for this store.
+ pub fn root_objects(&self) -> Vec<u64> {
+ let mut objects = Vec::new();
+ let store_info = self.store_info.lock().unwrap();
+ if store_info.as_ref().unwrap().root_directory_object_id != INVALID_OBJECT_ID {
+ objects.push(store_info.as_ref().unwrap().root_directory_object_id);
+ }
+ if store_info.as_ref().unwrap().graveyard_directory_object_id != INVALID_OBJECT_ID {
+ objects.push(store_info.as_ref().unwrap().graveyard_directory_object_id);
+ }
+ objects
+ }
+
+ pub fn store_info(&self) -> StoreInfo {
+ self.store_info.lock().unwrap().as_ref().unwrap().clone()
+ }
+
async fn ensure_open(&self) -> Result<(), Error> {
if self.parent_store.is_none() || self.store_info_handle.get().is_some() {
return Ok(());
@@ -390,23 +426,25 @@
HandleOptions::default(),
)
.await?;
- if handle.get_size() > 0 {
+ let need_store_info = self.store_info.lock().unwrap().is_none();
+ let layer_object_ids = if need_store_info && handle.get_size() > 0 {
let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
let store_info: StoreInfo = deserialize_from(&serialized_info[..])?;
- let mut handles = Vec::new();
- for object_id in &store_info.layers {
- handles.push(
- ObjectStore::open_object(
- &parent_store,
- *object_id,
- HandleOptions::default(),
- )
+ let layer_object_ids = store_info.layers.clone();
+ self.update_last_object_id(store_info.last_object_id);
+ *self.store_info.lock().unwrap() = Some(store_info);
+ layer_object_ids
+ } else {
+ self.store_info.lock().unwrap().as_ref().unwrap().layers.clone()
+ };
+ let mut handles = Vec::new();
+ for object_id in layer_object_ids {
+ handles.push(
+ ObjectStore::open_object(&parent_store, object_id, HandleOptions::default())
.await?,
- );
- }
- self.tree.set_layers(handles.into()).await?;
- self.update_store_info(store_info);
+ );
}
+ self.tree.append_layers(handles.into()).await?;
let _ = self.store_info_handle.set(handle);
Ok(())
}
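
Changing store_info to Mutex<Option<StoreInfo>> lets ensure_open distinguish
"not yet loaded from disk" (None) from a store created empty with defaults.
A minimal sketch of that lazy-load shape, with a stand-in for the handle read
and deserialization (all names hypothetical):

    use std::sync::Mutex;

    #[derive(Clone, Debug)]
    struct StoreInfo {
        last_object_id: u64,
    }

    struct Store {
        info: Mutex<Option<StoreInfo>>, // None until loaded from the device
    }

    impl Store {
        // Models ensure_open: deserialize StoreInfo only on first use.
        fn ensure_open(&self) {
            let mut info = self.info.lock().unwrap();
            if info.is_none() {
                // Stand-in for reading the handle and deserializing its contents.
                *info = Some(StoreInfo { last_object_id: 42 });
            }
        }

        fn store_info(&self) -> StoreInfo {
            // Panics if called before ensure_open, mirroring the unwraps above.
            self.info.lock().unwrap().as_ref().unwrap().clone()
        }
    }

    fn main() {
        let store = Store { info: Mutex::new(None) };
        store.ensure_open();
        assert_eq!(store.store_info().last_object_id, 42);
    }
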
@@ -414,9 +452,7 @@
}
fn get_next_object_id(&self) -> u64 {
- let mut store_info = self.store_info.lock().unwrap();
- store_info.last_object_id += 1;
- store_info.last_object_id
+ self.last_object_id.fetch_add(1, atomic::Ordering::Relaxed) + 1
}
fn allocator(&self) -> Arc<dyn Allocator> {
@@ -425,19 +461,17 @@
fn txn_get_store_info(&self, transaction: &Transaction<'_>) -> StoreInfo {
match transaction.get_store_info(self.store_object_id) {
- None => self.store_info.lock().unwrap().clone(),
+ None => self.store_info(),
Some(store_info) => store_info.clone(),
}
}
- // The last object ID is updated as we create new objects, so we need to always take the maximum
- // rather than blindly overwriting the last_object_id field.
- fn update_store_info(&self, mut new_store_info: StoreInfo) {
- let mut store_info = self.store_info.lock().unwrap();
- if store_info.last_object_id > new_store_info.last_object_id {
- new_store_info.last_object_id = store_info.last_object_id;
- }
- *store_info = new_store_info;
+ fn update_last_object_id(&self, object_id: u64) {
+ let _ = self.last_object_id.fetch_update(
+ atomic::Ordering::Relaxed,
+ atomic::Ordering::Relaxed,
+ |oid| if object_id > oid { Some(object_id) } else { None },
+ );
}
}
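
update_last_object_id is an atomic take-the-maximum built on fetch_update: the
closure returns None when the stored value is already at least as large, which
turns the compare-and-swap loop into a no-op. The same idiom in isolation:

    use std::sync::atomic::{AtomicU64, Ordering};

    // Atomically raises `last` to `candidate` if the candidate is larger,
    // mirroring ObjectStore::update_last_object_id above.
    fn update_max(last: &AtomicU64, candidate: u64) {
        let _ = last.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            if candidate > current {
                Some(candidate)
            } else {
                None // already at least as large; leave it untouched
            }
        });
    }

    fn main() {
        let last = AtomicU64::new(10);
        update_max(&last, 7); // lower: no change
        assert_eq!(last.load(Ordering::Relaxed), 10);
        update_max(&last, 15); // higher: raised
        assert_eq!(last.load(Ordering::Relaxed), 15);
    }
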
@@ -453,12 +487,7 @@
match mutation {
Mutation::ObjectStore(ObjectStoreMutation { item, op }) => {
- {
- let mut store_info = self.store_info.lock().unwrap();
- if item.key.object_id > store_info.last_object_id {
- store_info.last_object_id = item.key.object_id;
- }
- }
+ self.update_last_object_id(item.key.object_id);
match op {
Operation::Insert => self.tree.insert(item).await,
Operation::ReplaceOrInsert => self.tree.replace_or_insert(item).await,
@@ -468,12 +497,8 @@
}
}
}
- Mutation::ObjectStoreInfo(StoreInfoMutation(mut store_info)) => {
- let mut info = self.store_info.lock().unwrap();
- if info.last_object_id > store_info.last_object_id {
- store_info.last_object_id = info.last_object_id;
- }
- *info = store_info;
+ Mutation::ObjectStoreInfo(StoreInfoMutation(store_info)) => {
+ *self.store_info.lock().unwrap() = Some(store_info);
}
Mutation::TreeSeal => self.tree.seal().await,
Mutation::TreeCompact => {
@@ -503,33 +528,47 @@
if !object_manager.needs_flush(self.store_object_id) {
return Ok(());
}
- let object_sync = ObjectFlush::new(object_manager, self.store_object_id);
+
let parent_store = self.parent_store.as_ref().unwrap();
+ let graveyard = object_manager.graveyard().ok_or(anyhow!("Missing graveyard!"))?;
+
+ let object_sync = ObjectFlush::new(object_manager, self.store_object_id);
let mut transaction = filesystem.clone().new_transaction(&[]).await?;
let object_handle =
ObjectStore::create_object(parent_store, &mut transaction, HandleOptions::default())
.await?;
+ let object_id = object_handle.object_id();
+ graveyard.add(&mut transaction, parent_store.store_object_id(), object_id);
transaction.add_with_object(self.store_object_id(), Mutation::TreeSeal, &object_sync);
transaction.commit().await;
- let object_id = object_handle.object_id();
self.tree.compact(&object_handle).await?;
let mut serialized_info = Vec::new();
- let mut new_store_info = self.store_info.lock().unwrap().clone();
+ let mut new_store_info = self.store_info();
+
+ let mut transaction = filesystem.clone().new_transaction(&[]).await?;
+
+ // Move all the existing layers to the graveyard.
+ for object_id in new_store_info.layers {
+ graveyard.add(&mut transaction, parent_store.store_object_id(), object_id);
+ }
+
+ new_store_info.last_object_id = self.last_object_id.load(atomic::Ordering::Relaxed);
new_store_info.layers = vec![object_id];
serialize_into(&mut serialized_info, &new_store_info)?;
let mut buf = self.device.allocate_buffer(serialized_info.len());
buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
- let mut transaction = filesystem.clone().new_transaction(&[]).await?;
self.store_info_handle
.get()
.unwrap()
.txn_write(&mut transaction, 0u64, buf.as_ref())
.await?;
transaction.add(self.store_object_id(), Mutation::TreeCompact);
- self.update_store_info(new_store_info);
+ graveyard.remove(&mut transaction, parent_store.store_object_id(), object_id);
+ // TODO(csuter): This isn't thread-safe.
+ *self.store_info.lock().unwrap() = Some(new_store_info);
transaction.commit().await;
self.tree.set_layers(Box::new([object_handle])).await.expect("set_layers failed");
@@ -549,7 +588,7 @@
fn will_apply_mutation(&self, mutation: &Mutation) {
match mutation {
Mutation::ObjectStoreInfo(StoreInfoMutation(store_info)) => {
- self.update_store_info(store_info.clone());
+ *self.store_info.lock().unwrap() = Some(store_info.clone());
}
_ => {}
}
@@ -1269,6 +1308,7 @@
object_handle::{ObjectHandle, ObjectHandleExt},
object_store::{
filesystem::{Filesystem, Mutations},
+ graveyard::Graveyard,
record::{ObjectKey, ObjectKeyData},
round_up,
testing::{fake_allocator::FakeAllocator, fake_filesystem::FakeFilesystem},
@@ -1308,6 +1348,9 @@
.create_child_store_with_id(&mut transaction, 3)
.await
.expect("create_child_store failed");
+ let graveyard =
+ Arc::new(Graveyard::create(&mut transaction, &store).await.expect("create failed"));
+ fs.object_manager().register_graveyard(graveyard);
transaction.commit().await;
(fs.clone(), allocator, store)
}
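
The compaction changes above (and the matching allocator changes below) follow
one protocol: the new layer file joins the graveyard in the transaction that
creates it, the superseded layers join the graveyard in the transaction that
commits the new StoreInfo, and the new file leaves the graveyard in that same
transaction. A crash at any step leaves every unreachable file parked for
cleanup at mount, which is what stops the leaks mentioned in the commit
message. Sketched with the graveyard modeled as a plain set (illustrative, not
the fxfs API):

    use std::collections::HashSet;

    // Toy model of the compaction protocol's graveyard bookkeeping. A crash
    // between the two transactions leaves the new layer in the graveyard; a
    // crash after the second leaves the old layers there. Either way nothing
    // unreachable is leaked.
    fn compact(graveyard: &mut HashSet<u64>, old_layers: &[u64], new_layer: u64) -> Vec<u64> {
        // Transaction 1: create the new layer file and park it in the graveyard.
        graveyard.insert(new_layer);
        // ... compaction writes into the new layer here; a crash now is safe ...

        // Transaction 2: park the old layers, adopt the new one, unpark it.
        for &old in old_layers {
            graveyard.insert(old);
        }
        graveyard.remove(&new_layer);
        vec![new_layer] // the new StoreInfo.layers
    }

    fn main() {
        let mut graveyard = HashSet::new();
        let layers = compact(&mut graveyard, &[3, 4], 9);
        assert_eq!(layers, vec![9]);
        // Only the superseded layers remain parked for deletion at mount time.
        assert!(graveyard.contains(&3) && graveyard.contains(&4));
        assert!(!graveyard.contains(&9));
    }
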
diff --git a/src/storage/fxfs/src/object_store/allocator.rs b/src/storage/fxfs/src/object_store/allocator.rs
index 0f6478c..6f821d3 100644
--- a/src/storage/fxfs/src/object_store/allocator.rs
+++ b/src/storage/fxfs/src/object_store/allocator.rs
@@ -22,7 +22,7 @@
HandleOptions, ObjectStore,
},
},
- anyhow::{bail, ensure, Error},
+ anyhow::{anyhow, bail, ensure, Error},
async_trait::async_trait,
bincode::{deserialize_from, serialize_into},
merge::merge,
@@ -220,13 +220,20 @@
);
}
self.inner.lock().unwrap().info = info;
- self.tree.set_layers(handles.into_boxed_slice()).await?;
+ self.tree.append_layers(handles.into_boxed_slice()).await?;
}
}
self.inner.lock().unwrap().opened = true;
Ok(())
}
+
+ /// Returns all objects that exist in the parent store that pertain to this allocator.
+ pub fn parent_objects(&self) -> Vec<u64> {
+ // The allocator tree needs to store a file for each of the layers in the tree, so we return
+ // those, since nothing else references them.
+ self.inner.lock().unwrap().info.layers.clone()
+ }
}
#[async_trait]
@@ -354,6 +361,7 @@
if !object_manager.needs_flush(self.object_id()) {
return Ok(());
}
+ let graveyard = object_manager.graveyard().ok_or(anyhow!("Missing graveyard!"))?;
let object_sync = ObjectFlush::new(object_manager, self.object_id());
// TODO(csuter): This all needs to be atomic somehow. We'll need to use different
transactions for each stage, but we need to make sure objects are cleaned up if there's a
@@ -364,11 +372,11 @@
let layer_object_handle =
ObjectStore::create_object(&root_store, &mut transaction, HandleOptions::default())
.await?;
-
+ let object_id = layer_object_handle.object_id();
+ graveyard.add(&mut transaction, root_store.store_object_id(), object_id);
transaction.add_with_object(self.object_id(), Mutation::TreeSeal, &object_sync);
transaction.commit().await;
- let object_id = layer_object_handle.object_id();
let layer_set = self.tree.immutable_layer_set();
let mut merger = layer_set.merger();
self.tree
@@ -382,20 +390,28 @@
let object_handle =
ObjectStore::open_object(&root_store, self.object_id(), HandleOptions::default())
.await?;
+
// TODO(jfsulliv): Can we preallocate the buffer instead of doing a bounce? Do we know the
// size up front?
+ let mut transaction = filesystem.clone().new_transaction(&[]).await?;
let mut serialized_info = Vec::new();
{
let mut inner = self.inner.lock().unwrap();
+
+ // Move all the existing layers to the graveyard.
+ for object_id in &inner.info.layers {
+ graveyard.add(&mut transaction, root_store.store_object_id(), *object_id);
+ }
+
inner.info.layers = vec![object_id];
serialize_into(&mut serialized_info, &inner.info)?;
}
let mut buf = object_handle.allocate_buffer(serialized_info.len());
buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
- let mut transaction = filesystem.clone().new_transaction(&[]).await?;
object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;
transaction.add(self.object_id(), Mutation::TreeCompact);
+ graveyard.remove(&mut transaction, root_store.store_object_id(), object_id);
transaction.commit().await;
// TODO(csuter): what if this fails.
@@ -482,6 +498,7 @@
SimpleAllocator,
},
filesystem::{Filesystem, Mutations},
+ graveyard::Graveyard,
testing::fake_filesystem::FakeFilesystem,
transaction::TransactionHandler,
ObjectStore,
@@ -665,9 +682,13 @@
let fs = FakeFilesystem::new(device);
let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
fs.object_manager().set_allocator(allocator.clone());
- let _store = ObjectStore::new_empty(None, 2, fs.clone());
+ let store = ObjectStore::new_empty(None, 2, fs.clone());
fs.object_manager().set_root_store_object_id(2);
+ allocator.ensure_open().await.expect("ensure_open failed");
let mut transaction = fs.clone().new_transaction(&[]).await.expect("new failed");
+ let graveyard =
+ Arc::new(Graveyard::create(&mut transaction, &store).await.expect("create failed"));
+ fs.object_manager().register_graveyard(graveyard);
let mut device_ranges = Vec::new();
device_ranges
.push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
diff --git a/src/storage/fxfs/src/object_store/directory.rs b/src/storage/fxfs/src/object_store/directory.rs
index 46405a1..4ecf1cc 100644
--- a/src/storage/fxfs/src/object_store/directory.rs
+++ b/src/storage/fxfs/src/object_store/directory.rs
@@ -65,6 +65,7 @@
pub async fn open(owner: &Arc<S>, object_id: u64) -> Result<Directory<S>, Error> {
let store = owner.as_ref().as_ref();
+ store.ensure_open().await?;
if let ObjectItem { value: ObjectValue::Object { kind: ObjectKind::Directory }, .. } =
store.tree.find(&ObjectKey::object(object_id)).await?.ok_or(FxfsError::NotFound)?
{
diff --git a/src/storage/fxfs/src/object_store/filesystem.rs b/src/storage/fxfs/src/object_store/filesystem.rs
index a121d95..165af6a 100644
--- a/src/storage/fxfs/src/object_store/filesystem.rs
+++ b/src/storage/fxfs/src/object_store/filesystem.rs
@@ -8,8 +8,8 @@
object_handle::INVALID_OBJECT_ID,
object_store::{
allocator::Allocator,
- directory::Directory,
- journal::{Journal, JournalCheckpoint},
+ graveyard::Graveyard,
+ journal::{super_block::SuperBlock, Journal, JournalCheckpoint},
transaction::{
AssociatedObject, LockKey, LockManager, Mutation, ReadGuard, Transaction,
TransactionHandler, TxnMutation, WriteGuard,
@@ -64,7 +64,7 @@
// has a dependency on journal records from that offset.
journal_file_checkpoints: HashMap<u64, JournalCheckpoint>,
- graveyards: HashMap<u64, Arc<Directory<ObjectStore>>>,
+ graveyard: Option<Arc<Graveyard>>,
}
impl ObjectManager {
@@ -77,7 +77,7 @@
allocator_object_id: INVALID_OBJECT_ID,
allocator: None,
journal_file_checkpoints: HashMap::new(),
- graveyards: HashMap::new(),
+ graveyard: None,
}),
}
}
@@ -196,12 +196,12 @@
self.objects.read().unwrap().journal_file_checkpoints.contains_key(&object_id)
}
- pub fn graveyard(&self, store_object_id: u64) -> Option<Arc<Directory<ObjectStore>>> {
- self.objects.read().unwrap().graveyards.get(&store_object_id).cloned()
+ pub fn graveyard(&self) -> Option<Arc<Graveyard>> {
+ self.objects.read().unwrap().graveyard.clone()
}
- pub fn register_graveyard(&self, store_object_id: u64, directory: Arc<Directory<ObjectStore>>) {
- self.objects.write().unwrap().graveyards.insert(store_object_id, directory);
+ pub fn register_graveyard(&self, graveyard: Arc<Graveyard>) {
+ self.objects.write().unwrap().graveyard = Some(graveyard);
}
/// Flushes all known objects. This will then allow the journal space to be freed.
@@ -330,9 +330,13 @@
Ok(filesystem)
}
- pub async fn open(device: DeviceHolder) -> Result<Arc<FxFilesystem>, Error> {
+ pub async fn open_with_trace(
+ device: DeviceHolder,
+ trace: bool,
+ ) -> Result<Arc<FxFilesystem>, Error> {
let objects = Arc::new(ObjectManager::new());
let journal = Journal::new(objects.clone());
+ journal.set_trace(trace);
let filesystem = Arc::new(FxFilesystem {
device: OnceCell::new(),
objects: objects.clone(),
@@ -346,6 +350,14 @@
Ok(filesystem)
}
+ pub fn set_trace(&self, v: bool) {
+ self.journal.set_trace(v);
+ }
+
+ pub async fn open(device: DeviceHolder) -> Result<Arc<FxFilesystem>, Error> {
+ Self::open_with_trace(device, false).await
+ }
+
pub fn root_parent_store(&self) -> Arc<ObjectStore> {
self.objects.root_parent_store()
}
@@ -403,6 +415,10 @@
receiver.await.unwrap()
}
+ pub fn super_block(&self) -> SuperBlock {
+ self.journal.super_block()
+ }
+
async fn wait_for_compaction_to_finish(&self) {
let compaction_task = self.compaction_task.lock().unwrap().take();
if let Some(compaction_task) = compaction_task {
@@ -486,10 +502,10 @@
device::DeviceHolder,
object_handle::{ObjectHandle, ObjectHandleExt},
object_store::{
+ directory::Directory,
filesystem::{FxFilesystem, SyncOptions},
fsck::fsck,
transaction::TransactionHandler,
- HandleOptions, ObjectStore,
},
testing::fake_device::FakeDevice,
},
@@ -505,17 +521,19 @@
// If compaction is not working correctly, this test will run out of space.
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
+ let root_store = fs.root_store();
+ let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
+ .await
+ .expect("open failed");
+
let mut tasks = Vec::new();
- for _ in 0..2 {
+ for i in 0..2 {
let mut transaction =
fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
- let handle = ObjectStore::create_object(
- &fs.root_store(),
- &mut transaction,
- HandleOptions::default(),
- )
- .await
- .expect("create_object failed");
+ let handle = root_directory
+ .create_child_file(&mut transaction, &format!("{}", i))
+ .await
+ .expect("create_child_file failed");
transaction.commit().await;
tasks.push(fasync::Task::spawn(async move {
const TEST_DATA: &[u8] = b"hello";
diff --git a/src/storage/fxfs/src/object_store/fsck.rs b/src/storage/fxfs/src/object_store/fsck.rs
index 61925e2..d5c6fe9 100644
--- a/src/storage/fxfs/src/object_store/fsck.rs
+++ b/src/storage/fxfs/src/object_store/fsck.rs
@@ -6,18 +6,27 @@
crate::{
lsm_tree::{
skip_list_layer::SkipListLayer,
- types::{Item, Layer, LayerIterator, MutableLayer},
+ types::{Item, ItemRef, Layer, LayerIterator, MutableLayer},
},
object_store::{
allocator::{self, AllocatorKey, AllocatorValue, CoalescingIterator, SimpleAllocator},
+ constants::SUPER_BLOCK_OBJECT_ID,
filesystem::{Filesystem, FxFilesystem},
- record::ExtentValue,
+ graveyard::Graveyard,
+ record::{
+ AttributeKey, ExtentValue, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue,
+ },
transaction::LockKey,
+ ObjectStore,
},
},
- anyhow::{bail, Error},
+ anyhow::{anyhow, bail, Error},
futures::try_join,
- std::ops::Bound,
+ std::{
+ collections::hash_map::{Entry, HashMap},
+ ops::Bound,
+ sync::Arc,
+ },
};
// TODO(csuter): for now, this just checks allocations. We should think about adding checks for:
@@ -41,43 +50,51 @@
let _guard = filesystem.write_lock(&[LockKey::Filesystem]).await;
let object_manager = filesystem.object_manager();
- let skip_list = SkipListLayer::new(2048); // TODO(csuter): fix magic number
+ let graveyard = object_manager.graveyard().ok_or(anyhow!("Missing graveyard!"))?;
+ let fsck = Fsck::new();
+ let super_block = filesystem.super_block();
+
+ // Scan the root parent object store.
+ let mut root_objects = vec![super_block.root_store_object_id, super_block.journal_object_id];
+ root_objects.append(&mut object_manager.root_store().parent_objects());
+ fsck.scan_store(&object_manager.root_parent_store(), &root_objects, &graveyard).await?;
+
+ let root_store = &object_manager.root_store();
+ let mut root_store_root_objects = Vec::new();
+ root_store_root_objects
+ .append(&mut vec![super_block.allocator_object_id, SUPER_BLOCK_OBJECT_ID]);
+ root_store_root_objects.append(&mut root_store.root_objects());
// TODO(csuter): We could maybe iterate over stores concurrently.
for store_id in object_manager.store_object_ids() {
+ if store_id == super_block.root_parent_store_object_id
+ || store_id == super_block.root_store_object_id
+ {
+ continue;
+ }
let store = object_manager.store(store_id).expect("store disappeared!");
store.ensure_open().await?;
- let layer_set = store.tree.layer_set();
- let mut merger = layer_set.merger();
- let mut iter = merger.seek(Bound::Unbounded).await?;
- while let Some(item_ref) = iter.get() {
- match item_ref.into() {
- Some((_, _, extent_key, ExtentValue { device_offset: Some(device_offset) })) => {
- let item = Item::new(
- AllocatorKey {
- device_range: *device_offset
- ..*device_offset + extent_key.range.end - extent_key.range.start,
- },
- AllocatorValue { delta: 1 },
- );
- let lower_bound = item.key.lower_bound_for_merge_into();
- skip_list.merge_into(item, &lower_bound, allocator::merge::merge).await;
- }
- _ => {}
- }
- iter.advance().await?;
- }
+ fsck.scan_store(&store, &store.root_objects(), &graveyard).await?;
+ let mut parent_objects = store.parent_objects();
+ root_store_root_objects.append(&mut parent_objects);
}
- // Now compare our regenerated allocation map with what we actually have.
+
// TODO(csuter): It's a bit crude how details of SimpleAllocator are leaking here. Is there
// a better way?
let allocator = filesystem.allocator().as_any().downcast::<SimpleAllocator>().unwrap();
allocator.ensure_open().await?;
+ root_store_root_objects.append(&mut allocator.parent_objects());
+
+ // Finally scan the root object store.
+ fsck.scan_store(root_store, &root_store_root_objects, &graveyard).await?;
+
+ // Now compare our regenerated allocation map with what we actually have.
let layer_set = allocator.tree().layer_set();
let mut merger = layer_set.merger();
let iter = merger.seek(Bound::Unbounded).await?;
let mut actual = CoalescingIterator::new(Box::new(iter)).await?;
- let mut expected = CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await?).await?;
+ let mut expected =
+ CoalescingIterator::new(fsck.allocations.seek(Bound::Unbounded).await?).await?;
while let Some(actual_item) = actual.get() {
match expected.get() {
None => bail!("found extra allocation {:?}", actual_item),
@@ -95,6 +112,108 @@
Ok(())
}
+struct Fsck {
+ allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
+}
+
+impl Fsck {
+ fn new() -> Self {
+ Fsck { allocations: SkipListLayer::new(2048) } // TODO(csuter): fix magic number
+ }
+
+ pub async fn scan_store(
+ &self,
+ store: &ObjectStore,
+ root_objects: &[u64],
+ graveyard: &Graveyard,
+ ) -> Result<(), Error> {
+ let mut object_refs: HashMap<u64, (u64, u64)> = HashMap::new();
+
+ // Add all the graveyard references.
+ let layer_set = graveyard.store().tree().layer_set();
+ let mut merger = layer_set.merger();
+ let mut iter = graveyard.iter_from(&mut merger, (store.store_object_id(), 0)).await?;
+ while let Some((store_object_id, object_id)) = iter.get() {
+ if store_object_id != store.store_object_id() {
+ break;
+ }
+ object_refs.insert(object_id, (0, 1));
+ iter.advance().await?;
+ }
+
+ let layer_set = store.tree.layer_set();
+ let mut merger = layer_set.merger();
+ let mut iter = merger.seek(Bound::Unbounded).await?;
+ for root_object in root_objects {
+ object_refs.insert(*root_object, (0, 1));
+ }
+ while let Some(ItemRef { key, value }) = iter.get() {
+ match (key, value) {
+ (
+ ObjectKey { object_id, data: ObjectKeyData::Object },
+ ObjectValue::Object { kind },
+ ) => {
+ let refs = match kind {
+ ObjectKind::File { refs, .. } => *refs,
+ ObjectKind::Directory | ObjectKind::Graveyard => 1,
+ };
+ match object_refs.entry(*object_id) {
+ Entry::Occupied(mut occupied) => {
+ occupied.get_mut().0 += refs;
+ }
+ Entry::Vacant(vacant) => {
+ vacant.insert((refs, 0));
+ }
+ }
+ }
+ (
+ ObjectKey {
+ data: ObjectKeyData::Attribute(_, AttributeKey::Extent(extent_key)),
+ ..
+ },
+ ObjectValue::Extent(ExtentValue { device_offset: Some(device_offset) }),
+ ) => {
+ let item = Item::new(
+ AllocatorKey {
+ device_range: *device_offset
+ ..*device_offset + extent_key.range.end - extent_key.range.start,
+ },
+ AllocatorValue { delta: 1 },
+ );
+ let lower_bound = item.key.lower_bound_for_merge_into();
+ self.allocations.merge_into(item, &lower_bound, allocator::merge::merge).await;
+ }
+ (
+ ObjectKey { data: ObjectKeyData::Child { .. }, .. },
+ ObjectValue::Child { object_id, .. },
+ ) => match object_refs.entry(*object_id) {
+ Entry::Occupied(mut occupied) => {
+ occupied.get_mut().1 += 1;
+ }
+ Entry::Vacant(vacant) => {
+ vacant.insert((0, 1));
+ }
+ },
+ _ => {}
+ }
+ iter.advance().await?;
+ }
+ // Check object reference counts.
+ for (object_id, (count, references)) in object_refs {
+ if count != references {
+ bail!(
+ "object {}.{} reference count mismatch: actual: {}, expected: {}",
+ store.store_object_id(),
+ object_id,
+ count,
+ references
+ );
+ }
+ }
+ Ok(())
+ }
+}
+
#[cfg(test)]
mod tests {
use {
@@ -102,12 +221,16 @@
crate::{
device::DeviceHolder,
lsm_tree::types::{Item, ItemRef, LayerIterator},
+ object_handle::ObjectHandle,
object_store::{
allocator::{
Allocator, AllocatorKey, AllocatorValue, CoalescingIterator, SimpleAllocator,
},
+ directory::Directory,
filesystem::{Filesystem, FxFilesystem},
+ record::ObjectDescriptor,
transaction::TransactionHandler,
+ HandleOptions, ObjectStore,
},
testing::fake_device::FakeDevice,
},
@@ -190,4 +313,66 @@
let error = format!("{}", fsck(&fs).await.expect_err("fsck succeeded"));
assert!(error.contains("missing allocation"), "{}", error);
}
+
+ #[fasync::run_singlethreaded(test)]
+ async fn test_too_many_object_refs() {
+ let fs = FxFilesystem::new_empty(DeviceHolder::new(FakeDevice::new(
+ 2048,
+ TEST_DEVICE_BLOCK_SIZE,
+ )))
+ .await
+ .expect("new_empty failed");
+
+ let root_store = fs.root_store();
+ let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
+ .await
+ .expect("open failed");
+
+ let mut transaction =
+ fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
+ let child_file = root_directory
+ .create_child_file(&mut transaction, "child_file")
+ .await
+ .expect("create_child_file failed");
+ let child_dir = root_directory
+ .create_child_dir(&mut transaction, "child_dir")
+ .await
+ .expect("create_child_directory failed");
+
+ // Add an extra reference to the child file.
+ child_dir.insert_child(
+ &mut transaction,
+ "test",
+ child_file.object_id(),
+ ObjectDescriptor::File,
+ );
+ transaction.commit().await;
+
+ let error = format!("{}", fsck(&fs).await.expect_err("fsck succeeded"));
+ assert!(error.contains("reference count mismatch"), "{}", error);
+ }
+
+ #[fasync::run_singlethreaded(test)]
+ async fn test_too_few_object_refs() {
+ let fs = FxFilesystem::new_empty(DeviceHolder::new(FakeDevice::new(
+ 2048,
+ TEST_DEVICE_BLOCK_SIZE,
+ )))
+ .await
+ .expect("new_empty failed");
+
+ let root_store = fs.root_store();
+
+ // Create an object but no directory entry referencing that object, so it will end up with a
+ // reference count of one, but zero references.
+ let mut transaction =
+ fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
+ ObjectStore::create_object(&root_store, &mut transaction, HandleOptions::default())
+ .await
+ .expect("create_object failed");
+ transaction.commit().await;
+
+ let error = format!("{}", fsck(&fs).await.expect_err("fsck succeeded"));
+ assert!(error.contains("reference count mismatch"), "{}", error);
+ }
}
diff --git a/src/storage/fxfs/src/object_store/graveyard.rs b/src/storage/fxfs/src/object_store/graveyard.rs
new file mode 100644
index 0000000..1db0c25
--- /dev/null
+++ b/src/storage/fxfs/src/object_store/graveyard.rs
@@ -0,0 +1,220 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+ crate::{
+ errors::FxfsError,
+ lsm_tree::{
+ merge::{Merger, MergerIterator},
+ types::{ItemRef, LayerIterator},
+ },
+ object_store::{
+ record::{ObjectItem, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue},
+ transaction::{Mutation, Transaction},
+ ObjectStore,
+ },
+ },
+ anyhow::{bail, Error},
+ std::{ops::Bound, sync::Arc},
+};
+
+/// A graveyard exists as a place to park objects that should be deleted when they are no longer in
+/// use. How objects enter and leave the graveyard is up to the caller to decide. The intention is
+/// that at mount time, any objects in the graveyard will get removed.
+pub struct Graveyard {
+ store: Arc<ObjectStore>,
+ object_id: u64,
+}
+
+impl Graveyard {
+ pub fn store(&self) -> &Arc<ObjectStore> {
+ &self.store
+ }
+
+ pub fn object_id(&self) -> u64 {
+ self.object_id
+ }
+
+ /// Creates a graveyard object in `store`.
+ pub async fn create(
+ transaction: &mut Transaction<'_>,
+ store: &Arc<ObjectStore>,
+ ) -> Result<Graveyard, Error> {
+ store.ensure_open().await?;
+ let object_id = store.get_next_object_id();
+ transaction.add(
+ store.store_object_id,
+ Mutation::insert_object(
+ ObjectKey::object(object_id),
+ ObjectValue::Object { kind: ObjectKind::Graveyard },
+ ),
+ );
+ Ok(Graveyard { store: store.clone(), object_id })
+ }
+
+ /// Opens a graveyard object in `store`.
+ pub async fn open(store: &Arc<ObjectStore>, object_id: u64) -> Result<Graveyard, Error> {
+ store.ensure_open().await?;
+ if let ObjectItem { value: ObjectValue::Object { kind: ObjectKind::Graveyard }, .. } =
+ store.tree.find(&ObjectKey::object(object_id)).await?.ok_or(FxfsError::NotFound)?
+ {
+ Ok(Graveyard { store: store.clone(), object_id })
+ } else {
+ bail!("Found an object, but it's not a graveyard");
+ }
+ }
+
+ /// Adds an object to the graveyard.
+ pub fn add(&self, transaction: &mut Transaction<'_>, store_object_id: u64, object_id: u64) {
+ transaction.add(
+ self.store.store_object_id(),
+ Mutation::replace_or_insert_object(
+ ObjectKey::graveyard_entry(self.object_id, store_object_id, object_id),
+ ObjectValue::Some,
+ ),
+ );
+ }
+
+ /// Removes an object from the graveyard.
+ pub fn remove(&self, transaction: &mut Transaction<'_>, store_object_id: u64, object_id: u64) {
+ transaction.add(
+ self.store.store_object_id(),
+ Mutation::replace_or_insert_object(
+ ObjectKey::graveyard_entry(self.object_id, store_object_id, object_id),
+ ObjectValue::None,
+ ),
+ );
+ }
+
+ /// Returns an iterator over graveyard entries, skipping deleted ones. Example
+ /// usage:
+ ///
+ /// let layer_set = graveyard.store().tree().layer_set();
+ /// let mut merger = layer_set.merger();
+ /// let mut iter = graveyard.iter(&mut merger).await?;
+ ///
+ pub async fn iter<'a, 'b>(
+ &self,
+ merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
+ ) -> Result<GraveyardIterator<'a, 'b>, Error> {
+ self.iter_from(merger, (0, 0)).await
+ }
+
+ /// Like "iter", but seeks from a specific (store-id, object-id) tuple. Example usage:
+ ///
+ /// let layer_set = graveyard.store().tree().layer_set();
+ /// let mut merger = layer_set.merger();
+ /// let mut iter = graveyard.iter_from(&mut merger, (2, 3)).await?;
+ ///
+ pub async fn iter_from<'a, 'b>(
+ &self,
+ merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
+ from: (u64, u64),
+ ) -> Result<GraveyardIterator<'a, 'b>, Error> {
+ let mut iter = merger
+ .seek(Bound::Included(&ObjectKey::graveyard_entry(self.object_id, from.0, from.1)))
+ .await?;
+ // Skip deleted entries.
+ // TODO(csuter): Remove this once we've developed a filtering iterator.
+ loop {
+ match iter.get() {
+ Some(ItemRef { key: ObjectKey { object_id, .. }, value: ObjectValue::None })
+ if *object_id == self.object_id => {}
+ _ => break,
+ }
+ iter.advance().await?;
+ }
+ Ok(GraveyardIterator { object_id: self.object_id, iter })
+ }
+}
+
+pub struct GraveyardIterator<'a, 'b> {
+ object_id: u64,
+ iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
+}
+
+impl GraveyardIterator<'_, '_> {
+ pub fn get(&self) -> Option<(u64, u64)> {
+ match self.iter.get() {
+ Some(ItemRef {
+ key:
+ ObjectKey {
+ object_id: oid,
+ data: ObjectKeyData::GraveyardEntry { store_object_id, object_id },
+ },
+ ..
+ }) if *oid == self.object_id => Some((*store_object_id, *object_id)),
+ _ => None,
+ }
+ }
+
+ pub async fn advance(&mut self) -> Result<(), Error> {
+ loop {
+ self.iter.advance().await?;
+ // Skip deleted entries.
+ match self.iter.get() {
+ Some(ItemRef { key: ObjectKey { object_id, .. }, value: ObjectValue::None })
+ if *object_id == self.object_id => {}
+ _ => return Ok(()),
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use {
+ super::Graveyard,
+ crate::{
+ device::DeviceHolder,
+ object_store::{filesystem::FxFilesystem, transaction::TransactionHandler},
+ testing::fake_device::FakeDevice,
+ },
+ fuchsia_async as fasync,
+ };
+
+ const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
+
+ #[fasync::run_singlethreaded(test)]
+ async fn test_graveyard() {
+ let device = DeviceHolder::new(FakeDevice::new(2048, TEST_DEVICE_BLOCK_SIZE));
+ let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
+ let root_store = fs.root_store();
+
+ // Create and add two objects to the graveyard.
+ let mut transaction =
+ fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
+ let graveyard =
+ Graveyard::create(&mut transaction, &root_store).await.expect("create failed");
+ graveyard.add(&mut transaction, 2, 3);
+ graveyard.add(&mut transaction, 3, 4);
+ transaction.commit().await;
+
+ // Reopen the graveyard and check that we see the objects we added.
+ let graveyard =
+ Graveyard::open(&root_store, graveyard.object_id()).await.expect("open failed");
+ let layer_set = graveyard.store().tree().layer_set();
+ let mut merger = layer_set.merger();
+ let mut iter = graveyard.iter(&mut merger).await.expect("iter failed");
+ assert_eq!(iter.get().expect("missing entry"), (2, 3));
+ iter.advance().await.expect("advance failed");
+ assert_eq!(iter.get().expect("missing entry"), (3, 4));
+ iter.advance().await.expect("advance failed");
+ assert_eq!(iter.get(), None);
+
+ // Remove one of the objects.
+ let mut transaction =
+ fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
+ graveyard.remove(&mut transaction, 3, 4);
+ transaction.commit().await;
+
+ // Check that the graveyard has been updated as expected.
+ let layer_set = graveyard.store().tree().layer_set();
+ let mut merger = layer_set.merger();
+ let mut iter = graveyard.iter_from(&mut merger, (2, 3)).await.expect("iter failed");
+ assert_eq!(iter.get().expect("missing entry"), (2, 3));
+ iter.advance().await.expect("advance failed");
+ assert_eq!(iter.get(), None);
+ }
+}
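
Because graveyard entries live in an LSM tree, remove writes an
ObjectValue::None tombstone rather than erasing the key, and iter/iter_from
must skip those tombstones until a filtering iterator exists. A standalone
model using a BTreeMap, which preserves the (store-id, object-id) ordering the
real iterator relies on (names are illustrative):

    use std::collections::BTreeMap;

    // Toy graveyard: key is (store_object_id, object_id); the bool stands in
    // for ObjectValue::Some (true) vs. the ObjectValue::None tombstone (false).
    struct Graveyard {
        entries: BTreeMap<(u64, u64), bool>,
    }

    impl Graveyard {
        fn add(&mut self, store_object_id: u64, object_id: u64) {
            self.entries.insert((store_object_id, object_id), true);
        }

        // As in the patch, removal overwrites with a tombstone, not a delete.
        fn remove(&mut self, store_object_id: u64, object_id: u64) {
            self.entries.insert((store_object_id, object_id), false);
        }

        // Mirrors iter_from: seek to `from`, yielding only live entries.
        fn iter_from(&self, from: (u64, u64)) -> impl Iterator<Item = (u64, u64)> + '_ {
            self.entries.range(from..).filter(|(_, live)| **live).map(|(k, _)| *k)
        }
    }

    fn main() {
        let mut g = Graveyard { entries: BTreeMap::new() };
        g.add(2, 3);
        g.add(3, 4);
        g.remove(3, 4);
        let live: Vec<_> = g.iter_from((0, 0)).collect();
        assert_eq!(live, vec![(2, 3)]); // the tombstoned (3, 4) is skipped
    }
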
diff --git a/src/storage/fxfs/src/object_store/journal.rs b/src/storage/fxfs/src/object_store/journal.rs
index 8994c01..c5406e2 100644
--- a/src/storage/fxfs/src/object_store/journal.rs
+++ b/src/storage/fxfs/src/object_store/journal.rs
@@ -16,7 +16,7 @@
// same per-block checksum that is used for the journal file.
mod reader;
-mod super_block;
+pub mod super_block;
mod writer;
use {
@@ -28,6 +28,7 @@
constants::SUPER_BLOCK_OBJECT_ID,
directory::Directory,
filesystem::{Filesystem, Mutations, ObjectFlush, ObjectManager, SyncOptions},
+ graveyard::Graveyard,
journal::{
reader::{JournalReader, ReadResult},
super_block::SuperBlock,
@@ -48,7 +49,10 @@
std::{
clone::Clone,
iter::IntoIterator,
- sync::{Arc, Mutex},
+ sync::{
+ atomic::{self, AtomicBool},
+ Arc, Mutex,
+ },
vec::Vec,
},
};
@@ -137,6 +141,7 @@
objects: Arc<ObjectManager>,
writer: futures::lock::Mutex<JournalWriter<StoreObjectHandle<ObjectStore>>>,
inner: Mutex<Inner>,
+ trace: AtomicBool,
}
struct Inner {
@@ -160,9 +165,14 @@
super_block: SuperBlock::default(),
should_flush: false,
}),
+ trace: AtomicBool::new(false),
}
}
+ pub fn set_trace(&self, v: bool) {
+ self.trace.store(v, atomic::Ordering::Relaxed);
+ }
+
/// Reads a super-block and then replays journaled records.
pub async fn replay(&self, filesystem: Arc<dyn Filesystem>) -> Result<(), Error> {
let device = filesystem.device();
@@ -223,7 +233,9 @@
}
JournalRecord::Commit => {
if let Some(checkpoint) = journal_file_checkpoint.take() {
- log::debug!("REPLAY {}", checkpoint.file_offset);
+ if self.trace.load(atomic::Ordering::Relaxed) {
+ log::info!("REPLAY {}", checkpoint.file_offset);
+ }
for (object_id, mutation) in mutations {
// Snoop the mutations for any that might apply to the journal
// file to ensure that we accurately track changes in size.
@@ -283,6 +295,18 @@
}
writer.seek_to_checkpoint(checkpoint);
}
+
+ let root_store = self.objects.root_store();
+ root_store.ensure_open().await?;
+ self.objects.register_graveyard(Arc::new(
+ Graveyard::open(&self.objects.root_store(), root_store.graveyard_directory_object_id())
+ .await
+ .context(format!(
+ "failed to open graveyard (object_id: {})",
+ root_store.graveyard_directory_object_id()
+ ))?,
+ ));
+
log::info!("replay done");
Ok(())
}
@@ -345,9 +369,9 @@
.context("preallocate journal")?;
// the root store's graveyard and root directory...
- let graveyard = Arc::new(Directory::create(&mut transaction, &root_store).await?);
+ let graveyard = Arc::new(Graveyard::create(&mut transaction, &root_store).await?);
root_store.set_graveyard_directory_object_id(&mut transaction, graveyard.object_id());
- self.objects.register_graveyard(root_store.store_object_id(), graveyard);
+ self.objects.register_graveyard(graveyard);
let root_directory = Directory::create(&mut transaction, &root_store)
.await
@@ -442,7 +466,9 @@
mutations: impl IntoIterator<Item = TxnMutation<'_>>,
journal_file_checkpoint: JournalCheckpoint,
) {
- log::debug!("BEGIN TXN {}", journal_file_checkpoint.file_offset);
+ if self.trace.load(atomic::Ordering::Relaxed) {
+ log::info!("BEGIN TXN {}", journal_file_checkpoint.file_offset);
+ }
for TxnMutation { object_id, mutation, associated_object } in mutations {
self.apply_mutation(
object_id,
@@ -453,7 +479,9 @@
)
.await;
}
- log::debug!("END TXN");
+ if self.trace.load(atomic::Ordering::Relaxed) {
+ log::info!("END TXN");
+ }
}
// Determines whether a mutation at the given checkpoint should be applied. During replay, not
@@ -479,12 +507,16 @@
object: Option<&dyn AssociatedObject>,
) {
if !filter || self.should_apply(object_id, journal_file_checkpoint) {
- log::debug!("applying mutation: {}: {:?}, filter: {}", object_id, mutation, filter);
+ if self.trace.load(atomic::Ordering::Relaxed) {
+ log::info!("applying mutation: {}: {:?}, filter: {}", object_id, mutation, filter);
+ }
self.objects
.apply_mutation(object_id, mutation, filter, journal_file_checkpoint, object)
.await;
} else {
- log::debug!("ignoring mutation: {}, {:?}", object_id, mutation);
+ if self.trace.load(atomic::Ordering::Relaxed) {
+ log::info!("ignoring mutation: {}, {:?}", object_id, mutation);
+ }
}
}
@@ -574,6 +606,8 @@
Ok(())
}
+ /// Flushes any buffered journal data to the device. Note that this does not flush the
+ /// device itself, so it still does not guarantee that data has been persisted to lower layers.
pub async fn sync(&self, _options: SyncOptions) -> Result<(), Error> {
// TODO(csuter): There needs to be some kind of locking here.
let needs_super_block = self.inner.lock().unwrap().needs_super_block;
@@ -587,6 +621,11 @@
Ok(())
}
+ /// Returns a copy of the super-block.
+ pub fn super_block(&self) -> SuperBlock {
+ self.inner.lock().unwrap().super_block.clone()
+ }
+
/// Returns whether or not a flush should be performed. This is only updated after committing a
/// transaction.
pub fn should_flush(&self) -> bool {
@@ -619,6 +658,7 @@
device::DeviceHolder,
object_handle::{ObjectHandle, ObjectHandleExt},
object_store::{
+ directory::Directory,
filesystem::{FxFilesystem, SyncOptions},
fsck::fsck,
transaction::TransactionHandler,
@@ -638,16 +678,20 @@
let device = DeviceHolder::new(FakeDevice::new(2048, TEST_DEVICE_BLOCK_SIZE));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
+
let object_id = {
+ let root_store = fs.root_store();
+ let root_directory =
+ Directory::open(&root_store, root_store.root_directory_object_id())
+ .await
+ .expect("open failed");
let mut transaction =
fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
- let handle = ObjectStore::create_object(
- &fs.root_store(),
- &mut transaction,
- HandleOptions::default(),
- )
- .await
- .expect("create_object failed");
+ let handle = root_directory
+ .create_child_file(&mut transaction, "test")
+ .await
+ .expect("create_child_file failed");
+
transaction.commit().await;
let mut buf = handle.allocate_buffer(TEST_DATA.len());
buf.as_mut_slice().copy_from_slice(TEST_DATA);
@@ -663,7 +707,7 @@
let handle =
ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default())
.await
- .expect("create_object failed");
+ .expect("open_object failed");
let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize);
assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
@@ -675,21 +719,23 @@
async fn test_reset() {
const TEST_DATA: &[u8] = b"hello";
- let device = DeviceHolder::new(FakeDevice::new(4096, TEST_DEVICE_BLOCK_SIZE));
+ let device = DeviceHolder::new(FakeDevice::new(6144, TEST_DEVICE_BLOCK_SIZE));
let mut object_ids = Vec::new();
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
{
+ let root_store = fs.root_store();
+ let root_directory =
+ Directory::open(&root_store, root_store.root_directory_object_id())
+ .await
+ .expect("open failed");
let mut transaction =
fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
- let handle = ObjectStore::create_object(
- &fs.root_store(),
- &mut transaction,
- HandleOptions::default(),
- )
- .await
- .expect("create_object failed");
+ let handle = root_directory
+ .create_child_file(&mut transaction, "test")
+ .await
+ .expect("create_child_file failed");
transaction.commit().await;
let mut buf = handle.allocate_buffer(TEST_DATA.len());
buf.as_mut_slice().copy_from_slice(TEST_DATA);
@@ -699,16 +745,13 @@
// Create a lot of objects but don't sync at the end. This should leave the filesystem
// with a half finished transaction that cannot be replayed.
- for _ in 0..1000 {
+ for i in 0..1000 {
let mut transaction =
fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
- let handle = ObjectStore::create_object(
- &fs.root_store(),
- &mut transaction,
- HandleOptions::default(),
- )
- .await
- .expect("create_object failed");
+ let handle = root_directory
+ .create_child_file(&mut transaction, &format!("{}", i))
+ .await
+ .expect("create_child_file failed");
transaction.commit().await;
let mut buf = handle.allocate_buffer(TEST_DATA.len());
buf.as_mut_slice().copy_from_slice(TEST_DATA);
@@ -720,12 +763,13 @@
let fs = FxFilesystem::open(fs.take_device().await).await.expect("open failed");
fsck(&fs).await.expect("fsck failed");
{
+ let root_store = fs.root_store();
// Check the first two objects which should exist.
for &object_id in &object_ids[0..1] {
let handle =
- ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default())
+ ObjectStore::open_object(&root_store, object_id, HandleOptions::default())
.await
- .expect("create_object failed");
+ .expect("open_object failed");
let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize);
assert_eq!(
handle.read(0, buf.as_mut()).await.expect("read failed"),
@@ -735,15 +779,16 @@
}
// Write one more object and sync.
+ let root_directory =
+ Directory::open(&root_store, root_store.root_directory_object_id())
+ .await
+ .expect("open failed");
let mut transaction =
fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
- let handle = ObjectStore::create_object(
- &fs.root_store(),
- &mut transaction,
- HandleOptions::default(),
- )
- .await
- .expect("create_object failed");
+ let handle = root_directory
+ .create_child_file(&mut transaction, "test2")
+ .await
+ .expect("create_child_file failed");
transaction.commit().await;
let mut buf = handle.allocate_buffer(TEST_DATA.len());
buf.as_mut_slice().copy_from_slice(TEST_DATA);
@@ -752,14 +797,18 @@
object_ids.push(handle.object_id());
}
- let fs = FxFilesystem::open(fs.take_device().await).await.expect("open failed");
+ let fs = FxFilesystem::open_with_trace(fs.take_device().await, false)
+ .await
+ .expect("open failed");
{
+ fsck(&fs).await.expect("fsck failed");
+
// Check the first two and the last objects.
for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
let handle =
ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default())
.await
- .expect("create_object failed");
+ .expect(&format!("open_object failed (object_id: {})", object_id));
let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize);
assert_eq!(
handle.read(0, buf.as_mut()).await.expect("read failed"),
@@ -767,8 +816,6 @@
);
assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
}
-
- fsck(&fs).await.expect("fsck failed");
}
}
}
diff --git a/src/storage/fxfs/src/object_store/journal/super_block.rs b/src/storage/fxfs/src/object_store/journal/super_block.rs
index a74c07f..6fd1fe3 100644
--- a/src/storage/fxfs/src/object_store/journal/super_block.rs
+++ b/src/storage/fxfs/src/object_store/journal/super_block.rs
@@ -40,7 +40,7 @@
// A super-block consists of this header followed by records that are to be replayed into the root
// parent object store.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
-pub(super) struct SuperBlock {
+pub struct SuperBlock {
// TODO(csuter): version stuff
// TODO(csuter): UUID
diff --git a/src/storage/fxfs/src/object_store/record.rs b/src/storage/fxfs/src/object_store/record.rs
index db938c8..ceb9232a 100644
--- a/src/storage/fxfs/src/object_store/record.rs
+++ b/src/storage/fxfs/src/object_store/record.rs
@@ -36,6 +36,8 @@
Attribute(u64, AttributeKey),
/// A child of a directory.
Child { name: String }, // TODO(jfsulliv): Should this be a string or array of bytes?
+ /// A graveyard entry.
+ GraveyardEntry { store_object_id: u64, object_id: u64 },
}
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
@@ -169,6 +171,14 @@
Self { object_id, data: ObjectKeyData::Child { name: name.to_owned() } }
}
+ /// Creates a graveyard entry.
+ pub fn graveyard_entry(graveyard_object_id: u64, store_object_id: u64, object_id: u64) -> Self {
+ Self {
+ object_id: graveyard_object_id,
+ data: ObjectKeyData::GraveyardEntry { store_object_id, object_id },
+ }
+ }
+
pub fn tombstone(object_id: u64) -> Self {
Self { object_id, data: ObjectKeyData::Tombstone }
}
@@ -274,6 +284,7 @@
allocated_size: u64,
},
Directory,
+ Graveyard,
}
/// ObjectValue is the value of an item in the object store.
@@ -283,6 +294,9 @@
pub enum ObjectValue {
/// Some keys (e.g. tombstones) have no value.
None,
+ /// Some keys have no meaningful value but still need to distinguish between a present
+ /// value and no value (None), i.e. their value is really a boolean: None => false, Some => true.
+ Some,
/// The value for an ObjectKey::Object record.
Object { kind: ObjectKind },
/// An attribute associated with a file object. |size| is the size of the attribute in bytes.
diff --git a/src/storage/fxfs/src/server/directory.rs b/src/storage/fxfs/src/server/directory.rs
index bf2362e..47dd71e 100644
--- a/src/storage/fxfs/src/server/directory.rs
+++ b/src/storage/fxfs/src/server/directory.rs
@@ -291,17 +291,11 @@
{
let store = self.store();
if let ObjectDescriptor::File = descriptor {
- store
- .filesystem()
- .object_manager()
- .graveyard(store.store_object_id())
- .unwrap()
- .insert_child(
- transaction,
- &format!("{}", existing_oid),
- existing_oid,
- descriptor,
- );
+ store.filesystem().object_manager().graveyard().unwrap().add(
+ transaction,
+ self.store().store_object_id(),
+ existing_oid,
+ );
} else {
directory::remove(transaction, &store, existing_oid);
}
diff --git a/src/storage/fxfs/src/volume.rs b/src/storage/fxfs/src/volume.rs
index ca8b1e18..83b8bb5 100644
--- a/src/storage/fxfs/src/volume.rs
+++ b/src/storage/fxfs/src/volume.rs
@@ -7,7 +7,7 @@
errors::FxfsError,
object_store::{
directory::Directory,
- filesystem::{Filesystem, FxFilesystem},
+ filesystem::FxFilesystem,
transaction::{LockKey, TransactionHandler},
ObjectDescriptor, ObjectStore,
},
@@ -44,9 +44,6 @@
let mut transaction = self.filesystem.clone().new_transaction(&[]).await?;
store = root_store.create_child_store(&mut transaction).await?;
- let graveyard = Arc::new(Directory::create(&mut transaction, &store).await?);
- store.set_graveyard_directory_object_id(&mut transaction, graveyard.object_id());
-
let root_directory = Directory::create(&mut transaction, &store).await?;
store.set_root_directory_object_id(&mut transaction, root_directory.object_id());
@@ -56,7 +53,6 @@
store.store_object_id(),
);
transaction.commit().await;
- self.filesystem.object_manager().register_graveyard(store.store_object_id(), graveyard);
Ok(store)
}
@@ -71,17 +67,7 @@
Ok(if let Some(volume_store) = self.filesystem.store(object_id) {
volume_store
} else {
- let store = self.filesystem.root_store().open_store(object_id).await?;
-
- // Make sure the graveyard is registered.
- if self.filesystem.object_manager().graveyard(object_id).is_none() {
- self.filesystem.object_manager().register_graveyard(
- object_id,
- Arc::new(Directory::open(&store, store.graveyard_directory_object_id()).await?),
- );
- }
-
- store
+ self.filesystem.root_store().open_store(object_id).await?
})
}