blob: 92f470b647d1a105e709833463fdea96c9bd7910 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{format_err, Result};
use async_helpers::maybe_stream::MaybeStream;
use fidl::endpoints::{ControlHandle, RequestStream, Responder};
use fuchsia_bluetooth::profile::ProtocolDescriptor;
use fuchsia_bluetooth::types::PeerId;
use futures::stream::{FusedStream, FuturesUnordered};
use futures::{select, FutureExt, StreamExt};
use profile_client::{ProfileClient, ProfileEvent};
use std::collections::HashMap;
use std::future::Future;
use tracing::{debug, info, warn};
use {
fidl_fuchsia_bluetooth_bredr as bredr, fidl_fuchsia_bluetooth_hfp as fidl_hfp,
fuchsia_async as fasync, fuchsia_zircon as zx,
};
use crate::config::HandsFreeFeatureSupport;
use crate::peer::Peer;
#[cfg(test)]
mod tests;
pub const SEARCH_RESULT_CONNECT_DELAY_SECONDS: i64 = 1;
const SEARCH_RESULT_CONNECT_DELAY_DURATION: fasync::Duration =
fasync::Duration::from_seconds(SEARCH_RESULT_CONNECT_DELAY_SECONDS);
type SearchResultTimer =
Box<dyn Future<Output = (PeerId, Option<Vec<ProtocolDescriptor>>)> + Unpin>;
/// Toplevel struct containing the streams of incoming events that are not specific to a single
/// peer.
///
/// The Stream of incoming HandsFree FIDL protocol connections should be instantiated as a
/// ServiceFs for non-test code or with another stream implementation for testing.
pub struct Hfp<S>
where
S: FusedStream<Item = fidl_hfp::HandsFreeRequestStream> + Unpin + 'static,
{
/// Configuration for which HF features we support.
features: HandsFreeFeatureSupport,
/// Provides Hfp with a means to drive the `fuchsia.bluetooth.bredr` related APIs.
profile_client: ProfileClient,
/// The client connection to the `fuchsia.bluetooth.bredr.Profile` protocol.
profile_proxy: bredr::ProfileProxy,
/// Timers for asynchronously handling search result profile events.
search_result_timers: FuturesUnordered<SearchResultTimer>,
/// A stream of incoming HandsFree FIDL protocol connections. This should be instantiated as a
/// ServiceFs for live code and as some other stream for testing.
hands_free_connection_stream: S,
/// Stream of incoming HandsFree FIDL protocol Requests
hands_free_request_maybe_stream: MaybeStream<fidl_hfp::HandsFreeRequestStream>,
/// A collection of discovered and/or connected Bluetooth peers that support the AG role.
// TODO(https://fxbug.dev/42082435) Convert this to a FutureMap and await peer tasks finishing and clean up.
peers: HashMap<PeerId, Peer>,
// TODO(fxb/127364) Update HangingGet with peer, and delete this which just keeps the proxy
// around to make tests pass.
peer_handler_proxies: Vec<fidl_hfp::PeerHandlerProxy>,
}
impl<S> Hfp<S>
where
S: FusedStream<Item = fidl_hfp::HandsFreeRequestStream> + Unpin + 'static,
{
pub fn new(
features: HandsFreeFeatureSupport,
profile_client: ProfileClient,
profile_proxy: bredr::ProfileProxy,
hands_free_connection_stream: S,
) -> Self {
let search_result_timers = FuturesUnordered::new();
let hands_free_connection_stream = hands_free_connection_stream;
let hands_free_request_maybe_stream = MaybeStream::default();
let peers = HashMap::new();
Self {
features,
profile_client,
profile_proxy,
hands_free_connection_stream,
hands_free_request_maybe_stream,
peers,
search_result_timers,
peer_handler_proxies: Vec::new(),
}
}
/// Handle incoming profile events, HFP FIDL streams from new client connections and HandsFree
/// FIDL protocol events. This is all the incoming events that are not specific to a single
/// peer.
pub async fn run(mut self) -> Result<()> {
loop {
select! {
profile_event_result_option = self.profile_client.next() => {
debug!("Received profile event: {:?}", profile_event_result_option);
let profile_event_result = profile_event_result_option
.ok_or(format_err!("Profile client stream closed."))?;
let profile_event = profile_event_result?;
self.handle_profile_event(profile_event)?;
},
(peer_id, protocol) = self.search_result_timers.select_next_some() => {
debug!("Timer for search result from peer {} expired.", peer_id);
self.handle_search_result_timer_expiry(peer_id, protocol).await;
},
hands_free_request_stream_option = self.hands_free_connection_stream.next() => {
let stream_str = hands_free_request_stream_option
.as_ref().map(|_stream| "<stream>");
debug!("HandsFree FIDL protocol client connected: {:?}", stream_str);
let hands_free_request_stream = hands_free_request_stream_option
.ok_or(format_err!("HandsFree FIDL protocol connection stream closed."))?;
self.handle_hands_free_request_stream(hands_free_request_stream)
},
hands_free_request_option = self.hands_free_request_maybe_stream.next() => {
debug!("Received HandsFree FIDL protocol request: {:?}",
hands_free_request_option);
if let Some(Ok(hands_free_request)) = hands_free_request_option {
self.handle_hands_free_request(hands_free_request)
} else {
warn!("Dropping HandsFree FIDL protocol request stream");
let _old_stream =
MaybeStream::take(&mut self.hands_free_request_maybe_stream);
}
}
}
}
}
fn handle_profile_event(&mut self, event: ProfileEvent) -> Result<()> {
let peer_id = event.peer_id();
let peer = self
.peers
.entry(peer_id)
.or_insert_with(|| Peer::new(peer_id, self.features, self.profile_proxy.clone()));
match event {
ProfileEvent::PeerConnected { channel, .. } => {
info!("Received peer_connected for peer {}.", peer_id);
let peer_handler_proxy = peer.handle_peer_connected(channel);
self.report_peer_handler(peer_handler_proxy)
}
ProfileEvent::SearchResult { protocol, .. } => {
debug!("Received search results for peer {}", peer_id);
if peer.task_exists() {
debug!(
"Peer task already created by previous profile event for peer {}",
peer_id
);
} else {
debug!("Setting timer for peer task search results for peer {}", peer_id);
// Convert FIDL ProtocolDescriptor to BT ProtocolDescriptor.
let protocol = protocol.map_or(Ok(None), |p| {
p.iter()
.map(|p| ProtocolDescriptor::try_from(p))
.collect::<Result<Vec<_>, _>>()
.map(|p| Some(p))
})?;
let search_result_timer = Self::search_result_timer(peer_id, protocol);
self.search_result_timers.push(search_result_timer);
}
}
}
Ok(())
}
// We expect peers to connect to us. If they don't connect to us but we get
// a search result, we should connect to them. To prevent races where both
// we and the remote peer attempt to connect to the other simultaneously, we
// delay connecting after receiving a search result and see if the remote
// peer has connected first.
fn search_result_timer(
peer_id: PeerId,
protocol: Option<Vec<ProtocolDescriptor>>,
) -> SearchResultTimer {
let time = fasync::Time::after(SEARCH_RESULT_CONNECT_DELAY_DURATION);
let timer = fasync::Timer::new(time);
let fut = FutureExt::map(timer, move |_| (peer_id, protocol));
Box::new(fut)
}
async fn handle_search_result_timer_expiry(
&mut self,
peer_id: PeerId,
protocol: Option<Vec<ProtocolDescriptor>>,
) {
debug!("Handle search results timer expired for peer {:?}", peer_id);
let peer_result = self.peers.get_mut(&peer_id);
let peer_handler_proxy_result = match peer_result {
None => {
info!("Peer task for peer {} completed before handling search result.", peer_id);
Ok(None)
}
Some(peer) => peer.handle_search_result(protocol).await,
};
let peer_handler_proxy_option = match peer_handler_proxy_result {
Ok(proxy) => proxy,
Err(err) => {
// An error handling one peer should not be a fatal error.
warn!("Error handling search result timer expiry for peer {:}: {:?}", peer_id, err);
let _removed_peer = self.peers.remove(&peer_id);
return; // Early return.
}
};
if let Some(peer_handler_proxy) = peer_handler_proxy_option {
self.report_peer_handler(peer_handler_proxy);
}
}
/// Report the PeerHandlerProxy og a newly connected peer to the FIDL client of th HFP protocol
fn report_peer_handler(&mut self, peer_handler_proxy: fidl_hfp::PeerHandlerProxy) {
// TODO(fxb/127364) Update HangingGet with peer. Make sure to set the new
// PeerProxy on the peer. Be careful of races between the new PeerProxy and any
// old ones.
//
// For now, just keep these around to prevent test failures caused by closing
// streams.
self.peer_handler_proxies.push(peer_handler_proxy);
}
fn handle_hands_free_request_stream(&mut self, stream: fidl_hfp::HandsFreeRequestStream) {
if self.hands_free_request_maybe_stream.is_some() {
info!("Got new HandsFree request stream while one already exists. Closing the new stream.");
let control_handle = stream.control_handle();
control_handle.shutdown_with_epitaph(zx::Status::ALREADY_BOUND);
} else {
self.hands_free_request_maybe_stream.set(stream);
// TODO(fxb/127364) Update HangingGet with all peers. Make sure to set the new PeerProxy
// on each peer. Be careful of races between the new PeerProxy and any old ones
}
}
fn handle_hands_free_request(&mut self, request: fidl_hfp::HandsFreeRequest) {
let fidl_hfp::HandsFreeRequest::WatchPeerConnected { responder } = request;
// TODO(fxb/127364) Update HangingGet with new subscriber.
// Handle FIDL calls here.
responder.drop_without_shutdown();
}
}