blob: cb71ae43e3171dda97272ca7e26ea8d94dc343dc [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::{events::types::*, identity::ComponentIdentity};
use async_trait::async_trait;
use fuchsia_inspect::{self as inspect, NumericProperty, StringReference};
use fuchsia_inspect_contrib::{inspect_log, nodes::BoundedListNode};
use futures::{
channel::{mpsc, oneshot},
task::{Context, Poll},
Future, SinkExt, Stream, StreamExt,
};
use lazy_static::lazy_static;
use pin_project::pin_project;
use std::{
collections::{BTreeMap, BTreeSet},
iter::Extend,
pin::Pin,
sync::{Arc, Weak},
};
use thiserror::Error;
use tracing::{debug, error};
// Capacity of each producer channel (internal and external). Senders are
// back-pressured once this many events are buffered.
const MAX_EVENT_BUS_CAPACITY: usize = 1024;
// Maximum number of entries kept in the inspect `recent_events` list.
const RECENT_EVENT_LIMIT: usize = 200;
lazy_static! {
    // Shared inspect string references for the keys of each `recent_events`
    // entry, so the key strings are stored once in the inspect VMO.
    static ref EVENT: StringReference<'static> = "event".into();
    static ref MONIKER: StringReference<'static> = "moniker".into();
}
/// Core archivist internal event router that supports multiple event producers and multiple event
/// consumers.
pub struct EventRouter {
    // All the consumers that have been registered for an event, held weakly so that dropping the
    // consumer's `Arc` elsewhere unsubscribes it.
    consumers: BTreeMap<AnyEventType, Vec<Weak<dyn EventConsumer + Send + Sync>>>,
    // The types of all events that can be produced. Used only for validation.
    producers_registered: BTreeSet<AnyEventType>,
    // Ends of the channel used by internal event producers.
    internal_sender: mpsc::Sender<Event>,
    internal_receiver: mpsc::Receiver<Event>,
    // Ends of the channel used by all external event producers.
    external_sender: mpsc::Sender<Event>,
    external_receiver: mpsc::Receiver<Event>,
    // Records event counts and recent events to inspect.
    inspect_logger: EventStreamLogger,
}
impl EventRouter {
    /// Creates a new empty event router.
    ///
    /// Inspect data about routed events is written as children of `node`.
    pub fn new(node: inspect::Node) -> Self {
        let (internal_sender, internal_receiver) = mpsc::channel(MAX_EVENT_BUS_CAPACITY);
        let (external_sender, external_receiver) = mpsc::channel(MAX_EVENT_BUS_CAPACITY);
        Self {
            consumers: BTreeMap::new(),
            internal_sender,
            internal_receiver,
            external_sender,
            external_receiver,
            producers_registered: BTreeSet::new(),
            inspect_logger: EventStreamLogger::new(node),
        }
    }

    /// Registers an event producer with the given configuration specifying the types of events the
    /// given producer is allowed to emit.
    pub fn add_producer<T>(&mut self, config: ProducerConfig<'_, T>)
    where
        T: EventProducer,
    {
        let mut events: BTreeSet<_> = config.events.into_iter().map(|e| e.into()).collect();
        events.extend(config.singleton_events.into_iter().map(|e| e.into()));
        // Record every type this producer may emit so `validate_routing` can check it against
        // registered consumers. Extending with cloned elements avoids cloning the whole set and
        // draining the temporary with `append`, as was done previously.
        self.producers_registered.extend(events.iter().cloned());
        let sender = match config.producer_type {
            ProducerType::Internal => self.internal_sender.clone(),
            ProducerType::External => self.external_sender.clone(),
        };
        let dispatcher = Dispatcher::new(events, sender);
        config.producer.set_dispatcher(dispatcher);
    }

    /// Registers an event consumer with the given configuration specifying the types of events the
    /// given consumer will receive.
    pub fn add_consumer<T: 'static>(&mut self, config: ConsumerConfig<'_, T>)
    where
        T: EventConsumer + Send + Sync,
    {
        // Hold the consumer weakly: dropping the `Arc` elsewhere unsubscribes it, and dead
        // entries are pruned during dispatch in `start`.
        let subscriber_weak = Arc::downgrade(config.consumer);
        for event_type in config.events {
            self.consumers.entry(event_type.into()).or_default().push(subscriber_weak.clone());
        }
        for event_type in config.singleton_events {
            self.consumers.entry(event_type.into()).or_default().push(subscriber_weak.clone());
        }
    }

    /// Starts listening for events emitted by the registered producers and dispatching them to
    /// registered consumers.
    ///
    /// First, validates that for every event type that will be dispatched, there exists at least
    /// one consumer. And that for every event that will be consumed, there exists at least one
    /// producer.
    ///
    /// Afterwards, listens to events emitted by producers. When an event arrives it sends it to
    /// all consumers of the event. If the event is singleton, the first consumer that was
    /// registered will get the singleton data and the rest won't.
    pub fn start(self) -> Result<(TerminateHandle, impl Future<Output = ()>), RouterError> {
        self.validate_routing()?;
        let (terminate_handle, mut stream) =
            EventStream::new(self.external_receiver, self.internal_receiver);
        let mut consumers = self.consumers;
        let mut inspect_logger = self.inspect_logger;
        let fut = async move {
            loop {
                match stream.next().await {
                    None => {
                        debug!("Event ingestion finished");
                        break;
                    }
                    Some(event) => {
                        inspect_logger.log(&event);
                        let event_type = event.ty();
                        let weak_consumers = match consumers.get_mut(&event_type) {
                            Some(c) => c,
                            None => continue,
                        };
                        let event_without_singleton_data = event.clone();
                        // Singleton data must go to exactly one consumer: the first active one
                        // takes it, the rest receive a clone without the singleton payload.
                        let mut event_with_singleton_data =
                            if event.is_singleton() { Some(event) } else { None };
                        // Consumers which weak reference could be upgraded will be stored here.
                        let mut active_consumers = vec![];
                        for consumer in weak_consumers.iter_mut().filter_map(|c| c.upgrade()) {
                            active_consumers.push(Arc::downgrade(&consumer));
                            let e = event_with_singleton_data
                                .take()
                                .unwrap_or_else(|| event_without_singleton_data.clone());
                            consumer.handle(e).await;
                        }
                        // We insert the list of active consumers in the map at the key for this
                        // event type. This leads to dropping the previous list of weak references
                        // which contains consumers which aren't active anymore.
                        consumers.insert(event_type, active_consumers);
                    }
                }
            }
        };
        Ok((terminate_handle, fut))
    }

    /// Validates that every event type that will be consumed has a registered producer and that
    /// every event type that can be produced has at least one registered consumer.
    fn validate_routing(&self) -> Result<(), RouterError> {
        for consumed_event in self.consumers.keys() {
            if !self.producers_registered.contains(consumed_event) {
                return Err(RouterError::MissingProducer(consumed_event.clone()));
            }
        }
        for produced_event in &self.producers_registered {
            if !self.consumers.contains_key(produced_event) {
                return Err(RouterError::MissingConsumer(produced_event.clone()));
            }
        }
        Ok(())
    }
}
/// Stream of events that merges the internal and external stream into a single stream. It also
/// provides the mechanisms used to notify when the external events have been drained.
#[pin_project]
struct EventStream {
    /// The stream containing events originating externally.
    #[pin]
    external: mpsc::Receiver<Event>,
    /// The stream containing events originating internally.
    #[pin]
    internal: mpsc::Receiver<Event>,
    /// When this future is ready, the external stream will be closed. Messages still in the buffer
    /// will be drained.
    #[pin]
    on_terminate: oneshot::Receiver<()>,
    /// When the external stream has been drained a notification will be sent through this channel.
    /// Set to `None` once that notification has been sent.
    on_external_drained: Option<oneshot::Sender<()>>,
    /// Specifies which stream will be polled first on the next `poll_next` call. Polling of both
    /// streams is alternated in a round robin fashion.
    turn: Turn,
}
/// Tracks which of the two merged streams gets polled first on the next
/// `poll_next` call.
enum Turn {
    Internal,
    External,
}

impl Turn {
    /// Flips to the other stream so that polling alternates round-robin.
    fn advance(&mut self) {
        *self = match self {
            Turn::Internal => Turn::External,
            Turn::External => Turn::Internal,
        };
    }
}
impl EventStream {
    /// Builds the merged event stream together with a `TerminateHandle` that
    /// can stop external ingestion and await its drain.
    fn new(
        external: mpsc::Receiver<Event>,
        internal: mpsc::Receiver<Event>,
    ) -> (TerminateHandle, Self) {
        let (terminate_snd, terminate_rcv) = oneshot::channel();
        let (drained_snd, drained_rcv) = oneshot::channel();
        let handle = TerminateHandle { snd: terminate_snd, external_drained: drained_rcv };
        let stream = Self {
            external,
            internal,
            on_terminate: terminate_rcv,
            on_external_drained: Some(drained_snd),
            // External events are given the first turn.
            turn: Turn::External,
        };
        (handle, stream)
    }
}
impl Stream for EventStream {
    type Item = Event;
    /// This stream implementation merges two streams into a single one polling from each of them
    /// in a round robin fashion. When one stream finishes, this will keep polling from the
    /// remaining one.
    ///
    /// When receiving a request for termination, the external event stream will be
    /// closed so that no new messages can be sent through that channel, but it'll still be drained.
    ///
    /// When the external stream has been drained, a message is sent through the appropriate
    /// channel.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        // First check if request to terminate the external event ingestion has been requested, if
        // it has, then close the channel to which external events are sent. This will prevent
        // further messages to be sent, but it remains possible to drain the external channel
        // buffer.
        match this.on_terminate.poll(cx) {
            Poll::Pending => {}
            Poll::Ready(_) => {
                this.external.close();
            }
        }
        // Depending on the turn, pick the stream to be polled first. The boolean in each pair
        // records whether that stream is the external one, so drain notifications fire correctly.
        let ((first_is_external, first), (second_is_external, second)) = match this.turn {
            Turn::External => ((true, this.external), (false, this.internal)),
            Turn::Internal => ((false, this.internal), (true, this.external)),
        };
        // Toggle the turn so we poll the other stream in the next poll_next call.
        this.turn.advance();
        // Poll the first stream and track whether it's drained or not.
        let first_drained = match first.poll_next(cx) {
            Poll::Pending => false,
            Poll::Ready(None) => {
                // If this stream is the external one, notify once that it has been drained.
                // Taking the sender ensures the notification is sent at most once.
                if first_is_external {
                    if let Some(snd) = this.on_external_drained.take() {
                        snd.send(()).unwrap_or_else(|err| {
                            error!(?err, "Failed to notify the external events have been drained.");
                        });
                    };
                }
                true
            }
            res @ Poll::Ready(Some(_)) => return res,
        };
        match second.poll_next(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(None) => {
                // If this stream is the external one, notify once that it has been drained.
                if second_is_external {
                    if let Some(snd) = this.on_external_drained.take() {
                        snd.send(()).unwrap_or_else(|err| {
                            error!(?err, "Failed to notify the external events have been drained.");
                        });
                    };
                }
                // If the first stream was also drained, then we are done. Otherwise, this stream
                // remains pending.
                if first_drained {
                    Poll::Ready(None)
                } else {
                    Poll::Pending
                }
            }
            res @ Poll::Ready(Some(_)) => {
                // If the first stream wasn't drained, then make sure we continue with that other
                // stream in the next call to poll_next as we just had an item to return from this
                // second stream. Therefore, we undo the toggling of the turn done initially.
                if !first_drained {
                    this.turn.advance();
                }
                res
            }
        }
    }
}
/// Allows to terminate external event ingestion.
pub struct TerminateHandle {
    // Signals the event stream to close the external channel.
    snd: oneshot::Sender<()>,
    // Resolved once the external channel buffer has been fully drained.
    external_drained: oneshot::Receiver<()>,
}
impl TerminateHandle {
/// Terminates external event ingestion. Buffered events will be drained. The returned future
/// will complete once all buffered external events have been drained.
pub async fn terminate(self) {
self.snd.send(()).unwrap_or_else(|err| {
error!(?err, "Failed to terminate the external event ingestion.");
});
self.external_drained
.await
.unwrap_or_else(|err| error!(?err, "Error waiting for external events to be drained."));
}
}
/// Allows to emit events of a restricted set of types.
///
/// Event producers will receive a `Dispatcher` instance that will allow them to emit events of
/// restricted set of types.
pub struct Dispatcher {
    // The event types this dispatcher is allowed to emit; all others are dropped.
    allowed_events: BTreeSet<AnyEventType>,
    // Channel into the router. `None` means a no-op dispatcher that drops all events.
    sender: Option<mpsc::Sender<Event>>,
}
/// Returns a no-op dispatcher.
impl Default for Dispatcher {
fn default() -> Self {
Self { allowed_events: BTreeSet::new(), sender: None }
}
}
impl Dispatcher {
    /// Creates a dispatcher restricted to emitting only `allowed_events` into `sender`.
    fn new(allowed_events: BTreeSet<AnyEventType>, sender: mpsc::Sender<Event>) -> Self {
        Self { sender: Some(sender), allowed_events }
    }

    /// Emits an event. If the event isn't in the restricted set of allowed types, this operation
    /// is a no-op. An error is returned when sending the event into the channel fails.
    pub async fn emit(&mut self, event: Event) -> Result<(), mpsc::SendError> {
        match self.sender.as_mut() {
            Some(sender) if self.allowed_events.contains(&event.ty()) => sender.send(event).await,
            // No-op dispatcher or disallowed event type: drop silently.
            _ => Ok(()),
        }
    }

    #[cfg(test)]
    pub fn new_for_test(allowed_events: BTreeSet<AnyEventType>) -> (mpsc::Receiver<Event>, Self) {
        let (sender, receiver) = mpsc::channel(100);
        (receiver, Dispatcher::new(allowed_events, sender))
    }
}
// Records event counts and a bounded log of recent events to inspect.
struct EventStreamLogger {
    // Per-event-type counters exposed under the `event_counts` node.
    counters: BTreeMap<AnyEventType, inspect::UintProperty>,
    // Bounded list of the most recent events, under `recent_events`.
    component_log_node: BoundedListNode,
    // Parent node that owns the per-type counter properties.
    counters_node: inspect::Node,
    // Keeps the root node for this logger alive for the logger's lifetime.
    _node: inspect::Node,
}
impl EventStreamLogger {
    /// Creates a new event logger. All inspect data will be written as children of `node`.
    pub fn new(node: inspect::Node) -> Self {
        let counters_node = node.create_child("event_counts");
        let recent_events_node = node.create_child("recent_events");
        Self {
            _node: node,
            counters: BTreeMap::new(),
            counters_node,
            component_log_node: BoundedListNode::new(recent_events_node, RECENT_EVENT_LIMIT),
        }
    }

    /// Log a new component event to inspect: bump the per-type counter and append an entry to the
    /// bounded list of recent events.
    pub fn log(&mut self, event: &Event) {
        let ty = event.ty();
        // Increment the existing counter, or create it at 1 the first time this event type is
        // seen. A single `get_mut` avoids the previous `contains_key` + `get_mut().unwrap()`
        // double lookup.
        match self.counters.get_mut(&ty) {
            Some(counter) => {
                counter.add(1);
            }
            None => {
                let counter = self.counters_node.create_uint(ty.as_ref(), 1);
                self.counters.insert(ty.clone(), counter);
            }
        }
        // TODO(fxbug.dev/92374): leverage string references for the payload.
        match &event.payload {
            EventPayload::ComponentStarted(ComponentStartedPayload { component })
            | EventPayload::ComponentStopped(ComponentStoppedPayload { component })
            | EventPayload::DiagnosticsReady(DiagnosticsReadyPayload { component, .. })
            | EventPayload::LogSinkRequested(LogSinkRequestedPayload { component, .. }) => {
                self.log_inspect(ty.as_ref(), component);
            }
        }
    }

    /// Writes one `recent_events` entry with the event name and the component moniker (suffixed
    /// with the instance id when one is present).
    fn log_inspect(&mut self, event_name: &str, identity: &ComponentIdentity) {
        // TODO(fxbug.dev/92374): leverage string references for the `event_name`.
        inspect_log!(self.component_log_node,
            &*EVENT => event_name,
            &*MONIKER => match &identity.instance_id {
                Some(instance_id) => format!("{}:{}", identity.relative_moniker, instance_id),
                None => identity.relative_moniker.to_string(),
            }
        );
    }
}
/// Set of errors that can happen when setting up an event router and executing its dispatching loop.
#[derive(Debug, Error)]
pub enum RouterError {
    /// An event type can be produced but no consumer registered for it.
    #[error("Missing consumer for event type {0:?}")]
    MissingConsumer(AnyEventType),
    /// An event type would be consumed but no producer registered for it.
    #[error("Missing producer for event type {0:?}")]
    MissingProducer(AnyEventType),
}
/// Configuration for an event producer.
pub struct ProducerConfig<'a, T> {
    /// The event producer that will receive a `Dispatcher` through
    /// `EventProducer::set_dispatcher`.
    pub producer: &'a mut T,
    /// The set of events that the `producer` will be allowed to emit.
    pub events: Vec<EventType>,
    /// The set of singleton events that the `producer` will be allowed to emit.
    pub singleton_events: Vec<SingletonEventType>,
    /// The type of the producer.
    pub producer_type: ProducerType,
}
/// Definition of the type of producers.
pub enum ProducerType {
    /// An external producer emits events originating externally and that the archivist ingests.
    /// These producers can be stopped to ensure all of their events are drained and handled when
    /// shutting down the archivist.
    External,
    /// An internal producer emits events that are generated internally in the archivist.
    /// These producers cannot be stopped and there's no guarantee their messages will be
    /// drained and handled when shutting down the archivist.
    Internal,
}
/// Configuration for an event consumer.
pub struct ConsumerConfig<'a, T> {
    /// The event consumer that will receive events when they are emitted by producers.
    /// Held by reference so the router can downgrade it to a `Weak`.
    pub consumer: &'a Arc<T>,
    /// The set of event types that the `consumer` will receive.
    pub events: Vec<EventType>,
    /// The set of singleton event types that the `consumer` will receive.
    pub singleton_events: Vec<SingletonEventType>,
}
/// Trait implemented by data types which receive events.
#[async_trait]
pub trait EventConsumer {
    /// Event consumers will receive a call on this method when an event they are interested in
    /// happens.
    async fn handle(self: Arc<Self>, event: Event);
}
/// Trait implemented by data types which emit events.
pub trait EventProducer {
    /// When registered, event producers will receive a call on this method with the `dispatcher`
    /// they can use to emit events.
    fn set_dispatcher(&mut self, dispatcher: Dispatcher);
}
#[cfg(test)]
mod tests {
    // Tests for the event router: routing validation, singleton vs. regular
    // dispatch, consumer cleanup, inspect logging, round-robin merging and
    // external stream draining.
    use super::*;
    use crate::events::types::ComponentIdentifier;
    use assert_matches::assert_matches;
    use fidl_fuchsia_logger::LogSinkMarker;
    use fuchsia_async as fasync;
    use fuchsia_inspect::assert_data_tree;
    use fuchsia_zircon as zx;
    use futures::{lock::Mutex, FutureExt};
    use lazy_static::lazy_static;

    const TEST_URL: &'static str = "NO-OP URL";
    // Fixed timestamp for events built by `TestEventProducer::emit` so expected
    // events can be compared by value.
    const FAKE_TIMESTAMP: i64 = 5;

    lazy_static! {
        // Identity of a component at moniker "./a/b".
        static ref IDENTITY: ComponentIdentity = ComponentIdentity::from_identifier_and_url(
            ComponentIdentifier::parse_from_moniker("./a/b").unwrap(),
            TEST_URL
        );
        // Identity of a legacy component carrying an instance id.
        static ref LEGACY_IDENTITY: ComponentIdentity = ComponentIdentity::from_identifier_and_url(
            ComponentIdentifier::Legacy {
                instance_id: "12345".to_string(),
                moniker: vec!["a", "b", "foo.cmx"].into(),
            },
            &*TEST_URL
        );
    }

    // Producer that fabricates events with a fixed timestamp on demand.
    #[derive(Default)]
    struct TestEventProducer {
        dispatcher: Dispatcher,
    }

    impl TestEventProducer {
        // Builds and emits an event of `event_type` attributed to `identity`.
        async fn emit(&mut self, event_type: AnyEventType, identity: ComponentIdentity) {
            let event = match event_type {
                AnyEventType::General(EventType::ComponentStarted) => Event {
                    timestamp: zx::Time::from_nanos(FAKE_TIMESTAMP),
                    payload: EventPayload::ComponentStarted(ComponentStartedPayload {
                        component: identity,
                    }),
                },
                AnyEventType::General(EventType::ComponentStopped) => Event {
                    timestamp: zx::Time::from_nanos(FAKE_TIMESTAMP),
                    payload: EventPayload::ComponentStopped(ComponentStoppedPayload {
                        component: identity,
                    }),
                },
                AnyEventType::Singleton(SingletonEventType::DiagnosticsReady) => Event {
                    timestamp: zx::Time::from_nanos(FAKE_TIMESTAMP),
                    payload: EventPayload::DiagnosticsReady(DiagnosticsReadyPayload {
                        component: identity,
                        directory: None,
                    }),
                },
                AnyEventType::Singleton(SingletonEventType::LogSinkRequested) => Event {
                    timestamp: zx::Time::from_nanos(FAKE_TIMESTAMP),
                    payload: EventPayload::LogSinkRequested(LogSinkRequestedPayload {
                        component: identity,
                        request_stream: None,
                    }),
                },
            };
            // Dispatch errors are irrelevant to these tests.
            let _ = self.dispatcher.emit(event).await;
        }
    }

    impl EventProducer for TestEventProducer {
        fn set_dispatcher(&mut self, dispatcher: Dispatcher) {
            self.dispatcher = dispatcher;
        }
    }

    // Consumer that forwards every received event into an mpsc channel so
    // tests can assert on exactly what was delivered, and in what order.
    struct TestEventConsumer {
        event_sender: Mutex<mpsc::Sender<Event>>,
    }

    impl TestEventConsumer {
        fn new() -> (mpsc::Receiver<Event>, Arc<Self>) {
            let (event_sender, event_receiver) = mpsc::channel(10);
            (event_receiver, Arc::new(Self { event_sender: Mutex::new(event_sender) }))
        }
    }

    #[async_trait]
    impl EventConsumer for TestEventConsumer {
        async fn handle(self: Arc<Self>, event: Event) {
            self.event_sender.lock().await.send(event).await.unwrap();
        }
    }

    // `start` must fail when a produced event type has no consumer or a
    // consumed event type has no producer.
    #[fuchsia::test]
    async fn invalid_routing() {
        let mut producer = TestEventProducer::default();
        let (_receiver, consumer) = TestEventConsumer::new();
        let mut router = EventRouter::new(inspect::Node::default());
        router.add_producer(ProducerConfig {
            producer: &mut producer,
            producer_type: ProducerType::Internal,
            events: vec![EventType::ComponentStarted],
            singleton_events: vec![],
        });
        router.add_consumer(ConsumerConfig {
            consumer: &consumer,
            events: vec![EventType::ComponentStopped],
            singleton_events: vec![],
        });
        // An explicit match is needed here since unwrap_err requires Debug implemented for both T
        // and E in Result<T, E> and T is a pair which second element is `impl Future` which
        // doesn't implement Debug.
        match router.start() {
            Err(err) => {
                assert_matches!(
                    err,
                    RouterError::MissingConsumer(AnyEventType::General(
                        EventType::ComponentStarted
                    )) | RouterError::MissingProducer(AnyEventType::General(
                        EventType::ComponentStopped
                    ))
                );
            }
            Ok(_) => panic!("expected an error from routing events"),
        }
        let mut producer = TestEventProducer::default();
        let (_receiver, consumer) = TestEventConsumer::new();
        let mut router = EventRouter::new(inspect::Node::default());
        router.add_producer(ProducerConfig {
            producer: &mut producer,
            producer_type: ProducerType::External,
            events: vec![],
            singleton_events: vec![SingletonEventType::DiagnosticsReady],
        });
        router.add_consumer(ConsumerConfig {
            consumer: &consumer,
            events: vec![],
            singleton_events: vec![SingletonEventType::LogSinkRequested],
        });
        match router.start() {
            Err(err) => {
                assert_matches!(
                    err,
                    RouterError::MissingConsumer(AnyEventType::Singleton(
                        SingletonEventType::DiagnosticsReady
                    )) | RouterError::MissingProducer(AnyEventType::Singleton(
                        SingletonEventType::LogSinkRequested,
                    ))
                );
            }
            Ok(_) => panic!("expected an error from routing events"),
        }
    }

    // Singleton payloads must be delivered only to the first registered
    // consumer; later consumers still see the event without the payload.
    #[fuchsia::test]
    async fn singleton_event_subscription() {
        let mut producer = TestEventProducer::default();
        let (mut first_receiver, first_consumer) = TestEventConsumer::new();
        let (mut second_receiver, second_consumer) = TestEventConsumer::new();
        let mut router = EventRouter::new(inspect::Node::default());
        router.add_producer(ProducerConfig {
            producer: &mut producer,
            producer_type: ProducerType::External,
            events: vec![],
            singleton_events: vec![SingletonEventType::LogSinkRequested],
        });
        router.add_consumer(ConsumerConfig {
            consumer: &first_consumer,
            events: vec![],
            singleton_events: vec![SingletonEventType::LogSinkRequested],
        });
        router.add_consumer(ConsumerConfig {
            consumer: &second_consumer,
            events: vec![],
            singleton_events: vec![SingletonEventType::LogSinkRequested],
        });
        let (_terminate_handle, fut) = router.start().unwrap();
        let _router_task = fasync::Task::spawn(fut);
        // Emit an event
        let (_, request_stream) =
            fidl::endpoints::create_request_stream::<LogSinkMarker>().unwrap();
        let timestamp = zx::Time::get_monotonic();
        producer
            .dispatcher
            .emit(Event {
                timestamp: timestamp.clone(),
                payload: EventPayload::LogSinkRequested(LogSinkRequestedPayload {
                    component: IDENTITY.clone(),
                    request_stream: Some(request_stream),
                }),
            })
            .await
            .unwrap();
        // The first consumer that was registered must receive the request stream. The second one
        // must receive no payload, but still receive the event.
        let first_event = first_receiver.next().await.unwrap();
        assert_matches!(first_event, Event {
            payload: EventPayload::LogSinkRequested(payload),
            ..
        } => {
            assert_eq!(payload.component, *IDENTITY);
            assert!(payload.request_stream.is_some());
        });
        let second_event = second_receiver.next().await.unwrap();
        assert_matches!(second_event, Event {
            payload: EventPayload::LogSinkRequested(payload),
            ..
        } => {
            assert_eq!(payload.component, *IDENTITY);
            assert!(payload.request_stream.is_none());
        });
    }

    // Non-singleton events must be delivered identically to every consumer.
    #[fuchsia::test]
    async fn regular_event_subscription() {
        let mut producer = TestEventProducer::default();
        let (mut first_receiver, first_consumer) = TestEventConsumer::new();
        let (mut second_receiver, second_consumer) = TestEventConsumer::new();
        let inspector = inspect::Inspector::new();
        let mut router = EventRouter::new(inspector.root().create_child("events"));
        router.add_producer(ProducerConfig {
            producer: &mut producer,
            producer_type: ProducerType::External,
            events: vec![EventType::ComponentStarted],
            singleton_events: vec![],
        });
        router.add_consumer(ConsumerConfig {
            consumer: &first_consumer,
            events: vec![EventType::ComponentStarted],
            singleton_events: vec![],
        });
        router.add_consumer(ConsumerConfig {
            consumer: &second_consumer,
            events: vec![EventType::ComponentStarted],
            singleton_events: vec![],
        });
        let (_terminate_handle, fut) = router.start().unwrap();
        let _router_task = fasync::Task::spawn(fut);
        let timestamp = zx::Time::get_monotonic();
        // Emit an event
        producer
            .dispatcher
            .emit(Event {
                timestamp,
                payload: EventPayload::ComponentStarted(ComponentStartedPayload {
                    component: IDENTITY.clone(),
                }),
            })
            .await
            .unwrap();
        // Both consumers receive the exact same event.
        let first_event = first_receiver.next().await.unwrap();
        assert_matches!(first_event, Event {
            payload: EventPayload::ComponentStarted(payload),
            ..
        } => {
            assert_eq!(payload, ComponentStartedPayload { component: IDENTITY.clone() });
        });
        let second_event = second_receiver.next().await.unwrap();
        assert_matches!(
            second_event,
            Event { timestamp: t, payload: EventPayload::ComponentStarted(payload) } => {
                assert_eq!(payload, ComponentStartedPayload { component: IDENTITY.clone() });
                assert_eq!(timestamp, t);
            }
        );
    }

    // Consumers whose `Arc` was dropped must be pruned and never receive
    // events, while remaining consumers keep receiving them.
    #[fuchsia::test]
    async fn consumers_cleanup() {
        let mut producer = TestEventProducer::default();
        let (mut first_receiver, first_consumer) = TestEventConsumer::new();
        let (mut second_receiver, second_consumer) = TestEventConsumer::new();
        let (mut third_receiver, third_consumer) = TestEventConsumer::new();
        let mut router = EventRouter::new(inspect::Node::default());
        router.add_producer(ProducerConfig {
            producer: &mut producer,
            producer_type: ProducerType::Internal,
            events: vec![EventType::ComponentStarted],
            singleton_events: vec![],
        });
        router.add_consumer(ConsumerConfig {
            consumer: &first_consumer,
            events: vec![EventType::ComponentStarted],
            singleton_events: vec![],
        });
        router.add_consumer(ConsumerConfig {
            consumer: &second_consumer,
            events: vec![EventType::ComponentStarted],
            singleton_events: vec![],
        });
        router.add_consumer(ConsumerConfig {
            consumer: &third_consumer,
            events: vec![EventType::ComponentStarted],
            singleton_events: vec![],
        });
        drop(first_consumer);
        drop(third_consumer);
        let (_terminate_handle, fut) = router.start().unwrap();
        let _router_task = fasync::Task::spawn(fut);
        // Emit an event
        producer
            .dispatcher
            .emit(Event {
                timestamp: zx::Time::get_monotonic(),
                payload: EventPayload::ComponentStarted(ComponentStartedPayload {
                    component: IDENTITY.clone(),
                }),
            })
            .await
            .unwrap();
        // We see the event only in the receiver which consumer wasn't dropped.
        let event = second_receiver.next().await.unwrap();
        assert_matches!(event.payload, EventPayload::ComponentStarted(_));
        assert!(first_receiver.next().now_or_never().unwrap().is_none());
        assert!(third_receiver.next().now_or_never().unwrap().is_none());
        // We see additional events in the second receiver which remains alive.
        producer
            .dispatcher
            .emit(Event {
                timestamp: zx::Time::get_monotonic(),
                payload: EventPayload::ComponentStarted(ComponentStartedPayload {
                    component: IDENTITY.clone(),
                }),
            })
            .await
            .unwrap();
        let event = second_receiver.next().await.unwrap();
        assert_matches!(event.payload, EventPayload::ComponentStarted(_));
        assert!(first_receiver.next().now_or_never().unwrap().is_none());
        assert!(third_receiver.next().now_or_never().unwrap().is_none());
    }

    // Verifies the `event_counts` counters and the `recent_events` bounded
    // list written to inspect while events flow through the router.
    #[fuchsia::test]
    async fn inspect_log() {
        let inspector = inspect::Inspector::new();
        let mut router = EventRouter::new(inspector.root().create_child("events"));
        let mut producer1 = TestEventProducer::default();
        let mut producer2 = TestEventProducer::default();
        let (receiver, consumer) = TestEventConsumer::new();
        router.add_consumer(ConsumerConfig {
            consumer: &consumer,
            events: vec![EventType::ComponentStarted, EventType::ComponentStopped],
            singleton_events: vec![
                SingletonEventType::LogSinkRequested,
                SingletonEventType::DiagnosticsReady,
            ],
        });
        router.add_producer(ProducerConfig {
            producer: &mut producer1,
            producer_type: ProducerType::Internal,
            events: vec![EventType::ComponentStarted, EventType::ComponentStopped],
            singleton_events: vec![SingletonEventType::DiagnosticsReady],
        });
        router.add_producer(ProducerConfig {
            producer: &mut producer2,
            producer_type: ProducerType::Internal,
            events: vec![EventType::ComponentStarted],
            singleton_events: vec![SingletonEventType::LogSinkRequested],
        });
        producer1
            .emit(AnyEventType::General(EventType::ComponentStarted), LEGACY_IDENTITY.clone())
            .await;
        producer1
            .emit(
                AnyEventType::Singleton(SingletonEventType::DiagnosticsReady),
                LEGACY_IDENTITY.clone(),
            )
            .await;
        producer1
            .emit(AnyEventType::General(EventType::ComponentStopped), LEGACY_IDENTITY.clone())
            .await;
        producer2.emit(AnyEventType::General(EventType::ComponentStarted), IDENTITY.clone()).await;
        producer2
            .emit(AnyEventType::Singleton(SingletonEventType::LogSinkRequested), IDENTITY.clone())
            .await;
        // Consume the events.
        let (_terminate_handle, fut) = router.start().unwrap();
        let _router_task = fasync::Task::spawn(fut);
        fasync::Task::spawn(async move {
            receiver.take(5).collect::<Vec<_>>().await;
        })
        .await;
        assert_data_tree!(inspector, root: {
            events: {
                event_counts: {
                    component_started: 2u64,
                    component_stopped: 1u64,
                    diagnostics_ready: 1u64,
                    log_sink_requested: 1u64
                },
                recent_events: {
                    "0": {
                        "@time": inspect::testing::AnyProperty,
                        event: "component_started",
                        moniker: "a/b/foo.cmx:12345"
                    },
                    "1": {
                        "@time": inspect::testing::AnyProperty,
                        event: "diagnostics_ready",
                        moniker: "a/b/foo.cmx:12345"
                    },
                    "2": {
                        "@time": inspect::testing::AnyProperty,
                        event: "component_stopped",
                        moniker: "a/b/foo.cmx:12345"
                    },
                    "3": {
                        "@time": inspect::testing::AnyProperty,
                        event: "component_started",
                        moniker: "a/b"
                    },
                    "4": {
                        "@time": inspect::testing::AnyProperty,
                        event: "log_sink_requested",
                        moniker: "a/b"
                    },
                }
            }
        });
    }

    // The merged stream must alternate between internal and external
    // producers while preserving per-producer ordering.
    #[fuchsia::test]
    async fn event_stream_round_robin_semantics() {
        let inspector = inspect::Inspector::new();
        let mut router = EventRouter::new(inspector.root().create_child("events"));
        let mut producer1 = TestEventProducer::default();
        let mut producer2 = TestEventProducer::default();
        let (receiver, consumer) = TestEventConsumer::new();
        router.add_consumer(ConsumerConfig {
            consumer: &consumer,
            events: vec![EventType::ComponentStarted, EventType::ComponentStopped],
            singleton_events: vec![],
        });
        router.add_producer(ProducerConfig {
            producer: &mut producer1,
            producer_type: ProducerType::Internal,
            events: vec![EventType::ComponentStarted],
            singleton_events: vec![],
        });
        router.add_producer(ProducerConfig {
            producer: &mut producer2,
            producer_type: ProducerType::External,
            events: vec![EventType::ComponentStopped],
            singleton_events: vec![],
        });
        producer1
            .emit(AnyEventType::General(EventType::ComponentStarted), LEGACY_IDENTITY.clone())
            .await;
        producer1.emit(AnyEventType::General(EventType::ComponentStarted), IDENTITY.clone()).await;
        producer2
            .emit(AnyEventType::General(EventType::ComponentStopped), LEGACY_IDENTITY.clone())
            .await;
        producer2.emit(AnyEventType::General(EventType::ComponentStopped), IDENTITY.clone()).await;
        // We should see an event from each producer followed by an event from the other producer.
        // Also events from each producer must be in order.
        let (_terminate_handle, fut) = router.start().unwrap();
        let _router_task = fasync::Task::spawn(fut);
        let events = receiver.take(4).collect::<Vec<_>>().await;
        let expected_events = vec![
            stopped(LEGACY_IDENTITY.clone()),
            started(LEGACY_IDENTITY.clone()),
            stopped(IDENTITY.clone()),
            started(IDENTITY.clone()),
        ];
        assert_eq!(events.len(), expected_events.len());
        for (event, expected_event) in std::iter::zip(events, expected_events) {
            assert_event(event, expected_event);
        }
    }

    // After termination, buffered external events are drained but no new
    // external events are accepted; internal events keep flowing.
    #[fuchsia::test]
    async fn external_stream_draining() {
        let inspector = inspect::Inspector::new();
        let mut router = EventRouter::new(inspector.root().create_child("events"));
        let mut internal_producer = TestEventProducer::default();
        let mut external_producer = TestEventProducer::default();
        let (mut receiver, consumer) = TestEventConsumer::new();
        router.add_consumer(ConsumerConfig {
            consumer: &consumer,
            events: vec![EventType::ComponentStarted, EventType::ComponentStopped],
            singleton_events: vec![],
        });
        router.add_producer(ProducerConfig {
            producer: &mut internal_producer,
            producer_type: ProducerType::Internal,
            events: vec![EventType::ComponentStarted],
            singleton_events: vec![],
        });
        router.add_producer(ProducerConfig {
            producer: &mut external_producer,
            producer_type: ProducerType::External,
            events: vec![EventType::ComponentStopped],
            singleton_events: vec![],
        });
        internal_producer
            .emit(AnyEventType::General(EventType::ComponentStarted), IDENTITY.clone())
            .await;
        external_producer
            .emit(AnyEventType::General(EventType::ComponentStopped), IDENTITY.clone())
            .await;
        let (terminate_handle, fut) = router.start().unwrap();
        let _router_task = fasync::Task::spawn(fut);
        let on_drained = terminate_handle.terminate();
        let drain_finished = fasync::Task::spawn(async move { on_drained.await });
        assert_event(receiver.next().await.unwrap(), stopped(IDENTITY.clone()));
        assert_event(receiver.next().await.unwrap(), started(IDENTITY.clone()));
        // This future must be complete now.
        drain_finished.await;
        // We must never see any new event emitted by the external producer. But we must see
        // events emitted by the internal producer.
        external_producer
            .emit(AnyEventType::General(EventType::ComponentStopped), IDENTITY.clone())
            .await;
        assert!(receiver.next().now_or_never().is_none());
        internal_producer
            .emit(AnyEventType::General(EventType::ComponentStarted), IDENTITY.clone())
            .await;
        assert_event(receiver.next().await.unwrap(), started(IDENTITY.clone()));
    }

    // Asserts two events are equal for the payload kinds used in these tests.
    fn assert_event(event: Event, other: Event) {
        assert_eq!(event.timestamp, other.timestamp);
        match (event.payload, other.payload) {
            (
                EventPayload::ComponentStarted(payload),
                EventPayload::ComponentStarted(other_payload),
            ) => {
                assert_eq!(payload, other_payload);
            }
            (
                EventPayload::ComponentStopped(payload),
                EventPayload::ComponentStopped(other_payload),
            ) => {
                assert_eq!(payload, other_payload);
            }
            _ => unimplemented!("no other combinations are expected in these tests"),
        }
    }

    // Builds an expected started event with the fixed fake timestamp.
    fn started(identity: ComponentIdentity) -> Event {
        Event {
            timestamp: zx::Time::from_nanos(FAKE_TIMESTAMP),
            payload: EventPayload::ComponentStarted(ComponentStartedPayload {
                component: identity,
            }),
        }
    }

    // Builds an expected stopped event with the fixed fake timestamp.
    fn stopped(identity: ComponentIdentity) -> Event {
        Event {
            timestamp: zx::Time::from_nanos(FAKE_TIMESTAMP),
            payload: EventPayload::ComponentStopped(ComponentStoppedPayload {
                component: identity,
            }),
        }
    }
}