blob: 0385c1cb13ed1f2068a9846be3ae8c501e5c88fe [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use bt_avdtp::{EndpointType, MediaStream};
use dyn_clone::DynClone;
use fidl_fuchsia_bluetooth_bredr::AudioOffloadExtProxy;
use fuchsia_bluetooth::types::PeerId;
use fuchsia_inspect::Node;
use fuchsia_inspect_derive::AttachError;
use futures::{
future::{BoxFuture, Shared},
FutureExt,
};
use std::time::Duration;
use thiserror::Error;
use crate::codec::MediaCodecConfig;
/// Errors reported by media task builders, runners and tasks.
///
/// The enum is marked `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[derive(Debug, Error, Clone)]
#[non_exhaustive]
pub enum MediaTaskError {
/// The requested operation or configuration is not supported.
#[error("Operation or configuration not supported")]
NotSupported,
/// The peer closed the media stream.
#[error("Peer closed the media stream")]
PeerClosed,
/// Resources needed are already being used.
#[error("Resources needed are already being used")]
ResourcesInUse,
/// Any other failure; the contained string describes it.
#[error("Other Media Task Error: {}", _0)]
Other(String),
}
impl From<bt_avdtp::Error> for MediaTaskError {
fn from(error: bt_avdtp::Error) -> Self {
Self::Other(format!("AVDTP Error: {}", error))
}
}
/// A builder that will make media task runners from requested configurations.
///
/// MediaTaskRunners are configured with information about the media codec when either peer in a
/// conversation configures a stream endpoint. When successfully configured, they can start
/// MediaTasks by accepting a MediaStream, which will provide or consume media on that stream until
/// dropped or stopped.
pub trait MediaTaskBuilder: Send + Sync + DynClone {
/// Configure a new stream based on the given `codec_config` parameters.
/// Returns a MediaTaskRunner if the configuration is supported, or
/// MediaTaskError::NotSupported otherwise.
fn configure(
&self,
peer_id: &PeerId,
codec_config: &MediaCodecConfig,
) -> Result<Box<dyn MediaTaskRunner>, MediaTaskError>;
/// Return the direction of tasks created by this builder.
/// Source tasks provide local encoded audio to a peer.
/// Sink tasks consume encoded audio from a peer.
fn direction(&self) -> EndpointType;
/// Provide a set of encoded media configurations that this task can support.
/// This can vary based on current system capabilities, and should be checked before
/// communicating capabilities to each peer.
/// `offload` is a proxy to the offload capabilities of the controller for this peer.
/// Returns a future that resolves to the set of MediaCodecConfigs that this builder supports,
/// typically one config per MediaCodecType, or an error if building the configs failed.
fn supported_configs(
&self,
peer_id: &PeerId,
offload: Option<AudioOffloadExtProxy>,
) -> BoxFuture<'static, Result<Vec<MediaCodecConfig>, MediaTaskError>>;
}
// Implements `Clone` for `Box<dyn MediaTaskBuilder>`, relying on the `DynClone` supertrait.
dyn_clone::clone_trait_object!(MediaTaskBuilder);
/// MediaTaskRunners represent an ability of the media system to start streaming media.
/// They are configured for a specific codec by `MediaTaskBuilder::configure`
/// Typically a MediaTaskRunner can start multiple streams without needing to be reconfigured,
/// although possibly not simultaneously.
pub trait MediaTaskRunner: Send {
/// Start a MediaTask using the MediaStream given.
/// If the task started, returns a MediaTask which will finish if the stream ends or an
/// error occurs, and can be stopped using `MediaTask::stop` or by dropping the MediaTask.
/// This can fail with MediaTaskError::ResourcesInUse if a MediaTask cannot be started because
/// one is already running.
fn start(
&mut self,
stream: MediaStream,
offload: Option<AudioOffloadExtProxy>,
) -> Result<Box<dyn MediaTask>, MediaTaskError>;
/// Try to reconfigure the MediaTask to accept a new configuration. This differs from
/// `MediaTaskBuilder::configure` as it attempts to preserve the same configured session.
/// The runner remains configured with the initial configuration on an error.
fn reconfigure(&mut self, _config: &MediaCodecConfig) -> Result<(), MediaTaskError> {
Err(MediaTaskError::NotSupported)
}
/// Set the delay reported from the peer for this media task.
/// This should configure the media source or sink to attempt to compensate.
/// Typically this is zero for Sink tasks, but Source tasks can receive this info from the peer.
/// May only be supported before start.
/// If an Error is returned, the delay has not been set.
fn set_delay(&mut self, _delay: Duration) -> Result<(), MediaTaskError> {
Err(MediaTaskError::NotSupported)
}
/// Add information from the running media task to the inspect tree
/// (i.e. data transferred, jitter, etc)
fn iattach(&mut self, _parent: &Node, _name: &str) -> Result<(), AttachError> {
Err("attach not implemented".into())
}
}
/// A MediaTask represents a media stream being actively processed (sent to or received from a
/// peer). MediaTasks are created by `MediaTaskRunner::start`.
/// Typically a MediaTask will run a background task that is active until dropped or
/// `MediaTask::stop` is called.
pub trait MediaTask: Send {
    /// Returns a Future that finishes when the running media task finishes for any reason.
    /// Should return a future that immediately resolves if this task is already finished.
    fn finished(&mut self) -> BoxFuture<'static, Result<(), MediaTaskError>>;

    /// Returns the result if this task has finished, and None otherwise.
    ///
    /// The default implementation polls the `finished` future once without waiting.
    fn result(&mut self) -> Option<Result<(), MediaTaskError>> {
        let completion = self.finished();
        completion.now_or_never()
    }

    /// Stops the task normally, signalling Ok(()) to all waiters.
    /// Returns the result sent to MediaTask::finished futures, which may be different from Ok(()).
    /// When this function returns, it is good practice to ensure the MediaStream that started
    /// this task is also dropped.
    fn stop(&mut self) -> Result<(), MediaTaskError>;
}
pub mod tests {
use super::*;
use futures::{
channel::{mpsc, oneshot},
stream::StreamExt,
Future, TryFutureExt,
};
use std::fmt;
use std::sync::{Arc, Mutex};
/// A MediaTask implementation for tests, created with `TestMediaTask::new`.
/// The stream slot and result sender sit behind `Arc`s and the result is a `Shared` future,
/// so clones of a task refer to the same stream, sender and result.
#[derive(Clone)]
pub struct TestMediaTask {
/// The PeerId that was used to make this Task
pub peer_id: PeerId,
/// The configuration used to make this task
pub codec_config: MediaCodecConfig,
/// If still started, this holds the MediaStream.
pub stream: Arc<Mutex<Option<MediaStream>>>,
/// Sender for the shared result future. None if already sent.
sender: Arc<Mutex<Option<oneshot::Sender<Result<(), MediaTaskError>>>>>,
/// Shared result future.
result: Shared<BoxFuture<'static, Result<(), MediaTaskError>>>,
/// Delay the task was started with.
pub delay: Duration,
}
/// Debug output lists the peer, the codec configuration and the task result (if available).
/// The stream, the result sender and the delay are not printed.
impl fmt::Debug for TestMediaTask {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Poll a clone of the shared result once; `None` means no result is available yet.
        let outcome = self.result.clone().now_or_never();
        let mut debug = f.debug_struct("TestMediaTask");
        debug.field("peer_id", &self.peer_id);
        debug.field("codec_config", &self.codec_config);
        debug.field("result", &outcome);
        debug.finish()
    }
}
impl TestMediaTask {
    /// Creates a task that holds `stream` until it is stopped or ended.
    ///
    /// The task result travels over a oneshot channel whose receiving end is wrapped in a
    /// shared future, so every clone of the task observes the same outcome. If the sender is
    /// dropped without sending anything, the result is `MediaTaskError::Other("Nothing sent")`.
    pub fn new(
        peer_id: PeerId,
        codec_config: MediaCodecConfig,
        stream: MediaStream,
        delay: Duration,
    ) -> Self {
        let (sender, receiver) = oneshot::channel();
        let result = receiver
            .map_ok_or_else(
                // The receiver only errors when the sender was dropped before sending.
                |_canceled| Err(MediaTaskError::Other(String::from("Nothing sent"))),
                |result| result,
            )
            .boxed()
            .shared();
        Self {
            peer_id,
            codec_config,
            stream: Arc::new(Mutex::new(Some(stream))),
            sender: Arc::new(Mutex::new(Some(sender))),
            result,
            delay,
        }
    }

    /// Return true if the background media task is running.
    pub fn is_started(&self) -> bool {
        // The stream being held represents the task running.
        self.stream.lock().expect("stream lock").is_some()
    }

    /// End the streaming task without an external stop().
    ///
    /// Releases the held stream and consumes the result sender. If `task_result` is `Some` and
    /// the sender had not been used yet, that result is sent to waiters. If `task_result` is
    /// `None`, the sender is dropped without sending, so waiters observe the "Nothing sent"
    /// error.
    ///
    /// # Panics
    ///
    /// Panics if either lock is poisoned or if sending the result fails.
    pub fn end_prematurely(&self, task_result: Option<Result<(), MediaTaskError>>) {
        let _removed_stream = self.stream.lock().expect("mutex").take();
        let mut lock = self.sender.lock().expect("sender lock");
        let sender = lock.take();
        if let (Some(result), Some(sender)) = (task_result, sender) {
            sender.send(result).expect("send ok");
        }
    }
}
impl MediaTask for TestMediaTask {
    /// Returns a boxed clone of the shared result future.
    fn finished(&mut self) -> BoxFuture<'static, Result<(), MediaTaskError>> {
        let shared_result = self.result.clone();
        shared_result.boxed()
    }

    /// Releases the held stream and, if no result has been sent yet, sends Ok(()) to waiters.
    /// If the result sender was already consumed, returns the result of the shared future.
    fn stop(&mut self) -> Result<(), MediaTaskError> {
        drop(self.stream.lock().expect("stream lock").take());
        let unsent = self.sender.lock().expect("sender lock").take();
        match unsent {
            Some(sender) => {
                // The outcome of the send is deliberately ignored.
                let _ = sender.send(Ok(()));
                Ok(())
            }
            // The sender was consumed earlier, so the result should be available.
            None => self.finished().now_or_never().unwrap(),
        }
    }
}
/// A MediaTaskRunner implementation for tests.
/// Each started task is a TestMediaTask, and a clone of it is sent through `sender`.
pub struct TestMediaTaskRunner {
/// The peer_id this was started with.
pub peer_id: PeerId,
/// The config that this runner will start tasks for
pub codec_config: MediaCodecConfig,
/// If this is reconfigurable. When false, `reconfigure` returns NotSupported.
pub reconfigurable: bool,
/// If this supports delay reporting. When false, `set_delay` returns NotSupported.
pub supports_set_delay: bool,
/// What the delay is right now. None until `set_delay` succeeds; tasks then start with zero.
pub set_delay: Option<std::time::Duration>,
/// The Sender that will send a clone of the started tasks to the builder.
pub sender: mpsc::Sender<TestMediaTask>,
}
impl MediaTaskRunner for TestMediaTaskRunner {
    /// Builds a TestMediaTask from the runner's current peer, codec config and delay
    /// (zero if no delay was set), and sends a clone of it through `sender`.
    /// This implementation never returns an error.
    fn start(
        &mut self,
        stream: MediaStream,
        _offload: Option<AudioOffloadExtProxy>,
    ) -> Result<Box<dyn MediaTask>, MediaTaskError> {
        let delay = self.set_delay.unwrap_or(Duration::ZERO);
        let started =
            TestMediaTask::new(self.peer_id.clone(), self.codec_config.clone(), stream, delay);
        // Best effort: a failed send (e.g. the receiver was dropped) is ignored.
        let _ = self.sender.try_send(started.clone());
        Ok(Box::new(started))
    }

    /// Stores `delay` if delay reporting is supported, NotSupported otherwise.
    fn set_delay(&mut self, delay: std::time::Duration) -> Result<(), MediaTaskError> {
        if !self.supports_set_delay {
            return Err(MediaTaskError::NotSupported);
        }
        self.set_delay = Some(delay);
        Ok(())
    }

    /// Replaces the codec config if this runner is reconfigurable, NotSupported otherwise.
    fn reconfigure(&mut self, config: &MediaCodecConfig) -> Result<(), MediaTaskError> {
        if !self.reconfigurable {
            return Err(MediaTaskError::NotSupported);
        }
        self.codec_config = config.clone();
        Ok(())
    }
}
/// Test helper that hands out `MediaTaskBuilder`s (see `builder`) and receives a clone of every
/// TestMediaTask started by runners configured through those builders.
/// The settings stored here are copied into each builder that `builder` returns.
pub struct TestMediaTaskBuilder {
/// Sender end of the started-task channel; `builder` locks it to clone one for each builder.
sender: Mutex<mpsc::Sender<TestMediaTask>>,
/// Receiver end of the started-task channel, read by `next_task` and `expect_task`.
receiver: mpsc::Receiver<TestMediaTask>,
/// Whether runners made through the builders accept `reconfigure`.
reconfigurable: bool,
/// Whether runners made through the builders accept `set_delay`.
supports_set_delay: bool,
/// The result that the builders report from `supported_configs`.
configs: Result<Vec<MediaCodecConfig>, MediaTaskError>,
/// The direction that the builders report from `direction`.
direction: EndpointType,
}
impl TestMediaTaskBuilder {
pub fn new() -> Self {
let (sender, receiver) = mpsc::channel(5);
Self {
sender: Mutex::new(sender),
receiver,
reconfigurable: false,
supports_set_delay: false,
configs: Ok(vec![crate::codec::MediaCodecConfig::min_sbc()]),
direction: EndpointType::Sink,
}
}
pub fn with_configs(
&mut self,
configs: Result<Vec<MediaCodecConfig>, MediaTaskError>,
) -> &mut Self {
self.configs = configs;
self
}
pub fn with_direction(&mut self, direction: EndpointType) -> &mut Self {
self.direction = direction;
self
}
pub fn new_reconfigurable() -> Self {
Self { reconfigurable: true, ..Self::new() }
}
pub fn new_delayable() -> Self {
Self { supports_set_delay: true, ..Self::new() }
}
/// Returns a type that implements MediaTaskBuilder. When a MediaTask is built using
/// configure(), it will be available from `next_task`.
pub fn builder(&self) -> Box<dyn MediaTaskBuilder> {
Box::new(TestMediaTaskBuilderBuilder {
sender: self.sender.lock().expect("locking").clone(),
reconfigurable: self.reconfigurable,
supports_set_delay: self.supports_set_delay,
configs: self.configs.clone(),
direction: self.direction,
})
}
/// Gets a future that will return a handle to the next TestMediaTask that gets started
/// from a Runner that was retrieved from this builder.
/// The TestMediaTask, can tell you when it's started and give you a handle to the MediaStream.
pub fn next_task(&mut self) -> impl Future<Output = Option<TestMediaTask>> + '_ {
self.receiver.next()
}
/// Expects that a task had been built, and retrieves that task, or panics.
#[track_caller]
pub fn expect_task(&mut self) -> TestMediaTask {
self.receiver
.try_next()
.expect("should have made a task")
.expect("shouldn't have dropped all senders")
}
}
/// The MediaTaskBuilder implementation returned by `TestMediaTaskBuilder::builder`.
/// It holds a copy of the settings that the TestMediaTaskBuilder had when it was created.
#[derive(Clone)]
struct TestMediaTaskBuilderBuilder {
/// Cloned into each configured runner, which sends its started tasks through it.
sender: mpsc::Sender<TestMediaTask>,
/// Passed to each configured runner as its `reconfigurable` flag.
reconfigurable: bool,
/// Passed to each configured runner as its `supports_set_delay` flag.
supports_set_delay: bool,
/// The result reported by `supported_configs`.
configs: Result<Vec<MediaCodecConfig>, MediaTaskError>,
/// The direction reported by `direction`.
direction: EndpointType,
}
impl MediaTaskBuilder for TestMediaTaskBuilderBuilder {
    /// Always succeeds: returns a TestMediaTaskRunner for `peer_id` and `codec_config` that
    /// carries this builder's flags and task sender, with no delay set.
    fn configure(
        &self,
        peer_id: &PeerId,
        codec_config: &MediaCodecConfig,
    ) -> Result<Box<dyn MediaTaskRunner>, MediaTaskError> {
        let runner: Box<dyn MediaTaskRunner> = Box::new(TestMediaTaskRunner {
            peer_id: peer_id.clone(),
            codec_config: codec_config.clone(),
            sender: self.sender.clone(),
            reconfigurable: self.reconfigurable,
            supports_set_delay: self.supports_set_delay,
            set_delay: None,
        });
        Ok(runner)
    }

    /// Returns the direction this builder was created with.
    fn direction(&self) -> EndpointType {
        self.direction
    }

    /// Returns an already-completed future holding a clone of the stored configs result.
    /// The peer and the offload proxy are ignored.
    fn supported_configs(
        &self,
        _peer_id: &PeerId,
        _offload: Option<AudioOffloadExtProxy>,
    ) -> BoxFuture<'static, Result<Vec<MediaCodecConfig>, MediaTaskError>> {
        let configs = self.configs.clone();
        futures::future::ready(configs).boxed()
    }
}
}