blob: d1aeb0244e1d85de23f78ec03061ec11c9bacccf [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::{format_err, Error},
bt_rfcomm::{profile::build_rfcomm_protocol, ServerChannel},
fidl_fuchsia_bluetooth::ErrorCode,
fidl_fuchsia_bluetooth_bredr as bredr, fuchsia_async as fasync,
fuchsia_bluetooth::{
detachable_map::DetachableMap,
types::{Channel, PeerId},
},
fuchsia_inspect as inspect,
fuchsia_inspect_derive::{AttachError, Inspect},
futures::{lock::Mutex, FutureExt},
log::{info, trace},
std::{
collections::{HashMap, HashSet},
convert::TryFrom,
sync::Arc,
},
};
use crate::rfcomm::{session::Session, types::SignaledTask};
/// Manages the current clients of the RFCOMM server. Provides an API for
/// registering, unregistering, and relaying RFCOMM channels to clients.
pub struct Clients {
/// The currently registered clients. Each registered client is identified
/// by a unique ServerChannel.
channel_receivers: Mutex<HashMap<ServerChannel, bredr::ConnectionReceiverProxy>>,
}
impl Clients {
pub fn new() -> Self {
Self { channel_receivers: Mutex::new(HashMap::new()) }
}
/// Returns the number of available spaces for clients that can be registered.
async fn available_space(&self) -> usize {
let server_channels = self.channel_receivers.lock().await;
ServerChannel::all().filter(|sc| !server_channels.contains_key(&sc)).count()
}
/// Removes the client that has registered `server_channel`.
async fn remove(&self, server_channel: &ServerChannel) {
self.channel_receivers.lock().await.remove(server_channel);
}
/// Clears all the registered clients.
async fn clear(&self) {
self.channel_receivers.lock().await.clear();
}
/// Reserves the next available ServerChannel for a client represented by a `proxy`.
///
/// If allocated, returns the ServerChannel assigned to the client, None otherwise.
pub async fn new_client(&self, proxy: bredr::ConnectionReceiverProxy) -> Option<ServerChannel> {
let mut server_channels = self.channel_receivers.lock().await;
let new_channel = ServerChannel::all().find(|sc| !server_channels.contains_key(&sc));
new_channel.map(|channel| {
trace!("Allocating {:?}", channel);
server_channels.insert(channel, proxy);
channel
})
}
/// Delivers the `channel` to the client that has registered the `server_channel`.
/// Returns an error if delivery fails or if there is no such client.
pub async fn deliver_channel(
&self,
peer_id: PeerId,
server_channel: ServerChannel,
channel: Channel,
) -> Result<(), Error> {
let clients = self.channel_receivers.lock().await;
let client = clients
.get(&server_channel)
.ok_or(format_err!("ServerChannel {:?} not registered", server_channel))?;
// Build the RFCOMM protocol descriptor and relay the channel.
let mut protocol: Vec<bredr::ProtocolDescriptor> =
build_rfcomm_protocol(server_channel).iter().map(Into::into).collect();
client
.connected(
&mut peer_id.into(),
bredr::Channel::try_from(channel).unwrap(),
&mut protocol.iter_mut(),
)
.map_err(|e| format_err!("{:?}", e))
}
}
/// The RfcommServer handles connection requests from profiles clients and remote peers.
pub struct RfcommServer {
/// The currently registered profile clients of the RFCOMM server.
clients: Arc<Clients>,
/// Active sessions between us and a remote peer. Each Session will multiplex
/// RFCOMM connections over a single L2CAP channel.
/// There can only be one session per remote peer. See RFCOMM Section 5.2.
sessions: DetachableMap<PeerId, Session>,
/// Inspect node for Sessions to attach to.
inspect: inspect::Node,
}
impl Inspect for &mut RfcommServer {
fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
self.inspect = parent.create_child(name);
Ok(())
}
}
impl RfcommServer {
pub fn new() -> Self {
Self {
clients: Arc::new(Clients::new()),
sessions: DetachableMap::new(),
inspect: inspect::Node::default(),
}
}
/// Returns true if a session identified by `id` exists and is currently
/// active.
/// An RFCOMM Session is active if there is a currently running processing task.
pub fn is_active_session(&mut self, id: &PeerId) -> bool {
self.sessions.get(id).map_or(false, |session| session.upgrade().is_some())
}
/// Returns the number of available server channels in this server.
pub async fn available_server_channels(&self) -> usize {
self.clients.available_space().await
}
/// De-allocates the server `channels` provided.
pub async fn free_server_channels(&mut self, channels: &HashSet<ServerChannel>) {
for sc in channels {
self.clients.remove(sc).await;
}
}
/// De-allocates all the server channels in this server.
pub async fn free_all_server_channels(&mut self) {
self.clients.clear().await;
}
/// Reserves the next available ServerChannel for a client's `proxy`.
///
/// Returns the allocated ServerChannel.
pub async fn allocate_server_channel(
&mut self,
proxy: bredr::ConnectionReceiverProxy,
) -> Option<ServerChannel> {
self.clients.new_client(proxy).await
}
/// Opens an RFCOMM channel specified by `server_channel` with the remote peer.
///
/// Returns an error if there is no session established with the peer.
pub async fn open_rfcomm_channel(
&mut self,
id: PeerId,
server_channel: ServerChannel,
responder: bredr::ProfileConnectResponder,
) -> Result<(), Error> {
trace!("Received request to open RFCOMM channel {:?} with peer {:?}", server_channel, id);
match self.sessions.get(&id).and_then(|s| s.upgrade()) {
None => {
// Peer either disconnected or doesn't exist.
let _ = responder.send(&mut Err(ErrorCode::Failed));
Err(format_err!("Invalid peer ID {:?}", id))
}
Some(session) => {
let channel_opened_callback =
Box::new(move |channel: Result<Channel, ErrorCode>| {
let mut channel = channel.map(|c| bredr::Channel::try_from(c).unwrap());
responder.send(&mut channel).map_err(|e| format_err!("{:?}", e))
});
session.open_rfcomm_channel(server_channel, channel_opened_callback).await;
Ok(())
}
}
}
/// Handles an incoming L2CAP connection from the remote peer.
///
/// If there is already an active session established with this peer, returns an Error
/// as there can only be one active session per peer.
/// Otherwise, creates and stores a new session over the provided `l2cap` channel.
pub fn new_l2cap_connection(&mut self, id: PeerId, l2cap: Channel) -> Result<(), Error> {
if self.is_active_session(&id) {
return Err(format_err!("RFCOMM Session already exists with peer {:?}", id));
}
info!("Received new l2cap connection from peer {:?}", id);
// Create a new RFCOMM Session with the provided `channel_opened_callback` which will be
// called anytime an RFCOMM channel is created. Opened RFCOMM channels will be delivered
// to the `clients` of the `RfcommServer`.
let clients = self.clients.clone();
let channel_opened_callback = Box::new(move |server_channel, channel| {
let peer_id = id;
let clients = clients.clone();
async move { clients.deliver_channel(peer_id, server_channel, channel).await }.boxed()
});
let mut session = Session::create(id, l2cap, channel_opened_callback);
let _ = session.iattach(&self.inspect, inspect::unique_name("peer_"));
let closed_fut = session.finished();
self.sessions.insert(id, session);
// Task eagerly removes the Session from the set of active sessions upon termination.
let detached_session = self.sessions.get(&id).expect("just inserted");
fasync::Task::spawn(async move {
let _ = closed_fut.await;
detached_session.detach();
})
.detach();
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use {
bt_rfcomm::{frame::mux_commands::*, frame::*, Role, DLCI},
fidl::{
encoding::Decodable,
endpoints::{create_proxy, create_proxy_and_stream},
},
fidl_fuchsia_bluetooth_bredr::ConnectionReceiverMarker,
fuchsia_async as fasync,
fuchsia_bluetooth::types::Channel,
futures::{pin_mut, task::Poll, AsyncWriteExt, StreamExt},
matches::assert_matches,
};
use crate::rfcomm::test_util::{expect_frame_received_by_peer, send_peer_frame};
fn setup_rfcomm_manager() -> (fasync::Executor, RfcommServer) {
let exec = fasync::Executor::new().unwrap();
let rfcomm = RfcommServer::new();
(exec, rfcomm)
}
#[fasync::run_singlethreaded(test)]
async fn test_allocate_server_channel() {
let mut rfcomm = RfcommServer::new();
let expected_free_channels = ServerChannel::all().count();
assert_eq!(rfcomm.available_server_channels().await, expected_free_channels);
// Allocating a server channel should be OK.
let (c, _s) = create_proxy::<ConnectionReceiverMarker>().unwrap();
let first_channel =
rfcomm.allocate_server_channel(c.clone()).await.expect("should allocate");
// Allocate the remaining n-1 channels.
let mut n = expected_free_channels - 1;
while n > 0 {
assert!(rfcomm.allocate_server_channel(c.clone()).await.is_some());
n -= 1;
}
// Allocating another should fail.
assert_eq!(rfcomm.available_server_channels().await, 0);
assert!(rfcomm.allocate_server_channel(c.clone()).await.is_none());
// De-allocating should work.
let mut single_channel = HashSet::new();
single_channel.insert(first_channel);
rfcomm.free_server_channels(&single_channel).await;
// We should be able to allocate another now that space has freed.
let (c, _s) = create_proxy::<ConnectionReceiverMarker>().unwrap();
assert!(rfcomm.allocate_server_channel(c).await.is_some());
}
#[test]
fn test_new_l2cap_connection() {
let (mut exec, mut rfcomm) = setup_rfcomm_manager();
let id = PeerId(123);
let (mut remote, channel) = Channel::create();
assert!(rfcomm.new_l2cap_connection(id, channel).is_ok());
// The Session should still be active.
assert!(rfcomm.is_active_session(&id));
// Simulate peer sending RFCOMM data to the session - should be OK.
let buf = [0x00, 0x00, 0x00];
let mut write_fut = remote.write(&buf[..]);
match exec.run_until_stalled(&mut write_fut) {
Poll::Ready(Ok(x)) => {
assert_eq!(x, 3);
}
x => panic!("Expected write ready but got {:?}", x),
}
// Remote peer disconnects - drive the background processing task to detect disconnection.
drop(remote);
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
// The session should be inactive now.
assert!(!rfcomm.is_active_session(&id));
// Checking again is OK.
assert!(!rfcomm.is_active_session(&id));
}
#[test]
fn test_new_rfcomm_channel_is_relayed_to_client() {
let (mut exec, mut rfcomm) = setup_rfcomm_manager();
// Profile-client reserves a server channel.
let (c, mut s) = create_proxy_and_stream::<ConnectionReceiverMarker>().unwrap();
let first_channel = {
let fut = rfcomm.allocate_server_channel(c.clone());
pin_mut!(fut);
match exec.run_until_stalled(&mut fut) {
Poll::Ready(Some(sc)) => sc,
x => panic!("Expected server channel but got {:?}", x),
}
};
let profile_client_fut = s.next();
pin_mut!(profile_client_fut);
assert!(exec.run_until_stalled(&mut profile_client_fut).is_pending());
// Start up a session with remote peer.
let id = PeerId(1);
let (local, mut remote) = Channel::create();
assert!(rfcomm.new_l2cap_connection(id, local).is_ok());
assert!(rfcomm.is_active_session(&id));
// Remote peer requests to start up session multiplexer.
let sabm = Frame::make_sabm_command(Role::Unassigned, DLCI::MUX_CONTROL_DLCI);
send_peer_frame(remote.as_ref(), sabm);
// Expect to send a positive response to the peer.
expect_frame_received_by_peer(&mut exec, &mut remote);
// Remote peer requests to open an RFCOMM channel.
let user_dlci = first_channel.to_dlci(Role::Responder).unwrap();
let user_sabm = Frame::make_sabm_command(Role::Initiator, user_dlci);
send_peer_frame(remote.as_ref(), user_sabm);
// Expect to send a positive response to the peer.
expect_frame_received_by_peer(&mut exec, &mut remote);
// The Session should open a new RFCOMM channel for the provided `user_dlci`, and
// the Channel should be relayed to the profile client.
match exec.run_until_stalled(&mut profile_client_fut) {
Poll::Ready(Some(Ok(bredr::ConnectionReceiverRequest::Connected { .. }))) => {}
x => panic!("Expected connection but got {:?}", x),
}
}
#[fasync::run_singlethreaded(test)]
async fn test_register_and_deliver_inbound_channel_to_clients() {
let clients = Clients::new();
// Initial capacity is the range of all valid Server Channels (1..30).
let mut expected_space = 30;
assert_eq!(clients.available_space().await, expected_space);
// Attempting to deliver an inbound channel for an unregistered ServerChannel should be
// an error.
let random_server_channel = ServerChannel::try_from(10).unwrap();
let (local, _remote) = Channel::create();
assert!(clients.deliver_channel(PeerId(1), random_server_channel, local).await.is_err());
// Registering a new client should be OK.
let (c, s) = create_proxy_and_stream::<bredr::ConnectionReceiverMarker>().unwrap();
let server_channel = clients.new_client(c).await.unwrap();
expected_space -= 1;
assert_eq!(clients.available_space().await, expected_space);
// Delivering channel to registered client should be OK.
let (local, _remote) = Channel::create();
assert!(clients.deliver_channel(PeerId(1), server_channel, local).await.is_ok());
// Client disconnects - delivering a new channel should fail.
drop(s);
let (local, _remote) = Channel::create();
assert!(clients.deliver_channel(PeerId(1), server_channel, local).await.is_err());
}
/// Makes a client Profile::Connect() request and returns the responder for the request
/// and a Future associated with the request.
#[track_caller]
fn make_client_connect_request(
exec: &mut fasync::Executor,
id: PeerId,
) -> (
bredr::ProfileConnectResponder,
fidl::client::QueryResponseFut<Result<bredr::Channel, ErrorCode>>,
) {
let (profile, mut profile_server) =
create_proxy_and_stream::<bredr::ProfileMarker>().unwrap();
let mut profile_stream = Box::pin(profile_server.next());
let connect_request =
profile.connect(&mut id.into(), &mut bredr::ConnectParameters::new_empty());
let responder = match exec.run_until_stalled(&mut profile_stream) {
Poll::Ready(Some(Ok(bredr::ProfileRequest::Connect { responder, .. }))) => responder,
x => panic!("Expected ready connect request but got: {:?}", x),
};
(responder, connect_request)
}
#[test]
fn test_request_outbound_connection_succeeds() {
let (mut exec, mut rfcomm) = setup_rfcomm_manager();
// Start up a session with remote peer.
let id = PeerId(1);
let (local, mut remote) = Channel::create();
assert!(rfcomm.new_l2cap_connection(id, local).is_ok());
// Simulate a client connect request.
let (responder, connect_request_fut) = make_client_connect_request(&mut exec, id);
pin_mut!(connect_request_fut);
assert!(exec.run_until_stalled(&mut connect_request_fut).is_pending());
// We expect the open channel request to be OK - still awaiting the channel.
let server_channel = ServerChannel::try_from(9).unwrap();
let expected_dlci = server_channel.to_dlci(Role::Responder).unwrap();
let mut outbound_fut = Box::pin(rfcomm.open_rfcomm_channel(id, server_channel, responder));
assert_matches!(exec.run_until_stalled(&mut outbound_fut), Poll::Ready(Ok(_)));
assert!(exec.run_until_stalled(&mut connect_request_fut).is_pending());
// Expect to send a frame to the peer - SABM for mux startup.
expect_frame_received_by_peer(&mut exec, &mut remote);
// Simulate peer responding positively.
let ua = Frame::make_ua_response(Role::Unassigned, DLCI::MUX_CONTROL_DLCI);
send_peer_frame(remote.as_ref(), ua);
// Expect to send a frame to peer - parameter negotiation.
expect_frame_received_by_peer(&mut exec, &mut remote);
// Simulate peer responding positively.
let data = MuxCommand {
params: MuxCommandParams::ParameterNegotiation(ParameterNegotiationParams {
dlci: expected_dlci,
credit_based_flow_handshake: CreditBasedFlowHandshake::SupportedResponse,
priority: 12,
max_frame_size: 100,
initial_credits: 1,
}),
command_response: CommandResponse::Response,
};
let pn_response = Frame::make_mux_command(Role::Responder, data);
send_peer_frame(remote.as_ref(), pn_response);
// Expect to send a frame to peer - SABM for channel opening.
expect_frame_received_by_peer(&mut exec, &mut remote);
// Simulate peer responding positively.
let ua = Frame::make_ua_response(Role::Responder, expected_dlci);
send_peer_frame(remote.as_ref(), ua);
// The channel should be established and relayed to the client that requested it.
assert_matches!(exec.run_until_stalled(&mut connect_request_fut), Poll::Ready(Ok(Ok(_))));
}
#[test]
fn test_request_outbound_connection_invalid_peer() {
let (mut exec, mut rfcomm) = setup_rfcomm_manager();
// Simulate a client connect request.
let random_id = PeerId(41);
let (responder, connect_request_fut) = make_client_connect_request(&mut exec, random_id);
pin_mut!(connect_request_fut);
assert!(exec.run_until_stalled(&mut connect_request_fut).is_pending());
// We expect the open channel request to fail.
let server_channel = ServerChannel::try_from(8).unwrap();
let mut outbound_fut =
Box::pin(rfcomm.open_rfcomm_channel(random_id, server_channel, responder));
assert_matches!(exec.run_until_stalled(&mut outbound_fut), Poll::Ready(Err(_)));
// Responder should be notified of failure.
assert_matches!(exec.run_until_stalled(&mut connect_request_fut), Poll::Ready(Ok(Err(_))));
}
}