blob: cc6780260e1bd430b6ca821bd8a4ab8751a0c818 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::model::events::{event::Event, registry::ComponentEventRoute},
::routing::event::EventFilter,
anyhow::{format_err, Error},
cm_rust::DictionaryValue,
futures::{channel::mpsc, lock::Mutex, sink::SinkExt},
hooks::{Event as ComponentEvent, EventPayload, TransferEvent},
maplit::btreemap,
moniker::{ExtendedMoniker, MonikerBase},
};
/// EventDispatcher and EventStream are two ends of a channel.
///
/// EventDispatcher represents the sending end of the channel.
///
/// An EventDispatcher receives events of a particular event type,
/// and dispatches though events out to the EventStream if they fall within
/// one of the scopes associated with the dispatcher.
///
/// EventDispatchers are owned by EventStreams. If an EventStream is dropped,
/// all corresponding EventDispatchers are dropped.
///
/// An EventStream is owned by the client - usually a test harness or a
/// EventSource. It receives a Event from an EventDispatcher and propagates it
/// to the client.
pub struct EventDispatcher {
// The moniker of the component subscribing to events.
subscriber: ExtendedMoniker,
/// Specifies the realms that this EventDispatcher can dispatch events from and under what
/// conditions.
scopes: Vec<EventDispatcherScope>,
/// An `mpsc::Sender` used to dispatch an event. Note that this
/// `mpsc::Sender` is wrapped in an Mutex<..> to allow it to be passed along
/// to other tasks for dispatch. The Event is a lifecycle event that occurred,
/// and the Option<Vec<ComponentEventRoute>> is the path that the event
/// took (if applicable) to reach the destination. This route
/// is used for dynamic permission checks (to filter events a component shouldn't have
/// access to), and to rebase the moniker of the event.
tx: Mutex<mpsc::UnboundedSender<(Event, Option<Vec<ComponentEventRoute>>)>>,
/// Route information used externally for evaluating scopes
// TODO(https://fxbug.dev/332389972): Remove or explain #[allow(dead_code)].
#[allow(dead_code)]
pub route: Vec<ComponentEventRoute>,
}
impl EventDispatcher {
#[cfg(test)]
pub fn new(
subscriber: ExtendedMoniker,
scopes: Vec<EventDispatcherScope>,
tx: mpsc::UnboundedSender<(Event, Option<Vec<ComponentEventRoute>>)>,
) -> Self {
Self::new_with_route(subscriber, scopes, tx, vec![])
}
pub fn new_with_route(
subscriber: ExtendedMoniker,
scopes: Vec<EventDispatcherScope>,
tx: mpsc::UnboundedSender<(Event, Option<Vec<ComponentEventRoute>>)>,
route: Vec<ComponentEventRoute>,
) -> Self {
// TODO(https://fxbug.dev/42125209): flatten scope_monikers. There might be monikers that are
// contained within another moniker in the list.
Self { subscriber, scopes, tx: Mutex::new(tx), route }
}
/// Sends the event to an event stream, if fired in the scope of `scope_moniker`. Returns
/// a responder which can be blocked on.
pub async fn dispatch(&self, event: &ComponentEvent) -> Result<(), Error> {
let maybe_scope = self.find_scope(&event);
if maybe_scope.is_none() {
return Err(format_err!("Could not find scope for event"));
}
let scope_moniker = maybe_scope.unwrap().moniker.clone();
let mut tx = self.tx.lock().await;
tx.send((Event { event: event.transfer().await, scope_moniker }, None)).await?;
Ok(())
}
fn find_scope(&self, event: &ComponentEvent) -> Option<&EventDispatcherScope> {
// TODO(https://fxbug.dev/42125209): once flattening of monikers is done, we would expect to have a single
// moniker here. For now taking the first one and ignoring the rest.
// Ensure that the event is coming from a realm within the scope of this dispatcher and
// matching the path filter if one exists.
self.scopes.iter().filter(|scope| scope.contains(&self.subscriber, &event)).next()
}
}
/// A scope for dispatching and filters on that scope.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EventDispatcherScope {
/// The moniker of the realm
pub moniker: ExtendedMoniker,
/// Filters for an event in that realm.
pub filter: EventFilter,
}
impl EventDispatcherScope {
pub fn new(moniker: ExtendedMoniker) -> Self {
Self { moniker, filter: EventFilter::new(None) }
}
pub fn with_filter(mut self, filter: EventFilter) -> Self {
self.filter = filter;
self
}
/// For the top-level EventStreams and event strems used in unit tests in the c_m codebase we
/// don't take filters into account.
pub fn for_debug(mut self) -> Self {
self.filter = EventFilter::debug();
self
}
/// Given the subscriber, indicates whether or not the event is contained
/// in this scope.
pub fn contains(&self, subscriber: &ExtendedMoniker, event: &ComponentEvent) -> bool {
let in_scope = match &event.payload {
EventPayload::CapabilityRequested { source_moniker, .. } => match &subscriber {
ExtendedMoniker::ComponentManager => true,
ExtendedMoniker::ComponentInstance(target) => *source_moniker == *target,
},
_ => {
let contained_in_realm = event.target_moniker.has_prefix(&self.moniker);
let is_component_instance = matches!(
&event.target_moniker,
ExtendedMoniker::ComponentInstance(instance) if instance.is_root()
);
contained_in_realm || is_component_instance
}
};
if !in_scope {
return false;
}
// TODO(fxbug/122227): Creating hashmaps on every lookup is not ideal, but in practice this
// likely doesn't happen too often.
let filterable_fields = match &event.payload {
EventPayload::CapabilityRequested { name, .. } => Some(btreemap! {
"name".to_string() => DictionaryValue::Str(name.into())
}),
_ => None,
};
self.filter.has_fields(&filterable_fields)
}
}
#[cfg(test)]
mod tests {
use {
super::*, assert_matches::assert_matches, fuchsia_zircon as zx, futures::StreamExt,
hooks::CapabilityReceiver, moniker::Moniker, sandbox::Message, std::sync::Arc,
};
struct EventDispatcherFactory {
/// The receiving end of a channel of Events.
rx: mpsc::UnboundedReceiver<(Event, Option<Vec<ComponentEventRoute>>)>,
/// The sending end of a channel of Events.
tx: mpsc::UnboundedSender<(Event, Option<Vec<ComponentEventRoute>>)>,
}
impl EventDispatcherFactory {
fn new() -> Self {
let (tx, rx) = mpsc::unbounded();
Self { rx, tx }
}
/// Receives the next event from the sender.
pub async fn next_event(&mut self) -> Option<ComponentEvent> {
self.rx.next().await.map(|(e, _)| e.event)
}
fn create_dispatcher(&self, subscriber: ExtendedMoniker) -> Arc<EventDispatcher> {
let scopes = vec![EventDispatcherScope::new(Moniker::root().into()).for_debug()];
Arc::new(EventDispatcher::new(subscriber, scopes, self.tx.clone()))
}
}
async fn dispatch_capability_requested_event(
dispatcher: &EventDispatcher,
source_moniker: &Moniker,
) -> Result<(), Error> {
let (_, capability_server_end) = zx::Channel::create();
let (receiver, sender) = CapabilityReceiver::new();
let event = ComponentEvent {
target_moniker: ExtendedMoniker::ComponentInstance(Moniker::root()),
component_url: "fuchsia-pkg://root".parse().unwrap(),
payload: EventPayload::CapabilityRequested {
source_moniker: source_moniker.clone(),
name: "foo".to_string(),
receiver,
},
timestamp: zx::Time::get_monotonic(),
};
sender.send(Message { channel: capability_server_end }).unwrap();
dispatcher.dispatch(&event).await
}
// This test verifies that the CapabilityRequested event can only be sent to a source
// that matches its source moniker.
#[fuchsia::test]
async fn can_send_capability_requested_to_source() {
// Verify we can dispatch to a debug source.
// Sync events get a responder if the message was dispatched.
let mut factory = EventDispatcherFactory::new();
let dispatcher = factory.create_dispatcher(ExtendedMoniker::ComponentManager);
let source_moniker = vec!["root:0", "a:0", "b:0", "c:0"].try_into().unwrap();
assert!(dispatch_capability_requested_event(&dispatcher, &source_moniker).await.is_ok());
assert_matches!(
factory.next_event().await,
Some(ComponentEvent { payload: EventPayload::CapabilityRequested { .. }, .. })
);
// Verify that we cannot dispatch the CapabilityRequested event to the root component.
let subscriber = ExtendedMoniker::ComponentInstance(vec!["root:0"].try_into().unwrap());
let dispatcher = factory.create_dispatcher(subscriber);
assert!(dispatch_capability_requested_event(&dispatcher, &source_moniker).await.is_err());
// Verify that we cannot dispatch the CapabilityRequested event to the root:0/a:0 component.
let subscriber =
ExtendedMoniker::ComponentInstance(vec!["root:0", "a:0"].try_into().unwrap());
let dispatcher = factory.create_dispatcher(subscriber);
assert!(dispatch_capability_requested_event(&dispatcher, &source_moniker).await.is_err());
// Verify that we cannot dispatch the CapabilityRequested event to the root:0/a:0/b:0 component.
let subscriber =
ExtendedMoniker::ComponentInstance(vec!["root:0", "a:0", "b:0"].try_into().unwrap());
let dispatcher = factory.create_dispatcher(subscriber);
assert!(dispatch_capability_requested_event(&dispatcher, &source_moniker).await.is_err());
// Verify that we CAN dispatch the CapabilityRequested event to the root:0/a:0/b:0/c:0 component.
let subscriber = ExtendedMoniker::ComponentInstance(
vec!["root:0", "a:0", "b:0", "c:0"].try_into().unwrap(),
);
let dispatcher = factory.create_dispatcher(subscriber);
assert!(dispatch_capability_requested_event(&dispatcher, &source_moniker).await.is_ok());
assert_matches!(
factory.next_event().await,
Some(ComponentEvent { payload: EventPayload::CapabilityRequested { .. }, .. })
);
// Verify that we cannot dispatch the CapabilityRequested event to the root:0/a:0/b:0/c:0/d:0 component.
let subscriber = ExtendedMoniker::ComponentInstance(
vec!["root:0", "a:0", "b:0", "c:0", "d:0"].try_into().unwrap(),
);
let dispatcher = factory.create_dispatcher(subscriber);
assert!(dispatch_capability_requested_event(&dispatcher, &source_moniker).await.is_err());
}
}