blob: 2e3661fe37959d8fb3818593a2b98b63659fe8cb [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{anyhow, format_err, Context as _, Error};
use async_helpers::hanging_get::asynchronous as hanging_get;
use fidl::endpoints::{Proxy, ServerEnd};
use fidl_fuchsia_bluetooth::{Appearance, DeviceClass};
use fidl_fuchsia_bluetooth_bredr::ProfileMarker;
use fidl_fuchsia_bluetooth_gatt::Server_Marker;
use fidl_fuchsia_bluetooth_gatt2::{
LocalServiceRequest, Server_Marker as Server_Marker2, Server_Proxy,
use fidl_fuchsia_bluetooth_host::{HostProxy, ProtocolRequest};
use fidl_fuchsia_bluetooth_le::{CentralMarker, PeripheralMarker};
use fidl_fuchsia_bluetooth_sys::{
self as sys, InputCapability, OutputCapability, PairingDelegateProxy,
use fuchsia_async::{self as fasync, DurationExt, TimeoutExt};
use fuchsia_bluetooth::inspect::{DebugExt, Inspectable, ToProperty};
use fuchsia_bluetooth::types::pairing_options::PairingOptions;
use fuchsia_bluetooth::types::{
Address, BondingData, HostData, HostId, HostInfo, Identity, Peer, PeerId,
use fuchsia_inspect::{self as inspect, unique_name, NumericProperty, Property};
use fuchsia_inspect_contrib::{inspect_log, nodes::BoundedListNode};
use fuchsia_sync::RwLock;
use fuchsia_zircon::{self as zx, AsHandleRef, Duration};
use futures::channel::{mpsc, oneshot};
use futures::future::{BoxFuture, Future};
use futures::FutureExt;
use slab::Slab;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll, Waker};
use tracing::{error, info, trace, warn};
use crate::{
build_config, generic_access_service,
host_device::{HostDevice, HostDiscoverableSession, HostDiscoverySession, HostListener},
services::pairing::pairing_dispatcher::{PairingDispatcher, PairingDispatcherHandle},
pub use fidl_fuchsia_device::DEFAULT_DEVICE_NAME;
/// Policies for HostDispatcher::set_name, selecting whether a name that is already set is kept
/// or overwritten.
#[derive(Copy, Clone, PartialEq, Debug)]
pub enum NameReplace {
/// Keep the current name if it is already set, but set a new name if it hasn't been.
/// Replace the current name unconditionally.
/// Length of the host initialization period, in seconds. `WhenHostsFound` completes at the
/// latest after this many seconds.
pub static HOST_INIT_TIMEOUT: i64 = 5; // Seconds
/// Available FIDL services that can be provided by a particular Host.
/// `HostDispatcher::request_host_service` turns each variant into the matching
/// `ProtocolRequest` on the active host.
#[derive(Copy, Clone)]
pub enum HostService {
/// When a client requests Discovery, we establish and store two distinct sessions; the dispatcher
/// DiscoverySession, an Arc<> of which is returned to clients and represents the dispatcher's
/// state of discovery that persists as long as one client maintains an Arc<> to the session, and
/// the HostDiscoverySession, which is returned by the active host device on which discovery is
/// physically occurring and persists until the host disappears or the host session is dropped.
pub enum DiscoveryState {
Discovering {
// Weak reference to the dispatcher session handed out to clients; upgraded in
// `get_discovery_session`.
session: Weak<DiscoverySession>,
// Session obtained from the host; can be swapped by `attach_new_host_session`.
host_session: HostDiscoverySession,
// Time the session started; `end_discovery_session` subtracts it from the current time.
started: fasync::Time,
impl DiscoveryState {
// If a dispatcher discovery session exists, return an Arc<> pointer to it.
// Yields None when the state is not Discovering, or when the Weak<> can no longer be upgraded.
fn get_discovery_session(&self) -> Option<Arc<DiscoverySession>> {
match self {
DiscoveryState::Discovering { session, .. } => session.upgrade(),
_ => None,
// Idempotently end the discovery session.
// Returns the duration of a session, if one was ended.
fn end_discovery_session(&mut self) -> Option<fasync::Duration> {
// If we are Discovering, HostDiscoverySession is dropped here
// The state is reset to NotDiscovering regardless of what it was before; the previous value
// is only inspected to compute the duration.
let prev = std::mem::replace(self, DiscoveryState::NotDiscovering);
if let DiscoveryState::Discovering { started, .. } = prev {
return Some(fasync::Time::now() - started);
// If possible, replace the current host session with a given new one. This does not affect
// the dispatcher session.
// Only the Discovering state is matched below.
fn attach_new_host_session(&mut self, new_host_session: HostDiscoverySession) {
if let DiscoveryState::Discovering { host_session, .. } = self {
*host_session = new_host_session;
/// A dispatcher discovery session, which persists as long as at least one client holds an
/// Arc<> to it.
pub struct DiscoverySession {
// Shared dispatcher state; write-locked when the session is dropped.
dispatcher_state: Arc<RwLock<HostDispatcherState>>,
// Dropping the dispatcher session ends discovery on the dispatcher state and, if a session was
// actually ended, records its duration (in seconds) in the discovery history.
impl Drop for DiscoverySession {
fn drop(&mut self) {
// Takes the dispatcher state's write lock for the rest of this function.
let mut write = self.dispatcher_state.write();
if let Some(dur) = write.discovery.end_discovery_session() {
inspect_log!(write.inspect.discovery_history, duration: dur.into_seconds_f64());
/// Inspect nodes and properties maintained by the HostDispatcher; all are created in `new`.
struct HostDispatcherInspect {
// The node handed to `new`, from which the properties and children below are created.
_inspect: inspect::Node,
// Child node under which per-peer nodes are created (see `on_device_updated`).
peers: inspect::Node,
hosts: inspect::Node,
// Set to the number of entries in `host_devices` when a host is added.
host_count: inspect::UintProperty,
device_class: inspect::StringProperty,
// Set to the number of entries in `peers` when a peer is updated or removed.
peer_count: inspect::UintProperty,
input_capability: inspect::StringProperty,
output_capability: inspect::StringProperty,
has_pairing_delegate: inspect::UintProperty,
// Receives a log entry for each peer removed in `on_device_removed`.
evicted_peers: BoundedListNode,
discovery_sessions: inspect::UintProperty,
// Receives the duration of each discovery session that ends.
discovery_history: BoundedListNode,
impl HostDispatcherInspect {
/// Creates every property and child node on `inspect` with its initial value, then stores
/// `inspect` itself in the returned struct.
pub fn new(inspect: inspect::Node) -> HostDispatcherInspect {
HostDispatcherInspect {
// Counters start at 0; string properties start at placeholder values.
host_count: inspect.create_uint("host_count", 0),
peer_count: inspect.create_uint("peer_count", 0),
device_class: inspect.create_string("device_class", "default"),
input_capability: inspect.create_string("input_capability", "unknown"),
output_capability: inspect.create_string("output_capability", "unknown"),
has_pairing_delegate: inspect.create_uint("has_pairing_delegate", 0),
peers: inspect.create_child("peers"),
hosts: inspect.create_child("hosts"),
discovery_sessions: inspect.create_uint("discovery_sessions", 0),
discovery_history: BoundedListNode::new(
evicted_peers: BoundedListNode::new(
// Moved in last, after all borrows of `inspect` above.
_inspect: inspect,
/// Borrows an inspect node; presumably the `peers` child (NOTE(review): confirm against body).
pub fn peers(&self) -> &inspect::Node {
/// Borrows an inspect node; presumably the `hosts` child (NOTE(review): confirm against body).
pub fn hosts(&self) -> &inspect::Node {
/// The HostDispatcher acts as a proxy aggregating multiple HostAdapters
/// It appears as a Host to higher level systems, and is responsible for
/// routing commands to the appropriate HostAdapter
struct HostDispatcherState {
// Known hosts, keyed by their HostId.
host_devices: HashMap<HostId, HostDevice>,
// The currently active host, if any. See `get_active_id` and `set_active_id`.
active_id: Option<HostId>,
// Component storage.
stash: Stash,
// GAP state
// Name, if set. If not set, hosts will not have a set name.
name: Option<String>,
appearance: Appearance,
discovery: DiscoveryState,
// Weak handle to the session returned by `HostDispatcher::set_discoverable`.
discoverable: Option<Weak<HostDiscoverableSession>>,
config_settings: build_config::Config,
// Known peers; updated by `on_device_updated` and `on_device_removed`.
peers: HashMap<PeerId, Inspectable<Peer>>,
// Sender end of a futures::mpsc channel to send LocalServiceRequests to Generic Access Service.
// When a new host adapter is recognized, we create a new GasProxy, which takes GAS requests
// from the new host and forwards them along a clone of this channel to GAS
gas_channel_sender: mpsc::Sender<LocalServiceRequest>,
// Handle to the current pairing dispatcher; None until a pairing delegate has been set.
pairing_dispatcher: Option<PairingDispatcherHandle>,
watch_peers_publisher: hanging_get::Publisher<HashMap<PeerId, Peer>>,
watch_peers_registrar: hanging_get::SubscriptionRegistrar<PeerWatcher>,
watch_hosts_publisher: hanging_get::Publisher<Vec<HostInfo>>,
watch_hosts_registrar: hanging_get::SubscriptionRegistrar<sys::HostWatcherWatchResponder>,
// Pending requests to obtain a Host.
// Wakers are inserted by `WhenHostsFound::poll` while no host is available.
host_requests: Slab<Waker>,
inspect: HostDispatcherInspect,
impl HostDispatcherState {
/// Set the active adapter for this HostDispatcher
///
/// Returns `Ok(())` immediately when `adapter_id` is already the active host. Otherwise the
/// previously active host (if any) is shut down before the new one is considered.
pub fn set_active_host(&mut self, adapter_id: HostId) -> types::Result<()> {
if let Some(id) = self.active_id {
if id == adapter_id {
return Ok(());
// Shut down the previously active host.
// The result of shutdown() is deliberately discarded.
// NOTE(review): indexing panics if `id` is missing from `host_devices`; confirm that
// `active_id` is always cleared when its host is removed.
let _ = self.host_devices[&id].shutdown();
if self.host_devices.contains_key(&adapter_id) {
} else {
/// Used to set the pairing delegate. If there is a prior pairing delegate connected to the
/// host, check if the existing stored connection is closed:
/// * if it is closed, overwrite it and succeed
/// * if it is still active, fail
/// If there is no prior delegate, this will always succeed
/// Returns `Ok(())` if the delegate was set successfully, otherwise an error
/// ("Another Delegate is active").
fn set_pairing_delegate(
&mut self,
delegate: PairingDelegateProxy,
input: InputCapability,
output: OutputCapability,
) -> types::Result<()> {
match self.pairing_dispatcher.as_ref() {
// An existing dispatcher that has not closed blocks the new delegate.
Some(dispatcher) if !dispatcher.is_closed() => {
Err(format_err!("Another Delegate is active"))?
// No dispatcher, or the previous one is closed: build a fresh one.
_ => {
let (dispatcher, handle) = PairingDispatcher::new(delegate, input, output);
// Register every currently known host with the new dispatcher.
for host in self.host_devices.values() {
handle.add_host(, host.proxy().clone());
// Old pairing dispatcher dropped; this drops all host pairings
self.pairing_dispatcher = Some(handle);
// Spawn handling of the new pairing requests
// TODO( - We should avoid detach() here, and consider a more
// explicit way to track this task
/// Return the active id. If the ID is currently not set,
/// it will make the first ID in its host_devices active
///
/// `host_devices` is a `HashMap`, whose iteration order is unspecified, so "first" here is an
/// arbitrary host rather than the oldest one.
fn get_active_id(&mut self) -> Option<HostId> {
let active = self.active_id.clone();
// Only consult the host map when no host is active yet.
active.or_else(|| {
self.host_devices.keys().next().cloned().map(|id| {
/// Return the active host. If the Host is currently not set,
/// it will make the first ID in its host_devices active
///
/// Returns a clone of the `HostDevice`, or None when no active id can be determined or the id
/// is not present in `host_devices`.
fn get_active_host(&mut self) -> Option<HostDevice> {
self.get_active_id().and_then(|id| self.host_devices.get(&id)).cloned()
/// Resolves all pending host requests, i.e. the wakers registered in `host_requests` by
/// `WhenHostsFound` futures. Called when we leave the init period (by seeing the
/// first host device or when the init timer expires).
fn resolve_host_requests(&mut self) {
// Iterates the slab by reference, so the entries themselves are not removed here.
for waker in &self.host_requests {
/// Registers `host` under `id`, replacing (and warning about) any host already stored under
/// the same id, then refreshes the active host and the `host_count` inspect property.
fn add_host(&mut self, id: HostId, host: HostDevice) {
// `insert` returns the previous value, so `is_some()` means an existing host was replaced.
if self.host_devices.insert(id, host.clone()).is_some() {
warn!("Host replaced: {}", id.to_string())
} else {
info!("Host added: {}", id.to_string());
// If this is the only host, mark it as active.
// Called for its side effect only; the returned id is not needed here.
let _ = self.get_active_id();
// Update inspect state
self.inspect.host_count.set(self.host_devices.len() as u64);
// Notify HostWatcher interface clients about the new device.
// Resolve pending adapter futures.
/// Updates the active adapter and notifies listeners & host watchers.
///
/// Passing `None` clears the active adapter; this is logged as `<none>`.
fn set_active_id(&mut self, id: Option<HostId>) {
info!("New active adapter: {}", id.map_or("<none>".to_string(), |id| id.to_string()));
self.active_id = id;
/// Builds the current list of `HostInfo` (marking which host is active) and publishes it
/// through `watch_hosts_publisher` from a spawned task.
///
/// Panics inside the spawned task if the hanging-get publisher cannot be reached.
pub fn notify_host_watchers(&self) {
// The HostInfo::active field for the active host must be filled in later.
let active_id = self.active_id;
// Wait for the hanging get watcher to update so we can linearize updates
let current_hosts: Vec<HostInfo> = self
.map(|host| {
let mut info =;
// Fill in HostInfo::active
if let Some(active_id) = active_id { = active_id ==;
// The publisher is cloned so it can be moved into the spawned task below.
let mut publisher = self.watch_hosts_publisher.clone();
fasync::Task::spawn(async move {
.expect("Fatal error: Host Watcher HangingGet unreachable");
/// Handle to the dispatcher. All state lives in a `HostDispatcherState` shared behind an
/// `Arc<RwLock<..>>`.
pub struct HostDispatcher {
state: Arc<RwLock<HostDispatcherState>>,
impl HostDispatcher {
/// The HostDispatcher will forward all Generic Access Service requests to the mpsc::Receiver
/// end of |gas_channel_sender|. It is the responsibility of this function's caller to ensure
/// that these requests are handled. This can be done by passing the mpsc::Receiver into a
/// GenericAccessService struct and ensuring its run method is scheduled.
///
/// The returned dispatcher starts with no hosts, no peers, no name, no pairing delegate and
/// discovery in the `NotDiscovering` state.
pub fn new(
appearance: Appearance,
stash: Stash,
inspect: inspect::Node,
gas_channel_sender: mpsc::Sender<LocalServiceRequest>,
watch_peers_publisher: hanging_get::Publisher<HashMap<PeerId, Peer>>,
watch_peers_registrar: hanging_get::SubscriptionRegistrar<PeerWatcher>,
watch_hosts_publisher: hanging_get::Publisher<Vec<HostInfo>>,
watch_hosts_registrar: hanging_get::SubscriptionRegistrar<sys::HostWatcherWatchResponder>,
) -> HostDispatcher {
let hd = HostDispatcherState {
active_id: None,
host_devices: HashMap::new(),
name: None,
// Settings start from the build configuration defaults.
config_settings: build_config::load_default(),
peers: HashMap::new(),
discovery: DiscoveryState::NotDiscovering,
discoverable: None,
pairing_dispatcher: None,
host_requests: Slab::new(),
inspect: HostDispatcherInspect::new(inspect),
HostDispatcher { state: Arc::new(RwLock::new(hd)) }
/// Returns a future that resolves to a `HostDispatcher`. `active_host` and `get_all_adapters`
/// wait on it before reading host state.
pub fn when_hosts_found(&self) -> impl Future<Output = HostDispatcher> {
/// Returns the dispatcher's name as an owned `String`.
/// NOTE(review): the name is stored as `Option<String>`, so presumably a default is substituted
/// when it is unset; confirm against the body.
pub fn get_name(&self) -> String {
/// Returns the dispatcher's `Appearance`.
pub fn get_appearance(&self) -> Appearance {
/// Stores `name` as the dispatcher's name and then applies the resulting name to the active
/// host.
///
/// With `NameReplace::Keep` the call can return `Ok(())` early without changing anything.
/// Returns `types::Error::no_host()` when there is no active host; note that the name has
/// already been stored in the dispatcher state by then.
pub async fn set_name(&self, name: String, replace: NameReplace) -> types::Result<()> {
if NameReplace::Keep == replace && {
return Ok(());
// The write lock is only held for this single statement, not across the await below.
self.state.write().name = Some(name);
match self.active_host().await {
Some(host) => {
// Re-read the name through the getter rather than reusing the argument.
let name = self.get_name();
None => Err(types::Error::no_host()),
/// Sets the device class on the active host.
///
/// Returns `types::Error::no_host()` when there is no active host. Inspect state is only
/// touched when the host call succeeds.
pub async fn set_device_class(&self, class: DeviceClass) -> types::Result<()> {
// Captured before `class` is moved into the host call below.
let class_repr = class.debug();
let res = match self.active_host().await {
Some(host) => host.set_device_class(class).await,
None => Err(types::Error::no_host()),
// Update Inspect state
if res.is_ok() {;
/// Set the active adapter for this HostDispatcher
///
/// Public, `&self` counterpart of `HostDispatcherState::set_active_host`.
/// NOTE(review): presumably forwards to it under the state write lock; confirm against body.
pub fn set_active_host(&self, host: HostId) -> types::Result<()> {
/// Used to set the pairing delegate. If there is a prior pairing delegate connected to the
/// host, check if the existing stored connection is closed:
/// * if it is closed, overwrite it and succeed
/// * if it is still active, fail
/// If there is no prior delegate, this will always succeed
/// Returns `Ok(())` if the delegate was set successfully, otherwise an error
pub fn set_pairing_delegate(
delegate: PairingDelegateProxy,
input: InputCapability,
output: OutputCapability,
) -> types::Result<()> {
// Delegates to the state-level implementation while holding the write lock.
self.state.write().set_pairing_delegate(delegate, input, output)
/// Merges `new_settings` into the stored configuration and applies them to every known host.
///
/// A host that fails to accept the settings is logged with a warning; the loop continues with
/// the remaining hosts.
pub async fn apply_sys_settings(&self, new_settings: sys::Settings) -> build_config::Config {
// Snapshot the hosts and the updated config inside a block so the write lock is released
// before any of the awaits below.
let (host_devices, new_config) = {
let mut state = self.state.write();
state.config_settings = state.config_settings.update_with_sys_settings(&new_settings);
(state.host_devices.clone(), state.config_settings.clone())
for (host_id, device) in host_devices {
let fut = device.apply_sys_settings(&new_settings);
if let Err(e) = fut.await {
warn!("Unable to apply new settings to host {}: {:?}", host_id, e);
let failed_host_path = device.path();
/// Establishes a discovery session on the active host.
///
/// Returns `types::Error::no_host()` when there is no active host; errors from the host are
/// passed through.
async fn discover_on_active_host(&self) -> types::Result<HostDiscoverySession> {
match self.active_host().await {
Some(host) => HostDevice::establish_discovery_session(&host).await,
None => Err(types::Error::no_host()),
/// Starts discovery, or joins a discovery session that already exists or is being set up.
///
/// Three cases are handled in order:
/// 1. a session already exists: its `Arc` is returned;
/// 2. discovery is `Pending`: the caller queues a oneshot channel and waits for the session;
/// 3. otherwise: the state is marked `Pending`, a host session is established, and the new
///    dispatcher session is sent to every queued waiter.
pub async fn start_discovery(&self) -> types::Result<Arc<DiscoverySession>> {
// If a Discovery session already exists, return its session token
if let Some(existing_session) = {
return Ok(existing_session);
// If Discovery is pending, add ourself to queue of clients awaiting session token
let mut session_receiver = None;
if let DiscoveryState::Pending(client_queue) = &mut self.state.write().discovery {
let (send, recv) = oneshot::channel();
session_receiver = Some(recv);
// We cannot also .await on the channel in the previous if statement, since we
// acquire a lock on the dispatcher state there, i.e. self.state.write()
if let Some(recv) = session_receiver {
return recv
.map_err(|_| format_err!("Pending discovery client channel closed").into());
// If we don't have a discovery session and we're not pending, we must be
// NotDiscovering, so start a new discovery session
// Immediately mark the state as pending to indicate to other requests to wait on
// this discovery session initialization
self.state.write().discovery = DiscoveryState::Pending(Vec::new());
// NOTE(review): if this `?` returns early, nothing visible here resets the Pending state set
// just above; confirm how the state recovers after a failure.
let host_session = self.discover_on_active_host().await?;
let dispatcher_session =
Arc::new(DiscoverySession { dispatcher_state: self.state.clone() });
let _ =;
// Replace Pending state with new session and send session token to waiters
if let DiscoveryState::Pending(client_queue) = std::mem::replace(
&mut self.state.write().discovery,
DiscoveryState::Discovering {
// Only a Weak<> is stored, so the session ends once every client drops its Arc<>.
session: Arc::downgrade(&dispatcher_session),
started: fasync::Time::now(),
) {
for client in client_queue {
// A waiter that has gone away is ignored.
let _ = client.send(dispatcher_session.clone());
// TODO( - This is susceptible to the same TOCTOU (time-of-check to time-of-use) race
// condition as start_discovery. We can fix with the same tri-state pattern as for discovery
/// Returns the current discoverable session if one is still alive; otherwise establishes a new
/// one on the active host and stores a weak reference to it.
///
/// Returns `types::Error::no_host()` when there is no active host.
pub async fn set_discoverable(&self) -> types::Result<Arc<HostDiscoverableSession>> {
let strong_current_token =|token| token.upgrade());
if let Some(token) = strong_current_token {
return Ok(Arc::clone(&token));
match self.active_host().await {
Some(host) => {
let token = Arc::new(host.establish_discoverable_session().await?);
// Only a Weak<> is kept, so the session lives as long as callers hold the Arc<>.
self.state.write().discoverable = Some(Arc::downgrade(&token));
None => Err(types::Error::no_host()),
/// Returns a `Stash` (the component storage) by value.
/// NOTE(review): presumably a clone of the `stash` field in the dispatcher state; confirm.
fn stash(&self) -> Stash {
/// Forgets `peer_id` on every known host and removes it from the stash.
///
/// Errors:
/// * `sys::Error::Failed` when there are no hosts;
/// * the first host error other than `PeerNotFound`, which stops the loop;
/// * "Couldn't remove peer" when the stash removal fails;
/// * "No hosts had peer" when no host reported a successful removal.
pub async fn forget(&self, peer_id: PeerId) -> types::Result<()> {
// Try to delete from each host, even if it might not have the peer.
// peers will be updated by the disconnection(s).
let hosts = self.get_all_adapters().await;
if hosts.is_empty() {
return Err(sys::Error::Failed.into());
let mut hosts_removed: u32 = 0;
for host in hosts {
let host_path = host.path();
match host.forget(peer_id).await {
Ok(()) => hosts_removed += 1,
// A host that does not know the peer is not treated as a failure.
Err(types::Error::SysError(sys::Error::PeerNotFound)) => {
trace!("No peer {} on host {:?}; ignoring", peer_id, host_path);
// Any other error is returned as-is, without visiting the remaining hosts.
err => {
error!("Could not forget peer {} on host {:?}", peer_id, host_path);
return err;
// The stash error itself is discarded and replaced by a generic message.
if let Err(_) = self.stash().rm_peer(peer_id).await {
return Err(format_err!("Couldn't remove peer").into());
if hosts_removed == 0 {
return Err(format_err!("No hosts had peer").into());
/// Asks the active host to connect to `peer_id`, returning the host's result.
///
/// Returns `sys::Error::Failed` when there is no active host.
/// NOTE(review): `disconnect` reports `types::Error::no_host()` in the same situation.
pub async fn connect(&self, peer_id: PeerId) -> types::Result<()> {
let host = self.active_host().await;
match host {
Some(host) => host.connect(peer_id).await,
None => Err(types::Error::SysError(sys::Error::Failed)),
/// Instruct the active host to initiate a pairing procedure with the target peer. If it
/// fails, we return the error we receive from the host
///
/// Returns `sys::Error::Failed` when there is no active host.
pub async fn pair(&self, id: PeerId, pairing_options: PairingOptions) -> types::Result<()> {
let host = self.active_host().await;
match host {
Some(host) => host.pair(id, pairing_options.into()).await,
None => Err(sys::Error::Failed.into()),
// Attempt to disconnect peer with id `peer_id` from all transports
// The request goes to the active host only; `types::Error::no_host()` is returned when there
// is none.
pub async fn disconnect(&self, peer_id: PeerId) -> types::Result<()> {
let host = self.active_host().await;
match host {
Some(host) => host.disconnect(peer_id).await,
None => Err(types::Error::no_host()),
/// Returns a future that first waits on `when_hosts_found` and then yields the active host,
/// if any.
pub fn active_host(&self) -> impl Future<Output = Option<HostDevice>> {
self.when_hosts_found().map(|adapter| {
// A write lock is taken; `get_active_host` on the state requires `&mut self`.
let mut wstate = adapter.state.write();
/// Returns the known host devices after waiting on `when_hosts_found`.
pub async fn get_all_adapters(&self) -> Vec<HostDevice> {
// Only the wait matters here; the dispatcher the future resolves to is discarded.
let _ = self.when_hosts_found().await;
/// Returns `HostInfo` for the known hosts. This is a synchronous method and does not wait on
/// `when_hosts_found`.
pub fn get_adapters(&self) -> Vec<HostInfo> {
let hosts =;
/// Connects `chan` to the requested `service` on the active host.
///
/// The channel is wrapped in a `ServerEnd` of the matching protocol marker and passed to the
/// host through `request_protocol`. The result of that call is ignored. When there is no
/// active host the request is dropped and a message is printed.
pub async fn request_host_service(self, chan: zx::Channel, service: HostService) {
match self.active_host().await {
Some(host) => {
let host = host.proxy();
match service {
HostService::LeCentral => {
let remote = ServerEnd::<CentralMarker>::new(chan.into());
let _ = host.request_protocol(ProtocolRequest::Central(remote));
HostService::LePeripheral => {
let remote = ServerEnd::<PeripheralMarker>::new(chan.into());
let _ = host.request_protocol(ProtocolRequest::Peripheral(remote));
HostService::LeGatt => {
let remote = ServerEnd::<Server_Marker>::new(chan.into());
let _ = host.request_protocol(ProtocolRequest::GattServer(remote));
HostService::LeGatt2 => {
let remote = ServerEnd::<Server_Marker2>::new(chan.into());
let _ = host.request_protocol(ProtocolRequest::Gatt2Server(remote));
HostService::Profile => {
let remote = ServerEnd::<ProfileMarker>::new(chan.into());
let _ = host.request_protocol(ProtocolRequest::Profile(remote));
// NOTE(review): this is the only place in the file that uses eprintln!; everything else logs
// through the tracing macros (error!/warn!/info!).
None => eprintln!("Failed to spawn, no active host"),
// This is not an async method as we do not want to borrow `self` for the duration of the async
// call, and we also want to trigger the send immediately even if the future is not yet awaited
/// Stores `bond_data`, returning a future that resolves to the outcome.
pub fn store_bond(&self, bond_data: BondingData) -> impl Future<Output = Result<(), Error>> {
/// Records a new or updated peer in the dispatcher state and inspect, and returns a future
/// that publishes the update to peer watchers.
///
/// The state update happens synchronously in this call; only the hanging-get publish is
/// deferred to the returned future, which panics if the publisher is unreachable.
pub fn on_device_updated(&self, peer: Peer) -> impl Future<Output = ()> {
// Separate copy for the publisher closure; `peer` itself is moved into the state below.
let update_peer = peer.clone();
let mut publisher = {
let mut state = self.state.write();
// Each update creates a fresh, uniquely named inspect child for the peer.
let node = state.inspect.peers().create_child(unique_name("peer_"));
let peer = Inspectable::new(peer, node);
let _drop_old_value = state.peers.insert(, peer);
state.inspect.peer_count.set(state.peers.len() as u64);
// Wait for the hanging get watcher to update so we can linearize updates
async move {
.update(move |peers| {
let _ = peers.insert(, update_peer);
.expect("Fatal error: Peer Watcher HangingGet unreachable")
/// Removes peer `id` from the dispatcher state and returns a future that publishes the removal
/// to peer watchers.
///
/// A peer that was actually present is logged to the `evicted_peers` inspect list. The
/// returned future panics if the publisher is unreachable.
pub fn on_device_removed(&self, id: PeerId) -> impl Future<Output = ()> {
let mut publisher = {
let mut state = self.state.write();
if let Some(removed) = state.peers.remove(&id) {
inspect_log!(state.inspect.evicted_peers, peer: removed);
state.inspect.peer_count.set(state.peers.len() as u64);
// Wait for the hanging get watcher to update so we can linearize updates
async move {
.update(move |peers| {
// Updated if we actually removed something.
.expect("Fatal error: Peer Watcher HangingGet unreachable")
/// Registers a new subscriber for peer updates.
///
/// Panics if the hanging-get registrar is unreachable.
pub async fn watch_peers(&self) -> hanging_get::Subscriber<PeerWatcher> {
// The registrar is cloned out so the lock is released before the await below.
// NOTE(review): the lock is only used to clone the registrar; a read lock may be sufficient.
let mut registrar = self.state.write().watch_peers_registrar.clone();
registrar.new_subscriber().await.expect("Fatal error: Peer Watcher HangingGet unreachable")
/// Registers a new subscriber for host updates.
///
/// Panics if the hanging-get registrar is unreachable.
pub async fn watch_hosts(&self) -> hanging_get::Subscriber<sys::HostWatcherWatchResponder> {
// The registrar is cloned out so the lock is released before the await below.
// NOTE(review): the lock is only used to clone the registrar; a read lock may be sufficient.
let mut registrar = self.state.write().watch_hosts_registrar.clone();
registrar.new_subscriber().await.expect("Fatal error: Host Watcher HangingGet unreachable")
/// Creates a `GasProxy` for `gatt_server_proxy` and spawns a task for it.
///
/// Returns an error if the proxy cannot be created. Errors that occur later, while the spawned
/// task is passing messages, are only logged as warnings.
async fn spawn_gas_proxy(&self, gatt_server_proxy: Server_Proxy) -> Result<(), Error> {
let gas_channel =;
let gas_proxy =
generic_access_service::GasProxy::new(gatt_server_proxy, gas_channel).await?;
fasync::Task::spawn(|r| {
r.unwrap_or_else(|err| {
warn!("Error passing message through Generic Access proxy: {:?}", err);
/// Commit all bootstrapped bonding identities to the system. This will update both the Stash
/// and our in memory store, and notify all hosts of new bonding identities. If we already have
/// bonding data for any of the peers (as identified by address), the new bootstrapped data
/// will override them.
///
/// A failure to restore bonds on an individual host is logged and does not fail this call.
pub async fn commit_bootstrap(&self, identities: Vec<Identity>) -> types::Result<()> {
// Store all new bonds in our permanent Store. If we cannot successfully record the bonds
// in the store, then Bootstrap.Commit() has failed.
let mut stash =;
for identity in identities {
// Notify all current hosts of any changes to their bonding data
let host_devices: Vec<_> =;
for host in host_devices {
// If we fail to restore bonds to a given host, that is not a failure on a part of
// Bootstrap.Commit(), but a failure on the host. So do not return error from this
// function, but instead log and continue.
// TODO( - if a host fails we should close it and clean up after it
if let Err(error) =
try_restore_bonds(host.clone(), self.clone(), &host.public_address()).await
"Error restoring Bootstrapped bonds to host '{:?}': {}",
/// Finishes initializing a host device by setting host configs and services.
///
/// The steps run in order, and the first failing step aborts the rest with a contextual
/// error: assign host data (IRK), restore bonds, apply the configuration, set the local name,
/// open the GATT server and spawn the Generic Access Service proxy. Only after those succeed
/// is the host added to the dispatcher state.
async fn add_host_device(&self, host_device: &HostDevice) -> Result<(), Error> {
// Used to prefix every error context below.
let dbg_ids = host_device.debug_identifiers();
// TODO( Make sure that the bt-host device is left in a well-known state if
// any of these operations fails.
let address = host_device.public_address();
assign_host_data(host_device.clone(), self.clone(), &address)
.context(format!("{:?}: failed to assign identity to bt-host", dbg_ids))?;
try_restore_bonds(host_device.clone(), self.clone(), &address)
.map_err(|e| e.as_failure())?;
let config =;
.context(format!("{:?}: failed to configure bt-host device", dbg_ids))?;
// Assign the name that is currently assigned to the HostDispatcher as the local name.
let name = self.get_name();
.map_err(|e| e.as_failure())
.context(format!("{:?}: failed to set name of bt-host", dbg_ids))?;
let (gatt_server_proxy, remote_gatt_server) = fidl::endpoints::create_proxy()?;
.context(format!("{:?}: failed to open gatt server for bt-host", dbg_ids))?;
.context(format!("{:?}: failed to spawn generic access service", dbg_ids))?;
// Ensure the current active pairing delegate (if it exists) handles this host
self.state.write().add_host(, host_device.clone());
// Update our hanging_get server with the latest hosts. This will notify any pending
// hanging_gets and any new requests will see the new results.
/// `&self` counterpart of `HostDispatcherState::notify_host_watchers`.
/// NOTE(review): presumably forwards to it under a state lock; confirm against body.
fn notify_host_watchers(&self) {
/// Removes every host whose path equals `host_path`.
///
/// If the active host is among those removed, the active id is cleared and another host is
/// made active when one is available. The newly active host is then configured; a failure
/// there is only logged.
pub async fn rm_device(&self, host_path: &str) {
let mut new_adapter_activated = false;
// Scope our HostDispatcherState lock
let mut hd = self.state.write();
// Snapshot of the active id taken before any removal.
let active_id = hd.active_id.clone();
// Get the host IDs that match `host_path`.
let ids: Vec<HostId> = hd
.filter(|(_, ref host)| host.path() == host_path)
.map(|(k, _)| k.clone())
let id_strs: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
info!("Host removed: {} (path: {:?})", id_strs.join(","), host_path);
for id in &ids {
// Reset the active ID if it got removed.
if let Some(active_id) = active_id {
if ids.contains(&active_id) {
hd.active_id = None;
// Try to assign a new active adapter. This may send an "OnActiveAdapterChanged" event.
// `get_active_id` has the side effect of activating a host when none is active.
if hd.active_id.is_none() && hd.get_active_id().is_some() {
new_adapter_activated = true;
} // Now the lock is dropped, we can run the async notify
if new_adapter_activated {
if let Err(err) = self.configure_newly_active_adapter().await {
warn!("Failed to persist state on adapter change: {:?}", err);
/// Configure a newly active adapter with the correct behavior for an active adapter.
///
/// Currently this migrates discovery: a new host discovery session is established on the
/// active host, and an error from that is returned to the caller.
async fn configure_newly_active_adapter(&self) -> types::Result<()> {
// Migrate discovery state to new host
if {
let new_host_session = self.discover_on_active_host().await?;
/// Route pairing requests from this host through our pairing dispatcher, if it exists
///
/// Does nothing when no pairing dispatcher is set.
fn handle_pairing_requests(&self, host: HostDevice) {
let mut dispatcher = self.state.write();
if let Some(handle) = &mut dispatcher.pairing_dispatcher {
handle.add_host(, host.proxy().clone());
/// Initializes a host from `proxy`, then starts listening to its events.
///
/// The string form of the proxy channel's raw handle is used as the host's path. Returns an
/// error if `init_host` fails. An error from the event watcher is logged as a warning.
pub async fn add_host_component(&self, proxy: HostProxy) -> types::Result<()> {
info!("Adding host component");
let node ="device_"));
// The raw handle value doubles as the host "path" passed to init_host.
let proxy_handle = proxy.as_channel().raw_handle().to_string();
let host_device = init_host(proxy_handle.as_str(), node, proxy).await?;
info!("Successfully started host device: {:?}",;
// Start listening to Host interface events.
let this = self.clone();
async move {
match host_device.watch_events(this.clone()).await {
Ok(()) => (),
Err(e) => {
warn!("Error handling host event: {e:?}");
let host_path = proxy_handle.as_str();
/// Builds a `HostDevice` for `proxy`.
///
/// Records `path` on `node`, fetches the host's state through `watch_state`, converts it to a
/// `HostInfo` attached to `node`, and wraps everything in a `HostDevice`. Fails if the state
/// cannot be obtained or converted.
async fn init_host(path: &str, node: inspect::Node, proxy: HostProxy) -> Result<HostDevice, Error> {
node.record_string("path", path);
// Obtain basic information and create an entry in the dispatcher's map.
let host_info = proxy.watch_state().await.context("failed to obtain bt-host information")?;
let host_info = Inspectable::new(HostInfo::try_from(host_info)?, node);
Ok(HostDevice::new(path.to_string(), proxy, host_info))
// HostListener callbacks for the dispatcher. Every callback returns a boxed 'static future.
impl HostListener for HostDispatcher {
type PeerUpdatedFut = BoxFuture<'static, ()>;
fn on_peer_updated(&mut self, peer: Peer) -> Self::PeerUpdatedFut {
type PeerRemovedFut = BoxFuture<'static, ()>;
fn on_peer_removed(&mut self, id: PeerId) -> Self::PeerRemovedFut {
type HostBondFut = BoxFuture<'static, Result<(), anyhow::Error>>;
fn on_new_host_bond(&mut self, data: BondingData) -> Self::HostBondFut {
type HostInfoFut = BoxFuture<'static, Result<(), anyhow::Error>>;
// Host info updates are ignored here: the argument is unused and the future resolves to Ok.
fn on_host_updated(&mut self, _info: HostInfo) -> Self::HostInfoFut {
async { Ok(()) }.boxed()
/// A future that completes when at least one adapter is available.
#[must_use = "futures do nothing unless polled"]
struct WhenHostsFound {
hd: HostDispatcher,
// Key of this future's waker in `host_requests`, once one has been registered by `poll`.
waker_key: Option<usize>,
impl WhenHostsFound {
// Constructs a WhenHostsFound that completes at the latest after HOST_INIT_TIMEOUT seconds.
fn new(hd: HostDispatcher) -> impl Future<Output = HostDispatcher> {
WhenHostsFound { hd: hd.clone(), waker_key: None }.on_timeout(
// Timeout handler: runs when no host showed up within the init period.
move || {
let mut inner = hd.state.write();
if inner.host_devices.len() == 0 {
info!("No bt-host devices found");
// Clears this future's waker registration, if it has one.
fn remove_waker(&mut self) {
if let Some(key) = self.waker_key {
self.waker_key = None;
// NOTE(review): presumably calls `remove_waker` so a dropped future does not leave its waker
// in `host_requests`; confirm against body.
impl Drop for WhenHostsFound {
fn drop(&mut self) {
// Explicit Unpin: `poll` below assigns to `self.waker_key` through the pinned reference.
impl Unpin for WhenHostsFound {}
// Resolves to the dispatcher. While the zero check below holds, the task's waker is parked
// in `host_requests`.
impl Future for WhenHostsFound {
type Output = HostDispatcher;
fn poll(mut self: ::std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if == 0 {
let hd = self.hd.clone();
// Register the waker only once; later polls keep the existing slab entry.
if self.waker_key.is_none() {
self.waker_key = Some(hd.state.write().host_requests.insert(cx.waker().clone()));
} else {
/// Restores to `host_device` the bonds stored in the stash for `address`.
///
/// Returns `Ok(())` when the stash has no bonding data for the address. Failures are logged,
/// both when the restore call itself fails and when it reports per-bond errors.
async fn try_restore_bonds(
host_device: HostDevice,
hd: HostDispatcher,
address: &Address,
) -> types::Result<()> {
// Load bonding data that use this host's `address` as their "local identity address".
let opt_data = hd.stash().list_bonds(address.clone()).await?;
let data = match opt_data {
Some(data) => data,
// Nothing stored for this address: nothing to restore.
None => return Ok(()),
match host_device.restore_bonds(data).await {
Err(e) => {
error!("failed to restore bonding data for host: {:?}", e);
// The call succeeded but may still report bonds that could not be restored.
Ok(errors) => {
if errors.is_empty() {
} else {
// NOTE(review): this fold puts ", " in front of every element, so the joined message starts
// with a separator.
let msg =
errors.into_iter().fold("".to_string(), |acc, b| format!("{}, {:?}", acc, b));
let msg = format!("failed to restore bonding data: {}", msg);
error!("{}", msg);
/// Generates a new 16-byte key filled from the Zircon CPRNG.
///
/// The visible body always returns `Ok`.
fn generate_irk() -> Result<sys::Key, zx::Status> {
let mut buf: [u8; 16] = [0; 16];
// Generate a secure IRK.
zx::cprng_draw(&mut buf);
Ok(sys::Key { value: buf })
/// Assigns local host data (the IRK) to `host`.
///
/// Host data already stored for `address` is reused. Otherwise a new IRK is generated and
/// persisted to the stash first; if persisting fails, the error is returned and nothing is
/// assigned to the host.
async fn assign_host_data(
host: HostDevice,
hd: HostDispatcher,
address: &Address,
) -> Result<(), Error> {
// Obtain an existing IRK or generate a new one if one doesn't already exist for |address|.
let data = match hd.stash().get_host_data(address.clone()).await? {
Some(host_data) => {
trace!("restored IRK");
None => {
// Generate a new IRK.
trace!("generating new IRK");
let new_data = HostData { irk: Some(generate_irk()?) };
if let Err(e) = hd.stash().store_host_data(address.clone(), new_data.clone()).await {
error!("failed to persist local IRK");
return Err(e.into());
host.set_local_data(data).map_err(|e| e.into())
pub(crate) mod test {
use super::*;
use fidl_fuchsia_bluetooth_gatt2::{
LocalServiceProxy, Server_Request, Server_RequestStream as GattServerRequestStream,
use fidl_fuchsia_bluetooth_host::{HostRequest, HostRequestStream};
use futures::{future::join, StreamExt};
/// Builds a `HostDispatcher` for tests from the given hanging-get publishers and registrars.
pub(crate) fn make_test_dispatcher(
watch_peers_publisher: hanging_get::Publisher<HashMap<PeerId, Peer>>,
watch_peers_registrar: hanging_get::SubscriptionRegistrar<PeerWatcher>,
watch_hosts_publisher: hanging_get::Publisher<Vec<HostInfo>>,
watch_hosts_registrar: hanging_get::SubscriptionRegistrar<sys::HostWatcherWatchResponder>,
) -> HostDispatcher {
// The receiving end of the GAS channel is not used by this helper.
let (gas_channel_sender, _ignored_gas_task_req_stream) = mpsc::channel(0);
/// Builds a `HostDispatcher` for tests, creating its own peer and host hanging-get brokers.
pub(crate) fn make_simple_test_dispatcher() -> HostDispatcher {
let watch_peers_broker = hanging_get::HangingGetBroker::new(
// Notify callback that ignores both arguments and always returns true.
|_, _| true,
let watch_hosts_broker = hanging_get::HangingGetBroker::new(
|_, _| true,
let dispatcher = make_test_dispatcher(
let watchers_fut = join(,|_| ());
/// Endpoints captured while mocking host initialization in tests; both stay `None` until the
/// corresponding request has been seen.
pub(crate) struct GasEndpoints {
// Request stream of the GATT server the host was asked to open.
gatt_server: Option<GattServerRequestStream>,
// Proxy for the service published on that GATT server.
service: Option<LocalServiceProxy>,
/// Plays the host side of `add_host_device` for tests.
///
/// Serves requests from `host_server` until a `Gatt2Server` protocol request has been stored
/// or the stream ends. Name, device class and connectable requests are answered with
/// `Ok(())`. Returns the request stream together with the captured endpoints.
async fn handle_standard_host_server_init(
mut host_server: HostRequestStream,
) -> (HostRequestStream, GasEndpoints) {
let mut gas_endpoints = GasEndpoints::default();
// Loop until the GATT server request has been captured.
while gas_endpoints.gatt_server.is_none() {
match {
Some(Ok(HostRequest::SetLocalName { responder, .. })) => {
info!("Setting Local Name");
let _ = responder.send(Ok(()));
Some(Ok(HostRequest::SetDeviceClass { responder, .. })) => {
info!("Setting Device Class");
let _ = responder.send(Ok(()));
Some(Ok(HostRequest::RequestProtocol {
payload: ProtocolRequest::Gatt2Server(server),
})) => {
// don't respond at all on the server side.
info!("Storing Gatt Server");
let mut gatt_server = server.into_stream().unwrap();
info!("GAS Server was started, waiting for publish");
// The Generic Access Service now publishes itself.
match {
Some(Ok(Server_Request::PublishService { info, service, responder })) => {
info!("Captured publish of GAS Service: {:?}", info);
gas_endpoints.service = Some(service.into_proxy().unwrap());
let _ = responder.send(Ok(()));
// Anything other than a publish request is logged.
x => error!("Got unexpected GAS Server request: {:?}", x),
// Storing the stream makes the loop condition false, ending the loop.
gas_endpoints.gatt_server = Some(gatt_server);
Some(Ok(HostRequest::SetConnectable { responder, .. })) => {
info!("Setting connectable");
let _ = responder.send(Ok(()));
Some(Ok(req)) => info!("Unhandled Host Request in add: {:?}", req),
Some(Err(e)) => error!("Error in host server: {:?}", e),
// Stream closed before a GATT server request arrived.
None => break,
info!("Finishing host_device mocking for add host");
(host_server, gas_endpoints)
/// Creates a mock host with `id` and adds it to `dispatcher`.
///
/// `add_host_device` and the mock host-server handler are driven together with `join`, since
/// the dispatcher's requests need the mock to answer them. Returns the host request stream,
/// the host device and the captured GAS endpoints.
pub(crate) async fn create_and_add_test_host_to_dispatcher(
id: HostId,
dispatcher: &HostDispatcher,
) -> types::Result<(HostRequestStream, HostDevice, GasEndpoints)> {
let (host_server, host_device) = HostDevice::mock_from_id(id);
let host_server_init_handler = handle_standard_host_server_init(host_server);
let (res, (host_server, gas_endpoints)) =
join(dispatcher.add_host_device(&host_device), host_server_init_handler).await;
Ok((host_server, host_device, gas_endpoints))