blob: 1d07b2e1df687ff28b91e830dca47bbb80154b11 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::{format_err, Context, Error},
fidl::endpoints::ServerEnd,
fidl_fuchsia_component_runner as fcrunner, fidl_fuchsia_data as fdata, fidl_fuchsia_io as fio,
fuchsia_async as fasync,
futures::lock::Mutex,
futures::{future::BoxFuture, TryStreamExt},
std::{collections::HashMap, sync::Arc},
tracing::*,
vfs::execution_scope::ExecutionScope,
};
pub const RUNNER_NAME: &'static str = "realm_builder";
pub const LOCAL_COMPONENT_ID_KEY: &'static str = "local_component_id";
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LocalComponentId(String);
impl From<LocalComponentId> for String {
fn from(local_id: LocalComponentId) -> Self {
local_id.0
}
}
#[derive(Clone)]
pub enum ComponentImplementer {
RunnerProxy(Arc<Mutex<Option<fcrunner::ComponentRunnerProxy>>>),
Builtin(
Arc<
dyn Fn(ServerEnd<fio::DirectoryMarker>) -> BoxFuture<'static, ()>
+ Sync
+ Send
+ 'static,
>,
),
}
pub struct Runner {
next_local_component_id: Mutex<u64>,
local_component_proxies: Mutex<HashMap<String, ComponentImplementer>>,
execution_scope: ExecutionScope,
}
impl Runner {
pub fn new() -> Arc<Self> {
Arc::new(Self {
next_local_component_id: Mutex::new(0),
local_component_proxies: Mutex::new(HashMap::new()),
execution_scope: ExecutionScope::new(),
})
}
#[cfg(test)]
pub async fn local_component_proxies(
self: &Arc<Self>,
) -> HashMap<String, ComponentImplementer> {
self.local_component_proxies.lock().await.clone()
}
pub async fn register_local_component(
self: &Arc<Self>,
runner_proxy: Arc<Mutex<Option<fcrunner::ComponentRunnerProxy>>>,
) -> LocalComponentId {
let mut next_local_component_id_guard = self.next_local_component_id.lock().await;
let mut local_component_proxies_guard = self.local_component_proxies.lock().await;
let local_component_id = format!("{}", *next_local_component_id_guard);
*next_local_component_id_guard += 1;
local_component_proxies_guard
.insert(local_component_id.clone(), ComponentImplementer::RunnerProxy(runner_proxy));
LocalComponentId(local_component_id)
}
pub async fn register_builtin_component<M>(
self: &Arc<Self>,
implementation: M,
) -> LocalComponentId
where
M: Fn(ServerEnd<fio::DirectoryMarker>) -> BoxFuture<'static, ()> + Sync + Send + 'static,
{
let mut next_local_component_id_guard = self.next_local_component_id.lock().await;
let mut local_component_proxies_guard = self.local_component_proxies.lock().await;
let local_component_id = format!("{}", *next_local_component_id_guard);
*next_local_component_id_guard += 1;
local_component_proxies_guard.insert(
local_component_id.clone(),
ComponentImplementer::Builtin(Arc::new(implementation)),
);
LocalComponentId(local_component_id)
}
pub async fn delete_component(self: &Arc<Self>, component_to_delete: &LocalComponentId) {
self.local_component_proxies.lock().await.remove(&component_to_delete.0);
}
pub fn run_runner_service(self: &Arc<Self>, stream: fcrunner::ComponentRunnerRequestStream) {
let self_ref = self.clone();
fasync::Task::local(async move {
if let Err(err) = self_ref.handle_runner_request_stream(stream).await {
warn!(%err, "`ComponentRunner` server unexpectedly failed.");
}
})
.detach();
}
async fn handle_runner_request_stream(
self: &Arc<Self>,
mut stream: fcrunner::ComponentRunnerRequestStream,
) -> Result<(), Error> {
while let Some(req) = stream.try_next().await? {
match req {
fcrunner::ComponentRunnerRequest::Start { start_info, controller, .. } => {
let program = start_info
.program
.clone()
.ok_or(format_err!("`program` is missing from `StartInfo`."))?;
if start_info.ns.is_none() {
return Err(format_err!("Namespace is missing from `StartInfo`."));
}
if start_info.outgoing_dir.is_none() {
return Err(format_err!("Outgoing directory is missing from `StartInfo`."));
}
if start_info.runtime_dir.is_none() {
return Err(format_err!("Runtime directory is missing from `StartInfo`."));
}
match extract_local_component_id(program)? {
LocalComponentId(local_component_id) => {
self.launch_local_component(local_component_id, start_info, controller)
.await?;
}
}
}
}
}
Ok(())
}
async fn launch_local_component(
self: &Arc<Self>,
local_component_id: String,
mut start_info: fcrunner::ComponentStartInfo,
controller: ServerEnd<fcrunner::ComponentControllerMarker>,
) -> Result<(), Error> {
let local_component_proxies_guard = self.local_component_proxies.lock().await;
let local_component_control_handle_or_runner_proxy = local_component_proxies_guard
.get(&local_component_id)
.ok_or(format_err!(
"Received non-existent local component \"{}\".",
local_component_id
))?
.clone();
match local_component_control_handle_or_runner_proxy {
ComponentImplementer::RunnerProxy(runner_proxy_placeholder) => {
let runner_proxy_placeholder_guard = runner_proxy_placeholder.lock().await;
if runner_proxy_placeholder_guard.is_none() {
return Err(format_err!("Runner request received for a local component before Builder.Build was called, this should be impossible."));
}
let runner_proxy = runner_proxy_placeholder_guard.as_ref().unwrap();
if let Some(mut program) = start_info.program.as_mut() {
remove_local_component_id(&mut program);
}
runner_proxy
.start(start_info, controller)
.context("Failed to send start request for local component to client.")?;
}
ComponentImplementer::Builtin(implementation) => {
self.execution_scope.spawn(run_builtin_controller(
controller.into_stream()?,
fasync::Task::local((*implementation)(start_info.outgoing_dir.unwrap())),
));
}
};
Ok(())
}
}
/// Extracts either the value for the `local_component_id` key from the provided
/// dictionary. It is an error for anything else to be present in the dictionary.
fn extract_local_component_id<'a>(dict: fdata::Dictionary) -> Result<LocalComponentId, Error> {
let entries = dict.entries.ok_or(format_err!("program section is empty"))?;
for entry in entries.into_iter() {
let entry_value =
entry.value.map(|box_| *box_).ok_or(format_err!("program section is missing value"))?;
match (entry.key.as_str(), entry_value) {
(LOCAL_COMPONENT_ID_KEY, fdata::DictionaryValue::Str(s)) => {
return Ok(LocalComponentId(s.clone()))
}
_ => continue,
}
}
return Err(format_err!("malformed program section"));
}
fn remove_local_component_id(dict: &mut fdata::Dictionary) {
if let Some(entries) = &mut dict.entries {
*entries = entries
.drain(..)
.filter(|entry| entry.key.as_str() != LOCAL_COMPONENT_ID_KEY)
.collect();
}
}
async fn run_builtin_controller(
mut stream: fcrunner::ComponentControllerRequestStream,
builtin_task: fasync::Task<()>,
) {
if let Some(req) =
stream.try_next().await.expect("invalid controller request from component manager")
{
match req {
fcrunner::ComponentControllerRequest::Stop { .. }
| fcrunner::ComponentControllerRequest::Kill { .. } => {
// The `return` would have dropped this anyway, but let's do it explicitly to help
// convey to the reader that the whole point here is that the task stops running
// when a stop or kill command is received.
drop(builtin_task);
}
}
}
}
#[cfg(test)]
mod tests {
use {
super::*,
assert_matches::assert_matches,
fidl::endpoints::{create_endpoints, create_proxy_and_stream},
futures::{channel::mpsc, FutureExt, SinkExt, StreamExt},
};
// There are two separate `fuchsia.component.runner/ComponentRunner` channels for every local
// component that's launched: one connecting component manager to the realm builder runner, and
// one connecting the realm builder runner to a client. This test feeds a launch request into
// the client end of the first channel pair (pretending to be component manager), and observes
// the request be sent out by the realm builder runner on the server end of the second pair
// (pretending to be the realm builder client).
#[fuchsia::test]
async fn launch_local_component() {
let runner = Runner::new();
let (client_runner_proxy, mut client_runner_request_stream) =
create_proxy_and_stream::<fcrunner::ComponentRunnerMarker>().unwrap();
let LocalComponentId(local_component_id) =
runner.register_local_component(Arc::new(Mutex::new(Some(client_runner_proxy)))).await;
let (server_runner_proxy, server_runner_request_stream) =
create_proxy_and_stream::<fcrunner::ComponentRunnerMarker>().unwrap();
let _runner_request_stream_task = fasync::Task::local(async move {
if let Err(e) = runner.handle_runner_request_stream(server_runner_request_stream).await
{
panic!("error returned by request stream: {:?}", e);
}
});
let example_program = fdata::Dictionary {
entries: Some(vec![
fdata::DictionaryEntry {
key: "hippos".to_string(),
value: Some(Box::new(fdata::DictionaryValue::Str("rule!".to_string()))),
},
fdata::DictionaryEntry {
key: LOCAL_COMPONENT_ID_KEY.to_string(),
value: Some(Box::new(fdata::DictionaryValue::Str(local_component_id))),
},
]),
..Default::default()
};
let (_controller_client_end, controller_server_end) =
create_endpoints::<fcrunner::ComponentControllerMarker>();
let (_outgoing_dir_client_end, outgoing_dir_server_end) =
create_endpoints::<fio::DirectoryMarker>();
let (_runtime_dir_client_end, runtime_dir_server_end) =
create_endpoints::<fio::DirectoryMarker>();
server_runner_proxy
.start(
fcrunner::ComponentStartInfo {
program: Some(example_program),
ns: Some(vec![]),
outgoing_dir: Some(outgoing_dir_server_end),
runtime_dir: Some(runtime_dir_server_end),
..Default::default()
},
controller_server_end,
)
.expect("failed to write start message");
assert_matches!(
client_runner_request_stream
.try_next()
.await
.expect("failed to read from client_runner_request_stream"),
Some(fcrunner::ComponentRunnerRequest::Start { start_info, .. })
if start_info.program == Some(fdata::Dictionary {
// The `LOCAL_COMPONENT_ID_KEY` entry gets removed from the program section
// before sending it off to the client, as this value is only used for
// bookkeeping internal to the realm builder runner.
entries: Some(vec![
fdata::DictionaryEntry {
key: "hippos".to_string(),
value: Some(Box::new(fdata::DictionaryValue::Str("rule!".to_string()))),
},
]),
..Default::default()
})
);
}
#[fuchsia::test]
async fn launch_builtin_component() {
let runner = Runner::new();
let (sender, mut receiver) = mpsc::channel(1);
let LocalComponentId(local_component_id) = runner
.register_builtin_component(move |_outgoing_dir| {
let mut sender = sender.clone();
async move {
sender.send(()).await.expect("failed to send that builtin was invoked");
}
.boxed()
})
.await;
let (server_runner_proxy, server_runner_request_stream) =
create_proxy_and_stream::<fcrunner::ComponentRunnerMarker>().unwrap();
let _runner_request_stream_task = fasync::Task::local(async move {
if let Err(e) = runner.handle_runner_request_stream(server_runner_request_stream).await
{
panic!("error returned by request stream: {:?}", e);
}
});
let example_program = fdata::Dictionary {
entries: Some(vec![fdata::DictionaryEntry {
key: LOCAL_COMPONENT_ID_KEY.to_string(),
value: Some(Box::new(fdata::DictionaryValue::Str(local_component_id))),
}]),
..Default::default()
};
let (_controller_client_end, controller_server_end) =
create_endpoints::<fcrunner::ComponentControllerMarker>();
let (_outgoing_dir_client_end, outgoing_dir_server_end) =
create_endpoints::<fio::DirectoryMarker>();
let (_runtime_dir_client_end, runtime_dir_server_end) =
create_endpoints::<fio::DirectoryMarker>();
server_runner_proxy
.start(
fcrunner::ComponentStartInfo {
program: Some(example_program),
ns: Some(vec![]),
outgoing_dir: Some(outgoing_dir_server_end),
runtime_dir: Some(runtime_dir_server_end),
..Default::default()
},
controller_server_end,
)
.expect("failed to write start message");
receiver.next().await.expect("failed to receive that builtin was invoked");
}
}