blob: 9fee43ed6e97750998034a8997c6439ea5cc1b25 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::Error,
bt_a2dp::{codec::MediaCodecConfig, media_task::*},
bt_a2dp_metrics as metrics,
bt_avdtp::{self as avdtp, MediaStream},
fidl::endpoints::create_request_stream,
fidl_fuchsia_media_sessions2 as sessions2, fuchsia_async as fasync,
fuchsia_bluetooth::{inspect::DataStreamInspect, types::PeerId},
fuchsia_cobalt::CobaltSender,
fuchsia_trace as trace,
futures::{
channel::oneshot,
future::{BoxFuture, Future, Shared},
lock::Mutex,
select, FutureExt, StreamExt,
},
log::{info, trace},
std::sync::Arc,
thiserror::Error,
};
use crate::avrcp_relay::AvrcpRelay;
use crate::player;
/// Builder for A2DP sink media tasks.
///
/// The builder is cloned into every `configure` future, so all of its fields are owned
/// handles that can be duplicated freely.
#[derive(Clone)]
pub struct SinkTaskBuilder {
    /// Sender passed on to each configured task for reporting streaming metrics to Cobalt.
    cobalt_sender: CobaltSender,
    /// Proxy used to publish a player for each configured stream.
    publisher: sessions2::PublisherProxy,
    /// Domain placed in the `PlayerRegistration` of every published player.
    domain: String,
}
impl SinkTaskBuilder {
pub fn new(
cobalt_sender: CobaltSender,
publisher: sessions2::PublisherProxy,
domain: String,
) -> Self {
Self { cobalt_sender, publisher, domain }
}
}
impl MediaTaskBuilder for SinkTaskBuilder {
    /// Publishes a player for the stream and returns a runner that is ready to be started
    /// with `codec_config`.
    ///
    /// The returned future fails with `MediaTaskError::Other` when the player request stream
    /// cannot be created, and with `MediaTaskError::ResourcesInUse` when publishing the
    /// player fails. A failure to start the AVRCP relay is not an error here.
    fn configure(
        &self,
        peer_id: &PeerId,
        codec_config: &MediaCodecConfig,
        data_stream_inspect: DataStreamInspect,
    ) -> BoxFuture<'static, Result<Box<dyn MediaTaskRunner>, MediaTaskError>> {
        // The future is 'static, so it has to own everything it touches.
        let builder = self.clone();
        let peer_id = peer_id.clone();
        let codec_config = codec_config.clone();
        Box::pin(async move {
            let (player_client, player_requests) = create_request_stream()
                .map_err(|e| MediaTaskError::Other(format!("FIDL error: {:?}", e)))?;

            let registration = sessions2::PlayerRegistration {
                domain: Some(builder.domain),
                ..sessions2::PlayerRegistration::EMPTY
            };
            let session_id = builder
                .publisher
                .publish(player_client, registration)
                .await
                .map_err(|_| MediaTaskError::ResourcesInUse)?;
            info!("Session ID: {}", session_id);

            // Relay start-up errors are deliberately discarded here; they are logged.
            let avrcp_task = AvrcpRelay::start(peer_id, player_requests).ok();

            let runner: Box<dyn MediaTaskRunner> = Box::new(ConfiguredSinkTask::new(
                codec_config,
                builder.cobalt_sender,
                data_stream_inspect,
                session_id,
                avrcp_task,
            ));
            Ok(runner)
        })
    }
}
/// A sink task that has been configured and can be started with a media stream.
struct ConfiguredSinkTask {
    /// Format of the encoded audio that players are created with.
    codec_config: MediaCodecConfig,
    /// Cobalt sender, cloned into each running task to report the length of playback.
    cobalt_sender: CobaltSender,
    /// Inspect data for the stream (total bytes / current transfer speed), shared with the
    /// streaming future.
    stream_inspect: Arc<Mutex<DataStreamInspect>>,
    /// ID of the media session that players are created for.
    session_id: u64,
    /// Session task (AVRCP relay), if one was started. Never read; it is kept so that it is
    /// dropped together with this struct.
    _session_task: Option<fasync::Task<()>>,
}
impl ConfiguredSinkTask {
fn new(
codec_config: MediaCodecConfig,
cobalt_sender: CobaltSender,
stream_inspect: DataStreamInspect,
session_id: u64,
session_task: Option<fasync::Task<()>>,
) -> Self {
Self {
codec_config,
cobalt_sender,
stream_inspect: Arc::new(Mutex::new(stream_inspect)),
session_id,
_session_task: session_task,
}
}
}
impl MediaTaskRunner for ConfiguredSinkTask {
fn start(&mut self, stream: MediaStream) -> Result<Box<dyn MediaTask>, MediaTaskError> {
let codec_config = self.codec_config.clone();
let session_id = self.session_id;
let media_player_fut = media_stream_task(
stream,
Box::new(move || player::Player::new(session_id, codec_config.clone())),
self.stream_inspect.clone(),
);
let _ = self.stream_inspect.try_lock().map(|mut l| l.start());
let codec_type = self.codec_config.codec_type().clone();
let task = RunningSinkTask::start(media_player_fut, self.cobalt_sender.clone(), codec_type);
Ok(Box::new(task))
}
fn reconfigure(&mut self, codec_config: &MediaCodecConfig) -> Result<(), MediaTaskError> {
self.codec_config = codec_config.clone();
Ok(())
}
}
#[derive(Error, Debug)]
enum StreamingError {
/// The media stream ended.
#[error("Media stream ended")]
MediaStreamEnd,
/// The media stream returned an error. The error is provided.
#[error("Media stream error: {:?}", _0)]
MediaStreamError(avdtp::Error),
/// The Media Player closed unexpectedly.
#[error("Player closed unexpectedlky")]
PlayerClosed,
}
/// A sink task that is running a media task future and makes its result available to
/// multiple interested parties.
///
/// Streaming metrics are reported to Cobalt once the result is available.
struct RunningSinkTask {
    /// Task driving the media future and sending its result. Taken (and thereby dropped)
    /// when the task is stopped.
    media_task: Option<fasync::Task<()>>,
    /// Task that waits for the result and then reports the streaming duration.
    _cobalt_task: fasync::Task<()>,
    /// Shared handle to the result of the media task; it can be cloned for every party that
    /// wants to wait on it.
    result_fut: Shared<fasync::Task<Result<(), MediaTaskError>>>,
}
impl RunningSinkTask {
fn start(
media_task: impl Future<Output = Result<(), MediaTaskError>> + Send + 'static,
cobalt_sender: CobaltSender,
codec_type: avdtp::MediaCodecType,
) -> Self {
let (sender, receiver) = oneshot::channel();
let wrapped_media_task = fasync::Task::spawn(async move {
let result = media_task.await;
let _ = sender.send(result);
});
let recv_task = fasync::Task::spawn(async move {
// Receives the result of the media task, or Canceled, from the stop() dropping it
receiver.await.unwrap_or(Ok(()))
});
let result_fut = recv_task.shared();
let cobalt_result = result_fut.clone();
let cobalt_task = fasync::Task::spawn(async move {
let start_time = fasync::Time::now();
trace::instant!("bt-a2dp-sink", "Media:Start", trace::Scope::Thread);
let _ = cobalt_result.await;
trace::instant!("bt-a2dp-sink", "Media:Stop", trace::Scope::Thread);
let end_time = fasync::Time::now();
report_stream_metrics(
cobalt_sender,
&codec_type,
(end_time - start_time).into_seconds(),
);
});
Self { media_task: Some(wrapped_media_task), result_fut, _cobalt_task: cobalt_task }
}
}
impl MediaTask for RunningSinkTask {
    /// Returns a future that resolves to the result of the media task.
    fn finished(&mut self) -> BoxFuture<'static, Result<(), MediaTaskError>> {
        Box::pin(self.result_fut.clone())
    }

    /// Stops the media task if it is still held, and returns the result if one is already
    /// available, or `Ok(())` otherwise.
    fn stop(&mut self) -> Result<(), MediaTaskError> {
        if let Some(task) = self.media_task.take() {
            info!("Media Task stopped via stop signal");
            // Dropping the task also drops the result sender it owns.
            drop(task);
        }
        // Either a result was already produced, or dropping the sender stands in for Ok(()).
        match self.result() {
            Some(outcome) => outcome,
            None => Ok(()),
        }
    }
}
/// Drives media streaming: builds a player with `player_gen`, waits for its first event, and
/// then hands `stream` and the player to `decode_media_stream`.
///
/// When decoding stops because the player closed, a new player is built and streaming
/// continues on the same `stream`. Any other reason for stopping ends the task.
///
/// This function never returns `Ok`; every exit is a `MediaTaskError::Other` describing why
/// the player could not be set up or why streaming could not continue.
async fn media_stream_task(
    mut stream: (impl futures::Stream<Item = avdtp::Result<Vec<u8>>> + std::marker::Unpin),
    player_gen: Box<dyn Fn() -> Result<player::Player, Error> + Send>,
    inspect: Arc<Mutex<DataStreamInspect>>,
) -> Result<(), MediaTaskError> {
    loop {
        let mut player = player_gen()
            .map_err(|e| MediaTaskError::Other(format!("Can't setup player: {:?}", e)))?;

        // The first event from the player confirms whether it was set up.
        if let player::PlayerEvent::Closed = player.next_event().await {
            return Err(MediaTaskError::Other(String::from("Player failed during startup")));
        }

        let stop_reason = decode_media_stream(&mut stream, player, inspect.clone()).await;
        if let StreamingError::PlayerClosed = stop_reason {
            info!("Player closed, rebuilding..");
            continue;
        }
        return Err(MediaTaskError::Other(format!(
            "Unrecoverable streaming error: {:?}",
            stop_reason
        )));
    }
}
/// Transfers media packets from `stream` (AVDTP) to `player` until streaming cannot continue.
///
/// Marks the start of the transfer in `inspect` and records the size of every received
/// packet there. Inspect updates are best effort: they are skipped whenever the lock is not
/// immediately available. A packet that the player fails to accept is logged and dropped,
/// and streaming continues with the next packet.
///
/// This function does not restart the player; it returns the reason streaming stopped and
/// leaves the decision to the caller:
///  - `StreamingError::MediaStreamEnd` when `stream` is exhausted,
///  - `StreamingError::MediaStreamError` when `stream` yields an error,
///  - `StreamingError::PlayerClosed` when the player reports that it closed.
async fn decode_media_stream(
    stream: &mut (impl futures::Stream<Item = avdtp::Result<Vec<u8>>> + std::marker::Unpin),
    mut player: player::Player,
    inspect: Arc<Mutex<DataStreamInspect>>,
) -> StreamingError {
    let mut packet_count: u64 = 0;
    if let Some(mut stats) = inspect.try_lock() {
        stats.start();
    }
    loop {
        select! {
            stream_packet = stream.next().fuse() => {
                let pkt = match stream_packet {
                    None => return StreamingError::MediaStreamEnd,
                    Some(Err(e)) => return StreamingError::MediaStreamError(e),
                    Some(Ok(packet)) => packet,
                };
                packet_count += 1;
                // Link incoming and outgoing flows together with a shared duration event.
                trace::duration!("bt-a2dp-sink", "ProfilePacket received");
                trace::flow_end!("bluetooth", "ProfilePacket", packet_count);
                if let Some(mut stats) = inspect.try_lock() {
                    stats.record_transferred(pkt.len(), fasync::Time::now());
                }
                // A failed push is not fatal: the packet is dropped and streaming goes on.
                if let Err(e) = player.push_payload(pkt.as_slice()).await {
                    info!("can't push packet: {:?}", e);
                }
            },
            player_event = player.next_event().fuse() => {
                match player_event {
                    player::PlayerEvent::Closed => return StreamingError::PlayerClosed,
                    player::PlayerEvent::Status(s) => {
                        trace!("PlayerEvent Status happened: {:?}", s);
                    },
                }
            },
        }
    }
}
/// Logs the duration of a finished stream to Cobalt as an elapsed-time event.
///
/// The event is tagged with a codec dimension derived from `codec_type`: SBC and AAC have
/// their own codes, every other codec is reported as `Unknown`.
fn report_stream_metrics(
    mut cobalt_sender: CobaltSender,
    codec_type: &avdtp::MediaCodecType,
    duration_seconds: i64,
) {
    let dimension = match *codec_type {
        avdtp::MediaCodecType::AUDIO_SBC => {
            metrics::A2dpStreamDurationInSecondsMetricDimensionCodec::Sbc
        }
        avdtp::MediaCodecType::AUDIO_AAC => {
            metrics::A2dpStreamDurationInSecondsMetricDimensionCodec::Aac
        }
        _ => metrics::A2dpStreamDurationInSecondsMetricDimensionCodec::Unknown,
    };
    let event_code = dimension as u32;
    cobalt_sender.log_elapsed_time(
        metrics::A2DP_STREAM_DURATION_IN_SECONDS_METRIC_ID,
        event_code,
        duration_seconds,
    );
}
// Unit tests for the sink task. Only compiled for test builds with the `test_encoding`
// feature enabled.
#[cfg(all(test, feature = "test_encoding"))]
mod tests {
use super::*;
use {
fidl::endpoints::create_proxy_and_stream,
fidl_fuchsia_cobalt::{CobaltEvent, EventPayload},
fidl_fuchsia_media::{
AudioConsumerRequest, AudioConsumerStatus, SessionAudioConsumerFactoryMarker,
StreamSinkRequest,
},
fuchsia_inspect as inspect,
fuchsia_inspect_derive::WithInspect,
fuchsia_zircon::DurationNum,
futures::{channel::mpsc, pin_mut, task::Poll, StreamExt},
};
use crate::tests::fake_cobalt_sender;
/// Builds the common fixtures for the decode tests: an executor, the minimal SBC codec
/// configuration, and a default `DataStreamInspect` wrapped for sharing.
fn setup_media_stream_test(
) -> (fasync::Executor, MediaCodecConfig, Arc<Mutex<DataStreamInspect>>) {
let exec = fasync::Executor::new().expect("executor should build");
let sbc_config = MediaCodecConfig::min_sbc();
let inspect = Arc::new(Mutex::new(DataStreamInspect::default()));
(exec, sbc_config, inspect)
}
#[test]
/// Test that `report_stream_metrics` sends one elapsed-time event carrying the duration
/// metric ID, the event code for the codec, and the given duration.
fn test_cobalt_metrics() {
let (send, mut recv) = fake_cobalt_sender();
const TEST_DURATION: i64 = 1;
report_stream_metrics(send, &avdtp::MediaCodecType::AUDIO_AAC, TEST_DURATION);
let event = recv.try_next().expect("no stream error").expect("event present");
// NOTE(review): the duration passed in is named as seconds but is expected back in the
// `ElapsedMicros` payload variant unchanged - presumably the metric definition fixes the
// unit; confirm.
assert_eq!(
event,
CobaltEvent {
metric_id: metrics::A2DP_STREAM_DURATION_IN_SECONDS_METRIC_ID,
event_codes: vec![
metrics::A2dpStreamDurationInSecondsMetricDimensionCodec::Aac as u32
],
component: None,
payload: EventPayload::ElapsedMicros(TEST_DURATION),
}
);
}
/// Decoding a stream that yields nothing ends immediately with `MediaStreamEnd`.
#[test]
fn decode_media_stream_empty() {
let (mut exec, sbc_config, inspect) = setup_media_stream_test();
let (player, _sink_requests, _consumer_requests, _vmo) =
player::tests::setup_player(&mut exec, sbc_config);
let mut empty_stream = futures::stream::empty();
let decode_fut = decode_media_stream(&mut empty_stream, player, inspect);
pin_mut!(decode_fut);
match exec.run_until_stalled(&mut decode_fut) {
Poll::Ready(StreamingError::MediaStreamEnd) => {}
x => panic!("Expected decoding to end when media stream ended, got {:?}", x),
};
}
/// An error yielded by the media stream ends decoding and is passed through in
/// `MediaStreamError`.
#[test]
fn decode_media_stream_error() {
let (mut exec, sbc_config, inspect) = setup_media_stream_test();
let (player, _sink_requests, _consumer_requests, _vmo) =
player::tests::setup_player(&mut exec, sbc_config);
// A stream that produces the same error every time it is polled.
let mut error_stream =
futures::stream::poll_fn(|_| -> Poll<Option<avdtp::Result<Vec<u8>>>> {
Poll::Ready(Some(Err(avdtp::Error::PeerDisconnected)))
});
let decode_fut = decode_media_stream(&mut error_stream, player, inspect);
pin_mut!(decode_fut);
match exec.run_until_stalled(&mut decode_fut) {
Poll::Ready(StreamingError::MediaStreamError(avdtp::Error::PeerDisconnected)) => {}
x => panic!("Expected decoding to end with included error, got {:?}", x),
};
}
/// Decoding ends with `PlayerClosed` once the audio consumer side of the player goes away,
/// even though the media stream itself never produces anything.
#[test]
fn decode_media_player_closed() {
let (mut exec, sbc_config, inspect) = setup_media_stream_test();
let (player, mut sink_requests, mut consumer_requests, _vmo) =
player::tests::setup_player(&mut exec, sbc_config);
// The media stream never yields, so only player events can end the decode.
let mut pending_stream = futures::stream::pending();
let decode_fut = decode_media_stream(&mut pending_stream, player, inspect);
pin_mut!(decode_fut);
match exec.run_until_stalled(&mut decode_fut) {
Poll::Pending => {}
x => panic!("Expected pending immediately after with no input but got {:?}", x),
};
let responder = match exec.run_until_stalled(&mut consumer_requests.select_next_some()) {
Poll::Ready(Ok(AudioConsumerRequest::WatchStatus { responder, .. })) => responder,
x => panic!("Expected a watch status request from the player setup, but got {:?}", x),
};
// Drop the pending responder and the whole consumer request stream without replying;
// presumably this is what makes the player report that it closed.
drop(responder);
drop(consumer_requests);
// Alternate between draining sink requests and polling the decode future until the
// decode future reports the closure.
loop {
match exec.run_until_stalled(&mut sink_requests.select_next_some()) {
Poll::Pending => {}
x => info!("Got sink request: {:?}", x),
};
match exec.run_until_stalled(&mut decode_fut) {
Poll::Ready(StreamingError::PlayerClosed) => break,
Poll::Pending => {}
x => panic!("Expected decoding to end when player closed, got {:?}", x),
};
}
}
/// Decoding records the start time and the transferred bytes in the inspect tree, and the
/// received packet eventually reaches the stream sink.
#[test]
fn decode_media_stream_stats() {
// Fake time makes the recorded start time and transfer rate deterministic.
let mut exec = fasync::Executor::new_with_fake_time().expect("executor should build");
let sbc_config = MediaCodecConfig::min_sbc();
let (player, mut sink_requests, _consumer_requests, _vmo) =
player::tests::setup_player(&mut exec, sbc_config);
let inspector = inspect::component::inspector();
let root = inspector.root();
let d = DataStreamInspect::default().with_inspect(root, "stream").expect("attach to tree");
let inspect = Arc::new(Mutex::new(d));
exec.set_fake_time(fasync::Time::from_nanos(5_678900000));
let (mut media_sender, mut media_receiver) = mpsc::channel(1);
let decode_fut = decode_media_stream(&mut media_receiver, player, inspect);
pin_mut!(decode_fut);
assert!(exec.run_until_stalled(&mut decode_fut).is_pending());
// The first poll marks the start of the transfer; nothing has been received yet.
fuchsia_inspect::assert_inspect_tree!(inspector, root: {
stream: {
start_time: 5_678900000i64,
total_bytes: 0 as u64,
bytes_per_second_current: 0 as u64,
}});
// raw rtp header with sequence number of 1 followed by 1 sbc frame
let raw = vec![
128, 96, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0x9c, 0xb1, 0x20, 0x3b, 0x80, 0x00, 0x00,
0x11, 0x7f, 0xfa, 0xab, 0xef, 0x7f, 0xfa, 0xab, 0xef, 0x80, 0x4a, 0xab, 0xaf, 0x80,
0xf2, 0xab, 0xcf, 0x83, 0x8a, 0xac, 0x32, 0x8a, 0x78, 0x8a, 0x53, 0x90, 0xdc, 0xad,
0x49, 0x96, 0xba, 0xaa, 0xe6, 0x9c, 0xa2, 0xab, 0xac, 0xa2, 0x72, 0xa9, 0x2d, 0xa8,
0x9a, 0xab, 0x75, 0xae, 0x82, 0xad, 0x49, 0xb4, 0x6a, 0xad, 0xb1, 0xba, 0x52, 0xa9,
0xa8, 0xc0, 0x32, 0xad, 0x11, 0xc6, 0x5a, 0xab, 0x3a,
];
// Length of `raw` in bytes, i.e. the whole packet including the header.
let sbc_packet_size = 85u64;
media_sender.try_send(Ok(raw.clone())).expect("should be able to send into stream");
// One second passes before the packet is processed.
exec.set_fake_time(fasync::Time::after(1.seconds()));
assert!(exec.run_until_stalled(&mut decode_fut).is_pending());
// We should have updated the rx stats.
fuchsia_inspect::assert_inspect_tree!(inspector, root: {
stream: {
start_time: 5_678900000i64,
total_bytes: sbc_packet_size,
bytes_per_second_current: sbc_packet_size,
}});
// Should get a packet send to the sink eventually as player gets around to it
loop {
assert!(exec.run_until_stalled(&mut decode_fut).is_pending());
match exec.run_until_stalled(&mut sink_requests.select_next_some()) {
Poll::Ready(Ok(StreamSinkRequest::SendPacket { .. })) => break,
Poll::Pending => {}
x => panic!("Expected to receive a packet from sending data.. got {:?}", x),
};
}
}
/// `media_stream_task` builds a second player after the first one closes, and finishes
/// when the second player closes before reporting its first status.
#[test]
fn media_stream_task_reopens_player() {
let mut exec = fasync::Executor::new_with_fake_time().expect("executor should build");
let (audio_consumer_factory_proxy, mut audio_consumer_factory_request_stream) =
create_proxy_and_stream::<SessionAudioConsumerFactoryMarker>()
.expect("proxy pair creation");
let sbc_config = MediaCodecConfig::min_sbc();
let inspect = Arc::new(Mutex::new(DataStreamInspect::default()));
// The media stream never yields, so only player events drive the task.
let pending_stream = futures::stream::pending();
let codec_type = sbc_config.codec_type().clone();
let session_id = 1;
// Every player is built against the test's factory proxy so its setup can be observed.
let media_stream_fut = media_stream_task(
pending_stream,
Box::new(move || {
player::Player::from_proxy(
session_id,
sbc_config.clone(),
audio_consumer_factory_proxy.clone(),
)
}),
inspect,
);
pin_mut!(media_stream_fut);
assert!(exec.run_until_stalled(&mut media_stream_fut).is_pending());
let (_sink_request_stream, mut audio_consumer_request_stream, _sink_vmo) =
player::tests::expect_player_setup(
&mut exec,
&mut audio_consumer_factory_request_stream,
codec_type.clone(),
session_id,
);
// Answer the first status request so the task gets past its startup check.
player::tests::respond_event_status(
&mut exec,
&mut audio_consumer_request_stream,
AudioConsumerStatus {
min_lead_time: Some(50),
max_lead_time: Some(500),
error: None,
presentation_timeline: None,
..AudioConsumerStatus::EMPTY
},
);
// Close the first player's consumer channel.
drop(audio_consumer_request_stream);
assert!(exec.run_until_stalled(&mut media_stream_fut).is_pending());
// Should set up the player again after it closes.
let (_sink_request_stream, audio_consumer_request_stream, _sink_vmo) =
player::tests::expect_player_setup(
&mut exec,
&mut audio_consumer_factory_request_stream,
codec_type,
session_id,
);
// This time we don't respond to the event status, so the player failed immediately after
// trying to be rebuilt and we end.
drop(audio_consumer_request_stream);
assert!(exec.run_until_stalled(&mut media_stream_fut).is_ready());
}
}