blob: 5250fb9afa1b7a2eee0d8d529af4dceea3a6344b [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
bt_a2dp_sink_metrics as metrics, bt_avdtp as avdtp,
fidl_fuchsia_bluetooth_bredr::ProfileDescriptor,
fuchsia_async as fasync,
fuchsia_bluetooth::{detachable_map::DetachableMap, types::PeerId},
fuchsia_cobalt::CobaltSender,
fuchsia_inspect as inspect,
fuchsia_syslog::{fx_log_info, fx_log_warn},
fuchsia_zircon as zx,
parking_lot::RwLock,
std::{
collections::hash_map::Entry,
collections::{HashMap, HashSet},
sync::Arc,
},
};
use crate::{avrcp_relay::AvrcpRelay, peer, Streams};
fn codectype_to_availability_metric(
codec_type: avdtp::MediaCodecType,
) -> metrics::A2dpCodecAvailabilityMetricDimensionCodec {
match codec_type {
avdtp::MediaCodecType::AUDIO_SBC => metrics::A2dpCodecAvailabilityMetricDimensionCodec::Sbc,
avdtp::MediaCodecType::AUDIO_MPEG12 => {
metrics::A2dpCodecAvailabilityMetricDimensionCodec::Mpeg12
}
avdtp::MediaCodecType::AUDIO_AAC => metrics::A2dpCodecAvailabilityMetricDimensionCodec::Aac,
avdtp::MediaCodecType::AUDIO_ATRAC => {
metrics::A2dpCodecAvailabilityMetricDimensionCodec::Atrac
}
avdtp::MediaCodecType::AUDIO_NON_A2DP => {
metrics::A2dpCodecAvailabilityMetricDimensionCodec::VendorSpecific
}
_ => metrics::A2dpCodecAvailabilityMetricDimensionCodec::Unknown,
}
}
fn spawn_stream_discovery(peer: &peer::Peer) {
let collect_fut = peer.collect_capabilities();
let remote_capabilities_inspect = peer.remote_capabilities_inspect();
let mut cobalt = peer.cobalt_logger();
let discover_fut = async move {
// Store deduplicated set of codec event codes for logging.
let mut codec_event_codes = HashSet::new();
let streams = match collect_fut.await {
Ok(streams) => streams,
Err(e) => {
fx_log_info!("Collecting capabilities failed: {:?}", e);
return;
}
};
for stream in streams {
let capabilities = stream.capabilities();
remote_capabilities_inspect.append(stream.local_id(), &capabilities).await;
for cap in capabilities {
if let avdtp::ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type,
..
} = cap
{
codec_event_codes
.insert(codectype_to_availability_metric(codec_type.clone()) as u32);
}
}
}
for event_code in codec_event_codes {
cobalt.log_event(metrics::A2DP_CODEC_AVAILABILITY_METRIC_ID, event_code);
}
};
fasync::spawn(discover_fut);
}
/// ConnectedPeers owns the set of connected peers and manages peers based on
/// discovery, connections and disconnections.
pub struct ConnectedPeers {
/// The set of connected peers.
connected: DetachableMap<PeerId, RwLock<peer::Peer>>,
/// ProfileDescriptors from discovering the peer.
descriptors: HashMap<PeerId, Option<ProfileDescriptor>>,
/// The set of streams that are made available to peers.
streams: Streams,
/// Cobalt logger to use and hand out to peers
cobalt_sender: CobaltSender,
}
impl ConnectedPeers {
pub(crate) fn new(streams: Streams, cobalt_sender: CobaltSender) -> Self {
Self {
connected: DetachableMap::new(),
descriptors: HashMap::new(),
streams,
cobalt_sender,
}
}
pub(crate) fn get(&self, id: &PeerId) -> Option<Arc<RwLock<peer::Peer>>> {
self.connected.get(id).and_then(|p| p.upgrade())
}
pub fn found(&mut self, id: PeerId, desc: ProfileDescriptor) {
if self.descriptors.insert(id, Some(desc)).is_some() {
// We have maybe connected to this peer before, and we just need to
// discover the streams.
if let Some(peer) = self.get(&id) {
peer.write().set_descriptor(desc.clone());
spawn_stream_discovery(&peer.read());
}
}
}
pub fn connected(&mut self, inspect: &inspect::Inspector, id: PeerId, channel: zx::Socket) {
match self.get(&id) {
Some(peer) => {
if let Err(e) = peer.write().receive_channel(channel) {
fx_log_warn!("{} failed to connect channel: {}", id, e);
}
}
None => {
fx_log_info!("Adding new peer for {}", id);
let avdtp_peer = match avdtp::Peer::new(channel) {
Ok(peer) => peer,
Err(e) => {
fx_log_warn!("Error adding signaling peer {}: {:?}", id, e);
return;
}
};
let inspect = inspect.root().create_child(format!("peer {}", id));
let mut peer = peer::Peer::create(
id,
avdtp_peer,
self.streams.clone(),
inspect,
self.cobalt_sender.clone(),
);
// Start remote discovery if profile information exists for the device_id
match self.descriptors.entry(id) {
Entry::Occupied(entry) => {
if let Some(prof) = entry.get() {
peer.set_descriptor(prof.clone());
spawn_stream_discovery(&peer);
}
}
// Otherwise just insert the device ID with no profile
// Run discovery when profile is updated
Entry::Vacant(entry) => {
entry.insert(None);
}
}
let closed_fut = peer.closed();
self.connected.insert(id, RwLock::new(peer));
let avrcp_relay = AvrcpRelay::start(id).ok();
// Remove the peer when we disconnect.
let detached_peer = self.connected.get(&id).expect("just added");
fasync::spawn(async move {
closed_fut.await;
detached_peer.detach();
// Captures the relay to extend the lifetime until after the peer clooses.
drop(avrcp_relay);
});
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use bt_avdtp::Request;
use fidl_fuchsia_bluetooth_bredr::ServiceClassProfileIdentifier;
use fidl_fuchsia_cobalt::CobaltEvent;
use futures::channel::mpsc;
use futures::{self, task::Poll, StreamExt};
use std::convert::TryFrom;
fn fake_cobalt_sender() -> (CobaltSender, mpsc::Receiver<CobaltEvent>) {
const BUFFER_SIZE: usize = 100;
let (sender, receiver) = mpsc::channel(BUFFER_SIZE);
(CobaltSender::new(sender), receiver)
}
fn run_to_stalled(exec: &mut fasync::Executor) {
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
}
fn exercise_avdtp(exec: &mut fasync::Executor, remote: zx::Socket, peer: &peer::Peer) {
let remote_avdtp = avdtp::Peer::new(remote).expect("remote control should be creatable");
let mut remote_requests = remote_avdtp.take_request_stream();
// Should be able to actually communicate via the peer.
let avdtp = peer.avdtp_peer();
let discover_fut = avdtp.discover();
futures::pin_mut!(discover_fut);
assert!(exec.run_until_stalled(&mut discover_fut).is_pending());
let responder = match exec.run_until_stalled(&mut remote_requests.next()) {
Poll::Ready(Some(Ok(Request::Discover { responder }))) => responder,
x => panic!("Expected a Ready Discovery request but got {:?}", x),
};
let endpoint_id = avdtp::StreamEndpointId::try_from(1).expect("endpointid creation");
let information = avdtp::StreamInformation::new(
endpoint_id,
false,
avdtp::MediaType::Audio,
avdtp::EndpointType::Source,
);
responder.send(&[information]).expect("Sending response should have worked");
let _stream_infos = match exec.run_until_stalled(&mut discover_fut) {
Poll::Ready(Ok(infos)) => infos,
x => panic!("Expected a Ready response but got {:?}", x),
};
}
#[test]
fn connected_peers_connect_creates_peer() {
let mut exec = fasync::Executor::new().expect("executor should build");
let id = PeerId(1);
let (cobalt_sender, _) = fake_cobalt_sender();
let mut peers = ConnectedPeers::new(Streams::new(), cobalt_sender);
let inspect = inspect::Inspector::new();
let (remote, signaling) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
peers.connected(&inspect, id, signaling);
let peer = match peers.get(&id) {
None => panic!("Peer should be in ConnectedPeers after connection"),
Some(peer) => peer,
};
exercise_avdtp(&mut exec, remote, &peer.read());
}
fn expect_started_discovery(exec: &mut fasync::Executor, remote: zx::Socket) {
let remote_avdtp = avdtp::Peer::new(remote).expect("remote control should be creatable");
let mut remote_requests = remote_avdtp.take_request_stream();
// Start of discovery is by discovering the peer streaminformations.
let _ = match exec.run_until_stalled(&mut remote_requests.next()) {
Poll::Ready(Some(Ok(Request::Discover { responder }))) => responder,
x => panic!("Expected to get a discovery request but got {:?}", x),
};
}
#[test]
fn connected_peers_found_connected_peer_starts_discovery() {
let mut exec = fasync::Executor::new().expect("executor should build");
let id = PeerId(1);
let (cobalt_sender, _) = fake_cobalt_sender();
let mut peers = ConnectedPeers::new(Streams::new(), cobalt_sender);
let inspect = inspect::Inspector::new();
let (remote, signaling) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
peers.connected(&inspect, id, signaling);
let profile_desc = ProfileDescriptor {
profile_id: ServiceClassProfileIdentifier::AdvancedAudioDistribution,
major_version: 1,
minor_version: 3,
};
peers.found(id, profile_desc);
expect_started_discovery(&mut exec, remote);
}
#[test]
fn connected_peers_connected_found_peer_starts_discovery() {
let mut exec = fasync::Executor::new().expect("executor should build");
let id = PeerId(1);
let (cobalt_sender, _) = fake_cobalt_sender();
let mut peers = ConnectedPeers::new(Streams::new(), cobalt_sender);
let inspect = inspect::Inspector::new();
let (remote, signaling) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
let profile_desc = ProfileDescriptor {
profile_id: ServiceClassProfileIdentifier::AdvancedAudioDistribution,
major_version: 1,
minor_version: 3,
};
peers.found(id, profile_desc);
peers.connected(&inspect, id, signaling);
expect_started_discovery(&mut exec, remote);
}
#[test]
fn connected_peers_peer_disconnect_removes_peer() {
let mut exec = fasync::Executor::new().expect("executor should build");
let id = PeerId(1);
let (cobalt_sender, _) = fake_cobalt_sender();
let mut peers = ConnectedPeers::new(Streams::new(), cobalt_sender);
let inspect = inspect::Inspector::new();
let (remote, signaling) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
peers.connected(&inspect, id, signaling);
run_to_stalled(&mut exec);
// Disconnect the signaling channel, peer should be gone.
drop(remote);
run_to_stalled(&mut exec);
assert!(peers.get(&id).is_none());
}
#[test]
fn connected_peers_reconnect_works() {
let mut exec = fasync::Executor::new().expect("executor should build");
let id = PeerId(1);
let (cobalt_sender, _) = fake_cobalt_sender();
let mut peers = ConnectedPeers::new(Streams::new(), cobalt_sender);
let inspect = inspect::Inspector::new();
let (remote, signaling) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
peers.connected(&inspect, id, signaling);
run_to_stalled(&mut exec);
// Disconnect the signaling channel, peer should be gone.
drop(remote);
run_to_stalled(&mut exec);
assert!(peers.get(&id).is_none());
// Connect another peer with the same ID
let (_remote, signaling) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
peers.connected(&inspect, id, signaling);
run_to_stalled(&mut exec);
// Should be connected.
assert!(peers.get(&id).is_some());
}
}