| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. |
| |
| use super::message::Message; |
| use fidl::endpoints::ClientEnd; |
| use fidl_fuchsia_logger::{ |
| LogFilterOptions, LogListenerSafeMarker, LogListenerSafeProxy, LogMessage, |
| }; |
| use fuchsia_async::Task; |
| use futures::prelude::*; |
| use std::{sync::Arc, task::Poll}; |
| use thiserror::Error; |
| use tracing::{debug, error, trace}; |
| |
| mod asbestos; |
| mod filter; |
| |
| pub use asbestos::pretend_scary_listener_is_safe; |
| use filter::MessageFilter; |
| |
| /// An individual log listener. Wraps the FIDL type `LogListenerProxy` in filtering options provided |
| /// when connecting. |
| pub struct Listener { |
| listener: LogListenerSafeProxy, |
| filter: MessageFilter, |
| status: Status, |
| } |
| |
/// Health of a listener connection as observed by the last call made to it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Status {
    /// No closed-channel error has been observed; keep sending.
    Fine,
    /// A call failed because the channel was closed; stop sending.
    Stale,
}
| |
| impl Listener { |
| /// Create a new `Listener`. Fails if `client` can't be converted into a `LogListenerProxy` or |
| /// if `LogFilterOptions` are invalid. |
| pub fn new( |
| log_listener: ClientEnd<LogListenerSafeMarker>, |
| options: Option<Box<LogFilterOptions>>, |
| ) -> Result<Self, ListenerError> { |
| debug!("New listener with options {:?}", &options); |
| Ok(Self { |
| status: Status::Fine, |
| listener: log_listener |
| .into_proxy() |
| .map_err(|source| ListenerError::CreatingListenerProxy { source })?, |
| filter: MessageFilter::new(options)?, |
| }) |
| } |
| |
| pub fn spawn( |
| self, |
| logs: impl Stream<Item = Arc<Message>> + Send + Unpin + 'static, |
| call_done: bool, |
| ) -> Task<()> { |
| Task::spawn(async move { self.run(logs, call_done).await }) |
| } |
| |
| /// Send messages to the listener. First eagerly collects any backlog and sends it out in |
| /// batches before waiting for wakeups. |
| async fn run(mut self, mut logs: impl Stream<Item = Arc<Message>> + Unpin, call_done: bool) { |
| debug!("Backfilling from cursor until pending."); |
| let mut backlog = vec![]; |
| futures::future::poll_fn(|cx| { |
| loop { |
| match logs.poll_next_unpin(cx) { |
| Poll::Ready(Some(next)) => backlog.push(next), |
| _ => break, |
| } |
| } |
| |
| Poll::Ready(()) |
| }) |
| .await; |
| |
| self.backfill(backlog).await; |
| debug!("Done backfilling."); |
| |
| pin_utils::pin_mut!(logs); |
| while let Some(message) = logs.next().await { |
| self.send_log(&message).await; |
| } |
| |
| if call_done { |
| self.listener.done().ok(); |
| } |
| debug!("Listener exiting."); |
| } |
| |
| /// Returns whether this listener should continue receiving messages. |
| fn is_healthy(&self) -> bool { |
| self.status == Status::Fine |
| } |
| |
| /// Send all messages currently in the provided buffer to this listener. Attempts to batch up |
| /// to the message size limit. Returns early if the listener appears to be unhealthy. |
| async fn backfill<'a>(&mut self, mut messages: Vec<Arc<Message>>) { |
| messages.sort_by_key(|m| m.metadata.timestamp); |
| let mut batch_size = 0; |
| let mut filtered_batch = vec![]; |
| for msg in messages { |
| let size = msg.metadata.size_bytes; |
| if self.filter.should_send(&msg) { |
| if batch_size + size > fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize { |
| self.send_filtered_logs(&mut filtered_batch).await; |
| if !self.is_healthy() { |
| return; |
| } |
| filtered_batch.clear(); |
| batch_size = 0; |
| } |
| batch_size += size; |
| trace!("Batching {:?}.", msg.id); |
| filtered_batch.push(msg.for_listener()); |
| } |
| } |
| |
| if !filtered_batch.is_empty() { |
| self.send_filtered_logs(&mut filtered_batch).await; |
| } |
| } |
| |
| /// Send a batch of pre-filtered log messages to this listener. |
| async fn send_filtered_logs(&mut self, log_messages: &mut Vec<LogMessage>) { |
| trace!("Flushing batch."); |
| self.check_result({ |
| let mut log_messages = log_messages.iter_mut(); |
| let fut = self.listener.log_many(&mut log_messages); |
| fut.await |
| }); |
| } |
| |
| /// Send a single log message if it should be sent according to this listener's filter settings. |
| async fn send_log(&mut self, log_message: &Message) { |
| if self.filter.should_send(log_message) { |
| trace!("Sending {:?}.", log_message.id); |
| let mut to_send = log_message.for_listener(); |
| self.check_result(self.listener.log(&mut to_send).await); |
| } |
| } |
| |
| /// Consume the result of sending logs to this listener, potentially marking it stale. |
| fn check_result(&mut self, result: Result<(), fidl::Error>) { |
| if let Err(e) = result { |
| if e.is_closed() { |
| self.status = Status::Stale; |
| } else { |
| error!(?e, "Error calling listener"); |
| } |
| } |
| } |
| } |
| |
| #[derive(Debug, Error)] |
| pub enum ListenerError { |
| #[error("{count} tags provided, max {}", fidl_fuchsia_logger::MAX_TAGS)] |
| TooManyTags { count: usize }, |
| |
| #[error("tag at index {index} is too long, max {}", fidl_fuchsia_logger::MAX_TAG_LEN_BYTES)] |
| TagTooLong { index: usize }, |
| |
| #[error("couldn't create LogListenerProxy")] |
| CreatingListenerProxy { source: fidl::Error }, |
| |
| #[error("couldn't decode value: {source}")] |
| Decode { |
| #[from] |
| source: super::error::StreamError, |
| }, |
| |
| #[error("error while forwarding unsafe log requests: {source}")] |
| AsbestosIo { source: fidl::Error }, |
| } |