// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
errors::FxfsError,
log::*,
lsm_tree::types::{ItemRef, LayerIterator},
object_handle::{
ObjectHandle, ObjectProperties, ReadObjectHandle, WriteBytes, WriteObjectHandle,
},
object_store::{
extent_record::{ExtentKey, ExtentMode, ExtentValue},
object_manager::ObjectManager,
object_record::{
AttributeKey, FsverityMetadata, ObjectAttributes, ObjectItem, ObjectKey,
ObjectKeyData, ObjectKind, ObjectValue, Timestamp,
},
store_object_handle::{MaybeChecksums, NeedsTrim},
transaction::{
self, lock_keys, AssocObj, AssociatedObject, LockKey, Mutation,
ObjectStoreMutation, Options, Transaction,
},
HandleOptions, HandleOwner, ObjectStore, RootDigest, StoreObjectHandle, TrimMode,
TrimResult, DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID,
TRANSACTION_MUTATION_THRESHOLD,
},
range::RangeExt,
round::{round_down, round_up},
},
anyhow::{anyhow, bail, ensure, Context, Error},
async_trait::async_trait,
fidl_fuchsia_io as fio,
fsverity_merkle::{FsVerityHasher, FsVerityHasherOptions, MerkleTreeBuilder},
futures::{stream::FuturesUnordered, TryStreamExt},
fxfs_trace::trace,
mundane::hash::{Digest, Hasher, Sha256, Sha512},
std::{
cmp::min,
ops::{Bound, DerefMut, Range},
sync::{
atomic::{self, AtomicU64, Ordering},
Arc, Mutex,
},
},
storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef},
};
/// How much data each transaction will cover when writing an attribute across batches (512 KiB).
/// Pulled from `FLUSH_BATCH_SIZE` in paged_object_handle.rs.
pub const WRITE_ATTR_BATCH_SIZE: usize = 524_288;
/// DataObjectHandle is a typed handle for file-like objects that store data in the default data
/// attribute. In addition to traditional files, this means things like the journal, superblocks,
/// and layer files.
///
/// It caches the content size of the data attribute it was configured for, and has helpers for
/// complex extent manipulation, as well as implementations of ReadObjectHandle and
/// WriteObjectHandle.
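///
/// A minimal usage sketch (mirroring the tests at the bottom of this file; illustrative, not a
/// compiled doctest):
///
/// ```ignore
/// let object = ObjectStore::create_object(
///     &store, &mut transaction, HandleOptions::default(), None, None).await?;
/// transaction.commit().await?;
/// let mut buf = object.allocate_buffer(5).await;
/// buf.as_mut_slice().copy_from_slice(b"hello");
/// object.write_or_append(Some(0), buf.as_ref()).await?;
/// object.read(0, buf.as_mut()).await?;
/// ```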
pub struct DataObjectHandle<S: HandleOwner> {
handle: StoreObjectHandle<S>,
attribute_id: u64,
content_size: AtomicU64,
fsverity_state: Mutex<FsverityState>,
}
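/// Tracks the progress of `enable_verity` as a simple state machine:
/// `None` -> `Started` (via `set_fsverity_state_started`), `Started` -> `Pending` (via
/// `set_fsverity_state_pending`), and `Pending` -> `Some` (via `finalize_fsverity_state`, once
/// the mutation that writes the verified attribute is applied). Opening a file that is already
/// verified jumps straight from `None` to `Some` via `set_fsverity_state_some`.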
#[derive(Debug)]
pub enum FsverityState {
None,
Started,
Pending(FsverityStateInner),
Some(FsverityStateInner),
}
#[derive(Debug)]
pub struct FsverityStateInner {
descriptor: FsverityMetadata,
// TODO(b/309656632): This should store the entire merkle tree and not just the leaf nodes.
// Potentially store a pager-backed vmo instead of passing around a boxed array.
merkle_tree: Box<[u8]>,
}
impl FsverityStateInner {
pub fn new(descriptor: FsverityMetadata, merkle_tree: Box<[u8]>) -> Self {
FsverityStateInner { descriptor, merkle_tree }
}
}
impl<S: HandleOwner> DataObjectHandle<S> {
pub fn new(
owner: Arc<S>,
object_id: u64,
permanent_keys: bool,
attribute_id: u64,
size: u64,
fsverity_state: FsverityState,
options: HandleOptions,
trace: bool,
) -> Self {
Self {
handle: StoreObjectHandle::new(owner, object_id, permanent_keys, options, trace),
attribute_id,
content_size: AtomicU64::new(size),
fsverity_state: Mutex::new(fsverity_state),
}
}
pub fn owner(&self) -> &Arc<S> {
self.handle.owner()
}
pub fn attribute_id(&self) -> u64 {
self.attribute_id
}
pub fn verified_file(&self) -> bool {
matches!(*self.fsverity_state.lock().unwrap(), FsverityState::Some(_))
}
pub fn store(&self) -> &ObjectStore {
self.handle.store()
}
pub fn trace(&self) -> bool {
self.handle.trace()
}
pub fn handle(&self) -> &StoreObjectHandle<S> {
&self.handle
}
    /// Sets `self.fsverity_state` to FsverityState::Started. Called at the top of `enable_verity`.
    /// If another caller has already started but not completed `enable_verity`, returns
    /// FxfsError::Unavailable. If another caller has already completed `enable_verity`, returns
    /// FxfsError::AlreadyExists.
pub fn set_fsverity_state_started(&self) -> Result<(), Error> {
let mut fsverity_guard = self.fsverity_state.lock().unwrap();
match *fsverity_guard {
FsverityState::None => {
*fsverity_guard = FsverityState::Started;
Ok(())
}
FsverityState::Started | FsverityState::Pending(_) => {
Err(anyhow!(FxfsError::Unavailable))
}
FsverityState::Some(_) => Err(anyhow!(FxfsError::AlreadyExists)),
}
}
/// Sets `self.fsverity_state` to Pending. Must be called before `finalize_fsverity_state()`.
/// Asserts that the prior state of `self.fsverity_state` was `FsverityState::Started`.
pub fn set_fsverity_state_pending(&self, descriptor: FsverityMetadata, merkle_tree: Box<[u8]>) {
let mut fsverity_guard = self.fsverity_state.lock().unwrap();
assert!(matches!(*fsverity_guard, FsverityState::Started));
*fsverity_guard = FsverityState::Pending(FsverityStateInner { descriptor, merkle_tree });
}
/// Sets `self.fsverity_state` to Some. Panics if the prior state of `self.fsverity_state` was
/// not `FsverityState::Pending(_)`.
pub fn finalize_fsverity_state(&self) {
let mut fsverity_state_guard = self.fsverity_state.lock().unwrap();
let mut_fsverity_state = fsverity_state_guard.deref_mut();
let fsverity_state = std::mem::replace(mut_fsverity_state, FsverityState::None);
match fsverity_state {
FsverityState::None => panic!("Cannot go from FsverityState::None to Some"),
FsverityState::Started => panic!("Cannot go from FsverityState::Started to Some"),
FsverityState::Pending(inner) => *mut_fsverity_state = FsverityState::Some(inner),
FsverityState::Some(_) => panic!("Fsverity state was already set to Some"),
}
}
/// Sets `self.fsverity_state` directly to Some without going through the entire state machine.
/// Used to set `self.fsverity_state` on open of a verified file.
pub fn set_fsverity_state_some(&self, descriptor: FsverityMetadata, merkle_tree: Box<[u8]>) {
let mut fsverity_guard = self.fsverity_state.lock().unwrap();
assert!(matches!(*fsverity_guard, FsverityState::None));
*fsverity_guard = FsverityState::Some(FsverityStateInner { descriptor, merkle_tree });
}
    /// Verifies the contents of `buffer` against the corresponding hashes in the stored merkle
    /// tree. `offset` is the logical offset in the file at which `buffer` starts; it must be
    /// block-aligned. Fails on files that are not fsverity-enabled.
async fn verify_data(&self, mut offset: usize, buffer: &[u8]) -> Result<(), Error> {
let block_size = self.block_size() as usize;
assert!(offset % block_size == 0);
let fsverity_state = self.fsverity_state.lock().unwrap();
match &*fsverity_state {
FsverityState::None => {
Err(anyhow!("Tried to verify read on a non verity-enabled file"))
}
FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
"Enable verity has not yet completed, fsverity state: {:?}",
&*fsverity_state
)),
FsverityState::Some(metadata) => {
let (hasher, digest_size) = match metadata.descriptor.root_digest {
RootDigest::Sha256(_) => {
let hasher = FsVerityHasher::Sha256(FsVerityHasherOptions::new(
metadata.descriptor.salt.clone(),
block_size,
));
(hasher, <Sha256 as Hasher>::Digest::DIGEST_LEN)
}
RootDigest::Sha512(_) => {
let hasher = FsVerityHasher::Sha512(FsVerityHasherOptions::new(
metadata.descriptor.salt.clone(),
block_size,
));
(hasher, <Sha512 as Hasher>::Digest::DIGEST_LEN)
}
};
let leaf_nodes: Vec<&[u8]> = metadata.merkle_tree.chunks(digest_size).collect();
fxfs_trace::duration!(c"fsverity-verify", "len" => buffer.len());
// TODO(b/318880297): Consider parallelizing computation.
for b in buffer.chunks(block_size) {
ensure!(
hasher.hash_block(b) == leaf_nodes[offset / block_size],
anyhow!(FxfsError::Inconsistent).context("Hash mismatch")
);
offset += block_size;
}
Ok(())
}
}
}
    /// Extends the file with the given extent. The only use case for this right now is for files
    /// that must exist at certain offsets on the device, such as superblocks.
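    ///
    /// A sketch of intended use (illustrative; the device range is chosen by the caller, e.g. a
    /// fixed superblock location):
    ///
    /// ```ignore
    /// let mut transaction = handle.new_transaction().await?;
    /// handle.extend(&mut transaction, device_offset..device_offset + len).await?;
    /// transaction.commit().await?;
    /// ```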
pub async fn extend<'a>(
&'a self,
transaction: &mut Transaction<'a>,
device_range: Range<u64>,
) -> Result<(), Error> {
let old_end =
round_up(self.txn_get_size(transaction), self.block_size()).ok_or(FxfsError::TooBig)?;
let new_size = old_end + device_range.end - device_range.start;
self.store()
.allocator()
.mark_allocated(transaction, self.store().store_object_id(), device_range.clone())
.await?;
transaction.add_with_object(
self.store().store_object_id,
Mutation::replace_or_insert_object(
ObjectKey::attribute(
self.object_id(),
self.attribute_id(),
AttributeKey::Attribute,
),
ObjectValue::attribute(new_size),
),
AssocObj::Borrowed(self),
);
transaction.add(
self.store().store_object_id,
Mutation::merge_object(
ObjectKey::extent(self.object_id(), self.attribute_id(), old_end..new_size),
ObjectValue::Extent(ExtentValue::new_raw(device_range.start)),
),
);
self.update_allocated_size(transaction, device_range.end - device_range.start, 0).await
}
// Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
// the data from `buf`.
async fn align_buffer(
&self,
offset: u64,
buf: BufferRef<'_>,
) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
self.handle.align_buffer(self.attribute_id(), offset, buf).await
}
    // Writes potentially unaligned data at `device_offset` and returns checksums if requested.
    // The data will be encrypted if necessary.
    // `buf` is mutable as an optimization: since the write may require encryption, we can encrypt
    // the buffer in place rather than copying to another buffer if the write is already aligned.
async fn write_at(
&self,
offset: u64,
buf: MutableBufferRef<'_>,
device_offset: u64,
) -> Result<MaybeChecksums, Error> {
self.handle.write_at(self.attribute_id(), offset, buf, device_offset).await
}
    /// Zeroes the given range. The range must be aligned.
pub async fn zero(
&self,
transaction: &mut Transaction<'_>,
range: Range<u64>,
) -> Result<(), Error> {
self.handle.zero(transaction, self.attribute_id(), range).await
}
/// The cached value for `self.fsverity_state` is set either in `open_object` or on
/// `enable_verity`. If set, translates `self.fsverity_state.descriptor` into an
/// fio::VerificationOptions instance and a root hash. Otherwise, returns None.
pub async fn get_descriptor(
&self,
) -> Result<Option<(fio::VerificationOptions, Vec<u8>)>, Error> {
let fsverity_state = self.fsverity_state.lock().unwrap();
match &*fsverity_state {
FsverityState::None => Ok(None),
FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
"Enable verity has not yet completed, fsverity state: {:?}",
&*fsverity_state
)),
FsverityState::Some(metadata) => {
let (options, root_hash) = match &metadata.descriptor.root_digest {
RootDigest::Sha256(root_hash) => {
let mut root_vec = root_hash.to_vec();
                        // Pad the 32-byte SHA-256 digest out to 64 bytes with zeros so the
                        // returned root hash has a fixed length.
root_vec.extend_from_slice(&[0; 32]);
(
fio::VerificationOptions {
hash_algorithm: Some(fio::HashAlgorithm::Sha256),
salt: Some(metadata.descriptor.salt.clone()),
..Default::default()
},
root_vec,
)
}
RootDigest::Sha512(root_hash) => (
fio::VerificationOptions {
hash_algorithm: Some(fio::HashAlgorithm::Sha512),
salt: Some(metadata.descriptor.salt.clone()),
..Default::default()
},
root_hash.clone(),
),
};
Ok(Some((options, root_hash)))
}
}
}
/// Reads the data attribute and computes a merkle tree from the data. The values of the
/// parameters required to build the merkle tree are supplied by `descriptor` (i.e. salt,
/// hash_algorithm, etc.) Writes the leaf nodes of the merkle tree to an attribute with id
/// `FSVERITY_MERKLE_ATTRIBUTE_ID`. Updates the root_hash of the `descriptor` according to the
/// computed merkle tree and then replaces the ObjectValue of the data attribute with
/// ObjectValue::VerifiedAttribute, which stores the `descriptor` inline.
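    ///
    /// A minimal sketch of a call (illustrative; both `hash_algorithm` and `salt` must be
    /// provided or this returns an error):
    ///
    /// ```ignore
    /// handle
    ///     .enable_verity(fio::VerificationOptions {
    ///         hash_algorithm: Some(fio::HashAlgorithm::Sha256),
    ///         salt: Some(vec![0xaa; 8]),
    ///         ..Default::default()
    ///     })
    ///     .await?;
    /// ```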
#[trace]
pub async fn enable_verity(&self, options: fio::VerificationOptions) -> Result<(), Error> {
self.set_fsverity_state_started()?;
// If the merkle attribute was tombstoned in the last attempt of `enable_verity`, flushing
// the graveyard should process the tombstone before we start rewriting the attribute.
if let Some(_) = self
.store()
.tree()
.find(&ObjectKey::graveyard_attribute_entry(
self.store().graveyard_directory_object_id(),
self.object_id(),
FSVERITY_MERKLE_ATTRIBUTE_ID,
))
.await?
{
self.store().filesystem().graveyard().flush().await;
}
let mut transaction = self.new_transaction().await?;
let hash_alg =
options.hash_algorithm.ok_or_else(|| anyhow!("No hash algorithm provided"))?;
let salt = options.salt.ok_or_else(|| anyhow!("No salt provided"))?;
        // Both supported algorithms share the same merkle-tree construction; only the hasher and
        // the representation of the root digest differ.
        let hasher = match hash_alg {
            fio::HashAlgorithm::Sha256 => FsVerityHasher::Sha256(FsVerityHasherOptions::new(
                salt.clone(),
                self.block_size() as usize,
            )),
            fio::HashAlgorithm::Sha512 => FsVerityHasher::Sha512(FsVerityHasherOptions::new(
                salt.clone(),
                self.block_size() as usize,
            )),
            _ => {
                bail!(anyhow!(FxfsError::NotSupported).context("hash algorithm not supported"))
            }
        };
        let mut builder = MerkleTreeBuilder::new(hasher);
        let mut offset = 0;
        let size = self.get_size();
        // TODO(b/314836822): Consider further tuning the buffer size to optimize
        // performance. Experimentally, most verity-enabled files are <256K.
        let mut buf = self.allocate_buffer(64 * self.block_size() as usize).await;
        while offset < size {
            // TODO(b/314842875): Consider optimizations for sparse files.
            let read = self.read(offset, buf.as_mut()).await? as u64;
            assert!(offset + read <= size);
            builder.write(&buf.as_slice()[0..read as usize]);
            offset += read;
        }
        let tree = builder.finish();
        let merkle_leaf_nodes: Vec<u8> = tree.as_ref()[0].iter().flat_map(|x| x.clone()).collect();
        // TODO(b/314194485): Eventually want streaming writes.
        // The merkle tree attribute should not require trimming because it should not exist.
        self.handle
            .write_new_attr_in_batches(
                &mut transaction,
                FSVERITY_MERKLE_ATTRIBUTE_ID,
                &merkle_leaf_nodes,
                WRITE_ATTR_BATCH_SIZE,
            )
            .await?;
        let root_digest = match hash_alg {
            fio::HashAlgorithm::Sha256 => {
                RootDigest::Sha256(tree.root().try_into().unwrap())
            }
            fio::HashAlgorithm::Sha512 => RootDigest::Sha512(tree.root().to_vec()),
            // Unsupported algorithms were rejected when the hasher was constructed above.
            _ => unreachable!(),
        };
        let merkle_tree = merkle_leaf_nodes;
if merkle_tree.len() > WRITE_ATTR_BATCH_SIZE {
transaction.add(
self.store().store_object_id,
Mutation::replace_or_insert_object(
ObjectKey::graveyard_attribute_entry(
self.store().graveyard_directory_object_id(),
self.object_id(),
FSVERITY_MERKLE_ATTRIBUTE_ID,
),
ObjectValue::None,
),
);
};
let descriptor = FsverityMetadata { root_digest, salt };
self.set_fsverity_state_pending(descriptor.clone(), merkle_tree.into());
transaction.add_with_object(
self.store().store_object_id(),
Mutation::replace_or_insert_object(
ObjectKey::attribute(
self.object_id(),
DEFAULT_DATA_ATTRIBUTE_ID,
AttributeKey::Attribute,
),
ObjectValue::verified_attribute(self.get_size(), descriptor),
),
AssocObj::Borrowed(self),
);
transaction.commit().await?;
Ok(())
}
/// Pre-allocate disk space for the given logical file range. If any part of the allocation
/// range is beyond the end of the file, the file size is updated.
/// NB: Do not hook this up yet! This is just for testing until all the fallocate features are
/// implemented.
#[cfg(test)]
async fn allocate(&self, range: Range<u64>) -> Result<(), Error> {
debug_assert!(range.start < range.end);
debug_assert_eq!(range.start % self.block_size(), 0);
debug_assert_eq!(range.end % self.block_size(), 0);
let mut transaction = self.new_transaction().await?;
let mut to_allocate = Vec::new();
let mut to_switch = Vec::new();
let mut new_range = range.clone();
{
let tree = &self.store().tree;
let layer_set = tree.layer_set();
let offset_key = ObjectKey::attribute(
self.object_id(),
self.attribute_id(),
AttributeKey::Extent(ExtentKey::search_key_from_offset(new_range.start)),
);
let mut merger = layer_set.merger();
let mut iter = merger.seek(Bound::Included(&offset_key)).await?;
loop {
match iter.get() {
Some(ItemRef {
key:
ObjectKey {
object_id,
data:
ObjectKeyData::Attribute(
attribute_id,
AttributeKey::Extent(extent_key),
),
},
value: ObjectValue::Extent(extent_value),
..
}) if *object_id == self.object_id()
&& *attribute_id == self.attribute_id() =>
{
// If the start of this extent is beyond the end of the range we are
// allocating, we don't have any more work to do.
if new_range.end <= extent_key.range.start {
break;
}
let device_offset = match extent_value {
ExtentValue::None => {
// If the extent value is None, it indicates a deleted extent. In
// that case, we just skip it entirely. By keeping the new_range
// where it is, this section will get included in the new
// allocations.
iter.advance().await?;
continue;
}
ExtentValue::Some { mode: ExtentMode::OverwritePartial(_), .. }
| ExtentValue::Some { mode: ExtentMode::Overwrite, .. } => {
// If this extent is already in overwrite mode, we can skip it.
if extent_key.range.end < new_range.end {
new_range.start = extent_key.range.end;
iter.advance().await?;
continue;
} else {
new_range.start = new_range.end;
break;
}
}
ExtentValue::Some { device_offset, .. } => *device_offset,
};
// Figure out how we have to break up the ranges.
if extent_key.range.start <= new_range.start {
// [ extent
// [ new
// and
// [ extent
// [ new
assert!(new_range.start < extent_key.range.end);
let device_offset =
device_offset + (new_range.start - extent_key.range.start);
if extent_key.range.end < new_range.end {
// [ extent ]
// [ new ]
to_switch
.push((new_range.start..extent_key.range.end, device_offset));
new_range.start = extent_key.range.end;
} else {
// [ extent ]
// [ new ]
// or
// [ extent ]
// [ new ]
to_switch.push((new_range.start..new_range.end, device_offset));
new_range.start = new_range.end;
break;
}
} else {
// [ extent
// [ new ]
to_allocate.push(new_range.start..extent_key.range.start);
if extent_key.range.end < new_range.end {
// [ extent ]
// [ new ]
to_switch.push((extent_key.range.clone(), device_offset));
new_range.start = extent_key.range.end;
} else {
// [ extent ]
// [ new ]
// or
// [ extent ]
// [ new ]
to_switch
.push((extent_key.range.start..new_range.end, device_offset));
new_range.start = new_range.end;
break;
}
}
}
// The records are sorted so if we find something that isn't an extent or
// doesn't match the object id then there are no more extent records for this
// object.
_ => break,
}
iter.advance().await?;
}
}
if new_range.start < new_range.end {
to_allocate.push(new_range.clone());
}
// Make sure the mutation that flips the has_overwrite_extents advisory flag is in the
// first transaction, in case we split transactions. This makes it okay to only replay the
// first transaction if power loss occurs - the file will be in an unusual state, but not
// an invalid one, if only part of the allocate goes through.
let mut mutation =
self.store().txn_get_object_mutation(&transaction, self.object_id()).await?;
if let ObjectValue::Object {
kind: ObjectKind::File { has_overwrite_extents, .. }, ..
} = &mut mutation.item.value
{
*has_overwrite_extents = true;
} else {
bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
}
transaction.add(self.store().store_object_id(), Mutation::ObjectStore(mutation));
// The maximum number of mutations we are going to allow per transaction in allocate. This
// is probably quite a bit lower than the actual limit, but it should be large enough to
// handle most non-edge-case versions of allocate without splitting the transaction.
const MAX_TRANSACTION_SIZE: usize = 256;
for (switch_range, device_offset) in to_switch {
transaction.add_with_object(
self.store().store_object_id(),
Mutation::merge_object(
ObjectKey::extent(self.object_id(), self.attribute_id(), switch_range),
ObjectValue::Extent(ExtentValue::initialized_overwrite_extent(device_offset)),
),
AssocObj::Borrowed(self),
);
if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
transaction.commit_and_continue().await?;
}
}
let mut allocated = 0;
let allocator = self.store().allocator();
for mut allocate_range in to_allocate {
while allocate_range.start < allocate_range.end {
// TODO(https://fxbug.dev/293943124): if any extents are beyond the end of the file
// and the transaction gets split, it will cause an fsck failure. We need to handle
// that case somehow (probably by adding a temporary TRIM record).
let device_range = allocator
.allocate(
&mut transaction,
self.store().store_object_id(),
allocate_range.end - allocate_range.start,
)
.await
.context("allocation failed")?;
let device_range_len = device_range.end - device_range.start;
transaction.add_with_object(
self.store().store_object_id(),
Mutation::merge_object(
ObjectKey::extent(
self.object_id(),
self.attribute_id(),
allocate_range.start..allocate_range.start + device_range_len,
),
ObjectValue::Extent(ExtentValue::blank_overwrite_extent(
device_range.start,
(device_range_len / self.block_size()) as usize,
)),
),
AssocObj::Borrowed(self),
);
allocate_range.start += device_range_len;
allocated += device_range_len;
if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
transaction.commit_and_continue().await?;
}
}
}
if new_range.end > self.txn_get_size(&transaction) {
transaction.add_with_object(
self.store().store_object_id(),
Mutation::replace_or_insert_object(
ObjectKey::attribute(
self.object_id(),
self.attribute_id(),
AttributeKey::Attribute,
),
ObjectValue::attribute(new_range.end),
),
AssocObj::Borrowed(self),
);
}
self.update_allocated_size(&mut transaction, allocated, 0).await?;
transaction.commit().await?;
Ok(())
}
    /// Returns information about a contiguous run of extents sharing the same allocation status,
    /// starting from `start_offset`: whether the run is allocated, and the length of the run in
    /// bytes. This is used when querying slices for volumes.
    /// `start_offset` must be block-aligned.
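    ///
    /// For example (illustrative, assuming a 4096-byte block size with the first two blocks of
    /// the file allocated and the remainder sparse):
    ///
    /// ```ignore
    /// assert_eq!(handle.is_allocated(0).await?, (true, 8192));
    /// assert_eq!(handle.is_allocated(8192).await?, (false, size - 8192));
    /// ```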
pub async fn is_allocated(&self, start_offset: u64) -> Result<(bool, u64), Error> {
let block_size = self.block_size();
assert_eq!(start_offset % block_size, 0);
if start_offset > self.get_size() {
bail!(FxfsError::OutOfRange)
}
if start_offset == self.get_size() {
return Ok((false, 0));
}
let tree = &self.store().tree;
let layer_set = tree.layer_set();
let offset_key = ObjectKey::attribute(
self.object_id(),
self.attribute_id(),
AttributeKey::Extent(ExtentKey::search_key_from_offset(start_offset)),
);
let mut merger = layer_set.merger();
let mut iter = merger.seek(Bound::Included(&offset_key)).await?;
let mut allocated = None;
let mut end = start_offset;
loop {
// Iterate through the extents, each time setting `end` as the end of the previous
// extent
match iter.get() {
Some(ItemRef {
key:
ObjectKey {
object_id,
data:
ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
},
value: ObjectValue::Extent(extent_value),
..
}) => {
// Equivalent of getting no extents back
if *object_id != self.object_id() || *attribute_id != self.attribute_id() {
if allocated == Some(false) || allocated.is_none() {
end = self.get_size();
allocated = Some(false);
}
break;
}
ensure!(extent_key.range.is_aligned(block_size), FxfsError::Inconsistent);
if extent_key.range.start > end {
// If a previous extent has already been visited and we are tracking an
// allocated set, we are only interested in an extent where the range of the
// current extent follows immediately after the previous one.
if allocated == Some(true) {
break;
} else {
// The gap between the previous `end` and this extent is not allocated
end = extent_key.range.start;
allocated = Some(false);
// Continue this iteration, except now the `end` is set to the end of
// the "previous" extent which is this gap between the start_offset
// and the current extent
}
}
// We can assume that from here, the `end` points to the end of a previous
// extent.
match extent_value {
// The current extent has been allocated
ExtentValue::Some { .. } => {
// Stop searching if previous extent was marked deleted
if allocated == Some(false) {
break;
}
allocated = Some(true);
}
// This extent has been marked deleted
ExtentValue::None => {
// Stop searching if previous extent was marked allocated
if allocated == Some(true) {
break;
}
allocated = Some(false);
}
}
end = extent_key.range.end;
}
// This occurs when there are no extents left
None => {
if allocated == Some(false) || allocated.is_none() {
end = self.get_size();
allocated = Some(false);
}
// Otherwise, we were monitoring extents that were allocated, so just exit.
break;
}
// Non-extent records (Object, Child, GraveyardEntry) are ignored.
Some(_) => {}
}
iter.advance().await?;
}
Ok((allocated.unwrap(), end - start_offset))
}
pub async fn txn_write<'a>(
&'a self,
transaction: &mut Transaction<'a>,
offset: u64,
buf: BufferRef<'_>,
) -> Result<(), Error> {
if buf.is_empty() {
return Ok(());
}
let (aligned, mut transfer_buf) = self.align_buffer(offset, buf).await?;
self.multi_write(
transaction,
self.attribute_id(),
&[aligned.clone()],
transfer_buf.as_mut(),
)
.await?;
if offset + buf.len() as u64 > self.txn_get_size(transaction) {
transaction.add_with_object(
self.store().store_object_id,
Mutation::replace_or_insert_object(
ObjectKey::attribute(
self.object_id(),
self.attribute_id(),
AttributeKey::Attribute,
),
ObjectValue::attribute(offset + buf.len() as u64),
),
AssocObj::Borrowed(self),
);
}
Ok(())
}
// Writes to multiple ranges with data provided in `buf`. The buffer can be modified in place
// if encryption takes place. The ranges must all be aligned and no change to content size is
// applied; the caller is responsible for updating size if required.
pub async fn multi_write<'a>(
&'a self,
transaction: &mut Transaction<'a>,
attribute_id: u64,
ranges: &[Range<u64>],
buf: MutableBufferRef<'_>,
) -> Result<(), Error> {
self.handle.multi_write(transaction, attribute_id, ranges, buf).await
}
// If allow_allocations is false, then all the extents for the range must have been
// preallocated using preallocate_range or from existing writes.
//
    // `buf` is mutable as an optimization: since the write may require encryption, we can
    // encrypt the buffer in place rather than copying to another buffer if the write is
    // already aligned.
//
// Note: in the event of power failure during an overwrite() call, it is possible that
// old data (which hasn't been overwritten with new bytes yet) may be exposed to the user.
// Since the old data should be encrypted, it is probably safe to expose, although not ideal.
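    //
    // A sketch of use without allocations (illustrative): preallocate the extents first, then
    // overwrite them in place.
    //
    //   let mut transaction = handle.new_transaction().await?;
    //   let mut file_range = 0..len;
    //   handle.preallocate_range(&mut transaction, &mut file_range).await?;
    //   transaction.commit().await?;
    //   handle.overwrite(0, buf.as_mut(), false).await?;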
pub async fn overwrite(
&self,
mut offset: u64,
mut buf: MutableBufferRef<'_>,
allow_allocations: bool,
) -> Result<(), Error> {
assert_eq!((buf.len() as u32) % self.store().device.block_size(), 0);
let end = offset + buf.len() as u64;
// The transaction only ends up being used if allow_allocations is true
let mut transaction =
if allow_allocations { Some(self.new_transaction().await?) } else { None };
// We build up a list of writes to perform later
let writes = FuturesUnordered::new();
// We create a new scope here, so that the merger iterator will get dropped before we try to
// commit our transaction. Otherwise the transaction commit would block.
{
let store = self.store();
let store_object_id = store.store_object_id;
let allocator = store.allocator();
let tree = &store.tree;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger
.seek(Bound::Included(&ObjectKey::attribute(
self.object_id(),
self.attribute_id(),
AttributeKey::Extent(ExtentKey::search_key_from_offset(offset)),
)))
.await?;
let block_size = self.block_size();
loop {
let (device_offset, bytes_to_write, should_advance) = match iter.get() {
Some(ItemRef {
key:
ObjectKey {
object_id,
data:
ObjectKeyData::Attribute(
attribute_id,
AttributeKey::Extent(ExtentKey { range }),
),
},
value: ObjectValue::Extent(ExtentValue::Some { .. }),
..
}) if *object_id == self.object_id()
&& *attribute_id == self.attribute_id()
&& range.end == offset =>
{
iter.advance().await?;
continue;
}
Some(ItemRef {
key:
ObjectKey {
object_id,
data:
ObjectKeyData::Attribute(
attribute_id,
AttributeKey::Extent(ExtentKey { range }),
),
},
value,
..
}) if *object_id == self.object_id()
&& *attribute_id == self.attribute_id()
&& range.start <= offset =>
{
match value {
ObjectValue::Extent(ExtentValue::Some {
device_offset,
mode: ExtentMode::Raw,
..
}) => {
ensure!(
range.is_aligned(block_size) && device_offset % block_size == 0,
FxfsError::Inconsistent
);
let offset_within_extent = offset - range.start;
let remaining_length_of_extent = (range
.end
.checked_sub(offset)
.ok_or(FxfsError::Inconsistent)?)
as usize;
// Yields (device_offset, bytes_to_write, should_advance)
(
device_offset + offset_within_extent,
min(buf.len(), remaining_length_of_extent),
true,
)
}
ObjectValue::Extent(ExtentValue::Some { .. }) => {
// TODO(https://fxbug.dev/42066056): Maybe we should create
// a new extent without checksums?
bail!(
"extent from ({},{}) which overlaps offset \
{} has the wrong extent mode",
range.start,
range.end,
offset
)
}
_ => {
bail!(
"overwrite failed: extent overlapping offset {} has \
unexpected ObjectValue",
offset
)
}
}
}
maybe_item_ref => {
if let Some(transaction) = transaction.as_mut() {
                            assert!(allow_allocations);
assert_eq!(offset % self.block_size(), 0);
// We are going to make a new extent, but let's check if there is an
// extent after us. If there is an extent after us, then we don't want
// our new extent to bump into it...
let mut bytes_to_allocate =
round_up(buf.len() as u64, self.block_size())
.ok_or(FxfsError::TooBig)?;
if let Some(ItemRef {
key:
ObjectKey {
object_id,
data:
ObjectKeyData::Attribute(
attribute_id,
AttributeKey::Extent(ExtentKey { range }),
),
},
..
}) = maybe_item_ref
{
if *object_id == self.object_id()
&& *attribute_id == self.attribute_id()
&& offset < range.start
{
let bytes_until_next_extent = range.start - offset;
bytes_to_allocate =
min(bytes_to_allocate, bytes_until_next_extent);
}
}
let device_range = allocator
.allocate(transaction, store_object_id, bytes_to_allocate)
.await?;
let device_range_len = device_range.end - device_range.start;
transaction.add(
store_object_id,
Mutation::insert_object(
ObjectKey::extent(
self.object_id(),
self.attribute_id(),
offset..offset + device_range_len,
),
ObjectValue::Extent(ExtentValue::new_raw(device_range.start)),
),
);
self.update_allocated_size(transaction, device_range_len, 0).await?;
// Yields (device_offset, bytes_to_write, should_advance)
(device_range.start, min(buf.len(), device_range_len as usize), false)
} else {
bail!(
"no extent overlapping offset {}, \
and new allocations are not allowed",
offset
)
}
}
};
let (current_buf, remaining_buf) = buf.split_at_mut(bytes_to_write);
writes.push(self.write_at(offset, current_buf, device_offset));
if remaining_buf.len() == 0 {
break;
} else {
buf = remaining_buf;
offset += bytes_to_write as u64;
if should_advance {
iter.advance().await?;
}
}
}
}
self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
// The checksums are being ignored here, but we don't need to know them
writes.try_collect::<Vec<MaybeChecksums>>().await?;
if let Some(mut transaction) = transaction {
            assert!(allow_allocations);
if !transaction.is_empty() {
if end > self.get_size() {
self.grow(&mut transaction, self.get_size(), end).await?;
}
transaction.commit().await?;
}
}
Ok(())
}
// Within a transaction, the size of the object might have changed, so get the size from there
    // if it exists; otherwise, fall back on the cached size.
fn txn_get_size(&self, transaction: &Transaction<'_>) -> u64 {
transaction
.get_object_mutation(
self.store().store_object_id,
ObjectKey::attribute(
self.object_id(),
self.attribute_id(),
AttributeKey::Attribute,
),
)
.and_then(|m| {
if let ObjectItem { value: ObjectValue::Attribute { size }, .. } = m.item {
Some(size)
} else {
None
}
})
.unwrap_or_else(|| self.get_size())
}
async fn update_allocated_size(
&self,
transaction: &mut Transaction<'_>,
allocated: u64,
deallocated: u64,
) -> Result<(), Error> {
self.handle.update_allocated_size(transaction, allocated, deallocated).await
}
pub async fn shrink<'a>(
&'a self,
transaction: &mut Transaction<'a>,
size: u64,
) -> Result<NeedsTrim, Error> {
let needs_trim = self.handle.shrink(transaction, self.attribute_id(), size).await?;
transaction.add_with_object(
self.store().store_object_id(),
Mutation::replace_or_insert_object(
ObjectKey::attribute(
self.object_id(),
self.attribute_id(),
AttributeKey::Attribute,
),
ObjectValue::attribute(size),
),
AssocObj::Borrowed(self),
);
Ok(needs_trim)
}
pub async fn grow<'a>(
&'a self,
transaction: &mut Transaction<'a>,
old_size: u64,
size: u64,
) -> Result<(), Error> {
// Before growing the file, we must make sure that a previous trim has completed.
let store = self.store();
while matches!(
store
.trim_some(
transaction,
self.object_id(),
self.attribute_id(),
TrimMode::FromOffset(old_size)
)
.await?,
TrimResult::Incomplete
) {
transaction.commit_and_continue().await?;
}
// We might need to zero out the tail of the old last block.
let block_size = self.block_size();
if old_size % block_size != 0 {
let layer_set = store.tree.layer_set();
let mut merger = layer_set.merger();
let aligned_old_size = round_down(old_size, block_size);
let iter = merger
.seek(Bound::Included(&ObjectKey::extent(
self.object_id(),
self.attribute_id(),
aligned_old_size..aligned_old_size + 1,
)))
.await?;
if let Some(ItemRef {
key:
ObjectKey {
object_id,
data:
ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
},
value: ObjectValue::Extent(ExtentValue::Some { device_offset, key_id, .. }),
..
}) = iter.get()
{
if *object_id == self.object_id() && *attribute_id == self.attribute_id() {
let device_offset = device_offset
.checked_add(aligned_old_size - extent_key.range.start)
.ok_or(FxfsError::Inconsistent)?;
ensure!(device_offset % block_size == 0, FxfsError::Inconsistent);
let mut buf = self.allocate_buffer(block_size as usize).await;
// In the case that this extent is in OverwritePartial mode, there is a
// possibility that the last block is allocated, but not initialized yet, in
// which case we don't actually need to bother zeroing out the tail. However,
// it's not strictly incorrect to change uninitialized data, so we skip the
// check and blindly do it to keep it simpler here.
self.read_and_decrypt(
device_offset,
aligned_old_size,
buf.as_mut(),
*key_id,
None,
)
.await?;
buf.as_mut_slice()[(old_size % block_size) as usize..].fill(0);
self.multi_write(
transaction,
*attribute_id,
&[aligned_old_size..aligned_old_size + block_size],
buf.as_mut(),
)
.await?;
}
}
}
transaction.add_with_object(
store.store_object_id,
Mutation::replace_or_insert_object(
ObjectKey::attribute(
self.object_id(),
self.attribute_id(),
AttributeKey::Attribute,
),
ObjectValue::attribute(size),
),
AssocObj::Borrowed(self),
);
Ok(())
}
/// Attempts to pre-allocate a `file_range` of bytes for this object.
/// Returns a set of device ranges (i.e. potentially multiple extents).
///
/// It may not be possible to preallocate the entire requested range in one request
/// due to limitations on transaction size. In such cases, we will preallocate as much as
/// we can up to some (arbitrary, internal) limit on transaction size.
///
/// `file_range.start` is modified to point at the end of the logical range
/// that was preallocated such that repeated calls to `preallocate_range` with new
/// transactions can be used to preallocate ranges of any size.
///
/// Requested range must be a multiple of block size.
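    ///
    /// A sketch of preallocating an arbitrarily large range across transactions (illustrative):
    ///
    /// ```ignore
    /// let mut file_range = 0..target_size;
    /// while file_range.start < file_range.end {
    ///     let mut transaction = handle.new_transaction().await?;
    ///     handle.preallocate_range(&mut transaction, &mut file_range).await?;
    ///     transaction.commit().await?;
    /// }
    /// ```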
pub async fn preallocate_range<'a>(
&'a self,
transaction: &mut Transaction<'a>,
file_range: &mut Range<u64>,
) -> Result<Vec<Range<u64>>, Error> {
let block_size = self.block_size();
assert!(file_range.is_aligned(block_size));
assert!(!self.handle.is_encrypted());
let mut ranges = Vec::new();
let tree = &self.store().tree;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger
.seek(Bound::Included(&ObjectKey::attribute(
self.object_id(),
self.attribute_id(),
AttributeKey::Extent(ExtentKey::search_key_from_offset(file_range.start)),
)))
.await?;
let mut allocated = 0;
'outer: while file_range.start < file_range.end {
let allocate_end = loop {
match iter.get() {
// Case for allocated extents for the same object that overlap with file_range.
Some(ItemRef {
key:
ObjectKey {
object_id,
data:
ObjectKeyData::Attribute(
attribute_id,
AttributeKey::Extent(ExtentKey { range }),
),
},
value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
..
}) if *object_id == self.object_id()
&& *attribute_id == self.attribute_id()
&& range.start < file_range.end =>
{
ensure!(
range.is_valid()
&& range.is_aligned(block_size)
&& device_offset % block_size == 0,
FxfsError::Inconsistent
);
// If the start of the requested file_range overlaps with an existing extent...
if range.start <= file_range.start {
// Record the existing extent and move on.
let device_range = device_offset
.checked_add(file_range.start - range.start)
.ok_or(FxfsError::Inconsistent)?
..device_offset
.checked_add(min(range.end, file_range.end) - range.start)
.ok_or(FxfsError::Inconsistent)?;
file_range.start += device_range.end - device_range.start;
ranges.push(device_range);
if file_range.start >= file_range.end {
break 'outer;
}
iter.advance().await?;
continue;
} else {
// There's nothing allocated between file_range.start and the beginning
// of this extent.
break range.start;
}
}
// Case for deleted extents eclipsed by file_range.
Some(ItemRef {
key:
ObjectKey {
object_id,
data:
ObjectKeyData::Attribute(
attribute_id,
AttributeKey::Extent(ExtentKey { range }),
),
},
value: ObjectValue::Extent(ExtentValue::None),
..
}) if *object_id == self.object_id()
&& *attribute_id == self.attribute_id()
&& range.end < file_range.end =>
{
iter.advance().await?;
}
_ => {
// We can just preallocate the rest.
break file_range.end;
}
}
};
let device_range = self
.store()
.allocator()
.allocate(
transaction,
self.store().store_object_id(),
allocate_end - file_range.start,
)
.await
.context("Allocation failed")?;
allocated += device_range.end - device_range.start;
let this_file_range =
file_range.start..file_range.start + device_range.end - device_range.start;
file_range.start = this_file_range.end;
transaction.add(
self.store().store_object_id,
Mutation::merge_object(
ObjectKey::extent(self.object_id(), self.attribute_id(), this_file_range),
ObjectValue::Extent(ExtentValue::new_raw(device_range.start)),
),
);
ranges.push(device_range);
// If we didn't allocate all that we requested, we'll loop around and try again.
// ... unless we have filled the transaction. The caller should check file_range.
if transaction.mutations().len() > TRANSACTION_MUTATION_THRESHOLD {
break;
}
}
// Update the file size if it changed.
if file_range.start > round_up(self.txn_get_size(transaction), block_size).unwrap() {
transaction.add_with_object(
self.store().store_object_id,
Mutation::replace_or_insert_object(
ObjectKey::attribute(
self.object_id(),
self.attribute_id(),
AttributeKey::Attribute,
),
ObjectValue::attribute(file_range.start),
),
AssocObj::Borrowed(self),
);
}
self.update_allocated_size(transaction, allocated, 0).await?;
Ok(ranges)
}
pub async fn update_attributes<'a>(
&self,
transaction: &mut Transaction<'a>,
node_attributes: Option<&fio::MutableNodeAttributes>,
change_time: Option<Timestamp>,
) -> Result<(), Error> {
self.handle.update_attributes(transaction, node_attributes, change_time).await
}
/// Get the default set of transaction options for this object. This is mostly the overall
/// default, modified by any [`HandleOptions`] held by this handle.
pub fn default_transaction_options<'b>(&self) -> Options<'b> {
self.handle.default_transaction_options()
}
pub async fn new_transaction<'b>(&self) -> Result<Transaction<'b>, Error> {
self.new_transaction_with_options(self.default_transaction_options()).await
}
pub async fn new_transaction_with_options<'b>(
&self,
options: Options<'b>,
) -> Result<Transaction<'b>, Error> {
self.handle.new_transaction_with_options(self.attribute_id(), options).await
}
/// Flushes the underlying device. This is expensive and should be used sparingly.
pub async fn flush_device(&self) -> Result<(), Error> {
self.handle.flush_device().await
}
/// Reads an entire attribute.
pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
self.handle.read_attr(attribute_id).await
}
/// Writes an entire attribute.
pub async fn write_attr(&self, attribute_id: u64, data: &[u8]) -> Result<(), Error> {
        // Must be a different attribute, otherwise the cached size would get out of date.
assert_ne!(attribute_id, self.attribute_id());
let store = self.store();
let mut transaction = self.new_transaction().await?;
if self.handle.write_attr(&mut transaction, attribute_id, data).await?.0 {
transaction.commit_and_continue().await?;
while matches!(
store
.trim_some(
&mut transaction,
self.object_id(),
attribute_id,
TrimMode::FromOffset(data.len() as u64),
)
.await?,
TrimResult::Incomplete
) {
transaction.commit_and_continue().await?;
}
}
transaction.commit().await?;
Ok(())
}
async fn read_and_decrypt(
&self,
device_offset: u64,
file_offset: u64,
buffer: MutableBufferRef<'_>,
key_id: u64,
block_bitmap: Option<bit_vec::BitVec>,
) -> Result<(), Error> {
self.handle.read_and_decrypt(device_offset, file_offset, buffer, key_id, block_bitmap).await
}
/// Truncates a file to a given size (growing/shrinking as required).
///
    /// NB: Most code will want to call truncate() instead. This method is used
    /// to update the superblock -- a case where we must borrow metadata space.
pub async fn truncate_with_options(
&self,
options: Options<'_>,
size: u64,
) -> Result<(), Error> {
let mut transaction = self.new_transaction_with_options(options).await?;
let old_size = self.get_size();
if size == old_size {
return Ok(());
}
if size < old_size {
if self.shrink(&mut transaction, size).await?.0 {
// The file needs to be trimmed.
transaction.commit_and_continue().await?;
let store = self.store();
while matches!(
store
.trim_some(
&mut transaction,
self.object_id(),
self.attribute_id(),
TrimMode::FromOffset(size)
)
.await?,
TrimResult::Incomplete
) {
if let Err(error) = transaction.commit_and_continue().await {
warn!(?error, "Failed to trim after truncate");
return Ok(());
}
}
if let Err(error) = transaction.commit().await {
warn!(?error, "Failed to trim after truncate");
}
return Ok(());
}
} else {
self.grow(&mut transaction, old_size, size).await?;
}
transaction.commit().await?;
Ok(())
}
pub async fn get_properties(&self) -> Result<ObjectProperties, Error> {
// We don't take a read guard here since the object properties are contained in a single
// object, which cannot be inconsistent with itself. The LSM tree does not return
// intermediate states for a single object.
let item = self
.store()
.tree
.find(&ObjectKey::object(self.object_id()))
.await?
.expect("Unable to find object record");
match item.value {
ObjectValue::Object {
kind: ObjectKind::File { refs, .. },
attributes:
ObjectAttributes {
creation_time,
modification_time,
posix_attributes,
allocated_size,
access_time,
change_time,
..
},
} => Ok(ObjectProperties {
refs,
allocated_size,
data_attribute_size: self.get_size(),
creation_time,
modification_time,
access_time,
change_time,
sub_dirs: 0,
posix_attributes,
}),
_ => bail!(FxfsError::NotFile),
}
}
    // Returns the contents of this object. The object must be at most `limit` bytes in size.
pub async fn contents(&self, limit: usize) -> Result<Box<[u8]>, Error> {
let size = self.get_size();
if size > limit as u64 {
bail!("Object too big ({} > {})", size, limit);
}
let mut buf = self.allocate_buffer(size as usize).await;
self.read(0u64, buf.as_mut()).await?;
Ok(buf.as_slice().into())
}
}
impl<S: HandleOwner> AssociatedObject for DataObjectHandle<S> {
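    // Keeps the cached `content_size` and the fsverity state in sync as mutations against this
    // object are applied: attribute-size mutations update the size cache, and the mutation that
    // writes the verified attribute finalizes the pending fsverity state.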
fn will_apply_mutation(&self, mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
match mutation {
Mutation::ObjectStore(ObjectStoreMutation {
item: ObjectItem { value: ObjectValue::Attribute { size }, .. },
..
}) => self.content_size.store(*size, atomic::Ordering::Relaxed),
Mutation::ObjectStore(ObjectStoreMutation {
item: ObjectItem { value: ObjectValue::VerifiedAttribute { .. }, .. },
..
}) => self.finalize_fsverity_state(),
_ => {}
}
}
}
impl<S: HandleOwner> ObjectHandle for DataObjectHandle<S> {
fn set_trace(&self, v: bool) {
self.handle.set_trace(v)
}
fn object_id(&self) -> u64 {
self.handle.object_id()
}
fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
self.handle.allocate_buffer(size)
}
fn block_size(&self) -> u64 {
self.handle.block_size()
}
}
#[async_trait]
impl<S: HandleOwner> ReadObjectHandle for DataObjectHandle<S> {
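    /// Reads from the data attribute under a read lock on the attribute. Reads are clamped to
    /// the content size (reads at or beyond EOF return 0 bytes), and for fsverity-enabled files
    /// the data is verified against the stored merkle tree before being returned.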
async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
let fs = self.store().filesystem();
let guard = fs
.lock_manager()
.read_lock(lock_keys![LockKey::object_attribute(
self.store().store_object_id,
self.object_id(),
self.attribute_id(),
)])
.await;
let size = self.get_size();
if offset >= size {
return Ok(0);
}
let length = min(buf.len() as u64, size - offset) as usize;
buf = buf.subslice_mut(0..length);
self.handle.read_unchecked(self.attribute_id(), offset, buf.reborrow(), &guard).await?;
if self.verified_file() {
self.verify_data(offset as usize, buf.as_slice()).await?;
}
Ok(length)
}
fn get_size(&self) -> u64 {
self.content_size.load(atomic::Ordering::Relaxed)
}
}
impl<S: HandleOwner> WriteObjectHandle for DataObjectHandle<S> {
async fn write_or_append(&self, offset: Option<u64>, buf: BufferRef<'_>) -> Result<u64, Error> {
let offset = offset.unwrap_or(self.get_size());
let mut transaction = self.new_transaction().await?;
self.txn_write(&mut transaction, offset, buf).await?;
let new_size = self.txn_get_size(&transaction);
transaction.commit().await?;
Ok(new_size)
}
async fn truncate(&self, size: u64) -> Result<(), Error> {
self.truncate_with_options(self.default_transaction_options(), size).await
}
async fn flush(&self) -> Result<(), Error> {
Ok(())
}
}
/// Like object_handle::Writer, but allows custom transaction options to be set, and makes every
/// write go directly to the handle in a transaction.
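///
/// A minimal usage sketch (illustrative; `complete` must be called, or any buffered data is
/// dropped with a warning):
///
/// ```ignore
/// let mut writer = DirectWriter::new(&handle, Options::default()).await;
/// writer.write_bytes(data).await?;
/// writer.complete().await?;
/// ```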
pub struct DirectWriter<'a, S: HandleOwner> {
handle: &'a DataObjectHandle<S>,
options: transaction::Options<'a>,
buffer: Buffer<'a>,
offset: u64,
buf_offset: usize,
}
const BUFFER_SIZE: usize = 1_048_576;
impl<S: HandleOwner> Drop for DirectWriter<'_, S> {
fn drop(&mut self) {
if self.buf_offset != 0 {
warn!("DirectWriter: dropping data, did you forget to call complete?");
}
}
}
impl<'a, S: HandleOwner> DirectWriter<'a, S> {
pub async fn new(
handle: &'a DataObjectHandle<S>,
options: transaction::Options<'a>,
) -> DirectWriter<'a, S> {
Self {
handle,
options,
buffer: handle.allocate_buffer(BUFFER_SIZE).await,
offset: 0,
buf_offset: 0,
}
}
async fn flush(&mut self) -> Result<(), Error> {
let mut transaction = self.handle.new_transaction_with_options(self.options).await?;
self.handle
.txn_write(&mut transaction, self.offset, self.buffer.subslice(..self.buf_offset))
.await?;
transaction.commit().await?;
self.offset += self.buf_offset as u64;
self.buf_offset = 0;
Ok(())
}
}
impl<'a, S: HandleOwner> WriteBytes for DirectWriter<'a, S> {
fn block_size(&self) -> u64 {
self.handle.block_size()
}
async fn write_bytes(&mut self, mut buf: &[u8]) -> Result<(), Error> {
while buf.len() > 0 {
let to_do = std::cmp::min(buf.len(), BUFFER_SIZE - self.buf_offset);
self.buffer
.subslice_mut(self.buf_offset..self.buf_offset + to_do)
.as_mut_slice()
.copy_from_slice(&buf[..to_do]);
self.buf_offset += to_do;
if self.buf_offset == BUFFER_SIZE {
self.flush().await?;
}
buf = &buf[to_do..];
}
Ok(())
}
async fn complete(&mut self) -> Result<(), Error> {
self.flush().await?;
Ok(())
}
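    // Skips `amount` bytes: if the skip fits within the current buffer it is zero-filled,
    // otherwise buffered data is flushed and the skipped region is simply left unwritten
    // (sparse).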
async fn skip(&mut self, amount: u64) -> Result<(), Error> {
if (BUFFER_SIZE - self.buf_offset) as u64 > amount {
self.buffer
.subslice_mut(self.buf_offset..self.buf_offset + amount as usize)
.as_mut_slice()
.fill(0);
self.buf_offset += amount as usize;
} else {
self.flush().await?;
self.offset += amount;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use {
crate::{
errors::FxfsError,
filesystem::{
FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem, SyncOptions,
},
fsck::{fsck, fsck_volume_with_options, fsck_with_options, FsckOptions},
object_handle::{ObjectHandle, ObjectProperties, ReadObjectHandle, WriteObjectHandle},
object_store::{
data_object_handle::WRITE_ATTR_BATCH_SIZE,
directory::replace_child,
object_record::{ObjectKey, ObjectValue, Timestamp},
transaction::{lock_keys, Mutation, Options},
volume::root_volume,
DataObjectHandle, Directory, HandleOptions, LockKey, ObjectStore, PosixAttributes,
FSVERITY_MERKLE_ATTRIBUTE_ID, TRANSACTION_MUTATION_THRESHOLD,
},
round::{round_down, round_up},
},
assert_matches::assert_matches,
fidl_fuchsia_io as fio, fuchsia_async as fasync,
futures::{
channel::oneshot::channel,
stream::{FuturesUnordered, StreamExt},
FutureExt,
},
fxfs_crypto::Crypt,
fxfs_insecure_crypto::InsecureCrypt,
mundane::hash::{Digest, Hasher, Sha256},
rand::Rng,
std::{
ops::Range,
sync::{Arc, Mutex},
time::Duration,
},
storage_device::{fake_device::FakeDevice, DeviceHolder},
};
const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
// Some tests (the preallocate_range ones) currently assume that the data only occupies a single
// device block.
const TEST_DATA_OFFSET: u64 = 5000;
const TEST_DATA: &[u8] = b"hello";
const TEST_OBJECT_SIZE: u64 = 5678;
const TEST_OBJECT_ALLOCATED_SIZE: u64 = 4096;
const TEST_OBJECT_NAME: &str = "foo";
async fn test_filesystem() -> OpenFxFilesystem {
let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
FxFilesystem::new_empty(device).await.expect("new_empty failed")
}
async fn test_filesystem_and_object_with_key(
crypt: Option<&dyn Crypt>,
write_object_test_data: bool,
) -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
let fs = test_filesystem().await;
let store = fs.root_store();
let object;
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
store.store_object_id(),
store.root_directory_object_id()
)],
Options::default(),
)
.await
.expect("new_transaction failed");
object = ObjectStore::create_object(
&store,
&mut transaction,
HandleOptions::default(),
crypt,
None,
)
.await
.expect("create_object failed");
let root_directory =
Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
root_directory
.add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
.await
.expect("add_child_file failed");
if write_object_test_data {
let align = TEST_DATA_OFFSET as usize % TEST_DEVICE_BLOCK_SIZE as usize;
let mut buf = object.allocate_buffer(align + TEST_DATA.len()).await;
buf.as_mut_slice()[align..].copy_from_slice(TEST_DATA);
object
.txn_write(&mut transaction, TEST_DATA_OFFSET, buf.subslice(align..))
.await
.expect("write failed");
}
transaction.commit().await.expect("commit failed");
object.truncate(TEST_OBJECT_SIZE).await.expect("truncate failed");
(fs, object)
}
async fn test_filesystem_and_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
test_filesystem_and_object_with_key(Some(&InsecureCrypt::new()), true).await
}
async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
{
test_filesystem_and_object_with_key(Some(&InsecureCrypt::new()), false).await
}
#[fuchsia::test]
async fn test_zero_buf_len_read() {
let (fs, object) = test_filesystem_and_object().await;
let mut buf = object.allocate_buffer(0).await;
assert_eq!(object.read(0u64, buf.as_mut()).await.expect("read failed"), 0);
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_beyond_eof_read() {
let (fs, object) = test_filesystem_and_object().await;
let offset = TEST_OBJECT_SIZE as usize - 2;
let align = offset % fs.block_size() as usize;
let len: usize = 2;
let mut buf = object.allocate_buffer(align + len + 1).await;
buf.as_mut_slice().fill(123u8);
assert_eq!(
object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
align + len
);
assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_beyond_eof_read_from() {
let (fs, object) = test_filesystem_and_object().await;
let offset = TEST_OBJECT_SIZE as usize - 2;
let align = offset % fs.block_size() as usize;
let len: usize = 2;
let mut buf = object.allocate_buffer(align + len + 1).await;
buf.as_mut_slice().fill(123u8);
assert_eq!(
object
.handle()
.read(0, (offset - align) as u64, buf.as_mut())
.await
.expect("read failed"),
align + len
);
assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_beyond_eof_read_unchecked() {
let (fs, object) = test_filesystem_and_object().await;
let offset = TEST_OBJECT_SIZE as usize - 2;
let align = offset % fs.block_size() as usize;
let len: usize = 2;
let mut buf = object.allocate_buffer(align + len + 1).await;
buf.as_mut_slice().fill(123u8);
let guard = fs
.lock_manager()
.read_lock(lock_keys![LockKey::object_attribute(
object.store().store_object_id,
object.object_id(),
0,
)])
.await;
object
.handle()
.read_unchecked(0, (offset - align) as u64, buf.as_mut(), &guard)
.await
.expect("read failed");
assert_eq!(&buf.as_slice()[align..], &vec![0u8; len + 1]);
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_read_sparse() {
let (fs, object) = test_filesystem_and_object().await;
// Deliberately read not right to eof.
let len = TEST_OBJECT_SIZE as usize - 1;
let mut buf = object.allocate_buffer(len).await;
buf.as_mut_slice().fill(123u8);
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
let mut expected = vec![0; len];
let offset = TEST_DATA_OFFSET as usize;
expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
assert_eq!(buf.as_slice()[..len], expected[..]);
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_read_after_writes_interspersed_with_flush() {
let (fs, object) = test_filesystem_and_object().await;
object.owner().flush().await.expect("flush failed");
        // Write more test data to the first block of the file.
let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
buf.as_mut_slice().copy_from_slice(TEST_DATA);
object.write_or_append(Some(0u64), buf.as_ref()).await.expect("write failed");
let len = TEST_OBJECT_SIZE as usize - 1;
let mut buf = object.allocate_buffer(len).await;
buf.as_mut_slice().fill(123u8);
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
let mut expected = vec![0u8; len];
let offset = TEST_DATA_OFFSET as usize;
expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
expected[..TEST_DATA.len()].copy_from_slice(TEST_DATA);
assert_eq!(buf.as_slice(), &expected);
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_read_after_truncate_and_extend() {
let (fs, object) = test_filesystem_and_object().await;
// Arrange for there to be <extent><deleted-extent><extent>.
let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
buf.as_mut_slice().copy_from_slice(TEST_DATA);
// This adds an extent at 0..512.
object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
// This deletes 512..1024.
object.truncate(3).await.expect("truncate failed");
let data = b"foo";
let offset = 1500u64;
let align = (offset % fs.block_size() as u64) as usize;
let mut buf = object.allocate_buffer(align + data.len()).await;
buf.as_mut_slice()[align..].copy_from_slice(data);
// This adds 1024..1536.
object.write_or_append(Some(1500), buf.subslice(align..)).await.expect("write failed");
const LEN1: usize = 1503;
let mut buf = object.allocate_buffer(LEN1).await;
buf.as_mut_slice().fill(123u8);
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN1);
let mut expected = [0; LEN1];
expected[..3].copy_from_slice(&TEST_DATA[..3]);
expected[1500..].copy_from_slice(b"foo");
assert_eq!(buf.as_slice(), &expected);
// Also test a read that ends midway through the deleted extent.
const LEN2: usize = 601;
let mut buf = object.allocate_buffer(LEN2).await;
buf.as_mut_slice().fill(123u8);
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN2);
assert_eq!(buf.as_slice(), &expected[..LEN2]);
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_read_whole_blocks_with_multiple_objects() {
let (fs, object) = test_filesystem_and_object().await;
let block_size = object.block_size() as usize;
let mut buffer = object.allocate_buffer(block_size).await;
buffer.as_mut_slice().fill(0xaf);
object.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
let store = object.owner();
let mut transaction = fs
.clone()
.new_transaction(lock_keys![], Options::default())
.await
.expect("new_transaction failed");
let object2 = ObjectStore::create_object(
&store,
&mut transaction,
HandleOptions::default(),
None,
None,
)
.await
.expect("create_object failed");
transaction.commit().await.expect("commit failed");
let mut ef_buffer = object.allocate_buffer(block_size).await;
ef_buffer.as_mut_slice().fill(0xef);
object2.write_or_append(Some(0), ef_buffer.as_ref()).await.expect("write failed");
let mut buffer = object.allocate_buffer(block_size).await;
buffer.as_mut_slice().fill(0xaf);
object
.write_or_append(Some(block_size as u64), buffer.as_ref())
.await
.expect("write failed");
object.truncate(3 * block_size as u64).await.expect("truncate failed");
object2
.write_or_append(Some(block_size as u64), ef_buffer.as_ref())
.await
.expect("write failed");
let mut buffer = object.allocate_buffer(4 * block_size).await;
buffer.as_mut_slice().fill(123);
assert_eq!(object.read(0, buffer.as_mut()).await.expect("read failed"), 3 * block_size);
assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xaf; 2 * block_size]);
assert_eq!(&buffer.as_slice()[2 * block_size..3 * block_size], &vec![0; block_size]);
assert_eq!(object2.read(0, buffer.as_mut()).await.expect("read failed"), 2 * block_size);
assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xef; 2 * block_size]);
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_alignment() {
let (fs, object) = test_filesystem_and_object().await;
struct AlignTest {
fill: u8,
object: DataObjectHandle<ObjectStore>,
mirror: Vec<u8>,
}
impl AlignTest {
async fn new(object: DataObjectHandle<ObjectStore>) -> Self {
let mirror = {
let mut buf = object.allocate_buffer(object.get_size() as usize).await;
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
buf.as_slice().to_vec()
};
Self { fill: 0, object, mirror }
}
// Fills |range| of self.object with a byte value (self.fill) and mirrors the same
// operation to an in-memory copy of the object.
// Each subsequent call bumps the value of fill.
// It is expected that the object and its mirror maintain identical content.
async fn test(&mut self, range: Range<u64>) {
let mut buf = self.object.allocate_buffer((range.end - range.start) as usize).await;
self.fill += 1;
buf.as_mut_slice().fill(self.fill);
self.object
.write_or_append(Some(range.start), buf.as_ref())
.await
.expect("write_or_append failed");
if range.end > self.mirror.len() as u64 {
self.mirror.resize(range.end as usize, 0);
}
self.mirror[range.start as usize..range.end as usize].fill(self.fill);
let mut buf = self.object.allocate_buffer(self.mirror.len() + 1).await;
assert_eq!(
self.object.read(0, buf.as_mut()).await.expect("read failed"),
self.mirror.len()
);
assert_eq!(&buf.as_slice()[..self.mirror.len()], self.mirror.as_slice());
}
}
let block_size = object.block_size() as u64;
let mut align = AlignTest::new(object).await;
// Fill the object to start with (with 1).
align.test(0..2 * block_size + 1).await;
// Unaligned head (fills with 2, overwrites that with 3).
align.test(1..block_size).await;
align.test(1..2 * block_size).await;
// Unaligned tail (fills with 4 and 5).
align.test(0..block_size - 1).await;
align.test(0..2 * block_size - 1).await;
// Both unaligned (fills with 6 and 7).
align.test(1..block_size - 1).await;
align.test(1..2 * block_size - 1).await;
fs.close().await.expect("Close failed");
}
async fn test_preallocate_common(fs: &FxFilesystem, object: DataObjectHandle<ObjectStore>) {
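// Preallocates a small range and then a larger overlapping range, checking that only the
// new blocks get allocated; writes into preallocated space must not allocate anything
// further.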
let allocator = fs.allocator();
let allocated_before = allocator.get_allocated_bytes();
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
object
.preallocate_range(&mut transaction, &mut (0..fs.block_size() as u64))
.await
.expect("preallocate_range failed");
transaction.commit().await.expect("commit failed");
assert!(object.get_size() < 1048576);
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
object
.preallocate_range(&mut transaction, &mut (0..1048576))
.await
.expect("preallocate_range failed");
transaction.commit().await.expect("commit failed");
assert_eq!(object.get_size(), 1048576);
// Check that it didn't reallocate the space for the existing extent
let allocated_after = allocator.get_allocated_bytes();
assert_eq!(allocated_after - allocated_before, 1048576 - fs.block_size() as u64);
let mut buf = object
.allocate_buffer(round_up(TEST_DATA_OFFSET, fs.block_size()).unwrap() as usize)
.await;
buf.as_mut_slice().fill(47);
object
.write_or_append(Some(0), buf.subslice(..TEST_DATA_OFFSET as usize))
.await
.expect("write failed");
buf.as_mut_slice().fill(95);
let offset = round_up(TEST_OBJECT_SIZE, fs.block_size()).unwrap();
object.overwrite(offset, buf.as_mut(), false).await.expect("write failed");
// Make sure there were no more allocations.
assert_eq!(allocator.get_allocated_bytes(), allocated_after);
// Read back the data and make sure it is what we expect.
let mut buf = object.allocate_buffer(1048576).await;
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
assert_eq!(&buf.as_slice()[..TEST_DATA_OFFSET as usize], &[47; TEST_DATA_OFFSET as usize]);
assert_eq!(
&buf.as_slice()[TEST_DATA_OFFSET as usize..TEST_DATA_OFFSET as usize + TEST_DATA.len()],
TEST_DATA
);
assert_eq!(&buf.as_slice()[offset as usize..offset as usize + 2048], &[95; 2048]);
}
#[fuchsia::test]
async fn test_preallocate_range() {
let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
test_preallocate_common(&fs, object).await;
fs.close().await.expect("Close failed");
}
// This is identical to the previous test except that we flush so that extents end up in
// different layers.
#[fuchsia::test]
async fn test_preallocate_succeeds_when_extents_are_in_different_layers() {
let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
object.owner().flush().await.expect("flush failed");
test_preallocate_common(&fs, object).await;
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_already_preallocated() {
let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
let allocator = fs.allocator();
let allocated_before = allocator.get_allocated_bytes();
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
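// Round TEST_DATA_OFFSET down to a block boundary; this block was already allocated when
// the test data was written.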
let offset = TEST_DATA_OFFSET - TEST_DATA_OFFSET % fs.block_size() as u64;
object
.preallocate_range(&mut transaction, &mut (offset..offset + fs.block_size() as u64))
.await
.expect("preallocate_range failed");
transaction.commit().await.expect("commit failed");
// Check that it didn't reallocate any new space.
assert_eq!(allocator.get_allocated_bytes(), allocated_before);
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_overwrite_when_preallocated_at_start_of_file() {
// The standard test data we put in the test object would cause an extent with checksums
// to be created, which overwrite() doesn't support. So we create an empty object instead.
let (fs, object) = test_filesystem_and_empty_object().await;
let object = ObjectStore::open_object(
object.owner(),
object.object_id(),
HandleOptions::default(),
None,
)
.await
.expect("open_object failed");
assert_eq!(fs.block_size(), 4096);
let mut write_buf = object.allocate_buffer(4096).await;
write_buf.as_mut_slice().fill(95);
// First try to overwrite without allowing allocations
// We expect this to fail, since nothing is allocated yet
object.overwrite(0, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
// Now preallocate some space (exactly one block)
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
object
.preallocate_range(&mut transaction, &mut (0..4096 as u64))
.await
.expect("preallocate_range failed");
transaction.commit().await.expect("commit failed");
// Now try the same overwrite command as before; it should work this time,
// even with allocations disabled...
{
let mut read_buf = object.allocate_buffer(4096).await;
object.read(0, read_buf.as_mut()).await.expect("read failed");
assert_eq!(&read_buf.as_slice(), &[0; 4096]);
}
object.overwrite(0, write_buf.as_mut(), false).await.expect("overwrite failed");
{
let mut read_buf = object.allocate_buffer(4096).await;
object.read(0, read_buf.as_mut()).await.expect("read failed");
assert_eq!(&read_buf.as_slice(), &[95; 4096]);
}
// Now try to overwrite at offset 4096. We expect this to fail, since we only preallocated
// one block earlier at offset 0
object.overwrite(4096, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
// We can't assert anything about the existing bytes, because they haven't been allocated
// yet and they could contain any values
object.overwrite(4096, write_buf.as_mut(), true).await.expect("overwrite failed");
{
let mut read_buf = object.allocate_buffer(4096).await;
object.read(4096, read_buf.as_mut()).await.expect("read failed");
assert_eq!(&read_buf.as_slice(), &[95; 4096]);
}
// Check that the overwrites haven't messed up the filesystem state
let fsck_options = FsckOptions {
fail_on_warning: true,
no_lock: true,
on_error: Box::new(|err| println!("fsck error: {:?}", err)),
..Default::default()
};
fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_overwrite_large_buffer_and_file_with_many_holes() {
// The standard test data we put in the test object would cause an extent with checksums
// to be created, which overwrite() doesn't support. So we create an empty object instead.
let (fs, object) = test_filesystem_and_empty_object().await;
let object = ObjectStore::open_object(
object.owner(),
object.object_id(),
HandleOptions::default(),
None,
)
.await
.expect("open_object failed");
assert_eq!(fs.block_size(), 4096);
assert_eq!(object.get_size(), TEST_OBJECT_SIZE);
// Let's create some non-holes
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
object
.preallocate_range(&mut transaction, &mut (4096..8192 as u64))
.await
.expect("preallocate_range failed");
object
.preallocate_range(&mut transaction, &mut (16384..32768 as u64))
.await
.expect("preallocate_range failed");
object
.preallocate_range(&mut transaction, &mut (65536..131072 as u64))
.await
.expect("preallocate_range failed");
object
.preallocate_range(&mut transaction, &mut (262144..524288 as u64))
.await
.expect("preallocate_range failed");
transaction.commit().await.expect("commit failed");
assert_eq!(object.get_size(), 524288);
let mut write_buf = object.allocate_buffer(4096).await;
write_buf.as_mut_slice().fill(95);
// We shouldn't be able to overwrite in the holes if new allocations aren't enabled
object.overwrite(0, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
object.overwrite(8192, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
object.overwrite(32768, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
object.overwrite(131072, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
// But we should be able to overwrite in the prealloc'd areas without needing allocations
{
let mut read_buf = object.allocate_buffer(4096).await;
object.read(4096, read_buf.as_mut()).await.expect("read failed");
assert_eq!(&read_buf.as_slice(), &[0; 4096]);
}
object.overwrite(4096, write_buf.as_mut(), false).await.expect("overwrite failed");
{
let mut read_buf = object.allocate_buffer(4096).await;
object.read(4096, read_buf.as_mut()).await.expect("read failed");
assert_eq!(&read_buf.as_slice(), &[95; 4096]);
}
{
let mut read_buf = object.allocate_buffer(4096).await;
object.read(16384, read_buf.as_mut()).await.expect("read failed");
assert_eq!(&read_buf.as_slice(), &[0; 4096]);
}
object.overwrite(16384, write_buf.as_mut(), false).await.expect("overwrite failed");
{
let mut read_buf = object.allocate_buffer(4096).await;
object.read(16384, read_buf.as_mut()).await.expect("read failed");
assert_eq!(&read_buf.as_slice(), &[95; 4096]);
}
{
let mut read_buf = object.allocate_buffer(4096).await;
object.read(65536, read_buf.as_mut()).await.expect("read failed");
assert_eq!(&read_buf.as_slice(), &[0; 4096]);
}
object.overwrite(65536, write_buf.as_mut(), false).await.expect("overwrite failed");
{
let mut read_buf = object.allocate_buffer(4096).await;
object.read(65536, read_buf.as_mut()).await.expect("read failed");
assert_eq!(&read_buf.as_slice(), &[95; 4096]);
}
{
let mut read_buf = object.allocate_buffer(4096).await;
object.read(262144, read_buf.as_mut()).await.expect("read failed");
assert_eq!(&read_buf.as_slice(), &[0; 4096]);
}
object.overwrite(262144, write_buf.as_mut(), false).await.expect("overwrite failed");
{
let mut read_buf = object.allocate_buffer(4096).await;
object.read(262144, read_buf.as_mut()).await.expect("read failed");
assert_eq!(&read_buf.as_slice(), &[95; 4096]);
}
// Now let's try a huge overwrite that spans many holes and non-holes
let mut huge_write_buf = object.allocate_buffer(524288).await;
huge_write_buf.as_mut_slice().fill(96);
// With allocations disabled, the big overwrite should fail...
object.overwrite(0, huge_write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
// ... but it should work when allocations are enabled
object.overwrite(0, huge_write_buf.as_mut(), true).await.expect("overwrite failed");
{
let mut read_buf = object.allocate_buffer(524288).await;
object.read(0, read_buf.as_mut()).await.expect("read failed");
assert_eq!(&read_buf.as_slice(), &[96; 524288]);
}
// Check that the overwrites haven't messed up the filesystem state
let fsck_options = FsckOptions {
fail_on_warning: true,
no_lock: true,
on_error: Box::new(|err| println!("fsck error: {:?}", err)),
..Default::default()
};
fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_overwrite_when_unallocated_at_start_of_file() {
// The standard test data we put in the test object would cause an extent with checksums
// to be created, which overwrite() doesn't support. So we create an empty object instead.
let (fs, object) = test_filesystem_and_empty_object().await;
let object = ObjectStore::open_object(
object.owner(),
object.object_id(),
HandleOptions::default(),
None,
)
.await
.expect("open_object failed");
assert_eq!(fs.block_size(), 4096);
let mut write_buf = object.allocate_buffer(4096).await;
write_buf.as_mut_slice().fill(95);
// First try to overwrite without allowing allocations
// We expect this to fail, since nothing is allocated yet
object.overwrite(0, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
// Now try the same overwrite command as before, but allow allocations
object.overwrite(0, write_buf.as_mut(), true).await.expect("overwrite failed");
{
let mut read_buf = object.allocate_buffer(4096).await;
object.read(0, read_buf.as_mut()).await.expect("read failed");
assert_eq!(&read_buf.as_slice(), &[95; 4096]);
}
// Now try to overwrite at the next block. This should fail if allocations are disabled
object.overwrite(4096, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
// ... but it should work if allocations are enabled
object.overwrite(4096, write_buf.as_mut(), true).await.expect("overwrite failed");
{
let mut read_buf = object.allocate_buffer(4096).await;
object.read(4096, read_buf.as_mut()).await.expect("read failed");
assert_eq!(&read_buf.as_slice(), &[95; 4096]);
}
// Check that the overwrites haven't messed up the filesystem state
let fsck_options = FsckOptions {
fail_on_warning: true,
no_lock: true,
on_error: Box::new(|err| println!("fsck error: {:?}", err)),
..Default::default()
};
fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_overwrite_can_extend_a_file() {
// The standard test data we put in the test object would cause an extent with checksums
// to be created, which overwrite() doesn't support. So we create an empty object instead.
let (fs, object) = test_filesystem_and_empty_object().await;
let object = ObjectStore::open_object(
object.owner(),
object.object_id(),
HandleOptions::default(),
None,
)
.await
.expect("open_object failed");
assert_eq!(fs.block_size(), 4096);
assert_eq!(object.get_size(), TEST_OBJECT_SIZE);
let mut write_buf = object.allocate_buffer(4096).await;
write_buf.as_mut_slice().fill(95);
// Let's try to fill up the last block, and increase the file size in doing so
let last_block_offset = round_down(TEST_OBJECT_SIZE, 4096 as u32);
// Expected to fail with allocations disabled
object
.overwrite(last_block_offset, write_buf.as_mut(), false)
.await
.expect_err("overwrite succeeded");
// ... but expected to succeed with allocations enabled
object
.overwrite(last_block_offset, write_buf.as_mut(), true)
.await
.expect("overwrite failed");
{
let mut read_buf = object.allocate_buffer(4096).await;
object.read(last_block_offset, read_buf.as_mut()).await.expect("read failed");
assert_eq!(&read_buf.as_slice(), &[95; 4096]);
}
assert_eq!(object.get_size(), 8192);
// Let's try to write at the next block, too
let next_block_offset = round_up(TEST_OBJECT_SIZE, 4096 as u32).unwrap();
// Expected to fail with allocations disabled
object
.overwrite(next_block_offset, write_buf.as_mut(), false)
.await
.expect_err("overwrite succeeded");
// ... but expected to succeed with allocations enabled
object
.overwrite(next_block_offset, write_buf.as_mut(), true)
.await
.expect("overwrite failed");
{
let mut read_buf = object.allocate_buffer(4096).await;
object.read(next_block_offset, read_buf.as_mut()).await.expect("read failed");
assert_eq!(&read_buf.as_slice(), &[95; 4096]);
}
assert_eq!(object.get_size(), 12288);
// Check that the overwrites haven't messed up the filesystem state
let fsck_options = FsckOptions {
fail_on_warning: true,
no_lock: true,
on_error: Box::new(|err| println!("fsck error: {:?}", err)),
..Default::default()
};
fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_enable_verity() {
let fs: OpenFxFilesystem = test_filesystem().await;
let mut transaction = fs
.clone()
.new_transaction(lock_keys![], Options::default())
.await
.expect("new_transaction failed");
let store = fs.root_store();
let object = Arc::new(
ObjectStore::create_object(
&store,
&mut transaction,
HandleOptions::default(),
None,
None,
)
.await
.expect("create_object failed"),
);
transaction.commit().await.unwrap();
object
.enable_verity(fio::VerificationOptions {
hash_algorithm: Some(fio::HashAlgorithm::Sha256),
salt: Some(vec![]),
..Default::default()
})
.await
.expect("set verified file metadata failed");
let handle =
ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
.await
.expect("open_object failed");
assert!(handle.verified_file());
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_enable_verity_large_file() {
// Need to make a large FakeDevice to create space for a 65 MiB file.
let device = DeviceHolder::new(FakeDevice::new(262144, TEST_DEVICE_BLOCK_SIZE));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let root_store = fs.root_store();
let mut transaction = fs
.clone()
.new_transaction(lock_keys![], Options::default())
.await
.expect("new_transaction failed");
let handle = ObjectStore::create_object(
&root_store,
&mut transaction,
HandleOptions::default(),
None,
None,
)
.await
.expect("failed to create object");
transaction.commit().await.expect("commit failed");
let mut offset = 0;
// Write a file big enough to trigger multiple transactions on enable_verity().
let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
buf.as_mut_slice().fill(1);
for _ in 0..130 {
handle.write_or_append(Some(offset), buf.as_ref()).await.expect("write failed");
offset += WRITE_ATTR_BATCH_SIZE as u64;
}
handle
.enable_verity(fio::VerificationOptions {
hash_algorithm: Some(fio::HashAlgorithm::Sha256),
salt: Some(vec![]),
..Default::default()
})
.await
.expect("set verified file metadata failed");
let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
offset = 0;
for _ in 0..130 {
handle.read(offset, buf.as_mut()).await.expect("read failed");
assert_eq!(buf.as_slice(), &[1; WRITE_ATTR_BATCH_SIZE]);
offset += WRITE_ATTR_BATCH_SIZE as u64;
}
fsck(fs.clone()).await.expect("fsck failed");
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_retry_enable_verity_on_reboot() {
let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let root_store = fs.root_store();
let mut transaction = fs
.clone()
.new_transaction(lock_keys![], Options::default())
.await
.expect("new_transaction failed");
let handle = ObjectStore::create_object(
&root_store,
&mut transaction,
HandleOptions::default(),
None,
None,
)
.await
.expect("failed to create object");
transaction.commit().await.expect("commit failed");
let object_id = {
let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
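// Add a graveyard entry for the merkle attribute so that, if the filesystem is
// interrupted before enable_verity() completes, the partially written attribute gets
// purged on replay.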
transaction.add(
root_store.store_object_id(),
Mutation::replace_or_insert_object(
ObjectKey::graveyard_attribute_entry(
root_store.graveyard_directory_object_id(),
handle.object_id(),
FSVERITY_MERKLE_ATTRIBUTE_ID,
),
ObjectValue::Some,
),
);
// This write should span three transactions. This test mimics the behavior when the
// last transaction gets interrupted by a filesystem.close().
handle
.handle()
.write_new_attr_in_batches(
&mut transaction,
FSVERITY_MERKLE_ATTRIBUTE_ID,
&vec![0; 2 * WRITE_ATTR_BATCH_SIZE],
WRITE_ATTR_BATCH_SIZE,
)
.await
.expect("failed to write merkle attribute");
handle.object_id()
// Drop the transaction to simulate interrupting the merkle tree creation as well as to
// release the transaction locks.
};
fs.close().await.expect("failed to close filesystem");
let device = fs.take_device().await;
device.reopen(false);
let fs =
FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
fsck(fs.clone()).await.expect("fsck failed");
fs.close().await.expect("failed to close filesystem");
let device = fs.take_device().await;
device.reopen(false);
// On open, the filesystem will call initial_reap which will call queue_tombstone().
let fs = FxFilesystem::open(device).await.expect("open failed");
let root_store = fs.root_store();
let handle =
ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
.await
.expect("open_object failed");
handle
.enable_verity(fio::VerificationOptions {
hash_algorithm: Some(fio::HashAlgorithm::Sha256),
salt: Some(vec![]),
..Default::default()
})
.await
.expect("set verified file metadata failed");
// `flush` will ensure that initial reap fully processes all the graveyard entries. This
// isn't strictly necessary for the test to pass (the graveyard marker was already
// processed during `enable_verity`), but it does help catch bugs, such as the attribute
// graveyard entry not being removed upon processing.
fs.graveyard().flush().await;
assert_eq!(
handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
Some(vec![0; <Sha256 as Hasher>::Digest::DIGEST_LEN].into())
);
fsck(fs.clone()).await.expect("fsck failed");
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_verify_data_corrupt_file() {
let fs: OpenFxFilesystem = test_filesystem().await;
let mut transaction = fs
.clone()
.new_transaction(lock_keys![], Options::default())
.await
.expect("new_transaction failed");
let store = fs.root_store();
let object = Arc::new(
ObjectStore::create_object(
&store,
&mut transaction,
HandleOptions::default(),
None,
None,
)
.await
.expect("create_object failed"),
);
transaction.commit().await.unwrap();
let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
buf.as_mut_slice().fill(123);
object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
object
.enable_verity(fio::VerificationOptions {
hash_algorithm: Some(fio::HashAlgorithm::Sha256),
salt: Some(vec![]),
..Default::default()
})
.await
.expect("set verified file metadata failed");
// Change file contents and ensure verification fails
buf.as_mut_slice().fill(234);
object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
object.read(0, buf.as_mut()).await.expect_err("verification during read should fail");
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_extend() {
let fs = test_filesystem().await;
let handle;
let mut transaction = fs
.clone()
.new_transaction(lock_keys![], Options::default())
.await
.expect("new_transaction failed");
let store = fs.root_store();
handle = ObjectStore::create_object(
&store,
&mut transaction,
HandleOptions::default(),
None,
None,
)
.await
.expect("create_object failed");
// As of writing, an empty filesystem has two 512kiB superblock extents and a little over
// 256kiB of additional allocations (journal, etc), so we use a 'magic' starting offset of
// 2MiB here.
const START_OFFSET: u64 = 2048 * 1024;
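// Note that the range passed to extend() is a device range, which is why START_OFFSET
// must steer clear of the existing allocations mentioned above.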
handle
.extend(&mut transaction, START_OFFSET..START_OFFSET + 5 * fs.block_size() as u64)
.await
.expect("extend failed");
transaction.commit().await.expect("commit failed");
let mut buf = handle.allocate_buffer(5 * fs.block_size() as usize).await;
buf.as_mut_slice().fill(123);
handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
buf.as_mut_slice().fill(67);
handle.read(0, buf.as_mut()).await.expect("read failed");
assert_eq!(buf.as_slice(), &vec![123; 5 * fs.block_size() as usize]);
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_truncate_deallocates_old_extents() {
let (fs, object) = test_filesystem_and_object().await;
let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
buf.as_mut_slice().fill(0xaa);
object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
let allocator = fs.allocator();
let allocated_before = allocator.get_allocated_bytes();
object.truncate(fs.block_size() as u64).await.expect("truncate failed");
let allocated_after = allocator.get_allocated_bytes();
assert!(
allocated_after < allocated_before,
"before = {} after = {}",
allocated_before,
allocated_after
);
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_truncate_zeroes_tail_block() {
let (fs, object) = test_filesystem_and_object().await;
WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + 3).await.expect("truncate failed");
WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + TEST_DATA.len() as u64)
.await
.expect("truncate failed");
let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
let offset = (TEST_DATA_OFFSET % fs.block_size()) as usize;
object.read(TEST_DATA_OFFSET - offset as u64, buf.as_mut()).await.expect("read failed");
let mut expected = TEST_DATA.to_vec();
expected[3..].fill(0);
assert_eq!(&buf.as_slice()[offset..offset + expected.len()], &expected);
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_trim() {
// Format a new filesystem.
let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
let block_size = fs.block_size();
root_volume(fs.clone())
.await
.expect("root_volume failed")
.new_volume("test", None)
.await
.expect("volume failed");
fs.close().await.expect("close failed");
let device = fs.take_device().await;
device.reopen(false);
// To test trim, we open the filesystem and set up a post commit hook that runs after every
// transaction. When the hook triggers, we can fsck the volume, take a snapshot of the
// device and check that it gets replayed correctly on the snapshot. We can check that the
// graveyard trims the file as expected.
#[derive(Default)]
struct Context {
store: Option<Arc<ObjectStore>>,
object_id: Option<u64>,
}
let shared_context = Arc::new(Mutex::new(Context::default()));
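// Make the object big enough that truncating it requires more than
// TRANSACTION_MUTATION_THRESHOLD mutations and therefore spans multiple transactions.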
let object_size = (TRANSACTION_MUTATION_THRESHOLD as u64 + 10) * 2 * block_size;
// Wait for an object to get tombstoned by the graveyard.
async fn expect_tombstoned(store: &Arc<ObjectStore>, object_id: u64) {
loop {
if let Err(e) =
ObjectStore::open_object(store, object_id, HandleOptions::default(), None).await
{
assert!(FxfsError::NotFound.matches(&e));
break;
}
// The graveyard should eventually tombstone the object.
fasync::Timer::new(std::time::Duration::from_millis(100)).await;
}
}
// Checks to see if the object needs to be trimmed.
async fn needs_trim(store: &Arc<ObjectStore>) -> Option<DataObjectHandle<ObjectStore>> {
let root_directory = Directory::open(store, store.root_directory_object_id())
.await
.expect("open failed");
let oid = root_directory.lookup("foo").await.expect("lookup failed");
if let Some((oid, _)) = oid {
let object = ObjectStore::open_object(store, oid, HandleOptions::default(), None)
.await
.expect("open_object failed");
let props = object.get_properties().await.expect("get_properties failed");
if props.allocated_size > 0 && props.data_attribute_size == 0 {
Some(object)
} else {
None
}
} else {
None
}
}
let shared_context_clone = shared_context.clone();
let post_commit = move || {
let store = shared_context_clone.lock().unwrap().store.as_ref().cloned().unwrap();
let shared_context = shared_context_clone.clone();
async move {
// First run fsck on the current filesystem.
let options = FsckOptions {
fail_on_warning: true,
no_lock: true,
on_error: Box::new(|err| println!("fsck error: {:?}", err)),
..Default::default()
};
let fs = store.filesystem();
fsck_with_options(fs.clone(), &options).await.expect("fsck_with_options failed");
fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
.await
.expect("fsck_volume_with_options failed");
// Now check that we can replay this correctly.
fs.sync(SyncOptions { flush_device: true, ..Default::default() })
.await
.expect("sync failed");
let device = fs.device().snapshot().expect("snapshot failed");
let object_id = shared_context.lock().unwrap().object_id.clone();
let fs2 = FxFilesystemBuilder::new()
.skip_initial_reap(object_id.is_none())
.open(device)
.await
.expect("open failed");
// If the object was moved into the graveyard, it should get tombstoned; otherwise,
// check whether the "foo" file still needs to be trimmed.
let root_vol = root_volume(fs2.clone()).await.expect("root_volume failed");
let store = root_vol.volume("test", None).await.expect("volume failed");
if let Some(oid) = object_id {
// For the second pass, the object should get tombstoned.
expect_tombstoned(&store, oid).await;
} else if let Some(object) = needs_trim(&store).await {
// Extend the file and make sure that it is correctly trimmed.
object.truncate(object_size).await.expect("truncate failed");
let mut buf = object.allocate_buffer(block_size as usize).await;
object
.read(object_size - block_size * 2, buf.as_mut())
.await
.expect("read failed");
assert_eq!(buf.as_slice(), &vec![0; block_size as usize]);
// Remount, this time with the graveyard performing an initial reap and the
// object should get trimmed.
let fs = FxFilesystem::open(fs.device().snapshot().expect("snapshot failed"))
.await
.expect("open failed");
let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
let store = root_vol.volume("test", None).await.expect("volume failed");
while needs_trim(&store).await.is_some() {
// The object has been truncated, but still has some data allocated to
// it. The graveyard should trim the object eventually.
fasync::Timer::new(std::time::Duration::from_millis(100)).await;
}
// Run fsck.
fsck_with_options(fs.clone(), &options)
.await
.expect("fsck_with_options failed");
fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
.await
.expect("fsck_volume_with_options failed");
fs.close().await.expect("close failed");
}
// Run fsck on fs2.
fsck_with_options(fs2.clone(), &options).await.expect("fsck_with_options failed");
fsck_volume_with_options(fs2.as_ref(), &options, store.store_object_id(), None)
.await
.expect("fsck_volume_with_options failed");
fs2.close().await.expect("close failed");
}
.boxed()
};
let fs = FxFilesystemBuilder::new()
.post_commit_hook(post_commit)
.open(device)
.await
.expect("open failed");
let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
let store = root_vol.volume("test", None).await.expect("volume failed");
shared_context.lock().unwrap().store = Some(store.clone());
let root_directory =
Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
let object;
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
store.store_object_id(),
store.root_directory_object_id()
)],
Options::default(),
)
.await
.expect("new_transaction failed");
object = root_directory
.create_child_file(&mut transaction, "foo", None)
.await
.expect("create_object failed");
transaction.commit().await.expect("commit failed");
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
Options::default(),
)
.await
.expect("new_transaction failed");
// Two passes: first with a regular object, and then with that object moved into the
// graveyard.
let mut pass = 0;
loop {
// Create enough extents in it such that when we truncate the object it will require
// more than one transaction.
let mut buf = object.allocate_buffer(5).await;
buf.as_mut_slice().fill(1);
// Write every other block.
for offset in (0..object_size).into_iter().step_by(2 * block_size as usize) {
object
.txn_write(&mut transaction, offset, buf.as_ref())
.await
.expect("write failed");
}
transaction.commit().await.expect("commit failed");
// This should take more than one transaction.
WriteObjectHandle::truncate(&object, 0).await.expect("truncate failed");
if pass == 1 {
break;
}
// Store the object ID so that we can make sure the object is always tombstoned
// after remount (see above).
shared_context.lock().unwrap().object_id = Some(object.object_id());
transaction = fs
.clone()
.new_transaction(
lock_keys![
LockKey::object(store.store_object_id(), store.root_directory_object_id()),
LockKey::object(store.store_object_id(), object.object_id()),
],
Options::default(),
)
.await
.expect("new_transaction failed");
// Move the object into the graveyard.
replace_child(&mut transaction, None, (&root_directory, "foo"))
.await
.expect("replace_child failed");
store.add_to_graveyard(&mut transaction, object.object_id());
pass += 1;
}
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_adjust_refs() {
let (fs, object) = test_filesystem_and_object().await;
let store = object.owner();
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
Options::default(),
)
.await
.expect("new_transaction failed");
assert_eq!(
store
.adjust_refs(&mut transaction, object.object_id(), 1)
.await
.expect("adjust_refs failed"),
false
);
transaction.commit().await.expect("commit failed");
let allocator = fs.allocator();
let allocated_before = allocator.get_allocated_bytes();
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
Options::default(),
)
.await
.expect("new_transaction failed");
assert_eq!(
store
.adjust_refs(&mut transaction, object.object_id(), -2)
.await
.expect("adjust_refs failed"),
true
);
transaction.commit().await.expect("commit failed");
assert_eq!(allocator.get_allocated_bytes(), allocated_before);
store
.tombstone_object(
object.object_id(),
Options { borrow_metadata_space: true, ..Default::default() },
)
.await
.expect("purge failed");
assert_eq!(allocated_before - allocator.get_allocated_bytes(), fs.block_size() as u64);
// We need to remove the directory entry, too, otherwise fsck will complain
{
let mut transaction = fs
.clone()
.new_transaction(
lock_keys![LockKey::object(
store.store_object_id(),
store.root_directory_object_id()
)],
Options::default(),
)
.await
.expect("new_transaction failed");
let root_directory = Directory::open(&store, store.root_directory_object_id())
.await
.expect("open failed");
transaction.add(
store.store_object_id(),
Mutation::replace_or_insert_object(
ObjectKey::child(root_directory.object_id(), TEST_OBJECT_NAME),
ObjectValue::None,
),
);
transaction.commit().await.expect("commit failed");
}
fsck_with_options(
fs.clone(),
&FsckOptions {
fail_on_warning: true,
on_error: Box::new(|err| println!("fsck error: {:?}", err)),
..Default::default()
},
)
.await
.expect("fsck_with_options failed");
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_locks() {
let (fs, object) = test_filesystem_and_object().await;
let (send1, recv1) = channel();
let (send2, recv2) = channel();
let (send3, recv3) = channel();
let done = Mutex::new(false);
let mut futures = FuturesUnordered::new();
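// Three futures coordinate via the channels above: the first holds a transaction open
// while writing, the second checks that reads do not block on that transaction, and the
// third checks that a second transaction blocks until the first commits.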
futures.push(
async {
let mut t = object.new_transaction().await.expect("new_transaction failed");
send1.send(()).unwrap(); // Tell the next future to continue.
send3.send(()).unwrap(); // Tell the last future to continue.
recv2.await.unwrap();
let mut buf = object.allocate_buffer(5).await;
buf.as_mut_slice().copy_from_slice(b"hello");
object.txn_write(&mut t, 0, buf.as_ref()).await.expect("write failed");
// This is a halting problem so all we can do is sleep.
fasync::Timer::new(Duration::from_millis(100)).await;
assert!(!*done.lock().unwrap());
t.commit().await.expect("commit failed");
}
.boxed(),
);
futures.push(
async {
recv1.await.unwrap();
// Reads should not block.
let offset = TEST_DATA_OFFSET as usize;
let align = offset % fs.block_size() as usize;
let len = TEST_DATA.len();
let mut buf = object.allocate_buffer(align + len).await;
assert_eq!(
object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
align + TEST_DATA.len()
);
assert_eq!(&buf.as_slice()[align..], TEST_DATA);
// Tell the first future to continue.
send2.send(()).unwrap();
}
.boxed(),
);
futures.push(
async {
// This should block until the first future has completed.
recv3.await.unwrap();
let _t = object.new_transaction().await.expect("new_transaction failed");
let mut buf = object.allocate_buffer(5).await;
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 5);
assert_eq!(buf.as_slice(), b"hello");
}
.boxed(),
);
while let Some(()) = futures.next().await {}
fs.close().await.expect("Close failed");
}
#[fuchsia::test(threads = 10)]
async fn test_racy_reads() {
let fs = test_filesystem().await;
let object;
let mut transaction = fs
.clone()
.new_transaction(lock_keys![], Options::default())
.await
.expect("new_transaction failed");
let store = fs.root_store();
object = Arc::new(
ObjectStore::create_object(
&store,
&mut transaction,
HandleOptions::default(),
None,
None,
)
.await
.expect("create_object failed"),
);
transaction.commit().await.expect("commit failed");
for _ in 0..100 {
let cloned_object = object.clone();
let writer = fasync::Task::spawn(async move {
let mut buf = cloned_object.allocate_buffer(10).await;
buf.as_mut_slice().fill(123);
cloned_object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
});
let cloned_object = object.clone();
let reader = fasync::Task::spawn(async move {
let wait_time = rand::thread_rng().gen_range(0..5);
fasync::Timer::new(Duration::from_millis(wait_time)).await;
let mut buf = cloned_object.allocate_buffer(10).await;
buf.as_mut_slice().fill(23);
let amount = cloned_object.read(0, buf.as_mut()).await.expect("read failed");
// If we succeed in reading data, it must include the write; i.e. if we see the size
// change, we should see the data too. For this to hold, the read path must lock around
// reading the size so that it also observes the extents changed in the same
// transaction.
if amount != 0 {
assert_eq!(amount, 10);
assert_eq!(buf.as_slice(), &[123; 10]);
}
});
writer.await;
reader.await;
object.truncate(0).await.expect("truncate failed");
}
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_allocated_size() {
let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
let before = object.get_properties().await.expect("get_properties failed").allocated_size;
let mut buf = object.allocate_buffer(5).await;
buf.as_mut_slice().copy_from_slice(b"hello");
object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
let after = object.get_properties().await.expect("get_properties failed").allocated_size;
assert_eq!(after, before + fs.block_size() as u64);
// Do the same write again and there should be no change.
object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
assert_eq!(
object.get_properties().await.expect("get_properties failed").allocated_size,
after
);
// extend...
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
let offset = 1000 * fs.block_size() as u64;
let before = after;
object
.extend(&mut transaction, offset..offset + fs.block_size() as u64)
.await
.expect("extend failed");
transaction.commit().await.expect("commit failed");
let after = object.get_properties().await.expect("get_properties failed").allocated_size;
assert_eq!(after, before + fs.block_size() as u64);
// truncate...
let before = after;
let size = object.get_size();
object.truncate(size - fs.block_size() as u64).await.expect("truncate failed");
let after = object.get_properties().await.expect("get_properties failed").allocated_size;
assert_eq!(after, before - fs.block_size() as u64);
// preallocate_range...
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
let before = after;
let mut file_range = offset..offset + fs.block_size() as u64;
object.preallocate_range(&mut transaction, &mut file_range).await.expect("preallocate_range failed");
transaction.commit().await.expect("commit failed");
let after = object.get_properties().await.expect("get_properties failed").allocated_size;
assert_eq!(after, before + fs.block_size() as u64);
fs.close().await.expect("Close failed");
}
#[fuchsia::test(threads = 10)]
async fn test_zero() {
let (fs, object) = test_filesystem_and_object().await;
let expected_size = object.get_size();
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
object.zero(&mut transaction, 0..fs.block_size() as u64 * 10).await.expect("zero failed");
transaction.commit().await.expect("commit failed");
assert_eq!(object.get_size(), expected_size);
let mut buf = object.allocate_buffer(fs.block_size() as usize * 10).await;
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed") as u64, expected_size);
assert_eq!(
&buf.as_slice()[0..expected_size as usize],
vec![0u8; expected_size as usize].as_slice()
);
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_properties() {
let (fs, object) = test_filesystem_and_object().await;
const CRTIME: Timestamp = Timestamp::from_nanos(1234);
const MTIME: Timestamp = Timestamp::from_nanos(5678);
const CTIME: Timestamp = Timestamp::from_nanos(8765);
// ObjectProperties can be updated through `update_attributes`.
// `get_properties` should reflect the latest changes.
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
object
.update_attributes(
&mut transaction,
Some(&fio::MutableNodeAttributes {
creation_time: Some(CRTIME.as_nanos()),
modification_time: Some(MTIME.as_nanos()),
mode: Some(111),
gid: Some(222),
..Default::default()
}),
None,
)
.await
.expect("update_attributes failed");
const MTIME_NEW: Timestamp = Timestamp::from_nanos(12345678);
object
.update_attributes(
&mut transaction,
Some(&fio::MutableNodeAttributes {
modification_time: Some(MTIME_NEW.as_nanos()),
gid: Some(333),
rdev: Some(444),
..Default::default()
}),
Some(CTIME),
)
.await
.expect("update_timestamps failed");
transaction.commit().await.expect("commit failed");
let properties = object.get_properties().await.expect("get_properties failed");
assert_matches!(
properties,
ObjectProperties {
refs: 1u64,
allocated_size: TEST_OBJECT_ALLOCATED_SIZE,
data_attribute_size: TEST_OBJECT_SIZE,
creation_time: CRTIME,
modification_time: MTIME_NEW,
posix_attributes: Some(PosixAttributes { mode: 111, gid: 333, rdev: 444, .. }),
change_time: CTIME,
..
}
);
fs.close().await.expect("Close failed");
}
#[fuchsia::test]
async fn test_is_allocated() {
let (fs, object) = test_filesystem_and_object().await;
// `test_filesystem_and_object()` wrote the buffer `TEST_DATA` to the device at offset
// `TEST_DATA_OFFSET`; neither the offset nor the length is necessarily block-aligned,
// so align them here.
let aligned_offset = round_down(TEST_DATA_OFFSET, fs.block_size());
let aligned_length = round_up(TEST_DATA.len() as u64, fs.block_size()).unwrap();
// Check for the case where we have the following extent layout
// [ unallocated ][ `TEST_DATA` ]
// The extents before `aligned_offset` should not be allocated
let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
assert_eq!(count, aligned_offset);
assert_eq!(allocated, false);
let (allocated, count) =
object.is_allocated(aligned_offset).await.expect("is_allocated failed");
assert_eq!(count, aligned_length);
assert_eq!(allocated, true);
// Check for the case where we query out of range
let end = aligned_offset + aligned_length;
object
.is_allocated(end)
.await
.expect_err("is_allocated should have returned ERR_OUT_OF_RANGE");
// Check for the case where we query for allocation from the end of an allocated range
// through to the end of the object
let size = 50 * fs.block_size() as u64;
object.truncate(size).await.expect("truncate failed");
let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
assert_eq!(count, size - end);
assert_eq!(allocated, false);
// Check for the case where we have the following extent layout
// [ unallocated ][ `buf` ][ `buf` ]
let buf_length = 5 * fs.block_size();
let mut buf = object.allocate_buffer(buf_length as usize).await;
buf.as_mut_slice().fill(123);
let new_offset = end + 20 * fs.block_size() as u64;
object.write_or_append(Some(new_offset), buf.as_ref()).await.expect("write failed");
object
.write_or_append(Some(new_offset + buf_length), buf.as_ref())
.await
.expect("write failed");
let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
assert_eq!(count, new_offset - end);
assert_eq!(allocated, false);
let (allocated, count) =
object.is_allocated(new_offset).await.expect("is_allocated failed");
assert_eq!(count, 2 * buf_length);
assert_eq!(allocated, true);
// Check the case where we query from the middle of an extent
let (allocated, count) = object
.is_allocated(new_offset + 4 * fs.block_size())
.await
.expect("is_allocated failed");
assert_eq!(count, 2 * buf_length - 4 * fs.block_size());
assert_eq!(allocated, true);
// Now, write buffer to a location already written to.
// Check for the case where we have the following extent layout
// [ unallocated ][ `other_buf` ][ (part of) `buf` ][ `buf` ]
let other_buf_length = 3 * fs.block_size();
let mut other_buf = object.allocate_buffer(other_buf_length as usize).await;
other_buf.as_mut_slice().fill(231);
object.write_or_append(Some(new_offset), other_buf.as_ref()).await.expect("write failed");
// We still expect `is_allocated(..)` to report that there are 2 * `buf_length` bytes
// allocated from `new_offset`
let (allocated, count) =
object.is_allocated(new_offset).await.expect("is_allocated failed");
assert_eq!(count, 2 * buf_length);
assert_eq!(allocated, true);
// Check for the case where we have the following extent layout
// [ unallocated ][ deleted ][ unallocated ][ deleted ][ allocated ]
// Mark TEST_DATA as deleted
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
object
.zero(&mut transaction, aligned_offset..aligned_offset + aligned_length)
.await
.expect("zero failed");
// Mark `other_buf` (and the overlapped start of `buf`) as deleted
object
.zero(&mut transaction, new_offset..new_offset + buf_length)
.await
.expect("zero failed");
transaction.commit().await.expect("commit transaction failed");
let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
assert_eq!(count, new_offset + buf_length);
assert_eq!(allocated, false);
let (allocated, count) =
object.is_allocated(new_offset + buf_length).await.expect("is_allocated failed");
assert_eq!(count, buf_length);
assert_eq!(allocated, true);
let new_end = new_offset + buf_length + count;
// Check for the case where there are objects with different keys.
// Case that we're checking for:
// [ unallocated ][ extent (object with different key) ][ unallocated ]
let store = object.owner();
let mut transaction = fs
.clone()
.new_transaction(lock_keys![], Options::default())
.await
.expect("new_transaction failed");
let object2 = ObjectStore::create_object(
&store,
&mut transaction,
HandleOptions::default(),
None,
None,
)
.await
.expect("create_object failed");
transaction.commit().await.expect("commit failed");
object2
.write_or_append(Some(new_end + fs.block_size()), buf.as_ref())
.await
.expect("write failed");
// Expecting that the extent with a different key is treated like unallocated extent
let (allocated, count) = object.is_allocated(new_end).await.expect("is_allocated failed");
assert_eq!(count, size - new_end);
assert_eq!(allocated, false);
fs.close().await.expect("close failed");
}
#[fuchsia::test(threads = 10)]
async fn test_read_write_attr() {
let (_fs, object) = test_filesystem_and_object().await;
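// Write a 16KiB attribute under an arbitrary attribute id and read it back; an id that
// was never written should read back as None.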
let data = [0xffu8; 16_384];
object.write_attr(20, &data).await.expect("write_attr failed");
let rdata =
object.read_attr(20).await.expect("read_attr failed").expect("no attribute data found");
assert_eq!(&data[..], &rdata[..]);
assert_eq!(object.read_attr(21).await.expect("read_attr failed"), None);
}
#[fuchsia::test(threads = 10)]
async fn test_allocate_basic() {
let (fs, object) = test_filesystem_and_empty_object().await;
let block_size = fs.block_size();
let file_size = block_size * 10;
object.truncate(file_size).await.unwrap();
let small_buf_size = 1024;
let large_buf_aligned_size = block_size as usize * 2;
let large_buf_size = block_size as usize * 2 + 1024;
let mut small_buf = object.allocate_buffer(small_buf_size).await;
let mut large_buf_aligned = object.allocate_buffer(large_buf_aligned_size).await;
let mut large_buf = object.allocate_buffer(large_buf_size).await;
assert_eq!(object.read(0, small_buf.as_mut()).await.unwrap(), small_buf_size);
assert_eq!(small_buf.as_slice(), &vec![0; small_buf_size]);
assert_eq!(object.read(0, large_buf.as_mut()).await.unwrap(), large_buf_size);
assert_eq!(large_buf.as_slice(), &vec![0; large_buf_size]);
assert_eq!(
object.read(0, large_buf_aligned.as_mut()).await.unwrap(),
large_buf_aligned_size
);
assert_eq!(large_buf_aligned.as_slice(), &vec![0; large_buf_aligned_size]);
// Allocation succeeds, and without any writes to the location it shows up as zero.
object.allocate(block_size..block_size * 3).await.unwrap();
// Test starting before, inside, and after the allocated section with every sized buffer.
for (buf_index, buf) in [small_buf, large_buf, large_buf_aligned].iter_mut().enumerate() {
for offset in 0..4 {
assert_eq!(
object.read(block_size * offset, buf.as_mut()).await.unwrap(),
buf.len(),
"buf_index: {}, read offset: {}",
buf_index,
offset,
);
assert_eq!(
buf.as_slice(),
&vec![0; buf.len()],
"buf_index: {}, read offset: {}",
buf_index,
offset,
);
}
}
fs.close().await.expect("close failed");
}
#[fuchsia::test(threads = 10)]
async fn test_allocate_extends_file() {
const BUF_SIZE: usize = 1024;
let (fs, object) = test_filesystem_and_empty_object().await;
let mut buf = object.allocate_buffer(BUF_SIZE).await;
let block_size = fs.block_size();
assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
assert!(TEST_OBJECT_SIZE < block_size * 4);
// Allocation succeeds, and without any writes to the location it shows up as zero.
object.allocate(0..block_size * 4).await.unwrap();
assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
assert_eq!(object.read(block_size, buf.as_mut()).await.unwrap(), buf.len());
assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
assert_eq!(object.read(block_size * 3, buf.as_mut()).await.unwrap(), buf.len());
assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
fs.close().await.expect("close failed");
}
#[fuchsia::test(threads = 10)]
async fn test_allocate_past_end() {
const BUF_SIZE: usize = 1024;
let (fs, object) = test_filesystem_and_empty_object().await;
let mut buf = object.allocate_buffer(BUF_SIZE).await;
let block_size = fs.block_size();
assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
assert!(TEST_OBJECT_SIZE < block_size * 4);
// Allocation succeeds, and without any writes to the location it shows up as zero.
object.allocate(block_size * 4..block_size * 6).await.unwrap();
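// The allocated range starts past the old end of the file, so the file gets extended
// and the gap in between reads back as sparse zeroes.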
assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
assert_eq!(object.read(block_size * 4, buf.as_mut()).await.unwrap(), buf.len());
assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
assert_eq!(object.read(block_size * 5, buf.as_mut()).await.unwrap(), buf.len());
assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
fs.close().await.expect("close failed");
}
#[fuchsia::test(threads = 10)]
async fn test_allocate_read_attr() {
let (fs, object) = test_filesystem_and_empty_object().await;
let block_size = fs.block_size();
let file_size = block_size * 4;
object.truncate(file_size).await.unwrap();
let content = object
.read_attr(object.attribute_id())
.await
.expect("failed to read attr")
.expect("attr returned none");
assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
object.allocate(block_size..block_size * 3).await.unwrap();
let content = object
.read_attr(object.attribute_id())
.await
.expect("failed to read attr")
.expect("attr returned none");
assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
fs.close().await.expect("close failed");
}
#[fuchsia::test(threads = 10)]
async fn test_allocate_existing_data() {
struct Case {
written_ranges: Vec<Range<usize>>,
allocate_range: Range<u64>,
}
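// Ranges are in units of blocks; they get scaled by the block size when used below.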
let cases = [
Case { written_ranges: vec![4..7], allocate_range: 4..7 },
Case { written_ranges: vec![4..7], allocate_range: 3..8 },
Case { written_ranges: vec![4..7], allocate_range: 5..6 },
Case { written_ranges: vec![4..7], allocate_range: 5..8 },
Case { written_ranges: vec![4..7], allocate_range: 3..5 },
Case { written_ranges: vec![0..1, 2..3, 4..5, 6..7, 8..9], allocate_range: 0..10 },
Case { written_ranges: vec![0..2, 4..6, 7..10], allocate_range: 1..8 },
];
for case in cases {
let (fs, object) = test_filesystem_and_empty_object().await;
let block_size = fs.block_size();
let file_size = block_size * 10;
object.truncate(file_size).await.unwrap();
for write in &case.written_ranges {
let write_len = (write.end - write.start) * block_size as usize;
let mut write_buf = object.allocate_buffer(write_len).await;
write_buf.as_mut_slice().fill(0xff);
assert_eq!(
object
.write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
.await
.unwrap(),
file_size
);
}
let mut expected_buf = object.allocate_buffer(file_size as usize).await;
assert_eq!(object.read(0, expected_buf.as_mut()).await.unwrap(), expected_buf.len());
object
.allocate(
case.allocate_range.start * block_size..case.allocate_range.end * block_size,
)
.await
.unwrap();
let mut read_buf = object.allocate_buffer(file_size as usize).await;
assert_eq!(object.read(0, read_buf.as_mut()).await.unwrap(), read_buf.len());
assert_eq!(read_buf.as_slice(), expected_buf.as_slice());
fs.close().await.expect("close failed");
}
}
}