blob: 1a9ce1b49229a8e4ad8a762c1fd7ca044bd033cb [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
fidl_fuchsia_component_runner as fcrunner, fidl_fuchsia_diagnostics_types as fdiagnostics,
fuchsia_async as fasync, fuchsia_zircon as zx,
futures::{
channel::oneshot,
future::{BoxFuture, Shared},
FutureExt, StreamExt,
},
std::ops::Deref,
};
/// Wrapper around the `ComponentControllerProxy` with utilities for handling events.
pub struct ComponentController {
/// The wrapped `ComponentController` connection.
inner: fcrunner::ComponentControllerProxy,
/// Receiver for epitaphs coming from the connection.
epitaph_value_recv: Shared<oneshot::Receiver<zx::Status>>,
/// Receiver for diganostics data coming from an event.
diagnostics_value_recv: Option<oneshot::Receiver<fdiagnostics::ComponentDiagnostics>>,
/// The task listening for events.
_event_listener_task: fasync::Task<()>,
}
impl Deref for ComponentController {
type Target = fcrunner::ComponentControllerProxy;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<'a> ComponentController {
/// Create a new wrapper around a `ComponentControllerProxy`.
///
/// A task will be spawned to dispatch events on the `ComponentControllerProxy`.
pub fn new(proxy: fcrunner::ComponentControllerProxy) -> Self {
let (epitaph_sender, epitaph_value_recv) = oneshot::channel();
let (diagnostics_sender, diagnostics_value_recv) = oneshot::channel();
let event_stream = proxy.take_event_stream();
let events_fut = Self::listen_for_events(event_stream, epitaph_sender, diagnostics_sender);
let event_listener_task = fasync::Task::spawn(events_fut);
Self {
inner: proxy,
epitaph_value_recv: epitaph_value_recv.shared(),
diagnostics_value_recv: Some(diagnostics_value_recv),
_event_listener_task: event_listener_task,
}
}
/// Obtain a future for the epitaph value.
///
/// If the async task handling the `ComponentControllerProxy` unexpectedly exits, the future
/// will resolve to `PEER_CLOSED`. Otherwise, it will resolve to the `zx::Status` representing
/// the epitaph sent on the channel.
///
/// This method may be called multiple times from multiple threads.
pub fn wait_for_epitaph(&self) -> BoxFuture<'static, zx::Status> {
let val = self.epitaph_value_recv.clone();
async move { val.await.unwrap_or(zx::Status::PEER_CLOSED) }.boxed()
}
/// Obtain a receiver for the diagnostics values sent by the runner for the component.
///
/// This method may be called multiple times, but only the first call will obtain the value.
pub fn take_diagnostics_receiver(
&mut self,
) -> Option<oneshot::Receiver<fdiagnostics::ComponentDiagnostics>> {
self.diagnostics_value_recv.take()
}
async fn listen_for_events(
mut event_stream: fcrunner::ComponentControllerEventStream,
epitaph_sender: oneshot::Sender<zx::Status>,
diagnostics_sender: oneshot::Sender<fdiagnostics::ComponentDiagnostics>,
) {
let mut epitaph_sender = Some(epitaph_sender);
let mut diagnostics_sender = Some(diagnostics_sender);
while let Some(value) = event_stream.next().await {
match value {
Err(fidl::Error::ClientChannelClosed { status, .. }) => {
epitaph_sender.take().and_then(|sender| sender.send(status).ok());
}
Err(_) => {
epitaph_sender
.take()
.and_then(|sender| sender.send(zx::Status::PEER_CLOSED).ok());
}
Ok(event) => match event {
fcrunner::ComponentControllerEvent::OnPublishDiagnostics {
payload, ..
} => {
diagnostics_sender.take().and_then(|sender| sender.send(payload).ok());
}
},
}
}
epitaph_sender.take().map(|sender| sender.send(zx::Status::PEER_CLOSED).unwrap_or(()));
}
}
#[cfg(test)]
mod tests {
use {super::*, fidl::endpoints::RequestStream, matches::assert_matches};
#[fuchsia::test]
async fn handles_diagnostics_event() {
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<fcrunner::ComponentControllerMarker>()
.unwrap();
let mut controller = ComponentController::new(proxy);
let receiver = controller.take_diagnostics_receiver();
assert!(receiver.is_some());
assert!(controller.take_diagnostics_receiver().is_none());
stream
.control_handle()
.send_on_publish_diagnostics(fdiagnostics::ComponentDiagnostics {
tasks: Some(fdiagnostics::ComponentTasks::EMPTY),
..fdiagnostics::ComponentDiagnostics::EMPTY
})
.expect("sent diagnostics");
assert_matches!(
receiver.unwrap().await,
Ok(fdiagnostics::ComponentDiagnostics { tasks: Some(_), .. })
);
}
#[fuchsia::test]
async fn handles_connection_epitaph() {
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<fcrunner::ComponentControllerMarker>()
.unwrap();
let controller = ComponentController::new(proxy);
let epitaph_fut = controller.wait_for_epitaph();
stream.control_handle().shutdown_with_epitaph(zx::Status::UNAVAILABLE);
assert_eq!(epitaph_fut.await, zx::Status::UNAVAILABLE);
}
#[fuchsia::test]
async fn handles_epitaph_for_closed_connection() {
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<fcrunner::ComponentControllerMarker>()
.unwrap();
let controller = ComponentController::new(proxy);
let epitaph_fut = controller.wait_for_epitaph();
drop(stream);
assert_eq!(epitaph_fut.await, zx::Status::PEER_CLOSED);
}
#[fuchsia::test]
async fn handles_epitaph_for_dropped_controller() {
let (proxy, _) =
fidl::endpoints::create_proxy::<fcrunner::ComponentControllerMarker>().unwrap();
let controller = ComponentController::new(proxy);
let epitaph_fut = controller.wait_for_epitaph();
drop(controller);
assert_eq!(epitaph_fut.await, zx::Status::PEER_CLOSED);
}
}