// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use futures::ready;
use futures::stream::{Fuse, Stream, StreamExt};
use futures::task::{Context, Poll};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::marker::Unpin;
use std::pin::Pin;
pub struct GroupAvailable<S, T, E>
S: Stream<Item = Result<T, E>>,
stream: Fuse<S>,
error: Option<E>,
impl<S, T, E> Unpin for GroupAvailable<S, T, E> where S: Unpin + Stream<Item = Result<T, E>> {}
impl<S, T, E> GroupAvailable<S, T, E>
S: Unpin + Stream<Item = Result<T, E>>,
// Safety: projecting to `Fuse<S>` is safe because GroupAvailable is `!Unpin`
// when `S` is `!Unpin`, and `GroupAvailable` doesn't move out of `stream`.
unsafe_pinned!(stream: Fuse<S>);
// Safety: nothing requires `error` not to move.
unsafe_unpinned!(error: Option<E>);
impl<S, T, E> Stream for GroupAvailable<S, T, E>
S: Unpin + Stream<Item = Result<T, E>>,
type Item = Result<Vec<T>, E>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(e) = self.as_mut().error().take() {
return Poll::Ready(Some(Err(e)));
let mut batch = match ready!(self.as_mut().stream().poll_next(cx)?) {
Some(item) => vec![item],
None => return Poll::Ready(None),
loop {
match self.as_mut().stream().poll_next(cx) {
Poll::Ready(Some(Ok(item))) => batch.push(item),
Poll::Ready(None) | Poll::Pending => break,
Poll::Ready(Some(Err(e))) => {
*self.error() = Some(e);
pub trait GroupAvailableExt: Stream {
/// An adaptor for grouping readily available messages into a single Vec
/// item.
/// Similar to StreamExt.chunks(), except the size of produced batches can
/// be arbitrary, and only depends on how many items were available when
/// the stream was polled.
fn group_available<T, E>(self) -> GroupAvailable<Self, T, E>
Self: Stream<Item = Result<T, E>>,
Self: Sized,
GroupAvailable { stream: self.fuse(), error: None }
impl<T> GroupAvailableExt for T where T: Stream + ?Sized {}
mod tests {
use super::*;
use fuchsia_async::{self as fasync, temp::TempStreamExt};
use futures::channel::mpsc;
use futures::stream::{self, TryStreamExt};
use void::Void;
fn empty() {
let mut exec = fasync::Executor::new().expect("Failed to create an executor");
let (item, _) = exec
stream::empty::<Result<(), Void>>().group_available().try_into_future(),
.unwrap_or_else(|x| match x {});
fn pending() {
let mut exec = fasync::Executor::new().expect("Failed to create an executor");
let always_pending = stream::poll_fn(|_lw| Poll::Pending::<Option<Result<(), Void>>>);
let mut group_available = always_pending.group_available();
let mut fut = group_available.try_next();
assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));
fn group_available_items() {
let mut exec = fasync::Executor::new().expect("Failed to create an executor");
let (send, recv) = mpsc::unbounded();
let mut s =;
let item = exec.run_singlethreaded(s.try_next()).unwrap_or_else(|void: Void| match void {});
assert_eq!(Some(vec![10i32, 20i32]), item);
let item = exec.run_singlethreaded(s.try_next()).unwrap_or_else(|void: Void| match void {});
assert_eq!(Some(vec![30i32]), item);
fn buffer_error() {
let mut exec = fasync::Executor::new().expect("Failed to create an executor");
let mut s = stream::iter(vec![Ok(10i32), Ok(20i32), Err(-30i32)]).group_available();
let item = exec.run_singlethreaded(s.try_next()).expect("expected a successful value");
assert_eq!(Some(vec![10i32, 20i32]), item);
let res = exec.run_singlethreaded(s.try_next());
assert_eq!(Err(-30i32), res);