[bt][gap] Use async/await, hide HostDispatcher

* Update bt-gap to use Rust 2018's async/await syntax
* Wrap the HostDispatcher in an opaque type to hide the Arc<RwLock<>>
* Refactor services into smaller units of work
* NET-1536 #done

Test: compiles
Change-Id: Iebb3ea61e0aa289611313d45bfda6a3613ff4496
diff --git a/bin/bluetooth/bt-gap/src/host_device.rs b/bin/bluetooth/bt-gap/src/host_device.rs
index 72237f0..d067ddc 100644
--- a/bin/bluetooth/bt-gap/src/host_device.rs
+++ b/bin/bluetooth/bt-gap/src/host_device.rs
@@ -121,7 +121,7 @@
 }
 
 pub fn run(
-    hd: Arc<RwLock<HostDispatcher>>, host: Arc<RwLock<HostDevice>>,
+    hd: HostDispatcher, host: Arc<RwLock<HostDevice>>,
 ) -> impl Future<Output = Result<(), fidl::Error>> {
     make_clones!(host => host_stream, host);
     let stream = host_stream.read().host.take_event_stream();
@@ -133,7 +133,7 @@
             // TODO(NET-968): Add integration test for this.
             HostEvent::OnDeviceUpdated { mut device } => {
                 // TODO(NET-1297): generic method for this pattern
-                for listener in hd.read().event_listeners.iter() {
+                for listener in hd.event_listeners().iter() {
                     let _res = listener
                         .send_on_device_updated(&mut device)
                         .map_err(|e| fx_log_err!("Failed to send device updated event: {:?}", e));
@@ -141,7 +141,7 @@
             }
             // TODO(NET-1038): Add integration test for this.
             HostEvent::OnDeviceRemoved { identifier } => {
-                for listener in hd.read().event_listeners.iter() {
+                for listener in hd.event_listeners().iter() {
                     let _res = listener
                         .send_on_device_removed(&identifier)
                         .map_err(|e| fx_log_err!("Failed to send device removed event: {:?}", e));
@@ -150,7 +150,7 @@
             HostEvent::OnNewBondingData { mut data } => {
                 fx_log_info!("Received Bonding Data: {:#?}", data);
                 let id = host.read().get_info().identifier.clone();
-                if let Some(ref bond_events) = hd.read().bonding_events {
+                if let Some(ref bond_events) = hd.bonding_listener() {
                     let _res = bond_events
                         .send_on_new_bonding_data(id.as_str(), &mut data)
                         .map_err(|e| fx_log_err!("Failed to send device bonded event: {:?}", e));
diff --git a/bin/bluetooth/bt-gap/src/host_dispatcher.rs b/bin/bluetooth/bt-gap/src/host_dispatcher.rs
index f1fcf83..b1fe3ef 100644
--- a/bin/bluetooth/bt-gap/src/host_dispatcher.rs
+++ b/bin/bluetooth/bt-gap/src/host_dispatcher.rs
@@ -8,27 +8,26 @@
 use failure::Error;
 use fidl;
 use fidl::encoding::OutOfLine;
+use fidl::endpoints::ServerEnd;
 use fidl_fuchsia_bluetooth;
 use fidl_fuchsia_bluetooth_control::{
     AdapterInfo, BondingControlHandle, ControlControlHandle, PairingDelegateMarker,
-    PairingDelegateProxy,
+    PairingDelegateProxy, InputCapabilityType, OutputCapabilityType
 };
-use fidl_fuchsia_bluetooth_control::{InputCapabilityType, OutputCapabilityType};
+use fidl_fuchsia_bluetooth_bredr::ProfileMarker;
+use fidl_fuchsia_bluetooth_gatt::Server_Marker;
 use fidl_fuchsia_bluetooth_host::HostProxy;
-use fidl_fuchsia_bluetooth_le::CentralProxy;
-use fuchsia_async::{
-    self as fasync,
-    temp::Either::{Left, Right},
-    TimeoutExt,
-};
+use fidl_fuchsia_bluetooth_le::{CentralProxy, CentralMarker, PeripheralMarker};
+
 use fuchsia_bluetooth::{
     self as bt, bt_fidl_status, error::Error as BTError, util::clone_host_info,
 };
+use fuchsia_async::{self as fasync, TimeoutExt};
 use fuchsia_syslog::{fx_log, fx_log_err, fx_log_info, fx_log_warn};
 use fuchsia_vfs_watcher as vfs_watcher;
+use fuchsia_vfs_watcher::{WatchEvent, WatchMessage};
 use fuchsia_zircon as zx;
 use fuchsia_zircon::Duration;
-use futures::future;
 use futures::TryStreamExt;
 use futures::{task, Future, Poll, TryFutureExt};
 use parking_lot::RwLock;
@@ -45,6 +44,18 @@
 static BT_HOST_DIR: &'static str = "/dev/class/bt-host";
 static DEFAULT_NAME: &'static str = "fuchsia";
 
+/// Available FIDL services that can be provided by a particular Host
+pub enum HostService {
+    LeCentral,
+    LePeripheral,
+    LeGatt,
+    Profile
+}
+
+// We use tokens to track the reference counting for discovery/discoverable states
+// As long as at least one user maintains an Arc<> to the token, the state persists
+// Once all references are dropped, the `Drop` trait on the token causes the state
+// to be terminated.
 pub struct DiscoveryRequestToken {
     adap: Weak<RwLock<HostDevice>>,
 }
@@ -52,8 +63,7 @@
 impl Drop for DiscoveryRequestToken {
     fn drop(&mut self) {
         if let Some(host) = self.adap.upgrade() {
-            let host = host.write();
-            host.stop_discovery();
+            host.write().stop_discovery();
         }
     }
 }
@@ -71,7 +81,10 @@
     }
 }
 
-pub struct HostDispatcher {
+/// The HostDispatcher acts as a proxy aggregating multiple HostAdapters
+/// It appears as a Host to higher level systems, and is responsible for
+/// routing commands to the appropriate HostAdapter
+struct HostDispatcherState {
     host_devices: HashMap<String, Arc<RwLock<HostDevice>>>,
     active_id: Option<String>,
 
@@ -90,20 +103,23 @@
     host_requests: Slab<task::Waker>,
 }
 
-impl HostDispatcher {
-    pub fn new() -> HostDispatcher {
-        HostDispatcher {
-            active_id: None,
-            host_devices: HashMap::new(),
-            name: DEFAULT_NAME.to_string(),
-            input: InputCapabilityType::None,
-            output: OutputCapabilityType::None,
-            discovery: None,
-            discoverable: None,
-            pairing_delegate: None,
-            bonding_events: None,
-            event_listeners: vec![],
-            host_requests: Slab::new(),
+impl HostDispatcherState {
+    /// Set the active adapter for this HostDispatcher
+    pub fn set_active_adapter(&mut self, adapter_id: String) -> fidl_fuchsia_bluetooth::Status {
+        if let Some(ref id) = self.active_id {
+            if *id == adapter_id {
+                return bt_fidl_status!(Already, "Adapter already active");
+            }
+
+            // Shut down the previously active host.
+            let _ = self.host_devices[id].write().close();
+        }
+
+        if self.host_devices.contains_key(&adapter_id) {
+            self.set_active_id(Some(adapter_id));
+            bt_fidl_status!()
+        } else {
+            bt_fidl_status!(NotFound, "Attempting to activate an unknown adapter")
         }
     }
 
@@ -140,269 +156,32 @@
         self.pairing_delegate.clone()
     }
 
-    pub fn set_name(
-        hd: Arc<RwLock<HostDispatcher>>,
-        name: Option<String>,
-    ) -> impl Future<Output = fidl::Result<fidl_fuchsia_bluetooth::Status>> {
-        hd.write().name = match name {
-            Some(name) => name,
-            None => DEFAULT_NAME.to_string(),
-        };
-        HostDispatcher::get_active_adapter(hd.clone()).and_then(move |adapter| match adapter {
-            Some(adapter) => Left(adapter.write().set_name(hd.read().name.clone())),
-            None => Right(future::ready(Ok(bt_fidl_status!(
-                BluetoothNotAvailable,
-                "No Adapter found"
-            )))),
-        })
-    }
-
-    /// Return the active id. If the ID is current not set,
-    /// this fn will make the first ID in it's host_devices active
+    /// Return the active id. If the ID is currently not set,
+    /// it will make the first ID in it's host_devices active
     fn get_active_id(&mut self) -> Option<String> {
         match self.active_id {
-            None => {
-                let id = match self.host_devices.keys().next() {
-                    None => {
-                        return None;
-                    }
-                    Some(id) => id.clone(),
-                };
-                self.set_active_id(Some(id));
-                self.active_id.clone()
-            }
+            None => match self.host_devices.keys().next() {
+                None => None,
+                Some(id) => {
+                    self.set_active_id(Some(id.clone()));
+                    self.active_id.clone()
+                }
+            },
             ref id => id.clone(),
         }
     }
 
-    pub fn start_discovery(
-        hd: Arc<RwLock<HostDispatcher>>,
-    ) -> impl Future<
-        Output = fidl::Result<(
-            fidl_fuchsia_bluetooth::Status,
-            Option<Arc<DiscoveryRequestToken>>,
-        )>,
-    > {
-        let strong_current_token = match hd.read().discovery {
-            Some(ref token) => token.upgrade(),
-            None => None,
-        };
-        if let Some(token) = strong_current_token {
-            return Left(future::ready(Ok((
-                bt_fidl_status!(),
-                Some(Arc::clone(&token)),
-            ))));
-        }
-
-        Right(HostDispatcher::get_active_adapter(hd.clone()).and_then(
-            move |adapter| match adapter {
-                Some(adapter) => {
-                    let weak_adapter = Arc::downgrade(&adapter);
-                    Right(adapter.write().start_discovery().and_then(
-                        move |resp| match resp.error {
-                            Some(_) => future::ready(Ok((resp, None))),
-                            None => {
-                                let token = Arc::new(DiscoveryRequestToken { adap: weak_adapter });
-                                hd.write().discovery = Some(Arc::downgrade(&token));
-                                future::ready(Ok((resp, Some(token))))
-                            }
-                        },
-                    ))
-                }
-                None => Left(future::ready(Ok((
-                    bt_fidl_status!(BluetoothNotAvailable, "No Adapter found"),
-                    None,
-                )))),
-            },
-        ))
+    /// Return the active host. If the Host is currently not set,
+    /// it will make the first ID in it's host_devices active
+    fn get_active_host(&mut self) -> Option<Arc<RwLock<HostDevice>>> {
+        self.get_active_id()
+            .as_ref()
+            .and_then(|id| self.host_devices.get(id))
+            .map(|host| host.clone())
     }
 
-    pub fn set_discoverable(
-        hd: Arc<RwLock<HostDispatcher>>,
-    ) -> impl Future<
-        Output = fidl::Result<(
-            fidl_fuchsia_bluetooth::Status,
-            Option<Arc<DiscoverableRequestToken>>,
-        )>,
-    > {
-        let strong_current_token = match hd.read().discoverable {
-            Some(ref token) => token.upgrade(),
-            None => None,
-        };
-        if let Some(token) = strong_current_token {
-            return Left(future::ready(Ok((
-                bt_fidl_status!(),
-                Some(Arc::clone(&token)),
-            ))));
-        }
-
-        Right(HostDispatcher::get_active_adapter(hd.clone()).and_then(
-            move |adapter| match adapter {
-                Some(adapter) => {
-                    let weak_adapter = Arc::downgrade(&adapter);
-                    let res =
-                        adapter
-                            .write()
-                            .set_discoverable(true)
-                            .and_then(move |resp| match resp.error {
-                                Some(_) => future::ready(Ok((resp, None))),
-                                None => {
-                                    let token =
-                                        Arc::new(DiscoverableRequestToken { adap: weak_adapter });
-                                    hd.write().discoverable = Some(Arc::downgrade(&token));
-                                    future::ready(Ok((resp, Some(token))))
-                                }
-                            });
-                    Right(res)
-                }
-                None => Left(future::ready(Ok((
-                    bt_fidl_status!(BluetoothNotAvailable, "No Adapter found"),
-                    None,
-                )))),
-            },
-        ))
-    }
-
-    pub fn set_active_adapter(&mut self, adapter_id: String) -> fidl_fuchsia_bluetooth::Status {
-        if let Some(ref id) = self.active_id {
-            if *id == adapter_id {
-                return bt_fidl_status!(Already, "Adapter already active");
-            }
-
-            // Shut down the previously active host.
-            let _ = self.host_devices[id].write().close();
-        }
-
-        if self.host_devices.contains_key(&adapter_id) {
-            self.set_active_id(Some(adapter_id));
-            bt_fidl_status!()
-        } else {
-            bt_fidl_status!(NotFound, "Attempting to activate an unknown adapter")
-        }
-    }
-
-    pub fn get_active_adapter_info(&mut self) -> Option<AdapterInfo> {
-        match self.get_active_id() {
-            Some(ref id) => {
-                // Id must always be valid
-                let host = self.host_devices.get(id).unwrap().read();
-                Some(util::clone_host_info(host.get_info()))
-            }
-            None => None,
-        }
-    }
-
-    pub fn connect_le_central(
-        hd: Arc<RwLock<HostDispatcher>>,
-    ) -> impl Future<Output = fidl::Result<Option<CentralProxy>>> {
-        OnAdaptersFound::new(hd.clone()).and_then(|hd| {
-            let mut hd = hd.write();
-            future::ready(Ok(match hd.get_active_id() {
-                Some(ref id) => {
-                    let host = hd.host_devices.get(id).unwrap();
-                    Some(host.write().connect_le_central().unwrap())
-                }
-                None => None,
-            }))
-        })
-    }
-
-    pub fn connect(
-        hd: Arc<RwLock<HostDispatcher>>,
-        device_id: String,
-    ) -> impl Future<Output = fidl::Result<fidl_fuchsia_bluetooth::Status>> {
-        HostDispatcher::connect_le_central(hd.clone()).and_then(move |central| {
-            let (service_local, service_remote) = fidl::endpoints::create_proxy().unwrap();
-
-            let central = central.unwrap();
-            let connected = central.connect_peripheral(device_id.as_str(), service_remote);
-            connected.and_then(move |status| {
-                let host = hd.clone();
-                // TODO(NET-1092): We want this as a host.fidl API
-                HostDispatcher::get_active_adapter(host).and_then(move |adapter| match adapter {
-                    Some(adapter) => {
-                        adapter
-                            .write()
-                            .store_gatt(device_id, central, service_local);
-                        future::ready(Ok(status))
-                    }
-                    None => future::ready(Ok(bt_fidl_status!(
-                        BluetoothNotAvailable,
-                        "Adapter went away"
-                    ))),
-                })
-            })
-        })
-    }
-
-    pub fn forget(
-        hd: Arc<RwLock<HostDispatcher>>,
-        device_id: String,
-    ) -> impl Future<Output = fidl::Result<fidl_fuchsia_bluetooth::Status>> {
-        let id = device_id.clone();
-        HostDispatcher::get_active_adapter(hd.clone())
-            .and_then(move |adapter| {
-                future::ready(Ok(match adapter {
-                    Some(adapter) => {
-                        adapter.write().rm_gatt(device_id);
-                        bt_fidl_status!()
-                    }
-                    None => bt_fidl_status!(BluetoothNotAvailable, "Adapter went away"),
-                }))
-            }).and_then(move |status| {
-                let event = &hd.read().bonding_events;
-                future::ready(Ok(match event {
-                    Some(events) => {
-                        let _res = events.send_on_delete_bond(id.as_str()).map_err(|e| {
-                            fx_log_err!("Failed to send device updated event: {:?}", e)
-                        });
-
-                        status
-                    }
-                    None => bt_fidl_status!(BluetoothNotAvailable, "Adapter went away"),
-                }))
-            })
-    }
-    pub fn disconnect(
-        hd: Arc<RwLock<HostDispatcher>>,
-        device_id: String,
-    ) -> impl Future<Output = fidl::Result<fidl_fuchsia_bluetooth::Status>> {
-        HostDispatcher::get_active_adapter(hd).and_then(move |adapter| match adapter {
-            Some(adapter) => Right(adapter.write().rm_gatt(device_id)),
-            None => Left(future::ready(Ok(bt_fidl_status!(
-                BluetoothNotAvailable,
-                "Adapter went away"
-            )))),
-        })
-    }
-
-    pub fn get_active_adapter(
-        hd: Arc<RwLock<HostDispatcher>>,
-    ) -> impl Future<Output = fidl::Result<Option<Arc<RwLock<HostDevice>>>>> {
-        OnAdaptersFound::new(hd.clone()).and_then(|hd| {
-            let mut hd = hd.write();
-            future::ready(Ok(match hd.get_active_id() {
-                Some(ref id) => Some(hd.host_devices.get(id).unwrap().clone()),
-                None => None,
-            }))
-        })
-    }
-
-    pub fn get_adapters(
-        hd: &mut Arc<RwLock<HostDispatcher>>,
-    ) -> impl Future<Output = fidl::Result<Vec<AdapterInfo>>> {
-        OnAdaptersFound::new(hd.clone()).and_then(|hd| {
-            let mut result = vec![];
-            for host in hd.read().host_devices.values() {
-                let host = host.read();
-                result.push(util::clone_host_info(host.get_info()));
-            }
-            future::ready(Ok(result))
-        })
-    }
-
-    // Resolves all pending OnAdapterFuture's. Called when we leave the init period (by seeing the
-    // first host device or when the init timer expires).
+    /// Resolves all pending OnAdapterFuture's. Called when we leave the init period (by seeing the
+    /// first host device or when the init timer expires).
     fn resolve_host_requests(&mut self) {
         for waker in &self.host_requests {
             waker.1.wake();
@@ -413,7 +192,7 @@
         self.host_devices.insert(id, host);
     }
 
-    // Updates the active adapter and sends a FIDL event.
+    /// Updates the active adapter and sends a FIDL event.
     fn set_active_id(&mut self, id: Option<String>) {
         fx_log_info!("New active adapter: {:?}", id);
         self.active_id = id;
@@ -423,20 +202,288 @@
             }
         }
     }
+
+    pub fn get_active_adapter_info(&mut self) -> Option<AdapterInfo> {
+        self.get_active_host().map(|host| util::clone_host_info(host.read().get_info()))
+    }
+}
+
+#[derive(Clone)]
+pub struct HostDispatcher {
+    state: Arc<RwLock<HostDispatcherState>>,
+}
+
+impl HostDispatcher {
+    pub fn new() -> HostDispatcher {
+        let hd = HostDispatcherState {
+            active_id: None,
+            host_devices: HashMap::new(),
+            name: DEFAULT_NAME.to_string(),
+            input: InputCapabilityType::None,
+            output: OutputCapabilityType::None,
+            discovery: None,
+            discoverable: None,
+            pairing_delegate: None,
+            bonding_events: None,
+            event_listeners: vec![],
+            host_requests: Slab::new(),
+        };
+        HostDispatcher {
+            state: Arc::new(RwLock::new(hd)),
+        }
+    }
+
+    pub fn get_active_adapter_info(&mut self) -> Option<AdapterInfo> {
+        self.state.write().get_active_adapter_info()
+    }
+
+    pub async fn on_adapters_found(&self) -> fidl::Result<HostDispatcher> {
+        await!(OnAdaptersFound::new(self.clone()))
+    }
+
+    pub async fn set_name(
+        &mut self, name: Option<String>,
+    ) -> fidl::Result<fidl_fuchsia_bluetooth::Status> {
+        self.state.write().name = name.unwrap_or(DEFAULT_NAME.to_string());
+
+        match await!(self.get_active_adapter())? {
+            Some(adapter) => await!(adapter.write().set_name(self.state.read().name.clone())),
+            None => Ok(bt_fidl_status!(BluetoothNotAvailable, "No Adapter found")),
+        }
+    }
+
+    /// Set the active adapter for this HostDispatcher
+    pub fn set_active_adapter(&mut self, adapter_id: String) -> fidl_fuchsia_bluetooth::Status {
+        self.state.write().set_active_adapter(adapter_id)
+    }
+
+    pub fn set_pairing_delegate(&mut self, delegate: Option<PairingDelegateProxy>) -> bool {
+        self.state.write().set_pairing_delegate(delegate)
+    }
+
+    pub async fn start_discovery(
+        &mut self,
+    ) -> fidl::Result<(
+        fidl_fuchsia_bluetooth::Status,
+        Option<Arc<DiscoveryRequestToken>>,
+    )> {
+        let strong_current_token = self
+            .state
+            .read()
+            .discovery
+            .as_ref()
+            .and_then(|token| token.upgrade());
+        if let Some(token) = strong_current_token {
+            return Ok((bt_fidl_status!(), Some(Arc::clone(&token))));
+        }
+
+        match await!(self.get_active_adapter())? {
+            Some(adapter) => {
+                let weak_adapter = Arc::downgrade(&adapter);
+                let resp = await!(adapter.write().start_discovery())?;
+                match resp.error {
+                    Some(_) => Ok((resp, None)),
+                    None => {
+                        let token = Arc::new(DiscoveryRequestToken { adap: weak_adapter });
+                        self.state.write().discovery = Some(Arc::downgrade(&token));
+                        Ok((resp, Some(token)))
+                    }
+                }
+            }
+            None => Ok((
+                bt_fidl_status!(BluetoothNotAvailable, "No Adapter found"),
+                None,
+            )),
+        }
+    }
+
+    pub async fn set_discoverable(
+        &mut self,
+    ) -> fidl::Result<(
+        fidl_fuchsia_bluetooth::Status,
+        Option<Arc<DiscoverableRequestToken>>,
+    )> {
+        let strong_current_token = self
+            .state
+            .read()
+            .discoverable
+            .as_ref()
+            .and_then(|token| token.upgrade());
+        if let Some(token) = strong_current_token {
+            return Ok((bt_fidl_status!(), Some(Arc::clone(&token))));
+        }
+
+        match await!(self.get_active_adapter())? {
+            Some(adapter) => {
+                let weak_adapter = Arc::downgrade(&adapter);
+                let resp = await!(adapter.write().set_discoverable(true))?;
+                match resp.error {
+                    Some(_) => Ok((resp, None)),
+                    None => {
+                        let token = Arc::new(DiscoverableRequestToken { adap: weak_adapter });
+                        self.state.write().discoverable = Some(Arc::downgrade(&token));
+                        Ok((resp, Some(token)))
+                    }
+                }
+            }
+            None => Ok((
+                bt_fidl_status!(BluetoothNotAvailable, "No Adapter found"),
+                None,
+            )),
+        }
+    }
+
+    pub async fn connect_le_central(&mut self) -> fidl::Result<Option<CentralProxy>> {
+        let adapter = await!(self.on_adapters_found())?;
+        let mut adapter = adapter.state.write();
+        match adapter.get_active_host() {
+            Some(host) => host.write().connect_le_central().map(|central| Some(central)),
+            None => Ok(None),
+        }
+    }
+
+    pub async fn connect(
+        &mut self, device_id: String,
+    ) -> fidl::Result<fidl_fuchsia_bluetooth::Status> {
+        let central = await!(self.connect_le_central())?;
+        let central = match central {
+            Some(c) => c,
+            None => return Ok(bt_fidl_status!(BluetoothNotAvailable, "No Adapter found"))
+        };
+        let (service_local, service_remote) = fidl::endpoints::create_proxy().unwrap();
+        let connected = await!(central.connect_peripheral(device_id.as_str(), service_remote));
+        // TODO(NET-1092): We want this as a host.fidl API
+        match await!(self.get_active_adapter())? {
+            Some(adapter) => {
+                adapter
+                    .write()
+                    .store_gatt(device_id, central, service_local);
+                connected
+            }
+            None => Ok(bt_fidl_status!(BluetoothNotAvailable, "Adapter went away")),
+        }
+    }
+
+    pub async fn forget(
+        &mut self, device_id: String,
+    ) -> fidl::Result<fidl_fuchsia_bluetooth::Status> {
+        // TODO(NET-1148): Implement correctly
+        let id = device_id.clone();
+        let adapter = await!(self.get_active_adapter())?;
+        let status = match adapter {
+            Some(adapter) => {
+                adapter.write().rm_gatt(device_id);
+                bt_fidl_status!()
+            }
+            None => bt_fidl_status!(BluetoothNotAvailable, "Adapter went away"),
+        };
+        let event = &self.state.read().bonding_events;
+        match event {
+            Some(events) => {
+                let _res = events
+                    .send_on_delete_bond(id.as_str())
+                    .map_err(|e| fx_log_err!("Failed to send device updated event: {:?}", e));
+                Ok(status)
+            }
+            None => Ok(bt_fidl_status!(BluetoothNotAvailable, "Adapter went away")),
+        }
+    }
+
+    pub async fn disconnect(
+        &mut self, device_id: String,
+    ) -> fidl::Result<fidl_fuchsia_bluetooth::Status> {
+        let adapter = await!(self.get_active_adapter())?;
+        match adapter {
+            Some(adapter) => await!(adapter.write().rm_gatt(device_id)),
+            None => Ok(bt_fidl_status!(BluetoothNotAvailable, "Adapter went away")),
+        }
+    }
+
+    pub async fn get_active_adapter(&mut self) -> fidl::Result<Option<Arc<RwLock<HostDevice>>>> {
+        let adapter = await!(self.on_adapters_found())?;
+        let mut wstate = adapter.state.write();
+        Ok(wstate.get_active_host())
+    }
+
+    pub async fn get_adapters(&mut self) -> fidl::Result<Vec<AdapterInfo>> {
+        let _ = await!(self.on_adapters_found());
+        let mut result = vec![];
+        for host in self.state.read().host_devices.values() {
+            let host = host.read();
+            result.push(util::clone_host_info(host.get_info()));
+        }
+        Ok(result)
+    }
+
+    pub async fn request_host_service(mut self, chan: fasync::Channel, service: HostService) {
+        let adapter = await!(self.get_active_adapter());
+        match adapter {
+            Ok(Some(adapter)) => {
+                let adapter = adapter.read();
+                let host = adapter.get_host();
+                match service {
+                    HostService::LeCentral => {
+                        let remote = ServerEnd::<CentralMarker>::new(chan.into());
+                        let _ = host.request_low_energy_central(remote);
+                    }
+                    HostService::LePeripheral => {
+                        let remote = ServerEnd::<PeripheralMarker>::new(chan.into());
+                        let _ = host.request_low_energy_peripheral(remote);
+                    }
+                    HostService::LeGatt => {
+                        let remote = ServerEnd::<Server_Marker>::new(chan.into());
+                        let _ = host.request_gatt_server_(remote);
+                    }
+                    HostService::Profile => {
+                        let remote = ServerEnd::<ProfileMarker>::new(chan.into());
+                        let _ = host.request_profile(remote);
+                    }
+                }
+            },
+            Ok(None) => eprintln!("Failed to spawn, no active adapter"),
+            Err(e) => eprintln!("Failed to spawn, error resolving adapter {:?}", e),
+        }
+    }
+
+    pub fn set_io_capability(&mut self, input: InputCapabilityType, output: OutputCapabilityType) {
+        let mut state = self.state.write();
+        state.input = input;
+        state.output = output;
+    }
+
+    pub fn add_event_listener(&mut self, handle: ControlControlHandle) {
+        self.state.write().event_listeners.push(handle);
+    }
+
+    pub fn event_listeners(&self) -> Vec<ControlControlHandle> {
+        self.state.read().event_listeners.clone()
+    }
+
+    pub fn set_bonding_listener(&mut self, handle: Option<BondingControlHandle>) {
+        self.state.write().bonding_events = handle;
+    }
+
+    pub fn bonding_listener(&self) -> Option<BondingControlHandle> {
+        self.state.read().bonding_events.clone()
+    }
+
+    /// Returns the current pairing delegate proxy if it exists and has not been closed. Clears the
+    /// if the handle is closed.
+    pub fn pairing_delegate(&mut self) -> Option<PairingDelegateProxy> {
+        self.state.write().pairing_delegate()
+    }
 }
 
 /// A future that completes when at least one adapter is available.
 #[must_use = "futures do nothing unless polled"]
 struct OnAdaptersFound {
-    hd: Arc<RwLock<HostDispatcher>>,
+    hd: HostDispatcher,
     waker_key: Option<usize>,
 }
 
 impl OnAdaptersFound {
     // Constructs an OnAdaptersFound that completes at the latest after HOST_INIT_TIMEOUT seconds.
-    fn new(
-        hd: Arc<RwLock<HostDispatcher>>,
-    ) -> impl Future<Output = fidl::Result<Arc<RwLock<HostDispatcher>>>> {
+    fn new(hd: HostDispatcher) -> impl Future<Output = fidl::Result<HostDispatcher>> {
         OnAdaptersFound {
             hd: hd.clone(),
             waker_key: None,
@@ -444,10 +491,10 @@
             Duration::from_seconds(HOST_INIT_TIMEOUT).after_now(),
             move || {
                 {
-                    let mut hd = hd.write();
-                    if hd.host_devices.len() == 0 {
+                    let mut inner = hd.state.write();
+                    if inner.host_devices.len() == 0 {
                         fx_log_info!("No bt-host devices found");
-                        hd.resolve_host_requests();
+                        inner.resolve_host_requests();
                     }
                 }
                 Ok(hd)
@@ -457,7 +504,7 @@
 
     fn remove_waker(&mut self) {
         if let Some(key) = self.waker_key {
-            self.hd.write().host_requests.remove(key);
+            self.hd.state.write().host_requests.remove(key);
         }
         self.waker_key = None;
     }
@@ -472,13 +519,13 @@
 impl Unpin for OnAdaptersFound {}
 
 impl Future for OnAdaptersFound {
-    type Output = fidl::Result<Arc<RwLock<HostDispatcher>>>;
+    type Output = fidl::Result<HostDispatcher>;
 
     fn poll(mut self: ::std::pin::PinMut<Self>, ctx: &mut task::Context) -> Poll<Self::Output> {
-        if self.hd.read().host_devices.len() == 0 {
+        if self.hd.state.read().host_devices.len() == 0 {
             let hd = self.hd.clone();
             if self.waker_key.is_none() {
-                self.waker_key = Some(hd.write().host_requests.insert(ctx.waker().clone()));
+                self.waker_key = Some(hd.state.write().host_requests.insert(ctx.waker().clone()));
             }
             Poll::Pending
         } else {
@@ -490,7 +537,7 @@
 
 /// Adds an adapter to the host dispatcher. Called by the watch_hosts device
 /// watcher
-async fn add_adapter(hd: Arc<RwLock<HostDispatcher>>, host_path: PathBuf) -> Result<(), Error> {
+async fn add_adapter(hd: HostDispatcher, host_path: PathBuf) -> Result<(), Error> {
     fx_log_info!("Adding Adapter: {:?}", host_path);
 
     // Connect to the host device.
@@ -517,38 +564,34 @@
     let delegate_ptr = fidl::endpoints::ClientEnd::<PairingDelegateMarker>::new(delegate_remote);
     host_device
         .read()
-        .set_host_pairing_delegate(hd.read().input, hd.read().output, delegate_ptr);
+        .set_host_pairing_delegate(hd.state.read().input, hd.state.read().output, delegate_ptr);
     fasync::spawn(
         services::start_pairing_delegate(hd.clone(), delegate_local)
             .unwrap_or_else(|e| eprintln!("Failed to spawn {:?}", e)),
     );
-
     fx_log_info!("Host added: {:?}", host_device.read().get_info().identifier);
-    hd.write().add_host(id, host_device.clone());
+    hd.state.write().add_host(id, host_device.clone());
 
     // Notify Control interface clients about the new device.
     // TODO(armansito): This layering isn't quite right. It's better to do this in
     // HostDispatcher::add_host instead.
-    for listener in hd.read().event_listeners.iter() {
+    for listener in hd.state.read().event_listeners.iter() {
         let _res =
             listener.send_on_adapter_updated(&mut clone_host_info(host_device.read().get_info()));
     }
 
     // Resolve pending adapter futures.
-    hd.write().resolve_host_requests();
+    hd.state.write().resolve_host_requests();
 
     // Start listening to Host interface events.
     await!(host_device::run(hd.clone(), host_device.clone()))
         .map_err(|_| BTError::new("Host interface event stream error").into())
 }
 
-pub fn rm_adapter(
-    hd: Arc<RwLock<HostDispatcher>>,
-    host_path: PathBuf,
-) -> impl Future<Output = Result<(), Error>> {
+pub fn rm_adapter(hd: HostDispatcher, host_path: PathBuf) -> Result<(), Error> {
     fx_log_info!("Host removed: {:?}", host_path);
 
-    let mut hd = hd.write();
+    let mut hd = hd.state.write();
     let active_id = hd.active_id.clone();
 
     // Get the host IDs that match |host_path|.
@@ -574,40 +617,44 @@
         let _ = hd.get_active_id();
     }
 
-    future::ready(Ok(()))
+    Ok(())
 }
 
-pub fn watch_hosts(hd: Arc<RwLock<HostDispatcher>>) -> impl Future<Output = Result<(), Error>> {
+fn bluetooth_device_path(msg: &WatchMessage) -> PathBuf {
+    PathBuf::from(format!(
+        "{}/{}",
+        BT_HOST_DIR,
+        msg.filename.to_string_lossy()
+    ))
+}
+
+pub async fn watch_hosts(hd: HostDispatcher) -> Result<(), Error> {
     let dev = File::open(&BT_HOST_DIR);
     let watcher = vfs_watcher::Watcher::new(&dev.unwrap()).unwrap();
-    watcher
-        .try_for_each(move |msg| {
-            let path = PathBuf::from(format!(
-                "{}/{}",
-                BT_HOST_DIR,
-                msg.filename.to_string_lossy()
-            ));
-            match msg.event {
-                vfs_watcher::WatchEvent::EXISTING | vfs_watcher::WatchEvent::ADD_FILE => {
-                    fx_log_info!("Adding device from {:?}", path);
-                    Left(Left(add_adapter(hd.clone(), path).map_err(|e| {
-                        io::Error::new(io::ErrorKind::Other, e.to_string())
-                    })))
-                }
-                vfs_watcher::WatchEvent::REMOVE_FILE => {
-                    fx_log_info!("Removing device from {:?}", path);
-                    Left(Right(rm_adapter(hd.clone(), path).map_err(|e| {
-                        io::Error::new(io::ErrorKind::Other, e.to_string())
-                    })))
-                }
-                vfs_watcher::WatchEvent::IDLE => {
-                    fx_log_info!("HostDispatcher is IDLE");
-                    Right(future::ready(Ok(())))
-                }
-                e => {
-                    fx_log_warn!("Unrecognized host watch event: {:?}", e);
-                    Right(future::ready(Ok(())))
-                }
-            }
-        }).map_err(|e| e.into())
+    await!(watcher.try_for_each(|msg| handle_device(hd.clone(), msg))).map_err(|e| e.into())
+}
+
+pub async fn handle_device(hd: HostDispatcher, msg: WatchMessage) -> Result<(), io::Error> {
+    let path = bluetooth_device_path(&msg);
+    match msg.event {
+        WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
+            fx_log_info!("Adding device from {:?}", path);
+            await!(
+                add_adapter(hd, path)
+                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))
+            )
+        }
+        WatchEvent::REMOVE_FILE => {
+            fx_log_info!("Removing device from {:?}", path);
+            rm_adapter(hd, path).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))
+        }
+        WatchEvent::IDLE => {
+            fx_log_info!("HostDispatcher is IDLE");
+            Ok(())
+        }
+        e => {
+            fx_log_warn!("Unrecognized host watch event: {:?}", e);
+            Ok(())
+        }
+    }
 }
diff --git a/bin/bluetooth/bt-gap/src/main.rs b/bin/bluetooth/bt-gap/src/main.rs
index 02fda15..6bb71a5 100644
--- a/bin/bluetooth/bt-gap/src/main.rs
+++ b/bin/bluetooth/bt-gap/src/main.rs
@@ -3,22 +3,26 @@
 // found in the LICENSE file.
 
 #![deny(warnings)]
-#![feature(async_await, await_macro, futures_api, pin, arbitrary_self_types)]
+#![feature(
+    futures_api,
+    pin,
+    arbitrary_self_types,
+    await_macro,
+    async_await
+)]
 
 use failure::{Error, ResultExt};
-use fidl::endpoints::{ServerEnd, ServiceMarker};
-use fidl_fuchsia_bluetooth_bredr::{ProfileMarker};
+use fidl::endpoints::{ServiceMarker};
+use fidl_fuchsia_bluetooth_bredr::ProfileMarker;
 use fidl_fuchsia_bluetooth_control::BondingMarker;
 use fidl_fuchsia_bluetooth_control::ControlMarker;
 use fidl_fuchsia_bluetooth_gatt::Server_Marker;
 use fidl_fuchsia_bluetooth_le::{CentralMarker, PeripheralMarker};
 use fuchsia_app::server::ServicesServer;
 use fuchsia_async as fasync;
-use fuchsia_bluetooth::{make_clones, util};
+use fuchsia_bluetooth::util;
 use fuchsia_syslog::{self as syslog, fx_log, fx_log_info};
-use futures::{future, TryFutureExt};
-use parking_lot::RwLock;
-use std::sync::Arc;
+use futures::TryFutureExt;
 
 mod services;
 
@@ -32,82 +36,54 @@
     fx_log_info!("Starting bt-gap...");
 
     let mut executor = fasync::Executor::new().context("Error creating executor")?;
-    let hd = Arc::new(RwLock::new(HostDispatcher::new()));
 
-    make_clones!(hd => host_hd, control_hd, central_hd, peripheral_hd, gatt_hd, bonding_hd, profile_hd);
+    let hd = HostDispatcher::new();
 
-    let host_watcher = watch_hosts(host_hd);
+    let host_watcher = watch_hosts(hd.clone());
+
+    let central_hd = hd.clone();
+    let bonding_hd = hd.clone();
+    let control_hd = hd.clone();
+    let peripheral_hd = hd.clone();
+    let profile_hd = hd.clone();
+    let gatt_hd = hd.clone();
 
     let server = ServicesServer::new()
         .add_service((ControlMarker::NAME, move |chan: fasync::Channel| {
-            fx_log_info!("Spawning Control Service");
-            fasync::spawn(
-                services::start_control_service(control_hd.clone(), chan)
-                    .unwrap_or_else(|e| eprintln!("Failed to spawn {:?}", e)),
-            )
+            control_service(control_hd.clone(), chan)
         })).add_service((BondingMarker::NAME, move |chan: fasync::Channel| {
-            fx_log_info!("Spawning Bonding Service");
-            fasync::spawn(
-                services::start_bonding_service(bonding_hd.clone(), chan)
-                    .unwrap_or_else(|e| eprintln!("Failed to spawn {:?}", e)),
-            )
+            bonding_service(bonding_hd.clone(), chan)
         })).add_service((CentralMarker::NAME, move |chan: fasync::Channel| {
             fx_log_info!("Connecting CentralService to Adapter");
-            fasync::spawn(
-                HostDispatcher::get_active_adapter(central_hd.clone())
-                    .and_then(move |adapter| {
-                        let remote = ServerEnd::<CentralMarker>::new(chan.into());
-                        if let Some(adapter) = adapter {
-                            let _ = adapter.read().get_host().request_low_energy_central(remote);
-                        }
-                        future::ready(Ok(()))
-                    }).unwrap_or_else(|e| eprintln!("Failed to spawn {:?}", e)),
-            )
+            fasync::spawn(central_hd.clone().request_host_service(chan, HostService::LeCentral))
         })).add_service((PeripheralMarker::NAME, move |chan: fasync::Channel| {
             fx_log_info!("Connecting Peripheral Service to Adapter");
-            fasync::spawn(
-                HostDispatcher::get_active_adapter(peripheral_hd.clone())
-                    .and_then(move |adapter| {
-                        let remote = ServerEnd::<PeripheralMarker>::new(chan.into());
-                        if let Some(adapter) = adapter {
-                            let _ = adapter
-                                .read()
-                                .get_host()
-                                .request_low_energy_peripheral(remote);
-                        }
-                        future::ready(Ok(()))
-                    }).unwrap_or_else(|e| eprintln!("Failed to spawn {:?}", e)),
-            )
+            fasync::spawn(peripheral_hd.clone().request_host_service(chan, HostService::LePeripheral))
         })).add_service((ProfileMarker::NAME, move |chan: fasync::Channel| {
             fx_log_info!("Connecting Profile Service to Adapter");
-            fasync::spawn(
-                HostDispatcher::get_active_adapter(profile_hd.clone())
-                    .and_then(move |adapter| {
-                        let remote = ServerEnd::<ProfileMarker>::new(chan.into());
-                        if let Some(adapter) = adapter {
-                            let _ = adapter
-                                .read()
-                                .get_host()
-                                .request_profile(remote);
-                        }
-                        future::ready(Ok(()))
-                    }).unwrap_or_else(|e| eprintln!("Failed to spawn {:?}", e)),
-            )
+            fasync::spawn(profile_hd.clone().request_host_service(chan, HostService::Profile))
         })).add_service((Server_Marker::NAME, move |chan: fasync::Channel| {
             fx_log_info!("Connecting Gatt Service to Adapter");
-            fasync::spawn(
-                HostDispatcher::get_active_adapter(gatt_hd.clone())
-                    .and_then(move |adapter| {
-                        let remote = ServerEnd::<Server_Marker>::new(chan.into());
-                        if let Some(adapter) = adapter {
-                            let _ = adapter.read().get_host().request_gatt_server_(remote);
-                        }
-                        future::ready(Ok(()))
-                    }).unwrap_or_else(|e| eprintln!("Failed to spawn {:?}", e)),
-            )
+            fasync::spawn(gatt_hd.clone().request_host_service(chan, HostService::LeGatt))
         })).start()?;
 
     executor
         .run_singlethreaded(server.try_join(host_watcher))
         .map(|_| ())
 }
+
+fn control_service(hd: HostDispatcher, chan: fasync::Channel) {
+    fx_log_info!("Spawning Control Service");
+    fasync::spawn(
+        services::start_control_service(hd.clone(), chan)
+            .unwrap_or_else(|e| eprintln!("Failed to spawn {:?}", e)),
+    )
+}
+
+fn bonding_service(hd: HostDispatcher, chan: fasync::Channel) {
+    fx_log_info!("Spawning Bonding Service");
+    fasync::spawn(
+        services::start_bonding_service(hd.clone(), chan)
+            .unwrap_or_else(|e| eprintln!("Failed to spawn {:?}", e)),
+    )
+}
diff --git a/bin/bluetooth/bt-gap/src/services/bonding.rs b/bin/bluetooth/bt-gap/src/services/bonding.rs
index 5854bcf..79ce720 100644
--- a/bin/bluetooth/bt-gap/src/services/bonding.rs
+++ b/bin/bluetooth/bt-gap/src/services/bonding.rs
@@ -10,28 +10,24 @@
 use fuchsia_async;
 use fuchsia_bluetooth::bt_fidl_status;
 use fuchsia_syslog::{fx_log, fx_log_info};
-use futures::{Future, TryFutureExt, TryStreamExt};
-use parking_lot::RwLock;
-use std::sync::Arc;
+use futures::{TryFutureExt, TryStreamExt};
 
-pub fn start_bonding_service(
-    hd: Arc<RwLock<HostDispatcher>>, channel: fuchsia_async::Channel,
-) -> impl Future<Output = Result<(), fidl::Error>> {
+pub async fn start_bonding_service(
+    mut hd: HostDispatcher, channel: fuchsia_async::Channel,
+) -> fidl::Result<()> {
+
     let stream = BondingRequestStream::from_channel(channel);
-    let hd = hd.clone();
-    hd.write().bonding_events = Some(stream.control_handle());
-    stream.try_for_each(move |evt| {
-        let BondingRequest::AddBondedDevices {
-            local_id,
-            bonds,
-            responder,
-        } = evt;
-        fx_log_info!("Add Bonded devices for {:?}", local_id);
-        HostDispatcher::get_active_adapter(hd.clone()).map_ok(move |host_device| {
-            if let Some(ref host_device) = host_device {
-                host_device.read().restore_bonds(bonds);
-            }
-            responder.send(&mut bt_fidl_status!()).unwrap()
-        })
-    })
+    hd.set_bonding_listener(Some(stream.control_handle()));
+    await!(stream.try_for_each(|event| handler(hd.clone(), event)))
+}
+
+pub async fn handler(mut hd: HostDispatcher, event: BondingRequest) -> fidl::Result<()> {
+    let BondingRequest::AddBondedDevices { local_id, bonds, responder } = event;
+    fx_log_info!("Add Bonded devices for {:?}", local_id);
+    await!(hd.get_active_adapter().map_ok(move |host_device| {
+        if let Some(ref host_device) = host_device {
+            host_device.read().restore_bonds(bonds);
+        }
+        responder.send(&mut bt_fidl_status!()).unwrap()
+    }))
 }
diff --git a/bin/bluetooth/bt-gap/src/services/control.rs b/bin/bluetooth/bt-gap/src/services/control.rs
index b3daf38..0abeb5f 100644
--- a/bin/bluetooth/bt-gap/src/services/control.rs
+++ b/bin/bluetooth/bt-gap/src/services/control.rs
@@ -8,178 +8,89 @@
 use fidl::endpoints::RequestStream;
 use fidl_fuchsia_bluetooth;
 use fidl_fuchsia_bluetooth_control::{ControlRequest, ControlRequestStream};
-use fuchsia_async::{self as fasync,
-                    temp::Either::{Left, Right},
-                    unsafe_many_futures};
+use fuchsia_async as fasync;
 use fuchsia_bluetooth::bt_fidl_status;
 use futures::prelude::*;
-use futures::{future, Future, FutureExt};
-use parking_lot::RwLock;
-use std::sync::Arc;
-
-struct ControlServiceState {
-    host: Arc<RwLock<HostDispatcher>>,
-    discovery_token: Option<Arc<DiscoveryRequestToken>>,
-    discoverable_token: Option<Arc<DiscoverableRequestToken>>,
-}
 
 /// Build the ControlImpl to interact with fidl messages
 /// State is stored in the HostDispatcher object
-pub fn start_control_service(
-    hd: Arc<RwLock<HostDispatcher>>, chan: fasync::Channel,
-) -> impl Future<Output = Result<(), Error>> {
-    let state = Arc::new(RwLock::new(ControlServiceState {
-        host: hd,
-        discovery_token: None,
-        discoverable_token: None,
-    }));
-
-    let mystate = state.clone();
-    let wstate = mystate.write();
-    let mut hd = wstate.host.write();
-
+pub async fn start_control_service(mut hd: HostDispatcher, chan: fasync::Channel) -> Result<(), Error> {
     let stream = ControlRequestStream::from_channel(chan);
-    hd.event_listeners.push(stream.control_handle());
+    hd.add_event_listener(stream.control_handle());
+    await!(stream.try_for_each(move |event| handler(hd.clone(), event))).map_err(|e| e.into())
+}
 
-    // TODO(bwb): Remove and replace with async/await. Used to prevent
-    // deeply nested match Either arms
-    unsafe_many_futures!(Output, [A, B, C, D, E, F, G, H, I, J, K, L, M]);
-    stream
-        .try_for_each(move |evt| match evt {
-            ControlRequest::Connect {
-                device_id,
-                responder,
-            } => {
-                let host = state.write().host.clone();
-                let fut = HostDispatcher::connect(host, device_id)
-                    .and_then(move |mut status| future::ready(responder.send(&mut status)));
-                Output::A(fut)
-            }
-            ControlRequest::SetDiscoverable {
-                discoverable,
-                responder,
-            } => {
-                let fut = if discoverable {
-                    let stateref = state.clone();
-                    Left(
-                        HostDispatcher::set_discoverable(state.read().host.clone()).and_then(
-                            move |(mut resp, token)| {
-                                stateref.write().discoverable_token = token;
-                                future::ready(responder.send(&mut resp))
-                            },
-                        ),
-                    )
-                } else {
-                    state.write().discoverable_token = None;
-                    Right(future::ready(responder.send(&mut bt_fidl_status!())))
-                };
-                Output::C(fut)
-            }
-            ControlRequest::SetIoCapabilities {
-                input,
-                output,
-                control_handle: _,
-            } => {
-                let wstate = state.write();
-                let mut hd = wstate.host.write();
-                hd.input = input;
-                hd.output = output;
-                Output::E(future::ready(Ok(())))
-            }
-            ControlRequest::Forget {
-                device_id,
-                responder,
-            } => {
-                let host = state.write().host.clone();
-                let fut = HostDispatcher::forget(host, device_id)
-                    .and_then(move |mut status| future::ready(responder.send(&mut status)));
-                Output::L(fut)
-            }
-            ControlRequest::Disconnect {
-                device_id,
-                responder,
-            } => {
-                let host = state.write().host.clone();
-                // TODO work with classic as well
-                let fut = HostDispatcher::disconnect(host, device_id)
-                    .and_then(move |mut status| future::ready(responder.send(&mut status)));
-                Output::M(fut)
-            }
-            ControlRequest::GetKnownRemoteDevices { .. } => Output::K(future::ready(Ok(()))),
-            ControlRequest::IsBluetoothAvailable { responder } => {
-                let rstate = state.read();
-                let mut hd = rstate.host.write();
-                let is_available = hd.get_active_adapter_info().is_some();
-                let _ = responder.send(is_available);
-                Output::F(future::ready(Ok(())))
-            }
-            ControlRequest::SetPairingDelegate {
-                delegate,
-                responder,
-            } => {
-                let mut status = false;
-                let mut wstate = state.write();
-                if let Some(delegate) = delegate {
-                    if let Ok(proxy) = delegate.into_proxy() {
-                        status = wstate.host.write().set_pairing_delegate(Some(proxy));
-                    }
-                } else {
-                    status = wstate.host.write().set_pairing_delegate(None);
+async fn handler(
+    mut hd: HostDispatcher, event: ControlRequest,
+) -> Result<(), fidl::Error> {
+    match event {
+        ControlRequest::Connect { device_id, responder } => {
+            let mut status = await!(hd.connect(device_id))?;
+            responder.send(&mut status)
+        }
+        ControlRequest::SetDiscoverable { discoverable, responder } => {
+            let (mut resp, _) = if discoverable {
+                await!(hd.set_discoverable())?
+            } else {
+                (bt_fidl_status!(), None)
+            };
+            responder.send(&mut resp)
+        }
+        ControlRequest::SetIoCapabilities { input, output, control_handle: _ } => {
+            hd.set_io_capability(input, output); 
+            Ok(())
+        }
+        ControlRequest::Forget { device_id, responder } => {
+            let mut status = await!(hd.forget(device_id))?;
+            responder.send(&mut status)
+        }
+        ControlRequest::Disconnect { device_id, responder } => {
+            // TODO work with classic as well
+            let mut status = await!(hd.disconnect(device_id))?;
+            responder.send(&mut status)
+        }
+        ControlRequest::GetKnownRemoteDevices { .. } => Ok(()),
+        ControlRequest::IsBluetoothAvailable { responder } => {
+            let is_available = hd.get_active_adapter_info().is_some();
+            let _ = responder.send(is_available);
+            Ok(())
+        }
+        ControlRequest::SetPairingDelegate { delegate, responder } => {
+            let mut status = match delegate.map(|d| d.into_proxy()) {
+                Some(Ok(proxy)) => hd.set_pairing_delegate(Some(proxy)),
+                Some(Err(_ignored)) => return Ok(()), // TODO - should we return this error?
+                None => hd.set_pairing_delegate(None)
+            };
+            let _ = responder.send(status);
+            Ok(())
+        }
+        ControlRequest::GetAdapters { responder } => {
+            let mut resp = await!(hd.get_adapters())?;
+            responder.send(Some(&mut resp.iter_mut()))
+        }
+        ControlRequest::SetActiveAdapter { identifier, responder } => {
+            let mut success = hd.set_active_adapter(identifier.clone());
+            let _ = responder.send(&mut success);
+            Ok(())
+        }
+        ControlRequest::GetActiveAdapterInfo { responder } => {
+            let mut adap = hd.get_active_adapter_info();
+            let _ = responder.send(adap.as_mut().map(OutOfLine));
+            Ok(())
+        }
+        ControlRequest::RequestDiscovery { discovery, responder } => {
+            if discovery {
+                if let Ok((mut resp, _)) = await!(hd.start_discovery()) {
+                    let _ = responder.send(&mut resp);
                 }
-                let _ = responder.send(status);
-                Output::D(future::ready(Ok(())))
+                Ok(())
+            } else {
+                responder.send(&mut bt_fidl_status!())
             }
-            ControlRequest::GetAdapters { responder } => {
-                let wstate = state.write();
-                let mut hd = wstate.host.clone();
-                let fut = HostDispatcher::get_adapters(&mut hd)
-                    .map_ok(move |mut resp| responder.send(Some(&mut resp.iter_mut())))
-                    .map(|_| Ok(()));
-                Output::H(fut)
-            }
-            ControlRequest::SetActiveAdapter {
-                identifier,
-                responder,
-            } => {
-                let wstate = state.write();
-                let mut success = wstate.host.write().set_active_adapter(identifier.clone());
-                let _ = responder.send(&mut success);
-                Output::I(future::ready(Ok(())))
-            }
-            ControlRequest::GetActiveAdapterInfo { responder } => {
-                let wstate = state.write();
-                let mut hd = wstate.host.write();
-                let mut adap = hd.get_active_adapter_info();
-
-                let _ = responder.send(adap.as_mut().map(OutOfLine));
-                Output::J(future::ready(Ok(())))
-            }
-            ControlRequest::RequestDiscovery {
-                discovery,
-                responder,
-            } => {
-                let fut = if discovery {
-                    let stateref = state.clone();
-                    Left(
-                        HostDispatcher::start_discovery(state.read().host.clone())
-                            .map_ok(move |(mut resp, token)| {
-                                stateref.write().discovery_token = token;
-                                responder.send(&mut resp)
-                            }).map(|_| Ok(())),
-                    )
-                } else {
-                    state.write().discovery_token = None;
-                    Right(future::ready(responder.send(&mut bt_fidl_status!())))
-                };
-                Output::G(fut)
-            }
-            ControlRequest::SetName { name, responder } => {
-                let wstate = state.write();
-                Output::B(
-                    HostDispatcher::set_name(wstate.host.clone(), name)
-                        .and_then(move |mut resp| future::ready(Ok(responder.send(&mut resp))))
-                        .map(|_| Ok(())),
-                )
-            }
-        }).map(|_| Ok(()))
+        }
+        ControlRequest::SetName { name, responder } => {
+            let mut resp = await!(hd.set_name(name))?;
+            responder.send(&mut resp)
+        }
+    }
 }
diff --git a/bin/bluetooth/bt-gap/src/services/pairing_delegate.rs b/bin/bluetooth/bt-gap/src/services/pairing_delegate.rs
index da728a2..9b57fdb 100644
--- a/bin/bluetooth/bt-gap/src/services/pairing_delegate.rs
+++ b/bin/bluetooth/bt-gap/src/services/pairing_delegate.rs
@@ -5,74 +5,77 @@
 use crate::host_dispatcher::HostDispatcher;
 use fidl;
 use fidl::endpoints::RequestStream;
-use fidl_fuchsia_bluetooth_control::{PairingDelegateRequest, PairingDelegateRequestStream};
-use fuchsia_async::{self as fasync,
-                    temp::Either::{Left, Right}};
+use fidl_fuchsia_bluetooth_control::*;
+use fuchsia_async as fasync;
 use fuchsia_syslog::{fx_log, fx_log_warn};
-use futures::future;
-use futures::{Future, TryFutureExt, TryStreamExt};
-use parking_lot::RwLock;
-use std::sync::Arc;
+use futures::{Future, TryStreamExt};
 
-// Number of concurrent requests allowed to the pairing delegate
-// at a single time
-const MAX_CONCURRENT: usize = 100;
+// Number of concurrent requests allowed to the pairing delegate at a single time
+const MAX_CONCURRENT_REQUESTS: usize = 100;
+
+async fn handler(
+    pd: Option<PairingDelegateProxy>, event: PairingDelegateRequest,
+) -> fidl::Result<()> {
+    match pd {
+        Some(pd) => match event {
+            PairingDelegateRequest::OnPairingRequest { device, method, displayed_passkey, responder } =>
+                await!(handle_pairing_request( pd, device, method, displayed_passkey, responder)),
+            PairingDelegateRequest::OnPairingComplete { device_id, status, control_handle: _ } =>
+                handle_pairing_complete(pd, device_id, status),
+            PairingDelegateRequest::OnRemoteKeypress { device_id, keypress, control_handle: _, } =>
+                handle_remote_keypress(pd, device_id, keypress),
+        },
+        None => match event {
+            PairingDelegateRequest::OnPairingRequest { device: _, method: _, displayed_passkey: _, responder } => {
+                fx_log_warn!("Rejected pairing due to no upstream pairing delegate");
+                let _ = responder.send(false, None);
+                Ok(())
+            }
+            PairingDelegateRequest::OnPairingComplete { device_id, .. } => {
+                fx_log_warn!("Unhandled OnPairingComplete for device '{:?}': No PairingDelegate", device_id);
+                Ok(())
+            },
+            PairingDelegateRequest::OnRemoteKeypress { device_id, .. } => {
+                fx_log_warn!("Unhandled OnRemoteKeypress for device '{:?}': No PairingDelegate", device_id);
+                Ok(())
+            },
+        },
+    }
+}
+
+async fn handle_pairing_request(
+    pd: PairingDelegateProxy, mut device: RemoteDevice, method: PairingMethod,
+    displayed_passkey: Option<String>, responder: PairingDelegateOnPairingRequestResponder,
+) -> fidl::Result<()> {
+    let passkey_ref = displayed_passkey.as_ref().map(|x| &**x);
+    let (status, passkey) = await!(pd.on_pairing_request(&mut device, method, passkey_ref))?;
+    let _ = responder.send(status, passkey.as_ref().map(String::as_str));
+    Ok(())
+}
+
+fn handle_pairing_complete(
+    pd: PairingDelegateProxy, device_id: String, mut status: fidl_fuchsia_bluetooth::Status,
+) -> fidl::Result<()> {
+    if pd.on_pairing_complete(device_id.as_str(), &mut status).is_err() {
+        fx_log_warn!("Failed to propagate pairing cancelled upstream");
+    };
+    Ok(())
+}
+
+fn handle_remote_keypress(
+    pd: PairingDelegateProxy, device_id: String, keypress: PairingKeypressType,
+) -> fidl::Result<()> {
+    if pd.on_remote_keypress(device_id.as_str(), keypress).is_err() {
+        fx_log_warn!("Failed to propagate pairing cancelled upstream");
+    };
+    Ok(())
+}
 
 pub fn start_pairing_delegate(
-    hd: Arc<RwLock<HostDispatcher>>, channel: fasync::Channel,
-) -> impl Future<Output = Result<(), fidl::Error>> {
+    mut hd: HostDispatcher, channel: fasync::Channel,
+) -> impl Future<Output = fidl::Result<()>> {
     let stream = PairingDelegateRequestStream::from_channel(channel);
-    let hd = hd.clone();
-    stream.try_for_each_concurrent(MAX_CONCURRENT, move |evt| match evt {
-        PairingDelegateRequest::OnPairingRequest {
-            mut device,
-            method,
-            displayed_passkey,
-            responder,
-        } => {
-            let pd = hd.write().pairing_delegate();
-            let passkey_ref = displayed_passkey.as_ref().map(|x| &**x);
-            Left(Left(
-                match pd {
-                    Some(pd) => Left(pd.on_pairing_request(&mut device, method, passkey_ref)),
-                    None => {
-                        fx_log_warn!("Rejected pairing due to no upstream pairing delegate");
-                        Right(future::ready(Ok((false, None))))
-                    }
-                }.and_then(move |(status, passkey)| {
-                    future::ready(Ok(
-                        responder.send(status, passkey.as_ref().map(String::as_str))
-                    ))
-                }).map_ok(|_| ()),
-            ))
-        }
-        PairingDelegateRequest::OnPairingComplete {
-            device_id,
-            mut status,
-            control_handle: _,
-        } => {
-            let pd = hd.read().pairing_delegate.clone();
-            if let Some(pd) = pd {
-                let res = pd.on_pairing_complete(device_id.as_str(), &mut status);
-                if res.is_err() {
-                    fx_log_warn!("Failed to propagate pairing cancelled upstream");
-                }
-            }
-            Left(Right(future::ready(Ok(()))))
-        }
-        PairingDelegateRequest::OnRemoteKeypress {
-            device_id,
-            keypress,
-            control_handle: _,
-        } => {
-            let pd = hd.read().pairing_delegate.clone();
-            if let Some(pd) = pd {
-                let res = pd.on_remote_keypress(device_id.as_str(), keypress);
-                if res.is_err() {
-                    fx_log_warn!("Failed to propagate pairing cancelled upstream");
-                }
-            }
-            Right(future::ready(Ok(())))
-        }
+    stream.try_for_each_concurrent(MAX_CONCURRENT_REQUESTS, move |event| {
+        handler(hd.pairing_delegate(), event)
     })
 }