blob: ecbf5e66e99e08b1723805b3e9954a9feb04b9c1 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{Context, Error};
use base64;
use fdio;
use fidl::endpoints::create_endpoints;
use fidl_fuchsia_media::*;
use fidl_fuchsia_virtualaudio;
use fuchsia_async as fasync;
use fuchsia_zircon as zx;
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::{select, StreamExt, TryFutureExt, TryStreamExt};
use parking_lot::RwLock;
use serde_json::{to_value, Value};
use std::convert::{TryFrom, TryInto};
use std::io::Write;
use std::sync::Arc;
use fuchsia_syslog::macros::*;
// Fixed configuration for our virtual output device.
// AudioFacade::new passes these three values to VirtualOutput::new.
const OUTPUT_SAMPLE_FORMAT: AudioSampleFormat = AudioSampleFormat::Signed16;
const OUTPUT_CHANNELS: u8 = 2;
const OUTPUT_FRAMES_PER_SECOND: u32 = 48000;
// Fixed configuration for our virtual input device.
// AudioFacade::new passes these three values to VirtualInput::new.
const INPUT_SAMPLE_FORMAT: AudioSampleFormat = AudioSampleFormat::Signed16;
const INPUT_CHANNELS: u8 = 2;
const INPUT_FRAMES_PER_SECOND: u32 = 16000;
// Values found in:
// zircon/system/public/zircon/device/audio.h
// The AUDIO_SAMPLE_FORMAT_* flags are the values matched by get_sample_size and
// produced by get_zircon_sample_format.
const AUDIO_SAMPLE_FORMAT_8BIT: u32 = 1 << 1;
const AUDIO_SAMPLE_FORMAT_16BIT: u32 = 1 << 2;
const AUDIO_SAMPLE_FORMAT_24BIT_IN32: u32 = 1 << 7;
const AUDIO_SAMPLE_FORMAT_32BIT_FLOAT: u32 = 1 << 9;
// Used as rate_family_flags in the FormatRange advertised for both virtual devices.
const ASF_RANGE_FLAG_FPS_CONTINUOUS: u16 = 1 << 0;
// If this changes, so too must the astro audio_core_config.
// Used as the unique_id of the virtual output device.
const AUDIO_OUTPUT_ID: [u8; 16] = [0x01; 16];
fn get_sample_size(format: u32) -> Result<u32, Error> {
Ok(match format {
// These are the currently implemented formats.
AUDIO_SAMPLE_FORMAT_8BIT => 1,
AUDIO_SAMPLE_FORMAT_16BIT => 2,
AUDIO_SAMPLE_FORMAT_24BIT_IN32 => 4,
AUDIO_SAMPLE_FORMAT_32BIT_FLOAT => 4,
_ => return Err(format_err!("Cannot handle sample_format: {:?}", format)),
})
}
/// Translates a FIDL `AudioSampleFormat` into the matching zircon
/// `AUDIO_SAMPLE_FORMAT_*` flag.
fn get_zircon_sample_format(format: AudioSampleFormat) -> u32 {
    // Deliberately exhaustive with no wildcard arm: these are all the sample formats
    // supported right now, and a new variant should fail to compile here.
    let zircon_flag = match format {
        AudioSampleFormat::Unsigned8 => AUDIO_SAMPLE_FORMAT_8BIT,
        AudioSampleFormat::Signed16 => AUDIO_SAMPLE_FORMAT_16BIT,
        AudioSampleFormat::Signed24In32 => AUDIO_SAMPLE_FORMAT_24BIT_IN32,
        AudioSampleFormat::Float => AUDIO_SAMPLE_FORMAT_32BIT_FLOAT,
    };
    zircon_flag
}
/// Commands sent from the facade to the `OutputWorker` task.
#[derive(Debug)]
enum ExtractMsg {
/// Discard any previously extracted bytes and begin capturing.
Start,
/// Stop capturing; the worker sends everything it extracted through `out_sender`.
Stop { out_sender: mpsc::Sender<Vec<u8>> },
}
/// State of the task that extracts audio from the virtual output device's ring buffer.
#[derive(Debug, Default)]
struct OutputWorker {
// Proxy to the virtual output device; populated at the start of `run`.
va_output: Option<fidl_fuchsia_virtualaudio::DeviceProxy>,
// Bytes copied out of the ring buffer so far; handed to the requester on ExtractMsg::Stop.
extracted_data: Vec<u8>,
// The ring buffer shared with the device; populated by `on_buffer_created`.
vmo: Option<zx::Vmo>,
// Whether we should store samples when we receive notification from the VAD
capturing: bool,
// How much of the vmo's data we're actually using, in bytes.
work_space: u64,
// How often, in frames, we want to be updated on the state of the extraction ring buffer.
frames_per_notification: u64,
// How many bytes a frame is.
frame_size: u64,
// Offset into vmo where we'll start to read next, in bytes.
next_read: u64,
// Offset into vmo where we'll finish reading next, in bytes.
next_read_end: u64,
}
impl OutputWorker {
/// Handles the device's `OnSetFormat` event.
///
/// Records the frame size in bytes (channels times bytes per sample) and picks a
/// notification period of 50 whole-millisecond blocks of frames. Fails when
/// `sample_format` is not one `get_sample_size` understands.
fn on_set_format(
    &mut self,
    frames_per_second: u32,
    sample_format: u32,
    num_channels: u32,
    _external_delay: i64,
) -> Result<(), Error> {
    let bytes_per_sample = u64::from(get_sample_size(sample_format)?);
    self.frame_size = u64::from(num_channels) * bytes_per_sample;

    // Integer division: the rate is truncated to whole frames per millisecond first.
    self.frames_per_notification = u64::from(frames_per_second / 1000) * 50;

    fx_log_info!(
        "AudioFacade::OutputWorker: configuring with {:?} fps, {:?} bpf",
        frames_per_second,
        self.frame_size
    );
    Ok(())
}
/// Handles the device's `OnBufferCreated` event.
///
/// Takes ownership of the shared ring buffer, asks the device for our own
/// notification cadence and resets the read window to the start of the buffer.
///
/// # Errors
///
/// Fails when the device proxy has not been stored yet, when no usable format has
/// been received (`frames_per_notification` is still zero), when the computed
/// notification count does not fit in a `u32`, or when the FIDL call fails.
fn on_buffer_created(
    &mut self,
    ring_buffer: zx::Vmo,
    num_ring_buffer_frames: u32,
    _notifications_per_ring: u32,
) -> Result<(), Error> {
    let va_output =
        self.va_output.as_mut().ok_or_else(|| format_err!("va_output not initialized"))?;

    // `frames_per_notification` is only set by `on_set_format` and is zero both before
    // that event and for rates below 1000 fps. Dividing by it would panic.
    if self.frames_per_notification == 0 {
        return Err(format_err!(
            "OnBufferCreated received without a usable format: frames_per_notification is 0"
        ));
    }

    // Ignore AudioCore's notification cadence (_notifications_per_ring); set up our own.
    let target_notifications_per_ring: u32 =
        (u64::from(num_ring_buffer_frames) / self.frames_per_notification).try_into()?;
    va_output.set_notification_frequency(target_notifications_per_ring)?;
    fx_log_info!(
        "AudioFacade::OutputWorker: created buffer with {:?} frames, {:?} notifications",
        num_ring_buffer_frames,
        target_notifications_per_ring
    );

    self.work_space = u64::from(num_ring_buffer_frames) * self.frame_size;
    // Start reading from the beginning.
    self.next_read = 0;
    self.next_read_end = 0;
    self.vmo = Some(ring_buffer);
    Ok(())
}
fn on_position_notify(
&mut self,
_monotonic_time: i64,
ring_position: u32,
capturing: bool,
) -> Result<(), Error> {
if capturing && self.next_read != self.next_read_end {
let vmo = if let Some(vmo) = &self.vmo { vmo } else { return Ok(()) };
fx_log_info!(
"AudioFacade::OutputWorker read byte {:?} to {:?}",
self.next_read,
self.next_read_end
);
if self.next_read_end < self.next_read {
// Wrap-around case, read through the end.
let len = (self.work_space - self.next_read).try_into()?;
let mut data = vec![0u8; len];
let overwrite1 = vec![1u8; len];
vmo.read(&mut data, self.next_read)?;
vmo.write(&overwrite1, self.next_read)?;
self.extracted_data.append(&mut data);
// Read remaining data.
let next_read_end = self.next_read_end.try_into()?;
let mut data = vec![0u8; next_read_end];
let overwrite2 = vec![1u8; next_read_end];
vmo.read(&mut data, 0)?;
vmo.write(&overwrite2, 0)?;
self.extracted_data.append(&mut data);
} else {
// Normal case, just read all the bytes.
let len = (self.next_read_end - self.next_read).try_into()?;
let mut data = vec![0u8; len];
let overwrite = vec![1u8; len];
vmo.read(&mut data, self.next_read)?;
vmo.write(&overwrite, self.next_read)?;
self.extracted_data.append(&mut data);
}
}
// We always stay 1 notification behind, since audio_core writes audio data into
// our shared buffer based on these same notifications. This avoids audio glitches.
self.next_read = self.next_read_end;
self.next_read_end = ring_position.into();
Ok(())
}
/// Event loop of the output (extraction) worker.
///
/// Stores `va_output` in `self`, then multiplexes two sources forever:
/// commands from the facade arriving on `rx`, and events emitted by the virtual
/// audio device. The loop has no successful exit: it returns `Err` when either
/// source ends, when the event stream yields an error, or when a handler fails.
async fn run(
&mut self,
mut rx: mpsc::Receiver<ExtractMsg>,
va_output: fidl_fuchsia_virtualaudio::DeviceProxy,
) -> Result<(), Error> {
// Take the event stream before moving the proxy into `self`.
let mut output_events = va_output.take_event_stream();
self.va_output = Some(va_output);
// Monotonic timestamp returned by the most-recent OnStart/OnPositionNotify/OnStop response.
let mut last_timestamp = zx::Time::from_nanos(0);
// Observed monotonic time that OnStart/OnPositionNotify/OnStop messages actually arrived.
let mut last_event_time = zx::Time::from_nanos(0);
loop {
select! {
// Commands from the facade.
rx_msg = rx.next() => {
match rx_msg {
None => {
return Err(format_err!("Got None ExtractMsg Event, exiting worker"));
},
Some(ExtractMsg::Stop { mut out_sender }) => {
fx_log_info!("AudioFacade::OutputWorker: Stop capture");
self.capturing = false;
// Move everything captured so far into a fresh vector (leaving
// `extracted_data` empty) and hand it to the requester.
let mut ret_data = vec![0u8; 0];
ret_data.append(&mut self.extracted_data);
out_sender.try_send(ret_data)?;
}
Some(ExtractMsg::Start) => {
fx_log_info!("AudioFacade::OutputWorker: Start capture");
self.extracted_data.clear();
self.capturing = true;
}
}
},
// Events from the virtual audio device.
output_msg = output_events.try_next() => {
match output_msg? {
None => {
return Err(format_err!("Got None DeviceEvent Message, exiting worker"));
},
Some(fidl_fuchsia_virtualaudio::DeviceEvent::OnSetFormat { frames_per_second, sample_format,
num_channels, external_delay}) => {
self.on_set_format(frames_per_second, sample_format, num_channels,
external_delay)?;
},
Some(fidl_fuchsia_virtualaudio::DeviceEvent::OnBufferCreated { ring_buffer, num_ring_buffer_frames,
notifications_per_ring }) => {
self.on_buffer_created(ring_buffer, num_ring_buffer_frames,
notifications_per_ring)?;
},
Some(fidl_fuchsia_virtualaudio::DeviceEvent::OnStart { start_time }) => {
// A non-zero `last_timestamp` means an earlier OnStart/OnPositionNotify set it
// and no OnStop has cleared it since.
if self.capturing && last_timestamp > zx::Time::from_nanos(0) {
fx_log_info!("AudioFacade::OutputWorker: Extraction OnPositionNotify received before OnStart");
}
last_timestamp = zx::Time::from_nanos(start_time);
last_event_time = zx::Time::get_monotonic();
},
Some(fidl_fuchsia_virtualaudio::DeviceEvent::OnStop { stop_time: _, ring_position: _ }) => {
if last_timestamp == zx::Time::from_nanos(0) {
fx_log_info!(
"AudioFacade::OutputWorker: Extraction OnPositionNotify timestamp cleared before OnStop");
}
// Zero marks both timestamps as "not started".
last_timestamp = zx::Time::from_nanos(0);
last_event_time = zx::Time::from_nanos(0);
},
Some(fidl_fuchsia_virtualaudio::DeviceEvent::OnPositionNotify { monotonic_time, ring_position }) => {
let monotonic_zx_time = zx::Time::from_nanos(monotonic_time);
let now = zx::Time::get_monotonic();
// To minimize logspam, log glitches only when capturing.
if self.capturing {
if last_timestamp == zx::Time::from_nanos(0) {
fx_log_info!(
"AudioFacade::OutputWorker: Extraction OnStart not received before OnPositionNotify");
}
// Log if our timestamps had a gap of more than 100ms. This is highly
// abnormal and indicates possible glitching while receiving playback
// audio from the system and/or extracting it for analysis.
// NOTE(review): when `last_timestamp` is still zero the interval is measured
// from time zero, so this check presumably fires as well — confirm intended.
let timestamp_interval = monotonic_zx_time - last_timestamp;
if timestamp_interval > zx::Duration::from_millis(100) {
fx_log_info!(
"AudioFacade::OutputWorker: Extraction position timestamp jumped by more than 100ms ({:?}ms). Expect glitches.",
timestamp_interval.into_millis());
}
if monotonic_zx_time < last_timestamp {
fx_log_info!(
"AudioFacade::OutputWorker: Extraction position timestamp moved backwards ({:?}ms). Expect glitches.",
timestamp_interval.into_millis());
}
// Log if there was a gap in position notification arrivals of more
// than 150ms. This is highly abnormal and indicates possible glitching
// while receiving playback audio from the system and/or extracting it
// for analysis.
let observed_interval = now - last_event_time;
if observed_interval > zx::Duration::from_millis(150) {
fx_log_info!(
"AudioFacade::OutputWorker: Extraction position not updated for 150ms ({:?}ms). Expect glitches.",
observed_interval.into_millis());
}
}
// Timestamps are tracked on every notification, capturing or not.
last_timestamp = monotonic_zx_time;
last_event_time = now;
self.on_position_notify(monotonic_time, ring_position, self.capturing)?;
},
Some(evt) => {
fx_log_info!("AudioFacade::OutputWorker: Got unknown DeviceEvent {:?}", evt);
}
}
},
};
}
}
}
/// Facade-side handle for the virtual output device and the audio captured from it.
#[derive(Debug)]
struct VirtualOutput {
// WAV header followed by the most recently saved audio; filled by
// AudioFacade::stop_output_save and returned (base64-encoded) by get_output_audio.
extracted_data: Vec<u8>,
// Whether a capture is currently in progress; toggled by start/stop_output_save.
capturing: Arc<Mutex<bool>>,
// Format advertised for the device and used when writing the WAV header.
sample_format: AudioSampleFormat,
channels: u8,
frames_per_second: u32,
// Command channel to the OutputWorker task; None until start_output has run.
output_sender: Option<mpsc::Sender<ExtractMsg>>,
}
impl VirtualOutput {
/// Builds an idle `VirtualOutput` for the given format.
///
/// Nothing is captured and no worker is running until `start_output` is called.
/// The current implementation always returns `Ok`.
pub fn new(
    sample_format: AudioSampleFormat,
    channels: u8,
    frames_per_second: u32,
) -> Result<VirtualOutput, Error> {
    let output = VirtualOutput {
        sample_format,
        channels,
        frames_per_second,
        capturing: Arc::new(Mutex::new(false)),
        extracted_data: Vec::new(),
        output_sender: None,
    };
    Ok(output)
}
/// Registers the virtual output device with `vad_control` and spawns the detached
/// `OutputWorker` task that services it.
///
/// The device advertises exactly one format (this object's sample format, channel
/// count and frame rate) and a ring buffer between 1000 and 2000 milliseconds' worth
/// of frames, in whole-millisecond multiples. On success the command channel to the
/// worker is stored in `output_sender`.
pub fn start_output(
    &mut self,
    vad_control: &fidl_fuchsia_virtualaudio::ControlSynchronousProxy,
) -> Result<(), Error> {
    // Ring buffer limits are expressed in whole-millisecond blocks of frames; the
    // buffer is at least 1s long.
    let frames_per_ms = self.frames_per_second / 1000;
    let ring_buffer_constraints = fidl_fuchsia_virtualaudio::RingBufferConstraints {
        min_frames: 1000 * frames_per_ms,
        max_frames: 2000 * frames_per_ms,
        modulo_frames: 1 * frames_per_ms,
    };
    let format_range = fidl_fuchsia_virtualaudio::FormatRange {
        sample_format_flags: get_zircon_sample_format(self.sample_format),
        min_frame_rate: self.frames_per_second,
        max_frame_rate: self.frames_per_second,
        min_channels: self.channels,
        max_channels: self.channels,
        rate_family_flags: ASF_RANGE_FLAG_FPS_CONTINUOUS,
    };
    let config = fidl_fuchsia_virtualaudio::Configuration {
        unique_id: Some(AUDIO_OUTPUT_ID),
        fifo_depth_bytes: Some(0),
        external_delay: Some(0),
        supported_formats: Some(vec![format_range]),
        ring_buffer_constraints: Some(ring_buffer_constraints),
        ..fidl_fuchsia_virtualaudio::Configuration::EMPTY
    };

    // Create the output.
    let (device_client, device_server) =
        create_endpoints::<fidl_fuchsia_virtualaudio::DeviceMarker>()?;
    vad_control
        .add_output(config, device_server, zx::Time::INFINITE)?
        .map_err(|status| anyhow!("AddOutput returned error {:?}", status))?;

    // Create a channel for handling requests; the worker owns the receiving end.
    let (command_tx, command_rx) = mpsc::channel(512);
    let extraction = async move {
        let mut worker = OutputWorker::default();
        let channel = fasync::Channel::from_channel(device_client.into_channel())?;
        let device = fidl_fuchsia_virtualaudio::DeviceProxy::new(channel);
        worker.run(command_rx, device).await?;
        Ok::<(), Error>(())
    };
    fasync::Task::spawn(
        extraction.unwrap_or_else(|e| eprintln!("Output extraction thread failed: {:?}", e)),
    )
    .detach();

    self.output_sender = Some(command_tx);
    Ok(())
}
#[allow(clippy::unused_io_amount)] // TODO(fxbug.dev/95034)
pub fn write_header(&mut self, len: u32) -> Result<(), Error> {
let bytes_per_sample = get_sample_size(get_zircon_sample_format(self.sample_format))?;
// 8 Bytes
self.extracted_data.write("RIFF".as_bytes())?;
self.extracted_data.write(&u32::to_le_bytes(len + 8 + 28 + 8))?;
// 28 bytes
self.extracted_data.write("WAVE".as_bytes())?; // wave_four_cc uint32
self.extracted_data.write("fmt ".as_bytes())?; // fmt_four_cc uint32
self.extracted_data.write(&u32::to_le_bytes(16))?; // fmt_chunk_len
self.extracted_data.write(&u16::to_le_bytes(1))?; // format
self.extracted_data.write(&u16::to_le_bytes(self.channels.into()))?;
self.extracted_data.write(&u32::to_le_bytes(self.frames_per_second))?;
let channels: u32 = self.channels.into();
self.extracted_data
.write(&u32::to_le_bytes(bytes_per_sample * channels * self.frames_per_second))?; // avg_byte_rate
self.extracted_data.write(&u16::to_le_bytes((bytes_per_sample * channels).try_into()?))?;
self.extracted_data.write(&u16::to_le_bytes((bytes_per_sample * 8).try_into()?))?;
// 8 bytes
self.extracted_data.write("data".as_bytes())?;
self.extracted_data.write(&u32::to_le_bytes(len))?;
Ok(())
}
}
/// State of the task that injects audio into the virtual input device's ring buffer.
#[derive(Debug, Default)]
struct InputWorker {
// Proxy to the virtual input device; populated at the start of `run`.
va_input: Option<fidl_fuchsia_virtualaudio::DeviceProxy>,
// Audio bytes still waiting to be injected; consumed from the front as they are written.
inj_data: Vec<u8>,
// The ring buffer shared with the device; populated by `on_buffer_created`.
vmo: Option<zx::Vmo>,
// How much of the vmo's data we're actually using, in bytes.
work_space: usize,
// How many frames ahead of the position we want to be writing to, when writing.
target_frames: usize,
// How often, in frames, we want to be updated on the state of the injection ring buffer.
frames_per_notification: usize,
// How many bytes a frame is.
frame_size: usize,
// Next write pointer as a byte number.
// This is not an offset into vmo (it does not wrap around).
write_pointer: usize,
// Next ring buffer pointer as a byte number.
// This is not an offset into vmo (it does not wrap around).
ring_pointer: usize,
// Last relative ring buffer byte offset.
// This is an offset into vmo (it wraps around).
last_ring_offset: usize,
}
impl InputWorker {
fn write_to_vmo(&self, data: &[u8]) -> Result<(), Error> {
let vmo = if let Some(vmo) = &self.vmo { vmo } else { return Ok(()) };
let start = self.write_pointer % self.work_space;
let end = (self.write_pointer + data.len()) % self.work_space;
let start_u64 = start.try_into()?;
// Write in two chunks if we've wrapped around.
if end < start {
let split = self.work_space - start;
vmo.write(&data[0..split], start_u64)?;
vmo.write(&data[split..], 0)?;
} else {
vmo.write(&data, start_u64)?;
}
Ok(())
}
// Events from the sl4f facade
/// Drops all audio that is still waiting to be injected.
fn flush(&mut self) {
    self.inj_data.truncate(0);
}
/// Queues the sample payload of `data`, a complete WAV file image, for injection.
///
/// The first 44 bytes are assumed to be the header and are skipped.
///
/// # Errors
///
/// Fails if previously queued audio has not been flushed, or if `data` is too short
/// to contain the assumed header.
fn set_data(&mut self, data: Vec<u8>) -> Result<(), Error> {
    // This is a bad assumption, wav headers can be many different sizes.
    // 8 Bytes for riff header
    // 28 bytes for wave fmt block
    // 8 bytes for data header
    const WAV_HEADER_LEN: usize = 8 + 28 + 8;

    if !self.inj_data.is_empty() {
        return Err(format_err!("Cannot inject new audio without flushing old audio"));
    }
    // `data` originates from a facade request, so a short buffer must be reported
    // rather than sliced blindly (which would panic).
    let payload = data.get(WAV_HEADER_LEN..).ok_or_else(|| {
        format_err!(
            "Cannot inject audio: {:?} bytes is shorter than the {:?} byte wav header",
            data.len(),
            WAV_HEADER_LEN
        )
    })?;
    self.inj_data.write_all(payload)?;
    fx_log_info!("AudioFacade::InputWorker: Injecting {:?} bytes", self.inj_data.len());
    Ok(())
}
// Events from the Virtual Audio Device
/// Handles the device's `OnSetFormat` event.
///
/// Records the frame size in bytes and derives the two pacing parameters: how far
/// ahead of the ring position we write (250ms) and how often we ask to be notified
/// (50ms). Fails when `sample_format` is not one `get_sample_size` understands.
fn on_set_format(
    &mut self,
    frames_per_second: u32,
    sample_format: u32,
    num_channels: u32,
    _external_delay: i64,
) -> Result<(), Error> {
    let bytes_per_sample = get_sample_size(sample_format)?;
    self.frame_size = usize::try_from(num_channels)? * usize::try_from(bytes_per_sample)?;
    fx_log_info!(
        "AudioFacade::InputWorker: configuring with {:?} fps, {:?} bpf",
        frames_per_second,
        self.frame_size
    );

    // Pacing: a notification every 50ms, with writes landing up to 250ms in the future.
    // The difference between the two is slack that absorbs scheduling delays.
    //
    //   * At buffer creation the ring position is "0ms" and the ring is zeroed. We act
    //     as if target_frames of silence had already been written, which places the
    //     write pointer roughly 250ms ahead of the ring buffer's safe read pointer.
    //
    //   * The first notification arrives at 50ms plus scheduling delay, and we then
    //     fill the range 250ms - 300ms. While the scheduling delay stays below 250ms
    //     our writes remain ahead of the safe read pointer. 250ms is assumed to be
    //     ample (this test framework should never be run in a debug or sanitizer
    //     build).
    //
    //   * Every later notification repeats the same step.
    //
    // 250ms + 50ms has to fit inside the ring buffer. We currently use a 1s ring
    // buffer: see VirtualInput.start_input.
    let frames_per_ms = frames_per_second / 1000;
    self.target_frames = usize::try_from(250 * frames_per_ms)?;
    self.frames_per_notification = usize::try_from(50 * frames_per_ms)?;
    Ok(())
}
/// Handles the device's `OnBufferCreated` event.
///
/// Takes ownership of the shared ring buffer, asks the device for our own
/// notification cadence and resets all position tracking so that the write pointer
/// sits `target_frames` ahead of a ring position of zero.
///
/// # Errors
///
/// Fails when the device proxy has not been stored yet, when no usable format has
/// been received (`frames_per_notification` is still zero), or when the FIDL call
/// fails.
fn on_buffer_created(
    &mut self,
    ring_buffer: zx::Vmo,
    num_ring_buffer_frames: u32,
    _notifications_per_ring: u32,
) -> Result<(), Error> {
    let va_input =
        self.va_input.as_mut().ok_or_else(|| format_err!("va_input not initialized"))?;

    // `frames_per_notification` is only set by `on_set_format` and is zero both before
    // that event and for rates below 1000 fps. Dividing by it would panic.
    let frames_per_notification = u32::try_from(self.frames_per_notification)?;
    if frames_per_notification == 0 {
        return Err(format_err!(
            "OnBufferCreated received without a usable format: frames_per_notification is 0"
        ));
    }

    // Ignore AudioCore's notification cadence (_notifications_per_ring); set up our own.
    let target_notifications_per_ring = num_ring_buffer_frames / frames_per_notification;
    va_input.set_notification_frequency(target_notifications_per_ring)?;

    // The buffer starts zeroed and our write pointer starts target_frames in the future.
    self.work_space = usize::try_from(num_ring_buffer_frames)? * self.frame_size;
    self.write_pointer = self.target_frames * self.frame_size;
    // The absolute ring position restarts together with the ring offset; leaving a stale
    // value from a previous buffer would put it out of step with `write_pointer`.
    self.ring_pointer = 0;
    self.last_ring_offset = 0;
    self.vmo = Some(ring_buffer);
    fx_log_info!(
        "AudioFacade::InputWorker: created buffer with {:?} frames, {:?} notifications, {:?} target frames per write",
        num_ring_buffer_frames,
        target_notifications_per_ring,
        self.target_frames
    );
    Ok(())
}
/// Handles a ring-buffer position notification.
///
/// Advances the absolute ring position by the distance the ring offset moved since
/// the previous notification (accounting for wrap-around), then fills the ring up to
/// `target_frames` ahead of that position: first with queued injection data, and with
/// zeroes for whatever part of the range the queued data does not cover.
fn on_position_notify(&mut self, _monotonic_time: i64, ring_offset: u32) -> Result<(), Error> {
    let ring_offset: usize = ring_offset.try_into()?;

    // Distance travelled since the last notification; a smaller offset means the ring
    // position wrapped around the end.
    let advanced = if ring_offset < self.last_ring_offset {
        self.work_space - self.last_ring_offset + ring_offset
    } else {
        ring_offset - self.last_ring_offset
    };
    self.ring_pointer += advanced;

    let next_write_pointer = self.ring_pointer + self.target_frames * self.frame_size;
    let bytes_to_write = next_write_pointer - self.write_pointer;

    // Next segment of inj_data.
    if !self.inj_data.is_empty() {
        let injected = std::cmp::min(self.inj_data.len(), bytes_to_write);
        self.write_to_vmo(&self.inj_data[..injected])?;
        self.inj_data.drain(..injected);
        self.write_pointer += injected;
    }

    // Pad with zeroes.
    if self.write_pointer < next_write_pointer {
        let silence = vec![0u8; next_write_pointer - self.write_pointer];
        self.write_to_vmo(&silence)?;
        self.write_pointer = next_write_pointer;
    }

    self.last_ring_offset = ring_offset;
    Ok(())
}
/// Event loop of the input (injection) worker.
///
/// Stores `va_input` in `self`, then multiplexes two sources forever:
/// commands from the facade arriving on `rx`, and events emitted by the virtual
/// audio device. The loop has no successful exit: it returns `Err` when either
/// source ends, when the event stream yields an error, or when a handler fails
/// (including `set_data` being asked to queue audio while older audio is pending).
async fn run(
&mut self,
mut rx: mpsc::Receiver<InjectMsg>,
va_input: fidl_fuchsia_virtualaudio::DeviceProxy,
) -> Result<(), Error> {
// Take the event stream before moving the proxy into `self`.
let mut input_events = va_input.take_event_stream();
self.va_input = Some(va_input);
// Monotonic timestamp returned by the most-recent OnStart/OnPositionNotify/OnStop response.
let mut last_timestamp = zx::Time::from_nanos(0);
// Observed monotonic time that OnStart/OnPositionNotify/OnStop messages actually arrived.
let mut last_event_time = zx::Time::from_nanos(0);
loop {
select! {
// Commands from the facade.
rx_msg = rx.next() => {
match rx_msg {
None => {
return Err(format_err!("Got None InjectMsg Event, exiting worker"));
},
Some(InjectMsg::Flush) => {
self.flush();
}
Some(InjectMsg::Data { data }) => {
self.set_data(data)?;
}
}
},
// Events from the virtual audio device.
input_msg = input_events.try_next() => {
match input_msg? {
None => {
return Err(format_err!("Got None DeviceEvent Message, exiting worker"));
},
Some(fidl_fuchsia_virtualaudio::DeviceEvent::OnSetFormat { frames_per_second, sample_format,
num_channels, external_delay}) => {
self.on_set_format(frames_per_second, sample_format, num_channels,
external_delay)?;
},
Some(fidl_fuchsia_virtualaudio::DeviceEvent::OnBufferCreated { ring_buffer, num_ring_buffer_frames,
notifications_per_ring }) => {
self.on_buffer_created(ring_buffer, num_ring_buffer_frames,
notifications_per_ring)?;
},
Some(fidl_fuchsia_virtualaudio::DeviceEvent::OnStart { start_time }) => {
// A non-zero `last_timestamp` means an earlier OnStart/OnPositionNotify set it
// and no OnStop has cleared it since.
if last_timestamp > zx::Time::from_nanos(0) {
fx_log_info!("AudioFacade::InputWorker: Injection OnPositionNotify received before OnStart");
}
last_timestamp = zx::Time::from_nanos(start_time);
last_event_time = zx::Time::get_monotonic();
},
Some(fidl_fuchsia_virtualaudio::DeviceEvent::OnStop { stop_time: _, ring_position: _ }) => {
if last_timestamp == zx::Time::from_nanos(0) {
fx_log_info!("AudioFacade::InputWorker: Injection OnPositionNotify timestamp cleared before OnStop");
}
// Zero marks both timestamps as "not started".
last_timestamp = zx::Time::from_nanos(0);
last_event_time = zx::Time::from_nanos(0);
},
Some(fidl_fuchsia_virtualaudio::DeviceEvent::OnPositionNotify { monotonic_time, ring_position }) => {
let monotonic_zx_time = zx::Time::from_nanos(monotonic_time);
let now = zx::Time::get_monotonic();
// To minimize logspam, log glitches only when writing audio.
if self.inj_data.len() > 0 {
if last_timestamp == zx::Time::from_nanos(0) {
fx_log_info!("AudioFacade::InputWorker: Injection OnStart not received before OnPositionNotify");
}
// Log if our timestamps had a gap of more than 100ms. This is highly
// abnormal and indicates possible glitching while receiving audio to
// be injected and/or providing it to the system.
// NOTE(review): when `last_timestamp` is still zero the interval is measured
// from time zero, so this check presumably fires as well — confirm intended.
let timestamp_interval = monotonic_zx_time - last_timestamp;
if timestamp_interval > zx::Duration::from_millis(100) {
fx_log_info!("AudioFacade::InputWorker: Injection position timestamp jumped by more than 100ms ({:?}ms). Expect glitches.",
timestamp_interval.into_millis());
}
if monotonic_zx_time < last_timestamp {
fx_log_info!("AudioFacade::InputWorker: Injection position timestamp moved backwards ({:?}ms). Expect glitches.",
timestamp_interval.into_millis());
}
// Log if there was a gap in position notification arrivals of more
// than 150ms. This is highly abnormal and indicates possible glitching
// while receiving audio to be injected and/or providing it to the
// system.
let observed_interval = now - last_event_time;
if observed_interval > zx::Duration::from_millis(150) {
fx_log_info!("AudioFacade::InputWorker: Injection position not updated for 150ms ({:?}ms). Expect glitches.",
observed_interval.into_millis());
}
}
// Timestamps are tracked on every notification, whether or not audio is queued.
last_timestamp = monotonic_zx_time;
last_event_time = now;
self.on_position_notify(monotonic_time, ring_position)?;
},
Some(evt) => {
fx_log_info!("AudioFacade::InputWorker: Got unknown DeviceEvent {:?}", evt);
}
}
},
};
}
}
}
/// Facade-side handle for the virtual input device and the audio clips queued for it.
#[derive(Debug)]
struct VirtualInput {
// Stored WAV file images, addressed by the index supplied in facade requests.
injection_data: Vec<Vec<u8>>,
// Set once any clip has been stored by AudioFacade::put_input_audio.
have_data: bool,
// Format advertised for the device.
sample_format: AudioSampleFormat,
channels: u8,
frames_per_second: u32,
// Command channel to the InputWorker task; None until start_input has run.
input_sender: Option<mpsc::Sender<InjectMsg>>,
}
impl VirtualInput {
fn new(sample_format: AudioSampleFormat, channels: u8, frames_per_second: u32) -> Self {
VirtualInput {
injection_data: vec![],
have_data: false,
sample_format,
channels,
frames_per_second,
input_sender: None,
}
}
/// Registers the virtual input device with `vad_control` and spawns the detached
/// `InputWorker` task that services it.
///
/// The device advertises exactly one format (this object's sample format, channel
/// count and frame rate) and a ring buffer between 1000 and 2000 milliseconds' worth
/// of frames, in whole-millisecond multiples. On success the command channel to the
/// worker is stored in `input_sender`.
pub fn start_input(
    &mut self,
    vad_control: &fidl_fuchsia_virtualaudio::ControlSynchronousProxy,
) -> Result<(), Error> {
    // Ring buffer limits are expressed in whole-millisecond blocks of frames; the
    // buffer is at least 1s long.
    let frames_per_ms = self.frames_per_second / 1000;
    let ring_buffer_constraints = fidl_fuchsia_virtualaudio::RingBufferConstraints {
        min_frames: 1000 * frames_per_ms,
        max_frames: 2000 * frames_per_ms,
        modulo_frames: 1 * frames_per_ms,
    };
    let format_range = fidl_fuchsia_virtualaudio::FormatRange {
        sample_format_flags: get_zircon_sample_format(self.sample_format),
        min_frame_rate: self.frames_per_second,
        max_frame_rate: self.frames_per_second,
        min_channels: self.channels,
        max_channels: self.channels,
        rate_family_flags: ASF_RANGE_FLAG_FPS_CONTINUOUS,
    };
    let config = fidl_fuchsia_virtualaudio::Configuration {
        fifo_depth_bytes: Some(0),
        external_delay: Some(0),
        supported_formats: Some(vec![format_range]),
        ring_buffer_constraints: Some(ring_buffer_constraints),
        ..fidl_fuchsia_virtualaudio::Configuration::EMPTY
    };

    // Create the input.
    let (device_client, device_server) =
        create_endpoints::<fidl_fuchsia_virtualaudio::DeviceMarker>()?;
    vad_control
        .add_input(config, device_server, zx::Time::INFINITE)?
        .map_err(|status| anyhow!("AddInput returned error {:?}", status))?;

    // Create a channel for handling requests; the worker owns the receiving end.
    let (command_tx, command_rx) = mpsc::channel(512);
    let injection = async move {
        let mut worker = InputWorker::default();
        let channel = fasync::Channel::from_channel(device_client.into_channel())?;
        let device = fidl_fuchsia_virtualaudio::DeviceProxy::new(channel);
        worker.run(command_rx, device).await?;
        Ok::<(), Error>(())
    };
    fasync::Task::spawn(
        injection.unwrap_or_else(|e| eprintln!("Input injection thread failed: {:?}", e)),
    )
    .detach();

    self.input_sender = Some(command_tx);
    Ok(())
}
/// Starts injecting the clip stored at `index`.
///
/// Sends a flush followed by a copy of the clip to the input worker.
///
/// # Errors
///
/// Fails if the worker has not been started, if no clip has been stored at `index`
/// (the index is out of range or the slot is empty), or if the worker's command
/// channel rejects a message.
pub fn play(&mut self, index: usize) -> Result<(), Error> {
    let sender =
        self.input_sender.as_mut().ok_or_else(|| format_err!("input_sender not initialized"))?;

    // `index` comes straight from a facade request: look it up without panicking, and
    // validate it before flushing whatever the worker is currently injecting.
    let data = self
        .injection_data
        .get(index)
        .filter(|clip| !clip.is_empty())
        .cloned()
        .ok_or_else(|| format_err!("No audio data stored at index {:?}", index))?;

    sender.try_send(InjectMsg::Flush)?;
    sender.try_send(InjectMsg::Data { data })?;
    Ok(())
}
/// Stops any injection in progress by asking the input worker to flush its queue.
///
/// Fails if the worker has not been started or its command channel rejects the message.
pub fn stop(&mut self) -> Result<(), Error> {
    match self.input_sender.as_mut() {
        Some(sender) => {
            sender.try_send(InjectMsg::Flush)?;
            Ok(())
        }
        None => Err(format_err!("input_sender not initialized")),
    }
}
}
/// Commands sent from the facade to the `InputWorker` task.
#[derive(Debug)]
enum InjectMsg {
/// Discard any audio still queued for injection.
Flush,
/// Queue a WAV file image for injection; the worker skips its header bytes.
Data { data: Vec<u8> },
}
/// Perform Audio operations.
///
/// Note this object is shared among all threads created by server.
#[derive(Debug)]
pub struct AudioFacade {
// Synchronous connection to the virtual audio control service.
vad_control: fidl_fuchsia_virtualaudio::ControlSynchronousProxy,
// Virtual output device used to capture audio.
audio_output: RwLock<VirtualOutput>,
// Virtual input device used to inject audio.
audio_input: RwLock<VirtualInput>,
// Whether the virtual devices have been created; see ensure_initialized.
initialized: Mutex<bool>,
}
impl AudioFacade {
/// Creates the virtual input and output devices on first use.
///
/// The `initialized` lock is held for the whole call, so concurrent callers wait
/// until setup has finished. If setup fails the flag stays false and the next call
/// tries again.
async fn ensure_initialized(&self) -> Result<(), Error> {
    let mut initialized = self.initialized.lock().await;
    if *initialized {
        return Ok(());
    }

    // Make sure there are no other virtual devices to ensure that audio_core
    // will connect to our new virtual devices.
    self.vad_control.remove_all(zx::Time::INFINITE)?;
    self.audio_input.write().start_input(&self.vad_control)?;
    self.audio_output.write().start_output(&self.vad_control)?;

    *initialized = true;
    Ok(())
}
/// Connects to the virtual audio control service and builds a facade whose virtual
/// devices use the fixed input/output configuration declared in this file.
///
/// The devices themselves are not created here; that happens lazily on the first
/// request.
pub fn new() -> Result<AudioFacade, Error> {
    // Connect to the virtual audio control service.
    let (control_client, control_server) = zx::Channel::create()?;
    let control_path = fidl_fuchsia_virtualaudio::CONTROL_NODE_NAME;
    fdio::service_connect(control_path, control_server)
        .context(format!("failed to connect to '{}'", control_path))?;
    let vad_control = fidl_fuchsia_virtualaudio::ControlSynchronousProxy::new(control_client);

    // The input and output devices are initialized lazily.
    let output =
        VirtualOutput::new(OUTPUT_SAMPLE_FORMAT, OUTPUT_CHANNELS, OUTPUT_FRAMES_PER_SECOND)?;
    let input = VirtualInput::new(INPUT_SAMPLE_FORMAT, INPUT_CHANNELS, INPUT_FRAMES_PER_SECOND);

    Ok(AudioFacade {
        vad_control,
        audio_output: RwLock::new(output),
        audio_input: RwLock::new(input),
        initialized: Mutex::new(false),
    })
}
pub async fn start_output_save(&self) -> Result<Value, Error> {
self.ensure_initialized().await?;
let capturing = self.audio_output.read().capturing.clone();
let mut capturing = capturing.lock().await;
if !*capturing {
let mut write = self.audio_output.write();
let sender = write
.output_sender
.as_mut()
.ok_or(format_err!("Failed unwrapping output sender"))?;
sender.try_send(ExtractMsg::Start)?;
*(capturing) = true;
Ok(to_value(true)?)
} else {
return Err(format_err!("Cannot StartOutputSave, already started."));
}
}
/// Stops the running capture and stores the result as a WAV image (header plus
/// samples) for later retrieval.
///
/// Returns JSON `true` on success.
///
/// # Errors
///
/// Fails if no capture is running, if the output worker cannot be reached or does
/// not answer, or if the captured audio is too large for a WAV header.
pub async fn stop_output_save(&self) -> Result<Value, Error> {
    self.ensure_initialized().await?;
    let capturing = self.audio_output.read().capturing.clone();
    let mut capturing = capturing.lock().await;
    if !*capturing {
        return Err(format_err!("Cannot StopOutputSave, not started."));
    }

    let (tx, mut rx) = mpsc::channel(512);
    {
        // Keep this guard in its own scope: it is a blocking lock and must not be held
        // while awaiting the worker's reply, or any other task that touches
        // `audio_output` on the same executor thread would block it.
        let mut output = self.audio_output.write();
        output.extracted_data.clear();
        let sender = output
            .output_sender
            .as_mut()
            .ok_or_else(|| format_err!("Failed unwrapping output sender"))?;
        sender.try_send(ExtractMsg::Stop { out_sender: tx })?;
    }

    let mut saved_audio = rx
        .next()
        .await
        .ok_or_else(|| format_err!("StopOutputSave failed, could not retrieve data."))?;
    let len = saved_audio.len().try_into()?;

    // The async `capturing` guard is still held, so no other start/stop/get request
    // can interleave between the clear above and the writes below.
    let mut output = self.audio_output.write();
    output.write_header(len)?;
    output.extracted_data.append(&mut saved_audio);
    *capturing = false;
    Ok(to_value(true)?)
}
pub async fn get_output_audio(&self) -> Result<Value, Error> {
self.ensure_initialized().await?;
let capturing = self.audio_output.read().capturing.clone();
let capturing = capturing.lock().await;
if !*capturing {
Ok(to_value(base64::encode(&self.audio_output.read().extracted_data))?)
} else {
return Err(format_err!("GetOutputAudio failed, still saving."));
}
}
pub async fn put_input_audio(&self, args: Value) -> Result<Value, Error> {
self.ensure_initialized().await?;
let data = args.get("data").ok_or(format_err!("PutInputAudio failed, no data"))?;
let data = data.as_str().ok_or(format_err!("PutInputAudio failed, data not string"))?;
let mut wave_data_vec = base64::decode(data)?;
let sample_index = args["index"].as_u64().ok_or(format_err!("index not a number"))?;
let sample_index = sample_index.try_into()?;
// TODO(perley): check wave format for correct bits per sample and float/int.
let byte_cnt = wave_data_vec.len();
{
let mut write = self.audio_input.write();
// Make sure we have somewhere to store the wav data.
if write.injection_data.len() <= sample_index {
write.injection_data.resize(sample_index + 1, vec![]);
}
write.injection_data[sample_index].clear();
write.injection_data[sample_index].append(&mut wave_data_vec);
write.have_data = true;
}
Ok(to_value(byte_cnt)?)
}
pub async fn start_input_injection(&self, args: Value) -> Result<Value, Error> {
self.ensure_initialized().await?;
{
let sample_index = args["index"].as_u64().ok_or(format_err!("index not a number"))?;
let sample_index = sample_index.try_into()?;
if !self.audio_input.read().have_data {
return Err(format_err!("StartInputInjection failed, no Audio data to inject."));
}
self.audio_input.write().play(sample_index)?;
}
Ok(to_value(true)?)
}
/// Stops any injection in progress on the virtual input device.
///
/// Returns JSON `true` on success.
pub async fn stop_input_injection(&self) -> Result<Value, Error> {
    self.ensure_initialized().await?;
    let mut input = self.audio_input.write();
    input.stop()?;
    Ok(to_value(true)?)
}
}