blob: 22cba8ee155907b69bc00bda50ca5d545744d926 [file] [log] [blame]
//! Stream utilities for Tokio.
//!
//! A `Stream` is an asynchronous sequence of values. It can be thought of as an asynchronous version of the standard library's `Iterator` trait.
//!
//! This module provides helpers to work with them.
mod all;
use all::AllFuture;
mod any;
use any::AnyFuture;
mod chain;
use chain::Chain;
mod collect;
use collect::Collect;
pub use collect::FromStream;
mod empty;
pub use empty::{empty, Empty};
mod filter;
use filter::Filter;
mod filter_map;
use filter_map::FilterMap;
mod fold;
use fold::FoldFuture;
mod fuse;
use fuse::Fuse;
mod iter;
pub use iter::{iter, Iter};
mod map;
use map::Map;
mod merge;
use merge::Merge;
mod next;
use next::Next;
mod once;
pub use once::{once, Once};
mod pending;
pub use pending::{pending, Pending};
mod stream_map;
pub use stream_map::StreamMap;
mod skip;
use skip::Skip;
mod skip_while;
use skip_while::SkipWhile;
mod try_next;
use try_next::TryNext;
mod take;
use take::Take;
mod take_while;
use take_while::TakeWhile;
cfg_time! {
mod timeout;
use timeout::Timeout;
use std::time::Duration;
}
pub use futures_core::Stream;
/// An extension trait for `Stream`s that provides a variety of convenient
/// combinator functions.
pub trait StreamExt: Stream {
/// Consumes and returns the next value in the stream or `None` if the
/// stream is finished.
///
/// Equivalent to:
///
/// ```ignore
/// async fn next(&mut self) -> Option<Self::Item>;
/// ```
///
/// Note that because `next` doesn't take ownership over the stream,
/// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
/// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
/// be done by boxing the stream using [`Box::pin`] or
/// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
/// crate.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
///
/// let mut stream = stream::iter(1..=3);
///
/// assert_eq!(stream.next().await, Some(1));
/// assert_eq!(stream.next().await, Some(2));
/// assert_eq!(stream.next().await, Some(3));
/// assert_eq!(stream.next().await, None);
/// # }
/// ```
fn next(&mut self) -> Next<'_, Self>
where
Self: Unpin,
{
Next::new(self)
}
/// Consumes and returns the next item in the stream. If an error is
/// encountered before the next item, the error is returned instead.
///
/// Equivalent to:
///
/// ```ignore
/// async fn try_next(&mut self) -> Result<Option<T>, E>;
/// ```
///
/// This is similar to the [`next`](StreamExt::next) combinator,
/// but returns a [`Result<Option<T>, E>`](Result) rather than
/// an [`Option<Result<T, E>>`](Option), making for easy use
/// with the [`?`](std::ops::Try) operator.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
///
/// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
///
/// assert_eq!(stream.try_next().await, Ok(Some(1)));
/// assert_eq!(stream.try_next().await, Ok(Some(2)));
/// assert_eq!(stream.try_next().await, Err("nope"));
/// # }
/// ```
fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
where
Self: Stream<Item = Result<T, E>> + Unpin,
{
TryNext::new(self)
}
/// Maps this stream's items to a different type, returning a new stream of
/// the resulting type.
///
/// The provided closure is executed over all elements of this stream as
/// they are made available. It is executed inline with calls to
/// [`poll_next`](Stream::poll_next).
///
/// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the existing `map` methods in the
/// standard library.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
///
/// let stream = stream::iter(1..=3);
/// let mut stream = stream.map(|x| x + 3);
///
/// assert_eq!(stream.next().await, Some(4));
/// assert_eq!(stream.next().await, Some(5));
/// assert_eq!(stream.next().await, Some(6));
/// # }
/// ```
fn map<T, F>(self, f: F) -> Map<Self, F>
where
F: FnMut(Self::Item) -> T,
Self: Sized,
{
Map::new(self, f)
}
/// Combine two streams into one by interleaving the output of both as it
/// is produced.
///
/// Values are produced from the merged stream in the order they arrive from
/// the two source streams. If both source streams provide values
/// simultaneously, the merge stream alternates between them. This provides
/// some level of fairness.
///
/// The merged stream completes once **both** source streams complete. When
/// one source stream completes before the other, the merge stream
/// exclusively polls the remaining stream.
///
/// # Examples
///
/// ```
/// use tokio::stream::StreamExt;
/// use tokio::sync::mpsc;
/// use tokio::time;
///
/// use std::time::Duration;
///
/// # /*
/// #[tokio::main]
/// # */
/// # #[tokio::main(basic_scheduler)]
/// async fn main() {
/// # time::pause();
/// let (mut tx1, rx1) = mpsc::channel(10);
/// let (mut tx2, rx2) = mpsc::channel(10);
///
/// let mut rx = rx1.merge(rx2);
///
/// tokio::spawn(async move {
/// // Send some values immediately
/// tx1.send(1).await.unwrap();
/// tx1.send(2).await.unwrap();
///
/// // Let the other task send values
/// time::delay_for(Duration::from_millis(20)).await;
///
/// tx1.send(4).await.unwrap();
/// });
///
/// tokio::spawn(async move {
/// // Wait for the first task to send values
/// time::delay_for(Duration::from_millis(5)).await;
///
/// tx2.send(3).await.unwrap();
///
/// time::delay_for(Duration::from_millis(25)).await;
///
/// // Send the final value
/// tx2.send(5).await.unwrap();
/// });
///
/// assert_eq!(1, rx.next().await.unwrap());
/// assert_eq!(2, rx.next().await.unwrap());
/// assert_eq!(3, rx.next().await.unwrap());
/// assert_eq!(4, rx.next().await.unwrap());
/// assert_eq!(5, rx.next().await.unwrap());
///
/// // The merged stream is consumed
/// assert!(rx.next().await.is_none());
/// }
/// ```
fn merge<U>(self, other: U) -> Merge<Self, U>
where
U: Stream<Item = Self::Item>,
Self: Sized,
{
Merge::new(self, other)
}
/// Filters the values produced by this stream according to the provided
/// predicate.
///
/// As values of this stream are made available, the provided predicate `f`
/// will be run against them. If the predicate
/// resolves to `true`, then the stream will yield the value, but if the
/// predicate resolves to `false`, then the value
/// will be discarded and the next value will be produced.
///
/// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to [`Iterator::filter`] method in the
/// standard library.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
///
/// let stream = stream::iter(1..=8);
/// let mut evens = stream.filter(|x| x % 2 == 0);
///
/// assert_eq!(Some(2), evens.next().await);
/// assert_eq!(Some(4), evens.next().await);
/// assert_eq!(Some(6), evens.next().await);
/// assert_eq!(Some(8), evens.next().await);
/// assert_eq!(None, evens.next().await);
/// # }
/// ```
fn filter<F>(self, f: F) -> Filter<Self, F>
where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
{
Filter::new(self, f)
}
/// Filters the values produced by this stream while simultaneously mapping
/// them to a different type according to the provided closure.
///
/// As values of this stream are made available, the provided function will
/// be run on them. If the predicate `f` resolves to
/// [`Some(item)`](Some) then the stream will yield the value `item`, but if
/// it resolves to [`None`], then the value value will be skipped.
///
/// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to [`Iterator::filter_map`] method in the
/// standard library.
///
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
///
/// let stream = stream::iter(1..=8);
/// let mut evens = stream.filter_map(|x| {
/// if x % 2 == 0 { Some(x + 1) } else { None }
/// });
///
/// assert_eq!(Some(3), evens.next().await);
/// assert_eq!(Some(5), evens.next().await);
/// assert_eq!(Some(7), evens.next().await);
/// assert_eq!(Some(9), evens.next().await);
/// assert_eq!(None, evens.next().await);
/// # }
/// ```
fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
where
F: FnMut(Self::Item) -> Option<T>,
Self: Sized,
{
FilterMap::new(self, f)
}
/// Creates a stream which ends after the first `None`.
///
/// After a stream returns `None`, behavior is undefined. Future calls to
/// `poll_next` may or may not return `Some(T)` again or they may panic.
/// `fuse()` adapts a stream, ensuring that after `None` is given, it will
/// return `None` forever.
///
/// # Examples
///
/// ```
/// use tokio::stream::{Stream, StreamExt};
///
/// use std::pin::Pin;
/// use std::task::{Context, Poll};
///
/// // a stream which alternates between Some and None
/// struct Alternate {
/// state: i32,
/// }
///
/// impl Stream for Alternate {
/// type Item = i32;
///
/// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
/// let val = self.state;
/// self.state = self.state + 1;
///
/// // if it's even, Some(i32), else None
/// if val % 2 == 0 {
/// Poll::Ready(Some(val))
/// } else {
/// Poll::Ready(None)
/// }
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let mut stream = Alternate { state: 0 };
///
/// // the stream goes back and forth
/// assert_eq!(stream.next().await, Some(0));
/// assert_eq!(stream.next().await, None);
/// assert_eq!(stream.next().await, Some(2));
/// assert_eq!(stream.next().await, None);
///
/// // however, once it is fused
/// let mut stream = stream.fuse();
///
/// assert_eq!(stream.next().await, Some(4));
/// assert_eq!(stream.next().await, None);
///
/// // it will always return `None` after the first time.
/// assert_eq!(stream.next().await, None);
/// assert_eq!(stream.next().await, None);
/// assert_eq!(stream.next().await, None);
/// }
/// ```
fn fuse(self) -> Fuse<Self>
where
Self: Sized,
{
Fuse::new(self)
}
/// Creates a new stream of at most `n` items of the underlying stream.
///
/// Once `n` items have been yielded from this stream then it will always
/// return that the stream is done.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
///
/// let mut stream = stream::iter(1..=10).take(3);
///
/// assert_eq!(Some(1), stream.next().await);
/// assert_eq!(Some(2), stream.next().await);
/// assert_eq!(Some(3), stream.next().await);
/// assert_eq!(None, stream.next().await);
/// # }
/// ```
fn take(self, n: usize) -> Take<Self>
where
Self: Sized,
{
Take::new(self, n)
}
/// Take elements from this stream while the provided predicate
/// resolves to `true`.
///
/// This function, like `Iterator::take_while`, will take elements from the
/// stream until the predicate `f` resolves to `false`. Once one element
/// returns false it will always return that the stream is done.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
///
/// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
///
/// assert_eq!(Some(1), stream.next().await);
/// assert_eq!(Some(2), stream.next().await);
/// assert_eq!(Some(3), stream.next().await);
/// assert_eq!(None, stream.next().await);
/// # }
/// ```
fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
{
TakeWhile::new(self, f)
}
/// Creates a new stream that will skip the `n` first items of the
/// underlying stream.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
///
/// let mut stream = stream::iter(1..=10).skip(7);
///
/// assert_eq!(Some(8), stream.next().await);
/// assert_eq!(Some(9), stream.next().await);
/// assert_eq!(Some(10), stream.next().await);
/// assert_eq!(None, stream.next().await);
/// # }
/// ```
fn skip(self, n: usize) -> Skip<Self>
where
Self: Sized,
{
Skip::new(self, n)
}
/// Skip elements from the underlying stream while the provided predicate
/// resolves to `true`.
///
/// This function, like [`Iterator::skip_while`], will ignore elemets from the
/// stream until the predicate `f` resolves to `false`. Once one element
/// returns false, the rest of the elements will be yielded.
///
/// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
/// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
///
/// assert_eq!(Some(3), stream.next().await);
/// assert_eq!(Some(4), stream.next().await);
/// assert_eq!(Some(1), stream.next().await);
/// assert_eq!(None, stream.next().await);
/// # }
/// ```
fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
{
SkipWhile::new(self, f)
}
/// Tests if every element of the stream matches a predicate.
///
/// `all()` takes a closure that returns `true` or `false`. It applies
/// this closure to each element of the stream, and if they all return
/// `true`, then so does `all`. If any of them return `false`, it
/// returns `false`. An empty stream returns `true`.
///
/// `all()` is short-circuiting; in other words, it will stop processing
/// as soon as it finds a `false`, given that no matter what else happens,
/// the result will also be `false`.
///
/// An empty stream returns `true`.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
///
/// let a = [1, 2, 3];
///
/// assert!(stream::iter(&a).all(|&x| x > 0).await);
///
/// assert!(!stream::iter(&a).all(|&x| x > 2).await);
/// # }
/// ```
///
/// Stopping at the first `false`:
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
///
/// let a = [1, 2, 3];
///
/// let mut iter = stream::iter(&a);
///
/// assert!(!iter.all(|&x| x != 2).await);
///
/// // we can still use `iter`, as there are more elements.
/// assert_eq!(iter.next().await, Some(&3));
/// # }
/// ```
fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
where
Self: Unpin,
F: FnMut(Self::Item) -> bool,
{
AllFuture::new(self, f)
}
/// Tests if any element of the stream matches a predicate.
///
/// `any()` takes a closure that returns `true` or `false`. It applies
/// this closure to each element of the stream, and if any of them return
/// `true`, then so does `any()`. If they all return `false`, it
/// returns `false`.
///
/// `any()` is short-circuiting; in other words, it will stop processing
/// as soon as it finds a `true`, given that no matter what else happens,
/// the result will also be `true`.
///
/// An empty stream returns `false`.
///
/// Basic usage:
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
///
/// let a = [1, 2, 3];
///
/// assert!(stream::iter(&a).any(|&x| x > 0).await);
///
/// assert!(!stream::iter(&a).any(|&x| x > 5).await);
/// # }
/// ```
///
/// Stopping at the first `true`:
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
///
/// let a = [1, 2, 3];
///
/// let mut iter = stream::iter(&a);
///
/// assert!(iter.any(|&x| x != 2).await);
///
/// // we can still use `iter`, as there are more elements.
/// assert_eq!(iter.next().await, Some(&2));
/// # }
/// ```
fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
where
Self: Unpin,
F: FnMut(Self::Item) -> bool,
{
AnyFuture::new(self, f)
}
/// Combine two streams into one by first returning all values from the
/// first stream then all values from the second stream.
///
/// As long as `self` still has values to emit, no values from `other` are
/// emitted, even if some are ready.
///
/// # Examples
///
/// ```
/// use tokio::stream::{self, StreamExt};
///
/// #[tokio::main]
/// async fn main() {
/// let one = stream::iter(vec![1, 2, 3]);
/// let two = stream::iter(vec![4, 5, 6]);
///
/// let mut stream = one.chain(two);
///
/// assert_eq!(stream.next().await, Some(1));
/// assert_eq!(stream.next().await, Some(2));
/// assert_eq!(stream.next().await, Some(3));
/// assert_eq!(stream.next().await, Some(4));
/// assert_eq!(stream.next().await, Some(5));
/// assert_eq!(stream.next().await, Some(6));
/// assert_eq!(stream.next().await, None);
/// }
/// ```
fn chain<U>(self, other: U) -> Chain<Self, U>
where
U: Stream<Item = Self::Item>,
Self: Sized,
{
Chain::new(self, other)
}
/// A combinator that applies a function to every element in a stream
/// producing a single, final value.
///
/// # Examples
/// Basic usage:
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, *};
///
/// let s = stream::iter(vec![1u8, 2, 3]);
/// let sum = s.fold(0, |acc, x| acc + x).await;
///
/// assert_eq!(sum, 6);
/// # }
/// ```
fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
where
Self: Sized,
F: FnMut(B, Self::Item) -> B,
{
FoldFuture::new(self, init, f)
}
/// Drain stream pushing all emitted values into a collection.
///
/// `collect` streams all values, awaiting as needed. Values are pushed into
/// a collection. A number of different target collection types are
/// supported, including [`Vec`](std::vec::Vec),
/// [`String`](std::string::String), and [`Bytes`](bytes::Bytes).
///
/// # `Result`
///
/// `collect()` can also be used with streams of type `Result<T, E>` where
/// `T: FromStream<_>`. In this case, `collect()` will stream as long as
/// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
/// streaming is terminated and `collect()` returns the `Err`.
///
/// # Notes
///
/// `FromStream` is currently a sealed trait. Stabilization is pending
/// enhancements to the Rust langague.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// use tokio::stream::{self, StreamExt};
///
/// #[tokio::main]
/// async fn main() {
/// let doubled: Vec<i32> =
/// stream::iter(vec![1, 2, 3])
/// .map(|x| x * 2)
/// .collect()
/// .await;
///
/// assert_eq!(vec![2, 4, 6], doubled);
/// }
/// ```
///
/// Collecting a stream of `Result` values
///
/// ```
/// use tokio::stream::{self, StreamExt};
///
/// #[tokio::main]
/// async fn main() {
/// // A stream containing only `Ok` values will be collected
/// let values: Result<Vec<i32>, &str> =
/// stream::iter(vec![Ok(1), Ok(2), Ok(3)])
/// .collect()
/// .await;
///
/// assert_eq!(Ok(vec![1, 2, 3]), values);
///
/// // A stream containing `Err` values will return the first error.
/// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
///
/// let values: Result<Vec<i32>, &str> =
/// stream::iter(results)
/// .collect()
/// .await;
///
/// assert_eq!(Err("no"), values);
/// }
/// ```
fn collect<T>(self) -> Collect<Self, T>
where
T: FromStream<Self::Item>,
Self: Sized,
{
Collect::new(self)
}
/// Applies a per-item timeout to the passed stream.
///
/// `timeout()` takes a `Duration` that represents the maximum amount of
/// time each element of the stream has to complete before timing out.
///
/// If the wrapped stream yields a value before the deadline is reached, the
/// value is returned. Otherwise, an error is returned. The caller may decide
/// to continue consuming the stream and will eventually get the next source
/// stream value once it becomes available.
///
/// # Notes
///
/// This function consumes the stream passed into it and returns a
/// wrapped version of it.
///
/// Polling the returned stream will continue to poll the inner stream even
/// if one or more items time out.
///
/// # Examples
///
/// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
/// use std::time::Duration;
/// # let int_stream = stream::iter(1..=3);
///
/// let mut int_stream = int_stream.timeout(Duration::from_secs(1));
///
/// // When no items time out, we get the 3 elements in succession:
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // If the second item times out, we get an error and continue polling the stream:
/// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert!(int_stream.try_next().await.is_err());
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // If we want to stop consuming the source stream the first time an
/// // element times out, we can use the `take_while` operator:
/// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// let mut int_stream = int_stream.take_while(Result::is_ok);
///
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
/// # }
/// ```
#[cfg(all(feature = "time"))]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout(self, duration: Duration) -> Timeout<Self>
where
Self: Sized,
{
Timeout::new(self, duration)
}
}
impl<St: ?Sized> StreamExt for St where St: Stream {}
/// Merge the size hints from two streams.
fn merge_size_hints(
(left_low, left_high): (usize, Option<usize>),
(right_low, right_hign): (usize, Option<usize>),
) -> (usize, Option<usize>) {
let low = left_low.saturating_add(right_low);
let high = match (left_high, right_hign) {
(Some(h1), Some(h2)) => h1.checked_add(h2),
_ => None,
};
(low, high)
}