blob: 4a2a15059ec73693dd6e2772231f1ae91917465e [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::{collections::vec_deque, time::Duration};
pub trait SizeOf {
/// Size in bytes of an object. Makes no distinction between stack and heap allocated memory.
fn size_of(&self) -> usize;
}
/// An implementor has some concept of the time at which it was created and knows how to retrieve
/// that time.
pub trait CreatedAt {
/// Return the `Duration` since the Unix Epoch that represents when `self` was created.
fn created_at(&self) -> Duration;
}
/// Mutable Iterator over the elements in a `BoundedQueue`
type IterMut<'a, T> = vec_deque::IterMut<'a, T>;
/// A `BoundedQueue` is a collection that holds elements with eviction minimums on the size (in
/// bytes) of the collection and how long it will hold elements. If both the `eviction_size_minimum`
/// and the `eviction_age_minimum` are exceeded, the oldest elements will be dropped until either
/// the size of the collection or the age of the oldest element no longer exceed the requested
/// minimum. `eviction_size_maximum` is the maximum possible size of all data stored in a
/// `BoundedQueue` regardless of age or number of elements stored. This value can never be
/// exceeded, even if this results in zero packets being stored. A `eviction_size_maximum` of
/// 0 indicates no maximum.
///
/// The `BoundedQueue` only supports inserting new elements to the end of the queue.
pub(crate) struct BoundedQueue<T> {
eviction_age_minimum: Duration,
eviction_size_minimum: usize,
eviction_size_maximum: usize,
current_size: usize,
inner: vec_deque::VecDeque<T>,
}
impl<T> BoundedQueue<T>
where
T: SizeOf + CreatedAt,
{
/// Create an empty `BoundedQueue` with the specified eviction minimums.
pub fn new(
eviction_size_minimum: usize,
eviction_size_maximum: usize,
eviction_age_minimum: Duration,
) -> BoundedQueue<T> {
BoundedQueue {
eviction_age_minimum,
eviction_size_minimum,
eviction_size_maximum,
current_size: 0,
inner: vec_deque::VecDeque::new(),
}
}
/// Calculate whether adding `item` would make this queue large enough to evict old packets.
fn eviction_size_minimum_reached(&self, item: &T) -> bool {
self.current_size + item.size_of() > self.eviction_size_minimum
}
/// Calculate whether adding `item` would make this queue large enough to force eviction of
/// old packets.
fn eviction_size_maximum_reached(&self, item: &T) -> bool {
if self.eviction_size_maximum == 0 {
false
} else {
self.current_size + item.size_of() > self.eviction_size_maximum
}
}
/// When `item` is added, returns true if the oldest item will be too old.
fn oldest_item_will_expire(&self, item: &T) -> bool {
if self.inner.is_empty() {
return false;
}
(item.created_at() - self.inner[0].created_at()) > self.eviction_age_minimum
}
/// Insert a new item to the end of the `BoundedQueue`, removing items as needed to make room.
pub fn insert(&mut self, item: T) {
// Keep removing items until the new one fits.
while self.eviction_size_maximum_reached(&item)
|| (self.eviction_size_minimum_reached(&item) && self.oldest_item_will_expire(&item))
{
// return early without insertion if the buffer is empty but the item still will not
// fit.
if self.inner.is_empty() {
return;
}
self.current_size -= self.inner.pop_front().map(|item| item.size_of()).unwrap_or(0);
}
self.current_size += item.size_of();
self.inner.push_back(item);
}
/// Return an `Iterator` over mutable references to elements ordered from oldest to newest.
pub fn iter_mut(&mut self) -> IterMut<'_, T> {
self.inner.iter_mut()
}
/// Returns the number of elements in the queue
pub fn len(&self) -> usize {
self.inner.len()
}
/// The total size in bytes of all elements stored in the queue.
pub fn byte_len(&self) -> usize {
self.current_size
}
}
#[cfg(test)]
mod tests {
use {super::*, std::time::Duration};
// Dummy struct that implements the SizeOf and CreatedAt traits required for elements of
// a `BoundedQueue`
#[derive(Debug)]
struct Record {
created_at: Duration,
value: u64,
}
impl From<u64> for Record {
fn from(value: u64) -> Self {
Record { created_at: Duration::from_secs(value), value: value }
}
}
impl SizeOf for Record {
fn size_of(&self) -> usize {
std::mem::size_of::<Record>()
}
}
impl CreatedAt for Record {
fn created_at(&self) -> Duration {
self.created_at
}
}
// Return a vec of the values of a record. For use in asserting the state of a `BoundedQueue` in
// tests.
fn to_values(queue: &mut BoundedQueue<Record>) -> Vec<u64> {
queue.iter_mut().map(|n| n.value).collect()
}
#[test]
fn test_bounded_queue() {
// create a queue with age 0 and test inserting elements into it
let mut queue: BoundedQueue<Record> = BoundedQueue::new(
3 * std::mem::size_of::<Record>(),
3 * std::mem::size_of::<Record>(),
Duration::new(0, 0),
);
assert!(!queue.eviction_size_minimum_reached(&0.into()));
assert_eq!(to_values(&mut queue), Vec::<u64>::new());
assert_eq!(queue.len(), 0);
queue.insert(0.into());
assert!(!queue.eviction_size_minimum_reached(&1.into()));
queue.insert(1.into());
queue.insert(2.into());
assert!(queue.eviction_size_minimum_reached(&3.into()));
assert_eq!(to_values(&mut queue), vec![0, 1, 2]);
assert_eq!(queue.len(), 3);
queue.insert(3.into());
assert!(queue.eviction_size_minimum_reached(&4.into()));
assert_eq!(to_values(&mut queue), vec![1, 2, 3]);
queue.insert(4.into());
assert_eq!(to_values(&mut queue), vec![2, 3, 4]);
assert_eq!(queue.len(), 3);
// create a queue with size 0 and test inserting elements into it
let mut queue: BoundedQueue<Record> =
BoundedQueue::new(0, 10 * std::mem::size_of::<Record>(), Duration::new(2, 0));
assert!(!queue.oldest_item_will_expire(&0.into()));
assert_eq!(to_values(&mut queue), Vec::<u64>::new());
queue.insert(0.into());
assert!(!queue.oldest_item_will_expire(&1.into()));
queue.insert(1.into());
queue.insert(2.into());
assert!(queue.oldest_item_will_expire(&3.into()));
assert_eq!(to_values(&mut queue), vec![0, 1, 2]);
queue.insert(3.into());
assert!(queue.oldest_item_will_expire(&4.into()));
assert_eq!(to_values(&mut queue), vec![1, 2, 3]);
queue.insert(4.into());
assert_eq!(to_values(&mut queue), vec![2, 3, 4]);
// Create a queue with some size and some age test inserting elements into it
let mut queue: BoundedQueue<Record> = BoundedQueue::new(
3 * std::mem::size_of::<Record>(),
5 * std::mem::size_of::<Record>(),
Duration::new(3, 0),
);
assert_eq!(to_values(&mut queue), Vec::<u64>::new());
queue.insert(0.into());
queue.insert(1.into());
queue.insert(2.into());
assert_eq!(to_values(&mut queue), vec![0, 1, 2]);
queue.insert(3.into());
assert_eq!(to_values(&mut queue), vec![0, 1, 2, 3]);
// Both space and age limits are exceeded.
// The expected behavior is to evict old items as new ones are added
assert!(queue.eviction_size_minimum_reached(&4.into()));
assert!(queue.oldest_item_will_expire(&4.into()));
queue.insert(4.into());
assert_eq!(to_values(&mut queue), vec![1, 2, 3, 4]);
queue.insert(6.into());
assert_eq!(to_values(&mut queue), vec![3, 4, 6]);
queue.insert(Record { created_at: Duration::new(6, 1), value: 7 });
assert_eq!(to_values(&mut queue), vec![4, 6, 7]);
// Only space limit is exceeded.
// The expected behavior is to keep all items even as new items are added.
// Space is allowed to grow to contain elements that are within the time limit.
let record = Record { created_at: Duration::new(6, 2), value: 8 };
assert!(queue.eviction_size_minimum_reached(&record));
assert!(!queue.oldest_item_will_expire(&record));
queue.insert(record);
assert_eq!(to_values(&mut queue), vec![4, 6, 7, 8]);
queue.insert(Record { created_at: Duration::new(6, 3), value: 9 });
assert_eq!(to_values(&mut queue), vec![4, 6, 7, 8, 9]);
// Both space and age limits are exceeded.
// The expected behavior is to evict old items as new ones are added
assert!(queue.eviction_size_minimum_reached(&10.into()));
assert!(queue.oldest_item_will_expire(&10.into()));
queue.insert(10.into());
assert_eq!(to_values(&mut queue), vec![8, 9, 10]);
let mut queue: BoundedQueue<Record> = BoundedQueue::new(
2 * std::mem::size_of::<Record>(),
2 * std::mem::size_of::<Record>(),
Duration::new(3, 0),
);
queue.insert(0.into());
// Only age limit is exceeded.
// The expected behavior is to keep all items even as new items are added.
// Space is allowed to grow to contain elements that are still within the space limit.
assert!(!queue.eviction_size_minimum_reached(&4.into()));
assert!(queue.oldest_item_will_expire(&4.into()));
queue.insert(4.into());
assert_eq!(to_values(&mut queue), vec![0, 4]);
// Both space and age limit is exceeded.
// The expected behavior is to evict old items as new ones are added
assert!(queue.eviction_size_minimum_reached(&5.into()));
assert!(queue.oldest_item_will_expire(&5.into()));
queue.insert(5.into());
assert_eq!(to_values(&mut queue), vec![4, 5]);
let mut queue: BoundedQueue<Record> = BoundedQueue::new(
0 * std::mem::size_of::<Record>(),
2 * std::mem::size_of::<Record>(),
Duration::new(3, 0),
);
queue.insert(0.into());
// Maximum space limit is exceeded.
// The expected behavior is that old items are evicted as new ones are added.
// Space is not allowed to grow larger than the maximum space limit regardless
// of age.
let record = Record { created_at: Duration::new(1, 0), value: 1 };
assert!(queue.eviction_size_minimum_reached(&record));
assert!(!queue.eviction_size_maximum_reached(&record));
assert!(!queue.oldest_item_will_expire(&record));
queue.insert(record);
assert_eq!(to_values(&mut queue), vec![0, 1]);
queue.insert(Record { created_at: Duration::new(1, 1), value: 2 });
assert_eq!(to_values(&mut queue), vec![1, 2]);
// A queue with a hard max size of 1 will not accept any items inserted into it
// because it is too small to fit any.
let mut queue: BoundedQueue<Record> = BoundedQueue::new(0, 1, Duration::new(3, 0));
let record = 0.into();
assert!(queue.eviction_size_minimum_reached(&record));
assert!(queue.eviction_size_maximum_reached(&record));
// This element will be evicted immediately a.k.a. not inserted.
queue.insert(record);
assert_eq!(queue.len(), 0);
}
}