blob: 5587690fb09a28ecacb22a0268db2b3d91b2dbef [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
lsm_tree::types::{
BoxedLayerIterator, Item, ItemRef, Key, Layer, LayerIterator, LayerWriter, Value,
},
object_handle::{ObjectHandle, WriteBytes},
},
anyhow::{bail, Context, Error},
async_trait::async_trait,
async_utils::event::Event,
byteorder::{ByteOrder, LittleEndian, ReadBytesExt},
serde::Serialize,
std::{
cmp::Ordering,
ops::{Bound, Drop},
sync::{Arc, Mutex},
vec::Vec,
},
};
/// A minimal persistent layer: items are bincode-serialized and packed into fixed-size blocks,
/// and lookups are done with a simple binary search over those blocks.
///
/// Every block begins with a little-endian `u16` holding the number of items in that block, so
/// a single block can hold at most 65535 items.
pub struct SimplePersistentLayer {
    /// Handle through which the layer's blocks are read.
    object_handle: Arc<dyn ObjectHandle>,
    /// Size of the underlying object in bytes, captured when the layer was opened.
    size: u64,
    /// Size in bytes of each block; also the stride between consecutive blocks.
    block_size: u32,
    /// Handed out (cloned) by `lock` until `close` takes it.
    close_event: Mutex<Option<Event>>,
}
/// Forward iterator over the items stored in a [`SimplePersistentLayer`].
///
/// Blocks are read lazily, one at a time, when `advance` runs past the items of the current
/// block.
pub struct Iterator<'iter, K, V> {
    /// The layer being iterated.
    layer: &'iter SimplePersistentLayer,
    /// Byte offset of the _next_ block to be read.
    pos: u64,
    /// A cursor containing the most recently read block, positioned at the next item to decode.
    reader: Option<std::io::Cursor<Box<[u8]>>>,
    /// Number of items already decoded from the current block (i.e. the index of the next one).
    item_index: u16,
    /// Number of items in the current block, as recorded in the block's header.
    item_count: u16,
    /// The current item; `None` before the first advance and once the iterator is exhausted.
    item: Option<Item<K, V>>,
}
impl<'iter, K, V> Iterator<'iter, K, V> {
    /// Creates an iterator over `layer` that will start reading at byte offset `pos`.
    ///
    /// No I/O happens here: the first block is loaded by the first call to `advance`.
    fn new(layer: &'iter SimplePersistentLayer, pos: u64) -> Self {
        Self { layer, pos, item: None, item_count: 0, item_index: 0, reader: None }
    }
}
#[async_trait]
impl<'iter, K: Key, V: Value> LayerIterator<K, V> for Iterator<'iter, K, V> {
    /// Moves to the next item, reading the next block from the object once the current block's
    /// items are exhausted. When there are no more blocks, `get` subsequently returns `None`.
    ///
    /// # Errors
    ///
    /// Fails if the block cannot be read, if its item-count header cannot be read, if a block
    /// declares zero items, or if an item cannot be deserialized ("Corrupt layer").
    async fn advance(&mut self) -> Result<(), Error> {
        if self.item_index >= self.item_count {
            if self.pos >= self.layer.size {
                self.item = None;
                return Ok(());
            }
            // TODO(jfsulliv): Reuse this transfer buffer.
            let bs = self.layer.block_size as usize;
            let mut buf = self.layer.object_handle.allocate_buffer(bs);
            let len = self.layer.object_handle.read(self.pos, buf.as_mut()).await?;
            log::debug!(
                "pos={}, object size={}, object id={}",
                self.pos,
                self.layer.size,
                self.layer.object_handle.object_id()
            );
            // Only the bytes actually read are kept, so a short read yields a shorter cursor.
            let mut reader = std::io::Cursor::new(buf.as_slice()[..len].to_vec().into());
            // Each block starts with a little-endian u16 item count.
            self.item_count = reader.read_u16::<LittleEndian>()?;
            if self.item_count == 0 {
                bail!(
                    "Read block with zero item count (object: {}, offset: {})",
                    self.layer.object_handle.object_id(),
                    self.pos
                );
            }
            self.reader = Some(reader);
            // Blocks are laid out at a fixed stride of `block_size` bytes.
            self.pos += self.layer.block_size as u64;
            self.item_index = 0;
        }
        let reader = self
            .reader
            .as_mut()
            .expect("a block is always loaded when item_index < item_count");
        self.item = Some(bincode::deserialize_from(reader).context("Corrupt layer")?);
        self.item_index += 1;
        Ok(())
    }

    /// Returns the current item, or `None` if there is none.
    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        self.item.as_ref().map(<&Item<K, V>>::into)
    }
}
/// Rounds `value` down to the nearest multiple of `block_size`.
fn round_down(value: u64, block_size: u32) -> u64 {
    let block_size = u64::from(block_size);
    let overshoot = value % block_size;
    value - overshoot
}
/// Rounds `value` up to the nearest multiple of `block_size`.
fn round_up(value: u64, block_size: u32) -> u64 {
    let block_size = u64::from(block_size);
    let bumped = value + block_size - 1;
    bumped - bumped % block_size
}
impl SimplePersistentLayer {
    /// Opens a layer previously written with `SimplePersistentLayerWriter`.
    ///
    /// `object_handle` provides read access to the layer's object. `block_size` is expected to
    /// match the block size the writer used, since blocks are read at that fixed stride. The
    /// object's size is captured here, at open time.
    pub async fn open(
        object_handle: impl ObjectHandle + 'static,
        block_size: u32,
    ) -> Result<Arc<Self>, Error> {
        let size = object_handle.get_size();
        let object_handle: Arc<dyn ObjectHandle> = Arc::new(object_handle);
        let close_event = Mutex::new(Some(Event::new()));
        let layer = Self { object_handle, block_size, size, close_event };
        Ok(Arc::new(layer))
    }
}
/// `Layer` implementation for `SimplePersistentLayer`: a block-level binary search for `seek`,
/// plus the close-event handshake used by `lock` and `close`.
#[async_trait]
impl<K: Key, V: Value> Layer<K, V> for SimplePersistentLayer {
    /// Returns the object handle backing this layer.
    fn handle(&self) -> Option<&dyn ObjectHandle> {
        Some(self.object_handle.as_ref())
    }
    /// Returns an iterator positioned according to `bound`.
    ///
    /// - `Bound::Unbounded`: positioned at the first item of the layer.
    /// - `Bound::Included(key)`: positioned at the first item whose
    ///   `item.key.cmp_upper_bound(key)` is not `Ordering::Less`.
    /// - `Bound::Excluded(key)`: as for `Included`, except that if that item compares `Equal`
    ///   the iterator is advanced once more.
    ///
    /// Blocks are binary-searched by the key of their first item, then the search scans forward
    /// from the last block whose first item compares `Less`. If no item qualifies, the returned
    /// iterator's `get` yields `None`. This assumes items were written in sorted order
    /// (NOTE(review): the writer does not check this).
    ///
    /// # Errors
    ///
    /// Propagates any error from reading or decoding a block.
    async fn seek<'a>(&'a self, bound: Bound<&K>) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
        let (key, excluded) = match bound {
            Bound::Unbounded => {
                // No key to compare against: start at the very first block.
                let mut iterator = Iterator::new(self, 0);
                iterator.advance().await?;
                return Ok(Box::new(iterator));
            }
            Bound::Included(k) => (k, false),
            Bound::Excluded(k) => (k, true),
        };
        // Search window in byte offsets. Both ends are block-aligned; `right_offset` is an
        // exclusive limit that also covers a partial final block.
        let mut left_offset = 0;
        let mut right_offset = round_up(self.size, self.block_size);
        // Check the first block up front: if the layer is empty, or its first item already
        // satisfies the bound, there is nothing to search.
        let mut left = Iterator::new(self, left_offset);
        left.advance().await?;
        match left.get() {
            None => return Ok(Box::new(left)),
            Some(item) => match item.key.cmp_upper_bound(key) {
                Ordering::Greater => return Ok(Box::new(left)),
                Ordering::Equal => {
                    if excluded {
                        left.advance().await?;
                    }
                    return Ok(Box::new(left));
                }
                Ordering::Less => {}
            },
        }
        // Invariant: `left` is positioned on the first item of the block at `left_offset`, which
        // compares `Less` than `key`. The block at `right_offset`, if it exists, starts with an
        // item comparing `Greater`.
        while right_offset - left_offset > self.block_size as u64 {
            // Pick a block midway.
            let mid_offset =
                round_down(left_offset + (right_offset - left_offset) / 2, self.block_size);
            let mut iterator = Iterator::new(self, mid_offset);
            iterator.advance().await?;
            // `mid_offset` lies strictly between the window's ends and is therefore below the
            // object size, so this block is expected to yield at least one item (or an error).
            let item: ItemRef<'_, K, V> = iterator.get().unwrap();
            match item.key.cmp_upper_bound(key) {
                Ordering::Greater => right_offset = mid_offset,
                Ordering::Equal => {
                    if excluded {
                        iterator.advance().await?;
                    }
                    return Ok(Box::new(iterator));
                }
                Ordering::Less => {
                    left_offset = mid_offset;
                    left = iterator;
                }
            }
        }
        // At this point, the first key of the left block is < key and the first key of the
        // right block (if any) is > key. The target is therefore after `left`'s current
        // position: scan forward, possibly crossing into the next block.
        loop {
            left.advance().await?;
            match left.get() {
                None => return Ok(Box::new(left)),
                Some(item) => match item.key.cmp_upper_bound(key) {
                    Ordering::Greater => return Ok(Box::new(left)),
                    Ordering::Equal => {
                        if excluded {
                            left.advance().await?;
                        }
                        return Ok(Box::new(left));
                    }
                    Ordering::Less => {}
                },
            }
        }
    }
    /// Returns a clone of the close event, or `None` once `close` has taken it.
    fn lock(&self) -> Option<Event> {
        self.close_event.lock().unwrap().clone()
    }
    /// Takes the close event and awaits its `wait_or_dropped` future. Presumably this resolves
    /// once the clones handed out by `lock` are dropped; confirm against
    /// `async_utils::event::Event`.
    ///
    /// # Panics
    ///
    /// Panics if called more than once.
    async fn close(&self) {
        // The mutex guard is a temporary of the `let event` statement, so it is released before
        // the `.await` below; the std mutex is never held across a suspension point.
        let _ = {
            let event = self.close_event.lock().unwrap().take().expect("close already called");
            event.wait_or_dropped()
        }
        .await;
    }
}
// -- Writer support --
/// Serializes items into the fixed-size block format read by `SimplePersistentLayer`.
///
/// Items are accumulated in memory and written out one block at a time.
pub struct SimplePersistentLayerWriter<W> {
    /// Destination for the serialized blocks.
    writer: W,
    /// Size in bytes of each block.
    block_size: u32,
    /// Number of items buffered for the pending block.
    item_count: u16,
    /// Pending block contents; the first two bytes are reserved for the item count.
    buf: Vec<u8>,
}
impl<W: WriteBytes> SimplePersistentLayerWriter<W> {
/// Creates a new writer that will serialize items to the object accessible via |object_handle|
/// (which provdes a write interface to the object).
pub fn new(writer: W, block_size: u32) -> Self {
SimplePersistentLayerWriter { block_size, buf: vec![0; 2], writer, item_count: 0 }
}
async fn flush_some(&mut self, len: usize) -> Result<(), Error> {
if self.item_count == 0 {
return Ok(());
}
LittleEndian::write_u16(&mut self.buf[0..2], self.item_count);
self.writer.write_bytes(&self.buf[..len]).await?;
self.writer.skip(self.block_size as u64 - len as u64);
log::debug!("wrote {} items, {} bytes", self.item_count, len);
self.buf.drain(..len - 2); // 2 bytes are used for the next item count.
self.item_count = 0;
Ok(())
}
}
#[async_trait]
impl<W: WriteBytes + Send> LayerWriter for SimplePersistentLayerWriter<W> {
async fn write<K: Send + Serialize + Sync, V: Send + Serialize + Sync>(
&mut self,
item: ItemRef<'_, K, V>,
) -> Result<(), Error> {
// Note the length before we write this item.
let len = self.buf.len();
bincode::serialize_into(&mut self.buf, &item)?;
// If writing the item took us over a block, flush the bytes in the buffer prior to this
// item.
if self.buf.len() > self.block_size as usize - 1 || self.item_count == 65535 {
self.flush_some(len).await?;
}
self.item_count += 1;
Ok(())
}
async fn flush(&mut self) -> Result<(), Error> {
self.flush_some(self.buf.len()).await
}
}
/// Logs a warning if the writer is dropped while items are still buffered (i.e. `flush` was
/// not called after the last `write`).
impl<W> Drop for SimplePersistentLayerWriter<W> {
    fn drop(&mut self) {
        let has_unflushed_items = self.item_count != 0;
        if !has_unflushed_items {
            return;
        }
        log::warn!("Dropping unwritten items; did you forget to flush?");
    }
}
#[cfg(test)]
mod tests {
    use {
        super::{SimplePersistentLayer, SimplePersistentLayerWriter},
        crate::{
            lsm_tree::types::{DefaultOrdUpperBound, Item, ItemRef, Layer, LayerWriter},
            object_handle::Writer,
            object_store::transaction,
            testing::fake_object::{FakeObject, FakeObjectHandle},
        },
        fuchsia_async as fasync,
        std::{ops::Bound, sync::Arc},
    };

    impl DefaultOrdUpperBound for i32 {}

    /// Writes the items `(i, i)` for `i` in `0..item_count` into a fresh fake object using
    /// `block_size`-byte blocks, then opens the result as a layer.
    async fn write_and_open(block_size: u32, item_count: i32) -> Arc<SimplePersistentLayer> {
        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
        {
            // Scoped so the writer, which borrows `handle`, is dropped before `handle` moves.
            let mut writer = SimplePersistentLayerWriter::new(
                Writer::new(&handle, transaction::Options::default()),
                block_size,
            );
            for i in 0..item_count {
                writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
            }
            writer.flush().await.expect("flush failed");
        }
        SimplePersistentLayer::open(handle, block_size).await.expect("new failed")
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_iterate_after_write() {
        const BLOCK_SIZE: u32 = 512;
        const ITEM_COUNT: i32 = 10000;
        let layer = write_and_open(BLOCK_SIZE, ITEM_COUNT).await;
        let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
        for i in 0..ITEM_COUNT {
            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
            assert_eq!((key, value), (&i, &i));
            iterator.advance().await.expect("failed to advance");
        }
        assert!(iterator.get().is_none());
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_seek_after_write() {
        const BLOCK_SIZE: u32 = 512;
        const ITEM_COUNT: i32 = 10000;
        let layer = write_and_open(BLOCK_SIZE, ITEM_COUNT).await;
        for i in 0..ITEM_COUNT {
            let mut iterator = layer.seek(Bound::Included(&i)).await.expect("failed to seek");
            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
            assert_eq!((key, value), (&i, &i));
            // Check that we can advance to the next item.
            iterator.advance().await.expect("failed to advance");
            let next = i + 1;
            if next == ITEM_COUNT {
                assert!(iterator.get().is_none());
            } else {
                let ItemRef { key, value, .. } = iterator.get().expect("missing item");
                assert_eq!((key, value), (&next, &next));
            }
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_seek_unbounded() {
        const BLOCK_SIZE: u32 = 512;
        const ITEM_COUNT: i32 = 10000;
        let layer = write_and_open(BLOCK_SIZE, ITEM_COUNT).await;
        let mut iterator = layer.seek(Bound::Unbounded).await.expect("failed to seek");
        let ItemRef { key, value, .. } = iterator.get().expect("missing item");
        assert_eq!((key, value), (&0, &0));
        // Check that we can advance to the next item.
        iterator.advance().await.expect("failed to advance");
        let ItemRef { key, value, .. } = iterator.get().expect("missing item");
        assert_eq!((key, value), (&1, &1));
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_zero_items() {
        const BLOCK_SIZE: u32 = 512;
        let layer = write_and_open(BLOCK_SIZE, 0).await;
        let iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
            .seek(Bound::Unbounded)
            .await
            .expect("seek failed");
        assert!(iterator.get().is_none())
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_large_block_size() {
        // Large enough such that we hit the 64k item limit.
        const BLOCK_SIZE: u32 = 2097152;
        const ITEM_COUNT: i32 = 70000;
        let layer = write_and_open(BLOCK_SIZE, ITEM_COUNT).await;
        let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
        for i in 0..ITEM_COUNT {
            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
            assert_eq!((key, value), (&i, &i));
            iterator.advance().await.expect("failed to advance");
        }
        assert!(iterator.get().is_none());
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_seek_bound_excluded() {
        const BLOCK_SIZE: u32 = 512;
        const ITEM_COUNT: i32 = 10000;
        let layer = write_and_open(BLOCK_SIZE, ITEM_COUNT).await;
        for i in 0..ITEM_COUNT {
            let mut iterator = layer.seek(Bound::Excluded(&i)).await.expect("failed to seek");
            let first = i + 1;
            if first >= ITEM_COUNT {
                // Nothing follows the last key.
                assert!(iterator.get().is_none());
                continue;
            }
            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
            assert_eq!((key, value), (&first, &first));
            // Check that we can advance to the next item.
            iterator.advance().await.expect("failed to advance");
            let second = i + 2;
            if second < ITEM_COUNT {
                let ItemRef { key, value, .. } = iterator.get().expect("missing item");
                assert_eq!((key, value), (&second, &second));
            } else {
                assert!(iterator.get().is_none());
            }
        }
    }
}