| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| anyhow::{format_err, Error}, |
| async_utils::stream::{StreamItem, StreamWithEpitaph, WithEpitaph}, |
| bt_rfcomm::ServerChannel, |
| fidl::endpoints::{create_request_stream, ClientEnd}, |
| fidl_fuchsia_bluetooth::ErrorCode, |
| fidl_fuchsia_bluetooth_bredr as bredr, fuchsia_async as fasync, |
| fuchsia_bluetooth::{ |
| profile::{psm_from_protocol, ChannelParameters, Psm, ServiceDefinition}, |
| types::PeerId, |
| util::CollectExt, |
| }, |
| fuchsia_inspect_derive::Inspect, |
| futures::{ |
| self, |
| channel::mpsc, |
| future::BoxFuture, |
| select, |
| sink::SinkExt, |
| stream::{FuturesUnordered, StreamExt}, |
| Future, FutureExt, |
| }, |
| log::{error, info, trace}, |
| std::{ |
| collections::HashSet, |
| convert::{TryFrom, TryInto}, |
| }, |
| }; |
| |
| use crate::profile::*; |
| use crate::rfcomm::RfcommServer; |
| use crate::types::{AdvertiseParams, ServiceGroup, ServiceGroupHandle, Services}; |
| |
| /// The returned result of a Profile.Advertise request. |
| enum AdvertiseResult { |
| /// The Advertise request needed RFCOMM - the client's event stream is returned. |
| EventStream(StreamWithEpitaph<bredr::ConnectionReceiverEventStream, ServiceGroupHandle>), |
| /// The Advertise request did not need RFCOMM - the request is relayed directly |
| /// to the upstream server. Because Profile.Advertise returns when the advertisement finishes, |
| /// we return a Future that relays the result to the client. |
| AdvertiseRelay(BoxFuture<'static, ()>), |
| } |
| |
| /// A connection request from the upstream server. |
| #[derive(Debug)] |
| enum ConnectionEvent { |
| /// A normal connection request. |
| Request(bredr::ConnectionReceiverRequest), |
| /// The upstream server canceled the advertisement, for any reason. |
| AdvertisementCanceled, |
| } |
| |
| /// The tasks associated with a service advertisement. |
| struct AdvertiseTasks { |
| /// The Profile.Advertise() Future that resolves when advertisement terminates. |
| pub adv_task: BoxFuture<'static, ()>, |
| /// The relay task used to process incoming connection requests from the upstream |
| /// server. |
| pub relay_task: fasync::Task<()>, |
| } |
| |
| /// The current advertisement status of the `ProfileRegistrar`. |
| enum AdvertiseStatus { |
| /// Currently advertising with a set of tasks processing requests. |
| Advertising(AdvertiseTasks), |
| /// We are not currently advertising. |
| NotAdvertising, |
| } |
| |
| /// The ProfileRegistrar handles requests over the `fidl.fuchsia.bluetooth.bredr.Profile` service. |
| /// Clients can advertise, search, and connect to services. |
| /// The ProfileRegistrar can be thought of as a relay for clients using Profile. Clients not |
| /// requesting RFCOMM services will be relayed directly to the upstream server. |
| /// |
| /// The ProfileRegistrar manages a single RFCOMM advertisement group. If a new client |
| /// requests to advertise services, the server will unregister the active advertisement, group |
| /// together all the services, and re-register. |
| /// The ProfileRegistrar also manages the `RfcommServer` - which is responsible for handling |
| /// connections over the RFCOMM PSM. |
| #[derive(Inspect)] |
| pub struct ProfileRegistrar { |
| /// An upstream provider of the Profile service. Typically provided by bt-host. |
| profile_upstream: bredr::ProfileProxy, |
| /// The `active_registration` is the current processing task for connection requests |
| /// from the upstream server. |
| active_registration: AdvertiseStatus, |
| /// The currently advertised services. |
| registered_services: Services, |
| /// Sender used to relay connection requests from the upstream server. |
| connection_sender: Option<mpsc::Sender<ConnectionEvent>>, |
| /// The RFCOMM server that handles allocating server channels, incoming |
| /// l2cap connections, outgoing l2cap connections, and multiplexing channels. |
| #[inspect(forward)] |
| rfcomm_server: RfcommServer, |
| } |
| |
| impl ProfileRegistrar { |
| pub fn new(profile_upstream: bredr::ProfileProxy) -> Self { |
| Self { |
| profile_upstream, |
| active_registration: AdvertiseStatus::NotAdvertising, |
| registered_services: Services::new(), |
| connection_sender: None, |
| rfcomm_server: RfcommServer::new(), |
| } |
| } |
| |
| /// Creates and returns a Task representing the running server. |
| pub fn start(self, receiver: mpsc::Receiver<bredr::ProfileRequestStream>) -> fasync::Task<()> { |
| let handler_fut = self.handle_fidl_requests(receiver); |
| fasync::Task::spawn(handler_fut) |
| } |
| |
| /// Returns true if the requested `new_psms` do not overlap with the currently registered PSMs. |
| fn is_disjoint_psms(&self, new_psms: &HashSet<Psm>) -> bool { |
| self.registered_services.psms().is_disjoint(new_psms) |
| } |
| |
| /// Unregisters all the active services advertised by this server. |
| /// This should be called when the upstream server drops the single service advertisement that |
| /// this server manages. |
| async fn unregister_all_services(&mut self) { |
| self.registered_services = Services::new(); |
| self.rfcomm_server.free_all_server_channels().await; |
| } |
| |
| /// Unregisters the group of services identified by `handle`. Re-registers any remaining |
| /// services. |
| /// This should be called when a profile client decides to stop advertising its services. |
| async fn unregister_service(&mut self, handle: ServiceGroupHandle) -> Result<(), Error> { |
| if !self.registered_services.contains(handle) { |
| return Err(format_err!("Attempt to unregister non-existent service: {:?}", handle)); |
| } |
| |
| // Remove the entry for this client. |
| let service_info = self.registered_services.remove(handle); |
| self.rfcomm_server.free_server_channels(service_info.allocated_server_channels()).await; |
| |
| // Attempt to re-advertise. |
| self.refresh_advertisement().await; |
| Ok(()) |
| } |
| |
| /// Processes an incoming L2cap connection from the upstream server. |
| /// |
| /// If the connection PSM is not RFCOMM, relays directly to the client. |
| /// |
| /// Returns an error if the `protocol` is invalidly formatted, or if the provided |
| /// PSM is not represented by a client of the `ProfileRegistrar`. |
| fn handle_incoming_l2cap_connection( |
| &mut self, |
| peer_id: PeerId, |
| channel: bredr::Channel, |
| protocol: Vec<bredr::ProtocolDescriptor>, |
| ) -> Result<(), Error> { |
| let local = protocol.iter().map(|p| p.into()).collect(); |
| match psm_from_protocol(&local).ok_or(format_err!("No PSM provided"))? { |
| Psm::RFCOMM => self.rfcomm_server.new_l2cap_connection(peer_id, channel.try_into()?), |
| psm => { |
| match self.registered_services.iter().find(|(_, client)| client.contains_psm(psm)) { |
| Some((_, client)) => client.relay_connected(peer_id.into(), channel, protocol), |
| None => { |
| return Err(format_err!( |
| "Connection request for non-advertised PSM {:?}", |
| psm |
| )) |
| } |
| } |
| } |
| } |
| } |
| |
| /// Validates that there is an active connection with the peer specified by `peer_id`. If not, |
| /// creates and delivers the L2CAP connection to the RFCOMM server. |
| async fn ensure_service_connection(&mut self, peer_id: PeerId) -> Result<(), ErrorCode> { |
| if self.rfcomm_server.is_active_session(&peer_id) { |
| return Ok(()); |
| } |
| |
| let mut connect_params = bredr::ConnectParameters::L2cap(bredr::L2capParameters { |
| psm: Some(bredr::PSM_RFCOMM), |
| ..bredr::L2capParameters::EMPTY |
| }); |
| let l2cap_channel = |
| match self.profile_upstream.connect(&mut peer_id.into(), &mut connect_params).await { |
| Ok(Ok(channel)) => channel.try_into().unwrap(), |
| Ok(Err(e)) => { |
| return Err(e); |
| } |
| Err(e) => { |
| error!("Couldn't establish L2CAP connection with {:?}: {:?}", peer_id, e); |
| return Err(ErrorCode::Failed); |
| } |
| }; |
| self.rfcomm_server |
| .new_l2cap_connection(peer_id, l2cap_channel) |
| .map_err(|_| ErrorCode::Failed) |
| } |
| |
| /// Processes an outgoing L2Cap connection initiated by a client of the ProfileRegistrar. |
| /// |
| /// Returns an error if the connection request fails. |
| async fn handle_outgoing_connection( |
| &mut self, |
| peer_id: PeerId, |
| mut connection: bredr::ConnectParameters, |
| responder: bredr::ProfileConnectResponder, |
| ) -> Result<(), Error> { |
| // If the provided `connection` is for a non-RFCOMM PSM, simply forward the outbound |
| // connection to the upstream Profile service. |
| // Otherwise, route to the RFCOMM server. |
| match &connection { |
| bredr::ConnectParameters::L2cap { .. } => { |
| let mut result = self |
| .profile_upstream |
| .connect(&mut peer_id.into(), &mut connection) |
| .await |
| .unwrap_or_else(|_fidl_error| Err(ErrorCode::Failed)); |
| let _ = responder.send(&mut result); |
| } |
| bredr::ConnectParameters::Rfcomm(bredr::RfcommParameters { channel, .. }) => { |
| let server_channel = match channel.map(ServerChannel::try_from) { |
| Some(Ok(sc)) => sc, |
| _ => { |
| let _ = responder.send(&mut Err(ErrorCode::InvalidArguments)); |
| return Ok(()); |
| } |
| }; |
| |
| // Ensure there is an RFCOMM Session between us and the peer. |
| if let Err(e) = self.ensure_service_connection(peer_id).await { |
| let _ = responder.send(&mut Err(e)); |
| return Ok(()); |
| } |
| // Open the RFCOMM channel. |
| self.rfcomm_server.open_rfcomm_channel(peer_id, server_channel, responder).await?; |
| } |
| } |
| Ok(()) |
| } |
| |
| /// Advertises `params` to the provided `profile_upstream`. |
| /// |
| /// Returns a Future for the advertisement; this future should be polled in order to detect |
| /// when the advertisement has finished. |
| fn advertise( |
| profile_upstream: bredr::ProfileProxy, |
| params: AdvertiseParams, |
| connect_client: ClientEnd<bredr::ConnectionReceiverMarker>, |
| ) -> impl Future<Output = ()> { |
| let fidl_services = params |
| .services |
| .iter() |
| .map(bredr::ServiceDefinition::try_from) |
| .collect_results() |
| .unwrap(); |
| profile_upstream |
| .advertise( |
| &mut fidl_services.into_iter(), |
| (¶ms.parameters).try_into().unwrap(), |
| connect_client, |
| ) |
| .map(|_| ()) |
| } |
| |
| /// Processes requests from the ConnectionReceiver stream and relays to the `sender`. |
| async fn connection_request_relay( |
| mut connect_requests: bredr::ConnectionReceiverRequestStream, |
| mut sender: mpsc::Sender<ConnectionEvent>, |
| ) { |
| while let Some(connect_request) = connect_requests.next().await { |
| match connect_request { |
| Ok(request) => { |
| let _ = sender.send(ConnectionEvent::Request(request)).await; |
| } |
| Err(e) => info!("Connection request error: {:?}", e), |
| } |
| } |
| // The upstream server has dropped the ConnectionReceiver. Let the |
| // receiver know that the advertisement has been canceled. |
| let _ = sender.send(ConnectionEvent::AdvertisementCanceled).await; |
| } |
| |
| /// Attempts to build and advertise services from `self.registered_services`. |
| async fn refresh_advertisement(&mut self) { |
| let status = |
| std::mem::replace(&mut self.active_registration, AdvertiseStatus::NotAdvertising); |
| match status { |
| AdvertiseStatus::Advertising(AdvertiseTasks { adv_task, relay_task }) => { |
| // If we are currently advertising, drop the stream processing task to unregister |
| // the services. Wait for the advertisement to resolve before attempting to |
| // re-advertise. |
| drop(relay_task); |
| let _ = adv_task.await; |
| trace!("Finished waiting for unregistration"); |
| } |
| AdvertiseStatus::NotAdvertising => {} |
| } |
| |
| // We are ready to advertise. Attempt to build the advertisement parameters, and create |
| // and save two tasks that 1) Make the Advertise request and wait for termination and |
| // 2) Process incoming requests from the upstream server. |
| if let Some(params) = self.registered_services.build_registration() { |
| trace!("Advertising from registered services: {:?}", params); |
| let (connect_client, connect_requests) = |
| create_request_stream::<bredr::ConnectionReceiverMarker>().unwrap(); |
| // Spawn a task to advertise `params`. |
| let adv_fut = |
| ProfileRegistrar::advertise(self.profile_upstream.clone(), params, connect_client); |
| let adv_task = adv_fut.boxed(); |
| // Spawn a task to handle incoming L2CAP connections. |
| let relay_task = fasync::Task::spawn(ProfileRegistrar::connection_request_relay( |
| connect_requests, |
| self.connection_sender.clone().unwrap(), |
| )); |
| |
| self.active_registration = |
| AdvertiseStatus::Advertising(AdvertiseTasks { adv_task, relay_task }); |
| } |
| } |
| |
| /// Handles an incoming request to advertise a group of `services`. |
| /// |
| /// At least one service in `services` must request RFCOMM. |
| /// |
| /// The RFCOMM-requesting services are assigned ServerChannels. The services are then |
| /// registered together with the currently registered services. |
| /// |
| /// Returns the event stream for the receiver tagged with a unique identifier for the |
| /// registered group of services. The event stream should be continuously polled in |
| /// order to detect when the client terminates the advertisement. |
| async fn add_managed_advertisement( |
| &mut self, |
| mut services: Vec<ServiceDefinition>, |
| parameters: ChannelParameters, |
| receiver: bredr::ConnectionReceiverProxy, |
| responder: bredr::ProfileAdvertiseResponder, |
| ) -> Result<StreamWithEpitaph<bredr::ConnectionReceiverEventStream, ServiceGroupHandle>, Error> |
| { |
| // Validate that the new PSMs are disjoint because we unregister and re-register as a group. |
| let new_psms = psms_from_service_definitions(&services); |
| if !self.is_disjoint_psms(&new_psms) { |
| let _ = responder.send(&mut Err(ErrorCode::Failed)); |
| return Err(format_err!("New advertisement requesting pre-allocated PSMs")); |
| } |
| |
| // Create an entry for this group of services with a unique handle. |
| let next_handle = |
| self.registered_services.insert(ServiceGroup::new(receiver.clone(), parameters)); |
| |
| // If the RfcommServer has enough free Server Channels, allocate and update |
| // the RFCOMM-requesting services. |
| let required_server_channels = |
| services.iter().filter(|def| is_rfcomm_service_definition(def)).count(); |
| if required_server_channels > self.rfcomm_server.available_server_channels().await { |
| let _ = responder.send(&mut Err(ErrorCode::Failed)); |
| return Err(format_err!("RfcommServer not enough free Server Channels")); |
| } |
| for mut service in services.iter_mut().filter(|def| is_rfcomm_service_definition(def)) { |
| let server_channel = self |
| .rfcomm_server |
| .allocate_server_channel(receiver.clone()) |
| .await |
| .expect("just checked"); |
| update_svc_def_with_server_channel(&mut service, server_channel)?; |
| } |
| |
| let service_info = self.registered_services.get_mut(next_handle).expect("just inserted"); |
| service_info.set_service_defs(services); |
| service_info.set_responder(responder); |
| |
| // Attempt to re-advertise the updated services. |
| self.refresh_advertisement().await; |
| |
| Ok(receiver.take_event_stream().with_epitaph(next_handle)) |
| } |
| |
| /// Makes a Profile.Advertise() request upstream, and returns a Future that relays the |
| /// result to the `responder` upon termination. |
| fn make_advertise_relay( |
| &self, |
| services: Vec<bredr::ServiceDefinition>, |
| parameters: bredr::ChannelParameters, |
| receiver: ClientEnd<bredr::ConnectionReceiverMarker>, |
| responder: bredr::ProfileAdvertiseResponder, |
| ) -> impl Future<Output = ()> { |
| let adv_fut = |
| self.profile_upstream.advertise(&mut services.into_iter(), parameters, receiver); |
| async move { |
| let _ = adv_fut |
| .await |
| .and_then(|mut r| responder.send(&mut r)) |
| .map_err(|e| trace!("Relayed advertisement terminated: {:?}", e)); |
| } |
| } |
| |
| /// Handles a request over the Profile protocol. |
| /// |
| /// If the request was an advertisement, returns either the event stream associated with |
| /// the advertise request, or a future to relay the advertisement request directly upstream. |
| async fn handle_profile_request( |
| &mut self, |
| request: bredr::ProfileRequest, |
| ) -> Option<AdvertiseResult> { |
| match request { |
| bredr::ProfileRequest::Advertise { services, parameters, receiver, responder } => { |
| let services_local = |
| services.iter().map(ServiceDefinition::try_from).collect_results().ok()?; |
| trace!("Received advertise request: {:?}", services_local); |
| if service_definitions_request_rfcomm(&services_local) { |
| let receiver = receiver.into_proxy().ok()?; |
| let parameters = ChannelParameters::try_from(¶meters).ok()?; |
| match self |
| .add_managed_advertisement(services_local, parameters, receiver, responder) |
| .await |
| { |
| Err(e) => error!("Error handling advertise request: {:?}", e), |
| Ok(evt_stream) => return Some(AdvertiseResult::EventStream(evt_stream)), |
| } |
| } else { |
| return Some(AdvertiseResult::AdvertiseRelay( |
| self.make_advertise_relay(services, parameters, receiver, responder) |
| .boxed(), |
| )); |
| } |
| } |
| bredr::ProfileRequest::Connect { peer_id, connection, responder, .. } => { |
| if let Err(e) = |
| self.handle_outgoing_connection(peer_id.into(), connection, responder).await |
| { |
| error!("Error establishing outgoing connection {:?}", e); |
| } |
| } |
| bredr::ProfileRequest::Search { service_uuid, attr_ids, results, .. } => { |
| // Simply forward over the search to the Profile server. |
| let _ = self.profile_upstream.search(service_uuid, &attr_ids, results); |
| } |
| bredr::ProfileRequest::ConnectSco { |
| mut peer_id, initiator, params, receiver, .. |
| } => { |
| let _ = |
| self.profile_upstream.connect_sco(&mut peer_id, initiator, params, receiver); |
| } |
| } |
| None |
| } |
| |
| /// Handles incoming connection requests from the processing task of the active service |
| /// advertisement. |
| /// |
| /// There are two relevant cases: |
| /// 1) A connection request from the upstream Host Server. The incoming l2cap connection |
| /// must be handled - if PSM_RFCOMM, send to the RfcommServer, otherwise, relay |
| /// directly to the profile client. |
| /// 2) An epitaph of the relay task signaling the advertisement has canceled. This is |
| /// usually due to an error in the upstream server. We must clear all of the services. |
| async fn handle_connection_request(&mut self, request: ConnectionEvent) -> Result<(), Error> { |
| match request { |
| ConnectionEvent::Request(request) => { |
| let bredr::ConnectionReceiverRequest::Connected { |
| peer_id, channel, protocol, .. |
| } = request; |
| self.handle_incoming_l2cap_connection(peer_id.into(), channel, protocol) |
| } |
| ConnectionEvent::AdvertisementCanceled => { |
| // The upstream server unexpectedly dropped the advertisement. We must clean up |
| // all of the state. |
| self.unregister_all_services().await; |
| Ok(()) |
| } |
| } |
| } |
| |
| /// `handle_fidl_requests` processes requests/events over several sources. |
| /// This is the main "event loop" for the server. |
| /// |
| /// 1) The server implements the Profile service. Service advertisements, searches, and |
| /// connection requests are processed and handled from any clients of the Profile service. |
| /// 2) Connection requests from the upstream server. The active processing task relays |
| /// incoming connection requests for both RFCOMM and non-RFCOMM PSMs. |
| /// 3) Termination events from service advertisements of profile clients. The |
| /// event streams of the service advertisements are polled in `client_event_streams` |
| /// and are used to determine when to unregister the aforementioned advertisements. |
| /// 4) Profile.Advertise() requests that have been relayed directly to the upstream server |
| /// must be continuously polled to relay the response after termination. |
| pub async fn handle_fidl_requests( |
| mut self, |
| mut profile_request_streams: mpsc::Receiver<bredr::ProfileRequestStream>, |
| ) { |
| // Collection for bredr.Profile requests. |
| let mut profile_requests = futures::stream::SelectAll::new(); |
| |
| // Internal channel used to relay requests over the `ConnectionReceiver` protocol. |
| let (connection_sender, mut connection_receiver) = mpsc::channel(0); |
| self.connection_sender = Some(connection_sender); |
| |
| // Collection of all client service advertisement event streams. Used to determine |
| // when the client has stopped advertising its services. |
| let mut client_event_streams = futures::stream::SelectAll::new(); |
| |
| // Collection of futures of Profile.Advertise requests that have been relayed directly |
| // upstream. This is polled continuously to relay the response when the advertisement |
| // terminates. |
| let mut advertise_futures: FuturesUnordered<BoxFuture<'static, ()>> = |
| FuturesUnordered::new(); |
| |
| loop { |
| select! { |
| request_stream = profile_request_streams.select_next_some() => { |
| profile_requests.push(request_stream); |
| } |
| profile_request = profile_requests.select_next_some() => { |
| let profile_request = match profile_request { |
| Ok(request) => request, |
| Err(e) => { |
| info!("Error from Profile request: {:?}", e); |
| continue; |
| } |
| }; |
| match self.handle_profile_request(profile_request).await { |
| Some(AdvertiseResult::EventStream(evt_stream)) => client_event_streams.push(evt_stream), |
| Some(AdvertiseResult::AdvertiseRelay(fut)) => advertise_futures.push(fut), |
| _ => {}, |
| } |
| } |
| connection_request = connection_receiver.select_next_some() => { |
| if let Err(e) = self.handle_connection_request(connection_request).await { |
| error!("Error processing incoming l2cap connection request: {:?}", e); |
| } |
| } |
| service_event = client_event_streams.next() => { |
| if let Some(StreamItem::Epitaph(service_id)) = service_event { |
| // The event stream of the advertisement has been exhausted. |
| // Unregister the service from the ProfileRegistrar. |
| info!("Client {:?} unregistered service advertisement", service_id); |
| if let Err(e) = self.unregister_service(service_id).await { |
| error!("Error unregistering service {:?}: {:?}", service_id, e); |
| } |
| } |
| } |
| _ = advertise_futures.next() => {}, |
| complete => break, |
| } |
| } |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| |
| use crate::types::tests::{other_service_definition, rfcomm_service_definition}; |
| |
| use fidl::encoding::Decodable; |
| use fidl::endpoints::create_proxy_and_stream; |
| use futures::{pin_mut, task::Poll}; |
| |
| /// Returns true if the provided `service` has an assigned Server Channel. |
| fn service_def_has_assigned_server_channel(service: &bredr::ServiceDefinition) -> bool { |
| if let Some(protocol) = &service.protocol_descriptor_list { |
| for descriptor in protocol { |
| if descriptor.protocol == bredr::ProtocolIdentifier::Rfcomm { |
| if descriptor.params.is_empty() { |
| return false; |
| } |
| // The server channel should be the first element. |
| if let bredr::DataElement::Uint8(_) = descriptor.params[0] { |
| return true; |
| } |
| } |
| } |
| } |
| false |
| } |
| |
| /// Creates a Profile::Search request. |
| fn generate_search_request( |
| exec: &mut fasync::Executor, |
| ) -> (bredr::ProfileRequest, bredr::SearchResultsRequestStream) { |
| let (c, mut s) = create_proxy_and_stream::<bredr::ProfileMarker>().unwrap(); |
| let (results, server) = create_request_stream::<bredr::SearchResultsMarker>().unwrap(); |
| |
| assert!(c.search(bredr::ServiceClassProfileIdentifier::AudioSink, &[], results).is_ok()); |
| |
| match exec.run_until_stalled(&mut s.next()) { |
| Poll::Ready(Some(Ok(x))) => (x, server), |
| x => panic!("Expected ProfileRequest but got: {:?}", x), |
| } |
| } |
| |
| /// Creates a Profile::ConnectSco request. |
| fn generate_connect_sco_request( |
| exec: &mut fasync::Executor, |
| ) -> (bredr::ProfileRequest, bredr::ScoConnectionReceiverRequestStream) { |
| let (profile_proxy, mut profile_request_stream) = |
| create_proxy_and_stream::<bredr::ProfileMarker>().unwrap(); |
| let (receiver_client, receiver_server) = |
| create_request_stream::<bredr::ScoConnectionReceiverMarker>().unwrap(); |
| |
| assert!(profile_proxy |
| .connect_sco( |
| &mut PeerId(1).into(), |
| /*initiator=*/ true, |
| bredr::ScoConnectionParameters::new_empty(), |
| receiver_client |
| ) |
| .is_ok()); |
| |
| match exec.run_until_stalled(&mut profile_request_stream.next()) { |
| Poll::Ready(Some(Ok(request))) => (request, receiver_server), |
| x => panic!("Expected ProfileRequest but got: {:?}", x), |
| } |
| } |
| |
| /// Creates a Profile::Advertise request. |
| /// Returns the associated request stream, and the Advertise request as a Future. |
| fn make_advertise_request( |
| client: &bredr::ProfileProxy, |
| services: Vec<bredr::ServiceDefinition>, |
| ) -> ( |
| bredr::ConnectionReceiverRequestStream, |
| impl Future<Output = Result<Result<(), ErrorCode>, fidl::Error>>, |
| ) { |
| let (connection, connection_stream) = |
| create_request_stream::<bredr::ConnectionReceiverMarker>().unwrap(); |
| let adv_fut = client.advertise( |
| &mut services.into_iter(), |
| bredr::ChannelParameters::new_empty(), |
| connection, |
| ); |
| (connection_stream, adv_fut) |
| } |
| |
| fn new_client( |
| exec: &mut fasync::Executor, |
| mut profile_sender: mpsc::Sender<bredr::ProfileRequestStream>, |
| ) -> bredr::ProfileProxy { |
| let (profile_client, profile_server) = |
| create_proxy_and_stream::<bredr::ProfileMarker>().unwrap(); |
| let send_fut = profile_sender.send(profile_server); |
| pin_mut!(send_fut); |
| let _ = exec.run_until_stalled(&mut send_fut); |
| profile_client |
| } |
| |
| /// Returns the `ProfileRegistrar.handle_fidl_requests()` Future. This consumes |
| /// the `server`. |
| fn setup_handler_fut( |
| server: ProfileRegistrar, |
| ) -> (mpsc::Sender<bredr::ProfileRequestStream>, impl Future<Output = ()>) { |
| let (profile_sender, profile_receiver) = mpsc::channel(0); |
| let handler_fut = server.handle_fidl_requests(profile_receiver); |
| (profile_sender, handler_fut) |
| } |
| |
| /// Creates the ProfileRegistrar with the upstream Profile service. |
| fn setup_server() -> (fasync::Executor, ProfileRegistrar, bredr::ProfileRequestStream) { |
| let exec = fasync::Executor::new().unwrap(); |
| let (client, server) = create_proxy_and_stream::<bredr::ProfileMarker>().unwrap(); |
| let profile_server = ProfileRegistrar::new(client); |
| (exec, profile_server, server) |
| } |
| |
| /// Exercises a service advertisement with an empty set of services. |
| /// In practice, the upstream Host Server will reject this request but the RFCOMM |
| /// server will still classify the request as non-RFCOMM only, and relay directly |
| /// to the Profile Server. |
| /// This test validates that the parameters are relayed directly to the Profile Server. Also |
| /// validates that when the upstream Advertise call resolves, the result is relayed to the |
| /// client. |
| #[test] |
| fn test_handle_empty_advertise_request() { |
| let (mut exec, server, mut upstream_requests) = setup_server(); |
| |
| let (profile_sender, handler_fut) = setup_handler_fut(server); |
| pin_mut!(handler_fut); |
| assert!(exec.run_until_stalled(&mut handler_fut).is_pending()); |
| |
| // A new client connects to bt-rfcomm.cmx. |
| let client = { |
| let client = new_client(&mut exec, profile_sender.clone()); |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| client |
| }; |
| |
| // Client decides to advertise empty services. |
| let services = vec![]; |
| let (_connection_stream, adv_fut) = make_advertise_request(&client, services); |
| pin_mut!(adv_fut); |
| assert!(exec.run_until_stalled(&mut adv_fut).is_pending()); |
| |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| |
| // The advertisement request should be relayed directly upstream. |
| let responder = match exec.run_until_stalled(&mut upstream_requests.next()) { |
| Poll::Ready(Some(Ok(bredr::ProfileRequest::Advertise { responder, .. }))) => responder, |
| x => panic!("Expected advertise request, got: {:?}", x), |
| }; |
| |
| // Upstream decides to resolve the Advertise call. |
| let _ = responder.send(&mut Ok(())); |
| |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| // Client should be notified, and it's advertisement should terminate. |
| assert!(exec.run_until_stalled(&mut adv_fut).is_ready()); |
| } |
| |
| /// Exercises a service advertisement with no RFCOMM services. |
| /// The ProfileRegistrar server should classify the request as non-RFCOMM only, and relay |
| /// directly to the upstream Profile Server. |
| #[test] |
| fn test_handle_advertise_request_with_no_rfcomm() -> Result<(), Error> { |
| let (mut exec, server, mut upstream_requests) = setup_server(); |
| |
| let (profile_sender, handler_fut) = setup_handler_fut(server); |
| pin_mut!(handler_fut); |
| assert!(exec.run_until_stalled(&mut handler_fut).is_pending()); |
| |
| // A new client connects to bt-rfcomm.cmx. |
| let client = { |
| let client = new_client(&mut exec, profile_sender.clone()); |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| client |
| }; |
| |
| // Client decides to advertise. |
| let services = |
| vec![bredr::ServiceDefinition::try_from(&other_service_definition(Psm::new(1)))?]; |
| let (_connection_stream, adv_fut) = make_advertise_request(&client, services); |
| pin_mut!(adv_fut); |
| assert!(exec.run_until_stalled(&mut adv_fut).is_pending()); |
| |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| |
| // The advertisement request should be relayed directly upstream. |
| match exec.run_until_stalled(&mut upstream_requests.next()) { |
| Poll::Ready(Some(Ok(bredr::ProfileRequest::Advertise { .. }))) => {} |
| x => panic!("Expected advertise request, got: {:?}", x), |
| }; |
| Ok(()) |
| } |
| |
| /// Service advertisement with only RFCOMM services. The services should be assigned |
| /// Server Channels and be relayed upstream. |
| #[test] |
| fn test_handle_advertise_request_with_only_rfcomm() -> Result<(), Error> { |
| let (mut exec, server, mut upstream_requests) = setup_server(); |
| |
| let (profile_sender, handler_fut) = setup_handler_fut(server); |
| pin_mut!(handler_fut); |
| assert!(exec.run_until_stalled(&mut handler_fut).is_pending()); |
| |
| // A new client connects to bt-rfcomm.cmx. |
| let client = { |
| let client = new_client(&mut exec, profile_sender.clone()); |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| client |
| }; |
| |
| // Client decides to advertise. |
| let services = vec![bredr::ServiceDefinition::try_from(&rfcomm_service_definition(None))?]; |
| let (_connection_stream, adv_fut) = make_advertise_request(&client, services); |
| pin_mut!(adv_fut); |
| assert!(exec.run_until_stalled(&mut adv_fut).is_pending()); |
| |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| |
| // Upstream should receive Advertise request for a service with an assigned server channel. |
| match exec.run_until_stalled(&mut upstream_requests.next()) { |
| Poll::Ready(Some(Ok(bredr::ProfileRequest::Advertise { services, .. }))) => { |
| assert_eq!(services.len(), 1); |
| assert!(service_def_has_assigned_server_channel(&services[0])); |
| } |
| x => panic!("Expected advertise request, got: {:?}", x), |
| }; |
| Ok(()) |
| } |
| |
| /// Service advertisement with both RFCOMM and non-RFCOMM services. Only the RFCOMM |
| /// services should be assigned Server Channels, and the group should be registered upstream. |
| #[test] |
| fn test_handle_advertise_request_with_all_services() -> Result<(), Error> { |
| let (mut exec, server, mut upstream_requests) = setup_server(); |
| |
| let (profile_sender, handler_fut) = setup_handler_fut(server); |
| pin_mut!(handler_fut); |
| assert!(exec.run_until_stalled(&mut handler_fut).is_pending()); |
| |
| // A new client connects to bt-rfcomm.cmx. |
| let client = { |
| let client = new_client(&mut exec, profile_sender.clone()); |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| client |
| }; |
| |
| // Client decides to advertise. |
| let services = vec![ |
| bredr::ServiceDefinition::try_from(&rfcomm_service_definition(None))?, |
| bredr::ServiceDefinition::try_from(&other_service_definition(Psm::new(10)))?, |
| bredr::ServiceDefinition::try_from(&rfcomm_service_definition(None))?, |
| ]; |
| let n = services.len(); |
| let (_connection_stream, adv_fut) = make_advertise_request(&client, services); |
| pin_mut!(adv_fut); |
| assert!(exec.run_until_stalled(&mut adv_fut).is_pending()); |
| |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| |
| // Expect an advertise request with all n services - validate that the RFCOMM services |
| // have assigned server channels. |
| match exec.run_until_stalled(&mut upstream_requests.next()) { |
| Poll::Ready(Some(Ok(bredr::ProfileRequest::Advertise { services, .. }))) => { |
| assert_eq!(services.len(), n); |
| let res = services |
| .into_iter() |
| .filter(|service| { |
| is_rfcomm_service_definition(&ServiceDefinition::try_from(service).unwrap()) |
| }) |
| .map(|service| service_def_has_assigned_server_channel(&service)) |
| .fold(true, |acc, elt| acc || elt); |
| assert!(res); |
| } |
| x => panic!("Expected advertise request, got: {:?}", x), |
| } |
| Ok(()) |
| } |
| |
| /// Tests handling two advertise requests with overlapping PSMs. The first one |
| /// should succeed and be relayed upstream. The second one should fail since it |
| /// is requesting already-allocated PSMs - the responder should be notified of the error. |
| #[test] |
| fn test_handle_advertise_requests_same_psm() -> Result<(), Error> { |
| let (mut exec, server, mut upstream_requests) = setup_server(); |
| |
| let (profile_sender, handler_fut) = setup_handler_fut(server); |
| pin_mut!(handler_fut); |
| assert!(exec.run_until_stalled(&mut handler_fut).is_pending()); |
| |
| // A new client connects to bt-rfcomm.cmx. |
| let client1 = { |
| let client = new_client(&mut exec, profile_sender.clone()); |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| client |
| }; |
| |
| // Client decides to advertise. |
| let psm = Psm::new(10); |
| let services1 = vec![ |
| bredr::ServiceDefinition::try_from(&other_service_definition(psm))?, |
| bredr::ServiceDefinition::try_from(&rfcomm_service_definition(None))?, |
| ]; |
| let (_connection_stream1, adv_fut1) = make_advertise_request(&client1, services1); |
| pin_mut!(adv_fut1); |
| assert!(exec.run_until_stalled(&mut adv_fut1).is_pending()); |
| |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| |
| // Save the Advertise parameters. |
| let _adv_req1 = match exec.run_until_stalled(&mut upstream_requests.next()) { |
| Poll::Ready(Some(Ok(request))) => request, |
| x => panic!("Expected advertise request, got: {:?}", x), |
| }; |
| |
| // A different client connects to bt-rfcomm.cmx. It decides to try to advertise over same |
| // PSM. |
| let client2 = { |
| let client = new_client(&mut exec, profile_sender.clone()); |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| client |
| }; |
| let services2 = vec![ |
| bredr::ServiceDefinition::try_from(&other_service_definition(psm))?, |
| bredr::ServiceDefinition::try_from(&rfcomm_service_definition(None))?, |
| ]; |
| let (_connection_stream2, adv_fut2) = make_advertise_request(&client2, services2); |
| pin_mut!(adv_fut2); |
| assert!(exec.run_until_stalled(&mut adv_fut2).is_pending()); |
| |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| |
| // Advertisement 1 is OK. Advertisement 2 should resolve immediately with an ErrorCode. |
| assert!(exec.run_until_stalled(&mut adv_fut1).is_pending()); |
| match exec.run_until_stalled(&mut adv_fut2) { |
| Poll::Ready(Ok(Err(e))) => assert_eq!(e, ErrorCode::Failed), |
| x => panic!("Expected Ready with ErrorCode but got {:?}", x), |
| } |
| |
| Ok(()) |
| } |
| |
| /// Tests that independent service advertisements from multiple clients are correctly |
| /// grouped together and re-registered. |
| #[test] |
| fn test_handle_multiple_service_advertisements() -> Result<(), Error> { |
| let (mut exec, server, mut upstream_requests) = setup_server(); |
| |
| let (profile_sender, handler_fut) = setup_handler_fut(server); |
| pin_mut!(handler_fut); |
| assert!(exec.run_until_stalled(&mut handler_fut).is_pending()); |
| |
| // A new client connects to bt-rfcomm.cmx. |
| let client1 = { |
| let client = new_client(&mut exec, profile_sender.clone()); |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| client |
| }; |
| |
| // Client decides to advertise. |
| let psm1 = Psm::new(10); |
| let services1 = vec![ |
| bredr::ServiceDefinition::try_from(&other_service_definition(psm1))?, |
| bredr::ServiceDefinition::try_from(&rfcomm_service_definition(None))?, |
| ]; |
| let n1 = services1.len(); |
| let (_connection_stream1, adv_fut1) = make_advertise_request(&client1, services1); |
| pin_mut!(adv_fut1); |
| assert!(exec.run_until_stalled(&mut adv_fut1).is_pending()); |
| |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| |
| // First advertisement request relayed upstream. |
| let (_receiver1, responder1) = match exec.run_until_stalled(&mut upstream_requests.next()) { |
| Poll::Ready(Some(Ok(bredr::ProfileRequest::Advertise { |
| receiver, responder, .. |
| }))) => (receiver, responder), |
| x => panic!("Expected advertise request, got: {:?}", x), |
| }; |
| |
| // A different client connects to bt-rfcomm.cmx. It decides to try to advertise over same |
| // PSM. |
| let client2 = { |
| let client = new_client(&mut exec, profile_sender.clone()); |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| client |
| }; |
| // Client 2 decides to advertise three services. |
| let psm2 = Psm::new(15); |
| let n2 = 3; |
| let services2 = vec![ |
| bredr::ServiceDefinition::try_from(&other_service_definition(psm2))?, |
| bredr::ServiceDefinition::try_from(&rfcomm_service_definition(None))?, |
| bredr::ServiceDefinition::try_from(&rfcomm_service_definition(None))?, |
| ]; |
| let (_connection_stream2, adv_fut2) = make_advertise_request(&client2, services2); |
| pin_mut!(adv_fut2); |
| assert!(exec.run_until_stalled(&mut adv_fut2).is_pending()); |
| |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| |
| // We expect ProfileRegistrar to unregister the current active advertisement. Respond to |
| // the unregister request by responding over the responder. |
| let _ = responder1.send(&mut Ok(())); |
| let _ = exec.run_until_stalled(&mut handler_fut); |
| let _ = exec.run_until_stalled(&mut futures::future::pending::<()>()); |
| |
| // We expect a new advertisement upstream. |
| let (_receiver2, _responder2) = match exec.run_until_stalled(&mut upstream_requests.next()) |
| { |
| Poll::Ready(Some(Ok(bredr::ProfileRequest::Advertise { |
| services, |
| receiver, |
| responder, |
| .. |
| }))) => { |
| assert_eq!(services.len(), n1 + n2); |
| (receiver, responder) |
| } |
| x => panic!("Expected advertise request, got: {:?}", x), |
| }; |
| assert!(exec.run_until_stalled(&mut adv_fut1).is_pending()); |
| assert!(exec.run_until_stalled(&mut adv_fut2).is_pending()); |
| |
| Ok(()) |
| } |
| |
| /// This test validates that client Search requests are relayed directly upstream. |
| #[test] |
| fn test_handle_search_request() { |
| let (mut exec, mut server, mut profile_requests) = setup_server(); |
| |
| let (search_request, _stream) = generate_search_request(&mut exec); |
| |
| let handle_fut = server.handle_profile_request(search_request); |
| pin_mut!(handle_fut); |
| |
| assert!(exec.run_until_stalled(&mut handle_fut).is_ready()); |
| |
| // The search request should be relayed directly to the Profile Server. |
| match exec.run_until_stalled(&mut profile_requests.next()) { |
| Poll::Ready(Some(Ok(bredr::ProfileRequest::Search { .. }))) => {} |
| x => panic!("Expected search request, got: {:?}", x), |
| } |
| } |
| |
| /// This test validates that client ConnectSco requests are relayed directly upstream. |
| #[test] |
| fn test_handle_connect_sco_request() { |
| let (mut exec, mut server, mut profile_requests) = setup_server(); |
| |
| let (connect_sco_request, _receiver_server) = generate_connect_sco_request(&mut exec); |
| |
| let handle_fut = server.handle_profile_request(connect_sco_request); |
| pin_mut!(handle_fut); |
| |
| assert!(exec.run_until_stalled(&mut handle_fut).is_ready()); |
| |
| // The connect request should be relayed directly to the Profile Server. |
| match exec.run_until_stalled(&mut profile_requests.next()) { |
| Poll::Ready(Some(Ok(bredr::ProfileRequest::ConnectSco { .. }))) => {} |
| x => panic!("Expected search request, got: {:?}", x), |
| } |
| } |
| |
| /// This tests validates that 1) The call to Profile.Advertise correctly registers upstream and |
| /// 2) When the call resolves, the Future resolves. |
| #[test] |
| fn test_advertise_relay() { |
| let mut exec = fasync::Executor::new().unwrap(); |
| let (upstream, mut upstream_server) = |
| create_proxy_and_stream::<bredr::ProfileMarker>().unwrap(); |
| let (connect_client, _connect_requests) = |
| create_request_stream::<bredr::ConnectionReceiverMarker>().unwrap(); |
| let params = AdvertiseParams { services: vec![], parameters: ChannelParameters::default() }; |
| |
| let advertise_fut = ProfileRegistrar::advertise(upstream, params, connect_client); |
| pin_mut!(advertise_fut); |
| assert!(exec.run_until_stalled(&mut advertise_fut).is_pending()); |
| |
| let (_connection_receiver, responder) = match exec |
| .run_until_stalled(&mut upstream_server.next()) |
| { |
| Poll::Ready(Some(Ok(bredr::ProfileRequest::Advertise { |
| receiver, responder, .. |
| }))) => (receiver, responder), |
| x => panic!("Expected Advertise request but got: {:?}", x), |
| }; |
| |
| assert!(exec.run_until_stalled(&mut advertise_fut).is_pending()); |
| |
| // Upstream server decides to terminate advertisement - we expect the Future to finish. |
| let _ = responder.send(&mut Ok(())); |
| assert!(exec.run_until_stalled(&mut advertise_fut).is_ready()); |
| } |
| |
| /// This test validates that incoming connection requests are correctly relayed |
| /// to the Sender of the connection task. |
| #[test] |
| fn test_connection_request_relay() { |
| let mut exec = fasync::Executor::new().unwrap(); |
| |
| let (connect_client, connect_requests) = |
| create_proxy_and_stream::<bredr::ConnectionReceiverMarker>().unwrap(); |
| let (sender, mut receiver) = mpsc::channel(0); |
| |
| let relay_fut = ProfileRegistrar::connection_request_relay(connect_requests, sender); |
| pin_mut!(relay_fut); |
| |
| let receiver_fut = receiver.next(); |
| pin_mut!(receiver_fut); |
| |
| // The task should still be active and no messages sent. |
| assert!(exec.run_until_stalled(&mut relay_fut).is_pending()); |
| assert!(exec.run_until_stalled(&mut receiver_fut).is_pending()); |
| |
| // Upstream server gives us a connection. |
| let id = PeerId(123); |
| let mut protocol = vec![]; |
| assert!(connect_client |
| .connected(&mut id.into(), bredr::Channel::new_empty(), &mut protocol.iter_mut()) |
| .is_ok()); |
| |
| // Run the relay fut - should still be running. |
| assert!(exec.run_until_stalled(&mut relay_fut).is_pending()); |
| |
| // The relay should've sent the ConnectionEvent to the receiver. |
| match exec.run_until_stalled(&mut receiver_fut) { |
| Poll::Ready(Some(ConnectionEvent::Request( |
| bredr::ConnectionReceiverRequest::Connected { .. }, |
| ))) => {} |
| x => panic!("Expected connection request but got {:?}", x), |
| } |
| |
| // Upstream drops ConnectionReceiver client for some reason. |
| drop(connect_client); |
| |
| // The relay should notify the sender that the stream has terminated (i.e relay Canceled). |
| assert!(exec.run_until_stalled(&mut relay_fut).is_pending()); |
| match exec.run_until_stalled(&mut receiver_fut) { |
| Poll::Ready(Some(ConnectionEvent::AdvertisementCanceled)) => {} |
| x => panic!("Expected Canceled but got: {:?}", x), |
| } |
| |
| // Relay should be finished since the channel is closed. |
| let _ = exec.run_until_stalled(&mut futures::future::pending::<()>()); |
| assert!(exec.run_until_stalled(&mut relay_fut).is_ready()); |
| } |
| } |