blob: f12a12312a540c7ece5cb12dfe6c7a4bd5037954 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
container::ComponentIdentity,
events::error::{EventError, MonikerError},
},
async_trait::async_trait,
fidl::endpoints::{ServerEnd, ServiceMarker},
fidl_fuchsia_inspect::TreeProxy,
fidl_fuchsia_inspect_deprecated::InspectProxy,
fidl_fuchsia_io::DirectoryProxy,
fidl_fuchsia_logger::{LogSinkMarker, LogSinkRequestStream},
fidl_fuchsia_sys2::{self as fsys, Event, EventHeader},
fidl_fuchsia_sys_internal::SourceIdentity,
fidl_table_validation::*,
fuchsia_zircon as zx,
futures::{channel::mpsc, stream::BoxStream},
io_util,
std::{convert::TryFrom, ops::Deref},
};
#[async_trait]
pub trait EventSource: Sync + Send {
async fn listen(&mut self, sender: mpsc::Sender<ComponentEvent>) -> Result<(), EventError>;
}
/// The capacity for bounded channels used by this implementation.
pub static CHANNEL_CAPACITY: usize = 1024;
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct Moniker(Vec<String>);
impl Deref for Moniker {
type Target = Vec<String>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Into<Moniker> for Vec<&str> {
fn into(self) -> Moniker {
Moniker(self.into_iter().map(|s| s.to_string()).collect())
}
}
impl Into<Moniker> for Vec<String> {
fn into(self) -> Moniker {
Moniker(self)
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UniqueKey(Vec<String>);
impl Deref for UniqueKey {
type Target = Vec<String>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Into<UniqueKey> for Vec<&str> {
fn into(self) -> UniqueKey {
UniqueKey(self.into_iter().map(|s| s.to_string()).collect())
}
}
/// Represents the ID of a component.
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub enum ComponentIdentifier {
Legacy {
/// The realm path plus the name of the component.
moniker: Moniker,
/// The instance ID of the component.
instance_id: String,
},
Moniker(Vec<MonikerSegment>),
}
impl ComponentIdentifier {
/// Returns the relative moniker to be used for selectors.
/// For legacy components (v1), this is the relative moniker with respect to the root realm.
pub fn relative_moniker_for_selectors(&self) -> Moniker {
match self {
Self::Legacy { moniker, .. } => moniker.clone(),
Self::Moniker(segments) => {
if segments.is_empty() {
Moniker(vec![])
} else {
Moniker(segments.iter().map(|s| s.to_string()).collect())
}
}
}
}
pub fn unique_key(&self) -> UniqueKey {
match self {
Self::Legacy { instance_id, .. } => {
let mut key = self.relative_moniker_for_selectors().0;
key.push(instance_id.clone());
UniqueKey(key)
}
Self::Moniker(segments) => {
let mut key = vec![];
for segment in segments {
key.push(segment.to_string());
key.push(segment.instance_id.clone());
}
UniqueKey(key)
}
}
}
pub fn parse_from_moniker(moniker: &str) -> Result<Self, MonikerError> {
if moniker == "<component_manager>" {
return Ok(ComponentIdentifier::Moniker(vec![MonikerSegment {
collection: None,
name: "<component_manager>".to_string(),
instance_id: "0".to_string(),
}]));
}
if moniker == "." {
return Ok(ComponentIdentifier::Moniker(vec![]));
}
let without_root = moniker
.strip_prefix("./")
.ok_or_else(|| MonikerError::InvalidMonikerPrefix(moniker.to_string()))?;
let mut segments = vec![];
for raw_segment in without_root.split("/") {
let mut parts = raw_segment.split(":");
let segment = match (parts.next(), parts.next(), parts.next()) {
// we have a collection, a component name, and an instance id
(Some(c), Some(n), Some(i)) => MonikerSegment {
collection: Some(c.to_string()),
name: n.to_string(),
instance_id: i.to_string(),
},
// we have a name and an instance id, no collection
(Some(n), Some(i), None) => MonikerSegment {
collection: None,
name: n.to_string(),
instance_id: i.to_string(),
},
_ => return Err(MonikerError::InvalidSegment(raw_segment.to_string())),
};
segments.push(segment);
}
Ok(ComponentIdentifier::Moniker(segments))
}
}
impl std::fmt::Display for ComponentIdentifier {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Legacy { moniker, instance_id } => {
for (i, segment) in moniker.iter().enumerate() {
if i > 0 {
write!(f, "/")?;
}
write!(f, "{}", segment)?;
}
write!(f, ":{}", instance_id)
}
Self::Moniker(segments) => {
if segments.is_empty() {
write!(f, ".")
} else {
for (i, segment) in segments.iter().enumerate() {
if i > 0 {
write!(f, "/")?;
}
write!(f, "{}", segment)?;
}
Ok(())
}
}
}
}
}
/// A single segment in the moniker of a component.
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub struct MonikerSegment {
/// The name of the component's collection, if any.
pub collection: Option<String>,
/// The name of the component.
pub name: String,
/// The instance of the component.
pub instance_id: String,
}
impl std::fmt::Display for MonikerSegment {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(collection) = &self.collection {
write!(f, "{}:", collection)?;
}
write!(f, "{}", self.name)
}
}
#[derive(Debug, Clone, ValidFidlTable, PartialEq)]
#[fidl_table_src(SourceIdentity)]
pub struct ValidatedSourceIdentity {
pub realm_path: Vec<String>,
pub component_url: String,
pub component_name: String,
pub instance_id: String,
}
#[derive(Debug, ValidFidlTable)]
#[fidl_table_src(EventHeader)]
pub struct ValidatedEventHeader {
pub event_type: fsys::EventType,
pub component_url: String,
pub moniker: String,
pub timestamp: i64,
}
#[derive(Debug, ValidFidlTable)]
#[fidl_table_src(Event)]
pub struct ValidatedEvent {
/// Information about the component for which this event was generated.
pub header: ValidatedEventHeader,
/// Optional payload for some event types
#[fidl_field_type(optional)]
pub event_result: Option<fsys::EventResult>,
}
/// The ID of a component as used in components V1.
/// Represents the shared data associated with
/// all component events.
#[derive(Debug, PartialEq, Clone)]
pub struct EventMetadata {
pub identity: ComponentIdentity,
pub timestamp: zx::Time,
}
impl EventMetadata {
pub fn new(identity: ComponentIdentity) -> Self {
Self { identity, timestamp: zx::Time::get_monotonic() }
}
}
impl TryFrom<SourceIdentity> for EventMetadata {
type Error = EventError;
fn try_from(component: SourceIdentity) -> Result<Self, Self::Error> {
Ok(EventMetadata {
identity: ComponentIdentity::try_from(component)?,
timestamp: zx::Time::get_monotonic(),
})
}
}
/// Represents the diagnostics data associated
/// with a component being observed starting.
#[derive(Debug, PartialEq)]
pub struct StartEvent {
pub metadata: EventMetadata,
}
/// Represents the diagnostics data associated
/// with a component being observed running.
#[derive(Debug, PartialEq)]
pub struct RunningEvent {
pub metadata: EventMetadata,
pub component_start_time: zx::Time,
}
/// Represents the diagnostics data associated
/// with a component being observed stopping.
#[derive(Debug, PartialEq)]
pub struct StopEvent {
pub metadata: EventMetadata,
}
/// Represents the diagnostics data associated
/// with a new Diagnostics Directory being
/// made available.
#[derive(Debug)]
pub struct DiagnosticsReadyEvent {
pub metadata: EventMetadata,
/// Proxy to the inspect data host.
pub directory: Option<DirectoryProxy>,
}
/// A new incoming connection to `LogSink`.
pub struct LogSinkRequestedEvent {
pub metadata: EventMetadata,
pub requests: LogSinkRequestStream,
}
impl LogSinkRequestedEvent {
fn new(event: ValidatedEvent, metadata: EventMetadata) -> Result<Self, EventError> {
let payload = event.event_result.ok_or(EventError::MissingField("event_result")).and_then(
|result| match result {
fsys::EventResult::Payload(fsys::EventPayload::CapabilityRequested(payload)) => {
Ok(payload)
}
fsys::EventResult::Error(fsys::EventError {
description: Some(description),
..
}) => Err(EventError::ReceivedError { description }),
result => Err(EventError::UnrecognizedResult { result }),
},
)?;
let capability_name = payload.name.ok_or(EventError::MissingField("name"))?;
if &capability_name != LogSinkMarker::NAME {
Err(EventError::IncorrectName {
received: capability_name,
expected: LogSinkMarker::NAME,
})?;
}
let capability = payload.capability.ok_or(EventError::MissingField("capability"))?;
let requests = ServerEnd::<LogSinkMarker>::new(capability)
.into_stream()
.map_err(|source| EventError::InvalidServerEnd { source })?;
Ok(Self { metadata, requests })
}
}
impl std::fmt::Debug for LogSinkRequestedEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LogSinkRequestedEvent").field("metadata", &self.metadata).finish()
}
}
pub type ComponentEventChannel = mpsc::Sender<ComponentEvent>;
/// A stream of |ComponentEvent|s
pub type ComponentEventStream = BoxStream<'static, ComponentEvent>;
/// An event that occurred to a component.
#[derive(Debug)]
pub enum ComponentEvent {
/// We observed the component starting.
Start(StartEvent),
/// We observed the component already started.
Running(RunningEvent),
/// We observed the component stopping.
Stop(StopEvent),
/// We observed the creation of a new `out` directory.
DiagnosticsReady(DiagnosticsReadyEvent),
/// We received a new connection to `LogSink`.
LogSinkRequested(LogSinkRequestedEvent),
}
/// Data associated with a component.
/// This data is stored by data collectors and passed by the collectors to processors.
#[derive(Debug)]
pub enum InspectData {
/// Empty data, for testing.
Empty,
/// A VMO containing data associated with the event.
Vmo(zx::Vmo),
/// A file containing data associated with the event.
///
/// Because we can't synchronously retrieve file contents like we can for VMOs, this holds
/// the full file contents. Future changes should make streaming ingestion feasible.
File(Vec<u8>),
/// A connection to a Tree service and a handle to the root hierarchy VMO. This VMO is what a
/// root.inspect file would contain and the result of calling Tree#GetContent. We hold to it
/// so that we can use it when the component is removed, at which point calling the Tree
/// service is not an option.
Tree(TreeProxy, Option<zx::Vmo>),
/// A connection to the deprecated Inspect service.
DeprecatedFidl(InspectProxy),
}
impl TryFrom<Event> for ComponentEvent {
type Error = EventError;
fn try_from(event: Event) -> Result<ComponentEvent, Self::Error> {
let event: ValidatedEvent = ValidatedEvent::try_from(event)?;
let metadata = EventMetadata {
identity: ComponentIdentity::from_identifier_and_url(
&ComponentIdentifier::parse_from_moniker(&event.header.moniker)?,
&event.header.component_url,
),
timestamp: zx::Time::from_nanos(event.header.timestamp),
};
match event.header.event_type {
fsys::EventType::Started => {
let start_event = StartEvent { metadata };
Ok(ComponentEvent::Start(start_event))
}
fsys::EventType::Stopped => {
let stop_event = StopEvent { metadata };
Ok(ComponentEvent::Stop(stop_event))
}
fsys::EventType::CapabilityReady | fsys::EventType::Running => {
construct_payload_holding_component_event(event, metadata)
}
fsys::EventType::CapabilityRequested => {
Ok(ComponentEvent::LogSinkRequested(LogSinkRequestedEvent::new(event, metadata)?))
}
_ => Err(EventError::InvalidEventType { ty: event.header.event_type }),
}
}
}
fn construct_payload_holding_component_event(
event: ValidatedEvent,
shared_data: EventMetadata,
) -> Result<ComponentEvent, EventError> {
match event.event_result {
Some(result) => {
match result {
fsys::EventResult::Payload(fsys::EventPayload::CapabilityReady(
capability_ready,
)) => {
let name = capability_ready.name.ok_or(EventError::MissingField("name"))?;
if name == "diagnostics" {
match capability_ready.node {
Some(node) => {
let diagnostics_ready_event = DiagnosticsReadyEvent {
metadata: shared_data,
directory: io_util::node_to_directory(node.into_proxy()?).ok(),
};
Ok(ComponentEvent::DiagnosticsReady(diagnostics_ready_event))
}
None => Err(EventError::MissingDiagnosticsDir),
}
} else {
Err(EventError::IncorrectName { received: name, expected: "diagnostics" })
}
}
fsys::EventResult::Payload(fsys::EventPayload::Running(payload)) => {
match payload.started_timestamp {
Some(timestamp) => {
let existing_data = RunningEvent {
metadata: shared_data,
component_start_time: zx::Time::from_nanos(timestamp),
};
Ok(ComponentEvent::Running(existing_data))
}
None => Err(EventError::MissingStartTimestamp),
}
}
fsys::EventResult::Error(fsys::EventError {
description: Some(description),
..
}) => {
// TODO(fxbug.dev/53903): result.error carries information about errors that happened
// in component_manager. We should dump those in diagnostics.
Err(EventError::ReceivedError { description })
}
result => Err(EventError::UnrecognizedResult { result }),
}
}
None => Err(EventError::MissingPayload { event }),
}
}
impl PartialEq for ComponentEvent {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(ComponentEvent::Start(a), ComponentEvent::Start(b)) => {
return a == b;
}
(ComponentEvent::Stop(a), ComponentEvent::Stop(b)) => {
return a == b;
}
(ComponentEvent::DiagnosticsReady(a), ComponentEvent::DiagnosticsReady(b)) => {
return a == b;
}
(ComponentEvent::Running(a), ComponentEvent::Running(b)) => {
return a == b;
}
// we can't check two LogSinkRequested events for equality because they have channels
_ => false,
}
}
}
// Requires a custom partial_eq due to the presence of a directory proxy.
// Two events with the same metadata and different directory proxies
// will be considered the same.
impl PartialEq for DiagnosticsReadyEvent {
fn eq(&self, other: &Self) -> bool {
self.metadata == other.metadata
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::logs::testing::create_log_sink_requested_event;
use std::convert::TryInto;
#[test]
fn convert_v2_moniker_for_diagnostics() {
let identifier = ComponentIdentifier::parse_from_moniker("./a:0").unwrap();
assert_eq!(identifier.relative_moniker_for_selectors(), vec!["a"].into());
assert_eq!(identifier.unique_key(), vec!["a", "0"].into());
let identifier = ComponentIdentifier::parse_from_moniker("./a:0/b:1").unwrap();
assert_eq!(identifier.relative_moniker_for_selectors(), vec!["a", "b"].into());
assert_eq!(identifier.unique_key(), vec!["a", "0", "b", "1"].into());
let identifier = ComponentIdentifier::parse_from_moniker("./a:0/coll:comp:1/b:0").unwrap();
assert_eq!(identifier.relative_moniker_for_selectors(), vec!["a", "coll:comp", "b"].into());
assert_eq!(identifier.unique_key(), vec!["a", "0", "coll:comp", "1", "b", "0"].into());
let identifier = ComponentIdentifier::parse_from_moniker(".").unwrap();
assert!(identifier.relative_moniker_for_selectors().is_empty());
assert!(identifier.unique_key().is_empty());
}
#[fuchsia_async::run_singlethreaded(test)] // we need an executor for the fidl types
async fn validate_logsink_requested_event() {
let target_moniker = "./foo:0";
let target_url = "http://foo.com".to_string();
let (_log_sink_proxy, log_sink_server_end) =
fidl::endpoints::create_proxy::<LogSinkMarker>().unwrap();
let raw_event = create_log_sink_requested_event(
target_moniker.to_owned(),
target_url.clone(),
log_sink_server_end.into_channel(),
);
let event = match raw_event.try_into().unwrap() {
ComponentEvent::LogSinkRequested(e) => e,
other => panic!("incorrect event type received: {:?}", other),
};
assert_eq!(
event.metadata.identity,
ComponentIdentity::from_identifier_and_url(
&ComponentIdentifier::parse_from_moniker(target_moniker).unwrap(),
target_url
)
);
}
}