blob: 25971024be71ffa1317b9f1dab9f5016c8838e31 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
artifacts,
cancel::NamedFutureExt,
diagnostics::{self, LogCollectionOutcome},
outcome::RunTestSuiteError,
outcome::UnexpectedEventError,
output::{
ArtifactType, DirectoryArtifactType, DynDirectoryArtifact, DynReporter, EntityReporter,
},
stream_util::StreamUtil,
},
anyhow::{anyhow, Context as _},
fidl::Peered,
fidl_fuchsia_io as fio, fidl_fuchsia_test_manager as ftest_manager, fuchsia_async as fasync,
futures::{
future::{join_all, BoxFuture, FutureExt, TryFutureExt},
stream::{FuturesUnordered, StreamExt, TryStreamExt},
},
std::{borrow::Borrow, collections::VecDeque, io::Write, path::PathBuf},
tracing::{debug, warn},
};
/// Given an |artifact| reported over fuchsia.test.manager, create the appropriate artifact in the
/// reporter. Returns a Future, which when polled to completion, drains the results from |artifact|
/// and saves them to the reporter.
///
/// This method is an async method returning a Future so that the lifetime of |reporter| is not
/// tied to the lifetime of the Future.
/// The returned Future resolves to LogCollectionOutcome when logs are processed.
pub(crate) async fn drain_artifact<'a, E, T>(
reporter: &'a EntityReporter<E, T>,
artifact: ftest_manager::Artifact,
log_opts: diagnostics::LogCollectionOptions,
) -> Result<
BoxFuture<'static, Result<Option<LogCollectionOutcome>, anyhow::Error>>,
RunTestSuiteError,
>
where
T: Borrow<DynReporter>,
{
match artifact {
ftest_manager::Artifact::Stdout(socket) => {
let stdout = reporter.new_artifact(&ArtifactType::Stdout)?;
Ok(copy_socket_artifact(socket, stdout).map_ok(|_| None).named("stdout").boxed())
}
ftest_manager::Artifact::Stderr(socket) => {
let stderr = reporter.new_artifact(&ArtifactType::Stderr)?;
Ok(copy_socket_artifact(socket, stderr).map_ok(|_| None).named("stderr").boxed())
}
ftest_manager::Artifact::Log(syslog) => {
let syslog_artifact = reporter.new_artifact(&ArtifactType::Syslog)?;
Ok(diagnostics::collect_logs(
test_diagnostics::LogStream::from_syslog(syslog)?,
syslog_artifact,
log_opts,
)
.map_ok(Some)
.named("syslog")
.boxed())
}
ftest_manager::Artifact::Custom(ftest_manager::CustomArtifact {
directory_and_token,
component_moniker,
..
}) => {
let ftest_manager::DirectoryAndToken { directory, token, .. } = directory_and_token
.ok_or(UnexpectedEventError::MissingRequiredField {
containing_struct: "CustomArtifact",
field: "directory_and_token",
})?;
let directory_artifact = reporter
.new_directory_artifact(&DirectoryArtifactType::Custom, component_moniker)?;
Ok(async move {
let directory = directory.into_proxy()?;
let result =
artifacts::copy_custom_artifact_directory(directory, directory_artifact).await;
// TODO(https://fxbug.dev/42165719): Remove this signal once Overnet
// supports automatically signalling EVENTPAIR_CLOSED when the
// handle is closed.
let _ = token.signal_peer(fidl::Signals::empty(), fidl::Signals::USER_0);
result
}
.map_ok(|()| None)
.named("custom_artifacts")
.boxed())
}
ftest_manager::Artifact::DebugData(iterator) => {
let output_directory = reporter
.new_directory_artifact(&DirectoryArtifactType::Debug, None /* moniker */)?;
Ok(artifacts::copy_debug_data(iterator.into_proxy()?, output_directory)
.map(|()| Ok(None))
.named("debug_data")
.boxed())
}
ftest_manager::ArtifactUnknown!() => {
warn!("Encountered an unknown artifact");
Ok(futures::future::ready(Ok(None)).boxed())
}
}
}
/// Streams the bytes readable from `socket` into `artifact`.
///
/// Each chunk handed to the read callback is appended to `artifact`. When the callback is
/// invoked without a chunk (presumably the end of the stream; see
/// `test_diagnostics::SocketReadFut`), `artifact` is flushed and the total number of bytes
/// written is returned.
///
/// # Errors
///
/// Propagates any failure reported while reading the socket, writing to `artifact`, or flushing
/// it.
async fn copy_socket_artifact<W: Write>(
    socket: fidl::Socket,
    mut artifact: W,
) -> Result<usize, anyhow::Error> {
    let mut reader = fidl::AsyncSocket::from_socket(socket);
    let mut total = 0;
    loop {
        // The callback reports `true` once there is nothing more to read.
        let read_fut = test_diagnostics::SocketReadFut::new(&mut reader, |chunk| {
            if let Some(bytes) = chunk {
                total += bytes.len();
                artifact.write_all(bytes)?;
                Ok(false)
            } else {
                Ok(true)
            }
        });
        let finished = read_fut.await?;
        if !finished {
            continue;
        }
        artifact.flush()?;
        return Ok(total);
    }
}
/// Copy debug data reported over a debug data iterator to an output directory.
///
/// `GetNext` is called on `iterator` repeatedly, with up to `PIPELINED_REQUESTS` calls in
/// flight, until a call returns an empty batch or fails. For every `DebugData` entry received, a
/// file named after the entry is created in `output_directory` and a task is spawned that copies
/// the entry's socket into that file. The function returns once every spawned task has finished.
///
/// Copying is best effort and this function never fails. Request failures and per-entry
/// failures (missing name, missing socket, file creation or copy errors) are logged as warnings.
pub async fn copy_debug_data(
    iterator: ftest_manager::DebugDataIteratorProxy,
    output_directory: Box<DynDirectoryArtifact>,
) {
    const PIPELINED_REQUESTS: usize = 4;
    let unprocessed_data_stream =
        futures::stream::repeat_with(move || iterator.get_next()).buffered(PIPELINED_REQUESTS);
    // Stop pulling batches at the first empty batch or the first request error.
    let terminated_event_stream =
        unprocessed_data_stream.take_until_stop_after(|result| match &result {
            Ok(events) => events.is_empty(),
            _ => true,
        });
    let data_futs = terminated_event_stream
        .map(|result| match result {
            Ok(vals) => vals,
            Err(e) => {
                warn!("Request failure: {:?}", e);
                vec![]
            }
        })
        .map(futures::stream::iter)
        .flatten()
        .map(|debug_data| {
            // The output file is created here, outside the task, because `output_directory` is
            // only borrowed by this closure and cannot be moved into the spawned task.
            let output =
                debug_data.name.as_ref().ok_or_else(|| anyhow!("Missing profile name")).and_then(
                    |name| {
                        output_directory.new_file(&PathBuf::from(name)).map_err(anyhow::Error::from)
                    },
                );
            fasync::Task::spawn(async move {
                // Reference the whole struct so the async block captures `debug_data` itself
                // rather than its individual fields.
                let _ = &debug_data;
                let mut output = output?;
                let socket =
                    debug_data.socket.ok_or_else(|| anyhow!("Missing profile socket handle"))?;
                debug!("Reading run profile \"{:?}\"", debug_data.name);
                let start = std::time::Instant::now();
                let len = copy_socket_artifact(socket, &mut output).await?;
                debug!("Copied file {:?}: {} bytes in {:?}", debug_data.name, len, start.elapsed());
                Ok::<(), anyhow::Error>(())
            })
        })
        .collect::<Vec<_>>()
        .await;
    // Surface per-entry failures instead of silently discarding the task results.
    for result in join_all(data_futs).await {
        if let Err(e) = result {
            warn!("Debug data failure: {:?}", e);
        }
    }
    debug!("All profiles downloaded");
}
/// Copy a directory into a directory artifact.
///
/// Recursively enumerates `directory` and copies every entry of kind `File` into a file with
/// the same relative path created in `out_dir`. The per-file copies are driven together through
/// a `FuturesUnordered`.
///
/// Copying is best effort: a failure while enumerating the directory, or while opening, creating
/// or copying an individual file, is logged as a warning and does not fail the operation. As a
/// result this currently always resolves to `Ok(())`.
async fn copy_custom_artifact_directory(
    directory: fio::DirectoryProxy,
    out_dir: Box<DynDirectoryArtifact>,
) -> Result<(), anyhow::Error> {
    let mut paths = vec![];
    let mut enumerate = fuchsia_fs::directory::readdir_recursive(&directory, None);
    loop {
        match enumerate.try_next().await {
            Ok(Some(file)) => {
                if file.kind == fuchsia_fs::directory::DirentKind::File {
                    paths.push(file.name);
                }
            }
            Ok(None) => break,
            Err(e) => {
                // Still copy whatever was enumerated so far, but make the truncated listing
                // visible instead of stopping silently.
                warn!("Custom artifact directory enumeration failure: {}", e);
                break;
            }
        }
    }

    let futs = FuturesUnordered::new();
    for path in paths.iter() {
        // Issue the open request and create the output file up front; their results are only
        // inspected inside the future so one bad file does not affect the others.
        let file = fuchsia_fs::directory::open_file_no_describe(
            &directory,
            path,
            fuchsia_fs::OpenFlags::RIGHT_READABLE,
        );
        let output_file = out_dir.new_file(std::path::Path::new(path));
        futs.push(async move {
            let file = file.with_context(|| format!("with path {:?}", path))?;
            let mut output_file = output_file?;
            copy_file_to_writer(&file, &mut output_file).await.map(|_| ())
        });
    }

    futs.for_each(|result| {
        if let Err(e) = result {
            warn!("Custom artifact failure: {}", e);
        }
        async move {}
    })
    .await;
    Ok(())
}
/// Copies the contents of `file` into `output` and returns the number of bytes copied.
///
/// `PIPELINED_READ_COUNT` read requests of `READ_SIZE` bytes are kept outstanding at all times:
/// each time the oldest request completes with data, a new one is issued. Copying stops at the
/// first read that returns no bytes. `output` is flushed before returning successfully, as
/// `copy_socket_artifact` does, so that a buffered writer reports its final write error here
/// rather than dropping it.
///
/// # Errors
///
/// Fails if a read request fails at the FIDL layer, if the file reports an error status, or if
/// writing to or flushing `output` fails.
async fn copy_file_to_writer<T: Write>(
    file: &fio::FileProxy,
    output: &mut T,
) -> Result<usize, anyhow::Error> {
    const READ_SIZE: u64 = fio::MAX_BUF;
    // Arbitrary number of reads to pipeline.
    const PIPELINED_READ_COUNT: u64 = 4;

    let mut vector = VecDeque::new();
    for _n in 0..PIPELINED_READ_COUNT {
        vector.push_back(file.read(READ_SIZE));
    }
    let mut len = 0;
    loop {
        // The queue starts with PIPELINED_READ_COUNT entries and every iteration that pops one
        // either pushes a replacement or leaves the loop, so it is never empty here.
        let buf = vector
            .pop_front()
            .expect("pipelined read queue is never empty")
            .await?
            .map_err(fuchsia_zircon_status::Status::from_raw)?;
        if buf.is_empty() {
            break;
        }
        len += buf.len();
        output.write_all(&buf)?;
        vector.push_back(file.read(READ_SIZE));
    }
    output.flush()?;
    Ok(len)
}
#[cfg(test)]
mod socket_tests {
    use {super::*, futures::AsyncWriteExt};

    /// Checks that `copy_socket_artifact` reproduces exactly the bytes written to the other end
    /// of the socket, for an empty payload, a short payload and a 4096-byte payload.
    #[fuchsia::test]
    async fn copy_socket() {
        let payloads: Vec<Vec<u8>> = vec![vec![], b"0123456789abcde".to_vec(), vec![0u8; 4096]];
        for payload in &payloads {
            let (reader_end, writer_end) = fidl::Socket::create_stream();
            let mut copied = Vec::new();
            // The writer end is owned by this future and dropped when it completes.
            let producer = async move {
                let mut writer = fidl::AsyncSocket::from_socket(writer_end);
                writer.write_all(payload.as_slice()).await.expect("write bytes");
            };
            let consumer = copy_socket_artifact(reader_end, &mut copied);
            let ((), outcome) = futures::future::join(producer, consumer).await;
            outcome.expect("copy contents");
            assert_eq!(copied.as_slice(), payload.as_slice());
        }
    }
}
// These tests use vfs, which is only available on Fuchsia.
//
// They cover `copy_custom_artifact_directory` (served from an in-process pseudo directory) and
// `copy_debug_data` (served from a fake DebugDataIterator), both writing into an
// `InMemoryDirectoryWriter` whose contents are then compared against the expected files.
#[cfg(target_os = "fuchsia")]
#[cfg(test)]
mod file_tests {
use {
super::*,
crate::output::InMemoryDirectoryWriter,
fidl::endpoints::ServerEnd,
fidl_fuchsia_io as fio, fuchsia_async as fasync,
futures::prelude::*,
maplit::hashmap,
std::{collections::HashMap, sync::Arc},
vfs::{
directory::{entry_container::Directory, helper::DirectlyMutable, immutable::Simple},
execution_scope::ExecutionScope,
file::vmo::read_only,
pseudo_directory,
},
};
/// Writes all of `content` to `socket`, panicking if the write fails. The socket is owned by
/// this function and dropped when it returns.
async fn serve_content_over_socket(content: Vec<u8>, socket: fuchsia_zircon::Socket) {
let mut socket = fidl::AsyncSocket::from_socket(socket);
socket.write_all(content.as_slice()).await.expect("Cannot serve content over socket");
}
/// Serves every entry of `expected_files` as a `DebugData` item through a fake
/// DebugDataIterator and runs `copy_debug_data` against it, writing into `directory_writer`.
async fn serve_and_copy_debug_data(
expected_files: &HashMap<PathBuf, Vec<u8>>,
directory_writer: InMemoryDirectoryWriter,
) {
let mut served_files = vec![];
// One socket pair per file: a detached task writes the content into one end, and the other
// end is handed out in the DebugData entry named after the file's path.
expected_files.iter().for_each(|(path, content)| {
let (client, server) = fuchsia_zircon::Socket::create_stream();
fasync::Task::spawn(serve_content_over_socket(content.clone(), server)).detach();
served_files.push(ftest_manager::DebugData {
name: Some(path.display().to_string()),
socket: Some(client.into()),
..Default::default()
});
});
let (iterator_proxy, mut iterator_stream) =
fidl::endpoints::create_proxy_and_stream::<ftest_manager::DebugDataIteratorMarker>()
.unwrap();
// Answer each GetNext request with up to three of the remaining entries; once the entries
// are exhausted the response is an empty vector. Send failures are ignored.
let serve_fut = async move {
let mut files_iter = served_files.into_iter();
while let Ok(Some(request)) = iterator_stream.try_next().await {
let ftest_manager::DebugDataIteratorRequest::GetNext { responder } = request;
let resp: Vec<_> = files_iter.by_ref().take(3).collect();
let _ = responder.send(resp);
}
};
// Run the fake iterator server and the code under test side by side until both finish.
futures::future::join(
serve_fut,
copy_debug_data(iterator_proxy, Box::new(directory_writer)),
)
.await;
}
/// Opens `fake_dir` on a fresh directory channel and runs `copy_custom_artifact_directory`
/// against it, writing into `directory_writer`. Panics if the copy reports an error.
async fn serve_and_copy_directory(
fake_dir: Arc<Simple>,
directory_writer: InMemoryDirectoryWriter,
) {
let (directory_client, directory_service) =
fidl::endpoints::create_proxy::<fio::DirectoryMarker>().unwrap();
let scope = ExecutionScope::new();
fake_dir.open(
scope,
fio::OpenFlags::RIGHT_READABLE
| fio::OpenFlags::RIGHT_WRITABLE
| fio::OpenFlags::DIRECTORY,
vfs::path::Path::dot(),
ServerEnd::new(directory_service.into_channel()),
);
copy_custom_artifact_directory(directory_client, Box::new(directory_writer))
.await
.expect("reading custom directory");
}
/// Returns the shared test cases as `(case name, pseudo directory to serve, expected files)`,
/// where the expected files map each relative path to its contents.
fn test_cases() -> Vec<(&'static str, Arc<Simple>, HashMap<PathBuf, Vec<u8>>)> {
vec![
("empty", pseudo_directory! {}, hashmap! {}),
(
"single file",
pseudo_directory! {
"test_file.txt" => read_only("Hello, World!"),
},
hashmap! {
"test_file.txt".to_string().into() => b"Hello, World!".to_vec()
},
),
(
"subdir",
pseudo_directory! {
"sub" => pseudo_directory! {
"nested.txt" => read_only("Nested file!"),
}
},
hashmap! {
"sub/nested.txt".to_string().into() => b"Nested file!".to_vec()
},
),
(
"empty file",
pseudo_directory! {
"empty.txt" => read_only(""),
},
hashmap! {
"empty.txt".to_string().into() => b"".to_vec()
},
),
(
"big file",
pseudo_directory! {
"big.txt" => read_only(vec![b's'; (fio::MAX_BUF as usize)*2]),
},
hashmap! {
"big.txt".to_string().into() => vec![b's'; (fio::MAX_BUF as usize) *2 as usize]
},
),
(
"100 files",
{
let dir = pseudo_directory! {};
for i in 0..100 {
dir.add_entry(
format!("{:?}.txt", i),
read_only(format!("contents for {:?}", i)),
)
.expect("add file");
}
dir
},
(0..100)
.map(|i| {
(
format!("{:?}.txt", i).into(),
format!("contents for {:?}", i).into_bytes(),
)
})
.collect(),
),
]
}
/// For every test case, copies the served pseudo directory into an in-memory writer and checks
/// that the files written match the expected files exactly.
#[fuchsia::test]
async fn test_copy_dir() {
for (name, fake_dir, expected_files) in test_cases() {
let artifact = InMemoryDirectoryWriter::default();
serve_and_copy_directory(fake_dir, artifact.clone()).await;
let actual_files: HashMap<_, _> = artifact
.files
.lock()
.iter()
.map(|(path, artifact)| (path.clone(), artifact.get_contents()))
.collect();
assert_eq!(expected_files, actual_files, "{}", name);
}
}
/// For every test case, serves the expected files as debug data (the pseudo directory of the
/// case is unused) and checks that the files written match the expected files exactly.
#[fuchsia::test]
async fn test_copy_debug_data() {
for (name, _fake_dir, expected_files) in test_cases() {
let artifact = InMemoryDirectoryWriter::default();
serve_and_copy_debug_data(&expected_files, artifact.clone()).await;
let actual_files: HashMap<_, _> = artifact
.files
.lock()
.iter()
.map(|(path, artifact)| (path.clone(), artifact.get_contents()))
.collect();
assert_eq!(expected_files, actual_files, "{}", name);
}
}
}