[bluetooth][a2dp-sink] Use platform decoder for SBC audio
Use StreamProcessor to decode SBC audio in process instead of requiring
AudioConsumer to support audio/sbc decoding.
Test: fx run-test bt-a2dp-sink-tests
Change-Id: I5dde8f432971981ecc4f210bf37d80d90402d6cc
diff --git a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/BUILD.gn b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/BUILD.gn
index 177626e..83f152a 100644
--- a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/BUILD.gn
+++ b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/BUILD.gn
@@ -35,6 +35,7 @@
"//src/connectivity/bluetooth/lib/bt-a2dp",
"//src/connectivity/bluetooth/lib/bt-avdtp",
"//src/connectivity/bluetooth/lib/bt-avdtp",
+ "//src/connectivity/bluetooth/lib/fuchsia-audio-codec",
"//src/connectivity/bluetooth/lib/fuchsia-bluetooth",
"//src/lib/argh",
"//src/lib/cobalt/rust:fuchsia-cobalt",
diff --git a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/meta/bt-a2dp-sink-unittests.cmx b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/meta/bt-a2dp-sink-unittests.cmx
index 4a21df5..9b9facc 100644
--- a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/meta/bt-a2dp-sink-unittests.cmx
+++ b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/meta/bt-a2dp-sink-unittests.cmx
@@ -1,5 +1,20 @@
{
+ "facets": {
+ "fuchsia.test": {
+ "injected-services": {
+ "fuchsia.mediacodec.CodecFactory": "fuchsia-pkg://fuchsia.com/codec_factory#meta/codec_factory.cmx",
+ "fuchsia.sysmem.Allocator": "fuchsia-pkg://fuchsia.com/sysmem_connector#meta/sysmem_connector.cmx"
+ }
+ }
+ },
"program": {
"binary": "test/bt-a2dp-sink-unittests"
+ },
+ "sandbox": {
+ "services": [
+ "fuchsia.mediacodec.CodecFactory",
+ "fuchsia.sysmem.Allocator",
+ "fuchsia.logger.LogSink"
+ ]
}
}
diff --git a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/meta/bt-a2dp-sink.cmx b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/meta/bt-a2dp-sink.cmx
index 8502631..f2bd3013 100644
--- a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/meta/bt-a2dp-sink.cmx
+++ b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/meta/bt-a2dp-sink.cmx
@@ -9,6 +9,8 @@
"fuchsia.bluetooth.bredr.Profile",
"fuchsia.cobalt.LoggerFactory",
"fuchsia.logger.LogSink",
+ "fuchsia.mediacodec.CodecFactory",
+ "fuchsia.sysmem.Allocator",
"fuchsia.media.SessionAudioConsumerFactory",
"fuchsia.media.sessions2.Publisher"
]
diff --git a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/codec.rs b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/codec.rs
new file mode 100644
index 0000000..6b80f8d
--- /dev/null
+++ b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/codec.rs
@@ -0,0 +1,122 @@
+use bt_a2dp::media_types::*;
+use bt_avdtp as avdtp;
+use fidl_fuchsia_media::{AUDIO_ENCODING_AACLATM, AUDIO_ENCODING_SBC};
+use fuchsia_syslog::fx_log_warn;
+use std::convert::{TryFrom, TryInto};
+
+const SBC_CODEC_EXTRA_LEN: usize = 4;
+const AAC_CODEC_EXTRA_LEN: usize = 6;
+
+#[derive(Clone, Debug)]
+pub enum CodecExtra {
+ Sbc([u8; SBC_CODEC_EXTRA_LEN]),
+ Aac([u8; AAC_CODEC_EXTRA_LEN]),
+ Unknown,
+}
+
+pub enum CodecExtraError {
+ UnknownCodec,
+ InvalidLength,
+}
+
+impl CodecExtra {
+ /// Extract sampling freqency from SBC codec extra data field
+ /// (A2DP Sec. 4.3.2)
+ fn parse_sbc_sampling_frequency(codec_extra: &[u8]) -> Option<u32> {
+ if codec_extra.len() != SBC_CODEC_EXTRA_LEN {
+ fx_log_warn!("Invalid SBC codec extra length: {:?}", codec_extra.len());
+ return None;
+ }
+
+ let mut codec_info_bytes = [0_u8; SBC_CODEC_EXTRA_LEN];
+ codec_info_bytes.copy_from_slice(&codec_extra);
+
+ let codec_info = SbcCodecInfo(u32::from_be_bytes(codec_info_bytes));
+ let sample_freq = SbcSamplingFrequency::from_bits_truncate(codec_info.sampling_frequency());
+
+ match sample_freq {
+ SbcSamplingFrequency::FREQ48000HZ => Some(48000),
+ SbcSamplingFrequency::FREQ44100HZ => Some(44100),
+ _ => {
+ fx_log_warn!("Invalid sample_freq set in configuration {:?}", sample_freq);
+ None
+ }
+ }
+ }
+
+ /// Extract sampling frequency from AAC codec extra data field
+ /// (A2DP Sec. 4.5.2)
+ fn parse_aac_sampling_frequency(codec_extra: &[u8]) -> Option<u32> {
+ if codec_extra.len() != AAC_CODEC_EXTRA_LEN {
+ fx_log_warn!("Invalid AAC codec extra length: {:?}", codec_extra.len());
+ return None;
+ }
+
+ // AACMediaCodecInfo is represented as 8 bytes, with lower 6 bytes containing
+ // the codec extra data.
+ let mut codec_info_bytes = [0_u8; 8];
+ let codec_info_slice = &mut codec_info_bytes[2..8];
+
+ codec_info_slice.copy_from_slice(&codec_extra);
+
+ let codec_info = AACMediaCodecInfo(u64::from_be_bytes(codec_info_bytes));
+ let sample_freq = AACSamplingFrequency::from_bits_truncate(codec_info.sampling_frequency());
+
+ match sample_freq {
+ AACSamplingFrequency::FREQ48000HZ => Some(48000),
+ AACSamplingFrequency::FREQ44100HZ => Some(44100),
+ _ => {
+ fx_log_warn!("Invalid sample_freq set in configuration {:?}", sample_freq);
+ None
+ }
+ }
+ }
+
+ /// Parse the sampling frequency for this codec extra or return None
+ /// if none is configured.
+ pub fn sample_freq(&self) -> Option<u32> {
+ match self {
+ CodecExtra::Sbc(codec_extra) => Self::parse_sbc_sampling_frequency(codec_extra),
+ CodecExtra::Aac(codec_extra) => Self::parse_aac_sampling_frequency(codec_extra),
+ _ => None,
+ }
+ }
+
+ pub fn stream_type(&self) -> &str {
+ match self {
+ Self::Sbc(_) => AUDIO_ENCODING_SBC,
+ Self::Aac(_) => AUDIO_ENCODING_AACLATM,
+ Self::Unknown => "Unknown",
+ }
+ }
+}
+
+impl TryFrom<&avdtp::ServiceCapability> for CodecExtra {
+ type Error = CodecExtraError;
+
+ fn try_from(value: &avdtp::ServiceCapability) -> Result<Self, Self::Error> {
+ match value {
+ avdtp::ServiceCapability::MediaCodec {
+ media_type: avdtp::MediaType::Audio,
+ codec_type: avdtp::MediaCodecType::AUDIO_SBC,
+ codec_extra,
+ } => {
+ if codec_extra.len() != SBC_CODEC_EXTRA_LEN {
+ return Err(CodecExtraError::InvalidLength);
+ }
+ Ok(CodecExtra::Sbc(codec_extra[0..SBC_CODEC_EXTRA_LEN].try_into().unwrap()))
+ }
+ avdtp::ServiceCapability::MediaCodec {
+ media_type: avdtp::MediaType::Audio,
+ codec_type: avdtp::MediaCodecType::AUDIO_AAC,
+ codec_extra,
+ } => {
+ if codec_extra.len() != AAC_CODEC_EXTRA_LEN {
+ return Err(CodecExtraError::InvalidLength);
+ }
+ Ok(CodecExtra::Aac(codec_extra[0..AAC_CODEC_EXTRA_LEN].try_into().unwrap()))
+ }
+ _ => Err(CodecExtraError::UnknownCodec),
+ }
+ }
+}
diff --git a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/main.rs b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/main.rs
index 9b9a7b8..2714eaa 100644
--- a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/main.rs
+++ b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/main.rs
@@ -25,12 +25,14 @@
select, FutureExt, StreamExt,
},
parking_lot::Mutex,
- std::{collections::hash_map, collections::HashMap, sync::Arc},
+ std::{collections::hash_map, collections::HashMap, convert::TryFrom, sync::Arc},
};
+use crate::codec::CodecExtra;
use crate::inspect_types::StreamingInspectData;
mod avrcp_relay;
+mod codec;
mod connected_peers;
mod hanging_get;
mod inspect_types;
@@ -86,7 +88,7 @@
// Arbitrarily chosen ID for the AAC stream endpoint.
const AAC_SEID: u8 = 7;
-const DEFAULT_SAMPLE_RATE: u32 = 48000;
+pub const DEFAULT_SAMPLE_RATE: u32 = 48000;
const DEFAULT_SESSION_ID: u64 = 0;
/// Controls a stream endpoint and the media decoding task which is associated with it.
@@ -108,85 +110,13 @@
Stream { endpoint, encoding, suspend_sender: None }
}
- /// Extract sampling freqency from SBC codec extra data field
- /// (A2DP Sec. 4.3.2)
- fn parse_sbc_sampling_frequency(codec_extra: &[u8]) -> u32 {
- if codec_extra.len() != 4 {
- fx_log_warn!("Invalid SBC codec extra length: {:?}", codec_extra.len());
- return DEFAULT_SAMPLE_RATE;
- }
-
- let mut codec_info_bytes = [0_u8; 4];
- codec_info_bytes.copy_from_slice(&codec_extra);
-
- let codec_info = SbcCodecInfo(u32::from_be_bytes(codec_info_bytes));
- let sample_freq = SbcSamplingFrequency::from_bits_truncate(codec_info.sampling_frequency());
-
- match sample_freq {
- SbcSamplingFrequency::FREQ48000HZ => 48000,
- SbcSamplingFrequency::FREQ44100HZ => 44100,
- _ => {
- fx_log_warn!("Invalid sample_freq set in configuration {:?}", sample_freq);
- DEFAULT_SAMPLE_RATE
- }
- }
- }
-
- /// Extract sampling frequency from AAC codec extra data field
- /// (A2DP Sec. 4.5.2)
- fn parse_aac_sampling_frequency(codec_extra: &[u8]) -> u32 {
- if codec_extra.len() != 6 {
- fx_log_warn!("Invalid AAC codec extra length: {:?}", codec_extra.len());
- return DEFAULT_SAMPLE_RATE;
- }
-
- // AACMediaCodecInfo is represented as 8 bytes, with lower 6 bytes containing
- // the codec extra data.
- let mut codec_info_bytes = [0_u8; 8];
- let codec_info_slice = &mut codec_info_bytes[2..8];
-
- codec_info_slice.copy_from_slice(&codec_extra);
-
- let codec_info = AACMediaCodecInfo(u64::from_be_bytes(codec_info_bytes));
- let sample_freq = AACSamplingFrequency::from_bits_truncate(codec_info.sampling_frequency());
-
- match sample_freq {
- AACSamplingFrequency::FREQ48000HZ => 48000,
- AACSamplingFrequency::FREQ44100HZ => 44100,
- _ => {
- fx_log_warn!("Invalid sample_freq set in configuration {:?}", sample_freq);
- DEFAULT_SAMPLE_RATE
- }
- }
- }
-
- /// Get the currently configured sampling frequency for this stream or return a default value
- /// if none is configured.
- ///
- /// TODO: This should be removed once we have a structured way of accessing stream
- /// capabilities.
- fn sample_freq(&self) -> u32 {
+ /// Get the currently configured extra codec data
+ fn configured_codec_extra(&self) -> Result<CodecExtra, avdtp::Error> {
self.endpoint
- .get_configuration()
- .map(|caps| {
- for c in caps {
- match c {
- avdtp::ServiceCapability::MediaCodec {
- media_type: avdtp::MediaType::Audio,
- codec_type: avdtp::MediaCodecType::AUDIO_SBC,
- codec_extra,
- } => return Self::parse_sbc_sampling_frequency(&codec_extra),
- avdtp::ServiceCapability::MediaCodec {
- media_type: avdtp::MediaType::Audio,
- codec_type: avdtp::MediaCodecType::AUDIO_AAC,
- codec_extra,
- } => return Self::parse_aac_sampling_frequency(&codec_extra),
- _ => continue,
- };
- }
- DEFAULT_SAMPLE_RATE
- })
- .unwrap_or(DEFAULT_SAMPLE_RATE)
+ .get_configuration()?
+ .iter()
+ .find_map(|cap| CodecExtra::try_from(cap).ok())
+ .ok_or(avdtp::Error::InvalidState)
}
/// Attempt to start the media decoding task.
@@ -203,10 +133,11 @@
let (send, receive) = mpsc::channel(1);
self.suspend_sender = Some(send);
+ let codec_extra = self.configured_codec_extra()?;
+
fuchsia_async::spawn_local(decode_media_stream(
self.endpoint.take_transport()?,
- self.encoding.clone(),
- self.sample_freq(),
+ codec_extra,
// TODO(42976) get real media session id
DEFAULT_SESSION_ID,
receive,
@@ -260,13 +191,7 @@
let mut s = Streams::new();
// TODO(BT-533): detect codecs, add streams for each codec
// SBC is required
- if let Err(e) = player::Player::new(
- DEFAULT_SESSION_ID,
- AUDIO_ENCODING_SBC.to_string(),
- DEFAULT_SAMPLE_RATE,
- )
- .await
- {
+ if let Err(e) = player::Player::new(DEFAULT_SESSION_ID, CodecExtra::Sbc([0; 4])).await {
fx_log_warn!("Can't play required SBC audio: {}", e);
return Err(e);
}
@@ -377,14 +302,13 @@
/// Ends when signaled from `end_signal`, or when the media transport stream is closed.
async fn decode_media_stream(
mut stream: avdtp::MediaStream,
- encoding: String,
- sample_rate: u32,
+ codec_extra: CodecExtra,
session_id: u64,
mut end_signal: Receiver<()>,
mut inspect: StreamingInspectData,
cobalt_sender: CobaltSender,
) -> () {
- let mut player = match player::Player::new(session_id, encoding.clone(), sample_rate).await {
+ let mut player = match player::Player::new(session_id, codec_extra.clone()).await {
Ok(v) => v,
Err(e) => {
fx_log_info!("Can't setup player for Media: {:?}", e);
@@ -408,9 +332,11 @@
}
Some(Ok(packet)) => packet,
};
- if let Err(e) = player.push_payload(&pkt.as_slice()) {
+
+ if let Err(e) = player.push_payload(&pkt.as_slice()).await {
fx_log_info!("can't push packet: {:?}", e);
}
+
if !player.playing() {
player.play().unwrap_or_else(|e| fx_log_info!("Problem playing: {:}", e));
}
@@ -421,7 +347,7 @@
player::PlayerEvent::Closed => {
fx_log_info!("Rebuilding Player");
// The player died somehow? Attempt to rebuild the player.
- player = match player::Player::new(session_id, encoding.clone(), sample_rate).await {
+ player = match player::Player::new(session_id, codec_extra.clone()).await {
Ok(v) => v,
Err(e) => {
fx_log_info!("Can't rebuild player: {:?}", e);
@@ -431,7 +357,7 @@
},
player::PlayerEvent::Status(s) => {
fx_log_info!("PlayerEvent Status happened: {:?}", s);
- }
+ },
}
},
_ = inspect.update_interval.next() => {
@@ -445,13 +371,17 @@
}
let end_time = zx::Time::get(zx::ClockId::Monotonic);
- report_stream_metrics(cobalt_sender, &encoding, (end_time - start_time).into_seconds());
+ report_stream_metrics(cobalt_sender, &codec_extra, (end_time - start_time).into_seconds());
}
-fn report_stream_metrics(mut cobalt_sender: CobaltSender, encoding: &str, duration_seconds: i64) {
- let codec = match encoding.as_ref() {
- AUDIO_ENCODING_SBC => metrics::A2dpStreamDurationInSecondsMetricDimensionCodec::Sbc,
- AUDIO_ENCODING_AACLATM => metrics::A2dpStreamDurationInSecondsMetricDimensionCodec::Aac,
+fn report_stream_metrics(
+ mut cobalt_sender: CobaltSender,
+ codec_extra: &CodecExtra,
+ duration_seconds: i64,
+) {
+ let codec = match codec_extra {
+ CodecExtra::Sbc(_) => metrics::A2dpStreamDurationInSecondsMetricDimensionCodec::Sbc,
+ CodecExtra::Aac(_) => metrics::A2dpStreamDurationInSecondsMetricDimensionCodec::Aac,
_ => metrics::A2dpStreamDurationInSecondsMetricDimensionCodec::Unknown,
};
@@ -581,6 +511,7 @@
fn test_streams() {
let mut streams = Streams::new();
const LOCAL_ID: u8 = 1;
+ const TEST_SAMPLE_FREQ: u32 = 44100;
// An endpoint for testing
let s = avdtp::StreamEndpoint::new(
@@ -601,7 +532,6 @@
let id = s.local_id().clone();
let information = s.information();
let encoding = AUDIO_ENCODING_SBC.to_string();
- let sample_freq = 44100;
assert_matches!(streams.get_endpoint(&id), None);
@@ -632,12 +562,13 @@
)
.expect("Failed to configure endpoint");
- assert_eq!(sample_freq, streams.get_mut(&id).unwrap().sample_freq());
+ let stream = streams.get_mut(&id).expect("stream");
+ let codec_extra = stream.configured_codec_extra().expect("codec extra");
+ assert_matches!(codec_extra, CodecExtra::Sbc([41, 245, 2, 53]));
+ assert_matches!(codec_extra.sample_freq(), Some(TEST_SAMPLE_FREQ));
- let res = streams.get_mut(&id);
-
- assert_matches!(res.as_ref().unwrap().suspend_sender, None);
- assert_eq!(encoding, res.as_ref().unwrap().encoding);
+ assert_matches!(stream.suspend_sender, None);
+ assert_eq!(encoding, stream.encoding);
}
#[test]
@@ -645,6 +576,7 @@
fn test_aac_stream() {
let mut streams = Streams::new();
const LOCAL_ID: u8 = 1;
+ const TEST_SAMPLE_FREQ: u32 = 44100;
// An endpoint for testing
let s = avdtp::StreamEndpoint::new(
@@ -665,7 +597,6 @@
let id = s.local_id().clone();
let information = s.information();
let encoding = AUDIO_ENCODING_AACLATM.to_string();
- let sample_freq = 44100;
assert_matches!(streams.get_endpoint(&id), None);
@@ -696,12 +627,13 @@
)
.expect("Failed to configure endpoint");
- assert_eq!(sample_freq, streams.get_mut(&id).unwrap().sample_freq());
+ let stream = streams.get_mut(&id).expect("stream");
+ let codec_extra = stream.configured_codec_extra().expect("codec extra");
+ assert_matches!(codec_extra, CodecExtra::Aac([128, 1, 4, 4, 226, 0]));
+ assert_matches!(codec_extra.sample_freq(), Some(TEST_SAMPLE_FREQ));
- let res = streams.get_mut(&id);
-
- assert_matches!(res.as_ref().unwrap().suspend_sender, None);
- assert_eq!(encoding, res.as_ref().unwrap().encoding);
+ assert_matches!(stream.suspend_sender, None);
+ assert_eq!(encoding, stream.encoding);
}
#[test]
@@ -726,7 +658,7 @@
let (send, mut recv) = fake_cobalt_sender();
const TEST_DURATION: i64 = 1;
- report_stream_metrics(send, AUDIO_ENCODING_AACLATM, TEST_DURATION);
+ report_stream_metrics(send, &CodecExtra::Aac([0; 6]), TEST_DURATION);
let event = recv.try_next().expect("no stream error").expect("event present");
diff --git a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/player.rs b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/player.rs
index a609f8b..8cbca07 100644
--- a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/player.rs
+++ b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/player.rs
@@ -9,23 +9,30 @@
fidl_fuchsia_media::{
AudioConsumerProxy, AudioConsumerStartFlags, AudioConsumerStatus, AudioSampleFormat,
AudioStreamType, Compression, SessionAudioConsumerFactoryMarker,
- SessionAudioConsumerFactoryProxy, StreamPacket, StreamSinkProxy, AUDIO_ENCODING_AACLATM,
- AUDIO_ENCODING_SBC, NO_TIMESTAMP, STREAM_PACKET_FLAG_DISCONTINUITY,
+ SessionAudioConsumerFactoryProxy, StreamPacket, StreamSinkProxy, NO_TIMESTAMP,
+ STREAM_PACKET_FLAG_DISCONTINUITY,
},
+ fuchsia_audio_codec::StreamProcessor,
+ fuchsia_syslog::fx_log_info,
fuchsia_zircon::{self as zx, HandleBased},
- futures::{Future, FutureExt, StreamExt},
+ futures::{io::AsyncWriteExt, select, stream::pending, FutureExt, Stream, StreamExt},
};
+use crate::codec::CodecExtra;
use crate::hanging_get::HangingGetStream;
+use crate::DEFAULT_SAMPLE_RATE;
const DEFAULT_BUFFER_LEN: usize = 65536;
+pub type DecodedStreamItem = Result<Vec<u8>, Error>;
+pub type DecodedStream = Box<dyn Stream<Item = DecodedStreamItem> + Unpin>;
+
/// Players are configured and accept media frames, which are sent to the
/// media subsystem.
pub struct Player {
buffer: zx::Vmo,
buffer_len: usize,
- codec: String,
+ codec_extra: CodecExtra,
current_offset: usize,
stream_sink: StreamSinkProxy,
audio_consumer: AudioConsumerProxy,
@@ -33,6 +40,8 @@
playing: bool,
next_packet_flags: u32,
last_seq_played: u16,
+ decoder: Option<StreamProcessor>,
+ decoded_stream: DecodedStream,
}
pub enum PlayerEvent {
@@ -132,21 +141,30 @@
impl Player {
/// Attempt to make a new player that decodes and plays frames encoded in the
/// `codec`
- pub async fn new(session_id: u64, codec: String, sample_freq: u32) -> Result<Player, Error> {
+ pub async fn new(session_id: u64, codec_extra: CodecExtra) -> Result<Player, Error> {
let audio_consumer_factory =
fuchsia_component::client::connect_to_service::<SessionAudioConsumerFactoryMarker>()
.context("Failed to connect to audio consumer factory")?;
- Self::from_proxy(session_id, codec, sample_freq, audio_consumer_factory).await
+ Self::from_proxy(session_id, codec_extra, audio_consumer_factory).await
}
/// Build a AudioConsumer given a SessionAudioConsumerFactoryProxy.
/// Used in tests.
async fn from_proxy(
session_id: u64,
- codec: String,
- sample_freq: u32,
+ codec_extra: CodecExtra,
audio_consumer_factory: SessionAudioConsumerFactoryProxy,
) -> Result<Player, Error> {
+ let (decoder, decoded_stream) = match &codec_extra {
+ CodecExtra::Sbc(codec_extra_data) => {
+ let mut decoder =
+ StreamProcessor::create_decoder("audio/sbc", Some(codec_extra_data.to_vec()))?;
+ let decoded_stream = Box::new(decoder.take_output_stream()?);
+ (Some(decoder), decoded_stream as DecodedStream)
+ }
+ _ => (None, Box::new(pending::<DecodedStreamItem>()) as DecodedStream),
+ };
+
let (audio_consumer, audio_consumer_server) = fidl::endpoints::create_proxy()?;
audio_consumer_factory.create_audio_consumer(session_id, audio_consumer_server)?;
@@ -156,10 +174,15 @@
let mut audio_stream_type = AudioStreamType {
sample_format: AudioSampleFormat::Signed16,
channels: 2, // Stereo
- frames_per_second: sample_freq,
+ frames_per_second: codec_extra.sample_freq().unwrap_or(DEFAULT_SAMPLE_RATE),
};
- let mut compression = Compression { type_: codec.clone(), parameters: None };
+ let mut compression = match decoder {
+ None => {
+ Some(Compression { type_: codec_extra.stream_type().to_string(), parameters: None })
+ }
+ Some(_) => None,
+ };
let buffer = zx::Vmo::create(DEFAULT_BUFFER_LEN as u64)?;
let buffers = vec![buffer.duplicate_handle(zx::Rights::SAME_RIGHTS)?];
@@ -167,7 +190,7 @@
audio_consumer.create_stream_sink(
&mut buffers.into_iter(),
&mut audio_stream_type,
- Some(&mut compression),
+ compression.as_mut(),
stream_sink_server,
)?;
@@ -178,7 +201,7 @@
let mut player = Player {
buffer,
buffer_len: DEFAULT_BUFFER_LEN,
- codec,
+ codec_extra,
stream_sink,
audio_consumer,
watch_status_stream,
@@ -186,6 +209,8 @@
playing: false,
next_packet_flags: 0,
last_seq_played: 0,
+ decoder,
+ decoded_stream,
};
// wait for initial event
@@ -218,7 +243,7 @@
/// Accepts a payload which may contain multiple frames and breaks it into
/// frames and sends it to media.
- pub fn push_payload(&mut self, payload: &[u8]) -> Result<(), Error> {
+ pub async fn push_payload(&mut self, payload: &[u8]) -> Result<(), Error> {
let rtp = RtpHeader::new(payload)?;
let seq = rtp.sequence_number();
@@ -232,38 +257,55 @@
let mut offset = RtpHeader::LENGTH;
- if self.codec == AUDIO_ENCODING_SBC {
+ if let CodecExtra::Sbc(_) = self.codec_extra {
// TODO(40918) Handle SBC packet header
offset += 1;
}
while offset < payload.len() {
- if self.codec == AUDIO_ENCODING_SBC {
- let len = Player::find_sbc_frame_len(&payload[offset..]).or_else(|e| {
- self.next_packet_flags |= STREAM_PACKET_FLAG_DISCONTINUITY;
- Err(e)
- })?;
- if offset + len > payload.len() {
- self.next_packet_flags |= STREAM_PACKET_FLAG_DISCONTINUITY;
- return Err(format_err!("Ran out of buffer for SBC frame"));
+ match self.codec_extra {
+ CodecExtra::Sbc(_) => {
+ let len = Player::find_sbc_frame_len(&payload[offset..]).or_else(|e| {
+ self.next_packet_flags |= STREAM_PACKET_FLAG_DISCONTINUITY;
+ Err(e)
+ })?;
+ if offset + len > payload.len() {
+ self.next_packet_flags |= STREAM_PACKET_FLAG_DISCONTINUITY;
+ return Err(format_err!("Ran out of buffer for SBC frame"));
+ }
+
+ match &mut self.decoder {
+ Some(decoder) => {
+ decoder.write(&payload[offset..offset + len]).await?;
+ }
+ None => {
+ self.send_frame(&payload[offset..offset + len])?;
+ }
+ }
+
+ offset += len;
}
- self.send_frame(&payload[offset..offset + len])?;
- offset += len;
- } else if self.codec == AUDIO_ENCODING_AACLATM {
- self.send_frame(&payload[offset..])?;
- offset = payload.len();
- } else {
- return Err(format_err!("Unrecognized codec!"));
+ CodecExtra::Aac(_) => {
+ self.send_frame(&payload[offset..])?;
+ offset = payload.len();
+ }
+ _ => return Err(format_err!("Unrecognized codec!")),
}
}
+
+ if let Some(decoder) = &mut self.decoder {
+ decoder.flush()?;
+ }
+
Ok(())
}
/// Push an encoded media frame into the buffer and signal that it's there to media.
- pub fn send_frame(&mut self, frame: &[u8]) -> Result<(), Error> {
+ fn send_frame(&mut self, frame: &[u8]) -> Result<(), Error> {
let mut full_frame_len = frame.len();
- if self.codec == AUDIO_ENCODING_AACLATM {
+ // add room for LOAS header
+ if let CodecExtra::Aac(_) = self.codec_extra {
full_frame_len += 3;
}
@@ -277,7 +319,7 @@
let start_offset = self.current_offset;
- if self.codec == AUDIO_ENCODING_AACLATM {
+ if let CodecExtra::Aac(_) = self.codec_extra {
// Prepend LOAS sync word and mux length so mediaplayer decoder can handle it
let loas_syncword = [
0x56_u8,
@@ -325,12 +367,26 @@
/// If PlayerEvent::Closed is returned, that indicates the underlying
/// service went away and the player should be closed/rebuilt
- pub fn next_event(&mut self) -> impl Future<Output = PlayerEvent> + '_ {
- self.watch_status_stream.next().map(|event| match event {
- None => PlayerEvent::Closed,
- Some(Err(_)) => PlayerEvent::Closed,
- Some(Ok(s)) => PlayerEvent::Status(s),
- })
+ ///
+ /// This function should be always be polled when running
+ pub async fn next_event(&mut self) -> PlayerEvent {
+ loop {
+ select! {
+ event = self.watch_status_stream.next().fuse() => {
+ return match event {
+ None => PlayerEvent::Closed,
+ Some(Err(_)) => PlayerEvent::Closed,
+ Some(Ok(s)) => PlayerEvent::Status(s),
+ }
+ },
+ frame = self.decoded_stream.next().fuse() => {
+ match frame {
+ Some(Ok(frame)) => self.send_frame(&frame).unwrap_or_else(|e| fx_log_info!("failed to send frame")),
+ _ => fx_log_info!("error decoding"),
+ }
+ }
+ }
+ }
}
}
@@ -343,6 +399,8 @@
#[cfg(test)]
mod tests {
use super::*;
+ use crate::codec::CodecExtra;
+ use futures::future::{self, Either};
use matches::assert_matches;
use {
@@ -352,7 +410,7 @@
StreamSinkRequest, StreamSinkRequestStream,
},
fuchsia_async as fasync,
- std::task::Poll,
+ futures::{pin_mut, task::Poll},
};
#[test]
@@ -390,9 +448,8 @@
/// the VMO payload buffer that was provided to the AudioConsumer.
fn setup_player(
exec: &mut fasync::Executor,
- codec: &str,
+ codec_extra: CodecExtra,
) -> (Player, StreamSinkRequestStream, AudioConsumerRequestStream, zx::Vmo) {
- const TEST_SAMPLE_FREQ: u32 = 48000;
const TEST_SESSION_ID: u64 = 1;
let (audio_consumer_factory_proxy, mut audio_consumer_factory_request_stream) =
@@ -401,8 +458,7 @@
let mut player_new_fut = Box::pin(Player::from_proxy(
TEST_SESSION_ID,
- codec.to_string(),
- TEST_SAMPLE_FREQ,
+ codec_extra,
audio_consumer_factory_proxy,
));
@@ -484,7 +540,7 @@
fn test_player_setup() {
let mut exec = fasync::Executor::new().expect("executor should build");
- setup_player(&mut exec, "test");
+ setup_player(&mut exec, CodecExtra::Unknown);
}
#[test]
@@ -496,7 +552,8 @@
fn test_send_frame() {
let mut exec = fasync::Executor::new().expect("executor should build");
- let (mut player, mut sink_request_stream, _, sink_vmo) = setup_player(&mut exec, "test");
+ let (mut player, mut sink_request_stream, _, sink_vmo) =
+ setup_player(&mut exec, CodecExtra::Unknown);
let payload = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
@@ -530,17 +587,36 @@
player: &mut Player,
sink_request_stream: &mut StreamSinkRequestStream,
) -> u32 {
- player.push_payload(payload).expect("send happens okay");
+ {
+ let push_fut = player.push_payload(payload);
+ pin_mut!(push_fut);
+ exec.run_singlethreaded(&mut push_fut).expect("wrote payload");
+ }
- let complete = exec.run_until_stalled(&mut sink_request_stream.select_next_some());
- let sink_req = match complete {
- Poll::Ready(Ok(req)) => req,
- x => panic!("expected player req message but got {:?}", x),
- };
+ if let Some(_) = player.decoder {
+ // if decoder enabled, drive event stream future till packet is sent to sink.
+ let event_fut = player.next_event();
+ pin_mut!(event_fut);
- match sink_req {
- StreamSinkRequest::SendPacketNoReply { packet, .. } => packet.flags,
- _ => panic!("should have received a packet"),
+ let either = exec.run_singlethreaded(&mut future::select(
+ event_fut,
+ sink_request_stream.select_next_some(),
+ ));
+
+ match either {
+ Either::Right((Ok(StreamSinkRequest::SendPacketNoReply { packet, .. }), _)) => {
+ packet.flags
+ }
+ _ => panic!("should have received a packet"),
+ }
+ } else {
+ let sink_req = exec
+ .run_singlethreaded(&mut sink_request_stream.select_next_some())
+ .expect("sent packet");
+ match sink_req {
+ StreamSinkRequest::SendPacketNoReply { packet, .. } => packet.flags,
+ _ => panic!("should have received a packet"),
+ }
}
}
@@ -552,7 +628,7 @@
let mut exec = fasync::Executor::new().expect("executor should build");
let (mut player, mut sink_request_stream, mut player_request_stream, _) =
- setup_player(&mut exec, AUDIO_ENCODING_AACLATM);
+ setup_player(&mut exec, CodecExtra::Aac([0; 6]));
player.play().expect("player plays");
@@ -584,6 +660,37 @@
}
#[test]
+ /// Test that bytes flow through to decoder when SBC is active
+ fn test_sbc_decoder_write() {
+ let mut exec = fasync::Executor::new().expect("executor should build");
+
+ let (mut player, mut sink_request_stream, mut player_request_stream, _) =
+ setup_player(&mut exec, CodecExtra::Sbc([0x82, 0x00, 0x00, 0x00]));
+
+ player.play().expect("player plays");
+
+ let complete = exec.run_until_stalled(&mut player_request_stream.select_next_some());
+ let player_req = match complete {
+ Poll::Ready(Ok(req)) => req,
+ x => panic!("expected player req message but got {:?}", x),
+ };
+
+ assert_matches!(player_req, AudioConsumerRequest::Start { .. });
+
+ // raw rtp header with sequence number of 1 followed by 1 sbc frame
+ let raw = [
+ 128, 96, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 5, 0x9c, 0xb1, 0x20, 0x3b, 0x80, 0x00, 0x00,
+ 0x11, 0x7f, 0xfa, 0xab, 0xef, 0x7f, 0xfa, 0xab, 0xef, 0x80, 0x4a, 0xab, 0xaf, 0x80,
+ 0xf2, 0xab, 0xcf, 0x83, 0x8a, 0xac, 0x32, 0x8a, 0x78, 0x8a, 0x53, 0x90, 0xdc, 0xad,
+ 0x49, 0x96, 0xba, 0xaa, 0xe6, 0x9c, 0xa2, 0xab, 0xac, 0xa2, 0x72, 0xa9, 0x2d, 0xa8,
+ 0x9a, 0xab, 0x75, 0xae, 0x82, 0xad, 0x49, 0xb4, 0x6a, 0xad, 0xb1, 0xba, 0x52, 0xa9,
+ 0xa8, 0xc0, 0x32, 0xad, 0x11, 0xc6, 0x5a, 0xab, 0x3a,
+ ];
+
+ push_payload_get_flags(&raw, &mut exec, &mut player, &mut sink_request_stream);
+ }
+
+ #[test]
#[should_panic(expected = "out of bounds")]
fn test_as_u32_le_len() {
let _ = Player::as_u32_le(&[0, 1, 2]);