blob: 2fafd850d28dbbfd05176e84fc6f24da870ea847 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use super::*;
use assert_matches::assert_matches;
use fidl_fuchsia_io as fio;
use futures::stream::StreamExt as _;
use std::io::Write as _;
use std::time::Duration;
use zx::{AsHandleRef as _, Status};
// merkle root of b"Hello world!\n".
static BLOB_MERKLE: &str = "e5892a9b652ede2e19460a9103fd9cb3417f782a8d29f6c93ec0c31170a94af3";
// The payload whose merkle root is BLOB_MERKLE; written and verified throughout these tests.
static BLOB_CONTENTS: &[u8] = b"Hello world!\n";
/// Collects the file names from a directory iterator into a `Vec<String>`,
/// propagating the first iteration error encountered.
fn ls_simple(d: openat::DirIter) -> Result<Vec<String>, Error> {
    let mut names = Vec::new();
    for entry in d {
        names.push(entry?.file_name().to_string_lossy().into());
    }
    Ok(names)
}
// End-to-end smoke test: a partially written blob stays invisible, while a
// fully written blob shows up in both the directory listing and list_blobs().
#[fuchsia_async::run_singlethreaded(test)]
async fn blobfs() -> Result<(), Error> {
    let blobfs_server = BlobfsRamdisk::start().await?;
    let d = blobfs_server.root_dir().context("get root dir")?;
    // A freshly formatted blobfs has no entries.
    assert_eq!(
        ls_simple(d.list_dir(".").expect("list dir")).expect("list dir contents"),
        Vec::<String>::new(),
    );
    // First attempt: truncate to the wrong size (6 != 13) and write a partial
    // payload. The write's outcome is deliberately ignored (best effort); the
    // assertion below only cares that no blob becomes visible.
    let mut f = d.write_file(BLOB_MERKLE, 0).expect("open file 1");
    f.set_len(6_u64).expect("truncate");
    f.write_all(b"Hello").unwrap_or_else(|e| eprintln!("write 1 error: {e}"));
    drop(f);
    assert_eq!(
        ls_simple(d.list_dir(".").expect("list dir")).expect("list dir contents"),
        Vec::<String>::new(),
    );
    // Second attempt: correct size, full payload written across two calls.
    let mut f = d.write_file(BLOB_MERKLE, 0).expect("open file 2");
    f.set_len(BLOB_CONTENTS.len() as u64).expect("truncate");
    f.write_all(b"Hello ").expect("write file2.1");
    f.write_all(b"world!\n").expect("write file2.2");
    drop(f);
    // The completed blob is now visible under its merkle-root name.
    assert_eq!(
        ls_simple(d.list_dir(".").expect("list dir")).expect("list dir contents"),
        vec![BLOB_MERKLE.to_string()],
    );
    assert_eq!(
        blobfs_server.list_blobs().expect("list blobs"),
        BTreeSet::from([BLOB_MERKLE.parse().unwrap()]),
    );
    blobfs_server.stop().await?;
    Ok(())
}
/// Opens `merkle` in `blobfs` with `flags` plus `FLAG_SEND_REPRESENTATION`
/// and waits for the resulting `OnRepresentation` event.
///
/// Returns the file proxy together with the blob's observer event (signaled
/// with USER_0 once the blob is fully written — see the wait_for_signal tests
/// below), or the epitaph status if the channel is closed instead.
async fn open_blob(
    blobfs: &fio::DirectoryProxy,
    merkle: &str,
    mut flags: fio::Flags,
) -> Result<(fio::FileProxy, zx::Event), zx::Status> {
    let (file, server_end) = fidl::endpoints::create_proxy::<fio::FileMarker>();
    // Ask the server to send an OnRepresentation event carrying the observer.
    flags |= fio::Flags::FLAG_SEND_REPRESENTATION;
    blobfs.open(merkle, flags, &Default::default(), server_end.into_channel()).expect("open blob");
    let event = file
        .take_event_stream()
        .next()
        .await
        .expect("fio::FileEvent stream must be non empty!")
        .map_err(|e| match e {
            // A closed channel surfaces the open failure as an epitaph status.
            fidl::Error::ClientChannelClosed { status, .. } => status,
            _ => panic!("fio::FileEvent stream contained fidl error"),
        })?;
    let observer = match event {
        fio::FileEvent::OnOpen_ { .. } => {
            panic!("Expected OnRepresentation event, got OnOpen instead!");
        }
        fio::FileEvent::OnRepresentation { payload } => match payload {
            fio::Representation::File(fio::FileInfo { observer: Some(observer), .. }) => observer,
            // Message previously dropped the leading "Expected", making the
            // panic read as a statement rather than a failed expectation.
            other => panic!(
                "Expected ConnectionInfo from fio::FileEventStream to be File variant with event: {other:?}"
            ),
        },
        fio::FileEvent::_UnknownEvent { ordinal, .. } => panic!("Unknown file event {ordinal}"),
    };
    Ok((file, observer))
}
/// Writes `bytes` to `blob`, asserting the server accepted the whole buffer
/// in a single write call.
async fn write_blob(blob: &fio::FileProxy, bytes: &[u8]) -> Result<(), Error> {
    let bytes_written = blob.write(bytes).await?.map_err(zx::Status::from_raw)?;
    assert_eq!(bytes_written, bytes.len() as u64);
    Ok(())
}
/// Verify the contents of a blob, or return any non-ok zx status encountered along the way.
async fn verify_blob(blob: &fio::FileProxy, expected_bytes: &[u8]) -> Result<(), Status> {
    // Request one byte more than expected so a blob longer than
    // `expected_bytes` would produce a length mismatch in the assert below
    // rather than silently passing.
    let actual_bytes = blob
        .read_at(expected_bytes.len() as u64 + 1, 0)
        .await
        .unwrap()
        .map_err(Status::from_raw)?;
    assert_eq!(actual_bytes, expected_bytes);
    Ok(())
}
/// Writes a complete blob into blobfs (create, resize, write, close) and
/// confirms the payload reads back intact over a fresh read-only connection.
async fn create_blob(
    blobfs: &fio::DirectoryProxy,
    merkle: &str,
    contents: &[u8],
) -> Result<(), Error> {
    let create_flags =
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE;
    let (writer, _) = open_blob(blobfs, merkle, create_flags).await?;
    writer.resize(contents.len() as u64).await?.map_err(Status::from_raw)?;
    write_blob(&writer, contents).await?;
    writer.close().await?.map_err(Status::from_raw)?;
    // Re-open read-only and check the payload round-trips.
    let (reader, _) = open_blob(blobfs, merkle, fio::PERM_READABLE).await?;
    verify_blob(&reader, contents).await?;
    Ok(())
}
// Dropping a FileProxy synchronously closes the zircon channel, but it is not guaranteed
// that blobfs will respond to the channel closing before it responds to a request on a
// separate channel to open the same blob. This means a test case that:
// 1. opens writable + resizes on channel 0
// 2. drops channel 0
// 3. opens writable on channel 1
// can fail with ACCESS_DENIED in step 3, unless we wait.
async fn wait_for_blob_to_be_creatable(blobfs: &fio::DirectoryProxy, merkle: &str) {
    // Poll up to 50 times with a 10ms backoff (~500ms budget) for blobfs to
    // finish cleaning up the dropped connection.
    for _ in 0..50 {
        let res = open_blob(
            blobfs,
            merkle,
            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE,
        )
        .await;
        match res {
            // Still locked out by the not-yet-reaped previous connection; retry.
            Err(zx::Status::ACCESS_DENIED) => {
                fuchsia_async::Timer::new(Duration::from_millis(10)).await;
                continue;
            }
            Err(err) => {
                panic!("unexpected error waiting for blob to become writable: {err:?}");
            }
            Ok((blob, _)) => {
                // Explicitly close the blob so that when this function returns the blob
                // is in the state (creatable + not openable for read). If we just drop
                // the FileProxy instead of closing, the blob will be openable for read until
                // blobfs asynchronously cleans up.
                blob.close().await.unwrap().map_err(Status::from_raw).unwrap();
                return;
            }
        }
    }
    panic!("timeout waiting for blob to become creatable");
}
// Holding an initial create-mode connection open must not prevent another
// connection from fully writing the same blob.
#[fuchsia_async::run_singlethreaded(test)]
async fn open_for_create_create() -> Result<(), Error> {
    let blobfs_server = BlobfsRamdisk::start().await?;
    let root_dir = blobfs_server.root_dir_proxy()?;
    let create_flags =
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE;
    let (_blob, _) = open_blob(&root_dir, BLOB_MERKLE, create_flags).await?;
    create_blob(&root_dir, BLOB_MERKLE, BLOB_CONTENTS).await?;
    blobfs_server.stop().await
}
// Resizing a blob and then dropping the connection must eventually make the
// blob creatable again, once blobfs has processed the channel closure.
#[fuchsia_async::run_singlethreaded(test)]
async fn open_resize_drop_create() -> Result<(), Error> {
    let blobfs_server = BlobfsRamdisk::start().await?;
    let root_dir = blobfs_server.root_dir_proxy()?;
    let create_flags =
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE;
    let (blob, _) = open_blob(&root_dir, BLOB_MERKLE, create_flags).await?;
    blob.resize(BLOB_CONTENTS.len() as u64).await?.map_err(Status::from_raw)?;
    drop(blob);
    // Dropping closes the channel asynchronously; wait for blobfs to catch up.
    wait_for_blob_to_be_creatable(&root_dir, BLOB_MERKLE).await;
    create_blob(&root_dir, BLOB_MERKLE, BLOB_CONTENTS).await?;
    blobfs_server.stop().await
}
// A partially written blob whose connection is dropped must become creatable
// again so the write can be retried from scratch.
#[fuchsia_async::run_singlethreaded(test)]
async fn open_partial_write_drop_create() -> Result<(), Error> {
    let blobfs_server = BlobfsRamdisk::start().await?;
    let root_dir = blobfs_server.root_dir_proxy()?;
    let create_flags =
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE;
    let (blob, _) = open_blob(&root_dir, BLOB_MERKLE, create_flags).await?;
    blob.resize(BLOB_CONTENTS.len() as u64).await?.map_err(Status::from_raw)?;
    // Write only the first byte, then abandon the connection.
    write_blob(&blob, &BLOB_CONTENTS[0..1]).await?;
    drop(blob);
    wait_for_blob_to_be_creatable(&root_dir, BLOB_MERKLE).await;
    create_blob(&root_dir, BLOB_MERKLE, BLOB_CONTENTS).await?;
    blobfs_server.stop().await
}
// A partially written blob that is explicitly closed can be recreated
// immediately — no polling needed, unlike the drop variant above.
#[fuchsia_async::run_singlethreaded(test)]
async fn open_partial_write_close_create() -> Result<(), Error> {
    let blobfs_server = BlobfsRamdisk::start().await?;
    let root_dir = blobfs_server.root_dir_proxy()?;
    let create_flags =
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE;
    let (blob, _) = open_blob(&root_dir, BLOB_MERKLE, create_flags).await?;
    blob.resize(BLOB_CONTENTS.len() as u64).await?.map_err(Status::from_raw)?;
    write_blob(&blob, &BLOB_CONTENTS[0..1]).await?;
    blob.close().await?.map_err(Status::from_raw)?;
    create_blob(&root_dir, BLOB_MERKLE, BLOB_CONTENTS).await?;
    blobfs_server.stop().await
}
// While one connection has the blob mid-write (resized but not fully
// written), a second create-mode open is rejected with ACCESS_DENIED.
#[fuchsia_async::run_singlethreaded(test)]
async fn open_resize_open_for_create_fails() -> Result<(), Error> {
    let blobfs_server = BlobfsRamdisk::start().await?;
    let root_dir = blobfs_server.root_dir_proxy()?;
    let create_flags =
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE;
    let (blob, _) = open_blob(&root_dir, BLOB_MERKLE, create_flags).await?;
    blob.resize(BLOB_CONTENTS.len() as u64).await?.map_err(Status::from_raw)?;
    let second_open = open_blob(&root_dir, BLOB_MERKLE, create_flags).await;
    assert_matches!(second_open, Err(zx::Status::ACCESS_DENIED));
    blobfs_server.stop().await
}
// Only one of two concurrent create-mode connections may size the blob;
// the second resize fails with BAD_STATE.
#[fuchsia_async::run_singlethreaded(test)]
async fn open_open_resize_resize_fails() -> Result<(), Error> {
    let blobfs_server = BlobfsRamdisk::start().await?;
    let root_dir = blobfs_server.root_dir_proxy()?;
    let create_flags =
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE;
    let (first, _) = open_blob(&root_dir, BLOB_MERKLE, create_flags).await?;
    let (second, _) = open_blob(&root_dir, BLOB_MERKLE, create_flags).await?;
    first.resize(BLOB_CONTENTS.len() as u64).await?.map_err(Status::from_raw)?;
    let second_resize = second.resize(BLOB_CONTENTS.len() as u64).await?.map_err(Status::from_raw);
    assert_matches!(second_resize, Err(zx::Status::BAD_STATE));
    blobfs_server.stop().await
}
// With two create-mode connections open, the second connection (blob1) can
// perform the resize + write and the blob still becomes readable.
#[fuchsia_async::run_singlethreaded(test)]
async fn open0_open1_resize1_write1_succeeds() {
    let blobfs_server = BlobfsRamdisk::start().await.unwrap();
    let root_dir = blobfs_server.root_dir_proxy().unwrap();
    let (_blob0, _) = open_blob(
        &root_dir,
        BLOB_MERKLE,
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE,
    )
    .await
    .unwrap();
    let (blob1, _) = open_blob(
        &root_dir,
        BLOB_MERKLE,
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE,
    )
    .await
    .unwrap();
    let () = blob1.resize(BLOB_CONTENTS.len() as u64).await.unwrap().unwrap();
    let () = write_blob(&blob1, BLOB_CONTENTS).await.unwrap();
    // The fully written blob is readable via a fresh connection.
    let (blob2, _) = open_blob(&root_dir, BLOB_MERKLE, fio::PERM_READABLE).await.unwrap();
    let () = verify_blob(&blob2, BLOB_CONTENTS).await.unwrap();
    let () = blobfs_server.stop().await.unwrap();
}
// Mirror of the previous test: with two create-mode connections open, the
// first connection (blob0) performs the resize + write and succeeds.
#[fuchsia_async::run_singlethreaded(test)]
async fn open0_open1_resize0_write0_succeeds() {
    let blobfs_server = BlobfsRamdisk::start().await.unwrap();
    let root_dir = blobfs_server.root_dir_proxy().unwrap();
    let (blob0, _) = open_blob(
        &root_dir,
        BLOB_MERKLE,
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE,
    )
    .await
    .unwrap();
    let (_blob1, _) = open_blob(
        &root_dir,
        BLOB_MERKLE,
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE,
    )
    .await
    .unwrap();
    let () = blob0.resize(BLOB_CONTENTS.len() as u64).await.unwrap().unwrap();
    let () = write_blob(&blob0, BLOB_CONTENTS).await.unwrap();
    // The fully written blob is readable via a fresh connection.
    let (blob2, _) = open_blob(&root_dir, BLOB_MERKLE, fio::PERM_READABLE).await.unwrap();
    let () = verify_blob(&blob2, BLOB_CONTENTS).await.unwrap();
    let () = blobfs_server.stop().await.unwrap();
}
/// On c++blobfs, open connections keep even partially written blobs alive.
/// Testing for blob presence by opening a blob can therefore conflict with a concurrent operation
/// that is writing a blob, as the partial blob from a failed write attempt can be kept alive by the
/// connection from the presence test and prevent the writer from recreating the blob for a retry
/// attempt.
#[fuchsia_async::run_singlethreaded(test)]
async fn open0_open1_resize0_close0_open2_fails() {
    let blobfs_server = BlobfsRamdisk::start().await.unwrap();
    let root_dir = blobfs_server.root_dir_proxy().unwrap();
    let (blob0, _) = open_blob(
        &root_dir,
        BLOB_MERKLE,
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE,
    )
    .await
    .unwrap();
    let (blob1, _) = open_blob(
        &root_dir,
        BLOB_MERKLE,
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE,
    )
    .await
    .unwrap();
    // blob0 resizes (starting the write) and then closes, leaving the blob
    // partially written with blob1 still attached to it.
    let () = blob0.resize(BLOB_CONTENTS.len() as u64).await.unwrap().unwrap();
    let () = blob0.close().await.unwrap().unwrap();
    // The outstanding blob1 write connection keeps the partially written blob alive, preventing new
    // write attempts.
    assert_matches!(
        open_blob(
            &root_dir,
            BLOB_MERKLE,
            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE
        )
        .await,
        Err(zx::Status::ACCESS_DENIED)
    );
    // Closing the outstanding connection allows the blob to be created.
    let () = blob1.close().await.unwrap().unwrap();
    let (blob2, _) = open_blob(
        &root_dir,
        BLOB_MERKLE,
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE,
    )
    .await
    .unwrap();
    let () = blob2.resize(BLOB_CONTENTS.len() as u64).await.unwrap().unwrap();
    let () = write_blob(&blob2, BLOB_CONTENTS).await.unwrap();
    // The retried write completes and the blob reads back intact.
    let (blob3, _) = open_blob(&root_dir, BLOB_MERKLE, fio::PERM_READABLE).await.unwrap();
    let () = verify_blob(&blob3, BLOB_CONTENTS).await.unwrap();
    let () = blobfs_server.stop().await.unwrap();
}
// A read-only connection opened while the blob is mid-write (resized but not
// fully written) cannot read: read_at fails with BAD_STATE.
#[fuchsia_async::run_singlethreaded(test)]
async fn open_resize_open_read_fails() -> Result<(), Error> {
    let blobfs_server = BlobfsRamdisk::start().await?;
    let root_dir = blobfs_server.root_dir_proxy()?;
    let create_flags =
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE;
    let (writer, _) = open_blob(&root_dir, BLOB_MERKLE, create_flags).await?;
    writer.resize(BLOB_CONTENTS.len() as u64).await?.map_err(Status::from_raw)?;
    let (reader, _) = open_blob(&root_dir, BLOB_MERKLE, fio::PERM_READABLE).await?;
    let read_result = reader.read_at(1, 0).await?.map_err(zx::Status::from_raw);
    assert_eq!(read_result, Err(zx::Status::BAD_STATE));
    blobfs_server.stop().await
}
// The observer event returned with a read connection (opened here before the
// writer resizes) is signaled with USER_0 only once the blob is fully written.
#[fuchsia_async::run_singlethreaded(test)]
async fn open_for_create_wait_for_signal() -> Result<(), Error> {
    let blobfs_server = BlobfsRamdisk::start().await?;
    let root_dir = blobfs_server.root_dir_proxy()?;
    let (blob0, _) = open_blob(
        &root_dir,
        BLOB_MERKLE,
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE,
    )
    .await?;
    let (blob1, event) = open_blob(&root_dir, BLOB_MERKLE, fio::PERM_READABLE).await?;
    let () = blob0.resize(BLOB_CONTENTS.len() as u64).await?.map_err(Status::from_raw)?;
    // Zero-deadline wait: no signal is asserted yet, so the wait times out.
    assert_matches!(
        event
            .wait_handle(
                zx::Signals::all(),
                zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0))
            )
            .to_result(),
        Err(zx::Status::TIMED_OUT)
    );
    write_blob(&blob0, BLOB_CONTENTS).await?;
    // After the full payload lands, USER_0 is asserted on the observer event.
    assert_eq!(
        event
            .wait_handle(
                zx::Signals::all(),
                zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0))
            )
            .to_result()?,
        zx::Signals::USER_0
    );
    verify_blob(&blob1, BLOB_CONTENTS).await?;
    blobfs_server.stop().await
}
// Variant of the previous test: the read connection is opened after the
// resize. The observer event is still only signaled once the write completes.
#[fuchsia_async::run_singlethreaded(test)]
async fn open_resize_wait_for_signal() -> Result<(), Error> {
    let blobfs_server = BlobfsRamdisk::start().await?;
    let root_dir = blobfs_server.root_dir_proxy()?;
    let (blob0, _) = open_blob(
        &root_dir,
        BLOB_MERKLE,
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE,
    )
    .await?;
    let () = blob0.resize(BLOB_CONTENTS.len() as u64).await?.map_err(Status::from_raw)?;
    let (blob1, event) = open_blob(&root_dir, BLOB_MERKLE, fio::PERM_READABLE).await?;
    // Zero-deadline wait: no signal is asserted yet, so the wait times out.
    assert_matches!(
        event
            .wait_handle(
                zx::Signals::all(),
                zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0))
            )
            .to_result(),
        Err(zx::Status::TIMED_OUT)
    );
    write_blob(&blob0, BLOB_CONTENTS).await?;
    // After the full payload lands, USER_0 is asserted on the observer event.
    assert_eq!(
        event
            .wait_handle(
                zx::Signals::all(),
                zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0))
            )
            .to_result()?,
        zx::Signals::USER_0
    );
    verify_blob(&blob1, BLOB_CONTENTS).await?;
    blobfs_server.stop().await
}
// An empty blob has no bytes to write, so it is complete — and its observer
// event already signaled with USER_0 — immediately after resize(0).
#[fuchsia_async::run_singlethreaded(test)]
async fn empty_blob_readable_after_resize() {
    // Compute the merkle root of the empty payload at runtime.
    let empty_hash = fuchsia_merkle::from_slice(&[][..]).root().to_string();
    let blobfs_server = BlobfsRamdisk::start().await.unwrap();
    let root_dir = blobfs_server.root_dir_proxy().unwrap();
    let (blob0, _) = open_blob(
        &root_dir,
        &empty_hash,
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE,
    )
    .await
    .unwrap();
    let () = blob0.resize(0).await.unwrap().map_err(Status::from_raw).unwrap();
    let (blob1, event) = open_blob(&root_dir, &empty_hash, fio::PERM_READABLE).await.unwrap();
    // Even with a zero deadline the wait succeeds: USER_0 is already set.
    assert_matches!(
        event
            .wait_handle(
                zx::Signals::all(),
                zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0))
            )
            .to_result(),
        Ok(zx::Signals::USER_0)
    );
    verify_blob(&blob1, &[]).await.unwrap();
    blobfs_server.stop().await.unwrap();
}
// Opening a blob that was never written fails with NOT_FOUND.
#[fuchsia_async::run_singlethreaded(test)]
async fn open_missing_fails() -> Result<(), Error> {
    let blobfs_server = BlobfsRamdisk::start().await?;
    let root_dir = blobfs_server.root_dir_proxy()?;
    let open_result = open_blob(&root_dir, BLOB_MERKLE, fio::PERM_READABLE).await;
    assert_matches!(open_result, Err(zx::Status::NOT_FOUND));
    blobfs_server.stop().await
}
// ReadDirents on /blob should only return blobs if they are fully written and do not have
// outstanding deletion requests.
#[fuchsia_async::run_singlethreaded(test)]
async fn readdirents_only_returns_valid_blobs() {
    let blobfs_server = BlobfsRamdisk::start().await.unwrap();
    let root_dir = blobfs_server.root_dir_proxy().unwrap();
    // Blob doesn't appear until it is fully written.
    assert_eq!(blobfs_server.list_blobs().unwrap(), BTreeSet::new());
    let (blob0, _) = open_blob(
        &root_dir,
        BLOB_MERKLE,
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE,
    )
    .await
    .unwrap();
    // Still hidden after open, after resize, and after an all-but-one-byte write.
    assert_eq!(blobfs_server.list_blobs().unwrap(), BTreeSet::new());
    let () = blob0.resize(BLOB_CONTENTS.len() as u64).await.unwrap().unwrap();
    assert_eq!(blobfs_server.list_blobs().unwrap(), BTreeSet::new());
    let () = write_blob(&blob0, &BLOB_CONTENTS[0..BLOB_CONTENTS.len() - 1]).await.unwrap();
    assert_eq!(blobfs_server.list_blobs().unwrap(), BTreeSet::new());
    // Writing the final byte completes the blob and makes it visible.
    let () = write_blob(&blob0, &BLOB_CONTENTS[BLOB_CONTENTS.len() - 1..]).await.unwrap();
    assert_eq!(blobfs_server.list_blobs().unwrap(), BTreeSet::from([BLOB_MERKLE.parse().unwrap()]));
    // Blob disappears once a deletion request has been received, even if an outstanding connection
    // is keeping it alive.
    let (blob1, _) = open_blob(&root_dir, BLOB_MERKLE, fio::PERM_READABLE).await.unwrap();
    let client = blobfs_server.client();
    let () = client.delete_blob(&BLOB_MERKLE.parse().unwrap()).await.unwrap();
    assert_eq!(blobfs_server.list_blobs().unwrap(), BTreeSet::new());
    // Closing the remaining connections does not resurrect the deleted blob.
    let () = blob0.close().await.unwrap().unwrap();
    assert_eq!(blobfs_server.list_blobs().unwrap(), BTreeSet::new());
    let () = blob1.close().await.unwrap().unwrap();
    assert_eq!(blobfs_server.list_blobs().unwrap(), BTreeSet::new());
    let () = blobfs_server.stop().await.unwrap();
}
// Writing a final byte that makes the content disagree with the merkle-root
// name the blob was opened under fails verification with IO_DATA_INTEGRITY.
#[fuchsia_async::run_singlethreaded(test)]
async fn corrupt_create_fails_on_last_byte_write() -> Result<(), Error> {
    let blobfs_server = BlobfsRamdisk::start().await?;
    let root_dir = blobfs_server.root_dir_proxy()?;
    let (blob, _) = open_blob(
        &root_dir,
        BLOB_MERKLE,
        fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE | fio::PERM_WRITABLE,
    )
    .await?;
    let () = blob.resize(BLOB_CONTENTS.len() as u64).await?.map_err(Status::from_raw)?;
    // Write everything except the last byte correctly.
    write_blob(&blob, &BLOB_CONTENTS[..BLOB_CONTENTS.len() - 1]).await?;
    // Flip all bits of the correct final byte to guarantee a hash mismatch.
    let wrong_trailing_byte = !BLOB_CONTENTS.last().unwrap();
    assert_matches!(
        write_blob(&blob, &[wrong_trailing_byte]).await,
        Err(e) if *e.downcast_ref::<zx::Status>().unwrap() == zx::Status::IO_DATA_INTEGRITY
    );
    blobfs_server.stop().await
}
// On fxblob, a second BlobCreator connection can create the same blob while a
// first connection still holds a partial write; completing the second write
// publishes the blob.
#[fuchsia_async::run_singlethreaded(test)]
async fn fxblob_concurrent_creation_succeeds() {
    let blobfs = BlobfsRamdisk::builder().fxblob().start().await.unwrap();
    let creator = blobfs.blob_creator_proxy().unwrap().unwrap();
    // 8,194 bytes so that the partial write exceeds 8,192 bytes.
    let bytes = vec![0u8; 8194];
    let hash = fuchsia_merkle::from_slice(&bytes).root();
    let compressed = Type1Blob::generate(&bytes, CompressionMode::Never);
    let compressed_len: u64 = compressed.len().try_into().unwrap();
    // Writer 0: deliver all but the final byte; the blob stays unpublished.
    let writer0 = creator.create(&hash, false).await.unwrap().unwrap().into_proxy();
    let vmo0 = writer0.get_vmo(compressed_len).await.unwrap().unwrap();
    let () = vmo0.write(&compressed, 0).unwrap();
    let () = writer0.bytes_ready(compressed_len - 1).await.unwrap().unwrap();
    assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::new());
    // Writer 1: deliver the full payload concurrently; the blob is published.
    let writer1 = creator.create(&hash, false).await.unwrap().unwrap().into_proxy();
    let vmo1 = writer1.get_vmo(compressed_len).await.unwrap().unwrap();
    let () = vmo1.write(&compressed, 0).unwrap();
    let () = writer1.bytes_ready(compressed_len).await.unwrap().unwrap();
    assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([hash]));
    blobfs.stop().await.unwrap();
}
// On fxblob, requesting creation of a blob that is already fully written
// fails with CreateBlobError::AlreadyExists.
#[fuchsia_async::run_singlethreaded(test)]
async fn fxblob_create_already_present_returns_already_exists() {
    let blobfs = BlobfsRamdisk::builder().fxblob().start().await.unwrap();
    let creator = blobfs.blob_creator_proxy().unwrap().unwrap();
    let bytes = vec![0u8; 1];
    let hash = fuchsia_merkle::from_slice(&bytes).root();
    let compressed = Type1Blob::generate(&bytes, CompressionMode::Never);
    let compressed_len: u64 = compressed.len().try_into().unwrap();
    // Fully write the blob once via the BlobWriter protocol.
    let writer0 = creator.create(&hash, false).await.unwrap().unwrap().into_proxy();
    let vmo0 = writer0.get_vmo(compressed_len).await.unwrap().unwrap();
    let () = vmo0.write(&compressed, 0).unwrap();
    let () = writer0.bytes_ready(compressed_len).await.unwrap().unwrap();
    assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([hash]));
    // A second create for the same hash is rejected.
    assert_matches!(
        creator.create(&hash, false).await,
        Ok(Err(ffxfs::CreateBlobError::AlreadyExists))
    );
    blobfs.stop().await.unwrap();
}
// ReadDirents on /blob should only return blobs if they are fully written and do not have
// outstanding deletion requests.
#[fuchsia_async::run_singlethreaded(test)]
async fn fxblob_readdirents_only_returns_valid_blobs() {
    let blobfs_server = BlobfsRamdisk::builder().fxblob().start().await.unwrap();
    let creator = blobfs_server.blob_creator_proxy().unwrap().unwrap();
    let bytes = vec![0u8; 1];
    let hash = fuchsia_merkle::from_slice(&bytes).root();
    let compressed = Type1Blob::generate(&bytes, CompressionMode::Never);
    let compressed_len: u64 = compressed.len().try_into().unwrap();
    // Blob doesn't appear until it is fully written.
    assert_eq!(blobfs_server.list_blobs().unwrap(), BTreeSet::new());
    let writer0 = creator.create(&hash, false).await.unwrap().unwrap().into_proxy();
    assert_eq!(blobfs_server.list_blobs().unwrap(), BTreeSet::new());
    let vmo0 = writer0.get_vmo(compressed_len).await.unwrap().unwrap();
    assert_eq!(blobfs_server.list_blobs().unwrap(), BTreeSet::new());
    let () = vmo0.write(&compressed, 0).unwrap();
    // Still hidden after all but the final byte is signaled ready.
    let () = writer0.bytes_ready(compressed_len - 1).await.unwrap().unwrap();
    assert_eq!(blobfs_server.list_blobs().unwrap(), BTreeSet::new());
    // Signaling the final byte completes the blob and makes it visible.
    let () = writer0.bytes_ready(1).await.unwrap().unwrap();
    assert_eq!(blobfs_server.list_blobs().unwrap(), BTreeSet::from([hash]));
    // Blob disappears once a deletion request has been received, even if an outstanding connection
    // is keeping it alive.
    let reader = blobfs_server.blob_reader_proxy().unwrap().unwrap();
    let _vmo1: zx::Vmo = reader.get_vmo(&hash.into()).await.unwrap().unwrap();
    let () = blobfs_server.client().delete_blob(&hash).await.unwrap();
    assert_eq!(blobfs_server.list_blobs().unwrap(), BTreeSet::new());
    let () = blobfs_server.stop().await.unwrap();
}