blob: c993549153c81bc9d66f8cee2dc294b4c2c68ea0 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::{format_err, Context, Error},
async_trait::async_trait,
fidl::endpoints::{create_proxy, create_request_stream, ClientEnd, ServerEnd, ServiceMarker},
fidl::Channel,
fidl_fuchsia_test_events as fevents, fuchsia_async as fasync,
fuchsia_component::client::*,
futures::{
future::{AbortHandle, Abortable, BoxFuture, TryFutureExt},
lock::Mutex,
StreamExt,
},
std::sync::Arc,
};
/// A wrapper over the EventSourceSync FIDL proxy.
/// Provides all of the FIDL methods with a cleaner, simpler interface.
/// Refer to events.fidl for a detailed description of this protocol.
pub struct EventSource {
proxy: fevents::EventSourceSyncProxy,
}
impl EventSource {
/// Connects to the EventSourceSync service at its default location
/// The default location is presumably "/svc/fuchsia.test.events.EventSourceSync"
pub fn new() -> Result<Self, Error> {
let proxy = connect_to_service::<fevents::EventSourceSyncMarker>()
.context("could not connect to EventSourceSync service")?;
Ok(EventSource::from_proxy(proxy))
}
/// Wraps a provided EventSourceSync proxy
pub fn from_proxy(proxy: fevents::EventSourceSyncProxy) -> Self {
Self { proxy }
}
pub async fn subscribe(
&self,
event_types: Vec<fevents::EventType>,
) -> Result<EventStream, Error> {
let (proxy, server_end) = create_proxy::<fevents::EventStreamSyncMarker>()?;
self.proxy.subscribe(&mut event_types.into_iter(), server_end).await?;
Ok(EventStream::new(proxy))
}
pub async fn soak_events(
&self,
event_types: Vec<fevents::EventType>,
) -> Result<EventSink, Error> {
let event_stream = self.subscribe(event_types).await?;
Ok(EventSink::soak_async(event_stream))
}
// This is a convenience method that sets a breakpoint on the the `RouteCapability`,
// spawns a new task, and injects the service provided by the injector if requested
// by the event.
pub async fn install_injector<I: 'static>(&self, injector: Arc<I>) -> Result<AbortHandle, Error>
where
I: Injector,
{
let event_stream = self.subscribe(vec![RouteCapability::TYPE]).await?;
let (abort_handle, abort_registration) = AbortHandle::new_pair();
fasync::spawn(
Abortable::new(
async move {
loop {
let event = event_stream
.wait_until_type::<RouteCapability>()
.await
.expect("Type mismatch");
if event.capability_id == injector.capability_path() {
event.inject(injector.clone()).await.expect("injection failed");
}
event.resume().await.expect("resumption failed");
}
},
abort_registration,
)
.unwrap_or_else(|_| ()),
);
Ok(abort_handle)
}
// This is a convenience method that sets a breakpoint on the the `RouteCapability`,
// spawns a new task, and interposes the service provided by the interposer if requested
// by the event.
pub async fn install_interposer<I: 'static>(
&self,
interposer: Arc<I>,
) -> Result<AbortHandle, Error>
where
I: Interposer,
{
let event_stream = self.subscribe(vec![RouteCapability::TYPE]).await?;
let (abort_handle, abort_registration) = AbortHandle::new_pair();
fasync::spawn(
Abortable::new(
async move {
loop {
let event = event_stream
.wait_until_type::<RouteCapability>()
.await
.expect("Type mismatch");
if event.capability_id == interposer.capability_path() {
event.interpose(interposer.clone()).await.expect("injection failed");
}
event.resume().await.expect("resumption failed");
}
},
abort_registration,
)
.unwrap_or_else(|_| (())),
);
Ok(abort_handle)
}
pub async fn start_component_tree(&self) -> Result<(), Error> {
self.proxy.start_component_tree().await.context("could not start component tree")?;
Ok(())
}
}
/// A wrapper over the EventStreamSync FIDL proxy.
/// Provides convenience methods that build on EventStreamSync::Next
pub struct EventStream {
proxy: fevents::EventStreamSyncProxy,
}
impl EventStream {
fn new(proxy: fevents::EventStreamSyncProxy) -> Self {
Self { proxy }
}
pub async fn next(&self) -> Result<fevents::Event, Error> {
let event = self.proxy.next().await.context("could not get next breakpoint")?;
Ok(event)
}
/// Expects the next event to be of a particular type.
/// Returns the casted type if successful and an error otherwise.
pub async fn expect_type<T: Event>(&self) -> Result<T, Error> {
let event = self.next().await?;
T::from_fidl(event)
}
/// Expects the next event to be of a particular type and moniker.
/// Returns the casted type if successful and an error otherwise.
pub async fn expect_exact<T: Event>(&self, expected_moniker: &str) -> Result<T, Error> {
let event = self.expect_type::<T>().await?;
if expected_moniker == event.target_moniker() {
Ok(event)
} else {
Err(format_err!("Incorrect moniker"))
}
}
/// Waits for an event of a particular type.
/// Implicitly resumes all other events.
/// Returns the casted type if successful and an error otherwise.
pub async fn wait_until_type<T: Event>(&self) -> Result<T, Error> {
loop {
let event = self.next().await?;
if let Ok(event) = T::from_fidl(event) {
return Ok(event);
}
}
}
/// Waits for an event of a particular type and target moniker.
/// Implicitly resumes all other events.
/// Returns the casted type if successful and an error otherwise.
pub async fn wait_until_exact<T: Event>(
&self,
expected_target_moniker: &str,
) -> Result<T, Error> {
loop {
let event = self.wait_until_type::<T>().await?;
if event.target_moniker() == expected_target_moniker {
return Ok(event);
}
event.resume().await?;
}
}
/// Waits for a component capability to be routed matching a particular
/// target moniker and capability. Implicitly resumes all other events.
/// Returns the casted type if successful and an error otherwise.
pub async fn wait_until_component_capability(
&self,
expected_target_moniker: &str,
expected_capability_id: &str,
) -> Result<RouteCapability, Error> {
loop {
let event = self.wait_until_exact::<RouteCapability>(expected_target_moniker).await?;
if expected_capability_id == event.capability_id {
match event.source {
fevents::CapabilitySource::Component(_) => return Ok(event),
_ => {}
}
}
event.resume().await?;
}
}
/// Waits for a framework capability to be routed matching a particular
/// target moniker, scope moniker and capability. Implicitly resumes all other events.
/// Returns the casted type if successful and an error otherwise.
pub async fn wait_until_framework_capability(
&self,
expected_target_moniker: &str,
expected_capability_id: &str,
expected_scope_moniker: Option<&str>,
) -> Result<RouteCapability, Error> {
loop {
let event = self.wait_until_exact::<RouteCapability>(expected_target_moniker).await?;
// If the capability ID matches and the capability source is framework
// with a matching optional scope moniker, then return the event.
if expected_capability_id == event.capability_id {
match &event.source {
fevents::CapabilitySource::Framework(fevents::FrameworkCapability {
scope_moniker,
..
}) if scope_moniker.as_ref().map(|s| s.as_str()) == expected_scope_moniker => {
return Ok(event)
}
_ => {}
}
}
event.resume().await?;
}
}
}
/// Common features of any event - event type, target moniker, conversion function
pub trait Event: Handler {
const TYPE: fevents::EventType;
fn target_moniker(&self) -> &str;
fn from_fidl(event: fevents::Event) -> Result<Self, Error>;
}
/// Basic handler that resumes/unblocks from an Event
#[must_use = "invoke resume() otherwise component manager will be halted indefinitely!"]
pub trait Handler: Sized {
fn handler_proxy(self) -> fevents::HandlerProxy;
#[must_use = "futures do nothing unless you await on them!"]
fn resume<'a>(self) -> BoxFuture<'a, Result<(), fidl::Error>> {
let proxy = self.handler_proxy();
Box::pin(async move { proxy.resume().await })
}
}
/// Implemented on fevents::Event for resuming a generic event
impl Handler for fevents::Event {
fn handler_proxy(self) -> fevents::HandlerProxy {
self.handler
.expect("Could not find handler in Event object")
.into_proxy()
.expect("Could not convert into proxy")
}
}
/// An Injector allows a test to implement a protocol to be used by a component.
///
/// Client <---> Injector
#[async_trait]
pub trait Injector: Send + Sync {
type Marker: ServiceMarker;
/// This function will be run in a spawned task when a client attempts
/// to connect to the service being injected. `request_stream` is a stream of
/// requests coming in from the client.
async fn serve(
self: Arc<Self>,
mut request_stream: <<Self as Injector>::Marker as ServiceMarker>::RequestStream,
) -> Result<(), Error>;
fn capability_path(&self) -> String {
format!("/svc/{}", Self::Marker::NAME)
}
}
/// An Interposer allows a test to sit between a service and a client
/// and mutate or silently observe messages being passed between them.
///
/// Client <---> Interposer <---> Service
#[async_trait]
pub trait Interposer: Send + Sync {
type Marker: ServiceMarker;
/// This function will be run asynchronously when a client attempts
/// to connect to the service being interposed. `request_stream` is a stream of
/// requests coming in from the client and `to_service` is a proxy to the
/// real service.
async fn interpose(
self: Arc<Self>,
mut request_stream: <<Self as Interposer>::Marker as ServiceMarker>::RequestStream,
to_service: <<Self as Interposer>::Marker as ServiceMarker>::Proxy,
) -> Result<(), Error>;
fn capability_path(&self) -> String {
format!("/svc/{}", Self::Marker::NAME)
}
}
/// A protocol that allows routing capabilities over FIDL.
#[async_trait]
pub trait RoutingProtocol {
fn protocol_proxy(&self) -> fevents::RoutingProtocolProxy;
#[must_use = "futures do nothing unless you await on them!"]
async fn set_provider(
&self,
client_end: ClientEnd<fevents::CapabilityProviderMarker>,
) -> Result<(), fidl::Error> {
let proxy = self.protocol_proxy();
proxy.set_provider(client_end).await
}
/// Set an Injector for the given capability.
#[must_use = "futures do nothing unless you await on them!"]
async fn inject<I: 'static>(&self, injector: Arc<I>) -> Result<(), fidl::Error>
where
I: Injector,
{
// Create the CapabilityProvider channel
let (provider_client_end, mut provider_capability_stream) =
create_request_stream::<fevents::CapabilityProviderMarker>()
.expect("Could not create request stream for CapabilityProvider");
// Wait for an Open request on the CapabilityProvider channel
fasync::spawn(async move {
if let Some(Ok(fevents::CapabilityProviderRequest::Open { server_end, responder })) =
provider_capability_stream.next().await
{
// Unblock component manager from the open request
responder.send().expect("Could not respond to CapabilityProvider::Open");
// Create the stream for the Client <---> Interposer connection
let stream = ServerEnd::<I::Marker>::new(server_end)
.into_stream()
.expect("could not convert channel into stream");
injector.serve(stream).await.expect("Injection failed");
} else {
panic!(
"Failed to inject capability! CapabilityProvider was not able to invoke Open"
);
}
});
// Send the client end of the CapabilityProvider protocol
self.protocol_proxy().set_provider(provider_client_end).await
}
/// Set an Interposer for the given capability.
#[must_use = "futures do nothing unless you await on them!"]
async fn interpose<I: 'static>(&self, interposer: Arc<I>) -> Result<(), fidl::Error>
where
I: Interposer,
{
// Create the Interposer <---> Server channel
let (client_end, server_end) = Channel::create().expect("could not create channel");
// Create the CapabilityProvider channel
let (provider_client_end, mut provider_capability_stream) =
create_request_stream::<fevents::CapabilityProviderMarker>()
.expect("Could not create request stream for CapabilityProvider");
// Wait for an Open request on the CapabilityProvider channel
fasync::spawn(async move {
if let Some(Ok(fevents::CapabilityProviderRequest::Open { server_end, responder })) =
provider_capability_stream.next().await
{
// Unblock component manager from the open request
responder.send().expect("Could not respond to CapabilityProvider::Open");
// Create the proxy for the Interposer <---> Server connection
let proxy = ClientEnd::<I::Marker>::new(client_end)
.into_proxy()
.expect("could not convert into proxy");
// Create the stream for the Client <---> Interposer connection
let stream = ServerEnd::<I::Marker>::new(server_end)
.into_stream()
.expect("could not convert channel into stream");
// Start interposing!
interposer.interpose(stream, proxy).await.expect("Interposition failed");
} else {
panic!("Failed to interpose! CapabilityProvider was not able to invoke Open");
}
});
// Replace the existing provider and open it with the
// server end of the Interposer <---> Server channel.
self.protocol_proxy().replace_and_open(provider_client_end, server_end).await
}
}
/// Describes an event drained out by the EventSink
#[derive(Eq, PartialEq, PartialOrd, Ord, Debug)]
pub struct DrainedEvent {
pub event_type: fevents::EventType,
pub target_moniker: String,
}
/// Soaks events from an EventStream, allowing them to be
/// drained at a later point in time.
pub struct EventSink {
drained_events: Arc<Mutex<Vec<DrainedEvent>>>,
}
impl EventSink {
fn soak_async(event_stream: EventStream) -> Self {
let drained_events = Arc::new(Mutex::new(vec![]));
{
// Start an async task that soaks up events from the event_stream
let drained_events = drained_events.clone();
fasync::spawn(async move {
// TODO(xbhatnag): Terminate this infinite loop when EventSink is dropped.
// Or pass in a Weak and terminate if it can't be upgraded.
loop {
// Get the next event from the event_stream
let event = event_stream
.next()
.await
.expect("Failed to get next event from EventStreamSync");
// Construct the DrainedEvent from the Event
let event_type = event.event_type.expect("Failed to get event type from Event");
let target_moniker = event
.target_moniker
.as_ref()
.expect("Failed to get target moniker from Event")
.clone();
let drained_event = DrainedEvent { event_type, target_moniker };
// Insert the event into the list
{
let mut drained_events = drained_events.lock().await;
drained_events.push(drained_event);
}
// Resume from the event
event.resume().await.expect("Could not resume from event");
}
});
}
Self { drained_events }
}
pub async fn drain(&self) -> Vec<DrainedEvent> {
// Lock and drain out all events from the vector
let mut drained_events = self.drained_events.lock().await;
drained_events.drain(..).collect()
}
}
/// The macro defined below will automatically create event classes corresponding
/// to their events.fidl and hooks.rs counterparts. Every event class implements
/// the Event and Handler traits. These minimum requirements allow every event to
/// be handled by the events client library.
/// Creates an event class based on event type and an optional payload
/// * event_type -> FIDL name for event type
/// * payload -> If an event has a payload, describe the additional params:
/// * name -> FIDL name for the payload
/// * data -> If a payload contains data items, describe the additional params:
/// * name -> FIDL name for the data item
/// * ty -> Rust type for the data item
/// * protocol -> If a payload contains protocols, describe the additional params:
/// * name -> FIDL name for the protocol
/// * ty -> Rust type for the protocol proxy
/// * trait_name -> Rust name for the trait implementing this protocol
macro_rules! create_event {
(
event_type: $event_type:ident,
payload: {
name: $payload_name:ident,
data: {$(
{
name: $data_name:ident,
ty: $data_ty:ty,
}
)*},
protocols: {$(
{
name: $protocol_name:ident,
ty: $protocol_ty:ty,
trait_name: $protocol_trait_name:ident,
}
)*},
}
) => {
pub struct $event_type {
target_moniker: String,
handler: fevents::HandlerProxy,
$($protocol_name: $protocol_ty,)*
$(pub $data_name: $data_ty,)*
}
impl Event for $event_type {
const TYPE: fevents::EventType = fevents::EventType::$event_type;
fn target_moniker(&self) -> &str {
&self.target_moniker
}
fn from_fidl(event: fevents::Event) -> Result<Self, Error> {
// Event type in event must match what is expected
let event_type = event.event_type.ok_or(
format_err!("Missing event_type from Event object")
)?;
if event_type != Self::TYPE {
return Err(format_err!("Incorrect event type"));
}
let target_moniker = event.target_moniker.ok_or(
format_err!("Missing target_moniker from Event object")
)?;
let handler = event.handler.ok_or(
format_err!("Missing handler from Event object")
)?.into_proxy()?;
// Extract the payload from the Event object.
let event_payload = event.event_payload.ok_or(
format_err!("Missing event_payload from Event object")
)?;
let $payload_name = event_payload.$payload_name.ok_or(
format_err!("Missing $payload_name from EventPayload object")
)?;
// Extract the additional data from the Payload object.
$(
let $data_name: $data_ty = $payload_name.$data_name.ok_or(
format_err!("Missing $data_name from $payload_name object")
)?;
)*
// Extract the additional protocols from the Payload object.
$(
let $protocol_name: $protocol_ty = $payload_name.$protocol_name.ok_or(
format_err!("Missing $protocol_name from $payload_name object")
)?.into_proxy()?;
)*
Ok($event_type { target_moniker, handler, $($data_name,)* $($protocol_name,)* })
}
}
impl Handler for $event_type {
fn handler_proxy(self) -> fevents::HandlerProxy {
self.handler
}
}
$(
impl $protocol_trait_name for $event_type {
fn protocol_proxy(&self) -> $protocol_ty {
self.$protocol_name.clone()
}
}
)*
};
($event_type:ident) => {
pub struct $event_type {
target_moniker: String,
handler: fevents::HandlerProxy,
}
impl Event for $event_type {
const TYPE: fevents::EventType = fevents::EventType::$event_type;
fn target_moniker(&self) -> &str {
&self.target_moniker
}
fn from_fidl(event: fevents::Event) -> Result<Self, Error> {
// Event type in event must match what is expected
let event_type = event.event_type.ok_or(
format_err!("Missing event_type from Event object")
)?;
if event_type != Self::TYPE {
return Err(format_err!("Incorrect event type"));
}
let target_moniker = event.target_moniker.ok_or(
format_err!("Missing target_moniker from Event object")
)?;
let handler = event.handler.ok_or(
format_err!("Missing handler from Event object")
)?.into_proxy()?;
// There should be no payload for this event
if event.event_payload.is_some() {
return Err(format_err!("Unexpected event payload"));
}
Ok($event_type { target_moniker, handler, })
}
}
impl Handler for $event_type {
fn handler_proxy(self) -> fevents::HandlerProxy {
self.handler
}
}
};
}
// To create a class for an event, use the above macro here.
create_event!(AddDynamicChild);
create_event!(BeforeStartInstance);
create_event!(PostDestroyInstance);
create_event!(PreDestroyInstance);
create_event!(ResolveInstance);
create_event!(StopInstance);
create_event!(
event_type: RouteCapability,
payload: {
name: routing_payload,
data: {
{
name: source,
ty: fevents::CapabilitySource,
}
{
name: capability_id,
ty: String,
}
},
protocols: {
{
name: routing_protocol,
ty: fevents::RoutingProtocolProxy,
trait_name: RoutingProtocol,
}
},
}
);