Fix alignment bug and retain cycle

Change-Id: I1b36ce3a0f09817e8b8d57e3bc38b7d425f297d3
diff --git a/src/storage/fxfs/src/object_store.rs b/src/storage/fxfs/src/object_store.rs
index dec9309..e34e0d6 100644
--- a/src/storage/fxfs/src/object_store.rs
+++ b/src/storage/fxfs/src/object_store.rs
@@ -29,7 +29,7 @@
         cmp::min,
         io::{BufWriter, ErrorKind, Write},
         ops::{Bound, Range},
-        sync::{Arc, Mutex},
+        sync::{Arc, Mutex, Weak},
     },
 };
 
@@ -71,6 +71,7 @@
             let mut align_buf = vec![0; self.block_size as usize];
             self.read(start_offset, align_buf.as_mut_slice())?;
             &mut align_buf[start_align..(start_align + head.len())].copy_from_slice(head);
+            device_offset -= start_align as u64;
             self.store.device.write(device_offset, &align_buf)?;
             device_offset += self.block_size;
             remainder
@@ -117,7 +118,7 @@
                 break;
             }
             if let Some(overlap) = key.overlap(extent_key) {
-                self.store.allocator.deallocate(
+                self.store.allocator.upgrade().unwrap().deallocate(
                     transaction,
                     self.object_id,
                     key.attribute_id,
@@ -306,13 +307,19 @@
                 let device_range = self
                     .store
                     .allocator
+                    .upgrade()
+                    .unwrap()
                     .allocate(self.object_id, 0, aligned.clone())
                     .map_err(map_to_io_error)?;
                 let extent_len = device_range.end - device_range.start;
                 let end = aligned.start + extent_len;
-                let len = min(buf.len() - buf_offset, (aligned.end - offset) as usize);
+                let len = min(buf.len() - buf_offset, (end - offset) as usize);
                 assert!(len > 0);
-                self.write_at(offset, &buf[buf_offset..buf_offset + len], device_range.start)?;
+                self.write_at(
+                    offset,
+                    &buf[buf_offset..buf_offset + len],
+                    device_range.start + offset % self.block_size,
+                )?;
                 transaction.add(
                     self.store.store_object_id,
                     Mutation::ReplaceExtent {
@@ -329,7 +336,7 @@
                 buf_offset += len;
                 offset += len as u64;
             }
-            self.store.log.commit(transaction);
+            self.store.log.upgrade().unwrap().commit(transaction);
         }
         Ok(())
     }
@@ -346,6 +353,8 @@
             let device_range = self
                 .store
                 .allocator
+                .upgrade()
+                .unwrap()
                 .allocate(self.object_id, 0, file_range.clone())
                 .map_err(map_to_io_error)?;
             let this_file_range =
@@ -410,8 +419,8 @@
     store_object_id: u64,
     device: Arc<dyn Device>,
     block_size: u64,
-    allocator: Arc<dyn Allocator>,
-    log: Arc<Log>,
+    allocator: Weak<dyn Allocator>,
+    log: Weak<Log>,
     options: StoreOptions,
     store_info: Mutex<StoreInfo>,
     tree: LSMTree<ObjectKey, ObjectValue>,
@@ -422,8 +431,8 @@
         parent_store: Option<Arc<ObjectStore>>,
         store_object_id: u64,
         device: Arc<dyn Device>,
-        allocator: Arc<dyn Allocator>,
-        log: Arc<Log>,
+        allocator: &Arc<dyn Allocator>,
+        log: &Arc<Log>,
         store_info: StoreInfo,
         tree: LSMTree<ObjectKey, ObjectValue>,
         options: StoreOptions,
@@ -433,8 +442,8 @@
             store_object_id,
             device: device.clone(),
             block_size: device.block_size(),
-            allocator,
-            log,
+            allocator: Arc::downgrade(allocator),
+            log: Arc::downgrade(log),
             options,
             store_info: Mutex::new(store_info),
             tree,
@@ -445,8 +454,8 @@
         parent_store: Option<Arc<ObjectStore>>,
         store_object_id: u64,
         device: Arc<dyn Device>,
-        allocator: Arc<dyn Allocator>,
-        log: Arc<Log>,
+        allocator: &Arc<dyn Allocator>,
+        log: &Arc<Log>,
         options: StoreOptions,
     ) -> Arc<Self> {
         Self::new(
@@ -461,8 +470,8 @@
         )
     }
 
-    pub fn log(&self) -> &Log {
-        &self.log
+    pub fn log(&self) -> Arc<Log> {
+        self.log.upgrade().unwrap()
     }
 
     pub fn create_child_store(
@@ -474,13 +483,13 @@
         let mut transaction = Transaction::new();
         let handle =
             parent_store.clone().create_object(&mut transaction, HandleOptions::default())?;
-        parent_store.log.commit(transaction);
+        parent_store.log.upgrade().unwrap().commit(transaction);
         Ok(Self::new_empty(
             Some(parent_store.clone()),
             handle.object_id(),
             parent_store.device.clone(),
-            parent_store.allocator.clone(),
-            parent_store.log.clone(),
+            &parent_store.allocator.upgrade().unwrap(),
+            &parent_store.log.upgrade().unwrap(),
             options,
         ))
     }
@@ -509,8 +518,8 @@
             Some(self.clone()),
             store_object_id,
             self.device.clone(),
-            self.allocator.clone(),
-            self.log.clone(),
+            &self.allocator.upgrade().unwrap(),
+            &self.log.upgrade().unwrap(),
             store_info,
             LSMTree::open(merge::merge, handles.into_boxed_slice()),
             StoreOptions::default(),
@@ -591,7 +600,7 @@
                 },
             },
         );
-        self.log.commit(transaction);
+        self.log.upgrade().unwrap().commit(transaction);
         Ok(directory::Directory::new(self.clone(), object_id))
     }
 
@@ -641,7 +650,8 @@
     // Push all in-memory structures to the device. This is not necessary for sync since the log
     // will take care of it. This will panic if called on the root parent store.
     pub fn flush(&self, force: bool) -> Result<(), Error> {
-        let mut object_sync = self.log.begin_object_sync(self.store_object_id);
+        let log = self.log();
+        let mut object_sync = log.begin_object_sync(self.store_object_id);
         if !force && !object_sync.needs_sync() {
             return Ok(());
         }
@@ -649,7 +659,7 @@
         let mut transaction = Transaction::new();
         let object_handle =
             parent_store.clone().create_object(&mut transaction, HandleOptions::default())?;
-        self.log.commit(transaction); // This needs to encompass all the following.
+        self.log.upgrade().unwrap().commit(transaction); // This needs to encompass all the following.
         let object_id = object_handle.object_id();
         let handle =
             parent_store.clone().open_object(self.store_object_id, HandleOptions::default())?;
diff --git a/src/storage/fxfs/src/object_store/allocator.rs b/src/storage/fxfs/src/object_store/allocator.rs
index efc1ed5..a11840d 100644
--- a/src/storage/fxfs/src/object_store/allocator.rs
+++ b/src/storage/fxfs/src/object_store/allocator.rs
@@ -19,11 +19,10 @@
     merge::merge,
     serde::{Deserialize, Serialize},
     std::{
-        borrow::Borrow,
         cmp::min,
         sync::{
             atomic::{AtomicU64, Ordering::Relaxed},
-            Arc, Mutex,
+            Arc, Mutex, Weak,
         },
     },
 };
@@ -38,7 +37,7 @@
 */
 
 pub trait Allocator: Send + Sync {
-    fn init(&self, store: Arc<ObjectStore>, handle: Box<dyn ObjectHandle>);
+    fn init(&self, store: &Arc<ObjectStore>) -> Result<(), Error>;
 
     fn object_id(&self) -> u64;
 
@@ -74,7 +73,7 @@
 
     fn commit_deallocation(&self, item: AllocatorItem);
 
-    fn open(&self, store: Arc<ObjectStore>, handle: Box<dyn ObjectHandle>) -> Result<(), Error>;
+    fn open(&self, store: &Arc<ObjectStore>, object_id: u64) -> Result<(), Error>;
 }
 
 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
@@ -138,7 +137,7 @@
 }
 
 pub struct SimpleAllocator {
-    log: Arc<Log>,
+    log: Weak<Log>,
     object_id: AtomicU64,
     tree: LSMTree<AllocatorKey, AllocatorValue>,
     reserved_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
@@ -148,15 +147,14 @@
 struct Inner {
     store: Option<Arc<ObjectStore>>,
     info: Option<AllocatorInfo>,
-    object_handle: Option<Box<dyn ObjectHandle>>,
     next_block: u64,
     //    reserved_deallocations: RefCell<Rc<SkipListLayer<AllocatorKey, AllocatorValue>>>,
 }
 
 impl SimpleAllocator {
-    pub fn new(log: Arc<Log>) -> SimpleAllocator {
+    pub fn new(log: &Arc<Log>) -> SimpleAllocator {
         SimpleAllocator {
-            log,
+            log: Arc::downgrade(log),
             // TODO: maybe make tree optional or put all data in separate structure.
             object_id: AtomicU64::new(0),
             tree: LSMTree::new(merge),
@@ -164,7 +162,6 @@
             inner: Mutex::new(Inner {
                 store: None,
                 info: None,
-                object_handle: None,
                 next_block: 0,
                 //            reserved_deallocations: RefCell::new(Rc::new(SkipListLayer::new(1024))),
             }),
@@ -173,29 +170,30 @@
 }
 
 impl Allocator for SimpleAllocator {
-    fn init(&self, store: Arc<ObjectStore>, handle: Box<dyn ObjectHandle>) {
-        println!("allocator object id {:?}", handle.object_id());
+    fn init(&self, store: &Arc<ObjectStore>) -> Result<(), Error> {
         let mut inner = self.inner.lock().unwrap();
-        inner.store = Some(store);
-        self.object_id.store(handle.object_id(), Relaxed);
-        inner.object_handle = Some(handle);
+        let mut transaction = Transaction::new();
+        let allocator_object = store.create_object(&mut transaction, HandleOptions::default())?;
+        self.log.upgrade().unwrap().commit(transaction);
+        inner.store = Some(store.clone());
+        self.object_id.store(allocator_object.object_id(), Relaxed);
         inner.info = Some(AllocatorInfo { layers: Vec::new() });
+        Ok(())
     }
 
-    fn open(&self, store: Arc<ObjectStore>, handle: Box<dyn ObjectHandle>) -> Result<(), Error> {
-        println!("allocator open object id {:?}", handle.object_id());
+    fn open(&self, store: &Arc<ObjectStore>, object_id: u64) -> Result<(), Error> {
         let mut inner = self.inner.lock().unwrap();
-        self.object_id.store(handle.object_id(), Relaxed);
-        let info: AllocatorInfo = deserialize_from(ObjectHandleCursor::new(handle.borrow(), 0))?;
-        println!("done reading allocator file");
+        self.object_id.store(object_id, Relaxed);
+        let handle = store.open_object(object_id, HandleOptions::default())?;
+        let info: AllocatorInfo =
+            deserialize_from(ObjectHandleCursor::new(&handle as &dyn ObjectHandle, 0))?;
         let mut handles = Vec::new();
         for object_id in &info.layers {
             handles.push(store.open_object(*object_id, HandleOptions::default())?);
         }
         inner.info = Some(info);
         self.tree.set_layers(handles.into_boxed_slice());
-        inner.store = Some(store);
-        inner.object_handle = Some(handle);
+        inner.store = Some(store.clone());
         Ok(())
     }
 
@@ -279,32 +277,29 @@
     fn flush(&self, force: bool) -> Result<(), Error> {
         println!("flushing allocator");
         let mut inner = self.inner.lock().unwrap();
-        let mut object_sync = self.log.begin_object_sync(self.object_id());
+        let log = self.log.upgrade().unwrap();
+        let mut object_sync = log.begin_object_sync(self.object_id());
         if !force && !object_sync.needs_sync() {
             println!("not forced");
             return Ok(());
         }
         // TODO: what if there have been no allocations.
         let mut transaction = Transaction::new();
-        let object_handle = inner
-            .store
-            .as_ref()
-            .unwrap()
-            .create_object(&mut transaction, HandleOptions::default())?;
-        self.log.commit(transaction); // TODO: Move to encompass all of this.
-        let object_id = object_handle.object_id();
+        let store = inner.store.as_ref().unwrap();
+        let layer_object_handle =
+            store.create_object(&mut transaction, HandleOptions::default())?;
+        let object_handle = store.open_object(self.object_id(), HandleOptions::default())?;
+        log.commit(transaction); // TODO: Move to encompass all of this.
+        let object_id = layer_object_handle.object_id();
         // TODO: clean up objects if there's an error.
         inner.info.as_mut().unwrap().layers.push(object_id);
         println!("serializing");
         // let handle = &mut inner.object_handle;
         serialize_into(
-            ObjectHandleCursor::new(
-                &**inner.object_handle.as_ref().unwrap() as &dyn ObjectHandle,
-                0,
-            ),
+            ObjectHandleCursor::new(&object_handle as &dyn ObjectHandle, 0),
             inner.info.as_ref().unwrap(),
         )?;
-        self.tree.commit(object_handle)?;
+        self.tree.commit(layer_object_handle)?;
         object_sync.done();
         println!("allocator flushed");
         Ok(())
diff --git a/src/storage/fxfs/src/object_store/directory.rs b/src/storage/fxfs/src/object_store/directory.rs
index 25031e8..5ae8ec9 100644
--- a/src/storage/fxfs/src/object_store/directory.rs
+++ b/src/storage/fxfs/src/object_store/directory.rs
@@ -33,7 +33,7 @@
                 },
             },
         );
-        self.store.log.commit(transaction);
+        self.store.log.upgrade().unwrap().commit(transaction);
         Ok(handle)
     }
 
diff --git a/src/storage/fxfs/src/object_store/filesystem.rs b/src/storage/fxfs/src/object_store/filesystem.rs
index ced3014..c9a727e 100644
--- a/src/storage/fxfs/src/object_store/filesystem.rs
+++ b/src/storage/fxfs/src/object_store/filesystem.rs
@@ -1,7 +1,9 @@
 use {
     crate::object_store::{
-        allocator::SimpleAllocator, constants::ROOT_PARENT_STORE_OBJECT_ID, log::Log, Device,
-        ObjectStore, StoreOptions,
+        allocator::{Allocator, SimpleAllocator},
+        constants::ROOT_PARENT_STORE_OBJECT_ID,
+        log::Log,
+        Device, ObjectStore, StoreOptions,
     },
     anyhow::Error,
     std::{
@@ -64,23 +66,23 @@
 impl Filesystem {
     pub fn new_empty(device: Arc<dyn Device>) -> Result<Filesystem, Error> {
         let log = Arc::new(Log::new());
-        let allocator = Arc::new(SimpleAllocator::new(log.clone()));
+        let allocator = Arc::new(SimpleAllocator::new(&log));
         let stores = Arc::new(StoreManager::new());
         stores.new_store(ObjectStore::new_empty(
             None,
             ROOT_PARENT_STORE_OBJECT_ID,
             device.clone(),
-            allocator.clone(),
-            log.clone(),
+            &(allocator.clone() as Arc<dyn Allocator>),
+            &log,
             StoreOptions::default(),
         ));
-        log.init_empty(stores.clone(), allocator.clone())?;
+        log.init_empty(&stores, &(allocator as Arc<dyn Allocator>))?;
         Ok(Filesystem { device, log, stores })
     }
 
     pub fn open(device: Arc<dyn Device>) -> Result<Filesystem, Error> {
         let log = Arc::new(Log::new());
-        let allocator = Arc::new(SimpleAllocator::new(log.clone()));
+        let allocator = Arc::new(SimpleAllocator::new(&log));
         let stores = Arc::new(StoreManager::new());
         log.replay(device.clone(), stores.clone(), allocator.clone())?;
         Ok(Filesystem { device, log, stores })
diff --git a/src/storage/fxfs/src/object_store/log.rs b/src/storage/fxfs/src/object_store/log.rs
index 8597ffa..9807b3a 100644
--- a/src/storage/fxfs/src/object_store/log.rs
+++ b/src/storage/fxfs/src/object_store/log.rs
@@ -318,8 +318,8 @@
             None,
             ROOT_PARENT_STORE_OBJECT_ID,
             device,
-            allocator.clone(),
-            self.clone(),
+            &allocator,
+            &self,
             StoreOptions::default(),
         ));
         // Skip to the end of the block; super-block always occupies whole block.
@@ -418,22 +418,15 @@
         writer.reset = !end_block;
         writer.last_check_sum = reader.last_read_check_sum;
         let root_store = stores.root_store();
-        allocator.open(
-            root_store.clone(),
-            Box::new(
-                root_store
-                    .clone()
-                    .open_object(super_block.allocator_object_id, HandleOptions::default())?,
-            ),
-        )?;
+        allocator.open(&root_store, super_block.allocator_object_id)?;
         println!("replay done");
         Ok(())
     }
 
     pub fn init_empty(
         &self,
-        stores: Arc<StoreManager>,
-        allocator: Arc<dyn Allocator>,
+        stores: &Arc<StoreManager>,
+        allocator: &Arc<dyn Allocator>,
     ) -> Result<(), Error> {
         let mut rng = rand::thread_rng();
         let starting_check_sum: u64 = rng.gen();
@@ -451,12 +444,7 @@
         )?);
         let root_store = stores.root_store();
         println!("root store object id {:?}", root_store.store_object_id());
-        let mut transaction = Transaction::new();
-        let allocator_object =
-            Box::new(root_store.create_object(&mut transaction, HandleOptions::default())?);
-        let allocator_object_id = allocator_object.object_id();
-        allocator.init(root_store.clone(), allocator_object);
-        self.commit(transaction);
+        allocator.init(&root_store)?;
         allocator.set_next_block(MIN_SUPER_BLOCK_SIZE / 512); // TODO: stop using blocks.
         let mut transaction = Transaction::new();
         // TODO: Fix this hack; move to object_store code.
@@ -502,7 +490,7 @@
         let mut log = self.log();
         let super_block = &mut log.as_mut().unwrap().super_block;
         super_block.root_store_object_id = stores.root_store().store_object_id();
-        super_block.allocator_object_id = allocator_object_id;
+        super_block.allocator_object_id = allocator.object_id();
         super_block.log_object_id = log_handle.object_id();
         super_block.log_checkpoint = LogCheckpoint::new(0, starting_check_sum);
         log.as_mut().unwrap().writer.handle = Some(log_handle);