// blob: 1240861f65408cfee92bae42b02cffb0fae01b2a [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
capability::CapabilityProvider,
channel,
model::{
error::ModelError,
events::{
error::EventsError,
event::SyncMode,
registry::{EventRegistry, ExecutionMode, SubscriptionOptions, SubscriptionType},
serve::serve_event_source_sync,
stream::EventStream,
},
hooks::EventType,
model::Model,
},
},
async_trait::async_trait,
cm_rust::CapabilityName,
fidl::endpoints::ServerEnd,
fidl_fuchsia_sys2 as fsys, fuchsia_async as fasync, fuchsia_zircon as zx,
futures::lock::Mutex,
std::{
path::PathBuf,
sync::{Arc, Weak},
},
};
/// A system responsible for implementing basic events functionality on a scoped realm.
///
/// Clients obtain event streams through `subscribe`, which forwards to the shared
/// `EventRegistry` using the `options` stored here. The type is `Clone`; because
/// `resolve_instance_event_stream` lives behind an `Arc`, every clone refers to the same
/// underlying slot.
#[derive(Clone)]
pub struct EventSource {
/// The component model, needed to route events. Held as a `Weak` reference.
model: Weak<Model>,
/// A shared reference to the event registry used to subscribe and dispatch events.
/// Held as a `Weak` reference, so it must be upgraded before each use and the upgrade
/// can fail once the registry has been dropped.
registry: Weak<EventRegistry>,
/// Used for `BlockingEventSource.StartComponentTree`.
///
/// Populated by `new` with a `Resolved` event stream only when the sync mode is
/// `SyncMode::Sync` (it is `None` for `SyncMode::Async`), and reset to `None` by
/// `start_component_tree`.
// TODO(fxbug.dev/48245): this shouldn't be done for any EventSource. Only for tests.
resolve_instance_event_stream: Arc<Mutex<Option<EventStream>>>,
/// The options used to subscribe to events. Passed to the registry on every
/// subscription made through this `EventSource`.
options: SubscriptionOptions,
}
impl EventSource {
    /// Builds an `EventSource` whose subscriptions are made with `options`.
    ///
    /// With `SyncMode::Sync`, a stream for the `Resolved` event is subscribed right away and
    /// kept until `start_component_tree` is called. With `SyncMode::Async` no such stream is
    /// created.
    ///
    /// # Errors
    ///
    /// Only in `SyncMode::Sync`: returns `EventsError::RegistryNotFound` (converted into a
    /// `ModelError`) when the registry can no longer be upgraded, or whatever error the
    /// registry's `subscribe` call reports.
    pub async fn new(
        model: Weak<Model>,
        options: SubscriptionOptions,
        registry: Weak<EventRegistry>,
    ) -> Result<Self, ModelError> {
        // TODO(fxbug.dev/48245): this shouldn't be done for any EventSource. Only for tests.
        let resolved_stream = match options.sync_mode {
            SyncMode::Sync => {
                let live_registry = registry.upgrade().ok_or(EventsError::RegistryNotFound)?;
                let stream =
                    live_registry.subscribe(&options, vec![EventType::Resolved.into()]).await?;
                Some(stream)
            }
            SyncMode::Async => None,
        };

        Ok(Self {
            model,
            registry,
            resolve_instance_event_stream: Arc::new(Mutex::new(resolved_stream)),
            options,
        })
    }

    /// Builds an `EventSource` configured with `SubscriptionType::AboveRoot` and
    /// `ExecutionMode::Debug`, using the caller-provided `sync_mode`.
    ///
    /// # Errors
    ///
    /// Same failure conditions as `new`.
    pub async fn new_for_debug(
        model: Weak<Model>,
        registry: Weak<EventRegistry>,
        sync_mode: SyncMode,
    ) -> Result<Self, ModelError> {
        let debug_options =
            SubscriptionOptions::new(SubscriptionType::AboveRoot, sync_mode, ExecutionMode::Debug);
        Self::new(model, debug_options, registry).await
    }

    /// Drops the `Resolved` event stream, thereby permitting components within the
    /// realm to be started.
    pub async fn start_component_tree(&mut self) {
        // The lock guard is a temporary of this statement, so the previous stream (if any) is
        // dropped while the lock is still held.
        *self.resolve_instance_event_stream.lock().await = None;
    }

    /// Subscribes to the events named in `events` and returns the resulting stream.
    ///
    /// The client might request events that it is not allowed to see. Allowed events should
    /// have been defined in its manifest and either offered to it or requested from the
    /// current realm.
    ///
    /// # Errors
    ///
    /// Returns `EventsError::RegistryNotFound` (converted into a `ModelError`) when the
    /// registry can no longer be upgraded, or the error reported by the registry's `subscribe`.
    pub async fn subscribe(
        &mut self,
        events: Vec<CapabilityName>,
    ) -> Result<EventStream, ModelError> {
        let live_registry = match self.registry.upgrade() {
            Some(strong) => strong,
            None => return Err(EventsError::RegistryNotFound.into()),
        };
        // Create an event stream for the given events, using this source's stored options.
        live_registry.subscribe(&self.options, events).await
    }

    /// Serves the `BlockingEventSource` FIDL protocol on `stream`.
    ///
    /// Consumes this `EventSource` and handles the requests on a detached task, so this call
    /// returns without waiting for the request stream to be processed.
    pub fn serve(self, stream: fsys::BlockingEventSourceRequestStream) {
        let serve_requests = async move {
            serve_event_source_sync(self, stream).await;
        };
        fasync::Task::spawn(serve_requests).detach();
    }
}
#[async_trait]
impl CapabilityProvider for EventSource {
async fn open(
self: Box<Self>,
_flags: u32,
_open_mode: u32,
_relative_path: PathBuf,
server_end: &mut zx::Channel,
) -> Result<(), ModelError> {
let server_end = channel::take_channel(server_end);
let stream = ServerEnd::<fsys::BlockingEventSourceMarker>::new(server_end)
.into_stream()
.expect("could not convert channel into stream");
self.serve(stream);
Ok(())
}
}