// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
accessor::ArchiveAccessor,
archive, configs, constants, diagnostics,
events::{stream::EventStream, types::EventSource},
logs::redact::Redactor,
pipeline::Pipeline,
repository::DataRepo,
},
anyhow::Error,
fidl::{endpoints::RequestStream, AsyncChannel},
fidl_fuchsia_diagnostics::Selector,
fidl_fuchsia_diagnostics_test::{ControllerRequest, ControllerRequestStream},
fidl_fuchsia_process_lifecycle::{LifecycleRequest, LifecycleRequestStream},
fidl_fuchsia_sys_internal::SourceIdentity,
fuchsia_async::{self as fasync, Task},
fuchsia_component::server::{ServiceFs, ServiceObj, ServiceObjTrait},
fuchsia_inspect::{component, health::Reporter},
fuchsia_runtime::{take_startup_handle, HandleInfo, HandleType},
fuchsia_zircon as zx,
futures::{
channel::mpsc,
future::{self, abortable},
prelude::*,
},
io_util,
parking_lot::RwLock,
std::{
path::{Path, PathBuf},
sync::Arc,
},
tracing::{debug, error, info, warn},
};
/// Spawns a task serving the test `Controller` protocol; a Stop request is forwarded as a stop signal.
fn spawn_controller(mut stream: ControllerRequestStream, mut stop_sender: mpsc::Sender<()>) {
fasync::Task::spawn(
async move {
while let Some(ControllerRequest::Stop { .. }) = stream.try_next().await? {
debug!("Stop request received.");
stop_sender.send(()).await.ok();
break;
}
Ok(())
}
.map(|o: Result<(), fidl::Error>| {
if let Err(e) = o {
error!(%e, "error serving controller");
}
}),
)
.detach();
}
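/// Opens an `ArchiveWriter` at `archive_path` and serves that directory as the
/// remote "archive" entry in the outgoing directory.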
fn maybe_create_archive<ServiceObjTy: ServiceObjTrait>(
fs: &mut ServiceFs<ServiceObjTy>,
archive_path: &PathBuf,
) -> Result<archive::ArchiveWriter, Error> {
let writer = archive::ArchiveWriter::open(archive_path.clone())?;
fs.add_remote(
"archive",
io_util::open_directory_in_namespace(
&archive_path.to_string_lossy(),
io_util::OPEN_RIGHT_READABLE | io_util::OPEN_RIGHT_WRITABLE,
)?,
);
Ok(writer)
}
/// The `Archivist` is responsible for publishing all the services and monitoring component health.
///
/// # Responsibilities
/// * Run and process `LogSink` connections on the main future.
/// * Run and process `Log` listener connections by spawning them.
/// * Optionally collect component events.
pub struct Archivist {
/// Archive state, including the diagnostics repo which currently stores all logs.
state: archive::ArchivistState,
/// True if the static pipeline configurations (when present) were loaded without errors.
pipeline_exists: bool,
/// Inspect nodes for the configured pipelines, kept alive for the lifetime of the archivist.
_pipeline_nodes: Vec<fuchsia_inspect::Node>,
/// Pipeline configurations, kept alive for the lifetime of the archivist.
_pipeline_configs: Vec<configs::PipelineConfig>,
/// ServiceFs object used to serve the outgoing directory.
fs: ServiceFs<ServiceObj<'static, ()>>,
/// Receiver for stream which will process LogSink connections.
log_receiver: mpsc::UnboundedReceiver<Task<()>>,
/// Sender which is used to close the stream of LogSink connections.
///
/// Clones of the sender keep the receiver end of the channel open. As soon
/// as all clones are dropped or disconnected, the receiver will close. The
/// receiver must close for `Archivist::run` to return gracefully.
log_sender: mpsc::UnboundedSender<Task<()>>,
/// Receiver for stream which will process Log connections.
listen_receiver: mpsc::UnboundedReceiver<Task<()>>,
/// Sender which is used to close the stream of Log connections after log_sender
/// completes.
///
/// Clones of the sender keep the receiver end of the channel open. As soon
/// as all clones are dropped or disconnected, the receiver will close. The
/// receiver must close for `Archivist::run` to return gracefully.
listen_sender: mpsc::UnboundedSender<Task<()>>,
/// Listens for events coming from the v1 and v2 component frameworks.
event_stream: EventStream,
/// Receives the stop signal used to shut down this archivist.
stop_recv: Option<mpsc::Receiver<()>>,
}
impl Archivist {
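/// Waits for the event stream to start listening, records pipeline health in
/// Inspect, and then runs the archivist over the incoming component events.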
async fn collect_component_events(
event_stream: EventStream,
state: archive::ArchivistState,
pipeline_exists: bool,
) {
let events = event_stream.listen().await;
if !pipeline_exists {
component::health().set_unhealthy("Pipeline config has an error");
} else {
component::health().set_ok();
}
archive::run_archivist(state, events).await
}
/// Installs the test `Controller` service and wires its stop signal to this archivist.
pub fn install_controller_service(&mut self) -> &mut Self {
let (stop_sender, stop_recv) = mpsc::channel(0);
self.fs
.dir("svc")
.add_fidl_service(move |stream| spawn_controller(stream, stop_sender.clone()));
self.stop_recv = Some(stop_recv);
debug!("Controller services initialized.");
self
}
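/// Takes the process' `Lifecycle` startup handle and converts it into a request
/// stream. Panics if the handle was not provided.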
fn take_lifecycle_channel() -> LifecycleRequestStream {
let lifecycle_handle_info = HandleInfo::new(HandleType::Lifecycle, 0);
let lifecycle_handle = take_startup_handle(lifecycle_handle_info)
.expect("must have been provided a lifecycle channel in procargs");
let x: zx::Channel = lifecycle_handle.into();
let async_x = AsyncChannel::from(
fasync::Channel::from_channel(x).expect("Async channel conversion failed."),
);
LifecycleRequestStream::from_channel(async_x)
}
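/// Installs a listener for `Lifecycle` Stop requests which triggers this
/// archivist's shutdown signal.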
pub fn install_lifecycle_listener(&mut self) -> &mut Self {
let (mut stop_sender, stop_recv) = mpsc::channel(0);
let mut req_stream = Self::take_lifecycle_channel();
Task::spawn(async move {
debug!("Awaiting request to close");
while let Some(LifecycleRequest::Stop { .. }) =
req_stream.try_next().await.expect("Failure receiving lifecycle FIDL message")
{
info!("Initiating shutdown.");
stop_sender.send(()).await.unwrap();
}
})
.detach();
self.stop_recv = Some(stop_recv);
debug!("Lifecycle listener initialized.");
self
}
/// Installs `LogSink` and `Log` services. Panics if called twice.
pub fn install_logger_services(&mut self) -> &mut Self {
let data_repo_1 = self.data_repo().clone();
let data_repo_2 = self.data_repo().clone();
let data_repo_3 = self.data_repo().clone();
let log_sender = self.log_sender.clone();
let log_sender2 = self.log_sender.clone();
let listen_sender = self.listen_sender.clone();
self.fs
.dir("svc")
.add_fidl_service(move |stream| {
debug!("fuchsia.logger.Log connection");
data_repo_1.clone().handle_log(stream, listen_sender.clone());
})
.add_fidl_service(move |stream| {
debug!("fuchsia.logger.LogSink connection");
let source = Arc::new(SourceIdentity::EMPTY);
fasync::Task::spawn(data_repo_2.clone().handle_log_sink(
stream,
source,
log_sender.clone(),
))
.detach();
})
.add_fidl_service(move |stream| {
debug!("fuchsia.sys.EventStream connection");
fasync::Task::spawn(
data_repo_3.clone().handle_event_stream(stream, log_sender2.clone()),
)
.detach()
});
debug!("Log services initialized.");
self
}
/// Sets the event provider used to collect component events. Panics if called twice.
pub fn add_event_source(
&mut self,
name: impl Into<String>,
source: Box<dyn EventSource>,
) -> &mut Self {
let name = name.into();
debug!("{} event source initialized", &name);
self.event_stream.add_source(name, source);
self
}
/// Creates a new instance, sets up Inspect, and adds an 'archive' directory to the output folder.
/// Also installs the `fuchsia.diagnostics.ArchiveAccessor` services.
/// Callers should also call `install_logger_services` and `add_event_source`.
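///
/// # Example
///
/// A minimal usage sketch (the configuration values and the source of the
/// outgoing channel are illustrative only):
///
/// ```ignore
/// let config = configs::Config {
///     archive_path: None,
///     max_archive_size_bytes: 10,
///     max_event_group_size_bytes: 10,
///     num_threads: 1,
/// };
/// let mut archivist = Archivist::new(config)?;
/// archivist.install_logger_services().install_controller_service();
/// // `outgoing_channel` is the server end of the outgoing directory (a zx::Channel).
/// archivist.run(outgoing_channel).await?;
/// ```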
pub fn new(archivist_configuration: configs::Config) -> Result<Self, Error> {
let (log_sender, log_receiver) = mpsc::unbounded();
let (listen_sender, listen_receiver) = mpsc::unbounded();
let mut fs = ServiceFs::new();
diagnostics::serve(&mut fs)?;
let writer = archivist_configuration.archive_path.as_ref().and_then(|archive_path| {
maybe_create_archive(&mut fs, archive_path)
.or_else(|e| {
// TODO(fxbug.dev/57271): this is not expected in regular builds of the archivist. It
// happens when starting the zircon_guest (fx shell guest launch zircon_guest).
// We'd normally fail if we couldn't create the archive, but for now we only log
// a warning.
warn!(
path = %archive_path.display(), ?e,
"Failed to create archive"
);
Err(e)
})
.ok()
});
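// Instantiate the statically configured pipelines and record their configuration in Inspect.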
let pipelines_node = diagnostics::root().create_child("pipelines");
let feedback_pipeline_node = pipelines_node.create_child("feedback");
let legacy_pipeline_node = pipelines_node.create_child("legacy_metrics");
let mut feedback_config = configs::PipelineConfig::from_directory(
"/config/data/feedback",
configs::EmptyBehavior::DoNotFilter,
);
feedback_config.record_to_inspect(&feedback_pipeline_node);
let mut legacy_config = configs::PipelineConfig::from_directory(
"/config/data/legacy_metrics",
configs::EmptyBehavior::Disable,
);
legacy_config.record_to_inspect(&legacy_pipeline_node);
// Do not set the state to error if the pipelines simply do not exist.
let pipeline_exists = !((Path::new("/config/data/feedback").is_dir()
&& feedback_config.has_error())
|| (Path::new("/config/data/legacy_metrics").is_dir() && legacy_config.has_error()));
let diagnostics_repo = DataRepo::with_logs_inspect(diagnostics::root(), "log_stats");
// The Inspect Repository offered to the ALL_ACCESS pipeline. This
// repository is unique in that it has no statically configured
// selectors, meaning all diagnostics data is visible.
// This should not be used for production services.
// TODO(fxbug.dev/55735): Lock down this protocol using allowlists.
let all_access_pipeline =
Arc::new(RwLock::new(Pipeline::new(None, Redactor::noop(), diagnostics_repo.clone())));
// The Inspect Repository offered to the Feedback pipeline. This repository applies
// static selectors configured under config/data/feedback to inspect exfiltration.
let (feedback_static_selectors, feedback_redactor) = if !feedback_config.disable_filtering {
(
feedback_config.take_inspect_selectors().map(|selectors| {
selectors
.into_iter()
.map(Arc::new)
.collect::<Vec<Arc<Selector>>>()
}),
Redactor::with_static_patterns(),
)
} else {
(None, Redactor::noop())
};
let feedback_pipeline = Arc::new(RwLock::new(Pipeline::new(
feedback_static_selectors,
feedback_redactor,
diagnostics_repo.clone(),
)));
// The Inspect Repository offered to the LegacyMetrics
// pipeline. This repository applies static selectors configured
// under config/data/legacy_metrics to inspect exfiltration.
let legacy_metrics_pipeline = Arc::new(RwLock::new(Pipeline::new(
match legacy_config.disable_filtering {
false => legacy_config.take_inspect_selectors().map(|selectors| {
selectors
.into_iter()
.map(Arc::new)
.collect::<Vec<Arc<Selector>>>()
}),
true => None,
},
Redactor::noop(),
diagnostics_repo.clone(),
)));
// TODO(fxbug.dev/55736): Refactor this code so that we don't store
// diagnostics data N times if we have N pipelines. We should be
// storing a single copy regardless of the number of pipelines.
let archivist_state = archive::ArchivistState::new(
archivist_configuration,
vec![
all_access_pipeline.clone(),
feedback_pipeline.clone(),
legacy_metrics_pipeline.clone(),
],
diagnostics_repo,
writer,
)?;
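// Per-pipeline Inspect stats for the ArchiveAccessor connections served below.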
let all_accessor_stats = Arc::new(diagnostics::AccessorStats::new(
component::inspector().root().create_child("all_archive_accessor"),
));
let feedback_accessor_stats = Arc::new(diagnostics::AccessorStats::new(
component::inspector().root().create_child("feedback_archive_accessor"),
));
let legacy_accessor_stats = Arc::new(diagnostics::AccessorStats::new(
component::inspector().root().create_child("legacy_metrics_archive_accessor"),
));
fs.dir("svc")
.add_fidl_service(move |stream| {
debug!("fuchsia.diagnostics.ArchiveAccessor connection");
let all_archive_accessor =
ArchiveAccessor::new(all_access_pipeline.clone(), all_accessor_stats.clone());
all_archive_accessor.spawn_archive_accessor_server(stream)
})
.add_fidl_service_at(constants::FEEDBACK_ARCHIVE_ACCESSOR_NAME, move |chan| {
debug!("fuchsia.diagnostics.FeedbackArchiveAccessor connection");
let feedback_archive_accessor = ArchiveAccessor::new(
feedback_pipeline.clone(),
feedback_accessor_stats.clone(),
);
feedback_archive_accessor.spawn_archive_accessor_server(chan)
})
.add_fidl_service_at(constants::LEGACY_METRICS_ARCHIVE_ACCESSOR_NAME, move |chan| {
debug!("fuchsia.diagnostics.LegacyMetricsAccessor connection");
let legacy_archive_accessor = ArchiveAccessor::new(
legacy_metrics_pipeline.clone(),
legacy_accessor_stats.clone(),
);
legacy_archive_accessor.spawn_archive_accessor_server(chan)
});
let events_node = diagnostics::root().create_child("event_stats");
Ok(Self {
fs,
state: archivist_state,
log_receiver,
log_sender,
listen_receiver,
listen_sender,
pipeline_exists,
_pipeline_nodes: vec![pipelines_node, feedback_pipeline_node, legacy_pipeline_node],
_pipeline_configs: vec![feedback_config, legacy_config],
event_stream: EventStream::new(events_node),
stop_recv: None,
})
}
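/// Returns the diagnostics data repository, which currently stores all logs.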
pub fn data_repo(&self) -> &DataRepo {
&self.state.diagnostics_repo
}
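/// Returns the sender used to dispatch new `LogSink` connection handler tasks.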
pub fn log_sender(&self) -> &mpsc::UnboundedSender<Task<()>> {
&self.log_sender
}
/// Run archivist to completion.
/// # Arguments:
/// * `outgoing_channel` - channel to serve the outgoing directory on.
pub async fn run(mut self, outgoing_channel: zx::Channel) -> Result<(), Error> {
debug!("Running Archivist.");
let data_repo = { self.data_repo().clone() };
self.fs.serve_connection(outgoing_channel)?;
// Start serving all outgoing services.
let run_outgoing = self.fs.collect::<()>();
// Collect component events.
let run_event_collection =
Self::collect_component_events(self.event_stream, self.state, self.pipeline_exists);
// Process messages from log sink.
let log_receiver = self.log_receiver;
let listen_receiver = self.listen_receiver;
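// Drain the LogSink ingestion tasks, terminate log ingestion in the data repo,
// then flush remaining messages to listeners before the listener tasks stop.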
let all_msg = async {
log_receiver.for_each_concurrent(None, |rx| async move { rx.await }).await;
debug!("Log ingestion stopped.");
data_repo.terminate_logs();
debug!("Flushing to listeners.");
listen_receiver.for_each_concurrent(None, |rx| async move { rx.await }).await;
debug!("Log listeners stopped.");
};
let (abortable_fut, abort_handle) =
abortable(future::join(run_outgoing, run_event_collection));
let mut listen_sender = self.listen_sender;
let mut log_sender = self.log_sender;
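// On a stop signal, stop accepting new LogSink/Log connections and abort the
// outgoing directory and event collection futures; in-flight tasks drain via `all_msg`.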
let stop_fut = match self.stop_recv {
Some(stop_recv) => async move {
stop_recv.into_future().await;
listen_sender.disconnect();
log_sender.disconnect();
abort_handle.abort()
}
.left_future(),
None => future::ready(()).right_future(),
};
debug!("Entering core loop.");
// Combine all three futures into a main future.
future::join3(abortable_fut, stop_fut, all_msg).map(|_| Ok(())).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::logs::testing::*;
use fidl::endpoints::create_proxy;
use fidl_fuchsia_diagnostics_test::ControllerMarker;
use fidl_fuchsia_io as fio;
use fio::DirectoryProxy;
use fuchsia_async as fasync;
use fuchsia_component::client::connect_to_protocol_at_dir_svc;
use futures::channel::oneshot;
fn init_archivist() -> Archivist {
let config = configs::Config {
archive_path: None,
max_archive_size_bytes: 10,
max_event_group_size_bytes: 10,
num_threads: 1,
};
Archivist::new(config).unwrap()
}
// Runs the archivist and sends a signal when it exits.
fn run_archivist_and_signal_on_exit() -> (DirectoryProxy, oneshot::Receiver<()>) {
let (directory, server_end) = create_proxy::<fio::DirectoryMarker>().unwrap();
let mut archivist = init_archivist();
archivist.install_logger_services().install_controller_service();
let (signal_send, signal_recv) = oneshot::channel();
fasync::Task::spawn(async move {
archivist.run(server_end.into_channel()).await.expect("Cannot run archivist");
signal_send.send(()).unwrap();
})
.detach();
(directory, signal_recv)
}
// Runs the archivist and returns its outgoing directory.
fn run_archivist() -> DirectoryProxy {
let (directory, server_end) = create_proxy::<fio::DirectoryMarker>().unwrap();
let mut archivist = init_archivist();
archivist.install_logger_services();
fasync::Task::spawn(async move {
archivist.run(server_end.into_channel()).await.expect("Cannot run archivist");
})
.detach();
directory
}
#[fasync::run_singlethreaded(test)]
async fn can_log_and_retrive_log() {
let directory = run_archivist();
let mut recv_logs = start_listener(&directory);
let mut log_helper = LogSinkHelper::new(&directory);
log_helper.write_log("my msg1");
log_helper.write_log("my msg2");
assert_eq!(
vec! {Some("my msg1".to_owned()),Some("my msg2".to_owned())},
vec! {recv_logs.next().await,recv_logs.next().await}
);
// new client can log
let mut log_helper2 = LogSinkHelper::new(&directory);
log_helper2.write_log("my msg1");
log_helper.write_log("my msg2");
let mut expected = vec!["my msg1".to_owned(), "my msg2".to_owned()];
expected.sort();
let mut actual = vec![recv_logs.next().await.unwrap(), recv_logs.next().await.unwrap()];
actual.sort();
assert_eq!(expected, actual);
// can log after killing log sink proxy
log_helper.kill_log_sink();
log_helper.write_log("my msg1");
log_helper.write_log("my msg2");
assert_eq!(
expected,
vec! {recv_logs.next().await.unwrap(),recv_logs.next().await.unwrap()}
);
// can log from a new socket connection
log_helper2.add_new_connection();
log_helper2.write_log("my msg1");
log_helper2.write_log("my msg2");
assert_eq!(
expected,
vec! {recv_logs.next().await.unwrap(),recv_logs.next().await.unwrap()}
);
}
/// Makes sure that the implementation can handle multiple sockets from the same
/// log sink.
#[fasync::run_singlethreaded(test)]
async fn log_from_multiple_sock() {
let directory = run_archivist();
let mut recv_logs = start_listener(&directory);
let log_helper = LogSinkHelper::new(&directory);
let sock1 = log_helper.connect();
let sock2 = log_helper.connect();
let sock3 = log_helper.connect();
LogSinkHelper::write_log_at(&sock1, "msg sock1-1");
LogSinkHelper::write_log_at(&sock2, "msg sock2-1");
LogSinkHelper::write_log_at(&sock1, "msg sock1-2");
LogSinkHelper::write_log_at(&sock3, "msg sock3-1");
LogSinkHelper::write_log_at(&sock2, "msg sock2-2");
let mut expected = vec![
"msg sock1-1".to_owned(),
"msg sock1-2".to_owned(),
"msg sock2-1".to_owned(),
"msg sock2-2".to_owned(),
"msg sock3-1".to_owned(),
];
expected.sort();
let mut actual = vec![
recv_logs.next().await.unwrap(),
recv_logs.next().await.unwrap(),
recv_logs.next().await.unwrap(),
recv_logs.next().await.unwrap(),
recv_logs.next().await.unwrap(),
];
actual.sort();
assert_eq!(expected, actual);
}
/// Verifies that the Stop API works.
#[fasync::run_singlethreaded(test)]
async fn stop_works() {
let (directory, signal_recv) = run_archivist_and_signal_on_exit();
let mut recv_logs = start_listener(&directory);
{
// make sure we can write logs
let log_sink_helper = LogSinkHelper::new(&directory);
let sock1 = log_sink_helper.connect();
LogSinkHelper::write_log_at(&sock1, "msg sock1-1");
log_sink_helper.write_log("msg sock1-2");
let mut expected = vec!["msg sock1-1".to_owned(), "msg sock1-2".to_owned()];
expected.sort();
let mut actual = vec![recv_logs.next().await.unwrap(), recv_logs.next().await.unwrap()];
actual.sort();
assert_eq!(expected, actual);
// Start new connections and sockets
let log_sink_helper1 = LogSinkHelper::new(&directory);
let sock2 = log_sink_helper.connect();
// Write logs before calling stop
log_sink_helper1.write_log("msg 1");
log_sink_helper1.write_log("msg 2");
let log_sink_helper2 = LogSinkHelper::new(&directory);
let controller = connect_to_protocol_at_dir_svc::<ControllerMarker>(&directory)
.expect("cannot connect to log proxy");
controller.stop().unwrap();
// make more socket connections and write to them and old ones.
let sock3 = log_sink_helper2.connect();
log_sink_helper2.write_log("msg 3");
log_sink_helper2.write_log("msg 4");
LogSinkHelper::write_log_at(&sock3, "msg 5");
LogSinkHelper::write_log_at(&sock2, "msg 6");
log_sink_helper.write_log("msg 7");
LogSinkHelper::write_log_at(&sock1, "msg 8");
LogSinkHelper::write_log_at(&sock2, "msg 9");
} // kills all sockets and log_sink connections
let mut expected = vec![];
let mut actual = vec![];
for i in 1..=9 {
expected.push(format!("msg {}", i));
actual.push(recv_logs.next().await.unwrap());
}
expected.sort();
actual.sort();
// make sure archivist is dead.
signal_recv.await.unwrap();
assert_eq!(expected, actual);
}
}