// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// The journal is implemented as an ever-extending file that contains variable-length records
// describing mutations to be applied to various objects. The journal file consists of blocks, with
// a checksum at the end of each block, but otherwise it can be considered a continuous stream. The
// checksum is seeded with the checksum from the previous block. To free space in the journal,
// records are replaced with sparse extents when it is known they are no longer needed to mount. At
// mount time, the journal is replayed: the mutations are applied in memory. Eventually, a
// checksum failure will indicate that no more records exist to be replayed, at which point the
// mount can continue and the journal will be extended from that point with further mutations as
// required.
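//
// A rough sketch of the stream layout (BLOCK_SIZE and the record types are defined below; an
// EndBlock record followed by padding only appears when a block is closed out early, e.g. on
// sync):
//
//   block 0: [ records ...            | checksum0 ]   checksum0 = fletcher64(payload 0, seed)
//   block 1: [ records ...            | checksum1 ]   checksum1 = fletcher64(payload 1, checksum0)
//   block 2: [ records, EndBlock, pad | checksum2 ]   checksum2 = fletcher64(payload 2, checksum1)
//
// Each checksum occupies the last 8 bytes of its block, "payload N" is block N minus those 8
// bytes, and the seed for the first block comes from the journal checkpoint recorded in the
// super-block.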
//
// The super-block contains the starting offset and checksum for the journal file and sufficient
// information to locate the initial extents for the journal. The super-block is written using the
// same per-block checksum that is used for the journal file.
pub mod checksum_list;
mod handle;
mod reader;
pub mod super_block;
mod writer;
use {
crate::{
debug_assert_not_too_long,
errors::FxfsError,
lsm_tree::LSMTree,
object_handle::ObjectHandle,
object_store::{
allocator::{Allocator, Reservation, SimpleAllocator},
constants::{SUPER_BLOCK_A_OBJECT_ID, SUPER_BLOCK_B_OBJECT_ID},
directory::Directory,
filesystem::{Filesystem, Mutations, SyncOptions},
graveyard::Graveyard,
journal::{
checksum_list::ChecksumList,
handle::Handle,
reader::{JournalReader, ReadResult},
super_block::{SuperBlock, SuperBlockCopy},
writer::JournalWriter,
},
merge::{self},
object_manager::{ObjectFlush, ObjectManager},
record::{ExtentKey, ObjectKey, DEFAULT_DATA_ATTRIBUTE_ID},
round_down,
transaction::{
AssocObj, Mutation, ObjectStoreMutation, Options, Transaction, TxnMutation,
},
HandleOptions, ObjectStore, StoreObjectHandle,
},
trace_duration,
},
anyhow::{anyhow, bail, Context, Error},
async_utils::event::Event,
bincode::serialize_into,
byteorder::{ByteOrder, LittleEndian},
futures::{self},
once_cell::sync::OnceCell,
rand::Rng,
serde::{Deserialize, Serialize},
std::{
clone::Clone,
iter::IntoIterator,
ops::Bound,
sync::{
atomic::{self, AtomicBool},
Arc, Mutex,
},
vec::Vec,
},
};
// The journal file is written to in blocks of this size.
const BLOCK_SIZE: u64 = 8192;
// The journal file is extended by this amount when necessary.
const CHUNK_SIZE: u64 = 131_072;
// In the steady state, the amount of live journal data should fluctuate between approximately half
// of this number and this number. A new super-block is written every time about half of this
// amount has been written to the journal.
const RECLAIM_SIZE: u64 = 262_144;
// After replaying the journal, it's possible that the stream doesn't end cleanly, in which case the
// next journal block needs to indicate this. This is done by pretending the previous block's
// checksum is xored with this value, and using that as the seed for the next journal block.
const RESET_XOR: u64 = 0xffffffffffffffff;
// This size needs to be chosen carefully such that we cannot run out of journal space while we are
// compacting. New transactions that are unrelated to compaction are paused when the journal's live
// data hits RECLAIM_SIZE. During compaction, more is written to the journal before a new
// super-block is written. When we write a new super-block, we only free up whatever the previous
// super-block allows (because we only flush the device once), so this needs to be at least
// 2 * (RECLAIM_SIZE + buffer).
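// For example, with RECLAIM_SIZE = 262_144 this reserves 1 MiB, which satisfies
// 2 * (RECLAIM_SIZE + buffer) for any buffer up to RECLAIM_SIZE itself.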
const RESERVATION_SIZE: u64 = 4 * RECLAIM_SIZE;
type Checksum = u64;
// To keep track of offsets within a journal file, we need both the file offset and the checksum of
// the preceding block, since the checksum of the preceding block is an input to the checksum of
// every block.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct JournalCheckpoint {
pub file_offset: u64,
// Starting checksum for the block that contains file_offset, i.e. the checksum for the previous
// block.
pub checksum: Checksum,
}
impl JournalCheckpoint {
fn new(file_offset: u64, checksum: Checksum) -> JournalCheckpoint {
JournalCheckpoint { file_offset, checksum }
}
}
// All journal blocks are covered by a fletcher64 checksum, stored as the last 8 bytes of each
// block.
pub fn fletcher64(buf: &[u8], previous: u64) -> u64 {
assert!(buf.len() % 4 == 0);
let mut lo = previous as u32;
let mut hi = (previous >> 32) as u32;
for chunk in buf.chunks(4) {
lo = lo.wrapping_add(LittleEndian::read_u32(chunk));
hi = hi.wrapping_add(lo);
}
(hi as u64) << 32 | lo as u64
}
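// For illustration, a rough sketch (record names only, not exact wire bytes) of how a committed
// transaction touching two objects appears in the stream when serialized by write_mutations
// (below):
//
//     Mutation { object_id: <store A>, mutation: .. }
//     Mutation { object_id: <store B>, mutation: .. }
//     Commit
//
// and sync() closes out the current block with an EndBlock record followed by padding.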
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum JournalRecord {
// Indicates no more records in this block.
EndBlock,
// Mutation for a particular object. object_id here is for the collection, i.e. the store or
// allocator.
Mutation { object_id: u64, mutation: Mutation },
// Commits records in the transaction.
Commit,
// Discard all mutations with offsets greater than or equal to the given offset.
Discard(u64),
}
pub(super) fn journal_handle_options() -> HandleOptions {
HandleOptions { overwrite: true, skip_journal_checks: true, ..Default::default() }
}
fn clone_mutations(transaction: &Transaction<'_>) -> Vec<(u64, Mutation)> {
transaction
.mutations
.iter()
.map(|TxnMutation { object_id, mutation, .. }| (*object_id, mutation.clone()))
.collect()
}
/// The journal records a stream of mutations that are to be applied to other objects. At mount
/// time, these records can be replayed into memory. It provides a way to quickly persist changes
/// without having to make a large number of writes; they can be deferred to a later time (e.g.
/// when a sufficient number have been queued). It also provides support for transactions, i.e.
/// the ability to have mutations that are applied together atomically.
pub struct Journal {
objects: Arc<ObjectManager>,
writer: futures::lock::Mutex<JournalWriter>,
handle_and_reservation: OnceCell<(StoreObjectHandle<ObjectStore>, Reservation)>,
inner: Mutex<Inner>,
trace: AtomicBool,
}
struct Inner {
needs_super_block: bool,
// A cached copy of the journal file offset. It is held under a regular mutex rather than an
// async one so that computations can be made in non-async contexts, such as whether a compaction
// is required and whether new non-compaction-related transactions should be paused.
journal_file_offset: u64,
super_block: SuperBlock,
super_block_to_write: SuperBlockCopy,
// This event is used when we are waiting for a compaction to free up journal space.
reclaim_event: Option<Event>,
// The offset up to which the journal can now be zeroed, since the data before it is no longer
// needed.
zero_offset: Option<u64>,
}
impl Journal {
pub fn new(objects: Arc<ObjectManager>) -> Journal {
let starting_checksum = rand::thread_rng().gen();
Journal {
objects,
writer: futures::lock::Mutex::new(JournalWriter::new(
BLOCK_SIZE as usize,
starting_checksum,
)),
handle_and_reservation: OnceCell::new(),
inner: Mutex::new(Inner {
needs_super_block: true,
super_block: SuperBlock::default(),
super_block_to_write: SuperBlockCopy::A,
journal_file_offset: 0,
reclaim_event: None,
zero_offset: None,
}),
trace: AtomicBool::new(false),
}
}
pub fn set_trace(&self, v: bool) {
self.trace.store(v, atomic::Ordering::Relaxed);
}
pub fn journal_file_offset(&self) -> u64 {
self.inner.lock().unwrap().super_block.super_block_journal_file_offset
}
async fn load_superblock(
&self,
filesystem: Arc<dyn Filesystem>,
target_super_block: SuperBlockCopy,
) -> Result<(SuperBlock, SuperBlockCopy, Arc<ObjectStore>), Error> {
let device = filesystem.device();
let (super_block, mut reader) = SuperBlock::read(device, target_super_block)
.await
.context("Failed to read superblocks")?;
// Sanity-check the super-block before we attempt to use it. Further validation has to be
// done after we replay the items in |reader|, since they could involve super-block
// mutations.
if super_block.magic != super_block::SUPER_BLOCK_MAGIC {
return Err(anyhow!(FxfsError::Inconsistent))
.context(format!("Invalid magic, super_block: {:?}", super_block));
} else if super_block.major_version != super_block::SUPER_BLOCK_MAJOR_VERSION {
return Err(anyhow!(FxfsError::InvalidVersion)).context(format!(
"Invalid version (has {}, want {})",
super_block.major_version,
super_block::SUPER_BLOCK_MAJOR_VERSION
));
}
let root_parent = ObjectStore::new_empty(
None,
super_block.root_parent_store_object_id,
filesystem.clone(),
);
while let Some(item) = reader.next_item().await? {
let mutation = Mutation::insert_object(item.key, item.value);
root_parent.apply_mutation(mutation, None, 0, AssocObj::None).await;
}
// TODO(jfsulliv): Upgrade minor revision as needed.
Ok((super_block, target_super_block, root_parent))
}
/// Reads the latest super-block, and then replays journaled records.
pub async fn replay(&self, filesystem: Arc<dyn Filesystem>) -> Result<(), Error> {
trace_duration!("Journal::replay");
let (super_block, current_super_block, root_parent) = match futures::join!(
self.load_superblock(filesystem.clone(), SuperBlockCopy::A),
self.load_superblock(filesystem.clone(), SuperBlockCopy::B)
) {
(Err(e1), Err(e2)) => {
bail!("Failed to load both superblocks due to {:?}\nand\n{:?}", e1, e2)
}
(Ok(result), Err(_)) => result,
(Err(_), Ok(result)) => result,
(Ok(result1), Ok(result2)) => {
// Both super-blocks are valid, so use the one with the greatest generation.
if result2.0.generation > result1.0.generation {
result2
} else {
result1
}
}
};
log::info!(
"replaying journal, superblock: {:?} (copy {:?})",
super_block,
current_super_block
);
self.objects.set_root_parent_store(root_parent.clone());
let allocator = Arc::new(SimpleAllocator::new(
filesystem.clone(),
super_block.allocator_object_id,
false,
));
self.objects.set_allocator(allocator.clone());
{
let mut inner = self.inner.lock().unwrap();
inner.needs_super_block = false;
inner.super_block = super_block.clone();
inner.super_block_to_write = current_super_block.next();
}
let root_store = ObjectStore::new(
Some(root_parent.clone()),
super_block.root_store_object_id,
filesystem.clone(),
None,
LSMTree::new(merge::merge),
);
self.objects.set_root_store(root_store);
let device = filesystem.device();
let mut handle;
{
let root_parent_layer = root_parent.tree().mutable_layer();
let mut iter = root_parent_layer
.seek(Bound::Included(&ObjectKey::with_extent_key(
super_block.journal_object_id,
DEFAULT_DATA_ATTRIBUTE_ID,
ExtentKey::search_key_from_offset(round_down(
super_block.journal_checkpoint.file_offset,
BLOCK_SIZE,
)),
)))
.await?;
handle = Handle::new(super_block.journal_object_id, device.clone());
while let Some(item) = iter.get() {
if !handle.try_push_extent_from_object_item(item)? {
break;
}
iter.advance().await?;
}
}
let mut reader =
JournalReader::new(handle, self.block_size(), &super_block.journal_checkpoint);
let mut checksum_list = ChecksumList::new();
let mut mutations = Vec::new();
let mut current_transaction = None;
let mut end_block = false;
loop {
let current_checkpoint = reader.journal_file_checkpoint();
match reader.deserialize().await? {
ReadResult::Reset => {
if current_transaction.is_some() {
current_transaction = None;
mutations.pop();
}
}
ReadResult::Some(record) => {
end_block = false;
match record {
JournalRecord::EndBlock => {
reader.skip_to_end_of_block();
end_block = true;
}
JournalRecord::Mutation { object_id, mutation } => {
if current_transaction.is_none() {
mutations.push((current_checkpoint, Vec::new()));
current_transaction = mutations.last_mut();
}
current_transaction.as_mut().unwrap().1.push((object_id, mutation));
}
JournalRecord::Commit => {
if let Some((checkpoint, mutations)) = current_transaction.take() {
for (object_id, mutation) in mutations {
if !self.should_apply(*object_id, checkpoint) {
continue;
}
if !self
.objects
.validate_mutation(
checkpoint.file_offset,
*object_id,
&mutation,
&mut checksum_list,
)
.await?
{
if self.trace.load(atomic::Ordering::Relaxed) {
log::info!(
"Stopping replay at bad mutation: {:?}",
mutation
);
}
break;
}
// Snoop the mutations for any that might apply to the journal file, and pass
// them to the reader so that it can continue to read the journal file.
if *object_id == super_block.root_parent_store_object_id {
if let Mutation::ObjectStore(ObjectStoreMutation {
item,
..
}) = mutation
{
reader.handle().try_push_extent_from_object_item(
item.as_item_ref(),
)?;
}
}
}
}
}
JournalRecord::Discard(offset) => {
if let Some(transaction) = current_transaction.as_ref() {
if transaction.0.file_offset < offset {
// Odd, but OK.
continue;
}
}
current_transaction = None;
while let Some(transaction) = mutations.last() {
if transaction.0.file_offset < offset {
break;
}
mutations.pop();
}
}
}
}
// This is expected when we reach the end of the journal stream.
ReadResult::ChecksumMismatch => break,
}
}
// Validate the checksums.
let journal_offset = checksum_list
.verify(device.as_ref(), reader.journal_file_checkpoint().file_offset)
.await?;
// Apply the mutations.
let mut last_checkpoint = if mutations.is_empty() {
super_block.journal_checkpoint.clone()
} else {
'outer: loop {
for (checkpoint, mutations) in mutations {
if checkpoint.file_offset >= journal_offset {
break 'outer checkpoint;
}
if self.trace.load(atomic::Ordering::Relaxed) {
log::info!("REPLAY {}", checkpoint.file_offset);
}
for (object_id, mutation) in mutations {
self.apply_mutation(object_id, &checkpoint, mutation, None, AssocObj::None)
.await;
}
}
break reader.journal_file_checkpoint();
}
};
// Configure the journal writer so that we can continue.
{
if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
return Err(anyhow!(FxfsError::Inconsistent).context(format!(
"journal replay cut short; journal finishes at {}, but super-block was \
written at {}",
last_checkpoint.file_offset, super_block.super_block_journal_file_offset
)));
}
allocator.ensure_open().await?;
let handle = ObjectStore::open_object(
&root_parent,
super_block.journal_object_id,
journal_handle_options(),
)
.await?;
let current_journal_size = handle.get_allocated_size().await.unwrap();
let allocator_reservation = allocator
.reserve(RESERVATION_SIZE.saturating_sub(current_journal_size))
.ok_or(FxfsError::NoSpace)
.context("unable to reserve space for the journal")?;
let _ = self.handle_and_reservation.set((handle, allocator_reservation));
let mut writer = self.writer.lock().await;
// If the last entry wasn't an end_block, then we need to reset the stream.
if !end_block {
last_checkpoint.checksum ^= RESET_XOR;
}
let offset = last_checkpoint.file_offset;
self.inner.lock().unwrap().journal_file_offset = offset;
writer.seek_to_checkpoint(last_checkpoint);
if offset < reader.journal_file_checkpoint().file_offset {
// TODO(csuter): We need to make sure that this is tested. If a corruption test
// does not trigger this, we may have to add a targeted test.
serialize_into(&mut *writer, &JournalRecord::Discard(offset))?;
}
}
let root_store = self.objects.root_store();
root_store.ensure_open().await?;
self.objects.register_graveyard(
Graveyard::open(&self.objects.root_store(), root_store.graveyard_directory_object_id())
.await
.context(format!(
"failed to open graveyard (object_id: {})",
root_store.graveyard_directory_object_id()
))?,
);
log::info!("replay done");
Ok(())
}
/// Creates an empty filesystem with the minimum viable objects (including a root parent and
/// root store but no further child stores). Nothing is written to the device until sync is
/// called.
pub async fn init_empty(&self, filesystem: Arc<dyn Filesystem>) -> Result<(), Error> {
// The following constants are only used at format time. When mounting, the recorded values
// in the super-block should be used. The root parent store does not have a parent, but
// needs an object ID to be registered with ObjectManager, so it must not collide (i.e.
// share an object ID) with any objects in the root store that use the journal to track
// mutations.
const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;
let checkpoint = self.writer.lock().await.journal_file_checkpoint();
let root_parent =
ObjectStore::new_empty(None, INIT_ROOT_PARENT_STORE_OBJECT_ID, filesystem.clone());
self.objects.set_root_parent_store(root_parent.clone());
let allocator =
Arc::new(SimpleAllocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID, true));
self.objects.set_allocator(allocator.clone());
let journal_handle;
let super_block_a_handle;
let super_block_b_handle;
let root_store;
let mut transaction = filesystem
.new_transaction(&[], Options { skip_journal_checks: true, ..Default::default() })
.await?;
root_store = root_parent
.create_child_store_with_id(&mut transaction, INIT_ROOT_STORE_OBJECT_ID)
.await
.context("create root store")?;
self.objects.set_root_store(root_store.clone());
// Create the super-block objects...
super_block_a_handle = ObjectStore::create_object_with_id(
&root_store,
&mut transaction,
SUPER_BLOCK_A_OBJECT_ID,
HandleOptions { overwrite: true, ..Default::default() },
)
.await
.context("create super block")?;
super_block_a_handle
.extend(&mut transaction, SuperBlockCopy::A.first_extent())
.await
.context("extend super block")?;
super_block_b_handle = ObjectStore::create_object_with_id(
&root_store,
&mut transaction,
SUPER_BLOCK_B_OBJECT_ID,
HandleOptions { overwrite: true, ..Default::default() },
)
.await
.context("create super block")?;
super_block_b_handle
.extend(&mut transaction, SuperBlockCopy::B.first_extent())
.await
.context("extend super block")?;
// the journal object...
journal_handle =
ObjectStore::create_object(&root_parent, &mut transaction, journal_handle_options())
.await
.context("create journal")?;
journal_handle
.preallocate_range(&mut transaction, 0..self.chunk_size())
.await
.context("preallocate journal")?;
// the root store's graveyard and root directory...
let graveyard = Graveyard::create(&mut transaction, &root_store).await?;
root_store.set_graveyard_directory_object_id(&mut transaction, graveyard.object_id());
self.objects.register_graveyard(graveyard);
let root_directory = Directory::create(&mut transaction, &root_store)
.await
.context("create root directory")?;
root_store.set_root_directory_object_id(&mut transaction, root_directory.object_id());
transaction.commit().await;
// Cache the super-block.
{
let mut inner = self.inner.lock().unwrap();
inner.super_block = SuperBlock::new(
root_parent.store_object_id(),
root_store.store_object_id(),
allocator.object_id(),
journal_handle.object_id(),
checkpoint,
);
inner.super_block_to_write = SuperBlockCopy::A;
}
allocator.ensure_open().await?;
let allocator_reservation = allocator
.reserve(
RESERVATION_SIZE.saturating_sub(journal_handle.get_allocated_size().await.unwrap()),
)
.ok_or(FxfsError::NoSpace)
.context("unable to reserve space for the journal")?;
// Initialize the journal writer.
let _ = self.handle_and_reservation.set((journal_handle, allocator_reservation));
Ok(())
}
/// Commits a transaction.
pub async fn commit(&self, transaction: &mut Transaction<'_>) {
if transaction.is_empty() {
return;
}
let mut writer = self.writer.lock().await;
// TODO(csuter): handle the case where we are unable to extend the journal file.
self.maybe_extend_journal_file(&mut writer).await.unwrap();
// TODO(csuter): writing to the journal here can be asynchronous.
let journal_file_checkpoint = writer.journal_file_checkpoint();
writer.write_mutations(
transaction
.mutations
.iter()
.map(|TxnMutation { object_id, mutation, .. }| (*object_id, mutation.clone())),
);
if let Some((handle, _)) = self.handle_and_reservation.get() {
// TODO(jfsulliv): We should separate writing to the journal buffer from flushing the
// journal buffer (i.e. consider doing this in a background task). Flushing here is
// prone to deadlock, since |flush_buffer| itself creates a transaction which locks the
// journal handle.
// TODO(csuter): Add a test for the aforementioned deadlock condition.
if let Err(e) = writer.flush_buffer(handle).await {
// TODO(csuter): if writes to the journal start failing then we should prevent the
// creation of new transactions.
log::warn!("journal write failed: {}", e);
}
}
self.apply_mutations(transaction, journal_file_checkpoint).await;
self.inner.lock().unwrap().journal_file_offset =
writer.journal_file_checkpoint().file_offset;
}
async fn maybe_extend_journal_file(&self, writer: &mut JournalWriter) -> Result<(), Error> {
// TODO(csuter): this currently assumes that a transaction can fit in CHUNK_SIZE.
let file_offset = writer.journal_file_checkpoint().file_offset;
let (handle, reservation) = match self.handle_and_reservation.get() {
None => return Ok(()),
Some(x) => x,
};
let size = handle.get_size();
let needs_extending = file_offset + self.chunk_size() > size;
let zero_offset = self.inner.lock().unwrap().zero_offset.clone();
if !needs_extending && zero_offset.is_none() {
return Ok(());
}
let mut transaction = handle
.new_transaction_with_options(Options {
skip_journal_checks: true,
allocator_reservation: Some(reservation),
..Default::default()
})
.await?;
if needs_extending {
handle.preallocate_range(&mut transaction, size..size + self.chunk_size()).await?;
}
if let Some(zero_offset) = zero_offset {
handle.zero(&mut transaction, 0..zero_offset).await?;
}
let journal_file_checkpoint = writer.journal_file_checkpoint();
// We have to apply the mutations before writing them because we borrowed the writer for the
// transaction. First we clone the mutations without the associated objects since that's
// where the handle is borrowed.
let cloned_mutations = clone_mutations(&transaction);
self.apply_mutations(&mut transaction, journal_file_checkpoint).await;
std::mem::drop(transaction);
writer.write_mutations(cloned_mutations);
// We need to be sure that any journal records that arose from preallocation can fit within
// the old preallocated range. If this situation arose (it shouldn't, so it would be a bug if
// it did), then it could be fixed (e.g. by fsck) by forcing a sync of the root store.
if needs_extending {
assert!(writer.journal_file_checkpoint().file_offset <= size);
let file_offset = writer.journal_file_checkpoint().file_offset;
let (handle, _) = self.handle_and_reservation.get().unwrap();
assert!(file_offset + self.chunk_size() <= handle.get_size());
}
let mut inner = self.inner.lock().unwrap();
if inner.zero_offset == zero_offset {
inner.zero_offset = None;
}
Ok(())
}
async fn apply_mutations(
&self,
transaction: &mut Transaction<'_>,
journal_file_checkpoint: JournalCheckpoint,
) {
if self.trace.load(atomic::Ordering::Relaxed) {
log::info!("BEGIN TXN {}", journal_file_checkpoint.file_offset);
}
let mutations = std::mem::take(&mut transaction.mutations);
for TxnMutation { object_id, mutation, associated_object } in mutations {
self.apply_mutation(
object_id,
&journal_file_checkpoint,
mutation,
Some(transaction),
associated_object,
)
.await;
}
if self.trace.load(atomic::Ordering::Relaxed) {
log::info!("END TXN");
}
}
// Determines whether a mutation at the given checkpoint should be applied. During replay, not
// all records should be applied because the object store or allocator might already contain the
// mutation. After replay, that obviously isn't the case and we want to apply all mutations.
// Regardless, we want to keep track of the earliest mutation in the journal for a given object.
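// For example (a hedged illustration with arbitrary offsets): if the super-block records offset
// 131_072 for a given store (meaning the store already contains every mutation before that
// offset), then during replay a mutation for that store at checkpoint offset 65_536 is skipped,
// while one at offset 262_144 is applied.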
fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
let super_block = &self.inner.lock().unwrap().super_block;
let offset = super_block
.journal_file_offsets
.get(&object_id)
.cloned()
.unwrap_or(super_block.super_block_journal_file_offset);
journal_file_checkpoint.file_offset >= offset
}
async fn apply_mutation(
&self,
object_id: u64,
journal_file_checkpoint: &JournalCheckpoint,
mutation: Mutation,
transaction: Option<&Transaction<'_>>,
object: AssocObj<'_>,
) {
if transaction.is_some() || self.should_apply(object_id, journal_file_checkpoint) {
if self.trace.load(atomic::Ordering::Relaxed) {
log::info!("applying mutation: {}: {:?}", object_id, mutation);
}
self.objects
.apply_mutation(object_id, mutation, transaction, journal_file_checkpoint, object)
.await;
} else if self.trace.load(atomic::Ordering::Relaxed) {
    log::info!("ignoring mutation: {}, {:?}", object_id, mutation);
}
}
pub async fn write_super_block(&self) -> Result<(), Error> {
let root_parent_store = self.objects.root_parent_store();
// First we must lock the root parent store so that no new entries are written to it.
let sync = ObjectFlush::new(self.objects.clone(), root_parent_store.store_object_id());
let mutable_layer = root_parent_store.tree().mutable_layer();
let _guard = mutable_layer.lock_writes();
// After locking, we need to flush the journal because it might have records that a new
// super-block would refer to.
let journal_file_checkpoint = {
let mut writer = self.writer.lock().await;
// We are holding the appropriate locks now (no new transaction can be applied whilst we
// are holding the writer lock), so we can call ObjectFlush::begin for the root parent
// object store.
sync.begin();
serialize_into(&mut *writer, &JournalRecord::EndBlock)?;
writer.pad_to_block()?;
writer.flush_buffer(&self.handle_and_reservation.get().unwrap().0).await?;
writer.journal_file_checkpoint()
};
// We need to flush previous writes to the device since the new super-block we are writing
// relies on written data being observable.
root_parent_store.device().flush().await?;
// Tell the allocator that we flushed the device so that it can now start using space that
// was deallocated.
self.objects.allocator().did_flush_device(journal_file_checkpoint.file_offset).await;
let (mut new_super_block, super_block_to_write) = {
let inner = self.inner.lock().unwrap();
(inner.super_block.clone(), inner.super_block_to_write)
};
let old_super_block_offset = new_super_block.journal_checkpoint.file_offset;
let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();
// TODO(jfsulliv): Handle overflow.
new_super_block.generation = new_super_block.generation.checked_add(1).unwrap();
new_super_block.super_block_journal_file_offset = journal_file_checkpoint.file_offset;
new_super_block.journal_checkpoint = min_checkpoint.unwrap_or(journal_file_checkpoint);
new_super_block.journal_file_offsets = journal_file_offsets;
// TODO(csuter): the super-block needs space reserved for it.
new_super_block
.write(
&root_parent_store,
ObjectStore::open_object(
&self.objects.root_store(),
super_block_to_write.object_id(),
journal_handle_options(),
)
.await?,
)
.await?;
{
let mut inner = self.inner.lock().unwrap();
inner.super_block = new_super_block;
inner.super_block_to_write = super_block_to_write.next();
inner.needs_super_block = false;
inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
if let Some(event) = inner.reclaim_event.take() {
event.signal();
}
}
sync.commit();
Ok(())
}
/// Flushes any buffered journal data to the device. Note that this does not flush the device
/// itself, so it still does not guarantee that data will have been persisted to lower layers.
pub async fn sync(&self, _options: SyncOptions) -> Result<(), Error> {
// TODO(csuter): There needs to be some kind of locking here.
let needs_super_block = self.inner.lock().unwrap().needs_super_block;
if needs_super_block {
self.write_super_block().await?;
}
let mut writer = self.writer.lock().await;
serialize_into(&mut *writer, &JournalRecord::EndBlock)?;
writer.pad_to_block()?;
writer.flush_buffer(&self.handle_and_reservation.get().unwrap().0).await?;
Ok(())
}
/// Returns a copy of the super-block.
pub fn super_block(&self) -> SuperBlock {
self.inner.lock().unwrap().super_block.clone()
}
/// Returns whether or not a flush should be performed. The state this depends on is only updated
/// after committing a transaction.
pub fn should_flush(&self) -> bool {
// The / 2 is here because after compacting, we cannot reclaim the space until the
// _next_ time we flush the device since the super-block is not guaranteed to persist
// until then.
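// For example, with RECLAIM_SIZE = 262_144 this returns true once more than 128 KiB of journal
// data has accumulated since the checkpoint recorded in the super-block, whereas
// check_journal_space (below) only pauses new transactions once 256 KiB has accumulated.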
let inner = self.inner.lock().unwrap();
inner.journal_file_offset - inner.super_block.journal_checkpoint.file_offset
> RECLAIM_SIZE / 2
}
/// Waits for there to be sufficient space in the journal.
pub async fn check_journal_space(&self) {
loop {
debug_assert_not_too_long!({
let mut inner = self.inner.lock().unwrap();
if inner.journal_file_offset - inner.super_block.journal_checkpoint.file_offset
< RECLAIM_SIZE
{
break;
}
let event = inner.reclaim_event.get_or_insert_with(|| Event::new());
event.wait()
});
}
}
fn block_size(&self) -> u64 {
BLOCK_SIZE
}
fn chunk_size(&self) -> u64 {
CHUNK_SIZE
}
}
impl JournalWriter {
// Extends JournalWriter to write a transaction.
fn write_mutations(&mut self, mutations: impl IntoIterator<Item = (u64, Mutation)>) {
for (object_id, mutation) in mutations {
self.write_record(&JournalRecord::Mutation { object_id, mutation });
}
self.write_record(&JournalRecord::Commit);
}
}
#[cfg(test)]
mod tests {
use {
crate::{
object_handle::{ObjectHandle, ObjectHandleExt},
object_store::{
directory::Directory,
filesystem::{Filesystem, FxFilesystem, SyncOptions},
fsck::fsck,
journal::super_block::{SuperBlock, SuperBlockCopy},
transaction::{Options, TransactionHandler},
HandleOptions, ObjectStore,
},
},
fuchsia_async as fasync,
storage_device::{fake_device::FakeDevice, DeviceHolder},
};
const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
#[fasync::run_singlethreaded(test)]
async fn test_alternating_super_blocks() {
let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
fs.close().await.expect("Close failed");
let device = fs.take_device().await;
device.reopen();
let (super_block_a, _) =
SuperBlock::read(device.clone(), SuperBlockCopy::A).await.expect("read failed");
let (super_block_b, _) =
SuperBlock::read(device.clone(), SuperBlockCopy::B).await.expect("read failed");
assert!(super_block_a.generation > super_block_b.generation);
let fs = FxFilesystem::open(device).await.expect("open failed");
let root_store = fs.root_store();
// Generate enough work to induce a journal flush.
for _ in 0..1000 {
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
ObjectStore::create_object(&root_store, &mut transaction, HandleOptions::default())
.await
.expect("create_object failed");
transaction.commit().await;
}
fs.close().await.expect("Close failed");
let device = fs.take_device().await;
device.reopen();
let (super_block_a, _) =
SuperBlock::read(device.clone(), SuperBlockCopy::A).await.expect("read failed");
let (super_block_b, _) =
SuperBlock::read(device.clone(), SuperBlockCopy::B).await.expect("read failed");
assert!(super_block_b.generation > super_block_a.generation);
}
#[fasync::run_singlethreaded(test)]
async fn test_replay() {
const TEST_DATA: &[u8] = b"hello";
let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let object_id = {
let root_store = fs.root_store();
let root_directory =
Directory::open(&root_store, root_store.root_directory_object_id())
.await
.expect("open failed");
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let handle = root_directory
.create_child_file(&mut transaction, "test")
.await
.expect("create_child_file failed");
transaction.commit().await;
let mut buf = handle.allocate_buffer(TEST_DATA.len());
buf.as_mut_slice().copy_from_slice(TEST_DATA);
handle.write(0, buf.as_ref()).await.expect("write failed");
// As this is the first sync, this will actually trigger a new super-block, but normally
// this would not be the case.
fs.sync(SyncOptions::default()).await.expect("sync failed");
handle.object_id()
};
{
fs.close().await.expect("Close failed");
let device = fs.take_device().await;
device.reopen();
let fs = FxFilesystem::open(device).await.expect("open failed");
let handle =
ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default())
.await
.expect("open_object failed");
let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize);
assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
fsck(&fs).await.expect("fsck failed");
fs.close().await.expect("Close failed");
}
}
#[fasync::run_singlethreaded(test)]
async fn test_reset() {
const TEST_DATA: &[u8] = b"hello";
let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
let mut object_ids = Vec::new();
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
{
let root_store = fs.root_store();
let root_directory =
Directory::open(&root_store, root_store.root_directory_object_id())
.await
.expect("open failed");
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let handle = root_directory
.create_child_file(&mut transaction, "test")
.await
.expect("create_child_file failed");
transaction.commit().await;
let mut buf = handle.allocate_buffer(TEST_DATA.len());
buf.as_mut_slice().copy_from_slice(TEST_DATA);
handle.write(0, buf.as_ref()).await.expect("write failed");
fs.sync(SyncOptions::default()).await.expect("sync failed");
object_ids.push(handle.object_id());
// Create a lot of objects but don't sync at the end. This should leave the filesystem
// with a half-finished transaction that cannot be replayed.
for i in 0..1000 {
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let handle = root_directory
.create_child_file(&mut transaction, &format!("{}", i))
.await
.expect("create_child_file failed");
transaction.commit().await;
let mut buf = handle.allocate_buffer(TEST_DATA.len());
buf.as_mut_slice().copy_from_slice(TEST_DATA);
handle.write(0, buf.as_ref()).await.expect("write failed");
object_ids.push(handle.object_id());
}
}
fs.close().await.expect("Close failed");
let device = fs.take_device().await;
device.reopen();
let fs = FxFilesystem::open(device).await.expect("open failed");
fsck(&fs).await.expect("fsck failed");
{
let root_store = fs.root_store();
// Check the first object, which should definitely exist.
for &object_id in &object_ids[0..1] {
let handle =
ObjectStore::open_object(&root_store, object_id, HandleOptions::default())
.await
.expect("open_object failed");
let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize);
assert_eq!(
handle.read(0, buf.as_mut()).await.expect("read failed"),
TEST_DATA.len()
);
assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
}
// Write one more object and sync.
let root_directory =
Directory::open(&root_store, root_store.root_directory_object_id())
.await
.expect("open failed");
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let handle = root_directory
.create_child_file(&mut transaction, "test2")
.await
.expect("create_child_file failed");
transaction.commit().await;
let mut buf = handle.allocate_buffer(TEST_DATA.len());
buf.as_mut_slice().copy_from_slice(TEST_DATA);
handle.write(0, buf.as_ref()).await.expect("write failed");
fs.sync(SyncOptions::default()).await.expect("sync failed");
object_ids.push(handle.object_id());
}
fs.close().await.expect("Close failed");
let device = fs.take_device().await;
device.reopen();
let fs = FxFilesystem::open(device).await.expect("open failed");
{
fsck(&fs).await.expect("fsck failed");
// Check the first and the last objects.
for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
let handle =
ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default())
.await
.expect(&format!("open_object failed (object_id: {})", object_id));
let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize);
assert_eq!(
handle.read(0, buf.as_mut()).await.expect("read failed"),
TEST_DATA.len()
);
assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
}
}
fs.close().await.expect("Close failed");
}
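// A hedged sketch (not tied to the on-disk format) of the checksum chaining property that replay
// relies on: each block's fletcher64 checksum is seeded with the previous block's checksum, and
// re-seeding with RESET_XOR deliberately breaks the chain, so a reader that has not seen the
// reset observes a checksum mismatch. The buffers below are arbitrary test data, not real journal
// blocks.
#[test]
fn test_fletcher64_chaining_sketch() {
    use super::{fletcher64, RESET_XOR};
    // Lengths must be a multiple of 4 bytes; fletcher64 asserts this.
    let block_a = vec![0xabu8; 64];
    let block_b = vec![0xcdu8; 64];
    let seed: u64 = 0x1234_5678_9abc_def0;
    let checksum_a = fletcher64(&block_a, seed);
    let checksum_b = fletcher64(&block_b, checksum_a);
    // The chain is deterministic: recomputing with the same seeds reproduces it.
    assert_eq!(fletcher64(&block_a, seed), checksum_a);
    assert_eq!(fletcher64(&block_b, checksum_a), checksum_b);
    // Flipping the seed with RESET_XOR yields a different checksum for the same payload.
    assert_ne!(fletcher64(&block_b, checksum_a ^ RESET_XOR), checksum_b);
}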
}