blob: f12a12312a540c7ece5cb12dfe6c7a4bd5037954 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
events::error::{EventError, MonikerError},
fidl::endpoints::{ServerEnd, ServiceMarker},
fidl_fuchsia_logger::{LogSinkMarker, LogSinkRequestStream},
fidl_fuchsia_sys2::{self as fsys, Event, EventHeader},
fuchsia_zircon as zx,
futures::{channel::mpsc, stream::BoxStream},
std::{convert::TryFrom, ops::Deref},
pub trait EventSource: Sync + Send {
async fn listen(&mut self, sender: mpsc::Sender<ComponentEvent>) -> Result<(), EventError>;
/// The capacity for bounded channels used by this implementation.
pub static CHANNEL_CAPACITY: usize = 1024;
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct Moniker(Vec<String>);
impl Deref for Moniker {
type Target = Vec<String>;
fn deref(&self) -> &Self::Target {
impl Into<Moniker> for Vec<&str> {
fn into(self) -> Moniker {
Moniker(self.into_iter().map(|s| s.to_string()).collect())
impl Into<Moniker> for Vec<String> {
fn into(self) -> Moniker {
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UniqueKey(Vec<String>);
impl Deref for UniqueKey {
type Target = Vec<String>;
fn deref(&self) -> &Self::Target {
impl Into<UniqueKey> for Vec<&str> {
fn into(self) -> UniqueKey {
UniqueKey(self.into_iter().map(|s| s.to_string()).collect())
/// Represents the ID of a component.
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub enum ComponentIdentifier {
Legacy {
/// The realm path plus the name of the component.
moniker: Moniker,
/// The instance ID of the component.
instance_id: String,
impl ComponentIdentifier {
/// Returns the relative moniker to be used for selectors.
/// For legacy components (v1), this is the relative moniker with respect to the root realm.
pub fn relative_moniker_for_selectors(&self) -> Moniker {
match self {
Self::Legacy { moniker, .. } => moniker.clone(),
Self::Moniker(segments) => {
if segments.is_empty() {
} else {
Moniker(segments.iter().map(|s| s.to_string()).collect())
pub fn unique_key(&self) -> UniqueKey {
match self {
Self::Legacy { instance_id, .. } => {
let mut key = self.relative_moniker_for_selectors().0;
Self::Moniker(segments) => {
let mut key = vec![];
for segment in segments {
pub fn parse_from_moniker(moniker: &str) -> Result<Self, MonikerError> {
if moniker == "<component_manager>" {
return Ok(ComponentIdentifier::Moniker(vec![MonikerSegment {
collection: None,
name: "<component_manager>".to_string(),
instance_id: "0".to_string(),
if moniker == "." {
return Ok(ComponentIdentifier::Moniker(vec![]));
let without_root = moniker
.ok_or_else(|| MonikerError::InvalidMonikerPrefix(moniker.to_string()))?;
let mut segments = vec![];
for raw_segment in without_root.split("/") {
let mut parts = raw_segment.split(":");
let segment = match (,, {
// we have a collection, a component name, and an instance id
(Some(c), Some(n), Some(i)) => MonikerSegment {
collection: Some(c.to_string()),
name: n.to_string(),
instance_id: i.to_string(),
// we have a name and an instance id, no collection
(Some(n), Some(i), None) => MonikerSegment {
collection: None,
name: n.to_string(),
instance_id: i.to_string(),
_ => return Err(MonikerError::InvalidSegment(raw_segment.to_string())),
impl std::fmt::Display for ComponentIdentifier {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Legacy { moniker, instance_id } => {
for (i, segment) in moniker.iter().enumerate() {
if i > 0 {
write!(f, "/")?;
write!(f, "{}", segment)?;
write!(f, ":{}", instance_id)
Self::Moniker(segments) => {
if segments.is_empty() {
write!(f, ".")
} else {
for (i, segment) in segments.iter().enumerate() {
if i > 0 {
write!(f, "/")?;
write!(f, "{}", segment)?;
/// A single segment in the moniker of a component.
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub struct MonikerSegment {
/// The name of the component's collection, if any.
pub collection: Option<String>,
/// The name of the component.
pub name: String,
/// The instance of the component.
pub instance_id: String,
impl std::fmt::Display for MonikerSegment {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(collection) = &self.collection {
write!(f, "{}:", collection)?;
write!(f, "{}",
#[derive(Debug, Clone, ValidFidlTable, PartialEq)]
pub struct ValidatedSourceIdentity {
pub realm_path: Vec<String>,
pub component_url: String,
pub component_name: String,
pub instance_id: String,
#[derive(Debug, ValidFidlTable)]
pub struct ValidatedEventHeader {
pub event_type: fsys::EventType,
pub component_url: String,
pub moniker: String,
pub timestamp: i64,
#[derive(Debug, ValidFidlTable)]
pub struct ValidatedEvent {
/// Information about the component for which this event was generated.
pub header: ValidatedEventHeader,
/// Optional payload for some event types
pub event_result: Option<fsys::EventResult>,
/// The ID of a component as used in components V1.
/// Represents the shared data associated with
/// all component events.
#[derive(Debug, PartialEq, Clone)]
pub struct EventMetadata {
pub identity: ComponentIdentity,
pub timestamp: zx::Time,
impl EventMetadata {
pub fn new(identity: ComponentIdentity) -> Self {
Self { identity, timestamp: zx::Time::get_monotonic() }
impl TryFrom<SourceIdentity> for EventMetadata {
type Error = EventError;
fn try_from(component: SourceIdentity) -> Result<Self, Self::Error> {
Ok(EventMetadata {
identity: ComponentIdentity::try_from(component)?,
timestamp: zx::Time::get_monotonic(),
/// Represents the diagnostics data associated
/// with a component being observed starting.
#[derive(Debug, PartialEq)]
pub struct StartEvent {
pub metadata: EventMetadata,
/// Represents the diagnostics data associated
/// with a component being observed running.
#[derive(Debug, PartialEq)]
pub struct RunningEvent {
pub metadata: EventMetadata,
pub component_start_time: zx::Time,
/// Represents the diagnostics data associated
/// with a component being observed stopping.
#[derive(Debug, PartialEq)]
pub struct StopEvent {
pub metadata: EventMetadata,
/// Represents the diagnostics data associated
/// with a new Diagnostics Directory being
/// made available.
pub struct DiagnosticsReadyEvent {
pub metadata: EventMetadata,
/// Proxy to the inspect data host.
pub directory: Option<DirectoryProxy>,
/// A new incoming connection to `LogSink`.
pub struct LogSinkRequestedEvent {
pub metadata: EventMetadata,
pub requests: LogSinkRequestStream,
impl LogSinkRequestedEvent {
fn new(event: ValidatedEvent, metadata: EventMetadata) -> Result<Self, EventError> {
let payload = event.event_result.ok_or(EventError::MissingField("event_result")).and_then(
|result| match result {
fsys::EventResult::Payload(fsys::EventPayload::CapabilityRequested(payload)) => {
fsys::EventResult::Error(fsys::EventError {
description: Some(description),
}) => Err(EventError::ReceivedError { description }),
result => Err(EventError::UnrecognizedResult { result }),
let capability_name ="name"))?;
if &capability_name != LogSinkMarker::NAME {
Err(EventError::IncorrectName {
received: capability_name,
expected: LogSinkMarker::NAME,
let capability = payload.capability.ok_or(EventError::MissingField("capability"))?;
let requests = ServerEnd::<LogSinkMarker>::new(capability)
.map_err(|source| EventError::InvalidServerEnd { source })?;
Ok(Self { metadata, requests })
impl std::fmt::Debug for LogSinkRequestedEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LogSinkRequestedEvent").field("metadata", &self.metadata).finish()
pub type ComponentEventChannel = mpsc::Sender<ComponentEvent>;
/// A stream of |ComponentEvent|s
pub type ComponentEventStream = BoxStream<'static, ComponentEvent>;
/// An event that occurred to a component.
pub enum ComponentEvent {
/// We observed the component starting.
/// We observed the component already started.
/// We observed the component stopping.
/// We observed the creation of a new `out` directory.
/// We received a new connection to `LogSink`.
/// Data associated with a component.
/// This data is stored by data collectors and passed by the collectors to processors.
pub enum InspectData {
/// Empty data, for testing.
/// A VMO containing data associated with the event.
/// A file containing data associated with the event.
/// Because we can't synchronously retrieve file contents like we can for VMOs, this holds
/// the full file contents. Future changes should make streaming ingestion feasible.
/// A connection to a Tree service and a handle to the root hierarchy VMO. This VMO is what a
/// root.inspect file would contain and the result of calling Tree#GetContent. We hold to it
/// so that we can use it when the component is removed, at which point calling the Tree
/// service is not an option.
Tree(TreeProxy, Option<zx::Vmo>),
/// A connection to the deprecated Inspect service.
impl TryFrom<Event> for ComponentEvent {
type Error = EventError;
fn try_from(event: Event) -> Result<ComponentEvent, Self::Error> {
let event: ValidatedEvent = ValidatedEvent::try_from(event)?;
let metadata = EventMetadata {
identity: ComponentIdentity::from_identifier_and_url(
timestamp: zx::Time::from_nanos(event.header.timestamp),
match event.header.event_type {
fsys::EventType::Started => {
let start_event = StartEvent { metadata };
fsys::EventType::Stopped => {
let stop_event = StopEvent { metadata };
fsys::EventType::CapabilityReady | fsys::EventType::Running => {
construct_payload_holding_component_event(event, metadata)
fsys::EventType::CapabilityRequested => {
Ok(ComponentEvent::LogSinkRequested(LogSinkRequestedEvent::new(event, metadata)?))
_ => Err(EventError::InvalidEventType { ty: event.header.event_type }),
fn construct_payload_holding_component_event(
event: ValidatedEvent,
shared_data: EventMetadata,
) -> Result<ComponentEvent, EventError> {
match event.event_result {
Some(result) => {
match result {
)) => {
let name ="name"))?;
if name == "diagnostics" {
match capability_ready.node {
Some(node) => {
let diagnostics_ready_event = DiagnosticsReadyEvent {
metadata: shared_data,
directory: io_util::node_to_directory(node.into_proxy()?).ok(),
None => Err(EventError::MissingDiagnosticsDir),
} else {
Err(EventError::IncorrectName { received: name, expected: "diagnostics" })
fsys::EventResult::Payload(fsys::EventPayload::Running(payload)) => {
match payload.started_timestamp {
Some(timestamp) => {
let existing_data = RunningEvent {
metadata: shared_data,
component_start_time: zx::Time::from_nanos(timestamp),
None => Err(EventError::MissingStartTimestamp),
fsys::EventResult::Error(fsys::EventError {
description: Some(description),
}) => {
// TODO( result.error carries information about errors that happened
// in component_manager. We should dump those in diagnostics.
Err(EventError::ReceivedError { description })
result => Err(EventError::UnrecognizedResult { result }),
None => Err(EventError::MissingPayload { event }),
impl PartialEq for ComponentEvent {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(ComponentEvent::Start(a), ComponentEvent::Start(b)) => {
return a == b;
(ComponentEvent::Stop(a), ComponentEvent::Stop(b)) => {
return a == b;
(ComponentEvent::DiagnosticsReady(a), ComponentEvent::DiagnosticsReady(b)) => {
return a == b;
(ComponentEvent::Running(a), ComponentEvent::Running(b)) => {
return a == b;
// we can't check two LogSinkRequested events for equality because they have channels
_ => false,
// Requires a custom partial_eq due to the presence of a directory proxy.
// Two events with the same metadata and different directory proxies
// will be considered the same.
impl PartialEq for DiagnosticsReadyEvent {
fn eq(&self, other: &Self) -> bool {
self.metadata == other.metadata
mod tests {
use super::*;
use crate::logs::testing::create_log_sink_requested_event;
use std::convert::TryInto;
fn convert_v2_moniker_for_diagnostics() {
let identifier = ComponentIdentifier::parse_from_moniker("./a:0").unwrap();
assert_eq!(identifier.relative_moniker_for_selectors(), vec!["a"].into());
assert_eq!(identifier.unique_key(), vec!["a", "0"].into());
let identifier = ComponentIdentifier::parse_from_moniker("./a:0/b:1").unwrap();
assert_eq!(identifier.relative_moniker_for_selectors(), vec!["a", "b"].into());
assert_eq!(identifier.unique_key(), vec!["a", "0", "b", "1"].into());
let identifier = ComponentIdentifier::parse_from_moniker("./a:0/coll:comp:1/b:0").unwrap();
assert_eq!(identifier.relative_moniker_for_selectors(), vec!["a", "coll:comp", "b"].into());
assert_eq!(identifier.unique_key(), vec!["a", "0", "coll:comp", "1", "b", "0"].into());
let identifier = ComponentIdentifier::parse_from_moniker(".").unwrap();
#[fuchsia_async::run_singlethreaded(test)] // we need an executor for the fidl types
async fn validate_logsink_requested_event() {
let target_moniker = "./foo:0";
let target_url = "".to_string();
let (_log_sink_proxy, log_sink_server_end) =
let raw_event = create_log_sink_requested_event(
let event = match raw_event.try_into().unwrap() {
ComponentEvent::LogSinkRequested(e) => e,
other => panic!("incorrect event type received: {:?}", other),