blob: 4102a3196235202feeb9375c14d201c7df67d694 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#![recursion_limit = "512"]
use {
anyhow::{format_err, Context as _, Error},
argh::FromArgs,
bt_a2dp::{
codec::{CodecNegotiation, MediaCodecConfig},
connected_peers::ConnectedPeers,
media_types::*,
peer::ControllerPool,
stream,
},
bt_a2dp_metrics as metrics,
bt_avdtp::{self as avdtp, ServiceCapability, ServiceCategory, StreamEndpoint},
fidl::endpoints::create_request_stream,
fidl_fuchsia_bluetooth_a2dp::{AudioModeRequest, AudioModeRequestStream, Role},
fidl_fuchsia_bluetooth_bredr as bredr,
fidl_fuchsia_media::{AudioChannelId, AudioPcmMode, PcmFormat},
fidl_fuchsia_media_sessions2 as sessions2,
fuchsia_async::{self as fasync, DurationExt},
fuchsia_bluetooth::{
profile::{find_profile_descriptors, find_service_classes, profile_descriptor_to_assigned},
types::{Channel, PeerId, Uuid},
},
fuchsia_cobalt::{CobaltConnector, CobaltSender, ConnectionType},
fuchsia_component::server::ServiceFs,
fuchsia_inspect as inspect,
fuchsia_inspect_derive::Inspect,
fuchsia_zircon as zx,
futures::{self, select, StreamExt, TryStreamExt},
log::{error, info, trace, warn},
parking_lot::Mutex,
std::{convert::TryFrom, convert::TryInto, sync::Arc},
};
mod avrcp_relay;
mod avrcp_target;
mod encoding;
mod latm;
mod pcm_audio;
mod player;
mod sink_task;
mod source_task;
mod sources;
mod volume_relay;
use crate::encoding::EncodedStream;
use crate::pcm_audio::PcmAudio;
use sources::AudioSourceType;
/// Make the SDP definition for the A2DP service.
fn make_profile_service_definition(service_uuid: Uuid) -> bredr::ServiceDefinition {
bredr::ServiceDefinition {
service_class_uuids: Some(vec![service_uuid.into()]),
protocol_descriptor_list: Some(vec![
bredr::ProtocolDescriptor {
protocol: bredr::ProtocolIdentifier::L2Cap,
params: vec![bredr::DataElement::Uint16(bredr::PSM_AVDTP)],
},
bredr::ProtocolDescriptor {
protocol: bredr::ProtocolIdentifier::Avdtp,
params: vec![bredr::DataElement::Uint16(0x0103)], // Indicate v1.3
},
]),
profile_descriptors: Some(vec![bredr::ProfileDescriptor {
profile_id: bredr::ServiceClassProfileIdentifier::AdvancedAudioDistribution,
major_version: 1,
minor_version: 2,
}]),
..bredr::ServiceDefinition::EMPTY
}
}
// SDP Attribute ID for the Supported Features of A2DP.
// Defined in Assigned Numbers for SDP
// https://www.bluetooth.com/specifications/assigned-numbers/service-discovery
const ATTR_A2DP_SUPPORTED_FEATURES: u16 = 0x0311;
// Arbitrarily chosen IDs for the endpoints we might enable.
const SBC_SINK_SEID: u8 = 6;
const AAC_SINK_SEID: u8 = 7;
const SBC_SOURCE_SEID: u8 = 8;
const AAC_SOURCE_SEID: u8 = 9;
pub const DEFAULT_SAMPLE_RATE: u32 = 48000;
pub const DEFAULT_SESSION_ID: u64 = 0;
// Highest AAC bitrate we want to transmit
const MAX_BITRATE_AAC: u32 = 250000;
/// Pick a reasonable quality bitrate to use by default. 64k average per channel.
const PREFERRED_BITRATE_AAC: u32 = 128000;
// Duration for A2DP to wait before assuming role of the initiator.
// If a signaling channel has not been established by this time, A2DP will
// create the signaling channel, configure, open and start the stream.
const INITIATOR_DELAY: zx::Duration = zx::Duration::from_seconds(2);
fn find_codec_cap<'a>(endpoint: &'a StreamEndpoint) -> Option<&'a ServiceCapability> {
endpoint.capabilities().iter().find(|cap| cap.category() == ServiceCategory::MediaCodec)
}
#[derive(Clone)]
struct StreamsBuilder {
cobalt_sender: CobaltSender,
codec_negotiation: CodecNegotiation,
domain: Option<String>,
aac_available: bool,
source_type: AudioSourceType,
}
impl StreamsBuilder {
async fn system_available(
cobalt_sender: CobaltSender,
domain: Option<String>,
source_type: AudioSourceType,
) -> Result<Self, Error> {
// TODO(fxbug.dev/1126): detect codecs, add streams for each codec
// Sink codecs
// SBC is required for sink.
let sbc_endpoint = Self::build_sbc_sink_endpoint()?;
let sbc_codec_cap = find_codec_cap(&sbc_endpoint).expect("just built");
let sbc_config = MediaCodecConfig::try_from(sbc_codec_cap)?;
if let Err(e) = player::Player::test_playable(&sbc_config).await {
warn!("Can't play required SBC audio: {}", e);
return Err(e);
}
let mut caps_available = vec![sbc_codec_cap.clone()];
let aac_endpoint = Self::build_aac_sink_endpoint()?;
let aac_codec_cap = find_codec_cap(&aac_endpoint).expect("just built");
let aac_config = MediaCodecConfig::try_from(aac_codec_cap)?;
let aac_available = player::Player::test_playable(&aac_config).await.is_ok();
if aac_available {
caps_available = vec![
Self::build_aac_capability(avdtp::EndpointType::Sink, PREFERRED_BITRATE_AAC)?,
sbc_codec_cap.clone(),
];
}
let codec_negotiation = CodecNegotiation::build(caps_available, avdtp::EndpointType::Sink)?;
Ok(Self { cobalt_sender, codec_negotiation, domain, aac_available, source_type })
}
fn build_sbc_sink_endpoint() -> avdtp::Result<avdtp::StreamEndpoint> {
let sbc_codec_info = SbcCodecInfo::new(
SbcSamplingFrequency::MANDATORY_SNK,
SbcChannelMode::MANDATORY_SNK,
SbcBlockCount::MANDATORY_SNK,
SbcSubBands::MANDATORY_SNK,
SbcAllocation::MANDATORY_SNK,
SbcCodecInfo::BITPOOL_MIN,
SbcCodecInfo::BITPOOL_MAX,
)?;
trace!("Supported SBC codec parameters: {:?}.", sbc_codec_info);
avdtp::StreamEndpoint::new(
SBC_SINK_SEID,
avdtp::MediaType::Audio,
avdtp::EndpointType::Sink,
vec![
ServiceCapability::MediaTransport,
ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_SBC,
codec_extra: sbc_codec_info.to_bytes().to_vec(),
},
],
)
}
fn build_aac_capability(
endpoint_type: avdtp::EndpointType,
bitrate: u32,
) -> avdtp::Result<avdtp::ServiceCapability> {
let codec_info = match endpoint_type {
avdtp::EndpointType::Sink => AacCodecInfo::new(
AacObjectType::MANDATORY_SNK,
AacSamplingFrequency::MANDATORY_SNK,
AacChannels::MANDATORY_SNK,
true,
bitrate,
)?,
avdtp::EndpointType::Source => AacCodecInfo::new(
AacObjectType::MANDATORY_SRC,
AacSamplingFrequency::FREQ48000HZ,
AacChannels::TWO,
true,
bitrate,
)?,
};
trace!("Supported AAC codec parameters: {:?}.", codec_info);
Ok(ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_AAC,
codec_extra: codec_info.to_bytes().to_vec(),
})
}
fn build_aac_sink_endpoint() -> avdtp::Result<avdtp::StreamEndpoint> {
let endpoint_type = avdtp::EndpointType::Sink;
// 0 = Unknown constant bitrate support (A2DP Sec. 4.5.2.4)
let codec_cap = Self::build_aac_capability(endpoint_type, /* bitrate = */ 0)?;
avdtp::StreamEndpoint::new(
AAC_SINK_SEID,
avdtp::MediaType::Audio,
endpoint_type,
vec![ServiceCapability::MediaTransport, codec_cap],
)
}
fn build_sbc_source_endpoint() -> avdtp::Result<avdtp::StreamEndpoint> {
let sbc_codec_info = SbcCodecInfo::new(
SbcSamplingFrequency::FREQ48000HZ,
SbcChannelMode::JOINT_STEREO,
SbcBlockCount::MANDATORY_SRC,
SbcSubBands::MANDATORY_SRC,
SbcAllocation::MANDATORY_SRC,
SbcCodecInfo::BITPOOL_MIN,
SbcCodecInfo::BITPOOL_MAX,
)?;
trace!("Supported SBC codec parameters: {:?}.", sbc_codec_info);
let codec_cap = ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_SBC,
codec_extra: sbc_codec_info.to_bytes().to_vec(),
};
avdtp::StreamEndpoint::new(
SBC_SOURCE_SEID,
avdtp::MediaType::Audio,
avdtp::EndpointType::Source,
vec![ServiceCapability::MediaTransport, codec_cap],
)
}
fn build_aac_source_endpoint() -> avdtp::Result<avdtp::StreamEndpoint> {
let endpoint_type = avdtp::EndpointType::Source;
let codec_cap = Self::build_aac_capability(endpoint_type, MAX_BITRATE_AAC)?;
avdtp::StreamEndpoint::new(
AAC_SOURCE_SEID,
avdtp::MediaType::Audio,
endpoint_type,
vec![ServiceCapability::MediaTransport, codec_cap],
)
}
fn streams(&self) -> Result<stream::Streams, Error> {
let publisher =
fuchsia_component::client::connect_to_service::<sessions2::PublisherMarker>()
.context("Failed to connect to MediaSession interface")?;
let domain = self.domain.clone().unwrap_or("Bluetooth".to_string());
let sink_task_builder =
sink_task::SinkTaskBuilder::new(self.cobalt_sender.clone(), publisher, domain);
let source_task_builder = source_task::SourceTaskBuilder::new(self.source_type);
let mut streams = stream::Streams::new();
let sbc_sink_endpoint = Self::build_sbc_sink_endpoint()?;
streams.insert(stream::Stream::build(sbc_sink_endpoint, sink_task_builder.clone()));
let sbc_source_endpoint = Self::build_sbc_source_endpoint()?;
streams.insert(stream::Stream::build(sbc_source_endpoint, source_task_builder.clone()));
if self.aac_available {
let aac_sink_endpoint = Self::build_aac_sink_endpoint()?;
streams.insert(stream::Stream::build(aac_sink_endpoint, sink_task_builder.clone()));
let aac_source_endpoint = Self::build_aac_source_endpoint()?;
streams.insert(stream::Stream::build(aac_source_endpoint, source_task_builder.clone()));
}
Ok(streams)
}
fn negotiation(&self) -> CodecNegotiation {
self.codec_negotiation.clone()
}
}
/// Establishes the signaling channel after an INITIATOR_DELAY.
async fn connect_after_timeout(
peer_id: PeerId,
peers: Arc<Mutex<ConnectedPeers>>,
profile_svc: bredr::ProfileProxy,
channel_mode: bredr::ChannelMode,
) {
trace!("waiting {}s before connecting to peer {}.", INITIATOR_DELAY.into_seconds(), peer_id);
fuchsia_async::Timer::new(INITIATOR_DELAY.after_now()).await;
if peers.lock().is_connected(&peer_id) {
return;
}
trace!("peer {} didn't connect - initiating connection", peer_id);
let channel = match profile_svc
.connect(
&mut peer_id.into(),
&mut bredr::ConnectParameters::L2cap(bredr::L2capParameters {
psm: Some(bredr::PSM_AVDTP),
parameters: Some(bredr::ChannelParameters {
channel_mode: Some(channel_mode),
..bredr::ChannelParameters::EMPTY
}),
..bredr::L2capParameters::EMPTY
}),
)
.await
{
Err(e) => {
warn!("FIDL error on connect: {:?}", e);
return;
}
Ok(Err(e)) => {
warn!("Couldn't connect to {}: {:?}", peer_id, e);
return;
}
Ok(Ok(channel)) => channel,
};
let channel: Channel = match channel.try_into() {
Err(e) => {
warn!("Didn't get channel from peer {}: {}", peer_id, e);
return;
}
Ok(chan) => chan,
};
info!(
"Connected to {}: mode {} max_tx {}",
peer_id,
channel.channel_mode(),
channel.max_tx_size()
);
if let Err(e) = peers.lock().connected(peer_id, channel, /* initiator = */ true) {
warn!("Problem connecting to peer: {}", e);
}
}
/// Handles found services. Stores the found information and then spawns a task which will
/// assume initator role after a delay.
fn handle_services_found(
peer_id: &PeerId,
attributes: &[bredr::Attribute],
peers: Arc<Mutex<ConnectedPeers>>,
profile_svc: bredr::ProfileProxy,
channel_mode: bredr::ChannelMode,
) {
let service_names: Vec<&str> =
find_service_classes(attributes).iter().map(|an| an.name).collect();
let profiles = find_profile_descriptors(attributes).unwrap_or(vec![]);
let profile_names: Vec<String> = profiles
.iter()
.filter_map(|p| {
profile_descriptor_to_assigned(p)
.map(|a| format!("{} ({}.{})", a.name, p.major_version, p.minor_version))
})
.collect();
info!(
"Audio profile found on {}, classes: {:?}, profiles: {:?}",
peer_id, service_names, profile_names
);
let profile = match profiles.first() {
Some(profile) => profile.clone(),
None => {
info!("Couldn't find profile in peer {} search results, ignoring.", peer_id);
return;
}
};
peers.lock().found(peer_id.clone(), profile);
if peers.lock().is_connected(&peer_id) {
return;
}
fasync::Task::local(connect_after_timeout(
peer_id.clone(),
peers.clone(),
profile_svc,
channel_mode,
))
.detach();
}
/// Parses the ChannelMode from the String argument.
///
/// Returns an Error if the provided argument is an invalid string.
fn channel_mode_from_arg(channel_mode: Option<String>) -> Result<bredr::ChannelMode, Error> {
match channel_mode {
None => Ok(bredr::ChannelMode::Basic),
Some(s) if s == "basic" => Ok(bredr::ChannelMode::Basic),
Some(s) if s == "ertm" => Ok(bredr::ChannelMode::EnhancedRetransmission),
Some(s) => return Err(format_err!("invalid channel mode: {}", s)),
}
}
/// Options available from the command line
#[derive(FromArgs)]
#[argh(description = "Bluetooth Advanced Audio Distribution Profile")]
struct Opt {
#[argh(option)]
/// published Media Session Domain (optional, defaults to a native Fuchsia session)
// TODO - Point to any media documentation about domains
domain: Option<String>,
#[argh(option, default = "AudioSourceType::AudioOut")]
/// audio source. options: [audio_out, big_ben]. Defaults to 'audio_out'
source: AudioSourceType,
#[argh(option, short = 'c', long = "channelmode")]
/// channel mode preferred for the AVDTP signaling channel
/// (optional, defaults to "basic", values: "basic", "ertm").
channel_mode: Option<String>,
}
async fn test_encode_sbc() -> Result<(), Error> {
// all sinks must support these options
let required_format = PcmFormat {
pcm_mode: AudioPcmMode::Linear,
bits_per_sample: 16,
frames_per_second: 48000,
channel_map: vec![AudioChannelId::Lf],
};
EncodedStream::test(required_format, &MediaCodecConfig::min_sbc()).await
}
/// Handles role change requests from serving AudioMode
fn handle_audio_mode_connection(
peers: Arc<Mutex<ConnectedPeers>>,
mut stream: AudioModeRequestStream,
) {
fasync::Task::spawn(async move {
info!("AudioMode Client connected");
while let Some(request) = stream.next().await {
match request {
Err(e) => info!("AudioMode client connection error: {}", e),
Ok(AudioModeRequest::SetRole { role, responder }) => {
// We want to be `role` so we prefer to start streams of the opposite direction.
let direction = match role {
Role::Source => avdtp::EndpointType::Sink,
Role::Sink => avdtp::EndpointType::Source,
};
info!("Setting AudioMode to {:?}", role);
peers.lock().set_preferred_direction(direction);
if let Err(e) = responder.send() {
warn!("Failed to respond to mode request: {}", e);
}
}
}
}
})
.detach();
}
#[fasync::run_singlethreaded]
async fn main() -> Result<(), Error> {
let opts: Opt = argh::from_env();
fuchsia_syslog::init_with_tags(&["a2dp"]).expect("Can't init logger");
fuchsia_trace_provider::trace_provider_create_with_fdio();
let signaling_channel_mode = channel_mode_from_arg(opts.channel_mode)?;
// Check to see that we can encode SBC audio.
// This is a requireement of A2DP 1.3: Section 4.2
if let Err(e) = test_encode_sbc().await {
error!("Can't encode SBC Audio: {:?}", e);
return Ok(());
}
let controller_pool = Arc::new(ControllerPool::new());
let mut fs = ServiceFs::new();
let inspect = inspect::Inspector::new();
inspect.serve(&mut fs)?;
let abs_vol_relay = volume_relay::VolumeRelay::start();
if let Err(e) = &abs_vol_relay {
info!("Failed to start AbsoluteVolume Relay: {:?}", e);
}
let cobalt_logger: CobaltSender = {
let (sender, reporter) =
CobaltConnector::default().serve(ConnectionType::project_id(metrics::PROJECT_ID));
fasync::Task::spawn(reporter).detach();
sender
};
let stream_builder =
StreamsBuilder::system_available(cobalt_logger.clone(), opts.domain, opts.source).await?;
let profile_svc = fuchsia_component::client::connect_to_service::<bredr::ProfileMarker>()
.context("Failed to connect to Bluetooth Profile service")?;
let mut peers = ConnectedPeers::new(
stream_builder.streams()?,
stream_builder.negotiation(),
profile_svc.clone(),
Some(cobalt_logger.clone()),
);
if let Err(e) = peers.iattach(&inspect.root(), "connected") {
warn!("Failed to attach to inspect: {:?}", e);
}
let peers_connected_stream = peers.connected_stream();
let _controller_pool_connected_task = fasync::Task::spawn({
let pool = controller_pool.clone();
peers_connected_stream.map(move |p| pool.peer_connected(p)).collect::<()>()
});
// TODO(fxbug.dev/66124): launch the target component based on the A2DP source availability
let _avrcp_target = avrcp_target::launch().await.or_else(|e| {
warn!("Couldn't launch AVRCP target: {}", e);
Err(e)
});
let peers = Arc::new(Mutex::new(peers));
fs.dir("svc").add_fidl_service(move |s| controller_pool.connected(s));
fs.dir("svc").add_fidl_service({
let peers = peers.clone();
move |s| handle_audio_mode_connection(peers.clone(), s)
});
if let Err(e) = fs.take_and_serve_directory_handle() {
warn!("Unable to serve service directory: {}", e);
}
let _servicefs_task = fasync::Task::spawn(fs.collect::<()>());
let source_uuid = Uuid::new16(bredr::ServiceClassProfileIdentifier::AudioSource as u16);
let sink_uuid = Uuid::new16(bredr::ServiceClassProfileIdentifier::AudioSink as u16);
let service_defs = vec![
make_profile_service_definition(source_uuid),
make_profile_service_definition(sink_uuid),
];
let (connect_client, connect_requests) =
create_request_stream().context("ConnectionReceiver creation")?;
let _ = profile_svc
.advertise(
&mut service_defs.into_iter(),
bredr::ChannelParameters {
channel_mode: Some(signaling_channel_mode),
..bredr::ChannelParameters::EMPTY
},
connect_client,
)
.check()?;
const ATTRS: [u16; 4] = [
bredr::ATTR_PROTOCOL_DESCRIPTOR_LIST,
bredr::ATTR_SERVICE_CLASS_ID_LIST,
bredr::ATTR_BLUETOOTH_PROFILE_DESCRIPTOR_LIST,
ATTR_A2DP_SUPPORTED_FEATURES,
];
let (results_client, source_results) =
create_request_stream().context("make request stream")?;
profile_svc.search(
bredr::ServiceClassProfileIdentifier::AudioSource,
&ATTRS,
results_client,
)?;
let (results_client, sink_results) = create_request_stream().context("make request stream")?;
profile_svc.search(bredr::ServiceClassProfileIdentifier::AudioSink, &ATTRS, results_client)?;
handle_profile_events(
peers,
profile_svc,
signaling_channel_mode,
connect_requests,
source_results,
sink_results,
)
.await
}
async fn handle_profile_events(
peers: Arc<Mutex<ConnectedPeers>>,
profile_svc: bredr::ProfileProxy,
channel_mode: bredr::ChannelMode,
mut connect_requests: bredr::ConnectionReceiverRequestStream,
mut source_results: bredr::SearchResultsRequestStream,
mut sink_results: bredr::SearchResultsRequestStream,
) -> Result<(), Error> {
loop {
select! {
connect_request = connect_requests.try_next() => {
let request = connect_request?.ok_or(format_err!("BR/EDR ended service registration"))?;
match request {
bredr::ConnectionReceiverRequest::Connected { peer_id, channel, .. } => {
let peer_id = peer_id.into();
let channel: Channel = channel.try_into()?;
info!("Connection from {}: mode {} max_tx {}", peer_id, channel.channel_mode(), channel.max_tx_size());
if let Err(e) = peers.lock().connected(peer_id, channel, /* initiator= */ false) {
warn!("Problem accepting peer connection: {}", e);
}
}
x => info!("Unrecognized connection request: {:?}", x),
};
}
source_result = source_results.try_next() => {
let result = source_result?.ok_or(format_err!("BR/EDR ended source service search"))?;
match result {
bredr::SearchResultsRequest::ServiceFound { peer_id, protocol, attributes, responder } => {
handle_services_found(
&peer_id.into(),
&attributes,
peers.clone(),
profile_svc.clone(),
channel_mode.clone());
responder.send()?;
}
x => info!("Unhandled Search Result: {:?}", x),
};
}
sink_result = sink_results.try_next() => {
let result = sink_result?.ok_or(format_err!("BR/EDR ended sink service search"))?;
match result {
bredr::SearchResultsRequest::ServiceFound { peer_id, protocol, attributes, responder } => {
handle_services_found(
&peer_id.into(),
&attributes,
peers.clone(),
profile_svc.clone(),
channel_mode.clone());
responder.send()?;
}
x => info!("Unhandled Search Result: {:?}", x),
};
}
complete => break,
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use async_utils::PollExt;
use fidl::endpoints::create_proxy_and_stream;
use fidl_fuchsia_bluetooth_a2dp as a2dp;
use fidl_fuchsia_bluetooth_bredr::{
ConnectionReceiverMarker, ProfileRequest, ProfileRequestStream, SearchResultsMarker,
};
use fidl_fuchsia_cobalt::CobaltEvent;
use fuchsia_bluetooth::types::PeerId;
use futures::channel::mpsc;
use futures::{pin_mut, task::Poll, StreamExt};
use matches::assert_matches;
pub(crate) fn fake_cobalt_sender() -> (CobaltSender, mpsc::Receiver<CobaltEvent>) {
const BUFFER_SIZE: usize = 100;
let (sender, receiver) = mpsc::channel(BUFFER_SIZE);
(CobaltSender::new(sender), receiver)
}
fn run_to_stalled(exec: &mut fasync::Executor) {
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
}
fn setup_connected_peers(
) -> (Arc<Mutex<ConnectedPeers>>, bredr::ProfileProxy, ProfileRequestStream) {
let (proxy, stream) = create_proxy_and_stream::<bredr::ProfileMarker>()
.expect("Profile proxy should be created");
let (cobalt_sender, _) = fake_cobalt_sender();
let peers = Arc::new(Mutex::new(ConnectedPeers::new(
stream::Streams::new(),
CodecNegotiation::build(vec![], avdtp::EndpointType::Sink).unwrap(),
proxy.clone(),
Some(cobalt_sender),
)));
(peers, proxy, stream)
}
#[test]
fn test_responds_to_search_results() {
let mut exec = fasync::Executor::new().expect("executor should build");
let (peers, profile_proxy, _profile_stream) = setup_connected_peers();
let (sink_results_proxy, sink_results_stream) =
create_proxy_and_stream::<SearchResultsMarker>()
.expect("SearchResults proxy should be created");
let (source_results_proxy, source_results_stream) =
create_proxy_and_stream::<SearchResultsMarker>()
.expect("SearchResults proxy should be created");
let (_connect_proxy, connect_stream) =
create_proxy_and_stream::<ConnectionReceiverMarker>()
.expect("ConnectionReceiver proxy should be created");
let handler_fut = handle_profile_events(
peers,
profile_proxy,
bredr::ChannelMode::Basic,
connect_stream,
source_results_stream,
sink_results_stream,
);
pin_mut!(handler_fut);
let res = exec.run_until_stalled(&mut handler_fut);
assert!(res.is_pending());
// Report a sink search result, which should be replied to.
let mut attributes = vec![];
let results_fut = sink_results_proxy.service_found(
&mut PeerId(1).into(),
None,
&mut attributes.iter_mut(),
);
pin_mut!(results_fut);
assert!(exec.run_until_stalled(&mut handler_fut).is_pending());
match exec.run_until_stalled(&mut results_fut) {
Poll::Ready(Ok(())) => {}
x => panic!("Expected a response from the result, got {:?}", x),
};
// Report a source search result, which should be replied to.
let mut attributes = vec![];
let results_fut = source_results_proxy.service_found(
&mut PeerId(2).into(),
None,
&mut attributes.iter_mut(),
);
pin_mut!(results_fut);
assert!(exec.run_until_stalled(&mut handler_fut).is_pending());
match exec.run_until_stalled(&mut results_fut) {
Poll::Ready(Ok(())) => {}
x => panic!("Expected a response from the result, got {:?}", x),
};
// Handler should finish when one of the results is disconnected.
drop(sink_results_proxy);
let res = exec.run_until_stalled(&mut handler_fut).expect("handler should have finished");
// Ending service search is an error condition for the handler.
assert_matches!(res, Err(_));
}
#[test]
/// build_local_streams should fail because it can't start the SBC encoder, because
/// MediaPlayer isn't available in the test environment.
fn test_sbc_unavailable_error() {
let mut exec = fasync::Executor::new().expect("executor should build");
let (sender, _) = fake_cobalt_sender();
let mut streams_fut =
Box::pin(StreamsBuilder::system_available(sender, None, AudioSourceType::BigBen));
let streams = exec.run_singlethreaded(&mut streams_fut);
assert!(streams.is_err(), "Stream building should fail when it can't reach MediaPlayer");
}
#[test]
/// Tests that A2DP sink assumes the initiator role when a peer is found, but
/// not connected, and the timeout completes.
fn wait_to_initiate_success_with_no_connected_peer() {
let mut exec = fasync::Executor::new_with_fake_time().expect("executor should build");
let (peers, proxy, mut prof_stream) = setup_connected_peers();
// Initialize context to a fixed point in time.
exec.set_fake_time(fasync::Time::from_nanos(1000000000));
let peer_id = PeerId(1);
// Simulate getting the service found event.
let attributes = vec![bredr::Attribute {
id: bredr::ATTR_BLUETOOTH_PROFILE_DESCRIPTOR_LIST,
element: bredr::DataElement::Sequence(vec![Some(Box::new(
bredr::DataElement::Sequence(vec![
Some(Box::new(
Uuid::from(bredr::ServiceClassProfileIdentifier::AudioSource).into(),
)),
Some(Box::new(bredr::DataElement::Uint16(0x0103))), // Version 1.3
]),
))]),
}];
handle_services_found(
&peer_id,
&attributes,
peers.clone(),
proxy.clone(),
bredr::ChannelMode::Basic,
);
run_to_stalled(&mut exec);
// At this point, a remote peer was found, but hasn't connected yet. There
// should be no entry for it.
assert!(!peers.lock().is_connected(&peer_id));
// Fast forward time by 5 seconds. In this time, the remote peer has not
// connected.
exec.set_fake_time(fasync::Time::from_nanos(6000000000));
exec.wake_expired_timers();
run_to_stalled(&mut exec);
// After fast forwarding time, expect and handle the `connect` request
// because A2DP-sink should be initiating.
let (_test, transport) = Channel::create();
let request = exec.run_until_stalled(&mut prof_stream.next());
match request {
Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, responder, .. }))) => {
assert_eq!(PeerId(1), peer_id.into());
let channel = transport.try_into().unwrap();
responder.send(&mut Ok(channel)).expect("responder sends");
}
x => panic!("Should have sent a connect request, but got {:?}", x),
};
run_to_stalled(&mut exec);
// The remote peer did not connect to us, A2DP Sink should initiate a connection
// and insert into `peers`.
assert!(peers.lock().is_connected(&peer_id));
}
#[test]
/// Tests that A2DP sink does not assume the initiator role when a peer connects
/// before `INITIATOR_DELAY` timeout completes.
fn wait_to_initiate_returns_early_with_connected_peer() {
let mut exec = fasync::Executor::new_with_fake_time().expect("executor should build");
let (peers, proxy, mut prof_stream) = setup_connected_peers();
// Initialize context to a fixed point in time.
exec.set_fake_time(fasync::Time::from_nanos(1000000000));
let peer_id = PeerId(1);
// Simulate getting the service found event.
let attributes = vec![bredr::Attribute {
id: bredr::ATTR_BLUETOOTH_PROFILE_DESCRIPTOR_LIST,
element: bredr::DataElement::Sequence(vec![Some(Box::new(
bredr::DataElement::Sequence(vec![
Some(Box::new(
Uuid::from(bredr::ServiceClassProfileIdentifier::AudioSource).into(),
)),
Some(Box::new(bredr::DataElement::Uint16(0x0103))), // Version 1.3
]),
))]),
}];
handle_services_found(
&peer_id,
&attributes,
peers.clone(),
proxy.clone(),
bredr::ChannelMode::Basic,
);
// At this point, a remote peer was found, but hasn't connected yet. There
// should be no entry for it.
assert!(!peers.lock().is_connected(&peer_id));
// Fast forward time by .5 seconds. The threshold is 1 second, so the timer
// to initiate connections has not been triggered.
exec.set_fake_time(fasync::Time::from_nanos(1500000000));
exec.wake_expired_timers();
run_to_stalled(&mut exec);
// A peer connects before the timeout.
let (_remote, transport) = Channel::create();
let _ = peers.lock().connected(peer_id.clone(), transport, /* initiator= */ false);
run_to_stalled(&mut exec);
// The remote peer connected to us, and should be in the map.
assert!(peers.lock().is_connected(&peer_id));
// Fast forward time by 4.5 seconds. Ensure no outbound connection is initiated
// by us, since the remote peer has assumed the INT role.
exec.set_fake_time(fasync::Time::from_nanos(6000000000));
exec.wake_expired_timers();
run_to_stalled(&mut exec);
let request = exec.run_until_stalled(&mut prof_stream.next());
match request {
Poll::Ready(x) => panic!("There should be no l2cap connection requests: {:?}", x),
Poll::Pending => {}
};
run_to_stalled(&mut exec);
}
#[cfg(not(feature = "test_encoding"))]
#[test]
fn test_encoding_fails_in_test_environment() {
let mut exec = fasync::Executor::new().expect("executor should build");
let result = exec.run_singlethreaded(test_encode_sbc());
assert!(result.is_err());
}
#[test]
fn test_channel_mode_from_arg() {
let channel_string = None;
assert_matches!(channel_mode_from_arg(channel_string), Ok(bredr::ChannelMode::Basic));
let channel_string = Some("basic".to_string());
assert_matches!(channel_mode_from_arg(channel_string), Ok(bredr::ChannelMode::Basic));
let channel_string = Some("ertm".to_string());
assert_matches!(
channel_mode_from_arg(channel_string),
Ok(bredr::ChannelMode::EnhancedRetransmission)
);
let channel_string = Some("foobar123".to_string());
assert!(channel_mode_from_arg(channel_string).is_err());
}
#[test]
fn test_audio_mode_connection() {
let mut exec = fasync::Executor::new().expect("executor should build");
let (peers, _profile_proxy, _profile_stream) = setup_connected_peers();
let (proxy, stream) = create_proxy_and_stream::<a2dp::AudioModeMarker>()
.expect("AudioMode proxy should be created");
handle_audio_mode_connection(peers.clone(), stream);
exec.run_singlethreaded(proxy.set_role(a2dp::Role::Sink)).expect("set role response");
assert_eq!(avdtp::EndpointType::Source, peers.lock().preferred_direction());
exec.run_singlethreaded(proxy.set_role(a2dp::Role::Source)).expect("set role response");
assert_eq!(avdtp::EndpointType::Sink, peers.lock().preferred_direction());
}
}