// blob: 20c9be777343461c17df4b52fd8fba4c28e6a2c4 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::model::{
component::{ComponentInstance, InstanceState},
error::ModelError,
events::{dispatcher::EventDispatcherScope, event::Event, filter::EventFilter},
hooks::{Event as HookEvent, EventType},
model::Model,
},
async_trait::async_trait,
cm_rust::CapabilityName,
fuchsia_async as fasync,
futures::{channel::mpsc, future::join_all, stream, SinkExt, StreamExt},
log::error,
moniker::{AbsoluteMoniker, ExtendedMoniker},
std::{
collections::{HashMap, HashSet},
sync::{Arc, Weak},
},
};
/// A component instance or component manager itself.
///
/// This is the target handed to [`EventSynthesisProvider::provide`] when events are synthesized.
pub enum ExtendedComponent {
/// Component manager itself, rather than any component instance.
ComponentManager,
/// A specific component instance, held through a shared handle.
ComponentInstance(Arc<ComponentInstance>),
}
impl From<Arc<ComponentInstance>> for ExtendedComponent {
fn from(instance: Arc<ComponentInstance>) -> Self {
Self::ComponentInstance(instance)
}
}
impl From<&Arc<ComponentInstance>> for ExtendedComponent {
fn from(instance: &Arc<ComponentInstance>) -> Self {
Self::ComponentInstance(instance.clone())
}
}
/// Implementors of this trait know how to synthesize events.
///
/// Providers are registered per event type on the `EventSynthesizer` and must be `Send + Sync`
/// because they are shared (behind an `Arc`) with spawned synthesis tasks.
#[async_trait]
pub trait EventSynthesisProvider: Send + Sync {
/// Returns the events synthesized for the given `component`, applying the given `filter`.
///
/// Every returned event is forwarded by the synthesis task to the subscriber's channel.
async fn provide(&self, component: ExtendedComponent, filter: &EventFilter) -> Vec<HookEvent>;
}
/// Synthesis manager: keeps the registered providers and spawns the tasks that synthesize events
/// with them.
pub struct EventSynthesizer {
/// A weak reference to the model; it is handed to every spawned synthesis task.
model: Weak<Model>,
/// Maps an event name to the provider used to synthesize events of that name.
providers: HashMap<CapabilityName, Arc<dyn EventSynthesisProvider>>,
}
impl EventSynthesizer {
    /// Creates a new event synthesizer with no providers registered.
    pub fn new(model: Weak<Model>) -> Self {
        Self { model, providers: HashMap::new() }
    }

    /// Registers a new provider that will be used when synthesizing events of the type `event`.
    ///
    /// The provider is keyed by the string form of `event`; registering another provider for the
    /// same event type replaces the previous one.
    pub fn register_provider(
        &mut self,
        event: EventType,
        provider: Arc<dyn EventSynthesisProvider>,
    ) {
        self.providers.insert(CapabilityName(event.to_string()), provider);
    }

    /// Spawns a synthesis task for the requested `events`. Resulting events will be sent on the
    /// `sender` channel.
    ///
    /// `events` maps each requested event name to the scopes under which it must be synthesized.
    /// Names without a registered provider are ignored by the task.
    pub fn spawn_synthesis(
        &self,
        sender: mpsc::UnboundedSender<Event>,
        events: HashMap<CapabilityName, Vec<EventDispatcherScope>>,
    ) {
        // `self` is already a reference; borrowing it again would only add a needless `&&`.
        SynthesisTask::new(self, sender, events).spawn()
    }
}
/// Information about an event that will be synthesized: who produces it and where.
struct EventSynthesisInfo {
/// The provider of the synthesized event.
provider: Arc<dyn EventSynthesisProvider>,
/// The scopes under which the event will be synthesized. They are processed in order.
scopes: Vec<EventDispatcherScope>,
}
/// A one-shot unit of work that synthesizes the requested events and sends them on a channel.
struct SynthesisTask {
/// A weak reference to the model. If it can no longer be upgraded when the task runs, nothing
/// is synthesized.
model: Weak<Model>,
/// The sender end of the channel where synthesized events will be sent.
sender: mpsc::UnboundedSender<Event>,
/// Information about the events to synthesize, one entry per event that has a provider.
event_infos: Vec<EventSynthesisInfo>,
}
impl SynthesisTask {
    /// Creates a new synthesis task from the given events. It will ignore events for which the
    /// `synthesizer` doesn't have a provider.
    pub fn new(
        synthesizer: &EventSynthesizer,
        sender: mpsc::UnboundedSender<Event>,
        mut events: HashMap<CapabilityName, Vec<EventDispatcherScope>>,
    ) -> Self {
        // Walk the registered providers and claim the scopes requested for each one. Whatever is
        // left in `events` afterwards has no provider and is dropped.
        let event_infos = synthesizer
            .providers
            .iter()
            .filter_map(|(event_name, provider)| {
                events
                    .remove(event_name)
                    .map(|scopes| EventSynthesisInfo { provider: provider.clone(), scopes })
            })
            .collect();
        Self { model: synthesizer.model.clone(), sender, event_infos }
    }

    /// Spawns a task that will synthesize all events that were requested when creating the
    /// `SynthesisTask`.
    ///
    /// Nothing is spawned when there is nothing to synthesize. The spawned task is detached; a
    /// failure while synthesizing one event is logged and does not affect the others.
    pub fn spawn(self) {
        if self.event_infos.is_empty() {
            return;
        }
        fasync::Task::spawn(async move {
            // If the model is gone then we can't synthesize events. This isn't necessarily an
            // error as the model or component might've been destroyed in the intervening time, so
            // we just exit early.
            if let Some(model) = self.model.upgrade() {
                let sender = self.sender;
                // One future per event; each gets its own clone of the sender.
                let futs = self
                    .event_infos
                    .into_iter()
                    .map(|event_info| Self::run(&model, sender.clone(), event_info));
                for result in join_all(futs).await {
                    if let Err(e) = result {
                        error!("Event synthesis failed: {:?}", e);
                    }
                }
            }
        })
        .detach();
    }

    /// Synthesizes the event described by `info` under each of its scopes.
    ///
    /// For every scope, performs a depth-first traversal of the component instance tree rooted at
    /// the scope and sends the events that the provider yields for each visited instance. A
    /// component visited under an earlier scope is not visited again, so overlapping scopes don't
    /// produce duplicated events. When the scope is component manager, the events for component
    /// manager itself are synthesized first and the traversal then starts at the root.
    ///
    /// Returns `Ok(())` as soon as an event can't be sent, since that means the event stream was
    /// closed in the middle of synthesis and there is no one left to deliver to.
    ///
    /// # Errors
    ///
    /// Returns the `ModelError` from looking up the component of a scope. Remaining scopes are
    /// not processed in that case.
    async fn run(
        model: &Arc<Model>,
        mut sender: mpsc::UnboundedSender<Event>,
        info: EventSynthesisInfo,
    ) -> Result<(), ModelError> {
        let mut visited_components = HashSet::new();
        for scope in info.scopes {
            // If the scope is component manager, synthesize the builtin events first and then
            // proceed to synthesize from the root and down.
            let scope_moniker = match scope.moniker {
                ExtendedMoniker::ComponentManager => {
                    // A send error is not reported: the event stream was closed, so we are done.
                    if Self::send_events(
                        &info.provider,
                        &scope,
                        ExtendedComponent::ComponentManager,
                        &mut sender,
                    )
                    .await
                    .is_err()
                    {
                        return Ok(());
                    }
                    AbsoluteMoniker::root()
                }
                ExtendedMoniker::ComponentInstance(ref scope_moniker) => scope_moniker.clone(),
            };
            let root = model.look_up(&scope_moniker).await?;
            // The traversal gets a snapshot of what was visited so far so that it can prune
            // branches already covered by a previous scope.
            let mut component_stream = get_subcomponents(root, visited_components.clone());
            while let Some(component) = component_stream.next().await {
                visited_components.insert(component.abs_moniker.clone());
                // Same as above: a send error means the event stream was closed.
                if Self::send_events(
                    &info.provider,
                    &scope,
                    ExtendedComponent::ComponentInstance(component),
                    &mut sender,
                )
                .await
                .is_err()
                {
                    return Ok(());
                }
            }
        }
        Ok(())
    }

    /// Asks the `provider` for the events of `target_component`, filtered with the filter of
    /// `scope`, and sends each of them on `sender` tagged with the moniker of `scope`.
    ///
    /// # Errors
    ///
    /// Returns an error as soon as an event can't be sent on `sender`; the remaining events are
    /// not sent.
    async fn send_events(
        provider: &Arc<dyn EventSynthesisProvider>,
        scope: &EventDispatcherScope,
        target_component: ExtendedComponent,
        sender: &mut mpsc::UnboundedSender<Event>,
    ) -> Result<(), anyhow::Error> {
        let events = provider.provide(target_component, &scope.filter).await;
        for event in events {
            // Synthesized events carry no responder.
            let event = Event { event, scope_moniker: scope.moniker.clone(), responder: None };
            sender.send(event).await?;
        }
        Ok(())
    }
}
/// Streams `root` and every component reachable from it through live children, depth first.
///
/// Components whose moniker is in `visited` are not yielded and their children are not explored,
/// which prunes whole branches early. Children are only discovered for components in the
/// `Resolved` state; components in any other state are yielded without descending further.
fn get_subcomponents(
    root: Arc<ComponentInstance>,
    visited: HashSet<AbsoluteMoniker>,
) -> stream::BoxStream<'static, Arc<ComponentInstance>> {
    // The unfold state is the stack of components still to look at plus the monikers seen so far.
    stream::unfold((vec![root], visited), move |(mut stack, mut seen)| async move {
        while let Some(candidate) = stack.pop() {
            if seen.contains(&candidate.abs_moniker) {
                continue;
            }
            {
                // Keep the state lock only for as long as it takes to collect the children.
                let state = candidate.lock_state().await;
                match *state {
                    InstanceState::Resolved(ref resolved) => {
                        for (_, child) in resolved.live_children() {
                            stack.push(child.clone());
                        }
                    }
                    InstanceState::New | InstanceState::Discovered | InstanceState::Destroyed => {}
                }
            }
            seen.insert(candidate.abs_moniker.clone());
            return Some((candidate, (stack, seen)));
        }
        // Nothing left to look at: the stream ends.
        None
    })
    .boxed()
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::model::{
events::{
mode_set::EventModeSet,
registry::{EventRegistry, RoutedEvent, SubscriptionOptions},
stream::EventStream,
},
hooks::{EventError, EventErrorPayload, EventPayload},
testing::{routing_test_helpers::*, test_helpers::*},
},
cm_rust::{
DirectoryDecl, EventMode, ExposeDecl, ExposeDirectoryDecl, ExposeSource, ExposeTarget,
},
fidl_fuchsia_io2 as fio,
fuchsia_component::server::ServiceFs,
std::iter::FromIterator,
};
// Arguments for `create_stream`.
struct CreateStreamArgs<'a> {
// Registry on which the subscription is made.
registry: &'a EventRegistry,
// Monikers of the component instances used as subscription scopes.
scope_monikers: Vec<AbsoluteMoniker>,
// Event types to subscribe to. Every event is routed with all of the scopes.
events: Vec<EventType>,
// Whether to also add a scope for component manager itself.
include_builtin: bool,
}
// Shows that we see Running only for components that are bound at the moment of subscription.
#[fuchsia::test]
async fn synthesize_only_running() {
    let test = setup_synthesis_test().await;

    // Bind: b, c, e. We should see Running events only for these.
    let bound_before_subscribing = vec![
        (vec!["b:0"], "bind instance b success"),
        (vec!["c:0"], "bind instance c success"),
        (vec!["c:0", "e:0"], "bind instance e success"),
    ];
    for (path, failure_message) in bound_before_subscribing {
        test.bind_instance(&path.into()).await.expect(failure_message);
    }

    let registry = test.builtin_environment.event_registry.clone();
    let stream_args = CreateStreamArgs {
        registry: &registry,
        scope_monikers: vec![AbsoluteMoniker::root()],
        events: vec![EventType::Started, EventType::Running],
        include_builtin: false,
    };
    let mut event_stream = create_stream(stream_args).await;

    // Bind f, this will be a Started event.
    test.bind_instance(&vec!["c:0", "f:0"].into()).await.expect("bind instance success");

    let mut seen_monikers = HashSet::new();
    while seen_monikers.len() < 5 {
        let event = event_stream.next().await.expect("got running event");
        let moniker = event.event.target_moniker.to_string();
        match event.event.result {
            // There's a chance of receiving Started and Running for the instance we just bound
            // if it happens to start while we are synthesizing. We assert that instance
            // separately and count it once.
            Ok(EventPayload::Running { .. }) if moniker == "/c:0/f:0" => continue,
            Ok(EventPayload::Running { .. }) => {
                seen_monikers.insert(moniker);
            }
            Ok(EventPayload::Started { .. }) => {
                assert_eq!(moniker, "/c:0/f:0");
                seen_monikers.insert(moniker);
            }
            payload => panic!("Expected running. Got: {:?}", payload),
        }
    }

    // Events might be out of order, sort them
    let mut sorted_monikers = Vec::from_iter(seen_monikers);
    sorted_monikers.sort();
    let expected_monikers = vec!["/", "/b:0", "/c:0", "/c:0/e:0", "/c:0/f:0"];
    assert_eq!(expected_monikers, sorted_monikers);
}
// Shows that we see Running a single time even if the subscription scopes intersect.
#[fuchsia::test]
async fn synthesize_overlapping_scopes() {
    let test = setup_synthesis_test().await;
    // Bind every instance except i before subscribing.
    test.bind_instance(&vec!["b:0"].into()).await.expect("bind instance b success");
    test.bind_instance(&vec!["c:0"].into()).await.expect("bind instance c success");
    test.bind_instance(&vec!["b:0", "d:0"].into()).await.expect("bind instance d success");
    test.bind_instance(&vec!["c:0", "e:0"].into()).await.expect("bind instance e success");
    test.bind_instance(&vec!["c:0", "e:0", "g:0"].into())
        .await
        .expect("bind instance g success");
    test.bind_instance(&vec!["c:0", "e:0", "h:0"].into())
        .await
        .expect("bind instance h success");
    test.bind_instance(&vec!["c:0", "f:0"].into()).await.expect("bind instance f success");

    // Subscribing with scopes /c, /c/e and /c/e/h
    let registry = test.builtin_environment.event_registry.clone();
    let mut event_stream = create_stream(CreateStreamArgs {
        registry: &registry,
        scope_monikers: vec![
            vec!["c:0"].into(),
            vec!["c:0", "e:0"].into(),
            vec!["c:0", "e:0", "h:0"].into(),
        ],
        events: vec![EventType::Started, EventType::Running],
        include_builtin: false,
    })
    .await;

    // e, g and h fall under more than one scope, yet each must be reported exactly once.
    let result_monikers = get_and_sort_running_events(&mut event_stream, 5).await;
    let expected_monikers =
        vec!["/c:0", "/c:0/e:0", "/c:0/e:0/g:0", "/c:0/e:0/h:0", "/c:0/f:0"];
    assert_eq!(expected_monikers, result_monikers);

    // Verify we don't get more Running events: i is bound after subscribing, so the next event on
    // the stream must be its Started event.
    test.bind_instance(&vec!["c:0", "f:0", "i:0"].into())
        .await
        .expect("bind instance i success");
    let event = event_stream.next().await.expect("got started event");
    match event.event.result {
        Ok(EventPayload::Started { .. }) => {
            assert_eq!("/c:0/f:0/i:0", event.event.target_moniker.to_string());
        }
        payload => panic!("Expected started. Got: {:?}", payload),
    }
}
// Shows that we see Running only for components under the given scopes.
#[fuchsia::test]
async fn synthesize_non_overlapping_scopes() {
    let test = setup_synthesis_test().await;
    // Bind every instance except i before subscribing.
    test.bind_instance(&vec!["b:0"].into()).await.expect("bind instance b success");
    test.bind_instance(&vec!["b:0", "d:0"].into()).await.expect("bind instance d success");
    test.bind_instance(&vec!["c:0"].into()).await.expect("bind instance c success");
    test.bind_instance(&vec!["c:0", "e:0"].into()).await.expect("bind instance e success");
    test.bind_instance(&vec!["c:0", "e:0", "g:0"].into())
        .await
        .expect("bind instance g success");
    test.bind_instance(&vec!["c:0", "e:0", "h:0"].into())
        .await
        .expect("bind instance h success");
    test.bind_instance(&vec!["c:0", "f:0"].into()).await.expect("bind instance f success");

    // Subscribing with scopes /c, /c/e and /c/f/i
    let registry = test.builtin_environment.event_registry.clone();
    let mut event_stream = create_stream(CreateStreamArgs {
        registry: &registry,
        scope_monikers: vec![
            vec!["c:0"].into(),
            vec!["c:0", "e:0"].into(),
            vec!["c:0", "f:0", "i:0"].into(),
        ],
        events: vec![EventType::Started, EventType::Running],
        include_builtin: false,
    })
    .await;

    // b and d are running but outside of every scope, so they must not show up.
    let result_monikers = get_and_sort_running_events(&mut event_stream, 5).await;
    let expected_monikers =
        vec!["/c:0", "/c:0/e:0", "/c:0/e:0/g:0", "/c:0/e:0/h:0", "/c:0/f:0"];
    assert_eq!(expected_monikers, result_monikers);

    // Verify we don't get more Running events: i is bound after subscribing, so the next event on
    // the stream must be its Started event.
    test.bind_instance(&vec!["c:0", "f:0", "i:0"].into())
        .await
        .expect("bind instance i success");
    let event = event_stream.next().await.expect("got started event");
    match event.event.result {
        Ok(EventPayload::Started { .. }) => {
            assert_eq!("/c:0/f:0/i:0", event.event.target_moniker.to_string());
        }
        payload => panic!("Expected started. Got: {:?}", payload),
    }
}
// Shows that both Running and CapabilityReady are synthesized for the instances under the
// subscription scopes.
#[fuchsia::test]
async fn synthesize_capability_ready() {
    let test = setup_synthesis_test().await;

    // Bind the whole topology before subscribing.
    let bound_before_subscribing = vec![
        (vec!["b:0"], "bind instance b success"),
        (vec!["b:0", "d:0"], "bind instance d success"),
        (vec!["c:0"], "bind instance c success"),
        (vec!["c:0", "e:0"], "bind instance e success"),
        (vec!["c:0", "e:0", "g:0"], "bind instance g success"),
        (vec!["c:0", "e:0", "h:0"], "bind instance h success"),
        (vec!["c:0", "f:0"], "bind instance f success"),
        (vec!["c:0", "f:0", "i:0"], "bind instance i success"),
    ];
    for (path, failure_message) in bound_before_subscribing {
        test.bind_instance(&path.into()).await.expect(failure_message);
    }

    let registry = test.builtin_environment.event_registry.clone();
    let stream_args = CreateStreamArgs {
        registry: &registry,
        scope_monikers: vec![vec!["b:0"].into(), vec!["c:0", "e:0"].into()],
        events: vec![EventType::Running, EventType::CapabilityReady],
        include_builtin: false,
    };
    let mut event_stream = create_stream(stream_args).await;

    // We expect 4 CapabilityReady events and 5 running events.
    // CR: b, d, e, g
    // RN: b, d, e, g, h
    let expected_capability_ready_monikers =
        vec!["/b:0", "/b:0/d:0", "/c:0/e:0", "/c:0/e:0/g:0"];
    let mut expected_running_monikers = expected_capability_ready_monikers.clone();
    expected_running_monikers.push("/c:0/e:0/h:0");

    // We use sets given that the CapabilityReady could be dispatched twice: regular +
    // synthesized.
    let mut running_seen = HashSet::new();
    let mut capability_ready_seen = HashSet::new();
    while running_seen.len() < 5 || capability_ready_seen.len() < 4 {
        let event = event_stream.next().await.expect("got running event");
        let moniker = event.event.target_moniker.to_string();
        match event.event.result {
            Ok(EventPayload::Running { .. }) => {
                running_seen.insert(moniker);
            }
            // We get an error cuz the component is not really serving the directory, but is
            // exposing it. For the purposes of the test, this is enough information.
            Err(EventError {
                event_error_payload: EventErrorPayload::CapabilityReady { name, .. },
                ..
            }) if name == "diagnostics" => {
                capability_ready_seen.insert(moniker);
            }
            payload => panic!("Expected running or capability ready. Got: {:?}", payload),
        }
    }

    let mut result_running: Vec<_> = running_seen.into_iter().collect();
    result_running.sort();
    let mut result_capability_ready: Vec<_> = capability_ready_seen.into_iter().collect();
    result_capability_ready.sort();
    assert_eq!(result_running, expected_running_monikers);
    assert_eq!(result_capability_ready, expected_capability_ready_monikers);
}
// Shows that CapabilityReady is synthesized for component manager itself when the subscription
// includes the component manager scope.
#[fuchsia::test]
async fn synthesize_capability_ready_builtin() {
    let test = setup_synthesis_test().await;
    // `fs` is kept alive until the end of the test.
    let mut fs = ServiceFs::new();
    test.builtin_environment.emit_diagnostics_for_test(&mut fs).expect("emitting diagnostics");

    // Only the component manager scope is used: no component instance scopes.
    let registry = test.builtin_environment.event_registry.clone();
    let mut event_stream = create_stream(CreateStreamArgs {
        registry: &registry,
        scope_monikers: vec![],
        events: vec![EventType::CapabilityReady],
        include_builtin: true,
    })
    .await;

    // Only CapabilityReady was subscribed to, so that is the only event that may arrive.
    let event = event_stream.next().await.expect("got capability ready event");
    match event.event.result {
        Ok(EventPayload::CapabilityReady { name, .. }) if name == "diagnostics" => {
            assert_eq!(event.event.target_moniker, ExtendedMoniker::ComponentManager);
        }
        payload => panic!("Expected capability ready. Got: {:?}", payload),
    }
}
async fn create_stream<'a>(args: CreateStreamArgs<'a>) -> EventStream {
let mut scopes = args
.scope_monikers
.into_iter()
.map(|moniker| EventDispatcherScope {
moniker: moniker.into(),
filter: EventFilter::debug(),
mode_set: EventModeSet::new(cm_rust::EventMode::Sync),
})
.collect::<Vec<_>>();
if args.include_builtin {
scopes.push(EventDispatcherScope {
moniker: ExtendedMoniker::ComponentManager,
filter: EventFilter::debug(),
mode_set: EventModeSet::new(cm_rust::EventMode::Sync),
});
}
let events = args
.events
.into_iter()
.map(|event| RoutedEvent {
source_name: event.into(),
mode: EventMode::Async,
scopes: scopes.clone(),
})
.collect();
args.registry
.subscribe_with_routed_events(&SubscriptionOptions::default(), events)
.await
.expect("subscribe to event stream")
}
// Sets up the following topology (all children are lazy)
//
//         a
//        / \
//       b   c
//      /   / \
//     d   e   f
//        / \   \
//       g   h   i
//
// a, b, d, e, g and i declare and expose a "diagnostics" directory; c, f and h don't.
async fn setup_synthesis_test() -> RoutingTest {
    let a = ComponentDeclBuilder::new()
        .directory(diagnostics_decl())
        .expose(expose_diagnostics_decl())
        .add_lazy_child("b")
        .add_lazy_child("c")
        .build();
    let b = ComponentDeclBuilder::new()
        .directory(diagnostics_decl())
        .expose(expose_diagnostics_decl())
        .add_lazy_child("d")
        .build();
    let c = ComponentDeclBuilder::new().add_lazy_child("e").add_lazy_child("f").build();
    let d = ComponentDeclBuilder::new()
        .directory(diagnostics_decl())
        .expose(expose_diagnostics_decl())
        .build();
    let e = ComponentDeclBuilder::new()
        .directory(diagnostics_decl())
        .expose(expose_diagnostics_decl())
        .add_lazy_child("g")
        .add_lazy_child("h")
        .build();
    let f = ComponentDeclBuilder::new().add_lazy_child("i").build();
    let g = ComponentDeclBuilder::new()
        .directory(diagnostics_decl())
        .expose(expose_diagnostics_decl())
        .build();
    let h = ComponentDeclBuilder::new().build();
    let i = ComponentDeclBuilder::new()
        .directory(diagnostics_decl())
        .expose(expose_diagnostics_decl())
        .build();

    let components = vec![
        ("a", a),
        ("b", b),
        ("c", c),
        ("d", d),
        ("e", e),
        ("f", f),
        ("g", g),
        ("h", h),
        ("i", i),
    ];
    RoutingTest::new("a", components).await
}
// Reads `total` events from `event_stream`, all of which must be Running events, and returns the
// monikers of their targets sorted. Panics if the stream ends early or if any other event arrives.
async fn get_and_sort_running_events(
    event_stream: &mut EventStream,
    total: usize,
) -> Vec<String> {
    let mut monikers = Vec::with_capacity(total);
    while monikers.len() < total {
        let event = event_stream.next().await.expect("got running event");
        if let Ok(EventPayload::Running { .. }) = event.event.result {
            monikers.push(event.event.target_moniker.to_string());
        } else {
            panic!("Expected running. Got: {:?}", event.event.result);
        }
    }
    // Events might arrive in any order.
    monikers.sort();
    monikers
}
// Returns the declaration of a directory capability named "diagnostics" with path "/diagnostics".
fn diagnostics_decl() -> DirectoryDecl {
DirectoryDeclBuilder::new("diagnostics").path("/diagnostics").build()
}
// Returns the declaration that exposes the component's own "diagnostics" directory to the
// framework under the same name, with Connect rights and no subdirectory.
fn expose_diagnostics_decl() -> ExposeDecl {
    let directory = ExposeDirectoryDecl {
        source: ExposeSource::Self_,
        source_name: "diagnostics".into(),
        target_name: "diagnostics".into(),
        target: ExposeTarget::Framework,
        rights: Some(fio::Operations::Connect),
        subdir: None,
    };
    ExposeDecl::Directory(directory)
}
}