mod allocator;
mod constants;
mod directory;
pub mod filesystem;
mod log;
mod merge;
mod record;
pub use directory::Directory;
pub use filesystem::Filesystem;
pub use log::Transaction;
pub use record::ObjectType;
#[cfg(test)]
mod tests;
use {
crate::{
lsm_tree::{ItemRef, LSMTree},
object_handle::{ObjectHandle, ObjectHandleCursor},
},
allocator::Allocator,
anyhow::Error,
bincode::{deserialize_from, serialize_into},
constants::{LOWEST_SPECIAL_OBJECT_ID, RESERVED_OBJECT_ID},
log::{Log, Mutation},
record::{decode_extent, ExtentKey, ObjectItem, ObjectKey, ObjectKeyData, ObjectValue},
serde::{Deserialize, Serialize},
std::{
cmp::min,
io::{BufWriter, ErrorKind, Write},
ops::{Bound, Range},
sync::{Arc, Mutex, Weak},
},
};
#[derive(Default)]
pub struct HandleOptions {
// If true, don't copy-on-write; instead, write to blocks that are already allocated.
pub overwrite: bool,
}
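// For example, opening a handle that writes in place (a sketch; `store` is
// assumed to be an opened ObjectStore and `oid` an existing object ID):
//
//     let handle = store.open_object(oid, HandleOptions { overwrite: true })?;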
#[derive(Default)]
pub struct StoreOptions {
// TODO: This is horrible, and we should revisit this to see if it's necessary.
use_parent_to_allocate_object_ids: bool,
}
pub trait Device: Send + Sync {
fn block_size(&self) -> u64;
fn read(&self, offset: u64, buf: &mut [u8]) -> std::io::Result<()>;
fn write(&self, offset: u64, buf: &[u8]) -> std::io::Result<()>;
}
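// Purely as an illustrative sketch (test-only; the name MemDevice, the Vec
// backing and the 512-byte block size are assumptions, not part of this
// module), a minimal in-memory implementation of Device might look like this.
// Out-of-range accesses simply panic on the slice bounds.
#[cfg(test)]
#[allow(dead_code)]
pub struct MemDevice {
data: Mutex<Vec<u8>>,
}
#[cfg(test)]
impl Device for MemDevice {
fn block_size(&self) -> u64 {
512
}
fn read(&self, offset: u64, buf: &mut [u8]) -> std::io::Result<()> {
let data = self.data.lock().unwrap();
let offset = offset as usize;
buf.copy_from_slice(&data[offset..offset + buf.len()]);
Ok(())
}
fn write(&self, offset: u64, buf: &[u8]) -> std::io::Result<()> {
let mut data = self.data.lock().unwrap();
let offset = offset as usize;
data[offset..offset + buf.len()].copy_from_slice(buf);
Ok(())
}
}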
pub struct StoreObjectHandle {
store: Arc<ObjectStore>,
block_size: u64,
object_id: u64,
attribute_id: u64,
size: Mutex<u64>,
options: HandleOptions,
}
// TODO: Make async, or maybe better to make device take array of operations.
impl StoreObjectHandle {
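// Writes `buf` to the device at `device_offset`, which is the device location
// corresponding to file `offset` and so carries the same intra-block
// alignment. Unaligned head and tail bytes are handled by reading the
// enclosing block, patching it, and writing it back.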
fn write_at(&self, offset: u64, buf: &[u8], mut device_offset: u64) -> std::io::Result<()> {
// Deal with alignment.
let start_align = (offset % self.block_size) as usize;
let start_offset = offset - start_align as u64;
let remainder = if start_align > 0 {
let (head, remainder) =
buf.split_at(min(self.block_size as usize - start_align, buf.len()));
let mut align_buf = vec![0; self.block_size as usize];
self.read(start_offset, align_buf.as_mut_slice())?;
align_buf[start_align..(start_align + head.len())].copy_from_slice(head);
device_offset -= start_align as u64;
self.store.device.write(device_offset, &align_buf)?;
device_offset += self.block_size;
remainder
} else {
buf
};
if !remainder.is_empty() {
let end = offset + buf.len() as u64;
let end_align = (end % self.block_size) as usize;
let (whole_blocks, tail) = remainder.split_at(remainder.len() - end_align);
self.store.device.write(device_offset, whole_blocks)?;
device_offset += whole_blocks.len() as u64;
if !tail.is_empty() {
let mut align_buf = vec![0; self.block_size as usize];
self.read(end - end_align as u64, align_buf.as_mut_slice())?;
align_buf[..tail.len()].copy_from_slice(tail);
self.store.device.write(device_offset, &align_buf)?;
}
}
Ok(())
}
fn delete_old_extents(
&self,
transaction: &mut Transaction,
key: &ExtentKey,
) -> std::io::Result<()> {
// Delete old extents. TODO: this should turn into an asynchronous task that
// blocks flushing this object store until completed. For that, we need the log
// checkpoint. To make it work properly, we would need to replace in the mutable
// layer and free extents there synchronously, and then process the extents in the
// other layers asynchronously.
// TODO: need to check allocator checkpoint.
// We can't trigger work that might depend on state *before* the transaction because we
// might lose information if we do a flush. So, we have to either do all the lookups up
// front and queue up the changes, or we must queue up the changes in a different
// transaction via some async mechanism.
let tree = &self.store.tree;
let lower_bound = ObjectKey::extent(self.object_id, key.lower_bound());
let mut iter = tree.range_from(Bound::Included(&lower_bound)).map_err(map_to_io_error)?;
while let Some((oid, extent_key, extent_value)) = iter.get().and_then(decode_extent) {
if oid != self.object_id {
break;
}
if let Some(overlap) = key.overlap(extent_key) {
self.store.allocator.upgrade().unwrap().deallocate(
transaction,
self.object_id,
key.attribute_id,
extent_value.device_offset + overlap.start - extent_key.range.start
..extent_value.device_offset + overlap.end - extent_key.range.start,
overlap.start,
);
} else {
break;
}
iter.advance().map_err(map_to_io_error)?;
}
Ok(())
}
}
pub fn map_to_io_error(error: Error) -> std::io::Error {
std::io::Error::new(ErrorKind::Other, error)
}
fn round_down(offset: u64, block_size: u64) -> u64 {
offset - offset % block_size
}
fn round_up(offset: u64, block_size: u64) -> u64 {
round_down(offset + block_size - 1, block_size)
}
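// Sanity checks for the rounding helpers above (illustrative; any non-zero
// block size works, 512 is just an example):
#[cfg(test)]
mod rounding_tests {
use super::{round_down, round_up};
#[test]
fn test_rounding() {
assert_eq!(round_down(1023, 512), 512);
assert_eq!(round_down(1024, 512), 1024);
assert_eq!(round_up(1, 512), 512);
assert_eq!(round_up(512, 512), 512);
assert_eq!(round_up(513, 512), 1024);
}
}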
impl ObjectHandle for StoreObjectHandle {
fn object_id(&self) -> u64 {
self.object_id
}
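// Reads into `buf` from file `offset`. The loop walks extents in key order:
// gaps before the next extent are sparse and read back as zeros; then any
// unaligned head block, the whole aligned middle, and any unaligned tail
// block of the extent are copied out.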
fn read(&self, mut offset: u64, buf: &mut [u8]) -> std::io::Result<usize> {
// TODO: reject reads starting beyond the end of the file? For now they return zero bytes.
if buf.is_empty() {
return Ok(0);
}
let tree = &self.store.tree;
let mut merger = tree
.range_from(Bound::Included(&ObjectKey::extent(
self.object_id,
ExtentKey::new(self.attribute_id, 0..offset + 1),
)))
.map_err(map_to_io_error)?;
let mut buf_offset = 0;
// Saturate so that reads at or beyond the end of the file yield zero bytes instead of underflowing.
let to_do = min(buf.len() as u64, self.size.lock().unwrap().saturating_sub(offset)) as usize;
// TODO: Should sparse regions be represented by explicit records or just absent ones?
loop {
match merger.get() {
Some(ItemRef {
key: ObjectKey { object_id, data: ObjectKeyData::Extent(extent_key) },
value: ObjectValue::Extent(extent_value),
}) => {
// TODO: check attribute_id
if *object_id != self.object_id {
break;
}
if extent_key.range.start > offset {
let to_zero =
min((extent_key.range.start - offset) as usize, to_do - buf_offset);
for i in 0..to_zero {
buf[buf_offset + i] = 0;
}
buf_offset += to_zero;
offset += to_zero as u64;
}
let start_align = (offset % self.block_size) as usize;
let mut device_offset = extent_value.device_offset
+ (offset - start_align as u64 - extent_key.range.start);
// Deal with starting alignment.
if start_align > 0 {
let mut align_buf = vec![0; self.block_size as usize];
self.store.device.read(device_offset, &mut align_buf)?;
let to_copy =
min(self.block_size as usize - start_align, to_do - buf_offset);
buf[buf_offset..buf_offset + to_copy]
.copy_from_slice(&align_buf[start_align..(start_align + to_copy)]);
buf_offset += to_copy;
if buf_offset >= to_do {
break;
}
offset += to_copy as u64;
device_offset += self.block_size;
}
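// Copy as many whole blocks as possible straight into the caller's buffer.
// Note the end alignment is computed from the bytes remaining at the current
// (now block-aligned) offset; computing it from `to_do` would overrun `buf`
// when the original offset was unaligned.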
let remaining = to_do - buf_offset;
let end_align = remaining % self.block_size as usize;
let to_copy = min(remaining - end_align, (extent_key.range.end - offset) as usize);
if to_copy > 0 {
self.store
.device
.read(device_offset, &mut buf[buf_offset..(buf_offset + to_copy)])?;
buf_offset += to_copy;
offset += to_copy as u64;
device_offset += to_copy as u64;
}
// Deal with end alignment.
if offset < extent_key.range.end && end_align > 0 {
let mut align_buf = vec![0; self.block_size as usize];
self.store.device.read(device_offset, &mut align_buf)?;
buf[buf_offset..to_do].copy_from_slice(&align_buf[..end_align]);
buf_offset += end_align;
break;
}
}
_ => break,
}
if buf_offset == to_do {
break;
}
merger
.advance_to(&ObjectKey::extent(
self.object_id,
ExtentKey::new(self.attribute_id, offset..std::u64::MAX),
))
.map_err(map_to_io_error)?;
}
// Zero out anything remaining.
for i in buf_offset..to_do {
buf[i] = 0;
}
Ok(to_do)
}
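// Writes `buf` at file `offset`. With `overwrite` set, data is written into
// existing extents and it is an error for any part of the range to be
// unallocated; otherwise old extents covering the (block-aligned) range are
// deallocated, new ones are allocated, and everything is committed in a
// single transaction.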
fn write(&self, mut offset: u64, buf: &[u8]) -> std::io::Result<()> {
if self.options.overwrite {
let mut buf_offset = 0;
let tree = &self.store.tree;
let mut merger = tree
.range_from(Bound::Included(&ObjectKey::extent(
self.object_id,
ExtentKey::new(self.attribute_id, 0..(offset + 1)),
)))
.map_err(map_to_io_error)?;
while buf_offset < buf.len() {
match merger.get() {
Some(ItemRef {
key: ObjectKey { object_id, data: ObjectKeyData::Extent(extent_key) },
value: ObjectValue::Extent(extent_value),
}) => {
if *object_id != self.object_id {
panic!("No extent!"); // TODO
}
let buf_end =
min(buf.len(), buf_offset + (extent_key.range.end - offset) as usize);
self.write_at(
offset,
&buf[buf_offset..buf_end],
extent_value.device_offset + (offset - extent_key.range.start),
)?;
buf_offset = buf_end;
offset = extent_key.range.end;
}
_ => panic!("No extent!"),
}
merger
.advance_to(&ObjectKey::extent(
self.object_id,
ExtentKey::new(self.attribute_id, offset..std::u64::MAX),
))
.map_err(map_to_io_error)?;
}
} else {
let mut aligned = round_down(offset, self.block_size)
..round_up(offset + buf.len() as u64, self.block_size);
let mut buf_offset = 0;
let mut transaction = Transaction::new(); // TODO: transaction too big?
if offset + buf.len() as u64 > *self.size.lock().unwrap() {
// TODO: need to hold locks properly
*self.size.lock().unwrap() = offset + buf.len() as u64;
transaction.add(
self.store.store_object_id,
Mutation::ReplaceOrInsert {
item: ObjectItem {
key: ObjectKey::attribute(self.object_id, 0),
value: ObjectValue::attribute(*self.size.lock().unwrap()),
},
},
);
}
self.delete_old_extents(
&mut transaction,
&ExtentKey::new(self.attribute_id, aligned.clone()),
)?;
while buf_offset < buf.len() {
let device_range = self
.store
.allocator
.upgrade()
.unwrap()
.allocate(self.object_id, 0, aligned.clone())
.map_err(map_to_io_error)?;
let extent_len = device_range.end - device_range.start;
let end = aligned.start + extent_len;
let len = min(buf.len() - buf_offset, (end - offset) as usize);
assert!(len > 0);
self.write_at(
offset,
&buf[buf_offset..buf_offset + len],
device_range.start + offset % self.block_size,
)?;
transaction.add(
self.store.store_object_id,
Mutation::ReplaceExtent {
item: ObjectItem {
key: ObjectKey::extent(
self.object_id,
ExtentKey::new(self.attribute_id, aligned.start..end),
),
value: ObjectValue::extent(device_range.start),
},
},
);
aligned.start += extent_len;
buf_offset += len;
offset += len as u64;
}
self.store.log.upgrade().unwrap().commit(transaction);
}
Ok(())
}
// `file_range` must be a multiple of the block size.
fn preallocate_range(
&self,
mut file_range: Range<u64>,
transaction: &mut Transaction,
) -> std::io::Result<Vec<Range<u64>>> {
// TODO: add checks on length, etc.
let mut ranges = Vec::new();
while file_range.start < file_range.end {
let device_range = self
.store
.allocator
.upgrade()
.unwrap()
.allocate(self.object_id, 0, file_range.clone())
.map_err(map_to_io_error)?;
let this_file_range =
file_range.start..file_range.start + device_range.end - device_range.start;
self.delete_old_extents(
transaction,
&ExtentKey::new(self.attribute_id, this_file_range.clone()),
)?;
file_range.start = this_file_range.end;
transaction.add(
self.store.store_object_id,
Mutation::ReplaceExtent {
item: ObjectItem {
key: ObjectKey::extent(
self.object_id,
ExtentKey::new(self.attribute_id, this_file_range),
),
value: ObjectValue::extent(device_range.start),
},
},
);
ranges.push(device_range);
}
if file_range.end > *self.size.lock().unwrap() {
*self.size.lock().unwrap() = file_range.end;
transaction.add(
self.store.store_object_id,
Mutation::ReplaceOrInsert {
item: ObjectItem {
key: ObjectKey::attribute(self.object_id, 0),
value: ObjectValue::attribute(*self.size.lock().unwrap()),
},
},
);
}
Ok(ranges)
}
fn get_size(&self) -> u64 {
*self.size.lock().unwrap()
}
}
#[derive(Clone, Default, Serialize, Deserialize)]
pub struct StoreInfo {
// The last used object ID.
last_object_id: u64,
// Object ids for layers. TODO: need a layer of indirection here so we can
// support snapshots.
layers: Vec<u64>,
}
impl StoreInfo {
fn new() -> StoreInfo {
StoreInfo { last_object_id: RESERVED_OBJECT_ID, layers: Vec::new() }
}
}
pub struct ObjectStore {
parent_store: Option<Arc<ObjectStore>>,
store_object_id: u64,
device: Arc<dyn Device>,
block_size: u64,
allocator: Weak<dyn Allocator>,
log: Weak<Log>,
options: StoreOptions,
store_info: Mutex<StoreInfo>,
tree: LSMTree<ObjectKey, ObjectValue>,
// When replaying the log, the store cannot read StoreInfo until the whole log
// has been replayed, so during that time, opened will be false and records
// just get sent to the tree. Once the log has been replayed, we can open the store
// and load all the other layer information.
opened: Mutex<bool>,
}
impl ObjectStore {
fn new(
parent_store: Option<Arc<ObjectStore>>,
store_object_id: u64,
device: Arc<dyn Device>,
allocator: &Arc<dyn Allocator>,
log: &Arc<Log>,
store_info: StoreInfo,
tree: LSMTree<ObjectKey, ObjectValue>,
options: StoreOptions,
opened: bool,
) -> Arc<ObjectStore> {
Arc::new(ObjectStore {
parent_store,
store_object_id,
device: device.clone(),
block_size: device.block_size(),
allocator: Arc::downgrade(allocator),
log: Arc::downgrade(log),
options,
store_info: Mutex::new(store_info),
tree,
opened: Mutex::new(opened),
})
}
pub fn new_empty(
parent_store: Option<Arc<ObjectStore>>,
store_object_id: u64,
device: Arc<dyn Device>,
allocator: &Arc<dyn Allocator>,
log: &Arc<Log>,
options: StoreOptions,
) -> Arc<Self> {
Self::new(
parent_store,
store_object_id,
device,
allocator,
log,
StoreInfo::new(),
LSMTree::new(merge::merge),
options,
true,
)
}
pub fn log(&self) -> Arc<Log> {
self.log.upgrade().unwrap()
}
pub fn create_child_store(
self: &Arc<ObjectStore>,
options: StoreOptions,
) -> Result<Arc<ObjectStore>, Error> {
self.ensure_open()?;
// TODO: This should probably all be in a transaction. There should probably be a log
// record to create a store.
let mut transaction = Transaction::new();
let handle = self.clone().create_object(&mut transaction, HandleOptions::default())?;
let log = self.log.upgrade().unwrap();
log.commit(transaction);
// Write a default StoreInfo file. TODO: this should be part of a bigger transaction, i.e.
// this function should take the transaction as an arg.
let mut writer = BufWriter::new(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0));
serialize_into(&mut writer, &StoreInfo::default())?;
writer.flush()?;
let new_store = Self::new_empty(
Some(self.clone()),
handle.object_id(),
self.device.clone(),
&self.allocator.upgrade().unwrap(),
&self.log.upgrade().unwrap(),
options,
);
log.register_store(&new_store);
Ok(new_store)
}
pub fn lazy_open_store(
self: &Arc<ObjectStore>,
store_object_id: u64,
options: StoreOptions,
) -> Arc<ObjectStore> {
Self::new(
Some(self.clone()),
store_object_id,
self.device.clone(),
&self.allocator.upgrade().unwrap(),
&self.log.upgrade().unwrap(),
StoreInfo::default(),
LSMTree::new(merge::merge),
options,
false,
)
}
// TODO: find a way to make sure this is always called at the right time. Any time we add
// mutation records to a transaction, this needs to be called beforehand.
fn ensure_open(&self) -> Result<(), Error> {
let mut opened = self.opened.lock().unwrap();
if *opened {
return Ok(());
}
let parent_store = self.parent_store.as_ref().unwrap();
let handle = parent_store.open_object(self.store_object_id, HandleOptions::default())?;
let store_info: StoreInfo =
deserialize_from(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0))?;
let mut handles = Vec::new();
for object_id in &store_info.layers {
handles.push(parent_store.open_object(*object_id, HandleOptions::default())?);
}
self.tree.set_layers(handles.into());
let mut current_store_info = self.store_info_for_last_object_id().lock().unwrap();
if store_info.last_object_id > current_store_info.last_object_id {
current_store_info.last_object_id = store_info.last_object_id
}
*opened = true;
Ok(())
}
pub fn open_store(
self: &Arc<ObjectStore>,
store_object_id: u64,
options: StoreOptions,
) -> Result<Arc<ObjectStore>, Error> {
let store = self.lazy_open_store(store_object_id, options);
store.ensure_open()?;
Ok(store)
}
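// Typical usage (a sketch; `root_store` is assumed to come from filesystem
// setup):
//
//     let child = root_store.create_child_store(StoreOptions::default())?;
//     let store = root_store.open_store(child.store_object_id(), StoreOptions::default())?;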
pub fn store_object_id(&self) -> u64 {
self.store_object_id
}
pub fn open_object(
self: &Arc<Self>,
object_id: u64,
options: HandleOptions,
) -> std::io::Result<StoreObjectHandle> {
self.ensure_open().map_err(map_to_io_error)?;
let item = self
.tree
.find(&ObjectKey::attribute(object_id, 0))
.map_err(map_to_io_error)?
.ok_or_else(|| std::io::Error::new(ErrorKind::NotFound, "Not found"))?;
if let ObjectValue::Attribute { size } = item.value {
Ok(StoreObjectHandle {
store: self.clone(),
object_id,
attribute_id: 0,
block_size: self.block_size,
size: Mutex::new(size),
options,
})
} else {
Err(std::io::Error::new(ErrorKind::InvalidData, "Expected attribute value"))
}
}
pub fn create_object_with_id(
self: &Arc<Self>,
transaction: &mut Transaction,
object_id: u64,
options: HandleOptions,
) -> std::io::Result<StoreObjectHandle> {
self.ensure_open().map_err(map_to_io_error)?;
transaction.add(
self.store_object_id,
Mutation::Insert {
item: ObjectItem {
key: ObjectKey::object(object_id),
value: ObjectValue::object(ObjectType::File),
},
},
);
transaction.add(
self.store_object_id,
Mutation::Insert {
item: ObjectItem {
key: ObjectKey::attribute(object_id, 0),
value: ObjectValue::attribute(0),
},
},
);
Ok(StoreObjectHandle {
store: self.clone(),
block_size: self.block_size,
object_id,
attribute_id: 0,
size: Mutex::new(0),
options,
})
}
pub fn create_directory(self: &Arc<Self>) -> std::io::Result<directory::Directory> {
self.ensure_open().map_err(map_to_io_error)?;
let object_id = self.get_next_object_id();
let mut transaction = Transaction::new();
transaction.add(
self.store_object_id,
Mutation::Insert {
item: ObjectItem {
key: ObjectKey::object(object_id),
value: ObjectValue::object(ObjectType::Directory),
},
},
);
self.log.upgrade().unwrap().commit(transaction);
Ok(directory::Directory::new(self.clone(), object_id))
}
pub fn open_directory(
self: &Arc<Self>,
object_id: u64,
) -> std::io::Result<directory::Directory> {
let item = self
.tree
.find(&ObjectKey::object(object_id))
.map_err(map_to_io_error)?
.ok_or_else(|| std::io::Error::new(ErrorKind::NotFound, "Not found"))?;
if let ObjectValue::Object { object_type: ObjectType::Directory } = item.value {
Ok(directory::Directory::new(self.clone(), object_id))
} else {
Err(std::io::Error::new(ErrorKind::InvalidData, "Expected directory"))
}
}
fn store_info_for_last_object_id(&self) -> &Mutex<StoreInfo> {
if self.options.use_parent_to_allocate_object_ids {
&self.parent_store.as_ref().unwrap().store_info
} else {
&self.store_info
}
}
fn get_next_object_id(&self) -> u64 {
let mut store_info = self.store_info_for_last_object_id().lock().unwrap();
store_info.last_object_id += 1;
store_info.last_object_id
}
pub fn create_object(
self: &Arc<Self>,
transaction: &mut Transaction,
options: HandleOptions,
) -> std::io::Result<StoreObjectHandle> {
let object_id = self.get_next_object_id();
self.create_object_with_id(transaction, object_id, options)
}
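// For example, creating an object and committing it (a sketch; `store` is
// assumed to be an opened ObjectStore):
//
//     let mut transaction = Transaction::new();
//     let handle = store.create_object(&mut transaction, HandleOptions::default())?;
//     store.log().commit(transaction);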
pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> {
&self.tree
}
// Push all in-memory structures to the device. This is not necessary for sync since the log
// will take care of it. This will panic if called on the root parent store.
pub fn flush(&self, force: bool) -> Result<(), Error> {
self.ensure_open()?;
let log = self.log();
let mut object_sync = log.begin_object_sync(self.store_object_id);
if !force && !object_sync.needs_sync() {
return Ok(());
}
let parent_store = self.parent_store.as_ref().unwrap();
let mut transaction = Transaction::new();
let object_handle =
parent_store.clone().create_object(&mut transaction, HandleOptions::default())?;
self.log.upgrade().unwrap().commit(transaction); // TODO: this transaction needs to encompass all of the following operations.
let object_id = object_handle.object_id();
let handle =
parent_store.clone().open_object(self.store_object_id, HandleOptions::default())?;
self.tree.commit(object_handle)?;
let mut store_info = self.store_info.lock().unwrap();
// TODO: get layers from tree.
store_info.layers = vec![object_id];
if self.options.use_parent_to_allocate_object_ids {
store_info.last_object_id =
self.parent_store.as_ref().unwrap().store_info.lock().unwrap().last_object_id;
}
// TODO: truncate file in same transaction.
let mut writer = BufWriter::new(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0));
serialize_into(&mut writer, &*store_info)?;
writer.flush()?;
object_sync.done();
Ok(())
}
// -- Methods only to be called by Log --
pub fn insert(&self, item: ObjectItem) {
// Take the lock once so that checking and updating last_object_id is atomic.
let mut store_info = self.store_info_for_last_object_id().lock().unwrap();
if item.key.object_id < LOWEST_SPECIAL_OBJECT_ID
&& item.key.object_id > store_info.last_object_id
{
store_info.last_object_id = item.key.object_id;
}
drop(store_info);
self.tree.insert(item);
}
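// Applies a ReplaceExtent mutation: the new extent record replaces whatever
// records it overlaps.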
pub fn replace_extent(&self, item: ObjectItem) {
let lower_bound = item.key.lower_bound();
self.tree.replace_range(item, &lower_bound);
}
pub fn replace_or_insert(&self, item: ObjectItem) {
self.tree.replace_or_insert(item);
}
}
// TODO: validation of all deserialized structs.