| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::{ |
| lsm_tree::types::{ |
| BoxedLayerIterator, Item, ItemRef, Key, Layer, LayerIterator, LayerWriter, Value, |
| }, |
| object_handle::{ObjectHandle, ObjectHandleExt}, |
| }, |
| anyhow::{bail, Error}, |
| async_trait::async_trait, |
| byteorder::{ByteOrder, LittleEndian, ReadBytesExt}, |
| serde::Serialize, |
| std::{ |
| ops::{Bound, Drop}, |
| sync::Arc, |
| vec::Vec, |
| }, |
| }; |
| |
/// Implements a very primitive persistent layer where items are packed into blocks and searching
/// for items is done via a simple binary search. Each block starts with a 2 byte item count so
/// there is a 64k item limit per block.
pub struct SimplePersistentLayer {
    // Provides read access to the serialized layer data.
    object_handle: Arc<dyn ObjectHandle>,
    // Size in bytes of each block. Blocks are self-contained; items never straddle a boundary.
    block_size: u32,
    // Total size in bytes of the layer object, captured once at open time.
    size: u64,
}
| |
/// Iterator over the items of a SimplePersistentLayer. Blocks are read one at a time and items
/// are decoded lazily as the iterator advances.
pub struct Iterator<'iter, K, V> {
    // The layer this iterator reads from.
    layer: &'iter SimplePersistentLayer,

    // The position of the _next_ block to be read.
    pos: u64,

    // A cursor containing the most recently read block.
    reader: Option<std::io::Cursor<Box<[u8]>>>,

    // The item index in the current block.
    item_index: u16,

    // The number of items in the current block.
    item_count: u16,

    // The current item.
    item: Option<Item<K, V>>,
}
| |
| impl<K, V> Iterator<'_, K, V> { |
| fn new<'iter>(layer: &'iter SimplePersistentLayer, pos: u64) -> Iterator<'iter, K, V> { |
| Iterator { layer, pos, reader: None, item_index: 0, item_count: 0, item: None } |
| } |
| } |
| |
#[async_trait]
impl<'iter, K: Key, V: Value> LayerIterator<K, V> for Iterator<'iter, K, V> {
    /// Moves to the next item, reading and decoding the next block once the current one is
    /// exhausted. After advancing past the last item, `get` returns None.
    async fn advance(&mut self) -> Result<(), Error> {
        if self.item_index >= self.item_count {
            // The current block (if any) is exhausted; stop if there are no more blocks.
            if self.pos >= self.layer.size {
                self.item = None;
                return Ok(());
            }
            // TODO(jfsulliv): Reuse this transfer buffer.
            let bs = self.layer.block_size as usize;
            let mut buf = self.layer.object_handle.allocate_buffer(bs);
            self.layer.object_handle.read(self.pos, buf.as_mut()).await?;
            // Copy out of the transfer buffer so the cursor can own plain heap memory.
            let mut vec = vec![0u8; bs];
            vec.copy_from_slice(&buf.as_slice()[..bs]);
            log::debug!(
                "pos={}, object size={}, object id={}",
                self.pos,
                self.layer.size,
                self.layer.object_handle.object_id()
            );
            let mut reader = std::io::Cursor::new(vec.into());
            // The first two bytes of every block hold its item count (little-endian).
            self.item_count = reader.read_u16::<LittleEndian>()?;
            if self.item_count == 0 {
                bail!("Read block with zero item count");
            }
            self.reader = Some(reader);
            self.pos += self.layer.block_size as u64;
            self.item_index = 0;
        }
        // Items are bincode-serialized back to back after the 2-byte header.
        self.item = Some(bincode::deserialize_from(self.reader.as_mut().unwrap())?);
        self.item_index += 1;
        Ok(())
    }

    /// Returns a reference to the current item, or None if iteration has ended (or `advance`
    /// has not been called yet).
    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        return self.item.as_ref().map(<&Item<K, V>>::into);
    }
}
| |
/// Rounds `value` down to the nearest multiple of `block_size`.
fn round_down(value: u64, block_size: u32) -> u64 {
    let bs = block_size as u64;
    (value / bs) * bs
}
| |
/// Rounds `value` up to the nearest multiple of `block_size`.
///
/// Computed from the remainder rather than as `round_down(value + block_size - 1, ...)` so that
/// the addition cannot overflow whenever the rounded-up result itself is representable as a u64
/// (e.g. `value == u64::MAX` with a block size that divides it exactly).
fn round_up(value: u64, block_size: u32) -> u64 {
    let bs = block_size as u64;
    match value % bs {
        0 => value,
        rem => value + (bs - rem),
    }
}
| |
| impl SimplePersistentLayer { |
| /// Opens an existing layer that is accessible via |object_handle| (which provides a read |
| /// interface to the object). The layer should have been written prior using |
| /// SimplePersistentLayerWriter. |
| pub async fn open( |
| object_handle: impl ObjectHandle + 'static, |
| block_size: u32, |
| ) -> Result<Arc<Self>, Error> { |
| let size = object_handle.get_size(); |
| Ok(Arc::new(SimplePersistentLayer { |
| object_handle: Arc::new(object_handle), |
| block_size, |
| size, |
| })) |
| } |
| } |
| |
#[async_trait]
impl<K: Key, V: Value> Layer<K, V> for SimplePersistentLayer {
    /// Returns an iterator positioned at the first item whose key is >= the included bound
    /// (or at the first item for Bound::Unbounded). Implemented as a binary search over block
    /// boundaries followed by a linear scan within the final candidate block.
    /// Panics on Bound::Excluded, which is not supported.
    async fn seek<'a>(&'a self, bound: Bound<&K>) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
        let key;
        match bound {
            Bound::Unbounded => {
                let mut iterator = Iterator::new(self, 0);
                iterator.advance().await?;
                return Ok(Box::new(iterator));
            }
            Bound::Included(k) => {
                key = k;
            }
            Bound::Excluded(_) => panic!("Excluded bound not supported"),
        }
        let mut left_offset = 0;
        let mut right_offset = round_up(self.size, self.block_size);
        let mut left = Iterator::new(self, left_offset);
        left.advance().await?;
        // An empty layer, or one whose very first item already satisfies the bound, needs no
        // search.
        match left.get() {
            None => {
                return Ok(Box::new(left));
            }
            Some(item) => {
                if item.key >= key {
                    return Ok(Box::new(left));
                }
            }
        }
        // Binary search over blocks. Invariant: the first item of the block at `left_offset` is
        // < key, and the first item of the block at `right_offset` (if any) is >= key.
        while right_offset - left_offset > self.block_size as u64 {
            // Pick a block midway.
            let mid_offset =
                round_down(left_offset + (right_offset - left_offset) / 2, self.block_size);
            let mut iterator = Iterator::new(self, mid_offset);
            iterator.advance().await?;
            // `unwrap` is safe: mid_offset < self.size, so the block exists and `advance`
            // either yields an item or returns an error.
            if iterator.get().unwrap().key >= key {
                right_offset = mid_offset;
            } else {
                left_offset = mid_offset;
                left = iterator;
            }
        }
        // At this point, we know that left_key < key and right_key >= key, so we have to iterate
        // through left_key to find the key we want.
        loop {
            left.advance().await?;
            match left.get() {
                None => break,
                Some(item) => {
                    if item.key >= key {
                        break;
                    }
                }
            }
        }
        Ok(Box::new(left))
    }
}
| |
| // -- Writer support -- |
| |
/// Writes items in the format read by SimplePersistentLayer: fixed-size blocks, each beginning
/// with a little-endian u16 item count followed by bincode-serialized items.
pub struct SimplePersistentLayerWriter<'a> {
    // Size in bytes of each block written.
    block_size: u32,
    // Serialized-but-unflushed data. The leading two bytes are a placeholder for the next
    // block's item count, stamped in at flush time.
    buf: Vec<u8>,
    // Provides write access to the output object.
    object_handle: &'a dyn ObjectHandle,
    // Number of items currently buffered; written into the block header when flushed.
    item_count: u16,
    // Byte offset at which the next block will be written.
    offset: u64,
}
| |
impl<'a> SimplePersistentLayerWriter<'a> {
    /// Creates a new writer that will serialize items to the object accessible via |object_handle|
    /// (which provides a write interface to the object).
    pub fn new(object_handle: &'a dyn ObjectHandle, block_size: u32) -> Self {
        SimplePersistentLayerWriter {
            block_size,
            buf: vec![0; 2], // Reserve two bytes up front for the first block's item count.
            object_handle,
            item_count: 0,
            offset: 0,
        }
    }

    /// Writes out the first `len` bytes of `buf` as one block, stamping the buffered item count
    /// into its leading two bytes, then advances `offset` by one block. No-op when nothing is
    /// buffered.
    async fn flush_some(&mut self, len: usize) -> Result<(), Error> {
        if self.item_count == 0 {
            return Ok(());
        }
        LittleEndian::write_u16(&mut self.buf[0..2], self.item_count);
        // TODO(jfsulliv): Buffer directly into the transfer buffer.
        // TODO(jfsulliv): Be more efficient by writing only aligned chunks of data.
        let mut buf = self.object_handle.allocate_buffer(len);
        // TODO(csuter): Consider making BufferRef implement AsRef<[u8]> to make this a bit tidier.
        buf.as_mut_slice().copy_from_slice(&self.buf[..len]);
        self.object_handle.write(self.offset, buf.as_ref()).await?;
        log::debug!("wrote {} items, {} bytes", self.item_count, len);
        // Keep any bytes serialized after `len` (they belong to the next block). The drain
        // deliberately leaves the last two bytes of the flushed region in place to serve as the
        // next header placeholder; they are overwritten by write_u16 on the next flush.
        self.buf.drain(..len - 2); // 2 bytes are used for the next item count.
        self.item_count = 0;
        self.offset += self.block_size as u64;
        Ok(())
    }
}
| |
#[async_trait]
impl LayerWriter for SimplePersistentLayerWriter<'_> {
    /// Serializes `item` into the buffer. If the buffer has now reached a full block, or the
    /// previously buffered items have hit the u16 per-block limit, the items buffered *before*
    /// this one are flushed as a complete block first.
    async fn write<K: Send + Serialize + Sync, V: Send + Serialize + Sync>(
        &mut self,
        item: ItemRef<'_, K, V>,
    ) -> Result<(), Error> {
        // Note the length before we write this item.
        let len = self.buf.len();
        bincode::serialize_into(&mut self.buf, &item)?;

        // If writing the item took us over a block, flush the bytes in the buffer prior to this
        // item.  (`> block_size - 1` is equivalent to `>= block_size`.)
        // NOTE(review): a single item serializing to >= block_size bytes would make a later
        // flush write more than one block's worth of data; presumably items are always small
        // relative to the block size -- verify with callers.
        if self.buf.len() > self.block_size as usize - 1 || self.item_count == 65535 {
            self.flush_some(len).await?;
        }

        self.item_count += 1;
        Ok(())
    }

    /// Flushes all remaining buffered items as a final (possibly short) block.
    async fn flush(&mut self) -> Result<(), Error> {
        self.flush_some(self.buf.len()).await
    }
}
| |
| impl Drop for SimplePersistentLayerWriter<'_> { |
| fn drop(&mut self) { |
| if self.item_count > 0 { |
| log::warn!("Dropping unwritten items; did you forget to flush?"); |
| } |
| } |
| } |
| |
#[cfg(test)]
mod tests {
    use {
        super::{SimplePersistentLayer, SimplePersistentLayerWriter},
        crate::{
            lsm_tree::types::{Item, ItemRef, Layer, LayerWriter},
            testing::fake_object::{FakeObject, FakeObjectHandle},
        },
        fuchsia_async as fasync,
        std::{ops::Bound, sync::Arc},
    };

    /// Writes the items 0..count (with key == value) to `handle` using `block_size` blocks, and
    /// flushes the writer at the end.
    async fn write_layer(handle: &mut FakeObjectHandle, block_size: u32, count: i32) {
        let mut writer = SimplePersistentLayerWriter::new(handle, block_size);
        for i in 0..count {
            writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
        }
        writer.flush().await.expect("flush failed");
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_iterate_after_write() {
        const BLOCK_SIZE: u32 = 512;
        const ITEM_COUNT: i32 = 10000;

        let mut handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
        write_layer(&mut handle, BLOCK_SIZE, ITEM_COUNT).await;
        let layer = SimplePersistentLayer::open(handle, BLOCK_SIZE).await.expect("new failed");
        let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
        // Every item should come back in order, followed by the end of the iterator.
        for i in 0..ITEM_COUNT {
            let ItemRef { key, value } = iterator.get().expect("missing item");
            assert_eq!((key, value), (&i, &i));
            iterator.advance().await.expect("failed to advance");
        }
        assert!(iterator.get().is_none());
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_seek_after_write() {
        const BLOCK_SIZE: u32 = 512;
        const ITEM_COUNT: i32 = 10000;

        let mut handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
        write_layer(&mut handle, BLOCK_SIZE, ITEM_COUNT).await;
        let layer = SimplePersistentLayer::open(handle, BLOCK_SIZE).await.expect("new failed");
        // Every key should be seekable, and iteration should resume at the following item.
        for i in 0..ITEM_COUNT {
            let mut iterator = layer.seek(Bound::Included(&i)).await.expect("failed to seek");
            let ItemRef { key, value } = iterator.get().expect("missing item");
            assert_eq!((key, value), (&i, &i));

            // Check that we can advance to the next item.
            iterator.advance().await.expect("failed to advance");
            if i < ITEM_COUNT - 1 {
                let j = i + 1;
                let ItemRef { key, value } = iterator.get().expect("missing item");
                assert_eq!((key, value), (&j, &j));
            } else {
                assert!(iterator.get().is_none());
            }
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_seek_unbounded() {
        const BLOCK_SIZE: u32 = 512;
        const ITEM_COUNT: i32 = 10000;

        let mut handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
        write_layer(&mut handle, BLOCK_SIZE, ITEM_COUNT).await;
        let layer = SimplePersistentLayer::open(handle, BLOCK_SIZE).await.expect("new failed");
        let mut iterator = layer.seek(Bound::Unbounded).await.expect("failed to seek");
        let ItemRef { key, value } = iterator.get().expect("missing item");
        assert_eq!((key, value), (&0, &0));

        // Check that we can advance to the next item.
        iterator.advance().await.expect("failed to advance");
        let ItemRef { key, value } = iterator.get().expect("missing item");
        assert_eq!((key, value), (&1, &1));
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_zero_items() {
        const BLOCK_SIZE: u32 = 512;

        let mut handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
        // Flush a writer to which nothing was written.
        write_layer(&mut handle, BLOCK_SIZE, 0).await;

        let layer = SimplePersistentLayer::open(handle, BLOCK_SIZE).await.expect("new failed");
        let iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
            .seek(Bound::Unbounded)
            .await
            .expect("seek failed");
        assert!(iterator.get().is_none())
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_large_block_size() {
        // Large enough such that we hit the 64k item limit.
        const BLOCK_SIZE: u32 = 2097152;
        const ITEM_COUNT: i32 = 70000;

        let mut handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
        write_layer(&mut handle, BLOCK_SIZE, ITEM_COUNT).await;

        let layer = SimplePersistentLayer::open(handle, BLOCK_SIZE).await.expect("new failed");
        let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
        for i in 0..ITEM_COUNT {
            let ItemRef { key, value } = iterator.get().expect("missing item");
            assert_eq!((key, value), (&i, &i));
            iterator.advance().await.expect("failed to advance");
        }
        assert!(iterator.get().is_none());
    }
}