[bluetooth][a2dp-sink] Use platform decoder for SBC audio

Use StreamProcessor to decode SBC audio in process instead of requiring
AudioConsumer to support audio/sbc decoding.

Test: fx run-test bt-a2dp-sink-tests
Change-Id: I5dde8f432971981ecc4f210bf37d80d90402d6cc
diff --git a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/BUILD.gn b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/BUILD.gn
index 177626e..83f152a 100644
--- a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/BUILD.gn
+++ b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/BUILD.gn
@@ -35,6 +35,7 @@
     "//src/connectivity/bluetooth/lib/bt-a2dp",
     "//src/connectivity/bluetooth/lib/bt-avdtp",
     "//src/connectivity/bluetooth/lib/bt-avdtp",
+    "//src/connectivity/bluetooth/lib/fuchsia-audio-codec",
     "//src/connectivity/bluetooth/lib/fuchsia-bluetooth",
     "//src/lib/argh",
     "//src/lib/cobalt/rust:fuchsia-cobalt",
diff --git a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/meta/bt-a2dp-sink-unittests.cmx b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/meta/bt-a2dp-sink-unittests.cmx
index 4a21df5..9b9facc 100644
--- a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/meta/bt-a2dp-sink-unittests.cmx
+++ b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/meta/bt-a2dp-sink-unittests.cmx
@@ -1,5 +1,20 @@
 {
+    "facets": {
+        "fuchsia.test": {
+            "injected-services": {
+                "fuchsia.mediacodec.CodecFactory": "fuchsia-pkg://fuchsia.com/codec_factory#meta/codec_factory.cmx",
+                "fuchsia.sysmem.Allocator": "fuchsia-pkg://fuchsia.com/sysmem_connector#meta/sysmem_connector.cmx"
+            }
+        }
+    },
     "program": {
         "binary": "test/bt-a2dp-sink-unittests"
+    },
+    "sandbox": {
+        "services": [
+            "fuchsia.mediacodec.CodecFactory",
+            "fuchsia.sysmem.Allocator",
+            "fuchsia.logger.LogSink"
+        ]
     }
 }
diff --git a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/meta/bt-a2dp-sink.cmx b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/meta/bt-a2dp-sink.cmx
index 8502631..f2bd3013 100644
--- a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/meta/bt-a2dp-sink.cmx
+++ b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/meta/bt-a2dp-sink.cmx
@@ -9,6 +9,8 @@
             "fuchsia.bluetooth.bredr.Profile",
             "fuchsia.cobalt.LoggerFactory",
             "fuchsia.logger.LogSink",
+            "fuchsia.mediacodec.CodecFactory",
+            "fuchsia.sysmem.Allocator",
             "fuchsia.media.SessionAudioConsumerFactory",
             "fuchsia.media.sessions2.Publisher"
         ]
diff --git a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/codec.rs b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/codec.rs
new file mode 100644
index 0000000..6b80f8d
--- /dev/null
+++ b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/codec.rs
@@ -0,0 +1,122 @@
+use bt_a2dp::media_types::*;
+use bt_avdtp as avdtp;
+use fidl_fuchsia_media::{AUDIO_ENCODING_AACLATM, AUDIO_ENCODING_SBC};
+use fuchsia_syslog::fx_log_warn;
+use std::convert::{TryFrom, TryInto};
+
+const SBC_CODEC_EXTRA_LEN: usize = 4;
+const AAC_CODEC_EXTRA_LEN: usize = 6;
+
+#[derive(Clone, Debug)]
+pub enum CodecExtra {
+    Sbc([u8; SBC_CODEC_EXTRA_LEN]),
+    Aac([u8; AAC_CODEC_EXTRA_LEN]),
+    Unknown,
+}
+
+pub enum CodecExtraError {
+    UnknownCodec,
+    InvalidLength,
+}
+
+impl CodecExtra {
+    /// Extract sampling freqency from SBC codec extra data field
+    /// (A2DP Sec. 4.3.2)
+    fn parse_sbc_sampling_frequency(codec_extra: &[u8]) -> Option<u32> {
+        if codec_extra.len() != SBC_CODEC_EXTRA_LEN {
+            fx_log_warn!("Invalid SBC codec extra length: {:?}", codec_extra.len());
+            return None;
+        }
+
+        let mut codec_info_bytes = [0_u8; SBC_CODEC_EXTRA_LEN];
+        codec_info_bytes.copy_from_slice(&codec_extra);
+
+        let codec_info = SbcCodecInfo(u32::from_be_bytes(codec_info_bytes));
+        let sample_freq = SbcSamplingFrequency::from_bits_truncate(codec_info.sampling_frequency());
+
+        match sample_freq {
+            SbcSamplingFrequency::FREQ48000HZ => Some(48000),
+            SbcSamplingFrequency::FREQ44100HZ => Some(44100),
+            _ => {
+                fx_log_warn!("Invalid sample_freq set in configuration {:?}", sample_freq);
+                None
+            }
+        }
+    }
+
+    /// Extract sampling frequency from AAC codec extra data field
+    /// (A2DP Sec. 4.5.2)
+    fn parse_aac_sampling_frequency(codec_extra: &[u8]) -> Option<u32> {
+        if codec_extra.len() != AAC_CODEC_EXTRA_LEN {
+            fx_log_warn!("Invalid AAC codec extra length: {:?}", codec_extra.len());
+            return None;
+        }
+
+        // AACMediaCodecInfo is represented as 8 bytes, with lower 6 bytes containing
+        // the codec extra data.
+        let mut codec_info_bytes = [0_u8; 8];
+        let codec_info_slice = &mut codec_info_bytes[2..8];
+
+        codec_info_slice.copy_from_slice(&codec_extra);
+
+        let codec_info = AACMediaCodecInfo(u64::from_be_bytes(codec_info_bytes));
+        let sample_freq = AACSamplingFrequency::from_bits_truncate(codec_info.sampling_frequency());
+
+        match sample_freq {
+            AACSamplingFrequency::FREQ48000HZ => Some(48000),
+            AACSamplingFrequency::FREQ44100HZ => Some(44100),
+            _ => {
+                fx_log_warn!("Invalid sample_freq set in configuration {:?}", sample_freq);
+                None
+            }
+        }
+    }
+
+    /// Parse the sampling frequency for this codec extra or return None
+    /// if none is configured.
+    pub fn sample_freq(&self) -> Option<u32> {
+        match self {
+            CodecExtra::Sbc(codec_extra) => Self::parse_sbc_sampling_frequency(codec_extra),
+            CodecExtra::Aac(codec_extra) => Self::parse_aac_sampling_frequency(codec_extra),
+            _ => None,
+        }
+    }
+
+    pub fn stream_type(&self) -> &str {
+        match self {
+            Self::Sbc(_) => AUDIO_ENCODING_SBC,
+            Self::Aac(_) => AUDIO_ENCODING_AACLATM,
+            Self::Unknown => "Unknown",
+        }
+    }
+}
+
+impl TryFrom<&avdtp::ServiceCapability> for CodecExtra {
+    type Error = CodecExtraError;
+
+    fn try_from(value: &avdtp::ServiceCapability) -> Result<Self, Self::Error> {
+        match value {
+            avdtp::ServiceCapability::MediaCodec {
+                media_type: avdtp::MediaType::Audio,
+                codec_type: avdtp::MediaCodecType::AUDIO_SBC,
+                codec_extra,
+            } => {
+                if codec_extra.len() != SBC_CODEC_EXTRA_LEN {
+                    return Err(CodecExtraError::InvalidLength);
+                }
+                Ok(CodecExtra::Sbc(codec_extra[0..SBC_CODEC_EXTRA_LEN].try_into().unwrap()))
+            }
+            avdtp::ServiceCapability::MediaCodec {
+                media_type: avdtp::MediaType::Audio,
+                codec_type: avdtp::MediaCodecType::AUDIO_AAC,
+                codec_extra,
+            } => {
+                if codec_extra.len() != AAC_CODEC_EXTRA_LEN {
+                    return Err(CodecExtraError::InvalidLength);
+                }
+                Ok(CodecExtra::Aac(codec_extra[0..AAC_CODEC_EXTRA_LEN].try_into().unwrap()))
+            }
+            _ => Err(CodecExtraError::UnknownCodec),
+        }
+    }
+}
diff --git a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/main.rs b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/main.rs
index 9b9a7b8..2714eaa 100644
--- a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/main.rs
+++ b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/main.rs
@@ -25,12 +25,14 @@
         select, FutureExt, StreamExt,
     },
     parking_lot::Mutex,
-    std::{collections::hash_map, collections::HashMap, sync::Arc},
+    std::{collections::hash_map, collections::HashMap, convert::TryFrom, sync::Arc},
 };
 
+use crate::codec::CodecExtra;
 use crate::inspect_types::StreamingInspectData;
 
 mod avrcp_relay;
+mod codec;
 mod connected_peers;
 mod hanging_get;
 mod inspect_types;
@@ -86,7 +88,7 @@
 // Arbitrarily chosen ID for the AAC stream endpoint.
 const AAC_SEID: u8 = 7;
 
-const DEFAULT_SAMPLE_RATE: u32 = 48000;
+pub const DEFAULT_SAMPLE_RATE: u32 = 48000;
 const DEFAULT_SESSION_ID: u64 = 0;
 
 /// Controls a stream endpoint and the media decoding task which is associated with it.
@@ -108,85 +110,13 @@
         Stream { endpoint, encoding, suspend_sender: None }
     }
 
-    /// Extract sampling freqency from SBC codec extra data field
-    /// (A2DP Sec. 4.3.2)
-    fn parse_sbc_sampling_frequency(codec_extra: &[u8]) -> u32 {
-        if codec_extra.len() != 4 {
-            fx_log_warn!("Invalid SBC codec extra length: {:?}", codec_extra.len());
-            return DEFAULT_SAMPLE_RATE;
-        }
-
-        let mut codec_info_bytes = [0_u8; 4];
-        codec_info_bytes.copy_from_slice(&codec_extra);
-
-        let codec_info = SbcCodecInfo(u32::from_be_bytes(codec_info_bytes));
-        let sample_freq = SbcSamplingFrequency::from_bits_truncate(codec_info.sampling_frequency());
-
-        match sample_freq {
-            SbcSamplingFrequency::FREQ48000HZ => 48000,
-            SbcSamplingFrequency::FREQ44100HZ => 44100,
-            _ => {
-                fx_log_warn!("Invalid sample_freq set in configuration {:?}", sample_freq);
-                DEFAULT_SAMPLE_RATE
-            }
-        }
-    }
-
-    /// Extract sampling frequency from AAC codec extra data field
-    /// (A2DP Sec. 4.5.2)
-    fn parse_aac_sampling_frequency(codec_extra: &[u8]) -> u32 {
-        if codec_extra.len() != 6 {
-            fx_log_warn!("Invalid AAC codec extra length: {:?}", codec_extra.len());
-            return DEFAULT_SAMPLE_RATE;
-        }
-
-        // AACMediaCodecInfo is represented as 8 bytes, with lower 6 bytes containing
-        // the codec extra data.
-        let mut codec_info_bytes = [0_u8; 8];
-        let codec_info_slice = &mut codec_info_bytes[2..8];
-
-        codec_info_slice.copy_from_slice(&codec_extra);
-
-        let codec_info = AACMediaCodecInfo(u64::from_be_bytes(codec_info_bytes));
-        let sample_freq = AACSamplingFrequency::from_bits_truncate(codec_info.sampling_frequency());
-
-        match sample_freq {
-            AACSamplingFrequency::FREQ48000HZ => 48000,
-            AACSamplingFrequency::FREQ44100HZ => 44100,
-            _ => {
-                fx_log_warn!("Invalid sample_freq set in configuration {:?}", sample_freq);
-                DEFAULT_SAMPLE_RATE
-            }
-        }
-    }
-
-    /// Get the currently configured sampling frequency for this stream or return a default value
-    /// if none is configured.
-    ///
-    /// TODO: This should be removed once we have a structured way of accessing stream
-    /// capabilities.
-    fn sample_freq(&self) -> u32 {
+    /// Get the currently configured extra codec data
+    fn configured_codec_extra(&self) -> Result<CodecExtra, avdtp::Error> {
         self.endpoint
-            .get_configuration()
-            .map(|caps| {
-                for c in caps {
-                    match c {
-                        avdtp::ServiceCapability::MediaCodec {
-                            media_type: avdtp::MediaType::Audio,
-                            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
-                            codec_extra,
-                        } => return Self::parse_sbc_sampling_frequency(&codec_extra),
-                        avdtp::ServiceCapability::MediaCodec {
-                            media_type: avdtp::MediaType::Audio,
-                            codec_type: avdtp::MediaCodecType::AUDIO_AAC,
-                            codec_extra,
-                        } => return Self::parse_aac_sampling_frequency(&codec_extra),
-                        _ => continue,
-                    };
-                }
-                DEFAULT_SAMPLE_RATE
-            })
-            .unwrap_or(DEFAULT_SAMPLE_RATE)
+            .get_configuration()?
+            .iter()
+            .find_map(|cap| CodecExtra::try_from(cap).ok())
+            .ok_or(avdtp::Error::InvalidState)
     }
 
     /// Attempt to start the media decoding task.
@@ -203,10 +133,11 @@
         let (send, receive) = mpsc::channel(1);
         self.suspend_sender = Some(send);
 
+        let codec_extra = self.configured_codec_extra()?;
+
         fuchsia_async::spawn_local(decode_media_stream(
             self.endpoint.take_transport()?,
-            self.encoding.clone(),
-            self.sample_freq(),
+            codec_extra,
             // TODO(42976) get real media session id
             DEFAULT_SESSION_ID,
             receive,
@@ -260,13 +191,7 @@
         let mut s = Streams::new();
         // TODO(BT-533): detect codecs, add streams for each codec
         // SBC is required
-        if let Err(e) = player::Player::new(
-            DEFAULT_SESSION_ID,
-            AUDIO_ENCODING_SBC.to_string(),
-            DEFAULT_SAMPLE_RATE,
-        )
-        .await
-        {
+        if let Err(e) = player::Player::new(DEFAULT_SESSION_ID, CodecExtra::Sbc([0; 4])).await {
             fx_log_warn!("Can't play required SBC audio: {}", e);
             return Err(e);
         }
@@ -377,14 +302,13 @@
 /// Ends when signaled from `end_signal`, or when the media transport stream is closed.
 async fn decode_media_stream(
     mut stream: avdtp::MediaStream,
-    encoding: String,
-    sample_rate: u32,
+    codec_extra: CodecExtra,
     session_id: u64,
     mut end_signal: Receiver<()>,
     mut inspect: StreamingInspectData,
     cobalt_sender: CobaltSender,
 ) -> () {
-    let mut player = match player::Player::new(session_id, encoding.clone(), sample_rate).await {
+    let mut player = match player::Player::new(session_id, codec_extra.clone()).await {
         Ok(v) => v,
         Err(e) => {
             fx_log_info!("Can't setup player for Media: {:?}", e);
@@ -408,9 +332,11 @@
                     }
                     Some(Ok(packet)) => packet,
                 };
-                if let Err(e) = player.push_payload(&pkt.as_slice())  {
+
+                if let Err(e) = player.push_payload(&pkt.as_slice()).await {
                     fx_log_info!("can't push packet: {:?}", e);
                 }
+
                 if !player.playing() {
                     player.play().unwrap_or_else(|e| fx_log_info!("Problem playing: {:}", e));
                 }
@@ -421,7 +347,7 @@
                     player::PlayerEvent::Closed => {
                         fx_log_info!("Rebuilding Player");
                         // The player died somehow? Attempt to rebuild the player.
-                        player = match player::Player::new(session_id, encoding.clone(), sample_rate).await {
+                        player = match player::Player::new(session_id, codec_extra.clone()).await {
                             Ok(v) => v,
                             Err(e) => {
                                 fx_log_info!("Can't rebuild player: {:?}", e);
@@ -431,7 +357,7 @@
                     },
                     player::PlayerEvent::Status(s) => {
                         fx_log_info!("PlayerEvent Status happened: {:?}", s);
-                    }
+                    },
                 }
             },
             _ = inspect.update_interval.next() => {
@@ -445,13 +371,17 @@
     }
     let end_time = zx::Time::get(zx::ClockId::Monotonic);
 
-    report_stream_metrics(cobalt_sender, &encoding, (end_time - start_time).into_seconds());
+    report_stream_metrics(cobalt_sender, &codec_extra, (end_time - start_time).into_seconds());
 }
 
-fn report_stream_metrics(mut cobalt_sender: CobaltSender, encoding: &str, duration_seconds: i64) {
-    let codec = match encoding.as_ref() {
-        AUDIO_ENCODING_SBC => metrics::A2dpStreamDurationInSecondsMetricDimensionCodec::Sbc,
-        AUDIO_ENCODING_AACLATM => metrics::A2dpStreamDurationInSecondsMetricDimensionCodec::Aac,
+fn report_stream_metrics(
+    mut cobalt_sender: CobaltSender,
+    codec_extra: &CodecExtra,
+    duration_seconds: i64,
+) {
+    let codec = match codec_extra {
+        CodecExtra::Sbc(_) => metrics::A2dpStreamDurationInSecondsMetricDimensionCodec::Sbc,
+        CodecExtra::Aac(_) => metrics::A2dpStreamDurationInSecondsMetricDimensionCodec::Aac,
         _ => metrics::A2dpStreamDurationInSecondsMetricDimensionCodec::Unknown,
     };
 
@@ -581,6 +511,7 @@
     fn test_streams() {
         let mut streams = Streams::new();
         const LOCAL_ID: u8 = 1;
+        const TEST_SAMPLE_FREQ: u32 = 44100;
 
         // An endpoint for testing
         let s = avdtp::StreamEndpoint::new(
@@ -601,7 +532,6 @@
         let id = s.local_id().clone();
         let information = s.information();
         let encoding = AUDIO_ENCODING_SBC.to_string();
-        let sample_freq = 44100;
 
         assert_matches!(streams.get_endpoint(&id), None);
 
@@ -632,12 +562,13 @@
             )
             .expect("Failed to configure endpoint");
 
-        assert_eq!(sample_freq, streams.get_mut(&id).unwrap().sample_freq());
+        let stream = streams.get_mut(&id).expect("stream");
+        let codec_extra = stream.configured_codec_extra().expect("codec extra");
+        assert_matches!(codec_extra, CodecExtra::Sbc([41, 245, 2, 53]));
+        assert_matches!(codec_extra.sample_freq(), Some(TEST_SAMPLE_FREQ));
 
-        let res = streams.get_mut(&id);
-
-        assert_matches!(res.as_ref().unwrap().suspend_sender, None);
-        assert_eq!(encoding, res.as_ref().unwrap().encoding);
+        assert_matches!(stream.suspend_sender, None);
+        assert_eq!(encoding, stream.encoding);
     }
 
     #[test]
@@ -645,6 +576,7 @@
     fn test_aac_stream() {
         let mut streams = Streams::new();
         const LOCAL_ID: u8 = 1;
+        const TEST_SAMPLE_FREQ: u32 = 44100;
 
         // An endpoint for testing
         let s = avdtp::StreamEndpoint::new(
@@ -665,7 +597,6 @@
         let id = s.local_id().clone();
         let information = s.information();
         let encoding = AUDIO_ENCODING_AACLATM.to_string();
-        let sample_freq = 44100;
 
         assert_matches!(streams.get_endpoint(&id), None);
 
@@ -696,12 +627,13 @@
             )
             .expect("Failed to configure endpoint");
 
-        assert_eq!(sample_freq, streams.get_mut(&id).unwrap().sample_freq());
+        let stream = streams.get_mut(&id).expect("stream");
+        let codec_extra = stream.configured_codec_extra().expect("codec extra");
+        assert_matches!(codec_extra, CodecExtra::Aac([128, 1, 4, 4, 226, 0]));
+        assert_matches!(codec_extra.sample_freq(), Some(TEST_SAMPLE_FREQ));
 
-        let res = streams.get_mut(&id);
-
-        assert_matches!(res.as_ref().unwrap().suspend_sender, None);
-        assert_eq!(encoding, res.as_ref().unwrap().encoding);
+        assert_matches!(stream.suspend_sender, None);
+        assert_eq!(encoding, stream.encoding);
     }
 
     #[test]
@@ -726,7 +658,7 @@
         let (send, mut recv) = fake_cobalt_sender();
         const TEST_DURATION: i64 = 1;
 
-        report_stream_metrics(send, AUDIO_ENCODING_AACLATM, TEST_DURATION);
+        report_stream_metrics(send, &CodecExtra::Aac([0; 6]), TEST_DURATION);
 
         let event = recv.try_next().expect("no stream error").expect("event present");
 
diff --git a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/player.rs b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/player.rs
index a609f8b..8cbca07 100644
--- a/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/player.rs
+++ b/src/connectivity/bluetooth/profiles/bt-a2dp-sink/src/player.rs
@@ -9,23 +9,30 @@
     fidl_fuchsia_media::{
         AudioConsumerProxy, AudioConsumerStartFlags, AudioConsumerStatus, AudioSampleFormat,
         AudioStreamType, Compression, SessionAudioConsumerFactoryMarker,
-        SessionAudioConsumerFactoryProxy, StreamPacket, StreamSinkProxy, AUDIO_ENCODING_AACLATM,
-        AUDIO_ENCODING_SBC, NO_TIMESTAMP, STREAM_PACKET_FLAG_DISCONTINUITY,
+        SessionAudioConsumerFactoryProxy, StreamPacket, StreamSinkProxy, NO_TIMESTAMP,
+        STREAM_PACKET_FLAG_DISCONTINUITY,
     },
+    fuchsia_audio_codec::StreamProcessor,
+    fuchsia_syslog::fx_log_info,
     fuchsia_zircon::{self as zx, HandleBased},
-    futures::{Future, FutureExt, StreamExt},
+    futures::{io::AsyncWriteExt, select, stream::pending, FutureExt, Stream, StreamExt},
 };
 
+use crate::codec::CodecExtra;
 use crate::hanging_get::HangingGetStream;
+use crate::DEFAULT_SAMPLE_RATE;
 
 const DEFAULT_BUFFER_LEN: usize = 65536;
 
+pub type DecodedStreamItem = Result<Vec<u8>, Error>;
+pub type DecodedStream = Box<dyn Stream<Item = DecodedStreamItem> + Unpin>;
+
 /// Players are configured and accept media frames, which are sent to the
 /// media subsystem.
 pub struct Player {
     buffer: zx::Vmo,
     buffer_len: usize,
-    codec: String,
+    codec_extra: CodecExtra,
     current_offset: usize,
     stream_sink: StreamSinkProxy,
     audio_consumer: AudioConsumerProxy,
@@ -33,6 +40,8 @@
     playing: bool,
     next_packet_flags: u32,
     last_seq_played: u16,
+    decoder: Option<StreamProcessor>,
+    decoded_stream: DecodedStream,
 }
 
 pub enum PlayerEvent {
@@ -132,21 +141,30 @@
 impl Player {
     /// Attempt to make a new player that decodes and plays frames encoded in the
     /// `codec`
-    pub async fn new(session_id: u64, codec: String, sample_freq: u32) -> Result<Player, Error> {
+    pub async fn new(session_id: u64, codec_extra: CodecExtra) -> Result<Player, Error> {
         let audio_consumer_factory =
             fuchsia_component::client::connect_to_service::<SessionAudioConsumerFactoryMarker>()
                 .context("Failed to connect to audio consumer factory")?;
-        Self::from_proxy(session_id, codec, sample_freq, audio_consumer_factory).await
+        Self::from_proxy(session_id, codec_extra, audio_consumer_factory).await
     }
 
     /// Build a AudioConsumer given a SessionAudioConsumerFactoryProxy.
     /// Used in tests.
     async fn from_proxy(
         session_id: u64,
-        codec: String,
-        sample_freq: u32,
+        codec_extra: CodecExtra,
         audio_consumer_factory: SessionAudioConsumerFactoryProxy,
     ) -> Result<Player, Error> {
+        let (decoder, decoded_stream) = match &codec_extra {
+            CodecExtra::Sbc(codec_extra_data) => {
+                let mut decoder =
+                    StreamProcessor::create_decoder("audio/sbc", Some(codec_extra_data.to_vec()))?;
+                let decoded_stream = Box::new(decoder.take_output_stream()?);
+                (Some(decoder), decoded_stream as DecodedStream)
+            }
+            _ => (None, Box::new(pending::<DecodedStreamItem>()) as DecodedStream),
+        };
+
         let (audio_consumer, audio_consumer_server) = fidl::endpoints::create_proxy()?;
 
         audio_consumer_factory.create_audio_consumer(session_id, audio_consumer_server)?;
@@ -156,10 +174,15 @@
         let mut audio_stream_type = AudioStreamType {
             sample_format: AudioSampleFormat::Signed16,
             channels: 2, // Stereo
-            frames_per_second: sample_freq,
+            frames_per_second: codec_extra.sample_freq().unwrap_or(DEFAULT_SAMPLE_RATE),
         };
 
-        let mut compression = Compression { type_: codec.clone(), parameters: None };
+        let mut compression = match decoder {
+            None => {
+                Some(Compression { type_: codec_extra.stream_type().to_string(), parameters: None })
+            }
+            Some(_) => None,
+        };
 
         let buffer = zx::Vmo::create(DEFAULT_BUFFER_LEN as u64)?;
         let buffers = vec![buffer.duplicate_handle(zx::Rights::SAME_RIGHTS)?];
@@ -167,7 +190,7 @@
         audio_consumer.create_stream_sink(
             &mut buffers.into_iter(),
             &mut audio_stream_type,
-            Some(&mut compression),
+            compression.as_mut(),
             stream_sink_server,
         )?;
 
@@ -178,7 +201,7 @@
         let mut player = Player {
             buffer,
             buffer_len: DEFAULT_BUFFER_LEN,
-            codec,
+            codec_extra,
             stream_sink,
             audio_consumer,
             watch_status_stream,
@@ -186,6 +209,8 @@
             playing: false,
             next_packet_flags: 0,
             last_seq_played: 0,
+            decoder,
+            decoded_stream,
         };
 
         // wait for initial event
@@ -218,7 +243,7 @@
 
     /// Accepts a payload which may contain multiple frames and breaks it into
     /// frames and sends it to media.
-    pub fn push_payload(&mut self, payload: &[u8]) -> Result<(), Error> {
+    pub async fn push_payload(&mut self, payload: &[u8]) -> Result<(), Error> {
         let rtp = RtpHeader::new(payload)?;
 
         let seq = rtp.sequence_number();
@@ -232,38 +257,55 @@
 
         let mut offset = RtpHeader::LENGTH;
 
-        if self.codec == AUDIO_ENCODING_SBC {
+        if let CodecExtra::Sbc(_) = self.codec_extra {
             // TODO(40918) Handle SBC packet header
             offset += 1;
         }
 
         while offset < payload.len() {
-            if self.codec == AUDIO_ENCODING_SBC {
-                let len = Player::find_sbc_frame_len(&payload[offset..]).or_else(|e| {
-                    self.next_packet_flags |= STREAM_PACKET_FLAG_DISCONTINUITY;
-                    Err(e)
-                })?;
-                if offset + len > payload.len() {
-                    self.next_packet_flags |= STREAM_PACKET_FLAG_DISCONTINUITY;
-                    return Err(format_err!("Ran out of buffer for SBC frame"));
+            match self.codec_extra {
+                CodecExtra::Sbc(_) => {
+                    let len = Player::find_sbc_frame_len(&payload[offset..]).or_else(|e| {
+                        self.next_packet_flags |= STREAM_PACKET_FLAG_DISCONTINUITY;
+                        Err(e)
+                    })?;
+                    if offset + len > payload.len() {
+                        self.next_packet_flags |= STREAM_PACKET_FLAG_DISCONTINUITY;
+                        return Err(format_err!("Ran out of buffer for SBC frame"));
+                    }
+
+                    match &mut self.decoder {
+                        Some(decoder) => {
+                            decoder.write(&payload[offset..offset + len]).await?;
+                        }
+                        None => {
+                            self.send_frame(&payload[offset..offset + len])?;
+                        }
+                    }
+
+                    offset += len;
                 }
-                self.send_frame(&payload[offset..offset + len])?;
-                offset += len;
-            } else if self.codec == AUDIO_ENCODING_AACLATM {
-                self.send_frame(&payload[offset..])?;
-                offset = payload.len();
-            } else {
-                return Err(format_err!("Unrecognized codec!"));
+                CodecExtra::Aac(_) => {
+                    self.send_frame(&payload[offset..])?;
+                    offset = payload.len();
+                }
+                _ => return Err(format_err!("Unrecognized codec!")),
             }
         }
+
+        if let Some(decoder) = &mut self.decoder {
+            decoder.flush()?;
+        }
+
         Ok(())
     }
 
     /// Push an encoded media frame into the buffer and signal that it's there to media.
-    pub fn send_frame(&mut self, frame: &[u8]) -> Result<(), Error> {
+    fn send_frame(&mut self, frame: &[u8]) -> Result<(), Error> {
         let mut full_frame_len = frame.len();
 
-        if self.codec == AUDIO_ENCODING_AACLATM {
+        // add room for LOAS header
+        if let CodecExtra::Aac(_) = self.codec_extra {
             full_frame_len += 3;
         }
 
@@ -277,7 +319,7 @@
 
         let start_offset = self.current_offset;
 
-        if self.codec == AUDIO_ENCODING_AACLATM {
+        if let CodecExtra::Aac(_) = self.codec_extra {
             // Prepend LOAS sync word and mux length so mediaplayer decoder can handle it
             let loas_syncword = [
                 0x56_u8,
@@ -325,12 +367,26 @@
 
     /// If PlayerEvent::Closed is returned, that indicates the underlying
     /// service went away and the player should be closed/rebuilt
-    pub fn next_event(&mut self) -> impl Future<Output = PlayerEvent> + '_ {
-        self.watch_status_stream.next().map(|event| match event {
-            None => PlayerEvent::Closed,
-            Some(Err(_)) => PlayerEvent::Closed,
-            Some(Ok(s)) => PlayerEvent::Status(s),
-        })
+    ///
+    /// This function should be always be polled when running
+    pub async fn next_event(&mut self) -> PlayerEvent {
+        loop {
+            select! {
+                event = self.watch_status_stream.next().fuse() => {
+                    return match event {
+                        None => PlayerEvent::Closed,
+                        Some(Err(_)) => PlayerEvent::Closed,
+                        Some(Ok(s)) => PlayerEvent::Status(s),
+                    }
+                },
+                frame = self.decoded_stream.next().fuse() => {
+                    match frame {
+                        Some(Ok(frame)) => self.send_frame(&frame).unwrap_or_else(|e| fx_log_info!("failed to send frame")),
+                        _ => fx_log_info!("error decoding"),
+                    }
+                }
+            }
+        }
     }
 }
 
@@ -343,6 +399,8 @@
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::codec::CodecExtra;
+    use futures::future::{self, Either};
     use matches::assert_matches;
 
     use {
@@ -352,7 +410,7 @@
             StreamSinkRequest, StreamSinkRequestStream,
         },
         fuchsia_async as fasync,
-        std::task::Poll,
+        futures::{pin_mut, task::Poll},
     };
 
     #[test]
@@ -390,9 +448,8 @@
     /// the VMO payload buffer that was provided to the AudioConsumer.
     fn setup_player(
         exec: &mut fasync::Executor,
-        codec: &str,
+        codec_extra: CodecExtra,
     ) -> (Player, StreamSinkRequestStream, AudioConsumerRequestStream, zx::Vmo) {
-        const TEST_SAMPLE_FREQ: u32 = 48000;
         const TEST_SESSION_ID: u64 = 1;
 
         let (audio_consumer_factory_proxy, mut audio_consumer_factory_request_stream) =
@@ -401,8 +458,7 @@
 
         let mut player_new_fut = Box::pin(Player::from_proxy(
             TEST_SESSION_ID,
-            codec.to_string(),
-            TEST_SAMPLE_FREQ,
+            codec_extra,
             audio_consumer_factory_proxy,
         ));
 
@@ -484,7 +540,7 @@
     fn test_player_setup() {
         let mut exec = fasync::Executor::new().expect("executor should build");
 
-        setup_player(&mut exec, "test");
+        setup_player(&mut exec, CodecExtra::Unknown);
     }
 
     #[test]
@@ -496,7 +552,8 @@
     fn test_send_frame() {
         let mut exec = fasync::Executor::new().expect("executor should build");
 
-        let (mut player, mut sink_request_stream, _, sink_vmo) = setup_player(&mut exec, "test");
+        let (mut player, mut sink_request_stream, _, sink_vmo) =
+            setup_player(&mut exec, CodecExtra::Unknown);
 
         let payload = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
 
@@ -530,17 +587,36 @@
         player: &mut Player,
         sink_request_stream: &mut StreamSinkRequestStream,
     ) -> u32 {
-        player.push_payload(payload).expect("send happens okay");
+        {
+            let push_fut = player.push_payload(payload);
+            pin_mut!(push_fut);
+            exec.run_singlethreaded(&mut push_fut).expect("wrote payload");
+        }
 
-        let complete = exec.run_until_stalled(&mut sink_request_stream.select_next_some());
-        let sink_req = match complete {
-            Poll::Ready(Ok(req)) => req,
-            x => panic!("expected player req message but got {:?}", x),
-        };
+        if let Some(_) = player.decoder {
+            // if decoder enabled, drive event stream future till packet is sent to sink.
+            let event_fut = player.next_event();
+            pin_mut!(event_fut);
 
-        match sink_req {
-            StreamSinkRequest::SendPacketNoReply { packet, .. } => packet.flags,
-            _ => panic!("should have received a packet"),
+            let either = exec.run_singlethreaded(&mut future::select(
+                event_fut,
+                sink_request_stream.select_next_some(),
+            ));
+
+            match either {
+                Either::Right((Ok(StreamSinkRequest::SendPacketNoReply { packet, .. }), _)) => {
+                    packet.flags
+                }
+                _ => panic!("should have received a packet"),
+            }
+        } else {
+            let sink_req = exec
+                .run_singlethreaded(&mut sink_request_stream.select_next_some())
+                .expect("sent packet");
+            match sink_req {
+                StreamSinkRequest::SendPacketNoReply { packet, .. } => packet.flags,
+                _ => panic!("should have received a packet"),
+            }
         }
     }
 
@@ -552,7 +628,7 @@
         let mut exec = fasync::Executor::new().expect("executor should build");
 
         let (mut player, mut sink_request_stream, mut player_request_stream, _) =
-            setup_player(&mut exec, AUDIO_ENCODING_AACLATM);
+            setup_player(&mut exec, CodecExtra::Aac([0; 6]));
 
         player.play().expect("player plays");
 
@@ -584,6 +660,37 @@
     }
 
     #[test]
+    /// Test that bytes flow through to decoder when SBC is active
+    fn test_sbc_decoder_write() {
+        let mut exec = fasync::Executor::new().expect("executor should build");
+
+        let (mut player, mut sink_request_stream, mut player_request_stream, _) =
+            setup_player(&mut exec, CodecExtra::Sbc([0x82, 0x00, 0x00, 0x00]));
+
+        player.play().expect("player plays");
+
+        let complete = exec.run_until_stalled(&mut player_request_stream.select_next_some());
+        let player_req = match complete {
+            Poll::Ready(Ok(req)) => req,
+            x => panic!("expected player req message but got {:?}", x),
+        };
+
+        assert_matches!(player_req, AudioConsumerRequest::Start { .. });
+
+        // raw rtp header with sequence number of 1 followed by 1 sbc frame
+        let raw = [
+            128, 96, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 5, 0x9c, 0xb1, 0x20, 0x3b, 0x80, 0x00, 0x00,
+            0x11, 0x7f, 0xfa, 0xab, 0xef, 0x7f, 0xfa, 0xab, 0xef, 0x80, 0x4a, 0xab, 0xaf, 0x80,
+            0xf2, 0xab, 0xcf, 0x83, 0x8a, 0xac, 0x32, 0x8a, 0x78, 0x8a, 0x53, 0x90, 0xdc, 0xad,
+            0x49, 0x96, 0xba, 0xaa, 0xe6, 0x9c, 0xa2, 0xab, 0xac, 0xa2, 0x72, 0xa9, 0x2d, 0xa8,
+            0x9a, 0xab, 0x75, 0xae, 0x82, 0xad, 0x49, 0xb4, 0x6a, 0xad, 0xb1, 0xba, 0x52, 0xa9,
+            0xa8, 0xc0, 0x32, 0xad, 0x11, 0xc6, 0x5a, 0xab, 0x3a,
+        ];
+
+        push_payload_get_flags(&raw, &mut exec, &mut player, &mut sink_request_stream);
+    }
+
+    #[test]
     #[should_panic(expected = "out of bounds")]
     fn test_as_u32_le_len() {
         let _ = Player::as_u32_le(&[0, 1, 2]);