// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Mock implementation of blobfs for blobfs::Client.
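//!
//! # Example
//!
//! A minimal usage sketch. How the [`Mock`] is obtained (here a hypothetical
//! `blobfs::Client::new_test()`) and the client call being exercised are assumptions made for
//! illustration; only the `Mock`/`Blob` expectation methods are defined in this module.
//!
//! ```ignore
//! let (client, mut mock) = blobfs::Client::new_test(); // hypothetical test constructor
//! futures::join!(
//!     // Code under test: hypothetical read of a blob through the client.
//!     async { assert_eq!(client.read_blob(merkle).await.unwrap(), b"contents".to_vec()) },
//!     // Mock side: expect the open and serve the contents.
//!     async { mock.expect_open_blob(merkle).await.expect_read(b"contents").await },
//! );
//! ```
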
use {
fidl::{encoding::Decodable as _, endpoints::RequestStream as _},
fidl_fuchsia_io as fio,
fuchsia_hash::Hash,
fuchsia_zircon::{self as zx, AsHandleRef as _, Status},
futures::{Future, StreamExt as _, TryStreamExt as _},
std::{cmp::min, collections::HashSet, convert::TryInto as _},
};

/// A testing server implementation of /blob.
///
/// Mock does not handle requests until instructed to do so.
pub struct Mock {
pub(super) stream: fio::DirectoryRequestStream,
}

impl Mock {
/// Consume the next directory request, verifying it is intended to read the blob identified
/// by `merkle`. Returns a `Blob` representing the open blob file.
///
/// # Panics
///
/// Panics on error or assertion violation (unexpected requests or a mismatched open call)
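    ///
    /// # Example
    ///
    /// A minimal sketch; `mock`, `merkle`, and the concurrent client activity driving the open
    /// are assumed to be provided by the surrounding test.
    ///
    /// ```ignore
    /// let blob = mock.expect_open_blob(merkle).await;
    /// blob.expect_read(b"blob contents").await;
    /// ```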
pub async fn expect_open_blob(&mut self, merkle: Hash) -> Blob {
match self.stream.next().await {
Some(Ok(fio::DirectoryRequest::Open {
flags,
mode: _,
path,
object,
control_handle: _,
})) => {
assert_eq!(path, merkle.to_string());
FlagSet::OPEN_FOR_READ.verify(flags);
let stream = object.into_stream().unwrap().cast_stream();
Blob { stream }
}
other => panic!("unexpected request: {:?}", other),
}
}
/// Consume the next directory request, verifying it is intended to create the blob identified
/// by `merkle`. Returns a `Blob` representing the open blob file.
///
/// # Panics
///
/// Panics on error or assertion violation (unexpected requests or a mismatched open call)
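    ///
    /// # Example
    ///
    /// A minimal sketch; `mock`, `merkle`, and the client performing the write are assumed to be
    /// provided by the surrounding test.
    ///
    /// ```ignore
    /// let blob = mock.expect_create_blob(merkle).await;
    /// blob.expect_payload(b"blob contents").await;
    /// ```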
pub async fn expect_create_blob(&mut self, merkle: Hash) -> Blob {
match self.stream.next().await {
Some(Ok(fio::DirectoryRequest::Open {
flags,
mode: _,
path,
object,
control_handle: _,
})) => {
FlagSet::OPEN_FOR_WRITE.verify(flags);
assert_eq!(path, merkle.to_string());
let stream = object.into_stream().unwrap().cast_stream();
Blob { stream }
}
other => panic!("unexpected request: {:?}", other),
}
}
    /// Consume the next directory request, verifying it is a Rewind request, and respond with
    /// success.
    async fn handle_rewind(&mut self) {
match self.stream.next().await {
Some(Ok(fio::DirectoryRequest::Rewind { responder })) => {
responder.send(Status::OK.into_raw()).unwrap();
}
other => panic!("unexpected request: {:?}", other),
}
}
/// Consume directory requests, verifying they are requests to read directory entries. Respond
/// with dirents constructed from the given entries.
///
/// # Panics
///
/// Panics on error or assertion violation (unexpected requests or not all entries are read)
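    ///
    /// # Example
    ///
    /// A minimal sketch; `mock`, the `present_hashes: Vec<Hash>` collection, and the concurrent
    /// client-side directory listing are assumed to be provided by the surrounding test.
    ///
    /// ```ignore
    /// mock.expect_readdir(present_hashes.iter().copied()).await;
    /// ```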
pub async fn expect_readdir(&mut self, entries: impl Iterator<Item = Hash>) {
// files_async starts by resetting the directory channel's readdir position.
self.handle_rewind().await;
const NAME_LEN: usize = 64;
#[repr(C, packed)]
struct Dirent {
ino: u64,
size: u8,
kind: u8,
name: [u8; NAME_LEN],
}
impl Dirent {
fn as_bytes<'a>(&'a self) -> &'a [u8] {
let start = self as *const Self as *const u8;
// Safe because the FIDL wire format for directory entries is
// defined to be the C packed struct representation used here.
unsafe { std::slice::from_raw_parts(start, std::mem::size_of::<Self>()) }
}
}
let mut entries_iter = entries.map(|hash| Dirent {
ino: fio::INO_UNKNOWN,
size: NAME_LEN as u8,
kind: fio::DirentType::File.into_primitive(),
name: hash.to_string().as_bytes().try_into().unwrap(),
});
loop {
match self.stream.try_next().await.unwrap() {
Some(fio::DirectoryRequest::ReadDirents { max_bytes, responder }) => {
let max_bytes = max_bytes as usize;
assert!(max_bytes >= std::mem::size_of::<Dirent>());
let mut buf = vec![];
while buf.len() + std::mem::size_of::<Dirent>() <= max_bytes {
match entries_iter.next() {
Some(need) => {
buf.extend(need.as_bytes());
}
None => break,
}
}
responder.send(Status::OK.into_raw(), &buf).unwrap();
// Finish after providing an empty chunk.
if buf.is_empty() {
break;
}
}
Some(other) => panic!("unexpected request: {:?}", other),
None => panic!("unexpected stream termination"),
}
}
}
    /// Consume N directory requests, verifying they are intended to determine whether the blobs
    /// specified in `readable` and `missing` are readable or not, responding to each check based
    /// on which collection the hash is in.
///
/// # Panics
///
/// Panics on error or assertion violation (unexpected requests, request for unspecified blob)
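    ///
    /// # Example
    ///
    /// A minimal sketch; `mock`, `present_hash`, `absent_hash`, and the concurrent client-side
    /// existence checks are assumed to be provided by the surrounding test.
    ///
    /// ```ignore
    /// mock.expect_readable_missing_checks(&[present_hash], &[absent_hash]).await;
    /// ```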
pub async fn expect_readable_missing_checks(&mut self, readable: &[Hash], missing: &[Hash]) {
let mut readable = readable.iter().copied().collect::<HashSet<_>>();
let mut missing = missing.iter().copied().collect::<HashSet<_>>();
while !(readable.is_empty() && missing.is_empty()) {
match self.stream.next().await {
Some(Ok(fio::DirectoryRequest::Open {
flags,
mode: _,
path,
object,
control_handle: _,
})) => {
FlagSet::OPEN_FOR_READ.verify(flags);
let path: Hash = path.parse().unwrap();
let stream = object.into_stream().unwrap().cast_stream();
let blob = Blob { stream };
if readable.remove(&path) {
blob.succeed_open_with_blob_readable().await;
} else if missing.remove(&path) {
blob.fail_open_with_not_found();
} else {
panic!("Unexpected blob existance check for {}", path);
}
}
other => panic!("unexpected request: {:?}", other),
}
}
}
/// Expects and handles a call to [`Client::filter_to_missing_blobs`], handling its internal
/// heuristic when more than 20 blobs are specified. Verifies the call intends to determine
/// whether the blobs specified in `readable` and `missing` are readable or not, responding to
/// the check based on which collection the hash is in.
///
/// # Panics
///
/// Panics on error or assertion violation (unexpected requests, request for unspecified blob)
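    ///
    /// # Example
    ///
    /// A minimal sketch of pairing this expectation with the client call it mirrors; the exact
    /// client-side signature is an assumption made for illustration.
    ///
    /// ```ignore
    /// futures::join!(
    ///     client.filter_to_missing_blobs(&all_blobs), // signature assumed for illustration
    ///     mock.expect_filter_to_missing_blobs_with_readable_missing_ids(&present, &missing),
    /// );
    /// ```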
pub async fn expect_filter_to_missing_blobs_with_readable_missing_ids(
&mut self,
readable: &[Hash],
missing: &[Hash],
) {
// TODO(fxbug.dev/77717) re-evaluate filter_to_missing_blobs heuristic.
if readable.len() + missing.len() > 20 {
            // Heuristic path: handle the readdir, excluding the missing blobs from the results so
            // the fast path treats them as absent, then handle the open checks for the readable
            // blobs.
self.expect_readdir(readable.iter().copied()).await;
self.expect_readable_missing_checks(readable, &[]).await;
} else {
self.expect_readable_missing_checks(readable, missing).await;
}
}
/// Asserts that the request stream closes without any further requests.
///
/// # Panics
///
/// Panics on error
pub async fn expect_done(mut self) {
match self.stream.next().await {
None => {}
Some(request) => panic!("unexpected request: {:?}", request),
}
}
}

/// A testing server implementation of an open /blob/<merkle> file.
///
/// Blob does not send the OnOpen event or handle requests until instructed to do so.
pub struct Blob {
stream: fio::FileRequestStream,
}

impl Blob {
fn send_on_open_with_file_signals(&mut self, status: Status, signals: zx::Signals) {
let event = fidl::Event::create().unwrap();
event.signal_handle(zx::Signals::NONE, signals).unwrap();
let mut info = fio::NodeInfo::File(fio::FileObject { event: Some(event), stream: None });
let () =
self.stream.control_handle().send_on_open_(status.into_raw(), Some(&mut info)).unwrap();
}
fn send_on_open(&mut self, status: Status) {
self.send_on_open_with_file_signals(status, zx::Signals::NONE);
}
fn send_on_open_with_readable(&mut self, status: Status) {
// Send USER_0 signal to indicate that the blob is available.
self.send_on_open_with_file_signals(status, zx::Signals::USER_0);
}
fn fail_open_with_error(mut self, status: Status) {
assert_ne!(status, Status::OK);
self.send_on_open(status);
}
/// Fail the open request with an error indicating the blob already exists.
///
/// # Panics
///
/// Panics on error
pub fn fail_open_with_already_exists(self) {
self.fail_open_with_error(Status::ACCESS_DENIED);
}
/// Fail the open request with an error indicating the blob does not exist.
///
/// # Panics
///
/// Panics on error
pub fn fail_open_with_not_found(self) {
self.fail_open_with_error(Status::NOT_FOUND);
}
/// Fail the open request with a generic IO error.
///
/// # Panics
///
/// Panics on error
pub fn fail_open_with_io_error(self) {
self.fail_open_with_error(Status::IO);
}
    /// Succeeds the open request but indicates the blob is not yet readable by not asserting the
    /// USER_0 signal on the file event handle, then asserts that the connection to the blob is
    /// closed.
///
/// # Panics
///
/// Panics on error
pub async fn fail_open_with_not_readable(mut self) {
self.send_on_open(Status::OK);
self.expect_done().await;
}
/// Succeeds the open request, indicating that the blob is readable, then asserts that the
/// connection to the blob is closed.
///
/// # Panics
///
/// Panics on error
pub async fn succeed_open_with_blob_readable(mut self) {
self.send_on_open_with_readable(Status::OK);
self.expect_done().await;
}
/// Succeeds the open request, then verifies the blob is immediately closed (possibly after
/// handling a single Close request).
///
/// # Panics
///
/// Panics on error
pub async fn expect_close(mut self) {
self.send_on_open_with_readable(Status::OK);
match self.stream.next().await {
None => {}
Some(Ok(fio::FileRequest::Close { responder })) => {
let _ = responder.send(&mut Ok(()));
self.expect_done().await;
}
Some(other) => panic!("unexpected request: {:?}", other),
}
}
/// Asserts that the request stream closes without any further requests.
///
/// # Panics
///
/// Panics on error
pub async fn expect_done(mut self) {
match self.stream.next().await {
None => {}
Some(request) => panic!("unexpected request: {:?}", request),
}
}
    /// Handle a single Read request, responding with at most `count` bytes taken from the start
    /// of `data`. Returns the number of bytes sent.
    async fn handle_read(&mut self, data: &[u8]) -> usize {
match self.stream.next().await {
Some(Ok(fio::FileRequest::Read { count, responder })) => {
let count = min(count.try_into().unwrap(), data.len());
responder.send(&mut Ok(data[..count].to_vec())).unwrap();
return count;
}
other => panic!("unexpected request: {:?}", other),
}
}
    /// Succeeds the open request, then handles read requests with the given blob data until it
    /// has been fully read, ending with an empty read signaling EOF and an optional final close.
///
/// # Panics
///
/// Panics on error
pub async fn expect_read(mut self, blob: &[u8]) {
self.send_on_open_with_readable(Status::OK);
let mut rest = blob;
while !rest.is_empty() {
let count = self.handle_read(rest).await;
rest = &rest[count..];
}
// Handle one extra request with empty buffer to signal EOF.
self.handle_read(rest).await;
match self.stream.next().await {
None => {}
Some(Ok(fio::FileRequest::Close { responder })) => {
let _ = responder.send(&mut Ok(()));
}
Some(other) => panic!("unexpected request: {:?}", other),
}
}
    /// Succeeds the open request, then handles get_attr, read, and read_at requests with the
    /// given blob data, as well as a possible final close request.
///
/// # Panics
///
/// Panics on error
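    ///
    /// # Example
    ///
    /// A minimal sketch; `mock`, `merkle`, and the concurrent client-side reads are assumed to
    /// be provided by the surrounding test.
    ///
    /// ```ignore
    /// let blob = mock.expect_open_blob(merkle).await;
    /// blob.serve_contents(b"blob contents").await;
    /// ```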
pub async fn serve_contents(mut self, data: &[u8]) {
self.send_on_open_with_readable(Status::OK);
let mut pos: usize = 0;
loop {
match self.stream.next().await {
Some(Ok(fio::FileRequest::Read { count, responder })) => {
let avail = data.len() - pos;
let count = min(count.try_into().unwrap(), avail);
responder.send(&mut Ok(data[pos..pos + count].to_vec())).unwrap();
pos += count;
}
Some(Ok(fio::FileRequest::ReadAt { count, offset, responder })) => {
let pos: usize = offset.try_into().unwrap();
let avail = data.len() - pos;
let count = min(count.try_into().unwrap(), avail);
responder.send(&mut Ok(data[pos..pos + count].to_vec())).unwrap();
}
Some(Ok(fio::FileRequest::GetAttr { responder })) => {
let mut attr = fio::NodeAttributes::new_empty();
attr.content_size = data.len().try_into().unwrap();
responder.send(Status::OK.into_raw(), &mut attr).unwrap();
}
Some(Ok(fio::FileRequest::Close { responder })) => {
let _ = responder.send(&mut Ok(()));
return;
}
None => {
return;
}
other => panic!("unexpected request: {:?}", other),
}
}
}
    /// Handle a single Resize request, responding with `status`. Returns the requested length.
    async fn handle_truncate(&mut self, status: Status) -> u64 {
match self.stream.next().await {
Some(Ok(fio::FileRequest::Resize { length, responder })) => {
responder
.send(&mut if status == Status::OK { Ok(()) } else { Err(status.into_raw()) })
.unwrap();
length
}
other => panic!("unexpected request: {:?}", other),
}
}
async fn expect_truncate(&mut self) -> u64 {
self.handle_truncate(Status::OK).await
}
    /// Handle a single Write request, responding with `status`. Returns the data the client
    /// attempted to write.
    async fn handle_write(&mut self, status: Status) -> Vec<u8> {
match self.stream.next().await {
Some(Ok(fio::FileRequest::Write { data, responder })) => {
responder
.send(&mut if status == Status::OK {
Ok(data.len() as u64)
} else {
Err(status.into_raw())
})
.unwrap();
data
}
other => panic!("unexpected request: {:?}", other),
}
}
    /// Succeed the open and truncate requests and all but the last expected write, then fail the
    /// final write with `status`.
    async fn fail_write_with_status(mut self, status: Status) {
self.send_on_open(Status::OK);
let length = self.expect_truncate().await;
        // Number of expected Write requests for `length` bytes: length / MAX_BUF, rounded up.
let expected_write_calls = (length + (fio::MAX_BUF - 1)) / fio::MAX_BUF;
for _ in 0..(expected_write_calls - 1) {
self.handle_write(Status::OK).await;
}
self.handle_write(status).await;
}
/// Succeeds the open request, consumes the truncate request, the initial write calls, then
/// fails the final write indicating the written data was corrupt.
///
/// # Panics
///
/// Panics on error
pub async fn fail_write_with_corrupt(self) {
self.fail_write_with_status(Status::IO_DATA_INTEGRITY).await
}
/// Succeeds the open request, then returns a future that, when awaited, verifies the blob is
/// truncated, written, and closed with the given `expected` payload.
///
/// # Panics
///
/// Panics on error
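    ///
    /// # Example
    ///
    /// A minimal sketch of running the returned future alongside the write under test;
    /// `write_blob_under_test` is a hypothetical stand-in for the code being exercised.
    ///
    /// ```ignore
    /// let payload = b"blob contents";
    /// let blob = mock.expect_create_blob(merkle).await;
    /// futures::join!(
    ///     write_blob_under_test(payload), // hypothetical code under test
    ///     blob.expect_payload(payload),
    /// );
    /// ```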
pub fn expect_payload(mut self, expected: &[u8]) -> impl Future<Output = ()> + '_ {
self.send_on_open(Status::OK);
async move {
assert_eq!(self.expect_truncate().await, expected.len() as u64);
let mut rest = expected;
while !rest.is_empty() {
let expected_chunk = if rest.len() > fio::MAX_BUF as usize {
&rest[..fio::MAX_BUF as usize]
} else {
rest
};
assert_eq!(self.handle_write(Status::OK).await, expected_chunk);
rest = &rest[expected_chunk.len()..];
}
match self.stream.next().await {
Some(Ok(fio::FileRequest::Close { responder })) => {
responder.send(&mut Ok(())).unwrap();
}
other => panic!("unexpected request: {:?}", other),
}
self.expect_done().await;
}
}
}

#[derive(Copy, Clone, Debug)]
struct FlagSet {
required: fio::OpenFlags,
anti_required: fio::OpenFlags,
}

impl FlagSet {
const OPEN_FOR_READ: FlagSet = FlagSet::new()
.require_present(fio::OpenFlags::RIGHT_READABLE)
.require_absent(fio::OpenFlags::CREATE)
.require_absent(fio::OpenFlags::RIGHT_WRITABLE);
const OPEN_FOR_WRITE: FlagSet = FlagSet::new()
.require_present(fio::OpenFlags::CREATE)
.require_present(fio::OpenFlags::RIGHT_WRITABLE);
const fn new() -> Self {
Self { required: fio::OpenFlags::empty(), anti_required: fio::OpenFlags::empty() }
}
const fn require_present(self, flags: fio::OpenFlags) -> Self {
let Self { required, anti_required } = self;
let required = required.union(flags);
Self { required, anti_required }
}
const fn require_absent(self, flags: fio::OpenFlags) -> Self {
let Self { required, anti_required } = self;
let anti_required = anti_required.union(flags);
Self { required, anti_required }
}
fn verify(self, flags: fio::OpenFlags) {
assert_eq!(flags & self.required, self.required);
assert_eq!(flags & self.anti_required, fio::OpenFlags::empty());
}
}