| //! Allows a future or stream to execute for a maximum amount of time. |
| //! |
| //! See [`Timeout`] documentation for more details. |
| //! |
| //! [`Timeout`]: struct.Timeout.html |
| |
| use clock::now; |
| use Delay; |
| |
| use futures::{Async, Future, Poll, Stream}; |
| |
| use std::error; |
| use std::fmt; |
| use std::time::{Duration, Instant}; |
| |
| /// Allows a `Future` or `Stream` to execute for a limited amount of time. |
| /// |
| /// If the future or stream completes before the timeout has expired, then |
| /// `Timeout` returns the completed value. Otherwise, `Timeout` returns an |
| /// [`Error`]. |
| /// |
| /// # Futures and Streams |
| /// |
| /// The exact behavor depends on if the inner value is a `Future` or a `Stream`. |
| /// In the case of a `Future`, `Timeout` will require the future to complete by |
| /// a fixed deadline. In the case of a `Stream`, `Timeout` will allow each item |
| /// to take the entire timeout before returning an error. |
| /// |
| /// In order to set an upper bound on the processing of the *entire* stream, |
| /// then a timeout should be set on the future that processes the stream. For |
| /// example: |
| /// |
| /// ```rust |
| /// # extern crate futures; |
| /// # extern crate tokio; |
| /// // import the `timeout` function, usually this is done |
| /// // with `use tokio::prelude::*` |
| /// use tokio::prelude::FutureExt; |
| /// use futures::Stream; |
| /// use futures::sync::mpsc; |
| /// use std::time::Duration; |
| /// |
| /// # fn main() { |
| /// let (tx, rx) = mpsc::unbounded(); |
| /// # tx.unbounded_send(()).unwrap(); |
| /// # drop(tx); |
| /// |
| /// let process = rx.for_each(|item| { |
| /// // do something with `item` |
| /// # drop(item); |
| /// # Ok(()) |
| /// }); |
| /// |
| /// # tokio::runtime::current_thread::block_on_all( |
| /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. |
| /// process.timeout(Duration::from_millis(10)) |
| /// # ).unwrap(); |
| /// # } |
| /// ``` |
| /// |
| /// # Cancelation |
| /// |
| /// Cancelling a `Timeout` is done by dropping the value. No additional cleanup |
| /// or other work is required. |
| /// |
| /// The original future or stream may be obtained by calling [`Timeout::into_inner`]. This |
| /// consumes the `Timeout`. |
| /// |
| /// [`Error`]: struct.Error.html |
| /// [`Timeout::into_inner`]: struct.Timeout.html#method.into_iter |
| #[must_use = "futures do nothing unless polled"] |
| #[derive(Debug)] |
| pub struct Timeout<T> { |
| value: T, |
| delay: Delay, |
| } |
| |
| /// Error returned by `Timeout`. |
| #[derive(Debug)] |
| pub struct Error<T>(Kind<T>); |
| |
| /// Timeout error variants |
| #[derive(Debug)] |
| enum Kind<T> { |
| /// Inner value returned an error |
| Inner(T), |
| |
| /// The timeout elapsed. |
| Elapsed, |
| |
| /// Timer returned an error. |
| Timer(::Error), |
| } |
| |
| impl<T> Timeout<T> { |
| /// Create a new `Timeout` that allows `value` to execute for a duration of |
| /// at most `timeout`. |
| /// |
| /// The exact behavior depends on if `value` is a `Future` or a `Stream`. |
| /// |
| /// See [type] level documentation for more details. |
| /// |
| /// [type]: # |
| /// |
| /// # Examples |
| /// |
| /// Create a new `Timeout` set to expire in 10 milliseconds. |
| /// |
| /// ```rust |
| /// # extern crate futures; |
| /// # extern crate tokio; |
| /// use tokio::timer::Timeout; |
| /// use futures::Future; |
| /// use futures::sync::oneshot; |
| /// use std::time::Duration; |
| /// |
| /// # fn main() { |
| /// let (tx, rx) = oneshot::channel(); |
| /// # tx.send(()).unwrap(); |
| /// |
| /// # tokio::runtime::current_thread::block_on_all( |
| /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. |
| /// Timeout::new(rx, Duration::from_millis(10)) |
| /// # ).unwrap(); |
| /// # } |
| /// ``` |
| pub fn new(value: T, timeout: Duration) -> Timeout<T> { |
| let delay = Delay::new_timeout(now() + timeout, timeout); |
| Timeout::new_with_delay(value, delay) |
| } |
| |
| pub(crate) fn new_with_delay(value: T, delay: Delay) -> Timeout<T> { |
| Timeout { value, delay } |
| } |
| |
| /// Gets a reference to the underlying value in this timeout. |
| pub fn get_ref(&self) -> &T { |
| &self.value |
| } |
| |
| /// Gets a mutable reference to the underlying value in this timeout. |
| pub fn get_mut(&mut self) -> &mut T { |
| &mut self.value |
| } |
| |
| /// Consumes this timeout, returning the underlying value. |
| pub fn into_inner(self) -> T { |
| self.value |
| } |
| } |
| |
| impl<T: Future> Timeout<T> { |
| /// Create a new `Timeout` that completes when `future` completes or when |
| /// `deadline` is reached. |
| /// |
| /// This function differs from `new` in that: |
| /// |
| /// * It only accepts `Future` arguments. |
| /// * It sets an explicit `Instant` at which the timeout expires. |
| pub fn new_at(future: T, deadline: Instant) -> Timeout<T> { |
| let delay = Delay::new(deadline); |
| |
| Timeout { |
| value: future, |
| delay, |
| } |
| } |
| } |
| |
| impl<T> Future for Timeout<T> |
| where |
| T: Future, |
| { |
| type Item = T::Item; |
| type Error = Error<T::Error>; |
| |
| fn poll(&mut self) -> Poll<Self::Item, Self::Error> { |
| // First, try polling the future |
| match self.value.poll() { |
| Ok(Async::Ready(v)) => return Ok(Async::Ready(v)), |
| Ok(Async::NotReady) => {} |
| Err(e) => return Err(Error::inner(e)), |
| } |
| |
| // Now check the timer |
| match self.delay.poll() { |
| Ok(Async::NotReady) => Ok(Async::NotReady), |
| Ok(Async::Ready(_)) => Err(Error::elapsed()), |
| Err(e) => Err(Error::timer(e)), |
| } |
| } |
| } |
| |
| impl<T> Stream for Timeout<T> |
| where |
| T: Stream, |
| { |
| type Item = T::Item; |
| type Error = Error<T::Error>; |
| |
| fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { |
| // First, try polling the future |
| match self.value.poll() { |
| Ok(Async::Ready(v)) => { |
| if v.is_some() { |
| self.delay.reset_timeout(); |
| } |
| return Ok(Async::Ready(v)); |
| } |
| Ok(Async::NotReady) => {} |
| Err(e) => return Err(Error::inner(e)), |
| } |
| |
| // Now check the timer |
| match self.delay.poll() { |
| Ok(Async::NotReady) => Ok(Async::NotReady), |
| Ok(Async::Ready(_)) => { |
| self.delay.reset_timeout(); |
| Err(Error::elapsed()) |
| } |
| Err(e) => Err(Error::timer(e)), |
| } |
| } |
| } |
| |
| // ===== impl Error ===== |
| |
| impl<T> Error<T> { |
| /// Create a new `Error` representing the inner value completing with `Err`. |
| pub fn inner(err: T) -> Error<T> { |
| Error(Kind::Inner(err)) |
| } |
| |
| /// Returns `true` if the error was caused by the inner value completing |
| /// with `Err`. |
| pub fn is_inner(&self) -> bool { |
| match self.0 { |
| Kind::Inner(_) => true, |
| _ => false, |
| } |
| } |
| |
| /// Consumes `self`, returning the inner future error. |
| pub fn into_inner(self) -> Option<T> { |
| match self.0 { |
| Kind::Inner(err) => Some(err), |
| _ => None, |
| } |
| } |
| |
| /// Create a new `Error` representing the inner value not completing before |
| /// the deadline is reached. |
| pub fn elapsed() -> Error<T> { |
| Error(Kind::Elapsed) |
| } |
| |
| /// Returns `true` if the error was caused by the inner value not completing |
| /// before the deadline is reached. |
| pub fn is_elapsed(&self) -> bool { |
| match self.0 { |
| Kind::Elapsed => true, |
| _ => false, |
| } |
| } |
| |
| /// Creates a new `Error` representing an error encountered by the timer |
| /// implementation |
| pub fn timer(err: ::Error) -> Error<T> { |
| Error(Kind::Timer(err)) |
| } |
| |
| /// Returns `true` if the error was caused by the timer. |
| pub fn is_timer(&self) -> bool { |
| match self.0 { |
| Kind::Timer(_) => true, |
| _ => false, |
| } |
| } |
| |
| /// Consumes `self`, returning the error raised by the timer implementation. |
| pub fn into_timer(self) -> Option<::Error> { |
| match self.0 { |
| Kind::Timer(err) => Some(err), |
| _ => None, |
| } |
| } |
| } |
| |
| impl<T: error::Error> error::Error for Error<T> { |
| fn description(&self) -> &str { |
| use self::Kind::*; |
| |
| match self.0 { |
| Inner(ref e) => e.description(), |
| Elapsed => "deadline has elapsed", |
| Timer(ref e) => e.description(), |
| } |
| } |
| } |
| |
| impl<T: fmt::Display> fmt::Display for Error<T> { |
| fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| use self::Kind::*; |
| |
| match self.0 { |
| Inner(ref e) => e.fmt(fmt), |
| Elapsed => "deadline has elapsed".fmt(fmt), |
| Timer(ref e) => e.fmt(fmt), |
| } |
| } |
| } |