blob: 5557b293820befbb365dd537874a4cd4cccf6c19 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
errors::FxfsError,
lsm_tree::types::ItemRef,
object_handle::{ObjectHandle, ObjectProperties},
object_store::{
record::{
ExtentKey, ExtentValue, ObjectKey, ObjectValue, Timestamp,
DEFAULT_DATA_ATTRIBUTE_ID,
},
transaction::{self, Transaction},
},
},
anyhow::{bail, Error},
async_trait::async_trait,
interval_tree::utils::RangeOps,
std::{cmp::min, ops::Range, sync::Arc},
storage_device::{
buffer::{Buffer, BufferRef, MutableBufferRef},
Device,
},
};
// To read the super-block and journal, we use a special reader since we cannot use the object-store
// reader until we've replayed the whole journal. Clients must supply the extents to be used.
pub struct Handle {
object_id: u64,
device: Arc<dyn Device>,
start_offset: u64,
extents: Vec<Range<u64>>,
size: u64,
}
impl Handle {
pub fn new(object_id: u64, device: Arc<dyn Device>) -> Self {
Self { object_id, device, start_offset: 0, extents: Vec::new(), size: 0 }
}
pub fn push_extent(&mut self, r: Range<u64>) {
self.extents.push(r);
}
pub fn try_push_extent_from_object_item(
&mut self,
item: ItemRef<'_, ObjectKey, ObjectValue>,
) -> Result<bool, Error> {
match item.into() {
Some((
object_id,
DEFAULT_DATA_ATTRIBUTE_ID,
ExtentKey { range },
ExtentValue { device_offset: Some((device_offset, _)) },
)) if object_id == self.object_id => {
if self.extents.is_empty() {
self.start_offset = range.start;
} else if range.start != self.size {
bail!(FxfsError::Inconsistent);
}
self.extents.push(*device_offset..*device_offset + range.length());
self.size = range.end;
Ok(true)
}
_ => Ok(false),
}
}
}
// TODO(csuter): This doesn't need to be ObjectHandle any more and we could integrate this into
// JournalReader.
#[async_trait]
impl ObjectHandle for Handle {
fn object_id(&self) -> u64 {
self.object_id
}
fn allocate_buffer(&self, size: usize) -> Buffer<'_> {
self.device.allocate_buffer(size)
}
fn block_size(&self) -> u32 {
self.device.block_size()
}
async fn read(&self, mut offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
assert!(offset >= self.start_offset);
let len = buf.len();
let mut buf_offset = 0;
let mut file_offset = self.start_offset;
for extent in &self.extents {
let extent_len = extent.end - extent.start;
if offset < file_offset + extent_len {
let device_offset = extent.start + offset - file_offset;
let to_read = min(extent.end - device_offset, (len - buf_offset) as u64) as usize;
assert!(buf_offset % self.device.block_size() as usize == 0);
self.device
.read(
device_offset,
buf.reborrow().subslice_mut(buf_offset..buf_offset + to_read),
)
.await?;
buf_offset += to_read;
if buf_offset == len {
break;
}
offset += to_read as u64;
}
file_offset += extent_len;
}
Ok(len)
}
async fn txn_write<'a>(
&'a self,
_transaction: &mut Transaction<'a>,
_offset: u64,
_buf: BufferRef<'_>,
) -> Result<(), Error> {
unreachable!();
}
fn get_size(&self) -> u64 {
self.size
}
async fn truncate<'a>(
&'a self,
_transaction: &mut Transaction<'a>,
_length: u64,
) -> Result<(), Error> {
unreachable!();
}
async fn preallocate_range<'a>(
&'a self,
_transaction: &mut Transaction<'a>,
_range: Range<u64>,
) -> Result<Vec<Range<u64>>, Error> {
unreachable!();
}
async fn update_timestamps<'a>(
&'a self,
_transaction: Option<&mut Transaction<'a>>,
_ctime: Option<Timestamp>,
_mtime: Option<Timestamp>,
) -> Result<(), Error> {
unreachable!();
}
async fn get_properties(&self) -> Result<ObjectProperties, Error> {
unreachable!();
}
async fn new_transaction_with_options<'a>(
&self,
_options: transaction::Options<'a>,
) -> Result<Transaction<'a>, Error> {
unreachable!();
}
}