blob: 70155d81daeb062060f097383e405163fefc98e1 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use async_utils::stream::FutureMap;
use fidl_fuchsia_bluetooth_bredr as bredr;
use fuchsia_bluetooth::types::PeerId;
use futures::StreamExt;
use profile_client::{ProfileClient, ProfileEvent};
use std::collections::hash_map::Entry;
use tracing::{debug, error, info};
use crate::peer_task::PeerTask;
/// Struct containing all known peers and the profile client.
pub struct Peers {
profile_client: ProfileClient,
profile_proxy: bredr::ProfileProxy,
peers: FutureMap<PeerId, PeerTask>,
}
// TODO(https://fxbug.dev/42163927) Clean up when a peer task finishes.
impl Peers {
pub fn new(profile_client: ProfileClient, profile_proxy: bredr::ProfileProxy) -> Self {
Self { profile_client, profile_proxy, peers: FutureMap::new() }
}
pub async fn run(&mut self) {
loop {
info!("In main loop awaiting profile events.");
match self.profile_client.next().await {
Some(Ok(event)) => {
info!("Received profile event: {:?}", event);
self.handle_profile_event(event).await;
}
Some(Err(e)) => {
error!("Error encountered wating for profile events: {}.", e);
break;
}
None => {
debug!("Profile event stream terminated.");
break;
}
}
}
}
async fn handle_profile_event(&mut self, event: ProfileEvent) {
let peer_id = event.peer_id();
debug!("Handling profile event: {:?}", event);
let task = self.make_task_if_nonexistent(peer_id);
let send_result = task.handle_profile_event(event).await;
if let Err(err) = send_result {
// At this point we've just created a task but can't communicate with it. This is an unrecoverable error.
error!("Unable to send profile event to peer {:} with error {:?}", peer_id, err);
let _ = self.peers.remove(&peer_id);
}
}
fn make_task_if_nonexistent(&mut self, peer_id: PeerId) -> &mut PeerTask {
match self.peers.inner().entry(peer_id) {
Entry::Occupied(task_entry) => task_entry.into_mut(),
Entry::Vacant(task_entry) => {
let task = PeerTask::spawn_new(peer_id, self.profile_proxy.clone());
task_entry.insert(Box::pin(task))
}
}
}
#[cfg(test)]
fn get_peer_map(&mut self) -> &mut FutureMap<PeerId, PeerTask> {
&mut self.peers
}
}
#[cfg(test)]
mod tests {
use super::*;
use async_test_helpers::run_while;
use async_utils::PollExt;
use {fuchsia_async as fasync, test_profile_server};
#[fuchsia::test]
fn search_result_creates_peer_task() {
// Set up peers.
let mut exec = fasync::TestExecutor::new();
let test_profile_server::TestProfileServerEndpoints { proxy, client, mut test_server } =
test_profile_server::TestProfileServer::new(
None, // Advertisement
Some(bredr::ServiceClassProfileIdentifier::HumanInterfaceDevice), // Search
);
let mut peers = Peers::new(client, proxy);
assert_eq!(peers.get_peer_map().inner().len(), 0);
// A service found event causes a task to be added to the peers map.
{
let peers_fut = Box::pin(peers.run());
let (_, peers_fut) = run_while(&mut exec, peers_fut, test_server.expect_search());
let (service_found_result, mut peers_fut) = run_while(
&mut exec,
peers_fut,
test_server.send_service_found(PeerId(1), None, Vec::new()),
);
service_found_result.expect("Service found result");
exec.run_until_stalled(&mut peers_fut).expect_pending("Handling connected first 1");
}
assert_eq!(peers.get_peer_map().inner().len(), 1);
// A second service found event for the same peer doesn't add a new task to the peers map.
{
let peers_fut = Box::pin(peers.run());
let (service_found_result, mut peers_fut) = run_while(
&mut exec,
peers_fut,
test_server.send_service_found(PeerId(1), None, Vec::new()),
);
service_found_result.expect("Service found result");
exec.run_until_stalled(&mut peers_fut).expect_pending("Handling connected second 1");
}
assert_eq!(peers.get_peer_map().inner().len(), 1);
// A service found event for a second peer causes a task to be added to the peers map.
{
let peers_fut = Box::pin(peers.run());
let (service_found_result, mut peers_fut) = run_while(
&mut exec,
peers_fut,
test_server.send_service_found(PeerId(2), None, Vec::new()),
);
service_found_result.expect("Service found result");
exec.run_until_stalled(&mut peers_fut).expect_pending("Handling connected 2");
}
assert_eq!(peers.get_peer_map().inner().len(), 2);
}
}