// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#![deny(missing_docs)]
//! Typesafe wrappers around the /blob filesystem.
use {
fidl::endpoints::{Proxy as _, ServerEnd},
fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio, fidl_fuchsia_pkg as fpkg,
fuchsia_hash::{Hash, ParseHashError},
fuchsia_zircon::{self as zx, AsHandleRef as _, Status},
futures::{stream, StreamExt as _},
std::collections::HashSet,
thiserror::Error,
tracing::{error, info, warn},
vfs::{
common::send_on_open_with_error, execution_scope::ExecutionScope, file::StreamIoConnection,
ObjectRequest, ObjectRequestRef, ProtocolsExt, ToObjectRequest as _,
},
};
pub mod mock;
pub use mock::Mock;
/// Blobfs client errors.
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum BlobfsError {
#[error("while opening blobfs dir")]
OpenDir(#[from] fuchsia_fs::node::OpenError),
#[error("while cloning the blobfs dir")]
CloneDir(#[from] fuchsia_fs::node::CloneError),
#[error("while listing blobfs dir")]
ReadDir(#[source] fuchsia_fs::directory::EnumerateError),
#[error("while deleting blob")]
Unlink(#[source] Status),
#[error("while sync'ing")]
Sync(#[source] Status),
#[error("while parsing blob merkle hash")]
ParseHash(#[from] ParseHashError),
#[error("FIDL error")]
Fidl(#[from] fidl::Error),
#[error("while connecting to fuchsia.fxfs/BlobCreator")]
ConnectToBlobCreator(#[source] anyhow::Error),
#[error("while connecting to fuchsia.fxfs/BlobReader")]
ConnectToBlobReader(#[source] anyhow::Error),
#[error("while connecting to fuchsia.kernel/VmexResource")]
ConnectToVmexResource(#[source] anyhow::Error),
#[error("while setting the VmexResource")]
InitVmexResource(#[source] anyhow::Error),
}
/// An error encountered while creating a blob.
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum CreateError {
#[error("the blob already exists or is being concurrently written")]
AlreadyExists,
#[error("while creating the blob")]
Io(#[source] fuchsia_fs::node::OpenError),
#[error("while converting the proxy into a client end")]
ConvertToClientEnd,
#[error("FIDL error")]
Fidl(#[from] fidl::Error),
#[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
BlobCreator(ffxfs::CreateBlobError),
}
impl From<ffxfs::CreateBlobError> for CreateError {
fn from(e: ffxfs::CreateBlobError) -> Self {
match e {
ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
}
}
}
/// A builder for [`Client`]
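///
/// # Examples
///
/// A minimal usage sketch (hypothetical; assumes this crate is available as `blobfs` and
/// that the component's namespace contains a `/blob` directory):
///
/// ```ignore
/// let client = blobfs::Client::builder()
///     .readable()
///     .executable()
///     .use_reader()
///     .build()
///     .await?;
/// ```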
#[derive(Default)]
pub struct ClientBuilder {
use_reader: Reader,
use_creator: bool,
readable: bool,
writable: bool,
executable: bool,
}
#[derive(Default)]
enum Reader {
#[default]
DontUse,
Use {
use_vmex: bool,
},
}
impl ClientBuilder {
/// Opens the /blob directory in the component's namespace with readable, writable, and/or
/// executable flags. Connects to the fuchsia.fxfs.BlobCreator and BlobReader if requested.
/// Connects to and initializes the VmexResource if `use_vmex` is set. Returns a `Client`.
pub async fn build(self) -> Result<Client, BlobfsError> {
let mut flags = fio::OpenFlags::empty();
if self.readable {
flags |= fio::OpenFlags::RIGHT_READABLE
}
if self.writable {
flags |= fio::OpenFlags::RIGHT_WRITABLE
}
if self.executable {
flags |= fio::OpenFlags::RIGHT_EXECUTABLE
}
let dir = fuchsia_fs::directory::open_in_namespace("/blob", flags)?;
let reader = match self.use_reader {
Reader::DontUse => None,
Reader::Use { use_vmex } => {
if use_vmex {
if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
fidl_fuchsia_kernel::VmexResourceMarker,
>() {
if let Ok(vmex) = client.get().await {
info!("Got vmex resource");
vmo_blob::init_vmex_resource(vmex)
.map_err(BlobfsError::InitVmexResource)?;
}
}
}
Some(
fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
.map_err(BlobfsError::ConnectToBlobReader)?,
)
}
};
let creator = if self.use_creator {
Some(
fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
.map_err(BlobfsError::ConnectToBlobCreator)?,
)
} else {
None
};
Ok(Client { dir, creator, reader })
}
/// `Client` will connect to and use fuchsia.fxfs/BlobReader for reads. Sets the VmexResource
/// for `Client`. The VmexResource is used by `get_backing_memory` to mark blobs as executable.
pub fn use_reader(self) -> Self {
Self { use_reader: Reader::Use { use_vmex: true }, ..self }
}
/// `Client` will connect to and use fuchsia.fxfs/BlobReader for reads. Does not set the
/// VmexResource.
pub fn use_reader_no_vmex(self) -> Self {
Self { use_reader: Reader::Use { use_vmex: false }, ..self }
}
/// If set, `Client` will connect to and use fuchsia.fxfs/BlobCreator for writes.
pub fn use_creator(self) -> Self {
Self { use_creator: true, ..self }
}
/// If set, `Client` will connect to /blob in the current component's namespace with
/// `fio::OpenFlags::RIGHT_READABLE`.
pub fn readable(self) -> Self {
Self { readable: true, ..self }
}
/// If set, `Client` will connect to /blob in the current component's namespace with
/// `fio::OpenFlags::RIGHT_WRITABLE`. The write right is needed so that `delete_blob` can
/// call fuchsia.io/Directory.Unlink.
pub fn writable(self) -> Self {
Self { writable: true, ..self }
}
/// If set, `Client` will connect to /blob in the current component's namespace with
/// `fio::OpenFlags::RIGHT_EXECUTABLE`.
pub fn executable(self) -> Self {
Self { executable: true, ..self }
}
}
impl Client {
/// Create an empty `ClientBuilder`
pub fn builder() -> ClientBuilder {
Default::default()
}
}
/// Blobfs client.
#[derive(Debug, Clone)]
pub struct Client {
dir: fio::DirectoryProxy,
creator: Option<ffxfs::BlobCreatorProxy>,
reader: Option<ffxfs::BlobReaderProxy>,
}
impl Client {
/// Returns a client connected to the given blob directory, BlobCreatorProxy, and
/// BlobReaderProxy. If `vmex` is passed in, sets the VmexResource, which is used to mark blobs
/// as executable. If `creator` or `reader` is not supplied, writes or reads respectively will
/// be performed through the blob directory.
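///
/// ```ignore
/// // Sketch: construct a Client directly from proxies (e.g. in a test harness). Passing
/// // None for `creator`/`reader` makes writes/reads go through `dir_proxy` instead.
/// let client = Client::new(dir_proxy, None, None, None)?;
/// ```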
pub fn new(
dir: fio::DirectoryProxy,
creator: Option<ffxfs::BlobCreatorProxy>,
reader: Option<ffxfs::BlobReaderProxy>,
vmex: Option<zx::Resource>,
) -> Result<Self, anyhow::Error> {
if let Some(vmex) = vmex {
vmo_blob::init_vmex_resource(vmex)?;
}
Ok(Self { dir, creator, reader })
}
/// Creates a new client backed by the returned request stream. This constructor should not be
/// used outside of tests.
///
/// # Panics
///
/// Panics on error
pub fn new_test() -> (Self, fio::DirectoryRequestStream) {
let (dir, stream) =
fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>().unwrap();
(Self { dir, creator: None, reader: None }, stream)
}
/// Creates a new client backed by the returned mock. This constructor should not be used
/// outside of tests.
///
/// # Panics
///
/// Panics on error
pub fn new_mock() -> (Self, mock::Mock) {
let (dir, stream) =
fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>().unwrap();
(Self { dir, creator: None, reader: None }, mock::Mock { stream })
}
/// Open a blob for read. `scope` will only be used if the client was configured to use
/// fuchsia.fxfs.BlobReader.
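///
/// # Examples
///
/// A read-only open sketch (hypothetical; `hash` is the Merkle root of a blob that is
/// already fully written):
///
/// ```ignore
/// let (proxy, server_end) = fidl::endpoints::create_proxy::<fio::NodeMarker>()?;
/// let () = client.open_blob_for_read(
///     &hash,
///     fio::OpenFlags::RIGHT_READABLE,
///     ExecutionScope::new(),
///     server_end,
/// )?;
/// // `proxy` can now be used as a read-only fuchsia.io file connection.
/// ```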
pub fn open_blob_for_read(
&self,
blob: &Hash,
flags: fio::OpenFlags,
scope: ExecutionScope,
server_end: ServerEnd<fio::NodeMarker>,
) -> Result<(), fidl::Error> {
let describe = flags.contains(fio::OpenFlags::DESCRIBE);
// Reject requests that attempt to open blobs as writable.
if flags.contains(fio::OpenFlags::RIGHT_WRITABLE) {
send_on_open_with_error(describe, server_end, zx::Status::ACCESS_DENIED);
return Ok(());
}
// Reject requests that attempt to create new blobs.
if flags.intersects(fio::OpenFlags::CREATE | fio::OpenFlags::CREATE_IF_ABSENT) {
send_on_open_with_error(describe, server_end, zx::Status::NOT_SUPPORTED);
return Ok(());
}
// Use the blob reader protocol if available; otherwise, fall back to
// fuchsia.io/Directory.Open.
if let Some(reader) = &self.reader {
    let object_request = flags.to_object_request(server_end);
    let () = open_blob_with_reader(reader.clone(), *blob, scope, flags, object_request);
    Ok(())
} else {
    self.dir.open(flags, fio::ModeType::empty(), &blob.to_string(), server_end)
}
}
/// Open a blob for read using open2. `scope` will only be used if the client was configured to
/// use fuchsia.fxfs.BlobReader.
pub fn open2_blob_for_read(
&self,
blob: &Hash,
protocols: fio::ConnectionProtocols,
scope: ExecutionScope,
object_request: ObjectRequestRef<'_>,
) -> Result<(), zx::Status> {
// Reject requests that attempt to open blobs as writable.
if protocols.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
return Err(zx::Status::ACCESS_DENIED);
}
// Reject requests that attempt to create new blobs.
if protocols.creation_mode() != vfs::CreationMode::Never {
return Err(zx::Status::NOT_SUPPORTED);
}
// Errors below will be communicated via the `object_request` channel.
let object_request = object_request.take();
// Use the blob reader protocol if available; otherwise, fall back to fuchsia.io/Directory.Open2.
if let Some(reader) = &self.reader {
let () = open_blob_with_reader(reader.clone(), *blob, scope, protocols, object_request);
} else {
let _: Result<(), ()> = self
.dir
.open2(&blob.to_string(), &protocols, object_request.into_channel())
.map_err(|fidl_error| warn!("Failed to open blob {:?}: {:?}", blob, fidl_error));
}
Ok(())
}
/// Returns the list of known blobs in blobfs.
pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
// fuchsia.io/Directory.ReadDirents uses a per-connection index into the array of
// directory entries. To prevent contention over this index (from concurrent calls to
// list_known_blobs on this object, on clones of this object, or on other clones of the
// DirectoryProxy this object was made from), create a new connection that will have its
// own index.
let private_connection = fuchsia_fs::directory::clone_no_describe(&self.dir, None)?;
fuchsia_fs::directory::readdir(&private_connection)
.await
.map_err(BlobfsError::ReadDir)?
.into_iter()
.filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
.map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
.collect()
}
/// Delete the blob with the given merkle hash.
pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
self.dir
.unlink(&blob.to_string(), &fio::UnlinkOptions::default())
.await?
.map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
}
/// Open a new blob for write.
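///
/// # Examples
///
/// A sketch of handling both writer variants (hypothetical; `hash` is the blob's Merkle
/// root):
///
/// ```ignore
/// match client.open_blob_for_write(&hash).await? {
///     fpkg::BlobWriter::Writer(writer) => {
///         // Write the delivery blob contents via fuchsia.fxfs/BlobWriter.
///     }
///     fpkg::BlobWriter::File(file) => {
///         // Resize the file to the delivery blob size, then write the bytes.
///     }
/// }
/// ```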
pub async fn open_blob_for_write(&self, blob: &Hash) -> Result<fpkg::BlobWriter, CreateError> {
Ok(if let Some(blob_creator) = &self.creator {
fpkg::BlobWriter::Writer(blob_creator.create(blob, false).await??)
} else {
fpkg::BlobWriter::File(
self.open_blob_proxy_from_dir_for_write(blob)
.await?
.into_channel()
.map_err(|_: fio::FileProxy| CreateError::ConvertToClientEnd)?
.into_zx_channel()
.into(),
)
})
}
/// Open a new blob for write, unconditionally using the blob directory.
async fn open_blob_proxy_from_dir_for_write(
&self,
blob: &Hash,
) -> Result<fio::FileProxy, CreateError> {
let flags = fio::OpenFlags::CREATE
| fio::OpenFlags::RIGHT_WRITABLE
| fio::OpenFlags::RIGHT_READABLE;
let path = delivery_blob::delivery_blob_path(blob);
fuchsia_fs::directory::open_file(&self.dir, &path, flags).await.map_err(|e| match e {
fuchsia_fs::node::OpenError::OpenError(Status::ACCESS_DENIED) => {
CreateError::AlreadyExists
}
other => CreateError::Io(other),
})
}
/// Returns whether blobfs has a blob with the given hash.
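///
/// ```ignore
/// // Sketch: only fetch a blob that is not already fully written.
/// if !client.has_blob(&hash).await {
///     // ...fetch and write the blob...
/// }
/// ```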
pub async fn has_blob(&self, blob: &Hash) -> bool {
if let Some(reader) = &self.reader {
// TODO(https://fxbug.dev/295552228): Use faster API for determining blob presence.
matches!(reader.get_vmo(blob).await, Ok(Ok(_)))
} else {
let file = match fuchsia_fs::directory::open_file_no_describe(
&self.dir,
&blob.to_string(),
fio::OpenFlags::DESCRIBE | fio::OpenFlags::RIGHT_READABLE,
) {
Ok(file) => file,
Err(_) => return false,
};
let mut events = file.take_event_stream();
let event = match events.next().await {
None => return false,
Some(event) => match event {
Err(_) => return false,
Ok(event) => match event {
fio::FileEvent::OnOpen_ { s, info } => {
if Status::from_raw(s) != Status::OK {
return false;
}
match info {
Some(info) => match *info {
fio::NodeInfoDeprecated::File(fio::FileObject {
event: Some(event),
stream: _, // TODO(https://fxbug.dev/293606235): Use stream
}) => event,
_ => return false,
},
_ => return false,
}
}
fio::FileEvent::OnRepresentation { payload } => match payload {
fio::Representation::File(fio::FileInfo {
observer: Some(event),
stream: _, // TODO(https://fxbug.dev/293606235): Use stream
..
}) => event,
_ => return false,
},
},
},
};
// Check that the USER_0 signal has been asserted on the file's event to make sure we
// return false in the edge case where the blob is currently being written.
match event.wait_handle(zx::Signals::USER_0, zx::Time::INFINITE_PAST) {
Ok(_) => true,
Err(status) => {
if status != Status::TIMED_OUT {
warn!("blobfs: unknown error asserting blob existence: {}", status);
}
false
}
}
}
}
/// Determines which of the candidate blobs exist and are readable in blobfs, returning
/// the set difference of candidates minus readable, i.e. the candidates that are missing.
/// If provided, `all_known` must be a superset of all readable blobs in blobfs, i.e. if a
/// blob is readable it must be in `all_known`, but non-readable blobs may also be
/// included.
/// `all_known` is used to skip the expensive per-blob readability check for blobs that
/// are certainly missing.
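///
/// ```ignore
/// // Sketch: determine which of a package's blobs still need to be fetched.
/// let missing: HashSet<Hash> = client.filter_to_missing_blobs(&needed, None).await;
/// ```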
pub async fn filter_to_missing_blobs(
&self,
candidates: &HashSet<Hash>,
all_known: Option<&HashSet<Hash>>,
) -> HashSet<Hash> {
// This heuristic was taken from pkgfs. We are not sure how useful it is or why it was
// added; however, it is retained out of an abundance of caution. We *suspect* the
// heuristic is a performance optimization. Without the heuristic, we would always have to
// open every candidate blob and see if it's readable, which may be expensive if there are
// many blobs.
//
// Note that if there are 20 or fewer candidates, we don't use the heuristic. We assume
// there is a threshold number of blobs in a package above which it is faster to first do
// a readdir on blobfs to help rule out some blobs without having to open them, and we
// assume that threshold is 20. The optimal threshold is likely different between
// pkg-cache and pkgfs, and, especially since this checks multiple blobs concurrently, we
// may not be getting any benefit from the heuristic anymore.
//
// If you wish to remove this heuristic or change the threshold, consider doing a trace on
// packages with varying numbers of blobs present/missing.
// TODO(https://fxbug.dev/42157763) re-evaluate filter_to_missing_blobs heuristic.
let all_known_storage;
let all_known = if let Some(all_known) = all_known {
    Some(all_known)
} else if candidates.len() > 20 {
    if let Ok(known) = self.list_known_blobs().await {
        all_known_storage = known;
        Some(&all_known_storage)
    } else {
        None
    }
} else {
    None
};
stream::iter(candidates.clone())
.map(move |blob| {
async move {
// We still need to check `has_blob()` even if the blob is in `all_known`,
// because it might not have been fully written yet.
if all_known.map(|blobs| blobs.contains(&blob)) == Some(false)
|| !self.has_blob(&blob).await
{
Some(blob)
} else {
None
}
}
})
.buffer_unordered(50)
.filter_map(|blob| async move { blob })
.collect()
.await
}
/// Call fuchsia.io/Node.Sync on the blobfs directory.
pub async fn sync(&self) -> Result<(), BlobfsError> {
self.dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
}
}
/// Spawns a task on `scope` that attempts to open the blob identified by `blob_hash` via
/// `reader`. Creates a file connection to the blob using [`vmo_blob::VmoBlob`]. Errors
/// will be sent via `object_request` asynchronously.
fn open_blob_with_reader<P: ProtocolsExt + Send>(
reader: ffxfs::BlobReaderProxy,
blob_hash: Hash,
scope: ExecutionScope,
protocols: P,
object_request: ObjectRequest,
) {
object_request.spawn(&scope.clone(), move |object_request| {
Box::pin(async move {
let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
error!("Blob reader channel closed: {:?}", status);
status
} else {
error!("Transport error on get_vmo: {:?}", fidl_error);
zx::Status::INTERNAL
}
})?;
let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
let vmo_blob = vmo_blob::VmoBlob::new(vmo);
object_request.create_connection(scope, vmo_blob, protocols, StreamIoConnection::create)
})
});
}
#[cfg(test)]
impl Client {
/// Constructs a new [`Client`] connected to the provided [`BlobfsRamdisk`]. Tests in this
/// crate should use this constructor rather than [`BlobfsRamdisk::client`], which returns
/// the non-cfg(test) build of this crate's [`blobfs::Client`]. While tests could use the
/// [`blobfs::Client`] returned by [`BlobfsRamdisk::client`], it will be a different type than
/// [`super::Client`], and the tests could not access its private members or any cfg(test)
/// specific functionality.
///
/// # Panics
///
/// Panics on error.
pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
Self::new(
blobfs.root_dir_proxy().unwrap(),
blobfs.blob_creator_proxy().unwrap(),
blobfs.blob_reader_proxy().unwrap(),
None,
)
.unwrap()
}
}
#[cfg(test)]
#[allow(clippy::bool_assert_comparison)]
mod tests {
use {
super::*, assert_matches::assert_matches, blobfs_ramdisk::BlobfsRamdisk,
fuchsia_async as fasync, futures::stream::TryStreamExt, maplit::hashset,
std::io::Write as _, std::sync::Arc,
};
#[fasync::run_singlethreaded(test)]
async fn list_known_blobs_empty() {
let blobfs = BlobfsRamdisk::start().await.unwrap();
let client = Client::for_ramdisk(&blobfs);
assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new());
blobfs.stop().await.unwrap();
}
#[fasync::run_singlethreaded(test)]
async fn list_known_blobs() {
let blobfs = BlobfsRamdisk::builder()
.with_blob(&b"blob 1"[..])
.with_blob(&b"blob 2"[..])
.start()
.await
.unwrap();
let client = Client::for_ramdisk(&blobfs);
let expected = blobfs.list_blobs().unwrap().into_iter().collect();
assert_eq!(client.list_known_blobs().await.unwrap(), expected);
blobfs.stop().await.unwrap();
}
#[fasync::run_singlethreaded(test)]
async fn delete_blob_and_then_list() {
let blobfs = BlobfsRamdisk::builder()
.with_blob(&b"blob 1"[..])
.with_blob(&b"blob 2"[..])
.start()
.await
.unwrap();
let client = Client::for_ramdisk(&blobfs);
let merkle = fuchsia_merkle::from_slice(&b"blob 1"[..]).root();
assert_matches!(client.delete_blob(&merkle).await, Ok(()));
let expected = hashset! {fuchsia_merkle::from_slice(&b"blob 2"[..]).root()};
assert_eq!(client.list_known_blobs().await.unwrap(), expected);
blobfs.stop().await.unwrap();
}
#[fasync::run_singlethreaded(test)]
async fn delete_non_existing_blob() {
let blobfs = BlobfsRamdisk::start().await.unwrap();
let client = Client::for_ramdisk(&blobfs);
let blob_merkle = Hash::from([1; 32]);
assert_matches!(
client.delete_blob(&blob_merkle).await,
Err(BlobfsError::Unlink(Status::NOT_FOUND))
);
blobfs.stop().await.unwrap();
}
#[fasync::run_singlethreaded(test)]
async fn delete_blob_mock() {
let (client, mut stream) = Client::new_test();
let blob_merkle = Hash::from([1; 32]);
fasync::Task::spawn(async move {
match stream.try_next().await.unwrap().unwrap() {
fio::DirectoryRequest::Unlink { name, responder, .. } => {
assert_eq!(name, blob_merkle.to_string());
responder.send(Ok(())).unwrap();
}
other => panic!("unexpected request: {other:?}"),
}
})
.detach();
assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
}
#[fasync::run_singlethreaded(test)]
async fn has_blob() {
let blobfs = BlobfsRamdisk::builder().with_blob(&b"blob 1"[..]).start().await.unwrap();
let client = Client::for_ramdisk(&blobfs);
assert_eq!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await, true);
assert_eq!(client.has_blob(&Hash::from([1; 32])).await, false);
blobfs.stop().await.unwrap();
}
#[fasync::run_singlethreaded(test)]
async fn has_blob_fxblob() {
let blobfs =
BlobfsRamdisk::builder().fxblob().with_blob(&b"blob 1"[..]).start().await.unwrap();
let client = Client::for_ramdisk(&blobfs);
assert!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await);
assert!(!client.has_blob(&Hash::from([1; 32])).await);
blobfs.stop().await.unwrap();
}
#[fasync::run_singlethreaded(test)]
async fn has_blob_return_false_if_blob_is_partially_written() {
let blobfs = BlobfsRamdisk::start().await.unwrap();
let client = Client::for_ramdisk(&blobfs);
let blob = [3; 1024];
let hash = fuchsia_merkle::from_slice(&blob).root();
let mut file = blobfs.root_dir().unwrap().write_file(hash.to_string(), 0o777).unwrap();
assert_eq!(client.has_blob(&hash).await, false);
file.set_len(blob.len() as u64).unwrap();
assert_eq!(client.has_blob(&hash).await, false);
file.write_all(&blob[..512]).unwrap();
assert_eq!(client.has_blob(&hash).await, false);
file.write_all(&blob[512..]).unwrap();
assert_eq!(client.has_blob(&hash).await, true);
blobfs.stop().await.unwrap();
}
async fn resize(blob: &fio::FileProxy, size: usize) {
let () = blob.resize(size as u64).await.unwrap().map_err(Status::from_raw).unwrap();
}
async fn write(blob: &fio::FileProxy, bytes: &[u8]) {
assert_eq!(
blob.write(bytes).await.unwrap().map_err(Status::from_raw).unwrap(),
bytes.len() as u64
);
}
#[fasync::run_singlethreaded(test)]
async fn write_delivery_blob() {
let blobfs = BlobfsRamdisk::start().await.unwrap();
let client = Client::for_ramdisk(&blobfs);
let content = [3; 1024];
let hash = fuchsia_merkle::from_slice(&content).root();
let delivery_content =
delivery_blob::Type1Blob::generate(&content, delivery_blob::CompressionMode::Always);
let proxy = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
let () = resize(&proxy, delivery_content.len()).await;
let () = write(&proxy, &delivery_content).await;
assert!(client.has_blob(&hash).await);
blobfs.stop().await.unwrap();
}
/// Wrapper for a blob and its hash. This lets the tests retain ownership of the blob proxy,
/// which is important because it ensures blobfs will not clean up partially written blobs
/// for the duration of the test.
struct TestBlob {
_blob: fio::FileProxy,
hash: Hash,
}
async fn open_blob_only(client: &Client, content: &[u8]) -> TestBlob {
let hash = fuchsia_merkle::from_slice(content).root();
let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
TestBlob { _blob, hash }
}
async fn open_and_truncate_blob(client: &Client, content: &[u8]) -> TestBlob {
let hash = fuchsia_merkle::from_slice(content).root();
let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
let () = resize(&_blob, content.len()).await;
TestBlob { _blob, hash }
}
async fn partially_write_blob(client: &Client, content: &[u8]) -> TestBlob {
let hash = fuchsia_merkle::from_slice(content).root();
let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
let content = delivery_blob::generate(delivery_blob::DeliveryBlobType::Type1, content);
let () = resize(&_blob, content.len()).await;
let () = write(&_blob, &content[..content.len() / 2]).await;
TestBlob { _blob, hash }
}
async fn fully_write_blob(client: &Client, content: &[u8]) -> TestBlob {
let hash = fuchsia_merkle::from_slice(content).root();
let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
let content = delivery_blob::generate(delivery_blob::DeliveryBlobType::Type1, content);
let () = resize(&_blob, content.len()).await;
let () = write(&_blob, &content).await;
TestBlob { _blob, hash }
}
#[fasync::run_singlethreaded(test)]
async fn filter_to_missing_blobs_without_heuristic() {
let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
let client = Client::for_ramdisk(&blobfs);
let missing_hash0 = Hash::from([0; 32]);
let missing_hash1 = Hash::from([1; 32]);
let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;
assert_eq!(
client
.filter_to_missing_blobs(
// Pass in <= 20 candidates so the heuristic is not used.
&hashset! { missing_hash0, missing_hash1,
present_blob0.hash, present_blob1.hash
},
None
)
.await,
hashset! { missing_hash0, missing_hash1 }
);
blobfs.stop().await.unwrap();
}
/// Similar to the above test, but also verifies that partially written blobs count as missing.
#[fasync::run_singlethreaded(test)]
async fn filter_to_missing_blobs_without_heuristic_and_with_partially_written_blobs() {
let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
let client = Client::for_ramdisk(&blobfs);
// Some blobs are created (but not yet truncated).
let missing_blob0 = open_blob_only(&client, &[0; 1024]).await;
// Some are truncated but not written.
let missing_blob1 = open_and_truncate_blob(&client, &[1; 1024]).await;
// Some are partially written.
let missing_blob2 = partially_write_blob(&client, &[2; 1024]).await;
// Some are fully written.
let present_blob = fully_write_blob(&client, &[3; 1024]).await;
assert_eq!(
client
.filter_to_missing_blobs(
&hashset! {
missing_blob0.hash,
missing_blob1.hash,
missing_blob2.hash,
present_blob.hash
},
None
)
.await,
// All partially written blobs should count as missing.
hashset! { missing_blob0.hash, missing_blob1.hash, missing_blob2.hash }
);
blobfs.stop().await.unwrap();
}
#[fasync::run_singlethreaded(test)]
async fn filter_to_missing_blobs_with_heuristic() {
let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
let client = Client::for_ramdisk(&blobfs);
let missing_hash0 = Hash::from([0; 32]);
let missing_hash1 = Hash::from([1; 32]);
let missing_hash2 = Hash::from([2; 32]);
let missing_hash3 = Hash::from([3; 32]);
let missing_hash4 = Hash::from([4; 32]);
let missing_hash5 = Hash::from([5; 32]);
let missing_hash6 = Hash::from([6; 32]);
let missing_hash7 = Hash::from([7; 32]);
let missing_hash8 = Hash::from([8; 32]);
let missing_hash9 = Hash::from([9; 32]);
let missing_hash10 = Hash::from([10; 32]);
let present_blob0 = fully_write_blob(&client, &[20; 1024]).await;
let present_blob1 = fully_write_blob(&client, &[21; 1024]).await;
let present_blob2 = fully_write_blob(&client, &[22; 1024]).await;
let present_blob3 = fully_write_blob(&client, &[23; 1024]).await;
let present_blob4 = fully_write_blob(&client, &[24; 1024]).await;
let present_blob5 = fully_write_blob(&client, &[25; 1024]).await;
let present_blob6 = fully_write_blob(&client, &[26; 1024]).await;
let present_blob7 = fully_write_blob(&client, &[27; 1024]).await;
let present_blob8 = fully_write_blob(&client, &[28; 1024]).await;
let present_blob9 = fully_write_blob(&client, &[29; 1024]).await;
let present_blob10 = fully_write_blob(&client, &[30; 1024]).await;
assert_eq!(
client
.filter_to_missing_blobs(
// Pass in over 20 candidates to trigger the heuristic.
&hashset! { missing_hash0, missing_hash1, missing_hash2, missing_hash3,
missing_hash4, missing_hash5, missing_hash6, missing_hash7, missing_hash8,
missing_hash9, missing_hash10, present_blob0.hash, present_blob1.hash,
present_blob2.hash, present_blob3.hash, present_blob4.hash,
present_blob5.hash, present_blob6.hash, present_blob7.hash,
present_blob8.hash, present_blob9.hash, present_blob10.hash
},
None
)
.await,
hashset! { missing_hash0, missing_hash1, missing_hash2, missing_hash3,
missing_hash4, missing_hash5, missing_hash6, missing_hash7, missing_hash8,
missing_hash9, missing_hash10
}
);
blobfs.stop().await.unwrap();
}
/// Similar to the above test, but also verifies that partially written blobs count as missing.
#[fasync::run_singlethreaded(test)]
async fn filter_to_missing_blobs_with_heuristic_and_with_partially_written_blobs() {
let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
let client = Client::for_ramdisk(&blobfs);
// Some blobs are created (but not yet truncated).
let missing_blob0 = open_blob_only(&client, &[0; 1024]).await;
let missing_blob1 = open_blob_only(&client, &[1; 1024]).await;
let missing_blob2 = open_blob_only(&client, &[2; 1024]).await;
// Some are truncated but not written.
let missing_blob3 = open_and_truncate_blob(&client, &[3; 1024]).await;
let missing_blob4 = open_and_truncate_blob(&client, &[4; 1024]).await;
let missing_blob5 = open_and_truncate_blob(&client, &[5; 1024]).await;
// Some are partially written.
let missing_blob6 = partially_write_blob(&client, &[6; 1024]).await;
let missing_blob7 = partially_write_blob(&client, &[7; 1024]).await;
let missing_blob8 = partially_write_blob(&client, &[8; 1024]).await;
// Some aren't even open.
let missing_hash9 = Hash::from([9; 32]);
let missing_hash10 = Hash::from([10; 32]);
let present_blob0 = fully_write_blob(&client, &[20; 1024]).await;
let present_blob1 = fully_write_blob(&client, &[21; 1024]).await;
let present_blob2 = fully_write_blob(&client, &[22; 1024]).await;
let present_blob3 = fully_write_blob(&client, &[23; 1024]).await;
let present_blob4 = fully_write_blob(&client, &[24; 1024]).await;
let present_blob5 = fully_write_blob(&client, &[25; 1024]).await;
let present_blob6 = fully_write_blob(&client, &[26; 1024]).await;
let present_blob7 = fully_write_blob(&client, &[27; 1024]).await;
let present_blob8 = fully_write_blob(&client, &[28; 1024]).await;
let present_blob9 = fully_write_blob(&client, &[29; 1024]).await;
let present_blob10 = fully_write_blob(&client, &[30; 1024]).await;
assert_eq!(
client
.filter_to_missing_blobs(
&hashset! { missing_blob0.hash, missing_blob1.hash, missing_blob2.hash,
missing_blob3.hash, missing_blob4.hash, missing_blob5.hash,
missing_blob6.hash, missing_blob7.hash, missing_blob8.hash,
missing_hash9, missing_hash10, present_blob0.hash,
present_blob1.hash, present_blob2.hash, present_blob3.hash,
present_blob4.hash, present_blob5.hash, present_blob6.hash,
present_blob7.hash, present_blob8.hash, present_blob9.hash,
present_blob10.hash
},
None
)
.await,
// All partially written blobs should count as missing.
hashset! { missing_blob0.hash, missing_blob1.hash, missing_blob2.hash,
missing_blob3.hash, missing_blob4.hash, missing_blob5.hash, missing_blob6.hash,
missing_blob7.hash, missing_blob8.hash, missing_hash9, missing_hash10
}
);
blobfs.stop().await.unwrap();
}
#[fasync::run_singlethreaded(test)]
async fn sync() {
let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
let counter_clone = Arc::clone(&counter);
let (client, mut stream) = Client::new_test();
fasync::Task::spawn(async move {
match stream.try_next().await.unwrap().unwrap() {
fio::DirectoryRequest::Sync { responder } => {
counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
responder.send(Ok(())).unwrap();
}
other => panic!("unexpected request: {other:?}"),
}
})
.detach();
assert_matches!(client.sync().await, Ok(()));
assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
}
#[fasync::run_singlethreaded(test)]
async fn open_blob_for_write_uses_fxblob_if_configured() {
let (blob_creator, mut blob_creator_stream) =
fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>().unwrap();
let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>().unwrap();
let client = Client::new(
fidl::endpoints::create_proxy::<fio::DirectoryMarker>().unwrap().0,
Some(blob_creator),
Some(blob_reader),
None,
)
.unwrap();
fuchsia_async::Task::spawn(async move {
match blob_creator_stream.next().await.unwrap().unwrap() {
ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
assert_eq!(hash, [0; 32]);
assert!(!allow_existing);
let () = responder.send(Ok(fidl::endpoints::create_endpoints().0)).unwrap();
}
}
})
.detach();
assert_matches!(
client.open_blob_for_write(&[0; 32].into()).await,
Ok(fpkg::BlobWriter::Writer(_))
);
}
#[fasync::run_singlethreaded(test)]
async fn open_blob_for_write_fxblob_maps_already_exists() {
let (blob_creator, mut blob_creator_stream) =
fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>().unwrap();
let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>().unwrap();
let client = Client::new(
fidl::endpoints::create_proxy::<fio::DirectoryMarker>().unwrap().0,
Some(blob_creator),
Some(blob_reader),
None,
)
.unwrap();
fuchsia_async::Task::spawn(async move {
match blob_creator_stream.next().await.unwrap().unwrap() {
ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
assert_eq!(hash, [0; 32]);
assert!(!allow_existing);
let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
}
}
})
.detach();
assert_matches!(
client.open_blob_for_write(&[0; 32].into()).await,
Err(CreateError::AlreadyExists)
);
}
#[fasync::run_singlethreaded(test)]
async fn concurrent_list_known_blobs_all_return_full_contents() {
use futures::StreamExt;
let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
let client = Client::for_ramdisk(&blobfs);
// ReadDirents returns an 8,192 byte buffer, and each entry is 74 bytes [0] (including 64
// bytes of filename), so use more than 110 entries to guarantee that listing all contents
// requires multiple ReadDirents calls. Multiple calls are not strictly necessary to cause
// contention, because each successful listing also requires a call to Rewind, but they do
// make contention more likely.
// [0] https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.io/directory.fidl;l=261;drc=9e84e19d3f42240c46d2b0c3c132c2f0b5a3343f
for i in 0..256u16 {
let _: TestBlob = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
}
let () = futures::stream::iter(0..100)
.for_each_concurrent(None, |_| async {
assert_eq!(client.list_known_blobs().await.unwrap().len(), 256);
})
.await;
}
#[fasync::run_singlethreaded(test)]
async fn filter_to_missing_uses_provided_all_known() {
let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
let client = Client::for_ramdisk(&blobfs);
let present_blob = fully_write_blob(&client, &[0; 1024]).await;
// Even though actually present, the written blob will be reported as missing because the
// provided `all_known` is empty.
assert_eq!(
client
.filter_to_missing_blobs(&HashSet::from([present_blob.hash]), Some(&HashSet::new()))
.await,
HashSet::from([present_blob.hash])
);
}
}