// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    crate::diagnostics::{Diagnostics, Event},
    crate::enums::{Role, SampleValidationError, TimeSourceError},
    crate::time_source::{Event as TimeSourceEvent, Sample, TimeSource},
    fidl_fuchsia_time_external::Status,
    fuchsia_async::{self as fasync, TimeoutExt},
    fuchsia_zircon as zx,
    futures::{FutureExt as _, StreamExt as _},
    std::sync::Arc,
    tracing::{error, info, warn},
};

/// Sets the maximum rate at which Timekeeper is willing to accept new updates from a time source in
/// order to limit the Timekeeper resource utilization. This value is also used to apply an upper
/// limit on the monotonic age of time updates.
const MIN_UPDATE_DELAY: zx::Duration = zx::Duration::from_minutes(1);

/// The time to wait before restart after a complete failure of the time source. Many time source
/// failures are likely to repeat so this is useful to limit resource utilization.
const RESTART_DELAY: zx::Duration = zx::Duration::from_minutes(5);

/// How frequently a source that declares itself to be healthy needs to produce updates in order to
/// remain selected. Sources are restarted if they fail to produce updates faster than this.
const SOURCE_KEEPALIVE: zx::Duration = zx::Duration::from_minutes(60);

/// A provider of monotonic times.
pub trait MonotonicProvider: Send + Sync {
    /// Returns the current monotonic time.
    fn now(&mut self) -> zx::Time;
}

/// A provider of true monotonic times from the kernel.
pub struct KernelMonotonicProvider();

impl MonotonicProvider for KernelMonotonicProvider {
    fn now(&mut self) -> zx::Time {
        zx::Time::get_monotonic()
    }
}

/// A wrapper that launches a time source, validates time updates from the source, and handles
/// relaunching the source in the case of failures.
///
/// In the future `TimeSourceManager` will also handle multiple time sources and the selection
/// between them. Meaning it will manage up to three sources.
pub struct TimeSourceManager<T: TimeSource, D: Diagnostics, M: MonotonicProvider> {
    /// The backstop time that samples must not come before.
    backstop: zx::Time,
    /// Whether the time source restart delay and minimum update delay should be enabled.
    delays_enabled: bool,
    /// A source of monotonic time.
    monotonic: M,
    /// The role of the time source being managed.
    role: Role,
    /// The time source to be managed.
    time_source: T,
    /// A diagnostics implementation for recording events of note.
    diagnostics: Arc<D>,

    /// The active event stream, present when the source is currently running.
    event_stream: Option<T::EventStream>,
    /// The most recent status received from the time source in its current execution.
    last_status: Option<Status>,
    /// The monotonic time at which the most recently accepted Sample arrived.
    last_accepted_sample_arrival: Option<zx::Time>,
}

impl<T: TimeSource, D: Diagnostics> TimeSourceManager<T, D, KernelMonotonicProvider> {
    /// Constructs a new `TimeSourceManager` that reads monotonic times from the kernel.
    pub fn new(backstop: zx::Time, role: Role, time_source: T, diagnostics: Arc<D>) -> Self {
        TimeSourceManager {
            backstop,
            delays_enabled: true,
            monotonic: KernelMonotonicProvider(),
            role,
            time_source,
            diagnostics,
            event_stream: None,
            last_status: None,
            last_accepted_sample_arrival: None,
        }
    }

    /// Constructs a new `TimeSourceManager` that reads monotonic times from the kernel and has
    /// the restart delay and minimum update delay set to zero. This makes the behavior more
    /// amenable to use in tests.
    pub fn new_with_delays_disabled(
        backstop: zx::Time,
        role: Role,
        time_source: T,
        diagnostics: Arc<D>,
    ) -> Self {
        let mut manager = Self::new(backstop, role, time_source, diagnostics);
        manager.delays_enabled = false;
        manager
    }
}

impl<T: TimeSource, D: Diagnostics, M: MonotonicProvider> TimeSourceManager<T, D, M> {
    /// Returns the `Role` of the time source being managed.
    ///
    /// Note: This method is viable while the `TimeSourceManager` is managing a single time source.
    ///       Once fallback and gating sources are added role will be moved to a property of each
    ///       time sample and this method will be removed.
    pub fn role(&self) -> Role {
        self.role
    }

    /// Returns the next valid sample from the time source.
    pub async fn next_sample(&mut self) -> Sample {
        loop {
            // Extract the event stream from self if one exists and attempt to start one if not.
            let mut event_stream = match self.event_stream.take() {
                Some(event_stream) => event_stream,
                None => match self.time_source.launch() {
                    Err(err) => {
                        error!("Error launching {:?} time source: {:?}", self.role, err);
                        self.record_time_source_failure(TimeSourceError::LaunchFailed);
                        if self.delays_enabled {
                            fasync::Timer::new(fasync::Time::after(RESTART_DELAY)).await;
                        }
                        continue;
                    }
                    Ok(event_stream) => event_stream,
                },
            };

            // Try to wait for a valid sample from the event stream, inserting the event stream
            // back into self for next time if we're successful.
            match self.next_sample_from_stream(&mut event_stream).await {
                Ok(sample) => {
                    self.event_stream.replace(event_stream);
                    return sample;
                }
                Err(failure) => {
                    self.record_time_source_failure(failure);
                    self.last_status = None;
                    if self.delays_enabled {
                        fasync::Timer::new(fasync::Time::after(RESTART_DELAY)).await;
                    }
                }
            }
        }
    }

    /// Returns the next valid sample from the supplied stream, or an error if the stream
    /// encounters a terminal error. The monotonic provider will be queried exactly once to
    /// validate every `TimeSourceEvent::Sample` received.
    async fn next_sample_from_stream(
        &mut self,
        event_stream: &mut T::EventStream,
    ) -> Result<Sample, TimeSourceError> {
        loop {
            // Time sources whose current status is OK must send new samples (or state
            // changes) within SOURCE_KEEPALIVE. This doesn't apply to sources that are not
            // OK (e.g. those waiting indefinitely for network availability).
            let timeout = match self.last_status {
                Some(Status::Ok) => zx::Time::after(SOURCE_KEEPALIVE),
                _ => zx::Time::INFINITE,
            };

            let event = event_stream
                .next()
                .map(|res| res.ok_or(TimeSourceError::StreamFailed))
                .on_timeout(timeout, || Err(TimeSourceError::SampleTimeOut))
                .await
                .map_err(|err| {
                    warn!("Error polling stream on {:?}: {:?}", self.role, err);
                    err
                })?
                .map_err(|err| {
                    warn!("Error calling watch on {:?}: {:?}", self.role, err);
                    TimeSourceError::CallFailed
                })?;

            match event {
                TimeSourceEvent::StatusChange { status } if self.last_status == Some(status) => {
                    info!("Ignoring repeated {:?} state of {:?}", self.role, status);
                }
                TimeSourceEvent::StatusChange { status } => {
                    info!("{:?} changed state to {:?}", self.role, status);
                    self.diagnostics.record(Event::TimeSourceStatus { role: self.role, status });
                    self.last_status = Some(status);
                }
                TimeSourceEvent::Sample(sample) => match self.validate_sample(&sample) {
                    Ok(arrival) => {
                        // The current API leaves the potential for a race condition between a
                        // source declaring itself OK and sending the first sample. Since the
                        // non-OK states describe reasons a time source is incapable of sending
                        // samples, we mark a source as OK if we receive a valid sample from any
                        // other state.
                        if self.last_status != Some(Status::Ok) {
                            info!("{:?} setting state to OK on receipt of valid sample", self.role);
                            self.diagnostics.record(Event::TimeSourceStatus {
                                role: self.role,
                                status: Status::Ok,
                            });
                            self.last_status = Some(Status::Ok);
                        }
                        self.last_accepted_sample_arrival = Some(arrival);
                        return Ok(sample);
                    }
                    Err(error) => {
                        error!("Rejected invalid sample from {:?}: {:?}", self.role, error);
                        self.diagnostics.record(Event::SampleRejected { role: self.role, error });
                    }
                },
            }
        }
    }

    /// Validates the supplied time sample against the current state. Returns the current
    /// monotonic time on success so it may be used as an arrival time.
    fn validate_sample(&mut self, sample: &Sample) -> Result<zx::Time, SampleValidationError> {
        let current_monotonic = self.monotonic.now();
        let earliest_allowed_arrival = match self.last_accepted_sample_arrival {
            Some(previous_arrival) if self.delays_enabled => previous_arrival + MIN_UPDATE_DELAY,
            _ => zx::Time::INFINITE_PAST,
        };

        if sample.utc < self.backstop {
            Err(SampleValidationError::BeforeBackstop)
        } else if sample.monotonic > current_monotonic {
            Err(SampleValidationError::MonotonicInFuture)
        } else if sample.monotonic < current_monotonic - MIN_UPDATE_DELAY {
            Err(SampleValidationError::MonotonicTooOld)
        } else if current_monotonic < earliest_allowed_arrival {
            Err(SampleValidationError::TooCloseToPrevious)
        } else {
            Ok(current_monotonic)
        }
    }

    /// Record a time source failure via diagnostics.
    fn record_time_source_failure(&self, error: TimeSourceError) {
        self.diagnostics.record(Event::TimeSourceFailed { role: self.role, error });
    }
}

#[cfg(test)]
mod test {
    use {
        super::*,
        crate::diagnostics::FakeDiagnostics,
        crate::enums::{SampleValidationError as SVE, TimeSourceError as TSE},
        crate::time_source::FakeTimeSource,
        anyhow::anyhow,
    };

    const BACKSTOP_FACTOR: i64 = 100;
    const TEST_ROLE: Role = Role::Monitor;
    const STD_DEV: zx::Duration = zx::Duration::from_millis(22);

    /// A provider of artificial monotonic times that increment by a fixed duration each call.
    struct FakeMonotonicProvider {
        increment: zx::Duration,
        last_time: zx::Time,
    }

    impl FakeMonotonicProvider {
        /// Constructs a new `FakeMonotonicProvider` that increments by `increment` on each call.
        pub fn new(increment: zx::Duration) -> Self {
            FakeMonotonicProvider { increment, last_time: zx::Time::ZERO }
        }
    }

    impl MonotonicProvider for FakeMonotonicProvider {
        fn now(&mut self) -> zx::Time {
            self.last_time += self.increment;
            self.last_time
        }
    }

    /// Create a new `TimeSourceManager` using the standard backstop time and role, a monotonic time
    /// that increments by `MIN_UPDATE_DELAY` per sample, and the supplied time source and
    /// diagnostics.
    fn create_manager(
        time_source: FakeTimeSource,
        diagnostics: Arc<FakeDiagnostics>,
    ) -> TimeSourceManager<FakeTimeSource, FakeDiagnostics, FakeMonotonicProvider> {
        TimeSourceManager {
            backstop: zx::Time::ZERO + (MIN_UPDATE_DELAY * BACKSTOP_FACTOR),
            delays_enabled: true,
            monotonic: FakeMonotonicProvider::new(MIN_UPDATE_DELAY),
            role: TEST_ROLE,
            time_source,
            diagnostics,
            event_stream: None,
            last_status: None,
            last_accepted_sample_arrival: None,
        }
    }

    /// Create a new `TimeSourceManager` using the standard backstop time and role, a monotonic time
    /// that increments by `MIN_UPDATE_DELAY` per sample, and the supplied time source and
    /// diagnostics. Restart and min update delays are disabled.
    fn create_manager_delays_disabled(
        time_source: FakeTimeSource,
        diagnostics: Arc<FakeDiagnostics>,
    ) -> TimeSourceManager<FakeTimeSource, FakeDiagnostics, FakeMonotonicProvider> {
        TimeSourceManager {
            backstop: zx::Time::ZERO + (MIN_UPDATE_DELAY * BACKSTOP_FACTOR),
            delays_enabled: false,
            monotonic: FakeMonotonicProvider::new(MIN_UPDATE_DELAY),
            role: TEST_ROLE,
            time_source,
            diagnostics,
            event_stream: None,
            last_status: None,
            last_accepted_sample_arrival: None,
        }
    }

    /// Creates a new time sample from the supplied times. Both UTC and Monotonic are supplied as
    /// a factor to multiply by MIN_UPDATE_DELAY, which is the minimum interval the manager would
    /// accept between samples.rate at hence the rate we choose our fake monotonic clock to tick at.
    fn create_sample(utc_factor: i64, monotonic_factor: i64) -> Sample {
        Sample {
            utc: zx::Time::ZERO + (MIN_UPDATE_DELAY * utc_factor),
            monotonic: zx::Time::ZERO + (MIN_UPDATE_DELAY * monotonic_factor),
            std_dev: STD_DEV,
        }
    }

    #[fuchsia::test]
    fn role_accessor() {
        let time_source = FakeTimeSource::failing();
        let diagnostics = Arc::new(FakeDiagnostics::new());
        let manager = create_manager(time_source, diagnostics);
        assert_eq!(manager.role(), TEST_ROLE);
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn event_in_future() {
        let time_source = FakeTimeSource::events(vec![
            TimeSourceEvent::StatusChange { status: Status::Ok },
            TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 1, 1)),
            // Should be ignored since monotonic is in the future
            TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 2, 20)),
            TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 3, 3)),
        ]);
        let diagnostics = Arc::new(FakeDiagnostics::new());
        let mut manager = create_manager(time_source, Arc::clone(&diagnostics));

        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 1, 1));
        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 3, 3));
        assert_eq!(
            manager.last_accepted_sample_arrival,
            Some(zx::Time::ZERO + MIN_UPDATE_DELAY * 3)
        );

        diagnostics.assert_events(&[
            Event::TimeSourceStatus { role: TEST_ROLE, status: Status::Ok },
            Event::SampleRejected { role: TEST_ROLE, error: SVE::MonotonicInFuture },
        ]);
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn sample_implies_ok() {
        let time_source = FakeTimeSource::events(vec![
            // Should be accepted even though time source is not currently OK.
            TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 1, 1)),
            // Should not be recorded since we moved the source to OK on receiving the sample.
            TimeSourceEvent::StatusChange { status: Status::Ok },
            TimeSourceEvent::StatusChange { status: Status::Network },
            // Should be accepted even though time source is not curently OK.
            TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 2, 2)),
        ]);
        let diagnostics = Arc::new(FakeDiagnostics::new());
        let mut manager = create_manager(time_source, Arc::clone(&diagnostics));

        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 1, 1));
        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 2, 2));
        assert_eq!(manager.last_status, Some(Status::Ok));

        diagnostics.assert_events(&[
            Event::TimeSourceStatus { role: TEST_ROLE, status: Status::Ok },
            Event::TimeSourceStatus { role: TEST_ROLE, status: Status::Network },
            Event::TimeSourceStatus { role: TEST_ROLE, status: Status::Ok },
        ]);
    }

    #[fuchsia::test]
    async fn restart_on_watch_error() {
        let time_source = FakeTimeSource::result_collections(vec![
            vec![
                Ok(TimeSourceEvent::StatusChange { status: Status::Ok }),
                Ok(TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 1, 1))),
                Err(anyhow!("Walked through wet cement")),
                // Should be ignored since Err caused restart.
                Ok(TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 2, 2))),
            ],
            vec![
                Ok(TimeSourceEvent::StatusChange { status: Status::Ok }),
                Ok(TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 3, 2))),
                Ok(TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 4, 3))),
            ],
        ]);
        let diagnostics = Arc::new(FakeDiagnostics::new());
        let mut manager = create_manager_delays_disabled(time_source, Arc::clone(&diagnostics));

        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 1, 1));
        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 3, 2));
        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 4, 3));

        diagnostics.assert_events(&[
            Event::TimeSourceStatus { role: TEST_ROLE, status: Status::Ok },
            Event::TimeSourceFailed { role: TEST_ROLE, error: TSE::CallFailed },
            Event::TimeSourceStatus { role: TEST_ROLE, status: Status::Ok },
        ]);
    }

    #[fuchsia::test]
    async fn restart_on_channel_close() {
        let time_source = FakeTimeSource::event_collections(vec![
            vec![
                TimeSourceEvent::StatusChange { status: Status::Ok },
                TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 1, 1)),
            ],
            vec![
                TimeSourceEvent::StatusChange { status: Status::Ok },
                TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 2, 2)),
            ],
        ]);
        let diagnostics = Arc::new(FakeDiagnostics::new());
        let mut manager = create_manager_delays_disabled(time_source, Arc::clone(&diagnostics));

        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 1, 1));
        assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 2, 2));

        diagnostics.assert_events(&[
            Event::TimeSourceStatus { role: TEST_ROLE, status: Status::Ok },
            Event::TimeSourceFailed { role: TEST_ROLE, error: TSE::StreamFailed },
            Event::TimeSourceStatus { role: TEST_ROLE, status: Status::Ok },
        ]);
    }

    #[fuchsia::test]
    async fn restart_on_launch_failure() {
        let time_source = FakeTimeSource::failing();
        let diagnostics = Arc::new(FakeDiagnostics::new());
        let mut manager = TimeSourceManager::new(
            zx::Time::ZERO,
            TEST_ROLE,
            time_source,
            Arc::clone(&diagnostics),
        );

        // Calling next sample on this manager with the restart delay enabled should lead to
        // failed launch and then a few minute cooldown period before relaunch. We test for this by
        // verifying a short timeout triggered.
        assert_eq!(
            manager
                .next_sample()
                .map(|_| true)
                .on_timeout(zx::Time::after(zx::Duration::from_millis(50)), || false)
                .await,
            false
        );

        diagnostics.assert_events(&[Event::TimeSourceFailed {
            role: TEST_ROLE,
            error: TSE::LaunchFailed,
        }]);
    }

    #[fuchsia::test]
    fn validate_sample_failures() {
        let mut manager =
            create_manager(FakeTimeSource::failing(), Arc::new(FakeDiagnostics::new()));
        manager.last_status = Some(Status::Ok);

        // The monotonic our manager sees will start at a factor of 1 and increment by 1 each time
        // we try to validate a sample.
        assert_eq!(
            manager.validate_sample(&create_sample(BACKSTOP_FACTOR, 1)),
            Ok(zx::Time::ZERO + MIN_UPDATE_DELAY)
        );
        assert_eq!(
            manager.validate_sample(&create_sample(BACKSTOP_FACTOR - 1, 2)),
            Err(SVE::BeforeBackstop)
        );
        assert_eq!(
            manager.validate_sample(&create_sample(BACKSTOP_FACTOR, 0)),
            Err(SVE::MonotonicTooOld)
        );
        assert_eq!(
            manager.validate_sample(&create_sample(BACKSTOP_FACTOR, 100)),
            Err(SVE::MonotonicInFuture)
        );
        // On the next call the monontonic should be a factor of 5, trick the manager into thinking
        // it already accepted an update at 4.5
        manager.last_accepted_sample_arrival =
            Some(zx::Time::from_nanos(MIN_UPDATE_DELAY.into_nanos() / 2 * 9));
        assert_eq!(
            manager.validate_sample(&create_sample(BACKSTOP_FACTOR, 5)),
            Err(SVE::TooCloseToPrevious)
        );
        // But if we disable delays an accepted update of 5.5 at a monotonic of 6 is accepted.
        manager.delays_enabled = false;
        manager.last_accepted_sample_arrival =
            Some(zx::Time::from_nanos(MIN_UPDATE_DELAY.into_nanos() / 2 * 11));
        assert_eq!(
            manager.validate_sample(&create_sample(BACKSTOP_FACTOR, 6)),
            Ok(zx::Time::ZERO + MIN_UPDATE_DELAY * 6)
        );
    }
}
