| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use failure::format_err; |
| use fidl::endpoints::{RequestStream, ServerEnd}; |
| use fidl_fuchsia_wlan_device_service::{self as fidl_svc, DeviceWatcherControlHandle, DeviceWatcherRequestStream}; |
| use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; |
| use futures::prelude::*; |
| use futures::try_join; |
| use log::error; |
| use parking_lot::Mutex; |
| use std::collections::HashMap; |
| use std::sync::Arc; |
| |
| use crate::Never; |
| use crate::watchable_map::{MapEvent, WatchableMap}; |
| |
| // In reality, P and I are always PhyDevice and IfaceDevice, respectively. |
| // They are generic solely for the purpose of mocking for tests. |
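| /// Returns a `WatcherService` handle together with the future that drives |
| /// event delivery; the future must be polled (e.g. spawned on an executor) |
| /// or watchers will never receive any events. |
| /// |
| /// A minimal wiring sketch, mirroring `setup()` in the tests below: |
| /// |
| /// ```ignore |
| /// let (phys, phy_events) = WatchableMap::new(); |
| /// let (ifaces, iface_events) = WatchableMap::new(); |
| /// let (service, fut) = serve_watchers( |
| ///     Arc::new(phys), Arc::new(ifaces), phy_events, iface_events); |
| /// // `service.add_watcher(server_end)` registers FIDL clients; |
| /// // `fut` must then be polled alongside the rest of the server. |
| /// ``` |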
| pub fn serve_watchers<P, I>(phys: Arc<WatchableMap<u16, P>>, |
| ifaces: Arc<WatchableMap<u16, I>>, |
| phy_events: UnboundedReceiver<MapEvent<u16, P>>, |
| iface_events: UnboundedReceiver<MapEvent<u16, I>>) |
| -> (WatcherService<P, I>, impl Future<Output = Result<Never, failure::Error>>) |
| where P: 'static, I: 'static |
| { |
| let inner = Arc::new(Mutex::new(Inner { |
| watchers: HashMap::new(), |
| next_watcher_id: 0, |
| phys, |
| ifaces |
| })); |
| let (reaper_sender, reaper_receiver) = mpsc::unbounded(); |
| let s = WatcherService { |
| inner: Arc::clone(&inner), |
| reaper_queue: reaper_sender |
| }; |
| |
| let fut = async move { |
| let phy_fut = notify_phy_watchers(phy_events, &inner); |
| let iface_fut = notify_iface_watchers(iface_events, &inner); |
| let reaper_fut = reap_watchers(&inner, reaper_receiver); |
| try_join!(phy_fut, iface_fut, reaper_fut) |
| .map(|x: (Never, Never, Never)| x.0) |
| }; |
| (s, fut) |
| } |
| |
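| /// A handle for registering `DeviceWatcher` clients. Clones are cheap and |
| /// share the same underlying watcher table and reaper queue. |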
| pub struct WatcherService<P, I> { |
| inner: Arc<Mutex<Inner<P, I>>>, |
| reaper_queue: UnboundedSender<ReaperTask> |
| } |
| |
| // Manual Clone impl since #[derive(Clone)] would incorrectly require |
| // `P: Clone` and `I: Clone` bounds |
| impl<P, I> Clone for WatcherService<P, I> { |
| fn clone(&self) -> Self { |
| WatcherService { |
| inner: self.inner.clone(), |
| reaper_queue: self.reaper_queue.clone() |
| } |
| } |
| } |
| |
| impl<P, I> WatcherService<P, I> { |
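| /// Registers a FIDL client as a watcher. The request stream is handed to |
| /// the reaper so the watcher is removed once its channel closes, and phy |
| /// and iface snapshots are requested so the new watcher sees the current |
| /// device state as a series of "added" events. |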
| pub fn add_watcher(&self, endpoint: ServerEnd<fidl_svc::DeviceWatcherMarker>) |
| -> Result<(), fidl::Error> |
| { |
| let stream = endpoint.into_stream()?; |
| let handle = stream.control_handle(); |
| let mut guard = self.inner.lock(); |
| let inner = &mut *guard; |
| self.reaper_queue.unbounded_send(ReaperTask { |
| watcher_channel: stream, |
| watcher_id: inner.next_watcher_id |
| }).expect("failed to submit a task to the watcher reaper"); |
| inner.watchers.insert(inner.next_watcher_id, Watcher { |
| handle, |
| sent_phy_snapshot: false, |
| sent_iface_snapshot: false |
| }); |
| inner.phys.request_snapshot(); |
| inner.ifaces.request_snapshot(); |
| inner.next_watcher_id += 1; |
| Ok(()) |
| } |
| } |
| |
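| /// State shared between the `WatcherService` handle and the futures that |
| /// forward device-map events to watchers. |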
| struct Inner<P, I> { |
| watchers: HashMap<u64, Watcher>, |
| next_watcher_id: u64, |
| phys: Arc<WatchableMap<u16, P>>, |
| ifaces: Arc<WatchableMap<u16, I>>, |
| } |
| |
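| /// Per-client state: the control handle used to send events, plus flags |
| /// recording whether the initial phy and iface snapshots have been sent. |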
| struct Watcher { |
| handle: DeviceWatcherControlHandle, |
| sent_phy_snapshot: bool, |
| sent_iface_snapshot: bool, |
| } |
| |
| impl<P, I> Inner<P, I> { |
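| /// Sends `send_event` to every watcher that has already received its |
| /// snapshot, dropping any watcher whose channel fails. Watchers that have |
| /// not seen a snapshot yet will pick up the change from the snapshot itself. |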
| fn notify_watchers<F, G>(&mut self, sent_snapshot: F, send_event: G) |
| where F: Fn(&Watcher) -> bool, |
| G: Fn(&DeviceWatcherControlHandle) -> Result<(), fidl::Error> |
| { |
| self.watchers.retain(|_, w| { |
| if sent_snapshot(w) { |
| let r = send_event(&w.handle); |
| handle_send_result(&w.handle, r) |
| } else { |
| true |
| } |
| }) |
| } |
| |
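| /// Replays `snapshot` to every watcher that has not received one yet, |
| /// sending one "added" event per key. A watcher whose channel fails |
| /// mid-snapshot is dropped from the table. |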
| fn send_snapshot<F, G, T>(&mut self, sent_snapshot: F, send_on_add: G, |
| snapshot: Arc<HashMap<u16, T>>) |
| where F: Fn(&mut Watcher) -> &mut bool, |
| G: Fn(&DeviceWatcherControlHandle, u16) -> Result<(), fidl::Error> |
| { |
| self.watchers.retain(|_, w| { |
| if !*sent_snapshot(w) { |
| for key in snapshot.keys() { |
| let r = send_on_add(&w.handle, *key); |
| if !handle_send_result(&w.handle, r) { |
| return false; |
| } |
| } |
| *sent_snapshot(w) = true; |
| } |
| true |
| }) |
| } |
| } |
| |
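| /// Logs a send error and shuts down the offending channel. Returns `true` |
| /// if the send succeeded, i.e. if the watcher should be retained. |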
| fn handle_send_result(handle: &DeviceWatcherControlHandle, r: Result<(), fidl::Error>) -> bool { |
| if let Err(e) = r.as_ref() { |
| error!("Error sending event to watcher: {}", e); |
| handle.shutdown(); |
| } |
| r.is_ok() |
| } |
| |
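| /// Forwards phy map events to all watchers until the event stream ends, |
| /// which is unexpected and therefore reported as an error. |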
| async fn notify_phy_watchers<P, I>(mut events: UnboundedReceiver<MapEvent<u16, P>>, |
| inner: &Mutex<Inner<P, I>>) |
| -> Result<Never, failure::Error> |
| { |
| while let Some(e) = await!(events.next()) { |
| match e { |
| MapEvent::KeyInserted(id) => inner.lock().notify_watchers( |
| |w| w.sent_phy_snapshot, |h| h.send_on_phy_added(id)), |
| MapEvent::KeyRemoved(id) => inner.lock().notify_watchers( |
| |w| w.sent_phy_snapshot, |h| h.send_on_phy_removed(id)), |
| MapEvent::Snapshot(s) => inner.lock().send_snapshot( |
| |w| &mut w.sent_phy_snapshot, |h, id| h.send_on_phy_added(id), s) |
| } |
| } |
| Err(format_err!("stream of events from the phy device map has ended unexpectedly")) |
| } |
| |
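| /// Same as `notify_phy_watchers`, but for iface map events. |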
| async fn notify_iface_watchers<P, I>(mut events: UnboundedReceiver<MapEvent<u16, I>>, |
| inner: &Mutex<Inner<P, I>>) |
| -> Result<Never, failure::Error> |
| { |
| while let Some(e) = await!(events.next()) { |
| match e { |
| MapEvent::KeyInserted(id) => inner.lock().notify_watchers( |
| |w| w.sent_iface_snapshot, |h| h.send_on_iface_added(id)), |
| MapEvent::KeyRemoved(id) => inner.lock().notify_watchers( |
| |w| w.sent_iface_snapshot, |h| h.send_on_iface_removed(id)), |
| MapEvent::Snapshot(s) => inner.lock().send_snapshot( |
| |w| &mut w.sent_iface_snapshot, |h, id| h.send_on_iface_added(id), s) |
| } |
| } |
| Err(format_err!("stream of events from the iface device map has ended unexpectedly")) |
| } |
| |
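| /// A request stream to watch for closure, paired with the id of the watcher |
| /// to remove from the table when that happens. |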
| struct ReaperTask { |
| watcher_channel: DeviceWatcherRequestStream, |
| watcher_id: u64, |
| } |
| |
| /// A future that removes watchers from the watcher table when their FIDL |
| /// channels are closed. Performing this cleanup only when a notification |
| /// fails would not be sufficient: if devices are neither added nor removed |
| /// while clients come and go, the watcher table could grow without bound. |
| async fn reap_watchers<P, I>(inner: &Mutex<Inner<P, I>>, watchers: UnboundedReceiver<ReaperTask>) |
| -> Result<Never, failure::Error> |
| { |
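| // Bound the number of channels polled concurrently so that a flood of |
| // clients cannot exhaust resources; excess tasks wait in the queue. |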
| const REAP_CONCURRENT_LIMIT: usize = 10000; |
| await!(watchers.for_each_concurrent(REAP_CONCURRENT_LIMIT, move |w| { |
| // Wait for the other side to close the channel (or for an error to occur), |
| // then remove the watcher from the table |
| async move { |
| await!(w.watcher_channel.map(|_| ()).collect::<()>()); |
| inner.lock().watchers.remove(&w.watcher_id); |
| } |
| })); |
| Err(format_err!("stream of watcher channels has ended unexpectedly")) |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use fidl_fuchsia_wlan_device_service::DeviceWatcherEvent; |
| use fuchsia_async as fasync; |
| use fuchsia_zircon as zx; |
| use futures::task::Poll; |
| use pin_utils::pin_mut; |
| use std::mem; |
| |
| #[test] |
| fn reap_watchers() { |
| let exec = &mut fasync::Executor::new().expect("Failed to create an executor"); |
| let (helper, future) = setup(); |
| pin_mut!(future); |
| assert_eq!(0, helper.service.inner.lock().watchers.len()); |
| let (client_end, server_end) = fidl::endpoints::create_endpoints() |
| .expect("Failed to create endpoints"); |
| |
| // Add a watcher and check that it was added to the map |
| helper.service.add_watcher(server_end).expect("add_watcher failed"); |
| assert_eq!(1, helper.service.inner.lock().watchers.len()); |
| |
| // Run the reaper and make sure the watcher is still there |
| if let Poll::Ready(Err(e)) = exec.run_until_stalled(&mut future) { |
| panic!("future returned an error (1): {:?}", e); |
| } |
| assert_eq!(1, helper.service.inner.lock().watchers.len()); |
| |
| // Drop the client end of the channel and run the reaper again |
| mem::drop(client_end); |
| if let Poll::Ready(Err(e)) = exec.run_until_stalled(&mut future) { |
| panic!("future returned an error (1): {:?}", e); |
| } |
| assert_eq!(0, helper.service.inner.lock().watchers.len()); |
| } |
| |
| #[test] |
| fn add_remove_phys() { |
| let exec = &mut fasync::Executor::new().expect("Failed to create an executor"); |
| let (helper, future) = setup(); |
| pin_mut!(future); |
| let (proxy, server_end) = fidl::endpoints::create_proxy() |
| .expect("Failed to create endpoints"); |
| helper.service.add_watcher(server_end).expect("add_watcher failed"); |
| |
| helper.phys.insert(20, 2000); |
| helper.phys.insert(30, 3000); |
| helper.phys.remove(&20); |
| |
| // Run the server future to propagate the events to FIDL clients |
| if let Poll::Ready(Err(e)) = exec.run_until_stalled(&mut future) { |
| panic!("server future returned an error: {:?}", e); |
| } |
| |
| let events = fetch_events(exec, proxy.take_event_stream()); |
| assert_eq!(3, events.len()); |
| // Sadly, the generated event struct doesn't implement PartialEq, |
| // so match on each event by hand |
| match &events[0] { |
| &DeviceWatcherEvent::OnPhyAdded{ phy_id: 20 } => {}, |
| other => panic!("Expected OnPhyAdded(20), got {:?}", other) |
| } |
| match &events[1] { |
| &DeviceWatcherEvent::OnPhyAdded{ phy_id: 30 } => {}, |
| other => panic!("Expected OnPhyAdded(30), got {:?}", other) |
| } |
| match &events[2] { |
| &DeviceWatcherEvent::OnPhyRemoved{ phy_id: 20 } => {}, |
| other => panic!("Expected OnPhyRemoved(20), got {:?}", other) |
| } |
| } |
| |
| #[test] |
| fn add_remove_ifaces() { |
| let exec = &mut fasync::Executor::new().expect("Failed to create an executor"); |
| let (helper, future) = setup(); |
| pin_mut!(future); |
| let (proxy, server_end) = fidl::endpoints::create_proxy() |
| .expect("Failed to create endpoints"); |
| helper.service.add_watcher(server_end).expect("add_watcher failed"); |
| |
| helper.ifaces.insert(50, 5000); |
| helper.ifaces.remove(&50); |
| |
| // Run the server future to propagate the events to FIDL clients |
| if let Poll::Ready(Err(e)) = exec.run_until_stalled(&mut future) { |
| panic!("server future returned an error: {:?}", e); |
| } |
| |
| let events = fetch_events(exec, proxy.take_event_stream()); |
| assert_eq!(2, events.len()); |
| match &events[0] { |
| &DeviceWatcherEvent::OnIfaceAdded{ iface_id: 50 } => {}, |
| other => panic!("Expected OnIfaceAdded(50), got {:?}", other) |
| } |
| match &events[1] { |
| &DeviceWatcherEvent::OnIfaceRemoved{ iface_id: 50 } => {}, |
| other => panic!("Expected OnIfaceRemoved(50), got {:?}", other) |
| } |
| } |
| |
| #[test] |
| fn snapshot_phys() { |
| let exec = &mut fasync::Executor::new().expect("Failed to create an executor"); |
| let (helper, future) = setup(); |
| pin_mut!(future); |
| |
| // Add and remove phys before the watcher is added |
| helper.phys.insert(20, 2000); |
| helper.phys.insert(30, 3000); |
| helper.phys.remove(&20); |
| |
| // Now add the watcher and pump the events |
| let (proxy, server_end) = fidl::endpoints::create_proxy() |
| .expect("Failed to create endpoints"); |
| helper.service.add_watcher(server_end).expect("add_watcher failed"); |
| if let Poll::Ready(Err(e)) = exec.run_until_stalled(&mut future) { |
| panic!("server future returned an error: {:?}", e); |
| } |
| |
| // The watcher should only see phy #30 being "added" |
| let events = fetch_events(exec, proxy.take_event_stream()); |
| assert_eq!(1, events.len()); |
| match &events[0] { |
| &DeviceWatcherEvent::OnPhyAdded{ phy_id: 30 } => {}, |
| other => panic!("Expected OnPhyAdded(30), got {:?}", other) |
| } |
| } |
| |
| #[test] |
| fn snapshot_ifaces() { |
| let exec = &mut fasync::Executor::new().expect("Failed to create an executor"); |
| let (helper, future) = setup(); |
| pin_mut!(future); |
| |
| // Add and remove ifaces before the watcher is added |
| helper.ifaces.insert(20, 2000); |
| helper.ifaces.insert(30, 3000); |
| helper.ifaces.remove(&20); |
| |
| // Now add the watcher and pump the events |
| let (proxy, server_end) = fidl::endpoints::create_proxy() |
| .expect("Failed to create endpoints"); |
| helper.service.add_watcher(server_end).expect("add_watcher failed"); |
| if let Poll::Ready(Err(e)) = exec.run_until_stalled(&mut future) { |
| panic!("server future returned an error: {:?}", e); |
| } |
| |
| // The watcher should only see iface #30 being "added" |
| let events = fetch_events(exec, proxy.take_event_stream()); |
| assert_eq!(1, events.len()); |
| match &events[0] { |
| &DeviceWatcherEvent::OnIfaceAdded{ iface_id: 30 } => {}, |
| other => panic!("Expected OnIfaceAdded(30), got {:?}", other) |
| } |
| } |
| |
| #[test] |
| fn two_watchers() { |
| let exec = &mut fasync::Executor::new().expect("Failed to create an executor"); |
| let (helper, future) = setup(); |
| pin_mut!(future); |
| |
| helper.ifaces.insert(20, 2000); |
| |
| // Add first watcher |
| let (proxy_one, server_end_one) = fidl::endpoints::create_proxy() |
| .expect("Failed to create endpoints"); |
| helper.service.add_watcher(server_end_one).expect("add_watcher failed (1)"); |
| |
| // Add second watcher |
| let (proxy_two, server_end_two) = fidl::endpoints::create_proxy() |
| .expect("Failed to create endpoints"); |
| helper.service.add_watcher(server_end_two).expect("add_watcher failed (2)"); |
| |
| // Deliver events |
| if let Poll::Ready(Err(e)) = exec.run_until_stalled(&mut future) { |
| panic!("server future returned an error: {:?}", e); |
| } |
| |
| // Each watcher should receive exactly one snapshot, even though a snapshot |
| // was requested for each call to add_watcher |
| let events_one = fetch_events(exec, proxy_one.take_event_stream()); |
| assert_eq!(1, events_one.len()); |
| let events_two = fetch_events(exec, proxy_two.take_event_stream()); |
| assert_eq!(1, events_two.len()); |
| } |
| |
| #[test] |
| fn remove_watcher_on_send_error() { |
| let exec = &mut fasync::Executor::new().expect("Failed to create an executor"); |
| let (helper, future) = setup(); |
| pin_mut!(future); |
| |
| let (client_chan, server_chan) = zx::Channel::create().unwrap(); |
| // Strip the WRITE right from the server end to guarantee that sending an event fails |
| let server_handle: zx::Handle = server_chan.into(); |
| let reduced_chan: zx::Channel = server_handle |
| .replace(zx::Rights::READ | zx::Rights::WAIT) |
| .unwrap() |
| .into(); |
| |
| helper.service.add_watcher(ServerEnd::new(reduced_chan)) |
| .expect("add_watcher failed"); |
| if let Poll::Ready(Err(e)) = exec.run_until_stalled(&mut future) { |
| panic!("future returned an error (1): {:?}", e); |
| } |
| assert_eq!(1, helper.service.inner.lock().watchers.len()); |
| |
| // Now add a phy to trigger an event |
| helper.phys.insert(20, 2000); |
| |
| // The watcher should now be removed since sending the event fails |
| if let Poll::Ready(Err(e)) = exec.run_until_stalled(&mut future) { |
| panic!("future returned an error (2): {:?}", e); |
| } |
| assert_eq!(0, helper.service.inner.lock().watchers.len()); |
| |
| // Make sure the client endpoint is only dropped at the end, so that the watcher |
| // is not removed by the reaper future |
| mem::drop(client_chan); |
| } |
| |
| struct Helper { |
| phys: Arc<WatchableMap<u16, i32>>, |
| ifaces: Arc<WatchableMap<u16, i32>>, |
| service: WatcherService<i32, i32>, |
| } |
| |
| fn setup() -> (Helper, impl Future<Output = Result<Never, failure::Error>>) { |
| let (phys, phy_events) = WatchableMap::new(); |
| let (ifaces, iface_events) = WatchableMap::new(); |
| let phys = Arc::new(phys); |
| let ifaces = Arc::new(ifaces); |
| let (service, future) = serve_watchers(phys.clone(), ifaces.clone(), phy_events, iface_events); |
| let helper = Helper { phys, ifaces, service }; |
| (helper, future) |
| } |
| |
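| // Drives the event stream until it stalls, collecting every event |
| // delivered so far into a Vec. |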
| fn fetch_events(exec: &mut fasync::Executor, |
| stream: fidl_svc::DeviceWatcherEventStream) -> Vec<DeviceWatcherEvent> { |
| let events = Arc::new(Mutex::new(Some(Vec::new()))); |
| let events_two = events.clone(); |
| let mut event_fut = stream |
| .try_for_each(move |e| future::ready(Ok(events_two.lock().as_mut().unwrap().push(e)))); |
| if let Poll::Ready(Err(e)) = exec.run_until_stalled(&mut event_fut) { |
| panic!("event stream future returned an error: {:?}", e); |
| } |
| let events = events.lock().take().unwrap(); |
| events |
| } |
| } |