blob: 3eb42b4b50016505a9b0b3b81bde992e998bfbe2 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::format_err,
async_utils::hanging_get::client::HangingGetStream,
bt_rfcomm::profile::server_channel_from_protocol,
fidl_fuchsia_bluetooth_bredr as bredr,
fidl_fuchsia_bluetooth_hfp::{NetworkInformation, PeerHandlerProxy},
fuchsia_async::Task,
fuchsia_bluetooth::{
profile::{Attribute, ProtocolDescriptor},
types::PeerId,
},
fuchsia_inspect::{self as inspect, Property},
fuchsia_inspect_derive::{AttachError, Inspect},
fuchsia_zircon as zx,
futures::{
channel::mpsc::{self, Sender},
future::{self, Either, Future},
select,
stream::{empty, Empty},
FutureExt, SinkExt, StreamExt,
},
parking_lot::Mutex,
profile_client::ProfileEvent,
std::{convert::TryInto, fmt, sync::Arc},
tracing::{error, info, warn},
vigil::{DropWatch, Vigil},
};
use super::{
calls::{Call, CallAction, Calls},
gain_control::GainControl,
indicators::{AgIndicator, AgIndicators, HfIndicator},
procedure::ProcedureMarker,
ringer::Ringer,
sco_state::{InspectableScoState, ScoActive, ScoState},
service_level_connection::ServiceLevelConnection,
slc_request::SlcRequest,
update::AgUpdate,
ConnectionBehavior, PeerRequest,
};
use crate::{
a2dp,
audio::AudioControl,
config::AudioGatewayFeatureSupport,
error::Error,
features::CodecId,
hfp,
inspect::PeerTaskInspect,
sco_connector::{ScoConnection, ScoConnector},
};
const CONNECTION_INIT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const DEFAULT_CODECS: &[CodecId] = &[CodecId::CVSD];
pub(super) struct PeerTask {
id: PeerId,
_local_config: AudioGatewayFeatureSupport,
connection_behavior: ConnectionBehavior,
profile_proxy: bredr::ProfileProxy,
handler: Option<PeerHandlerProxy>,
network: NetworkInformation,
network_updates: Either<
HangingGetStream<PeerHandlerProxy, NetworkInformation>,
Empty<Result<NetworkInformation, fidl::Error>>,
>,
battery_level: u8,
calls: Calls,
gain_control: GainControl,
connection: ServiceLevelConnection,
a2dp_control: a2dp::Control,
sco_connector: ScoConnector,
sco_state: InspectableScoState,
ringer: Ringer,
audio_control: Arc<Mutex<Box<dyn AudioControl>>>,
hfp_sender: Sender<hfp::Event>,
manager_id: Option<hfp::ManagerConnectionId>,
inspect: PeerTaskInspect,
}
impl Inspect for &mut PeerTask {
fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
let _ = self.inspect.iattach(parent, name)?;
self.sco_state.iattach(self.inspect.node(), "sco_connection")?;
Ok(())
}
}
impl PeerTask {
pub fn new(
id: PeerId,
profile_proxy: bredr::ProfileProxy,
audio_control: Arc<Mutex<Box<dyn AudioControl>>>,
local_config: AudioGatewayFeatureSupport,
connection_behavior: ConnectionBehavior,
hfp_sender: Sender<hfp::Event>,
) -> Result<Self, Error> {
let connection = ServiceLevelConnection::with_init_timeout(fuchsia_async::Timer::new(
CONNECTION_INIT_TIMEOUT,
));
let sco_connector = ScoConnector::build(profile_proxy.clone());
let a2dp_control = a2dp::Control::connect();
Ok(Self {
id,
_local_config: local_config,
connection_behavior,
profile_proxy,
handler: None,
network: NetworkInformation::EMPTY,
network_updates: empty().right_stream(),
// Default to a battery level of 5 (max). This will be updated when we receive
// a battery information update.
battery_level: 5,
calls: Calls::new(None),
gain_control: GainControl::new()?,
connection,
a2dp_control,
sco_connector,
sco_state: InspectableScoState::default(),
ringer: Ringer::default(),
audio_control,
hfp_sender,
manager_id: None,
inspect: PeerTaskInspect::new(id),
})
}
pub fn spawn(
id: PeerId,
profile_proxy: bredr::ProfileProxy,
audio_control: Arc<Mutex<Box<dyn AudioControl>>>,
local_config: AudioGatewayFeatureSupport,
connection_behavior: ConnectionBehavior,
hfp_sender: Sender<hfp::Event>,
inspect: &inspect::Node,
) -> Result<(Task<()>, Sender<PeerRequest>), Error> {
let (sender, receiver) = mpsc::channel(0);
let mut peer = Self::new(
id,
profile_proxy,
audio_control,
local_config,
connection_behavior,
hfp_sender,
)?;
if let Err(e) = peer.iattach(inspect, "task") {
warn!("Failed to attach PeerTaskInspect to provided inspect node: {}", e)
}
let task = Task::local(peer.run(receiver).map(|_| ()));
Ok((task, sender))
}
fn set_handler(&mut self, handler: Option<PeerHandlerProxy>) {
self.handler = handler;
self.inspect.connected_peer_handler.set(self.handler.is_some());
}
/// Always give preference to connection requests received from the peer device.
async fn on_connection_request(
&mut self,
_protocol: Vec<ProtocolDescriptor>,
channel: fuchsia_bluetooth::types::Channel,
) -> Result<(), Error> {
if self.connection.connected() {
info!("Overwriting existing connection to {}", self.id);
} else {
info!("Connection request from {}", self.id);
}
self.connection.connect(channel);
if let Err(e) = self.connection.iattach(self.inspect.node(), "service_level_connection") {
warn!("Failed to attach ServiceLevelConnection to PeerTaskInspect: {}", e)
}
if let Some(id) = self.manager_id {
self.notify_peer_connected(id).await;
self.setup_handler().await?;
}
Ok(())
}
/// Make a connection to the peer only if there is not an existing connection.
async fn connect(&mut self, mut params: bredr::ConnectParameters) -> Result<(), anyhow::Error> {
if self.connection.connected() {
info!("Already connected to peer: {}.", self.id);
return Ok(());
}
info!("Initiating connection to peer: {}", self.id);
let channel = self
.profile_proxy
.connect(&mut self.id.into(), &mut params)
.await?
.map_err(|e| format_err!("Profile connection request error: {:?}", e))?;
self.connection.connect(channel.try_into()?);
if let Err(e) = self.connection.iattach(self.inspect.node(), "service_level_connection") {
warn!("Failed to attach ServiceLevelConnection to PeerTaskInspect: {}", e)
}
if let Some(id) = self.manager_id {
self.notify_peer_connected(id).await;
self.setup_handler()
.await
.map_err(|e| format_err!("Error setting up peer handler: {}", e))?;
}
Ok(())
}
async fn on_search_result(
&mut self,
protocol: Option<Vec<ProtocolDescriptor>>,
_attributes: Vec<Attribute>,
) {
info!("Search results received for {}", self.id);
let server_channel = match protocol.as_ref().map(server_channel_from_protocol) {
Some(Some(sc)) => sc,
_ => {
info!("Search result received for non-RFCOMM protocol: {:?}", protocol);
return;
}
};
let params = bredr::ConnectParameters::Rfcomm(bredr::RfcommParameters {
channel: Some(server_channel.into()),
..bredr::RfcommParameters::EMPTY
});
if self.connection_behavior.autoconnect {
if let Err(e) = self.connect(params).await {
info!("Error inititating connecting to peer {}: {:?}", self.id, e);
}
}
}
async fn notify_peer_connected(&mut self, manager_id: hfp::ManagerConnectionId) {
let (proxy, handle) =
fidl::endpoints::create_proxy().expect("Cannot create required fidl handle");
self.hfp_sender
.send(hfp::Event::PeerConnected { peer_id: self.id, manager_id, handle })
.await
.expect("Cannot communicate with main Hfp task");
self.set_handler(Some(proxy));
}
async fn setup_handler(&mut self) -> Result<(), Error> {
info!("Got request to handle peer {} headset using handler", self.id);
if let Some(handler) = self.handler.clone() {
// Getting the network information the first time should always return a complete table.
// If the call returns an error, do not set up the handler.
let info = match handler.watch_network_information().await {
Ok(info) => info,
Err(fidl::Error::ClientChannelClosed {
status: zx::Status::PEER_CLOSED, ..
}) => {
return Ok(());
}
Err(e) => {
warn!("Error handling peer request: {}", e);
return Ok(());
}
};
self.handle_network_update(info).await;
let client_end = self.gain_control.get_client_end()?;
if let Err(e) = handler.gain_control(client_end) {
warn!("Error setting gain control for peer {}: {}", self.id, e);
return Ok(());
}
self.calls = Calls::new(Some(handler.clone()));
if let Err(e) = self.calls.iattach(self.inspect.node(), "calls") {
warn!("Failed to attach Calls to PeerTaskInspect: {}", e)
}
self.create_network_updates_stream(handler);
}
Ok(())
}
/// A new call manager is connected. The PeerTask should
async fn on_manager_connected(&mut self, id: hfp::ManagerConnectionId) -> Result<(), Error> {
self.manager_id = Some(id);
if !self.connection.connected() {
// If there is no peer connection, there should not be a handler.
self.set_handler(None);
return Ok(());
}
self.notify_peer_connected(id).await;
self.setup_handler().await?;
Ok(())
}
/// When a new handler is received, the state is not known. It might be stale because the
/// system is asynchronous and the PeerHandler connection has two sides. The handler will only
/// be stored if all the associated fidl calls are successful. Otherwise it is dropped without
/// being set.
async fn on_peer_handler(&mut self, handler: PeerHandlerProxy) -> Result<(), Error> {
info!("Got request to handle peer {} headset using handler", self.id);
// Getting the network information the first time should always return a complete table.
// If the call returns an error, do not set up the handler.
let info = match handler.watch_network_information().await {
Ok(info) => info,
Err(fidl::Error::ClientChannelClosed { status: zx::Status::PEER_CLOSED, .. }) => {
return Ok(());
}
Err(e) => {
warn!("Error handling peer request: {}", e);
return Ok(());
}
};
self.handle_network_update(info).await;
let client_end = self.gain_control.get_client_end()?;
if let Err(e) = handler.gain_control(client_end) {
warn!("Error setting gain control for peer {}: {}", self.id, e);
return Ok(());
}
self.calls = Calls::new(Some(handler.clone()));
if let Err(e) = self.calls.iattach(self.inspect.node(), "calls") {
warn!("Failed to attach Calls to PeerTaskInspect: {}", e)
}
self.create_network_updates_stream(handler.clone());
self.set_handler(Some(handler));
Ok(())
}
/// Set a new HangingGetStream to watch for network information updates.
fn create_network_updates_stream(&mut self, handler: PeerHandlerProxy) {
self.network_updates =
HangingGetStream::new_with_fn_ptr(handler, PeerHandlerProxy::watch_network_information)
.left_stream();
}
async fn peer_request(&mut self, request: PeerRequest) -> Result<(), Error> {
match request {
PeerRequest::Profile(ProfileEvent::PeerConnected { protocol, channel, id: _ }) => {
let protocol = protocol.iter().map(ProtocolDescriptor::from).collect();
self.on_connection_request(protocol, channel).await?;
}
PeerRequest::Profile(ProfileEvent::SearchResult { protocol, attributes, id: _ }) => {
let protocol = protocol.map(|p| p.iter().map(ProtocolDescriptor::from).collect());
let attributes = attributes.iter().map(Attribute::from).collect();
self.on_search_result(protocol, attributes).await;
}
PeerRequest::ManagerConnected { id } => self.on_manager_connected(id).await?,
PeerRequest::Handle(handler) => self.on_peer_handler(handler).await?,
PeerRequest::BatteryLevel(level) => {
self.battery_level = level;
let status = AgIndicator::BatteryLevel(self.battery_level);
self.phone_status_update(status).await;
}
PeerRequest::Behavior(behavior) => {
self.connection_behavior = behavior;
}
}
Ok(())
}
/// Processes a `request` for information from an HFP procedure.
async fn procedure_request(&mut self, request: SlcRequest) {
info!("HF procedure request ({}): {:?}", self.id, request);
let marker = (&request).into();
match request {
SlcRequest::GetAgFeatures { response } => {
let features = (&self._local_config).into();
// Update the procedure with the retrieved AG update.
self.connection.receive_ag_request(marker, response(features)).await;
}
SlcRequest::GetSubscriberNumberInformation { response } => {
let result = if let Some(handler) = &mut self.handler {
handler.subscriber_number_information().await.ok().unwrap_or_else(Vec::new)
} else {
vec![]
};
self.connection.receive_ag_request(marker, response(result)).await;
}
SlcRequest::GetAgIndicatorStatus { response } => {
let call_ind = self.calls.indicators();
let status = AgIndicators {
service: self.network.service_available.unwrap_or(false),
call: call_ind.call,
callsetup: call_ind.callsetup,
callheld: call_ind.callheld,
signal: self.network.signal_strength.map(|ss| ss as u8).unwrap_or(0),
roam: self.network.roaming.unwrap_or(false),
battchg: self.battery_level,
};
// Update the procedure with the retrieved AG update.
self.connection.receive_ag_request(marker, response(status)).await;
}
SlcRequest::GetNetworkOperatorName { response } => {
let format = self.connection.network_operator_name_format();
let name = match &self.handler {
Some(h) => {
let result = h.query_operator().await;
if let Err(err) = &result {
warn!(
"Got error attempting to retrieve operator name from AG: {:} for peer {:}",
err, self.id
);
};
result.ok().flatten()
}
None => None,
};
let name_option = match (name, format) {
(Some(n), Some(_)) => Some(n),
_ => None, // The format must be set before getting the network name.
};
// Update the procedure with the result of retrieving the AG network name.
self.connection.receive_ag_request(marker, response(name_option)).await;
}
SlcRequest::SendDtmf { code, response } => {
let result = self.calls.send_dtmf_code(code).await;
self.connection.receive_ag_request(marker, response(result)).await;
}
SlcRequest::SendHfIndicator { indicator, response } => {
self.hf_indicator_update(indicator);
self.connection.receive_ag_request(marker, response()).await;
}
SlcRequest::SetNrec { enable, response } => {
let result = if let Some(handler) = &mut self.handler {
if let Ok(Ok(())) = handler.set_nrec_mode(enable).await {
Ok(())
} else {
Err(())
}
} else {
Err(())
};
self.connection.receive_ag_request(marker, response(result)).await;
}
SlcRequest::SpeakerVolumeSynchronization { level, response } => {
self.gain_control.report_speaker_gain(level);
self.connection.receive_ag_request(marker, response()).await;
}
SlcRequest::MicrophoneVolumeSynchronization { level, response } => {
self.gain_control.report_microphone_gain(level);
self.connection.receive_ag_request(marker, response()).await;
}
SlcRequest::QueryCurrentCalls { response } => {
let result = self.calls.current_calls();
self.connection.receive_ag_request(marker, response(result)).await;
}
SlcRequest::Answer { response } => {
let result = self.calls.answer().map_err(|e| {
warn!("Unexpected Answer from Hands Free for peer {}: {}", self.id, e);
});
self.connection.receive_ag_request(marker, response(result)).await;
}
SlcRequest::HangUp { response } => {
let result = self.calls.hang_up().map_err(|e| {
warn!("Unexpected Hang Up from Hands Free for peer {}: {}", self.id, e);
});
self.connection.receive_ag_request(marker, response(result)).await;
}
SlcRequest::Hold { command, response } => {
let result = self.calls.hold(command).map_err(|e| {
warn!(
"Unexpected Action {:?} from Hands Free for peer {}: {}",
command, self.id, e
);
});
self.connection.receive_ag_request(marker, response(result)).await;
}
SlcRequest::InitiateCall { call_action, response } => {
let result = self.handle_initiate_call(call_action).await;
self.connection.receive_ag_request(marker, response(result)).await;
}
SlcRequest::SynchronousConnectionSetup { response } => {
if self.sco_state.is_active() {
info!("Got SCO setup request when SCO state was active");
// Drop existing SCO connection.
self.sco_state.iset(ScoState::SettingUp);
}
// TODO(fxbug.dev/72681): Because we may need to send an OK response to the HF
// just before setting up the synchronous connection, we send it here by routing
// through the procedure.
self.connection.receive_ag_request(marker, AgUpdate::Ok).await;
let codecs = self.get_codecs();
let setup_result = self.sco_connector.connect(self.id.clone(), codecs).await;
let finish_result = match setup_result {
Ok(conn) => self.finish_sco_connection(conn).await,
Err(err) => Err(err.into()),
};
let result =
finish_result.map_err(|e| warn!("Error setting up audio connection: {:?}", e));
self.connection.receive_ag_request(marker, response(result)).await;
}
SlcRequest::RestartCodecConnectionSetup { response } => {
self.connection.receive_ag_request(marker, response()).await;
// Start CodecConnectionSetup running again.
self.initiate_codec_negotiation().await;
}
};
}
pub async fn handle_initiate_call(&mut self, call_action: CallAction) -> Result<(), ()> {
let three_way_calling = self.connection.three_way_calling();
let call_active = self.calls.is_call_active();
let handler = self.handler.as_ref().ok_or(())?;
if call_active && !three_way_calling {
warn!("Attempting to initiate unsupported outgoing call during active call.");
return Err(());
}
if call_active && three_way_calling {
// Hold calls, and return Err(_) if it fails.
self.calls.hold_active().map_err(|e| {
warn!(
"Failed to hold active call when making outgoing call for peer {}: {}",
self.id, e
)
})?;
}
match handler.request_outgoing_call(&mut call_action.into()).await {
Ok(Ok(())) => Ok(()),
err => {
warn!("Error initiating outgoing call for peer {}: {:?}", self.id, err);
Err(())
}
}
}
pub async fn run(mut self, mut task_channel: mpsc::Receiver<PeerRequest>) -> Self {
loop {
let mut active_sco_closed_fut = self.on_active_sco_closed().fuse();
info!("Beginning select for peer {:?}, SCO state is {:?}", self.id, self.sco_state);
let mut sco_state = self.sco_state.as_mut();
select! {
// Wait until the HF sets up a SCO connection.
conn_res = sco_state.on_connected() => {
drop(sco_state);
info!("Handling SCO Connection accepted for peer {}.", self.id);
match conn_res {
Ok(sco) if !sco.is_closed() => {
let finish_sco_res = self.finish_sco_connection(sco).await;
if let Err(err) = finish_sco_res {
warn!("Failed to finish SCO connection with {:} for peer {}", err, self.id)
}
let call_transfer_res = self.calls.transfer_to_hf();
if let Err(err) = call_transfer_res {
warn!("Transfer to HF failed with {:} for peer {}", err, self.id)
}
},
// This can occur if the HF opens and closes a SCO connection immediately.
Ok(_) => warn!("Got already closed SCO connection for peer {}.", self.id),
Err(err) => {
warn!("Got error waiting for SCO connection {:} for peer {}", err, self.id);
break;
}
}
}
// New request coming from elsewhere in the component
request = task_channel.next() => {
drop(sco_state);
info!("Handling peer request {:?} for peer {}", request, self.id);
if let Some(request) = request {
if let Err(e) = self.peer_request(request).await {
warn!("Error handling peer request {} for peer {}", e, self.id);
break;
}
} else {
info!("Peer task channel closed for peer {}", self.id);
break;
}
}
// New request on the gain control protocol
request = self.gain_control.select_next_some() => {
info!("Handling gain control {:?} for peer {}", request, self.id);
self.connection.receive_ag_request(ProcedureMarker::VolumeControl, request.into()).await;
},
// A new call state has been received from the call service
update = self.calls.select_next_some() => {
drop(sco_state);
info!("Handling call {:?} for peer {}", update, self.id);
// TODO(fxbug.dev/75538): for in-band ring setup audio if should_ring is true
self.ringer.ring(self.calls.should_ring());
if update.callwaiting {
if let Some(call) = self.calls.waiting() {
self.call_waiting_update(call).await;
}
}
for status in update.to_vec() {
self.phone_status_update(status).await;
}
// Sync the SCO connection state to the call state.
// The error is already logged and we can't do anything.
let _ = self.update_sco_state().await;
}
// SCO connection has closed.
_ = active_sco_closed_fut => {
drop(sco_state);
info!("Handling SCO Connection closed for peer {}, transferring call to AG.", self.id);
self.sco_state.iset(ScoState::TearingDown);
let call_transfer_res = self.calls.transfer_to_ag();
if let Err(err) = call_transfer_res {
warn!("Transfer to AG failed with {:} for peer {}", err, self.id)
}
}
request = self.connection.next() => {
drop(sco_state);
info!("Handling SLC request {:?} for peer {}.", request, self.id);
if let Some(request) = request {
match request {
Ok(r) => self.procedure_request(r).await,
Err(e) => {
warn!("SLC stream error {:?} for peer {}", e, self.id);
break;
}
}
} else {
info!("Peer task channel closed for peer {}", self.id);
break;
}
}
update = self.network_updates.next() => {
drop(sco_state);
info!("Handling network update {:?} for peer {}", update, self.id);
if let Some(update) = stream_item_map_or_log(update, "PeerHandler::WatchNetworkUpdate", &self.id) {
self.handle_network_update(update).await
} else {
break;
}
}
_ = self.ringer.select_next_some() => {
drop(sco_state);
info!("Handling ring for peer {}.", self.id);
if let Some(call) = self.calls.ringing() {
self.ring_update(call).await;
} else {
self.ringer.ring(false);
}
}
complete => break,
}
}
info!("Stopping task for peer {}", self.id);
self
}
/// Sends an HF Indicator update to the client.
fn hf_indicator_update(&mut self, indicator: HfIndicator) {
match indicator {
ind @ HfIndicator::EnhancedSafety(_) => {
info!("Received EnhancedSafety HF Indicator update {:?} for peer {}", ind, self.id);
}
HfIndicator::BatteryLevel(v) => {
if let Some(handler) = &mut self.handler {
if let Err(e) = handler.report_headset_battery_level(v) {
warn!("Couldn't report headset battery level {:?} for peer {}", e, self.id);
}
}
self.inspect.set_hf_battery_level(v);
}
}
}
/// Start the Codec Connection procedure if no sco connection exists.
/// This procedure will negotiate a codec and eventually call
/// `sco_connector.connect`.
async fn initiate_codec_negotiation(&mut self) {
if self.sco_state.is_active() {
return;
}
self.connection
.receive_ag_request(ProcedureMarker::CodecConnectionSetup, AgUpdate::CodecSetup(None))
.await;
}
/// Update the SCO connection state to reflect the call state, creating a
/// a future to wait for incoming SCO connections if necessary.
async fn update_sco_state(&mut self) -> Result<(), ()> {
let call_active = self.calls.is_call_active();
let call_transferred = self.calls.is_call_transferred_to_ag();
if call_active && call_transferred {
error!("Call both active and transferred for peer {}", self.id);
return Err(());
}
let previous_sco_state = &*self.sco_state;
info!(
%self.id,
?previous_sco_state,
"update_sco_state: active: {}, transferred: {}",
call_active,
call_transferred
);
if call_active {
match previous_sco_state {
// A call just started, so set up SCO.
ScoState::Inactive
// A call was just transferred to the HF, so set up SCO.
| ScoState::AwaitingRemote(_)
=> {
self.initiate_codec_negotiation().await;
self.sco_state.iset(ScoState::SettingUp);
},
// We are negotiating codecs; wait for that to finish before starting the SCO
// connection, so do nothing.
ScoState::SettingUp
// The SCO connection was closed by the peer and we requested the call manager set
// that call as transferred to AG, but we are waiting on the call manager to do
// so, so do nothing.
| ScoState::TearingDown
// A call is active and we have a SCO connection, so do nothing.
| ScoState::Active(_) => {},
}
} else if call_transferred {
// If a call is transferred to the AG, we should be waiting for a connection from the
// HF to transfer it back.
if let ScoState::AwaitingRemote(_) = previous_sco_state {
// Already waiting for a SCO connection, nothing to do.
info!("update_sco_state: already awaiting");
return Ok(());
}
let fut = self.sco_connector.accept(self.id.clone(), self.get_codecs());
self.sco_state.iset(ScoState::AwaitingRemote(Box::pin(fut)));
} else {
/* No call in progress */
self.sco_state.iset(ScoState::Inactive);
};
info!(%self.id, ?self.sco_state, "update_sco_state: finished");
Ok(())
}
async fn finish_sco_connection(&mut self, sco_connection: ScoConnection) -> Result<(), Error> {
let peer_id = self.id.clone();
info!("Finishing SCO connection for peer {:}.", peer_id);
let res = self.a2dp_control.pause(Some(self.id.clone())).await;
let pause_token = match res {
Err(e) => {
warn!("Couldn't pause A2DP Audio: {:?} for peer {:}", e, peer_id);
None
}
Ok(token) => {
info!("Successfully paused audio for peer {:}.", peer_id);
Some(token)
}
};
{
let mut audio = self.audio_control.lock();
// Start the DAI with the given parameters
if let Err(e) = audio.start(self.id.clone(), sco_connection.params.clone().into()) {
// Cancel the SCO connection, we can't send audio.
// TODO(fxbug.dev/79784): this probably means we should just cancel out of HFP and
// this peer's connection entirely.
warn!(
"Couldn't start Audio DAI ({:?}) for peer {:} - dropping audio connection",
self.id, e
);
return Err(Error::system(format!("Couldn't start audio DAI"), e));
} else {
info!("Successfully started Audio DAI for peer {:}.", peer_id);
}
}
let vigil = Vigil::new(ScoActive { sco_connection, _pause_token: pause_token });
Vigil::watch(&vigil, {
let control = self.audio_control.clone();
move |_| match control.lock().stop() {
Err(e) => warn!("Couldn't stop audio for peer {:}: {:?}", peer_id, e),
Ok(()) => info!("Stopped HFP Audio for peer {:}", peer_id),
}
});
info!("In finish_sco_connection for peer {:}, SCO state is {:?}", peer_id, vigil);
self.sco_state.iset(ScoState::Active(vigil));
Ok(())
}
fn on_active_sco_closed(&self) -> impl Future<Output = ()> + 'static {
match &*self.sco_state {
ScoState::Active(connection) => connection.sco_connection.on_closed().left_future(),
_ => future::pending().right_future(),
}
}
/// Request to send the phone `status` by initiating the Phone Status Indicator
/// procedure.
async fn ring_update(&mut self, call: Call) {
self.connection.receive_ag_request(ProcedureMarker::Ring, AgUpdate::Ring(call)).await;
}
/// Request to send a Call Waiting Notification.
async fn call_waiting_update(&mut self, call: Call) {
self.connection
.receive_ag_request(
ProcedureMarker::CallWaitingNotifications,
AgUpdate::CallWaiting(call),
)
.await;
}
/// Request to send the phone `status` by initiating the Phone Status Indicator
/// procedure.
async fn phone_status_update(&mut self, status: AgIndicator) {
self.connection.receive_ag_request(ProcedureMarker::PhoneStatus, status.into()).await;
}
/// Update the network information with the provided `update` value.
async fn handle_network_update(&mut self, update: NetworkInformation) {
if update_table_entry(&mut self.network.service_available, &update.service_available) {
let status = AgIndicator::Service(self.network.service_available.unwrap() as u8);
self.phone_status_update(status).await;
}
if update_table_entry(&mut self.network.signal_strength, &update.signal_strength) {
let status = AgIndicator::Signal(self.network.signal_strength.unwrap() as u8);
self.phone_status_update(status).await;
}
if update_table_entry(&mut self.network.roaming, &update.roaming) {
let status = AgIndicator::Roam(self.network.roaming.unwrap() as u8);
self.phone_status_update(status).await;
}
self.inspect.network.update(&self.network);
}
fn get_codecs(&self) -> Vec<CodecId> {
self.connection.get_selected_codec().map_or(DEFAULT_CODECS.to_vec(), |c| vec![c])
}
}
/// Table entries are all optional fields. Update `dst` if `src` is present and differs from `dst`.
///
/// Return true if the update occurred.
fn update_table_entry<T: PartialEq + Clone>(dst: &mut Option<T>, src: &Option<T>) -> bool {
if src.is_some() && src != dst {
*dst = src.clone();
true
} else {
false
}
}
/// Take an item from a stream the produces results and return an `Option<T>` when available.
/// Log a message including the `stream_name` when the stream produces an error or is exhausted.
///
/// This is useful when dealing with streams where the only meaningful difference between
/// a terminated stream and an error value is how it should be logged.
fn stream_item_map_or_log<T, E: fmt::Debug>(
item: Option<Result<T, E>>,
stream_name: &str,
peer_id: &PeerId,
) -> Option<T> {
match item {
Some(Ok(value)) => Some(value),
Some(Err(e)) => {
warn!("Error on stream {} for peer {:}: {:?}", stream_name, peer_id, e);
None
}
None => {
info!("Stream {} closed for peer {:}", stream_name, peer_id);
None
}
}
}
#[cfg(test)]
mod tests {
use {
super::*,
assert_matches::assert_matches,
async_test_helpers::run_while,
async_utils::PollExt,
at_commands::{self as at, SerDe},
bt_rfcomm::{profile::build_rfcomm_protocol, ServerChannel},
core::task::Poll,
fidl::AsHandleRef,
fidl_fuchsia_bluetooth_bredr::{ProfileMarker, ProfileRequestStream, ScoErrorCode},
fidl_fuchsia_bluetooth_hfp::{
CallDirection, CallRequest, CallRequestStream, CallState, NextCall, PeerHandlerMarker,
PeerHandlerRequest, PeerHandlerRequestStream, PeerHandlerWatchNextCallResponder,
SignalStrength,
},
fuchsia_async as fasync,
fuchsia_bluetooth::types::Channel,
futures::{
future::ready,
pin_mut,
stream::{FusedStream, Stream},
SinkExt,
},
proptest::prelude::*,
std::convert::TryFrom,
};
use crate::{
audio::TestAudioControl,
features::{AgFeatures, HfFeatures},
peer::{
calls::Number,
indicators::{AgIndicatorsReporting, HfIndicators},
service_level_connection::{
tests::{
create_and_connect_slc, create_and_initialize_slc,
expect_data_received_by_peer, expect_peer_ready, serialize_at_response,
},
SlcState,
},
},
};
fn arb_signal() -> impl Strategy<Value = Option<SignalStrength>> {
proptest::option::of(prop_oneof![
Just(SignalStrength::None),
Just(SignalStrength::VeryLow),
Just(SignalStrength::Low),
Just(SignalStrength::Medium),
Just(SignalStrength::High),
Just(SignalStrength::VeryHigh),
])
}
prop_compose! {
fn arb_network()(
service_available in any::<Option<bool>>(),
signal_strength in arb_signal(),
roaming in any::<Option<bool>>()
) -> NetworkInformation {
NetworkInformation {
service_available,
roaming,
signal_strength,
..NetworkInformation::EMPTY
}
}
}
fn setup_peer_task(
connection: Option<ServiceLevelConnection>,
) -> (PeerTask, Sender<PeerRequest>, mpsc::Receiver<PeerRequest>, ProfileRequestStream) {
let (sender, receiver) = mpsc::channel(1);
let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<ProfileMarker>().unwrap();
let audio: Arc<Mutex<Box<dyn AudioControl>>> =
Arc::new(Mutex::new(Box::new(TestAudioControl::default())));
let mut task = PeerTask::new(
PeerId(1),
proxy,
audio,
AudioGatewayFeatureSupport::default(),
ConnectionBehavior::default(),
mpsc::channel(1).0,
)
.expect("Could not create PeerTask");
if let Some(conn) = connection {
task.connection = conn;
}
(task, sender, receiver, stream)
}
proptest! {
#![proptest_config(ProptestConfig{
// Disable persistence to avoid the warning for not running in the
// source code directory (since we're running on a Fuchsia target)
failure_persistence: None,
.. ProptestConfig::default()
})]
#[test]
fn updates(a in arb_network(), b in arb_network()) {
let mut exec = fasync::TestExecutor::new().unwrap();
let mut task = setup_peer_task(None).0;
task.network = a.clone();
exec.run_singlethreaded(task.handle_network_update(b.clone()));
let c = task.network.clone();
// Check that the `service_available` field is correct.
if b.service_available.is_some() {
assert_eq!(c.service_available, b.service_available);
} else if a.service_available.is_some() {
assert_eq!(c.service_available, a.service_available);
} else {
assert_eq!(c.service_available, None);
}
// Check that the `signal_strength` field is correct.
if b.signal_strength.is_some() {
assert_eq!(c.signal_strength, b.signal_strength);
} else if a.signal_strength.is_some() {
assert_eq!(c.signal_strength, a.signal_strength);
} else {
assert_eq!(c.signal_strength, None);
}
// Check that the `roaming` field is correct.
if b.roaming.is_some() {
assert_eq!(c.roaming, b.roaming);
} else if a.roaming.is_some() {
assert_eq!(c.roaming, a.roaming);
} else {
assert_eq!(c.roaming, None);
}
}
}
#[fuchsia::test]
fn handle_peer_request_stores_peer_handler_proxy() {
let mut exec = fasync::TestExecutor::new().unwrap();
let mut peer = setup_peer_task(None).0;
assert!(peer.handler.is_none());
let (proxy, mut stream) =
fidl::endpoints::create_proxy_and_stream::<PeerHandlerMarker>().unwrap();
{
let request_fut = peer.peer_request(PeerRequest::Handle(proxy));
pin_mut!(request_fut);
let (result, request_fut) = run_while(&mut exec, request_fut, stream.next());
match result {
Some(Ok(PeerHandlerRequest::WatchNetworkInformation { responder })) => {
responder
.send(NetworkInformation::EMPTY)
.expect("Successfully send network information");
}
x => panic!("Expected watch network information request: {:?}", x),
};
// Request future should finish.
let request_result = exec.run_singlethreaded(request_fut);
assert!(request_result.is_ok());
}
assert!(peer.handler.is_some());
}
#[fuchsia::test(allow_stalls = false)]
async fn handle_peer_request_decline_to_handle() {
let mut peer = setup_peer_task(None).0;
assert!(peer.handler.is_none());
let (proxy, server_end) = fidl::endpoints::create_proxy::<PeerHandlerMarker>().unwrap();
// close the PeerHandler channel by dropping the server endpoint.
drop(server_end);
peer.peer_request(PeerRequest::Handle(proxy)).await.expect("request to succeed");
assert!(peer.handler.is_none());
}
#[fuchsia::test(allow_stalls = false)]
async fn task_runs_until_all_event_sources_close() {
let (peer, sender, receiver, _) = setup_peer_task(None);
// Peer will stop running when all event sources are closed.
// Close all sources
drop(sender);
// Test that `run()` completes.
let _ = peer.run(receiver).await;
}
/// Expects a message to be received by the peer. This expectation does not validate the
/// contents of the data received.
#[track_caller]
fn expect_message_received_by_peer(exec: &mut fasync::TestExecutor, remote: &mut Channel) {
let mut vec = Vec::new();
let mut remote_fut = Box::pin(remote.read_datagram(&mut vec));
assert!(exec.run_until_stalled(&mut remote_fut).is_ready());
}
#[fuchsia::test]
fn peer_task_drives_procedure() {
let mut exec = fasync::TestExecutor::new().unwrap();
let (mut peer, _sender, receiver, _profile) = setup_peer_task(None);
// Set up the RFCOMM connection.
let (local, mut remote) = Channel::create();
exec.run_singlethreaded(peer.on_connection_request(vec![], local))
.expect("Connection request handling to succeed");
let mut peer_task_fut = Box::pin(peer.run(receiver));
assert!(exec.run_until_stalled(&mut peer_task_fut).is_pending());
// Simulate remote peer (HF) sending AT command to start the SLC Init Procedure.
let features = HfFeatures::empty();
let command = format!("AT+BRSF={}\r", features.bits()).into_bytes();
let _ = remote.as_ref().write(&command);
let _ = exec.run_until_stalled(&mut peer_task_fut);
// We then expect an outgoing message to the peer.
expect_message_received_by_peer(&mut exec, &mut remote);
}
#[fuchsia::test]
fn network_information_updates_are_relayed_to_peer() {
// This test produces the following two network updates. Each update is expected to
// be sent to the remote peer.
let network_update_1 = NetworkInformation {
signal_strength: Some(SignalStrength::Low),
roaming: Some(false),
..NetworkInformation::EMPTY
};
// Expect to send the Signal and Roam indicators to the peer.
let expected_data1 = vec![AgIndicator::Signal(3).into(), AgIndicator::Roam(0).into()];
let network_update_2 = NetworkInformation {
service_available: Some(true),
roaming: Some(true),
..NetworkInformation::EMPTY
};
// Expect to send the Service and Roam indicators to the peer.
let expected_data2 = vec![AgIndicator::Service(1).into(), AgIndicator::Roam(1).into()];
// The value after the updates are applied is expected to be the following
let expected_network = NetworkInformation {
service_available: Some(true),
roaming: Some(true),
signal_strength: Some(SignalStrength::Low),
..NetworkInformation::EMPTY
};
// Set up the executor, peer, and background call manager task
let mut exec = fasync::TestExecutor::new().unwrap();
let state = SlcState {
ag_indicator_events_reporting: AgIndicatorsReporting::new_enabled(),
..SlcState::default()
};
let (connection, mut remote) = create_and_initialize_slc(state);
let (peer, mut sender, receiver, _profile) = setup_peer_task(Some(connection));
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<PeerHandlerMarker>().unwrap();
// A vec to hold all the stream items we don't care about for this test.
let mut junk_drawer = vec![];
// Filter out all items that are irrelevant to this particular test, placing them in
// the junk_drawer.
let mut stream = stream.filter_map(move |item| {
let item = match item {
Ok(PeerHandlerRequest::WatchNetworkInformation { responder }) => Some(responder),
x => {
junk_drawer.push(x);
None
}
};
ready(item)
});
// Pass in the client end connected to the call manager
let result = exec.run_singlethreaded(sender.send(PeerRequest::Handle(proxy)));
assert!(result.is_ok());
// The stream should produce the network update while the peer runs in the background.
let run_fut = peer.run(receiver);
pin_mut!(run_fut);
// Send the first network update - should be relayed to the peer.
let (responder, run_fut) = run_while(&mut exec, run_fut, stream.next());
responder.unwrap().send(network_update_1).expect("Successfully send network information");
let ((), run_fut) = run_while(
&mut exec,
run_fut,
expect_data_received_by_peer(&mut remote, expected_data1),
);
// Send the second network update - should be relayed to the peer.
let (responder, run_fut) = run_while(&mut exec, run_fut, stream.next());
responder.unwrap().send(network_update_2).expect("Successfully send network information");
let ((), mut run_fut) = run_while(
&mut exec,
run_fut,
expect_data_received_by_peer(&mut remote, expected_data2),
);
// Drop the peer task sender to force the PeerTask's run future to complete
drop(sender);
let task = exec.run_singlethreaded(&mut run_fut);
// Check that the task's network information contains the expected values
// based on the updates provided by the call manager task.
assert_eq!(task.network, expected_network);
}
#[fuchsia::test]
fn terminated_slc_ends_peer_task() {
let mut exec = fasync::TestExecutor::new().unwrap();
let (connection, remote) = create_and_initialize_slc(SlcState::default());
let (peer, _sender, receiver, _profile) = setup_peer_task(Some(connection));
let run_fut = peer.run(receiver);
pin_mut!(run_fut);
// The peer task is pending with no futher work to do at this time.
let result = exec.run_until_stalled(&mut run_fut);
assert!(result.is_pending());
// Closing the SLC connection will result in the completion of the peer task.
drop(remote);
let result = exec.run_until_stalled(&mut run_fut);
let peer = result.expect("run to complete");
assert!(peer.connection.is_terminated());
}
#[fuchsia::test]
fn error_in_slc_ends_peer_task() {
let mut exec = fasync::TestExecutor::new().unwrap();
let (connection, remote) = create_and_initialize_slc(SlcState::default());
let (peer, _sender, receiver, _profile) = setup_peer_task(Some(connection));
let run_fut = peer.run(receiver);
pin_mut!(run_fut);
// Produces an error when polling the ServiceLevelConnection stream by disabling write
// on the remote socket and read on the local socket.
let status = unsafe {
zx::sys::zx_socket_set_disposition(
remote.as_ref().raw_handle(),
zx::sys::ZX_SOCKET_DISPOSITION_WRITE_DISABLED,
0,
)
};
zx::Status::ok(status).unwrap();
// Error on the SLC connection will result in the completion of the peer task.
let result = exec.run_until_stalled(&mut run_fut);
assert!(result.is_ready());
}
/// Transform `stream` into a Stream of WatchNextCall responders.
#[track_caller]
async fn wait_for_call_stream(
stream: PeerHandlerRequestStream,
) -> impl Stream<Item = PeerHandlerWatchNextCallResponder> {
filtered_stream(stream, |item| match item {
PeerHandlerRequest::WatchNextCall { responder } => Ok(responder),
x => Err(x),
})
.await
}
/// Transform `stream` into a Stream of `T`.
///
/// `f` is a function that takes a PeerHandlerRequest and either returns an Ok if the request
/// is relevant to the test or Err if the request irrelevant.
///
/// This test helper function can be used in the common case where the test interacts
/// with a particular kind of PeerHandlerRequest.
///
/// Initial setup of the handler is done, then a filtered stream is produced which
/// outputs items based on the result of `f`. Ok return values from `f` are returned from the
/// `filtered_stream`. Err return values from `f` are not returned from the `filtered_stream`.
/// Instead they are stored within `filtered_stream` until `filtered_stream` is dropped
/// so that they do not cause the underlying fidl channel to be closed.
#[track_caller]
async fn filtered_stream<T>(
mut stream: PeerHandlerRequestStream,
f: impl Fn(PeerHandlerRequest) -> Result<T, PeerHandlerRequest>,
) -> impl Stream<Item = T> {
// Send the network information immediately so the peer can make progress.
match stream.next().await {
Some(Ok(PeerHandlerRequest::WatchNetworkInformation { responder })) => {
responder
.send(NetworkInformation::EMPTY)
.expect("Successfully send network information");
}
x => panic!("Expected watch network information request: {:?}", x),
};
// A vec to hold all the stream items we don't care about for this test.
let mut junk_drawer = vec![];
// Filter out all items, placing them in the junk_drawer.
stream.filter_map(move |item| {
let item = match item {
Ok(item) => match f(item) {
Ok(t) => Some(t),
Err(x) => {
junk_drawer.push(x);
None
}
},
_ => None,
};
ready(item)
})
}
#[fuchsia::test]
fn call_updates_update_ringer_state() {
// Set up the executor, peer, and background call manager task
let mut exec = fasync::TestExecutor::new().unwrap();
// Setup the peer task with the specified SlcState to enable indicator events.
let state = SlcState {
ag_indicator_events_reporting: AgIndicatorsReporting::new_enabled(),
..SlcState::default()
};
let (connection, mut remote) = create_and_initialize_slc(state);
let (peer, mut sender, receiver, _profile) = setup_peer_task(Some(connection));
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<PeerHandlerMarker>().unwrap();
// Pass in the client end connected to the call manager
let result = exec.run_singlethreaded(sender.send(PeerRequest::Handle(proxy)));
assert!(result.is_ok());
let run_fut = peer.run(receiver);
pin_mut!(run_fut);
let (mut stream, run_fut) = run_while(&mut exec, run_fut, wait_for_call_stream(stream));
// Send the incoming call
let (responder, run_fut) = run_while(&mut exec, run_fut, stream.next());
let (client_end, _call_stream) = fidl::endpoints::create_request_stream().unwrap();
let next_call = NextCall {
call: Some(client_end),
remote: Some("1234567".to_string()),
state: Some(CallState::IncomingRinging),
direction: Some(CallDirection::MobileTerminated),
..NextCall::EMPTY
};
responder.unwrap().send(next_call).expect("Successfully send call information");
let expected_data = vec![AgIndicator::CallSetup(1).into()];
let ((), mut run_fut) =
run_while(&mut exec, run_fut, expect_data_received_by_peer(&mut remote, expected_data));
// Drop the peer task sender to force the PeerTask's run future to complete
drop(sender);
let task = exec.run_until_stalled(&mut run_fut).expect("run_fut to complete");
// Check that the task's ringer has an active call with the expected call index.
assert!(task.ringer.ringing());
}
#[fuchsia::test]
fn transfers_change_sco_state() {
// Set up the executor, peer, and background call manager task
info!("transfers_change_sco_state: Creating executer.");
let mut exec = fasync::TestExecutor::new().unwrap();
// Setup the peer task.
info!("transfers_change_sco_state: Creating SLC.");
let (connection, _remote) = create_and_initialize_slc(SlcState::default());
info!("transfers_change_sco_state: Creating peer task.");
let (peer, mut sender, receiver, mut profile) = setup_peer_task(Some(connection));
info!("transfers_change_sco_state: Creating peer handler proxy.");
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<PeerHandlerMarker>().unwrap();
// Pass in the client end connected to the call manager
info!("transfers_change_sco_state: Sending peer handler proxy.");
let result = exec.run_singlethreaded(sender.send(PeerRequest::Handle(proxy)));
assert!(result.is_ok());
let run_fut = peer.run(receiver);
pin_mut!(run_fut);
info!("transfers_change_sco_state: Receiving peer handler stream.");
let (mut stream, run_fut) = run_while(&mut exec, run_fut, wait_for_call_stream(stream));
// Send the incoming call
info!("transfers_change_sco_state: Getting call stream responder.");
let (responder, run_fut) = run_while(&mut exec, run_fut, stream.next());
info!("transfers_change_sco_state: Creating call stream.");
let (client_end, mut call_stream) = fidl::endpoints::create_request_stream().unwrap();
let next_call = NextCall {
call: Some(client_end),
remote: Some("1234567".to_string()),
state: Some(CallState::IncomingRinging),
direction: Some(CallDirection::MobileTerminated),
..NextCall::EMPTY
};
info!("transfers_change_sco_state: Sending next call.");
responder.unwrap().send(next_call).expect("Successfully send call information");
// Answer call.
info!("transfers_change_sco_state: Awaiting watch state to answer call.");
let (request, run_fut) = run_while(&mut exec, run_fut, &mut call_stream.next());
let state_responder = match request {
Some(Ok(CallRequest::WatchState { responder })) => responder,
req => panic!("Expected WatchState, got {:?}", req),
};
info!("transfers_change_sco_state: Sending OngoingActive to initially answer call.");
state_responder.send(CallState::OngoingActive).expect("Sent OngoingActive.");
info!("transfers_change_sco_state: Setting up SCO connection.");
let (sco, mut run_fut) =
run_while(&mut exec, run_fut, expect_sco_connection(&mut profile, true, Ok(())));
info!("transfers_change_sco_state: Setting up SCO connection--pausing audio.");
while let None = exec.run_one_step(&mut run_fut) {}
let mut sco = sco.unwrap();
// Call is transferred to AG by AG and SCO is torn down.
info!("transfers_change_sco_state: Getting WatchState for transferring call to AG by AG part 1.");
let (request, run_fut) = run_while(&mut exec, run_fut, &mut call_stream.next());
let state_responder = match request {
Some(Ok(CallRequest::WatchState { responder })) => responder,
req => panic!("Expected WatchState, got {:?}", req),
};
info!("transfers_change_sco_state: Sending TransferedToAg.");
state_responder.send(CallState::TransferredToAg).expect("Sent TransferredToAg.");
info!("transfers_change_sco_state: Transferring call to AG by AG.");
let (request, mut run_fut) = run_while(&mut exec, run_fut, &mut call_stream.next());
info!("transfers_change_sco_state: Transferring call to AG by AG--unpausing audio.");
while let None = exec.run_one_step(&mut run_fut) {}
info!("transfers_change_sco_state: Checking if SCO is closed.");
let (sco_result, run_fut) = run_while(&mut exec, run_fut, &mut sco.next());
assert_matches!(sco_result, None);
// Call is transferred to HF by AG and SCO is set up.
let state_responder = match request {
Some(Ok(CallRequest::WatchState { responder })) => responder,
req => panic!("Expected WatchState, got {:?}", req),
};
info!("transfers_change_sco_state: Sending OngoingActive.");
state_responder.send(CallState::OngoingActive).expect("Sent OngoingActive.");
// Don't send a SCO connection until they are trying to connect.
info!("transfers_change_sco_state: Transferring call to HF by AG.");
let (sco, mut run_fut) =
run_while(&mut exec, run_fut, expect_sco_connection(&mut profile, true, Ok(())));
info!("transfers_change_sco_state: Transferring call to HF by AG--pausing audio.");
let _ = exec.run_one_step(&mut run_fut);
let sco = sco.expect("SCO Connection.");
// Run until the connection is handled by the task. This avoids a race where the
// the incoming SCO connection is closed before it's received, in which case a call
// is never set to active.
info!("transfers_change_sco_state: Finishing transferring call to HF by AG.");
let _ = exec.run_one_step(&mut run_fut);
// SCO is torn down by HF and call is transferred to AG
info!("transfers_change_sco_state: Dropping SCO.");
drop(sco);
info!("transfers_change_sco_state: Getting WatchState for transferring to AG.");
let (watch_state_req, run_fut) = run_while(&mut exec, run_fut, &mut call_stream.next());
info!("transfers_change_sco_state: Getting RequestTransferAudio.");
let (req, run_fut) = run_while(&mut exec, run_fut, &mut call_stream.next());
assert_matches!(req, Some(Ok(CallRequest::RequestTransferAudio { .. })));
let state_responder = match watch_state_req {
Some(Ok(CallRequest::WatchState { responder })) => responder,
req => panic!("Expected WatchState, got {:?}", req),
};
info!("transfers_change_sco_state: Sending TransferredToAg.");
state_responder.send(CallState::TransferredToAg).expect("Sent TransferredToAg");
// SCO is set up by HF and call is transferred to HF
info!("transfers_change_sco_state: Expecting SCO set up by HF.");
let (_sco, mut run_fut) =
run_while(&mut exec, run_fut, expect_sco_connection(&mut profile, false, Ok(())));
info!("transfers_change_sco_state: Expected SCO set up by HF--pausing audio.");
while let None = exec.run_one_step(&mut run_fut) {}
info!("transfers_change_sco_state: Getting WatchState.");
let (_watch_state_req, run_fut) = run_while(&mut exec, run_fut, &mut call_stream.next());
info!("transfers_change_sco_state: Getting RequestActive.");
let (req, mut run_fut) = run_while(&mut exec, run_fut, &mut call_stream.next());
assert_matches!(req, Some(Ok(CallRequest::RequestActive { .. })));
// Drop the peer task sender to force the PeerTask's run future to complete
info!("transfers_change_sco_state: Dropping sender.");
drop(sender);
info!("transfers_change_sco_state: Finishing.");
while let None = exec.run_one_step(&mut run_fut) {}
info!("transfers_change_sco_state: Finished.");
}
#[fuchsia::test]
fn incoming_hf_indicator_battery_level_is_propagated_to_peer_handler_stream() {
// Set up the executor, peer, and background call manager task
let mut exec = fasync::TestExecutor::new().unwrap();
// Setup the peer task with the specified SlcState to enable the battery level HF indicator.
let mut hf_indicators = HfIndicators::default();
hf_indicators.enable_indicators(vec![at::BluetoothHFIndicator::BatteryLevel]);
let state = SlcState { hf_indicators, ..SlcState::default() };
let (connection, mut remote) = create_and_initialize_slc(state);
let (peer, mut sender, receiver, _profile) = setup_peer_task(Some(connection));
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<PeerHandlerMarker>().unwrap();
// The battery level that will be reported by the peer.
let expected_level = 4;
// Pass in the client end connected to the call manager
let result = exec.run_singlethreaded(sender.send(PeerRequest::Handle(proxy)));
assert!(result.is_ok());
// Run the PeerTask.
let run_fut = peer.run(receiver);
pin_mut!(run_fut);
let headset_level_stream_fut = filtered_stream(stream, |item| match item {
PeerHandlerRequest::ReportHeadsetBatteryLevel { level, .. } => Ok(level),
x => Err(x),
});
let (mut stream, run_fut) = run_while(&mut exec, run_fut, headset_level_stream_fut);
// Peer sends us a battery level HF indicator update.
let battery_level_cmd = at::Command::Biev {
anum: at::BluetoothHFIndicator::BatteryLevel,
value: expected_level as i64,
};
let mut buf = Vec::new();
at::Command::serialize(&mut buf, &vec![battery_level_cmd]).expect("serialization is ok");
let _ = remote.as_ref().write(&buf[..]).expect("channel write is ok");
// Run the main future - the task should receive the HF indicator and report it.
let (battery_level, _run_fut) = run_while(&mut exec, run_fut, stream.next());
assert_eq!(battery_level, Some(expected_level));
// Since we (the AG) received a valid HF indicator, we expect to send an OK back to the peer.
expect_peer_ready(&mut exec, &mut remote, Some(serialize_at_response(at::Response::Ok)));
}
#[fuchsia::test]
fn local_battery_level_change_initiates_phone_status_procedure() {
let mut exec = fasync::TestExecutor::new().unwrap();
// Setup the peer task with the specified SlcState to enable the battery level indicator on
// both the HF and the AG.
let mut hf_indicators = HfIndicators::default();
hf_indicators.enable_indicators(vec![at::BluetoothHFIndicator::BatteryLevel]);
let ag_indicator_events_reporting = AgIndicatorsReporting::new_enabled();
let state =
SlcState { hf_indicators, ag_indicator_events_reporting, ..SlcState::default() };
let (connection, mut remote) = create_and_initialize_slc(state);
let (peer, mut sender, receiver, _profile) = setup_peer_task(Some(connection));
// Run the PeerTask.
let run_fut = peer.run(receiver);
pin_mut!(run_fut);
// Receive a local update from the Fuchsia Battery Manager about a battery level change.
let request = PeerRequest::BatteryLevel(3);
let send_request_fut = sender.send(request);
let (send_result, run_fut) = run_while(&mut exec, run_fut, send_request_fut);
assert_matches!(send_result, Ok(()));
// We expect the peer (HF) to receive the PhoneStatus update AT command.
let expected_ciev = vec![AgIndicator::BatteryLevel(3).into()];
let ((), _run_fut) =
run_while(&mut exec, run_fut, expect_data_received_by_peer(&mut remote, expected_ciev));
}
#[fuchsia::test]
fn call_updates_produce_call_waiting() {
// Set up the executor, peer, and background call manager task
let mut exec = fasync::TestExecutor::new().unwrap();
let raw_number = "1234567";
let number = Number::from(raw_number);
let expected_ccwa =
vec![at::success(at::Success::Ccwa { ty: number.type_(), number: number.into() })];
let expected_ciev = vec![AgIndicator::CallSetup(1).into()];
// Setup the peer task with the specified SlcState to enable indicator events.
let state = SlcState {
call_waiting_notifications: true,
ag_indicator_events_reporting: AgIndicatorsReporting::new_enabled(),
..SlcState::default()
};
let (connection, mut remote) = create_and_initialize_slc(state);
let (peer, mut sender, receiver, _profile) = setup_peer_task(Some(connection));
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<PeerHandlerMarker>().unwrap();
let run_fut = peer.run(receiver);
pin_mut!(run_fut);
// Pass in the client end connected to the call manager
let result = exec.run_singlethreaded(sender.send(PeerRequest::Handle(proxy)));
assert!(result.is_ok());
let (mut stream, run_fut) = run_while(&mut exec, run_fut, wait_for_call_stream(stream));
// Send the incoming waiting call.
let (responder, run_fut) = run_while(&mut exec, run_fut, stream.next());
let (client_end, _call_stream) = fidl::endpoints::create_request_stream().unwrap();
let next_call = NextCall {
call: Some(client_end),
remote: Some(raw_number.to_string()),
state: Some(CallState::IncomingWaiting),
direction: Some(CallDirection::MobileTerminated),
..NextCall::EMPTY
};
responder.unwrap().send(next_call).expect("Successfully send call information");
let ((), run_fut) =
run_while(&mut exec, run_fut, expect_data_received_by_peer(&mut remote, expected_ccwa));
let ((), mut run_fut) =
run_while(&mut exec, run_fut, expect_data_received_by_peer(&mut remote, expected_ciev));
// Drop the peer task sender to force the PeerTask's run future to complete
drop(sender);
let _ = exec.run_until_stalled(&mut run_fut).expect("run_fut to complete");
}
#[fuchsia::test]
fn outgoing_call_holds_active() {
// Set up the executor.
let mut exec = fasync::TestExecutor::new().unwrap();
// Setup the peer task with the specified SlcState to enable three way calling.
let mut ag_features = AgFeatures::default();
ag_features.set(AgFeatures::THREE_WAY_CALLING, true);
let mut hf_features = HfFeatures::default();
hf_features.set(HfFeatures::THREE_WAY_CALLING, true);
let state = SlcState { ag_features, hf_features, ..SlcState::default() };
let (connection, remote) = create_and_initialize_slc(state);
let (peer, mut sender, receiver, mut profile) = setup_peer_task(Some(connection));
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<PeerHandlerMarker>().unwrap();
let run_fut = peer.run(receiver);
pin_mut!(run_fut);
// Pass in the client end connected to the call manager
exec.run_singlethreaded(sender.send(PeerRequest::Handle(proxy))).expect("Connecting peer");
let (mut stream, run_fut) = run_while(&mut exec, run_fut, wait_for_call_stream(stream));
// Send the incoming waiting call.
let (responder, run_fut) = run_while(&mut exec, run_fut, stream.next());
let (client_end, mut call_stream) = fidl::endpoints::create_request_stream().unwrap();
let next_call = NextCall {
call: Some(client_end),
remote: Some("1234567".to_string()),
state: Some(CallState::IncomingWaiting),
direction: Some(CallDirection::MobileTerminated),
..NextCall::EMPTY
};
responder.unwrap().send(next_call).expect("Successfully send call information");
// Answer call.
let (request, run_fut) = run_while(&mut exec, run_fut, &mut call_stream.next());
let state_responder = match request {
Some(Ok(CallRequest::WatchState { responder })) => responder,
req => panic!("Expected WatchState, got {:?}", req),
};
state_responder.send(CallState::OngoingActive).expect("Sent OngoingActive.");
let (sco, run_fut) =
run_while(&mut exec, run_fut, expect_sco_connection(&mut profile, true, Ok(())));
let _sco = sco.expect("SCO Connection.");
// Make outgoing call from HF.
let dial_cmd = at::Command::AtdNumber { number: String::from("7654321") };
let mut buf = Vec::new();
at::Command::serialize(&mut buf, &vec![dial_cmd]).expect("serialization is ok");
let _ = remote.as_ref().write(&buf[..]).expect("channel write is ok");
let (watch_state_req, run_fut) = run_while(&mut exec, run_fut, &mut call_stream.next());
let _watch_state_resp = match watch_state_req {
Some(Ok(CallRequest::WatchState { responder, .. })) => responder,
req => panic!("Expected WatchState, got {:?}", req),
};
// Receive hold request from first call.
let (hold_req, _run_fut) = run_while(&mut exec, run_fut, &mut call_stream.next());
match hold_req {
Some(Ok(CallRequest::RequestHold { .. })) => {}
req => panic!("Expected RequestHold, got {:?}", req),
};
}
#[fuchsia::test]
fn connection_behavior_request_updates_state() {
let mut exec = fasync::TestExecutor::new().unwrap();
let (peer, mut sender, receiver, mut profile) = setup_peer_task(None);
let _peer_task = fasync::Task::local(peer.run(receiver));
// First check that a connection is made when search results are received.
// Send a valid search result.
let random_channel_number = ServerChannel::try_from(4).expect("valid server channel");
let protocol =
Some(build_rfcomm_protocol(random_channel_number).iter().map(Into::into).collect());
let event = ProfileEvent::SearchResult {
id: PeerId(1),
protocol: protocol.clone(),
attributes: vec![],
};
exec.run_singlethreaded(sender.send(PeerRequest::Profile(event))).expect("Send to succeed");
// Get a connection request on the `profile` stream.
let result = exec.run_until_stalled(&mut profile.next());
let _channel = match result {
Poll::Ready(Some(Ok(bredr::ProfileRequest::Connect {
connection:
bredr::ConnectParameters::Rfcomm(bredr::RfcommParameters {
channel: Some(sc), ..
}),
responder,
..
}))) => {
assert_eq!(sc, u8::from(random_channel_number));
let (local, remote) = Channel::create();
let local = local.try_into().unwrap();
responder.send(&mut Ok(local)).unwrap();
remote
}
x => panic!("unexpected result: {:?}", x),
};
// Disable auto connect behavior
exec.run_singlethreaded(
sender.send(PeerRequest::Behavior(ConnectionBehavior { autoconnect: false })),
)
.expect("Send to succeed");
// Connection is not made after autoconnect is disabled
// Send search results.
let event = ProfileEvent::SearchResult { id: PeerId(1), protocol, attributes: vec![] };
exec.run_singlethreaded(sender.send(PeerRequest::Profile(event))).expect("Send to succeed");
// No request is received on stream.
let result = exec.run_until_stalled(&mut profile.next());
assert!(result.is_pending());
}
#[fuchsia::test]
fn non_rfcomm_search_result_is_ignored() {
let mut exec = fasync::TestExecutor::new().unwrap();
let (peer, mut sender, receiver, mut profile) = setup_peer_task(None);
let _peer_task = fasync::Task::local(peer.run(receiver));
// No connection should be made for a non RFCOMM search result.
// Send a valid search result with some random L2CAP protocol.
let protocol = vec![bredr::ProtocolDescriptor {
protocol: bredr::ProtocolIdentifier::L2Cap,
params: vec![bredr::DataElement::Uint16(25)],
}];
let event = ProfileEvent::SearchResult {
id: PeerId(1),
protocol: Some(protocol),
attributes: vec![],
};
exec.run_singlethreaded(sender.send(PeerRequest::Profile(event))).expect("Send to succeed");
// No connection request on the `profile` stream.
assert!(exec.run_until_stalled(&mut profile.next()).is_pending());
}
#[fuchsia::test]
fn connect_request_triggers_connection() {
let mut exec = fasync::TestExecutor::new().unwrap();
let connection = ServiceLevelConnection::new();
let (local, mut remote) = Channel::create();
let (peer, mut sender, receiver, _profile) = setup_peer_task(Some(connection));
assert!(!peer.connection.connected());
let run_fut = peer.run(receiver);
pin_mut!(run_fut);
let event_fut = sender.send(PeerRequest::Profile(ProfileEvent::PeerConnected {
id: PeerId(0),
protocol: vec![],
channel: local,
}));
exec.run_singlethreaded(event_fut).unwrap();
// The peer task is pending with no further work to do at this time.
let result = exec.run_until_stalled(&mut run_fut);
assert!(result.is_pending());
// Closing the SLC connection will result in the completion of the peer task.
drop(sender);
let result = exec.run_until_stalled(&mut run_fut);
let peer = result.expect("run to complete");
assert!(peer.connection.connected());
assert!(exec.run_until_stalled(&mut remote.next()).is_pending());
}
#[fuchsia::test]
fn connect_request_replaces_connection() {
let mut exec = fasync::TestExecutor::new().unwrap();
// SLC is connected at the start of the test.
let (connection, mut old_remote) = create_and_connect_slc();
let (peer, mut sender, receiver, _profile) = setup_peer_task(Some(connection));
assert!(peer.connection.connected());
let run_fut = peer.run(receiver);
pin_mut!(run_fut);
// create a new connection for the SLC
let (local, mut new_remote) = Channel::create();
let event_fut = sender.send(PeerRequest::Profile(ProfileEvent::PeerConnected {
id: PeerId(0),
protocol: vec![],
channel: local,
}));
exec.run_singlethreaded(event_fut).unwrap();
// The peer task is pending with no further work to do at this time.
let result = exec.run_until_stalled(&mut run_fut);
assert!(result.is_pending());
// Closing the SLC connection will result in the completion of the peer task.
drop(sender);
let result = exec.run_until_stalled(&mut run_fut);
let peer = result.expect("run to complete");
assert!(peer.connection.connected());
let result = exec.run_until_stalled(&mut old_remote.next());
// old_remote is closed
assert_matches!(result, Poll::Ready(None));
// new_remote is open
assert!(exec.run_until_stalled(&mut new_remote.next()).is_pending());
}
/// Run a PeerTask until it has no more work left to do, then return it.
/// This function generates its own PeerTask channel for convenience. This means that
/// the channel cannot be used to send messages to a running PeerTask.
#[track_caller]
fn run_peer_until_stalled(exec: &mut fasync::TestExecutor, peer: PeerTask) -> PeerTask {
let (_sender, receiver) = mpsc::channel(0);
let run_fut = peer.run(receiver);
pin_mut!(run_fut);
exec.run_until_stalled(&mut run_fut)
.expect_pending("shouldn't be done while _sender is live");
drop(_sender);
exec.run_until_stalled(&mut run_fut).unwrap()
}
async fn expect_sco_connection(
profile_requests: &mut ProfileRequestStream,
expected_initiator: bool,
result: Result<(), ScoErrorCode>,
) -> Option<bredr::ScoConnectionRequestStream> {
// Sometimes dropping the SCO connection accept future doesn't cancel the
// existing request before we want to connect a new one. In this case, we
// may get *two* requests for a SCO connection, and we want the second one,
// which has the expected direction.
loop {
let proxy = match profile_requests.next().await.expect("request").unwrap() {
bredr::ProfileRequest::ConnectSco { receiver, params, initiator, .. } => {
if initiator != expected_initiator {
continue;
};
assert!(params.len() >= 1);
receiver.into_proxy().unwrap()
}
x => panic!("Unexpected request to profile stream: {:?}", x),
};
match result {
Ok(()) => {
let (client, request_stream) =
fidl::endpoints::create_request_stream::<bredr::ScoConnectionMarker>()
.expect("request stream");
let default_params = bredr::ScoConnectionParameters {
parameter_set: Some(bredr::HfpParameterSet::CvsdD1),
air_coding_format: Some(bredr::CodingFormat::Cvsd),
air_frame_size: Some(60),
io_bandwidth: Some(16000),
io_coding_format: Some(bredr::CodingFormat::LinearPcm),
io_frame_size: Some(16),
io_pcm_data_format: Some(
fidl_fuchsia_hardware_audio::SampleFormat::PcmSigned,
),
path: Some(bredr::DataPath::Offload),
..bredr::ScoConnectionParameters::EMPTY
};
proxy.connected(client, default_params).unwrap();
return Some(request_stream);
}
Err(code) => {
proxy.error(code).unwrap();
return None;
}
}
}
}
/// Setup a new audio connection between the PeerTask and an upstream ProfileRequestStream.
/// This helper function asserts that the SCO connection is created and that the audio
/// connection has started up.
#[track_caller]
fn setup_audio(
exec: &mut fasync::TestExecutor,
peer: &mut PeerTask,
profile_requests: &mut ProfileRequestStream,
) -> bredr::ScoConnectionRequestStream {
let codecs = peer
.connection
.get_selected_codec()
.map_or(vec![CodecId::MSBC, CodecId::CVSD], |c| vec![c]);
let sco_connector = peer.sco_connector.clone();
let audio_connection_fut = sco_connector.connect(peer.id.clone(), codecs).fuse();
pin_mut!(audio_connection_fut);
exec.run_until_stalled(&mut audio_connection_fut).expect_pending("shouldn't be done yet");
// Expect a sco connection, and have it succeed.
let sco_complete_fut = expect_sco_connection(profile_requests, true, Ok(()));
pin_mut!(sco_complete_fut);
let result = exec.run_singlethreaded(&mut futures::future::select(
audio_connection_fut,
sco_complete_fut,
));
let (remote_sco, mut audio_connection_fut) = match result {
Either::Right(r) => r,
Either::Left(_) => panic!("Audio connection future shouldn't have finished"),
};
let res = exec.run_until_stalled(&mut audio_connection_fut).expect("should be done");
let local_sco = res.expect("should have started up okay");
let audio_connection_fut2 = peer.finish_sco_connection(local_sco);
pin_mut!(audio_connection_fut2);
exec.run_singlethreaded(&mut audio_connection_fut2).expect("finished");
remote_sco.unwrap()
}
#[fuchsia::test]
fn setup_audio_connection_connects_and_starts_audio() {
let mut exec = fasync::TestExecutor::new().unwrap();
// SLC is connected at the start of the test.
let (connection, _old_remote) = create_and_connect_slc();
let (mut peer, _sender, _receiver, mut profile_requests) =
setup_peer_task(Some(connection));
assert!(peer.connection.connected());
let audio_control = peer.audio_control.clone();
let _remote_sco = setup_audio(&mut exec, &mut peer, &mut profile_requests);
// Should have started up the test audio control. Test by trying to start it again, it
// should be an error.
{
let mut lock = audio_control.lock();
let _ = lock
.start(PeerId(0), bredr::ScoConnectionParameters::EMPTY)
.expect_err("shouldn't be able to start, already started");
}
}
#[fuchsia::test]
fn audio_is_stopped_when_sco_connection_closes() {
let mut exec = fasync::TestExecutor::new().unwrap();
// SLC is connected at the start of the test.
let (connection, _old_remote) = create_and_connect_slc();
let (mut peer, _sender, receiver, mut profile_requests) = setup_peer_task(Some(connection));
assert!(peer.connection.connected());
let audio_control = peer.audio_control.clone();
let remote_sco = setup_audio(&mut exec, &mut peer, &mut profile_requests);
// Should have started up the test audio control.
{
let mut lock = audio_control.lock();
let _ = lock
.start(PeerId(0), bredr::ScoConnectionParameters::EMPTY)
.expect_err("shouldn't be able to start, already started");
}
// Set up the run task.
let run_fut = peer.run(receiver);
pin_mut!(run_fut);
let _ = exec.run_until_stalled(&mut run_fut);
drop(remote_sco);
// Spin the run task to notice that the SCO connection has failed, and drop
// the audio / stop the audio.
let _ = exec.run_until_stalled(&mut run_fut);
// Should have stopped the audio - check by trying to stop it again, it should be an error
let mut lock = audio_control.lock();
let _ = lock.stop().expect_err("should already be stopped");
}
#[fuchsia::test]
fn sco_connection_closed_when_call_ends() {
let mut exec = fasync::TestExecutor::new().unwrap();
// SLC is connected at the start of the test.
let (connection, _old_remote) = create_and_connect_slc();
let (mut peer, _sender, _receiver, mut profile_requests) =
setup_peer_task(Some(connection));
assert!(peer.connection.connected());
let _remote_sco = setup_audio(&mut exec, &mut peer, &mut profile_requests);
// Run the PeerTask to handle all audio setup tasks
let peer = run_peer_until_stalled(&mut exec, peer);
assert!(peer.sco_state.is_active());
// Create the Call Manager side of a PeerHandler to send a call state update to the
// `OngoingHeld` state in order to tear down the active sco connection.
// Once call_manager does the work required by the test, it completes, returning items that
// should be kept alive for the remainder of the test.
async fn call_manager(
stream: PeerHandlerRequestStream,
) -> (impl Stream<Item = PeerHandlerWatchNextCallResponder>, CallRequestStream) {
let mut stream = wait_for_call_stream(stream).await;
// Send the held call response
let responder = stream.next().await.unwrap();
let (client_end, call_stream) = fidl::endpoints::create_request_stream().unwrap();
let next_call = NextCall {
call: Some(client_end),
remote: Some("1234567".to_string()),
state: Some(CallState::OngoingHeld),
direction: Some(CallDirection::MobileTerminated),
..NextCall::EMPTY
};
responder.send(next_call).expect("Successfully send call information");
(stream, call_stream)
}
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<PeerHandlerMarker>().unwrap();
let call_manager_fut = call_manager(stream);
pin_mut!(call_manager_fut);
let (mut sender, receiver) = mpsc::channel(0);
// Wire up the HFP side of PeerHandler by passing the proxy into the PeerTask.
let handle_fut = sender.send(PeerRequest::Handle(proxy));
// Join the futures that are responsible for sending events _into_ the PeerTask.
let join_fut = futures::future::join(call_manager_fut, handle_fut);
// Create the run future to drive the PeerTask forward.
let run_fut = peer.run(receiver);
pin_mut!(run_fut);
// Run until `join_fut` completes.
let (_stream, mut run_fut) = run_while(&mut exec, run_fut, join_fut);
// Make sure all work that is pending in the PeerTask `run` future completes,
// forcing the future to complete and return the `PeerTask` object.
assert!(exec.run_until_stalled(&mut run_fut).is_pending());
// Force `run_fut` to complete by dropping the `sender`.
drop(sender);
let peer = exec.run_singlethreaded(run_fut);
// The SCO connection should be removed after the PeerTask has handled the Terminated call
// update.
assert!(!peer.sco_state.is_active());
}
}