blob: 1a41f043965ae2a0f215e2fd2175c915b1c5797e [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Streams always signal exhaustion with `None` return values. A stream epitaph can be used when
//! a specific value is desired as the last item returned by a stream before it is exhausted.
//!
//! Example usecase: often streams will be used without having direct access to the stream itself
//! such as from a `streammap::StreamMap` or a `futures::stream::FuturesUnordered`. Occasionally,
//! it is necessary to perform some cleanup procedure outside of a stream when it is exhausted. An
//! `epitaph` can be used to uniquely identify which stream has ended within a collection of
//! streams.
use {
futures::stream::{FusedStream, Stream},
std::pin::Pin,
std::task::{Context, Poll},
};
/// Values returned from a stream with an epitaph are of type `StreamItem`.
#[derive(Debug, PartialEq)]
pub enum StreamItem<T, E> {
/// Item polled from the underlying `Stream`
Item(T),
/// Epitaph value returned after the underlying `Stream` is exhausted.
Epitaph(E),
}
/// A `Stream` that returns the values of the wrapped stream until the wrapped stream is exhausted.
/// Then it returns a single epitaph value before being exhausted
pub struct StreamWithEpitaph<S, E> {
inner: S,
epitaph: Option<E>,
terminated: bool,
}
// The `Unpin` bounds are not strictly necessary, but make for a more convenient
// implementation. The bounds can be relaxed if !Unpin support is desired.
impl<S, T, E> Stream for StreamWithEpitaph<S, E>
where
S: Stream<Item = T> + Unpin,
E: Unpin,
T: Unpin,
{
type Item = StreamItem<T, E>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.terminated {
return Poll::Ready(None);
}
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Ready(None) => {
let mut this = self.get_mut();
this.terminated = true;
let ep = this.epitaph.take().map(StreamItem::Epitaph);
assert!(ep.is_some(), "epitaph must be present if stream is not terminated");
Poll::Ready(ep)
}
Poll::Ready(item) => Poll::Ready(item.map(StreamItem::Item)),
Poll::Pending => Poll::Pending,
}
}
}
impl<S, T, E> FusedStream for StreamWithEpitaph<S, E>
where
S: Stream<Item = T> + FusedStream + Unpin,
E: Unpin,
T: Unpin,
{
fn is_terminated(&self) -> bool {
self.terminated
}
}
/// Extension trait to allow for easy creation of a `StreamWithEpitaph` from a `Stream`.
pub trait WithEpitaph: Sized {
/// Map this stream to one producing a `StreamItem::Item` value for each item of the stream
/// followed by a single `StreamItem::Epitaph` value with the provided `epitaph`.
fn with_epitaph<E>(self, epitaph: E) -> StreamWithEpitaph<Self, E>;
}
impl<T> WithEpitaph for T
where
T: Stream,
{
fn with_epitaph<E>(self, epitaph: E) -> StreamWithEpitaph<T, E> {
StreamWithEpitaph { inner: self, epitaph: Some(epitaph), terminated: false }
}
}
#[cfg(test)]
mod tests {
use {
super::*,
fuchsia_async as fasync,
futures::{
future::ready,
stream::{empty, iter, once, Empty, StreamExt},
},
};
#[fasync::run_until_stalled(test)]
async fn empty_stream_returns_epitaph_only() {
let s: Empty<i32> = empty();
let s = s.with_epitaph(0i64);
let actual: Vec<_> = s.collect().await;
let expected = vec![StreamItem::Epitaph(0i64)];
assert_eq!(actual, expected);
}
#[fasync::run_until_stalled(test)]
async fn populated_stream_returns_items_and_epitaph() {
let s = iter(0i32..3).fuse().with_epitaph(3i64);
let actual: Vec<_> = StreamExt::collect::<Vec<_>>(s).await;
let expected = vec![
StreamItem::Item(0),
StreamItem::Item(1),
StreamItem::Item(2),
StreamItem::Epitaph(3i64),
];
assert_eq!(actual, expected);
}
#[fasync::run_until_stalled(test)]
async fn stream_is_terminated_after_end() {
let mut s = once(ready(0i32)).with_epitaph(3i64);
s.next().await;
s.next().await;
assert!(s.is_terminated());
}
}