[bt][host][fidl] Migrate to hanging-get style peer observation

- Introduced Host.WatchPeers which mimicks sys.Access.WatchPeers in its
signature and semantics.

- Removed the Host.ListPeers method and the  Host.OnDeviceUpdated and
Host.OnDeviceRemoved events in favor of Host.WatchPeers.

- Migrated all existing integration tests against bt-host to the new
API.

- Migrated existing expectations from control.RemoteDevice over to
sys.Peer.

- Migrated bt-gap to use the new API.

Bug: 35008
Test: bt-host-unittests --gtest_filter=FIDL_HostServerTest.*
      bt-integration-tests
Change-Id: Ib58c6399e6542bfededb1d27bac47663537e8b6d
diff --git a/src/connectivity/bluetooth/core/bt-gap/src/host_device.rs b/src/connectivity/bluetooth/core/bt-gap/src/host_device.rs
index 097b617..9e3c81b 100644
--- a/src/connectivity/bluetooth/core/bt-gap/src/host_device.rs
+++ b/src/connectivity/bluetooth/core/bt-gap/src/host_device.rs
@@ -3,6 +3,7 @@
 // found in the LICENSE file.
 
 use {
+    anyhow::format_err,
     fidl::endpoints::ClientEnd,
     fidl_fuchsia_bluetooth::DeviceClass,
     fidl_fuchsia_bluetooth::PeerId as FidlPeerId,
@@ -18,10 +19,10 @@
         types::{BondingData, HostInfo, Peer, PeerId},
     },
     fuchsia_syslog::{fx_log_err, fx_log_info},
-    futures::{future, Future, FutureExt, StreamExt},
+    futures::{Future, FutureExt, StreamExt},
     parking_lot::RwLock,
     pin_utils::pin_mut,
-    std::{collections::HashMap, convert::TryInto, path::PathBuf, str::FromStr, sync::Arc},
+    std::{collections::HashMap, convert::TryInto, path::PathBuf, sync::Arc},
 };
 
 use crate::types::{self, from_fidl_status, Error};
@@ -161,15 +162,24 @@
     Ok(())
 }
 
-pub async fn watch_events<H: HostListener>(
+/// Monitors updates from a bt-host device and notifies `listener`. The returned Future represents
+/// a task that never ends in successful operation and only returns in case of a failure to
+/// communicate with the bt-host device.
+pub async fn watch_events<H: HostListener + Clone>(
     listener: H,
     host: Arc<RwLock<HostDevice>>,
 ) -> types::Result<()> {
-    let handle_fidl = handle_fidl_events(listener, host.clone());
+    let handle_fidl = handle_fidl_events(listener.clone(), host.clone());
+    let watch_peers = watch_peers(listener, host.clone());
     let watch_state = watch_state(host);
     pin_mut!(handle_fidl);
+    pin_mut!(watch_peers);
     pin_mut!(watch_state);
-    future::select(handle_fidl, watch_state).await.factor_first().0
+    futures::select! {
+        res1 = handle_fidl.fuse() => res1,
+        res2 = watch_peers.fuse() => res2,
+        res3 = watch_state.fuse() => res3,
+    }
 }
 
 async fn handle_fidl_events<H: HostListener>(
@@ -177,15 +187,8 @@
     host: Arc<RwLock<HostDevice>>,
 ) -> types::Result<()> {
     let mut stream = host.read().host.take_event_stream();
-
     while let Some(event) = stream.next().await {
         match event? {
-            // TODO(613): Add integration test for this.
-            HostEvent::OnDeviceUpdated { device } => listener.on_peer_updated(device.try_into()?),
-            // TODO(814): Add integration test for this.
-            HostEvent::OnDeviceRemoved { identifier } => {
-                listener.on_peer_removed(PeerId::from_str(&identifier)?)
-            }
             HostEvent::OnNewBondingData { data } => {
                 fx_log_info!("Received bonding data");
                 let data: BondingData = match data.try_into() {
@@ -201,7 +204,23 @@
             }
         };
     }
-    Ok(())
+    Err(types::Error::InternalError(format_err!("Host FIDL event stream terminated")))
+}
+
+async fn watch_peers<H: HostListener + Clone>(
+    mut listener: H,
+    host: Arc<RwLock<HostDevice>>,
+) -> types::Result<()> {
+    let proxy = host.read().host.clone();
+    loop {
+        let (updated, removed) = proxy.watch_peers().await?;
+        for peer in updated.into_iter() {
+            listener.on_peer_updated(peer.try_into()?);
+        }
+        for id in removed.into_iter() {
+            listener.on_peer_removed(id.into());
+        }
+    }
 }
 
 async fn watch_state(host: Arc<RwLock<HostDevice>>) -> types::Result<()> {
diff --git a/src/connectivity/bluetooth/core/bt-gap/src/host_dispatcher.rs b/src/connectivity/bluetooth/core/bt-gap/src/host_dispatcher.rs
index 6218a6d..bce8f9b 100644
--- a/src/connectivity/bluetooth/core/bt-gap/src/host_dispatcher.rs
+++ b/src/connectivity/bluetooth/core/bt-gap/src/host_dispatcher.rs
@@ -684,11 +684,8 @@
             .map_err(|e| e.as_failure())?;
 
         // Assign the name that is currently assigned to the HostDispatcher as the local name.
-        host_device
-            .read()
-            .set_name(self.state.read().name.clone())
-            .await
-            .map_err(|e| e.as_failure())?;
+        let fut = host_device.read().set_name(self.state.read().name.clone());
+        fut.await.map_err(|e| e.as_failure())?;
 
         let (gatt_server_proxy, remote_gatt_server) = fidl::endpoints::create_proxy()?;
         host_device.read().get_host().request_gatt_server_(remote_gatt_server)?;
@@ -698,11 +695,8 @@
         host_device.read().enable_privacy(true).map_err(|e| e.as_failure())?;
 
         // TODO(845): Only the active host should be made connectable and scanning in the background.
-        host_device
-            .read()
-            .set_connectable(true)
-            .await
-            .map_err(|_| format_err!("failed to set connectable"))?;
+        let fut = host_device.read().set_connectable(true);
+        fut.await.map_err(|_| format_err!("failed to set connectable"))?;
         host_device
             .read()
             .enable_background_scan(true)
@@ -719,6 +713,8 @@
         fasync::spawn(host_device::watch_events(self.clone(), host_device.clone()).map(|r| {
             r.unwrap_or_else(|err| {
                 fx_log_warn!("Error handling host event: {:?}", err);
+                // TODO(fxb/44180): This should probably remove the bt-host since termination of the
+                // `watch_events` task indicates that it no longer functions properly.
             })
         }));
 
diff --git a/src/connectivity/bluetooth/core/bt-host/fidl/helpers.cc b/src/connectivity/bluetooth/core/bt-host/fidl/helpers.cc
index 8dc4734..ecfc8a39 100644
--- a/src/connectivity/bluetooth/core/bt-host/fidl/helpers.cc
+++ b/src/connectivity/bluetooth/core/bt-host/fidl/helpers.cc
@@ -31,16 +31,32 @@
 namespace fidl_helpers {
 namespace {
 
-// TODO(36378): Add an AddressToFidl helper with a backend that can be used for both HostInfo and
-// Peer types.
-fbt::Address ControllerAddressToFidl(const bt::DeviceAddressBytes& input) {
+fbt::AddressType AddressTypeToFidl(bt::DeviceAddress::Type type) {
+  switch (type) {
+    case bt::DeviceAddress::Type::kBREDR:
+      [[fallthrough]];
+    case bt::DeviceAddress::Type::kLEPublic:
+      return fbt::AddressType::PUBLIC;
+    case bt::DeviceAddress::Type::kLERandom:
+      [[fallthrough]];
+    case bt::DeviceAddress::Type::kLEAnonymous:
+      return fbt::AddressType::RANDOM;
+  }
+  return fbt::AddressType::PUBLIC;
+}
+
+fbt::Address AddressToFidl(fbt::AddressType type, const bt::DeviceAddressBytes& value) {
   fbt::Address output;
-  output.type = fbt::AddressType::PUBLIC;
+  output.type = type;
   bt::MutableBufferView value_dst(output.bytes.data(), output.bytes.size());
-  value_dst.Write(input.bytes());
+  value_dst.Write(value.bytes());
   return output;
 }
 
+fbt::Address AddressToFidl(const bt::DeviceAddress& input) {
+  return AddressToFidl(AddressTypeToFidl(input.type()), input.value());
+}
+
 fctrl::TechnologyType TechnologyTypeToFidlDeprecated(bt::gap::TechnologyType type) {
   switch (type) {
     case bt::gap::TechnologyType::kLowEnergy:
@@ -136,6 +152,12 @@
   return result;
 }
 
+fbt::DeviceClass DeviceClassToFidl(bt::DeviceClass input) {
+  auto bytes = input.bytes();
+  fbt::DeviceClass output{static_cast<uint32_t>(bytes[0] | (bytes[1] << 8) | (bytes[2] << 16))};
+  return output;
+}
+
 }  // namespace
 
 std::optional<bt::PeerId> PeerIdFromString(const std::string& id) {
@@ -332,13 +354,47 @@
   fsys::HostInfo info;
   info.set_id(fbt::Id{adapter.identifier().value()});
   info.set_technology(TechnologyTypeToFidl(adapter.state().type()));
-  info.set_address(ControllerAddressToFidl(adapter.state().controller_address()));
+  info.set_address(AddressToFidl(fbt::AddressType::PUBLIC, adapter.state().controller_address()));
   info.set_local_name(adapter.state().local_name());
   info.set_discoverable(adapter.IsDiscoverable());
   info.set_discovering(adapter.IsDiscovering());
   return info;
 }
 
+fsys::Peer PeerToFidl(const bt::gap::Peer& peer) {
+  fsys::Peer output;
+  output.set_id(fbt::PeerId{peer.identifier().value()});
+  output.set_address(AddressToFidl(peer.address()));
+  output.set_technology(TechnologyTypeToFidl(peer.technology()));
+  output.set_connected(peer.connected());
+  output.set_bonded(peer.bonded());
+
+  if (peer.name()) {
+    output.set_name(*peer.name());
+  }
+
+  bt::gap::AdvertisingData adv;
+  if (peer.le() && bt::gap::AdvertisingData::FromBytes(peer.le()->advertising_data(), &adv)) {
+    if (adv.appearance()) {
+      output.set_appearance(static_cast<fbt::Appearance>(le16toh(*adv.appearance())));
+    }
+    if (adv.tx_power()) {
+      output.set_tx_power(*adv.tx_power());
+    }
+  }
+  if (peer.bredr() && peer.bredr()->device_class()) {
+    output.set_device_class(DeviceClassToFidl(*peer.bredr()->device_class()));
+  }
+  if (peer.rssi() != bt::hci::kRSSIInvalid) {
+    output.set_rssi(peer.rssi());
+  }
+
+  // TODO(fxb/37485): Populate service UUIDs based on GATT and SDP results as well as advertising
+  // and inquiry data.
+
+  return output;
+}
+
 fctrl::BondingData NewBondingData(const bt::gap::Adapter& adapter, const bt::gap::Peer& peer) {
   fctrl::BondingData out_data;
   out_data.identifier = peer.identifier().ToString();
diff --git a/src/connectivity/bluetooth/core/bt-host/fidl/helpers.h b/src/connectivity/bluetooth/core/bt-host/fidl/helpers.h
index 9b0aadc..a2cb298 100644
--- a/src/connectivity/bluetooth/core/bt-host/fidl/helpers.h
+++ b/src/connectivity/bluetooth/core/bt-host/fidl/helpers.h
@@ -85,6 +85,7 @@
 // fuchsia.bluetooth.sys library helpers.
 fuchsia::bluetooth::sys::TechnologyType TechnologyTypeToFidl(bt::gap::TechnologyType type);
 fuchsia::bluetooth::sys::HostInfo HostInfoToFidl(const bt::gap::Adapter& adapter);
+fuchsia::bluetooth::sys::Peer PeerToFidl(const bt::gap::Peer& peer);
 
 // Functions to convert Control FIDL library objects.
 bt::sm::PairingData PairingDataFromFidl(const fuchsia::bluetooth::control::LEData& data);
diff --git a/src/connectivity/bluetooth/core/bt-host/fidl/helpers_unittest.cc b/src/connectivity/bluetooth/core/bt-host/fidl/helpers_unittest.cc
index 2a58e75..3deb1e9 100644
--- a/src/connectivity/bluetooth/core/bt-host/fidl/helpers_unittest.cc
+++ b/src/connectivity/bluetooth/core/bt-host/fidl/helpers_unittest.cc
@@ -190,6 +190,71 @@
   EXPECT_EQ(std::nullopt, SecurityLevelFromFidl(level));
 }
 
+TEST(FidlHelpersTest, PeerToFidlMandatoryFields) {
+  // Required by PeerCache expiry functions.
+  async::TestLoop dispatcher;
+
+  bt::gap::PeerCache cache;
+  bt::DeviceAddress addr(bt::DeviceAddress::Type::kLEPublic, {0, 1, 2, 3, 4, 5});
+  auto* peer = cache.NewPeer(addr, /*connectable=*/true);
+  auto fidl = PeerToFidl(*peer);
+  ASSERT_TRUE(fidl.has_id());
+  EXPECT_EQ(peer->identifier().value(), fidl.id().value);
+  ASSERT_TRUE(fidl.has_address());
+  EXPECT_TRUE(
+      fidl::Equals(fbt::Address{fbt::AddressType::PUBLIC, {{0, 1, 2, 3, 4, 5}}}, fidl.address()));
+  ASSERT_TRUE(fidl.has_technology());
+  EXPECT_EQ(fsys::TechnologyType::LOW_ENERGY, fidl.technology());
+  ASSERT_TRUE(fidl.has_connected());
+  EXPECT_FALSE(fidl.connected());
+  ASSERT_TRUE(fidl.has_bonded());
+  EXPECT_FALSE(fidl.bonded());
+
+  EXPECT_FALSE(fidl.has_name());
+  EXPECT_FALSE(fidl.has_appearance());
+  EXPECT_FALSE(fidl.has_rssi());
+  EXPECT_FALSE(fidl.has_tx_power());
+  EXPECT_FALSE(fidl.has_device_class());
+  EXPECT_FALSE(fidl.has_services());
+}
+
+TEST(FidlHelpersTest, PeerToFidlOptionalFields) {
+  // Required by PeerCache expiry functions.
+  async::TestLoop dispatcher;
+
+  const int8_t kRssi = 5;
+  const int8_t kTxPower = 6;
+  const auto kAdv =
+      bt::CreateStaticByteBuffer(0x02, 0x01, 0x01,               // Flags: General Discoverable
+                                 0x03, 0x19, 192, 0,             // Appearance: Watch
+                                 0x02, 0x0A, 0x06,               // Tx-Power: 5
+                                 0x05, 0x09, 't', 'e', 's', 't'  // Complete Local Name: "test"
+      );
+
+  bt::gap::PeerCache cache;
+  bt::DeviceAddress addr(bt::DeviceAddress::Type::kLEPublic, {0, 1, 2, 3, 4, 5});
+  auto* peer = cache.NewPeer(addr, /*connectable=*/true);
+  peer->MutLe().SetAdvertisingData(kRssi, kAdv);
+  peer->MutBrEdr().SetInquiryData(bt::hci::InquiryResult{
+      bt::DeviceAddressBytes{{0, 1, 2, 3, 4, 5}}, bt::hci::PageScanRepetitionMode::kR0, 0, 0,
+      bt::DeviceClass(bt::DeviceClass::MajorClass::kPeripheral), 0});
+
+  auto fidl = PeerToFidl(*peer);
+  ASSERT_TRUE(fidl.has_name());
+  EXPECT_EQ("test", fidl.name());
+  ASSERT_TRUE(fidl.has_appearance());
+  EXPECT_EQ(fbt::Appearance::WATCH, fidl.appearance());
+  ASSERT_TRUE(fidl.has_rssi());
+  EXPECT_EQ(kRssi, fidl.rssi());
+  ASSERT_TRUE(fidl.has_tx_power());
+  EXPECT_EQ(kTxPower, fidl.tx_power());
+  ASSERT_TRUE(fidl.has_device_class());
+  EXPECT_EQ(fbt::MAJOR_DEVICE_CLASS_PERIPHERAL, fidl.device_class().value);
+
+  // TODO(fxb/37485) Add a test when this field gets populated.
+  EXPECT_FALSE(fidl.has_services());
+}
+
 }  // namespace
 }  // namespace fidl_helpers
 }  // namespace bthost
diff --git a/src/connectivity/bluetooth/core/bt-host/fidl/host_server.cc b/src/connectivity/bluetooth/core/bt-host/fidl/host_server.cc
index 0105028..52568e9 100644
--- a/src/connectivity/bluetooth/core/bt-host/fidl/host_server.cc
+++ b/src/connectivity/bluetooth/core/bt-host/fidl/host_server.cc
@@ -30,6 +30,8 @@
 
 namespace bthost {
 
+namespace fbt = fuchsia::bluetooth;
+
 using bt::PeerId;
 using bt::sm::IOCapability;
 using fidl_helpers::AddressBytesFromString;
@@ -46,6 +48,49 @@
 using fuchsia::bluetooth::control::RemoteDevice;
 using fuchsia::bluetooth::control::TechnologyType;
 
+std::pair<PeerTracker::Updated, PeerTracker::Removed> PeerTracker::ToFidl(
+    const bt::gap::PeerCache* peer_cache) {
+  PeerTracker::Updated updated_fidl;
+  for (auto& id : updated_) {
+    auto* peer = peer_cache->FindById(id);
+
+    // All ids in |updated_| are assumed to be valid as they would otherwise be in |removed_|.
+    ZX_ASSERT(peer);
+
+    updated_fidl.push_back(fidl_helpers::PeerToFidl(*peer));
+  }
+
+  PeerTracker::Removed removed_fidl;
+  for (auto& id : removed_) {
+    removed_fidl.push_back(fbt::PeerId{id.value()});
+  }
+
+  return std::make_pair(std::move(updated_fidl), std::move(removed_fidl));
+}
+
+void PeerTracker::Update(bt::PeerId id) {
+  updated_.insert(id);
+  removed_.erase(id);
+}
+
+void PeerTracker::Remove(bt::PeerId id) {
+  updated_.erase(id);
+  removed_.insert(id);
+}
+
+WatchPeersGetter::WatchPeersGetter(bt::gap::PeerCache* peer_cache) : peer_cache_(peer_cache) {
+  ZX_DEBUG_ASSERT(peer_cache_);
+}
+
+void WatchPeersGetter::Notify(std::queue<Callback> callbacks, PeerTracker peers) {
+  auto [updated, removed] = peers.ToFidl(peer_cache_);
+  while (!callbacks.empty()) {
+    auto f = std::move(callbacks.front());
+    callbacks.pop();
+    f(fidl::Clone(updated), fidl::Clone(removed));
+  }
+}
+
 HostServer::HostServer(zx::channel channel, fxl::WeakPtr<bt::gap::Adapter> adapter,
                        fbl::RefPtr<GattHost> gatt_host)
     : AdapterServerBase(adapter, this, std::move(channel)),
@@ -54,6 +99,7 @@
       requesting_discovery_(false),
       requesting_discoverable_(false),
       io_capability_(IOCapability::kNoInputNoOutput),
+      watch_peers_getter_(adapter->peer_cache()),
       weak_ptr_factory_(this) {
   ZX_DEBUG_ASSERT(gatt_host_);
 
@@ -81,6 +127,9 @@
 
   // Initialize the HostInfo getter with the initial state.
   NotifyInfoChange();
+
+  // Initialize the peer watcher with all known connectable peers that are in the cache.
+  adapter->peer_cache()->ForEach([this](const bt::gap::Peer& peer) { OnPeerUpdated(peer); });
 }
 
 HostServer::~HostServer() { Close(); }
@@ -99,14 +148,8 @@
   }
 }
 
-void HostServer::ListDevices(ListDevicesCallback callback) {
-  std::vector<RemoteDevice> fidl_devices;
-  adapter()->peer_cache()->ForEach([&fidl_devices](const bt::gap::Peer& p) {
-    if (p.connectable()) {
-      fidl_devices.push_back(fidl_helpers::NewRemoteDevice(p));
-    }
-  });
-  callback(std::vector<RemoteDevice>(std::move(fidl_devices)));
+void HostServer::WatchPeers(WatchPeersCallback callback) {
+  watch_peers_getter_.Watch(std::move(callback));
 }
 
 // TODO(35008): Add a unit test for this method.
@@ -127,8 +170,7 @@
 }
 
 // TODO(35008): Add a unit test for this method.
-void HostServer::SetDeviceClass(fuchsia::bluetooth::DeviceClass device_class,
-                                SetDeviceClassCallback callback) {
+void HostServer::SetDeviceClass(fbt::DeviceClass device_class, SetDeviceClassCallback callback) {
   // Device Class values must only contain data in the lower 3 bytes.
   if (device_class.value >= 1 << 24) {
     callback(NewFidlError(ErrorCode::INVALID_ARGUMENTS, "Can't Set Device Class"));
@@ -169,9 +211,8 @@
         }
 
         // Set up a general-discovery filter for connectable devices.
-        // NOTE(armansito): This currently has no effect since OnDeviceUpdated
-        // events are generated based on PeerCache events. |session|'s
-        // "result callback" is unused.
+        // NOTE(armansito): This currently has no effect since peer updates
+        // are driven by PeerCache events. |session|'s "result callback" is unused.
         session->filter()->set_connectable(true);
         session->filter()->SetGeneralDiscoveryFlags();
 
@@ -787,19 +828,19 @@
     return;
   }
 
-  auto fidl_device = fidl_helpers::NewRemoteDevicePtr(peer);
-  if (!fidl_device) {
-    bt_log(TRACE, "bt-host", "ignoring malformed peer update");
-    return;
-  }
-
-  this->binding()->events().OnDeviceUpdated(std::move(*fidl_device));
+  watch_peers_getter_.Transform([id = peer.identifier()](auto tracker) {
+    tracker.Update(id);
+    return tracker;
+  });
 }
 
-void HostServer::OnPeerRemoved(bt::PeerId identifier) {
+void HostServer::OnPeerRemoved(bt::PeerId id) {
   // TODO(armansito): Notify only if the peer is connectable for symmetry with
   // OnPeerUpdated?
-  this->binding()->events().OnDeviceRemoved(identifier.ToString());
+  watch_peers_getter_.Transform([id](auto tracker) {
+    tracker.Remove(id);
+    return tracker;
+  });
 }
 
 void HostServer::ResetPairingDelegate() {
diff --git a/src/connectivity/bluetooth/core/bt-host/fidl/host_server.h b/src/connectivity/bluetooth/core/bt-host/fidl/host_server.h
index b8ce0e7..77097b1 100644
--- a/src/connectivity/bluetooth/core/bt-host/fidl/host_server.h
+++ b/src/connectivity/bluetooth/core/bt-host/fidl/host_server.h
@@ -11,6 +11,7 @@
 
 #include <memory>
 #include <unordered_map>
+#include <unordered_set>
 
 #include <fbl/macros.h>
 
@@ -32,6 +33,42 @@
 
 class GattHost;
 
+// Custom hanging getter for the `WatchPeers()` method. Here we keep track of each `updated` and
+// `removed` notification per PeerId so that the hanging get contains no duplicates and removed
+// entries aren't reflected in `updated`.
+class PeerTracker {
+ public:
+  using Updated = std::vector<fuchsia::bluetooth::sys::Peer>;
+  using Removed = std::vector<fuchsia::bluetooth::PeerId>;
+
+  PeerTracker() = default;
+  PeerTracker(PeerTracker&&) = default;
+  PeerTracker& operator=(PeerTracker&&) = default;
+
+  // Returns parameters that can be used in a WatchPeers() response.
+  std::pair<Updated, Removed> ToFidl(const bt::gap::PeerCache* peer_cache);
+
+  void Update(bt::PeerId id);
+  void Remove(bt::PeerId id);
+
+ private:
+  std::unordered_set<bt::PeerId> updated_;
+  std::unordered_set<bt::PeerId> removed_;
+};
+
+class WatchPeersGetter
+    : public bt_lib_fidl::HangingGetterBase<PeerTracker,
+                                            void(PeerTracker::Updated, PeerTracker::Removed)> {
+ public:
+  explicit WatchPeersGetter(bt::gap::PeerCache* peer_cache);
+
+ protected:
+  void Notify(std::queue<Callback> callbacks, PeerTracker peers) override;
+
+ private:
+  bt::gap::PeerCache* peer_cache_;  // weak
+};
+
 // Implements the Host FIDL interface. Owns all FIDL connections that have been
 // opened through it.
 class HostServer : public AdapterServerBase<fuchsia::bluetooth::host::Host>,
@@ -44,7 +81,7 @@
   // ::fuchsia::bluetooth::host::Host overrides:
   void WatchState(WatchStateCallback callback) override;
   void SetLocalData(::fuchsia::bluetooth::control::HostData host_data) override;
-  void ListDevices(ListDevicesCallback callback) override;
+  void WatchPeers(WatchPeersCallback callback) override;
   void AddBondedDevices(::std::vector<fuchsia::bluetooth::control::BondingData> bonds,
                         AddBondedDevicesCallback callback) override;
   void SetLocalName(::std::string local_name, SetLocalNameCallback callback) override;
@@ -162,6 +199,9 @@
   // Used to drive the WatchState() method.
   bt_lib_fidl::HangingGetter<fuchsia::bluetooth::sys::HostInfo> info_getter_;
 
+  // Used to drive the WatchPeers() method.
+  WatchPeersGetter watch_peers_getter_;
+
   // Keep this as the last member to make sure that all weak pointers are
   // invalidated before other members get destroyed.
   fxl::WeakPtrFactory<HostServer> weak_ptr_factory_;
diff --git a/src/connectivity/bluetooth/core/bt-host/fidl/host_server_unittest.cc b/src/connectivity/bluetooth/core/bt-host/fidl/host_server_unittest.cc
index 3957291..56bbb7b 100644
--- a/src/connectivity/bluetooth/core/bt-host/fidl/host_server_unittest.cc
+++ b/src/connectivity/bluetooth/core/bt-host/fidl/host_server_unittest.cc
@@ -12,6 +12,7 @@
 #include "fuchsia/bluetooth/cpp/fidl.h"
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
+#include "helpers.h"
 #include "src/connectivity/bluetooth/core/bt-host/common/byte_buffer.h"
 #include "src/connectivity/bluetooth/core/bt-host/common/device_address.h"
 #include "src/connectivity/bluetooth/core/bt-host/common/test_helpers.h"
@@ -49,8 +50,8 @@
 namespace fbt = fuchsia::bluetooth;
 namespace fsys = fuchsia::bluetooth::sys;
 
-const bt::DeviceAddress kLETestAddr(bt::DeviceAddress::Type::kLEPublic, {0x01, 0, 0, 0, 0, 0});
-const bt::DeviceAddress kBrEdrTestAddr(bt::DeviceAddress::Type::kBREDR, {0x01, 0, 0, 0, 0, 0});
+const bt::DeviceAddress kLeTestAddr(bt::DeviceAddress::Type::kLEPublic, {0x01, 0, 0, 0, 0, 0});
+const bt::DeviceAddress kBredrTestAddr(bt::DeviceAddress::Type::kBREDR, {0x01, 0, 0, 0, 0, 0});
 
 class MockPairingDelegate : public fuchsia::bluetooth::control::testing::PairingDelegate_TestBase {
  public:
@@ -87,9 +88,12 @@
   void SetUp() override {
     AdapterTestFixture::SetUp();
 
-    // Create a HostServer and bind it to a local client.
-    fidl::InterfaceHandle<fuchsia::bluetooth::host::Host> host_handle;
     gatt_host_ = GattHost::CreateForTesting(dispatcher(), gatt());
+    ResetHostServer();
+  }
+
+  void ResetHostServer() {
+    fidl::InterfaceHandle<fuchsia::bluetooth::host::Host> host_handle;
     host_server_ = std::make_unique<HostServer>(host_handle.NewRequest().TakeChannel(),
                                                 adapter()->AsWeakPtr(), gatt_host_);
     host_.Bind(std::move(host_handle));
@@ -129,7 +133,7 @@
   }
 
   std::tuple<bt::gap::Peer*, FakeChannel*> ConnectFakePeer(bool connect_le = true) {
-    auto device_addr = connect_le ? kLETestAddr : kBrEdrTestAddr;
+    auto device_addr = connect_le ? kLeTestAddr : kBredrTestAddr;
     auto* peer = adapter()->peer_cache()->NewPeer(device_addr, true);
     EXPECT_TRUE(peer->temporary());
     // This is to capture the channel created during the Connection process
@@ -203,7 +207,7 @@
   auto fidl_pairing_delegate =
       SetMockPairingDelegate(InputCapabilityType::KEYBOARD, OutputCapabilityType::DISPLAY);
 
-  auto* const peer = adapter()->peer_cache()->NewPeer(kLETestAddr, /*connectable=*/true);
+  auto* const peer = adapter()->peer_cache()->NewPeer(kLeTestAddr, /*connectable=*/true);
   ASSERT_TRUE(peer);
 
   EXPECT_CALL(*fidl_pairing_delegate,
@@ -233,7 +237,7 @@
   auto fidl_pairing_delegate =
       SetMockPairingDelegate(InputCapabilityType::KEYBOARD, OutputCapabilityType::DISPLAY);
 
-  auto* const peer = adapter()->peer_cache()->NewPeer(kLETestAddr, /*connectable=*/true);
+  auto* const peer = adapter()->peer_cache()->NewPeer(kLeTestAddr, /*connectable=*/true);
   ASSERT_TRUE(peer);
 
   // This call should use PASSKEY_DISPLAY to request that the user perform peer passkey entry.
@@ -285,7 +289,7 @@
   auto fidl_pairing_delegate =
       SetMockPairingDelegate(InputCapabilityType::KEYBOARD, OutputCapabilityType::DISPLAY);
 
-  auto* const peer = adapter()->peer_cache()->NewPeer(kLETestAddr, /*connectable=*/true);
+  auto* const peer = adapter()->peer_cache()->NewPeer(kLeTestAddr, /*connectable=*/true);
   ASSERT_TRUE(peer);
 
   using OnPairingRequestCallback = FidlPairingDelegate::OnPairingRequestCallback;
@@ -514,5 +518,90 @@
   ASSERT_EQ(pair_status.error->error_code, FidlErrorCode::NOT_FOUND);
 }
 
+TEST_F(FIDL_HostServerTest, WatchPeersHangsOnFirstCallWithNoExistingPeers) {
+  // By default the peer cache contains no entries when HostServer is first constructed. The first
+  // call to WatchPeers should hang.
+  bool replied = false;
+  host_server()->WatchPeers([&](auto, auto) { replied = true; });
+  EXPECT_FALSE(replied);
+}
+
+TEST_F(FIDL_HostServerTest, WatchPeersRepliesOnFirstCallWithExistingPeers) {
+  __UNUSED auto* peer = adapter()->peer_cache()->NewPeer(kLeTestAddr, /*connectable=*/true);
+  ResetHostServer();
+
+  // By default the peer cache contains no entries when HostServer is first constructed. The first
+  // call to WatchPeers should hang.
+  bool replied = false;
+  host_server()->WatchPeers([&](auto updated, auto removed) {
+    EXPECT_EQ(1u, updated.size());
+    EXPECT_TRUE(removed.empty());
+    replied = true;
+  });
+  EXPECT_TRUE(replied);
+}
+
+TEST_F(FIDL_HostServerTest, WatchPeersStateMachine) {
+  std::optional<std::vector<fsys::Peer>> updated;
+  std::optional<std::vector<fbt::PeerId>> removed;
+
+  // Initial watch call hangs as the cache is empty.
+  host_server()->WatchPeers([&](auto updated_arg, auto removed_arg) {
+    updated = std::move(updated_arg);
+    removed = std::move(removed_arg);
+  });
+  ASSERT_FALSE(updated.has_value());
+  ASSERT_FALSE(removed.has_value());
+
+  // Adding a new peer should resolve the hanging get.
+  auto* peer = adapter()->peer_cache()->NewPeer(kLeTestAddr, /*connectable=*/true);
+  ASSERT_TRUE(updated.has_value());
+  ASSERT_TRUE(removed.has_value());
+  EXPECT_EQ(1u, updated->size());
+  EXPECT_TRUE(fidl::Equals(fidl_helpers::PeerToFidl(*peer), (*updated)[0]));
+  EXPECT_TRUE(removed->empty());
+  updated.reset();
+  removed.reset();
+
+  // The next call should hang.
+  host_server()->WatchPeers([&](auto updated_arg, auto removed_arg) {
+    updated = std::move(updated_arg);
+    removed = std::move(removed_arg);
+  });
+  ASSERT_FALSE(updated.has_value());
+  ASSERT_FALSE(removed.has_value());
+
+  // Removing the peer should resolve the hanging get.
+  auto peer_id = peer->identifier();
+  __UNUSED auto result = adapter()->peer_cache()->RemoveDisconnectedPeer(peer_id);
+  ASSERT_TRUE(updated.has_value());
+  ASSERT_TRUE(removed.has_value());
+  EXPECT_TRUE(updated->empty());
+  EXPECT_EQ(1u, removed->size());
+  EXPECT_TRUE(fidl::Equals(fbt::PeerId{peer_id.value()}, (*removed)[0]));
+}
+
+TEST_F(FIDL_HostServerTest, WatchPeersUpdatedThenRemoved) {
+  // Add then remove a peer. The watcher should only report the removal.
+  bt::PeerId id;
+  {
+    auto* peer = adapter()->peer_cache()->NewPeer(kLeTestAddr, /*connectable=*/true);
+    id = peer->identifier();
+
+    // |peer| becomes a dangling pointer after the call to RemoveDisconnectedPeer. We scoped the
+    // binding of |peer| so that it doesn't exist beyond this point.
+    __UNUSED auto result = adapter()->peer_cache()->RemoveDisconnectedPeer(id);
+  }
+
+  bool replied = false;
+  host_server()->WatchPeers([&replied, id](auto updated, auto removed) {
+    EXPECT_TRUE(updated.empty());
+    EXPECT_EQ(1u, removed.size());
+    EXPECT_TRUE(fidl::Equals(fbt::PeerId{id.value()}, removed[0]));
+    replied = true;
+  });
+  EXPECT_TRUE(replied);
+}
+
 }  // namespace
 }  // namespace bthost
diff --git a/src/connectivity/bluetooth/fidl/host.fidl b/src/connectivity/bluetooth/fidl/host.fidl
index d47ae12..e462d17 100644
--- a/src/connectivity/bluetooth/fidl/host.fidl
+++ b/src/connectivity/bluetooth/fidl/host.fidl
@@ -49,21 +49,20 @@
     /// Assigns local data to this host.
     SetLocalData(fuchsia.bluetooth.control.HostData host_data);
 
-    /// Returns a list of all known connectable devices, included those that are
-    /// currently connected and/or bonded. This list does not include
-    /// non-connectable devices such as LE broadcasters.
+    /// Monitors updates for connectable peers known to the system. Replies only when
+    /// peers have been added, modified, or removed since the most recent call to WatchPeers().
     ///
-    /// Notes:
+    /// Peers are added and updated as new information is obtained during discovery, connection
+    /// establishment, and bonding procedures.
     ///
-    /// - When used in the GAP central role (BR/EDR or LE) the listed devices are
-    /// obtained during discovery and connection procedures. While in the
-    /// peripheral role, this will contain devices that have successfully initiated
-    /// connections to this host.
+    /// Peers are removed either:
+    ///   a. explicitly via [`fuchsia.bluetooth.host.Host/Forget`], or
+    ///   b. no transmission has been received from the peer for an internally determined duration
+    ///      of time and the peer is neither connected nor bonded.
     ///
-    /// - This list contains connectable devices that are discovered or connected
-    /// via other interfaces obtained using the interface request methods declared
-    /// above.
-    ListDevices() -> (vector<fuchsia.bluetooth.control.RemoteDevice> devices);
+    /// - response `updated` Peers that were added or updated since the last call to WatchPeers().
+    /// - response `removed` Ids of peers that were removed since the last call to WatchPeers().
+    WatchPeers() -> (vector<sys.Peer> updated, vector<bt.PeerId> removed);
 
     /// Sets the local name for this host device.
     SetLocalName(string local_name) -> (fuchsia.bluetooth.Status status);
@@ -72,16 +71,13 @@
     SetDeviceClass(bt.DeviceClass device_class) -> (fuchsia.bluetooth.Status status);
 
     /// Initiates a general discovery procedure for BR/EDR and LE devices. On success, discovered
-    /// devices will be reported via OnDeviceUpdated events.
+    /// peers can be monitored using the [`fuchsia.bluetooth.host/Host.WatchPeers`] method.
     ///
     /// On the LE transport, only general-discoverable and connectable peripherals will be reported.
     ///
     /// Discovery will continue until it is terminated via StopDiscovery() or if the Host protocol
     /// channel gets closed. If the device does not support BR/EDR, only LE discovery will be
     /// performed.
-    ///
-    /// An OnDeviceUpdated event will be sent when the discovery procedures are
-    /// started.
     StartDiscovery() -> (fuchsia.bluetooth.Status status);
 
     /// Terminates discovery if one was started via StartDiscovery().
@@ -135,9 +131,9 @@
     /// enforced during service access.
     Pair(fuchsia.bluetooth.PeerId id, fuchsia.bluetooth.control.PairingOptions options) -> (fuchsia.bluetooth.Status status);
 
-    /// Deletes a peer from the Bluetooth host. If the peer is connected, it will be disconnected,
-    /// then OnDeviceUpdated will be sent. OnDeviceRemoved will be sent. `device_id` will no longer
-    /// refer to any peer, even if a device with the same address(es) is discovered again.
+    /// Deletes a peer from the Bluetooth host. If the peer is connected, it will be disconnected.
+    /// `device_id` will no longer refer to any peer, even if a device with the same address is
+    /// discovered again.
     ///
     /// Returns success after no peer exists that's identified by `device_id` (even if it didn't
     /// exist before Forget), failure if the peer specified by `device_id` could not be
@@ -171,11 +167,6 @@
 
     // ===== Events =====
 
-    /// Events that are sent when a connectable device is added, updated, or
-    /// removed as a result of connection and discovery procedures.
-    -> OnDeviceUpdated(fuchsia.bluetooth.control.RemoteDevice device);
-    -> OnDeviceRemoved(string identifier);
-
     /// Notifies when bonding data for a device has been updated.
     -> OnNewBondingData(fuchsia.bluetooth.control.BondingData data);
 
diff --git a/src/connectivity/bluetooth/lib/fuchsia-bluetooth/src/expectation.rs b/src/connectivity/bluetooth/lib/fuchsia-bluetooth/src/expectation.rs
index 1edbc71..1e8206c 100644
--- a/src/connectivity/bluetooth/lib/fuchsia-bluetooth/src/expectation.rs
+++ b/src/connectivity/bluetooth/lib/fuchsia-bluetooth/src/expectation.rs
@@ -72,43 +72,41 @@
 /// Expectations for Bluetooth Peers (i.e. Remote Devices)
 pub mod peer {
     use super::Predicate;
-    use fidl_fuchsia_bluetooth_control::{RemoteDevice, TechnologyType};
+    use {
+        crate::types::{Address, Peer, PeerId},
+        fidl_fuchsia_bluetooth_sys::TechnologyType,
+    };
 
-    pub fn name(expected_name: &str) -> Predicate<RemoteDevice> {
-        let name = Some(expected_name.to_string());
-        Predicate::<RemoteDevice>::new(
-            move |peer| peer.name == name,
-            Some(&format!("name == {}", expected_name)),
+    pub fn name(name: &str) -> Predicate<Peer> {
+        let name_owned = Some(name.to_string());
+        Predicate::<Peer>::new(
+            move |peer| peer.name == name_owned,
+            Some(&format!("name == {}", name)),
         )
     }
-    pub fn identifier(expected_ident: &str) -> Predicate<RemoteDevice> {
-        let identifier = expected_ident.to_string();
-        Predicate::<RemoteDevice>::new(
-            move |peer| peer.identifier == identifier,
-            Some(&format!("identifier == {}", expected_ident)),
-        )
+    pub fn identifier(id: PeerId) -> Predicate<Peer> {
+        Predicate::<Peer>::new(move |peer| peer.id == id, Some(&format!("peer id == {}", id)))
     }
-    pub fn address(expected_address: &str) -> Predicate<RemoteDevice> {
-        let address = expected_address.to_string();
-        Predicate::<RemoteDevice>::new(
+    pub fn address(address: Address) -> Predicate<Peer> {
+        Predicate::<Peer>::new(
             move |peer| peer.address == address,
-            Some(&format!("address == {}", expected_address)),
+            Some(&format!("address == {}", address)),
         )
     }
-    pub fn technology(tech: TechnologyType) -> Predicate<RemoteDevice> {
-        Predicate::<RemoteDevice>::new(
+    pub fn technology(tech: TechnologyType) -> Predicate<Peer> {
+        Predicate::<Peer>::new(
             move |peer| peer.technology == tech,
             Some(&format!("technology == {:?}", tech)),
         )
     }
-    pub fn connected(connected: bool) -> Predicate<RemoteDevice> {
-        Predicate::<RemoteDevice>::new(
+    pub fn connected(connected: bool) -> Predicate<Peer> {
+        Predicate::<Peer>::new(
             move |peer| peer.connected == connected,
             Some(&format!("connected == {}", connected)),
         )
     }
-    pub fn bonded(bonded: bool) -> Predicate<RemoteDevice> {
-        Predicate::<RemoteDevice>::new(
+    pub fn bonded(bonded: bool) -> Predicate<Peer> {
+        Predicate::<Peer>::new(
             move |peer| peer.bonded == bonded,
             Some(&format!("bonded == {}", bonded)),
         )
@@ -143,56 +141,57 @@
 
 #[cfg(test)]
 mod test {
-    use crate::expectation::*;
-    use fidl_fuchsia_bluetooth_control::{Appearance, RemoteDevice, TechnologyType};
+    use crate::{
+        expectation::*,
+        types::{Address, Peer, PeerId},
+    };
+    use fidl_fuchsia_bluetooth_sys::TechnologyType;
 
     const TEST_PEER_NAME: &'static str = "TestPeer";
-    const TEST_PEER_ADDRESS: &'static str = "00:00:00:00:00:01";
+    const TEST_PEER_ADDRESS: Address = Address::Public([1, 0, 0, 0, 0, 0]);
     const INCORRECT_PEER_NAME: &'static str = "IncorrectPeer";
-    const INCORRECT_PEER_ADDRESS: &'static str = "00:00:00:00:00:02";
+    const INCORRECT_PEER_ADDRESS: Address = Address::Public([2, 0, 0, 0, 0, 0]);
 
-    fn correct_name() -> Predicate<RemoteDevice> {
+    fn correct_name() -> Predicate<Peer> {
         peer::name(TEST_PEER_NAME)
     }
-    fn incorrect_name() -> Predicate<RemoteDevice> {
+    fn incorrect_name() -> Predicate<Peer> {
         peer::name(INCORRECT_PEER_NAME)
     }
-    fn correct_address() -> Predicate<RemoteDevice> {
+    fn correct_address() -> Predicate<Peer> {
         peer::address(TEST_PEER_ADDRESS)
     }
-    fn incorrect_address() -> Predicate<RemoteDevice> {
+    fn incorrect_address() -> Predicate<Peer> {
         peer::address(INCORRECT_PEER_ADDRESS)
     }
 
-    fn test_peer() -> RemoteDevice {
-        RemoteDevice {
-            name: Some(TEST_PEER_NAME.into()),
-            address: TEST_PEER_ADDRESS.into(),
+    fn test_peer() -> Peer {
+        Peer {
+            id: PeerId(1),
+            address: TEST_PEER_ADDRESS,
             technology: TechnologyType::LowEnergy,
             connected: false,
             bonded: false,
-            appearance: Appearance::Unknown,
-            identifier: "".into(),
+            name: Some(TEST_PEER_NAME.into()),
+            appearance: None,
+            device_class: None,
             rssi: None,
             tx_power: None,
-            service_uuids: vec![],
+            services: vec![],
         }
     }
 
     #[test]
     fn simple_predicate_succeeds() {
-        let predicate = Predicate::<RemoteDevice>::new(
-            move |peer| peer.name == Some(TEST_PEER_NAME.into()),
-            None,
-        );
+        let predicate =
+            Predicate::<Peer>::new(move |peer| peer.name == Some(TEST_PEER_NAME.into()), None);
         assert!(predicate.satisfied(&test_peer()));
     }
+
     #[test]
-    fn simple_incorrect_predicate_error() {
-        let predicate = Predicate::<RemoteDevice>::new(
-            move |peer| peer.name == Some("INCORRECT_NAME".into()),
-            None,
-        );
+    fn simple_incorrect_predicate_fails() {
+        let predicate =
+            Predicate::<Peer>::new(move |peer| peer.name == Some("INCORRECT_NAME".into()), None);
         assert!(!predicate.satisfied(&test_peer()));
     }
 
diff --git a/src/connectivity/bluetooth/tests/integration/src/harness/bootstrap.rs b/src/connectivity/bluetooth/tests/integration/src/harness/bootstrap.rs
index 987ef70..6a872ae 100644
--- a/src/connectivity/bluetooth/tests/integration/src/harness/bootstrap.rs
+++ b/src/connectivity/bluetooth/tests/integration/src/harness/bootstrap.rs
@@ -19,7 +19,7 @@
 
     fn init() -> BoxFuture<'static, Result<(Self, Self::Env, Self::Runner), Error>> {
         async {
-            let fake_host = ActivatedFakeHost::new("bt-hci-integration-le-0").await?;
+            let fake_host = ActivatedFakeHost::new("bt-hci-integration-bootstrap-0").await?;
             match fuchsia_component::client::connect_to_service::<BootstrapMarker>() {
                 Ok(proxy) => Ok((BootstrapHarness::new(proxy), fake_host, future::pending())),
                 Err(e) => Err(format_err!("Failed to connect to Bootstrap service: {:?}", e)),
diff --git a/src/connectivity/bluetooth/tests/integration/src/harness/control.rs b/src/connectivity/bluetooth/tests/integration/src/harness/control.rs
index e4ebd9a..143db61 100644
--- a/src/connectivity/bluetooth/tests/integration/src/harness/control.rs
+++ b/src/connectivity/bluetooth/tests/integration/src/harness/control.rs
@@ -113,10 +113,10 @@
     }
 }
 
-pub mod control_expectation {
+pub mod expectation {
     use crate::harness::control::ControlState;
     use fidl_fuchsia_bluetooth_control::RemoteDevice;
-    use fuchsia_bluetooth::expectation::{peer, Predicate};
+    use fuchsia_bluetooth::expectation::Predicate;
 
     pub fn active_host_is(id: String) -> Predicate<ControlState> {
         let msg = format!("active bt-host is {}", id);
@@ -132,16 +132,47 @@
         Predicate::new(move |state: &ControlState| !state.hosts.contains_key(&id), Some(&msg))
     }
 
-    pub fn peer_exists(p: Predicate<RemoteDevice>) -> Predicate<ControlState> {
-        let msg = format!("Peer exists satisfying {}", p.describe());
-        Predicate::new(
-            move |state: &ControlState| state.peers.iter().any(|(_, d)| p.satisfied(d)),
-            Some(&msg),
-        )
+    mod peer {
+        use super::*;
+
+        pub(crate) fn exists(p: Predicate<RemoteDevice>) -> Predicate<ControlState> {
+            let msg = format!("peer exists satisfying {}", p.describe());
+            Predicate::new(
+                move |state: &ControlState| state.peers.iter().any(|(_, d)| p.satisfied(d)),
+                Some(&msg),
+            )
+        }
+
+        pub(crate) fn with_identifier(id: &str) -> Predicate<RemoteDevice> {
+            let id_owned = id.to_string();
+            Predicate::<RemoteDevice>::new(
+                move |d| d.identifier == id_owned,
+                Some(&format!("identifier == {}", id)),
+            )
+        }
+
+        pub(crate) fn with_address(address: &str) -> Predicate<RemoteDevice> {
+            let addr_owned = address.to_string();
+            Predicate::<RemoteDevice>::new(
+                move |d| d.address == addr_owned,
+                Some(&format!("address == {}", address)),
+            )
+        }
+
+        pub(crate) fn connected(connected: bool) -> Predicate<RemoteDevice> {
+            Predicate::<RemoteDevice>::new(
+                move |d| d.connected == connected,
+                Some(&format!("connected == {}", connected)),
+            )
+        }
     }
 
     pub fn peer_connected(id: &str, connected: bool) -> Predicate<ControlState> {
-        peer_exists(peer::identifier(id).and(peer::connected(connected)))
+        peer::exists(peer::with_identifier(id).and(peer::connected(connected)))
+    }
+
+    pub fn peer_with_address(address: &str) -> Predicate<ControlState> {
+        peer::exists(peer::with_address(address))
     }
 }
 
@@ -193,9 +224,7 @@
 
     let fut = control.aux().set_active_adapter(&host);
     fut.await?;
-    control
-        .when_satisfied(control_expectation::active_host_is(host.clone()), control_timeout())
-        .await?;
+    control.when_satisfied(expectation::active_host_is(host.clone()), control_timeout()).await?;
     Ok((host, hci))
 }
 
@@ -219,10 +248,7 @@
 
         // Wait for BT-GAP to unregister the associated fake host
         self.control
-            .when_satisfied(
-                control_expectation::host_not_present(self.host.clone()),
-                control_timeout(),
-            )
+            .when_satisfied(expectation::host_not_present(self.host.clone()), control_timeout())
             .await?;
         Ok(())
     }
diff --git a/src/connectivity/bluetooth/tests/integration/src/harness/host_driver.rs b/src/connectivity/bluetooth/tests/integration/src/harness/host_driver.rs
index 6a67261..16faae8 100644
--- a/src/connectivity/bluetooth/tests/integration/src/harness/host_driver.rs
+++ b/src/connectivity/bluetooth/tests/integration/src/harness/host_driver.rs
@@ -3,28 +3,25 @@
 // found in the LICENSE file.
 
 use {
-    anyhow::{format_err, Error},
-    fidl_fuchsia_bluetooth_control::RemoteDevice,
-    fidl_fuchsia_bluetooth_host::{HostEvent, HostProxy},
+    anyhow::Error,
+    fidl_fuchsia_bluetooth_host::HostProxy,
     fidl_fuchsia_bluetooth_test::HciEmulatorProxy,
     fuchsia_async as fasync,
     fuchsia_bluetooth::{
         constants::HOST_DEVICE_DIR,
         device_watcher::DeviceWatcher,
-        error::Error as BtError,
         expectation::{
             asynchronous::{ExpectableState, ExpectableStateExt, ExpectationHarness},
             Predicate,
         },
         hci_emulator::Emulator,
         host,
-        types::HostInfo,
-        util::clone_remote_device,
+        types::{HostInfo, Peer, PeerId},
     },
     fuchsia_zircon::{Duration, DurationNum},
     futures::{
         future::{self, BoxFuture},
-        FutureExt, TryFutureExt, TryStreamExt,
+        FutureExt, TryFutureExt,
     },
     parking_lot::MappedRwLockWriteGuard,
     std::{
@@ -45,22 +42,34 @@
     TIMEOUT_SECONDS.seconds()
 }
 
-pub fn expect_remote_device(
-    test_state: &HostDriverHarness,
-    address: &str,
-    expected: &Predicate<RemoteDevice>,
-) -> Result<(), Error> {
-    let state = test_state.read();
-    let peer = state
-        .peers
-        .values()
-        .find(|dev| dev.address == address)
-        .ok_or(BtError::new(&format!("Peer with address '{}' not found", address)))?;
-    expect_true!(expected.satisfied(peer))
+// Returns a Future that resolves when the state of any RemoteDevice matches `target`.
+pub async fn expect_peer(
+    host: &HostDriverHarness,
+    target: Predicate<Peer>,
+) -> Result<HostState, Error> {
+    let fut = host.when_satisfied(
+        Predicate::<HostState>::new(
+            move |host| host.peers.iter().any(|(_, p)| target.satisfied(&p)),
+            None,
+        ),
+        timeout_duration(),
+    );
+    fut.await
+}
+
+pub async fn expect_host_state(
+    host: &HostDriverHarness,
+    target: Predicate<HostInfo>,
+) -> Result<HostState, Error> {
+    let fut = host.when_satisfied(
+        Predicate::<HostState>::new(move |host| target.satisfied(&host.host_info), None),
+        timeout_duration(),
+    );
+    fut.await
 }
 
 // Returns a future that resolves when a peer matching `id` is not present on the host.
-pub async fn expect_no_peer(host: &HostDriverHarness, id: String) -> Result<(), Error> {
+pub async fn expect_no_peer(host: &HostDriverHarness, id: PeerId) -> Result<(), Error> {
     let fut = host.when_satisfied(
         Predicate::<HostState>::new(move |host| host.peers.iter().all(|(i, _)| i != &id), None),
         timeout_duration(),
@@ -79,7 +88,13 @@
     host_info: HostInfo,
 
     // All known remote devices, indexed by their identifiers.
-    peers: HashMap<String, RemoteDevice>,
+    peers: HashMap<PeerId, Peer>,
+}
+
+impl HostState {
+    pub fn peers(&self) -> &HashMap<PeerId, Peer> {
+        &self.peers
+    }
 }
 
 impl Clone for HostState {
@@ -88,7 +103,7 @@
             emulator_state: self.emulator_state.clone(),
             host_path: self.host_path.clone(),
             host_info: self.host_info.clone(),
-            peers: self.peers.iter().map(|(k, v)| (k.clone(), clone_remote_device(v))).collect(),
+            peers: self.peers.clone(),
         }
     }
 }
@@ -115,18 +130,18 @@
     fn init() -> BoxFuture<'static, Result<(Self, Self::Env, Self::Runner), Error>> {
         async {
             let (harness, emulator) = new_host_harness().await?;
-            let host_events = handle_host_events(harness.clone())
-                .map_err(|e| e.context("Error handling host events"))
-                .err_into();
-            let watch_state = watch_host_state(harness.clone())
+            let watch_info = watch_host_info(harness.clone())
                 .map_err(|e| e.context("Error watching host state"))
                 .err_into();
-            let watch_cp = watch_controller_parameters(harness.clone())
+            let watch_peers = watch_peers(harness.clone())
+                .map_err(|e| e.context("Error watching peers"))
+                .err_into();
+            let watch_emulator_params = watch_controller_parameters(harness.clone())
                 .map_err(|e| e.context("Error watching controller parameters"))
                 .err_into();
             let path = harness.read().host_path;
 
-            let run = future::try_join3(host_events, watch_state, watch_cp)
+            let run = future::try_join3(watch_info, watch_peers, watch_emulator_params)
                 .map_ok(|((), (), ())| ())
                 .boxed();
             Ok((harness, (path, emulator), run))
@@ -178,56 +193,23 @@
     Ok((harness, emulator))
 }
 
-// Returns a Future that resolves when the state of any RemoteDevice matches `target`.
-pub async fn expect_host_peer(
-    host: &HostDriverHarness,
-    target: Predicate<RemoteDevice>,
-) -> Result<HostState, Error> {
-    let fut = host.when_satisfied(
-        Predicate::<HostState>::new(
-            move |host| host.peers.iter().any(|(_, p)| target.satisfied(p)),
-            None,
-        ),
-        timeout_duration(),
-    );
-    fut.await
-}
-
-pub async fn expect_host_state(
-    host: &HostDriverHarness,
-    target: Predicate<HostInfo>,
-) -> Result<HostState, Error> {
-    let fut = host.when_satisfied(
-        Predicate::<HostState>::new(move |host| target.satisfied(&host.host_info), None),
-        timeout_duration(),
-    );
-    fut.await
-}
-
-// Returns a Future that handles Host interface events.
-async fn handle_host_events(harness: HostDriverHarness) -> Result<(), Error> {
-    let mut events = harness.aux().proxy().take_event_stream();
-
-    while let Some(e) = events.try_next().await? {
-        match e {
-            HostEvent::OnDeviceUpdated { device } => {
-                harness.write_state().peers.insert(device.identifier.clone(), device);
-            }
-            HostEvent::OnDeviceRemoved { identifier } => {
-                harness.write_state().peers.remove(&identifier);
-            }
-            // TODO(armansito): handle other events
-            e => {
-                eprintln!("Unhandled event: {:?}", e);
-            }
+async fn watch_peers(harness: HostDriverHarness) -> Result<(), Error> {
+    loop {
+        // Clone the proxy so that the aux() lock is not held while waiting.
+        let proxy = harness.aux().proxy().clone();
+        let (updated, removed) = proxy.watch_peers().await?;
+        for peer in updated.into_iter() {
+            let peer: Peer = peer.try_into()?;
+            harness.write_state().peers.insert(peer.id.clone(), peer);
+            harness.notify_state_changed();
         }
-        harness.notify_state_changed();
+        for id in removed.into_iter() {
+            harness.write_state().peers.remove(&id.into());
+        }
     }
-
-    Ok(())
 }
 
-async fn watch_host_state(harness: HostDriverHarness) -> Result<(), Error> {
+async fn watch_host_info(harness: HostDriverHarness) -> Result<(), Error> {
     loop {
         let proxy = harness.aux().proxy().clone();
         let info = proxy.watch_state().await?;
diff --git a/src/connectivity/bluetooth/tests/integration/src/tests/bonding.rs b/src/connectivity/bluetooth/tests/integration/src/tests/bonding.rs
index 77c4e54..7555c6f 100644
--- a/src/connectivity/bluetooth/tests/integration/src/tests/bonding.rs
+++ b/src/connectivity/bluetooth/tests/integration/src/tests/bonding.rs
@@ -6,27 +6,34 @@
     anyhow::{format_err, Error},
     fidl_fuchsia_bluetooth::Status,
     fidl_fuchsia_bluetooth_control::{
-        AddressType, BondingData, LeData, Ltk, RemoteKey, SecurityProperties, TechnologyType,
+        AddressType, BondingData, LeData, Ltk, RemoteKey, SecurityProperties,
     },
-    fuchsia_bluetooth::expectation,
+    fuchsia_bluetooth::{
+        expectation,
+        types::{Address, PeerId},
+    },
     futures::TryFutureExt,
 };
 
 use crate::harness::{
+    emulator::EmulatorHarness,
     expect::expect_eq,
-    host_driver::{expect_host_peer, expect_remote_device, HostDriverHarness},
+    host_driver::{expect_peer, HostDriverHarness},
 };
 
 // TODO(armansito|xow): Add tests for BR/EDR and dual mode bond data.
 
-fn new_le_bond_data(id: &str, address: &str, name: &str, has_ltk: bool) -> BondingData {
+fn new_le_bond_data(id: &PeerId, address: &Address, name: &str, has_ltk: bool) -> BondingData {
     BondingData {
         identifier: id.to_string(),
         local_address: "AA:BB:CC:DD:EE:FF".to_string(),
         name: Some(name.to_string()),
         le: Some(Box::new(LeData {
             address: address.to_string(),
-            address_type: AddressType::LeRandom,
+            address_type: match address {
+                Address::Public(_) => AddressType::LePublic,
+                Address::Random(_) => AddressType::LeRandom,
+            },
             connection_parameters: None,
             services: vec![],
             ltk: if has_ltk {
@@ -61,98 +68,76 @@
     fut.await
 }
 
-const TEST_ID1: &str = "1234";
-const TEST_ID2: &str = "2345";
-const TEST_ADDR1: &str = "01:02:03:04:05:06";
-const TEST_ADDR2: &str = "06:05:04:03:02:01";
+const TEST_ID1: PeerId = PeerId(0x1234);
+const TEST_ID2: PeerId = PeerId(0x5678);
+const TEST_ADDR1: Address = Address::Public([6, 5, 4, 3, 2, 1]);
+const TEST_ADDR2: Address = Address::Public([1, 2, 3, 4, 5, 6]);
 const TEST_NAME1: &str = "Name1";
 const TEST_NAME2: &str = "Name2";
 
 // Tests initializing bonded LE devices.
-async fn test_add_bonded_devices_success(test_state: HostDriverHarness) -> Result<(), Error> {
-    // Devices should be initially empty.
-    let fut = test_state.aux().proxy().list_devices();
-    let devices = fut.await?;
-    expect_eq!(vec![], devices)?;
+async fn test_add_bonded_devices_success(harness: HostDriverHarness) -> Result<(), Error> {
+    // Peers should be initially empty.
+    expect_eq!(0, harness.state().peers().len())?;
 
-    let bond_data1 = new_le_bond_data(TEST_ID1, TEST_ADDR1, TEST_NAME1, true /* has LTK */);
-    let bond_data2 = new_le_bond_data(TEST_ID2, TEST_ADDR2, TEST_NAME2, true /* has LTK */);
-    let status = add_bonds(&test_state, vec![bond_data1, bond_data2]).await?;
+    let bond_data1 = new_le_bond_data(&TEST_ID1, &TEST_ADDR1, TEST_NAME1, true /* has LTK */);
+    let bond_data2 = new_le_bond_data(&TEST_ID2, &TEST_ADDR2, TEST_NAME2, true /* has LTK */);
+    let status = add_bonds(&harness, vec![bond_data1, bond_data2]).await?;
     expect_true!(status.error.is_none())?;
 
     // We should receive notifications for the newly added devices.
     let expected1 = expectation::peer::address(TEST_ADDR1)
-        .and(expectation::peer::technology(TechnologyType::LowEnergy))
+        .and(expectation::peer::technology(fidl_fuchsia_bluetooth_sys::TechnologyType::LowEnergy))
+        .and(expectation::peer::name(TEST_NAME1))
         .and(expectation::peer::bonded(true));
 
     let expected2 = expectation::peer::address(TEST_ADDR2)
-        .and(expectation::peer::technology(TechnologyType::LowEnergy))
+        .and(expectation::peer::technology(fidl_fuchsia_bluetooth_sys::TechnologyType::LowEnergy))
+        .and(expectation::peer::name(TEST_NAME2))
         .and(expectation::peer::bonded(true));
 
-    expect_host_peer(&test_state, expected1).await?;
-    expect_host_peer(&test_state, expected2).await?;
-
-    let fut = test_state.aux().proxy().list_devices();
-    let devices = fut.await?;
-    expect_eq!(2, devices.len())?;
-    expect_true!(devices.iter().any(|dev| dev.address == TEST_ADDR1))?;
-    expect_true!(devices.iter().any(|dev| dev.address == TEST_ADDR2))?;
-
-    expect_true!(devices.iter().any(|dev| dev.name == Some(TEST_NAME1.to_string())))?;
-    expect_true!(devices.iter().any(|dev| dev.name == Some(TEST_NAME2.to_string())))?;
+    expect_peer(&harness, expected1).await?;
+    expect_peer(&harness, expected2).await?;
 
     Ok(())
 }
 
-async fn test_add_bonded_devices_no_ltk_fails(test_state: HostDriverHarness) -> Result<(), Error> {
-    // Devices should be initially empty.
-    let fut = test_state.aux().proxy().list_devices();
-    let devices = fut.await?;
-    expect_eq!(vec![], devices)?;
+async fn test_add_bonded_devices_no_ltk_fails(harness: HostDriverHarness) -> Result<(), Error> {
+    // Peers should be initially empty.
+    expect_eq!(0, harness.state().peers().len())?;
 
     // Inserting a bonded device without a LTK should fail.
-    let bond_data = new_le_bond_data(TEST_ID1, TEST_ADDR1, TEST_NAME1, false /* no LTK */);
-    let status = add_bonds(&test_state, vec![bond_data]).await?;
+    let bond_data = new_le_bond_data(&TEST_ID1, &TEST_ADDR1, TEST_NAME1, false /* no LTK */);
+    let status = add_bonds(&harness, vec![bond_data]).await?;
     expect_true!(status.error.is_some())?;
-
-    let fut = test_state.aux().proxy().list_devices();
-    let devices = fut.await?;
-    expect_eq!(vec![], devices)?;
+    expect_eq!(0, harness.state().peers().len())?;
 
     Ok(())
 }
 
-async fn test_add_bonded_devices_duplicate_entry(
-    test_state: HostDriverHarness,
-) -> Result<(), Error> {
-    // Devices should be initially empty.
-    let fut = test_state.aux().proxy().list_devices();
-    let devices = fut.await?;
-    expect_eq!(vec![], devices)?;
+async fn test_add_bonded_devices_duplicate_entry(harness: HostDriverHarness) -> Result<(), Error> {
+    // Peers should be initially empty.
+    expect_eq!(0, harness.state().peers().len())?;
 
     // Initialize one entry.
-    let bond_data = new_le_bond_data(TEST_ID1, TEST_ADDR1, TEST_NAME1, true /* with LTK */);
-    let status = add_bonds(&test_state, vec![bond_data]).await?;
+    let bond_data = new_le_bond_data(&TEST_ID1, &TEST_ADDR1, TEST_NAME1, true /* with LTK */);
+    let status = add_bonds(&harness, vec![bond_data]).await?;
     expect_true!(status.error.is_none())?;
 
     // We should receive a notification for the newly added device.
     let expected = expectation::peer::address(TEST_ADDR1)
-        .and(expectation::peer::technology(TechnologyType::LowEnergy))
+        .and(expectation::peer::technology(fidl_fuchsia_bluetooth_sys::TechnologyType::LowEnergy))
         .and(expectation::peer::bonded(true));
-
-    expect_host_peer(&test_state, expected.clone()).await?;
-    let fut = test_state.aux().proxy().list_devices();
-    let devices = fut.await?;
-    expect_eq!(1, devices.len())?;
+    expect_peer(&harness, expected.clone()).await?;
 
     // Adding an entry with the existing id should fail.
-    let bond_data = new_le_bond_data(TEST_ID1, TEST_ADDR2, TEST_NAME2, true /* with LTK */);
-    let status = add_bonds(&test_state, vec![bond_data]).await?;
+    let bond_data = new_le_bond_data(&TEST_ID1, &TEST_ADDR2, TEST_NAME2, true /* with LTK */);
+    let status = add_bonds(&harness, vec![bond_data]).await?;
     expect_true!(status.error.is_some())?;
 
     // Adding an entry with a different ID but existing address should fail.
-    let bond_data = new_le_bond_data(TEST_ID2, TEST_ADDR1, TEST_NAME1, true /* with LTK */);
-    let status = add_bonds(&test_state, vec![bond_data]).await?;
+    let bond_data = new_le_bond_data(&TEST_ID2, &TEST_ADDR1, TEST_NAME1, true /* with LTK */);
+    let status = add_bonds(&harness, vec![bond_data]).await?;
     expect_true!(status.error.is_some())?;
 
     Ok(())
@@ -160,28 +145,21 @@
 
 // Tests that adding a list of bonding data with malformed content succeeds for the valid entries
 // but reports an error.
-async fn test_add_bonded_devices_invalid_entry(test_state: HostDriverHarness) -> Result<(), Error> {
-    // Devices should be initially empty.
-    let fut = test_state.aux().proxy().list_devices();
-    let devices = fut.await?;
-    expect_eq!(vec![], devices)?;
+async fn test_add_bonded_devices_invalid_entry(harness: HostDriverHarness) -> Result<(), Error> {
+    // Peers should be initially empty.
+    expect_eq!(0, harness.state().peers().len())?;
 
     // Add one entry with no LTK (invalid) and one with (valid). This should create an entry for the
     // valid device but report an error for the invalid entry.
-    let no_ltk = new_le_bond_data(TEST_ID1, TEST_ADDR1, TEST_NAME1, false);
-    let with_ltk = new_le_bond_data(TEST_ID2, TEST_ADDR2, TEST_NAME2, true);
-    let status = add_bonds(&test_state, vec![no_ltk, with_ltk]).await?;
+    let no_ltk = new_le_bond_data(&TEST_ID1, &TEST_ADDR1, TEST_NAME1, false);
+    let with_ltk = new_le_bond_data(&TEST_ID2, &TEST_ADDR2, TEST_NAME2, true);
+    let status = add_bonds(&harness, vec![no_ltk, with_ltk]).await?;
     expect_true!(status.error.is_some())?;
 
     let expected = expectation::peer::address(TEST_ADDR2)
-        .and(expectation::peer::technology(TechnologyType::LowEnergy))
+        .and(expectation::peer::technology(fidl_fuchsia_bluetooth_sys::TechnologyType::LowEnergy))
         .and(expectation::peer::bonded(true));
-
-    expect_host_peer(&test_state, expected.clone()).await?;
-    let fut = test_state.aux().proxy().list_devices();
-    let devices = fut.await?;
-    expect_eq!(1, devices.len())?;
-    expect_remote_device(&test_state, TEST_ADDR2, &expected)?;
+    expect_peer(&harness, expected.clone()).await?;
 
     Ok(())
 }
diff --git a/src/connectivity/bluetooth/tests/integration/src/tests/control.rs b/src/connectivity/bluetooth/tests/integration/src/tests/control.rs
index c3503aa..e2a71e7 100644
--- a/src/connectivity/bluetooth/tests/integration/src/tests/control.rs
+++ b/src/connectivity/bluetooth/tests/integration/src/tests/control.rs
@@ -7,7 +7,6 @@
     fidl_fuchsia_bluetooth_test::{AdvertisingData, LowEnergyPeerParameters},
     fuchsia_bluetooth::{
         expectation::{
-            self,
             asynchronous::{ExpectableState, ExpectableStateExt},
             Predicate,
         },
@@ -17,7 +16,7 @@
 };
 
 use crate::harness::control::{
-    activate_fake_host, control_expectation, control_timeout, ControlHarness, ControlState,
+    activate_fake_host, control_timeout, expectation, ControlHarness, ControlState,
     FAKE_HCI_ADDRESS,
 };
 
@@ -54,10 +53,7 @@
         let fut = control.aux().set_active_adapter(host);
         fut.await?;
         control
-            .when_satisfied(
-                control_expectation::active_host_is(host.to_string()),
-                control_timeout(),
-            )
+            .when_satisfied(expectation::active_host_is(host.to_string()), control_timeout())
             .await?;
     }
 
@@ -65,9 +61,7 @@
     fake_hci_1.destroy_and_wait().await?;
 
     for host in fake_hosts {
-        control
-            .when_satisfied(control_expectation::host_not_present(host), control_timeout())
-            .await?;
+        control.when_satisfied(expectation::host_not_present(host), control_timeout()).await?;
     }
 
     Ok(())
@@ -78,7 +72,6 @@
 
     // Insert a fake peer to test connection and disconnection.
     let peer_address = Address::Random([1, 0, 0, 0, 0, 0]);
-    let peer_address_string = peer_address.to_string();
     let peer_params = LowEnergyPeerParameters {
         address: Some(peer_address.into()),
         connectable: Some(true),
@@ -98,7 +91,7 @@
     fut.await?;
     let state = control
         .when_satisfied(
-            control_expectation::peer_exists(expectation::peer::address(&peer_address_string)),
+            expectation::peer_with_address(&peer_address.to_string()),
             control_timeout(),
         )
         .await?;
@@ -107,20 +100,16 @@
     // verify the controller state here.
 
     // We can safely unwrap here as this is guarded by the previous expectation
-    let peer = state.peers.iter().find(|(_, d)| &d.address == &peer_address_string).unwrap().0;
+    let peer = state.peers.iter().find(|(_, p)| &p.address == &peer_address.to_string()).unwrap().0;
 
     let fut = control.aux().connect(peer);
     fut.await?;
 
-    control
-        .when_satisfied(control_expectation::peer_connected(peer, true), control_timeout())
-        .await?;
+    control.when_satisfied(expectation::peer_connected(peer, true), control_timeout()).await?;
     let fut = control.aux().disconnect(peer);
     fut.await?;
 
-    control
-        .when_satisfied(control_expectation::peer_connected(peer, false), control_timeout())
-        .await?;
+    control.when_satisfied(expectation::peer_connected(peer, false), control_timeout()).await?;
 
     hci.destroy_and_wait().await?;
     Ok(())
diff --git a/src/connectivity/bluetooth/tests/integration/src/tests/host_driver.rs b/src/connectivity/bluetooth/tests/integration/src/tests/host_driver.rs
index fcf9acf..78798d6 100644
--- a/src/connectivity/bluetooth/tests/integration/src/tests/host_driver.rs
+++ b/src/connectivity/bluetooth/tests/integration/src/tests/host_driver.rs
@@ -5,8 +5,8 @@
 use {
     anyhow::{format_err, Context as _, Error},
     fidl_fuchsia_bluetooth::{DeviceClass, MAJOR_DEVICE_CLASS_TOY},
-    fidl_fuchsia_bluetooth_control::TechnologyType,
     fidl_fuchsia_bluetooth_host::HostProxy,
+    fidl_fuchsia_bluetooth_sys::TechnologyType,
     fidl_fuchsia_bluetooth_test::{EmulatorSettings, HciError, PeerProxy},
     fuchsia_async as fasync,
     fuchsia_bluetooth::{
@@ -15,18 +15,17 @@
         expectation::{self, asynchronous::ExpectableStateExt, peer},
         hci_emulator::Emulator,
         host,
-        types::{Address, HostInfo},
+        types::{Address, HostInfo, PeerId},
     },
     fuchsia_zircon as zx,
     std::{convert::TryInto, path::PathBuf},
 };
 
 use crate::harness::{
-    emulator,
+    emulator::{self, EmulatorHarness},
     expect::expect_eq,
     host_driver::{
-        expect_host_peer, expect_host_state, expect_no_peer, expect_remote_device,
-        timeout_duration, HostDriverHarness,
+        expect_host_state, expect_no_peer, expect_peer, timeout_duration, HostDriverHarness,
     },
 };
 
@@ -70,37 +69,37 @@
 }
 
 // Tests that the bt-host driver assigns the local name to "fuchsia" when initialized.
-async fn test_default_local_name(test_state: HostDriverHarness) -> Result<(), Error> {
+async fn test_default_local_name(harness: HostDriverHarness) -> Result<(), Error> {
     const NAME: &str = "fuchsia";
-    let _ = test_state
+    let _ = harness
         .when_satisfied(emulator::expectation::local_name_is(NAME), timeout_duration())
         .await?;
-    let fut = expect_host_state(&test_state, expectation::host_driver::name(NAME));
+    let fut = expect_host_state(&harness, expectation::host_driver::name(NAME));
     fut.await?;
     Ok(())
 }
 
 // Tests that the local name assigned to a bt-host is reflected in `AdapterState` and propagated
 // down to the controller.
-async fn test_set_local_name(test_state: HostDriverHarness) -> Result<(), Error> {
+async fn test_set_local_name(harness: HostDriverHarness) -> Result<(), Error> {
     const NAME: &str = "test1234";
-    let fut = test_state.aux().proxy().set_local_name(NAME);
+    let fut = harness.aux().proxy().set_local_name(NAME);
     fut.await?;
-    let _ = test_state
+    let _ = harness
         .when_satisfied(emulator::expectation::local_name_is(NAME), timeout_duration())
         .await?;
-    let fut = expect_host_state(&test_state, expectation::host_driver::name(NAME));
+    let fut = expect_host_state(&harness, expectation::host_driver::name(NAME));
     fut.await?;
 
     Ok(())
 }
 
 // Tests that the device class assigned to a bt-host gets propagated down to the controller.
-async fn test_set_device_class(test_state: HostDriverHarness) -> Result<(), Error> {
+async fn test_set_device_class(harness: HostDriverHarness) -> Result<(), Error> {
     let mut device_class = DeviceClass { value: MAJOR_DEVICE_CLASS_TOY + 4 };
-    let fut = test_state.aux().proxy().set_device_class(&mut device_class);
+    let fut = harness.aux().proxy().set_device_class(&mut device_class);
     fut.await?;
-    let _ = test_state
+    let _ = harness
         .when_satisfied(emulator::expectation::device_class_is(device_class), timeout_duration())
         .await?;
     Ok(())
@@ -108,244 +107,240 @@
 
 // Tests that host state updates when discoverable mode is turned on.
 // TODO(armansito): Test for FakeHciDevice state changes.
-async fn test_discoverable(test_state: HostDriverHarness) -> Result<(), Error> {
+async fn test_discoverable(harness: HostDriverHarness) -> Result<(), Error> {
     // Enable discoverable mode.
-    let fut = test_state.aux().proxy().set_discoverable(true);
+    let fut = harness.aux().proxy().set_discoverable(true);
     fut.await?;
-    expect_host_state(&test_state, expectation::host_driver::discoverable(true)).await?;
+    expect_host_state(&harness, expectation::host_driver::discoverable(true)).await?;
 
     // Disable discoverable mode
-    let fut = test_state.aux().proxy().set_discoverable(false);
+    let fut = harness.aux().proxy().set_discoverable(false);
     fut.await?;
-    expect_host_state(&test_state, expectation::host_driver::discoverable(false)).await?;
+    expect_host_state(&harness, expectation::host_driver::discoverable(false)).await?;
 
     Ok(())
 }
 
 // Tests that host state updates when discovery is started and stopped.
 // TODO(armansito): Test for FakeHciDevice state changes.
-async fn test_discovery(test_state: HostDriverHarness) -> Result<(), Error> {
+async fn test_discovery(harness: HostDriverHarness) -> Result<(), Error> {
     // Start discovery. "discovering" should get set to true.
-    let fut = test_state.aux().proxy().start_discovery();
+    let fut = harness.aux().proxy().start_discovery();
     fut.await?;
-    expect_host_state(&test_state, expectation::host_driver::discovering(true)).await?;
+    expect_host_state(&harness, expectation::host_driver::discovering(true)).await?;
 
     let address = Address::Random([1, 0, 0, 0, 0, 0]);
-    let fut = test_state.aux().add_le_peer_default(&address);
+    let fut = harness.aux().add_le_peer_default(&address);
     let _peer = fut.await?;
 
     // The host should discover a fake peer.
-    expect_host_peer(&test_state, peer::name("Fake").and(peer::address(&address.to_string())))
-        .await?;
+    expect_peer(&harness, peer::name("Fake").and(peer::address(address))).await?;
 
     // Stop discovery. "discovering" should get set to false.
-    let fut = test_state.aux().proxy().stop_discovery();
+    let fut = harness.aux().proxy().stop_discovery();
     fut.await?;
-    expect_host_state(&test_state, expectation::host_driver::discovering(false)).await?;
+    expect_host_state(&harness, expectation::host_driver::discovering(false)).await?;
 
     Ok(())
 }
 
 // Tests that "close" cancels all operations.
 // TODO(armansito): Test for FakeHciDevice state changes.
-async fn test_close(test_state: HostDriverHarness) -> Result<(), Error> {
+async fn test_close(harness: HostDriverHarness) -> Result<(), Error> {
     // Enable all procedures.
-    let fut = test_state.aux().proxy().start_discovery();
+    let fut = harness.aux().proxy().start_discovery();
     fut.await?;
-    let fut = test_state.aux().proxy().set_discoverable(true);
+    let fut = harness.aux().proxy().set_discoverable(true);
     fut.await?;
     let active_state = expectation::host_driver::discoverable(true)
         .and(expectation::host_driver::discovering(true));
-    expect_host_state(&test_state, active_state).await?;
+    expect_host_state(&harness, active_state).await?;
 
     // Close should cancel these procedures.
-    test_state.aux().proxy().close()?;
+    harness.aux().proxy().close()?;
 
     let closed_state_update = expectation::host_driver::discoverable(false)
         .and(expectation::host_driver::discovering(false));
 
-    expect_host_state(&test_state, closed_state_update).await?;
+    expect_host_state(&harness, closed_state_update).await?;
 
     Ok(())
 }
 
-// Tests that "list_devices" returns devices from a host's cache.
-async fn test_list_devices(test_state: HostDriverHarness) -> Result<(), Error> {
-    // Devices should be initially empty.
-    let fut = test_state.aux().proxy().list_devices();
-    let devices = fut.await?;
-    expect_eq!(vec![], devices)?;
+async fn test_watch_peers(harness: HostDriverHarness) -> Result<(), Error> {
+    // `HostDriverHarness` internally calls `Host.WatchPeers()` to monitor peers and satisfy peer
+    // expectations. `harness.peers()` represents the local cache monitored using this method.
+    // Peers should be initially empty.
+    expect_eq!(0, harness.state().peers().len())?;
+
+    // Calling `Host.WatchPeers()` directly will hang since the harness already calls this
+    // internally. We issue our own request and verify that it gets satisfied later.
 
     // Add a LE and a BR/EDR peer with the given addresses.
     let le_peer_address = Address::Random([1, 0, 0, 0, 0, 0]);
     let bredr_peer_address = Address::Public([2, 0, 0, 0, 0, 0]);
 
-    let fut = test_state.aux().add_le_peer_default(&le_peer_address);
+    let fut = harness.aux().add_le_peer_default(&le_peer_address);
     let _le_peer = fut.await?;
-    let fut = test_state.aux().add_bredr_peer_default(&bredr_peer_address);
+    let fut = harness.aux().add_bredr_peer_default(&bredr_peer_address);
     let _bredr_peer = fut.await?;
 
+    // At this stage the fake peers are registered with the emulator but bt-host does not know about
+    // them yet. Check that `watch_fut` is still unsatisfied.
+    expect_eq!(0, harness.state().peers().len())?;
+
     // Wait for all fake devices to be discovered.
-    let fut = test_state.aux().proxy().start_discovery();
-    fut.await?;
-    let expected_le = peer::address(&le_peer_address.to_string())
-        .and(peer::technology(TechnologyType::LowEnergy));
-    let expected_bredr = peer::address(&bredr_peer_address.to_string())
-        .and(peer::technology(TechnologyType::Classic));
+    let proxy = harness.aux().proxy().clone();
+    let _ = proxy.start_discovery().await?;
+    let expected_le =
+        peer::address(le_peer_address).and(peer::technology(TechnologyType::LowEnergy));
+    let expected_bredr =
+        peer::address(bredr_peer_address).and(peer::technology(TechnologyType::Classic));
 
-    expect_host_peer(&test_state, expected_le.clone()).await?;
-    expect_host_peer(&test_state, expected_bredr.clone()).await?;
+    expect_peer(&harness, expected_le).await?;
+    expect_peer(&harness, expected_bredr).await?;
+    expect_eq!(2, harness.state().peers().len())?;
 
-    // List the host's devices
-    let fut = test_state.aux().proxy().list_devices();
-    let devices = fut.await?;
-
-    // Both fake devices should be in the map.
-    expect_eq!(2, devices.len())?;
-    expect_remote_device(&test_state, &le_peer_address.to_string(), &expected_le)?;
-    expect_remote_device(&test_state, &bredr_peer_address.to_string(), &expected_bredr)?;
     Ok(())
 }
 
-async fn test_connect(test_state: HostDriverHarness) -> Result<(), Error> {
+async fn test_connect(harness: HostDriverHarness) -> Result<(), Error> {
     let address1 = Address::Random([1, 0, 0, 0, 0, 0]);
     let address2 = Address::Random([2, 0, 0, 0, 0, 0]);
-    let fut = test_state.aux().add_le_peer_default(&address1);
+    let fut = harness.aux().add_le_peer_default(&address1);
     let _peer1 = fut.await?;
-    let fut = test_state.aux().add_le_peer_default(&address2);
+    let fut = harness.aux().add_le_peer_default(&address2);
     let peer2 = fut.await?;
 
     // Configure `peer2` to return an error for the connection attempt.
     let _ = peer2.assign_connection_status(HciError::ConnectionTimeout).await?;
 
     // Start discovery and let bt-host process the fake devices.
-    let fut = test_state.aux().proxy().start_discovery();
+    let fut = harness.aux().proxy().start_discovery();
     fut.await?;
 
-    let le_dev = peer::address(&address1.to_string());
-    let le_error_dev = peer::address(&address2.to_string());
+    expect_peer(&harness, peer::address(address1)).await?;
+    expect_peer(&harness, peer::address(address2)).await?;
 
-    expect_host_peer(&test_state, le_dev).await?;
-    expect_host_peer(&test_state, le_error_dev).await?;
-
-    let fut = test_state.aux().proxy().list_devices();
-    let devices = fut.await?;
-    expect_true!(devices.len() >= 2)?;
+    let peers = harness.state().peers().clone();
+    expect_eq!(2, peers.len())?;
 
     // Obtain bt-host assigned IDs of the devices.
-    let success_dev = devices
+    let success_id = peers
         .iter()
-        .find(|x| x.address == address1.to_string())
-        .ok_or(format_err!("success peer not found"))?;
-    let failure_dev = devices
+        .find(|x| x.1.address == address1)
+        .ok_or(format_err!("success peer not found"))?
+        .0;
+    let failure_id = peers
         .iter()
-        .find(|x| x.address == address2.to_string())
-        .ok_or(format_err!("error peer not found"))?;
+        .find(|x| x.1.address == address2)
+        .ok_or(format_err!("error peer not found"))?
+        .0;
 
     // Connecting to the failure peer should result in an error.
-    let fut = test_state.aux().proxy().connect(&failure_dev.identifier);
+    let fut = harness.aux().proxy().connect(&failure_id.to_string());
     let status = fut.await?;
     expect_true!(status.error.is_some())?;
 
     // Connecting to the success peer should return success and the peer should become connected.
-    let fut = test_state.aux().proxy().connect(&success_dev.identifier);
+    let fut = harness.aux().proxy().connect(&success_id.to_string());
     let status = fut.await?;
     expect_true!(status.error.is_none())?;
 
-    let connected = peer::identifier(&success_dev.identifier).and(peer::connected(true));
-    expect_host_peer(&test_state, connected).await?;
+    let connected = peer::identifier(*success_id).and(peer::connected(true));
+    expect_peer(&harness, connected).await?;
     Ok(())
 }
 
-async fn wait_for_test_device(
-    test_state: HostDriverHarness,
+async fn wait_for_test_peer(
+    harness: HostDriverHarness,
     address: &Address,
-) -> Result<(String, PeerProxy), Error> {
-    let fut = test_state.aux().add_le_peer_default(&address);
-    let peer = fut.await?;
+) -> Result<(PeerId, PeerProxy), Error> {
+    let fut = harness.aux().add_le_peer_default(&address);
+    let proxy = fut.await?;
 
     // Start discovery and let bt-host process the fake LE peer.
-    let fut = test_state.aux().proxy().start_discovery();
+    let fut = harness.aux().proxy().start_discovery();
     fut.await?;
-    let le_dev = expectation::peer::address(&address.to_string());
-    expect_host_peer(&test_state, le_dev).await?;
-    let fut = test_state.aux().proxy().list_devices();
-    let devices = fut.await?;
-    expect_true!(devices.len() == 1)?;
+    let le_dev = expectation::peer::address(address.clone());
+    expect_peer(&harness, le_dev).await?;
 
-    // Obtain bt-host assigned IDs of the device.
-    let success_dev = devices
+    let peer_id = harness
+        .state()
+        .peers()
         .iter()
-        .find(|x| x.address == address.to_string())
-        .ok_or(format_err!("success peer not found"))?;
-
-    Ok((success_dev.identifier.clone(), peer))
+        .find(|(_, p)| p.address == *address)
+        .ok_or(format_err!("could not find peer with address: {}", address))?
+        .0
+        .clone();
+    Ok((peer_id, proxy))
 }
 
 // TODO(BT-932) - Add a test for disconnect failure when a connection attempt is outgoing, provided
 // that we can provide a manner of doing so that will not flake.
 
 /// Disconnecting from an unknown device should succeed
-async fn test_disconnect_unknown_device(test_state: HostDriverHarness) -> Result<(), Error> {
+async fn test_disconnect_unknown_device(harness: HostDriverHarness) -> Result<(), Error> {
     let unknown_id = "0123401234";
-    let fut = test_state.aux().proxy().disconnect(unknown_id);
+    let fut = harness.aux().proxy().disconnect(unknown_id);
     let status = fut.await?;
     expect_eq!(status.error, None)
 }
 
 /// Disconnecting from a known, unconnected device should succeed
-async fn test_disconnect_unconnected_device(test_state: HostDriverHarness) -> Result<(), Error> {
+async fn test_disconnect_unconnected_device(harness: HostDriverHarness) -> Result<(), Error> {
     let address = Address::Random([1, 0, 0, 0, 0, 0]);
-    let (success_dev, _proxy) = wait_for_test_device(test_state.clone(), &address).await?;
-    let fut = test_state.aux().proxy().disconnect(&success_dev);
+    let (success_dev, _proxy) = wait_for_test_peer(harness.clone(), &address).await?;
+    let fut = harness.aux().proxy().disconnect(&success_dev.to_string());
     let status = fut.await?;
     expect_eq!(status.error, None)
 }
 
 /// Disconnecting from a connected device should succeed and result in the device being disconnected
-async fn test_disconnect_connected_device(test_state: HostDriverHarness) -> Result<(), Error> {
+async fn test_disconnect_connected_device(harness: HostDriverHarness) -> Result<(), Error> {
     let address = Address::Random([1, 0, 0, 0, 0, 0]);
-    let (success_dev, _proxy) = wait_for_test_device(test_state.clone(), &address).await?;
+    let (success_dev, _proxy) = wait_for_test_peer(harness.clone(), &address).await?;
+    let success_dev = success_dev.to_string();
 
-    let fut = test_state.aux().proxy().connect(&success_dev);
+    let fut = harness.aux().proxy().connect(&success_dev);
     let status = fut.await?;
     expect_eq!(status.error, None)?;
 
-    let connected = peer::address(&address.to_string()).and(peer::connected(true));
-    let disconnected = peer::address(&address.to_string()).and(peer::connected(false));
+    let connected = peer::address(address).and(peer::connected(true));
+    let disconnected = peer::address(address).and(peer::connected(false));
 
-    let _ = expect_host_peer(&test_state, connected).await?;
-    let fut = test_state.aux().proxy().disconnect(&success_dev);
+    let _ = expect_peer(&harness, connected).await?;
+    let fut = harness.aux().proxy().disconnect(&success_dev);
     let status = fut.await?;
     expect_eq!(status.error, None)?;
-    let _ = expect_host_peer(&test_state, disconnected).await?;
+    let _ = expect_peer(&harness, disconnected).await?;
     Ok(())
 }
 
-async fn test_forget(test_state: HostDriverHarness) -> Result<(), Error> {
+async fn test_forget(harness: HostDriverHarness) -> Result<(), Error> {
     let address = Address::Random([1, 0, 0, 0, 0, 0]);
-    let (le_peer, _proxy) = wait_for_test_device(test_state.clone(), &address).await?;
+    let (le_peer, _proxy) = wait_for_test_peer(harness.clone(), &address).await?;
 
     // Start discovery and let bt-host process the fake peers.
-    let fut = test_state.aux().proxy().start_discovery();
+    let fut = harness.aux().proxy().start_discovery();
     fut.await?;
 
     // Wait for fake peer to be discovered.
-    let expected_peer = expectation::peer::address(&address.to_string());
-    expect_host_peer(&test_state, expected_peer.clone()).await?;
+    let expected_peer = expectation::peer::address(address);
+    expect_peer(&harness, expected_peer.clone()).await?;
 
     // Connecting to the peer should return success and the peer should become connected.
-    let fut = test_state.aux().proxy().connect(&le_peer);
+    let fut = harness.aux().proxy().connect(&le_peer.to_string());
     let mut status = fut.await?;
     expect_true!(status.error.is_none())?;
 
-    expect_host_peer(&test_state, expected_peer.and(expectation::peer::connected(true))).await?;
+    expect_peer(&harness, expected_peer.and(expectation::peer::connected(true))).await?;
 
     // Forgetting the peer should result in its removal.
-    let fut = test_state.aux().proxy().forget(&le_peer);
+    let fut = harness.aux().proxy().forget(&le_peer.to_string());
     status = fut.await?;
     expect_true!(status.error.is_none())?;
-    expect_no_peer(&test_state, le_peer).await?;
+    expect_no_peer(&harness, le_peer).await?;
 
     // TODO(BT-879): Test that the link closes by querying fake HCI.
 
@@ -364,7 +359,7 @@
             test_discoverable,
             test_discovery,
             test_close,
-            test_list_devices,
+            test_watch_peers,
             test_connect,
             test_forget,
             test_disconnect_unknown_device,