| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #![deny(missing_docs)] |
| |
| //! Typesafe wrappers around the /blob filesystem. |
| |
| use { |
| fidl_fuchsia_io::{ |
| DirectoryMarker, DirectoryProxy, DirectoryRequestStream, FileObject, FileProxy, NodeInfo, |
| }, |
| fidl_fuchsia_io2::UnlinkOptions, |
| fuchsia_hash::{Hash, ParseHashError}, |
| fuchsia_syslog::fx_log_warn, |
| fuchsia_zircon::{self as zx, AsHandleRef as _, Status}, |
| futures::StreamExt as _, |
| std::collections::HashSet, |
| thiserror::Error, |
| }; |
| |
| /// Blobfs client errors. |
| #[derive(Debug, Error)] |
| #[allow(missing_docs)] |
| pub enum BlobfsError { |
| #[error("while opening blobfs dir")] |
| OpenDir(#[from] io_util::node::OpenError), |
| |
| #[error("while listing blobfs dir")] |
| ReadDir(#[source] files_async::Error), |
| |
| #[error("while deleting blob")] |
| Unlink(#[source] Status), |
| |
| #[error("while parsing blob merkle hash")] |
| ParseHash(#[from] ParseHashError), |
| |
| #[error("FIDL error")] |
| Fidl(#[from] fidl::Error), |
| } |
| |
| /// Blobfs client |
| #[derive(Debug, Clone)] |
| pub struct Client { |
| proxy: DirectoryProxy, |
| } |
| |
| impl Client { |
| /// Returns an client connected to blobfs from the current component's namespace. |
| pub fn open_from_namespace() -> Result<Self, BlobfsError> { |
| let proxy = io_util::directory::open_in_namespace( |
| "/blob", |
| fidl_fuchsia_io::OPEN_RIGHT_READABLE | fidl_fuchsia_io::OPEN_RIGHT_WRITABLE, |
| )?; |
| Ok(Client { proxy }) |
| } |
| |
| /// Returns an client connected to blobfs from the given blobfs root dir. |
| pub fn new(proxy: DirectoryProxy) -> Self { |
| Client { proxy } |
| } |
| |
| /// Creates a new client backed by the returned request stream. This constructor should not be |
| /// used outside of tests. |
| /// |
| /// # Panics |
| /// |
| /// Panics on error |
| pub fn new_test() -> (Self, DirectoryRequestStream) { |
| let (proxy, stream) = |
| fidl::endpoints::create_proxy_and_stream::<DirectoryMarker>().unwrap(); |
| |
| (Self { proxy }, stream) |
| } |
| |
| /// Returns the list of known blobs in blobfs. |
| pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> { |
| let entries = files_async::readdir(&self.proxy).await.map_err(BlobfsError::ReadDir)?; |
| |
| entries |
| .into_iter() |
| .filter(|entry| entry.kind == files_async::DirentKind::File) |
| .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash)) |
| .collect() |
| } |
| |
| /// Delete the blob with the given merkle hash. |
| pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> { |
| self.proxy |
| .unlink2(&blob.to_string(), UnlinkOptions::EMPTY) |
| .await? |
| .map_err(|s| BlobfsError::Unlink(Status::from_raw(s))) |
| } |
| |
| /// Open the blob for reading. |
| pub async fn open_blob_for_read( |
| &self, |
| blob: &Hash, |
| ) -> Result<FileProxy, io_util::node::OpenError> { |
| io_util::directory::open_file( |
| &self.proxy, |
| &blob.to_string(), |
| fidl_fuchsia_io::OPEN_RIGHT_READABLE, |
| ) |
| .await |
| } |
| |
| /// Returns whether blobfs has a blob with the given hash. |
| pub async fn has_blob(&self, blob: &Hash) -> bool { |
| let file = match io_util::directory::open_file_no_describe( |
| &self.proxy, |
| &blob.to_string(), |
| fidl_fuchsia_io::OPEN_FLAG_DESCRIBE | fidl_fuchsia_io::OPEN_RIGHT_READABLE, |
| ) { |
| Ok(file) => file, |
| Err(_) => return false, |
| }; |
| |
| let mut events = file.take_event_stream(); |
| |
| let fidl_fuchsia_io::FileEvent::OnOpen_ { s, info } = match events.next().await { |
| Some(Ok(event)) => event, |
| _ => return false, |
| }; |
| |
| if Status::from_raw(s) != Status::OK { |
| return false; |
| } |
| |
| let event = match info { |
| Some(info) => match *info { |
| NodeInfo::File(FileObject { event: Some(event), stream: None }) => event, |
| _ => return false, |
| }, |
| _ => return false, |
| }; |
| |
| // Check that the USER_0 signal has been asserted on the file's event to make sure we |
| // return false on the edge case of the blob is current being written. |
| match event.wait_handle(zx::Signals::USER_0, zx::Time::INFINITE_PAST) { |
| Ok(_) => true, |
| Err(status) => { |
| if status != Status::TIMED_OUT { |
| fx_log_warn!("blobfs: unknown error asserting blob existence: {}", status); |
| } |
| false |
| } |
| } |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| super::*, blobfs_ramdisk::BlobfsRamdisk, fidl_fuchsia_io::DirectoryRequest, |
| fuchsia_async as fasync, fuchsia_merkle::MerkleTree, futures::stream::TryStreamExt, |
| maplit::hashset, matches::assert_matches, std::io::Write as _, |
| }; |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn list_known_blobs_empty() { |
| let blobfs = BlobfsRamdisk::start().unwrap(); |
| let client = Client::new(blobfs.root_dir_proxy().unwrap()); |
| |
| assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new()); |
| blobfs.stop().await.unwrap(); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn list_known_blobs() { |
| let blobfs = BlobfsRamdisk::builder() |
| .with_blob(&b"blob 1"[..]) |
| .with_blob(&b"blob 2"[..]) |
| .start() |
| .unwrap(); |
| let client = Client::new(blobfs.root_dir_proxy().unwrap()); |
| |
| let expected = blobfs.list_blobs().unwrap().into_iter().collect(); |
| assert_eq!(client.list_known_blobs().await.unwrap(), expected); |
| blobfs.stop().await.unwrap(); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn delete_blob_and_then_list() { |
| let blobfs = BlobfsRamdisk::builder() |
| .with_blob(&b"blob 1"[..]) |
| .with_blob(&b"blob 2"[..]) |
| .start() |
| .unwrap(); |
| let client = Client::new(blobfs.root_dir_proxy().unwrap()); |
| |
| let merkle = MerkleTree::from_reader(&b"blob 1"[..]).unwrap().root(); |
| assert_matches!(client.delete_blob(&merkle).await, Ok(())); |
| |
| let expected = hashset! {MerkleTree::from_reader(&b"blob 2"[..]).unwrap().root()}; |
| assert_eq!(client.list_known_blobs().await.unwrap(), expected); |
| blobfs.stop().await.unwrap(); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn delete_non_existing_blob() { |
| let blobfs = BlobfsRamdisk::start().unwrap(); |
| let client = Client::new(blobfs.root_dir_proxy().unwrap()); |
| let blob_merkle = Hash::from([1; 32]); |
| |
| assert_matches!( |
| client.delete_blob(&blob_merkle).await, |
| Err(BlobfsError::Unlink(Status::NOT_FOUND)) |
| ); |
| blobfs.stop().await.unwrap(); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn delete_blob_mock() { |
| let (client, mut stream) = Client::new_test(); |
| let blob_merkle = Hash::from([1; 32]); |
| fasync::Task::spawn(async move { |
| match stream.try_next().await.unwrap().unwrap() { |
| DirectoryRequest::Unlink2 { name, responder, .. } => { |
| assert_eq!(name, blob_merkle.to_string()); |
| responder.send(&mut Ok(())).unwrap(); |
| } |
| other => panic!("unexpected request: {:?}", other), |
| } |
| }) |
| .detach(); |
| |
| assert_matches!(client.delete_blob(&blob_merkle).await, Ok(())); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn has_blob() { |
| let blobfs = BlobfsRamdisk::builder().with_blob(&b"blob 1"[..]).start().unwrap(); |
| let client = Client::new(blobfs.root_dir_proxy().unwrap()); |
| |
| assert_eq!( |
| client.has_blob(&MerkleTree::from_reader(&b"blob 1"[..]).unwrap().root()).await, |
| true |
| ); |
| assert_eq!(client.has_blob(&Hash::from([1; 32])).await, false); |
| |
| blobfs.stop().await.unwrap(); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn has_blob_return_false_if_blob_is_partially_written() { |
| let blobfs = BlobfsRamdisk::start().unwrap(); |
| let client = Client::new(blobfs.root_dir_proxy().unwrap()); |
| |
| let blob = [3; 1024]; |
| let hash = MerkleTree::from_reader(&blob[..]).unwrap().root(); |
| |
| let mut file = blobfs.root_dir().unwrap().write_file(hash.to_string(), 0o777).unwrap(); |
| assert_eq!(client.has_blob(&hash).await, false); |
| file.set_len(blob.len() as u64).unwrap(); |
| assert_eq!(client.has_blob(&hash).await, false); |
| file.write_all(&blob[..512]).unwrap(); |
| assert_eq!(client.has_blob(&hash).await, false); |
| file.write_all(&blob[512..]).unwrap(); |
| assert_eq!(client.has_blob(&hash).await, true); |
| |
| blobfs.stop().await.unwrap(); |
| } |
| } |