| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::{ |
| capability::CapabilitySource, |
| model::{ |
| events::{ |
| event::Event, registry::EventSubscription, source::EventSource, stream::EventStream, |
| }, |
| hooks::{ |
| EventError, EventErrorPayload, EventPayload, EventResult, EventType, HasEventType, |
| }, |
| }, |
| }, |
| cm_moniker::InstancedExtendedMoniker, |
| cm_rust::{CapabilityName, EventMode}, |
| fidl::endpoints::{ClientEnd, Proxy}, |
| fidl_fuchsia_component as fcomponent, fidl_fuchsia_io as fio, fidl_fuchsia_sys2 as fsys, |
| fuchsia_zircon as zx, |
| futures::{lock::Mutex, TryStreamExt}, |
| log::{error, info, warn}, |
| moniker::{AbsoluteMoniker, AbsoluteMonikerBase, RelativeMoniker, RelativeMonikerBase}, |
| std::sync::Arc, |
| }; |
| |
| pub async fn serve_event_source_sync( |
| event_source: EventSource, |
| stream: fsys::EventSourceRequestStream, |
| ) { |
| let result = stream |
| .try_for_each_concurrent(None, move |request| { |
| let mut event_source = event_source.clone(); |
| async move { |
| match request { |
| fsys::EventSourceRequest::Subscribe { events, stream, responder } => { |
| // Subscribe to events. |
| let requests = events |
| .into_iter() |
| .filter(|request| request.event_name.is_some()) |
| .map(|request| EventSubscription { |
| event_name: request |
| .event_name |
| .map(|name| CapabilityName::from(name)) |
| .unwrap(), |
| mode: EventMode::Async, |
| }) |
| .collect(); |
| |
| match event_source.subscribe(requests).await { |
| Ok(event_stream) => { |
| // Unblock the component |
| responder.send(&mut Ok(()))?; |
| |
| // Serve the event_stream over FIDL asynchronously |
| serve_event_stream(event_stream, stream).await; |
| } |
| Err(e) => { |
| info!("Error subscribing to events: {:?}", e); |
| responder.send(&mut Err(fcomponent::Error::ResourceUnavailable))?; |
| } |
| }; |
| } |
| fsys::EventSourceRequest::TakeStaticEventStream { path, responder, .. } => { |
| let mut result = event_source |
| .take_static_event_stream(path) |
| .await |
| .ok_or(fcomponent::Error::ResourceUnavailable); |
| responder.send(&mut result)?; |
| } |
| } |
| Ok(()) |
| } |
| }) |
| .await; |
| if let Err(e) = result { |
| error!("Error serving EventSource: {}", e); |
| } |
| } |
| |
| /// Serves EventStream FIDL requests received over the provided stream. |
| pub async fn serve_event_stream( |
| mut event_stream: EventStream, |
| client_end: ClientEnd<fsys::EventStreamMarker>, |
| ) { |
| let listener = client_end.into_proxy().expect("cannot create proxy from client_end"); |
| |
| while let Some(event) = event_stream.next().await { |
| // Create the basic Event FIDL object. |
| let event_fidl_object = match create_event_fidl_object(event).await { |
| Err(e) => { |
| warn!("Failed to create event object: {:?}", e); |
| continue; |
| } |
| Ok(res) => res, |
| }; |
| if let Err(e) = listener.on_event(event_fidl_object) { |
| // It's not an error for the client to drop the listener. |
| if !e.is_closed() { |
| warn!("Unexpected error while serving EventStream: {:?}", e); |
| } |
| return; |
| } |
| } |
| } |
| |
| async fn maybe_create_event_result( |
| event_result: &EventResult, |
| ) -> Result<Option<fsys::EventResult>, fidl::Error> { |
| match event_result { |
| Ok(EventPayload::DirectoryReady { name, node, .. }) => { |
| Ok(Some(create_directory_ready_payload(name.to_string(), node)?)) |
| } |
| Ok(EventPayload::CapabilityRequested { name, capability, .. }) => Ok(Some( |
| create_capability_requested_payload(name.to_string(), capability.clone()).await, |
| )), |
| Ok(EventPayload::CapabilityRouted { source, .. }) => { |
| Ok(Some(create_capability_routed_payload(source))) |
| } |
| Ok(EventPayload::Running { started_timestamp }) => Ok(Some(fsys::EventResult::Payload( |
| fsys::EventPayload::Running(fsys::RunningPayload { |
| started_timestamp: Some(started_timestamp.into_nanos()), |
| ..fsys::RunningPayload::EMPTY |
| }), |
| ))), |
| Ok(EventPayload::Stopped { status }) => Ok(Some(fsys::EventResult::Payload( |
| fsys::EventPayload::Stopped(fsys::StoppedPayload { |
| status: Some(status.into_raw()), |
| ..fsys::StoppedPayload::EMPTY |
| }), |
| ))), |
| Ok(payload) => Ok(maybe_create_empty_payload(payload.event_type())), |
| Err(EventError { |
| source, |
| event_error_payload: EventErrorPayload::DirectoryReady { name }, |
| }) => Ok(Some(fsys::EventResult::Error(fsys::EventError { |
| error_payload: Some(fsys::EventErrorPayload::DirectoryReady( |
| fsys::DirectoryReadyError { |
| name: Some(name.to_string()), |
| ..fsys::DirectoryReadyError::EMPTY |
| }, |
| )), |
| description: Some(format!("{}", source)), |
| ..fsys::EventError::EMPTY |
| }))), |
| Err(EventError { |
| source, |
| event_error_payload: EventErrorPayload::CapabilityRequested { name, .. }, |
| }) => Ok(Some(fsys::EventResult::Error(fsys::EventError { |
| error_payload: Some(fsys::EventErrorPayload::CapabilityRequested( |
| fsys::CapabilityRequestedError { |
| name: Some(name.to_string()), |
| ..fsys::CapabilityRequestedError::EMPTY |
| }, |
| )), |
| description: Some(format!("{}", source)), |
| ..fsys::EventError::EMPTY |
| }))), |
| Err(EventError { |
| source, |
| event_error_payload: EventErrorPayload::Running { started_timestamp }, |
| }) => Ok(Some(fsys::EventResult::Error(fsys::EventError { |
| error_payload: Some(fsys::EventErrorPayload::Running(fsys::RunningError { |
| started_timestamp: Some(started_timestamp.into_nanos()), |
| ..fsys::RunningError::EMPTY |
| })), |
| description: Some(format!("{}", source)), |
| ..fsys::EventError::EMPTY |
| }))), |
| Err(error) => Ok(maybe_create_empty_error_payload(error)), |
| } |
| } |
| |
| fn create_directory_ready_payload( |
| name: String, |
| node: &fio::NodeProxy, |
| ) -> Result<fsys::EventResult, fidl::Error> { |
| let node = { |
| let (node_clone, server_end) = fidl::endpoints::create_proxy()?; |
| node.clone(fio::CLONE_FLAG_SAME_RIGHTS, server_end)?; |
| let node_client_end = node_clone |
| .into_channel() |
| .expect("could not convert directory to channel") |
| .into_zx_channel() |
| .into(); |
| Some(node_client_end) |
| }; |
| |
| let payload = fsys::DirectoryReadyPayload { |
| name: Some(name), |
| node, |
| ..fsys::DirectoryReadyPayload::EMPTY |
| }; |
| Ok(fsys::EventResult::Payload(fsys::EventPayload::DirectoryReady(payload))) |
| } |
| |
| async fn create_capability_requested_payload( |
| name: String, |
| capability: Arc<Mutex<Option<zx::Channel>>>, |
| ) -> fsys::EventResult { |
| let capability = capability.lock().await.take(); |
| match capability { |
| Some(capability) => { |
| let payload = fsys::CapabilityRequestedPayload { |
| name: Some(name), |
| capability: Some(capability), |
| ..fsys::CapabilityRequestedPayload::EMPTY |
| }; |
| fsys::EventResult::Payload(fsys::EventPayload::CapabilityRequested(payload)) |
| } |
| None => { |
| // This can happen if a hook takes the capability prior to the events system. |
| let payload = |
| fsys::EventErrorPayload::CapabilityRequested(fsys::CapabilityRequestedError { |
| name: Some(name), |
| ..fsys::CapabilityRequestedError::EMPTY |
| }); |
| fsys::EventResult::Error(fsys::EventError { |
| error_payload: Some(payload), |
| description: Some("Capability unavailable".to_string()), |
| ..fsys::EventError::EMPTY |
| }) |
| } |
| } |
| } |
| |
| fn create_capability_routed_payload(source: &CapabilitySource) -> fsys::EventResult { |
| let name = source.source_name().map(|n| n.to_string()); |
| let payload = fsys::CapabilityRoutedPayload { name, ..fsys::CapabilityRoutedPayload::EMPTY }; |
| fsys::EventResult::Payload(fsys::EventPayload::CapabilityRouted(payload)) |
| } |
| |
| fn maybe_create_empty_payload(event_type: EventType) -> Option<fsys::EventResult> { |
| let result = match event_type { |
| EventType::Purged => { |
| fsys::EventResult::Payload(fsys::EventPayload::Purged(fsys::PurgedPayload::EMPTY)) |
| } |
| EventType::Discovered => fsys::EventResult::Payload(fsys::EventPayload::Discovered( |
| fsys::DiscoveredPayload::EMPTY, |
| )), |
| EventType::Destroyed => { |
| fsys::EventResult::Payload(fsys::EventPayload::Destroyed(fsys::DestroyedPayload::EMPTY)) |
| } |
| EventType::Resolved => { |
| fsys::EventResult::Payload(fsys::EventPayload::Resolved(fsys::ResolvedPayload::EMPTY)) |
| } |
| EventType::Started => { |
| fsys::EventResult::Payload(fsys::EventPayload::Started(fsys::StartedPayload::EMPTY)) |
| } |
| _ => fsys::EventResult::unknown(999, Default::default()), |
| }; |
| Some(result) |
| } |
| |
| fn maybe_create_empty_error_payload(error: &EventError) -> Option<fsys::EventResult> { |
| let error_payload = match error.event_type() { |
| EventType::Purged => fsys::EventErrorPayload::Purged(fsys::PurgedError::EMPTY), |
| EventType::Discovered => fsys::EventErrorPayload::Discovered(fsys::DiscoveredError::EMPTY), |
| EventType::Destroyed => fsys::EventErrorPayload::Destroyed(fsys::DestroyedError::EMPTY), |
| EventType::Resolved => fsys::EventErrorPayload::Resolved(fsys::ResolvedError::EMPTY), |
| EventType::Started => fsys::EventErrorPayload::Started(fsys::StartedError::EMPTY), |
| EventType::Stopped => fsys::EventErrorPayload::Stopped(fsys::StoppedError::EMPTY), |
| _ => fsys::EventErrorPayload::unknown(999, Default::default()), |
| }; |
| Some(fsys::EventResult::Error(fsys::EventError { |
| error_payload: Some(error_payload), |
| description: Some(format!("{}", error.source)), |
| ..fsys::EventError::EMPTY |
| })) |
| } |
| |
| /// Creates the basic FIDL Event object |
| async fn create_event_fidl_object(event: Event) -> Result<fsys::Event, fidl::Error> { |
| let moniker_string = match (&event.event.target_moniker, &event.scope_moniker) { |
| (moniker @ InstancedExtendedMoniker::ComponentManager, _) => moniker.to_string(), |
| ( |
| InstancedExtendedMoniker::ComponentInstance(target), |
| InstancedExtendedMoniker::ComponentManager, |
| ) => { |
| RelativeMoniker::from_absolute(&AbsoluteMoniker::root(), &target.to_absolute_moniker()) |
| .to_string() |
| } |
| ( |
| InstancedExtendedMoniker::ComponentInstance(target), |
| InstancedExtendedMoniker::ComponentInstance(scope), |
| ) => RelativeMoniker::from_absolute::<AbsoluteMoniker>( |
| &scope.to_absolute_moniker(), |
| &target.to_absolute_moniker(), |
| ) |
| .to_string(), |
| }; |
| let header = Some(fsys::EventHeader { |
| event_type: Some(event.event.event_type().into()), |
| moniker: Some(moniker_string), |
| component_url: Some(event.event.component_url.clone()), |
| timestamp: Some(event.event.timestamp.into_nanos()), |
| ..fsys::EventHeader::EMPTY |
| }); |
| let event_result = maybe_create_event_result(&event.event.result).await?; |
| Ok(fsys::Event { header, event_result, ..fsys::Event::EMPTY }) |
| } |