| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| anyhow::{self, Context, Error}, |
| emergency_lib::{ |
| bss_cache::{Bss, BssCache, BssId, RealBssCache, UpdateError}, |
| bss_resolver::{BssResolver, RealBssResolver, ResolverError}, |
| }, |
| emergency_metrics_registry::{ |
| self as metrics, EmergencyGetCurrentFailureMetricDimensionCause as GetCurrentFailure, |
| EmergencyGetCurrentResultMetricDimensionResult as GetCurrentResult, |
| WlanSensorReportMetricDimensionResult as WlanSensorReportResult, |
| EMERGENCY_GET_CURRENT_ACCURACY_METRIC_ID as GET_CURRENT_ACCURACY_METRIC_ID, |
| EMERGENCY_GET_CURRENT_FAILURE_METRIC_ID as GET_CURRENT_FAILURE_METRIC_ID, |
| EMERGENCY_GET_CURRENT_LATENCY_METRIC_ID as GET_CURRENT_LATENCY_METRIC_ID, |
| EMERGENCY_GET_CURRENT_RESULT_METRIC_ID as GET_CURRENT_RESULT_METRIC_ID, |
| WLAN_SENSOR_REPORT_METRIC_ID, |
| }, |
| fidl_fuchsia_location::Error as LocationError, |
| fidl_fuchsia_location_position::{ |
| EmergencyProviderRequest, EmergencyProviderRequestStream, Position, |
| }, |
| fidl_fuchsia_location_sensor::{ |
| WlanBaseStationWatcherRequest, WlanBaseStationWatcherRequestStream, |
| }, |
| fidl_fuchsia_net_http::LoaderMarker as HttpLoaderMarker, |
| fuchsia_async as fasync, |
| fuchsia_cobalt::{CobaltConnector, CobaltSender, ConnectionType}, |
| fuchsia_component::{client::connect_to_service, server::ServiceFs}, |
| fuchsia_syslog::{self as syslog}, |
| futures::{lock::Mutex, prelude::*}, |
| log::info, |
| std::{ |
| convert::TryFrom, |
| time::{Duration, Instant}, |
| }, |
| }; |
| |
/// Path of the file whose contents are handed to the BSS resolver as its API key.
const API_KEY_FILE: &str = "/config/data/google_maps_api_key.txt";

/// Limit passed to `for_each_concurrent` when serving client connections;
/// `None` means no limit is imposed.
const CONCURRENCY_LIMIT: Option<usize> = None;
| |
| /// Wraps all hosted protocols into a single type that can be matched against |
| /// and dispatched. |
| enum IncomingRequest { |
| EmergencyProviderRequest(EmergencyProviderRequestStream), |
| WlanBaseStationWatcherRequest(WlanBaseStationWatcherRequestStream), |
| } |
| |
| #[fasync::run_singlethreaded] |
| async fn main() -> Result<(), anyhow::Error> { |
| syslog::init().context("failed to initialize logging")?; |
| |
| let (cobalt_api, cobalt_fut) = |
| CobaltConnector::default().serve(ConnectionType::project_id(metrics::PROJECT_ID)); |
| let _cobalt_task = fasync::Task::spawn(cobalt_fut); |
| let bss_cache = Mutex::new(RealBssCache::new()); |
| let bss_resolver = RealBssResolver::new( |
| connect_to_service::<HttpLoaderMarker>().context("failed to connect to http loader")?, |
| std::fs::read_to_string(API_KEY_FILE) |
| .with_context(|| format!("failed to read {}", API_KEY_FILE))?, |
| ); |
| let mut service_fs = ServiceFs::new_local(); |
| service_fs |
| .dir("svc") |
| .add_fidl_service(IncomingRequest::EmergencyProviderRequest) |
| .add_fidl_service(IncomingRequest::WlanBaseStationWatcherRequest); |
| service_fs |
| .take_and_serve_directory_handle() |
| .context("failed to serve outgoing namespace")? |
| .for_each_concurrent(CONCURRENCY_LIMIT, |connection| { |
| handle_client_requests(&bss_cache, &bss_resolver, connection, cobalt_api.clone()) |
| .unwrap_or_else(|e| info!("connection terminated: {:?}", e)) |
| }) |
| .await; |
| |
| Ok(()) |
| } |
| |
| async fn handle_client_requests<C: BssCache, R: BssResolver>( |
| bss_cache: &Mutex<C>, |
| bss_resolver: &R, |
| protocol: IncomingRequest, |
| cobalt_api: CobaltSender, |
| ) -> Result<(), Error> { |
| match protocol { |
| IncomingRequest::EmergencyProviderRequest(client) => { |
| process_location_queries(&bss_cache, bss_resolver, client, cobalt_api).await |
| } |
| IncomingRequest::WlanBaseStationWatcherRequest(client) => { |
| process_bss_updates(&bss_cache, client, cobalt_api).await |
| } |
| } |
| } |
| |
| async fn process_location_queries<C: BssCache, R: BssResolver>( |
| bss_cache: &Mutex<C>, |
| bss_resolver: &R, |
| mut stream: EmergencyProviderRequestStream, |
| mut cobalt_api: CobaltSender, |
| ) -> Result<(), Error> { |
| loop { |
| match stream.try_next().await.context("failed to read emergency provider request")? { |
| Some(EmergencyProviderRequest::GetCurrent { responder }) => { |
| let start_time = Instant::now(); |
| // We don't want to hold the BSS cache lock while resolving the BSSes to a |
| // `Position`, so we copy data from the iterator into our own `Vector`. |
| let bss_list: Vec<(BssId, Bss)> = |
| bss_cache.lock().await.iter().map(|(&id, &bss)| (id, bss)).collect(); |
| match bss_resolver.resolve(bss_list).await { |
| Ok(position) => { |
| report_lookup_success_metrics( |
| &position, |
| &start_time.elapsed(), |
| &mut cobalt_api, |
| ); |
| responder |
| .send(&mut Ok(position)) |
| .context("failed to send position to caller")?; |
| } |
| Err(e) => { |
| info!("lookup failed: {:?}", e); |
| report_lookup_failure_metrics(e, &mut cobalt_api); |
| responder |
| .send(&mut Err(LocationError::GeneralError)) |
| .context("failed to send error to client")? |
| } |
| } |
| } |
| None => return Ok(()), |
| } |
| } |
| } |
| |
| async fn process_bss_updates<C: BssCache>( |
| bss_cache: &Mutex<C>, |
| mut stream: WlanBaseStationWatcherRequestStream, |
| mut cobalt_api: CobaltSender, |
| ) -> Result<(), Error> { |
| loop { |
| match stream.try_next().await.context("failed to read base station watcher request")? { |
| Some(WlanBaseStationWatcherRequest::ReportCurrentStations { |
| stations, |
| control_handle: _, |
| }) => { |
| let update_result = bss_cache |
| .lock() |
| .await |
| .update( |
| stations |
| .into_proxy() |
| .context("failed to get proxy for scan result iterator")?, |
| ) |
| .await; |
| report_bss_update_metrics(update_result, &mut cobalt_api); |
| update_result.context("failed to apply base station update")? |
| } |
| None => return Ok(()), |
| } |
| } |
| } |
| |
| fn report_lookup_success_metrics( |
| position: &Position, |
| latency: &Duration, |
| cobalt_api: &mut CobaltSender, |
| ) { |
| const METERS_TO_MILLIMETERS: f64 = 1000.0; |
| cobalt_api.log_event(GET_CURRENT_RESULT_METRIC_ID, GetCurrentResult::Success); |
| cobalt_api.log_elapsed_time( |
| GET_CURRENT_LATENCY_METRIC_ID, |
| (), |
| i64::try_from(latency.as_millis()).unwrap_or(i64::MAX), |
| ); |
| cobalt_api.log_event_count( |
| GET_CURRENT_ACCURACY_METRIC_ID, |
| (), |
| 0, |
| match position.extras.accuracy_meters { |
| Some(accuracy_meters) => (accuracy_meters * METERS_TO_MILLIMETERS) as i64, |
| None => i64::MAX, |
| }, |
| ) |
| } |
| |
| fn report_lookup_failure_metrics(error: ResolverError, cobalt_api: &mut CobaltSender) { |
| cobalt_api.log_event(GET_CURRENT_RESULT_METRIC_ID, GetCurrentResult::Failure); |
| cobalt_api.log_event( |
| GET_CURRENT_FAILURE_METRIC_ID, |
| match error { |
| ResolverError::NoBsses => GetCurrentFailure::NoBsses, |
| ResolverError::Internal => GetCurrentFailure::Internal, |
| ResolverError::Lookup => GetCurrentFailure::Lookup, |
| }, |
| ); |
| } |
| |
| fn report_bss_update_metrics(result: Result<(), UpdateError>, cobalt_api: &mut CobaltSender) { |
| cobalt_api.log_event( |
| WLAN_SENSOR_REPORT_METRIC_ID, |
| match result { |
| Ok(()) => WlanSensorReportResult::Success, |
| Err(UpdateError::NoBssIds) => WlanSensorReportResult::NoBssIds, |
| Err(UpdateError::NoBsses) => WlanSensorReportResult::NoBsses, |
| Err(UpdateError::Ipc) => WlanSensorReportResult::IpcError, |
| Err(UpdateError::Service) => WlanSensorReportResult::ServiceError, |
| }, |
| ); |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| super::*, |
| cobalt_client::traits::AsEventCode, |
| fidl::endpoints::create_proxy_and_stream, |
| fidl_fuchsia_cobalt::CobaltEvent, |
| fuchsia_cobalt::cobalt_event_builder::CobaltEventExt, |
| fuchsia_cobalt::CobaltSender, |
| futures::channel::mpsc, |
| futures::{future, pin_mut}, |
| matches::assert_matches, |
| std::task::Poll, |
| test_doubles::{FakeBssCache, StubBssResolver}, |
| }; |
| |
| mod base_station_watcher { |
| use { |
| super::*, emergency_lib::bss_cache::UpdateError, |
| fidl::endpoints::create_request_stream, |
| fidl_fuchsia_location_sensor::WlanBaseStationWatcherMarker, |
| fidl_fuchsia_wlan_policy::ScanResultIteratorMarker, test_case::test_case, |
| }; |
| |
| #[fasync::run_until_stalled(test)] |
| async fn propagates_stations_downward() { |
| let (cobalt_sender, _cobalt_receiver) = make_fake_cobalt_connection(); |
| let (proxy, stream) = create_proxy_and_stream::<WlanBaseStationWatcherMarker>() |
| .expect("internal error: failed to create base station watcher"); |
| let bss_cache = Mutex::new(FakeBssCache::new(Ok(()))); |
| let server_fut = handle_client_requests( |
| &bss_cache, |
| &StubBssResolver { resolve: || panic!("unexpected call to resolver") }, |
| IncomingRequest::WlanBaseStationWatcherRequest(stream), |
| cobalt_sender, |
| ); |
| let (scan_result_reader, _scan_result_generator) = |
| create_request_stream::<ScanResultIteratorMarker>() |
| .expect("internal error: failed to create scan result iterator"); |
| proxy |
| .report_current_stations(scan_result_reader) |
| .expect("internal error: proxy failed to send request"); |
| std::mem::drop(proxy); // Close connection so `server_fut` completes. |
| assert!(server_fut.await.is_ok()); |
| assert!(bss_cache.lock().await.was_update_called()) |
| } |
| |
| #[fasync::run_until_stalled(test)] |
| async fn update_error_does_not_panic() { |
| let (cobalt_sender, _cobalt_receiver) = make_fake_cobalt_connection(); |
| let (proxy, stream) = create_proxy_and_stream::<WlanBaseStationWatcherMarker>() |
| .expect("internal error: failed to create base station watcher"); |
| let bss_cache = Mutex::new(FakeBssCache::new(Err(UpdateError::NoBssIds))); |
| let server_fut = handle_client_requests( |
| &bss_cache, |
| &StubBssResolver { resolve: || panic!("unexpected call to resolver") }, |
| IncomingRequest::WlanBaseStationWatcherRequest(stream), |
| cobalt_sender, |
| ); |
| let (scan_result_reader, _scan_result_generator) = |
| create_request_stream::<ScanResultIteratorMarker>() |
| .expect("internal error: failed to create scan result iterator"); |
| proxy |
| .report_current_stations(scan_result_reader) |
| .expect("internal error: proxy failed to send request"); |
| // Close connection so `server_fut` completes, even if it chooses to |
| // ignore the error. |
| std::mem::drop(proxy); |
| |
| // The best error handling policy isn't exactly clear: is it useful |
| // to report an error upwards (which would cause `main()` to close |
| // the client connection), or is it better to leave the connection |
| // open, and hope that the client will provide more useful results |
| // next time? |
| // |
| // Rather than take a position on that question, we simply validate |
| // that the program doesn't crash when that happens. (If the program |
| // crashed, the test framework would report a test failure with |
| // the panic message.) |
| let _ = server_fut.await; |
| } |
| |
| #[test_case(Ok(()), WlanSensorReportResult::Success; "success")] |
| #[test_case(Err(UpdateError::NoBssIds), WlanSensorReportResult::NoBssIds; "no bss ids")] |
| #[test_case(Err(UpdateError::NoBsses), WlanSensorReportResult::NoBsses; "no bsses")] |
| #[test_case(Err(UpdateError::Ipc), WlanSensorReportResult::IpcError; "ipc error")] |
| #[test_case(Err(UpdateError::Service), WlanSensorReportResult::ServiceError; |
| "service error")] |
| fn reports_update_result_to_cobalt( |
| update_result: Result<(), UpdateError>, |
| cobalt_event: WlanSensorReportResult, |
| ) { |
| let test_fut = async { |
| let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection(); |
| let (proxy, stream) = create_proxy_and_stream::<WlanBaseStationWatcherMarker>() |
| .expect("internal error: failed to create emergency provider"); |
| let bss_cache = Mutex::new(FakeBssCache::new(update_result)); |
| let server_fut = handle_client_requests( |
| &bss_cache, |
| &StubBssResolver { resolve: || panic!("unexpected call to resolver") }, |
| IncomingRequest::WlanBaseStationWatcherRequest(stream), |
| cobalt_sender, |
| ); |
| let (scan_result_reader, _scan_result_generator) = |
| create_request_stream::<ScanResultIteratorMarker>() |
| .expect("internal error: failed to create scan result iterator"); |
| proxy |
| .report_current_stations(scan_result_reader) |
| .expect("internal error: proxy failed to send request"); |
| std::mem::drop(proxy); // Close connection so `server_fut` completes. |
| |
| let _ = server_fut.await; |
| assert_eq!( |
| cobalt_receiver |
| .filter(|event| { |
| future::ready(event.metric_id == WLAN_SENSOR_REPORT_METRIC_ID) |
| }) |
| .collect::<Vec<CobaltEvent>>() |
| .await, |
| vec![CobaltEvent::builder(WLAN_SENSOR_REPORT_METRIC_ID) |
| .with_event_code(cobalt_event.as_event_code()) |
| .as_event()] |
| ); |
| }; |
| pin_mut!(test_fut); |
| assert_matches!( |
| fasync::Executor::new() |
| .expect("internal error: failed to create executor") |
| .run_until_stalled(&mut test_fut), |
| Poll::Ready(_) |
| ); |
| } |
| |
| #[fasync::run_until_stalled(test)] |
| async fn does_not_report_extra_metrics() { |
| let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection(); |
| let (proxy, stream) = create_proxy_and_stream::<WlanBaseStationWatcherMarker>() |
| .expect("internal error: failed to create emergency provider"); |
| let bss_cache = Mutex::new(FakeBssCache::new(Ok(()))); |
| let server_fut = handle_client_requests( |
| &bss_cache, |
| &StubBssResolver { resolve: || panic!("unexpected call to resolver") }, |
| IncomingRequest::WlanBaseStationWatcherRequest(stream), |
| cobalt_sender, |
| ); |
| let (scan_result_reader, _scan_result_generator) = |
| create_request_stream::<ScanResultIteratorMarker>() |
| .expect("internal error: failed to create scan result iterator"); |
| proxy |
| .report_current_stations(scan_result_reader) |
| .expect("internal error: proxy failed to send request"); |
| std::mem::drop(proxy); // Close connection so `server_fut` completes. |
| |
| let _ = server_fut.await; |
| assert_eq!( |
| cobalt_receiver |
| .filter(|event| future::ready(event.metric_id != WLAN_SENSOR_REPORT_METRIC_ID)) |
| .collect::<Vec<CobaltEvent>>() |
| .await, |
| vec![] |
| ); |
| } |
| } |
| |
| mod emergency_provider { |
| use { |
| super::*, |
| emergency_lib::bss_resolver::ResolverError, |
| fidl_fuchsia_cobalt::EventPayload::ElapsedMicros, |
| fidl_fuchsia_location_position::{EmergencyProviderMarker, Position, PositionExtras}, |
| test_case::test_case, |
| }; |
| |
| const POSITION_WITH_UNKNOWN_ACCURACY: Position = Position { |
| latitude: 1.0, |
| longitude: -1.0, |
| extras: PositionExtras { |
| accuracy_meters: None, |
| altitude_meters: None, |
| ..PositionExtras::EMPTY |
| }, |
| }; |
| |
| const POSITION_WITH_KNOWN_ACCURACY: Position = Position { |
| latitude: 1.0, |
| longitude: -1.0, |
| extras: PositionExtras { |
| accuracy_meters: Some(1.0), |
| altitude_meters: None, |
| ..PositionExtras::EMPTY |
| }, |
| }; |
| |
| #[fasync::run_until_stalled(test)] |
| async fn propagates_success_to_client() { |
| let (cobalt_sender, _cobalt_receiver) = make_fake_cobalt_connection(); |
| let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>() |
| .expect("internal error: failed to create emergency provider"); |
| let bss_cache = Mutex::new(FakeBssCache::new(Ok(()))); |
| let server_fut = handle_client_requests( |
| &bss_cache, |
| &StubBssResolver { resolve: || Ok(POSITION_WITH_UNKNOWN_ACCURACY) }, |
| IncomingRequest::EmergencyProviderRequest(stream), |
| cobalt_sender, |
| ); |
| let client_fut = proxy.get_current(); |
| std::mem::drop(proxy); // Close connection so `server_fut` completes. |
| |
| let (client_res, _server_res) = future::join(client_fut, server_fut).await; |
| assert_matches!(client_res, Ok(Ok(Position {..}))) |
| } |
| |
| #[fasync::run_until_stalled(test)] |
| async fn propagates_error_to_client() { |
| let (cobalt_sender, _cobalt_receiver) = make_fake_cobalt_connection(); |
| let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>() |
| .expect("internal error: failed to create emergency provider"); |
| let bss_cache = Mutex::new(FakeBssCache::new(Ok(()))); |
| let server_fut = handle_client_requests( |
| &bss_cache, |
| &StubBssResolver { resolve: || Err(ResolverError::NoBsses) }, |
| IncomingRequest::EmergencyProviderRequest(stream), |
| cobalt_sender, |
| ); |
| let client_fut = proxy.get_current(); |
| std::mem::drop(proxy); // Close connection so `server_fut` completes. |
| |
| let (client_res, _server_res) = future::join(client_fut, server_fut).await; |
| assert_matches!(client_res, Ok(Err(_))) // The `Ok` is the FIDL-level result. |
| } |
| |
| #[fasync::run_until_stalled(test)] |
| async fn reports_success_to_cobalt() { |
| let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection(); |
| let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>() |
| .expect("internal error: failed to create emergency provider"); |
| let bss_cache = Mutex::new(FakeBssCache::new(Ok(()))); |
| let server_fut = handle_client_requests( |
| &bss_cache, |
| &StubBssResolver { resolve: || Ok(POSITION_WITH_UNKNOWN_ACCURACY) }, |
| IncomingRequest::EmergencyProviderRequest(stream), |
| cobalt_sender, |
| ); |
| let _ = proxy.get_current(); |
| std::mem::drop(proxy); // Close connection so `server_fut` completes. |
| |
| let _ = server_fut.await; |
| assert_eq!( |
| cobalt_receiver |
| .filter(|event| future::ready(event.metric_id == GET_CURRENT_RESULT_METRIC_ID)) |
| .collect::<Vec<CobaltEvent>>() |
| .await, |
| vec![CobaltEvent::builder(GET_CURRENT_RESULT_METRIC_ID) |
| .with_event_code(GetCurrentResult::Success.as_event_code()) |
| .as_event()] |
| ); |
| } |
| |
| #[fasync::run_until_stalled(test)] |
| async fn reports_failure_to_cobalt() { |
| let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection(); |
| let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>() |
| .expect("internal error: failed to create emergency provider"); |
| let bss_cache = Mutex::new(FakeBssCache::new(Ok(()))); |
| let server_fut = handle_client_requests( |
| &bss_cache, |
| &StubBssResolver { resolve: || Err(ResolverError::NoBsses) }, |
| IncomingRequest::EmergencyProviderRequest(stream), |
| cobalt_sender, |
| ); |
| let _ = proxy.get_current(); |
| std::mem::drop(proxy); // Close connection so `server_fut` completes. |
| |
| let _ = server_fut.await; |
| assert_eq!( |
| cobalt_receiver |
| .filter(|event| future::ready(event.metric_id == GET_CURRENT_RESULT_METRIC_ID)) |
| .collect::<Vec<CobaltEvent>>() |
| .await, |
| vec![CobaltEvent::builder(GET_CURRENT_RESULT_METRIC_ID) |
| .with_event_code(GetCurrentResult::Failure.as_event_code()) |
| .as_event()] |
| ); |
| } |
| |
| #[fasync::run_until_stalled(test)] |
| async fn propagates_reported_accuracy_to_cobalt() { |
| let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection(); |
| let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>() |
| .expect("internal error: failed to create emergency provider"); |
| let bss_cache = Mutex::new(FakeBssCache::new(Ok(()))); |
| let server_fut = handle_client_requests( |
| &bss_cache, |
| &StubBssResolver { resolve: || Ok(POSITION_WITH_KNOWN_ACCURACY) }, |
| IncomingRequest::EmergencyProviderRequest(stream), |
| cobalt_sender, |
| ); |
| let _ = proxy.get_current(); |
| std::mem::drop(proxy); // Close connection so `server_fut` completes. |
| |
| let _ = server_fut.await; |
| assert_eq!( |
| cobalt_receiver |
| .filter(|event| future::ready( |
| event.metric_id == GET_CURRENT_ACCURACY_METRIC_ID |
| )) |
| .collect::<Vec<CobaltEvent>>() |
| .await, |
| vec![CobaltEvent::builder(GET_CURRENT_ACCURACY_METRIC_ID).as_count_event( |
| 0, |
| 1000 // In millimeters, for higher precision |
| )] |
| ); |
| } |
| |
| #[fasync::run_until_stalled(test)] |
| async fn reports_worst_accuracy_if_accuracy_is_unknown() { |
| let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection(); |
| let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>() |
| .expect("internal error: failed to create emergency provider"); |
| let bss_cache = Mutex::new(FakeBssCache::new(Ok(()))); |
| let server_fut = handle_client_requests( |
| &bss_cache, |
| &StubBssResolver { resolve: || Ok(POSITION_WITH_UNKNOWN_ACCURACY) }, |
| IncomingRequest::EmergencyProviderRequest(stream), |
| cobalt_sender, |
| ); |
| let _ = proxy.get_current(); |
| std::mem::drop(proxy); // Close connection so `server_fut` completes. |
| |
| let _ = server_fut.await; |
| assert_eq!( |
| cobalt_receiver |
| .filter(|event| future::ready( |
| event.metric_id == GET_CURRENT_ACCURACY_METRIC_ID |
| )) |
| .collect::<Vec<CobaltEvent>>() |
| .await, |
| vec![CobaltEvent::builder(GET_CURRENT_ACCURACY_METRIC_ID) |
| .as_count_event(0, i64::MAX)] |
| ); |
| } |
| |
| #[fasync::run_until_stalled(test)] |
| async fn reports_elapsed_time_to_cobalt_on_success() { |
| let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection(); |
| let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>() |
| .expect("internal error: failed to create emergency provider"); |
| let bss_cache = Mutex::new(FakeBssCache::new(Ok(()))); |
| let server_fut = handle_client_requests( |
| &bss_cache, |
| &StubBssResolver { resolve: || Ok(POSITION_WITH_UNKNOWN_ACCURACY) }, |
| IncomingRequest::EmergencyProviderRequest(stream), |
| cobalt_sender, |
| ); |
| let _ = proxy.get_current(); |
| std::mem::drop(proxy); // Close connection so `server_fut` completes. |
| |
| let _ = server_fut.await; |
| let latency_reports = cobalt_receiver |
| .filter(|event| future::ready(event.metric_id == GET_CURRENT_LATENCY_METRIC_ID)) |
| .collect::<Vec<CobaltEvent>>() |
| .await; |
| assert_eq!(latency_reports.len(), 1); |
| assert_matches!( |
| latency_reports[0], |
| CobaltEvent { |
| metric_id: GET_CURRENT_LATENCY_METRIC_ID, |
| payload: ElapsedMicros(_), |
| .. |
| } |
| ); |
| } |
| |
| #[test_case(ResolverError::NoBsses, GetCurrentFailure::NoBsses; "no bsses")] |
| #[test_case(ResolverError::Internal, GetCurrentFailure::Internal; "internal")] |
| #[test_case(ResolverError::Lookup, GetCurrentFailure::Lookup; "lookup")] |
| fn reports_resolver_error_to_cobalt( |
| resolver_error: ResolverError, |
| cobalt_error: GetCurrentFailure, |
| ) { |
| let test_fut = async { |
| let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection(); |
| let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>() |
| .expect("internal error: failed to create emergency provider"); |
| let bss_cache = Mutex::new(FakeBssCache::new(Ok(()))); |
| let resolver = StubBssResolver { resolve: || Err(resolver_error) }; |
| let server_fut = handle_client_requests( |
| &bss_cache, |
| &resolver, |
| IncomingRequest::EmergencyProviderRequest(stream), |
| cobalt_sender, |
| ); |
| let _ = proxy.get_current(); |
| std::mem::drop(proxy); // Close connection so `server_fut` completes. |
| |
| let _ = server_fut.await; |
| assert_eq!( |
| cobalt_receiver |
| .filter(|event| { |
| future::ready(event.metric_id == GET_CURRENT_FAILURE_METRIC_ID) |
| }) |
| .collect::<Vec<CobaltEvent>>() |
| .await, |
| vec![CobaltEvent::builder(GET_CURRENT_FAILURE_METRIC_ID) |
| .with_event_code(cobalt_error.as_event_code()) |
| .as_event()] |
| ); |
| }; |
| pin_mut!(test_fut); |
| assert_matches!( |
| fasync::Executor::new() |
| .expect("internal error: failed to create executor") |
| .run_until_stalled(&mut test_fut), |
| Poll::Ready(_) |
| ); |
| } |
| |
| #[test_case( |
| Ok(()), |
| &[GET_CURRENT_RESULT_METRIC_ID, GET_CURRENT_ACCURACY_METRIC_ID, |
| GET_CURRENT_LATENCY_METRIC_ID]; |
| "on_success")] |
| #[test_case( |
| Err(()), |
| &[GET_CURRENT_RESULT_METRIC_ID, GET_CURRENT_FAILURE_METRIC_ID]; |
| "on_failure")] |
| fn does_not_report_extra_metrics(result: Result<(), ()>, expected_metric_ids: &[u32]) { |
| let test_fut = async { |
| let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection(); |
| let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>() |
| .expect("internal error: failed to create emergency provider"); |
| let bss_cache = Mutex::new(FakeBssCache::new(Ok(()))); |
| let resolver = StubBssResolver { |
| resolve: || match &result { |
| Ok(()) => Ok(POSITION_WITH_KNOWN_ACCURACY), |
| Err(()) => Err(ResolverError::NoBsses), |
| }, |
| }; |
| let server_fut = handle_client_requests( |
| &bss_cache, |
| &resolver, |
| IncomingRequest::EmergencyProviderRequest(stream), |
| cobalt_sender, |
| ); |
| let _ = proxy.get_current(); |
| std::mem::drop(proxy); // Close connection so `server_fut` completes. |
| |
| let _ = server_fut.await; |
| assert_eq!( |
| cobalt_receiver |
| .filter(|event| future::ready( |
| expected_metric_ids.iter().find(|&&i| i == event.metric_id).is_none() |
| )) |
| .collect::<Vec<CobaltEvent>>() |
| .await, |
| vec![] |
| ); |
| }; |
| pin_mut!(test_fut); |
| assert_matches!( |
| fasync::Executor::new() |
| .expect("internal error: failed to create executor") |
| .run_until_stalled(&mut test_fut), |
| Poll::Ready(_) |
| ); |
| } |
| } |
| |
| fn make_fake_cobalt_connection() -> (CobaltSender, mpsc::Receiver<CobaltEvent>) { |
| const MAX_METRICS_PER_QUERY: usize = 3; |
| const MAX_QUERIES: usize = 1; |
| let (sender, receiver) = mpsc::channel(MAX_METRICS_PER_QUERY * MAX_QUERIES); |
| (CobaltSender::new(sender), receiver) |
| } |
| } |
| |
#[cfg(test)]
mod test_doubles {
    //! Test doubles for the BSS cache and the BSS resolver.

    use {
        super::*,
        async_trait::async_trait,
        emergency_lib::{bss_cache::UpdateError, bss_resolver::ResolverError},
        fidl_fuchsia_location_position::Position,
        fidl_fuchsia_wlan_policy::ScanResultIteratorProxyInterface,
    };

    /// A `BssCache` that returns a canned result from `update()`, and records
    /// whether `update()` was called.
    pub(super) struct FakeBssCache {
        /// The result returned from every call to `update()`.
        update_result: Result<(), UpdateError>,
        /// The entries yielded by `iter()`; always empty, as nothing adds to it.
        bsses: Vec<(BssId, Bss)>,
        /// Set once `update()` has been called.
        was_update_called: bool,
    }

    impl FakeBssCache {
        /// Creates a cache without entries, whose `update()` yields `update_result`.
        pub fn new(update_result: Result<(), UpdateError>) -> Self {
            Self { update_result, bsses: Vec::new(), was_update_called: false }
        }

        /// Tells whether `update()` has been called on this cache.
        pub fn was_update_called(&self) -> bool {
            self.was_update_called
        }
    }

    #[async_trait(?Send)]
    impl BssCache for FakeBssCache {
        /// Records the call, ignores `_new_bsses`, and returns the canned result.
        async fn update<I: ScanResultIteratorProxyInterface>(
            &mut self,
            _new_bsses: I,
        ) -> Result<(), UpdateError> {
            self.was_update_called = true;
            self.update_result.clone()
        }

        /// Iterates over the stored entries, as pairs of references.
        fn iter(&self) -> Box<dyn Iterator<Item = (&'_ BssId, &'_ Bss)> + '_> {
            let entries = self.bsses.iter().map(|entry| (&entry.0, &entry.1));
            Box::new(entries)
        }
    }

    /// A `BssResolver` that ignores the BSS list, and answers every request
    /// with the result of the `resolve` closure.
    pub(super) struct StubBssResolver<R: Fn() -> Result<Position, ResolverError>> {
        // A closure is stored, rather than a plain value, because `Position`
        // is not `Copy`.
        pub resolve: R,
    }

    #[async_trait(?Send)]
    impl<R> BssResolver for StubBssResolver<R>
    where
        R: Fn() -> Result<Position, ResolverError>,
    {
        /// Returns whatever the stored closure produces.
        async fn resolve<'a, I, T, U>(&self, _bss_list: I) -> Result<Position, ResolverError> {
            (self.resolve)()
        }
    }
}