// blob: 5144cc3a5a019e734dea8bcf7f4144064ac19c69 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::model::events::{
dispatcher::{EventDispatcher, EventDispatcherScope},
event::Event,
registry::ComponentEventRoute,
},
futures::{channel::mpsc, stream::Peekable, task::Context, Stream, StreamExt},
moniker::ExtendedMoniker,
std::{
pin::Pin,
sync::{Arc, Weak},
task::Poll,
},
};
#[cfg(test)]
use {
hooks::{EventType, HasEventType},
moniker::Moniker,
};
/// A stream of `(Event, Option<Vec<ComponentEventRoute>>)` items backed by an
/// unbounded mpsc channel.
///
/// The stream keeps both ends of the channel: the sender is cloned for every
/// dispatcher created through `create_dispatcher` (and for callers of
/// `sender`), while the receiver is what the `Stream` implementation polls.
pub struct EventStream {
/// The receiving end of the channel of Events, wrapped in `Peekable`.
/// This is the source polled by the `Stream` implementation.
rx: Peekable<mpsc::UnboundedReceiver<(Event, Option<Vec<ComponentEventRoute>>)>>,
/// The sending end of the channel of Events. Clones of this sender are
/// handed to dispatchers and returned by `sender`.
tx: mpsc::UnboundedSender<(Event, Option<Vec<ComponentEventRoute>>)>,
/// The EventDispatchers created for this EventStream.
/// EventStream holds the strong references to the dispatchers
/// (`create_dispatcher` only returns `Weak` handles). They are
/// destroyed when this EventStream is destroyed.
dispatchers: Vec<Arc<EventDispatcher>>,
/// The route taken for this event stream.
/// This is used for access control and namespacing during
/// serving of the event stream.
/// Overwritten by every call to `create_dispatcher`.
pub route: Vec<ComponentEventRoute>,
/// Routing tasks associated with this event stream.
/// Tasks associated with the stream will be terminated
/// when the EventStream is destroyed.
pub tasks: Vec<fuchsia_async::Task<()>>,
}
impl EventStream {
    /// Builds an empty stream on top of a fresh unbounded channel, with no
    /// dispatchers, no route entries and no tasks.
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::unbounded();
        Self {
            rx: receiver.peekable(),
            tx: sender,
            dispatchers: Vec::new(),
            route: Vec::new(),
            tasks: Vec::new(),
        }
    }

    /// Creates an `EventDispatcher` that sends into this stream's channel.
    ///
    /// The stream's `route` is replaced with a copy of `route`, the new
    /// dispatcher is stored in the stream (which holds the strong reference),
    /// and a `Weak` handle to it is returned to the caller.
    pub fn create_dispatcher(
        &mut self,
        subscriber: ExtendedMoniker,
        scopes: Vec<EventDispatcherScope>,
        route: Vec<ComponentEventRoute>,
    ) -> Weak<EventDispatcher> {
        self.route = route.clone();
        let owned = Arc::new(EventDispatcher::new_with_route(
            subscriber,
            scopes,
            self.tx.clone(),
            route,
        ));
        // Take the weak handle first so the strong reference can be moved
        // straight into the vector.
        let handle = Arc::downgrade(&owned);
        self.dispatchers.push(owned);
        handle
    }

    /// Returns a new sender connected to this stream's channel.
    pub fn sender(&self) -> mpsc::UnboundedSender<(Event, Option<Vec<ComponentEventRoute>>)> {
        self.tx.clone()
    }

    /// Consumes events from the stream until one matches both the given
    /// EventType and the given component moniker, and returns that event.
    /// Non-matching events are discarded. Returns `None` if the stream ends
    /// before a match is seen.
    #[cfg(test)]
    pub async fn wait_until(
        &mut self,
        expected_event_type: EventType,
        expected_moniker: Moniker,
    ) -> Option<Event> {
        let wanted_moniker = ExtendedMoniker::ComponentInstance(expected_moniker);
        loop {
            // `?` ends the wait with `None` once the stream is exhausted.
            let (candidate, _) = self.next().await?;
            let seen_type = candidate.event.event_type();
            if wanted_moniker != candidate.event.target_moniker
                || expected_event_type != seen_type
            {
                continue;
            }
            return Some(candidate);
        }
    }
}
/// Yields the items received on the stream's internal channel.
impl Stream for EventStream {
    type Item = (Event, Option<Vec<ComponentEventRoute>>);

    /// Forwards the poll to the wrapped receiver.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.rx.poll_next_unpin(cx)
    }
}