[components] Split up events code into file per class.

To improve readability and discoverability, this CL splits events "classes"
into multiple files in the model/events/ directory.

Change-Id: I29cb51819a351dcad17007f8252dc1c65ec9d761
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/377909
Commit-Queue: Fady Samuel <fsamuel@google.com>
Reviewed-by: Miguel Flores <miguelfrde@google.com>
Testability-Review: Miguel Flores <miguelfrde@google.com>
diff --git a/src/sys/component_manager/src/builtin_environment.rs b/src/sys/component_manager/src/builtin_environment.rs
index bd3ffbb..920296c 100644
--- a/src/sys/component_manager/src/builtin_environment.rs
+++ b/src/sys/component_manager/src/builtin_environment.rs
@@ -19,7 +19,7 @@
         model::{
             binding::Binder,
             error::ModelError,
-            events::core::EventSourceFactory,
+            events::source_factory::EventSourceFactory,
             hub::Hub,
             model::{ComponentManagerConfig, Model},
             runner::Runner,
diff --git a/src/sys/component_manager/src/framework.rs b/src/sys/component_manager/src/framework.rs
index 015ef32..6290ac3 100644
--- a/src/sys/component_manager/src/framework.rs
+++ b/src/sys/component_manager/src/framework.rs
@@ -336,7 +336,7 @@
             builtin_environment::BuiltinEnvironment,
             model::{
                 binding::Binder,
-                events::{core::EventSource, registry::EventStream},
+                events::{source::EventSource, stream::EventStream},
                 model::ModelParams,
                 moniker::AbsoluteMoniker,
                 resolver::ResolverRegistry,
diff --git a/src/sys/component_manager/src/model/events/core.rs b/src/sys/component_manager/src/model/events/core.rs
deleted file mode 100644
index 25bef08..0000000
--- a/src/sys/component_manager/src/model/events/core.rs
+++ /dev/null
@@ -1,487 +0,0 @@
-// Copyright 2020 The Fuchsia Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-use {
-    crate::{
-        capability::{CapabilityProvider, CapabilitySource, FrameworkCapability},
-        model::{
-            error::ModelError,
-            events::{
-                registry::{EventRegistry, EventStream, RoutedEvent, SyncMode},
-                serve::serve_event_source_sync,
-            },
-            hooks::{Event, EventPayload, EventType, Hook, HooksRegistration},
-            model::Model,
-            moniker::AbsoluteMoniker,
-            realm::Realm,
-            routing,
-        },
-    },
-    async_trait::async_trait,
-    cm_rust::{CapabilityName, CapabilityPath, ComponentDecl, UseDecl, UseEventDecl},
-    fidl::endpoints::ServerEnd,
-    fidl_fuchsia_sys2 as fsys, fuchsia_async as fasync, fuchsia_zircon as zx,
-    futures::lock::Mutex,
-    lazy_static::lazy_static,
-    log::*,
-    maplit::hashset,
-    std::{
-        collections::{HashMap, HashSet},
-        convert::TryInto,
-        path::PathBuf,
-        sync::{Arc, Weak},
-    },
-    thiserror::Error,
-};
-
-lazy_static! {
-    pub static ref EVENT_SOURCE_SERVICE_PATH: CapabilityPath =
-        "/svc/fuchsia.sys2.EventSource".try_into().unwrap();
-    pub static ref EVENT_SOURCE_SYNC_SERVICE_PATH: CapabilityPath =
-        "/svc/fuchsia.sys2.BlockingEventSource".try_into().unwrap();
-}
-
-/// Allows to create `EventSource`s and tracks all the created ones.
-pub struct EventSourceFactory {
-    /// Tracks the event source used by each component ideantified with the given `moniker`.
-    event_source_registry: Mutex<HashMap<AbsoluteMoniker, EventSource>>,
-
-    /// The event registry. It subscribes to all events happening in the system and
-    /// routes them to subscribers.
-    // TODO(fxb/48512): instead of using a global registry integrate more with the hooks model.
-    event_registry: Arc<EventRegistry>,
-
-    /// The component model, needed to route events.
-    model: Weak<Model>,
-}
-
-impl EventSourceFactory {
-    /// Creates a new event source factory.
-    pub fn new(model: Weak<Model>) -> Self {
-        Self {
-            event_source_registry: Mutex::new(HashMap::new()),
-            event_registry: Arc::new(EventRegistry::new()),
-            model,
-        }
-    }
-
-    /// Creates the subscription to the required events.
-    /// `CapabilityReady` used to track events and associate them with the component that needs them
-    /// as well as the scoped that will be allowed. Also the EventSource protocol capability.
-    pub fn hooks(self: &Arc<Self>) -> Vec<HooksRegistration> {
-        let mut hooks = self.event_registry.hooks();
-        hooks.append(&mut vec![
-            // This hook provides the EventSource capability to components in the tree
-            HooksRegistration::new(
-                "EventSourceFactory",
-                vec![EventType::CapabilityRouted, EventType::Destroyed, EventType::Resolved],
-                Arc::downgrade(self) as Weak<dyn Hook>,
-            ),
-        ]);
-        hooks
-    }
-
-    /// Creates a debug event source.
-    pub async fn create_for_debug(&self) -> Result<EventSource, ModelError> {
-        EventSource::new_for_debug(
-            self.model.clone(),
-            AbsoluteMoniker::root(),
-            &self.event_registry,
-        )
-        .await
-    }
-
-    /// Creates a `EventSource` for the given `target_moniker`.
-    pub async fn create(
-        &self,
-        target_moniker: AbsoluteMoniker,
-        sync_mode: SyncMode,
-    ) -> Result<EventSource, ModelError> {
-        EventSource::new(self.model.clone(), target_moniker, &self.event_registry, sync_mode).await
-    }
-
-    /// Returns an EventSource. An EventSource holds an AbsoluteMoniker that
-    /// corresponds to the realm in which it will receive events.
-    async fn on_scoped_framework_capability_routed_async(
-        self: Arc<Self>,
-        capability_decl: &FrameworkCapability,
-        target_moniker: AbsoluteMoniker,
-        _scope_moniker: AbsoluteMoniker,
-        capability: Option<Box<dyn CapabilityProvider>>,
-    ) -> Result<Option<Box<dyn CapabilityProvider>>, ModelError> {
-        match (capability, capability_decl) {
-            (None, FrameworkCapability::Protocol(source_path))
-                if *source_path == *EVENT_SOURCE_SERVICE_PATH
-                    || *source_path == *EVENT_SOURCE_SYNC_SERVICE_PATH =>
-            {
-                let event_source_registry = self.event_source_registry.lock().await;
-                if let Some(event_source) = event_source_registry.get(&target_moniker) {
-                    Ok(Some(Box::new(event_source.clone()) as Box<dyn CapabilityProvider>))
-                } else {
-                    return Err(ModelError::capability_discovery_error(format!(
-                        "Unable to find EventSource in registry for {}",
-                        target_moniker
-                    )));
-                }
-            }
-            (c, _) => return Ok(c),
-        }
-    }
-
-    async fn on_destroyed_async(self: &Arc<Self>, target_moniker: &AbsoluteMoniker) {
-        let mut event_source_registry = self.event_source_registry.lock().await;
-        event_source_registry.remove(&target_moniker);
-    }
-
-    async fn on_resolved_async(
-        self: &Arc<Self>,
-        target_moniker: &AbsoluteMoniker,
-        decl: &ComponentDecl,
-    ) -> Result<(), ModelError> {
-        let sync_mode = if decl.uses_protocol_from_framework(&EVENT_SOURCE_SERVICE_PATH) {
-            SyncMode::Async
-        } else if decl.uses_protocol_from_framework(&EVENT_SOURCE_SYNC_SERVICE_PATH) {
-            SyncMode::Sync
-        } else {
-            return Ok(());
-        };
-        let key = target_moniker.clone();
-        let mut event_source_registry = self.event_source_registry.lock().await;
-        // It is currently assumed that a component instance's declaration
-        // is resolved only once. Someday, this may no longer be true if individual
-        // components can be updated.
-        assert!(!event_source_registry.contains_key(&key));
-        // An EventSource is created on resolution in order to ensure that discovery
-        // and resolution of children is not missed.
-        let event_source = self.create(key.clone(), sync_mode).await?;
-        event_source_registry.insert(key, event_source);
-        Ok(())
-    }
-
-    #[cfg(test)]
-    async fn has_event_source(&self, abs_moniker: &AbsoluteMoniker) -> bool {
-        let event_source_registry = self.event_source_registry.lock().await;
-        event_source_registry.contains_key(abs_moniker)
-    }
-}
-
-#[async_trait]
-impl Hook for EventSourceFactory {
-    async fn on(self: Arc<Self>, event: &Event) -> Result<(), ModelError> {
-        match &event.payload {
-            EventPayload::CapabilityRouted {
-                source:
-                    CapabilitySource::Framework { capability, scope_moniker: Some(scope_moniker) },
-                capability_provider,
-            } => {
-                let mut capability_provider = capability_provider.lock().await;
-                *capability_provider = self
-                    .on_scoped_framework_capability_routed_async(
-                        &capability,
-                        event.target_moniker.clone(),
-                        scope_moniker.clone(),
-                        capability_provider.take(),
-                    )
-                    .await?;
-            }
-            EventPayload::Destroyed => {
-                self.on_destroyed_async(&event.target_moniker).await;
-            }
-            EventPayload::Resolved { decl } => {
-                self.on_resolved_async(&event.target_moniker, decl).await?;
-            }
-            _ => {}
-        }
-        Ok(())
-    }
-}
-
-struct RouteEventsResult {
-    /// Maps from source name to a set of scope monikers
-    mapping: HashMap<CapabilityName, HashSet<AbsoluteMoniker>>,
-}
-
-impl RouteEventsResult {
-    fn new() -> Self {
-        Self { mapping: HashMap::new() }
-    }
-
-    fn insert(&mut self, source_name: CapabilityName, scope_moniker: AbsoluteMoniker) {
-        self.mapping.entry(source_name).or_insert(HashSet::new()).insert(scope_moniker);
-    }
-
-    fn len(&self) -> usize {
-        self.mapping.len()
-    }
-
-    fn contains_event(&self, event_name: &CapabilityName) -> bool {
-        self.mapping.contains_key(event_name)
-    }
-
-    fn to_vec(self) -> Vec<RoutedEvent> {
-        self.mapping
-            .into_iter()
-            .map(|(source_name, scope_monikers)| RoutedEvent { source_name, scope_monikers })
-            .collect()
-    }
-}
-
-/// A system responsible for implementing basic events functionality on a scoped realm.
-#[derive(Clone)]
-pub struct EventSource {
-    /// The component model, needed to route events.
-    model: Weak<Model>,
-
-    /// The moniker identifying the realm that requested this event source
-    target_moniker: AbsoluteMoniker,
-
-    /// A shared reference to the event registry used to subscribe and dispatche events.
-    registry: Weak<EventRegistry>,
-
-    /// Used for `BlockingEventSource.StartComponentTree`.
-    // TODO(fxb/48245): this shouldn't be done for any EventSource. Only for tests.
-    resolve_instance_event_stream: Arc<Mutex<Option<EventStream>>>,
-
-    /// Whether or not this is a debug instance.
-    debug: bool,
-
-    /// Whether or not this EventSource dispatches events asynchronously.
-    sync_mode: SyncMode,
-}
-
-#[derive(Debug, Error)]
-pub enum EventsError {
-    #[error("Registry not found")]
-    RegistryNotFound,
-
-    #[error("Events not allowed for subscription {:?}", names)]
-    NotAvailable { names: Vec<CapabilityName> },
-
-    // TODO(fxb/48720): use dedicated RoutingError type.
-    #[error("Routing failed")]
-    RoutingFailed(#[source] ModelError),
-}
-
-impl EventSource {
-    /// Creates a new `EventSource` that will be used by the component identified with the given
-    /// `target_moniker`.
-    pub async fn new(
-        model: Weak<Model>,
-        target_moniker: AbsoluteMoniker,
-        registry: &Arc<EventRegistry>,
-        sync_mode: SyncMode,
-    ) -> Result<Self, ModelError> {
-        // TODO(fxb/48245): this shouldn't be done for any EventSource. Only for tests.
-        let resolve_instance_event_stream = Arc::new(Mutex::new(if sync_mode == SyncMode::Async {
-            None
-        } else {
-            Some(
-                registry
-                    .subscribe(
-                        &sync_mode,
-                        vec![RoutedEvent {
-                            source_name: EventType::Resolved.into(),
-                            scope_monikers: hashset!(target_moniker.clone()),
-                        }],
-                    )
-                    .await,
-            )
-        }));
-        Ok(Self {
-            registry: Arc::downgrade(&registry),
-            model,
-            target_moniker,
-            resolve_instance_event_stream,
-            debug: false,
-            sync_mode,
-        })
-    }
-
-    async fn new_for_debug(
-        model: Weak<Model>,
-        target_moniker: AbsoluteMoniker,
-        registry: &Arc<EventRegistry>,
-    ) -> Result<Self, ModelError> {
-        let mut event_source = Self::new(model, target_moniker, registry, SyncMode::Sync).await?;
-        event_source.debug = true;
-        Ok(event_source)
-    }
-
-    /// Drops the `Resolved` event stream, thereby permitting components within the
-    /// realm to be started.
-    pub async fn start_component_tree(&mut self) {
-        let mut resolve_instance_event_stream = self.resolve_instance_event_stream.lock().await;
-        *resolve_instance_event_stream = None;
-    }
-
-    /// Subscribes to events provided in the `events` vector.
-    ///
-    /// The client might request to subscribe to events that it's not allowed to see. Events
-    /// that are allowed should have been defined in its manifest and either offered to it or
-    /// requested from the current realm.
-    pub async fn subscribe(
-        &mut self,
-        events: Vec<CapabilityName>,
-    ) -> Result<EventStream, EventsError> {
-        // Register event capabilities if any. It identifies the sources of these events (might be the
-        // containing realm or this realm itself). It consturcts an "allow-list tree" of events and
-        // realms.
-        let events = if self.debug {
-            events
-                .into_iter()
-                .map(|event| RoutedEvent {
-                    source_name: event.clone(),
-                    scope_monikers: hashset!(AbsoluteMoniker::root()),
-                })
-                .collect()
-        } else {
-            let route_result =
-                self.route_events(&events).await.map_err(|e| EventsError::RoutingFailed(e))?;
-            if route_result.len() != events.len() {
-                let names = events
-                    .into_iter()
-                    .filter(|event| !route_result.contains_event(&event))
-                    .collect();
-                return Err(EventsError::NotAvailable { names });
-            }
-            route_result.to_vec()
-        };
-
-        // Create an event stream for the given events
-        if let Some(registry) = self.registry.upgrade() {
-            return Ok(registry.subscribe(&self.sync_mode, events).await);
-        }
-        Err(EventsError::RegistryNotFound)
-    }
-
-    /// Serves a `EventSource` FIDL protocol.
-    pub fn serve(self, stream: fsys::BlockingEventSourceRequestStream) {
-        fasync::spawn(async move {
-            serve_event_source_sync(self, stream).await;
-        });
-    }
-
-    async fn route_events(
-        &self,
-        events: &Vec<CapabilityName>,
-    ) -> Result<RouteEventsResult, ModelError> {
-        let model = self.model.upgrade().ok_or(ModelError::ModelNotAvailable)?;
-        let realm = model.look_up_realm(&self.target_moniker).await?;
-        let decl = {
-            let state = realm.lock_state().await;
-            state.as_ref().expect("route_events: not registered").decl().clone()
-        };
-
-        let mut result = RouteEventsResult::new();
-        for use_decl in decl.uses {
-            match &use_decl {
-                UseDecl::Event(event_decl) => {
-                    if !events.contains(&event_decl.target_name) {
-                        continue;
-                    }
-                    let (source_name, scope_moniker) = self.route_event(event_decl, &realm).await?;
-                    result.insert(source_name, scope_moniker);
-                }
-                _ => {}
-            }
-        }
-
-        Ok(result)
-    }
-
-    /// Routes an event and returns its source name and scope on success.
-    async fn route_event(
-        &self,
-        event_decl: &UseEventDecl,
-        realm: &Arc<Realm>,
-    ) -> Result<(CapabilityName, AbsoluteMoniker), ModelError> {
-        routing::route_use_event_capability(&UseDecl::Event(event_decl.clone()), &realm).await.map(
-            |source| match source {
-                CapabilitySource::Framework {
-                    capability: FrameworkCapability::Event(source_name),
-                    scope_moniker: Some(scope_moniker),
-                } => (source_name, scope_moniker),
-                _ => unreachable!(),
-            },
-        )
-    }
-}
-
-#[async_trait]
-impl CapabilityProvider for EventSource {
-    async fn open(
-        self: Box<Self>,
-        _flags: u32,
-        _open_mode: u32,
-        _relative_path: PathBuf,
-        server_end: zx::Channel,
-    ) -> Result<(), ModelError> {
-        let stream = ServerEnd::<fsys::BlockingEventSourceMarker>::new(server_end)
-            .into_stream()
-            .expect("could not convert channel into stream");
-        self.serve(stream);
-        Ok(())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use {
-        super::*,
-        crate::model::{
-            hooks::Hooks, model::ModelParams, resolver::ResolverRegistry,
-            testing::test_helpers::ComponentDeclBuilder,
-        },
-        cm_rust::{UseProtocolDecl, UseSource},
-    };
-
-    async fn dispatch_resolved_event(
-        hooks: &Hooks,
-        target_moniker: &AbsoluteMoniker,
-    ) -> Result<(), ModelError> {
-        let decl = ComponentDeclBuilder::new()
-            .use_(UseDecl::Protocol(UseProtocolDecl {
-                source: UseSource::Framework,
-                source_path: (*EVENT_SOURCE_SYNC_SERVICE_PATH).clone(),
-                target_path: (*EVENT_SOURCE_SYNC_SERVICE_PATH).clone(),
-            }))
-            .build();
-        let event =
-            Event::new(target_moniker.clone(), EventPayload::Resolved { decl: decl.clone() });
-        hooks.dispatch(&event).await
-    }
-
-    async fn dispatch_destroyed_event(
-        hooks: &Hooks,
-        target_moniker: &AbsoluteMoniker,
-    ) -> Result<(), ModelError> {
-        let event = Event::new(target_moniker.clone(), EventPayload::Destroyed);
-        hooks.dispatch(&event).await
-    }
-
-    #[fuchsia_async::run_singlethreaded(test)]
-    async fn drop_event_source_when_component_destroyed() {
-        let model = {
-            let registry = ResolverRegistry::new();
-            Arc::new(Model::new(ModelParams {
-                root_component_url: "test:///root".to_string(),
-                root_resolver_registry: registry,
-            }))
-        };
-        let event_source_factory = Arc::new(EventSourceFactory::new(Arc::downgrade(&model)));
-
-        let hooks = Hooks::new(None);
-        hooks.install(event_source_factory.hooks()).await;
-
-        let root = AbsoluteMoniker::root();
-
-        // Verify that there is no EventSource for the root until we dispatch the Resolved event.
-        assert!(!event_source_factory.has_event_source(&root).await);
-        dispatch_resolved_event(&hooks, &root).await.unwrap();
-        assert!(event_source_factory.has_event_source(&root).await);
-        // Verify that destroying the component destroys the EventSource.
-        dispatch_destroyed_event(&hooks, &root).await.unwrap();
-        assert!(!event_source_factory.has_event_source(&root).await);
-    }
-}
diff --git a/src/sys/component_manager/src/model/events/dispatcher.rs b/src/sys/component_manager/src/model/events/dispatcher.rs
new file mode 100644
index 0000000..1617f41
--- /dev/null
+++ b/src/sys/component_manager/src/model/events/dispatcher.rs
@@ -0,0 +1,99 @@
+// Copyright 2020 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    crate::model::{
+        events::event::{Event, SyncMode},
+        hooks::Event as ComponentEvent,
+        moniker::AbsoluteMoniker,
+    },
+    anyhow::Error,
+    fuchsia_trace as trace,
+    futures::{
+        channel::{mpsc, oneshot},
+        lock::Mutex,
+        sink::SinkExt,
+    },
+    std::collections::HashSet,
+};
+
+/// EventDispatcher and EventStream are two ends of a channel.
+///
+/// EventDispatcher represents the sending end of the channel.
+///
+/// An EventDispatcher receives events of a particular event type,
+/// and dispatches though events out to the EventStream if they fall within
+/// one of the scopes associated with the dispatcher.
+///
+/// EventDispatchers are owned by EventStreams. If an EventStream is dropped,
+/// all corresponding EventDispatchers are dropped.
+///
+/// An EventStream is owned by the client - usually a test harness or a
+/// EventSource. It receives a Event from an EventDispatcher and propagates it
+/// to the client.
+pub struct EventDispatcher {
+    /// Whether or not this EventDispatcher dispatches events asynchronously.
+    sync_mode: SyncMode,
+    /// Specifies the realms that this EventDispatcher can dispatch events from.
+    scope_monikers: HashSet<AbsoluteMoniker>,
+    /// An `mpsc::Sender` used to dispatch an event. Note that this
+    /// `mpsc::Sender` is wrapped in an Mutex<..> to allow it to be passed along
+    /// to other tasks for dispatch.
+    tx: Mutex<mpsc::Sender<Event>>,
+}
+
+impl EventDispatcher {
+    pub fn new(
+        sync_mode: SyncMode,
+        scope_monikers: HashSet<AbsoluteMoniker>,
+        tx: mpsc::Sender<Event>,
+    ) -> Self {
+        // TODO(fxb/48360): flatten scope_monikers. There might be monikers that are
+        // contained within another moniker in the list.
+        Self { sync_mode, scope_monikers, tx: Mutex::new(tx) }
+    }
+
+    /// Sends the event to an event stream, if fired in the scope of `scope_moniker`. Returns
+    /// a responder which can be blocked on.
+    pub async fn dispatch(
+        &self,
+        event: ComponentEvent,
+    ) -> Result<Option<oneshot::Receiver<()>>, Error> {
+        // TODO(fxb/48360): once flattening of monikers is done, we would expect to have a single
+        // moniker here. For now taking the first one and ignoring the rest.
+        // Ensure that the event is coming from a realm within the scope of this dispatcher.
+        let maybe_scope_moniker = self
+            .scope_monikers
+            .iter()
+            .filter(|moniker| moniker.contains_in_realm(&event.target_moniker))
+            .next();
+        if maybe_scope_moniker.is_none() {
+            return Ok(None);
+        }
+
+        let scope_moniker = maybe_scope_moniker.unwrap().clone();
+
+        trace::duration!("component_manager", "events:send");
+        let event_type = format!("{:?}", event.payload.type_());
+        let target_moniker = event.target_moniker.to_string();
+        trace::flow_begin!(
+            "component_manager",
+            "event",
+            event.id,
+            "event_type" => event_type.as_str(),
+            "target_moniker" => target_moniker.as_str()
+        );
+        let (maybe_responder_tx, maybe_responder_rx) = if self.sync_mode == SyncMode::Async {
+            (None, None)
+        } else {
+            let (responder_tx, responder_rx) = oneshot::channel();
+            (Some(responder_tx), Some(responder_rx))
+        };
+        {
+            let mut tx = self.tx.lock().await;
+            tx.send(Event { event, scope_moniker, responder: maybe_responder_tx }).await?;
+        }
+        Ok(maybe_responder_rx)
+    }
+}
diff --git a/src/sys/component_manager/src/model/events/event.rs b/src/sys/component_manager/src/model/events/event.rs
new file mode 100644
index 0000000..8380b72
--- /dev/null
+++ b/src/sys/component_manager/src/model/events/event.rs
@@ -0,0 +1,51 @@
+// Copyright 2020 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    crate::model::{hooks::Event as ComponentEvent, moniker::AbsoluteMoniker},
+    fuchsia_trace as trace,
+    futures::channel::oneshot,
+};
+
+#[derive(PartialEq, Clone)]
+pub enum SyncMode {
+    Sync,
+    Async,
+}
+
+/// Created for a particular component event.
+/// Contains the Event that occurred along with a means to resume/unblock the component manager.
+#[must_use = "invoke resume() otherwise component manager will be halted indefinitely!"]
+pub struct Event {
+    /// The event itself.
+    pub event: ComponentEvent,
+
+    /// The scope where this event comes from. This can be seen as a superset of the
+    /// `event.target_moniker` itself given that the events might have been offered from an
+    /// ancestor realm.
+    pub scope_moniker: AbsoluteMoniker,
+
+    /// This Sender is used to unblock the component manager if available.
+    /// If a Sender is unspecified then that indicates that this event is asynchronous and
+    /// non-blocking.
+    pub responder: Option<oneshot::Sender<()>>,
+}
+
+impl Event {
+    pub fn sync_mode(&self) -> SyncMode {
+        if self.responder.is_none() {
+            SyncMode::Async
+        } else {
+            SyncMode::Sync
+        }
+    }
+
+    pub fn resume(self) {
+        trace::duration!("component_manager", "events:resume");
+        trace::flow_step!("component_manager", "event", self.event.id);
+        if let Some(responder) = self.responder {
+            responder.send(()).unwrap()
+        }
+    }
+}
diff --git a/src/sys/component_manager/src/model/events/mod.rs b/src/sys/component_manager/src/model/events/mod.rs
index a9af5f5..d8f52c0 100644
--- a/src/sys/component_manager/src/model/events/mod.rs
+++ b/src/sys/component_manager/src/model/events/mod.rs
@@ -2,6 +2,10 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-pub mod core;
+pub mod dispatcher;
+pub mod event;
 pub mod registry;
 pub(crate) mod serve;
+pub mod source;
+pub mod source_factory;
+pub mod stream;
diff --git a/src/sys/component_manager/src/model/events/registry.rs b/src/sys/component_manager/src/model/events/registry.rs
index b3d834d..6574258 100644
--- a/src/sys/component_manager/src/model/events/registry.rs
+++ b/src/sys/component_manager/src/model/events/registry.rs
@@ -5,185 +5,20 @@
 use {
     crate::model::{
         error::ModelError,
+        events::{dispatcher::EventDispatcher, event::SyncMode, stream::EventStream},
         hooks::{Event as ComponentEvent, EventType, Hook, HooksRegistration},
         moniker::AbsoluteMoniker,
     },
-    anyhow::Error,
     async_trait::async_trait,
     cm_rust::CapabilityName,
     fuchsia_trace as trace,
-    futures::{channel::*, lock::Mutex, sink::SinkExt, StreamExt},
+    futures::lock::Mutex,
     std::{
         collections::{HashMap, HashSet},
         sync::{Arc, Weak},
     },
 };
 
-#[derive(PartialEq, Clone)]
-pub enum SyncMode {
-    Sync,
-    Async,
-}
-
-/// Created for a particular component event.
-/// Contains the Event that occurred along with a means to resume/unblock the component manager.
-#[must_use = "invoke resume() otherwise component manager will be halted indefinitely!"]
-pub struct Event {
-    /// The event itself.
-    pub event: ComponentEvent,
-
-    /// The scope where this event comes from. This can be seen as a superset of the
-    /// `event.target_moniker` itself given that the events might have been offered from an
-    /// ancestor realm.
-    pub scope_moniker: AbsoluteMoniker,
-
-    /// This Sender is used to unblock the component manager if available.
-    /// If a Sender is unspecified then that indicates that this event is asynchronous and
-    /// non-blocking.
-    responder: Option<oneshot::Sender<()>>,
-}
-
-impl Event {
-    pub fn sync_mode(&self) -> SyncMode {
-        if self.responder.is_none() {
-            SyncMode::Async
-        } else {
-            SyncMode::Sync
-        }
-    }
-
-    pub fn resume(self) {
-        trace::duration!("component_manager", "events:resume");
-        trace::flow_step!("component_manager", "event", self.event.id);
-        if let Some(responder) = self.responder {
-            responder.send(()).unwrap()
-        }
-    }
-}
-
-/// EventDispatcher and EventStream are two ends of a channel.
-///
-/// An EventDispatcher is owned by the EventRegistry. It sends an
-/// Event to the EventStream.
-///
-/// An EventStream is owned by the client - usually a test harness or a
-/// EventSource. It receives a Event from an EventDispatcher and propagates it
-/// to the client.
-#[derive(Clone)]
-pub struct EventDispatcher {
-    /// Whether or not this EventDispatcher dispatches events asynchronously.
-    sync_mode: SyncMode,
-    /// Specifies the realms that this EventDispatcher can dispatch events from.
-    scope_monikers: HashSet<AbsoluteMoniker>,
-    /// An `mpsc::Sender` used to dispatch an event. Note that this
-    /// `mpsc::Sender` is wrapped in an Arc<Mutex<..>> to allow it to be cloneable
-    /// and passed along to other tasks for dispatch.
-    tx: Arc<Mutex<mpsc::Sender<Event>>>,
-}
-
-impl EventDispatcher {
-    /// Creates a new event dispatcher. This dispatcher will only dispatch events that arrive from
-    /// the given scopes and dispatch them under the given name for that scope.
-    fn new(
-        sync_mode: SyncMode,
-        scope_monikers: HashSet<AbsoluteMoniker>,
-        tx: mpsc::Sender<Event>,
-    ) -> Self {
-        // TODO(fxb/48360): flatten scope_monikers. There might be monikers that are
-        // contained within another moniker in the list.
-        Self { sync_mode, scope_monikers, tx: Arc::new(Mutex::new(tx)) }
-    }
-
-    /// Sends the event to an event stream, if fired in the scope of `scope_moniker`. Returns
-    /// a responder which can be blocked on.
-    async fn send(&self, event: ComponentEvent) -> Result<Option<oneshot::Receiver<()>>, Error> {
-        // TODO(fxb/48360): once flattening of monikers is done, we would expect to have a single
-        // moniker here. For now taking the first one and ignoring the rest.
-        // Ensure that the event is coming from a realm within the scope of this dispatcher.
-        let maybe_scope_moniker = self
-            .scope_monikers
-            .iter()
-            .filter(|moniker| moniker.contains_in_realm(&event.target_moniker))
-            .next();
-        if maybe_scope_moniker.is_none() {
-            return Ok(None);
-        }
-
-        let scope_moniker = maybe_scope_moniker.unwrap().clone();
-
-        trace::duration!("component_manager", "events:send");
-        let event_type = format!("{:?}", event.payload.type_());
-        let target_moniker = event.target_moniker.to_string();
-        trace::flow_begin!(
-            "component_manager",
-            "event",
-            event.id,
-            "event_type" => event_type.as_str(),
-            "target_moniker" => target_moniker.as_str()
-        );
-        let (maybe_responder_tx, maybe_responder_rx) = if self.sync_mode == SyncMode::Async {
-            (None, None)
-        } else {
-            let (responder_tx, responder_rx) = oneshot::channel();
-            (Some(responder_tx), Some(responder_rx))
-        };
-        {
-            let mut tx = self.tx.lock().await;
-            tx.send(Event { event, scope_moniker, responder: maybe_responder_tx }).await?;
-        }
-        Ok(maybe_responder_rx)
-    }
-}
-
-pub struct EventStream {
-    rx: mpsc::Receiver<Event>,
-    tx: mpsc::Sender<Event>,
-    dispatchers: Vec<Arc<EventDispatcher>>,
-}
-
-impl EventStream {
-    fn new() -> Self {
-        let (tx, rx) = mpsc::channel(2);
-        Self { rx, tx, dispatchers: vec![] }
-    }
-
-    fn create_dispatcher(
-        &mut self,
-        sync_mode: SyncMode,
-        scope_monikers: HashSet<AbsoluteMoniker>,
-    ) -> Weak<EventDispatcher> {
-        let dispatcher =
-            Arc::new(EventDispatcher::new(sync_mode.clone(), scope_monikers, self.tx.clone()));
-        self.dispatchers.push(dispatcher.clone());
-        Arc::downgrade(&dispatcher)
-    }
-
-    /// Receives the next event from the sender.
-    pub async fn next(&mut self) -> Option<Event> {
-        trace::duration!("component_manager", "events:next");
-        self.rx.next().await
-    }
-
-    /// Waits for an event with a particular EventType against a component with a
-    /// particular moniker. Ignores all other events.
-    pub async fn wait_until(
-        &mut self,
-        expected_event_type: EventType,
-        expected_moniker: AbsoluteMoniker,
-    ) -> Option<Event> {
-        while let Some(event) = self.next().await {
-            let actual_event_type = event.event.payload.type_();
-            if expected_moniker == event.event.target_moniker
-                && expected_event_type == actual_event_type
-            {
-                return Some(event);
-            }
-            event.resume();
-        }
-        None
-    }
-}
-
 pub struct RoutedEvent {
     pub source_name: CapabilityName,
     pub scope_monikers: HashSet<AbsoluteMoniker>,
@@ -232,8 +67,8 @@
         // Copy the senders so we don't hold onto the sender map lock
         // If we didn't do this, it is possible to deadlock while holding onto this lock.
         // For example,
-        // Task A : call send(event1) -> lock on sender map -> send -> wait for responders
-        // Task B : call send(event2) -> lock on sender map
+        // Task A : call dispatch(event1) -> lock on sender map -> send -> wait for responders
+        // Task B : call dispatch(event2) -> lock on sender map
         // If task B was required to respond to event1, then this is a deadlock.
         // Neither task can make progress.
         let dispatchers = {
@@ -257,7 +92,7 @@
 
         let mut responder_channels = vec![];
         for dispatcher in dispatchers {
-            let result = dispatcher.send(event.clone()).await;
+            let result = dispatcher.dispatch(event.clone()).await;
             match result {
                 Ok(Some(responder_channel)) => {
                     // A future can be canceled if the EventStream was dropped after
diff --git a/src/sys/component_manager/src/model/events/serve.rs b/src/sys/component_manager/src/model/events/serve.rs
index e8e8640..5e50c14 100644
--- a/src/sys/component_manager/src/model/events/serve.rs
+++ b/src/sys/component_manager/src/model/events/serve.rs
@@ -7,8 +7,11 @@
         capability::{CapabilityProvider, CapabilitySource, FrameworkCapability},
         model::{
             error::ModelError,
-            events::core::EventSource,
-            events::registry::{Event, EventStream, SyncMode},
+            events::{
+                event::{Event, SyncMode},
+                source::EventSource,
+                stream::EventStream,
+            },
             hooks::EventPayload,
             moniker::{AbsoluteMoniker, RelativeMoniker},
         },
diff --git a/src/sys/component_manager/src/model/events/source.rs b/src/sys/component_manager/src/model/events/source.rs
new file mode 100644
index 0000000..1bdbe89
--- /dev/null
+++ b/src/sys/component_manager/src/model/events/source.rs
@@ -0,0 +1,263 @@
+// Copyright 2020 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    crate::{
+        capability::{CapabilityProvider, CapabilitySource, FrameworkCapability},
+        model::{
+            error::ModelError,
+            events::{
+                event::SyncMode,
+                registry::{EventRegistry, RoutedEvent},
+                serve::serve_event_source_sync,
+                stream::EventStream,
+            },
+            hooks::EventType,
+            model::Model,
+            moniker::AbsoluteMoniker,
+            realm::Realm,
+            routing,
+        },
+    },
+    async_trait::async_trait,
+    cm_rust::{CapabilityName, UseDecl, UseEventDecl},
+    fidl::endpoints::ServerEnd,
+    fidl_fuchsia_sys2 as fsys, fuchsia_async as fasync, fuchsia_zircon as zx,
+    futures::lock::Mutex,
+    maplit::hashset,
+    std::{
+        collections::{HashMap, HashSet},
+        path::PathBuf,
+        sync::{Arc, Weak},
+    },
+    thiserror::Error,
+};
+
+/// A system responsible for implementing basic events functionality on a scoped realm.
+#[derive(Clone)]
+pub struct EventSource {
+    /// The component model, needed to route events.
+    model: Weak<Model>,
+
+    /// The moniker identifying the realm that requested this event source
+    target_moniker: AbsoluteMoniker,
+
+    /// A shared reference to the event registry used to subscribe and dispatche events.
+    registry: Weak<EventRegistry>,
+
+    /// Used for `BlockingEventSource.StartComponentTree`.
+    // TODO(fxb/48245): this shouldn't be done for any EventSource. Only for tests.
+    resolve_instance_event_stream: Arc<Mutex<Option<EventStream>>>,
+
+    /// Whether or not this is a debug instance.
+    debug: bool,
+
+    /// Whether or not this EventSource dispatches events asynchronously.
+    sync_mode: SyncMode,
+}
+
+#[derive(Debug, Error)]
+pub enum EventsError {
+    #[error("Registry not found")]
+    RegistryNotFound,
+
+    #[error("Events not allowed for subscription {:?}", names)]
+    NotAvailable { names: Vec<CapabilityName> },
+
+    // TODO(fxb/48720): use dedicated RoutingError type.
+    #[error("Routing failed")]
+    RoutingFailed(#[source] ModelError),
+}
+
+struct RouteEventsResult {
+    /// Maps from source name to a set of scope monikers
+    mapping: HashMap<CapabilityName, HashSet<AbsoluteMoniker>>,
+}
+
+impl RouteEventsResult {
+    fn new() -> Self {
+        Self { mapping: HashMap::new() }
+    }
+
+    fn insert(&mut self, source_name: CapabilityName, scope_moniker: AbsoluteMoniker) {
+        self.mapping.entry(source_name).or_insert(HashSet::new()).insert(scope_moniker);
+    }
+
+    fn len(&self) -> usize {
+        self.mapping.len()
+    }
+
+    fn contains_event(&self, event_name: &CapabilityName) -> bool {
+        self.mapping.contains_key(event_name)
+    }
+
+    fn to_vec(self) -> Vec<RoutedEvent> {
+        self.mapping
+            .into_iter()
+            .map(|(source_name, scope_monikers)| RoutedEvent { source_name, scope_monikers })
+            .collect()
+    }
+}
+
+impl EventSource {
+    /// Creates a new `EventSource` that will be used by the component identified with the given
+    /// `target_moniker`.
+    pub async fn new(
+        model: Weak<Model>,
+        target_moniker: AbsoluteMoniker,
+        registry: &Arc<EventRegistry>,
+        sync_mode: SyncMode,
+    ) -> Result<Self, ModelError> {
+        // TODO(fxb/48245): this shouldn't be done for any EventSource. Only for tests.
+        let resolve_instance_event_stream = Arc::new(Mutex::new(if sync_mode == SyncMode::Async {
+            None
+        } else {
+            Some(
+                registry
+                    .subscribe(
+                        &sync_mode,
+                        vec![RoutedEvent {
+                            source_name: EventType::Resolved.into(),
+                            scope_monikers: hashset!(target_moniker.clone()),
+                        }],
+                    )
+                    .await,
+            )
+        }));
+        Ok(Self {
+            registry: Arc::downgrade(&registry),
+            model,
+            target_moniker,
+            resolve_instance_event_stream,
+            debug: false,
+            sync_mode,
+        })
+    }
+
+    pub async fn new_for_debug(
+        model: Weak<Model>,
+        target_moniker: AbsoluteMoniker,
+        registry: &Arc<EventRegistry>,
+    ) -> Result<Self, ModelError> {
+        let mut event_source = Self::new(model, target_moniker, registry, SyncMode::Sync).await?;
+        event_source.debug = true;
+        Ok(event_source)
+    }
+
+    /// Drops the `Resolved` event stream, thereby permitting components within the
+    /// realm to be started.
+    pub async fn start_component_tree(&mut self) {
+        let mut resolve_instance_event_stream = self.resolve_instance_event_stream.lock().await;
+        *resolve_instance_event_stream = None;
+    }
+
+    /// Subscribes to events provided in the `events` vector.
+    ///
+    /// The client might request to subscribe to events that it's not allowed to see. Events
+    /// that are allowed should have been defined in its manifest and either offered to it or
+    /// requested from the current realm.
+    pub async fn subscribe(
+        &mut self,
+        events: Vec<CapabilityName>,
+    ) -> Result<EventStream, EventsError> {
+        // Register event capabilities if any. It identifies the sources of these events (might be the
+        // containing realm or this realm itself). It consturcts an "allow-list tree" of events and
+        // realms.
+        let events = if self.debug {
+            events
+                .into_iter()
+                .map(|event| RoutedEvent {
+                    source_name: event.clone(),
+                    scope_monikers: hashset!(AbsoluteMoniker::root()),
+                })
+                .collect()
+        } else {
+            let route_result =
+                self.route_events(&events).await.map_err(|e| EventsError::RoutingFailed(e))?;
+            if route_result.len() != events.len() {
+                let names = events
+                    .into_iter()
+                    .filter(|event| !route_result.contains_event(&event))
+                    .collect();
+                return Err(EventsError::NotAvailable { names });
+            }
+            route_result.to_vec()
+        };
+
+        // Create an event stream for the given events
+        if let Some(registry) = self.registry.upgrade() {
+            return Ok(registry.subscribe(&self.sync_mode, events).await);
+        }
+        Err(EventsError::RegistryNotFound)
+    }
+
+    /// Serves a `EventSource` FIDL protocol.
+    pub fn serve(self, stream: fsys::BlockingEventSourceRequestStream) {
+        fasync::spawn(async move {
+            serve_event_source_sync(self, stream).await;
+        });
+    }
+
+    async fn route_events(
+        &self,
+        events: &Vec<CapabilityName>,
+    ) -> Result<RouteEventsResult, ModelError> {
+        let model = self.model.upgrade().ok_or(ModelError::ModelNotAvailable)?;
+        let realm = model.look_up_realm(&self.target_moniker).await?;
+        let decl = {
+            let state = realm.lock_state().await;
+            state.as_ref().expect("route_events: not registered").decl().clone()
+        };
+
+        let mut result = RouteEventsResult::new();
+        for use_decl in decl.uses {
+            match &use_decl {
+                UseDecl::Event(event_decl) => {
+                    if !events.contains(&event_decl.target_name) {
+                        continue;
+                    }
+                    let (source_name, scope_moniker) = self.route_event(event_decl, &realm).await?;
+                    result.insert(source_name, scope_moniker);
+                }
+                _ => {}
+            }
+        }
+
+        Ok(result)
+    }
+
+    /// Routes an event and returns its source name and scope on success.
+    async fn route_event(
+        &self,
+        event_decl: &UseEventDecl,
+        realm: &Arc<Realm>,
+    ) -> Result<(CapabilityName, AbsoluteMoniker), ModelError> {
+        routing::route_use_event_capability(&UseDecl::Event(event_decl.clone()), &realm).await.map(
+            |source| match source {
+                CapabilitySource::Framework {
+                    capability: FrameworkCapability::Event(source_name),
+                    scope_moniker: Some(scope_moniker),
+                } => (source_name, scope_moniker),
+                _ => unreachable!(),
+            },
+        )
+    }
+}
+
+#[async_trait]
+impl CapabilityProvider for EventSource {
+    async fn open(
+        self: Box<Self>,
+        _flags: u32,
+        _open_mode: u32,
+        _relative_path: PathBuf,
+        server_end: zx::Channel,
+    ) -> Result<(), ModelError> {
+        let stream = ServerEnd::<fsys::BlockingEventSourceMarker>::new(server_end)
+            .into_stream()
+            .expect("could not convert channel into stream");
+        self.serve(stream);
+        Ok(())
+    }
+}
diff --git a/src/sys/component_manager/src/model/events/source_factory.rs b/src/sys/component_manager/src/model/events/source_factory.rs
new file mode 100644
index 0000000..0b26f76
--- /dev/null
+++ b/src/sys/component_manager/src/model/events/source_factory.rs
@@ -0,0 +1,248 @@
+// Copyright 2020 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    crate::{
+        capability::{CapabilityProvider, CapabilitySource, FrameworkCapability},
+        model::{
+            error::ModelError,
+            events::{event::SyncMode, registry::EventRegistry, source::EventSource},
+            hooks::{Event, EventPayload, EventType, Hook, HooksRegistration},
+            model::Model,
+            moniker::AbsoluteMoniker,
+        },
+    },
+    async_trait::async_trait,
+    cm_rust::{CapabilityPath, ComponentDecl},
+    futures::lock::Mutex,
+    lazy_static::lazy_static,
+    std::{
+        collections::HashMap,
+        convert::TryInto,
+        sync::{Arc, Weak},
+    },
+};
+
+lazy_static! {
+    pub static ref EVENT_SOURCE_SERVICE_PATH: CapabilityPath =
+        "/svc/fuchsia.sys2.EventSource".try_into().unwrap();
+    pub static ref EVENT_SOURCE_SYNC_SERVICE_PATH: CapabilityPath =
+        "/svc/fuchsia.sys2.BlockingEventSource".try_into().unwrap();
+}
+
+/// Allows to create `EventSource`s and tracks all the created ones.
+pub struct EventSourceFactory {
+    /// Tracks the event source used by each component ideantified with the given `moniker`.
+    event_source_registry: Mutex<HashMap<AbsoluteMoniker, EventSource>>,
+
+    /// The event registry. It subscribes to all events happening in the system and
+    /// routes them to subscribers.
+    // TODO(fxb/48512): instead of using a global registry integrate more with the hooks model.
+    event_registry: Arc<EventRegistry>,
+
+    /// The component model, needed to route events.
+    model: Weak<Model>,
+}
+
+impl EventSourceFactory {
+    /// Creates a new event source factory.
+    pub fn new(model: Weak<Model>) -> Self {
+        Self {
+            event_source_registry: Mutex::new(HashMap::new()),
+            event_registry: Arc::new(EventRegistry::new()),
+            model,
+        }
+    }
+
+    /// Creates the subscription to the required events.
+    /// `CapabilityReady` used to track events and associate them with the component that needs them
+    /// as well as the scoped that will be allowed. Also the EventSource protocol capability.
+    pub fn hooks(self: &Arc<Self>) -> Vec<HooksRegistration> {
+        let mut hooks = self.event_registry.hooks();
+        hooks.append(&mut vec![
+            // This hook provides the EventSource capability to components in the tree
+            HooksRegistration::new(
+                "EventSourceFactory",
+                vec![EventType::CapabilityRouted, EventType::Destroyed, EventType::Resolved],
+                Arc::downgrade(self) as Weak<dyn Hook>,
+            ),
+        ]);
+        hooks
+    }
+
+    /// Creates a debug event source.
+    pub async fn create_for_debug(&self) -> Result<EventSource, ModelError> {
+        EventSource::new_for_debug(
+            self.model.clone(),
+            AbsoluteMoniker::root(),
+            &self.event_registry,
+        )
+        .await
+    }
+
+    /// Creates a `EventSource` for the given `target_moniker`.
+    pub async fn create(
+        &self,
+        target_moniker: AbsoluteMoniker,
+        sync_mode: SyncMode,
+    ) -> Result<EventSource, ModelError> {
+        EventSource::new(self.model.clone(), target_moniker, &self.event_registry, sync_mode).await
+    }
+
+    /// Returns an EventSource. An EventSource holds an AbsoluteMoniker that
+    /// corresponds to the realm in which it will receive events.
+    async fn on_scoped_framework_capability_routed_async(
+        self: Arc<Self>,
+        capability_decl: &FrameworkCapability,
+        target_moniker: AbsoluteMoniker,
+        _scope_moniker: AbsoluteMoniker,
+        capability: Option<Box<dyn CapabilityProvider>>,
+    ) -> Result<Option<Box<dyn CapabilityProvider>>, ModelError> {
+        match (capability, capability_decl) {
+            (None, FrameworkCapability::Protocol(source_path))
+                if *source_path == *EVENT_SOURCE_SERVICE_PATH
+                    || *source_path == *EVENT_SOURCE_SYNC_SERVICE_PATH =>
+            {
+                let event_source_registry = self.event_source_registry.lock().await;
+                if let Some(event_source) = event_source_registry.get(&target_moniker) {
+                    Ok(Some(Box::new(event_source.clone()) as Box<dyn CapabilityProvider>))
+                } else {
+                    return Err(ModelError::capability_discovery_error(format!(
+                        "Unable to find EventSource in registry for {}",
+                        target_moniker
+                    )));
+                }
+            }
+            (c, _) => return Ok(c),
+        }
+    }
+
+    async fn on_destroyed_async(self: &Arc<Self>, target_moniker: &AbsoluteMoniker) {
+        let mut event_source_registry = self.event_source_registry.lock().await;
+        event_source_registry.remove(&target_moniker);
+    }
+
+    async fn on_resolved_async(
+        self: &Arc<Self>,
+        target_moniker: &AbsoluteMoniker,
+        decl: &ComponentDecl,
+    ) -> Result<(), ModelError> {
+        let sync_mode = if decl.uses_protocol_from_framework(&EVENT_SOURCE_SERVICE_PATH) {
+            SyncMode::Async
+        } else if decl.uses_protocol_from_framework(&EVENT_SOURCE_SYNC_SERVICE_PATH) {
+            SyncMode::Sync
+        } else {
+            return Ok(());
+        };
+        let key = target_moniker.clone();
+        let mut event_source_registry = self.event_source_registry.lock().await;
+        // It is currently assumed that a component instance's declaration
+        // is resolved only once. Someday, this may no longer be true if individual
+        // components can be updated.
+        assert!(!event_source_registry.contains_key(&key));
+        // An EventSource is created on resolution in order to ensure that discovery
+        // and resolution of children is not missed.
+        let event_source = self.create(key.clone(), sync_mode).await?;
+        event_source_registry.insert(key, event_source);
+        Ok(())
+    }
+
+    #[cfg(test)]
+    async fn has_event_source(&self, abs_moniker: &AbsoluteMoniker) -> bool {
+        let event_source_registry = self.event_source_registry.lock().await;
+        event_source_registry.contains_key(abs_moniker)
+    }
+}
+
+#[async_trait]
+impl Hook for EventSourceFactory {
+    async fn on(self: Arc<Self>, event: &Event) -> Result<(), ModelError> {
+        match &event.payload {
+            EventPayload::CapabilityRouted {
+                source:
+                    CapabilitySource::Framework { capability, scope_moniker: Some(scope_moniker) },
+                capability_provider,
+            } => {
+                let mut capability_provider = capability_provider.lock().await;
+                *capability_provider = self
+                    .on_scoped_framework_capability_routed_async(
+                        &capability,
+                        event.target_moniker.clone(),
+                        scope_moniker.clone(),
+                        capability_provider.take(),
+                    )
+                    .await?;
+            }
+            EventPayload::Destroyed => {
+                self.on_destroyed_async(&event.target_moniker).await;
+            }
+            EventPayload::Resolved { decl } => {
+                self.on_resolved_async(&event.target_moniker, decl).await?;
+            }
+            _ => {}
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use {
+        super::*,
+        crate::model::{
+            hooks::Hooks, model::ModelParams, resolver::ResolverRegistry,
+            testing::test_helpers::ComponentDeclBuilder,
+        },
+        cm_rust::{UseDecl, UseProtocolDecl, UseSource},
+    };
+
+    async fn dispatch_resolved_event(
+        hooks: &Hooks,
+        target_moniker: &AbsoluteMoniker,
+    ) -> Result<(), ModelError> {
+        let decl = ComponentDeclBuilder::new()
+            .use_(UseDecl::Protocol(UseProtocolDecl {
+                source: UseSource::Framework,
+                source_path: (*EVENT_SOURCE_SYNC_SERVICE_PATH).clone(),
+                target_path: (*EVENT_SOURCE_SYNC_SERVICE_PATH).clone(),
+            }))
+            .build();
+        let event =
+            Event::new(target_moniker.clone(), EventPayload::Resolved { decl: decl.clone() });
+        hooks.dispatch(&event).await
+    }
+
+    async fn dispatch_destroyed_event(
+        hooks: &Hooks,
+        target_moniker: &AbsoluteMoniker,
+    ) -> Result<(), ModelError> {
+        let event = Event::new(target_moniker.clone(), EventPayload::Destroyed);
+        hooks.dispatch(&event).await
+    }
+
+    #[fuchsia_async::run_singlethreaded(test)]
+    async fn drop_event_source_when_component_destroyed() {
+        let model = {
+            let registry = ResolverRegistry::new();
+            Arc::new(Model::new(ModelParams {
+                root_component_url: "test:///root".to_string(),
+                root_resolver_registry: registry,
+            }))
+        };
+        let event_source_factory = Arc::new(EventSourceFactory::new(Arc::downgrade(&model)));
+
+        let hooks = Hooks::new(None);
+        hooks.install(event_source_factory.hooks()).await;
+
+        let root = AbsoluteMoniker::root();
+
+        // Verify that there is no EventSource for the root until we dispatch the Resolved event.
+        assert!(!event_source_factory.has_event_source(&root).await);
+        dispatch_resolved_event(&hooks, &root).await.unwrap();
+        assert!(event_source_factory.has_event_source(&root).await);
+        // Verify that destroying the component destroys the EventSource.
+        dispatch_destroyed_event(&hooks, &root).await.unwrap();
+        assert!(!event_source_factory.has_event_source(&root).await);
+    }
+}
diff --git a/src/sys/component_manager/src/model/events/stream.rs b/src/sys/component_manager/src/model/events/stream.rs
new file mode 100644
index 0000000..7ea6ef5
--- /dev/null
+++ b/src/sys/component_manager/src/model/events/stream.rs
@@ -0,0 +1,74 @@
+// Copyright 2020 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    crate::model::{
+        events::{
+            dispatcher::EventDispatcher,
+            event::{Event, SyncMode},
+        },
+        hooks::EventType,
+        moniker::AbsoluteMoniker,
+    },
+    fuchsia_trace as trace,
+    futures::{channel::mpsc, StreamExt},
+    std::{
+        collections::HashSet,
+        sync::{Arc, Weak},
+    },
+};
+
+pub struct EventStream {
+    /// The receiving end of a channel of Events.
+    rx: mpsc::Receiver<Event>,
+    /// The sending end of a channel of Events.
+    tx: mpsc::Sender<Event>,
+    /// A vector of EventDispatchers to this EventStream.
+    /// EventStream assumes ownership of the dispatchers. They are
+    /// destroyed when this EventStream is destroyed.
+    dispatchers: Vec<Arc<EventDispatcher>>,
+}
+
+impl EventStream {
+    pub fn new() -> Self {
+        let (tx, rx) = mpsc::channel(2);
+        Self { rx, tx, dispatchers: vec![] }
+    }
+
+    pub fn create_dispatcher(
+        &mut self,
+        sync_mode: SyncMode,
+        scope_monikers: HashSet<AbsoluteMoniker>,
+    ) -> Weak<EventDispatcher> {
+        let dispatcher =
+            Arc::new(EventDispatcher::new(sync_mode.clone(), scope_monikers, self.tx.clone()));
+        self.dispatchers.push(dispatcher.clone());
+        Arc::downgrade(&dispatcher)
+    }
+
+    /// Receives the next event from the sender.
+    pub async fn next(&mut self) -> Option<Event> {
+        trace::duration!("component_manager", "events:next");
+        self.rx.next().await
+    }
+
+    /// Waits for an event with a particular EventType against a component with a
+    /// particular moniker. Ignores all other events.
+    pub async fn wait_until(
+        &mut self,
+        expected_event_type: EventType,
+        expected_moniker: AbsoluteMoniker,
+    ) -> Option<Event> {
+        while let Some(event) = self.next().await {
+            let actual_event_type = event.event.payload.type_();
+            if expected_moniker == event.event.target_moniker
+                && expected_event_type == actual_event_type
+            {
+                return Some(event);
+            }
+            event.resume();
+        }
+        None
+    }
+}
diff --git a/src/sys/component_manager/src/model/testing/routing_test_helpers.rs b/src/sys/component_manager/src/model/testing/routing_test_helpers.rs
index c04ebaf..f1421ce 100644
--- a/src/sys/component_manager/src/model/testing/routing_test_helpers.rs
+++ b/src/sys/component_manager/src/model/testing/routing_test_helpers.rs
@@ -639,7 +639,7 @@
 /// Contains functions to use capabilities in routing tests.
 pub mod capability_util {
     use {
-        super::*, crate::model::events::core::EVENT_SOURCE_SYNC_SERVICE_PATH,
+        super::*, crate::model::events::source_factory::EVENT_SOURCE_SYNC_SERVICE_PATH,
         cm_rust::NativeIntoFidl, fidl::endpoints::ServiceMarker,
         fidl_fuchsia_sys2::BlockingEventSourceMarker, std::path::PathBuf,
     };
diff --git a/src/sys/component_manager/src/model/tests/routing.rs b/src/sys/component_manager/src/model/tests/routing.rs
index 6e5b0ef..24dbdba 100644
--- a/src/sys/component_manager/src/model/tests/routing.rs
+++ b/src/sys/component_manager/src/model/tests/routing.rs
@@ -8,7 +8,7 @@
         framework::REALM_SERVICE,
         model::{
             error::ModelError,
-            events::core::EVENT_SOURCE_SYNC_SERVICE_PATH,
+            events::source_factory::EVENT_SOURCE_SYNC_SERVICE_PATH,
             hooks::{Event, EventPayload, EventType, Hook, HooksRegistration},
             moniker::AbsoluteMoniker,
             rights, routing,