blob: 2518c0e1ace7cd0e3607c2b8694af2fb0de26b7b [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::{anyhow, Context as _, Error, Result},
async_trait::async_trait,
diagnostics_data::{Data, Inspect, Lifecycle, Logs, LogsData},
diagnostics_reader::{ArchiveReader, Error as ReaderError},
fidl::endpoints::ServerEnd,
fidl_fuchsia_developer_remotecontrol::{
ArchiveIteratorEntry, ArchiveIteratorError, ArchiveIteratorMarker, ArchiveIteratorRequest,
BridgeStreamParameters, DiagnosticsData, InlineData, RemoteDiagnosticsBridgeRequest,
RemoteDiagnosticsBridgeRequestStream, StreamError,
},
fidl_fuchsia_diagnostics::{
ArchiveAccessorProxy,
ClientSelectorConfiguration::{SelectAll, Selectors},
DataType, SelectorArgument, StreamMode,
},
fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES,
fuchsia_async as fasync,
fuchsia_component::server::ServiceFs,
futures::{
prelude::*,
select,
stream::{FusedStream, TryStreamExt},
},
iquery::commands::connect_to_archive_accessor,
selectors,
std::fmt::Debug,
std::sync::Arc,
tracing::{error, info, warn},
};
// This lets us mock out the ArchiveReader in tests
#[async_trait]
pub trait ArchiveReaderManager {
type Error: Debug + Send + 'static;
async fn snapshot<D: diagnostics_data::DiagnosticsData + 'static>(
&self,
) -> Result<Vec<Data<D>>, StreamError>;
fn spawn_snapshot_server<D: diagnostics_data::DiagnosticsData + 'static>(
&self,
data: Vec<Data<D>>,
iterator: ServerEnd<ArchiveIteratorMarker>,
) -> fasync::Task<Result<(), Error>> {
let task = fasync::Task::spawn(async move {
let mut is_sent = false;
let mut iter_stream = iterator.into_stream()?;
while let Some(request) = iter_stream.next().await {
let ArchiveIteratorRequest::GetNext { responder } = request?;
if is_sent {
responder.send(&mut Ok(vec![]))?;
continue;
}
is_sent = true;
let data = serde_json::to_string(&data)?;
if data.len() <= MAX_DATAGRAM_LEN_BYTES as usize {
let response = vec![ArchiveIteratorEntry {
diagnostics_data: Some(DiagnosticsData::Inline(InlineData {
data,
truncated_chars: 0,
})),
..ArchiveIteratorEntry::EMPTY
}];
responder.send(&mut Ok(response))?;
} else {
let sock_opts = fuchsia_zircon::SocketOpts::STREAM;
let (socket, tx_socket) = fuchsia_zircon::Socket::create(sock_opts)?;
let mut tx_socket = fasync::Socket::from_socket(tx_socket)?;
let response = vec![ArchiveIteratorEntry {
diagnostics_data: Some(DiagnosticsData::Socket(socket)),
..ArchiveIteratorEntry::EMPTY
}];
responder.send(&mut Ok(response))?;
tx_socket.write_all(data.as_bytes()).await?;
}
}
Ok::<(), Error>(())
});
task
}
fn start_log_stream(
&mut self,
) -> Result<
Box<dyn FusedStream<Item = Result<LogsData, Self::Error>> + Unpin + Send>,
StreamError,
>;
/// Provides an implementation of an ArchiveIterator server. Intended to be used by clients who
/// wish to spawn an ArchiveIterator server given an implementation of `start_log_stream`..
fn spawn_iterator_server(
&mut self,
iterator: ServerEnd<ArchiveIteratorMarker>,
) -> Result<fasync::Task<Result<(), Error>>, StreamError> {
let stream_result = match self.start_log_stream() {
Ok(r) => r,
Err(e) => return Err(e),
};
let task = fasync::Task::spawn(async move {
let mut iter_stream = iterator.into_stream()?;
let mut result_stream = stream_result;
while let Some(request) = iter_stream.next().await {
match request? {
ArchiveIteratorRequest::GetNext { responder } => {
let logs = select! {
result = result_stream.select_next_some() => {
match result {
Err(err) => {
warn!(?err, "Data read error");
responder.send(&mut Err(
ArchiveIteratorError::DataReadFailed))?;
continue;
}
Ok(log) => log,
}
}
complete => {
responder.send(&mut Ok(vec![]))?;
break;
}
};
let send_inline = logs_data_can_be_sent_inline(&logs);
let data = serde_json::to_string(&logs)?;
if send_inline {
// Short data => send the data within the message.
let response = vec![ArchiveIteratorEntry {
diagnostics_data: Some(DiagnosticsData::Inline(InlineData {
data,
truncated_chars: 0,
})),
..ArchiveIteratorEntry::EMPTY
}];
responder.send(&mut Ok(response))?;
} else {
// Long data => create a socket, send the socket in the message, then
// write all the data on the other side of the socket.
let sock_opts = fuchsia_zircon::SocketOpts::STREAM;
let (socket, tx_socket) = fuchsia_zircon::Socket::create(sock_opts)?;
let mut tx_socket = fasync::Socket::from_socket(tx_socket)?;
let response = vec![ArchiveIteratorEntry {
diagnostics_data: Some(DiagnosticsData::Socket(socket)),
..ArchiveIteratorEntry::EMPTY
}];
responder.send(&mut Ok(response))?;
tx_socket.write_all(data.as_bytes()).await?;
}
}
}
}
Ok::<(), Error>(())
});
Ok(task)
}
}
struct ArchiveReaderManagerImpl {
reader: ArchiveReader,
}
impl ArchiveReaderManagerImpl {
async fn new(parameters: BridgeStreamParameters) -> Result<Self> {
let archive: ArchiveAccessorProxy =
connect_to_archive_accessor(&parameters.accessor_path).await?;
let mut reader = ArchiveReader::new();
reader.with_archive(archive).retry_if_empty(false);
match parameters.client_selector_configuration {
Some(Selectors(s)) => {
for selector in s.into_iter() {
let selector = match selector {
SelectorArgument::StructuredSelector(s) => {
selectors::selector_to_string(s)?
}
SelectorArgument::RawSelector(s) => s,
_ => return Err(anyhow!("unknown selector argument")),
};
reader.add_selector(selector);
}
}
Some(SelectAll(true)) => {}
_ => return Err(anyhow!("unsupported selector arguments")),
}
Ok(Self { reader })
}
}
#[async_trait]
impl ArchiveReaderManager for ArchiveReaderManagerImpl {
type Error = ReaderError;
async fn snapshot<D: diagnostics_data::DiagnosticsData + 'static>(
&self,
) -> Result<Vec<Data<D>>, StreamError> {
self.reader.snapshot::<D>().await.map_err(|err| {
warn!(%err, "Got error getting snapshot");
StreamError::SetupSubscriptionFailed
})
}
fn start_log_stream(
&mut self,
) -> Result<
Box<dyn FusedStream<Item = Result<LogsData, Self::Error>> + Unpin + Send>,
StreamError,
> {
let result_stream = self.reader.snapshot_then_subscribe::<Logs>().map_err(|err| {
warn!(%err, "Got error creating log subscription");
StreamError::SetupSubscriptionFailed
})?;
Ok(Box::new(result_stream))
}
}
// This checks whether the log can fit within a FIDL message to be able to use the inline variant
// of the DiagnosticsData type.
fn logs_data_can_be_sent_inline(logs: &LogsData) -> bool {
logs.msg().map(|msg| msg.len() <= MAX_DATAGRAM_LEN_BYTES as usize).unwrap_or(true)
}
pub struct RemoteDiagnosticsBridge<E, F, A, Fut>
where
F: Fn(BridgeStreamParameters) -> Fut,
E: Into<anyhow::Error> + Send,
A: ArchiveReaderManager<Error = E> + Send,
Fut: Future<Output = Result<A>>,
{
archive_accessor: F,
}
impl<E, F, A, Fut> RemoteDiagnosticsBridge<E, F, A, Fut>
where
F: Fn(BridgeStreamParameters) -> Fut + Send + Sync + 'static,
E: Into<anyhow::Error> + Send + Debug + 'static,
A: ArchiveReaderManager<Error = E> + Send,
Fut: Future<Output = Result<A>>,
{
pub fn new(archive_accessor: F) -> Result<Self> {
return Ok(RemoteDiagnosticsBridge { archive_accessor });
}
fn validate_params(
&self,
parameters: &BridgeStreamParameters,
) -> Result<(DataType, StreamMode), StreamError> {
match (parameters.data_type, parameters.stream_mode) {
(Some(d), Some(s @ StreamMode::Snapshot)) => Ok((d, s)),
(Some(d @ DataType::Logs), Some(s @ StreamMode::SnapshotThenSubscribe)) => Ok((d, s)),
(None, _) => Err(StreamError::MissingParameter),
(_, None) => Err(StreamError::MissingParameter),
(_, _) => Err(StreamError::UnsupportedParameter),
}
}
pub async fn serve_stream(
&self,
mut stream: RemoteDiagnosticsBridgeRequestStream,
) -> Result<()> {
while let Some(request) =
stream.try_next().await.context("next RemoteDiagnosticsBridge request")?
{
match request {
RemoteDiagnosticsBridgeRequest::StreamDiagnostics {
parameters,
iterator,
responder,
} => {
let (data_type, stream_mode) = match self.validate_params(&parameters) {
Err(e) => {
responder.send(&mut Err(e))?;
continue;
}
Ok(v) => v,
};
let mut reader = (self.archive_accessor)(parameters).await?;
let res = match stream_mode {
StreamMode::SnapshotThenSubscribe => reader.spawn_iterator_server(iterator),
StreamMode::Snapshot => match data_type {
DataType::Logs => reader
.snapshot::<Logs>()
.await
.map(|data| reader.spawn_snapshot_server(data, iterator)),
DataType::Inspect => reader
.snapshot::<Inspect>()
.await
.map(|data| reader.spawn_snapshot_server(data, iterator)),
DataType::Lifecycle => reader
.snapshot::<Lifecycle>()
.await
.map(|data| reader.spawn_snapshot_server(data, iterator)),
},
_ => {
responder.send(&mut Err(StreamError::UnsupportedParameter))?;
continue;
}
};
match res {
Ok(task) => {
responder.send(&mut Ok(()))?;
task.await?;
}
Err(e) => {
responder.send(&mut Err(e))?;
}
}
}
RemoteDiagnosticsBridgeRequest::Hello { responder } => {
responder.send()?;
}
}
}
Ok(())
}
}
pub async fn exec_server() -> Result<()> {
fuchsia_syslog::init_with_tags(&["remote-diagnostics-bridge"])?;
info!("starting log-reader");
let service = Arc::new(RemoteDiagnosticsBridge::new(|p| ArchiveReaderManagerImpl::new(p))?);
let sc1 = service.clone();
let mut fs = ServiceFs::new_local();
fs.dir("svc").add_fidl_service(move |req| {
let sc = sc1.clone();
fasync::Task::local(async move {
match sc.clone().serve_stream(req).await {
Ok(()) => {}
Err(e) => {
error!(%e, "error encountered while serving stream");
}
}
})
.detach();
});
fs.take_and_serve_directory_handle()?;
fs.collect::<()>().await;
Ok(())
}
#[cfg(test)]
mod test {
use {
super::*,
anyhow::Error,
async_trait::async_trait,
diagnostics_data::{DiagnosticsHierarchy, InspectData, LifecycleData, Property, Severity},
fidl::endpoints::create_proxy,
fidl::AsyncSocket,
fidl_fuchsia_developer_remotecontrol::{
ArchiveIteratorMarker, RemoteDiagnosticsBridgeMarker, RemoteDiagnosticsBridgeProxy,
},
fuchsia_async as fasync,
futures::stream::iter,
matches::assert_matches,
};
const LONG_LOG_LEN: u32 = MAX_DATAGRAM_LEN_BYTES * 2;
struct FakeArchiveReaderManager {
logs: Vec<LogsData>,
inspect: Vec<InspectData>,
lifecycle: Vec<LifecycleData>,
errors: Vec<Error>,
connection_error: Option<StreamError>,
}
impl FakeArchiveReaderManager {
fn new(_parameters: BridgeStreamParameters) -> Result<Self> {
return Ok(Self {
logs: vec![],
inspect: vec![],
lifecycle: vec![],
connection_error: None,
errors: vec![],
});
}
fn new_with_data(
_parameters: BridgeStreamParameters,
logs: Vec<LogsData>,
inspect: Vec<InspectData>,
lifecycle: Vec<LifecycleData>,
errors: Vec<Error>,
) -> Result<Self> {
return Ok(Self { logs, inspect, lifecycle, connection_error: None, errors });
}
fn new_with_failed_connect(
_parameters: BridgeStreamParameters,
error: StreamError,
) -> Result<Self> {
return Ok(Self {
logs: vec![],
inspect: vec![],
lifecycle: vec![],
errors: vec![],
connection_error: Some(error),
});
}
}
#[async_trait]
impl ArchiveReaderManager for FakeArchiveReaderManager {
type Error = anyhow::Error;
async fn snapshot<D: diagnostics_data::DiagnosticsData + 'static>(
&self,
) -> Result<Vec<Data<D>>, StreamError> {
if let Some(cerr) = self.connection_error {
return Err(cerr);
}
match D::DATA_TYPE {
DataType::Inspect => {
let data = self.inspect.clone();
let data = async move { data }.await;
let data_str = serde_json::to_string(&data).unwrap();
let result = serde_json::from_str(&data_str).unwrap();
Ok(result)
}
DataType::Lifecycle => {
let data = self.lifecycle.clone();
let data = async move { data }.await;
let data_str = serde_json::to_string(&data).unwrap();
let result = serde_json::from_str(&data_str).unwrap();
Ok(result)
}
DataType::Logs => {
let data = self.logs.clone();
let data = async move { data }.await;
let data_str = serde_json::to_string(&data).unwrap();
let result = serde_json::from_str(&data_str).unwrap();
Ok(result)
}
}
}
fn start_log_stream(
&mut self,
) -> Result<
Box<dyn FusedStream<Item = Result<LogsData, Self::Error>> + Unpin + Send>,
StreamError,
> {
if let Some(cerr) = self.connection_error {
return Err(cerr);
}
let (mut sender, rec) =
futures::channel::mpsc::channel::<Result<LogsData, Self::Error>>(0);
let logs = self.logs.clone();
let errors = self.errors.drain(..).collect::<Vec<Error>>();
fasync::Task::local(async move {
sender.send_all(&mut iter(errors.into_iter().map(|e| Ok(Err(e))))).await.unwrap();
for log in logs {
sender.send(Ok(log)).await.unwrap();
}
})
.detach();
Ok(Box::new(rec.fuse()))
}
}
fn setup_diagnostics_bridge_proxy<F, A, Fut>(service: F) -> RemoteDiagnosticsBridgeProxy
where
F: Fn(BridgeStreamParameters) -> Fut + std::marker::Send + std::marker::Sync + 'static,
A: ArchiveReaderManager<Error = anyhow::Error> + std::marker::Send + 'static,
Fut: Future<Output = Result<A>> + 'static,
{
let service = Arc::new(RemoteDiagnosticsBridge::new(service).unwrap());
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<RemoteDiagnosticsBridgeMarker>().unwrap();
fasync::Task::local(async move {
service.serve_stream(stream).await.unwrap();
})
.detach();
return proxy;
}
fn make_log(timestamp: i64, msg: String) -> LogsData {
diagnostics_data::LogsDataBuilder::new(diagnostics_data::BuilderArgs {
timestamp_nanos: timestamp.into(),
component_url: Some(String::from("fake-url")),
moniker: String::from("test/moniker"),
severity: Severity::Error,
size_bytes: 1,
})
.set_message(msg)
.build()
}
fn make_short_log(timestamp: i64) -> LogsData {
make_log(timestamp, "msg".to_string())
}
fn make_long_log(timestamp: i64, len: u32) -> LogsData {
let mut msg = String::default();
for _ in 0..len {
msg.push('a');
}
make_log(timestamp, msg)
}
fn default_log_parameters() -> BridgeStreamParameters {
BridgeStreamParameters {
data_type: Some(DataType::Logs),
stream_mode: Some(StreamMode::SnapshotThenSubscribe),
..BridgeStreamParameters::EMPTY
}
}
fn make_short_lifecycle(timestamp: i64) -> LifecycleData {
Data::for_lifecycle_event(
String::from("test/moniker"),
diagnostics_data::LifecycleType::Started,
None,
String::from("fake-url"),
timestamp,
vec![],
)
}
fn make_long_inspect(timestamp: i64) -> InspectData {
let long_string = std::iter::repeat("a").take(LONG_LOG_LEN as usize).collect::<String>();
let hierarchy = DiagnosticsHierarchy::new(
String::from("name"),
vec![Property::String(String::from("hello"), long_string)],
vec![],
);
Data::for_inspect(
String::from("test/moniker"),
Some(hierarchy),
timestamp,
String::from("fake-url"),
String::from("fake-filename"),
vec![],
)
}
#[fasync::run_singlethreaded(test)]
async fn test_empty_log_stream() {
let proxy = setup_diagnostics_bridge_proxy(|p| async { FakeArchiveReaderManager::new(p) });
let (iterator, server) = create_proxy::<ArchiveIteratorMarker>().unwrap();
proxy.stream_diagnostics(default_log_parameters(), server).await.unwrap().unwrap();
let entries = iterator.get_next().await.unwrap().expect("expected Ok response");
assert!(entries.is_empty());
}
#[fasync::run_singlethreaded(test)]
async fn test_missing_data_type() {
let proxy = setup_diagnostics_bridge_proxy(|p| async { FakeArchiveReaderManager::new(p) });
let (_, server) = create_proxy::<ArchiveIteratorMarker>().unwrap();
let err = proxy
.stream_diagnostics(
BridgeStreamParameters {
data_type: None,
stream_mode: Some(StreamMode::SnapshotThenSubscribe),
..BridgeStreamParameters::EMPTY
},
server,
)
.await
.unwrap()
.unwrap_err();
assert_eq!(err, StreamError::MissingParameter);
}
#[fasync::run_singlethreaded(test)]
async fn test_missing_stream_mode() {
let proxy = setup_diagnostics_bridge_proxy(|p| async { FakeArchiveReaderManager::new(p) });
let (_, server) = create_proxy::<ArchiveIteratorMarker>().unwrap();
let err = proxy
.stream_diagnostics(
BridgeStreamParameters {
data_type: Some(DataType::Logs),
stream_mode: None,
..BridgeStreamParameters::EMPTY
},
server,
)
.await
.unwrap()
.unwrap_err();
assert_eq!(err, StreamError::MissingParameter);
}
#[fasync::run_singlethreaded(test)]
async fn test_unsupported_data_type_for_subscribe() {
let proxy = setup_diagnostics_bridge_proxy(|p| async { FakeArchiveReaderManager::new(p) });
let (_, server) = create_proxy::<ArchiveIteratorMarker>().unwrap();
let err = proxy
.stream_diagnostics(
BridgeStreamParameters {
data_type: Some(DataType::Inspect),
stream_mode: Some(StreamMode::SnapshotThenSubscribe),
..BridgeStreamParameters::EMPTY
},
server,
)
.await
.unwrap()
.unwrap_err();
assert_eq!(err, StreamError::UnsupportedParameter);
}
#[fasync::run_singlethreaded(test)]
async fn test_unsupported_stream_mode() {
let proxy = setup_diagnostics_bridge_proxy(|p| async { FakeArchiveReaderManager::new(p) });
let (_, server) = create_proxy::<ArchiveIteratorMarker>().unwrap();
let err = proxy
.stream_diagnostics(
BridgeStreamParameters {
data_type: Some(DataType::Logs),
stream_mode: Some(StreamMode::Subscribe),
..BridgeStreamParameters::EMPTY
},
server,
)
.await
.unwrap()
.unwrap_err();
assert_eq!(err, StreamError::UnsupportedParameter);
}
#[fasync::run_singlethreaded(test)]
async fn test_two_log_stream_no_errors() {
let proxy = setup_diagnostics_bridge_proxy(|p| async {
FakeArchiveReaderManager::new_with_data(
p,
vec![make_short_log(1), make_short_log(2)],
vec![],
vec![],
vec![],
)
});
let (iterator, server) = create_proxy::<ArchiveIteratorMarker>().unwrap();
proxy
.stream_diagnostics(default_log_parameters(), server)
.await
.unwrap()
.expect("expect Ok response");
let entries = iterator.get_next().await.unwrap().expect("get next should not error");
assert_eq!(entries.len(), 1);
let entry = entries.get(0).unwrap();
let diagnostics_data = entry.diagnostics_data.as_ref().unwrap();
assert_matches!(diagnostics_data, DiagnosticsData::Inline(_));
if let DiagnosticsData::Inline(inline) = diagnostics_data {
assert_eq!(inline.truncated_chars, 0);
assert_eq!(inline.data, serde_json::to_string(&make_short_log(1)).unwrap());
}
let entries = iterator.get_next().await.unwrap().expect("get next should not error");
assert_eq!(entries.len(), 1);
let entry = entries.get(0).unwrap();
let diagnostics_data = entry.diagnostics_data.as_ref().unwrap();
assert_matches!(diagnostics_data, DiagnosticsData::Inline(_));
if let DiagnosticsData::Inline(inline) = diagnostics_data {
assert_eq!(inline.truncated_chars, 0);
assert_eq!(inline.data, serde_json::to_string(&make_short_log(2)).unwrap());
}
let empty_entries = iterator.get_next().await.unwrap().expect("get next should not error");
assert!(empty_entries.is_empty());
}
#[fasync::run_singlethreaded(test)]
async fn test_two_log_stream_with_errors() {
let proxy = setup_diagnostics_bridge_proxy(|p| async {
FakeArchiveReaderManager::new_with_data(
p,
vec![make_short_log(1)],
vec![],
vec![],
vec![anyhow!("error1"), anyhow!("error2")],
)
});
let (iterator, server) = create_proxy::<ArchiveIteratorMarker>().unwrap();
proxy
.stream_diagnostics(default_log_parameters(), server)
.await
.unwrap()
.expect("expect Ok response");
let result = iterator.get_next().await.unwrap();
assert_eq!(result.unwrap_err(), ArchiveIteratorError::DataReadFailed);
let result = iterator.get_next().await.unwrap();
assert_eq!(result.unwrap_err(), ArchiveIteratorError::DataReadFailed);
let entries = iterator.get_next().await.unwrap().expect("get next should not error");
assert_eq!(entries.len(), 1);
let entry = entries.get(0).unwrap();
let diagnostics_data = entry.diagnostics_data.as_ref().unwrap();
assert_matches!(diagnostics_data, DiagnosticsData::Inline(_));
if let DiagnosticsData::Inline(inline) = diagnostics_data {
assert_eq!(inline.truncated_chars, 0);
assert_eq!(inline.data.clone(), serde_json::to_string(&make_short_log(1)).unwrap());
}
let empty_entries = iterator.get_next().await.unwrap().expect("get next should not error");
assert!(empty_entries.is_empty());
}
#[fasync::run_singlethreaded(test)]
async fn test_connection_fails() {
let proxy = setup_diagnostics_bridge_proxy(|p| async {
FakeArchiveReaderManager::new_with_failed_connect(
p,
StreamError::SetupSubscriptionFailed,
)
});
let (_, server) = create_proxy::<ArchiveIteratorMarker>().unwrap();
let result = proxy
.stream_diagnostics(default_log_parameters(), server)
.await
.unwrap()
.expect_err("connection should fail");
assert_eq!(result, StreamError::SetupSubscriptionFailed);
}
#[fasync::run_singlethreaded(test)]
async fn test_long_message_is_sent_with_a_socket() {
let proxy = setup_diagnostics_bridge_proxy(|p| async {
FakeArchiveReaderManager::new_with_data(
p,
vec![make_long_log(1, LONG_LOG_LEN)],
vec![],
vec![],
vec![],
)
});
let (iterator, server) = create_proxy::<ArchiveIteratorMarker>().unwrap();
proxy
.stream_diagnostics(default_log_parameters(), server)
.await
.unwrap()
.expect("expect Ok response");
let mut entries = iterator.get_next().await.unwrap().expect("get next should not error");
assert_eq!(entries.len(), 1);
let entry = entries.pop().unwrap();
let diagnostics_data = entry.diagnostics_data.unwrap();
assert_matches!(diagnostics_data, DiagnosticsData::Socket(_));
if let DiagnosticsData::Socket(socket) = diagnostics_data {
let mut socket = AsyncSocket::from_socket(socket).unwrap();
let mut result = Vec::new();
let _bytes = socket.read_to_end(&mut result).await.unwrap();
let data: LogsData = serde_json::from_slice(&result).unwrap();
let expected_data = make_long_log(1, LONG_LOG_LEN);
assert_eq!(data, expected_data);
}
let empty_entries = iterator.get_next().await.unwrap().expect("get next should not error");
assert!(empty_entries.is_empty());
}
#[fasync::run_singlethreaded(test)]
async fn test_short_lifecycle_snapshot() {
let proxy = setup_diagnostics_bridge_proxy(|p| async {
FakeArchiveReaderManager::new_with_data(
p,
vec![],
vec![],
vec![make_short_lifecycle(1)],
vec![],
)
});
let (iterator, server) = create_proxy::<ArchiveIteratorMarker>().unwrap();
proxy
.stream_diagnostics(
BridgeStreamParameters {
data_type: Some(DataType::Lifecycle),
stream_mode: Some(StreamMode::Snapshot),
..BridgeStreamParameters::EMPTY
},
server,
)
.await
.unwrap()
.unwrap();
let mut entries = iterator.get_next().await.unwrap().expect("get next should not error");
assert_eq!(entries.len(), 1);
let entry = entries.pop().unwrap();
let diagnostics_data = entry.diagnostics_data.unwrap();
assert_matches!(diagnostics_data, DiagnosticsData::Inline(_));
if let DiagnosticsData::Inline(inline) = diagnostics_data {
let data: Vec<LifecycleData> = serde_json::from_str(inline.data.as_ref()).unwrap();
let expected_data = vec![make_short_lifecycle(1)];
assert_eq!(data, expected_data);
}
let empty_entries = iterator.get_next().await.unwrap().expect("get next should not error");
assert!(empty_entries.is_empty());
}
#[fasync::run_singlethreaded(test)]
async fn test_long_inspect_snapshot() {
let proxy = setup_diagnostics_bridge_proxy(|p| async {
FakeArchiveReaderManager::new_with_data(
p,
vec![],
vec![make_long_inspect(1)],
vec![],
vec![],
)
});
let (iterator, server) = create_proxy::<ArchiveIteratorMarker>().unwrap();
proxy
.stream_diagnostics(
BridgeStreamParameters {
data_type: Some(DataType::Inspect),
stream_mode: Some(StreamMode::Snapshot),
..BridgeStreamParameters::EMPTY
},
server,
)
.await
.unwrap()
.unwrap();
let mut entries = iterator.get_next().await.unwrap().expect("get next should not error");
assert_eq!(entries.len(), 1);
let entry = entries.pop().unwrap();
let diagnostics_data = entry.diagnostics_data.unwrap();
assert_matches!(diagnostics_data, DiagnosticsData::Socket(_));
if let DiagnosticsData::Socket(socket) = diagnostics_data {
let mut socket = AsyncSocket::from_socket(socket).unwrap();
let mut result = Vec::new();
let _bytes = socket.read_to_end(&mut result).await.unwrap();
let actual: Vec<InspectData> = serde_json::from_slice(&result).unwrap();
assert_eq!(actual.get(0).unwrap(), &make_long_inspect(1));
}
let empty_entries = iterator.get_next().await.unwrap().expect("get next should not error");
assert!(empty_entries.is_empty());
}
}