// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
errors::FxfsError,
object_handle::{ObjectHandle, ReadObjectHandle},
},
anyhow::{anyhow, ensure, Error},
event_listener::{Event, EventListener},
std::{
ops::Deref,
sync::{Arc, Mutex},
},
storage_device::buffer::BufferFuture,
};
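/// The size of each independently cached and evicted chunk. This must be a multiple of the
/// source's block size (asserted in `CachingObjectHandle::new`).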
pub const CHUNK_SIZE: usize = 128 * 1024;
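// Returns the size of `source` rounded up to a multiple of its block size.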
fn block_aligned_size(source: &impl ReadObjectHandle) -> usize {
let block_size = source.block_size() as usize;
let source_size = source.get_size() as usize;
source_size.checked_next_multiple_of(block_size).unwrap()
}
/// A reference to a chunk of data which is currently in the cache. The data will be held in the
/// cache as long as a reference to this chunk exists.
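///
/// A sketch of the lifetime behaviour (`handle` is a hypothetical `CachingObjectHandle`; not
/// compiled as a doctest):
///
/// ```ignore
/// let chunk = handle.read(0).await?;
/// let alias = chunk.clone(); // Clones share the same cached bytes.
/// drop(chunk);               // The data stays cached while `alias` exists.
/// let first_byte = alias[0]; // `CachedChunk` derefs to `[u8]`.
/// ```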
#[repr(transparent)]
#[derive(Clone)]
pub struct CachedChunk(Arc<Box<[u8]>>);
impl std::fmt::Debug for CachedChunk {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
f.debug_struct("CachedChunk").field("len", &self.len()).finish()
}
}
impl Deref for CachedChunk {
type Target = [u8];
fn deref(&self) -> &Self::Target {
&**self.0
}
}
#[derive(Default, Debug)]
enum Chunk {
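// No data is cached for this chunk.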
#[default]
Missing,
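// A read is currently loading this chunk from the source; other reads of the same chunk wait on
// the handle's event.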
Pending,
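// The chunk's data is cached and `CachedChunk` references to it may be outstanding.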
Present(CachedChunk),
// The chunk has been marked for eviction, and on the next pass it will be deallocated, unless
// it is used before then (which returns it to Present).
Expired(Box<[u8]>),
}
impl Chunk {
// If the chunk is Present and has no remaining references, move it to Expired.
// If the chunk is Expired, move it to Missing, and return its data (so the caller can
// deallocate).
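// Missing and Pending chunks are left unchanged. Over successive purge passes an unused chunk
// therefore progresses Present -> Expired -> Missing (deallocated).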
fn maybe_purge(&mut self) -> Option<Box<[u8]>> {
let this = std::mem::take(self);
match this {
Chunk::Expired(data) => Some(data),
Chunk::Present(chunk) => {
match Arc::try_unwrap(chunk.0) {
Ok(data) => *self = Chunk::Expired(data),
Err(chunk) => *self = Chunk::Present(CachedChunk(chunk)),
}
None
}
_ => {
*self = this;
None
}
}
}
}
/// A wrapper handle around a `ReadObjectHandle` which provides a memory cache for its contents.
/// Contents are fetched as needed and can be evicted (for example due to memory pressure).
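///
/// A minimal usage sketch (not compiled as a doctest; `source` is assumed to be some
/// `ReadObjectHandle` implementation):
///
/// ```ignore
/// let handle = CachingObjectHandle::new(source);
/// // The first read of a chunk fetches it from `source`; subsequent reads of the same chunk are
/// // served from the cache.
/// let chunk = handle.read(0).await?;
/// assert!(chunk.len() <= CHUNK_SIZE);
/// // Periodically release chunks that have gone unused across two consecutive passes.
/// handle.purge();
/// ```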
pub struct CachingObjectHandle<S> {
source: S,
// Data is stored in separately accessible chunks. These can be independently loaded and
// evicted. The size of this array never changes.
chunks: Mutex<Vec<Chunk>>,
// Reads that are waiting on another read to load a chunk wait on this event. There's only one
// event for all chunks, so a read may receive a notification for a different chunk than the one
// it's waiting for and need to re-wait. It's rare for multiple chunks to be loading at once, and
// even rarer for a read to be waiting on one chunk while another chunk is also loading.
event: Event,
}
// SAFETY: The only shared mutable state is `chunks`, which is protected by its Mutex; `source`
// and `event` are only ever accessed through `&self`.
unsafe impl<S> Sync for CachingObjectHandle<S> {}
impl<S: ReadObjectHandle> CachingObjectHandle<S> {
pub fn new(source: S) -> Self {
let block_size = source.block_size() as usize;
assert!(CHUNK_SIZE % block_size == 0);
let aligned_size = block_aligned_size(&source);
let chunk_count = aligned_size.div_ceil(CHUNK_SIZE);
let mut chunks = Vec::<Chunk>::new();
chunks.resize_with(chunk_count, Default::default);
Self { source, chunks: Mutex::new(chunks), event: Event::new() }
}
/// Returns a reference to the chunk (up to `CHUNK_SIZE` bytes) containing `offset`. If the
/// data is already cached, this does not require reading from `source`.
/// `offset` must be less than the size of `source`.
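///
/// For example (a sketch; `handle` is a hypothetical `CachingObjectHandle` over a source larger
/// than `CHUNK_SIZE`; not compiled as a doctest):
///
/// ```ignore
/// // Both offsets fall within chunk 0, so both calls return the same cached data.
/// let a = handle.read(0).await?;
/// let b = handle.read(CHUNK_SIZE - 1).await?;
/// assert_eq!(&*a, &*b);
/// ```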
pub async fn read(&self, offset: usize) -> Result<CachedChunk, Error> {
ensure!(offset < self.source.get_size() as usize, FxfsError::OutOfRange);
let chunk_num = offset / CHUNK_SIZE;
enum Action {
Wait(EventListener),
Load,
}
loop {
let action = {
let mut chunks = self.chunks.lock().unwrap();
match std::mem::take(&mut chunks[chunk_num]) {
Chunk::Missing => {
chunks[chunk_num] = Chunk::Pending;
Action::Load
}
Chunk::Pending => {
chunks[chunk_num] = Chunk::Pending;
Action::Wait(self.event.listen())
}
Chunk::Present(cached_chunk) => {
chunks[chunk_num] = Chunk::Present(cached_chunk.clone());
return Ok(cached_chunk);
}
Chunk::Expired(data) => {
let cached_chunk = CachedChunk(Arc::new(data));
chunks[chunk_num] = Chunk::Present(cached_chunk.clone());
return Ok(cached_chunk);
}
}
};
match action {
Action::Wait(listener) => {
listener.await;
}
Action::Load => {
// If this future is dropped or reading fails then put the chunk back into the
// `Missing` state.
let drop_guard = scopeguard::guard((), |_| {
{
let mut chunks = self.chunks.lock().unwrap();
debug_assert!(matches!(chunks[chunk_num], Chunk::Pending));
chunks[chunk_num] = Chunk::Missing;
}
self.event.notify(usize::MAX);
});
let read_start = chunk_num * CHUNK_SIZE;
let len =
std::cmp::min(read_start + CHUNK_SIZE, self.source.get_size() as usize)
- read_start;
let aligned_len =
std::cmp::min(read_start + CHUNK_SIZE, block_aligned_size(&self.source))
- read_start;
let mut read_buf = self.source.allocate_buffer(aligned_len).await;
let amount_read =
self.source.read(read_start as u64, read_buf.as_mut()).await?;
ensure!(amount_read >= len, anyhow!(FxfsError::Internal).context("Short read"));
tracing::debug!("COH {}: Read {len}@{read_start}", self.source.object_id());
let data = Vec::from(&read_buf.as_slice()[..len]).into_boxed_slice();
let cached_chunk = CachedChunk(Arc::new(data));
{
let mut chunks = self.chunks.lock().unwrap();
debug_assert!(matches!(chunks[chunk_num], Chunk::Pending));
chunks[chunk_num] = Chunk::Present(cached_chunk.clone());
}
self.event.notify(usize::MAX);
scopeguard::ScopeGuard::into_inner(drop_guard);
return Ok(cached_chunk);
}
}
}
}
/// Purges unused chunks, freeing their memory. This follows a second-chance algorithm: unused
/// chunks are marked for eviction the first time this is called, and if they have not been used
/// again by the next call, they are deallocated.
/// This is intended to be run regularly, e.g. on a timer.
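///
/// A sketch of the second-chance behaviour (`handle` is a hypothetical `CachingObjectHandle`;
/// not compiled as a doctest):
///
/// ```ignore
/// let chunk = handle.read(0).await?;
/// drop(chunk);    // No outstanding references remain.
/// handle.purge(); // First pass: the chunk is marked for eviction but its data is kept.
/// handle.purge(); // Second pass: still unused, so the data is deallocated.
/// ```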
pub fn purge(&self) {
// Collect the buffers here and let them drop only after the lock guard is released (locals drop
// in reverse declaration order), so deallocation doesn't hold up readers.
let mut to_deallocate = vec![];
let mut chunks = self.chunks.lock().unwrap();
for chunk in chunks.iter_mut() {
if let Some(data) = chunk.maybe_purge() {
to_deallocate.push(data);
}
}
tracing::debug!(
    "COH {}: Purging {} cached chunks ({} bytes)",
    self.source.object_id(),
    to_deallocate.len(),
    to_deallocate.iter().map(|data| data.len()).sum::<usize>()
);
}
}
impl<S: ReadObjectHandle> ObjectHandle for CachingObjectHandle<S> {
fn set_trace(&self, v: bool) {
self.source.set_trace(v);
}
fn object_id(&self) -> u64 {
self.source.object_id()
}
fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
self.source.allocate_buffer(size)
}
fn block_size(&self) -> u64 {
self.source.block_size()
}
}
#[cfg(test)]
mod tests {
use {
super::{CachingObjectHandle, CHUNK_SIZE},
crate::object_handle::{ObjectHandle, ReadObjectHandle},
anyhow::{anyhow, ensure, Error},
async_trait::async_trait,
event_listener::Event,
std::sync::{
atomic::{AtomicBool, AtomicU8, Ordering},
Arc,
},
storage_device::{
buffer::{BufferFuture, MutableBufferRef},
fake_device::FakeDevice,
Device,
},
};
// Fills a buffer with a pattern seeded by counter.
fn fill_buf(buf: &mut [u8], counter: u8) {
for (i, chunk) in buf.chunks_exact_mut(2).enumerate() {
chunk[0] = counter;
chunk[1] = i as u8;
}
}
// Returns a buffer filled with fill_buf.
fn make_buf(counter: u8, size: usize) -> Vec<u8> {
let mut buf = vec![0; size];
fill_buf(&mut buf, counter);
buf
}
struct FakeSource {
device: Arc<dyn Device>,
size: usize,
started: AtomicBool,
allow_reads: AtomicBool,
wake: Event,
counter: AtomicU8,
}
impl FakeSource {
// `device` is only used to provide allocate_buffer; reads don't go to the device.
fn new(device: Arc<dyn Device>, size: usize) -> Self {
FakeSource {
started: AtomicBool::new(false),
allow_reads: AtomicBool::new(true),
size,
wake: Event::new(),
device,
counter: AtomicU8::new(1),
}
}
fn start(&self) {
self.started.store(true, Ordering::SeqCst);
self.wake.notify(usize::MAX);
}
// Toggles whether reads from the source are allowed. If a read reaches the source while this is
// set to false, the read fails with an error.
fn allow_reads(&self, allow: bool) {
self.allow_reads.store(allow, Ordering::SeqCst);
}
async fn wait_for_start(&self) {
while !self.started.load(Ordering::SeqCst) {
let listener = self.wake.listen();
if self.started.load(Ordering::SeqCst) {
break;
}
listener.await;
}
}
}
#[async_trait]
impl ReadObjectHandle for FakeSource {
async fn read(&self, _offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
ensure!(self.allow_reads.load(Ordering::SeqCst), anyhow!("Received unexpected read"));
let counter = self.counter.fetch_add(1, Ordering::Relaxed);
self.wait_for_start().await;
fill_buf(buf.as_mut_slice(), counter);
Ok(buf.len())
}
fn get_size(&self) -> u64 {
self.size as u64
}
}
impl ObjectHandle for FakeSource {
fn object_id(&self) -> u64 {
unreachable!();
}
fn block_size(&self) -> u64 {
self.device.block_size().into()
}
fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
self.device.allocate_buffer(size)
}
}
#[fuchsia::test]
async fn test_read_with_missing_chunk() {
let device = Arc::new(FakeDevice::new(1024, 512));
let source = FakeSource::new(device, 4096);
source.start();
let caching_object_handle = CachingObjectHandle::new(source);
let chunk = caching_object_handle.read(0).await.unwrap();
assert_eq!(&*chunk, make_buf(1, 4096));
}
#[fuchsia::test]
async fn test_read_with_present_chunk() {
let device = Arc::new(FakeDevice::new(1024, 512));
let source = FakeSource::new(device, 4096);
source.start();
let caching_object_handle = CachingObjectHandle::new(source);
let expected = make_buf(1, 4096);
let chunk = caching_object_handle.read(0).await.unwrap();
assert_eq!(&*chunk, expected);
// The chunk was already populated so this read receives the same value as the above read.
let chunk = caching_object_handle.read(0).await.unwrap();
assert_eq!(&*chunk, expected);
}
#[fuchsia::test]
async fn test_read_with_pending_chunk() {
let device = Arc::new(FakeDevice::new(1024, 512));
let source = FakeSource::new(device, 8192);
let caching_object_handle = CachingObjectHandle::new(source);
// The two reads should target the same chunk.
let mut read_fut1 = std::pin::pin!(caching_object_handle.read(0));
let mut read_fut2 = std::pin::pin!(caching_object_handle.read(4096));
// The first future will transition the chunk from `Missing` to `Pending` and then wait
// on the source.
assert!(futures::poll!(&mut read_fut1).is_pending());
// The second future will wait on the event.
assert!(futures::poll!(&mut read_fut2).is_pending());
caching_object_handle.source.start();
// Even though the source is ready the second future can't make progress.
assert!(futures::poll!(&mut read_fut2).is_pending());
// The first future reads from the source, transitions the chunk from `Pending` to
// `Present`, and then notifies the event.
let expected = make_buf(1, 8192);
assert_eq!(&*read_fut1.await.unwrap(), expected);
// The event has been notified and the second future can now complete.
assert_eq!(&*read_fut2.await.unwrap(), expected);
}
#[fuchsia::test]
async fn test_read_with_notification_for_other_chunk() {
let device = Arc::new(FakeDevice::new(1024, 512));
let source = FakeSource::new(device, CHUNK_SIZE + 4096);
let caching_object_handle = CachingObjectHandle::new(source);
let mut read_fut1 = std::pin::pin!(caching_object_handle.read(0));
let mut read_fut2 = std::pin::pin!(caching_object_handle.read(CHUNK_SIZE));
let mut read_fut3 = std::pin::pin!(caching_object_handle.read(0));
// The first and second futures will transition their chunks from `Missing` to `Pending`
// and then wait on the source.
assert!(futures::poll!(&mut read_fut1).is_pending());
assert!(futures::poll!(&mut read_fut2).is_pending());
// The third future will wait on the event.
assert!(futures::poll!(&mut read_fut3).is_pending());
caching_object_handle.source.start();
// Even though the source is ready the third future can't make progress.
assert!(futures::poll!(&mut read_fut3).is_pending());
// The second future will read from the source and notify the event.
assert_eq!(&*read_fut2.await.unwrap(), make_buf(2, 4096));
// The event was notified but the first chunk is still `Pending` so the third future
// resumes waiting.
assert!(futures::poll!(&mut read_fut3).is_pending());
// The first future will read from the source, transition the first chunk to `Present`,
// and notify the event.
let expected = make_buf(1, CHUNK_SIZE);
assert_eq!(&*read_fut1.await.unwrap(), expected);
// The first chunk is now present so the third future can complete.
assert_eq!(&*read_fut3.await.unwrap(), expected);
}
#[fuchsia::test]
async fn test_read_with_dropped_future() {
let device = Arc::new(FakeDevice::new(1024, 512));
let source = FakeSource::new(device, 4096);
let caching_object_handle = CachingObjectHandle::new(source);
let mut read_fut2 = std::pin::pin!(caching_object_handle.read(0));
{
let mut read_fut1 = std::pin::pin!(caching_object_handle.read(0));
// The first future will transition the chunk from `Missing` to `Pending` and then
// wait on the source.
assert!(futures::poll!(&mut read_fut1).is_pending());
// The second future will wait on the event.
assert!(futures::poll!(&mut read_fut2).is_pending());
caching_object_handle.source.start();
// Even though the source is ready the second future can't make progress.
assert!(futures::poll!(&mut read_fut2).is_pending());
}
// The first future was dropped which transitioned the chunk from `Pending` to `Missing`
// and notified the event. When the second future is polled it transitions the chunk
// from `Missing` back to `Pending`, reads from the source, and then transitions the
// chunk to `Present`.
assert_eq!(&*read_fut2.await.unwrap(), make_buf(2, 4096));
}
#[fuchsia::test]
async fn test_read_past_end_of_source() {
let device = Arc::new(FakeDevice::new(1024, 512));
let source = FakeSource::new(device, 300);
source.start();
let caching_object_handle = CachingObjectHandle::new(source);
caching_object_handle.read(500).await.expect_err("Read should fail");
}
#[fuchsia::test]
async fn test_read_to_end_of_source() {
let device = Arc::new(FakeDevice::new(1024, 512));
const SOURCE_SIZE: usize = 300;
let source = FakeSource::new(device, SOURCE_SIZE);
source.start();
let caching_object_handle = CachingObjectHandle::new(source);
let chunk = caching_object_handle.read(0).await.unwrap();
assert_eq!(&*chunk, make_buf(1, SOURCE_SIZE));
}
#[fuchsia::test]
async fn test_chunk_purging() {
let device = Arc::new(FakeDevice::new(1024, 512));
let source = Arc::new(FakeSource::new(device, CHUNK_SIZE + 4096));
source.start();
let caching_object_handle =
CachingObjectHandle::new(source.clone() as Arc<dyn ReadObjectHandle>);
let _chunk1 = caching_object_handle.read(0).await.unwrap();
// Immediately drop the second chunk.
caching_object_handle.read(CHUNK_SIZE).await.unwrap();
source.allow_reads(false);
// The first purge should not evict the second chunk yet, and the read should save the chunk
// from being evicted by the next purge too.
caching_object_handle.purge();
caching_object_handle.read(0).await.unwrap();
caching_object_handle.read(CHUNK_SIZE).await.unwrap();
caching_object_handle.purge();
caching_object_handle.read(0).await.unwrap();
caching_object_handle.read(CHUNK_SIZE).await.unwrap();
// Purging twice should result in evicting the second chunk.
caching_object_handle.purge();
caching_object_handle.purge();
caching_object_handle.read(0).await.unwrap();
caching_object_handle.read(CHUNK_SIZE).await.expect_err("Chunk was not purged");
}
}