// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
constants::MAXIMUM_CACHED_LOGS_BYTES,
container::ComponentDiagnostics,
events::types::ComponentIdentifier,
inspect::container::{InspectArtifactsContainer, UnpopulatedInspectDataContainer},
lifecycle::container::{LifecycleArtifactsContainer, LifecycleDataContainer},
logs::{
buffer::{LazyItem, MemoryBoundedBuffer},
debuglog::{DebugLog, DebugLogBridge},
error::{ForwardError, LogsError, StreamError},
interest::InterestDispatcher,
listener::{pretend_scary_listener_is_safe, Listener, ListenerError},
log_sink_request_stream_from_event,
socket::{Encoding, Forwarder, LegacyEncoding, LogMessageSocket, StructuredEncoding},
source_identity_from_event,
stats::{LogManagerStats, LogSource},
Message,
},
},
anyhow::{format_err, Error},
diagnostics_hierarchy::{trie, InspectHierarchyMatcher},
fidl::endpoints::ServiceMarker,
fidl_fuchsia_diagnostics::{self, Interest, Selector, StreamMode},
fidl_fuchsia_io::{DirectoryProxy, CLONE_FLAG_SAME_RIGHTS},
fidl_fuchsia_logger::{
LogMarker, LogRequest, LogRequestStream, LogSinkControlHandle, LogSinkMarker,
LogSinkRequest, LogSinkRequestStream,
},
fidl_fuchsia_sys2 as fsys,
fidl_fuchsia_sys_internal::{
LogConnection, LogConnectionListenerRequest, LogConnectorProxy, SourceIdentity,
},
fuchsia_async::Task,
fuchsia_inspect as inspect,
fuchsia_inspect_derive::{Inspect, WithInspect},
fuchsia_zircon as zx,
futures::channel::mpsc,
futures::prelude::*,
io_util,
parking_lot::RwLock,
selectors,
std::collections::HashMap,
std::sync::Arc,
tracing::{debug, error, trace, warn},
};
/// DataRepo holds all diagnostics data and is a singleton wrapped by multiple
/// [`pipeline::Pipeline`]s in a given Archivist instance.
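///
/// # Example
///
/// A minimal usage sketch (`ignore`d because `DataRepo` is crate-internal):
///
/// ```ignore
/// let repo = DataRepo::new();
/// // `DataRepo` derefs to its inner `RwLock<DataRepoState>`, so callers
/// // take the lock directly for reads and writes.
/// let lifecycle_data = repo.read().fetch_lifecycle_event_data();
/// ```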
#[derive(Clone)]
pub struct DataRepo {
inner: Arc<RwLock<DataRepoState>>,
}
impl std::ops::Deref for DataRepo {
type Target = RwLock<DataRepoState>;
fn deref(&self) -> &Self::Target {
&*self.inner
}
}
impl DataRepo {
pub fn new() -> Self {
DataRepo {
inner: Arc::new(RwLock::new(DataRepoState {
data_directories: trie::Trie::new(),
logs: Default::default(),
})),
}
}
pub fn with_logs_inspect(parent: &fuchsia_inspect::Node, name: impl AsRef<str>) -> Self {
let logs = LogState::default();
DataRepo {
inner: Arc::new(RwLock::new(DataRepoState {
data_directories: trie::Trie::new(),
logs: logs
.with_inspect(parent, name)
.expect("couldn't attach log stats to inspect"),
})),
}
}
}
impl DataRepo {
/// Drain the kernel's debug log: ingest the existing backlog in timestamp
/// order, then continue listening for new messages. The returned future
/// completes only if reading from the kernel log fails.
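///
/// # Example
///
/// A sketch, assuming `klog_reader` is some `DebugLog` implementation (the
/// real reader type lives in the `debuglog` module):
///
/// ```ignore
/// let repo = DataRepo::new();
/// // Runs until reading the kernel log fails, so spawn it in the background.
/// Task::spawn(repo.clone().drain_debuglog(klog_reader)).detach();
/// ```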
pub async fn drain_debuglog<K>(self, klog_reader: K)
where
K: DebugLog + Send + Sync + 'static,
{
debug!("Draining debuglog.");
let component_log_stats =
{ self.read().logs.stats.get_component_log_stats("fuchsia-boot://klog") };
let mut kernel_logger = DebugLogBridge::create(klog_reader);
let mut messages = match kernel_logger.existing_logs().await {
Ok(messages) => messages,
Err(e) => {
error!(%e, "failed to read from kernel log, important logs may be missing");
return;
}
};
messages.sort_by_key(|m| m.metadata.timestamp);
for message in messages {
component_log_stats.record_log(&message);
self.ingest_message(message, LogSource::Kernel);
}
let res = kernel_logger
.listen()
.try_for_each(|message| async {
component_log_stats.clone().record_log(&message);
self.ingest_message(message, LogSource::Kernel);
Ok(())
})
.await;
if let Err(e) = res {
error!(%e, "failed to drain kernel log, important logs may be missing");
}
}
/// Drain log sink for messages sent by the archivist itself.
pub async fn drain_internal_log_sink(self, socket: zx::Socket, name: &str) {
let forwarder = self.read().logs.legacy_forwarder.clone();
// TODO(fxbug.dev/50105): Figure out how to properly populate SourceIdentity
let mut source = SourceIdentity::EMPTY;
source.component_name = Some(name.to_owned());
let source = Arc::new(source);
let log_stream = LogMessageSocket::new(socket, source, forwarder)
.expect("failed to create internal LogMessageSocket");
self.drain_messages(log_stream).await;
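// The archivist holds its own log socket open for its entire lifetime, so
// draining it should never terminate.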
unreachable!();
}
/// Handle `LogConnectionListener` for the parent realm, eventually passing
/// `LogSink` connections into the manager.
pub async fn handle_log_connector(
self,
connector: LogConnectorProxy,
sender: mpsc::UnboundedSender<Task<()>>,
) {
debug!("Handling LogSink connections from appmgr.");
match connector.take_log_connection_listener().await {
Ok(Some(listener)) => {
let mut connections =
listener.into_stream().expect("getting request stream from server end");
while let Ok(Some(connection)) = connections.try_next().await {
match connection {
LogConnectionListenerRequest::OnNewConnection {
connection: LogConnection { log_request, source_identity },
control_handle: _,
} => {
let stream = log_request
.into_stream()
.expect("getting LogSinkRequestStream from serverend");
let source = Arc::new(source_identity);
sender
.unbounded_send(Task::spawn(self.clone().handle_log_sink(
stream,
source,
sender.clone(),
)))
.expect("channel is held by archivist, lasts for whole program");
}
};
}
}
Ok(None) => warn!("local realm already gave out LogConnectionListener, skipping logs"),
Err(e) => error!(%e, "error retrieving LogConnectionListener from LogConnector"),
}
}
/// Handle `LogSink` protocol on `stream`. The future returned by this
/// function will not complete before all messages on this connection are
/// processed.
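///
/// # Example
///
/// A sketch of wiring one `LogSink` connection into the repo; `stream` and
/// the task channel are assumed to come from the surrounding server setup:
///
/// ```ignore
/// let (sender, _task_receiver) = mpsc::unbounded::<Task<()>>();
/// let source = Arc::new(SourceIdentity::EMPTY);
/// Task::spawn(repo.clone().handle_log_sink(stream, source, sender.clone())).detach();
/// ```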
pub async fn handle_log_sink(
self,
mut stream: LogSinkRequestStream,
source: Arc<SourceIdentity>,
sender: mpsc::UnboundedSender<Task<()>>,
) {
if source.component_name.is_none() {
self.read().logs.stats.record_unattributed();
}
while let Some(next) = stream.next().await {
match next {
Ok(LogSinkRequest::Connect { socket, control_handle }) => {
let forwarder = { self.read().logs.legacy_forwarder.clone() };
match LogMessageSocket::new(socket, source.clone(), forwarder) {
Ok(log_stream) => {
self.try_add_interest_listener(&source, control_handle);
let task = Task::spawn(self.clone().drain_messages(log_stream));
sender.unbounded_send(task).expect("channel alive for whole program");
}
Err(e) => {
control_handle.shutdown();
warn!(?source, %e, "error creating socket")
}
};
}
Ok(LogSinkRequest::ConnectStructured { socket, control_handle }) => {
let forwarder = { self.read().logs.structured_forwarder.clone() };
match LogMessageSocket::new_structured(socket, source.clone(), forwarder) {
Ok(log_stream) => {
self.try_add_interest_listener(&source, control_handle);
let task = Task::spawn(self.clone().drain_messages(log_stream));
sender.unbounded_send(task).expect("channel alive for whole program");
}
Err(e) => {
control_handle.shutdown();
warn!(?source, %e, "error creating socket")
}
};
}
Err(e) => error!(?source, %e, "error handling log sink"),
}
}
}
/// Drain a `LogMessageSocket` which wraps a socket from a component
/// generating logs.
async fn drain_messages<E>(self, mut log_stream: LogMessageSocket<E>)
where
E: Encoding + Unpin,
{
let component_log_stats =
{ self.read().logs.stats.get_component_log_stats(log_stream.source_url()) };
loop {
match log_stream.next().await {
Ok(message) => {
component_log_stats.record_log(&message);
self.ingest_message(message, LogSource::LogSink);
}
Err(StreamError::Closed) => return,
Err(e) => {
self.read().logs.stats.record_closed_stream();
warn!(source = ?log_stream.source_url(), %e, "closing socket");
return;
}
}
}
}
/// Add an `Interest` listener connecting the interest dispatcher to the
/// `LogSinkControlHandle` (held as a weak reference) associated with the
/// given source. Interest listeners are only supported for log connections
/// whose `SourceIdentity` includes an attributed component name; if no
/// component name is present, this function returns without adding a
/// listener.
fn try_add_interest_listener(
&self,
source: &Arc<SourceIdentity>,
control_handle: LogSinkControlHandle,
) {
if source.component_name.is_none() {
return;
}
let control_handle = Arc::new(control_handle);
let event_listener = control_handle.clone();
self.write()
.logs
.interest_dispatcher
.add_interest_listener(source, Arc::downgrade(&event_listener));
// Ack successful connections with an 'empty' interest so that async clients
// know registration succeeded.
let _ = control_handle.send_on_register_interest(Interest::EMPTY);
}
/// Handle the components v2 EventStream for attributed logs of v2
/// components.
pub async fn handle_event_stream(
self,
mut stream: fsys::EventStreamRequestStream,
sender: mpsc::UnboundedSender<Task<()>>,
) {
while let Ok(Some(request)) = stream.try_next().await {
match request {
fsys::EventStreamRequest::OnEvent { event, .. } => {
if let Err(e) = self.handle_event(event, sender.clone()) {
error!(%e, "Unable to process event");
}
}
}
}
}
/// Handle the components v2 CapabilityRequested event for attributed logs of
/// v2 components.
fn handle_event(
&self,
event: fsys::Event,
sender: mpsc::UnboundedSender<Task<()>>,
) -> Result<(), LogsError> {
let identity = source_identity_from_event(&event)?;
let stream = log_sink_request_stream_from_event(event)?;
let task = Task::spawn(self.clone().handle_log_sink(stream, identity, sender.clone()));
sender.unbounded_send(task).expect("channel is alive for whole program");
Ok(())
}
/// Spawn a task to handle requests from components reading the shared log.
pub fn handle_log(self, stream: LogRequestStream, sender: mpsc::UnboundedSender<Task<()>>) {
if let Err(e) = sender.clone().unbounded_send(Task::spawn(async move {
if let Err(e) = self.handle_log_requests(stream, sender).await {
warn!("error handling Log requests: {}", e);
}
})) {
warn!("Couldn't queue listener task: {:?}", e);
}
}
/// Handle requests to `fuchsia.logger.Log`. All request types send the whole
/// backlog buffered in memory; the `DumpLogs`/`DumpLogsSafe` variants stop
/// listening after that snapshot, while the `Listen` variants continue
/// streaming new messages.
async fn handle_log_requests(
self,
mut stream: LogRequestStream,
mut sender: mpsc::UnboundedSender<Task<()>>,
) -> Result<(), LogsError> {
while let Some(request) = stream.next().await {
let request = request.map_err(|source| LogsError::HandlingRequests {
protocol: LogMarker::NAME,
source,
})?;
let (listener, options, dump_logs, selectors) = match request {
LogRequest::ListenSafe { log_listener, options, .. } => {
(log_listener, options, false, None)
}
LogRequest::DumpLogsSafe { log_listener, options, .. } => {
(log_listener, options, true, None)
}
LogRequest::ListenSafeWithSelectors {
log_listener, options, selectors, ..
} => (log_listener, options, false, Some(selectors)),
// TODO(fxbug.dev/48758) delete these methods!
LogRequest::Listen { log_listener, options, .. } => {
warn!("Use of fuchsia.logger.Log.Listen. Use ListenSafe.");
let listener = pretend_scary_listener_is_safe(log_listener)
.map_err(|source| ListenerError::AsbestosIo { source })?;
(listener, options, false, None)
}
LogRequest::DumpLogs { log_listener, options, .. } => {
warn!("Use of fuchsia.logger.Log.DumpLogs. Use DumpLogsSafe.");
let listener = pretend_scary_listener_is_safe(log_listener)
.map_err(|source| ListenerError::AsbestosIo { source })?;
(listener, options, true, None)
}
};
let listener = Listener::new(listener, options)?;
let mode =
if dump_logs { StreamMode::Snapshot } else { StreamMode::SnapshotThenSubscribe };
let logs = self.cursor(mode);
if let Some(s) = selectors {
self.write().logs.interest_dispatcher.update_selectors(s);
}
sender.send(listener.spawn(logs, dump_logs)).await.ok();
}
Ok(())
}
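/// Return a stream over the buffered log messages in the given `mode`,
/// substituting a synthesized "dropped" message wherever the rolling buffer
/// evicted entries before the cursor could read them.
///
/// # Example
///
/// A sketch of snapshotting the current backlog:
///
/// ```ignore
/// let mut logs = repo.cursor(StreamMode::Snapshot);
/// while let Some(message) = logs.next().await {
///     println!("{:?}", message.id);
/// }
/// ```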
pub fn cursor(&self, mode: StreamMode) -> impl Stream<Item = Arc<Message>> {
self.read().logs.log_msg_buffer.cursor(mode).map(|item| match item {
LazyItem::Next(m) => m,
LazyItem::ItemsDropped(n) => Arc::new(Message::for_dropped(n)),
})
}
/// Ingest an individual log message.
fn ingest_message(&self, log_msg: Message, source: LogSource) {
let mut inner = self.write();
trace!("Ingesting {:?}", log_msg.id);
// Record the message against stats before pushing onto the buffer and waking
// listeners, so that tests observe updated stats as soon as a message arrives.
inner.logs.stats.record_log(&log_msg, source);
inner.logs.log_msg_buffer.push(log_msg);
}
/// Stop accepting new messages, ensuring that pending Cursors return Poll::Ready(None) after
/// consuming any messages received before this call.
pub fn terminate_logs(&self) {
self.write().logs.log_msg_buffer.terminate();
}
/// Initializes internal log forwarders.
pub fn forward_logs(self) {
if let Err(e) = self.init_forwarders() {
error!(%e, "couldn't forward logs");
} else {
debug!("Log forwarding initialized.");
}
}
fn init_forwarders(&self) -> Result<(), LogsError> {
let sink =
fuchsia_component::client::connect_to_service::<LogSinkMarker>().map_err(|source| {
LogsError::ConnectingToService { protocol: LogSinkMarker::NAME, source }
})?;
let mut state = self.write();
let (send, recv) = zx::Socket::create(zx::SocketOpts::DATAGRAM)
.map_err(|source| ForwardError::Create { source })?;
sink.connect(recv).map_err(|source| ForwardError::Connect { source })?;
state.logs.legacy_forwarder.init(send);
let (send, recv) = zx::Socket::create(zx::SocketOpts::DATAGRAM)
.map_err(|source| ForwardError::Create { source })?;
sink.connect_structured(recv).map_err(|source| ForwardError::Connect { source })?;
state.logs.structured_forwarder.init(send);
Ok(())
}
}
pub struct DataRepoState {
pub data_directories: trie::Trie<String, ComponentDiagnostics>,
logs: LogState,
}
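/// Log state guarded by [`DataRepo`]'s lock: the interest dispatcher, the
/// forwarding sockets for the archivist's own logs, the memory-bounded
/// message buffer, and the statistics exported to Inspect.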
#[derive(Inspect)]
struct LogState {
#[inspect(skip)]
interest_dispatcher: InterestDispatcher,
#[inspect(skip)]
legacy_forwarder: Forwarder<LegacyEncoding>,
#[inspect(skip)]
structured_forwarder: Forwarder<StructuredEncoding>,
#[inspect(rename = "buffer_stats")]
log_msg_buffer: MemoryBoundedBuffer<Message>,
stats: LogManagerStats,
inspect_node: inspect::Node,
}
impl Default for LogState {
fn default() -> Self {
LogState {
interest_dispatcher: InterestDispatcher::default(),
log_msg_buffer: MemoryBoundedBuffer::new(MAXIMUM_CACHED_LOGS_BYTES),
stats: LogManagerStats::new_detached(),
inspect_node: inspect::Node::default(),
legacy_forwarder: Forwarder::new(),
structured_forwarder: Forwarder::new(),
}
}
}
impl DataRepoState {
pub fn remove(&mut self, component_id: &ComponentIdentifier) {
self.data_directories.remove(component_id.unique_key());
}
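/// Record that a component exists (from a started or existing lifecycle
/// event), creating its repository entry or filling in the lifecycle
/// artifacts of an existing entry.
///
/// # Example
///
/// A sketch using the same shape of arguments as the tests below:
///
/// ```ignore
/// repo.write().add_new_component(
///     component_id.clone(),
///     "fuchsia-pkg://test",
///     zx::Time::from_nanos(1),       // event timestamp
///     Some(zx::Time::from_nanos(0)), // start time, when known
/// )?;
/// ```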
pub fn add_new_component(
&mut self,
identifier: ComponentIdentifier,
component_url: impl Into<String>,
event_timestamp: zx::Time,
component_start_time: Option<zx::Time>,
) -> Result<(), Error> {
let relative_moniker = identifier.relative_moniker_for_selectors();
let lifecycle_artifact_container = LifecycleArtifactsContainer {
event_timestamp,
component_start_time,
};
let key = identifier.unique_key();
let diag_repo_entry_opt = self.data_directories.get_mut(key.clone());
match diag_repo_entry_opt {
Some(diag_repo_entry) => {
let diag_repo_entry_values: &mut [ComponentDiagnostics] =
diag_repo_entry.get_values_mut();
match &mut diag_repo_entry_values[..] {
[] => {
// An entry with no values implies that somehow we observed the creation
// of a component lower in the topology before observing this one. In
// that case, just instantiate as though it's our first time encountering
// this moniker segment.
self.data_directories.insert(
key,
ComponentDiagnostics {
relative_moniker,
component_url: component_url.into(),
lifecycle: Some(lifecycle_artifact_container),
inspect: None,
},
)
}
[existing_diagnostics_artifact_container] => {
// Races may occur between seeing diagnostics ready and seeing
// creation lifecycle events. Handle this here.
// TODO(fxbug.dev/52047): Remove once caching handles ordering issues.
if existing_diagnostics_artifact_container.lifecycle.is_none() {
existing_diagnostics_artifact_container.lifecycle =
Some(lifecycle_artifact_container);
}
}
_ => {
return Err(format_err!(
concat!(
"Encountered a diagnostics data repository node with more",
"than one artifact container, moniker: {:?}."
),
key
));
}
}
}
// This is expected to be the most common case: a creation lifecycle event
// prompts the instantiation of a new data repository entry.
None => self.data_directories.insert(
key,
ComponentDiagnostics {
relative_moniker,
component_url: component_url.into(),
lifecycle: Some(lifecycle_artifact_container),
inspect: None,
},
),
}
Ok(())
}
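/// Attach the diagnostics directory served by a component to its repository
/// entry, creating the entry if no lifecycle event has been observed for it
/// yet.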
pub fn add_inspect_artifacts(
&mut self,
identifier: ComponentIdentifier,
component_url: impl Into<String>,
directory_proxy: DirectoryProxy,
event_timestamp: zx::Time,
) -> Result<(), Error> {
let relative_moniker = identifier.relative_moniker_for_selectors();
let key = identifier.unique_key();
let inspect_container = InspectArtifactsContainer {
component_diagnostics_proxy: Arc::new(directory_proxy),
event_timestamp,
};
self.insert_inspect_artifact_container(
inspect_container,
key,
relative_moniker,
component_url.into(),
)
}
// Inserts an InspectArtifactsContainer into the data repository.
fn insert_inspect_artifact_container(
&mut self,
inspect_container: InspectArtifactsContainer,
key: Vec<String>,
relative_moniker: Vec<String>,
component_url: String,
) -> Result<(), Error> {
let diag_repo_entry_opt = self.data_directories.get_mut(key.clone());
match diag_repo_entry_opt {
Some(diag_repo_entry) => {
let diag_repo_entry_values: &mut [ComponentDiagnostics] =
diag_repo_entry.get_values_mut();
match &mut diag_repo_entry_values[..] {
[] => {
// An entry with no values implies that somehow we observed the creation
// of a component lower in the topology before observing this one. In
// that case, just instantiate as though it's our first time encountering
// this moniker segment.
self.data_directories.insert(
key,
ComponentDiagnostics {
relative_moniker,
component_url,
lifecycle: None,
inspect: Some(inspect_container),
},
)
}
[existing_diagnostics_artifact_container] => {
// Races may occur between synthesized and real diagnostics_ready
// events, so we must handle de-duplication here.
// TODO(fxbug.dev/52047): Remove once caching handles ordering issues.
if existing_diagnostics_artifact_container.inspect.is_none() {
// This is expected to be the most common case. We've encountered the
// diagnostics_ready event for a component that has already been
// observed to be started/existing. We now must update the diagnostics
// artifact container with the inspect artifacts that accompanied the
// diagnostics_ready event.
existing_diagnostics_artifact_container.inspect =
Some(inspect_container);
}
}
_ => {
return Err(format_err!(
concat!(
"Encountered a diagnostics data repository node with more",
"than one artifact container, moniker: {:?}."
),
key
));
}
}
}
// This case is expected to be uncommon; we've encountered a diagnostics_ready
// event before a start or existing event!
None => self.data_directories.insert(
key,
ComponentDiagnostics {
relative_moniker,
component_url,
lifecycle: None,
inspect: Some(inspect_container),
},
),
}
Ok(())
}
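/// Flatten the repository into one `LifecycleDataContainer` per artifact
/// container: an entry's lifecycle artifacts and inspect artifacts each
/// yield a container, so a component with both produces two results.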
pub fn fetch_lifecycle_event_data(&self) -> Vec<LifecycleDataContainer> {
self.data_directories.iter().fold(
Vec::new(),
|mut acc, (_, diagnostics_artifacts_container_opt)| {
match diagnostics_artifacts_container_opt {
None => acc,
Some(diagnostics_artifacts_container) => {
if let Some(lifecycle_artifacts) =
&diagnostics_artifacts_container.lifecycle
{
acc.push(LifecycleDataContainer::from_lifecycle_artifact(
lifecycle_artifacts,
diagnostics_artifacts_container.relative_moniker.clone(),
diagnostics_artifacts_container.component_url.clone(),
));
}
if let Some(inspect_artifacts) = &diagnostics_artifacts_container.inspect {
acc.push(LifecycleDataContainer::from_inspect_artifact(
inspect_artifacts,
diagnostics_artifacts_container.relative_moniker.clone(),
diagnostics_artifacts_container.component_url.clone(),
));
}
acc
}
}
},
)
}
/// Return an `UnpopulatedInspectDataContainer` for each component whose
/// Inspect directory is available and matches the given selectors.
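///
/// # Example
///
/// A sketch mirroring the selector test at the bottom of this file:
///
/// ```ignore
/// let selectors = Some(vec![Arc::new(
///     selectors::parse_selector("a/b/foo.cmx:root").expect("parse selector"),
/// )]);
/// let matching = repo.read().fetch_inspect_data(&selectors, None);
/// ```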
pub fn fetch_inspect_data(
&self,
component_selectors: &Option<Vec<Arc<Selector>>>,
moniker_to_static_matcher_map: Option<&HashMap<String, InspectHierarchyMatcher>>,
) -> Vec<UnpopulatedInspectDataContainer> {
self
.data_directories
.iter()
.filter_map(|(_, diagnostics_artifacts_container_opt)| {
let (diagnostics_artifacts_container, inspect_artifacts) =
match &diagnostics_artifacts_container_opt {
Some(diagnostics_artifacts_container) => {
match &diagnostics_artifacts_container.inspect {
Some(inspect_artifacts) => {
(diagnostics_artifacts_container, inspect_artifacts)
}
None => return None,
}
}
None => return None,
};
let optional_hierarchy_matcher = match moniker_to_static_matcher_map {
Some(map) => {
match map.get(&diagnostics_artifacts_container.relative_moniker.join("/")) {
Some(inspect_matcher) => Some(inspect_matcher),
// Return early if there were static selectors, and none were for this
// moniker.
None => return None,
}
}
None => None,
};
// Verify that the dynamic selectors contain an entry that applies to
// this moniker as well.
if !match component_selectors {
Some(component_selectors) => component_selectors.iter().any(|s| {
selectors::match_component_moniker_against_selector(
&diagnostics_artifacts_container.relative_moniker,
s,
)
.ok()
.unwrap_or(false)
}),
None => true,
} {
return None;
}
// This artifact contains inspect and matches a passed selector.
io_util::clone_directory(
&inspect_artifacts.component_diagnostics_proxy,
CLONE_FLAG_SAME_RIGHTS,
)
.ok()
.map(|directory| UnpopulatedInspectDataContainer {
relative_moniker: diagnostics_artifacts_container.relative_moniker.clone(),
component_url: diagnostics_artifacts_container.component_url.clone(),
component_diagnostics_proxy: directory,
inspect_matcher: optional_hierarchy_matcher.cloned(),
})
})
.collect()
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::events::types::{ComponentIdentifier, LegacyIdentifier, RealmPath},
diagnostics_hierarchy::trie::TrieIterableNode,
fidl_fuchsia_io::DirectoryMarker,
fuchsia_async as fasync, fuchsia_zircon as zx,
};
const TEST_URL: &str = "fuchsia-pkg://test";
#[fasync::run_singlethreaded(test)]
async fn inspect_repo_disallows_duplicated_dirs() {
let inspect_repo = DataRepo::new();
let mut inspect_repo = inspect_repo.write();
let realm_path = RealmPath(vec!["a".to_string(), "b".to_string()]);
let instance_id = "1234".to_string();
let component_id = ComponentIdentifier::Legacy(LegacyIdentifier {
instance_id,
realm_path,
component_name: "foo.cmx".into(),
});
let (proxy, _) =
fidl::endpoints::create_proxy::<DirectoryMarker>().expect("create directory proxy");
inspect_repo
.add_inspect_artifacts(component_id.clone(), TEST_URL, proxy, zx::Time::from_nanos(0))
.expect("add to repo");
let (proxy, _) =
fidl::endpoints::create_proxy::<DirectoryMarker>().expect("create directory proxy");
inspect_repo
.add_inspect_artifacts(component_id.clone(), TEST_URL, proxy, zx::Time::from_nanos(0))
.expect("add to repo");
let key = component_id.unique_key();
assert_eq!(inspect_repo.data_directories.get(key).unwrap().get_values().len(), 1);
}
#[fasync::run_singlethreaded(test)]
async fn data_repo_updates_existing_entry_to_hold_inspect_data() {
let data_repo = DataRepo::new();
let mut data_repo = data_repo.write();
let realm_path = RealmPath(vec!["a".to_string(), "b".to_string()]);
let instance_id = "1234".to_string();
let component_id = ComponentIdentifier::Legacy(LegacyIdentifier {
instance_id,
realm_path,
component_name: "foo.cmx".into(),
});
data_repo
.add_new_component(component_id.clone(), TEST_URL, zx::Time::from_nanos(0), None)
.expect("instantiated new component.");
let (proxy, _) =
fidl::endpoints::create_proxy::<DirectoryMarker>().expect("create directory proxy");
data_repo
.add_inspect_artifacts(component_id.clone(), TEST_URL, proxy, zx::Time::from_nanos(0))
.expect("add to repo");
let key = component_id.unique_key();
assert_eq!(data_repo.data_directories.get(key.clone()).unwrap().get_values().len(), 1);
let entry = &data_repo.data_directories.get(key.clone()).unwrap().get_values()[0];
assert!(entry.inspect.is_some());
assert_eq!(entry.component_url, TEST_URL);
}
#[fasync::run_singlethreaded(test)]
async fn data_repo_tolerates_duplicate_new_component_insertions() {
let data_repo = DataRepo::new();
let mut data_repo = data_repo.write();
let realm_path = RealmPath(vec!["a".to_string(), "b".to_string()]);
let instance_id = "1234".to_string();
let component_id = ComponentIdentifier::Legacy(LegacyIdentifier {
instance_id,
realm_path,
component_name: "foo.cmx".into(),
});
data_repo
.add_new_component(component_id.clone(), TEST_URL, zx::Time::from_nanos(0), None)
.expect("instantiated new component.");
let duplicate_new_component_insertion = data_repo.add_new_component(
component_id.clone(),
TEST_URL,
zx::Time::from_nanos(1),
Some(zx::Time::from_nanos(0)),
);
assert!(duplicate_new_component_insertion.is_ok());
let key = component_id.unique_key();
let repo_values = data_repo.data_directories.get(key.clone()).unwrap().get_values();
assert_eq!(repo_values.len(), 1);
let entry = &repo_values[0];
assert!(entry.lifecycle.is_some());
let lifecycle_container = entry.lifecycle.as_ref().unwrap();
assert!(lifecycle_container.component_start_time.is_none());
assert_eq!(entry.relative_moniker, component_id.relative_moniker_for_selectors());
assert_eq!(entry.component_url, TEST_URL);
}
#[fasync::run_singlethreaded(test)]
async fn running_components_provide_start_time() {
let data_repo = DataRepo::new();
let mut data_repo = data_repo.write();
let realm_path = RealmPath(vec!["a".to_string(), "b".to_string()]);
let instance_id = "1234".to_string();
let component_id = ComponentIdentifier::Legacy(LegacyIdentifier {
instance_id,
realm_path,
component_name: "foo.cmx".into(),
});
let component_insertion = data_repo.add_new_component(
component_id.clone(),
TEST_URL,
zx::Time::from_nanos(1),
Some(zx::Time::from_nanos(0)),
);
assert!(component_insertion.is_ok());
let key = component_id.unique_key();
let repo_values = data_repo.data_directories.get(key.clone()).unwrap().get_values();
assert_eq!(repo_values.len(), 1);
let entry = &repo_values[0];
assert!(entry.lifecycle.is_some());
let lifecycle_container = entry.lifecycle.as_ref().unwrap();
assert!(lifecycle_container.component_start_time.is_some());
assert_eq!(lifecycle_container.component_start_time.unwrap().into_nanos(), 0);
assert_eq!(entry.relative_moniker, component_id.relative_moniker_for_selectors());
assert_eq!(entry.component_url, TEST_URL);
}
#[fasync::run_singlethreaded(test)]
async fn data_repo_tolerant_of_new_component_calls_if_diagnostics_ready_already_processed() {
let data_repo = DataRepo::new();
let mut data_repo = data_repo.write();
let realm_path = RealmPath(vec!["a".to_string(), "b".to_string()]);
let instance_id = "1234".to_string();
let component_id = ComponentIdentifier::Legacy(LegacyIdentifier {
instance_id,
realm_path,
component_name: "foo.cmx".into(),
});
let (proxy, _) =
fidl::endpoints::create_proxy::<DirectoryMarker>().expect("create directory proxy");
data_repo
.add_inspect_artifacts(component_id.clone(), TEST_URL, proxy, zx::Time::from_nanos(0))
.expect("add to repo");
let false_new_component_result = data_repo.add_new_component(
component_id.clone(),
TEST_URL,
zx::Time::from_nanos(0),
None,
);
assert!(false_new_component_result.is_ok());
// We shouldn't have overwritten the entry. There should still be an inspect
// artifacts container.
let key = component_id.unique_key();
assert_eq!(data_repo.data_directories.get(key.clone()).unwrap().get_values().len(), 1);
let entry = &data_repo.data_directories.get(key.clone()).unwrap().get_values()[0];
assert_eq!(entry.component_url, TEST_URL);
assert!(entry.inspect.is_some());
assert!(entry.lifecycle.is_some());
}
#[fasync::run_singlethreaded(test)]
async fn diagnostics_repo_cant_have_more_than_one_diagnostics_data_container_per_component() {
let data_repo = DataRepo::new();
let mut data_repo = data_repo.write();
let realm_path = RealmPath(vec!["a".to_string(), "b".to_string()]);
let instance_id = "1234".to_string();
let component_id = ComponentIdentifier::Legacy(LegacyIdentifier {
instance_id,
realm_path,
component_name: "foo.cmx".into(),
});
data_repo
.add_new_component(component_id.clone(), TEST_URL, zx::Time::from_nanos(0), None)
.expect("insertion will succeed.");
let key = component_id.unique_key();
assert_eq!(data_repo.data_directories.get(key.clone()).unwrap().get_values().len(), 1);
let mutable_values =
data_repo.data_directories.get_mut(key.clone()).unwrap().get_values_mut();
mutable_values.push(ComponentDiagnostics {
relative_moniker: component_id.relative_moniker_for_selectors(),
component_url: TEST_URL.to_string(),
inspect: None,
lifecycle: None,
});
let (proxy, _) =
fidl::endpoints::create_proxy::<DirectoryMarker>().expect("create directory proxy");
assert!(data_repo
.add_inspect_artifacts(component_id.clone(), TEST_URL, proxy, zx::Time::from_nanos(0))
.is_err());
}
#[fasync::run_singlethreaded(test)]
async fn data_repo_filters_inspect_by_selectors() {
let data_repo = DataRepo::new();
let realm_path = RealmPath(vec!["a".to_string(), "b".to_string()]);
let instance_id = "1234".to_string();
let component_id = ComponentIdentifier::Legacy(LegacyIdentifier {
instance_id,
realm_path: realm_path.clone(),
component_name: "foo.cmx".into(),
});
data_repo
.write()
.add_new_component(component_id.clone(), TEST_URL, zx::Time::from_nanos(0), None)
.expect("insertion will succeed.");
data_repo
.write()
.add_inspect_artifacts(
component_id.clone(),
TEST_URL,
io_util::open_directory_in_namespace("/tmp", io_util::OPEN_RIGHT_READABLE)
.expect("open root"),
zx::Time::from_nanos(0),
)
.expect("add inspect artifacts");
let component_id2 = ComponentIdentifier::Legacy(LegacyIdentifier {
instance_id: "12345".to_string(),
realm_path,
component_name: "foo2.cmx".into(),
});
data_repo
.write()
.add_new_component(component_id2.clone(), TEST_URL, zx::Time::from_nanos(0), None)
.expect("insertion will succeed.");
data_repo
.write()
.add_inspect_artifacts(
component_id2.clone(),
TEST_URL,
io_util::open_directory_in_namespace("/tmp", io_util::OPEN_RIGHT_READABLE)
.expect("open root"),
zx::Time::from_nanos(0),
)
.expect("add inspect artifacts");
assert_eq!(2, data_repo.read().fetch_inspect_data(&None, None).len());
let selectors = Some(vec![Arc::new(
selectors::parse_selector("a/b/foo.cmx:root").expect("parse selector"),
)]);
assert_eq!(1, data_repo.read().fetch_inspect_data(&selectors, None).len());
let selectors = Some(vec![Arc::new(
selectors::parse_selector("a/b/f*.cmx:root").expect("parse selector"),
)]);
assert_eq!(2, data_repo.read().fetch_inspect_data(&selectors, None).len());
let selectors = Some(vec![Arc::new(
selectors::parse_selector("foo.cmx:root").expect("parse selector"),
)]);
assert_eq!(0, data_repo.read().fetch_inspect_data(&selectors, None).len());
}
}