// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::{constants::*, test_topology, utils};
use anyhow::Error;
use archivist_lib::{
    configs::parse_config,
    logs::message::{fx_log_packet_t, METADATA_SIZE},
};
use component_events::{events::*, matcher::ExitStatusMatcher};
use diagnostics_data::{Data, LogError, Logs, Severity};
use diagnostics_hierarchy::trie::TrieIterableNode;
use diagnostics_reader::{ArchiveReader, Inspect, SubscriptionResultsStream};
use fidl_fuchsia_archivist_tests::{
    SocketPuppetControllerRequest, SocketPuppetControllerRequestStream, SocketPuppetProxy,
};
use fidl_fuchsia_diagnostics::ArchiveAccessorMarker;
use fidl_fuchsia_io::DirectoryMarker;
use fidl_fuchsia_sys2::{ChildRef, EventSourceMarker, RealmMarker};
use fuchsia_async::{Task, Timer};
use fuchsia_component::{client, server::ServiceFs};
use fuchsia_component_test::{builder::*, mock, RealmInstance};
use fuchsia_zircon as zx;
use futures::{
    channel::mpsc::{self, Receiver},
    StreamExt,
};
use rand::{prelude::SliceRandom, rngs::StdRng, SeedableRng};
use std::{collections::BTreeMap, ops::Deref, time::Duration};
use tracing::{debug, info, trace};

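// Each packet the puppets emit occupies exactly `TEST_PACKET_LEN` bytes in the archivist's
// cache, so the byte budget divides into a whole number of messages (see the
// `messages_allowed_in_cache` computation in `PuppetEnv::create`). For example, a hypothetical
// 490-byte budget would hold exactly 10 test packets.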
const TEST_PACKET_LEN: usize = 49;
const MAX_PUPPETS: usize = 5;

#[fuchsia_async::run_singlethreaded(test)]
async fn test_budget() {
    fuchsia_syslog::init().unwrap();
    fuchsia_syslog::set_severity(fuchsia_syslog::levels::DEBUG);

    info!("testing that the archivist's log buffers correctly enforce their budget");

    info!("creating nested environment for collecting diagnostics");
    let mut env = PuppetEnv::create(MAX_PUPPETS).await;

    info!("checking that archivist log state is clean");
    env.assert_archivist_state_matches_expected().await;

    for i in 0..MAX_PUPPETS {
        env.launch_puppet(i).await;
    }
    env.validate().await;
}

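/// Drives a realm containing the archivist-under-test plus a set of "puppet" components, and
/// tracks every message the puppets emit so the test can predict exactly what the archivist
/// should be retaining, dropping, and reporting at any point.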
struct PuppetEnv {
    max_puppets: usize,
    instance: RealmInstance,
    controllers: Receiver<SocketPuppetControllerRequestStream>,
    messages_allowed_in_cache: usize,
    messages_sent: Vec<MessageReceipt>,
    launched_monikers: Vec<String>,
    running_puppets: Vec<Puppet>,
    inspect_reader: ArchiveReader,
    log_reader: ArchiveReader,
    log_subscription: SubscriptionResultsStream<Logs>,
    rng: StdRng,
    _log_errors: Task<()>,
}

impl PuppetEnv {
    async fn create(max_puppets: usize) -> Self {
        let (sender, controllers) = mpsc::channel(1);
        let mut builder = test_topology::create(test_topology::Options {
            archivist_url: ARCHIVIST_WITH_SMALL_CACHES,
        })
        .await
        .expect("create base topology");
        builder
            .add_component(
                "mocks-server",
                ComponentSource::Mock(mock::Mock::new(move |mock_handles: mock::MockHandles| {
                    Box::pin(run_mocks(mock_handles, sender.clone()))
                })),
            )
            .await
            .unwrap();

        for i in 0..max_puppets {
            let name = format!("test/puppet-{}", i);
            builder
                .add_component(name.clone(), ComponentSource::url(SOCKET_PUPPET_COMPONENT_URL))
                .await
                .unwrap()
                .add_route(CapabilityRoute {
                    capability: Capability::protocol(
                        "fuchsia.archivist.tests.SocketPuppetController",
                    ),
                    source: RouteEndpoint::component("mocks-server"),
                    targets: vec![RouteEndpoint::component(name.clone())],
                })
                .unwrap()
                .add_route(CapabilityRoute {
                    capability: Capability::protocol("fuchsia.logger.LogSink"),
                    source: RouteEndpoint::component("test/archivist"),
                    targets: vec![RouteEndpoint::component(name)],
                })
                .unwrap();
        }

        info!("starting our instance");
        let mut realm = builder.build();
        test_topology::expose_test_realm_protocol(&mut realm).await;
        let instance = realm.create().await.expect("create instance");

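        // derive the message budget from the same config file the archivist component itself
        // reads, so the test's expectations always track the archivist's actual limit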
        let config = parse_config("/pkg/data/config/small-caches-config.json").unwrap();
        let messages_allowed_in_cache = config.logs.max_cached_original_bytes / TEST_PACKET_LEN;

        let archive =
            || instance.root.connect_to_protocol_at_exposed_dir::<ArchiveAccessorMarker>().unwrap();
        let mut inspect_reader = ArchiveReader::new();
        inspect_reader
            .with_archive(archive())
            .with_minimum_schema_count(1) // we only request inspect from our archivist
            .add_selector("archivist:root/logs_buffer")
            .add_selector("archivist:root/sources");
        let mut log_reader = ArchiveReader::new();
        log_reader
            .with_archive(archive())
            .with_minimum_schema_count(0) // we want this to return even when no logs are present
            .retry_if_empty(false);
        let (log_subscription, mut errors) =
            log_reader.snapshot_then_subscribe::<Logs>().unwrap().split_streams();

        let _log_errors = Task::spawn(async move {
            if let Some(error) = errors.next().await {
                panic!("{:#?}", error);
            }
        });

        Self {
            max_puppets,
            controllers,
            instance,
            messages_allowed_in_cache,
            messages_sent: vec![],
            launched_monikers: vec![],
            running_puppets: vec![],
            inspect_reader,
            log_reader,
            log_subscription,
            rng: StdRng::seed_from_u64(0xA455),
            _log_errors,
        }
    }

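    /// Starts puppet `id` by binding it in the realm, completes the controller handshake to
    /// get a proxy for driving it, and waits until archivist's inspect reflects the new source.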
    async fn launch_puppet(&mut self, id: usize) {
        assert!(id < self.max_puppets);
        let mut child_ref = ChildRef { name: format!("puppet-{}", id), collection: None };

        let (_client_end, server_end) =
            fidl::endpoints::create_endpoints::<DirectoryMarker>().unwrap();
        let realm = self.instance.root.connect_to_protocol_at_exposed_dir::<RealmMarker>().unwrap();
        realm.bind_child(&mut child_ref, server_end).await.unwrap().unwrap();

        debug!("waiting for controller request");
        let mut controller = self.controllers.next().await.unwrap();

        debug!("waiting for ControlPuppet call");
        let proxy = match controller.next().await {
            Some(Ok(SocketPuppetControllerRequest::ControlPuppet {
                to_control,
                control_handle,
            })) => {
                control_handle.shutdown();
                to_control.into_proxy().unwrap()
            }
            _ => panic!("unexpected request on the SocketPuppetController channel"),
        };

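        // the test realm runs in a collection, so the moniker the archivist sees is prefixed
        // with the collection name and this realm instance's child name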
        let moniker = format!(
            "fuchsia_component_test_collection:{}/test/puppet-{}",
            self.instance.root.child_name(),
            id
        );
        let puppet = Puppet { id, moniker: moniker.clone(), proxy };

        info!("having the puppet connect to LogSink");
        puppet.connect_to_log_sink().await.unwrap();

        info!("observing that the puppet appears in archivist's inspect output");
        self.launched_monikers.push(moniker);
        self.running_puppets.push(puppet);

        // wait for archivist to catch up with what we launched
        while self.current_expected_sources() != self.current_observed_sources().await {
            Timer::new(Duration::from_millis(100)).await;
        }
    }

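    /// Replays our receipt list through the archivist's retention policy: the newest
    /// `messages_allowed_in_cache` receipts are retained, every older receipt counts as a
    /// drop for its source, and sources that stopped with no retained messages are pruned.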
    fn current_expected_sources(&self) -> BTreeMap<String, Count> {
        // make sure we have an empty entry for each puppet we've launched
        let mut expected_sources = BTreeMap::new();
        for source in &self.launched_monikers {
            expected_sources.insert(source.clone(), Count { total: 0, dropped: 0 });
        }

        // compute the expected drops for each component based on our list of receipts
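        // iterate newest-first: the enumerate index counts how many newer messages exist, and
        // once that count reaches the cache budget this older message must have rolled out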
        for (newer_messages, receipt) in self.messages_sent.iter().rev().enumerate() {
            let puppet_count = expected_sources.get_mut(&receipt.moniker).unwrap();
            puppet_count.total += 1;
            if newer_messages >= self.messages_allowed_in_cache {
                puppet_count.dropped += 1;
            }
        }

        // archivist should have dropped all containers that have stopped and are empty
        expected_sources
            .into_iter()
            .filter(|(moniker, count)| {
                let has_messages = count.total > 0 && count.total != count.dropped;
                let is_running =
                    self.running_puppets.iter().any(|puppet| moniker == &puppet.moniker);
                is_running || has_messages
            })
            .collect()
    }

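    /// Reads the archivist's inspect hierarchy and rebuilds per-source counts from the
    /// `root/sources/<moniker>/logs/{total,dropped}/{number,bytes}` nodes, checking along the
    /// way that byte counts are consistent with our fixed packet size.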
    async fn current_observed_sources(&self) -> BTreeMap<String, Count> {
        // we only request inspect from our small-caches archivist, so exactly one result returns
        let results =
            self.inspect_reader.snapshot::<Inspect>().await.unwrap().into_iter().next().unwrap();
        let root = results.payload.as_ref().unwrap();

        let mut counts = BTreeMap::new();
        let sources = root.get_child("sources").unwrap();

        for (moniker, source) in sources.get_children() {
            if let Some(logs) = source.get_child("logs") {
                let total = logs.get_child("total").unwrap();
                let total_number = *total.get_property("number").unwrap().uint().unwrap() as usize;
                let total_bytes = *total.get_property("bytes").unwrap().uint().unwrap() as usize;
                assert_eq!(total_bytes, total_number * TEST_PACKET_LEN);

                let dropped = logs.get_child("dropped").unwrap();
                let dropped_number =
                    *dropped.get_property("number").unwrap().uint().unwrap() as usize;
                let dropped_bytes =
                    *dropped.get_property("bytes").unwrap().uint().unwrap() as usize;
                assert_eq!(dropped_bytes, dropped_number * TEST_PACKET_LEN);

                counts.insert(
                    moniker.clone(),
                    Count { total: total_number, dropped: dropped_number },
                );
            }
        }

        counts
    }

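    /// Cross-checks the archivist against our model: inspect counts must match the expected
    /// counts, the log snapshot must contain exactly the retained receipts in send order, and
    /// each source with drops must have produced exactly one rolled-out-logs warning.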
    async fn assert_archivist_state_matches_expected(&self) {
        let expected_sources = self.current_expected_sources();
        let observed_sources = self.current_observed_sources().await;
        assert_eq!(observed_sources, expected_sources);

        let expected_drops = || expected_sources.iter().filter(|(_, c)| c.dropped > 0);
        let mut expected_logs = self
            .messages_sent
            .iter()
            .rev() // we want the newest messages
            .take(self.messages_allowed_in_cache)
            .rev(); // but in the order they were sent
        trace!("reading log snapshot");
        let observed_logs = self.log_reader.snapshot::<Logs>().await.unwrap().into_iter();

        let mut dropped_message_warnings = BTreeMap::new();
        for observed in observed_logs {
            if observed.metadata.errors.is_some() {
                dropped_message_warnings.insert(observed.moniker.clone(), observed);
            } else {
                let expected = expected_logs.next().unwrap();
                assert_eq!(expected, &observed);
            }
        }

        for (moniker, Count { dropped, .. }) in expected_drops() {
            let dropped_logs_warning = dropped_message_warnings.remove(moniker).unwrap();
            assert_eq!(
                dropped_logs_warning.metadata.errors,
                Some(vec![LogError::DroppedLogs { count: *dropped as u64 }])
            );
            assert_eq!(dropped_logs_warning.metadata.severity, Severity::Warn);
        }

        assert!(dropped_message_warnings.is_empty(), "must not observe unexpected drop warnings");
    }

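    /// Exercises the budget: the puppets log many times the cache's capacity in randomized
    /// order, and partway through one puppet is stopped so we can verify its logs are retained
    /// until they roll out, after which the archivist must drop its container entirely.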
    async fn validate(mut self) {
        // we want to spend most of this test's effort exercising the behavior of dropping messages
        let overall_messages_to_log = self.messages_allowed_in_cache * 15;

        // we want to ensure that messages are retained after a component stops, up to our policy.
        // this iteration is chosen so that subsequent messages roll out everything the stopped
        // component logged, forcing the archivist to actually drop its container
        let iteration_for_killing_a_puppet = self.messages_allowed_in_cache;

        let event_source =
            EventSource::from_proxy(client::connect_to_protocol::<EventSourceMarker>().unwrap());
        let mut event_stream = event_source
            .subscribe(vec![EventSubscription::new(vec![Stopped::NAME], EventMode::Async)])
            .await
            .unwrap();

        info!("having the puppets log packets until overflow");
        for i in 0..overall_messages_to_log {
            trace!(i, "loop ticked");
            if i == iteration_for_killing_a_puppet {
                let to_stop = self.running_puppets.pop().unwrap();
                let receipt = to_stop.emit_packet().await;
                self.check_receipt(receipt).await;

                let id = to_stop.id;
                drop(to_stop);

                utils::wait_for_component_stopped_event(
                    &self.instance.root.child_name(),
                    &format!("puppet-{}", id),
                    ExitStatusMatcher::Clean,
                    &mut event_stream,
                )
                .await;
            }

            let puppet = self.running_puppets.choose(&mut self.rng).unwrap();
            let receipt = puppet.emit_packet().await;
            self.check_receipt(receipt).await;
        }

        assert_eq!(
            self.current_expected_sources().len(),
            self.running_puppets.len(),
            "must have stopped a component and rolled out all of its logs"
        );
        info!("test complete!");
    }

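    /// Waits for the just-emitted message to arrive over the log subscription, records its
    /// receipt, and re-validates the archivist's full state against our model.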
    async fn check_receipt(&mut self, receipt: MessageReceipt) {
        let next_message = self.log_subscription.next().await.unwrap();
        assert_eq!(receipt, next_message);

        self.messages_sent.push(receipt);
        self.assert_archivist_state_matches_expected().await;
    }
}

struct Puppet {
    proxy: SocketPuppetProxy,
    moniker: String,
    id: usize,
}

impl std::fmt::Debug for Puppet {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Puppet").field("moniker", &self.moniker).finish()
    }
}

impl Puppet {
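    /// Has the puppet write one packet to its log socket: INFO severity, the current monotonic
    /// time, and enough filler bytes for the packet to occupy exactly `TEST_PACKET_LEN` bytes.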
    async fn emit_packet(&self) -> MessageReceipt {
        let timestamp = zx::Time::get_monotonic().into_nanos();
        let mut packet: fx_log_packet_t = Default::default();
        packet.metadata.severity = fuchsia_syslog::levels::INFO;
        packet.metadata.time = timestamp;
        packet.fill_data(1..(TEST_PACKET_LEN - METADATA_SIZE), b'A' as _);
        self.proxy.emit_packet(packet.as_bytes()).await.unwrap();
        MessageReceipt { timestamp, moniker: self.moniker.clone() }
    }
}

impl Deref for Puppet {
    type Target = SocketPuppetProxy;
    fn deref(&self) -> &Self::Target {
        &self.proxy
    }
}

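// serves `SocketPuppetController` from the mock component: each puppet's connection is handed
// to the test body through the mpsc channel so the test can drive the puppet directly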
async fn run_mocks(
    mock_handles: mock::MockHandles,
    mut sender: mpsc::Sender<SocketPuppetControllerRequestStream>,
) -> Result<(), Error> {
    let mut fs = ServiceFs::new();
    fs.dir("svc").add_fidl_service(move |stream: SocketPuppetControllerRequestStream| {
        sender.start_send(stream).unwrap();
    });
    fs.serve_connection(mock_handles.outgoing_dir.into_channel())?;
    fs.collect::<()>().await;
    Ok(())
}

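/// Per-source message counts, as reported by (and expected from) the archivist's inspect.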
#[derive(Clone, Copy, Debug, PartialEq)]
struct Count {
    total: usize,
    dropped: usize,
}

/// A value indicating a message was sent by a particular puppet.
#[derive(Clone, Debug, PartialEq)]
struct MessageReceipt {
    moniker: String,
    timestamp: i64,
}

impl PartialEq<Data<Logs>> for MessageReceipt {
    fn eq(&self, other: &Data<Logs>) -> bool {
        // we store the moniker computed at launch (e.g. `.../test/puppet-0`), but the moniker
        // archivist reports carries an instance id suffix (e.g. `.../test/puppet-0:12345`),
        // so we do a prefix match instead of full string equality
        other.moniker.starts_with(&self.moniker)
            && *other.metadata.timestamp as i64 == self.timestamp
    }
}