blob: e692276b4c7b4d38dd2faa728189ab7a91f3cc45 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
events::{
CapabilityRouted, Event, EventMode, EventSource, EventStream, EventStreamError,
EventSubscription, RoutingProtocol,
},
matcher::EventMatcher,
},
anyhow::Error,
async_trait::async_trait,
fidl::endpoints::{create_request_stream, ClientEnd, ServerEnd, ServiceMarker},
fidl_fuchsia_io as fio, fidl_fuchsia_sys2 as fsys, fuchsia_async as fasync,
futures::{
future::{AbortHandle, Abortable, TryFutureExt},
StreamExt,
},
log::warn,
std::sync::Arc,
};
/// A ProtocolInterposer allows a test to sit between a service and a client
/// and mutate or silently observe messages being passed between them.
///
/// Client <---> ProtocolInterposer <---> Server
#[async_trait]
pub trait ProtocolInterposer: 'static + Send + Sync {
type Marker: ServiceMarker;
async fn interpose(
self: Arc<Self>,
event_source: &EventSource,
matcher: EventMatcher,
) -> AbortHandle {
let matcher = matcher.capability_name(Self::Marker::NAME);
let server_end = event_source
.subscribe_endpoint(vec![EventSubscription::new(
vec![CapabilityRouted::NAME],
EventMode::Sync,
)])
.await
.expect("Could not subscribe to CapabilityRouted event for interposition");
let (abort_handle, abort_registration) = AbortHandle::new_pair();
// Spawn a new thread to listen to CapabilityRoutedEvents.
// We use a new thread here because running this on the main thread may
// not work if a test writer needs to do blocking operations.
fasync::Task::blocking(
Abortable::new(
async move {
let mut event_stream = EventStream::new(
server_end
.into_stream()
.expect("Could not create EventStream from ServerEnd"),
);
loop {
// Wait for a capability routed event that matches
let event =
match matcher.clone().wait::<CapabilityRouted>(&mut event_stream).await
{
Ok(e) => e,
Err(e) => match e.downcast::<EventStreamError>() {
Ok(EventStreamError::StreamClosed) => return,
Err(e) => panic!("Unknown error! {:?}", e),
},
};
// An event was found! Inject the route.
if event.is_ok() {
let (provider_client_end, server_end) = self.clone().route();
event
.protocol_proxy()
.expect("Event does not have routing protocol")
.replace_and_open(provider_client_end, server_end)
.await
.expect("Could not set provider for CapabilityRouted event");
}
}
},
abort_registration,
)
.unwrap_or_else(|_| ()),
)
.detach();
abort_handle
}
fn route(self: Arc<Self>) -> (ClientEnd<fsys::CapabilityProviderMarker>, fidl::Channel) {
// Create the Interposer <---> Server channel
let (client_end, server_end) = fidl::Channel::create().expect("could not create channel");
// Create the CapabilityProvider channel
let (provider_client_end, mut provider_capability_stream) =
create_request_stream::<fsys::CapabilityProviderMarker>()
.expect("Could not create request stream for CapabilityProvider");
// Spawn a task to handle this new route
fasync::Task::spawn(async move {
if let Some(Ok(fsys::CapabilityProviderRequest::Open {
server_end,
flags,
mode,
path,
responder,
})) = provider_capability_stream.next().await
{
// Unblock component manager
responder.send().expect("Failed to respond to CapabilityProvider Open");
if !path.is_empty() {
warn!(
"Interposed service {} was provided a non-empty path: {}",
Self::Marker::NAME,
path
);
}
if flags != (fio::OPEN_RIGHT_READABLE | fio::OPEN_RIGHT_WRITABLE) {
warn!(
"Interposed service {} was provided unexpected flags: 0x{:x}",
Self::Marker::NAME,
flags
);
}
let mode_type = mode & fio::MODE_TYPE_MASK;
if mode_type != fio::MODE_TYPE_SERVICE {
warn!(
"Interposed service {} was provided unexpected mode type: 0x{:x}",
Self::Marker::NAME,
mode_type
);
}
// Create the proxy for the Interposer <---> Server connection
let proxy = ClientEnd::<Self::Marker>::new(client_end)
.into_proxy()
.expect("could not convert into proxy");
// Create the stream for the Client <---> Interposer connection
let stream = ServerEnd::<Self::Marker>::new(server_end)
.into_stream()
.expect("could not convert channel into stream");
// Start interposing!
self.serve(stream, proxy).await.expect("Interposition failed");
}
})
.detach();
(provider_client_end, server_end)
}
/// This function will be run asynchronously when a client attempts
/// to connect to the service being interposed. `from_client` is a stream of
/// requests coming in from the client and `to_server` is a proxy to the
/// real server.
async fn serve(
self: Arc<Self>,
mut from_client: <<Self as ProtocolInterposer>::Marker as ServiceMarker>::RequestStream,
to_server: <<Self as ProtocolInterposer>::Marker as ServiceMarker>::Proxy,
) -> Result<(), Error>;
}