blob: fd54202457e48659cdab763bd4bfcde158ff7a02 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{format_err, Context as _, Error};
use buffer::LazyItem;
use fidl::endpoints::{ServerEnd, ServiceMarker};
use fidl_fuchsia_diagnostics::{Interest, StreamMode};
use fidl_fuchsia_logger::LogSinkMarker;
use fidl_fuchsia_logger::{
LogRequest, LogRequestStream, LogSinkControlHandle, LogSinkRequest, LogSinkRequestStream,
};
use fidl_fuchsia_sys2 as fsys;
use fidl_fuchsia_sys_internal::{
LogConnection, LogConnectionListenerRequest, LogConnectorProxy, SourceIdentity,
};
use fuchsia_async::{self as fasync, Task};
use fuchsia_inspect as inspect;
use fuchsia_inspect_derive::Inspect;
use fuchsia_zircon as zx;
use futures::{channel::mpsc, future::FutureObj, lock::Mutex, prelude::*};
use log::{debug, error, trace, warn};
use std::sync::Arc;
mod buffer;
pub mod debuglog;
mod error;
mod interest;
mod listener;
pub mod message;
mod socket;
pub mod stats;
#[cfg(test)]
pub mod testing;
pub use debuglog::{convert_debuglog_to_log_message, KernelDebugLog};
pub use message::Message;
use interest::InterestDispatcher;
use listener::{pretend_scary_listener_is_safe, Listener};
use socket::{Encoding, Forwarder, LegacyEncoding, LogMessageSocket, StructuredEncoding};
use stats::LogSource;
/// Maximum size of the in-memory log message buffer: store 4 MiB of log
/// messages and delete on a FIFO basis once the cap is exceeded.
const OLD_MSGS_BUF_SIZE: usize = 4 * 1024 * 1024;
/// The `LogManager` is responsible for brokering all logging in the archivist.
///
/// Cloning a `LogManager` is cheap: every clone shares the same interior
/// state through the `Arc`.
#[derive(Clone, Inspect)]
pub struct LogManager {
    // Mutex-guarded shared state; `forward` surfaces the inner struct's
    // inspect properties directly under this node.
    #[inspect(forward)]
    inner: Arc<Mutex<ManagerInner>>,
}
/// Mutable state shared by every clone of a `LogManager`.
#[derive(Inspect)]
struct ManagerInner {
    // Routes `Interest` updates to registered `LogSink` control handles.
    #[inspect(skip)]
    interest_dispatcher: InterestDispatcher,
    // Forwarder for legacy-encoded messages (wired up in `init_forwarders`).
    #[inspect(skip)]
    legacy_forwarder: Forwarder<LegacyEncoding>,
    // Forwarder for structured-encoded messages (wired up in `init_forwarders`).
    #[inspect(skip)]
    structured_forwarder: Forwarder<StructuredEncoding>,
    // FIFO buffer of retained messages, capped at `OLD_MSGS_BUF_SIZE` bytes.
    #[inspect(rename = "buffer_stats")]
    log_msg_buffer: buffer::MemoryBoundedBuffer<Message>,
    stats: stats::LogManagerStats,
    inspect_node: inspect::Node,
}
impl LogManager {
/// Creates a manager with an empty message buffer and detached
/// stats/inspect nodes (they are attached later via `Inspect`).
pub fn new() -> Self {
    let inner = ManagerInner {
        interest_dispatcher: InterestDispatcher::default(),
        log_msg_buffer: buffer::MemoryBoundedBuffer::new(OLD_MSGS_BUF_SIZE),
        stats: stats::LogManagerStats::new_detached(),
        inspect_node: inspect::Node::default(),
        legacy_forwarder: Forwarder::new(),
        structured_forwarder: Forwarder::new(),
    };
    Self { inner: Arc::new(Mutex::new(inner)) }
}
/// Drain the kernel's debug log. The returned future completes once
/// existing messages have been ingested.
///
/// After ingesting the backlog this keeps listening for new kernel
/// messages until the listen stream ends or fails; failures are logged
/// but do not panic.
pub async fn drain_debuglog<K>(self, klog_reader: K)
where
    K: debuglog::DebugLog + Send + Sync + 'static,
{
    debug!("Draining debuglog.");
    // All klog messages are attributed to a single synthetic component URL.
    let component_log_stats = {
        let inner = self.inner.lock().await;
        inner.stats.get_component_log_stats("fuchsia-boot://klog").await
    };
    let mut kernel_logger = debuglog::DebugLogBridge::create(klog_reader);
    // Snapshot whatever is already in the kernel buffer before subscribing.
    let mut messages = match kernel_logger.existing_logs().await {
        Ok(messages) => messages,
        Err(e) => {
            error!("failed to read from kernel log, important logs may be missing: {}", e);
            return;
        }
    };
    // Ingest the backlog in timestamp order.
    messages.sort_by_key(|m| m.metadata.timestamp);
    for message in messages {
        component_log_stats.record_log(&message);
        self.ingest_message(message, LogSource::Kernel).await;
    }
    // Tail the debuglog for new messages until the stream terminates.
    let res = kernel_logger
        .listen()
        .try_for_each(|message| {
            async {
                component_log_stats.clone().record_log(&message);
                self.ingest_message(message, LogSource::Kernel).await
            }
            .map(Ok)
        })
        .await;
    if let Err(e) = res {
        error!("failed to drain kernel log, important logs may be missing: {}", e);
    }
}
/// Drain log sink for messages sent by the archivist itself.
///
/// The returned future never completes normally: the internal sink is
/// expected to stay open for the lifetime of the archivist, so if
/// `drain_messages` ever returns we panic with context rather than
/// silently dropping the archivist's own logs.
pub async fn drain_internal_log_sink(self, socket: zx::Socket, name: &str) {
    let forwarder = self.inner.lock().await.legacy_forwarder.clone();
    // TODO(fxbug.dev/50105): Figure out how to properly populate SourceIdentity
    let mut source = SourceIdentity::empty();
    source.component_name = Some(name.to_owned());
    let source = Arc::new(source);
    let log_stream = LogMessageSocket::new(socket, source, forwarder)
        .expect("failed to create internal LogMessageSocket");
    self.drain_messages(log_stream).await;
    // drain_messages only returns when the socket closes or errors out,
    // which must never happen for the archivist's own sink.
    unreachable!("archivist's internal log sink closed unexpectedly");
}
/// Handle `LogConnectionListener` for the parent realm, eventually passing
/// `LogSink` connections into the manager.
///
/// Each incoming `LogConnection` is served on its own detached task so a
/// slow connection cannot block the listener loop.
pub async fn handle_log_connector(
    self,
    connector: LogConnectorProxy,
    sender: mpsc::UnboundedSender<FutureObj<'static, ()>>,
) {
    debug!("Handling LogSink connections from appmgr.");
    match connector.take_log_connection_listener().await {
        Ok(Some(listener)) => {
            let mut connections =
                listener.into_stream().expect("getting request stream from server end");
            // Loop ends when the connection stream closes or errors.
            while let Ok(Some(connection)) = connections.try_next().await {
                match connection {
                    LogConnectionListenerRequest::OnNewConnection {
                        connection: LogConnection { log_request, source_identity },
                        control_handle: _,
                    } => {
                        let stream = log_request
                            .into_stream()
                            .expect("getting LogSinkRequestStream from serverend");
                        let source = Arc::new(source_identity);
                        // Serve the new LogSink connection in the background.
                        fasync::Task::spawn(self.clone().handle_log_sink(
                            stream,
                            source,
                            sender.clone(),
                        ))
                        .detach()
                    }
                };
            }
        }
        // The listener can only be taken once; a later caller gets None.
        Ok(None) => warn!("local realm already gave out LogConnectionListener, skipping logs"),
        Err(e) => error!("error retrieving LogConnectionListener from LogConnector: {}", e),
    }
}
/// Handle `LogSink` protocol on `stream`. The future returned by this
/// function will not complete before all messages on this connection are
/// processed.
///
/// `Connect` carries legacy-encoded messages and `ConnectStructured`
/// carries structured-encoded ones; both arms queue a `drain_messages`
/// future on `sender` for the caller's executor to run.
pub async fn handle_log_sink(
    self,
    mut stream: LogSinkRequestStream,
    source: Arc<SourceIdentity>,
    sender: mpsc::UnboundedSender<FutureObj<'static, ()>>,
) {
    // Connections without an attributed component name are counted so the
    // stat shows up in inspect.
    if source.component_name.is_none() {
        self.inner.lock().await.stats.record_unattributed();
    }
    while let Some(next) = stream.next().await {
        match next {
            Ok(LogSinkRequest::Connect { socket, control_handle }) => {
                // Clone the forwarder inside a short-lived lock scope.
                let forwarder = { self.inner.lock().await.legacy_forwarder.clone() };
                match LogMessageSocket::new(socket, source.clone(), forwarder)
                    .context("creating log stream from socket")
                {
                    Ok(log_stream) => {
                        self.try_add_interest_listener(&source, control_handle).await;
                        let fut =
                            FutureObj::new(Box::new(self.clone().drain_messages(log_stream)));
                        let res = sender.unbounded_send(fut);
                        if let Err(e) = res {
                            warn!("error queuing log message drain: {}", e);
                        }
                    }
                    Err(e) => {
                        // Tear down the channel; the client must reconnect.
                        control_handle.shutdown();
                        warn!("error creating socket from {:?}: {}", source, e)
                    }
                };
            }
            Ok(LogSinkRequest::ConnectStructured { socket, control_handle }) => {
                let forwarder = { self.inner.lock().await.structured_forwarder.clone() };
                match LogMessageSocket::new_structured(socket, source.clone(), forwarder)
                    .context("creating log stream from socket")
                {
                    Ok(log_stream) => {
                        self.try_add_interest_listener(&source, control_handle).await;
                        let fut =
                            FutureObj::new(Box::new(self.clone().drain_messages(log_stream)));
                        let res = sender.unbounded_send(fut);
                        if let Err(e) = res {
                            warn!("error queuing log message drain: {}", e);
                        }
                    }
                    Err(e) => {
                        control_handle.shutdown();
                        warn!("error creating socket from {:?}: {}", source, e)
                    }
                };
            }
            Err(e) => error!("error handling log sink from {:?}: {}", source, e),
        }
    }
}
/// Drain a `LogMessageSocket` which wraps a socket from a component
/// generating logs.
///
/// Runs until the socket closes; a clean close returns quietly while any
/// other stream error is counted and logged before returning.
async fn drain_messages<E>(self, mut log_stream: LogMessageSocket<E>)
where
    E: Encoding + Unpin,
{
    // Per-component stats node keyed by the stream's source URL.
    let component_log_stats = {
        let inner = self.inner.lock().await;
        inner.stats.get_component_log_stats(log_stream.source_url()).await
    };
    loop {
        let message = match log_stream.next().await {
            Ok(message) => message,
            // A closed socket is the normal end of a connection.
            Err(error::StreamError::Closed) => return,
            Err(e) => {
                self.inner.lock().await.stats.record_closed_stream();
                warn!("closing socket from {:?}: {}", log_stream.source_url(), e);
                return;
            }
        };
        component_log_stats.record_log(&message);
        self.ingest_message(message, stats::LogSource::LogSink).await;
    }
}
/// Add 'Interest' listener to connect the interest dispatcher to the
/// LogSinkControlHandle (weak reference) associated with the given source.
/// Interest listeners are only supported for log connections where the
/// SourceIdentity includes an attributed component name. If no component
/// name is present, this function will exit without adding any listener.
async fn try_add_interest_listener(
    &self,
    source: &Arc<SourceIdentity>,
    control_handle: LogSinkControlHandle,
) {
    // Unattributed connections cannot receive interest updates.
    if source.component_name.is_none() {
        return;
    }
    let handle = Arc::new(control_handle);
    // The dispatcher only holds a weak reference so a dead connection
    // cannot be kept alive by the dispatcher.
    self.inner
        .lock()
        .await
        .interest_dispatcher
        .add_interest_listener(source, Arc::downgrade(&handle));
    // Ack successful connections with an 'empty' interest for async clients.
    let _ = handle.send_on_register_interest(Interest::empty());
}
/// Handle the components v2 EventStream for attributed logs of v2
/// components.
///
/// Processes events until the stream closes or errors; individual event
/// failures are logged and do not stop the loop.
pub async fn handle_event_stream(
    self,
    mut stream: fsys::EventStreamRequestStream,
    sender: mpsc::UnboundedSender<FutureObj<'static, ()>>,
) {
    while let Ok(Some(request)) = stream.try_next().await {
        match request {
            fsys::EventStreamRequest::OnEvent { event, .. } => {
                if let Err(e) = self.handle_event(event, sender.clone()) {
                    error!("Unable to process event: {}", e);
                }
            }
        }
    }
}
/// Handle the components v2 CapabilityRequested event for attributed logs of
/// v2 components.
fn handle_event(
&self,
event: fsys::Event,
sender: mpsc::UnboundedSender<FutureObj<'static, ()>>,
) -> Result<(), Error> {
let identity = Self::source_identity_from_event(&event)?;
let stream = Self::log_sink_request_stream_from_event(event)?;
fasync::Task::spawn(self.clone().handle_log_sink(stream, identity, sender)).detach();
Ok(())
}
/// Extract the SourceIdentity from a components v2 event.
///
/// Returns an error if the event's descriptor is missing either the
/// moniker or the component URL.
// TODO(fxbug.dev/54330): LogManager should have its own error type.
fn source_identity_from_event(event: &fsys::Event) -> Result<Arc<SourceIdentity>, Error> {
    // `Option<&_>` is Copy, so the descriptor reference can be reused for
    // both field lookups.
    let descriptor = event.descriptor.as_ref();
    let target_moniker = descriptor
        .and_then(|descriptor| descriptor.moniker.clone())
        // ok_or_else avoids constructing the error on the success path.
        .ok_or_else(|| format_err!("No moniker present"))?;
    let component_url = descriptor
        .and_then(|descriptor| descriptor.component_url.clone())
        .ok_or_else(|| format_err!("No URL present"))?;
    let mut source = SourceIdentity::empty();
    // Both strings are already owned here; no extra clones needed.
    source.component_url = Some(component_url);
    source.component_name = Some(target_moniker);
    Ok(Arc::new(source))
}
/// Extract the LogSinkRequestStream from a CapabilityRequested v2 event.
///
/// Fails if the event result is missing or is an error, if the requested
/// capability path is not `LogSink`'s, or if the channel cannot be turned
/// into a request stream.
// TODO(fxbug.dev/54330): LogManager should have its own error type.
fn log_sink_request_stream_from_event(
    event: fsys::Event,
) -> Result<LogSinkRequestStream, Error> {
    // Unwrap the event result, surfacing any error description attached by
    // component manager. ok_or_else keeps error construction off the
    // success path.
    let payload = event
        .event_result
        .ok_or_else(|| format_err!("Missing event result."))
        .and_then(|result| match result {
            fsys::EventResult::Payload(fsys::EventPayload::CapabilityRequested(
                payload,
            )) => Ok(payload),
            fsys::EventResult::Error(fsys::EventError {
                description: Some(description),
                ..
            }) => Err(format_err!("{}", description)),
            _ => Err(format_err!("Incorrect event type.")),
        })?;
    // Only the LogSink protocol's capability path is accepted here.
    let capability_path = payload.path.ok_or_else(|| format_err!("Missing capability path"))?;
    if capability_path != format!("/svc/{}", LogSinkMarker::NAME) {
        return Err(format_err!("Incorrect LogSink capability_path {}", capability_path));
    }
    let capability = payload.capability.ok_or_else(|| format_err!("Missing capability"))?;
    let server_end = ServerEnd::<LogSinkMarker>::new(capability);
    server_end.into_stream().map_err(|_| format_err!("Unable to create LogSinkRequestStream"))
}
/// Spawn a task to handle requests from components reading the shared log.
///
/// The spawned task is queued on `sender`; failure to queue is logged and
/// the task is dropped.
pub fn handle_log(self, stream: LogRequestStream, sender: mpsc::UnboundedSender<Task<()>>) {
    // Keep a handle for queuing before the original sender moves into the task.
    let queue = sender.clone();
    let task = Task::spawn(async move {
        if let Err(e) = self.handle_log_requests(stream, sender).await {
            warn!("error handling Log requests: {}", e);
        }
    });
    if let Err(e) = queue.unbounded_send(task) {
        warn!("Couldn't queue listener task: {:?}", e);
    }
}
/// Handle requests to `fuchsia.logger.Log`. All request types read the
/// whole backlog from memory, `DumpLogs(Safe)` stops listening after that.
///
/// Each request is normalized into (listener, options, dump_logs,
/// selectors) before a listener task is spawned on `sender`.
async fn handle_log_requests(
    self,
    mut stream: LogRequestStream,
    mut sender: mpsc::UnboundedSender<Task<()>>,
) -> Result<(), Error> {
    while let Some(request) = stream.try_next().await? {
        let (listener, options, dump_logs, selectors) = match request {
            LogRequest::ListenSafe { log_listener, options, .. } => {
                (log_listener, options, false, None)
            }
            LogRequest::DumpLogsSafe { log_listener, options, .. } => {
                (log_listener, options, true, None)
            }
            LogRequest::ListenSafeWithSelectors {
                log_listener, options, selectors, ..
            } => (log_listener, options, false, Some(selectors)),
            // TODO(fxbug.dev/48758) delete these methods!
            // Deprecated entry points are adapted onto the safe listener.
            LogRequest::Listen { log_listener, options, .. } => {
                warn!("Use of fuchsia.logger.Log.Listen. Use ListenSafe.");
                let listener = pretend_scary_listener_is_safe(log_listener)?;
                (listener, options, false, None)
            }
            LogRequest::DumpLogs { log_listener, options, .. } => {
                warn!("Use of fuchsia.logger.Log.DumpLogs. Use DumpLogsSafe.");
                let listener = pretend_scary_listener_is_safe(log_listener)?;
                (listener, options, true, None)
            }
        };
        let listener = Listener::new(listener, options)?;
        // Dump requests only snapshot the buffer; listens keep subscribing
        // for new messages after the snapshot.
        let mode =
            if dump_logs { StreamMode::Snapshot } else { StreamMode::SnapshotThenSubscribe };
        let logs = self.cursor(mode).await;
        if let Some(s) = selectors {
            self.inner.lock().await.interest_dispatcher.update_selectors(s).await;
        }
        // Queue the listener task; a closed receiver is ignored.
        sender.send(listener.spawn(logs, dump_logs)).await.ok();
    }
    Ok(())
}
/// Returns a stream over the buffered messages in the requested mode,
/// substituting a synthetic message wherever entries were rolled out of
/// the buffer.
pub async fn cursor(&self, mode: StreamMode) -> impl Stream<Item = Arc<Message>> {
    self.inner.lock().await.log_msg_buffer.cursor(mode).map(|item| {
        match item {
            // Rolled-out entries are reported as a synthetic "dropped" message.
            LazyItem::ItemsDropped(count) => Arc::new(Message::for_dropped(count)),
            LazyItem::Next(message) => message,
        }
    })
}
/// Ingest an individual log message: record its stats and append it to the
/// shared buffer.
async fn ingest_message(&self, log_msg: Message, source: stats::LogSource) {
    let mut guard = self.inner.lock().await;
    trace!("Ingesting {:?}", log_msg.id);
    // Stats are recorded before the message is buffered and listeners are
    // woken so that tests observe updated stats as soon as a message lands.
    guard.stats.record_log(&log_msg, source);
    guard.log_msg_buffer.push(log_msg);
}
/// Stop accepting new messages, ensuring that pending Cursors return Poll::Ready(None) after
/// consuming any messages received before this call.
pub async fn terminate(&self) {
    self.inner.lock().await.log_msg_buffer.terminate();
}
/// Initializes internal log forwarders.
///
/// Initialization happens on a detached background task; errors are
/// logged and otherwise swallowed.
pub fn forward_logs(self) {
    let init = fasync::Task::spawn(async move {
        if let Err(e) = self.init_forwarders().await {
            error!("couldn't forward logs: {:?}", e);
        }
    });
    init.detach();
    debug!("Log forwarding initialized.");
}
/// Connects to the `LogSink` service and hands each forwarder the write
/// half of a fresh datagram socket whose read half is registered with the
/// sink (legacy and structured encodings separately).
async fn init_forwarders(self) -> Result<(), Error> {
    let sink = fuchsia_component::client::connect_to_service::<LogSinkMarker>()?;
    let mut inner = self.inner.lock().await;

    let (legacy_tx, legacy_rx) = zx::Socket::create(zx::SocketOpts::DATAGRAM)?;
    sink.connect(legacy_rx)?;
    inner.legacy_forwarder.init(legacy_tx);

    let (structured_tx, structured_rx) = zx::Socket::create(zx::SocketOpts::DATAGRAM)?;
    sink.connect_structured(structured_rx)?;
    inner.structured_forwarder.init(structured_tx);
    Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::logs::{message::LegacySeverity, testing::*};
use diagnostics_data::{DROPPED_LABEL, MESSAGE_LABEL, PID_LABEL, TAG_LABEL, TID_LABEL};
use diagnostics_stream::{Argument, Record, Severity as StreamSeverity, Value};
use fidl_fuchsia_logger::{LogFilterOptions, LogLevelFilter, LogMessage, LogSinkMarker};
use fuchsia_inspect::assert_inspect_tree;
use fuchsia_zircon as zx;
use matches::assert_matches;
#[fasync::run_singlethreaded(test)]
async fn test_log_manager_simple() {
    // End-to-end smoke test through the harness, non-dump (listen) variant.
    TestHarness::new().manager_test(false).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_log_manager_dump() {
    // End-to-end smoke test through the harness, dump variant.
    TestHarness::new().manager_test(true).await;
}
#[fasync::run_singlethreaded(test)]
async fn unfiltered_stats() {
    // Send five packets from an unattributed source (2 warnings from the
    // default packet, 2 infos, 1 error per the severity edits below) and
    // verify the aggregated inspect stats.
    let first_packet = setup_default_packet();
    let first_message = LogMessage {
        pid: first_packet.metadata.pid,
        tid: first_packet.metadata.tid,
        time: first_packet.metadata.time,
        dropped_logs: first_packet.metadata.dropped_logs,
        severity: first_packet.metadata.severity,
        msg: String::from("BBBBB"),
        tags: vec![String::from("AAAAA")],
    };
    // Second: same as first but pid 0.
    let (mut second_packet, mut second_message) = (first_packet.clone(), first_message.clone());
    second_packet.metadata.pid = 0;
    second_message.pid = second_packet.metadata.pid;
    // Third and fourth: severity lowered to Info.
    let (mut third_packet, mut third_message) = (second_packet.clone(), second_message.clone());
    third_packet.metadata.severity = LogLevelFilter::Info.into_primitive().into();
    third_message.severity = third_packet.metadata.severity;
    let (fourth_packet, fourth_message) = (third_packet.clone(), third_message.clone());
    // Fifth: severity raised to Error.
    let (mut fifth_packet, mut fifth_message) = (fourth_packet.clone(), fourth_message.clone());
    fifth_packet.metadata.severity = LogLevelFilter::Error.into_primitive().into();
    fifth_message.severity = fifth_packet.metadata.severity;
    let mut harness = TestHarness::new();
    let mut stream = harness.create_stream(Arc::new(SourceIdentity::empty()));
    stream.write_packets(vec![
        first_packet,
        second_packet,
        third_packet,
        fourth_packet,
        fifth_packet,
    ]);
    let log_stats_tree = harness
        .filter_test(
            vec![first_message, second_message, third_message, fourth_message, fifth_message],
            None,
        )
        .await;
    assert_inspect_tree!(
        log_stats_tree,
        root: {
            log_stats: {
                total_logs: 5u64,
                kernel_logs: 0u64,
                logsink_logs: 5u64,
                trace_logs: 0u64,
                debug_logs: 0u64,
                info_logs: 2u64,
                warning_logs: 2u64,
                error_logs: 1u64,
                fatal_logs: 0u64,
                closed_streams: 0u64,
                unattributed_log_sinks: 1u64,
                by_component: { "(unattributed)": contains {
                    total_logs: 5u64,
                    trace_logs: 0u64,
                    debug_logs: 0u64,
                    info_logs: 2u64,
                    warning_logs: 2u64,
                    error_logs: 1u64,
                    fatal_logs: 0u64,
                } },
                buffer_stats: {
                    rolled_out_entries: 0u64,
                },
                granular_stats: contains {
                },
            },
        }
    );
}
// Shared body for the "two streams, different identities" tests: writes a
// warning through reader1 and an error through reader2, then asserts each
// message is attributed to its own component in the inspect stats.
async fn attributed_inspect_two_streams_different_identities_by_reader(
    mut harness: TestHarness,
    log_reader1: Arc<dyn LogReader>,
    log_reader2: Arc<dyn LogReader>,
) {
    let mut packet = setup_default_packet();
    let message = LogMessage {
        pid: packet.metadata.pid,
        tid: packet.metadata.tid,
        time: packet.metadata.time,
        dropped_logs: packet.metadata.dropped_logs,
        severity: packet.metadata.severity,
        msg: String::from("BBBBB"),
        tags: vec![String::from("AAAAA")],
    };
    // Second packet differs only in severity (Error).
    let mut packet2 = packet.clone();
    packet2.metadata.severity = LogLevelFilter::Error.into_primitive().into();
    let mut message2 = message.clone();
    message2.severity = packet2.metadata.severity;
    let mut foo_stream = harness.create_stream_from_log_reader(log_reader1.clone());
    foo_stream.write_packet(&mut packet);
    let mut bar_stream = harness.create_stream_from_log_reader(log_reader2.clone());
    bar_stream.write_packet(&mut packet2);
    let log_stats_tree = harness.filter_test(vec![message, message2], None).await;
    assert_inspect_tree!(
        log_stats_tree,
        root: {
            log_stats: {
                total_logs: 2u64,
                kernel_logs: 0u64,
                logsink_logs: 2u64,
                trace_logs: 0u64,
                debug_logs: 0u64,
                info_logs: 0u64,
                warning_logs: 1u64,
                error_logs: 1u64,
                fatal_logs: 0u64,
                closed_streams: 0u64,
                unattributed_log_sinks: 0u64,
                by_component: {
                    "http://foo.com": contains {
                        total_logs: 1u64,
                        trace_logs: 0u64,
                        debug_logs: 0u64,
                        info_logs: 0u64,
                        warning_logs: 1u64,
                        error_logs: 0u64,
                        fatal_logs: 0u64,
                    },
                    "http://bar.com": contains {
                        total_logs: 1u64,
                        trace_logs: 0u64,
                        debug_logs: 0u64,
                        info_logs: 0u64,
                        warning_logs: 0u64,
                        error_logs: 1u64,
                        fatal_logs: 0u64,
                    }
                },
                granular_stats: contains {
                },
                buffer_stats: {
                    rolled_out_entries: 0u64,
                }
            },
        }
    );
}
#[fasync::run_singlethreaded(test)]
async fn attributed_inspect_two_streams_different_identities() {
    // Both readers use the v1 (SourceIdentity) path.
    let harness = TestHarness::with_retained_sinks();
    let log_reader1 = harness.create_default_reader(SourceIdentity {
        component_name: Some("foo".into()),
        component_url: Some("http://foo.com".into()),
        instance_id: None,
        realm_path: None,
    });
    let log_reader2 = harness.create_default_reader(SourceIdentity {
        component_name: Some("bar".into()),
        component_url: Some("http://bar.com".into()),
        instance_id: None,
        realm_path: None,
    });
    attributed_inspect_two_streams_different_identities_by_reader(
        harness,
        log_reader1,
        log_reader2,
    )
    .await;
}
#[fasync::run_singlethreaded(test)]
async fn attributed_inspect_two_v2_streams_different_identities() {
    // Both readers use the components v2 event-stream path.
    let harness = TestHarness::with_retained_sinks();
    let log_reader1 = harness.create_event_stream_reader("foo", "http://foo.com");
    let log_reader2 = harness.create_event_stream_reader("bar", "http://bar.com");
    attributed_inspect_two_streams_different_identities_by_reader(
        harness,
        log_reader1,
        log_reader2,
    )
    .await;
}
#[fasync::run_singlethreaded(test)]
async fn attributed_inspect_two_mixed_streams_different_identities() {
    // One v2 event-stream reader and one v1 SourceIdentity reader.
    let harness = TestHarness::with_retained_sinks();
    let log_reader1 = harness.create_event_stream_reader("foo", "http://foo.com");
    let log_reader2 = harness.create_default_reader(SourceIdentity {
        component_name: Some("bar".into()),
        component_url: Some("http://bar.com".into()),
        instance_id: None,
        realm_path: None,
    });
    attributed_inspect_two_streams_different_identities_by_reader(
        harness,
        log_reader1,
        log_reader2,
    )
    .await;
}
#[fasync::run_singlethreaded(test)]
async fn source_identity_from_v2_event() {
    // The moniker/URL from a CapabilityRequested event should map onto the
    // component_name/component_url of the extracted SourceIdentity.
    let target_moniker = "foo".to_string();
    let target_url = "http://foo.com".to_string();
    let (_log_sink_proxy, log_sink_server_end) =
        fidl::endpoints::create_proxy::<LogSinkMarker>().unwrap();
    let event = create_capability_requested_event(
        target_moniker.clone(),
        target_url.clone(),
        log_sink_server_end.into_channel(),
    );
    let identity = LogManager::source_identity_from_event(&event).unwrap();
    assert_matches!(&identity.component_name,
        Some(component_name) if *component_name == target_moniker);
    assert_matches!(&identity.component_url,
        Some(component_url) if *component_url == target_url);
}
#[fasync::run_singlethreaded(test)]
async fn log_sink_request_stream_from_v2_event() {
    // A well-formed CapabilityRequested event should yield a request
    // stream without error.
    let target_moniker = "foo".to_string();
    let target_url = "http://foo.com".to_string();
    let (_log_sink_proxy, log_sink_server_end) =
        fidl::endpoints::create_proxy::<LogSinkMarker>().unwrap();
    let event = create_capability_requested_event(
        target_moniker.clone(),
        target_url.clone(),
        log_sink_server_end.into_channel(),
    );
    LogManager::log_sink_request_stream_from_event(event).unwrap();
}
// Shared body for the "two streams, same identity" tests: writes a warning
// and an error through two readers carrying the same identity, then asserts
// both messages aggregate under a single component entry.
async fn attributed_inspect_two_streams_same_identity_by_reader(
    mut harness: TestHarness,
    log_reader1: Arc<dyn LogReader>,
    log_reader2: Arc<dyn LogReader>,
) {
    let mut packet = setup_default_packet();
    let message = LogMessage {
        pid: packet.metadata.pid,
        tid: packet.metadata.tid,
        time: packet.metadata.time,
        dropped_logs: packet.metadata.dropped_logs,
        severity: packet.metadata.severity,
        msg: String::from("BBBBB"),
        tags: vec![String::from("AAAAA")],
    };
    // Second packet differs only in severity (Error).
    let mut packet2 = packet.clone();
    packet2.metadata.severity = LogLevelFilter::Error.into_primitive().into();
    let mut message2 = message.clone();
    message2.severity = packet2.metadata.severity;
    let mut foo_stream = harness.create_stream_from_log_reader(log_reader1.clone());
    foo_stream.write_packet(&mut packet);
    let mut bar_stream = harness.create_stream_from_log_reader(log_reader2.clone());
    bar_stream.write_packet(&mut packet2);
    let log_stats_tree = harness.filter_test(vec![message, message2], None).await;
    assert_inspect_tree!(
        log_stats_tree,
        root: {
            log_stats: {
                total_logs: 2u64,
                kernel_logs: 0u64,
                logsink_logs: 2u64,
                trace_logs: 0u64,
                debug_logs: 0u64,
                info_logs: 0u64,
                warning_logs: 1u64,
                error_logs: 1u64,
                fatal_logs: 0u64,
                closed_streams: 0u64,
                unattributed_log_sinks: 0u64,
                by_component: {
                    "http://foo.com": contains {
                        total_logs: 2u64,
                        trace_logs: 0u64,
                        debug_logs: 0u64,
                        info_logs: 0u64,
                        warning_logs: 1u64,
                        error_logs: 1u64,
                        fatal_logs: 0u64,
                    },
                },
                buffer_stats: {
                    rolled_out_entries: 0u64,
                },
                granular_stats: contains {
                },
            },
        }
    );
}
#[fasync::run_singlethreaded(test)]
async fn attributed_inspect_two_streams_same_identity() {
    // Same v1 reader used for both streams.
    let harness = TestHarness::with_retained_sinks();
    let log_reader = harness.create_default_reader(SourceIdentity {
        component_name: Some("foo".into()),
        component_url: Some("http://foo.com".into()),
        instance_id: None,
        realm_path: None,
    });
    attributed_inspect_two_streams_same_identity_by_reader(
        harness,
        log_reader.clone(),
        log_reader.clone(),
    )
    .await;
}
#[fasync::run_singlethreaded(test)]
async fn attributed_inspect_two_v2_streams_same_identity() {
    // Same v2 event-stream reader used for both streams.
    let harness = TestHarness::with_retained_sinks();
    let log_reader = harness.create_event_stream_reader("foo", "http://foo.com");
    attributed_inspect_two_streams_same_identity_by_reader(
        harness,
        log_reader.clone(),
        log_reader.clone(),
    )
    .await;
}
#[fasync::run_singlethreaded(test)]
async fn attributed_inspect_two_mixed_streams_same_identity() {
    // One v2 reader and one v1 reader carrying the same identity.
    let harness = TestHarness::with_retained_sinks();
    let log_reader1 = harness.create_event_stream_reader("foo", "http://foo.com");
    let log_reader2 = harness.create_default_reader(SourceIdentity {
        component_name: Some("foo".into()),
        component_url: Some("http://foo.com".into()),
        instance_id: None,
        realm_path: None,
    });
    attributed_inspect_two_streams_same_identity_by_reader(harness, log_reader1, log_reader2)
        .await;
}
#[fasync::run_singlethreaded(test)]
async fn test_filter_by_pid() {
    // Two packets with pids 1 and 0; filtering on pid 1 should pass only
    // the first message.
    let p = setup_default_packet();
    let mut p2 = p.clone();
    p2.metadata.pid = 0;
    let lm = LogMessage {
        pid: p.metadata.pid,
        tid: p.metadata.tid,
        time: p.metadata.time,
        dropped_logs: p.metadata.dropped_logs,
        severity: p.metadata.severity,
        msg: String::from("BBBBB"),
        tags: vec![String::from("AAAAA")],
    };
    let options = LogFilterOptions {
        filter_by_pid: true,
        pid: 1,
        filter_by_tid: false,
        tid: 0,
        min_severity: LogLevelFilter::None,
        verbosity: 0,
        tags: vec![],
    };
    let mut harness = TestHarness::new();
    let mut stream = harness.create_stream(Arc::new(SourceIdentity::empty()));
    stream.write_packets(vec![p, p2]);
    harness.filter_test(vec![lm], Some(options)).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_filter_by_tid() {
    // Two packets with tids 1 and 0; filtering on tid 1 should pass only
    // the first message.
    let mut p = setup_default_packet();
    p.metadata.pid = 0;
    let mut p2 = p.clone();
    p2.metadata.tid = 0;
    let lm = LogMessage {
        pid: p.metadata.pid,
        tid: p.metadata.tid,
        time: p.metadata.time,
        dropped_logs: p.metadata.dropped_logs,
        severity: p.metadata.severity,
        msg: String::from("BBBBB"),
        tags: vec![String::from("AAAAA")],
    };
    let options = LogFilterOptions {
        filter_by_pid: false,
        pid: 1,
        filter_by_tid: true,
        tid: 1,
        min_severity: LogLevelFilter::None,
        verbosity: 0,
        tags: vec![],
    };
    let mut harness = TestHarness::new();
    let mut stream = harness.create_stream(Arc::new(SourceIdentity::empty()));
    stream.write_packets(vec![p, p2]);
    harness.filter_test(vec![lm], Some(options)).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_filter_by_min_severity() {
    // Five packets spanning severities; min_severity = Error should pass
    // only the Error-level message (p2).
    let p = setup_default_packet();
    let mut p2 = p.clone();
    p2.metadata.pid = 0;
    p2.metadata.tid = 0;
    p2.metadata.severity = LogLevelFilter::Error.into_primitive().into();
    let mut p3 = p.clone();
    p3.metadata.severity = LogLevelFilter::Info.into_primitive().into();
    let mut p4 = p.clone();
    p4.metadata.severity = 0x70; // custom
    let mut p5 = p.clone();
    p5.metadata.severity = LogLevelFilter::Fatal.into_primitive().into();
    let lm = LogMessage {
        pid: p2.metadata.pid,
        tid: p2.metadata.tid,
        time: p2.metadata.time,
        dropped_logs: p2.metadata.dropped_logs,
        severity: p2.metadata.severity,
        msg: String::from("BBBBB"),
        tags: vec![String::from("AAAAA")],
    };
    let options = LogFilterOptions {
        filter_by_pid: false,
        pid: 1,
        filter_by_tid: false,
        tid: 1,
        min_severity: LogLevelFilter::Error,
        verbosity: 0,
        tags: vec![],
    };
    let mut harness = TestHarness::new();
    let mut stream = harness.create_stream(Arc::new(SourceIdentity::empty()));
    stream.write_packets(vec![p, p2, p3, p4, p5]);
    harness.filter_test(vec![lm], Some(options)).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_filter_by_combination() {
    // Combined pid + min_severity filter: only p2 (pid 0, Error) matches
    // both conditions.
    let mut p = setup_default_packet();
    p.metadata.pid = 0;
    p.metadata.tid = 0;
    let mut p2 = p.clone();
    p2.metadata.severity = LogLevelFilter::Error.into_primitive().into();
    let mut p3 = p.clone();
    p3.metadata.pid = 1;
    let lm = LogMessage {
        pid: p2.metadata.pid,
        tid: p2.metadata.tid,
        time: p2.metadata.time,
        dropped_logs: p2.metadata.dropped_logs,
        severity: p2.metadata.severity,
        msg: String::from("BBBBB"),
        tags: vec![String::from("AAAAA")],
    };
    let options = LogFilterOptions {
        filter_by_pid: true,
        pid: 0,
        filter_by_tid: false,
        tid: 1,
        min_severity: LogLevelFilter::Error,
        verbosity: 0,
        tags: vec![],
    };
    let mut harness = TestHarness::new();
    let mut stream = harness.create_stream(Arc::new(SourceIdentity::empty()));
    stream.write_packets(vec![p, p2, p3]);
    harness.filter_test(vec![lm], Some(options)).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_filter_by_tags() {
    // Tag filter matches "BBBBB" or "DDDDD": p carries tag "DDDDD" and p2
    // carries tags "AAAAA"/"BBBBB", so both messages should pass.
    let mut p = setup_default_packet();
    let mut p2 = p.clone();
    // p tags - "DDDDD"
    memset(&mut p.data[..], 1, 68, 5);
    p2.metadata.pid = 0;
    p2.metadata.tid = 0;
    p2.data[6] = 5;
    // p2 tag - "AAAAA", "BBBBB"
    // p2 msg - "CCCCC"
    memset(&mut p2.data[..], 13, 67, 5);
    let lm1 = LogMessage {
        pid: p.metadata.pid,
        tid: p.metadata.tid,
        time: p.metadata.time,
        dropped_logs: p.metadata.dropped_logs,
        severity: p.metadata.severity,
        msg: String::from("BBBBB"),
        tags: vec![String::from("DDDDD")],
    };
    let lm2 = LogMessage {
        pid: p2.metadata.pid,
        tid: p2.metadata.tid,
        time: p2.metadata.time,
        dropped_logs: p2.metadata.dropped_logs,
        severity: p2.metadata.severity,
        msg: String::from("CCCCC"),
        tags: vec![String::from("AAAAA"), String::from("BBBBB")],
    };
    let options = LogFilterOptions {
        filter_by_pid: false,
        pid: 1,
        filter_by_tid: false,
        tid: 1,
        min_severity: LogLevelFilter::None,
        verbosity: 0,
        tags: vec![String::from("BBBBB"), String::from("DDDDD")],
    };
    let mut harness = TestHarness::new();
    let mut stream = harness.create_stream(Arc::new(SourceIdentity::empty()));
    stream.write_packets(vec![p, p2]);
    harness.filter_test(vec![lm1, lm2], Some(options)).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_structured_log() {
    // Feed structured-encoding records through the manager and verify the
    // legacy LogMessages they convert into, including defaults for missing
    // pid/tid/tags and accumulation of repeated tag arguments.
    let logs = vec![
        // Minimal record: only a message argument.
        Record {
            timestamp: 6,
            severity: StreamSeverity::Info,
            arguments: vec![Argument {
                name: MESSAGE_LABEL.into(),
                value: Value::Text("hi".to_string()),
            }],
        },
        // Record with no arguments at all.
        Record { timestamp: 14, severity: StreamSeverity::Error, arguments: vec![] },
        // Fully populated record: pid, tid, dropped count, tag, message.
        Record {
            timestamp: 19,
            severity: StreamSeverity::Warn,
            arguments: vec![
                Argument { name: PID_LABEL.into(), value: Value::UnsignedInt(0x1d1) },
                Argument { name: TID_LABEL.into(), value: Value::UnsignedInt(0x1d2) },
                Argument { name: DROPPED_LABEL.into(), value: Value::UnsignedInt(23) },
                Argument { name: TAG_LABEL.into(), value: Value::Text(String::from("tag")) },
                Argument {
                    name: MESSAGE_LABEL.into(),
                    value: Value::Text(String::from("message")),
                },
            ],
        },
        // Record with multiple tag arguments.
        Record {
            timestamp: 21,
            severity: StreamSeverity::Warn,
            arguments: vec![
                Argument { name: TAG_LABEL.into(), value: Value::Text(String::from("tag-1")) },
                Argument { name: TAG_LABEL.into(), value: Value::Text(String::from("tag-2")) },
            ],
        },
    ];
    let expected_logs = vec![
        LogMessage {
            pid: zx::sys::ZX_KOID_INVALID,
            tid: zx::sys::ZX_KOID_INVALID,
            time: 6,
            severity: LegacySeverity::Info.for_listener(),
            dropped_logs: 0,
            msg: String::from("hi"),
            tags: vec!["UNKNOWN".to_owned()],
        },
        LogMessage {
            pid: zx::sys::ZX_KOID_INVALID,
            tid: zx::sys::ZX_KOID_INVALID,
            time: 14,
            severity: LegacySeverity::Error.for_listener(),
            dropped_logs: 0,
            msg: String::from(""),
            tags: vec!["UNKNOWN".to_owned()],
        },
        LogMessage {
            pid: 0x1d1,
            tid: 0x1d2,
            time: 19,
            severity: LegacySeverity::Warn.for_listener(),
            dropped_logs: 23,
            msg: String::from("message"),
            tags: vec![String::from("tag")],
        },
        LogMessage {
            pid: zx::sys::ZX_KOID_INVALID,
            tid: zx::sys::ZX_KOID_INVALID,
            time: 21,
            severity: LegacySeverity::Warn.for_listener(),
            dropped_logs: 0,
            msg: String::from(""),
            tags: vec![String::from("tag-1"), String::from("tag-2")],
        },
    ];
    let mut harness = TestHarness::new();
    let mut stream = harness.create_structured_stream(Arc::new(SourceIdentity::empty()));
    stream.write_packets(logs);
    harness.filter_test(expected_logs, None).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_debuglog_drainer() {
    // Three kernel log entries — one arriving after a SHOULD_WAIT — should
    // all be ingested, tagged "klog", and attributed to fuchsia-boot://klog.
    let log1 = TestDebugEntry::new("log1".as_bytes());
    let log2 = TestDebugEntry::new("log2".as_bytes());
    let log3 = TestDebugEntry::new("log3".as_bytes());
    let klog_reader = TestDebugLog::new();
    klog_reader.enqueue_read_entry(&log1);
    klog_reader.enqueue_read_entry(&log2);
    // logs received after the kernel indicates no more logs should still be read
    klog_reader.enqueue_read_fail(zx::Status::SHOULD_WAIT);
    klog_reader.enqueue_read_entry(&log3);
    klog_reader.enqueue_read_fail(zx::Status::SHOULD_WAIT);
    let expected_logs = vec![
        LogMessage {
            pid: log1.pid,
            tid: log1.tid,
            time: log1.timestamp,
            dropped_logs: 0,
            severity: fidl_fuchsia_logger::LogLevelFilter::Info as i32,
            msg: String::from("log1"),
            tags: vec![String::from("klog")],
        },
        LogMessage {
            pid: log2.pid,
            tid: log2.tid,
            time: log2.timestamp,
            dropped_logs: 0,
            severity: fidl_fuchsia_logger::LogLevelFilter::Info as i32,
            msg: String::from("log2"),
            tags: vec![String::from("klog")],
        },
        LogMessage {
            pid: log3.pid,
            tid: log3.tid,
            time: log3.timestamp,
            dropped_logs: 0,
            severity: fidl_fuchsia_logger::LogLevelFilter::Info as i32,
            msg: String::from("log3"),
            tags: vec![String::from("klog")],
        },
    ];
    let klog_stats_tree = debuglog_test(expected_logs, klog_reader).await;
    assert_inspect_tree!(
        klog_stats_tree,
        root: {
            log_stats: contains {
                total_logs: 3u64,
                kernel_logs: 3u64,
                logsink_logs: 0u64,
                trace_logs: 0u64,
                debug_logs: 0u64,
                info_logs: 3u64,
                warning_logs: 0u64,
                error_logs: 0u64,
                fatal_logs: 0u64,
                closed_streams: 0u64,
                unattributed_log_sinks: 0u64,
                by_component: {
                    "fuchsia-boot://klog": contains {
                        total_logs: 3u64,
                        trace_logs: 0u64,
                        debug_logs: 0u64,
                        info_logs: 3u64,
                        warning_logs: 0u64,
                        error_logs: 0u64,
                        fatal_logs: 0u64,
                    },
                }
            }
        }
    );
}
}