// blob: 397cf080ef4de845d3cf30a473b70bdf19c04d6e [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
container::ComponentDiagnostics,
error::Error,
events::types::UniqueKey,
identity::ComponentIdentity,
inspect::container::{InspectArtifactsContainer, UnpopulatedInspectDataContainer},
lifecycle::container::{LifecycleArtifactsContainer, LifecycleDataContainer},
logs::{
budget::BudgetManager,
container::LogsArtifactsContainer,
debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY},
error::LogsError,
listener::Listener,
multiplex::{Multiplexer, MultiplexerHandle},
},
ImmutableString,
},
diagnostics_data::LogsData,
diagnostics_hierarchy::{
trie::{self, TrieIterableNode},
InspectHierarchyMatcher,
},
fidl::endpoints::ProtocolMarker,
fidl_fuchsia_diagnostics::{
self, LogInterestSelector, LogSettingsMarker, LogSettingsRequest, LogSettingsRequestStream,
Selector, StreamMode,
},
fidl_fuchsia_io as fio,
fidl_fuchsia_logger::{LogMarker, LogRequest, LogRequestStream},
fuchsia_async as fasync, fuchsia_inspect as inspect, fuchsia_zircon as zx,
futures::channel::mpsc,
futures::prelude::*,
io_util,
lazy_static::lazy_static,
parking_lot::{Mutex, RwLock},
selectors,
std::{
collections::{BTreeMap, HashMap},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
},
tracing::{debug, error, warn},
};
// Process-wide counter used to hand out connection ids. `handle_log_requests` and
// `handle_log_settings` each take one id per connection via `fetch_add` and use it as the key for
// that connection's interest registration.
lazy_static! {
static ref CONNECTION_ID: AtomicUsize = AtomicUsize::new(0);
}
/// DataRepo holds all diagnostics data and is a singleton wrapped by multiple
/// [`pipeline::Pipeline`]s in a given Archivist instance.
///
/// The type is a cloneable handle: every clone shares the same `Arc`-held, lock-protected
/// [`DataRepoState`].
#[derive(Clone)]
pub struct DataRepo {
// Shared state; reached through the `Deref` impl as `repo.read()` / `repo.write()`.
inner: Arc<RwLock<DataRepoState>>,
}
/// Exposes the inner lock directly so callers can write `repo.read()` and `repo.write()`.
impl std::ops::Deref for DataRepo {
    type Target = RwLock<DataRepoState>;

    fn deref(&self) -> &Self::Target {
        // Deref coercion turns `&Arc<RwLock<_>>` into `&RwLock<_>`.
        &self.inner
    }
}
/// Test-only constructor: a repo with the legacy default log budget, attached to a default
/// inspect node.
#[cfg(test)]
impl Default for DataRepo {
    fn default() -> Self {
        let logs_budget =
            BudgetManager::new(crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES);
        let inner = DataRepoState::new(logs_budget, &fuchsia_inspect::Node::default());
        Self { inner }
    }
}
impl DataRepo {
pub fn new(logs_budget: &BudgetManager, parent: &fuchsia_inspect::Node) -> Self {
DataRepo { inner: DataRepoState::new(logs_budget.clone(), parent) }
}
/// Drain the kernel's debug log. The returned future completes once
/// existing messages have been ingested.
pub async fn drain_debuglog<K>(self, klog_reader: K)
where
K: DebugLog + Send + Sync + 'static,
{
debug!("Draining debuglog.");
let container = self.write().get_log_container(KERNEL_IDENTITY.clone());
let mut kernel_logger = DebugLogBridge::create(klog_reader);
let mut messages = match kernel_logger.existing_logs().await {
Ok(messages) => messages,
Err(e) => {
error!(%e, "failed to read from kernel log, important logs may be missing");
return;
}
};
messages.sort_by_key(|m| m.timestamp());
for message in messages {
container.ingest_message(message);
}
let res = kernel_logger
.listen()
.try_for_each(|message| async {
container.ingest_message(message);
Ok(())
})
.await;
if let Err(e) = res {
error!(%e, "failed to drain kernel log, important logs may be missing");
}
}
/// Spawn a task that serves `stream` and hand that task to `sender` so its owner keeps it alive.
///
/// Errors from serving the stream, and a failure to queue the task, are logged as warnings.
pub fn handle_log(
    self,
    stream: LogRequestStream,
    sender: mpsc::UnboundedSender<fasync::Task<()>>,
) {
    // The original sender moves into the task; keep a clone to queue the task itself.
    let task_queue = sender.clone();
    let task = fasync::Task::spawn(async move {
        if let Err(e) = self.handle_log_requests(stream, sender).await {
            warn!("error handling Log requests: {}", e);
        }
    });
    if let Err(e) = task_queue.unbounded_send(task) {
        warn!("Couldn't queue listener task: {:?}", e);
    }
}
/// Handle requests to `fuchsia.logger.Log`. All request types read the
/// whole backlog from memory, `DumpLogs(Safe)` stops listening after that.
///
/// Each request spawns a listener task which is sent to `sender`. Requests that carry selectors
/// register them as this connection's log interest.
///
/// The connection's interest registration is removed when the stream ends, including when
/// serving stops early because of an error.
///
/// # Errors
///
/// Returns the first error hit while reading a request from `stream` or while constructing a
/// `Listener` for it.
async fn handle_log_requests(
    self,
    mut stream: LogRequestStream,
    mut sender: mpsc::UnboundedSender<fasync::Task<()>>,
) -> Result<(), LogsError> {
    let connection_id = CONNECTION_ID.fetch_add(1, Ordering::Relaxed);

    // Serve inside an inner future so that `?` leaves only this block and the cleanup below
    // runs on every exit path.
    let result = async {
        while let Some(request) = stream.next().await {
            let request = request.map_err(|source| LogsError::HandlingRequests {
                protocol: LogMarker::NAME,
                source,
            })?;
            let (listener, options, dump_logs, selectors) = match request {
                LogRequest::ListenSafe { log_listener, options, .. } => {
                    (log_listener, options, false, None)
                }
                LogRequest::DumpLogsSafe { log_listener, options, .. } => {
                    (log_listener, options, true, None)
                }
                LogRequest::ListenSafeWithSelectors {
                    log_listener, options, selectors, ..
                } => (log_listener, options, false, Some(selectors)),
            };
            let listener = Listener::new(listener, options)?;
            let mode =
                if dump_logs { StreamMode::Snapshot } else { StreamMode::SnapshotThenSubscribe };
            let logs = self.logs_cursor(mode, None);
            if let Some(s) = selectors {
                self.write().update_logs_interest(connection_id, s);
            }
            // Best effort: if the task receiver is gone there is nobody left to run listeners.
            sender.send(listener.spawn(logs, dump_logs)).await.ok();
        }
        Ok::<(), LogsError>(())
    }
    .await;

    self.write().finish_interest_connection(connection_id);
    result
}
/// Handle requests to `fuchsia.diagnostics.LogSettings`.
///
/// Every `RegisterInterest` request replaces the selectors registered for this connection.
/// The registration is removed when the stream ends, including when it ends with an error.
///
/// # Errors
///
/// Returns the first error hit while reading a request from `stream`.
pub async fn handle_log_settings(
    self,
    mut stream: LogSettingsRequestStream,
) -> Result<(), LogsError> {
    let connection_id = CONNECTION_ID.fetch_add(1, Ordering::Relaxed);

    // Serve inside an inner future so that `?` leaves only this block and the cleanup below
    // runs on every exit path.
    let result = async {
        while let Some(request) = stream.next().await {
            let request = request.map_err(|source| LogsError::HandlingRequests {
                protocol: LogSettingsMarker::NAME,
                source,
            })?;
            match request {
                LogSettingsRequest::RegisterInterest { selectors, .. } => {
                    self.write().update_logs_interest(connection_id, selectors);
                }
            }
        }
        Ok::<(), LogsError>(())
    }
    .await;

    self.write().finish_interest_connection(connection_id);
    result
}
/// Build a merged stream of log messages from every component currently in the repository.
///
/// `selectors`, when given, are installed on the multiplexer. The multiplexer's handle is
/// registered with the broker under `mode`, and the broker's cleanup sender is installed as
/// the multiplexer's on-drop id sender.
pub fn logs_cursor(
    &self,
    mode: StreamMode,
    selectors: Option<Vec<Selector>>,
) -> impl Stream<Item = Arc<LogsData>> + Send + 'static {
    let mut state = self.write();
    let (mut merged, mpx_handle) = Multiplexer::new();
    if let Some(selectors) = selectors {
        merged.set_selectors(selectors);
    }

    // Feed the cursor of every container that has one into the multiplexer.
    for container in state.data_directories.iter().filter_map(|(_, c)| c) {
        if let Some(cursor) = container.logs_cursor(mode) {
            mpx_handle.send(container.identity.relative_moniker.clone(), cursor);
        }
    }

    state.logs_multiplexers.add(mode, mpx_handle);
    merged.set_on_drop_id_sender(state.logs_multiplexers.cleanup_sender());
    merged
}
/// Returns `true` when the trie node for `identity` holds exactly one container and that
/// container reports `should_retain()`; returns `false` in every other case, including when no
/// node exists.
pub fn is_live(&self, identity: &ComponentIdentity) -> bool {
    let state = self.read();
    match state.data_directories.get(&identity.unique_key().into()) {
        Some(node) => match &node.get_values()[..] {
            [only] => only.should_retain(),
            _ => false,
        },
        None => false,
    }
}
/// Stop accepting new messages, ensuring that pending Cursors return Poll::Ready(None) after
/// consuming any messages received before this call.
///
/// Every container is terminated first, then the multiplexer broker.
pub fn terminate_logs(&self) {
    let mut state = self.write();
    for (_, entry) in state.data_directories.iter() {
        if let Some(container) = entry {
            container.terminate_logs();
        }
    }
    state.logs_multiplexers.terminate();
}
}
/// The lock-protected state behind a [`DataRepo`].
pub struct DataRepoState {
/// Per-component diagnostics, stored in a trie keyed by the component's unique key.
pub data_directories: trie::Trie<String, ComponentDiagnostics>,
/// The "sources" child of the parent inspect node; passed to each `ComponentDiagnostics`
/// created by this repository.
inspect_node: inspect::Node,
/// A reference to the budget manager, kept to be passed to containers.
logs_budget: BudgetManager,
/// The current global interest in logs, as defined by the last client to send us selectors.
// NOTE(review): nothing visible in this file writes to this field after construction; it is
// only handed to `ComponentDiagnostics::logs` — confirm whether it is still needed.
logs_interest: Vec<LogInterestSelector>,
/// BatchIterators for logs need to be made aware of new components starting and their logs.
logs_multiplexers: MultiplexerBroker,
/// Interest registrations that we have received through
/// fuchsia.logger.Log/ListenSafeWithSelectors or through
/// fuchsia.diagnostics.LogSettings/RegisterInterest, keyed by connection id.
interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,
}
impl DataRepoState {
/// Builds an empty state whose inspect node is a "sources" child of `parent`, already wrapped
/// in the `Arc<RwLock<_>>` that `DataRepo` stores.
fn new(logs_budget: BudgetManager, parent: &fuchsia_inspect::Node) -> Arc<RwLock<Self>> {
    let state = Self {
        inspect_node: parent.create_child("sources"),
        data_directories: trie::Trie::new(),
        logs_budget,
        logs_interest: Vec::new(),
        logs_multiplexers: MultiplexerBroker::new(),
        interest_registrations: BTreeMap::new(),
    };
    Arc::new(RwLock::new(state))
}
/// Marks the component stored under `key` as stopped.
///
/// Does nothing unless the trie node for `key` exists and holds exactly one container.
pub fn mark_stopped(&mut self, key: &UniqueKey) {
    let node = match self.data_directories.get_mut(key) {
        Some(node) => node,
        None => return,
    };
    if let [only] = &mut node.get_values_mut()[..] {
        only.mark_stopped();
    }
}
pub fn add_new_component(
&mut self,
identity: ComponentIdentity,
event_timestamp: zx::Time,
) -> Result<(), Error> {
let lifecycle_artifact_container = LifecycleArtifactsContainer { event_timestamp };
let unique_key: Vec<_> = identity.unique_key().into();
let diag_repo_entry_opt = self.data_directories.get_mut(&unique_key);
match diag_repo_entry_opt {
Some(diag_repo_entry) => {
let diag_repo_entry_values: &mut [ComponentDiagnostics] =
diag_repo_entry.get_values_mut();
match &mut *diag_repo_entry_values {
[] => {
// An entry with no values implies that the somehow we observed the
// creation of a component lower in the topology before observing this
// one. If this is the case, just instantiate as though it's our first
// time encountering this moniker segment.
self.data_directories.insert(
unique_key,
ComponentDiagnostics::new_with_lifecycle(
Arc::new(identity),
lifecycle_artifact_container,
&self.inspect_node,
),
)
}
[existing_diagnostics_artifact_container] => {
// Races may occur between seeing diagnostics ready and seeing
// creation lifecycle events. Handle this here.
// TODO(fxbug.dev/52047): Remove once caching handles ordering issues.
existing_diagnostics_artifact_container.mark_started();
if existing_diagnostics_artifact_container.lifecycle.is_none() {
existing_diagnostics_artifact_container.lifecycle =
Some(lifecycle_artifact_container);
}
}
_ => {
return Err(Error::MultipleArtifactContainers(unique_key));
}
}
}
// This case is expected to be the most common case. We've seen a creation
// lifecycle event and it promotes the instantiation of a new data repository entry.
None => self.data_directories.insert(
unique_key,
ComponentDiagnostics::new_with_lifecycle(
Arc::new(identity),
lifecycle_artifact_container,
&self.inspect_node,
),
),
}
Ok(())
}
/// Returns a container for logs artifacts, constructing one and adding it to the trie if
/// necessary.
///
/// # Panics
///
/// Panics if the trie node for `identity` holds more than one container.
pub fn get_log_container(
    &mut self,
    identity: ComponentIdentity,
) -> Arc<LogsArtifactsContainer> {
    let trie_key: Vec<_> = identity.unique_key().into();

    // First try the container already stored for this component. The lookup only yields an
    // owned `Arc`, so the borrow of the trie ends before the insertion below.
    let existing_logs = match self.data_directories.get_mut(&trie_key) {
        None => None,
        Some(node) => match &mut node.get_values_mut()[..] {
            [] => None,
            [existing] => Some(existing.logs(
                &self.logs_budget,
                &self.logs_interest,
                &mut self.logs_multiplexers,
            )),
            _ => unreachable!("invariant: each trie node has 0-1 entries"),
        },
    };
    if let Some(logs) = existing_logs {
        return logs;
    }

    // Nothing stored yet: create an empty entry, obtain its logs container, then store it.
    let mut fresh = ComponentDiagnostics::empty(Arc::new(identity), &self.inspect_node);
    let logs = fresh.logs(&self.logs_budget, &self.logs_interest, &mut self.logs_multiplexers);
    self.data_directories.insert(trie_key, fresh);
    logs
}
/// Replaces the selectors registered for `connection_id` with `selectors` and tells every
/// existing logs container about both the new and the previously registered selectors.
pub fn update_logs_interest(
    &mut self,
    connection_id: usize,
    selectors: Vec<LogInterestSelector>,
) {
    let previous_selectors = match self.interest_registrations.insert(connection_id, selectors)
    {
        Some(previous) => previous,
        None => Vec::new(),
    };
    // unwrap safe, we just inserted.
    let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
    for dir in self.data_directories.iter().filter_map(|(_, dir)| dir) {
        if let Some(logs) = dir.logs.as_ref() {
            logs.update_interest(new_selectors, &previous_selectors);
        }
    }
}
/// Drops the interest registration of `connection_id`, if there is one, and asks every
/// existing logs container to reset the interest for the removed selectors.
pub fn finish_interest_connection(&mut self, connection_id: usize) {
    let selectors = match self.interest_registrations.remove(&connection_id) {
        Some(selectors) => selectors,
        None => return,
    };
    for dir in self.data_directories.iter().filter_map(|(_, dir)| dir) {
        if let Some(logs) = dir.logs.as_ref() {
            logs.reset_interest(&selectors);
        }
    }
}
/// Wraps `directory_proxy` and `event_timestamp` in an `InspectArtifactsContainer` and stores
/// it for `identity`.
///
/// Returns whatever `insert_inspect_artifact_container` returns.
pub fn add_inspect_artifacts(
    &mut self,
    identity: ComponentIdentity,
    directory_proxy: fio::DirectoryProxy,
    event_timestamp: zx::Time,
) -> Result<(), Error> {
    self.insert_inspect_artifact_container(
        InspectArtifactsContainer {
            component_diagnostics_proxy: directory_proxy,
            event_timestamp,
        },
        identity,
    )
}
// Inserts an InspectArtifactsContainer into the data repository.
fn insert_inspect_artifact_container(
&mut self,
inspect_container: InspectArtifactsContainer,
identity: ComponentIdentity,
) -> Result<(), Error> {
let unique_key: Vec<_> = identity.unique_key().into();
let diag_repo_entry_opt = self.data_directories.get_mut(&unique_key);
match diag_repo_entry_opt {
Some(diag_repo_entry) => {
let diag_repo_entry_values: &mut [ComponentDiagnostics] =
diag_repo_entry.get_values_mut();
match &mut *diag_repo_entry_values {
[] => {
// An entry with no values implies that the somehow we observed the
// creation of a component lower in the topology before observing this
// one. If this is the case, just instantiate as though it's our first
// time encountering this moniker segment.
self.data_directories.insert(
unique_key,
ComponentDiagnostics::new_with_inspect(
Arc::new(identity),
inspect_container,
&self.inspect_node,
),
)
}
[existing_diagnostics_artifact_container] => {
// Races may occur between synthesized and real diagnostics_ready
// events, so we must handle de-duplication here.
// TODO(fxbug.dev/52047): Remove once caching handles ordering issues.
existing_diagnostics_artifact_container.mark_started();
if existing_diagnostics_artifact_container.inspect.is_none() {
// This is expected to be the most common case. We've encountered the
// diagnostics_ready event for a component that has already been
// observed to be started/existing. We now must update the diagnostics
// artifact container with the inspect artifacts that accompanied the
// diagnostics_ready event.
existing_diagnostics_artifact_container.inspect =
Some(inspect_container);
}
}
_ => {
return Err(Error::MultipleArtifactContainers(unique_key));
}
}
}
// This case is expected to be uncommon; we've encountered a diagnostics_ready
// event before a start or existing event!
None => self.data_directories.insert(
unique_key,
ComponentDiagnostics::new_with_inspect(
Arc::new(identity),
inspect_container,
&self.inspect_node,
),
),
}
Ok(())
}
/// Collects lifecycle data for every component in the repository.
///
/// For each component, up to three entries are produced, in this order: one from its lifecycle
/// artifacts, one from its inspect artifacts and one from its logs artifacts. Artifacts that
/// are absent contribute nothing.
pub fn fetch_lifecycle_event_data(&self) -> Vec<LifecycleDataContainer> {
    let mut events = Vec::new();
    for (_, entry) in self.data_directories.iter() {
        let container = match entry {
            Some(container) => container,
            None => continue,
        };
        let identity = &container.identity;
        if let Some(lifecycle_artifacts) = &container.lifecycle {
            events.push(LifecycleDataContainer::from_lifecycle_artifact(
                lifecycle_artifacts,
                identity.clone(),
            ));
        }
        if let Some(inspect_artifacts) = &container.inspect {
            events.push(LifecycleDataContainer::from_inspect_artifact(
                inspect_artifacts,
                identity.clone(),
            ));
        }
        if let Some(log_artifacts) = &container.logs {
            events.push(LifecycleDataContainer::from_logs_sink_connected_artifact(
                log_artifacts,
                identity.clone(),
            ));
        }
    }
    events
}
/// Return all of the DirectoryProxies that contain Inspect hierarchies
/// which contain data that should be selected from.
pub fn fetch_inspect_data(
&self,
component_selectors: &Option<Vec<Selector>>,
moniker_to_static_matcher_map: Option<&HashMap<ImmutableString, InspectHierarchyMatcher>>,
) -> Vec<UnpopulatedInspectDataContainer> {
return self
.data_directories
.iter()
.filter_map(|(_, diagnostics_artifacts_container_opt)| {
let (diagnostics_artifacts_container, inspect_artifacts) =
match &diagnostics_artifacts_container_opt {
Some(diagnostics_artifacts_container) => {
match &diagnostics_artifacts_container.inspect {
Some(inspect_artifacts) => {
(diagnostics_artifacts_container, inspect_artifacts)
}
None => return None,
}
}
None => return None,
};
let optional_hierarchy_matcher = match moniker_to_static_matcher_map {
Some(map) => {
match map.get(
diagnostics_artifacts_container
.identity
.relative_moniker
.join("/")
.as_str(),
) {
Some(inspect_matcher) => Some(inspect_matcher),
// Return early if there were static selectors, and none were for this
// moniker.
None => return None,
}
}
None => None,
};
// Verify that the dynamic selectors contain an entry that applies to
// this moniker as well.
if !match component_selectors {
Some(component_selectors) => component_selectors.iter().any(|s| {
selectors::match_component_moniker_against_selector(
&diagnostics_artifacts_container.identity.relative_moniker,
s,
)
.ok()
.unwrap_or(false)
}),
None => true,
} {
return None;
}
// This artifact contains inspect and matches a passed selector.
io_util::clone_directory(
&inspect_artifacts.component_diagnostics_proxy,
fio::OpenFlags::CLONE_SAME_RIGHTS,
)
.ok()
.map(|directory| UnpopulatedInspectDataContainer {
identity: diagnostics_artifacts_container.identity.clone(),
component_diagnostics_proxy: directory,
inspect_matcher: optional_hierarchy_matcher.cloned(),
})
})
.collect();
}
}
/// Live multiplexers keyed by multiplexer id; each entry holds the stream mode it was
/// registered with and the handle used to send it new cursors.
type LiveIteratorsMap = HashMap<usize, (StreamMode, MultiplexerHandle<Arc<LogsData>>)>;
/// Ensures that BatchIterators get access to logs from newly started components.
pub struct MultiplexerBroker {
// Shared with the cleanup task, which removes entries by id.
live_iterators: Arc<Mutex<LiveIteratorsMap>>,
// Cloned out through `cleanup_sender()`; every id sent on it is removed from `live_iterators`.
cleanup_sender: mpsc::UnboundedSender<usize>,
// Task draining the receiving end of `cleanup_sender`. Never read; presumably stored so the
// task lives as long as the broker — confirm against `fuchsia_async::Task` drop semantics.
_live_iterators_cleanup_task: fasync::Task<()>,
}
impl MultiplexerBroker {
fn new() -> Self {
let (cleanup_sender, mut receiver) = mpsc::unbounded();
let live_iterators = Arc::new(Mutex::new(HashMap::new()));
let live_iterators_clone = live_iterators.clone();
Self {
live_iterators,
cleanup_sender,
_live_iterators_cleanup_task: fasync::Task::spawn(async move {
while let Some(id) = receiver.next().await {
live_iterators_clone.lock().remove(&id);
}
}),
}
}
fn cleanup_sender(&self) -> mpsc::UnboundedSender<usize> {
self.cleanup_sender.clone()
}
/// A new BatchIterator has been created and must be notified when future log containers are
/// created.
fn add(&mut self, mode: StreamMode, recipient: MultiplexerHandle<Arc<LogsData>>) {
match mode {
// snapshot streams only want to know about what's currently available
StreamMode::Snapshot => recipient.close(),
StreamMode::SnapshotThenSubscribe | StreamMode::Subscribe => {
self.live_iterators.lock().insert(recipient.multiplexer_id(), (mode, recipient));
}
}
}
/// Notify existing BatchIterators of a new logs container so they can include its messages
/// in their results.
pub fn send(&mut self, container: &Arc<LogsArtifactsContainer>) {
self.live_iterators.lock().retain(|_, (mode, recipient)| {
recipient.send(container.identity.relative_moniker.clone(), container.cursor(*mode))
});
}
/// Notify all multiplexers to terminate their streams once sub streams have terminated.
fn terminate(&mut self) {
for (_, (_, recipient)) in self.live_iterators.lock().drain() {
recipient.close();
}
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::{events::types::ComponentIdentifier, logs::stored_message::StoredMessage},
diagnostics_hierarchy::trie::TrieIterableNode,
diagnostics_log_encoding::{
encode::Encoder, Argument, Record, Severity as StreamSeverity, Value,
},
fuchsia_zircon as zx,
selectors::{self, FastError},
std::{io::Cursor, time::Duration},
};
/// Component URL shared by the identities constructed in these tests.
const TEST_URL: &str = "fuchsia-pkg://test";
/// Adding inspect artifacts twice for the same identity must leave a single container.
#[fuchsia::test]
async fn inspect_repo_disallows_duplicated_dirs() {
    let repo = DataRepo::default();
    let mut state = repo.write();
    let identity = ComponentIdentity::from_identifier_and_url(
        ComponentIdentifier::Legacy {
            instance_id: "1234".to_string(),
            moniker: vec!["a", "b", "foo.cmx"].into(),
        },
        TEST_URL,
    );

    // Register the same identity twice, each time with a fresh directory proxy.
    for _ in 0..2 {
        let (proxy, _) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>()
            .expect("create directory proxy");
        state
            .add_inspect_artifacts(identity.clone(), proxy, zx::Time::from_nanos(0))
            .expect("add to repo");
    }

    let node = state.data_directories.get(&identity.unique_key().into()).unwrap();
    assert_eq!(node.get_values().len(), 1);
}
/// Inspect artifacts added after `add_new_component` land in the already existing entry.
#[fuchsia::test]
async fn data_repo_updates_existing_entry_to_hold_inspect_data() {
    let repo = DataRepo::default();
    let mut state = repo.write();
    let identity = ComponentIdentity::from_identifier_and_url(
        ComponentIdentifier::Legacy {
            instance_id: "1234".to_string(),
            moniker: vec!["a", "b", "foo.cmx"].into(),
        },
        TEST_URL,
    );

    state
        .add_new_component(identity.clone(), zx::Time::from_nanos(0))
        .expect("instantiated new component.");
    let (proxy, _) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>()
        .expect("create directory proxy");
    state
        .add_inspect_artifacts(identity.clone(), proxy, zx::Time::from_nanos(0))
        .expect("add to repo");

    let values = state.data_directories.get(&identity.unique_key().into()).unwrap().get_values();
    assert_eq!(values.len(), 1);
    let entry = &values[0];
    assert!(entry.inspect.is_some());
    assert_eq!(entry.identity.url, TEST_URL);
}
/// A second `add_new_component` for the same identity succeeds and does not add an entry.
#[fuchsia::test]
async fn data_repo_tolerates_duplicate_new_component_insertions() {
    let repo = DataRepo::default();
    let mut state = repo.write();
    let component_id = ComponentIdentifier::Legacy {
        instance_id: "1234".to_string(),
        moniker: vec!["a", "b", "foo.cmx"].into(),
    };
    let identity = ComponentIdentity::from_identifier_and_url(component_id.clone(), TEST_URL);

    state
        .add_new_component(identity.clone(), zx::Time::from_nanos(0))
        .expect("instantiated new component.");
    let second_insertion = state.add_new_component(identity.clone(), zx::Time::from_nanos(1));
    assert!(second_insertion.is_ok());

    let values = state.data_directories.get(&identity.unique_key().into()).unwrap().get_values();
    assert_eq!(values.len(), 1);
    let entry = &values[0];
    assert!(entry.lifecycle.is_some());
    assert_eq!(entry.identity.relative_moniker, component_id.relative_moniker_for_selectors());
    assert_eq!(entry.identity.url, TEST_URL);
}
/// A newly added component gets one entry that carries lifecycle artifacts and the identity it
/// was registered with.
#[fuchsia::test]
async fn running_components_provide_start_time() {
    let repo = DataRepo::default();
    let mut state = repo.write();
    let component_id = ComponentIdentifier::Legacy {
        instance_id: "1234".to_string(),
        moniker: vec!["a", "b", "foo.cmx"].into(),
    };
    let identity = ComponentIdentity::from_identifier_and_url(component_id.clone(), TEST_URL);

    let insertion = state.add_new_component(identity.clone(), zx::Time::from_nanos(1));
    assert!(insertion.is_ok());

    let values = state.data_directories.get(&identity.unique_key().into()).unwrap().get_values();
    assert_eq!(values.len(), 1);
    let entry = &values[0];
    assert!(entry.lifecycle.is_some());
    assert_eq!(entry.identity.relative_moniker, component_id.relative_moniker_for_selectors());
    assert_eq!(entry.identity.url, TEST_URL);
}
/// `add_new_component` after the inspect artifacts were already stored keeps those artifacts
/// and adds the lifecycle artifacts to the same entry.
#[fuchsia::test]
async fn data_repo_tolerant_of_new_component_calls_if_diagnostics_ready_already_processed() {
    let repo = DataRepo::default();
    let mut state = repo.write();
    let identity = ComponentIdentity::from_identifier_and_url(
        ComponentIdentifier::Legacy {
            instance_id: "1234".to_string(),
            moniker: vec!["a", "b", "foo.cmx"].into(),
        },
        TEST_URL,
    );

    let (proxy, _) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>()
        .expect("create directory proxy");
    state
        .add_inspect_artifacts(identity.clone(), proxy, zx::Time::from_nanos(0))
        .expect("add to repo");
    let late_new_component = state.add_new_component(identity.clone(), zx::Time::from_nanos(0));
    assert!(late_new_component.is_ok());

    // We shouldn't have overwritten the entry. There should still be an inspect
    // artifacts container.
    let values = state.data_directories.get(&identity.unique_key().into()).unwrap().get_values();
    assert_eq!(values.len(), 1);
    let entry = &values[0];
    assert_eq!(entry.identity.url, TEST_URL);
    assert!(entry.inspect.is_some());
    assert!(entry.lifecycle.is_some());
}
/// Adding inspect artifacts fails once a trie node has been forced to hold two containers.
#[fuchsia::test]
async fn diagnostics_repo_cant_have_more_than_one_diagnostics_data_container_per_component() {
    let repo = DataRepo::default();
    let mut state = repo.write();
    let identity = ComponentIdentity::from_identifier_and_url(
        ComponentIdentifier::Legacy {
            instance_id: "1234".to_string(),
            moniker: vec!["a", "b", "foo.cmx"].into(),
        },
        TEST_URL,
    );

    state
        .add_new_component(identity.clone(), zx::Time::from_nanos(0))
        .expect("insertion will succeed.");
    let initial_len =
        state.data_directories.get(&identity.unique_key().into()).unwrap().get_values().len();
    assert_eq!(initial_len, 1);

    // Break the invariant by hand: push a second container onto the same node.
    state
        .data_directories
        .get_mut(&identity.unique_key().into())
        .unwrap()
        .get_values_mut()
        .push(ComponentDiagnostics::empty(Arc::new(identity.clone()), &Default::default()));

    let (proxy, _) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>()
        .expect("create directory proxy");
    let result = state.add_inspect_artifacts(identity, proxy, zx::Time::from_nanos(0));
    assert!(result.is_err());
}
/// `fetch_inspect_data` returns only the components matched by the given selectors.
#[fuchsia::test]
async fn data_repo_filters_inspect_by_selectors() {
    let data_repo = DataRepo::default();

    // Register two sibling components under a/b, each with inspect artifacts backed by /tmp.
    for &(name, instance_id) in &[("foo.cmx", "1234"), ("foo2.cmx", "12345")] {
        let moniker = vec!["a".to_string(), "b".to_string(), name.to_string()];
        let component_id = ComponentIdentifier::Legacy {
            instance_id: instance_id.to_string(),
            moniker: moniker.into(),
        };
        let identity = ComponentIdentity::from_identifier_and_url(component_id, TEST_URL);
        data_repo
            .write()
            .add_new_component(identity.clone(), zx::Time::from_nanos(0))
            .expect("insertion will succeed.");
        data_repo
            .write()
            .add_inspect_artifacts(
                identity,
                io_util::open_directory_in_namespace("/tmp", io_util::OpenFlags::RIGHT_READABLE)
                    .expect("open root"),
                zx::Time::from_nanos(0),
            )
            .expect("add inspect artifacts");
    }

    assert_eq!(2, data_repo.read().fetch_inspect_data(&None, None).len());

    let cases: [(&str, usize); 3] =
        [("a/b/foo.cmx:root", 1), ("a/b/f*.cmx:root", 2), ("foo.cmx:root", 0)];
    for &(selector, expected) in cases.iter() {
        let selectors =
            Some(vec![selectors::parse_selector::<FastError>(selector).expect("parse selector")]);
        assert_eq!(expected, data_repo.read().fetch_inspect_data(&selectors, None).len());
    }
}
/// A snapshot cursor yields every message; with a selector it yields only the matching
/// component's messages.
#[fuchsia::test]
async fn data_repo_filters_logs_by_selectors() {
    let repo = DataRepo::default();
    let foo_identity = ComponentIdentity::from_identifier_and_url(
        ComponentIdentifier::parse_from_moniker("./foo").unwrap(),
        "fuchsia-pkg://foo",
    );
    let foo_container = repo.write().get_log_container(foo_identity);
    let bar_identity = ComponentIdentity::from_identifier_and_url(
        ComponentIdentifier::parse_from_moniker("./bar").unwrap(),
        "fuchsia-pkg://bar",
    );
    let bar_container = repo.write().get_log_container(bar_identity);

    foo_container.ingest_message(make_message("a", 1));
    bar_container.ingest_message(make_message("b", 2));
    foo_container.ingest_message(make_message("c", 3));

    let everything: Vec<String> = repo
        .logs_cursor(StreamMode::Snapshot, None)
        .map(|value| value.msg().unwrap().to_string())
        .collect()
        .await;
    assert_eq!(everything, ["a", "b", "c"]);

    let foo_only_selectors = vec![selectors::parse_selector::<FastError>("foo:root").unwrap()];
    let foo_only: Vec<String> = repo
        .logs_cursor(StreamMode::Snapshot, Some(foo_only_selectors))
        .map(|value| value.msg().unwrap().to_string())
        .collect()
        .await;
    assert_eq!(foo_only, ["a", "c"]);
}
/// Dropping a subscribed stream eventually removes its multiplexer from the broker.
#[fuchsia::test]
async fn multiplexer_broker_cleanup() {
    let repo = DataRepo::default();
    let stream = repo.logs_cursor(StreamMode::SnapshotThenSubscribe, None);
    let registered = repo.read().logs_multiplexers.live_iterators.lock().len();
    assert_eq!(registered, 1);

    // When the multiplexer goes away it must be forgotten by the broker.
    drop(stream);
    loop {
        fasync::Timer::new(Duration::from_millis(100)).await;
        let remaining = repo.read().logs_multiplexers.live_iterators.lock().len();
        if remaining == 0 {
            break;
        }
    }
}
/// Encodes a debug-severity structured record with fixed pid/tid arguments, the given message
/// text and timestamp, and wraps the encoded bytes in a `StoredMessage`.
fn make_message(msg: &str, timestamp: i64) -> StoredMessage {
    let arguments = vec![
        Argument { name: "pid".to_string(), value: Value::UnsignedInt(1) },
        Argument { name: "tid".to_string(), value: Value::UnsignedInt(2) },
        Argument { name: "message".to_string(), value: Value::Text(msg.to_string()) },
    ];
    let record = Record { timestamp, severity: StreamSeverity::Debug, arguments };

    let mut buffer = Cursor::new(vec![0u8; 1024]);
    Encoder::new(&mut buffer).write_record(&record).unwrap();

    // Only the bytes up to the cursor position belong to the record.
    let written = buffer.position() as usize;
    StoredMessage::structured(&buffer.get_ref()[..written], Default::default()).unwrap()
}
}