// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use anyhow::{anyhow, format_err, Context as _, Error};
use async_helpers::hanging_get::asynchronous as hanging_get;
use fidl::endpoints::{Proxy, ServerEnd};
use fidl_fuchsia_bluetooth::{Appearance, DeviceClass};
use fidl_fuchsia_bluetooth_bredr::ProfileMarker;
use fidl_fuchsia_bluetooth_gatt::Server_Marker;
use fidl_fuchsia_bluetooth_gatt2::{
    LocalServiceRequest, Server_Marker as Server_Marker2, Server_Proxy,
};
use fidl_fuchsia_bluetooth_host::{DiscoverySessionProxy, HostProxy, ProtocolRequest};
use fidl_fuchsia_bluetooth_le::{CentralMarker, PeripheralMarker};
use fidl_fuchsia_bluetooth_sys::{
    self as sys, InputCapability, OutputCapability, PairingDelegateProxy,
};
use fuchsia_async::{self as fasync, DurationExt, TimeoutExt};
use fuchsia_bluetooth::inspect::{DebugExt, Inspectable, ToProperty};
use fuchsia_bluetooth::types::pairing_options::PairingOptions;
use fuchsia_bluetooth::types::{
    Address, BondingData, HostData, HostId, HostInfo, Identity, Peer, PeerId,
};
use fuchsia_inspect::{self as inspect, unique_name, NumericProperty, Property};
use fuchsia_inspect_contrib::{inspect_log, nodes::BoundedListNode};
use fuchsia_sync::RwLock;
use fuchsia_zircon::{self as zx, AsHandleRef, Duration};
use futures::channel::{mpsc, oneshot};
use futures::future::{self, BoxFuture, FusedFuture, Future, Shared};
use futures::FutureExt;
use slab::Slab;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll, Waker};
use tracing::{debug, error, info, trace, warn};

use crate::{
    build_config, generic_access_service,
    host_device::{HostDevice, HostDiscoverableSession, HostListener},
    services::pairing::pairing_dispatcher::{PairingDispatcher, PairingDispatcherHandle},
    store::stash::Stash,
    types,
    watch_peers::PeerWatcher,
};

pub use fidl_fuchsia_device::DEFAULT_DEVICE_NAME;
/// Policies for HostDispatcher::set_name
#[derive(Copy, Clone, PartialEq, Debug)]
pub enum NameReplace {
    /// Keep the current name if it is already set, but set a new name if it hasn't been.
    Keep,
    /// Replace the current name unconditionally.
    Replace,
}
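
// Usage sketch of the two policies (hypothetical client code; `dispatcher` is
// assumed to be a HostDispatcher with an active host):
//
//     // Only takes effect if no name has been set yet.
//     dispatcher.set_name("fuchsia".to_string(), NameReplace::Keep).await?;
//     // Overwrites any currently set name.
//     dispatcher.set_name("fuchsia-lab".to_string(), NameReplace::Replace).await?;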

pub static HOST_INIT_TIMEOUT: i64 = 5; // Seconds

/// Available FIDL services that can be provided by a particular Host
#[derive(Copy, Clone)]
pub enum HostService {
    LeCentral,
    LePeripheral,
    LeGatt,
    LeGatt2,
    Profile,
}

/// When a client requests Discovery, we establish and store two distinct sessions: the dispatcher
/// DiscoverySession, an Arc<> of which is returned to clients and which represents the
/// dispatcher's discovery state, persisting as long as at least one client maintains an Arc<> to
/// the session; and the DiscoverySessionProxy, which is returned by the active host device on
/// which discovery is physically occurring and persists until the host disappears or discovery is
/// stopped.
pub enum DiscoveryState {
    NotDiscovering,
    Pending {
        // Additional client requests for discovery made while discovery is asynchronously
        // starting.
        session_receiver: Shared<oneshot::Receiver<Arc<DiscoverySession>>>,
        session_sender: oneshot::Sender<Arc<DiscoverySession>>,
        discovery_stopped_receiver: Shared<oneshot::Receiver<()>>,
        discovery_stopped_sender: oneshot::Sender<()>,
        start_discovery_task: fasync::Task<()>,
    },
    Discovering {
        session: Weak<DiscoverySession>,
        discovery_proxy: DiscoverySessionProxy,
        started: fasync::Time,
        discovery_on_closed_task: fasync::Task<()>,
        discovery_stopped_receiver: Shared<oneshot::Receiver<()>>,
        discovery_stopped_sender: oneshot::Sender<()>,
    },
    Stopping {
        // The DiscoverySessionProxy must be held while stopping so that the peer-closed signal
        // can be received.
        discovery_proxy: DiscoverySessionProxy,
        // Contains requests for discovery made while discovery is stopping (edge case).
        // This field is optional so that we know whether discovery needs to be restarted.
        session_receiver: Option<Shared<oneshot::Receiver<Arc<DiscoverySession>>>>,
        session_sender: Option<oneshot::Sender<Arc<DiscoverySession>>>,
        discovery_on_closed_task: fasync::Task<()>,
        discovery_stopped_receiver: Shared<oneshot::Receiver<()>>,
        discovery_stopped_sender: oneshot::Sender<()>,
    },
}
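
// Lifecycle sketch (hypothetical client code) tying the two sessions together:
//
//     let session: Arc<DiscoverySession> = dispatcher.start_discovery().await?;
//     // Pending -> Discovering: the active host is now scanning.
//     let stopped = session.on_discovery_end();
//     drop(session); // Last Arc<> dropped: Discovering -> Stopping.
//     stopped.await; // Host's DiscoverySession closed: Stopping -> NotDiscovering.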

impl DiscoveryState {
    // Idempotently end the discovery session.
    // Returns the duration of a session, if one was ended.
    fn stop_discovery_session(&mut self) -> Option<fasync::Duration> {
        if let DiscoveryState::Discovering {
            discovery_proxy,
            started,
            discovery_on_closed_task,
            discovery_stopped_receiver,
            discovery_stopped_sender,
            ..
        } = std::mem::replace(self, DiscoveryState::NotDiscovering)
        {
            // stop() is synchronous, but stopping is an asynchronous procedure and will complete
            // when the Discovery event stream terminates.
            let _ = discovery_proxy.stop();
            *self = DiscoveryState::Stopping {
                discovery_proxy,
                session_receiver: None,
                session_sender: None,
                discovery_on_closed_task,
                discovery_stopped_receiver,
                discovery_stopped_sender,
            };
            return Some(fasync::Time::now() - started);
        }
        None
    }

    pub fn on_stopped(&mut self) -> impl FusedFuture<Output = ()> {
        let rx = match self {
            DiscoveryState::NotDiscovering => {
                let (tx, rx) = oneshot::channel();
                let _ = tx.send(());
                rx.shared()
            }
            DiscoveryState::Pending { discovery_stopped_receiver, .. } => {
                discovery_stopped_receiver.clone()
            }
            DiscoveryState::Discovering { discovery_stopped_receiver, .. } => {
                discovery_stopped_receiver.clone()
            }
            DiscoveryState::Stopping { discovery_stopped_receiver, .. } => {
                discovery_stopped_receiver.clone()
            }
        };
        rx.map(|_| ())
    }

    fn get_active_session_future(
        &mut self,
    ) -> Option<BoxFuture<'static, Result<Arc<DiscoverySession>, oneshot::Canceled>>> {
        match self {
            DiscoveryState::NotDiscovering => None,
            DiscoveryState::Pending { session_receiver, .. } => {
                Some(session_receiver.clone().boxed())
            }
            DiscoveryState::Discovering { session, .. } => {
                let session = session.upgrade().expect("session must exist in Discovering state");
                Some(future::ready(Ok(session)).boxed())
            }
            DiscoveryState::Stopping { session_receiver, session_sender, .. } => {
                match session_receiver {
                    Some(recv) => Some(recv.clone().boxed()),
                    None => {
                        let (send, recv) = oneshot::channel();
                        let recv = recv.shared();
                        *session_receiver = Some(recv.clone());
                        *session_sender = Some(send);
                        Some(recv.boxed())
                    }
                }
            }
        }
    }
}

impl std::fmt::Debug for DiscoveryState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DiscoveryState::NotDiscovering => {
                write!(f, "DiscoveryState::NotDiscovering")
            }
            DiscoveryState::Pending { .. } => {
                write!(f, "DiscoveryState::Pending")
            }
            DiscoveryState::Discovering { .. } => {
                write!(f, "DiscoveryState::Discovering")
            }
            DiscoveryState::Stopping { .. } => {
                write!(f, "DiscoveryState::Stopping")
            }
        }
    }
}

/// A dispatcher discovery session, which persists as long as at least one client holds an
/// Arc<> to it.
pub struct DiscoverySession {
    dispatcher_state: Arc<RwLock<HostDispatcherState>>,
}

impl DiscoverySession {
    pub fn on_discovery_end(&self) -> impl FusedFuture<Output = ()> {
        self.dispatcher_state.write().discovery.on_stopped()
    }
}

impl Drop for DiscoverySession {
    fn drop(&mut self) {
        let mut write = self.dispatcher_state.write();
        if let Some(dur) = write.discovery.stop_discovery_session() {
            inspect_log!(write.inspect.discovery_history, duration: dur.into_seconds_f64());
        }
    }
}

static RECENTLY_REMOVED_PEERS_COUNT: usize = 15;
static RECENT_DISCOVERY_SESSIONS_COUNT: usize = 5;

struct HostDispatcherInspect {
    _inspect: inspect::Node,
    peers: inspect::Node,
    hosts: inspect::Node,
    host_count: inspect::UintProperty,
    device_class: inspect::StringProperty,
    peer_count: inspect::UintProperty,
    input_capability: inspect::StringProperty,
    output_capability: inspect::StringProperty,
    has_pairing_delegate: inspect::UintProperty,
    evicted_peers: BoundedListNode,
    discovery_sessions: inspect::UintProperty,
    discovery_history: BoundedListNode,
}

impl HostDispatcherInspect {
    pub fn new(inspect: inspect::Node) -> HostDispatcherInspect {
        HostDispatcherInspect {
            host_count: inspect.create_uint("host_count", 0),
            peer_count: inspect.create_uint("peer_count", 0),
            device_class: inspect.create_string("device_class", "default"),
            input_capability: inspect.create_string("input_capability", "unknown"),
            output_capability: inspect.create_string("output_capability", "unknown"),
            has_pairing_delegate: inspect.create_uint("has_pairing_delegate", 0),
            peers: inspect.create_child("peers"),
            hosts: inspect.create_child("hosts"),
            discovery_sessions: inspect.create_uint("discovery_sessions", 0),
            discovery_history: BoundedListNode::new(
                inspect.create_child("discovery_history"),
                RECENT_DISCOVERY_SESSIONS_COUNT,
            ),
            evicted_peers: BoundedListNode::new(
                inspect.create_child("recently_removed"),
                RECENTLY_REMOVED_PEERS_COUNT,
            ),
            _inspect: inspect,
        }
    }

    pub fn peers(&self) -> &inspect::Node {
        &self.peers
    }

    pub fn hosts(&self) -> &inspect::Node {
        &self.hosts
    }
}

/// The HostDispatcher acts as a proxy aggregating multiple HostAdapters.
/// It appears as a Host to higher-level systems, and is responsible for
/// routing commands to the appropriate HostAdapter.
struct HostDispatcherState {
    host_devices: HashMap<HostId, HostDevice>,
    active_id: Option<HostId>,

    // Component storage.
    stash: Stash,

    // GAP state
    // The local name, if set. If not set, hosts will not have a name assigned.
    name: Option<String>,
    appearance: Appearance,
    discovery: DiscoveryState,
    discoverable: Option<Weak<HostDiscoverableSession>>,
    config_settings: build_config::Config,
    peers: HashMap<PeerId, Inspectable<Peer>>,

    // Sender end of a futures::mpsc channel for forwarding LocalServiceRequests to the Generic
    // Access Service. When a new host adapter is recognized, we create a new GasProxy, which
    // takes GAS requests from the new host and forwards them along a clone of this channel to
    // the GAS.
    gas_channel_sender: mpsc::Sender<LocalServiceRequest>,

    pairing_dispatcher: Option<PairingDispatcherHandle>,

    watch_peers_publisher: hanging_get::Publisher<HashMap<PeerId, Peer>>,
    watch_peers_registrar: hanging_get::SubscriptionRegistrar<PeerWatcher>,

    watch_hosts_publisher: hanging_get::Publisher<Vec<HostInfo>>,
    watch_hosts_registrar: hanging_get::SubscriptionRegistrar<sys::HostWatcherWatchResponder>,

    // Pending requests to obtain a Host.
    host_requests: Slab<Waker>,

    inspect: HostDispatcherInspect,
}

impl HostDispatcherState {
    /// Set the active adapter for this HostDispatcher
    pub fn set_active_host(&mut self, adapter_id: HostId) -> types::Result<()> {
        if let Some(id) = self.active_id {
            if id == adapter_id {
                return Ok(());
            }

            // Shut down the previously active host.
            let _ = self.host_devices[&id].shutdown();
        }

        if self.host_devices.contains_key(&adapter_id) {
            self.set_active_id(Some(adapter_id));
            Ok(())
        } else {
            Err(types::Error::no_host())
        }
    }

    /// Used to set the pairing delegate. If there is a prior pairing delegate connected to the
    /// host, check whether the existing stored connection is closed:
    /// * if it is closed, overwrite it and succeed
    /// * if it is still active, fail
    /// If there is no prior delegate, this will always succeed.
    /// Returns Ok(()) if the delegate was set successfully, otherwise an error.
    fn set_pairing_delegate(
        &mut self,
        delegate: PairingDelegateProxy,
        input: InputCapability,
        output: OutputCapability,
    ) -> types::Result<()> {
        match self.pairing_dispatcher.as_ref() {
            Some(dispatcher) if !dispatcher.is_closed() => {
                Err(format_err!("Another Delegate is active"))?
            }
            _ => {
                self.inspect.input_capability.set(&input.debug());
                self.inspect.output_capability.set(&output.debug());
                self.inspect.has_pairing_delegate.set(true.to_property());
                let (dispatcher, handle) = PairingDispatcher::new(delegate, input, output);
                for host in self.host_devices.values() {
                    handle.add_host(host.id(), host.proxy().clone());
                }
                // The old pairing dispatcher is dropped here; this drops all host pairings.
                self.pairing_dispatcher = Some(handle);
                // Spawn handling of the new pairing requests.
                // TODO(https://fxbug.dev/42152480) - We should avoid detach() here, and consider a more
                // explicit way to track this task
                fasync::Task::spawn(dispatcher.run()).detach();
                Ok(())
            }
        }
    }

    /// Return the active id. If the ID is currently not set,
    /// this will make the first ID in its host_devices active.
    fn get_active_id(&mut self) -> Option<HostId> {
        let active = self.active_id.clone();
        active.or_else(|| {
            self.host_devices.keys().next().cloned().map(|id| {
                self.set_active_id(Some(id));
                id
            })
        })
    }

    /// Return the active host. If the host is currently not set,
    /// this will make the first ID in its host_devices active.
    fn get_active_host(&mut self) -> Option<HostDevice> {
        self.get_active_id().and_then(|id| self.host_devices.get(&id)).cloned()
    }

    /// Resolves all pending WhenHostsFound futures. Called when we leave the init period (on
    /// seeing the first host device or when the init timer expires).
    fn resolve_host_requests(&mut self) {
        for waker in &self.host_requests {
            waker.1.wake_by_ref();
        }
    }

    fn add_host(&mut self, id: HostId, host: HostDevice) {
        if self.host_devices.insert(id, host.clone()).is_some() {
            warn!("Host replaced: {}", id.to_string())
        } else {
            info!("Host added: {}", id.to_string());
        }

        // If this is the only host, mark it as active.
        let _ = self.get_active_id();

        // Update inspect state
        self.inspect.host_count.set(self.host_devices.len() as u64);

        // Notify HostWatcher interface clients about the new device.
        self.notify_host_watchers();

        // Resolve pending adapter futures.
        self.resolve_host_requests();
    }

    /// Updates the active adapter and notifies listeners & host watchers.
    fn set_active_id(&mut self, id: Option<HostId>) {
        info!("New active adapter: {}", id.map_or("<none>".to_string(), |id| id.to_string()));
        self.active_id = id;
        self.notify_host_watchers();
    }

    pub fn notify_host_watchers(&self) {
        // The HostInfo::active field for the active host must be filled in later.
        let active_id = self.active_id;

        // Wait for the hanging get watcher to update so we can linearize updates
        let current_hosts: Vec<HostInfo> = self
            .host_devices
            .values()
            .map(|host| {
                let mut info = host.info();
                // Fill in HostInfo::active
                if let Some(active_id) = active_id {
                    info.active = active_id == host.id();
                }
                info
            })
            .collect();
        let mut publisher = self.watch_hosts_publisher.clone();
        fasync::Task::spawn(async move {
            publisher
                .set(current_hosts)
                .await
                .expect("Fatal error: Host Watcher HangingGet unreachable");
        })
        .detach();
    }
}

#[derive(Clone)]
pub struct HostDispatcher {
    state: Arc<RwLock<HostDispatcherState>>,
}

impl HostDispatcher {
    /// The HostDispatcher will forward all Generic Access Service requests to the mpsc::Receiver
    /// end of |gas_channel_sender|. It is the responsibility of this function's caller to ensure
    /// that these requests are handled. This can be done by passing the mpsc::Receiver into a
    /// GenericAccessService struct and ensuring its run method is scheduled.
    pub fn new(
        appearance: Appearance,
        stash: Stash,
        inspect: inspect::Node,
        gas_channel_sender: mpsc::Sender<LocalServiceRequest>,
        watch_peers_publisher: hanging_get::Publisher<HashMap<PeerId, Peer>>,
        watch_peers_registrar: hanging_get::SubscriptionRegistrar<PeerWatcher>,
        watch_hosts_publisher: hanging_get::Publisher<Vec<HostInfo>>,
        watch_hosts_registrar: hanging_get::SubscriptionRegistrar<sys::HostWatcherWatchResponder>,
    ) -> HostDispatcher {
        let hd = HostDispatcherState {
            active_id: None,
            host_devices: HashMap::new(),
            name: None,
            appearance,
            config_settings: build_config::load_default(),
            peers: HashMap::new(),
            gas_channel_sender,
            stash,
            discovery: DiscoveryState::NotDiscovering,
            discoverable: None,
            pairing_dispatcher: None,
            watch_peers_publisher,
            watch_peers_registrar,
            watch_hosts_publisher,
            watch_hosts_registrar,
            host_requests: Slab::new(),
            inspect: HostDispatcherInspect::new(inspect),
        };
        HostDispatcher { state: Arc::new(RwLock::new(hd)) }
    }
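
    // Caller-side wiring sketch for the contract described on `new` (hypothetical;
    // `run_gas` stands in for however the caller schedules the Generic Access
    // Service over the receiver end):
    //
    //     let (gas_sender, gas_receiver) = mpsc::channel(0);
    //     let dispatcher = HostDispatcher::new(appearance, stash, node, gas_sender, /* ... */);
    //     fasync::Task::spawn(run_gas(gas_receiver)).detach();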

    pub fn when_hosts_found(&self) -> impl Future<Output = HostDispatcher> {
        WhenHostsFound::new(self.clone())
    }

    pub fn get_name(&self) -> String {
        self.state.read().name.clone().unwrap_or(DEFAULT_DEVICE_NAME.to_string())
    }

    pub fn get_appearance(&self) -> Appearance {
        self.state.read().appearance
    }

    pub async fn set_name(&self, name: String, replace: NameReplace) -> types::Result<()> {
        if NameReplace::Keep == replace && self.state.read().name.is_some() {
            return Ok(());
        }
        self.state.write().name = Some(name);
        match self.active_host().await {
            Some(host) => {
                let name = self.get_name();
                host.set_name(name).await
            }
            None => Err(types::Error::no_host()),
        }
    }

    pub async fn set_device_class(&self, class: DeviceClass) -> types::Result<()> {
        let class_repr = class.debug();
        let res = match self.active_host().await {
            Some(host) => host.set_device_class(class).await,
            None => Err(types::Error::no_host()),
        };

        // Update Inspect state
        if res.is_ok() {
            self.state.read().inspect.device_class.set(&class_repr);
        }
        res
    }

    /// Set the active adapter for this HostDispatcher
    pub fn set_active_host(&self, host: HostId) -> types::Result<()> {
        self.state.write().set_active_host(host)
    }

    /// Used to set the pairing delegate. If there is a prior pairing delegate connected to the
    /// host, check whether the existing stored connection is closed:
    /// * if it is closed, overwrite it and succeed
    /// * if it is still active, fail
    /// If there is no prior delegate, this will always succeed.
    /// Returns Ok(()) if the delegate was set successfully, otherwise an error.
    pub fn set_pairing_delegate(
        &self,
        delegate: PairingDelegateProxy,
        input: InputCapability,
        output: OutputCapability,
    ) -> types::Result<()> {
        self.state.write().set_pairing_delegate(delegate, input, output)
    }
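
    // Usage sketch (hypothetical; `delegate` is a PairingDelegateProxy served
    // elsewhere):
    //
    //     dispatcher.set_pairing_delegate(
    //         delegate,
    //         InputCapability::Keyboard,
    //         OutputCapability::Display,
    //     )?;
    //     // A second call fails while the first delegate's channel is still open.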

    pub async fn apply_sys_settings(&self, new_settings: sys::Settings) -> build_config::Config {
        let (host_devices, new_config) = {
            let mut state = self.state.write();
            state.config_settings = state.config_settings.update_with_sys_settings(&new_settings);
            (state.host_devices.clone(), state.config_settings.clone())
        };
        for (host_id, device) in host_devices {
            let fut = device.apply_sys_settings(&new_settings);
            if let Err(e) = fut.await {
                warn!("Unable to apply new settings to host {}: {:?}", host_id, e);
                let failed_host_path = device.path();
                self.rm_device(failed_host_path).await;
            }
        }
        new_config
    }

    async fn discover_on_active_host(&self) -> types::Result<DiscoverySessionProxy> {
        match self.active_host().await {
            Some(host) => HostDevice::start_discovery(&host),
            None => Err(types::Error::no_host()),
        }
    }

    fn make_discovery_on_closed_task(&self, proxy: DiscoverySessionProxy) -> fasync::Task<()> {
        fasync::Task::spawn(self.clone().process_discovery_on_closed(proxy))
    }

    async fn process_discovery_on_closed(self, proxy: DiscoverySessionProxy) {
        // Wait for the DiscoverySession to close.
        let _ = proxy.on_closed().await;
        debug!(
            "process_discovery_on_closed: Discovery protocol closed (state: {:?})",
            self.state.read().discovery
        );

        let old_state =
            std::mem::replace(&mut self.state.write().discovery, DiscoveryState::NotDiscovering);
        match old_state {
            DiscoveryState::NotDiscovering | DiscoveryState::Pending { .. } => {
                warn!("process_discovery_on_closed: Unexpected discovery protocol close in state {:?}", old_state);
            }
            DiscoveryState::Discovering { discovery_stopped_sender, .. } => {
                let _ = discovery_stopped_sender.send(());
            }
            DiscoveryState::Stopping { discovery_stopped_sender, session_sender, .. } => {
                let _ = discovery_stopped_sender.send(());

                // Restart discovery if clients queued session requests while stopping.
                if let Some(sender) = session_sender {
                    // On error, the sender is dropped, signaling the receivers of the error.
                    if let Ok(session) = self.start_discovery().await {
                        let _ = sender.send(session.clone());
                    }
                }
            }
        };
        trace!("process_discovery_on_closed: completed");
    }

    fn make_start_discovery_task(
        &self,
        session: Arc<DiscoverySession>,
        started: fasync::Time,
    ) -> fasync::Task<()> {
        let hd = self.clone();
        fasync::Task::spawn(async move {
            debug!("start_discovery_task: waiting for discover_on_active_host");
            let Ok(discovery_proxy) = hd.discover_on_active_host().await else {
                // On failure (host init timeout), revert state to NotDiscovering and drop the
                // Pending state to notify receivers.
                hd.state.write().discovery = DiscoveryState::NotDiscovering;
                return;
            };
            debug!("start_discovery_task: started discovery successfully");
            let _ = hd.state.read().inspect.discovery_sessions.add(1);

            let discovery_on_closed_task =
                hd.make_discovery_on_closed_task(discovery_proxy.clone());

            // Replace the Pending state with the new session and send the session token to
            // waiters.
            let old_state =
                std::mem::replace(&mut hd.state.write().discovery, DiscoveryState::NotDiscovering);
            if let DiscoveryState::Pending {
                session_receiver: _,
                session_sender,
                discovery_stopped_receiver,
                discovery_stopped_sender,
                start_discovery_task: _,
            } = old_state
            {
                hd.state.write().discovery = DiscoveryState::Discovering {
                    session: Arc::downgrade(&session),
                    discovery_proxy,
                    started,
                    discovery_on_closed_task,
                    discovery_stopped_receiver,
                    discovery_stopped_sender,
                };
                let _ = session_sender.send(session.clone());
            }
            trace!("start_discovery_task: completed");
        })
    }

    pub async fn start_discovery(&self) -> types::Result<Arc<DiscoverySession>> {
        let session_fut = self.state.write().discovery.get_active_session_future();
        if let Some(session_fut) = session_fut {
            debug!(
                "start_discovery: awaiting DiscoverySession (state: {:?})",
                self.state.read().discovery
            );
            return session_fut
                .await
                .map_err(|_| format_err!("Pending discovery client channel closed").into());
        }

        let session = Arc::new(DiscoverySession { dispatcher_state: self.state.clone() });
        let started = fasync::Time::now();
        let (session_sender, session_receiver) = oneshot::channel();
        let session_receiver = session_receiver.shared();
        let (discovery_stopped_sender, discovery_stopped_receiver) = oneshot::channel();
        let discovery_stopped_receiver = discovery_stopped_receiver.shared();
        let start_discovery_task = self.make_start_discovery_task(session, started);

        // Immediately mark the state as pending so that other requests wait on this discovery
        // session's initialization.
        self.state.write().discovery = DiscoveryState::Pending {
            session_receiver: session_receiver.clone(),
            session_sender,
            discovery_stopped_receiver: discovery_stopped_receiver.clone(),
            discovery_stopped_sender,
            start_discovery_task,
        };

        debug!(
            "start_discovery: awaiting DiscoverySession for first client (state: {:?})",
            self.state.read().discovery
        );
        session_receiver
            .await
            .map_err(|_| format_err!("Pending discovery client channel closed").into())
    }
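
    // Concurrency sketch (hypothetical): callers racing to start discovery share
    // one underlying host session via the Pending state rather than starting two:
    //
    //     let (a, b) = futures::join!(dispatcher.start_discovery(), dispatcher.start_discovery());
    //     let (a, b) = (a?, b?);
    //     assert!(Arc::ptr_eq(&a, &b));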

    // TODO(https://fxbug.dev/42139629) - This is susceptible to the same TOCTOU race condition as
    // start_discovery. We can fix it with the same tri-state pattern used for discovery.
    pub async fn set_discoverable(&self) -> types::Result<Arc<HostDiscoverableSession>> {
        let strong_current_token =
            self.state.read().discoverable.as_ref().and_then(|token| token.upgrade());
        if let Some(token) = strong_current_token {
            return Ok(Arc::clone(&token));
        }

        match self.active_host().await {
            Some(host) => {
                let token = Arc::new(host.establish_discoverable_session().await?);
                self.state.write().discoverable = Some(Arc::downgrade(&token));
                Ok(token)
            }
            None => Err(types::Error::no_host()),
        }
    }

    fn stash(&self) -> Stash {
        self.state.read().stash.clone()
    }

    pub async fn forget(&self, peer_id: PeerId) -> types::Result<()> {
        // Try to delete from each host, even if it might not have the peer.
        // `peers` will be updated by the resulting disconnection(s).
        let hosts = self.get_all_adapters().await;
        if hosts.is_empty() {
            return Err(sys::Error::Failed.into());
        }
        let mut hosts_removed: u32 = 0;
        for host in hosts {
            let host_path = host.path();

            match host.forget(peer_id).await {
                Ok(()) => hosts_removed += 1,
                Err(types::Error::SysError(sys::Error::PeerNotFound)) => {
                    trace!("No peer {} on host {:?}; ignoring", peer_id, host_path);
                }
                err => {
                    error!("Could not forget peer {} on host {:?}", peer_id, host_path);
                    return err;
                }
            }
        }

        if self.stash().rm_peer(peer_id).await.is_err() {
            return Err(format_err!("Couldn't remove peer").into());
        }

        if hosts_removed == 0 {
            return Err(format_err!("No hosts had peer").into());
        }
        Ok(())
    }

    pub async fn connect(&self, peer_id: PeerId) -> types::Result<()> {
        let host = self.active_host().await;
        match host {
            Some(host) => host.connect(peer_id).await,
            None => Err(types::Error::SysError(sys::Error::Failed)),
        }
    }

    /// Instruct the active host to initiate a pairing procedure with the target peer. If it
    /// fails, we return the error we receive from the host.
    pub async fn pair(&self, id: PeerId, pairing_options: PairingOptions) -> types::Result<()> {
        let host = self.active_host().await;
        match host {
            Some(host) => host.pair(id, pairing_options.into()).await,
            None => Err(sys::Error::Failed.into()),
        }
    }

    // Attempt to disconnect peer with id `peer_id` from all transports.
    pub async fn disconnect(&self, peer_id: PeerId) -> types::Result<()> {
        let host = self.active_host().await;
        match host {
            Some(host) => host.disconnect(peer_id).await,
            None => Err(types::Error::no_host()),
        }
    }

    pub fn active_host(&self) -> impl Future<Output = Option<HostDevice>> {
        self.when_hosts_found().map(|adapter| {
            let mut wstate = adapter.state.write();
            wstate.get_active_host()
        })
    }

    pub async fn get_all_adapters(&self) -> Vec<HostDevice> {
        let _ = self.when_hosts_found().await;
        self.state.read().host_devices.values().cloned().collect()
    }

    #[cfg(test)]
    pub fn get_adapters(&self) -> Vec<HostInfo> {
        let hosts = self.state.read();
        hosts.host_devices.values().map(|host| host.info()).collect()
    }

    pub async fn request_host_service(self, chan: zx::Channel, service: HostService) {
        match self.active_host().await {
            Some(host) => {
                let host = host.proxy();
                match service {
                    HostService::LeCentral => {
                        let remote = ServerEnd::<CentralMarker>::new(chan.into());
                        let _ = host.request_protocol(ProtocolRequest::Central(remote));
                    }
                    HostService::LePeripheral => {
                        let remote = ServerEnd::<PeripheralMarker>::new(chan.into());
                        let _ = host.request_protocol(ProtocolRequest::Peripheral(remote));
                    }
                    HostService::LeGatt => {
                        let remote = ServerEnd::<Server_Marker>::new(chan.into());
                        let _ = host.request_protocol(ProtocolRequest::GattServer(remote));
                    }
                    HostService::LeGatt2 => {
                        let remote = ServerEnd::<Server_Marker2>::new(chan.into());
                        let _ = host.request_protocol(ProtocolRequest::Gatt2Server(remote));
                    }
                    HostService::Profile => {
                        let remote = ServerEnd::<ProfileMarker>::new(chan.into());
                        let _ = host.request_protocol(ProtocolRequest::Profile(remote));
                    }
                }
            }
            None => warn!("Failed to spawn, no active host"),
        }
    }

    // This is not an async method as we do not want to borrow `self` for the duration of the
    // async call, and we also want to trigger the send immediately even if the future is not yet
    // awaited.
    pub fn store_bond(&self, bond_data: BondingData) -> impl Future<Output = Result<(), Error>> {
        self.stash().store_bond(bond_data)
    }
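
    // Sketch of the pattern this enables (hypothetical caller): the send is
    // triggered eagerly, and the returned future can be awaited later without
    // borrowing the dispatcher:
    //
    //     let fut = dispatcher.store_bond(bond_data);
    //     fasync::Task::spawn(async move {
    //         if let Err(e) = fut.await {
    //             warn!("failed to store bond: {e:?}");
    //         }
    //     })
    //     .detach();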

    pub fn on_device_updated(&self, peer: Peer) -> impl Future<Output = ()> {
        let update_peer = peer.clone();

        let mut publisher = {
            let mut state = self.state.write();

            let node = state.inspect.peers().create_child(unique_name("peer_"));
            let peer = Inspectable::new(peer, node);
            let _drop_old_value = state.peers.insert(peer.id.clone(), peer);
            state.inspect.peer_count.set(state.peers.len() as u64);
            state.watch_peers_publisher.clone()
        };

        // Wait for the hanging get watcher to update so we can linearize updates
        async move {
            publisher
                .update(move |peers| {
                    let _ = peers.insert(update_peer.id, update_peer);
                    true
                })
                .await
                .expect("Fatal error: Peer Watcher HangingGet unreachable")
        }
    }

    pub fn on_device_removed(&self, id: PeerId) -> impl Future<Output = ()> {
        let mut publisher = {
            let mut state = self.state.write();
            if let Some(removed) = state.peers.remove(&id) {
                inspect_log!(state.inspect.evicted_peers, peer: removed);
            }
            state.inspect.peer_count.set(state.peers.len() as u64);
            state.watch_peers_publisher.clone()
        };

        // Wait for the hanging get watcher to update so we can linearize updates
        async move {
            publisher
                .update(move |peers| {
                    // Updated if we actually removed something.
                    peers.remove(&id).is_some()
                })
                .await
                .expect("Fatal error: Peer Watcher HangingGet unreachable")
        }
    }

    pub async fn watch_peers(&self) -> hanging_get::Subscriber<PeerWatcher> {
        let mut registrar = self.state.write().watch_peers_registrar.clone();
        registrar.new_subscriber().await.expect("Fatal error: Peer Watcher HangingGet unreachable")
    }

    pub async fn watch_hosts(&self) -> hanging_get::Subscriber<sys::HostWatcherWatchResponder> {
        let mut registrar = self.state.write().watch_hosts_registrar.clone();
        registrar.new_subscriber().await.expect("Fatal error: Host Watcher HangingGet unreachable")
    }

    async fn spawn_gas_proxy(&self, gatt_server_proxy: Server_Proxy) -> Result<(), Error> {
        let gas_channel = self.state.read().gas_channel_sender.clone();
        let gas_proxy =
            generic_access_service::GasProxy::new(gatt_server_proxy, gas_channel).await?;
        fasync::Task::spawn(gas_proxy.run().map(|r| {
            r.unwrap_or_else(|err| {
                warn!("Error passing message through Generic Access proxy: {:?}", err);
            })
        }))
        .detach();
        Ok(())
    }

    /// Commit all bootstrapped bonding identities to the system. This will update both the Stash
    /// and our in-memory store, and notify all hosts of the new bonding identities. If we already
    /// have bonding data for any of the peers (as identified by address), the new bootstrapped
    /// data will override it.
    pub async fn commit_bootstrap(&self, identities: Vec<Identity>) -> types::Result<()> {
        // Store all new bonds in our permanent Store. If we cannot successfully record the bonds
        // in the store, then Bootstrap.Commit() has failed.
        let mut stash = self.state.read().stash.clone();
        for identity in identities {
            stash.store_bonds(identity.bonds).await?
        }

        // Notify all current hosts of any changes to their bonding data.
        let host_devices: Vec<_> = self.state.read().host_devices.values().cloned().collect();

        for host in host_devices {
            // If we fail to restore bonds to a given host, that is not a failure on the part of
            // Bootstrap.Commit(), but a failure of the host. So do not return an error from this
            // function; instead log and continue.
            // TODO(https://fxbug.dev/42121837) - if a host fails we should close it and clean up after it
            if let Err(error) =
                try_restore_bonds(host.clone(), self.clone(), &host.public_address()).await
            {
                error!(
                    "Error restoring Bootstrapped bonds to host '{:?}': {}",
                    host.debug_identifiers(),
                    error
                )
            }
        }
        Ok(())
    }
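
    // Usage sketch (hypothetical): identities accumulated from the
    // fuchsia.bluetooth.sys Bootstrap protocol are committed in one call; bonds
    // for addresses we already know are overwritten:
    //
    //     dispatcher.commit_bootstrap(identities).await?;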

    /// Finishes initializing a host device by setting host configs and services.
    async fn add_host_device(&self, host_device: &HostDevice) -> Result<(), Error> {
        let dbg_ids = host_device.debug_identifiers();

        // TODO(https://fxbug.dev/42145442): Make sure that the bt-host device is left in a well-known state if
        // any of these operations fails.

        let address = host_device.public_address();
        assign_host_data(host_device.clone(), self.clone(), &address)
            .await
            .context(format!("{:?}: failed to assign identity to bt-host", dbg_ids))?;
        try_restore_bonds(host_device.clone(), self.clone(), &address)
            .await
            .map_err(|e| e.as_failure())?;

        let config = self.state.read().config_settings.clone();
        host_device
            .apply_config(config)
            .await
            .context(format!("{:?}: failed to configure bt-host device", dbg_ids))?;

        // Assign the name that is currently assigned to the HostDispatcher as the local name.
        let name = self.get_name();
        host_device
            .set_name(name)
            .await
            .map_err(|e| e.as_failure())
            .context(format!("{:?}: failed to set name of bt-host", dbg_ids))?;

        let (gatt_server_proxy, remote_gatt_server) = fidl::endpoints::create_proxy()?;
        host_device
            .proxy()
            .request_protocol(ProtocolRequest::Gatt2Server(remote_gatt_server))
            .context(format!("{:?}: failed to open gatt server for bt-host", dbg_ids))?;
        self.spawn_gas_proxy(gatt_server_proxy)
            .await
            .context(format!("{:?}: failed to spawn generic access service", dbg_ids))?;

        // Ensure the current active pairing delegate (if it exists) handles this host.
        self.handle_pairing_requests(host_device.clone());

        self.state.write().add_host(host_device.id(), host_device.clone());

        Ok(())
    }

    // Update our hanging_get server with the latest hosts. This will notify any pending
    // hanging_gets, and any new requests will see the new results.
    fn notify_host_watchers(&self) {
        self.state.write().notify_host_watchers();
    }

    pub async fn rm_device(&self, host_path: &str) {
        let mut new_adapter_activated = false;
        // Scope our HostDispatcherState lock.
        {
            let mut hd = self.state.write();
            let active_id = hd.active_id.clone();

            // Get the host IDs that match `host_path`.
            let ids: Vec<HostId> = hd
                .host_devices
                .iter()
                .filter(|(_, ref host)| host.path() == host_path)
                .map(|(k, _)| k.clone())
                .collect();

            let id_strs: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
            info!("Host removed: {} (path: {:?})", id_strs.join(","), host_path);

            for id in &ids {
                drop(hd.host_devices.remove(id));
            }

            // Reset the active ID if it got removed.
            if let Some(active_id) = active_id {
                if ids.contains(&active_id) {
                    hd.active_id = None;
                }
            }

            // Try to assign a new active adapter. This may send an "OnActiveAdapterChanged" event.
            if hd.active_id.is_none() && hd.get_active_id().is_some() {
                new_adapter_activated = true;
            }
        } // Now that the lock is dropped, we can run the async notify.

        if new_adapter_activated {
            if let Err(err) = self.configure_newly_active_adapter().await {
                warn!("Failed to persist state on adapter change: {:?}", err);
            }
        }
        self.notify_host_watchers();
    }

    /// Configure a newly active adapter with the correct behavior for an active adapter.
    async fn configure_newly_active_adapter(&self) -> types::Result<()> {
        // Migrate discovery state to the new host.
        let old_state =
            std::mem::replace(&mut self.state.write().discovery, DiscoveryState::NotDiscovering);
        match old_state {
            DiscoveryState::NotDiscovering => {}
            DiscoveryState::Pending {
                session_receiver,
                session_sender,
                discovery_stopped_receiver,
                discovery_stopped_sender,
                start_discovery_task,
            } => {
                info!("migrating Pending discovery to new host");
                // Stop the old discovery task, and restart discovery on the new host.
                drop(start_discovery_task);
                let session = Arc::new(DiscoverySession { dispatcher_state: self.state.clone() });
                let started = fasync::Time::now();
                let start_discovery_task = self.make_start_discovery_task(session, started);
                self.state.write().discovery = DiscoveryState::Pending {
                    session_receiver,
                    session_sender,
                    discovery_stopped_receiver,
                    discovery_stopped_sender,
                    start_discovery_task,
                };
            }
            DiscoveryState::Discovering {
                session,
                discovery_proxy,
                started,
                discovery_on_closed_task,
                discovery_stopped_receiver,
                discovery_stopped_sender,
            } => {
                info!("migrating Discovering discovery to new host");
                drop(discovery_on_closed_task);
                drop(discovery_proxy);

                let session = session.upgrade().ok_or(format_err!("failed to upgrade session"))?;

                // Restart discovery.
                let (session_sender, session_receiver) = oneshot::channel();
                let session_receiver = session_receiver.shared();
                let start_discovery_task = self.make_start_discovery_task(session, started);
                self.state.write().discovery = DiscoveryState::Pending {
                    session_receiver: session_receiver.clone(),
                    session_sender,
                    discovery_stopped_receiver,
                    discovery_stopped_sender,
                    start_discovery_task,
                };
                return session_receiver
                    .await
                    .map(|_| ())
                    .map_err(|_| format_err!("Pending discovery client channel closed").into());
            }
            DiscoveryState::Stopping {
                discovery_proxy,
                session_receiver: _,
                session_sender,
                discovery_on_closed_task,
                discovery_stopped_receiver: _,
                discovery_stopped_sender,
            } => {
                info!("migrating Stopping discovery to new host");
                drop(discovery_on_closed_task);
                drop(discovery_proxy);
                let _ = discovery_stopped_sender.send(());

                // Restart discovery if clients queued a session receiver while stopping.
                if let Some(sender) = session_sender {
                    // On error, the sender is dropped, signaling the receivers of the error.
                    if let Ok(session) = self.start_discovery().await {
                        let _ = sender.send(session.clone());
                    }
                }
            }
        }

        Ok(())
    }

    /// Route pairing requests from this host through our pairing dispatcher, if it exists.
    fn handle_pairing_requests(&self, host: HostDevice) {
        let mut dispatcher = self.state.write();

        if let Some(handle) = &mut dispatcher.pairing_dispatcher {
            handle.add_host(host.id(), host.proxy().clone());
        }
    }

    pub async fn add_host_component(&self, proxy: HostProxy) -> types::Result<()> {
        info!("Adding host component");

        let node = self.state.read().inspect.hosts().create_child(unique_name("device_"));

        let proxy_handle = proxy.as_channel().raw_handle().to_string();
        let host_device = init_host(proxy_handle.as_str(), node, proxy).await?;
        info!("Successfully started host device: {:?}", host_device.info());
        self.add_host_device(&host_device).await?;

        // Start listening to Host interface events.
        fasync::Task::spawn({
            let this = self.clone();
            async move {
                match host_device.watch_events(this.clone()).await {
                    Ok(()) => (),
                    Err(e) => {
                        warn!("Error handling host event: {e:?}");
                        let host_path = proxy_handle.as_str();
                        this.rm_device(host_path).await;
                    }
                }
            }
        })
        .detach();

        Ok(())
    }
}

async fn init_host(path: &str, node: inspect::Node, proxy: HostProxy) -> Result<HostDevice, Error> {
    node.record_string("path", path);

    // Obtain basic information and create an entry in the dispatcher's map.
    let host_info = proxy.watch_state().await.context("failed to obtain bt-host information")?;
    let host_info = Inspectable::new(HostInfo::try_from(host_info)?, node);

    Ok(HostDevice::new(path.to_string(), proxy, host_info))
}

impl HostListener for HostDispatcher {
    type PeerUpdatedFut = BoxFuture<'static, ()>;
    fn on_peer_updated(&mut self, peer: Peer) -> Self::PeerUpdatedFut {
        self.on_device_updated(peer).boxed()
    }

    type PeerRemovedFut = BoxFuture<'static, ()>;
    fn on_peer_removed(&mut self, id: PeerId) -> Self::PeerRemovedFut {
        self.on_device_removed(id).boxed()
    }

    type HostBondFut = BoxFuture<'static, Result<(), anyhow::Error>>;
    fn on_new_host_bond(&mut self, data: BondingData) -> Self::HostBondFut {
        self.store_bond(data).boxed()
    }

    type HostInfoFut = BoxFuture<'static, Result<(), anyhow::Error>>;
    fn on_host_updated(&mut self, _info: HostInfo) -> Self::HostInfoFut {
        self.notify_host_watchers();
        async { Ok(()) }.boxed()
    }
}

/// A future that completes when at least one adapter is available.
#[must_use = "futures do nothing unless polled"]
struct WhenHostsFound {
    hd: HostDispatcher,
    waker_key: Option<usize>,
}

impl WhenHostsFound {
    // Constructs a WhenHostsFound that completes at the latest after HOST_INIT_TIMEOUT seconds.
    fn new(hd: HostDispatcher) -> impl Future<Output = HostDispatcher> {
        WhenHostsFound { hd: hd.clone(), waker_key: None }.on_timeout(
            Duration::from_seconds(HOST_INIT_TIMEOUT).after_now(),
            move || {
                {
                    let mut inner = hd.state.write();
                    if inner.host_devices.is_empty() {
                        info!("No bt-host devices found");
                        inner.resolve_host_requests();
                    }
                }
                hd
            },
        )
    }

    fn remove_waker(&mut self) {
        if let Some(key) = self.waker_key {
            drop(self.hd.state.write().host_requests.remove(key));
        }
        self.waker_key = None;
    }
}

impl Drop for WhenHostsFound {
    fn drop(&mut self) {
        self.remove_waker()
    }
}

impl Unpin for WhenHostsFound {}

impl Future for WhenHostsFound {
    type Output = HostDispatcher;

    fn poll(mut self: ::std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.hd.state.read().host_devices.is_empty() {
            // No hosts yet: register this task's waker in the slab so that
            // resolve_host_requests() can wake us when a host arrives or the init
            // timer expires.
            let hd = self.hd.clone();
            if self.waker_key.is_none() {
                self.waker_key = Some(hd.state.write().host_requests.insert(cx.waker().clone()));
            }
            Poll::Pending
        } else {
            self.remove_waker();
            Poll::Ready(self.hd.clone())
        }
    }
}
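
// Wake-up path sketch for the waker-slab pattern above (names from this file):
//
//     WhenHostsFound::poll          -> host_requests.insert(waker)  // no hosts yet
//     HostDispatcherState::add_host -> resolve_host_requests()      // wakes all waiters
//     WhenHostsFound::poll          -> Poll::Ready(dispatcher)      // host now present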

async fn try_restore_bonds(
    host_device: HostDevice,
    hd: HostDispatcher,
    address: &Address,
) -> types::Result<()> {
    // Load bonding data that uses this host's `address` as its "local identity address".
    let opt_data = hd.stash().list_bonds(address.clone()).await?;
    let data = match opt_data {
        Some(data) => data,
        None => return Ok(()),
    };
    match host_device.restore_bonds(data).await {
        Err(e) => {
            error!("failed to restore bonding data for host: {:?}", e);
            Err(e)
        }
        Ok(errors) => {
            if errors.is_empty() {
                Ok(())
            } else {
                let msg =
                    errors.into_iter().fold("".to_string(), |acc, b| format!("{}, {:?}", acc, b));
                let msg = format!("failed to restore bonding data: {}", msg);
                error!("{}", msg);
                Err(anyhow!(msg).into())
            }
        }
    }
}

fn generate_irk() -> Result<sys::Key, zx::Status> {
    let mut buf: [u8; 16] = [0; 16];
    // Generate a cryptographically secure IRK using the kernel CPRNG.
    zx::cprng_draw(&mut buf);
    Ok(sys::Key { value: buf })
}

async fn assign_host_data(
    host: HostDevice,
    hd: HostDispatcher,
    address: &Address,
) -> Result<(), Error> {
    // Obtain an existing IRK, or generate a new one if one doesn't already exist for |address|.
    let data = match hd.stash().get_host_data(address.clone()).await? {
        Some(host_data) => {
            trace!("restored IRK");
            host_data.clone()
        }
        None => {
            // Generate a new IRK.
            trace!("generating new IRK");
            let new_data = HostData { irk: Some(generate_irk()?) };

            if let Err(e) = hd.stash().store_host_data(address.clone(), new_data.clone()).await {
                error!("failed to persist local IRK");
                return Err(e.into());
            }
            new_data
        }
    };
    host.set_local_data(data).map_err(|e| e.into())
}

#[cfg(test)]
pub(crate) mod test {
    use super::*;

    use fidl_fuchsia_bluetooth_gatt2::{
        LocalServiceProxy, Server_Request, Server_RequestStream as GattServerRequestStream,
    };
    use fidl_fuchsia_bluetooth_host::{HostRequest, HostRequestStream};
    use futures::{future::join, StreamExt};

    pub(crate) fn make_test_dispatcher(
        watch_peers_publisher: hanging_get::Publisher<HashMap<PeerId, Peer>>,
        watch_peers_registrar: hanging_get::SubscriptionRegistrar<PeerWatcher>,
        watch_hosts_publisher: hanging_get::Publisher<Vec<HostInfo>>,
        watch_hosts_registrar: hanging_get::SubscriptionRegistrar<sys::HostWatcherWatchResponder>,
    ) -> HostDispatcher {
        let (gas_channel_sender, _ignored_gas_task_req_stream) = mpsc::channel(0);
        HostDispatcher::new(
            Appearance::Display,
            Stash::in_memory_mock(),
            fuchsia_inspect::Node::default(),
            gas_channel_sender,
            watch_peers_publisher,
            watch_peers_registrar,
            watch_hosts_publisher,
            watch_hosts_registrar,
        )
    }

    pub(crate) fn make_simple_test_dispatcher() -> HostDispatcher {
        let watch_peers_broker = hanging_get::HangingGetBroker::new(
            HashMap::new(),
            |_, _| true,
            hanging_get::DEFAULT_CHANNEL_SIZE,
        );
        let watch_hosts_broker = hanging_get::HangingGetBroker::new(
            Vec::new(),
            |_, _| true,
            hanging_get::DEFAULT_CHANNEL_SIZE,
        );

        let dispatcher = make_test_dispatcher(
            watch_peers_broker.new_publisher(),
            watch_peers_broker.new_registrar(),
            watch_hosts_broker.new_publisher(),
            watch_hosts_broker.new_registrar(),
        );

        let watchers_fut = join(watch_peers_broker.run(), watch_hosts_broker.run()).map(|_| ());
        fasync::Task::spawn(watchers_fut).detach();
        dispatcher
    }

    #[derive(Default)]
    pub(crate) struct GasEndpoints {
        gatt_server: Option<GattServerRequestStream>,
        service: Option<LocalServiceProxy>,
    }

    async fn handle_standard_host_server_init(
        mut host_server: HostRequestStream,
    ) -> (HostRequestStream, GasEndpoints) {
        let mut gas_endpoints = GasEndpoints::default();
        while gas_endpoints.gatt_server.is_none() {
            match host_server.next().await {
                Some(Ok(HostRequest::SetLocalName { responder, .. })) => {
                    info!("Setting Local Name");
                    let _ = responder.send(Ok(()));
                }
                Some(Ok(HostRequest::SetDeviceClass { responder, .. })) => {
                    info!("Setting Device Class");
                    let _ = responder.send(Ok(()));
                }
                Some(Ok(HostRequest::RequestProtocol {
                    payload: ProtocolRequest::Gatt2Server(server),
                    ..
                })) => {
                    // Don't respond at all on the server side.
                    info!("Storing Gatt Server");
                    let mut gatt_server = server.into_stream().unwrap();
                    info!("GAS Server was started, waiting for publish");
                    // The Generic Access Service now publishes itself.
                    match gatt_server.next().await {
                        Some(Ok(Server_Request::PublishService { info, service, responder })) => {
                            info!("Captured publish of GAS Service: {:?}", info);
                            gas_endpoints.service = Some(service.into_proxy().unwrap());
                            let _ = responder.send(Ok(()));
                        }
                        x => error!("Got unexpected GAS Server request: {:?}", x),
                    }
                    gas_endpoints.gatt_server = Some(gatt_server);
                }
                Some(Ok(HostRequest::SetConnectable { responder, .. })) => {
                    info!("Setting connectable");
                    let _ = responder.send(Ok(()));
                }
                Some(Ok(req)) => info!("Unhandled Host Request in add: {:?}", req),
                Some(Err(e)) => error!("Error in host server: {:?}", e),
                None => break,
            }
        }
        info!("Finishing host_device mocking for add host");
        (host_server, gas_endpoints)
    }

    pub(crate) async fn create_and_add_test_host_to_dispatcher(
        id: HostId,
        dispatcher: &HostDispatcher,
    ) -> types::Result<(HostRequestStream, HostDevice, GasEndpoints)> {
        let (host_server, host_device) = HostDevice::mock_from_id(id);
        let host_server_init_handler = handle_standard_host_server_init(host_server);
        let (res, (host_server, gas_endpoints)) =
            join(dispatcher.add_host_device(&host_device), host_server_init_handler).await;
        res?;
        Ok((host_server, host_device, gas_endpoints))
    }
| } |