// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
    anyhow::{format_err, Error},
    diagnostics_data::LogsData,
    fidl::endpoints::ClientEnd,
    fidl_fuchsia_developer_remotecontrol::{ArchiveIteratorMarker, ArchiveIteratorProxy},
    fidl_fuchsia_test_manager as ftest_manager, fuchsia_async as fasync,
    futures::{channel::mpsc, stream::BoxStream, SinkExt, Stream, StreamExt},
    pin_project::pin_project,
    serde_json,
    std::{
        pin::Pin,
        task::{Context, Poll},
    },
};
#[cfg(target_os = "fuchsia")]
use crate::diagnostics::fuchsia::BatchLogStream;
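/// Stream of structured log entries for a test run, backed by whichever iterator the
/// given `ftest_manager::Syslog` carries.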
#[pin_project]
pub struct LogStream {
    #[pin]
    stream: BoxStream<'static, Result<LogsData, Error>>,
}
impl LogStream {
    fn new<S>(stream: S) -> Self
    where
        S: Stream<Item = Result<LogsData, Error>> + Send + 'static,
    {
        Self { stream: stream.boxed() }
    }

    pub fn from_syslog(syslog: ftest_manager::Syslog) -> Result<LogStream, fidl::Error> {
        get_log_stream(syslog)
    }
}
impl Stream for LogStream {
    type Item = Result<LogsData, Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        this.stream.poll_next(cx)
    }
}
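/// Builds a `LogStream` from the given syslog. On Fuchsia both the archive and batch
/// iterators are supported.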
#[cfg(target_os = "fuchsia")]
fn get_log_stream(syslog: ftest_manager::Syslog) -> Result<LogStream, fidl::Error> {
    match syslog {
        ftest_manager::Syslog::Archive(client_end) => {
            Ok(LogStream::new(ArchiveLogStream::from_client_end(client_end)?))
        }
        ftest_manager::Syslog::Batch(client_end) => {
            Ok(LogStream::new(BatchLogStream::from_client_end(client_end)?))
        }
        _ => panic!("not supported"),
    }
}
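/// Builds a `LogStream` from the given syslog. On host only the archive iterator is
/// supported; receiving a batch iterator panics.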
#[cfg(not(target_os = "fuchsia"))]
fn get_log_stream(syslog: ftest_manager::Syslog) -> Result<LogStream, fidl::Error> {
    match syslog {
        ftest_manager::Syslog::Archive(client_end) => {
            Ok(LogStream::new(ArchiveLogStream::from_client_end(client_end)?))
        }
        ftest_manager::Syslog::Batch(_) => panic!("batch iterator not supported on host"),
        _ => panic!("not supported"),
    }
}
#[cfg(target_os = "fuchsia")]
mod fuchsia {
    use {
        super::*, diagnostics_data::Logs, diagnostics_reader::Subscription,
        fidl_fuchsia_diagnostics::BatchIteratorMarker,
    };
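
    /// Log stream backed by a `fuchsia.diagnostics` batch iterator subscription.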
    #[pin_project]
    pub struct BatchLogStream {
        #[pin]
        subscription: Subscription<Logs>,
    }

    impl BatchLogStream {
        #[cfg(test)]
        pub fn new() -> Result<(Self, ftest_manager::LogsIterator), fidl::Error> {
            fidl::endpoints::create_proxy::<BatchIteratorMarker>().map(|(proxy, server_end)| {
                let subscription = Subscription::new(proxy);
                (Self { subscription }, ftest_manager::LogsIterator::Batch(server_end))
            })
        }

        pub fn from_client_end(
            client_end: ClientEnd<BatchIteratorMarker>,
        ) -> Result<Self, fidl::Error> {
            Ok(Self { subscription: Subscription::new(client_end.into_proxy()?) })
        }
    }

    impl Stream for BatchLogStream {
        type Item = Result<LogsData, Error>;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let this = self.project();
            match this.subscription.poll_next(cx) {
                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
                Poll::Ready(Some(Ok(value))) => Poll::Ready(Some(Ok(value))),
                Poll::Ready(None) => Poll::Ready(None),
                Poll::Pending => Poll::Pending,
            }
        }
    }
}
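
/// Log stream backed by a remote control `ArchiveIterator`. A background task drains the
/// iterator and forwards decoded entries through an mpsc channel.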
#[pin_project]
struct ArchiveLogStream {
    #[pin]
    receiver: mpsc::Receiver<Result<LogsData, Error>>,
    _drain_task: fasync::Task<()>,
}
impl ArchiveLogStream {
    #[cfg(test)]
    #[cfg(not(target_os = "fuchsia"))]
    fn new() -> Result<(Self, ftest_manager::LogsIterator), fidl::Error> {
        fidl::endpoints::create_proxy::<ArchiveIteratorMarker>().map(|(proxy, server_end)| {
            let (receiver, _drain_task) = Self::start_streaming_logs(proxy);
            (Self { _drain_task, receiver }, ftest_manager::LogsIterator::Archive(server_end))
        })
    }

    pub fn from_client_end(
        client_end: ClientEnd<ArchiveIteratorMarker>,
    ) -> Result<Self, fidl::Error> {
        let (receiver, _drain_task) = Self::start_streaming_logs(client_end.into_proxy()?);
        Ok(Self { _drain_task, receiver })
    }
}
impl ArchiveLogStream {
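    /// Spawns a task that repeatedly calls `GetNext` on the iterator, decodes each entry's
    /// JSON payload into `LogsData`, and forwards the results (or errors) to the returned
    /// receiver.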
    fn start_streaming_logs(
        proxy: ArchiveIteratorProxy,
    ) -> (mpsc::Receiver<Result<LogsData, Error>>, fasync::Task<()>) {
        let (mut sender, receiver) = mpsc::channel(32);
        let task = fasync::Task::spawn(async move {
            loop {
                // Surface transport errors to the consumer and stop draining.
                let result = match proxy.get_next().await {
                    Err(e) => {
                        let _ =
                            sender.send(Err(format_err!("Error calling GetNext: {:?}", e))).await;
                        break;
                    }
                    Ok(batch) => batch,
                };
                // Surface errors returned by the iterator itself.
                let entries = match result {
                    Err(e) => {
                        let _ =
                            sender.send(Err(format_err!("GetNext returned error: {:?}", e))).await;
                        break;
                    }
                    Ok(entries) => entries,
                };
                // An empty batch signals the end of the log stream.
                if entries.is_empty() {
                    break;
                }
                for data_str in entries.into_iter().filter_map(|e| e.data) {
                    let _ = match serde_json::from_str(&data_str) {
                        Ok(data) => sender.send(Ok(data)).await,
                        Err(e) => sender.send(Err(format_err!("Malformed json: {:?}", e))).await,
                    };
                }
            }
        });
        (receiver, task)
    }
}
impl Stream for ArchiveLogStream {
    type Item = Result<LogsData, Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        this.receiver.poll_next(cx)
    }
}
#[cfg(test)]
mod tests {
    use {
        super::*,
        diagnostics_data::{Severity, Timestamp},
        fidl::endpoints::ServerEnd,
        fuchsia_async as fasync,
        futures::TryStreamExt,
    };
    #[cfg(target_os = "fuchsia")]
    mod fuchsia {
        use {
            super::*,
            fidl_fuchsia_diagnostics::{
                BatchIteratorMarker, BatchIteratorRequest, FormattedContent, ReaderError,
            },
            fidl_fuchsia_mem as fmem, fuchsia_zircon as zx,
            futures::StreamExt,
            matches::assert_matches,
        };
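
        /// Creates a `LogStream` wrapping a fresh batch iterator and returns the iterator's
        /// server end for the test to serve.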
        fn create_log_stream() -> Result<(LogStream, ftest_manager::LogsIterator), fidl::Error> {
            let (stream, iterator) = BatchLogStream::new()?;
            Ok((LogStream::new(stream), iterator))
        }

        struct BatchIteratorOpts {
            with_error: bool,
        }
        /// Spawns a dummy batch iterator server for testing that returns 3 logs: "1", "2",
        /// and "3", all with the same severity.
        async fn spawn_batch_iterator_server(
            server_end: ServerEnd<BatchIteratorMarker>,
            opts: BatchIteratorOpts,
        ) {
            let mut request_stream = server_end.into_stream().expect("got stream");
            let mut values = vec![1i64, 2, 3].into_iter();
            while let Some(BatchIteratorRequest::GetNext { responder }) =
                request_stream.try_next().await.expect("get next request")
            {
                match values.next() {
                    None => {
                        responder.send(&mut Ok(vec![])).expect("send empty response");
                    }
                    Some(value) => {
                        if opts.with_error {
                            responder.send(&mut Err(ReaderError::Io)).expect("send error");
                            continue;
                        }
                        let content = get_json_data(value);
                        let size = content.len() as u64;
                        let vmo = zx::Vmo::create(size).expect("create vmo");
                        vmo.write(content.as_bytes(), 0).expect("write vmo");
                        let result = FormattedContent::Json(fmem::Buffer { vmo, size });
                        responder.send(&mut Ok(vec![result])).expect("send response");
                    }
                }
            }
        }
        #[fasync::run_singlethreaded(test)]
        async fn log_stream_returns_logs() {
            let (mut log_stream, iterator) = create_log_stream().expect("got log stream");
            let server_end = match iterator {
                ftest_manager::LogsIterator::Batch(server_end) => server_end,
                _ => panic!("unexpected logs iterator server end"),
            };
            fasync::Task::spawn(spawn_batch_iterator_server(
                server_end,
                BatchIteratorOpts { with_error: false },
            ))
            .detach();

            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("1"));
            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("2"));
            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("3"));
            assert_matches!(log_stream.next().await, None);
        }

        #[fasync::run_singlethreaded(test)]
        async fn log_stream_can_return_errors() {
            let (mut log_stream, iterator) = create_log_stream().expect("got log stream");
            let server_end = match iterator {
                ftest_manager::LogsIterator::Batch(server_end) => server_end,
                _ => panic!("unexpected logs iterator server end"),
            };
            fasync::Task::spawn(spawn_batch_iterator_server(
                server_end,
                BatchIteratorOpts { with_error: true },
            ))
            .detach();

            assert_matches!(log_stream.next().await, Some(Err(_)));
        }
    }
    #[cfg(not(target_os = "fuchsia"))]
    mod host {
        use {
            super::*,
            fidl_fuchsia_developer_remotecontrol::{
                ArchiveIteratorEntry, ArchiveIteratorError, ArchiveIteratorMarker,
                ArchiveIteratorRequest,
            },
            matches::assert_matches,
        };
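
        /// Creates a `LogStream` wrapping a fresh archive iterator and returns the iterator's
        /// server end for the test to serve.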
        fn create_log_stream() -> Result<(LogStream, ftest_manager::LogsIterator), fidl::Error> {
            let (stream, iterator) = ArchiveLogStream::new()?;
            Ok((LogStream::new(stream), iterator))
        }
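
        /// Serves a dummy archive iterator that returns 3 JSON log entries ("1", "2", "3"),
        /// or responds with an error when `with_error` is set.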
        async fn spawn_archive_iterator_server(
            server_end: ServerEnd<ArchiveIteratorMarker>,
            with_error: bool,
        ) {
            let mut request_stream = server_end.into_stream().expect("got stream");
            let mut values = vec![1, 2, 3].into_iter();
            while let Some(ArchiveIteratorRequest::GetNext { responder }) =
                request_stream.try_next().await.expect("get next request")
            {
                match values.next() {
                    None => {
                        responder.send(&mut Ok(vec![])).expect("send empty response");
                    }
                    Some(value) => {
                        if with_error {
                            responder
                                .send(&mut Err(ArchiveIteratorError::DataReadFailed))
                                .expect("send error");
                            continue;
                        }
                        let json_data = get_json_data(value);
                        let result = ArchiveIteratorEntry {
                            data: Some(json_data),
                            ..ArchiveIteratorEntry::EMPTY
                        };
                        responder.send(&mut Ok(vec![result])).expect("send response");
                    }
                }
            }
        }
        #[fasync::run_singlethreaded(test)]
        async fn archive_stream_returns_logs() {
            let (mut log_stream, iterator) = create_log_stream().expect("got log stream");
            let server_end = match iterator {
                ftest_manager::LogsIterator::Archive(server_end) => server_end,
                _ => panic!("unexpected logs iterator server end"),
            };
            fasync::Task::spawn(spawn_archive_iterator_server(server_end, false)).detach();

            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("1"));
            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("2"));
            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("3"));
            assert_matches!(log_stream.next().await, None);
        }

        #[fasync::run_singlethreaded(test)]
        async fn archive_stream_can_return_errors() {
            let (mut log_stream, iterator) = create_log_stream().expect("got log stream");
            let server_end = match iterator {
                ftest_manager::LogsIterator::Archive(server_end) => server_end,
                _ => panic!("unexpected logs iterator server end"),
            };
            fasync::Task::spawn(spawn_archive_iterator_server(server_end, true)).detach();

            assert_matches!(log_stream.next().await, Some(Err(_)));
        }
    }
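
    /// Builds a log record with `value` as its message and serializes it to JSON.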
    fn get_json_data(value: i64) -> String {
        let data = diagnostics_data::LogsDataBuilder::new(diagnostics_data::BuilderArgs {
            timestamp_nanos: Timestamp::from(0).into(),
            component_url: String::from("fake-url"),
            moniker: String::from("test/moniker"),
            severity: Severity::Info,
            size_bytes: 1,
        })
        .set_message(value.to_string())
        .build();
        serde_json::to_string(&data).expect("serialize to json")
    }
}