| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use anyhow::Error; |
| use base64; |
| use byteorder::{LittleEndian, WriteBytesExt}; |
| use fidl_fuchsia_media::*; |
| use fidl_fuchsia_virtualaudio::*; |
| use fuchsia_async as fasync; |
| use fuchsia_component as app; |
| use fuchsia_zircon as zx; |
| use futures::channel::mpsc; |
| use futures::lock::Mutex; |
| use futures::{select, StreamExt, TryFutureExt, TryStreamExt}; |
| use parking_lot::RwLock; |
| use serde_json::{to_value, Value}; |
| use std::io::Write; |
| use std::sync::Arc; |
| |
| use fuchsia_syslog::macros::*; |
| |
| // Values found in: |
| // zircon/system/public/zircon/device/audio.h |
| const AUDIO_SAMPLE_FORMAT_8BIT: u32 = 1 << 1; |
| const AUDIO_SAMPLE_FORMAT_16BIT: u32 = 1 << 2; |
| const AUDIO_SAMPLE_FORMAT_24BIT_IN32: u32 = 1 << 7; |
| const AUDIO_SAMPLE_FORMAT_32BIT_FLOAT: u32 = 1 << 9; |
| |
| const ASF_RANGE_FLAG_FPS_CONTINUOUS: u16 = 1 << 0; |
| |
| // If this changes, so too must the astro audio_core_config. |
| const AUDIO_OUTPUT_ID: [u8; 16] = [0x01; 16]; |
| |
| fn get_sample_size(format: u32) -> Result<usize, Error> { |
| Ok(match format { |
| // These are the currently implemented formats. |
| AUDIO_SAMPLE_FORMAT_8BIT => 1, |
| AUDIO_SAMPLE_FORMAT_16BIT => 2, |
| AUDIO_SAMPLE_FORMAT_24BIT_IN32 => 4, |
| AUDIO_SAMPLE_FORMAT_32BIT_FLOAT => 4, |
| _ => return Err(format_err!("Cannot handle sample_format: {:?}", format)), |
| }) |
| } |
| |
| fn get_zircon_sample_format(format: AudioSampleFormat) -> u32 { |
| match format { |
| AudioSampleFormat::Signed16 => AUDIO_SAMPLE_FORMAT_16BIT, |
| AudioSampleFormat::Unsigned8 => AUDIO_SAMPLE_FORMAT_8BIT, |
| AudioSampleFormat::Signed24In32 => AUDIO_SAMPLE_FORMAT_24BIT_IN32, |
| AudioSampleFormat::Float => AUDIO_SAMPLE_FORMAT_32BIT_FLOAT, |
| // No default case, these are all the audio sample formats supported right now. |
| } |
| } |
| |
| #[derive(Debug)] |
| enum ExtractMsg { |
| Start, |
| Stop { out_sender: mpsc::Sender<Vec<u8>> }, |
| } |
| |
| #[derive(Debug, Default)] |
| struct OutputWorker { |
| va_output: Option<OutputProxy>, |
| extracted_data: Vec<u8>, |
| vmo: Option<zx::Vmo>, |
| |
| // Whether we should store samples when we receive notification from the VAD |
| capturing: bool, |
| |
| // How much of the vmo's data we're actually using, in bytes. |
| work_space: u64, |
| |
| // How often, in frames, we want to be updated on the state of the extraction ring buffer. |
| frames_per_notification: u64, |
| |
| // How many bytes a frame is. |
| frame_size: u64, |
| |
| // Offset into vmo where we'll start to read next, in bytes. |
| next_read: u64, |
| |
| // Offset into vmo where we'll finish reading next, in bytes. |
| next_read_end: u64, |
| } |
| |
| impl OutputWorker { |
| fn on_set_format( |
| &mut self, |
| frames_per_second: u32, |
| sample_format: u32, |
| num_channels: u32, |
| _external_delay: i64, |
| ) -> Result<(), Error> { |
| let sample_size = get_sample_size(sample_format)?; |
| self.frame_size = num_channels as u64 * sample_size as u64; |
| |
| let frames_per_millisecond = frames_per_second as u64 / 1000; |
| self.frames_per_notification = 50 * frames_per_millisecond; |
| |
| fx_log_info!( |
| "AudioFacade::OutputWorker: configuring with {:?} fps, {:?} bpf", |
| frames_per_second, |
| self.frame_size |
| ); |
| |
| Ok(()) |
| } |
| |
| fn on_buffer_created( |
| &mut self, |
| ring_buffer: zx::Vmo, |
| num_ring_buffer_frames: u32, |
| _notifications_per_ring: u32, |
| ) -> Result<(), Error> { |
| let va_output = self.va_output.as_mut().ok_or(format_err!("va_output not initialized"))?; |
| |
| // Ignore AudioCore's notification cadence (_notifications_per_ring); set up our own. |
| let target_notifications_per_ring = |
| num_ring_buffer_frames as u64 / self.frames_per_notification; |
| |
| va_output.set_notification_frequency(target_notifications_per_ring as u32)?; |
| fx_log_info!( |
| "AudioFacade::OutputWorker: created buffer with {:?} frames, {:?} notifications", |
| num_ring_buffer_frames, |
| target_notifications_per_ring |
| ); |
| |
| self.work_space = num_ring_buffer_frames as u64 * self.frame_size; |
| |
| // Start reading from the beginning. |
| self.next_read = 0; |
| self.next_read_end = 0; |
| |
| self.vmo = Some(ring_buffer); |
| Ok(()) |
| } |
| |
| fn on_position_notify( |
| &mut self, |
| _monotonic_time: i64, |
| ring_position: u32, |
| capturing: bool, |
| ) -> Result<(), Error> { |
| if capturing && self.next_read != self.next_read_end { |
| let vmo = if let Some(vmo) = &self.vmo { vmo } else { return Ok(()) }; |
| |
| fx_log_info!( |
| "AudioFacade::OutputWorker read byte {:?} to {:?}", |
| self.next_read, |
| self.next_read_end |
| ); |
| |
| if (self.next_read_end as u64) < self.next_read { |
| // Wrap-around case, read through the end. |
| let mut data = vec![0u8; (self.work_space - self.next_read) as usize]; |
| let overwrite1 = vec![1u8; (self.work_space - self.next_read) as usize]; |
| vmo.read(&mut data, self.next_read)?; |
| vmo.write(&overwrite1, self.next_read)?; |
| self.extracted_data.append(&mut data); |
| |
| // Read remaining data. |
| let mut data = vec![0u8; self.next_read_end as usize]; |
| let overwrite2 = vec![1u8; self.next_read_end as usize]; |
| vmo.read(&mut data, 0)?; |
| vmo.write(&overwrite2, 0)?; |
| |
| self.extracted_data.append(&mut data); |
| } else { |
| // Normal case, just read all the bytes. |
| let mut data = vec![0u8; (self.next_read_end - self.next_read) as usize]; |
| let overwrite = vec![1u8; (self.next_read_end - self.next_read) as usize]; |
| vmo.read(&mut data, self.next_read)?; |
| vmo.write(&overwrite, self.next_read)?; |
| |
| self.extracted_data.append(&mut data); |
| } |
| } |
| // We always stay 1 notification behind, since audio_core writes audio data into |
| // our shared buffer based on these same notifications. This avoids audio glitches. |
| self.next_read = self.next_read_end; |
| self.next_read_end = ring_position as u64; |
| Ok(()) |
| } |
| |
| async fn run( |
| &mut self, |
| mut rx: mpsc::Receiver<ExtractMsg>, |
| va_output: OutputProxy, |
| ) -> Result<(), Error> { |
| let mut output_events = va_output.take_event_stream(); |
| self.va_output = Some(va_output); |
| |
| // Monotonic timestamp returned by the most-recent OnStart/OnPositionNotify/OnStop response. |
| let mut last_timestamp = zx::Time::from_nanos(0); |
| // Observed monotonic time that OnStart/OnPositionNotify/OnStop messages actually arrived. |
| let mut last_event_time = zx::Time::from_nanos(0); |
| |
| loop { |
| select! { |
| rx_msg = rx.next() => { |
| match rx_msg { |
| None => { |
| return Err(format_err!("Got None ExtractMsg Event, exiting worker")); |
| }, |
| Some(ExtractMsg::Stop { mut out_sender }) => { |
| fx_log_info!("AudioFacade::OutputWorker: Stop capture"); |
| self.capturing = false; |
| let mut ret_data = vec![0u8; 0]; |
| |
| ret_data.append(&mut self.extracted_data); |
| |
| out_sender.try_send(ret_data)?; |
| } |
| Some(ExtractMsg::Start) => { |
| fx_log_info!("AudioFacade::OutputWorker: Start capture"); |
| self.extracted_data.clear(); |
| self.capturing = true; |
| } |
| } |
| }, |
| output_msg = output_events.try_next() => { |
| match output_msg? { |
| None => { |
| return Err(format_err!("Got None OutputEvent Message, exiting worker")); |
| }, |
| Some(OutputEvent::OnSetFormat { frames_per_second, sample_format, |
| num_channels, external_delay}) => { |
| self.on_set_format(frames_per_second, sample_format, num_channels, |
| external_delay)?; |
| }, |
| Some(OutputEvent::OnBufferCreated { ring_buffer, num_ring_buffer_frames, |
| notifications_per_ring }) => { |
| self.on_buffer_created(ring_buffer, num_ring_buffer_frames, |
| notifications_per_ring)?; |
| }, |
| Some(OutputEvent::OnStart { start_time }) => { |
| if self.capturing && last_timestamp > zx::Time::from_nanos(0) { |
| fx_log_info!("AudioFacade::OutputWorker: Extraction OnPositionNotify received before OnStart"); |
| } |
| last_timestamp = zx::Time::from_nanos(start_time); |
| last_event_time = zx::Time::get_monotonic(); |
| }, |
| Some(OutputEvent::OnStop { stop_time, ring_position }) => { |
| if last_timestamp == zx::Time::from_nanos(0) { |
| fx_log_info!( |
| "AudioFacade::OutputWorker: Extraction OnPositionNotify timestamp cleared before OnStop"); |
| } |
| last_timestamp = zx::Time::from_nanos(0); |
| last_event_time = zx::Time::from_nanos(0); |
| }, |
| Some(OutputEvent::OnPositionNotify { monotonic_time, ring_position }) => { |
| let monotonic_zx_time = zx::Time::from_nanos(monotonic_time); |
| let now = zx::Time::get_monotonic(); |
| |
| // To minimize logspam, log glitches only when capturing. |
| if self.capturing { |
| if last_timestamp == zx::Time::from_nanos(0) { |
| fx_log_info!( |
| "AudioFacade::OutputWorker: Extraction OnStart not received before OnPositionNotify"); |
| } |
| |
| // Log if our timestamps had a gap of more than 100ms. This is highly |
| // abnormal and indicates possible glitching while receiving playback |
| // audio from the system and/or extracting it for analysis. |
| let timestamp_interval = monotonic_zx_time - last_timestamp; |
| if timestamp_interval > zx::Duration::from_millis(100) { |
| fx_log_info!( |
| "AudioFacade::OutputWorker: Extraction position timestamp jumped by more than 100ms ({:?}ms). Expect glitches.", |
| timestamp_interval.into_millis()); |
| } |
| if monotonic_zx_time < last_timestamp { |
| fx_log_info!( |
| "AudioFacade::OutputWorker: Extraction position timestamp moved backwards ({:?}ms). Expect glitches.", |
| timestamp_interval.into_millis()); |
| } |
| |
| // Log if there was a gap in position notification arrivals of more |
| // than 150ms. This is highly abnormal and indicates possible glitching |
| // while receiving playback audio from the system and/or extracting it |
| // for analysis. |
| let observed_interval = now - last_event_time; |
| if observed_interval > zx::Duration::from_millis(150) { |
| fx_log_info!( |
| "AudioFacade::OutputWorker: Extraction position not updated for 150ms ({:?}ms). Expect glitches.", |
| observed_interval.into_millis()); |
| } |
| } |
| |
| last_timestamp = monotonic_zx_time; |
| last_event_time = now; |
| |
| self.on_position_notify(monotonic_time, ring_position, self.capturing)?; |
| }, |
| Some(evt) => { |
| fx_log_info!("AudioFacade::OutputWorker: Got unknown OutputEvent {:?}", evt); |
| } |
| } |
| }, |
| }; |
| } |
| } |
| } |
| |
| #[derive(Debug)] |
| struct VirtualOutput { |
| extracted_data: Vec<u8>, |
| capturing: Arc<Mutex<bool>>, |
| have_data: bool, |
| |
| sample_format: AudioSampleFormat, |
| channels: u8, |
| frames_per_second: u32, |
| |
| output: Option<OutputProxy>, |
| output_sender: Option<mpsc::Sender<ExtractMsg>>, |
| } |
| |
| impl VirtualOutput { |
| pub fn new( |
| sample_format: AudioSampleFormat, |
| channels: u8, |
| frames_per_second: u32, |
| ) -> Result<VirtualOutput, Error> { |
| Ok(VirtualOutput { |
| extracted_data: vec![], |
| capturing: Arc::new(Mutex::new(false)), |
| have_data: false, |
| |
| sample_format, |
| channels, |
| frames_per_second, |
| |
| output: None, |
| output_sender: None, |
| }) |
| } |
| |
| pub fn start_output(&mut self) -> Result<(), Error> { |
| let va_output = app::client::connect_to_service::<OutputMarker>()?; |
| va_output.clear_format_ranges()?; |
| va_output.set_fifo_depth(0)?; |
| va_output.set_external_delay(0)?; |
| va_output.set_unique_id(&mut AUDIO_OUTPUT_ID.clone())?; |
| |
| let sample_format = get_zircon_sample_format(self.sample_format); |
| va_output.add_format_range( |
| sample_format as u32, |
| self.frames_per_second, |
| self.frames_per_second, |
| self.channels, |
| self.channels, |
| ASF_RANGE_FLAG_FPS_CONTINUOUS, |
| )?; |
| |
| // set buffer size to be at least 1s. |
| let frames_1ms = self.frames_per_second / 1000; |
| let frames_low = 1000 * frames_1ms; |
| let frames_high = 2000 * frames_1ms; |
| let frames_modulo = 1 * frames_1ms; |
| va_output.set_ring_buffer_restrictions(frames_low, frames_high, frames_modulo)?; |
| |
| let (tx, rx) = mpsc::channel(512); |
| va_output.add()?; |
| fasync::Task::spawn( |
| async move { |
| let mut worker = OutputWorker::default(); |
| worker.run(rx, va_output).await?; |
| Ok::<(), Error>(()) |
| } |
| .unwrap_or_else(|e| eprintln!("Output extraction thread failed: {:?}", e)), |
| ) |
| .detach(); |
| |
| self.output_sender = Some(tx); |
| Ok(()) |
| } |
| |
| pub fn write_header(&mut self, len: u32) -> Result<(), Error> { |
| let bytes_per_sample = get_sample_size(get_zircon_sample_format(self.sample_format))?; |
| |
| // 8 Bytes |
| self.extracted_data.write("RIFF".as_bytes())?; |
| self.extracted_data.write_u32::<LittleEndian>(len as u32 + 8 + 28 + 8)?; |
| |
| // 28 bytes |
| self.extracted_data.write("WAVE".as_bytes())?; // wave_four_cc uint32 |
| self.extracted_data.write("fmt ".as_bytes())?; // fmt_four_cc uint32 |
| self.extracted_data.write_u32::<LittleEndian>(16u32)?; // fmt_chunk_len |
| self.extracted_data.write_u16::<LittleEndian>(1u16)?; // format |
| self.extracted_data.write_u16::<LittleEndian>(self.channels as u16)?; |
| self.extracted_data.write_u32::<LittleEndian>(self.frames_per_second)?; |
| self.extracted_data.write_u32::<LittleEndian>( |
| bytes_per_sample as u32 * self.channels as u32 * self.frames_per_second, |
| )?; // avg_byte_rate |
| self.extracted_data |
| .write_u16::<LittleEndian>(bytes_per_sample as u16 * self.channels as u16)?; |
| self.extracted_data.write_u16::<LittleEndian>((bytes_per_sample * 8) as u16)?; |
| |
| // 8 bytes |
| self.extracted_data.write("data".as_bytes())?; |
| self.extracted_data.write_u32::<LittleEndian>(len as u32)?; |
| |
| Ok(()) |
| } |
| } |
| |
| #[derive(Debug, Default)] |
| struct InputWorker { |
| va_input: Option<InputProxy>, |
| inj_data: Vec<u8>, |
| vmo: Option<zx::Vmo>, |
| |
| // How much of the vmo's data we're actually using, in bytes. |
| work_space: usize, |
| |
| // How many frames ahead of the position we want to be writing to, when writing. |
| target_frames: usize, |
| |
| // How often, in frames, we want to be updated on the state of the injection ring buffer. |
| frames_per_notification: usize, |
| |
| // How many bytes a frame is. |
| frame_size: usize, |
| |
| // Next write pointer as a byte number. |
| // This is not an offset into vmo (it does not wrap around). |
| write_pointer: usize, |
| |
| // Next ring buffer pointer as a byte number. |
| // This is not an offset into vmo (it does not wrap around). |
| ring_pointer: usize, |
| |
| // Last relative ring buffer byte offset. |
| // This is an offset into vmo (it wraps around). |
| last_ring_offset: usize, |
| } |
| |
| impl InputWorker { |
| fn write_to_vmo(&self, data: &[u8]) -> Result<(), Error> { |
| let vmo = if let Some(vmo) = &self.vmo { vmo } else { return Ok(()) }; |
| let start = self.write_pointer % self.work_space; |
| let end = (self.write_pointer + data.len()) % self.work_space; |
| |
| // Write in two chunks if we've wrapped around. |
| if end < start { |
| let split = self.work_space - start; |
| vmo.write(&data[0..split], start as u64)?; |
| vmo.write(&data[split..], 0)?; |
| } else { |
| vmo.write(&data, start as u64)?; |
| } |
| Ok(()) |
| } |
| |
| // Events from the sl4f facade |
| fn flush(&mut self) { |
| self.inj_data.clear(); |
| } |
| |
| fn set_data(&mut self, data: Vec<u8>) -> Result<(), Error> { |
| if self.inj_data.len() > 0 { |
| return Err(format_err!("Cannot inject new audio without flushing old audio")); |
| } |
| // This is a bad assumption, wav headers can be many different sizes. |
| // 8 Bytes for riff header |
| // 28 bytes for wave fmt block |
| // 8 bytes for data header |
| self.inj_data.write(&data[44..])?; |
| fx_log_info!("AudioFacade::InputWorker: Injecting {:?} bytes", self.inj_data.len()); |
| Ok(()) |
| } |
| |
| // Events from the Virtual Audio Device |
| fn on_set_format( |
| &mut self, |
| frames_per_second: u32, |
| sample_format: u32, |
| num_channels: u32, |
| _external_delay: i64, |
| ) -> Result<(), Error> { |
| let sample_size = get_sample_size(sample_format)?; |
| self.frame_size = num_channels as usize * sample_size as usize; |
| |
| let frames_per_millisecond = frames_per_second as usize / 1000; |
| |
| fx_log_info!( |
| "AudioFacade::InputWorker: configuring with {:?} fps, {:?} bpf", |
| frames_per_second, |
| self.frame_size |
| ); |
| |
| // We get notified every 50ms and write up to 250ms worth of data in the future. |
| // The gap between frames_per_notification and target_frames gives us slack to |
| // account for scheduling delays. Audio injection proceeds as follows: |
| // |
| // 1. When the buffer is created, the initial ring buffer position is "0ms". |
| // At that time, the ring buffer is zeroed and we pretend to write target_frames |
| // worth of silence to the ring buffer. This puts our write pointer ~250ms |
| // ahead of the ring buffer's safe read pointer. |
| // |
| // 2. We receive the first notification at time 50ms + scheduling delay. At this |
| // point we write data for the range 250ms - 300ms. As long as our scheduling |
| // delay is < 250ms, our writes will stay ahead of the ring buffer's safe read |
| // pointer. We assume that 250ms is more than enough time (this test framework |
| // should never be run in a debug or sanitizer build). |
| // |
| // 3. This continues ad infinitum. |
| // |
| // The sum of 250ms + 50ms must fit within the ring buffer. We currently use a 1s |
| // ring buffer: see VirtualInput.start_input. |
| // |
| self.target_frames = 250 * frames_per_millisecond; |
| self.frames_per_notification = 50 * frames_per_millisecond; |
| Ok(()) |
| } |
| |
| fn on_buffer_created( |
| &mut self, |
| ring_buffer: zx::Vmo, |
| num_ring_buffer_frames: usize, |
| _notifications_per_ring: u32, |
| ) -> Result<(), Error> { |
| let va_input = self.va_input.as_mut().ok_or(format_err!("va_input not initialized"))?; |
| |
| // Ignore AudioCore's notification cadence (_notifications_per_ring); set up our own. |
| let target_notifications_per_ring = num_ring_buffer_frames / self.frames_per_notification; |
| |
| va_input.set_notification_frequency(target_notifications_per_ring as u32)?; |
| |
| // The buffer starts zeroed and our write pointer starts target_frames in the future. |
| self.work_space = num_ring_buffer_frames * self.frame_size; |
| self.write_pointer = self.target_frames * self.frame_size; |
| self.last_ring_offset = 0; |
| self.vmo = Some(ring_buffer); |
| |
| fx_log_info!( |
| "AudioFacade::InputWorker: created buffer with {:?} frames, {:?} notifications, {:?} target frames per write", |
| num_ring_buffer_frames, |
| target_notifications_per_ring, |
| self.target_frames |
| ); |
| Ok(()) |
| } |
| |
| fn on_position_notify( |
| &mut self, |
| _monotonic_time: i64, |
| ring_offset: usize, |
| ) -> Result<(), Error> { |
| if ring_offset < self.last_ring_offset { |
| self.ring_pointer += (self.work_space - self.last_ring_offset) + ring_offset; |
| } else { |
| self.ring_pointer += ring_offset - self.last_ring_offset; |
| }; |
| |
| let next_write_pointer = self.ring_pointer + self.target_frames * self.frame_size; |
| let bytes_to_write = next_write_pointer - self.write_pointer; |
| |
| // Next segment of inj_data. |
| if self.inj_data.len() > 0 { |
| let data_end = std::cmp::min(self.inj_data.len(), bytes_to_write); |
| self.write_to_vmo(&self.inj_data[0..data_end])?; |
| self.inj_data.drain(0..data_end); |
| self.write_pointer += data_end; |
| } |
| |
| // Pad with zeroes. |
| if self.write_pointer < next_write_pointer { |
| let zeroes = vec![0; next_write_pointer - self.write_pointer]; |
| self.write_to_vmo(&zeroes)?; |
| self.write_pointer = next_write_pointer; |
| } |
| |
| self.last_ring_offset = ring_offset; |
| Ok(()) |
| } |
| |
| async fn run( |
| &mut self, |
| mut rx: mpsc::Receiver<InjectMsg>, |
| va_input: InputProxy, |
| ) -> Result<(), Error> { |
| let mut input_events = va_input.take_event_stream(); |
| self.va_input = Some(va_input); |
| |
| // Monotonic timestamp returned by the most-recent OnStart/OnPositionNotify/OnStop response. |
| let mut last_timestamp = zx::Time::from_nanos(0); |
| // Observed monotonic time that OnStart/OnPositionNotify/OnStop messages actually arrived. |
| let mut last_event_time = zx::Time::from_nanos(0); |
| |
| loop { |
| select! { |
| rx_msg = rx.next() => { |
| match rx_msg { |
| None => { |
| return Err(format_err!("Got None InjectMsg Event, exiting worker")); |
| }, |
| Some(InjectMsg::Flush) => { |
| self.flush(); |
| } |
| Some(InjectMsg::Data { data }) => { |
| self.set_data(data)?; |
| } |
| } |
| }, |
| input_msg = input_events.try_next() => { |
| match input_msg? { |
| None => { |
| return Err(format_err!("Got None InputEvent Message, exiting worker")); |
| }, |
| Some(InputEvent::OnSetFormat { frames_per_second, sample_format, |
| num_channels, external_delay}) => { |
| self.on_set_format(frames_per_second, sample_format, num_channels, |
| external_delay)?; |
| }, |
| Some(InputEvent::OnBufferCreated { ring_buffer, num_ring_buffer_frames, |
| notifications_per_ring }) => { |
| self.on_buffer_created(ring_buffer, num_ring_buffer_frames as usize, |
| notifications_per_ring)?; |
| }, |
| Some(InputEvent::OnStart { start_time }) => { |
| if last_timestamp > zx::Time::from_nanos(0) { |
| fx_log_info!("AudioFacade::InputWorker: Injection OnPositionNotify received before OnStart"); |
| } |
| last_timestamp = zx::Time::from_nanos(start_time); |
| last_event_time = zx::Time::get_monotonic(); |
| }, |
| Some(InputEvent::OnStop { stop_time, ring_position }) => { |
| if last_timestamp == zx::Time::from_nanos(0) { |
| fx_log_info!("AudioFacade::InputWorker: Injection OnPositionNotify timestamp cleared before OnStop"); |
| } |
| last_timestamp = zx::Time::from_nanos(0); |
| last_event_time = zx::Time::from_nanos(0); |
| }, |
| Some(InputEvent::OnPositionNotify { monotonic_time, ring_position }) => { |
| let monotonic_zx_time = zx::Time::from_nanos(monotonic_time); |
| let now = zx::Time::get_monotonic(); |
| |
| // To minimize logspam, log glitches only when writing audio. |
| if self.inj_data.len() > 0 { |
| if last_timestamp == zx::Time::from_nanos(0) { |
| fx_log_info!("AudioFacade::InputWorker: Injection OnStart not received before OnPositionNotify"); |
| } |
| |
| // Log if our timestamps had a gap of more than 100ms. This is highly |
| // abnormal and indicates possible glitching while receiving audio to |
| // be injected and/or providing it to the system. |
| let timestamp_interval = monotonic_zx_time - last_timestamp; |
| |
| if timestamp_interval > zx::Duration::from_millis(100) { |
| fx_log_info!("AudioFacade::InputWorker: Injection position timestamp jumped by more than 100ms ({:?}ms). Expect glitches.", |
| timestamp_interval.into_millis()); |
| } |
| if monotonic_zx_time < last_timestamp { |
| fx_log_info!("AudioFacade::InputWorker: Injection position timestamp moved backwards ({:?}ms). Expect glitches.", |
| timestamp_interval.into_millis()); |
| } |
| |
| // Log if there was a gap in position notification arrivals of more |
| // than 150ms. This is highly abnormal and indicates possible glitching |
| // while receiving audio to be injected and/or providing it to the |
| // system. |
| let observed_interval = now - last_event_time; |
| |
| if observed_interval > zx::Duration::from_millis(150) { |
| fx_log_info!("AudioFacade::InputWorker: Injection position not updated for 150ms ({:?}ms). Expect glitches.", |
| observed_interval.into_millis()); |
| } |
| } |
| |
| last_timestamp = monotonic_zx_time; |
| last_event_time = now; |
| self.on_position_notify(monotonic_time, ring_position as usize)?; |
| }, |
| Some(evt) => { |
| fx_log_info!("AudioFacade::InputWorker: Got unknown InputEvent {:?}", evt); |
| } |
| } |
| }, |
| }; |
| } |
| } |
| } |
| |
| #[derive(Debug)] |
| struct VirtualInput { |
| injection_data: Vec<Vec<u8>>, |
| have_data: bool, |
| |
| sample_format: AudioSampleFormat, |
| channels: u8, |
| frames_per_second: u32, |
| |
| input_sender: Option<mpsc::Sender<InjectMsg>>, |
| } |
| |
| impl VirtualInput { |
| fn new(sample_format: AudioSampleFormat, channels: u8, frames_per_second: u32) -> Self { |
| VirtualInput { |
| injection_data: vec![], |
| have_data: false, |
| |
| sample_format, |
| channels, |
| frames_per_second, |
| |
| input_sender: None, |
| } |
| } |
| |
| pub fn start_input(&mut self) -> Result<(), Error> { |
| let va_input = app::client::connect_to_service::<InputMarker>()?; |
| va_input.clear_format_ranges()?; |
| va_input.set_fifo_depth(0)?; |
| va_input.set_external_delay(0)?; |
| |
| let sample_format = get_zircon_sample_format(self.sample_format); |
| va_input.add_format_range( |
| sample_format as u32, |
| self.frames_per_second, |
| self.frames_per_second, |
| self.channels, |
| self.channels, |
| ASF_RANGE_FLAG_FPS_CONTINUOUS, |
| )?; |
| |
| // set buffer size to be at least 1s. |
| let frames_1ms = self.frames_per_second / 1000; |
| let frames_low = 1000 * frames_1ms; |
| let frames_high = 2000 * frames_1ms; |
| let frames_modulo = 1 * frames_1ms; |
| va_input.set_ring_buffer_restrictions(frames_low, frames_high, frames_modulo)?; |
| |
| va_input.add()?; |
| let (tx, rx) = mpsc::channel(512); |
| fasync::Task::spawn( |
| async move { |
| let mut worker = InputWorker::default(); |
| worker.run(rx, va_input).await?; |
| Ok::<(), Error>(()) |
| } |
| .unwrap_or_else(|e| eprintln!("Input injection thread failed: {:?}", e)), |
| ) |
| .detach(); |
| |
| self.input_sender = Some(tx); |
| Ok(()) |
| } |
| |
| pub fn play(&mut self, index: usize) -> Result<(), Error> { |
| let sender = |
| self.input_sender.as_mut().ok_or(format_err!("input_sender not initialized"))?; |
| sender.try_send(InjectMsg::Flush)?; |
| sender.try_send(InjectMsg::Data { data: self.injection_data[index].clone() })?; |
| Ok(()) |
| } |
| |
| pub fn stop(&mut self) -> Result<(), Error> { |
| let sender = |
| self.input_sender.as_mut().ok_or(format_err!("input_sender not initialized"))?; |
| sender.try_send(InjectMsg::Flush)?; |
| Ok(()) |
| } |
| } |
| |
| #[derive(Debug)] |
| enum InjectMsg { |
| Flush, |
| Data { data: Vec<u8> }, |
| } |
| |
| #[derive(Debug)] |
| struct VirtualAudio { |
| // Output is from the AudioCore side, so it's what we'll be capturing and extracting |
| output_sample_format: AudioSampleFormat, |
| output_channels: u8, |
| output_frames_per_second: u32, |
| output: Option<OutputProxy>, |
| |
| // Input is from the AudioCore side, so it's the audio we'll be injecting |
| input_sample_format: AudioSampleFormat, |
| input_channels: u8, |
| input_frames_per_second: u32, |
| |
| controller: ControlProxy, |
| } |
| |
| impl VirtualAudio { |
| fn new() -> Result<VirtualAudio, Error> { |
| let va_control = app::client::connect_to_service::<ControlMarker>()?; |
| Ok(VirtualAudio { |
| output_sample_format: AudioSampleFormat::Signed16, |
| output_channels: 2, |
| output_frames_per_second: 48000, |
| output: None, |
| |
| input_sample_format: AudioSampleFormat::Signed16, |
| input_channels: 2, |
| input_frames_per_second: 16000, |
| |
| controller: va_control, |
| }) |
| } |
| } |
| |
| /// Perform Audio operations. |
| /// |
| /// Note this object is shared among all threads created by server. |
| #[derive(Debug)] |
| pub struct AudioFacade { |
| // TODO(perley): This will be needed after migrating to using virtual audio devices rather than |
| // renderer+capturer in the facade. |
| vad_control: RwLock<VirtualAudio>, |
| audio_output: RwLock<VirtualOutput>, |
| audio_input: RwLock<VirtualInput>, |
| initialized: Mutex<bool>, |
| } |
| |
| impl AudioFacade { |
| async fn ensure_initialized(&self) -> Result<(), Error> { |
| let mut initialized = self.initialized.lock().await; |
| if !*(initialized) { |
| let controller = self.vad_control.read().controller.clone(); |
| controller.disable().await?; |
| controller.enable().await?; |
| self.audio_input.write().start_input()?; |
| self.audio_output.write().start_output()?; |
| *(initialized) = true; |
| } |
| Ok(()) |
| } |
| |
| pub fn new() -> Result<AudioFacade, Error> { |
| let vad_control = RwLock::new(VirtualAudio::new()?); |
| let audio_output = RwLock::new(VirtualOutput::new( |
| vad_control.read().output_sample_format, |
| vad_control.read().output_channels, |
| vad_control.read().output_frames_per_second, |
| )?); |
| let audio_input = RwLock::new(VirtualInput::new( |
| vad_control.read().input_sample_format, |
| vad_control.read().input_channels, |
| vad_control.read().input_frames_per_second, |
| )); |
| let initialized = Mutex::new(false); |
| |
| Ok(AudioFacade { vad_control, audio_output, audio_input, initialized }) |
| } |
| |
| pub async fn start_output_save(&self) -> Result<Value, Error> { |
| self.ensure_initialized().await?; |
| let capturing = self.audio_output.read().capturing.clone(); |
| let mut capturing = capturing.lock().await; |
| if !*capturing { |
| let mut write = self.audio_output.write(); |
| let sender = write |
| .output_sender |
| .as_mut() |
| .ok_or(format_err!("Failed unwrapping output sender"))?; |
| sender.try_send(ExtractMsg::Start)?; |
| *(capturing) = true; |
| |
| Ok(to_value(true)?) |
| } else { |
| return Err(format_err!("Cannot StartOutputSave, already started.")); |
| } |
| } |
| |
| pub async fn stop_output_save(&self) -> Result<Value, Error> { |
| self.ensure_initialized().await?; |
| let capturing = self.audio_output.read().capturing.clone(); |
| let mut capturing = capturing.lock().await; |
| if *capturing { |
| let mut write = self.audio_output.write(); |
| write.extracted_data.clear(); |
| |
| let sender = write |
| .output_sender |
| .as_mut() |
| .ok_or(format_err!("Failed unwrapping output sender"))?; |
| |
| let (tx, mut rx) = mpsc::channel(512); |
| sender.try_send(ExtractMsg::Stop { out_sender: tx })?; |
| |
| let mut saved_audio = rx |
| .next() |
| .await |
| .ok_or_else(|| format_err!("StopOutputSave failed, could not retrieve data."))?; |
| |
| write.write_header(saved_audio.len() as u32)?; |
| |
| write.extracted_data.append(&mut saved_audio); |
| *(capturing) = false; |
| Ok(to_value(true)?) |
| } else { |
| return Err(format_err!("Cannot StopOutputSave, not started.")); |
| } |
| } |
| |
| pub async fn get_output_audio(&self) -> Result<Value, Error> { |
| self.ensure_initialized().await?; |
| let capturing = self.audio_output.read().capturing.clone(); |
| let capturing = capturing.lock().await; |
| if !*capturing { |
| Ok(to_value(base64::encode(&self.audio_output.read().extracted_data))?) |
| } else { |
| return Err(format_err!("GetOutputAudio failed, still saving.")); |
| } |
| } |
| |
| pub async fn put_input_audio(&self, args: Value) -> Result<Value, Error> { |
| self.ensure_initialized().await?; |
| let data = args.get("data").ok_or(format_err!("PutInputAudio failed, no data"))?; |
| let data = data.as_str().ok_or(format_err!("PutInputAudio failed, data not string"))?; |
| |
| let mut wave_data_vec = base64::decode(data)?; |
| let sample_index = args["index"].as_u64().ok_or(format_err!("index not a number"))?; |
| let sample_index = sample_index as usize; |
| |
| // TODO(perley): check wave format for correct bits per sample and float/int. |
| let byte_cnt = wave_data_vec.len(); |
| { |
| let mut write = self.audio_input.write(); |
| // Make sure we have somewhere to store the wav data. |
| if write.injection_data.len() <= sample_index { |
| write.injection_data.resize(sample_index + 1, vec![]); |
| } |
| |
| write.injection_data[sample_index].clear(); |
| write.injection_data[sample_index].append(&mut wave_data_vec); |
| write.have_data = true; |
| } |
| Ok(to_value(byte_cnt)?) |
| } |
| |
| pub async fn start_input_injection(&self, args: Value) -> Result<Value, Error> { |
| self.ensure_initialized().await?; |
| { |
| let sample_index = args["index"].as_u64().ok_or(format_err!("index not a number"))?; |
| let sample_index = sample_index as usize; |
| |
| if !self.audio_input.read().have_data { |
| return Err(format_err!("StartInputInjection failed, no Audio data to inject.")); |
| } |
| self.audio_input.write().play(sample_index)?; |
| } |
| Ok(to_value(true)?) |
| } |
| |
| pub async fn stop_input_injection(&self) -> Result<Value, Error> { |
| self.ensure_initialized().await?; |
| self.audio_input.write().stop()?; |
| Ok(to_value(true)?) |
| } |
| } |