// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    anyhow::anyhow,
    event_queue::{ControlHandle, EventQueue, Notify},
    fidl_fuchsia_update_ext::State,
    fuchsia_inspect_contrib::inspectable::InspectableDebugString,
    fuchsia_syslog::{fx_log_err, fx_log_warn},
    futures::prelude::*,
    std::time::Duration,
};

pub trait StateNotifier: Notify<Event = State> + Send + Sync + 'static {}

impl<T> StateNotifier for T where T: Notify<Event = State> + Send + Sync + 'static {}

#[derive(Debug)]
pub struct UpdateMonitor<N>
where
    N: StateNotifier,
{
    temporary_queue: ControlHandle<N>,
    update_state: InspectableDebugString<Option<State>>,
    version_available: InspectableDebugString<Option<String>>,
    inspect_node: fuchsia_inspect::Node,
}

impl<N> UpdateMonitor<N>
where
    N: StateNotifier,
{
    pub fn from_inspect_node(node: fuchsia_inspect::Node) -> (impl Future<Output = ()>, Self) {
        let (temporary_fut, temporary_queue) = EventQueue::new();
        (
            temporary_fut,
            UpdateMonitor {
                temporary_queue,
                update_state: InspectableDebugString::new(None, &node, "update-state"),
                version_available: InspectableDebugString::new(None, &node, "version-available"),
                inspect_node: node,
            },
        )
    }

    #[cfg(test)]
    pub fn new() -> (impl Future<Output = ()>, Self) {
        Self::from_inspect_node(
            fuchsia_inspect::Inspector::new().root().create_child("test-update-monitor-root-node"),
        )
    }

    pub async fn add_temporary_callback(&mut self, callback: N) {
        if let Err(e) = self.temporary_queue.add_client(callback).await {
            fx_log_err!("error adding client to temporary queue: {:#}", anyhow!(e))
        }
    }

    pub async fn advance_update_state(&mut self, next_update_state: State) {
        *self.update_state.get_mut() = Some(next_update_state.clone());
        if let Err(e) = self.temporary_queue.queue_event(next_update_state).await {
            fx_log_warn!("error sending state to temporary queue: {:#}", anyhow!(e))
        }
    }

    pub async fn clear(&mut self) {
        *self.version_available.get_mut() = None;
        if let Err(e) = self.temporary_queue.clear().await {
            fx_log_warn!("error clearing clients of temporary queue: {:#}", anyhow!(e))
        }
    }

    pub async fn try_flush(&mut self) {
        match self.temporary_queue.try_flush(Duration::from_secs(5)).await {
            Ok(flush_future) => {
                if let Err(e) = flush_future.await {
                    fx_log_warn!("Timed out flushing temporary queue: {:#}", anyhow!(e));
                }
            }
            Err(e) => {
                fx_log_warn!("error trying to flush temporary queue: {:#}", anyhow!(e));
            }
        }
    }

    pub fn set_version_available(&mut self, version_available: String) {
        *self.version_available.get_mut() = Some(version_available);
    }

    #[cfg(test)]
    pub fn get_version_available(&self) -> Option<String> {
        self.version_available.clone()
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use event_queue::{ClosedClient, Event, Notify};
    use fidl_fuchsia_update_ext::random_version_available;
    use fuchsia_async as fasync;
    use fuchsia_zircon as zx;
    use futures::{channel::mpsc, future::BoxFuture, pin_mut, task::Poll};
    use parking_lot::Mutex;
    use proptest::prelude::*;
    use std::sync::Arc;

    const VERSION_AVAILABLE: &str = "fake-version-available";

    #[derive(Clone, Debug)]
    struct FakeStateNotifier {
        states: Arc<Mutex<Vec<State>>>,
    }
    impl FakeStateNotifier {
        fn new() -> Self {
            Self { states: Arc::new(Mutex::new(vec![])) }
        }
    }
    impl Notify for FakeStateNotifier {
        type Event = State;
        type NotifyFuture = future::Ready<Result<(), ClosedClient>>;
        fn notify(&self, state: State) -> Self::NotifyFuture {
            self.states.lock().push(state);
            future::ready(Ok(()))
        }
    }

    struct MpscNotifier<T> {
        sender: mpsc::Sender<T>,
    }

    impl<T> Notify for MpscNotifier<T>
    where
        T: Event + Send + 'static,
    {
        type Event = T;
        type NotifyFuture = BoxFuture<'static, Result<(), ClosedClient>>;

        fn notify(&self, event: T) -> BoxFuture<'static, Result<(), ClosedClient>> {
            let mut sender = self.sender.clone();
            async move { sender.send(event).map(|result| result.map_err(|_| ClosedClient)).await }
                .boxed()
        }
    }

    async fn random_update_monitor<N: StateNotifier>(
        update_state: Option<State>,
        version_available: Option<String>,
    ) -> UpdateMonitor<N> {
        let (fut, mut mms) = UpdateMonitor::<N>::new();
        fasync::Task::spawn(fut).detach();
        version_available.map(|s| mms.set_version_available(s));
        if let Some(update_state) = update_state {
            mms.advance_update_state(update_state).await;
        }
        mms
    }

    async fn wait_for_states(callback: &FakeStateNotifier, len: usize) {
        while callback.states.lock().len() != len {
            fasync::Timer::new(fasync::Time::after(zx::Duration::from_millis(10))).await;
        }
    }

    proptest! {
        #[test]
        fn test_adding_callback_sends_current_state(
                update_state: Option<State>,
                version_available in random_version_available(),
        ) {
            fasync::Executor::new().unwrap().run_singlethreaded(async {
                let mut update_monitor = random_update_monitor(update_state.clone(), version_available).await;
                let expected_states: Vec<_> = update_state.into_iter().collect();
                let temporary_callback = FakeStateNotifier::new();

                update_monitor.add_temporary_callback(temporary_callback.clone()).await;

                wait_for_states(&temporary_callback, expected_states.len()).await;
                prop_assert_eq!(
                    &temporary_callback.states.lock().clone(),
                    &expected_states
                );
                Ok(())
            }).unwrap();
        }

        #[test]
        fn test_advance_update_state_calls_callbacks(
                initial_state: Option<State>,
                version_available in random_version_available(),
                next_states in prop::collection::vec(any::<State>(), 0..4),
        ) {
            fasync::Executor::new().unwrap().run_singlethreaded(async {
                let mut update_monitor = random_update_monitor(initial_state.clone(), version_available).await;
                let temporary_callback = FakeStateNotifier::new();
                let expected_states: Vec<_> = initial_state.clone().into_iter().chain(next_states.clone().into_iter()).collect();

                update_monitor.add_temporary_callback(temporary_callback.clone()).await;

                for state in next_states {
                    update_monitor.advance_update_state(state).await;
                }

                wait_for_states(&temporary_callback, expected_states.len()).await;
                prop_assert_eq!(
                    &temporary_callback.states.lock().clone(),
                    &expected_states
                );
                Ok(())
            }).unwrap();
        }

        #[test]
        fn test_clear_clears_version_available(
            update_state: Option<State>,
            version_available in random_version_available(),
        ) {
            fasync::Executor::new().unwrap().run_singlethreaded(async {
                let mut update_monitor = random_update_monitor::<FakeStateNotifier>(update_state, version_available).await;
                update_monitor.set_version_available(VERSION_AVAILABLE.to_string());

                update_monitor.clear().await;

                prop_assert_eq!(
                    update_monitor.get_version_available(),
                    None
                );
                Ok(())
            }).unwrap();
        }

        #[test]
        fn test_clear_clears_temporary_callbacks(
            update_state: Option<State>,
            version_available in random_version_available(),
        ) {
            fasync::Executor::new().unwrap().run_singlethreaded(async {
                let mut update_monitor = random_update_monitor(update_state, version_available).await;
                let temporary_callback = FakeStateNotifier::new();

                update_monitor.add_temporary_callback(temporary_callback.clone()).await;
                update_monitor.clear().await;
                temporary_callback.states.lock().clear();
                update_monitor.advance_update_state(State::CheckingForUpdates).await;

                prop_assert_eq!(temporary_callback.states.lock().clone(), vec![]);
                Ok(())
            }).unwrap();
        }

        #[test]
        fn test_try_flush_flushes_temporary_callbacks(
            update_state: State,
            version_available in random_version_available(),
        ) {
            let mut executor = fasync::Executor::new().unwrap();
            let mut update_monitor = executor.run_singlethreaded(random_update_monitor(Some(update_state.clone()), version_available));

            let (sender, mut receiver) = mpsc::channel(0);
            let temporary_callback = MpscNotifier { sender };
            executor.run_singlethreaded(update_monitor.add_temporary_callback(temporary_callback));

            let flush = update_monitor.try_flush();
            pin_mut!(flush);
            prop_assert_eq!(executor.run_until_stalled(&mut flush), Poll::Pending);
            prop_assert_eq!(executor.run_until_stalled(&mut receiver.next()), Poll::Ready(Some(update_state)));
            prop_assert_eq!(executor.run_until_stalled(&mut flush), Poll::Ready(()));
        }

        #[test]
        fn test_try_flush_timeout(
            update_state: State,
            version_available in random_version_available(),
        ) {
            let mut executor = fasync::Executor::new_with_fake_time().unwrap();
            // Can't use run_singlethreaded on executor with fake time.
            let update_monitor_fut = random_update_monitor(Some(update_state), version_available);
            pin_mut!(update_monitor_fut);
            let mut update_monitor = match executor.run_until_stalled(&mut update_monitor_fut) {
                Poll::Ready(monitor) => monitor,
                Poll::Pending => panic!("random_update_monitor blocked"),
            };

            let (sender, _receiver) = mpsc::channel(0);
            {
                let temporary_callback = MpscNotifier { sender };
                let add_temporary_callback = update_monitor.add_temporary_callback(temporary_callback);
                pin_mut!(add_temporary_callback);
                prop_assert_eq!(executor.run_until_stalled(&mut add_temporary_callback), Poll::Ready(()));
            }

            let flush = update_monitor.try_flush();
            pin_mut!(flush);
            prop_assert_eq!(executor.run_until_stalled(&mut flush), Poll::Pending);
            let expected_deadline = executor.now() + zx::Duration::from_seconds(5);
            prop_assert_eq!(executor.wake_next_timer(), Some(expected_deadline));
            prop_assert_eq!(executor.run_until_stalled(&mut flush), Poll::Ready(()));
        }
    }
}

#[cfg(test)]
mod test_inspect {
    use super::*;
    use event_queue::{ClosedClient, Notify};
    use fuchsia_async as fasync;
    use fuchsia_inspect::assert_inspect_tree;

    #[derive(Clone, Debug)]
    struct FakeStateNotifier;
    impl Notify for FakeStateNotifier {
        type Event = State;
        type NotifyFuture = future::Ready<Result<(), ClosedClient>>;
        fn notify(&self, _state: State) -> Self::NotifyFuture {
            future::ready(Ok(()))
        }
    }

    #[test]
    fn test_inspect_initial_state() {
        let inspector = fuchsia_inspect::Inspector::new();
        let _update_monitor = UpdateMonitor::<FakeStateNotifier>::from_inspect_node(
            inspector.root().create_child("update-monitor"),
        );

        assert_inspect_tree!(
            inspector,
            root: {
                "update-monitor": {
                    "update-state": "None",
                    "version-available": "None",
                }
            }
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_inspect_update_state() {
        let inspector = fuchsia_inspect::Inspector::new();
        let (fut, mut update_monitor) = UpdateMonitor::<FakeStateNotifier>::from_inspect_node(
            inspector.root().create_child("update-monitor"),
        );
        fasync::Task::spawn(fut).detach();

        update_monitor.advance_update_state(State::CheckingForUpdates).await;

        assert_inspect_tree!(
            inspector,
            root: {
                "update-monitor": {
                    "update-state": format!("{:?}", Some(State::CheckingForUpdates)),
                    "version-available": "None",
                }
            }
        );
    }

    #[test]
    fn test_inspect_version_available() {
        let inspector = fuchsia_inspect::Inspector::new();
        let (_fut, mut update_monitor) = UpdateMonitor::<FakeStateNotifier>::from_inspect_node(
            inspector.root().create_child("update-monitor"),
        );
        let version_available = "fake-version-available";

        update_monitor.set_version_available(version_available.to_string());

        assert_inspect_tree!(
            inspector,
            root: {
                "update-monitor": {
                    "update-state": "None",
                    "version-available": format!("{:?}", Some(version_available)),
                }
            }
        );
    }
}
