// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::{
    elementary_stream::*, output_validator::*, stream::*, stream_runner::*, FatalError, Result,
};
use anyhow::Context as _;
use fidl_fuchsia_media::StreamProcessorProxy;
use futures::{future::BoxFuture, stream::FuturesUnordered, TryStreamExt};
use std::rc::Rc;

const FIRST_FORMAT_DETAILS_VERSION_ORDINAL: u64 = 1;

pub trait StreamProcessorFactory {
    fn connect_to_stream_processor(
        &self,
        stream: &dyn ElementaryStream,
        format_details_version_ordinal: u64,
    ) -> BoxFuture<'_, Result<StreamProcessorProxy>>;
}

/// A test spec describes all the cases that will run and the circumstances in which
/// they will run.
pub struct TestSpec {
    pub cases: Vec<TestCase>,
    pub relation: CaseRelation,
    pub stream_processor_factory: Rc<dyn StreamProcessorFactory>,
}

/// A case relation describes the temporal relationship between two test cases.
pub enum CaseRelation {
    /// With serial relation, test cases will be run in sequence using the same codec server.
    Serial,
    /// With concurrent relation, test cases will run concurrently using two or more codec servers.
    Concurrent,
}

/// A test cases describes a sequence of elementary stream chunks that should be fed into a codec
/// server, and a set of validators to check the output. To pass, all validations must pass for all
/// output from the stream.
pub struct TestCase {
    pub name: &'static str,
    pub stream: Rc<dyn ElementaryStream>,
    pub validators: Vec<Rc<dyn OutputValidator>>,
    pub stream_options: Option<StreamOptions>,
}

impl TestSpec {
    pub async fn run(self) -> Result<()> {
        match self.relation {
            CaseRelation::Serial => {
                run_cases_serially(self.stream_processor_factory.as_ref(), self.cases).await
            }
            CaseRelation::Concurrent => {
                run_cases_concurrently(self.stream_processor_factory.as_ref(), self.cases).await
            }
        }
    }
}

async fn run_cases_serially(
    stream_processor_factory: &dyn StreamProcessorFactory,
    cases: Vec<TestCase>,
) -> Result<()> {
    let stream_processor =
        if let Some(stream) = cases.first().as_ref().map(|case| case.stream.as_ref()) {
            stream_processor_factory
                .connect_to_stream_processor(stream, FIRST_FORMAT_DETAILS_VERSION_ORDINAL)
                .await?
        } else {
            return Err(FatalError(String::from("No test cases provided.")).into());
        };
    let mut stream_runner = StreamRunner::new(stream_processor);

    for case in cases {
        let output = stream_runner
            .run_stream(case.stream, case.stream_options.unwrap_or_default())
            .await
            .context(format!("Running case {}", case.name))?;
        for validator in case.validators {
            validator.validate(&output).await.context(format!("Validating case {}", case.name))?;
        }
    }

    Ok(())
}

async fn run_cases_concurrently(
    stream_processor_factory: &dyn StreamProcessorFactory,
    cases: Vec<TestCase>,
) -> Result<()> {
    let mut unordered = FuturesUnordered::new();
    for case in cases {
        unordered.push(run_cases_serially(stream_processor_factory, vec![case]))
    }

    while let Some(()) = unordered.try_next().await? {}

    Ok(())
}

pub fn with_large_stack(f: fn() -> Result<()>) -> Result<()> {
    // The TestSpec futures are too big to fit on Fuchsia's default stack.
    // We need this when running the test in panic=abort mode (in unwind mode,
    // the test harness creates a thread with a 2MB stack for us).
    const STACK_SIZE: usize = 2 * 1024 * 1024;
    std::thread::Builder::new().stack_size(STACK_SIZE).spawn(f).unwrap().join().unwrap()
}
