blob: ab2bbc54290c63f837bf9ecbfab2064b9fe527a2 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
super::{BufferSlice, MutableBufferSlice, RemoteBlockClientSync, VmoId},
anyhow::{ensure, Error},
fuchsia_syslog::fx_log_err,
fuchsia_zircon as zx,
linked_hash_map::LinkedHashMap,
std::io::SeekFrom,
std::io::Write,
};
// Size of the cache's backing VMO in bytes (256 KiB).
const VMO_SIZE: u64 = 262_144;
// Size of one cache block in bytes. Must be a whole multiple of the underlying device's block
// size (checked in Cache::new).
const BLOCK_SIZE: u64 = 8192;
// Number of cache blocks that fit in the VMO.
const BLOCK_COUNT: usize = (VMO_SIZE / BLOCK_SIZE) as usize;
// A single cache-map entry: where the cached block lives in the VMO and whether it has
// modifications that have not yet been written back to the device.
struct CacheEntry {
    // Byte offset into the cache VMO where this block's data is stored.
    vmo_offset: u64,
    // True if the cached data has been modified and must be written back to the device on
    // eviction or flush.
    dirty: bool,
}
/// Counters describing cache activity; primarily useful for tests and diagnostics.
#[derive(Debug, Default, Eq, PartialEq)]
pub struct Stats {
    // Number of block reads issued to the underlying device (cache misses).
    read_count: u64,
    // Number of block writes issued to the underlying device (evictions and flushes).
    write_count: u64,
    // Number of lookups satisfied without touching the device.
    cache_hits: u64,
}
/// Wraps RemoteBlockClientSync providing a simple LRU cache and trait implementations for
/// std::io::{Read, Seek, Write}. This is unlikely to be performant; the implementation is single
/// threaded. The cache works by dividing up a VMO into BLOCK_COUNT blocks of BLOCK_SIZE bytes, and
/// maintaining mappings from device offsets to offsets in the VMO.
pub struct Cache {
    // The underlying block device.
    device: RemoteBlockClientSync,
    // Backing memory for the cached blocks; registered with the device via |vmo_id|.
    vmo: zx::Vmo,
    // Identifies |vmo| to the block device for transfer requests.
    vmo_id: VmoId,
    // Maps BLOCK_SIZE-aligned device byte offsets to cache entries. Entries are refreshed on
    // access, so iteration order is least-recently-used first, which makes eviction easy.
    map: LinkedHashMap<u64, CacheEntry>,
    offset: u64, // For std::io::{Read, Seek, Write}
    // Activity counters; see |stats()|.
    stats: Stats,
}
impl Cache {
    /// Returns a new Cache wrapping the given RemoteBlockClientSync.
    ///
    /// Fails if the device's block size does not evenly divide BLOCK_SIZE, or if the cache
    /// VMO cannot be created or attached to the device.
    pub fn new(device: RemoteBlockClientSync) -> Result<Self, Error> {
        ensure!(
            BLOCK_SIZE % device.block_size() as u64 == 0,
            "underlying block size not supported"
        );
        let vmo = zx::Vmo::create(VMO_SIZE)?;
        let vmo_id = device.attach_vmo(&vmo)?;
        Ok(Self {
            device,
            vmo,
            vmo_id,
            map: Default::default(),
            offset: 0,
            stats: Stats::default(),
        })
    }

    // Total size of the underlying device in bytes.
    fn device_size(&self) -> u64 {
        self.device.block_count() * self.device.block_size() as u64
    }

    // Finds a block that can be used for the given offset, marking dirty if requested. Returns a
    // tuple with the VMO offset and whether it was a cache hit. If not a cache hit, the caller is
    // responsible for initializing the data and inserting a cache entry.
    fn get_block(&mut self, offset: u64, mark_dirty: bool) -> Result<(u64, bool), Error> {
        if let Some(ref mut entry) = self.map.get_refresh(&offset) {
            // Cache hit. get_refresh also moves the entry to the back of the LRU order.
            self.stats.cache_hits += 1;
            if mark_dirty {
                entry.dirty = true;
            }
            Ok((entry.vmo_offset, true))
        } else {
            let vmo_offset = if self.map.len() < BLOCK_COUNT {
                // The VMO still has unused blocks; hand out the next one.
                self.map.len() as u64 * BLOCK_SIZE
            } else {
                // Cache is full: evict the least-recently-used entry, writing it back to the
                // device first if it is dirty, and reuse its VMO block.
                let entry = self.map.pop_front().unwrap();
                if entry.1.dirty {
                    self.stats.write_count += 1;
                    self.device.write_at(
                        BufferSlice::new_with_vmo_id(
                            &self.vmo_id,
                            entry.1.vmo_offset,
                            // The last block of the device can be shorter than BLOCK_SIZE.
                            std::cmp::min(BLOCK_SIZE, self.device_size() - entry.0),
                        ),
                        entry.0,
                    )?;
                }
                entry.1.vmo_offset
            };
            Ok((vmo_offset, false))
        }
    }

    // Reads the block at the given offset and marks it dirty if requested. Returns the offset in
    // the VMO.
    fn read_block(&mut self, offset: u64, mark_dirty: bool) -> Result<u64, Error> {
        let (vmo_offset, hit) = self.get_block(offset, mark_dirty)?;
        if !hit {
            // Miss: populate the VMO block from the device and record it in the map.
            self.stats.read_count += 1;
            self.device.read_at(
                MutableBufferSlice::new_with_vmo_id(
                    &self.vmo_id,
                    vmo_offset,
                    // The last block of the device can be shorter than BLOCK_SIZE.
                    std::cmp::min(BLOCK_SIZE, self.device_size() - offset),
                ),
                offset,
            )?;
            self.map.insert(offset, CacheEntry { vmo_offset, dirty: mark_dirty });
        }
        Ok(vmo_offset)
    }

    /// Reads at |offset| into |buf|.
    pub fn read_at(&mut self, mut buf: &mut [u8], offset: u64) -> Result<(), Error> {
        // Phrased as a subtraction so the check cannot overflow when offset is near u64::MAX.
        ensure!(
            offset <= self.device_size() && buf.len() as u64 <= self.device_size() - offset,
            "read exceeds device size"
        );
        // Start by reading the head.
        let mut aligned_offset = offset - offset % BLOCK_SIZE;
        let end = offset + buf.len() as u64;
        if aligned_offset < offset {
            // |offset| is not block-aligned: copy out of the interior of the first block.
            let vmo_offset = self.read_block(aligned_offset, false)?;
            let to_copy = std::cmp::min(aligned_offset + BLOCK_SIZE, end) - offset;
            self.vmo.read(&mut buf[..to_copy as usize], vmo_offset + offset - aligned_offset)?;
            aligned_offset += BLOCK_SIZE;
            buf = &mut buf[to_copy as usize..];
        }
        // Now do whole blocks.
        while aligned_offset + BLOCK_SIZE <= end {
            let vmo_offset = self.read_block(aligned_offset, false)?;
            self.vmo.read(&mut buf[..BLOCK_SIZE as usize], vmo_offset)?;
            aligned_offset += BLOCK_SIZE;
            buf = &mut buf[BLOCK_SIZE as usize..];
        }
        // And finally the tail.
        if end > aligned_offset {
            let vmo_offset = self.read_block(aligned_offset, false)?;
            self.vmo.read(buf, vmo_offset)?;
        }
        Ok(())
    }

    /// Writes from |buf| to |offset|.
    pub fn write_at(&mut self, mut buf: &[u8], offset: u64) -> Result<(), Error> {
        // Phrased as a subtraction so the check cannot overflow when offset is near u64::MAX.
        ensure!(
            offset <= self.device_size() && buf.len() as u64 <= self.device_size() - offset,
            "write exceeds device size"
        );
        // Start by writing the head.
        let mut aligned_offset = offset - offset % BLOCK_SIZE;
        let end = offset + buf.len() as u64;
        if aligned_offset < offset {
            // Partial first block: read-modify-write through the cache.
            let vmo_offset = self.read_block(aligned_offset, true)?;
            let to_copy = std::cmp::min(aligned_offset + BLOCK_SIZE, end) - offset;
            self.vmo.write(&buf[..to_copy as usize], vmo_offset + offset - aligned_offset)?;
            aligned_offset += BLOCK_SIZE;
            buf = &buf[to_copy as usize..];
        }
        // Now do whole blocks.
        while aligned_offset + BLOCK_SIZE <= end {
            // Whole blocks are overwritten completely, so there is no need to read them from
            // the device first; get_block is used instead of read_block.
            let (vmo_offset, hit) = self.get_block(aligned_offset, true)?;
            self.vmo.write(&buf[..BLOCK_SIZE as usize], vmo_offset)?;
            if !hit {
                self.map.insert(aligned_offset, CacheEntry { vmo_offset, dirty: true });
            }
            aligned_offset += BLOCK_SIZE;
            buf = &buf[BLOCK_SIZE as usize..];
        }
        // And finally the tail.
        if end > aligned_offset {
            // Partial last block: read-modify-write through the cache.
            let vmo_offset = self.read_block(aligned_offset, true)?;
            self.vmo.write(buf, vmo_offset)?;
        }
        Ok(())
    }

    /// Returns statistics.
    pub fn stats(&self) -> &Stats {
        &self.stats
    }

    /// Returns a reference to the underlying device
    /// Can be used for additional control, like instructing the device to flush any written data
    pub fn device(&self) -> &RemoteBlockClientSync {
        &self.device
    }

    /// Issues a flush to the underlying device. Note this does not write back dirty cache
    /// blocks first; use std::io::Write::flush for that.
    pub fn flush_device(&self) -> Result<(), Error> {
        self.device.flush()
    }
}
impl Drop for Cache {
    fn drop(&mut self) {
        // Write back any dirty blocks (std::io::Write::flush). Drop cannot fail, so errors can
        // only be logged.
        if let Err(e) = self.flush() {
            fx_log_err!("Flush failed: {}", e);
        }
        self.vmo_id.take().into_id(); // Ok to leak because fifo will be closed.
    }
}
// Adapts an anyhow::Error into a std::io::Error so cache failures can be surfaced through the
// std::io trait implementations below.
fn into_io_error(error: Error) -> std::io::Error {
    let kind = std::io::ErrorKind::Other;
    std::io::Error::new(kind, error)
}
impl std::io::Read for Cache {
    /// Reads from the current seek offset, advancing it by the number of bytes read. Requests
    /// extending past the end of the device are truncated; at or beyond the end, zero bytes
    /// are read.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let size = self.device_size();
        if self.offset > size {
            return Ok(0);
        }
        // Clamp the request so it does not run off the end of the device.
        let len = std::cmp::min(buf.len() as u64, size - self.offset) as usize;
        self.read_at(&mut buf[..len], self.offset).map_err(into_io_error)?;
        self.offset += len as u64;
        Ok(len)
    }
}
impl Write for Cache {
    /// Writes at the current seek offset, advancing it by the number of bytes written. Requests
    /// extending past the end of the device are truncated; at or beyond the end, zero bytes
    /// are written.
    fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
        if self.offset > self.device_size() {
            return Ok(0);
        }
        // Clamp the request so it does not run off the end of the device.
        let max_len = self.device_size() - self.offset;
        if buf.len() as u64 > max_len {
            buf = &buf[0..max_len as usize];
        }
        self.write_at(&buf, self.offset).map_err(into_io_error)?;
        self.offset += buf.len() as u64;
        Ok(buf.len())
    }

    /// This does *not* issue a flush to the underlying block device; this will only send the
    /// writes.
    fn flush(&mut self) -> std::io::Result<()> {
        let max = self.device_size();
        // Write back every dirty block and clear its dirty flag; entries remain cached.
        for mut entry in self.map.entries() {
            if entry.get().dirty {
                self.stats.write_count += 1;
                self.device
                    .write_at(
                        BufferSlice::new_with_vmo_id(
                            &self.vmo_id,
                            entry.get().vmo_offset,
                            // The last block of the device can be shorter than BLOCK_SIZE.
                            std::cmp::min(BLOCK_SIZE, max - *entry.key()),
                        ),
                        *entry.key(),
                    )
                    .map_err(into_io_error)?;
                entry.get_mut().dirty = false;
            }
        }
        Ok(())
    }
}
impl std::io::Seek for Cache {
    /// Repositions the offset used by the Read/Write implementations. Seeking past the end of
    /// the device is permitted (subsequent reads/writes will return 0 bytes), but a seek that
    /// would move the offset below zero or overflow u64 is an InvalidInput error.
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        // Applies a signed delta to an unsigned base, returning None on under/overflow.
        // wrapping_neg is used because the magnitude of i64::MIN is not representable as an
        // i64; the previous `-delta` negation overflowed (panicking in debug builds) for
        // that value, whereas `wrapping_neg() as u64` yields the correct magnitude.
        fn add_delta(base: u64, delta: i64) -> Option<u64> {
            if delta >= 0 {
                base.checked_add(delta as u64)
            } else {
                base.checked_sub(delta.wrapping_neg() as u64)
            }
        }
        self.offset = match pos {
            SeekFrom::Start(offset) => Some(offset),
            SeekFrom::End(delta) => add_delta(self.device_size(), delta),
            SeekFrom::Current(delta) => add_delta(self.offset, delta),
        }
        .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "bad delta"))?;
        Ok(self.offset)
    }
}
#[cfg(test)]
mod tests {
    use {
        super::{Cache, Stats},
        crate::RemoteBlockClientSync,
        ramdevice_client::RamdiskClient,
        std::io::{Read, Seek, SeekFrom, Write},
    };

    const RAMDISK_BLOCK_SIZE: u64 = 1024;
    const RAMDISK_BLOCK_COUNT: u64 = 1023; // Deliberate for testing max offset.
    const RAMDISK_SIZE: u64 = RAMDISK_BLOCK_SIZE * RAMDISK_BLOCK_COUNT;

    // Creates a ramdisk-backed RemoteBlockClientSync for testing. The returned RamdiskClient
    // must be kept alive for the duration of the test.
    pub fn make_ramdisk() -> (RamdiskClient, RemoteBlockClientSync) {
        ramdevice_client::wait_for_device(
            "/dev/sys/platform/00:00:2d/ramctl",
            std::time::Duration::from_secs(30),
        )
        .expect("ramctl did not appear");
        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
            .expect("RamdiskClient::create failed");
        let remote_block_device =
            RemoteBlockClientSync::new(ramdisk.open().expect("ramdisk.open failed"))
                .expect("RemoteBlockClientSync::new failed");
        (ramdisk, remote_block_device)
    }

    #[test]
    fn test_cache_read_at_and_write_at_with_no_hits() {
        let (_ramdisk, remote_block_device) = make_ramdisk();
        let mut cache = Cache::new(remote_block_device).expect("Cache::new failed");
        let mut offset = 5;
        const TEST_COUNT: usize = super::BLOCK_COUNT * 2; // Chosen so there are no cache hits.
        const DATA: &[u8] = b"hello";
        for _ in 0..TEST_COUNT {
            cache.write_at(DATA, offset).expect("cache.write failed");
            // The delta here is deliberately chosen to catch mistakes such as returning data from
            // the wrong block.
            offset += super::BLOCK_SIZE + 1;
        }
        // Every write missed (read-modify-write => one device read each), and half of the
        // cached blocks have been evicted and written back by now.
        assert_eq!(
            cache.stats(),
            &Stats {
                read_count: TEST_COUNT as u64,
                write_count: super::BLOCK_COUNT as u64,
                cache_hits: 0
            }
        );
        offset = 5;
        for _ in 0..TEST_COUNT {
            let mut buf = [0; 5];
            cache.read_at(&mut buf, offset).expect("cache.read_at failed");
            assert_eq!(&buf, DATA);
            offset += super::BLOCK_SIZE + 1;
        }
        assert_eq!(
            cache.stats(),
            &Stats {
                read_count: 2 * TEST_COUNT as u64,
                write_count: TEST_COUNT as u64,
                cache_hits: 0
            }
        );
    }

    #[test]
    fn test_cache_read_at_and_write_at_with_hit() {
        let (_ramdisk, remote_block_device) = make_ramdisk();
        let mut cache = Cache::new(remote_block_device).expect("Cache::new failed");
        const OFFSET: u64 = 11;
        const DATA: &[u8] = b"hello";
        cache.write_at(DATA, OFFSET).expect("cache.write failed");
        let mut buf = [0; 5];
        // The read should be served from the block cached by the preceding write.
        cache.read_at(&mut buf, OFFSET).expect("cache.read_at failed");
        assert_eq!(&buf, DATA);
        assert_eq!(cache.stats(), &Stats { read_count: 1, write_count: 0, cache_hits: 1 });
    }

    #[test]
    fn test_cache_aligned_read_at_and_write_at() {
        let (_ramdisk, remote_block_device) = make_ramdisk();
        let mut cache = Cache::new(remote_block_device).expect("Cache::new failed");
        const OFFSET: u64 = 11;
        const BLOCKS: usize = 3;
        const DATA_LEN: usize = super::BLOCK_SIZE as usize * BLOCKS + 11;
        let data = [0xe2u8; DATA_LEN];
        // This should require alignment at the start, and at the end with some whole blocks.
        cache.write_at(&data, OFFSET).expect("cache.write failed");
        let mut buf = [0; DATA_LEN + 2]; // Read an extra byte at the start and at the end.
        cache.read_at(&mut buf, OFFSET - 1).expect("cache.read_at failed");
        assert_eq!(buf[0], 0);
        assert_eq!(buf[DATA_LEN + 1], 0);
        assert_eq!(&buf[1..DATA_LEN + 1], &data[0..DATA_LEN]);
        // We should have only read the first and last blocks. The writes to the whole blocks should
        // not have triggered reads.
        assert_eq!(
            cache.stats(),
            &Stats { read_count: 2, write_count: 0, cache_hits: BLOCKS as u64 + 1 }
        );
    }

    #[test]
    fn test_cache_aligned_read_at_and_write_at_cold() {
        // The same as the previous test, but tear down the cache after the writes.
        let (ramdisk, remote_block_device) = make_ramdisk();
        let mut cache = Cache::new(remote_block_device).expect("Cache::new failed");
        const OFFSET: u64 = 11;
        const BLOCKS: usize = 3;
        const DATA_LEN: usize = super::BLOCK_SIZE as usize * BLOCKS + 11;
        let data = [0xe2u8; DATA_LEN];
        // This should require alignment at the start, and at the end with some whole blocks.
        cache.write_at(&data, OFFSET).expect("cache.write failed");
        assert_eq!(cache.stats(), &Stats { read_count: 2, write_count: 0, cache_hits: 0 });
        // Dropping the cache flushes dirty blocks to the ramdisk.
        drop(cache);
        let mut cache = Cache::new(
            RemoteBlockClientSync::new(ramdisk.open().expect("ramdisk.open failed"))
                .expect("RemoteBlockClientSync::new failed"),
        )
        .expect("Cache::new failed");
        let mut buf = [0; DATA_LEN + 2]; // Read an extra byte at the start and at the end.
        cache.read_at(&mut buf, OFFSET - 1).expect("cache.read_at failed");
        assert_eq!(buf[0], 0);
        assert_eq!(buf[DATA_LEN + 1], 0);
        assert_eq!(&buf[1..DATA_LEN + 1], &data[0..DATA_LEN]);
        // We should have only read the first and last blocks. The writes to the whole blocks should
        // not have triggered reads.
        assert_eq!(
            cache.stats(),
            &Stats { read_count: BLOCKS as u64 + 1, write_count: 0, cache_hits: 0 }
        );
    }

    #[test]
    fn test_io_read_write_and_seek() {
        let (_ramdisk, remote_block_device) = make_ramdisk();
        let mut cache = Cache::new(remote_block_device).expect("Cache::new failed");
        const OFFSET: u64 = 11;
        const DATA: &[u8] = b"hello";
        assert_eq!(cache.seek(SeekFrom::Start(OFFSET)).expect("seek failed"), OFFSET);
        cache.write(DATA).expect("cache.write failed");
        assert_eq!(
            cache.seek(SeekFrom::Current(-(DATA.len() as i64))).expect("seek failed"),
            OFFSET
        );
        let mut buf = [0u8; 5];
        assert_eq!(cache.read(&mut buf).expect("cache.read failed"), DATA.len());
        assert_eq!(&buf, DATA);
    }

    #[test]
    fn test_io_read_write_and_seek_at_max_offset() {
        let (_ramdisk, remote_block_device) = make_ramdisk();
        let mut cache = Cache::new(remote_block_device).expect("Cache::new failed");
        const DATA: &[u8] = b"hello";
        // Reads and writes straddling the end of the device should be truncated.
        assert_eq!(cache.seek(SeekFrom::End(-1)).expect("seek failed"), RAMDISK_SIZE - 1);
        assert_eq!(cache.write(DATA).expect("cache.write failed"), 1);
        assert_eq!(cache.seek(SeekFrom::End(-4)).expect("seek failed"), RAMDISK_SIZE - 4);
        let mut buf = [0x56u8; 5];
        assert_eq!(cache.read(&mut buf).expect("cache.read failed"), 4);
        assert_eq!(&buf, &[0, 0, 0, b'h', 0x56]);
    }

    #[test]
    fn test_read_beyond_max_offset_returns_error() {
        let (_ramdisk, remote_block_device) = make_ramdisk();
        let mut cache = Cache::new(remote_block_device).expect("Cache::new failed");
        let mut buf = [0u8; 2];
        cache.read_at(&mut buf, RAMDISK_SIZE).expect_err("read_at succeeded");
        cache.read_at(&mut buf, RAMDISK_SIZE - 1).expect_err("read_at succeeded");
    }

    #[test]
    fn test_write_beyond_max_offset_returns_error() {
        let (_ramdisk, remote_block_device) = make_ramdisk();
        let mut cache = Cache::new(remote_block_device).expect("Cache::new failed");
        let buf = [0u8; 2];
        cache.write_at(&buf, RAMDISK_SIZE).expect_err("write_at succeeded");
        cache.write_at(&buf, RAMDISK_SIZE - 1).expect_err("write_at succeeded");
    }

    #[test]
    fn test_read_with_overflow_returns_error() {
        let (_ramdisk, remote_block_device) = make_ramdisk();
        let mut cache = Cache::new(remote_block_device).expect("Cache::new failed");
        let mut buf = [0u8; 2];
        // offset + buf.len() would overflow u64; the range check must not panic.
        cache.read_at(&mut buf, u64::MAX - 1).expect_err("read_at succeeded");
    }

    #[test]
    fn test_write_with_overflow_returns_error() {
        let (_ramdisk, remote_block_device) = make_ramdisk();
        let mut cache = Cache::new(remote_block_device).expect("Cache::new failed");
        let buf = [0u8; 2];
        // offset + buf.len() would overflow u64; the range check must not panic.
        cache.write_at(&buf, u64::MAX - 1).expect_err("write_at succeeded");
    }

    #[test]
    fn test_read_and_write_at_max_offset_suceeds() {
        let (_ramdisk, remote_block_device) = make_ramdisk();
        let mut cache = Cache::new(remote_block_device).expect("Cache::new failed");
        let buf = [0xd4u8; 2];
        cache.write_at(&buf, RAMDISK_SIZE - buf.len() as u64).expect("write_at failed");
        let mut read_buf = [0xf3u8; 2];
        cache.read_at(&mut read_buf, RAMDISK_SIZE - buf.len() as u64).expect("read_at failed");
        assert_eq!(&buf, &read_buf);
    }

    #[test]
    fn test_seek_with_bad_delta_returns_error() {
        let (_ramdisk, remote_block_device) = make_ramdisk();
        let mut cache = Cache::new(remote_block_device).expect("Cache::new failed");
        // Both seeks would move the offset below zero.
        cache.seek(SeekFrom::End(-(RAMDISK_SIZE as i64) - 1)).expect_err("seek suceeded");
        cache.seek(SeekFrom::Current(-1)).expect_err("seek succeeded");
    }

    #[test]
    fn test_ramdisk_with_large_block_size_returns_error() {
        ramdevice_client::wait_for_device(
            "/dev/sys/platform/00:00:2d/ramctl",
            std::time::Duration::from_secs(30),
        )
        .expect("ramctl did not appear");
        // A device block size larger than the cache's BLOCK_SIZE cannot be supported.
        let ramdisk =
            RamdiskClient::create(super::BLOCK_SIZE * 2, 10).expect("RamdiskClient::create failed");
        let remote_block_device =
            RemoteBlockClientSync::new(ramdisk.open().expect("ramdisk.open failed"))
                .expect("RemoteBlockClientSync::new failed");
        Cache::new(remote_block_device).err().expect("Cache::new succeeded");
    }
}