blob: 7ea6ef561d6e2dac5cf860e469c8b13af9e7296b [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::model::{
events::{
dispatcher::EventDispatcher,
event::{Event, SyncMode},
},
hooks::EventType,
moniker::AbsoluteMoniker,
},
fuchsia_trace as trace,
futures::{channel::mpsc, StreamExt},
std::{
collections::HashSet,
sync::{Arc, Weak},
},
};
pub struct EventStream {
/// The receiving end of a channel of Events.
rx: mpsc::Receiver<Event>,
/// The sending end of a channel of Events.
tx: mpsc::Sender<Event>,
/// A vector of EventDispatchers to this EventStream.
/// EventStream assumes ownership of the dispatchers. They are
/// destroyed when this EventStream is destroyed.
dispatchers: Vec<Arc<EventDispatcher>>,
}
impl EventStream {
pub fn new() -> Self {
let (tx, rx) = mpsc::channel(2);
Self { rx, tx, dispatchers: vec![] }
}
pub fn create_dispatcher(
&mut self,
sync_mode: SyncMode,
scope_monikers: HashSet<AbsoluteMoniker>,
) -> Weak<EventDispatcher> {
let dispatcher =
Arc::new(EventDispatcher::new(sync_mode.clone(), scope_monikers, self.tx.clone()));
self.dispatchers.push(dispatcher.clone());
Arc::downgrade(&dispatcher)
}
/// Receives the next event from the sender.
pub async fn next(&mut self) -> Option<Event> {
trace::duration!("component_manager", "events:next");
self.rx.next().await
}
/// Waits for an event with a particular EventType against a component with a
/// particular moniker. Ignores all other events.
pub async fn wait_until(
&mut self,
expected_event_type: EventType,
expected_moniker: AbsoluteMoniker,
) -> Option<Event> {
while let Some(event) = self.next().await {
let actual_event_type = event.event.payload.type_();
if expected_moniker == event.event.target_moniker
&& expected_event_type == actual_event_type
{
return Some(event);
}
event.resume();
}
None
}
}