// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::{self, Context, Error},
emergency_lib::{
bss_cache::{Bss, BssCache, BssId, RealBssCache, UpdateError},
bss_resolver::{BssResolver, RealBssResolver, ResolverError},
},
emergency_metrics_registry::{
self as metrics, EmergencyGetCurrentFailureMetricDimensionCause as GetCurrentFailure,
EmergencyGetCurrentResultMetricDimensionResult as GetCurrentResult,
WlanSensorReportMetricDimensionResult as WlanSensorReportResult,
EMERGENCY_GET_CURRENT_ACCURACY_METRIC_ID as GET_CURRENT_ACCURACY_METRIC_ID,
EMERGENCY_GET_CURRENT_FAILURE_METRIC_ID as GET_CURRENT_FAILURE_METRIC_ID,
EMERGENCY_GET_CURRENT_LATENCY_METRIC_ID as GET_CURRENT_LATENCY_METRIC_ID,
EMERGENCY_GET_CURRENT_RESULT_METRIC_ID as GET_CURRENT_RESULT_METRIC_ID,
WLAN_SENSOR_REPORT_METRIC_ID,
},
fidl_fuchsia_location::Error as LocationError,
fidl_fuchsia_location_position::{
EmergencyProviderRequest, EmergencyProviderRequestStream, Position,
},
fidl_fuchsia_location_sensor::{
WlanBaseStationWatcherRequest, WlanBaseStationWatcherRequestStream,
},
fidl_fuchsia_net_http::LoaderMarker as HttpLoaderMarker,
fuchsia_async as fasync,
fuchsia_cobalt::{CobaltConnector, CobaltSender, ConnectionType},
fuchsia_component::{client::connect_to_service, server::ServiceFs},
fuchsia_syslog::{self as syslog},
futures::{lock::Mutex, prelude::*},
log::info,
std::{
convert::TryFrom,
time::{Duration, Instant},
},
};
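/// Limit on the number of client connections served concurrently; `None` means no limit.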
const CONCURRENCY_LIMIT: Option<usize> = None;
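/// Path, within the component's namespace, of the Google Maps API key handed to the resolver.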
const API_KEY_FILE: &str = "/config/data/google_maps_api_key.txt";
/// Wraps all hosted protocols into a single type that can be matched against
/// and dispatched.
enum IncomingRequest {
EmergencyProviderRequest(EmergencyProviderRequestStream),
WlanBaseStationWatcherRequest(WlanBaseStationWatcherRequestStream),
}
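/// Initializes logging and Cobalt, constructs the BSS cache and resolver, and serves the
/// hosted FIDL protocols from the component's outgoing directory.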
#[fasync::run_singlethreaded]
async fn main() -> Result<(), anyhow::Error> {
syslog::init().context("failed to initialize logging")?;
let (cobalt_api, cobalt_fut) =
CobaltConnector::default().serve(ConnectionType::project_id(metrics::PROJECT_ID));
let _cobalt_task = fasync::Task::spawn(cobalt_fut);
let bss_cache = Mutex::new(RealBssCache::new());
let bss_resolver = RealBssResolver::new(
connect_to_service::<HttpLoaderMarker>().context("failed to connect to http loader")?,
std::fs::read_to_string(API_KEY_FILE)
.with_context(|| format!("failed to read {}", API_KEY_FILE))?,
);
let mut service_fs = ServiceFs::new_local();
service_fs
.dir("svc")
.add_fidl_service(IncomingRequest::EmergencyProviderRequest)
.add_fidl_service(IncomingRequest::WlanBaseStationWatcherRequest);
service_fs
.take_and_serve_directory_handle()
.context("failed to serve outgoing namespace")?
.for_each_concurrent(CONCURRENCY_LIMIT, |connection| {
handle_client_requests(&bss_cache, &bss_resolver, connection, cobalt_api.clone())
.unwrap_or_else(|e| info!("connection terminated: {:?}", e))
})
.await;
Ok(())
}
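/// Dispatches a single incoming connection to the handler for its protocol.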
async fn handle_client_requests<C: BssCache, R: BssResolver>(
bss_cache: &Mutex<C>,
bss_resolver: &R,
protocol: IncomingRequest,
cobalt_api: CobaltSender,
) -> Result<(), Error> {
match protocol {
IncomingRequest::EmergencyProviderRequest(client) => {
process_location_queries(&bss_cache, bss_resolver, client, cobalt_api).await
}
IncomingRequest::WlanBaseStationWatcherRequest(client) => {
process_bss_updates(&bss_cache, client, cobalt_api).await
}
}
}
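/// Serves `GetCurrent` requests on an `EmergencyProvider` connection: resolves the cached
/// BSSes to a `Position`, replies to the client, and logs the outcome to Cobalt.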
async fn process_location_queries<C: BssCache, R: BssResolver>(
bss_cache: &Mutex<C>,
bss_resolver: &R,
mut stream: EmergencyProviderRequestStream,
mut cobalt_api: CobaltSender,
) -> Result<(), Error> {
loop {
match stream.try_next().await.context("failed to read emergency provider request")? {
Some(EmergencyProviderRequest::GetCurrent { responder }) => {
let start_time = Instant::now();
// We don't want to hold the BSS cache lock while resolving the BSSes to a
// `Position`, so we copy the data from the iterator into our own `Vec`.
let bss_list: Vec<(BssId, Bss)> =
bss_cache.lock().await.iter().map(|(&id, &bss)| (id, bss)).collect();
match bss_resolver.resolve(bss_list).await {
Ok(position) => {
report_lookup_success_metrics(
&position,
&start_time.elapsed(),
&mut cobalt_api,
);
responder
.send(&mut Ok(position))
.context("failed to send position to caller")?;
}
Err(e) => {
info!("lookup failed: {:?}", e);
report_lookup_failure_metrics(e, &mut cobalt_api);
responder
.send(&mut Err(LocationError::GeneralError))
.context("failed to send error to client")?
}
}
}
None => return Ok(()),
}
}
}
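/// Serves `ReportCurrentStations` requests on a `WlanBaseStationWatcher` connection,
/// feeding the reported scan results into the BSS cache and logging the result to Cobalt.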
async fn process_bss_updates<C: BssCache>(
bss_cache: &Mutex<C>,
mut stream: WlanBaseStationWatcherRequestStream,
mut cobalt_api: CobaltSender,
) -> Result<(), Error> {
loop {
match stream.try_next().await.context("failed to read base station watcher request")? {
Some(WlanBaseStationWatcherRequest::ReportCurrentStations {
stations,
control_handle: _,
}) => {
let update_result = bss_cache
.lock()
.await
.update(
stations
.into_proxy()
.context("failed to get proxy for scan result iterator")?,
)
.await;
report_bss_update_metrics(update_result, &mut cobalt_api);
update_result.context("failed to apply base station update")?
}
None => return Ok(()),
}
}
}
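/// Logs the result, latency, and accuracy metrics for a successful position lookup.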
fn report_lookup_success_metrics(
position: &Position,
latency: &Duration,
cobalt_api: &mut CobaltSender,
) {
const METERS_TO_MILLIMETERS: f64 = 1000.0;
cobalt_api.log_event(GET_CURRENT_RESULT_METRIC_ID, GetCurrentResult::Success);
cobalt_api.log_elapsed_time(
GET_CURRENT_LATENCY_METRIC_ID,
(),
i64::try_from(latency.as_millis()).unwrap_or(i64::MAX),
);
cobalt_api.log_event_count(
GET_CURRENT_ACCURACY_METRIC_ID,
(),
0,
match position.extras.accuracy_meters {
Some(accuracy_meters) => (accuracy_meters * METERS_TO_MILLIMETERS) as i64,
None => i64::MAX,
},
)
}
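/// Logs the result and failure-cause metrics for a failed position lookup.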
fn report_lookup_failure_metrics(error: ResolverError, cobalt_api: &mut CobaltSender) {
cobalt_api.log_event(GET_CURRENT_RESULT_METRIC_ID, GetCurrentResult::Failure);
cobalt_api.log_event(
GET_CURRENT_FAILURE_METRIC_ID,
match error {
ResolverError::NoBsses => GetCurrentFailure::NoBsses,
ResolverError::Internal => GetCurrentFailure::Internal,
ResolverError::Lookup => GetCurrentFailure::Lookup,
},
);
}
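/// Logs the outcome of applying a base station update to the BSS cache.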
fn report_bss_update_metrics(result: Result<(), UpdateError>, cobalt_api: &mut CobaltSender) {
cobalt_api.log_event(
WLAN_SENSOR_REPORT_METRIC_ID,
match result {
Ok(()) => WlanSensorReportResult::Success,
Err(UpdateError::NoBssIds) => WlanSensorReportResult::NoBssIds,
Err(UpdateError::NoBsses) => WlanSensorReportResult::NoBsses,
Err(UpdateError::Ipc) => WlanSensorReportResult::IpcError,
Err(UpdateError::Service) => WlanSensorReportResult::ServiceError,
},
);
}
#[cfg(test)]
mod tests {
use {
super::*,
cobalt_client::traits::AsEventCode,
fidl::endpoints::create_proxy_and_stream,
fidl_fuchsia_cobalt::CobaltEvent,
fuchsia_cobalt::cobalt_event_builder::CobaltEventExt,
fuchsia_cobalt::CobaltSender,
futures::channel::mpsc,
futures::{future, pin_mut},
matches::assert_matches,
std::task::Poll,
test_doubles::{FakeBssCache, StubBssResolver},
};
mod base_station_watcher {
use {
super::*, emergency_lib::bss_cache::UpdateError,
fidl::endpoints::create_request_stream,
fidl_fuchsia_location_sensor::WlanBaseStationWatcherMarker,
fidl_fuchsia_wlan_policy::ScanResultIteratorMarker, test_case::test_case,
};
#[fasync::run_until_stalled(test)]
async fn propagates_stations_downward() {
let (cobalt_sender, _cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<WlanBaseStationWatcherMarker>()
.expect("internal error: failed to create base station watcher");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || panic!("unexpected call to resolver") },
IncomingRequest::WlanBaseStationWatcherRequest(stream),
cobalt_sender,
);
let (scan_result_reader, _scan_result_generator) =
create_request_stream::<ScanResultIteratorMarker>()
.expect("internal error: failed to create scan result iterator");
proxy
.report_current_stations(scan_result_reader)
.expect("internal error: proxy failed to send request");
std::mem::drop(proxy); // Close connection so `server_fut` completes.
assert!(server_fut.await.is_ok());
assert!(bss_cache.lock().await.was_update_called())
}
#[fasync::run_until_stalled(test)]
async fn update_error_does_not_panic() {
let (cobalt_sender, _cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<WlanBaseStationWatcherMarker>()
.expect("internal error: failed to create base station watcher");
let bss_cache = Mutex::new(FakeBssCache::new(Err(UpdateError::NoBssIds)));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || panic!("unexpected call to resolver") },
IncomingRequest::WlanBaseStationWatcherRequest(stream),
cobalt_sender,
);
let (scan_result_reader, _scan_result_generator) =
create_request_stream::<ScanResultIteratorMarker>()
.expect("internal error: failed to create scan result iterator");
proxy
.report_current_stations(scan_result_reader)
.expect("internal error: proxy failed to send request");
// Close connection so `server_fut` completes, even if it chooses to
// ignore the error.
std::mem::drop(proxy);
// The best error handling policy isn't exactly clear: is it useful
// to report an error upwards (which would cause `main()` to close
// the client connection), or is it better to leave the connection
// open, and hope that the client will provide more useful results
// next time?
//
// Rather than take a position on that question, we simply validate
// that the program doesn't crash when that happens. (If the program
// crashed, the test framework would report a test failure with
// the panic message.)
let _ = server_fut.await;
}
#[test_case(Ok(()), WlanSensorReportResult::Success; "success")]
#[test_case(Err(UpdateError::NoBssIds), WlanSensorReportResult::NoBssIds; "no bss ids")]
#[test_case(Err(UpdateError::NoBsses), WlanSensorReportResult::NoBsses; "no bsses")]
#[test_case(Err(UpdateError::Ipc), WlanSensorReportResult::IpcError; "ipc error")]
#[test_case(Err(UpdateError::Service), WlanSensorReportResult::ServiceError;
"service error")]
fn reports_update_result_to_cobalt(
update_result: Result<(), UpdateError>,
cobalt_event: WlanSensorReportResult,
) {
let test_fut = async {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<WlanBaseStationWatcherMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(update_result));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || panic!("unexpected call to resolver") },
IncomingRequest::WlanBaseStationWatcherRequest(stream),
cobalt_sender,
);
let (scan_result_reader, _scan_result_generator) =
create_request_stream::<ScanResultIteratorMarker>()
.expect("internal error: failed to create scan result iterator");
proxy
.report_current_stations(scan_result_reader)
.expect("internal error: proxy failed to send request");
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
assert_eq!(
cobalt_receiver
.filter(|event| {
future::ready(event.metric_id == WLAN_SENSOR_REPORT_METRIC_ID)
})
.collect::<Vec<CobaltEvent>>()
.await,
vec![CobaltEvent::builder(WLAN_SENSOR_REPORT_METRIC_ID)
.with_event_code(cobalt_event.as_event_code())
.as_event()]
);
};
pin_mut!(test_fut);
assert_matches!(
fasync::Executor::new()
.expect("internal error: failed to create executor")
.run_until_stalled(&mut test_fut),
Poll::Ready(_)
);
}
#[fasync::run_until_stalled(test)]
async fn does_not_report_extra_metrics() {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<WlanBaseStationWatcherMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || panic!("unexpected call to resolver") },
IncomingRequest::WlanBaseStationWatcherRequest(stream),
cobalt_sender,
);
let (scan_result_reader, _scan_result_generator) =
create_request_stream::<ScanResultIteratorMarker>()
.expect("internal error: failed to create scan result iterator");
proxy
.report_current_stations(scan_result_reader)
.expect("internal error: proxy failed to send request");
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
assert_eq!(
cobalt_receiver
.filter(|event| future::ready(event.metric_id != WLAN_SENSOR_REPORT_METRIC_ID))
.collect::<Vec<CobaltEvent>>()
.await,
vec![]
);
}
}
mod emergency_provider {
use {
super::*,
emergency_lib::bss_resolver::ResolverError,
fidl_fuchsia_cobalt::EventPayload::ElapsedMicros,
fidl_fuchsia_location_position::{EmergencyProviderMarker, Position, PositionExtras},
test_case::test_case,
};
const POSITION_WITH_UNKNOWN_ACCURACY: Position = Position {
latitude: 1.0,
longitude: -1.0,
extras: PositionExtras {
accuracy_meters: None,
altitude_meters: None,
..PositionExtras::EMPTY
},
};
const POSITION_WITH_KNOWN_ACCURACY: Position = Position {
latitude: 1.0,
longitude: -1.0,
extras: PositionExtras {
accuracy_meters: Some(1.0),
altitude_meters: None,
..PositionExtras::EMPTY
},
};
#[fasync::run_until_stalled(test)]
async fn propagates_success_to_client() {
let (cobalt_sender, _cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || Ok(POSITION_WITH_UNKNOWN_ACCURACY) },
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let client_fut = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let (client_res, _server_res) = future::join(client_fut, server_fut).await;
assert_matches!(client_res, Ok(Ok(Position {..})))
}
#[fasync::run_until_stalled(test)]
async fn propagates_error_to_client() {
let (cobalt_sender, _cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || Err(ResolverError::NoBsses) },
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let client_fut = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let (client_res, _server_res) = future::join(client_fut, server_fut).await;
assert_matches!(client_res, Ok(Err(_))) // The `Ok` is the FIDL-level result.
}
#[fasync::run_until_stalled(test)]
async fn reports_success_to_cobalt() {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || Ok(POSITION_WITH_UNKNOWN_ACCURACY) },
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let _ = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
assert_eq!(
cobalt_receiver
.filter(|event| future::ready(event.metric_id == GET_CURRENT_RESULT_METRIC_ID))
.collect::<Vec<CobaltEvent>>()
.await,
vec![CobaltEvent::builder(GET_CURRENT_RESULT_METRIC_ID)
.with_event_code(GetCurrentResult::Success.as_event_code())
.as_event()]
);
}
#[fasync::run_until_stalled(test)]
async fn reports_failure_to_cobalt() {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || Err(ResolverError::NoBsses) },
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let _ = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
assert_eq!(
cobalt_receiver
.filter(|event| future::ready(event.metric_id == GET_CURRENT_RESULT_METRIC_ID))
.collect::<Vec<CobaltEvent>>()
.await,
vec![CobaltEvent::builder(GET_CURRENT_RESULT_METRIC_ID)
.with_event_code(GetCurrentResult::Failure.as_event_code())
.as_event()]
);
}
#[fasync::run_until_stalled(test)]
async fn propagates_reported_accuracy_to_cobalt() {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || Ok(POSITION_WITH_KNOWN_ACCURACY) },
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let _ = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
assert_eq!(
cobalt_receiver
.filter(|event| future::ready(
event.metric_id == GET_CURRENT_ACCURACY_METRIC_ID
))
.collect::<Vec<CobaltEvent>>()
.await,
vec![CobaltEvent::builder(GET_CURRENT_ACCURACY_METRIC_ID).as_count_event(
0,
1000 // In millimeters, for higher precision
)]
);
}
#[fasync::run_until_stalled(test)]
async fn reports_worst_accuracy_if_accuracy_is_unknown() {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || Ok(POSITION_WITH_UNKNOWN_ACCURACY) },
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let _ = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
assert_eq!(
cobalt_receiver
.filter(|event| future::ready(
event.metric_id == GET_CURRENT_ACCURACY_METRIC_ID
))
.collect::<Vec<CobaltEvent>>()
.await,
vec![CobaltEvent::builder(GET_CURRENT_ACCURACY_METRIC_ID)
.as_count_event(0, i64::MAX)]
);
}
#[fasync::run_until_stalled(test)]
async fn reports_elapsed_time_to_cobalt_on_success() {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let server_fut = handle_client_requests(
&bss_cache,
&StubBssResolver { resolve: || Ok(POSITION_WITH_UNKNOWN_ACCURACY) },
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let _ = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
let latency_reports = cobalt_receiver
.filter(|event| future::ready(event.metric_id == GET_CURRENT_LATENCY_METRIC_ID))
.collect::<Vec<CobaltEvent>>()
.await;
assert_eq!(latency_reports.len(), 1);
assert_matches!(
latency_reports[0],
CobaltEvent {
metric_id: GET_CURRENT_LATENCY_METRIC_ID,
payload: ElapsedMicros(_),
..
}
);
}
#[test_case(ResolverError::NoBsses, GetCurrentFailure::NoBsses; "no bsses")]
#[test_case(ResolverError::Internal, GetCurrentFailure::Internal; "internal")]
#[test_case(ResolverError::Lookup, GetCurrentFailure::Lookup; "lookup")]
fn reports_resolver_error_to_cobalt(
resolver_error: ResolverError,
cobalt_error: GetCurrentFailure,
) {
let test_fut = async {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let resolver = StubBssResolver { resolve: || Err(resolver_error) };
let server_fut = handle_client_requests(
&bss_cache,
&resolver,
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let _ = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
assert_eq!(
cobalt_receiver
.filter(|event| {
future::ready(event.metric_id == GET_CURRENT_FAILURE_METRIC_ID)
})
.collect::<Vec<CobaltEvent>>()
.await,
vec![CobaltEvent::builder(GET_CURRENT_FAILURE_METRIC_ID)
.with_event_code(cobalt_error.as_event_code())
.as_event()]
);
};
pin_mut!(test_fut);
assert_matches!(
fasync::Executor::new()
.expect("internal error: failed to create executor")
.run_until_stalled(&mut test_fut),
Poll::Ready(_)
);
}
#[test_case(
Ok(()),
&[GET_CURRENT_RESULT_METRIC_ID, GET_CURRENT_ACCURACY_METRIC_ID,
GET_CURRENT_LATENCY_METRIC_ID];
"on_success")]
#[test_case(
Err(()),
&[GET_CURRENT_RESULT_METRIC_ID, GET_CURRENT_FAILURE_METRIC_ID];
"on_failure")]
fn does_not_report_extra_metrics(result: Result<(), ()>, expected_metric_ids: &[u32]) {
let test_fut = async {
let (cobalt_sender, cobalt_receiver) = make_fake_cobalt_connection();
let (proxy, stream) = create_proxy_and_stream::<EmergencyProviderMarker>()
.expect("internal error: failed to create emergency provider");
let bss_cache = Mutex::new(FakeBssCache::new(Ok(())));
let resolver = StubBssResolver {
resolve: || match &result {
Ok(()) => Ok(POSITION_WITH_KNOWN_ACCURACY),
Err(()) => Err(ResolverError::NoBsses),
},
};
let server_fut = handle_client_requests(
&bss_cache,
&resolver,
IncomingRequest::EmergencyProviderRequest(stream),
cobalt_sender,
);
let _ = proxy.get_current();
std::mem::drop(proxy); // Close connection so `server_fut` completes.
let _ = server_fut.await;
assert_eq!(
cobalt_receiver
.filter(|event| future::ready(
expected_metric_ids.iter().find(|&&i| i == event.metric_id).is_none()
))
.collect::<Vec<CobaltEvent>>()
.await,
vec![]
);
};
pin_mut!(test_fut);
assert_matches!(
fasync::Executor::new()
.expect("internal error: failed to create executor")
.run_until_stalled(&mut test_fut),
Poll::Ready(_)
);
}
}
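/// Returns a `CobaltSender` backed by an in-process channel, so that tests can inspect
/// the events logged by the code under test.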
fn make_fake_cobalt_connection() -> (CobaltSender, mpsc::Receiver<CobaltEvent>) {
const MAX_METRICS_PER_QUERY: usize = 3;
const MAX_QUERIES: usize = 1;
let (sender, receiver) = mpsc::channel(MAX_METRICS_PER_QUERY * MAX_QUERIES);
(CobaltSender::new(sender), receiver)
}
}
#[cfg(test)]
mod test_doubles {
use {
super::*,
async_trait::async_trait,
emergency_lib::{bss_cache::UpdateError, bss_resolver::ResolverError},
fidl_fuchsia_location_position::Position,
fidl_fuchsia_wlan_policy::ScanResultIteratorProxyInterface,
};
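/// A `BssCache` test double that returns a canned update result and records whether
/// `update()` was called.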
pub(super) struct FakeBssCache {
update_result: Result<(), UpdateError>,
bsses: Vec<(BssId, Bss)>,
was_update_called: bool,
}
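/// A `BssResolver` test double whose `resolve()` result is produced by a caller-supplied
/// closure.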
pub(super) struct StubBssResolver<R: Fn() -> Result<Position, ResolverError>> {
// Note: we can't just store a `Position` value here, because `Position` is not `Copy`.
pub resolve: R,
}
impl FakeBssCache {
pub fn new(update_result: Result<(), UpdateError>) -> Self {
Self { update_result, bsses: Vec::new(), was_update_called: false }
}
pub fn was_update_called(&self) -> bool {
self.was_update_called
}
}
#[async_trait(?Send)]
impl BssCache for FakeBssCache {
async fn update<I: ScanResultIteratorProxyInterface>(
&mut self,
_new_bsses: I,
) -> Result<(), UpdateError> {
self.was_update_called = true;
self.update_result.clone()
}
fn iter(&self) -> Box<dyn Iterator<Item = (&'_ BssId, &'_ Bss)> + '_> {
Box::new(self.bsses.iter().map(|(id, bss)| (id, bss)))
}
}
#[async_trait(?Send)]
impl<R> BssResolver for StubBssResolver<R>
where
R: Fn() -> Result<Position, ResolverError>,
{
async fn resolve<'a, I, T, U>(&self, _bss_list: I) -> Result<Position, ResolverError> {
(self.resolve)()
}
}
}