blob: 52e19d0a284688f925d044745671883813602a98 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
accessor::ArchiveAccessor,
archive, configs, constants,
constants::INSPECT_LOG_WINDOW_SIZE,
container::ComponentIdentity,
diagnostics,
events::{
source_registry::EventSourceRegistry,
sources::{StaticEventStream, UnattributedLogSinkSource},
types::{
ComponentEvent, ComponentEventStream, DiagnosticsReadyEvent, EventMetadata,
EventSource,
},
},
logs::{budget::BudgetManager, redact::Redactor, socket::LogMessageSocket},
pipeline::Pipeline,
repository::DataRepo,
},
anyhow::Error,
fidl::{endpoints::RequestStream, AsyncChannel},
fidl_fuchsia_diagnostics::Selector,
fidl_fuchsia_diagnostics_test::{ControllerRequest, ControllerRequestStream},
fidl_fuchsia_process_lifecycle::{LifecycleRequest, LifecycleRequestStream},
fidl_fuchsia_sys2::EventSourceMarker,
fuchsia_async::{self as fasync, Task},
fuchsia_component::{
client::connect_to_service,
server::{ServiceFs, ServiceObj, ServiceObjTrait},
},
fuchsia_inspect::{component, health::Reporter},
fuchsia_inspect_contrib::{inspect_log, nodes::BoundedListNode},
fuchsia_runtime::{take_startup_handle, HandleInfo, HandleType},
fuchsia_zircon as zx,
futures::{
channel::mpsc,
future::{self, abortable},
prelude::*,
},
io_util,
parking_lot::{Mutex, RwLock},
std::{
path::{Path, PathBuf},
sync::{Arc, Weak},
},
tracing::{debug, error, info, warn},
};
/// Options for ingesting logs.
pub struct LogOpts {
/// Whether or not logs coming from v2 components should be ingested.
pub ingest_v2_logs: bool,
}
/// Responsible for initializing an `Archivist` instance. Supports multiple configurations by
/// either calling or not calling methods on the builder like `install_controller_service`.
pub struct ArchivistBuilder {
/// Archive state, including the diagnostics repo which currently stores all logs.
state: ArchivistState,
/// True if pipeline exists.
pipeline_exists: bool,
/// Store for safe keeping,
_pipeline_nodes: Vec<fuchsia_inspect::Node>,
// Store for safe keeping.
_pipeline_configs: Vec<configs::PipelineConfig>,
/// ServiceFs object to server outgoing directory.
fs: ServiceFs<ServiceObj<'static, ()>>,
/// Receiver for stream which will process LogSink connections.
log_receiver: mpsc::UnboundedReceiver<Task<()>>,
/// Sender which is used to close the stream of LogSink connections.
///
/// Clones of the sender keep the receiver end of the channel open. As soon
/// as all clones are dropped or disconnected, the receiver will close. The
/// receiver must close for `Archivist::run` to return gracefully.
log_sender: mpsc::UnboundedSender<Task<()>>,
/// Receiver for stream which will process Log connections.
listen_receiver: mpsc::UnboundedReceiver<Task<()>>,
/// Sender which is used to close the stream of Log connections after log_sender
/// completes.
///
/// Clones of the sender keep the receiver end of the channel open. As soon
/// as all clones are dropped or disconnected, the receiver will close. The
/// receiver must close for `Archivist::run` to return gracefully.
listen_sender: mpsc::UnboundedSender<Task<()>>,
/// Listes for events coming from v1 and v2.
event_source_registry: EventSourceRegistry,
/// Recieve stop signal to kill this archivist.
stop_recv: Option<mpsc::Receiver<()>>,
}
impl ArchivistBuilder {
/// Creates new instance, sets up inspect and adds 'archive' directory to output folder.
/// Also installs `fuchsia.diagnostics.Archive` service.
/// Call `install_log_services`, `add_event_source`.
pub fn new(archivist_configuration: configs::Config) -> Result<Self, Error> {
let (log_sender, log_receiver) = mpsc::unbounded();
let (listen_sender, listen_receiver) = mpsc::unbounded();
let mut fs = ServiceFs::new();
diagnostics::serve(&mut fs)?;
let writer = archivist_configuration.archive_path.as_ref().and_then(|archive_path| {
maybe_create_archive(&mut fs, archive_path)
.or_else(|e| {
// TODO(fxbug.dev/57271): this is not expected in regular builds of the archivist. It's
// happening when starting the zircon_guest (fx shell guest launch zircon_guest)
// We'd normally fail if we couldn't create the archive, but for now we include
// a warning.
warn!(
path = %archive_path.display(), ?e,
"Failed to create archive"
);
Err(e)
})
.ok()
});
let pipelines_node = diagnostics::root().create_child("pipelines");
let feedback_pipeline_node = pipelines_node.create_child("feedback");
let legacy_pipeline_node = pipelines_node.create_child("legacy_metrics");
let mut feedback_config = configs::PipelineConfig::from_directory(
"/config/data/feedback",
configs::EmptyBehavior::DoNotFilter,
);
feedback_config.record_to_inspect(&feedback_pipeline_node);
let mut legacy_config = configs::PipelineConfig::from_directory(
"/config/data/legacy_metrics",
configs::EmptyBehavior::Disable,
);
legacy_config.record_to_inspect(&legacy_pipeline_node);
// Do not set the state to error if the pipelines simply do not exist.
let pipeline_exists = !((Path::new("/config/data/feedback").is_dir()
&& feedback_config.has_error())
|| (Path::new("/config/data/legacy_metrics").is_dir() && legacy_config.has_error()));
let logs_budget =
BudgetManager::new(archivist_configuration.logs.max_cached_original_bytes);
let diagnostics_repo = DataRepo::new(&logs_budget, diagnostics::root());
// The Inspect Repository offered to the ALL_ACCESS pipeline. This
// repository is unique in that it has no statically configured
// selectors, meaning all diagnostics data is visible.
// This should not be used for production services.
// TODO(fxbug.dev/55735): Lock down this protocol using allowlists.
let all_access_pipeline =
Arc::new(RwLock::new(Pipeline::new(None, Redactor::noop(), diagnostics_repo.clone())));
// The Inspect Repository offered to the Feedback pipeline. This repository applies
// static selectors configured under config/data/feedback to inspect exfiltration.
let (feedback_static_selectors, feedback_redactor) = if !feedback_config.disable_filtering {
(
feedback_config.take_inspect_selectors().map(|selectors| {
selectors
.into_iter()
.map(|selector| Arc::new(selector))
.collect::<Vec<Arc<Selector>>>()
}),
Redactor::with_static_patterns(),
)
} else {
(None, Redactor::noop())
};
let feedback_pipeline = Arc::new(RwLock::new(Pipeline::new(
feedback_static_selectors,
feedback_redactor,
diagnostics_repo.clone(),
)));
// The Inspect Repository offered to the LegacyMetrics
// pipeline. This repository applies static selectors configured
// under config/data/legacy_metrics to inspect exfiltration.
let legacy_metrics_pipeline = Arc::new(RwLock::new(Pipeline::new(
match legacy_config.disable_filtering {
false => legacy_config.take_inspect_selectors().map(|selectors| {
selectors
.into_iter()
.map(|selector| Arc::new(selector))
.collect::<Vec<Arc<Selector>>>()
}),
true => None,
},
Redactor::noop(),
diagnostics_repo.clone(),
)));
// TODO(fxbug.dev/55736): Refactor this code so that we don't store
// diagnostics data N times if we have N pipelines. We should be
// storing a single copy regardless of the number of pipelines.
let archivist_state = ArchivistState::new(
archivist_configuration,
vec![
all_access_pipeline.clone(),
feedback_pipeline.clone(),
legacy_metrics_pipeline.clone(),
],
diagnostics_repo,
logs_budget,
writer,
)?;
let all_accessor_stats = Arc::new(diagnostics::AccessorStats::new(
component::inspector().root().create_child("all_archive_accessor"),
));
let feedback_accessor_stats = Arc::new(diagnostics::AccessorStats::new(
component::inspector().root().create_child("feedback_archive_accessor"),
));
let legacy_accessor_stats = Arc::new(diagnostics::AccessorStats::new(
component::inspector().root().create_child("legacy_metrics_archive_accessor"),
));
let sender_for_accessor = listen_sender.clone();
let sender_for_feedback = listen_sender.clone();
let sender_for_legacy = listen_sender.clone();
fs.dir("svc")
.add_fidl_service(move |stream| {
debug!("fuchsia.diagnostics.ArchiveAccessor connection");
let all_archive_accessor =
ArchiveAccessor::new(all_access_pipeline.clone(), all_accessor_stats.clone());
all_archive_accessor
.spawn_archive_accessor_server(stream, sender_for_accessor.clone());
})
.add_fidl_service_at(constants::FEEDBACK_ARCHIVE_ACCESSOR_NAME, move |chan| {
debug!("fuchsia.diagnostics.FeedbackArchiveAccessor connection");
let feedback_archive_accessor = ArchiveAccessor::new(
feedback_pipeline.clone(),
feedback_accessor_stats.clone(),
);
feedback_archive_accessor
.spawn_archive_accessor_server(chan, sender_for_feedback.clone());
})
.add_fidl_service_at(constants::LEGACY_METRICS_ARCHIVE_ACCESSOR_NAME, move |chan| {
debug!("fuchsia.diagnostics.LegacyMetricsAccessor connection");
let legacy_archive_accessor = ArchiveAccessor::new(
legacy_metrics_pipeline.clone(),
legacy_accessor_stats.clone(),
);
legacy_archive_accessor
.spawn_archive_accessor_server(chan, sender_for_legacy.clone());
});
let events_node = diagnostics::root().create_child("event_stats");
Ok(Self {
fs,
state: archivist_state,
log_receiver,
log_sender,
listen_receiver,
listen_sender,
pipeline_exists,
_pipeline_nodes: vec![pipelines_node, feedback_pipeline_node, legacy_pipeline_node],
_pipeline_configs: vec![feedback_config, legacy_config],
event_source_registry: EventSourceRegistry::new(events_node),
stop_recv: None,
})
}
pub fn data_repo(&self) -> &DataRepo {
&self.state.diagnostics_repo
}
pub fn log_sender(&self) -> &mpsc::UnboundedSender<Task<()>> {
&self.log_sender
}
// TODO(fxbug.dev/72046) delete when netemul no longer using
pub fn consume_own_logs(&self, socket: zx::Socket) {
let container = self.data_repo().write().get_own_log_container();
fasync::Task::spawn(async move {
let log_stream =
LogMessageSocket::new(socket, container.identity.clone(), container.stats.clone())
.expect("failed to create internal LogMessageSocket");
container.drain_messages(log_stream).await;
unreachable!();
})
.detach();
}
/// Install controller service.
pub fn install_controller_service(&mut self) -> &mut Self {
let (stop_sender, stop_recv) = mpsc::channel(0);
self.fs
.dir("svc")
.add_fidl_service(move |stream| Self::spawn_controller(stream, stop_sender.clone()));
self.stop_recv = Some(stop_recv);
debug!("Controller services initialized.");
self
}
/// The spawned controller listens for the stop request and forwards that to the archivist stop
/// channel if received.
fn spawn_controller(mut stream: ControllerRequestStream, mut stop_sender: mpsc::Sender<()>) {
fasync::Task::spawn(
async move {
while let Some(ControllerRequest::Stop { .. }) = stream.try_next().await? {
debug!("Stop request received.");
stop_sender.send(()).await.ok();
break;
}
Ok(())
}
.map(|o: Result<(), fidl::Error>| {
if let Err(e) = o {
error!(%e, "error serving controller");
}
}),
)
.detach();
}
fn take_lifecycle_channel() -> LifecycleRequestStream {
let lifecycle_handle_info = HandleInfo::new(HandleType::Lifecycle, 0);
let lifecycle_handle = take_startup_handle(lifecycle_handle_info)
.expect("must have been provided a lifecycle channel in procargs");
let x: zx::Channel = lifecycle_handle.into();
let async_x = AsyncChannel::from(
fasync::Channel::from_channel(x).expect("Async channel conversion failed."),
);
LifecycleRequestStream::from_channel(async_x)
}
pub fn install_lifecycle_listener(&mut self) -> &mut Self {
let (mut stop_sender, stop_recv) = mpsc::channel(0);
let mut req_stream = Self::take_lifecycle_channel();
Task::spawn(async move {
debug!("Awaiting request to close");
while let Some(LifecycleRequest::Stop { .. }) =
req_stream.try_next().await.expect("Failure receiving lifecycle FIDL message")
{
info!("Initiating shutdown.");
stop_sender.send(()).await.unwrap();
}
})
.detach();
self.stop_recv = Some(stop_recv);
debug!("Lifecycle listener initialized.");
self
}
/// Installs `LogSink` and `Log` services. Panics if called twice.
/// # Arguments:
/// * `log_connector` - If provided, install log connector.
pub async fn install_log_services(&mut self, opts: LogOpts) -> &mut Self {
let data_repo_1 = self.data_repo().clone();
let listen_sender = self.listen_sender.clone();
let unattributed_log_sink_source = UnattributedLogSinkSource::new();
let unattributed_log_sink_publisher = unattributed_log_sink_source.get_publisher();
self.event_source_registry
.add_source("unattributed_log_sink", Box::new(unattributed_log_sink_source))
.await;
self.fs
.dir("svc")
.add_fidl_service(move |stream| {
debug!("fuchsia.logger.Log connection");
data_repo_1.clone().handle_log(stream, listen_sender.clone());
})
.add_fidl_service(move |stream| {
debug!("unattributed fuchsia.logger.LogSink connection");
let mut publisher = unattributed_log_sink_publisher.clone();
// TODO(fxbug.dev/67769): get rid of this Task spawn since it introduces a small
// window in which we might lose LogSinks.
fasync::Task::spawn(async move {
publisher.send(stream).await.unwrap_or_else(|err| {
error!(?err, "Failed to add unattributed LogSink connection")
})
})
.detach();
});
if opts.ingest_v2_logs {
debug!("fuchsia.sys.EventStream connection");
let event_source = connect_to_service::<EventSourceMarker>().unwrap();
match event_source.take_static_event_stream("EventStream").await {
Ok(Ok(event_stream)) => {
let event_stream = event_stream.into_stream().unwrap();
self.event_source_registry
.add_source(
"v2_static_event_stream",
Box::new(StaticEventStream::new(event_stream)),
)
.await;
}
Ok(Err(err)) => debug!(?err, "Failed to open event stream"),
Err(err) => debug!(?err, "Failed to send request to take event stream"),
}
}
debug!("Log services initialized.");
self
}
// Sets event provider which is used to collect component events, Panics if called twice.
pub async fn add_event_source(
&mut self,
name: impl Into<String>,
source: Box<dyn EventSource>,
) -> &mut Self {
let name = name.into();
debug!("{} event source initialized", &name);
self.event_source_registry.add_source(name, source).await;
self
}
/// Run archivist to completion.
/// # Arguments:
/// * `outgoing_channel`- channel to serve outgoing directory on.
pub async fn run(mut self, outgoing_channel: zx::Channel) -> Result<(), Error> {
debug!("Running Archivist.");
let data_repo = { self.data_repo().clone() };
self.fs.serve_connection(outgoing_channel)?;
// Start servcing all outgoing services.
let run_outgoing = self.fs.collect::<()>();
// collect events.
let events = self.event_source_registry.take_stream().await.expect("Created event stream");
let logs_budget = self.state.logs_budget.handle();
let run_event_collection_task = fasync::Task::spawn(Self::collect_component_events(
events,
self.log_sender.clone(),
self.state,
self.pipeline_exists,
));
// Process messages from log sink.
let log_receiver = self.log_receiver;
let listen_receiver = self.listen_receiver;
let all_msg = async {
log_receiver.for_each_concurrent(None, |rx| async move { rx.await }).await;
debug!("Log ingestion stopped.");
data_repo.terminate_logs();
logs_budget.terminate();
debug!("Flushing to listeners.");
listen_receiver.for_each_concurrent(None, |rx| async move { rx.await }).await;
debug!("Log listeners and batch iterators stopped.");
};
let (abortable_fut, abort_handle) = abortable(run_outgoing);
let mut listen_sender = self.listen_sender;
let mut log_sender = self.log_sender;
let event_source_registry = self.event_source_registry;
let stop_fut = match self.stop_recv {
Some(stop_recv) => async move {
stop_recv.into_future().await;
event_source_registry.terminate();
run_event_collection_task.await;
listen_sender.disconnect();
log_sender.disconnect();
abort_handle.abort()
}
.left_future(),
None => future::ready(()).right_future(),
};
debug!("Entering core loop.");
// Combine all three futures into a main future.
future::join3(abortable_fut, stop_fut, all_msg).map(|_| Ok(())).await
}
async fn collect_component_events(
events: ComponentEventStream,
log_sender: mpsc::UnboundedSender<Task<()>>,
state: ArchivistState,
pipeline_exists: bool,
) {
if !pipeline_exists {
component::health().set_unhealthy("Pipeline config has an error");
} else {
component::health().set_ok();
}
state.run(events, log_sender).await
}
}
/// ArchivistState owns the tools needed to persist data
/// to the archive, as well as the service-specific repositories
/// that are populated by the archivist server and exposed in the
/// service sessions.
pub struct ArchivistState {
/// Writer for the archive. If a path was not configured it will be `None`.
writer: Option<archive::ArchiveWriter>,
log_node: BoundedListNode,
configuration: configs::Config,
diagnostics_pipelines: Vec<Arc<RwLock<Pipeline>>>,
pub diagnostics_repo: DataRepo,
/// The overall capacity we enforce for log messages across containers.
logs_budget: BudgetManager,
}
impl ArchivistState {
pub fn new(
configuration: configs::Config,
diagnostics_pipelines: Vec<Arc<RwLock<Pipeline>>>,
diagnostics_repo: DataRepo,
logs_budget: BudgetManager,
writer: Option<archive::ArchiveWriter>,
) -> Result<Self, Error> {
let mut log_node = BoundedListNode::new(
diagnostics::root().create_child("events"),
INSPECT_LOG_WINDOW_SIZE,
);
inspect_log!(log_node, event: "Archivist started");
Ok(ArchivistState {
writer,
log_node,
configuration,
diagnostics_pipelines,
diagnostics_repo,
logs_budget,
})
}
pub async fn run(
self,
mut events: ComponentEventStream,
log_sender: mpsc::UnboundedSender<Task<()>>,
) {
let state = Arc::new(Mutex::new(self));
let remover = ComponentRemover { inner: Arc::downgrade(&state) };
state.lock().logs_budget.set_remover(remover);
let archivist = Archivist { state, log_sender };
while let Some(event) = events.next().await {
archivist.clone().process_event(event).await.unwrap_or_else(|e| {
let mut state = archivist.lock();
inspect_log!(state.log_node, event: "Failed to log event", result: format!("{:?}", e));
error!(?e, "Failed to log event");
});
}
}
fn maybe_remove_component(&mut self, identity: &ComponentIdentity) {
if !self.diagnostics_repo.is_live(identity) {
debug!(%identity, "Removing component from repository.");
self.diagnostics_repo.write().data_directories.remove(&identity.unique_key);
// TODO(fxbug.dev/55736): The pipeline specific updates should be happening asynchronously.
for pipeline in &self.diagnostics_pipelines {
pipeline.write().remove(&identity.relative_moniker);
}
}
}
}
#[derive(Clone)]
pub struct Archivist {
state: Arc<Mutex<ArchivistState>>,
log_sender: mpsc::UnboundedSender<Task<()>>,
}
impl std::ops::Deref for Archivist {
type Target = Mutex<ArchivistState>;
fn deref(&self) -> &Self::Target {
&*self.state
}
}
impl Archivist {
async fn populate_inspect_repo(&self, diagnostics_ready_data: DiagnosticsReadyEvent) {
// The DiagnosticsReadyEvent should always contain a directory_proxy. Its existence
// as an Option is only to support mock objects for equality in tests.
let diagnostics_proxy = diagnostics_ready_data.directory.unwrap();
let identity = diagnostics_ready_data.metadata.identity.clone();
let locked_state = &self.lock();
// Update the central repository to reference the new diagnostics source.
locked_state
.diagnostics_repo
.write()
.add_inspect_artifacts(
identity.clone(),
diagnostics_proxy,
diagnostics_ready_data.metadata.timestamp.clone(),
)
.unwrap_or_else(|e| {
warn!(%identity, ?e, "Failed to add inspect artifacts to repository");
});
// Let each pipeline know that a new component arrived, and allow the pipeline
// to eagerly bucket static selectors based on that component's moniker.
// TODO(fxbug.dev/55736): The pipeline specific updates should be happening asynchronously.
for pipeline in &locked_state.diagnostics_pipelines {
pipeline.write().add_inspect_artifacts(&identity.relative_moniker).unwrap_or_else(
|e| {
warn!(%identity, ?e, "Failed to add inspect artifacts to pipeline wrapper");
},
);
}
}
/// Updates the central repository to reference the new diagnostics source.
fn add_new_component(
&self,
identity: ComponentIdentity,
event_timestamp: zx::Time,
component_start_time: Option<zx::Time>,
) {
if let Err(e) = self.lock().diagnostics_repo.write().add_new_component(
identity.clone(),
event_timestamp,
component_start_time,
) {
error!(%identity, ?e, "Failed to add new component to repository");
}
}
fn mark_component_stopped(&self, identity: &ComponentIdentity) {
let mut this = self.state.lock();
// TODO(fxbug.dev/53939): Get inspect data from repository before removing
// for post-mortem inspection.
this.diagnostics_repo.write().mark_stopped(&identity.unique_key);
this.maybe_remove_component(identity);
}
async fn process_event(self, event: ComponentEvent) -> Result<(), Error> {
match event {
ComponentEvent::Start(start) => {
let archived_metadata = start.metadata.clone();
debug!(identity = %start.metadata.identity, "Adding new component.");
self.add_new_component(start.metadata.identity, start.metadata.timestamp, None);
self.archive_event("START", archived_metadata).await
}
ComponentEvent::Running(running) => {
let archived_metadata = running.metadata.clone();
debug!(identity = %running.metadata.identity, "Component is running.");
self.add_new_component(
running.metadata.identity,
running.metadata.timestamp,
Some(running.component_start_time),
);
self.archive_event("RUNNING", archived_metadata.clone()).await
}
ComponentEvent::Stop(stop) => {
debug!(identity = %stop.metadata.identity, "Component stopped");
self.mark_component_stopped(&stop.metadata.identity);
self.archive_event("STOP", stop.metadata).await
}
ComponentEvent::DiagnosticsReady(diagnostics_ready) => {
debug!(
identity = %diagnostics_ready.metadata.identity,
"Diagnostics directory is ready.",
);
self.populate_inspect_repo(diagnostics_ready).await;
Ok(())
}
ComponentEvent::LogSinkRequested(event) => {
let locked_state = self.state.lock();
let data_repo = &locked_state.diagnostics_repo;
let container = data_repo.write().get_log_container(event.metadata.identity);
container.handle_log_sink(event.requests, self.log_sender.clone());
Ok(())
}
}
}
async fn archive_event(
&self,
_event_name: &str,
_event_data: EventMetadata,
) -> Result<(), Error> {
let mut state = self.lock();
let ArchivistState { writer, log_node, configuration, .. } = &mut *state;
let writer = if let Some(w) = writer.as_mut() {
w
} else {
return Ok(());
};
let max_archive_size_bytes = configuration.max_archive_size_bytes;
let max_event_group_size_bytes = configuration.max_event_group_size_bytes;
// TODO(fxbug.dev/53939): Get inspect data from repository before removing
// for post-mortem inspection.
//let log = writer.get_log().new_event(event_name, event_data.component_id);
// if let Some(data_map) = event_data.component_data_map {
// for (path, object) in data_map {
// match object {
// InspectData::Empty
// | InspectData::DeprecatedFidl(_)
// | InspectData::Tree(_, None) => {}
// InspectData::Vmo(vmo) | InspectData::Tree(_, Some(vmo)) => {
// let mut contents = vec![0u8; vmo.get_size()? as usize];
// vmo.read(&mut contents[..], 0)?;
// // Truncate the bytes down to the last non-zero 4096-byte page of data.
// // TODO(fxbug.dev/4703): Handle truncation of VMOs without reading the whole thing.
// let mut last_nonzero = 0;
// for (i, v) in contents.iter().enumerate() {
// if *v != 0 {
// last_nonzero = i;
// }
// }
// if last_nonzero % 4096 != 0 {
// last_nonzero = last_nonzero + 4096 - last_nonzero % 4096;
// }
// contents.resize(last_nonzero, 0);
// log = log.add_event_file(path, &contents);
// }
// InspectData::File(contents) => {
// log = log.add_event_file(path, &contents);
// }
// }
// }
// }
let current_group_stats = writer.get_log().get_stats();
if current_group_stats.size >= max_event_group_size_bytes {
let (path, stats) = writer.rotate_log()?;
inspect_log!(log_node, event:"Rotated log",
new_path: path.to_string_lossy().to_string());
writer.add_group_stat(&path, stats);
}
let archived_size = writer.archived_size();
let mut current_archive_size = current_group_stats.size + archived_size;
if current_archive_size > max_archive_size_bytes {
let dates = writer.get_archive().get_dates().unwrap_or_else(|e| {
warn!("Garbage collection failure");
inspect_log!(log_node, event: "Failed to get dates for garbage collection",
reason: format!("{:?}", e));
vec![]
});
for date in dates {
let groups =
writer.get_archive().get_event_file_groups(&date).unwrap_or_else(|e| {
warn!("Garbage collection failure");
inspect_log!(log_node, event: "Failed to get event file",
date: &date,
reason: format!("{:?}", e));
vec![]
});
for group in groups {
let path = group.log_file_path();
match group.delete() {
Err(e) => {
inspect_log!(log_node, event: "Failed to remove group",
path: &path,
reason: format!(
"{:?}", e));
continue;
}
Ok(stat) => {
current_archive_size -= stat.size;
writer.remove_group_stat(&PathBuf::from(&path));
inspect_log!(log_node, event: "Garbage collected group",
path: &path,
removed_files: stat.file_count as u64,
removed_bytes: stat.size as u64);
}
};
if current_archive_size < max_archive_size_bytes {
return Ok(());
}
}
}
}
Ok(())
}
}
/// Provides weak access to ArchivistState for BudgetManager to use when it pops the last message
/// from the buffer of a stopped component.
#[derive(Clone, Default)]
pub struct ComponentRemover {
inner: Weak<Mutex<ArchivistState>>,
}
impl ComponentRemover {
pub fn maybe_remove_component(&self, identity: &ComponentIdentity) {
if let Some(this) = self.inner.upgrade() {
this.lock().maybe_remove_component(identity);
}
}
}
fn maybe_create_archive<ServiceObjTy: ServiceObjTrait>(
fs: &mut ServiceFs<ServiceObjTy>,
archive_path: &PathBuf,
) -> Result<archive::ArchiveWriter, Error> {
let writer = archive::ArchiveWriter::open(archive_path.clone())?;
fs.add_remote(
"archive",
io_util::open_directory_in_namespace(
&archive_path.to_string_lossy(),
io_util::OPEN_RIGHT_READABLE | io_util::OPEN_RIGHT_WRITABLE,
)?,
);
Ok(writer)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES, logs::testing::*};
use fidl::endpoints::create_proxy;
use fidl_fuchsia_diagnostics_test::ControllerMarker;
use fidl_fuchsia_io as fio;
use fio::DirectoryProxy;
use fuchsia_async as fasync;
use fuchsia_component::client::connect_to_protocol_at_dir_svc;
use futures::channel::oneshot;
fn init_archivist() -> ArchivistBuilder {
let config = configs::Config {
archive_path: None,
max_archive_size_bytes: 10,
max_event_group_size_bytes: 10,
num_threads: 1,
logs: configs::LogsConfig {
max_cached_original_bytes: LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES,
},
};
ArchivistBuilder::new(config).unwrap()
}
// run archivist and send signal when it dies.
async fn run_archivist_and_signal_on_exit() -> (DirectoryProxy, oneshot::Receiver<()>) {
let (directory, server_end) = create_proxy::<fio::DirectoryMarker>().unwrap();
let mut archivist = init_archivist();
archivist
.install_log_services(LogOpts { ingest_v2_logs: false })
.await
.install_controller_service();
let (signal_send, signal_recv) = oneshot::channel();
fasync::Task::spawn(async move {
archivist.run(server_end.into_channel()).await.expect("Cannot run archivist");
signal_send.send(()).unwrap();
})
.detach();
(directory, signal_recv)
}
// runs archivist and returns its directory.
async fn run_archivist() -> DirectoryProxy {
let (directory, server_end) = create_proxy::<fio::DirectoryMarker>().unwrap();
let mut archivist = init_archivist();
archivist.install_log_services(LogOpts { ingest_v2_logs: false }).await;
fasync::Task::spawn(async move {
archivist.run(server_end.into_channel()).await.expect("Cannot run archivist");
})
.detach();
directory
}
#[fuchsia::test]
async fn can_log_and_retrive_log() {
let directory = run_archivist().await;
let mut recv_logs = start_listener(&directory);
let mut log_helper = LogSinkHelper::new(&directory);
log_helper.write_log("my msg1");
log_helper.write_log("my msg2");
assert_eq!(
vec! {Some("my msg1".to_owned()),Some("my msg2".to_owned())},
vec! {recv_logs.next().await,recv_logs.next().await}
);
// new client can log
let mut log_helper2 = LogSinkHelper::new(&directory);
log_helper2.write_log("my msg1");
log_helper.write_log("my msg2");
let mut expected = vec!["my msg1".to_owned(), "my msg2".to_owned()];
expected.sort();
let mut actual = vec![recv_logs.next().await.unwrap(), recv_logs.next().await.unwrap()];
actual.sort();
assert_eq!(expected, actual);
// can log after killing log sink proxy
log_helper.kill_log_sink();
log_helper.write_log("my msg1");
log_helper.write_log("my msg2");
assert_eq!(
expected,
vec! {recv_logs.next().await.unwrap(),recv_logs.next().await.unwrap()}
);
// can log from new socket cnonnection
log_helper2.add_new_connection();
log_helper2.write_log("my msg1");
log_helper2.write_log("my msg2");
assert_eq!(
expected,
vec! {recv_logs.next().await.unwrap(),recv_logs.next().await.unwrap()}
);
}
/// Makes sure that implementaion can handle multiple sockets from same
/// log sink.
#[fuchsia::test]
async fn log_from_multiple_sock() {
let directory = run_archivist().await;
let mut recv_logs = start_listener(&directory);
let log_helper = LogSinkHelper::new(&directory);
let sock1 = log_helper.connect();
let sock2 = log_helper.connect();
let sock3 = log_helper.connect();
LogSinkHelper::write_log_at(&sock1, "msg sock1-1");
LogSinkHelper::write_log_at(&sock2, "msg sock2-1");
LogSinkHelper::write_log_at(&sock1, "msg sock1-2");
LogSinkHelper::write_log_at(&sock3, "msg sock3-1");
LogSinkHelper::write_log_at(&sock2, "msg sock2-2");
let mut expected = vec![
"msg sock1-1".to_owned(),
"msg sock1-2".to_owned(),
"msg sock2-1".to_owned(),
"msg sock2-2".to_owned(),
"msg sock3-1".to_owned(),
];
expected.sort();
let mut actual = vec![
recv_logs.next().await.unwrap(),
recv_logs.next().await.unwrap(),
recv_logs.next().await.unwrap(),
recv_logs.next().await.unwrap(),
recv_logs.next().await.unwrap(),
];
actual.sort();
assert_eq!(expected, actual);
}
/// Stop API works
#[fuchsia::test]
async fn stop_works() {
let (directory, signal_recv) = run_archivist_and_signal_on_exit().await;
let mut recv_logs = start_listener(&directory);
{
// make sure we can write logs
let log_sink_helper = LogSinkHelper::new(&directory);
let sock1 = log_sink_helper.connect();
LogSinkHelper::write_log_at(&sock1, "msg sock1-1");
log_sink_helper.write_log("msg sock1-2");
let mut expected = vec!["msg sock1-1".to_owned(), "msg sock1-2".to_owned()];
expected.sort();
let mut actual = vec![recv_logs.next().await.unwrap(), recv_logs.next().await.unwrap()];
actual.sort();
assert_eq!(expected, actual);
// Start new connections and sockets
let log_sink_helper1 = LogSinkHelper::new(&directory);
let sock2 = log_sink_helper.connect();
// Write logs before calling stop
log_sink_helper1.write_log("msg 1");
log_sink_helper1.write_log("msg 2");
let log_sink_helper2 = LogSinkHelper::new(&directory);
let controller = connect_to_protocol_at_dir_svc::<ControllerMarker>(&directory)
.expect("cannot connect to log proxy");
controller.stop().unwrap();
// make more socket connections and write to them and old ones.
let sock3 = log_sink_helper2.connect();
log_sink_helper2.write_log("msg 3");
log_sink_helper2.write_log("msg 4");
LogSinkHelper::write_log_at(&sock3, "msg 5");
LogSinkHelper::write_log_at(&sock2, "msg 6");
log_sink_helper.write_log("msg 7");
LogSinkHelper::write_log_at(&sock1, "msg 8");
LogSinkHelper::write_log_at(&sock2, "msg 9");
} // kills all sockets and log_sink connections
let mut expected = vec![];
let mut actual = vec![];
for i in 1..=9 {
expected.push(format!("msg {}", i));
actual.push(recv_logs.next().await.unwrap());
}
expected.sort();
actual.sort();
// make sure archivist is dead.
signal_recv.await.unwrap();
assert_eq!(expected, actual);
}
}