| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use base64; |
| use byteorder::{LittleEndian, WriteBytesExt}; |
| use failure::Error; |
| use fidl_fuchsia_media::*; |
| use fidl_fuchsia_virtualaudio::*; |
| use fuchsia_async as fasync; |
| use fuchsia_component as app; |
| use fuchsia_zircon as zx; |
| use futures::channel::mpsc; |
| use futures::lock::Mutex; |
| use futures::{select, StreamExt, TryFutureExt, TryStreamExt}; |
| use parking_lot::RwLock; |
| use serde_json::{to_value, Value}; |
| use std::io::Write; |
| use std::iter::FromIterator; |
| use std::sync::Arc; |
| |
| use fuchsia_syslog::macros::*; |
| |
// Sample-format bit flags. The values are taken from:
//   zircon/system/public/zircon/device/audio.h
const AUDIO_SAMPLE_FORMAT_8BIT: u32 = 1 << 1;
const AUDIO_SAMPLE_FORMAT_16BIT: u32 = 1 << 2;
const AUDIO_SAMPLE_FORMAT_24BIT_IN32: u32 = 1 << 7;
const AUDIO_SAMPLE_FORMAT_32BIT_FLOAT: u32 = 1 << 9;

// Flag handed to `add_format_range` when a device's format range is registered.
const ASF_RANGE_FLAG_FPS_CONTINUOUS: u16 = 1 << 0;
| |
| fn get_sample_size(format: u32) -> Result<usize, Error> { |
| Ok(match format { |
| // These are the currently implemented formats. |
| AUDIO_SAMPLE_FORMAT_8BIT => 1, |
| AUDIO_SAMPLE_FORMAT_16BIT => 2, |
| AUDIO_SAMPLE_FORMAT_24BIT_IN32 => 4, |
| AUDIO_SAMPLE_FORMAT_32BIT_FLOAT => 4, |
| _ => bail!("Cannot handle sample_format: {:?}", format), |
| }) |
| } |
| |
| fn get_zircon_sample_format(format: AudioSampleFormat) -> u32 { |
| match format { |
| AudioSampleFormat::Signed16 => AUDIO_SAMPLE_FORMAT_16BIT, |
| AudioSampleFormat::Unsigned8 => AUDIO_SAMPLE_FORMAT_8BIT, |
| AudioSampleFormat::Signed24In32 => AUDIO_SAMPLE_FORMAT_24BIT_IN32, |
| AudioSampleFormat::Float => AUDIO_SAMPLE_FORMAT_32BIT_FLOAT, |
| // No default case, these are all the audio sample formats supported right now. |
| } |
| } |
| |
| #[derive(Debug)] |
| enum ExtractMsg { |
| Start, |
| Stop { out_sender: mpsc::Sender<Vec<u8>> }, |
| } |
| |
| #[derive(Debug, Default)] |
| struct OutputWorker { |
| va_output: Option<OutputProxy>, |
| extracted_data: Vec<u8>, |
| vmo: Option<zx::Vmo>, |
| |
| // Whether we should store samples when we receive notification from the VAD |
| capturing: bool, |
| |
| // How much of the vmo's data we're actually using, in bytes. |
| work_space: u64, |
| |
| // How often, in frames, we want to be updated on the state. |
| frames_per_notification: u64, |
| |
| // How many bytes a frame is. |
| frame_size: u64, |
| |
| // Offset into vmo where we'll start to read next, in bytes. |
| next_read: u64, |
| |
| // Offset into vmo where we'll finish reading next, in bytes. |
| next_read_end: u64, |
| } |
| |
| impl OutputWorker { |
| fn on_set_format( |
| &mut self, |
| frames_per_second: u32, |
| sample_format: u32, |
| num_channels: u32, |
| _external_delay: i64, |
| ) -> Result<(), Error> { |
| let sample_size = get_sample_size(sample_format)?; |
| self.frame_size = num_channels as u64 * sample_size as u64; |
| |
| let frames_per_millisecond = frames_per_second as u64 / 1000; |
| |
| self.frames_per_notification = 50 * frames_per_millisecond; |
| |
| Ok(()) |
| } |
| |
| fn on_buffer_created( |
| &mut self, |
| ring_buffer: zx::Vmo, |
| num_ring_buffer_frames: u32, |
| _notifications_per_ring: u32, |
| ) -> Result<(), Error> { |
| let va_output = self.va_output.as_mut().ok_or(format_err!("va_input not initialized"))?; |
| |
| let target_frames_per_notification = |
| num_ring_buffer_frames as u64 / self.frames_per_notification; |
| |
| va_output.set_notification_frequency(target_frames_per_notification as u32)?; |
| |
| self.work_space = num_ring_buffer_frames as u64 * self.frame_size; |
| |
| // Start reading from the beginning. |
| self.next_read = 0; |
| self.next_read_end = 0; |
| |
| self.vmo = Some(ring_buffer); |
| Ok(()) |
| } |
| |
| fn on_position_notify( |
| &mut self, |
| ring_position: u32, |
| _clock_time: i64, |
| capturing: bool, |
| ) -> Result<(), Error> { |
| if capturing && self.next_read != self.next_read_end { |
| let vmo = if let Some(vmo) = &self.vmo { vmo } else { return Ok(()) }; |
| |
| if (self.next_read_end as u64) < self.next_read { |
| // Wrap-around case, read through the end. |
| let mut data = vec![0u8; (self.work_space - self.next_read) as usize]; |
| let overwrite1 = vec![1u8; (self.work_space - self.next_read) as usize]; |
| vmo.read(&mut data, self.next_read)?; |
| vmo.write(&overwrite1, self.next_read)?; |
| self.extracted_data.append(&mut data); |
| |
| // Read remaining data. |
| let mut data = vec![0u8; self.next_read_end as usize]; |
| let overwrite2 = vec![1u8; self.next_read_end as usize]; |
| vmo.read(&mut data, 0)?; |
| vmo.write(&overwrite2, 0)?; |
| |
| self.extracted_data.append(&mut data); |
| } else { |
| // Normal case, just read all the bytes. |
| let mut data = vec![0u8; (self.next_read_end - self.next_read) as usize]; |
| let overwrite = vec![1u8; (self.next_read_end - self.next_read) as usize]; |
| vmo.read(&mut data, self.next_read)?; |
| vmo.write(&overwrite, self.next_read)?; |
| |
| self.extracted_data.append(&mut data); |
| } |
| } |
| // We always stay 1 notification behind to work around audio glitches. |
| self.next_read = self.next_read_end; |
| self.next_read_end = ring_position as u64; |
| Ok(()) |
| } |
| |
| async fn run( |
| &mut self, |
| mut rx: mpsc::Receiver<ExtractMsg>, |
| va_output: OutputProxy, |
| ) -> Result<(), Error> { |
| let mut output_events = va_output.take_event_stream(); |
| self.va_output = Some(va_output); |
| |
| // Monotonic time of last OnPositionNotify |
| let mut last_notify = zx::Time::from_nanos(0); |
| |
| loop { |
| select! { |
| rx_msg = rx.next() => { |
| match rx_msg { |
| None => { |
| bail!("Got None InjectMsg Event, exiting worker"); |
| }, |
| Some(ExtractMsg::Stop { mut out_sender }) => { |
| self.capturing = false; |
| let mut ret_data = vec![0u8; 0]; |
| |
| ret_data.append(&mut self.extracted_data); |
| |
| out_sender.try_send(ret_data)?; |
| } |
| Some(ExtractMsg::Start) => { |
| self.extracted_data.clear(); |
| self.capturing = true; |
| } |
| } |
| }, |
| output_msg = output_events.try_next() => { |
| match output_msg? { |
| None => { |
| bail!("Got None InputEvent Message, exiting worker"); |
| }, |
| Some(OutputEvent::OnSetFormat { frames_per_second, sample_format, num_channels, external_delay}) => { |
| self.on_set_format(frames_per_second, sample_format, num_channels, external_delay)?; |
| }, |
| Some(OutputEvent::OnBufferCreated { ring_buffer, num_ring_buffer_frames, notifications_per_ring }) => { |
| self.on_buffer_created(ring_buffer, num_ring_buffer_frames, notifications_per_ring)?; |
| }, |
| Some(OutputEvent::OnPositionNotify { ring_position, clock_time }) => { |
| // Log if we have been delayed more than 150ms. This is highly abnormal and |
| // is indicative of possible glitching in the captured audio. |
| let now = zx::Time::get(zx::ClockId::Monotonic); |
| if last_notify > zx::Time::from_nanos(0) { |
| let interval = now - last_notify; |
| if interval > zx::Duration::from_millis(150) { |
| fx_log_info!("Output position not updated for 150ms({:?}). Expect glitches.", interval.into_millis()); |
| } |
| } |
| last_notify = now; |
| |
| self.on_position_notify(ring_position, clock_time, self.capturing)?; |
| }, |
| Some(evt) => { |
| fx_log_info!("Got unknown InputEvent {:?}", evt); |
| } |
| } |
| }, |
| }; |
| } |
| } |
| } |
| |
| #[derive(Debug)] |
| struct VirtualOutput { |
| extracted_data: Vec<u8>, |
| capturing: Arc<Mutex<bool>>, |
| have_data: bool, |
| |
| sample_format: AudioSampleFormat, |
| channels: u8, |
| frames_per_second: u32, |
| |
| output: Option<OutputProxy>, |
| output_sender: Option<mpsc::Sender<ExtractMsg>>, |
| } |
| |
| impl VirtualOutput { |
| pub fn new( |
| sample_format: AudioSampleFormat, |
| channels: u8, |
| frames_per_second: u32, |
| ) -> Result<VirtualOutput, Error> { |
| Ok(VirtualOutput { |
| extracted_data: vec![], |
| capturing: Arc::new(Mutex::new(false)), |
| have_data: false, |
| |
| sample_format, |
| channels, |
| frames_per_second, |
| |
| output: None, |
| output_sender: None, |
| }) |
| } |
| |
| pub fn start_output(&mut self) -> Result<(), Error> { |
| let va_output = app::client::connect_to_service::<OutputMarker>()?; |
| va_output.clear_format_ranges()?; |
| let sample_format = get_zircon_sample_format(self.sample_format); |
| |
| va_output.add_format_range( |
| sample_format as u32, |
| self.frames_per_second, |
| self.frames_per_second, |
| self.channels, |
| self.channels, |
| ASF_RANGE_FLAG_FPS_CONTINUOUS, |
| )?; |
| |
| // set buffer size to be 500ms-1000ms |
| let frames_1ms = self.frames_per_second / 1000; |
| let frames_low = 500 * frames_1ms; |
| let frames_high = 1000 * frames_1ms; |
| let frames_modulo = 1 * frames_1ms; |
| va_output.set_ring_buffer_restrictions(frames_low, frames_high, frames_modulo)?; |
| |
| let (tx, rx) = mpsc::channel(512); |
| va_output.add()?; |
| fasync::spawn( |
| async move { |
| let mut worker = OutputWorker::default(); |
| await!(worker.run(rx, va_output))?; |
| Ok::<(), Error>(()) |
| } |
| .unwrap_or_else(|e| eprintln!("Input injection thread failed: {:?}", e)), |
| ); |
| |
| self.output_sender = Some(tx); |
| Ok(()) |
| } |
| |
| pub fn write_header(&mut self, len: u32) -> Result<(), Error> { |
| let bytes_per_sample = get_sample_size(get_zircon_sample_format(self.sample_format))?; |
| |
| // 8 Bytes |
| self.extracted_data.write("RIFF".as_bytes())?; |
| self.extracted_data.write_u32::<LittleEndian>(len as u32 + 8 + 28 + 8)?; |
| |
| // 28 bytes |
| self.extracted_data.write("WAVE".as_bytes())?; // wave_four_cc uint32 |
| self.extracted_data.write("fmt ".as_bytes())?; // fmt_four_cc uint32 |
| self.extracted_data.write_u32::<LittleEndian>(16u32)?; // fmt_chunk_len |
| self.extracted_data.write_u16::<LittleEndian>(1u16)?; // format |
| self.extracted_data.write_u16::<LittleEndian>(self.channels as u16)?; |
| self.extracted_data.write_u32::<LittleEndian>(self.frames_per_second)?; |
| self.extracted_data.write_u32::<LittleEndian>( |
| bytes_per_sample as u32 * self.channels as u32 * self.frames_per_second, |
| )?; // avg_byte_rate |
| self.extracted_data |
| .write_u16::<LittleEndian>(bytes_per_sample as u16 * self.channels as u16)?; |
| self.extracted_data.write_u16::<LittleEndian>((bytes_per_sample * 8) as u16)?; |
| |
| // 8 bytes |
| self.extracted_data.write("data".as_bytes())?; |
| self.extracted_data.write_u32::<LittleEndian>(len as u32)?; |
| |
| Ok(()) |
| } |
| } |
| |
| #[derive(Debug, Default)] |
| struct InputWorker { |
| va_input: Option<InputProxy>, |
| inj_data: Vec<u8>, |
| vmo: Option<zx::Vmo>, |
| |
| // How much of the vmo's data we're actually using, in bytes. |
| work_space: u64, |
| |
| // How many frames ahead of the position we want to be writing to, when writing. |
| target_frames: u64, |
| |
| // How close we let position get to where we've written before writing again. |
| low_frames: u64, |
| |
| // How often, in frames, we want to be updated on the state. |
| frames_per_notification: u64, |
| |
| // How many bytes a frame is. |
| frame_size: u64, |
| |
| // Offset into inj_data, in bytes |
| data_offset: usize, |
| |
| // Offset into vmo where we'll write next, in bytes. |
| next_write: u64, |
| |
| // How many zeros we've written. |
| zeros_written: usize, |
| } |
| |
| impl InputWorker { |
| // Events from the sl4f facade |
| fn flush(&mut self) { |
| // This will be set to false once the writer has zeroed out the vmo. |
| self.inj_data.clear(); |
| self.data_offset = 0; |
| } |
| |
| fn set_data(&mut self, data: Vec<u8>) -> Result<(), Error> { |
| self.inj_data.clear(); |
| // This is a bad assumption, wav headers can be many different sizes. |
| // 8 Bytes for riff header |
| // 28 bytes for wave fmt block |
| // 8 bytes for data header |
| self.data_offset = 44; |
| self.zeros_written = 0; |
| self.inj_data.write(&data)?; |
| Ok(()) |
| } |
| |
| // Events from the Virtual Audio Device |
| fn on_set_format( |
| &mut self, |
| frames_per_second: u32, |
| sample_format: u32, |
| num_channels: u32, |
| _external_delay: i64, |
| ) -> Result<(), Error> { |
| let sample_size = get_sample_size(sample_format)?; |
| self.frame_size = num_channels as u64 * sample_size as u64; |
| |
| let frames_per_millisecond = frames_per_second as u64 / 1000; |
| |
| self.target_frames = 150 * frames_per_millisecond; |
| self.low_frames = 100 * frames_per_millisecond; |
| self.frames_per_notification = 50 * frames_per_millisecond; |
| Ok(()) |
| } |
| |
| fn on_buffer_created( |
| &mut self, |
| ring_buffer: zx::Vmo, |
| num_ring_buffer_frames: u32, |
| _notifications_per_ring: u32, |
| ) -> Result<(), Error> { |
| let va_input = self.va_input.as_mut().ok_or(format_err!("va_input not initialized"))?; |
| |
| let target_frames_per_notification = |
| num_ring_buffer_frames as u64 / self.frames_per_notification; |
| |
| va_input.set_notification_frequency(target_frames_per_notification as u32)?; |
| |
| self.work_space = num_ring_buffer_frames as u64 * self.frame_size; |
| |
| // It starts zeroed, so just pretend we wrote the zeroes at the start. |
| self.next_write = 2 * self.frames_per_notification * self.frame_size; |
| |
| self.vmo = Some(ring_buffer); |
| Ok(()) |
| } |
| |
| fn on_position_notify( |
| &mut self, |
| ring_position: u32, |
| _clock_time: i64, |
| active: bool, |
| ) -> Result<(), Error> { |
| let vmo = if let Some(vmo) = &self.vmo { vmo } else { return Ok(()) }; |
| |
| let current_fill = if self.next_write < ring_position as u64 { |
| self.work_space - ring_position as u64 + self.next_write |
| } else { |
| self.next_write - ring_position as u64 |
| }; |
| |
| // If we've still have more than low_frames notifications worth of data, just continue. |
| if current_fill > self.low_frames * self.frame_size { |
| return Ok(()); |
| } |
| |
| // Calculate where to stop writing to hit the target buffer fill. |
| let write_end = |
| (ring_position as u64 + self.target_frames * self.frame_size) % self.work_space; |
| |
| if active { |
| // Calculate how many bytes we're writing, and figure out if we've wrapped around, and |
| // if so, calculate the split. |
| let (split_point, write_size) = if write_end < self.next_write { |
| ( |
| (self.work_space - self.next_write) as usize, |
| (self.work_space - self.next_write + write_end) as usize, |
| ) |
| } else { |
| (0 as usize, (write_end - self.next_write) as usize) |
| }; |
| |
| // Calculate the range of the data we need to use. |
| let start = self.data_offset as usize; |
| let end; |
| |
| let data_size; |
| let src_data_size = self.inj_data.len(); |
| if write_size as usize > (src_data_size - self.data_offset) { |
| data_size = src_data_size - self.data_offset; |
| end = src_data_size as usize; |
| } else { |
| data_size = write_size; |
| end = self.data_offset + write_size as usize; |
| } |
| |
| // Build the data to send. |
| let mut wav_data = Vec::from_iter(self.inj_data[start..end].iter().cloned()); |
| // Fill out with zeroes. |
| wav_data.resize_with(write_size as usize, || 0); |
| |
| // If we're writing any wav data, clear our bookkeeping of having zeroed out the buffer. |
| if !(data_size == 0) { |
| self.zeros_written = 0; |
| } |
| |
| // Only write to the VMO when we're going to be changing the contents. |
| if self.zeros_written < self.work_space as usize { |
| if split_point == 0 { |
| vmo.write(&wav_data, self.next_write)?; |
| } else { |
| vmo.write(&wav_data[0..split_point], self.next_write)?; |
| vmo.write(&wav_data[split_point..], 0)?; |
| } |
| } |
| self.data_offset += data_size as usize; |
| self.zeros_written += write_size - data_size; |
| } |
| self.next_write = write_end; |
| |
| Ok(()) |
| } |
| |
| async fn run( |
| &mut self, |
| mut rx: mpsc::Receiver<InjectMsg>, |
| va_input: InputProxy, |
| active: Arc<Mutex<bool>>, |
| ) -> Result<(), Error> { |
| let mut input_events = va_input.take_event_stream(); |
| self.va_input = Some(va_input); |
| |
| // Monotonic time of last OnPositionNotify |
| let mut last_notify = zx::Time::from_nanos(0); |
| |
| loop { |
| select! { |
| rx_msg = rx.next() => { |
| match rx_msg { |
| None => { |
| bail!("Got None InjectMsg Event, exiting worker"); |
| }, |
| Some(InjectMsg::Flush) => { |
| let active = await!(active.lock()); |
| self.flush(); |
| } |
| Some(InjectMsg::Data { data }) => { |
| let mut active = await!(active.lock()); |
| self.set_data(data)?; |
| *(active) = true; |
| } |
| } |
| }, |
| input_msg = input_events.try_next() => { |
| match input_msg? { |
| None => { |
| bail!("Got None InputEvent Message, exiting worker"); |
| }, |
| Some(InputEvent::OnSetFormat { frames_per_second, sample_format, num_channels, external_delay}) => { |
| self.on_set_format(frames_per_second, sample_format, num_channels, external_delay)?; |
| }, |
| Some(InputEvent::OnBufferCreated { ring_buffer, num_ring_buffer_frames, notifications_per_ring }) => { |
| self.on_buffer_created(ring_buffer, num_ring_buffer_frames, notifications_per_ring)?; |
| }, |
| Some(InputEvent::OnPositionNotify { ring_position, clock_time }) => { |
| // Log if we have been delayed more than 150ms. This is highly abnormal and |
| // is indicative of possible glitching in the injection. |
| let now = zx::Time::get(zx::ClockId::Monotonic); |
| if last_notify > zx::Time::from_nanos(0) { |
| let interval = now - last_notify; |
| if interval > zx::Duration::from_millis(150) { |
| fx_log_info!("Input position not updated for 150ms({:?}). Expect glitches.", interval.into_millis()); |
| } |
| } |
| last_notify = now; |
| |
| let mut active = await!(active.lock()); |
| self.on_position_notify(ring_position, clock_time, *active)?; |
| if self.zeros_written >= self.work_space as usize { |
| *(active) = false; |
| } |
| }, |
| Some(evt) => { |
| fx_log_info!("Got unknown InputEvent {:?}", evt); |
| } |
| } |
| }, |
| }; |
| } |
| } |
| } |
| |
| #[derive(Debug)] |
| struct VirtualInput { |
| injection_data: Vec<Vec<u8>>, |
| active: Arc<Mutex<bool>>, |
| have_data: bool, |
| |
| sample_format: AudioSampleFormat, |
| channels: u8, |
| frames_per_second: u32, |
| |
| input_sender: Option<mpsc::Sender<InjectMsg>>, |
| } |
| |
| impl VirtualInput { |
| fn new(sample_format: AudioSampleFormat, channels: u8, frames_per_second: u32) -> Self { |
| VirtualInput { |
| injection_data: vec![], |
| active: Arc::new(Mutex::new(false)), |
| have_data: false, |
| |
| sample_format, |
| channels, |
| frames_per_second, |
| |
| input_sender: None, |
| } |
| } |
| |
| pub fn start_input(&mut self) -> Result<(), Error> { |
| let va_input = app::client::connect_to_service::<InputMarker>()?; |
| va_input.clear_format_ranges()?; |
| let sample_format = get_zircon_sample_format(self.sample_format); |
| |
| va_input.add_format_range( |
| sample_format as u32, |
| self.frames_per_second, |
| self.frames_per_second, |
| self.channels, |
| self.channels, |
| ASF_RANGE_FLAG_FPS_CONTINUOUS, |
| )?; |
| |
| // set buffer size to be 500ms-1000ms |
| let frames_1ms = self.frames_per_second / 1000; |
| let frames_low = 500 * frames_1ms; |
| let frames_high = 1000 * frames_1ms; |
| let frames_modulo = 1 * frames_1ms; |
| va_input.set_ring_buffer_restrictions(frames_low, frames_high, frames_modulo)?; |
| |
| va_input.add()?; |
| let (tx, rx) = mpsc::channel(512); |
| let active = self.active.clone(); |
| fasync::spawn( |
| async move { |
| let mut worker = InputWorker::default(); |
| await!(worker.run(rx, va_input, active))?; |
| Ok::<(), Error>(()) |
| } |
| .unwrap_or_else(|e| eprintln!("Input injection thread failed: {:?}", e)), |
| ); |
| |
| self.input_sender = Some(tx); |
| Ok(()) |
| } |
| |
| pub fn play(&mut self, index: usize) -> Result<(), Error> { |
| let sender = |
| self.input_sender.as_mut().ok_or(format_err!("input_sender not initialized"))?; |
| sender.try_send(InjectMsg::Flush)?; |
| sender.try_send(InjectMsg::Data { data: self.injection_data[index].clone() })?; |
| Ok(()) |
| } |
| |
| pub fn stop(&mut self) -> Result<(), Error> { |
| // The input worker will handle setting the active flag to false after it has zeroed out the vmo. |
| let sender = |
| self.input_sender.as_mut().ok_or(format_err!("input_sender not initialized"))?; |
| sender.try_send(InjectMsg::Flush)?; |
| Ok(()) |
| } |
| } |
| |
/// Commands delivered to `InputWorker::run` over its mpsc channel.
#[derive(Debug)]
enum InjectMsg {
    /// Drop the audio currently being injected.
    Flush,

    /// Start injecting `data`.
    Data { data: Vec<u8> },
}
| |
| #[derive(Debug)] |
| struct VirtualAudio { |
| // Output is from the AudioCore side, so it's what we'll be capturing |
| output_sample_format: AudioSampleFormat, |
| output_channels: u8, |
| output_frames_per_second: u32, |
| output: Option<OutputProxy>, |
| |
| // Input is from the AudioCore side, so it's the audio we'll be injecting |
| input_sample_format: AudioSampleFormat, |
| input_channels: u8, |
| input_frames_per_second: u32, |
| |
| controller: ControlProxy, |
| } |
| |
| impl VirtualAudio { |
| fn new() -> Result<VirtualAudio, Error> { |
| let va_control = app::client::connect_to_service::<ControlMarker>()?; |
| Ok(VirtualAudio { |
| output_sample_format: AudioSampleFormat::Signed16, |
| output_channels: 2, |
| output_frames_per_second: 48000, |
| output: None, |
| |
| input_sample_format: AudioSampleFormat::Signed16, |
| input_channels: 2, |
| input_frames_per_second: 16000, |
| |
| controller: va_control, |
| }) |
| } |
| } |
| |
| /// Perform Audio operations. |
| /// |
| /// Note this object is shared among all threads created by server. |
| #[derive(Debug)] |
| pub struct AudioFacade { |
| // TODO(perley): This will be needed after migrating to using virtual audio devices rather than |
| // renderer+capturer in the facade. |
| vad_control: RwLock<VirtualAudio>, |
| audio_output: RwLock<VirtualOutput>, |
| audio_input: RwLock<VirtualInput>, |
| initialized: Mutex<bool>, |
| } |
| |
| impl AudioFacade { |
| async fn ensure_initialized(&self) -> Result<(), Error> { |
| let mut initialized = await!(self.initialized.lock()); |
| if !*(initialized) { |
| let controller = self.vad_control.read().controller.clone(); |
| await!(controller.disable())?; |
| await!(controller.enable())?; |
| self.audio_input.write().start_input()?; |
| self.audio_output.write().start_output()?; |
| *(initialized) = true; |
| } |
| Ok(()) |
| } |
| |
| pub fn new() -> Result<AudioFacade, Error> { |
| let vad_control = RwLock::new(VirtualAudio::new()?); |
| let audio_output = RwLock::new(VirtualOutput::new( |
| vad_control.read().output_sample_format, |
| vad_control.read().output_channels, |
| vad_control.read().output_frames_per_second, |
| )?); |
| let audio_input = RwLock::new(VirtualInput::new( |
| vad_control.read().input_sample_format, |
| vad_control.read().input_channels, |
| vad_control.read().input_frames_per_second, |
| )); |
| let initialized = Mutex::new(false); |
| |
| Ok(AudioFacade { vad_control, audio_output, audio_input, initialized }) |
| } |
| |
| pub async fn start_output_save(&self) -> Result<Value, Error> { |
| await!(self.ensure_initialized())?; |
| let capturing = self.audio_output.read().capturing.clone(); |
| let mut capturing = await!(capturing.lock()); |
| if !*capturing { |
| let mut write = self.audio_output.write(); |
| let sender = write |
| .output_sender |
| .as_mut() |
| .ok_or(format_err!("Failed unwrapping output sender"))?; |
| sender.try_send(ExtractMsg::Start)?; |
| *(capturing) = true; |
| |
| Ok(to_value(true)?) |
| } else { |
| bail!("Cannot StartOutputSave, already started.") |
| } |
| } |
| |
| pub async fn stop_output_save(&self) -> Result<Value, Error> { |
| await!(self.ensure_initialized())?; |
| let capturing = self.audio_output.read().capturing.clone(); |
| let mut capturing = await!(capturing.lock()); |
| if *capturing { |
| let mut write = self.audio_output.write(); |
| write.extracted_data.clear(); |
| |
| let sender = write |
| .output_sender |
| .as_mut() |
| .ok_or(format_err!("Failed unwrapping output sender"))?; |
| |
| let (tx, mut rx) = mpsc::channel(512); |
| sender.try_send(ExtractMsg::Stop { out_sender: tx })?; |
| |
| let mut saved_audio = await!(rx.next()) |
| .ok_or_else(|| format_err!("StopOutputSave failed, could not retrieve data."))?; |
| |
| write.write_header(saved_audio.len() as u32)?; |
| |
| write.extracted_data.append(&mut saved_audio); |
| *(capturing) = false; |
| Ok(to_value(true)?) |
| } else { |
| bail!("Cannot StopOutputSave, not started.") |
| } |
| } |
| |
| pub async fn get_output_audio(&self) -> Result<Value, Error> { |
| await!(self.ensure_initialized())?; |
| let capturing = self.audio_output.read().capturing.clone(); |
| let capturing = await!(capturing.lock()); |
| if !*capturing { |
| Ok(to_value(base64::encode(&self.audio_output.read().extracted_data))?) |
| } else { |
| bail!("GetOutputAudio failed, still saving.") |
| } |
| } |
| |
| pub async fn put_input_audio(&self, args: Value) -> Result<Value, Error> { |
| await!(self.ensure_initialized())?; |
| let data = args.get("data").ok_or(format_err!("PutInputAudio failed, no data"))?; |
| let data = data.as_str().ok_or(format_err!("PutInputAudio failed, data not string"))?; |
| |
| let mut wave_data_vec = base64::decode(data)?; |
| let sample_index = args["index"].as_u64().ok_or(format_err!("index not a number"))?; |
| let sample_index = sample_index as usize; |
| |
| // TODO(perley): check wave format for correct bits per sample and float/int. |
| let byte_cnt = wave_data_vec.len(); |
| { |
| let active = self.audio_input.read().active.clone(); |
| let active = await!(active.lock()); |
| if *active { |
| bail!("PutInputAudio failed, currently injecting audio.") |
| } |
| |
| { |
| let mut write = self.audio_input.write(); |
| // Make sure we have somewhere to store the wav data. |
| if write.injection_data.len() <= sample_index { |
| write.injection_data.resize(sample_index + 1, vec![]); |
| } |
| |
| write.injection_data[sample_index].clear(); |
| write.injection_data[sample_index].append(&mut wave_data_vec); |
| write.have_data = true; |
| } |
| } |
| Ok(to_value(byte_cnt)?) |
| } |
| |
| pub async fn start_input_injection(&self, args: Value) -> Result<Value, Error> { |
| await!(self.ensure_initialized())?; |
| { |
| let sample_index = args["index"].as_u64().ok_or(format_err!("index not a number"))?; |
| let sample_index = sample_index as usize; |
| |
| let active = self.audio_input.read().active.clone(); |
| let active = await!(active.lock()); |
| if *active { |
| bail!("StartInputInjection failed, already active.") |
| } else if !self.audio_input.read().have_data { |
| bail!("StartInputInjection failed, no Audio data to inject.") |
| } |
| self.audio_input.write().play(sample_index)?; |
| } |
| Ok(to_value(true)?) |
| } |
| |
| pub async fn stop_input_injection(&self) -> Result<Value, Error> { |
| await!(self.ensure_initialized())?; |
| { |
| let active = self.audio_input.read().active.clone(); |
| let active = await!(active.lock()); |
| if !*active { |
| bail!("StopInputInjection failed, not active.") |
| } |
| self.audio_input.write().stop()?; |
| } |
| |
| Ok(to_value(true)?) |
| } |
| } |