// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::message::VmoMessage;
use anyhow::Error;
use fidl::endpoints::{ClientEnd, Proxy};
use fidl_fuchsia_io as fio;
use fidl_fuchsia_test_debug as ftest_debug;
use futures::stream::{Stream, StreamExt};

/// Max VMOs to send at once. This is limited primarily by the max handles in a channel
/// write, although we may encounter the max byte limit too.
/// TODO(fxbug.dev/100462): Use tape measure instead
const VMO_CHUNK_SIZE: usize = 32;

/// Processes a stream of |DebugData| VMOs and places the results in |dir_path|.
pub async fn process_debug_data_vmos<S: Stream<Item = VmoMessage> + std::marker::Unpin>(
    dir_path: &str,
    processor_proxy: ftest_debug::DebugDataProcessorProxy,
    event_receiver: S,
) -> Result<(), Error> {
    let directory_proxy = io_util::open_directory_in_namespace(
        dir_path,
        fio::OpenFlags::RIGHT_READABLE | fio::OpenFlags::RIGHT_WRITABLE,
    )?;
    let client_end = ClientEnd::new(directory_proxy.into_channel().unwrap().into_zx_channel());
    processor_proxy.set_directory(client_end)?;

    let mut chunked_events = event_receiver
        .map(|VmoMessage { test_url, data_sink, vmo }| ftest_debug::DebugVmo {
            test_url,
            data_sink,
            vmo,
        })
        .ready_chunks(VMO_CHUNK_SIZE);

    while let Some(mut items) = chunked_events.next().await {
        processor_proxy.add_debug_vmos(&mut items.iter_mut()).await?;
    }
    processor_proxy.finish().await?;
    Ok(())
}

#[cfg(test)]
mod test {
    use super::*;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_fuchsia_io as fio;
    use fuchsia_zircon as zx;
    use futures::stream::TryStreamExt;
    use tempfile::tempdir;

    struct ExpectedVmoMetadata {
        test_url: String,
        data_sink: String,
    }

    impl ExpectedVmoMetadata {
        #[allow(unused)]
        fn new<S: Into<String>, T: Into<String>>(test_url: S, data_sink: T) -> Self {
            Self { test_url: test_url.into(), data_sink: data_sink.into() }
        }
    }

    async fn run_test_processor(
        mut stream: ftest_debug::DebugDataProcessorRequestStream,
        expected_vmos: Vec<ExpectedVmoMetadata>,
    ) {
        let req = stream.try_next().await.expect("get first request").unwrap();
        let dir = match req {
            ftest_debug::DebugDataProcessorRequest::SetDirectory { directory, .. } => {
                directory.into_proxy().expect("convert to proxy")
            }
            other => panic!("First request should be SetDirectory but got {:?}", other),
        };

        // check dir is writable
        let file = io_util::open_file(
            &dir,
            "file".as_ref(),
            fio::OpenFlags::RIGHT_WRITABLE | fio::OpenFlags::CREATE,
        )
        .expect("create file");
        io_util::write_file(&file, "file content").await.expect("write to file");
        drop(file);

        let mut collected_vmos = vec![];
        let mut finish_called = false;
        while let Some(req) = stream.try_next().await.expect("get request") {
            match req {
                ftest_debug::DebugDataProcessorRequest::SetDirectory { .. } => {
                    panic!("Set directory called twice")
                }
                ftest_debug::DebugDataProcessorRequest::AddDebugVmos {
                    mut vmos,
                    responder,
                    ..
                } => {
                    collected_vmos.append(&mut vmos);
                    responder.send().unwrap();
                }
                ftest_debug::DebugDataProcessorRequest::Finish { responder, .. } => {
                    // check dir is still writable (resources have not been torn down prematurely).
                    let file = io_util::open_file(
                        &dir,
                        "file_2".as_ref(),
                        fio::OpenFlags::RIGHT_WRITABLE | fio::OpenFlags::CREATE,
                    )
                    .expect("create file");
                    io_util::write_file(&file, "file content").await.expect("write to file");
                    drop(file);

                    finish_called = true;
                    responder.send().unwrap();
                    break;
                }
            }
        }

        assert!(finish_called);
        let expected_and_actual_iter = expected_vmos.into_iter().zip(collected_vmos.into_iter());
        for (item_no, (expected, actual)) in expected_and_actual_iter.enumerate() {
            assert_eq!(
                expected.test_url, actual.test_url,
                "Test URL mismatch in VMO #{:?}",
                item_no
            );
            assert_eq!(
                expected.data_sink, actual.data_sink,
                "Data sink mismatch in VMO #{:?}",
                item_no
            );
        }
    }

    #[fuchsia::test]
    async fn process_empty_stream() {
        let dir = tempdir().unwrap();
        let (proxy, stream) =
            create_proxy_and_stream::<ftest_debug::DebugDataProcessorMarker>().unwrap();
        let process_fut =
            process_debug_data_vmos(dir.path().to_str().unwrap(), proxy, futures::stream::iter([]));
        let test_processor_fut = run_test_processor(stream, vec![]);
        let (result, ()) = futures::future::join(process_fut, test_processor_fut).await;
        result.expect("processor failed");
    }

    #[fuchsia::test]
    async fn process_single_item_stream() {
        let dir = tempdir().unwrap();
        let (proxy, stream) =
            create_proxy_and_stream::<ftest_debug::DebugDataProcessorMarker>().unwrap();
        let process_fut = process_debug_data_vmos(
            dir.path().to_str().unwrap(),
            proxy,
            futures::stream::iter([VmoMessage {
                test_url: "test_url".to_string(),
                data_sink: "data_sink".to_string(),
                vmo: zx::Vmo::create(1024).unwrap(),
            }]),
        );
        let test_processor_fut =
            run_test_processor(stream, vec![ExpectedVmoMetadata::new("test_url", "data_sink")]);
        let (result, ()) = futures::future::join(process_fut, test_processor_fut).await;
        result.expect("processor failed");
    }

    #[fuchsia::test]
    async fn process_multiple_item_stream() {
        let dir = tempdir().unwrap();
        let (proxy, stream) =
            create_proxy_and_stream::<ftest_debug::DebugDataProcessorMarker>().unwrap();

        let vmo_stream = (0..100).map(|idx| VmoMessage {
            test_url: format!("test_url_{:?}", idx),
            data_sink: format!("data_sink_{:?}", idx),
            vmo: zx::Vmo::create(1024).unwrap(),
        });
        let expected_vmos = (0..100)
            .map(|idx| {
                ExpectedVmoMetadata::new(
                    format!("test_url_{:?}", idx),
                    format!("data_sink_{:?}", idx),
                )
            })
            .collect();

        let process_fut = process_debug_data_vmos(
            dir.path().to_str().unwrap(),
            proxy,
            futures::stream::iter(vmo_stream),
        );
        let test_processor_fut = run_test_processor(stream, expected_vmos);
        let (result, ()) = futures::future::join(process_fut, test_processor_fut).await;
        result.expect("processor failed");
    }
}
