blob: 29c9e620c0bd6fce6435ab3462b4800c160d0078 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
blobs::{open_blob, BlobKind, OpenBlob, OpenBlobError, OpenBlobSuccess},
dynamic_index::{fulfill_meta_far_blob, DynamicIndex, DynamicIndexError},
},
anyhow::{anyhow, Context as _, Error},
cobalt_sw_delivery_registry as metrics,
fidl::endpoints::{RequestStream, ServerEnd},
fidl_fuchsia_io::{DirectoryMarker, FileRequest, FileRequestStream},
fidl_fuchsia_pkg::{
BlobInfoIteratorNextResponder, BlobInfoIteratorRequest, BlobInfoIteratorRequestStream,
NeededBlobsMarker, NeededBlobsRequest, NeededBlobsRequestStream, PackageCacheRequest,
PackageCacheRequestStream, PackageIndexEntry, PackageIndexIteratorNextResponder,
PackageIndexIteratorRequest, PackageIndexIteratorRequestStream, PackageUrl,
},
fidl_fuchsia_pkg_ext::{BlobId, BlobInfo, Measurable},
fuchsia_async::Task,
fuchsia_cobalt::CobaltSender,
fuchsia_hash::Hash,
fuchsia_syslog::{fx_log_err, fx_log_info, fx_log_warn},
fuchsia_trace as trace,
fuchsia_zircon::{sys::ZX_CHANNEL_MAX_MSG_BYTES, Status},
futures::{prelude::*, select_biased, stream::FuturesUnordered},
parking_lot::Mutex,
std::{collections::HashSet, sync::Arc},
system_image::StaticPackages,
};
// FIXME(52297) This constant would ideally be exported by the `fidl` crate.
// sizeof(TransactionHeader) + sizeof(VectorHeader)
const FIDL_VEC_RESPONSE_OVERHEAD_BYTES: usize = 32;
pub async fn serve(
pkgfs_versions: pkgfs::versions::Client,
pkgfs_ctl: pkgfs::control::Client,
pkgfs_install: pkgfs::install::Client,
pkgfs_needs: pkgfs::needs::Client,
dynamic_index: Arc<Mutex<DynamicIndex>>,
blobfs: blobfs::Client,
static_packages: Arc<StaticPackages>,
stream: PackageCacheRequestStream,
cobalt_sender: CobaltSender,
) -> Result<(), Error> {
stream
.map_err(anyhow::Error::new)
.try_for_each_concurrent(None, |event| async {
let cobalt_sender = cobalt_sender.clone();
match event {
PackageCacheRequest::Get {
meta_far_blob,
selectors,
needed_blobs,
dir,
responder,
} => {
let meta_far_blob: BlobInfo = meta_far_blob.into();
trace::duration_begin!("app", "cache_get",
"meta_far_blob_id" => meta_far_blob.blob_id.to_string().as_str()
);
let response = get(
&pkgfs_versions,
&pkgfs_install,
&pkgfs_needs,
&dynamic_index,
&blobfs,
meta_far_blob,
selectors,
needed_blobs,
dir,
cobalt_sender,
)
.await;
trace::duration_end!("app", "cache_get",
"status" => Status::from(response).to_string().as_str()
);
responder.send(&mut response.map_err(|status| status.into_raw()))?;
}
PackageCacheRequest::Open { meta_far_blob_id, selectors, dir, responder } => {
let meta_far_blob_id: BlobId = meta_far_blob_id.into();
trace::duration_begin!("app", "cache_open",
"meta_far_blob_id" => meta_far_blob_id.to_string().as_str()
);
let response =
open(&pkgfs_versions, meta_far_blob_id, selectors, dir, cobalt_sender)
.await;
trace::duration_end!("app", "cache_open",
"status" => Status::from(response).to_string().as_str()
);
responder.send(&mut response.map_err(|status| status.into_raw()))?;
}
PackageCacheRequest::BasePackageIndex { iterator, control_handle: _ } => {
let stream = iterator.into_stream()?;
serve_base_package_index(Arc::clone(&static_packages), stream).await;
}
PackageCacheRequest::Sync { responder } => {
responder.send(&mut pkgfs_ctl.sync().await.map_err(|e| {
fx_log_err!("error syncing /pkgfs/ctl: {:#}", anyhow!(e));
Status::INTERNAL.into_raw()
}))?;
}
}
Ok(())
})
.await
}
/// Fetch a package, and optionally open it.
async fn get<'a>(
pkgfs_versions: &'a pkgfs::versions::Client,
pkgfs_install: &'a pkgfs::install::Client,
pkgfs_needs: &'a pkgfs::needs::Client,
dynamic_index: &Arc<Mutex<DynamicIndex>>,
blobfs: &blobfs::Client,
meta_far_blob: BlobInfo,
selectors: Vec<String>,
needed_blobs: ServerEnd<NeededBlobsMarker>,
dir_request: Option<ServerEnd<DirectoryMarker>>,
mut cobalt_sender: CobaltSender,
) -> Result<(), Status> {
if !selectors.is_empty() {
fx_log_warn!("Get() does not support selectors yet");
}
let needed_blobs = needed_blobs.into_stream().map_err(|_| Status::INTERNAL)?;
let pkg = if let Ok(pkg) = pkgfs_versions.open_package(&meta_far_blob.blob_id.into()).await {
// If the package can already be opened, it is already cached.
needed_blobs.control_handle().shutdown_with_epitaph(Status::OK);
pkg
} else {
// Otherwise, go through the process to cache it.
fx_log_info!("fetching {}", meta_far_blob.blob_id);
let () = serve_needed_blobs(
needed_blobs,
meta_far_blob,
pkgfs_install,
pkgfs_needs,
dynamic_index,
blobfs,
)
.await
.map_err(|e| {
fx_log_err!("error while caching package {}: {:#}", meta_far_blob.blob_id, anyhow!(e));
cobalt_sender.log_event_count(
metrics::PKG_CACHE_OPEN_METRIC_ID,
metrics::PkgCacheOpenMetricDimensionResult::Io,
0,
1,
);
Status::UNAVAILABLE
})?;
match pkgfs_versions.open_package(&meta_far_blob.blob_id.into()).await {
Ok(pkg) => Ok(pkg),
Err(err @ pkgfs::versions::OpenError::NotFound) => {
// FIXME(http://fxbug.dev/74213) Remove this when possible
// We just fetched the package, so it should be visible in pkgfs. Log loudly that
// this is unexpected, but try again after 1 second to help narrow down the source
// of the flake here the next time it happens.
fx_log_err!(
"BAD_STATE: error opening package after fetching it {}: {:#}",
meta_far_blob.blob_id,
anyhow!(err)
);
cobalt_sender.log_event_count(
metrics::PKG_CACHE_OPEN_METRIC_ID,
metrics::PkgCacheOpenMetricDimensionResult::Io,
0,
1,
);
let () = fuchsia_async::Timer::new(std::time::Duration::from_secs(1)).await;
match pkgfs_versions.open_package(&meta_far_blob.blob_id.into()).await {
Ok(pkg) => {
fx_log_err!(
"BAD_STATE: attempt 2 of opening package succeeded {}",
meta_far_blob.blob_id,
);
Ok(pkg)
}
Err(err) => {
fx_log_err!(
"BAD_STATE: error opening package after fetching it (attempt 2) {}: {:#}",
meta_far_blob.blob_id,
anyhow!(err)
);
Err(Status::INTERNAL)
}
}
}
Err(err) => {
fx_log_err!(
"error opening package after fetching it {}: {:#}",
meta_far_blob.blob_id,
anyhow!(err)
);
cobalt_sender.log_event_count(
metrics::PKG_CACHE_OPEN_METRIC_ID,
metrics::PkgCacheOpenMetricDimensionResult::Io,
0,
1,
);
Err(Status::INTERNAL)
}
}?
};
if let Some(dir_request) = dir_request {
pkg.reopen(dir_request).map_err(|err| {
fx_log_err!("error reopening {}: {:#}", meta_far_blob.blob_id, anyhow!(err));
cobalt_sender.log_event_count(
metrics::PKG_CACHE_OPEN_METRIC_ID,
metrics::PkgCacheOpenMetricDimensionResult::Io,
0,
1,
);
Status::INTERNAL
})?;
}
cobalt_sender.log_event_count(
metrics::PKG_CACHE_OPEN_METRIC_ID,
metrics::PkgCacheOpenMetricDimensionResult::Success,
0,
1,
);
Ok(())
}
/// Open a package directory.
async fn open<'a>(
pkgfs_versions: &'a pkgfs::versions::Client,
meta_far_blob_id: BlobId,
selectors: Vec<String>,
dir_request: ServerEnd<DirectoryMarker>,
mut cobalt_sender: CobaltSender,
) -> Result<(), Status> {
// FIXME: need to implement selectors.
if !selectors.is_empty() {
fx_log_warn!("Open() does not support selectors yet");
}
let pkg =
pkgfs_versions.open_package(&meta_far_blob_id.into()).await.map_err(|err| match err {
pkgfs::versions::OpenError::NotFound => {
cobalt_sender.log_event_count(
metrics::PKG_CACHE_OPEN_METRIC_ID,
metrics::PkgCacheOpenMetricDimensionResult::NotFound,
0,
1,
);
Status::NOT_FOUND
}
err => {
cobalt_sender.log_event_count(
metrics::PKG_CACHE_OPEN_METRIC_ID,
metrics::PkgCacheOpenMetricDimensionResult::Io,
0,
1,
);
fx_log_err!("error opening {}: {:?}", meta_far_blob_id, err);
Status::INTERNAL
}
})?;
pkg.reopen(dir_request).map_err(|err| {
fx_log_err!("error opening {}: {:#}", meta_far_blob_id, anyhow!(err));
cobalt_sender.log_event_count(
metrics::PKG_CACHE_OPEN_METRIC_ID,
metrics::PkgCacheOpenMetricDimensionResult::Io,
0,
1,
);
Status::INTERNAL
})?;
cobalt_sender.log_event_count(
metrics::PKG_CACHE_OPEN_METRIC_ID,
metrics::PkgCacheOpenMetricDimensionResult::Success,
0,
1,
);
Ok(())
}
#[derive(thiserror::Error, Debug)]
enum ServeNeededBlobsError {
#[error("protocol violation: request stream terminated unexpectedly")]
UnexpectedClose,
#[error("protocol violation: expected {expected} request, got {received}")]
UnexpectedRequest { received: &'static str, expected: &'static str },
#[error("protocol violation: while reading next request")]
ReceiveRequest(#[source] fidl::Error),
#[error("protocol violation: while responding to last request")]
SendResponse(#[source] fidl::Error),
#[error("while opening {context} for write")]
OpenBlob {
context: BlobContext,
#[source]
source: OpenBlobError,
},
#[error("while writing {context}")]
WriteBlob {
context: BlobContext,
#[source]
source: ServeWriteBlobError,
},
#[error("while listing needs")]
ListNeeds(#[source] pkgfs::needs::ListNeedsError),
#[error("the blob {0} is not needed")]
BlobNotNeeded(Hash),
#[error("the operation was aborted by the caller")]
Aborted,
#[error("while updating dynamic index")]
DynamicIndex(#[from] DynamicIndexError),
}
#[derive(Debug)]
struct BlobContext {
kind: BlobKind,
hash: Hash,
}
impl BlobContext {
fn kind_name(&self) -> &'static str {
match self.kind {
BlobKind::Package => "metadata",
BlobKind::Data => "data",
}
}
}
impl std::fmt::Display for BlobContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} blob ({})", self.kind_name(), self.hash)
}
}
/// Implements the fuchsia.pkg.NeededBlobs protocol, which represents the transaction for caching a
/// particular package.
///
/// Clients should start by requesting to `OpenMetaBlob()`, and fetch and write the metadata blob
/// if needed. Once written, `GetMissingBlobs()` should be used to determine which content blobs
/// need fetched and written using `OpenBlob()`. Violating the expected protocol state will result
/// in the channel being closed by the package cache with a `ZX_ERR_BAD_STATE` epitaph and aborting
/// the package cache operation.
///
/// Once all needed blobs are written by the client, the package cache will complete the pending
/// [`PackageCache.Get`] request and close this channel with a `ZX_OK` epitaph.
async fn serve_needed_blobs(
mut stream: NeededBlobsRequestStream,
meta_far_info: BlobInfo,
pkgfs_install: &pkgfs::install::Client,
pkgfs_needs: &pkgfs::needs::Client,
dynamic_index: &Arc<Mutex<DynamicIndex>>,
blobfs: &blobfs::Client,
) -> Result<(), ServeNeededBlobsError> {
let res = async {
// Step 1: Open and write the meta.far, or determine it is not needed.
let () =
handle_open_meta_blob(&mut stream, meta_far_info, pkgfs_install, dynamic_index, blobfs)
.await?;
// Step 2: Determine which data blobs are needed and report them to the client.
let (serve_iterator, needs) =
handle_get_missing_blobs(&mut stream, meta_far_info, pkgfs_needs).await?;
// Step 3: Open and write all needed data blobs.
let () = handle_open_blobs(&mut stream, needs, pkgfs_install).await?;
serve_iterator.await;
Ok(())
}
.await;
// Sanity check consistency with pkgfs needs.
// FIXME(http://fxbug.dev/74213) Remove this when possible
if res.is_ok() {
let needs = pkgfs_needs.list_needs(meta_far_info.blob_id.into());
futures::pin_mut!(needs);
match needs.try_next().await {
Ok(None) => {
// All good. Other cases should be unreachable.
}
Ok(Some(needs)) => {
let mut needs = needs.into_iter().collect::<Vec<_>>();
needs.sort_unstable();
fx_log_err!(
"BAD_STATE: pkgfs needs more blobs, but all were provided: {:#?}",
needs
);
for need in needs {
// open each blob for write. If the blob already exists, this should fulfill
// it in pkgfs as well, unblocking this package fetch. Though, the open that
// happened during the package fetch should have already done that.
match open_blob(pkgfs_install, need, BlobKind::Data).await {
Ok(OpenBlobSuccess::AlreadyExists) => {
fx_log_err!(
"BAD_STATE: already written blob needed opened again to commit"
);
}
Ok(OpenBlobSuccess::Needed(_)) => {
fx_log_err!("BAD_STATE: already written blob needs written again");
}
Err(e) => {
fx_log_err!(
"BAD_STATE: error opening already written blob for write: {:#}",
anyhow!(e)
);
}
}
}
}
Err(e) => {
fx_log_err!(
"BAD_STATE: failed to verify pkgfs needs no more blobs: {:#}",
anyhow!(e)
);
}
};
}
if res.is_ok() {
dynamic_index.lock().complete_install(meta_far_info.blob_id.into())?;
} else {
dynamic_index.lock().cancel_install(&meta_far_info.blob_id.into());
}
// TODO in the Err(_) case, a responder was likely dropped, which would have already shutdown
// the stream without our custom epitaph value. Need to find a nice way to always shutdown
// with a custom epitaph without copy/pasting something to every return site.
let epitaph = match res {
Ok(_) => Status::OK,
Err(_) => Status::BAD_STATE,
};
stream.control_handle().shutdown_with_epitaph(epitaph);
res
}
async fn handle_open_meta_blob(
stream: &mut NeededBlobsRequestStream,
meta_far_info: BlobInfo,
pkgfs_install: &pkgfs::install::Client,
dynamic_index: &Arc<Mutex<DynamicIndex>>,
blobfs: &blobfs::Client,
) -> Result<(), ServeNeededBlobsError> {
let hash = meta_far_info.blob_id.into();
dynamic_index.lock().start_install(hash);
loop {
let (file, responder) =
match stream.try_next().await.map_err(ServeNeededBlobsError::ReceiveRequest)? {
Some(NeededBlobsRequest::OpenMetaBlob { file, responder }) => Ok((file, responder)),
Some(NeededBlobsRequest::Abort { responder: _ }) => {
Err(ServeNeededBlobsError::Aborted)
}
Some(other) => Err(ServeNeededBlobsError::UnexpectedRequest {
received: other.method_name(),
expected: "open_meta_blob",
}),
None => Err(ServeNeededBlobsError::UnexpectedClose),
}?;
let file_stream = file.into_stream().map_err(ServeNeededBlobsError::ReceiveRequest)?;
match open_write_blob(file_stream, responder, pkgfs_install, hash, BlobKind::Package).await
{
Ok(()) => break,
Err(OpenWriteBlobError::Serve(e)) => return Err(e),
Err(OpenWriteBlobError::NonFatalWrite(e)) => {
fx_log_warn!("Non-fatal error while writing metadata blob: {:#}", anyhow!(e));
continue;
}
}
}
fulfill_meta_far_blob(dynamic_index, blobfs, hash).await?;
Ok(())
}
async fn handle_get_missing_blobs(
stream: &mut NeededBlobsRequestStream,
meta_far_info: BlobInfo,
pkgfs_needs: &pkgfs::needs::Client,
) -> Result<(Task<()>, HashSet<Hash>), ServeNeededBlobsError> {
let iterator = match stream.try_next().await.map_err(ServeNeededBlobsError::ReceiveRequest)? {
Some(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ }) => Ok(iterator),
Some(NeededBlobsRequest::Abort { responder: _ }) => Err(ServeNeededBlobsError::Aborted),
Some(other) => Err(ServeNeededBlobsError::UnexpectedRequest {
received: other.method_name(),
expected: "get_missing_blobs",
}),
None => Err(ServeNeededBlobsError::UnexpectedClose),
}?;
let iter_stream = iterator.into_stream().map_err(ServeNeededBlobsError::ReceiveRequest)?;
// list_needs produces a stream that produces the full set of currently missing blobs on-demand
// as items are read from the stream. We are only interested in querying the needs once, so we
// only need to read 1 item and can then drop the stream.
let needs = pkgfs_needs.list_needs(meta_far_info.blob_id.into());
futures::pin_mut!(needs);
let needs = match needs.try_next().await.map_err(ServeNeededBlobsError::ListNeeds)? {
Some(needs) => {
let mut needs = needs
.into_iter()
.map(|hash| BlobInfo { blob_id: hash.into(), length: 0 })
.collect::<Vec<_>>();
// The needs provided by the stream are stored in a HashSet, so needs are in an
// unspecified order here. Provide a deterministic ordering to test/callers by sorting
// on merkle root.
needs.sort_unstable();
needs
}
None => vec![],
};
// Start serving the iterator in the background and internally move on to the next state. If
// this foreground task decides to bail out, this spawned task will be dropped which will abort
// the iterator serving task.
let serve_iterator = Task::spawn(serve_blob_info_iterator(
needs.iter().cloned().map(Into::into).collect::<Vec<fidl_fuchsia_pkg::BlobInfo>>(),
iter_stream,
));
let needs = needs.into_iter().map(|need| need.blob_id.into()).collect::<HashSet<Hash>>();
Ok((serve_iterator, needs))
}
async fn handle_open_blobs(
stream: &mut NeededBlobsRequestStream,
mut needs: HashSet<Hash>,
pkgfs_install: &pkgfs::install::Client,
) -> Result<(), ServeNeededBlobsError> {
let mut running = FuturesUnordered::new();
// `needs` represents needed blobs that aren't currently being written
// `running` represents needed blobs currently being written
// A blob write that fails with a retryable error can allow a blob to transition back from
// `running` to `needs`.
// Once both needs and running are empty, all needed blobs are now present.
while !(running.is_empty() && needs.is_empty()) {
#[derive(Debug)]
enum Event {
WriteBlobDone((Hash, Result<(), OpenWriteBlobError>)),
Request(Option<NeededBlobsRequest>),
}
// Wait for the next request/event to happen, giving priority to handling blob write
// completion events to new incoming requests.
let event = select_biased! {
res = running.select_next_some() => Event::WriteBlobDone(res),
req = stream.try_next() => Event::Request(req.map_err(ServeNeededBlobsError::ReceiveRequest)?),
};
match event {
Event::Request(Some(NeededBlobsRequest::OpenBlob { blob_id, file, responder })) => {
let blob_id = Hash::from(BlobId::from(blob_id));
// Make sure the blob is still needed/isn't being written already.
if !needs.remove(&blob_id) {
return Err(ServeNeededBlobsError::BlobNotNeeded(blob_id));
}
let file_stream =
file.into_stream().map_err(ServeNeededBlobsError::ReceiveRequest)?;
// Do the actual async work of opening the blob for write and serving the write
// calls in a separate Future so this loop can run this Future and handle new
// requests concurrently.
let task =
open_write_blob(file_stream, responder, pkgfs_install, blob_id, BlobKind::Data);
running.push(async move { (blob_id, task.await) });
continue;
}
Event::Request(Some(NeededBlobsRequest::Abort { responder })) => {
drop(responder);
return Err(ServeNeededBlobsError::Aborted);
}
Event::Request(Some(other)) => {
return Err(ServeNeededBlobsError::UnexpectedRequest {
received: other.method_name(),
expected: "open_blob",
})
}
Event::Request(None) => {
return Err(ServeNeededBlobsError::UnexpectedClose);
}
Event::WriteBlobDone((_, Ok(()))) => {
continue;
}
Event::WriteBlobDone((_, Err(OpenWriteBlobError::Serve(e)))) => {
return Err(e);
}
Event::WriteBlobDone((hash, Err(OpenWriteBlobError::NonFatalWrite(e)))) => {
// TODO serve_write_blob will notify the client of the error before this task
// finishes, so it is possible for a client to retry a blob fetch before this task
// re-registers the blob as needed, which would be a race condition if the
// pkg-resolver didn't just abort package fetches on any error.
fx_log_warn!("Non-fatal error while writing content blob: {:#}", anyhow!(e));
needs.insert(hash);
continue;
}
}
}
Ok(())
}
#[derive(Debug)]
enum OpenWriteBlobError {
NonFatalWrite(ServeWriteBlobError),
Serve(ServeNeededBlobsError),
}
impl From<ServeNeededBlobsError> for OpenWriteBlobError {
fn from(e: ServeNeededBlobsError) -> Self {
OpenWriteBlobError::Serve(e)
}
}
// Allow a function to generically respond to either an OpenMetaBlob or OpenBlob request.
type OpenBlobResponse = Result<bool, fidl_fuchsia_pkg::OpenBlobError>;
trait OpenBlobResponder {
fn send(self, res: OpenBlobResponse) -> Result<(), fidl::Error>;
}
impl OpenBlobResponder for fidl_fuchsia_pkg::NeededBlobsOpenBlobResponder {
fn send(self, mut res: OpenBlobResponse) -> Result<(), fidl::Error> {
self.send(&mut res)
}
}
impl OpenBlobResponder for fidl_fuchsia_pkg::NeededBlobsOpenMetaBlobResponder {
fn send(self, mut res: OpenBlobResponse) -> Result<(), fidl::Error> {
self.send(&mut res)
}
}
async fn open_write_blob(
file_stream: FileRequestStream,
responder: impl OpenBlobResponder,
pkgfs_install: &pkgfs::install::Client,
blob_id: Hash,
kind: BlobKind,
) -> Result<(), OpenWriteBlobError> {
let open_res = open_blob(pkgfs_install, blob_id, kind).await;
// Always respond to the Open[Meta]Blob request, then worry about actually handling the result.
responder
.send(match &open_res {
Ok(OpenBlobSuccess::Needed(_)) => Ok(true),
Ok(OpenBlobSuccess::AlreadyExists) => Ok(false),
Err(OpenBlobError::ConcurrentWrite) => {
Err(fidl_fuchsia_pkg::OpenBlobError::ConcurrentWrite)
}
Err(OpenBlobError::Io(_)) => Err(fidl_fuchsia_pkg::OpenBlobError::UnspecifiedIo),
})
.map_err(ServeNeededBlobsError::SendResponse)?;
match open_res {
Ok(OpenBlobSuccess::Needed(blob)) => {
serve_write_blob(file_stream, blob).await.map_err(|e| {
if e.is_fatal() {
OpenWriteBlobError::Serve(ServeNeededBlobsError::WriteBlob {
context: BlobContext { kind, hash: blob_id },
source: e,
})
} else {
OpenWriteBlobError::NonFatalWrite(e)
}
})
}
Ok(OpenBlobSuccess::AlreadyExists) => Ok(()),
Err(OpenBlobError::ConcurrentWrite) => {
Err(OpenWriteBlobError::NonFatalWrite(ServeWriteBlobError::ConcurrentWrite))
}
Err(e @ OpenBlobError::Io(_)) => Err(ServeNeededBlobsError::OpenBlob {
context: BlobContext { kind, hash: blob_id },
source: e,
}
.into()),
}
}
#[derive(thiserror::Error, Debug)]
enum ServeWriteBlobError {
#[error("protocol violation: file request stream terminated unexpectedly")]
UnexpectedClose,
#[error("protocol violation: file request stream fidl error")]
Fidl(#[source] fidl::Error),
#[error("protocol violation: expected {expected} request, got {received}")]
UnexpectedRequest { received: &'static str, expected: &'static str },
#[error("insufficient storage space is available")]
NoSpace,
#[error("the provided blob data is corrupt")]
Corrupt,
#[error("the blob is in the process of being written")]
ConcurrentWrite,
#[error("while truncating the blob")]
Truncate(#[source] pkgfs::install::BlobTruncateError),
#[error("while writing to the blob")]
Write(#[source] pkgfs::install::BlobWriteError),
}
impl From<pkgfs::install::BlobTruncateError> for ServeWriteBlobError {
fn from(e: pkgfs::install::BlobTruncateError) -> Self {
match e {
pkgfs::install::BlobTruncateError::NoSpace => ServeWriteBlobError::NoSpace,
e => ServeWriteBlobError::Truncate(e),
}
}
}
impl From<pkgfs::install::BlobWriteError> for ServeWriteBlobError {
fn from(e: pkgfs::install::BlobWriteError) -> Self {
match e {
pkgfs::install::BlobWriteError::NoSpace => ServeWriteBlobError::NoSpace,
pkgfs::install::BlobWriteError::Corrupt => ServeWriteBlobError::Corrupt,
e => ServeWriteBlobError::Write(e),
}
}
}
impl ServeWriteBlobError {
/// Determines if this error should cancel the associated Get() operation (true) or should
/// allow the NeededBlobs client retry the operation later (false).
fn is_fatal(&self) -> bool {
match self {
ServeWriteBlobError::UnexpectedClose => false,
ServeWriteBlobError::Fidl(_) => true,
ServeWriteBlobError::UnexpectedRequest { .. } => true,
ServeWriteBlobError::NoSpace => false,
ServeWriteBlobError::Corrupt => false,
ServeWriteBlobError::ConcurrentWrite => false,
ServeWriteBlobError::Truncate(_) => true,
ServeWriteBlobError::Write(_) => true,
}
}
}
async fn serve_write_blob(
mut stream: FileRequestStream,
blob: OpenBlob,
) -> Result<(), ServeWriteBlobError> {
use pkgfs::install::{
Blob, BlobTruncateError, BlobWriteError, BlobWriteSuccess, NeedsData, NeedsTruncate,
};
let OpenBlob { blob, closer } = blob;
enum State {
ExpectTruncate(Blob<NeedsTruncate>),
ExpectData(Blob<NeedsData>),
ExpectClose,
}
impl State {
fn expectation(&self) -> &'static str {
match self {
State::ExpectTruncate(_) => "truncate",
State::ExpectData(_) => "write",
State::ExpectClose => "close",
}
}
}
// Allow the inner task to sometimes close the underlying blob early while also unconditionally
// calling close after the inner task completes. Close closes the underlying blob the first
// time it is called and becomes a no-op on later calls.
let mut closer = Some(closer);
let mut close = || {
let closer = closer.take().map(|closer| closer.close());
async move {
match closer {
Some(closer) => closer.await,
None => {}
}
}
};
let mut state = State::ExpectTruncate(blob);
let task = async {
while let Some(request) = stream.try_next().await.map_err(ServeWriteBlobError::Fidl)? {
state = match (request, state) {
(FileRequest::Truncate { length, responder }, State::ExpectTruncate(blob)) => {
let res = blob.truncate(length).await;
// Interpret responding errors as the stream closing unexpectedly.
let _ = responder.send(
match &res {
Ok(_) => Status::OK,
Err(BlobTruncateError::NoSpace) => Status::NO_SPACE,
Err(BlobTruncateError::Fidl(_))
| Err(BlobTruncateError::UnexpectedResponse(_)) => Status::INTERNAL,
}
.into_raw(),
);
let blob = res?;
// The empty blob needs no data and is complete after it is truncated.
match length {
0 => State::ExpectClose,
_ => State::ExpectData(blob),
}
}
(FileRequest::Write { data, responder }, State::ExpectData(blob)) => {
let res = blob.write(&data).await;
let _ = responder.send(
match &res {
Ok(_) => Status::OK,
Err(BlobWriteError::NoSpace) => Status::NO_SPACE,
Err(BlobWriteError::Corrupt) => Status::IO_DATA_INTEGRITY,
Err(BlobWriteError::Overwrite) => Status::IO,
Err(BlobWriteError::Fidl(_))
| Err(BlobWriteError::UnexpectedResponse(_)) => Status::INTERNAL,
}
.into_raw(),
data.len() as u64,
);
match res? {
BlobWriteSuccess::MoreToWrite(blob) => State::ExpectData(blob),
BlobWriteSuccess::Done => State::ExpectClose,
}
}
// Close is allowed in any state, but the blob is only written if we were expecting
// a close.
(FileRequest::Close { responder }, State::ExpectClose) => {
close().await;
let _ = responder.send(Status::OK.into_raw());
return Ok(());
}
(FileRequest::Close { responder }, _) => {
close().await;
let _ = responder.send(Status::OK.into_raw());
return Err(ServeWriteBlobError::UnexpectedClose);
}
(request, state) => {
return Err(ServeWriteBlobError::UnexpectedRequest {
received: request.method_name(),
expected: state.expectation(),
});
}
};
}
match state {
State::ExpectClose => Ok(()),
_ => Err(ServeWriteBlobError::UnexpectedClose),
}
};
// Handle the request stream, then close the blob, then close the stream to avoid retry races
// creating a blob that is still open.
let res = task.await;
close().await;
drop(stream);
res
}
/// Helper to split a slice of items into chunks that will fit in a single FIDL vec response.
///
/// Note, Chunker assumes the fixed overhead of a single fidl response header and a single vec
/// header per chunk. It must not be used with more complex responses.
struct Chunker<'a, I> {
items: &'a mut [I],
}
impl<'a, I> Chunker<'a, I>
where
I: Measurable,
{
fn new(items: &'a mut [I]) -> Self {
Self { items }
}
/// Produce the next chunk of items to respond with. Iteration stops when this method returns
/// an empty slice, which occurs when either:
/// * All items have been returned
/// * Chunker encounters an item so large that it cannot even be stored in a response
/// dedicated to just that one item.
///
/// Once next() returns an empty slice, it will continue to do so in future calls.
fn next(&mut self) -> &'a mut [I] {
let mut bytes_used: usize = FIDL_VEC_RESPONSE_OVERHEAD_BYTES;
let mut entry_count = 0;
for entry in &*self.items {
bytes_used += entry.measure();
if bytes_used > ZX_CHANNEL_MAX_MSG_BYTES as usize {
break;
}
entry_count += 1;
}
// tmp/swap dance to appease the borrow checker.
let tmp = std::mem::replace(&mut self.items, &mut []);
let (chunk, rest) = tmp.split_at_mut(entry_count);
self.items = rest;
chunk
}
}
/// A FIDL request stream for a FIDL protocol following the iterator pattern.
trait FidlIteratorRequestStream:
fidl::endpoints::RequestStream + TryStream<Error = fidl::Error>
{
type Responder: FidlIteratorNextResponder;
fn request_to_responder(request: <Self as TryStream>::Ok) -> Self::Responder;
}
/// A responder to a Next() request for a FIDL iterator.
trait FidlIteratorNextResponder {
type Item: Measurable + fidl::encoding::Encodable;
fn send_chunk(self, chunk: &mut [Self::Item]) -> Result<(), fidl::Error>;
}
impl FidlIteratorRequestStream for PackageIndexIteratorRequestStream {
type Responder = PackageIndexIteratorNextResponder;
fn request_to_responder(request: PackageIndexIteratorRequest) -> Self::Responder {
let PackageIndexIteratorRequest::Next { responder } = request;
responder
}
}
impl FidlIteratorNextResponder for PackageIndexIteratorNextResponder {
type Item = PackageIndexEntry;
fn send_chunk(self, chunk: &mut [Self::Item]) -> Result<(), fidl::Error> {
self.send(&mut chunk.iter_mut())
}
}
impl FidlIteratorRequestStream for BlobInfoIteratorRequestStream {
type Responder = BlobInfoIteratorNextResponder;
fn request_to_responder(request: BlobInfoIteratorRequest) -> Self::Responder {
let BlobInfoIteratorRequest::Next { responder } = request;
responder
}
}
impl FidlIteratorNextResponder for BlobInfoIteratorNextResponder {
type Item = fidl_fuchsia_pkg::BlobInfo;
fn send_chunk(self, chunk: &mut [Self::Item]) -> Result<(), fidl::Error> {
self.send(&mut chunk.iter_mut())
}
}
/// Serves the provided `FidlIteratorRequestStream` with as many entries per `Next()` request as
/// will fit in a fidl message. The task completes after yielding an empty response or the iterator
/// is interrupted (client closes the channel or this task encounters a FIDL layer error).
fn serve_fidl_iterator<I>(
mut items: impl AsMut<[<I::Responder as FidlIteratorNextResponder>::Item]>,
mut stream: I,
) -> impl Future<Output = ()>
where
I: FidlIteratorRequestStream,
{
async move {
let mut items = Chunker::new(items.as_mut());
loop {
let mut chunk = items.next();
let responder =
match stream.try_next().await.context("while waiting for next() request")? {
None => break,
Some(request) => I::request_to_responder(request),
};
let () = responder.send_chunk(&mut chunk).context("while responding")?;
// Yield a single empty chunk, then stop serving the protocol.
if chunk.is_empty() {
break;
}
}
Ok(())
}
.unwrap_or_else(|e: anyhow::Error| {
fx_log_err!(
"error serving {} protocol: {:#}",
<I::Service as fidl::endpoints::ServiceMarker>::DEBUG_NAME,
anyhow!(e)
)
})
}
/// Serves the `PackageIndexIteratorRequestStream` with as many entries per request as will fit in
/// a fidl message.
async fn serve_base_package_index(
static_packages: Arc<StaticPackages>,
stream: PackageIndexIteratorRequestStream,
) {
let package_entries = static_packages
.contents()
.map(|(path, hash)| PackageIndexEntry {
package_url: PackageUrl { url: format!("fuchsia-pkg://fuchsia.com/{}", path.name()) },
meta_far_blob_id: BlobId::from(hash.clone()).into(),
})
.collect::<Vec<PackageIndexEntry>>();
serve_fidl_iterator(package_entries, stream).await
}
/// Serves the `BlobInfoIteratorRequestStream` with as many entries per request as will fit in a
/// fidl message.
#[cfg_attr(not(test), allow(dead_code))]
async fn serve_blob_info_iterator(
items: impl AsMut<[fidl_fuchsia_pkg::BlobInfo]>,
stream: BlobInfoIteratorRequestStream,
) {
serve_fidl_iterator(items, stream).await
}
#[cfg(test)]
mod iter_tests {
use {
super::*,
fidl_fuchsia_pkg::{BlobInfoIteratorMarker, PackageIndexIteratorMarker},
fuchsia_async::Task,
fuchsia_hash::HashRangeFull,
fuchsia_pkg::PackagePath,
proptest::prelude::*,
};
proptest! {
#[test]
fn blob_info_iterator_yields_expected_entries(items: Vec<BlobInfo>) {
let mut executor = fuchsia_async::Executor::new().unwrap();
executor.run_singlethreaded(async move {
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>().unwrap();
let mut actual_items = vec![];
let ((), ()) = future::join(
async {
let items = items
.iter()
.cloned()
.map(fidl_fuchsia_pkg::BlobInfo::from)
.collect::<Vec<_>>();
serve_blob_info_iterator(items, stream).await
},
async {
loop {
let chunk = proxy.next().await.unwrap();
if chunk.is_empty() {
break;
}
let chunk = chunk.into_iter().map(BlobInfo::from);
actual_items.extend(chunk);
}
},
)
.await;
assert_eq!(items, actual_items);
})
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Byte(u8);
impl Measurable for Byte {
fn measure(&self) -> usize {
1
}
}
#[test]
fn chunker_fuses() {
let items = &mut [Byte(42)];
let mut chunker = Chunker::new(items);
assert_eq!(chunker.next(), &mut [Byte(42)]);
assert_eq!(chunker.next(), &mut []);
assert_eq!(chunker.next(), &mut []);
}
#[test]
fn chunker_chunks_at_expected_boundary() {
const BYTES_PER_CHUNK: usize =
ZX_CHANNEL_MAX_MSG_BYTES as usize - FIDL_VEC_RESPONSE_OVERHEAD_BYTES;
// Expect to fill 2 full chunks with 1 item left over.
let mut items =
(0..=(BYTES_PER_CHUNK as u64 * 2)).map(|n| Byte(n as u8)).collect::<Vec<Byte>>();
let expected = items.clone();
let mut chunker = Chunker::new(&mut items);
let mut actual: Vec<Byte> = vec![];
for _ in 0..2 {
let chunk = chunker.next();
assert_eq!(chunk.len(), BYTES_PER_CHUNK);
actual.extend(&*chunk);
}
let chunk = chunker.next();
assert_eq!(chunk.len(), 1);
actual.extend(&*chunk);
assert_eq!(actual, expected);
}
#[test]
fn chunker_terminates_at_too_large_item() {
#[derive(Debug, PartialEq, Eq)]
struct TooBig;
impl Measurable for TooBig {
fn measure(&self) -> usize {
ZX_CHANNEL_MAX_MSG_BYTES as usize
}
}
let items = &mut [TooBig];
let mut chunker = Chunker::new(items);
assert_eq!(chunker.next(), &mut []);
}
const PACKAGE_INDEX_CHUNK_SIZE_MAX: usize = 818;
const PACKAGE_INDEX_CHUNK_SIZE_MIN: usize = 372;
#[test]
fn verify_fidl_vec_response_overhead() {
let vec_response_overhead = {
use fidl::encoding::{TransactionHeader, TransactionMessage};
let mut nop: Vec<()> = vec![];
let mut msg =
TransactionMessage { header: TransactionHeader::new(0, 0), body: &mut nop };
fidl::encoding::with_tls_encoded(&mut msg, |bytes, _handles| {
Result::<_, fidl::Error>::Ok(bytes.len())
})
.unwrap()
};
assert_eq!(vec_response_overhead, FIDL_VEC_RESPONSE_OVERHEAD_BYTES);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn base_package_index_iterator_paginates_shortest_entries() {
let names = ('a'..='z').cycle().map(|c| c.to_string());
let paths = names.map(|name| PackagePath::from_name_and_variant(name, "0").unwrap());
verify_base_package_iterator_pagination(paths, PACKAGE_INDEX_CHUNK_SIZE_MAX).await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn base_package_index_iterator_paginates_longest_entries() {
let names = ('a'..='z')
.map(|c| std::iter::repeat(c).take(PackagePath::MAX_NAME_BYTES).collect::<String>())
.cycle();
let paths = names.map(|name| PackagePath::from_name_and_variant(name, "0").unwrap());
verify_base_package_iterator_pagination(paths, PACKAGE_INDEX_CHUNK_SIZE_MIN).await;
}
async fn verify_base_package_iterator_pagination(
paths: impl Iterator<Item = PackagePath>,
expected_chunk_size: usize,
) {
let static_packages =
paths.zip(HashRangeFull::default()).take(expected_chunk_size * 2).collect();
let static_packages = Arc::new(StaticPackages::from_entries(static_packages));
let (iter, stream) =
fidl::endpoints::create_proxy_and_stream::<PackageIndexIteratorMarker>().unwrap();
let task = Task::local(serve_base_package_index(static_packages, stream));
let chunk = iter.next().await.unwrap();
assert_eq!(chunk.len(), expected_chunk_size);
let chunk = iter.next().await.unwrap();
assert_eq!(chunk.len(), expected_chunk_size);
let chunk = iter.next().await.unwrap();
assert_eq!(chunk.len(), 0);
let () = task.await;
}
}
#[cfg(test)]
mod serve_needed_blobs_tests {
use {
super::*,
crate::test_utils::add_meta_far_to_blobfs,
fidl_fuchsia_io::FileMarker,
fidl_fuchsia_pkg::{BlobInfoIteratorMarker, BlobInfoIteratorProxy, NeededBlobsProxy},
fuchsia_hash::HashRangeFull,
matches::assert_matches,
std::collections::BTreeSet,
};
#[fuchsia_async::run_singlethreaded(test)]
async fn start_stop() {
let (_, stream) = fidl::endpoints::create_proxy_and_stream::<NeededBlobsMarker>().unwrap();
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (pkgfs_install, _) = pkgfs::install::Client::new_test();
let (pkgfs_needs, _) = pkgfs::needs::Client::new_test();
let (blobfs, _) = blobfs::Client::new_test();
let dynamic_index = Arc::new(Mutex::new(DynamicIndex::new()));
assert_matches!(
serve_needed_blobs(
stream,
meta_blob_info,
&pkgfs_install,
&pkgfs_needs,
&dynamic_index,
&blobfs
)
.await,
Err(ServeNeededBlobsError::UnexpectedClose)
);
}
fn spawn_serve_needed_blobs_with_mocks(
meta_blob_info: BlobInfo,
) -> (
Task<Result<(), ServeNeededBlobsError>>,
NeededBlobsProxy,
pkgfs::install::Mock,
pkgfs::needs::Mock,
fuchsia_pkg_testing::blobfs::Fake,
) {
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<NeededBlobsMarker>().unwrap();
let (pkgfs_install, pkgfs_install_mock) = pkgfs::install::Client::new_mock();
let (pkgfs_needs, pkgfs_needs_mock) = pkgfs::needs::Client::new_mock();
let (blobfs_fake, blobfs) = fuchsia_pkg_testing::blobfs::Fake::new();
let dynamic_index = Arc::new(Mutex::new(DynamicIndex::new()));
(
Task::spawn(async move {
serve_needed_blobs(
stream,
meta_blob_info,
&pkgfs_install,
&pkgfs_needs,
&dynamic_index,
&blobfs,
)
.await
}),
proxy,
pkgfs_install_mock,
pkgfs_needs_mock,
blobfs_fake,
)
}
struct FakeOpenBlobResponse(Option<OpenBlobResponse>);
struct FakeOpenBlobResponder<'a> {
// Response is written to through send(). It is never intended to read.
#[allow(dead_code)]
response: &'a mut FakeOpenBlobResponse,
}
impl FakeOpenBlobResponse {
fn new() -> Self {
Self(None)
}
fn responder(&mut self) -> FakeOpenBlobResponder<'_> {
FakeOpenBlobResponder { response: self }
}
fn take(self) -> OpenBlobResponse {
self.0.unwrap()
}
}
impl OpenBlobResponder for FakeOpenBlobResponder<'_> {
fn send(self, res: OpenBlobResponse) -> Result<(), fidl::Error> {
self.response.0 = Some(res);
Ok(())
}
}
#[fuchsia_async::run_singlethreaded(test)]
async fn open_write_blob_handles_io_open_error() {
// Provide open_write_blob a closed pkgfs and file stream to trigger a PEER_CLOSED IO
// error.
let (_, file_stream) = fidl::endpoints::create_request_stream::<FileMarker>().unwrap();
let (pkgfs_install, _) = pkgfs::install::Client::new_test();
let mut response = FakeOpenBlobResponse::new();
let res = open_write_blob(
file_stream,
response.responder(),
&pkgfs_install,
[0; 32].into(),
BlobKind::Package,
)
.await;
// The operation should fail, and it should report the failure to the fidl responder.
assert_matches!(
res,
Err(OpenWriteBlobError::Serve(ServeNeededBlobsError::OpenBlob {
context: BlobContext { kind: BlobKind::Package, .. },
source: OpenBlobError::Io(_),
}))
);
assert_eq!(response.take(), Err(fidl_fuchsia_pkg::OpenBlobError::UnspecifiedIo));
}
#[fuchsia_async::run_singlethreaded(test)]
async fn expects_open_meta_blob() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (task, proxy, pkgfs_install, pkgfs_needs, _blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
let (iter, iter_server_end) =
fidl::endpoints::create_proxy::<BlobInfoIteratorMarker>().unwrap();
proxy.get_missing_blobs(iter_server_end).unwrap();
assert_matches!(iter.next().await, Err(_));
assert_matches!(
task.await,
Err(ServeNeededBlobsError::UnexpectedRequest {
received: "get_missing_blobs",
expected: "open_meta_blob"
})
);
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn expects_open_meta_blob_once() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 4 };
let (task, proxy, mut pkgfs_install, pkgfs_needs, blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
// Open a needed meta FAR blob and write it.
let ((), ()) = future::join(
async {
pkgfs_install
.expect_create_blob([0; 32].into(), BlobKind::Package.into())
.await
.expect_payload(b"test")
.await;
add_meta_far_to_blobfs(&blobfs, [0; 32], "fake-package", vec![]);
},
async {
let (blob, blob_server_end) =
fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(proxy.open_meta_blob(blob_server_end).await, Ok(Ok(true)));
let _ = blob.truncate(4).await;
let _ = blob.write(b"test").await;
let _ = blob.close().await;
},
)
.await;
// Trying to open the meta FAR blob again after writing it successfully is a protocol violation.
let (_blob, blob_server_end) = fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(proxy.open_meta_blob(blob_server_end).await, Err(_));
assert_matches!(
task.await,
Err(ServeNeededBlobsError::UnexpectedRequest {
received: "open_meta_blob",
expected: "get_missing_blobs"
})
);
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn handles_present_meta_blob() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (task, proxy, mut pkgfs_install, pkgfs_needs, blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
add_meta_far_to_blobfs(&blobfs, [0; 32], "fake-package", vec![]);
// Try to open the meta FAR blob, but report it is no longer needed.
let ((), ()) = future::join(
async {
pkgfs_install
.expect_create_blob([0; 32].into(), BlobKind::Package.into())
.await
.fail_open_with_already_exists();
},
async {
let (blob, blob_server_end) =
fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(proxy.open_meta_blob(blob_server_end).await, Ok(Ok(false)));
assert_matches!(blob.truncate(0).await, Err(_));
},
)
.await;
// Trying to open the meta FAR blob again after being told it is not needed is a protocol
// violation.
let (_blob, blob_server_end) = fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(proxy.open_meta_blob(blob_server_end).await, Err(_));
assert_matches!(
task.await,
Err(ServeNeededBlobsError::UnexpectedRequest {
received: "open_meta_blob",
expected: "get_missing_blobs"
})
);
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn allows_retrying_nonfatal_open_meta_blob_errors() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 1 };
let (task, proxy, mut pkgfs_install, pkgfs_needs, blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
// Try to open the meta FAR blob, but report it is already being written concurrently.
let ((), ()) = future::join(
async {
pkgfs_install
.expect_create_blob([0; 32].into(), BlobKind::Package.into())
.await
.fail_open_with_concurrent_write();
},
async {
let (blob, blob_server_end) =
fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(
proxy.open_meta_blob(blob_server_end).await,
Ok(Err(fidl_fuchsia_pkg::OpenBlobError::ConcurrentWrite))
);
assert_matches!(blob.truncate(1).await, Err(_));
},
)
.await;
// Try to write the meta FAR blob, but report the written contents are corrupt.
let ((), ()) = future::join(
async {
pkgfs_install
.expect_create_blob([0; 32].into(), BlobKind::Package.into())
.await
.fail_write_with_corrupt()
.await;
},
async {
let (blob, blob_server_end) =
fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(proxy.open_meta_blob(blob_server_end).await, Ok(Ok(true)));
let _ = blob.truncate(1).await;
let _ = blob.write(&mut [0]).await;
let _ = blob.close().await;
},
)
.await;
// Open the meta FAR blob for write, but then close it (a non-fatal error)
let ((), ()) = future::join(
async {
pkgfs_install
.expect_create_blob([0; 32].into(), BlobKind::Package.into())
.await
.expect_close()
.await;
},
async {
let (blob, blob_server_end) =
fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(proxy.open_meta_blob(blob_server_end).await, Ok(Ok(true)));
let _ = blob.close().await;
},
)
.await;
// Operation succeeds after pkgfs cooperates.
let ((), ()) = future::join(
async {
pkgfs_install
.expect_create_blob([0; 32].into(), BlobKind::Package.into())
.await
.expect_payload(&[0])
.await;
add_meta_far_to_blobfs(&blobfs, [0; 32], "fake-package", vec![]);
},
async {
let (blob, blob_server_end) =
fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(proxy.open_meta_blob(blob_server_end).await, Ok(Ok(true)));
let _ = blob.truncate(1).await;
let _ = blob.write(&mut [0]).await;
let _ = blob.close().await;
},
)
.await;
// Task moves to next state after retried write operation succeeds.
let (_blob, blob_server_end) = fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(proxy.open_meta_blob(blob_server_end).await, Err(_));
assert_matches!(
task.await,
Err(ServeNeededBlobsError::UnexpectedRequest {
received: "open_meta_blob",
expected: "get_missing_blobs"
})
);
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
pub(super) async fn write_meta_blob(
proxy: &NeededBlobsProxy,
pkgfs_install: &mut pkgfs::install::Mock,
blobfs: &fuchsia_pkg_testing::blobfs::Fake,
meta_blob_info: BlobInfo,
needed_blobs: impl IntoIterator<Item = Hash>,
) {
add_meta_far_to_blobfs(blobfs, meta_blob_info.blob_id, "fake-package", needed_blobs);
let ((), ()) = future::join(
async {
pkgfs_install
.expect_create_blob(meta_blob_info.blob_id.into(), BlobKind::Package.into())
.await
.fail_open_with_already_exists();
},
async {
let (_blob, blob_server_end) =
fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(proxy.open_meta_blob(blob_server_end).await, Ok(Ok(false)));
},
)
.await;
}
async fn collect_blob_info_iterator(proxy: BlobInfoIteratorProxy) -> Vec<BlobInfo> {
let mut res = vec![];
loop {
let chunk = proxy.next().await.unwrap();
if chunk.is_empty() {
break;
}
res.extend(chunk.into_iter().map(BlobInfo::from));
}
res
}
#[fuchsia_async::run_singlethreaded(test)]
async fn discovers_and_reports_missing_blobs() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (task, proxy, mut pkgfs_install, mut pkgfs_needs, blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
write_meta_blob(&proxy, &mut pkgfs_install, &blobfs, meta_blob_info, vec![]).await;
let expected = HashRangeFull::default().take(2000).collect::<Vec<_>>();
let ((), ()) = future::join(
async {
pkgfs_needs
.expect_enumerate_needs([0; 32].into())
.await
.enumerate_needs(
expected.iter().cloned().map(Into::into).collect::<BTreeSet<_>>(),
)
.await;
},
async {
let (missing_blobs_iter, missing_blobs_iter_server_end) =
fidl::endpoints::create_proxy::<BlobInfoIteratorMarker>().unwrap();
assert_matches!(proxy.get_missing_blobs(missing_blobs_iter_server_end), Ok(()));
let missing_blobs = collect_blob_info_iterator(missing_blobs_iter).await;
let expected = expected
.iter()
.cloned()
.map(|hash| BlobInfo { blob_id: hash.into(), length: 0 })
.collect::<Vec<_>>();
assert_eq!(missing_blobs, expected);
},
)
.await;
drop(proxy);
assert_matches!(task.await, Err(ServeNeededBlobsError::UnexpectedClose));
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn handles_no_missing_blobs() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (task, proxy, mut pkgfs_install, mut pkgfs_needs, blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
write_meta_blob(&proxy, &mut pkgfs_install, &blobfs, meta_blob_info, vec![]).await;
let ((), ()) = future::join(
async {
pkgfs_needs
.expect_enumerate_needs([0; 32].into())
.await
.fail_open_with_not_found()
.await;
pkgfs_needs
.expect_enumerate_needs([0; 32].into())
.await
.fail_open_with_not_found()
.await;
},
async {
let (missing_blobs_iter, missing_blobs_iter_server_end) =
fidl::endpoints::create_proxy::<BlobInfoIteratorMarker>().unwrap();
assert_matches!(proxy.get_missing_blobs(missing_blobs_iter_server_end), Ok(()));
let missing_blobs = collect_blob_info_iterator(missing_blobs_iter).await;
assert_eq!(missing_blobs, vec![]);
},
)
.await;
assert_matches!(task.await, Ok(()));
assert_matches!(
proxy.take_event_stream().next().await,
Some(Err(fidl::Error::ClientChannelClosed { status: Status::OK, .. }))
);
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn fails_on_needs_enumeration_error() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (task, proxy, mut pkgfs_install, mut pkgfs_needs, blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
write_meta_blob(&proxy, &mut pkgfs_install, &blobfs, meta_blob_info, vec![]).await;
let ((), ()) = future::join(
async {
pkgfs_needs
.expect_enumerate_needs([0; 32].into())
.await
.fail_open_with_unexpected_error()
.await;
},
async {
let (missing_blobs_iter, missing_blobs_iter_server_end) =
fidl::endpoints::create_proxy::<BlobInfoIteratorMarker>().unwrap();
assert_matches!(proxy.get_missing_blobs(missing_blobs_iter_server_end), Ok(()));
assert_matches!(
missing_blobs_iter.next().await,
Err(fidl::Error::ClientChannelClosed { status: Status::PEER_CLOSED, .. })
);
},
)
.await;
drop(proxy);
assert_matches!(task.await, Err(ServeNeededBlobsError::ListNeeds(_)));
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn dropping_needed_blobs_stops_missing_blob_iterator() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (task, proxy, mut pkgfs_install, mut pkgfs_needs, blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
write_meta_blob(&proxy, &mut pkgfs_install, &blobfs, meta_blob_info, vec![]).await;
let ((), ()) = future::join(
async {
pkgfs_needs
.expect_enumerate_needs([0; 32].into())
.await
.enumerate_needs(HashRangeFull::default().take(10).collect())
.await;
},
async {
let (missing_blobs_iter, missing_blobs_iter_server_end) =
fidl::endpoints::create_proxy::<BlobInfoIteratorMarker>().unwrap();
assert_matches!(proxy.get_missing_blobs(missing_blobs_iter_server_end), Ok(()));
// Closing the needed blobs request stream terminates any spawned tasks.
drop(proxy);
assert_matches!(
missing_blobs_iter.next().await,
Err(fidl::Error::ClientChannelClosed { status: Status::PEER_CLOSED, .. })
);
},
)
.await;
assert_matches!(task.await, Err(ServeNeededBlobsError::UnexpectedClose));
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn expects_get_missing_blobs_once() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (task, proxy, mut pkgfs_install, mut pkgfs_needs, blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
write_meta_blob(&proxy, &mut pkgfs_install, &blobfs, meta_blob_info, vec![]).await;
// Enumerate the needs successfully once.
let ((), ()) = future::join(
async {
pkgfs_needs
.expect_enumerate_needs([0; 32].into())
.await
.enumerate_needs(HashRangeFull::default().take(10).collect())
.await;
},
async {
let (missing_blobs_iter, missing_blobs_iter_server_end) =
fidl::endpoints::create_proxy::<BlobInfoIteratorMarker>().unwrap();
assert_matches!(proxy.get_missing_blobs(missing_blobs_iter_server_end), Ok(()));
collect_blob_info_iterator(missing_blobs_iter).await;
},
)
.await;
// Trying to enumerate the missing blobs again is a protocol violation.
let (_missing_blobs_iter, missing_blobs_iter_server_end) =
fidl::endpoints::create_proxy::<BlobInfoIteratorMarker>().unwrap();
assert_matches!(proxy.get_missing_blobs(missing_blobs_iter_server_end), Ok(()));
assert_matches!(
task.await,
Err(ServeNeededBlobsError::UnexpectedRequest {
received: "get_missing_blobs",
expected: "open_blob"
})
);
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
pub(super) async fn enumerate_missing_blobs(
proxy: &NeededBlobsProxy,
pkgfs_needs: &mut pkgfs::needs::Mock,
meta_blob_id: BlobId,
blobs: impl Iterator<Item = Hash>,
) {
let ((), ()) = future::join(
async {
pkgfs_needs
.expect_enumerate_needs(meta_blob_id.into())
.await
.enumerate_needs(blobs.map(Into::into).collect::<BTreeSet<_>>())
.await;
},
async {
let (missing_blobs_iter, missing_blobs_iter_server_end) =
fidl::endpoints::create_proxy::<BlobInfoIteratorMarker>().unwrap();
assert_matches!(proxy.get_missing_blobs(missing_blobs_iter_server_end), Ok(()));
collect_blob_info_iterator(missing_blobs_iter).await;
},
)
.await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn single_need() {
let meta_blob_info = BlobInfo { blob_id: [1; 32].into(), length: 0 };
let (task, proxy, mut pkgfs_install, mut pkgfs_needs, blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
write_meta_blob(&proxy, &mut pkgfs_install, &blobfs, meta_blob_info, vec![[2; 32].into()])
.await;
enumerate_missing_blobs(
&proxy,
&mut pkgfs_needs,
meta_blob_info.blob_id,
vec![[2; 32].into()].into_iter(),
)
.await;
let payload = b"single blob";
let ((), ()) = future::join(
async {
pkgfs_install
.expect_create_blob([2; 32].into(), BlobKind::Data.into())
.await
.expect_payload(payload)
.await;
},
async {
let (blob, blob_server_end) =
fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(
proxy.open_blob(&mut BlobId::from([2; 32]).into(), blob_server_end).await,
Ok(Ok(true))
);
let _ = blob.truncate(payload.len() as u64).await;
let _ = blob.write(payload).await;
let _ = blob.close().await;
},
)
.await;
pkgfs_needs.expect_enumerate_needs([1; 32].into()).await.fail_open_with_not_found().await;
assert_matches!(task.await, Ok(()));
assert_matches!(
proxy.take_event_stream().next().await,
Some(Err(fidl::Error::ClientChannelClosed { status: Status::OK, .. }))
);
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn expects_open_blob_per_blob_once() {
let meta_blob_info = BlobInfo { blob_id: [1; 32].into(), length: 0 };
let (task, proxy, mut pkgfs_install, mut pkgfs_needs, blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
write_meta_blob(&proxy, &mut pkgfs_install, &blobfs, meta_blob_info, vec![[2; 32].into()])
.await;
enumerate_missing_blobs(
&proxy,
&mut pkgfs_needs,
meta_blob_info.blob_id,
vec![[2; 32].into()].into_iter(),
)
.await;
let ((), ()) = future::join(
async {
pkgfs_install
.expect_create_blob([2; 32].into(), BlobKind::Data.into())
.await
.expect_close()
.await;
},
async {
let (_blob, blob_server_end) =
fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(
proxy.open_blob(&mut BlobId::from([2; 32]).into(), blob_server_end).await,
Ok(Ok(true))
);
let (_blob, blob_server_end) =
fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(
proxy.open_blob(&mut BlobId::from([2; 32]).into(), blob_server_end).await,
Err(fidl::Error::ClientChannelClosed { status: Status::PEER_CLOSED, .. })
);
},
)
.await;
assert_matches!(task.await, Err(ServeNeededBlobsError::BlobNotNeeded(hash)) if hash == [2; 32].into());
assert_matches!(proxy.take_event_stream().next().await, None);
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn handles_many_content_blobs_that_need_written() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (task, proxy, mut pkgfs_install, mut pkgfs_needs, blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
let content_blobs = || HashRangeFull::default().skip(1).take(100);
write_meta_blob(&proxy, &mut pkgfs_install, &blobfs, meta_blob_info, content_blobs()).await;
enumerate_missing_blobs(&proxy, &mut pkgfs_needs, meta_blob_info.blob_id, content_blobs())
.await;
fn payload(hash: Hash) -> Vec<u8> {
let hash_bytes = || hash.as_bytes().iter().copied();
let len = hash_bytes().map(|n| n as usize).sum();
assert!(len <= fidl_fuchsia_io::MAX_BUF as usize);
std::iter::repeat(hash_bytes()).flatten().take(len).collect()
}
let ((), ()) = future::join(
async {
for hash in content_blobs() {
pkgfs_install
.expect_create_blob(hash, BlobKind::Data.into())
.await
.expect_payload(&payload(hash))
.await;
}
},
async {
let () = stream::iter(content_blobs())
.for_each_concurrent(None, |hash| {
let (blob, blob_server_end) =
fidl::endpoints::create_proxy::<FileMarker>().unwrap();
let open_fut =
proxy.open_blob(&mut BlobId::from(hash).into(), blob_server_end);
async move {
assert_matches!(open_fut.await, Ok(Ok(true)));
let payload = payload(hash);
let _ = blob.truncate(payload.len() as u64).await;
let _ = blob.write(&payload).await;
let _ = blob.close().await;
}
})
.await;
},
)
.await;
pkgfs_needs.expect_enumerate_needs([0; 32].into()).await.fail_open_with_not_found().await;
assert_matches!(task.await, Ok(()));
assert_matches!(
proxy.take_event_stream().next().await,
Some(Err(fidl::Error::ClientChannelClosed { status: Status::OK, .. }))
);
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn handles_many_content_blobs_that_are_already_present() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (task, proxy, mut pkgfs_install, mut pkgfs_needs, blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
let content_blobs = || HashRangeFull::default().skip(1).take(100);
write_meta_blob(&proxy, &mut pkgfs_install, &blobfs, meta_blob_info, content_blobs()).await;
enumerate_missing_blobs(&proxy, &mut pkgfs_needs, meta_blob_info.blob_id, content_blobs())
.await;
let ((), ()) = future::join(
async {
for hash in content_blobs() {
pkgfs_install
.expect_create_blob(hash, BlobKind::Data.into())
.await
.fail_open_with_already_exists();
}
},
async {
let () = stream::iter(content_blobs())
.for_each_concurrent(None, |hash| {
let (blob, blob_server_end) =
fidl::endpoints::create_proxy::<FileMarker>().unwrap();
let open_fut =
proxy.open_blob(&mut BlobId::from(hash).into(), blob_server_end);
async move {
assert_matches!(open_fut.await, Ok(Ok(false)));
assert_matches!(blob.take_event_stream().next().await, None);
}
})
.await;
},
)
.await;
pkgfs_needs.expect_enumerate_needs([0; 32].into()).await.fail_open_with_not_found().await;
assert_matches!(task.await, Ok(()));
assert_matches!(
proxy.take_event_stream().next().await,
Some(Err(fidl::Error::ClientChannelClosed { status: Status::OK, .. }))
);
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn allows_retrying_nonfatal_open_blob_errors() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (task, proxy, mut pkgfs_install, mut pkgfs_needs, blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
let content_blob = Hash::from([1; 32]);
write_meta_blob(&proxy, &mut pkgfs_install, &blobfs, meta_blob_info, vec![content_blob])
.await;
enumerate_missing_blobs(
&proxy,
&mut pkgfs_needs,
meta_blob_info.blob_id,
vec![content_blob].into_iter(),
)
.await;
// Try to open the blob, but report it is already being written concurrently.
let ((), ()) = future::join(
async {
pkgfs_install
.expect_create_blob(content_blob, BlobKind::Data.into())
.await
.fail_open_with_concurrent_write();
},
async {
let (blob, blob_server_end) =
fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(
proxy.open_blob(&mut BlobId::from(content_blob).into(), blob_server_end).await,
Ok(Err(fidl_fuchsia_pkg::OpenBlobError::ConcurrentWrite))
);
assert_matches!(blob.truncate(1).await, Err(_));
},
)
.await;
// Try to write the blob, but report the written contents are corrupt.
let ((), ()) = future::join(
async {
pkgfs_install
.expect_create_blob(content_blob, BlobKind::Data.into())
.await
.fail_write_with_corrupt()
.await;
},
async {
let (blob, blob_server_end) =
fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(
proxy.open_blob(&mut BlobId::from(content_blob).into(), blob_server_end).await,
Ok(Ok(true))
);
let _ = blob.truncate(1).await;
let _ = blob.write(&mut [0]).await;
let _ = blob.close().await;
},
)
.await;
// Open the blob for write, but then close it (a non-fatal error)
let ((), ()) = future::join(
async {
pkgfs_install
.expect_create_blob(content_blob, BlobKind::Data.into())
.await
.expect_close()
.await;
},
async {
let (blob, blob_server_end) =
fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(
proxy.open_blob(&mut BlobId::from(content_blob).into(), blob_server_end).await,
Ok(Ok(true))
);
let _ = blob.close().await;
},
)
.await;
// Operation succeeds after pkgfs cooperates.
let ((), ()) = future::join(
async {
pkgfs_install
.expect_create_blob(content_blob, BlobKind::Data.into())
.await
.expect_payload(&[0])
.await;
},
async {
let (blob, blob_server_end) =
fidl::endpoints::create_proxy::<FileMarker>().unwrap();
assert_matches!(
proxy.open_blob(&mut BlobId::from(content_blob).into(), blob_server_end).await,
Ok(Ok(true))
);
let _ = blob.truncate(1).await;
let _ = blob.write(&mut [0]).await;
let _ = blob.close().await;
},
)
.await;
pkgfs_needs.expect_enumerate_needs([0; 32].into()).await.fail_open_with_not_found().await;
// That was the only data blob, so the operation is now done.
assert_matches!(task.await, Ok(()));
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn abort_aborts_while_waiting_for_open_meta_blob() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (task, proxy, pkgfs_install, pkgfs_needs, _blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
let abort_fut = proxy.abort();
assert_matches!(task.await, Err(ServeNeededBlobsError::Aborted));
assert_matches!(
abort_fut.await,
Err(fidl::Error::ClientChannelClosed { status: Status::PEER_CLOSED, .. })
);
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn abort_aborts_while_waiting_for_get_missing_blobs() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (task, proxy, mut pkgfs_install, pkgfs_needs, blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
write_meta_blob(&proxy, &mut pkgfs_install, &blobfs, meta_blob_info, vec![]).await;
let abort_fut = proxy.abort();
assert_matches!(task.await, Err(ServeNeededBlobsError::Aborted));
assert_matches!(
abort_fut.await,
Err(fidl::Error::ClientChannelClosed { status: Status::PEER_CLOSED, .. })
);
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn abort_aborts_while_waiting_for_open_blobs() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (task, proxy, mut pkgfs_install, mut pkgfs_needs, blobfs) =
spawn_serve_needed_blobs_with_mocks(meta_blob_info);
write_meta_blob(&proxy, &mut pkgfs_install, &blobfs, meta_blob_info, vec![]).await;
enumerate_missing_blobs(
&proxy,
&mut pkgfs_needs,
meta_blob_info.blob_id,
vec![[2; 32].into()].into_iter(),
)
.await;
let abort_fut = proxy.abort();
assert_matches!(task.await, Err(ServeNeededBlobsError::Aborted));
assert_matches!(
abort_fut.await,
Err(fidl::Error::ClientChannelClosed { status: Status::PEER_CLOSED, .. })
);
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
}
}
#[cfg(test)]
mod get_handler_tests {
use super::serve_needed_blobs_tests::*;
use super::*;
use fidl_fuchsia_pkg::NeededBlobsProxy;
use fuchsia_cobalt::{CobaltConnector, ConnectionType};
use matches::assert_matches;
fn spawn_get_with_mocks(
meta_blob_info: BlobInfo,
dir_request: Option<ServerEnd<DirectoryMarker>>,
) -> (
Task<Result<(), Status>>,
NeededBlobsProxy,
pkgfs::versions::Mock,
pkgfs::install::Mock,
pkgfs::needs::Mock,
fuchsia_pkg_testing::blobfs::Fake,
) {
let (proxy, stream) = fidl::endpoints::create_proxy::<NeededBlobsMarker>().unwrap();
let (pkgfs_versions, pkgfs_versions_mock) = pkgfs::versions::Client::new_mock();
let (pkgfs_install, pkgfs_install_mock) = pkgfs::install::Client::new_mock();
let (pkgfs_needs, pkgfs_needs_mock) = pkgfs::needs::Client::new_mock();
let (blobfs_fake, blobfs) = fuchsia_pkg_testing::blobfs::Fake::new();
let dynamic_index = Arc::new(Mutex::new(DynamicIndex::new()));
let (cobalt_sender, _) =
CobaltConnector::default().serve(ConnectionType::project_id(metrics::PROJECT_ID));
(
Task::spawn(async move {
get(
&pkgfs_versions,
&pkgfs_install,
&pkgfs_needs,
&dynamic_index,
&blobfs,
meta_blob_info,
vec![],
stream,
dir_request,
cobalt_sender,
)
.await
}),
proxy,
pkgfs_versions_mock,
pkgfs_install_mock,
pkgfs_needs_mock,
blobfs_fake,
)
}
#[fuchsia_async::run_singlethreaded(test)]
async fn everything_closed() {
let (_, stream) = fidl::endpoints::create_proxy::<NeededBlobsMarker>().unwrap();
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (pkgfs_versions, _) = pkgfs::versions::Client::new_test();
let (pkgfs_install, _) = pkgfs::install::Client::new_test();
let (pkgfs_needs, _) = pkgfs::needs::Client::new_test();
let (blobfs, _) = blobfs::Client::new_test();
let dynamic_index = Arc::new(Mutex::new(DynamicIndex::new()));
let (cobalt_sender, _) =
CobaltConnector::default().serve(ConnectionType::project_id(metrics::PROJECT_ID));
assert_matches!(
get(
&pkgfs_versions,
&pkgfs_install,
&pkgfs_needs,
&dynamic_index,
&blobfs,
meta_blob_info,
vec![],
stream,
None,
cobalt_sender
)
.await,
Err(Status::UNAVAILABLE)
);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn trivially_opens_present_package() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (pkgdir, pkgdir_server_end) =
fidl::endpoints::create_proxy::<DirectoryMarker>().unwrap();
let (task, proxy, mut pkgfs_versions, pkgfs_install, pkgfs_needs, _blobfs) =
spawn_get_with_mocks(meta_blob_info, Some(pkgdir_server_end));
pkgfs_versions
.expect_open_package([0; 32].into())
.await
.succeed_open()
.await
.expect_clone()
.await
.verify_are_same_channel(pkgdir);
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
assert_eq!(task.await, Ok(()));
assert_matches!(
proxy.take_event_stream().next().await.unwrap(),
Err(fidl::Error::ClientChannelClosed { status: Status::OK, .. })
);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn trivially_opens_present_package_even_if_needed_blobs_is_closed() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (pkgdir, pkgdir_server_end) =
fidl::endpoints::create_proxy::<DirectoryMarker>().unwrap();
let (task, proxy, mut pkgfs_versions, pkgfs_install, pkgfs_needs, _blobfs) =
spawn_get_with_mocks(meta_blob_info, Some(pkgdir_server_end));
drop(proxy);
pkgfs_versions
.expect_open_package([0; 32].into())
.await
.succeed_open()
.await
.expect_clone()
.await
.verify_are_same_channel(pkgdir);
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
assert_eq!(task.await, Ok(()));
}
#[fuchsia_async::run_singlethreaded(test)]
async fn opens_missing_package_after_writing_blobs() {
let meta_blob_info = BlobInfo { blob_id: [0; 32].into(), length: 0 };
let (pkgdir, pkgdir_server_end) =
fidl::endpoints::create_proxy::<DirectoryMarker>().unwrap();
let (task, proxy, mut pkgfs_versions, mut pkgfs_install, mut pkgfs_needs, blobfs) =
spawn_get_with_mocks(meta_blob_info, Some(pkgdir_server_end));
pkgfs_versions.expect_open_package([0; 32].into()).await.fail_open_with_not_found().await;
write_meta_blob(&proxy, &mut pkgfs_install, &blobfs, meta_blob_info, vec![]).await;
enumerate_missing_blobs(
&proxy,
&mut pkgfs_needs,
meta_blob_info.blob_id,
vec![].into_iter(),
)
.await;
pkgfs_needs.expect_enumerate_needs([0; 32].into()).await.fail_open_with_not_found().await;
assert_matches!(
proxy.take_event_stream().next().await.unwrap(),
Err(fidl::Error::ClientChannelClosed { status: Status::OK, .. })
);
pkgfs_versions
.expect_open_package([0; 32].into())
.await
.succeed_open()
.await
.expect_clone()
.await
.verify_are_same_channel(pkgdir);
pkgfs_install.expect_done().await;
pkgfs_needs.expect_done().await;
assert_eq!(task.await, Ok(()));
}
}
#[cfg(test)]
mod serve_write_blob_tests {
use {
super::*,
fidl_fuchsia_io::{FileMarker, FileProxy},
futures::task::Poll,
matches::assert_matches,
proptest::prelude::*,
proptest_derive::Arbitrary,
};
/// Calls the provided test function with an open File proxy being served by serve_write_blob
/// and the corresponding request stream representing the open pkgfs install blob file.
async fn do_serve_write_blob_with<F, Fut>(cb: F) -> Result<(), ServeWriteBlobError>
where
F: FnOnce(FileProxy, FileRequestStream) -> Fut,
Fut: Future<Output = ()>,
{
let (pkgfs_blob, pkgfs_blob_stream) = OpenBlob::new_test(BlobKind::Data);
let (pkg_cache_blob, pkg_cache_blob_stream) =
fidl::endpoints::create_proxy_and_stream::<FileMarker>().unwrap();
let task = serve_write_blob(pkg_cache_blob_stream, pkgfs_blob);
let test = cb(pkg_cache_blob, pkgfs_blob_stream);
let (res, ()) = future::join(task, test).await;
res
}
/// Handles a single FIDL request on the provided stream, panicing if the received request is
/// not the expected kind.
macro_rules! serve_fidl_request {
(
$stream:expr, { $pat:pat => $handler:block, }
) => {
match $stream.next().await.unwrap().unwrap() {
$pat => $handler,
req => panic!("unexpected request: {:?}", req),
}
};
}
/// Runs the provided FIDL request stream to compleation, running each handler in sequence,
/// panicing if any incoming request is not the expected kind.
macro_rules! serve_fidl_stream {
(
$stream:expr, { $( $pat:pat => $handler:block, )* }
) => {
async move {
$(
serve_fidl_request!($stream, { $pat => $handler, });
)*
assert_matches!($stream.next().await, None);
}
}
}
/// Sends a truncate request, asserts that the remote end receives the request, responds to the
/// request, and asserts that the truncate request receives the expected mapped status code.
async fn verify_truncate(
proxy: &FileProxy,
stream: &mut FileRequestStream,
length: u64,
pkgfs_response: Status,
) -> Status {
let ((), o) = future::join(
async move {
serve_fidl_request!(stream, {
FileRequest::Truncate { length: actual_length, responder } => {
assert_eq!(length, actual_length);
responder.send(pkgfs_response.into_raw()).unwrap();
},
});
},
async move { proxy.truncate(length).await.map(Status::from_raw).unwrap() },
)
.await;
o
}
/// Sends a write request, asserts that the remote end receives the request, responds to the
/// request, and asserts that the write request receives the expected mapped status code/length.
async fn verify_write(
proxy: &FileProxy,
stream: &mut FileRequestStream,
data: &[u8],
pkgfs_response: Status,
) -> Status {
let ((), o) = future::join(
async move {
serve_fidl_request!(stream, {
FileRequest::Write{ data: actual_data, responder } => {
assert_eq!(data, actual_data);
responder.send(pkgfs_response.into_raw(), data.len() as u64).unwrap();
},
});
},
async move {
let (s, len) =
proxy.write(data).await.map(|(s, len)| (Status::from_raw(s), len)).unwrap();
if s == Status::OK {
assert_eq!(len, data.len() as u64);
}
s
},
)
.await;
o
}
/// Verify that closing the proxy results in the pkgfs backing file being explicitly closed.
async fn verify_inner_blob_closes(proxy: FileProxy, mut stream: FileRequestStream) {
drop(proxy);
serve_fidl_stream!(stream, {
FileRequest::Close { responder } => {
responder.send(Status::OK.into_raw()).unwrap();
},
})
.await;
}
/// Verify that an explicit close() request is proxied through to the pkgfs backing file.
async fn verify_explicit_close(proxy: FileProxy, mut stream: FileRequestStream) {
let ((), ()) = future::join(
serve_fidl_stream!(stream, {
FileRequest::Close { responder } => {
responder.send(Status::OK.into_raw()).unwrap();
},
}),
async move {
let _ = proxy.close().await;
drop(proxy);
},
)
.await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn start_stop() {
let res = do_serve_write_blob_with(|proxy, stream| async move {
drop(proxy);
drop(stream);
})
.await;
assert_matches!(res, Err(ServeWriteBlobError::UnexpectedClose));
}
#[fuchsia_async::run_singlethreaded(test)]
async fn happy_path_succeeds() {
do_serve_write_blob_with(|proxy, mut stream| async move {
assert_eq!(verify_truncate(&proxy, &mut stream, 200, Status::OK).await, Status::OK);
assert_eq!(verify_write(&proxy, &mut stream, &[1; 100], Status::OK).await, Status::OK);
assert_eq!(verify_write(&proxy, &mut stream, &[2; 100], Status::OK).await, Status::OK);
verify_explicit_close(proxy, stream).await;
})
.await
.unwrap();
}
#[fuchsia_async::run_singlethreaded(test)]
async fn happy_path_implicit_close_succeeds() {
do_serve_write_blob_with(|proxy, mut stream| async move {
assert_eq!(verify_truncate(&proxy, &mut stream, 200, Status::OK).await, Status::OK);
assert_eq!(verify_write(&proxy, &mut stream, &[1; 100], Status::OK).await, Status::OK);
assert_eq!(verify_write(&proxy, &mut stream, &[2; 100], Status::OK).await, Status::OK);
verify_inner_blob_closes(proxy, stream).await;
})
.await
.unwrap();
}
#[fuchsia_async::run_singlethreaded(test)]
async fn raises_out_of_space_during_truncate() {
let res = do_serve_write_blob_with(|proxy, mut stream| async move {
assert_eq!(
verify_truncate(&proxy, &mut stream, 100, Status::NO_SPACE).await,
Status::NO_SPACE
);
verify_inner_blob_closes(proxy, stream).await;
})
.await;
assert_matches!(res, Err(ServeWriteBlobError::NoSpace));
}
#[fuchsia_async::run_singlethreaded(test)]
async fn truncate_maps_unknown_errors_to_internal() {
let res = do_serve_write_blob_with(|proxy, mut stream| async move {
assert_eq!(
verify_truncate(&proxy, &mut stream, 100, Status::ADDRESS_UNREACHABLE).await,
Status::INTERNAL
);
verify_inner_blob_closes(proxy, stream).await;
})
.await;
assert_matches!(res, Err(ServeWriteBlobError::Truncate(_)));
}
#[fuchsia_async::run_singlethreaded(test)]
async fn raises_out_of_space_during_write() {
let res = do_serve_write_blob_with(|proxy, mut stream| async move {
assert_eq!(verify_truncate(&proxy, &mut stream, 100, Status::OK).await, Status::OK);
assert_eq!(
verify_write(&proxy, &mut stream, &[0; 1], Status::NO_SPACE).await,
Status::NO_SPACE
);
verify_inner_blob_closes(proxy, stream).await;
})
.await;
assert_matches!(res, Err(ServeWriteBlobError::NoSpace));
}
#[fuchsia_async::run_singlethreaded(test)]
async fn raises_corrupt_during_last_write() {
let res = do_serve_write_blob_with(|proxy, mut stream| async move {
assert_eq!(verify_truncate(&proxy, &mut stream, 10, Status::OK).await, Status::OK);
assert_eq!(verify_write(&proxy, &mut stream, &[0; 5], Status::OK).await, Status::OK);
assert_eq!(
verify_write(&proxy, &mut stream, &[1; 5], Status::IO_DATA_INTEGRITY).await,
Status::IO_DATA_INTEGRITY
);
verify_inner_blob_closes(proxy, stream).await;
})
.await;
assert_matches!(res, Err(ServeWriteBlobError::Corrupt));
}
#[fuchsia_async::run_singlethreaded(test)]
async fn write_maps_unknown_errors_to_internal() {
let res = do_serve_write_blob_with(|proxy, mut stream| async move {
assert_eq!(verify_truncate(&proxy, &mut stream, 100, Status::OK).await, Status::OK);
assert_eq!(
verify_write(&proxy, &mut stream, &[1; 1], Status::ADDRESS_UNREACHABLE).await,
Status::INTERNAL
);
verify_inner_blob_closes(proxy, stream).await;
})
.await;
assert_matches!(res, Err(ServeWriteBlobError::Write(_)));
}
#[test]
fn close_closes_inner_blob_first() {
let mut executor = fuchsia_async::Executor::new().unwrap();
let (pkgfs_blob, mut pkgfs_blob_stream) = OpenBlob::new_test(BlobKind::Data);
let (pkg_cache_blob, pkg_cache_blob_stream) =
fidl::endpoints::create_proxy_and_stream::<FileMarker>().unwrap();
let task = serve_write_blob(pkg_cache_blob_stream, pkgfs_blob);
futures::pin_mut!(task);
let mut close_fut = pkg_cache_blob.close();
drop(pkg_cache_blob);
// Let the task process the close request, ensuring the close_future doesn't yet complete.
assert_matches!(executor.run_until_stalled(&mut task), Poll::Pending);
assert_matches!(executor.run_until_stalled(&mut close_fut), Poll::Pending);
// Verify the inner blob is bineg closed.
let () = executor.run_singlethreaded(async {
serve_fidl_request!(pkgfs_blob_stream, {
FileRequest::Close { responder } => {
responder.send(Status::OK.into_raw()).unwrap();
},
})
});
// Now that the inner blob is closed, the proxy task and close request can complete
assert_matches!(
executor.run_until_stalled(&mut task),
Poll::Ready(Err(ServeWriteBlobError::UnexpectedClose))
);
assert_matches!(executor.run_until_stalled(&mut close_fut), Poll::Ready(Ok(0)));
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Arbitrary)]
enum StubRequestor {
Clone,
Describe,
Sync,
GetAttr,
SetAttr,
NodeGetFlags,
NodeSetFlags,
Write,
WriteAt,
Read,
ReadAt,
Seek,
Truncate,
GetFlags,
SetFlags,
GetBuffer,
// New API that references fuchsia.io2. Not strictly necessary to verify all possible
// ordinals (which is the space of a u64 anyway).
// AdvisoryLock
// Always allowed.
// Close
}
impl StubRequestor {
fn method_name(self) -> &'static str {
match self {
StubRequestor::Clone => "clone",
StubRequestor::Describe => "describe",
StubRequestor::Sync => "sync",
StubRequestor::GetAttr => "get_attr",
StubRequestor::SetAttr => "set_attr",
StubRequestor::NodeGetFlags => "node_get_flags",
StubRequestor::NodeSetFlags => "node_set_flags",
StubRequestor::Write => "write",
StubRequestor::WriteAt => "write_at",
StubRequestor::Read => "read",
StubRequestor::ReadAt => "read_at",
StubRequestor::Seek => "seek",
StubRequestor::Truncate => "truncate",
StubRequestor::GetFlags => "get_flags",
StubRequestor::SetFlags => "set_flags",
StubRequestor::GetBuffer => "get_buffer",
}
}
fn make_stub_request(self, proxy: &FileProxy) -> impl Future<Output = ()> {
use fidl::encoding::Decodable;
match self {
StubRequestor::Clone => {
let (_, server_end) =
fidl::endpoints::create_proxy::<fidl_fuchsia_io::NodeMarker>().unwrap();
let _ = proxy.clone(0, server_end);
future::ready(()).boxed()
}
StubRequestor::Describe => proxy.describe().map(|_| ()).boxed(),
StubRequestor::Sync => proxy.sync().map(|_| ()).boxed(),
StubRequestor::GetAttr => proxy.get_attr().map(|_| ()).boxed(),
StubRequestor::SetAttr => proxy
.set_attr(0, &mut fidl_fuchsia_io::NodeAttributes::new_empty())
.map(|_| ())
.boxed(),
StubRequestor::NodeGetFlags => proxy.node_get_flags().map(|_| ()).boxed(),
StubRequestor::NodeSetFlags => proxy.node_set_flags(0).map(|_| ()).boxed(),
StubRequestor::Write => proxy.write(&[0; 0]).map(|_| ()).boxed(),
StubRequestor::WriteAt => proxy.write_at(&[0; 0], 0).map(|_| ()).boxed(),
StubRequestor::Read => proxy.read(0).map(|_| ()).boxed(),
StubRequestor::ReadAt => proxy.read_at(0, 0).map(|_| ()).boxed(),
StubRequestor::Seek => {
proxy.seek(0, fidl_fuchsia_io::SeekOrigin::Start).map(|_| ()).boxed()
}
StubRequestor::Truncate => proxy.truncate(0).map(|_| ()).boxed(),
StubRequestor::GetFlags => proxy.get_flags().map(|_| ()).boxed(),
StubRequestor::SetFlags => proxy.set_flags(0).map(|_| ()).boxed(),
StubRequestor::GetBuffer => proxy.get_buffer(0).map(|_| ()).boxed(),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Arbitrary)]
enum InitialState {
ExpectTruncate,
ExpectWrite,
ExpectClose,
}
impl InitialState {
fn expected_method_name(self) -> &'static str {
match self {
InitialState::ExpectTruncate => "truncate",
InitialState::ExpectWrite => "write",
InitialState::ExpectClose => "close",
}
}
async fn enter(self, proxy: &FileProxy, stream: &mut FileRequestStream) {
match self {
InitialState::ExpectTruncate => {}
InitialState::ExpectWrite => {
assert_eq!(verify_truncate(proxy, stream, 100, Status::OK).await, Status::OK);
}
InitialState::ExpectClose => {
assert_eq!(verify_truncate(proxy, stream, 100, Status::OK).await, Status::OK);
assert_eq!(
verify_write(proxy, stream, &[0; 100], Status::OK).await,
Status::OK
);
}
}
}
}
proptest! {
// Failure seed persistence isn't working in Fuchsia tests, and these tests are expected to
// verify the entire input space anyway. Enable result caching to skip running the same
// case more than once.
#![proptest_config(ProptestConfig{
failure_persistence: None,
result_cache: proptest::test_runner::basic_result_cache,
..Default::default()
})]
#[test]
fn allows_close_in_any_state(initial_state: InitialState) {
let mut executor = fuchsia_async::Executor::new().unwrap();
let () = executor.run_singlethreaded(async move {
let res = do_serve_write_blob_with(|proxy, mut stream| async move {
initial_state.enter(&proxy, &mut stream).await;
verify_explicit_close(proxy, stream).await;
})
.await;
match initial_state {
InitialState::ExpectClose => assert_matches!(res, Ok(())),
_ => assert_matches!(res, Err(ServeWriteBlobError::UnexpectedClose)),
}
});
}
#[test]
fn rejects_unexpected_requests(initial_state: InitialState, bad_request: StubRequestor) {
// Skip stub requests that are the expected request for this initial state.
prop_assume!(initial_state.expected_method_name() != bad_request.method_name());
let mut executor = fuchsia_async::Executor::new().unwrap();
let () = executor.run_singlethreaded(async move {
let res = do_serve_write_blob_with(|proxy, mut stream| async move {
initial_state.enter(&proxy, &mut stream).await;
let bad_request_fut = bad_request.make_stub_request(&proxy);
let ((), ()) = future::join(
async move {
let _ = bad_request_fut.await;
},
verify_inner_blob_closes(proxy, stream),
)
.await;
})
.await;
match res {
Err(ServeWriteBlobError::UnexpectedRequest{ received, expected }) => {
prop_assert_eq!(received, bad_request.method_name());
prop_assert_eq!(expected, initial_state.expected_method_name());
}
res => panic!("Expected UnexpectedRequest error, got {:?}", res),
}
Ok(())
})?;
}
}
}