// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
component_lifecycle, diagnostics,
error::Error,
events::{
router::{ConsumerConfig, EventConsumer, EventRouter, ProducerConfig, ProducerType},
sources::{
ComponentEventProvider, EventSource, LogConnector, UnattributedLogSinkSource,
},
types::*,
},
identity::ComponentIdentity,
logs::{budget::BudgetManager, KernelDebugLog},
pipeline::Pipeline,
repository::DataRepo,
},
archivist_config::Config,
async_trait::async_trait,
fidl_fuchsia_io as fio, fidl_fuchsia_logger as flogger,
fidl_fuchsia_sys2::EventSourceMarker,
fidl_fuchsia_sys_internal as fsys_internal,
fuchsia_async::{self as fasync, Task},
fuchsia_component::{
client::connect_to_protocol,
server::{ServiceFs, ServiceObj},
},
fuchsia_inspect::{component, health::Reporter},
fuchsia_zircon as zx,
futures::{
channel::mpsc,
future::{self, abortable},
prelude::*,
},
parking_lot::RwLock,
std::{path::Path, sync::Arc},
tracing::{debug, error, info, warn},
};
/// Responsible for initializing an `Archivist` instance and running it to completion. Supports
/// multiple configurations by selectively calling builder-style methods such as
/// `serve_test_controller_protocol`.
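///
/// A minimal usage sketch (illustrative only; `config` and `outgoing_channel` are assumed to be
/// provided by the caller, and the real entry point may install additional services):
///
/// ```ignore
/// let mut archivist = Archivist::new(&config)?;
/// archivist.install_log_services().await;
/// archivist.serve_lifecycle_protocol();
/// archivist.run(outgoing_channel).await?;
/// ```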
pub struct Archivist {
/// Archive state, including the diagnostics repo which currently stores all logs.
archivist_state: Arc<ArchivistState>,
/// ServiceFs object used to serve the outgoing directory.
fs: ServiceFs<ServiceObj<'static, ()>>,
/// Receiver for the stream that processes LogSink connections.
log_receiver: mpsc::UnboundedReceiver<Task<()>>,
/// Sender which is used to close the stream of LogSink connections.
///
/// Clones of the sender keep the receiver end of the channel open. As soon
/// as all clones are dropped or disconnected, the receiver will close. The
/// receiver must close for `Archivist::run` to return gracefully.
log_sender: mpsc::UnboundedSender<Task<()>>,
/// Sender which is used to close the stream of Log connections after log_sender
/// completes.
///
/// Clones of the sender keep the receiver end of the channel open. As soon
/// as all clones are dropped or disconnected, the receiver will close. The
/// receiver must close for `Archivist::run` to return gracefully.
listen_sender: mpsc::UnboundedSender<Task<()>>,
/// Handles event routing between archivist parts.
event_router: EventRouter,
/// Receives the stop signal used to shut down this archivist.
stop_recv: Option<mpsc::Receiver<()>>,
/// Listens for lifecycle requests in order to handle Stop requests.
_lifecycle_task: Option<fasync::Task<()>>,
/// Task that drains the kernel debug log (klog).
_drain_klog_task: Option<fasync::Task<()>>,
/// Task draining the receiver for the `listen_sender`s.
drain_listeners_task: fasync::Task<()>,
incoming_external_event_producers: Vec<fasync::Task<()>>,
}
impl Archivist {
/// Creates a new instance, sets up inspect, and adds the 'archive' directory to the output
/// folder. Also installs the `fuchsia.diagnostics.Archive` service.
/// Call `install_log_services` afterwards to install the `LogSink` and `Log` services.
pub fn new(archivist_configuration: &Config) -> Result<Self, Error> {
let mut fs = ServiceFs::new();
diagnostics::serve(&mut fs)?;
let (log_sender, log_receiver) = mpsc::unbounded();
let (listen_sender, listen_receiver) = mpsc::unbounded();
let logs_budget =
BudgetManager::new(archivist_configuration.logs_max_cached_original_bytes as usize);
let diagnostics_repo = DataRepo::new(&logs_budget, component::inspector().root());
let pipelines_node = component::inspector().root().create_child("pipelines");
let pipelines_path = Path::new(&archivist_configuration.pipelines_path);
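// The statically-defined pipelines (feedback, legacy_metrics and all_access) share the data
// repository; each loads its selector configuration from `pipelines_path` and is attached to
// the `pipelines` inspect node.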
let pipelines = vec![
Pipeline::feedback(diagnostics_repo.clone(), &pipelines_path, &pipelines_node),
Pipeline::legacy_metrics(diagnostics_repo.clone(), &pipelines_path, &pipelines_node),
Pipeline::all_access(diagnostics_repo.clone(), &pipelines_path, &pipelines_node),
];
component::inspector().root().record(pipelines_node);
if pipelines.iter().any(|p| p.config_has_error()) {
component::health().set_unhealthy("Pipeline config has an error");
} else {
component::health().set_ok();
}
let stats_node = component::inspector().root().create_child("archive_accessor_stats");
let pipelines = pipelines
.into_iter()
.map(|pipeline| pipeline.serve(&mut fs, listen_sender.clone(), &stats_node))
.collect();
component::inspector().root().record(stats_node);
let archivist_state = Arc::new(ArchivistState::new(
pipelines,
diagnostics_repo,
logs_budget,
log_sender.clone(),
)?);
Ok(Self {
fs,
archivist_state,
log_receiver,
log_sender,
listen_sender,
event_router: EventRouter::new(component::inspector().root().create_child("events")),
stop_recv: None,
_lifecycle_task: None,
_drain_klog_task: None,
drain_listeners_task: fasync::Task::spawn(async move {
listen_receiver.for_each_concurrent(None, |rx| async move { rx.await }).await;
}),
incoming_external_event_producers: vec![],
})
}
pub fn data_repo(&self) -> &DataRepo {
&self.archivist_state.diagnostics_repo
}
pub fn log_sender(&self) -> &mpsc::UnboundedSender<Task<()>> {
&self.log_sender
}
/// Installs the test controller protocol.
pub fn serve_test_controller_protocol(&mut self) -> &mut Self {
let (stop_sender, stop_recv) = mpsc::channel(0);
self.fs.dir("svc").add_fidl_service(move |stream| {
fasync::Task::spawn(component_lifecycle::serve_test_controller(
stream,
stop_sender.clone(),
))
.detach()
});
self.stop_recv = Some(stop_recv);
debug!("Controller services initialized.");
self
}
/// Listen for v2 lifecycle requests.
pub fn serve_lifecycle_protocol(&mut self) -> &mut Self {
let (task, stop_recv) = component_lifecycle::serve_v2();
self._lifecycle_task = Some(task);
self.stop_recv = Some(stop_recv);
debug!("Lifecycle listener initialized.");
self
}
/// Installs `LogSink` and `Log` services. Panics if called twice.
pub async fn install_log_services(&mut self) -> &mut Self {
let data_repo_1 = self.data_repo().clone();
let data_repo_2 = self.data_repo().clone();
let listen_sender = self.listen_sender.clone();
let mut unattributed_log_sink_source = UnattributedLogSinkSource::default();
let unattributed_sender = unattributed_log_sink_source.publisher();
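// Unattributed LogSink connections (served directly out of /svc below rather than arriving
// through an event source) are published to the event router as LogSinkRequested singleton
// events.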
self.event_router.add_producer(ProducerConfig {
producer: &mut unattributed_log_sink_source,
producer_type: ProducerType::External,
events: vec![],
singleton_events: vec![SingletonEventType::LogSinkRequested],
});
self.incoming_external_event_producers.push(fasync::Task::spawn(async move {
unattributed_log_sink_source.spawn().await.unwrap_or_else(|err| {
error!(?err, "Failed to run unattributed log sink producer loop");
});
}));
self.fs
.dir("svc")
.add_fidl_service(move |stream| {
debug!("fuchsia.logger.Log connection");
data_repo_1.clone().handle_log(stream, listen_sender.clone());
})
.add_fidl_service(move |stream| {
debug!("fuchsia.diagnostics.LogSettings connection");
let data_repo_for_task = data_repo_2.clone();
fasync::Task::spawn(async move {
data_repo_for_task.handle_log_settings(stream).await.unwrap_or_else(|err| {
error!(?err, "Failed to handle LogSettings");
});
})
.detach();
})
.add_fidl_service(move |stream| {
debug!("unattributed fuchsia.logger.LogSink connection");
let mut sender = unattributed_sender.clone();
// TODO(fxbug.dev/67769): get rid of this Task spawn since it introduces a small
// window in which we might lose LogSinks.
fasync::Task::spawn(async move {
sender.send(stream).await.unwrap_or_else(|err| {
error!(?err, "Failed to add unattributed LogSink connection")
})
})
.detach();
});
debug!("Log services initialized.");
self
}
pub async fn install_event_source(&mut self) {
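// Routes ComponentStarted/ComponentStopped events plus the DiagnosticsReady and
// LogSinkRequested singleton events from the fuchsia.sys2 EventSource into the event router.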
debug!("fuchsia.sys.EventStream connection");
match EventSource::new(connect_to_protocol::<EventSourceMarker>().unwrap()).await {
Err(err) => error!(?err, "Failed to create event source"),
Ok(mut event_source) => {
self.event_router.add_producer(ProducerConfig {
producer: &mut event_source,
producer_type: ProducerType::External,
events: vec![EventType::ComponentStarted, EventType::ComponentStopped],
singleton_events: vec![
SingletonEventType::LogSinkRequested,
SingletonEventType::DiagnosticsReady,
],
});
self.incoming_external_event_producers.push(fasync::Task::spawn(async move {
event_source.spawn().await;
}));
}
}
}
pub fn install_component_event_provider(&mut self) {
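// Routes ComponentStarted/ComponentStopped and DiagnosticsReady events from
// fuchsia.sys.internal.ComponentEventProvider into the event router.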
let proxy = match connect_to_protocol::<fsys_internal::ComponentEventProviderMarker>() {
Ok(proxy) => proxy,
Err(err) => {
error!(?err, "Failed to connect to fuchsia.sys.internal.ComponentEventProvider");
return;
}
};
let mut component_event_provider = ComponentEventProvider::new(proxy);
self.event_router.add_producer(ProducerConfig {
producer: &mut component_event_provider,
producer_type: ProducerType::External,
events: vec![EventType::ComponentStarted, EventType::ComponentStopped],
singleton_events: vec![SingletonEventType::DiagnosticsReady],
});
self.incoming_external_event_producers.push(fasync::Task::spawn(async move {
component_event_provider.spawn().await.unwrap_or_else(|err| {
error!(?err, "Failed to run component event provider loop");
});
}));
}
pub fn install_log_connector(&mut self) {
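// Routes LogSinkRequested events from fuchsia.sys.internal.LogConnector into the event router.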
let proxy = match connect_to_protocol::<fsys_internal::LogConnectorMarker>() {
Ok(proxy) => proxy,
Err(err) => {
error!(?err, "Failed to connect to fuchsia.sys.internal.LogConnector");
return;
}
};
let mut connector = LogConnector::new(proxy);
self.event_router.add_producer(ProducerConfig {
producer: &mut connector,
producer_type: ProducerType::External,
events: vec![],
singleton_events: vec![SingletonEventType::LogSinkRequested],
});
self.incoming_external_event_producers.push(fasync::Task::spawn(async move {
connector.spawn().await.unwrap_or_else(|err| {
error!(?err, "Failed to run event source producer loop");
});
}));
}
/// Spawns a task that will drain klog as another log source.
pub async fn start_draining_klog(&mut self) -> Result<(), Error> {
let debuglog = KernelDebugLog::new().await?;
self._drain_klog_task =
Some(fasync::Task::spawn(self.data_repo().clone().drain_debuglog(debuglog)));
Ok(())
}
/// Run archivist to completion.
/// # Arguments:
/// * `outgoing_channel` - the channel on which to serve the outgoing directory.
pub async fn run(mut self, outgoing_channel: zx::Channel) -> Result<(), Error> {
debug!("Running Archivist.");
let data_repo = { self.data_repo().clone() };
self.fs.serve_connection(outgoing_channel).map_err(Error::ServeOutgoing)?;
// Start serving all outgoing services.
let run_outgoing = self.fs.collect::<()>();
let (snd, rcv) = mpsc::unbounded::<Arc<ComponentIdentity>>();
self.archivist_state.logs_budget.set_remover(snd);
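// The logs budget can request removal of a component over this channel; the task below drops
// the component from the repository and pipelines once it is no longer live.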
let component_removal_task = fasync::Task::spawn(Self::process_removal_of_components(
rcv,
data_repo.clone(),
self.archivist_state.diagnostics_pipelines.clone(),
));
let logs_budget = self.archivist_state.logs_budget.handle();
let mut event_router = self.event_router;
let archivist_state = self.archivist_state;
let archivist_state_log_sender = archivist_state.log_sender.clone();
event_router.add_consumer(ConsumerConfig {
consumer: &archivist_state,
events: vec![EventType::ComponentStarted, EventType::ComponentStopped],
singleton_events: vec![
SingletonEventType::DiagnosticsReady,
SingletonEventType::LogSinkRequested,
],
});
// panic: can only panic if we didn't register event producers and consumers correctly.
let (terminate_handle, drain_events_fut) =
event_router.start().expect("Failed to start event router");
let _event_routing_task = fasync::Task::spawn(async move {
drain_events_fut.await;
});
// Process messages from log sink.
let log_receiver = self.log_receiver;
let drain_listeners_task = self.drain_listeners_task;
let all_msg = async {
log_receiver.for_each_concurrent(None, |rx| async move { rx.await }).await;
debug!("Log ingestion stopped.");
data_repo.terminate_logs();
logs_budget.terminate();
debug!("Flushing to listeners.");
drain_listeners_task.await;
debug!("Log listeners and batch iterators stopped.");
component_removal_task.cancel().await;
debug!("Not processing more component removal requests.");
};
let (abortable_fut, abort_handle) = abortable(run_outgoing);
let mut listen_sender = self.listen_sender;
let mut log_sender = self.log_sender;
let incoming_external_event_producers = self.incoming_external_event_producers;
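// On a stop signal: stop event routing, cancel the external event producers, and disconnect
// every log/listen sender so the receivers close and `all_msg` can drain to completion, then
// abort serving the outgoing directory.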
let stop_fut = match self.stop_recv {
Some(stop_recv) => async move {
stop_recv.into_future().await;
terminate_handle.terminate().await;
for task in incoming_external_event_producers {
task.cancel().await;
}
listen_sender.disconnect();
log_sender.disconnect();
archivist_state_log_sender.write().disconnect();
abort_handle.abort()
}
.left_future(),
None => future::ready(()).right_future(),
};
info!("archivist: Entering core loop.");
// Combine all three futures into a main future.
future::join3(abortable_fut, stop_fut, all_msg).map(|_| Ok(())).await
}
async fn process_removal_of_components(
mut removal_requests: mpsc::UnboundedReceiver<Arc<ComponentIdentity>>,
diagnostics_repo: DataRepo,
diagnostics_pipelines: Arc<Vec<Arc<RwLock<Pipeline>>>>,
) {
while let Some(identity) = removal_requests.next().await {
maybe_remove_component(&identity, &diagnostics_repo, &diagnostics_pipelines);
}
}
}
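/// Removes a component from the data repository and from every pipeline, but only if the
/// repository no longer considers it live.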
fn maybe_remove_component(
identity: &ComponentIdentity,
diagnostics_repo: &DataRepo,
diagnostics_pipelines: &[Arc<RwLock<Pipeline>>],
) {
if !diagnostics_repo.is_live(identity) {
debug!(%identity, "Removing component from repository.");
diagnostics_repo.write().data_directories.remove(&*identity.unique_key());
// TODO(fxbug.dev/55736): The pipeline specific updates should be happening asynchronously.
for pipeline in diagnostics_pipelines {
pipeline.write().remove(&identity.relative_moniker);
}
}
}
/// `ArchivistState` owns the tools needed to persist data to the archive, as well as the
/// service-specific repositories that are populated by the archivist server and exposed in the
/// service sessions.
pub struct ArchivistState {
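/// Pipelines (feedback, legacy_metrics and all_access) constructed in `Archivist::new` that
/// filter the data repository.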
diagnostics_pipelines: Arc<Vec<Arc<RwLock<Pipeline>>>>,
pub diagnostics_repo: DataRepo,
/// The overall capacity we enforce for log messages across containers.
logs_budget: BudgetManager,
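/// Sender used to dispatch the tasks that service `LogSink` connections; `Archivist::run`
/// drains the receiving end.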
log_sender: Arc<RwLock<mpsc::UnboundedSender<Task<()>>>>,
}
impl ArchivistState {
pub fn new(
diagnostics_pipelines: Vec<Arc<RwLock<Pipeline>>>,
diagnostics_repo: DataRepo,
logs_budget: BudgetManager,
log_sender: mpsc::UnboundedSender<Task<()>>,
) -> Result<Self, Error> {
Ok(Self {
diagnostics_pipelines: Arc::new(diagnostics_pipelines),
diagnostics_repo,
logs_budget,
log_sender: Arc::new(RwLock::new(log_sender)),
})
}
fn handle_diagnostics_ready(
&self,
component: ComponentIdentity,
directory: Option<fio::DirectoryProxy>,
timestamp: zx::Time,
) {
debug!(identity = %component, "Diagnostics directory is ready.");
if let Some(directory) = directory {
// Update the central repository to reference the new diagnostics source.
self.diagnostics_repo
.write()
.add_inspect_artifacts(component.clone(), directory, timestamp)
.unwrap_or_else(|err| {
warn!(identity = %component, ?err, "Failed to add inspect artifacts to repository");
});
// Let each pipeline know that a new component arrived, and allow the pipeline
// to eagerly bucket static selectors based on that component's moniker.
// TODO(fxbug.dev/55736): The pipeline specific updates should be happening
// asynchronously.
for pipeline in self.diagnostics_pipelines.iter() {
pipeline.write().add_inspect_artifacts(&component.relative_moniker).unwrap_or_else(
|e| {
warn!(identity = %component, ?e,
"Failed to add inspect artifacts to pipeline wrapper");
},
);
}
}
}
/// Adds the newly started component to the central repository.
fn handle_started(&self, component: ComponentIdentity, timestamp: zx::Time) {
debug!(identity = %component, "Adding new component.");
if let Err(e) =
self.diagnostics_repo.write().add_new_component(component.clone(), timestamp)
{
error!(identity = %component, ?e, "Failed to add new component to repository");
}
}
fn handle_stopped(&self, component: ComponentIdentity) {
debug!(identity = %component, "Component stopped");
self.diagnostics_repo.write().mark_stopped(&component.unique_key());
maybe_remove_component(&component, &self.diagnostics_repo, &self.diagnostics_pipelines);
}
fn handle_log_sink_requested(
&self,
component: ComponentIdentity,
request_stream: Option<flogger::LogSinkRequestStream>,
) {
debug!(identity = %component, "LogSink requested.");
if let Some(request_stream) = request_stream {
let container = self.diagnostics_repo.write().get_log_container(component);
container.handle_log_sink(request_stream, self.log_sender.read().clone());
}
}
}
#[async_trait]
impl EventConsumer for ArchivistState {
async fn handle(self: Arc<Self>, event: Event) {
match event.payload {
EventPayload::ComponentStarted(ComponentStartedPayload { component }) => {
self.handle_started(component, event.timestamp);
}
EventPayload::ComponentStopped(ComponentStoppedPayload { component }) => {
self.handle_stopped(component);
}
EventPayload::DiagnosticsReady(DiagnosticsReadyPayload { component, directory }) => {
self.handle_diagnostics_ready(component, directory, event.timestamp);
}
EventPayload::LogSinkRequested(LogSinkRequestedPayload {
component,
request_stream,
}) => {
self.handle_log_sink_requested(component, request_stream);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
constants::*,
events::router::{Dispatcher, EventProducer},
logs::testing::*,
};
use fidl::endpoints::create_proxy;
use fidl_fuchsia_diagnostics_test::ControllerMarker;
use fidl_fuchsia_io as fio;
use fuchsia_async as fasync;
use fuchsia_component::client::connect_to_protocol_at_dir_svc;
use futures::channel::oneshot;
fn init_archivist() -> Archivist {
let config = Config {
enable_component_event_provider: false,
enable_klog: false,
enable_event_source: false,
enable_log_connector: false,
install_controller: false,
listen_to_lifecycle: false,
log_to_debuglog: false,
logs_max_cached_original_bytes: LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES as u64,
num_threads: 1,
pipelines_path: DEFAULT_PIPELINES_PATH.into(),
bind_services: vec![],
};
let mut archivist = Archivist::new(&config).unwrap();
// Install a fake producer that allows all incoming events. This allows skipping
// validation for the purposes of the tests here.
let mut fake_producer = FakeProducer {};
archivist.event_router.add_producer(ProducerConfig {
producer: &mut fake_producer,
producer_type: ProducerType::External,
events: vec![EventType::ComponentStarted, EventType::ComponentStopped],
singleton_events: vec![
SingletonEventType::LogSinkRequested,
SingletonEventType::DiagnosticsReady,
],
});
archivist
}
struct FakeProducer {}
impl EventProducer for FakeProducer {
fn set_dispatcher(&mut self, _dispatcher: Dispatcher) {}
}
// run archivist and send signal when it dies.
async fn run_archivist_and_signal_on_exit() -> (fio::DirectoryProxy, oneshot::Receiver<()>) {
let (directory, server_end) = create_proxy::<fio::DirectoryMarker>().unwrap();
let mut archivist = init_archivist();
archivist.install_log_services().await.serve_test_controller_protocol();
let (signal_send, signal_recv) = oneshot::channel();
fasync::Task::spawn(async move {
archivist.run(server_end.into_channel()).await.expect("Cannot run archivist");
signal_send.send(()).unwrap();
})
.detach();
(directory, signal_recv)
}
// runs archivist and returns its directory.
async fn run_archivist() -> fio::DirectoryProxy {
let (directory, server_end) = create_proxy::<fio::DirectoryMarker>().unwrap();
let mut archivist = init_archivist();
archivist.install_log_services().await;
fasync::Task::spawn(async move {
archivist.run(server_end.into_channel()).await.expect("Cannot run archivist");
})
.detach();
directory
}
#[fuchsia::test]
async fn can_log_and_retrieve_log() {
let directory = run_archivist().await;
let mut recv_logs = start_listener(&directory);
let mut log_helper = LogSinkHelper::new(&directory);
log_helper.write_log("my msg1");
log_helper.write_log("my msg2");
assert_eq!(
vec! {Some("my msg1".to_owned()),Some("my msg2".to_owned())},
vec! {recv_logs.next().await,recv_logs.next().await}
);
// new client can log
let mut log_helper2 = LogSinkHelper::new(&directory);
log_helper2.write_log("my msg1");
log_helper.write_log("my msg2");
let mut expected = vec!["my msg1".to_owned(), "my msg2".to_owned()];
expected.sort();
let mut actual = vec![recv_logs.next().await.unwrap(), recv_logs.next().await.unwrap()];
actual.sort();
assert_eq!(expected, actual);
// can log after killing log sink proxy
log_helper.kill_log_sink();
log_helper.write_log("my msg1");
log_helper.write_log("my msg2");
assert_eq!(
expected,
vec! {recv_logs.next().await.unwrap(),recv_logs.next().await.unwrap()}
);
// can log from new socket connection
log_helper2.add_new_connection();
log_helper2.write_log("my msg1");
log_helper2.write_log("my msg2");
assert_eq!(
expected,
vec! {recv_logs.next().await.unwrap(),recv_logs.next().await.unwrap()}
);
}
/// Makes sure that the implementation can handle multiple sockets from the same
/// log sink.
#[fuchsia::test]
async fn log_from_multiple_sock() {
let directory = run_archivist().await;
let mut recv_logs = start_listener(&directory);
let log_helper = LogSinkHelper::new(&directory);
let sock1 = log_helper.connect();
let sock2 = log_helper.connect();
let sock3 = log_helper.connect();
LogSinkHelper::write_log_at(&sock1, "msg sock1-1");
LogSinkHelper::write_log_at(&sock2, "msg sock2-1");
LogSinkHelper::write_log_at(&sock1, "msg sock1-2");
LogSinkHelper::write_log_at(&sock3, "msg sock3-1");
LogSinkHelper::write_log_at(&sock2, "msg sock2-2");
let mut expected = vec![
"msg sock1-1".to_owned(),
"msg sock1-2".to_owned(),
"msg sock2-1".to_owned(),
"msg sock2-2".to_owned(),
"msg sock3-1".to_owned(),
];
expected.sort();
let mut actual = vec![
recv_logs.next().await.unwrap(),
recv_logs.next().await.unwrap(),
recv_logs.next().await.unwrap(),
recv_logs.next().await.unwrap(),
recv_logs.next().await.unwrap(),
];
actual.sort();
assert_eq!(expected, actual);
}
/// Verifies that the Stop API works.
#[fuchsia::test]
async fn stop_works() {
let (directory, signal_recv) = run_archivist_and_signal_on_exit().await;
let mut recv_logs = start_listener(&directory);
{
// make sure we can write logs
let log_sink_helper = LogSinkHelper::new(&directory);
let sock1 = log_sink_helper.connect();
LogSinkHelper::write_log_at(&sock1, "msg sock1-1");
log_sink_helper.write_log("msg sock1-2");
let mut expected = vec!["msg sock1-1".to_owned(), "msg sock1-2".to_owned()];
expected.sort();
let mut actual = vec![recv_logs.next().await.unwrap(), recv_logs.next().await.unwrap()];
actual.sort();
assert_eq!(expected, actual);
// Start new connections and sockets
let log_sink_helper1 = LogSinkHelper::new(&directory);
let sock2 = log_sink_helper.connect();
// Write logs before calling stop
log_sink_helper1.write_log("msg 1");
log_sink_helper1.write_log("msg 2");
let log_sink_helper2 = LogSinkHelper::new(&directory);
let controller = connect_to_protocol_at_dir_svc::<ControllerMarker>(&directory)
.expect("cannot connect to log proxy");
controller.stop().unwrap();
// Make more socket connections and write to both the new and the old ones.
let sock3 = log_sink_helper2.connect();
log_sink_helper2.write_log("msg 3");
log_sink_helper2.write_log("msg 4");
LogSinkHelper::write_log_at(&sock3, "msg 5");
LogSinkHelper::write_log_at(&sock2, "msg 6");
log_sink_helper.write_log("msg 7");
LogSinkHelper::write_log_at(&sock1, "msg 8");
LogSinkHelper::write_log_at(&sock2, "msg 9");
} // kills all sockets and log_sink connections
let mut expected = vec![];
let mut actual = vec![];
for i in 1..=9 {
expected.push(format!("msg {}", i));
actual.push(recv_logs.next().await.unwrap());
}
expected.sort();
actual.sort();
// make sure archivist is dead.
signal_recv.await.unwrap();
assert_eq!(expected, actual);
}
}