// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
data_buffer::DataBuffer,
errors::FxfsError,
object_handle::ReadObjectHandle,
object_store::allocator::{self},
round::{round_down, round_up},
},
anyhow::{ensure, Error},
interval_tree::{interval::Interval, interval_tree::IntervalTree, utils::RangeOps},
std::ops::Range,
std::sync::Mutex,
std::time::Duration,
storage_device::buffer::Buffer,
};
// This module contains an implementation of a writeback cache.
//
// The writeback cache has two important constants associated with it:
// - The block size (BS), which is established by the filesystem, and
// - The read-ahead size (RAS), which is established by the cache (and must be a multiple of block
// size).
//
// # State management
//
// Data in the cache is managed in BS-aligned chunks. Each of these chunks can be in a few states:
// - Absent (The cache doesn't know what is there, and needs to fetch from disk -- even if the range
// is sparse, the cache needs to attempt a fetch to find out)
// - Clean (The cache has loaded the contents from disk into memory)
// - Dirty (There is a pending write of the chunk)
// - Flushing (There is a pending write of the chunk which is being flushed by some task)
//
// For either of the dirty states (Dirty or Flushing), the cache will have a backing storage
// reservation for that chunk. (While Flushing, there's also a reservation held by the flush
// task.)
//
// # Resizing
//
// Extending the cache will result in all of the new pages being Clean and immediately
// cache-readable (the cache may, in fact, be sparse).
//
// Shrinking the cache will clear any of the affected ranges (even those which are Dirty or
// Flushing). If an in-progress flush is affected by the truncate, the flush will still write out
// the truncated ranges, and they will be deleted on the next flush.
//
// # Flushing
//
// Flushing is separated into two parts: the first copies out all flushable data/metadata, and the
// second marks that flush as complete (or aborts the flush). At most one task can be flushing at a
// time, and flushing does not block any other cache operations.
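//
// A sketch of the expected calling sequence (a hedged illustration; |reserver|, |source| and
// |allocate_buffer| stand for whatever the filesystem provides):
//
//   cache.write_or_append(...)            // affected ranges become Dirty, bytes are reserved
//   let f = cache.take_flushable(...)     // Dirty -> Flushing, data is copied into a Buffer
//   // ... the flush task writes f.data / f.metadata out to disk ...
//   cache.complete_flush(f);              // Flushing ranges are dropped (i.e. become Clean)
//   // ...or drop |f| without completing, which aborts the flush: Flushing -> Dirty, and the
//   // reservation is reclaimed for a later attempt.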
/// When reading into the cache, this is the minimum granularity that we load data at.
pub const CACHE_READ_AHEAD_SIZE: u64 = 32_768;
/// StorageReservation should be implemented by filesystems to provide in-memory reservation of
/// blocks for pending writes.
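///
/// A minimal sketch of an implementation (SimpleReserver and its fields are hypothetical; a
/// real filesystem would wire these up to its own allocator and transaction-overhead
/// accounting):
///
/// ```ignore
/// struct SimpleReserver {
///     // Whatever owns the pool of free bytes, e.g. the filesystem's allocator.
///     allocator: Arc<dyn Allocator>,
///     block_size: u64,
///     // Fixed per-sync transaction overhead, in bytes.
///     sync_overhead: u64,
/// }
///
/// impl StorageReservation for SimpleReserver {
///     fn reservation_needed(&self, data: u64) -> u64 {
///         // One reserved byte per dirty byte, rounded up to a whole block, plus the overhead
///         // of the sync that will write it back.
///         round_up(data, self.block_size).unwrap() + self.sync_overhead
///     }
///     fn reserve(&self, amount: u64) -> Result<allocator::Reservation, Error> {
///         self.allocator
///             .clone()
///             .reserve(round_up(amount, self.block_size).unwrap())
///             .ok_or_else(|| anyhow!("no space"))
///     }
///     fn wrap_reservation(&self, amount: u64) -> allocator::Reservation {
///         allocator::Reservation::new(self.allocator.clone(), amount)
///     }
/// }
/// ```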
pub trait StorageReservation: Send + Sync {
/// Returns the number of bytes that need to be reserved to sync |data| bytes of dirty data.
/// This takes sync overhead into account; if a flush needs to be split up into several syncs,
/// each sync's overhead is included.
fn reservation_needed(&self, data: u64) -> u64;
/// Reserves at least |amount| bytes in the filesystem, taking alignment into account. Returns
/// the reservation (which covers at least |amount| bytes), or an error if there is not enough
/// free space.
fn reserve(&self, amount: u64) -> Result<allocator::Reservation, Error>;
/// Wraps a raw reserved-byte count in the RAII type.
fn wrap_reservation(&self, amount: u64) -> allocator::Reservation;
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
// There's an implicit state when there is no entry in the interval tree, which represents an
// absent range in the file (i.e. a hole which is not memory-resident).
enum CacheState {
// The range is memory-resident and needs to be flushed. The cache should have enough bytes
// reserved to write this data range. The range's data should not be discarded.
Dirty,
// The range is memory-resident and is being flushed. The range's data should not be discarded
// until the flush completes, at which point the range becomes Clean.
Flushing,
}
#[derive(Clone, Debug)]
struct CachedRange {
range: Range<u64>,
state: CacheState,
}
impl CachedRange {
fn is_flushing(&self) -> bool {
self.state == CacheState::Flushing
}
}
impl Interval<u64> for CachedRange {
fn clone_with(&self, new_range: &Range<u64>) -> Self {
Self { range: new_range.clone(), state: self.state.clone() }
}
fn merge(&self, other: &Self) -> Self {
let state = self.state;
Self { range: self.range.merge(&other.range), state }
}
fn has_mergeable_properties(&self, other: &Self) -> bool {
self.state == other.state
}
fn overrides(&self, other: &Self) -> bool {
// This follows the state-machine transitions that we expect to perform.
match (&self.state, &other.state) {
// Writing to a Flushing range results in it being Dirty.
(CacheState::Dirty, CacheState::Flushing) => true,
_ => false,
}
}
}
impl AsRef<Range<u64>> for CachedRange {
fn as_ref(&self) -> &Range<u64> {
&self.range
}
}
struct Inner {
intervals: IntervalTree<CachedRange, u64>,
// Number of dirty bytes so far, excluding those which are in the midst of a flush. Every dirty
// byte must have a byte reserved for it to be written back, plus some extra reservation made
// for transaction overhead.
dirty_bytes: u64,
creation_time: Option<Duration>,
modification_time: Option<Duration>,
}
impl Inner {
fn complete_flush(&mut self, data: FlushableData<'_, '_>, completed: bool) {
let mut dirty_bytes = 0;
for range in data.ranges {
let removed =
self.intervals.remove_matching_interval(&range, |i| i.is_flushing()).unwrap();
if !completed {
for mut interval in removed {
interval.state = CacheState::Dirty;
self.intervals.add_interval(&interval).unwrap();
dirty_bytes += interval.range.end - interval.range.start;
}
}
}
if dirty_bytes > 0 {
// If we didn't complete the flush, take back whatever reservation we'll need for
// another attempt.
let reserved_before = data.reserver.reservation_needed(self.dirty_bytes);
self.dirty_bytes += dirty_bytes;
let needed_reservation = data.reserver.reservation_needed(self.dirty_bytes);
let delta = needed_reservation.checked_sub(reserved_before).unwrap();
data.reservation.forget_some(delta);
}
}
fn take_metadata(&mut self, content_size: Option<u64>) -> Option<FlushableMetadata> {
if content_size.is_some()
|| self.creation_time.is_some()
|| self.modification_time.is_some()
{
Some(FlushableMetadata {
content_size,
creation_time: self.creation_time.take(),
modification_time: self.modification_time.take(),
})
} else {
None
}
}
}
pub struct WritebackCache<B> {
inner: Mutex<Inner>,
data: B,
}
#[derive(Debug)]
pub struct FlushableMetadata {
/// The size of the file at flush time, if it has changed.
pub content_size: Option<u64>,
/// Measured in time since the UNIX epoch in the UTC timezone. Individual filesystems set their
/// own granularities.
pub creation_time: Option<Duration>,
/// Measured in time since the UNIX epoch in the UTC timezone. Individual filesystems set their
/// own granularities.
pub modification_time: Option<Duration>,
}
pub struct FlushableData<'a, 'b> {
reservation: &'a allocator::Reservation,
reserver: &'a dyn StorageReservation,
pub ranges: Vec<Range<u64>>,
pub buffer: Buffer<'b>,
}
impl std::fmt::Debug for FlushableData<'_, '_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FlushableData")
.field("reservation", &self.reservation)
.field("ranges", &self.ranges)
.finish()
}
}
impl FlushableData<'_, '_> {
#[cfg(test)]
fn dirty_bytes(&self) -> u64 {
self.ranges.iter().map(|r| r.end - r.start).sum()
}
}
pub struct Flushable<'a, 'b, B: DataBuffer> {
cache: &'a WritebackCache<B>,
pub metadata: Option<FlushableMetadata>,
pub data: Option<FlushableData<'a, 'b>>,
}
impl<B: DataBuffer> std::fmt::Debug for Flushable<'_, '_, B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Flushable")
.field("metadata", &self.metadata)
.field("data", &self.data)
.finish()
}
}
impl<B: DataBuffer> Drop for Flushable<'_, '_, B> {
fn drop(&mut self) {
if self.metadata.is_none() && self.data.is_none() {
return;
}
let mut inner = self.cache.inner.lock().unwrap();
if let Some(metadata) = self.metadata.take() {
if metadata.creation_time.is_some() && inner.creation_time.is_none() {
inner.creation_time = metadata.creation_time;
}
if metadata.modification_time.is_some() && inner.modification_time.is_none() {
inner.modification_time = metadata.modification_time;
}
}
if let Some(data) = self.data.take() {
inner.complete_flush(data, false);
}
}
}
#[derive(Debug)]
pub struct CachedMetadata {
pub content_size: u64,
pub dirty_bytes: u64,
/// Measured in time since the UNIX epoch in the UTC timezone. Individual filesystems set their
/// own granularities.
pub creation_time: Option<Duration>,
/// Measured in time since the UNIX epoch in the UTC timezone. Individual filesystems set their
/// own granularities.
pub modification_time: Option<Duration>,
}
impl<B> Drop for WritebackCache<B> {
fn drop(&mut self) {
let inner = self.inner.lock().unwrap();
if inner.dirty_bytes > 0 {
panic!("Dropping a WritebackCache without calling cleanup will leak reserved bytes");
}
}
}
impl<B: DataBuffer> WritebackCache<B> {
pub fn new(data: B) -> Self {
Self {
inner: Mutex::new(Inner {
intervals: IntervalTree::new(),
dirty_bytes: 0,
creation_time: None,
modification_time: None,
}),
data,
}
}
pub fn cleanup(&self, reserve: &dyn StorageReservation) {
let mut inner = self.inner.lock().unwrap();
if inner.dirty_bytes > 0 {
// Let the RAII wrapper give back the reserved bytes.
let _reservation =
reserve.wrap_reservation(reserve.reservation_needed(inner.dirty_bytes));
inner.dirty_bytes = 0;
}
inner.intervals.remove_interval(&(0..u64::MAX)).unwrap();
}
pub fn cached_metadata(&self) -> CachedMetadata {
let inner = self.inner.lock().unwrap();
CachedMetadata {
content_size: self.data.size(),
dirty_bytes: inner.dirty_bytes,
creation_time: inner.creation_time.clone(),
modification_time: inner.modification_time.clone(),
}
}
pub fn content_size(&self) -> u64 {
self.data.size()
}
pub fn dirty_bytes(&self) -> u64 {
self.inner.lock().unwrap().dirty_bytes
}
/// This is not thread-safe; the caller is responsible for making sure that only one thread is
/// mutating the cache at any point in time.
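///
/// For example (a sketch; |cache|, |block_size| and |reserver| are assumed to have been set up
/// by the caller):
///
/// ```ignore
/// // Truncate to 1000 bytes; dirty ranges past the new (block-aligned) end are dropped and
/// // their reservations are returned.
/// cache.resize(1000, block_size, &reserver).await?;
/// ```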
pub async fn resize(
&self,
size: u64,
block_size: u64,
reserver: &dyn StorageReservation,
) -> Result<(), Error> {
ensure!(size <= i64::MAX as u64, FxfsError::TooBig);
{
let mut inner = self.inner.lock().unwrap();
let old_size = self.content_size();
let aligned_size = round_up(size, block_size).unwrap();
if size < old_size {
let removed = inner.intervals.remove_interval(&(aligned_size..u64::MAX)).unwrap();
let mut dirty_bytes = 0;
for interval in removed {
if let CacheState::Dirty = interval.state {
dirty_bytes += interval.range.end - interval.range.start;
}
}
let before = reserver.reservation_needed(inner.dirty_bytes);
inner.dirty_bytes = inner.dirty_bytes.checked_sub(dirty_bytes).unwrap();
let needed = reserver.reservation_needed(inner.dirty_bytes);
if needed < before {
let _ = reserver.wrap_reservation(before - needed);
}
}
}
// Resize the buffer after making changes to |inner| to avoid a race when truncating (where
// we could have intervals that reference nonexistent parts of the buffer).
// Note that there's a similar race when extending with this ordering, but we don't think it's
// problematic since the newly added region has no intervals yet. If that turns out to be an
// issue, we'll have to atomically update both the buffer and the interval tree at once.
self.data.resize(size).await;
Ok(())
}
/// Read from the cache.
pub async fn read(
&self,
offset: u64,
buf: &mut [u8],
source: &dyn ReadObjectHandle,
) -> Result<usize, Error> {
self.data.read(offset, buf, source).await
}
/// Writes some new data into the cache, marking that data as Dirty. If |offset| is None, the
/// data is appended to the end of the existing data. If |current_time| is set (as a duration
/// since the UNIX epoch in UTC, with whatever granularity the filesystem supports), the cached
/// mtime will be set to this value; filesystems which don't support timestamps may simply pass
/// None. This is not thread-safe; the caller is responsible for making sure that only one
/// thread is mutating the cache at any point in time.
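///
/// For example (a sketch; |data|, |block_size|, |reserver|, |source| and |current_time| are
/// assumed to be provided by the caller, with |current_time()| returning an Option<Duration>):
///
/// ```ignore
/// // Overwrite the first 100 bytes, reserving space for the eventual writeback.
/// cache
///     .write_or_append(Some(0), &data[..100], block_size, &reserver, current_time(), &source)
///     .await?;
/// // Append to the end of the file instead of writing at a fixed offset.
/// cache.write_or_append(None, &data, block_size, &reserver, current_time(), &source).await?;
/// ```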
pub async fn write_or_append(
&self,
offset: Option<u64>,
mut buf: &[u8],
block_size: u64,
reserver: &dyn StorageReservation,
current_time: Option<Duration>,
source: &dyn ReadObjectHandle,
) -> Result<(), Error> {
let size = self.data.size();
let offset = offset.unwrap_or(size);
// Whilst Fxfs could support sizes up to u64::MAX, off_t is i64, so files larger than that
// become difficult to deal with via the POSIX APIs; for now, we limit sizes to i64::MAX.
ensure!(offset < i64::MAX as u64, FxfsError::TooBig);
let max_len = i64::MAX as u64 - offset;
if buf.len() as u64 > max_len {
buf = &buf[..max_len as usize];
}
// |inner| shouldn't be modified until we reach a point in the function where nothing more can
// fail (i.e. after the last possible early return, near the end of the function).
let mut dirtied_intervals = vec![];
let mut dirtied_bytes = 0;
let reservation;
{
let inner = self.inner.lock().unwrap();
let end = offset + buf.len() as u64;
let aligned_range = round_down(offset, block_size)..round_up(end, block_size).unwrap();
let intervals = inner.intervals.get_intervals(&aligned_range).unwrap();
// TODO(fxbug.dev/96146): This might be much simpler and more readable if we refactored
// interval_tree to have an iterator interface. See
// https://fuchsia-review.googlesource.com/c/fuchsia/+/547024/comments/523de326_6e2b4766.
let mut current_offset = aligned_range.start;
let mut i = 0;
while current_offset < end {
let interval = intervals.get(i);
if interval.is_none() {
// Passed the last known interval.
dirtied_bytes += aligned_range.end - current_offset;
dirtied_intervals.push(CachedRange {
range: current_offset..aligned_range.end,
state: CacheState::Dirty,
});
break;
}
let interval = interval.unwrap();
assert!(interval.range.start % block_size == 0);
assert!(interval.range.end % block_size == 0);
if current_offset < interval.range.start {
// There's a hole before the next interval.
dirtied_bytes += interval.range.start - current_offset;
dirtied_intervals.push(CachedRange {
range: current_offset..interval.range.start,
state: CacheState::Dirty,
});
current_offset = interval.range.start;
}
// Writing over an existing interval.
let next_end = std::cmp::min(interval.range.end, aligned_range.end);
let overlap_range = current_offset..next_end;
match &interval.state {
CacheState::Dirty => {
// The range is already dirty and has a reservation. Nothing needs to be
// done.
}
CacheState::Flushing => {
// The range is flushing. Since this is a new write, we need to reserve
// space for it, and mark the range as Dirty.
dirtied_bytes += overlap_range.end - overlap_range.start;
dirtied_intervals
.push(CachedRange { range: overlap_range, state: CacheState::Dirty })
}
};
current_offset = next_end;
i += 1;
}
// This needs to be the worst-case amount to reserve, which means just using
// dirtied_bytes here. We'll adjust this down if necessary later.
let reservation_needed = reserver.reservation_needed(dirtied_bytes);
reservation = if reservation_needed > 0 {
Some(reserver.reserve(reservation_needed)?)
} else {
None
};
}
// TODO(fxbug.dev/96074): This will need to change to support partial writes: when short of
// free space it's possible that some of the write will succeed but not all.
self.data.write(offset, buf, source).await?;
// After this point, we're committing changes, so nothing should fail.
let mut inner = self.inner.lock().unwrap();
for interval in dirtied_intervals {
assert!(interval.range.start % block_size == 0);
assert!(interval.range.end % block_size == 0);
inner.intervals.add_interval(&interval).unwrap();
}
if dirtied_bytes > 0 {
let before = reserver.reservation_needed(inner.dirty_bytes);
inner.dirty_bytes += dirtied_bytes;
reservation
.unwrap()
.forget_some(reserver.reservation_needed(inner.dirty_bytes) - before);
}
inner.modification_time = current_time;
Ok(())
}
/// Returns all data which can be flushed. |allocate_buffer| is a callback which is used to
/// allocate Buffer objects. Each pending data region is copied into a Buffer and returned to
/// the caller in block-aligned ranges. This is not thread-safe with respect to cache
/// mutations; the caller must ensure that no changes can be made to the cache for the duration
/// of this call. The content size returned will only ever reflect growth of the object;
/// truncation must be dealt with by calling take_flushable_metadata.
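///
/// A typical flush, sketched (the on-disk write is filesystem-specific; |allocator| and
/// |write_ranges_to_disk| are hypothetical):
///
/// ```ignore
/// let reservation = reserver.wrap_reservation(0);
/// let flushable = cache.take_flushable(
///     block_size,
///     last_known_size,
///     |size| allocator.allocate_buffer(size),
///     &reserver,
///     &reservation,
/// );
/// if let Some(data) = &flushable.data {
///     // |data.ranges| are block-aligned logical ranges; |data.buffer| holds their contents
///     // back to back.
///     write_ranges_to_disk(&data.ranges, &data.buffer).await?;
/// }
/// // Marks the flush as successful. Dropping |flushable| instead would abort the flush and
/// // return the affected ranges to the Dirty state.
/// cache.complete_flush(flushable);
/// ```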
pub fn take_flushable<'a, F>(
&'a self,
block_size: u64,
last_known_size: u64,
allocate_buffer: F,
reserver: &'a dyn StorageReservation,
reservation: &'a allocator::Reservation,
) -> Flushable<'_, 'a, B>
where
F: Fn(usize) -> Buffer<'a>,
{
let mut inner = self.inner.lock().unwrap();
let size = self.data.size();
let intervals = inner.intervals.remove_interval(&(0..u64::MAX)).unwrap();
let mut bytes_to_flush = 0;
let mut ranges = vec![];
for mut interval in intervals {
assert!(interval.range.start % block_size == 0);
assert!(interval.range.end % block_size == 0);
assert!(!interval.is_flushing(), "Unexpected interval {:?}", interval);
interval.state = CacheState::Flushing;
inner.intervals.add_interval(&interval).unwrap();
bytes_to_flush += interval.range.end - interval.range.start;
ranges.push(interval.range);
}
let content_size = if size > last_known_size { Some(size) } else { None };
let metadata = inner.take_metadata(content_size);
if bytes_to_flush == 0 {
return Flushable { cache: self, metadata, data: None };
}
// Transfer reserved bytes into the supplied reservation.
reservation.add(reserver.reservation_needed(inner.dirty_bytes));
inner.dirty_bytes = 0;
let mut buffer = allocate_buffer(bytes_to_flush as usize);
let mut slice = buffer.as_mut_slice();
for r in &ranges {
let (head, tail) = slice.split_at_mut((r.end - r.start) as usize);
if r.end > size {
let (head, tail) = head.split_at_mut((size - r.start) as usize);
self.data.raw_read(r.start, head);
tail.fill(0);
} else {
self.data.raw_read(r.start, head);
}
slice = tail;
}
Flushable {
cache: self,
metadata,
data: Some(FlushableData { reservation, reserver, ranges, buffer }),
}
}
/// Returns any cached metadata that needs to be flushed. This will only capture changes in
/// content size that shrink the file from its last recorded/uncached size; `take_flushable`
/// handles the case where the file has grown.
pub fn take_flushable_metadata<'a>(&'a self, last_known_size: u64) -> Flushable<'_, 'a, B> {
let mut inner = self.inner.lock().unwrap();
let size = self.data.size();
Flushable {
cache: self,
metadata: inner.take_metadata(if size < last_known_size { Some(size) } else { None }),
data: None,
}
}
/// Indicates that a flush was successful.
pub fn complete_flush<'a>(&self, mut flushed: Flushable<'a, '_, B>) {
flushed.metadata.take();
if let Some(data) = flushed.data.take() {
self.inner.lock().unwrap().complete_flush(data, true);
}
}
/// Sets the cached timestamp values. The filesystem should provide values which are truncated
/// to the filesystem's maximum supported granularity. This is not thread-safe; the caller is
/// responsible for making sure that only one thread is mutating the cache at any point in time.
pub fn update_timestamps(
&self,
creation_time: Option<Duration>,
modification_time: Option<Duration>,
) {
if creation_time.is_none() && modification_time.is_none() {
return;
}
let mut inner = self.inner.lock().unwrap();
inner.creation_time = creation_time.or(inner.creation_time);
inner.modification_time = modification_time.or(inner.modification_time);
}
/// Returns the data buffer.
pub fn data_buffer(&self) -> &B {
&self.data
}
}
#[cfg(test)]
mod tests {
use {
super::{Flushable, FlushableData, StorageReservation, WritebackCache},
crate::{
data_buffer::MemDataBuffer,
filesystem::JournalingObject,
object_store::{
allocator::{Allocator, AllocatorInfo, Reservation, ReservationOwner},
transaction::Transaction,
},
round::round_up,
testing::fake_object::{FakeObject, FakeObjectHandle},
},
anyhow::{anyhow, Error},
assert_matches::assert_matches,
async_trait::async_trait,
fuchsia_async as fasync,
futures::{channel::oneshot::channel, join},
std::{
any::Any,
collections::BTreeMap,
ops::Range,
sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
},
time::Duration,
},
storage_device::buffer_allocator::{BufferAllocator, MemBufferSource},
};
struct FakeReserverInner {
amount: Mutex<u64>,
limit: u64,
}
impl Drop for FakeReserverInner {
fn drop(&mut self) {
assert_eq!(*self.amount.lock().unwrap(), self.limit);
}
}
struct FakeReserver {
inner: Arc<FakeReserverInner>,
granularity: u64,
sync_overhead: u64,
flush_limit: u64,
}
impl FakeReserver {
fn new(amount: u64, granularity: u64) -> Self {
Self::new_with_sync_overhead(amount, granularity, 0, 0)
}
fn new_with_sync_overhead(
amount: u64,
granularity: u64,
sync_overhead: u64,
flush_limit: u64,
) -> Self {
Self {
inner: Arc::new(FakeReserverInner { amount: Mutex::new(amount), limit: amount }),
granularity,
sync_overhead,
flush_limit,
}
}
}
impl StorageReservation for FakeReserver {
fn reservation_needed(&self, mut amount: u64) -> u64 {
amount = round_up(amount, self.granularity).unwrap();
if self.sync_overhead > 0 && self.flush_limit > 0 {
amount
+ round_up(amount, self.flush_limit).unwrap() / self.flush_limit
* self.sync_overhead
} else {
amount
}
}
fn reserve(&self, amount: u64) -> Result<Reservation, Error> {
self.inner
.clone()
.reserve(round_up(amount, self.granularity).unwrap())
.ok_or(anyhow!("No Space"))
}
fn wrap_reservation(&self, amount: u64) -> Reservation {
Reservation::new(self.inner.clone(), amount)
}
}
// TODO(fxbug.dev/96148): It's crude to implement Allocator here, but we need to clean all of
// this up anyways when we make Reservation a VFS-level construct.
#[async_trait]
impl Allocator for FakeReserverInner {
fn object_id(&self) -> u64 {
unreachable!();
}
fn info(&self) -> AllocatorInfo {
unreachable!();
}
async fn allocate(
&self,
_transaction: &mut Transaction<'_>,
_store_object_id: u64,
_len: u64,
) -> Result<Range<u64>, Error> {
unreachable!();
}
async fn deallocate(
&self,
_transaction: &mut Transaction<'_>,
_object_id: u64,
_device_range: Range<u64>,
) -> Result<u64, Error> {
unreachable!();
}
async fn mark_allocated(
&self,
_transaction: &mut Transaction<'_>,
_store_object_id: u64,
_device_range: Range<u64>,
) -> Result<(), Error> {
unreachable!();
}
async fn mark_for_deletion(
&self,
_transaction: &mut Transaction<'_>,
_owner_object_id: u64,
) {
unimplemented!();
}
fn as_journaling_object(self: Arc<Self>) -> Arc<dyn JournalingObject> {
unreachable!();
}
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
unreachable!();
}
async fn did_flush_device(&self, _flush_log_offset: u64) {
unreachable!();
}
fn reserve(self: Arc<Self>, amount: u64) -> Option<Reservation> {
{
let mut inner = self.amount.lock().unwrap();
if *inner < amount {
return None;
} else {
*inner -= amount;
}
}
Some(Reservation::new(self, amount))
}
fn reserve_at_most(self: Arc<Self>, _amount: u64) -> Reservation {
unreachable!();
}
fn get_allocated_bytes(&self) -> u64 {
unreachable!();
}
fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, i64> {
unimplemented!();
}
fn get_used_bytes(&self) -> u64 {
unreachable!();
}
}
impl ReservationOwner for FakeReserverInner {
fn release_reservation(&self, amount: u64) {
let mut inner = self.amount.lock().unwrap();
*inner += amount;
assert!(*inner <= self.limit);
}
}
#[derive(Debug)]
struct ExpectedRange(u64, Vec<u8>);
fn check_data_matches(actual: &FlushableData<'_, '_>, expected: &[ExpectedRange]) {
if actual.ranges.len() != expected.len() {
panic!("Expected {} ranges, got {} ranges", expected.len(), actual.ranges.len());
}
let mut i = 0;
let mut slice = actual.buffer.as_slice();
while i < actual.ranges.len() {
let expected = expected.get(i).unwrap();
let actual = actual.ranges.get(i).unwrap();
let (head, tail) = slice.split_at((actual.end - actual.start) as usize);
if expected.0 != actual.start || &expected.1[..] != head {
panic!("Expected {:?}, got {:?}, {:?}", expected, actual, slice);
}
slice = tail;
i += 1;
}
}
#[fasync::run_singlethreaded(test)]
async fn test_write_read() {
let reserver = FakeReserver::new(8192, 1);
let cache = WritebackCache::new(MemDataBuffer::new(0));
let mut buffer = vec![0u8; 8192];
let source = FakeObjectHandle::new(Arc::new(FakeObject::new()));
buffer.fill(123u8);
cache
.write_or_append(Some(0), &buffer[..3000], 512, &reserver, None, &source)
.await
.expect("write failed");
buffer.fill(0u8);
assert_eq!(cache.read(0, &mut buffer[..4096], &source).await.expect("read failed"), 3000);
assert_eq!(&buffer[..3000], vec![123u8; 3000]);
cache.cleanup(&reserver);
}
#[fasync::run_singlethreaded(test)]
async fn test_append() {
let reserver = FakeReserver::new(8192, 1);
let cache = WritebackCache::new(MemDataBuffer::new(0));
let source = FakeObjectHandle::new(Arc::new(FakeObject::new()));
let mut buffer = vec![0u8; 8192];
buffer.fill(123u8);
cache
.write_or_append(None, &buffer[..3000], 512, &reserver, None, &source)
.await
.expect("write failed");
buffer.fill(45u8);
cache
.write_or_append(None, &buffer[..3000], 512, &reserver, None, &source)
.await
.expect("write failed");
buffer.fill(0u8);
assert_eq!(cache.content_size(), 6000);
assert_eq!(cache.read(0, &mut buffer[..6000], &source).await.expect("read failed"), 6000);
assert_eq!(&buffer[..3000], vec![123u8; 3000]);
assert_eq!(&buffer[3000..6000], vec![45u8; 3000]);
cache.cleanup(&reserver);
}
#[fasync::run_singlethreaded(test)]
async fn test_write_reserving_bytes_fails() {
let allocator = BufferAllocator::new(512, Box::new(MemBufferSource::new(16384)));
// We size the reserver so that only a one-block write can succeed.
let reserver = FakeReserver::new(512, 512);
let cache = WritebackCache::new(MemDataBuffer::new(8192));
let source = FakeObjectHandle::new(Arc::new(FakeObject::new()));
let buffer = vec![0u8; 8192];
// Create a clean region in the middle of the cache so that we split the write into two
// dirty ranges.
cache
.write_or_append(Some(512), &buffer[..512], 512, &reserver, None, &source)
.await
.expect("write failed");
{
let reservation = reserver.wrap_reservation(0);
let flushable = cache.take_flushable(
512,
0,
|size| allocator.allocate_buffer(size),
&reserver,
&reservation,
);
let data = flushable.data.as_ref().expect("no data");
assert_eq!(data.dirty_bytes(), 512);
assert_eq!(data.ranges.len(), 1);
cache.complete_flush(flushable);
}
cache
.write_or_append(Some(0), &buffer[..], 512, &reserver, None, &source)
.await
.expect_err("write succeeded");
// Ensure we can still reserve bytes, i.e. no reservations are leaked by the failed write.
assert_matches!(reserver.reserve(512), Ok(_));
// Ensure neither region was marked dirty, i.e. the tree state wasn't affected.
let reservation = reserver.wrap_reservation(0);
let flushable = cache.take_flushable(
512,
0,
|size| allocator.allocate_buffer(size),
&reserver,
&reservation,
);
assert_matches!(flushable.data, None);
cache.complete_flush(flushable);
cache.cleanup(&reserver);
}
#[fasync::run_singlethreaded(test)]
async fn test_resize_expand() {
let allocator = BufferAllocator::new(512, Box::new(MemBufferSource::new(16384)));
let reserver = FakeReserver::new(8192, 1);
let cache = WritebackCache::new(MemDataBuffer::new(0));
let source = FakeObjectHandle::new(Arc::new(FakeObject::new()));
let mut buffer = vec![0u8; 8192];
buffer.fill(123u8);
cache
.write_or_append(None, &buffer[..1], 512, &reserver, None, &source)
.await
.expect("write failed");
cache.resize(1000, 512, &reserver).await.expect("resize failed");
assert_eq!(cache.content_size(), 1000);
// The entire length of the file should be clean and contain the write, plus some zeroes.
buffer.fill(0xaa);
assert_eq!(cache.read(0, &mut buffer[..], &source).await.expect("read failed"), 1000);
assert_eq!(&buffer[..1], vec![123u8]);
assert_eq!(&buffer[1..1000], vec![0u8; 999]);
// Only the first block should need a flush.
let reservation = reserver.wrap_reservation(0);
let flushable = cache.take_flushable(
512,
0,
|size| allocator.allocate_buffer(size),
&reserver,
&reservation,
);
let data = flushable.data.as_ref().expect("no data");
check_data_matches(
&data,
&[ExpectedRange(0, {
let mut data = vec![0u8; 512];
data[0] = 123u8;
data
})],
);
assert_eq!(flushable.metadata.as_ref().expect("no metadata").content_size, Some(1000));
cache.complete_flush(flushable);
cache.cleanup(&reserver);
}
#[fasync::run_singlethreaded(test)]
async fn test_resize_shrink() {
let allocator = BufferAllocator::new(512, Box::new(MemBufferSource::new(16384)));
let reserver = FakeReserver::new(8192, 1);
let cache = WritebackCache::new(MemDataBuffer::new(0));
let source = FakeObjectHandle::new(Arc::new(FakeObject::new()));
let mut buffer = vec![0u8; 8192];
buffer.fill(123u8);
cache
.write_or_append(None, &buffer[..], 512, &reserver, None, &source)
.await
.expect("write failed");
cache.resize(1000, 512, &reserver).await.expect("resize failed");
assert_eq!(cache.content_size(), 1000);
// The resize should have truncated the pending writes.
let reservation = reserver.wrap_reservation(0);
let flushable = cache.take_flushable(
512,
0,
|size| allocator.allocate_buffer(size),
&reserver,
&reservation,
);
let data = flushable.data.as_ref().expect("no data");
assert_eq!(data.dirty_bytes(), 1024);
check_data_matches(
data,
&[ExpectedRange(0, {
let mut data = vec![0; 1024];
data[..1000].fill(123);
data
})],
);
assert_eq!(flushable.metadata.as_ref().expect("no metadata").content_size, Some(1000));
cache.complete_flush(flushable);
cache.cleanup(&reserver);
}
#[fasync::run_singlethreaded(test)]
async fn test_flush_no_data() {
let allocator = BufferAllocator::new(512, Box::new(MemBufferSource::new(8192)));
let reserver = FakeReserver::new(1, 1);
let cache = WritebackCache::new(MemDataBuffer::new(0));
let reservation = reserver.wrap_reservation(0);
assert_matches!(
cache.take_flushable(
512,
0,
|size| allocator.allocate_buffer(size),
&reserver,
&reservation,
),
Flushable { data: None, metadata: None, .. }
);
cache.cleanup(&reserver);
}
#[fasync::run_singlethreaded(test)]
async fn test_flush_some_data() {
let allocator = BufferAllocator::new(512, Box::new(MemBufferSource::new(65536)));
let reserver = FakeReserver::new(65536, 512);
let cache = WritebackCache::new(MemDataBuffer::new(0));
let source = FakeObjectHandle::new(Arc::new(FakeObject::new()));
let mut buffer = vec![0u8; 8192];
buffer.fill(123u8);
cache
.write_or_append(Some(0), &buffer[..2000], 512, &reserver, None, &source)
.await
.expect("write failed");
buffer.fill(45u8);
cache
.write_or_append(Some(2048), &buffer[..1], 512, &reserver, None, &source)
.await
.expect("write failed");
buffer.fill(67u8);
cache
.write_or_append(Some(4000), &buffer[..100], 512, &reserver, None, &source)
.await
.expect("write failed");
buffer.fill(89u8);
cache
.write_or_append(Some(4000), &buffer[..50], 512, &reserver, None, &source)
.await
.expect("write failed");
let reservation = reserver.wrap_reservation(0);
let flushable = cache.take_flushable(
512,
0,
|size| allocator.allocate_buffer(size),
&reserver,
&reservation,
);
let data = flushable.data.as_ref().expect("no data");
assert_eq!(data.dirty_bytes(), 2048 + 512 + 1024);
check_data_matches(
&data,
&[
ExpectedRange(0, {
let mut data = vec![0u8; 2560];
data[..2000].fill(123u8);
data[2048..2049].fill(45u8);
data
}),
ExpectedRange(3584, {
let mut data = vec![0u8; 1024];
data[416..466].fill(89u8);
data[466..516].fill(67u8);
data
}),
],
);
assert_eq!(flushable.metadata.as_ref().expect("no metadata").content_size, Some(4100));
cache.complete_flush(flushable);
cache.cleanup(&reserver);
}
#[fasync::run_singlethreaded(test)]
async fn test_flush_returns_reservation_on_abort() {
let allocator = BufferAllocator::new(512, Box::new(MemBufferSource::new(65536)));
// Enough room for 2 flushes of 512 bytes each
let reserver = FakeReserver::new_with_sync_overhead(2048, 512, 512, 1024);
let cache = WritebackCache::new(MemDataBuffer::new(0));
let source = FakeObjectHandle::new(Arc::new(FakeObject::new()));
let buffer = [0u8; 1];
cache
.write_or_append(Some(0), &buffer, 512, &reserver, None, &source)
.await
.expect("write failed");
let reservation = reserver.wrap_reservation(0);
{
let flushable = cache.take_flushable(
512,
0,
|size| allocator.allocate_buffer(size),
&reserver,
&reservation,
);
let data = flushable.data.as_ref().expect("no data");
assert_eq!(data.dirty_bytes(), 512);
assert_eq!(reservation.amount(), 1024);
// This write will claim another 512 + 512 bytes of reservation, taking the rest of the
// pool.
cache
.write_or_append(Some(512), &buffer, 512, &reserver, None, &source)
.await
.expect("write failed");
reserver.reserve(1).expect_err("Reservation should be full");
}
// Dropping |flushable| aborts the flush. The cache keeps a total of 1536 reserved bytes, which
// is what it needs to flush the remaining 1024 bytes of data in one sync (vs. the 2048 bytes
// we needed when we interleaved writing/syncing), and the remaining 512 bytes are left in
// |reservation| (and thus return to the pool when it is dropped).
assert_eq!(reservation.amount(), 512);
reserver.reserve(1).expect_err("Reservation should be full");
let reservation2 = reserver.wrap_reservation(0);
let flushable = cache.take_flushable(
512,
0,
|size| allocator.allocate_buffer(size),
&reserver,
&reservation2,
);
let data = flushable.data.as_ref().expect("no data");
assert_eq!(data.dirty_bytes(), 1024);
assert_eq!(reservation2.amount(), 1536);
cache.cleanup(&reserver);
}
#[fasync::run_singlethreaded(test)]
async fn test_flush_most_recent_write_timestamp() {
let allocator = BufferAllocator::new(512, Box::new(MemBufferSource::new(65536)));
let reserver = FakeReserver::new(65536, 4096);
let cache = WritebackCache::new(MemDataBuffer::new(0));
let source = FakeObjectHandle::new(Arc::new(FakeObject::new()));
let secs = AtomicU64::new(1);
let current_time = || Some(Duration::new(secs.fetch_add(1, Ordering::SeqCst), 0));
let mut buffer = vec![0u8; 8192];
buffer.fill(123u8);
cache
.write_or_append(Some(0), &buffer[..1], 512, &reserver, current_time(), &source)
.await
.expect("write failed");
buffer.fill(45u8);
cache
.write_or_append(Some(0), &buffer[..1], 512, &reserver, current_time(), &source)
.await
.expect("write failed");
let reservation = reserver.wrap_reservation(0);
let flushable = cache.take_flushable(
512,
0,
|size| allocator.allocate_buffer(size),
&reserver,
&reservation,
);
assert_eq!(
flushable.metadata.as_ref().expect("no metadata").modification_time,
Some(Duration::new(2, 0))
);
cache.complete_flush(flushable);
cache.cleanup(&reserver);
}
#[fasync::run_singlethreaded(test)]
async fn test_flush_explicit_timestamps() {
let allocator = BufferAllocator::new(512, Box::new(MemBufferSource::new(65536)));
let reserver = FakeReserver::new(65536, 4096);
let cache = WritebackCache::new(MemDataBuffer::new(0));
let source = FakeObjectHandle::new(Arc::new(FakeObject::new()));
let buffer = vec![0u8; 8192];
cache
.write_or_append(Some(0), &buffer[..1], 512, &reserver, Some(Duration::ZERO), &source)
.await
.expect("write failed");
cache.update_timestamps(Some(Duration::new(1, 0)), Some(Duration::new(2, 0)));
let reservation = reserver.wrap_reservation(0);
let flushable = cache.take_flushable(
512,
0,
|size| allocator.allocate_buffer(size),
&reserver,
&reservation,
);
let metadata = flushable.metadata.as_ref().expect("no metadata");
assert_eq!(metadata.creation_time, Some(Duration::new(1, 0)));
assert_eq!(metadata.modification_time, Some(Duration::new(2, 0)));
cache.complete_flush(flushable);
cache.cleanup(&reserver);
}
#[fasync::run_singlethreaded(test)]
async fn test_resize_while_flushing() {
let allocator = BufferAllocator::new(512, Box::new(MemBufferSource::new(8192)));
let reserver = FakeReserver::new(65536, 512);
let cache = WritebackCache::new(MemDataBuffer::new(0));
let source = FakeObjectHandle::new(Arc::new(FakeObject::new()));
let mut buffer = vec![0u8; 512];
buffer.fill(123u8);
cache
.write_or_append(Some(0), &buffer[..], 512, &reserver, None, &source)
.await
.expect("write failed");
let (send1, recv1) = channel();
let (send2, recv2) = channel();
join!(
async {
let reservation = reserver.wrap_reservation(0);
let flushable = cache.take_flushable(
512,
0,
|size| allocator.allocate_buffer(size),
&reserver,
&reservation,
);
assert_eq!(
flushable.metadata.as_ref().expect("no metadata").content_size,
Some(512)
);
check_data_matches(
flushable.data.as_ref().expect("no data"),
&[ExpectedRange(0, vec![123u8; 512])],
);
send1.send(()).unwrap();
recv2.await.unwrap();
},
async {
recv1.await.unwrap();
cache.resize(511, 512, &reserver).await.expect("resize failed");
send2.send(()).unwrap();
},
);
let reservation = reserver.wrap_reservation(0);
let flushable = cache.take_flushable(
512,
0,
|size| allocator.allocate_buffer(size),
&reserver,
&reservation,
);
assert_eq!(flushable.metadata.as_ref().expect("no metadata").content_size, Some(511));
let mut expected = vec![123u8; 511];
expected.append(&mut vec![0]);
let data = flushable.data.as_ref().expect("no data");
check_data_matches(&data, &[ExpectedRange(0, expected)]);
cache.complete_flush(flushable);
cache.cleanup(&reserver);
}
}