blob: 6fd1fe3436905fb34d9abab0895aa17422551163 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
device::buffer::{Buffer, BufferRef, MutableBufferRef},
lsm_tree::types::LayerIterator,
object_handle::ObjectHandle,
object_store::{
constants::SUPER_BLOCK_OBJECT_ID,
journal::{
reader::{JournalReader, ReadResult},
writer::JournalWriter,
JournalCheckpoint,
},
record::ObjectItem,
transaction::Transaction,
Device, ObjectStore,
},
},
anyhow::{bail, Error},
async_trait::async_trait,
bincode::serialize_into,
serde::{Deserialize, Serialize},
std::{
cmp::min,
collections::HashMap,
ops::{Bound, Range},
sync::Arc,
},
};
const SUPER_BLOCK_BLOCK_SIZE: usize = 8192;
const SUPER_BLOCK_CHUNK_SIZE: u64 = 65536;
// The first 512 KiB on the disk are reserved for the super block.
const MIN_SUPER_BLOCK_SIZE: u64 = 524_288;
// A super-block consists of this header followed by records that are to be replayed into the root
// parent object store.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct SuperBlock {
// TODO(csuter): version stuff
// TODO(csuter): UUID
// The root parent store is an in-memory only store and serves as the backing store for the root
// store and the journal. The records for this store are serialized into the super-block and
// mutations are also recorded in the journal.
pub root_parent_store_object_id: u64,
// The root object store contains all other metadata objects (including the allocator, the
// journal and the super-blocks) and is the parent for all other object stores.
pub root_store_object_id: u64,
// This is in the root object store.
pub allocator_object_id: u64,
// This is in the root parent object store.
pub journal_object_id: u64,
// Start checkpoint for the journal file.
pub journal_checkpoint: JournalCheckpoint,
// Offset of the journal file when the super-block was written. If no entry is present in
// journal_file_offsets for a particular object, then an object might have dependencies on the
// journal from super_block_journal_file_offset onwards, but not earlier.
pub super_block_journal_file_offset: u64,
// object id -> journal file offset. Indicates where each object has been flushed to.
pub journal_file_offsets: HashMap<u64, u64>,
}
// TODO(csuter): Add support for multiple super-blocks.
pub fn first_extent() -> Range<u64> {
return 0..MIN_SUPER_BLOCK_SIZE;
}
#[derive(Serialize, Deserialize)]
enum SuperBlockRecord {
// When reading the super-block we know the initial extent, but not subsequent extents, so these
// records need to exist to allow us to completely read the super-block.
Extent(Range<u64>),
// Following the super-block header are ObjectItem records that are to be replayed into the root
// parent object store.
Item(ObjectItem),
// Marks the end of the full super-block.
End,
}
// When we are reading the super-block we have to use something special for reading it because we
// don't have an object store we can use.
struct SuperBlockHandle {
device: Arc<dyn Device>,
extents: Vec<Range<u64>>,
}
#[async_trait]
impl ObjectHandle for SuperBlockHandle {
fn object_id(&self) -> u64 {
SUPER_BLOCK_OBJECT_ID
}
fn allocate_buffer(&self, size: usize) -> Buffer<'_> {
self.device.allocate_buffer(size)
}
async fn read(&self, mut offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
let len = buf.len();
let mut buf_offset = 0;
let mut file_offset = 0;
for extent in &self.extents {
let extent_len = extent.end - extent.start;
if offset < file_offset + extent_len {
let device_offset = extent.start + offset - file_offset;
let to_read = min(extent.end - device_offset, (len - buf_offset) as u64) as usize;
assert!(buf_offset % self.device.block_size() as usize == 0);
self.device
.read(
device_offset,
buf.reborrow().subslice_mut(buf_offset..buf_offset + to_read),
)
.await?;
buf_offset += to_read;
if buf_offset == len {
break;
}
offset += to_read as u64;
}
file_offset += extent_len;
}
Ok(len)
}
async fn txn_write<'a>(
&'a self,
_transaction: &mut Transaction<'a>,
_offset: u64,
_buf: BufferRef<'_>,
) -> Result<(), Error> {
unreachable!();
}
fn get_size(&self) -> u64 {
unreachable!();
}
async fn truncate<'a>(
&'a self,
_transaction: &mut Transaction<'a>,
_length: u64,
) -> Result<(), Error> {
unreachable!();
}
async fn preallocate_range<'a>(
&'a self,
_transaction: &mut Transaction<'a>,
_range: Range<u64>,
) -> Result<Vec<Range<u64>>, Error> {
unreachable!();
}
async fn new_transaction<'a>(&self) -> Result<Transaction<'a>, Error> {
unreachable!();
}
}
impl SuperBlock {
pub(super) fn new(
root_parent_store_object_id: u64,
root_store_object_id: u64,
allocator_object_id: u64,
journal_object_id: u64,
journal_checkpoint: JournalCheckpoint,
) -> Self {
SuperBlock {
root_parent_store_object_id,
root_store_object_id,
allocator_object_id,
journal_object_id,
journal_checkpoint,
..Default::default()
}
}
/// Read the super-block header, and return it and a reader that produces the records that are
/// to be replayed in to the root parent object store.
pub async fn read(device: Arc<dyn Device>) -> Result<(SuperBlock, ItemReader), Error> {
let mut reader = JournalReader::new(
SuperBlockHandle { device, extents: vec![first_extent()] },
SUPER_BLOCK_BLOCK_SIZE as u64,
&JournalCheckpoint::default(),
);
let super_block = match reader.deserialize::<SuperBlock>().await? {
ReadResult::Reset => bail!("Unexpected reset"),
ReadResult::ChecksumMismatch => bail!("Checksum mismatch"),
ReadResult::Some(super_block) => super_block,
};
Ok((super_block, ItemReader(reader)))
}
/// Writes the super-block and the records from the root parent store.
pub(super) async fn write<'a>(
&self,
root_parent_store: &'a ObjectStore,
handle: impl ObjectHandle,
) -> Result<(), Error> {
assert_eq!(root_parent_store.store_object_id(), self.root_parent_store_object_id);
let mut writer = JournalWriter::new(Some(handle), SUPER_BLOCK_BLOCK_SIZE, 0);
serialize_into(&mut writer, &self)?;
let tree = root_parent_store.tree();
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(Bound::Unbounded).await?;
let mut next_extent_offset = MIN_SUPER_BLOCK_SIZE;
while let Some(item_ref) = iter.get() {
if writer.journal_file_checkpoint().file_offset
>= next_extent_offset - SUPER_BLOCK_CHUNK_SIZE
{
let handle = writer.handle().unwrap();
let mut transaction = handle.new_transaction().await?;
let allocated = handle
.preallocate_range(
&mut transaction,
next_extent_offset..next_extent_offset + SUPER_BLOCK_CHUNK_SIZE,
)
.await?;
transaction.commit().await;
for device_range in allocated {
next_extent_offset += device_range.end - device_range.start;
serialize_into(&mut writer, &SuperBlockRecord::Extent(device_range))?;
}
}
serialize_into(
&mut writer,
&SuperBlockRecord::Item(ObjectItem {
key: (*item_ref.key).clone(),
value: (*item_ref.value).clone(),
}),
)?;
iter.advance().await?;
}
serialize_into(&mut writer, &SuperBlockRecord::End)?;
writer.pad_to_block()?;
writer.maybe_flush_buffer().await?;
Ok(())
}
}
pub struct ItemReader(JournalReader<SuperBlockHandle>);
impl ItemReader {
pub async fn next_item(&mut self) -> Result<Option<ObjectItem>, Error> {
loop {
match self.0.deserialize().await? {
ReadResult::Reset => bail!("Unexpected reset"),
ReadResult::ChecksumMismatch => bail!("Checksum mismatch"),
ReadResult::Some(SuperBlockRecord::Extent(extent)) => {
self.0.handle().extents.push(extent)
}
ReadResult::Some(SuperBlockRecord::Item(item)) => return Ok(Some(item)),
ReadResult::Some(SuperBlockRecord::End) => return Ok(None),
}
}
}
}
#[cfg(test)]
mod tests {
use {
super::{SuperBlock, MIN_SUPER_BLOCK_SIZE},
crate::{
device::DeviceHolder,
lsm_tree::types::LayerIterator,
object_handle::ObjectHandle,
object_store::{
allocator::Allocator,
constants::SUPER_BLOCK_OBJECT_ID,
filesystem::Filesystem,
journal::JournalCheckpoint,
testing::{fake_allocator::FakeAllocator, fake_filesystem::FakeFilesystem},
transaction::TransactionHandler,
HandleOptions, ObjectStore,
},
testing::fake_device::FakeDevice,
},
fuchsia_async as fasync,
std::{ops::Bound, sync::Arc},
};
const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
#[fasync::run_singlethreaded(test)]
async fn test_read_written_super_block() {
let device = DeviceHolder::new(FakeDevice::new(2048, TEST_DEVICE_BLOCK_SIZE));
let fs = FakeFilesystem::new(device);
let allocator = Arc::new(FakeAllocator::new());
fs.object_manager().set_allocator(allocator.clone());
let root_parent_store = ObjectStore::new_empty(None, 2, fs.clone());
let mut transaction =
fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
let root_store = root_parent_store
.create_child_store_with_id(&mut transaction, 3)
.await
.expect("create_child_store failed");
const JOURNAL_OBJECT_ID: u64 = 4;
// Create a large number of objects in the root parent store so that we test handling of
// extents.
for _ in 0..10000 {
let mut transaction =
fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
ObjectStore::create_object(
&root_parent_store,
&mut transaction,
HandleOptions::default(),
)
.await
.expect("create_object failed");
transaction.commit().await;
}
let mut super_block = SuperBlock::new(
root_parent_store.store_object_id(),
root_store.store_object_id(),
allocator.object_id(),
JOURNAL_OBJECT_ID,
JournalCheckpoint { file_offset: 1234, checksum: 5678 },
);
super_block.journal_file_offsets.insert(1, 2);
let handle; // extend will borrow handle and needs to outlive transaction.
let mut transaction =
fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
handle = ObjectStore::create_object_with_id(
&root_store,
&mut transaction,
SUPER_BLOCK_OBJECT_ID,
HandleOptions { overwrite: true, ..Default::default() },
)
.await
.expect("create_object_with_id failed");
handle.extend(&mut transaction, super::first_extent()).await.expect("extend failed");
transaction.commit().await;
let layer_set = root_parent_store.tree().layer_set();
let mut merger = layer_set.merger();
super_block.write(&root_parent_store, handle).await.expect("write failed");
// Make sure we did actually extend the super block.
let handle =
ObjectStore::open_object(&root_store, SUPER_BLOCK_OBJECT_ID, HandleOptions::default())
.await
.expect("open_object failed");
assert!(handle.get_size() > MIN_SUPER_BLOCK_SIZE);
let mut written_super_block = SuperBlock::read(fs.device()).await.expect("read failed");
assert_eq!(written_super_block.0, super_block);
// Check that the records match what we expect in the root parent store.
let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
while let Some(item) = written_super_block.1.next_item().await.expect("next_item failed") {
assert_eq!(item.as_item_ref(), iter.get().expect("missing item"));
iter.advance().await.expect("advance failed");
}
}
}