// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::{
events::{
router::EventConsumer,
types::{Event, EventPayload, LogSinkRequestedPayload},
},
identity::ComponentIdentity,
logs::{
budget::BudgetManager,
container::LogsArtifactsContainer,
debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY},
multiplex::{Multiplexer, MultiplexerHandle},
stored_message::StoredMessage,
},
severity_filter::KlogSeverityFilter,
};
use diagnostics_data::LogsData;
use fidl_fuchsia_diagnostics::{LogInterestSelector, Selector, Severity, StreamMode};
use fuchsia_async as fasync;
use fuchsia_inspect as inspect;
use fuchsia_sync::{Mutex, RwLock};
use fuchsia_trace as ftrace;
use futures::channel::mpsc;
use futures::prelude::*;
use lazy_static::lazy_static;
use moniker::{Moniker, MonikerBase};
use selectors::SelectorExt;
use std::{
collections::{BTreeMap, HashMap},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use tracing::{debug, error};
lazy_static! {
pub static ref INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(0);
pub static ref OUR_MONIKER: Moniker = Moniker::parse_str("bootstrap/archivist").unwrap();
}
/// LogsRepository holds all diagnostics data and is a singleton wrapped by multiple
/// [`pipeline::Pipeline`]s in a given Archivist instance.
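///
/// A minimal usage sketch (hypothetical: the byte budget, the inspect node, and `identity`
/// are illustrative values, not the ones Archivist actually uses):
///
/// ```ignore
/// // Create the singleton repository under an inspect node.
/// let inspector = fuchsia_inspect::Inspector::default();
/// let repo = LogsRepository::new(1024 * 1024, inspector.root());
/// // Hand out a per-component container as a LogSink connection arrives.
/// let container = repo.get_log_container(identity);
/// // Serve a snapshot of the merged log stream.
/// let cursor = repo.logs_cursor(StreamMode::Snapshot, None, ftrace::Id::random());
/// ```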
pub struct LogsRepository {
log_sender: RwLock<mpsc::UnboundedSender<fasync::Task<()>>>,
mutable_state: RwLock<LogsRepositoryState>,
/// Task that processes component-removal requests emitted by the budget handler. Handling
/// these on a separate task prevents a deadlock. It's behind a mutex, but it's only set once.
/// Holds an Arc<Self>.
component_removal_task: Mutex<Option<fasync::Task<()>>>,
}
impl LogsRepository {
pub fn new(logs_max_cached_original_bytes: u64, parent: &fuchsia_inspect::Node) -> Arc<Self> {
let (remover_snd, remover_rcv) = mpsc::unbounded();
let logs_budget = BudgetManager::new(logs_max_cached_original_bytes as usize, remover_snd);
let (log_sender, log_receiver) = mpsc::unbounded();
let this = Arc::new(LogsRepository {
mutable_state: RwLock::new(LogsRepositoryState::new(logs_budget, log_receiver, parent)),
log_sender: RwLock::new(log_sender),
component_removal_task: Mutex::new(None),
});
*this.component_removal_task.lock() = Some(fasync::Task::spawn(
Self::process_removal_of_components(remover_rcv, Arc::clone(&this)),
));
this
}
/// Drain the kernel's debug log. Spawns a task that ingests all existing messages and then
/// listens for new ones; does nothing if a drain task is already running.
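///
/// Sketch of a typical call (hypothetical: `KernelDebugLog` stands in for whichever
/// `DebugLog` implementation the caller provides):
///
/// ```ignore
/// let reader = KernelDebugLog::new().await?;
/// repo.drain_debuglog(reader);
/// ```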
pub fn drain_debuglog<K>(&self, klog_reader: K)
where
K: DebugLog + Send + Sync + 'static,
{
let mut mutable_state = self.mutable_state.write();
// We can only have one klog reader. If this is already set, we are already draining the
// kernel log.
if mutable_state.drain_klog_task.is_some() {
return;
}
let container = mutable_state.get_log_container(KERNEL_IDENTITY.clone());
mutable_state.drain_klog_task = Some(fasync::Task::spawn(async move {
debug!("Draining debuglog.");
let mut kernel_logger =
DebugLogBridge::create(klog_reader, Arc::clone(&container.stats));
let mut messages = match kernel_logger.existing_logs() {
Ok(messages) => messages,
Err(e) => {
error!(%e, "failed to read from kernel log, important logs may be missing");
return;
}
};
messages.sort_by_key(|m| m.timestamp());
for message in messages {
container.ingest_message(Box::new(message));
}
let res = kernel_logger
.listen()
.try_for_each(|message| async {
container.ingest_message(Box::new(message));
Ok(())
})
.await;
if let Err(e) = res {
error!(%e, "failed to drain kernel log, important logs may be missing");
}
}));
}
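/// Returns a merged stream of logs from all current (and, depending on `mode`, future)
/// containers, optionally filtered by `selectors`.
///
/// A sketch of consuming a snapshot cursor (mirrors the tests at the bottom of this file):
///
/// ```ignore
/// let messages: Vec<_> = repo
///     .logs_cursor(StreamMode::Snapshot, None, ftrace::Id::random())
///     .collect()
///     .await;
/// ```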
pub fn logs_cursor(
&self,
mode: StreamMode,
selectors: Option<Vec<Selector>>,
parent_trace_id: ftrace::Id,
) -> impl Stream<Item = Arc<LogsData>> + Send + 'static {
let mut repo = self.mutable_state.write();
let substreams = repo.logs_data_store.iter().map(|(identity, c)| {
let cursor = c.cursor(mode, parent_trace_id);
(Arc::clone(identity), cursor)
});
let (mut merged, mpx_handle) = Multiplexer::new(parent_trace_id, selectors, substreams);
repo.logs_multiplexers.add(mode, mpx_handle);
merged.set_on_drop_id_sender(repo.logs_multiplexers.cleanup_sender());
merged
}
pub fn get_log_container(
&self,
identity: Arc<ComponentIdentity>,
) -> Arc<LogsArtifactsContainer> {
self.mutable_state.write().get_log_container(identity)
}
/// Waits for all pending log ingestion tasks to complete and stops accepting new messages,
/// ensuring that pending Cursors return Poll::Ready(None) after consuming any messages
/// received before this call.
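///
/// Typical shutdown sequence (a sketch; the ordering is the important part):
///
/// ```ignore
/// repo.stop_accepting_new_log_sinks(); // refuse new LogSink connections
/// repo.wait_for_termination().await;   // drain pending tasks, then terminate containers
/// ```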
pub async fn wait_for_termination(&self) {
let receiver = self.mutable_state.write().log_receiver.take().unwrap();
// Each of these tasks processes messages from a single LogSink connection.
receiver.for_each_concurrent(None, |rx| rx).await;
debug!("Log ingestion stopped.");
let mut repo = self.mutable_state.write();
for container in repo.logs_data_store.values() {
container.terminate();
}
repo.logs_multiplexers.terminate();
debug!("Terminated logs");
repo.logs_budget.terminate();
}
/// Closes the channel over which new logger draining tasks are sent. No more logger tasks
/// are accepted once this is called, and log termination can proceed.
pub fn stop_accepting_new_log_sinks(&self) {
self.log_sender.write().disconnect();
}
/// Returns an id to use for a new interest connection. Used by both LogSettings and Log
/// so that connection ids are unique across the two protocols.
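///
/// The returned id ties together the rest of the interest API (a sketch; the selector string
/// is illustrative and `parse_log_interest_selector` is assumed to come from the `selectors`
/// crate):
///
/// ```ignore
/// let id = repo.new_interest_connection();
/// let selector = selectors::parse_log_interest_selector("core/foo#DEBUG")?;
/// repo.update_logs_interest(id, vec![selector]);
/// // ...later, when the client disconnects:
/// repo.finish_interest_connection(id);
/// ```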
pub fn new_interest_connection(&self) -> usize {
INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
}
/// Updates log selectors associated with an interest connection.
pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
self.mutable_state.write().update_logs_interest(connection_id, selectors);
}
/// Indicates that the connection associated with the given ID is now done.
pub fn finish_interest_connection(&self, connection_id: usize) {
self.mutable_state.write().finish_interest_connection(connection_id);
}
async fn process_removal_of_components(
mut removal_requests: mpsc::UnboundedReceiver<Arc<ComponentIdentity>>,
logs_repo: Arc<LogsRepository>,
) {
while let Some(identity) = removal_requests.next().await {
let mut repo = logs_repo.mutable_state.write();
if !repo.is_live(&identity) {
debug!(%identity, "Removing component from repository.");
repo.remove(&identity);
}
}
}
#[cfg(test)]
pub(crate) fn default() -> Arc<Self> {
LogsRepository::new(
crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES,
&Default::default(),
)
}
}
impl EventConsumer for LogsRepository {
fn handle(self: Arc<Self>, event: Event) {
match event.payload {
EventPayload::LogSinkRequested(LogSinkRequestedPayload {
component,
request_stream,
}) => {
debug!(identity = %component, "LogSink requested.");
let container = self.get_log_container(component);
container.handle_log_sink(request_stream, self.log_sender.read().clone());
}
_ => unreachable!("Archivist state just subscribes to log sink requested"),
}
}
}
pub struct LogsRepositoryState {
logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
inspect_node: inspect::Node,
/// Receives the logger tasks. Taken exactly once in `wait_for_termination`, hence the
/// `Option`.
log_receiver: Option<mpsc::UnboundedReceiver<fasync::Task<()>>>,
/// A reference to the budget manager, kept to be passed to containers.
logs_budget: BudgetManager,
/// BatchIterators for logs need to be made aware of new components starting and their logs.
logs_multiplexers: MultiplexerBroker,
/// Interest registrations that we have received through fuchsia.logger.Log/ListWithSelectors
/// or through fuchsia.logger.LogSettings/SetInterest.
interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,
/// The task draining klog and routing to syslog.
drain_klog_task: Option<fasync::Task<()>>,
}
impl LogsRepositoryState {
fn new(
logs_budget: BudgetManager,
log_receiver: mpsc::UnboundedReceiver<fasync::Task<()>>,
parent: &fuchsia_inspect::Node,
) -> Self {
Self {
inspect_node: parent.create_child("log_sources"),
logs_data_store: HashMap::new(),
logs_budget,
log_receiver: Some(log_receiver),
logs_multiplexers: MultiplexerBroker::new(),
interest_registrations: BTreeMap::new(),
drain_klog_task: None,
}
}
/// Returns a container for logs artifacts, constructing one and adding it to the map if
/// necessary.
pub fn get_log_container(
&mut self,
identity: Arc<ComponentIdentity>,
) -> Arc<LogsArtifactsContainer> {
match self.logs_data_store.get(&identity) {
None => {
let container = Arc::new(LogsArtifactsContainer::new(
Arc::clone(&identity),
self.interest_registrations.values().flat_map(|s| s.iter()),
&self.inspect_node,
self.logs_budget.handle(),
));
self.logs_budget.add_container(Arc::clone(&container));
self.logs_data_store.insert(identity, Arc::clone(&container));
self.logs_multiplexers.send(&container);
container
}
Some(existing) => Arc::clone(existing),
}
}
fn is_live(&self, identity: &Arc<ComponentIdentity>) -> bool {
match self.logs_data_store.get(identity) {
Some(container) => container.should_retain(),
None => false,
}
}
/// Updates our own log interest if we are the root Archivist and logging
/// to klog.
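///
/// When several registered selectors match our own moniker, the most verbose (numerically
/// lowest) requested severity wins; clearing that interest resets the filter to INFO.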
fn maybe_update_own_logs_interest(
&mut self,
selectors: &[LogInterestSelector],
clear_interest: bool,
) {
tracing::dispatcher::get_default(|dispatcher| {
let Some(publisher) = dispatcher.downcast_ref::<KlogSeverityFilter>() else {
return;
};
let lowest_selector = selectors
.iter()
.filter(|selector| {
OUR_MONIKER.matches_component_selector(&selector.selector).unwrap_or(false)
})
.min_by_key(|selector| selector.interest.min_severity.unwrap_or(Severity::Info));
if let Some(selector) = lowest_selector {
if clear_interest {
*publisher.min_severity.write() = Severity::Info;
} else {
*publisher.min_severity.write() =
selector.interest.min_severity.unwrap_or(Severity::Info);
}
}
});
}
fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
self.maybe_update_own_logs_interest(&selectors, false);
let previous_selectors =
self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
// unwrap safe, we just inserted.
let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
for logs_data in self.logs_data_store.values() {
logs_data.update_interest(new_selectors.iter(), &previous_selectors);
}
}
pub fn finish_interest_connection(&mut self, connection_id: usize) {
let selectors = self.interest_registrations.remove(&connection_id);
if let Some(selectors) = selectors {
self.maybe_update_own_logs_interest(&selectors, true);
for logs_data in self.logs_data_store.values() {
logs_data.reset_interest(&selectors);
}
}
}
pub fn remove(&mut self, identity: &Arc<ComponentIdentity>) {
self.logs_data_store.remove(identity);
}
}
type LiveIteratorsMap = HashMap<usize, (StreamMode, MultiplexerHandle<Arc<LogsData>>)>;
/// Ensures that BatchIterators get access to logs from newly started components.
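///
/// Flow sketch (internal; mirrors `logs_cursor` and `get_log_container` above):
///
/// ```ignore
/// // logs_cursor registers each new multiplexer with the broker...
/// broker.add(mode, multiplexer_handle);
/// // ...and get_log_container tells every live multiplexer about new containers.
/// broker.send(&container);
/// ```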
pub struct MultiplexerBroker {
live_iterators: Arc<Mutex<LiveIteratorsMap>>,
cleanup_sender: mpsc::UnboundedSender<usize>,
_live_iterators_cleanup_task: fasync::Task<()>,
}
impl MultiplexerBroker {
fn new() -> Self {
let (cleanup_sender, mut receiver) = mpsc::unbounded();
let live_iterators = Arc::new(Mutex::new(HashMap::new()));
let live_iterators_clone = Arc::clone(&live_iterators);
Self {
live_iterators,
cleanup_sender,
_live_iterators_cleanup_task: fasync::Task::spawn(async move {
while let Some(id) = receiver.next().await {
live_iterators_clone.lock().remove(&id);
}
}),
}
}
fn cleanup_sender(&self) -> mpsc::UnboundedSender<usize> {
self.cleanup_sender.clone()
}
/// A new BatchIterator has been created and must be notified when future log containers are
/// created.
fn add(&mut self, mode: StreamMode, recipient: MultiplexerHandle<Arc<LogsData>>) {
match mode {
// snapshot streams only want to know about what's currently available
StreamMode::Snapshot => recipient.close(),
StreamMode::SnapshotThenSubscribe | StreamMode::Subscribe => {
self.live_iterators.lock().insert(recipient.multiplexer_id(), (mode, recipient));
}
}
}
/// Notify existing BatchIterators of a new logs container so they can include its messages
/// in their results.
pub fn send(&mut self, container: &Arc<LogsArtifactsContainer>) {
self.live_iterators.lock().retain(|_, (mode, recipient)| {
recipient.send(
Arc::clone(&container.identity),
container.cursor(*mode, recipient.parent_trace_id()),
)
});
}
/// Notify all multiplexers to terminate their streams once their substreams have terminated.
fn terminate(&mut self) {
for (_, (_, recipient)) in self.live_iterators.lock().drain() {
recipient.close();
}
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::logs::stored_message::{GenericStoredMessage, StructuredStoredMessage},
diagnostics_log_encoding::{
encode::Encoder, Argument, Record, Severity as StreamSeverity, Value,
},
moniker::ExtendedMoniker,
selectors::FastError,
std::{io::Cursor, time::Duration},
};
#[fuchsia::test]
async fn data_repo_filters_logs_by_selectors() {
let repo = LogsRepository::default();
let foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
ExtendedMoniker::parse_str("./foo").unwrap(),
"fuchsia-pkg://foo",
)));
let bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
ExtendedMoniker::parse_str("./bar").unwrap(),
"fuchsia-pkg://bar",
)));
foo_container.ingest_message(make_message("a", 1));
bar_container.ingest_message(make_message("b", 2));
foo_container.ingest_message(make_message("c", 3));
let stream = repo.logs_cursor(StreamMode::Snapshot, None, ftrace::Id::random());
let results =
stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
assert_eq!(results, vec!["a".to_string(), "b".to_string(), "c".to_string()]);
let filtered_stream = repo.logs_cursor(
StreamMode::Snapshot,
Some(vec![selectors::parse_selector::<FastError>("foo:root").unwrap()]),
ftrace::Id::random(),
);
let results =
filtered_stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
assert_eq!(results, vec!["a".to_string(), "c".to_string()]);
}
#[fuchsia::test]
async fn multiplexer_broker_cleanup() {
let repo = LogsRepository::default();
let stream =
repo.logs_cursor(StreamMode::SnapshotThenSubscribe, None, ftrace::Id::random());
assert_eq!(repo.mutable_state.read().logs_multiplexers.live_iterators.lock().len(), 1);
// When the multiplexer goes away it must be forgotten by the broker.
drop(stream);
loop {
fasync::Timer::new(Duration::from_millis(100)).await;
if repo.mutable_state.read().logs_multiplexers.live_iterators.lock().len() == 0 {
break;
}
}
}
fn make_message(msg: &str, timestamp: i64) -> GenericStoredMessage {
let record = Record {
timestamp,
severity: StreamSeverity::Debug.into_primitive(),
arguments: vec![
Argument { name: "pid".to_string(), value: Value::UnsignedInt(1) },
Argument { name: "tid".to_string(), value: Value::UnsignedInt(2) },
Argument { name: "message".to_string(), value: Value::Text(msg.to_string()) },
],
};
let mut buffer = Cursor::new(vec![0u8; 1024]);
let mut encoder = Encoder::new(&mut buffer);
encoder.write_record(&record).unwrap();
let encoded = &buffer.get_ref()[..buffer.position() as usize];
StructuredStoredMessage::create(encoded.to_vec(), Default::default())
}
}