// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

mod merge;

use {
    crate::{
        lsm_tree::{
            skip_list_layer::SkipListLayer,
            types::{
                BoxedLayerIterator, Item, ItemRef, LayerIterator, MutableLayer, OrdLowerBound,
            },
            LSMTree,
        },
        object_handle::{ObjectHandle, ObjectHandleExt},
        object_store::{
            filesystem::{Filesystem, Mutations},
            transaction::{AllocatorMutation, AssociatedObject, Mutation, Transaction},
            HandleOptions,
        },
    },
    anyhow::{ensure, Error},
    async_trait::async_trait,
    bincode::{deserialize_from, serialize_into},
    merge::merge,
    serde::{Deserialize, Serialize},
    std::{
        cmp::min,
        ops::{Bound, Range},
        sync::{Arc, Mutex, Weak},
    },
};

/// Allocators must implement this. An allocator is responsible for allocating ranges on behalf of
/// an object-store.
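///
/// A minimal usage sketch (hypothetical `allocator` and `transaction` bindings; `len` is assumed
/// to be a multiple of the device block size, as SimpleAllocator requires):
/// ```ignore
/// let device_range = allocator.allocate(&mut transaction, len).await?;
/// // ... stage writes to device_range via the same transaction ...
/// allocator.deallocate(&mut transaction, device_range).await;
/// ```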
#[async_trait]
pub trait Allocator: Send + Sync {
    /// Returns the object ID for the allocator.
    fn object_id(&self) -> u64;

    /// Tries to allocate enough space for |len| bytes and returns the device range allocated.
    async fn allocate(
        &self,
        transaction: &mut Transaction<'_>,
        len: u64,
    ) -> Result<Range<u64>, Error>;

    /// Deallocates the given device range.
    async fn deallocate(&self, transaction: &mut Transaction<'_>, device_range: Range<u64>);

    /// Pushes all in-memory structures to the device. This is not necessary for sync since the
    /// journal will take care of it.
    async fn flush(&self, force: bool) -> Result<(), Error>;

    /// Reserves the given device range. The main use case for this at present is the super-block,
    /// which needs to be at a fixed location on the device.
    async fn reserve(&self, transaction: &mut Transaction<'_>, device_range: Range<u64>);

    /// Cast to super-trait.
    fn as_mutations(self: Arc<Self>) -> Arc<dyn Mutations>;
}

// Our allocator implementation tracks extents with a reference count. At time of writing, these
// reference counts should never exceed 1, but that might change with snapshots and clones.
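// For example, allocating an extent adds a +1 delta record and deallocating it adds a -1 record;
// once merged, the two cancel out and the record drops away, leaving the extent free again.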

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AllocatorKey {
    device_range: Range<u64>,
}

impl Ord for AllocatorKey {
    fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range.end.cmp(&other.device_range.end)
    }
}

impl PartialOrd for AllocatorKey {
    fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl OrdLowerBound for AllocatorKey {
    fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range.start.cmp(&other.device_range.start)
    }
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct AllocatorValue {
    // This is the delta on a reference count for the extent.
    delta: i64,
}

pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;

#[derive(Debug, Default, Deserialize, Serialize)]
struct AllocatorInfo {
    layers: Vec<u64>,
}

const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131072;

// For now this implements a naive first-fit allocation strategy.
pub struct SimpleAllocator {
    filesystem: Weak<dyn Filesystem>,
    block_size: u32,
    object_id: u64,
    empty: bool,
    tree: LSMTree<AllocatorKey, AllocatorValue>,
    reserved_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
    inner: Mutex<Inner>,
}

struct Inner {
    info: AllocatorInfo,
    // The allocator can only be opened if there have been no allocations and it has not already
    // been opened or initialized.
    opened: bool,
    // When a transaction is dropped, we need to release the reservation, but that requires the
    // use of async methods which we can't call from drop. To work around that, we keep a list of
    // dropped allocations and update reserved_allocations the next time we try to allocate.
    dropped_allocations: Vec<AllocatorItem>,
}

impl SimpleAllocator {
    pub fn new(filesystem: Arc<dyn Filesystem>, object_id: u64, empty: bool) -> SimpleAllocator {
        SimpleAllocator {
            filesystem: Arc::downgrade(&filesystem),
            block_size: filesystem.device().block_size(),
            object_id,
            empty,
            tree: LSMTree::new(merge),
            reserved_allocations: SkipListLayer::new(1024), // TODO(csuter): magic numbers
            inner: Mutex::new(Inner {
                info: AllocatorInfo::default(),
                opened: false,
                dropped_allocations: Vec::new(),
            }),
        }
    }

    // Ensures the allocator is open. If empty, creates the object in the root object store;
    // otherwise, loads the layer files and initializes the LSM tree.
    async fn ensure_open(&self) -> Result<(), Error> {
        if self.inner.lock().unwrap().opened {
            return Ok(());
        }

        let filesystem = self.filesystem.upgrade().unwrap();
        let root_store = filesystem.root_store();

        if self.empty {
            let mut transaction = filesystem.clone().new_transaction(&[]).await?;
            root_store
                .create_object_with_id(&mut transaction, self.object_id(), HandleOptions::default())
                .await?;
            transaction.commit().await;
        } else {
            let handle = root_store.open_object(self.object_id, HandleOptions::default()).await?;

            if handle.get_size() > 0 {
                let serialized_info = handle.contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE).await?;
                let info: AllocatorInfo = deserialize_from(&serialized_info[..])?;
                let mut handles = Vec::new();
                for object_id in &info.layers {
                    handles
                        .push(root_store.open_object(*object_id, HandleOptions::default()).await?);
                }
                self.inner.lock().unwrap().info = info;
                self.tree.set_layers(handles.into_boxed_slice()).await?;
            }
        }

        self.inner.lock().unwrap().opened = true;
        Ok(())
    }
}

#[async_trait]
impl Allocator for SimpleAllocator {
    fn object_id(&self) -> u64 {
        self.object_id
    }

    async fn allocate(
        &self,
        transaction: &mut Transaction<'_>,
        len: u64,
    ) -> Result<Range<u64>, Error> {
        ensure!(len % self.block_size as u64 == 0);

        self.ensure_open().await?;

        // Update reserved_allocations using dropped_allocations.
        let dropped_allocations =
            std::mem::take(&mut self.inner.lock().unwrap().dropped_allocations);
        for item in dropped_allocations {
            self.reserved_allocations.erase(item.as_item_ref()).await;
        }

        let tree = &self.tree;
        let result = {
            let mut layer_set = tree.layer_set();
            layer_set.add_layer(self.reserved_allocations.clone());
            let mut merger = layer_set.merger();
            let mut iter = merger.seek(Bound::Unbounded).await?;
            let mut last_offset = 0;
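            // First-fit scan: walk the extents in device order and return the first gap found,
            // capped at `len` bytes.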
            loop {
                let next = iter.get();
                match next {
                    None => {
                        // TODO(csuter): Don't assume infinite device size.
                        break last_offset..last_offset + len;
                    }
                    Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
                        if device_range.start > last_offset {
                            break last_offset..min(last_offset + len, device_range.start);
                        }
                        last_offset = device_range.end;
                    }
                }
                iter.advance().await?;
            }
        };
        log::debug!("allocate {:?}", result);
        self.reserve(transaction, result.clone()).await;
        Ok(result)
    }

    async fn reserve(&self, transaction: &mut Transaction<'_>, device_range: Range<u64>) {
        let item = AllocatorItem::new(AllocatorKey { device_range }, AllocatorValue { delta: 1 });
        self.reserved_allocations.insert(item.clone()).await;
        transaction.add(self.object_id(), Mutation::allocation(item));
    }

    async fn deallocate(&self, transaction: &mut Transaction<'_>, device_range: Range<u64>) {
        log::debug!("deallocate {:?}", device_range);

        transaction.add(
            self.object_id(),
            Mutation::allocation(Item::new(
                AllocatorKey { device_range },
                AllocatorValue { delta: -1 },
            )),
        );
    }

    async fn flush(&self, force: bool) -> Result<(), Error> {
        self.ensure_open().await?;

        let filesystem = self.filesystem.upgrade().unwrap();
        let object_sync = filesystem.begin_object_sync(self.object_id());
        if !force && !object_sync.needs_sync() {
            return Ok(());
        }
        // TODO(csuter): This all needs to be atomic somehow. We'll need to use different
        // transactions for each stage, but we need to make sure objects are cleaned up if there's
        // a failure.
        let mut transaction = filesystem.clone().new_transaction(&[]).await?;

        let root_store = self.filesystem.upgrade().unwrap().root_store();
        let layer_object_handle =
            root_store.create_object(&mut transaction, HandleOptions::default()).await?;

        transaction.add(self.object_id(), Mutation::TreeSeal);
        transaction.commit().await;

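        // The tree was sealed above, so its current layers are immutable; compact them, via a
        // coalescing pass, into the newly created layer file.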
        let object_id = layer_object_handle.object_id();
        let layer_set = self.tree.immutable_layer_set();
        let mut merger = layer_set.merger();
        self.tree
            .compact_with_iterator(
                CoalescingIterator::new(Box::new(merger.seek(Bound::Unbounded).await?)).await?,
                layer_object_handle,
            )
            .await?;

        log::debug!("using {} for allocator layer file", object_id);
        let object_handle =
            root_store.open_object(self.object_id(), HandleOptions::default()).await?;
        // TODO(jfsulliv): Can we preallocate the buffer instead of doing a bounce? Do we know the
        // size up front?
        let mut serialized_info = Vec::new();
        {
            let mut inner = self.inner.lock().unwrap();
            inner.info.layers.push(object_id);
            serialize_into(&mut serialized_info, &inner.info)?;
        }
        let mut buf = object_handle.allocate_buffer(serialized_info.len());
        buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
        let mut transaction = filesystem.clone().new_transaction(&[]).await?;
        object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;

        transaction.add(self.object_id(), Mutation::TreeCompact);
        transaction.commit().await;

        object_sync.commit();
        Ok(())
    }

    fn as_mutations(self: Arc<Self>) -> Arc<dyn Mutations> {
        self
    }
}

#[async_trait]
impl Mutations for SimpleAllocator {
    async fn apply_mutation(
        &self,
        mutation: Mutation,
        replay: bool,
        _object: Option<AssociatedObject<'_>>,
    ) {
        match mutation {
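            // Committing an allocator mutation: drop any in-memory reservation for the item and
            // merge its delta into the LSM tree.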
            Mutation::Allocator(AllocatorMutation(item)) => {
                self.reserved_allocations.erase(item.as_item_ref()).await;
                let lower_bound = lower_bound_for_replace_range(&item.key);
                self.tree.merge_into(item, &lower_bound).await;
            }
            Mutation::TreeSeal => self.tree.seal(),
            Mutation::TreeCompact => {
                if replay {
                    self.tree.reset_immutable_layers();
                }
            }
            _ => panic!("unexpected mutation! {:?}", mutation), // TODO(csuter): This can't panic
        }
    }

    fn drop_mutation(&self, mutation: Mutation) {
        match mutation {
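            // Only allocations (positive deltas) place entries in reserved_allocations, so only
            // they need to be remembered for release; dropped deallocations need no cleanup.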
            Mutation::Allocator(AllocatorMutation(item)) if item.value.delta > 0 => {
                self.inner.lock().unwrap().dropped_allocations.push(item);
            }
            _ => {}
        }
    }
}

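// Returns a key that sorts (under `Ord`, which compares range ends) before any extent that could
// overlap `key`: any overlapping extent must end after `key.device_range.start`. This is used as
// the lower bound when merging an allocator item into the tree.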
fn lower_bound_for_replace_range(key: &AllocatorKey) -> AllocatorKey {
    AllocatorKey { device_range: 0..key.device_range.start }
}

// The merger is unable to merge extents that line up like the following:
//
//     |----- +1 -----|
//                    |----- -1 -----|
//                    |----- +2 -----|
//
// It cannot coalesce them because it has to emit the +1 record before it can move on and merge
// the -1 and +2 records. To address this, we add another stage that runs after merging and
// coalesces records as they are emitted. This is simpler than merging because the records cannot
// overlap, so it's just a question of merging adjacent records when they happen to have the same
// delta.
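//
// For example, a +1 record over 0..100 followed by a +1 record over 100..200 is emitted as a
// single +1 record over 0..200 (see test_coalescing_iterator below).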

struct CoalescingIterator<'a> {
    iter: BoxedLayerIterator<'a, AllocatorKey, AllocatorValue>,
    item: Option<AllocatorItem>,
}

impl<'a> CoalescingIterator<'a> {
    async fn new(
        iter: BoxedLayerIterator<'a, AllocatorKey, AllocatorValue>,
    ) -> Result<CoalescingIterator<'a>, Error> {
        let mut iter = Self { iter, item: None };
        iter.advance().await?;
        Ok(iter)
    }
}

#[async_trait]
impl LayerIterator<AllocatorKey, AllocatorValue> for CoalescingIterator<'_> {
    async fn advance(&mut self) -> Result<(), Error> {
        self.item = self.iter.get().map(|x| x.cloned());
        if self.item.is_none() {
            return Ok(());
        }
        let left = self.item.as_mut().unwrap();
        loop {
            self.iter.advance().await?;
            match self.iter.get() {
                None => return Ok(()),
                Some(right) => {
                    // The two records cannot overlap.
                    assert!(left.key.device_range.end <= right.key.device_range.start);
                    // We can only coalesce the records if they are touching and have the same
                    // delta.
                    if left.key.device_range.end < right.key.device_range.start
                        || left.value.delta != right.value.delta
                    {
                        return Ok(());
                    }
                    left.key.device_range.end = right.key.device_range.end;
                }
            }
        }
    }

    fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
        self.item.as_ref().map(|x| x.as_item_ref())
    }
}

#[cfg(test)]
mod tests {
    use {
        crate::{
            lsm_tree::{
                skip_list_layer::SkipListLayer,
                types::{Item, ItemRef, Layer, LayerIterator, MutableLayer},
                LSMTree,
            },
            object_store::{
                allocator::{
                    merge::merge, Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
                    SimpleAllocator,
                },
                testing::fake_filesystem::FakeFilesystem,
                transaction::TransactionHandler,
                ObjectStore,
            },
            testing::fake_device::FakeDevice,
        },
        fuchsia_async as fasync,
        std::{
            cmp::{max, min},
            ops::{Bound, Range},
            sync::Arc,
        },
    };

    // TODO(jfsulliv): move to a range_utils module or something similar.
    fn range_len(r: &Range<u64>) -> u64 {
        r.end - r.start
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_coalescing_iterator() {
        let skip_list = SkipListLayer::new(100);
        let items = [
            Item::new(AllocatorKey { device_range: 0..100 }, AllocatorValue { delta: 1 }),
            Item::new(AllocatorKey { device_range: 100..200 }, AllocatorValue { delta: 1 }),
        ];
        skip_list.insert(items[1].clone()).await;
        skip_list.insert(items[0].clone()).await;
        let mut iter =
            CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (&AllocatorKey { device_range: 0..200 }, &AllocatorValue { delta: 1 })
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }
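
    // Records that do not touch, or that touch but carry different deltas, should pass through
    // the CoalescingIterator unchanged; this exercises the early-return path in advance().
    #[fasync::run_singlethreaded(test)]
    async fn test_coalescing_iterator_keeps_non_mergeable_records() {
        let skip_list = SkipListLayer::new(100);
        let items = [
            // A gap between 100 and 200 prevents coalescing with the next record.
            Item::new(AllocatorKey { device_range: 0..100 }, AllocatorValue { delta: 1 }),
            // Touches the record that follows, but the deltas differ.
            Item::new(AllocatorKey { device_range: 200..300 }, AllocatorValue { delta: 1 }),
            Item::new(AllocatorKey { device_range: 300..400 }, AllocatorValue { delta: 2 }),
        ];
        for item in &items {
            skip_list.insert(item.clone()).await;
        }
        let mut iter =
            CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
                .await
                .expect("new failed");
        for expected in &items {
            let ItemRef { key, value } = iter.get().expect("get failed");
            assert_eq!((key, value), (&expected.key, &expected.value));
            iter.advance().await.expect("advance failed");
        }
        assert!(iter.get().is_none());
    }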

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_and_coalesce_across_three_layers() {
        let lsm_tree = LSMTree::new(merge);
        lsm_tree
            .insert(Item::new(AllocatorKey { device_range: 100..200 }, AllocatorValue { delta: 2 }))
            .await;
        lsm_tree.seal();
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 100..200 },
                AllocatorValue { delta: -1 },
            ))
            .await;
        lsm_tree.seal();
        lsm_tree
            .insert(Item::new(AllocatorKey { device_range: 0..100 }, AllocatorValue { delta: 1 }))
            .await;

        let layer_set = lsm_tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = CoalescingIterator::new(Box::new(
            merger.seek(Bound::Unbounded).await.expect("seek failed"),
        ))
        .await
        .expect("new failed");
        let ItemRef { key, value } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (&AllocatorKey { device_range: 0..200 }, &AllocatorValue { delta: 1 })
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
        if a.end > b.start && a.start < b.end {
            min(a.end, b.end) - max(a.start, b.start)
        } else {
            0
        }
    }

    async fn check_allocations(allocator: &SimpleAllocator, expected_allocations: &[Range<u64>]) {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger.seek(Bound::Unbounded).await.expect("seek failed");
        let mut found = 0;
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            let mut l = range_len(device_range);
            found += l;
            // Make sure the entire range we found is covered by the allocations we expect to
            // find.
            for range in expected_allocations {
                l -= overlap(range, device_range);
                if l == 0 {
                    break;
                }
            }
            assert_eq!(l, 0);
            iter.advance().await.expect("advance failed");
        }
        // Make sure the total we found adds up to what we expect.
        assert_eq!(found, expected_allocations.iter().map(|r| range_len(r)).sum());
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_allocations() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let fs = FakeFilesystem::new(device);
        let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
        fs.object_manager().set_allocator(allocator.clone());
        let _store = ObjectStore::new_empty(None, 2, fs.clone());
        fs.object_manager().set_root_store_object_id(2);
        let mut transaction = fs.clone().new_transaction(&[]).await.expect("new failed");
        let mut device_ranges = Vec::new();
        device_ranges
            .push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
        assert_eq!(range_len(device_ranges.last().unwrap()), 512);
        device_ranges
            .push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
        assert_eq!(range_len(device_ranges.last().unwrap()), 512);
        assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
        transaction.commit().await;
        let mut transaction = fs.clone().new_transaction(&[]).await.expect("new failed");
        device_ranges
            .push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
        assert_eq!(range_len(&device_ranges[2]), 512);
        assert_eq!(overlap(&device_ranges[0], &device_ranges[2]), 0);
        assert_eq!(overlap(&device_ranges[1], &device_ranges[2]), 0);
        transaction.commit().await;

        check_allocations(&allocator, &device_ranges).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_deallocations() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let fs = FakeFilesystem::new(device);
        let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
        fs.object_manager().set_allocator(allocator.clone());
        let _store = ObjectStore::new_empty(None, 2, fs.clone());
        fs.object_manager().set_root_store_object_id(2);
        let mut transaction = fs.clone().new_transaction(&[]).await.expect("new failed");
        let device_range1 =
            allocator.allocate(&mut transaction, 512).await.expect("allocate failed");
        assert_eq!(range_len(&device_range1), 512);
        transaction.commit().await;

        let mut transaction = fs.clone().new_transaction(&[]).await.expect("new failed");
        allocator.deallocate(&mut transaction, device_range1).await;
        transaction.commit().await;

        check_allocations(&allocator, &[]).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_reserve() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let fs = FakeFilesystem::new(device);
        let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
        fs.object_manager().set_allocator(allocator.clone());
        let _store = ObjectStore::new_empty(None, 2, fs.clone());
        fs.object_manager().set_root_store_object_id(2);
        let mut transaction = fs.clone().new_transaction(&[]).await.expect("new failed");
        let mut device_ranges = Vec::new();
        device_ranges.push(0..512);
        allocator.reserve(&mut transaction, device_ranges.last().unwrap().clone()).await;
        device_ranges
            .push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
        assert_eq!(range_len(device_ranges.last().unwrap()), 512);
        assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
        transaction.commit().await;

        check_allocations(&allocator, &device_ranges).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_flush() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let fs = FakeFilesystem::new(device);
        let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
        fs.object_manager().set_allocator(allocator.clone());
        let _store = ObjectStore::new_empty(None, 2, fs.clone());
        fs.object_manager().set_root_store_object_id(2);
        let mut transaction = fs.clone().new_transaction(&[]).await.expect("new failed");
        let mut device_ranges = Vec::new();
        device_ranges
            .push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
        device_ranges
            .push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
        device_ranges
            .push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
        transaction.commit().await;

        allocator.flush(false).await.expect("flush failed");

        let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, false));
        fs.object_manager().set_allocator(allocator.clone());
        // When we flushed the allocator, it would have been written to the device somewhere, but
        // without a journal we will be missing those records, so this next allocation will likely
        // land on top of those objects. That won't matter for the purposes of this test, since we
        // are not writing anything to these ranges.
        let mut transaction = fs.clone().new_transaction(&[]).await.expect("new failed");
        device_ranges
            .push(allocator.allocate(&mut transaction, 512).await.expect("allocate failed"));
        for r in &device_ranges[..3] {
            assert_eq!(overlap(r, device_ranges.last().unwrap()), 0);
        }
        transaction.commit().await;
        check_allocations(&allocator, &device_ranges).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_dropped_transaction() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let fs = FakeFilesystem::new(device);
        let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1, true));
        fs.object_manager().set_allocator(allocator.clone());
        let _store = ObjectStore::new_empty(None, 2, fs.clone());
        fs.object_manager().set_root_store_object_id(2);
        let allocated_range = {
            let mut transaction =
                fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
            allocator.allocate(&mut transaction, 512).await.expect("allocate failed")
        };
        // After dropping the transaction and attempting to allocate again, we should end up with
        // the same range because the reservation should have been released.
        let mut transaction =
            fs.clone().new_transaction(&[]).await.expect("new_transaction failed");
        assert_eq!(
            allocator.allocate(&mut transaction, 512).await.expect("allocate failed"),
            allocated_range
        );
    }
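
    // Sanity-check the two orderings on AllocatorKey: `Ord` compares range ends, while
    // `OrdLowerBound` compares range starts (see lower_bound_for_replace_range for where the
    // distinction matters).
    #[test]
    fn test_allocator_key_ordering() {
        use crate::lsm_tree::types::OrdLowerBound;
        use std::cmp::Ordering;
        let a = AllocatorKey { device_range: 0..200 };
        let b = AllocatorKey { device_range: 50..150 };
        // Under Ord, a sorts after b because it ends later (200 > 150).
        assert_eq!(a.cmp(&b), Ordering::Greater);
        // Under OrdLowerBound, a sorts before b because it starts earlier (0 < 50).
        assert_eq!(a.cmp_lower_bound(&b), Ordering::Less);
    }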
}

// TODO(csuter): deallocations can't be used until mutations have been written to the device and
// the device has been flushed.

// TODO(csuter): the locking needs to be investigated and fixed here. There are likely problems
// with ensure_open and allocate.