blob: ed24146a62b279c0b6686de3e6d7530f9fc423c0 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::Error,
bt_a2dp::{codec::MediaCodecConfig, media_task::*},
bt_avdtp::MediaStream,
fidl_fuchsia_media::{AudioChannelId, AudioPcmMode, PcmFormat},
fuchsia_async as fasync,
fuchsia_bluetooth::{inspect::DataStreamInspect, types::PeerId},
fuchsia_trace as trace,
futures::{
channel::oneshot,
future::{BoxFuture, Shared},
lock::Mutex,
AsyncWriteExt, FutureExt, TryFutureExt, TryStreamExt,
},
log::{info, trace, warn},
std::sync::Arc,
};
use crate::encoding::EncodedStream;
use crate::sources;
/// `SourceTaskBuilder` is a `MediaTaskBuilder` that builds `ConfiguredSourceTask`s when
/// configured.
/// `source_type` determines where the source audio comes from.
/// When configured, a test stream is created to confirm that it is possible to stream audio using
/// the configuration. This test stream is discarded, and a fresh stream is built when the
/// resulting `ConfiguredSourceTask` is started.
#[derive(Clone)]
pub struct SourceTaskBuilder {
/// The type of source audio.
source_type: sources::AudioSourceType,
}
impl MediaTaskBuilder for SourceTaskBuilder {
    /// Configures a source task for `peer_id` using `codec_config`.
    ///
    /// The configuration work is done synchronously by `configure_task`; the returned future
    /// only hands back that outcome, boxed as a `MediaTaskRunner` on success.
    fn configure(
        &self,
        peer_id: &PeerId,
        codec_config: &MediaCodecConfig,
        data_stream_inspect: DataStreamInspect,
    ) -> BoxFuture<'static, Result<Box<dyn MediaTaskRunner>, MediaTaskError>> {
        let configured = self.configure_task(peer_id, codec_config, data_stream_inspect);
        let ready = async move {
            configured.map(|task| Box::new(task) as Box<dyn MediaTaskRunner>)
        };
        ready.boxed()
    }
}
impl SourceTaskBuilder {
    /// Creates a builder whose tasks source audio from `source_type`. See
    /// `sources::build_stream` for documentation on the types of streams that are available.
    pub fn new(source_type: sources::AudioSourceType) -> Self {
        Self { source_type }
    }

    /// Checks that `codec_config` can be streamed and returns a `ConfiguredSourceTask` for it.
    ///
    /// Only one- and two-channel configurations are accepted; the PCM format is always 16-bit
    /// linear at the codec's sampling frequency. A source stream and an encoded stream are built
    /// once as a probe and then discarded.
    ///
    /// Returns `MediaTaskError::ConfigurationNotSupported` when the channel count is unsupported
    /// or the source stream can't be built, and `MediaTaskError::Other` when the encoded stream
    /// can't be built. A failure reading the sampling frequency is propagated.
    pub(crate) fn configure_task(
        &self,
        peer_id: &PeerId,
        codec_config: &MediaCodecConfig,
        data_stream_inspect: DataStreamInspect,
    ) -> Result<ConfiguredSourceTask, MediaTaskError> {
        let channel_map = match codec_config.channel_count() {
            Ok(1) => vec![AudioChannelId::Cf],
            Ok(2) => vec![AudioChannelId::Lf, AudioChannelId::Rf],
            _ => return Err(MediaTaskError::ConfigurationNotSupported),
        };
        let frames_per_second = codec_config.sampling_frequency()?;
        let pcm_format = PcmFormat {
            pcm_mode: AudioPcmMode::Linear,
            bits_per_sample: 16,
            frames_per_second,
            channel_map,
        };

        // Probe: build the whole pipeline once to prove this configuration is streamable.
        let probe_source =
            match sources::build_stream(peer_id, pcm_format.clone(), self.source_type) {
                Ok(stream) => stream,
                Err(_) => return Err(MediaTaskError::ConfigurationNotSupported),
            };
        match EncodedStream::build(pcm_format.clone(), probe_source, codec_config) {
            // The probe stream itself is not needed; it is rebuilt when the task starts.
            Ok(_) => {}
            Err(e) => {
                trace!("SourceTaskBuilder: can't build encoded stream: {:?}", e);
                return Err(MediaTaskError::Other(format!("Can't build encoded stream: {}", e)));
            }
        }

        let configured = ConfiguredSourceTask::build(
            pcm_format,
            self.source_type,
            peer_id.clone(),
            codec_config,
            data_stream_inspect,
        );
        Ok(configured)
    }
}
/// Provides audio from the configured source to the `MediaStream` when started. Streams are
/// created and started when this task is started, and destroyed when stopped.
pub(crate) struct ConfiguredSourceTask {
/// The type of source audio.
source_type: sources::AudioSourceType,
/// Format the source audio should be produced in.
pub(crate) pcm_format: PcmFormat,
/// Id of the peer that will be receiving the stream. Used to distinguish sources for Fuchsia
/// Media.
peer_id: PeerId,
/// Configuration providing the format of encoded audio requested by the peer.
codec_config: MediaCodecConfig,
/// Data Stream inspect object for tracking total bytes / current transfer speed.
/// Shared (behind a lock) with the streaming task created in `start`.
data_stream_inspect: Arc<Mutex<DataStreamInspect>>,
}
impl ConfiguredSourceTask {
    /// Assembles a `ConfiguredSourceTask` from its parts. Usually only called by
    /// `SourceTaskBuilder`.
    ///
    /// Nothing is validated here: `codec_config` is cloned and `data_stream_inspect` is wrapped
    /// so it can be shared. `ConfiguredSourceTask::start` will only return errors if these
    /// settings cannot produce a stream.
    pub(crate) fn build(
        pcm_format: PcmFormat,
        source_type: sources::AudioSourceType,
        peer_id: PeerId,
        codec_config: &MediaCodecConfig,
        data_stream_inspect: DataStreamInspect,
    ) -> Self {
        let codec_config = codec_config.clone();
        let data_stream_inspect = Arc::new(Mutex::new(data_stream_inspect));
        Self { source_type, pcm_format, peer_id, codec_config, data_stream_inspect }
    }
}
impl MediaTaskRunner for ConfiguredSourceTask {
fn start(&mut self, stream: MediaStream) -> Result<Box<dyn MediaTask>, MediaTaskError> {
let source_stream =
sources::build_stream(&self.peer_id, self.pcm_format.clone(), self.source_type)
.map_err(|e| MediaTaskError::Other(format!("Building stream: {}", e)))?;
let encoded_stream =
EncodedStream::build(self.pcm_format.clone(), source_stream, &self.codec_config)
.map_err(|e| MediaTaskError::Other(format!("Can't build encoded stream: {}", e)))?;
let stream_task = RunningSourceTask::build(
self.codec_config.clone(),
encoded_stream,
stream,
self.data_stream_inspect.clone(),
);
let _ = self.data_stream_inspect.try_lock().map(|mut l| l.start());
Ok(Box::new(stream_task))
}
}
/// A source task that has been started. Created by `RunningSourceTask::build`, which spawns the
/// streaming work and keeps a handle to it here.
struct RunningSourceTask {
/// Handle to the spawned streaming task. `stop` takes it out, leaving `None`.
stream_task: Option<fasync::Task<()>>,
/// Shared future for the outcome of the streaming task. Resolves to the result the task sent,
/// or to `Ok(())` if the result sender was dropped without sending.
result_fut: Shared<BoxFuture<'static, Result<(), MediaTaskError>>>,
}
impl RunningSourceTask {
    /// The main streaming task. Reads encoded audio from `encoded_stream`, packages it into RTP
    /// packets, and sends the resulting packets using `media_stream`.
    ///
    /// Returns `Ok(())` when the encoded stream ends or when a packet can't be written to the
    /// peer. Errors from `max_tx_size`, `make_packet_builder`, or the encoded stream itself are
    /// propagated. A frame that can't be added to a packet is logged and skipped.
    async fn stream_task(
        codec_config: MediaCodecConfig,
        mut encoded_stream: EncodedStream,
        mut media_stream: MediaStream,
        data_stream_inspect: Arc<Mutex<DataStreamInspect>>,
    ) -> Result<(), Error> {
        let frames_per_encoded = codec_config.pcm_frames_per_encoded_frame() as u32;
        let max_tx_size = media_stream.max_tx_size()?;
        let mut packet_builder = codec_config.make_packet_builder(max_tx_size)?;
        // `try_next` yields `None` once the stream has terminated. A terminated stream must not
        // be polled again (it may spin, block forever, or panic), so the loop ends there instead
        // of retrying.
        while let Some(encoded) = encoded_stream.try_next().await? {
            let packets = match packet_builder.add_frame(encoded, frames_per_encoded) {
                Err(e) => {
                    warn!("Can't add packet to RTP packet: {:?}", e);
                    continue;
                }
                Ok(packets) => packets,
            };
            for packet in packets {
                trace::duration_begin!("bt-a2dp", "Media:PacketSent");
                if let Err(e) = media_stream.write(&packet).await {
                    info!("Failed sending packet to peer: {}", e);
                    trace::duration_end!("bt-a2dp", "Media:PacketSent");
                    return Ok(());
                }
                // Best effort: skip the stats update if the inspect data is locked right now.
                let _ = data_stream_inspect.try_lock().map(|mut l| {
                    l.record_transferred(packet.len(), fasync::Time::now());
                });
                trace::duration_end!("bt-a2dp", "Media:PacketSent");
            }
        }
        info!("Encoded audio stream ended, streaming task finished");
        Ok(())
    }

    /// Spawns `stream_task` and returns a handle that tracks it.
    ///
    /// The spawned task reports its outcome over a oneshot channel, with any streaming error
    /// wrapped in `MediaTaskError::Other`. If the sender is dropped without sending, the result
    /// future resolves to `Ok(())`.
    fn build(
        codec_config: MediaCodecConfig,
        encoded_stream: EncodedStream,
        media_stream: MediaStream,
        inspect: Arc<Mutex<DataStreamInspect>>,
    ) -> Self {
        let (sender, receiver) = oneshot::channel();
        let stream_task_fut =
            Self::stream_task(codec_config, encoded_stream, media_stream, inspect);
        let wrapped_task = fasync::Task::spawn(async move {
            trace::instant!("bt-a2dp", "Media:Start", trace::Scope::Thread);
            let result = stream_task_fut
                .await
                .map_err(|e| MediaTaskError::Other(format!("Error in streaming audio: {}", e)));
            // The receiver may already be gone; nothing useful can be done in that case.
            let _ = sender.send(result);
        });
        let result_fut = receiver.map_ok_or_else(|_err| Ok(()), |result| result).boxed().shared();
        Self { stream_task: Some(wrapped_task), result_fut }
    }
}
impl MediaTask for RunningSourceTask {
    /// Returns a future that resolves to the outcome of the streaming task. Each call hands out
    /// a fresh clone of the shared result future.
    fn finished(&mut self) -> BoxFuture<'static, Result<(), MediaTaskError>> {
        let shared_result = self.result_fut.clone();
        shared_result.boxed()
    }

    /// Stops streaming by releasing the task handle, if it is still held, and reports the
    /// task's result. Reports `Ok(())` when no result is available yet.
    fn stop(&mut self) -> Result<(), MediaTaskError> {
        match self.stream_task.take() {
            Some(task) => {
                trace::instant!("bt-a2dp", "Media:Stopped", trace::Scope::Thread);
                drop(task);
            }
            None => {}
        }
        // Either a result already happened, or we just sent an Ok(()) by dropping the result
        // sender.
        match self.result() {
            Some(outcome) => outcome,
            None => Ok(()),
        }
    }
}
#[cfg(all(test, feature = "test_encoding"))]
mod tests {
    use super::*;
    use bt_a2dp::{codec::MediaCodecConfig, media_types::*};
    use bt_avdtp::MediaCodecType;
    use fuchsia_bluetooth::types::Channel;
    use fuchsia_inspect as inspect;
    use fuchsia_inspect_derive::WithInspect;
    use futures::StreamExt;
    use parking_lot::Mutex;
    use std::sync::RwLock;
    use test_util::assert_gt;

    /// The PCM format of a configured task follows the codec configuration's sampling frequency
    /// and channel count.
    #[test]
    fn configures_source_from_codec_config() {
        let _exec = fasync::Executor::new().expect("failed to create an executor");
        let builder = SourceTaskBuilder::new(sources::AudioSourceType::BigBen);

        // Minimum SBC requirements are mono, 48kHz
        let mono_config = MediaCodecConfig::min_sbc();
        let mono_task = builder
            .configure_task(&PeerId(1), &mono_config, DataStreamInspect::default())
            .expect("should build okay");
        assert_eq!(48000, mono_task.pcm_format.frames_per_second);
        assert_eq!(1, mono_task.pcm_format.channel_map.len());

        // A standard SBC audio config which is stereo and 44.1kHz
        let stereo_codec_info = SbcCodecInfo::new(
            SbcSamplingFrequency::FREQ44100HZ,
            SbcChannelMode::JOINT_STEREO,
            SbcBlockCount::SIXTEEN,
            SbcSubBands::EIGHT,
            SbcAllocation::LOUDNESS,
            SbcCodecInfo::BITPOOL_MIN,
            SbcCodecInfo::BITPOOL_MAX,
        )
        .unwrap();
        let stereo_config = MediaCodecConfig::build(
            MediaCodecType::AUDIO_SBC,
            &stereo_codec_info.to_bytes().to_vec(),
        )
        .unwrap();
        let stereo_task = builder
            .configure_task(&PeerId(1), &stereo_config, DataStreamInspect::default())
            .expect("should build okay");
        assert_eq!(44100, stereo_task.pcm_format.frames_per_second);
        assert_eq!(2, stereo_task.pcm_format.channel_map.len());
    }

    /// Once a packet has reached the remote end, the inspect tree reports non-zero transfer
    /// statistics for the stream.
    #[test]
    fn source_media_stream_stats() {
        let mut exec = fasync::Executor::new().expect("executor should build");
        let builder = SourceTaskBuilder::new(sources::AudioSourceType::BigBen);
        let inspector = inspect::component::inspector();
        let stream_inspect = DataStreamInspect::default()
            .with_inspect(inspector.root(), "stream")
            .expect("attach to tree");

        // Minimum SBC requirements are mono, 48kHz
        let mono_config = MediaCodecConfig::min_sbc();
        let mut configured = builder
            .configure_task(&PeerId(1), &mono_config, stream_inspect)
            .expect("should build okay");

        let (mut remote, local) = Channel::create();
        // Keep the strong reference alive for the whole test; the stream only holds a weak one.
        let local_channel = Arc::new(RwLock::new(local));
        let media_stream =
            MediaStream::new(Arc::new(Mutex::new(true)), Arc::downgrade(&local_channel));
        let _running_task = configured.start(media_stream).expect("media should start");

        let _ = exec.run_singlethreaded(remote.next()).expect("some packet");
        let hierarchy =
            exec.run_singlethreaded(inspect::reader::read(inspector)).expect("got hierarchy");

        // We don't know exactly how many were sent at this point, but make sure we got at
        // least some recorded.
        for property_name in ["total_bytes", "bytes_per_second_current"].iter() {
            let property = hierarchy
                .get_property_by_path(&vec!["stream", *property_name])
                .expect("missing property");
            assert_gt!(property.uint().expect("uint"), &0);
        }
    }
}