blob: c88dade9fd0a25180d0d37cb4ce5e53eb40291b8 [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! This module contains the [`BlobDirectory`] node type used to represent a directory of immutable
//! content-addressable blobs.
use {
crate::{
component::map_to_raw_status,
fuchsia::{
directory::FxDirectory,
fxblob::{blob::FxBlob, writer::FxDeliveryBlob},
node::{FxNode, GetResult, OpenedNode},
volume::{FxVolume, RootDir},
},
},
anyhow::{anyhow, bail, ensure, Error},
async_trait::async_trait,
fidl::endpoints::{create_proxy, ClientEnd, Proxy as _, ServerEnd},
fidl_fuchsia_fxfs::{
BlobCreatorRequest, BlobCreatorRequestStream, BlobReaderRequest, BlobReaderRequestStream,
BlobWriterMarker, CreateBlobError,
},
fidl_fuchsia_io::{
self as fio, FilesystemInfo, MutableNodeAttributes, NodeAttributeFlags, NodeAttributes,
NodeMarker, WatchMask,
},
fuchsia_async as fasync,
fuchsia_hash::Hash,
fuchsia_merkle::{MerkleTree, MerkleTreeBuilder},
fuchsia_zircon::Status,
futures::{FutureExt, TryStreamExt},
fxfs::{
errors::FxfsError,
log::*,
object_handle::ReadObjectHandle,
object_store::{
self,
transaction::{lock_keys, LockKey, Options},
HandleOptions, ObjectDescriptor, ObjectStore, BLOB_MERKLE_ATTRIBUTE_ID,
},
serialized_types::BlobMetadata,
},
std::{str::FromStr, sync::Arc},
vfs::{
directory::{
dirents_sink::{self, Sink},
entry::{DirectoryEntry, EntryInfo, OpenRequest},
entry_container::{Directory as VfsDirectory, DirectoryWatcher, MutableDirectory},
mutable::connection::MutableConnection,
traversal_position::TraversalPosition,
},
execution_scope::ExecutionScope,
path::Path,
ToObjectRequest,
},
};
/// A flat directory containing content-addressable blobs (names are their hashes).
/// It is not possible to create sub-directories.
/// It is not possible to write to an existing blob.
/// It is not possible to open or read a blob until it is written and verified.
pub struct BlobDirectory {
    /// The wrapped on-disk directory node whose entries are the blobs (named by hash).
    directory: Arc<FxDirectory>,
}
/// A blob's name in both of its representations: the hex string used as the directory entry
/// name and the parsed [`Hash`]. Converting once up front avoids repeatedly switching back and
/// forth between strings and hashes as the identifier is passed around by reference.
pub(crate) struct Identifier {
    // Hex string form; used as the directory entry name and as the dirent cache key.
    pub string: String,
    // Parsed binary form; used for merkle-root comparisons.
    pub hash: Hash,
}
impl Identifier {
    /// Builds an [`Identifier`] by parsing `string` as a merkle root hash.
    /// Fails with [`FxfsError::InvalidArgs`] when `string` is not a valid hash.
    pub fn from_str(string: &str) -> Result<Self, Error> {
        let hash = Hash::from_str(string).map_err(|_| FxfsError::InvalidArgs)?;
        Ok(Self { string: string.to_owned(), hash })
    }

    /// Builds an [`Identifier`] from an already-parsed hash, rendering its string form.
    pub fn from_hash(hash: Hash) -> Self {
        let string = hash.to_string();
        Self { string, hash }
    }
}
#[async_trait]
impl RootDir for BlobDirectory {
    /// Returns `self` as a [`DirectoryEntry`] trait object.
    fn as_directory_entry(self: Arc<Self>) -> Arc<dyn DirectoryEntry> {
        self
    }

    /// Returns `self` as a VFS [`VfsDirectory`] trait object.
    fn as_directory(self: Arc<Self>) -> Arc<dyn VfsDirectory> {
        self
    }

    /// Returns `self` as an [`FxNode`] trait object.
    fn as_node(self: Arc<Self>) -> Arc<dyn FxNode> {
        self as Arc<dyn FxNode>
    }

    /// Spawns a detached background task that warms caches with existing blob entries.
    /// Prefetch failures are logged, never surfaced to the opener.
    fn on_open(self: Arc<Self>) {
        fasync::Task::spawn(async move {
            if let Err(e) = self.prefetch_blobs().await {
                warn!("Failed to prefetch blobs: {:?}", e);
            }
        })
        .detach();
    }

    /// Serves a `fuchsia.fxfs/BlobCreator` request stream until the peer closes or the
    /// stream errors (errors terminate the loop silently).
    async fn handle_blob_creator_requests(self: Arc<Self>, mut requests: BlobCreatorRequestStream) {
        while let Ok(Some(request)) = requests.try_next().await {
            match request {
                BlobCreatorRequest::Create { responder, hash, .. } => {
                    // Reply failures only mean the client went away; log and keep serving.
                    responder.send(self.create_blob(Hash::from(hash)).await).unwrap_or_else(
                        |error| {
                            tracing::error!(?error, "failed to send Create response");
                        },
                    );
                }
            }
        }
    }

    /// Serves a `fuchsia.fxfs/BlobReader` request stream until the peer closes or the
    /// stream errors (errors terminate the loop silently).
    async fn handle_blob_reader_requests(self: Arc<Self>, mut requests: BlobReaderRequestStream) {
        while let Ok(Some(request)) = requests.try_next().await {
            match request {
                BlobReaderRequest::GetVmo { blob_hash, responder } => {
                    responder
                        .send(
                            self.clone()
                                .get_blob_vmo(blob_hash.into())
                                .await
                                .map_err(map_to_raw_status),
                        )
                        .unwrap_or_else(|error| {
                            tracing::error!(?error, "failed to send GetVmo response");
                        });
                }
            };
        }
    }
}
impl BlobDirectory {
    /// Wraps `directory` as the root blob directory. Also initializes the merkle-tree crypto
    /// library so later blob verification doesn't race on lazy init.
    fn new(directory: FxDirectory) -> Self {
        fuchsia_merkle::crypto_library_init();
        Self { directory: Arc::new(directory) }
    }
    /// Returns the wrapped [`FxDirectory`].
    pub fn directory(&self) -> &Arc<FxDirectory> {
        &self.directory
    }
    /// Returns the volume that owns this directory.
    pub fn volume(&self) -> &Arc<FxVolume> {
        self.directory.volume()
    }
    /// Returns the object store backing this directory.
    fn store(&self) -> &ObjectStore {
        self.directory.store()
    }
    /// Warms the dirent cache with existing blob entries, up to the cache's limit.
    ///
    /// Directory entries are collected while holding a read lock on the directory; the
    /// potentially slow node loads happen afterwards, once the lock has been dropped.
    async fn prefetch_blobs(self: &Arc<Self>) -> Result<(), Error> {
        let store = self.store();
        let fs = store.filesystem();
        let dirents = {
            // Read-lock the directory only for the duration of the iteration.
            let _guard = fs
                .lock_manager()
                .read_lock(lock_keys![LockKey::object(
                    store.store_object_id(),
                    self.directory.object_id()
                )])
                .await;
            let mut dirents = vec![];
            let layer_set = store.tree().layer_set();
            let mut merger = layer_set.merger();
            let mut iter = self.directory.directory().iter(&mut merger).await?;
            let mut num = 0;
            // Don't collect more entries than the dirent cache can hold.
            let limit = self.directory.directory().owner().dirent_cache().limit();
            while let Some((name, object_id, _)) = iter.get() {
                dirents.push((Identifier::from_str(name)?, object_id));
                iter.advance().await?;
                num += 1;
                if num >= limit {
                    break;
                }
            }
            dirents
        };
        // Load each node (from cache or disk) and install it in the dirent cache.
        // Individual load failures are ignored: prefetching is best-effort.
        for (identifier, object_id) in dirents {
            if let Ok(node) = self.get_or_load_node(object_id, &identifier).await {
                self.directory.directory().owner().dirent_cache().insert(
                    self.directory.object_id(),
                    identifier.string,
                    node,
                );
            }
        }
        Ok(())
    }
pub async fn open_blob(self: &Arc<Self>, hash: Hash) -> Result<Arc<FxBlob>, Error> {
self.lookup(fio::OpenFlags::RIGHT_READABLE, Identifier::from_hash(hash))
.await?
.take()
.into_any()
.downcast::<FxBlob>()
.map_err(|_| FxfsError::Inconsistent.into())
}
    /// Looks up — or, when `flags` contains `CREATE`, creates — the blob named by `id`.
    ///
    /// Existing blobs are returned from the dirent cache when possible; creation flags and
    /// write access are rejected on them. For an absent blob, `CREATE` plus `RIGHT_WRITABLE`
    /// yields a new [`FxDeliveryBlob`] ready to be written.
    pub(crate) async fn lookup(
        self: &Arc<Self>,
        flags: fio::OpenFlags,
        id: Identifier,
    ) -> Result<OpenedNode<dyn FxNode>, Error> {
        let store = self.store();
        let fs = store.filesystem();
        // TODO(https://fxbug.dev/42073113): Create the transaction here if we might need to create the object
        // so that we have a lock in place.
        let keys = lock_keys![LockKey::object(store.store_object_id(), self.directory.object_id())];
        // A lock needs to be held over searching the directory and incrementing the open count.
        let guard = fs.lock_manager().read_lock(keys.clone()).await;
        // Consult the dirent cache first; on a miss, fall back to a directory lookup and
        // populate the cache with the loaded node.
        let child_node = match self
            .directory
            .directory()
            .owner()
            .dirent_cache()
            .lookup(&(self.directory.object_id(), &id.string))
        {
            Some(node) => Some(node),
            None => {
                if let Some((object_id, _)) = self.directory.directory().lookup(&id.string).await? {
                    let node = self.get_or_load_node(object_id, &id).await?;
                    self.directory.directory().owner().dirent_cache().insert(
                        self.directory.object_id(),
                        id.string.clone(),
                        node.clone(),
                    );
                    Some(node)
                } else {
                    None
                }
            }
        };
        match child_node {
            Some(node) => {
                // An existing blob can't be re-created, and blobs are immutable once written.
                ensure!(
                    !flags.contains(fio::OpenFlags::CREATE | fio::OpenFlags::CREATE_IF_ABSENT),
                    FxfsError::AlreadyExists
                );
                ensure!(!flags.contains(fio::OpenFlags::RIGHT_WRITABLE), FxfsError::AccessDenied);
                match node.object_descriptor() {
                    ObjectDescriptor::File => {
                        ensure!(!flags.contains(fio::OpenFlags::DIRECTORY), FxfsError::NotDir)
                    }
                    // Anything but a file inside a blob directory indicates corruption.
                    _ => bail!(FxfsError::Inconsistent),
                }
                // TODO(https://fxbug.dev/42073113): Test that we can't open a blob while still writing it.
                Ok(OpenedNode::new(node))
            }
            None => {
                // Release the read lock before taking the transaction locks below.
                std::mem::drop(guard);
                ensure!(flags.contains(fio::OpenFlags::CREATE), FxfsError::NotFound);
                ensure!(flags.contains(fio::OpenFlags::RIGHT_WRITABLE), FxfsError::AccessDenied);
                let mut transaction = fs.clone().new_transaction(keys, Options::default()).await?;
                let handle = ObjectStore::create_object(
                    self.volume(),
                    &mut transaction,
                    // Checksums are redundant for blobs, which are already content-verified.
                    HandleOptions { skip_checksums: true, ..Default::default() },
                    None,
                    None,
                )
                .await?;
                let node = OpenedNode::new(
                    FxDeliveryBlob::new(self.clone(), id.hash, handle) as Arc<dyn FxNode>
                );
                // Add the object to the graveyard so that it's cleaned up if we crash.
                store.add_to_graveyard(&mut transaction, node.object_id());
                // Note that we don't bother notifying watchers yet. Nothing else should be able to
                // see this object yet.
                transaction.commit().await?;
                Ok(node)
            }
        }
    }
    /// Attempts to get a node from the node cache. If the node wasn't present in the cache,
    /// loads the object from the object store, installing the returned node into the cache and
    /// returns the newly created FxNode backed by the loaded object.
    ///
    /// Fails with [`FxfsError::Inconsistent`] when a cached node is not a blob or its merkle
    /// root doesn't match `id.hash`.
    async fn get_or_load_node(
        self: &Arc<Self>,
        object_id: u64,
        id: &Identifier,
    ) -> Result<Arc<dyn FxNode>, Error> {
        let volume = self.volume();
        match volume.cache().get_or_reserve(object_id).await {
            GetResult::Node(node) => {
                // Protecting against the scenario where a directory entry points to another node
                // which has already been loaded and verified with the correct hash. We need to
                // verify that the hash for the blob that is cached here matches the requested hash.
                let blob = node.into_any().downcast::<FxBlob>().map_err(|_| {
                    anyhow!(FxfsError::Inconsistent).context("Loaded non-blob from cache")
                })?;
                ensure!(
                    blob.root() == id.hash,
                    anyhow!(FxfsError::Inconsistent)
                        .context("Loaded blob by node that did not match the given hash")
                );
                Ok(blob as Arc<dyn FxNode>)
            }
            GetResult::Placeholder(placeholder) => {
                let object =
                    ObjectStore::open_object(volume, object_id, HandleOptions::default(), None)
                        .await?;
                // Reconstruct the merkle tree from the metadata attribute, if one is stored.
                let (tree, metadata) = match object.read_attr(BLOB_MERKLE_ATTRIBUTE_ID).await? {
                    None => {
                        // If the file is uncompressed and is small enough, it may not have any
                        // metadata stored on disk.
                        (
                            MerkleTree::from_levels(vec![vec![id.hash]]),
                            BlobMetadata {
                                hashes: vec![],
                                chunk_size: 0,
                                compressed_offsets: vec![],
                                uncompressed_size: object.get_size(),
                            },
                        )
                    }
                    Some(data) => {
                        let mut metadata: BlobMetadata = bincode::deserialize_from(&*data)?;
                        let tree = if metadata.hashes.is_empty() {
                            // No per-block hashes stored: the tree is just the root hash.
                            MerkleTree::from_levels(vec![vec![id.hash]])
                        } else {
                            let mut builder = MerkleTreeBuilder::new();
                            for hash in std::mem::take(&mut metadata.hashes) {
                                builder.push_data_hash(hash.into());
                            }
                            let tree = builder.finish();
                            // The rebuilt root must match the requested hash, otherwise the
                            // on-disk data is corrupt.
                            ensure!(tree.root() == id.hash, FxfsError::Inconsistent);
                            tree
                        };
                        (tree, metadata)
                    }
                };
                let node = FxBlob::new(
                    object,
                    tree,
                    metadata.chunk_size,
                    metadata.compressed_offsets,
                    metadata.uncompressed_size,
                ) as Arc<dyn FxNode>;
                // Fill the reserved cache slot so concurrent lookups observe the new node.
                placeholder.commit(&node);
                Ok(node)
            }
        }
    }
async fn create_blob(
self: &Arc<Self>,
hash: Hash,
) -> Result<ClientEnd<BlobWriterMarker>, CreateBlobError> {
let flags = fio::OpenFlags::CREATE
| fio::OpenFlags::CREATE_IF_ABSENT
| fio::OpenFlags::RIGHT_WRITABLE
| fio::OpenFlags::RIGHT_READABLE;
let node = match self.lookup(flags, Identifier::from_hash(hash)).await {
Ok(node) => node,
Err(e) => {
if FxfsError::AlreadyExists.matches(&e) {
return Err(CreateBlobError::AlreadyExists);
} else {
tracing::error!("lookup failed: {:?}", e);
return Err(CreateBlobError::Internal);
}
}
};
let blob = node.downcast::<FxDeliveryBlob>().unwrap_or_else(|_| unreachable!());
let (client, server_end) = create_proxy::<BlobWriterMarker>().map_err(|e| {
tracing::error!("create_proxy failed for the BlobWriter protocol: {:?}", e);
CreateBlobError::Internal
})?;
let client_channel = client.into_channel().map_err(|_| {
tracing::error!("failed to create client channel");
CreateBlobError::Internal
})?;
let client_end = ClientEnd::new(client_channel.into());
self.volume().scope().spawn(async move {
if let Err(e) = blob.as_ref().handle_requests(server_end).await {
tracing::error!("Failed to handle blob writer requests: {}", e);
}
});
return Ok(client_end);
}
}
impl FxNode for BlobDirectory {
    fn object_id(&self) -> u64 {
        self.directory.object_id()
    }

    fn parent(&self) -> Option<Arc<FxDirectory>> {
        self.directory.parent()
    }

    fn set_parent(&self, _parent: Arc<FxDirectory>) {
        // This directory can't be renamed.
        unreachable!();
    }

    // No-op: open counts are not tracked on the root blob directory itself.
    fn open_count_add_one(&self) {}

    fn open_count_sub_one(self: Arc<Self>) {}

    fn object_descriptor(&self) -> ObjectDescriptor {
        ObjectDescriptor::Directory
    }
}
#[async_trait]
impl MutableDirectory for BlobDirectory {
    async fn unlink(self: Arc<Self>, name: &str, must_be_directory: bool) -> Result<(), Status> {
        // A blob directory has no subdirectories, so an unlink that requires a directory
        // target can never succeed.
        if must_be_directory {
            return Err(Status::INVALID_ARGS);
        }
        self.directory.clone().unlink(name, must_be_directory).await
    }

    // Attribute mutations and sync are delegated to the wrapped FxDirectory.
    async fn set_attrs(
        &self,
        flags: NodeAttributeFlags,
        attrs: NodeAttributes,
    ) -> Result<(), Status> {
        self.directory.set_attrs(flags, attrs).await
    }

    async fn update_attributes(&self, attributes: MutableNodeAttributes) -> Result<(), Status> {
        self.directory.update_attributes(attributes).await
    }

    async fn sync(&self) -> Result<(), Status> {
        self.directory.sync().await
    }

    async fn rename(
        self: Arc<Self>,
        _src_dir: Arc<dyn vfs::directory::entry_container::MutableDirectory + 'static>,
        _src_name: Path,
        _dst_name: Path,
    ) -> Result<(), Status> {
        // Files in a blob directory can't be renamed.
        Err(Status::NOT_SUPPORTED)
    }
}
/// Implementation of VFS pseudo-directory for blobs. Forks a task per connection.
impl DirectoryEntry for BlobDirectory {
    fn entry_info(&self) -> EntryInfo {
        self.directory.entry_info()
    }

    /// Routes open requests through the directory-open path.
    fn open_entry(self: Arc<Self>, request: OpenRequest<'_>) -> Result<(), Status> {
        request.open_dir(self)
    }
}
#[async_trait]
impl vfs::node::Node for BlobDirectory {
    // All attribute and filesystem queries are delegated to the wrapped FxDirectory.
    async fn get_attrs(&self) -> Result<NodeAttributes, Status> {
        self.directory.get_attrs().await
    }

    async fn get_attributes(
        &self,
        requested_attributes: fio::NodeAttributesQuery,
    ) -> Result<fio::NodeAttributes2, Status> {
        self.directory.get_attributes(requested_attributes).await
    }

    fn query_filesystem(&self) -> Result<FilesystemInfo, Status> {
        self.directory.query_filesystem()
    }
}
/// Implements VFS entry container trait for directories, allowing manipulation of their contents.
#[async_trait]
impl VfsDirectory for BlobDirectory {
    /// Opens the directory itself (empty path) over fuchsia.io. Opening a child path is
    /// rejected: blobs are only reachable via the BlobCreator/BlobReader protocols.
    fn open(
        self: Arc<Self>,
        scope: ExecutionScope,
        flags: fio::OpenFlags,
        path: Path,
        server_end: ServerEnd<NodeMarker>,
    ) {
        flags.to_object_request(server_end).spawn(&scope.clone(), move |object_request| {
            async move {
                if path.is_empty() {
                    object_request.create_connection(
                        scope,
                        OpenedNode::new(self.clone())
                            .downcast::<BlobDirectory>()
                            .unwrap_or_else(|_| unreachable!())
                            .take(),
                        flags,
                        MutableConnection::create,
                    )
                } else {
                    // NOTE: The original message was a multi-line string literal, so the log
                    // output contained an embedded newline and indentation; keep it one line.
                    tracing::error!(
                        "Tried to open a blob via open(). Use the BlobCreator or BlobReader instead."
                    );
                    Err(Status::NOT_SUPPORTED)
                }
            }
            .boxed()
        });
    }

    async fn read_dirents<'a>(
        &'a self,
        pos: &'a TraversalPosition,
        sink: Box<dyn Sink>,
    ) -> Result<(TraversalPosition, Box<dyn dirents_sink::Sealed>), Status> {
        self.directory.read_dirents(pos, sink).await
    }

    fn register_watcher(
        self: Arc<Self>,
        scope: ExecutionScope,
        mask: WatchMask,
        watcher: DirectoryWatcher,
    ) -> Result<(), Status> {
        self.directory.clone().register_watcher(scope, mask, watcher)
    }

    fn unregister_watcher(self: Arc<Self>, key: usize) {
        self.directory.clone().unregister_watcher(key)
    }
}
impl From<object_store::Directory<FxVolume>> for BlobDirectory {
fn from(dir: object_store::Directory<FxVolume>) -> Self {
Self::new(dir.into())
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::fuchsia::fxblob::testing::{new_blob_fixture, open_blob_fixture, BlobFixture},
assert_matches::assert_matches,
blob_writer::BlobWriter,
delivery_blob::{delivery_blob_path, CompressionMode, Type1Blob},
fidl_fuchsia_fxfs::BlobReaderMarker,
fuchsia_async::{self as fasync, DurationExt as _, TimeoutExt as _},
fuchsia_component::client::connect_to_protocol_at_dir_svc,
fuchsia_fs::directory::{
readdir_inclusive, DirEntry, DirentKind, WatchEvent, WatchMessage, Watcher,
},
fuchsia_zircon::DurationNum as _,
futures::StreamExt as _,
std::path::PathBuf,
};
#[fasync::run(10, test)]
async fn test_unlink() {
let fixture = new_blob_fixture().await;
let data = [1; 1000];
let hash = fixture.write_blob(&data, CompressionMode::Never).await;
assert_eq!(fixture.read_blob(hash).await, data);
fixture
.root()
.unlink(&format!("{}", hash), &fio::UnlinkOptions::default())
.await
.expect("FIDL failed")
.expect("unlink failed");
fixture.close().await;
}
    #[fasync::run(10, test)]
    async fn test_readdir() {
        let fixture = new_blob_fixture().await;
        let data = [0xab; 2];
        let hash;
        {
            // Compute the expected merkle root and produce a Type 1 delivery blob.
            let mut builder = MerkleTreeBuilder::new();
            builder.write(&data);
            hash = builder.finish().root();
            let compressed_data: Vec<u8> = Type1Blob::generate(&data, CompressionMode::Always);
            let (blob_volume_outgoing_dir, server_end) =
                fidl::endpoints::create_proxy::<fio::DirectoryMarker>()
                    .expect("Create dir proxy to succeed");
            fixture
                .volumes_directory()
                .serve_volume(fixture.volume(), server_end, true)
                .expect("failed to serve blob volume");
            // Write the blob through the BlobCreator protocol exposed by the volume.
            let blob_proxy =
                connect_to_protocol_at_dir_svc::<fidl_fuchsia_fxfs::BlobCreatorMarker>(
                    &blob_volume_outgoing_dir,
                )
                .expect("failed to connect to the Blob service");
            let blob_writer_client_end = blob_proxy
                .create(&hash.into(), false)
                .await
                .expect("transport error on create")
                .expect("failed to create blob");
            let writer = blob_writer_client_end.into_proxy().unwrap();
            let mut blob_writer = BlobWriter::create(writer, compressed_data.len() as u64)
                .await
                .expect("failed to create BlobWriter");
            blob_writer.write(&compressed_data[..1]).await.unwrap();
            // Before the blob is finished writing, it shouldn't appear in the directory.
            assert_eq!(
                readdir_inclusive(fixture.root()).await.ok(),
                Some(vec![DirEntry { name: ".".to_string(), kind: DirentKind::Directory }])
            );
            blob_writer.write(&compressed_data[1..]).await.unwrap();
        }
        // Once the write completes, the blob shows up named by its hash.
        assert_eq!(
            readdir_inclusive(fixture.root()).await.ok(),
            Some(vec![
                DirEntry { name: ".".to_string(), kind: DirentKind::Directory },
                DirEntry { name: format! {"{}", hash}, kind: DirentKind::File },
            ])
        );
        fixture.close().await;
    }
    #[fasync::run(10, test)]
    async fn test_watchers() {
        let fixture = new_blob_fixture().await;
        let mut watcher = Watcher::new(fixture.root()).await.unwrap();
        // A new watcher first reports the existing "." entry, then goes idle.
        assert_eq!(
            watcher.next().await,
            Some(Ok(WatchMessage { event: WatchEvent::EXISTING, filename: PathBuf::from(".") }))
        );
        assert_matches!(
            watcher.next().await,
            Some(Ok(WatchMessage { event: WatchEvent::IDLE, .. }))
        );
        // One small and one larger payload, to cover both blob sizes.
        let data = vec![vec![0xab; 2], vec![0xcd; 65_536]];
        let mut hashes = vec![];
        let mut filenames = vec![];
        for datum in data {
            let mut builder = MerkleTreeBuilder::new();
            builder.write(&datum);
            let hash = builder.finish().root();
            let filename = PathBuf::from(format!("{}", hash));
            hashes.push(hash.clone());
            filenames.push(filename.clone());
            let compressed_data: Vec<u8> = Type1Blob::generate(&datum, CompressionMode::Always);
            let (blob_volume_outgoing_dir, server_end) =
                fidl::endpoints::create_proxy::<fio::DirectoryMarker>()
                    .expect("Create dir proxy to succeed");
            fixture
                .volumes_directory()
                .serve_volume(fixture.volume(), server_end, true)
                .expect("failed to serve blob volume");
            let blob_proxy =
                connect_to_protocol_at_dir_svc::<fidl_fuchsia_fxfs::BlobCreatorMarker>(
                    &blob_volume_outgoing_dir,
                )
                .expect("failed to connect to the Blob service");
            let blob_writer_client_end = blob_proxy
                .create(&hash.into(), false)
                .await
                .expect("transport error on create")
                .expect("failed to create blob");
            let writer = blob_writer_client_end.into_proxy().unwrap();
            let mut blob_writer = BlobWriter::create(writer, compressed_data.len() as u64)
                .await
                .expect("failed to create BlobWriter");
            // Write all but the final byte of the delivery blob.
            blob_writer.write(&compressed_data[..compressed_data.len() - 1]).await.unwrap();
            // Before the blob is finished writing, we shouldn't see any watch events for it.
            assert_matches!(
                watcher.next().on_timeout(500.millis().after_now(), || None).await,
                None
            );
            // Completing the write publishes the blob and emits ADD_FILE.
            blob_writer.write(&compressed_data[compressed_data.len() - 1..]).await.unwrap();
            assert_eq!(
                watcher.next().await,
                Some(Ok(WatchMessage { event: WatchEvent::ADD_FILE, filename }))
            );
        }
        // Unlinking each blob should emit a matching REMOVE_FILE event.
        for (hash, filename) in hashes.iter().zip(filenames) {
            fixture
                .root()
                .unlink(&format!("{}", hash), &fio::UnlinkOptions::default())
                .await
                .expect("FIDL call failed")
                .expect("unlink failed");
            assert_eq!(
                watcher.next().await,
                Some(Ok(WatchMessage { event: WatchEvent::REMOVE_FILE, filename }))
            );
        }
        std::mem::drop(watcher);
        fixture.close().await;
    }
#[fasync::run(10, test)]
async fn test_rename_fails() {
let fixture = new_blob_fixture().await;
let data = vec![];
let hash = fixture.write_blob(&data, CompressionMode::Never).await;
let (status, token) = fixture.root().get_token().await.expect("FIDL failed");
Status::ok(status).unwrap();
fixture
.root()
.rename(&format!("{}", delivery_blob_path(hash)), token.unwrap().into(), "foo")
.await
.expect("FIDL failed")
.expect_err("rename should fail");
fixture.close().await;
}
    #[fasync::run(10, test)]
    async fn test_link_fails() {
        let fixture = new_blob_fixture().await;
        let data = vec![];
        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
        // A destination-directory token is required by the link protocol.
        let (status, token) = fixture.root().get_token().await.expect("FIDL failed");
        Status::ok(status).unwrap();
        // Hard links are not supported in a blob directory.
        let status = fixture
            .root()
            .link(&format!("{}", hash), token.unwrap().into(), "foo")
            .await
            .expect("FIDL failed");
        assert_eq!(Status::from_raw(status), Status::NOT_SUPPORTED);
        fixture.close().await;
    }
    #[fasync::run(10, test)]
    async fn test_verify_cached_hash_node() {
        let fixture = new_blob_fixture().await;
        let data = vec![];
        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
        // A hash that does not match the written blob's contents.
        let evil_hash =
            Hash::from_str("2222222222222222222222222222222222222222222222222222222222222222")
                .unwrap();
        // Create a malicious link to the existing blob. This shouldn't be possible without special
        // access either via internal apis or modifying the disk image.
        {
            let root = fixture
                .volume()
                .root()
                .clone()
                .as_node()
                .into_any()
                .downcast::<BlobDirectory>()
                .unwrap()
                .directory()
                .clone();
            root.clone()
                .link(evil_hash.to_string(), root, &hash.to_string())
                .await
                .expect("Linking file");
        }
        // Remount so the malicious link is read back from disk.
        let device = fixture.close().await;
        let fixture = open_blob_fixture(device).await;
        {
            // Hold open a ref to keep it in the node cache.
            let _vmo = fixture.get_blob_vmo(hash).await;
            // Open the malicious link
            let blob_reader =
                connect_to_protocol_at_dir_svc::<BlobReaderMarker>(fixture.volume_out_dir())
                    .expect("failed to connect to the BlobReader service");
            // The cached node's root hash doesn't match the requested hash, so GetVmo must
            // fail rather than serve the wrong content.
            blob_reader
                .get_vmo(&evil_hash.into())
                .await
                .expect("transport error on BlobReader.GetVmo")
                .expect_err("Hashes should mismatch");
        }
        fixture.close().await;
    }
}