blob: 4280a903f95a348b2e0890422d60b79564380a8a [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use futures::ready;
use futures::future::Future;
use futures::stream::{self, Fuse, Stream, StreamExt};
use futures::task::{Poll, LocalWaker};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::marker::Unpin;
use std::pin::Pin;
use crate::Never;
/// Stream adaptor that collects the `Ok` items which are immediately available
/// from a fallible stream into a single `Vec`.
///
/// Each item yielded is either `Ok(batch)` with at least one element, or the
/// `Err` produced by the underlying stream. An error that arrives while a batch
/// is being collected is held back and yielded on the following poll, so the
/// items gathered before it are not lost.
pub struct GroupAvailable<S, T, E> where S: Stream<Item = Result<T, E>> {
// The wrapped stream, fused because `poll_next` may poll it again after it
// has already reported the end of the stream.
stream: Fuse<S>,
// An error received from `stream` in the middle of a batch; returned by the
// next call to `poll_next` before the stream is polled again.
error: Option<E>,
}
// `stream` is the only field that is ever accessed through a pinned projection
// (`error` is exposed with `unsafe_unpinned!`), so `GroupAvailable` can be
// `Unpin` whenever `S` is, regardless of `T` and `E`.
impl<S, T, E> Unpin for GroupAvailable<S, T, E>
where
S: Unpin + Stream<Item = Result<T, E>>
{}
// Private field accessors generated by `pin_utils`; they are what the `Stream`
// implementation calls as `self.stream()` and `self.error()`.
impl<S, T, E> GroupAvailable<S, T, E>
where
S: Unpin + Stream<Item = Result<T, E>>
{
// Pinned accessor for `stream`, used to call `poll_next` on the inner stream.
// Safety: projecting to `Fuse<S>` is safe because GroupAvailable is `!Unpin`
// when `S` is `!Unpin`, and `GroupAvailable` doesn't move out of `stream`.
unsafe_pinned!(stream: Fuse<S>);
// Plain mutable accessor for `error`, used to stash and later take the error.
// Safety: nothing requires `error` not to move.
unsafe_unpinned!(error: Option<E>);
}
impl<S, T, E> Stream for GroupAvailable<S, T, E>
where
    S: Unpin + Stream<Item = Result<T, E>>,
{
    type Item = Result<Vec<T>, E>;

    /// Yields the next batch of items that the inner stream can produce
    /// without waiting.
    ///
    /// * Returns `Pending` when the inner stream has nothing ready yet.
    /// * Returns `Ready(None)` once the inner stream has ended.
    /// * Returns `Ready(Some(Err(_)))` for an error, either one seen as the
    ///   very first result of this poll or one stashed by the previous poll.
    /// * Otherwise returns `Ready(Some(Ok(batch)))` with one or more items.
    fn poll_next(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Option<Self::Item>> {
        // An error held back by the previous call is delivered before the
        // inner stream is touched again.
        if let Some(stashed) = self.error().take() {
            return Poll::Ready(Some(Err(stashed)));
        }

        // The first item decides the outcome of this poll: `?` forwards an
        // error right away and `ready!` forwards `Pending`.
        let first: Option<T> = ready!(self.stream().poll_next(lw)?);
        let mut batch = match first {
            None => return Poll::Ready(None),
            Some(item) => vec![item],
        };

        // Keep draining for as long as the inner stream answers immediately.
        // `Pending` and end-of-stream both simply close the current batch.
        while let Poll::Ready(Some(next)) = self.stream().poll_next(lw) {
            match next {
                Ok(item) => batch.push(item),
                Err(e) => {
                    // Do not drop the items already collected: emit them now
                    // and report the error on the next poll.
                    *self.error() = Some(e);
                    break;
                }
            }
        }

        Poll::Ready(Some(Ok(batch)))
    }
}
pub trait GroupAvailableExt: Stream {
/// An adaptor for grouping readily available messages into a single Vec item.
///
/// Similar to StreamExt.chunks(), except the size of produced batches can be arbitrary,
/// and only depends on how many items were available when the stream was polled.
fn group_available<T, E>(self) -> GroupAvailable<Self, T, E>
where
Self: Stream<Item = Result<T, E>>,
Self: Sized,
{
GroupAvailable {
stream: self.fuse(),
error: None
}
}
}
// Blanket implementation: every `Stream` gets the extension trait.
impl<S: Stream + ?Sized> GroupAvailableExt for S {}
/// Similar to FuturesUnordered, but doesn't terminate when there are no futures.
/// Also, it is a Future rather than a Stream to make it easier to use with the select! macro.
pub struct ConcurrentTasks<T> {
// The tasks that have been added and have not completed yet.
tasks: stream::FuturesUnordered<T>
}
impl<T> ConcurrentTasks<T>
where
    T: Future,
{
    // Safety: `ConcurrentTasks` never moves `tasks` out of itself; the field is
    // only pushed to through `add` and polled in place through this projection.
    unsafe_pinned!(tasks: stream::FuturesUnordered<T>);

    /// Creates a `ConcurrentTasks` with no tasks in it.
    pub fn new() -> Self {
        ConcurrentTasks {
            tasks: stream::FuturesUnordered::new(),
        }
    }

    /// Adds `task` to the set of tasks driven by this `ConcurrentTasks`.
    ///
    /// The task makes progress only when the `ConcurrentTasks` itself is
    /// polled.
    pub fn add(&mut self, task: T) {
        self.tasks.push(task);
    }
}

/// `Default` yields the same empty set as [`ConcurrentTasks::new`], so the
/// type can be used with `..Default::default()` and `#[derive(Default)]` on
/// containing structs.
impl<T> Default for ConcurrentTasks<T>
where
    T: Future,
{
    fn default() -> Self {
        Self::new()
    }
}
impl<T> Future for ConcurrentTasks<T>
where
    T: Future<Output = ()>,
{
    type Output = Never;

    /// Drives the contained tasks and never completes.
    ///
    /// Every task that has finished is removed; once no further task is
    /// immediately done (or no tasks are left at all) `Pending` is returned.
    fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
        // Reap completed tasks one by one until the set reports either
        // `Pending` or that it is empty.
        while let Poll::Ready(Some(())) = self.tasks().poll_next(lw) {}
        Poll::Pending
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async::{self as fasync, temp::TempStreamExt};
    use futures::channel::mpsc;
    use futures::stream::{self, TryStreamExt};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use crate::Never;

    /// An empty stream produces no batch at all.
    #[test]
    fn empty() {
        let mut executor = fasync::Executor::new().expect("Failed to create an executor");
        let grouped = stream::empty::<Result<(), Never>>().group_available();
        let (item, _) = executor
            .run_singlethreaded(grouped.try_into_future())
            .unwrap_or_else(Never::into_any);
        assert!(item.is_none());
    }

    /// A stream that is never ready keeps the adaptor pending.
    #[test]
    fn pending() {
        let mut executor = fasync::Executor::new().expect("Failed to create an executor");
        let mut grouped =
            stream::poll_fn(|_lw| Poll::Pending::<Option<Result<(), Never>>>).group_available();
        let mut next_batch = grouped.try_next();
        let outcome = executor.run_until_stalled(&mut next_batch);
        assert!(outcome.is_pending());
    }

    /// Items queued before a poll come out together; later items form a new batch.
    #[test]
    fn group_available_items() {
        let mut executor = fasync::Executor::new().expect("Failed to create an executor");
        let (tx, rx) = mpsc::unbounded();
        tx.unbounded_send(10i32).unwrap();
        tx.unbounded_send(20i32).unwrap();
        let mut batches = rx.map(Ok).group_available();

        let first = executor
            .run_singlethreaded(batches.try_next())
            .unwrap_or_else(Never::into_any);
        assert_eq!(Some(vec![10i32, 20i32]), first);

        tx.unbounded_send(30i32).unwrap();
        let second = executor
            .run_singlethreaded(batches.try_next())
            .unwrap_or_else(Never::into_any);
        assert_eq!(Some(vec![30i32]), second);
    }

    /// An error following some items is delivered after the batch holding them.
    #[test]
    fn buffer_error() {
        let mut executor = fasync::Executor::new().expect("Failed to create an executor");
        let source = stream::iter(vec![Ok(10i32), Ok(20i32), Err(-30i32)]);
        let mut batches = source.group_available();

        let first = executor
            .run_singlethreaded(batches.try_next())
            .expect("expected a successful value");
        assert_eq!(Some(vec![10i32, 20i32]), first);

        let second = executor.run_singlethreaded(batches.try_next());
        assert_eq!(Err(-30i32), second);
    }

    /// `ConcurrentTasks` stays pending both when empty and after running its tasks.
    #[test]
    fn concurrent_tasks() {
        let mut executor = fasync::Executor::new().expect("Failed to create an executor");
        let mut tasks = ConcurrentTasks::new();
        assert_eq!(Poll::Pending, executor.run_until_stalled(&mut tasks));

        let first_counter = Arc::new(AtomicUsize::new(0));
        tasks.add(simple_future(Arc::clone(&first_counter)));
        let second_counter = Arc::new(AtomicUsize::new(0));
        tasks.add(simple_future(Arc::clone(&second_counter)));

        assert_eq!(Poll::Pending, executor.run_until_stalled(&mut tasks));
        assert_eq!(1, first_counter.load(Ordering::SeqCst));
        assert_eq!(1, second_counter.load(Ordering::SeqCst));
    }

    /// Task body for `concurrent_tasks`: bumps the counter once, then completes.
    async fn simple_future(res: Arc<AtomicUsize>) {
        res.fetch_add(1, Ordering::SeqCst);
    }
}