blob: 5659489ae49a7a86795b8f934d407999fe0b243c [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
capability::{CapabilitySource, InternalCapability},
model::{
component::{ComponentInstance, InstanceState},
error::ModelError,
events::{
dispatcher::{EventDispatcher, EventDispatcherScope},
error::EventsError,
filter::EventFilter,
mode_set::EventModeSet,
stream::EventStream,
synthesizer::{EventSynthesisProvider, EventSynthesizer},
},
hooks::{
Event as ComponentEvent, EventPayload, EventType, HasEventType, Hook,
HooksRegistration,
},
model::Model,
routing::{route_capability, RouteRequest, RouteSource},
},
},
async_trait::async_trait,
cm_rust::{CapabilityName, EventMode, UseDecl, UseEventDecl},
fuchsia_trace as trace,
futures::lock::Mutex,
moniker::{AbsoluteMoniker, ExtendedMoniker},
std::{
collections::HashMap,
sync::{Arc, Weak},
},
};
#[derive(Debug)]
pub struct RoutedEvent {
pub source_name: CapabilityName,
pub mode: EventMode,
pub scopes: Vec<EventDispatcherScope>,
}
#[derive(Debug)]
pub struct RequestedEventState {
pub mode: EventMode,
pub scopes: Vec<EventDispatcherScope>,
}
impl RequestedEventState {
pub fn new(mode: EventMode) -> Self {
Self { mode, scopes: Vec::new() }
}
}
#[derive(Debug)]
pub struct RouteEventsResult {
/// Maps from source name to a mode and set of scope monikers.
mapping: HashMap<CapabilityName, RequestedEventState>,
}
impl RouteEventsResult {
fn new() -> Self {
Self { mapping: HashMap::new() }
}
fn insert(
&mut self,
source_name: CapabilityName,
mode: EventMode,
scope: EventDispatcherScope,
) {
let event_state = self.mapping.entry(source_name).or_insert(RequestedEventState::new(mode));
if !event_state.scopes.contains(&scope) {
event_state.scopes.push(scope);
}
}
pub fn len(&self) -> usize {
self.mapping.len()
}
pub fn contains_event(&self, event_name: &CapabilityName) -> bool {
self.mapping.contains_key(event_name)
}
pub fn to_vec(self) -> Vec<RoutedEvent> {
self.mapping
.into_iter()
.map(|(source_name, state)| RoutedEvent {
source_name,
mode: state.mode,
scopes: state.scopes,
})
.collect()
}
}
#[derive(Clone)]
pub struct SubscriptionOptions {
/// Determines how event routing is done.
pub subscription_type: SubscriptionType,
/// Specifies the mode ComponentManager was started in.
pub execution_mode: ExecutionMode,
}
#[derive(Debug)]
pub struct EventSubscription {
pub event_name: CapabilityName,
/// Determines whether component manager waits for a response from the
/// event receiver.
pub mode: EventMode,
}
impl EventSubscription {
pub fn new(event_name: CapabilityName, mode: EventMode) -> Self {
Self { event_name, mode }
}
}
impl SubscriptionOptions {
pub fn new(subscription_type: SubscriptionType, execution_mode: ExecutionMode) -> Self {
Self { subscription_type, execution_mode }
}
}
impl Default for SubscriptionOptions {
fn default() -> SubscriptionOptions {
SubscriptionOptions {
subscription_type: SubscriptionType::Component(AbsoluteMoniker::root()),
execution_mode: ExecutionMode::Production,
}
}
}
#[derive(Clone, PartialEq, Eq)]
pub enum SubscriptionType {
/// Indicates that a client above the root is subscribing to events (e.g. a test).
/// Event routing will be bypassed and all events can be subscribed.
AboveRoot,
/// Indicates that a component is subscribing to events and the target is
/// the provided AbsoluteMoniker.
Component(AbsoluteMoniker),
}
#[derive(Clone)]
pub enum ExecutionMode {
/// Indicates that the component manager is running in Debug mode. This
/// enables some additional events such as CapabilityRouted.
Debug,
/// Indicates that the component manager is running in Production mode.
Production,
}
impl ExecutionMode {
pub fn is_debug(&self) -> bool {
match self {
ExecutionMode::Debug => true,
ExecutionMode::Production => false,
}
}
}
/// Subscribes to events from multiple tasks and sends events to all of them.
pub struct EventRegistry {
model: Weak<Model>,
dispatcher_map: Arc<Mutex<HashMap<CapabilityName, Vec<Weak<EventDispatcher>>>>>,
event_synthesizer: EventSynthesizer,
}
impl EventRegistry {
pub fn new(model: Weak<Model>) -> Self {
let event_synthesizer = EventSynthesizer::new(model.clone());
Self { model, dispatcher_map: Arc::new(Mutex::new(HashMap::new())), event_synthesizer }
}
pub fn hooks(self: &Arc<Self>) -> Vec<HooksRegistration> {
vec![
// This hook must be registered with all events.
// However, a task will only receive events to which it subscribed.
HooksRegistration::new(
"EventRegistry",
EventType::values(),
Arc::downgrade(self) as Weak<dyn Hook>,
),
]
}
/// Register a provider for an synthesized event.
pub fn register_synthesis_provider(
&mut self,
event: EventType,
provider: Arc<dyn EventSynthesisProvider>,
) {
self.event_synthesizer.register_provider(event, provider);
}
/// Subscribes to events of a provided set of EventTypes.
pub async fn subscribe(
&self,
options: &SubscriptionOptions,
subscriptions: Vec<EventSubscription>,
) -> Result<EventStream, ModelError> {
// Register event capabilities if any. It identifies the sources of these events (might be
// the parent or this component itself). It consturcts an "allow-list tree" of events and
// component instances.
let mut event_names = HashMap::new();
for subscription in subscriptions {
if event_names
.insert(subscription.event_name.clone(), subscription.mode.clone())
.is_some()
{
return Err(EventsError::duplicate_event(subscription.event_name).into());
}
}
let events = match &options.subscription_type {
SubscriptionType::AboveRoot => event_names
.iter()
.map(|(source_name, mode)| RoutedEvent {
source_name: source_name.clone(),
mode: mode.clone(),
scopes: vec![
EventDispatcherScope::new(AbsoluteMoniker::root().into()).for_debug()
],
})
.collect(),
SubscriptionType::Component(target_moniker) => {
let route_result = self.route_events(&target_moniker, &event_names).await?;
if route_result.len() != event_names.len() {
let names = event_names
.keys()
.into_iter()
.filter(|event_name| !route_result.contains_event(&event_name))
.cloned()
.collect();
return Err(EventsError::not_available(names).into());
}
route_result.to_vec()
}
};
self.subscribe_with_routed_events(&options, events).await
}
pub async fn subscribe_with_routed_events(
&self,
options: &SubscriptionOptions,
events: Vec<RoutedEvent>,
) -> Result<EventStream, ModelError> {
// TODO(fxbug.dev/48510): get rid of this channel and use FIDL directly.
let mut event_stream = EventStream::new();
let mut dispatcher_map = self.dispatcher_map.lock().await;
for event in &events {
if EventType::synthesized_only()
.iter()
.all(|e| e.to_string() != event.source_name.str())
{
let dispatchers = dispatcher_map.entry(event.source_name.clone()).or_insert(vec![]);
let dispatcher = event_stream.create_dispatcher(
options.clone(),
event.mode.clone(),
event.scopes.clone(),
);
dispatchers.push(dispatcher);
}
}
let events = events.into_iter().map(|event| (event.source_name, event.scopes)).collect();
self.event_synthesizer.spawn_synthesis(event_stream.sender(), events);
Ok(event_stream)
}
// TODO(fxbug.dev/48510): get rid of this
/// Sends the event to all dispatchers and waits to be unblocked by all
async fn dispatch(&self, event: &ComponentEvent) -> Result<(), ModelError> {
// Copy the senders so we don't hold onto the sender map lock
// If we didn't do this, it is possible to deadlock while holding onto this lock.
// For example,
// Task A : call dispatch(event1) -> lock on sender map -> send -> wait for responders
// Task B : call dispatch(event2) -> lock on sender map
// If task B was required to respond to event1, then this is a deadlock.
// Neither task can make progress.
let dispatchers = {
let mut dispatcher_map = self.dispatcher_map.lock().await;
if let Some(dispatchers) = dispatcher_map.get_mut(&event.event_type().into()) {
let mut strong_dispatchers = vec![];
dispatchers.retain(|dispatcher| {
if let Some(dispatcher) = dispatcher.upgrade() {
strong_dispatchers.push(dispatcher);
true
} else {
false
}
});
strong_dispatchers
} else {
// There were no senders for this event. Do nothing.
return Ok(());
}
};
let mut responder_channels = vec![];
for dispatcher in dispatchers {
let result = dispatcher.dispatch(event).await;
match result {
Ok(Some(responder_channel)) => {
// A future can be canceled if the EventStream was dropped after
// a send. We don't crash the system when this happens. It is
// perfectly valid for a EventStream to be dropped. That simply
// means that the EventStream is no longer interested in future
// events. So we force each future to return a success. This
// ensures that all the futures can be driven to completion.
let responder_channel = async move {
trace::duration!("component_manager", "events:wait_for_resume");
let _ = responder_channel.await;
trace::flow_end!("component_manager", "event", event.id);
};
responder_channels.push(responder_channel);
}
// There's nothing to do if event is outside the scope of the given
// `EventDispatcher`.
Ok(None) => (),
Err(_) => {
// A send can fail if the EventStream was dropped. We don't
// crash the system when this happens. It is perfectly
// valid for a EventStream to be dropped. That simply means
// that the EventStream is no longer interested in future
// events.
}
}
}
// Wait until all tasks have used the responder to unblock.
{
trace::duration!("component_manager", "events:wait_for_all_resume");
futures::future::join_all(responder_channels).await;
}
Ok(())
}
pub async fn route_events(
&self,
target_moniker: &AbsoluteMoniker,
events: &HashMap<CapabilityName, EventMode>,
) -> Result<RouteEventsResult, ModelError> {
let model = self.model.upgrade().ok_or(ModelError::ModelNotAvailable)?;
let component = model.look_up(&target_moniker).await?;
let decl = {
let state = component.lock_state().await;
match *state {
InstanceState::New | InstanceState::Discovered => {
panic!("route_events: not resolved");
}
InstanceState::Resolved(ref s) => s.decl().clone(),
InstanceState::Destroyed => {
return Err(ModelError::instance_not_found(target_moniker.clone()));
}
}
};
let mut result = RouteEventsResult::new();
for use_decl in decl.uses {
match use_decl {
UseDecl::Event(event_decl) => {
if let Some(mode) = events.get(&event_decl.target_name) {
let (source_name, scope_moniker) =
Self::route_event(event_decl.clone(), &component).await?;
let scope = EventDispatcherScope::new(scope_moniker)
.with_filter(EventFilter::new(event_decl.filter))
.with_mode_set(EventModeSet::new(event_decl.mode));
if scope.mode_set.supports_mode(mode) {
result.insert(source_name, mode.clone(), scope);
}
}
}
_ => {}
}
}
Ok(result)
}
/// Routes an event and returns its source name and scope on success.
async fn route_event(
event_decl: UseEventDecl,
component: &Arc<ComponentInstance>,
) -> Result<(CapabilityName, ExtendedMoniker), ModelError> {
let route_source = route_capability(RouteRequest::UseEvent(event_decl), component).await?;
match route_source {
RouteSource::Event(CapabilitySource::Framework {
capability: InternalCapability::Event(source_name),
component,
}) => Ok((source_name, component.moniker.into())),
RouteSource::Event(CapabilitySource::Builtin {
capability: InternalCapability::Event(source_name),
..
}) if source_name == "capability_ready".into() => {
Ok((source_name, ExtendedMoniker::ComponentManager))
}
_ => unreachable!(),
}
}
#[cfg(test)]
async fn dispatchers_per_event_type(&self, event_type: EventType) -> usize {
let dispatcher_map = self.dispatcher_map.lock().await;
dispatcher_map
.get(&event_type.into())
.map(|dispatchers| dispatchers.len())
.unwrap_or_default()
}
}
#[async_trait]
impl Hook for EventRegistry {
async fn on(self: Arc<Self>, event: &ComponentEvent) -> Result<(), ModelError> {
match &event.result {
Ok(EventPayload::CapabilityRouted { source, .. }) => {
// Only dispatch the CapabilityRouted event for capabilities
// that can be in a component's namespace.
// TODO(fxbug.dev/54251): In the future, if we wish to be able to mock or
// interpose runners, we can introduce, a new, separate event
// type.
if source.can_be_in_namespace() {
return self.dispatch(event).await;
}
return Ok(());
}
_ => self.dispatch(event).await,
}
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::{
capability::ComponentCapability,
model::{
component::ComponentInstance,
environment::Environment,
events::event::Event,
hooks::{Event as ComponentEvent, EventError, EventErrorPayload, EventPayload},
testing::test_helpers::{TestModelResult, *},
},
},
::routing::{
component_instance::ComponentInstanceInterface, error::ComponentInstanceError,
},
cm_rust::ProtocolDecl,
fuchsia_async as fasync, fuchsia_zircon as zx,
matches::assert_matches,
moniker::AbsoluteMoniker,
};
async fn dispatch_capability_requested_event(
registry: &EventRegistry,
) -> Result<(), ModelError> {
let (_, capability_server_end) = zx::Channel::create().unwrap();
let capability_server_end = Arc::new(Mutex::new(Some(capability_server_end)));
let event = ComponentEvent::new_for_test(
AbsoluteMoniker::root(),
"fuchsia-pkg://root",
Ok(EventPayload::CapabilityRequested {
source_moniker: AbsoluteMoniker::root(),
name: "foo".to_string(),
capability: capability_server_end,
}),
);
registry.dispatch(&event).await
}
async fn dispatch_fake_event(registry: &EventRegistry) -> Result<(), ModelError> {
let event = ComponentEvent::new_for_test(
AbsoluteMoniker::root(),
"fuchsia-pkg://root",
Ok(EventPayload::Discovered),
);
registry.dispatch(&event).await
}
async fn dispatch_error_event(registry: &EventRegistry) -> Result<(), ModelError> {
let root = AbsoluteMoniker::root();
let event = ComponentEvent::new_for_test(
root.clone(),
"fuchsia-pkg://root",
Err(EventError::new(
&ModelError::instance_not_found(root.clone()),
EventErrorPayload::Resolved,
)),
);
registry.dispatch(&event).await
}
#[fuchsia::test]
async fn capability_routed_dispatch() -> Result<(), ModelError> {
let TestModelResult { model, .. } = TestEnvironmentBuilder::new().build().await;
let registry = EventRegistry::new(Arc::downgrade(&model));
let mut event_stream = registry
.subscribe(
&SubscriptionOptions::new(SubscriptionType::AboveRoot, ExecutionMode::Debug),
vec![EventSubscription::new(EventType::CapabilityRouted.into(), EventMode::Sync)],
)
.await
.expect("subscribe succeeds");
assert_eq!(1, registry.dispatchers_per_event_type(EventType::CapabilityRouted).await);
let component = ComponentInstance::new_root(
Environment::empty(),
Weak::new(),
Weak::new(),
"test:///root".to_string(),
);
let capability = ComponentCapability::Protocol(ProtocolDecl {
name: "foo".into(),
source_path: "/svc/foo".parse().unwrap(),
});
let source = CapabilitySource::Component {
capability: capability.clone(),
component: component.as_weak(),
};
let capability_provider = Arc::new(Mutex::new(None));
let event = ComponentEvent::new_for_test(
AbsoluteMoniker::root(),
"fuchsia-pkg://root",
Ok(EventPayload::CapabilityRouted { source: source.clone(), capability_provider }),
);
fasync::Task::spawn(async move {
registry.dispatch(&event).await.expect("failed dispatch");
})
.detach();
let event = event_stream.next().await.expect("null event");
assert_matches!(event, Event {
event: ComponentEvent {
result: Ok(EventPayload::CapabilityRouted {
source: CapabilitySource::Component {
capability, ..
},
..
}),
..
},
scope_moniker,
..
} if capability == capability && scope_moniker == AbsoluteMoniker::root().into());
Ok(())
}
#[fuchsia::test]
async fn drop_dispatcher_when_event_stream_dropped() {
let TestModelResult { model, .. } = TestEnvironmentBuilder::new().build().await;
let event_registry = EventRegistry::new(Arc::downgrade(&model));
assert_eq!(0, event_registry.dispatchers_per_event_type(EventType::Discovered).await);
let mut event_stream_a = event_registry
.subscribe(
&SubscriptionOptions::new(SubscriptionType::AboveRoot, ExecutionMode::Production),
vec![EventSubscription::new(EventType::Discovered.into(), EventMode::Async)],
)
.await
.expect("subscribe succeeds");
assert_eq!(1, event_registry.dispatchers_per_event_type(EventType::Discovered).await);
let mut event_stream_b = event_registry
.subscribe(
&SubscriptionOptions::new(SubscriptionType::AboveRoot, ExecutionMode::Production),
vec![EventSubscription::new(EventType::Discovered.into(), EventMode::Async)],
)
.await
.expect("subscribe succeeds");
assert_eq!(2, event_registry.dispatchers_per_event_type(EventType::Discovered).await);
dispatch_fake_event(&event_registry).await.unwrap();
// Verify that both EventStreams receive the event.
assert!(event_stream_a.next().await.is_some());
assert!(event_stream_b.next().await.is_some());
assert_eq!(2, event_registry.dispatchers_per_event_type(EventType::Discovered).await);
drop(event_stream_a);
// EventRegistry won't drop EventDispatchers until an event is dispatched.
assert_eq!(2, event_registry.dispatchers_per_event_type(EventType::Discovered).await);
dispatch_fake_event(&event_registry).await.unwrap();
assert!(event_stream_b.next().await.is_some());
assert_eq!(1, event_registry.dispatchers_per_event_type(EventType::Discovered).await);
drop(event_stream_b);
dispatch_fake_event(&event_registry).await.unwrap();
assert_eq!(0, event_registry.dispatchers_per_event_type(EventType::Discovered).await);
}
#[fuchsia::test]
async fn event_error_dispatch() {
let TestModelResult { model, .. } = TestEnvironmentBuilder::new().build().await;
let event_registry = EventRegistry::new(Arc::downgrade(&model));
assert_eq!(0, event_registry.dispatchers_per_event_type(EventType::Resolved).await);
let mut event_stream = event_registry
.subscribe(
&SubscriptionOptions::new(SubscriptionType::AboveRoot, ExecutionMode::Production),
vec![EventSubscription::new(EventType::Resolved.into(), EventMode::Async)],
)
.await
.expect("subscribed to event stream");
assert_eq!(1, event_registry.dispatchers_per_event_type(EventType::Resolved).await);
dispatch_error_event(&event_registry).await.unwrap();
let event = event_stream.next().await.map(|e| e.event).unwrap();
// Verify that we received the event error.
assert_matches!(
event.result,
Err(EventError {
source: ModelError::ComponentInstanceError {
err: ComponentInstanceError::InstanceNotFound { .. }
},
event_error_payload: EventErrorPayload::Resolved,
})
);
}
#[fuchsia::test]
async fn capability_requested_over_two_event_streams() {
let TestModelResult { model, .. } = TestEnvironmentBuilder::new().build().await;
let event_registry = EventRegistry::new(Arc::downgrade(&model));
assert_eq!(
0,
event_registry.dispatchers_per_event_type(EventType::CapabilityRequested).await
);
let options =
SubscriptionOptions::new(SubscriptionType::AboveRoot, ExecutionMode::Production);
let mut event_stream_a = event_registry
.subscribe(
&options,
vec![EventSubscription::new(
EventType::CapabilityRequested.into(),
EventMode::Async,
)],
)
.await
.expect("subscribe succeeds");
assert_eq!(
1,
event_registry.dispatchers_per_event_type(EventType::CapabilityRequested).await
);
let mut event_stream_b = event_registry
.subscribe(
&options,
vec![EventSubscription::new(
EventType::CapabilityRequested.into(),
EventMode::Async,
)],
)
.await
.expect("subscribe succeeds");
assert_eq!(
2,
event_registry.dispatchers_per_event_type(EventType::CapabilityRequested).await
);
dispatch_capability_requested_event(&event_registry).await.unwrap();
let event_a = event_stream_a.next().await.map(|e| e.event).unwrap();
// Verify that we received a valid CapabilityRequested event.
assert_matches!(event_a.result, Ok(EventPayload::CapabilityRequested { .. }));
let event_b = event_stream_b.next().await.map(|e| e.event).unwrap();
// Verify that we received the event error.
assert_matches!(
event_b.result,
Err(EventError {
event_error_payload: EventErrorPayload::CapabilityRequested { .. },
..
})
);
}
}