blob: 67bc3027a82a8ea00c50e1a98e7d59734165a0ad [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{format_err, Context, Error};
use emergency_lib::bss_cache::{Bss, BssCache, BssId, RealBssCache, UpdateError};
use emergency_lib::bss_resolver::{BssResolver, RealBssResolver, ResolverError};
use emergency_metrics_registry::{
self as metrics, EmergencyGetCurrentFailureMigratedMetricDimensionCause as GetCurrentFailure,
EmergencyGetCurrentResultMigratedMetricDimensionResult as GetCurrentResult,
WlanSensorReportMigratedMetricDimensionResult as WlanSensorReportResult,
EMERGENCY_GET_CURRENT_ACCURACY_MIGRATED_METRIC_ID as GET_CURRENT_ACCURACY_METRIC_ID,
EMERGENCY_GET_CURRENT_FAILURE_MIGRATED_METRIC_ID as GET_CURRENT_FAILURE_METRIC_ID,
EMERGENCY_GET_CURRENT_LATENCY_MIGRATED_METRIC_ID as GET_CURRENT_LATENCY_METRIC_ID,
EMERGENCY_GET_CURRENT_RESULT_MIGRATED_METRIC_ID as GET_CURRENT_RESULT_METRIC_ID,
WLAN_SENSOR_REPORT_MIGRATED_METRIC_ID,
};
use fidl_contrib::protocol_connector::{ConnectedProtocol, ProtocolSender};
use fidl_contrib::ProtocolConnector;
use fidl_fuchsia_location::Error as LocationError;
use fidl_fuchsia_location_position::{
EmergencyProviderRequest, EmergencyProviderRequestStream, Position,
};
use fidl_fuchsia_location_sensor::{
WlanBaseStationWatcherRequest, WlanBaseStationWatcherRequestStream,
};
use fidl_fuchsia_metrics::{
MetricEvent, MetricEventLoggerFactoryMarker, MetricEventLoggerProxy, ProjectSpec,
};
use fidl_fuchsia_net_http::LoaderMarker as HttpLoaderMarker;
use fuchsia_async as fasync;
use fuchsia_cobalt_builders::MetricEventExt;
use fuchsia_component::client::connect_to_protocol;
use fuchsia_component::server::ServiceFs;
use futures::lock::Mutex;
use futures::prelude::*;
use std::time::{Duration, Instant};
use tracing::info;
const CONCURRENCY_LIMIT: Option<usize> = None;
const API_KEY_FILE: &str = "/config/data/google_maps_api_key.txt";
/// Wraps all hosted protocols into a single type that can be matched against
/// and dispatched.
enum IncomingRequest {
EmergencyProviderRequest(EmergencyProviderRequestStream),
WlanBaseStationWatcherRequest(WlanBaseStationWatcherRequestStream),
}
struct CobaltConnectedService;
impl ConnectedProtocol for CobaltConnectedService {
type Protocol = MetricEventLoggerProxy;
type ConnectError = Error;
type Message = MetricEvent;
type SendError = Error;
fn get_protocol<'a>(
&'a mut self,
) -> future::BoxFuture<'a, Result<MetricEventLoggerProxy, Error>> {
async {
let (logger_proxy, server_end) =
fidl::endpoints::create_proxy().context("failed to create proxy endpoints")?;
let metric_event_logger_factory =
connect_to_protocol::<MetricEventLoggerFactoryMarker>()
.context("Failed to connect to fuchsia::metrics::MetricEventLoggerFactory")?;
metric_event_logger_factory
.create_metric_event_logger(
&ProjectSpec { project_id: Some(metrics::PROJECT_ID), ..Default::default() },
server_end,
)
.await?
.map_err(|e| format_err!("Connection to MetricEventLogger refused {e:?}"))?;
Ok(logger_proxy)
}
.boxed()
}
fn send_message<'a>(
&'a mut self,
protocol: &'a MetricEventLoggerProxy,
msg: MetricEvent,
) -> future::BoxFuture<'a, Result<(), Error>> {
async move {
let fut = protocol.log_metric_events(&[msg]);
fut.await?.map_err(|e| format_err!("Failed to log metric {e:?}"))?;
Ok(())
}
.boxed()
}
}
#[fuchsia::main]
async fn main() -> Result<(), anyhow::Error> {
let (cobalt_api, cobalt_fut) =
ProtocolConnector::new(CobaltConnectedService).serve_and_log_errors();
let _cobalt_task = fasync::Task::spawn(cobalt_fut);
let bss_cache = Mutex::new(RealBssCache::new());
let bss_resolver = RealBssResolver::new(
connect_to_protocol::<HttpLoaderMarker>().context("failed to connect to http loader")?,
std::fs::read_to_string(API_KEY_FILE)
.with_context(|| format!("failed to read {}", API_KEY_FILE))?,
);
let mut service_fs = ServiceFs::new_local();
service_fs
.dir("svc")
.add_fidl_service(IncomingRequest::EmergencyProviderRequest)
.add_fidl_service(IncomingRequest::WlanBaseStationWatcherRequest);
service_fs
.take_and_serve_directory_handle()
.context("failed to serve outgoing namespace")?
.for_each_concurrent(CONCURRENCY_LIMIT, |connection| {
handle_client_requests(&bss_cache, &bss_resolver, connection, cobalt_api.clone())
.unwrap_or_else(|e| info!("connection terminated: {:?}", e))
})
.await;
Ok(())
}
async fn handle_client_requests<C: BssCache, R: BssResolver>(
bss_cache: &Mutex<C>,
bss_resolver: &R,
protocol: IncomingRequest,
cobalt_api: ProtocolSender<MetricEvent>,
) -> Result<(), Error> {
match protocol {
IncomingRequest::EmergencyProviderRequest(client) => {
process_location_queries(&bss_cache, bss_resolver, client, cobalt_api).await
}
IncomingRequest::WlanBaseStationWatcherRequest(client) => {
process_bss_updates(&bss_cache, client, cobalt_api).await
}
}
}
async fn process_location_queries<C: BssCache, R: BssResolver>(
bss_cache: &Mutex<C>,
bss_resolver: &R,
mut stream: EmergencyProviderRequestStream,
mut cobalt_api: ProtocolSender<MetricEvent>,
) -> Result<(), Error> {
loop {
match stream.try_next().await.context("failed to read emergency provider request")? {
Some(EmergencyProviderRequest::GetCurrent { responder }) => {
let start_time = Instant::now();
// We don't want to hold the BSS cache lock while resolving the BSSes to a
// `Position`, so we copy data from the iterator into our own `Vector`.
let bss_list: Vec<(BssId, Bss)> =
bss_cache.lock().await.iter().map(|(&id, &bss)| (id, bss)).collect();
match bss_resolver.resolve(bss_list).await {
Ok(position) => {
report_lookup_success_metrics(
&position,
&start_time.elapsed(),
&mut cobalt_api,
);
responder
.send(Ok(&position))
.context("failed to send position to caller")?;
}
Err(e) => {
info!("lookup failed: {:?}", e);
report_lookup_failure_metrics(e, &mut cobalt_api);
responder
.send(Err(LocationError::GeneralError))
.context("failed to send error to client")?
}
}
}
None => return Ok(()),
}
}
}
async fn process_bss_updates<C: BssCache>(
bss_cache: &Mutex<C>,
mut stream: WlanBaseStationWatcherRequestStream,
mut cobalt_api: ProtocolSender<MetricEvent>,
) -> Result<(), Error> {
loop {
match stream.try_next().await.context("failed to read base station watcher request")? {
Some(WlanBaseStationWatcherRequest::ReportCurrentStations {
stations,
control_handle: _,
}) => {
let update_result = bss_cache
.lock()
.await
.update(
stations
.into_proxy()
.context("failed to get proxy for scan result iterator")?,
)
.await;
report_bss_update_metrics(update_result, &mut cobalt_api);
update_result.context("failed to apply base station update")?
}
None => return Ok(()),
}
}
}
fn report_lookup_success_metrics(
position: &Position,
latency: &Duration,
cobalt_api: &mut ProtocolSender<MetricEvent>,
) {
const METERS_TO_MILLIMETERS: f64 = 1000.0;
cobalt_api.send(
MetricEvent::builder(GET_CURRENT_RESULT_METRIC_ID)
.with_event_codes(GetCurrentResult::Success)
.as_occurrence(1),
);
cobalt_api.send(
MetricEvent::builder(GET_CURRENT_LATENCY_METRIC_ID)
.as_integer(i64::try_from(latency.as_millis()).unwrap_or(i64::MAX)),
);
cobalt_api.send(MetricEvent::builder(GET_CURRENT_ACCURACY_METRIC_ID).as_integer(
match position.extras.accuracy_meters {
Some(accuracy_meters) => (accuracy_meters * METERS_TO_MILLIMETERS) as i64,
None => i64::MAX,
},
));
}
fn report_lookup_failure_metrics(
error: ResolverError,
cobalt_api: &mut ProtocolSender<MetricEvent>,
) {
cobalt_api.send(
MetricEvent::builder(GET_CURRENT_RESULT_METRIC_ID)
.with_event_codes(GetCurrentResult::Failure)
.as_occurrence(1),
);
cobalt_api.send(
MetricEvent::builder(GET_CURRENT_FAILURE_METRIC_ID)
.with_event_codes(match error {
ResolverError::NoBsses => GetCurrentFailure::NoBsses,
ResolverError::Internal => GetCurrentFailure::Internal,
ResolverError::Lookup => GetCurrentFailure::Lookup,
})
.as_occurrence(1),
);
}
fn report_bss_update_metrics(
result: Result<(), UpdateError>,
cobalt_api: &mut ProtocolSender<MetricEvent>,
) {
cobalt_api.send(
MetricEvent::builder(WLAN_SENSOR_REPORT_MIGRATED_METRIC_ID)
.with_event_codes(match result {
Ok(()) => WlanSensorReportResult::Success,
Err(UpdateError::NoBssIds) => WlanSensorReportResult::NoBssIds,
Err(UpdateError::NoBsses) => WlanSensorReportResult::NoBsses,
Err(UpdateError::Ipc) => WlanSensorReportResult::IpcError,
Err(UpdateError::Service) => WlanSensorReportResult::ServiceError,
})
.as_occurrence(1),
);
}
#[cfg(test)]
mod tests {
use super::*;
use fidl::endpoints::create_proxy_and_stream;
use futures::channel::mpsc;
use futures::future;
use std::pin::pin;
use std::task::Poll;
use test_doubles::{FakeBssCache, StubBssResolver};
mod base_station_watcher {
use super::*;
use emergency_lib::bss_cache::UpdateError;
use fidl::endpoints::create_request_stream;
use fidl_fuchsia_location_sensor::WlanBaseStationWatcherMarker;
use fidl_fuchsia_wlan_policy::ScanResultIteratorMarker;
use test_case::test_case;
#[fasync::run_until_stalled(test)]
async fn propagates_stations_downward() {
let (cobalt_sender, _cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<WlanBaseStationWatcherMarker>()
.expect("internal error: failed to create base station watcher");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || panic!("unexpected call to resolver") },
IncomingRequest::WlanBaseStationWatcherRequest(stream),
cobalt_sender,
);
let (scan_result_reader, _scan_result_generator) =
create_request_stream::<ScanResultIteratorMarker>()
.expect("internal error: failed to create scan result iterator");
proxy
.report_current_stations(scan_result_reader)
.expect("internal error: proxy failed to send request");
std::mem::drop(proxy); // Close connection so `server_fut` completes.
assert!(server_fut.await.is_ok());
assert!(bss_cache.lock().await.was_update_called())
}
#[fasync::run_until_stalled(test)]
async fn update_error_does_not_panic() {
let (cobalt_sender, _cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<WlanBaseStationWatcherMarker>()
.expect("internal error: failed to create base station watcher");
let bss_cache = Mutex::new(FakeBssCache::new(Err(UpdateError::NoBssIds)));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || panic!("unexpected call to resolver") },
IncomingRequest::WlanBaseStationWatcherRequest(stream),
cobalt_sender,
);
let (scan_result_reader, _scan_result_generator) =
create_request_stream::<ScanResultIteratorMarker>()
.expect("internal error: failed to create scan result iterator");
proxy
.report_current_stations(scan_result_reader)
.expect("internal error: proxy failed to send request");
// Close connection so `server_fut` completes, even if it chooses to
// ignore the error.
std::mem::drop(proxy);
// The best error handling policy isn't exactly clear: is it useful
// to report an error upwards (which would cause `main()` to close
// the client connection), or is it better to leave the connection
// open, and hope that the client will provide more useful results
// next time?
//
// Rather than take a position on that question, we simply validate
// that the program doesn't crash when that happens. (If the program
// crashed, the test framework would report a test failure with
// the panic message.)
let _ = server_fut.await;
}
#[test_case(Ok(()), WlanSensorReportResult::Success; "success")]
#[test_case(Err(UpdateError::NoBssIds), WlanSensorReportResult::NoBssIds; "no bss ids")]
#[test_case(Err(UpdateError::NoBsses), WlanSensorReportResult::NoBsses; "no bsses")]
#[test_case(Err(UpdateError::Ipc), WlanSensorReportResult::IpcError; "ipc error")]
#[test_case(Err(UpdateError::Service), WlanSensorReportResult::ServiceError;
"service error")]
fn reports_update_result_to_cobalt(
update_result: Result<(), UpdateError>,
cobalt_event: WlanSensorReportResult,
) {
let test_fut = async {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<WlanBaseStationWatcherMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(update_result));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || panic!("unexpected call to resolver") },
IncomingRequest::WlanBaseStationWatcherRequest(stream),
cobalt_sender,
);
let (scan_result_reader, _scan_result_generator) =
create_request_stream::<ScanResultIteratorMarker>()
.expect("internal error: failed to create scan result iterator");
proxy
.report_current_stations(scan_result_reader)
.expect("internal error: proxy failed to send request");
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
assert_eq!(
cobalt_receiver
.filter(|event| {
future::ready(event.metric_id == WLAN_SENSOR_REPORT_MIGRATED_METRIC_ID)
})
.collect::<Vec<MetricEvent>>()
.await,
vec![MetricEvent::builder(WLAN_SENSOR_REPORT_MIGRATED_METRIC_ID)
.with_event_codes(cobalt_event)
.as_occurrence(1)]
);
};
let mut test_fut = pin!(test_fut);
assert_matches::assert_matches!(
fasync::TestExecutor::new().run_until_stalled(&mut test_fut),
Poll::Ready(_)
);
}
#[fasync::run_until_stalled(test)]
async fn does_not_report_extra_metrics() {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<WlanBaseStationWatcherMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || panic!("unexpected call to resolver") },
IncomingRequest::WlanBaseStationWatcherRequest(stream),
cobalt_sender,
);
let (scan_result_reader, _scan_result_generator) =
create_request_stream::<ScanResultIteratorMarker>()
.expect("internal error: failed to create scan result iterator");
proxy
.report_current_stations(scan_result_reader)
.expect("internal error: proxy failed to send request");
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
assert_eq!(
cobalt_receiver
.filter(|event| future::ready(
event.metric_id != WLAN_SENSOR_REPORT_MIGRATED_METRIC_ID
))
.collect::<Vec<MetricEvent>>()
.await,
vec![]
);
}
}
mod emergency_provider {
use super::*;
use assert_matches::assert_matches;
use emergency_lib::bss_resolver::ResolverError;
use fidl_fuchsia_location_position::{EmergencyProviderMarker, PositionExtras};
use fidl_fuchsia_metrics::MetricEventPayload;
use test_case::test_case;
fn position_with_unknown_accuracy() -> Position {
Position {
latitude: 1.0,
longitude: -1.0,
extras: PositionExtras {
accuracy_meters: None,
altitude_meters: None,
..Default::default()
},
}
}
fn position_with_known_accuracy() -> Position {
Position {
latitude: 1.0,
longitude: -1.0,
extras: PositionExtras {
accuracy_meters: Some(1.0),
altitude_meters: None,
..Default::default()
},
}
}
#[fasync::run_until_stalled(test)]
async fn propagates_success_to_client() {
let (cobalt_sender, _cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || Ok(position_with_unknown_accuracy()) },
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let client_fut = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let (client_res, _server_res) = future::join(client_fut, server_fut).await;
assert_matches!(client_res, Ok(Ok(Position { .. })))
}
#[fasync::run_until_stalled(test)]
async fn propagates_error_to_client() {
let (cobalt_sender, _cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || Err(ResolverError::NoBsses) },
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let client_fut = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let (client_res, _server_res) = future::join(client_fut, server_fut).await;
assert_matches!(client_res, Ok(Err(_))) // The `Ok` is the FIDL-level result.
}
#[fasync::run_until_stalled(test)]
async fn reports_success_to_cobalt() {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || Ok(position_with_unknown_accuracy()) },
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let _ = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
assert_eq!(
cobalt_receiver
.filter(|event| future::ready(event.metric_id == GET_CURRENT_RESULT_METRIC_ID))
.collect::<Vec<MetricEvent>>()
.await,
vec![MetricEvent::builder(GET_CURRENT_RESULT_METRIC_ID)
.with_event_codes(GetCurrentResult::Success)
.as_occurrence(1)]
);
}
#[fasync::run_until_stalled(test)]
async fn reports_failure_to_cobalt() {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || Err(ResolverError::NoBsses) },
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let _ = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
assert_eq!(
cobalt_receiver
.filter(|event| future::ready(event.metric_id == GET_CURRENT_RESULT_METRIC_ID))
.collect::<Vec<MetricEvent>>()
.await,
vec![MetricEvent::builder(GET_CURRENT_RESULT_METRIC_ID)
.with_event_codes(GetCurrentResult::Failure)
.as_occurrence(1)]
);
}
#[fasync::run_until_stalled(test)]
async fn propagates_reported_accuracy_to_cobalt() {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || Ok(position_with_known_accuracy()) },
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let _ = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
assert_eq!(
cobalt_receiver
.filter(|event| future::ready(
event.metric_id == GET_CURRENT_ACCURACY_METRIC_ID
))
.collect::<Vec<MetricEvent>>()
.await,
vec![MetricEvent::builder(GET_CURRENT_ACCURACY_METRIC_ID).as_integer(
1000 // In millimeters, for higher precision
)]
);
}
#[fasync::run_until_stalled(test)]
async fn reports_worst_accuracy_if_accuracy_is_unknown() {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || Ok(position_with_unknown_accuracy()) },
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let _ = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
assert_eq!(
cobalt_receiver
.filter(|event| future::ready(
event.metric_id == GET_CURRENT_ACCURACY_METRIC_ID
))
.collect::<Vec<MetricEvent>>()
.await,
vec![MetricEvent::builder(GET_CURRENT_ACCURACY_METRIC_ID).as_integer(i64::MAX)]
);
}
#[fasync::run_until_stalled(test)]
async fn reports_elapsed_time_to_cobalt_on_success() {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || Ok(position_with_unknown_accuracy()) },
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let _ = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
let latency_reports = cobalt_receiver
.filter(|event| future::ready(event.metric_id == GET_CURRENT_LATENCY_METRIC_ID))
.collect::<Vec<MetricEvent>>()
.await;
assert_eq!(latency_reports.len(), 1);
assert_matches!(
latency_reports[0],
MetricEvent {
metric_id: GET_CURRENT_LATENCY_METRIC_ID,
payload: MetricEventPayload::IntegerValue(_),
..
}
);
}
#[test_case(ResolverError::NoBsses, GetCurrentFailure::NoBsses; "no bsses")]
#[test_case(ResolverError::Internal, GetCurrentFailure::Internal; "internal")]
#[test_case(ResolverError::Lookup, GetCurrentFailure::Lookup; "lookup")]
fn reports_resolver_error_to_cobalt(
resolver_error: ResolverError,
cobalt_error: GetCurrentFailure,
) {
let test_fut = async {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let resolver = StubBssResolver { resolve: || Err(resolver_error) };
let server_fut = handle_client_requests(
&bss_cache,
&resolver,
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let _ = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
assert_eq!(
cobalt_receiver
.filter(|event| {
future::ready(event.metric_id == GET_CURRENT_FAILURE_METRIC_ID)
})
.collect::<Vec<MetricEvent>>()
.await,
vec![MetricEvent::builder(GET_CURRENT_FAILURE_METRIC_ID)
.with_event_codes(cobalt_error)
.as_occurrence(1)]
);
};
let mut test_fut = pin!(test_fut);
assert_matches::assert_matches!(
fasync::TestExecutor::new().run_until_stalled(&mut test_fut),
Poll::Ready(_)
);
}
#[test_case(
Ok(()),
&[GET_CURRENT_RESULT_METRIC_ID, GET_CURRENT_ACCURACY_METRIC_ID,
GET_CURRENT_LATENCY_METRIC_ID];
"on_success")]
#[test_case(
Err(()),
&[GET_CURRENT_RESULT_METRIC_ID, GET_CURRENT_FAILURE_METRIC_ID];
"on_failure")]
fn does_not_report_extra_metrics(result: Result<(), ()>, expected_metric_ids: &[u32]) {
let test_fut = async {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let resolver = StubBssResolver {
resolve: || match &result {
Ok(()) => Ok(position_with_known_accuracy()),
Err(()) => Err(ResolverError::NoBsses),
},
};
let server_fut = handle_client_requests(
&bss_cache,
&resolver,
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let _ = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
assert_eq!(
cobalt_receiver
.filter(|event| future::ready(
expected_metric_ids.iter().find(|&&i| i == event.metric_id).is_none()
))
.collect::<Vec<MetricEvent>>()
.await,
vec![]
);
};
let mut test_fut = pin!(test_fut);
assert_matches::assert_matches!(
fasync::TestExecutor::new().run_until_stalled(&mut test_fut),
Poll::Ready(_)
);
}
}
fn make_fake_cobalt_connection() -> (ProtocolSender<MetricEvent>, mpsc::Receiver<MetricEvent>) {
const MAX_METRICS_PER_QUERY: usize = 3;
const MAX_QUERIES: usize = 1;
let (sender, receiver) = mpsc::channel(MAX_METRICS_PER_QUERY * MAX_QUERIES);
(ProtocolSender::<MetricEvent>::new(sender), receiver)
}
}
#[cfg(test)]
mod test_doubles {
use super::*;
use async_trait::async_trait;
use fidl_fuchsia_wlan_policy::ScanResultIteratorProxyInterface;
pub(super) struct FakeBssCache {
update_result: Result<(), UpdateError>,
bsses: Vec<(BssId, Bss)>,
was_update_called: bool,
}
pub(super) struct StubBssResolver<R: Fn() -> Result<Position, ResolverError>> {
// Note, we can't just store a value here, because `Position` is not Copy.
pub resolve: R,
}
impl FakeBssCache {
pub fn new(update_result: Result<(), UpdateError>) -> Self {
Self { update_result, bsses: Vec::new(), was_update_called: false }
}
pub fn was_update_called(&self) -> bool {
self.was_update_called
}
}
#[async_trait(?Send)]
impl BssCache for FakeBssCache {
async fn update<I: ScanResultIteratorProxyInterface>(
&mut self,
_new_bsses: I,
) -> Result<(), UpdateError> {
self.was_update_called = true;
self.update_result.clone()
}
fn iter(&self) -> Box<dyn Iterator<Item = (&'_ BssId, &'_ Bss)> + '_> {
Box::new(self.bsses.iter().map(|(id, bss)| (id, bss)))
}
}
#[async_trait(?Send)]
impl<R> BssResolver for StubBssResolver<R>
where
R: Fn() -> Result<Position, ResolverError>,
{
async fn resolve<'a, I, T, U>(&self, _bss_list: I) -> Result<Position, ResolverError> {
(self.resolve)()
}
}
}