| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| pub mod merge; |
| |
| use { |
| crate::{ |
| debug_assert_not_too_long, |
| errors::FxfsError, |
| filesystem::{ApplyContext, ApplyMode, Filesystem, JournalingObject, SyncOptions}, |
| log::*, |
| lsm_tree::{ |
| layers_from_handles, |
| merge::Merger, |
| skip_list_layer::SkipListLayer, |
| types::{ |
| BoxedLayerIterator, Item, ItemRef, Layer, LayerIterator, MutableLayer, NextKey, |
| OrdLowerBound, OrdUpperBound, RangeKey, |
| }, |
| LSMTree, LayerSet, |
| }, |
| metrics::{traits::Metric as _, UintMetric}, |
| object_handle::{ObjectHandle, ObjectHandleExt, INVALID_OBJECT_ID}, |
| object_store::{ |
| object_manager::ReservationUpdate, |
| store_object_handle::DirectWriter, |
| transaction::{AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction}, |
| tree, CachingObjectHandle, HandleOptions, ObjectStore, |
| }, |
| range::RangeExt, |
| round::round_down, |
| serialized_types::{ |
| Version, Versioned, VersionedLatest, DEFAULT_MAX_SERIALIZED_RECORD_SIZE, |
| }, |
| trace_duration, |
| }, |
| anyhow::{anyhow, bail, ensure, Error}, |
| async_trait::async_trait, |
| either::Either::{Left, Right}, |
| merge::{filter_marked_for_deletion, filter_tombstones, merge}, |
| serde::{Deserialize, Serialize}, |
| std::{ |
| any::Any, |
| borrow::Borrow, |
| cmp::min, |
| collections::{BTreeMap, HashSet, VecDeque}, |
| convert::TryInto, |
| marker::PhantomData, |
| ops::{Bound, Range}, |
| sync::{Arc, Mutex, Weak}, |
| }, |
| }; |
| |
| /// Allocators must implement this. An allocator is responsible for allocating ranges on behalf of |
| /// an object-store. |
| #[async_trait] |
| pub trait Allocator: ReservationOwner { |
| /// Returns the object ID for the allocator. |
| fn object_id(&self) -> u64; |
| |
| /// Returns information about the allocator. |
| // Aside: This breaks encapsulation, but we probably won't have more than one allocator, so it |
| // seems OK. |
| fn info(&self) -> AllocatorInfo; |
| |
| /// Tries to allocate |len| bytes for the specified owner and returns the device range |
| /// allocated. |
| /// The allocated range may be short (e.g. due to fragmentation), in which case the caller can |
| /// simply call allocate again until they have enough blocks. |
| /// |
| /// We also store the object store ID of the store that the allocation should be assigned to so |
| /// that we have a means to delete encrypted stores without needing the encryption key. |
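| /// |
| /// A minimal usage sketch (illustrative only; assumes an open filesystem `fs`, an open |
| /// allocator and an owning store's id, much like the tests at the bottom of this file): |
| /// |
| /// ```ignore |
| /// let mut transaction = fs.clone().new_transaction(&[], Options::default()).await?; |
| /// let device_range = |
| /// allocator.allocate(&mut transaction, store_object_id, fs.block_size()).await?; |
| /// transaction.commit().await?; |
| /// ``` |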
| async fn allocate( |
| &self, |
| transaction: &mut Transaction<'_>, |
| owner_object_id: u64, |
| len: u64, |
| ) -> Result<Range<u64>, Error>; |
| |
| /// Deallocates the given device range for the specified object. |
| async fn deallocate( |
| &self, |
| transaction: &mut Transaction<'_>, |
| owner_object_id: u64, |
| device_range: Range<u64>, |
| ) -> Result<u64, Error>; |
| |
| /// Marks the given device range as allocated. The main use case for this at this time is for |
| /// the super-block which needs to be at a fixed location on the device. |
| async fn mark_allocated( |
| &self, |
| transaction: &mut Transaction<'_>, |
| owner_object_id: u64, |
| device_range: Range<u64>, |
| ) -> Result<(), Error>; |
| |
| /// Marks allocations associated with a given |owner_object_id| for deletion. |
| /// The deletion is not necessarily performed straight away, but when it is deferred, the |
| /// deferral should be invisible to the caller. |
| async fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64); |
| |
| /// Cast to super-trait. |
| fn as_journaling_object(self: Arc<Self>) -> Arc<dyn JournalingObject>; |
| |
| fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>; |
| |
| /// Called when the device has been flushed and indicates what the journal log offset was when |
| /// that happened. |
| async fn did_flush_device(&self, flush_log_offset: u64); |
| |
| /// Returns a reservation that can be used later, or None if there is insufficient space. |
| fn reserve(self: Arc<Self>, amount: u64) -> Option<Reservation>; |
| |
| /// Like `reserve`, but returns as much as is available if the full amount is not, which could |
| /// be zero bytes. |
| fn reserve_at_most(self: Arc<Self>, amount: u64) -> Reservation; |
| |
| /// Returns the total number of allocated bytes. |
| fn get_allocated_bytes(&self) -> u64; |
| |
| /// Returns the total number of allocated bytes per owner_object_id. |
| /// Note that this is quite an expensive operation as it copies the collection. |
| /// This is intended for use in fsck() and friends, not for general-purpose code. |
| fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, i64>; |
| |
| /// Returns the number of allocated and reserved bytes. |
| fn get_used_bytes(&self) -> u64; |
| } |
| |
| /// This trait is implemented by things that own reservations. |
| pub trait ReservationOwner: Send + Sync { |
| fn release_reservation(&self, amount: u64); |
| } |
| |
| /// A reservation guarantees that when it comes time to actually allocate, it will not fail due to |
| /// lack of space. Sub-reservations (a.k.a. holds) are possible, which effectively allow part of a |
| /// reservation to be set aside until it's time to commit. Reservations do offer some |
| /// thread-safety, but some responsibility is borne by the caller: e.g. calling `forget` and |
| /// `reserve` at the same time from different threads is unsafe. |
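| /// |
| /// A minimal sketch of the intended flow (illustrative; assumes an open `SimpleAllocator` |
| /// behind `allocator: Arc<SimpleAllocator>`): |
| /// |
| /// ```ignore |
| /// let reservation = allocator.clone().reserve(8192).expect("no space"); |
| /// assert_eq!(reservation.amount(), 8192); |
| /// let hold = reservation.reserve(4096).expect("no space"); // a sub-reservation (hold) |
| /// assert_eq!(reservation.avail(), 4096); |
| /// drop(hold); // releases the held bytes back to `reservation` |
| /// drop(reservation); // releases the remaining bytes back to the allocator |
| /// ``` |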
| pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> { |
| owner: T, |
| inner: Mutex<ReservationInner>, |
| phantom: PhantomData<U>, |
| } |
| |
| #[derive(Debug, Default)] |
| struct ReservationInner { |
| // Amount currently held by this reservation. |
| amount: u64, |
| |
| // Amount reserved by sub-reservations. |
| reserved: u64, |
| } |
| |
| impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| self.inner.lock().unwrap().fmt(f) |
| } |
| } |
| |
| impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> { |
| pub fn new(owner: T, amount: u64) -> Self { |
| Self { |
| owner, |
| inner: Mutex::new(ReservationInner { amount, reserved: 0 }), |
| phantom: PhantomData, |
| } |
| } |
| |
| /// Returns the total amount of the reservation, not accounting for anything that might be held. |
| pub fn amount(&self) -> u64 { |
| self.inner.lock().unwrap().amount |
| } |
| |
| /// Returns the amount available after accounting for space that is reserved. |
| pub fn avail(&self) -> u64 { |
| let inner = self.inner.lock().unwrap(); |
| inner.amount - inner.reserved |
| } |
| |
| /// Adds more to the reservation. |
| pub fn add(&self, amount: u64) { |
| self.inner.lock().unwrap().amount += amount; |
| } |
| |
| /// Returns the entire amount of the reservation. The caller is responsible for maintaining |
| /// consistency, i.e. updating counters, etc, and there can be no sub-reservations (an assert |
| /// will fire otherwise). |
| pub fn forget(&self) -> u64 { |
| let mut inner = self.inner.lock().unwrap(); |
| assert_eq!(inner.reserved, 0); |
| std::mem::take(&mut inner.amount) |
| } |
| |
| /// Takes some of the reservation. The caller is responsible for maintaining consistency, |
| /// i.e. updating counters, etc. This will assert that the amount being forgotten does not |
| /// exceed the available reservation amount; the caller should ensure that this is the case. |
| pub fn forget_some(&self, amount: u64) { |
| let mut inner = self.inner.lock().unwrap(); |
| inner.amount -= amount; |
| assert!(inner.reserved <= inner.amount); |
| } |
| |
| /// Returns a partial amount of the reservation. If the reservation is smaller than |amount|, |
| /// returns less than the requested amount, and this can be *zero*. |
| fn reserve_at_most(&self, amount: u64) -> ReservationImpl<&Self, Self> { |
| let mut inner = self.inner.lock().unwrap(); |
| let taken = std::cmp::min(amount, inner.amount - inner.reserved); |
| inner.reserved += taken; |
| ReservationImpl::new(self, taken) |
| } |
| |
| /// Reserves *exactly* amount if possible. |
| pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> { |
| let mut inner = self.inner.lock().unwrap(); |
| if inner.amount - inner.reserved < amount { |
| None |
| } else { |
| inner.reserved += amount; |
| Some(ReservationImpl::new(self, amount)) |
| } |
| } |
| |
| /// Commits a previously reserved amount from this reservation. The caller is responsible for |
| /// ensuring the amount was reserved. |
| pub fn commit(&self, amount: u64) { |
| let mut inner = self.inner.lock().unwrap(); |
| inner.reserved -= amount; |
| inner.amount -= amount; |
| } |
| |
| /// Takes the entire amount of this reservation and returns it as a new reservation, leaving |
| /// this one empty. There can be no sub-reservations (an assert will fire otherwise). |
| pub fn take(&self) -> Self { |
| let mut inner = self.inner.lock().unwrap(); |
| assert_eq!(inner.reserved, 0); |
| Self::new(self.owner.clone(), std::mem::take(&mut inner.amount)) |
| } |
| |
| /// Returns `amount` of the reservation back to the owner. |
| pub fn give_back(&self, amount: u64) { |
| self.owner.borrow().release_reservation(amount); |
| let mut inner = self.inner.lock().unwrap(); |
| inner.amount -= amount; |
| assert!(inner.reserved <= inner.amount); |
| } |
| |
| /// Moves `amount` from this reservation to another reservation. |
| pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>( |
| &self, |
| other: &ReservationImpl<V, W>, |
| amount: u64, |
| ) { |
| self.inner.lock().unwrap().amount -= amount; |
| other.add(amount); |
| } |
| } |
| |
| impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> { |
| fn drop(&mut self) { |
| let inner = self.inner.get_mut().unwrap(); |
| assert_eq!(inner.reserved, 0); |
| if inner.amount > 0 { |
| self.owner.borrow().release_reservation(std::mem::take(&mut inner.amount)); |
| } |
| } |
| } |
| |
| impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner |
| for ReservationImpl<T, U> |
| { |
| fn release_reservation(&self, amount: u64) { |
| let mut inner = self.inner.lock().unwrap(); |
| inner.reserved -= amount; |
| } |
| } |
| |
| pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>; |
| |
| pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>; |
| |
| // Our allocator implementation tracks extents with a reference count. At time of writing, these |
| // reference counts should never exceed 1, but that might change with snapshots and clones. |
| |
| #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, Versioned)] |
| pub struct AllocatorKey { |
| pub device_range: Range<u64>, |
| } |
| |
| impl AllocatorKey { |
| /// Returns a new key that is a lower bound suitable for use with merge_into. |
| pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey { |
| AllocatorKey { device_range: 0..self.device_range.start } |
| } |
| } |
| |
| impl NextKey for AllocatorKey {} |
| |
| impl OrdUpperBound for AllocatorKey { |
| fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering { |
| self.device_range.end.cmp(&other.device_range.end) |
| } |
| } |
| |
| impl OrdLowerBound for AllocatorKey { |
| fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering { |
| // The ordering over range.end is significant here as it is used in the heap ordering that |
| // feeds into our merge function, and a total ordering over ranges lets us remove a symmetry |
| // case from the allocator merge function. |
| self.device_range |
| .start |
| .cmp(&other.device_range.start) |
| .then(self.device_range.end.cmp(&other.device_range.end)) |
| } |
| } |
| |
| impl Ord for AllocatorKey { |
| fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering { |
| self.device_range |
| .start |
| .cmp(&other.device_range.start) |
| .then(self.device_range.end.cmp(&other.device_range.end)) |
| } |
| } |
| |
| impl PartialOrd for AllocatorKey { |
| fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> { |
| Some(self.cmp(other)) |
| } |
| } |
| |
| impl RangeKey for AllocatorKey { |
| fn overlaps(&self, other: &Self) -> bool { |
| self.device_range.start < other.device_range.end |
| && self.device_range.end > other.device_range.start |
| } |
| } |
| |
| /// Allocations are "owned" by a single ObjectStore and are reference counted |
| /// (for future snapshot/clone support). |
| #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, Versioned)] |
| pub enum AllocatorValue { |
| // Tombstone variant indicating an extent is no longer allocated. |
| None, |
| // Used when we know there are no possible allocations below us in the stack. |
| // This is currently all the time. We used to have a related Delta type but |
| // it has been removed due to correctness issues (https://fxbug.dev/97223). |
| Abs { count: u64, owner_object_id: u64 }, |
| } |
| |
| pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>; |
| |
| #[derive(Debug, Default, Clone, Deserialize, Serialize, Versioned)] |
| pub struct AllocatorInfo { |
| /// Holds the set of layer file object_ids for the LSM tree (newest first). |
| pub layers: Vec<u64>, |
| /// Maps from owner_object_id to bytes allocated. |
| pub allocated_bytes: BTreeMap<u64, u64>, |
| /// Set of owner_object_id that we should ignore if found in layer files. |
| pub marked_for_deletion: HashSet<u64>, |
| } |
| |
| const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131_072; |
| |
| /// Computes the target maximum extent size based on the block size of the allocator. |
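| /// |
| /// For illustration, if `DEFAULT_MAX_SERIALIZED_RECORD_SIZE` were 4096 bytes and the block size |
| /// 4096 bytes, this would yield 4096 * (4096 - 64) / 9 = 1,835,008 bytes (448 blocks). |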
| pub fn max_extent_size_for_block_size(block_size: u64) -> u64 { |
| // Each block in an extent contains an 8-byte checksum (which, due to varint encoding, takes 9 |
| // bytes), and a given extent record must be no larger than DEFAULT_MAX_SERIALIZED_RECORD_SIZE. We |
| // also need to leave a bit of room (arbitrarily, 64 bytes) for the rest of the extent's |
| // metadata. |
| block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9 |
| } |
| |
| struct SimpleAllocatorStats { |
| #[allow(dead_code)] |
| max_extent_size_bytes: UintMetric, |
| } |
| |
| impl SimpleAllocatorStats { |
| fn new(max_extent_size_bytes: u64) -> Self { |
| Self { |
| max_extent_size_bytes: UintMetric::new("max_extent_size_bytes", max_extent_size_bytes), |
| } |
| } |
| } |
| |
| // For now this just implements a simple strategy of returning the first gap it can find (no matter |
| // the size). This is a very naive implementation. |
| pub struct SimpleAllocator { |
| filesystem: Weak<dyn Filesystem>, |
| block_size: u64, |
| device_size: u64, |
| object_id: u64, |
| max_extent_size_bytes: u64, |
| tree: LSMTree<AllocatorKey, AllocatorValue>, |
| reserved_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>, |
| inner: Mutex<Inner>, |
| allocation_mutex: futures::lock::Mutex<()>, |
| #[allow(dead_code)] |
| stats: SimpleAllocatorStats, |
| } |
| |
| struct Inner { |
| info: AllocatorInfo, |
| // The allocator can only be opened if there have been no allocations and it has not already |
| // been opened or initialized. |
| opened: bool, |
| // When a transaction is dropped, we need to release the reservation, but that requires the use |
| // of async methods which we can't use when called from drop. To work around that, we keep a |
| // list of dropped_allocations and update reserved_allocations the next time we try to |
| // allocate. |
| dropped_allocations: Vec<AllocatorItem>, |
| // This value is the up-to-date count of the number of allocated bytes per owner_object_id |
| // whereas the value in `info` is the value as it was when we last flushed. |
| // This is i64 because it can be negative during replay. |
| allocated_bytes: BTreeMap<u64, i64>, |
| // This value is the number of bytes allocated to uncommitted allocations. |
| uncommitted_allocated_bytes: u64, |
| // This value is the number of bytes allocated to reservations. |
| reserved_bytes: u64, |
| // Committed deallocations that we cannot use until they are flushed to the device. Each entry |
| // in this list pairs the log file offset at which the deallocation was committed with the |
| // deallocated device range. |
| committed_deallocated: VecDeque<(u64, Range<u64>)>, |
| // The total number of committed deallocated bytes. |
| committed_deallocated_bytes: u64, |
| // A map of |owner_object_id| to log offset and bytes allocated. |
| // Once the journal has been flushed beyond 'log_offset', we replace entries here with |
| // an entry in AllocatorInfo to have all iterators ignore owner_object_id. That entry is |
| // then cleaned up at next (major) compaction time. |
| committed_marked_for_deletion: BTreeMap<u64, (/*log_offset:*/ u64, /*bytes:*/ u64)>, |
| } |
| |
| impl Inner { |
| // Returns the amount that is not available to be allocated, which includes actually allocated |
| // bytes, bytes that have been allocated for a transaction that hasn't committed yet, and bytes |
| // that have been deallocated but can't be reused until the device has been flushed. |
| fn unavailable_bytes(&self) -> u64 { |
| self.allocated_bytes.values().sum::<i64>() as u64 |
| + self.uncommitted_allocated_bytes |
| + self.committed_deallocated_bytes |
| + self.committed_marked_for_deletion.values().map(|(_, x)| x).sum::<u64>() |
| } |
| |
| // Returns the total number of bytes that are taken either from reservations, allocations or |
| // uncommitted allocations. |
| fn taken_bytes(&self) -> u64 { |
| self.allocated_bytes.values().sum::<i64>() as u64 |
| + self.uncommitted_allocated_bytes |
| + self.reserved_bytes |
| } |
| } |
| |
| impl SimpleAllocator { |
| pub fn new(filesystem: Arc<dyn Filesystem>, object_id: u64) -> SimpleAllocator { |
| let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size()); |
| SimpleAllocator { |
| filesystem: Arc::downgrade(&filesystem), |
| block_size: filesystem.block_size(), |
| device_size: filesystem.device().size(), |
| object_id, |
| max_extent_size_bytes, |
| tree: LSMTree::new(merge), |
| reserved_allocations: SkipListLayer::new(1024), // TODO(fxbug.dev/95981): magic numbers |
| inner: Mutex::new(Inner { |
| info: AllocatorInfo::default(), |
| opened: false, |
| dropped_allocations: Vec::new(), |
| allocated_bytes: BTreeMap::new(), |
| uncommitted_allocated_bytes: 0, |
| reserved_bytes: 0, |
| committed_deallocated: VecDeque::new(), |
| committed_deallocated_bytes: 0, |
| committed_marked_for_deletion: BTreeMap::new(), |
| }), |
| allocation_mutex: futures::lock::Mutex::new(()), |
| stats: SimpleAllocatorStats::new(max_extent_size_bytes), |
| } |
| } |
| |
| pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> { |
| &self.tree |
| } |
| |
| /// Returns the layer set for all layers and reservations. |
| pub fn layer_set(&self) -> LayerSet<AllocatorKey, AllocatorValue> { |
| let tree = &self.tree; |
| let mut layer_set = tree.empty_layer_set(); |
| layer_set.layers.push((self.reserved_allocations.clone() as Arc<dyn Layer<_, _>>).into()); |
| tree.add_all_layers_to_layer_set(&mut layer_set); |
| layer_set |
| } |
| |
| /// Returns an iterator that yields all allocations, filtering out tombstones and any |
| /// owner_object_ids that have been marked as deleted. |
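| /// |
| /// A sketch of the expected usage (illustrative only): |
| /// |
| /// ```ignore |
| /// let layer_set = allocator.layer_set(); |
| /// let mut merger = layer_set.merger(); |
| /// let mut iter = allocator.iter(&mut merger, Bound::Unbounded).await?; |
| /// while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() { |
| /// // Inspect `device_range`... |
| /// iter.advance().await?; |
| /// } |
| /// ``` |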
| pub async fn iter<'a>( |
| &'a self, |
| merger: &'a mut Merger<'_, AllocatorKey, AllocatorValue>, |
| bound: Bound<&AllocatorKey>, |
| ) -> Result<Box<dyn LayerIterator<AllocatorKey, AllocatorValue> + 'a>, Error> { |
| let marked_for_deletion = self.inner.lock().unwrap().info.marked_for_deletion.clone(); |
| let iter = filter_marked_for_deletion( |
| filter_tombstones(Box::new(merger.seek(bound).await?)).await?, |
| marked_for_deletion, |
| ) |
| .await?; |
| Ok(iter) |
| } |
| |
| /// Creates a new (empty) allocator. |
| pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> { |
| // Mark the allocator as opened before creating the file because creating a new |
| // transaction requires a reservation. |
| assert_eq!(std::mem::replace(&mut self.inner.lock().unwrap().opened, true), false); |
| |
| let filesystem = self.filesystem.upgrade().unwrap(); |
| let root_store = filesystem.root_store(); |
| ObjectStore::create_object_with_id( |
| &root_store, |
| transaction, |
| self.object_id(), |
| HandleOptions::default(), |
| None, |
| ) |
| .await?; |
| Ok(()) |
| } |
| |
| // Opens the allocator. If the allocator file is empty there is nothing to load; otherwise |
| // load and initialise the LSM tree from it. This is not thread-safe; it should be called |
| // after the journal has been replayed. |
| pub async fn open(&self) -> Result<(), Error> { |
| let filesystem = self.filesystem.upgrade().unwrap(); |
| let root_store = filesystem.root_store(); |
| |
| let handle = |
| ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None) |
| .await?; |
| |
| if handle.get_size() > 0 { |
| let serialized_info = handle.contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE).await?; |
| let mut cursor = std::io::Cursor::new(&serialized_info[..]); |
| let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)?; |
| let mut handles = Vec::new(); |
| let mut total_size = 0; |
| for object_id in &info.layers { |
| let handle = CachingObjectHandle::new( |
| ObjectStore::open_object( |
| &root_store, |
| *object_id, |
| HandleOptions::default(), |
| None, |
| ) |
| .await?, |
| ); |
| total_size += handle.get_size(); |
| handles.push(handle); |
| } |
| { |
| let mut inner = self.inner.lock().unwrap(); |
| // After replaying, allocated_bytes should include all the deltas since the time |
| // the allocator was last flushed, so here we just need to add whatever is |
| // recorded in info. |
| for (owner_object_id, bytes) in &info.allocated_bytes { |
| let amount: i64 = (*bytes).try_into().map_err(|_| { |
| anyhow!(FxfsError::Inconsistent).context("Allocated bytes inconsistent") |
| })?; |
| let entry = inner.allocated_bytes.entry(*owner_object_id).or_insert(0); |
| match entry.checked_add(amount) { |
| None => { |
| bail!(anyhow!(FxfsError::Inconsistent) |
| .context("Allocated bytes overflow")); |
| } |
| Some(value) if value < 0 || value as u64 > self.device_size => { |
| bail!(anyhow!(FxfsError::Inconsistent) |
| .context("Allocated bytes inconsistent")); |
| } |
| Some(value) => { |
| *entry = value; |
| } |
| }; |
| } |
| inner.info = info; |
| } |
| self.tree.append_layers(handles.into_boxed_slice()).await?; |
| self.filesystem.upgrade().unwrap().object_manager().update_reservation( |
| self.object_id, |
| tree::reservation_amount_from_layer_size(total_size), |
| ); |
| } |
| |
| assert_eq!(std::mem::replace(&mut self.inner.lock().unwrap().opened, true), false); |
| Ok(()) |
| } |
| |
| /// Returns all objects that exist in the parent store that pertain to this allocator. |
| pub fn parent_objects(&self) -> Vec<u64> { |
| // The allocator tree needs to store a file for each of the layers in the tree, so we return |
| // those, since nothing else references them. |
| self.inner.lock().unwrap().info.layers.clone() |
| } |
| |
| fn needs_sync(&self) -> bool { |
| // TODO(fxbug.dev/95982): This will only trigger if *all* free space is taken up with |
| // committed deallocated bytes, but we might want to trigger a sync if we're low and there |
| // happens to be a lot of deallocated bytes as that might mean we can fully satisfy |
| // allocation requests. |
| self.inner.lock().unwrap().unavailable_bytes() >= self.device_size |
| } |
| } |
| |
| impl Drop for SimpleAllocator { |
| fn drop(&mut self) { |
| let inner = self.inner.lock().unwrap(); |
| // Uncommitted and reserved should be released back using RAII, so they should be zero. |
| assert_eq!(inner.uncommitted_allocated_bytes, 0); |
| assert_eq!(inner.reserved_bytes, 0); |
| } |
| } |
| |
| #[async_trait] |
| impl Allocator for SimpleAllocator { |
| fn object_id(&self) -> u64 { |
| self.object_id |
| } |
| |
| fn info(&self) -> AllocatorInfo { |
| self.inner.lock().unwrap().info.clone() |
| } |
| |
| async fn allocate( |
| &self, |
| transaction: &mut Transaction<'_>, |
| owner_object_id: u64, |
| mut len: u64, |
| ) -> Result<Range<u64>, Error> { |
| assert_eq!(len % self.block_size, 0); |
| len = std::cmp::min(len, self.max_extent_size_bytes); |
| debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID); |
| |
| // Make sure we have space reserved before we try and find the space. |
| let reservation = if let Some(reservation) = transaction.allocator_reservation { |
| let r = reservation.reserve_at_most(len); |
| len = r.amount(); |
| Left(r) |
| } else { |
| let mut inner = self.inner.lock().unwrap(); |
| assert!(inner.opened); |
| // We must take care not to use up space that might be reserved. |
| len = round_down( |
| std::cmp::min(len, self.device_size - inner.taken_bytes()), |
| self.block_size, |
| ); |
| inner.reserved_bytes += len; |
| Right(ReservationImpl::<_, Self>::new(self, len)) |
| }; |
| |
| ensure!(len > 0, FxfsError::NoSpace); |
| |
| #[allow(clippy::never_loop)] // Loop used as a for {} else {}. |
| let _guard = 'sync: loop { |
| // Cap number of sync attempts before giving up on finding free space. |
| for _ in 0..10 { |
| { |
| let guard = self.allocation_mutex.lock().await; |
| |
| if !self.needs_sync() { |
| break 'sync guard; |
| } |
| } |
| |
| // All the free space is currently tied up with deallocations, so we need to sync |
| // and flush the device to free that up. |
| // |
| // We can't hold the allocation lock whilst we sync here because the allocation lock |
| // is also taken in apply_mutations, which is called when journal locks are held, |
| // and we call sync here which takes those same locks, so it would have the |
| // potential to result in a deadlock. Sync holds its own lock to guard against |
| // multiple syncs occurring at the same time, and we can supply a precondition that |
| // is evaluated under that lock to ensure we don't sync twice if we don't need to. |
| self.filesystem |
| .upgrade() |
| .unwrap() |
| .sync(SyncOptions { |
| flush_device: true, |
| precondition: Some(Box::new(|| self.needs_sync())), |
| ..Default::default() |
| }) |
| .await?; |
| } |
| bail!( |
| anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.") |
| ); |
| }; |
| |
| let dropped_allocations = |
| std::mem::take(&mut self.inner.lock().unwrap().dropped_allocations); |
| |
| // Update reserved_allocations using dropped_allocations. |
| for item in dropped_allocations { |
| self.reserved_allocations.erase(&item.key).await; |
| } |
| |
| let result = { |
| let layer_set = self.layer_set(); |
| let mut merger = layer_set.merger(); |
| let mut iter = self.iter(&mut merger, Bound::Unbounded).await?; |
| let mut last_offset = 0; |
| loop { |
| match iter.get() { |
| None => { |
| let end = std::cmp::min(last_offset + len, self.device_size); |
| if end <= last_offset { |
| // This is unexpected since we reserved space above. It would suggest |
| // that our counters are confused somehow. |
| bail!(anyhow!(FxfsError::NoSpace) |
| .context("Unexpectedly found no space after search")); |
| } |
| break last_offset..end; |
| } |
| Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => { |
| if device_range.start > last_offset { |
| break last_offset..min(last_offset + len, device_range.start); |
| } |
| last_offset = device_range.end; |
| } |
| } |
| iter.advance().await?; |
| } |
| }; |
| |
| debug!(device_range = ?result, "allocate"); |
| |
| let len = result.length().unwrap(); |
| reservation.either(|l| l.forget_some(len), |r| r.forget_some(len)); |
| |
| { |
| let mut inner = self.inner.lock().unwrap(); |
| inner.reserved_bytes -= len; |
| inner.uncommitted_allocated_bytes += len; |
| } |
| |
| let item = AllocatorItem::new( |
| AllocatorKey { device_range: result.clone() }, |
| AllocatorValue::Abs { count: 1, owner_object_id }, |
| ); |
| let mutation = |
| AllocatorMutation::Allocate { device_range: result.clone(), owner_object_id }; |
| self.reserved_allocations.insert(item).await; |
| assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none()); |
| |
| Ok(result) |
| } |
| |
| async fn mark_allocated( |
| &self, |
| transaction: &mut Transaction<'_>, |
| owner_object_id: u64, |
| device_range: Range<u64>, |
| ) -> Result<(), Error> { |
| debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID); |
| { |
| let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?; |
| |
| let mut inner = self.inner.lock().unwrap(); |
| ensure!( |
| device_range.end <= self.device_size |
| && self.device_size - inner.taken_bytes() >= len, |
| FxfsError::NoSpace |
| ); |
| if let Some(reservation) = &mut transaction.allocator_reservation { |
| // The transaction takes ownership of this hold. |
| reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget(); |
| } |
| inner.uncommitted_allocated_bytes += len; |
| } |
| let item = AllocatorItem::new( |
| AllocatorKey { device_range: device_range.clone() }, |
| AllocatorValue::Abs { count: 1, owner_object_id }, |
| ); |
| let mutation = AllocatorMutation::Allocate { device_range, owner_object_id }; |
| self.reserved_allocations.insert(item).await; |
| transaction.add(self.object_id(), Mutation::Allocator(mutation)); |
| Ok(()) |
| } |
| |
| async fn deallocate( |
| &self, |
| transaction: &mut Transaction<'_>, |
| owner_object_id: u64, |
| mut dealloc_range: Range<u64>, |
| ) -> Result<u64, Error> { |
| debug!(device_range = ?dealloc_range, "deallocate"); |
| trace_duration!("SimpleAllocator::deallocate"); |
| |
| ensure!(dealloc_range.valid(), FxfsError::InvalidArgs); |
| |
| let layer_set = self.tree.layer_set(); |
| let mut merger = layer_set.merger(); |
| // The precise search key that we choose here is important. We need to perform a full merge |
| // across all layers because we want the precise value of delta, so we must ensure that we |
| // query all layers, which is done by setting the lower bound to zero (the merger consults |
| // iterators until it encounters a key whose lower-bound is not greater than the search |
| // key). The upper bound is used to search each individual layer, and we want to start with |
| // an extent that covers the first byte of the range we're deallocating. |
| let mut iter = self |
| .iter( |
| &mut merger, |
| Bound::Included(&AllocatorKey { device_range: 0..dealloc_range.start + 1 }), |
| ) |
| .await?; |
| let mut deallocated = 0; |
| let mut mutation = None; |
| while let Some(ItemRef { key: AllocatorKey { device_range, .. }, value, .. }) = iter.get() { |
| if device_range.start > dealloc_range.start { |
| // We expect the entire range to be allocated. |
| bail!(anyhow!(FxfsError::Inconsistent) |
| .context("Attempt to deallocate unallocated range")); |
| } |
| let end = std::cmp::min(device_range.end, dealloc_range.end); |
| if let AllocatorValue::Abs { count: 1, owner_object_id: store_object_id } = value { |
| debug_assert_eq!(owner_object_id, *store_object_id); |
| match &mut mutation { |
| None => { |
| mutation = Some(AllocatorMutation::Deallocate { |
| device_range: dealloc_range.start..end, |
| owner_object_id, |
| }); |
| } |
| Some(AllocatorMutation::Deallocate { device_range, .. }) => { |
| device_range.end = end; |
| } |
| _ => unreachable!(), |
| } |
| deallocated += end - dealloc_range.start; |
| } else { |
| panic!("Unexpected AllocatorValue variant: {:?}", value); |
| } |
| if end == dealloc_range.end { |
| break; |
| } |
| dealloc_range.start = end; |
| iter.advance().await?; |
| } |
| if let Some(mutation) = mutation { |
| transaction.add(self.object_id(), Mutation::Allocator(mutation)); |
| } |
| Ok(deallocated) |
| } |
| |
| /// This is used as part of deleting encrypted volumes (ObjectStore) without having the keys. |
| /// |
| /// MarkForDeletion mutations eventually manipulate allocator metadata (AllocatorInfo) instead |
| /// of the mutable layer, but we must be careful not to do this too early and risk premature |
| /// reuse of extents. |
| /// |
| /// Applying the mutation moves the byte count for the owner_object_id from 'allocated_bytes' to |
| /// 'committed_marked_for_deletion'. |
| /// |
| /// Replay is not guaranteed until the *device* gets flushed, so we cannot reuse the deleted |
| /// extents until we receive a `did_flush_device` callback. |
| /// |
| /// At this point, the mutation is guaranteed so the 'committed_marked_for_deletion' entry is |
| /// removed and the owner_object_id is added to the 'marked_for_deletion' set. This set |
| /// of owner_object_id are filtered out of all iterators used by the allocator. |
| /// |
| /// After an allocator.flush() (i.e. a major compaction), we know that there is no data left |
| /// in the layer files for this owner_object_id and we are able to clear `marked_for_deletion`. |
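| /// |
| /// A hedged sketch of the caller-side flow (assumes `fs`, `allocator` and the volume's |
| /// `owner_object_id` from the surrounding context): |
| /// |
| /// ```ignore |
| /// let mut transaction = fs.clone().new_transaction(&[], Options::default()).await?; |
| /// allocator.mark_for_deletion(&mut transaction, owner_object_id).await; |
| /// transaction.commit().await?; |
| /// // The extents only become reusable after the device has been flushed and, eventually, |
| /// // a major compaction removes the records from the layer files. |
| /// ``` |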
| async fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) { |
| // Note that because the actual time of deletion (the next major compaction) is undefined, |
| // |owner_object_id| should not be reused after this call. |
| transaction.add( |
| self.object_id(), |
| Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)), |
| ); |
| } |
| |
| fn as_journaling_object(self: Arc<Self>) -> Arc<dyn JournalingObject> { |
| self |
| } |
| |
| fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> { |
| self |
| } |
| |
| async fn did_flush_device(&self, flush_log_offset: u64) { |
| let mut total = 0; |
| |
| // First take out the deallocations that we now know to be flushed. The list is maintained |
| // in order, so we can stop on the first entry that we find that should not be unreserved |
| // yet. |
| #[allow(clippy::never_loop)] // Loop used as a for {} else {}. |
| let deallocs = 'deallocs_outer: loop { |
| let mut inner = self.inner.lock().unwrap(); |
| for (index, (dealloc_log_offset, _)) in inner.committed_deallocated.iter().enumerate() { |
| if *dealloc_log_offset >= flush_log_offset { |
| let mut deallocs = inner.committed_deallocated.split_off(index); |
| // Swap because we want the opposite of what split_off does. |
| std::mem::swap(&mut inner.committed_deallocated, &mut deallocs); |
| break 'deallocs_outer deallocs; |
| } |
| } |
| break std::mem::take(&mut inner.committed_deallocated); |
| }; |
| // Now we can erase those elements from reserved_allocations (whilst we're not holding the |
| // lock on inner). |
| for (_, device_range) in deallocs { |
| total += device_range.length().unwrap(); |
| self.reserved_allocations.erase(&AllocatorKey { device_range }).await; |
| } |
| |
| let mut inner = self.inner.lock().unwrap(); |
| // This *must* come after we've removed the records from reserved_allocations because the |
| // allocator uses this value to decide whether or not a device-flush is required and it must |
| // be possible to find free space if it thinks no device-flush is required. |
| inner.committed_deallocated_bytes -= total; |
| |
| // We can now reuse any marked_for_deletion extents that have been committed to the journal. |
| let committed_marked_for_deletion = |
| std::mem::take(&mut inner.committed_marked_for_deletion); |
| for (owner_object_id, (log_offset, bytes)) in committed_marked_for_deletion { |
| if log_offset >= flush_log_offset { |
| inner.committed_marked_for_deletion.insert(owner_object_id, (log_offset, bytes)); |
| } else { |
| inner.info.marked_for_deletion.insert(owner_object_id); |
| } |
| } |
| } |
| |
| fn reserve(self: Arc<Self>, amount: u64) -> Option<Reservation> { |
| { |
| let mut inner = self.inner.lock().unwrap(); |
| if self.device_size - inner.taken_bytes() < amount { |
| return None; |
| } |
| inner.reserved_bytes += amount; |
| } |
| Some(Reservation::new(self, amount)) |
| } |
| |
| fn reserve_at_most(self: Arc<Self>, mut amount: u64) -> Reservation { |
| { |
| let mut inner = self.inner.lock().unwrap(); |
| amount = std::cmp::min(self.device_size - inner.taken_bytes(), amount); |
| inner.reserved_bytes += amount; |
| } |
| Reservation::new(self, amount) |
| } |
| |
| fn get_allocated_bytes(&self) -> u64 { |
| self.inner.lock().unwrap().allocated_bytes.values().sum::<i64>() as u64 |
| } |
| |
| fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, i64> { |
| self.inner.lock().unwrap().allocated_bytes.iter().map(|(k, v)| (*k, *v)).collect() |
| } |
| |
| fn get_used_bytes(&self) -> u64 { |
| let inner = self.inner.lock().unwrap(); |
| inner.allocated_bytes.values().sum::<i64>() as u64 + inner.reserved_bytes |
| } |
| } |
| |
| impl ReservationOwner for SimpleAllocator { |
| fn release_reservation(&self, amount: u64) { |
| let mut inner = self.inner.lock().unwrap(); |
| inner.reserved_bytes -= amount; |
| } |
| } |
| |
| #[async_trait] |
| impl JournalingObject for SimpleAllocator { |
| async fn apply_mutation( |
| &self, |
| mutation: Mutation, |
| context: &ApplyContext<'_, '_>, |
| _assoc_obj: AssocObj<'_>, |
| ) { |
| match mutation { |
| Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => { |
| let mut inner = self.inner.lock().unwrap(); |
| if let Some(bytes) = inner.allocated_bytes.remove(&owner_object_id) { |
| // If live, we haven't serialized this yet so we track the commitment in RAM. |
| // If we're replaying the journal, we know this is already on storage and |
| // MUST happen so we can update the AllocatorInfo (to be written at next allocator |
| // flush time). |
| if context.mode.is_replay() { |
| inner.info.marked_for_deletion.insert(owner_object_id); |
| } else { |
| inner.committed_marked_for_deletion.insert( |
| owner_object_id, |
| (context.checkpoint.file_offset, bytes as u64), |
| ); |
| } |
| } |
| } |
| Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => { |
| let item = AllocatorItem { |
| key: AllocatorKey { device_range }, |
| value: AllocatorValue::Abs { count: 1, owner_object_id }, |
| sequence: context.checkpoint.file_offset, |
| }; |
| // We currently rely on barriers here between inserting/removing from reserved |
| // allocations and merging into the tree. These barriers are present whilst we use |
| // skip_list_layer's commit_and_wait method, rather than just commit. |
| let len = item.key.device_range.length().unwrap(); |
| let lower_bound = item.key.lower_bound_for_merge_into(); |
| self.tree.merge_into(item.clone(), &lower_bound).await; |
| if context.mode.is_live() { |
| self.reserved_allocations.erase(&item.key).await; |
| } |
| let mut inner = self.inner.lock().unwrap(); |
| let entry = inner.allocated_bytes.entry(owner_object_id).or_insert(0); |
| *entry = entry.saturating_add(len as i64); |
| if let ApplyMode::Live(transaction) = context.mode { |
| inner.uncommitted_allocated_bytes -= len; |
| if let Some(reservation) = transaction.allocator_reservation { |
| reservation.commit(len); |
| } |
| } |
| } |
| Mutation::Allocator(AllocatorMutation::Deallocate { |
| device_range, |
| owner_object_id, |
| }) => { |
| let item = AllocatorItem { |
| key: AllocatorKey { device_range }, |
| value: AllocatorValue::None, |
| sequence: context.checkpoint.file_offset, |
| }; |
| // We currently rely on barriers here between inserting/removing from reserved |
| // allocations and merging into the tree. These barriers are present whilst we use |
| // skip_list_layer's commit_and_wait method, rather than just commit. |
| let len = item.key.device_range.length().unwrap(); |
| if context.mode.is_live() { |
| let mut item = item.clone(); |
| // Note that the point of this reservation is to avoid premature reuse. |
| item.value = AllocatorValue::Abs { count: 1, owner_object_id }; |
| self.reserved_allocations.insert(item).await; |
| } |
| |
| { |
| let mut inner = self.inner.lock().unwrap(); |
| let entry = inner.allocated_bytes.entry(owner_object_id).or_insert(0); |
| *entry = entry.saturating_sub(len as i64); |
| if context.mode.is_live() { |
| inner.committed_deallocated.push_back(( |
| context.checkpoint.file_offset, |
| item.key.device_range.clone(), |
| )); |
| inner.committed_deallocated_bytes += |
| item.key.device_range.length().unwrap(); |
| } |
| if let ApplyMode::Live(Transaction { |
| allocator_reservation: Some(reservation), |
| .. |
| }) = context.mode |
| { |
| inner.reserved_bytes += len; |
| reservation.add(len); |
| } |
| } |
| let lower_bound = item.key.lower_bound_for_merge_into(); |
| self.tree.merge_into(item, &lower_bound).await; |
| } |
| Mutation::BeginFlush => { |
| { |
| // After we seal the tree, we will start adding mutations to the new mutable |
| // layer, but we cannot safely do that whilst we are attempting to allocate |
| // because there is a chance it might miss an allocation and also not see the |
| // allocation in reserved_allocations. |
| let _guard = debug_assert_not_too_long!(self.allocation_mutex.lock()); |
| self.tree.seal().await; |
| } |
| // Transfer our running count for allocated_bytes so that it gets written to the new |
| // info file when flush completes. |
| let mut inner = self.inner.lock().unwrap(); |
| let allocated_bytes = |
| inner.allocated_bytes.iter().map(|(k, v)| (*k, *v as u64)).collect(); |
| inner.info.allocated_bytes = allocated_bytes; |
| } |
| Mutation::EndFlush => { |
| if context.mode.is_replay() { |
| self.tree.reset_immutable_layers(); |
| // AllocatorInfo is written in the same transaction and will contain the count |
| // at the point BeginFlush was applied, so we need to adjust allocated_bytes so |
| // that it just covers the delta from that point. Later, when we properly open |
| // the allocator, we'll add this back. |
| let mut inner = self.inner.lock().unwrap(); |
| let allocated_bytes: Vec<(u64, i64)> = |
| inner.info.allocated_bytes.iter().map(|(k, v)| (*k, *v as i64)).collect(); |
| for (k, v) in allocated_bytes { |
| *inner.allocated_bytes.entry(k).or_insert(0) -= v as i64; |
| } |
| } |
| } |
| // TODO(fxbug.dev/95979): ideally, we'd return an error here instead. This should only |
| // be possible with a bad mutation during replay. |
| _ => panic!("unexpected mutation! {:?}", mutation), |
| } |
| } |
| |
| fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) { |
| match mutation { |
| Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => { |
| let mut inner = self.inner.lock().unwrap(); |
| let len = device_range.length().unwrap(); |
| inner.uncommitted_allocated_bytes -= len; |
| if let Some(reservation) = transaction.allocator_reservation { |
| reservation.release_reservation(len); |
| inner.reserved_bytes += len; |
| } |
| let item = AllocatorItem::new( |
| AllocatorKey { device_range }, |
| AllocatorValue::Abs { count: 1, owner_object_id }, |
| ); |
| inner.dropped_allocations.push(item); |
| } |
| _ => {} |
| } |
| } |
| |
| async fn flush(&self) -> Result<Version, Error> { |
| let filesystem = self.filesystem.upgrade().unwrap(); |
| let object_manager = filesystem.object_manager(); |
| if !object_manager.needs_flush(self.object_id()) { |
| // Early exit, but still return the earliest version used by a struct in the tree |
| return Ok(self.tree.get_earliest_version()); |
| } |
| |
| let keys = [LockKey::flush(self.object_id())]; |
| let _guard = debug_assert_not_too_long!(filesystem.write_lock(&keys)); |
| |
| let reservation = object_manager.metadata_reservation(); |
| let txn_options = Options { |
| skip_journal_checks: true, |
| borrow_metadata_space: true, |
| allocator_reservation: Some(reservation), |
| ..Default::default() |
| }; |
| let mut transaction = filesystem.clone().new_transaction(&[], txn_options).await?; |
| |
| let root_store = self.filesystem.upgrade().unwrap().root_store(); |
| let layer_object_handle = ObjectStore::create_object( |
| &root_store, |
| &mut transaction, |
| HandleOptions { skip_journal_checks: true, ..Default::default() }, |
| None, |
| ) |
| .await?; |
| let object_id = layer_object_handle.object_id(); |
| root_store.add_to_graveyard(&mut transaction, object_id); |
| // It's important that this transaction does not include any allocations because we use |
| // BeginFlush as a snapshot point for mutations to the tree: other allocator mutations |
| // within this transaction might get applied before seal (which would be OK), but they could |
| // equally get applied afterwards (since Transaction makes no guarantees about the order in |
| // which mutations are applied whilst committing), in which case they'd get lost on replay |
| // because the journal will only send mutations that follow this transaction. |
| transaction.add(self.object_id(), Mutation::BeginFlush); |
| transaction.commit().await?; |
| |
| let layer_set = self.tree.immutable_layer_set(); |
| { |
| let mut merger = layer_set.merger(); |
| let iter = self.iter(&mut merger, Bound::Unbounded).await?; |
| let iter = CoalescingIterator::new(iter).await?; |
| self.tree |
| .compact_with_iterator( |
| iter, |
| DirectWriter::new(&layer_object_handle, txn_options), |
| layer_object_handle.block_size(), |
| ) |
| .await?; |
| } |
| |
| debug!(oid = object_id, "new allocator layer file"); |
| let object_handle = |
| ObjectStore::open_object(&root_store, self.object_id(), HandleOptions::default(), None) |
| .await?; |
| |
| let reservation_update; |
| let mut transaction = filesystem.clone().new_transaction(&[], txn_options).await?; |
| let mut serialized_info = Vec::new(); |
| |
| // We must be careful to take a copy of AllocatorInfo here rather than manipulate the |
| // live one. If we remove marked_for_deletion entries prematurely, we may fail any |
| // allocate() calls that are performed before the new version makes it to disk. |
| // Specifically, txn_write() below must allocate space and may fail if we prematurely |
| // clear marked_for_deletion. |
| let new_info = { |
| let mut info = self.inner.lock().unwrap().info.clone(); |
| |
| // After compaction, all new layers have marked_for_deletion objects removed. |
| info.marked_for_deletion.clear(); |
| |
| // Move all the existing layers to the graveyard. |
| for object_id in &info.layers { |
| root_store.add_to_graveyard(&mut transaction, *object_id); |
| } |
| |
| info.layers = vec![object_id]; |
| info |
| }; |
| new_info.serialize_with_version(&mut serialized_info)?; |
| |
| let mut buf = object_handle.allocate_buffer(serialized_info.len()); |
| buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]); |
| object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?; |
| |
| reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size( |
| layer_object_handle.get_size(), |
| )); |
| |
| // It's important that EndFlush is in the same transaction that we write AllocatorInfo, |
| // because we use EndFlush to make the required adjustments to allocated_bytes. |
| transaction.add_with_object( |
| self.object_id(), |
| Mutation::EndFlush, |
| AssocObj::Borrowed(&reservation_update), |
| ); |
| root_store.remove_from_graveyard(&mut transaction, object_id); |
| |
| let layers = |
| layers_from_handles(Box::new([CachingObjectHandle::new(layer_object_handle)])).await?; |
| transaction.commit_with_callback(|_| self.tree.set_layers(layers)).await?; |
| |
| // At this point we've committed the new layers to disk so we can start using them. |
| // This means we can also switch to the new AllocatorInfo which clears marked_for_deletion. |
| self.inner.lock().unwrap().info = new_info; |
| |
| // Now close the layers and purge them. |
| for layer in layer_set.layers { |
| let object_id = layer.handle().map(|h| h.object_id()); |
| layer.close_layer().await; |
| if let Some(object_id) = object_id { |
| root_store.tombstone(object_id, txn_options).await?; |
| } |
| } |
| |
| // Return the earliest version used by a struct in the tree |
| Ok(self.tree.get_earliest_version()) |
| } |
| } |
| |
| // The merger is unable to merge extents that exist like the following: |
| // |
| // |----- +1 -----| |
| // |----- -1 -----| |
| // |----- +2 -----| |
| // |
| // It cannot coalesce them because it has to emit the +1 record so that it can move on and merge |
| // the -1 and +2 records. To address this, we add another stage that applies after merging and |
| // coalesces records after they have been emitted. This is a bit simpler than merging because the |
| // records cannot overlap, so it's just a question of merging adjacent records if they happen to |
| // have the same value (count and owner_object_id). |
| |
| pub struct CoalescingIterator<'a> { |
| iter: BoxedLayerIterator<'a, AllocatorKey, AllocatorValue>, |
| item: Option<AllocatorItem>, |
| } |
| |
| impl<'a> CoalescingIterator<'a> { |
| pub async fn new( |
| iter: BoxedLayerIterator<'a, AllocatorKey, AllocatorValue>, |
| ) -> Result<CoalescingIterator<'a>, Error> { |
| let mut iter = Self { iter, item: None }; |
| iter.advance().await?; |
| Ok(iter) |
| } |
| } |
| |
| #[async_trait] |
| impl LayerIterator<AllocatorKey, AllocatorValue> for CoalescingIterator<'_> { |
| async fn advance(&mut self) -> Result<(), Error> { |
| self.item = self.iter.get().map(|x| x.cloned()); |
| if self.item.is_none() { |
| return Ok(()); |
| } |
| let left = self.item.as_mut().unwrap(); |
| loop { |
| self.iter.advance().await?; |
| match self.iter.get() { |
| None => return Ok(()), |
| Some(right) => { |
| // The two records cannot overlap. |
| assert!(left.key.device_range.end <= right.key.device_range.start); |
| // We can only coalesce records if they are touching and have the same value. |
| if left.key.device_range.end < right.key.device_range.start |
| || left.value != *right.value |
| { |
| return Ok(()); |
| } |
| left.key.device_range.end = right.key.device_range.end; |
| } |
| } |
| } |
| } |
| |
| fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> { |
| self.item.as_ref().map(|x| x.as_item_ref()) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| crate::{ |
| filesystem::{Filesystem, JournalingObject}, |
| lsm_tree::{ |
| skip_list_layer::SkipListLayer, |
| types::{Item, ItemRef, Layer, LayerIterator, MutableLayer}, |
| LSMTree, |
| }, |
| object_store::{ |
| allocator::{ |
| merge::merge, Allocator, AllocatorKey, AllocatorValue, CoalescingIterator, |
| SimpleAllocator, |
| }, |
| testing::fake_filesystem::FakeFilesystem, |
| transaction::{Options, TransactionHandler}, |
| ObjectStore, |
| }, |
| range::RangeExt, |
| }, |
| fuchsia_async as fasync, |
| std::{ |
| cmp::{max, min}, |
| ops::{Bound, Range}, |
| sync::Arc, |
| }, |
| storage_device::{fake_device::FakeDevice, DeviceHolder}, |
| }; |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_coalescing_iterator() { |
| let skip_list = SkipListLayer::new(100); |
| let items = [ |
| Item::new( |
| AllocatorKey { device_range: 0..100 }, |
| AllocatorValue::Abs { count: 1, owner_object_id: 99 }, |
| ), |
| Item::new( |
| AllocatorKey { device_range: 100..200 }, |
| AllocatorValue::Abs { count: 1, owner_object_id: 99 }, |
| ), |
| ]; |
| skip_list.insert(items[1].clone()).await; |
| skip_list.insert(items[0].clone()).await; |
| let mut iter = |
| CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed")) |
| .await |
| .expect("new failed"); |
| let ItemRef { key, value, .. } = iter.get().expect("get failed"); |
| assert_eq!( |
| (key, value), |
| ( |
| &AllocatorKey { device_range: 0..200 }, |
| &AllocatorValue::Abs { count: 1, owner_object_id: 99 } |
| ) |
| ); |
| iter.advance().await.expect("advance failed"); |
| assert!(iter.get().is_none()); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_merge_and_coalesce_across_three_layers() { |
| let lsm_tree = LSMTree::new(merge); |
| lsm_tree |
| .insert(Item::new( |
| AllocatorKey { device_range: 100..200 }, |
| AllocatorValue::Abs { count: 1, owner_object_id: 99 }, |
| )) |
| .await; |
| lsm_tree.seal().await; |
| lsm_tree |
| .insert(Item::new( |
| AllocatorKey { device_range: 0..100 }, |
| AllocatorValue::Abs { count: 1, owner_object_id: 99 }, |
| )) |
| .await; |
| |
| let layer_set = lsm_tree.layer_set(); |
| let mut merger = layer_set.merger(); |
| let mut iter = CoalescingIterator::new(Box::new( |
| merger.seek(Bound::Unbounded).await.expect("seek failed"), |
| )) |
| .await |
| .expect("new failed"); |
| let ItemRef { key, value, .. } = iter.get().expect("get failed"); |
| assert_eq!( |
| (key, value), |
| ( |
| &AllocatorKey { device_range: 0..200 }, |
| &AllocatorValue::Abs { count: 1, owner_object_id: 99 } |
| ) |
| ); |
| iter.advance().await.expect("advance failed"); |
| assert!(iter.get().is_none()); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_merge_and_coalesce_wont_merge_across_object_id() { |
| let lsm_tree = LSMTree::new(merge); |
| lsm_tree |
| .insert(Item::new( |
| AllocatorKey { device_range: 100..200 }, |
| AllocatorValue::Abs { count: 1, owner_object_id: 99 }, |
| )) |
| .await; |
| lsm_tree.seal().await; |
| lsm_tree |
| .insert(Item::new( |
| AllocatorKey { device_range: 0..100 }, |
| AllocatorValue::Abs { count: 1, owner_object_id: 98 }, |
| )) |
| .await; |
| |
| let layer_set = lsm_tree.layer_set(); |
| let mut merger = layer_set.merger(); |
| let mut iter = CoalescingIterator::new(Box::new( |
| merger.seek(Bound::Unbounded).await.expect("seek failed"), |
| )) |
| .await |
| .expect("new failed"); |
| let ItemRef { key, value, .. } = iter.get().expect("get failed"); |
| assert_eq!( |
| (key, value), |
| ( |
| &AllocatorKey { device_range: 0..100 }, |
| &AllocatorValue::Abs { count: 1, owner_object_id: 98 }, |
| ) |
| ); |
| iter.advance().await.expect("advance failed"); |
| let ItemRef { key, value, .. } = iter.get().expect("get failed"); |
| assert_eq!( |
| (key, value), |
| ( |
| &AllocatorKey { device_range: 100..200 }, |
| &AllocatorValue::Abs { count: 1, owner_object_id: 99 } |
| ) |
| ); |
| iter.advance().await.expect("advance failed"); |
| assert!(iter.get().is_none()); |
| } |
| |
| fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 { |
| if a.end > b.start && a.start < b.end { |
| min(a.end, b.end) - max(a.start, b.start) |
| } else { |
| 0 |
| } |
| } |
| |
| async fn check_allocations(allocator: &SimpleAllocator, expected_allocations: &[Range<u64>]) { |
| let layer_set = allocator.tree.layer_set(); |
| let mut merger = layer_set.merger(); |
| let mut iter = allocator.iter(&mut merger, Bound::Unbounded).await.expect("build iterator"); |
| let mut found = 0; |
| while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() { |
| let mut l = device_range.length().expect("Invalid range"); |
| found += l; |
| // Make sure that the entire range we have found is completely covered by the |
| // allocations we expect to find. |
| for range in expected_allocations { |
| l -= overlap(range, device_range); |
| if l == 0 { |
| break; |
| } |
| } |
| assert_eq!(l, 0); |
| iter.advance().await.expect("advance failed"); |
| } |
| // Make sure the total we found adds up to what we expect. |
| assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>()); |
| } |
| |
| async fn test_fs() -> (Arc<FakeFilesystem>, Arc<SimpleAllocator>, Arc<ObjectStore>) { |
| let device = DeviceHolder::new(FakeDevice::new(4096, 4096)); |
| let fs = FakeFilesystem::new(device); |
| let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1)); |
| fs.object_manager().set_allocator(allocator.clone()); |
| let store = ObjectStore::new_empty(None, 2, fs.clone()); |
| store.set_graveyard_directory_object_id(store.maybe_get_next_object_id()); |
| fs.object_manager().set_root_store(store.clone()); |
| fs.object_manager().init_metadata_reservation(); |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| allocator.create(&mut transaction).await.expect("create failed"); |
| transaction.commit().await.expect("commit failed"); |
| (fs, allocator, store) |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_allocations() { |
| const STORE_OBJECT_ID: u64 = 99; |
| let (fs, allocator, _) = test_fs().await; |
| let mut transaction = |
| fs.clone().new_transaction(&[], Options::default()).await.expect("new failed"); |
| let mut device_ranges = Vec::new(); |
| device_ranges.push( |
| allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size()) |
| .await |
| .expect("allocate failed"), |
| ); |
| assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size()); |
| device_ranges.push( |
| allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size()) |
| .await |
| .expect("allocate failed"), |
| ); |
| assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size()); |
| assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0); |
| transaction.commit().await.expect("commit failed"); |
| let mut transaction = |
| fs.clone().new_transaction(&[], Options::default()).await.expect("new failed"); |
| device_ranges.push( |
| allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size()) |
| .await |
| .expect("allocate failed"), |
| ); |
| assert_eq!(device_ranges[2].length().unwrap(), fs.block_size()); |
| assert_eq!(overlap(&device_ranges[0], &device_ranges[2]), 0); |
| assert_eq!(overlap(&device_ranges[1], &device_ranges[2]), 0); |
| transaction.commit().await.expect("commit failed"); |
| |
| check_allocations(&allocator, &device_ranges).await; |
| } |
| |
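// Verifies that a request larger than the allocator's maximum extent size returns a range
// capped at max_extent_size_bytes.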
| #[fasync::run_singlethreaded(test)] |
| async fn test_allocate_more_than_max_size() { |
| const STORE_OBJECT_ID: u64 = 99; |
| let (fs, allocator, _) = test_fs().await; |
| let mut transaction = |
| fs.clone().new_transaction(&[], Options::default()).await.expect("new failed"); |
| let mut device_ranges = Vec::new(); |
| device_ranges.push( |
| allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size()) |
| .await |
| .expect("allocate failed"), |
| ); |
| assert_eq!( |
| device_ranges.last().unwrap().length().expect("Invalid range"), |
| allocator.max_extent_size_bytes |
| ); |
| transaction.commit().await.expect("commit failed"); |
| |
| check_allocations(&allocator, &device_ranges).await; |
| } |
| |
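// Verifies that deallocating a committed allocation removes it from the allocator's tree.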
| #[fasync::run_singlethreaded(test)] |
| async fn test_deallocations() { |
| const STORE_OBJECT_ID: u64 = 99; |
| let (fs, allocator, _) = test_fs().await; |
| let mut transaction = |
| fs.clone().new_transaction(&[], Options::default()).await.expect("new failed"); |
| let device_range1 = allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size()) |
| .await |
| .expect("allocate failed"); |
| assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size()); |
| transaction.commit().await.expect("commit failed"); |
| |
| let mut transaction = |
| fs.clone().new_transaction(&[], Options::default()).await.expect("new failed"); |
| allocator |
| .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1) |
| .await |
| .expect("deallocate failed"); |
| transaction.commit().await.expect("commit failed"); |
| |
| check_allocations(&allocator, &[]).await; |
| } |
| |
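// Verifies that a range explicitly marked as allocated is recorded in the tree and is not
// handed out by a subsequent allocation.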
| #[fasync::run_singlethreaded(test)] |
| async fn test_mark_allocated() { |
| const STORE_OBJECT_ID: u64 = 99; |
| let (fs, allocator, _) = test_fs().await; |
| let mut transaction = |
| fs.clone().new_transaction(&[], Options::default()).await.expect("new failed"); |
| let mut device_ranges = Vec::new(); |
| device_ranges.push(0..fs.block_size()); |
| allocator |
| .mark_allocated( |
| &mut transaction, |
| STORE_OBJECT_ID, |
| device_ranges.last().unwrap().clone(), |
| ) |
| .await |
| .expect("mark_allocated failed"); |
| device_ranges.push( |
| allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size()) |
| .await |
| .expect("allocate failed"), |
| ); |
| assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size()); |
| assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0); |
| transaction.commit().await.expect("commit failed"); |
| |
| check_allocations(&allocator, &device_ranges).await; |
| } |
| |
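// Verifies mark_for_deletion: the allocated-byte count for the owner drops immediately, the
// device ranges remain in the tree until a flush, and the space can be reused afterwards.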
| #[fasync::run_singlethreaded(test)] |
| async fn test_mark_for_deletion() { |
| const STORE_OBJECT_ID: u64 = 99; |
| let (fs, allocator, _) = test_fs().await; |
| |
| // Allocate some stuff. |
| assert_eq!(0, allocator.get_allocated_bytes()); |
| let mut device_ranges = Vec::new(); |
| let mut transaction = |
| fs.clone().new_transaction(&[], Options::default()).await.expect("new failed"); |
// Note that there is a cap on the length of an individual allocation, so we allocate over
// multiple mutations.
| for _ in 0..15 { |
| device_ranges.push( |
| allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size()) |
| .await |
| .expect("allocate failed"), |
| ); |
| device_ranges.push( |
| allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size()) |
| .await |
| .expect("allocate2 failed"), |
| ); |
| } |
| transaction.commit().await.expect("commit failed"); |
| check_allocations(&allocator, &device_ranges).await; |
| |
| assert_eq!(fs.block_size() * 3000, allocator.get_allocated_bytes()); |
| |
| // Mark for deletion. |
| let mut transaction = |
| fs.clone().new_transaction(&[], Options::default()).await.expect("new failed"); |
| allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID).await; |
| transaction.commit().await.expect("commit failed"); |
| |
// Expect that the allocated byte count is updated immediately, but that the device ranges
// are still allocated.
| assert_eq!(0, allocator.get_allocated_bytes()); |
| check_allocations(&allocator, &device_ranges).await; |
| |
// Allocate more space than is available unless the space marked for deletion is reclaimed.
// This should force a flush on allocate(). (The 3000 blocks marked for deletion plus the
// 1500 blocks allocated here exceed the test_fs size of 4096 blocks.)
| device_ranges.clear(); |
| |
| let mut transaction = |
| fs.clone().new_transaction(&[], Options::default()).await.expect("new failed"); |
| let target_bytes = 1500 * fs.block_size(); |
| while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes { |
| let len = std::cmp::min( |
| target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(), |
| 100 * fs.block_size(), |
| ); |
| device_ranges.push( |
| allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"), |
| ); |
| } |
| transaction.commit().await.expect("commit failed"); |
| |
| // Have the deleted ranges cleaned up. |
| allocator.flush().await.expect("flush failed"); |
| |
| // The flush above seems to trigger an allocation for the allocator itself. |
| // We will just check that we have the right size for the owner we care about. |
| |
| assert_eq!(*allocator.get_owner_allocated_bytes().entry(99).or_default() as u64, 0); |
| assert_eq!( |
| *allocator.get_owner_allocated_bytes().entry(100).or_default() as u64, |
| 1500 * fs.block_size() |
| ); |
| } |
| |
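// Verifies that space which has been allocated and then deallocated can be reused, even when
// doing so requires a flush because of storage pressure.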
| #[fasync::run_singlethreaded(test)] |
| async fn test_allocate_free_reallocate() { |
| const STORE_OBJECT_ID: u64 = 99; |
| let (fs, allocator, _) = test_fs().await; |
| |
| // Allocate some stuff. |
| let mut device_ranges = Vec::new(); |
| let mut transaction = |
| fs.clone().new_transaction(&[], Options::default()).await.expect("new failed"); |
| for _ in 0..30 { |
| device_ranges.push( |
| allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size()) |
| .await |
| .expect("allocate failed"), |
| ); |
| } |
| transaction.commit().await.expect("commit failed"); |
| |
| assert_eq!( |
| fs.block_size() * 3000, |
| *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default() as u64 |
| ); |
| |
| // Delete it all. |
| let mut transaction = |
| fs.clone().new_transaction(&[], Options::default()).await.expect("new failed"); |
| for range in std::mem::replace(&mut device_ranges, Vec::new()) { |
| allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc"); |
| } |
| transaction.commit().await.expect("commit failed"); |
| |
| assert_eq!( |
| 0, |
| *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default() as u64 |
| ); |
| |
// Allocate some more stuff. Due to storage pressure, this requires the allocator to flush
// the device before the space freed above can be reused.
| let mut transaction = |
| fs.clone().new_transaction(&[], Options::default()).await.expect("new failed"); |
| let target_len = 1500 * fs.block_size(); |
| while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len { |
| let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>(); |
| device_ranges.push( |
| allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, len) |
| .await |
| .expect("allocate failed"), |
| ); |
| } |
| transaction.commit().await.expect("commit failed"); |
| |
| assert_eq!( |
| fs.block_size() * 1500, |
| *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default() as u64 |
| ); |
| } |
| |
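// Verifies that flushed allocations are read back by a newly opened allocator and that
// subsequent allocations do not overlap them.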
| #[fasync::run_singlethreaded(test)] |
| async fn test_flush() { |
| const STORE_OBJECT_ID: u64 = 99; |
| let (fs, allocator, _) = test_fs().await; |
| let mut transaction = |
| fs.clone().new_transaction(&[], Options::default()).await.expect("new failed"); |
| let mut device_ranges = Vec::new(); |
| device_ranges.push( |
| allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size()) |
| .await |
| .expect("allocate failed"), |
| ); |
| device_ranges.push( |
| allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size()) |
| .await |
| .expect("allocate failed"), |
| ); |
| device_ranges.push( |
| allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size()) |
| .await |
| .expect("allocate failed"), |
| ); |
| transaction.commit().await.expect("commit failed"); |
| |
| allocator.flush().await.expect("flush failed"); |
| |
| let allocator = Arc::new(SimpleAllocator::new(fs.clone(), 1)); |
| fs.object_manager().set_allocator(allocator.clone()); |
| allocator.open().await.expect("open failed"); |
// When we flushed the allocator, it would have been written to the device somewhere, but
// without a journal we will be missing those records, so this next allocation will likely
// land on top of those objects. That won't matter for the purposes of this test, since we
// are not writing anything to these ranges.
| let mut transaction = |
| fs.clone().new_transaction(&[], Options::default()).await.expect("new failed"); |
| device_ranges.push( |
| allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size()) |
| .await |
| .expect("allocate failed"), |
| ); |
| for r in &device_ranges[..3] { |
| assert_eq!(overlap(r, device_ranges.last().unwrap()), 0); |
| } |
| transaction.commit().await.expect("commit failed"); |
| check_allocations(&allocator, &device_ranges).await; |
| } |
| |
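// Verifies that dropping an uncommitted transaction releases its reservation, so the same
// range can be allocated again.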
| #[fasync::run_singlethreaded(test)] |
| async fn test_dropped_transaction() { |
| const STORE_OBJECT_ID: u64 = 99; |
| let (fs, allocator, _) = test_fs().await; |
| let allocated_range = { |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size()) |
| .await |
| .expect("allocate failed") |
| }; |
| // After dropping the transaction and attempting to allocate again, we should end up with |
| // the same range because the reservation should have been released. |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| assert_eq!( |
| allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size()) |
| .await |
| .expect("allocate failed"), |
| allocated_range |
| ); |
| } |
| |
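// Verifies that get_allocated_bytes() reflects only committed allocations and deallocations;
// uncommitted or dropped transactions leave the count unchanged.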
| #[fasync::run_singlethreaded(test)] |
| async fn test_allocated_bytes() { |
| const STORE_OBJECT_ID: u64 = 99; |
| let (fs, allocator, _) = test_fs().await; |
| assert_eq!(allocator.get_allocated_bytes(), 0); |
| |
| // Verify allocated_bytes reflects allocation changes. |
| let allocated_bytes = fs.block_size(); |
| let allocated_range = { |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| let range = allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, allocated_bytes) |
| .await |
| .expect("allocate failed"); |
| transaction.commit().await.expect("commit failed"); |
| assert_eq!(allocator.get_allocated_bytes(), allocated_bytes); |
| range |
| }; |
| |
| { |
| let mut transaction = fs |
| .clone() |
| .new_transaction(&[], Options::default()) |
| .await |
| .expect("new_transaction failed"); |
| allocator |
| .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size()) |
| .await |
| .expect("allocate failed"); |
| |
// Prior to committing, the count of allocated bytes shouldn't change.
| assert_eq!(allocator.get_allocated_bytes(), allocated_bytes); |
| } |
| |
| // After dropping the prior transaction, the allocated bytes still shouldn't have changed. |
| assert_eq!(allocator.get_allocated_bytes(), allocated_bytes); |
| |
| // Verify allocated_bytes reflects deallocations. |
| let deallocate_range = allocated_range.start + 20..allocated_range.end - 20; |
| let mut transaction = |
| fs.clone().new_transaction(&[], Options::default()).await.expect("new failed"); |
| allocator |
| .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range) |
| .await |
| .expect("deallocate failed"); |
| |
| // Before committing, there should be no change. |
| assert_eq!(allocator.get_allocated_bytes(), allocated_bytes); |
| |
| transaction.commit().await.expect("commit failed"); |
| |
// After committing, only the 40 bytes outside the deallocated range should remain allocated.
| assert_eq!(allocator.get_allocated_bytes(), 40); |
| } |
| } |