blob: 2f2bbfc7f2af695a0c599eda7f7b48198ee0b6fc [file] [log] [blame]
//! Combinators for the [`Stream`] trait.
//!
//! # Examples
//!
//! ```
//! use futures_lite::stream::{self, StreamExt};
//!
//! # spin_on::spin_on(async {
//! let mut s = stream::iter(vec![1, 2, 3]);
//!
//! assert_eq!(s.next().await, Some(1));
//! assert_eq!(s.next().await, Some(2));
//! assert_eq!(s.next().await, Some(3));
//! assert_eq!(s.next().await, None);
//! # });
//! ```
#[cfg(feature = "alloc")]
extern crate alloc;
#[doc(no_inline)]
pub use futures_core::stream::Stream;
#[cfg(feature = "alloc")]
use alloc::boxed::Box;
use core::fmt;
use core::future::Future;
use core::marker::PhantomData;
use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll};
use pin_project_lite::pin_project;
use crate::ready;
/// Wraps a stream in a blocking iterator.
///
/// # Examples
///
/// ```
/// use futures_lite::{pin, stream};
///
/// let stream = stream::once(7);
/// pin!(stream);
///
/// let mut iter = stream::block_on(stream);
/// assert_eq!(iter.next(), Some(7));
/// assert_eq!(iter.next(), None);
/// ```
#[cfg(feature = "std")]
pub fn block_on<S>(stream: S) -> BlockOn<S>
where
    S: Stream + Unpin,
{
    BlockOn(stream)
}
/// Iterator for the [`block_on()`] function.
#[derive(Debug)]
pub struct BlockOn<S>(S);

#[cfg(feature = "std")]
impl<S> Iterator for BlockOn<S>
where
    S: Stream + Unpin,
{
    type Item = S::Item;

    /// Runs the wrapped stream's `next()` future through `crate::future::block_on`
    /// and returns its output.
    fn next(&mut self) -> Option<Self::Item> {
        let item_future = self.0.next();
        crate::future::block_on(item_future)
    }
}
/// Creates a stream that contains no items.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::empty::<i32>();
/// assert_eq!(s.next().await, None);
/// # })
/// ```
pub fn empty<T>() -> Empty<T> {
    let _marker = PhantomData;
    Empty { _marker }
}
/// Stream for the [`empty()`] function.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Empty<T> {
_marker: PhantomData<T>,
}
impl<T> Unpin for Empty<T> {}
impl<T> Stream for Empty<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(None)
}
fn size_hint(&self) -> (usize, Option<usize>) {
(0, Some(0))
}
}
/// Creates a stream that yields the items of an iterator.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![1, 2]);
///
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, None);
/// # })
/// ```
pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
    // Convert eagerly so the stream stores the concrete iterator type.
    let iter = iter.into_iter();
    Iter { iter }
}
/// Stream for the [`iter()`] function.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Iter<I> {
    iter: I,
}

impl<I> Unpin for Iter<I> {}

impl<I: Iterator> Stream for Iter<I> {
    type Item = I::Item;

    /// Pulls the next item from the wrapped iterator; the result is always ready.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `Iter` is unconditionally `Unpin`, so the pin can be unwrapped safely.
        let this = self.get_mut();
        let next_item = this.iter.next();
        Poll::Ready(next_item)
    }

    /// Forwards the wrapped iterator's size hint.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let inner = &self.iter;
        inner.size_hint()
    }
}
/// Creates a stream that yields exactly one item.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::once(7);
///
/// assert_eq!(s.next().await, Some(7));
/// assert_eq!(s.next().await, None);
/// # })
/// ```
pub fn once<T>(t: T) -> Once<T> {
    let value = Some(t);
    Once { value }
}
pin_project! {
    /// Stream for the [`once()`] function.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Once<T> {
        value: Option<T>,
    }
}

impl<T> Stream for Once<T> {
    type Item = T;

    /// Takes the stored value out, leaving `None` behind for later polls.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<T>> {
        let this = self.project();
        Poll::Ready(this.value.take())
    }

    /// Exactly one item remains while the value is still stored, otherwise zero.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match &self.value {
            Some(_) => (1, Some(1)),
            None => (0, Some(0)),
        }
    }
}
/// Creates a stream that never becomes ready.
///
/// # Examples
///
/// ```no_run
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::pending::<i32>();
/// s.next().await;
/// unreachable!();
/// # })
/// ```
pub fn pending<T>() -> Pending<T> {
    let _marker = PhantomData;
    Pending { _marker }
}
/// Stream for the [`pending()`] function.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Pending<T> {
_marker: PhantomData<T>,
}
impl<T> Unpin for Pending<T> {}
impl<T> Stream for Pending<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
Poll::Pending
}
fn size_hint(&self) -> (usize, Option<usize>) {
(0, Some(0))
}
}
/// Creates a stream whose polling is delegated to a function returning [`Poll`].
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
/// use std::task::{Context, Poll};
///
/// # spin_on::spin_on(async {
/// fn f(_: &mut Context<'_>) -> Poll<Option<i32>> {
///     Poll::Ready(Some(7))
/// }
///
/// assert_eq!(stream::poll_fn(f).next().await, Some(7));
/// # })
/// ```
pub fn poll_fn<T, F: FnMut(&mut Context<'_>) -> Poll<Option<T>>>(f: F) -> PollFn<F> {
    PollFn { f }
}
/// Stream for the [`poll_fn()`] function.
#[derive(Clone)]
#[must_use = "streams do nothing unless polled"]
pub struct PollFn<F> {
    f: F,
}

impl<F> Unpin for PollFn<F> {}

impl<F> fmt::Debug for PollFn<F> {
    // Written by hand so that `F` needs no `Debug` bound; only the type name is printed.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = formatter.debug_struct("PollFn");
        out.finish()
    }
}

impl<T, F> Stream for PollFn<F>
where
    F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
{
    type Item = T;

    /// Calls the stored function with the task context and returns its result unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // `PollFn` is unconditionally `Unpin`, so the pin can be unwrapped safely.
        let this = self.get_mut();
        (this.f)(cx)
    }
}
/// Creates a never-ending stream that yields clones of one item.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::repeat(7);
///
/// assert_eq!(s.next().await, Some(7));
/// assert_eq!(s.next().await, Some(7));
/// # })
/// ```
pub fn repeat<T>(item: T) -> Repeat<T>
where
    T: Clone,
{
    Repeat { item }
}
/// Stream for the [`repeat()`] function.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Repeat<T> {
    item: T,
}

impl<T> Unpin for Repeat<T> {}

impl<T: Clone> Stream for Repeat<T> {
    type Item = T;

    /// Yields a fresh clone of the stored item on every poll.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let copy = self.item.clone();
        Poll::Ready(Some(copy))
    }

    /// Reports the largest possible lower bound and no upper bound.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let lower = usize::max_value();
        (lower, None)
    }
}
/// Creates a never-ending stream whose items are produced by calling a closure.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::repeat_with(|| 7);
///
/// assert_eq!(s.next().await, Some(7));
/// assert_eq!(s.next().await, Some(7));
/// # })
/// ```
pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
where
    F: FnMut() -> T,
{
    let f = repeater;
    RepeatWith { f }
}
/// Stream for the [`repeat_with()`] function.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct RepeatWith<F> {
    f: F,
}

impl<F> Unpin for RepeatWith<F> {}

impl<T, F> Stream for RepeatWith<F>
where
    F: FnMut() -> T,
{
    type Item = T;

    /// Calls the stored closure once per poll and yields whatever it returns.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `RepeatWith` is unconditionally `Unpin`, so the pin can be unwrapped safely.
        let this = self.get_mut();
        Poll::Ready(Some((this.f)()))
    }

    /// Reports the largest possible lower bound and no upper bound.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let lower = usize::max_value();
        (lower, None)
    }
}
/// Creates a stream by repeatedly feeding a state value to an async closure.
///
/// The closure receives the current state and resolves to either `Some((item, next_state))`
/// or `None`, which ends the stream.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::unfold(0, |mut n| async move {
///     if n < 2 {
///         let m = n + 1;
///         Some((n, m))
///     } else {
///         None
///     }
/// });
///
/// let v: Vec<i32> = s.collect().await;
/// assert_eq!(v, [0, 1]);
/// # })
/// ```
pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
where
    F: FnMut(T) -> Fut,
    Fut: Future<Output = Option<(Item, T)>>,
{
    // Start with the seed stored and no future in flight.
    let state = Some(seed);
    Unfold {
        f,
        state,
        fut: None,
    }
}
pin_project! {
    /// Stream for the [`unfold()`] function.
    #[derive(Clone)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Unfold<T, F, Fut> {
        f: F,
        state: Option<T>,
        #[pin]
        fut: Option<Fut>,
    }
}

impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
where
    T: fmt::Debug,
    Fut: fmt::Debug,
{
    // The closure `f` is left out of the output; `F` carries no `Debug` bound.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = formatter.debug_struct("Unfold");
        out.field("state", &self.state);
        out.field("fut", &self.fut);
        out.finish()
    }
}

impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
where
    F: FnMut(T) -> Fut,
    Fut: Future<Output = Option<(Item, T)>>,
{
    type Item = Item;

    /// Drives the current step of the unfold.
    ///
    /// # Panics
    ///
    /// Panics if polled when neither a state nor a future is stored, which is the situation
    /// after the stream has returned `Poll::Ready(None)`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // A stored state means the next future has not been created yet: create it now.
        if let Some(seed) = this.state.take() {
            let next_fut = (this.f)(seed);
            this.fut.set(Some(next_fut));
        }

        let in_flight = this
            .fut
            .as_mut()
            .as_pin_mut()
            .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`");
        let step = ready!(in_flight.poll(cx));

        // The future has completed; drop it before deciding what to yield.
        this.fut.set(None);

        match step {
            Some((item, next_state)) => {
                *this.state = Some(next_state);
                Poll::Ready(Some(item))
            }
            None => Poll::Ready(None),
        }
    }
}
/// Creates a stream by repeatedly feeding a state value to a fallible async closure.
///
/// The closure receives the current state and resolves to `Ok(Some((item, next_state)))`,
/// `Ok(None)`, or `Err(e)`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::try_unfold(0, |mut n| async move {
///     if n < 2 {
///         let m = n + 1;
///         Ok(Some((n, m)))
///     } else {
///         std::io::Result::Ok(None)
///     }
/// });
///
/// let v: Vec<i32> = s.try_collect().await?;
/// assert_eq!(v, [0, 1]);
/// # std::io::Result::Ok(()) });
/// ```
pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
where
    F: FnMut(T) -> Fut,
    Fut: Future<Output = Result<Option<(Item, T)>, E>>,
{
    // Start with the initial state stored and no future in flight.
    let state = Some(init);
    TryUnfold {
        f,
        state,
        fut: None,
    }
}
pin_project! {
    /// Stream for the [`try_unfold()`] function.
    #[derive(Clone)]
    #[must_use = "streams do nothing unless polled"]
    pub struct TryUnfold<T, F, Fut> {
        f: F,
        state: Option<T>,
        #[pin]
        fut: Option<Fut>,
    }
}

impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
where
    T: fmt::Debug,
    Fut: fmt::Debug,
{
    // The closure `f` is left out of the output; `F` carries no `Debug` bound.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = formatter.debug_struct("TryUnfold");
        out.field("state", &self.state);
        out.field("fut", &self.fut);
        out.finish()
    }
}

impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
where
    F: FnMut(T) -> Fut,
    Fut: Future<Output = Result<Option<(Item, T)>, E>>,
{
    type Item = Result<Item, E>;

    /// Drives the current step of the unfold.
    ///
    /// Once a step has resolved to `Ok(None)` or `Err(_)`, no state is stored any more and
    /// every later poll returns `Poll::Ready(None)`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // A stored state means the next future has not been created yet: create it now.
        if let Some(seed) = this.state.take() {
            let next_fut = (this.f)(seed);
            this.fut.set(Some(next_fut));
        }

        let in_flight = match this.fut.as_mut().as_pin_mut() {
            Some(future) => future,
            // Neither state nor future is left: an earlier step ended the stream or errored.
            None => return Poll::Ready(None),
        };
        let step = ready!(in_flight.poll(cx));

        // The future has completed; drop it before deciding what to yield.
        this.fut.set(None);

        match step {
            Ok(Some((item, next_state))) => {
                *this.state = Some(next_state);
                Poll::Ready(Some(Ok(item)))
            }
            Ok(None) => Poll::Ready(None),
            Err(e) => Poll::Ready(Some(Err(e))),
        }
    }
}
/// Extension trait for [`Stream`].
pub trait StreamExt: Stream {
/// A convenience for calling [`Stream::poll_next()`] on [`Unpin`] types.
///
/// Takes `&mut self` and pins it internally, so the caller does not have to build the
/// `Pin` by hand.
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
where
    Self: Unpin,
{
    let pinned: Pin<&mut Self> = Pin::new(self);
    Stream::poll_next(pinned, cx)
}
/// Retrieves the next item in the stream.
///
/// Returns [`None`] when iteration is finished. Stream implementations may choose to or not to
/// resume iteration after that.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(1..=3);
///
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, Some(3));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
fn next(&mut self) -> NextFuture<'_, Self>
where
    Self: Unpin,
{
    // The returned future holds only a mutable borrow of the stream.
    let stream = self;
    NextFuture { stream }
}
/// Retrieves the next item in the stream.
///
/// This is similar to the [`next()`][`StreamExt::next()`] method, but returns
/// `Result<Option<T>, E>` rather than `Option<Result<T, E>>`.
///
/// Note that `s.try_next().await` is equivalent to `s.next().await.transpose()`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![Ok(1), Ok(2), Err("error")]);
///
/// assert_eq!(s.try_next().await, Ok(Some(1)));
/// assert_eq!(s.try_next().await, Ok(Some(2)));
/// assert_eq!(s.try_next().await, Err("error"));
/// assert_eq!(s.try_next().await, Ok(None));
/// # });
/// ```
fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
where
    Self: Stream<Item = Result<T, E>> + Unpin,
{
    // The returned future holds only a mutable borrow of the stream.
    let stream = self;
    TryNextFuture { stream }
}
/// Counts the number of items in the stream.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s1 = stream::iter(vec![0]);
/// let s2 = stream::iter(vec![1, 2, 3]);
///
/// assert_eq!(s1.count().await, 1);
/// assert_eq!(s2.count().await, 3);
/// # });
/// ```
fn count(self) -> CountFuture<Self>
where
    Self: Sized,
{
    // The tally starts at zero.
    let count = 0;
    CountFuture {
        stream: self,
        count,
    }
}
/// Maps items of the stream to new values using a closure.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![1, 2, 3]);
/// let mut s = s.map(|x| 2 * x);
///
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, Some(4));
/// assert_eq!(s.next().await, Some(6));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
fn map<T, F>(self, f: F) -> Map<Self, F>
where
    Self: Sized,
    F: FnMut(Self::Item) -> T,
{
    // Bundle the source stream with the mapping closure.
    let stream = self;
    Map { stream, f }
}
/// Maps items to streams and then concatenates them.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let words = stream::iter(vec!["one", "two"]);
///
/// let s: String = words
/// .flat_map(|s| stream::iter(s.chars()))
/// .collect()
/// .await;
///
/// assert_eq!(s, "onetwo");
/// # });
/// ```
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
where
    Self: Sized,
    U: Stream,
    F: FnMut(Self::Item) -> U,
{
    // Map each item to its inner stream first; no inner stream is active yet.
    let mapped = self.map(f);
    FlatMap {
        stream: mapped,
        inner_stream: None,
    }
}
/// Concatenates inner streams.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s1 = stream::iter(vec![1, 2, 3]);
/// let s2 = stream::iter(vec![4, 5]);
///
/// let s = stream::iter(vec![s1, s2]);
/// let v: Vec<_> = s.flatten().collect().await;
/// assert_eq!(v, [1, 2, 3, 4, 5]);
/// # });
/// ```
fn flatten(self) -> Flatten<Self>
where
    Self: Sized,
    Self::Item: Stream,
{
    // No inner stream is active at construction time.
    let inner_stream = None;
    Flatten {
        stream: self,
        inner_stream,
    }
}
/// Maps items of the stream to new values using an async closure.
///
/// # Examples
///
/// ```
/// use futures_lite::pin;
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![1, 2, 3]);
/// let mut s = s.then(|x| async move { 2 * x });
///
/// pin!(s);
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, Some(4));
/// assert_eq!(s.next().await, Some(6));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
where
    Self: Sized,
    F: FnMut(Self::Item) -> Fut,
    Fut: Future,
{
    // No future is stored at construction time.
    let future = None;
    Then {
        stream: self,
        future,
        f,
    }
}
/// Keeps items of the stream for which `predicate` returns `true`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![1, 2, 3, 4]);
/// let mut s = s.filter(|i| i % 2 == 0);
///
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, Some(4));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
fn filter<P>(self, predicate: P) -> Filter<Self, P>
where
    Self: Sized,
    P: FnMut(&Self::Item) -> bool,
{
    // Bundle the source stream with the predicate.
    let stream = self;
    Filter { stream, predicate }
}
/// Filters and maps items of the stream using a closure.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec!["1", "lol", "3", "NaN", "5"]);
/// let mut s = s.filter_map(|a| a.parse::<u32>().ok());
///
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(3));
/// assert_eq!(s.next().await, Some(5));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
where
    Self: Sized,
    F: FnMut(Self::Item) -> Option<T>,
{
    // Bundle the source stream with the filtering/mapping closure.
    let stream = self;
    FilterMap { stream, f }
}
/// Takes only the first `n` items of the stream.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::repeat(7).take(2);
///
/// assert_eq!(s.next().await, Some(7));
/// assert_eq!(s.next().await, Some(7));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
fn take(self, n: usize) -> Take<Self>
where
    Self: Sized,
{
    // Store the limit `n` next to the source stream.
    let stream = self;
    Take { stream, n }
}
/// Takes items while `predicate` returns `true`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![1, 2, 3, 4]);
/// let mut s = s.take_while(|x| *x < 3);
///
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
where
    Self: Sized,
    P: FnMut(&Self::Item) -> bool,
{
    // Bundle the source stream with the predicate.
    let stream = self;
    TakeWhile { stream, predicate }
}
/// Skips the first `n` items of the stream.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![1, 2, 3]);
/// let mut s = s.skip(2);
///
/// assert_eq!(s.next().await, Some(3));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
fn skip(self, n: usize) -> Skip<Self>
where
    Self: Sized,
{
    // Store the skip count `n` next to the source stream.
    let stream = self;
    Skip { stream, n }
}
/// Skips items while `predicate` returns `true`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![-1i32, 0, 1]);
/// let mut s = s.skip_while(|x| x.is_negative());
///
/// assert_eq!(s.next().await, Some(0));
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
where
    Self: Sized,
    P: FnMut(&Self::Item) -> bool,
{
    // The predicate is stored inside an `Option`.
    let predicate = Some(predicate);
    SkipWhile {
        stream: self,
        predicate,
    }
}
/// Yields every `step`th item.
///
/// # Panics
///
/// This method will panic if the `step` is 0.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![0, 1, 2, 3, 4]);
/// let mut s = s.step_by(2);
///
/// assert_eq!(s.next().await, Some(0));
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, Some(4));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
fn step_by(self, step: usize) -> StepBy<Self>
where
    Self: Sized,
{
    // A zero step is rejected up front, before the adapter is built.
    assert!(step > 0, "`step` must be greater than zero");

    let stream = self;
    StepBy { stream, step, i: 0 }
}
/// Appends another stream to the end of this one.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s1 = stream::iter(vec![1, 2]);
/// let s2 = stream::iter(vec![7, 8]);
/// let mut s = s1.chain(s2);
///
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, Some(7));
/// assert_eq!(s.next().await, Some(8));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
fn chain<U>(self, other: U) -> Chain<Self, U>
where
    Self: Sized,
    U: Stream<Item = Self::Item> + Sized,
{
    // Both halves are fused before being stored.
    let first = self.fuse();
    let second = other.fuse();
    Chain { first, second }
}
/// Clones all items.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![&1, &2]);
/// let mut s = s.cloned();
///
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
fn cloned<'a, T>(self) -> Cloned<Self>
where
    Self: Stream<Item = &'a T> + Sized,
    T: Clone + 'a,
{
    // The adapter only needs the source stream.
    let stream = self;
    Cloned { stream }
}
/// Copies all items.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![&1, &2]);
/// let mut s = s.copied();
///
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
fn copied<'a, T>(self) -> Copied<Self>
where
    Self: Stream<Item = &'a T> + Sized,
    T: Copy + 'a,
{
    // The adapter only needs the source stream.
    let stream = self;
    Copied { stream }
}
/// Collects all items in the stream into a collection.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(1..=3);
///
/// let items: Vec<_> = s.collect().await;
/// assert_eq!(items, [1, 2, 3]);
/// # });
/// ```
fn collect<C>(self) -> CollectFuture<Self, C>
where
    Self: Sized,
    C: Default + Extend<Self::Item>,
{
    // Begin with the collection's default value.
    let collection = Default::default();
    CollectFuture {
        stream: self,
        collection,
    }
}
/// Collects the `Ok` items of a fallible stream into a collection.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![Ok(1), Err(2), Ok(3)]);
/// let res: Result<Vec<i32>, i32> = s.try_collect().await;
/// assert_eq!(res, Err(2));
///
/// let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
/// let res: Result<Vec<i32>, i32> = s.try_collect().await;
/// assert_eq!(res, Ok(vec![1, 2, 3]));
/// # })
/// ```
fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
where
    Self: Stream<Item = Result<T, E>> + Sized,
    C: Default + Extend<T>,
{
    // Begin with the collection's default value.
    let items = Default::default();
    TryCollectFuture {
        stream: self,
        items,
    }
}
/// Partitions items into those for which `predicate` is `true` and those for which it is
/// `false`, and then collects them into two collections.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![1, 2, 3]);
/// let (even, odd): (Vec<_>, Vec<_>) = s.partition(|&n| n % 2 == 0).await;
///
/// assert_eq!(even, &[2]);
/// assert_eq!(odd, &[1, 3]);
/// # })
/// ```
fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
where
    Self: Sized,
    B: Default + Extend<Self::Item>,
    P: FnMut(&Self::Item) -> bool,
{
    // The result slot starts out holding default values, wrapped in an `Option`.
    let res = Some(Default::default());
    PartitionFuture {
        stream: self,
        predicate,
        res,
    }
}
/// Accumulates a computation over the stream.
///
/// The computation begins with the accumulator value set to `init`, and then applies `f` to
/// the accumulator and each item in the stream. The final accumulator value is returned.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![1, 2, 3]);
/// let sum = s.fold(0, |acc, x| acc + x).await;
///
/// assert_eq!(sum, 6);
/// # })
/// ```
fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
where
    Self: Sized,
    F: FnMut(T, Self::Item) -> T,
{
    // Seed the accumulator with `init`; it is stored inside an `Option`.
    let acc = Some(init);
    FoldFuture {
        stream: self,
        f,
        acc,
    }
}
/// Accumulates a fallible computation over the stream.
///
/// The computation begins with the accumulator value set to `init`, and then applies `f` to
/// the accumulator and each item in the stream. The final accumulator value is returned, or an
/// error if `f` failed the computation.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
///
/// let sum = s.try_fold(0, |acc, v| {
/// if (acc + v) % 2 == 1 {
/// Ok(acc + v)
/// } else {
/// Err("fail")
/// }
/// })
/// .await;
///
/// assert_eq!(sum, Err("fail"));
/// # })
/// ```
fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
where
    Self: Stream<Item = Result<T, E>> + Unpin + Sized,
    F: FnMut(B, T) -> Result<B, E>,
{
    // Seed the accumulator with `init`; the stream itself is only borrowed.
    let acc = Some(init);
    TryFoldFuture {
        stream: self,
        f,
        acc,
    }
}
/// Maps items of the stream to new values using a state value and a closure.
///
/// Scanning begins with the initial state set to `initial_state`, and then applies `f` to the
/// state and each item in the stream. The stream stops when `f` returns `None`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![1, 2, 3]);
/// let mut s = s.scan(1, |state, x| {
///     *state = *state * x;
///     Some(-*state)
/// });
///
/// assert_eq!(s.next().await, Some(-1));
/// assert_eq!(s.next().await, Some(-2));
/// assert_eq!(s.next().await, Some(-6));
/// assert_eq!(s.next().await, None);
/// # })
/// ```
fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
where
    Self: Sized,
    F: FnMut(&mut St, Self::Item) -> Option<B>,
{
    // The state and the closure are stored together as one tuple field.
    let state_f = (initial_state, f);
    Scan {
        stream: self,
        state_f,
    }
}
/// Fuses the stream so that it stops yielding items after the first [`None`].
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::once(1).fuse();
///
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, None);
/// assert_eq!(s.next().await, None);
/// # })
/// ```
fn fuse(self) -> Fuse<Self>
where
    Self: Sized,
{
    // The `done` flag starts out cleared.
    let done = false;
    Fuse { stream: self, done }
}
/// Repeats the stream from beginning to end, forever.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![1, 2]).cycle();
///
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(2));
/// # });
/// ```
fn cycle(self) -> Cycle<Self>
where
    Self: Clone + Sized,
{
    // Clone first: the copy is kept as `orig`, while `self` becomes the active stream.
    let orig = self.clone();
    Cycle { orig, stream: self }
}
/// Enumerates items, mapping them to `(index, item)`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec!['a', 'b', 'c']);
/// let mut s = s.enumerate();
///
/// assert_eq!(s.next().await, Some((0, 'a')));
/// assert_eq!(s.next().await, Some((1, 'b')));
/// assert_eq!(s.next().await, Some((2, 'c')));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
fn enumerate(self) -> Enumerate<Self>
where
    Self: Sized,
{
    // The index counter starts at zero.
    let i = 0;
    Enumerate { stream: self, i }
}
/// Calls a closure on each item and passes it on.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![1, 2, 3, 4, 5]);
///
/// let sum = s
/// .inspect(|x| println!("about to filter {}", x))
/// .filter(|x| x % 2 == 0)
/// .inspect(|x| println!("made it through filter: {}", x))
/// .fold(0, |sum, i| sum + i)
/// .await;
/// # });
/// ```
fn inspect<F>(self, f: F) -> Inspect<Self, F>
where
    Self: Sized,
    F: FnMut(&Self::Item),
{
    // Bundle the source stream with the inspecting closure.
    let stream = self;
    Inspect { stream, f }
}
/// Gets the `n`th item of the stream.
///
/// In the end, `n+1` items of the stream will be consumed.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7]);
///
/// assert_eq!(s.nth(2).await, Some(2));
/// assert_eq!(s.nth(2).await, Some(5));
/// assert_eq!(s.nth(2).await, None);
/// # });
/// ```
fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
where
    Self: Unpin,
{
    // The returned future holds only a mutable borrow of the stream.
    let stream = self;
    NthFuture { stream, n }
}
/// Returns the last item in the stream.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![1, 2, 3, 4]);
/// assert_eq!(s.last().await, Some(4));
///
/// let s = stream::empty::<i32>();
/// assert_eq!(s.last().await, None);
/// # });
/// ```
fn last(self) -> LastFuture<Self>
where
    Self: Sized,
{
    // No item has been seen yet.
    let last = None;
    LastFuture { stream: self, last }
}
/// Finds the first item of the stream for which `predicate` returns `true`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![11, 12, 13, 14]);
///
/// assert_eq!(s.find(|x| *x % 2 == 0).await, Some(12));
/// assert_eq!(s.next().await, Some(13));
/// # });
/// ```
fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
where
    Self: Unpin,
    P: FnMut(&Self::Item) -> bool,
{
    // The returned future holds only a mutable borrow of the stream.
    let stream = self;
    FindFuture { stream, predicate }
}
/// Applies a closure to items in the stream and returns the first [`Some`] result.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec!["lol", "NaN", "2", "5"]);
/// let number = s.find_map(|s| s.parse().ok()).await;
///
/// assert_eq!(number, Some(2));
/// # });
/// ```
fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
where
    Self: Unpin,
    F: FnMut(Self::Item) -> Option<B>,
{
    // The returned future holds only a mutable borrow of the stream.
    let stream = self;
    FindMapFuture { stream, f }
}
/// Finds the index of the first item of the stream for which `predicate` returns `true`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5]);
///
/// assert_eq!(s.position(|x| x == 2).await, Some(2));
/// assert_eq!(s.position(|x| x == 3).await, Some(0));
/// assert_eq!(s.position(|x| x == 9).await, None);
/// # });
/// ```
fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
where
    Self: Unpin,
    P: FnMut(Self::Item) -> bool,
{
    // The index counter starts at zero for every call.
    let index = 0;
    PositionFuture {
        stream: self,
        predicate,
        index,
    }
}
/// Tests if `predicate` returns `true` for all items in the stream.
///
/// The result is `true` for an empty stream.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![1, 2, 3]);
/// assert!(!s.all(|x| x % 2 == 0).await);
///
/// let mut s = stream::iter(vec![2, 4, 6, 8]);
/// assert!(s.all(|x| x % 2 == 0).await);
///
/// let mut s = stream::empty::<i32>();
/// assert!(s.all(|x| x % 2 == 0).await);
/// # });
/// ```
fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
where
    Self: Unpin,
    P: FnMut(Self::Item) -> bool,
{
    // The returned future holds only a mutable borrow of the stream.
    let stream = self;
    AllFuture { stream, predicate }
}
/// Tests if `predicate` returns `true` for any item in the stream.
///
/// The result is `false` for an empty stream.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![1, 3, 5, 7]);
/// assert!(!s.any(|x| x % 2 == 0).await);
///
/// let mut s = stream::iter(vec![1, 2, 3]);
/// assert!(s.any(|x| x % 2 == 0).await);
///
/// let mut s = stream::empty::<i32>();
/// assert!(!s.any(|x| x % 2 == 0).await);
/// # });
/// ```
fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
where
    Self: Unpin,
    P: FnMut(Self::Item) -> bool,
{
    // The returned future holds only a mutable borrow of the stream.
    let stream = self;
    AnyFuture { stream, predicate }
}
/// Calls a closure on each item of the stream.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![1, 2, 3]);
/// s.for_each(|s| println!("{}", s)).await;
/// # });
/// ```
fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
where
    Self: Sized,
    F: FnMut(Self::Item),
{
    // Bundle the source stream with the per-item closure.
    let stream = self;
    ForEachFuture { stream, f }
}
/// Calls a fallible closure on each item of the stream, stopping on first error.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![0, 1, 2, 3]);
///
/// let mut v = vec![];
/// let res = s
/// .try_for_each(|n| {
/// if n < 2 {
/// v.push(n);
/// Ok(())
/// } else {
/// Err("too big")
/// }
/// })
/// .await;
///
/// assert_eq!(v, &[0, 1]);
/// assert_eq!(res, Err("too big"));
/// # });
/// ```
fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
where
    Self: Unpin,
    F: FnMut(Self::Item) -> Result<(), E>,
{
    // The returned future holds only a mutable borrow of the stream.
    let stream = self;
    TryForEachFuture { stream, f }
}
/// Pairs up the items of this stream with the items of `other`.
///
/// The combined stream ends as soon as either of the two input streams ends.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let l = stream::iter(vec![1, 2, 3]);
/// let r = stream::iter(vec![4, 5, 6, 7]);
/// let mut s = l.zip(r);
///
/// assert_eq!(s.next().await, Some((1, 4)));
/// assert_eq!(s.next().await, Some((2, 5)));
/// assert_eq!(s.next().await, Some((3, 6)));
/// assert_eq!(s.next().await, None);
/// # });
/// ```
fn zip<U>(self, other: U) -> Zip<Self, U>
where
    Self: Sized,
    U: Stream,
{
    // No item from `self` is buffered yet, so the slot starts out empty.
    Zip {
        first: self,
        second: other,
        item_slot: None,
    }
}
/// Splits a stream of pairs into two collections.
///
/// The first element of every pair is added to the first collection and the
/// second element to the second collection. Both collections start out as their
/// `Default` value.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let s = stream::iter(vec![(1, 2), (3, 4)]);
/// let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
///
/// assert_eq!(left, [1, 3]);
/// assert_eq!(right, [2, 4]);
/// # });
/// ```
fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
where
    FromA: Default + Extend<A>,
    FromB: Default + Extend<B>,
    Self: Stream<Item = (A, B)> + Sized,
{
    let empty = (FromA::default(), FromB::default());
    UnzipFuture {
        stream: self,
        res: Some(empty),
    }
}
/// Merges this stream with `other`, giving priority to `self`.
///
/// Whenever both streams have an item ready, the item from `self` is the one
/// that gets yielded.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
/// use futures_lite::stream::{once, pending};
///
/// # spin_on::spin_on(async {
/// assert_eq!(once(1).or(pending()).next().await, Some(1));
/// assert_eq!(pending().or(once(2)).next().await, Some(2));
///
/// // The first stream wins.
/// assert_eq!(once(1).or(once(2)).next().await, Some(1));
/// # })
/// ```
fn or<S>(self, other: S) -> Or<Self, S>
where
    Self: Sized,
    S: Stream<Item = Self::Item>,
{
    let (stream1, stream2) = (self, other);
    Or { stream1, stream2 }
}
/// Merges this stream with `other` without favoring either side.
///
/// When both streams have an item ready, which one is yielded is picked at
/// random.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
/// use futures_lite::stream::{once, pending};
///
/// # spin_on::spin_on(async {
/// assert_eq!(once(1).race(pending()).next().await, Some(1));
/// assert_eq!(pending().race(once(2)).next().await, Some(2));
///
/// // One of the two stream is randomly chosen as the winner.
/// let res = once(1).race(once(2)).next().await;
/// # })
/// ```
#[cfg(feature = "std")]
fn race<S>(self, other: S) -> Race<Self, S>
where
    Self: Sized,
    S: Stream<Item = Self::Item>,
{
    let (stream1, stream2) = (self, other);
    Race { stream1, stream2 }
}
/// Moves the stream onto the heap and erases its concrete type, producing a
/// pinned `dyn Stream + Send + 'a` trait object.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let a = stream::once(1);
/// let b = stream::empty();
///
/// // Streams of different types can be stored in
/// // the same collection when they are boxed:
/// let streams = vec![a.boxed(), b.boxed()];
/// # })
/// ```
#[cfg(feature = "alloc")]
fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
where
    Self: Send + Sized + 'a,
{
    // The type annotation performs the unsizing coercion to the trait object.
    let erased: Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>> = Box::pin(self);
    erased
}
/// Moves the stream onto the heap and erases its concrete type, producing a
/// pinned `dyn Stream + 'a` trait object.
///
/// Unlike [`boxed()`][`StreamExt::boxed()`], this does not require the stream
/// to be `Send`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// # spin_on::spin_on(async {
/// let a = stream::once(1);
/// let b = stream::empty();
///
/// // Streams of different types can be stored in
/// // the same collection when they are boxed:
/// let streams = vec![a.boxed_local(), b.boxed_local()];
/// # })
/// ```
#[cfg(feature = "alloc")]
fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
where
    Self: Sized + 'a,
{
    // The type annotation performs the unsizing coercion to the trait object.
    let erased: Pin<Box<dyn Stream<Item = Self::Item> + 'a>> = Box::pin(self);
    erased
}
}
// Blanket implementation: every `Stream`, including unsized ones, gets the
// extension methods.
impl<S> StreamExt for S where S: Stream + ?Sized {}
/// Type alias for `Pin<Box<dyn Stream<Item = T> + Send + 'static>>`.
///
/// This is a heap-allocated, pinned, type-erased stream of `T` items that is
/// `Send` and has the `'static` lifetime.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// // These two lines are equivalent:
/// let s1: stream::Boxed<i32> = stream::once(7).boxed();
/// let s2: stream::Boxed<i32> = Box::pin(stream::once(7));
/// ```
#[cfg(feature = "alloc")]
pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
/// Type alias for `Pin<Box<dyn Stream<Item = T> + 'static>>`.
///
/// This is a heap-allocated, pinned, type-erased stream of `T` items with the
/// `'static` lifetime. Unlike [`Boxed`], it carries no `Send` bound.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// // These two lines are equivalent:
/// let s1: stream::BoxedLocal<i32> = stream::once(7).boxed_local();
/// let s2: stream::BoxedLocal<i32> = Box::pin(stream::once(7));
/// ```
#[cfg(feature = "alloc")]
pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
/// Future for the [`StreamExt::next()`] method.
///
/// Resolves to the next item of the borrowed stream, or `None` when the stream
/// has ended.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct NextFuture<'a, S: ?Sized> {
    // The stream the item is pulled from.
    stream: &'a mut S,
}

impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {}

impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
    type Output = Option<S::Item>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // A single poll of the underlying stream is all this future does.
        let this = self.get_mut();
        this.stream.poll_next(cx)
    }
}
/// Future for the [`StreamExt::try_next()`] method.
///
/// Pulls one `Result` item from the borrowed stream and turns the
/// `Option<Result<T, E>>` into a `Result<Option<T>, E>`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryNextFuture<'a, S: ?Sized> {
    // The stream the item is pulled from.
    stream: &'a mut S,
}

impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {}

impl<T, E, S> Future for TryNextFuture<'_, S>
where
    S: Stream<Item = Result<T, E>> + Unpin + ?Sized,
{
    type Output = Result<Option<T>, E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let next = ready!(this.stream.poll_next(cx));
        // `Some(Ok(t))` -> `Ok(Some(t))`, `Some(Err(e))` -> `Err(e)`, `None` -> `Ok(None)`.
        Poll::Ready(next.transpose())
    }
}
pin_project! {
    /// Future for the [`StreamExt::count()`] method.
    ///
    /// Drains the stream and resolves to the number of items it produced.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct CountFuture<S: ?Sized> {
        // Number of items seen so far.
        count: usize,
        #[pin]
        stream: S,
    }
}

impl<S: Stream + ?Sized> Future for CountFuture<S> {
    type Output = usize;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        // Every item is dropped right away; only the tally is kept.
        while ready!(this.stream.as_mut().poll_next(cx)).is_some() {
            *this.count += 1;
        }
        Poll::Ready(*this.count)
    }
}
pin_project! {
    /// Future for the [`StreamExt::collect()`] method.
    ///
    /// Appends every item of the stream to a collection and resolves to that
    /// collection once the stream has ended.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct CollectFuture<S, C> {
        #[pin]
        stream: S,
        // Items gathered so far.
        collection: C,
    }
}

impl<S, C> Future for CollectFuture<S, C>
where
    S: Stream,
    C: Default + Extend<S::Item>,
{
    type Output = C;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
        let mut this = self.project();
        while let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
            this.collection.extend(Some(item));
        }
        // Hand out the filled collection and leave a fresh default one behind.
        Poll::Ready(mem::replace(this.collection, C::default()))
    }
}
pin_project! {
    /// Future for the [`StreamExt::try_collect()`] method.
    ///
    /// Collects the `Ok` items of a stream of `Result`s, resolving to the first
    /// `Err` encountered or to the collection once the stream has ended.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct TryCollectFuture<S, C> {
        #[pin]
        stream: S,
        // `Ok` values gathered so far.
        items: C,
    }
}

impl<T, E, S, C> Future for TryCollectFuture<S, C>
where
    S: Stream<Item = Result<T, E>>,
    C: Default + Extend<T>,
{
    type Output = Result<C, E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        loop {
            match ready!(this.stream.as_mut().poll_next(cx)) {
                Some(Ok(item)) => this.items.extend(Some(item)),
                // The first error ends the collection immediately.
                Some(Err(err)) => return Poll::Ready(Err(err)),
                // Hand out the filled collection and leave a default one behind.
                None => return Poll::Ready(Ok(mem::replace(this.items, C::default()))),
            }
        }
    }
}
pin_project! {
    /// Future for the [`StreamExt::partition()`] method.
    ///
    /// Sorts every item of the stream into one of two collections depending on
    /// the result of the predicate, and resolves to both collections.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct PartitionFuture<S, P, B> {
        #[pin]
        stream: S,
        predicate: P,
        // `(items accepted by the predicate, items rejected by it)`; taken out
        // when the future completes.
        res: Option<(B, B)>,
    }
}

impl<S, P, B> Future for PartitionFuture<S, P, B>
where
    S: Stream + Sized,
    P: FnMut(&S::Item) -> bool,
    B: Default + Extend<S::Item>,
{
    type Output = (B, B);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        while let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
            let (accepted, rejected) = this.res.as_mut().unwrap();
            let bucket = if (this.predicate)(&item) {
                accepted
            } else {
                rejected
            };
            bucket.extend(Some(item));
        }
        Poll::Ready(this.res.take().unwrap())
    }
}
pin_project! {
    /// Future for the [`StreamExt::fold()`] method.
    ///
    /// Feeds every item of the stream, together with the running accumulator,
    /// into the closure and resolves to the final accumulator.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct FoldFuture<S, F, T> {
        #[pin]
        stream: S,
        f: F,
        // Running accumulator; taken out when the future completes.
        acc: Option<T>,
    }
}

impl<S, F, T> Future for FoldFuture<S, F, T>
where
    S: Stream,
    F: FnMut(T, S::Item) -> T,
{
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        while let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
            // The accumulator is moved into the closure and replaced by its result.
            let previous = this.acc.take().unwrap();
            *this.acc = Some((this.f)(previous, item));
        }
        Poll::Ready(this.acc.take().unwrap())
    }
}
/// Future for the [`StreamExt::try_fold()`] method.
///
/// Folds the `Ok` items of a stream of `Result`s with a fallible closure. The
/// future resolves to the first error coming from either the stream or the
/// closure, or to the final accumulator once the stream has ended.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryFoldFuture<'a, S, F, B> {
    stream: &'a mut S,
    f: F,
    // Running accumulator; taken out when the future completes.
    acc: Option<B>,
}

impl<'a, S, F, B> Unpin for TryFoldFuture<'a, S, F, B> {}

impl<'a, T, E, S, F, B> Future for TryFoldFuture<'a, S, F, B>
where
    S: Stream<Item = Result<T, E>> + Unpin,
    F: FnMut(B, T) -> Result<B, E>,
{
    type Output = Result<B, E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        loop {
            let item = match ready!(this.stream.poll_next(cx)) {
                Some(Ok(item)) => item,
                Some(Err(err)) => return Poll::Ready(Err(err)),
                None => return Poll::Ready(Ok(this.acc.take().unwrap())),
            };

            let previous = this.acc.take().unwrap();
            match (this.f)(previous, item) {
                Ok(next) => this.acc = Some(next),
                Err(err) => return Poll::Ready(Err(err)),
            }
        }
    }
}
pin_project! {
/// Stream for the [`StreamExt::scan()`] method.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Scan<S, St, F> {
#[pin]
stream: S,
state_f: (St, F),
}
}
impl<S, St, F, B> Stream for Scan<S, St, F>
where
S: Stream,
F: FnMut(&mut St, S::Item) -> Option<B>,
{
type Item = B;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
let mut this = self.project();
this.stream.as_mut().poll_next(cx).map(|item| {
item.and_then(|item| {
let (state, f) = this.state_f;
f(state, item)
})
})
}
}
pin_project! {
    /// Stream for the [`StreamExt::fuse()`] method.
    ///
    /// Once the inner stream has returned `None`, it is never polled again and
    /// this stream keeps returning `None`.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Fuse<S> {
        #[pin]
        stream: S,
        // Set once the inner stream has returned `None`.
        done: bool,
    }
}

impl<S: Stream> Stream for Fuse<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        let this = self.project();
        if *this.done {
            return Poll::Ready(None);
        }

        let next = ready!(this.stream.poll_next(cx));
        // `done` was `false` up to here, so this only ever flips it to `true`.
        *this.done = next.is_none();
        Poll::Ready(next)
    }
}
pin_project! {
/// Stream for the [`StreamExt::map()`] method.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Map<S, F> {
#[pin]
stream: S,
f: F,
}
}
impl<S, F, T> Stream for Map<S, F>
where
S: Stream,
F: FnMut(S::Item) -> T,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let next = ready!(this.stream.poll_next(cx));
Poll::Ready(next.map(this.f))
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
pin_project! {
    /// Stream for the [`StreamExt::flat_map()`] method.
    ///
    /// Maps every item of the outer stream to a stream and yields the items of
    /// those streams one stream after another.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct FlatMap<S, U, F> {
        // The outer stream, already mapped to produce inner streams.
        #[pin]
        stream: Map<S, F>,
        // The inner stream currently being drained, if any.
        #[pin]
        inner_stream: Option<U>,
    }
}

impl<S, U, F> Stream for FlatMap<S, U, F>
where
    S: Stream,
    U: Stream,
    F: FnMut(S::Item) -> U,
{
    type Item = U::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        loop {
            // Drain the current inner stream first.
            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
                if let Some(item) = ready!(inner.poll_next(cx)) {
                    return Poll::Ready(Some(item));
                }
                this.inner_stream.set(None);
            }

            // Then fetch the next inner stream from the outer stream.
            match ready!(this.stream.as_mut().poll_next(cx)) {
                None => return Poll::Ready(None),
                fresh => this.inner_stream.set(fresh),
            }
        }
    }
}
pin_project! {
    /// Stream for the [`StreamExt::flatten()`] method.
    ///
    /// Takes a stream whose items are themselves streams and yields the items
    /// of those inner streams one stream after another.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Flatten<S: Stream> {
        // The outer stream producing inner streams.
        #[pin]
        stream: S,
        // The inner stream currently being drained, if any.
        #[pin]
        inner_stream: Option<S::Item>,
    }
}

impl<S, U> Stream for Flatten<S>
where
    S: Stream<Item = U>,
    U: Stream,
{
    type Item = U::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        loop {
            // Drain the current inner stream first.
            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
                if let Some(item) = ready!(inner.poll_next(cx)) {
                    return Poll::Ready(Some(item));
                }
                this.inner_stream.set(None);
            }

            // Then fetch the next inner stream from the outer stream.
            match ready!(this.stream.as_mut().poll_next(cx)) {
                None => return Poll::Ready(None),
                fresh => this.inner_stream.set(fresh),
            }
        }
    }
}
pin_project! {
    /// Stream for the [`StreamExt::then()`] method.
    ///
    /// Turns each item of the inner stream into a future using a closure and
    /// yields the outputs of those futures, one at a time.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Then<S, F, Fut> {
        #[pin]
        stream: S,
        // The future currently being driven, if any.
        #[pin]
        future: Option<Fut>,
        f: F,
    }
}

impl<S, F, Fut> Stream for Then<S, F, Fut>
where
    S: Stream,
    F: FnMut(S::Item) -> Fut,
    Fut: Future,
{
    type Item = Fut::Output;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        loop {
            match this.future.as_mut().as_pin_mut() {
                // A future is in flight: finish it before touching the stream.
                Some(fut) => {
                    let output = ready!(fut.poll(cx));
                    this.future.set(None);
                    return Poll::Ready(Some(output));
                }
                // No future in flight: start one from the next stream item.
                None => match ready!(this.stream.as_mut().poll_next(cx)) {
                    Some(item) => this.future.set(Some((this.f)(item))),
                    None => return Poll::Ready(None),
                },
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // A future in flight accounts for one more item than the stream reports.
        let in_flight = usize::from(self.future.is_some());
        let (lower, upper) = self.stream.size_hint();
        (
            lower.saturating_add(in_flight),
            upper.and_then(|bound| bound.checked_add(in_flight)),
        )
    }
}
pin_project! {
    /// Stream for the [`StreamExt::filter()`] method.
    ///
    /// Yields only those items of the inner stream for which the predicate
    /// returns `true`.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Filter<S, P> {
        #[pin]
        stream: S,
        predicate: P,
    }
}

impl<S, P> Stream for Filter<S, P>
where
    S: Stream,
    P: FnMut(&S::Item) -> bool,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        // Rejected items are dropped and the next one is tried right away.
        while let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
            if (this.predicate)(&item) {
                return Poll::Ready(Some(item));
            }
        }
        Poll::Ready(None)
    }
}
/// Combines two streams into one, giving priority to `stream1`.
///
/// Whenever both streams have an item ready, the item from `stream1` is the
/// one that gets yielded.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, once, pending, StreamExt};
///
/// # spin_on::spin_on(async {
/// assert_eq!(stream::or(once(1), pending()).next().await, Some(1));
/// assert_eq!(stream::or(pending(), once(2)).next().await, Some(2));
///
/// // The first stream wins.
/// assert_eq!(stream::or(once(1), once(2)).next().await, Some(1));
/// # })
/// ```
pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    Or {
        stream1: stream1,
        stream2: stream2,
    }
}
pin_project! {
    /// Stream for the [`or()`] function and the [`StreamExt::or()`] method.
    ///
    /// On every poll `stream1` is tried first; `stream2` is only polled when
    /// `stream1` did not produce an item.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Or<S1, S2> {
        #[pin]
        stream1: S1,
        #[pin]
        stream2: S2,
    }
}

impl<T, S1, S2> Stream for Or<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        match this.stream1.poll_next(cx) {
            Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
            // Whether `stream1` is pending or finished, the outcome of this
            // poll is whatever `stream2` reports.
            Poll::Ready(None) | Poll::Pending => this.stream2.poll_next(cx),
        }
    }
}
/// Merges two streams, with no preference for either stream when both are ready.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, once, pending, StreamExt};
///
/// # spin_on::spin_on(async {
/// assert_eq!(stream::race(once(1), pending()).next().await, Some(1));
/// assert_eq!(stream::race(pending(), once(2)).next().await, Some(2));
///
/// // One of the two stream is randomly chosen as the winner.
/// let res = stream::race(once(1), once(2)).next().await;
/// # })
/// ```
#[cfg(feature = "std")]
pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    Race { stream1, stream2 }
}
#[cfg(feature = "std")]
pin_project! {
    /// Stream for the [`race()`] function and the [`StreamExt::race()`] method.
    ///
    /// On every poll, one of the two streams is picked at random to be polled
    /// first; the other one is only polled if the first did not produce an
    /// item. The merged stream ends once both streams report the end of the
    /// stream during the same poll.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Race<S1, S2> {
        #[pin]
        stream1: S1,
        #[pin]
        stream2: S2,
    }
}

#[cfg(feature = "std")]
impl<T, S1, S2> Stream for Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        // Randomize which side gets the first chance to produce an item.
        let stream1_first = fastrand::bool();

        let first = if stream1_first {
            this.stream1.as_mut().poll_next(cx)
        } else {
            this.stream2.as_mut().poll_next(cx)
        };
        let first_finished = match first {
            Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
            Poll::Ready(None) => true,
            Poll::Pending => false,
        };

        let second = if stream1_first {
            this.stream2.as_mut().poll_next(cx)
        } else {
            this.stream1.as_mut().poll_next(cx)
        };
        let second_finished = match second {
            Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
            Poll::Ready(None) => true,
            Poll::Pending => false,
        };

        if first_finished && second_finished {
            // Both sides are exhausted. Returning `Pending` here would hang the
            // consumer forever because no stream registered a waker.
            Poll::Ready(None)
        } else {
            // At least one side returned `Pending` and therefore registered the
            // waker from `cx`.
            Poll::Pending
        }
    }
}
pin_project! {
    /// Stream for the [`StreamExt::filter_map()`] method.
    ///
    /// Applies a closure to every item of the inner stream and yields the
    /// values for which the closure returned `Some`.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct FilterMap<S, F> {
        #[pin]
        stream: S,
        f: F,
    }
}

impl<S, F, T> Stream for FilterMap<S, F>
where
    S: Stream,
    F: FnMut(S::Item) -> Option<T>,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        // Items mapped to `None` are skipped and the next one is tried right away.
        while let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
            if let Some(mapped) = (this.f)(item) {
                return Poll::Ready(Some(mapped));
            }
        }
        Poll::Ready(None)
    }
}
pin_project! {
    /// Stream for the [`StreamExt::take()`] method.
    ///
    /// Yields items of the inner stream until the remaining budget `n` reaches
    /// zero or the inner stream ends.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Take<S> {
        #[pin]
        stream: S,
        // How many more items may still be yielded.
        n: usize,
    }
}

impl<S: Stream> Stream for Take<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        let this = self.project();
        if *this.n == 0 {
            return Poll::Ready(None);
        }

        let next = ready!(this.stream.poll_next(cx));
        // An item uses up one unit of budget; the end of the inner stream
        // zeroes the budget so the inner stream is not polled again.
        *this.n = if next.is_some() { *this.n - 1 } else { 0 };
        Poll::Ready(next)
    }
}
pin_project! {
    /// Stream for the [`StreamExt::take_while()`] method.
    ///
    /// Yields an item of the inner stream if the predicate accepts it and
    /// `None` if the predicate rejects it.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct TakeWhile<S, P> {
        #[pin]
        stream: S,
        predicate: P,
    }
}

impl<S, P> Stream for TakeWhile<S, P>
where
    S: Stream,
    P: FnMut(&S::Item) -> bool,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        match ready!(this.stream.poll_next(cx)) {
            Some(item) if (this.predicate)(&item) => Poll::Ready(Some(item)),
            // Either the inner stream ended or the predicate rejected the item.
            _ => Poll::Ready(None),
        }
    }
}
pin_project! {
    /// Stream for the [`StreamExt::skip()`] method.
    ///
    /// Drops items of the inner stream while the counter `n` is non-zero and
    /// yields every item after that.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Skip<S> {
        #[pin]
        stream: S,
        // How many more items still have to be dropped.
        n: usize,
    }
}

impl<S: Stream> Stream for Skip<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        while let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
            if *this.n == 0 {
                return Poll::Ready(Some(item));
            }
            // Still skipping: drop the item and count it.
            *this.n -= 1;
        }
        Poll::Ready(None)
    }
}
pin_project! {
    /// Stream for the [`StreamExt::skip_while()`] method.
    ///
    /// Drops items of the inner stream for as long as the predicate returns
    /// `true`; the first rejected item and all items after it are yielded.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct SkipWhile<S, P> {
        #[pin]
        stream: S,
        // `Some` while still skipping; cleared once an item has been let through.
        predicate: Option<P>,
    }
}

impl<S, P> Stream for SkipWhile<S, P>
where
    S: Stream,
    P: FnMut(&S::Item) -> bool,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        while let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
            let keep_skipping = match this.predicate.as_mut() {
                Some(predicate) => predicate(&item),
                None => false,
            };
            if !keep_skipping {
                // From now on every item passes through without being tested.
                *this.predicate = None;
                return Poll::Ready(Some(item));
            }
        }
        Poll::Ready(None)
    }
}
pin_project! {
    /// Stream for the [`StreamExt::step_by()`] method.
    ///
    /// Yields an item of the inner stream whenever the countdown `i` is zero
    /// and drops the items in between.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct StepBy<S> {
        #[pin]
        stream: S,
        // Distance between two yielded items.
        // NOTE(review): `step - 1` below presumes a non-zero step; confirm that
        // this is enforced where `StepBy` is constructed.
        step: usize,
        // Items still to be dropped before the next one is yielded.
        i: usize,
    }
}

impl<S: Stream> Stream for StepBy<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        while let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
            if *this.i == 0 {
                // Restart the countdown for the following items.
                *this.i = *this.step - 1;
                return Poll::Ready(Some(item));
            }
            *this.i -= 1;
        }
        Poll::Ready(None)
    }
}
pin_project! {
    /// Stream for the [`StreamExt::chain()`] method.
    ///
    /// Yields all items of the first stream, followed by all items of the
    /// second stream.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Chain<S, U> {
        #[pin]
        first: Fuse<S>,
        #[pin]
        second: Fuse<U>,
    }
}

impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // The `done` flags of the fused halves tell which half is active.
        if !this.first.done {
            if let Some(item) = ready!(this.first.as_mut().poll_next(cx)) {
                return Poll::Ready(Some(item));
            }
        }

        if !this.second.done {
            if let Some(item) = ready!(this.second.as_mut().poll_next(cx)) {
                return Poll::Ready(Some(item));
            }
        }

        match (this.first.done, this.second.done) {
            (true, true) => Poll::Ready(None),
            _ => Poll::Pending,
        }
    }
}
pin_project! {
    /// Stream for the [`StreamExt::cloned()`] method.
    ///
    /// Turns a stream of references into a stream of owned values by cloning
    /// each item.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Cloned<S> {
        #[pin]
        stream: S,
    }
}

impl<'a, S, T: 'a> Stream for Cloned<S>
where
    S: Stream<Item = &'a T>,
    T: Clone,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.project()
            .stream
            .poll_next(cx)
            .map(|next| next.cloned())
    }
}
pin_project! {
    /// Stream for the [`StreamExt::copied()`] method.
    ///
    /// Turns a stream of references into a stream of owned values by copying
    /// each item.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Copied<S> {
        #[pin]
        stream: S,
    }
}

impl<'a, S, T: 'a> Stream for Copied<S>
where
    S: Stream<Item = &'a T>,
    T: Copy,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.project()
            .stream
            .poll_next(cx)
            .map(|next| next.copied())
    }
}
pin_project! {
    /// Stream for the [`StreamExt::cycle()`] method.
    ///
    /// Replays a stream over and over by restarting from a clone of the
    /// original whenever the current pass ends.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Cycle<S> {
        // Pristine copy that every new pass is cloned from.
        orig: S,
        // The pass currently being drained.
        #[pin]
        stream: S,
    }
}

impl<S> Stream for Cycle<S>
where
    S: Stream + Clone,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
            return Poll::Ready(Some(item));
        }

        // The current pass is over: start a new one and poll it once. Whatever
        // that poll reports, including `None`, is returned as is.
        this.stream.set(this.orig.clone());
        this.stream.poll_next(cx)
    }
}
pin_project! {
    /// Stream for the [`StreamExt::enumerate()`] method.
    ///
    /// Yields each item of the inner stream paired with a running index.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Enumerate<S> {
        #[pin]
        stream: S,
        // Index that will be attached to the next item.
        i: usize,
    }
}

impl<S> Stream for Enumerate<S>
where
    S: Stream,
{
    type Item = (usize, S::Item);

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let item = match ready!(this.stream.poll_next(cx)) {
            Some(item) => item,
            None => return Poll::Ready(None),
        };

        // Hand out the current index, then advance it for the next item.
        let index = *this.i;
        *this.i += 1;
        Poll::Ready(Some((index, item)))
    }
}
pin_project! {
    /// Stream for the [`StreamExt::inspect()`] method.
    ///
    /// Calls a closure with a reference to each item before passing the item on
    /// unchanged.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Inspect<S, F> {
        #[pin]
        stream: S,
        f: F,
    }
}

impl<S, F> Stream for Inspect<S, F>
where
    S: Stream,
    F: FnMut(&S::Item),
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let next = ready!(this.stream.poll_next(cx));
        // The closure is not called when the stream has ended.
        if let Some(item) = next.as_ref() {
            (this.f)(item);
        }
        Poll::Ready(next)
    }
}
/// Future for the [`StreamExt::nth()`] method.
///
/// Drops `n` items of the borrowed stream and resolves to the one after them,
/// or to `None` if the stream ends before that.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct NthFuture<'a, S: ?Sized> {
    stream: &'a mut S,
    // How many more items still have to be dropped.
    n: usize,
}

impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}

impl<'a, S> Future for NthFuture<'a, S>
where
    S: Stream + Unpin + ?Sized,
{
    type Output = Option<S::Item>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        while let Some(item) = ready!(this.stream.poll_next(cx)) {
            if this.n == 0 {
                return Poll::Ready(Some(item));
            }
            this.n -= 1;
        }
        Poll::Ready(None)
    }
}
pin_project! {
    /// Future for the [`StreamExt::last()`] method.
    ///
    /// Drains the stream and resolves to the final item it produced, or to
    /// `None` if it produced nothing.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct LastFuture<S: Stream> {
        #[pin]
        stream: S,
        // Most recent item seen so far.
        last: Option<S::Item>,
    }
}

impl<S: Stream> Future for LastFuture<S> {
    type Output = Option<S::Item>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        // Each new item replaces the one remembered before it.
        while let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
            *this.last = Some(item);
        }
        Poll::Ready(this.last.take())
    }
}
/// Future for the [`StreamExt::find()`] method.
///
/// Resolves to the first item of the borrowed stream accepted by the predicate,
/// or to `None` if the stream ends without such an item.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FindFuture<'a, S: ?Sized, P> {
    stream: &'a mut S,
    predicate: P,
}

impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}

impl<'a, S, P> Future for FindFuture<'a, S, P>
where
    S: Stream + Unpin + ?Sized,
    P: FnMut(&S::Item) -> bool,
{
    type Output = Option<S::Item>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        while let Some(item) = ready!(this.stream.poll_next(cx)) {
            if (this.predicate)(&item) {
                return Poll::Ready(Some(item));
            }
        }
        Poll::Ready(None)
    }
}
/// Future for the [`StreamExt::find_map()`] method.
///
/// Applies the closure to the items of the borrowed stream and resolves to the
/// first `Some` value it returns, or to `None` if the stream ends first.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FindMapFuture<'a, S: ?Sized, F> {
    stream: &'a mut S,
    f: F,
}

impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}

impl<'a, S, B, F> Future for FindMapFuture<'a, S, F>
where
    S: Stream + Unpin + ?Sized,
    F: FnMut(S::Item) -> Option<B>,
{
    type Output = Option<B>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        while let Some(item) = ready!(this.stream.poll_next(cx)) {
            if let Some(found) = (this.f)(item) {
                return Poll::Ready(Some(found));
            }
        }
        Poll::Ready(None)
    }
}
/// Future for the [`StreamExt::position()`] method.
///
/// Resolves to the index of the first item of the borrowed stream accepted by
/// the predicate, or to `None` if the stream ends without such an item.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PositionFuture<'a, S: ?Sized, P> {
    stream: &'a mut S,
    predicate: P,
    // Number of items rejected so far, i.e. the index of the next item.
    index: usize,
}

impl<'a, S: Unpin + ?Sized, P> Unpin for PositionFuture<'a, S, P> {}

impl<'a, S, P> Future for PositionFuture<'a, S, P>
where
    S: Stream + Unpin + ?Sized,
    P: FnMut(S::Item) -> bool,
{
    type Output = Option<usize>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        while let Some(item) = ready!(this.stream.poll_next(cx)) {
            if (this.predicate)(item) {
                return Poll::Ready(Some(this.index));
            }
            this.index += 1;
        }
        Poll::Ready(None)
    }
}
/// Future for the [`StreamExt::all()`] method.
///
/// Resolves to `false` as soon as the predicate rejects an item of the borrowed
/// stream, and to `true` if the stream ends without a rejection.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct AllFuture<'a, S: ?Sized, P> {
    stream: &'a mut S,
    predicate: P,
}

impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}

impl<S, P> Future for AllFuture<'_, S, P>
where
    S: Stream + Unpin + ?Sized,
    P: FnMut(S::Item) -> bool,
{
    type Output = bool;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        while let Some(item) = ready!(this.stream.poll_next(cx)) {
            if !(this.predicate)(item) {
                return Poll::Ready(false);
            }
        }
        Poll::Ready(true)
    }
}
/// Future for the [`StreamExt::any()`] method.
///
/// Resolves to `true` as soon as the predicate accepts an item of the borrowed
/// stream, and to `false` if the stream ends without an accepted item.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct AnyFuture<'a, S: ?Sized, P> {
    stream: &'a mut S,
    predicate: P,
}

impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}

impl<S, P> Future for AnyFuture<'_, S, P>
where
    S: Stream + Unpin + ?Sized,
    P: FnMut(S::Item) -> bool,
{
    type Output = bool;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        while let Some(item) = ready!(this.stream.poll_next(cx)) {
            if (this.predicate)(item) {
                return Poll::Ready(true);
            }
        }
        Poll::Ready(false)
    }
}
pin_project! {
    /// Future for the [`StreamExt::for_each()`] method.
    ///
    /// Passes every item of the stream to a closure and completes once the
    /// stream has ended.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct ForEachFuture<S, F> {
        #[pin]
        stream: S,
        f: F,
    }
}

impl<S, F> Future for ForEachFuture<S, F>
where
    S: Stream,
    F: FnMut(S::Item),
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        while let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
            (this.f)(item);
        }
        Poll::Ready(())
    }
}
/// Future for the [`StreamExt::try_for_each()`] method.
///
/// Passes every item of the borrowed stream to a fallible closure. Resolves to
/// the first error the closure returns, or to `Ok(())` once the stream has
/// ended.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryForEachFuture<'a, S: ?Sized, F> {
    stream: &'a mut S,
    f: F,
}

impl<'a, S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'a, S, F> {}

impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F>
where
    S: Stream + Unpin + ?Sized,
    F: FnMut(S::Item) -> Result<(), E>,
{
    type Output = Result<(), E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        while let Some(item) = ready!(this.stream.poll_next(cx)) {
            if let Err(err) = (this.f)(item) {
                return Poll::Ready(Err(err));
            }
        }
        Poll::Ready(Ok(()))
    }
}
pin_project! {
    /// Stream for the [`StreamExt::zip()`] method.
    ///
    /// Yields pairs made of one item from each of the two streams and ends as
    /// soon as either stream ends.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Zip<A: Stream, B> {
        // Item already obtained from `first` that is still waiting for its
        // partner from `second`.
        item_slot: Option<A::Item>,
        #[pin]
        first: A,
        #[pin]
        second: B,
    }
}

impl<A: Stream, B: Stream> Stream for Zip<A, B> {
    type Item = (A::Item, B::Item);

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        // Only pull from `first` when no item is buffered from an earlier poll.
        if this.item_slot.is_none() {
            match ready!(this.first.poll_next(cx)) {
                Some(item) => *this.item_slot = Some(item),
                None => return Poll::Ready(None),
            }
        }

        let right = ready!(this.second.poll_next(cx));
        let left = this.item_slot.take().unwrap();
        Poll::Ready(right.map(move |right| (left, right)))
    }
}
pin_project! {
    /// Future for the [`StreamExt::unzip()`] method.
    ///
    /// Splits every pair of the stream across two collections and resolves to
    /// both collections once the stream has ended.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct UnzipFuture<S, FromA, FromB> {
        #[pin]
        stream: S,
        // The two collections being filled; taken out when the future completes.
        res: Option<(FromA, FromB)>,
    }
}

impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
where
    S: Stream<Item = (A, B)>,
    FromA: Default + Extend<A>,
    FromB: Default + Extend<B>,
{
    type Output = (FromA, FromB);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        while let Some((a, b)) = ready!(this.stream.as_mut().poll_next(cx)) {
            let (left, right) = this.res.as_mut().unwrap();
            left.extend(Some(a));
            right.extend(Some(b));
        }
        Poll::Ready(this.res.take().unwrap())
    }
}