// blob: bf562a895a2fb20ce452dc9b2166f3c23fbceb3e [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::identity::ComponentIdentity;
use fidl_fuchsia_diagnostics::Selector;
use fuchsia_trace as ftrace;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
use selectors::SelectorExt;
use std::{
cmp::Ordering,
fmt::Debug,
pin::Pin,
sync::{atomic::AtomicUsize, Arc},
task::{Context, Poll},
};
use tracing::trace;
/// A pinned, heap-allocated, `Send` stream yielding items of type `I`. The boxed stream must also
/// implement `Debug` (see `DebugStream`).
pub type PinStream<I> = Pin<Box<dyn DebugStream<Item = I> + Send + 'static>>;
lazy_static! {
    // Monotonic counter from which every new `Multiplexer` draws its id.
    static ref MULTIPLEXER_ID: AtomicUsize = AtomicUsize::new(0);
}
/// A Multiplexer takes multiple possibly-ordered streams and attempts to impose a sensible ordering
/// over the yielded items without risking starvation. New streams can be added to the multiplexer
/// by sending them on a channel.
pub struct Multiplexer<I> {
// TODO(https://fxbug.dev/42147260) explore using a BinaryHeap for sorting substreams
/// Sub-streams currently being merged. Re-sorted by cached item on every poll.
current: Vec<SubStream<I>>,
/// Channel on which new sub-streams (or the `Done` marker) arrive from the `MultiplexerHandle`.
incoming: UnboundedReceiver<IncomingStream<PinStream<I>>>,
/// Becomes `false` once `incoming` yields `Done` or closes; `incoming` is not polled after that.
incoming_is_live: bool,
/// When `Some`, only sub-streams whose identity matches one of the component selectors are kept.
selectors: Option<Vec<Selector>>,
/// Identifier drawn from the global `MULTIPLEXER_ID` counter at construction.
id: usize,
/// The multiplexer id will be sent through this channel when the Multiplexer is dropped. This
/// is used to clean up MultiplexerHandles in the MultiplexerBroker.
on_drop_id_sender: Option<UnboundedSender<usize>>,
}
impl<I> Multiplexer<I> {
    /// Creates a multiplexer seeded with `substreams` and returns it together with the handle
    /// used to feed it additional sub-streams later.
    ///
    /// * `parent_trace_id` - stored on the returned handle as its `trace_id`.
    /// * `selectors` - when `Some`, sub-streams whose identity matches none of the component
    ///   selectors are dropped, both here and when they arrive through the handle.
    /// * `substreams` - the initial `(identity, stream)` pairs to merge.
    pub fn new(
        parent_trace_id: ftrace::Id,
        selectors: Option<Vec<Selector>>,
        substreams: impl Iterator<Item = (Arc<ComponentIdentity>, PinStream<I>)>,
    ) -> (Self, MultiplexerHandle<I>) {
        let (sender, incoming) = futures::channel::mpsc::unbounded();
        let id = MULTIPLEXER_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let current = substreams
            .filter(|(identity, _)| Self::is_identity_allowed(&selectors, identity))
            .map(|(identity, stream)| SubStream::new(identity, stream))
            .collect();
        (
            Self {
                current,
                incoming,
                incoming_is_live: true,
                selectors,
                id,
                on_drop_id_sender: None,
            },
            MultiplexerHandle { sender, id, trace_id: parent_trace_id },
        )
    }

    /// Returns the id assigned to this multiplexer at construction.
    pub fn id(&self) -> usize {
        self.id
    }

    /// Registers the channel on which this multiplexer's id is sent when it is dropped.
    pub fn set_on_drop_id_sender(&mut self, snd: UnboundedSender<usize>) {
        self.on_drop_id_sender = Some(snd);
    }

    /// Drain the incoming channel to be sure we have all live sub-streams available when
    /// considering ordering.
    fn integrate_incoming_sub_streams(&mut self, cx: &mut Context<'_>) {
        // Once the channel has reported completion it must not be polled again, so the flag
        // doubles as the loop condition.
        while self.incoming_is_live {
            match self.incoming.poll_next_unpin(cx) {
                // incoming has more for us right now
                Poll::Ready(Some(IncomingStream::Next { identity, stream })) => {
                    if self.selectors_allow(&identity) {
                        // We own `identity`, so move it instead of cloning the `Arc`.
                        self.current.push(SubStream::new(identity, stream));
                    }
                }
                // incoming has no more for us
                Poll::Ready(Some(IncomingStream::Done)) | Poll::Ready(None) => {
                    self.incoming_is_live = false;
                }
                // incoming has more for us, but not now
                Poll::Pending => break,
            }
        }
    }

    /// Returns whether this multiplexer's selectors accept a sub-stream from `identity`.
    fn selectors_allow(&self, identity: &ComponentIdentity) -> bool {
        Self::is_identity_allowed(&self.selectors, identity)
    }

    /// Returns whether `identity` passes the filter described by `selectors`.
    ///
    /// With no selectors (`None`) every identity is allowed. Otherwise the identity's moniker
    /// must match at least one of the component selectors; selectors that carry no component
    /// selector are ignored, and a matching error is treated as "not allowed".
    fn is_identity_allowed(
        selectors: &Option<Vec<Selector>>,
        identity: &ComponentIdentity,
    ) -> bool {
        let selectors = match selectors {
            None => return true,
            Some(selectors) => selectors,
        };
        let component_selectors =
            selectors.iter().filter_map(|s| s.component_selector.as_ref()).collect::<Vec<_>>();
        identity
            .moniker
            .match_against_component_selectors(&component_selectors)
            .map_or(false, |matched_selectors| !matched_selectors.is_empty())
    }
}
impl<I> Drop for Multiplexer<I> {
    /// Reports this multiplexer's id on the on-drop channel, if one was registered. A failed
    /// send is deliberately ignored.
    fn drop(&mut self) {
        if let Some(notifier) = self.on_drop_id_sender.as_ref() {
            let _ = notifier.unbounded_send(self.id);
        }
    }
}
impl<I: Ord + Unpin> Stream for Multiplexer<I> {
    type Item = I;

    /// Yields the smallest cached item across all live sub-streams, `Pending` when nothing is
    /// cached yet, and `None` once no sub-streams remain and no more can arrive.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Pull in every sub-stream that is already waiting on the channel so that all of them
        // take part in the ordering decision below.
        self.integrate_incoming_sub_streams(cx);

        // Refresh each sub-stream's cached head item where one is available.
        for sub_stream in self.current.iter_mut() {
            sub_stream.poll_cache(cx);
        }

        // Drop sub-streams that have terminated and hold no cached item.
        self.current.retain(SubStream::is_live);

        // Order by cached head item; sub-streams without one sort to the back.
        self.current.sort_unstable_by(compare_sub_streams);

        if self.current.is_empty() && !self.incoming_is_live {
            // Nothing left to merge and nothing more can arrive.
            return Poll::Ready(None);
        }

        match self.current.first_mut().and_then(|front| front.cached.take()) {
            // The front sub-stream holds the smallest cached item.
            Some(item) => Poll::Ready(Some(item)),
            // No cached values, but sub-streams (or the incoming channel) are still pending.
            None => Poll::Pending,
        }
    }
}
/// A handle to a running multiplexer. Can be used to add new sub-streams to the multiplexer.
pub struct MultiplexerHandle<I> {
/// Sending half of the channel the multiplexer drains for new sub-streams.
sender: UnboundedSender<IncomingStream<PinStream<I>>>,
/// Id of the multiplexer this handle feeds.
id: usize,
/// Trace id supplied as `parent_trace_id` when the multiplexer was created.
pub trace_id: ftrace::Id,
}
impl<I> MultiplexerHandle<I> {
    /// Hands a new sub-stream to the multiplexer. The result is `true` when the message could
    /// be queued, i.e. the multiplexer is still listening.
    pub fn send(&self, identity: Arc<ComponentIdentity>, stream: PinStream<I>) -> bool {
        let message = IncomingStream::Next { identity, stream };
        self.sender.unbounded_send(message).is_ok()
    }

    /// Id of the multiplexer this handle is attached to.
    pub fn multiplexer_id(&self) -> usize {
        self.id
    }

    /// Trace id that was supplied when the multiplexer was created.
    pub fn parent_trace_id(&self) -> ftrace::Id {
        self.trace_id
    }

    /// Tells the multiplexer that no further sub-streams will be sent. A failed send is ignored.
    pub fn close(&self) {
        let _ = self.sender.unbounded_send(IncomingStream::Done);
    }
}
/// Message sent from a `MultiplexerHandle` to its `Multiplexer`.
enum IncomingStream<S> {
/// A new sub-stream `stream` belonging to the component `identity`.
Next { identity: Arc<ComponentIdentity>, stream: S },
/// No further sub-streams will be sent; the multiplexer stops polling the channel.
Done,
}
/// A `SubStream` wraps an inner stream and keeps its latest value cached inline for comparison
/// with the cached values of other `SubStream`s, allowing for semi-ordered merging of streams.
#[derive(Debug)]
pub struct SubStream<I> {
/// Identity of the component the stream belongs to; logged when the sub-stream is dropped.
identity: Arc<ComponentIdentity>,
/// The most recent item pulled from `inner` that has not been yielded yet.
cached: Option<I>,
/// `false` once `inner` has returned `None`; `inner` is not polled after that.
inner_is_live: bool,
/// The wrapped stream.
inner: PinStream<I>,
}
impl<I> SubStream<I> {
    /// Wraps `inner` for the component `identity`. The new sub-stream starts live with an
    /// empty cache.
    pub fn new(identity: Arc<ComponentIdentity>, inner: PinStream<I>) -> Self {
        Self {
            identity,
            inner,
            inner_is_live: true,
            cached: None,
        }
    }
}
impl<I> SubStream<I> {
    /// Fills the inline cache from the inner stream when the cache is empty and the inner
    /// stream has not terminated; otherwise does nothing.
    fn poll_cache(&mut self, cx: &mut Context<'_>) {
        // Nothing to do while an item is still waiting to be consumed, and a finished stream
        // must not be polled again.
        if self.cached.is_some() || !self.inner_is_live {
            return;
        }
        if let Poll::Ready(outcome) = self.inner.as_mut().poll_next(cx) {
            match outcome {
                Some(item) => self.cached = Some(item),
                None => self.inner_is_live = false,
            }
        }
    }

    /// A sub-stream stays live until its inner stream has ended *and* its cache is drained.
    fn is_live(&self) -> bool {
        !(self.cached.is_none() && !self.inner_is_live)
    }
}
impl<I> Drop for SubStream<I> {
    /// Emits a trace-level event naming the component whose sub-stream is going away.
    fn drop(&mut self) {
        let identity = &self.identity;
        trace!(identity = %identity, "substream terminated");
    }
}
/// Orders two `SubStream`s by their cached items: a sub-stream holding a cached value sorts
/// before one without, two cached values are compared with `I`'s `Ord` impl, and two empty
/// caches compare equal.
fn compare_sub_streams<I: Ord>(a: &SubStream<I>, b: &SubStream<I>) -> Ordering {
    match (a.cached.as_ref(), b.cached.as_ref()) {
        (Some(lhs), Some(rhs)) => lhs.cmp(rhs),
        (Some(_), None) => Ordering::Less,
        (None, Some(_)) => Ordering::Greater,
        (None, None) => Ordering::Equal,
    }
}
/// A stream-like trait with `Debug` as a supertrait, so that boxed trait objects (see
/// `PinStream`) can be debug-formatted. Implemented for every `Debug` type that is a `Stream`.
pub trait DebugStream: Debug {
/// The type of value yielded by the stream.
type Item;
/// Polls for the next item; mirrors the signature of `Stream::poll_next`.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}
impl<I, T> DebugStream for T
where
T: Debug + Stream<Item = I>,
{
type Item = I;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Stream::poll_next(self, cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{prelude::*, stream::iter as iter2stream};
use selectors::FastError;
// A multiplexer with no sub-streams ends as soon as the handle is closed.
#[fuchsia::test]
async fn empty_multiplexer_terminates() {
let (mux, handle) = Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
handle.close();
let observed: Vec<i32> = mux.collect().await;
let expected: Vec<i32> = vec![];
assert_eq!(observed, expected);
}
// Sub-streams that yield nothing do not keep the multiplexer open.
#[fuchsia::test]
async fn empty_input_streams_terminate() {
let (mux, handle) = Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
handle
.send(Arc::new(vec!["empty1"].into()), Box::pin(iter2stream(vec![])) as PinStream<i32>);
handle
.send(Arc::new(vec!["empty2"].into()), Box::pin(iter2stream(vec![])) as PinStream<i32>);
handle
.send(Arc::new(vec!["empty3"].into()), Box::pin(iter2stream(vec![])) as PinStream<i32>);
handle.close();
let observed: Vec<i32> = mux.collect::<Vec<i32>>().await;
let expected: Vec<i32> = vec![];
assert_eq!(observed, expected);
}
// Individually sorted sub-streams are merged into one fully sorted output.
#[fuchsia::test]
async fn outputs_are_ordered() {
let (mux, handle) = Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
handle.send(
Arc::new(vec!["first"].into()),
Box::pin(iter2stream(vec![1, 3, 5, 7])) as PinStream<i32>,
);
handle.send(
Arc::new(vec!["second"].into()),
Box::pin(iter2stream(vec![2, 4, 6, 8])) as PinStream<i32>,
);
handle.send(
Arc::new(vec!["third"].into()),
Box::pin(iter2stream(vec![9, 10, 11])) as PinStream<i32>,
);
handle.close();
let observed: Vec<i32> = mux.collect::<Vec<i32>>().await;
let expected: Vec<i32> = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11];
assert_eq!(observed, expected);
}
// Only the head of each sub-stream is compared, so an unsorted sub-stream yields a
// partially sorted output.
#[fuchsia::test]
async fn semi_sorted_substream_semi_sorted() {
let (mux, handle) = Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
handle.send(
Arc::new(vec!["unordered"].into()),
Box::pin(iter2stream(vec![1, 7, 3, 5])) as PinStream<i32>,
);
handle.send(
Arc::new(vec!["ordered"].into()),
Box::pin(iter2stream(vec![2, 4, 6, 8])) as PinStream<i32>,
);
handle.close();
let observed: Vec<i32> = mux.collect::<Vec<i32>>().await;
// we get the stream in a weird order because `unordered`'s 3 & 5 are held up behind 7
let expected: Vec<i32> = vec![1, 2, 4, 6, 7, 3, 5, 8];
assert_eq!(observed, expected);
}
// A single channel-backed sub-stream is forwarded item by item, and the multiplexer is
// pending whenever the channel is empty but still open.
#[fuchsia::test]
async fn single_stream() {
let (mut send, recv) = futures::channel::mpsc::unbounded();
let (mut mux, handle) =
Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
handle.send(Arc::new(vec!["recv"].into()), Box::pin(recv) as PinStream<i32>);
assert!(mux.next().now_or_never().is_none());
send.unbounded_send(1).unwrap();
assert_eq!(mux.next().await.unwrap(), 1);
assert!(mux.next().now_or_never().is_none());
send.unbounded_send(2).unwrap();
send.unbounded_send(3).unwrap();
send.unbounded_send(4).unwrap();
send.unbounded_send(5).unwrap();
send.unbounded_send(6).unwrap();
send.disconnect();
handle.close();
let observed: Vec<i32> = mux.collect().await;
assert_eq!(observed, vec![2, 3, 4, 5, 6]);
}
// Two channel-backed sub-streams are merged in order, and the multiplexer stays open after
// both have terminated until the handle is closed.
#[fuchsia::test]
async fn two_streams_merged() {
let (mut send1, recv1) = futures::channel::mpsc::unbounded();
let (mut send2, recv2) = futures::channel::mpsc::unbounded();
let (mut mux, handle) =
Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
handle.send(Arc::new(vec!["recv1"].into()), Box::pin(recv1) as PinStream<i32>);
handle.send(Arc::new(vec!["recv2"].into()), Box::pin(recv2) as PinStream<i32>);
assert!(mux.next().now_or_never().is_none());
send1.unbounded_send(2).unwrap();
send2.unbounded_send(1).unwrap();
assert_eq!(mux.next().await.unwrap(), 1);
assert_eq!(mux.next().await.unwrap(), 2);
assert!(mux.next().now_or_never().is_none());
send1.unbounded_send(2).unwrap();
send2.unbounded_send(3).unwrap();
send1.disconnect();
assert_eq!(mux.next().await.unwrap(), 2);
assert_eq!(mux.next().await.unwrap(), 3);
assert!(mux.next().now_or_never().is_none());
send2.unbounded_send(4).unwrap();
send2.unbounded_send(5).unwrap();
send2.disconnect();
assert_eq!(mux.next().await.unwrap(), 4);
assert_eq!(mux.next().await.unwrap(), 5);
assert!(
mux.next().now_or_never().is_none(),
"multiplexer stays open even with current streams terminated"
);
handle.close();
assert!(mux.next().await.is_none());
}
// A sub-stream added while the multiplexer is running takes part in the merge, and open
// sub-streams keep the multiplexer alive after the handle is closed.
#[fuchsia::test]
async fn new_sub_streams_are_merged() {
let (mut send1, recv1) = futures::channel::mpsc::unbounded();
let (mut send2, recv2) = futures::channel::mpsc::unbounded();
let (mut send3, recv3) = futures::channel::mpsc::unbounded();
let (mut mux, handle) =
Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
handle.send(Arc::new(vec!["recv1"].into()), Box::pin(recv1) as PinStream<i32>);
handle.send(Arc::new(vec!["recv2"].into()), Box::pin(recv2) as PinStream<i32>);
send3.unbounded_send(0).unwrap(); // this shouldn't show up until we add it to the mux below
assert!(mux.next().now_or_never().is_none());
send1.unbounded_send(2).unwrap();
send2.unbounded_send(1).unwrap();
assert_eq!(mux.next().await.unwrap(), 1);
assert_eq!(mux.next().await.unwrap(), 2);
assert!(mux.next().now_or_never().is_none());
send1.unbounded_send(3).unwrap();
handle.send(Arc::new(vec!["recv3"].into()), Box::pin(recv3) as PinStream<i32>);
assert_eq!(mux.next().await.unwrap(), 0);
assert_eq!(mux.next().await.unwrap(), 3);
assert!(mux.next().now_or_never().is_none());
handle.close();
assert!(mux.next().now_or_never().is_none(), "open substreams hold the multiplexer open");
send1.disconnect();
send2.disconnect();
send3.disconnect();
assert!(mux.next().await.is_none(), "all substreams terminated, now we can close");
}
// A sub-stream that is already finished when it is added still contributes its buffered
// item, while the other sub-stream keeps the multiplexer open.
#[fuchsia::test]
async fn snapshot_with_stopped_substream() {
let (mut send1, recv1) = futures::channel::mpsc::unbounded();
let (mut send2, recv2) = futures::channel::mpsc::unbounded();
let (mut mux, handle) =
Multiplexer::<i32>::new(ftrace::Id::random(), None, std::iter::empty());
send1.unbounded_send(1).unwrap();
send1.disconnect();
handle.send(Arc::new(vec!["recv1"].into()), Box::pin(recv1));
send2.unbounded_send(2).unwrap();
handle.send(Arc::new(vec!["recv2"].into()), Box::pin(recv2));
handle.close();
assert_eq!(mux.next().await.unwrap(), 1);
assert_eq!(mux.next().await.unwrap(), 2);
assert!(mux.next().now_or_never().is_none());
send2.unbounded_send(3).unwrap();
assert_eq!(mux.next().await.unwrap(), 3);
assert!(mux.next().now_or_never().is_none());
send2.disconnect();
assert!(mux.next().await.is_none(), "all substreams terminated, now we can close");
}
// Sub-streams whose identity does not match the multiplexer's selectors are dropped on
// arrival instead of being merged.
#[fuchsia_async::run_singlethreaded(test)]
async fn multiplexer_selectors() {
let (mut send1, recv1) = futures::channel::mpsc::unbounded();
let (send2, recv2) = futures::channel::mpsc::unbounded();
let (mut mux, handle) = Multiplexer::<i32>::new(
ftrace::Id::random(),
Some(vec![selectors::parse_selector::<FastError>("recv1:root").unwrap()]),
std::iter::empty(),
);
handle.send(Arc::new(vec!["recv1"].into()), Box::pin(recv1) as PinStream<i32>);
handle.send(Arc::new(vec!["recv2"].into()), Box::pin(recv2) as PinStream<i32>);
// Verify we never see recv2 messages and we didn't even connect it.
assert!(mux.next().now_or_never().is_none());
send1.unbounded_send(1).unwrap();
assert!(send2.unbounded_send(2).unwrap_err().is_disconnected());
assert_eq!(mux.next().await.unwrap(), 1);
assert!(mux.next().now_or_never().is_none());
send1.disconnect();
handle.close();
assert!(mux.next().await.is_none());
}
}