blob: 348720972dd75be13b77d8a00fa4df8ab681d2e8 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{format_err, Context as _, Error};
use bt_a2dp as a2dp;
use fidl_fuchsia_media::{AudioFormat, AudioUncompressedFormat, DomainFormat, PcmFormat};
use fuchsia_async as fasync;
use fuchsia_audio_codec::StreamProcessor;
use fuchsia_trace as trace;
use fuchsia_zircon::{self as zx, DurationNum};
use futures::{
task::{Context, Poll},
FutureExt, Stream, StreamExt,
use std::{collections::VecDeque, pin::Pin};
use tracing::info;
pub struct EncodedStream {
/// The input media stream
source: BoxStream<'static, fuchsia_audio_device::Result<Vec<u8>>>,
/// The encoder input.
encoder: Box<dyn AsyncWrite + Unpin + Send>,
/// The underlying encoder stream
encoded_stream: BoxStream<'static, Result<Vec<u8>, Error>>,
/// Bytes that have been sent to the encoder and not flushed.
unflushed_bytecount: usize,
/// Bytes that are buffered to send to the encoder
encoder_input_buffers: VecDeque<Vec<u8>>,
/// Cursor on the first buffer waiting indicating the next byte to be written to the encoder
encoder_input_cursor: usize,
/// Number of bytes to encode of the input before flushing to get an output packet
pcm_bytes_per_encoded_packet: usize,
impl EncodedStream {
/// Build a new EncodedStream which produces encoded frames from the given `source`.
/// Returns an error if codec setup fails. Successfully building a EncodedStream does not
/// guarantee that the system can encode - many errors can only be detected once encoding
/// is attempted. EncodedStream produces a Some(Err) result in these cases. It is
/// recommended to confirm that the system can encode using `EncodedStream::test()` first.
pub fn build(
input_format: PcmFormat,
source: BoxStream<'static, fuchsia_audio_device::Result<Vec<u8>>>,
config: &a2dp::codec::MediaCodecConfig,
) -> Result<Self, Error> {
let encoder_settings = config.encoder_settings()?;
let bytes_per_pcm_frame =
(input_format.bits_per_sample / 8) as usize * input_format.channel_map.len();
let pcm_bytes_per_encoded_packet = config.pcm_frames_per_encoded_frame()
* bytes_per_pcm_frame
* config.frames_per_packet();
let pcm_input_format = DomainFormat::Audio(AudioFormat::Uncompressed(
let mut encoder =
Box::new(StreamProcessor::create_encoder(pcm_input_format, encoder_settings)?);
let encoded_stream = encoder.take_output_stream()?.boxed();
Ok(Self {
unflushed_bytecount: 0,
encoder_input_buffers: VecDeque::new(),
encoder_input_cursor: 0,
/// Build a test version of this, that replaces the encoder with a set of streams that are
/// given in the constructor.
fn build_test(
source: BoxStream<'static, fuchsia_audio_device::Result<Vec<u8>>>,
encoder: Box<dyn AsyncWrite + Unpin + Send>,
encoded_stream: BoxStream<'static, Result<Vec<u8>, Error>>,
pcm_bytes_per_encoded_packet: usize,
) -> Self {
Self {
unflushed_bytecount: 0,
encoder_input_buffers: VecDeque::new(),
encoder_input_cursor: 0,
/// Run a preliminary test for a encoding audio in `input_format` into the codec `config`.
pub async fn test(
input_format: PcmFormat,
config: &a2dp::codec::MediaCodecConfig,
) -> Result<(), Error> {
let silence_source = SilenceStream::build(input_format.clone());
let mut encoder = EncodedStream::build(input_format, silence_source.boxed(), config)
.context("Building encoder")?;
match {
Some(Ok(encoded_frame)) => {
if encoded_frame.is_empty() {
Err(format_err!("Encoded frame was empty"))
} else {
Some(Err(e)) => Err(e),
None => Err(format_err!("Encoder ended stream")),
impl Stream for EncodedStream {
type Item = Result<Vec<u8>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Read audio out.
while let Poll::Ready(item) = self.source.poll_next_unpin(cx) {
match item {
None => {
info!("Audio stream closed.");
return Poll::Ready(None);
Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Some(Ok(bytes)) => {
trace::instant!( c"bt-a2dp-source", c"Media:PacketReceived",
trace::Scope::Thread, "bytes" => bytes.len() as u64);
// Push audio into the encoder.
while let Some(vec) = self.encoder_input_buffers.pop_front() {
let cursor = self.encoder_input_cursor;
match Pin::new(&mut self.encoder).poll_write(cx, &vec[cursor..]) {
Poll::Pending => {
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Ok(written)) => {
self.encoder_input_cursor = cursor + written;
self.unflushed_bytecount = self.unflushed_bytecount + written;
// flush() if we have sent enough bytes to generate a frame
if self.unflushed_bytecount > self.pcm_bytes_per_encoded_packet {
// Attempt to flush.
if let Poll::Ready(Ok(())) = Pin::new(&mut self.encoder).poll_flush(cx) {
self.unflushed_bytecount = 0;
if self.encoder_input_cursor != vec.len() {
} else {
// Reset to the front of the next buffer.
self.encoder_input_cursor = 0;
// Finally, read data out of the encoder if it's ready.
const PCM_SAMPLE_SIZE: usize = 2;
struct SilenceStream {
pcm_format: PcmFormat,
next_frame_timer: fasync::Timer,
/// the last time we delivered frames.
last_frame_time: Option<zx::Time>,
impl futures::Stream for SilenceStream {
type Item = fuchsia_audio_device::Result<Vec<u8>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let now = zx::Time::get_monotonic();
if self.last_frame_time.is_none() {
self.last_frame_time = Some(now - 1.second());
let last_time = self.last_frame_time.as_ref().unwrap().clone();
let repeats = (now - last_time).into_seconds();
if repeats == 0 {
self.next_frame_timer = fasync::Timer::new(last_time + 1.second());
let poll = self.next_frame_timer.poll_unpin(cx);
assert_eq!(Poll::Pending, poll);
return Poll::Pending;
// Generate one second of silence.
let pcm_frame_size = self.pcm_format.channel_map.len() * PCM_SAMPLE_SIZE;
let buffer = vec![0; self.pcm_format.frames_per_second as usize * pcm_frame_size];
self.last_frame_time = Some(last_time + 1.second());
impl SilenceStream {
fn build(pcm_format: PcmFormat) -> Self {
Self {
next_frame_timer: fasync::Timer::new(fasync::Time::INFINITE_PAST),
last_frame_time: None,
mod tests {
use super::*;
use futures::io;
use std::sync::{Arc, Mutex};
/// A stream that just returns a looping string of numbers.
struct CountingStream(Arc<Mutex<CountingStreamInner>>);
struct CountingStreamInner {
next: u16,
ready_bytes: usize,
impl Default for CountingStream {
fn default() -> Self {
Self(Arc::new(Mutex::new(CountingStreamInner { next: 0, ready_bytes: 0 })))
impl CountingStream {
fn set_bytes_ready(&self, bytes: usize) {
self.0.lock().unwrap().ready_bytes = bytes;
impl futures::Stream for CountingStream {
type Item = fuchsia_audio_device::Result<Vec<u8>>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let s = Pin::into_inner(self);
let mut locked = s.0.lock().unwrap();
if locked.ready_bytes == 0 {
return Poll::Pending;
let len = (locked.ready_bytes / std::mem::size_of::<u16>()) as u16;
let mut vec = Vec::with_capacity(locked.ready_bytes);
for i in 0..len {
} =;
locked.ready_bytes = 0;
/// An "encoder" that just buffers the input and sends it to the output when it's asked for.
struct PassthroughEncoder(Arc<Mutex<PassthroughEncoderInner>>);
struct PassthroughEncoderInner {
// The k
buffered: VecDeque<Vec<u8>>,
stalled: bool,
impl Default for PassthroughEncoder {
fn default() -> Self {
Self(Arc::new(Mutex::new(PassthroughEncoderInner {
buffered: VecDeque::new(),
stalled: false,
impl PassthroughEncoder {
fn stall_input(&self, stall: bool) {
self.0.lock().unwrap().stalled = stall;
fn push_input(&self, input: Vec<u8>) {
fn get_output(&self) -> Option<Vec<u8>> {
fn is_stalled(&self) -> bool {
impl AsyncWrite for PassthroughEncoder {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
if self.is_stalled() {
} else {
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
impl Stream for PassthroughEncoder {
type Item = Result<Vec<u8>, Error>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(vec) = self.get_output() {
} else {
fn test_stalled_encoder_input() {
let input_stream = CountingStream::default();
let passthrough = PassthroughEncoder::default();
let passthrough_input = passthrough.clone();
let passthrough_output = passthrough.clone();
let mut stream = EncodedStream::build_test(
/* pcm bytes per encoded packet */ 500,
let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
// Polling for the next thing should run a whole cycle without an issue.
match stream.poll_next_unpin(&mut noop_cx) {
Poll::Ready(Some(Ok(data))) => assert_eq!(vec![0, 0], data),
x => panic!("Expected ready poll, got {:?}", x),
// Stall the input of the encoder.
// Polling should queue up because the encoder is stalled.
assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
// Unstall the input of the encoder.
// Next time we poll, we didn't skip any packets.
match stream.poll_next_unpin(&mut noop_cx) {
Poll::Ready(Some(Ok(data))) => assert_eq!(vec![0, 1], data),
x => panic!("Expected ready poll, got {:?}", x),
match stream.poll_next_unpin(&mut noop_cx) {
Poll::Ready(Some(Ok(data))) => assert_eq!(vec![0, 2], data),
x => panic!("Expected ready poll, got {:?}", x),
match stream.poll_next_unpin(&mut noop_cx) {
Poll::Ready(Some(Ok(data))) => assert_eq!(vec![0, 3], data),
x => panic!("Expected ready poll, got {:?}", x),
#[cfg(feature = "test_encoding")]
mod encoder_tests {
use super::*;
use bt_avdtp as avdtp;
use fidl_fuchsia_media::{AudioChannelId, AudioPcmMode};
pub async fn test_encoding_capability(
capability: &avdtp::ServiceCapability,
) -> Result<(), Error> {
let config = a2dp::codec::MediaCodecConfig::try_from(capability)?;
let channel_map = match config.channel_count()? {
1 => vec![AudioChannelId::Lf],
2 => vec![AudioChannelId::Lf, AudioChannelId::Rf],
_ => panic!("More than 2 channels not supported"),
let input_format = PcmFormat {
pcm_mode: AudioPcmMode::Linear,
bits_per_sample: 16,
frames_per_second: 48000,
EncodedStream::test(input_format, &config).await
fn test_sbc_encodes_correctly() {
let mut exec = fasync::TestExecutor::new();
let sbc_capability = &avdtp::ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_SBC,
codec_extra: vec![0x11, 0x15, 2, 53],
match exec.run_singlethreaded(test_encoding_capability(sbc_capability)) {
Ok(()) => {}
x => panic!("Expected encoding SBC to be Ok but got {:?}", x),
fn test_aac_encodes_correctly() {
let mut exec = fasync::TestExecutor::new();
let aac_capability = &avdtp::ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_AAC,
codec_extra: vec![128, 1, 4, 4, 226, 0],
match exec.run_singlethreaded(test_encoding_capability(aac_capability)) {
Ok(()) => {}
x => panic!("Expected encoding AAC to be Ok but got {:?}", x),