// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    anyhow::{anyhow, Context as _, Error},
    fidl_fuchsia_time_external::{self as ftexternal, PushSourceProxy, Status},
    fuchsia_component::client::{launch, launcher, App},
    fuchsia_zircon as zx,
    futures::{
        stream::{Select, Stream},
        FutureExt, StreamExt, TryFutureExt,
    },
    std::{fmt::Debug, pin::Pin, sync::Arc},
    tracing::info,
};

/// A single measurement of UTC reported by a source of time.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Sample {
    /// The measured UTC time.
    pub utc: zx::Time,
    /// The monotonic time at which `utc` was most valid.
    pub monotonic: zx::Time,
    /// The standard deviation of the error in `utc`.
    pub std_dev: zx::Duration,
}

#[cfg(test)]
impl Sample {
    /// Builds a `Sample` from its UTC time, monotonic time, and standard deviation.
    pub fn new(utc: zx::Time, monotonic: zx::Time, std_dev: zx::Duration) -> Self {
        Self { utc, monotonic, std_dev }
    }
}

/// Something that a source of time can report to its consumer.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Event {
    /// The time source moved to a different status.
    StatusChange {
        /// The status the time source is now in.
        status: Status,
    },
    /// The time source delivered a new time sample.
    Sample(Sample),
}

impl From<Sample> for Event {
    fn from(sample: Sample) -> Event {
        Event::Sample(sample)
    }
}

/// A definition of a time source that may subsequently be launched to create a stream of update
/// events.
pub trait TimeSource: Send + Sync + Debug {
    /// The type of `Stream` produced when launching the `TimeSource`.
    type EventStream: Stream<Item = Result<Event, Error>> + Unpin + Send;

    /// Attempts to launch the time source and return a stream of its time output and status
    /// change events.
    fn launch(&self) -> Result<Self::EventStream, Error>;
}

/// A time source that communicates using the `fuchsia.time.external.PushSource` protocol.
///
/// Launching it through `TimeSource::launch` starts the named component and connects to its
/// `PushSource` protocol.
#[derive(Debug)]
pub struct PushTimeSource {
    /// The fully qualified name of the component to launch.
    component: String,
}

/// The `Stream` of events produced by a `PushTimeSource`
type PushTimeSourceEventStream = Select<
    Pin<Box<dyn Stream<Item = Result<Event, Error>> + Send>>,
    Pin<Box<dyn Stream<Item = Result<Event, Error>> + Send>>,
>;

impl PushTimeSource {
    /// Creates a new `PushTimeSource` using the supplied component name.
    pub fn new(component: String) -> Self {
        PushTimeSource { component }
    }

    /// Returns a stream of time output and status change events received using the supplied
    /// `PushSourceProxy`, retaining the optional `App` for the same lifetime.
    fn events_from_proxy(app: Option<App>, proxy: PushSourceProxy) -> PushTimeSourceEventStream {
        // Store the App in a tuple with the PushSourceProxy to ensure it remains in scope.
        let app_and_proxy = Arc::new((app, proxy));
        let app_and_proxy_clone = Arc::clone(&app_and_proxy);

        let status_stream = futures::stream::try_unfold(app_and_proxy, |app_and_proxy| {
            app_and_proxy
                .1
                .watch_status()
                .map_ok(move |status| Some((Event::StatusChange { status }, app_and_proxy)))
                .err_into()
        });

        let sample_stream = futures::stream::try_unfold(app_and_proxy_clone, |app_and_proxy| {
            app_and_proxy.1.watch_sample().map(move |result| match result {
                Ok(sample) => match (sample.utc, sample.monotonic, sample.standard_deviation) {
                    (None, _, _) => Err(anyhow!("sample missing utc")),
                    (_, None, _) => Err(anyhow!("sample missing monotonic")),
                    (_, _, None) => Err(anyhow!("sample missing standard deviation")),
                    (Some(utc), Some(monotonic), Some(std_dev)) => Ok(Some((
                        Event::Sample(Sample {
                            utc: zx::Time::from_nanos(utc),
                            monotonic: zx::Time::from_nanos(monotonic),
                            std_dev: zx::Duration::from_nanos(std_dev),
                        }),
                        app_and_proxy,
                    ))),
                },
                Err(err) => Err(err.into()),
            })
        });

        futures::stream::select(status_stream.boxed(), sample_stream.boxed())
    }
}

impl TimeSource for PushTimeSource {
    type EventStream = PushTimeSourceEventStream;

    /// Launches the configured component, connects to its `PushSource` protocol, and returns
    /// the merged stream of status and sample events.
    ///
    /// # Errors
    ///
    /// Returns an error, annotated with the step that failed, if the launcher cannot be
    /// started, the component cannot be launched, or the protocol connection fails.
    fn launch(&self) -> Result<Self::EventStream, Error> {
        let launcher = launcher().context("starting launcher")?;
        info!("Launching PushTimeSource at {}", self.component);
        // `with_context` defers building the message until an error actually occurs.
        let app = launch(&launcher, self.component.clone(), None)
            .with_context(|| format!("launching push source {}", self.component))?;
        let proxy = app
            .connect_to_protocol::<ftexternal::PushSourceMarker>()
            .with_context(|| format!("connecting to PushSource in {}", self.component))?;
        // The App is handed to the stream so that it is retained together with the proxy.
        Ok(PushTimeSource::events_from_proxy(Some(app), proxy))
    }
}

#[cfg(test)]
use {futures::stream, parking_lot::Mutex};

/// A time source that immediately produces the collections of events supplied at construction.
/// The time source may be launched multiple times and will return a different collection of events
/// on each launch. It will return pending after the last event in the last collection, and will
/// terminate the stream after the last event in all other collections. The time source will return
/// an error if asked to launch after the last collection of events has been returned.
#[cfg(test)]
pub struct FakeTimeSource {
    /// The collections of events still to be returned, in launch order. The TimeSource will
    /// return pending after the last event in the last collection, and will terminate the stream
    /// after the last event in all other collections.
    collections: Mutex<Vec<Vec<Result<Event, Error>>>>,
}

#[cfg(test)]
impl FakeTimeSource {
    /// Creates a new `FakeTimeSource` that produces the supplied single collection of successful
    /// events.
    pub fn events(events: Vec<Event>) -> Self {
        Self::event_collections(vec![events])
    }

    /// Creates a new `FakeTimeSource` that produces the supplied collections of successful events,
    /// one collection per launch.
    pub fn event_collections(event_collections: Vec<Vec<Event>>) -> Self {
        Self::result_collections(
            event_collections
                .into_iter()
                .map(|collection| collection.into_iter().map(Ok).collect())
                .collect(),
        )
    }

    /// Creates a new `FakeTimeSource` that produces the supplied collections of results, one
    /// collection per launch. This is the single place where the struct is constructed.
    pub fn result_collections(result_collections: Vec<Vec<Result<Event, Error>>>) -> Self {
        FakeTimeSource { collections: Mutex::new(result_collections) }
    }

    /// Creates a new `FakeTimeSource` that always fails to launch, because it holds no
    /// collections at all.
    pub fn failing() -> Self {
        Self::result_collections(Vec::new())
    }
}

#[cfg(test)]
impl Debug for FakeTimeSource {
    /// Formats the fake as just its type name, without printing the stored collections.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "FakeTimeSource")
    }
}

#[cfg(test)]
impl TimeSource for FakeTimeSource {
    type EventStream = Pin<Box<dyn Stream<Item = Result<Event, Error>> + Send>>;

    /// Hands out the next stored collection as a stream, or fails once every collection has
    /// already been handed out.
    fn launch(&self) -> Result<Self::EventStream, Error> {
        let mut remaining = self.collections.lock();
        if remaining.is_empty() {
            return Err(anyhow!("FakeTimeSource sent all supplied event collections"));
        }
        let events = stream::iter(remaining.remove(0));
        // The final collection is followed by a stream that never completes; every earlier
        // collection simply ends after its last event.
        let was_last_collection = remaining.is_empty();
        Ok(if was_last_collection {
            events.chain(stream::pending()).boxed()
        } else {
            events.boxed()
        })
    }
}

#[cfg(test)]
mod test {
    use {super::*, fidl::prelude::*, fuchsia_async as fasync, lazy_static::lazy_static};

    // Fixture values: the status and raw sample fields sent by the fake PushSource server in
    // `push_time_source_events`, from which `STATUS_EVENT_1` and `SAMPLE_EVENT_1` are built.
    const STATUS_1: Status = Status::Initializing;
    const SAMPLE_1_UTC_NANOS: i64 = 1234567;
    const SAMPLE_1_MONO_NANOS: i64 = 222;
    const SAMPLE_1_STD_DEV_NANOS: i64 = 8888;

    lazy_static! {
        // The events the tests expect to observe.
        static ref STATUS_EVENT_1: Event = Event::StatusChange { status: STATUS_1 };
        static ref SAMPLE_EVENT_1: Event = Event::from(Sample {
            utc: zx::Time::from_nanos(SAMPLE_1_UTC_NANOS),
            monotonic: zx::Time::from_nanos(SAMPLE_1_MONO_NANOS),
            std_dev: zx::Duration::from_nanos(SAMPLE_1_STD_DEV_NANOS),
        });
        static ref SAMPLE_EVENT_2: Event = Event::from(Sample {
            utc: zx::Time::from_nanos(12345678),
            monotonic: zx::Time::from_nanos(333),
            std_dev: zx::Duration::from_nanos(9999),
        });
    }

    // A fake built from a single collection yields its events in order and cannot be launched a
    // second time.
    #[fuchsia::test(allow_stalls = false)]
    async fn single_event_set() -> Result<(), Error> {
        let fake = FakeTimeSource::events(vec![*STATUS_EVENT_1, *SAMPLE_EVENT_1, *SAMPLE_EVENT_2]);
        let mut events = fake.launch().context("Fake should launch without error")?;
        assert_eq!(events.next().await.unwrap().unwrap(), *STATUS_EVENT_1);
        assert_eq!(events.next().await.unwrap().unwrap(), *SAMPLE_EVENT_1);
        assert_eq!(events.next().await.unwrap().unwrap(), *SAMPLE_EVENT_2);
        // Polling `events` again would never complete, leading to a stall and hence a panic. We
        // don't test this here to avoid a degenerate test, but do in fake_no_events_then_pending.
        // Launching again must fail because the only collection has already been handed out.
        assert!(fake.launch().is_err());
        Ok(())
    }

    // A fake built from two collections hands out one per launch: the first stream terminates
    // after its events, and a third launch fails.
    #[fuchsia::test(allow_stalls = false)]
    async fn double_event_set() -> Result<(), Error> {
        let fake = FakeTimeSource::event_collections(vec![
            vec![*STATUS_EVENT_1, *SAMPLE_EVENT_1],
            vec![*SAMPLE_EVENT_2],
        ]);
        let mut events = fake.launch().context("Fake should launch without error")?;
        assert_eq!(events.next().await.unwrap().unwrap(), *STATUS_EVENT_1);
        assert_eq!(events.next().await.unwrap().unwrap(), *SAMPLE_EVENT_1);
        // The first collection is not the last one, so its stream ends rather than pending.
        assert!(events.next().await.is_none());
        let mut events = fake.launch().context("Fake should relaunch without error")?;
        assert_eq!(events.next().await.unwrap().unwrap(), *SAMPLE_EVENT_2);
        // Polling `events` again would never complete, leading to a stall and hence a panic. We
        // don't test this here to avoid a degenerate test, but do in fake_no_events_then_pending.
        // Launching again must fail because both collections have already been handed out.
        assert!(fake.launch().is_err());
        Ok(())
    }

    // The stream for the last collection (empty here) never completes; the test is expected to
    // panic as a result.
    #[fuchsia::test(allow_stalls = false)]
    #[should_panic]
    async fn fake_no_events_then_pending() {
        let fake = FakeTimeSource::events(vec![]);
        let mut events = fake.launch().unwrap();
        // Getting an event from the last collection should never complete, leading to a stall.
        events.next().await;
    }

    // A fake created with `failing` refuses to launch at all.
    #[fuchsia::test]
    fn fake_failing() {
        let fake = FakeTimeSource::failing();
        assert!(fake.launch().is_err());
    }

    // `PushTimeSource::new` stores the supplied component name.
    #[fuchsia::test]
    fn new_push_time_source() {
        const COMPONENT_NAME: &str = "alfred";
        let time_source = PushTimeSource::new(COMPONENT_NAME.to_string());
        assert_eq!(time_source.component, COMPONENT_NAME);
    }

    // Status and sample responses from a PushSource server are converted into the matching
    // events.
    #[fuchsia::test]
    async fn push_time_source_events() {
        let (proxy, mut requests) =
            fidl::endpoints::create_proxy_and_stream::<ftexternal::PushSourceMarker>().unwrap();

        // Fake PushSource server: answers every WatchStatus with STATUS_1 and every WatchSample
        // with a sample built from the SAMPLE_1 constants.
        let _task = fasync::Task::spawn(async move {
            while let Some(Ok(request)) = requests.next().await {
                match request {
                    ftexternal::PushSourceRequest::WatchStatus { responder, .. } => {
                        responder.send(STATUS_1).unwrap();
                    }
                    ftexternal::PushSourceRequest::WatchSample { responder, .. } => {
                        let sample = ftexternal::TimeSample {
                            utc: Some(SAMPLE_1_UTC_NANOS),
                            monotonic: Some(SAMPLE_1_MONO_NANOS),
                            standard_deviation: Some(SAMPLE_1_STD_DEV_NANOS),
                            ..ftexternal::TimeSample::EMPTY
                        };
                        responder.send(sample).unwrap();
                    }
                    // Any other request is ignored.
                    _ => {}
                };
            }
        });

        let mut events = PushTimeSource::events_from_proxy(None, proxy);
        // We expect to receive both events but the ordering is not deterministic.
        let event1 = events.next().await.unwrap().unwrap();
        let event2 = events.next().await.unwrap().unwrap();
        // Whichever kind arrived first, the other kind must have arrived second.
        match event1 {
            Event::StatusChange { status: _ } => {
                assert_eq!(event1, *STATUS_EVENT_1);
                assert_eq!(event2, *SAMPLE_EVENT_1);
            }
            Event::Sample(_) => {
                assert_eq!(event1, *SAMPLE_EVENT_1);
                assert_eq!(event2, *STATUS_EVENT_1);
            }
        }
    }

    // A server that shuts the channel down instead of answering makes the event stream yield an
    // error.
    #[fuchsia::test]
    async fn push_time_source_failure() {
        let (proxy, mut requests) =
            fidl::endpoints::create_proxy_and_stream::<ftexternal::PushSourceMarker>().unwrap();

        // Fake PushSource server: never responds, and shuts the channel down with a BAD_STATE
        // epitaph when a WatchStatus request arrives.
        let _task = fasync::Task::spawn(async move {
            while let Some(Ok(request)) = requests.next().await {
                // Close the channel on the first watch status request.
                match request {
                    ftexternal::PushSourceRequest::WatchStatus { responder, .. } => {
                        responder.control_handle().shutdown_with_epitaph(zx::Status::BAD_STATE);
                    }
                    // Any other request is ignored and left unanswered.
                    _ => {}
                };
            }
        });

        let mut events = PushTimeSource::events_from_proxy(None, proxy);
        // The first item produced by the stream must be an error.
        assert!(events.next().await.unwrap().is_err());
    }
}
