blob: dc9e0d9d7b53e82e5751b2da0d7229ad0340fc5c [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::message::VmoMessage;
use anyhow::Error;
use fidl::endpoints::{ClientEnd, Proxy};
use fidl_fuchsia_io as fio;
use fidl_fuchsia_test_debug as ftest_debug;
use futures::stream::{Stream, StreamExt};
/// Max VMOs to send at once. This is limited primarily by the max handles in a channel
/// write, although we may encounter the max byte limit too.
/// TODO(fxbug.dev/100462): Use tape measure instead
const VMO_CHUNK_SIZE: usize = 32;
/// Processes a stream of |DebugData| VMOs and places the results in |dir_path|.
pub async fn process_debug_data_vmos<S: Stream<Item = VmoMessage> + std::marker::Unpin>(
dir_path: &str,
processor_proxy: ftest_debug::DebugDataProcessorProxy,
event_receiver: S,
) -> Result<(), Error> {
let directory_proxy = io_util::open_directory_in_namespace(
dir_path,
fio::OpenFlags::RIGHT_READABLE | fio::OpenFlags::RIGHT_WRITABLE,
)?;
let client_end = ClientEnd::new(directory_proxy.into_channel().unwrap().into_zx_channel());
processor_proxy.set_directory(client_end)?;
let mut chunked_events = event_receiver
.map(|VmoMessage { test_url, data_sink, vmo }| ftest_debug::DebugVmo {
test_url,
data_sink,
vmo,
})
.ready_chunks(VMO_CHUNK_SIZE);
while let Some(mut items) = chunked_events.next().await {
processor_proxy.add_debug_vmos(&mut items.iter_mut()).await?;
}
processor_proxy.finish().await?;
Ok(())
}
#[cfg(test)]
mod test {
use super::*;
use fidl::endpoints::create_proxy_and_stream;
use fidl_fuchsia_io as fio;
use fuchsia_zircon as zx;
use futures::stream::TryStreamExt;
use tempfile::tempdir;
struct ExpectedVmoMetadata {
test_url: String,
data_sink: String,
}
impl ExpectedVmoMetadata {
#[allow(unused)]
fn new<S: Into<String>, T: Into<String>>(test_url: S, data_sink: T) -> Self {
Self { test_url: test_url.into(), data_sink: data_sink.into() }
}
}
async fn run_test_processor(
mut stream: ftest_debug::DebugDataProcessorRequestStream,
expected_vmos: Vec<ExpectedVmoMetadata>,
) {
let req = stream.try_next().await.expect("get first request").unwrap();
let dir = match req {
ftest_debug::DebugDataProcessorRequest::SetDirectory { directory, .. } => {
directory.into_proxy().expect("convert to proxy")
}
other => panic!("First request should be SetDirectory but got {:?}", other),
};
// check dir is writable
let file = io_util::open_file(
&dir,
"file".as_ref(),
fio::OpenFlags::RIGHT_WRITABLE | fio::OpenFlags::CREATE,
)
.expect("create file");
io_util::write_file(&file, "file content").await.expect("write to file");
drop(file);
let mut collected_vmos = vec![];
let mut finish_called = false;
while let Some(req) = stream.try_next().await.expect("get request") {
match req {
ftest_debug::DebugDataProcessorRequest::SetDirectory { .. } => {
panic!("Set directory called twice")
}
ftest_debug::DebugDataProcessorRequest::AddDebugVmos {
mut vmos,
responder,
..
} => {
collected_vmos.append(&mut vmos);
responder.send().unwrap();
}
ftest_debug::DebugDataProcessorRequest::Finish { responder, .. } => {
// check dir is still writable (resources have not been torn down prematurely).
let file = io_util::open_file(
&dir,
"file_2".as_ref(),
fio::OpenFlags::RIGHT_WRITABLE | fio::OpenFlags::CREATE,
)
.expect("create file");
io_util::write_file(&file, "file content").await.expect("write to file");
drop(file);
finish_called = true;
responder.send().unwrap();
break;
}
}
}
assert!(finish_called);
let expected_and_actual_iter = expected_vmos.into_iter().zip(collected_vmos.into_iter());
for (item_no, (expected, actual)) in expected_and_actual_iter.enumerate() {
assert_eq!(
expected.test_url, actual.test_url,
"Test URL mismatch in VMO #{:?}",
item_no
);
assert_eq!(
expected.data_sink, actual.data_sink,
"Data sink mismatch in VMO #{:?}",
item_no
);
}
}
#[fuchsia::test]
async fn process_empty_stream() {
let dir = tempdir().unwrap();
let (proxy, stream) =
create_proxy_and_stream::<ftest_debug::DebugDataProcessorMarker>().unwrap();
let process_fut =
process_debug_data_vmos(dir.path().to_str().unwrap(), proxy, futures::stream::iter([]));
let test_processor_fut = run_test_processor(stream, vec![]);
let (result, ()) = futures::future::join(process_fut, test_processor_fut).await;
result.expect("processor failed");
}
#[fuchsia::test]
async fn process_single_item_stream() {
let dir = tempdir().unwrap();
let (proxy, stream) =
create_proxy_and_stream::<ftest_debug::DebugDataProcessorMarker>().unwrap();
let process_fut = process_debug_data_vmos(
dir.path().to_str().unwrap(),
proxy,
futures::stream::iter([VmoMessage {
test_url: "test_url".to_string(),
data_sink: "data_sink".to_string(),
vmo: zx::Vmo::create(1024).unwrap(),
}]),
);
let test_processor_fut =
run_test_processor(stream, vec![ExpectedVmoMetadata::new("test_url", "data_sink")]);
let (result, ()) = futures::future::join(process_fut, test_processor_fut).await;
result.expect("processor failed");
}
#[fuchsia::test]
async fn process_multiple_item_stream() {
let dir = tempdir().unwrap();
let (proxy, stream) =
create_proxy_and_stream::<ftest_debug::DebugDataProcessorMarker>().unwrap();
let vmo_stream = (0..100).map(|idx| VmoMessage {
test_url: format!("test_url_{:?}", idx),
data_sink: format!("data_sink_{:?}", idx),
vmo: zx::Vmo::create(1024).unwrap(),
});
let expected_vmos = (0..100)
.map(|idx| {
ExpectedVmoMetadata::new(
format!("test_url_{:?}", idx),
format!("data_sink_{:?}", idx),
)
})
.collect();
let process_fut = process_debug_data_vmos(
dir.path().to_str().unwrap(),
proxy,
futures::stream::iter(vmo_stream),
);
let test_processor_fut = run_test_processor(stream, expected_vmos);
let (result, ()) = futures::future::join(process_fut, test_processor_fut).await;
result.expect("processor failed");
}
}