[pkg-cache] Use async far reader.

Fixed: 68929
Change-Id: I886a672fb4763e2d85510d103768c272bb227623
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/490159
Commit-Queue: Sen Jiang <senj@google.com>
Reviewed-by: Dan Johnson <computerdruid@google.com>
diff --git a/src/sys/pkg/bin/pkg-cache/src/dynamic_index.rs b/src/sys/pkg/bin/pkg-cache/src/dynamic_index.rs
index 2077e6d..9faaeec 100644
--- a/src/sys/pkg/bin/pkg-cache/src/dynamic_index.rs
+++ b/src/sys/pkg/bin/pkg-cache/src/dynamic_index.rs
@@ -179,12 +179,12 @@
     let (path, required_blobs) = {
         let file =
             blobfs.open_blob_for_read(&blob_hash).await.map_err(DynamicIndexError::OpenBlob)?;
-        let meta_far_blob =
-            io_util::file::read(&file).await.map_err(DynamicIndexError::ReadBlob)?;
-        // TODO: Switch to async far reader (fxbug.dev/68929) to save memory.
-        let mut meta_far = fuchsia_archive::Reader::new(std::io::Cursor::new(meta_far_blob))?;
-        let meta_package = MetaPackage::deserialize(&meta_far.read_file("meta/package")?[..])?;
-        let meta_contents = MetaContents::deserialize(&meta_far.read_file("meta/contents")?[..])?;
+        let mut meta_far =
+            fuchsia_archive::AsyncReader::new(io_util::file::AsyncFile::from_proxy(file)).await?;
+        let meta_package =
+            MetaPackage::deserialize(&meta_far.read_file("meta/package").await?[..])?;
+        let meta_contents =
+            MetaContents::deserialize(&meta_far.read_file("meta/contents").await?[..])?;
 
         (meta_package.into_path(), meta_contents.into_hashes().collect::<HashSet<_>>())
     };
@@ -621,7 +621,17 @@
 
         let ((), ()) = future::join(
             async {
-                blobfs_mock.expect_open_blob(meta_far_hash).await.expect_read(&meta_far).await;
+                blobfs_mock
+                    .expect_open_blob(meta_far_hash)
+                    .await
+                    .then_send_on_open_readable()
+                    .await
+                    .expect_read_at(&meta_far[..64], 0)
+                    .await
+                    .expect_get_attr(meta_far.len() as u64)
+                    .await
+                    .handle_read_at_until_close(&meta_far)
+                    .await;
                 blobfs_mock.expect_open_blob(blob_hash).await.expect_close().await;
             },
             async {
diff --git a/src/sys/pkg/lib/fuchsia-pkg-testing/src/blobfs.rs b/src/sys/pkg/lib/fuchsia-pkg-testing/src/blobfs.rs
index eeb3edb..964ea17 100644
--- a/src/sys/pkg/lib/fuchsia-pkg-testing/src/blobfs.rs
+++ b/src/sys/pkg/lib/fuchsia-pkg-testing/src/blobfs.rs
@@ -5,7 +5,7 @@
 //! Fake and mock implementation of blobfs for blobfs::Client.
 
 use {
-    fidl::endpoints::RequestStream as _,
+    fidl::{encoding::Decodable as _, endpoints::RequestStream as _},
     fidl_fuchsia_io::{
         DirectoryMarker, DirectoryProxy, DirectoryRequest, DirectoryRequestStream, FileObject,
         FileRequest, FileRequestStream, NodeInfo,
@@ -142,6 +142,16 @@
         }
     }
 
+    /// Succeeds the open request.
+    ///
+    /// # Panics
+    ///
+    /// Panics on error.
+    pub async fn then_send_on_open_readable(mut self) -> Self {
+        self.send_on_open_with_readable(Status::OK);
+        self
+    }
+
     /// Succeeds the open request, then verifies the blob is immediately closed (possibly after
     /// handling a single Close request).
     ///
@@ -185,4 +195,68 @@
             Some(other) => panic!("unexpected request: {:?}", other),
         }
     }
+
+    /// Handles read_at requests with the given data, starting at `start_offset`, until all data after that offset has been read.
+    ///
+    /// # Panics
+    ///
+    /// Panics on error or if the data is not read sequentially.
+    pub async fn expect_read_at(mut self, data: &[u8], start_offset: usize) -> Self {
+        let mut expected_offset = start_offset;
+        while expected_offset < data.len() {
+            match self.stream.next().await {
+                Some(Ok(FileRequest::ReadAt { count, offset, responder })) => {
+                    let offset: usize = offset.try_into().unwrap();
+                    assert_eq!(offset, expected_offset);
+                    let count = min(count.try_into().unwrap(), data.len() - offset);
+                    responder.send(Status::OK.into_raw(), &data[offset..offset + count]).unwrap();
+                    expected_offset += count;
+                }
+                other => panic!("unexpected request: {:?}", other),
+            }
+        }
+        self
+    }
+
+    /// Handle one get_attr request with the given blob size.
+    ///
+    /// # Panics
+    ///
+    /// Panics on error.
+    pub async fn expect_get_attr(mut self, blob_size: u64) -> Self {
+        match self.stream.next().await {
+            Some(Ok(FileRequest::GetAttr { responder })) => {
+                let mut attr = fidl_fuchsia_io::NodeAttributes::new_empty();
+                attr.content_size = blob_size;
+                responder.send(Status::OK.into_raw(), &mut attr).unwrap();
+            }
+            other => panic!("unexpected request: {:?}", other),
+        }
+        self
+    }
+
+    /// Handles all read_at requests with the given blob data until a Close request is received.
+    ///
+    /// # Panics
+    ///
+    /// Panics on error.
+    pub async fn handle_read_at_until_close(mut self, data: &[u8]) {
+        loop {
+            match self.stream.next().await {
+                Some(Ok(FileRequest::ReadAt { count, offset, responder })) => {
+                    let offset: usize = offset.try_into().unwrap();
+                    let count = min(count.try_into().unwrap(), data.len() - offset);
+                    responder.send(Status::OK.into_raw(), &data[offset..offset + count]).unwrap();
+                }
+                Some(Ok(FileRequest::Close { responder })) => {
+                    let _ = responder.send(Status::OK.into_raw());
+                    return;
+                }
+                None => {
+                    return;
+                }
+                other => panic!("unexpected request: {:?}", other),
+            }
+        }
+    }
 }