blob: 56c7e1814680c78eb9dc3bb972ed972938dad029 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::model::{
component::{ComponentInstance, InstanceState},
error::ModelError,
events::synthesizer::{EventSynthesisProvider, ExtendedComponent},
hooks::{
Event, EventError, EventErrorPayload, EventPayload, EventType, Hook, HooksRegistration,
},
model::Model,
},
::routing::{event::EventFilter, rights::Rights},
async_trait::async_trait,
cm_rust::{
CapabilityName, CapabilityPath, ComponentDecl, ExposeDecl, ExposeDirectoryDecl,
ExposeSource, ExposeTarget,
},
fidl::endpoints::{Proxy, ServerEnd},
fidl_fuchsia_io as fio, fuchsia_async as fasync, fuchsia_fs, fuchsia_zircon as zx,
futures::stream::StreamExt,
log::*,
moniker::{AbsoluteMoniker, AbsoluteMonikerBase},
std::{
sync::{Arc, Mutex, Weak},
time::Duration,
},
};
// TODO(fxbug.dev/49198): The out/diagnostics directory propagation for runners includes a retry.
// The reason for this is that flutter fills the out/ directory *after* serving it, so ideally
// we would watch that directory to be notified when the subdirectory appears.
// Sadly, the PseudoDir exposed in the SDK (and used by flutter) returns ZX_ERR_NOT_SUPPORTED on
// Watch, so opening the subdirectory is retried with a growing delay instead.

/// Delay, in milliseconds, before the first retry. The delay also grows by this amount on every
/// subsequent retry until it reaches `OPEN_OUT_SUBDIR_RETRY_MAX_DELAY_MS`.
const OPEN_OUT_SUBDIR_RETRY_INITIAL_DELAY_MS: u64 = 500;
/// Upper bound, in milliseconds, on the delay between two consecutive attempts.
const OPEN_OUT_SUBDIR_RETRY_MAX_DELAY_MS: u64 = 15000;
/// Maximum number of retries performed after the initial attempt returned `NOT_FOUND`.
const OPEN_OUT_SUBDIR_MAX_RETRIES: usize = 30;
/// Waits for `Started` events and, for each directory capability that the started component
/// exposes from itself to the framework, dispatches a `DirectoryReady` event.
///
/// It can also synthesize `DirectoryReady` events on demand, both for component instances and
/// for the directories registered by component manager itself.
pub struct DirectoryReadyNotifier {
/// Weak handle to the model, used to look up the component instance that an event targets.
model: Weak<Model>,
/// Capabilities offered by component manager that we wish to provide through `DirectoryReady`
/// events. For example, the diagnostics directory hosting inspect data.
/// Each entry pairs the capability name with the node that serves it.
builtin_capabilities: Mutex<Vec<(String, fio::NodeProxy)>>,
}
impl DirectoryReadyNotifier {
/// Creates a notifier that resolves components through `model` and starts out with no
/// component-manager capabilities registered.
pub fn new(model: Weak<Model>) -> Self {
    let builtin_capabilities = Mutex::new(Vec::new());
    Self { model, builtin_capabilities }
}
/// Returns the hook registrations for this notifier: a single registration named
/// "DirectoryReadyNotifier" that subscribes to `Started` events and holds only a weak
/// reference back to `self`.
pub fn hooks(self: &Arc<Self>) -> Vec<HooksRegistration> {
    let callback = Arc::downgrade(self) as Weak<dyn Hook>;
    let subscribed_events = vec![EventType::Started];
    let registration =
        HooksRegistration::new("DirectoryReadyNotifier", subscribed_events, callback);
    vec![registration]
}
/// Records a directory served by component manager itself so that it can later be offered
/// through synthesized `DirectoryReady` events under `name`.
///
/// If the internal lock is poisoned the registration is silently discarded.
pub fn register_component_manager_capability(
    &self,
    name: impl Into<String>,
    node: fio::NodeProxy,
) {
    match self.builtin_capabilities.lock() {
        Ok(mut capabilities) => capabilities.push((name.into(), node)),
        Err(_poisoned) => {}
    }
}
async fn on_component_started(
self: &Arc<Self>,
target_moniker: &AbsoluteMoniker,
outgoing_dir: &fio::DirectoryProxy,
decl: ComponentDecl,
) -> Result<(), ModelError> {
// Forward along errors into the new task so that dispatch can forward the
// error as an event.
let outgoing_node_result = clone_outgoing_root(&outgoing_dir, target_moniker).await;
// Don't block the handling on the event on the exposed capabilities being ready
let this = self.clone();
let target_moniker = target_moniker.clone();
fasync::Task::spawn(async move {
// If we can't find the component then we can't dispatch any DirectoryReady event,
// error or otherwise. This isn't necessarily an error as the model or component might've been
// destroyed in the intervening time, so we just exit early.
let target = match this.model.upgrade() {
Some(model) => {
if let Ok(component) = model.look_up(&target_moniker).await {
component
} else {
return;
}
}
None => return,
};
let matching_exposes = filter_matching_exposes(&decl, None);
this.dispatch_capabilities_ready(
outgoing_node_result,
&decl,
matching_exposes,
&target,
)
.await;
})
.detach();
Ok(())
}
/// Waits for the first event on `node`, which should have been cloned/opened with DESCRIBE.
///
/// The returned future does not resolve until the node's event stream yields an item or ends.
/// Succeeds when that first event is an `OnOpen` carrying an OK status or an
/// `OnConnectionInfo`. Anything else (a non-OK status, a stream error, or the end of the
/// stream) produces an open-directory error for `moniker` and `path`.
async fn wait_for_on_open(
    &self,
    node: &fio::NodeProxy,
    moniker: &AbsoluteMoniker,
    path: String,
) -> Result<(), ModelError> {
    let mut event_stream = node.take_event_stream();
    let opened = match event_stream.next().await {
        Some(Ok(fio::NodeEvent::OnOpen_ { s: raw_status, info: _ })) => {
            zx::Status::ok(raw_status).is_ok()
        }
        Some(Ok(fio::NodeEvent::OnConnectionInfo { .. })) => true,
        _ => false,
    };
    if opened {
        Ok(())
    } else {
        Err(ModelError::open_directory_error(moniker.clone(), path))
    }
}
/// Builds the `DirectoryReady` events for `matching_exposes` (waiting for the outgoing
/// directory as needed) and dispatches each of them to the hooks of `target`.
///
/// A dispatch failure is logged as a warning and does not stop the remaining events from
/// being dispatched.
async fn dispatch_capabilities_ready(
    &self,
    outgoing_node_result: Result<fio::NodeProxy, ModelError>,
    decl: &ComponentDecl,
    matching_exposes: Vec<&ExposeDecl>,
    target: &Arc<ComponentInstance>,
) {
    let events = self.create_events(outgoing_node_result, decl, matching_exposes, target).await;
    for event in events {
        if let Err(e) = target.hooks.dispatch(&event).await {
            warn!("Error notifying directory ready for {}: {:?}", target.abs_moniker, e)
        }
    }
}
/// Creates one `DirectoryReady` event (successful or error) per entry of `matching_exposes`.
///
/// The outgoing directory root is resolved once; if that fails, the same error is reported in
/// every event. Events are created sequentially, in the order of `matching_exposes`.
///
/// # Panics
///
/// Panics if an expose has no matching directory declaration in `decl`, if that declaration
/// has no source path, or if `matching_exposes` contains a non-directory expose.
async fn create_events(
    &self,
    outgoing_node_result: Result<fio::NodeProxy, ModelError>,
    decl: &ComponentDecl,
    matching_exposes: Vec<&ExposeDecl>,
    target: &Arc<ComponentInstance>,
) -> Vec<Event> {
    // Resolve the outgoing root up front. Any failure is kept as a value so that it can be
    // propagated as an error event for each capability rather than aborting here.
    let root_result = async move {
        let root_node = outgoing_node_result?;
        self.wait_for_on_open(&root_node, &target.abs_moniker, "/".to_string()).await?;
        fuchsia_fs::node_to_directory(root_node)
            .map_err(|_| ModelError::open_directory_error(target.abs_moniker.clone(), "/"))
    }
    .await;

    let mut events = Vec::new();
    for expose_decl in matching_exposes {
        let (source_name, target_name) = match expose_decl {
            ExposeDecl::Directory(ExposeDirectoryDecl { source_name, target_name, .. }) => {
                (source_name, target_name)
            }
            _ => {
                unreachable!("should have skipped above");
            }
        };
        let directory_decl = match decl.find_directory_source(source_name) {
            Some(directory_decl) => directory_decl,
            None => panic!("Missing directory declaration for expose: {:?}", decl),
        };
        let source_path =
            directory_decl.source_path.as_ref().expect("missing directory source path");
        let rights = Rights::from(directory_decl.rights);
        let event = self
            .create_event(target, root_result.as_ref(), rights, source_path, target_name)
            .await;
        events.push(event);
    }
    events
}
/// Creates the `DirectoryReady` event for the capability `target_name`, backed by the
/// directory at `source_path` inside the component's outgoing directory.
///
/// If `outgoing_dir_result` is an error, that error is reported in the event. Otherwise the
/// directory is opened with `rights`. A `NOT_FOUND` status is retried up to
/// `OPEN_OUT_SUBDIR_MAX_RETRIES` times, sleeping between attempts for a delay that grows by
/// `OPEN_OUT_SUBDIR_RETRY_INITIAL_DELAY_MS` each time and is capped at
/// `OPEN_OUT_SUBDIR_RETRY_MAX_DELAY_MS`. Any other failure, or running out of retries, yields
/// an open-directory error event.
async fn create_event(
    &self,
    target: &Arc<ComponentInstance>,
    outgoing_dir_result: Result<&fio::DirectoryProxy, &ModelError>,
    rights: Rights,
    source_path: &CapabilityPath,
    target_name: &CapabilityName,
) -> Event {
    let event_name = target_name.to_string();
    let open_result = async move {
        // Owned so it can be reused across attempts; `try_opening` canonicalizes it because
        // DirProxy.open fails on absolute paths.
        let path = source_path.to_string();
        let outgoing_dir = outgoing_dir_result.map_err(|err| err.clone())?;
        let mut delay_ms = 0;
        let mut retries_done = 0;
        loop {
            let failed_status = match self.try_opening(outgoing_dir, &path, &rights).await {
                Ok(node) => break Ok(node),
                Err(TryOpenError::Fidl(_)) => None,
                Err(TryOpenError::Status(status)) => Some(status),
            };
            // Only a directory that does not exist (yet) is worth retrying.
            let should_retry = failed_status == Some(zx::Status::NOT_FOUND)
                && retries_done < OPEN_OUT_SUBDIR_MAX_RETRIES;
            if !should_retry {
                break Err(ModelError::open_directory_error(
                    target.abs_moniker.clone(),
                    path.clone(),
                ));
            }
            retries_done += 1;
            delay_ms = std::cmp::min(
                OPEN_OUT_SUBDIR_RETRY_MAX_DELAY_MS,
                delay_ms + OPEN_OUT_SUBDIR_RETRY_INITIAL_DELAY_MS,
            );
            fasync::Timer::new(Duration::from_millis(delay_ms)).await;
        }
    }
    .await;

    let result = match open_result {
        Ok(node) => Ok(EventPayload::DirectoryReady { name: event_name, node }),
        Err(e) => {
            Err(EventError::new(&e, EventErrorPayload::DirectoryReady { name: event_name }))
        }
    };
    Event::new(target, result)
}
/// Makes a single attempt to open `source_path` inside `outgoing_dir` as a directory with the
/// given `rights`, and waits for the first event on the new connection.
///
/// Returns the opened node on an OK `OnOpen` or on `OnConnectionInfo`. Fails with
/// `TryOpenError::Fidl` if the open request cannot be sent, and with `TryOpenError::Status`
/// if `OnOpen` carries a non-OK status. A stream that ends, errors, or yields any other event
/// is reported as `PEER_CLOSED`.
///
/// # Panics
///
/// Panics if the proxy/server-end pair cannot be created.
async fn try_opening(
    &self,
    outgoing_dir: &fio::DirectoryProxy,
    source_path: &str,
    rights: &Rights,
) -> Result<fio::NodeProxy, TryOpenError> {
    // DirProxy.open fails on absolute paths, hence the canonicalization.
    let relative_path = fuchsia_fs::canonicalize_path(source_path);
    let (node, server_end) = fidl::endpoints::create_proxy::<fio::NodeMarker>().unwrap();
    let flags = rights.into_legacy() | fio::OpenFlags::DESCRIBE;
    outgoing_dir
        .open(
            flags,
            fio::MODE_TYPE_DIRECTORY,
            relative_path,
            ServerEnd::new(server_end.into_channel()),
        )
        .map_err(TryOpenError::Fidl)?;

    let mut event_stream = node.take_event_stream();
    let open_status = match event_stream.next().await {
        Some(Ok(fio::NodeEvent::OnOpen_ { s: raw_status, .. })) => {
            zx::Status::from_raw(raw_status)
        }
        Some(Ok(fio::NodeEvent::OnConnectionInfo { .. })) => zx::Status::OK,
        _ => zx::Status::PEER_CLOSED,
    };
    if open_status == zx::Status::OK {
        Ok(node)
    } else {
        Err(TryOpenError::Status(open_status))
    }
}
/// Synthesizes `DirectoryReady` events for the directories registered by component manager
/// whose name is accepted by `filter`.
///
/// Each matching node is cloned with the same rights. If sending the clone request fails, an
/// error event is produced for that capability instead. Returns an empty list if the
/// internal lock is poisoned.
///
/// # Panics
///
/// Panics if a proxy/server-end pair cannot be created.
async fn provide_builtin(&self, filter: &EventFilter) -> Vec<Event> {
    let capabilities = match self.builtin_capabilities.lock() {
        Ok(capabilities) => capabilities,
        Err(_poisoned) => return vec![],
    };
    let mut events = Vec::new();
    for (name, node) in capabilities.iter() {
        if !filter.contains("name", vec![name.to_string()]) {
            continue;
        }
        let (node_clone, server_end) = fidl::endpoints::create_proxy().unwrap();
        let event = match node.clone(fio::OpenFlags::CLONE_SAME_RIGHTS, server_end) {
            Ok(_) => Event::new_builtin(Ok(EventPayload::DirectoryReady {
                name: name.clone(),
                node: node_clone,
            })),
            Err(_) => {
                let err = ModelError::clone_node_error(AbsoluteMoniker::root(), name.clone());
                Event::new_builtin(Err(EventError::new(
                    &err,
                    EventErrorPayload::DirectoryReady { name: name.clone() },
                )))
            }
        };
        events.push(event);
    }
    events
}
}
/// Returns the exposes of `decl` that can produce a `DirectoryReady` event: directory exposes
/// whose source is the component itself and whose target is the framework.
///
/// When `filter` is provided, an expose is kept only if the filter accepts its target name
/// under the "name" key.
fn filter_matching_exposes<'a>(
    decl: &'a ComponentDecl,
    filter: Option<&EventFilter>,
) -> Vec<&'a ExposeDecl> {
    decl.exposes
        .iter()
        .filter(|expose_decl| {
            let (source, target, target_name) = match expose_decl {
                ExposeDecl::Directory(ExposeDirectoryDecl {
                    source, target, target_name, ..
                }) => (source, target, target_name),
                _ => return false,
            };
            let accepted_by_filter = match filter {
                Some(filter) => filter.contains("name", vec![target_name.to_string()]),
                None => true,
            };
            accepted_by_filter
                && target == &ExposeTarget::Framework
                && source == &ExposeSource::Self_
        })
        .collect()
}
/// Clones `outgoing_dir` (same rights, with DESCRIBE) and returns the new connection as a
/// generic node proxy.
///
/// Any failure while cloning or while extracting the channel is reported as an
/// open-directory error for `target_moniker` at path "/".
async fn clone_outgoing_root(
    outgoing_dir: &fio::DirectoryProxy,
    target_moniker: &AbsoluteMoniker,
) -> Result<fio::NodeProxy, ModelError> {
    let open_error = || ModelError::open_directory_error(target_moniker.clone(), "/");
    let clone_flags = fio::OpenFlags::CLONE_SAME_RIGHTS | fio::OpenFlags::DESCRIBE;
    let cloned_dir =
        fuchsia_fs::clone_directory(outgoing_dir, clone_flags).map_err(|_| open_error())?;
    let channel = cloned_dir.into_channel().map_err(|_| open_error())?;
    Ok(fio::NodeProxy::from_channel(channel))
}
#[async_trait]
impl EventSynthesisProvider for DirectoryReadyNotifier {
    /// Synthesizes `DirectoryReady` events for `component`, restricted to the capability
    /// names accepted by `filter`.
    ///
    /// * For component manager itself, the registered built-in directories are used.
    /// * For a component instance, events are produced only if the instance is resolved, has
    ///   at least one matching expose, and is currently running. In every other case an
    ///   empty list is returned.
    /// * A running instance without an outgoing directory, or whose outgoing directory cannot
    ///   be cloned, yields error events for each matching expose.
    async fn provide(&self, component: ExtendedComponent, filter: &EventFilter) -> Vec<Event> {
        let component = match component {
            ExtendedComponent::ComponentManager => {
                return self.provide_builtin(filter).await;
            }
            ExtendedComponent::ComponentInstance(component) => component,
        };
        let decl = match *component.lock_state().await {
            InstanceState::Resolved(ref s) => s.decl().clone(),
            InstanceState::New | InstanceState::Discovered | InstanceState::Destroyed => {
                return vec![];
            }
        };
        let matching_exposes = filter_matching_exposes(&decl, Some(filter));
        if matching_exposes.is_empty() {
            // Short-circuit if there are no matching exposes so we don't wait for the
            // component's outgoing directory if there are no DirectoryReady events to send.
            return vec![];
        }
        // `None` means the component is not running. `Some(Err(_))` means it is running but
        // its outgoing directory is missing or could not be cloned.
        let maybe_outgoing_node_result = async {
            let execution = component.lock_execution().await;
            let runtime = match execution.runtime.as_ref() {
                Some(runtime) => runtime,
                None => return None,
            };
            let out_dir = match runtime.outgoing_dir.as_ref() {
                Some(out_dir) => out_dir,
                // The error is only built on this path, so the common case does not pay for
                // cloning the moniker and allocating the path string.
                None => {
                    return Some(Err(ModelError::open_directory_error(
                        component.abs_moniker.clone(),
                        "/".to_string(),
                    )));
                }
            };
            Some(clone_outgoing_root(out_dir, &component.abs_moniker).await)
        }
        .await;
        let outgoing_node_result = match maybe_outgoing_node_result {
            None => return vec![],
            Some(result) => result,
        };
        self.create_events(outgoing_node_result, &decl, matching_exposes, &component).await
    }
}
/// Reasons why a single attempt to open a directory inside an outgoing directory can fail.
enum TryOpenError {
/// The `open` call on the directory proxy returned a `fidl::Error`. The caller does not
/// retry in this case.
Fidl(fidl::Error),
/// The open was answered with a non-OK status, or the first item on the event stream was
/// not a usable event (reported as `PEER_CLOSED`). The caller retries only on `NOT_FOUND`.
Status(zx::Status),
}
#[async_trait]
impl Hook for DirectoryReadyNotifier {
    /// Reacts to `Started` events by kicking off the `DirectoryReady` notification for the
    /// started component. Events with any other payload, or with an error result, are ignored.
    ///
    /// Returns an error if the event does not target a component instance.
    async fn on(self: Arc<Self>, event: &Event) -> Result<(), ModelError> {
        let target_moniker = event
            .target_moniker
            .unwrap_instance_moniker_or(ModelError::UnexpectedComponentManagerMoniker)?;
        if let Ok(EventPayload::Started { runtime, component_decl, .. }) = &event.result {
            // Short-circuit if there are no matching exposes so we don't spawn a task if
            // there's nothing to do. In particular, don't wait for the component's outgoing
            // directory if there are no DirectoryReady events to send.
            let has_matching_exposes =
                !filter_matching_exposes(component_decl, None).is_empty();
            if !has_matching_exposes {
                return Ok(());
            }
            if let Some(outgoing_dir) = &runtime.outgoing_dir {
                self.on_component_started(&target_moniker, outgoing_dir, component_decl.clone())
                    .await?;
            }
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::model::{
environment::Environment,
testing::test_helpers::{TestEnvironmentBuilder, TestModelResult},
};
use cm_rust_testing::ComponentDeclBuilder;
use std::convert::TryFrom;
/// Checks that `create_event` keeps retrying while the fake outgoing directory answers
/// `NOT_FOUND`, and finally produces a successful `DirectoryReady` event for "foo".
#[fuchsia::test]
async fn verify_get_event_retry() {
    // Fake outgoing directory served by a background task for the duration of the test.
    let (dir_proxy, dir_requests) =
        fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>().unwrap();
    let _server_task = fasync::Task::spawn(serve_fake_dir(dir_requests));

    let components = vec![("root", ComponentDeclBuilder::new().build())];
    let TestModelResult { model, .. } =
        TestEnvironmentBuilder::new().set_components(components).build().await;
    let root_component = Arc::new(ComponentInstance::new_root(
        Environment::empty(),
        Weak::new(),
        Weak::new(),
        "test:///root".to_string(),
    ));
    let notifier = DirectoryReadyNotifier::new(Arc::downgrade(&model));

    let source_path = CapabilityPath::try_from("/foo").unwrap();
    let capability_name = CapabilityName::from("foo");
    let event = notifier
        .create_event(
            &root_component,
            Ok(&dir_proxy),
            Rights::from(*routing::rights::READ_RIGHTS),
            &source_path,
            &capability_name,
        )
        .await;

    assert_eq!(event.target_moniker, AbsoluteMoniker::root().into());
    assert_eq!(event.component_url, "test:///root");
    let ready_name = match event.result.expect("got ok result") {
        EventPayload::DirectoryReady { name, .. } => name,
        other => {
            panic!("Unexpected payload: {:?}", other);
        }
    };
    assert_eq!(ready_name, "foo");
}
/// Serves a fake directory that answers the first three `Open` requests with `NOT_FOUND` and
/// every later one (from the fourth onward) with an OK `OnOpen` describing a directory.
///
/// Every `Open` request is expected to be for the path "foo"; other requests are ignored.
async fn serve_fake_dir(mut request_stream: fio::DirectoryRequestStream) {
    let mut opens_seen = 0;
    while let Some(request) = request_stream.next().await {
        let (path, object) = match request {
            Ok(fio::DirectoryRequest::Open { path, object, .. }) => (path, object),
            _ => continue,
        };
        assert_eq!("foo", path);
        let (_stream, control_handle) = object.into_stream_and_control_handle().unwrap();
        if opens_seen < 3 {
            control_handle.send_on_open_(zx::Status::NOT_FOUND.into_raw(), None).unwrap();
        } else {
            control_handle
                .send_on_open_(
                    zx::Status::OK.into_raw(),
                    Some(&mut fio::NodeInfo::Directory(fio::DirectoryObject {})),
                )
                .unwrap();
        }
        opens_seen += 1;
    }
}
}