| //! Combinators for the [`Stream`] trait. |
| //! |
| //! # Examples |
| //! |
| //! ``` |
| //! use futures_lite::stream::{self, StreamExt}; |
| //! |
| //! # spin_on::spin_on(async { |
| //! let mut s = stream::iter(vec![1, 2, 3]); |
| //! |
| //! assert_eq!(s.next().await, Some(1)); |
| //! assert_eq!(s.next().await, Some(2)); |
| //! assert_eq!(s.next().await, Some(3)); |
| //! assert_eq!(s.next().await, None); |
| //! # }); |
| //! ``` |
| |
| #[cfg(feature = "alloc")] |
| extern crate alloc; |
| |
| #[doc(no_inline)] |
| pub use futures_core::stream::Stream; |
| |
| #[cfg(feature = "alloc")] |
| use alloc::boxed::Box; |
| |
| use core::fmt; |
| use core::future::Future; |
| use core::marker::PhantomData; |
| use core::mem; |
| use core::pin::Pin; |
| use core::task::{Context, Poll}; |
| |
| use pin_project_lite::pin_project; |
| |
| use crate::ready; |
| |
| /// Converts a stream into a blocking iterator. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::{pin, stream}; |
| /// |
| /// let stream = stream::once(7); |
| /// pin!(stream); |
| /// |
| /// let mut iter = stream::block_on(stream); |
| /// assert_eq!(iter.next(), Some(7)); |
| /// assert_eq!(iter.next(), None); |
| /// ``` |
| #[cfg(feature = "std")] |
| pub fn block_on<S: Stream + Unpin>(stream: S) -> BlockOn<S> { |
| BlockOn(stream) |
| } |
| |
| /// Iterator for the [`block_on()`] function. |
| #[derive(Debug)] |
| pub struct BlockOn<S>(S); |
| |
| #[cfg(feature = "std")] |
| impl<S: Stream + Unpin> Iterator for BlockOn<S> { |
| type Item = S::Item; |
| |
| fn next(&mut self) -> Option<Self::Item> { |
| crate::future::block_on(self.0.next()) |
| } |
| } |
| |
| /// Creates an empty stream. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::empty::<i32>(); |
| /// assert_eq!(s.next().await, None); |
| /// # }) |
| /// ``` |
| pub fn empty<T>() -> Empty<T> { |
| Empty { |
| _marker: PhantomData, |
| } |
| } |
| |
| /// Stream for the [`empty()`] function. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Empty<T> { |
| _marker: PhantomData<T>, |
| } |
| |
| impl<T> Unpin for Empty<T> {} |
| |
| impl<T> Stream for Empty<T> { |
| type Item = T; |
| |
| fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| Poll::Ready(None) |
| } |
| |
| fn size_hint(&self) -> (usize, Option<usize>) { |
| (0, Some(0)) |
| } |
| } |
| |
| /// Creates a stream from an iterator. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::iter(vec![1, 2]); |
| /// |
| /// assert_eq!(s.next().await, Some(1)); |
| /// assert_eq!(s.next().await, Some(2)); |
| /// assert_eq!(s.next().await, None); |
| /// # }) |
| /// ``` |
| pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> { |
| Iter { |
| iter: iter.into_iter(), |
| } |
| } |
| |
| /// Stream for the [`iter()`] function. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Iter<I> { |
| iter: I, |
| } |
| |
| impl<I> Unpin for Iter<I> {} |
| |
| impl<I: Iterator> Stream for Iter<I> { |
| type Item = I::Item; |
| |
| fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| Poll::Ready(self.iter.next()) |
| } |
| |
| fn size_hint(&self) -> (usize, Option<usize>) { |
| self.iter.size_hint() |
| } |
| } |
| |
| /// Creates a stream that yields a single item. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::once(7); |
| /// |
| /// assert_eq!(s.next().await, Some(7)); |
| /// assert_eq!(s.next().await, None); |
| /// # }) |
| /// ``` |
| pub fn once<T>(t: T) -> Once<T> { |
| Once { value: Some(t) } |
| } |
| |
| pin_project! { |
| /// Stream for the [`once()`] function. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Once<T> { |
| value: Option<T>, |
| } |
| } |
| |
| impl<T> Stream for Once<T> { |
| type Item = T; |
| |
| fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> { |
| Poll::Ready(self.project().value.take()) |
| } |
| |
| fn size_hint(&self) -> (usize, Option<usize>) { |
| if self.value.is_some() { |
| (1, Some(1)) |
| } else { |
| (0, Some(0)) |
| } |
| } |
| } |
| |
| /// Creates a stream that is always pending. |
| /// |
| /// # Examples |
| /// |
| /// ```no_run |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::pending::<i32>(); |
| /// s.next().await; |
| /// unreachable!(); |
| /// # }) |
| /// ``` |
| pub fn pending<T>() -> Pending<T> { |
| Pending { |
| _marker: PhantomData, |
| } |
| } |
| |
| /// Stream for the [`pending()`] function. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Pending<T> { |
| _marker: PhantomData<T>, |
| } |
| |
| impl<T> Unpin for Pending<T> {} |
| |
| impl<T> Stream for Pending<T> { |
| type Item = T; |
| |
| fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> { |
| Poll::Pending |
| } |
| |
| fn size_hint(&self) -> (usize, Option<usize>) { |
| (0, Some(0)) |
| } |
| } |
| |
| /// Creates a stream from a function returning [`Poll`]. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// use std::task::{Context, Poll}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// fn f(_: &mut Context<'_>) -> Poll<Option<i32>> { |
| /// Poll::Ready(Some(7)) |
| /// } |
| /// |
| /// assert_eq!(stream::poll_fn(f).next().await, Some(7)); |
| /// # }) |
| /// ``` |
| pub fn poll_fn<T, F>(f: F) -> PollFn<F> |
| where |
| F: FnMut(&mut Context<'_>) -> Poll<Option<T>>, |
| { |
| PollFn { f } |
| } |
| |
| /// Stream for the [`poll_fn()`] function. |
| #[derive(Clone)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct PollFn<F> { |
| f: F, |
| } |
| |
| impl<F> Unpin for PollFn<F> {} |
| |
| impl<F> fmt::Debug for PollFn<F> { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.debug_struct("PollFn").finish() |
| } |
| } |
| |
| impl<T, F> Stream for PollFn<F> |
| where |
| F: FnMut(&mut Context<'_>) -> Poll<Option<T>>, |
| { |
| type Item = T; |
| |
| fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { |
| (&mut self.f)(cx) |
| } |
| } |
| |
| /// Creates an infinite stream that yields the same item repeatedly. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::repeat(7); |
| /// |
| /// assert_eq!(s.next().await, Some(7)); |
| /// assert_eq!(s.next().await, Some(7)); |
| /// # }) |
| /// ``` |
| pub fn repeat<T: Clone>(item: T) -> Repeat<T> { |
| Repeat { item } |
| } |
| |
| /// Stream for the [`repeat()`] function. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Repeat<T> { |
| item: T, |
| } |
| |
| impl<T> Unpin for Repeat<T> {} |
| |
| impl<T: Clone> Stream for Repeat<T> { |
| type Item = T; |
| |
| fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| Poll::Ready(Some(self.item.clone())) |
| } |
| |
| fn size_hint(&self) -> (usize, Option<usize>) { |
| (usize::max_value(), None) |
| } |
| } |
| |
| /// Creates an infinite stream from a closure that generates items. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::repeat_with(|| 7); |
| /// |
| /// assert_eq!(s.next().await, Some(7)); |
| /// assert_eq!(s.next().await, Some(7)); |
| /// # }) |
| /// ``` |
| pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F> |
| where |
| F: FnMut() -> T, |
| { |
| RepeatWith { f: repeater } |
| } |
| |
| /// Stream for the [`repeat_with()`] function. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct RepeatWith<F> { |
| f: F, |
| } |
| |
| impl<F> Unpin for RepeatWith<F> {} |
| |
| impl<T, F> Stream for RepeatWith<F> |
| where |
| F: FnMut() -> T, |
| { |
| type Item = T; |
| |
| fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let item = (&mut self.f)(); |
| Poll::Ready(Some(item)) |
| } |
| |
| fn size_hint(&self) -> (usize, Option<usize>) { |
| (usize::max_value(), None) |
| } |
| } |
| |
| /// Creates a stream from a seed value and an async closure operating on it. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::unfold(0, |mut n| async move { |
| /// if n < 2 { |
| /// let m = n + 1; |
| /// Some((n, m)) |
| /// } else { |
| /// None |
| /// } |
| /// }); |
| /// |
| /// let v: Vec<i32> = s.collect().await; |
| /// assert_eq!(v, [0, 1]); |
| /// # }) |
| /// ``` |
| pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut> |
| where |
| F: FnMut(T) -> Fut, |
| Fut: Future<Output = Option<(Item, T)>>, |
| { |
| Unfold { |
| f, |
| state: Some(seed), |
| fut: None, |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`unfold()`] function. |
| #[derive(Clone)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Unfold<T, F, Fut> { |
| f: F, |
| state: Option<T>, |
| #[pin] |
| fut: Option<Fut>, |
| } |
| } |
| |
| impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut> |
| where |
| T: fmt::Debug, |
| Fut: fmt::Debug, |
| { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.debug_struct("Unfold") |
| .field("state", &self.state) |
| .field("fut", &self.fut) |
| .finish() |
| } |
| } |
| |
| impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut> |
| where |
| F: FnMut(T) -> Fut, |
| Fut: Future<Output = Option<(Item, T)>>, |
| { |
| type Item = Item; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let mut this = self.project(); |
| |
| if let Some(state) = this.state.take() { |
| this.fut.set(Some((this.f)(state))); |
| } |
| |
| let step = ready!(this |
| .fut |
| .as_mut() |
| .as_pin_mut() |
| .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`") |
| .poll(cx)); |
| this.fut.set(None); |
| |
| if let Some((item, next_state)) = step { |
| *this.state = Some(next_state); |
| Poll::Ready(Some(item)) |
| } else { |
| Poll::Ready(None) |
| } |
| } |
| } |
| |
| /// Creates a stream from a seed value and a fallible async closure operating on it. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::try_unfold(0, |mut n| async move { |
| /// if n < 2 { |
| /// let m = n + 1; |
| /// Ok(Some((n, m))) |
| /// } else { |
| /// std::io::Result::Ok(None) |
| /// } |
| /// }); |
| /// |
| /// let v: Vec<i32> = s.try_collect().await?; |
| /// assert_eq!(v, [0, 1]); |
| /// # std::io::Result::Ok(()) }); |
| /// ``` |
| pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut> |
| where |
| F: FnMut(T) -> Fut, |
| Fut: Future<Output = Result<Option<(Item, T)>, E>>, |
| { |
| TryUnfold { |
| f, |
| state: Some(init), |
| fut: None, |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`try_unfold()`] function. |
| #[derive(Clone)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct TryUnfold<T, F, Fut> { |
| f: F, |
| state: Option<T>, |
| #[pin] |
| fut: Option<Fut>, |
| } |
| } |
| |
| impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut> |
| where |
| T: fmt::Debug, |
| Fut: fmt::Debug, |
| { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.debug_struct("TryUnfold") |
| .field("state", &self.state) |
| .field("fut", &self.fut) |
| .finish() |
| } |
| } |
| |
| impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut> |
| where |
| F: FnMut(T) -> Fut, |
| Fut: Future<Output = Result<Option<(Item, T)>, E>>, |
| { |
| type Item = Result<Item, E>; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let mut this = self.project(); |
| |
| if let Some(state) = this.state.take() { |
| this.fut.set(Some((this.f)(state))); |
| } |
| |
| match this.fut.as_mut().as_pin_mut() { |
| None => { |
| // The future previously errored |
| Poll::Ready(None) |
| } |
| Some(future) => { |
| let step = ready!(future.poll(cx)); |
| this.fut.set(None); |
| |
| match step { |
| Ok(Some((item, next_state))) => { |
| *this.state = Some(next_state); |
| Poll::Ready(Some(Ok(item))) |
| } |
| Ok(None) => Poll::Ready(None), |
| Err(e) => Poll::Ready(Some(Err(e))), |
| } |
| } |
| } |
| } |
| } |
| |
| /// Extension trait for [`Stream`]. |
| pub trait StreamExt: Stream { |
| /// A convenience for calling [`Stream::poll_next()`] on `!`[`Unpin`] types. |
| fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> |
| where |
| Self: Unpin, |
| { |
| Stream::poll_next(Pin::new(self), cx) |
| } |
| |
| /// Retrieves the next item in the stream. |
| /// |
| /// Returns [`None`] when iteration is finished. Stream implementations may choose to or not to |
| /// resume iteration after that. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::iter(1..=3); |
| /// |
| /// assert_eq!(s.next().await, Some(1)); |
| /// assert_eq!(s.next().await, Some(2)); |
| /// assert_eq!(s.next().await, Some(3)); |
| /// assert_eq!(s.next().await, None); |
| /// # }); |
| /// ``` |
| fn next(&mut self) -> NextFuture<'_, Self> |
| where |
| Self: Unpin, |
| { |
| NextFuture { stream: self } |
| } |
| |
| /// Retrieves the next item in the stream. |
| /// |
| /// This is similar to the [`next()`][`StreamExt::next()`] method, but returns |
| /// `Result<Option<T>, E>` rather than `Option<Result<T, E>>`. |
| /// |
| /// Note that `s.try_next().await` is equivalent to `s.next().await.transpose()`. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::iter(vec![Ok(1), Ok(2), Err("error")]); |
| /// |
| /// assert_eq!(s.try_next().await, Ok(Some(1))); |
| /// assert_eq!(s.try_next().await, Ok(Some(2))); |
| /// assert_eq!(s.try_next().await, Err("error")); |
| /// assert_eq!(s.try_next().await, Ok(None)); |
| /// # }); |
| /// ``` |
| fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self> |
| where |
| Self: Stream<Item = Result<T, E>> + Unpin, |
| { |
| TryNextFuture { stream: self } |
| } |
| |
| /// Counts the number of items in the stream. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s1 = stream::iter(vec![0]); |
| /// let s2 = stream::iter(vec![1, 2, 3]); |
| /// |
| /// assert_eq!(s1.count().await, 1); |
| /// assert_eq!(s2.count().await, 3); |
| /// # }); |
| /// ``` |
| fn count(self) -> CountFuture<Self> |
| where |
| Self: Sized, |
| { |
| CountFuture { |
| stream: self, |
| count: 0, |
| } |
| } |
| |
| /// Maps items of the stream to new values using a closure. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec![1, 2, 3]); |
| /// let mut s = s.map(|x| 2 * x); |
| /// |
| /// assert_eq!(s.next().await, Some(2)); |
| /// assert_eq!(s.next().await, Some(4)); |
| /// assert_eq!(s.next().await, Some(6)); |
| /// assert_eq!(s.next().await, None); |
| /// # }); |
| /// ``` |
| fn map<T, F>(self, f: F) -> Map<Self, F> |
| where |
| Self: Sized, |
| F: FnMut(Self::Item) -> T, |
| { |
| Map { stream: self, f } |
| } |
| |
| /// Maps items to streams and then concatenates them. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let words = stream::iter(vec!["one", "two"]); |
| /// |
| /// let s: String = words |
| /// .flat_map(|s| stream::iter(s.chars())) |
| /// .collect() |
| /// .await; |
| /// |
| /// assert_eq!(s, "onetwo"); |
| /// # }); |
| /// ``` |
| fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> |
| where |
| Self: Sized, |
| U: Stream, |
| F: FnMut(Self::Item) -> U, |
| { |
| FlatMap { |
| stream: self.map(f), |
| inner_stream: None, |
| } |
| } |
| |
| /// Concatenates inner streams. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s1 = stream::iter(vec![1, 2, 3]); |
| /// let s2 = stream::iter(vec![4, 5]); |
| /// |
| /// let s = stream::iter(vec![s1, s2]); |
| /// let v: Vec<_> = s.flatten().collect().await; |
| /// assert_eq!(v, [1, 2, 3, 4, 5]); |
| /// # }); |
| /// ``` |
| fn flatten(self) -> Flatten<Self> |
| where |
| Self: Sized, |
| Self::Item: Stream, |
| { |
| Flatten { |
| stream: self, |
| inner_stream: None, |
| } |
| } |
| |
| /// Maps items of the stream to new values using an async closure. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::pin; |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec![1, 2, 3]); |
| /// let mut s = s.then(|x| async move { 2 * x }); |
| /// |
| /// pin!(s); |
| /// assert_eq!(s.next().await, Some(2)); |
| /// assert_eq!(s.next().await, Some(4)); |
| /// assert_eq!(s.next().await, Some(6)); |
| /// assert_eq!(s.next().await, None); |
| /// # }); |
| /// ``` |
| fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut> |
| where |
| Self: Sized, |
| F: FnMut(Self::Item) -> Fut, |
| Fut: Future, |
| { |
| Then { |
| stream: self, |
| future: None, |
| f, |
| } |
| } |
| |
| /// Keeps items of the stream for which `predicate` returns `true`. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec![1, 2, 3, 4]); |
| /// let mut s = s.filter(|i| i % 2 == 0); |
| /// |
| /// assert_eq!(s.next().await, Some(2)); |
| /// assert_eq!(s.next().await, Some(4)); |
| /// assert_eq!(s.next().await, None); |
| /// # }); |
| /// ``` |
| fn filter<P>(self, predicate: P) -> Filter<Self, P> |
| where |
| Self: Sized, |
| P: FnMut(&Self::Item) -> bool, |
| { |
| Filter { |
| stream: self, |
| predicate, |
| } |
| } |
| |
| /// Filters and maps items of the stream using a closure. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec!["1", "lol", "3", "NaN", "5"]); |
| /// let mut s = s.filter_map(|a| a.parse::<u32>().ok()); |
| /// |
| /// assert_eq!(s.next().await, Some(1)); |
| /// assert_eq!(s.next().await, Some(3)); |
| /// assert_eq!(s.next().await, Some(5)); |
| /// assert_eq!(s.next().await, None); |
| /// # }); |
| /// ``` |
| fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F> |
| where |
| Self: Sized, |
| F: FnMut(Self::Item) -> Option<T>, |
| { |
| FilterMap { stream: self, f } |
| } |
| |
| /// Takes only the first `n` items of the stream. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::repeat(7).take(2); |
| /// |
| /// assert_eq!(s.next().await, Some(7)); |
| /// assert_eq!(s.next().await, Some(7)); |
| /// assert_eq!(s.next().await, None); |
| /// # }); |
| /// ``` |
| fn take(self, n: usize) -> Take<Self> |
| where |
| Self: Sized, |
| { |
| Take { stream: self, n } |
| } |
| |
| /// Takes items while `predicate` returns `true`. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec![1, 2, 3, 4]); |
| /// let mut s = s.take_while(|x| *x < 3); |
| /// |
| /// assert_eq!(s.next().await, Some(1)); |
| /// assert_eq!(s.next().await, Some(2)); |
| /// assert_eq!(s.next().await, None); |
| /// # }); |
| /// ``` |
| fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P> |
| where |
| Self: Sized, |
| P: FnMut(&Self::Item) -> bool, |
| { |
| TakeWhile { |
| stream: self, |
| predicate, |
| } |
| } |
| |
| /// Skips the first `n` items of the stream. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec![1, 2, 3]); |
| /// let mut s = s.skip(2); |
| /// |
| /// assert_eq!(s.next().await, Some(3)); |
| /// assert_eq!(s.next().await, None); |
| /// # }); |
| /// ``` |
| fn skip(self, n: usize) -> Skip<Self> |
| where |
| Self: Sized, |
| { |
| Skip { stream: self, n } |
| } |
| |
| /// Skips items while `predicate` returns `true`. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec![-1i32, 0, 1]); |
| /// let mut s = s.skip_while(|x| x.is_negative()); |
| /// |
| /// assert_eq!(s.next().await, Some(0)); |
| /// assert_eq!(s.next().await, Some(1)); |
| /// assert_eq!(s.next().await, None); |
| /// # }); |
| /// ``` |
| fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P> |
| where |
| Self: Sized, |
| P: FnMut(&Self::Item) -> bool, |
| { |
| SkipWhile { |
| stream: self, |
| predicate: Some(predicate), |
| } |
| } |
| |
| /// Yields every `step`th item. |
| /// |
| /// # Panics |
| /// |
| /// This method will panic if the `step` is 0. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec![0, 1, 2, 3, 4]); |
| /// let mut s = s.step_by(2); |
| /// |
| /// assert_eq!(s.next().await, Some(0)); |
| /// assert_eq!(s.next().await, Some(2)); |
| /// assert_eq!(s.next().await, Some(4)); |
| /// assert_eq!(s.next().await, None); |
| /// # }); |
| /// ``` |
| fn step_by(self, step: usize) -> StepBy<Self> |
| where |
| Self: Sized, |
| { |
| assert!(step > 0, "`step` must be greater than zero"); |
| StepBy { |
| stream: self, |
| step, |
| i: 0, |
| } |
| } |
| |
| /// Appends another stream to the end of this one. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s1 = stream::iter(vec![1, 2]); |
| /// let s2 = stream::iter(vec![7, 8]); |
| /// let mut s = s1.chain(s2); |
| /// |
| /// assert_eq!(s.next().await, Some(1)); |
| /// assert_eq!(s.next().await, Some(2)); |
| /// assert_eq!(s.next().await, Some(7)); |
| /// assert_eq!(s.next().await, Some(8)); |
| /// assert_eq!(s.next().await, None); |
| /// # }); |
| /// ``` |
| fn chain<U>(self, other: U) -> Chain<Self, U> |
| where |
| Self: Sized, |
| U: Stream<Item = Self::Item> + Sized, |
| { |
| Chain { |
| first: self.fuse(), |
| second: other.fuse(), |
| } |
| } |
| |
| /// Clones all items. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec![&1, &2]); |
| /// let mut s = s.cloned(); |
| /// |
| /// assert_eq!(s.next().await, Some(1)); |
| /// assert_eq!(s.next().await, Some(2)); |
| /// assert_eq!(s.next().await, None); |
| /// # }); |
| /// ``` |
| fn cloned<'a, T>(self) -> Cloned<Self> |
| where |
| Self: Stream<Item = &'a T> + Sized, |
| T: Clone + 'a, |
| { |
| Cloned { stream: self } |
| } |
| |
| /// Copies all items. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec![&1, &2]); |
| /// let mut s = s.copied(); |
| /// |
| /// assert_eq!(s.next().await, Some(1)); |
| /// assert_eq!(s.next().await, Some(2)); |
| /// assert_eq!(s.next().await, None); |
| /// # }); |
| /// ``` |
| fn copied<'a, T>(self) -> Copied<Self> |
| where |
| Self: Stream<Item = &'a T> + Sized, |
| T: Copy + 'a, |
| { |
| Copied { stream: self } |
| } |
| |
| /// Collects all items in the stream into a collection. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::iter(1..=3); |
| /// |
| /// let items: Vec<_> = s.collect().await; |
| /// assert_eq!(items, [1, 2, 3]); |
| /// # }); |
| /// ``` |
| fn collect<C>(self) -> CollectFuture<Self, C> |
| where |
| Self: Sized, |
| C: Default + Extend<Self::Item>, |
| { |
| CollectFuture { |
| stream: self, |
| collection: Default::default(), |
| } |
| } |
| |
| /// Collects all items in the fallible stream into a collection. |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec![Ok(1), Err(2), Ok(3)]); |
| /// let res: Result<Vec<i32>, i32> = s.try_collect().await; |
| /// assert_eq!(res, Err(2)); |
| /// |
| /// let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]); |
| /// let res: Result<Vec<i32>, i32> = s.try_collect().await; |
| /// assert_eq!(res, Ok(vec![1, 2, 3])); |
| /// # }) |
| /// ``` |
| fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C> |
| where |
| Self: Stream<Item = Result<T, E>> + Sized, |
| C: Default + Extend<T>, |
| { |
| TryCollectFuture { |
| stream: self, |
| items: Default::default(), |
| } |
| } |
| |
| /// Partitions items into those for which `predicate` is `true` and those for which it is |
| /// `false`, and then collects them into two collections. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec![1, 2, 3]); |
| /// let (even, odd): (Vec<_>, Vec<_>) = s.partition(|&n| n % 2 == 0).await; |
| /// |
| /// assert_eq!(even, &[2]); |
| /// assert_eq!(odd, &[1, 3]); |
| /// # }) |
| /// ``` |
| fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B> |
| where |
| Self: Sized, |
| B: Default + Extend<Self::Item>, |
| P: FnMut(&Self::Item) -> bool, |
| { |
| PartitionFuture { |
| stream: self, |
| predicate, |
| res: Some(Default::default()), |
| } |
| } |
| |
| /// Accumulates a computation over the stream. |
| /// |
| /// The computation begins with the accumulator value set to `init`, and then applies `f` to |
| /// the accumulator and each item in the stream. The final accumulator value is returned. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec![1, 2, 3]); |
| /// let sum = s.fold(0, |acc, x| acc + x).await; |
| /// |
| /// assert_eq!(sum, 6); |
| /// # }) |
| /// ``` |
| fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T> |
| where |
| Self: Sized, |
| F: FnMut(T, Self::Item) -> T, |
| { |
| FoldFuture { |
| stream: self, |
| f, |
| acc: Some(init), |
| } |
| } |
| |
| /// Accumulates a fallible computation over the stream. |
| /// |
| /// The computation begins with the accumulator value set to `init`, and then applies `f` to |
| /// the accumulator and each item in the stream. The final accumulator value is returned, or an |
| /// error if `f` failed the computation. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]); |
| /// |
| /// let sum = s.try_fold(0, |acc, v| { |
| /// if (acc + v) % 2 == 1 { |
| /// Ok(acc + v) |
| /// } else { |
| /// Err("fail") |
| /// } |
| /// }) |
| /// .await; |
| /// |
| /// assert_eq!(sum, Err("fail")); |
| /// # }) |
| /// ``` |
| fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B> |
| where |
| Self: Stream<Item = Result<T, E>> + Unpin + Sized, |
| F: FnMut(B, T) -> Result<B, E>, |
| { |
| TryFoldFuture { |
| stream: self, |
| f, |
| acc: Some(init), |
| } |
| } |
| |
| /// Maps items of the stream to new values using a state value and a closure. |
| /// |
| /// Scanning begins with the inital state set to `initial_state`, and then applies `f` to the |
| /// state and each item in the stream. The stream stops when `f` returns `None`. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec![1, 2, 3]); |
| /// let mut s = s.scan(1, |state, x| { |
| /// *state = *state * x; |
| /// Some(-*state) |
| /// }); |
| /// |
| /// assert_eq!(s.next().await, Some(-1)); |
| /// assert_eq!(s.next().await, Some(-2)); |
| /// assert_eq!(s.next().await, Some(-6)); |
| /// assert_eq!(s.next().await, None); |
| /// # }) |
| /// ``` |
| fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F> |
| where |
| Self: Sized, |
| F: FnMut(&mut St, Self::Item) -> Option<B>, |
| { |
| Scan { |
| stream: self, |
| state_f: (initial_state, f), |
| } |
| } |
| |
| /// Fuses the stream so that it stops yielding items after the first [`None`]. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::once(1).fuse(); |
| /// |
| /// assert_eq!(s.next().await, Some(1)); |
| /// assert_eq!(s.next().await, None); |
| /// assert_eq!(s.next().await, None); |
| /// # }) |
| /// ``` |
| fn fuse(self) -> Fuse<Self> |
| where |
| Self: Sized, |
| { |
| Fuse { |
| stream: self, |
| done: false, |
| } |
| } |
| |
| /// Repeats the stream from beginning to end, forever. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::iter(vec![1, 2]).cycle(); |
| /// |
| /// assert_eq!(s.next().await, Some(1)); |
| /// assert_eq!(s.next().await, Some(2)); |
| /// assert_eq!(s.next().await, Some(1)); |
| /// assert_eq!(s.next().await, Some(2)); |
| /// # }); |
| /// ``` |
| fn cycle(self) -> Cycle<Self> |
| where |
| Self: Clone + Sized, |
| { |
| Cycle { |
| orig: self.clone(), |
| stream: self, |
| } |
| } |
| |
| /// Enumerates items, mapping them to `(index, item)`. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec!['a', 'b', 'c']); |
| /// let mut s = s.enumerate(); |
| /// |
| /// assert_eq!(s.next().await, Some((0, 'a'))); |
| /// assert_eq!(s.next().await, Some((1, 'b'))); |
| /// assert_eq!(s.next().await, Some((2, 'c'))); |
| /// assert_eq!(s.next().await, None); |
| /// # }); |
| /// ``` |
| fn enumerate(self) -> Enumerate<Self> |
| where |
| Self: Sized, |
| { |
| Enumerate { stream: self, i: 0 } |
| } |
| |
| /// Calls a closure on each item and passes it on. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec![1, 2, 3, 4, 5]); |
| /// |
| /// let sum = s |
| /// .inspect(|x| println!("about to filter {}", x)) |
| /// .filter(|x| x % 2 == 0) |
| /// .inspect(|x| println!("made it through filter: {}", x)) |
| /// .fold(0, |sum, i| sum + i) |
| /// .await; |
| /// # }); |
| /// ``` |
| fn inspect<F>(self, f: F) -> Inspect<Self, F> |
| where |
| Self: Sized, |
| F: FnMut(&Self::Item), |
| { |
| Inspect { stream: self, f } |
| } |
| |
| /// Gets the `n`th item of the stream. |
| /// |
| /// In the end, `n+1` items of the stream will be consumed. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7]); |
| /// |
| /// assert_eq!(s.nth(2).await, Some(2)); |
| /// assert_eq!(s.nth(2).await, Some(5)); |
| /// assert_eq!(s.nth(2).await, None); |
| /// # }); |
| /// ``` |
| fn nth(&mut self, n: usize) -> NthFuture<'_, Self> |
| where |
| Self: Unpin, |
| { |
| NthFuture { stream: self, n } |
| } |
| |
| /// Returns the last item in the stream. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec![1, 2, 3, 4]); |
| /// assert_eq!(s.last().await, Some(4)); |
| /// |
| /// let s = stream::empty::<i32>(); |
| /// assert_eq!(s.last().await, None); |
| /// # }); |
| /// ``` |
| fn last(self) -> LastFuture<Self> |
| where |
| Self: Sized, |
| { |
| LastFuture { |
| stream: self, |
| last: None, |
| } |
| } |
| |
| /// Finds the first item of the stream for which `predicate` returns `true`. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::iter(vec![11, 12, 13, 14]); |
| /// |
| /// assert_eq!(s.find(|x| *x % 2 == 0).await, Some(12)); |
| /// assert_eq!(s.next().await, Some(13)); |
| /// # }); |
| /// ``` |
| fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P> |
| where |
| Self: Unpin, |
| P: FnMut(&Self::Item) -> bool, |
| { |
| FindFuture { |
| stream: self, |
| predicate, |
| } |
| } |
| |
| /// Applies a closure to items in the stream and returns the first [`Some`] result. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::iter(vec!["lol", "NaN", "2", "5"]); |
| /// let number = s.find_map(|s| s.parse().ok()).await; |
| /// |
| /// assert_eq!(number, Some(2)); |
| /// # }); |
| /// ``` |
| fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F> |
| where |
| Self: Unpin, |
| F: FnMut(Self::Item) -> Option<B>, |
| { |
| FindMapFuture { stream: self, f } |
| } |
| |
| /// Finds the index of the first item of the stream for which `predicate` returns `true`. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5]); |
| /// |
| /// assert_eq!(s.position(|x| x == 2).await, Some(2)); |
| /// assert_eq!(s.position(|x| x == 3).await, Some(0)); |
| /// assert_eq!(s.position(|x| x == 9).await, None); |
| /// # }); |
| /// ``` |
| fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P> |
| where |
| Self: Unpin, |
| P: FnMut(Self::Item) -> bool, |
| { |
| PositionFuture { |
| stream: self, |
| predicate, |
| index: 0, |
| } |
| } |
| |
| /// Tests if `predicate` returns `true` for all items in the stream. |
| /// |
| /// The result is `true` for an empty stream. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::iter(vec![1, 2, 3]); |
| /// assert!(!s.all(|x| x % 2 == 0).await); |
| /// |
| /// let mut s = stream::iter(vec![2, 4, 6, 8]); |
| /// assert!(s.all(|x| x % 2 == 0).await); |
| /// |
| /// let mut s = stream::empty::<i32>(); |
| /// assert!(s.all(|x| x % 2 == 0).await); |
| /// # }); |
| /// ``` |
| fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P> |
| where |
| Self: Unpin, |
| P: FnMut(Self::Item) -> bool, |
| { |
| AllFuture { |
| stream: self, |
| predicate, |
| } |
| } |
| |
| /// Tests if `predicate` returns `true` for any item in the stream. |
| /// |
| /// The result is `false` for an empty stream. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::iter(vec![1, 3, 5, 7]); |
| /// assert!(!s.any(|x| x % 2 == 0).await); |
| /// |
| /// let mut s = stream::iter(vec![1, 2, 3]); |
| /// assert!(s.any(|x| x % 2 == 0).await); |
| /// |
| /// let mut s = stream::empty::<i32>(); |
| /// assert!(!s.any(|x| x % 2 == 0).await); |
| /// # }); |
| /// ``` |
| fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P> |
| where |
| Self: Unpin, |
| P: FnMut(Self::Item) -> bool, |
| { |
| AnyFuture { |
| stream: self, |
| predicate, |
| } |
| } |
| |
| /// Calls a closure on each item of the stream. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::iter(vec![1, 2, 3]); |
| /// s.for_each(|s| println!("{}", s)).await; |
| /// # }); |
| /// ``` |
| fn for_each<F>(self, f: F) -> ForEachFuture<Self, F> |
| where |
| Self: Sized, |
| F: FnMut(Self::Item), |
| { |
| ForEachFuture { stream: self, f } |
| } |
| |
| /// Calls a fallible closure on each item of the stream, stopping on first error. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let mut s = stream::iter(vec![0, 1, 2, 3]); |
| /// |
| /// let mut v = vec![]; |
| /// let res = s |
| /// .try_for_each(|n| { |
| /// if n < 2 { |
| /// v.push(n); |
| /// Ok(()) |
| /// } else { |
| /// Err("too big") |
| /// } |
| /// }) |
| /// .await; |
| /// |
| /// assert_eq!(v, &[0, 1]); |
| /// assert_eq!(res, Err("too big")); |
| /// # }); |
| /// ``` |
| fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F> |
| where |
| Self: Unpin, |
| F: FnMut(Self::Item) -> Result<(), E>, |
| { |
| TryForEachFuture { stream: self, f } |
| } |
| |
| /// Zips up two streams into a single stream of pairs. |
| /// |
| /// The stream of pairs stops when either of the original two streams is exhausted. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let l = stream::iter(vec![1, 2, 3]); |
| /// let r = stream::iter(vec![4, 5, 6, 7]); |
| /// let mut s = l.zip(r); |
| /// |
| /// assert_eq!(s.next().await, Some((1, 4))); |
| /// assert_eq!(s.next().await, Some((2, 5))); |
| /// assert_eq!(s.next().await, Some((3, 6))); |
| /// assert_eq!(s.next().await, None); |
| /// # }); |
| /// ``` |
| fn zip<U>(self, other: U) -> Zip<Self, U> |
| where |
| Self: Sized, |
| U: Stream, |
| { |
| Zip { |
| item_slot: None, |
| first: self, |
| second: other, |
| } |
| } |
| |
| /// Collects a stream of pairs into a pair of collections. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let s = stream::iter(vec![(1, 2), (3, 4)]); |
| /// let (left, right): (Vec<_>, Vec<_>) = s.unzip().await; |
| /// |
| /// assert_eq!(left, [1, 3]); |
| /// assert_eq!(right, [2, 4]); |
| /// # }); |
| /// ``` |
| fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB> |
| where |
| FromA: Default + Extend<A>, |
| FromB: Default + Extend<B>, |
| Self: Stream<Item = (A, B)> + Sized, |
| { |
| UnzipFuture { |
| stream: self, |
| res: Some(Default::default()), |
| } |
| } |
| |
| /// Merges with `other` stream, preferring items from `self` whenever both streams are ready. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// use futures_lite::stream::{once, pending}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// assert_eq!(once(1).or(pending()).next().await, Some(1)); |
| /// assert_eq!(pending().or(once(2)).next().await, Some(2)); |
| /// |
| /// // The first future wins. |
| /// assert_eq!(once(1).or(once(2)).next().await, Some(1)); |
| /// # }) |
| /// ``` |
| fn or<S>(self, other: S) -> Or<Self, S> |
| where |
| Self: Sized, |
| S: Stream<Item = Self::Item>, |
| { |
| Or { |
| stream1: self, |
| stream2: other, |
| } |
| } |
| |
| /// Merges with `other` stream, with no preference for either stream when both are ready. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// use futures_lite::stream::{once, pending}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// assert_eq!(once(1).race(pending()).next().await, Some(1)); |
| /// assert_eq!(pending().race(once(2)).next().await, Some(2)); |
| /// |
| /// // One of the two stream is randomly chosen as the winner. |
| /// let res = once(1).race(once(2)).next().await; |
| /// # }) |
| /// ``` |
| #[cfg(feature = "std")] |
| fn race<S>(self, other: S) -> Race<Self, S> |
| where |
| Self: Sized, |
| S: Stream<Item = Self::Item>, |
| { |
| Race { |
| stream1: self, |
| stream2: other, |
| } |
| } |
| |
| /// Boxes the stream and changes its type to `dyn Stream + Send + 'a`. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let a = stream::once(1); |
| /// let b = stream::empty(); |
| /// |
| /// // Streams of different types can be stored in |
| /// // the same collection when they are boxed: |
| /// let streams = vec![a.boxed(), b.boxed()]; |
| /// # }) |
| /// ``` |
| #[cfg(feature = "alloc")] |
| fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>> |
| where |
| Self: Send + Sized + 'a, |
| { |
| Box::pin(self) |
| } |
| |
| /// Boxes the stream and changes its type to `dyn Stream + 'a`. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// let a = stream::once(1); |
| /// let b = stream::empty(); |
| /// |
| /// // Streams of different types can be stored in |
| /// // the same collection when they are boxed: |
| /// let streams = vec![a.boxed_local(), b.boxed_local()]; |
| /// # }) |
| /// ``` |
| #[cfg(feature = "alloc")] |
| fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>> |
| where |
| Self: Sized + 'a, |
| { |
| Box::pin(self) |
| } |
| } |
| |
| impl<S: Stream + ?Sized> StreamExt for S {} |
| |
| /// Type alias for `Pin<Box<dyn Stream<Item = T> + Send + 'static>>`. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// // These two lines are equivalent: |
| /// let s1: stream::Boxed<i32> = stream::once(7).boxed(); |
| /// let s2: stream::Boxed<i32> = Box::pin(stream::once(7)); |
| /// ``` |
| #[cfg(feature = "alloc")] |
| pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>; |
| |
| /// Type alias for `Pin<Box<dyn Stream<Item = T> + 'static>>`. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, StreamExt}; |
| /// |
| /// // These two lines are equivalent: |
| /// let s1: stream::BoxedLocal<i32> = stream::once(7).boxed_local(); |
| /// let s2: stream::BoxedLocal<i32> = Box::pin(stream::once(7)); |
| /// ``` |
| #[cfg(feature = "alloc")] |
| pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>; |
| |
| /// Future for the [`StreamExt::next()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct NextFuture<'a, S: ?Sized> { |
| stream: &'a mut S, |
| } |
| |
| impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {} |
| |
| impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> { |
| type Output = Option<S::Item>; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| self.stream.poll_next(cx) |
| } |
| } |
| |
| /// Future for the [`StreamExt::try_next()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct TryNextFuture<'a, S: ?Sized> { |
| stream: &'a mut S, |
| } |
| |
| impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {} |
| |
| impl<T, E, S> Future for TryNextFuture<'_, S> |
| where |
| S: Stream<Item = Result<T, E>> + Unpin + ?Sized, |
| { |
| type Output = Result<Option<T>, E>; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| let res = ready!(self.stream.poll_next(cx)); |
| Poll::Ready(res.transpose()) |
| } |
| } |
| |
| pin_project! { |
| /// Future for the [`StreamExt::count()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct CountFuture<S: ?Sized> { |
| count: usize, |
| #[pin] |
| stream: S, |
| } |
| } |
| |
| impl<S: Stream + ?Sized> Future for CountFuture<S> { |
| type Output = usize; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| loop { |
| match ready!(self.as_mut().project().stream.poll_next(cx)) { |
| None => return Poll::Ready(self.count), |
| Some(_) => *self.as_mut().project().count += 1, |
| } |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Future for the [`StreamExt::collect()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct CollectFuture<S, C> { |
| #[pin] |
| stream: S, |
| collection: C, |
| } |
| } |
| |
| impl<S, C> Future for CollectFuture<S, C> |
| where |
| S: Stream, |
| C: Default + Extend<S::Item>, |
| { |
| type Output = C; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> { |
| let mut this = self.as_mut().project(); |
| loop { |
| match ready!(this.stream.as_mut().poll_next(cx)) { |
| Some(e) => this.collection.extend(Some(e)), |
| None => { |
| return Poll::Ready(mem::replace(self.project().collection, Default::default())) |
| } |
| } |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Future for the [`StreamExt::try_collect()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct TryCollectFuture<S, C> { |
| #[pin] |
| stream: S, |
| items: C, |
| } |
| } |
| |
| impl<T, E, S, C> Future for TryCollectFuture<S, C> |
| where |
| S: Stream<Item = Result<T, E>>, |
| C: Default + Extend<T>, |
| { |
| type Output = Result<C, E>; |
| |
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| let mut this = self.project(); |
| Poll::Ready(Ok(loop { |
| match ready!(this.stream.as_mut().poll_next(cx)?) { |
| Some(x) => this.items.extend(Some(x)), |
| None => break mem::replace(this.items, Default::default()), |
| } |
| })) |
| } |
| } |
| |
| pin_project! { |
| /// Future for the [`StreamExt::partition()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct PartitionFuture<S, P, B> { |
| #[pin] |
| stream: S, |
| predicate: P, |
| res: Option<(B, B)>, |
| } |
| } |
| |
| impl<S, P, B> Future for PartitionFuture<S, P, B> |
| where |
| S: Stream + Sized, |
| P: FnMut(&S::Item) -> bool, |
| B: Default + Extend<S::Item>, |
| { |
| type Output = (B, B); |
| |
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| let mut this = self.project(); |
| loop { |
| match ready!(this.stream.as_mut().poll_next(cx)) { |
| Some(v) => { |
| let res = this.res.as_mut().unwrap(); |
| if (this.predicate)(&v) { |
| res.0.extend(Some(v)) |
| } else { |
| res.1.extend(Some(v)) |
| } |
| } |
| None => return Poll::Ready(this.res.take().unwrap()), |
| } |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Future for the [`StreamExt::fold()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct FoldFuture<S, F, T> { |
| #[pin] |
| stream: S, |
| f: F, |
| acc: Option<T>, |
| } |
| } |
| |
| impl<S, F, T> Future for FoldFuture<S, F, T> |
| where |
| S: Stream, |
| F: FnMut(T, S::Item) -> T, |
| { |
| type Output = T; |
| |
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| let mut this = self.project(); |
| loop { |
| match ready!(this.stream.as_mut().poll_next(cx)) { |
| Some(v) => { |
| let old = this.acc.take().unwrap(); |
| let new = (this.f)(old, v); |
| *this.acc = Some(new); |
| } |
| None => return Poll::Ready(this.acc.take().unwrap()), |
| } |
| } |
| } |
| } |
| |
| /// Future for the [`StreamExt::try_fold()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct TryFoldFuture<'a, S, F, B> { |
| stream: &'a mut S, |
| f: F, |
| acc: Option<B>, |
| } |
| |
| impl<'a, S, F, B> Unpin for TryFoldFuture<'a, S, F, B> {} |
| |
| impl<'a, T, E, S, F, B> Future for TryFoldFuture<'a, S, F, B> |
| where |
| S: Stream<Item = Result<T, E>> + Unpin, |
| F: FnMut(B, T) -> Result<B, E>, |
| { |
| type Output = Result<B, E>; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| loop { |
| match ready!(self.stream.poll_next(cx)) { |
| Some(Err(e)) => return Poll::Ready(Err(e)), |
| Some(Ok(t)) => { |
| let old = self.acc.take().unwrap(); |
| let new = (&mut self.f)(old, t); |
| |
| match new { |
| Ok(t) => self.acc = Some(t), |
| Err(e) => return Poll::Ready(Err(e)), |
| } |
| } |
| None => return Poll::Ready(Ok(self.acc.take().unwrap())), |
| } |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::scan()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Scan<S, St, F> { |
| #[pin] |
| stream: S, |
| state_f: (St, F), |
| } |
| } |
| |
| impl<S, St, F, B> Stream for Scan<S, St, F> |
| where |
| S: Stream, |
| F: FnMut(&mut St, S::Item) -> Option<B>, |
| { |
| type Item = B; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> { |
| let mut this = self.project(); |
| this.stream.as_mut().poll_next(cx).map(|item| { |
| item.and_then(|item| { |
| let (state, f) = this.state_f; |
| f(state, item) |
| }) |
| }) |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::fuse()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Fuse<S> { |
| #[pin] |
| stream: S, |
| done: bool, |
| } |
| } |
| |
| impl<S: Stream> Stream for Fuse<S> { |
| type Item = S::Item; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { |
| let this = self.project(); |
| |
| if *this.done { |
| Poll::Ready(None) |
| } else { |
| let next = ready!(this.stream.poll_next(cx)); |
| if next.is_none() { |
| *this.done = true; |
| } |
| Poll::Ready(next) |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::map()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Map<S, F> { |
| #[pin] |
| stream: S, |
| f: F, |
| } |
| } |
| |
| impl<S, F, T> Stream for Map<S, F> |
| where |
| S: Stream, |
| F: FnMut(S::Item) -> T, |
| { |
| type Item = T; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let this = self.project(); |
| let next = ready!(this.stream.poll_next(cx)); |
| Poll::Ready(next.map(this.f)) |
| } |
| |
| fn size_hint(&self) -> (usize, Option<usize>) { |
| self.stream.size_hint() |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::flat_map()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct FlatMap<S, U, F> { |
| #[pin] |
| stream: Map<S, F>, |
| #[pin] |
| inner_stream: Option<U>, |
| } |
| } |
| |
| impl<S, U, F> Stream for FlatMap<S, U, F> |
| where |
| S: Stream, |
| U: Stream, |
| F: FnMut(S::Item) -> U, |
| { |
| type Item = U::Item; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let mut this = self.project(); |
| loop { |
| if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() { |
| match ready!(inner.poll_next(cx)) { |
| Some(item) => return Poll::Ready(Some(item)), |
| None => this.inner_stream.set(None), |
| } |
| } |
| |
| match ready!(this.stream.as_mut().poll_next(cx)) { |
| Some(stream) => this.inner_stream.set(Some(stream)), |
| None => return Poll::Ready(None), |
| } |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::flat_map()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Flatten<S: Stream> { |
| #[pin] |
| stream: S, |
| #[pin] |
| inner_stream: Option<S::Item>, |
| } |
| } |
| |
| impl<S, U> Stream for Flatten<S> |
| where |
| S: Stream<Item = U>, |
| U: Stream, |
| { |
| type Item = U::Item; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let mut this = self.project(); |
| loop { |
| if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() { |
| match ready!(inner.poll_next(cx)) { |
| Some(item) => return Poll::Ready(Some(item)), |
| None => this.inner_stream.set(None), |
| } |
| } |
| |
| match ready!(this.stream.as_mut().poll_next(cx)) { |
| Some(inner) => this.inner_stream.set(Some(inner)), |
| None => return Poll::Ready(None), |
| } |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::then()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Then<S, F, Fut> { |
| #[pin] |
| stream: S, |
| #[pin] |
| future: Option<Fut>, |
| f: F, |
| } |
| } |
| |
| impl<S, F, Fut> Stream for Then<S, F, Fut> |
| where |
| S: Stream, |
| F: FnMut(S::Item) -> Fut, |
| Fut: Future, |
| { |
| type Item = Fut::Output; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let mut this = self.project(); |
| |
| loop { |
| if let Some(fut) = this.future.as_mut().as_pin_mut() { |
| let item = ready!(fut.poll(cx)); |
| this.future.set(None); |
| return Poll::Ready(Some(item)); |
| } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { |
| this.future.set(Some((this.f)(item))); |
| } else { |
| return Poll::Ready(None); |
| } |
| } |
| } |
| |
| fn size_hint(&self) -> (usize, Option<usize>) { |
| let future_len = if self.future.is_some() { 1 } else { 0 }; |
| let (lower, upper) = self.stream.size_hint(); |
| let lower = lower.saturating_add(future_len); |
| let upper = upper.and_then(|u| u.checked_add(future_len)); |
| (lower, upper) |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::filter()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Filter<S, P> { |
| #[pin] |
| stream: S, |
| predicate: P, |
| } |
| } |
| |
| impl<S, P> Stream for Filter<S, P> |
| where |
| S: Stream, |
| P: FnMut(&S::Item) -> bool, |
| { |
| type Item = S::Item; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let mut this = self.project(); |
| loop { |
| match ready!(this.stream.as_mut().poll_next(cx)) { |
| None => return Poll::Ready(None), |
| Some(v) if (this.predicate)(&v) => return Poll::Ready(Some(v)), |
| Some(_) => {} |
| } |
| } |
| } |
| } |
| |
| /// Merges two streams, preferring items from `stream1` whenever both streams are ready. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, once, pending, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// assert_eq!(stream::or(once(1), pending()).next().await, Some(1)); |
| /// assert_eq!(stream::or(pending(), once(2)).next().await, Some(2)); |
| /// |
| /// // The first stream wins. |
| /// assert_eq!(stream::or(once(1), once(2)).next().await, Some(1)); |
| /// # }) |
| /// ``` |
| pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2> |
| where |
| S1: Stream<Item = T>, |
| S2: Stream<Item = T>, |
| { |
| Or { stream1, stream2 } |
| } |
| |
| pin_project! { |
| /// Stream for the [`or()`] function and the [`StreamExt::or()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Or<S1, S2> { |
| #[pin] |
| stream1: S1, |
| #[pin] |
| stream2: S2, |
| } |
| } |
| |
| impl<T, S1, S2> Stream for Or<S1, S2> |
| where |
| S1: Stream<Item = T>, |
| S2: Stream<Item = T>, |
| { |
| type Item = T; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let mut this = self.project(); |
| |
| if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) { |
| return Poll::Ready(Some(t)); |
| } |
| this.stream2.as_mut().poll_next(cx) |
| } |
| } |
| |
| /// Merges two streams, with no preference for either stream when both are ready. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use futures_lite::stream::{self, once, pending, StreamExt}; |
| /// |
| /// # spin_on::spin_on(async { |
| /// assert_eq!(stream::race(once(1), pending()).next().await, Some(1)); |
| /// assert_eq!(stream::race(pending(), once(2)).next().await, Some(2)); |
| /// |
| /// // One of the two stream is randomly chosen as the winner. |
| /// let res = stream::race(once(1), once(2)).next().await; |
| /// # }) |
| #[cfg(feature = "std")] |
| pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2> |
| where |
| S1: Stream<Item = T>, |
| S2: Stream<Item = T>, |
| { |
| Race { stream1, stream2 } |
| } |
| |
| #[cfg(feature = "std")] |
| pin_project! { |
| /// Stream for the [`race()`] function and the [`StreamExt::race()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Race<S1, S2> { |
| #[pin] |
| stream1: S1, |
| #[pin] |
| stream2: S2, |
| } |
| } |
| |
| #[cfg(feature = "std")] |
| impl<T, S1, S2> Stream for Race<S1, S2> |
| where |
| S1: Stream<Item = T>, |
| S2: Stream<Item = T>, |
| { |
| type Item = T; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let mut this = self.project(); |
| |
| if fastrand::bool() { |
| if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) { |
| return Poll::Ready(Some(t)); |
| } |
| if let Poll::Ready(Some(t)) = this.stream2.as_mut().poll_next(cx) { |
| return Poll::Ready(Some(t)); |
| } |
| } else { |
| if let Poll::Ready(Some(t)) = this.stream2.as_mut().poll_next(cx) { |
| return Poll::Ready(Some(t)); |
| } |
| if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) { |
| return Poll::Ready(Some(t)); |
| } |
| } |
| Poll::Pending |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::filter_map()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct FilterMap<S, F> { |
| #[pin] |
| stream: S, |
| f: F, |
| } |
| } |
| |
| impl<S, F, T> Stream for FilterMap<S, F> |
| where |
| S: Stream, |
| F: FnMut(S::Item) -> Option<T>, |
| { |
| type Item = T; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let mut this = self.project(); |
| loop { |
| match ready!(this.stream.as_mut().poll_next(cx)) { |
| None => return Poll::Ready(None), |
| Some(v) => { |
| if let Some(t) = (this.f)(v) { |
| return Poll::Ready(Some(t)); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::take()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Take<S> { |
| #[pin] |
| stream: S, |
| n: usize, |
| } |
| } |
| |
| impl<S: Stream> Stream for Take<S> { |
| type Item = S::Item; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { |
| let this = self.project(); |
| |
| if *this.n == 0 { |
| Poll::Ready(None) |
| } else { |
| let next = ready!(this.stream.poll_next(cx)); |
| match next { |
| Some(_) => *this.n -= 1, |
| None => *this.n = 0, |
| } |
| Poll::Ready(next) |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::take_while()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct TakeWhile<S, P> { |
| #[pin] |
| stream: S, |
| predicate: P, |
| } |
| } |
| |
| impl<S, P> Stream for TakeWhile<S, P> |
| where |
| S: Stream, |
| P: FnMut(&S::Item) -> bool, |
| { |
| type Item = S::Item; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let this = self.project(); |
| |
| match ready!(this.stream.poll_next(cx)) { |
| Some(v) => { |
| if (this.predicate)(&v) { |
| Poll::Ready(Some(v)) |
| } else { |
| Poll::Ready(None) |
| } |
| } |
| None => Poll::Ready(None), |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::skip()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Skip<S> { |
| #[pin] |
| stream: S, |
| n: usize, |
| } |
| } |
| |
| impl<S: Stream> Stream for Skip<S> { |
| type Item = S::Item; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let mut this = self.project(); |
| loop { |
| match ready!(this.stream.as_mut().poll_next(cx)) { |
| Some(v) => match *this.n { |
| 0 => return Poll::Ready(Some(v)), |
| _ => *this.n -= 1, |
| }, |
| None => return Poll::Ready(None), |
| } |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::skip_while()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct SkipWhile<S, P> { |
| #[pin] |
| stream: S, |
| predicate: Option<P>, |
| } |
| } |
| |
| impl<S, P> Stream for SkipWhile<S, P> |
| where |
| S: Stream, |
| P: FnMut(&S::Item) -> bool, |
| { |
| type Item = S::Item; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let mut this = self.project(); |
| loop { |
| match ready!(this.stream.as_mut().poll_next(cx)) { |
| Some(v) => match this.predicate { |
| Some(p) => { |
| if !p(&v) { |
| *this.predicate = None; |
| return Poll::Ready(Some(v)); |
| } |
| } |
| None => return Poll::Ready(Some(v)), |
| }, |
| None => return Poll::Ready(None), |
| } |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::step_by()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct StepBy<S> { |
| #[pin] |
| stream: S, |
| step: usize, |
| i: usize, |
| } |
| } |
| |
| impl<S: Stream> Stream for StepBy<S> { |
| type Item = S::Item; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let mut this = self.project(); |
| loop { |
| match ready!(this.stream.as_mut().poll_next(cx)) { |
| Some(v) => { |
| if *this.i == 0 { |
| *this.i = *this.step - 1; |
| return Poll::Ready(Some(v)); |
| } else { |
| *this.i -= 1; |
| } |
| } |
| None => return Poll::Ready(None), |
| } |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::chain()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Chain<S, U> { |
| #[pin] |
| first: Fuse<S>, |
| #[pin] |
| second: Fuse<U>, |
| } |
| } |
| |
| impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> { |
| type Item = S::Item; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let mut this = self.project(); |
| |
| if !this.first.done { |
| let next = ready!(this.first.as_mut().poll_next(cx)); |
| if let Some(next) = next { |
| return Poll::Ready(Some(next)); |
| } |
| } |
| |
| if !this.second.done { |
| let next = ready!(this.second.as_mut().poll_next(cx)); |
| if let Some(next) = next { |
| return Poll::Ready(Some(next)); |
| } |
| } |
| |
| if this.first.done && this.second.done { |
| Poll::Ready(None) |
| } else { |
| Poll::Pending |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::cloned()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Cloned<S> { |
| #[pin] |
| stream: S, |
| } |
| } |
| |
| impl<'a, S, T: 'a> Stream for Cloned<S> |
| where |
| S: Stream<Item = &'a T>, |
| T: Clone, |
| { |
| type Item = T; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let this = self.project(); |
| let next = ready!(this.stream.poll_next(cx)); |
| Poll::Ready(next.cloned()) |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::copied()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Copied<S> { |
| #[pin] |
| stream: S, |
| } |
| } |
| |
| impl<'a, S, T: 'a> Stream for Copied<S> |
| where |
| S: Stream<Item = &'a T>, |
| T: Copy, |
| { |
| type Item = T; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let this = self.project(); |
| let next = ready!(this.stream.poll_next(cx)); |
| Poll::Ready(next.copied()) |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::cycle()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Cycle<S> { |
| orig: S, |
| #[pin] |
| stream: S, |
| } |
| } |
| |
| impl<S> Stream for Cycle<S> |
| where |
| S: Stream + Clone, |
| { |
| type Item = S::Item; |
| |
| fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) { |
| Some(item) => Poll::Ready(Some(item)), |
| None => { |
| let new = self.as_mut().orig.clone(); |
| self.as_mut().project().stream.set(new); |
| self.project().stream.poll_next(cx) |
| } |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::cycle()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Enumerate<S> { |
| #[pin] |
| stream: S, |
| i: usize, |
| } |
| } |
| |
| impl<S> Stream for Enumerate<S> |
| where |
| S: Stream, |
| { |
| type Item = (usize, S::Item); |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let this = self.project(); |
| |
| match ready!(this.stream.poll_next(cx)) { |
| Some(v) => { |
| let ret = (*this.i, v); |
| *this.i += 1; |
| Poll::Ready(Some(ret)) |
| } |
| None => Poll::Ready(None), |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::inspect()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Inspect<S, F> { |
| #[pin] |
| stream: S, |
| f: F, |
| } |
| } |
| |
| impl<S, F> Stream for Inspect<S, F> |
| where |
| S: Stream, |
| F: FnMut(&S::Item), |
| { |
| type Item = S::Item; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let mut this = self.project(); |
| let next = ready!(this.stream.as_mut().poll_next(cx)); |
| if let Some(x) = &next { |
| (this.f)(x); |
| } |
| Poll::Ready(next) |
| } |
| } |
| |
| /// Future for the [`StreamExt::nth()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct NthFuture<'a, S: ?Sized> { |
| stream: &'a mut S, |
| n: usize, |
| } |
| |
| impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {} |
| |
| impl<'a, S> Future for NthFuture<'a, S> |
| where |
| S: Stream + Unpin + ?Sized, |
| { |
| type Output = Option<S::Item>; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| loop { |
| match ready!(self.stream.poll_next(cx)) { |
| Some(v) => match self.n { |
| 0 => return Poll::Ready(Some(v)), |
| _ => self.n -= 1, |
| }, |
| None => return Poll::Ready(None), |
| } |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Future for the [`StreamExt::last()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct LastFuture<S: Stream> { |
| #[pin] |
| stream: S, |
| last: Option<S::Item>, |
| } |
| } |
| |
| impl<S: Stream> Future for LastFuture<S> { |
| type Output = Option<S::Item>; |
| |
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| let mut this = self.project(); |
| loop { |
| match ready!(this.stream.as_mut().poll_next(cx)) { |
| Some(new) => *this.last = Some(new), |
| None => return Poll::Ready(this.last.take()), |
| } |
| } |
| } |
| } |
| |
| /// Future for the [`StreamExt::find()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct FindFuture<'a, S: ?Sized, P> { |
| stream: &'a mut S, |
| predicate: P, |
| } |
| |
| impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {} |
| |
| impl<'a, S, P> Future for FindFuture<'a, S, P> |
| where |
| S: Stream + Unpin + ?Sized, |
| P: FnMut(&S::Item) -> bool, |
| { |
| type Output = Option<S::Item>; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| loop { |
| match ready!(self.stream.poll_next(cx)) { |
| Some(v) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)), |
| Some(_) => {} |
| None => return Poll::Ready(None), |
| } |
| } |
| } |
| } |
| |
| /// Future for the [`StreamExt::find_map()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct FindMapFuture<'a, S: ?Sized, F> { |
| stream: &'a mut S, |
| f: F, |
| } |
| |
| impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {} |
| |
| impl<'a, S, B, F> Future for FindMapFuture<'a, S, F> |
| where |
| S: Stream + Unpin + ?Sized, |
| F: FnMut(S::Item) -> Option<B>, |
| { |
| type Output = Option<B>; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| loop { |
| match ready!(self.stream.poll_next(cx)) { |
| Some(v) => { |
| if let Some(v) = (&mut self.f)(v) { |
| return Poll::Ready(Some(v)); |
| } |
| } |
| None => return Poll::Ready(None), |
| } |
| } |
| } |
| } |
| |
| /// Future for the [`StreamExt::position()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct PositionFuture<'a, S: ?Sized, P> { |
| stream: &'a mut S, |
| predicate: P, |
| index: usize, |
| } |
| |
| impl<'a, S: Unpin + ?Sized, P> Unpin for PositionFuture<'a, S, P> {} |
| |
| impl<'a, S, P> Future for PositionFuture<'a, S, P> |
| where |
| S: Stream + Unpin + ?Sized, |
| P: FnMut(S::Item) -> bool, |
| { |
| type Output = Option<usize>; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| loop { |
| match ready!(self.stream.poll_next(cx)) { |
| Some(v) => { |
| if (&mut self.predicate)(v) { |
| return Poll::Ready(Some(self.index)); |
| } else { |
| self.index += 1; |
| } |
| } |
| None => return Poll::Ready(None), |
| } |
| } |
| } |
| } |
| |
| /// Future for the [`StreamExt::all()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct AllFuture<'a, S: ?Sized, P> { |
| stream: &'a mut S, |
| predicate: P, |
| } |
| |
| impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {} |
| |
| impl<S, P> Future for AllFuture<'_, S, P> |
| where |
| S: Stream + Unpin + ?Sized, |
| P: FnMut(S::Item) -> bool, |
| { |
| type Output = bool; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| loop { |
| match ready!(self.stream.poll_next(cx)) { |
| Some(v) => { |
| if !(&mut self.predicate)(v) { |
| return Poll::Ready(false); |
| } |
| } |
| None => return Poll::Ready(true), |
| } |
| } |
| } |
| } |
| |
| /// Future for the [`StreamExt::any()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct AnyFuture<'a, S: ?Sized, P> { |
| stream: &'a mut S, |
| predicate: P, |
| } |
| |
| impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {} |
| |
| impl<S, P> Future for AnyFuture<'_, S, P> |
| where |
| S: Stream + Unpin + ?Sized, |
| P: FnMut(S::Item) -> bool, |
| { |
| type Output = bool; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| loop { |
| match ready!(self.stream.poll_next(cx)) { |
| Some(v) => { |
| if (&mut self.predicate)(v) { |
| return Poll::Ready(true); |
| } |
| } |
| None => return Poll::Ready(false), |
| } |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Future for the [`StreamExt::for_each()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct ForEachFuture<S, F> { |
| #[pin] |
| stream: S, |
| f: F, |
| } |
| } |
| |
| impl<S, F> Future for ForEachFuture<S, F> |
| where |
| S: Stream, |
| F: FnMut(S::Item), |
| { |
| type Output = (); |
| |
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| let mut this = self.project(); |
| loop { |
| match ready!(this.stream.as_mut().poll_next(cx)) { |
| Some(v) => (this.f)(v), |
| None => return Poll::Ready(()), |
| } |
| } |
| } |
| } |
| |
| /// Future for the [`StreamExt::try_for_each()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct TryForEachFuture<'a, S: ?Sized, F> { |
| stream: &'a mut S, |
| f: F, |
| } |
| |
| impl<'a, S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'a, S, F> {} |
| |
| impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F> |
| where |
| S: Stream + Unpin + ?Sized, |
| F: FnMut(S::Item) -> Result<(), E>, |
| { |
| type Output = Result<(), E>; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| loop { |
| match ready!(self.stream.poll_next(cx)) { |
| None => return Poll::Ready(Ok(())), |
| Some(v) => (&mut self.f)(v)?, |
| } |
| } |
| } |
| } |
| |
| pin_project! { |
| /// Stream for the [`StreamExt::zip()`] method. |
| #[derive(Clone, Debug)] |
| #[must_use = "streams do nothing unless polled"] |
| pub struct Zip<A: Stream, B> { |
| item_slot: Option<A::Item>, |
| #[pin] |
| first: A, |
| #[pin] |
| second: B, |
| } |
| } |
| |
| impl<A: Stream, B: Stream> Stream for Zip<A, B> { |
| type Item = (A::Item, B::Item); |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let this = self.project(); |
| |
| if this.item_slot.is_none() { |
| match this.first.poll_next(cx) { |
| Poll::Pending => return Poll::Pending, |
| Poll::Ready(None) => return Poll::Ready(None), |
| Poll::Ready(Some(item)) => *this.item_slot = Some(item), |
| } |
| } |
| |
| let second_item = ready!(this.second.poll_next(cx)); |
| let first_item = this.item_slot.take().unwrap(); |
| Poll::Ready(second_item.map(|second_item| (first_item, second_item))) |
| } |
| } |
| |
| pin_project! { |
| /// Future for the [`StreamExt::unzip()`] method. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless you `.await` or poll them"] |
| pub struct UnzipFuture<S, FromA, FromB> { |
| #[pin] |
| stream: S, |
| res: Option<(FromA, FromB)>, |
| } |
| } |
| |
| impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB> |
| where |
| S: Stream<Item = (A, B)>, |
| FromA: Default + Extend<A>, |
| FromB: Default + Extend<B>, |
| { |
| type Output = (FromA, FromB); |
| |
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| let mut this = self.project(); |
| |
| loop { |
| match ready!(this.stream.as_mut().poll_next(cx)) { |
| Some((a, b)) => { |
| let res = this.res.as_mut().unwrap(); |
| res.0.extend(Some(a)); |
| res.1.extend(Some(b)); |
| } |
| None => return Poll::Ready(this.res.take().unwrap()), |
| } |
| } |
| } |
| } |