Fix alignment bug and retain cycle
Change-Id: I1b36ce3a0f09817e8b8d57e3bc38b7d425f297d3
diff --git a/src/storage/fxfs/src/object_store.rs b/src/storage/fxfs/src/object_store.rs
index dec9309..e34e0d6 100644
--- a/src/storage/fxfs/src/object_store.rs
+++ b/src/storage/fxfs/src/object_store.rs
@@ -29,7 +29,7 @@
cmp::min,
io::{BufWriter, ErrorKind, Write},
ops::{Bound, Range},
- sync::{Arc, Mutex},
+ sync::{Arc, Mutex, Weak},
},
};
@@ -71,6 +71,7 @@
let mut align_buf = vec![0; self.block_size as usize];
self.read(start_offset, align_buf.as_mut_slice())?;
&mut align_buf[start_align..(start_align + head.len())].copy_from_slice(head);
+ device_offset -= start_align as u64;
self.store.device.write(device_offset, &align_buf)?;
device_offset += self.block_size;
remainder
@@ -117,7 +118,7 @@
break;
}
if let Some(overlap) = key.overlap(extent_key) {
- self.store.allocator.deallocate(
+ self.store.allocator.upgrade().unwrap().deallocate(
transaction,
self.object_id,
key.attribute_id,
@@ -306,13 +307,19 @@
let device_range = self
.store
.allocator
+ .upgrade()
+ .unwrap()
.allocate(self.object_id, 0, aligned.clone())
.map_err(map_to_io_error)?;
let extent_len = device_range.end - device_range.start;
let end = aligned.start + extent_len;
- let len = min(buf.len() - buf_offset, (aligned.end - offset) as usize);
+ let len = min(buf.len() - buf_offset, (end - offset) as usize);
assert!(len > 0);
- self.write_at(offset, &buf[buf_offset..buf_offset + len], device_range.start)?;
+ self.write_at(
+ offset,
+ &buf[buf_offset..buf_offset + len],
+ device_range.start + offset % self.block_size,
+ )?;
transaction.add(
self.store.store_object_id,
Mutation::ReplaceExtent {
@@ -329,7 +336,7 @@
buf_offset += len;
offset += len as u64;
}
- self.store.log.commit(transaction);
+ self.store.log.upgrade().unwrap().commit(transaction);
}
Ok(())
}
@@ -346,6 +353,8 @@
let device_range = self
.store
.allocator
+ .upgrade()
+ .unwrap()
.allocate(self.object_id, 0, file_range.clone())
.map_err(map_to_io_error)?;
let this_file_range =
@@ -410,8 +419,8 @@
store_object_id: u64,
device: Arc<dyn Device>,
block_size: u64,
- allocator: Arc<dyn Allocator>,
- log: Arc<Log>,
+ allocator: Weak<dyn Allocator>,
+ log: Weak<Log>,
options: StoreOptions,
store_info: Mutex<StoreInfo>,
tree: LSMTree<ObjectKey, ObjectValue>,
@@ -422,8 +431,8 @@
parent_store: Option<Arc<ObjectStore>>,
store_object_id: u64,
device: Arc<dyn Device>,
- allocator: Arc<dyn Allocator>,
- log: Arc<Log>,
+ allocator: &Arc<dyn Allocator>,
+ log: &Arc<Log>,
store_info: StoreInfo,
tree: LSMTree<ObjectKey, ObjectValue>,
options: StoreOptions,
@@ -433,8 +442,8 @@
store_object_id,
device: device.clone(),
block_size: device.block_size(),
- allocator,
- log,
+ allocator: Arc::downgrade(allocator),
+ log: Arc::downgrade(log),
options,
store_info: Mutex::new(store_info),
tree,
@@ -445,8 +454,8 @@
parent_store: Option<Arc<ObjectStore>>,
store_object_id: u64,
device: Arc<dyn Device>,
- allocator: Arc<dyn Allocator>,
- log: Arc<Log>,
+ allocator: &Arc<dyn Allocator>,
+ log: &Arc<Log>,
options: StoreOptions,
) -> Arc<Self> {
Self::new(
@@ -461,8 +470,8 @@
)
}
- pub fn log(&self) -> &Log {
- &self.log
+ pub fn log(&self) -> Arc<Log> {
+ self.log.upgrade().unwrap()
}
pub fn create_child_store(
@@ -474,13 +483,13 @@
let mut transaction = Transaction::new();
let handle =
parent_store.clone().create_object(&mut transaction, HandleOptions::default())?;
- parent_store.log.commit(transaction);
+ parent_store.log.upgrade().unwrap().commit(transaction);
Ok(Self::new_empty(
Some(parent_store.clone()),
handle.object_id(),
parent_store.device.clone(),
- parent_store.allocator.clone(),
- parent_store.log.clone(),
+ &parent_store.allocator.upgrade().unwrap(),
+ &parent_store.log.upgrade().unwrap(),
options,
))
}
@@ -509,8 +518,8 @@
Some(self.clone()),
store_object_id,
self.device.clone(),
- self.allocator.clone(),
- self.log.clone(),
+ &self.allocator.upgrade().unwrap(),
+ &self.log.upgrade().unwrap(),
store_info,
LSMTree::open(merge::merge, handles.into_boxed_slice()),
StoreOptions::default(),
@@ -591,7 +600,7 @@
},
},
);
- self.log.commit(transaction);
+ self.log.upgrade().unwrap().commit(transaction);
Ok(directory::Directory::new(self.clone(), object_id))
}
@@ -641,7 +650,8 @@
// Push all in-memory structures to the device. This is not necessary for sync since the log
// will take care of it. This will panic if called on the root parent store.
pub fn flush(&self, force: bool) -> Result<(), Error> {
- let mut object_sync = self.log.begin_object_sync(self.store_object_id);
+ let log = self.log();
+ let mut object_sync = log.begin_object_sync(self.store_object_id);
if !force && !object_sync.needs_sync() {
return Ok(());
}
@@ -649,7 +659,7 @@
let mut transaction = Transaction::new();
let object_handle =
parent_store.clone().create_object(&mut transaction, HandleOptions::default())?;
- self.log.commit(transaction); // This needs to encompass all the following.
+ self.log.upgrade().unwrap().commit(transaction); // This needs to encompass all the following.
let object_id = object_handle.object_id();
let handle =
parent_store.clone().open_object(self.store_object_id, HandleOptions::default())?;
diff --git a/src/storage/fxfs/src/object_store/allocator.rs b/src/storage/fxfs/src/object_store/allocator.rs
index efc1ed5..a11840d 100644
--- a/src/storage/fxfs/src/object_store/allocator.rs
+++ b/src/storage/fxfs/src/object_store/allocator.rs
@@ -19,11 +19,10 @@
merge::merge,
serde::{Deserialize, Serialize},
std::{
- borrow::Borrow,
cmp::min,
sync::{
atomic::{AtomicU64, Ordering::Relaxed},
- Arc, Mutex,
+ Arc, Mutex, Weak,
},
},
};
@@ -38,7 +37,7 @@
*/
pub trait Allocator: Send + Sync {
- fn init(&self, store: Arc<ObjectStore>, handle: Box<dyn ObjectHandle>);
+ fn init(&self, store: &Arc<ObjectStore>) -> Result<(), Error>;
fn object_id(&self) -> u64;
@@ -74,7 +73,7 @@
fn commit_deallocation(&self, item: AllocatorItem);
- fn open(&self, store: Arc<ObjectStore>, handle: Box<dyn ObjectHandle>) -> Result<(), Error>;
+ fn open(&self, store: &Arc<ObjectStore>, object_id: u64) -> Result<(), Error>;
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
@@ -138,7 +137,7 @@
}
pub struct SimpleAllocator {
- log: Arc<Log>,
+ log: Weak<Log>,
object_id: AtomicU64,
tree: LSMTree<AllocatorKey, AllocatorValue>,
reserved_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
@@ -148,15 +147,14 @@
struct Inner {
store: Option<Arc<ObjectStore>>,
info: Option<AllocatorInfo>,
- object_handle: Option<Box<dyn ObjectHandle>>,
next_block: u64,
// reserved_deallocations: RefCell<Rc<SkipListLayer<AllocatorKey, AllocatorValue>>>,
}
impl SimpleAllocator {
- pub fn new(log: Arc<Log>) -> SimpleAllocator {
+ pub fn new(log: &Arc<Log>) -> SimpleAllocator {
SimpleAllocator {
- log,
+ log: Arc::downgrade(log),
// TODO: maybe make tree optional or put all data in separate structure.
object_id: AtomicU64::new(0),
tree: LSMTree::new(merge),
@@ -164,7 +162,6 @@
inner: Mutex::new(Inner {
store: None,
info: None,
- object_handle: None,
next_block: 0,
// reserved_deallocations: RefCell::new(Rc::new(SkipListLayer::new(1024))),
}),
@@ -173,29 +170,30 @@
}
impl Allocator for SimpleAllocator {
- fn init(&self, store: Arc<ObjectStore>, handle: Box<dyn ObjectHandle>) {
- println!("allocator object id {:?}", handle.object_id());
+ fn init(&self, store: &Arc<ObjectStore>) -> Result<(), Error> {
let mut inner = self.inner.lock().unwrap();
- inner.store = Some(store);
- self.object_id.store(handle.object_id(), Relaxed);
- inner.object_handle = Some(handle);
+ let mut transaction = Transaction::new();
+ let allocator_object = store.create_object(&mut transaction, HandleOptions::default())?;
+ self.log.upgrade().unwrap().commit(transaction);
+ inner.store = Some(store.clone());
+ self.object_id.store(allocator_object.object_id(), Relaxed);
inner.info = Some(AllocatorInfo { layers: Vec::new() });
+ Ok(())
}
- fn open(&self, store: Arc<ObjectStore>, handle: Box<dyn ObjectHandle>) -> Result<(), Error> {
- println!("allocator open object id {:?}", handle.object_id());
+ fn open(&self, store: &Arc<ObjectStore>, object_id: u64) -> Result<(), Error> {
let mut inner = self.inner.lock().unwrap();
- self.object_id.store(handle.object_id(), Relaxed);
- let info: AllocatorInfo = deserialize_from(ObjectHandleCursor::new(handle.borrow(), 0))?;
- println!("done reading allocator file");
+ self.object_id.store(object_id, Relaxed);
+ let handle = store.open_object(object_id, HandleOptions::default())?;
+ let info: AllocatorInfo =
+ deserialize_from(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0))?;
let mut handles = Vec::new();
for object_id in &info.layers {
handles.push(store.open_object(*object_id, HandleOptions::default())?);
}
inner.info = Some(info);
self.tree.set_layers(handles.into_boxed_slice());
- inner.store = Some(store);
- inner.object_handle = Some(handle);
+ inner.store = Some(store.clone());
Ok(())
}
@@ -279,32 +277,29 @@
fn flush(&self, force: bool) -> Result<(), Error> {
println!("flushing allocator");
let mut inner = self.inner.lock().unwrap();
- let mut object_sync = self.log.begin_object_sync(self.object_id());
+ let log = self.log.upgrade().unwrap();
+ let mut object_sync = log.begin_object_sync(self.object_id());
if !force && !object_sync.needs_sync() {
println!("not forced");
return Ok(());
}
// TODO: what if there have been no allocations.
let mut transaction = Transaction::new();
- let object_handle = inner
- .store
- .as_ref()
- .unwrap()
- .create_object(&mut transaction, HandleOptions::default())?;
- self.log.commit(transaction); // TODO: Move to encompass all of this.
- let object_id = object_handle.object_id();
+ let store = inner.store.as_ref().unwrap();
+ let layer_object_handle =
+ store.create_object(&mut transaction, HandleOptions::default())?;
+ let object_handle = store.open_object(self.object_id(), HandleOptions::default())?;
+ log.commit(transaction); // TODO: Move to encompass all of this.
+ let object_id = layer_object_handle.object_id();
// TODO: clean up objects if there's an error.
inner.info.as_mut().unwrap().layers.push(object_id);
println!("serializing");
// let handle = &mut inner.object_handle;
serialize_into(
- ObjectHandleCursor::new(
- &**inner.object_handle.as_ref().unwrap() as &dyn ObjectHandle,
- 0,
- ),
+ ObjectHandleCursor::new(&object_handle as &dyn ObjectHandle, 0),
inner.info.as_ref().unwrap(),
)?;
- self.tree.commit(object_handle)?;
+ self.tree.commit(layer_object_handle)?;
object_sync.done();
println!("allocator flushed");
Ok(())
diff --git a/src/storage/fxfs/src/object_store/directory.rs b/src/storage/fxfs/src/object_store/directory.rs
index 25031e8..5ae8ec9 100644
--- a/src/storage/fxfs/src/object_store/directory.rs
+++ b/src/storage/fxfs/src/object_store/directory.rs
@@ -33,7 +33,7 @@
},
},
);
- self.store.log.commit(transaction);
+ self.store.log.upgrade().unwrap().commit(transaction);
Ok(handle)
}
diff --git a/src/storage/fxfs/src/object_store/filesystem.rs b/src/storage/fxfs/src/object_store/filesystem.rs
index ced3014..c9a727e 100644
--- a/src/storage/fxfs/src/object_store/filesystem.rs
+++ b/src/storage/fxfs/src/object_store/filesystem.rs
@@ -1,7 +1,9 @@
use {
crate::object_store::{
- allocator::SimpleAllocator, constants::ROOT_PARENT_STORE_OBJECT_ID, log::Log, Device,
- ObjectStore, StoreOptions,
+ allocator::{Allocator, SimpleAllocator},
+ constants::ROOT_PARENT_STORE_OBJECT_ID,
+ log::Log,
+ Device, ObjectStore, StoreOptions,
},
anyhow::Error,
std::{
@@ -64,23 +66,23 @@
impl Filesystem {
pub fn new_empty(device: Arc<dyn Device>) -> Result<Filesystem, Error> {
let log = Arc::new(Log::new());
- let allocator = Arc::new(SimpleAllocator::new(log.clone()));
+ let allocator = Arc::new(SimpleAllocator::new(&log));
let stores = Arc::new(StoreManager::new());
stores.new_store(ObjectStore::new_empty(
None,
ROOT_PARENT_STORE_OBJECT_ID,
device.clone(),
- allocator.clone(),
- log.clone(),
+ &(allocator.clone() as Arc<dyn Allocator>),
+ &log,
StoreOptions::default(),
));
- log.init_empty(stores.clone(), allocator.clone())?;
+ log.init_empty(&stores, &(allocator as Arc<dyn Allocator>))?;
Ok(Filesystem { device, log, stores })
}
pub fn open(device: Arc<dyn Device>) -> Result<Filesystem, Error> {
let log = Arc::new(Log::new());
- let allocator = Arc::new(SimpleAllocator::new(log.clone()));
+ let allocator = Arc::new(SimpleAllocator::new(&log));
let stores = Arc::new(StoreManager::new());
log.replay(device.clone(), stores.clone(), allocator.clone())?;
Ok(Filesystem { device, log, stores })
diff --git a/src/storage/fxfs/src/object_store/log.rs b/src/storage/fxfs/src/object_store/log.rs
index 8597ffa..9807b3a 100644
--- a/src/storage/fxfs/src/object_store/log.rs
+++ b/src/storage/fxfs/src/object_store/log.rs
@@ -318,8 +318,8 @@
None,
ROOT_PARENT_STORE_OBJECT_ID,
device,
- allocator.clone(),
- self.clone(),
+ &allocator,
+ &self,
StoreOptions::default(),
));
// Skip to the end of the block; super-block always occupies whole block.
@@ -418,22 +418,15 @@
writer.reset = !end_block;
writer.last_check_sum = reader.last_read_check_sum;
let root_store = stores.root_store();
- allocator.open(
- root_store.clone(),
- Box::new(
- root_store
- .clone()
- .open_object(super_block.allocator_object_id, HandleOptions::default())?,
- ),
- )?;
+ allocator.open(&root_store, super_block.allocator_object_id)?;
println!("replay done");
Ok(())
}
pub fn init_empty(
&self,
- stores: Arc<StoreManager>,
- allocator: Arc<dyn Allocator>,
+ stores: &Arc<StoreManager>,
+ allocator: &Arc<dyn Allocator>,
) -> Result<(), Error> {
let mut rng = rand::thread_rng();
let starting_check_sum: u64 = rng.gen();
@@ -451,12 +444,7 @@
)?);
let root_store = stores.root_store();
println!("root store object id {:?}", root_store.store_object_id());
- let mut transaction = Transaction::new();
- let allocator_object =
- Box::new(root_store.create_object(&mut transaction, HandleOptions::default())?);
- let allocator_object_id = allocator_object.object_id();
- allocator.init(root_store.clone(), allocator_object);
- self.commit(transaction);
+ allocator.init(&root_store)?;
allocator.set_next_block(MIN_SUPER_BLOCK_SIZE / 512); // TODO: stop using blocks.
let mut transaction = Transaction::new();
// TODO: Fix this hack; move to object_store code.
@@ -502,7 +490,7 @@
let mut log = self.log();
let super_block = &mut log.as_mut().unwrap().super_block;
super_block.root_store_object_id = stores.root_store().store_object_id();
- super_block.allocator_object_id = allocator_object_id;
+ super_block.allocator_object_id = allocator.object_id();
super_block.log_object_id = log_handle.object_id();
super_block.log_checkpoint = LogCheckpoint::new(0, starting_check_sum);
log.as_mut().unwrap().writer.handle = Some(log_handle);