blob: 0a4e474b67dd65fd5550ed2233d711301a44e7f2 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
bedrock::program::Program, builtin::runner::BuiltinRunnerFactory, model::resolver::Resolver,
},
::namespace::Namespace,
::routing::{
policy::ScopedPolicyChecker,
resolving::{ComponentAddress, ResolvedComponent, ResolvedPackage, ResolverError},
},
anyhow::format_err,
async_trait::async_trait,
cm_rust::{ComponentDecl, ConfigValuesData},
fidl::{
endpoints::{create_endpoints, ClientEnd, RequestStream, ServerEnd},
epitaph::ChannelEpitaphExt,
prelude::*,
},
fidl_fuchsia_component_runner as fcrunner,
fidl_fuchsia_diagnostics_types::{
ComponentDiagnostics, ComponentTasks, Task as DiagnosticsTask,
},
fidl_fuchsia_io as fio, fuchsia_async as fasync,
fuchsia_zircon::{self as zx, AsHandleRef, HandleBased, Koid},
futures::{
channel::oneshot,
future::{AbortHandle, Abortable},
lock::Mutex,
prelude::*,
},
std::{
collections::{HashMap, HashSet},
mem,
sync::{Arc, Mutex as SyncMutex},
},
tracing::warn,
vfs::{
directory::{entry::OpenRequest, entry_container::Directory},
execution_scope::ExecutionScope,
file::vmo::read_only,
pseudo_directory,
service::endpoint,
},
};
#[derive(Debug, Clone)]
pub struct MockResolver {
components: HashMap<String, ComponentDecl>,
configs: HashMap<String, ConfigValuesData>,
blockers: HashMap<String, Arc<Mutex<Option<(oneshot::Sender<()>, oneshot::Receiver<()>)>>>>,
}
impl MockResolver {
pub fn new() -> Self {
MockResolver {
components: HashMap::new(),
configs: HashMap::new(),
blockers: HashMap::new(),
}
}
async fn resolve_async(
&self,
component_url: String,
) -> Result<ResolvedComponent, ResolverError> {
const NAME_PREFIX: &str = "test:///";
debug_assert!(component_url.starts_with(NAME_PREFIX), "invalid component url");
let (_, name) = component_url.split_at(NAME_PREFIX.len());
let decl = self
.components
.get(name)
.ok_or(ResolverError::manifest_not_found(format_err!("not in the hashmap")))?;
let config_values = match &decl.config {
None => None,
Some(config_decl) => match &config_decl.value_source {
cm_rust::ConfigValueSource::Capabilities(_) => None,
cm_rust::ConfigValueSource::PackagePath(path) => Some(
self.configs
.get(path)
.ok_or_else(|| {
ResolverError::manifest_invalid(format_err!(
"config values not provided"
))
})?
.clone(),
),
},
};
let (client, server): (ClientEnd<fio::DirectoryMarker>, ServerEnd<fio::DirectoryMarker>) =
create_endpoints();
let sub_dir = pseudo_directory!(
"fake_file" => read_only(b"content"),
);
sub_dir.open(
ExecutionScope::new(),
fio::OpenFlags::RIGHT_READABLE
| fio::OpenFlags::RIGHT_WRITABLE
| fio::OpenFlags::DIRECTORY,
vfs::path::Path::dot(),
ServerEnd::new(server.into_channel()),
);
if let Some(blocker) = self.blockers.get(name) {
let mut blocker = blocker.lock().await;
if let Some(blocker) = blocker.take() {
let (send, recv) = blocker;
send.send(()).unwrap();
let _ = recv.await.unwrap();
}
}
Ok(ResolvedComponent {
resolved_url: format!("test:///{}_resolved", name),
context_to_resolve_children: None,
decl: decl.clone(),
package: Some(ResolvedPackage { url: "pkg".to_string(), directory: client }),
config_values,
abi_revision: Some(
version_history::HISTORY.get_example_supported_version_for_tests().abi_revision,
),
})
}
pub fn add_component(&mut self, name: &str, component: ComponentDecl) {
self.components.insert(name.to_string(), component);
}
pub fn add_config_values(&mut self, path: &str, values: ConfigValuesData) {
self.configs.insert(path.to_string(), values);
}
pub fn add_blocker(
&mut self,
path: &str,
send: oneshot::Sender<()>,
recv: oneshot::Receiver<()>,
) {
self.blockers.insert(path.to_string(), Arc::new(Mutex::new(Some((send, recv)))));
}
pub fn get_component_decl(&self, name: &str) -> Option<ComponentDecl> {
self.components.get(name).map(Clone::clone)
}
}
#[async_trait]
impl Resolver for MockResolver {
async fn resolve(
&self,
component_address: &ComponentAddress,
) -> Result<ResolvedComponent, ResolverError> {
self.resolve_async(component_address.url().to_string()).await
}
}
pub type HostFn = Box<dyn Fn(ServerEnd<fio::DirectoryMarker>) + Send + Sync>;
pub type ControllerResponseFn = Box<dyn Fn() -> ControllerActionResponse + Send + Sync>;
pub type ManagedNamespace = Mutex<Namespace>;
struct MockRunnerInner {
/// List of URLs started by this runner instance.
urls_run: Vec<String>,
/// Vector of waiters that wish to be notified when a new URL is run (used by `wait_for_urls`).
url_waiters: Vec<futures::channel::oneshot::Sender<()>>,
/// Namespace for each component, mapping resolved URL to the component's namespace.
namespaces: HashMap<String, Arc<Mutex<Namespace>>>,
/// Functions for serving the `outgoing` and `runtime` directories
/// of a given component. When a component is started, these
/// functions will be called with the server end of the directories.
outgoing_host_fns: HashMap<String, Arc<HostFn>>,
runtime_host_fns: HashMap<String, Arc<HostFn>>,
/// Set of URLs that the MockRunner will fail the `start` call for.
failing_urls: HashSet<String>,
/// Functions for setting the controller's response to stop and kill
/// requests. If not found, the controller will reply with success.
controller_response_fns: HashMap<String, Arc<ControllerResponseFn>>,
/// Map from the `Koid` of `Channel` owned by a `ComponentController` to
/// the messages received by that controller.
runner_requests: Arc<Mutex<HashMap<Koid, Vec<ControlMessage>>>>,
controller_abort_handles: HashMap<Koid, AbortHandle>,
controller_control_handles: HashMap<Koid, fcrunner::ComponentControllerControlHandle>,
last_checker: Option<ScopedPolicyChecker>,
}
pub struct MockRunner {
// The internal runner state.
//
// Inner state is guarded by a std::sync::Mutex to avoid helper
// functions needing "async" (and propagating to callers).
// std::sync::MutexGuard doesn't have the "Send" trait, so the
// compiler will prevent us calling ".await" while holding the lock.
inner: SyncMutex<MockRunnerInner>,
}
impl MockRunner {
pub fn new() -> Self {
MockRunner {
inner: SyncMutex::new(MockRunnerInner {
urls_run: vec![],
url_waiters: vec![],
namespaces: HashMap::new(),
outgoing_host_fns: HashMap::new(),
runtime_host_fns: HashMap::new(),
failing_urls: HashSet::new(),
runner_requests: Arc::new(Mutex::new(HashMap::new())),
controller_abort_handles: HashMap::new(),
controller_control_handles: HashMap::new(),
last_checker: None,
controller_response_fns: HashMap::new(),
}),
}
}
/// Cause the URL `url` to return an error when started.
pub fn add_failing_url(&self, url: &str) {
self.inner.lock().unwrap().failing_urls.insert(url.to_string());
}
/// Cause the component `name` to return an error when started.
pub fn cause_failure(&self, name: &str) {
self.add_failing_url(&format!("test:///{}_resolved", name))
}
/// Register `function` to serve the outgoing directory of component with `url`.
pub fn add_host_fn(&self, url: &str, function: HostFn) {
self.inner.lock().unwrap().outgoing_host_fns.insert(url.to_string(), Arc::new(function));
}
/// Register `function` to override the controller response for the component with URL `url`.
pub fn add_controller_response(&self, url: &str, function: ControllerResponseFn) {
self.inner
.lock()
.unwrap()
.controller_response_fns
.insert(url.to_string(), Arc::new(function));
}
/// Get the input namespace for component `name`.
pub fn get_namespace(&self, name: &str) -> Option<Arc<ManagedNamespace>> {
self.inner.lock().unwrap().namespaces.get(name).map(Arc::clone)
}
pub fn get_request_map(&self) -> Arc<Mutex<HashMap<Koid, Vec<ControlMessage>>>> {
self.inner.lock().unwrap().runner_requests.clone()
}
/// Returns a future that completes when `expected_url` is launched by the runner.
pub async fn wait_for_url(&self, expected_url: &str) {
self.wait_for_urls(&[expected_url]).await
}
/// Returns a future that completes when `expected_urls` were launched by the runner, in any
/// order.
pub async fn wait_for_urls(&self, expected_urls: &[&str]) {
loop {
let (sender, receiver) = oneshot::channel();
{
let mut inner = self.inner.lock().unwrap();
let expected_urls: HashSet<&str> = expected_urls.iter().map(|s| *s).collect();
let urls_run: HashSet<&str> = inner.urls_run.iter().map(|s| s as &str).collect();
if expected_urls.is_subset(&urls_run) {
return;
} else {
inner.url_waiters.push(sender);
}
}
receiver.await.expect("failed to receive url notice")
}
}
/// If the runner has ran a component with this URL, forget this fact.
/// This is useful when `wait_for_url` is to be used repeatedly to run a
/// component with the same URL.
pub fn reset_wait_for_url(&self, expected_url: &str) {
let mut inner = self.inner.lock().unwrap();
inner.urls_run.retain(|url| url != expected_url);
}
pub fn abort_controller(&self, koid: &Koid) {
let mut state = self.inner.lock().unwrap();
state.controller_control_handles.remove(koid).expect("koid was not available");
let handle = state.controller_abort_handles.get(koid).expect("koid was not available");
handle.abort();
}
/// Sends a `OnEscrow` event on the controller channel identified by `koid`.
pub fn send_on_escrow(
&self,
koid: &Koid,
request: fcrunner::ComponentControllerOnEscrowRequest,
) {
let state = self.inner.lock().unwrap();
let handle = state.controller_control_handles.get(koid).expect("koid was not available");
handle.send_on_escrow(request).unwrap();
}
async fn start(
&self,
start_info: fcrunner::ComponentStartInfo,
server_end: ServerEnd<fcrunner::ComponentControllerMarker>,
) {
let outgoing_host_fn;
let runtime_host_fn;
let runner_requests;
let resolved_url = start_info.resolved_url.unwrap();
// The koid is the only unique piece of information we have about a
// component start request. Two start requests for the same component
// URL look identical to the Runner, the only difference being the
// Channel passed to the Runner to use for the ComponentController
// protocol.
let channel_koid = server_end.as_handle_ref().basic_info().expect("basic info failed").koid;
{
let mut state = self.inner.lock().unwrap();
// Trigger a failure if previously requested.
if state.failing_urls.contains(&resolved_url) {
let status = zx::Status::UNAVAILABLE;
server_end.into_channel().close_with_epitaph(status).unwrap();
return;
}
// Fetch host functions, which will provide the outgoing and runtime directories
// for the component.
//
// If functions were not provided, then start_info.outgoing_dir will be
// automatically closed once it goes out of scope at the end of this
// function.
outgoing_host_fn = state.outgoing_host_fns.get(&resolved_url).map(Arc::clone);
runtime_host_fn = state.runtime_host_fns.get(&resolved_url).map(Arc::clone);
runner_requests = state.runner_requests.clone();
// Create a namespace for the component.
state.namespaces.insert(
resolved_url.clone(),
Arc::new(Mutex::new(start_info.ns.unwrap().try_into().unwrap())),
);
let controller = match state.controller_response_fns.get(&resolved_url).map(|f| f()) {
Some(response) => MockController::new_with_responses(
server_end,
runner_requests,
channel_koid,
response.clone(),
response,
),
None => MockController::new(server_end, runner_requests, channel_koid),
};
let control_handle = controller.request_stream.control_handle();
let abort_handle = controller.serve();
state.controller_abort_handles.insert(channel_koid, abort_handle);
state.controller_control_handles.insert(channel_koid, control_handle);
// Start serving the outgoing/runtime directories.
if let Some(outgoing_host_fn) = outgoing_host_fn {
outgoing_host_fn(start_info.outgoing_dir.unwrap());
}
if let Some(runtime_host_fn) = runtime_host_fn {
runtime_host_fn(start_info.runtime_dir.unwrap());
}
// Record that this URL has been started.
state.urls_run.push(resolved_url.clone());
let url_waiters = mem::replace(&mut state.url_waiters, vec![]);
for waiter in url_waiters {
waiter.send(()).expect("failed to send url notice");
}
}
}
}
impl BuiltinRunnerFactory for MockRunner {
fn get_scoped_runner(
self: Arc<Self>,
checker: ScopedPolicyChecker,
open_request: OpenRequest<'_>,
) -> Result<(), zx::Status> {
{
let mut state = self.inner.lock().unwrap();
state.last_checker = Some(checker);
}
open_request.open_service(endpoint(move |scope, server_end| {
let mut stream = fcrunner::ComponentRunnerRequestStream::from_channel(server_end);
let runner = self.clone();
scope.spawn(async move {
while let Ok(Some(request)) = stream.try_next().await {
let fcrunner::ComponentRunnerRequest::Start { start_info, controller, .. } =
request;
runner.start(start_info, controller).await;
}
});
}))
}
}
#[derive(Debug, PartialEq, Clone)]
pub enum ControlMessage {
Stop,
Kill,
}
#[derive(Clone)]
/// What the MockController should do when it receives a message.
pub struct ControllerActionResponse {
pub close_channel: bool,
pub delay: Option<zx::Duration>,
}
pub struct MockController {
pub messages: Arc<Mutex<HashMap<Koid, Vec<ControlMessage>>>>,
request_stream: fcrunner::ComponentControllerRequestStream,
koid: Koid,
stop_resp: ControllerActionResponse,
kill_resp: ControllerActionResponse,
}
impl MockController {
/// Create a `MockController` that listens to the `server_end` and inserts
/// `ControlMessage`'s into the Vec in the HashMap keyed under the provided
/// `Koid`. When either a request to stop or kill a component is received
/// the `MockController` will close the control channel immediately.
pub fn new(
server_end: ServerEnd<fcrunner::ComponentControllerMarker>,
messages: Arc<Mutex<HashMap<Koid, Vec<ControlMessage>>>>,
koid: Koid,
) -> MockController {
Self::new_with_responses(
server_end,
messages,
koid,
ControllerActionResponse { close_channel: true, delay: None },
ControllerActionResponse { close_channel: true, delay: None },
)
}
/// Create a MockController that listens to the `server_end` and inserts
/// `ControlMessage`'s into the Vec in the HashMap keyed under the provided
/// `Koid`. The `stop_response` controls the delay used before taking any
/// action on the control channel when a request to stop is received. The
/// `kill_response` provides the same control when the a request to kill is
/// received.
pub fn new_with_responses(
server_end: ServerEnd<fcrunner::ComponentControllerMarker>,
messages: Arc<Mutex<HashMap<Koid, Vec<ControlMessage>>>>,
koid: Koid,
stop_response: ControllerActionResponse,
kill_response: ControllerActionResponse,
) -> MockController {
MockController {
messages: messages,
request_stream: server_end.into_stream().expect("stream conversion failed"),
koid: koid,
stop_resp: stop_response,
kill_resp: kill_response,
}
}
/// Create a future which takes ownership of `server_end` and inserts
/// `ControlMessage`s into `messages` based on events sent on the
/// `ComponentController` channel.
pub fn into_serve_future(mut self) -> impl Future<Output = ()> {
let job_dup = fuchsia_runtime::job_default()
.duplicate_handle(zx::Rights::SAME_RIGHTS)
.expect("duplicate default job");
self.request_stream
.control_handle()
.send_on_publish_diagnostics(ComponentDiagnostics {
tasks: Some(ComponentTasks {
component_task: Some(DiagnosticsTask::Job(job_dup)),
..Default::default()
}),
..Default::default()
})
.unwrap_or_else(|e| {
warn!("sending diagnostics failed: {:?}", e);
});
async move {
self.messages.lock().await.insert(self.koid, Vec::new());
while let Ok(Some(request)) = self.request_stream.try_next().await {
match request {
fcrunner::ComponentControllerRequest::Stop { control_handle: c } => {
self.messages
.lock()
.await
.get_mut(&self.koid)
.expect("component channel koid key missing from mock runner map")
.push(ControlMessage::Stop);
if let Some(delay) = self.stop_resp.delay {
let delay_copy = delay.clone();
let close_channel = self.stop_resp.close_channel;
fasync::Task::spawn(async move {
fasync::Timer::new(fasync::Time::after(delay_copy)).await;
if close_channel {
c.shutdown_with_epitaph(zx::Status::OK);
}
})
.detach();
} else if self.stop_resp.close_channel {
c.shutdown_with_epitaph(zx::Status::OK);
break;
}
}
fcrunner::ComponentControllerRequest::Kill { control_handle: c } => {
self.messages
.lock()
.await
.get_mut(&self.koid)
.expect("component channel koid key missing from mock runner map")
.push(ControlMessage::Kill);
if let Some(delay) = self.kill_resp.delay {
let delay_copy = delay.clone();
let close_channel = self.kill_resp.close_channel;
fasync::Task::spawn(async move {
fasync::Timer::new(fasync::Time::after(delay_copy)).await;
if close_channel {
c.shutdown_with_epitaph(zx::Status::OK);
}
})
.detach();
if self.kill_resp.close_channel {
break;
}
} else if self.kill_resp.close_channel {
c.shutdown_with_epitaph(zx::Status::OK);
break;
}
}
}
}
}
}
/// Spawn an async execution context which takes ownership of `server_end`
/// and inserts `ControlMessage`s into `messages` based on events sent on
/// the `ComponentController` channel. This simply spawns a future which
/// awaits self.run().
pub fn serve(self) -> AbortHandle {
// Listen to the ComponentController server end and record the messages
// that arrive. Exit after the first one, as this is the contract we
// have implemented so far. Exiting will cause our handle to the
// channel to drop and close the channel.
let (handle, registration) = AbortHandle::new_pair();
let fut = Abortable::new(self.into_serve_future(), registration);
// Send default job for the sake of testing only.
fasync::Task::spawn(async move {
fut.await.unwrap_or(()); // Ignore cancellation.
})
.detach();
handle
}
}
/// Starts a program that does nothing but let us intercept requests to control its lifecycle.
pub fn mock_program() -> (Program, ServerEnd<fcrunner::ComponentControllerMarker>) {
let (controller, server_end) = create_endpoints::<fcrunner::ComponentControllerMarker>();
(Program::mock_from_controller(controller), server_end)
}