| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use crate::{ |
| accessor::{ArchiveAccessorServer, BatchRetrievalTimeout}, |
| component_lifecycle, |
| error::Error, |
| events::{ |
| router::{ConsumerConfig, EventRouter, ProducerConfig}, |
| sources::EventSource, |
| types::*, |
| }, |
| identity::ComponentIdentity, |
| inspect::{repository::InspectRepository, servers::*}, |
| logs::{ |
| repository::LogsRepository, |
| serial::{SerialConfig, SerialSink}, |
| servers::*, |
| KernelDebugLog, |
| }, |
| pipeline::Pipeline, |
| }; |
| use archivist_config::Config; |
| use fidl_fuchsia_diagnostics::ArchiveAccessorRequestStream; |
| use fidl_fuchsia_diagnostics_host as fhost; |
| use fidl_fuchsia_process_lifecycle::LifecycleRequestStream; |
| use fuchsia_async as fasync; |
| use fuchsia_component::server::{ServiceFs, ServiceObj}; |
| use fuchsia_inspect::{component, health::Reporter}; |
| use fuchsia_zircon as zx; |
| use futures::{channel::oneshot, future::abortable, prelude::*}; |
| use moniker::ExtendedMoniker; |
| use std::{path::Path, sync::Arc}; |
| use tracing::{debug, error, info, warn}; |
| |
/// Owns the diagnostics repositories, FIDL servers, and background tasks that make up a running
/// Archivist instance, and drives them to completion via [`Archivist::run`].
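///
/// A minimal usage sketch, mirroring the test helpers at the bottom of this file; the
/// `outgoing_server_end`, `config`, and `lifecycle_requests` values are illustrative
/// placeholders, not the production setup:
///
/// ```ignore
/// let mut fs = ServiceFs::new();
/// fs.serve_connection(outgoing_server_end)?;
/// let mut archivist = Archivist::new(config).await;
/// archivist.set_lifecycle_request_stream(lifecycle_requests);
/// archivist.run(fs, /* is_embedded */ false).await?;
/// ```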
| pub struct Archivist { |
| /// Handles event routing between archivist parts. |
| event_router: EventRouter, |
| |
    /// Receiver for the stop signal that shuts down this Archivist.
| stop_recv: Option<oneshot::Receiver<()>>, |
| |
    /// Task that listens for Lifecycle requests in order to handle Stop requests.
| lifecycle_task: Option<fasync::Task<()>>, |
| |
    /// Task that drains the kernel debug log (klog).
| _drain_klog_task: Option<fasync::Task<()>>, |
| |
| /// Task writing logs to serial. |
| _serial_task: Option<fasync::Task<()>>, |
| |
| /// Tasks receiving external events from component manager. |
| incoming_external_event_producers: Vec<fasync::Task<()>>, |
| |
| /// The diagnostics pipelines that have been installed. |
| pipelines: Vec<Arc<Pipeline>>, |
| |
| /// The repository holding Inspect data. |
| _inspect_repository: Arc<InspectRepository>, |
| |
| /// The repository holding active log connections. |
| logs_repository: Arc<LogsRepository>, |
| |
| /// The server handling fuchsia.diagnostics.ArchiveAccessor |
| accessor_server: Arc<ArchiveAccessorServer>, |
| |
| /// The server handling fuchsia.logger.Log |
| log_server: Arc<LogServer>, |
| |
| /// The server handling fuchsia.inspect.InspectSink |
| inspect_sink_server: Arc<InspectSinkServer>, |
| |
| /// The server handling fuchsia.diagnostics.LogSettings |
| log_settings_server: Arc<LogSettingsServer>, |
| } |
| |
| impl Archivist { |
    /// Creates a new Archivist instance: initializes the diagnostics pipelines, the Inspect and
    /// logs repositories, and the FIDL servers, and registers the event consumers. Nothing is
    /// served until [`Archivist::run`] is called.
| pub async fn new(config: Config) -> Self { |
| // Initialize the pipelines that the archivist will expose. |
| let pipelines = Self::init_pipelines(&config); |
| |
| // Initialize the core event router |
| let mut event_router = |
| EventRouter::new(component::inspector().root().create_child("events")); |
| let incoming_external_event_producers = |
| Self::initialize_external_event_sources(&mut event_router).await; |
| |
| let logs_repo = LogsRepository::new( |
| config.logs_max_cached_original_bytes, |
| component::inspector().root(), |
| ); |
| let serial_task = if !config.allow_serial_logs.is_empty() { |
| Some(fasync::Task::spawn( |
| SerialConfig::new(config.allow_serial_logs, config.deny_serial_log_tags) |
| .write_logs(Arc::clone(&logs_repo), SerialSink), |
| )) |
| } else { |
| None |
| }; |
| let inspect_repo = |
| Arc::new(InspectRepository::new(pipelines.iter().map(Arc::downgrade).collect())); |
| |
| let inspect_sink_server = Arc::new(InspectSinkServer::new(Arc::clone(&inspect_repo))); |
| |
| // Initialize our FIDL servers. This doesn't start serving yet. |
| let accessor_server = Arc::new(ArchiveAccessorServer::new( |
| Arc::clone(&inspect_repo), |
| Arc::clone(&logs_repo), |
| config.maximum_concurrent_snapshots_per_reader, |
| BatchRetrievalTimeout::from_seconds(config.per_component_batch_timeout_seconds), |
| )); |
| |
| let log_server = Arc::new(LogServer::new(Arc::clone(&logs_repo))); |
| let log_settings_server = Arc::new(LogSettingsServer::new(Arc::clone(&logs_repo))); |
| |
        // Register the event consumers that will receive incoming LogSink and InspectSink
        // connection requests.
| event_router.add_consumer(ConsumerConfig { |
| consumer: &logs_repo, |
| events: vec![EventType::LogSinkRequested], |
| }); |
| event_router.add_consumer(ConsumerConfig { |
| consumer: &inspect_sink_server, |
| events: vec![EventType::InspectSinkRequested], |
| }); |
| |
| // Drain klog and publish it to syslog. |
| if config.enable_klog { |
| match KernelDebugLog::new().await { |
| Ok(klog) => logs_repo.drain_debuglog(klog), |
| Err(err) => warn!( |
| ?err, |
| "Failed to start the kernel debug log reader. Klog won't be in syslog" |
| ), |
| }; |
| } |
| |
        // Connect to each service listed in `bind_services` so that it is started alongside the
        // Archivist; the connections themselves are not used afterwards.
| for name in &config.bind_services { |
| info!("Connecting to service {}", name); |
| let (_local, remote) = zx::Channel::create(); |
| if let Err(e) = fdio::service_connect(&format!("/svc/{name}"), remote) { |
| error!("Couldn't connect to service {}: {:?}", name, e); |
| } |
| } |
| |
| // TODO(https://fxbug.dev/324494668): remove this when Netstack2 is gone. |
| if let Ok(dir) = fuchsia_fs::directory::open_in_namespace( |
| "/netstack-diagnostics", |
| fuchsia_fs::OpenFlags::RIGHT_READABLE, |
| ) { |
| inspect_repo.add_inspect_handle( |
| Arc::new(ComponentIdentity::new( |
| ExtendedMoniker::parse_str("core/network/netstack").unwrap(), |
| "fuchsia-pkg://fuchsia.com/netstack#meta/netstack2.cm", |
| )), |
| dir, |
| ); |
| } |
| |
| Self { |
| accessor_server, |
| log_server, |
| inspect_sink_server, |
| log_settings_server, |
| event_router, |
| _serial_task: serial_task, |
| stop_recv: None, |
| lifecycle_task: None, |
| _drain_klog_task: None, |
| incoming_external_event_producers, |
| pipelines, |
| _inspect_repository: inspect_repo, |
| logs_repository: logs_repo, |
| } |
| } |
| |
| /// Sets the request stream from which Lifecycle/Stop requests will come instructing the |
| /// Archivist to stop ingesting new data and drain current data to clients. |
| pub fn set_lifecycle_request_stream(&mut self, request_stream: LifecycleRequestStream) { |
| debug!("Lifecycle listener initialized."); |
| let (t, r) = component_lifecycle::serve(request_stream); |
| self.lifecycle_task = Some(t); |
| self.stop_recv = Some(r); |
| } |
| |
| fn init_pipelines(config: &Config) -> Vec<Arc<Pipeline>> { |
| let pipelines_node = component::inspector().root().create_child("pipelines"); |
| let accessor_stats_node = |
| component::inspector().root().create_child("archive_accessor_stats"); |
| let pipelines_path = Path::new(&config.pipelines_path); |
| let pipelines = [ |
| Pipeline::feedback(pipelines_path, &pipelines_node, &accessor_stats_node), |
| Pipeline::legacy_metrics(pipelines_path, &pipelines_node, &accessor_stats_node), |
| Pipeline::lowpan(pipelines_path, &pipelines_node, &accessor_stats_node), |
| Pipeline::all_access(pipelines_path, &pipelines_node, &accessor_stats_node), |
| ]; |
| |
| if pipelines.iter().any(|p| p.config_has_error()) { |
| component::health().set_unhealthy("Pipeline config has an error"); |
| } else { |
| component::health().set_ok(); |
| } |
| let pipelines = pipelines.into_iter().map(Arc::new).collect::<Vec<_>>(); |
| |
| component::inspector().root().record(pipelines_node); |
| component::inspector().root().record(accessor_stats_node); |
| |
| pipelines |
| } |
| |
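    /// Creates the external event sources (LogSink and InspectSink connection requests routed by
    /// component manager), registers them as producers with the given event router, and returns
    /// the tasks that pump those events.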
| pub async fn initialize_external_event_sources( |
| event_router: &mut EventRouter, |
| ) -> Vec<fasync::Task<()>> { |
| let mut incoming_external_event_producers = vec![]; |
| match EventSource::new("/events/log_sink_requested_event_stream").await { |
| Err(err) => warn!(?err, "Failed to create event source for log sink requests"), |
| Ok(mut event_source) => { |
| event_router.add_producer(ProducerConfig { |
| producer: &mut event_source, |
| events: vec![EventType::LogSinkRequested], |
| }); |
| incoming_external_event_producers.push(fasync::Task::spawn(async move { |
| // This should never exit. |
| let _ = event_source.spawn().await; |
| })); |
| } |
| } |
| |
| match EventSource::new("/events/inspect_sink_requested_event_stream").await { |
| Err(err) => { |
| warn!(?err, "Failed to create event source for InspectSink requests") |
| } |
| Ok(mut event_source) => { |
| event_router.add_producer(ProducerConfig { |
| producer: &mut event_source, |
| events: vec![EventType::InspectSinkRequested], |
| }); |
| incoming_external_event_producers.push(fasync::Task::spawn(async move { |
| // This should never exit. |
| let _ = event_source.spawn().await; |
| })); |
| } |
| } |
| |
| incoming_external_event_producers |
| } |
| |
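    /// Maps an accessor protocol name to its `fuchsia.diagnostics.host` counterpart by inserting
    /// `host` before the last dot-separated segment. For example:
    ///
    /// ```ignore
    /// assert_eq!(
    ///     Archivist::add_host_before_last_dot("fuchsia.diagnostics.ArchiveAccessor"),
    ///     "fuchsia.diagnostics.host.ArchiveAccessor",
    /// );
    /// ```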
| fn add_host_before_last_dot(input: &str) -> String { |
        // The accessor protocol names passed here always contain a '.', so this cannot panic.
        let (rest, last) = input.rsplit_once('.').unwrap();
| format!("{}.host.{}", rest, last) |
| } |
| |
    /// Runs the Archivist to completion.
    ///
    /// # Arguments:
    /// * `fs` - the `ServiceFs` on which the outgoing directory is served.
    /// * `is_embedded` - whether this Archivist is embedded in another component, which only
    ///   affects the log level of the "entering core loop" message.
| pub async fn run( |
| mut self, |
| mut fs: ServiceFs<ServiceObj<'static, ()>>, |
| is_embedded: bool, |
| ) -> Result<(), Error> { |
| debug!("Running Archivist."); |
| |
        // Start serving all outgoing protocols.
| self.serve_protocols(&mut fs); |
| let run_outgoing = fs.collect::<()>(); |
| let _inspect_server_task = inspect_runtime::publish( |
| component::inspector(), |
| inspect_runtime::PublishOptions::default(), |
| ); |
| |
| // Start ingesting events. |
| let (terminate_handle, drain_events_fut) = self |
| .event_router |
| .start() |
| // panic: can only panic if we didn't register event producers and consumers correctly. |
| .expect("Failed to start event router"); |
| let _event_routing_task = fasync::Task::spawn(async move { |
| drain_events_fut.await; |
| }); |
| |
| let accessor_server = Arc::clone(&self.accessor_server); |
| let log_server = Arc::clone(&self.log_server); |
| let logs_repo = Arc::clone(&self.logs_repository); |
| let inspect_sink_server = Arc::clone(&self.inspect_sink_server); |
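        // Once the logs repository terminates, wait for the accessor, log, and InspectSink
        // server tasks to finish draining their data to clients.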
| let all_msg = async { |
| logs_repo.wait_for_termination().await; |
| debug!("Flushing to listeners."); |
| accessor_server.wait_for_servers_to_complete().await; |
| log_server.wait_for_servers_to_complete().await; |
| debug!("Log listeners and batch iterators stopped."); |
| inspect_sink_server.wait_for_servers_to_complete().await; |
| }; |
| |
| let (abortable_fut, abort_handle) = abortable(run_outgoing); |
| |
| let log_server = self.log_server; |
| let inspect_sink_server = self.inspect_sink_server; |
| let accessor_server = self.accessor_server; |
| let incoming_external_event_producers = self.incoming_external_event_producers; |
| let logs_repo = Arc::clone(&self.logs_repository); |
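        // On a Lifecycle/Stop request: stop ingesting external events, shut down the servers,
        // stop accepting new log sink connections, and finally abort serving the outgoing
        // directory.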
| let stop_fut = match self.stop_recv { |
| Some(stop_recv) => async move { |
| stop_recv.into_future().await.ok(); |
| terminate_handle.terminate().await; |
| std::mem::drop(incoming_external_event_producers); |
| inspect_sink_server.stop(); |
| log_server.stop(); |
| accessor_server.stop(); |
| logs_repo.stop_accepting_new_log_sinks(); |
| abort_handle.abort() |
| } |
| .left_future(), |
| None => future::ready(()).right_future(), |
| }; |
| |
        // Keep the logs repository alive for the remainder of `run`, since it holds the
        // BudgetManager, which must also remain alive.
| let _logs_repo = self.logs_repository; |
| |
| if is_embedded { |
| debug!("Entering core loop."); |
| } else { |
| info!("archivist: Entering core loop."); |
| } |
| |
| // Combine all three futures into a main future. |
| future::join3(abortable_fut, stop_fut, all_msg).map(|_| Ok(())).await |
| } |
| |
| fn serve_protocols(&mut self, fs: &mut ServiceFs<ServiceObj<'static, ()>>) { |
| component::serve_inspect_stats(); |
| |
| let mut svc_dir = fs.dir("svc"); |
| |
        // Serve a fuchsia.diagnostics.ArchiveAccessor (and its fuchsia.diagnostics.host
        // counterpart) backed by each pipeline.
| for pipeline in &self.pipelines { |
| let host_accessor_server = Arc::clone(&self.accessor_server); |
| let accessor_server = Arc::clone(&self.accessor_server); |
| let accessor_pipeline = Arc::clone(pipeline); |
| svc_dir.add_fidl_service_at( |
| pipeline.protocol_name(), |
| move |stream: ArchiveAccessorRequestStream| { |
| accessor_server.spawn_server(Arc::clone(&accessor_pipeline), stream); |
| }, |
| ); |
| let accessor_pipeline = Arc::clone(pipeline); |
| // TODO(https://fxbug.dev/42077091): Add Inspect support |
| let accessor = Self::add_host_before_last_dot(accessor_pipeline.protocol_name()); |
| svc_dir.add_fidl_service_at( |
| accessor, |
| move |stream: fhost::ArchiveAccessorRequestStream| { |
| host_accessor_server.spawn_server(Arc::clone(&accessor_pipeline), stream); |
| }, |
| ); |
| } |
| |
        // Serve fuchsia.logger.Log.
| let log_server = Arc::clone(&self.log_server); |
| svc_dir.add_fidl_service(move |stream| { |
| debug!("fuchsia.logger.Log connection"); |
| log_server.spawn(stream); |
| }); |
| |
        // Serve fuchsia.diagnostics.LogSettings.
| let log_settings_server = Arc::clone(&self.log_settings_server); |
| svc_dir.add_fidl_service(move |stream| { |
| debug!("fuchsia.diagnostics.LogSettings connection"); |
| log_settings_server.spawn(stream); |
| }); |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use crate::{ |
| constants::*, |
| events::router::{Dispatcher, EventProducer}, |
| logs::testing::*, |
| }; |
| use diagnostics_data::LogsData; |
| use fidl::endpoints::create_proxy; |
| use fidl_fuchsia_diagnostics::{ |
| ClientSelectorConfiguration, DataType, Format, StreamParameters, |
| }; |
| use fidl_fuchsia_diagnostics_host as fhost; |
| use fidl_fuchsia_inspect::{InspectSinkMarker, InspectSinkRequestStream}; |
| use fidl_fuchsia_io as fio; |
| use fidl_fuchsia_logger::{LogSinkMarker, LogSinkRequestStream}; |
| use fidl_fuchsia_process_lifecycle::{LifecycleMarker, LifecycleProxy}; |
| use fuchsia_async as fasync; |
| use fuchsia_component::client::connect_to_protocol_at_dir_svc; |
| use std::marker::PhantomData; |
| |
| async fn init_archivist(fs: &mut ServiceFs<ServiceObj<'static, ()>>) -> Archivist { |
| let config = Config { |
| enable_klog: false, |
| log_to_debuglog: false, |
| maximum_concurrent_snapshots_per_reader: 4, |
| logs_max_cached_original_bytes: LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES, |
| num_threads: 1, |
| pipelines_path: DEFAULT_PIPELINES_PATH.into(), |
| bind_services: vec![], |
| allow_serial_logs: vec![], |
| deny_serial_log_tags: vec![], |
| per_component_batch_timeout_seconds: -1, |
| }; |
| |
| let mut archivist = Archivist::new(config).await; |
| |
        // Install a couple of unattributed sources for the purposes of the test.
| let mut source = UnattributedSource::<LogSinkMarker>::default(); |
| archivist.event_router.add_producer(ProducerConfig { |
| producer: &mut source, |
| events: vec![EventType::LogSinkRequested], |
| }); |
| fs.dir("svc").add_fidl_service(move |stream| { |
| source.new_connection(stream); |
| }); |
| |
| let mut source = UnattributedSource::<InspectSinkMarker>::default(); |
| archivist.event_router.add_producer(ProducerConfig { |
| producer: &mut source, |
| events: vec![EventType::InspectSinkRequested], |
| }); |
| fs.dir("svc").add_fidl_service(move |stream| { |
| source.new_connection(stream); |
| }); |
| |
| archivist |
| } |
| |
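    /// Test-only event producer that emits connection-requested events attributed to an unknown
    /// component, letting the tests drive the Archivist without component manager routing.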
| pub struct UnattributedSource<P> { |
| dispatcher: Dispatcher, |
| _phantom: PhantomData<P>, |
| } |
| |
| impl<P> Default for UnattributedSource<P> { |
| fn default() -> Self { |
| Self { dispatcher: Dispatcher::default(), _phantom: PhantomData } |
| } |
| } |
| |
| impl UnattributedSource<LogSinkMarker> { |
| pub fn new_connection(&mut self, request_stream: LogSinkRequestStream) { |
| self.dispatcher |
| .emit(Event { |
| timestamp: zx::Time::get_monotonic(), |
| payload: EventPayload::LogSinkRequested(LogSinkRequestedPayload { |
| component: Arc::new(ComponentIdentity::unknown()), |
| request_stream, |
| }), |
| }) |
| .ok(); |
| } |
| } |
| |
| impl UnattributedSource<InspectSinkMarker> { |
| pub fn new_connection(&mut self, request_stream: InspectSinkRequestStream) { |
| self.dispatcher |
| .emit(Event { |
| timestamp: zx::Time::get_monotonic(), |
| payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload { |
| component: Arc::new(ComponentIdentity::unknown()), |
| request_stream, |
| }), |
| }) |
| .ok(); |
| } |
| } |
| |
| impl<P> EventProducer for UnattributedSource<P> { |
| fn set_dispatcher(&mut self, dispatcher: Dispatcher) { |
| self.dispatcher = dispatcher; |
| } |
| } |
| |
    // Runs the archivist and sends a signal on the returned channel when it exits.
| async fn run_archivist_and_signal_on_exit( |
| ) -> (fio::DirectoryProxy, LifecycleProxy, oneshot::Receiver<()>) { |
| let (directory, server_end) = create_proxy::<fio::DirectoryMarker>().unwrap(); |
| |
| let mut fs = ServiceFs::new(); |
| fs.serve_connection(server_end).unwrap(); |
| let mut archivist = init_archivist(&mut fs).await; |
| |
| let (lifecycle_proxy, request_stream) = |
| fidl::endpoints::create_proxy_and_stream::<LifecycleMarker>().unwrap(); |
| archivist.set_lifecycle_request_stream(request_stream); |
| let (signal_send, signal_recv) = oneshot::channel(); |
| fasync::Task::spawn(async move { |
| archivist.run(fs, false).await.expect("Cannot run archivist"); |
| signal_send.send(()).unwrap(); |
| }) |
| .detach(); |
| (directory, lifecycle_proxy, signal_recv) |
| } |
| |
    // Runs the archivist and returns its outgoing directory.
| async fn run_archivist() -> fio::DirectoryProxy { |
| let (directory, server_end) = create_proxy::<fio::DirectoryMarker>().unwrap(); |
| let mut fs = ServiceFs::new(); |
| fs.serve_connection(server_end).unwrap(); |
| let archivist = init_archivist(&mut fs).await; |
| fasync::Task::spawn(async move { |
| archivist.run(fs, false).await.expect("Cannot run archivist"); |
| }) |
| .detach(); |
| directory |
| } |
| |
| #[fuchsia::test] |
    async fn can_log_and_retrieve_log() {
| let directory = run_archivist().await; |
| let mut recv_logs = start_listener(&directory); |
| |
| let mut log_helper = LogSinkHelper::new(&directory); |
| log_helper.write_log("my msg1"); |
| log_helper.write_log("my msg2"); |
| |
| assert_eq!( |
| vec! {Some("my msg1".to_owned()),Some("my msg2".to_owned())}, |
| vec! {recv_logs.next().await,recv_logs.next().await} |
| ); |
| |
| // new client can log |
| let mut log_helper2 = LogSinkHelper::new(&directory); |
| log_helper2.write_log("my msg1"); |
| log_helper.write_log("my msg2"); |
| |
| let mut expected = vec!["my msg1".to_owned(), "my msg2".to_owned()]; |
| expected.sort(); |
| |
| let mut actual = vec![recv_logs.next().await.unwrap(), recv_logs.next().await.unwrap()]; |
| actual.sort(); |
| |
| assert_eq!(expected, actual); |
| |
| // can log after killing log sink proxy |
| log_helper.kill_log_sink(); |
| log_helper.write_log("my msg1"); |
| log_helper.write_log("my msg2"); |
| |
| assert_eq!( |
| expected, |
| vec! {recv_logs.next().await.unwrap(),recv_logs.next().await.unwrap()} |
| ); |
| |
        // can log from a new socket connection
| log_helper2.add_new_connection(); |
| log_helper2.write_log("my msg1"); |
| log_helper2.write_log("my msg2"); |
| |
| assert_eq!( |
| expected, |
| vec! {recv_logs.next().await.unwrap(),recv_logs.next().await.unwrap()} |
| ); |
| } |
| |
| #[fuchsia::test] |
| async fn remote_log_test() { |
| let directory = run_archivist().await; |
| let accessor = |
| connect_to_protocol_at_dir_svc::<fhost::ArchiveAccessorMarker>(&directory).unwrap(); |
| loop { |
| let (local, remote) = fuchsia_zircon::Socket::create_stream(); |
| let mut reader = fuchsia_async::Socket::from_socket(local); |
| accessor |
| .stream_diagnostics( |
| &StreamParameters { |
| data_type: Some(DataType::Logs), |
| stream_mode: Some(fidl_fuchsia_diagnostics::StreamMode::Snapshot), |
| format: Some(Format::Json), |
| client_selector_configuration: Some( |
| ClientSelectorConfiguration::SelectAll(true), |
| ), |
| ..Default::default() |
| }, |
| remote, |
| ) |
| .await |
| .unwrap(); |
| let log_helper = LogSinkHelper::new(&directory); |
| let log_writer = log_helper.connect(); |
| LogSinkHelper::write_log_at(&log_writer, "Test message"); |
| let mut data = vec![]; |
| reader.read_to_end(&mut data).await.unwrap(); |
| if data.is_empty() { |
| continue; |
| } |
| let logs = serde_json::from_slice::<Vec<LogsData>>(&data).unwrap(); |
| for log in logs { |
| if log.msg() == Some("Test message") { |
| return; |
| } |
| } |
| } |
| } |
| |
    /// Makes sure that the implementation can handle multiple sockets from the same
    /// LogSink connection.
| #[fuchsia::test] |
| async fn log_from_multiple_sock() { |
| let directory = run_archivist().await; |
| let mut recv_logs = start_listener(&directory); |
| |
| let log_helper = LogSinkHelper::new(&directory); |
| let sock1 = log_helper.connect(); |
| let sock2 = log_helper.connect(); |
| let sock3 = log_helper.connect(); |
| |
| LogSinkHelper::write_log_at(&sock1, "msg sock1-1"); |
| LogSinkHelper::write_log_at(&sock2, "msg sock2-1"); |
| LogSinkHelper::write_log_at(&sock1, "msg sock1-2"); |
| LogSinkHelper::write_log_at(&sock3, "msg sock3-1"); |
| LogSinkHelper::write_log_at(&sock2, "msg sock2-2"); |
| |
| let mut expected = vec![ |
| "msg sock1-1".to_owned(), |
| "msg sock1-2".to_owned(), |
| "msg sock2-1".to_owned(), |
| "msg sock2-2".to_owned(), |
| "msg sock3-1".to_owned(), |
| ]; |
| expected.sort(); |
| |
| let mut actual = vec![ |
| recv_logs.next().await.unwrap(), |
| recv_logs.next().await.unwrap(), |
| recv_logs.next().await.unwrap(), |
| recv_logs.next().await.unwrap(), |
| recv_logs.next().await.unwrap(), |
| ]; |
| actual.sort(); |
| |
| assert_eq!(expected, actual); |
| } |
| |
    /// Verifies that the Lifecycle/Stop API works.
| #[fuchsia::test] |
| async fn stop_works() { |
| let (directory, lifecycle_proxy, signal_recv) = run_archivist_and_signal_on_exit().await; |
| let mut recv_logs = start_listener(&directory); |
| |
| { |
| // make sure we can write logs |
| let log_sink_helper = LogSinkHelper::new(&directory); |
| let sock1 = log_sink_helper.connect(); |
| LogSinkHelper::write_log_at(&sock1, "msg sock1-1"); |
| log_sink_helper.write_log("msg sock1-2"); |
| let mut expected = vec!["msg sock1-1".to_owned(), "msg sock1-2".to_owned()]; |
| expected.sort(); |
| let mut actual = vec![recv_logs.next().await.unwrap(), recv_logs.next().await.unwrap()]; |
| actual.sort(); |
| assert_eq!(expected, actual); |
| |
| // Start new connections and sockets |
| let log_sink_helper1 = LogSinkHelper::new(&directory); |
| let sock2 = log_sink_helper.connect(); |
| // Write logs before calling stop |
| log_sink_helper1.write_log("msg 1"); |
| log_sink_helper1.write_log("msg 2"); |
| let log_sink_helper2 = LogSinkHelper::new(&directory); |
| |
| lifecycle_proxy.stop().unwrap(); |
| |
| // make more socket connections and write to them and old ones. |
| let sock3 = log_sink_helper2.connect(); |
| log_sink_helper2.write_log("msg 3"); |
| log_sink_helper2.write_log("msg 4"); |
| |
| LogSinkHelper::write_log_at(&sock3, "msg 5"); |
| LogSinkHelper::write_log_at(&sock2, "msg 6"); |
| log_sink_helper.write_log("msg 7"); |
| LogSinkHelper::write_log_at(&sock1, "msg 8"); |
| |
| LogSinkHelper::write_log_at(&sock2, "msg 9"); |
| } // kills all sockets and log_sink connections |
| let mut expected = vec![]; |
| let mut actual = vec![]; |
| for i in 1..=9 { |
| expected.push(format!("msg {i}")); |
| actual.push(recv_logs.next().await.unwrap()); |
| } |
| expected.sort(); |
| actual.sort(); |
| |
| // make sure archivist is dead. |
| signal_recv.await.unwrap(); |
| |
| assert_eq!(expected, actual); |
| } |
| } |