blob: 2a57dee70d3c60a25515292c1a68300dadd873c0 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::{format_err, Error},
async_utils::stream::{StreamItem, StreamWithEpitaph, WithEpitaph},
bt_rfcomm::ServerChannel,
fidl::endpoints::{create_request_stream, ClientEnd},
fidl_fuchsia_bluetooth::ErrorCode,
fidl_fuchsia_bluetooth_bredr as bredr, fuchsia_async as fasync,
fuchsia_bluetooth::{
profile::{psm_from_protocol, ChannelParameters, Psm, ServiceDefinition},
types::PeerId,
util::CollectExt,
},
fuchsia_inspect_derive::Inspect,
futures::{
self,
channel::mpsc,
future::BoxFuture,
select,
sink::SinkExt,
stream::{FuturesUnordered, StreamExt},
Future, FutureExt,
},
log::{error, info, trace},
std::{
collections::HashSet,
convert::{TryFrom, TryInto},
},
};
use crate::profile::*;
use crate::rfcomm::RfcommServer;
use crate::types::{AdvertiseParams, ServiceGroup, ServiceGroupHandle, Services};
/// The returned result of a Profile.Advertise request.
enum AdvertiseResult {
/// The Advertise request needed RFCOMM - the client's event stream is returned.
EventStream(StreamWithEpitaph<bredr::ConnectionReceiverEventStream, ServiceGroupHandle>),
/// The Advertise request did not need RFCOMM - the request is relayed directly
/// to the upstream server. Because Profile.Advertise returns when the advertisement finishes,
/// we return a Future that relays the result to the client.
AdvertiseRelay(BoxFuture<'static, ()>),
}
/// A connection request from the upstream server.
#[derive(Debug)]
enum ConnectionEvent {
/// A normal connection request.
Request(bredr::ConnectionReceiverRequest),
/// The upstream server canceled the advertisement, for any reason.
AdvertisementCanceled,
}
/// The tasks associated with a service advertisement.
struct AdvertiseTasks {
/// The Profile.Advertise() Future that resolves when advertisement terminates.
pub adv_task: BoxFuture<'static, ()>,
/// The relay task used to process incoming connection requests from the upstream
/// server.
pub relay_task: fasync::Task<()>,
}
/// The current advertisement status of the `ProfileRegistrar`.
enum AdvertiseStatus {
/// Currently advertising with a set of tasks processing requests.
Advertising(AdvertiseTasks),
/// We are not currently advertising.
NotAdvertising,
}
/// The ProfileRegistrar handles requests over the `fidl.fuchsia.bluetooth.bredr.Profile` service.
/// Clients can advertise, search, and connect to services.
/// The ProfileRegistrar can be thought of as a relay for clients using Profile. Clients not
/// requesting RFCOMM services will be relayed directly to the upstream server.
///
/// The ProfileRegistrar manages a single RFCOMM advertisement group. If a new client
/// requests to advertise services, the server will unregister the active advertisement, group
/// together all the services, and re-register.
/// The ProfileRegistrar also manages the `RfcommServer` - which is responsible for handling
/// connections over the RFCOMM PSM.
#[derive(Inspect)]
pub struct ProfileRegistrar {
/// An upstream provider of the Profile service. Typically provided by bt-host.
profile_upstream: bredr::ProfileProxy,
/// The `active_registration` is the current processing task for connection requests
/// from the upstream server.
active_registration: AdvertiseStatus,
/// The currently advertised services.
registered_services: Services,
/// Sender used to relay connection requests from the upstream server.
connection_sender: Option<mpsc::Sender<ConnectionEvent>>,
/// The RFCOMM server that handles allocating server channels, incoming
/// l2cap connections, outgoing l2cap connections, and multiplexing channels.
#[inspect(forward)]
rfcomm_server: RfcommServer,
}
impl ProfileRegistrar {
pub fn new(profile_upstream: bredr::ProfileProxy) -> Self {
Self {
profile_upstream,
active_registration: AdvertiseStatus::NotAdvertising,
registered_services: Services::new(),
connection_sender: None,
rfcomm_server: RfcommServer::new(),
}
}
/// Creates and returns a Task representing the running server.
pub fn start(self, receiver: mpsc::Receiver<bredr::ProfileRequestStream>) -> fasync::Task<()> {
let handler_fut = self.handle_fidl_requests(receiver);
fasync::Task::spawn(handler_fut)
}
/// Returns true if the requested `new_psms` do not overlap with the currently registered PSMs.
fn is_disjoint_psms(&self, new_psms: &HashSet<Psm>) -> bool {
self.registered_services.psms().is_disjoint(new_psms)
}
/// Unregisters all the active services advertised by this server.
/// This should be called when the upstream server drops the single service advertisement that
/// this server manages.
async fn unregister_all_services(&mut self) {
self.registered_services = Services::new();
self.rfcomm_server.free_all_server_channels().await;
}
/// Unregisters the group of services identified by `handle`. Re-registers any remaining
/// services.
/// This should be called when a profile client decides to stop advertising its services.
async fn unregister_service(&mut self, handle: ServiceGroupHandle) -> Result<(), Error> {
if !self.registered_services.contains(handle) {
return Err(format_err!("Attempt to unregister non-existent service: {:?}", handle));
}
// Remove the entry for this client.
let service_info = self.registered_services.remove(handle);
self.rfcomm_server.free_server_channels(service_info.allocated_server_channels()).await;
// Attempt to re-advertise.
self.refresh_advertisement().await;
Ok(())
}
/// Processes an incoming L2cap connection from the upstream server.
///
/// If the connection PSM is not RFCOMM, relays directly to the client.
///
/// Returns an error if the `protocol` is invalidly formatted, or if the provided
/// PSM is not represented by a client of the `ProfileRegistrar`.
fn handle_incoming_l2cap_connection(
&mut self,
peer_id: PeerId,
channel: bredr::Channel,
protocol: Vec<bredr::ProtocolDescriptor>,
) -> Result<(), Error> {
let local = protocol.iter().map(|p| p.into()).collect();
match psm_from_protocol(&local).ok_or(format_err!("No PSM provided"))? {
Psm::RFCOMM => self.rfcomm_server.new_l2cap_connection(peer_id, channel.try_into()?),
psm => {
match self.registered_services.iter().find(|(_, client)| client.contains_psm(psm)) {
Some((_, client)) => client.relay_connected(peer_id.into(), channel, protocol),
None => {
return Err(format_err!(
"Connection request for non-advertised PSM {:?}",
psm
))
}
}
}
}
}
/// Validates that there is an active connection with the peer specified by `peer_id`. If not,
/// creates and delivers the L2CAP connection to the RFCOMM server.
async fn ensure_service_connection(&mut self, peer_id: PeerId) -> Result<(), ErrorCode> {
if self.rfcomm_server.is_active_session(&peer_id) {
return Ok(());
}
let mut connect_params = bredr::ConnectParameters::L2cap(bredr::L2capParameters {
psm: Some(bredr::PSM_RFCOMM),
..bredr::L2capParameters::EMPTY
});
let l2cap_channel =
match self.profile_upstream.connect(&mut peer_id.into(), &mut connect_params).await {
Ok(Ok(channel)) => channel.try_into().unwrap(),
Ok(Err(e)) => {
return Err(e);
}
Err(e) => {
error!("Couldn't establish L2CAP connection with {:?}: {:?}", peer_id, e);
return Err(ErrorCode::Failed);
}
};
self.rfcomm_server
.new_l2cap_connection(peer_id, l2cap_channel)
.map_err(|_| ErrorCode::Failed)
}
/// Processes an outgoing L2Cap connection initiated by a client of the ProfileRegistrar.
///
/// Returns an error if the connection request fails.
async fn handle_outgoing_connection(
&mut self,
peer_id: PeerId,
mut connection: bredr::ConnectParameters,
responder: bredr::ProfileConnectResponder,
) -> Result<(), Error> {
// If the provided `connection` is for a non-RFCOMM PSM, simply forward the outbound
// connection to the upstream Profile service.
// Otherwise, route to the RFCOMM server.
match &connection {
bredr::ConnectParameters::L2cap { .. } => {
let mut result = self
.profile_upstream
.connect(&mut peer_id.into(), &mut connection)
.await
.unwrap_or_else(|_fidl_error| Err(ErrorCode::Failed));
let _ = responder.send(&mut result);
}
bredr::ConnectParameters::Rfcomm(bredr::RfcommParameters { channel, .. }) => {
let server_channel = match channel.map(ServerChannel::try_from) {
Some(Ok(sc)) => sc,
_ => {
let _ = responder.send(&mut Err(ErrorCode::InvalidArguments));
return Ok(());
}
};
// Ensure there is an RFCOMM Session between us and the peer.
if let Err(e) = self.ensure_service_connection(peer_id).await {
let _ = responder.send(&mut Err(e));
return Ok(());
}
// Open the RFCOMM channel.
self.rfcomm_server.open_rfcomm_channel(peer_id, server_channel, responder).await?;
}
}
Ok(())
}
/// Advertises `params` to the provided `profile_upstream`.
///
/// Returns a Future for the advertisement; this future should be polled in order to detect
/// when the advertisement has finished.
fn advertise(
profile_upstream: bredr::ProfileProxy,
params: AdvertiseParams,
connect_client: ClientEnd<bredr::ConnectionReceiverMarker>,
) -> impl Future<Output = ()> {
let fidl_services = params
.services
.iter()
.map(bredr::ServiceDefinition::try_from)
.collect_results()
.unwrap();
profile_upstream
.advertise(
&mut fidl_services.into_iter(),
(&params.parameters).try_into().unwrap(),
connect_client,
)
.map(|_| ())
}
/// Processes requests from the ConnectionReceiver stream and relays to the `sender`.
async fn connection_request_relay(
mut connect_requests: bredr::ConnectionReceiverRequestStream,
mut sender: mpsc::Sender<ConnectionEvent>,
) {
while let Some(connect_request) = connect_requests.next().await {
match connect_request {
Ok(request) => {
let _ = sender.send(ConnectionEvent::Request(request)).await;
}
Err(e) => info!("Connection request error: {:?}", e),
}
}
// The upstream server has dropped the ConnectionReceiver. Let the
// receiver know that the advertisement has been canceled.
let _ = sender.send(ConnectionEvent::AdvertisementCanceled).await;
}
/// Attempts to build and advertise services from `self.registered_services`.
async fn refresh_advertisement(&mut self) {
let status =
std::mem::replace(&mut self.active_registration, AdvertiseStatus::NotAdvertising);
match status {
AdvertiseStatus::Advertising(AdvertiseTasks { adv_task, relay_task }) => {
// If we are currently advertising, drop the stream processing task to unregister
// the services. Wait for the advertisement to resolve before attempting to
// re-advertise.
drop(relay_task);
let _ = adv_task.await;
trace!("Finished waiting for unregistration");
}
AdvertiseStatus::NotAdvertising => {}
}
// We are ready to advertise. Attempt to build the advertisement parameters, and create
// and save two tasks that 1) Make the Advertise request and wait for termination and
// 2) Process incoming requests from the upstream server.
if let Some(params) = self.registered_services.build_registration() {
trace!("Advertising from registered services: {:?}", params);
let (connect_client, connect_requests) =
create_request_stream::<bredr::ConnectionReceiverMarker>().unwrap();
// Spawn a task to advertise `params`.
let adv_fut =
ProfileRegistrar::advertise(self.profile_upstream.clone(), params, connect_client);
let adv_task = adv_fut.boxed();
// Spawn a task to handle incoming L2CAP connections.
let relay_task = fasync::Task::spawn(ProfileRegistrar::connection_request_relay(
connect_requests,
self.connection_sender.clone().unwrap(),
));
self.active_registration =
AdvertiseStatus::Advertising(AdvertiseTasks { adv_task, relay_task });
}
}
/// Handles an incoming request to advertise a group of `services`.
///
/// At least one service in `services` must request RFCOMM.
///
/// The RFCOMM-requesting services are assigned ServerChannels. The services are then
/// registered together with the currently registered services.
///
/// Returns the event stream for the receiver tagged with a unique identifier for the
/// registered group of services. The event stream should be continuously polled in
/// order to detect when the client terminates the advertisement.
async fn add_managed_advertisement(
&mut self,
mut services: Vec<ServiceDefinition>,
parameters: ChannelParameters,
receiver: bredr::ConnectionReceiverProxy,
responder: bredr::ProfileAdvertiseResponder,
) -> Result<StreamWithEpitaph<bredr::ConnectionReceiverEventStream, ServiceGroupHandle>, Error>
{
// Validate that the new PSMs are disjoint because we unregister and re-register as a group.
let new_psms = psms_from_service_definitions(&services);
if !self.is_disjoint_psms(&new_psms) {
let _ = responder.send(&mut Err(ErrorCode::Failed));
return Err(format_err!("New advertisement requesting pre-allocated PSMs"));
}
// Create an entry for this group of services with a unique handle.
let next_handle =
self.registered_services.insert(ServiceGroup::new(receiver.clone(), parameters));
// If the RfcommServer has enough free Server Channels, allocate and update
// the RFCOMM-requesting services.
let required_server_channels =
services.iter().filter(|def| is_rfcomm_service_definition(def)).count();
if required_server_channels > self.rfcomm_server.available_server_channels().await {
let _ = responder.send(&mut Err(ErrorCode::Failed));
return Err(format_err!("RfcommServer not enough free Server Channels"));
}
for mut service in services.iter_mut().filter(|def| is_rfcomm_service_definition(def)) {
let server_channel = self
.rfcomm_server
.allocate_server_channel(receiver.clone())
.await
.expect("just checked");
update_svc_def_with_server_channel(&mut service, server_channel)?;
}
let service_info = self.registered_services.get_mut(next_handle).expect("just inserted");
service_info.set_service_defs(services);
service_info.set_responder(responder);
// Attempt to re-advertise the updated services.
self.refresh_advertisement().await;
Ok(receiver.take_event_stream().with_epitaph(next_handle))
}
/// Makes a Profile.Advertise() request upstream, and returns a Future that relays the
/// result to the `responder` upon termination.
fn make_advertise_relay(
&self,
services: Vec<bredr::ServiceDefinition>,
parameters: bredr::ChannelParameters,
receiver: ClientEnd<bredr::ConnectionReceiverMarker>,
responder: bredr::ProfileAdvertiseResponder,
) -> impl Future<Output = ()> {
let adv_fut =
self.profile_upstream.advertise(&mut services.into_iter(), parameters, receiver);
async move {
let _ = adv_fut
.await
.and_then(|mut r| responder.send(&mut r))
.map_err(|e| trace!("Relayed advertisement terminated: {:?}", e));
}
}
/// Handles a request over the Profile protocol.
///
/// If the request was an advertisement, returns either the event stream associated with
/// the advertise request, or a future to relay the advertisement request directly upstream.
async fn handle_profile_request(
&mut self,
request: bredr::ProfileRequest,
) -> Option<AdvertiseResult> {
match request {
bredr::ProfileRequest::Advertise { services, parameters, receiver, responder } => {
let services_local =
services.iter().map(ServiceDefinition::try_from).collect_results().ok()?;
trace!("Received advertise request: {:?}", services_local);
if service_definitions_request_rfcomm(&services_local) {
let receiver = receiver.into_proxy().ok()?;
let parameters = ChannelParameters::try_from(&parameters).ok()?;
match self
.add_managed_advertisement(services_local, parameters, receiver, responder)
.await
{
Err(e) => error!("Error handling advertise request: {:?}", e),
Ok(evt_stream) => return Some(AdvertiseResult::EventStream(evt_stream)),
}
} else {
return Some(AdvertiseResult::AdvertiseRelay(
self.make_advertise_relay(services, parameters, receiver, responder)
.boxed(),
));
}
}
bredr::ProfileRequest::Connect { peer_id, connection, responder, .. } => {
if let Err(e) =
self.handle_outgoing_connection(peer_id.into(), connection, responder).await
{
error!("Error establishing outgoing connection {:?}", e);
}
}
bredr::ProfileRequest::Search { service_uuid, attr_ids, results, .. } => {
// Simply forward over the search to the Profile server.
let _ = self.profile_upstream.search(service_uuid, &attr_ids, results);
}
bredr::ProfileRequest::ConnectSco {
mut peer_id, initiator, params, receiver, ..
} => {
let _ =
self.profile_upstream.connect_sco(&mut peer_id, initiator, params, receiver);
}
}
None
}
/// Handles incoming connection requests from the processing task of the active service
/// advertisement.
///
/// There are two relevant cases:
/// 1) A connection request from the upstream Host Server. The incoming l2cap connection
/// must be handled - if PSM_RFCOMM, send to the RfcommServer, otherwise, relay
/// directly to the profile client.
/// 2) An epitaph of the relay task signaling the advertisement has canceled. This is
/// usually due to an error in the upstream server. We must clear all of the services.
async fn handle_connection_request(&mut self, request: ConnectionEvent) -> Result<(), Error> {
match request {
ConnectionEvent::Request(request) => {
let bredr::ConnectionReceiverRequest::Connected {
peer_id, channel, protocol, ..
} = request;
self.handle_incoming_l2cap_connection(peer_id.into(), channel, protocol)
}
ConnectionEvent::AdvertisementCanceled => {
// The upstream server unexpectedly dropped the advertisement. We must clean up
// all of the state.
self.unregister_all_services().await;
Ok(())
}
}
}
/// `handle_fidl_requests` processes requests/events over several sources.
/// This is the main "event loop" for the server.
///
/// 1) The server implements the Profile service. Service advertisements, searches, and
/// connection requests are processed and handled from any clients of the Profile service.
/// 2) Connection requests from the upstream server. The active processing task relays
/// incoming connection requests for both RFCOMM and non-RFCOMM PSMs.
/// 3) Termination events from service advertisements of profile clients. The
/// event streams of the service advertisements are polled in `client_event_streams`
/// and are used to determine when to unregister the aforementioned advertisements.
/// 4) Profile.Advertise() requests that have been relayed directly to the upstream server
/// must be continuously polled to relay the response after termination.
pub async fn handle_fidl_requests(
mut self,
mut profile_request_streams: mpsc::Receiver<bredr::ProfileRequestStream>,
) {
// Collection for bredr.Profile requests.
let mut profile_requests = futures::stream::SelectAll::new();
// Internal channel used to relay requests over the `ConnectionReceiver` protocol.
let (connection_sender, mut connection_receiver) = mpsc::channel(0);
self.connection_sender = Some(connection_sender);
// Collection of all client service advertisement event streams. Used to determine
// when the client has stopped advertising its services.
let mut client_event_streams = futures::stream::SelectAll::new();
// Collection of futures of Profile.Advertise requests that have been relayed directly
// upstream. This is polled continuously to relay the response when the advertisement
// terminates.
let mut advertise_futures: FuturesUnordered<BoxFuture<'static, ()>> =
FuturesUnordered::new();
loop {
select! {
request_stream = profile_request_streams.select_next_some() => {
profile_requests.push(request_stream);
}
profile_request = profile_requests.select_next_some() => {
let profile_request = match profile_request {
Ok(request) => request,
Err(e) => {
info!("Error from Profile request: {:?}", e);
continue;
}
};
match self.handle_profile_request(profile_request).await {
Some(AdvertiseResult::EventStream(evt_stream)) => client_event_streams.push(evt_stream),
Some(AdvertiseResult::AdvertiseRelay(fut)) => advertise_futures.push(fut),
_ => {},
}
}
connection_request = connection_receiver.select_next_some() => {
if let Err(e) = self.handle_connection_request(connection_request).await {
error!("Error processing incoming l2cap connection request: {:?}", e);
}
}
service_event = client_event_streams.next() => {
if let Some(StreamItem::Epitaph(service_id)) = service_event {
// The event stream of the advertisement has been exhausted.
// Unregister the service from the ProfileRegistrar.
info!("Client {:?} unregistered service advertisement", service_id);
if let Err(e) = self.unregister_service(service_id).await {
error!("Error unregistering service {:?}: {:?}", service_id, e);
}
}
}
_ = advertise_futures.next() => {},
complete => break,
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::tests::{other_service_definition, rfcomm_service_definition};
use fidl::encoding::Decodable;
use fidl::endpoints::create_proxy_and_stream;
use futures::{pin_mut, task::Poll};
/// Returns true if the provided `service` has an assigned Server Channel.
fn service_def_has_assigned_server_channel(service: &bredr::ServiceDefinition) -> bool {
if let Some(protocol) = &service.protocol_descriptor_list {
for descriptor in protocol {
if descriptor.protocol == bredr::ProtocolIdentifier::Rfcomm {
if descriptor.params.is_empty() {
return false;
}
// The server channel should be the first element.
if let bredr::DataElement::Uint8(_) = descriptor.params[0] {
return true;
}
}
}
}
false
}
/// Creates a Profile::Search request.
fn generate_search_request(
exec: &mut fasync::Executor,
) -> (bredr::ProfileRequest, bredr::SearchResultsRequestStream) {
let (c, mut s) = create_proxy_and_stream::<bredr::ProfileMarker>().unwrap();
let (results, server) = create_request_stream::<bredr::SearchResultsMarker>().unwrap();
assert!(c.search(bredr::ServiceClassProfileIdentifier::AudioSink, &[], results).is_ok());
match exec.run_until_stalled(&mut s.next()) {
Poll::Ready(Some(Ok(x))) => (x, server),
x => panic!("Expected ProfileRequest but got: {:?}", x),
}
}
/// Creates a Profile::ConnectSco request.
fn generate_connect_sco_request(
exec: &mut fasync::Executor,
) -> (bredr::ProfileRequest, bredr::ScoConnectionReceiverRequestStream) {
let (profile_proxy, mut profile_request_stream) =
create_proxy_and_stream::<bredr::ProfileMarker>().unwrap();
let (receiver_client, receiver_server) =
create_request_stream::<bredr::ScoConnectionReceiverMarker>().unwrap();
assert!(profile_proxy
.connect_sco(
&mut PeerId(1).into(),
/*initiator=*/ true,
bredr::ScoConnectionParameters::new_empty(),
receiver_client
)
.is_ok());
match exec.run_until_stalled(&mut profile_request_stream.next()) {
Poll::Ready(Some(Ok(request))) => (request, receiver_server),
x => panic!("Expected ProfileRequest but got: {:?}", x),
}
}
/// Creates a Profile::Advertise request.
/// Returns the associated request stream, and the Advertise request as a Future.
fn make_advertise_request(
client: &bredr::ProfileProxy,
services: Vec<bredr::ServiceDefinition>,
) -> (
bredr::ConnectionReceiverRequestStream,
impl Future<Output = Result<Result<(), ErrorCode>, fidl::Error>>,
) {
let (connection, connection_stream) =
create_request_stream::<bredr::ConnectionReceiverMarker>().unwrap();
let adv_fut = client.advertise(
&mut services.into_iter(),
bredr::ChannelParameters::new_empty(),
connection,
);
(connection_stream, adv_fut)
}
fn new_client(
exec: &mut fasync::Executor,
mut profile_sender: mpsc::Sender<bredr::ProfileRequestStream>,
) -> bredr::ProfileProxy {
let (profile_client, profile_server) =
create_proxy_and_stream::<bredr::ProfileMarker>().unwrap();
let send_fut = profile_sender.send(profile_server);
pin_mut!(send_fut);
let _ = exec.run_until_stalled(&mut send_fut);
profile_client
}
/// Returns the `ProfileRegistrar.handle_fidl_requests()` Future. This consumes
/// the `server`.
fn setup_handler_fut(
server: ProfileRegistrar,
) -> (mpsc::Sender<bredr::ProfileRequestStream>, impl Future<Output = ()>) {
let (profile_sender, profile_receiver) = mpsc::channel(0);
let handler_fut = server.handle_fidl_requests(profile_receiver);
(profile_sender, handler_fut)
}
/// Creates the ProfileRegistrar with the upstream Profile service.
fn setup_server() -> (fasync::Executor, ProfileRegistrar, bredr::ProfileRequestStream) {
let exec = fasync::Executor::new().unwrap();
let (client, server) = create_proxy_and_stream::<bredr::ProfileMarker>().unwrap();
let profile_server = ProfileRegistrar::new(client);
(exec, profile_server, server)
}
/// Exercises a service advertisement with an empty set of services.
/// In practice, the upstream Host Server will reject this request but the RFCOMM
/// server will still classify the request as non-RFCOMM only, and relay directly
/// to the Profile Server.
/// This test validates that the parameters are relayed directly to the Profile Server. Also
/// validates that when the upstream Advertise call resolves, the result is relayed to the
/// client.
#[test]
fn test_handle_empty_advertise_request() {
let (mut exec, server, mut upstream_requests) = setup_server();
let (profile_sender, handler_fut) = setup_handler_fut(server);
pin_mut!(handler_fut);
assert!(exec.run_until_stalled(&mut handler_fut).is_pending());
// A new client connects to bt-rfcomm.cmx.
let client = {
let client = new_client(&mut exec, profile_sender.clone());
let _ = exec.run_until_stalled(&mut handler_fut);
client
};
// Client decides to advertise empty services.
let services = vec![];
let (_connection_stream, adv_fut) = make_advertise_request(&client, services);
pin_mut!(adv_fut);
assert!(exec.run_until_stalled(&mut adv_fut).is_pending());
let _ = exec.run_until_stalled(&mut handler_fut);
// The advertisement request should be relayed directly upstream.
let responder = match exec.run_until_stalled(&mut upstream_requests.next()) {
Poll::Ready(Some(Ok(bredr::ProfileRequest::Advertise { responder, .. }))) => responder,
x => panic!("Expected advertise request, got: {:?}", x),
};
// Upstream decides to resolve the Advertise call.
let _ = responder.send(&mut Ok(()));
let _ = exec.run_until_stalled(&mut handler_fut);
// Client should be notified, and it's advertisement should terminate.
assert!(exec.run_until_stalled(&mut adv_fut).is_ready());
}
/// Exercises a service advertisement with no RFCOMM services.
/// The ProfileRegistrar server should classify the request as non-RFCOMM only, and relay
/// directly to the upstream Profile Server.
#[test]
fn test_handle_advertise_request_with_no_rfcomm() -> Result<(), Error> {
let (mut exec, server, mut upstream_requests) = setup_server();
let (profile_sender, handler_fut) = setup_handler_fut(server);
pin_mut!(handler_fut);
assert!(exec.run_until_stalled(&mut handler_fut).is_pending());
// A new client connects to bt-rfcomm.cmx.
let client = {
let client = new_client(&mut exec, profile_sender.clone());
let _ = exec.run_until_stalled(&mut handler_fut);
client
};
// Client decides to advertise.
let services =
vec![bredr::ServiceDefinition::try_from(&other_service_definition(Psm::new(1)))?];
let (_connection_stream, adv_fut) = make_advertise_request(&client, services);
pin_mut!(adv_fut);
assert!(exec.run_until_stalled(&mut adv_fut).is_pending());
let _ = exec.run_until_stalled(&mut handler_fut);
// The advertisement request should be relayed directly upstream.
match exec.run_until_stalled(&mut upstream_requests.next()) {
Poll::Ready(Some(Ok(bredr::ProfileRequest::Advertise { .. }))) => {}
x => panic!("Expected advertise request, got: {:?}", x),
};
Ok(())
}
/// Service advertisement with only RFCOMM services. The services should be assigned
/// Server Channels and be relayed upstream.
#[test]
fn test_handle_advertise_request_with_only_rfcomm() -> Result<(), Error> {
let (mut exec, server, mut upstream_requests) = setup_server();
let (profile_sender, handler_fut) = setup_handler_fut(server);
pin_mut!(handler_fut);
assert!(exec.run_until_stalled(&mut handler_fut).is_pending());
// A new client connects to bt-rfcomm.cmx.
let client = {
let client = new_client(&mut exec, profile_sender.clone());
let _ = exec.run_until_stalled(&mut handler_fut);
client
};
// Client decides to advertise.
let services = vec![bredr::ServiceDefinition::try_from(&rfcomm_service_definition(None))?];
let (_connection_stream, adv_fut) = make_advertise_request(&client, services);
pin_mut!(adv_fut);
assert!(exec.run_until_stalled(&mut adv_fut).is_pending());
let _ = exec.run_until_stalled(&mut handler_fut);
// Upstream should receive Advertise request for a service with an assigned server channel.
match exec.run_until_stalled(&mut upstream_requests.next()) {
Poll::Ready(Some(Ok(bredr::ProfileRequest::Advertise { services, .. }))) => {
assert_eq!(services.len(), 1);
assert!(service_def_has_assigned_server_channel(&services[0]));
}
x => panic!("Expected advertise request, got: {:?}", x),
};
Ok(())
}
/// Service advertisement with both RFCOMM and non-RFCOMM services. Only the RFCOMM
/// services should be assigned Server Channels, and the group should be registered upstream.
#[test]
fn test_handle_advertise_request_with_all_services() -> Result<(), Error> {
let (mut exec, server, mut upstream_requests) = setup_server();
let (profile_sender, handler_fut) = setup_handler_fut(server);
pin_mut!(handler_fut);
assert!(exec.run_until_stalled(&mut handler_fut).is_pending());
// A new client connects to bt-rfcomm.cmx.
let client = {
let client = new_client(&mut exec, profile_sender.clone());
let _ = exec.run_until_stalled(&mut handler_fut);
client
};
// Client decides to advertise.
let services = vec![
bredr::ServiceDefinition::try_from(&rfcomm_service_definition(None))?,
bredr::ServiceDefinition::try_from(&other_service_definition(Psm::new(10)))?,
bredr::ServiceDefinition::try_from(&rfcomm_service_definition(None))?,
];
let n = services.len();
let (_connection_stream, adv_fut) = make_advertise_request(&client, services);
pin_mut!(adv_fut);
assert!(exec.run_until_stalled(&mut adv_fut).is_pending());
let _ = exec.run_until_stalled(&mut handler_fut);
// Expect an advertise request with all n services - validate that the RFCOMM services
// have assigned server channels.
match exec.run_until_stalled(&mut upstream_requests.next()) {
Poll::Ready(Some(Ok(bredr::ProfileRequest::Advertise { services, .. }))) => {
assert_eq!(services.len(), n);
let res = services
.into_iter()
.filter(|service| {
is_rfcomm_service_definition(&ServiceDefinition::try_from(service).unwrap())
})
.map(|service| service_def_has_assigned_server_channel(&service))
.fold(true, |acc, elt| acc || elt);
assert!(res);
}
x => panic!("Expected advertise request, got: {:?}", x),
}
Ok(())
}
/// Tests handling two advertise requests with overlapping PSMs. The first one
/// should succeed and be relayed upstream. The second one should fail since it
/// is requesting already-allocated PSMs - the responder should be notified of the error.
#[test]
fn test_handle_advertise_requests_same_psm() -> Result<(), Error> {
let (mut exec, server, mut upstream_requests) = setup_server();
let (profile_sender, handler_fut) = setup_handler_fut(server);
pin_mut!(handler_fut);
assert!(exec.run_until_stalled(&mut handler_fut).is_pending());
// A new client connects to bt-rfcomm.cmx.
let client1 = {
let client = new_client(&mut exec, profile_sender.clone());
let _ = exec.run_until_stalled(&mut handler_fut);
client
};
// Client decides to advertise.
let psm = Psm::new(10);
let services1 = vec![
bredr::ServiceDefinition::try_from(&other_service_definition(psm))?,
bredr::ServiceDefinition::try_from(&rfcomm_service_definition(None))?,
];
let (_connection_stream1, adv_fut1) = make_advertise_request(&client1, services1);
pin_mut!(adv_fut1);
assert!(exec.run_until_stalled(&mut adv_fut1).is_pending());
let _ = exec.run_until_stalled(&mut handler_fut);
// Save the Advertise parameters.
let _adv_req1 = match exec.run_until_stalled(&mut upstream_requests.next()) {
Poll::Ready(Some(Ok(request))) => request,
x => panic!("Expected advertise request, got: {:?}", x),
};
// A different client connects to bt-rfcomm.cmx. It decides to try to advertise over same
// PSM.
let client2 = {
let client = new_client(&mut exec, profile_sender.clone());
let _ = exec.run_until_stalled(&mut handler_fut);
client
};
let services2 = vec![
bredr::ServiceDefinition::try_from(&other_service_definition(psm))?,
bredr::ServiceDefinition::try_from(&rfcomm_service_definition(None))?,
];
let (_connection_stream2, adv_fut2) = make_advertise_request(&client2, services2);
pin_mut!(adv_fut2);
assert!(exec.run_until_stalled(&mut adv_fut2).is_pending());
let _ = exec.run_until_stalled(&mut handler_fut);
// Advertisement 1 is OK. Advertisement 2 should resolve immediately with an ErrorCode.
assert!(exec.run_until_stalled(&mut adv_fut1).is_pending());
match exec.run_until_stalled(&mut adv_fut2) {
Poll::Ready(Ok(Err(e))) => assert_eq!(e, ErrorCode::Failed),
x => panic!("Expected Ready with ErrorCode but got {:?}", x),
}
Ok(())
}
/// Tests that independent service advertisements from multiple clients are correctly
/// grouped together and re-registered.
#[test]
fn test_handle_multiple_service_advertisements() -> Result<(), Error> {
let (mut exec, server, mut upstream_requests) = setup_server();
let (profile_sender, handler_fut) = setup_handler_fut(server);
pin_mut!(handler_fut);
assert!(exec.run_until_stalled(&mut handler_fut).is_pending());
// A new client connects to bt-rfcomm.cmx.
let client1 = {
let client = new_client(&mut exec, profile_sender.clone());
let _ = exec.run_until_stalled(&mut handler_fut);
client
};
// Client decides to advertise.
let psm1 = Psm::new(10);
let services1 = vec![
bredr::ServiceDefinition::try_from(&other_service_definition(psm1))?,
bredr::ServiceDefinition::try_from(&rfcomm_service_definition(None))?,
];
let n1 = services1.len();
let (_connection_stream1, adv_fut1) = make_advertise_request(&client1, services1);
pin_mut!(adv_fut1);
assert!(exec.run_until_stalled(&mut adv_fut1).is_pending());
let _ = exec.run_until_stalled(&mut handler_fut);
// First advertisement request relayed upstream.
let (_receiver1, responder1) = match exec.run_until_stalled(&mut upstream_requests.next()) {
Poll::Ready(Some(Ok(bredr::ProfileRequest::Advertise {
receiver, responder, ..
}))) => (receiver, responder),
x => panic!("Expected advertise request, got: {:?}", x),
};
// A different client connects to bt-rfcomm.cmx. It decides to try to advertise over same
// PSM.
let client2 = {
let client = new_client(&mut exec, profile_sender.clone());
let _ = exec.run_until_stalled(&mut handler_fut);
client
};
// Client 2 decides to advertise three services.
let psm2 = Psm::new(15);
let n2 = 3;
let services2 = vec![
bredr::ServiceDefinition::try_from(&other_service_definition(psm2))?,
bredr::ServiceDefinition::try_from(&rfcomm_service_definition(None))?,
bredr::ServiceDefinition::try_from(&rfcomm_service_definition(None))?,
];
let (_connection_stream2, adv_fut2) = make_advertise_request(&client2, services2);
pin_mut!(adv_fut2);
assert!(exec.run_until_stalled(&mut adv_fut2).is_pending());
let _ = exec.run_until_stalled(&mut handler_fut);
// We expect ProfileRegistrar to unregister the current active advertisement. Respond to
// the unregister request by responding over the responder.
let _ = responder1.send(&mut Ok(()));
let _ = exec.run_until_stalled(&mut handler_fut);
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
// We expect a new advertisement upstream.
let (_receiver2, _responder2) = match exec.run_until_stalled(&mut upstream_requests.next())
{
Poll::Ready(Some(Ok(bredr::ProfileRequest::Advertise {
services,
receiver,
responder,
..
}))) => {
assert_eq!(services.len(), n1 + n2);
(receiver, responder)
}
x => panic!("Expected advertise request, got: {:?}", x),
};
assert!(exec.run_until_stalled(&mut adv_fut1).is_pending());
assert!(exec.run_until_stalled(&mut adv_fut2).is_pending());
Ok(())
}
/// This test validates that client Search requests are relayed directly upstream.
#[test]
fn test_handle_search_request() {
let (mut exec, mut server, mut profile_requests) = setup_server();
let (search_request, _stream) = generate_search_request(&mut exec);
let handle_fut = server.handle_profile_request(search_request);
pin_mut!(handle_fut);
assert!(exec.run_until_stalled(&mut handle_fut).is_ready());
// The search request should be relayed directly to the Profile Server.
match exec.run_until_stalled(&mut profile_requests.next()) {
Poll::Ready(Some(Ok(bredr::ProfileRequest::Search { .. }))) => {}
x => panic!("Expected search request, got: {:?}", x),
}
}
/// This test validates that client ConnectSco requests are relayed directly upstream.
#[test]
fn test_handle_connect_sco_request() {
let (mut exec, mut server, mut profile_requests) = setup_server();
let (connect_sco_request, _receiver_server) = generate_connect_sco_request(&mut exec);
let handle_fut = server.handle_profile_request(connect_sco_request);
pin_mut!(handle_fut);
assert!(exec.run_until_stalled(&mut handle_fut).is_ready());
// The connect request should be relayed directly to the Profile Server.
match exec.run_until_stalled(&mut profile_requests.next()) {
Poll::Ready(Some(Ok(bredr::ProfileRequest::ConnectSco { .. }))) => {}
x => panic!("Expected search request, got: {:?}", x),
}
}
/// This tests validates that 1) The call to Profile.Advertise correctly registers upstream and
/// 2) When the call resolves, the Future resolves.
#[test]
fn test_advertise_relay() {
let mut exec = fasync::Executor::new().unwrap();
let (upstream, mut upstream_server) =
create_proxy_and_stream::<bredr::ProfileMarker>().unwrap();
let (connect_client, _connect_requests) =
create_request_stream::<bredr::ConnectionReceiverMarker>().unwrap();
let params = AdvertiseParams { services: vec![], parameters: ChannelParameters::default() };
let advertise_fut = ProfileRegistrar::advertise(upstream, params, connect_client);
pin_mut!(advertise_fut);
assert!(exec.run_until_stalled(&mut advertise_fut).is_pending());
let (_connection_receiver, responder) = match exec
.run_until_stalled(&mut upstream_server.next())
{
Poll::Ready(Some(Ok(bredr::ProfileRequest::Advertise {
receiver, responder, ..
}))) => (receiver, responder),
x => panic!("Expected Advertise request but got: {:?}", x),
};
assert!(exec.run_until_stalled(&mut advertise_fut).is_pending());
// Upstream server decides to terminate advertisement - we expect the Future to finish.
let _ = responder.send(&mut Ok(()));
assert!(exec.run_until_stalled(&mut advertise_fut).is_ready());
}
/// This test validates that incoming connection requests are correctly relayed
/// to the Sender of the connection task.
#[test]
fn test_connection_request_relay() {
let mut exec = fasync::Executor::new().unwrap();
let (connect_client, connect_requests) =
create_proxy_and_stream::<bredr::ConnectionReceiverMarker>().unwrap();
let (sender, mut receiver) = mpsc::channel(0);
let relay_fut = ProfileRegistrar::connection_request_relay(connect_requests, sender);
pin_mut!(relay_fut);
let receiver_fut = receiver.next();
pin_mut!(receiver_fut);
// The task should still be active and no messages sent.
assert!(exec.run_until_stalled(&mut relay_fut).is_pending());
assert!(exec.run_until_stalled(&mut receiver_fut).is_pending());
// Upstream server gives us a connection.
let id = PeerId(123);
let mut protocol = vec![];
assert!(connect_client
.connected(&mut id.into(), bredr::Channel::new_empty(), &mut protocol.iter_mut())
.is_ok());
// Run the relay fut - should still be running.
assert!(exec.run_until_stalled(&mut relay_fut).is_pending());
// The relay should've sent the ConnectionEvent to the receiver.
match exec.run_until_stalled(&mut receiver_fut) {
Poll::Ready(Some(ConnectionEvent::Request(
bredr::ConnectionReceiverRequest::Connected { .. },
))) => {}
x => panic!("Expected connection request but got {:?}", x),
}
// Upstream drops ConnectionReceiver client for some reason.
drop(connect_client);
// The relay should notify the sender that the stream has terminated (i.e relay Canceled).
assert!(exec.run_until_stalled(&mut relay_fut).is_pending());
match exec.run_until_stalled(&mut receiver_fut) {
Poll::Ready(Some(ConnectionEvent::AdvertisementCanceled)) => {}
x => panic!("Expected Canceled but got: {:?}", x),
}
// Relay should be finished since the channel is closed.
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
assert!(exec.run_until_stalled(&mut relay_fut).is_ready());
}
}