[components] Split up events code into file per class.
To improve readability and discoverability, this CL splits events "classes"
into multiple files in the model/events/ directory.
Change-Id: I29cb51819a351dcad17007f8252dc1c65ec9d761
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/377909
Commit-Queue: Fady Samuel <fsamuel@google.com>
Reviewed-by: Miguel Flores <miguelfrde@google.com>
Testability-Review: Miguel Flores <miguelfrde@google.com>
diff --git a/src/sys/component_manager/src/builtin_environment.rs b/src/sys/component_manager/src/builtin_environment.rs
index bd3ffbb..920296c 100644
--- a/src/sys/component_manager/src/builtin_environment.rs
+++ b/src/sys/component_manager/src/builtin_environment.rs
@@ -19,7 +19,7 @@
model::{
binding::Binder,
error::ModelError,
- events::core::EventSourceFactory,
+ events::source_factory::EventSourceFactory,
hub::Hub,
model::{ComponentManagerConfig, Model},
runner::Runner,
diff --git a/src/sys/component_manager/src/framework.rs b/src/sys/component_manager/src/framework.rs
index 015ef32..6290ac3 100644
--- a/src/sys/component_manager/src/framework.rs
+++ b/src/sys/component_manager/src/framework.rs
@@ -336,7 +336,7 @@
builtin_environment::BuiltinEnvironment,
model::{
binding::Binder,
- events::{core::EventSource, registry::EventStream},
+ events::{source::EventSource, stream::EventStream},
model::ModelParams,
moniker::AbsoluteMoniker,
resolver::ResolverRegistry,
diff --git a/src/sys/component_manager/src/model/events/core.rs b/src/sys/component_manager/src/model/events/core.rs
deleted file mode 100644
index 25bef08..0000000
--- a/src/sys/component_manager/src/model/events/core.rs
+++ /dev/null
@@ -1,487 +0,0 @@
-// Copyright 2020 The Fuchsia Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-use {
- crate::{
- capability::{CapabilityProvider, CapabilitySource, FrameworkCapability},
- model::{
- error::ModelError,
- events::{
- registry::{EventRegistry, EventStream, RoutedEvent, SyncMode},
- serve::serve_event_source_sync,
- },
- hooks::{Event, EventPayload, EventType, Hook, HooksRegistration},
- model::Model,
- moniker::AbsoluteMoniker,
- realm::Realm,
- routing,
- },
- },
- async_trait::async_trait,
- cm_rust::{CapabilityName, CapabilityPath, ComponentDecl, UseDecl, UseEventDecl},
- fidl::endpoints::ServerEnd,
- fidl_fuchsia_sys2 as fsys, fuchsia_async as fasync, fuchsia_zircon as zx,
- futures::lock::Mutex,
- lazy_static::lazy_static,
- log::*,
- maplit::hashset,
- std::{
- collections::{HashMap, HashSet},
- convert::TryInto,
- path::PathBuf,
- sync::{Arc, Weak},
- },
- thiserror::Error,
-};
-
-lazy_static! {
- pub static ref EVENT_SOURCE_SERVICE_PATH: CapabilityPath =
- "/svc/fuchsia.sys2.EventSource".try_into().unwrap();
- pub static ref EVENT_SOURCE_SYNC_SERVICE_PATH: CapabilityPath =
- "/svc/fuchsia.sys2.BlockingEventSource".try_into().unwrap();
-}
-
-/// Allows to create `EventSource`s and tracks all the created ones.
-pub struct EventSourceFactory {
- /// Tracks the event source used by each component ideantified with the given `moniker`.
- event_source_registry: Mutex<HashMap<AbsoluteMoniker, EventSource>>,
-
- /// The event registry. It subscribes to all events happening in the system and
- /// routes them to subscribers.
- // TODO(fxb/48512): instead of using a global registry integrate more with the hooks model.
- event_registry: Arc<EventRegistry>,
-
- /// The component model, needed to route events.
- model: Weak<Model>,
-}
-
-impl EventSourceFactory {
- /// Creates a new event source factory.
- pub fn new(model: Weak<Model>) -> Self {
- Self {
- event_source_registry: Mutex::new(HashMap::new()),
- event_registry: Arc::new(EventRegistry::new()),
- model,
- }
- }
-
- /// Creates the subscription to the required events.
- /// `CapabilityReady` used to track events and associate them with the component that needs them
- /// as well as the scoped that will be allowed. Also the EventSource protocol capability.
- pub fn hooks(self: &Arc<Self>) -> Vec<HooksRegistration> {
- let mut hooks = self.event_registry.hooks();
- hooks.append(&mut vec![
- // This hook provides the EventSource capability to components in the tree
- HooksRegistration::new(
- "EventSourceFactory",
- vec![EventType::CapabilityRouted, EventType::Destroyed, EventType::Resolved],
- Arc::downgrade(self) as Weak<dyn Hook>,
- ),
- ]);
- hooks
- }
-
- /// Creates a debug event source.
- pub async fn create_for_debug(&self) -> Result<EventSource, ModelError> {
- EventSource::new_for_debug(
- self.model.clone(),
- AbsoluteMoniker::root(),
- &self.event_registry,
- )
- .await
- }
-
- /// Creates a `EventSource` for the given `target_moniker`.
- pub async fn create(
- &self,
- target_moniker: AbsoluteMoniker,
- sync_mode: SyncMode,
- ) -> Result<EventSource, ModelError> {
- EventSource::new(self.model.clone(), target_moniker, &self.event_registry, sync_mode).await
- }
-
- /// Returns an EventSource. An EventSource holds an AbsoluteMoniker that
- /// corresponds to the realm in which it will receive events.
- async fn on_scoped_framework_capability_routed_async(
- self: Arc<Self>,
- capability_decl: &FrameworkCapability,
- target_moniker: AbsoluteMoniker,
- _scope_moniker: AbsoluteMoniker,
- capability: Option<Box<dyn CapabilityProvider>>,
- ) -> Result<Option<Box<dyn CapabilityProvider>>, ModelError> {
- match (capability, capability_decl) {
- (None, FrameworkCapability::Protocol(source_path))
- if *source_path == *EVENT_SOURCE_SERVICE_PATH
- || *source_path == *EVENT_SOURCE_SYNC_SERVICE_PATH =>
- {
- let event_source_registry = self.event_source_registry.lock().await;
- if let Some(event_source) = event_source_registry.get(&target_moniker) {
- Ok(Some(Box::new(event_source.clone()) as Box<dyn CapabilityProvider>))
- } else {
- return Err(ModelError::capability_discovery_error(format!(
- "Unable to find EventSource in registry for {}",
- target_moniker
- )));
- }
- }
- (c, _) => return Ok(c),
- }
- }
-
- async fn on_destroyed_async(self: &Arc<Self>, target_moniker: &AbsoluteMoniker) {
- let mut event_source_registry = self.event_source_registry.lock().await;
- event_source_registry.remove(&target_moniker);
- }
-
- async fn on_resolved_async(
- self: &Arc<Self>,
- target_moniker: &AbsoluteMoniker,
- decl: &ComponentDecl,
- ) -> Result<(), ModelError> {
- let sync_mode = if decl.uses_protocol_from_framework(&EVENT_SOURCE_SERVICE_PATH) {
- SyncMode::Async
- } else if decl.uses_protocol_from_framework(&EVENT_SOURCE_SYNC_SERVICE_PATH) {
- SyncMode::Sync
- } else {
- return Ok(());
- };
- let key = target_moniker.clone();
- let mut event_source_registry = self.event_source_registry.lock().await;
- // It is currently assumed that a component instance's declaration
- // is resolved only once. Someday, this may no longer be true if individual
- // components can be updated.
- assert!(!event_source_registry.contains_key(&key));
- // An EventSource is created on resolution in order to ensure that discovery
- // and resolution of children is not missed.
- let event_source = self.create(key.clone(), sync_mode).await?;
- event_source_registry.insert(key, event_source);
- Ok(())
- }
-
- #[cfg(test)]
- async fn has_event_source(&self, abs_moniker: &AbsoluteMoniker) -> bool {
- let event_source_registry = self.event_source_registry.lock().await;
- event_source_registry.contains_key(abs_moniker)
- }
-}
-
-#[async_trait]
-impl Hook for EventSourceFactory {
- async fn on(self: Arc<Self>, event: &Event) -> Result<(), ModelError> {
- match &event.payload {
- EventPayload::CapabilityRouted {
- source:
- CapabilitySource::Framework { capability, scope_moniker: Some(scope_moniker) },
- capability_provider,
- } => {
- let mut capability_provider = capability_provider.lock().await;
- *capability_provider = self
- .on_scoped_framework_capability_routed_async(
- &capability,
- event.target_moniker.clone(),
- scope_moniker.clone(),
- capability_provider.take(),
- )
- .await?;
- }
- EventPayload::Destroyed => {
- self.on_destroyed_async(&event.target_moniker).await;
- }
- EventPayload::Resolved { decl } => {
- self.on_resolved_async(&event.target_moniker, decl).await?;
- }
- _ => {}
- }
- Ok(())
- }
-}
-
-struct RouteEventsResult {
- /// Maps from source name to a set of scope monikers
- mapping: HashMap<CapabilityName, HashSet<AbsoluteMoniker>>,
-}
-
-impl RouteEventsResult {
- fn new() -> Self {
- Self { mapping: HashMap::new() }
- }
-
- fn insert(&mut self, source_name: CapabilityName, scope_moniker: AbsoluteMoniker) {
- self.mapping.entry(source_name).or_insert(HashSet::new()).insert(scope_moniker);
- }
-
- fn len(&self) -> usize {
- self.mapping.len()
- }
-
- fn contains_event(&self, event_name: &CapabilityName) -> bool {
- self.mapping.contains_key(event_name)
- }
-
- fn to_vec(self) -> Vec<RoutedEvent> {
- self.mapping
- .into_iter()
- .map(|(source_name, scope_monikers)| RoutedEvent { source_name, scope_monikers })
- .collect()
- }
-}
-
-/// A system responsible for implementing basic events functionality on a scoped realm.
-#[derive(Clone)]
-pub struct EventSource {
- /// The component model, needed to route events.
- model: Weak<Model>,
-
- /// The moniker identifying the realm that requested this event source
- target_moniker: AbsoluteMoniker,
-
- /// A shared reference to the event registry used to subscribe and dispatche events.
- registry: Weak<EventRegistry>,
-
- /// Used for `BlockingEventSource.StartComponentTree`.
- // TODO(fxb/48245): this shouldn't be done for any EventSource. Only for tests.
- resolve_instance_event_stream: Arc<Mutex<Option<EventStream>>>,
-
- /// Whether or not this is a debug instance.
- debug: bool,
-
- /// Whether or not this EventSource dispatches events asynchronously.
- sync_mode: SyncMode,
-}
-
-#[derive(Debug, Error)]
-pub enum EventsError {
- #[error("Registry not found")]
- RegistryNotFound,
-
- #[error("Events not allowed for subscription {:?}", names)]
- NotAvailable { names: Vec<CapabilityName> },
-
- // TODO(fxb/48720): use dedicated RoutingError type.
- #[error("Routing failed")]
- RoutingFailed(#[source] ModelError),
-}
-
-impl EventSource {
- /// Creates a new `EventSource` that will be used by the component identified with the given
- /// `target_moniker`.
- pub async fn new(
- model: Weak<Model>,
- target_moniker: AbsoluteMoniker,
- registry: &Arc<EventRegistry>,
- sync_mode: SyncMode,
- ) -> Result<Self, ModelError> {
- // TODO(fxb/48245): this shouldn't be done for any EventSource. Only for tests.
- let resolve_instance_event_stream = Arc::new(Mutex::new(if sync_mode == SyncMode::Async {
- None
- } else {
- Some(
- registry
- .subscribe(
- &sync_mode,
- vec![RoutedEvent {
- source_name: EventType::Resolved.into(),
- scope_monikers: hashset!(target_moniker.clone()),
- }],
- )
- .await,
- )
- }));
- Ok(Self {
- registry: Arc::downgrade(®istry),
- model,
- target_moniker,
- resolve_instance_event_stream,
- debug: false,
- sync_mode,
- })
- }
-
- async fn new_for_debug(
- model: Weak<Model>,
- target_moniker: AbsoluteMoniker,
- registry: &Arc<EventRegistry>,
- ) -> Result<Self, ModelError> {
- let mut event_source = Self::new(model, target_moniker, registry, SyncMode::Sync).await?;
- event_source.debug = true;
- Ok(event_source)
- }
-
- /// Drops the `Resolved` event stream, thereby permitting components within the
- /// realm to be started.
- pub async fn start_component_tree(&mut self) {
- let mut resolve_instance_event_stream = self.resolve_instance_event_stream.lock().await;
- *resolve_instance_event_stream = None;
- }
-
- /// Subscribes to events provided in the `events` vector.
- ///
- /// The client might request to subscribe to events that it's not allowed to see. Events
- /// that are allowed should have been defined in its manifest and either offered to it or
- /// requested from the current realm.
- pub async fn subscribe(
- &mut self,
- events: Vec<CapabilityName>,
- ) -> Result<EventStream, EventsError> {
- // Register event capabilities if any. It identifies the sources of these events (might be the
- // containing realm or this realm itself). It consturcts an "allow-list tree" of events and
- // realms.
- let events = if self.debug {
- events
- .into_iter()
- .map(|event| RoutedEvent {
- source_name: event.clone(),
- scope_monikers: hashset!(AbsoluteMoniker::root()),
- })
- .collect()
- } else {
- let route_result =
- self.route_events(&events).await.map_err(|e| EventsError::RoutingFailed(e))?;
- if route_result.len() != events.len() {
- let names = events
- .into_iter()
- .filter(|event| !route_result.contains_event(&event))
- .collect();
- return Err(EventsError::NotAvailable { names });
- }
- route_result.to_vec()
- };
-
- // Create an event stream for the given events
- if let Some(registry) = self.registry.upgrade() {
- return Ok(registry.subscribe(&self.sync_mode, events).await);
- }
- Err(EventsError::RegistryNotFound)
- }
-
- /// Serves a `EventSource` FIDL protocol.
- pub fn serve(self, stream: fsys::BlockingEventSourceRequestStream) {
- fasync::spawn(async move {
- serve_event_source_sync(self, stream).await;
- });
- }
-
- async fn route_events(
- &self,
- events: &Vec<CapabilityName>,
- ) -> Result<RouteEventsResult, ModelError> {
- let model = self.model.upgrade().ok_or(ModelError::ModelNotAvailable)?;
- let realm = model.look_up_realm(&self.target_moniker).await?;
- let decl = {
- let state = realm.lock_state().await;
- state.as_ref().expect("route_events: not registered").decl().clone()
- };
-
- let mut result = RouteEventsResult::new();
- for use_decl in decl.uses {
- match &use_decl {
- UseDecl::Event(event_decl) => {
- if !events.contains(&event_decl.target_name) {
- continue;
- }
- let (source_name, scope_moniker) = self.route_event(event_decl, &realm).await?;
- result.insert(source_name, scope_moniker);
- }
- _ => {}
- }
- }
-
- Ok(result)
- }
-
- /// Routes an event and returns its source name and scope on success.
- async fn route_event(
- &self,
- event_decl: &UseEventDecl,
- realm: &Arc<Realm>,
- ) -> Result<(CapabilityName, AbsoluteMoniker), ModelError> {
- routing::route_use_event_capability(&UseDecl::Event(event_decl.clone()), &realm).await.map(
- |source| match source {
- CapabilitySource::Framework {
- capability: FrameworkCapability::Event(source_name),
- scope_moniker: Some(scope_moniker),
- } => (source_name, scope_moniker),
- _ => unreachable!(),
- },
- )
- }
-}
-
-#[async_trait]
-impl CapabilityProvider for EventSource {
- async fn open(
- self: Box<Self>,
- _flags: u32,
- _open_mode: u32,
- _relative_path: PathBuf,
- server_end: zx::Channel,
- ) -> Result<(), ModelError> {
- let stream = ServerEnd::<fsys::BlockingEventSourceMarker>::new(server_end)
- .into_stream()
- .expect("could not convert channel into stream");
- self.serve(stream);
- Ok(())
- }
-}
-
-#[cfg(test)]
-mod tests {
- use {
- super::*,
- crate::model::{
- hooks::Hooks, model::ModelParams, resolver::ResolverRegistry,
- testing::test_helpers::ComponentDeclBuilder,
- },
- cm_rust::{UseProtocolDecl, UseSource},
- };
-
- async fn dispatch_resolved_event(
- hooks: &Hooks,
- target_moniker: &AbsoluteMoniker,
- ) -> Result<(), ModelError> {
- let decl = ComponentDeclBuilder::new()
- .use_(UseDecl::Protocol(UseProtocolDecl {
- source: UseSource::Framework,
- source_path: (*EVENT_SOURCE_SYNC_SERVICE_PATH).clone(),
- target_path: (*EVENT_SOURCE_SYNC_SERVICE_PATH).clone(),
- }))
- .build();
- let event =
- Event::new(target_moniker.clone(), EventPayload::Resolved { decl: decl.clone() });
- hooks.dispatch(&event).await
- }
-
- async fn dispatch_destroyed_event(
- hooks: &Hooks,
- target_moniker: &AbsoluteMoniker,
- ) -> Result<(), ModelError> {
- let event = Event::new(target_moniker.clone(), EventPayload::Destroyed);
- hooks.dispatch(&event).await
- }
-
- #[fuchsia_async::run_singlethreaded(test)]
- async fn drop_event_source_when_component_destroyed() {
- let model = {
- let registry = ResolverRegistry::new();
- Arc::new(Model::new(ModelParams {
- root_component_url: "test:///root".to_string(),
- root_resolver_registry: registry,
- }))
- };
- let event_source_factory = Arc::new(EventSourceFactory::new(Arc::downgrade(&model)));
-
- let hooks = Hooks::new(None);
- hooks.install(event_source_factory.hooks()).await;
-
- let root = AbsoluteMoniker::root();
-
- // Verify that there is no EventSource for the root until we dispatch the Resolved event.
- assert!(!event_source_factory.has_event_source(&root).await);
- dispatch_resolved_event(&hooks, &root).await.unwrap();
- assert!(event_source_factory.has_event_source(&root).await);
- // Verify that destroying the component destroys the EventSource.
- dispatch_destroyed_event(&hooks, &root).await.unwrap();
- assert!(!event_source_factory.has_event_source(&root).await);
- }
-}
diff --git a/src/sys/component_manager/src/model/events/dispatcher.rs b/src/sys/component_manager/src/model/events/dispatcher.rs
new file mode 100644
index 0000000..1617f41
--- /dev/null
+++ b/src/sys/component_manager/src/model/events/dispatcher.rs
@@ -0,0 +1,99 @@
+// Copyright 2020 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+ crate::model::{
+ events::event::{Event, SyncMode},
+ hooks::Event as ComponentEvent,
+ moniker::AbsoluteMoniker,
+ },
+ anyhow::Error,
+ fuchsia_trace as trace,
+ futures::{
+ channel::{mpsc, oneshot},
+ lock::Mutex,
+ sink::SinkExt,
+ },
+ std::collections::HashSet,
+};
+
+/// EventDispatcher and EventStream are two ends of a channel.
+///
+/// EventDispatcher represents the sending end of the channel.
+///
+/// An EventDispatcher receives events of a particular event type,
+/// and dispatches though events out to the EventStream if they fall within
+/// one of the scopes associated with the dispatcher.
+///
+/// EventDispatchers are owned by EventStreams. If an EventStream is dropped,
+/// all corresponding EventDispatchers are dropped.
+///
+/// An EventStream is owned by the client - usually a test harness or a
+/// EventSource. It receives a Event from an EventDispatcher and propagates it
+/// to the client.
+pub struct EventDispatcher {
+ /// Whether or not this EventDispatcher dispatches events asynchronously.
+ sync_mode: SyncMode,
+ /// Specifies the realms that this EventDispatcher can dispatch events from.
+ scope_monikers: HashSet<AbsoluteMoniker>,
+ /// An `mpsc::Sender` used to dispatch an event. Note that this
+ /// `mpsc::Sender` is wrapped in an Mutex<..> to allow it to be passed along
+ /// to other tasks for dispatch.
+ tx: Mutex<mpsc::Sender<Event>>,
+}
+
+impl EventDispatcher {
+ pub fn new(
+ sync_mode: SyncMode,
+ scope_monikers: HashSet<AbsoluteMoniker>,
+ tx: mpsc::Sender<Event>,
+ ) -> Self {
+ // TODO(fxb/48360): flatten scope_monikers. There might be monikers that are
+ // contained within another moniker in the list.
+ Self { sync_mode, scope_monikers, tx: Mutex::new(tx) }
+ }
+
+ /// Sends the event to an event stream, if fired in the scope of `scope_moniker`. Returns
+ /// a responder which can be blocked on.
+ pub async fn dispatch(
+ &self,
+ event: ComponentEvent,
+ ) -> Result<Option<oneshot::Receiver<()>>, Error> {
+ // TODO(fxb/48360): once flattening of monikers is done, we would expect to have a single
+ // moniker here. For now taking the first one and ignoring the rest.
+ // Ensure that the event is coming from a realm within the scope of this dispatcher.
+ let maybe_scope_moniker = self
+ .scope_monikers
+ .iter()
+ .filter(|moniker| moniker.contains_in_realm(&event.target_moniker))
+ .next();
+ if maybe_scope_moniker.is_none() {
+ return Ok(None);
+ }
+
+ let scope_moniker = maybe_scope_moniker.unwrap().clone();
+
+ trace::duration!("component_manager", "events:send");
+ let event_type = format!("{:?}", event.payload.type_());
+ let target_moniker = event.target_moniker.to_string();
+ trace::flow_begin!(
+ "component_manager",
+ "event",
+ event.id,
+ "event_type" => event_type.as_str(),
+ "target_moniker" => target_moniker.as_str()
+ );
+ let (maybe_responder_tx, maybe_responder_rx) = if self.sync_mode == SyncMode::Async {
+ (None, None)
+ } else {
+ let (responder_tx, responder_rx) = oneshot::channel();
+ (Some(responder_tx), Some(responder_rx))
+ };
+ {
+ let mut tx = self.tx.lock().await;
+ tx.send(Event { event, scope_moniker, responder: maybe_responder_tx }).await?;
+ }
+ Ok(maybe_responder_rx)
+ }
+}
diff --git a/src/sys/component_manager/src/model/events/event.rs b/src/sys/component_manager/src/model/events/event.rs
new file mode 100644
index 0000000..8380b72
--- /dev/null
+++ b/src/sys/component_manager/src/model/events/event.rs
@@ -0,0 +1,51 @@
+// Copyright 2020 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+ crate::model::{hooks::Event as ComponentEvent, moniker::AbsoluteMoniker},
+ fuchsia_trace as trace,
+ futures::channel::oneshot,
+};
+
+#[derive(PartialEq, Clone)]
+pub enum SyncMode {
+ Sync,
+ Async,
+}
+
+/// Created for a particular component event.
+/// Contains the Event that occurred along with a means to resume/unblock the component manager.
+#[must_use = "invoke resume() otherwise component manager will be halted indefinitely!"]
+pub struct Event {
+ /// The event itself.
+ pub event: ComponentEvent,
+
+ /// The scope where this event comes from. This can be seen as a superset of the
+ /// `event.target_moniker` itself given that the events might have been offered from an
+ /// ancestor realm.
+ pub scope_moniker: AbsoluteMoniker,
+
+ /// This Sender is used to unblock the component manager if available.
+ /// If a Sender is unspecified then that indicates that this event is asynchronous and
+ /// non-blocking.
+ pub responder: Option<oneshot::Sender<()>>,
+}
+
+impl Event {
+ pub fn sync_mode(&self) -> SyncMode {
+ if self.responder.is_none() {
+ SyncMode::Async
+ } else {
+ SyncMode::Sync
+ }
+ }
+
+ pub fn resume(self) {
+ trace::duration!("component_manager", "events:resume");
+ trace::flow_step!("component_manager", "event", self.event.id);
+ if let Some(responder) = self.responder {
+ responder.send(()).unwrap()
+ }
+ }
+}
diff --git a/src/sys/component_manager/src/model/events/mod.rs b/src/sys/component_manager/src/model/events/mod.rs
index a9af5f5..d8f52c0 100644
--- a/src/sys/component_manager/src/model/events/mod.rs
+++ b/src/sys/component_manager/src/model/events/mod.rs
@@ -2,6 +2,10 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-pub mod core;
+pub mod dispatcher;
+pub mod event;
pub mod registry;
pub(crate) mod serve;
+pub mod source;
+pub mod source_factory;
+pub mod stream;
diff --git a/src/sys/component_manager/src/model/events/registry.rs b/src/sys/component_manager/src/model/events/registry.rs
index b3d834d..6574258 100644
--- a/src/sys/component_manager/src/model/events/registry.rs
+++ b/src/sys/component_manager/src/model/events/registry.rs
@@ -5,185 +5,20 @@
use {
crate::model::{
error::ModelError,
+ events::{dispatcher::EventDispatcher, event::SyncMode, stream::EventStream},
hooks::{Event as ComponentEvent, EventType, Hook, HooksRegistration},
moniker::AbsoluteMoniker,
},
- anyhow::Error,
async_trait::async_trait,
cm_rust::CapabilityName,
fuchsia_trace as trace,
- futures::{channel::*, lock::Mutex, sink::SinkExt, StreamExt},
+ futures::lock::Mutex,
std::{
collections::{HashMap, HashSet},
sync::{Arc, Weak},
},
};
-#[derive(PartialEq, Clone)]
-pub enum SyncMode {
- Sync,
- Async,
-}
-
-/// Created for a particular component event.
-/// Contains the Event that occurred along with a means to resume/unblock the component manager.
-#[must_use = "invoke resume() otherwise component manager will be halted indefinitely!"]
-pub struct Event {
- /// The event itself.
- pub event: ComponentEvent,
-
- /// The scope where this event comes from. This can be seen as a superset of the
- /// `event.target_moniker` itself given that the events might have been offered from an
- /// ancestor realm.
- pub scope_moniker: AbsoluteMoniker,
-
- /// This Sender is used to unblock the component manager if available.
- /// If a Sender is unspecified then that indicates that this event is asynchronous and
- /// non-blocking.
- responder: Option<oneshot::Sender<()>>,
-}
-
-impl Event {
- pub fn sync_mode(&self) -> SyncMode {
- if self.responder.is_none() {
- SyncMode::Async
- } else {
- SyncMode::Sync
- }
- }
-
- pub fn resume(self) {
- trace::duration!("component_manager", "events:resume");
- trace::flow_step!("component_manager", "event", self.event.id);
- if let Some(responder) = self.responder {
- responder.send(()).unwrap()
- }
- }
-}
-
-/// EventDispatcher and EventStream are two ends of a channel.
-///
-/// An EventDispatcher is owned by the EventRegistry. It sends an
-/// Event to the EventStream.
-///
-/// An EventStream is owned by the client - usually a test harness or a
-/// EventSource. It receives a Event from an EventDispatcher and propagates it
-/// to the client.
-#[derive(Clone)]
-pub struct EventDispatcher {
- /// Whether or not this EventDispatcher dispatches events asynchronously.
- sync_mode: SyncMode,
- /// Specifies the realms that this EventDispatcher can dispatch events from.
- scope_monikers: HashSet<AbsoluteMoniker>,
- /// An `mpsc::Sender` used to dispatch an event. Note that this
- /// `mpsc::Sender` is wrapped in an Arc<Mutex<..>> to allow it to be cloneable
- /// and passed along to other tasks for dispatch.
- tx: Arc<Mutex<mpsc::Sender<Event>>>,
-}
-
-impl EventDispatcher {
- /// Creates a new event dispatcher. This dispatcher will only dispatch events that arrive from
- /// the given scopes and dispatch them under the given name for that scope.
- fn new(
- sync_mode: SyncMode,
- scope_monikers: HashSet<AbsoluteMoniker>,
- tx: mpsc::Sender<Event>,
- ) -> Self {
- // TODO(fxb/48360): flatten scope_monikers. There might be monikers that are
- // contained within another moniker in the list.
- Self { sync_mode, scope_monikers, tx: Arc::new(Mutex::new(tx)) }
- }
-
- /// Sends the event to an event stream, if fired in the scope of `scope_moniker`. Returns
- /// a responder which can be blocked on.
- async fn send(&self, event: ComponentEvent) -> Result<Option<oneshot::Receiver<()>>, Error> {
- // TODO(fxb/48360): once flattening of monikers is done, we would expect to have a single
- // moniker here. For now taking the first one and ignoring the rest.
- // Ensure that the event is coming from a realm within the scope of this dispatcher.
- let maybe_scope_moniker = self
- .scope_monikers
- .iter()
- .filter(|moniker| moniker.contains_in_realm(&event.target_moniker))
- .next();
- if maybe_scope_moniker.is_none() {
- return Ok(None);
- }
-
- let scope_moniker = maybe_scope_moniker.unwrap().clone();
-
- trace::duration!("component_manager", "events:send");
- let event_type = format!("{:?}", event.payload.type_());
- let target_moniker = event.target_moniker.to_string();
- trace::flow_begin!(
- "component_manager",
- "event",
- event.id,
- "event_type" => event_type.as_str(),
- "target_moniker" => target_moniker.as_str()
- );
- let (maybe_responder_tx, maybe_responder_rx) = if self.sync_mode == SyncMode::Async {
- (None, None)
- } else {
- let (responder_tx, responder_rx) = oneshot::channel();
- (Some(responder_tx), Some(responder_rx))
- };
- {
- let mut tx = self.tx.lock().await;
- tx.send(Event { event, scope_moniker, responder: maybe_responder_tx }).await?;
- }
- Ok(maybe_responder_rx)
- }
-}
-
-pub struct EventStream {
- rx: mpsc::Receiver<Event>,
- tx: mpsc::Sender<Event>,
- dispatchers: Vec<Arc<EventDispatcher>>,
-}
-
-impl EventStream {
- fn new() -> Self {
- let (tx, rx) = mpsc::channel(2);
- Self { rx, tx, dispatchers: vec![] }
- }
-
- fn create_dispatcher(
- &mut self,
- sync_mode: SyncMode,
- scope_monikers: HashSet<AbsoluteMoniker>,
- ) -> Weak<EventDispatcher> {
- let dispatcher =
- Arc::new(EventDispatcher::new(sync_mode.clone(), scope_monikers, self.tx.clone()));
- self.dispatchers.push(dispatcher.clone());
- Arc::downgrade(&dispatcher)
- }
-
- /// Receives the next event from the sender.
- pub async fn next(&mut self) -> Option<Event> {
- trace::duration!("component_manager", "events:next");
- self.rx.next().await
- }
-
- /// Waits for an event with a particular EventType against a component with a
- /// particular moniker. Ignores all other events.
- pub async fn wait_until(
- &mut self,
- expected_event_type: EventType,
- expected_moniker: AbsoluteMoniker,
- ) -> Option<Event> {
- while let Some(event) = self.next().await {
- let actual_event_type = event.event.payload.type_();
- if expected_moniker == event.event.target_moniker
- && expected_event_type == actual_event_type
- {
- return Some(event);
- }
- event.resume();
- }
- None
- }
-}
-
pub struct RoutedEvent {
pub source_name: CapabilityName,
pub scope_monikers: HashSet<AbsoluteMoniker>,
@@ -232,8 +67,8 @@
// Copy the senders so we don't hold onto the sender map lock
// If we didn't do this, it is possible to deadlock while holding onto this lock.
// For example,
- // Task A : call send(event1) -> lock on sender map -> send -> wait for responders
- // Task B : call send(event2) -> lock on sender map
+ // Task A : call dispatch(event1) -> lock on sender map -> send -> wait for responders
+ // Task B : call dispatch(event2) -> lock on sender map
// If task B was required to respond to event1, then this is a deadlock.
// Neither task can make progress.
let dispatchers = {
@@ -257,7 +92,7 @@
let mut responder_channels = vec![];
for dispatcher in dispatchers {
- let result = dispatcher.send(event.clone()).await;
+ let result = dispatcher.dispatch(event.clone()).await;
match result {
Ok(Some(responder_channel)) => {
// A future can be canceled if the EventStream was dropped after
diff --git a/src/sys/component_manager/src/model/events/serve.rs b/src/sys/component_manager/src/model/events/serve.rs
index e8e8640..5e50c14 100644
--- a/src/sys/component_manager/src/model/events/serve.rs
+++ b/src/sys/component_manager/src/model/events/serve.rs
@@ -7,8 +7,11 @@
capability::{CapabilityProvider, CapabilitySource, FrameworkCapability},
model::{
error::ModelError,
- events::core::EventSource,
- events::registry::{Event, EventStream, SyncMode},
+ events::{
+ event::{Event, SyncMode},
+ source::EventSource,
+ stream::EventStream,
+ },
hooks::EventPayload,
moniker::{AbsoluteMoniker, RelativeMoniker},
},
diff --git a/src/sys/component_manager/src/model/events/source.rs b/src/sys/component_manager/src/model/events/source.rs
new file mode 100644
index 0000000..1bdbe89
--- /dev/null
+++ b/src/sys/component_manager/src/model/events/source.rs
@@ -0,0 +1,263 @@
+// Copyright 2020 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+ crate::{
+ capability::{CapabilityProvider, CapabilitySource, FrameworkCapability},
+ model::{
+ error::ModelError,
+ events::{
+ event::SyncMode,
+ registry::{EventRegistry, RoutedEvent},
+ serve::serve_event_source_sync,
+ stream::EventStream,
+ },
+ hooks::EventType,
+ model::Model,
+ moniker::AbsoluteMoniker,
+ realm::Realm,
+ routing,
+ },
+ },
+ async_trait::async_trait,
+ cm_rust::{CapabilityName, UseDecl, UseEventDecl},
+ fidl::endpoints::ServerEnd,
+ fidl_fuchsia_sys2 as fsys, fuchsia_async as fasync, fuchsia_zircon as zx,
+ futures::lock::Mutex,
+ maplit::hashset,
+ std::{
+ collections::{HashMap, HashSet},
+ path::PathBuf,
+ sync::{Arc, Weak},
+ },
+ thiserror::Error,
+};
+
+/// A system responsible for implementing basic events functionality on a scoped realm.
+#[derive(Clone)]
+pub struct EventSource {
+ /// The component model, needed to route events.
+ model: Weak<Model>,
+
+ /// The moniker identifying the realm that requested this event source
+ target_moniker: AbsoluteMoniker,
+
+ /// A shared reference to the event registry used to subscribe and dispatche events.
+ registry: Weak<EventRegistry>,
+
+ /// Used for `BlockingEventSource.StartComponentTree`.
+ // TODO(fxb/48245): this shouldn't be done for any EventSource. Only for tests.
+ resolve_instance_event_stream: Arc<Mutex<Option<EventStream>>>,
+
+ /// Whether or not this is a debug instance.
+ debug: bool,
+
+ /// Whether or not this EventSource dispatches events asynchronously.
+ sync_mode: SyncMode,
+}
+
+#[derive(Debug, Error)]
+pub enum EventsError {
+ #[error("Registry not found")]
+ RegistryNotFound,
+
+ #[error("Events not allowed for subscription {:?}", names)]
+ NotAvailable { names: Vec<CapabilityName> },
+
+ // TODO(fxb/48720): use dedicated RoutingError type.
+ #[error("Routing failed")]
+ RoutingFailed(#[source] ModelError),
+}
+
+struct RouteEventsResult {
+ /// Maps from source name to a set of scope monikers
+ mapping: HashMap<CapabilityName, HashSet<AbsoluteMoniker>>,
+}
+
+impl RouteEventsResult {
+ fn new() -> Self {
+ Self { mapping: HashMap::new() }
+ }
+
+ fn insert(&mut self, source_name: CapabilityName, scope_moniker: AbsoluteMoniker) {
+ self.mapping.entry(source_name).or_insert(HashSet::new()).insert(scope_moniker);
+ }
+
+ fn len(&self) -> usize {
+ self.mapping.len()
+ }
+
+ fn contains_event(&self, event_name: &CapabilityName) -> bool {
+ self.mapping.contains_key(event_name)
+ }
+
+ fn to_vec(self) -> Vec<RoutedEvent> {
+ self.mapping
+ .into_iter()
+ .map(|(source_name, scope_monikers)| RoutedEvent { source_name, scope_monikers })
+ .collect()
+ }
+}
+
+impl EventSource {
+ /// Creates a new `EventSource` that will be used by the component identified with the given
+ /// `target_moniker`.
+ pub async fn new(
+ model: Weak<Model>,
+ target_moniker: AbsoluteMoniker,
+ registry: &Arc<EventRegistry>,
+ sync_mode: SyncMode,
+ ) -> Result<Self, ModelError> {
+ // TODO(fxb/48245): this shouldn't be done for any EventSource. Only for tests.
+ let resolve_instance_event_stream = Arc::new(Mutex::new(if sync_mode == SyncMode::Async {
+ None
+ } else {
+ Some(
+ registry
+ .subscribe(
+ &sync_mode,
+ vec![RoutedEvent {
+ source_name: EventType::Resolved.into(),
+ scope_monikers: hashset!(target_moniker.clone()),
+ }],
+ )
+ .await,
+ )
+ }));
+ Ok(Self {
+ registry: Arc::downgrade(®istry),
+ model,
+ target_moniker,
+ resolve_instance_event_stream,
+ debug: false,
+ sync_mode,
+ })
+ }
+
+ pub async fn new_for_debug(
+ model: Weak<Model>,
+ target_moniker: AbsoluteMoniker,
+ registry: &Arc<EventRegistry>,
+ ) -> Result<Self, ModelError> {
+ let mut event_source = Self::new(model, target_moniker, registry, SyncMode::Sync).await?;
+ event_source.debug = true;
+ Ok(event_source)
+ }
+
+ /// Drops the `Resolved` event stream, thereby permitting components within the
+ /// realm to be started.
+ pub async fn start_component_tree(&mut self) {
+ let mut resolve_instance_event_stream = self.resolve_instance_event_stream.lock().await;
+ *resolve_instance_event_stream = None;
+ }
+
+ /// Subscribes to events provided in the `events` vector.
+ ///
+ /// The client might request to subscribe to events that it's not allowed to see. Events
+ /// that are allowed should have been defined in its manifest and either offered to it or
+ /// requested from the current realm.
+ pub async fn subscribe(
+ &mut self,
+ events: Vec<CapabilityName>,
+ ) -> Result<EventStream, EventsError> {
+ // Register event capabilities if any. It identifies the sources of these events (might be the
+ // containing realm or this realm itself). It consturcts an "allow-list tree" of events and
+ // realms.
+ let events = if self.debug {
+ events
+ .into_iter()
+ .map(|event| RoutedEvent {
+ source_name: event.clone(),
+ scope_monikers: hashset!(AbsoluteMoniker::root()),
+ })
+ .collect()
+ } else {
+ let route_result =
+ self.route_events(&events).await.map_err(|e| EventsError::RoutingFailed(e))?;
+ if route_result.len() != events.len() {
+ let names = events
+ .into_iter()
+ .filter(|event| !route_result.contains_event(&event))
+ .collect();
+ return Err(EventsError::NotAvailable { names });
+ }
+ route_result.to_vec()
+ };
+
+ // Create an event stream for the given events
+ if let Some(registry) = self.registry.upgrade() {
+ return Ok(registry.subscribe(&self.sync_mode, events).await);
+ }
+ Err(EventsError::RegistryNotFound)
+ }
+
+ /// Serves a `EventSource` FIDL protocol.
+ pub fn serve(self, stream: fsys::BlockingEventSourceRequestStream) {
+ fasync::spawn(async move {
+ serve_event_source_sync(self, stream).await;
+ });
+ }
+
+ async fn route_events(
+ &self,
+ events: &Vec<CapabilityName>,
+ ) -> Result<RouteEventsResult, ModelError> {
+ let model = self.model.upgrade().ok_or(ModelError::ModelNotAvailable)?;
+ let realm = model.look_up_realm(&self.target_moniker).await?;
+ let decl = {
+ let state = realm.lock_state().await;
+ state.as_ref().expect("route_events: not registered").decl().clone()
+ };
+
+ let mut result = RouteEventsResult::new();
+ for use_decl in decl.uses {
+ match &use_decl {
+ UseDecl::Event(event_decl) => {
+ if !events.contains(&event_decl.target_name) {
+ continue;
+ }
+ let (source_name, scope_moniker) = self.route_event(event_decl, &realm).await?;
+ result.insert(source_name, scope_moniker);
+ }
+ _ => {}
+ }
+ }
+
+ Ok(result)
+ }
+
+ /// Routes an event and returns its source name and scope on success.
+ async fn route_event(
+ &self,
+ event_decl: &UseEventDecl,
+ realm: &Arc<Realm>,
+ ) -> Result<(CapabilityName, AbsoluteMoniker), ModelError> {
+ routing::route_use_event_capability(&UseDecl::Event(event_decl.clone()), &realm).await.map(
+ |source| match source {
+ CapabilitySource::Framework {
+ capability: FrameworkCapability::Event(source_name),
+ scope_moniker: Some(scope_moniker),
+ } => (source_name, scope_moniker),
+ _ => unreachable!(),
+ },
+ )
+ }
+}
+
+#[async_trait]
+impl CapabilityProvider for EventSource {
+ async fn open(
+ self: Box<Self>,
+ _flags: u32,
+ _open_mode: u32,
+ _relative_path: PathBuf,
+ server_end: zx::Channel,
+ ) -> Result<(), ModelError> {
+ let stream = ServerEnd::<fsys::BlockingEventSourceMarker>::new(server_end)
+ .into_stream()
+ .expect("could not convert channel into stream");
+ self.serve(stream);
+ Ok(())
+ }
+}
diff --git a/src/sys/component_manager/src/model/events/source_factory.rs b/src/sys/component_manager/src/model/events/source_factory.rs
new file mode 100644
index 0000000..0b26f76
--- /dev/null
+++ b/src/sys/component_manager/src/model/events/source_factory.rs
@@ -0,0 +1,248 @@
+// Copyright 2020 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+ crate::{
+ capability::{CapabilityProvider, CapabilitySource, FrameworkCapability},
+ model::{
+ error::ModelError,
+ events::{event::SyncMode, registry::EventRegistry, source::EventSource},
+ hooks::{Event, EventPayload, EventType, Hook, HooksRegistration},
+ model::Model,
+ moniker::AbsoluteMoniker,
+ },
+ },
+ async_trait::async_trait,
+ cm_rust::{CapabilityPath, ComponentDecl},
+ futures::lock::Mutex,
+ lazy_static::lazy_static,
+ std::{
+ collections::HashMap,
+ convert::TryInto,
+ sync::{Arc, Weak},
+ },
+};
+
+lazy_static! {
+ pub static ref EVENT_SOURCE_SERVICE_PATH: CapabilityPath =
+ "/svc/fuchsia.sys2.EventSource".try_into().unwrap();
+ pub static ref EVENT_SOURCE_SYNC_SERVICE_PATH: CapabilityPath =
+ "/svc/fuchsia.sys2.BlockingEventSource".try_into().unwrap();
+}
+
+/// Allows to create `EventSource`s and tracks all the created ones.
+pub struct EventSourceFactory {
+ /// Tracks the event source used by each component ideantified with the given `moniker`.
+ event_source_registry: Mutex<HashMap<AbsoluteMoniker, EventSource>>,
+
+ /// The event registry. It subscribes to all events happening in the system and
+ /// routes them to subscribers.
+ // TODO(fxb/48512): instead of using a global registry integrate more with the hooks model.
+ event_registry: Arc<EventRegistry>,
+
+ /// The component model, needed to route events.
+ model: Weak<Model>,
+}
+
+impl EventSourceFactory {
+ /// Creates a new event source factory.
+ pub fn new(model: Weak<Model>) -> Self {
+ Self {
+ event_source_registry: Mutex::new(HashMap::new()),
+ event_registry: Arc::new(EventRegistry::new()),
+ model,
+ }
+ }
+
+ /// Creates the subscription to the required events.
+ /// `CapabilityReady` used to track events and associate them with the component that needs them
+ /// as well as the scoped that will be allowed. Also the EventSource protocol capability.
+ pub fn hooks(self: &Arc<Self>) -> Vec<HooksRegistration> {
+ let mut hooks = self.event_registry.hooks();
+ hooks.append(&mut vec![
+ // This hook provides the EventSource capability to components in the tree
+ HooksRegistration::new(
+ "EventSourceFactory",
+ vec![EventType::CapabilityRouted, EventType::Destroyed, EventType::Resolved],
+ Arc::downgrade(self) as Weak<dyn Hook>,
+ ),
+ ]);
+ hooks
+ }
+
+ /// Creates a debug event source.
+ pub async fn create_for_debug(&self) -> Result<EventSource, ModelError> {
+ EventSource::new_for_debug(
+ self.model.clone(),
+ AbsoluteMoniker::root(),
+ &self.event_registry,
+ )
+ .await
+ }
+
+ /// Creates a `EventSource` for the given `target_moniker`.
+ pub async fn create(
+ &self,
+ target_moniker: AbsoluteMoniker,
+ sync_mode: SyncMode,
+ ) -> Result<EventSource, ModelError> {
+ EventSource::new(self.model.clone(), target_moniker, &self.event_registry, sync_mode).await
+ }
+
+ /// Returns an EventSource. An EventSource holds an AbsoluteMoniker that
+ /// corresponds to the realm in which it will receive events.
+ async fn on_scoped_framework_capability_routed_async(
+ self: Arc<Self>,
+ capability_decl: &FrameworkCapability,
+ target_moniker: AbsoluteMoniker,
+ _scope_moniker: AbsoluteMoniker,
+ capability: Option<Box<dyn CapabilityProvider>>,
+ ) -> Result<Option<Box<dyn CapabilityProvider>>, ModelError> {
+ match (capability, capability_decl) {
+ (None, FrameworkCapability::Protocol(source_path))
+ if *source_path == *EVENT_SOURCE_SERVICE_PATH
+ || *source_path == *EVENT_SOURCE_SYNC_SERVICE_PATH =>
+ {
+ let event_source_registry = self.event_source_registry.lock().await;
+ if let Some(event_source) = event_source_registry.get(&target_moniker) {
+ Ok(Some(Box::new(event_source.clone()) as Box<dyn CapabilityProvider>))
+ } else {
+ return Err(ModelError::capability_discovery_error(format!(
+ "Unable to find EventSource in registry for {}",
+ target_moniker
+ )));
+ }
+ }
+ (c, _) => return Ok(c),
+ }
+ }
+
+ async fn on_destroyed_async(self: &Arc<Self>, target_moniker: &AbsoluteMoniker) {
+ let mut event_source_registry = self.event_source_registry.lock().await;
+ event_source_registry.remove(&target_moniker);
+ }
+
+ async fn on_resolved_async(
+ self: &Arc<Self>,
+ target_moniker: &AbsoluteMoniker,
+ decl: &ComponentDecl,
+ ) -> Result<(), ModelError> {
+ let sync_mode = if decl.uses_protocol_from_framework(&EVENT_SOURCE_SERVICE_PATH) {
+ SyncMode::Async
+ } else if decl.uses_protocol_from_framework(&EVENT_SOURCE_SYNC_SERVICE_PATH) {
+ SyncMode::Sync
+ } else {
+ return Ok(());
+ };
+ let key = target_moniker.clone();
+ let mut event_source_registry = self.event_source_registry.lock().await;
+ // It is currently assumed that a component instance's declaration
+ // is resolved only once. Someday, this may no longer be true if individual
+ // components can be updated.
+ assert!(!event_source_registry.contains_key(&key));
+ // An EventSource is created on resolution in order to ensure that discovery
+ // and resolution of children is not missed.
+ let event_source = self.create(key.clone(), sync_mode).await?;
+ event_source_registry.insert(key, event_source);
+ Ok(())
+ }
+
+ #[cfg(test)]
+ async fn has_event_source(&self, abs_moniker: &AbsoluteMoniker) -> bool {
+ let event_source_registry = self.event_source_registry.lock().await;
+ event_source_registry.contains_key(abs_moniker)
+ }
+}
+
+#[async_trait]
+impl Hook for EventSourceFactory {
+ async fn on(self: Arc<Self>, event: &Event) -> Result<(), ModelError> {
+ match &event.payload {
+ EventPayload::CapabilityRouted {
+ source:
+ CapabilitySource::Framework { capability, scope_moniker: Some(scope_moniker) },
+ capability_provider,
+ } => {
+ let mut capability_provider = capability_provider.lock().await;
+ *capability_provider = self
+ .on_scoped_framework_capability_routed_async(
+ &capability,
+ event.target_moniker.clone(),
+ scope_moniker.clone(),
+ capability_provider.take(),
+ )
+ .await?;
+ }
+ EventPayload::Destroyed => {
+ self.on_destroyed_async(&event.target_moniker).await;
+ }
+ EventPayload::Resolved { decl } => {
+ self.on_resolved_async(&event.target_moniker, decl).await?;
+ }
+ _ => {}
+ }
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use {
+ super::*,
+ crate::model::{
+ hooks::Hooks, model::ModelParams, resolver::ResolverRegistry,
+ testing::test_helpers::ComponentDeclBuilder,
+ },
+ cm_rust::{UseDecl, UseProtocolDecl, UseSource},
+ };
+
+ async fn dispatch_resolved_event(
+ hooks: &Hooks,
+ target_moniker: &AbsoluteMoniker,
+ ) -> Result<(), ModelError> {
+ let decl = ComponentDeclBuilder::new()
+ .use_(UseDecl::Protocol(UseProtocolDecl {
+ source: UseSource::Framework,
+ source_path: (*EVENT_SOURCE_SYNC_SERVICE_PATH).clone(),
+ target_path: (*EVENT_SOURCE_SYNC_SERVICE_PATH).clone(),
+ }))
+ .build();
+ let event =
+ Event::new(target_moniker.clone(), EventPayload::Resolved { decl: decl.clone() });
+ hooks.dispatch(&event).await
+ }
+
+ async fn dispatch_destroyed_event(
+ hooks: &Hooks,
+ target_moniker: &AbsoluteMoniker,
+ ) -> Result<(), ModelError> {
+ let event = Event::new(target_moniker.clone(), EventPayload::Destroyed);
+ hooks.dispatch(&event).await
+ }
+
+ #[fuchsia_async::run_singlethreaded(test)]
+ async fn drop_event_source_when_component_destroyed() {
+ let model = {
+ let registry = ResolverRegistry::new();
+ Arc::new(Model::new(ModelParams {
+ root_component_url: "test:///root".to_string(),
+ root_resolver_registry: registry,
+ }))
+ };
+ let event_source_factory = Arc::new(EventSourceFactory::new(Arc::downgrade(&model)));
+
+ let hooks = Hooks::new(None);
+ hooks.install(event_source_factory.hooks()).await;
+
+ let root = AbsoluteMoniker::root();
+
+ // Verify that there is no EventSource for the root until we dispatch the Resolved event.
+ assert!(!event_source_factory.has_event_source(&root).await);
+ dispatch_resolved_event(&hooks, &root).await.unwrap();
+ assert!(event_source_factory.has_event_source(&root).await);
+ // Verify that destroying the component destroys the EventSource.
+ dispatch_destroyed_event(&hooks, &root).await.unwrap();
+ assert!(!event_source_factory.has_event_source(&root).await);
+ }
+}
diff --git a/src/sys/component_manager/src/model/events/stream.rs b/src/sys/component_manager/src/model/events/stream.rs
new file mode 100644
index 0000000..7ea6ef5
--- /dev/null
+++ b/src/sys/component_manager/src/model/events/stream.rs
@@ -0,0 +1,74 @@
+// Copyright 2020 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+ crate::model::{
+ events::{
+ dispatcher::EventDispatcher,
+ event::{Event, SyncMode},
+ },
+ hooks::EventType,
+ moniker::AbsoluteMoniker,
+ },
+ fuchsia_trace as trace,
+ futures::{channel::mpsc, StreamExt},
+ std::{
+ collections::HashSet,
+ sync::{Arc, Weak},
+ },
+};
+
+pub struct EventStream {
+ /// The receiving end of a channel of Events.
+ rx: mpsc::Receiver<Event>,
+ /// The sending end of a channel of Events.
+ tx: mpsc::Sender<Event>,
+ /// A vector of EventDispatchers to this EventStream.
+ /// EventStream assumes ownership of the dispatchers. They are
+ /// destroyed when this EventStream is destroyed.
+ dispatchers: Vec<Arc<EventDispatcher>>,
+}
+
+impl EventStream {
+ pub fn new() -> Self {
+ let (tx, rx) = mpsc::channel(2);
+ Self { rx, tx, dispatchers: vec![] }
+ }
+
+ pub fn create_dispatcher(
+ &mut self,
+ sync_mode: SyncMode,
+ scope_monikers: HashSet<AbsoluteMoniker>,
+ ) -> Weak<EventDispatcher> {
+ let dispatcher =
+ Arc::new(EventDispatcher::new(sync_mode.clone(), scope_monikers, self.tx.clone()));
+ self.dispatchers.push(dispatcher.clone());
+ Arc::downgrade(&dispatcher)
+ }
+
+ /// Receives the next event from the sender.
+ pub async fn next(&mut self) -> Option<Event> {
+ trace::duration!("component_manager", "events:next");
+ self.rx.next().await
+ }
+
+ /// Waits for an event with a particular EventType against a component with a
+ /// particular moniker. Ignores all other events.
+ pub async fn wait_until(
+ &mut self,
+ expected_event_type: EventType,
+ expected_moniker: AbsoluteMoniker,
+ ) -> Option<Event> {
+ while let Some(event) = self.next().await {
+ let actual_event_type = event.event.payload.type_();
+ if expected_moniker == event.event.target_moniker
+ && expected_event_type == actual_event_type
+ {
+ return Some(event);
+ }
+ event.resume();
+ }
+ None
+ }
+}
diff --git a/src/sys/component_manager/src/model/testing/routing_test_helpers.rs b/src/sys/component_manager/src/model/testing/routing_test_helpers.rs
index c04ebaf..f1421ce 100644
--- a/src/sys/component_manager/src/model/testing/routing_test_helpers.rs
+++ b/src/sys/component_manager/src/model/testing/routing_test_helpers.rs
@@ -639,7 +639,7 @@
/// Contains functions to use capabilities in routing tests.
pub mod capability_util {
use {
- super::*, crate::model::events::core::EVENT_SOURCE_SYNC_SERVICE_PATH,
+ super::*, crate::model::events::source_factory::EVENT_SOURCE_SYNC_SERVICE_PATH,
cm_rust::NativeIntoFidl, fidl::endpoints::ServiceMarker,
fidl_fuchsia_sys2::BlockingEventSourceMarker, std::path::PathBuf,
};
diff --git a/src/sys/component_manager/src/model/tests/routing.rs b/src/sys/component_manager/src/model/tests/routing.rs
index 6e5b0ef..24dbdba 100644
--- a/src/sys/component_manager/src/model/tests/routing.rs
+++ b/src/sys/component_manager/src/model/tests/routing.rs
@@ -8,7 +8,7 @@
framework::REALM_SERVICE,
model::{
error::ModelError,
- events::core::EVENT_SOURCE_SYNC_SERVICE_PATH,
+ events::source_factory::EVENT_SOURCE_SYNC_SERVICE_PATH,
hooks::{Event, EventPayload, EventType, Hook, HooksRegistration},
moniker::AbsoluteMoniker,
rights, routing,