| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #![allow(clippy::large_futures)] |
| |
| use crate::{constants::*, test_topology}; |
| use anyhow::Error; |
| use diagnostics_data::{Data, Logs}; |
| use diagnostics_message::{fx_log_packet_t, METADATA_SIZE}; |
| use diagnostics_reader::{ArchiveReader, RetryConfig}; |
| use fidl::prelude::*; |
| use fidl_fuchsia_archivist_test as ftest; |
| use fidl_fuchsia_archivist_tests::{ |
| SocketPuppetControllerRequest, SocketPuppetControllerRequestStream, SocketPuppetProxy, |
| }; |
| use fidl_fuchsia_component as fcomponent; |
| use fidl_fuchsia_component::RealmMarker; |
| use fidl_fuchsia_component_decl::ChildRef; |
| use fidl_fuchsia_diagnostics as fdiagnostics; |
| use fidl_fuchsia_io as fio; |
| use fuchsia_async::Task; |
| use fuchsia_component::{client, server::ServiceFs}; |
| use fuchsia_component_test::{ |
| Capability, ChildOptions, LocalComponentHandles, RealmInstance, Ref, Route, |
| }; |
| use fuchsia_zircon as zx; |
| use futures::{ |
| channel::mpsc::{self, Receiver}, |
| StreamExt, |
| }; |
| use std::ops::Deref; |
| use tracing::{debug, info}; |
| |
// Total size in bytes of one emitted log packet (metadata + payload);
// emit_packet sizes its fill data from this.
const TEST_PACKET_LEN: usize = 49;
// Number of puppet children declared in the test realm; create_puppet
// asserts requested ids stay below this.
const MAX_PUPPETS: usize = 5;
// Puppet that floods the log buffer to force older logs to roll out.
const SPAM_PUPPET_ID: usize = 0;
// Puppet whose single message is expected to be rolled out by the spam.
const VICTIM_PUPPET_ID: usize = 1;
// Number of packets the spam puppet emits — sized to overflow the
// 3000-byte budget configured in PuppetEnv::create many times over.
const SPAM_COUNT: usize = 9001;
| |
#[fuchsia::test(logging_minimum_severity = "debug")]
async fn test_budget() {
    info!("testing that the archivist's log buffers correctly enforce their budget");

    info!("creating nested environment for collecting diagnostics");
    let mut env = PuppetEnv::create(MAX_PUPPETS).await;
    // The spam puppet floods the buffer so that the victim puppet's
    // single message gets rolled out.
    env.create_puppet(SPAM_PUPPET_ID).await;
    env.create_puppet(VICTIM_PUPPET_ID).await;
    let expected = env.running_puppets[VICTIM_PUPPET_ID].emit_packet().await;
    let mut observed_logs = env.log_reader.snapshot_then_subscribe::<Logs>().unwrap();
    // split_streams is needed here to ensure parallel execution.
    // If this isn't ran in parallel, the ordering required by this
    // test never happens.
    let (mut observed_logs_2, _errors) =
        env.log_reader.snapshot_then_subscribe::<Logs>().unwrap().split_streams();
    // Both subscriptions should first observe the victim's message.
    let msg_a = observed_logs.next().await.unwrap().unwrap();
    let msg_a_2 = observed_logs_2.next().await.unwrap();
    assert_eq!(expected, msg_a);
    assert_eq!(expected, msg_a_2);
    // The live subscription sees every spam packet, in order, as emitted.
    for _ in 0..SPAM_COUNT {
        let last_msg = env.running_puppets[SPAM_PUPPET_ID].emit_packet().await;
        assert_eq!(last_msg, observed_logs.next().await.unwrap().unwrap());
    }
    // NOTE(review): 33 and 8907 look tuned to the 3000-byte budget and
    // TEST_PACKET_LEN — confirm against the archivist's rollout
    // accounting if either constant changes.
    let log = observed_logs_2.skip(33).next().await.unwrap();
    assert_eq!(log.rolled_out_logs(), Some(8907));
    let mut observed_logs = env.log_reader.snapshot::<Logs>().await.unwrap().into_iter();
    // The oldest retained message must no longer be the victim's.
    let msg_b = observed_logs.next().unwrap();
    assert!(!msg_b.moniker.contains(&format!("puppet-{VICTIM_PUPPET_ID}")));
    // Victim logs should have been rolled out entirely.
    let messages = observed_logs
        .filter(|log| log.moniker.contains(&format!("puppet-{VICTIM_PUPPET_ID}")))
        .collect::<Vec<_>>();
    assert!(messages.is_empty());
    assert_ne!(msg_a, msg_b);
}
| |
/// Test harness owning the nested realm (archivist under test plus
/// mocks server) and the socket puppets launched into it.
struct PuppetEnv {
    // Upper bound on puppet ids; create_puppet asserts against it.
    max_puppets: usize,
    // Keeps the built realm — and the archivist under test — alive.
    instance: RealmInstance,
    // Receives one controller request stream per puppet that starts up
    // (forwarded by the run_mocks local component).
    controllers: Receiver<SocketPuppetControllerRequestStream>,
    // Monikers of every puppet launched so far, in launch order.
    launched_monikers: Vec<String>,
    // Live puppets, indexed by launch order.
    running_puppets: Vec<Puppet>,
    // Reader connected to the realm's ArchiveAccessor for log queries.
    log_reader: ArchiveReader,
    // Background task that panics the test on log-subscription errors.
    _log_errors: Task<()>,
}
| |
impl PuppetEnv {
    /// Builds the test realm: an archivist configured with a 3000-byte
    /// log budget, a local mocks server exposing
    /// `SocketPuppetController`, and `max_puppets` puppet children
    /// (declared but not started — see [`Self::create_puppet`]).
    async fn create(max_puppets: usize) -> Self {
        // run_mocks forwards each puppet's controller stream through
        // this channel back to the test.
        let (sender, controllers) = mpsc::channel(1);
        let (builder, test_realm) = test_topology::create(test_topology::Options {
            archivist_config: ftest::ArchivistConfig {
                // Small budget so a modest amount of spam forces rollout.
                logs_max_cached_original_bytes: Some(3000),
                ..Default::default()
            },
            realm_name: None,
        })
        .await
        .expect("create base topology");
        let mocks_server = builder
            .add_local_child(
                "mocks-server",
                move |handles: LocalComponentHandles| Box::pin(run_mocks(handles, sender.clone())),
                ChildOptions::new(),
            )
            .await
            .unwrap();

        // Route the controller protocol from the mocks server into the
        // test realm so the puppets can reach it.
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol_by_name(
                        "fuchsia.archivist.tests.SocketPuppetController",
                    ))
                    .from(&mocks_server)
                    .to(&test_realm),
            )
            .await
            .unwrap();

        for i in 0..max_puppets {
            let name = format!("puppet-{i}");
            let puppet = test_realm
                .add_child(name.clone(), SOCKET_PUPPET_COMPONENT_URL, ChildOptions::new())
                .await
                .unwrap();
            // Each puppet gets the controller protocol (to phone home)...
            test_realm
                .add_route(
                    Route::new()
                        .capability(Capability::protocol_by_name(
                            "fuchsia.archivist.tests.SocketPuppetController",
                        ))
                        .from(Ref::parent())
                        .to(&puppet),
                )
                .await
                .unwrap();
            // ...and LogSink from the archivist under test, so puppet
            // logs land in the buffer whose budget we are measuring.
            test_realm
                .add_route(
                    Route::new()
                        .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
                        .from(Ref::child("archivist"))
                        .to(&puppet),
                )
                .await
                .unwrap();
        }

        info!("starting our instance");
        test_topology::expose_test_realm_protocol(&builder, &test_realm).await;
        let instance = builder.build().await.expect("create instance");

        // Fresh connection to the realm's ArchiveAccessor per call.
        let archive = || {
            instance
                .root
                .connect_to_protocol_at_exposed_dir::<fdiagnostics::ArchiveAccessorMarker>()
                .unwrap()
        };
        // NOTE(review): inspect_reader is configured but never read in
        // this function and is dropped on return — confirm whether it
        // is intentionally unused.
        let mut inspect_reader = ArchiveReader::new();
        inspect_reader
            .with_archive(archive())
            .with_minimum_schema_count(1) // we only request inspect from our archivist
            .add_selector("archivist:root/logs_buffer")
            .add_selector("archivist:root/sources");
        let mut log_reader = ArchiveReader::new();
        log_reader
            .with_archive(archive())
            .with_minimum_schema_count(0) // we want this to return even when no log messages
            .retry(RetryConfig::never());
        // Fail the test eagerly if the log subscription reports errors.
        let (_log_subscription, mut errors) =
            log_reader.snapshot_then_subscribe::<Logs>().unwrap().split_streams();

        let _log_errors = Task::spawn(async move {
            if let Some(error) = errors.next().await {
                panic!("{error:#?}");
            }
        });

        Self {
            max_puppets,
            controllers,
            instance,
            launched_monikers: vec![],
            running_puppets: vec![],
            log_reader,
            _log_errors,
        }
    }

    /// Starts puppet `id`, waits for it to phone home over
    /// `SocketPuppetController`, connects it to `LogSink`, records it
    /// in the running set, and returns its moniker.
    async fn create_puppet(&mut self, id: usize) -> String {
        assert!(id < self.max_puppets);
        let child_ref = ChildRef { name: format!("puppet-{id}"), collection: None };

        let (exposed_dir, server_end) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>().unwrap();
        let realm = self.instance.root.connect_to_protocol_at_exposed_dir::<RealmMarker>().unwrap();
        realm.open_exposed_dir(&child_ref, server_end).await.unwrap().unwrap();

        // Connecting to Binder starts the child; the proxy itself is
        // otherwise unused.
        let _ = client::connect_to_protocol_at_dir_root::<fcomponent::BinderMarker>(&exposed_dir)
            .unwrap();

        debug!("waiting for controller request");
        let mut controller = self.controllers.next().await.unwrap();

        debug!("waiting for ControlPuppet call");
        let proxy = match controller.next().await {
            Some(Ok(SocketPuppetControllerRequest::ControlPuppet {
                to_control,
                control_handle,
            })) => {
                // One request per stream: keep the puppet proxy and
                // shut the controller connection down.
                control_handle.shutdown();
                to_control.into_proxy().unwrap()
            }
            _ => panic!("did not expect that"),
        };

        let moniker = format!("puppet-{id}");
        let puppet = Puppet { moniker: moniker.clone(), proxy };

        info!("having the puppet connect to LogSink");
        puppet.connect_to_log_sink().await.unwrap();

        info!("observe the puppet appears in archivist's inspect output");
        self.launched_monikers.push(moniker.clone());
        self.running_puppets.push(puppet);
        moniker
    }
}
| |
/// Handle to a single running socket-puppet component.
struct Puppet {
    // FIDL connection used to drive the puppet (also reachable through
    // the Deref impl below).
    proxy: SocketPuppetProxy,
    // The puppet's moniker within the test realm, e.g. "puppet-0".
    moniker: String,
}
| |
| impl std::fmt::Debug for Puppet { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| f.debug_struct("Puppet").field("moniker", &self.moniker).finish() |
| } |
| } |
| |
impl Puppet {
    /// Has the puppet emit one INFO log packet stamped with the current
    /// monotonic time, and returns a receipt identifying that message
    /// (comparable against archivist log entries via PartialEq).
    async fn emit_packet(&self) -> MessageReceipt {
        let timestamp = zx::Time::get_monotonic().into_nanos();
        let mut packet: fx_log_packet_t = Default::default();
        packet.metadata.severity = fdiagnostics::Severity::Info.into_primitive() as i32;
        packet.metadata.time = timestamp;
        // Fill the payload with 'A' bytes; the range is sized so that
        // metadata plus payload presumably totals TEST_PACKET_LEN bytes.
        packet.fill_data(1..(TEST_PACKET_LEN - METADATA_SIZE), b'A' as _);
        self.proxy.emit_packet(packet.as_bytes()).await.unwrap();
        MessageReceipt { timestamp, moniker: self.moniker.clone() }
    }
}
| |
// Deref to the proxy so tests can call SocketPuppet FIDL methods
// (e.g. connect_to_log_sink) directly on a `Puppet`.
impl Deref for Puppet {
    type Target = SocketPuppetProxy;
    fn deref(&self) -> &Self::Target {
        &self.proxy
    }
}
| |
| async fn run_mocks( |
| handles: LocalComponentHandles, |
| mut sender: mpsc::Sender<SocketPuppetControllerRequestStream>, |
| ) -> Result<(), Error> { |
| let mut fs = ServiceFs::new(); |
| fs.dir("svc").add_fidl_service(move |stream: SocketPuppetControllerRequestStream| { |
| sender.start_send(stream).unwrap(); |
| }); |
| fs.serve_connection(handles.outgoing_dir)?; |
| fs.collect::<()>().await; |
| Ok(()) |
| } |
| |
/// A value indicating a message was sent by a particular puppet.
#[derive(Clone, Debug, PartialEq)]
struct MessageReceipt {
    // Moniker of the puppet that emitted the message.
    moniker: String,
    // Monotonic timestamp (nanoseconds) stamped into the packet.
    timestamp: i64,
}
| |
| impl PartialEq<Data<Logs>> for MessageReceipt { |
| fn eq(&self, other: &Data<Logs>) -> bool { |
| other.moniker == self.moniker && other.metadata.timestamp == self.timestamp |
| } |
| } |