// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
crate::{
lsm_tree::types::{
BoxedLayerIterator, Item, ItemRef, Key, Layer, LayerIterator, LayerWriter, Value,
},
object_handle::{ObjectHandle, ObjectHandleExt},
},
anyhow::{bail, Error},
async_trait::async_trait,
byteorder::{ByteOrder, LittleEndian, ReadBytesExt},
serde::Serialize,
std::{
ops::{Bound, Drop},
sync::Arc,
vec::Vec,
},
};

/// Implements a very primitive persistent layer where items are packed into blocks and searching
/// for items is done via a simple binary search. Each block starts with a 2-byte item count, so
/// there is a limit of 65,535 items per block.
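///
/// The on-disk layout of each block is roughly:
///
///   [item_count: u16, little-endian][item 0][item 1]...[item N-1][unused remainder of block]
///
/// where each item is serialized with bincode.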
pub struct SimplePersistentLayer {
object_handle: Arc<dyn ObjectHandle>,
block_size: u32,
size: u64,
}

pub struct Iterator<'iter, K, V> {
layer: &'iter SimplePersistentLayer,
// The position of the _next_ block to be read.
pos: u64,
// A cursor containing the most recently read block.
reader: Option<std::io::Cursor<Box<[u8]>>>,
// The item index in the current block.
item_index: u16,
// The number of items in the current block.
item_count: u16,
// The current item.
item: Option<Item<K, V>>,
}

impl<K, V> Iterator<'_, K, V> {
fn new<'iter>(layer: &'iter SimplePersistentLayer, pos: u64) -> Iterator<'iter, K, V> {
Iterator { layer, pos, reader: None, item_index: 0, item_count: 0, item: None }
}
}

#[async_trait]
impl<'iter, K: Key, V: Value> LayerIterator<K, V> for Iterator<'iter, K, V> {
async fn advance(&mut self) -> Result<(), Error> {
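// If we have exhausted the items in the current block, load the next block (if any) first.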
if self.item_index >= self.item_count {
if self.pos >= self.layer.size {
self.item = None;
return Ok(());
}
// TODO(jfsulliv): Reuse this transfer buffer.
let bs = self.layer.block_size as usize;
let mut buf = self.layer.object_handle.allocate_buffer(bs);
self.layer.object_handle.read(self.pos, buf.as_mut()).await?;
let mut vec = vec![0u8; bs];
vec.copy_from_slice(&buf.as_slice()[..bs]);
log::debug!(
"pos={}, object size={}, object id={}",
self.pos,
self.layer.size,
self.layer.object_handle.object_id()
);
let mut reader = std::io::Cursor::new(vec.into());
self.item_count = reader.read_u16::<LittleEndian>()?;
if self.item_count == 0 {
bail!("Read block with zero item count");
}
self.reader = Some(reader);
self.pos += self.layer.block_size as u64;
self.item_index = 0;
}
self.item = Some(bincode::deserialize_from(self.reader.as_mut().unwrap())?);
self.item_index += 1;
Ok(())
}
fn get(&self) -> Option<ItemRef<'_, K, V>> {
return self.item.as_ref().map(<&Item<K, V>>::into);
}
}

fn round_down(value: u64, block_size: u32) -> u64 {
value - value % block_size as u64
}

fn round_up(value: u64, block_size: u32) -> u64 {
round_down(value + block_size as u64 - 1, block_size)
}

impl SimplePersistentLayer {
/// Opens an existing layer that is accessible via |object_handle| (which provides a read
/// interface to the object). The layer should previously have been written using
/// SimplePersistentLayerWriter.
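///
/// A rough usage sketch (`handle` here stands for any `ObjectHandle` implementation, and
/// `block_size` is assumed to match the block size the layer was written with):
///
/// ```ignore
/// let layer = SimplePersistentLayer::open(handle, block_size).await?;
/// let mut iter = layer.seek(Bound::Unbounded).await?;
/// while let Some(ItemRef { key, value }) = iter.get() {
///     // ... process key and value ...
///     iter.advance().await?;
/// }
/// ```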
pub async fn open(
object_handle: impl ObjectHandle + 'static,
block_size: u32,
) -> Result<Arc<Self>, Error> {
let size = object_handle.get_size();
Ok(Arc::new(SimplePersistentLayer {
object_handle: Arc::new(object_handle),
block_size,
size,
}))
}
}

#[async_trait]
impl<K: Key, V: Value> Layer<K, V> for SimplePersistentLayer {
async fn seek<'a>(&'a self, bound: Bound<&K>) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
let key;
match bound {
Bound::Unbounded => {
let mut iterator = Iterator::new(self, 0);
iterator.advance().await?;
return Ok(Box::new(iterator));
}
Bound::Included(k) => {
key = k;
}
Bound::Excluded(_) => panic!("Excluded bound not supported"),
}
let mut left_offset = 0;
let mut right_offset = round_up(self.size, self.block_size);
let mut left = Iterator::new(self, left_offset);
left.advance().await?;
match left.get() {
None => {
return Ok(Box::new(left));
}
Some(item) => {
if item.key >= key {
return Ok(Box::new(left));
}
}
}
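// Binary search over blocks. Invariant: the first key in the block at left_offset is < key,
// and right_offset is either the end of the layer or the offset of a block whose first key
// is >= key.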
while right_offset - left_offset > self.block_size as u64 {
// Pick a block midway.
let mid_offset =
round_down(left_offset + (right_offset - left_offset) / 2, self.block_size);
let mut iterator = Iterator::new(self, mid_offset);
iterator.advance().await?;
if iterator.get().unwrap().key >= key {
right_offset = mid_offset;
} else {
left_offset = mid_offset;
left = iterator;
}
}
// At this point, we know that the first key in the left block is < key and the first key in
// the right block (if any) is >= key, so we iterate forward from the left block to find the
// first item whose key is >= key.
loop {
left.advance().await?;
match left.get() {
None => break,
Some(item) => {
if item.key >= key {
break;
}
}
}
}
Ok(Box::new(left))
}
}

// -- Writer support --

pub struct SimplePersistentLayerWriter<'a> {
block_size: u32,
buf: Vec<u8>,
object_handle: &'a dyn ObjectHandle,
item_count: u16,
offset: u64,
}

impl<'a> SimplePersistentLayerWriter<'a> {
/// Creates a new writer that will serialize items to the object accessible via |object_handle|
/// (which provides a write interface to the object).
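///
/// A rough usage sketch (`handle` is assumed to implement `ObjectHandle` and `items` stands
/// for any sequence of `Item`s):
///
/// ```ignore
/// let mut writer = SimplePersistentLayerWriter::new(&handle, block_size);
/// for item in items {
///     writer.write(item.as_item_ref()).await?;
/// }
/// writer.flush().await?;
/// ```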
pub fn new(object_handle: &'a dyn ObjectHandle, block_size: u32) -> Self {
SimplePersistentLayerWriter {
block_size,
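// The first two bytes of each block hold the item count; reserve space for them up front.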
buf: vec![0; 2],
object_handle,
item_count: 0,
offset: 0,
}
}
async fn flush_some(&mut self, len: usize) -> Result<(), Error> {
if self.item_count == 0 {
return Ok(());
}
LittleEndian::write_u16(&mut self.buf[0..2], self.item_count);
// TODO(jfsulliv): Buffer directly into the transfer buffer.
// TODO(jfsulliv): Be more efficient by writing only aligned chunks of data.
let mut buf = self.object_handle.allocate_buffer(len);
// TODO(csuter): Consider making BufferRef implement AsRef<[u8]> to make this a bit tidier.
buf.as_mut_slice().copy_from_slice(&self.buf[..len]);
self.object_handle.write(self.offset, buf.as_ref()).await?;
log::debug!("wrote {} items, {} bytes", self.item_count, len);
self.buf.drain(..len - 2); // 2 bytes are used for the next item count.
self.item_count = 0;
self.offset += self.block_size as u64;
Ok(())
}
}

#[async_trait]
impl LayerWriter for SimplePersistentLayerWriter<'_> {
async fn write<K: Send + Serialize + Sync, V: Send + Serialize + Sync>(
&mut self,
item: ItemRef<'_, K, V>,
) -> Result<(), Error> {
// Note the length before we write this item.
let len = self.buf.len();
bincode::serialize_into(&mut self.buf, &item)?;
// If writing the item took us over a block, flush the bytes in the buffer prior to this
// item.
if self.buf.len() > self.block_size as usize - 1 || self.item_count == 65535 {
self.flush_some(len).await?;
}
self.item_count += 1;
Ok(())
}
async fn flush(&mut self) -> Result<(), Error> {
self.flush_some(self.buf.len()).await
}
}

impl Drop for SimplePersistentLayerWriter<'_> {
fn drop(&mut self) {
if self.item_count > 0 {
log::warn!("Dropping unwritten items; did you forget to flush?");
}
}
}

#[cfg(test)]
mod tests {
use {
super::{SimplePersistentLayer, SimplePersistentLayerWriter},
crate::{
lsm_tree::types::{Item, ItemRef, Layer, LayerWriter},
testing::fake_object::{FakeObject, FakeObjectHandle},
},
fuchsia_async as fasync,
std::{ops::Bound, sync::Arc},
};
#[fasync::run_singlethreaded(test)]
async fn test_iterate_after_write() {
const BLOCK_SIZE: u32 = 512;
const ITEM_COUNT: i32 = 10000;
let mut handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
{
let mut writer = SimplePersistentLayerWriter::new(&mut handle, BLOCK_SIZE);
for i in 0..ITEM_COUNT {
writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
}
writer.flush().await.expect("flush failed");
}
let layer = SimplePersistentLayer::open(handle, BLOCK_SIZE).await.expect("new failed");
let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
for i in 0..ITEM_COUNT {
let ItemRef { key, value } = iterator.get().expect("missing item");
assert_eq!((key, value), (&i, &i));
iterator.advance().await.expect("failed to advance");
}
assert!(iterator.get().is_none());
}
#[fasync::run_singlethreaded(test)]
async fn test_seek_after_write() {
const BLOCK_SIZE: u32 = 512;
const ITEM_COUNT: i32 = 10000;
let mut handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
{
let mut writer = SimplePersistentLayerWriter::new(&mut handle, BLOCK_SIZE);
for i in 0..ITEM_COUNT {
writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
}
writer.flush().await.expect("flush failed");
}
let layer = SimplePersistentLayer::open(handle, BLOCK_SIZE).await.expect("new failed");
for i in 0..ITEM_COUNT {
let mut iterator = layer.seek(Bound::Included(&i)).await.expect("failed to seek");
let ItemRef { key, value } = iterator.get().expect("missing item");
assert_eq!((key, value), (&i, &i));
// Check that we can advance to the next item.
iterator.advance().await.expect("failed to advance");
if i == ITEM_COUNT - 1 {
assert!(iterator.get().is_none());
} else {
let ItemRef { key, value } = iterator.get().expect("missing item");
let j = i + 1;
assert_eq!((key, value), (&j, &j));
}
}
}
#[fasync::run_singlethreaded(test)]
async fn test_seek_unbounded() {
const BLOCK_SIZE: u32 = 512;
const ITEM_COUNT: i32 = 10000;
let mut handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
{
let mut writer = SimplePersistentLayerWriter::new(&mut handle, BLOCK_SIZE);
for i in 0..ITEM_COUNT {
writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
}
writer.flush().await.expect("flush failed");
}
let layer = SimplePersistentLayer::open(handle, BLOCK_SIZE).await.expect("new failed");
let mut iterator = layer.seek(Bound::Unbounded).await.expect("failed to seek");
let ItemRef { key, value } = iterator.get().expect("missing item");
assert_eq!((key, value), (&0, &0));
// Check that we can advance to the next item.
iterator.advance().await.expect("failed to advance");
let ItemRef { key, value } = iterator.get().expect("missing item");
assert_eq!((key, value), (&1, &1));
}
#[fasync::run_singlethreaded(test)]
async fn test_zero_items() {
const BLOCK_SIZE: u32 = 512;
let mut handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
{
let mut writer = SimplePersistentLayerWriter::new(&mut handle, BLOCK_SIZE);
writer.flush().await.expect("flush failed");
}
let layer = SimplePersistentLayer::open(handle, BLOCK_SIZE).await.expect("new failed");
let iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
.seek(Bound::Unbounded)
.await
.expect("seek failed");
assert!(iterator.get().is_none())
}
#[fasync::run_singlethreaded(test)]
async fn test_large_block_size() {
// Large enough such that we hit the 64k item limit.
const BLOCK_SIZE: u32 = 2097152;
const ITEM_COUNT: i32 = 70000;
let mut handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
{
let mut writer = SimplePersistentLayerWriter::new(&mut handle, BLOCK_SIZE);
for i in 0..ITEM_COUNT {
writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
}
writer.flush().await.expect("flush failed");
}
let layer = SimplePersistentLayer::open(handle, BLOCK_SIZE).await.expect("new failed");
let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
for i in 0..ITEM_COUNT {
let ItemRef { key, value } = iterator.get().expect("missing item");
assert_eq!((key, value), (&i, &i));
iterator.advance().await.expect("failed to advance");
}
assert!(iterator.get().is_none());
}
}