[pkg-cache] Use async far reader.
Fixed: 68929
Change-Id: I886a672fb4763e2d85510d103768c272bb227623
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/490159
Commit-Queue: Sen Jiang <senj@google.com>
Reviewed-by: Dan Johnson <computerdruid@google.com>
diff --git a/src/sys/pkg/bin/pkg-cache/src/dynamic_index.rs b/src/sys/pkg/bin/pkg-cache/src/dynamic_index.rs
index 2077e6d..9faaeec 100644
--- a/src/sys/pkg/bin/pkg-cache/src/dynamic_index.rs
+++ b/src/sys/pkg/bin/pkg-cache/src/dynamic_index.rs
@@ -179,12 +179,12 @@
let (path, required_blobs) = {
let file =
blobfs.open_blob_for_read(&blob_hash).await.map_err(DynamicIndexError::OpenBlob)?;
- let meta_far_blob =
- io_util::file::read(&file).await.map_err(DynamicIndexError::ReadBlob)?;
- // TODO: Switch to async far reader (fxbug.dev/68929) to save memory.
- let mut meta_far = fuchsia_archive::Reader::new(std::io::Cursor::new(meta_far_blob))?;
- let meta_package = MetaPackage::deserialize(&meta_far.read_file("meta/package")?[..])?;
- let meta_contents = MetaContents::deserialize(&meta_far.read_file("meta/contents")?[..])?;
+ let mut meta_far =
+ fuchsia_archive::AsyncReader::new(io_util::file::AsyncFile::from_proxy(file)).await?;
+ let meta_package =
+ MetaPackage::deserialize(&meta_far.read_file("meta/package").await?[..])?;
+ let meta_contents =
+ MetaContents::deserialize(&meta_far.read_file("meta/contents").await?[..])?;
(meta_package.into_path(), meta_contents.into_hashes().collect::<HashSet<_>>())
};
@@ -621,7 +621,17 @@
let ((), ()) = future::join(
async {
- blobfs_mock.expect_open_blob(meta_far_hash).await.expect_read(&meta_far).await;
+ blobfs_mock
+ .expect_open_blob(meta_far_hash)
+ .await
+ .then_send_on_open_readable()
+ .await
+ .expect_read_at(&meta_far[..64], 0)
+ .await
+ .expect_get_attr(meta_far.len() as u64)
+ .await
+ .handle_read_at_until_close(&meta_far)
+ .await;
blobfs_mock.expect_open_blob(blob_hash).await.expect_close().await;
},
async {
diff --git a/src/sys/pkg/lib/fuchsia-pkg-testing/src/blobfs.rs b/src/sys/pkg/lib/fuchsia-pkg-testing/src/blobfs.rs
index eeb3edb..964ea17 100644
--- a/src/sys/pkg/lib/fuchsia-pkg-testing/src/blobfs.rs
+++ b/src/sys/pkg/lib/fuchsia-pkg-testing/src/blobfs.rs
@@ -5,7 +5,7 @@
//! Fake and mock implementation of blobfs for blobfs::Client.
use {
- fidl::endpoints::RequestStream as _,
+ fidl::{encoding::Decodable as _, endpoints::RequestStream as _},
fidl_fuchsia_io::{
DirectoryMarker, DirectoryProxy, DirectoryRequest, DirectoryRequestStream, FileObject,
FileRequest, FileRequestStream, NodeInfo,
@@ -142,6 +142,16 @@
}
}
+ /// Succeeds the open request.
+ ///
+ /// # Panics
+ ///
+ /// Panics on error
+ pub async fn then_send_on_open_readable(mut self) -> Self {
+ self.send_on_open_with_readable(Status::OK);
+ self
+ }
+
/// Succeeds the open request, then verifies the blob is immediately closed (possibly after
/// handling a single Close request).
///
@@ -185,4 +195,68 @@
Some(other) => panic!("unexpected request: {:?}", other),
}
}
+
+ /// Handle read_at requests with the given data and offset until all data after offset is read.
+ ///
+ /// # Panics
+ ///
+ /// Panics on error or if data not read sequentially.
+ pub async fn expect_read_at(mut self, data: &[u8], start_offset: usize) -> Self {
+ let mut expected_offset = start_offset;
+ while expected_offset < data.len() {
+ match self.stream.next().await {
+ Some(Ok(FileRequest::ReadAt { count, offset, responder })) => {
+ let offset: usize = offset.try_into().unwrap();
+ assert_eq!(offset, expected_offset);
+ let count = min(count.try_into().unwrap(), data.len() - offset);
+ responder.send(Status::OK.into_raw(), &data[offset..offset + count]).unwrap();
+ expected_offset += count;
+ }
+ other => panic!("unexpected request: {:?}", other),
+ }
+ }
+ self
+ }
+
+ /// Handle one get_attr request with the given blob size.
+ ///
+ /// # Panics
+ ///
+ /// Panics on error
+ pub async fn expect_get_attr(mut self, blob_size: u64) -> Self {
+ match self.stream.next().await {
+ Some(Ok(FileRequest::GetAttr { responder })) => {
+ let mut attr = fidl_fuchsia_io::NodeAttributes::new_empty();
+ attr.content_size = blob_size;
+ responder.send(Status::OK.into_raw(), &mut attr).unwrap();
+ }
+ other => panic!("unexpected request: {:?}", other),
+ }
+ self
+ }
+
+ /// Handle all read_at requests with the given blob data until close is called.
+ ///
+ /// # Panics
+ ///
+ /// Panics on error
+ pub async fn handle_read_at_until_close(mut self, data: &[u8]) {
+ loop {
+ match self.stream.next().await {
+ Some(Ok(FileRequest::ReadAt { count, offset, responder })) => {
+ let offset: usize = offset.try_into().unwrap();
+ let count = min(count.try_into().unwrap(), data.len() - offset);
+ responder.send(Status::OK.into_raw(), &data[offset..offset + count]).unwrap();
+ }
+ Some(Ok(FileRequest::Close { responder })) => {
+ let _ = responder.send(Status::OK.into_raw());
+ return;
+ }
+ None => {
+ return;
+ }
+ other => panic!("unexpected request: {:?}", other),
+ }
+ }
+ }
}