mod merge;
#[cfg(test)]
mod tests;
use {
    crate::{
        lsm_tree::{
            skip_list_layer::SkipListLayer, Item, ItemRef, LSMTree, MutableLayer, OrdLowerBound,
        },
        object_handle::{ObjectHandle, ObjectHandleCursor},
        object_store::{
            log::{Log, Mutation, Transaction},
            HandleOptions, ObjectStore,
        },
    },
    anyhow::Error,
    bincode::{deserialize_from, serialize_into},
    merge::merge,
    serde::{Deserialize, Serialize},
    std::{
        cmp::min,
        sync::{
            atomic::{AtomicU64, Ordering::Relaxed},
            Arc, Mutex, Weak,
        },
    },
};
pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;
/*
pub struct AllocatorReservation<'allocator> {
    allocator: &'allocator dyn Allocator,
    item: AllocatorItem,
}
*/
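/// Interface to the filesystem's block allocator.  allocate() hands out a free device range for a
/// portion of an object, deallocate() stages the release of a range as a mutation in a
/// transaction, and the commit_* methods apply committed changes to the allocator's own records.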
pub trait Allocator: Send + Sync {
    fn init(&self, store: &Arc<ObjectStore>) -> Result<(), Error>;
    fn object_id(&self) -> u64;
    // Returns device ranges.
    fn allocate(
        &self,
        object_id: u64,
        attribute_id: u64,
        object_range: std::ops::Range<u64>,
    ) -> Result<std::ops::Range<u64>, Error>;
    fn deallocate(
        &self,
        transaction: &mut Transaction,
        object_id: u64,
        attribute_id: u64,
        device_range: std::ops::Range<u64>,
        file_offset: u64,
    );
    // Called by log.
    fn set_next_block(&self, block: u64);
    fn flush(&self, force: bool) -> Result<(), Error>;
    fn commit_allocation(
        &self,
        object_id: u64,
        attribute_id: u64,
        device_range: std::ops::Range<u64>,
        file_offset: u64,
    );
    fn commit_deallocation(&self, item: AllocatorItem);
    fn open(&self, store: &Arc<ObjectStore>, object_id: u64) -> Result<(), Error>;
}
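/// Key for an allocated extent: the device range it occupies, plus the owning object, attribute
/// and the file offset it is mapped at.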
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AllocatorKey {
    device_range: std::ops::Range<u64>,
    object_id: u64,
    attribute_id: u64,
    file_offset: u64,
}
impl AllocatorKey {
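    /// Returns a key that sorts (under Ord, which compares range ends first) at or before any
    /// extent that could overlap self.device_range, so a search starting from this key will visit
    /// all potentially overlapping extents.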
    fn lower_bound(&self) -> AllocatorKey {
        AllocatorKey {
            device_range: 0..self.device_range.start + 1,
            object_id: self.object_id,
            attribute_id: self.attribute_id,
            file_offset: self.file_offset,
        }
    }
}
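// Full ordering, keyed primarily by the end of the device range so that a search starting from
// lower_bound() visits every extent that might overlap a given range.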
impl Ord for AllocatorKey {
    fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range
            .end
            .cmp(&other.device_range.end)
            .then(self.device_range.start.cmp(&other.device_range.start))
            .then(self.object_id.cmp(&other.object_id))
            .then(self.attribute_id.cmp(&other.attribute_id))
            .then(self.file_offset.cmp(&other.file_offset))
    }
}
impl PartialOrd for AllocatorKey {
    fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
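// Ordering by the start of the device range first, used by the LSM tree when searching by lower
// bound.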
impl OrdLowerBound for AllocatorKey {
    fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range
            .start
            .cmp(&other.device_range.start)
            .then(self.device_range.end.cmp(&other.device_range.end))
            .then(self.object_id.cmp(&other.object_id))
            .then(self.attribute_id.cmp(&other.attribute_id))
            .then(self.file_offset.cmp(&other.file_offset))
    }
}
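/// Insert records an allocated extent; Delete records its deallocation (the merger is expected to
/// cancel the two out when layers are merged).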
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum AllocatorValue {
    Insert,
    Delete,
}
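/// Persisted allocator metadata: the object IDs of the layer files backing the LSM tree.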
#[derive(Deserialize, Serialize)]
struct AllocatorInfo {
    layers: Vec<u64>,
}
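/// A simple first-fit allocator.  Committed allocations live in an LSM tree keyed by
/// AllocatorKey; allocations that have been handed out but not yet committed are held in the
/// reserved_allocations skip-list layer, which is merged into queries so that reserved ranges
/// appear occupied.
///
/// Sketch of intended use (hypothetical `log` and `store` values; error handling elided):
///
/// ```ignore
/// let allocator = SimpleAllocator::new(&log);
/// allocator.init(&store)?;
/// let device_range = allocator.allocate(object_id, attribute_id, 0..8192)?;
/// ```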
pub struct SimpleAllocator {
    log: Weak<Log>,
    object_id: AtomicU64,
    tree: LSMTree<AllocatorKey, AllocatorValue>,
    reserved_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
    inner: Mutex<Inner>,
}
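// Mutable state guarded by SimpleAllocator::inner.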
struct Inner {
    store: Option<Arc<ObjectStore>>,
    info: Option<AllocatorInfo>,
    next_block: u64,
    // reserved_deallocations: RefCell<Rc<SkipListLayer<AllocatorKey, AllocatorValue>>>,
}
impl SimpleAllocator {
    pub fn new(log: &Arc<Log>) -> SimpleAllocator {
        SimpleAllocator {
            log: Arc::downgrade(log),
            // TODO: maybe make tree optional or put all data in separate structure.
            object_id: AtomicU64::new(0),
            tree: LSMTree::new(merge),
            reserved_allocations: Arc::new(SkipListLayer::new(1024)), // TODO: magic numbers
            inner: Mutex::new(Inner {
                store: None,
                info: None,
                next_block: 0,
                // reserved_deallocations: RefCell::new(Rc::new(SkipListLayer::new(1024))),
            }),
        }
    }
}
impl Allocator for SimpleAllocator {
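    // Creates the allocator's metadata object in the given store and remembers the store for
    // later use.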
    fn init(&self, store: &Arc<ObjectStore>) -> Result<(), Error> {
        let mut inner = self.inner.lock().unwrap();
        let mut transaction = Transaction::new();
        let allocator_object = store.create_object(&mut transaction, HandleOptions::default())?;
        self.log.upgrade().unwrap().commit(transaction);
        inner.store = Some(store.clone());
        self.object_id.store(allocator_object.object_id(), Relaxed);
        inner.info = Some(AllocatorInfo { layers: Vec::new() });
        Ok(())
    }
    fn open(&self, store: &Arc<ObjectStore>, object_id: u64) -> Result<(), Error> {
        let mut inner = self.inner.lock().unwrap();
        self.object_id.store(object_id, Relaxed);
        let handle = store.open_object(object_id, HandleOptions::default())?;
        let info: AllocatorInfo =
            deserialize_from(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0))?;
        let mut handles = Vec::new();
        for object_id in &info.layers {
            handles.push(store.open_object(*object_id, HandleOptions::default())?);
        }
        inner.info = Some(info);
        self.tree.set_layers(handles.into_boxed_slice());
        inner.store = Some(store.clone());
        Ok(())
    }
    fn object_id(&self) -> u64 {
        self.object_id.load(Relaxed)
    }
    // TODO: this should return a reservation object rather than just a range so that it can get cleaned up.
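    // First-fit allocation: walk the tree (with the reserved allocations layered in) in
    // device-offset order and take the first gap, up to the requested length.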
    fn allocate(
        &self,
        object_id: u64,
        attribute_id: u64,
        object_range: std::ops::Range<u64>,
    ) -> Result<std::ops::Range<u64>, Error> {
        let len = object_range.end - object_range.start;
        let tree = &self.tree; // TODO: document which tree methods require no locks held.
        let result: std::ops::Range<u64>;
        {
            let mut iter = tree.iter_with_layers(vec![self.reserved_allocations.clone()]);
            let mut last_offset = 0;
            iter.advance()?;
            loop {
                match iter.get() {
                    None => {
                        // TODO: Don't assume infinite device size.
                        result = last_offset..last_offset + len;
                        break;
                    }
                    Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
                        // println!("found {:?}", device_range);
                        if device_range.start > last_offset {
                            result = last_offset..min(last_offset + len, device_range.start);
                            break;
                        }
                        last_offset = device_range.end;
                    }
                }
                iter.advance()?;
            }
        }
        // println!("alloc: {:?}", result);
        // TODO: got to make reserved allocation actually reserve something.
        self.reserved_allocations.insert(AllocatorItem {
            key: AllocatorKey {
                device_range: result.clone(),
                object_id,
                attribute_id,
                file_offset: result.start,
            },
            value: AllocatorValue::Insert,
        });
        // TODO: roll back reservation if transaction fails.
        Ok(result)
    }
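    // Stages a deallocation by adding a Deallocate mutation to the transaction; nothing is
    // released here (see commit_deallocation).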
    fn deallocate(
        &self,
        transaction: &mut Transaction,
        object_id: u64,
        attribute_id: u64,
        device_range: std::ops::Range<u64>,
        file_offset: u64,
    ) {
        transaction.add(
            self.object_id(),
            Mutation::Deallocate(AllocatorItem {
                key: AllocatorKey { device_range, object_id, attribute_id, file_offset },
                value: AllocatorValue::Delete,
            }),
        );
    }
    fn set_next_block(&self, block: u64) {
        // TODO Long term, this is wrong --- we need to separate reserved from committed.
        let mut inner = self.inner.lock().unwrap();
        if block > inner.next_block {
            inner.next_block = block;
        }
    }
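    // Persists the allocator: writes the tree out as a new layer file and serializes the updated
    // AllocatorInfo, unless nothing needs to be synced.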
    fn flush(&self, force: bool) -> Result<(), Error> {
        println!("flushing allocator");
        let mut inner = self.inner.lock().unwrap();
        let log = self.log.upgrade().unwrap();
        let mut object_sync = log.begin_object_sync(self.object_id());
        if !force && !object_sync.needs_sync() {
println!("not forced");
            return Ok(());
        }
        // TODO: what if there have been no allocations.
        let mut transaction = Transaction::new();
        let store = inner.store.as_ref().unwrap();
        let layer_object_handle =
            store.create_object(&mut transaction, HandleOptions::default())?;
        let object_handle = store.open_object(self.object_id(), HandleOptions::default())?;
        log.commit(transaction); // TODO: Move to encompass all of this.
        let object_id = layer_object_handle.object_id();
        // TODO: clean up objects if there's an error.
        inner.info.as_mut().unwrap().layers.push(object_id);
        println!("serializing");
        // let handle = &mut inner.object_handle;
        serialize_into(
            ObjectHandleCursor::new(&object_handle as &dyn ObjectHandle, 0),
            inner.info.as_ref().unwrap(),
        )?;
        self.tree.commit(layer_object_handle)?;
        object_sync.done();
        println!("allocator flushed");
        Ok(())
    }
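    // Records a committed allocation in the tree.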
    fn commit_allocation(
        &self,
        object_id: u64,
        attribute_id: u64,
        device_range: std::ops::Range<u64>,
        file_offset: u64,
    ) {
        let item = AllocatorItem {
            key: AllocatorKey { device_range, object_id, attribute_id, file_offset },
            value: AllocatorValue::Insert,
        };
        // println!("commit_allocation {:?}", item);
        self.tree.insert(item);
    }
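    // Records a committed deallocation: erases any matching reservation and replaces the covered
    // range in the tree, starting the search from the key's lower bound.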
    fn commit_deallocation(&self, item: AllocatorItem) {
        self.reserved_allocations.erase(item.as_item_ref());
        let lower_bound = item.key.lower_bound();
        self.tree.replace_range(item, &lower_bound);
        // TODO: must reserve deallocation until after barrier
    }
}