// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Type-safe wrappers around the /pkgfs/needs filesystem.
//!
//! The needs of a package are the content blobs that must still be written to pkgfs before a
//! package being installed becomes available in /pkgfs/versions.
use {
fidl::endpoints::RequestStream,
fidl_fuchsia_io as fio,
fuchsia_hash::{Hash, ParseHashError},
fuchsia_zircon::Status,
futures::prelude::*,
std::{
collections::{BTreeSet, HashSet},
convert::TryInto,
},
thiserror::Error,
};
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum ListNeedsError {
#[error("while opening needs dir: {}", _0)]
OpenDir(io_util::node::OpenError),
#[error("while listing needs dir: {}", _0)]
ReadDir(files_async::Error),
#[error("unable to parse a need blob id: {}", _0)]
ParseError(ParseHashError),
}
/// An open handle to /pkgfs/needs.
#[derive(Debug, Clone)]
pub struct Client {
proxy: fio::DirectoryProxy,
}
impl Client {
    /// Returns a client connected to pkgfs from the current component's namespace.
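    ///
    /// # Example
    ///
    /// A minimal sketch, assuming /pkgfs is routed into the component's namespace:
    ///
    /// ```ignore
    /// let client = Client::open_from_namespace()?;
    /// ```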
pub fn open_from_namespace() -> Result<Self, io_util::node::OpenError> {
let proxy =
io_util::directory::open_in_namespace("/pkgfs/needs", fio::OpenFlags::RIGHT_READABLE)?;
Ok(Client { proxy })
}
    /// Returns a client connected to pkgfs from the given pkgfs root dir.
pub fn open_from_pkgfs_root(
pkgfs: &fio::DirectoryProxy,
) -> Result<Self, io_util::node::OpenError> {
Ok(Client {
proxy: io_util::directory::open_directory_no_describe(
pkgfs,
"needs",
fio::OpenFlags::RIGHT_READABLE,
)?,
})
}
/// Creates a new client backed by the returned request stream. This constructor should not be
/// used outside of tests.
///
/// # Panics
///
/// Panics on error
pub fn new_test() -> (Self, fio::DirectoryRequestStream) {
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>().unwrap();
(Self { proxy }, stream)
}
/// Creates a new client backed by the returned mock. This constructor should not be used
/// outside of tests.
///
/// # Panics
///
/// Panics on error
pub fn new_mock() -> (Self, Mock) {
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>().unwrap();
(Self { proxy }, Mock { stream })
}
    /// Returns a stream of the sets of blobs that are still needed to resolve the package
    /// specified by `pkg_merkle`, provided that the `pkg_merkle` meta far blob has previously
    /// been written to /pkgfs/install/pkg/. The package should be available in /pkgfs/versions
    /// when this stream terminates without error.
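    ///
    /// # Example
    ///
    /// A minimal sketch of draining the needs stream (the `fetch_blob` helper is hypothetical
    /// and stands in for whatever writes a needed blob to /pkgfs/install/blob/):
    ///
    /// ```ignore
    /// let needs = client.list_needs(pkg_merkle);
    /// futures::pin_mut!(needs);
    /// while let Some(blobs) = needs.next().await {
    ///     for blob in blobs? {
    ///         // Hypothetical helper that writes the blob to /pkgfs/install/blob/.
    ///         fetch_blob(blob).await?;
    ///     }
    /// }
    /// // The stream terminated without error, so the package should now be available in
    /// // /pkgfs/versions.
    /// ```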
pub fn list_needs(
&self,
pkg_merkle: Hash,
) -> impl Stream<Item = Result<HashSet<Hash>, ListNeedsError>> + '_ {
        // None once the stream has terminated and should not continue to enumerate needs.
let state = Some(&self.proxy);
futures::stream::unfold(state, move |state: Option<&fio::DirectoryProxy>| {
async move {
if let Some(proxy) = state {
match enumerate_needs_dir(proxy, pkg_merkle).await {
Ok(needs) => {
if needs.is_empty() {
None
} else {
Some((Ok(needs), Some(proxy)))
}
}
                        // Report the error and terminate the stream.
                        Err(err) => Some((Err(err), None)),
}
} else {
None
}
}
})
}
}
/// Lists all blobs currently in the `pkg_merkle`'s needs directory.
async fn enumerate_needs_dir(
pkgfs_needs: &fio::DirectoryProxy,
pkg_merkle: Hash,
) -> Result<HashSet<Hash>, ListNeedsError> {
let path = format!("packages/{}", pkg_merkle);
let flags = fio::OpenFlags::RIGHT_READABLE;
let needs_dir = match io_util::directory::open_directory(pkgfs_needs, &path, flags).await {
Ok(dir) => dir,
Err(io_util::node::OpenError::OpenError(Status::NOT_FOUND)) => return Ok(HashSet::new()),
Err(e) => return Err(ListNeedsError::OpenDir(e)),
};
let entries = files_async::readdir(&needs_dir).await.map_err(ListNeedsError::ReadDir)?;
    entries
        .into_iter()
        .filter_map(|entry| {
            if entry.kind == files_async::DirentKind::File {
                Some(entry.name.parse().map_err(ListNeedsError::ParseError))
            } else {
                // Ignore non-file entries.
                None
            }
        })
        .collect()
}
/// A testing server implementation of /pkgfs/needs.
///
/// Mock does not handle requests until instructed to do so.
pub struct Mock {
stream: fio::DirectoryRequestStream,
}
impl Mock {
    /// Consumes the next directory request, verifying it is intended to open the directory
/// containing the needs for the given package meta far `merkle`. Returns a `MockNeeds`
/// representing the open needs directory.
///
/// # Panics
///
/// Panics on error or assertion violation (unexpected requests or a mismatched open call)
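    ///
    /// # Example
    ///
    /// A sketch of driving the mock alongside a client under test, mirroring the
    /// `mock_yields_expected_needs` test below (here with an empty needs set, so the client's
    /// stream terminates immediately):
    ///
    /// ```ignore
    /// let (client, mut mock) = Client::new_mock();
    /// let ((), ()) = futures::future::join(
    ///     async {
    ///         mock.expect_enumerate_needs([0; 32].into())
    ///             .await
    ///             .enumerate_needs(BTreeSet::new())
    ///             .await;
    ///     },
    ///     async {
    ///         let needs = client.list_needs([0; 32].into());
    ///         futures::pin_mut!(needs);
    ///         assert!(needs.next().await.is_none());
    ///     },
    /// )
    /// .await;
    /// ```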
pub async fn expect_enumerate_needs(&mut self, merkle: Hash) -> MockNeeds {
match self.stream.next().await {
Some(Ok(fio::DirectoryRequest::Open {
flags: _,
mode: _,
path,
object,
control_handle: _,
})) => {
assert_eq!(path, format!("packages/{}", merkle));
let stream = object.into_stream().unwrap().cast_stream();
MockNeeds { stream }
}
other => panic!("unexpected request: {:?}", other),
}
}
/// Asserts that the request stream closes without any further requests.
///
/// # Panics
///
/// Panics on error
pub async fn expect_done(mut self) {
match self.stream.next().await {
None => {}
Some(request) => panic!("unexpected request: {:?}", request),
}
}
}
/// A testing server implementation of an open /pkgfs/needs/packages/<merkle> directory.
///
/// MockNeeds does not send the OnOpen event or handle requests until instructed to do so.
pub struct MockNeeds {
stream: fio::DirectoryRequestStream,
}
impl MockNeeds {
fn send_on_open(&mut self, status: Status) {
let mut info = fio::NodeInfo::Directory(fio::DirectoryObject);
let () =
self.stream.control_handle().send_on_open_(status.into_raw(), Some(&mut info)).unwrap();
}
async fn handle_rewind(&mut self) {
match self.stream.next().await {
Some(Ok(fio::DirectoryRequest::Rewind { responder })) => {
responder.send(Status::OK.into_raw()).unwrap();
}
other => panic!("unexpected request: {:?}", other),
}
}
    /// Fails the open request with an error indicating there are no needs.
///
/// # Panics
///
/// Panics on error
pub async fn fail_open_with_not_found(mut self) {
self.send_on_open(Status::NOT_FOUND);
}
    /// Fails the open request with an unexpected error status.
///
/// # Panics
///
/// Panics on error
pub async fn fail_open_with_unexpected_error(mut self) {
self.send_on_open(Status::INTERNAL);
}
/// Succeeds the open request, then handles incoming read_dirents requests to provide the
/// client with the given `needs`.
///
/// # Panics
///
/// Panics on error
pub async fn enumerate_needs(mut self, needs: BTreeSet<Hash>) {
self.send_on_open(Status::OK);
// files_async starts by resetting the directory channel's readdir position.
self.handle_rewind().await;
#[repr(C, packed)]
struct Dirent {
ino: u64,
size: u8,
kind: u8,
name: [u8; 64],
}
impl Dirent {
fn as_bytes(&self) -> &[u8] {
let start = self as *const Self as *const u8;
// Safe because the FIDL wire format for directory entries is
// defined to be the C packed struct representation used here.
unsafe { std::slice::from_raw_parts(start, std::mem::size_of::<Self>()) }
}
}
let mut needs_iter = needs.iter().enumerate().map(|(i, hash)| Dirent {
ino: i as u64 + 1,
size: 64,
kind: fio::DirentType::File.into_primitive(),
name: hash.to_string().as_bytes().try_into().unwrap(),
});
while let Some(request) = self.stream.try_next().await.unwrap() {
match request {
fio::DirectoryRequest::ReadDirents { max_bytes, responder } => {
let max_bytes = max_bytes as usize;
assert!(max_bytes >= std::mem::size_of::<Dirent>());
let mut buf = vec![];
while buf.len() + std::mem::size_of::<Dirent>() <= max_bytes {
match needs_iter.next() {
Some(need) => {
buf.extend(need.as_bytes());
}
None => break,
}
}
responder.send(Status::OK.into_raw(), &buf).unwrap();
}
other => panic!("unexpected request: {:?}", other),
}
}
        // The client should have read every need before closing the channel.
        assert!(needs_iter.next().is_none());
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::install::{BlobCreateError, BlobKind, BlobWriteSuccess},
assert_matches::assert_matches,
fuchsia_hash::HashRangeFull,
fuchsia_pkg_testing::PackageBuilder,
maplit::hashset,
pkgfs_ramdisk::PkgfsRamdisk,
};
#[fuchsia_async::run_singlethreaded(test)]
async fn no_needs_is_empty_needs() {
let pkgfs = PkgfsRamdisk::start().unwrap();
let root = pkgfs.root_dir_proxy().unwrap();
let client = Client::open_from_pkgfs_root(&root).unwrap();
let merkle = fuchsia_merkle::MerkleTree::from_reader(std::io::empty()).unwrap().root();
let mut needs = client.list_needs(merkle).boxed();
assert_matches!(needs.next().await, None);
pkgfs.stop().await.unwrap();
}
#[fuchsia_async::run_singlethreaded(test)]
async fn list_needs() {
let pkgfs = PkgfsRamdisk::start().unwrap();
let root = pkgfs.root_dir_proxy().unwrap();
let install = crate::install::Client::open_from_pkgfs_root(&root).unwrap();
let client = Client::open_from_pkgfs_root(&root).unwrap();
let pkg = PackageBuilder::new("list-needs")
.add_resource_at("data/blob1", "blob1".as_bytes())
.add_resource_at("data/blob2", "blob2".as_bytes())
.build()
.await
.unwrap();
let pkg_contents = pkg.meta_contents().unwrap().contents().to_owned();
install.write_meta_far(&pkg).await;
let mut needs = client.list_needs(pkg.meta_far_merkle_root().to_owned()).boxed();
assert_matches!(
needs.next().await,
Some(Ok(needs)) if needs == hashset! {
pkg_contents["data/blob1"],
pkg_contents["data/blob2"],
}
);
install.write_blob(pkg_contents["data/blob1"], BlobKind::Data, "blob1".as_bytes()).await;
assert_matches!(
needs.next().await,
Some(Ok(needs)) if needs == hashset! {
pkg_contents["data/blob2"],
}
);
install.write_blob(pkg_contents["data/blob2"], BlobKind::Data, "blob2".as_bytes()).await;
assert_matches!(needs.next().await, None);
pkgfs.stop().await.unwrap();
}
#[fuchsia_async::run_singlethreaded(test)]
async fn shared_blob_still_needed_while_being_written() {
let pkgfs = PkgfsRamdisk::start().unwrap();
let root = pkgfs.root_dir_proxy().unwrap();
let install = crate::install::Client::open_from_pkgfs_root(&root).unwrap();
let versions = crate::versions::Client::open_from_pkgfs_root(&root).unwrap();
let client = Client::open_from_pkgfs_root(&root).unwrap();
const SHARED_BLOB_CONTENTS: &[u8] = "shared between both packages".as_bytes();
let pkg1 = PackageBuilder::new("shared-content-a")
.add_resource_at("data/shared", SHARED_BLOB_CONTENTS)
.build()
.await
.unwrap();
let pkg2 = PackageBuilder::new("shared-content-b")
.add_resource_at("data/shared", SHARED_BLOB_CONTENTS)
.build()
.await
.unwrap();
let pkg_contents = pkg1.meta_contents().unwrap().contents().to_owned();
install.write_meta_far(&pkg1).await;
// start writing the shared blob, but don't finish.
let (blob, closer) =
install.create_blob(pkg_contents["data/shared"], BlobKind::Data).await.unwrap();
let blob = blob.truncate(SHARED_BLOB_CONTENTS.len() as u64).await.unwrap();
let (first, second) = SHARED_BLOB_CONTENTS.split_at(10);
let blob = match blob.write(first).await.unwrap() {
BlobWriteSuccess::MoreToWrite(blob) => blob,
BlobWriteSuccess::Done => unreachable!(),
};
// start installing the second package, and verify the shared blob is listed in needs
install.write_meta_far(&pkg2).await;
let mut needs = client.list_needs(pkg2.meta_far_merkle_root().to_owned()).boxed();
assert_matches!(
needs.next().await,
Some(Ok(needs)) if needs == hashset! {
pkg_contents["data/shared"],
}
);
// finish writing the shared blob, and verify both packages are now complete.
assert_matches!(blob.write(second).await, Ok(BlobWriteSuccess::Done));
closer.close().await;
assert_matches!(needs.next().await, None);
let pkg1_dir =
versions.open_package(pkg1.meta_far_merkle_root()).await.unwrap().into_proxy();
pkg1.verify_contents(&pkg1_dir).await.unwrap();
let pkg2_dir =
versions.open_package(pkg2.meta_far_merkle_root()).await.unwrap().into_proxy();
pkg2.verify_contents(&pkg2_dir).await.unwrap();
pkgfs.stop().await.unwrap();
}
#[fuchsia_async::run_singlethreaded(test)]
async fn initially_present_blobs_are_not_needed() {
let pkgfs = PkgfsRamdisk::start().unwrap();
let root = pkgfs.root_dir_proxy().unwrap();
let install = crate::install::Client::open_from_pkgfs_root(&root).unwrap();
let client = Client::open_from_pkgfs_root(&root).unwrap();
const PRESENT_BLOB_CONTENTS: &[u8] = "already here".as_bytes();
let pkg = PackageBuilder::new("partially-cached")
.add_resource_at("data/present", PRESENT_BLOB_CONTENTS)
.add_resource_at("data/needed", "need to fetch this one".as_bytes())
.build()
.await
.unwrap();
let pkg_contents = pkg.meta_contents().unwrap().contents().to_owned();
// write the present blob and start installing the package.
pkgfs
.blobfs()
.add_blob_from(
&fuchsia_merkle::MerkleTree::from_reader(PRESENT_BLOB_CONTENTS).unwrap().root(),
PRESENT_BLOB_CONTENTS,
)
.unwrap();
install.write_meta_far(&pkg).await;
// confirm that the needed blob is needed and the present blob is present.
let mut needs = client.list_needs(pkg.meta_far_merkle_root().to_owned()).boxed();
assert_matches!(
needs.next().await,
Some(Ok(needs)) if needs == hashset! {
pkg_contents["data/needed"],
}
);
pkgfs.stop().await.unwrap();
}
#[fuchsia_async::run_singlethreaded(test)]
async fn racing_blob_writes_do_not_fulfill_partial_blobs() {
let pkgfs = PkgfsRamdisk::start().unwrap();
let root = pkgfs.root_dir_proxy().unwrap();
let install = crate::install::Client::open_from_pkgfs_root(&root).unwrap();
let versions = crate::versions::Client::open_from_pkgfs_root(&root).unwrap();
let client = Client::open_from_pkgfs_root(&root).unwrap();
const REQUIRED_BLOB_CONTENTS: &[u8] = "don't fulfill me early please".as_bytes();
let pkg = PackageBuilder::new("partially-cached")
.add_resource_at("data/required", REQUIRED_BLOB_CONTENTS)
.build()
.await
.unwrap();
let pkg_contents = pkg.meta_contents().unwrap().contents().to_owned();
// write the package meta far and verify the needed blob is needed.
install.write_meta_far(&pkg).await;
let mut needs = client.list_needs(pkg.meta_far_merkle_root().to_owned()).boxed();
assert_matches!(
needs.next().await,
Some(Ok(needs)) if needs == hashset! {
pkg_contents["data/required"],
}
);
// start writing the content blob, but don't finish yet.
let (blob, closer) =
install.create_blob(pkg_contents["data/required"], BlobKind::Data).await.unwrap();
let blob = blob.truncate(REQUIRED_BLOB_CONTENTS.len() as u64).await.unwrap();
let blob = match blob.write("don't ".as_bytes()).await.unwrap() {
BlobWriteSuccess::MoreToWrite(blob) => blob,
BlobWriteSuccess::Done => unreachable!(),
};
// verify the blob is still needed.
assert_matches!(
needs.next().await,
Some(Ok(needs)) if needs == hashset! {
pkg_contents["data/required"],
}
);
// trying to start writing the blob again fails.
assert_matches!(
install.create_blob(pkg_contents["data/required"], BlobKind::Data).await,
Err(BlobCreateError::ConcurrentWrite)
);
// no really, the blob is still needed.
assert_matches!(
needs.next().await,
Some(Ok(needs)) if needs == hashset! {
pkg_contents["data/required"],
}
);
// finish writing the blob.
assert_matches!(
blob.write("fulfill me early please".as_bytes()).await,
Ok(BlobWriteSuccess::Done)
);
closer.close().await;
// verify there are no more needs and the package is readable.
assert_matches!(needs.next().await, None);
let pkg_dir = versions.open_package(pkg.meta_far_merkle_root()).await.unwrap().into_proxy();
pkg.verify_contents(&pkg_dir).await.unwrap();
pkgfs.stop().await.unwrap();
}
#[fuchsia_async::run_singlethreaded(test)]
async fn mock_yields_expected_needs() {
let (client, mut mock) = Client::new_mock();
let expected = || HashRangeFull::default().take(200);
let ((), ()) = future::join(
async {
mock.expect_enumerate_needs([0; 32].into())
.await
.enumerate_needs(expected().collect::<BTreeSet<_>>())
.await;
},
async {
let needs = client.list_needs([0; 32].into());
futures::pin_mut!(needs);
let actual = needs.next().await.unwrap().unwrap();
assert_eq!(actual, expected().collect::<HashSet<_>>());
},
)
.await;
}
}