// blob: 6c3fe638691b5b473855ecf062582430b9c28f92 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::{Stream, StreamExt};
use std::{
cmp::Ordering,
fmt::Debug,
marker::Unpin,
pin::Pin,
task::{Context, Poll},
};
use tracing::trace;
/// A pinned, boxed trait object implementing [`DebugStream`] that yields items of type `I`.
/// The trait object is required to be `Send + 'static`.
pub type PinStream<I> = Pin<Box<dyn DebugStream<Item = I> + Send + 'static>>;
/// A Multiplexer takes multiple possibly-ordered streams and attempts to impose a sensible ordering
/// over the yielded items without risking starvation. New streams can be added to the multiplexer
/// by sending them on a channel.
pub struct Multiplexer<I> {
// TODO(fxbug.dev/68257) explore using a BinaryHeap for sorting substreams
/// Sub-streams currently being merged. The vector is re-sorted on every poll so that the
/// sub-stream holding the smallest cached item sits at index 0.
current: Vec<SubStream<I>>,
/// Receiving end of the channel on which a `MultiplexerHandle` delivers new sub-streams, or
/// the notification that no more will arrive.
incoming: UnboundedReceiver<IncomingStream<PinStream<I>>>,
/// `true` until `incoming` yields `IncomingStream::Done` or terminates. Once `false`, the
/// channel is no longer polled.
incoming_is_live: bool,
}
impl<I> Multiplexer<I> {
    /// Builds a multiplexer with no sub-streams, paired with the handle that feeds it.
    ///
    /// The multiplexer owns the receiving half of a fresh unbounded channel and the returned
    /// [`MultiplexerHandle`] owns the sending half.
    pub fn new() -> (Self, MultiplexerHandle<I>) {
        let (sender, incoming) = futures::channel::mpsc::unbounded();
        let multiplexer = Self { current: Vec::new(), incoming, incoming_is_live: true };
        (multiplexer, MultiplexerHandle { sender })
    }

    /// Moves every sub-stream that is ready on the incoming channel into `current`, so that
    /// ordering decisions take all known sub-streams into account.
    ///
    /// Does nothing once the channel has been marked as no longer live.
    fn integrate_incoming_sub_streams(&mut self, cx: &mut Context<'_>) {
        if !self.incoming_is_live {
            return;
        }

        // Keep pulling while the channel has something ready; a `Pending` result ends the loop
        // and leaves the channel marked live so it is polled again next time.
        while let Poll::Ready(message) = self.incoming.poll_next_unpin(cx) {
            match message {
                // a new sub-stream arrived
                Some(IncomingStream::Next { name, stream }) => {
                    self.current.push(SubStream::new(name, stream));
                }
                // either an explicit `Done` or the channel itself ended: stop listening
                Some(IncomingStream::Done) | None => {
                    self.incoming_is_live = false;
                    return;
                }
            }
        }
    }
}
impl<I: Ord + Unpin> Stream for Multiplexer<I> {
    type Item = I;

    /// Yields the smallest item currently cached across all sub-streams.
    ///
    /// Returns `Poll::Ready(None)` only when there are no live sub-streams left and the incoming
    /// channel is no longer live; returns `Poll::Pending` when no sub-stream has an item cached.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = &mut *self;

        // pick up any newly delivered sub-streams before deciding on an order
        this.integrate_incoming_sub_streams(cx);

        // give every sub-stream with an empty cache a chance to fill it
        for sub_stream in this.current.iter_mut() {
            sub_stream.poll_cache(cx);
        }

        // drop sub-streams that are finished and have nothing cached
        this.current.retain(|sub_stream| sub_stream.is_live());

        // order sub-streams by cached item; those without a cached item go last
        this.current.sort_unstable_by(compare_sub_streams);

        if this.current.is_empty() && !this.incoming_is_live {
            // nothing left to merge and nothing more can arrive
            return Poll::Ready(None);
        }

        match this.current.get_mut(0).and_then(|front| front.cached.take()) {
            // the front sub-stream had an item cached: hand it out
            Some(item) => Poll::Ready(Some(item)),
            // no cached items anywhere, but something may still show up later
            None => Poll::Pending,
        }
    }
}
/// A handle to a running multiplexer. Can be used to add new sub-streams to the multiplexer.
pub struct MultiplexerHandle<I> {
/// Sending end of the channel whose receiving end is held by the `Multiplexer`.
sender: UnboundedSender<IncomingStream<PinStream<I>>>,
}
impl<I> MultiplexerHandle<I> {
    /// Hands a new named sub-stream to the multiplexer.
    ///
    /// Returns `true` when the message was accepted by the channel, `false` when sending failed.
    pub fn send(&self, name: impl Into<String>, stream: PinStream<I>) -> bool {
        let message = IncomingStream::Next { name: name.into(), stream };
        self.sender.unbounded_send(message).is_ok()
    }

    /// Tells the multiplexer that no further sub-streams will be sent.
    ///
    /// A failure to deliver the notification is deliberately ignored.
    pub fn close(&self) {
        let _ = self.sender.unbounded_send(IncomingStream::Done);
    }
}
/// Messages sent from a `MultiplexerHandle` to its `Multiplexer`.
enum IncomingStream<S> {
/// A new sub-stream to merge, together with the name it is tracked under.
Next { name: String, stream: S },
/// No further sub-streams will be sent.
Done,
}
/// A `SubStream` wraps an inner stream and keeps its latest value cached inline for comparison
/// with the cached values of other `SubStream`s, allowing for semi-ordered merging of streams.
#[derive(Debug)]
struct SubStream<I> {
/// Name supplied when the sub-stream was added; included in the trace event emitted on drop.
name: String,
/// The most recent item pulled from `inner` that has not yet been handed out, if any.
cached: Option<I>,
/// `true` until `inner` has reported that it is finished.
inner_is_live: bool,
/// The wrapped stream that items are pulled from.
inner: PinStream<I>,
}
impl<I> SubStream<I> {
fn new(name: String, inner: PinStream<I>) -> Self {
Self { name, cached: None, inner_is_live: true, inner }
}
}
impl<I> SubStream<I> {
    /// Tries to fill the cache from the inner stream.
    ///
    /// The inner stream is only polled when the cache is empty and the inner stream has not yet
    /// finished. A finished inner stream is recorded so that it is never polled again.
    fn poll_cache(&mut self, cx: &mut Context<'_>) {
        if self.cached.is_some() || !self.inner_is_live {
            return;
        }

        // A `Pending` result leaves both the cache and the liveness flag untouched.
        if let Poll::Ready(polled) = self.inner.as_mut().poll_next(cx) {
            match polled {
                Some(item) => self.cached = Some(item),
                None => self.inner_is_live = false,
            }
        }
    }

    /// Reports whether this sub-stream can still produce an item: either one is already cached
    /// or the inner stream has not finished.
    fn is_live(&self) -> bool {
        self.cached.is_some() || self.inner_is_live
    }
}
impl<I> Drop for SubStream<I> {
/// Emits a trace-level "substream terminated" event carrying the sub-stream's name whenever
/// a `SubStream` is dropped.
fn drop(&mut self) {
trace!(%self.name, "substream terminated");
}
}
/// Orders two `SubStream`s by their cached items.
///
/// A sub-stream holding a cached item sorts before one whose cache is empty; two sub-streams
/// that both hold items are ordered by `I`'s `Ord` impl, and two empty ones compare equal.
fn compare_sub_streams<I: Ord>(a: &SubStream<I>, b: &SubStream<I>) -> Ordering {
    match &a.cached {
        Some(left) => match &b.cached {
            Some(right) => left.cmp(right),
            None => Ordering::Less,
        },
        None => match &b.cached {
            Some(_) => Ordering::Greater,
            None => Ordering::Equal,
        },
    }
}
/// A stream-like trait that additionally requires `Debug`. Its `poll_next` has the same
/// signature as `Stream::poll_next`, and it is the trait behind the `PinStream` trait object.
pub trait DebugStream: Debug {
/// The type of item yielded.
type Item;
/// Attempts to pull the next item, with the same contract shape as `Stream::poll_next`.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}
impl<I, T> DebugStream for T
where
T: Debug + Stream<Item = I>,
{
type Item = I;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Stream::poll_next(self, cx)
}
}
// Unit tests for `Multiplexer`, driven by in-memory iterator streams and unbounded channels.
#[cfg(test)]
mod tests {
use super::*;
use futures::{prelude::*, stream::iter as iter2stream};
// A multiplexer that is closed without ever receiving a sub-stream yields nothing.
#[fuchsia_async::run_singlethreaded(test)]
async fn empty_multiplexer_terminates() {
let (mux, handle) = Multiplexer::<i32>::new();
handle.close();
let observed: Vec<i32> = mux.collect().await;
let expected: Vec<i32> = vec![];
assert_eq!(observed, expected);
}
// Sub-streams that never yield an item do not keep a closed multiplexer from terminating.
#[fuchsia_async::run_singlethreaded(test)]
async fn empty_input_streams_terminate() {
let (mux, handle) = Multiplexer::<i32>::new();
handle.send("empty1", Box::pin(iter2stream(vec![])) as PinStream<i32>);
handle.send("empty2", Box::pin(iter2stream(vec![])) as PinStream<i32>);
handle.send("empty3", Box::pin(iter2stream(vec![])) as PinStream<i32>);
handle.close();
let observed: Vec<i32> = mux.collect::<Vec<i32>>().await;
let expected: Vec<i32> = vec![];
assert_eq!(observed, expected);
}
// Three individually sorted sub-streams are merged into one fully sorted output.
#[fuchsia_async::run_singlethreaded(test)]
async fn outputs_are_ordered() {
let (mux, handle) = Multiplexer::<i32>::new();
handle.send("first", Box::pin(iter2stream(vec![1, 3, 5, 7])) as PinStream<i32>);
handle.send("second", Box::pin(iter2stream(vec![2, 4, 6, 8])) as PinStream<i32>);
handle.send("third", Box::pin(iter2stream(vec![9, 10, 11])) as PinStream<i32>);
handle.close();
let observed: Vec<i32> = mux.collect::<Vec<i32>>().await;
let expected: Vec<i32> = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11];
assert_eq!(observed, expected);
}
// A sub-stream that is itself out of order produces an output that is only partially sorted.
#[fuchsia_async::run_singlethreaded(test)]
async fn semi_sorted_substream_semi_sorted() {
let (mux, handle) = Multiplexer::<i32>::new();
handle.send("unordered", Box::pin(iter2stream(vec![1, 7, 3, 5])) as PinStream<i32>);
handle.send("ordered", Box::pin(iter2stream(vec![2, 4, 6, 8])) as PinStream<i32>);
handle.close();
let observed: Vec<i32> = mux.collect::<Vec<i32>>().await;
// we get the stream in a weird order because `unordered`'s 3 & 5 are held up behind 7
let expected: Vec<i32> = vec![1, 2, 4, 6, 7, 3, 5, 8];
assert_eq!(observed, expected);
}
// With a single channel-backed sub-stream, the multiplexer is pending while the channel is
// empty and yields items in the order they were sent once they are available.
#[fuchsia_async::run_singlethreaded(test)]
async fn single_stream() {
let (mut send, recv) = futures::channel::mpsc::unbounded();
let (mut mux, handle) = Multiplexer::<i32>::new();
handle.send("recv", Box::pin(recv) as PinStream<i32>);
assert!(mux.next().now_or_never().is_none());
send.unbounded_send(1).unwrap();
assert_eq!(mux.next().await.unwrap(), 1);
assert!(mux.next().now_or_never().is_none());
send.unbounded_send(2).unwrap();
send.unbounded_send(3).unwrap();
send.unbounded_send(4).unwrap();
send.unbounded_send(5).unwrap();
send.unbounded_send(6).unwrap();
send.disconnect();
handle.close();
let observed: Vec<i32> = mux.collect().await;
assert_eq!(observed, vec![2, 3, 4, 5, 6]);
}
// Two channel-backed sub-streams are merged by value, and the multiplexer stays open after
// both have disconnected until the handle is closed.
#[fuchsia_async::run_singlethreaded(test)]
async fn two_streams_merged() {
let (mut send1, recv1) = futures::channel::mpsc::unbounded();
let (mut send2, recv2) = futures::channel::mpsc::unbounded();
let (mut mux, handle) = Multiplexer::<i32>::new();
handle.send("recv1", Box::pin(recv1) as PinStream<i32>);
handle.send("recv2", Box::pin(recv2) as PinStream<i32>);
assert!(mux.next().now_or_never().is_none());
send1.unbounded_send(2).unwrap();
send2.unbounded_send(1).unwrap();
assert_eq!(mux.next().await.unwrap(), 1);
assert_eq!(mux.next().await.unwrap(), 2);
assert!(mux.next().now_or_never().is_none());
send1.unbounded_send(2).unwrap();
send2.unbounded_send(3).unwrap();
send1.disconnect();
assert_eq!(mux.next().await.unwrap(), 2);
assert_eq!(mux.next().await.unwrap(), 3);
assert!(mux.next().now_or_never().is_none());
send2.unbounded_send(4).unwrap();
send2.unbounded_send(5).unwrap();
send2.disconnect();
assert_eq!(mux.next().await.unwrap(), 4);
assert_eq!(mux.next().await.unwrap(), 5);
assert!(
mux.next().now_or_never().is_none(),
"multiplexer stays open even with current streams terminated"
);
handle.close();
assert!(mux.next().await.is_none());
}
// A sub-stream added after polling has begun is merged with the existing ones, and a closed
// multiplexer only terminates once every sub-stream has disconnected.
#[fuchsia_async::run_singlethreaded(test)]
async fn new_sub_streams_are_merged() {
let (mut send1, recv1) = futures::channel::mpsc::unbounded();
let (mut send2, recv2) = futures::channel::mpsc::unbounded();
let (mut send3, recv3) = futures::channel::mpsc::unbounded();
let (mut mux, handle) = Multiplexer::<i32>::new();
handle.send("recv1", Box::pin(recv1) as PinStream<i32>);
handle.send("recv2", Box::pin(recv2) as PinStream<i32>);
send3.unbounded_send(0).unwrap(); // this shouldn't show up until we add it to the mux below
assert!(mux.next().now_or_never().is_none());
send1.unbounded_send(2).unwrap();
send2.unbounded_send(1).unwrap();
assert_eq!(mux.next().await.unwrap(), 1);
assert_eq!(mux.next().await.unwrap(), 2);
assert!(mux.next().now_or_never().is_none());
send1.unbounded_send(3).unwrap();
handle.send("recv3", Box::pin(recv3) as PinStream<i32>);
assert_eq!(mux.next().await.unwrap(), 0);
assert_eq!(mux.next().await.unwrap(), 3);
assert!(mux.next().now_or_never().is_none());
handle.close();
assert!(mux.next().now_or_never().is_none(), "open substreams hold the multiplexer open");
send1.disconnect();
send2.disconnect();
send3.disconnect();
assert!(mux.next().await.is_none(), "all substreams terminated, now we can close");
}
// A sub-stream whose sender disconnected before it was added still delivers its buffered
// item, while the other sub-stream keeps the multiplexer open until it disconnects too.
#[fuchsia_async::run_singlethreaded(test)]
async fn snapshot_with_stopped_substream() {
let (mut send1, recv1) = futures::channel::mpsc::unbounded();
let (mut send2, recv2) = futures::channel::mpsc::unbounded();
let (mut mux, handle) = Multiplexer::<i32>::new();
send1.unbounded_send(1).unwrap();
send1.disconnect();
handle.send("recv1", Box::pin(recv1));
send2.unbounded_send(2).unwrap();
handle.send("recv2", Box::pin(recv2));
handle.close();
assert_eq!(mux.next().await.unwrap(), 1);
assert_eq!(mux.next().await.unwrap(), 2);
assert!(mux.next().now_or_never().is_none());
send2.unbounded_send(3).unwrap();
assert_eq!(mux.next().await.unwrap(), 3);
assert!(mux.next().now_or_never().is_none());
send2.disconnect();
assert!(mux.next().await.is_none(), "all substreams terminated, now we can close");
}
}