// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    crate::{
        object_handle::{ObjectHandle, ObjectHandleExt},
        object_store::journal::{fletcher64, Checksum, JournalCheckpoint, RESET_XOR},
    },
    anyhow::Error,
    bincode::serialize_into,
    byteorder::{LittleEndian, WriteBytesExt},
    serde::Serialize,
    std::{cmp::min, io::Write},
};

/// JournalWriter is responsible for writing log records to a journal file. Each block contains a
/// fletcher64 checksum at the end of the block. This is used by both the main journal file and the
/// super-block.
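///
/// Within each block, the payload occupies the first `block_size - size_of::<Checksum>()` bytes
/// and the checksum (seeded with the previous block's checksum) fills the rest.
///
/// # Example
///
/// A minimal usage sketch, mirroring the tests at the bottom of this file. `handle` and `record`
/// are placeholders (any `ObjectHandle` implementation and any `Serialize` type will do), so this
/// is not compiled as a doctest:
///
/// ```ignore
/// let mut writer = JournalWriter::new(Some(handle), block_size, /* last_checksum */ 0);
/// writer.write_record(&record);
/// writer.pad_to_block()?;
/// writer.maybe_flush_buffer().await?;
/// let checkpoint = writer.journal_file_checkpoint();
/// ```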
pub struct JournalWriter<OH> {
    // The handle to write to. We allow lazy initialisation so that a writer can be instantiated
    // before replay has completed.
    handle: Option<OH>,
    // The block size used for this journal file.
    block_size: usize,
    // The offset in the file.
    offset: u64,
    // The last checksum we wrote.
    last_checksum: Checksum,
    // The buffered data for the current block.
    buf: Vec<u8>,
    // If true, the next block we write should indicate the stream was reset.
    reset: bool,
}

impl<OH> JournalWriter<OH> {
    pub fn new(handle: Option<OH>, block_size: usize, last_checksum: u64) -> Self {
        JournalWriter {
            handle,
            block_size,
            offset: 0,
            last_checksum,
            buf: Vec::new(),
            reset: false,
        }
    }

    /// Serializes a new journal record to the journal stream.
    pub fn write_record<T: Serialize>(&mut self, record: &T) {
        // Our write implementation cannot fail at the moment.
        serialize_into(&mut *self, record).unwrap()
    }

    /// Pads from the current offset in the buffer to the end of the block.
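    ///
    /// More precisely, this pads to `size_of::<Checksum>()` bytes short of the block boundary;
    /// the `Write` implementation below then appends the block checksum, so the buffer ends up a
    /// whole number of blocks long.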
    pub fn pad_to_block(&mut self) -> std::io::Result<()> {
        let align = self.buf.len() % self.block_size;
        if align > 0 {
            self.write(&vec![0; self.block_size - std::mem::size_of::<Checksum>() - align])?;
        }
        Ok(())
    }

    /// Returns the checkpoint that corresponds to the current location in the journal stream
    /// assuming that it has been flushed.
    pub(super) fn journal_file_checkpoint(&self) -> JournalCheckpoint {
        JournalCheckpoint::new(self.offset + self.buf.len() as u64, self.last_checksum)
    }

    /// Flushes any outstanding complete blocks to the journal object. Partial blocks can be
    /// flushed by calling pad_to_block first. Does nothing if no handle has been set yet.
    pub async fn maybe_flush_buffer(&mut self) -> Result<(), Error>
    where
        OH: ObjectHandle,
    {
        if let Some(ref handle) = self.handle {
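            // Only whole blocks are written out; any partial block at the end stays buffered
            // until it is completed (or padded via pad_to_block).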
            let to_do = self.buf.len() - self.buf.len() % self.block_size;
            if to_do > 0 {
                // TODO(jfsulliv): This is horribly inefficient. We should reuse the transfer
                // buffer. Doing so will require picking an appropriate size up front, and forcing
                // flush as we fill it up.
                let mut buf = handle.allocate_buffer(to_do);
                buf.as_mut_slice()[..to_do].copy_from_slice(self.buf.drain(..to_do).as_slice());
                buf.as_mut_slice()[to_do..].fill(0u8);
                handle.write(self.offset, buf.as_ref()).await?;
                self.offset += to_do as u64;
            }
        }
        Ok(())
    }

    pub fn handle(&self) -> Option<&OH> {
        self.handle.as_ref()
    }

    /// Sets the handle, to be used once replay has finished.
    pub fn set_handle(&mut self, handle: OH) {
        assert!(self.handle.is_none());
        self.handle = Some(handle);
    }

    /// Seeks to the given offset in the journal file, to be used once replay has finished.
    pub fn seek_to_checkpoint(&mut self, checkpoint: JournalCheckpoint, reset: bool) {
        assert!(self.buf.is_empty());
        self.offset = checkpoint.file_offset;
        self.last_checksum = checkpoint.checksum;
        self.reset = reset;
    }
}

impl<OH> std::io::Write for JournalWriter<OH> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
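        // Buffer the data, breaking it up so that each block holds
        // block_size - size_of::<Checksum>() bytes of payload followed by a little-endian
        // fletcher64 checksum that is seeded with the previous block's checksum.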
        let mut offset = 0;
        while offset < buf.len() {
            let space = self.block_size
                - std::mem::size_of::<Checksum>()
                - self.buf.len() % self.block_size;
            let to_copy = min(space, buf.len() - offset);
            self.buf.write(&buf[offset..offset + to_copy])?;
            if to_copy == space {
                let end = self.buf.len();
                let start = end + std::mem::size_of::<Checksum>() - self.block_size;
                self.last_checksum = fletcher64(&self.buf[start..end], self.last_checksum);
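                // The first block written after a reset gets its checksum XORed with a marker so
                // that replay can tell that the stream was reset at this point.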
                if self.reset {
                    self.last_checksum ^= RESET_XOR;
                    self.reset = false;
                }
                self.buf.write_u64::<LittleEndian>(self.last_checksum)?;
            }
            offset += to_copy;
        }
        Ok(buf.len())
    }

    // This does nothing because it's sync. Users must call the async maybe_flush_buffer function
    // to flush outstanding data.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

impl<OH> Drop for JournalWriter<OH> {
    fn drop(&mut self) {
        // If this message is logged it means we forgot to call maybe_flush_buffer(), which in
        // turn might mean Journal::sync() was not called.
        if !self.buf.is_empty() {
            log::warn!("journal data dropped!");
        }
    }
}

#[cfg(test)]
mod tests {
    use {
        super::JournalWriter,
        crate::{
            object_handle::ObjectHandle,
            object_store::journal::{fletcher64, Checksum, JournalCheckpoint, RESET_XOR},
            testing::fake_object::{FakeObject, FakeObjectHandle},
        },
        bincode::deserialize_from,
        byteorder::{ByteOrder, LittleEndian},
        fuchsia_async as fasync,
        std::sync::Arc,
    };

    const TEST_BLOCK_SIZE: usize = 512;

    #[fasync::run_singlethreaded(test)]
    async fn test_write_single_record_and_pad() {
        let object = Arc::new(FakeObject::new());
        let mut writer =
            JournalWriter::new(Some(FakeObjectHandle::new(object.clone())), TEST_BLOCK_SIZE, 0);
        writer.write_record(&4u32);
        writer.pad_to_block().expect("pad_to_block failed");
        writer.maybe_flush_buffer().await.expect("flush_buffer failed");
        let handle = FakeObjectHandle::new(object.clone());
        let mut buf = handle.allocate_buffer(object.get_size() as usize);
        assert_eq!(buf.len(), TEST_BLOCK_SIZE);
        handle.read(0, buf.as_mut()).await.expect("read failed");
        let value: u32 = deserialize_from(buf.as_slice()).expect("deserialize_from failed");
        assert_eq!(value, 4u32);
        let (payload, checksum_slice) =
            buf.as_slice().split_at(buf.len() - std::mem::size_of::<Checksum>());
        let checksum = LittleEndian::read_u64(checksum_slice);
        assert_eq!(checksum, fletcher64(payload, 0));
        assert_eq!(
            writer.journal_file_checkpoint(),
            JournalCheckpoint { file_offset: TEST_BLOCK_SIZE as u64, checksum }
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_journal_file_checkpoint() {
        let object = Arc::new(FakeObject::new());
        let mut writer =
            JournalWriter::new(Some(FakeObjectHandle::new(object.clone())), TEST_BLOCK_SIZE, 0);
        writer.write_record(&4u32);
        let checkpoint = writer.journal_file_checkpoint();
        assert_eq!(checkpoint.checksum, 0);
        writer.write_record(&17u64);
        writer.pad_to_block().expect("pad_to_block failed");
        writer.maybe_flush_buffer().await.expect("flush_buffer failed");
        let handle = FakeObjectHandle::new(object.clone());
        let mut buf = handle.allocate_buffer(object.get_size() as usize);
        assert_eq!(buf.len(), TEST_BLOCK_SIZE);
        handle.read(0, buf.as_mut()).await.expect("read failed");
        let value: u64 = deserialize_from(&buf.as_slice()[checkpoint.file_offset as usize..])
            .expect("deserialize_from failed");
        assert_eq!(value, 17);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_set_handle() {
        let object = Arc::new(FakeObject::new());
        let mut writer = JournalWriter::new(None, TEST_BLOCK_SIZE, 0);
        writer.set_handle(FakeObjectHandle::new(object.clone()));
        writer.seek_to_checkpoint(JournalCheckpoint::new(TEST_BLOCK_SIZE as u64 * 5, 12345), false);
        writer.write_record(&12u64);
        writer.pad_to_block().expect("pad_to_block failed");
        writer.maybe_flush_buffer().await.expect("flush_buffer failed");
        let handle = FakeObjectHandle::new(object.clone());
        let mut buf = handle.allocate_buffer(object.get_size() as usize);
        assert_eq!(buf.len(), TEST_BLOCK_SIZE * 6);
        handle.read(0, buf.as_mut()).await.expect("read failed");
        let (first_5_blocks, last_block) = buf.as_slice().split_at(TEST_BLOCK_SIZE * 5);
        assert_eq!(first_5_blocks, &vec![0u8; TEST_BLOCK_SIZE * 5]);
        let value: u64 =
            deserialize_from(&last_block[..TEST_BLOCK_SIZE]).expect("deserialize_from failed");
        assert_eq!(value, 12);
        let (payload, checksum_slice) =
            last_block.split_at(last_block.len() - std::mem::size_of::<Checksum>());
        let checksum = LittleEndian::read_u64(checksum_slice);
        assert_eq!(checksum, fletcher64(payload, 12345));
        assert_eq!(
            writer.journal_file_checkpoint(),
            JournalCheckpoint { file_offset: TEST_BLOCK_SIZE as u64 * 6, checksum }
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_set_reset() {
        let object = Arc::new(FakeObject::new());
        let mut writer = JournalWriter::new(None, TEST_BLOCK_SIZE, 0);
        writer.set_handle(FakeObjectHandle::new(object.clone()));
        writer.seek_to_checkpoint(JournalCheckpoint::new(TEST_BLOCK_SIZE as u64 * 5, 12345), true);
        writer.write_record(&12u64);
        writer.pad_to_block().expect("pad_to_block failed");
        writer.maybe_flush_buffer().await.expect("flush_buffer failed");
        let handle = FakeObjectHandle::new(object.clone());
        let mut buf = handle.allocate_buffer(object.get_size() as usize);
        assert_eq!(buf.len(), TEST_BLOCK_SIZE * 6);
        handle.read(0, buf.as_mut()).await.expect("read failed");
        let (first_5_blocks, last_block) = buf.as_slice().split_at(TEST_BLOCK_SIZE * 5);
        assert_eq!(first_5_blocks, &vec![0u8; TEST_BLOCK_SIZE * 5]);
        let value: u64 =
            deserialize_from(&last_block[..TEST_BLOCK_SIZE]).expect("deserialize_from failed");
        assert_eq!(value, 12);
        let (payload, checksum_slice) =
            last_block.split_at(last_block.len() - std::mem::size_of::<Checksum>());
        let checksum = LittleEndian::read_u64(checksum_slice);
        assert_eq!(checksum, fletcher64(payload, 12345) ^ RESET_XOR);
        assert_eq!(
            writer.journal_file_checkpoint(),
            JournalCheckpoint { file_offset: TEST_BLOCK_SIZE as u64 * 6, checksum }
        );
    }
}