| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::{ |
| events::{ |
| CapabilityRouted, Event, EventSource, EventStream, EventStreamError, RoutingProtocol, |
| }, |
| matcher::EventMatcher, |
| }, |
| anyhow::Error, |
| async_trait::async_trait, |
| fidl::endpoints::{create_request_stream, ClientEnd, ServerEnd, ServiceMarker}, |
| fidl_fuchsia_io as fio, fidl_fuchsia_sys2 as fsys, fuchsia_async as fasync, |
| futures::StreamExt, |
| io_util::node::connect_in_namespace, |
| log::{info, warn}, |
| std::sync::Arc, |
| vfs::{ |
| directory::{ |
| entry::DirectoryEntry, immutable::connection::io1::ImmutableConnection, simple::Simple, |
| }, |
| execution_scope::ExecutionScope, |
| path::Path, |
| }, |
| }; |
| |
| #[async_trait] |
| pub trait CapabilityInjector: 'static + Send + Sync { |
| async fn inject(self: &Arc<Self>, event_source: &EventSource, matcher: EventMatcher) { |
| self.subscribe(event_source, matcher).await; |
| } |
| |
| async fn subscribe(self: &Arc<Self>, event_source: &EventSource, matcher: EventMatcher) { |
| let server_end = event_source |
| .subscribe_endpoint(vec![CapabilityRouted::NAME]) |
| .await |
| .expect("Could not subscribe to CapabilityRouted event for injection"); |
| |
| // Spawn a new thread to listen to CapabilityRoutedEvents. |
| // We use a new thread here because running this on the main thread may |
| // not work if a test writer needs to do blocking operations. |
| let injector = self.clone(); |
| fasync::Task::blocking(async move { |
| let mut event_stream = EventStream::new( |
| server_end.into_stream().expect("Could not create EventStream from ServerEnd"), |
| ); |
| loop { |
| // Wait for a capability routed event that matches |
| let event = match matcher.clone().wait::<CapabilityRouted>(&mut event_stream).await |
| { |
| Ok(e) => e, |
| Err(e) => match e.downcast::<EventStreamError>() { |
| Ok(EventStreamError::StreamClosed) => return, |
| Err(e) => panic!("Unknown error! {:?}", e), |
| }, |
| }; |
| |
| // An event was found! Inject the route. |
| if let Ok(payload) = event.result() { |
| info!("Injecting CapabilityProvider for `{}`", payload.name); |
| let provider_client_end = injector.clone().route(); |
| event |
| .protocol_proxy() |
| .expect(&format!( |
| "CapabilityRouted Event for {} does not have RoutingProtocol", |
| payload.name |
| )) |
| .set_provider(provider_client_end) |
| .await |
| .expect(&format!( |
| "Could not set provider for CapabilityRouted event for `{}`", |
| payload.name |
| )); |
| } |
| } |
| }) |
| .detach(); |
| } |
| |
| fn route(self: Arc<Self>) -> ClientEnd<fsys::CapabilityProviderMarker> { |
| // Create the CapabilityProvider channel |
| let (provider_client_end, mut provider_capability_stream) = |
| create_request_stream::<fsys::CapabilityProviderMarker>() |
| .expect("Could not create request stream for CapabilityProvider"); |
| |
| // Spawn a task to handle this new route |
| fasync::Task::spawn(async move { |
| while let Some(Ok(fsys::CapabilityProviderRequest::Open { |
| server_end, |
| flags, |
| mode, |
| path, |
| responder, |
| })) = provider_capability_stream.next().await |
| { |
| // Unblock component manager |
| responder.send().expect("Failed to respond to CapabilityProvider Open"); |
| |
| // Spawn a task to serve the capability |
| let injector = self.clone(); |
| fasync::Task::spawn(async move { |
| injector.open(server_end, flags, mode, path).await; |
| }) |
| .detach(); |
| } |
| }) |
| .detach(); |
| |
| provider_client_end |
| } |
| |
| // Implement this method to serve the capability on this route |
| async fn open(self: Arc<Self>, server_end: fidl::Channel, flags: u32, mode: u32, path: String); |
| } |
| |
| /// Allows tests to inject pseudo-filesystems for capabilities. |
| /// This can be used to serve directory capabilities like /dev |
| /// and /temp by hosting the filesystem within the test itself. |
| pub struct DirectoryInjector { |
| dir: Arc<Simple<ImmutableConnection>>, |
| scope: ExecutionScope, |
| } |
| |
| impl DirectoryInjector { |
| pub fn new(dir: Arc<Simple<ImmutableConnection>>) -> Arc<Self> { |
| Arc::new(Self { dir, scope: ExecutionScope::new() }) |
| } |
| |
| pub async fn wait(self) { |
| self.scope.wait().await; |
| } |
| } |
| |
| #[async_trait] |
| impl CapabilityInjector for DirectoryInjector { |
| async fn open(self: Arc<Self>, server_end: fidl::Channel, flags: u32, mode: u32, path: String) { |
| let mode_type = mode & fio::MODE_TYPE_MASK; |
| if mode_type != fio::MODE_TYPE_DIRECTORY { |
| warn!("Injected directory received unexpected mode type: 0x{:x}", mode_type); |
| } |
| |
| let server_end = ServerEnd::from(server_end); |
| let dir = self.dir.clone(); |
| let scope = self.scope.clone(); |
| let relative_path = |
| Path::validate_and_split(path).expect("Relative path could not be validated by VFS."); |
| dir.open(scope, flags, mode, relative_path, server_end); |
| } |
| } |
| |
| /// Allows tests to inject capabilities from the test namespace. |
| /// This can be used to serve capabilities that are difficult to |
| /// mock within a test (such as Vulkan). |
| pub struct TestNamespaceInjector { |
| path_in_test_namespace: &'static str, |
| } |
| |
| impl TestNamespaceInjector { |
| pub fn new(path_in_test_namespace: &'static str) -> Arc<Self> { |
| Arc::new(Self { path_in_test_namespace }) |
| } |
| } |
| |
| #[async_trait] |
| impl CapabilityInjector for TestNamespaceInjector { |
| async fn open( |
| self: Arc<Self>, |
| server_end: fidl::Channel, |
| flags: u32, |
| _mode: u32, |
| path: String, |
| ) { |
| if !path.is_empty() { |
| warn!("TestNamespaceInjector received non-empty relative path: \"{}\"", path); |
| } |
| |
| if flags & !(fio::OPEN_RIGHT_READABLE | fio::OPEN_RIGHT_WRITABLE) != 0 { |
| warn!("TestNamespaceInjector received unexpected flags: 0x{:x}", flags); |
| } |
| |
| connect_in_namespace(self.path_in_test_namespace, flags, server_end) |
| .expect("Could not connect to capability in test namespace"); |
| } |
| } |
| |
| /// A ProtocolInjector allows a test to implement a protocol to be used by a component. |
| /// |
| /// Client <---> ProtocolInjector |
| #[async_trait] |
| pub trait ProtocolInjector { |
| type Marker: ServiceMarker; |
| |
| /// This function will be run in a spawned task when a client attempts |
| /// to connect to the service being injected. `request_stream` is a stream of |
| /// requests coming in from the client. |
| async fn serve( |
| self: Arc<Self>, |
| mut request_stream: <<Self as ProtocolInjector>::Marker as ServiceMarker>::RequestStream, |
| ) -> Result<(), Error>; |
| } |
| |
| #[async_trait] |
| impl<M: ServiceMarker, T: ProtocolInjector<Marker = M> + 'static + Sync + Send> CapabilityInjector |
| for T |
| { |
| async fn inject(self: &Arc<Self>, event_source: &EventSource, matcher: EventMatcher) { |
| let matcher = match matcher.capability_name { |
| None => matcher.capability_name(M::NAME), |
| Some(_) => matcher, |
| }; |
| CapabilityInjector::subscribe(self, event_source, matcher).await; |
| } |
| |
| async fn open(self: Arc<Self>, server_end: fidl::Channel, flags: u32, mode: u32, path: String) { |
| if !path.is_empty() { |
| warn!("Injected protocol {} received a non-empty path: {}", M::NAME, path); |
| } |
| |
| if flags != (fio::OPEN_RIGHT_READABLE | fio::OPEN_RIGHT_WRITABLE) { |
| warn!("Injected protocol {} received unexpected flags: 0x{:x}", M::NAME, flags); |
| } |
| |
| let mode_type = mode & fio::MODE_TYPE_MASK; |
| if mode_type != fio::MODE_TYPE_SERVICE { |
| warn!("Injected protocol {} received unexpected mode type: 0x{:x}", M::NAME, mode_type); |
| } |
| |
| // Create the stream for the Client <---> Injector connection |
| let stream = ServerEnd::<M>::new(server_end) |
| .into_stream() |
| .expect("could not convert channel into stream"); |
| let capability_name = <M as ServiceMarker>::NAME; |
| info!("Serving injected capability `{}`...", capability_name); |
| if let Err(e) = self.serve(stream).await { |
| if let Some(e) = e.downcast_ref::<fidl::Error>() { |
| if e.is_closed() { |
| warn!("Injection for `{}` has stopped.", capability_name); |
| return; |
| } |
| } |
| panic!(e); |
| } |
| } |
| } |