blob: 04fcb37957c1e2475dd119f5a4ddfeec8501cbb3 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{format_err, Error};
use bt_rfcomm::{profile::build_rfcomm_protocol, ServerChannel};
use fidl::prelude::*;
use fidl_fuchsia_bluetooth::ErrorCode;
use fidl_fuchsia_bluetooth_bredr as bredr;
use fidl_fuchsia_bluetooth_rfcomm_test::RfcommTestRequest;
use fuchsia_async as fasync;
use fuchsia_bluetooth::detachable_map::DetachableMap;
use fuchsia_bluetooth::types::{Channel, PeerId};
use fuchsia_inspect as inspect;
use fuchsia_inspect_derive::{AttachError, Inspect};
use futures::{lock::Mutex, FutureExt};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tracing::{debug, info, trace, warn};
use crate::rfcomm::session::Session;
use crate::rfcomm::types::{status_to_rls_error, SignaledTask};
/// An RFCOMM client that is registered with the RFCOMM server.
struct RegisteredClient {
/// The proxy used to relay RFCOMM channels to the client.
connection_receiver: bredr::ConnectionReceiverProxy,
/// The channel number associated with the registration.
_channel_number: inspect::UintProperty,
}
/// Manages the current clients of the RFCOMM server. Provides an API for registering,
/// unregistering, and relaying RFCOMM channels to clients.
#[derive(Clone, Inspect)]
pub struct Clients {
#[inspect(forward)]
inner: Arc<Mutex<ClientsInner>>,
}
#[derive(Inspect)]
pub struct ClientsInner {
/// The currently registered clients. Each registered client is identified by a unique
/// ServerChannel and can be inspected.
#[inspect(skip)]
channel_receivers: HashMap<ServerChannel, RegisteredClient>,
/// The inspect node for the set of clients.
inspect_node: inspect::Node,
}
impl Clients {
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(ClientsInner {
channel_receivers: HashMap::new(),
inspect_node: inspect::Node::default(),
})),
}
}
/// Returns the number of available spaces for clients that can be registered.
async fn available_space(&self) -> usize {
let inner = self.inner.lock().await;
ServerChannel::all().filter(|sc| !inner.channel_receivers.contains_key(&sc)).count()
}
/// Removes the client that has registered `server_channel`.
async fn remove(&self, server_channel: &ServerChannel) {
let _ = self.inner.lock().await.channel_receivers.remove(server_channel);
}
/// Clears all the registered clients.
async fn clear(&self) {
self.inner.lock().await.channel_receivers.clear();
}
/// Reserves the next available ServerChannel for a client represented by a `proxy`.
///
/// If allocated, returns the ServerChannel assigned to the client, None otherwise.
pub async fn new_client(&self, proxy: bredr::ConnectionReceiverProxy) -> Option<ServerChannel> {
let mut inner = self.inner.lock().await;
let new_channel =
ServerChannel::all().find(|sc| !inner.channel_receivers.contains_key(&sc));
new_channel.map(|channel| {
trace!(server_channel = %channel, "Reserving RFCOMM channel");
let tagged_client = RegisteredClient {
connection_receiver: proxy,
_channel_number: inner
.inspect_node
.create_uint(inspect::unique_name("channel_number"), u8::from(channel) as u64),
};
let _ = inner.channel_receivers.insert(channel, tagged_client);
channel
})
}
/// Delivers the `channel` to the client that has registered the `server_channel`.
/// Returns an error if delivery fails or if there is no such client.
pub async fn deliver_channel(
&self,
peer_id: PeerId,
server_channel: ServerChannel,
channel: Channel,
) -> Result<(), Error> {
trace!(%peer_id, %server_channel, "Delivering RFCOMM channel to client");
let inner = self.inner.lock().await;
let client = inner
.channel_receivers
.get(&server_channel)
.ok_or(format_err!("{server_channel:?} not registered"))?;
// Build the RFCOMM protocol descriptor and relay the channel.
let protocol: Vec<bredr::ProtocolDescriptor> =
build_rfcomm_protocol(server_channel).iter().map(Into::into).collect();
if client.connection_receiver.is_closed() {
return Err(format_err!("connection receiver peer closed"));
}
client
.connection_receiver
.connected(&peer_id.into(), bredr::Channel::try_from(channel).unwrap(), &protocol)
.map_err(|e| format_err!("{e:?}"))
}
}
/// The RfcommServer handles connection requests from profiles clients and remote peers.
pub struct RfcommServer {
/// The currently registered profile clients of the RFCOMM server.
clients: Clients,
/// Active sessions between us and a remote peer. Each Session will multiplex
/// RFCOMM connections over a single L2CAP channel.
/// There can only be one session per remote peer. See RFCOMM Section 5.2.
sessions: DetachableMap<PeerId, Session>,
/// Inspect node for Sessions to attach to.
inspect: inspect::Node,
}
impl Inspect for &mut RfcommServer {
fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
self.inspect = parent.create_child(name.as_ref());
self.clients.iattach(&self.inspect, "advertised_channels")?;
Ok(())
}
}
impl RfcommServer {
pub fn new() -> Self {
Self {
clients: Clients::new(),
sessions: DetachableMap::new(),
inspect: inspect::Node::default(),
}
}
/// Returns true if a session identified by `id` exists and is currently
/// active.
/// An RFCOMM Session is active if there is a currently running processing task.
pub fn is_active_session(&mut self, id: &PeerId) -> bool {
self.sessions.get(id).is_some_and(|session| session.upgrade().is_some())
}
/// Returns the number of available server channels in this server.
pub async fn available_server_channels(&self) -> usize {
self.clients.available_space().await
}
/// De-allocates the server `channels` provided.
pub async fn free_server_channels(&mut self, channels: &HashSet<ServerChannel>) {
for sc in channels {
self.clients.remove(sc).await;
}
}
/// De-allocates all the server channels in this server.
pub async fn free_all_server_channels(&mut self) {
self.clients.clear().await;
}
/// Reserves the next available ServerChannel for a client's `proxy`.
///
/// Returns the allocated ServerChannel.
pub async fn allocate_server_channel(
&mut self,
proxy: bredr::ConnectionReceiverProxy,
) -> Option<ServerChannel> {
self.clients.new_client(proxy).await
}
/// Opens an RFCOMM channel specified by `server_channel` with the remote peer.
///
/// Returns an error if there is no session established with the peer.
pub async fn open_rfcomm_channel(
&mut self,
peer_id: PeerId,
server_channel: ServerChannel,
responder: bredr::ProfileConnectResponder,
) -> Result<(), Error> {
trace!(%peer_id, %server_channel, "Opening RFCOMM channel");
match self.sessions.get(&peer_id).and_then(|s| s.upgrade()) {
None => {
// Peer either disconnected or doesn't exist.
let _ = responder.send(Err(ErrorCode::Failed));
Err(format_err!("Invalid peer ID {peer_id}"))
}
Some(session) => {
let channel_opened_callback =
Box::new(move |channel: Result<Channel, ErrorCode>| {
let channel = channel.map(|c| bredr::Channel::try_from(c).unwrap());
responder.send(channel).map_err(|e| format_err!("{e:?}"))
});
session.open_rfcomm_channel(server_channel, channel_opened_callback).await;
Ok(())
}
}
}
/// Handles the establishment of a new L2CAP connection.
///
/// Creates and stores a new RFCOMM Session for the provided `l2cap` channel.
/// Returns Error if an active session already exists for the peer `id`.
pub fn new_l2cap_connection(&mut self, peer_id: PeerId, l2cap: Channel) -> Result<(), Error> {
if self.is_active_session(&peer_id) {
return Err(format_err!("RFCOMM Session already exists with peer {peer_id}"));
}
info!(%peer_id, "Handling new L2CAP connection for the RFCOMM PSM");
// Create a new RFCOMM Session with the provided `channel_opened_callback` which will be
// called anytime an RFCOMM channel is created. Opened RFCOMM channels will be delivered
// to the `clients` of the `RfcommServer`.
let clients = self.clients.clone();
let channel_opened_callback = Box::new(move |server_channel, channel| {
let peer_id = peer_id;
let clients = clients.clone();
async move { clients.deliver_channel(peer_id, server_channel, channel).await }.boxed()
});
let mut session = Session::create(peer_id, l2cap, channel_opened_callback);
let _ = session.iattach(&self.inspect, inspect::unique_name("peer_"));
let closed_fut = session.finished();
if self.sessions.insert(peer_id, session).is_some() {
debug!(%peer_id, "Overwriting existing RFCOMM session");
}
// Task eagerly removes the Session from the set of active sessions upon termination.
let detached_session = self.sessions.get(&peer_id).expect("just inserted");
fasync::Task::spawn(async move {
let _ = closed_fut.await;
detached_session.detach();
})
.detach();
Ok(())
}
/// Handles a `RfcommTest` FIDL request.
pub async fn handle_test_request(&mut self, request: RfcommTestRequest) {
info!("Received RFCOMM Test request: {:?}", request);
// Note: The test request is a no-op if there is no connected session with the peer.
match request {
RfcommTestRequest::Disconnect { id, .. } => {
let id = id.into();
if let Some(session) = self.sessions.get(&id).and_then(|s| s.upgrade()) {
session.close().await;
}
}
RfcommTestRequest::RemoteLineStatus { id, channel_number, status, .. } => {
let id = id.into();
let server_channel_number = match ServerChannel::try_from(channel_number) {
Ok(sc) => sc,
Err(e) => {
warn!(
"RemoteLineStatus FIDL request with invalid ServerChannel number: {e:?}"
);
return;
}
};
if let Some(session) = self.sessions.get(&id).and_then(|s| s.upgrade()) {
session
.send_remote_line_status(server_channel_number, status_to_rls_error(status))
.await;
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use async_utils::PollExt;
use bt_rfcomm::{frame::mux_commands::*, frame::*, Role, DLCI};
use diagnostics_assertions::assert_data_tree;
use fidl::endpoints::{create_proxy, create_proxy_and_stream};
use fidl_fuchsia_bluetooth_bredr::ConnectionReceiverMarker;
use fuchsia_async as fasync;
use fuchsia_inspect_derive::WithInspect;
use futures::{task::Poll, AsyncWriteExt, StreamExt};
use std::pin::pin;
use crate::rfcomm::test_util::{expect_frame_received_by_peer, send_peer_frame};
fn setup_rfcomm_manager() -> (fasync::TestExecutor, RfcommServer) {
let exec = fasync::TestExecutor::new();
let rfcomm = RfcommServer::new();
(exec, rfcomm)
}
#[fasync::run_singlethreaded(test)]
async fn test_allocate_server_channel() {
let mut rfcomm = RfcommServer::new();
let expected_free_channels = ServerChannel::all().count();
assert_eq!(rfcomm.available_server_channels().await, expected_free_channels);
// Allocating a server channel should be OK.
let (c, _s) = create_proxy::<ConnectionReceiverMarker>().unwrap();
let first_channel =
rfcomm.allocate_server_channel(c.clone()).await.expect("should allocate");
// Allocate the remaining n-1 channels.
let mut n = expected_free_channels - 1;
while n > 0 {
assert!(rfcomm.allocate_server_channel(c.clone()).await.is_some());
n -= 1;
}
// Allocating another should fail.
assert_eq!(rfcomm.available_server_channels().await, 0);
assert!(rfcomm.allocate_server_channel(c.clone()).await.is_none());
// De-allocating should work.
let single_channel = vec![first_channel].into_iter().collect();
rfcomm.free_server_channels(&single_channel).await;
// We should be able to allocate another now that space has freed.
let (c, _s) = create_proxy::<ConnectionReceiverMarker>().unwrap();
assert!(rfcomm.allocate_server_channel(c).await.is_some());
}
#[fuchsia::test]
fn test_new_l2cap_connection() {
let (mut exec, mut rfcomm) = setup_rfcomm_manager();
let id = PeerId(123);
let (mut remote, channel) = Channel::create();
assert!(rfcomm.new_l2cap_connection(id, channel).is_ok());
// The Session should still be active.
assert!(rfcomm.is_active_session(&id));
// Simulate peer sending RFCOMM data to the session - should be OK.
let buf = [0x00, 0x00, 0x00];
let mut write_fut = remote.write(&buf[..]);
match exec.run_until_stalled(&mut write_fut) {
Poll::Ready(Ok(x)) => {
assert_eq!(x, 3);
}
x => panic!("Expected write ready but got {:?}", x),
}
// Remote peer disconnects - drive the background processing task to detect disconnection.
drop(remote);
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
// The session should be inactive now.
assert!(!rfcomm.is_active_session(&id));
// Checking again is OK.
assert!(!rfcomm.is_active_session(&id));
}
#[fuchsia::test]
fn test_new_rfcomm_channel_is_relayed_to_client() {
let (mut exec, mut rfcomm) = setup_rfcomm_manager();
// Profile-client reserves a server channel.
let (c, mut s) = create_proxy_and_stream::<ConnectionReceiverMarker>().unwrap();
let first_channel = {
let fut = rfcomm.allocate_server_channel(c.clone());
let mut fut = pin!(fut);
match exec.run_until_stalled(&mut fut) {
Poll::Ready(Some(sc)) => sc,
x => panic!("Expected server channel but got {:?}", x),
}
};
let profile_client_fut = s.next();
let mut profile_client_fut = pin!(profile_client_fut);
exec.run_until_stalled(&mut profile_client_fut)
.expect_pending("waiting for connection request");
// Start up a session with remote peer.
let id = PeerId(1);
let (local, mut remote) = Channel::create();
assert!(rfcomm.new_l2cap_connection(id, local).is_ok());
assert!(rfcomm.is_active_session(&id));
// Remote peer requests to start up session multiplexer.
let sabm = Frame::make_sabm_command(Role::Unassigned, DLCI::MUX_CONTROL_DLCI);
send_peer_frame(remote.as_ref(), sabm);
// Expect to send a positive response to the peer.
expect_frame_received_by_peer(&mut exec, &mut remote);
// Remote peer requests to open an RFCOMM channel.
let user_dlci = first_channel.to_dlci(Role::Responder).unwrap();
let user_sabm = Frame::make_sabm_command(Role::Initiator, user_dlci);
send_peer_frame(remote.as_ref(), user_sabm);
// Expect to send a positive response to the peer.
expect_frame_received_by_peer(&mut exec, &mut remote);
// The Session should open a new RFCOMM channel for the provided `user_dlci`, and
// the Channel should be relayed to the profile client.
let () = match exec.run_until_stalled(&mut profile_client_fut) {
Poll::Ready(Some(Ok(bredr::ConnectionReceiverRequest::Connected { .. }))) => {}
x => panic!("Expected connection but got {:?}", x),
};
}
#[fasync::run_singlethreaded(test)]
async fn test_register_and_deliver_inbound_channel_to_clients() {
let clients = Clients::new();
// Initial capacity is the range of all valid Server Channels (1..30).
let mut expected_space = 30;
assert_eq!(clients.available_space().await, expected_space);
// Attempting to deliver an inbound channel for an unregistered ServerChannel should be
// an error.
let random_server_channel = ServerChannel::try_from(10).unwrap();
let (local, _remote) = Channel::create();
assert!(clients.deliver_channel(PeerId(1), random_server_channel, local).await.is_err());
// Registering a new client should be OK.
let (c, s) = create_proxy_and_stream::<bredr::ConnectionReceiverMarker>().unwrap();
let server_channel = clients.new_client(c).await.unwrap();
expected_space -= 1;
assert_eq!(clients.available_space().await, expected_space);
// Delivering channel to registered client should be OK.
let (local, _remote) = Channel::create();
assert!(clients.deliver_channel(PeerId(1), server_channel, local).await.is_ok());
// Client disconnects - delivering a new channel should fail.
drop(s);
let (local, _remote) = Channel::create();
assert!(clients.deliver_channel(PeerId(1), server_channel, local).await.is_err());
}
#[fasync::run_singlethreaded(test)]
async fn clients_inspect_tree() {
let inspect = inspect::Inspector::default();
let clients = Clients::new()
.with_inspect(inspect.root(), "advertised_channels")
.expect("valid inspect tree");
// Default inspect tree.
assert_data_tree!(inspect, root: {
advertised_channels: {
}
});
// New client is ok.
let (c1, _s1) = create_proxy_and_stream::<bredr::ConnectionReceiverMarker>().unwrap();
let ch_number1 = clients.new_client(c1).await.expect("valid client");
let ch_number_raw1 = u8::from(ch_number1) as u64;
assert_data_tree!(inspect, root: {
advertised_channels: {
channel_number0: ch_number_raw1,
}
});
// Multiple clients is ok.
let (c2, _s2) = create_proxy_and_stream::<bredr::ConnectionReceiverMarker>().unwrap();
let ch_number2 = clients.new_client(c2).await.expect("valid client");
let ch_number_raw2 = u8::from(ch_number2) as u64;
assert_data_tree!(inspect, root: {
advertised_channels: {
channel_number0: ch_number_raw1,
channel_number1: ch_number_raw2,
}
});
// Removing a client should result in an updated inspect tree.
clients.remove(&ch_number1).await;
assert_data_tree!(inspect, root: {
advertised_channels: {
channel_number1: ch_number_raw2,
}
});
}
/// Makes a client Profile::Connect() request and returns the responder for the request
/// and a Future associated with the request.
#[track_caller]
fn make_client_connect_request(
exec: &mut fasync::TestExecutor,
id: PeerId,
) -> (
bredr::ProfileConnectResponder,
fidl::client::QueryResponseFut<Result<bredr::Channel, ErrorCode>>,
) {
let (profile, mut profile_server) =
create_proxy_and_stream::<bredr::ProfileMarker>().unwrap();
let mut profile_stream = Box::pin(profile_server.next());
let connect_request = profile.connect(
&id.into(),
&bredr::ConnectParameters::L2cap(bredr::L2capParameters::default()),
);
let responder = match exec.run_until_stalled(&mut profile_stream) {
Poll::Ready(Some(Ok(bredr::ProfileRequest::Connect { responder, .. }))) => responder,
x => panic!("Expected ready connect request but got: {:?}", x),
};
(responder, connect_request)
}
#[fuchsia::test]
fn test_request_outbound_connection_succeeds() {
let (mut exec, mut rfcomm) = setup_rfcomm_manager();
// Start up a session with remote peer.
let id = PeerId(1);
let (local, mut remote) = Channel::create();
assert!(rfcomm.new_l2cap_connection(id, local).is_ok());
// Simulate a client connect request.
let (responder, connect_request_fut) = make_client_connect_request(&mut exec, id);
let mut connect_request_fut = pin!(connect_request_fut);
exec.run_until_stalled(&mut connect_request_fut).expect_pending("waiting for channel");
// We expect the open channel request to be OK - still awaiting the channel.
let server_channel = ServerChannel::try_from(9).unwrap();
let expected_dlci = server_channel.to_dlci(Role::Responder).unwrap();
let mut outbound_fut = Box::pin(rfcomm.open_rfcomm_channel(id, server_channel, responder));
assert_matches!(exec.run_until_stalled(&mut outbound_fut), Poll::Ready(Ok(_)));
exec.run_until_stalled(&mut connect_request_fut).expect_pending("waiting for channel");
// Expect to send a frame to the peer - SABM for mux startup.
expect_frame_received_by_peer(&mut exec, &mut remote);
// Simulate peer responding positively.
let ua = Frame::make_ua_response(Role::Unassigned, DLCI::MUX_CONTROL_DLCI);
send_peer_frame(remote.as_ref(), ua);
// Expect to send a frame to peer - parameter negotiation.
expect_frame_received_by_peer(&mut exec, &mut remote);
// Simulate peer responding positively.
let data = MuxCommand {
params: MuxCommandParams::ParameterNegotiation(ParameterNegotiationParams {
dlci: expected_dlci,
credit_based_flow_handshake: CreditBasedFlowHandshake::SupportedResponse,
priority: 12,
max_frame_size: 100,
initial_credits: 1,
}),
command_response: CommandResponse::Response,
};
let pn_response = Frame::make_mux_command(Role::Responder, data);
send_peer_frame(remote.as_ref(), pn_response);
// Expect to send a frame to peer - SABM for channel opening.
expect_frame_received_by_peer(&mut exec, &mut remote);
// Simulate peer responding positively.
let ua = Frame::make_ua_response(Role::Responder, expected_dlci);
send_peer_frame(remote.as_ref(), ua);
// The channel should be established and relayed to the client that requested it.
assert_matches!(exec.run_until_stalled(&mut connect_request_fut), Poll::Ready(Ok(Ok(_))));
}
#[fuchsia::test]
fn test_request_outbound_connection_invalid_peer() {
let (mut exec, mut rfcomm) = setup_rfcomm_manager();
// Simulate a client connect request.
let random_id = PeerId(41);
let (responder, connect_request_fut) = make_client_connect_request(&mut exec, random_id);
let mut connect_request_fut = pin!(connect_request_fut);
exec.run_until_stalled(&mut connect_request_fut).expect_pending("waiting for channel");
// We expect the open channel request to fail.
let server_channel = ServerChannel::try_from(8).unwrap();
let mut outbound_fut =
Box::pin(rfcomm.open_rfcomm_channel(random_id, server_channel, responder));
assert_matches!(exec.run_until_stalled(&mut outbound_fut), Poll::Ready(Err(_)));
// Responder should be notified of failure.
assert_matches!(exec.run_until_stalled(&mut connect_request_fut), Poll::Ready(Ok(Err(_))));
}
}