// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
checksum::fletcher64,
crypt::{UnwrappedKeys, XtsCipherSet},
errors::FxfsError,
log::*,
lsm_tree::types::{ItemRef, LayerIterator},
object_handle::{
GetProperties, ObjectHandle, ObjectProperties, ReadObjectHandle, WriteBytes,
WriteObjectHandle,
},
object_store::{
extent_record::{Checksums, ExtentKey, ExtentValue},
object_manager::ObjectManager,
object_record::{
AttributeKey, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData, ObjectKind,
ObjectValue, Timestamp,
},
transaction::{
self, AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Options,
Transaction,
},
HandleOptions, ObjectStore,
},
round::{round_down, round_up},
},
anyhow::{anyhow, bail, Context, Error},
async_trait::async_trait,
futures::{
stream::{FuturesOrdered, FuturesUnordered},
try_join, TryStreamExt,
},
std::{
cmp::min,
ops::{Bound, Range},
sync::{
atomic::{self, AtomicBool, AtomicU64},
Arc,
},
},
storage_device::buffer::{Buffer, BufferRef, MutableBufferRef},
};
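/// A handle to a single attribute of an object in an `ObjectStore`, used to read and write its
/// data. The handle caches the content size and, for encrypted objects, holds the unwrapped
/// keys used to encrypt and decrypt extents.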
pub struct StoreObjectHandle<S: AsRef<ObjectStore> + Send + Sync + 'static> {
owner: Arc<S>,
pub(super) object_id: u64,
pub(super) attribute_id: u64,
pub(super) options: HandleOptions,
pub(super) trace: AtomicBool,
keys: Option<XtsCipherSet>,
content_size: AtomicU64,
}
impl<S: AsRef<ObjectStore> + Send + Sync + 'static> StoreObjectHandle<S> {
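/// Creates a handle for `object_id`/`attribute_id` in the store owned by `owner`. `size` is
/// the current content size of the attribute and `keys`, if present, are used to encrypt and
/// decrypt data.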
pub fn new(
owner: Arc<S>,
object_id: u64,
keys: Option<UnwrappedKeys>,
attribute_id: u64,
size: u64,
options: HandleOptions,
trace: bool,
) -> Self {
Self {
owner,
object_id,
keys: keys.as_ref().map(XtsCipherSet::new),
attribute_id,
options,
trace: AtomicBool::new(trace),
content_size: AtomicU64::new(size),
}
}
pub fn owner(&self) -> &Arc<S> {
&self.owner
}
pub fn attribute_id(&self) -> u64 {
self.attribute_id
}
pub fn store(&self) -> &ObjectStore {
self.owner.as_ref().as_ref()
}
/// Extend the file with the given extent. The only use case for this right now is for files
/// that must exist at certain offsets on the device, such as super-blocks.
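///
/// A minimal usage sketch (assuming `handle` is a `StoreObjectHandle` and `transaction` is an
/// open transaction; the device range is illustrative):
/// ```ignore
/// // Map the next 8 KiB of the file onto device bytes 8192..16384.
/// handle.extend(&mut transaction, 8192..16384).await?;
/// ```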
pub async fn extend<'a>(
&'a self,
transaction: &mut Transaction<'a>,
device_range: Range<u64>,
) -> Result<(), Error> {
let old_end =
round_up(self.txn_get_size(transaction), self.block_size()).ok_or(FxfsError::TooBig)?;
let new_size = old_end + device_range.end - device_range.start;
self.store()
.allocator()
.mark_allocated(transaction, self.store().store_object_id(), device_range.clone())
.await?;
transaction.add_with_object(
self.store().store_object_id,
Mutation::replace_or_insert_object(
ObjectKey::attribute(self.object_id, self.attribute_id, AttributeKey::Size),
ObjectValue::attribute(new_size),
),
AssocObj::Borrowed(self),
);
transaction.add(
self.store().store_object_id,
Mutation::merge_object(
ObjectKey::extent(self.object_id, self.attribute_id, old_end..new_size),
ObjectValue::Extent(ExtentValue::new(device_range.start)),
),
);
self.update_allocated_size(transaction, device_range.end - device_range.start, 0).await
}
// Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
// the data from `buf`.
async fn align_buffer(
&self,
offset: u64,
buf: BufferRef<'_>,
) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
let block_size = self.block_size();
let end = offset + buf.len() as u64;
let aligned =
round_down(offset, block_size)..round_up(end, block_size).ok_or(FxfsError::TooBig)?;
let mut aligned_buf =
self.store().device.allocate_buffer((aligned.end - aligned.start) as usize);
// Deal with head alignment.
if aligned.start < offset {
let mut head_block = aligned_buf.subslice_mut(..block_size as usize);
let read = self.read(aligned.start, head_block.reborrow()).await?;
head_block.as_mut_slice()[read..].fill(0);
}
// Deal with tail alignment.
if aligned.end > end {
let end_block_offset = aligned.end - block_size;
// There's no need to read the tail block if we read it as part of the head block.
if offset <= end_block_offset {
let mut tail_block =
aligned_buf.subslice_mut(aligned_buf.len() - block_size as usize..);
let read = self.read(end_block_offset, tail_block.reborrow()).await?;
tail_block.as_mut_slice()[read..].fill(0);
}
}
aligned_buf.as_mut_slice()
[(offset - aligned.start) as usize..(end - aligned.start) as usize]
.copy_from_slice(buf.as_slice());
Ok((aligned, aligned_buf))
}
// Writes potentially unaligned data at `device_offset` and returns checksums if requested. The
// data will be encrypted if necessary.
// `buf` is mutable as an optimization: since the write may require encryption, we can encrypt
// the buffer in place rather than copying to another buffer when the write is already aligned.
async fn write_at(
&self,
offset: u64,
buf: MutableBufferRef<'_>,
device_offset: u64,
compute_checksum: bool,
) -> Result<Checksums, Error> {
let mut transfer_buf;
let bs = self.block_size();
let (range, mut transfer_buf_ref) = if offset % bs == 0 && buf.len() as u64 % bs == 0 {
(offset..offset + buf.len() as u64, buf)
} else {
let (range, buf) = self.align_buffer(offset, buf.as_ref()).await?;
transfer_buf = buf;
(range, transfer_buf.as_mut())
};
if let Some(keys) = &self.keys {
// TODO(https://fxbug.dev/92975): Support key_id != 0.
keys.encrypt(range.start, 0, transfer_buf_ref.as_mut_slice())?;
}
self.write_aligned(
transfer_buf_ref.as_ref(),
device_offset - (offset - range.start),
compute_checksum,
)
.await
}
// Writes aligned data (that should already be encrypted) to the given offset and computes
// checksums if requested.
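// Checksums, when computed, are one fletcher64 value per filesystem block of `buf`.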
async fn write_aligned(
&self,
buf: BufferRef<'_>,
device_offset: u64,
compute_checksum: bool,
) -> Result<Checksums, Error> {
if self.trace.load(atomic::Ordering::Relaxed) {
info!(
store_id = self.store().store_object_id(),
oid = self.object_id,
device_range = ?(device_offset..device_offset + buf.len() as u64),
len = buf.len(),
"W",
);
}
let mut checksums = Vec::new();
try_join!(self.store().device.write(device_offset, buf), async {
if compute_checksum {
let block_size = self.block_size();
for chunk in buf.as_slice().chunks_exact(block_size as usize) {
checksums.push(fletcher64(chunk, 0));
}
}
Ok(())
})?;
Ok(if compute_checksum { Checksums::Fletcher(checksums) } else { Checksums::None })
}
// Returns the amount deallocated.
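// Walks the extent records overlapping `range` (which must be block-aligned) and deallocates
// the corresponding device ranges.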
async fn deallocate_old_extents(
&self,
transaction: &mut Transaction<'_>,
range: Range<u64>,
) -> Result<u64, Error> {
let block_size = self.block_size();
assert_eq!(range.start % block_size, 0);
assert_eq!(range.end % block_size, 0);
if range.start == range.end {
return Ok(0);
}
let tree = &self.store().tree;
let layer_set = tree.layer_set();
let key = ExtentKey { range };
let lower_bound = ObjectKey::attribute(
self.object_id,
self.attribute_id,
AttributeKey::Extent(key.search_key()),
);
let mut merger = layer_set.merger();
let mut iter = merger.seek(Bound::Included(&lower_bound)).await?;
let allocator = self.store().allocator();
let mut deallocated = 0;
let trace = self.trace.load(atomic::Ordering::Relaxed);
while let Some(ItemRef {
key:
ObjectKey {
object_id,
data: ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
},
value: ObjectValue::Extent(value),
..
}) = iter.get()
{
if *object_id != self.object_id || *attribute_id != self.attribute_id {
break;
}
if let ExtentValue::Some { device_offset, .. } = value {
if let Some(overlap) = key.overlap(extent_key) {
let range = device_offset + overlap.start - extent_key.range.start
..device_offset + overlap.end - extent_key.range.start;
if trace {
info!(
store_id = self.store().store_object_id(),
oid = self.object_id,
device_range = ?range,
len = range.end - range.start,
?extent_key,
"D",
);
}
allocator
.deallocate(transaction, self.store().store_object_id(), range)
.await?;
deallocated += overlap.end - overlap.start;
} else {
break;
}
}
iter.advance().await?;
}
Ok(deallocated)
}
/// Zeroes the given range. The range must be block-aligned.
pub async fn zero(
&self,
transaction: &mut Transaction<'_>,
range: Range<u64>,
) -> Result<(), Error> {
let deallocated = self.deallocate_old_extents(transaction, range.clone()).await?;
if deallocated > 0 {
self.update_allocated_size(transaction, 0, deallocated).await?;
transaction.add(
self.store().store_object_id,
Mutation::merge_object(
ObjectKey::extent(self.object_id, self.attribute_id, range),
ObjectValue::Extent(ExtentValue::deleted_extent()),
),
);
}
Ok(())
}
/// Returns information about a contiguous run of extents that share the same allocation
/// status, starting from `start_offset`: whether the run is allocated and its length in bytes.
/// This is used when querying slices for volumes.
/// `start_offset` must be aligned to the block size.
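///
/// A minimal usage sketch (assuming `handle` is an open `StoreObjectHandle`):
/// ```ignore
/// let (allocated, bytes) = handle.is_allocated(0).await?;
/// // `bytes` is the length of the run starting at offset 0 that is uniformly allocated or
/// // unallocated, as indicated by `allocated`.
/// ```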
pub async fn is_allocated(&self, start_offset: u64) -> Result<(bool, u64), Error> {
let block_size = self.block_size();
assert_eq!(start_offset % block_size, 0);
if start_offset > self.get_size() {
bail!(FxfsError::OutOfRange)
}
if start_offset == self.get_size() {
return Ok((false, 0));
}
let tree = &self.store().tree;
let layer_set = tree.layer_set();
let offset_key = ObjectKey::attribute(
self.object_id,
self.attribute_id,
AttributeKey::Extent(ExtentKey::search_key_from_offset(start_offset)),
);
let mut merger = layer_set.merger();
let mut iter = merger.seek(Bound::Included(&offset_key)).await?;
let mut allocated = None;
let mut end = start_offset;
loop {
// Iterate through the extents, each time setting `end` to the end of the previous extent.
match iter.get() {
Some(ItemRef {
key:
ObjectKey {
object_id,
data:
ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
},
value: ObjectValue::Extent(extent_value),
..
}) => {
// Equivalent of getting no extents back
if *object_id != self.object_id || *attribute_id != self.attribute_id {
if allocated == Some(false) || allocated.is_none() {
end = self.get_size();
allocated = Some(false);
}
break;
}
if extent_key.range.start > end {
// If a previous extent has already been visited and we are tracking an
// allocated set, we are only interested in an extent where the range of the
// current extent follows immediately after the previous one.
if allocated == Some(true) {
break;
} else {
// The gap between the previous `end` and this extent is not allocated
end = extent_key.range.start;
allocated = Some(false);
// Continue this iteration, except now the `end` is set to the end of
// the "previous" extent which is this gap between the start_offset
// and the current extent
}
}
// We can assume that from here, the `end` points to the end of a previous
// extent.
match extent_value {
// The current extent has been allocated
ExtentValue::Some { .. } => {
// Stop searching if previous extent was marked deleted
if allocated == Some(false) {
break;
}
allocated = Some(true);
}
// This extent has been marked deleted
ExtentValue::None => {
// Stop searching if previous extent was marked allocated
if allocated == Some(true) {
break;
}
allocated = Some(false);
}
}
end = extent_key.range.end;
}
// This occurs when there are no extents left
None => {
if allocated == Some(false) || allocated.is_none() {
end = self.get_size();
allocated = Some(false);
}
// Otherwise, we were monitoring extents that were allocated, so just exit.
break;
}
// Non-extent records (Object, Child, GraveyardEntry) are ignored.
Some(_) => {}
}
iter.advance().await?;
}
Ok((allocated.unwrap(), end - start_offset))
}
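/// Writes `buf` to the attribute at `offset` within `transaction`, extending the recorded
/// content size if the write ends beyond it.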
pub async fn txn_write<'a>(
&'a self,
transaction: &mut Transaction<'a>,
offset: u64,
buf: BufferRef<'_>,
) -> Result<(), Error> {
if buf.is_empty() {
return Ok(());
}
let (aligned, mut transfer_buf) = self.align_buffer(offset, buf).await?;
self.multi_write(transaction, &[aligned], transfer_buf.as_mut()).await?;
if offset + buf.len() as u64 > self.txn_get_size(transaction) {
transaction.add_with_object(
self.store().store_object_id,
Mutation::replace_or_insert_object(
ObjectKey::attribute(self.object_id, self.attribute_id, AttributeKey::Size),
ObjectValue::attribute(offset + buf.len() as u64),
),
AssocObj::Borrowed(self),
);
}
Ok(())
}
// Writes to multiple ranges with data provided in `buf`. The buffer can be modified in place
// if encryption takes place. The ranges must all be aligned and no change to content size is
// applied; the caller is responsible for updating size if required.
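// Extents written via this path always record per-block fletcher64 checksums.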
pub async fn multi_write<'a>(
&'a self,
transaction: &mut Transaction<'a>,
ranges: &[Range<u64>],
mut buf: MutableBufferRef<'_>,
) -> Result<(), Error> {
if buf.is_empty() {
return Ok(());
}
let block_size = self.block_size();
let store = self.store();
let store_id = store.store_object_id;
if let Some(keys) = &self.keys {
let mut slice = buf.as_mut_slice();
for r in ranges {
let l = r.end - r.start;
let (head, tail) = slice.split_at_mut(l as usize);
// TODO(https://fxbug.dev/92975): Support key_id != 0.
keys.encrypt(r.start, 0, head)?;
slice = tail;
}
}
let mut allocated = 0;
let allocator = store.allocator();
let trace = self.trace.load(atomic::Ordering::Relaxed);
let mut writes = FuturesOrdered::new();
while !buf.is_empty() {
let device_range = allocator
.allocate(transaction, self.store().store_object_id(), buf.len() as u64)
.await
.context("allocation failed")?;
if trace {
info!(
store_id,
oid = self.object_id,
?device_range,
len = device_range.end - device_range.start,
"A",
);
}
let device_range_len = device_range.end - device_range.start;
allocated += device_range_len;
let (head, tail) = buf.split_at_mut(device_range_len as usize);
buf = tail;
writes.push(async move {
let len = head.len() as u64;
Result::<_, Error>::Ok((
device_range.start,
len,
self.write_aligned(head.as_ref(), device_range.start, true).await?,
))
});
}
let (mutations, deallocated) = try_join!(
async {
let mut current_range = 0..0;
let mut mutations = Vec::new();
let mut ranges = ranges.iter();
while let Some((mut device_offset, mut len, mut checksums)) =
writes.try_next().await?
{
while len > 0 {
if current_range.end <= current_range.start {
current_range = ranges.next().unwrap().clone();
}
let l = std::cmp::min(len, current_range.end - current_range.start);
let tail = checksums.split_off((l / block_size) as usize);
mutations.push(Mutation::merge_object(
ObjectKey::extent(
self.object_id,
self.attribute_id,
current_range.start..current_range.start + l,
),
ObjectValue::Extent(ExtentValue::with_checksum(
device_offset,
checksums,
)),
));
checksums = tail;
device_offset += l;
len -= l;
current_range.start += l;
}
}
Result::<_, Error>::Ok(mutations)
},
async {
let mut deallocated = 0;
let aligned_size = round_up(self.txn_get_size(transaction), block_size)
.ok_or(anyhow!(FxfsError::Inconsistent).context("flush: Bad size"))?;
for r in ranges {
if r.start < aligned_size {
deallocated += self.deallocate_old_extents(transaction, r.clone()).await?;
}
}
Result::<_, Error>::Ok(deallocated)
}
)?;
for m in mutations {
transaction.add(store_id, m);
}
self.update_allocated_size(transaction, allocated, deallocated).await
}
// All the extents for the range must have been preallocated using preallocate_range or from
// existing writes.
// `buf` is mutable as an optimization: since the write may require encryption, we can encrypt
// the buffer in place rather than copying to another buffer when the write is already aligned.
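// A minimal usage sketch (assuming an unencrypted `handle`, a block-aligned `len`, and a
// block-aligned mutable buffer `buf`):
//
//     let mut transaction = handle.new_transaction().await?;
//     handle.preallocate_range(&mut transaction, 0..len).await?;
//     transaction.commit().await?;
//     handle.overwrite(0, buf.as_mut()).await?;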
pub async fn overwrite(
&self,
mut offset: u64,
mut buf: MutableBufferRef<'_>,
) -> Result<(), Error> {
let tree = &self.store().tree;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger
.seek(Bound::Included(&ObjectKey::attribute(
self.object_id,
self.attribute_id,
AttributeKey::Extent(ExtentKey::search_key_from_offset(offset)),
)))
.await?;
loop {
let (device_offset, to_do) = match iter.get() {
Some(ItemRef {
key:
ObjectKey {
object_id,
data:
ObjectKeyData::Attribute(
attribute_id,
AttributeKey::Extent(ExtentKey { range }),
),
},
value:
ObjectValue::Extent(ExtentValue::Some {
device_offset,
checksums: Checksums::None,
..
}),
..
}) if *object_id == self.object_id
&& *attribute_id == self.attribute_id
&& range.start <= offset =>
{
(
device_offset + (offset - range.start),
min(buf.len(), (range.end - offset) as usize),
)
}
_ => bail!("offset {} not allocated/has checksums", offset),
};
let (part, remainder) = buf.split_at_mut(to_do);
self.write_at(offset, part, device_offset, false).await?;
if remainder.len() == 0 {
break;
}
buf = remainder;
offset += to_do as u64;
iter.advance().await?;
}
Ok(())
}
// If |transaction| has an impending mutation for the underlying object, returns that.
// Otherwise, looks up the object from the tree.
async fn txn_get_object_mutation(
&self,
transaction: &Transaction<'_>,
) -> Result<ObjectStoreMutation, Error> {
self.store().txn_get_object_mutation(transaction, self.object_id).await
}
// Within a transaction, the size of the object might have changed, so get the size from there
// if it exists, otherwise, fall back on the cached size.
fn txn_get_size(&self, transaction: &Transaction<'_>) -> u64 {
transaction
.get_object_mutation(
self.store().store_object_id,
ObjectKey::attribute(self.object_id, self.attribute_id, AttributeKey::Size),
)
.and_then(|m| {
if let ObjectItem { value: ObjectValue::Attribute { size }, .. } = m.item {
Some(size)
} else {
None
}
})
.unwrap_or_else(|| self.get_size())
}
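// Adjusts the allocated size recorded against the object by `allocated - deallocated` bytes.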
async fn update_allocated_size(
&self,
transaction: &mut Transaction<'_>,
allocated: u64,
deallocated: u64,
) -> Result<(), Error> {
if allocated == deallocated {
return Ok(());
}
let mut mutation = self.txn_get_object_mutation(transaction).await?;
if let ObjectValue::Object { kind: ObjectKind::File { allocated_size, .. }, .. } =
&mut mutation.item.value
{
// The only way for these to fail is if the volume is inconsistent.
*allocated_size = allocated_size
.checked_add(allocated)
.ok_or_else(|| anyhow!(FxfsError::Inconsistent).context("Allocated size overflow"))?
.checked_sub(deallocated)
.ok_or_else(|| {
anyhow!(FxfsError::Inconsistent).context("Allocated size underflow")
})?;
} else {
panic!("Unexpceted object value");
}
transaction.add(self.store().store_object_id, Mutation::ObjectStore(mutation));
Ok(())
}
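/// Truncates or extends the attribute to `size`, deallocating extents beyond the new end and
/// zeroing the tail of the last block when shrinking.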
pub async fn truncate<'a>(
&'a self,
transaction: &mut Transaction<'a>,
size: u64,
) -> Result<(), Error> {
let old_size = self.txn_get_size(transaction);
if self.trace.load(atomic::Ordering::Relaxed) {
info!(
store_id = self.store().store_object_id(),
oid = self.object_id,
old_size,
orig_size = self.get_size(),
new_size = size,
"T",
);
}
if size < old_size {
let block_size = self.block_size();
let aligned_size = round_up(size, block_size).ok_or(FxfsError::TooBig)?;
self.zero(
transaction,
aligned_size..round_up(old_size, block_size).ok_or_else(|| {
anyhow!(FxfsError::Inconsistent).context("truncate: Bad size")
})?,
)
.await?;
let to_zero = aligned_size - size;
if to_zero > 0 {
assert!(to_zero < block_size);
// We intentionally use the COW write path even if we're in overwrite mode. There's
// no need to support overwrite mode here, and it would be difficult since we'd need
// to transactionalize zeroing the tail of the last block with the other metadata
// changes, which we don't currently have a way to do.
//
// TODO(fxbug.dev/88676): This is allocating a small buffer that we'll just end up
// copying. Is there a better way?
//
// TODO(fxbug.dev/88676): This might cause an allocation when there needs be none;
// ideally this would know if the tail block is allocated and if it isn't, it should
// leave it be.
let mut buf = self.store().device.allocate_buffer(to_zero as usize);
buf.as_mut_slice().fill(0);
self.txn_write(transaction, size, buf.as_ref()).await?;
}
}
transaction.add_with_object(
self.store().store_object_id,
Mutation::replace_or_insert_object(
ObjectKey::attribute(self.object_id, self.attribute_id, AttributeKey::Size),
ObjectValue::attribute(size),
),
AssocObj::Borrowed(self),
);
Ok(())
}
// `file_range` start and end must be multiples of the block size.
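// Preallocates extents covering `file_range`, reusing any extents that already exist, and
// returns the device ranges that back it. Only supported for unencrypted objects.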
pub async fn preallocate_range<'a>(
&'a self,
transaction: &mut Transaction<'a>,
mut file_range: Range<u64>,
) -> Result<Vec<Range<u64>>, Error> {
assert_eq!(file_range.start % self.block_size(), 0);
assert_eq!(file_range.end % self.block_size(), 0);
assert!(self.keys.is_none());
let mut ranges = Vec::new();
let tree = &self.store().tree;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger
.seek(Bound::Included(&ObjectKey::attribute(
self.object_id,
self.attribute_id,
AttributeKey::Extent(ExtentKey::search_key_from_offset(file_range.start)),
)))
.await?;
let mut allocated = 0;
'outer: while file_range.start < file_range.end {
let allocate_end = loop {
match iter.get() {
// Case for allocated extents for the same object that overlap with file_range.
Some(ItemRef {
key:
ObjectKey {
object_id,
data:
ObjectKeyData::Attribute(
attribute_id,
AttributeKey::Extent(ExtentKey { range }),
),
},
value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
..
}) if *object_id == self.object_id
&& *attribute_id == self.attribute_id
&& range.start < file_range.end =>
{
// If the start of the requested file_range overlaps with an existing extent...
if range.start <= file_range.start {
// Record the existing extent and move on.
let device_range = device_offset + file_range.start - range.start
..device_offset + min(range.end, file_range.end) - range.start;
file_range.start += device_range.end - device_range.start;
ranges.push(device_range);
if file_range.start >= file_range.end {
break 'outer;
}
iter.advance().await?;
continue;
} else {
// There's nothing allocated between file_range.start and the beginning
// of this extent.
break range.start;
}
}
// Case for deleted extents eclipsed by file_range.
Some(ItemRef {
key:
ObjectKey {
object_id,
data:
ObjectKeyData::Attribute(
attribute_id,
AttributeKey::Extent(ExtentKey { range }),
),
},
value: ObjectValue::Extent(ExtentValue::None),
..
}) if *object_id == self.object_id
&& *attribute_id == self.attribute_id
&& range.end < file_range.end =>
{
iter.advance().await?;
}
_ => {
// We can just preallocate the rest.
break file_range.end;
}
}
};
let device_range = self
.store()
.allocator()
.allocate(
transaction,
self.store().store_object_id(),
allocate_end - file_range.start,
)
.await
.context("Allocation failed")?;
allocated += device_range.end - device_range.start;
let this_file_range =
file_range.start..file_range.start + device_range.end - device_range.start;
file_range.start = this_file_range.end;
transaction.add(
self.store().store_object_id,
Mutation::merge_object(
ObjectKey::extent(self.object_id, self.attribute_id, this_file_range),
ObjectValue::Extent(ExtentValue::new(device_range.start)),
),
);
ranges.push(device_range);
// If we didn't allocate all that we requested, we'll loop around and try again.
}
// Update the file size if it changed.
if file_range.end > self.txn_get_size(transaction) {
transaction.add_with_object(
self.store().store_object_id,
Mutation::replace_or_insert_object(
ObjectKey::attribute(self.object_id, self.attribute_id, AttributeKey::Size),
ObjectValue::attribute(file_range.end),
),
AssocObj::Borrowed(self),
);
}
self.update_allocated_size(transaction, allocated, 0).await?;
Ok(ranges)
}
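/// Updates the creation and/or modification timestamps of the object within `transaction`.
/// Passing `None` for both is a no-op.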
pub async fn write_timestamps<'a>(
&'a self,
transaction: &mut Transaction<'a>,
crtime: Option<Timestamp>,
mtime: Option<Timestamp>,
) -> Result<(), Error> {
if let (None, None) = (crtime.as_ref(), mtime.as_ref()) {
return Ok(());
}
let mut mutation = self.txn_get_object_mutation(transaction).await?;
if let ObjectValue::Object { ref mut attributes, .. } = mutation.item.value {
if let Some(time) = crtime {
attributes.creation_time = time;
}
if let Some(time) = mtime {
attributes.modification_time = time;
}
} else {
bail!(
anyhow!(FxfsError::Inconsistent).context("write_timestamps: Expected object value")
);
};
transaction.add(self.store().store_object_id(), Mutation::ObjectStore(mutation));
Ok(())
}
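/// Creates a new transaction that holds this handle's object-attribute lock.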
pub async fn new_transaction<'b>(&self) -> Result<Transaction<'b>, Error> {
self.new_transaction_with_options(Options {
skip_journal_checks: self.options.skip_journal_checks,
..Default::default()
})
.await
}
pub async fn new_transaction_with_options<'b>(
&self,
options: Options<'b>,
) -> Result<Transaction<'b>, Error> {
Ok(self
.store()
.filesystem()
.new_transaction(
&[LockKey::object_attribute(
self.store().store_object_id,
self.object_id,
self.attribute_id,
)],
options,
)
.await?)
}
/// Flushes the underlying device. This is expensive and should be used sparingly.
pub async fn flush_device(&self) -> Result<(), Error> {
self.store().device().flush().await
}
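// Reads from the device at `device_offset` into `buffer` and, if the handle has keys,
// decrypts in place using the key identified by `key_id`, with `file_offset` as the logical
// offset passed to the cipher.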
async fn read_and_decrypt(
&self,
device_offset: u64,
file_offset: u64,
mut buffer: MutableBufferRef<'_>,
key_id: u64,
) -> Result<(), Error> {
self.store().device.read(device_offset, buffer.reborrow()).await?;
if let Some(keys) = &self.keys {
keys.decrypt(file_offset, key_id, buffer.as_mut_slice())?;
}
Ok(())
}
}
impl<S: AsRef<ObjectStore> + Send + Sync + 'static> AssociatedObject for StoreObjectHandle<S> {
fn will_apply_mutation(&self, mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
match mutation {
Mutation::ObjectStore(ObjectStoreMutation {
item: ObjectItem { value: ObjectValue::Attribute { size }, .. },
..
}) => self.content_size.store(*size, atomic::Ordering::Relaxed),
_ => {}
}
}
}
impl<S: AsRef<ObjectStore> + Send + Sync + 'static> ObjectHandle for StoreObjectHandle<S> {
fn set_trace(&self, v: bool) {
info!(store_id = self.store().store_object_id, oid = self.object_id(), trace = v, "trace");
self.trace.store(v, atomic::Ordering::Relaxed);
}
fn object_id(&self) -> u64 {
self.object_id
}
fn allocate_buffer(&self, size: usize) -> Buffer<'_> {
self.store().device.allocate_buffer(size)
}
fn block_size(&self) -> u64 {
self.store().block_size()
}
fn get_size(&self) -> u64 {
self.content_size.load(atomic::Ordering::Relaxed)
}
}
#[async_trait]
impl<S: AsRef<ObjectStore> + Send + Sync + 'static> GetProperties for StoreObjectHandle<S> {
async fn get_properties(&self) -> Result<ObjectProperties, Error> {
// Take a read guard since we need to return a consistent view of all object properties.
let fs = self.store().filesystem();
let _guard = fs
.read_lock(&[LockKey::object_attribute(
self.store().store_object_id,
self.object_id,
self.attribute_id,
)])
.await;
let item = self
.store()
.tree
.find(&ObjectKey::object(self.object_id))
.await?
.expect("Unable to find object record");
match item.value {
ObjectValue::Object {
kind: ObjectKind::File { refs, allocated_size, .. },
attributes: ObjectAttributes { creation_time, modification_time },
} => Ok(ObjectProperties {
refs,
allocated_size,
data_attribute_size: self.get_size(),
creation_time,
modification_time,
sub_dirs: 0,
}),
_ => bail!(FxfsError::NotFile),
}
}
}
#[async_trait]
impl<S: AsRef<ObjectStore> + Send + Sync + 'static> ReadObjectHandle for StoreObjectHandle<S> {
async fn read(&self, mut offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
if buf.len() == 0 {
return Ok(0);
}
// Whilst the read offset must be aligned to the filesystem block size, the buffer need only
// be aligned to the device's block size.
let block_size = self.block_size() as u64;
let device_block_size = self.store().device.block_size() as u64;
assert_eq!(offset % block_size, 0);
assert_eq!(buf.range().start as u64 % device_block_size, 0);
let fs = self.store().filesystem();
let _guard = fs
.read_lock(&[LockKey::object_attribute(
self.store().store_object_id,
self.object_id,
self.attribute_id,
)])
.await;
let size = self.get_size();
if offset >= size {
return Ok(0);
}
let tree = &self.store().tree;
let layer_set = tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger
.seek(Bound::Included(&ObjectKey::extent(
self.object_id,
self.attribute_id,
offset..offset + 1,
)))
.await?;
let to_do = min(buf.len() as u64, size - offset) as usize;
buf = buf.subslice_mut(0..to_do);
let end_align = ((offset + to_do as u64) % block_size) as usize;
let trace = self.trace.load(atomic::Ordering::Relaxed);
let reads = FuturesUnordered::new();
while let Some(ItemRef {
key:
ObjectKey {
object_id,
data: ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
},
value: ObjectValue::Extent(extent_value),
..
}) = iter.get()
{
if *object_id != self.object_id || *attribute_id != self.attribute_id {
break;
}
if extent_key.range.start > offset {
// Zero everything up to the start of the extent.
let to_zero = min(extent_key.range.start - offset, buf.len() as u64) as usize;
for i in &mut buf.as_mut_slice()[..to_zero] {
*i = 0;
}
buf = buf.subslice_mut(to_zero..);
if buf.is_empty() {
break;
}
offset += to_zero as u64;
}
if let ExtentValue::Some { device_offset, key_id, .. } = extent_value {
let mut device_offset = device_offset + (offset - extent_key.range.start);
let to_copy = min(buf.len() - end_align, (extent_key.range.end - offset) as usize);
if to_copy > 0 {
if trace {
info!(
store_id = self.store().store_object_id(),
oid = self.object_id,
device_range = ?(device_offset..device_offset + to_copy as u64),
"R",
);
}
let (head, tail) = buf.split_at_mut(to_copy);
reads.push(self.read_and_decrypt(device_offset, offset, head, *key_id));
buf = tail;
if buf.is_empty() {
break;
}
offset += to_copy as u64;
device_offset += to_copy as u64;
}
// Deal with end alignment by reading the existing contents into an alignment
// buffer.
if offset < extent_key.range.end && end_align > 0 {
let mut align_buf = self.store().device.allocate_buffer(block_size as usize);
if trace {
info!(
store_id = self.store().store_object_id(),
oid = self.object_id,
device_range = ?(device_offset..device_offset + align_buf.len() as u64),
"RT",
);
}
self.read_and_decrypt(device_offset, offset, align_buf.as_mut(), *key_id)
.await?;
buf.as_mut_slice().copy_from_slice(&align_buf.as_slice()[..end_align]);
buf = buf.subslice_mut(0..0);
break;
}
} else if extent_key.range.end >= offset + buf.len() as u64 {
// Deleted extent covers remainder, so we're done.
break;
}
iter.advance().await?;
}
reads.try_collect().await?;
buf.as_mut_slice().fill(0);
Ok(to_do)
}
}
#[async_trait]
impl<S: AsRef<ObjectStore> + Send + Sync + 'static> WriteObjectHandle for StoreObjectHandle<S> {
async fn write_or_append(&self, offset: Option<u64>, buf: BufferRef<'_>) -> Result<u64, Error> {
let offset = offset.unwrap_or(self.get_size());
let mut transaction = self.new_transaction().await?;
self.txn_write(&mut transaction, offset, buf).await?;
let new_size = self.txn_get_size(&transaction);
transaction.commit().await?;
Ok(new_size)
}
async fn truncate(&self, size: u64) -> Result<(), Error> {
let mut transaction = self.new_transaction().await?;
StoreObjectHandle::truncate(self, &mut transaction, size).await?;
transaction.commit().await?;
Ok(())
}
async fn write_timestamps(
&self,
crtime: Option<Timestamp>,
mtime: Option<Timestamp>,
) -> Result<(), Error> {
if let (None, None) = (crtime.as_ref(), mtime.as_ref()) {
return Ok(());
}
let mut transaction = self.new_transaction().await?;
StoreObjectHandle::write_timestamps(self, &mut transaction, crtime, mtime).await?;
transaction.commit().await?;
Ok(())
}
async fn flush(&self) -> Result<(), Error> {
Ok(())
}
}
/// Like object_handle::Writer, but allows custom transaction options to be set, and makes every
/// write go directly to the handle in a transaction.
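///
/// A minimal usage sketch (assuming `handle` is a `StoreObjectHandle` and `data` is a byte
/// slice):
/// ```ignore
/// let mut writer = DirectWriter::new(&handle, Options::default());
/// writer.write_bytes(&data).await?;
/// writer.complete().await?;
/// ```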
pub struct DirectWriter<'a, S: AsRef<ObjectStore> + Send + Sync + 'static> {
handle: &'a StoreObjectHandle<S>,
options: transaction::Options<'a>,
buffer: Buffer<'a>,
offset: u64,
buf_offset: usize,
}
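// Writes are buffered and committed in transactions of up to this many bytes.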
const BUFFER_SIZE: usize = 1_048_576;
impl<S: AsRef<ObjectStore> + Send + Sync + 'static> Drop for DirectWriter<'_, S> {
fn drop(&mut self) {
if self.buf_offset != 0 {
warn!("DirectWriter: dropping data, did you forget to call complete?");
}
}
}
impl<'a, S: AsRef<ObjectStore> + Send + Sync + 'static> DirectWriter<'a, S> {
pub fn new(handle: &'a StoreObjectHandle<S>, options: transaction::Options<'a>) -> Self {
Self {
handle,
options,
buffer: handle.allocate_buffer(BUFFER_SIZE),
offset: 0,
buf_offset: 0,
}
}
async fn flush(&mut self) -> Result<(), Error> {
let mut transaction = self.handle.new_transaction_with_options(self.options).await?;
self.handle
.txn_write(&mut transaction, self.offset, self.buffer.subslice(..self.buf_offset))
.await?;
transaction.commit().await?;
self.offset += self.buf_offset as u64;
self.buf_offset = 0;
Ok(())
}
}
#[async_trait]
impl<'a, S: AsRef<ObjectStore> + Send + Sync + 'static> WriteBytes for DirectWriter<'a, S> {
fn handle(&self) -> &dyn WriteObjectHandle {
self.handle
}
async fn write_bytes(&mut self, mut buf: &[u8]) -> Result<(), Error> {
while buf.len() > 0 {
let to_do = std::cmp::min(buf.len(), BUFFER_SIZE - self.buf_offset);
self.buffer
.subslice_mut(self.buf_offset..self.buf_offset + to_do)
.as_mut_slice()
.copy_from_slice(&buf[..to_do]);
self.buf_offset += to_do;
if self.buf_offset == BUFFER_SIZE {
self.flush().await?;
}
buf = &buf[to_do..];
}
Ok(())
}
async fn complete(&mut self) -> Result<(), Error> {
self.flush().await?;
Ok(())
}
async fn skip(&mut self, amount: u64) -> Result<(), Error> {
if (BUFFER_SIZE - self.buf_offset) as u64 > amount {
self.buffer
.subslice_mut(self.buf_offset..self.buf_offset + amount as usize)
.as_mut_slice()
.fill(0);
self.buf_offset += amount as usize;
} else {
self.flush().await?;
self.offset += amount;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use {
crate::{
crypt::{insecure::InsecureCrypt, Crypt},
filesystem::{Filesystem, FxFilesystem, JournalingObject, OpenFxFilesystem},
lsm_tree::types::{ItemRef, LayerIterator},
object_handle::{
GetProperties, ObjectHandle, ObjectProperties, ReadObjectHandle, WriteObjectHandle,
},
object_store::{
extent_record::ExtentValue,
object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue, Timestamp},
transaction::{Options, TransactionHandler},
HandleOptions, ObjectStore, StoreObjectHandle,
},
round::{round_down, round_up},
},
assert_matches::assert_matches,
fuchsia_async as fasync,
futures::{channel::oneshot::channel, join},
rand::Rng,
std::{
ops::{Bound, Range},
sync::{Arc, Mutex},
time::Duration,
},
storage_device::{fake_device::FakeDevice, DeviceHolder},
};
const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
// Some tests (the preallocate_range ones) currently assume that the data only occupies a single
// device block.
const TEST_DATA_OFFSET: u64 = 5000;
const TEST_DATA: &[u8] = b"hello";
const TEST_OBJECT_SIZE: u64 = 5678;
const TEST_OBJECT_ALLOCATED_SIZE: u64 = 4096;
async fn test_filesystem() -> OpenFxFilesystem {
let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
FxFilesystem::new_empty(device).await.expect("new_empty failed")
}
async fn test_filesystem_and_object_with_key(
crypt: Option<&dyn Crypt>,
) -> (OpenFxFilesystem, StoreObjectHandle<ObjectStore>) {
let fs = test_filesystem().await;
let store = fs.root_store();
let object;
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
object =
ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), crypt)
.await
.expect("create_object failed");
{
let align = TEST_DATA_OFFSET as usize % TEST_DEVICE_BLOCK_SIZE as usize;
let mut buf = object.allocate_buffer(align + TEST_DATA.len());
buf.as_mut_slice()[align..].copy_from_slice(TEST_DATA);
object
.txn_write(&mut transaction, TEST_DATA_OFFSET, buf.subslice(align..))
.await
.expect("write failed");
}
object.truncate(&mut transaction, TEST_OBJECT_SIZE).await.expect("truncate failed");
transaction.commit().await.expect("commit failed");
(fs, object)
}
async fn test_filesystem_and_object() -> (OpenFxFilesystem, StoreObjectHandle<ObjectStore>) {
test_filesystem_and_object_with_key(Some(&InsecureCrypt::new())).await
}
#[fasync::run_singlethreaded(test)]
async fn test_zero_buf_len_read() {
let (fs, object) = test_filesystem_and_object().await;
let mut buf = object.allocate_buffer(0);
assert_eq!(object.read(0u64, buf.as_mut()).await.expect("read failed"), 0);
fs.close().await.expect("Close failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_beyond_eof_read() {
let (fs, object) = test_filesystem_and_object().await;
let offset = TEST_OBJECT_SIZE as usize - 2;
let align = offset % fs.block_size() as usize;
let len: usize = 2;
let mut buf = object.allocate_buffer(align + len + 1);
buf.as_mut_slice().fill(123u8);
assert_eq!(
object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
align + len
);
assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
fs.close().await.expect("Close failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_read_sparse() {
let (fs, object) = test_filesystem_and_object().await;
// Deliberately don't read right up to EOF.
let len = TEST_OBJECT_SIZE as usize - 1;
let mut buf = object.allocate_buffer(len);
buf.as_mut_slice().fill(123u8);
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
let mut expected = vec![0; len];
let offset = TEST_DATA_OFFSET as usize;
expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
assert_eq!(buf.as_slice()[..len], expected[..]);
fs.close().await.expect("Close failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_read_after_writes_interspersed_with_flush() {
let (fs, object) = test_filesystem_and_object().await;
object.owner().flush().await.expect("flush failed");
// Write more test data to the first block of the file.
let mut buf = object.allocate_buffer(TEST_DATA.len());
buf.as_mut_slice().copy_from_slice(TEST_DATA);
object.write_or_append(Some(0u64), buf.as_ref()).await.expect("write failed");
let len = TEST_OBJECT_SIZE as usize - 1;
let mut buf = object.allocate_buffer(len);
buf.as_mut_slice().fill(123u8);
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
let mut expected = vec![0u8; len];
let offset = TEST_DATA_OFFSET as usize;
expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
expected[..TEST_DATA.len()].copy_from_slice(TEST_DATA);
assert_eq!(buf.as_slice(), &expected);
fs.close().await.expect("Close failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_read_after_truncate_and_extend() {
let (fs, object) = test_filesystem_and_object().await;
// Arrange for there to be <extent><deleted-extent><extent>.
let mut buf = object.allocate_buffer(TEST_DATA.len());
buf.as_mut_slice().copy_from_slice(TEST_DATA);
object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed"); // This adds an extent at 0..512.
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
object.truncate(&mut transaction, 3).await.expect("truncate failed"); // This deletes 512..1024.
transaction.commit().await.expect("commit failed");
let data = b"foo";
let offset = 1500u64;
let align = (offset % fs.block_size() as u64) as usize;
let mut buf = object.allocate_buffer(align + data.len());
buf.as_mut_slice()[align..].copy_from_slice(data);
object.write_or_append(Some(1500), buf.subslice(align..)).await.expect("write failed"); // This adds 1024..1536.
const LEN1: usize = 1503;
let mut buf = object.allocate_buffer(LEN1);
buf.as_mut_slice().fill(123u8);
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN1);
let mut expected = [0; LEN1];
expected[..3].copy_from_slice(&TEST_DATA[..3]);
expected[1500..].copy_from_slice(b"foo");
assert_eq!(buf.as_slice(), &expected);
// Also test a read that ends midway through the deleted extent.
const LEN2: usize = 601;
let mut buf = object.allocate_buffer(LEN2);
buf.as_mut_slice().fill(123u8);
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN2);
assert_eq!(buf.as_slice(), &expected[..LEN2]);
fs.close().await.expect("Close failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_read_whole_blocks_with_multiple_objects() {
let (fs, object) = test_filesystem_and_object().await;
let bs = object.block_size() as usize;
let mut buffer = object.allocate_buffer(bs);
buffer.as_mut_slice().fill(0xaf);
object.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
let store = object.owner();
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let object2 =
ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
.await
.expect("create_object failed");
transaction.commit().await.expect("commit failed");
let mut ef_buffer = object.allocate_buffer(bs);
ef_buffer.as_mut_slice().fill(0xef);
object2.write_or_append(Some(0), ef_buffer.as_ref()).await.expect("write failed");
let mut buffer = object.allocate_buffer(bs);
buffer.as_mut_slice().fill(0xaf);
object.write_or_append(Some(bs as u64), buffer.as_ref()).await.expect("write failed");
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
object.truncate(&mut transaction, 3 * bs as u64).await.expect("truncate failed");
transaction.commit().await.expect("commit failed");
object2.write_or_append(Some(bs as u64), ef_buffer.as_ref()).await.expect("write failed");
let mut buffer = object.allocate_buffer(4 * bs);
buffer.as_mut_slice().fill(123);
assert_eq!(object.read(0, buffer.as_mut()).await.expect("read failed"), 3 * bs);
assert_eq!(&buffer.as_slice()[..2 * bs], &vec![0xaf; 2 * bs]);
assert_eq!(&buffer.as_slice()[2 * bs..3 * bs], &vec![0; bs]);
assert_eq!(object2.read(0, buffer.as_mut()).await.expect("read failed"), 2 * bs);
assert_eq!(&buffer.as_slice()[..2 * bs], &vec![0xef; 2 * bs]);
fs.close().await.expect("Close failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_alignment() {
let (fs, object) = test_filesystem_and_object().await;
struct AlignTest {
fill: u8,
object: StoreObjectHandle<ObjectStore>,
mirror: Vec<u8>,
}
impl AlignTest {
async fn new(object: StoreObjectHandle<ObjectStore>) -> Self {
let mirror = {
let mut buf = object.allocate_buffer(object.get_size() as usize);
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
buf.as_slice().to_vec()
};
Self { fill: 0, object, mirror }
}
// Fills |range| of self.object with a byte value (self.fill) and mirrors the same
// operation to an in-memory copy of the object.
// Each subsequent call bumps the value of fill.
// It is expected that the object and its mirror maintain identical content.
async fn test(&mut self, range: Range<u64>) {
let mut buf = self.object.allocate_buffer((range.end - range.start) as usize);
self.fill += 1;
buf.as_mut_slice().fill(self.fill);
self.object
.write_or_append(Some(range.start), buf.as_ref())
.await
.expect("write_or_append failed");
if range.end > self.mirror.len() as u64 {
self.mirror.resize(range.end as usize, 0);
}
self.mirror[range.start as usize..range.end as usize].fill(self.fill);
let mut buf = self.object.allocate_buffer(self.mirror.len() + 1);
assert_eq!(
self.object.read(0, buf.as_mut()).await.expect("read failed"),
self.mirror.len()
);
assert_eq!(&buf.as_slice()[..self.mirror.len()], self.mirror.as_slice());
}
}
let block_size = object.block_size() as u64;
let mut align = AlignTest::new(object).await;
// Fill the object to start with (with 1).
align.test(0..2 * block_size + 1).await;
// Unaligned head (fills with 2, overwrites that with 3).
align.test(1..block_size).await;
align.test(1..2 * block_size).await;
// Unaligned tail (fills with 4 and 5).
align.test(0..block_size - 1).await;
align.test(0..2 * block_size - 1).await;
// Both unaligned (fills with 6 and 7).
align.test(1..block_size - 1).await;
align.test(1..2 * block_size - 1).await;
fs.close().await.expect("Close failed");
}
async fn test_preallocate_common(fs: &FxFilesystem, object: StoreObjectHandle<ObjectStore>) {
let allocator = fs.allocator();
let allocated_before = allocator.get_allocated_bytes();
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
object
.preallocate_range(&mut transaction, 0..fs.block_size() as u64)
.await
.expect("preallocate_range failed");
transaction.commit().await.expect("commit failed");
assert!(object.get_size() < 1048576);
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
object
.preallocate_range(&mut transaction, 0..1048576)
.await
.expect("preallocate_range failed");
transaction.commit().await.expect("commit failed");
assert_eq!(object.get_size(), 1048576);
// Check that it didn't reallocate the space for the existing extent
let allocated_after = allocator.get_allocated_bytes();
assert_eq!(allocated_after - allocated_before, 1048576 - fs.block_size() as u64);
let mut buf =
object.allocate_buffer(round_up(TEST_DATA_OFFSET, fs.block_size()).unwrap() as usize);
buf.as_mut_slice().fill(47);
object
.write_or_append(Some(0), buf.subslice(..TEST_DATA_OFFSET as usize))
.await
.expect("write failed");
buf.as_mut_slice().fill(95);
let offset = round_up(TEST_OBJECT_SIZE, fs.block_size()).unwrap();
object.overwrite(offset, buf.as_mut()).await.expect("write failed");
// Make sure there were no more allocations.
assert_eq!(allocator.get_allocated_bytes(), allocated_after);
// Read back the data and make sure it is what we expect.
let mut buf = object.allocate_buffer(104876);
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
assert_eq!(&buf.as_slice()[..TEST_DATA_OFFSET as usize], &[47; TEST_DATA_OFFSET as usize]);
assert_eq!(
&buf.as_slice()[TEST_DATA_OFFSET as usize..TEST_DATA_OFFSET as usize + TEST_DATA.len()],
TEST_DATA
);
assert_eq!(&buf.as_slice()[offset as usize..offset as usize + 2048], &[95; 2048]);
}
#[fasync::run_singlethreaded(test)]
async fn test_preallocate_range() {
let (fs, object) = test_filesystem_and_object_with_key(None).await;
test_preallocate_common(&fs, object).await;
fs.close().await.expect("Close failed");
}
// This is identical to the previous test except that we flush so that extents end up in
// different layers.
#[fasync::run_singlethreaded(test)]
async fn test_preallocate_succeeds_when_extents_are_in_different_layers() {
let (fs, object) = test_filesystem_and_object_with_key(None).await;
object.owner().flush().await.expect("flush failed");
test_preallocate_common(&fs, object).await;
fs.close().await.expect("Close failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_already_preallocated() {
let (fs, object) = test_filesystem_and_object_with_key(None).await;
let allocator = fs.allocator();
let allocated_before = allocator.get_allocated_bytes();
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
let offset = TEST_DATA_OFFSET - TEST_DATA_OFFSET % fs.block_size() as u64;
object
.preallocate_range(&mut transaction, offset..offset + fs.block_size() as u64)
.await
.expect("preallocate_range failed");
transaction.commit().await.expect("commit failed");
// Check that it didn't reallocate any new space.
assert_eq!(allocator.get_allocated_bytes(), allocated_before);
fs.close().await.expect("Close failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_overwrite_fails_if_not_preallocated() {
let (fs, object) = test_filesystem_and_object().await;
let object = ObjectStore::open_object(
&object.owner,
object.object_id(),
HandleOptions::default(),
Some(&InsecureCrypt::new()),
)
.await
.expect("open_object failed");
let mut buf = object.allocate_buffer(2048);
buf.as_mut_slice().fill(95);
let offset = round_up(TEST_OBJECT_SIZE, fs.block_size()).unwrap();
object.overwrite(offset, buf.as_mut()).await.expect_err("write succeeded");
fs.close().await.expect("Close failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_extend() {
let fs = test_filesystem().await;
let handle;
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let store = fs.root_store();
handle =
ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
.await
.expect("create_object failed");
handle
.extend(&mut transaction, 0..5 * fs.block_size() as u64)
.await
.expect("extend failed");
transaction.commit().await.expect("commit failed");
let mut buf = handle.allocate_buffer(5 * fs.block_size() as usize);
buf.as_mut_slice().fill(123);
handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
buf.as_mut_slice().fill(67);
handle.read(0, buf.as_mut()).await.expect("read failed");
assert_eq!(buf.as_slice(), &vec![123; 5 * fs.block_size() as usize]);
fs.close().await.expect("Close failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_truncate_deallocates_old_extents() {
let (fs, object) = test_filesystem_and_object().await;
let mut buf = object.allocate_buffer(5 * fs.block_size() as usize);
buf.as_mut_slice().fill(0xaa);
object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
let allocator = fs.allocator();
let allocated_before = allocator.get_allocated_bytes();
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
object.truncate(&mut transaction, fs.block_size() as u64).await.expect("truncate failed");
transaction.commit().await.expect("commit failed");
let allocated_after = allocator.get_allocated_bytes();
assert!(
allocated_after < allocated_before,
"before = {} after = {}",
allocated_before,
allocated_after
);
fs.close().await.expect("Close failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_adjust_refs() {
let (fs, object) = test_filesystem_and_object().await;
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let store = object.owner();
assert_eq!(
store
.adjust_refs(&mut transaction, object.object_id(), 1)
.await
.expect("adjust_refs failed"),
false
);
transaction.commit().await.expect("commit failed");
let allocator = fs.allocator();
let allocated_before = allocator.get_allocated_bytes();
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
assert_eq!(
store
.adjust_refs(&mut transaction, object.object_id(), -2)
.await
.expect("adjust_refs failed"),
true
);
transaction.commit().await.expect("commit failed");
assert_eq!(allocator.get_allocated_bytes(), allocated_before);
store
.tombstone(
object.object_id,
Options { borrow_metadata_space: true, ..Default::default() },
)
.await
.expect("purge failed");
assert_eq!(allocated_before - allocator.get_allocated_bytes(), fs.block_size() as u64,);
let layer_set = store.tree.layer_set();
let mut merger = layer_set.merger();
let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
let mut found_tombstone = false;
let mut found_deleted_extent = false;
while let Some(ItemRef { key: ObjectKey { object_id, data }, value, .. }) = iter.get() {
if *object_id == object.object_id() {
match (data, value) {
// Tombstone entry
(ObjectKeyData::Object, ObjectValue::None) => {
assert!(!found_tombstone);
found_tombstone = true;
}
// Deleted extent entry
(
ObjectKeyData::Attribute(0, AttributeKey::Extent(_)),
ObjectValue::Extent(ExtentValue::None),
) => {
assert!(!found_deleted_extent);
found_deleted_extent = true;
}
// We don't expect anything else.
_ => assert!(false, "Unexpected item {:?}", iter.get()),
}
}
iter.advance().await.expect("advance failed");
}
assert!(found_tombstone);
assert!(found_deleted_extent);
fs.close().await.expect("Close failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_locks() {
let (fs, object) = test_filesystem_and_object().await;
let (send1, recv1) = channel();
let (send2, recv2) = channel();
let (send3, recv3) = channel();
let done = Mutex::new(false);
join!(
async {
let mut t = object.new_transaction().await.expect("new_transaction failed");
send1.send(()).unwrap(); // Tell the next future to continue.
send3.send(()).unwrap(); // Tell the last future to continue.
recv2.await.unwrap();
let mut buf = object.allocate_buffer(5);
buf.as_mut_slice().copy_from_slice(b"hello");
object.txn_write(&mut t, 0, buf.as_ref()).await.expect("write failed");
// This is a halting problem so all we can do is sleep.
fasync::Timer::new(Duration::from_millis(100)).await;
assert!(!*done.lock().unwrap());
t.commit().await.expect("commit failed");
},
async {
recv1.await.unwrap();
// Reads should not block.
let offset = TEST_DATA_OFFSET as usize;
let align = offset % fs.block_size() as usize;
let len = TEST_DATA.len();
let mut buf = object.allocate_buffer(align + len);
assert_eq!(
object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
align + TEST_DATA.len()
);
assert_eq!(&buf.as_slice()[align..], TEST_DATA);
// Tell the first future to continue.
send2.send(()).unwrap();
},
async {
// This should block until the first future has completed.
recv3.await.unwrap();
let _t = object.new_transaction().await.expect("new_transaction failed");
let mut buf = object.allocate_buffer(5);
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 5);
assert_eq!(buf.as_slice(), b"hello");
}
);
fs.close().await.expect("Close failed");
}
#[fasync::run(10, test)]
async fn test_racy_reads() {
let fs = test_filesystem().await;
let object;
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let store = fs.root_store();
object = Arc::new(
ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
.await
.expect("create_object failed"),
);
transaction.commit().await.expect("commit failed");
for _ in 0..100 {
let cloned_object = object.clone();
let writer = fasync::Task::spawn(async move {
let mut buf = cloned_object.allocate_buffer(10);
buf.as_mut_slice().fill(123);
cloned_object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
});
let cloned_object = object.clone();
let reader = fasync::Task::spawn(async move {
let wait_time = rand::thread_rng().gen_range(0..5);
fasync::Timer::new(Duration::from_millis(wait_time)).await;
let mut buf = cloned_object.allocate_buffer(10);
buf.as_mut_slice().fill(23);
let amount = cloned_object.read(0, buf.as_mut()).await.expect("read failed");
// If we succeed in reading data, it must include the write; i.e. if we see the size
// change, we should see the data too. For this to succeed it requires locking on
// the read size to ensure that when we read the size, we get the extents changed in
// that same transaction.
if amount != 0 {
assert_eq!(amount, 10);
assert_eq!(buf.as_slice(), &[123; 10]);
}
});
writer.await;
reader.await;
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
object.truncate(&mut transaction, 0).await.expect("truncate failed");
transaction.commit().await.expect("commit failed");
}
fs.close().await.expect("Close failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_allocated_size() {
let (fs, object) = test_filesystem_and_object_with_key(None).await;
let before = object.get_properties().await.expect("get_properties failed").allocated_size;
let mut buf = object.allocate_buffer(5);
buf.as_mut_slice().copy_from_slice(b"hello");
object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
let after = object.get_properties().await.expect("get_properties failed").allocated_size;
assert_eq!(after, before + fs.block_size() as u64);
// Do the same write again and there should be no change.
object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
assert_eq!(
object.get_properties().await.expect("get_properties failed").allocated_size,
after
);
// extend...
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
let offset = 1000 * fs.block_size() as u64;
let before = after;
object
.extend(&mut transaction, offset..offset + fs.block_size() as u64)
.await
.expect("extend failed");
transaction.commit().await.expect("commit failed");
let after = object.get_properties().await.expect("get_properties failed").allocated_size;
assert_eq!(after, before + fs.block_size() as u64);
// truncate...
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
let before = after;
let size = object.get_size();
object
.truncate(&mut transaction, size - fs.block_size() as u64)
.await
.expect("extend failed");
transaction.commit().await.expect("commit failed");
let after = object.get_properties().await.expect("get_properties failed").allocated_size;
assert_eq!(after, before - fs.block_size() as u64);
// preallocate_range...
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
let before = after;
object
.preallocate_range(&mut transaction, offset..offset + fs.block_size() as u64)
.await
.expect("extend failed");
transaction.commit().await.expect("commit failed");
let after = object.get_properties().await.expect("get_properties failed").allocated_size;
assert_eq!(after, before + fs.block_size() as u64);
fs.close().await.expect("Close failed");
}
#[fasync::run(10, test)]
async fn test_zero() {
let (fs, object) = test_filesystem_and_object().await;
let expected_size = object.get_size();
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
object.zero(&mut transaction, 0..fs.block_size() as u64 * 10).await.expect("zero failed");
transaction.commit().await.expect("commit failed");
assert_eq!(object.get_size(), expected_size);
let mut buf = object.allocate_buffer(fs.block_size() as usize * 10);
assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed") as u64, expected_size);
assert_eq!(
&buf.as_slice()[0..expected_size as usize],
vec![0u8; expected_size as usize].as_slice()
);
fs.close().await.expect("Close failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_properties() {
let (fs, object) = test_filesystem_and_object().await;
const CRTIME: Timestamp = Timestamp::from_nanos(1234);
const MTIME: Timestamp = Timestamp::from_nanos(5678);
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
object
.write_timestamps(&mut transaction, Some(CRTIME), Some(MTIME))
.await
.expect("update_timestamps failed");
transaction.commit().await.expect("commit failed");
let properties = object.get_properties().await.expect("get_properties failed");
assert_matches!(
properties,
ObjectProperties {
refs: 1u64,
allocated_size: TEST_OBJECT_ALLOCATED_SIZE,
data_attribute_size: TEST_OBJECT_SIZE,
creation_time: CRTIME,
modification_time: MTIME,
..
}
);
fs.close().await.expect("Close failed");
}
#[fasync::run_singlethreaded(test)]
async fn test_is_allocated() {
let (fs, object) = test_filesystem_and_object().await;
// `test_filesystem_and_object()` wrote the buffer `TEST_DATA` to the device at offset
// `TEST_DATA_OFFSET` where the length and offset are aligned to the block size.
let aligned_offset = round_down(TEST_DATA_OFFSET, fs.block_size());
let aligned_length = round_up(TEST_DATA.len() as u64, fs.block_size()).unwrap();
// Check the case where we have the following extent layout:
// [ unallocated ][ `TEST_DATA` ]
// The extents before `aligned_offset` should not be allocated.
let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
assert_eq!(count, aligned_offset);
assert_eq!(allocated, false);
let (allocated, count) =
object.is_allocated(aligned_offset).await.expect("is_allocated failed");
assert_eq!(count, aligned_length);
assert_eq!(allocated, true);
// Check the case where we query out of range.
let end = aligned_offset + aligned_length;
object
.is_allocated(end)
.await
.expect_err("is_allocated should have returned ERR_OUT_OF_RANGE");
// Check the case where we start querying for allocation from an allocated range to the end
// of the device.
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
let size = 50 * fs.block_size() as u64;
object.truncate(&mut transaction, size).await.expect("truncate failed");
transaction.commit().await.expect("commit failed");
let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
assert_eq!(count, size - end);
assert_eq!(allocated, false);
// Check the case where we have the following extent layout:
// [ unallocated ][ `buf` ][ `buf` ]
let buf_length = 5 * fs.block_size();
let mut buf = object.allocate_buffer(buf_length as usize);
buf.as_mut_slice().fill(123);
let new_offset = end + 20 * fs.block_size() as u64;
object.write_or_append(Some(new_offset), buf.as_ref()).await.expect("write failed");
object
.write_or_append(Some(new_offset + buf_length), buf.as_ref())
.await
.expect("write failed");
let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
assert_eq!(count, new_offset - end);
assert_eq!(allocated, false);
let (allocated, count) =
object.is_allocated(new_offset).await.expect("is_allocated failed");
assert_eq!(count, 2 * buf_length);
assert_eq!(allocated, true);
// Check the case where we query from the middle of an extent
let (allocated, count) = object
.is_allocated(new_offset + 4 * fs.block_size())
.await
.expect("is_allocated failed");
assert_eq!(count, 2 * buf_length - 4 * fs.block_size());
assert_eq!(allocated, true);
// Now, write a buffer to a location that has already been written to.
// Check the case where we have the following extent layout:
// [ unallocated ][ `other_buf` ][ (part of) `buf` ][ `buf` ]
let other_buf_length = 3 * fs.block_size();
let mut other_buf = object.allocate_buffer(other_buf_length as usize);
other_buf.as_mut_slice().fill(231);
object.write_or_append(Some(new_offset), other_buf.as_ref()).await.expect("write failed");
// We still expect `is_allocated(..)` to report that there are 2 * `buf_length` bytes
// allocated from `new_offset`.
let (allocated, count) =
object.is_allocated(new_offset).await.expect("is_allocated failed");
assert_eq!(count, 2 * buf_length);
assert_eq!(allocated, true);
// Check the case where we have the following extent layout:
// [ unallocated ][ deleted ][ unallocated ][ deleted ][ allocated ]
// Mark `TEST_DATA` as deleted.
let mut transaction = object.new_transaction().await.expect("new_transaction failed");
object
.zero(&mut transaction, aligned_offset..aligned_offset + aligned_length)
.await
.expect("zero failed");
// Mark `other_buf` as deleted
object
.zero(&mut transaction, new_offset..new_offset + buf_length)
.await
.expect("zero failed");
transaction.commit().await.expect("commit transaction failed");
let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
assert_eq!(count, new_offset + buf_length);
assert_eq!(allocated, false);
let (allocated, count) =
object.is_allocated(new_offset + buf_length).await.expect("is_allocated failed");
assert_eq!(count, buf_length);
assert_eq!(allocated, true);
let new_end = new_offset + buf_length + count;
// Check for the case where there are objects with different keys.
// Case that we're checking for:
// [ unallocated ][ extent (object with different key) ][ unallocated ]
let store = object.owner();
let mut transaction = fs
.clone()
.new_transaction(&[], Options::default())
.await
.expect("new_transaction failed");
let object2 =
ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
.await
.expect("create_object failed");
transaction.commit().await.expect("commit failed");
object2
.write_or_append(Some(new_end + fs.block_size()), buf.as_ref())
.await
.expect("write failed");
// Expecting that the extent with a different key is treated like an unallocated extent.
let (allocated, count) = object.is_allocated(new_end).await.expect("is_allocated failed");
assert_eq!(count, size - new_end);
assert_eq!(allocated, false);
fs.close().await.expect("close failed");
}
}