// blob: b03318026c26699763f2efb7a19ff19919c53ef7 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
mod windowed_stats;
use {
crate::telemetry::windowed_stats::WindowedStats,
fuchsia_async as fasync,
fuchsia_inspect::{Inspector, Node as InspectNode},
fuchsia_inspect_contrib::inspect_insert,
fuchsia_zircon as zx,
futures::{channel::mpsc, select, Future, FutureExt, StreamExt},
log::{info, warn},
num_traits::SaturatingAdd,
parking_lot::Mutex,
static_assertions::const_assert_eq,
std::{
ops::Add,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
},
};
/// Cloneable handle used to submit `TelemetryEvent`s to the telemetry event loop.
///
/// Both fields are reference-counted, so every clone shares the same underlying
/// channel sender and the same "blocked" flag.
#[derive(Clone, Debug)]
pub struct TelemetrySender {
/// Channel sender shared (behind a mutex) by all clones of this handle.
sender: Arc<Mutex<mpsc::Sender<TelemetryEvent>>>,
/// Set when a `send` fails and cleared when a later `send` succeeds, so that
/// log messages are emitted only on those transitions.
sender_is_blocked: Arc<AtomicBool>,
}
impl TelemetrySender {
    /// Wraps `sender` so it can be shared by every clone of the returned handle.
    pub fn new(sender: mpsc::Sender<TelemetryEvent>) -> Self {
        Self {
            sender: Arc::new(Mutex::new(sender)),
            sender_is_blocked: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Sends `event` to the telemetry event loop without blocking.
    ///
    /// If the event cannot be queued it is dropped. To avoid log spam, a warning is
    /// logged only for the first failure after a successful send, and an info message
    /// only for the first success after a failure.
    pub fn send(&self, event: TelemetryEvent) {
        // Bind the result before matching: a lock guard created inside a `match`
        // scrutinee lives until the end of the `match`, which would keep the mutex
        // held while the log messages below are written.
        let send_result = self.sender.lock().try_send(event);
        match send_result {
            Ok(()) => {
                // `swap` returns the previous value, so this is true only when the
                // sender had been marked as blocked by an earlier failure.
                if self.sender_is_blocked.swap(false, Ordering::SeqCst) {
                    info!("TelemetrySender recovered and resumed sending");
                }
            }
            Err(_) => {
                // Log only on the transition from "not blocked" to "blocked".
                if !self.sender_is_blocked.swap(true, Ordering::SeqCst) {
                    warn!("TelemetrySender dropped a msg: either buffer is full or no receiver is waiting");
                }
            }
        }
    }
}
/// Events that clients report to the telemetry event loop via `TelemetrySender::send`.
#[derive(Clone, Debug, PartialEq)]
pub enum TelemetryEvent {
/// Notify the telemetry event loop that network selection has started.
StartNetworkSelection {
/// Type of network selection. This field is currently unused.
network_selection_type: NetworkSelectionType,
},
/// Notify the telemetry event loop that network selection is complete.
NetworkSelectionDecision {
/// When there's a scan error, `num_candidates` should be Err.
/// When `num_candidates` is `Ok(0)` and the telemetry event loop is tracking downtime,
/// the event loop will use the period of network selection to increment the
/// `downtime_no_saved_neighbor_duration` counter. This would later be used to
/// adjust the raw downtime.
num_candidates: Result<usize, ()>,
/// Whether a network has been selected. This field is currently unused.
selected_any: bool,
},
/// Notify the telemetry event loop that the client has connected.
/// Subsequently, the telemetry event loop will increment the `connected_duration` counter
/// periodically.
Connected,
/// Notify the telemetry event loop that the client has disconnected.
/// Subsequently, the telemetry event loop will increment the downtime counters periodically
/// if TelemetrySender has requested downtime to be tracked via `track_subsequent_downtime`
/// flag.
Disconnected {
/// Indicates whether subsequent period should be used to increment the downtime counters.
track_subsequent_downtime: bool,
},
}
/// Kind of network selection reported with `TelemetryEvent::StartNetworkSelection`.
#[derive(Clone, Debug, PartialEq)]
pub enum NetworkSelectionType {
/// Looking for the best BSS from any saved networks
Undirected,
/// Looking for the best BSS for a particular network
Directed,
}
/// Capacity of "first come, first serve" slots available to clients of
/// the mpsc::Sender<TelemetryEvent>.
const TELEMETRY_EVENT_BUFFER_SIZE: usize = 100;
/// How often to request RSSI stats and dispatcher packet counts from MLME.
///
/// In this file the interval paces `Telemetry::handle_periodic_telemetry`;
/// `serve_telemetry` statically asserts that it divides one hour evenly.
const TELEMETRY_QUERY_INTERVAL: zx::Duration = zx::Duration::from_seconds(15);
/// Builds the telemetry channel and the telemetry event loop.
///
/// Returns a `TelemetrySender` for submitting `TelemetryEvent`s, together with a
/// future that must be polled for telemetry to be processed. The future loops
/// forever: it handles each incoming event, and once per `TELEMETRY_QUERY_INTERVAL`
/// it runs the periodic telemetry update. After every full hour's worth of
/// intervals it additionally signals that an hour has passed.
pub fn serve_telemetry(inspect_node: InspectNode) -> (TelemetrySender, impl Future<Output = ()>) {
    const ONE_HOUR: zx::Duration = zx::Duration::from_hours(1);
    // The hourly signal is derived by counting intervals, so an hour must be a
    // whole number of intervals.
    const_assert_eq!(ONE_HOUR.into_nanos() % TELEMETRY_QUERY_INTERVAL.into_nanos(), 0);
    const INTERVAL_TICKS_PER_HR: u64 =
        (ONE_HOUR.into_nanos() / TELEMETRY_QUERY_INTERVAL.into_nanos()) as u64;

    let (event_sender, mut event_receiver) =
        mpsc::channel::<TelemetryEvent>(TELEMETRY_EVENT_BUFFER_SIZE);

    let event_loop = async move {
        let mut periodic_timer = fasync::Interval::new(TELEMETRY_QUERY_INTERVAL);
        let mut telemetry = Telemetry::new(inspect_node);
        let mut ticks_this_hour: u64 = 0;
        loop {
            select! {
                maybe_event = event_receiver.next() => {
                    if let Some(event) = maybe_event {
                        telemetry.handle_telemetry_event(event);
                    }
                }
                _ = periodic_timer.next() => {
                    telemetry.handle_periodic_telemetry();
                    // The hour signal is deliberately raised only after the periodic
                    // update for the same tick, which keeps the ordering predictable
                    // for tests.
                    ticks_this_hour += 1;
                    if ticks_this_hour == INTERVAL_TICKS_PER_HR {
                        ticks_this_hour = 0;
                        telemetry.signal_hr_passed();
                    }
                }
            }
        }
    };

    (TelemetrySender::new(event_sender), event_loop)
}
/// Connection state tracked by `Telemetry`; it decides which duration counter
/// (if any) elapsed time is added to.
#[derive(Debug, Clone, PartialEq)]
enum ConnectionState {
// Like disconnected, but no downtime is tracked.
Idle,
// Elapsed time is added to the connected-duration counter.
Connected,
// Elapsed time is added to the downtime-duration counter.
Disconnected,
}
/// Registers a lazy Inspect child called `child_name` under `inspect_node`.
///
/// Each time the lazy child is evaluated, a fresh `Inspector` is populated with the
/// three duration counters (in nanoseconds) read from `counters` at that moment.
fn record_inspect_counters(
    inspect_node: &InspectNode,
    child_name: &str,
    counters: Arc<Mutex<WindowedStats<StatCounters>>>,
) {
    inspect_node.record_lazy_child(child_name, move || {
        // The callback can run many times, so each run takes its own handle.
        let shared_stats = Arc::clone(&counters);
        async move {
            let inspector = Inspector::new();
            {
                // Scope the lock so it is released before the inspector is returned.
                let stats_guard = shared_stats.lock();
                let totals = stats_guard.windowed_stat();
                inspect_insert!(inspector.root(), {
                    connected_duration: totals.connected_duration.into_nanos(),
                    downtime_duration: totals.downtime_duration.into_nanos(),
                    downtime_no_saved_neighbor_duration: totals.downtime_no_saved_neighbor_duration.into_nanos(),
                });
            }
            Ok(inspector)
        }
        .boxed()
    });
}
/// State owned by the telemetry event loop.
pub struct Telemetry {
/// State that the time elapsed since `last_checked_connection_state` is attributed to.
connection_state: ConnectionState,
/// Time up to which elapsed duration has already been accounted for.
last_checked_connection_state: fasync::Time,
/// Time of the latest `StartNetworkSelection` event that has not yet been consumed by a
/// `NetworkSelectionDecision` event.
network_selection_start_time: Option<fasync::Time>,
/// Accumulates the 1-day and 7-day windowed counters.
stats_logger: StatsLogger,
/// Never read in this file; presumably held so the Inspect node stays alive as long as
/// this struct does (TODO confirm).
_inspect_node: InspectNode,
}
impl Telemetry {
pub fn new(inspect_node: InspectNode) -> Self {
let stats_logger = StatsLogger::new();
record_inspect_counters(
&inspect_node,
"1d_counters",
Arc::clone(&stats_logger.last_1d_stats),
);
record_inspect_counters(
&inspect_node,
"7d_counters",
Arc::clone(&stats_logger.last_7d_stats),
);
Self {
connection_state: ConnectionState::Idle,
last_checked_connection_state: fasync::Time::now(),
network_selection_start_time: None,
stats_logger,
_inspect_node: inspect_node,
}
}
pub fn handle_periodic_telemetry(&mut self) {
let now = fasync::Time::now();
let duration = now - self.last_checked_connection_state;
match &self.connection_state {
ConnectionState::Idle => (),
ConnectionState::Connected => {
self.stats_logger.log_stat(StatOp::AddConnectedDuration(duration));
}
ConnectionState::Disconnected => {
self.stats_logger.log_stat(StatOp::AddDowntimeDuration(duration));
}
}
self.last_checked_connection_state = now;
}
pub fn handle_telemetry_event(&mut self, event: TelemetryEvent) {
let now = fasync::Time::now();
match event {
TelemetryEvent::StartNetworkSelection { .. } => {
let _prev = self.network_selection_start_time.replace(now);
}
TelemetryEvent::NetworkSelectionDecision { num_candidates, .. } => {
if let Some(start_time) = self.network_selection_start_time.take() {
match self.connection_state {
ConnectionState::Disconnected => match num_candidates {
Ok(0) => {
// TODO(fxbug.dev/80699): Track a `no_saved_neighbor` flag and add
// all subsequent downtime to this counter.
self.stats_logger.log_stat(
StatOp::AddDowntimeNoSavedNeighborDuration(now - start_time),
);
}
_ => (),
},
_ => (),
}
}
}
TelemetryEvent::Connected => {
let duration = now - self.last_checked_connection_state;
if let ConnectionState::Disconnected = self.connection_state {
self.stats_logger.log_stat(StatOp::AddDowntimeDuration(duration));
}
self.connection_state = ConnectionState::Connected;
self.last_checked_connection_state = now;
}
TelemetryEvent::Disconnected { track_subsequent_downtime } => {
let duration = now - self.last_checked_connection_state;
if let ConnectionState::Connected = self.connection_state {
self.stats_logger.log_stat(StatOp::AddConnectedDuration(duration));
}
self.connection_state = if track_subsequent_downtime {
ConnectionState::Disconnected
} else {
ConnectionState::Idle
};
self.last_checked_connection_state = now;
}
}
}
pub fn signal_hr_passed(&mut self) {
self.stats_logger.handle_hr_passed();
}
}
/// Holds the windowed counters and the hour bookkeeping used to slide them.
struct StatsLogger {
/// Counters created with 24 windows; the window is slid on every hour signal.
last_1d_stats: Arc<Mutex<WindowedStats<StatCounters>>>,
/// Counters created with 7 windows; the window is slid on every 24th hour signal.
last_7d_stats: Arc<Mutex<WindowedStats<StatCounters>>>,
/// Number of hour signals received so far, modulo 24.
hr_tick: u32,
}
impl StatsLogger {
    /// Creates a logger with a 24-window (1-day) and a 7-window (7-day) set of
    /// counters and the hour tick at zero.
    pub fn new() -> Self {
        let last_1d_stats = Arc::new(Mutex::new(WindowedStats::new(24)));
        let last_7d_stats = Arc::new(Mutex::new(WindowedStats::new(7)));
        Self { last_1d_stats, last_7d_stats, hr_tick: 0 }
    }

    /// Adds the duration carried by `stat_op` to the matching counter of both the
    /// 1-day and the 7-day stats; the other counters receive a zero increment.
    fn log_stat(&mut self, stat_op: StatOp) {
        let mut addition = StatCounters::default();
        match stat_op {
            StatOp::AddConnectedDuration(duration) => addition.connected_duration = duration,
            StatOp::AddDowntimeDuration(duration) => addition.downtime_duration = duration,
            StatOp::AddDowntimeNoSavedNeighborDuration(duration) => {
                addition.downtime_no_saved_neighbor_duration = duration
            }
        }
        for stats in [&self.last_1d_stats, &self.last_7d_stats].iter() {
            stats.lock().saturating_add(&addition);
        }
    }

    /// Slides the 1-day window on every call, and the 7-day window on every 24th call.
    fn handle_hr_passed(&mut self) {
        let hour_of_day = (self.hr_tick + 1) % 24;
        self.hr_tick = hour_of_day;
        self.last_1d_stats.lock().slide_window();
        if hour_of_day == 0 {
            self.last_7d_stats.lock().slide_window();
        }
    }
}
/// A single increment to apply to one of the duration counters in `StatCounters`.
enum StatOp {
// Time spent connected
AddConnectedDuration(zx::Duration),
// Time spent disconnected while downtime was being tracked
AddDowntimeDuration(zx::Duration),
// Downtime with no saved network in vicinity
AddDowntimeNoSavedNeighborDuration(zx::Duration),
}
/// Duration counters kept per time window and exported to Inspect in nanoseconds.
#[derive(Clone, Default)]
struct StatCounters {
/// Accumulated time in the connected state.
connected_duration: zx::Duration,
/// Accumulated time in the disconnected state while downtime was tracked.
downtime_duration: zx::Duration,
/// Accumulated network-selection time that ended with zero candidates while
/// downtime was tracked.
downtime_no_saved_neighbor_duration: zx::Duration,
}
// `SaturatingAdd` (implemented below) has `Add` as a supertrait, hence this impl.
impl Add for StatCounters {
    type Output = Self;

    /// Adds the two sets of counters field by field using `zx::Duration`'s `+`.
    fn add(self, rhs: Self) -> Self {
        let connected_duration = self.connected_duration + rhs.connected_duration;
        let downtime_duration = self.downtime_duration + rhs.downtime_duration;
        let downtime_no_saved_neighbor_duration =
            self.downtime_no_saved_neighbor_duration + rhs.downtime_no_saved_neighbor_duration;
        Self { connected_duration, downtime_duration, downtime_no_saved_neighbor_duration }
    }
}
impl SaturatingAdd for StatCounters {
    /// Adds the two sets of counters field by field, saturating each sum at the
    /// bounds of the underlying nanosecond integer instead of overflowing.
    fn saturating_add(&self, v: &Self) -> Self {
        let saturating_sum = |lhs: zx::Duration, rhs: zx::Duration| {
            zx::Duration::from_nanos(lhs.into_nanos().saturating_add(rhs.into_nanos()))
        };
        Self {
            connected_duration: saturating_sum(self.connected_duration, v.connected_duration),
            downtime_duration: saturating_sum(self.downtime_duration, v.downtime_duration),
            downtime_no_saved_neighbor_duration: saturating_sum(
                self.downtime_no_saved_neighbor_duration,
                v.downtime_no_saved_neighbor_duration,
            ),
        }
    }
}
#[cfg(test)]
mod tests {
use {
super::*,
fuchsia_inspect::{assert_data_tree, Inspector},
fuchsia_zircon::DurationNum,
futures::task::Poll,
std::pin::Pin,
};
// Granularity by which `TestHelper::advance_by` moves the fake clock forward.
const STEP_INCREMENT: zx::Duration = zx::Duration::from_seconds(1);
// Verifies that the 1-day counter drops its oldest hour on each hour mark and that the
// 7-day counter drops its oldest day only once seven days have elapsed.
#[fuchsia::test]
fn test_stat_cycles() {
let (mut test_helper, mut test_fut) = setup_test();
test_helper.telemetry_sender.send(TelemetryEvent::Connected);
assert_eq!(test_helper.exec.run_until_stalled(&mut test_fut), Poll::Pending);
// Stop one telemetry interval short of the 24-hour mark.
test_helper.advance_by(24.hours() - TELEMETRY_QUERY_INTERVAL, test_fut.as_mut());
assert_data_tree!(test_helper.inspector, root: {
stats: contains {
"1d_counters": contains {
connected_duration: (24.hours() - TELEMETRY_QUERY_INTERVAL).into_nanos(),
},
"7d_counters": contains {
connected_duration: (24.hours() - TELEMETRY_QUERY_INTERVAL).into_nanos(),
},
}
});
// Cross the 24-hour mark.
test_helper.advance_to_next_telemetry_checkpoint(test_fut.as_mut());
assert_data_tree!(test_helper.inspector, root: {
stats: contains {
"1d_counters": contains {
// The first hour window is now discarded, so it only shows 23 hours
// of connected duration.
connected_duration: 23.hours().into_nanos(),
},
"7d_counters": contains {
connected_duration: 24.hours().into_nanos(),
},
}
});
// While connected, each further hour adds one hour and discards one, so the 1d
// counter stays at 23 hours while the 7d counter keeps growing.
test_helper.advance_by(2.hours(), test_fut.as_mut());
assert_data_tree!(test_helper.inspector, root: {
stats: contains {
"1d_counters": contains {
connected_duration: 23.hours().into_nanos(),
},
"7d_counters": contains {
connected_duration: 26.hours().into_nanos(),
},
}
});
// Disconnect now
test_helper
.telemetry_sender
.send(TelemetryEvent::Disconnected { track_subsequent_downtime: false });
assert_eq!(test_helper.exec.run_until_stalled(&mut test_fut), Poll::Pending);
// Now the 1d counter should decrease
test_helper.advance_by(8.hours(), test_fut.as_mut());
assert_data_tree!(test_helper.inspector, root: {
stats: contains {
"1d_counters": contains {
connected_duration: 15.hours().into_nanos(),
},
"7d_counters": contains {
connected_duration: 26.hours().into_nanos(),
},
}
});
// The 7d counter does not decrease before the 7th day
test_helper.advance_by(14.hours(), test_fut.as_mut());
test_helper.advance_by((5 * 24).hours() - TELEMETRY_QUERY_INTERVAL, test_fut.as_mut());
assert_data_tree!(test_helper.inspector, root: {
stats: contains {
"1d_counters": contains {
connected_duration: 0i64,
},
"7d_counters": contains {
connected_duration: 26.hours().into_nanos(),
},
}
});
// On the 7th day, the first 24 hours of connected duration is deducted
test_helper.advance_to_next_telemetry_checkpoint(test_fut.as_mut());
assert_data_tree!(test_helper.inspector, root: {
stats: contains {
"1d_counters": contains {
connected_duration: 0i64,
},
"7d_counters": contains {
connected_duration: 2.hours().into_nanos(),
},
}
});
}
// Verifies that no counter changes while no connection event has been received.
#[fuchsia::test]
fn test_counters_when_idle() {
let (mut test_helper, mut test_fut) = setup_test();
test_helper.advance_by(30.minutes(), test_fut.as_mut());
assert_data_tree!(test_helper.inspector, root: {
stats: contains {
"1d_counters": contains {
connected_duration: 0i64,
downtime_duration: 0i64,
downtime_no_saved_neighbor_duration: 0i64,
},
"7d_counters": contains {
connected_duration: 0i64,
downtime_duration: 0i64,
downtime_no_saved_neighbor_duration: 0i64,
},
}
});
// Still nothing after a further 30 minutes.
test_helper.advance_by(30.minutes(), test_fut.as_mut());
assert_data_tree!(test_helper.inspector, root: {
stats: contains {
"1d_counters": contains {
connected_duration: 0i64,
downtime_duration: 0i64,
downtime_no_saved_neighbor_duration: 0i64,
},
"7d_counters": contains {
connected_duration: 0i64,
downtime_duration: 0i64,
downtime_no_saved_neighbor_duration: 0i64,
},
}
});
}
// Verifies that only the connected-duration counters grow while connected.
#[fuchsia::test]
fn test_connected_counters_increase_when_connected() {
let (mut test_helper, mut test_fut) = setup_test();
test_helper.telemetry_sender.send(TelemetryEvent::Connected);
assert_eq!(test_helper.exec.run_until_stalled(&mut test_fut), Poll::Pending);
test_helper.advance_by(30.minutes(), test_fut.as_mut());
assert_data_tree!(test_helper.inspector, root: {
stats: contains {
"1d_counters": contains {
connected_duration: 30.minutes().into_nanos(),
downtime_duration: 0i64,
downtime_no_saved_neighbor_duration: 0i64,
},
"7d_counters": contains {
connected_duration: 30.minutes().into_nanos(),
downtime_duration: 0i64,
downtime_no_saved_neighbor_duration: 0i64,
},
}
});
// The counters keep accumulating over the next 30 minutes.
test_helper.advance_by(30.minutes(), test_fut.as_mut());
assert_data_tree!(test_helper.inspector, root: {
stats: contains {
"1d_counters": contains {
connected_duration: 1.hour().into_nanos(),
downtime_duration: 0i64,
downtime_no_saved_neighbor_duration: 0i64,
},
"7d_counters": contains {
connected_duration: 1.hour().into_nanos(),
downtime_duration: 0i64,
downtime_no_saved_neighbor_duration: 0i64,
},
}
});
}
// Verifies that downtime is counted only after a disconnect that requests tracking.
#[fuchsia::test]
fn test_downtime_counter() {
let (mut test_helper, mut test_fut) = setup_test();
// Disconnect but not track downtime. Downtime counter should not increase.
test_helper
.telemetry_sender
.send(TelemetryEvent::Disconnected { track_subsequent_downtime: false });
assert_eq!(test_helper.exec.run_until_stalled(&mut test_fut), Poll::Pending);
test_helper.advance_by(10.minutes(), test_fut.as_mut());
assert_data_tree!(test_helper.inspector, root: {
stats: contains {
"1d_counters": contains {
connected_duration: 0i64,
downtime_duration: 0i64,
downtime_no_saved_neighbor_duration: 0i64,
},
"7d_counters": contains {
connected_duration: 0i64,
downtime_duration: 0i64,
downtime_no_saved_neighbor_duration: 0i64,
},
}
});
// Disconnect and track downtime. Downtime counter should now increase
test_helper
.telemetry_sender
.send(TelemetryEvent::Disconnected { track_subsequent_downtime: true });
assert_eq!(test_helper.exec.run_until_stalled(&mut test_fut), Poll::Pending);
test_helper.advance_by(15.minutes(), test_fut.as_mut());
assert_data_tree!(test_helper.inspector, root: {
stats: contains {
"1d_counters": contains {
connected_duration: 0i64,
downtime_duration: 15.minutes().into_nanos(),
downtime_no_saved_neighbor_duration: 0i64,
},
"7d_counters": contains {
connected_duration: 0i64,
downtime_duration: 15.minutes().into_nanos(),
downtime_no_saved_neighbor_duration: 0i64,
},
}
});
}
// Verifies that connected time is recorded as soon as the disconnect event is handled,
// and that downtime is recorded from the disconnect onwards.
#[fuchsia::test]
fn test_counters_connect_then_disconnect() {
let (mut test_helper, mut test_fut) = setup_test();
test_helper.telemetry_sender.send(TelemetryEvent::Connected);
assert_eq!(test_helper.exec.run_until_stalled(&mut test_fut), Poll::Pending);
test_helper.advance_by(5.seconds(), test_fut.as_mut());
// Disconnect and track downtime. Nothing is added to the downtime counter yet
// because no time has passed since the disconnect.
test_helper
.telemetry_sender
.send(TelemetryEvent::Disconnected { track_subsequent_downtime: true });
assert_eq!(test_helper.exec.run_until_stalled(&mut test_fut), Poll::Pending);
// The 5 seconds connected duration is accounted for right away
assert_data_tree!(test_helper.inspector, root: {
stats: contains {
"1d_counters": contains {
connected_duration: 5.seconds().into_nanos(),
downtime_duration: 0i64,
downtime_no_saved_neighbor_duration: 0i64,
},
"7d_counters": contains {
connected_duration: 5.seconds().into_nanos(),
downtime_duration: 0i64,
downtime_no_saved_neighbor_duration: 0i64,
},
}
});
// At next telemetry checkpoint, `test_fut` updates the downtime duration
let downtime_start = fasync::Time::now();
test_helper.advance_to_next_telemetry_checkpoint(test_fut.as_mut());
assert_data_tree!(test_helper.inspector, root: {
stats: contains {
"1d_counters": contains {
connected_duration: 5.seconds().into_nanos(),
downtime_duration: (fasync::Time::now() - downtime_start).into_nanos(),
downtime_no_saved_neighbor_duration: 0i64,
},
"7d_counters": contains {
connected_duration: 5.seconds().into_nanos(),
downtime_duration: (fasync::Time::now() - downtime_start).into_nanos(),
downtime_no_saved_neighbor_duration: 0i64,
},
}
});
}
// Verifies that a network selection yielding zero candidates adds its duration to
// `downtime_no_saved_neighbor_duration` only while downtime is being tracked.
#[fuchsia::test]
fn test_downtime_no_saved_neighbor_duration_counter() {
let (mut test_helper, mut test_fut) = setup_test();
test_helper.telemetry_sender.send(TelemetryEvent::Connected);
assert_eq!(test_helper.exec.run_until_stalled(&mut test_fut), Poll::Pending);
// Disconnect and track downtime.
test_helper
.telemetry_sender
.send(TelemetryEvent::Disconnected { track_subsequent_downtime: true });
assert_eq!(test_helper.exec.run_until_stalled(&mut test_fut), Poll::Pending);
test_helper.advance_by(5.seconds(), test_fut.as_mut());
test_helper.telemetry_sender.send(TelemetryEvent::StartNetworkSelection {
network_selection_type: NetworkSelectionType::Undirected,
});
assert_eq!(test_helper.exec.run_until_stalled(&mut test_fut), Poll::Pending);
// The network selection takes 2 seconds and finds no candidate.
test_helper.advance_by(2.seconds(), test_fut.as_mut());
test_helper.telemetry_sender.send(TelemetryEvent::NetworkSelectionDecision {
num_candidates: Ok(0),
selected_any: false,
});
assert_eq!(test_helper.exec.run_until_stalled(&mut test_fut), Poll::Pending);
let prev = fasync::Time::now();
test_helper.advance_to_next_telemetry_checkpoint(test_fut.as_mut());
// 7 seconds have passed since the disconnect, plus the time to reach the checkpoint.
let downtime_duration = (7.seconds() + (fasync::Time::now() - prev)).into_nanos();
assert_data_tree!(test_helper.inspector, root: {
stats: contains {
"1d_counters": contains {
connected_duration: 0i64,
downtime_duration: downtime_duration,
downtime_no_saved_neighbor_duration: 2.seconds().into_nanos(),
},
"7d_counters": contains {
connected_duration: 0i64,
downtime_duration: downtime_duration,
downtime_no_saved_neighbor_duration: 2.seconds().into_nanos(),
},
}
});
// Disconnect but don't track downtime
// NOTE: `test_fut` is not polled right after this send, so the event is only
// handled during the first step of the `advance_by` call below.
test_helper
.telemetry_sender
.send(TelemetryEvent::Disconnected { track_subsequent_downtime: false });
// Go through the same sequence of network selection as before
test_helper.advance_by(5.seconds(), test_fut.as_mut());
test_helper.telemetry_sender.send(TelemetryEvent::StartNetworkSelection {
network_selection_type: NetworkSelectionType::Undirected,
});
assert_eq!(test_helper.exec.run_until_stalled(&mut test_fut), Poll::Pending);
test_helper.advance_by(2.seconds(), test_fut.as_mut());
test_helper.telemetry_sender.send(TelemetryEvent::NetworkSelectionDecision {
num_candidates: Ok(0),
selected_any: false,
});
assert_eq!(test_helper.exec.run_until_stalled(&mut test_fut), Poll::Pending);
test_helper.advance_to_next_telemetry_checkpoint(test_fut.as_mut());
// However, this time neither of the downtime counters should be incremented
assert_data_tree!(test_helper.inspector, root: {
stats: contains {
"1d_counters": contains {
connected_duration: 0i64,
downtime_duration: downtime_duration,
downtime_no_saved_neighbor_duration: 2.seconds().into_nanos(),
},
"7d_counters": contains {
connected_duration: 0i64,
downtime_duration: downtime_duration,
downtime_no_saved_neighbor_duration: 2.seconds().into_nanos(),
},
}
});
}
// Bundles what each test needs to drive the telemetry future and inspect its output.
struct TestHelper {
// Fake-time executor used to poll the telemetry future.
exec: fasync::TestExecutor,
// Sender half returned by `serve_telemetry`.
telemetry_sender: TelemetrySender,
// Inspector whose "stats" child node is handed to `serve_telemetry`.
inspector: Inspector,
}
impl TestHelper {
    // Moves the fake clock forward by `duration`, one `STEP_INCREMENT` at a time.
    // After every step, expired timers are woken and `test_fut` is polled; it is
    // expected to remain pending. `duration` must be a whole multiple of
    // `STEP_INCREMENT`.
    fn advance_by(
        &mut self,
        duration: zx::Duration,
        mut test_fut: Pin<&mut impl Future<Output = ()>>,
    ) {
        let step_nanos = STEP_INCREMENT.into_nanos();
        let total_nanos = duration.into_nanos();
        assert_eq!(
            total_nanos % step_nanos,
            0,
            "duration {:?} is not divisible by STEP_INCREMENT",
            duration,
        );
        // Stepping must be able to land exactly on every telemetry checkpoint.
        const_assert_eq!(
            TELEMETRY_QUERY_INTERVAL.into_nanos() % STEP_INCREMENT.into_nanos(),
            0
        );
        let step_count = total_nanos / step_nanos;
        for _ in 0..step_count {
            let next_instant = fasync::Time::after(STEP_INCREMENT);
            self.exec.set_fake_time(next_instant);
            let _ = self.exec.wake_expired_timers();
            assert_eq!(self.exec.run_until_stalled(&mut test_fut), Poll::Pending);
        }
    }

    // Moves the fake clock forward to the next multiple of `TELEMETRY_QUERY_INTERVAL`,
    // i.e. the next time `test_fut` handles periodic telemetry. Built on `advance_by`.
    //
    // This relies on the executor having started `test_fut` at time 0, which holds
    // when the helper comes from `setup_test()`.
    fn advance_to_next_telemetry_checkpoint(
        &mut self,
        test_fut: Pin<&mut impl Future<Output = ()>>,
    ) {
        let interval_nanos = TELEMETRY_QUERY_INTERVAL.into_nanos();
        let elapsed_in_interval = fasync::Time::now().into_nanos() % interval_nanos;
        let until_checkpoint = zx::Duration::from_nanos(interval_nanos - elapsed_in_interval);
        self.advance_by(until_checkpoint, test_fut)
    }
}
// Creates a fake-time executor starting at time 0, starts the telemetry future with a
// "stats" Inspect node, polls it once, and returns the helper together with the
// pinned future.
fn setup_test() -> (TestHelper, Pin<Box<impl Future<Output = ()>>>) {
    let mut exec = fasync::TestExecutor::new_with_fake_time().expect("executor should build");
    exec.set_fake_time(fasync::Time::from_nanos(0));

    let inspector = Inspector::new();
    let stats_node = inspector.root().create_child("stats");
    let (telemetry_sender, telemetry_fut) = serve_telemetry(stats_node);
    let mut telemetry_fut = Box::pin(telemetry_fut);

    // The first poll lets the telemetry loop initialise; it must not complete.
    let first_poll = exec.run_until_stalled(&mut telemetry_fut);
    assert_eq!(first_poll, Poll::Pending);

    (TestHelper { exec, telemetry_sender, inspector }, telemetry_fut)
}
}