blob: 2d14f2f6ab9c1e9a5c700a72339fc6a70a29397c [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{anyhow, Error};
use blocking::Unblock;
use fidl::{endpoints::Proxy, Socket};
use fidl_fuchsia_audio_controller as fac;
use futures::{future::Either, AsyncReadExt, AsyncWrite, FutureExt, TryFutureExt};
use serde::{Deserialize, Serialize};
use std::io::{BufRead, ErrorKind};
pub mod ffxtool;
#[derive(Debug, Serialize, PartialEq, Deserialize)]
pub struct PlayResult {
pub bytes_processed: Option<u64>,
#[derive(Debug, Serialize, PartialEq, Deserialize)]
pub struct RecordResult {
pub bytes_processed: Option<u64>,
pub packets_processed: Option<u64>,
pub late_wakeups: Option<u64>,
pub async fn play(
request: fac::PlayerPlayRequest,
controller: fac::PlayerProxy,
play_local: Socket, // Send data from ffx to target.
input_reader: Box<dyn std::io::Read + std::marker::Send + 'static>,
// Input generalized to stdin or test buffer. Forward to socket.
) -> Result<PlayResult, Error>
let (play_result, _stdin_reader_result) = futures::future::try_join(|e| anyhow!("future try join failed with error {e}")),
async move {
let mut socket_writer = fidl::AsyncSocket::from_socket(play_local);
let stdin_res = futures::io::copy(Unblock::new(input_reader), &mut socket_writer).await;
// Close ffx end of socket so that daemon end reads EOF and stops waiting for data.
stdin_res.map_err(|e| anyhow!("Error stdin: {}", e))
.map(|result| PlayResult { bytes_processed: result.bytes_processed })
.map_err(|e| anyhow!("Play failed with error {:?}", e))
pub async fn record<W>(
controller: fac::RecorderProxy,
request: fac::RecorderRecordRequest,
record_local: fidl::Socket,
mut output_writer: W, // Output generalized to stdout or a test buffer. Forward data
// from daemon to this writer.
keypress_handler: impl futures::Future<Output = Result<(), std::io::Error>>,
) -> Result<RecordResult, Error>
W: AsyncWrite + std::marker::Unpin,
let response_fut =
{ controller.record(request).map_err(|e| anyhow!("Controller record failure: {:?}", e)) };
let wav_fut = {
futures::io::copy(fidl::AsyncSocket::from_socket(record_local), &mut output_writer)
.map_err(|e| anyhow!("wav output failure: {:?}", e))
let (record_result, _keypress_result, _socket_result) = futures::future::try_join3(
keypress_handler.map_err(|e| anyhow!("Failed to wait for keypress: {e}")),
.map(|result| RecordResult {
bytes_processed: result.bytes_processed,
packets_processed: result.packets_processed,
late_wakeups: result.late_wakeups,
.map_err(|e| anyhow!("Record request failed with error {:?}", e))
pub fn get_stdin_waiter() -> futures::future::BoxFuture<'static, Result<(), std::io::Error>> {
blocking::unblock(move || {
let _ = std::io::stdin().lock().read_line(&mut String::new());
fn maybe_to_str(maybe_bytes: Option<u64>) -> String {
maybe_bytes.map_or("Unavailable".to_owned(), |s| ToString::to_string(&s))
pub fn format_record_result(result: Result<RecordResult, Error>) -> String {
match result {
Ok(result) => {
"Successfully recorded {} bytes of audio. \nPackets processed: {} \nLate wakeups: {}",
Err(e) => {
format!("Record failed with error {:?}", e)
pub async fn cancel_on_keypress(
proxy: fac::RecordCancelerProxy,
input_waiter: impl futures::Future<Output = Result<(), std::io::Error>>
+ futures::future::FusedFuture,
) -> Result<(), std::io::Error> {
let closed_fut = async {
let _ = proxy.on_closed().await;
futures::pin_mut!(closed_fut, input_waiter);
if let Either::Right(_) = futures::future::select(closed_fut, input_waiter).await {
match proxy.cancel().await {
Err(e) => return Err(std::io::Error::new(ErrorKind::Other, format!("FIDL error {e}"))),
Ok(Err(e)) => {
return Err(std::io::Error::new(ErrorKind::Other, format!("Canceler error {e}")))
Ok(_res) => return Ok(()),
pub mod tests {
use super::*;
use fuchsia_audio::stop_listener;
use futures::AsyncWriteExt;
use timeout::timeout;
// Header for an infinite-length audio stream: 16 kHz, 3-channel, 16-bit signed.
// This slice contains a complete WAVE_FORMAT_EXTENSIBLE file header
// (everything except the audio data itself), which includes the 'fmt ' chunk
// and the first two fields of the 'data' chunk. After this, the next bytes
// in this stream would be the audio itself (in 6-byte frames).
pub const WAV_HEADER_EXT: &'static [u8; 83] = b"\x52\x49\x46\x46\xff\xff\xff\xff
// Complete WAV file (140 bytes in all) for a 48 kHz, 1-channel, unsigned 8-bit
// audio stream. The file is the old-style PCMWAVEFORMAT and contains 96 frames
// of a sinusoid of approx. 439 Hz.
pub const SINE_WAV: &'static [u8; 140] = b"\
pub fn fake_audio_player() -> fac::PlayerProxy {
let callback = |req| match req {
fac::PlayerRequest::Play { payload, responder } => {
let data_socket =
payload.wav_source.ok_or(anyhow!("Socket argument missing.")).unwrap();
let mut socket = fidl::AsyncSocket::from_socket(data_socket);
let mut wav_file = vec![0u8; 11];
fuchsia_async::Task::local(async move {
let _ = socket.read_exact(&mut wav_file).await.unwrap();
// Pass back a fake value for total bytes processed.
// ffx tests are focused on the interaction between ffx and the user,
// so controller specific functionality can be covered by tests targeting
// it specifically.
let response =
fac::PlayerPlayResponse { bytes_processed: Some(1), ..Default::default() };
_ => unimplemented!(),
pub fn fake_audio_recorder() -> fac::RecorderProxy {
let callback = |req| match req {
fac::RecorderRequest::Record { payload, responder } => {
let wav_socket =
payload.wav_data.ok_or(anyhow!("Socket argument missing.")).unwrap();
fuchsia_async::Task::local(async move {
let mut socket = fidl::AsyncSocket::from_socket(wav_socket);
// Let either the duration run out and close the cancel channel or
// respond to cancel FIDL request.
fuchsia_async::Task::local(async move {
if let Some(cancel_server) = payload.canceler {
if let Some(duration_nanos) = payload.duration {
// Duration provided, after duration passes we need to close the
// server channel and sockets by dropping them.
let _ = timeout(
async {},
} else {
// No duration provided, should receive a cancel() FIDL message.
let stop_signal = std::sync::atomic::AtomicBool::new(false);
let _ = stop_listener(cancel_server, &stop_signal).await;
assert_eq!(stop_signal.load(std::sync::atomic::Ordering::SeqCst), true);
let _ = responder.send(Ok(fac::RecorderRecordResponse {
// Calculating bytes & packets processed can be tested in controller tests.
// ffx tests are for I/O and command line validation.
bytes_processed: Some(123),
packets_processed: Some(123),
late_wakeups: None,
_ => unimplemented!(),