// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    crate::{cancel::OrCancel, trace::duration},
    anyhow::Error,
    diagnostics_data::{LogsData, Severity},
    fidl_fuchsia_test_manager::LogsIteratorOption,
    fuchsia_async as fasync,
    futures::{Future, FutureExt, Stream, TryStreamExt},
    log::warn,
    std::{io::Write, time::Duration},
};

// TODO(fxbug.dev/54198, fxbug.dev/70581): deprecate this when implementing metadata selectors for
// logs or when we support OnRegisterInterest that can be sent to *all* test components.
#[derive(Clone, Default)]
pub(crate) struct LogCollectionOptions {
    pub min_severity: Option<Severity>,
    pub max_severity: Option<Severity>,
}

/// Options for timing out log collection.
/// Log collection is timed out when both |timeout_fut| is resolved and
/// |time_between_logs| has passed since either |timeout_fut| was signalled or
/// the last log was processed.
/// TODO(fxbug.dev/98223): remove timeouts once logs no longer hang.
pub(crate) struct LogTimeoutOptions<F: Future> {
    pub timeout_fut: F,
    pub time_between_logs: Duration,
}

impl LogCollectionOptions {
    fn is_restricted_log(&self, log: &LogsData) -> bool {
        let severity = log.metadata.severity;
        matches!(self.max_severity, Some(max) if severity > max)
    }

    fn should_display(&self, log: &LogsData) -> bool {
        let severity = log.metadata.severity;
        matches!(self.min_severity, None)
            || matches!(self.min_severity, Some(min) if severity >= min)
    }
}

#[derive(Debug, PartialEq, Eq)]
pub enum LogCollectionOutcome {
    Error { restricted_logs: Vec<String> },
    Passed,
}

impl From<Vec<String>> for LogCollectionOutcome {
    fn from(restricted_logs: Vec<String>) -> Self {
        if restricted_logs.is_empty() {
            LogCollectionOutcome::Passed
        } else {
            LogCollectionOutcome::Error { restricted_logs }
        }
    }
}

/// Collects logs from |stream|, filters out low severity logs, and stores the results
/// in |log_artifact|. Returns any high severity restricted logs that are encountered.
pub(crate) async fn collect_logs<S, W, F>(
    mut stream: S,
    mut log_artifact: W,
    options: LogCollectionOptions,
    timeout_options: LogTimeoutOptions<F>,
) -> Result<LogCollectionOutcome, Error>
where
    S: Stream<Item = Result<LogsData, Error>> + Unpin,
    W: Write,
    F: Future,
{
    duration!("collect_logs");
    let timeout_options = LogTimeoutOptions {
        timeout_fut: timeout_options.timeout_fut.map(|_| ()).shared(),
        time_between_logs: timeout_options.time_between_logs,
    };
    let mut restricted_logs = vec![];
    while let Some(mut log) = next_log_or_timeout(&mut stream, &timeout_options).await? {
        duration!("process_single_log");
        let is_restricted = options.is_restricted_log(&log);
        let should_display = options.should_display(&log);
        if !should_display && !is_restricted {
            continue;
        }

        if log.moniker == "test_root" {
            log.moniker = "<root>".to_string();
        } else if log.moniker.starts_with("test_root/") {
            log.moniker = log.moniker.replace("test_root/", "");
        }
        let log_repr = format!("{}", log);

        if should_display {
            writeln!(log_artifact, "{}", log_repr)?;
        }

        if is_restricted {
            restricted_logs.push(log_repr);
        }
    }
    Ok(restricted_logs.into())
}

async fn next_log_or_timeout<S, F>(
    stream: &mut S,
    timeout_options: &LogTimeoutOptions<futures::future::Shared<F>>,
) -> Result<Option<LogsData>, Error>
where
    S: Stream<Item = Result<LogsData, Error>> + Unpin,
    F: Future,
    <F as Future>::Output: std::clone::Clone,
{
    let timeout = async {
        timeout_options.timeout_fut.clone().await;
        fasync::Timer::new(timeout_options.time_between_logs).await;
        warn!("Log timeout invoked")
    };
    stream.try_next().or_cancelled(timeout).await.unwrap_or(Ok(None))
}

#[cfg(target_os = "fuchsia")]
pub fn get_type() -> LogsIteratorOption {
    LogsIteratorOption::BatchIterator
}

#[cfg(not(target_os = "fuchsia"))]
pub fn get_type() -> LogsIteratorOption {
    LogsIteratorOption::ArchiveIterator
}

#[cfg(test)]
mod test {
    use {
        super::*,
        diagnostics_data::{BuilderArgs, LogsDataBuilder},
    };

    fn infinite_log_timeout() -> LogTimeoutOptions<futures::future::Pending<()>> {
        LogTimeoutOptions {
            timeout_fut: futures::future::pending::<()>(),
            time_between_logs: Duration::ZERO,
        }
    }

    #[fuchsia::test]
    async fn simplify_log_moniker() {
        let unaltered_logs = vec![
            LogsDataBuilder::new(BuilderArgs {
                moniker: "test_root".into(),
                timestamp_nanos: 0i64.into(),
                component_url: "test-root-url".to_string().into(),
                severity: Severity::Info,
            })
            .set_message("my info log")
            .build(),
            LogsDataBuilder::new(BuilderArgs {
                moniker: "test_root/child".into(),
                timestamp_nanos: 1000i64.into(),
                component_url: "test-child-url".to_string().into(),
                severity: Severity::Warn,
            })
            .set_message("my warn log")
            .build(),
        ];
        let altered_moniker_logs = vec![
            LogsDataBuilder::new(BuilderArgs {
                moniker: "<root>".into(),
                timestamp_nanos: 0i64.into(),
                component_url: "test-root-url".to_string().into(),
                severity: Severity::Info,
            })
            .set_message("my info log")
            .build(),
            LogsDataBuilder::new(BuilderArgs {
                moniker: "child".into(),
                timestamp_nanos: 1000i64.into(),
                component_url: "test-child-url".to_string().into(),
                severity: Severity::Warn,
            })
            .set_message("my warn log")
            .build(),
        ];

        let mut log_artifact = vec![];
        assert_eq!(
            collect_logs(
                futures::stream::iter(unaltered_logs.into_iter().map(Ok)),
                &mut log_artifact,
                LogCollectionOptions { min_severity: None, max_severity: None },
                infinite_log_timeout(),
            )
            .await
            .unwrap(),
            LogCollectionOutcome::Passed
        );
        assert_eq!(
            String::from_utf8(log_artifact).unwrap(),
            altered_moniker_logs
                .iter()
                .map(|log| format!("{}\n", log))
                .collect::<Vec<_>>()
                .concat()
        );
    }

    #[fuchsia::test]
    async fn filter_low_severity() {
        let input_logs = vec![
            LogsDataBuilder::new(BuilderArgs {
                moniker: "test_root".into(),
                timestamp_nanos: 0i64.into(),
                component_url: "test-root-url".to_string().into(),
                severity: Severity::Info,
            })
            .set_message("my info log")
            .build(),
            LogsDataBuilder::new(BuilderArgs {
                moniker: "test_root/child".into(),
                timestamp_nanos: 1000i64.into(),
                component_url: "test-child-url".to_string().into(),
                severity: Severity::Warn,
            })
            .set_message("my info log")
            .build(),
        ];
        let displayed_logs = vec![LogsDataBuilder::new(BuilderArgs {
            moniker: "child".into(),
            timestamp_nanos: 1000i64.into(),
            component_url: "test-child-url".to_string().into(),
            severity: Severity::Warn,
        })
        .set_message("my info log")
        .build()];

        let mut log_artifact = vec![];
        assert_eq!(
            collect_logs(
                futures::stream::iter(input_logs.into_iter().map(Ok)),
                &mut log_artifact,
                LogCollectionOptions { min_severity: Severity::Warn.into(), max_severity: None },
                infinite_log_timeout(),
            )
            .await
            .unwrap(),
            LogCollectionOutcome::Passed
        );
        assert_eq!(
            String::from_utf8(log_artifact).unwrap(),
            displayed_logs.iter().map(|log| format!("{}\n", log)).collect::<Vec<_>>().concat()
        );
    }

    #[fuchsia::test]
    async fn display_restricted_logs() {
        let input_logs = vec![
            LogsDataBuilder::new(BuilderArgs {
                moniker: "test_root".into(),
                timestamp_nanos: 0i64.into(),
                component_url: "test-root-url".to_string().into(),
                severity: Severity::Info,
            })
            .set_message("my info log")
            .build(),
            LogsDataBuilder::new(BuilderArgs {
                moniker: "test_root/child".into(),
                timestamp_nanos: 1000i64.into(),
                component_url: "test-child-url".to_string().into(),
                severity: Severity::Error,
            })
            .set_message("my error log")
            .build(),
        ];
        let displayed_logs = vec![
            LogsDataBuilder::new(BuilderArgs {
                moniker: "<root>".into(),
                timestamp_nanos: 0i64.into(),
                component_url: "test-root-url".to_string().into(),
                severity: Severity::Info,
            })
            .set_message("my info log")
            .build(),
            LogsDataBuilder::new(BuilderArgs {
                moniker: "child".into(),
                timestamp_nanos: 1000i64.into(),
                component_url: "test-child-url".to_string().into(),
                severity: Severity::Error,
            })
            .set_message("my error log")
            .build(),
        ];

        let mut log_artifact = vec![];
        assert_eq!(
            collect_logs(
                futures::stream::iter(input_logs.into_iter().map(Ok)),
                &mut log_artifact,
                LogCollectionOptions { min_severity: None, max_severity: Severity::Warn.into() },
                infinite_log_timeout(),
            )
            .await
            .unwrap(),
            LogCollectionOutcome::Error { restricted_logs: vec![format!("{}", displayed_logs[1])] }
        );
        assert_eq!(
            String::from_utf8(log_artifact).unwrap(),
            displayed_logs.iter().map(|log| format!("{}\n", log)).collect::<Vec<_>>().concat()
        );
    }

    // fuchsia only as TestExecutor is not available on host.
    #[cfg(target_os = "fuchsia")]
    mod fuchsia_tests {
        use super::*;
        use fuchsia_zircon as zx;
        use futures::channel::mpsc;

        #[fuchsia::test]
        fn terminate_on_timeout() {
            const TIMEOUT_BETWEEN_LOGS: zx::Duration = zx::Duration::from_seconds(5);
            let mut executor = fasync::TestExecutor::new_with_fake_time().expect("create executor");

            let (mut log_sender, log_recv) = mpsc::channel(5);
            let timeout_signal = async_utils::event::Event::new();

            let mut log_artifact = vec![];
            let mut collect_logs_fut = collect_logs(
                log_recv,
                &mut log_artifact,
                LogCollectionOptions { min_severity: None, max_severity: Severity::Warn.into() },
                LogTimeoutOptions {
                    timeout_fut: timeout_signal.wait(),
                    time_between_logs: Duration::from_secs(
                        TIMEOUT_BETWEEN_LOGS.into_seconds() as u64
                    ),
                },
            )
            .boxed();

            // send first log
            log_sender
                .try_send(Ok(LogsDataBuilder::new(BuilderArgs {
                    moniker: "test_root".into(),
                    timestamp_nanos: 0i64.into(),
                    component_url: "test-root-url".to_string().into(),
                    severity: Severity::Info,
                })
                .set_message("first log")
                .build()))
                .expect("send log");
            assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());

            // advancing time past the time between logs shouldn't timeout logs if the
            // timeout signal isn't triggered yet
            executor.set_fake_time(
                executor.now() + TIMEOUT_BETWEEN_LOGS + zx::Duration::from_seconds(1),
            );
            assert!(!executor.wake_expired_timers());
            assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());

            // second log should be sent
            log_sender
                .try_send(Ok(LogsDataBuilder::new(BuilderArgs {
                    moniker: "test_root".into(),
                    timestamp_nanos: 0i64.into(),
                    component_url: "test-root-url".to_string().into(),
                    severity: Severity::Info,
                })
                .set_message("second log")
                .build()))
                .expect("send log");
            assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());

            // after triggering timeout_signal, logs still received so long as time_between_logs
            // doesn't pass
            timeout_signal.signal();
            log_sender
                .try_send(Ok(LogsDataBuilder::new(BuilderArgs {
                    moniker: "test_root".into(),
                    timestamp_nanos: 0i64.into(),
                    component_url: "test-root-url".to_string().into(),
                    severity: Severity::Info,
                })
                .set_message("third log")
                .build()))
                .expect("send log");
            assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());

            executor.set_fake_time(
                executor.now() + TIMEOUT_BETWEEN_LOGS - zx::Duration::from_seconds(1),
            );
            assert!(!executor.wake_expired_timers());
            assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());

            // timeout period hasn't elapsed, so logs still accepted
            log_sender
                .try_send(Ok(LogsDataBuilder::new(BuilderArgs {
                    moniker: "test_root".into(),
                    timestamp_nanos: 0i64.into(),
                    component_url: "test-root-url".to_string().into(),
                    severity: Severity::Info,
                })
                .set_message("fourth log")
                .build()))
                .expect("send log");
            assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());

            // after timeout period elapses, stop collecting logs even if stream is still open
            executor.set_fake_time(
                executor.now() + TIMEOUT_BETWEEN_LOGS + zx::Duration::from_seconds(1),
            );
            assert!(executor.wake_expired_timers());
            match executor.run_until_stalled(&mut collect_logs_fut) {
                std::task::Poll::Ready(Ok(LogCollectionOutcome::Passed)) => (),
                _ => panic!("Expected future to complete successfully"),
            }
            drop(collect_logs_fut);

            let displayed_logs = vec![
                LogsDataBuilder::new(BuilderArgs {
                    moniker: "<root>".into(),
                    timestamp_nanos: 0i64.into(),
                    component_url: "test-root-url".to_string().into(),
                    severity: Severity::Info,
                })
                .set_message("first log")
                .build(),
                LogsDataBuilder::new(BuilderArgs {
                    moniker: "<root>".into(),
                    timestamp_nanos: 0i64.into(),
                    component_url: "test-root-url".to_string().into(),
                    severity: Severity::Info,
                })
                .set_message("second log")
                .build(),
                LogsDataBuilder::new(BuilderArgs {
                    moniker: "<root>".into(),
                    timestamp_nanos: 0i64.into(),
                    component_url: "test-root-url".to_string().into(),
                    severity: Severity::Info,
                })
                .set_message("third log")
                .build(),
                LogsDataBuilder::new(BuilderArgs {
                    moniker: "<root>".into(),
                    timestamp_nanos: 0i64.into(),
                    component_url: "test-root-url".to_string().into(),
                    severity: Severity::Info,
                })
                .set_message("fourth log")
                .build(),
            ];

            assert_eq!(
                String::from_utf8(log_artifact).unwrap(),
                displayed_logs.iter().map(|log| format!("{}\n", log)).collect::<Vec<_>>().concat()
            );
        }

        #[fuchsia::test]
        fn timeout_not_triggered_until_timeout_signal_given() {
            const TIMEOUT_BETWEEN_LOGS: zx::Duration = zx::Duration::from_seconds(5);
            let mut executor = fasync::TestExecutor::new_with_fake_time().expect("create executor");

            let (mut log_sender, log_recv) = mpsc::channel(5);
            let timeout_signal = async_utils::event::Event::new();

            let mut log_artifact = vec![];
            let mut collect_logs_fut = collect_logs(
                log_recv,
                &mut log_artifact,
                LogCollectionOptions { min_severity: None, max_severity: Severity::Warn.into() },
                LogTimeoutOptions {
                    timeout_fut: timeout_signal.wait(),
                    time_between_logs: Duration::from_secs(
                        TIMEOUT_BETWEEN_LOGS.into_seconds() as u64
                    ),
                },
            )
            .boxed();

            // send first log
            log_sender
                .try_send(Ok(LogsDataBuilder::new(BuilderArgs {
                    moniker: "test_root".into(),
                    timestamp_nanos: 0i64.into(),
                    component_url: "test-root-url".to_string().into(),
                    severity: Severity::Info,
                })
                .set_message("first log")
                .build()))
                .expect("send log");
            assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());

            // advancing time past the time between logs shouldn't timeout logs if the
            // timeout signal isn't triggered yet
            executor.set_fake_time(
                executor.now() + TIMEOUT_BETWEEN_LOGS + zx::Duration::from_seconds(1),
            );
            assert!(!executor.wake_expired_timers());
            assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());

            // when timeout is triggered, logs should be polled for an additional
            // TIMEOUT_BETWEEN_LOGS
            timeout_signal.signal();
            assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());
            assert!(!executor.wake_expired_timers());

            // After an additional TIMEOUT_BETWEEN_LOGS elapses logs should stop
            executor.set_fake_time(
                executor.now() + TIMEOUT_BETWEEN_LOGS + zx::Duration::from_seconds(1),
            );
            assert!(executor.wake_expired_timers());
            match executor.run_until_stalled(&mut collect_logs_fut) {
                std::task::Poll::Ready(Ok(LogCollectionOutcome::Passed)) => (),
                _ => panic!("Expected future to complete successfully"),
            }
            drop(collect_logs_fut);

            let displayed_logs = vec![LogsDataBuilder::new(BuilderArgs {
                moniker: "<root>".into(),
                timestamp_nanos: 0i64.into(),
                component_url: "test-root-url".to_string().into(),
                severity: Severity::Info,
            })
            .set_message("first log")
            .build()];

            assert_eq!(
                String::from_utf8(log_artifact).unwrap(),
                displayed_logs.iter().map(|log| format!("{}\n", log)).collect::<Vec<_>>().concat()
            );
        }
    }
}
