blob: 348720972dd75be13b77d8a00fa4df8ab681d2e8 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{format_err, Context as _, Error};
use bt_a2dp as a2dp;
use fidl_fuchsia_media::{AudioFormat, AudioUncompressedFormat, DomainFormat, PcmFormat};
use fuchsia_async as fasync;
use fuchsia_audio_codec::StreamProcessor;
use fuchsia_trace as trace;
use fuchsia_zircon::{self as zx, DurationNum};
use futures::{
io::AsyncWrite,
stream::BoxStream,
task::{Context, Poll},
FutureExt, Stream, StreamExt,
};
use std::{collections::VecDeque, pin::Pin};
use tracing::info;
pub struct EncodedStream {
/// The input media stream
source: BoxStream<'static, fuchsia_audio_device::Result<Vec<u8>>>,
/// The encoder input.
encoder: Box<dyn AsyncWrite + Unpin + Send>,
/// The underlying encoder stream
encoded_stream: BoxStream<'static, Result<Vec<u8>, Error>>,
/// Bytes that have been sent to the encoder and not flushed.
unflushed_bytecount: usize,
/// Bytes that are buffered to send to the encoder
encoder_input_buffers: VecDeque<Vec<u8>>,
/// Cursor on the first buffer waiting indicating the next byte to be written to the encoder
encoder_input_cursor: usize,
/// Number of bytes to encode of the input before flushing to get an output packet
pcm_bytes_per_encoded_packet: usize,
}
impl EncodedStream {
/// Build a new EncodedStream which produces encoded frames from the given `source`.
/// Returns an error if codec setup fails. Successfully building a EncodedStream does not
/// guarantee that the system can encode - many errors can only be detected once encoding
/// is attempted. EncodedStream produces a Some(Err) result in these cases. It is
/// recommended to confirm that the system can encode using `EncodedStream::test()` first.
pub fn build(
input_format: PcmFormat,
source: BoxStream<'static, fuchsia_audio_device::Result<Vec<u8>>>,
config: &a2dp::codec::MediaCodecConfig,
) -> Result<Self, Error> {
let encoder_settings = config.encoder_settings()?;
let bytes_per_pcm_frame =
(input_format.bits_per_sample / 8) as usize * input_format.channel_map.len();
let pcm_bytes_per_encoded_packet = config.pcm_frames_per_encoded_frame()
* bytes_per_pcm_frame
* config.frames_per_packet();
let pcm_input_format = DomainFormat::Audio(AudioFormat::Uncompressed(
AudioUncompressedFormat::Pcm(input_format),
));
let mut encoder =
Box::new(StreamProcessor::create_encoder(pcm_input_format, encoder_settings)?);
let encoded_stream = encoder.take_output_stream()?.boxed();
Ok(Self {
source,
encoder,
encoded_stream,
unflushed_bytecount: 0,
encoder_input_buffers: VecDeque::new(),
encoder_input_cursor: 0,
pcm_bytes_per_encoded_packet,
})
}
/// Build a test version of this, that replaces the encoder with a set of streams that are
/// given in the constructor.
#[cfg(test)]
fn build_test(
source: BoxStream<'static, fuchsia_audio_device::Result<Vec<u8>>>,
encoder: Box<dyn AsyncWrite + Unpin + Send>,
encoded_stream: BoxStream<'static, Result<Vec<u8>, Error>>,
pcm_bytes_per_encoded_packet: usize,
) -> Self {
Self {
source,
encoder,
encoded_stream,
unflushed_bytecount: 0,
encoder_input_buffers: VecDeque::new(),
encoder_input_cursor: 0,
pcm_bytes_per_encoded_packet,
}
}
/// Run a preliminary test for a encoding audio in `input_format` into the codec `config`.
pub async fn test(
input_format: PcmFormat,
config: &a2dp::codec::MediaCodecConfig,
) -> Result<(), Error> {
let silence_source = SilenceStream::build(input_format.clone());
let mut encoder = EncodedStream::build(input_format, silence_source.boxed(), config)
.context("Building encoder")?;
match encoder.next().await {
Some(Ok(encoded_frame)) => {
if encoded_frame.is_empty() {
Err(format_err!("Encoded frame was empty"))
} else {
Ok(())
}
}
Some(Err(e)) => Err(e),
None => Err(format_err!("Encoder ended stream")),
}
}
}
impl Stream for EncodedStream {
type Item = Result<Vec<u8>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Read audio out.
while let Poll::Ready(item) = self.source.poll_next_unpin(cx) {
match item {
None => {
info!("Audio stream closed.");
return Poll::Ready(None);
}
Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Some(Ok(bytes)) => {
trace::instant!( c"bt-a2dp-source", c"Media:PacketReceived",
trace::Scope::Thread, "bytes" => bytes.len() as u64);
self.encoder_input_buffers.push_back(bytes)
}
}
}
// Push audio into the encoder.
while let Some(vec) = self.encoder_input_buffers.pop_front() {
let cursor = self.encoder_input_cursor;
match Pin::new(&mut self.encoder).poll_write(cx, &vec[cursor..]) {
Poll::Pending => {
self.encoder_input_buffers.push_front(vec);
break;
}
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Ok(written)) => {
self.encoder_input_cursor = cursor + written;
self.unflushed_bytecount = self.unflushed_bytecount + written;
// flush() if we have sent enough bytes to generate a frame
if self.unflushed_bytecount > self.pcm_bytes_per_encoded_packet {
// Attempt to flush.
if let Poll::Ready(Ok(())) = Pin::new(&mut self.encoder).poll_flush(cx) {
self.unflushed_bytecount = 0;
}
}
if self.encoder_input_cursor != vec.len() {
self.encoder_input_buffers.push_front(vec);
} else {
// Reset to the front of the next buffer.
self.encoder_input_cursor = 0;
}
}
}
}
// Finally, read data out of the encoder if it's ready.
self.encoded_stream.poll_next_unpin(cx)
}
}
const PCM_SAMPLE_SIZE: usize = 2;
struct SilenceStream {
pcm_format: PcmFormat,
next_frame_timer: fasync::Timer,
/// the last time we delivered frames.
last_frame_time: Option<zx::Time>,
}
impl futures::Stream for SilenceStream {
type Item = fuchsia_audio_device::Result<Vec<u8>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let now = zx::Time::get_monotonic();
if self.last_frame_time.is_none() {
self.last_frame_time = Some(now - 1.second());
}
let last_time = self.last_frame_time.as_ref().unwrap().clone();
let repeats = (now - last_time).into_seconds();
if repeats == 0 {
self.next_frame_timer = fasync::Timer::new(last_time + 1.second());
let poll = self.next_frame_timer.poll_unpin(cx);
assert_eq!(Poll::Pending, poll);
return Poll::Pending;
}
// Generate one second of silence.
let pcm_frame_size = self.pcm_format.channel_map.len() * PCM_SAMPLE_SIZE;
let buffer = vec![0; self.pcm_format.frames_per_second as usize * pcm_frame_size];
self.last_frame_time = Some(last_time + 1.second());
Poll::Ready(Some(Ok(buffer)))
}
}
impl SilenceStream {
fn build(pcm_format: PcmFormat) -> Self {
Self {
pcm_format,
next_frame_timer: fasync::Timer::new(fasync::Time::INFINITE_PAST),
last_frame_time: None,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::io;
use std::sync::{Arc, Mutex};
/// A stream that just returns a looping string of numbers.
#[derive(Clone)]
struct CountingStream(Arc<Mutex<CountingStreamInner>>);
struct CountingStreamInner {
next: u16,
ready_bytes: usize,
}
impl Default for CountingStream {
fn default() -> Self {
Self(Arc::new(Mutex::new(CountingStreamInner { next: 0, ready_bytes: 0 })))
}
}
impl CountingStream {
fn set_bytes_ready(&self, bytes: usize) {
self.0.lock().unwrap().ready_bytes = bytes;
}
}
impl futures::Stream for CountingStream {
type Item = fuchsia_audio_device::Result<Vec<u8>>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let s = Pin::into_inner(self);
let mut locked = s.0.lock().unwrap();
if locked.ready_bytes == 0 {
return Poll::Pending;
}
let len = (locked.ready_bytes / std::mem::size_of::<u16>()) as u16;
let mut vec = Vec::with_capacity(locked.ready_bytes);
for i in 0..len {
vec.extend_from_slice(&locked.next.wrapping_add(i).to_be_bytes());
}
locked.next = locked.next.wrapping_add(len);
locked.ready_bytes = 0;
Poll::Ready(Some(Ok(vec)))
}
}
/// An "encoder" that just buffers the input and sends it to the output when it's asked for.
#[derive(Clone)]
struct PassthroughEncoder(Arc<Mutex<PassthroughEncoderInner>>);
struct PassthroughEncoderInner {
// The k
buffered: VecDeque<Vec<u8>>,
stalled: bool,
}
impl Default for PassthroughEncoder {
fn default() -> Self {
Self(Arc::new(Mutex::new(PassthroughEncoderInner {
buffered: VecDeque::new(),
stalled: false,
})))
}
}
impl PassthroughEncoder {
fn stall_input(&self, stall: bool) {
self.0.lock().unwrap().stalled = stall;
}
fn push_input(&self, input: Vec<u8>) {
self.0.lock().unwrap().buffered.push_front(input);
}
fn get_output(&self) -> Option<Vec<u8>> {
self.0.lock().unwrap().buffered.pop_back()
}
fn is_stalled(&self) -> bool {
self.0.lock().unwrap().stalled
}
}
impl AsyncWrite for PassthroughEncoder {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
if self.is_stalled() {
Poll::Pending
} else {
self.push_input(buf.iter().cloned().collect());
Poll::Ready(Ok(buf.len()))
}
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
impl Stream for PassthroughEncoder {
type Item = Result<Vec<u8>, Error>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(vec) = self.get_output() {
Poll::Ready(Some(Ok(vec)))
} else {
Poll::Pending
}
}
}
#[test]
fn test_stalled_encoder_input() {
let input_stream = CountingStream::default();
let passthrough = PassthroughEncoder::default();
let passthrough_input = passthrough.clone();
let passthrough_output = passthrough.clone();
let mut stream = EncodedStream::build_test(
input_stream.clone().boxed(),
Box::new(passthrough_input),
passthrough_output.boxed(),
/* pcm bytes per encoded packet */ 500,
);
let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
// Polling for the next thing should run a whole cycle without an issue.
input_stream.set_bytes_ready(2);
match stream.poll_next_unpin(&mut noop_cx) {
Poll::Ready(Some(Ok(data))) => assert_eq!(vec![0, 0], data),
x => panic!("Expected ready poll, got {:?}", x),
};
// Stall the input of the encoder.
passthrough.stall_input(true);
// Polling should queue up because the encoder is stalled.
input_stream.set_bytes_ready(2);
assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
input_stream.set_bytes_ready(2);
assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
// Unstall the input of the encoder.
passthrough.stall_input(false);
// Next time we poll, we didn't skip any packets.
input_stream.set_bytes_ready(2);
match stream.poll_next_unpin(&mut noop_cx) {
Poll::Ready(Some(Ok(data))) => assert_eq!(vec![0, 1], data),
x => panic!("Expected ready poll, got {:?}", x),
};
match stream.poll_next_unpin(&mut noop_cx) {
Poll::Ready(Some(Ok(data))) => assert_eq!(vec![0, 2], data),
x => panic!("Expected ready poll, got {:?}", x),
};
match stream.poll_next_unpin(&mut noop_cx) {
Poll::Ready(Some(Ok(data))) => assert_eq!(vec![0, 3], data),
x => panic!("Expected ready poll, got {:?}", x),
};
}
}
#[cfg(feature = "test_encoding")]
mod encoder_tests {
use super::*;
use bt_avdtp as avdtp;
use fidl_fuchsia_media::{AudioChannelId, AudioPcmMode};
pub async fn test_encoding_capability(
capability: &avdtp::ServiceCapability,
) -> Result<(), Error> {
let config = a2dp::codec::MediaCodecConfig::try_from(capability)?;
let channel_map = match config.channel_count()? {
1 => vec![AudioChannelId::Lf],
2 => vec![AudioChannelId::Lf, AudioChannelId::Rf],
_ => panic!("More than 2 channels not supported"),
};
let input_format = PcmFormat {
pcm_mode: AudioPcmMode::Linear,
bits_per_sample: 16,
frames_per_second: 48000,
channel_map,
};
EncodedStream::test(input_format, &config).await
}
#[test]
fn test_sbc_encodes_correctly() {
let mut exec = fasync::TestExecutor::new();
let sbc_capability = &avdtp::ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_SBC,
codec_extra: vec![0x11, 0x15, 2, 53],
};
match exec.run_singlethreaded(test_encoding_capability(sbc_capability)) {
Ok(()) => {}
x => panic!("Expected encoding SBC to be Ok but got {:?}", x),
};
}
#[test]
fn test_aac_encodes_correctly() {
let mut exec = fasync::TestExecutor::new();
let aac_capability = &avdtp::ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_AAC,
codec_extra: vec![128, 1, 4, 4, 226, 0],
};
match exec.run_singlethreaded(test_encoding_capability(aac_capability)) {
Ok(()) => {}
x => panic!("Expected encoding AAC to be Ok but got {:?}", x),
};
}
}