// blob: 9744a5d731f3ede64e44becb7ca7456df19e3f2f [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
super::{OpenBlobError, TruncateBlobError, WriteBlobError},
anyhow::Context as _,
fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio, fidl_fuchsia_pkg as fpkg,
fuchsia_zircon_status::Status,
};
/// Converts the FIDL `fpkg::BlobWriter` union into a `Writer` used to write the blob and a
/// `Closer` used to release it before a retry.
///
/// Returns an `OpenBlobError` if a client end cannot be turned into a proxy.
pub(super) fn into_blob_writer_and_closer(
    fidl: fpkg::BlobWriter,
) -> Result<(Box<dyn Writer>, Box<dyn Closer>), OpenBlobError> {
    match fidl {
        fpkg::BlobWriter::File(file) => {
            // The same file connection backs both halves: the original proxy closes the blob
            // and a clone of it writes the blob. `Clone::clone` is spelled out so the call is
            // unambiguously the `Clone` trait method.
            let closer = file.into_proxy()?;
            let writer = Clone::clone(&closer);
            Ok((Box::new(writer), Box::new(closer)))
        }
        fpkg::BlobWriter::Writer(writer) => {
            // fuchsia.fxfs/BlobCreator allows concurrent creation attempts, so an ongoing
            // attempt does not have to be cancelled before retrying; `()` is a no-op closer.
            let fxblob = FxBlob::new(writer.into_proxy()?);
            Ok((Box::new(fxblob), Box::new(())))
        }
    }
}
/// Releases a blob that is being written so that creating and writing it can be retried.
#[async_trait::async_trait]
pub(super) trait Closer: Send + Sync + std::fmt::Debug {
    /// Closes the blob so that a create-and-write retry can start immediately.
    async fn close(&mut self);

    /// Tries to close the blob without waiting. When closing requires asynchronous work, this
    /// may return before the blob has actually been closed.
    fn best_effort_close(&mut self);
}
#[async_trait::async_trait]
impl Closer for fio::FileProxy {
async fn close(&mut self) {
let _: Result<Result<(), i32>, fidl::Error> = fio::FileProxy::close(self).await;
}
fn best_effort_close(&mut self) {
let _: fidl::client::QueryResponseFut<Result<(), i32>> = fio::FileProxy::close(self);
}
}
// No-op closer. fuchsia.fxfs/BlobCreator allows concurrent creation attempts, so an ongoing
// attempt does not need to be cancelled before trying again.
#[async_trait::async_trait]
impl Closer for () {
    /// Does nothing.
    async fn close(&mut self) {}

    /// Does nothing.
    fn best_effort_close(&mut self) {}
}
/// Writes the contents of a single blob.
#[async_trait::async_trait]
pub(super) trait Writer: Send + std::fmt::Debug {
    /// Sets the size of the blob to `size` bytes.
    ///
    /// For a zero-sized blob the returned Future should not complete until the blob is
    /// readable.
    async fn truncate(&mut self, size: u64) -> Result<(), TruncateBlobError>;

    /// Writes `bytes` to the blob.
    ///
    /// The Future returned by the call that writes the final bytes of the blob should not
    /// complete until the blob is readable.
    ///
    /// The implementations in this file invoke `after_write` (with a byte count) once a write
    /// request has been issued but before its response is awaited, and `after_write_ack` once
    /// that response has arrived.
    async fn write(
        &mut self,
        bytes: &[u8],
        after_write: &(dyn Fn(u64) + Send + Sync),
        after_write_ack: &(dyn Fn() + Send + Sync),
    ) -> Result<(), WriteBlobError>;
}
#[async_trait::async_trait]
impl Writer for fio::FileProxy {
    /// Resizes the file to `size` bytes.
    ///
    /// A `NO_SPACE` status from the server becomes `TruncateBlobError::NoSpace`; any other
    /// status becomes `TruncateBlobError::UnexpectedResponse`. FIDL transport errors are
    /// propagated through `?`.
    async fn truncate(&mut self, size: u64) -> Result<(), TruncateBlobError> {
        self.resize(size).await?.map_err(|i| match Status::from_raw(i) {
            Status::NO_SPACE => TruncateBlobError::NoSpace,
            other => TruncateBlobError::UnexpectedResponse(other),
        })
    }

    /// Writes all of `bytes` to the file, splitting them into requests of at most
    /// `fio::MAX_BUF` bytes and resuming after short writes.
    ///
    /// For every request, `after_write` is called with the number of bytes sent in that request
    /// once it has been issued (before the response is awaited), and `after_write_ack` is
    /// called once the response has been received.
    ///
    /// Server statuses map to errors as follows: `IO_DATA_INTEGRITY` becomes
    /// `WriteBlobError::Corrupt`, `NO_SPACE` becomes `WriteBlobError::NoSpace`, anything else
    /// becomes `WriteBlobError::UnexpectedResponse`. If the server acknowledges more bytes than
    /// the request carried, `WriteBlobError::Overwrite` is returned.
    async fn write(
        &mut self,
        mut bytes: &[u8],
        after_write: &(dyn Fn(u64) + Send + Sync),
        after_write_ack: &(dyn Fn() + Send + Sync),
    ) -> Result<(), WriteBlobError> {
        while !bytes.is_empty() {
            let limit = bytes.len().min(fio::MAX_BUF as usize);
            let chunk = &bytes[..limit];

            let result_fut = fio::FileProxy::write(self, chunk);
            // Report the size of the request that was actually issued, not of everything that
            // still remains to be written.
            after_write(chunk.len() as u64);

            let result = result_fut.await;
            after_write_ack();

            let written = result?.map_err(|i| match Status::from_raw(i) {
                Status::IO_DATA_INTEGRITY => WriteBlobError::Corrupt,
                Status::NO_SPACE => WriteBlobError::NoSpace,
                other => WriteBlobError::UnexpectedResponse(other),
            })? as usize;

            // The server cannot have written more than this request carried. Comparing against
            // the whole remaining buffer would accept such an ack and silently skip bytes that
            // were never sent.
            if written > chunk.len() {
                return Err(WriteBlobError::Overwrite);
            }
            bytes = &bytes[written..];
        }
        Ok(())
    }
}
/// State machine for writing a blob through `fuchsia.fxfs/BlobWriter`.
#[derive(Debug)]
enum FxBlob {
    /// The blob size has not been set yet; holds the proxy that `truncate` consumes.
    NeedsTruncate(ffxfs::BlobWriterProxy),
    /// The size has been set; holds the writer that `write` sends bytes to.
    NeedsBytes(blob_writer::BlobWriter),
    /// Placeholder left in place while `truncate` takes ownership of the previous state. The
    /// blob remains in this state if `truncate` does not install a new one.
    Invalid,
}
impl FxBlob {
fn new(proxy: ffxfs::BlobWriterProxy) -> Self {
Self::NeedsTruncate(proxy)
}
fn state_str(&self) -> &'static str {
match self {
Self::NeedsTruncate(_) => "needs truncate",
Self::NeedsBytes(_) => "needs bytes",
Self::Invalid => "invalid",
}
}
}
#[async_trait::async_trait]
impl Writer for FxBlob {
    /// Sets the blob size by creating a `blob_writer::BlobWriter` for `size` bytes and moves
    /// the state from `NeedsTruncate` to `NeedsBytes`.
    ///
    /// Errors:
    /// * `TruncateBlobError::Other` if creating the `BlobWriter` fails. The proxy has been
    ///   consumed by then, so the state is left as `Invalid`.
    /// * `TruncateBlobError::AlreadyTruncated` if the size was already set. The error carries
    ///   the name of the state the blob was actually in, and the in-progress writer is kept so
    ///   the rejected call has no side effects.
    /// * `TruncateBlobError::BadState` if the state is `Invalid`.
    async fn truncate(&mut self, size: u64) -> Result<(), TruncateBlobError> {
        // Take ownership of the current state; `Invalid` is only a placeholder while it is
        // being inspected.
        match std::mem::replace(self, Self::Invalid) {
            Self::NeedsTruncate(proxy) => {
                let writer = blob_writer::BlobWriter::create(proxy, size)
                    .await
                    .context("creating a BlobWriter")
                    .map_err(TruncateBlobError::Other)?;
                *self = Self::NeedsBytes(writer);
                Ok(())
            }
            already_truncated @ Self::NeedsBytes(_) => {
                // Read the state name from the value that was taken out: `self` is the
                // `Invalid` placeholder right now and would misreport the state as "invalid".
                let state = already_truncated.state_str();
                // Put the writer back so that a rejected truncate does not destroy it.
                *self = already_truncated;
                Err(TruncateBlobError::AlreadyTruncated(state))
            }
            Self::Invalid => Err(TruncateBlobError::BadState),
        }
    }

    /// Writes `bytes` with the underlying `BlobWriter`.
    ///
    /// `after_write` is called with `bytes.len()` after the write future has been created but
    /// before it is awaited; `after_write_ack` is called after it completes.
    ///
    /// Errors:
    /// * `WriteBlobError::BytesNotNeeded` (carrying the current state name) unless the state is
    ///   `NeedsBytes`.
    /// * `WriteBlobError::Other` if the `BlobWriter` write fails.
    async fn write(
        &mut self,
        bytes: &[u8],
        after_write: &(dyn Fn(u64) + Send + Sync),
        after_write_ack: &(dyn Fn() + Send + Sync),
    ) -> Result<(), WriteBlobError> {
        let Self::NeedsBytes(writer) = self else {
            return Err(WriteBlobError::BytesNotNeeded(self.state_str()));
        };
        let fut = writer.write(bytes);
        let () = after_write(bytes.len() as u64);
        let res = fut.await;
        let () = after_write_ack();
        res.context("calling write on BlobWriter").map_err(WriteBlobError::Other)
    }
}
#[cfg(test)]
mod tests {
    use {super::*, futures::stream::TryStreamExt as _};

    /// Waits for the next request on `server`, asserts that it is a `Write` carrying exactly
    /// `expected`, and replies that `ack` bytes were written.
    async fn expect_write(server: &mut fio::FileRequestStream, expected: &[u8], ack: u64) {
        match server.try_next().await.unwrap().unwrap() {
            fio::FileRequest::Write { data, responder } => {
                assert_eq!(data, expected);
                let () = responder.send(Ok(ack)).unwrap();
            }
            req => panic!("unexpected request {req:?}"),
        }
    }

    /// A buffer one byte larger than MAX_BUF is sent as two write requests.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn file_proxy_chunks_writes() {
        let (mut proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<fio::FileMarker>().unwrap();
        let max_buf = fio::MAX_BUF as usize;
        let bytes = vec![0u8; max_buf + 1];

        let client_fut = async move {
            <fio::FileProxy as Writer>::write(&mut proxy, &bytes, &|_| (), &|| ()).await.unwrap()
        };
        let server_fut = async move {
            // Proxy limited writes to MAX_BUF bytes.
            expect_write(&mut server, &vec![0u8; max_buf], fio::MAX_BUF).await;
            // The single leftover byte arrives in its own request.
            expect_write(&mut server, &[0u8; 1], 1).await;
            assert!(server.try_next().await.unwrap().is_none());
        };

        let ((), ()) = futures::future::join(client_fut, server_fut).await;
    }

    /// When the server acks fewer bytes than were sent, the remainder is sent again.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn file_proxy_handles_short_writes() {
        let (mut proxy, mut server) =
            fidl::endpoints::create_proxy_and_stream::<fio::FileMarker>().unwrap();
        let bytes = [0u8; 10];

        let client_fut = async move {
            <fio::FileProxy as Writer>::write(&mut proxy, &bytes, &|_| (), &|| ()).await.unwrap()
        };
        let server_fut = async move {
            // Ack only 8 of the 10 bytes.
            expect_write(&mut server, &[0u8; 10], 8).await;
            // The 2 unacknowledged bytes follow.
            expect_write(&mut server, &[0u8; 2], 2).await;
            assert!(server.try_next().await.unwrap().is_none());
        };

        let ((), ()) = futures::future::join(client_fut, server_fut).await;
    }

    /// Writes a delivery blob through the fxblob writer and checks that it is then listed.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn fxblob_writer() {
        let blobfs = blobfs_ramdisk::BlobfsRamdisk::builder().fxblob().start().await.unwrap();
        assert_eq!(blobfs.list_blobs().unwrap(), std::collections::BTreeSet::new());

        let payload = [0u8; 7];
        let hash = fuchsia_merkle::from_slice(&payload).root();
        let delivery_payload = delivery_blob::Type1Blob::generate(
            &payload[..],
            delivery_blob::CompressionMode::Attempt,
        );

        let writer_client = blobfs
            .blob_creator_proxy()
            .unwrap()
            .unwrap()
            .create(&hash.into(), false)
            .await
            .unwrap()
            .unwrap();
        let (mut writer, _closer) =
            into_blob_writer_and_closer(fpkg::BlobWriter::Writer(writer_client)).unwrap();

        let delivery_size = delivery_payload.len().try_into().unwrap();
        let () = writer.truncate(delivery_size).await.unwrap();
        let () = writer.write(&delivery_payload, &|_| (), &|| ()).await.unwrap();

        assert_eq!(blobfs.list_blobs().unwrap(), std::collections::BTreeSet::from([hash]));
        let () = blobfs.stop().await.unwrap();
    }
}