blob: 03fd9845869373701eb2b7dfdaa2b8cc23ea89fc [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::datatypes::{HttpsSample, Phase, Poll};
use crate::diagnostics::{Diagnostics, Event};
use fidl_fuchsia_cobalt::{CobaltEvent, HistogramBucket};
use fuchsia_cobalt::{CobaltConnector, CobaltEventExt, CobaltSender, ConnectionType};
use fuchsia_zircon as zx;
use futures::Future;
use parking_lot::Mutex;
use time_metrics_registry::{
HttpsdateBoundSizeMetricDimensionPhase as CobaltPhase, TimeMetricDimensionDirection,
HTTPSDATE_BOUND_SIZE_METRIC_ID, HTTPSDATE_POLL_LATENCY_INT_BUCKETS_FLOOR,
HTTPSDATE_POLL_LATENCY_INT_BUCKETS_NUM_BUCKETS as RTT_BUCKETS,
HTTPSDATE_POLL_LATENCY_INT_BUCKETS_STEP_SIZE, HTTPSDATE_POLL_LATENCY_METRIC_ID,
HTTPSDATE_POLL_OFFSET_METRIC_ID, PROJECT_ID,
};
const RTT_BUCKET_SIZE: zx::Duration =
zx::Duration::from_micros(HTTPSDATE_POLL_LATENCY_INT_BUCKETS_STEP_SIZE as i64);
const RTT_BUCKET_FLOOR: zx::Duration =
zx::Duration::from_micros(HTTPSDATE_POLL_LATENCY_INT_BUCKETS_FLOOR);
// Since the buckets for poll offsets are a Cobalt dimension rather than a histogram, we don't
// have nice autogenerated constants. These values must be kept in sync with the Cobalt definition
// in //third_party/cobalt_config/fuchsia/time/metrics.yaml.
const POLL_OFFSET_RTT_BUCKETS: u32 = 100;
const POLL_OFFSET_RTT_BUCKET_SIZE: zx::Duration = zx::Duration::from_micros(10000);
const POLL_OFFSET_RTT_FLOOR: zx::Duration = zx::Duration::from_micros(0);
/// A `Diagnostics` implementation that uploads diagnostics metrics to Cobalt.
pub struct CobaltDiagnostics {
/// Client connection to Cobalt.
sender: Mutex<CobaltSender>,
/// Last known phase of the algorithm.
phase: Mutex<Phase>,
}
impl CobaltDiagnostics {
/// Create a new `CobaltDiagnostics`, and future that must be polled to upload to Cobalt.
pub fn new() -> (Self, impl Future<Output = ()>) {
let (sender, fut) =
CobaltConnector::default().serve(ConnectionType::project_id(PROJECT_ID));
(Self { sender: Mutex::new(sender), phase: Mutex::new(Phase::Initial) }, fut)
}
/// Calculate the bucket number in the latency metric for a given duration.
fn round_trip_time_bucket(duration: &zx::Duration) -> u32 {
Self::cobalt_bucket(*duration, RTT_BUCKETS, RTT_BUCKET_SIZE, RTT_BUCKET_FLOOR)
}
/// Calculate the bucket index in the poll offset metric.
fn poll_offset_rtt_bucket(duration: &zx::Duration) -> u32 {
Self::cobalt_bucket(
*duration,
POLL_OFFSET_RTT_BUCKETS,
POLL_OFFSET_RTT_BUCKET_SIZE,
POLL_OFFSET_RTT_FLOOR,
)
}
/// Calculate the bucket index for a time duration. Indices follow the rules for Cobalt
/// histograms - bucket 0 is underflow, and num_buckets + 1 is overflow.
fn cobalt_bucket(
duration: zx::Duration,
num_buckets: u32,
bucket_size: zx::Duration,
underflow_floor: zx::Duration,
) -> u32 {
let overflow_threshold = underflow_floor + (bucket_size * num_buckets);
if duration < underflow_floor {
0
} else if duration > overflow_threshold {
num_buckets + 1
} else {
((duration - underflow_floor).into_nanos() / bucket_size.into_nanos()) as u32 + 1
}
}
fn success(&self, sample: &HttpsSample) {
let phase = self.phase.lock();
let mut sender = self.sender.lock();
sender.log_event_count(
HTTPSDATE_BOUND_SIZE_METRIC_ID,
[<Phase as Into<CobaltPhase>>::into(*phase)],
0i64, // period_duration, not used
sample.final_bound_size.into_micros(),
);
let mut bucket_counts = [0u64; RTT_BUCKETS as usize + 2];
for bucket_idx in
sample.polls.iter().map(|poll| Self::round_trip_time_bucket(&poll.round_trip_time))
{
bucket_counts[bucket_idx as usize] += 1;
}
let histogram_buckets = bucket_counts
.iter()
.enumerate()
.filter(|(_, count)| **count > 0)
.map(|(index, count)| HistogramBucket { index: index as u32, count: *count })
.collect::<Vec<_>>();
sender.log_int_histogram(HTTPSDATE_POLL_LATENCY_METRIC_ID, (), histogram_buckets);
let poll_offset_events =
sample.polls.iter().filter_map(|Poll { center_offset, round_trip_time }| {
let offset = center_offset.as_ref()?;
let direction_code = if offset.into_nanos() < 0 {
TimeMetricDimensionDirection::Negative
} else {
TimeMetricDimensionDirection::Positive
};
Some(
CobaltEvent::builder(HTTPSDATE_POLL_OFFSET_METRIC_ID)
.with_event_codes([
Self::poll_offset_rtt_bucket(round_trip_time),
direction_code as u32,
])
.as_count_event(0, offset.into_micros().abs()),
)
});
poll_offset_events.for_each(|event| sender.log_cobalt_event(event));
}
fn phase_update(&self, phase: &Phase) {
*self.phase.lock() = *phase;
}
}
impl Diagnostics for CobaltDiagnostics {
fn record<'a>(&self, event: Event<'a>) {
match event {
Event::NetworkCheckSuccessful => (),
Event::Success(sample) => self.success(sample),
Event::Failure(_) => (), // currently, no failures are registered with cobalt
Event::Phase(phase) => self.phase_update(&phase),
}
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::datatypes::Poll;
use fidl_fuchsia_cobalt::{CobaltEvent, CountEvent, EventPayload};
use futures::{channel::mpsc, stream::StreamExt};
use lazy_static::lazy_static;
use std::{collections::HashSet, iter::FromIterator};
use time_metrics_registry::TimeMetricDimensionRttBucket;
const TEST_INITIAL_PHASE: Phase = Phase::Initial;
const TEST_BOUND_SIZE: zx::Duration = zx::Duration::from_millis(101);
const TEST_STANDARD_DEVIATION: zx::Duration = zx::Duration::from_millis(20);
const ONE_MICROS: zx::Duration = zx::Duration::from_micros(1);
const TEST_CENTER_OFFSET: zx::Duration = zx::Duration::from_micros(20);
const TEST_TIME: zx::Time = zx::Time::from_nanos(123_456_789);
const TEST_RTT_BUCKET: u32 = 2;
const TEST_RTT_2_BUCKET: u32 = 4;
const OVERFLOW_RTT: zx::Duration = zx::Duration::from_seconds(10);
lazy_static! {
static ref TEST_INITIAL_PHASE_COBALT: CobaltPhase = TEST_INITIAL_PHASE.into();
static ref TEST_RTT: zx::Duration =
RTT_BUCKET_FLOOR + RTT_BUCKET_SIZE * TEST_RTT_BUCKET - ONE_MICROS;
static ref TEST_RTT_2: zx::Duration =
RTT_BUCKET_FLOOR + RTT_BUCKET_SIZE * TEST_RTT_2_BUCKET - ONE_MICROS;
static ref TEST_RTT_OFFSET_BUCKET: u32 = ((*TEST_RTT - POLL_OFFSET_RTT_FLOOR).into_nanos()
/ POLL_OFFSET_RTT_BUCKET_SIZE.into_nanos())
as u32
+ 1;
static ref TEST_RTT_2_OFFSET_BUCKET: u32 =
((*TEST_RTT_2 - POLL_OFFSET_RTT_FLOOR).into_nanos()
/ POLL_OFFSET_RTT_BUCKET_SIZE.into_nanos()) as u32
+ 1;
}
/// Create a `CobaltDiagnostics` and a receiver to inspect events it produces.
fn diagnostics_for_test() -> (CobaltDiagnostics, mpsc::Receiver<CobaltEvent>) {
let (send, recv) = mpsc::channel(10);
(
CobaltDiagnostics {
sender: Mutex::new(CobaltSender::new(send)),
phase: Mutex::new(TEST_INITIAL_PHASE),
},
recv,
)
}
#[fuchsia::test]
fn test_round_trip_time_bucket() {
let bucket_1_rtt = RTT_BUCKET_FLOOR + ONE_MICROS;
let bucket_5_rtt_1 = bucket_1_rtt + RTT_BUCKET_SIZE * 4;
let overflow_rtt = RTT_BUCKET_FLOOR + RTT_BUCKET_SIZE * (RTT_BUCKETS + 2);
let overflow_rtt_2 =
RTT_BUCKET_FLOOR + RTT_BUCKET_SIZE * RTT_BUCKETS + zx::Duration::from_minutes(2);
let overflow_adjacent_rtt = RTT_BUCKET_FLOOR + RTT_BUCKET_SIZE * RTT_BUCKETS - ONE_MICROS;
let underflow_rtt = RTT_BUCKET_FLOOR - ONE_MICROS;
assert_eq!(CobaltDiagnostics::round_trip_time_bucket(&bucket_1_rtt), 1);
assert_eq!(CobaltDiagnostics::round_trip_time_bucket(&bucket_5_rtt_1), 5);
assert_eq!(CobaltDiagnostics::round_trip_time_bucket(&overflow_rtt), RTT_BUCKETS + 1);
assert_eq!(CobaltDiagnostics::round_trip_time_bucket(&overflow_rtt_2), RTT_BUCKETS + 1);
assert_eq!(CobaltDiagnostics::round_trip_time_bucket(&overflow_adjacent_rtt), RTT_BUCKETS);
assert_eq!(CobaltDiagnostics::round_trip_time_bucket(&underflow_rtt), 0);
}
#[fuchsia::test]
fn test_offset_rtt_bucket() {
let bucket_1_offset_rtt = POLL_OFFSET_RTT_FLOOR + ONE_MICROS;
let bucket_5_offset_rtt_1 = bucket_1_offset_rtt + POLL_OFFSET_RTT_BUCKET_SIZE * 4;
let overflow_offset_rtt =
POLL_OFFSET_RTT_FLOOR + POLL_OFFSET_RTT_BUCKET_SIZE * (POLL_OFFSET_RTT_BUCKETS + 2);
let overflow_adjacent_offset_rtt = POLL_OFFSET_RTT_FLOOR
+ POLL_OFFSET_RTT_BUCKET_SIZE * POLL_OFFSET_RTT_BUCKETS
- ONE_MICROS;
let underflow_offset_rtt = POLL_OFFSET_RTT_FLOOR - ONE_MICROS;
assert_eq!(CobaltDiagnostics::poll_offset_rtt_bucket(&bucket_1_offset_rtt), 1);
assert_eq!(CobaltDiagnostics::poll_offset_rtt_bucket(&bucket_5_offset_rtt_1), 5);
assert_eq!(
CobaltDiagnostics::poll_offset_rtt_bucket(&overflow_offset_rtt),
POLL_OFFSET_RTT_BUCKETS + 1
);
assert_eq!(
CobaltDiagnostics::poll_offset_rtt_bucket(&overflow_adjacent_offset_rtt),
POLL_OFFSET_RTT_BUCKETS
);
assert_eq!(CobaltDiagnostics::poll_offset_rtt_bucket(&underflow_offset_rtt), 0);
}
#[fuchsia::test(allow_stalls = false)]
async fn test_success_single_poll() {
let (cobalt, event_recv) = diagnostics_for_test();
cobalt.record(Event::Success(&HttpsSample {
utc: TEST_TIME,
monotonic: TEST_TIME,
standard_deviation: TEST_STANDARD_DEVIATION,
final_bound_size: TEST_BOUND_SIZE,
polls: vec![Poll::with_round_trip_time(*TEST_RTT)],
}));
assert_eq!(
event_recv.take(2).collect::<Vec<_>>().await,
vec![
CobaltEvent {
metric_id: HTTPSDATE_BOUND_SIZE_METRIC_ID,
event_codes: vec![*TEST_INITIAL_PHASE_COBALT as u32],
component: None,
payload: EventPayload::EventCount(CountEvent {
period_duration_micros: 0,
count: TEST_BOUND_SIZE.into_micros()
})
},
CobaltEvent {
metric_id: HTTPSDATE_POLL_LATENCY_METRIC_ID,
event_codes: vec![],
component: None,
payload: EventPayload::IntHistogram(vec![HistogramBucket {
index: TEST_RTT_BUCKET,
count: 1
}])
}
]
);
}
#[fuchsia::test(allow_stalls = false)]
async fn test_success_after_phase_update() {
let (cobalt, mut event_recv) = diagnostics_for_test();
cobalt.record(Event::Success(&HttpsSample {
utc: TEST_TIME,
monotonic: TEST_TIME,
standard_deviation: TEST_STANDARD_DEVIATION,
final_bound_size: TEST_BOUND_SIZE,
polls: vec![Poll::with_round_trip_time(*TEST_RTT)],
}));
let events = event_recv.by_ref().take(2).collect::<Vec<_>>().await;
assert_eq!(events[0].event_codes, vec![*TEST_INITIAL_PHASE_COBALT as u32]);
cobalt.record(Event::Phase(Phase::Converge));
cobalt.record(Event::Success(&HttpsSample {
utc: TEST_TIME,
monotonic: TEST_TIME,
standard_deviation: TEST_STANDARD_DEVIATION,
final_bound_size: TEST_BOUND_SIZE,
polls: vec![Poll::with_round_trip_time(*TEST_RTT_2)],
}));
let events = event_recv.take(2).collect::<Vec<_>>().await;
assert_eq!(events[0].event_codes, vec![CobaltPhase::Converge as u32]);
}
#[fuchsia::test(allow_stalls = false)]
async fn test_success_multiple_rtt() {
let (cobalt, event_recv) = diagnostics_for_test();
cobalt.record(Event::Success(&HttpsSample {
utc: TEST_TIME,
monotonic: TEST_TIME,
standard_deviation: TEST_STANDARD_DEVIATION,
final_bound_size: TEST_BOUND_SIZE,
polls: vec![
Poll::with_round_trip_time(*TEST_RTT),
Poll::with_round_trip_time(*TEST_RTT_2),
Poll::with_round_trip_time(*TEST_RTT_2),
],
}));
let mut events = event_recv.take(2).collect::<Vec<_>>().await;
assert_eq!(
events[0],
CobaltEvent {
metric_id: HTTPSDATE_BOUND_SIZE_METRIC_ID,
event_codes: vec![*TEST_INITIAL_PHASE_COBALT as u32],
component: None,
payload: EventPayload::EventCount(CountEvent {
period_duration_micros: 0,
count: TEST_BOUND_SIZE.into_micros()
})
}
);
assert_eq!(events[1].metric_id, HTTPSDATE_POLL_LATENCY_METRIC_ID);
assert!(events[1].event_codes.is_empty());
assert!(events[1].component.is_none());
match events.remove(1).payload {
EventPayload::IntHistogram(buckets) => {
let expected_buckets: HashSet<HistogramBucket> = HashSet::from_iter(vec![
HistogramBucket { index: TEST_RTT_BUCKET, count: 1 },
HistogramBucket { index: TEST_RTT_2_BUCKET, count: 2 },
]);
assert_eq!(expected_buckets, HashSet::from_iter(buckets));
}
p => panic!("Got unexpected payload: {:?}", p),
}
}
#[fuchsia::test(allow_stalls = false)]
async fn test_success_with_offsets() {
let (cobalt, event_recv) = diagnostics_for_test();
let expected_offset = zx::Duration::from_micros(125);
cobalt.record(Event::Success(&HttpsSample {
utc: TEST_TIME,
monotonic: TEST_TIME,
standard_deviation: TEST_STANDARD_DEVIATION,
final_bound_size: TEST_BOUND_SIZE,
polls: vec![
Poll { round_trip_time: *TEST_RTT, center_offset: Some(expected_offset) },
Poll { round_trip_time: *TEST_RTT_2, center_offset: Some(expected_offset * -1) },
],
}));
let events = event_recv.take(4).collect::<Vec<_>>().await;
// Here we rely on test_success_multiple_rtt to test the initial events and only verify
// offset metrics.
assert_eq!(
events[2..],
[
CobaltEvent {
metric_id: HTTPSDATE_POLL_OFFSET_METRIC_ID,
event_codes: vec![
*TEST_RTT_OFFSET_BUCKET,
TimeMetricDimensionDirection::Positive as u32,
],
component: None,
payload: EventPayload::EventCount(CountEvent {
period_duration_micros: 0,
count: expected_offset.into_micros()
})
},
CobaltEvent {
metric_id: HTTPSDATE_POLL_OFFSET_METRIC_ID,
event_codes: vec![
*TEST_RTT_2_OFFSET_BUCKET,
TimeMetricDimensionDirection::Negative as u32,
],
component: None,
payload: EventPayload::EventCount(CountEvent {
period_duration_micros: 0,
count: expected_offset.into_micros()
})
},
]
);
}
#[fuchsia::test(allow_stalls = false)]
async fn test_success_overflow_rtt() {
let (cobalt, event_recv) = diagnostics_for_test();
cobalt.record(Event::Success(&HttpsSample {
utc: TEST_TIME,
monotonic: TEST_TIME,
standard_deviation: TEST_STANDARD_DEVIATION,
final_bound_size: TEST_BOUND_SIZE,
polls: vec![Poll {
round_trip_time: OVERFLOW_RTT,
center_offset: Some(TEST_CENTER_OFFSET),
}],
}));
assert_eq!(
event_recv.take(3).collect::<Vec<_>>().await,
vec![
CobaltEvent {
metric_id: HTTPSDATE_BOUND_SIZE_METRIC_ID,
event_codes: vec![*TEST_INITIAL_PHASE_COBALT as u32],
component: None,
payload: EventPayload::EventCount(CountEvent {
period_duration_micros: 0,
count: TEST_BOUND_SIZE.into_micros()
})
},
CobaltEvent {
metric_id: HTTPSDATE_POLL_LATENCY_METRIC_ID,
event_codes: vec![],
component: None,
payload: EventPayload::IntHistogram(vec![HistogramBucket {
index: RTT_BUCKETS + 1,
count: 1
}])
},
CobaltEvent {
metric_id: HTTPSDATE_POLL_OFFSET_METRIC_ID,
event_codes: vec![
TimeMetricDimensionRttBucket::Overflow as u32,
TimeMetricDimensionDirection::Positive as u32,
],
component: None,
payload: EventPayload::EventCount(CountEvent {
period_duration_micros: 0,
count: TEST_CENTER_OFFSET.into_micros()
})
},
]
);
}
}