// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::{format_err, Context as _, Error},
bt_avdtp::{MediaCodecType, RtpHeader},
AudioConsumerProxy, AudioConsumerStartFlags, AudioConsumerStatus, AudioSampleFormat,
AudioStreamType, Compression, SessionAudioConsumerFactoryMarker,
SessionAudioConsumerFactoryProxy, StreamPacket, StreamSinkProxy, NO_TIMESTAMP,
fuchsia_async as fasync,
fuchsia_trace as trace,
fuchsia_zircon::{self as zx, HandleBased},
future::{AbortHandle, Abortable, Aborted, MapOk},
io::{AsyncWrite, AsyncWriteExt},
task::{Context, Poll},
Future, StreamExt, TryFutureExt,
log::{info, warn},
std::{convert::TryInto, io, pin::Pin},
use crate::latm::AudioMuxElement;
// Max supported by AudioConsumer as defined in the FIDL interface
const NUM_BUFFERS: usize = 16;
// For both SBC and AAC, buffers are less than a page
const DEFAULT_BUFFER_LEN: usize = 4096;
struct AudioConsumerSink {
buffers: Vec<zx::Vmo>,
buffers_free: HashSet<usize>,
tx_count: u64,
first_packet_sent: Option<fasync::Time>,
flags_receiver: mpsc::Receiver<u32>,
stream_sink: StreamSinkProxy,
audio_consumer: AudioConsumerProxy,
/// A set of futures that finish when packets are no longer in use by the sink.
FuturesUnordered<MapOk<QueryResponseFut<()>, Box<dyn FnOnce(()) -> usize + Send>>>,
impl AudioConsumerSink {
fn build(
audio_consumer: &mut AudioConsumerProxy,
frames_per_second: u32,
mut compression: Option<Compression>,
flags_receiver: mpsc::Receiver<u32>,
) -> Result<AudioConsumerSink, Error> {
let (stream_sink, stream_sink_server) = fidl::endpoints::create_proxy()?;
let mut audio_stream_type = AudioStreamType {
sample_format: AudioSampleFormat::Signed16,
channels: 2, // Stereo
// Build buffer set
let mut buffers = Vec::new();
let mut vmos_for_sink = Vec::new();
for _ in 0..NUM_BUFFERS {
let vmo = zx::Vmo::create(DEFAULT_BUFFER_LEN as u64)?;
| zx::Rights::DUPLICATE
| zx::Rights::GET_PROPERTY
| zx::Rights::TRANSFER
| zx::Rights::MAP,
let buffers_free = (0..buffers.len()).collect();
&mut vmos_for_sink.into_iter(),
&mut audio_stream_type,
Ok(AudioConsumerSink {
tx_count: 0,
first_packet_sent: None,
audio_consumer: audio_consumer.clone(),
send_futures: FuturesUnordered::new(),
fn poll_writable(&mut self, cx: &mut Context<'_>) -> Poll<()> {
while let Poll::Ready(Some(result)) = self.send_futures.poll_next_unpin(cx) {
match result {
Ok(index) => {
Err(e) => warn!("Failed to send packet: {}", e),
if !self.buffers_free.is_empty() {
} else {
// The waker in cx will have been set to wake by the send_futures above,
// so the writer will be woken up when any buffer becomes free.
/// Get a free stream sink buffer and write `frame` into it, marking as in use.
/// Returns "none" if no buffer is available, or if the buffers allocated aren't large enough
/// for the frame.
fn copy_to_buffer(&mut self, frame: &[u8]) -> Option<usize> {
let buffer_index = match self.buffers_free.iter().next() {
Some(idx) => *idx,
None => return None,
let buffer = &mut self.buffers[buffer_index];
if frame.len() > DEFAULT_BUFFER_LEN {
return None;
if let Err(_) = buffer.write(frame, 0) {
return None;
/// Push an encoded media frame into the buffer and signal that it's there to media.
fn send_frame(&mut self, frame: &[u8], flags: u32) -> Result<(), Error> {
trace::duration!("bt-a2dp-sink", "Media:PacketSent");
let buffer_index = self.copy_to_buffer(frame).ok_or(format_err!("No free buffers"))?;
self.tx_count += 1;
trace::flow_begin!("stream-sink", "SendPacket", self.tx_count);
let mut packet = StreamPacket {
payload_buffer_id: buffer_index as u32,
payload_offset: 0,
payload_size: frame.len() as u64,
buffer_config: 0,
stream_segment_id: 0,
if self.first_packet_sent.is_none() {
let now = fasync::Time::now();
self.first_packet_sent = Some(now);
self.audio_consumer.start(AudioConsumerStartFlags::SupplyDriven, 0, NO_TIMESTAMP)?;
let send_fut = self.stream_sink.send_packet(&mut packet);
self.send_futures.push(send_fut.map_ok(Box::new(move |_| buffer_index)));
impl AsyncWrite for AudioConsumerSink {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let mut flags = 0;
loop {
match self.flags_receiver.try_next() {
Ok(Some(flag)) => flags |= flag,
Ok(None) | Err(_) => break,
match self.send_frame(buf, flags) {
Ok(()) => Poll::Ready(Ok(buf.len())),
Err(e) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))),
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
// We write data immediately to the shared VMO. There is nothing to flush here.
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
// TODO( Actually close the stream.
pub enum PlayerEvent {
#[derive(Debug, PartialEq)]
enum ChannelMode {
impl From<u8> for ChannelMode {
fn from(bits: u8) -> Self {
match bits {
0 => ChannelMode::Mono,
1 => ChannelMode::DualChannel,
2 => ChannelMode::Stereo,
3 => ChannelMode::JointStereo,
_ => panic!("invalid channel mode"),
bitfield! {
pub struct SbcHeader(u32);
impl Debug;
syncword, _: 7, 0;
subbands, _: 8;
allocation_method, _: 9;
into ChannelMode, channel_mode, _: 11, 10;
blocks_bits, _: 13, 12;
frequency_bits, _: 15, 14;
bitpool_bits, _: 23, 16;
crccheck, _: 31, 24;
impl SbcHeader {
/// The number of channels, based on the channel mode in the header.
/// From Table 12.18 in the A2DP Spec.
fn channels(&self) -> usize {
match self.channel_mode() {
ChannelMode::Mono => 1,
_ => 2,
fn has_syncword(&self) -> bool {
const SBC_SYNCWORD: u8 = 0x9c;
self.syncword() == SBC_SYNCWORD
/// The number of blocks, based on tbe bits in the header.
/// From Table 12.17 in the A2DP Spec.
fn blocks(&self) -> usize {
4 * (self.blocks_bits() + 1) as usize
fn bitpool(&self) -> usize {
self.bitpool_bits() as usize
/// Number of subbands based on the header bit.
/// From Table 12.20 in the A2DP Spec.
fn num_subbands(&self) -> usize {
if self.subbands() {
} else {
/// Calculates the frame length.
/// Formula from Section 12.9 of the A2DP Spec.
fn frame_length(&self) -> Result<usize, Error> {
if !self.has_syncword() {
return Err(format_err!("syncword does not match"));
let len = 4 + (4 * self.num_subbands() * self.channels()) / 8;
let rest = (match self.channel_mode() {
ChannelMode::Mono | ChannelMode::DualChannel => {
self.blocks() * self.channels() * self.bitpool()
ChannelMode::Stereo => self.blocks() * self.bitpool(),
ChannelMode::JointStereo => self.num_subbands() + (self.blocks() * self.bitpool()),
} as f64
/ 8.0)
.ceil() as usize;
Ok(len + rest)
/// Players are configured and accept media frames, which are sent to the
/// media subsystem.
pub struct Player {
codec_config: MediaCodecConfig,
audio_sink: Pin<Box<dyn AsyncWrite + Send>>,
watch_status_stream: HangingGetStream<AudioConsumerStatus>,
playing: bool,
next_packet_flags: mpsc::Sender<u32>,
last_seq_played: u16,
decoder_task: Option<AbortHandle>,
impl Player {
/// Build Player given a SessionAudioConsumerFactoryProxy to use to create an audio consumer
/// with the given `session_id`
/// The `codec_config` specifies the expected codec of payload data frames.
pub fn new(
session_id: u64,
codec_config: MediaCodecConfig,
audio_consumer_factory: SessionAudioConsumerFactoryProxy,
) -> Result<Player, Error> {
let mut decoder = None;
let encoding = codec_config.stream_encoding();
let mut compression = Some(Compression { type_: encoding.to_string(), parameters: None });
let mut decoder_task = None;
if codec_config.codec_type() == &MediaCodecType::AUDIO_SBC {
let dec = StreamProcessor::create_decoder(
compression = None;
decoder = Some(dec);
let (mut audio_consumer, audio_consumer_server) = fidl::endpoints::create_proxy()?;
audio_consumer_factory.create_audio_consumer(session_id, audio_consumer_server)?;
let (sender, receiver) = mpsc::channel(1);
let mut audio_sink: Pin<Box<dyn AsyncWrite + Send>> = Box::pin(AudioConsumerSink::build(
&mut audio_consumer,
if let Some(mut decoder) = decoder {
let mut decoded_stream = decoder.take_output_stream()?;
let mut sink = audio_sink;
let decoding_task_fut = async move {
while let Some(decoded) = {
let decoded = match decoded {
Ok(decoded) => decoded,
Err(e) => {
info!("Decoded stream failed to produce: {:?}", e);
if let Err(e) = sink.write_all(&decoded).await {
info!("AudioConsumer failed to write: {:?}", e);
audio_sink = Box::pin(decoder);
let (stop_handle, stop_registration) = AbortHandle::new_pair();
let abortable_task_fut = Abortable::new(decoding_task_fut, stop_registration);
fuchsia_async::Task::local(async move {
if let Err(Aborted) = abortable_task_fut.await {
info!("Decoder forwarding task completed.");
decoder_task = Some(stop_handle);
let watch_status_stream =
HangingGetStream::new(Box::new(move || Some(audio_consumer.watch_status())));
Ok(Player {
playing: false,
next_packet_flags: sender,
last_seq_played: 0,
/// Test that a given codec `config` is playable.
/// If an error occurs, playing audio via any Player with the same `codec_config` is likely to
/// fail.
/// It also tests that a decoder can be found if the AudioConsumer provided by the system
/// cannot decode the compressed format as specified in the `config`.
/// Communicates with the current default AudioConsumer.
pub async fn test_playable(config: &MediaCodecConfig) -> Result<(), Error> {
let audio_consumer_factory =
.context("Failed to connect to audio consumer factory")?;
let mut player = Self::new(0, config.clone(), audio_consumer_factory)?;
// wait for initial event
match player.next_event().await {
PlayerEvent::Closed => Err(format_err!("AudioConsumer closed")),
PlayerEvent::Status(_status) => Ok(()),
/// Given a buffer with an SBC frame at the start, find the length of the
/// SBC frame.
fn find_sbc_frame_len(buf: &[u8]) -> Result<usize, Error> {
if buf.len() < 4 {
return Err(format_err!("Buffer too short for header"));
/// Accepts a payload which may contain multiple frames and breaks it into
/// frames and sends it to media.
pub async fn push_payload(&mut self, payload: &[u8]) -> Result<(), Error> {
trace::duration_begin!("bt-a2dp-sink", "Media:PacketReceived");
let rtp = RtpHeader::new(payload)?;
let seq = rtp.sequence_number();
let discontinuity = seq.wrapping_sub(self.last_seq_played.wrapping_add(1));
self.last_seq_played = seq;
if discontinuity > 0 && self.playing {
let _ = self.next_packet_flags.try_send(STREAM_PACKET_FLAG_DISCONTINUITY);
let mut offset = RtpHeader::LENGTH;
// TODO( Handle SBC packet header
offset += self.codec_config.rtp_frame_header().len();
while offset < payload.len() {
match self.codec_config.codec_type() {
&MediaCodecType::AUDIO_SBC => {
let len = Player::find_sbc_frame_len(&payload[offset..]).or_else(|e| {
let _ = self.next_packet_flags.try_send(STREAM_PACKET_FLAG_DISCONTINUITY);
trace::instant!("bt-a2dp-sink", "SBC frame", trace::Scope::Thread);
if offset + len > payload.len() {
let _ = self.next_packet_flags.try_send(STREAM_PACKET_FLAG_DISCONTINUITY);
return Err(format_err!("Ran out of buffer for SBC frame"));
if let Err(e) = self.audio_sink.write_all(&payload[offset..offset + len]).await
info!("Failed to push packet to audio: {:?}", e);
offset += len;
&MediaCodecType::AUDIO_AAC => {
let element = AudioMuxElement::try_from_bytes(&payload[offset..])?;
let frame = element.get_payload(0).ok_or(format_err!("Payload not found"))?;
if let Err(e) = self.audio_sink.write_all(frame).await {
info!("Failed to write packet to sink: {:?}", e);
// Only one payload per AAC RTP Pakcet.
_ => return Err(format_err!("Unrecognized codec!")),
if let Err(e) = self.audio_sink.flush().await {
info!("Failed to flush audio packets: {:?}", e);
self.playing = true;
trace::duration_end!("bt-a2dp-sink", "Media:PacketReceived");
/// If PlayerEvent::Closed is returned, that indicates the underlying
/// service went away and the player should be closed/rebuilt
/// This function should be always be polled when running
pub fn next_event(&mut self) -> impl Future<Output = PlayerEvent> + '_ {
let next_fut =;
async move {
match next_fut.await {
None => PlayerEvent::Closed,
Some(Err(_)) => PlayerEvent::Closed,
Some(Ok(s)) => PlayerEvent::Status(s),
impl Drop for Player {
fn drop(&mut self) {
self.decoder_task.take().map(|t| t.abort());
pub(crate) mod tests {
use super::*;
use matches::assert_matches;
use {
AudioConsumerMarker, AudioConsumerRequest, AudioConsumerRequestStream,
SessionAudioConsumerFactoryRequest, SessionAudioConsumerFactoryRequestStream,
StreamSinkRequest, StreamSinkRequestStream,
fuchsia_async as fasync,
futures::{pin_mut, task::Poll, FutureExt},
fn test_frame_length() {
// 44.1, 16 blocks, Joint Stereo, Loudness, 8 subbands, 53 bitpool (Android P)
let header1 = [0x9c, 0xBD, 0x35, 0xA2];
const HEADER1_FRAMELEN: usize = 119;
let head = SbcHeader(u32::from_le_bytes(header1));
assert_eq!(16, head.blocks());
assert_eq!(ChannelMode::JointStereo, head.channel_mode());
assert_eq!(2, head.channels());
assert_eq!(53, head.bitpool());
assert_eq!(HEADER1_FRAMELEN, head.frame_length().unwrap());
Player::find_sbc_frame_len(&[0x9c, 0xBD, 0x35, 0xA2]).unwrap()
// 44.1, 16 blocks, Stereo, Loudness, 8 subbands, 53 bitpool (OS X)
let header2 = [0x9c, 0xB9, 0x35, 0xA2];
const HEADER2_FRAMELEN: usize = 118;
let head = SbcHeader(u32::from_le_bytes(header2));
assert_eq!(16, head.blocks());
assert_eq!(ChannelMode::Stereo, head.channel_mode());
assert_eq!(2, head.channels());
assert_eq!(53, head.bitpool());
assert_eq!(HEADER2_FRAMELEN, head.frame_length().unwrap());
assert_eq!(HEADER2_FRAMELEN, Player::find_sbc_frame_len(&header2).unwrap());
pub fn expect_audio_consumer_sink_setup(
exec: &mut fasync::Executor,
audio_consumer_request_stream: &mut AudioConsumerRequestStream,
expect_compression: bool,
) -> (StreamSinkRequestStream, Vec<zx::Vmo>) {
let complete =
exec.run_until_stalled(&mut audio_consumer_request_stream.select_next_some());
let audio_consumer_req = match complete {
Poll::Ready(Ok(req)) => req,
x => panic!("expected audio consumer request message but got {:?}", x),
let (stream_sink_request, buffers, compression) = match audio_consumer_req {
AudioConsumerRequest::CreateStreamSink {
} => (stream_sink_request, buffers, compression),
_ => panic!("should be CreateStreamSink"),
assert_eq!(expect_compression, compression.is_some());
buffers[0].write(&[0], 0).expect_err("Write should fail");
let sink_request_stream = stream_sink_request
.expect("a sink request stream to be created from the request");
(sink_request_stream, buffers)
pub(crate) fn expect_player_setup(
mut exec: &mut fasync::Executor,
audio_consumer_factory_request_stream: &mut SessionAudioConsumerFactoryRequestStream,
codec_type: MediaCodecType,
expected_session_id: u64,
) -> (StreamSinkRequestStream, AudioConsumerRequestStream, Vec<zx::Vmo>) {
let complete =
exec.run_until_stalled(&mut audio_consumer_factory_request_stream.select_next_some());
let audio_consumer_create_req = match complete {
Poll::Ready(Ok(req)) => req,
x => panic!("expected audio consumer create request message but got {:?}", x),
let (audio_consumer_create_request, session_id) = match audio_consumer_create_req {
SessionAudioConsumerFactoryRequest::CreateAudioConsumer {
} => (audio_consumer_request, session_id),
assert_eq!(session_id, expected_session_id);
let mut audio_consumer_request_stream =
audio_consumer_create_request.into_stream().expect("audio consumer stream");
let expect_compression = codec_type == MediaCodecType::AUDIO_AAC;
let (sink_request_stream, buffers) = expect_audio_consumer_sink_setup(
&mut exec,
&mut audio_consumer_request_stream,
(sink_request_stream, audio_consumer_request_stream, buffers)
pub(crate) fn respond_event_status(
exec: &mut fasync::Executor,
audio_consumer_request_stream: &mut AudioConsumerRequestStream,
status: AudioConsumerStatus,
) {
let complete =
exec.run_until_stalled(&mut audio_consumer_request_stream.select_next_some());
let audio_consumer_req = match complete {
Poll::Ready(Ok(req)) => req,
x => panic!("expected audio consumer request message but got {:?}", x),
let watch_status_responder = match audio_consumer_req {
AudioConsumerRequest::WatchStatus { responder, .. } => responder,
_ => panic!("should be WatchStatus"),
watch_status_responder.send(status).expect("watch status sent");
/// Runs through the setup sequence of a AudioConsumer, returning the audio consumer,
/// StreamSinkRequestStream and AudioConsumerRequestStream that it is communicating with, and
/// the VMO payload buffer that was provided to the AudioConsumer.
pub(crate) fn setup_player(
mut exec: &mut fasync::Executor,
codec_config: MediaCodecConfig,
) -> (Player, StreamSinkRequestStream, AudioConsumerRequestStream, Vec<zx::Vmo>) {
const TEST_SESSION_ID: u64 = 1;
let codec_type = codec_config.codec_type().clone();
let (audio_consumer_factory_proxy, mut audio_consumer_factory_request_stream) =
.expect("proxy pair creation");
let mut player = Player::new(TEST_SESSION_ID, codec_config, audio_consumer_factory_proxy)
.expect("player to build");
let (sink_request_stream, mut audio_consumer_request_stream, sink_vmos) =
&mut exec,
&mut audio_consumer_factory_request_stream,
let player_next_event_fut = player.next_event();
// player creation is done in stages, waiting for the below source/sink
// objects to be created. Just run the creation up until the first
// blocking point.
assert!(exec.run_until_stalled(&mut player_next_event_fut).is_pending());
&mut exec,
&mut audio_consumer_request_stream,
AudioConsumerStatus {
min_lead_time: Some(50),
max_lead_time: Some(500),
error: None,
presentation_timeline: None,
match exec.run_until_stalled(&mut player_next_event_fut) {
Poll::Ready(PlayerEvent::Status(_s)) => {}
x => panic!("Player should be ready with status but got {:?}", x),
(player, sink_request_stream, audio_consumer_request_stream, sink_vmos)
fn build_config(codec_type: &MediaCodecType) -> MediaCodecConfig {
match codec_type {
&MediaCodecType::AUDIO_SBC => {
MediaCodecConfig::build(codec_type.clone(), &[0x82, 0x15, 2, 250])
&MediaCodecType::AUDIO_AAC => MediaCodecConfig::build(codec_type.clone(), &[0; 6]),
x => panic!("Can't build unknown codec type {:?}", x),
.expect("codec should build")
fn test_player_setup() {
let mut exec = fasync::Executor::new().expect("executor should build");
setup_player(&mut exec, build_config(&MediaCodecType::AUDIO_AAC));
fn test_player_closed() {
let mut exec = fasync::Executor::new().expect("executor should build");
let (mut player, _sink_request_stream, mut audio_consumer_request_stream, _sink_vmo) =
setup_player(&mut exec, build_config(&MediaCodecType::AUDIO_AAC));
let player_next_event_fut = player.next_event();
// player creation is done in stages, waiting for the below source/sink
// objects to be created. Just run the creation up until the first
// blocking point.
assert!(exec.run_until_stalled(&mut player_next_event_fut).is_pending());
let complete =
exec.run_until_stalled(&mut audio_consumer_request_stream.select_next_some());
let watch_status_responder = match complete {
Poll::Ready(Ok(AudioConsumerRequest::WatchStatus { responder, .. })) => responder,
x => panic!("expected audio consumer WatchStatus request but got {:?}", x),
match exec.run_until_stalled(&mut player_next_event_fut) {
Poll::Ready(PlayerEvent::Closed) => {}
x => panic!("Expected player to be closed, got {:?}", x),
const AUDIO_MUX_LENGTH: usize = 928;
const AAC_HEADER_LENGTH: usize = 13;
fn build_rtp_aac_packet(payload: &[u8]) -> [u8; AAC_RTP_PACKET_LENGTH] {
// raw rtp header with sequence number of 1 followed by 1 aac AudioMuxElement with 0's for
// payload
let rtp: &[u8] = &[128, 96, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0];
let aac_header: &[u8] = &[71, 252, 0, 0, 176, 144, 128, 3, 0, 255, 255, 255, 150];
raw[MUX_ELEMENT_START..(MUX_ELEMENT_START + payload.len())].copy_from_slice(payload);
/// Tests that the creation of a player executes with the expected interaction with the
/// AudioConsumer and stream setup.
/// This tests that the buffer is sent correctly and that data "sent" through the shared
/// VMO is readable by the receiver of the VMO.
/// We do this by mocking the AudioConsumer and StreamSink interfaces that are used.
fn test_send_frame() {
let mut exec = fasync::Executor::new().expect("executor should build");
let (mut player, mut sink_request_stream, _player_request_stream, sink_vmos) =
setup_player(&mut exec, build_config(&MediaCodecType::AUDIO_AAC));
let content = &[1, 2, 3, 4, 5, 6, 7, 8, 9];
let payload = build_rtp_aac_packet(content);
let payload_fut = player.push_payload(&payload);
match exec.run_singlethreaded(&mut payload_fut) {
Ok(()) => {}
x => panic!("Expected push_payload Ok(()) but got {:?}", x),
let sink_req = exec
.run_singlethreaded(&mut sink_request_stream.select_next_some())
.expect("got a packet");
let (offset, size, buffer_index) = match sink_req {
StreamSinkRequest::SendPacket { responder, packet, .. } => {
responder.send().expect("sent response");
packet.payload_size as usize,
packet.payload_buffer_id as usize,
_ => panic!("should have received a packet"),
let mut recv = Vec::with_capacity(size);
recv.resize(size, 0);
.read(recv.as_mut_slice(), offset)
.expect("should be able to read packet data");
assert_eq!(&recv[..content.len()], content, "received didn't match payload");
/// Helper function for pushing payloads to player and returning the packet flags
fn push_payload_get_flags(
payload: &[u8],
exec: &mut fasync::Executor,
player: &mut Player,
sink_request_stream: &mut StreamSinkRequestStream,
) -> u32 {
let push_fut = player.push_payload(payload);
exec.run_singlethreaded(&mut push_fut).expect("wrote payload");
// Drive until the sink receives a packet.
let sink_req = exec
.run_singlethreaded(&mut sink_request_stream.select_next_some())
.expect("sent packet");
match sink_req {
StreamSinkRequest::SendPacket { packet, responder, .. } => {
responder.send().expect("send reponse should work");
_ => panic!("should have received a packet"),
/// Test that discontinuous packets are flagged as such. We do this by
/// sending packets through a Player and examining them after they come out
/// of the mock StreamSink interface.
fn test_packet_discontinuities() {
let mut exec = fasync::Executor::new().expect("executor should build");
let (mut player, mut sink_request_stream, mut player_request_stream, _) =
setup_player(&mut exec, build_config(&MediaCodecType::AUDIO_AAC));
let mut raw = build_rtp_aac_packet(&[]);
let flags = push_payload_get_flags(&raw, &mut exec, &mut player, &mut sink_request_stream);
// should not have a discontinuity yet
// Should have started the player when the first packet gets pushed.
let complete = exec.run_until_stalled(&mut player_request_stream.select_next_some());
let player_req = match complete {
Poll::Ready(Ok(req)) => req,
x => panic!("expected player req message but got {:?}", x),
assert_matches!(player_req, AudioConsumerRequest::Start { .. });
// increment sequence number
raw[3] = 2;
let flags = push_payload_get_flags(&raw, &mut exec, &mut player, &mut sink_request_stream);
// should not have a discontinuity yet
// introduce discont
raw[3] = 8;
let flags = push_payload_get_flags(&raw, &mut exec, &mut player, &mut sink_request_stream);
/// Test that parsing works when pushing an AAC packet
fn test_aac_parsing() {
let mut exec = fasync::Executor::new().expect("executor should build");
let (mut player, mut sink_request_stream, mut player_request_stream, _) =
setup_player(&mut exec, build_config(&MediaCodecType::AUDIO_AAC));
let mut raw = build_rtp_aac_packet(&[]);
push_payload_get_flags(&raw, &mut exec, &mut player, &mut sink_request_stream);
// Should have started the player
let complete = exec.run_until_stalled(&mut player_request_stream.select_next_some());
let player_req = match complete {
Poll::Ready(Ok(req)) => req,
x => panic!("expected player req message but got {:?}", x),
assert_matches!(player_req, AudioConsumerRequest::Start { .. });
// corrupt AudioMuxElement
raw[RtpHeader::LENGTH + 1] = 0xff;
let push_fut = player.push_payload(&raw);
exec.run_singlethreaded(&mut push_fut).expect_err("fail to write corrupted payload");
#[cfg(features = "test_encoding")]
/// Test that bytes flow through to decoder when SBC is active
fn test_sbc_decoder_write() {
let mut exec = fasync::Executor::new().expect("executor should build");
let (mut player, mut sink_request_stream, mut player_request_stream, _) =
setup_player(&mut exec, build_config(&MediaCodecType::AUDIO_SBC));
// raw rtp header with sequence number of 1 followed by 1 sbc frame
let raw = [
128, 96, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0x9c, 0xb1, 0x20, 0x3b, 0x80, 0x00, 0x00,
0x11, 0x7f, 0xfa, 0xab, 0xef, 0x7f, 0xfa, 0xab, 0xef, 0x80, 0x4a, 0xab, 0xaf, 0x80,
0xf2, 0xab, 0xcf, 0x83, 0x8a, 0xac, 0x32, 0x8a, 0x78, 0x8a, 0x53, 0x90, 0xdc, 0xad,
0x49, 0x96, 0xba, 0xaa, 0xe6, 0x9c, 0xa2, 0xab, 0xac, 0xa2, 0x72, 0xa9, 0x2d, 0xa8,
0x9a, 0xab, 0x75, 0xae, 0x82, 0xad, 0x49, 0xb4, 0x6a, 0xad, 0xb1, 0xba, 0x52, 0xa9,
0xa8, 0xc0, 0x32, 0xad, 0x11, 0xc6, 0x5a, 0xab, 0x3a,
push_payload_get_flags(&raw, &mut exec, &mut player, &mut sink_request_stream);
// Should have started the player
let complete = exec.run_until_stalled(&mut player_request_stream.select_next_some());
let player_req = match complete {
Poll::Ready(Ok(req)) => req,
x => panic!("expected player req message but got {:?}", x),
assert_matches!(player_req, AudioConsumerRequest::Start { .. });
/// Test that the buffers behave correctly for AudioConsumerSink
fn test_sink_buffer_handling() {
let mut exec = fasync::Executor::new().expect("executor should build");
let (mut audio_consumer_proxy, mut audio_consumer_request_stream) =
create_proxy_and_stream::<AudioConsumerMarker>().expect("proxy creation");
let (_sender, receiver) = mpsc::channel(1);
let mut sink = AudioConsumerSink::build(&mut audio_consumer_proxy, 48000, None, receiver)
.expect("builds correctliy");
let (mut sink_request_stream, _buffers) =
expect_audio_consumer_sink_setup(&mut exec, &mut audio_consumer_request_stream, false);
let payload = &[0xF0, 0x9F, 0x92, 0x96];
let mut responders = Vec::new();
// run out of send buffers
for _ in 0..NUM_BUFFERS {
assert!(sink.send_frame(payload, 0).is_ok());
// Should send the packet through
let req = exec.run_until_stalled(&mut;
match req {
Poll::Ready(Some(Ok(StreamSinkRequest::SendPacket { packet: _, responder }))) => {
x => panic!("Expected SendPacket request, got {:?}", x),
// No more buffers left, send_frame should be an error.
assert!(sink.send_frame(payload, 0).is_err());
// Writing to the sink shoould be pending.
let mut write_fut = sink.write_all(payload);
let (waker, write_fut_wake_count) = new_count_waker();
let mut counting_ctx = Context::from_waker(&waker);
assert!(write_fut.poll_unpin(&mut counting_ctx).is_pending());
// responding to one of the responders should wake up the writer.
responders.pop().expect("a responder").send().expect("responder to send correctly");
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
assert_eq!(1, write_fut_wake_count.get());
// Polling the write future should finish now, since a buffer is ready.
assert!(write_fut.poll_unpin(&mut counting_ctx).is_ready());
// Should have sent the packet through
let req = exec.run_until_stalled(&mut;
match req {
Poll::Ready(Some(Ok(StreamSinkRequest::SendPacket { packet: _, responder }))) => {
x => panic!("Expected SendPacket request, got {:?}", x),