blob: bced26edeb1206b66eb721819691f8df35c2c04c [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::manager::DEFAULT_TIMEOUT_IN_SECONDS;
use anyhow::{Context as _, Result};
use async_trait::async_trait;
use fidl::endpoints::ClientEnd;
use futures::channel::mpsc;
use futures::{join, pin_mut, select, AsyncWriteExt, FutureExt, SinkExt, StreamExt};
use std::cell::RefCell;
use std::collections::LinkedList;
use std::rc::Rc;
use test_manager::Artifact;
use tracing::{info, warn};
use {
fidl_fuchsia_diagnostics as diagnostics, fidl_fuchsia_test_manager as test_manager,
fuchsia_async as fasync, fuchsia_zircon as zx,
};
/// Dispatches `test_manager` artifacts to the `ArtifactBridge`s used to forward data to `ffx fuzz`.
///
/// This object receives the artifacts extracted and forwarded from the `RunEventHandler` and
/// `SuiteEventHandler`.
pub struct ArtifactHandler {
artifact_receiver: mpsc::UnboundedReceiver<Artifact>,
stdout: Option<mpsc::UnboundedSender<zx::Socket>>,
stderr: Option<mpsc::UnboundedSender<zx::Socket>>,
syslog: Option<mpsc::UnboundedSender<ClientEnd<diagnostics::BatchIteratorMarker>>>,
}
impl ArtifactHandler {
/// Wraps the artifact channel in a handler.
pub fn new(artifact_receiver: mpsc::UnboundedReceiver<Artifact>) -> Self {
Self { artifact_receiver, stdout: None, stderr: None, syslog: None }
}
/// Creates an artifact bridge for forwarding stdout from `test_manager` to `ffx fuzz`.
pub fn create_stdout_bridge(
&mut self,
socket_receiver: mpsc::UnboundedReceiver<zx::Socket>,
) -> SocketBridge {
let (artifact_sender, artifact_receiver) = mpsc::unbounded::<zx::Socket>();
self.stdout = Some(artifact_sender);
SocketBridge::new(artifact_receiver, socket_receiver)
}
/// Creates an artifact bridge for forwarding stderr from `test_manager` to `ffx fuzz`.
pub fn create_stderr_bridge(
&mut self,
socket_receiver: mpsc::UnboundedReceiver<zx::Socket>,
) -> SocketBridge {
let (artifact_sender, artifact_receiver) = mpsc::unbounded::<zx::Socket>();
self.stderr = Some(artifact_sender);
SocketBridge::new(artifact_receiver, socket_receiver)
}
/// Creates an artifact bridge for forwarding syslog from `test_manager` to `ffx fuzz`.
pub fn create_syslog_bridge(
&mut self,
socket_receiver: mpsc::UnboundedReceiver<zx::Socket>,
) -> LogBridge {
let (artifact_sender, artifact_receiver) =
mpsc::unbounded::<ClientEnd<diagnostics::BatchIteratorMarker>>();
self.syslog = Some(artifact_sender);
LogBridge::new(artifact_receiver, socket_receiver)
}
/// Handles `Artifact`s and dispatches them to the appropriate bridge.
pub async fn run(&mut self) -> Result<()> {
while let Some(artifact) = self.artifact_receiver.next().await {
match artifact {
test_manager::Artifact::Stdout(socket) => {
if let Some(mut stdout) = self.stdout.as_ref() {
stdout.send(socket).await.context("failed to send stdout artifact")?;
}
}
test_manager::Artifact::Stderr(socket) => {
if let Some(mut stderr) = self.stderr.as_ref() {
stderr.send(socket).await.context("failed to send stderr artifact")?;
}
}
test_manager::Artifact::Log(test_manager::Syslog::Batch(client_end)) => {
if let Some(mut syslog) = self.syslog.as_ref() {
syslog.send(client_end).await.context("failed to send syslog artifact")?;
}
}
artifact => info!("Ignoring unsupported artifact: {:?}", artifact),
};
}
Ok(())
}
}
struct Message {
data: Vec<u8>,
expiration: zx::Time,
}
impl Message {
fn new(data: Vec<u8>) -> Self {
let mut expiration = zx::Time::get_monotonic();
expiration += zx::Duration::from_seconds(DEFAULT_TIMEOUT_IN_SECONDS);
Self { data, expiration }
}
}
#[async_trait(?Send)]
pub trait ArtifactBridge {
/// Forwards data from `test_manager` artifacts to `ffx fuzz` sockets.
async fn forward(&self);
}
#[async_trait(?Send)]
trait ArtifactBridgeInternal: ArtifactBridge {
// Waits to get an |artifact| for this object.
async fn get_artifact(&self);
// Returns bytes read from the registered artifact, or an error if the artifact is unset or the
// read fails.
async fn read_from_artifact(&self) -> Vec<u8>;
/// Returns a future that reads from a received artfact and writes to received sockets.
///
/// This future only completes on error or when the receiver channels are closed.
async fn forward_impl(&self, socket_receiver: mpsc::UnboundedReceiver<zx::Socket>) {
// Cap the number of outstanding, unexpired message that can be queued.
let (msg_sender, msg_receiver) = mpsc::channel::<Message>(0x1000);
self.get_artifact().await;
join!(self.recv(msg_sender), self.send(msg_receiver, socket_receiver));
}
/// Reads data from the artifact and encapsulate it as timestamped `Message`s.
async fn recv(&self, mut msg_sender: mpsc::Sender<Message>) {
loop {
let data = self.read_from_artifact().await;
if data.is_empty() {
break;
}
let num_read = data.len();
if let Err(e) = msg_sender.send(Message::new(data)).await {
if e.is_full() {
warn!("dropped output: {} bytes", num_read);
}
if e.is_disconnected() {
break;
}
}
}
}
/// Reads `Message`s and sends them to sockets as they become available.
async fn send(
&self,
mut msg_receiver: mpsc::Receiver<Message>,
socket_receiver: mpsc::UnboundedReceiver<zx::Socket>,
) {
let mut socket = None;
let socket_receiver_rc = Rc::new(RefCell::new(socket_receiver));
while let Some(msg) = msg_receiver.next().await {
let now = zx::Time::get_monotonic();
if msg.expiration < now {
continue;
}
// If there's no socket, try to get one up to the expiration of the current message.
if socket.is_none() {
let timer_fut = fasync::Timer::new(msg.expiration - now).fuse();
let socket_receiver = Rc::clone(&socket_receiver_rc);
let socket_fut = || async move {
let mut socket_receiver = socket_receiver.borrow_mut();
socket_receiver.next().await.and_then(|s| Some(fasync::Socket::from_socket(s)))
};
let socket_fut = socket_fut().fuse();
pin_mut!(timer_fut, socket_fut);
socket = select! {
_ = timer_fut => None,
s = socket_fut => s,
};
}
// Proactively try to replace the socket if one is available. Use a loop to get the most
// recently provided one.
loop {
let mut socket_receiver = socket_receiver_rc.borrow_mut();
match socket_receiver.try_next() {
Ok(Some(s)) => {
socket = Some(fasync::Socket::from_socket(s));
}
// Either the socket_receiver closed, or there's no sockets available.
Ok(None) => break,
Err(_) => break,
}
}
// Clients may reconnect, so if writing to the socket fails, keep reading from the
// artifact.
let result = match socket.as_ref() {
// Only way to get here is to have the msg expire or the socket_recevier close
// before receiving a usable socket,
None => continue,
Some(mut socket) => socket.write_all(&msg.data).await,
};
if let Err(e) = result {
warn!("failed to write to socket: {}", e);
socket = None;
}
}
}
}
/// Forwards stdio from a persistent socket to a replaceable one.
#[derive(Debug)]
pub struct SocketBridge {
artifact: RefCell<Option<fasync::Socket>>,
artifact_receiver: RefCell<mpsc::UnboundedReceiver<zx::Socket>>,
socket_receiver: RefCell<Option<mpsc::UnboundedReceiver<zx::Socket>>>,
}
impl SocketBridge {
fn new(
artifact_receiver: mpsc::UnboundedReceiver<zx::Socket>,
socket_receiver: mpsc::UnboundedReceiver<zx::Socket>,
) -> Self {
Self {
artifact: RefCell::new(None),
artifact_receiver: RefCell::new(artifact_receiver),
socket_receiver: RefCell::new(Some(socket_receiver)),
}
}
}
#[async_trait(?Send)]
impl ArtifactBridge for SocketBridge {
async fn forward(&self) {
let socket_receiver = self.socket_receiver.borrow_mut().take().unwrap();
self.forward_impl(socket_receiver).await
}
}
#[async_trait(?Send)]
impl ArtifactBridgeInternal for SocketBridge {
async fn get_artifact(&self) {
let mut artifact_receiver = self.artifact_receiver.borrow_mut();
if let Some(socket) = artifact_receiver.next().await {
let mut artifact = self.artifact.borrow_mut();
*artifact = Some(fasync::Socket::from_socket(socket));
}
}
async fn read_from_artifact(&self) -> Vec<u8> {
let mut data = Vec::new();
let mut artifact = self.artifact.borrow_mut();
if let Some(socket) = artifact.as_ref() {
if let Err(e) = socket.read_datagram(&mut data).await {
if e != zx::Status::PEER_CLOSED {
warn!("failed to read from socket: {}", e);
}
*artifact = None;
}
}
data
}
}
/// Forwards JSON-encoded syslogs from a `BatchIterator` client to a replaceable socket.
#[derive(Debug)]
pub struct LogBridge {
artifact: RefCell<Option<diagnostics::BatchIteratorProxy>>,
queue: RefCell<LinkedList<diagnostics::FormattedContent>>,
artifact_receiver:
RefCell<mpsc::UnboundedReceiver<ClientEnd<diagnostics::BatchIteratorMarker>>>,
socket_receiver: RefCell<Option<mpsc::UnboundedReceiver<zx::Socket>>>,
}
impl LogBridge {
fn new(
artifact_receiver: mpsc::UnboundedReceiver<ClientEnd<diagnostics::BatchIteratorMarker>>,
socket_receiver: mpsc::UnboundedReceiver<zx::Socket>,
) -> Self {
Self {
artifact: RefCell::new(None),
queue: RefCell::new(LinkedList::new()),
artifact_receiver: RefCell::new(artifact_receiver),
socket_receiver: RefCell::new(Some(socket_receiver)),
}
}
}
#[async_trait(?Send)]
impl ArtifactBridge for LogBridge {
async fn forward(&self) {
let socket_receiver = self.socket_receiver.borrow_mut().take().unwrap();
self.forward_impl(socket_receiver).await
}
}
#[async_trait(?Send)]
impl ArtifactBridgeInternal for LogBridge {
async fn get_artifact(&self) {
let mut artifact_receiver = self.artifact_receiver.borrow_mut();
if let Some(client_end) = artifact_receiver.next().await {
let mut artifact = self.artifact.borrow_mut();
*artifact = client_end.into_proxy().ok();
}
}
async fn read_from_artifact(&self) -> Vec<u8> {
let mut artifact = self.artifact.borrow_mut();
let mut queue = self.queue.borrow_mut();
if let Some(batch_iterator) = artifact.as_ref() {
if queue.is_empty() {
match batch_iterator.get_next().await {
Ok(Ok(batch)) => queue.extend(batch.into_iter()),
Ok(Err(e)) => warn!("BatchIterator returned error: {:?}", e),
Err(e) => warn!("BatchIterator FIDL failure: {}", e),
}
}
}
let buf = match queue.pop_front() {
Some(diagnostics::FormattedContent::Json(buf)) => buf,
Some(diagnostics::FormattedContent::Text(buf)) => buf,
Some(_) => unreachable!("unsupported FormattedContent"),
None => {
*artifact = None;
return Vec::new();
}
};
let mut data = vec![0; buf.size as usize];
if let Err(e) = buf.vmo.read(&mut data, 0) {
warn!("failed to read from VMO: {}", e);
}
data
}
}