blob: 6351c9a60a9b53ae738700e4e4470f5f00b28fc8 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::model::{
error::ModelError,
events::{
error::EventsError,
registry::{
EventRegistry, EventSubscription, ExecutionMode, SubscriptionOptions,
SubscriptionType,
},
serve::serve_event_stream,
},
hooks::{Event, EventPayload, EventType, Hook, HooksRegistration},
},
async_trait::async_trait,
cm_rust::{CapabilityName, ComponentDecl, EventMode, UseDecl, UseEventStreamDecl},
fidl::endpoints::{create_endpoints, ServerEnd},
fidl_fuchsia_sys2 as fsys, fuchsia_async as fasync,
futures::lock::Mutex,
moniker::{AbsoluteMoniker, ExtendedMoniker},
std::{
collections::HashMap,
sync::{Arc, Weak},
},
};
pub struct EventStreamAttachment {
/// The name of this event stream.
name: String,
/// The server end of a component's event stream.
server_end: ServerEnd<fsys::EventStreamMarker>,
}
/// Creates EventStreams on component resolution according to statically declared
/// event_streams, and passes them along to components on start.
pub struct EventStreamProvider {
/// A shared reference to the event registry used to subscribe and dispatch events.
registry: Weak<EventRegistry>,
/// A mapping from a component instance's AbsoluteMoniker, to the set of
/// event streams and their corresponding paths in the component instance's out directory.
streams: Arc<Mutex<HashMap<ExtendedMoniker, Vec<EventStreamAttachment>>>>,
/// The mode in which component manager is running.
execution_mode: ExecutionMode,
}
impl EventStreamProvider {
pub fn new(registry: Weak<EventRegistry>, execution_mode: ExecutionMode) -> Self {
Self { registry, streams: Arc::new(Mutex::new(HashMap::new())), execution_mode }
}
pub fn hooks(self: &Arc<Self>) -> Vec<HooksRegistration> {
vec![HooksRegistration::new(
"EventStreamProvider",
vec![EventType::Destroyed, EventType::Resolved],
Arc::downgrade(self) as Weak<dyn Hook>,
)]
}
/// Returns the server end of the event stream with provided `name` associated with
/// the component with the provided `target_moniker`. This method returns None if such a stream
/// does not exist or the channel has already been taken.
pub async fn take_static_event_stream(
&self,
target_moniker: &ExtendedMoniker,
stream_name: String,
) -> Option<ServerEnd<fsys::EventStreamMarker>> {
let mut streams = self.streams.lock().await;
if let Some(event_streams) = streams.get_mut(&target_moniker) {
if let Some(pos) =
event_streams.iter().position(|event_stream| event_stream.name == stream_name)
{
let event_stream = event_streams.remove(pos);
return Some(event_stream.server_end);
}
}
return None;
}
/// Creates a static EventStream listening for the specified `events` for a given |target_moniker|
/// component and with the provided `target_path`.
pub async fn create_static_event_stream(
self: &Arc<Self>,
target_moniker: &ExtendedMoniker,
stream_name: String,
subscriptions: Vec<EventSubscription>,
) -> Result<fasync::Task<()>, ModelError> {
let registry = self.registry.upgrade().ok_or(EventsError::RegistryNotFound)?;
let subscription_type = match target_moniker {
ExtendedMoniker::ComponentManager => SubscriptionType::AboveRoot,
ExtendedMoniker::ComponentInstance(abs_moniker) => {
SubscriptionType::Component(abs_moniker.clone())
}
};
let options = SubscriptionOptions::new(subscription_type, self.execution_mode.clone());
let event_stream = registry.subscribe(&options, subscriptions).await?;
let mut streams = self.streams.lock().await;
let event_streams = streams.entry(target_moniker.clone()).or_insert(vec![]);
let (client_end, server_end) = create_endpoints::<fsys::EventStreamMarker>().unwrap();
event_streams.push(EventStreamAttachment { name: stream_name, server_end });
Ok(fasync::Task::spawn(async move {
serve_event_stream(event_stream, client_end).await;
}))
}
async fn on_component_destroyed(
self: &Arc<Self>,
target_moniker: &AbsoluteMoniker,
) -> Result<(), ModelError> {
let mut streams = self.streams.lock().await;
// Remove all event streams associated with the `target_moniker` component.
streams.remove(&ExtendedMoniker::ComponentInstance(target_moniker.clone()));
Ok(())
}
async fn on_component_resolved(
self: &Arc<Self>,
target_moniker: &AbsoluteMoniker,
decl: &ComponentDecl,
) -> Result<(), ModelError> {
for use_decl in &decl.uses {
match use_decl {
UseDecl::EventStream(UseEventStreamDecl { name, subscriptions }) => {
self.create_static_event_stream(
&ExtendedMoniker::ComponentInstance(target_moniker.clone()),
name.to_string(),
subscriptions
.iter()
.map(|subscription| EventSubscription {
event_name: CapabilityName::from(subscription.event_name.clone()),
mode: match subscription.mode {
cm_rust::EventMode::Sync => EventMode::Sync,
_ => EventMode::Async,
},
})
.collect(),
)
.await?
.detach();
}
_ => {}
}
}
Ok(())
}
}
#[async_trait]
impl Hook for EventStreamProvider {
async fn on(self: Arc<Self>, event: &Event) -> Result<(), ModelError> {
let target_moniker = event
.target_moniker
.unwrap_instance_moniker_or(ModelError::UnexpectedComponentManagerMoniker)?;
match &event.result {
Ok(EventPayload::Destroyed) => {
self.on_component_destroyed(target_moniker).await?;
}
Ok(EventPayload::Resolved { decl, .. }) => {
self.on_component_resolved(target_moniker, decl).await?;
}
_ => {}
}
Ok(())
}
}