// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
async_enter,
errors::FxfsError,
object_handle::ReadObjectHandle,
round::{round_down, round_up},
},
anyhow::Error,
async_trait::async_trait,
async_utils::event::Event,
either::Either::{Left, Right},
futures::{
future::try_join_all, pin_mut, stream::futures_unordered::FuturesUnordered, try_join,
TryStreamExt,
},
pin_project::{pin_project, pinned_drop},
slab::Slab,
std::{
cell::UnsafeCell, cmp::Ordering, collections::BTreeSet, convert::TryInto, ops::Range,
pin::Pin, sync::Mutex,
},
};
// This is unrelated to any system page size; this is merely the size of the pages used by
// MemDataBuffer. These pages are similar in concept but completely independent of any system
// pages.
const PAGE_SIZE: u64 = 4096;
// Reads smaller than this are rounded up to this size if possible.
const READ_SIZE: u64 = 128 * 1024;
// StackList holds a doubly linked list of structures on the stack. After pushing nodes onto the
// list, the caller *must* call erase_node before or as the node is dropped.
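//
// A usage sketch (illustrative only; `Node` stands for any type that holds a
// `StackListChain<Node>` and implements `AsRef<StackListChain<Node>>`):
//
//     let mut list = StackList::new();
//     let node = Node::default();
//     pin_mut!(node);
//     unsafe { list.push_front(node.as_mut()) };
//     // ... walk the list with list.iter() ...
//     list.erase_node(&node); // Required before `node` is dropped.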
struct StackListNodePtr<T>(*const T);
impl<T> Copy for StackListNodePtr<T> {}
impl<T> Clone for StackListNodePtr<T> {
fn clone(&self) -> Self {
*self
}
}
impl<T> Default for StackListNodePtr<T> {
fn default() -> Self {
Self(std::ptr::null())
}
}
unsafe impl<T> Send for StackListNodePtr<T> {}
struct StackListChain<T> {
// Access to previous and next is safe provided we have mutable access to the list.
prev: UnsafeCell<StackListNodePtr<T>>,
next: UnsafeCell<StackListNodePtr<T>>,
}
impl<T> Default for StackListChain<T> {
fn default() -> Self {
Self {
prev: UnsafeCell::new(StackListNodePtr::default()),
next: UnsafeCell::new(StackListNodePtr::default()),
}
}
}
// The list contains just the head pointer.
struct StackList<T>(StackListNodePtr<T>);
impl<T: AsRef<StackListChain<T>>> StackList<T> {
fn new() -> Self {
Self(StackListNodePtr::default())
}
// Pushes node onto the list. After doing this, erase_node *must* be called before the node
// is dropped (which is why this is unsafe).
unsafe fn push_front(&mut self, node: Pin<&mut T>) {
let node_ptr = StackListNodePtr(&*node);
let chain = (*node_ptr.0).as_ref();
*chain.next.get() = std::mem::replace(&mut self.0, node_ptr);
if let Some(next) = (*chain.next.get()).0.as_ref() {
*next.as_ref().prev.get() = node_ptr;
}
}
fn erase_node(&mut self, node: &T) {
unsafe {
let chain = node.as_ref();
let prev = std::mem::take(&mut *chain.prev.get());
if let Some(next) = (*chain.next.get()).0.as_ref() {
*next.as_ref().prev.get() = prev;
}
let next = std::mem::take(&mut *chain.next.get());
if let Some(prev) = prev.0.as_ref() {
*prev.as_ref().next.get() = next;
} else if self.0 .0 == node {
self.0 = next;
}
}
}
fn iter(&self) -> StackListIter<'_, T> {
StackListIter { list: self, last_node: None }
}
}
impl<T> Drop for StackList<T> {
fn drop(&mut self) {
assert!(self.0 .0.is_null());
}
}
struct StackListIter<'a, T: AsRef<StackListChain<T>>> {
list: &'a StackList<T>,
last_node: Option<&'a T>,
}
impl<'a, T: AsRef<StackListChain<T>>> Iterator for StackListIter<'a, T> {
type Item = &'a T;
fn next(&mut self) -> Option<Self::Item> {
unsafe {
match self.last_node {
None => self.last_node = self.list.0 .0.as_ref(),
Some(node) => self.last_node = (*node.as_ref().next.get()).0.as_ref(),
}
self.last_node
}
}
}
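// Copies as much of `source_buf`, starting at `offset`, as fits in `buf`, returning the number of
// bytes copied. For example (illustrative):
//
//     copy_out(&[1, 2, 3, 4], 1, &mut out); // Copies [2, 3, 4] into out[..3] and returns 3.
//     copy_out(&[1, 2, 3, 4], 9, &mut out); // The offset is past the end, so returns 0.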
#[must_use]
fn copy_out(source_buf: &[u8], offset: u64, buf: &mut [u8]) -> usize {
if offset < source_buf.len() as u64 {
let range = offset as usize..std::cmp::min(offset as usize + buf.len(), source_buf.len());
let len = range.end - range.start;
buf[..len].copy_from_slice(&source_buf[range]);
len
} else {
0
}
}
/// A readable, writable memory buffer that is not necessarily mapped into memory.
/// Mainly serves as a portable abstraction over a VMO (see VmoDataBuffer).
#[async_trait]
pub trait DataBuffer: Send + Sync {
/// raw_read reads from the data buffer without fetching content from a data source for any data
/// that is not present. Data that is not present will likely be returned as zeroes, but the caller
/// should not rely on that behaviour; this function is intended to be used where the caller
/// knows the data is present.
fn raw_read(&self, offset: u64, buf: &mut [u8]);
/// Writes to the buffer. If the writes are unaligned, data will be read from the source to
/// complete any unaligned pages.
async fn write(
&self,
offset: u64,
buf: &[u8],
source: &dyn ReadObjectHandle,
) -> Result<(), Error>;
fn size(&self) -> u64;
async fn resize(&self, size: u64);
/// Read from the buffer but supply content from source where the data is not present.
async fn read(
&self,
offset: u64,
buf: &mut [u8],
source: &dyn ReadObjectHandle,
) -> Result<usize, Error>;
}
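//
// A usage sketch of the trait (illustrative; `handle` is a hypothetical ReadObjectHandle and error
// handling is elided):
//
//     let buffer = MemDataBuffer::new(0);
//     buffer.write(0, b"hello", &handle).await?;
//     let mut out = [0u8; 5];
//     let amount = buffer.read(0, &mut out, &handle).await?;
//     assert_eq!(amount, 5);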
/// A default implementation of a DataBuffer.
pub struct MemDataBuffer(Mutex<Inner>);
struct Inner {
size: u64,
buf: Vec<u8>,
// Records which pages are present.
pages: BTreeSet<BoxedPage>,
// In-flight readers.
readers: Slab<ReadContext>,
// Each read has an associated ReadKeys instance which is stored on the stack. To handle
// truncation correctly we need to be able to reach those ReadKeys instances, so they are added
// to a doubly linked list here.
read_keys_list: StackList<ReadKeys>,
}
impl Inner {
// Marks all pages within the specified range as present.
fn mark_present(&mut self, range: Range<u64>) {
let mut new_pages = Vec::new();
let mut offset = range.start - range.start % PAGE_SIZE;
for page in self.pages.range(offset..range.end) {
// Preempt any reads: this is done by setting the read_key for the page to usize::MAX.
let page = unsafe { page.page_mut() };
page.read_key = usize::MAX;
// Fill in any missing pages.
if page.offset > offset {
for offset in (offset..page.offset).step_by(PAGE_SIZE as usize) {
new_pages.push(BoxedPage::new(offset, usize::MAX));
}
}
offset = page.offset + PAGE_SIZE;
}
for page in new_pages {
assert!(self.pages.insert(page));
}
// Add any pages for the end of the range.
for offset in (offset..range.end).step_by(PAGE_SIZE as usize) {
assert!(self.pages.insert(BoxedPage::new(offset, usize::MAX)));
}
}
fn resize(&mut self, size: u64) {
let aligned_size = round_up(size, PAGE_SIZE).unwrap() as usize;
self.buf.resize(aligned_size, 0u8);
let old_size = self.size;
self.size = size;
if size < old_size {
let end = std::cmp::min(old_size as usize, aligned_size);
self.buf[size as usize..end].fill(0);
let mut to_remove = Vec::new();
for page in self.pages.range(size..) {
to_remove.push(unsafe { page.page() }.offset());
}
for offset in to_remove {
self.pages.remove(&offset);
}
} else {
self.mark_present(old_size..aligned_size as u64);
}
for read_keys in self.read_keys_list.iter() {
let end_offset = read_keys.end_offset.get();
// Safe because we have &mut self.
unsafe {
if *end_offset > size {
*end_offset = size;
}
}
}
}
}
// Returns a page-aligned range and applies read-ahead. The range will not be extended past
// `limit`.
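// For example (illustrative, with the 4 KiB PAGE_SIZE and 128 KiB READ_SIZE above and an 8 KiB
// block size): align_range(10..20, 8192, u64::MAX) widens to 0..READ_SIZE, whereas
// align_range(10..20, 8192, PAGE_SIZE) is clamped to 0..PAGE_SIZE when the next page (at `limit`)
// is already present.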
fn align_range(mut range: Range<u64>, block_size: u64, limit: u64) -> Range<u64> {
// Align the start to the page boundary rather than the block boundary because the preceding
// page might already be present.
range.start = round_down(range.start, PAGE_SIZE);
// We can align the end to the block boundary because we have `limit` which will prevent it from
// being extended to include a page that is already present.
range.end = round_up(range.end, block_size).unwrap();
if range.end - range.start < READ_SIZE {
range.end = range.start + READ_SIZE;
}
if range.end > limit {
range.end = limit;
}
range
}
struct BoxedPage(Box<PageCell>);
impl BoxedPage {
fn new(offset: u64, read_key: usize) -> Self {
BoxedPage(Box::new(PageCell(UnsafeCell::new(Page::new(offset, read_key)))))
}
unsafe fn page(&self) -> &Page {
self.0.page()
}
#[allow(clippy::mut_from_ref)] // TODO(fxbug.dev/95027)
unsafe fn page_mut(&self) -> &mut Page {
self.0.page_mut()
}
}
impl Ord for BoxedPage {
fn cmp(&self, other: &Self) -> Ordering {
unsafe { self.page().cmp(other.page()) }
}
}
impl PartialOrd for BoxedPage {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Eq for BoxedPage {}
impl PartialEq for BoxedPage {
fn eq(&self, other: &Self) -> bool {
unsafe { self.page() == other.page() }
}
}
impl std::borrow::Borrow<u64> for BoxedPage {
fn borrow(&self) -> &u64 {
unsafe { &self.page().offset }
}
}
struct PageCell(UnsafeCell<Page>);
impl PageCell {
unsafe fn page(&self) -> &Page {
&*self.0.get()
}
#[allow(clippy::mut_from_ref)] // TODO(fxbug.dev/95027)
unsafe fn page_mut(&self) -> &mut Page {
&mut *self.0.get()
}
}
// TODO(fxbug.dev/96090): eventually, we should add some kind of LRU list which would chain through
// the pages.
struct Page {
offset: u64,
// When the page is being read, read_key is the slab key for ReadContext.
read_key: usize,
}
impl Page {
fn new(offset: u64, read_key: usize) -> Self {
Self { offset, read_key }
}
fn offset(&self) -> u64 {
self.offset
}
fn is_reading(&self) -> bool {
self.read_key != usize::MAX
}
}
impl Ord for Page {
fn cmp(&self, other: &Self) -> Ordering {
self.offset().cmp(&other.offset())
}
}
impl PartialOrd for Page {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Eq for Page {}
impl PartialEq for Page {
fn eq(&self, other: &Self) -> bool {
self.offset() == other.offset()
}
}
// ReadContext allows a reader to wait on other readers that are reading the same range at the
// same time.
struct ReadContext {
range: Range<u64>,
wait_event: Option<Event>,
}
impl MemDataBuffer {
pub fn new(size: u64) -> Self {
Self(Mutex::new(Inner {
buf: vec![0u8; round_up(size, PAGE_SIZE).unwrap().try_into().unwrap()],
pages: BTreeSet::new(),
readers: Slab::new(),
size,
read_keys_list: StackList::new(),
}))
}
// Reads `aligned_range` from the source. Calls `f` once the read is complete.
async fn read_some(
&self,
aligned_range: Range<u64>,
source: &dyn ReadObjectHandle,
read_key: usize,
f: impl FnOnce(&Inner),
) -> Result<(), Error> {
let mut read_buf =
source.allocate_buffer((aligned_range.end - aligned_range.start) as usize);
let amount = source.read(aligned_range.start, read_buf.as_mut()).await?;
read_buf.as_mut_slice()[amount..].fill(0);
let mut inner = self.0.lock().unwrap();
let Inner { pages, buf, readers, .. } = &mut *inner;
let read_buf = read_buf.as_slice();
let aligned_start = aligned_range.start;
for page in pages.range(aligned_range) {
let page = unsafe { page.page_mut() };
if page.read_key == read_key {
let read_buf_offset = (page.offset - aligned_start) as usize;
buf[page.offset as usize..page.offset as usize + PAGE_SIZE as usize]
.copy_from_slice(
&read_buf[read_buf_offset..read_buf_offset + PAGE_SIZE as usize],
);
page.read_key = usize::MAX;
}
}
readers.get(read_key).unwrap().wait_event.as_ref().map(|e| e.signal());
f(&*inner);
Ok(())
}
// Like `read_some` but copies out to `buf`.
async fn read_and_copy(
&self,
aligned_range: Range<u64>,
offset: u64,
source: &dyn ReadObjectHandle,
read_key: usize,
buf: &mut [u8],
) -> Result<(), Error> {
self.read_some(aligned_range, source, read_key, |inner| {
// Whilst we were reading, it's possible the buffer was truncated; copy_out will only
// copy out what we can. The read function will make sure that the correct value for
// the amount read is returned.
let _ = copy_out(&inner.buf, offset, buf);
})
.await
}
// Reads or waits for a page to be read and then calls `f`.
async fn read_page(
&self,
offset: u64,
source: &dyn ReadObjectHandle,
keys: ReadKeysPtr,
f: impl FnOnce(&Inner),
) -> Result<(), Error> {
loop {
let result = {
let mut inner_lock = self.0.lock().unwrap();
let inner = &mut *inner_lock; // So that we can split the borrow.
let mut read_keys = keys.get(&mut inner.read_keys_list);
if *read_keys.as_mut().project().end_offset.get_mut() <= offset {
return Ok(());
}
match inner.pages.get(&offset) {
None => {
// In this case, the page might have been pending a read but the
// read was dropped or it failed. Or the page could have been
// evicted. In that case, we must schedule a read. This won't
// be as efficient as the usual path because it is only issuing
// a read for a single page, but it should be rare enough that
// it doesn't matter.
let pages = &mut inner.pages;
let read_key = read_keys.new_read(
offset..offset + PAGE_SIZE,
&mut inner.readers,
|p| {
assert!(pages.insert(p));
},
);
Left(read_key)
}
Some(page) => {
let page = unsafe { page.page_mut() };
if page.is_reading() {
// If the page is pending a read, then we need to wait
// until the read has finished.
let read_key = page.read_key;
let read_context = inner.readers.get_mut(read_key).unwrap();
Right(
read_context
.wait_event
.get_or_insert_with(|| Event::new())
.wait_or_dropped(),
)
} else {
// The page is present and not pending a read so we can pass it to the
// callback.
f(inner);
return Ok(());
}
}
}
};
// With the lock dropped, we can either issue the read or wait for another
// read to finish...
match result {
Left(read_key) => {
let block_size = std::cmp::max(source.block_size(), PAGE_SIZE);
return self
.read_some(
round_down(offset, block_size)..offset + PAGE_SIZE,
source,
read_key,
f,
)
.await;
}
Right(event) => {
let _ = event.await;
}
}
}
}
}
#[async_trait]
impl DataBuffer for MemDataBuffer {
async fn write(
&self,
offset: u64,
buf: &[u8],
source: &dyn ReadObjectHandle,
) -> Result<(), Error> {
let aligned_start = offset - offset % PAGE_SIZE;
let end = offset.checked_add(buf.len() as u64).ok_or(FxfsError::TooBig)?;
let aligned_end = end - end % PAGE_SIZE;
if aligned_start != offset || aligned_end != end {
let read_keys = ReadKeys::new(self);
pin_mut!(read_keys);
let mut reads = Vec::new();
{
let mut inner = self.0.lock().unwrap();
let mut issue_read = |offset| {
reads.push(self.read_page(
offset,
source,
read_keys.as_mut().get_ptr(),
|_| {},
));
};
if aligned_start != offset && aligned_start < inner.size {
issue_read(aligned_start);
};
if aligned_end != end && aligned_end != offset && aligned_end < inner.size {
issue_read(aligned_end);
}
if !reads.is_empty() {
read_keys.as_mut().add_to_list(&mut inner.read_keys_list, end);
}
}
try_join_all(reads).await?;
}
let mut inner = self.0.lock().unwrap();
if end > inner.size {
inner.resize(end);
}
inner.buf[offset as usize..end as usize].copy_from_slice(buf);
inner.mark_present(offset..end);
Ok(())
}
fn size(&self) -> u64 {
self.0.lock().unwrap().size
}
async fn resize(&self, size: u64) {
self.0.lock().unwrap().resize(size);
}
fn raw_read(&self, offset: u64, buf: &mut [u8]) {
let inner = self.0.lock().unwrap();
buf.copy_from_slice(&inner.buf[offset as usize..offset as usize + buf.len()]);
}
async fn read(
&self,
offset: u64,
mut read_buf: &mut [u8],
source: &dyn ReadObjectHandle,
) -> Result<usize, Error> {
async_enter!("MemDataBuffer::read");
// A list of all the futures for any required reads.
let reads = FuturesUnordered::new();
// Tracks any reads that other tasks might be working on.
let pending_reads = FuturesUnordered::new();
// New pages that we might need.
let mut new_pages = Vec::new();
// Keep track of the keys for any reads we schedule. After read_keys has been inserted into
// read_keys_list, care must be taken to only access read_keys whilst holding the lock on
// inner.
let read_keys = ReadKeys::new(self);
pin_mut!(read_keys);
{
let mut inner = self.0.lock().unwrap();
let content_size = inner.size;
if offset >= content_size {
return Ok(0);
}
let aligned_size = round_up(content_size, PAGE_SIZE).unwrap();
let end_offset = if content_size - offset < read_buf.len() as u64 {
if aligned_size - offset < read_buf.len() as u64 {
read_buf = &mut read_buf[0..(aligned_size - offset) as usize];
}
content_size
} else {
offset + read_buf.len() as u64
};
read_keys.as_mut().add_to_list(&mut inner.read_keys_list, end_offset);
let Inner { pages, readers, buf, .. } = &mut *inner;
let mut last_offset = offset;
let block_size = std::cmp::max(source.block_size().into(), PAGE_SIZE);
let aligned_start = round_down(offset, block_size);
let mut readahead_limit = buf.len() as u64;
for page in pages.range(aligned_start..) {
let page = unsafe { page.page_mut() };
let page_offset = page.offset();
if page_offset + PAGE_SIZE <= last_offset {
// This is possible because the block size can be larger than the page size.
continue;
}
if page_offset >= end_offset {
readahead_limit = page_offset;
break;
}
// Handle any gap between the last page we found and this one.
if page_offset > last_offset {
// Schedule a read for the gap.
let (head, tail) = read_buf.split_at_mut((page_offset - last_offset) as usize);
let page_range = round_down(last_offset, PAGE_SIZE)..page_offset;
let block_range = round_down(page_range.start, block_size)..page_range.end;
reads.push(self.read_and_copy(
block_range,
last_offset,
source,
read_keys.as_mut().new_read(page_range, readers, |p| new_pages.push(p)),
head,
));
read_buf = tail;
last_offset = page_offset;
}
let (head, tail) = read_buf.split_at_mut(std::cmp::min(
(page_offset + PAGE_SIZE - last_offset) as usize,
read_buf.len(),
));
if page.is_reading() {
pending_reads.push(self.read_page(
page_offset,
source,
read_keys.as_mut().get_ptr(),
move |inner| {
// Whilst we were reading, it's possible the buffer was truncated;
// copy_out will only copy out what we can. Later, we will make sure
// that the correct value for the amount read is returned.
let _ = copy_out(&inner.buf, last_offset, head);
},
));
} else {
let _ = copy_out(buf, last_offset, head);
}
read_buf = tail;
last_offset = page_offset + PAGE_SIZE;
}
// Handle the tail.
if !read_buf.is_empty() {
let page_range = align_range(
if last_offset == offset { aligned_start } else { last_offset }..end_offset,
block_size,
readahead_limit,
);
let block_range = round_down(page_range.start, block_size)..page_range.end;
reads.push(self.read_and_copy(
block_range,
last_offset,
source,
read_keys.as_mut().new_read(page_range, readers, |p| new_pages.push(p)),
read_buf,
));
}
for page in new_pages {
assert!(pages.insert(page));
}
}
try_join!(
async {
reads.try_collect().await?;
Result::<(), Error>::Ok(())
},
async {
pending_reads.try_collect().await?;
Result::<(), Error>::Ok(())
}
)?;
let _inner = self.0.lock().unwrap();
// Safe because we have taken the lock.
let end_offset = unsafe { *read_keys.end_offset.get() };
if end_offset > offset {
Ok((end_offset - offset) as usize)
} else {
Ok(0)
}
}
}
// ReadKeys is a structure that tracks in-flight reads and cleans them up when dropped.
#[pin_project(PinnedDrop, !Unpin)]
struct ReadKeys {
// Any reads that are taking place are chained together in a doubly linked list. The nodes
// are pinned to the stack. The chain field holds the previous and next pointers.
chain: StackListChain<ReadKeys>,
// Store a pointer back to MemDataBuffer so that we can tidy up when we are dropped. This is a
// pointer rather than a reference so that we don't have to transmute away the lifetimes when we
// insert and remove from the stack list. It's safe because ReadKeys is always created on the
// stack whilst we hold a reference to MemDataBuffer, so the MemDataBuffer outlives ReadKeys, and
// ReadKeys is pinned.
buf: *const MemDataBuffer,
// A vector of keys which refer to elements in the readers slab.
keys: Vec<usize>,
// We record the end offset of the read so that if the buffer is truncated whilst we are issuing
// reads, we can detect this and truncate the read result. It is safe to modify this field so
// long as the lock that guards Inner is held.
end_offset: UnsafeCell<u64>,
}
unsafe impl Send for ReadKeys {}
impl ReadKeys {
fn new(buf: &MemDataBuffer) -> Self {
ReadKeys {
chain: Default::default(),
buf,
keys: Vec::new(),
end_offset: UnsafeCell::new(0),
}
}
// Returns a new read key for a read covering `aligned_range`. It creates new pages for the read
// (via `new_page_fn`) which will get properly cleaned up if this ReadKeys is dropped.
fn new_read(
self: Pin<&mut Self>,
aligned_range: Range<u64>,
readers: &mut Slab<ReadContext>,
mut new_page_fn: impl FnMut(BoxedPage),
) -> usize {
let read_key =
readers.insert(ReadContext { range: aligned_range.clone(), wait_event: None });
self.project().keys.push(read_key);
for offset in aligned_range.step_by(PAGE_SIZE as usize) {
new_page_fn(BoxedPage::new(offset, read_key));
}
read_key
}
fn add_to_list(mut self: Pin<&mut Self>, list: &mut StackList<ReadKeys>, end_offset: u64) {
*self.as_mut().project().end_offset.get_mut() = end_offset;
// This is safe because we remove ourselves from the list in drop.
unsafe {
list.push_front(self);
}
}
fn get_ptr(self: Pin<&mut Self>) -> ReadKeysPtr {
// This is safe because we only use this pointer via the get method below.
ReadKeysPtr(unsafe { self.get_unchecked_mut() })
}
}
// Holds a reference to a ReadKeys instance which can be dereferenced with a mutable borrow of the
// list.
#[derive(Clone, Copy)]
struct ReadKeysPtr(*mut ReadKeys);
unsafe impl Send for ReadKeysPtr {}
unsafe impl Sync for ReadKeysPtr {}
impl ReadKeysPtr {
// Allow a dereference provided we have a mutable borrow for the list which ensures exclusive
// access.
fn get<'b>(&self, _list: &'b mut StackList<ReadKeys>) -> Pin<&'b mut ReadKeys> {
// Safe because we are taking a mutable borrow on the list.
unsafe { Pin::new_unchecked(&mut *self.0) }
}
}
#[pinned_drop]
impl PinnedDrop for ReadKeys {
fn drop(self: Pin<&mut Self>) {
// This is safe because ReadKeys is kept on the stack and we always have &MemDataBuffer when
// we create ReadKeys.
let buf = unsafe { &*self.buf };
let mut inner = buf.0.lock().unwrap();
for read_key in &self.keys {
let range = inner.readers.remove(*read_key).range;
let mut to_remove = Vec::new();
for page in inner.pages.range(range) {
// It's possible that the read was pre-empted so we must check that the key matches.
if unsafe { page.page().read_key } == *read_key {
to_remove.push(unsafe { page.page().offset() });
}
}
for offset in to_remove {
inner.pages.remove(&offset);
}
}
inner.read_keys_list.erase_node(&self);
}
}
impl AsRef<StackListChain<ReadKeys>> for ReadKeys {
fn as_ref(&self) -> &StackListChain<ReadKeys> {
&self.chain
}
}
#[cfg(test)]
mod tests {
use {
super::{DataBuffer, MemDataBuffer, PAGE_SIZE, READ_SIZE},
crate::{
errors::FxfsError,
object_handle::{ObjectHandle, ReadObjectHandle},
},
anyhow::Error,
async_trait::async_trait,
async_utils::event::Event,
fuchsia_async as fasync,
futures::{future::poll_fn, join, FutureExt},
std::{
sync::{
atomic::{AtomicU8, Ordering},
Arc,
},
task::Poll,
},
storage_device::{
buffer::{Buffer, MutableBufferRef},
fake_device::FakeDevice,
Device,
},
};
// Fills a buffer with a pattern seeded by counter.
fn fill_buf(buf: &mut [u8], counter: u8) {
for (i, chunk) in buf.chunks_exact_mut(2).enumerate() {
chunk[0] = counter;
chunk[1] = i as u8;
}
}
// Returns a buffer filled with fill_buf.
fn make_buf(counter: u8, size: u64) -> Vec<u8> {
let mut buf = vec![0; size as usize];
fill_buf(&mut buf, counter);
buf
}
struct FakeSource {
device: Arc<dyn Device>,
go: Event,
counter: AtomicU8,
}
impl FakeSource {
// `device` is only used to provide allocate_buffer; reads don't go to the device.
fn new(device: Arc<dyn Device>) -> Self {
FakeSource { go: Event::new(), device, counter: AtomicU8::new(1) }
}
}
#[async_trait]
impl ReadObjectHandle for FakeSource {
async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
assert_eq!(offset % PAGE_SIZE, 0);
assert_eq!(buf.len() % PAGE_SIZE as usize, 0);
let _ = self.go.wait_or_dropped().await;
fill_buf(buf.as_mut_slice(), self.counter.fetch_add(1, Ordering::Relaxed));
Ok(buf.len())
}
}
#[async_trait]
impl ObjectHandle for FakeSource {
fn object_id(&self) -> u64 {
unreachable!();
}
fn get_size(&self) -> u64 {
unreachable!();
}
fn block_size(&self) -> u64 {
self.device.block_size().into()
}
fn allocate_buffer(&self, size: usize) -> Buffer<'_> {
self.device.allocate_buffer(size)
}
}
#[fasync::run_singlethreaded(test)]
async fn test_sequential_reads() {
let data_buf = MemDataBuffer::new(100 * PAGE_SIZE);
let device = Arc::new(FakeDevice::new(100, 8192));
let mut buf = [0; PAGE_SIZE as usize];
let source = FakeSource::new(device.clone());
source.go.signal();
data_buf.read(0, &mut buf, &source).await.expect("read failed");
assert_eq!(&buf, make_buf(1, PAGE_SIZE).as_slice());
data_buf.read(0, &mut buf, &source).await.expect("read failed");
assert_eq!(&buf, make_buf(1, PAGE_SIZE).as_slice());
}
#[fasync::run_singlethreaded(test)]
async fn test_parallel_reads() {
let data_buf = MemDataBuffer::new(100 * PAGE_SIZE);
let device = Arc::new(FakeDevice::new(100, PAGE_SIZE as u32));
let source = FakeSource::new(device.clone());
let mut buf = [0; PAGE_SIZE as usize];
let mut buf2 = [0; PAGE_SIZE as usize];
join!(
async {
data_buf.read(PAGE_SIZE, buf.as_mut(), &source).await.expect("read failed");
},
async {
data_buf.read(PAGE_SIZE, buf2.as_mut(), &source).await.expect("read failed");
},
async {
fasync::Timer::new(std::time::Duration::from_millis(100)).await;
source.go.signal();
}
);
assert_eq!(&buf, make_buf(1, PAGE_SIZE).as_slice());
assert_eq!(&buf2, make_buf(1, PAGE_SIZE).as_slice());
}
#[fasync::run_singlethreaded(test)]
async fn test_unaligned_write() {
let data_buf = MemDataBuffer::new(100 * PAGE_SIZE);
let device = Arc::new(FakeDevice::new(100, 8192));
let source = FakeSource::new(device.clone());
source.go.signal();
let mut buf = [0; 3 * PAGE_SIZE as usize];
buf.fill(67);
data_buf
.write(PAGE_SIZE - 10, &buf[..PAGE_SIZE as usize + 20], &source)
.await
.expect("write failed");
data_buf.read(0, &mut buf, &source).await.expect("read failed");
// There are two combinations depending on which read goes first.
let mut expected1 = make_buf(1, PAGE_SIZE - 10);
expected1.extend(&[67; PAGE_SIZE as usize + 20]);
expected1.extend(&make_buf(2, PAGE_SIZE)[..PAGE_SIZE as usize - 10]);
let mut expected2 = make_buf(1, PAGE_SIZE - 10);
expected2.extend(&[67; PAGE_SIZE as usize + 20]);
expected2.extend(&make_buf(2, PAGE_SIZE)[10..]);
assert!(&buf[..] == expected1 || &buf[..] == expected2);
}
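// An added sanity check (sketch): raw_read should return data that has already been written and
// marked present, without consulting the source.
#[fasync::run_singlethreaded(test)]
async fn test_raw_read_returns_written_data() {
    let data_buf = MemDataBuffer::new(100 * PAGE_SIZE);
    let device = Arc::new(FakeDevice::new(100, 8192));
    let source = FakeSource::new(device.clone());
    source.go.signal();
    let data = make_buf(11, PAGE_SIZE);
    // A page-aligned write does not need to read anything from the source.
    data_buf.write(0, &data, &source).await.expect("write failed");
    let mut buf = [0; PAGE_SIZE as usize];
    data_buf.raw_read(0, &mut buf);
    assert_eq!(&buf[..], data.as_slice());
}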
#[fasync::run_singlethreaded(test)]
async fn test_dropped_read() {
let data_buf = MemDataBuffer::new(100 * PAGE_SIZE);
let device = Arc::new(FakeDevice::new(100, 8192));
let source = FakeSource::new(device.clone());
let mut buf = [0; PAGE_SIZE as usize];
let mut buf2 = [0; READ_SIZE as usize * 2];
let mut read_fut = data_buf.read(0, buf.as_mut(), &source);
let mut read_fut2 = data_buf.read(0, buf2.as_mut(), &source);
poll_fn(|ctx| {
assert!(read_fut.poll_unpin(ctx).is_pending());
source.go.signal();
assert!(read_fut2.poll_unpin(ctx).is_pending());
Poll::Ready(())
})
.await;
// If we now drop the first future, the second future should complete.
std::mem::drop(read_fut);
assert_eq!(read_fut2.await.expect("read failed"), READ_SIZE as usize * 2);
assert_eq!(&buf2[0..PAGE_SIZE as usize], make_buf(2, PAGE_SIZE).as_slice());
// The tail should not have been waiting for the first read.
assert_eq!(&buf2[READ_SIZE as usize..], make_buf(1, READ_SIZE).as_slice());
}
#[fasync::run_singlethreaded(test)]
async fn test_read_with_gap() {
let data_buf = MemDataBuffer::new(100 * PAGE_SIZE);
let device = Arc::new(FakeDevice::new(100, PAGE_SIZE as u32));
let source = FakeSource::new(device.clone());
source.go.signal();
let mut buf = [0; PAGE_SIZE as usize * 2];
assert_eq!(
data_buf
.read(PAGE_SIZE, &mut buf[..PAGE_SIZE as usize], &source)
.await
.expect("read failed"),
PAGE_SIZE as usize
);
assert_eq!(&buf[..PAGE_SIZE as usize], make_buf(1, PAGE_SIZE).as_slice());
assert_eq!(
data_buf.read(0, &mut buf, &source).await.expect("read failed"),
PAGE_SIZE as usize * 2
);
assert_eq!(&buf[..PAGE_SIZE as usize], make_buf(2, PAGE_SIZE).as_slice());
assert_eq!(&buf[PAGE_SIZE as usize..], make_buf(1, PAGE_SIZE).as_slice());
}
#[fasync::run_singlethreaded(test)]
async fn test_read_unaligned() {
let data_buf = MemDataBuffer::new(100 * PAGE_SIZE);
let device = Arc::new(FakeDevice::new(100, 8192));
let source = FakeSource::new(device.clone());
let mut buf = [0; 10];
let mut buf2 = [0; 10];
let mut read_fut = data_buf.read(10, &mut buf, &source);
let mut read_fut2 = data_buf.read(10, &mut buf2, &source);
poll_fn(|ctx| {
assert!(read_fut.poll_unpin(ctx).is_pending());
source.go.signal();
assert!(read_fut2.poll_unpin(ctx).is_pending());
Poll::Ready(())
})
.await;
assert_eq!(read_fut.await.expect("read failed"), 10 as usize);
assert_eq!(read_fut2.await.expect("read failed"), 10 as usize);
// The read should have been aligned and extended to READ_SIZE by read-ahead.
let mut expected = make_buf(1, READ_SIZE);
assert_eq!(&buf, &expected[10..20]);
assert_eq!(&buf2, &expected[10..20]);
assert_eq!(data_buf.read(0, &mut buf, &source).await.expect("read failed"), 10);
assert_eq!(&buf, &expected[..10]);
// Issue an unaligned read that is big enough to trigger another read (so this needs
// to exceed the read-ahead).
let mut buf = [0; READ_SIZE as usize];
assert_eq!(
data_buf.read(10, &mut buf, &source).await.expect("read failed"),
READ_SIZE as usize
);
// The above should have triggered another read.
expected.extend(&make_buf(2, PAGE_SIZE));
assert_eq!(&buf, &expected[10..10 + READ_SIZE as usize]);
}
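// An added check (sketch) for the read-ahead helper: small reads are widened to READ_SIZE but are
// never extended past `limit`.
#[test]
fn test_align_range_applies_read_ahead_and_limit() {
    // With nothing limiting it, a 10 byte read with an 8 KiB block size grows to READ_SIZE.
    assert_eq!(super::align_range(10..20, 8192, u64::MAX), 0..READ_SIZE);
    // With the next page already present at PAGE_SIZE, the range is clamped there.
    assert_eq!(super::align_range(10..20, 8192, PAGE_SIZE), 0..PAGE_SIZE);
}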
#[fasync::run_singlethreaded(test)]
async fn test_write_too_big() {
let data_buf = MemDataBuffer::new(100 * PAGE_SIZE);
let device = Arc::new(FakeDevice::new(100, 8192));
let source = FakeSource::new(device.clone());
let buf = [0; PAGE_SIZE as usize];
assert!(FxfsError::TooBig
.matches(&data_buf.write(u64::MAX, &buf, &source).await.expect_err("write succeeded")));
}
#[fasync::run_singlethreaded(test)]
async fn test_read_and_truncate() {
let data_buf = MemDataBuffer::new(100 * PAGE_SIZE);
let device = Arc::new(FakeDevice::new(100, PAGE_SIZE as u32));
let source = FakeSource::new(device.clone());
let mut buf = [0; PAGE_SIZE as usize];
let mut buf2 = [0; PAGE_SIZE as usize * 2];
let mut buf3 = [0; PAGE_SIZE as usize * 2];
let mut read_fut = data_buf.read(0, buf.as_mut(), &source);
let mut read_fut2 = data_buf.read(0, buf2.as_mut(), &source);
let mut read_fut3 = data_buf.read(PAGE_SIZE, buf3.as_mut(), &source);
// Poll the futures once.
poll_fn(|ctx| {
assert!(read_fut.poll_unpin(ctx).is_pending());
assert!(read_fut2.poll_unpin(ctx).is_pending());
assert!(read_fut3.poll_unpin(ctx).is_pending());
Poll::Ready(())
})
.await;
// Truncate the buffer.
data_buf.resize(20).await;
data_buf.resize(PAGE_SIZE + 10).await;
// Let the reads continue.
source.go.signal();
// All of the reads should be clamped by the truncated size.
assert_eq!(read_fut.await.expect("read failed"), 20);
assert_eq!(read_fut2.await.expect("read failed"), 20);
assert_eq!(read_fut3.await.expect("read failed"), 0);
// Another read should return the current size.
assert_eq!(
data_buf.read(PAGE_SIZE - 1, buf.as_mut(), &source).await.expect("read failed"),
11
);
}
#[fasync::run_singlethreaded(test)]
async fn test_block_unaligned_read() {
let data_buf = MemDataBuffer::new(100 * PAGE_SIZE);
let device = Arc::new(FakeDevice::new(100, 8192));
let source = FakeSource::new(device.clone());
source.go.signal();
let mut buf = [0; PAGE_SIZE as usize];
assert_eq!(
data_buf.read(PAGE_SIZE, buf.as_mut(), &source).await.expect("read failed"),
buf.len()
);
// The read should have been issued for offset 0 to align with the device block size.
let expected = make_buf(1, PAGE_SIZE * 2);
assert_eq!(&buf, &expected[PAGE_SIZE as usize..]);
// And offset 0 should have been cached.
assert_eq!(data_buf.read(0, buf.as_mut(), &source).await.expect("read failed"), buf.len());
assert_eq!(&buf, &expected[..PAGE_SIZE as usize]);
}
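// An added check (sketch) for copy_out's clamping behaviour.
#[test]
fn test_copy_out_clamps_to_source() {
    let source = [1u8, 2, 3, 4];
    let mut dest = [0u8; 8];
    assert_eq!(super::copy_out(&source, 1, &mut dest), 3);
    assert_eq!(&dest[..3], &[2, 3, 4]);
    // An offset at or beyond the end of the source copies nothing.
    assert_eq!(super::copy_out(&source, 4, &mut dest), 0);
}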
#[fasync::run_singlethreaded(test)]
async fn test_readahead() {
let data_buf = MemDataBuffer::new(100 * PAGE_SIZE);
let device = Arc::new(FakeDevice::new(100, 8192));
let source = FakeSource::new(device.clone());
source.go.signal();
let mut buf = [0; PAGE_SIZE as usize];
assert_eq!(
data_buf.read(10 * PAGE_SIZE, buf.as_mut(), &source).await.expect("read failed"),
buf.len()
);
let expected = make_buf(1, PAGE_SIZE * 2);
assert_eq!(&buf, &expected[..PAGE_SIZE as usize]);
// And there should have been readahead...
assert_eq!(
data_buf.read(11 * PAGE_SIZE, buf.as_mut(), &source).await.expect("read failed"),
buf.len()
);
assert_eq!(&buf, &expected[PAGE_SIZE as usize..]);
// Issue a read at offset zero.
assert_eq!(data_buf.read(0, buf.as_mut(), &source).await.expect("read failed"), buf.len());
let expected2 = make_buf(2, PAGE_SIZE * 2);
assert_eq!(&buf, &expected2[..PAGE_SIZE as usize]);
// And there should have been readahead for that too.
assert_eq!(
data_buf.read(PAGE_SIZE, buf.as_mut(), &source).await.expect("read failed"),
buf.len()
);
assert_eq!(&buf, &expected2[PAGE_SIZE as usize..]);
// And it shouldn't have impacted the results of the first read.
assert_eq!(
data_buf.read(10 * PAGE_SIZE, buf.as_mut(), &source).await.expect("read failed"),
buf.len()
);
assert_eq!(&buf, &expected[..PAGE_SIZE as usize]);
}
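// An added sketch exercising StackList directly: nodes are pinned on the stack, pushed onto the
// list, and must be erased before they are dropped.
struct ListNode {
    chain: super::StackListChain<ListNode>,
    value: u32,
}
impl AsRef<super::StackListChain<ListNode>> for ListNode {
    fn as_ref(&self) -> &super::StackListChain<ListNode> {
        &self.chain
    }
}
#[test]
fn test_stack_list_push_and_erase() {
    let mut list = super::StackList::new();
    let node = ListNode { chain: Default::default(), value: 7 };
    futures::pin_mut!(node);
    // Safe because erase_node is called below, before `node` is dropped.
    unsafe { list.push_front(node.as_mut()) };
    assert_eq!(list.iter().map(|n| n.value).collect::<Vec<_>>(), vec![7]);
    list.erase_node(&node);
    assert_eq!(list.iter().count(), 0);
}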
}