blob: 0f331aca50148659655b77a04b80fcb3e0bdbb96 [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
use crate::PublishOptions;
use fidl_fuchsia_diagnostics::{Interest, Severity};
use fidl_fuchsia_logger::{LogSinkMarker, LogSinkProxy};
use fuchsia_async as fasync;
use fuchsia_component::client::connect_to_protocol;
use fuchsia_zircon::{self as zx};
use std::{cell::Cell, collections::HashSet, fmt::Debug, ops::Deref, sync::OnceLock};
use thiserror::Error;
use tracing::{
span::{Attributes, Id, Record},
subscriber::Subscriber,
Event, Metadata,
};
use tracing_core::span::Current;
use tracing_subscriber::{layer::Layered, prelude::*, registry::Registry};
mod filter;
mod sink;
use filter::InterestFilter;
use sink::{Sink, SinkConfig};
pub use diagnostics_log_encoding::{encode::TestRecord, Metatag};
pub use paste::paste;
#[cfg(test)]
use std::{
sync::atomic::{AtomicI64, Ordering},
time::Duration,
};
/// Callback for interest listeners
pub trait OnInterestChanged {
/// Callback for when the interest changes
fn on_changed(&self, severity: &Severity);
}
/// Options to configure a `Publisher`.
pub struct PublisherOptions<'t> {
blocking: bool,
pub(crate) interest: Interest,
listen_for_interest_updates: bool,
log_sink_proxy: Option<LogSinkProxy>,
pub(crate) metatags: HashSet<Metatag>,
pub(crate) tags: &'t [&'t str],
wait_for_initial_interest: bool,
}
impl<'t> Default for PublisherOptions<'t> {
fn default() -> Self {
Self {
blocking: false,
interest: Interest::default(),
listen_for_interest_updates: true,
log_sink_proxy: None,
metatags: HashSet::new(),
tags: &[],
wait_for_initial_interest: true,
}
}
}
impl PublisherOptions<'_> {
/// Creates a `PublishOptions` with all sets either empty or set to false. This is
/// useful when fine grain control of `Publisher` and its behavior is necessary.
///
/// However, for the majority of binaries that "just want to log",
/// `PublishOptions::default` is preferred as that brings all the default
/// configuration that is desired in most scenarios.
pub fn empty() -> Self {
Self {
blocking: false,
interest: Interest::default(),
listen_for_interest_updates: false,
log_sink_proxy: None,
metatags: HashSet::new(),
tags: &[],
wait_for_initial_interest: false,
}
}
}
macro_rules! publisher_options {
($(($name:ident, $self:ident, $($self_arg:ident),*)),*) => {
$(
impl<'t> $name<'t> {
/// Whether or not to block on initial runtime interest being received before
/// starting to emit log records using the default interest configured.
///
/// It's recommended that this is set when
/// developing to guarantee that a dynamically configured minimum severity makes it
/// to the component before it starts emitting logs.
///
/// Default: true.
pub fn wait_for_initial_interest(mut $self, enable: bool) -> Self {
let this = &mut $self$(.$self_arg)*;
this.wait_for_initial_interest = enable;
$self
}
/// When set, a `fuchsia_async::Task` will be spawned and held that will be
/// listening for interest changes.
///
/// Default: true
pub fn listen_for_interest_updates(mut $self, enable: bool) -> Self {
let this = &mut $self$(.$self_arg)*;
this.listen_for_interest_updates = enable;
$self
}
/// Sets the `LogSink` that will be used.
///
/// Default: the `fuchsia.logger.LogSink` available in the incoming namespace.
pub fn use_log_sink(mut $self, proxy: LogSinkProxy) -> Self {
let this = &mut $self$(.$self_arg)*;
this.log_sink_proxy = Some(proxy);
$self
}
/// When set to true, writes to the log socket will be blocking. This is, we'll
/// retry every time the socket buffer is full until we are able to write the log.
///
/// Default: false
pub fn blocking(mut $self, is_blocking: bool) -> Self {
let this = &mut $self$(.$self_arg)*;
this.blocking = is_blocking;
$self
}
}
)*
};
}
publisher_options!((PublisherOptions, self,), (PublishOptions, self, publisher));
fn initialize_publishing(opts: PublishOptions<'_>) -> Result<Publisher, PublishError> {
let publisher = Publisher::new(opts.publisher)?;
if opts.ingest_log_events {
crate::ingest_log_events()?;
}
if opts.install_panic_hook {
crate::install_panic_hook();
}
Ok(publisher)
}
/// Initializes logging with the given options.
///
/// IMPORTANT: this should be called at most once in a program, and must be
/// called only after an async executor has been set for the current thread,
/// otherwise it'll return errors or panic. Therefore it's recommended to never
/// call this from libraries and only do it from binaries.
pub fn initialize(opts: PublishOptions<'_>) -> Result<(), PublishError> {
let publisher = initialize_publishing(opts)?;
tracing::subscriber::set_global_default(publisher)?;
Ok(())
}
/// Sets the global minimum log severity.
/// IMPORTANT: this function can panic if `initialize` wasn't called before.
pub fn set_minimum_severity(severity: Severity) {
tracing::dispatcher::get_default(|dispatcher| {
let publisher: &Publisher = dispatcher.downcast_ref().unwrap();
let filter: &InterestFilter =
(&*publisher.inner as &dyn Subscriber).downcast_ref().unwrap();
filter.set_minimum_severity(severity);
});
}
struct AbortAndJoinOnDrop(
Option<futures::future::AbortHandle>,
Option<std::thread::JoinHandle<()>>,
);
impl Drop for AbortAndJoinOnDrop {
fn drop(&mut self) {
if let Some(handle) = &mut self.0 {
handle.abort();
}
self.1.take().unwrap().join().unwrap();
}
}
/// Initializes logging with the given options.
///
/// This must be used when working in an environment where a [`fuchsia_async::Executor`] can't be
/// used.
///
/// IMPORTANT: this should be called at most once in a program, and must be
/// called only after an async executor has been set for the current thread,
/// otherwise it'll return errors or panic. Therefore it's recommended to never
/// call this from libraries and only do it from binaries.
pub fn initialize_sync(opts: PublishOptions<'_>) -> impl Drop {
let (send, recv) = std::sync::mpsc::channel();
let (ready_send, ready_recv) = {
let (snd, rcv) = std::sync::mpsc::channel();
if opts.publisher.wait_for_initial_interest {
(Some(snd), Some(rcv))
} else {
(None, None)
}
};
let PublishOptions {
publisher:
PublisherOptions {
blocking,
interest,
metatags,
listen_for_interest_updates,
log_sink_proxy,
tags,
wait_for_initial_interest,
},
ingest_log_events,
install_panic_hook,
} = opts;
let tags = tags.into_iter().map(|s| s.to_string()).collect::<Vec<_>>();
let bg_thread = std::thread::spawn(move || {
let options = PublishOptions {
publisher: PublisherOptions {
interest,
metatags,
tags: &tags.iter().map(String::as_ref).collect::<Vec<_>>(),
listen_for_interest_updates,
log_sink_proxy,
wait_for_initial_interest,
blocking,
},
ingest_log_events,
install_panic_hook,
};
let mut exec = fuchsia_async::LocalExecutor::new();
let mut publisher = initialize_publishing(options).expect("initialize logging");
if let Some(ready_send) = ready_send {
ready_send.send(()).unwrap();
}
let interest_listening_task = publisher.take_interest_listening_task();
tracing::subscriber::set_global_default(publisher).expect("set global tracing subscriber");
if let Some(on_interest_changes) = interest_listening_task {
let (on_interest_changes, cancel_interest) =
futures::future::abortable(on_interest_changes);
send.send(cancel_interest).unwrap();
drop(send);
exec.run_singlethreaded(on_interest_changes).ok();
}
});
if let Some(ready_recv) = ready_recv {
let _ = ready_recv.recv();
}
AbortAndJoinOnDrop(recv.recv().map_or(None, |value| Some(value)), Some(bg_thread))
}
/// This custom Lazy implementation can be replaced by LazyLock once that is stabilized:
/// https://github.com/rust-lang/rust/issues/109736
struct Lazy<T, F> {
cell: OnceLock<T>,
init: Cell<Option<F>>,
}
// SAFETY: We never create a `&F` from a `&Lazy<T, F>` so it is fine to not impl `Sync`
// for `F`. We do create a `&mut Option<F>` in `deref`, but that is properly synchronized
// under `get_or_init`, so is safe.
unsafe impl<T, F: Send> Sync for Lazy<T, F> where OnceLock<T>: Sync {}
impl<T, F: FnOnce() -> T> Lazy<T, F> {
fn new(f: F) -> Self {
Self { cell: OnceLock::new(), init: Cell::new(Some(f)) }
}
}
impl<T, F: FnOnce() -> T> Deref for Lazy<T, F> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.cell.get_or_init(|| match self.init.take() {
Some(f) => f(),
None => panic!("Lazy poisoned"),
})
}
}
/// A `Publisher` acts as broker, implementing [`tracing::Subscriber`] to receive diagnostic
/// events from a component, and then forwarding that data on to a diagnostics service.
pub struct Publisher {
inner: Lazy<
Layered<InterestFilter, Layered<Sink, Registry>>,
Box<dyn FnOnce() -> Layered<InterestFilter, Layered<Sink, Registry>> + Send>,
>,
interest_listening_task: Option<fasync::Task<()>>,
}
impl Default for Publisher {
fn default() -> Self {
Self::new(PublisherOptions::default()).expect("failed to create Publisher")
}
}
impl Publisher {
/// Construct a new `Publisher` using the given options.
///
/// If options such as `install_panic_hook` and `ingest_log_events` are enabled, then this
/// constructor should be called only once.
pub fn new(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
let proxy = match opts.log_sink_proxy {
Some(log_sink) => log_sink,
None => connect_to_protocol::<LogSinkMarker>()
.map_err(|e| e.to_string())
.map_err(PublishError::LogSinkConnect)?,
};
let sink = Sink::new(
&proxy,
SinkConfig {
tags: opts.tags.into_iter().map(|s| s.to_string()).collect(),
metatags: opts.metatags,
retry_on_buffer_full: opts.blocking,
},
)?;
let (filter, on_change) =
InterestFilter::new(proxy, opts.interest, opts.wait_for_initial_interest);
let interest_listening_task = if opts.listen_for_interest_updates {
Some(fasync::Task::spawn(on_change))
} else {
None
};
Ok(Self {
inner: Lazy::new(Box::new(move || Registry::default().with(sink).with(filter))),
interest_listening_task,
})
}
// TODO(https://fxbug.dev/42150573) delete this and make Publisher private
/// Publish the provided event for testing.
pub fn event_for_testing(&self, record: TestRecord<'_>) {
let filter: &InterestFilter = (&*self.inner as &dyn Subscriber).downcast_ref().unwrap();
if filter.enabled_for_testing(&record) {
let sink: &Sink = (&*self.inner as &dyn Subscriber).downcast_ref().unwrap();
sink.event_for_testing(record);
}
}
/// Registers an interest listener
pub fn set_interest_listener<T>(&self, listener: T)
where
T: OnInterestChanged + Send + Sync + 'static,
{
let filter: &InterestFilter = (&*self.inner as &dyn Subscriber).downcast_ref().unwrap();
filter.set_interest_listener(listener);
}
/// Takes the task listening for interest changes if one exists.
pub fn take_interest_listening_task(&mut self) -> Option<fasync::Task<()>> {
self.interest_listening_task.take()
}
}
impl Subscriber for Publisher {
fn enabled(&self, metadata: &Metadata<'_>) -> bool {
self.inner.enabled(metadata)
}
fn new_span(&self, span: &Attributes<'_>) -> Id {
self.inner.new_span(span)
}
fn record(&self, span: &Id, values: &Record<'_>) {
self.inner.record(span, values)
}
fn record_follows_from(&self, span: &Id, follows: &Id) {
self.inner.record_follows_from(span, follows)
}
fn event(&self, event: &Event<'_>) {
self.inner.event(event)
}
fn enter(&self, span: &Id) {
self.inner.enter(span)
}
fn exit(&self, span: &Id) {
self.inner.exit(span)
}
fn register_callsite(
&self,
metadata: &'static Metadata<'static>,
) -> tracing::subscriber::Interest {
self.inner.register_callsite(metadata)
}
fn clone_span(&self, id: &Id) -> Id {
self.inner.clone_span(id)
}
fn try_close(&self, id: Id) -> bool {
self.inner.try_close(id)
}
fn current_span(&self) -> Current {
self.inner.current_span()
}
}
/// Errors arising while forwarding a diagnostics stream to the environment.
#[derive(Debug, Error)]
pub enum PublishError {
/// Connection to fuchsia.logger.LogSink failed.
#[error("failed to connect to fuchsia.logger.LogSink ({0})")]
LogSinkConnect(String),
/// Couldn't create a new socket.
#[error("failed to create a socket for logging")]
MakeSocket(#[source] zx::Status),
/// An issue with the LogSink channel or socket prevented us from sending it to the `LogSink`.
#[error("failed to send a socket to the LogSink")]
SendSocket(#[source] fidl::Error),
/// Setting the default global [`tracing::Subscriber`] failed.
#[error("failed to install forwarder as the global default")]
SetGlobalDefault(#[from] tracing::subscriber::SetGlobalDefaultError),
/// Installing a forwarder from [`log`] macros to [`tracing`] macros failed.
#[error("failed to install a forwarder from `log` to `tracing`")]
InitLogForward(#[from] tracing_log::log_tracer::SetLoggerError),
}
#[cfg(test)]
static CURRENT_TIME_NANOS: AtomicI64 = AtomicI64::new(Duration::from_secs(10).as_nanos() as i64);
/// Increments the test clock.
#[cfg(test)]
pub fn increment_clock(duration: Duration) {
CURRENT_TIME_NANOS.fetch_add(duration.as_nanos() as i64, Ordering::SeqCst);
}
/// Gets the current monotonic time in nanoseconds.
#[doc(hidden)]
pub fn get_now() -> i64 {
#[cfg(not(test))]
return zx::Time::get_monotonic().into_nanos();
#[cfg(test)]
CURRENT_TIME_NANOS.load(Ordering::Relaxed)
}
/// Logs every N seconds using an Atomic variable
/// to keep track of the time. This will have a higher
/// performance impact on ARM compared to regular logging due to the use
/// of an atomic.
#[macro_export]
macro_rules! log_every_n_seconds {
($seconds:expr, $severity:expr, $($arg:tt)*) => {
use std::{time::Duration, sync::atomic::{Ordering, AtomicI64}};
use diagnostics_log::{paste, fuchsia::get_now};
let now = get_now();
static LAST_LOG_TIMESTAMP: AtomicI64 = AtomicI64::new(0);
if now - LAST_LOG_TIMESTAMP.load(Ordering::Acquire) >= Duration::from_secs($seconds).as_nanos() as i64 {
paste! {
tracing::[< $severity:lower >]!($($arg)*);
}
LAST_LOG_TIMESTAMP.store(now, Ordering::Release);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use diagnostics_reader::{ArchiveReader, Logs};
use futures::{future, StreamExt};
use itertools::Itertools;
use tracing::{debug, info, info_span};
#[fuchsia::test(logging = false)]
async fn verify_setting_minimum_log_severity() {
let reader = ArchiveReader::new();
let (logs, _) = reader.snapshot_then_subscribe::<Logs>().unwrap().split_streams();
let publisher = Publisher::new(PublisherOptions {
tags: &["verify_setting_minimum_log_severity"],
..PublisherOptions::empty()
})
.expect("initialized tracing");
tracing::subscriber::with_default(publisher, || {
info!("I'm an info log");
debug!("I'm a debug log and won't show up");
set_minimum_severity(Severity::Debug);
debug!("I'm a debug log and I show up");
});
let results = logs
.filter(|data| {
future::ready(
data.tags().unwrap().iter().any(|t| t == "verify_setting_minimum_log_severity"),
)
})
.take(2)
.collect::<Vec<_>>()
.await;
assert_eq!(results[0].msg().unwrap(), "I'm an info log");
assert_eq!(results[1].msg().unwrap(), "I'm a debug log and I show up");
}
#[fuchsia::test]
async fn verify_nested_spans() {
let reader = ArchiveReader::new();
let (logs, _) = reader.snapshot_then_subscribe::<Logs>().unwrap().split_streams();
let s1 = info_span!("", key = "span1");
info!("Log with no span 1");
{
let _s1_guard = s1.enter();
info!("Log with s1");
{
let s2 = info_span!("", other = "span2");
let _s2_guard = s2.enter();
info!("Log with s1 and s2");
}
info!("Second log with s1");
}
info!("Log with no span 2");
let results = logs
.filter(|data| {
future::ready(data.tags().unwrap().iter().any(|t| t == "verify_nested_spans"))
})
.take(5)
.collect::<Vec<_>>()
.await;
assert_eq!(results[0].msg().unwrap(), "Log with no span 1");
assert!(results[0].payload_keys_strings().collect::<Vec<_>>().is_empty());
assert_eq!(results[1].msg().unwrap(), "Log with s1");
assert_eq!(
results[1].payload_keys_strings().collect::<Vec<_>>(),
vec!["key=span1".to_string()]
);
assert_eq!(results[2].msg().unwrap(), "Log with s1 and s2");
assert_eq!(
results[2].payload_keys_strings().sorted().collect::<Vec<_>>(),
vec!["key=span1".to_string(), "other=span2".to_string()]
);
assert_eq!(results[3].msg().unwrap(), "Second log with s1");
assert_eq!(
results[3].payload_keys_strings().collect::<Vec<_>>(),
vec!["key=span1".to_string()]
);
assert_eq!(results[4].msg().unwrap(), "Log with no span 2");
assert!(results[4].payload_keys_strings().collect::<Vec<_>>().is_empty());
}
#[fuchsia::test]
async fn verify_sibling_spans_nested_scopes() {
let reader = ArchiveReader::new();
let (logs, _) = reader.snapshot_then_subscribe::<Logs>().unwrap().split_streams();
let s1 = info_span!("", key = "span1");
let s2 = info_span!("", other = "span2");
info!("Log with no span 1");
{
let _s1_guard = s1.enter();
info!("Log with s1");
{
let _s2_guard = s2.enter();
info!("Log with s2 only");
}
info!("Second log with s1");
}
info!("Log with no span 2");
let results = logs
.filter(|data| {
future::ready(
data.tags().unwrap().iter().any(|t| t == "verify_sibling_spans_nested_scopes"),
)
})
.take(5)
.collect::<Vec<_>>()
.await;
assert_eq!(results[0].msg().unwrap(), "Log with no span 1");
assert!(results[0].payload_keys_strings().collect::<Vec<_>>().is_empty());
assert_eq!(results[1].msg().unwrap(), "Log with s1");
assert_eq!(
results[1].payload_keys_strings().collect::<Vec<_>>(),
vec!["key=span1".to_string()]
);
assert_eq!(results[2].msg().unwrap(), "Log with s2 only");
assert_eq!(
results[2].payload_keys_strings().sorted().collect::<Vec<_>>(),
vec!["other=span2".to_string()]
);
assert_eq!(results[3].msg().unwrap(), "Second log with s1");
assert_eq!(
results[3].payload_keys_strings().collect::<Vec<_>>(),
vec!["key=span1".to_string()]
);
assert_eq!(results[4].msg().unwrap(), "Log with no span 2");
assert!(results[4].payload_keys_strings().collect::<Vec<_>>().is_empty());
}
#[fuchsia::test]
async fn verify_sibling_spans_multithreaded() {
let reader = ArchiveReader::new();
let (logs, _) = reader.snapshot_then_subscribe::<Logs>().unwrap().split_streams();
let total_threads = 300;
for i in 0..total_threads {
std::thread::spawn(move || {
let s = info_span!("", thread = i);
let _s_guard = s.enter();
info!("Log from thread");
});
}
let mut results = logs
.filter(|data| {
future::ready(
data.tags().unwrap().iter().any(|t| t == "verify_sibling_spans_multithreaded"),
)
})
.take(total_threads);
let mut seen = vec![];
while let Some(log) = results.next().await {
assert_eq!(log.msg().unwrap(), "Log from thread");
let hierarchy = log.payload_keys().unwrap();
assert_eq!(hierarchy.properties.len(), 1);
assert_eq!(hierarchy.properties[0].name(), "thread");
seen.push(hierarchy.properties[0].uint().unwrap() as usize);
}
seen.sort();
assert_eq!(seen, (0..total_threads).collect::<Vec<_>>());
}
}