blob: f004113f46fecfef9d11fd022fa88b171a0c989a [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use async_utils::hanging_get::server as hanging_get;
use fidl::endpoints::ControlHandle;
use fuchsia_component::server::{ServiceFsDir, ServiceObjLocal};
use futures::lock::Mutex;
use futures::{TryFutureExt as _, TryStreamExt as _};
use std::sync::Arc;
use tracing::error;
use {fidl_fuchsia_net_reachability as freachability, fuchsia_async as fasync};
/// Responder used to complete a pending `Monitor.Watch` request.
type WatchResponder = freachability::MonitorWatchResponder;
/// Callback handed to the hanging-get broker: given a snapshot and a waiting responder,
/// it delivers the snapshot and returns a `bool`. The implementation installed by
/// `ReachabilityHandler::new` returns `true` when the send succeeded and `false`
/// otherwise.
type NotifyFn = Box<dyn Fn(&freachability::Snapshot, WatchResponder) -> bool>;
/// Hanging-get broker specialised for reachability snapshots.
type ReachabilityBroker =
hanging_get::HangingGet<freachability::Snapshot, WatchResponder, NotifyFn>;
/// Publisher obtained from a `ReachabilityBroker`; used to set a new snapshot.
type ReachabilityPublisher =
hanging_get::Publisher<freachability::Snapshot, WatchResponder, NotifyFn>;
/// Tracks the current reachability state and serves it to `Monitor` clients through a
/// hanging-get broker.
pub struct ReachabilityHandler {
// Last state stored by `update_state`; used to detect whether an update changed anything.
state: Arc<Mutex<ReachabilityState>>,
// Broker from which `publish_service` creates one subscriber per client connection.
broker: Arc<Mutex<ReachabilityBroker>>,
// Publisher through which `update_state` sets a new snapshot after a change.
publisher: Arc<Mutex<ReachabilityPublisher>>,
}
/// Reachability flags tracked by the handler and reported to clients.
///
/// Each field is copied into the same-named field of the FIDL `Snapshot` when the state
/// is converted for publication. The `Default` value has every flag set to `false`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ReachabilityState {
    /// Reported to clients as `Snapshot.internet_available`.
    pub internet_available: bool,
    /// Reported to clients as `Snapshot.gateway_reachable`.
    pub gateway_reachable: bool,
    /// Reported to clients as `Snapshot.dns_active`.
    pub dns_active: bool,
    /// Reported to clients as `Snapshot.http_active`.
    pub http_active: bool,
}
impl From<ReachabilityState> for freachability::Snapshot {
fn from(state: ReachabilityState) -> Self {
Self {
internet_available: Some(state.internet_available),
gateway_reachable: Some(state.gateway_reachable),
dns_active: Some(state.dns_active),
http_active: Some(state.http_active),
..Default::default()
}
}
}
impl ReachabilityHandler {
pub fn new() -> Self {
let notify_fn: NotifyFn = Box::new(|state, responder| match responder.send(&state) {
Ok(()) => true,
Err(e) => {
error!("Failed to send reachability state to client: {}", e);
false
}
});
let state = ReachabilityState {
internet_available: false,
gateway_reachable: false,
dns_active: false,
http_active: false,
};
let broker = hanging_get::HangingGet::new(state.clone().into(), notify_fn);
let publisher = broker.new_publisher();
Self {
state: Arc::new(Mutex::new(state)),
broker: Arc::new(Mutex::new(broker)),
publisher: Arc::new(Mutex::new(publisher)),
}
}
/// Overwrites the stored state with `new_state`.
///
/// Delegates to `update_state`, so a snapshot is only published when `new_state`
/// differs from the state currently held.
pub async fn replace_state(&mut self, new_state: ReachabilityState) {
    self.update_state(move |current| {
        *current = new_state;
    })
    .await;
}
/// Applies `update_callback` to the stored state and publishes the result if it changed.
///
/// The state lock is held for the whole call, including while the publisher is locked
/// and the new snapshot is set. If the callback leaves the state equal to its previous
/// value, nothing is published.
async fn update_state(&mut self, update_callback: impl FnOnce(&mut ReachabilityState)) {
    let mut state = self.state.lock().await;
    let before = state.clone();
    update_callback(&mut state);
    if *state == before {
        // Nothing changed, so there is nothing to publish.
        return;
    }
    let snapshot = freachability::Snapshot::from(state.clone());
    self.publisher.lock().await.set(snapshot);
}
pub fn publish_service<'a, 'b>(
&mut self,
mut svc_dir: ServiceFsDir<'a, ServiceObjLocal<'b, ()>>,
) {
let _ = svc_dir.add_fidl_service({
let broker = self.broker.clone();
move |mut stream: freachability::MonitorRequestStream| {
let broker = broker.clone();
fasync::Task::local(
async move {
let subscriber = broker.lock().await.new_subscriber();
// Keep track of whether SetOptions or Watch were already called. Calling
// SetOptions after either it or Watch have already been called will result in us
// closing the request stream.
let mut set_options_called = false;
let mut watch_called = false;
while let Some(req) = stream.try_next().await? {
match req {
freachability::MonitorRequest::Watch { responder } => {
watch_called = true;
subscriber.register(responder)?
}
freachability::MonitorRequest::SetOptions {
payload: _,
control_handle,
} => {
if watch_called || set_options_called {
control_handle.shutdown_with_epitaph(
fidl::Status::CONNECTION_ABORTED,
);
break;
}
set_options_called = true;
}
}
}
Ok(())
}
.unwrap_or_else(|e: anyhow::Error| error!("{:?}", e)),
)
.detach()
}
});
}
}
#[cfg(test)]
mod tests {
use super::*;
use anyhow::Error;
use assert_matches::assert_matches;
use fidl::endpoints::Proxy;
use fuchsia_component::server::ServiceFs;
use futures::StreamExt as _;
use std::cell::RefCell;
use std::task::Poll;
// Test fixture wrapping the connector through which test clients reach the services
// published on the `ServiceFs` passed to `TestEnv::new`.
struct TestEnv {
// Connector created from the `ServiceFs`; used to open `Monitor` connections.
connector: fuchsia_component::server::ProtocolConnector,
}
impl TestEnv {
fn new(mut service_fs: ServiceFs<ServiceObjLocal<'static, ()>>) -> Self {
let connector = service_fs.create_protocol_connector().unwrap();
fasync::Task::local(service_fs.collect()).detach();
Self { connector }
}
fn connect_client(&self) -> FakeClient {
let watcher_proxy =
self.connector.connect_to_protocol::<freachability::MonitorMarker>().unwrap();
FakeClient { watcher_proxy, hanging_watcher_request: RefCell::new(None) }
}
}
// Test client for the `Monitor` protocol that can poll a `Watch` call without blocking.
struct FakeClient {
// Proxy for this client's `Monitor` connection.
watcher_proxy: freachability::MonitorProxy,
// `Watch` call that was still pending when last polled, if any; it is polled again by
// the next `get_reachability_state` call instead of issuing a new `Watch`.
hanging_watcher_request:
RefCell<Option<fidl::client::QueryResponseFut<freachability::Snapshot>>>,
}
impl FakeClient {
    // Polls this client's `Watch` call until the executor stalls.
    //
    // A request left pending by an earlier call is reused; otherwise a new `Watch` is
    // issued. Returns:
    // * `Ok(Some(snapshot))` if the watch completed with a snapshot,
    // * `Ok(None)` if it is still pending (the request is stored for the next call),
    // * `Err(_)` if the watch completed with an error.
    fn get_reachability_state(
        &self,
        executor: &mut fasync::TestExecutor,
    ) -> Result<Option<freachability::Snapshot>, Error> {
        // `RefCell::take` moves the stored request out and leaves `None` behind, so a
        // single call is all that is needed here.
        let mut watch_request = self
            .hanging_watcher_request
            .take()
            .unwrap_or_else(|| self.watcher_proxy.watch());
        match executor.run_until_stalled(&mut watch_request) {
            Poll::Pending => {
                // Keep the in-flight request so the next call resumes it.
                let _: Option<fidl::client::QueryResponseFut<freachability::Snapshot>> =
                    self.hanging_watcher_request.replace(Some(watch_request));
                Ok(None)
            }
            Poll::Ready(Ok(state)) => Ok(Some(state)),
            Poll::Ready(Err(e)) => Err(e.into()),
        }
    }
}
// Tests that the handler correctly implements the hanging-get pattern: the first watch
// returns the initial state, a further watch stays pending while the state is unchanged,
// and it completes with the new state once the state is replaced.
#[test]
fn test_hanging_get() {
    let mut executor = fasync::TestExecutor::new();
    let mut service_fs = ServiceFs::new_local();
    let mut handler = ReachabilityHandler::new();
    handler.publish_service(service_fs.root_dir());
    let test_env = TestEnv::new(service_fs);
    let client = test_env.connect_client();
    // The first watch yields the initial state, in which every flag is `false`.
    assert_matches!(
        client.get_reachability_state(&mut executor),
        Ok(Some(freachability::Snapshot {
            internet_available: Some(false),
            gateway_reachable: Some(false),
            dns_active: Some(false),
            http_active: Some(false),
            ..
        }))
    );
    // Verify no response as state hasn't changed.
    assert_matches!(client.get_reachability_state(&mut executor), Ok(None));
    executor.run_singlethreaded(handler.replace_state(ReachabilityState {
        internet_available: true,
        gateway_reachable: true,
        dns_active: true,
        http_active: true,
    }));
    // The pending watch now completes with the replaced state.
    assert_matches!(
        client.get_reachability_state(&mut executor),
        Ok(Some(freachability::Snapshot {
            internet_available: Some(true),
            gateway_reachable: Some(true),
            dns_active: Some(true),
            http_active: Some(true),
            ..
        }))
    );
}
// Tests that every connected client independently receives the initial state and each
// subsequent change, and that an update which leaves the state unchanged is not
// published to any of them.
#[test]
fn test_hanging_get_multiple_clients() {
    let mut executor = fasync::TestExecutor::new();
    let mut service_fs = ServiceFs::new_local();
    let mut handler = ReachabilityHandler::new();
    handler.publish_service(service_fs.root_dir());
    let test_env = TestEnv::new(service_fs);
    let client1 = test_env.connect_client();
    let client2 = test_env.connect_client();
    // Both clients first see the initial state, in which every flag is `false`.
    assert_matches!(
        client1.get_reachability_state(&mut executor),
        Ok(Some(freachability::Snapshot {
            internet_available: Some(false),
            gateway_reachable: Some(false),
            dns_active: Some(false),
            http_active: Some(false),
            ..
        }))
    );
    assert_matches!(
        client2.get_reachability_state(&mut executor),
        Ok(Some(freachability::Snapshot {
            internet_available: Some(false),
            gateway_reachable: Some(false),
            dns_active: Some(false),
            http_active: Some(false),
            ..
        }))
    );
    // With no state change, a second watch stays pending for both clients.
    assert_matches!(client1.get_reachability_state(&mut executor), Ok(None));
    assert_matches!(client2.get_reachability_state(&mut executor), Ok(None));
    executor.run_singlethreaded(handler.update_state(|state| {
        state.internet_available = true;
        state.gateway_reachable = true;
    }));
    // Both pending watches complete with the partial update; the flags the update did
    // not touch are still `false`.
    assert_matches!(
        client1.get_reachability_state(&mut executor),
        Ok(Some(freachability::Snapshot {
            internet_available: Some(true),
            gateway_reachable: Some(true),
            dns_active: Some(false),
            http_active: Some(false),
            ..
        }))
    );
    assert_matches!(
        client2.get_reachability_state(&mut executor),
        Ok(Some(freachability::Snapshot {
            internet_available: Some(true),
            gateway_reachable: Some(true),
            dns_active: Some(false),
            http_active: Some(false),
            ..
        }))
    );
    // An update that does not change the current state should not be published.
    executor.run_singlethreaded(handler.update_state(|state| {
        state.internet_available = true;
        state.gateway_reachable = true;
        state.dns_active = false;
    }));
    assert_matches!(client1.get_reachability_state(&mut executor), Ok(None));
    assert_matches!(client2.get_reachability_state(&mut executor), Ok(None));
}
// Tests that the handler closes the request stream if the client calls SetOptions after having
// already called Watch.
#[test]
fn test_cannot_call_set_options_after_watch() {
    let mut executor = fasync::TestExecutor::new();
    let mut service_fs = ServiceFs::new_local();
    let mut handler = ReachabilityHandler::new();
    handler.publish_service(service_fs.root_dir());
    let test_env = TestEnv::new(service_fs);
    let client = test_env.connect_client();
    // Require an actual snapshot so the test proves the Watch completed before
    // SetOptions is sent; `Ok(_)` would also accept a still-pending watch.
    assert_matches!(client.get_reachability_state(&mut executor), Ok(Some(_)));
    assert_matches!(
        client.watcher_proxy.set_options(&freachability::MonitorOptions::default()),
        Ok(())
    );
    // The handler reacts to the late SetOptions by closing the channel.
    assert_matches!(executor.run_singlethreaded(client.watcher_proxy.on_closed()), Ok(_));
}
// Tests that a second SetOptions call on the same connection makes the handler close
// the request stream.
#[test]
fn test_cannot_call_set_options_twice() {
    let mut executor = fasync::TestExecutor::new();
    let mut service_fs = ServiceFs::new_local();
    let mut handler = ReachabilityHandler::new();
    handler.publish_service(service_fs.root_dir());
    let test_env = TestEnv::new(service_fs);
    let client = test_env.connect_client();
    let proxy = &client.watcher_proxy;
    // First call: allowed, since nothing has been called on this connection yet.
    assert_matches!(proxy.set_options(&freachability::MonitorOptions::default()), Ok(()));
    // Second call: sending succeeds on the client side, but the handler rejects it.
    assert_matches!(proxy.set_options(&freachability::MonitorOptions::default()), Ok(()));
    // The rejection is observed as the channel being closed.
    assert_matches!(executor.run_singlethreaded(proxy.on_closed()), Ok(_));
}
}