blob: 97485da70c1bccd4f1875a93ca5bc3ce5fc80025 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Typesafe wrappers around writing blobs to blobfs.
use {fidl_fuchsia_io as fio, fuchsia_hash::Hash, fuchsia_zircon::Status, thiserror::Error};
/// Creates the blob identified by `hash` in blobfs, opened for write (and read), and returns a
/// typestate wrapper that must be truncated before any data can be written.
///
/// Returns [`CreateError::AlreadyExists`] if the blob is already present or is being concurrently
/// written (blobfs signals both with `ACCESS_DENIED` on create).
pub(crate) async fn create(
    blobfs: &fio::DirectoryProxy,
    hash: &Hash,
) -> Result<Blob<NeedsTruncate>, CreateError> {
    let flags =
        fio::OpenFlags::CREATE | fio::OpenFlags::RIGHT_READABLE | fio::OpenFlags::RIGHT_WRITABLE;
    let open_result = io_util::directory::open_file(blobfs, &hash.to_string(), flags).await;
    match open_result {
        Ok(proxy) => Ok(Blob { proxy, state: NeedsTruncate }),
        // blobfs reports ACCESS_DENIED when the blob exists or another writer holds it.
        Err(io_util::node::OpenError::OpenError(Status::ACCESS_DENIED)) => {
            Err(CreateError::AlreadyExists)
        }
        Err(other) => Err(CreateError::Io(other)),
    }
}
/// State for a blob that can be truncated.
///
/// Typestate marker: a freshly created blob must have [`Blob::truncate`] called before any data
/// can be written to it.
#[derive(Debug)]
pub struct NeedsTruncate;
/// State for a blob that can be written to.
#[derive(Debug)]
pub struct NeedsData {
    // Total size the blob was truncated to, in bytes.
    size: u64,
    // Number of bytes written so far; write() asserts this never exceeds `size`.
    written: u64,
}
/// State for a blob that is present, readable, and has a seek position at the end.
///
/// Typestate marker: all data has been written; the handle can be rewound for reading via
/// [`Blob::reopen_for_read`].
#[derive(Debug)]
pub struct AtEof;
/// A blob in the process of being written.
///
/// The type parameter `S` is a typestate marker ([`NeedsTruncate`], [`NeedsData`], or [`AtEof`])
/// restricting which operations are available at each stage of the write.
#[derive(Debug)]
pub struct Blob<S> {
    // Open FIDL connection to the blob's file in blobfs.
    proxy: fio::FileProxy,
    // Current typestate; may carry per-state data (e.g. write progress for NeedsData).
    state: S,
}
/// A handle to a blob that must be explicitly closed to prevent future opens of the same blob from
/// racing with this blob closing.
#[derive(Debug)]
#[must_use = "Subsequent opens of this blob may race with closing this one"]
pub struct BlobCloser {
    // Independent clone of the blob's proxy, so closing works regardless of the Blob's state.
    proxy: fio::FileProxy,
    // Set once close() completes (or disarm() is called) so Drop does not send a second close.
    closed: bool,
}
/// The successful result of truncating a blob to its size.
#[derive(Debug)]
pub enum TruncateSuccess {
    /// The blob needs all its data written.
    NeedsData(Blob<NeedsData>),
    /// The blob is the empty blob and so does not need any data written.
    Done(fio::FileProxy),
}
/// The successful result of writing some data to a blob.
#[derive(Debug)]
pub enum WriteSuccess {
    /// There is still more data to write.
    MoreToWrite(Blob<NeedsData>),
    /// The blob is fully written.
    Done(Blob<AtEof>),
}
/// An error encountered while creating a blob
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum CreateError {
    /// Mapped from ACCESS_DENIED on create; blobfs uses it for both "already present" and
    /// "another writer holds this blob".
    #[error("the blob already exists or is being concurrently written")]
    AlreadyExists,
    #[error("while creating the blob")]
    Io(#[source] io_util::node::OpenError),
}
/// An error encountered while truncating a blob
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum TruncateError {
    #[error("transport error")]
    Fidl(#[from] fidl::Error),
    /// Mapped from NO_SPACE.
    #[error("insufficient storage space is available")]
    NoSpace,
    /// Mapped from IO_DATA_INTEGRITY (only possible at truncate for the empty blob, whose hash is
    /// fully determined by its zero length).
    #[error("the blob is not the empty blob")]
    Corrupt,
    /// Mapped from BAD_STATE: another connection already truncated this blob.
    #[error("the blob is in the process of being written")]
    ConcurrentWrite,
    #[error("received unexpected failure status")]
    UnexpectedResponse(#[source] Status),
}
/// An error encountered while writing to a blob
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum WriteError {
#[error("transport error")]
Fidl(#[from] fidl::Error),
#[error("file endpoint reported it wrote more bytes written than were actually provided to the file endpoint")]
Overwrite,
#[error("the written data was corrupt")]
Corrupt,
#[error("insufficient storage space is available")]
NoSpace,
#[error("received unexpected failure status")]
UnexpectedResponse(#[source] Status),
}
/// An error encountered while reusing a written blob handle to read
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum IntoReadError {
    #[error("transport error")]
    Fidl(#[from] fidl::Error),
    /// Raw zx status returned by the seek call.
    #[error("received unexpected failure status")]
    UnexpectedResponse(#[from] Status),
}
impl<S> Blob<S> {
    /// Transitions this blob to a new typestate, reusing the same underlying proxy. The previous
    /// state value is discarded.
    fn with_state<T>(self, state: T) -> Blob<T> {
        let Blob { proxy, state: _ } = self;
        Blob { proxy, state }
    }

    /// Closes the blob proxy, ignoring errors. More importantly, if the blob isn't fully written,
    /// waits until the blob can again be opened for write.
    async fn close(self) {
        // Errors are deliberately ignored: some code paths call both this close() and
        // BlobCloser::close(), so a failure here is expected sometimes. What matters is that this
        // method only returns after blobfs acknowledges the request (or the proxy is observed to
        // be closed), so callers can safely re-open the blob afterwards.
        let _ = self.proxy.close().await;
    }
}
impl Blob<NeedsTruncate> {
    /// Returns an object that can be used to explicitly close the blob independently of what state
    /// this blob is in or when the returned object is dropped, whichever happens first.
    pub fn closer(&self) -> BlobCloser {
        BlobCloser { proxy: self.proxy.clone(), closed: false }
    }

    /// Truncates the blob to the given size. On success, the blob enters the writable state.
    pub async fn truncate(self, size: u64) -> Result<TruncateSuccess, TruncateError> {
        let resize_result = self.proxy.resize(size).await?.map_err(Status::from_raw);
        if let Err(status) = resize_result {
            let err = match status {
                Status::IO_DATA_INTEGRITY => TruncateError::Corrupt,
                Status::NO_SPACE => TruncateError::NoSpace,
                Status::BAD_STATE => TruncateError::ConcurrentWrite,
                status => TruncateError::UnexpectedResponse(status),
            };
            // Dropping the file proxy would close the blob asynchronously. Instead, make an
            // explicit call to blobfs to close the blob and wait for the acknowledgement, so a
            // quick retry to re-open this blob for write won't race with the blob closing in the
            // background.
            self.close().await;
            return Err(err);
        }
        // The empty blob has no data, so truncating it to 0 fully writes it.
        if size == 0 {
            Ok(TruncateSuccess::Done(self.proxy))
        } else {
            Ok(TruncateSuccess::NeedsData(self.with_state(NeedsData { size, written: 0 })))
        }
    }

    /// Creates a new blob client backed by the returned request stream. This constructor should
    /// not be used outside of tests.
    ///
    /// # Panics
    ///
    /// Panics on error
    pub fn new_test() -> (Self, fio::FileRequestStream) {
        let (proxy, stream) =
            fidl::endpoints::create_proxy_and_stream::<fio::FileMarker>().unwrap();
        (Self { proxy, state: NeedsTruncate }, stream)
    }
}
impl Blob<NeedsData> {
    /// Writes all of the given buffer to the blob.
    ///
    /// # Panics
    ///
    /// Panics if a write is attempted with a buf larger than the remaining blob size.
    pub async fn write(mut self, buf: &[u8]) -> Result<WriteSuccess, WriteError> {
        let len = buf.len() as u64;
        assert!(self.state.written + len <= self.state.size);

        // On failure, explicitly close the blob (waiting for blobfs to acknowledge) before
        // surfacing the error, so a retry cannot race with the blob closing in the background.
        if let Err(e) = io_util::file::write(&self.proxy, buf).await {
            self.close().await;
            return Err(match e {
                io_util::file::WriteError::Create(_) => {
                    // unreachable!(), but opt to return a confusing error instead of panic.
                    WriteError::UnexpectedResponse(Status::OK)
                }
                io_util::file::WriteError::Fidl(e) => WriteError::Fidl(e),
                io_util::file::WriteError::WriteError(Status::IO_DATA_INTEGRITY) => {
                    WriteError::Corrupt
                }
                io_util::file::WriteError::WriteError(Status::NO_SPACE) => WriteError::NoSpace,
                io_util::file::WriteError::WriteError(status) => {
                    WriteError::UnexpectedResponse(status)
                }
                io_util::file::WriteError::Overwrite => WriteError::Overwrite,
            });
        }

        // Track progress; the blob is fully written exactly when the running total hits the size
        // established at truncate time.
        self.state.written += len;
        if self.state.written == self.state.size {
            Ok(WriteSuccess::Done(self.with_state(AtEof)))
        } else {
            Ok(WriteSuccess::MoreToWrite(self))
        }
    }
}
impl Blob<AtEof> {
    /// Rewinds the file position to the start, returning the now read-only FileProxy representing
    /// the blob.
    pub async fn reopen_for_read(self) -> Result<fio::FileProxy, IntoReadError> {
        // Seek back to offset 0 so subsequent reads observe the blob from the beginning; the
        // resulting absolute position is not interesting.
        let seek_result = self.proxy.seek(fio::SeekOrigin::Start, 0).await?;
        let _pos: u64 = seek_result.map_err(Status::from_raw)?;
        Ok(self.proxy)
    }
}
impl BlobCloser {
    /// Close the blob, silently ignoring errors.
    pub async fn close(mut self) {
        let _ = self.proxy.close().await;
        // Set the flag only after the round trip completes so that if this future is cancelled
        // mid-close, Drop still sends a best-effort close request.
        self.closed = true;
    }

    /// Drops this BlobCloser without closing the underlying blob.
    pub fn disarm(mut self) {
        // With the flag set, the Drop impl sends no close request when `self` goes out of scope
        // at the end of this method.
        self.closed = true;
    }
}
impl Drop for BlobCloser {
    fn drop(&mut self) {
        if self.closed {
            return;
        }
        // Dropped without waiting on close. We can at least send the close request here, but
        // there could be a race with another attempt to open the blob.
        let _ = self.proxy.close();
    }
}
impl TruncateSuccess {
    /// Returns the contained [`TruncateSuccess::NeedsData`] value, consuming the `self` value.
    ///
    /// # Panics
    ///
    /// Panics if the value is a [`TruncateSuccess::Done`].
    pub fn unwrap_needs_data(self) -> Blob<NeedsData> {
        match self {
            TruncateSuccess::NeedsData(blob) => blob,
            other => panic!("unwrap_needs_data() called on {:?}", other),
        }
    }

    /// Returns the contained [`TruncateSuccess::Done`] value, consuming the `self` value.
    ///
    /// # Panics
    ///
    /// Panics if the value is a [`TruncateSuccess::NeedsData`].
    pub fn unwrap_done(self) -> fio::FileProxy {
        match self {
            TruncateSuccess::Done(proxy) => proxy,
            other => panic!("unwrap_done() called on {:?}", other),
        }
    }
}
impl WriteSuccess {
    /// Returns the contained [`WriteSuccess::MoreToWrite`] value, consuming the `self` value.
    ///
    /// # Panics
    ///
    /// Panics if the value is a [`WriteSuccess::Done`].
    pub fn unwrap_more_to_write(self) -> Blob<NeedsData> {
        match self {
            WriteSuccess::MoreToWrite(blob) => blob,
            other => panic!("unwrap_more_to_write() called on {:?}", other),
        }
    }

    /// Returns the contained [`WriteSuccess::Done`] value, consuming the `self` value.
    ///
    /// # Panics
    ///
    /// Panics if the value is a [`WriteSuccess::MoreToWrite`].
    pub fn unwrap_done(self) -> Blob<AtEof> {
        match self {
            WriteSuccess::Done(blob) => blob,
            other => panic!("unwrap_done() called on {:?}", other),
        }
    }
}
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::Client,
        assert_matches::assert_matches,
        blobfs_ramdisk::{BlobfsRamdisk, Ramdisk},
        fuchsia_async as fasync,
        fuchsia_merkle::MerkleTree,
        maplit::hashset,
        rand::prelude::*,
    };

    #[fasync::run_singlethreaded(test)]
    async fn empty_blob_is_present_after_truncate() {
        let blobfs = BlobfsRamdisk::start().unwrap();
        let client = Client::for_ramdisk(&blobfs);
        // The empty blob has zero-length contents; truncating to 0 fully writes it.
        // (Was the obfuscated `[0_0; 0_0]`.)
        let contents = [0u8; 0];
        let hash = MerkleTree::from_reader(&contents[..]).unwrap().root();
        let blob = client.open_blob_for_write(&hash).await.unwrap();
        let blob = blob.truncate(0u64).await.unwrap().unwrap_done();
        // Verify that:
        // * opening the blob for read has the file_readable signal asserted and is readable
        // * it shows up in readdir
        // * the original proxy is now readable
        assert!(client.has_blob(&hash).await);
        assert_eq!(client.list_known_blobs().await.unwrap(), hashset! {hash});
        let also_blob = client.open_blob_for_read(&hash).await.unwrap();
        assert_eq!(contents, io_util::file::read(&also_blob).await.unwrap().as_slice());
        assert_eq!(contents, io_util::file::read(&blob).await.unwrap().as_slice());
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn detects_corrupt_empty_blob() {
        let blobfs = BlobfsRamdisk::start().unwrap();
        let client = Client::for_ramdisk(&blobfs);
        // The empty blob is always named
        // 15ec7bf0b50732b49f8228e07d24365338f9e3ab994b00af08e5a3bffe55fd8b.
        // A 0 length blob with any other name (like all f's) is corrupt, but also empty.
        let hash = Hash::from([0xff; 32]);
        let blob = client.open_blob_for_write(&hash).await.unwrap();
        assert_matches!(blob.truncate(0u64).await, Err(TruncateError::Corrupt));
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn detects_corrupt_blob() {
        let blobfs = BlobfsRamdisk::start().unwrap();
        let client = Client::for_ramdisk(&blobfs);
        // The merkle root of b"test" is not all f's, so this blob is corrupt.
        let hash = Hash::from([0xff; 32]);
        let blob = client.open_blob_for_write(&hash).await.unwrap();
        let blob = blob.truncate(4u64).await.unwrap().unwrap_needs_data();
        assert_matches!(blob.write(&b"test"[..]).await, Err(WriteError::Corrupt));
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn create_already_present_empty_blob_fails() {
        let blobfs = BlobfsRamdisk::builder().with_blob(&b""[..]).start().unwrap();
        let client = Client::for_ramdisk(&blobfs);
        let hash = MerkleTree::from_reader(&b""[..]).unwrap().root();
        assert_matches!(client.open_blob_for_write(&hash).await, Err(CreateError::AlreadyExists));
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn create_already_present_blob_fails() {
        let blobfs = BlobfsRamdisk::builder().with_blob(&b"present"[..]).start().unwrap();
        let client = Client::for_ramdisk(&blobfs);
        let hash = MerkleTree::from_reader(&b"present"[..]).unwrap().root();
        assert_matches!(client.open_blob_for_write(&hash).await, Err(CreateError::AlreadyExists));
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn write_read_small_blob() {
        let blobfs = BlobfsRamdisk::start().unwrap();
        let client = Client::for_ramdisk(&blobfs);
        let contents = [3; 1024];
        let hash = MerkleTree::from_reader(&contents[..]).unwrap().root();
        let blob = client.open_blob_for_write(&hash).await.unwrap();
        let blob = blob.truncate(1024u64).await.unwrap().unwrap_needs_data();
        let blob = blob.write(&contents[..]).await.unwrap().unwrap_done();
        // New connections can now read the blob, even with the original proxy still open.
        let also_blob = client.open_blob_for_read(&hash).await.unwrap();
        assert_eq!(contents, io_util::file::read(&also_blob).await.unwrap().as_slice());
        let blob = blob.reopen_for_read().await.unwrap();
        let actual = io_util::file::read(&blob).await.unwrap();
        assert_eq!(contents, actual.as_slice());
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn write_small_blob_slowly() {
        let blobfs = BlobfsRamdisk::start().unwrap();
        let client = Client::for_ramdisk(&blobfs);
        let contents = [4; 1024];
        let chunk = [4; 1];
        let hash = MerkleTree::from_reader(&contents[..]).unwrap().root();
        let blob = client.open_blob_for_write(&hash).await.unwrap();
        let mut blob = blob.truncate(1024u64).await.unwrap().unwrap_needs_data();
        // verify Blob does correct accounting of written data and forwards small writes correctly
        // by issuing more than 1 write call instead of 1 big call.
        for _ in 0..1023 {
            blob = blob.write(&chunk[..]).await.unwrap().unwrap_more_to_write();
        }
        let blob = blob.write(&chunk[..]).await.unwrap().unwrap_done();
        let blob = blob.reopen_for_read().await.unwrap();
        let actual = io_util::file::read(&blob).await.unwrap();
        assert_eq!(contents, actual.as_slice());
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn write_large_blob() {
        let blobfs = BlobfsRamdisk::start().unwrap();
        let client = Client::for_ramdisk(&blobfs);
        let contents = (0u8..=255u8).cycle().take(1_000_000).collect::<Vec<u8>>();
        let hash = MerkleTree::from_reader(&contents[..]).unwrap().root();
        let blob = client.open_blob_for_write(&hash).await.unwrap();
        let blob = blob.truncate(contents.len() as u64).await.unwrap().unwrap_needs_data();
        let blob = blob.write(&contents[..]).await.unwrap().unwrap_done();
        let blob = blob.reopen_for_read().await.unwrap();
        let actual = io_util::file::read(&blob).await.unwrap();
        assert_eq!(contents, actual.as_slice());
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn close_blob_closer() {
        let blobfs = BlobfsRamdisk::start().unwrap();
        let client = Client::for_ramdisk(&blobfs);
        let contents = [3; 1024];
        let hash = MerkleTree::from_reader(&contents[..]).unwrap().root();
        let blob = client.open_blob_for_write(&hash).await.unwrap();
        // make a closer and then immediately close it, then use the now closed blob proxy to show
        // that it has been closed.
        blob.closer().close().await;
        assert_matches!(
            blob.truncate(1024u64).await,
            Err(TruncateError::Fidl(fidl::Error::ClientChannelClosed {
                status: Status::PEER_CLOSED,
                ..
            }))
        );
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn disarm_blob_closer() {
        let blobfs = BlobfsRamdisk::start().unwrap();
        let client = Client::for_ramdisk(&blobfs);
        let contents = [3; 1024];
        let hash = MerkleTree::from_reader(&contents[..]).unwrap().root();
        let blob = client.open_blob_for_write(&hash).await.unwrap();
        // make and disarm a closer, then use the not-closed blob proxy to show it is still open.
        blob.closer().disarm();
        let blob = blob.truncate(1024u64).await.unwrap().unwrap_needs_data();
        let blob = blob.write(&contents[..]).await.unwrap().unwrap_done();
        drop(blob);
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn concurrent_write_at_truncate() {
        let blobfs = BlobfsRamdisk::start().unwrap();
        let client = Client::for_ramdisk(&blobfs);
        let contents = [3; 1024];
        let hash = MerkleTree::from_reader(&contents[..]).unwrap().root();
        let blob1 = client.open_blob_for_write(&hash).await.unwrap();
        let _blob1 = blob1.truncate(1024u64).await.unwrap().unwrap_needs_data();
        assert_matches!(client.open_blob_for_write(&hash).await, Err(CreateError::AlreadyExists));
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn concurrent_write_at_create() {
        let blobfs = BlobfsRamdisk::start().unwrap();
        let client = Client::for_ramdisk(&blobfs);
        let contents = [3; 1024];
        let hash = MerkleTree::from_reader(&contents[..]).unwrap().root();
        // concurrent opens will succeed as long as no connections have called truncate() yet.
        // First truncate() wins.
        let blob1 = client.open_blob_for_write(&hash).await.unwrap();
        let blob2 = client.open_blob_for_write(&hash).await.unwrap();
        let _blob1 = blob1.truncate(1024u64).await.unwrap().unwrap_needs_data();
        assert_matches!(blob2.truncate(1024u64).await, Err(TruncateError::ConcurrentWrite));
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn write_too_big_blob_fails_with_no_space() {
        let tiny_blobfs =
            Ramdisk::builder().block_count(4096).into_blobfs_builder().unwrap().start().unwrap();
        let client = Client::for_ramdisk(&tiny_blobfs);
        // Deterministically generate a blob that cannot be compressed and is bigger than blobfs
        // can store.
        const LARGE_BLOB_FILE_SIZE: u64 = 4 * 1024 * 1024;
        let mut contents = vec![0u8; LARGE_BLOB_FILE_SIZE as usize];
        let mut rng = StdRng::from_seed([0u8; 32]);
        rng.fill_bytes(&mut contents[..]);
        let hash = MerkleTree::from_reader(&contents[..]).unwrap().root();
        // Verify writing that blob fails with an out of space error.
        let blob = client.open_blob_for_write(&hash).await.unwrap();
        let blob = blob.truncate(LARGE_BLOB_FILE_SIZE).await.unwrap().unwrap_needs_data();
        assert_matches!(blob.write(&contents[..]).await, Err(WriteError::NoSpace));
        tiny_blobfs.stop().await.unwrap();
    }
}