// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::{
cmp,
collections::BinaryHeap,
fmt::Debug,
sync::{
atomic::{self, AtomicI64, AtomicUsize},
Arc,
},
task::Poll,
};
use async_utils::futures::YieldToExecutorOnce;
use fuchsia_async as fasync;
use futures::{future::FusedFuture, Future, FutureExt, StreamExt as _};
use netstack3_core::sync::Mutex as CoreMutex;
use tracing::{trace, warn};
use crate::bindings::util::{NeedsDataNotifier, NeedsDataWatcher};
pub(crate) use scheduled_instant::ScheduledInstant;
/// A special time that is used as a marker for an unscheduled timer.
const UNSCHEDULED_SENTINEL: fasync::Time = fasync::Time::INFINITE;
/// The maximum number of timers that will fire without yielding to the
/// executor.
///
/// This allows us to yield time to other tasks if there's a thundering herd of
/// timers firing at the same time.
const YIELD_TIMER_COUNT: usize = 10;
pub(crate) struct TimerDispatcher<T> {
inner: Arc<TimerDispatcherInner<T>>,
}
impl<T> Default for TimerDispatcher<T> {
fn default() -> Self {
let notifier = NeedsDataNotifier::default();
let watcher = notifier.watcher();
Self {
inner: Arc::new(TimerDispatcherInner {
timer_count: AtomicUsize::new(0),
state: CoreMutex::new(TimerDispatcherState {
heap: BinaryHeap::new(),
notifier: Some(notifier),
watcher: Some(watcher),
}),
}),
}
}
}
impl<T: Clone + Send + Sync + Debug + 'static> TimerDispatcher<T> {
/// Spawns a worker for this `TimerDispatcher`.
///
    /// The worker task will only end when [`TimerDispatcher::stop`] is called
    /// on this `TimerDispatcher`.
///
/// `handler` is called whenever a timer is fired.
///
/// # Panics
///
/// Panics if this `TimerDispatcher` has already been spawned.
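    ///
    /// # Example
    ///
    /// A minimal sketch of wiring up a dispatcher (illustrative only; the
    /// handler and dispatch type here are hypothetical):
    ///
    /// ```ignore
    /// let dispatcher = TimerDispatcher::<u32>::default();
    /// // The worker task drives the heap and invokes the handler for every
    /// // timer that fires.
    /// let _task = dispatcher.spawn(|id: u32| println!("timer {id} fired"));
    /// ```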
pub(crate) fn spawn<H: FnMut(T) + Send + Sync + 'static>(
&self,
handler: H,
) -> fasync::Task<()> {
let watcher =
self.inner.state.lock().watcher.take().expect("timer dispatcher already spawned");
fasync::Task::spawn(Self::worker(handler, watcher, Arc::clone(&self.inner)))
}
    /// Creates a new timer with identifier `dispatch` on this dispatcher.
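    ///
    /// A minimal sketch (illustrative; assumes a `TimerDispatcher<u32>`):
    ///
    /// ```ignore
    /// let mut timer = dispatcher.new_timer(42);
    /// // `schedule` returns the previously scheduled time, `None` when the
    /// // timer was unscheduled.
    /// let when = fasync::Time::after(zx::Duration::from_seconds(1));
    /// assert_eq!(timer.schedule(when), None);
    /// ```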
pub(crate) fn new_timer(&self, dispatch: T) -> Timer<T> {
let _: usize = self.inner.timer_count.fetch_add(1, atomic::Ordering::SeqCst);
Timer {
heap: Arc::clone(&self.inner),
state: Arc::new(TimerState {
dispatch,
scheduled: AtomicI64::new(UNSCHEDULED_SENTINEL.into_nanos()),
gc_generation: AtomicUsize::new(0),
}),
_no_clone: NoCloneGuard,
}
}
/// Stops this timer dispatcher.
///
/// This causes a task previously returned by [`TimerDispatcher::spawn`] to
/// complete.
///
/// # Panics
///
/// If `stop` was already called.
pub(crate) fn stop(&self) {
assert!(self.inner.state.lock().notifier.take().is_some(), "dispatcher already closed");
}
async fn worker<H: FnMut(T)>(
mut handler: H,
mut watcher: NeedsDataWatcher,
inner: Arc<TimerDispatcherInner<T>>,
) {
let mut timer = TimerWaiter::new();
let mut gc_generation = 0;
loop {
let mut watcher_next = watcher.next().fuse();
futures::select! {
() = timer => (),
w = watcher_next => {
match w {
Some(()) => (),
// Dispatcher is closed, break the loop.
None => break,
}
}
}
let (next_wakeup, heap_len) = Self::check_timer_heap(&mut handler, &inner.state).await;
trace!("next wakeup = {:?}, heap_len = {heap_len}", ScheduledInstant::new(next_wakeup));
// Update our timer to wake at the next wake up time according to
// the heap.
timer.set(next_wakeup);
            // Our target heap size is the number of timers we have alive,
            // clamped to a minimum size to avoid too much GC thrash.
//
// We'll run a GC on the heap whenever we hit twice the target size,
// at which point we expect the GC to bring the number of entries
// down to around the target size.
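            //
            // For example (illustrative numbers): with 30 live timers the
            // target is 30, so a heap that grows past 60 entries triggers a
            // GC sweep that is expected to shrink it back to roughly 30.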
let target_heap_len =
inner.timer_count.load(atomic::Ordering::SeqCst).max(MIN_TARGET_HEAP_SIZE);
timer.set_target_heap_size(target_heap_len);
if heap_len > target_heap_len * 2 {
gc_generation += 1;
let after = Self::gc_timer_heap(&inner.state, gc_generation);
trace!(
"timer gc gen({}) triggered with heap_len={}, target={}, after={}",
gc_generation,
heap_len,
target_heap_len,
after
);
}
}
        // Clean up everything on the heap before returning.
inner.state.lock().heap.clear();
}
/// Walks the timer heap, removing any stale entries that might have
/// accumulated due to timer scheduling/cancelation thrash.
///
/// Returns the heap length after GC.
fn gc_timer_heap(inner: &CoreMutex<TimerDispatcherState<T>>, generation: usize) -> usize {
let mut guard = inner.lock();
let TimerDispatcherState { heap, notifier: _, watcher: _ } = &mut *guard;
heap.retain(|TimeAndValue { time, value }| {
let current = fasync::Time::from_nanos(value.scheduled.load(atomic::Ordering::SeqCst));
// Retain all the entries that are still valid.
match ScheduledEntryValidity::new(current, *time) {
ScheduledEntryValidity::Valid | ScheduledEntryValidity::ValidForLaterTime => {
// Only keep entries that we haven't seen yet on this GC
// sweep. It is possible to observe many `ValidForLaterTime`
// entries for a single timer and we want to keep only one
// of them.
value.gc_generation.swap(generation, atomic::Ordering::SeqCst) != generation
}
ScheduledEntryValidity::Invalid => false,
}
});
heap.len()
}
/// Checks the timer heap, firing any elapsed timers.
///
/// Returns the next wakeup time for the worker task and the total number of
/// entries in the internal heap.
async fn check_timer_heap<H: FnMut(T)>(
handler: &mut H,
inner: &CoreMutex<TimerDispatcherState<T>>,
) -> (fasync::Time, usize) {
let mut fired_count = 0;
loop {
let fire = {
let mut guard = inner.lock();
let TimerDispatcherState { heap, notifier: _, watcher: _ } = &mut *guard;
let heap_len = heap.len();
let Some(front) = heap.peek_mut() else {
// Nothing to wait for.
return (UNSCHEDULED_SENTINEL, heap_len);
};
if !front.should_fire_at(fasync::Time::now()) {
// Wait until the time at the front of the heap to fire.
return (front.time, heap_len);
}
// NB: This is an associated function, probably because
// PeekMut implements Deref.
let front = std::collections::binary_heap::PeekMut::pop(front);
match front.try_fire() {
TryFireResult::Fire(f) => f,
TryFireResult::Ignore => continue,
TryFireResult::Reschedule(r) => {
heap.push(r);
continue;
}
}
};
trace!("firing timer {fire:?}");
handler(fire);
fired_count += 1;
if fired_count % YIELD_TIMER_COUNT == 0 {
YieldToExecutorOnce::new().await;
}
}
}
}
struct TimerDispatcherState<T> {
heap: BinaryHeap<TimerScheduledEntry<T>>,
// Notifier is removed on close.
notifier: Option<NeedsDataNotifier>,
// Watcher is taken on spawn.
watcher: Option<NeedsDataWatcher>,
}
struct TimerDispatcherInner<T> {
timer_count: AtomicUsize,
state: CoreMutex<TimerDispatcherState<T>>,
}
/// A reusable struct to place a value and a timestamp in a [`BinaryHeap`].
///
/// Its `Ord` implementation is tuned to make [`BinaryHeap`] a min heap.
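///
/// A minimal illustration of the flipped ordering (a sketch, not a doctest
/// from the original source):
///
/// ```ignore
/// let mut heap = BinaryHeap::new();
/// heap.push(TimeAndValue { time: fasync::Time::from_nanos(2), value: "later" });
/// heap.push(TimeAndValue { time: fasync::Time::from_nanos(1), value: "earlier" });
/// // The reversed `Ord` makes the earliest time surface first.
/// assert_eq!(heap.pop().unwrap().value, "earlier");
/// ```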
struct TimeAndValue<T> {
time: fasync::Time,
value: T,
}
// Boilerplate to implement a heap entry.
impl<T> PartialEq for TimeAndValue<T> {
fn eq(&self, other: &Self) -> bool {
self.time == other.time
}
}
impl<T> Eq for TimeAndValue<T> {}
impl<T> Ord for TimeAndValue<T> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Note that we flip the argument order here to make the `BinaryHeap` a
// min heap.
Ord::cmp(&other.time, &self.time)
}
}
impl<T> PartialOrd for TimeAndValue<T> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(Ord::cmp(self, other))
}
}
/// An entry in the [`TimerDispatcher`] heap.
type TimerScheduledEntry<T> = TimeAndValue<Arc<TimerState<T>>>;
/// The validity state of a [`TimerScheduledEntry`].
enum ScheduledEntryValidity {
/// This entry is valid and can cause a timer to fire.
Valid,
/// This entry is valid at a later time, loaded from the timer state.
ValidForLaterTime,
/// This entry is invalid and can be discarded.
Invalid,
}
impl ScheduledEntryValidity {
/// Evaluates the validity of a scheduled entry with `current_value` and
/// cached `scheduled_entry`.
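    ///
    /// A sketch of the three cases (illustrative):
    ///
    /// ```ignore
    /// let (t10, t20) = (fasync::Time::from_nanos(10), fasync::Time::from_nanos(20));
    /// // State moved later than the cached heap entry: refire later.
    /// assert!(matches!(
    ///     ScheduledEntryValidity::new(t20, t10),
    ///     ScheduledEntryValidity::ValidForLaterTime
    /// ));
    /// // State matches the heap entry exactly: fire now.
    /// assert!(matches!(
    ///     ScheduledEntryValidity::new(t10, t10),
    ///     ScheduledEntryValidity::Valid
    /// ));
    /// // State moved earlier than the cached heap entry: stale, discard.
    /// assert!(matches!(
    ///     ScheduledEntryValidity::new(t10, t20),
    ///     ScheduledEntryValidity::Invalid
    /// ));
    /// ```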
fn new(current_value: fasync::Time, scheduled_entry: fasync::Time) -> Self {
if current_value == UNSCHEDULED_SENTINEL {
return Self::Invalid;
}
match current_value.cmp(&scheduled_entry) {
// If the timer state is holding a time before the one originally
// scheduled, this is a stale timer entry. We should ignore it.
cmp::Ordering::Less => Self::Invalid,
cmp::Ordering::Equal => Self::Valid,
            // If the timer state is holding a time later than the one
            // originally scheduled, it needs to be replaced in the heap to be
            // fired later.
cmp::Ordering::Greater => Self::ValidForLaterTime,
}
}
}
impl<T: Clone> TimerScheduledEntry<T> {
/// Consumes the entry in an attempt to fire the timer.
fn try_fire(self) -> TryFireResult<T> {
let Self { value: timer_state, time: scheduled_for } = self;
match timer_state.scheduled.compare_exchange(
scheduled_for.into_nanos(),
UNSCHEDULED_SENTINEL.into_nanos(),
atomic::Ordering::SeqCst,
atomic::Ordering::SeqCst,
) {
// We may only fire the timer if the timer state is holding the same
// value that it had when it was put into the heap.
Ok(_) => TryFireResult::Fire(timer_state.dispatch.clone()),
Err(next) => {
let next = fasync::Time::from_nanos(next);
match ScheduledEntryValidity::new(next, scheduled_for) {
// If the timer state is valid for a later time, we need to
// put it back into the heap to be fired later.
ScheduledEntryValidity::ValidForLaterTime => {
TryFireResult::Reschedule(Self { value: timer_state, time: next })
}
// Ignore stale and invalid entries.
ScheduledEntryValidity::Invalid => TryFireResult::Ignore,
                    // The valid case, where the current and scheduled_for
                    // times are the same, is covered by the Ok arm above.
ScheduledEntryValidity::Valid => unreachable!(),
}
}
}
}
/// Returns whether this entry should fire for the current time.
fn should_fire_at(&self, now: fasync::Time) -> bool {
self.time <= now
}
}
/// The result of attempting to fire a valid timer entry.
enum TryFireResult<T> {
/// Fire this timer with the provided dispatch id.
Fire(T),
/// Timer entry is stale, ignore it.
Ignore,
/// Reschedule with this new entry.
Reschedule(TimerScheduledEntry<T>),
}
/// The state kept in each timer.
struct TimerState<T> {
    /// Holds the most up-to-date desired wake-up time for the timer.
///
/// This value is atomically read and compared to the cached time in
/// [`TimeAndValue`] to validate the desired wake up time hasn't changed.
scheduled: AtomicI64,
dispatch: T,
// State kept with each timer to help GC.
gc_generation: AtomicUsize,
}
/// This type prevents [`Timer`] from ever deriving or implementing `Clone` as
/// the code evolves.
struct NoCloneGuard;
pub(crate) struct Timer<T> {
state: Arc<TimerState<T>>,
heap: Arc<TimerDispatcherInner<T>>,
    /// This type must not become `Clone`; see the explanation in
    /// [`Timer::schedule`].
_no_clone: NoCloneGuard,
}
impl<T> Debug for Timer<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Many structs in core that use timers are self-referential so we need
// to be careful in the Debug impl here, especially around the opaque
// type T.
let Self { state, heap: _, _no_clone: _ } = self;
let TimerState { scheduled, dispatch: _, gc_generation: _ } = &**state;
let scheduled = ScheduledInstant::new(fasync::Time::from_nanos(
scheduled.load(atomic::Ordering::Relaxed),
));
f.debug_struct("Timer").field("scheduled", &scheduled).finish_non_exhaustive()
}
}
impl<T> Drop for Timer<T> {
fn drop(&mut self) {
let _: Option<ScheduledInstant> = self.cancel();
let _: usize = self.heap.timer_count.fetch_sub(1, atomic::Ordering::SeqCst);
}
}
impl<T> Timer<T> {
    /// Schedules the timer to fire at `when`, returning the previously
    /// scheduled time, if any.
    pub(crate) fn schedule(&mut self, when: fasync::Time) -> Option<ScheduledInstant> {
let prev = self.state.scheduled.swap(when.into_nanos(), atomic::Ordering::SeqCst);
let prev = fasync::Time::from_nanos(prev);
// CRITICAL SECTION HERE.
//
        // A timer *cannot* be cloned because there must be only one instance
        // of it that can schedule; the cloned state may only be used by the
        // heap to invalidate, by setting the scheduled time to infinite
        // future.
//
// If we raced with a different scheduling operation here the simple
// check to see if the previous value is in the future could cause
// problems.
//
// Given no cloning, the race here is prevented because the timer heap
// only writes infinite future and always checks it against the expected
// scheduled time. So if this timer was in the heap and it just fired,
// then the heap will take care of rescheduling it for the future if it
// was pushed later in time or drop it if it was pushed earlier in time.
// Those actions are compatible with what we do below: do nothing if
// we're re-scheduling for a later moment in time and push back to the
// heap otherwise.
//
// Note that cancelation will never hit this branch because it's the
// maximum value.
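        //
        // Illustrative scenario: the heap holds an entry for t=10ns and we
        // call `schedule` with t=5ns. If our swap lands first, the heap's
        // compare-exchange fails and it discards the stale 10ns entry; if the
        // heap fires first, we observe `prev == INFINITE`. Either way
        // `prev > when` holds and we push a fresh 5ns entry below, so no
        // wakeup is lost.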
if prev > when {
// If the new timer is for an earlier moment in time, then we need
// to put this in the heap; the old entry will eventually be a
// no-op.
let mut guard = self.heap.state.lock();
let wake_notifier = {
let TimerDispatcherState { heap, notifier, watcher: _ } = &mut *guard;
if let Some(notifier) = notifier.as_ref() {
let front = heap.peek().map(|c| c.time);
heap.push(TimerScheduledEntry { time: when, value: Arc::clone(&self.state) });
// Wake the timer task whenever the front of the heap changes.
(front != heap.peek().map(|c| c.time)).then_some(notifier)
} else {
// Avoid doing any work if the notifier is gone, that means
// the timer is going away.
warn!("TimerDispatcher is closed, timer will not fire");
None
}
};
if let Some(notifier) = wake_notifier {
notifier.schedule();
}
} else {
// Updating the atomic is sufficient to reschedule a timer for a
// later instant. When the entry is popped from the heap, the
// updated value from `TimerState` informs the heap of the
// reschedule.
}
ScheduledInstant::new(prev)
}
    /// Cancels the timer, returning the previously scheduled time, if any.
    pub(crate) fn cancel(&mut self) -> Option<ScheduledInstant> {
self.schedule(UNSCHEDULED_SENTINEL)
}
    /// Returns the time this timer is currently scheduled to fire at, if any.
    pub(crate) fn scheduled_time(&self) -> Option<ScheduledInstant> {
ScheduledInstant::new(fasync::Time::from_nanos(
self.state.scheduled.load(atomic::Ordering::SeqCst),
))
}
}
const MIN_TARGET_HEAP_SIZE: usize = 10;
/// This provides a means to reuse [`fasync::Timer`] across run loops of
/// [`TimerDispatcher`], minimizing reallocations and decreasing leaks.
///
/// A `fasync::Timer` allocates backing memory that is only freed when the time
/// it was created with expires. `TimerWaiter` offers a self-contained,
/// reusable collection of timers that it selects from based on the desired
/// wait time.
///
/// It also works around the limitation that an `fasync::Timer` can only be
/// rescheduled after it has fired.
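///
/// A minimal usage sketch (illustrative; mirrors the worker loop above):
///
/// ```ignore
/// let mut tw = TimerWaiter::new();
/// tw.set(fasync::Time::after(zx::Duration::from_millis(5)));
/// (&mut tw).await; // Resolves once the deadline passes.
/// tw.set(fasync::Time::INFINITE); // Back to the terminated state.
/// ```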
struct TimerWaiter {
timers: BinaryHeap<TimeAndValue<fasync::Timer>>,
selected: Option<TimeAndValue<fasync::Timer>>,
wakeup: fasync::Time,
/// The target heap size controls when we let go of expired fasync::Timers
/// instead of reusing them.
target_heap_size: usize,
}
impl TimerWaiter {
/// Creates a new `TimerWaiter`.
///
    /// In the default state, `TimerWaiter` is scheduled for
    /// `fasync::Time::INFINITE`.
fn new() -> Self {
Self {
timers: Default::default(),
selected: None,
wakeup: UNSCHEDULED_SENTINEL,
target_heap_size: MIN_TARGET_HEAP_SIZE,
}
}
/// Sets the target heap size to `target`.
fn set_target_heap_size(&mut self, target: usize) {
self.target_heap_size = target;
}
/// Sets the time at which this future will resolve to ready.
///
    /// Note that an `fasync::Time::INFINITE` wakeup time will cause
    /// `TimerWaiter` to behave like a terminated future, i.e.,
    /// `FusedFuture::is_terminated` is `true` and polling it may panic.
fn set(&mut self, new_wakeup: fasync::Time) {
let Self { timers, selected, wakeup, target_heap_size: _ } = self;
        // Nothing to do if the wakeup time is unchanged.
        if std::mem::replace(wakeup, new_wakeup) == new_wakeup {
            return;
        }
// If we have a new wakeup time, always give up on the currently
// selected timer and push to the heap.
if let Some(selected) = selected.take() {
timers.push(selected);
}
// Don't use timers for waiting until infinity. Our `FusedFuture`
// implementation will prevent this from being polled when that's the
// case.
if new_wakeup == UNSCHEDULED_SENTINEL {
return;
}
let time_and_value = timers
.peek_mut()
.and_then(|front| {
                // This is a good candidate iff the fasync timer is scheduled
                // at or before our new wakeup time.
(front.time <= new_wakeup)
.then(|| std::collections::binary_heap::PeekMut::pop(front))
})
.unwrap_or_else(|| {
// If there are no timers available in the heap then we need to
// allocate a new timer.
TimeAndValue { time: new_wakeup, value: fasync::Timer::new(new_wakeup) }
});
*selected = Some(time_and_value);
}
fn maybe_return_fired_timer_to_pool(&mut self, entry: TimeAndValue<fasync::Timer>) {
// Place the timer back in the heap if we're not over the threshold.
//
        // We don't store anything special in the heap saying this timer has
        // already fired because polling an fasync::Timer discovers that
        // immediately; our polling implementation resolves it whenever we
        // observe `Ready` from the timer.
if self.timers.len() < self.target_heap_size {
self.timers.push(entry);
}
}
}
impl Future for TimerWaiter {
type Output = ();
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
let Self { timers: _, selected, wakeup, target_heap_size: _ } = &mut *self;
// Must not poll without a selected timer.
let mut taken_selected = selected.take().unwrap();
match taken_selected.value.poll_unpin(cx) {
Poll::Ready(()) => (),
Poll::Pending => {
// Place it back while pending.
*selected = Some(taken_selected);
return Poll::Pending;
}
}
if *wakeup <= taken_selected.time {
self.maybe_return_fired_timer_to_pool(taken_selected);
return Poll::Ready(());
}
        // If our selected timer was scheduled for earlier than our wakeup,
        // reset the timer now that it fired and poll again.
taken_selected.value.reset(*wakeup);
taken_selected.time = *wakeup;
match taken_selected.value.poll_unpin(cx) {
Poll::Ready(()) => {
self.maybe_return_fired_timer_to_pool(taken_selected);
Poll::Ready(())
}
Poll::Pending => {
                // Still pending; put it back.
*selected = Some(taken_selected);
Poll::Pending
}
}
}
}
impl FusedFuture for TimerWaiter {
fn is_terminated(&self) -> bool {
self.selected.is_none()
}
}
/// A separate module for [`ScheduledInstant`] so it can't be constructed in
/// violation of its invariants.
mod scheduled_instant {
use crate::bindings::StackTime;
use super::{fasync, UNSCHEDULED_SENTINEL};
/// A time that stands as a witness for a valid schedule time.
///
    /// It can only be constructed with an [`fasync::Time`] that is not
    /// `fasync::Time::INFINITE`.
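    ///
    /// A small sketch of the invariant (illustrative):
    ///
    /// ```ignore
    /// assert!(ScheduledInstant::new(fasync::Time::from_nanos(1)).is_some());
    /// assert!(ScheduledInstant::new(fasync::Time::INFINITE).is_none());
    /// ```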
#[derive(Eq, PartialEq)]
pub(crate) struct ScheduledInstant(fasync::Time);
impl std::fmt::Debug for ScheduledInstant {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self(time) = self;
f.debug_tuple("ScheduledInstant").field(&time.into_nanos()).finish()
}
}
impl ScheduledInstant {
        /// Constructs a new `ScheduledInstant` if `time` is not
        /// `fasync::Time::INFINITE`.
pub(super) fn new(time: fasync::Time) -> Option<Self> {
(time != UNSCHEDULED_SENTINEL).then_some(Self(time))
}
}
impl From<ScheduledInstant> for fasync::Time {
fn from(ScheduledInstant(value): ScheduledInstant) -> Self {
value
}
}
impl From<ScheduledInstant> for StackTime {
fn from(value: ScheduledInstant) -> Self {
Self(value.into())
}
}
}
#[cfg(test)]
mod tests {
use std::pin::pin;
use crate::bindings::integration_tests::set_logger_for_test;
use fuchsia_zircon as zx;
use futures::channel::mpsc;
use test_case::test_case;
use super::*;
impl TimerWaiter {
#[track_caller]
fn assert_timers<I: IntoIterator<Item = fasync::Time>>(
&self,
expect_heap: I,
expect_selected: Option<fasync::Time>,
) {
let Self { timers, selected, wakeup: _, target_heap_size: _ } = self;
// Collect times and compare sorted vecs.
let mut expect_heap = expect_heap.into_iter().collect::<Vec<_>>();
expect_heap.sort();
let mut timers =
timers.iter().map(|TimeAndValue { time, value: _ }| *time).collect::<Vec<_>>();
timers.sort();
assert_eq!(timers, expect_heap);
assert_eq!(
selected.as_ref().map(|TimeAndValue { time, value: _ }| *time),
expect_selected
);
}
}
fn new_test_executor() -> fasync::TestExecutor {
let executor = fasync::TestExecutor::new_with_fake_time();
// Set to zero to simplify logs in tests.
executor.set_fake_time(T0);
executor
}
#[track_caller]
fn run_in_executor<R, Fut: Future<Output = R>>(
executor: &mut fasync::TestExecutor,
f: Fut,
) -> R {
let mut f = pin!(f);
match executor.run_until_stalled(&mut f) {
Poll::Ready(r) => r,
Poll::Pending => panic!("Executor stalled"),
}
}
const T0: fasync::Time = fasync::Time::from_nanos(0);
const T1: fasync::Time = fasync::Time::from_nanos(1);
const T2: fasync::Time = fasync::Time::from_nanos(2);
const T3: fasync::Time = fasync::Time::from_nanos(3);
const T4: fasync::Time = fasync::Time::from_nanos(4);
#[derive(Debug, Eq, PartialEq, Clone)]
enum TimerId {
A,
B,
C,
Other(usize),
}
struct TestContext {
dispatcher: TimerDispatcher<TimerId>,
fired: mpsc::UnboundedReceiver<TimerId>,
task: fasync::Task<()>,
}
impl TestContext {
fn new() -> Self {
let dispatcher = TimerDispatcher::default();
let (sender, fired) = mpsc::unbounded();
let task = dispatcher
.spawn(move |t| sender.unbounded_send(t).expect("failed to send fired timer"));
Self { dispatcher, fired, task }
}
fn heap_len(&self) -> usize {
self.dispatcher.inner.state.lock().heap.len()
}
fn schedule_notifier(&self) {
self.dispatcher.inner.state.lock().notifier.as_ref().unwrap().schedule()
}
async fn shutdown(self) {
let Self { dispatcher, fired, task } = self;
dispatcher.stop();
task.await;
assert_eq!(fired.collect::<Vec<_>>().await, vec![], "unacknowledged fired timers");
}
fn shutdown_in_executor((this, mut executor): (Self, fasync::TestExecutor)) {
run_in_executor(&mut executor, this.shutdown());
assert_eq!(executor.wake_next_timer(), None, "timer leak");
}
#[track_caller]
fn assert_no_timers(&mut self, executor: &mut fasync::TestExecutor) {
assert_eq!(executor.run_until_stalled(&mut self.fired.next()), Poll::Pending);
}
#[track_caller]
fn next_timer(&mut self, executor: &mut fasync::TestExecutor) -> TimerId {
run_in_executor(executor, &mut self.fired.next()).unwrap()
}
}
#[fixture::teardown(TestContext::shutdown_in_executor)]
#[test]
fn create_timer_and_fire() {
set_logger_for_test();
let mut executor = new_test_executor();
let mut t = TestContext::new();
let mut timer_a = t.dispatcher.new_timer(TimerId::A);
let mut timer_b = t.dispatcher.new_timer(TimerId::B);
// Never scheduled should never be fired.
let _timer_c = t.dispatcher.new_timer(TimerId::C);
t.assert_no_timers(&mut executor);
// No timers are scheduled.
assert_eq!(executor.wake_next_timer(), None);
assert_eq!(timer_a.schedule(T1), None);
assert_eq!(timer_b.schedule(T3), None);
executor.set_fake_time(T1);
assert_eq!(t.next_timer(&mut executor), TimerId::A);
executor.set_fake_time(T3);
assert_eq!(t.next_timer(&mut executor), TimerId::B);
        // Rescheduling for a time in the past should fire immediately.
assert_eq!(timer_a.schedule(T2), None);
assert_eq!(t.next_timer(&mut executor), TimerId::A);
(t, executor)
}
#[fixture::teardown(TestContext::shutdown_in_executor)]
#[test]
fn reschedule_for_later() {
set_logger_for_test();
let mut executor = new_test_executor();
let mut t = TestContext::new();
let mut timer = t.dispatcher.new_timer(TimerId::A);
assert_eq!(timer.schedule(T1), None);
assert_eq!(t.heap_len(), 1);
t.assert_no_timers(&mut executor);
assert_eq!(timer.schedule(T2).map(Into::into), Some(T1));
        // Rescheduling for later doesn't create a new heap entry.
assert_eq!(t.heap_len(), 1);
executor.set_fake_time(T1);
t.assert_no_timers(&mut executor);
executor.set_fake_time(T2);
assert_eq!(t.next_timer(&mut executor), TimerId::A);
(t, executor)
}
#[fixture::teardown(TestContext::shutdown_in_executor)]
#[test]
fn reschedule_for_earlier() {
set_logger_for_test();
let mut executor = new_test_executor();
let mut t = TestContext::new();
let mut timer = t.dispatcher.new_timer(TimerId::A);
assert_eq!(timer.schedule(T2), None);
assert_eq!(t.heap_len(), 1);
t.assert_no_timers(&mut executor);
assert_eq!(timer.schedule(T1).map(Into::into), Some(T2));
// Rescheduling for earlier creates a new heap entry.
assert_eq!(t.heap_len(), 2);
executor.set_fake_time(T1);
assert_eq!(t.next_timer(&mut executor), TimerId::A);
executor.set_fake_time(T2);
t.assert_no_timers(&mut executor);
(t, executor)
}
#[fixture::teardown(TestContext::shutdown_in_executor)]
#[test]
fn cancel() {
set_logger_for_test();
let mut executor = new_test_executor();
let mut t = TestContext::new();
let mut timer = t.dispatcher.new_timer(TimerId::A);
// Can cancel an unscheduled timer.
assert_eq!(timer.cancel(), None);
assert_eq!(timer.schedule(T1), None);
t.assert_no_timers(&mut executor);
assert_eq!(timer.cancel().map(Into::into), Some(T1));
executor.set_fake_time(T1);
t.assert_no_timers(&mut executor);
// Can cancel an already canceled timer.
assert_eq!(timer.cancel(), None);
// Dropping the timer also cancels it.
assert_eq!(timer.schedule(T2), None);
std::mem::drop(timer);
executor.set_fake_time(T2);
t.assert_no_timers(&mut executor);
(t, executor)
}
#[fixture::teardown(TestContext::shutdown_in_executor)]
#[test]
fn fire_in_order() {
set_logger_for_test();
let mut executor = new_test_executor();
let mut t = TestContext::new();
let mut timer_a = t.dispatcher.new_timer(TimerId::A);
let mut timer_b = t.dispatcher.new_timer(TimerId::B);
let mut timer_c = t.dispatcher.new_timer(TimerId::C);
assert_eq!(timer_a.schedule(T1), None);
assert_eq!(timer_b.schedule(T2), None);
assert_eq!(timer_c.schedule(T3), None);
executor.set_fake_time(T3);
assert_eq!(
executor.run_until_stalled(&mut futures::future::pending::<()>()),
Poll::Pending
);
// All the timers should now be stashed in the receiver.
assert_eq!(
t.fired.by_ref().take(3).collect::<Vec<_>>().now_or_never(),
Some(vec![TimerId::A, TimerId::B, TimerId::C])
);
(t, executor)
}
#[fixture::teardown(TestContext::shutdown_in_executor)]
#[test_case(1; "min_target_size")]
#[test_case(MIN_TARGET_HEAP_SIZE * 2; "timer_count")]
fn heap_gc(timer_count: usize) {
set_logger_for_test();
let mut executor = new_test_executor();
let mut t = TestContext::new();
assert_eq!(t.dispatcher.inner.timer_count.load(atomic::Ordering::SeqCst), 0);
let mut timers =
(0..timer_count).map(|i| t.dispatcher.new_timer(TimerId::Other(i))).collect::<Vec<_>>();
let timer = &mut timers[0];
assert_eq!(t.dispatcher.inner.timer_count.load(atomic::Ordering::SeqCst), timer_count);
assert_eq!(timer.schedule(T1), None);
assert_eq!(t.heap_len(), 1);
let mut reschedule = || {
let prev: fasync::Time = timer.cancel().unwrap().into();
assert_eq!(timer.schedule(prev + zx::Duration::from_seconds(1)), None);
};
// Canceling and rescheduling creates a new heap entry.
reschedule();
assert_eq!(t.heap_len(), 2);
// GC won't be triggered until we hit the gc threshold.
t.assert_no_timers(&mut executor);
assert_eq!(t.heap_len(), 2);
let threshold = timer_count.max(MIN_TARGET_HEAP_SIZE) * 2;
while t.heap_len() < threshold {
reschedule();
}
        // Kick the notifier and check that GC doesn't run yet.
t.schedule_notifier();
t.assert_no_timers(&mut executor);
assert_eq!(t.heap_len(), threshold);
// Do one more round to push over the threshold and check that GC runs.
reschedule();
t.schedule_notifier();
t.assert_no_timers(&mut executor);
assert_eq!(t.heap_len(), 1);
// On drop the number of timers should decrease.
drop(timers);
assert_eq!(t.dispatcher.inner.timer_count.load(atomic::Ordering::SeqCst), 0);
(t, executor)
}
#[test]
fn timer_waiter_starts_terminated() {
let tw = TimerWaiter::new();
assert_eq!(tw.is_terminated(), true);
}
#[test]
fn timer_waiter_reuse_timer() {
set_logger_for_test();
let mut executor = new_test_executor();
let mut tw = TimerWaiter::new();
tw.assert_timers([], None);
tw.set(T1);
tw.assert_timers([], Some(T1));
assert_eq!(tw.is_terminated(), false);
assert_eq!(executor.wake_next_timer(), Some(T1));
assert_eq!(executor.run_until_stalled(&mut tw), Poll::Ready(()));
assert_eq!(tw.is_terminated(), true);
assert_eq!(executor.wake_next_timer(), None);
tw.assert_timers([T1], None);
// Should be able to reuse the timer here.
tw.set(T2);
        // Initially the timer is still cached to fire at T1; we haven't reset
        // it yet.
tw.assert_timers([], Some(T1));
assert_eq!(executor.wake_next_timer(), None);
// Then we'll poll once which will update the timer.
assert_eq!(executor.run_until_stalled(&mut tw), Poll::Pending);
tw.assert_timers([], Some(T2));
// Now unblock the future.
assert_eq!(executor.wake_next_timer(), Some(T2));
assert_eq!(executor.run_until_stalled(&mut tw), Poll::Ready(()));
assert_eq!(tw.is_terminated(), true);
        // We can now do everything again, but this time observe the future
        // resolve in a single poll because the fake time has already reached
        // the deadline.
tw.set(T3);
tw.assert_timers([], Some(T2));
executor.set_fake_time(T3);
assert_eq!(executor.run_until_stalled(&mut tw), Poll::Ready(()));
assert_eq!(tw.is_terminated(), true);
}
#[test]
fn timer_waiter_cant_reuse() {
set_logger_for_test();
let mut executor = new_test_executor();
let mut tw = TimerWaiter::new();
tw.set(T3);
tw.assert_timers([], Some(T3));
// A time that is earlier than our only available timer will cause us to
// create a new timer.
tw.set(T2);
tw.assert_timers([T3], Some(T2));
tw.set(T1);
tw.assert_timers([T2, T3], Some(T1));
// If we reschedule again to T3, the configuration remains the same
// because we always prefer our earliest timer.
tw.set(T3);
tw.assert_timers([T2, T3], Some(T1));
// Won't fire until we reach T3.
executor.set_fake_time(T1);
assert_eq!(executor.run_until_stalled(&mut tw), Poll::Pending);
tw.assert_timers([T2, T3], Some(T3));
executor.set_fake_time(T3);
assert_eq!(executor.run_until_stalled(&mut tw), Poll::Ready(()));
}
#[test]
fn timer_waiter_target_heap_size() {
set_logger_for_test();
let mut executor = new_test_executor();
let mut tw = TimerWaiter::new();
tw.target_heap_size = 2;
// Change the scheduled time in reverse to thrash timer creation.
tw.set(T4);
tw.set(T3);
tw.set(T2);
tw.set(T1);
tw.assert_timers([T2, T3, T4], Some(T1));
executor.set_fake_time(T1);
assert_eq!(executor.run_until_stalled(&mut tw), Poll::Ready(()));
        // The timer is not returned to the pool because we hit the threshold.
        // Note, however, that we remain above the threshold because removal
        // is only considered when a timer is observed to have fired.
tw.assert_timers([T2, T3, T4], None);
}
}