blob: 999e23d36c03bdb144ab4fbb025743b9f97048f9 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::diagnostics::{Diagnostics, Event},
crate::enums::{Role, SampleValidationError, TimeSourceError},
crate::time_source::{Event as TimeSourceEvent, Sample, TimeSource},
fidl_fuchsia_time_external::Status,
fuchsia_async::{self as fasync, TimeoutExt},
fuchsia_zircon as zx,
futures::{FutureExt as _, StreamExt as _},
std::sync::Arc,
tracing::{error, info, warn},
};
/// Minimum spacing between updates that Timekeeper will accept from a time source, which caps how
/// much work a chatty source can cause. The same duration also bounds how far in the past (in
/// monotonic time) an update may have been taken when it is received.
const MIN_UPDATE_DELAY: zx::Duration = zx::Duration::from_minutes(1);
/// Cooldown applied before relaunching a time source that has failed completely. Failures tend to
/// recur, so waiting avoids burning resources on rapid relaunch attempts.
const RESTART_DELAY: zx::Duration = zx::Duration::from_minutes(5);
/// Longest silence tolerated from a source whose last reported status is healthy. If such a source
/// produces no update within this window it is restarted.
const SOURCE_KEEPALIVE: zx::Duration = zx::Duration::from_hours(1);
/// A provider of monotonic times.
///
/// `now` takes `&mut self` so that an implementation may update internal state on each query.
pub trait MonotonicProvider: Send + Sync {
    /// Returns the current monotonic time.
    fn now(&mut self) -> zx::Time;
}

/// A provider of true monotonic times, read from the kernel via `zx::Time::get_monotonic`.
///
/// The provider carries no state, so it derives the common standard traits for free.
#[derive(Debug, Clone, Copy, Default)]
pub struct KernelMonotonicProvider();

impl MonotonicProvider for KernelMonotonicProvider {
    fn now(&mut self) -> zx::Time {
        zx::Time::get_monotonic()
    }
}
/// Launches a time source, checks every update it produces, and relaunches the source whenever
/// it fails.
///
/// A planned extension will let `TimeSourceManager` handle several time sources (up to three) and
/// choose between them.
pub struct TimeSourceManager<T, D, M>
where
    T: TimeSource,
    D: Diagnostics,
    M: MonotonicProvider,
{
    /// Samples whose UTC time is earlier than this are rejected.
    backstop: zx::Time,
    /// When false, the relaunch cooldown and the minimum spacing between accepted samples are
    /// skipped.
    delays_enabled: bool,
    /// Clock used to read the current monotonic time.
    monotonic: M,
    /// Which role the managed time source fills.
    role: Role,
    /// The time source under management.
    time_source: T,
    /// Sink for noteworthy events.
    diagnostics: Arc<D>,
    /// Event stream of the running source; `None` while the source is not running.
    event_stream: Option<T::EventStream>,
    /// Latest status reported by the source during its current execution.
    last_status: Option<Status>,
    /// Monotonic arrival time of the most recently accepted sample.
    last_accepted_sample_arrival: Option<zx::Time>,
}
impl<T: TimeSource, D: Diagnostics> TimeSourceManager<T, D, KernelMonotonicProvider> {
    /// Creates a `TimeSourceManager` for `time_source` that reads monotonic time from the kernel.
    /// Both the restart delay and the minimum update delay are active.
    pub fn new(backstop: zx::Time, role: Role, time_source: T, diagnostics: Arc<D>) -> Self {
        Self {
            backstop,
            delays_enabled: true,
            monotonic: KernelMonotonicProvider(),
            role,
            time_source,
            diagnostics,
            event_stream: None,
            last_status: None,
            last_accepted_sample_arrival: None,
        }
    }

    /// Creates a `TimeSourceManager` exactly like `new` does, except that the restart delay and
    /// the minimum update delay are skipped. This suits tests, where waiting minutes between
    /// relaunches or samples is impractical.
    pub fn new_with_delays_disabled(
        backstop: zx::Time,
        role: Role,
        time_source: T,
        diagnostics: Arc<D>,
    ) -> Self {
        Self { delays_enabled: false, ..Self::new(backstop, role, time_source, diagnostics) }
    }
}
impl<T: TimeSource, D: Diagnostics, M: MonotonicProvider> TimeSourceManager<T, D, M> {
    /// Returns the `Role` of the time source being managed.
    ///
    /// Note: This method is viable while the `TimeSourceManager` is managing a single time source.
    /// Once fallback and gating sources are added, role will be moved to a property of each
    /// time sample and this method will be removed.
    pub fn role(&self) -> Role {
        self.role
    }

    /// Returns the next valid sample from the time source.
    ///
    /// The time source is launched if no event stream is currently active. Launch failures and
    /// terminal stream errors are recorded through diagnostics and followed by a relaunch
    /// (after `RESTART_DELAY` when delays are enabled). This method therefore only completes
    /// once a valid sample has been received.
    pub async fn next_sample(&mut self) -> Sample {
        loop {
            // Extract the event stream from self if one exists and attempt to start one if not.
            let mut event_stream = match self.event_stream.take() {
                Some(event_stream) => event_stream,
                None => match self.time_source.launch() {
                    Err(err) => {
                        error!("Error launching {:?} time source: {:?}", self.role, err);
                        self.record_time_source_failure(TimeSourceError::LaunchFailed);
                        if self.delays_enabled {
                            fasync::Timer::new(fasync::Time::after(RESTART_DELAY)).await;
                        }
                        continue;
                    }
                    Ok(event_stream) => event_stream,
                },
            };

            // Try to wait for a valid sample from the event stream, storing the event stream
            // back into self for next time if we're successful.
            match self.next_sample_from_stream(&mut event_stream).await {
                Ok(sample) => {
                    self.event_stream = Some(event_stream);
                    return sample;
                }
                Err(failure) => {
                    // `event_stream` is not stored back, so it is dropped at the end of this
                    // iteration and the next iteration relaunches the source. Status reported
                    // by the failed execution does not carry over to the relaunched one.
                    self.record_time_source_failure(failure);
                    self.last_status = None;
                    if self.delays_enabled {
                        fasync::Timer::new(fasync::Time::after(RESTART_DELAY)).await;
                    }
                }
            }
        }
    }

    /// Returns the next valid sample from the supplied stream, or an error if the stream
    /// encounters a terminal error. The monotonic provider will be queried exactly once to
    /// validate every `TimeSourceEvent::Sample` received.
    ///
    /// Status changes are recorded (repeats of the current status are ignored) and invalid
    /// samples are recorded as rejected; neither ends the wait.
    ///
    /// # Errors
    ///
    /// * `TimeSourceError::StreamFailed` if the stream ends.
    /// * `TimeSourceError::SampleTimeOut` if the last status is OK and no event arrives within
    ///   `SOURCE_KEEPALIVE`.
    /// * `TimeSourceError::CallFailed` if the stream yields an error.
    async fn next_sample_from_stream(
        &mut self,
        event_stream: &mut T::EventStream,
    ) -> Result<Sample, TimeSourceError> {
        loop {
            // Time sources whose current status is OK must send new samples (or state
            // changes) within SOURCE_KEEPALIVE. This doesn't apply to sources that are not
            // OK (e.g. those waiting indefinitely for network availability).
            let timeout = match self.last_status {
                Some(Status::Ok) => zx::Time::after(SOURCE_KEEPALIVE),
                _ => zx::Time::INFINITE,
            };
            let event = event_stream
                .next()
                .map(|res| res.ok_or(TimeSourceError::StreamFailed))
                .on_timeout(timeout, || Err(TimeSourceError::SampleTimeOut))
                .await
                .map_err(|err| {
                    warn!("Error polling stream on {:?}: {:?}", self.role, err);
                    err
                })?
                .map_err(|err| {
                    warn!("Error calling watch on {:?}: {:?}", self.role, err);
                    TimeSourceError::CallFailed
                })?;
            match event {
                TimeSourceEvent::StatusChange { status } if self.last_status == Some(status) => {
                    // Status first, then role: "Ignoring repeated Ok state of Primary".
                    info!("Ignoring repeated {:?} state of {:?}", status, self.role);
                }
                TimeSourceEvent::StatusChange { status } => {
                    info!("{:?} changed state to {:?}", self.role, status);
                    self.diagnostics.record(Event::TimeSourceStatus { role: self.role, status });
                    self.last_status = Some(status);
                }
                TimeSourceEvent::Sample(sample) => match self.validate_sample(&sample) {
                    Ok(arrival) => {
                        // The current API leaves the potential for a race condition between a
                        // source declaring itself OK and sending the first sample. Since the
                        // non-OK states describe reasons a time source is incapable of sending
                        // samples, we mark a source as OK if we receive a valid sample from any
                        // other state.
                        if self.last_status != Some(Status::Ok) {
                            info!("{:?} setting state to OK on receipt of valid sample", self.role);
                            self.diagnostics.record(Event::TimeSourceStatus {
                                role: self.role,
                                status: Status::Ok,
                            });
                            self.last_status = Some(Status::Ok);
                        }
                        self.last_accepted_sample_arrival = Some(arrival);
                        return Ok(sample);
                    }
                    Err(error) => {
                        error!("Rejected invalid sample from {:?}: {:?}", self.role, error);
                        self.diagnostics.record(Event::SampleRejected { role: self.role, error });
                    }
                },
            }
        }
    }

    /// Validates the supplied time sample against the current state. Returns the current
    /// monotonic time on success so it may be used as an arrival time.
    ///
    /// Checks are applied in this order and the first failure is returned:
    /// * `BeforeBackstop`: the sample's UTC time is earlier than the backstop.
    /// * `MonotonicInFuture`: the sample's monotonic time is after the current monotonic time.
    /// * `MonotonicTooOld`: the sample's monotonic time is more than `MIN_UPDATE_DELAY` before
    ///   the current monotonic time. This check applies even when delays are disabled.
    /// * `TooCloseToPrevious`: delays are enabled and less than `MIN_UPDATE_DELAY` has elapsed
    ///   since the arrival of the last accepted sample.
    ///
    /// The monotonic provider is queried exactly once per call. This method does not update
    /// `last_accepted_sample_arrival`; the caller records the returned arrival time.
    fn validate_sample(&mut self, sample: &Sample) -> Result<zx::Time, SampleValidationError> {
        let current_monotonic = self.monotonic.now();
        let earliest_allowed_arrival = match self.last_accepted_sample_arrival {
            Some(previous_arrival) if self.delays_enabled => previous_arrival + MIN_UPDATE_DELAY,
            _ => zx::Time::INFINITE_PAST,
        };
        if sample.utc < self.backstop {
            Err(SampleValidationError::BeforeBackstop)
        } else if sample.monotonic > current_monotonic {
            Err(SampleValidationError::MonotonicInFuture)
        } else if sample.monotonic < current_monotonic - MIN_UPDATE_DELAY {
            Err(SampleValidationError::MonotonicTooOld)
        } else if current_monotonic < earliest_allowed_arrival {
            Err(SampleValidationError::TooCloseToPrevious)
        } else {
            Ok(current_monotonic)
        }
    }

    /// Records a time source failure via diagnostics.
    fn record_time_source_failure(&self, error: TimeSourceError) {
        self.diagnostics.record(Event::TimeSourceFailed { role: self.role, error });
    }
}
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::diagnostics::FakeDiagnostics,
        crate::enums::{SampleValidationError as SVE, TimeSourceError as TSE},
        crate::time_source::FakeTimeSource,
        anyhow::anyhow,
    };

    const BACKSTOP_FACTOR: i64 = 100;
    const TEST_ROLE: Role = Role::Monitor;
    const STD_DEV: zx::Duration = zx::Duration::from_millis(22);

    /// A provider of artificial monotonic times that increment by a fixed duration each call.
    struct FakeMonotonicProvider {
        increment: zx::Duration,
        last_time: zx::Time,
    }

    impl FakeMonotonicProvider {
        /// Constructs a new `FakeMonotonicProvider` that starts at zero and increments by
        /// `increment` on each call, so the first call returns `increment`.
        pub fn new(increment: zx::Duration) -> Self {
            FakeMonotonicProvider { increment, last_time: zx::Time::ZERO }
        }
    }

    impl MonotonicProvider for FakeMonotonicProvider {
        fn now(&mut self) -> zx::Time {
            self.last_time += self.increment;
            self.last_time
        }
    }

    /// Creates a new `TimeSourceManager` using the standard backstop time and role, a monotonic
    /// time that increments by `MIN_UPDATE_DELAY` per query, and the supplied time source and
    /// diagnostics. Restart and minimum update delays are enabled.
    fn create_manager(
        time_source: FakeTimeSource,
        diagnostics: Arc<FakeDiagnostics>,
    ) -> TimeSourceManager<FakeTimeSource, FakeDiagnostics, FakeMonotonicProvider> {
        TimeSourceManager {
            backstop: zx::Time::ZERO + (MIN_UPDATE_DELAY * BACKSTOP_FACTOR),
            delays_enabled: true,
            monotonic: FakeMonotonicProvider::new(MIN_UPDATE_DELAY),
            role: TEST_ROLE,
            time_source,
            diagnostics,
            event_stream: None,
            last_status: None,
            last_accepted_sample_arrival: None,
        }
    }

    /// Creates a manager configured exactly like `create_manager`, except that restart and
    /// minimum update delays are disabled.
    fn create_manager_delays_disabled(
        time_source: FakeTimeSource,
        diagnostics: Arc<FakeDiagnostics>,
    ) -> TimeSourceManager<FakeTimeSource, FakeDiagnostics, FakeMonotonicProvider> {
        TimeSourceManager { delays_enabled: false, ..create_manager(time_source, diagnostics) }
    }

    /// Creates a new time sample from the supplied factors. Both UTC and monotonic are given as a
    /// multiple of `MIN_UPDATE_DELAY`. That duration is the minimum interval the manager accepts
    /// between samples, and hence the rate at which we choose to tick our fake monotonic clock.
    fn create_sample(utc_factor: i64, monotonic_factor: i64) -> Sample {
        Sample {
            utc: zx::Time::ZERO + (MIN_UPDATE_DELAY * utc_factor),
            monotonic: zx::Time::ZERO + (MIN_UPDATE_DELAY * monotonic_factor),
            std_dev: STD_DEV,
        }
    }

    #[fuchsia::test]
    fn role_accessor() {
        let time_source = FakeTimeSource::failing();
        let diagnostics = Arc::new(FakeDiagnostics::new());
        let manager = create_manager(time_source, diagnostics);
        assert_eq!(manager.role(), TEST_ROLE);
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn event_in_future() {
        let time_source = FakeTimeSource::events(vec![
            TimeSourceEvent::StatusChange { status: Status::Ok },
            TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 1, 1)),
            // Should be ignored since monotonic is in the future
            TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 2, 20)),
            TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 3, 3)),
        ]);
        let diagnostics = Arc::new(FakeDiagnostics::new());
        let mut manager = create_manager(time_source, Arc::clone(&diagnostics));
        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 1, 1));
        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 3, 3));
        assert_eq!(
            manager.last_accepted_sample_arrival,
            Some(zx::Time::ZERO + MIN_UPDATE_DELAY * 3)
        );
        diagnostics.assert_events(&[
            Event::TimeSourceStatus { role: TEST_ROLE, status: Status::Ok },
            Event::SampleRejected { role: TEST_ROLE, error: SVE::MonotonicInFuture },
        ]);
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn sample_implies_ok() {
        let time_source = FakeTimeSource::events(vec![
            // Should be accepted even though time source is not currently OK.
            TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 1, 1)),
            // Should not be recorded since we moved the source to OK on receiving the sample.
            TimeSourceEvent::StatusChange { status: Status::Ok },
            TimeSourceEvent::StatusChange { status: Status::Network },
            // Should be accepted even though time source is not currently OK.
            TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 2, 2)),
        ]);
        let diagnostics = Arc::new(FakeDiagnostics::new());
        let mut manager = create_manager(time_source, Arc::clone(&diagnostics));
        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 1, 1));
        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 2, 2));
        assert_eq!(manager.last_status, Some(Status::Ok));
        diagnostics.assert_events(&[
            Event::TimeSourceStatus { role: TEST_ROLE, status: Status::Ok },
            Event::TimeSourceStatus { role: TEST_ROLE, status: Status::Network },
            Event::TimeSourceStatus { role: TEST_ROLE, status: Status::Ok },
        ]);
    }

    #[fuchsia::test]
    async fn restart_on_watch_error() {
        let time_source = FakeTimeSource::result_collections(vec![
            vec![
                Ok(TimeSourceEvent::StatusChange { status: Status::Ok }),
                Ok(TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 1, 1))),
                Err(anyhow!("Walked through wet cement")),
                // Should be ignored since Err caused restart.
                Ok(TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 2, 2))),
            ],
            vec![
                Ok(TimeSourceEvent::StatusChange { status: Status::Ok }),
                Ok(TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 3, 2))),
                Ok(TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 4, 3))),
            ],
        ]);
        let diagnostics = Arc::new(FakeDiagnostics::new());
        let mut manager = create_manager_delays_disabled(time_source, Arc::clone(&diagnostics));
        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 1, 1));
        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 3, 2));
        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 4, 3));
        diagnostics.assert_events(&[
            Event::TimeSourceStatus { role: TEST_ROLE, status: Status::Ok },
            Event::TimeSourceFailed { role: TEST_ROLE, error: TSE::CallFailed },
            Event::TimeSourceStatus { role: TEST_ROLE, status: Status::Ok },
        ]);
    }

    #[fuchsia::test]
    async fn restart_on_channel_close() {
        let time_source = FakeTimeSource::event_collections(vec![
            vec![
                TimeSourceEvent::StatusChange { status: Status::Ok },
                TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 1, 1)),
            ],
            vec![
                TimeSourceEvent::StatusChange { status: Status::Ok },
                TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 2, 2)),
            ],
        ]);
        let diagnostics = Arc::new(FakeDiagnostics::new());
        let mut manager = create_manager_delays_disabled(time_source, Arc::clone(&diagnostics));
        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 1, 1));
        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 2, 2));
        diagnostics.assert_events(&[
            Event::TimeSourceStatus { role: TEST_ROLE, status: Status::Ok },
            Event::TimeSourceFailed { role: TEST_ROLE, error: TSE::StreamFailed },
            Event::TimeSourceStatus { role: TEST_ROLE, status: Status::Ok },
        ]);
    }

    #[fuchsia::test]
    async fn restart_on_launch_failure() {
        let time_source = FakeTimeSource::failing();
        let diagnostics = Arc::new(FakeDiagnostics::new());
        let mut manager = TimeSourceManager::new(
            zx::Time::ZERO,
            TEST_ROLE,
            time_source,
            Arc::clone(&diagnostics),
        );
        // Calling next sample on this manager with the restart delay enabled should lead to a
        // failed launch followed by a RESTART_DELAY cooldown before relaunch. We test for this by
        // verifying that a short timeout fires before any sample is produced.
        let produced_sample = manager
            .next_sample()
            .map(|_| true)
            .on_timeout(zx::Time::after(zx::Duration::from_millis(50)), || false)
            .await;
        assert!(!produced_sample);
        diagnostics.assert_events(&[Event::TimeSourceFailed {
            role: TEST_ROLE,
            error: TSE::LaunchFailed,
        }]);
    }

    #[fuchsia::test]
    fn validate_sample_failures() {
        let mut manager =
            create_manager(FakeTimeSource::failing(), Arc::new(FakeDiagnostics::new()));
        manager.last_status = Some(Status::Ok);
        // The monotonic our manager sees will start at a factor of 1 and increment by 1 each time
        // we try to validate a sample.
        assert_eq!(
            manager.validate_sample(&create_sample(BACKSTOP_FACTOR, 1)),
            Ok(zx::Time::ZERO + MIN_UPDATE_DELAY)
        );
        assert_eq!(
            manager.validate_sample(&create_sample(BACKSTOP_FACTOR - 1, 2)),
            Err(SVE::BeforeBackstop)
        );
        assert_eq!(
            manager.validate_sample(&create_sample(BACKSTOP_FACTOR, 0)),
            Err(SVE::MonotonicTooOld)
        );
        assert_eq!(
            manager.validate_sample(&create_sample(BACKSTOP_FACTOR, 100)),
            Err(SVE::MonotonicInFuture)
        );
        // On the next call the monotonic should be a factor of 5; trick the manager into thinking
        // it already accepted an update at 4.5.
        manager.last_accepted_sample_arrival =
            Some(zx::Time::from_nanos(MIN_UPDATE_DELAY.into_nanos() / 2 * 9));
        assert_eq!(
            manager.validate_sample(&create_sample(BACKSTOP_FACTOR, 5)),
            Err(SVE::TooCloseToPrevious)
        );
        // But with delays disabled, a sample arriving at a monotonic of 6 is accepted even though
        // the previous update was accepted at 5.5.
        manager.delays_enabled = false;
        manager.last_accepted_sample_arrival =
            Some(zx::Time::from_nanos(MIN_UPDATE_DELAY.into_nanos() / 2 * 11));
        assert_eq!(
            manager.validate_sample(&create_sample(BACKSTOP_FACTOR, 6)),
            Ok(zx::Time::ZERO + MIN_UPDATE_DELAY * 6)
        );
    }
}