blob: 74f532e3d8e0cca6b5b1e790f999925aab20b2a2 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Support for creating futures that represent timers.
//!
//! This module contains the `Timer` type which is a future that will resolve
//! at a particular point in the future.
use {
crate::executor::EHandle,
fuchsia_zircon as zx,
futures::{
task::{AtomicWaker, LocalWaker},
Future, FutureExt, Poll, Stream,
stream::FusedStream,
},
pin_utils::{unsafe_pinned, unsafe_unpinned},
std::{
marker::Unpin,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
},
};
/// Extension trait that lets any future be given a deadline.
pub trait TimeoutExt: Future + Sized {
    /// Wraps this future in an [`OnTimeout`] that resolves with the future's own
    /// output, or with the value produced by `on_timeout` once the timer for
    /// `time` has fired and the future is still pending.
    ///
    /// The timer is created (via `Timer::new`) as part of this call, not on
    /// first poll.
    fn on_timeout<OT>(self, time: zx::Time, on_timeout: OT) -> OnTimeout<Self, OT>
    where
        OT: FnOnce() -> Self::Output,
    {
        let timer = Timer::new(time);
        let on_timeout = Some(on_timeout);
        OnTimeout {
            timer,
            future: self,
            on_timeout,
        }
    }
}
// Blanket implementation: every sized future gets `on_timeout` for free.
impl<F> TimeoutExt for F where F: Future + Sized {}
/// A wrapper for a future which will complete with a provided closure when a timeout occurs.
///
/// Values of this type are produced by `TimeoutExt::on_timeout`.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct OnTimeout<F, OT> {
// Timer whose firing triggers the fallback closure.
timer: Timer,
// The wrapped future; `poll` consults it before the timer.
future: F,
// Fallback closure producing the output on timeout. Starts as `Some` and is
// taken (left as `None`) when the timer fires.
on_timeout: Option<OT>,
}
impl<F, OT> OnTimeout<F, OT> {
// Accessor methods generated by the `pin_utils` macros; `poll` uses them to
// reach individual fields through a pinned `self`. Only `future` is
// projected as pinned; `timer` and `on_timeout` are handed out unpinned.
//
// Safety: this is safe because `OnTimeout` is only `Unpin` if
// the future is `Unpin`, and aside from `future`, all other fields are
// treated as movable.
unsafe_unpinned!(timer: Timer);
unsafe_pinned!(future: F);
unsafe_unpinned!(on_timeout: Option<OT>);
}
// `OnTimeout` is `Unpin` exactly when the wrapped future is; the pin
// projections on `OnTimeout` rely on this bound.
impl<F, OT> Unpin for OnTimeout<F, OT> where F: Unpin {}
impl<F: Future, OT> Future for OnTimeout<F, OT>
where
    OT: FnOnce() -> F::Output,
{
    type Output = F::Output;

    /// Polls the wrapped future first; only if it is still pending is the
    /// timer polled. When the timer has fired, the stored closure is consumed
    /// and its return value becomes the output.
    ///
    /// # Panics
    ///
    /// Panics if the timer is observed as fired after the timeout closure has
    /// already been consumed by an earlier poll.
    fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
        // The inner future takes priority: if both it and the timer are ready
        // on the same poll, the future's own result wins.
        if let Poll::Ready(item) = self.as_mut().future().poll(lw) {
            return Poll::Ready(item);
        }
        if let Poll::Ready(()) = self.as_mut().timer().poll_unpin(lw) {
            // `take` leaves `None` behind so the `FnOnce` runs at most once.
            let ot = OnTimeout::on_timeout(self.as_mut())
                .take()
                .expect("polled OnTimeout after completion");
            return Poll::Ready(ot());
        }
        Poll::Pending
    }
}
/// An asynchronous timer.
///
/// Resolves to `()` once the timer has fired.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Timer {
// State handed to `EHandle::register_timer`: `.0` holds the waker of the
// task polling this timer, `.1` is the "has fired" flag read by `did_fire`.
waker_and_bool: Arc<(AtomicWaker, AtomicBool)>,
}
// `Timer`'s only field is an `Arc` handle, so it is declared freely movable
// even after being pinned.
impl Unpin for Timer {}
impl Timer {
    /// Creates a timer scheduled to fire at `time`.
    ///
    /// The timer's shared state is registered through `EHandle::local()`
    /// before this function returns.
    pub fn new(time: zx::Time) -> Self {
        let timer = Timer {
            waker_and_bool: Arc::new((AtomicWaker::new(), AtomicBool::new(false))),
        };
        EHandle::local().register_timer(time, &timer.waker_and_bool);
        timer
    }

    /// Resets the `Timer` to fire at a new time.
    ///
    /// # Panics
    ///
    /// Panics if the timer has not fired since it was created or last reset.
    pub fn reset(&mut self, time: zx::Time) {
        assert!(self.did_fire());
        let shared = &self.waker_and_bool;
        // Clear the fired flag before re-registering.
        shared.1.store(false, Ordering::SeqCst);
        EHandle::local().register_timer(time, shared)
    }

    /// Returns whether the fired flag is currently set.
    fn did_fire(&self) -> bool {
        let (_, fired) = &*self.waker_and_bool;
        fired.load(Ordering::SeqCst)
    }

    /// Registers `lw` on the shared `AtomicWaker`.
    fn register_task(&self, lw: &LocalWaker) {
        let (waker, _) = &*self.waker_and_bool;
        waker.register(lw);
    }
}
impl Future for Timer {
    type Output = ();

    /// Resolves to `()` once the timer's fired flag has been set.
    ///
    /// While the flag is unset, the waker `lw` is registered on the shared
    /// `AtomicWaker` and `Poll::Pending` is returned.
    fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
        // Fast path: skip waker registration if the timer has already fired.
        if self.did_fire() {
            return Poll::Ready(());
        }

        self.register_task(lw);

        // The flag must be checked again *after* registering. Nothing in this
        // module sets the flag to `true`; that is done by whoever holds the
        // state passed to `register_timer` (presumably the executor, possibly
        // on another thread). If it fired between the first check and the
        // registration above, the wake-up may have hit an empty or stale waker
        // and this task would never be polled again.
        if self.did_fire() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}
/// An asynchronous interval timer.
/// This is a stream of events resolving at a rate of once per interval.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Interval {
// Timer armed for the upcoming tick; reset after each tick is yielded.
timer: Timer,
// Deadline that `timer` was most recently armed with.
next: zx::Time,
// Amount added to `next` after every tick.
duration: zx::Duration,
}
impl Interval {
    /// Creates an `Interval` that yields once every `duration`.
    ///
    /// The first deadline is computed as `duration.after_now()` at the time
    /// of this call.
    pub fn new(duration: zx::Duration) -> Self {
        let first_deadline = duration.after_now();
        let timer = Timer::new(first_deadline);
        Interval {
            timer,
            next: first_deadline,
            duration,
        }
    }
}
// Explicitly marks `Interval` as movable after pinning; `poll_next` relies on
// this to obtain `&mut Interval` from `Pin<&mut Self>`.
impl Unpin for Interval {}
// An `Interval` never finishes, so it always reports itself as not terminated.
impl FusedStream for Interval {
fn is_terminated(&self) -> bool {
// `Interval` never yields `None`
false
}
}
impl Stream for Interval {
    type Item = ();

    /// Yields `Some(())` each time the underlying timer has fired, then
    /// re-arms the timer for one `duration` past the previous deadline.
    /// Returns `Poll::Pending` otherwise; never yields `None`.
    fn poll_next(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Option<Self::Item>> {
        let interval = &mut *self;
        let tick = interval.timer.poll_unpin(lw);

        // In both outcomes the current waker is (re-)registered before
        // anything else happens.
        interval.timer.register_task(lw);

        if let Poll::Pending = tick {
            return Poll::Pending;
        }

        // The timer fired: advance the deadline relative to the previous one
        // (not relative to "now") and re-arm.
        interval.next += interval.duration;
        interval.timer.reset(interval.next);
        Poll::Ready(Some(()))
    }
}
#[cfg(test)]
mod test {
use super::*;
use crate::{
temp::{Either, TempFutureExt},
Executor, Timer,
};
use fuchsia_zircon::prelude::*;
use futures::prelude::*;
#[test]
fn shorter_fires_first() {
    // Race a 100ms timer against a 1s timer on a single thread; the short
    // one must be the side of the `select` that completes.
    let mut executor = Executor::new().unwrap();
    let quick = Timer::new(100.millis().after_now());
    let slow = Timer::new(1.second().after_now());
    let winner = executor.run_singlethreaded(quick.select(slow));
    if let Either::Right(()) = winner {
        panic!("wrong timer fired");
    }
}
#[test]
fn shorter_fires_first_multithreaded() {
    // Same race as `shorter_fires_first`, but driven through `Executor::run`
    // with an argument of 4 (presumably the thread count; confirm against
    // `Executor::run`).
    let mut executor = Executor::new().unwrap();
    let quick = Timer::new(100.millis().after_now());
    let slow = Timer::new(1.second().after_now());
    let winner = executor.run(quick.select(slow), 4);
    if let Either::Right(()) = winner {
        panic!("wrong timer fired");
    }
}
#[test]
fn fires_after_timeout() {
    let mut executor = Executor::new().unwrap();
    let fire_at = 5.seconds().after_now();
    let mut timer = Timer::new(fire_at);
    // Before the deadline is reached the timer must still be pending.
    assert_eq!(Poll::Pending, executor.run_until_stalled(&mut timer));
    // Waking the next timer must report exactly the deadline we scheduled.
    assert_eq!(Some(fire_at), executor.wake_next_timer());
    // Once woken, the timer resolves.
    assert_eq!(Poll::Ready(()), executor.run_until_stalled(&mut timer));
}
#[test]
fn interval() {
    let mut executor = Executor::new().unwrap();
    let began = 0.seconds().after_now();
    let ticks = Arc::new(::std::sync::atomic::AtomicUsize::new(0));

    // Count every item the interval yields; the `collect` future itself is
    // expected to stay pending throughout this test.
    let tick_counter = ticks.clone();
    let mut drain = Interval::new(5.seconds())
        .map(move |()| {
            tick_counter.fetch_add(1, Ordering::SeqCst);
        })
        .collect::<()>();

    // Poll for the first time, before the timer has been woken.
    assert_eq!(Poll::Pending, executor.run_until_stalled(&mut drain));
    assert_eq!(0, ticks.load(Ordering::SeqCst));

    // Pretend to wait until the next timer.
    let first_deadline = executor
        .wake_next_timer()
        .expect("Expected a pending timeout (1)");
    assert!(first_deadline >= began + 5.seconds());
    assert_eq!(Poll::Pending, executor.run_until_stalled(&mut drain));
    assert_eq!(1, ticks.load(Ordering::SeqCst));

    // Polling again before the next timer is woken must not produce another item.
    assert_eq!(Poll::Pending, executor.run_until_stalled(&mut drain));
    assert_eq!(1, ticks.load(Ordering::SeqCst));

    // "Wait" until the next timeout and poll again: expect another item.
    let second_deadline = executor
        .wake_next_timer()
        .expect("Expected a pending timeout (2)");
    assert_eq!(Poll::Pending, executor.run_until_stalled(&mut drain));
    assert_eq!(2, ticks.load(Ordering::SeqCst));
    // Deadlines advance by exactly one interval, with no drift.
    assert_eq!(second_deadline, first_deadline + 5.seconds());
}
}