// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    crate::{
        diagnostics::{Diagnostics, Event},
        enums::{
            ClockCorrectionStrategy, ClockUpdateReason, InitialClockState, StartClockSource, Track,
        },
        MonitorTrack, PrimaryTrack, TimeSource,
    },
    fuchsia_async as fasync,
    fuchsia_cobalt::{CobaltConnector, CobaltSender, ConnectionType},
    fuchsia_zircon as zx,
    parking_lot::Mutex,
    std::sync::Arc,
    time_metrics_registry::{
        RealTimeClockEventsMetricDimensionEventType as RtcEvent,
        TimeMetricDimensionDirection as Direction, TimeMetricDimensionExperiment as Experiment,
        TimeMetricDimensionRole as CobaltRole, TimeMetricDimensionTrack as CobaltTrack,
        TimekeeperLifecycleEventsMetricDimensionEventType as LifecycleEvent,
        TimekeeperTimeSourceEventsMetricDimensionEventType as TimeSourceEvent,
        TimekeeperTrackEventsMetricDimensionEventType as TrackEvent,
        REAL_TIME_CLOCK_EVENTS_METRIC_ID, TIMEKEEPER_CLOCK_CORRECTION_METRIC_ID,
        TIMEKEEPER_LIFECYCLE_EVENTS_METRIC_ID, TIMEKEEPER_MONITOR_DIFFERENCE_METRIC_ID,
        TIMEKEEPER_SQRT_COVARIANCE_METRIC_ID, TIMEKEEPER_TIME_SOURCE_EVENTS_METRIC_ID,
        TIMEKEEPER_TRACK_EVENTS_METRIC_ID,
    },
    time_util::time_at_monotonic,
};

/// The period duration in micros. This field is required for Cobalt 1.0 EVENT_COUNT but not used.
const PERIOD_DURATION: i64 = 0;

/// A connection to the real Cobalt service.
pub struct CobaltDiagnostics {
    /// The wrapped CobaltSender used to log metrics.
    sender: Mutex<CobaltSender>,
    /// The experiment to record on all experiment-based events.
    experiment: Experiment,
    /// The UTC clock used in the primary track.
    primary_clock: Arc<zx::Clock>,
    /// The UTC clock used in the monitor track.
    monitor_clock: Option<Arc<zx::Clock>>,
    // TODO(fxbug.dev/57677): Move back to an owned fasync::Task instead of detaching the spawned
    // Task once the lifecycle of timekeeper ensures CobaltDiagnostics objects will last long enough
    // to finish their logging.
}

impl CobaltDiagnostics {
    /// Contructs a new `CobaltDiagnostics` instance.
    pub(crate) fn new<T: TimeSource>(
        experiment: Experiment,
        primary: &PrimaryTrack<T>,
        optional_monitor: &Option<MonitorTrack<T>>,
    ) -> Self {
        let (sender, fut) = CobaltConnector::default()
            .serve(ConnectionType::project_id(time_metrics_registry::PROJECT_ID));
        fasync::Task::spawn(fut).detach();
        Self {
            sender: Mutex::new(sender),
            experiment,
            primary_clock: Arc::clone(&primary.clock),
            monitor_clock: optional_monitor.as_ref().map(|track| Arc::clone(&track.clock)),
        }
    }

    /// Records an update to the Kalman filter state, including an event and a covariance report.
    fn record_kalman_filter_update(&self, track: Track, sqrt_covariance: zx::Duration) {
        let mut locked_sender = self.sender.lock();
        let cobalt_track = Into::<CobaltTrack>::into(track);
        locked_sender.log_event_count(
            TIMEKEEPER_TRACK_EVENTS_METRIC_ID,
            (TrackEvent::EstimatedOffsetUpdated, cobalt_track, self.experiment),
            PERIOD_DURATION,
            1,
        );
        locked_sender.log_event_count(
            TIMEKEEPER_SQRT_COVARIANCE_METRIC_ID,
            (Into::<CobaltTrack>::into(track), self.experiment),
            PERIOD_DURATION,
            // Unfortunately Cobalt does not follow the standard of nanoseconds everywhere.
            sqrt_covariance.into_micros(),
        );
    }

    /// Records a clock correction, including an event and a report on the magnitude of change.
    fn record_clock_correction(
        &self,
        track: Track,
        correction: zx::Duration,
        strategy: ClockCorrectionStrategy,
    ) {
        let mut locked_sender = self.sender.lock();
        let cobalt_track = Into::<CobaltTrack>::into(track);
        let direction =
            if correction.into_nanos() >= 0 { Direction::Positive } else { Direction::Negative };
        locked_sender.log_event_count(
            TIMEKEEPER_TRACK_EVENTS_METRIC_ID,
            (Into::<TrackEvent>::into(strategy), cobalt_track, self.experiment),
            PERIOD_DURATION,
            1,
        );
        locked_sender.log_event_count(
            TIMEKEEPER_CLOCK_CORRECTION_METRIC_ID,
            (direction, cobalt_track, self.experiment),
            PERIOD_DURATION,
            // Unfortunately Cobalt does not follow the standard of nanoseconds everywhere.
            correction.into_micros().abs(),
        );
    }

    /// Records relevant information following a clock update.
    ///
    /// All updates record the reason, an update to the monitor track additionally records the
    /// difference between the monitor and primary clocks.
    fn record_clock_update(&self, track: Track, reason: ClockUpdateReason) {
        let mut locked_sender = self.sender.lock();
        locked_sender.log_event_count(
            TIMEKEEPER_TRACK_EVENTS_METRIC_ID,
            (Into::<TrackEvent>::into(reason), Into::<CobaltTrack>::into(track), self.experiment),
            PERIOD_DURATION,
            1,
        );
        if track == Track::Monitor {
            if let Some(monitor_clock) = self.monitor_clock.as_ref() {
                let monotonic_ref = zx::Time::get_monotonic();
                let primary = time_at_monotonic(&self.primary_clock, monotonic_ref);
                let monitor = time_at_monotonic(monitor_clock, monotonic_ref);
                let direction =
                    if monitor >= primary { Direction::Positive } else { Direction::Negative };
                locked_sender.log_event_count(
                    TIMEKEEPER_MONITOR_DIFFERENCE_METRIC_ID,
                    (direction, self.experiment),
                    PERIOD_DURATION,
                    // Unfortunately Cobalt does not follow the standard of nanoseconds everywhere.
                    (monitor - primary).into_micros().abs(),
                );
            }
        }
    }
}

impl Diagnostics for CobaltDiagnostics {
    fn record(&self, event: Event) {
        match event {
            Event::Initialized { clock_state } => {
                let event = match clock_state {
                    InitialClockState::NotSet => LifecycleEvent::InitializedBeforeUtcStart,
                    InitialClockState::PreviouslySet => LifecycleEvent::InitializedAfterUtcStart,
                };
                self.sender.lock().log_event(TIMEKEEPER_LIFECYCLE_EVENTS_METRIC_ID, event);
            }
            Event::InitializeRtc { outcome, .. } => {
                self.sender
                    .lock()
                    .log_event(REAL_TIME_CLOCK_EVENTS_METRIC_ID, Into::<RtcEvent>::into(outcome));
            }
            Event::TimeSourceFailed { role, error } => {
                let event = Into::<TimeSourceEvent>::into(error);
                self.sender.lock().log_event_count(
                    TIMEKEEPER_TIME_SOURCE_EVENTS_METRIC_ID,
                    (event, Into::<CobaltRole>::into(role), self.experiment),
                    PERIOD_DURATION,
                    1,
                );
            }
            Event::TimeSourceStatus { .. } => {}
            Event::SampleRejected { role, error } => {
                let event = Into::<TimeSourceEvent>::into(error);
                self.sender.lock().log_event_count(
                    TIMEKEEPER_TIME_SOURCE_EVENTS_METRIC_ID,
                    (event, Into::<CobaltRole>::into(role), self.experiment),
                    PERIOD_DURATION,
                    1,
                );
            }
            Event::KalmanFilterUpdated { track, sqrt_covariance, .. } => {
                self.record_kalman_filter_update(track, sqrt_covariance);
            }
            Event::ClockCorrection { track, correction, strategy } => {
                self.record_clock_correction(track, correction, strategy);
            }
            Event::WriteRtc { outcome } => {
                self.sender
                    .lock()
                    .log_event(REAL_TIME_CLOCK_EVENTS_METRIC_ID, Into::<RtcEvent>::into(outcome));
            }
            Event::StartClock { track, source } => {
                if track == Track::Primary {
                    let event = match source {
                        StartClockSource::Rtc => LifecycleEvent::StartedUtcFromRtc,
                        StartClockSource::External(_) => LifecycleEvent::StartedUtcFromTimeSource,
                    };
                    self.sender.lock().log_event(TIMEKEEPER_LIFECYCLE_EVENTS_METRIC_ID, event);
                }
            }
            Event::UpdateClock { track, reason } => {
                self.record_clock_update(track, reason);
            }
        }
    }
}

#[cfg(test)]
mod test {
    use {
        super::*,
        crate::enums::{
            ClockUpdateReason, InitializeRtcOutcome, Role, SampleValidationError, TimeSourceError,
            WriteRtcOutcome,
        },
        fidl_fuchsia_cobalt::{CobaltEvent, CountEvent, Event as EmptyEvent, EventPayload},
        futures::{channel::mpsc, FutureExt, StreamExt},
        test_util::{assert_geq, assert_leq},
    };

    const TEST_EXPERIMENT: Experiment = Experiment::B;
    const MONITOR_OFFSET: zx::Duration = zx::Duration::from_seconds(444);

    fn create_clock(time: zx::Time) -> Arc<zx::Clock> {
        let clk = zx::Clock::create(zx::ClockOpts::empty(), None).unwrap();
        clk.update(zx::ClockUpdate::new().value(time)).unwrap();
        Arc::new(clk)
    }

    /// Creates a test CobaltDiagnostics and an mpsc receiver that may be used to verify the
    /// events it sends. The primary and monitor clocks will have a difference of MONITOR_OFFSET.
    fn create_test_object() -> (CobaltDiagnostics, mpsc::Receiver<CobaltEvent>) {
        let (mpsc_sender, mpsc_receiver) = futures::channel::mpsc::channel(1);
        let sender = CobaltSender::new(mpsc_sender);
        let diagnostics = CobaltDiagnostics {
            sender: Mutex::new(sender),
            experiment: TEST_EXPERIMENT,
            primary_clock: create_clock(zx::Time::ZERO),
            monitor_clock: Some(create_clock(zx::Time::ZERO + MONITOR_OFFSET)),
        };
        (diagnostics, mpsc_receiver)
    }

    /// Creates an `EventPayload` containing the supplied count.
    fn event_count_payload(count: i64) -> EventPayload {
        EventPayload::EventCount(CountEvent { period_duration_micros: 0, count })
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn record_initialization_events() {
        let (diagnostics, mut mpsc_receiver) = create_test_object();

        diagnostics.record(Event::Initialized { clock_state: InitialClockState::NotSet });
        assert_eq!(
            mpsc_receiver.next().await,
            Some(CobaltEvent {
                metric_id: time_metrics_registry::TIMEKEEPER_LIFECYCLE_EVENTS_METRIC_ID,
                event_codes: vec![LifecycleEvent::InitializedBeforeUtcStart as u32],
                component: None,
                payload: EventPayload::Event(EmptyEvent),
            })
        );
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn record_clock_events() {
        let (diagnostics, mut mpsc_receiver) = create_test_object();

        diagnostics.record(Event::StartClock {
            track: Track::Monitor,
            source: StartClockSource::External(Role::Monitor),
        });
        diagnostics.record(Event::StartClock {
            track: Track::Primary,
            source: StartClockSource::External(Role::Primary),
        });
        assert_eq!(
            mpsc_receiver.next().await,
            Some(CobaltEvent {
                metric_id: time_metrics_registry::TIMEKEEPER_LIFECYCLE_EVENTS_METRIC_ID,
                event_codes: vec![LifecycleEvent::StartedUtcFromTimeSource as u32],
                component: None,
                payload: EventPayload::Event(EmptyEvent),
            })
        );
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn record_rtc_events() {
        let (diagnostics, mut mpsc_receiver) = create_test_object();

        diagnostics
            .record(Event::InitializeRtc { outcome: InitializeRtcOutcome::Succeeded, time: None });
        assert_eq!(
            mpsc_receiver.next().await,
            Some(CobaltEvent {
                metric_id: time_metrics_registry::REAL_TIME_CLOCK_EVENTS_METRIC_ID,
                event_codes: vec![RtcEvent::ReadSucceeded as u32],
                component: None,
                payload: EventPayload::Event(EmptyEvent),
            })
        );

        diagnostics.record(Event::WriteRtc { outcome: WriteRtcOutcome::Failed });
        assert_eq!(
            mpsc_receiver.next().await,
            Some(CobaltEvent {
                metric_id: time_metrics_registry::REAL_TIME_CLOCK_EVENTS_METRIC_ID,
                event_codes: vec![RtcEvent::WriteFailed as u32],
                component: None,
                payload: EventPayload::Event(EmptyEvent),
            })
        );
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn record_time_source_events() {
        let (diagnostics, mut mpsc_receiver) = create_test_object();

        diagnostics.record(Event::SampleRejected {
            role: Role::Primary,
            error: SampleValidationError::MonotonicTooOld,
        });
        assert_eq!(
            mpsc_receiver.next().await,
            Some(CobaltEvent {
                metric_id: time_metrics_registry::TIMEKEEPER_TIME_SOURCE_EVENTS_METRIC_ID,
                event_codes: vec![
                    TimeSourceEvent::SampleRejectedMonotonicTooOld as u32,
                    CobaltRole::Primary as u32,
                    TEST_EXPERIMENT as u32
                ],
                component: None,
                payload: event_count_payload(1),
            })
        );

        diagnostics.record(Event::TimeSourceFailed {
            role: Role::Monitor,
            error: TimeSourceError::CallFailed,
        });
        assert_eq!(
            mpsc_receiver.next().await,
            Some(CobaltEvent {
                metric_id: time_metrics_registry::TIMEKEEPER_TIME_SOURCE_EVENTS_METRIC_ID,
                event_codes: vec![
                    TimeSourceEvent::RestartedCallFailed as u32,
                    CobaltRole::Monitor as u32,
                    TEST_EXPERIMENT as u32
                ],
                component: None,
                payload: event_count_payload(1),
            })
        );
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn record_time_track_events() {
        let (diagnostics, mut mpsc_receiver) = create_test_object();

        diagnostics.record(Event::KalmanFilterUpdated {
            track: Track::Primary,
            monotonic: zx::Time::from_nanos(333_000_000_000),
            utc: zx::Time::from_nanos(4455445544_000_000_000),
            sqrt_covariance: zx::Duration::from_micros(55555),
        });
        assert_eq!(
            mpsc_receiver.next().await,
            Some(CobaltEvent {
                metric_id: time_metrics_registry::TIMEKEEPER_TRACK_EVENTS_METRIC_ID,
                event_codes: vec![
                    TrackEvent::EstimatedOffsetUpdated as u32,
                    CobaltTrack::Primary as u32,
                    TEST_EXPERIMENT as u32
                ],
                component: None,
                payload: event_count_payload(1),
            })
        );
        assert_eq!(
            mpsc_receiver.next().await,
            Some(CobaltEvent {
                metric_id: time_metrics_registry::TIMEKEEPER_SQRT_COVARIANCE_METRIC_ID,
                event_codes: vec![CobaltTrack::Primary as u32, TEST_EXPERIMENT as u32],
                component: None,
                payload: event_count_payload(55555),
            })
        );

        diagnostics.record(Event::ClockCorrection {
            track: Track::Monitor,
            correction: zx::Duration::from_micros(-777),
            strategy: ClockCorrectionStrategy::NominalRateSlew,
        });
        assert_eq!(
            mpsc_receiver.next().await,
            Some(CobaltEvent {
                metric_id: time_metrics_registry::TIMEKEEPER_TRACK_EVENTS_METRIC_ID,
                event_codes: vec![
                    TrackEvent::CorrectionByNominalRateSlew as u32,
                    CobaltTrack::Monitor as u32,
                    TEST_EXPERIMENT as u32
                ],
                component: None,
                payload: event_count_payload(1),
            })
        );
        assert_eq!(
            mpsc_receiver.next().await,
            Some(CobaltEvent {
                metric_id: time_metrics_registry::TIMEKEEPER_CLOCK_CORRECTION_METRIC_ID,
                event_codes: vec![
                    Direction::Negative as u32,
                    CobaltTrack::Monitor as u32,
                    TEST_EXPERIMENT as u32
                ],
                component: None,
                payload: event_count_payload(777),
            })
        );
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn record_update_clock_events() {
        let (diagnostics, mut mpsc_receiver) = create_test_object();

        // Updates to the primary track should only log the reason.
        diagnostics.record(Event::UpdateClock {
            track: Track::Primary,
            reason: ClockUpdateReason::TimeStep,
        });
        assert_eq!(
            mpsc_receiver.next().await,
            Some(CobaltEvent {
                metric_id: time_metrics_registry::TIMEKEEPER_TRACK_EVENTS_METRIC_ID,
                event_codes: vec![
                    TrackEvent::ClockUpdateTimeStep as u32,
                    CobaltTrack::Primary as u32,
                    TEST_EXPERIMENT as u32
                ],
                component: None,
                payload: event_count_payload(1),
            })
        );
        assert!(mpsc_receiver.next().now_or_never().is_none());

        // Updates to the monitor track should lead to an update event and a difference report.
        // Unfortunately the latter is cumbersome to verify since we don't know the exact clock
        // difference.
        diagnostics.record(Event::UpdateClock {
            track: Track::Monitor,
            reason: ClockUpdateReason::BeginSlew,
        });
        assert_eq!(
            mpsc_receiver.next().await,
            Some(CobaltEvent {
                metric_id: time_metrics_registry::TIMEKEEPER_TRACK_EVENTS_METRIC_ID,
                event_codes: vec![
                    TrackEvent::ClockUpdateBeginSlew as u32,
                    CobaltTrack::Monitor as u32,
                    TEST_EXPERIMENT as u32
                ],
                component: None,
                payload: event_count_payload(1),
            })
        );
        let event = mpsc_receiver.next().await.unwrap();
        assert_eq!(event.metric_id, TIMEKEEPER_MONITOR_DIFFERENCE_METRIC_ID);
        assert_eq!(event.event_codes, vec![Direction::Positive as u32, TEST_EXPERIMENT as u32]);
        match event.payload {
            EventPayload::EventCount(CountEvent { period_duration_micros, count }) => {
                assert_eq!(period_duration_micros, 0);
                assert_geq!(count, MONITOR_OFFSET.into_micros() - 5000);
                assert_leq!(count, MONITOR_OFFSET.into_micros() + 5000);
            }
            _ => panic!("monitor clock update did not produce event count payload"),
        }
    }
}
