blob: 44b430669360c613b6ccdbd8decbf15e50191c5c [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::Context as _,
core::{
convert::TryInto,
pin::Pin,
task::{Context, Poll},
},
fidl::{client::QueryResponseFut, encoding::Decodable, endpoints::create_request_stream},
fidl_fuchsia_bluetooth_bredr as bredr,
fuchsia_bluetooth::{
error::Error as BtError,
profile::{Attribute, ProtocolDescriptor},
types::{Channel, PeerId},
},
futures::{
future::{FusedFuture, FutureExt},
stream::{FusedStream, Stream, StreamExt},
},
log::trace,
};
use crate::{
config::AudioGatewayFeatureSupport,
error::{AdvertisementTerminated, Error},
service_definitions,
};
/// Possible items produced by `Profile` when used as a `Stream`.
#[derive(Debug)]
pub enum ProfileEvent {
/// A Bluetooth peer has requested a new HFP connection with the specified protocol
/// descriptor information.
ConnectionRequest { id: PeerId, protocol: Vec<ProtocolDescriptor>, channel: Channel },
/// A Bluetooth peer has SDP records that meet the Profile's search for HFP Handsfree devices.
SearchResult {
id: PeerId,
protocol: Option<Vec<ProtocolDescriptor>>,
attributes: Vec<Attribute>,
},
}
impl ProfileEvent {
/// Return the PeerId associated with this event.
pub fn peer_id(&self) -> PeerId {
match self {
Self::ConnectionRequest { id, .. } => *id,
Self::SearchResult { id, .. } => *id,
}
}
}
/// Represents a BR/EDR based Bluetooth HFP profile which is advertising as a specific role and
/// searching for the complimentary role.
///
/// `Profile` implements the `Stream` trait and yields `ProfileEvent` items. Errors are yielded
/// when `Profile` fails to register a search request, fails to register an advertisement, when the
/// advertisement is terminated, or any channel to the bredr service returns an error.
pub struct Profile {
proxy: bredr::ProfileProxy,
advertisement: QueryResponseFut<bredr::ProfileAdvertiseResult>,
connect_requests: bredr::ConnectionReceiverRequestStream,
search_results: bredr::SearchResultsRequestStream,
terminated: bool,
}
impl Profile {
/// Register as an Audio Gateway using the provided config. Constructing and returning
/// a `Profile` on success.
pub fn register_audio_gateway(features: AudioGatewayFeatureSupport) -> anyhow::Result<Self> {
let proxy = fuchsia_component::client::connect_to_service::<bredr::ProfileMarker>()
.context("Failed to connect to Bluetooth Profile service")?;
Self::register(
proxy,
service_definitions::audio_gateway(features),
bredr::ServiceClassProfileIdentifier::Handsfree,
)
}
/// Return a clone of the associated `ProfileProxy`.
pub fn proxy(&self) -> bredr::ProfileProxy {
self.proxy.clone()
}
/// Advertise `service` and search for `search_id` using `proxy`.
fn register(
proxy: bredr::ProfileProxy,
service: bredr::ServiceDefinition,
search_id: bredr::ServiceClassProfileIdentifier,
) -> anyhow::Result<Self> {
let (results_client, search_results) =
create_request_stream().context("SearchResults creation")?;
proxy.search(search_id, &[], results_client)?;
let (connect_client, connect_requests) =
create_request_stream().context("ConnectionReceiver creation")?;
let advertisement = proxy
.advertise(&mut vec![service].into_iter(), Decodable::new_empty(), connect_client)
.check()
.context("Advertise request")?;
Ok(Self { proxy, advertisement, connect_requests, search_results, terminated: false })
}
fn handle_connection_request(
request: bredr::ConnectionReceiverRequest,
) -> Result<ProfileEvent, Error> {
let bredr::ConnectionReceiverRequest::Connected { peer_id, channel, protocol, .. } =
request;
let id: PeerId = peer_id.into();
let channel: Channel = channel.try_into().map_err(Error::profile_connection_receiver)?;
let protocol: Vec<ProtocolDescriptor> =
protocol.iter().map(ProtocolDescriptor::from).collect();
trace!("Connection Request from {:?} - protocol {:#?}", peer_id, protocol);
// TODO (fxbug.dev/66592): Validate protocol
Ok(ProfileEvent::ConnectionRequest { id, channel, protocol })
}
fn handle_search_result(request: bredr::SearchResultsRequest) -> Result<ProfileEvent, Error> {
let bredr::SearchResultsRequest::ServiceFound { peer_id, protocol, attributes, responder } =
request;
let id: PeerId = peer_id.into();
responder.send().map_err(Error::profile_search_results)?;
let protocol = protocol.map(|p| p.iter().map(ProtocolDescriptor::from).collect());
let attributes = attributes.iter().map(Attribute::from).collect();
trace!("Discovered {:?} - protocol {:#?}, attributes {:#?}", peer_id, protocol, attributes);
Ok(ProfileEvent::SearchResult { id, protocol, attributes })
}
/// Check whether the stream should be terminated, marking it terminated if necessary and
/// returning the termination state.
fn check_for_stream_termination(&mut self) -> bool {
if self.connect_requests.is_terminated()
|| self.search_results.is_terminated()
|| self.advertisement.is_terminated()
{
self.terminated = true;
}
self.terminated
}
}
impl Stream for Profile {
type Item = Result<ProfileEvent, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// A stream must never be polled after it has been terminated.
if self.terminated {
panic!("Polling Profile after it has terminated");
}
// Check if any of the underlying streams or futures were marked terminated by the previous
// call to Profile::poll_next.
if self.check_for_stream_termination() {
return Poll::Ready(None);
}
// Check to see if the advertisement has been removed. The stream returns an error and sets
// itself terminated in this situation. It should be up to the caller decide what to do in
// this case.
if let Poll::Ready(advertise_result) = self.advertisement.poll_unpin(cx) {
let error = match advertise_result {
Ok(Ok(())) => Error::profile_advertise(AdvertisementTerminated),
Ok(Err(e)) => Error::profile_advertise(BtError::from(e)),
Err(e) => Error::profile_advertise(e),
};
return Poll::Ready(Some(Err(error)));
}
// Check to see if there are connection requests available via the BR/EDR
// ConnectionRequestStream.
if let Poll::Ready(r) = self.connect_requests.poll_next_unpin(cx) {
return Poll::Ready(r.map(|r| {
r.map_err(Error::profile_connection_receiver)
.and_then(Self::handle_connection_request)
}));
}
// Check to see if there are search results available via the BR/EDR
// SearchResultsRequestStream.
if let Poll::Ready(r) = self.search_results.poll_next_unpin(cx) {
return Poll::Ready(r.map(|r| {
r.map_err(Error::profile_search_results).and_then(Self::handle_search_result)
}));
}
Poll::Pending
}
}
impl FusedStream for Profile {
fn is_terminated(&self) -> bool {
self.terminated
}
}
#[cfg(test)]
pub(crate) mod test_server {
use {super::*, fidl_fuchsia_bluetooth_bredr as bredr};
/// Register a new Profile object, and create an associated test server.
pub(crate) fn setup_profile_and_test_server() -> (Profile, LocalProfileTestServer) {
let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<bredr::ProfileMarker>()
.expect("Create new profile connection");
let profile = Profile::register(
proxy,
service_definitions::audio_gateway(Default::default()),
bredr::ServiceClassProfileIdentifier::Handsfree,
)
.expect("register profile");
(profile, stream.into())
}
/// Holds all the server side resources associated with a `Profile`'s connection to
/// fuchsia.bluetooth.bredr.Profile. Provides helper methods for common test related tasks.
/// Some fields are optional because they are not populated until the Profile has completed
/// registration.
pub(crate) struct LocalProfileTestServer {
pub stream: bredr::ProfileRequestStream,
pub responder: Option<bredr::ProfileAdvertiseResponder>,
pub receiver: Option<bredr::ConnectionReceiverProxy>,
pub results: Option<bredr::SearchResultsProxy>,
}
impl From<bredr::ProfileRequestStream> for LocalProfileTestServer {
fn from(stream: bredr::ProfileRequestStream) -> Self {
Self { stream, responder: None, receiver: None, results: None }
}
}
impl LocalProfileTestServer {
pub fn receiver(&self) -> &bredr::ConnectionReceiverProxy {
self.receiver.as_ref().expect("receiver to be present")
}
pub fn results(&self) -> &bredr::SearchResultsProxy {
self.results.as_ref().expect("results to be present")
}
/// Returns true if the `Profile` has registered an `Advertise` and `Search` request.
fn is_registration_complete(&self) -> bool {
self.responder.is_some() && self.receiver.is_some() && self.results.is_some()
}
/// Run through the registration process of a new `Profile`.
pub async fn complete_registration(&mut self) {
while let Some(request) = self.stream.next().await {
match request {
Ok(bredr::ProfileRequest::Advertise { receiver, responder, .. }) => {
if self.is_registration_complete() {
panic!("unexpected second advertise request");
}
self.responder = Some(responder);
self.receiver = Some(receiver.into_proxy().unwrap());
if self.is_registration_complete() {
break;
}
}
Ok(bredr::ProfileRequest::Search { results, .. }) => {
if self.is_registration_complete() {
panic!("unexpected second search request");
}
self.results = Some(results.into_proxy().unwrap());
if self.is_registration_complete() {
break;
}
}
_ => panic!("unexpected result on profile request stream: {:?}", request),
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::{AdvertisementTerminated, ProfileResource};
use {
fidl_fuchsia_bluetooth as bt, fuchsia_async as fasync, futures::StreamExt,
matches::assert_matches,
};
#[fasync::run_singlethreaded(test)]
async fn registration_causes_advertisement_and_search_request() {
let (_profile, mut server) = test_server::setup_profile_and_test_server();
let server = fasync::Task::local(async move {
server.complete_registration().await;
assert!(server.responder.is_some());
assert!(server.receiver.is_some());
assert!(server.results.is_some());
});
server.await;
}
#[fasync::run_singlethreaded(test)]
async fn stream_returns_events() {
let (mut profile, mut server) = test_server::setup_profile_and_test_server();
let _server = fasync::Task::local(async move {
// First, register the profile.
server.complete_registration().await;
// Second, send a search results event.
server
.results()
.service_found(&mut bt::PeerId { value: 1 }, None, &mut vec![].iter_mut())
.await
.expect("successful request to search results receiver");
// Third, send a connection request event.
let (_left, right) = fuchsia_bluetooth::types::Channel::create();
server
.receiver()
.connected(
&mut bt::PeerId { value: 1 },
right.try_into().expect("valid channel"),
&mut vec![].iter_mut(),
)
.expect("successful request to connection receiver");
// This will be pending forever since the server does not expect any further messages in
// this test.
server.stream.next().await;
});
// The profile server task first sends a search result.
let event = profile.next().await;
assert_matches!(event, Some(Ok(ProfileEvent::SearchResult { .. })));
// The profile server task then sends a connection request.
let event = profile.next().await;
assert_matches!(event, Some(Ok(ProfileEvent::ConnectionRequest { .. })));
// Profile is waiting on futher events.
let event = profile.next().now_or_never();
assert_matches!(event, None);
}
#[fasync::run_singlethreaded(test)]
async fn stream_end_of_advertise_returns_error() {
let (mut profile, mut server) = test_server::setup_profile_and_test_server();
let _server = fasync::Task::local(async move {
server.complete_registration().await;
server.responder.take().expect("responder to be present").send(&mut Ok(())).unwrap();
});
// The profile server task then sends a connection request.
let event = profile.next().await;
if let Some(Err(Error::ProfileResourceError {
resource: ProfileResource::Advertise,
source,
})) = event
{
assert!(
source.is::<AdvertisementTerminated>(),
"unexpected error polling Profile Stream"
);
} else {
panic!("unexpected result from Profile stream: {:?}", event);
}
// After an error, the stream will return None and terminate.
let event = profile.next().await;
assert_matches!(event, None);
assert!(profile.is_terminated());
}
#[fasync::run_singlethreaded(test)]
async fn stream_advertise_error_returns_error() {
let (mut profile, mut server) = test_server::setup_profile_and_test_server();
let _server = fasync::Task::local(async move {
server.complete_registration().await;
server
.responder
.take()
.expect("responder to be present")
.send(&mut Err(bt::ErrorCode::Already))
.unwrap();
});
// The profile server task then sends a connection request.
let event = profile.next().await;
if let Some(Err(Error::ProfileResourceError {
resource: ProfileResource::Advertise,
source,
})) = event
{
assert!(source.is::<BtError>(), "unexpected error polling Profile Stream");
} else {
panic!("unexpected result from Profile stream: {:?}", event);
}
// After an error, the stream will return None and terminate.
let event = profile.next().await;
assert_matches!(event, None);
assert!(profile.is_terminated());
}
#[fasync::run_singlethreaded(test)]
async fn stream_terminates_on_server_disconnect() {
let (mut profile, mut server) = test_server::setup_profile_and_test_server();
let server = fasync::Task::local(async move {
server.complete_registration().await;
// Send an event to check that the profile is receiving events at this point
server
.results()
.service_found(&mut bt::PeerId { value: 1 }, None, &mut vec![].iter_mut())
.await
.expect("successful request to search results receiver");
});
// The profile server task first sends a search result.
let event = profile.next().await;
assert_matches!(event, Some(Ok(ProfileEvent::SearchResult { .. })));
assert!(!profile.is_terminated());
drop(server);
// The profile server task has been dropped so the event stream is closed and Profile
// return an Error followed by None.
let event = profile.next().await;
assert_matches!(event, Some(Err(_)));
assert!(!profile.is_terminated());
let event = profile.next().await;
assert_matches!(event, None);
assert!(profile.is_terminated());
}
}