| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::{ |
| constants::FORMATTED_CONTENT_CHUNK_SIZE_TARGET, |
| diagnostics::BatchIteratorConnectionStats, |
| error::AccessorError, |
| formatter::{new_batcher, FormattedStream, JsonPacketSerializer, SerializedVmo}, |
| inspect::{repository::InspectRepository, ReaderServer}, |
| logs::repository::LogsRepository, |
| pipeline::Pipeline, |
| ImmutableString, |
| }, |
| diagnostics_data::{Data, DiagnosticsData, Metadata}, |
| fidl::endpoints::{ControlHandle, RequestStream}, |
| fidl_fuchsia_diagnostics::{ |
| ArchiveAccessorRequest, ArchiveAccessorRequestStream, BatchIteratorControlHandle, |
| BatchIteratorRequest, BatchIteratorRequestStream, ClientSelectorConfiguration, DataType, |
| Format, FormattedContent, PerformanceConfiguration, Selector, SelectorArgument, StreamMode, |
| StreamParameters, StringSelector, TreeSelector, TreeSelectorUnknown, |
| }, |
| fidl_fuchsia_diagnostics_host as fhost, |
| fidl_fuchsia_mem::Buffer, |
| flyweights::FlyStr, |
| fuchsia_async::{self as fasync, Task}, |
| fuchsia_inspect::NumericProperty, |
| fuchsia_sync::Mutex, |
| fuchsia_trace as ftrace, fuchsia_zircon as zx, |
| futures::{ |
| channel::mpsc, |
| future::{select, Either}, |
| pin_mut, |
| prelude::*, |
| stream::Peekable, |
| StreamExt, |
| }, |
| selectors::FastError, |
| serde::Serialize, |
| std::{collections::HashMap, pin::Pin, sync::Arc}, |
| thiserror::Error, |
| tracing::warn, |
| }; |
| |
| #[derive(Debug, Copy, Clone)] |
| pub struct BatchRetrievalTimeout(i64); |
| |
| impl BatchRetrievalTimeout { |
| pub fn from_seconds(s: i64) -> Self { |
| Self(s) |
| } |
| |
| #[cfg(test)] |
| pub fn max() -> Self { |
| Self::from_seconds(-1) |
| } |
| |
| pub fn seconds(&self) -> i64 { |
| if self.0 > 0 { |
| self.0 |
| } else { |
| i64::MAX |
| } |
| } |
| } |
| |
| /// ArchiveAccessorServer represents an incoming connection from a client to an Archivist |
| /// instance, through which the client may make Reader requests to the various data |
| /// sources the Archivist offers. |
| pub struct ArchiveAccessorServer { |
| inspect_repository: Arc<InspectRepository>, |
| logs_repository: Arc<LogsRepository>, |
| maximum_concurrent_snapshots_per_reader: u64, |
| server_task_sender: Arc<Mutex<mpsc::UnboundedSender<fasync::Task<()>>>>, |
| server_task_drainer: Mutex<Option<fasync::Task<()>>>, |
| default_batch_timeout_seconds: BatchRetrievalTimeout, |
| } |
| |
| fn validate_and_parse_selectors( |
| selector_args: Vec<SelectorArgument>, |
| ) -> Result<Vec<Selector>, AccessorError> { |
| let mut selectors = vec![]; |
| let mut errors = vec![]; |
| |
| if selector_args.is_empty() { |
| return Err(AccessorError::EmptySelectors); |
| } |
| |
| for selector_arg in selector_args { |
| match selectors::take_from_argument::<FastError>(selector_arg) { |
| Ok(s) => selectors.push(s), |
| Err(e) => errors.push(e), |
| } |
| } |
| |
| if !errors.is_empty() { |
| warn!(?errors, "Found errors in selector arguments"); |
| } |
| |
| Ok(selectors) |
| } |
| |
| fn validate_and_parse_log_selectors( |
| selector_args: Vec<SelectorArgument>, |
| ) -> Result<Vec<Selector>, AccessorError> { |
| // Only accept selectors of the type: `component:root` for logs for now. |
| let selectors = validate_and_parse_selectors(selector_args)?; |
| for selector in &selectors { |
| // Unwrap safe: Previous validation discards any selector without a node. |
| let tree_selector = selector.tree_selector.as_ref().unwrap(); |
| match tree_selector { |
| TreeSelector::PropertySelector(_) => { |
| return Err(AccessorError::InvalidLogSelector); |
| } |
| TreeSelector::SubtreeSelector(subtree_selector) => { |
| if subtree_selector.node_path.len() != 1 { |
| return Err(AccessorError::InvalidLogSelector); |
| } |
| match &subtree_selector.node_path[0] { |
| StringSelector::ExactMatch(val) if val == "root" => {} |
| StringSelector::StringPattern(val) if val == "root" => {} |
| _ => { |
| return Err(AccessorError::InvalidLogSelector); |
| } |
| } |
| } |
| TreeSelectorUnknown!() => {} |
| } |
| } |
| Ok(selectors) |
| } |
| |
| impl ArchiveAccessorServer { |
| /// Create a new accessor for interacting with the archivist's data. The pipeline |
| /// parameter determines which static configurations scope/restrict the visibility of |
| /// data accessed by readers spawned by this accessor. |
| pub fn new( |
| inspect_repository: Arc<InspectRepository>, |
| logs_repository: Arc<LogsRepository>, |
| maximum_concurrent_snapshots_per_reader: u64, |
| default_batch_timeout_seconds: BatchRetrievalTimeout, |
| ) -> Self { |
| let (snd, rcv) = mpsc::unbounded(); |
| ArchiveAccessorServer { |
| inspect_repository, |
| logs_repository, |
| maximum_concurrent_snapshots_per_reader, |
| server_task_sender: Arc::new(Mutex::new(snd)), |
| server_task_drainer: Mutex::new(Some(fasync::Task::spawn(async move { |
| rcv.for_each_concurrent(None, |rx| rx).await |
| }))), |
| default_batch_timeout_seconds, |
| } |
| } |
| |
| pub async fn wait_for_servers_to_complete(&self) { |
| let task = self |
| .server_task_drainer |
| .lock() |
| .take() |
| .expect("The accessor server task is only awaited for once"); |
| task.await; |
| } |
| |
| pub fn stop(&self) { |
| self.server_task_sender.lock().disconnect(); |
| } |
| |
| async fn spawn<R: ArchiveAccessorWriter + Send>( |
| pipeline: Arc<Pipeline>, |
| inspect_repo: Arc<InspectRepository>, |
| log_repo: Arc<LogsRepository>, |
| requests: R, |
| params: StreamParameters, |
| maximum_concurrent_snapshots_per_reader: u64, |
| default_batch_timeout_seconds: BatchRetrievalTimeout, |
| ) -> Result<(), AccessorError> { |
| let format = params.format.ok_or(AccessorError::MissingFormat)?; |
| if !matches!(format, Format::Json | Format::Cbor) { |
| return Err(AccessorError::UnsupportedFormat); |
| } |
| let mode = params.stream_mode.ok_or(AccessorError::MissingMode)?; |
| |
| let performance_config: PerformanceConfig = PerformanceConfig::new( |
| ¶ms, |
| maximum_concurrent_snapshots_per_reader, |
| default_batch_timeout_seconds, |
| )?; |
| |
| let trace_id = ftrace::Id::random(); |
| match params.data_type.ok_or(AccessorError::MissingDataType)? { |
| DataType::Inspect => { |
| let _trace_guard = ftrace::async_enter!( |
| trace_id, |
| c"app", |
| c"ArchiveAccessorServer::spawn", |
| "data_type" => "Inspect", |
| "trace_id" => u64::from(trace_id) |
| ); |
| if !matches!(mode, StreamMode::Snapshot) { |
| return Err(AccessorError::UnsupportedMode); |
| } |
| let stats = Arc::new(pipeline.accessor_stats().new_inspect_batch_iterator()); |
| |
| let selectors = |
| params.client_selector_configuration.ok_or(AccessorError::MissingSelectors)?; |
| |
| let selectors = match selectors { |
| ClientSelectorConfiguration::Selectors(selectors) => { |
| Some(validate_and_parse_selectors(selectors)?) |
| } |
| ClientSelectorConfiguration::SelectAll(_) => None, |
| _ => return Err(AccessorError::InvalidSelectors("unrecognized selectors")), |
| }; |
| |
| let static_selectors_matchers = pipeline.read().static_selectors_matchers(); |
| let unpopulated_container_vec = |
| inspect_repo.fetch_inspect_data(&selectors, static_selectors_matchers); |
| |
| let per_component_budget_opt = if unpopulated_container_vec.is_empty() { |
| None |
| } else { |
| performance_config |
| .aggregated_content_limit_bytes |
| .map(|limit| (limit as usize) / unpopulated_container_vec.len()) |
| }; |
| |
| if let Some(max_snapshot_size) = performance_config.aggregated_content_limit_bytes { |
| stats.global_stats().record_max_snapshot_size_config(max_snapshot_size); |
| } |
| BatchIterator::new( |
| ReaderServer::stream( |
| unpopulated_container_vec, |
| performance_config, |
| selectors, |
| Arc::clone(&stats), |
| trace_id, |
| ), |
| requests, |
| mode, |
| stats, |
| per_component_budget_opt, |
| trace_id, |
| format, |
| )? |
| .run() |
| .await |
| } |
| DataType::Logs => { |
| let _trace_guard = ftrace::async_enter!( |
| trace_id, |
| c"app", |
| c"ArchiveAccessorServer::spawn", |
| "data_type" => "Logs", |
| // An async duration cannot have multiple concurrent child async durations |
| // so we include the nonce as metadata to manually determine relationship. |
| "trace_id" => u64::from(trace_id) |
| ); |
| let stats = Arc::new(pipeline.accessor_stats().new_logs_batch_iterator()); |
| let selectors = match params.client_selector_configuration { |
| Some(ClientSelectorConfiguration::Selectors(selectors)) => { |
| Some(validate_and_parse_log_selectors(selectors)?) |
| } |
| Some(ClientSelectorConfiguration::SelectAll(_)) => None, |
| _ => return Err(AccessorError::InvalidSelectors("unrecognized selectors")), |
| }; |
| let logs = log_repo |
| .logs_cursor(mode, selectors, trace_id) |
| .map(move |inner: _| (*inner).clone()); |
| BatchIterator::new_serving_arrays(logs, requests, mode, stats, trace_id)? |
| .run() |
| .await |
| } |
| } |
| } |
| |
| /// Spawn an instance `fidl_fuchsia_diagnostics/Archive` that allows clients to open |
| /// reader session to diagnostics data. |
| pub fn spawn_server<RequestStream>(&self, pipeline: Arc<Pipeline>, mut stream: RequestStream) |
| where |
| RequestStream: ArchiveAccessorTranslator + Send + 'static, |
| <RequestStream as ArchiveAccessorTranslator>::InnerDataRequestChannel: |
| ArchiveAccessorWriter + Send, |
| { |
| // Self isn't guaranteed to live into the exception handling of the async block. We need to clone self |
| // to have a version that can be referenced in the exception handling. |
| let batch_iterator_task_sender = Arc::clone(&self.server_task_sender); |
| let log_repo = Arc::clone(&self.logs_repository); |
| let inspect_repo = Arc::clone(&self.inspect_repository); |
| let maximum_concurrent_snapshots_per_reader = self.maximum_concurrent_snapshots_per_reader; |
| let default_batch_timeout_seconds = self.default_batch_timeout_seconds; |
| self.server_task_sender |
| .lock() |
| .unbounded_send(fasync::Task::spawn(async move { |
| let stats = pipeline.accessor_stats(); |
| stats.global_stats.connections_opened.add(1); |
| while let Some(request) = stream.next().await { |
| let control_handle = request.iterator.get_control_handle(); |
| stats.global_stats.stream_diagnostics_requests.add(1); |
| let pipeline = Arc::clone(&pipeline); |
| |
| // Store the batch iterator task so that we can ensure that the client finishes |
| // draining items through it when a Controller#Stop call happens. For example, |
| // this allows tests to fetch all isolated logs before finishing. |
| let inspect_repo_for_task = Arc::clone(&inspect_repo); |
| let log_repo_for_task = Arc::clone(&log_repo); |
| batch_iterator_task_sender |
| .lock() |
| .unbounded_send(Task::spawn(async move { |
| if let Err(e) = Self::spawn( |
| pipeline, |
| inspect_repo_for_task, |
| log_repo_for_task, |
| request.iterator, |
| request.parameters, |
| maximum_concurrent_snapshots_per_reader, |
| default_batch_timeout_seconds, |
| ) |
| .await |
| { |
| if let Some(control) = control_handle { |
| e.close(control); |
| } |
| } |
| })) |
| .ok(); |
| } |
| pipeline.accessor_stats().global_stats.connections_closed.add(1); |
| })) |
| .ok(); |
| } |
| } |
| |
| pub trait ArchiveAccessorWriter { |
| /// Writes diagnostics data to the remote side. |
| fn write( |
| &mut self, |
| results: Vec<FormattedContent>, |
| ) -> impl Future<Output = Result<(), IteratorError>> + Send; |
| |
| /// Waits for a buffer to be available for writing into. For sockets, this is a no-op. |
| fn wait_for_buffer(&mut self) -> impl Future<Output = anyhow::Result<()>> + Send { |
| futures::future::ready(Ok(())) |
| } |
| |
| /// Takes the control handle from the FIDL stream (or returns None |
| /// if the handle has already been taken, or if this is a socket. |
| fn get_control_handle(&self) -> Option<BatchIteratorControlHandle> { |
| None |
| } |
| |
| /// Sends an on ready event. |
| fn maybe_respond_ready(&mut self) -> impl Future<Output = Result<(), AccessorError>> + Send { |
| futures::future::ready(Ok(())) |
| } |
| |
| /// Waits for ZX_ERR_PEER_CLOSED |
| fn wait_for_close(&mut self) -> impl Future<Output = ()> + Send; |
| } |
| |
| fn get_buffer_from_formatted_content( |
| content: fidl_fuchsia_diagnostics::FormattedContent, |
| ) -> Result<Buffer, AccessorError> { |
| match content { |
| FormattedContent::Json(json) => Ok(json), |
| FormattedContent::Text(text) => Ok(text), |
| _ => Err(AccessorError::UnsupportedFormat), |
| } |
| } |
| |
| impl ArchiveAccessorWriter for fuchsia_async::Socket { |
| async fn write(&mut self, data: Vec<FormattedContent>) -> Result<(), IteratorError> { |
| if data.is_empty() { |
| return Err(IteratorError::PeerClosed); |
| } |
| let mut buf = vec![0]; |
| for value in data { |
| let data = get_buffer_from_formatted_content(value)?; |
| buf.resize(data.size as usize, 0); |
| data.vmo.read(&mut buf, 0)?; |
| let res = self.write_all(&buf).await; |
| if res.is_err() { |
| // connection probably closed. |
| break; |
| } |
| } |
| Ok(()) |
| } |
| |
| async fn wait_for_close(&mut self) { |
| let _ = self.on_closed().await; |
| } |
| } |
| |
| #[derive(Error, Debug)] |
| pub enum IteratorError { |
| #[error("Peer closed")] |
| PeerClosed, |
| #[error(transparent)] |
| Ipc(#[from] fidl::Error), |
| #[error(transparent)] |
| AccessorError(#[from] AccessorError), |
| // This error should be unreachable. We should never |
| // fail to read from a VMO that we created, but the type system |
| // requires us to handle this. |
| #[error("Error reading from VMO: {}", source)] |
| VmoReadError { |
| #[from] |
| source: fuchsia_zircon::Status, |
| }, |
| } |
| |
| impl ArchiveAccessorWriter for Peekable<BatchIteratorRequestStream> { |
| async fn write(&mut self, data: Vec<FormattedContent>) -> Result<(), IteratorError> { |
| loop { |
| match self.next().await { |
| Some(Ok(BatchIteratorRequest::GetNext { responder })) => { |
| responder.send(Ok(data))?; |
| return Ok(()); |
| } |
| Some(Ok(BatchIteratorRequest::WaitForReady { responder })) => { |
| responder.send()?; |
| } |
| Some(Ok(BatchIteratorRequest::_UnknownMethod { .. })) => { |
| return Err(IteratorError::PeerClosed); |
| } |
| Some(Err(err)) => return Err(err.into()), |
| None => { |
| return Err(IteratorError::PeerClosed); |
| } |
| } |
| } |
| } |
| |
| async fn maybe_respond_ready(&mut self) -> Result<(), AccessorError> { |
| let mut this = Pin::new(self); |
| if matches!(this.as_mut().peek().await, Some(Ok(BatchIteratorRequest::WaitForReady { .. }))) |
| { |
| let Some(Ok(BatchIteratorRequest::WaitForReady { responder })) = this.next().await |
| else { |
| unreachable!("We already checked the next request was WaitForReady"); |
| }; |
| responder.send()?; |
| } |
| Ok(()) |
| } |
| |
| async fn wait_for_buffer(&mut self) -> anyhow::Result<()> { |
| let this = Pin::new(self); |
| match this.peek().await { |
| Some(Ok(_)) => Ok(()), |
| _ => Err(IteratorError::PeerClosed.into()), |
| } |
| } |
| |
| fn get_control_handle(&self) -> Option<BatchIteratorControlHandle> { |
| Some(self.get_ref().control_handle()) |
| } |
| |
| async fn wait_for_close(&mut self) { |
| let _ = self.get_ref().control_handle().on_closed().await; |
| } |
| } |
| |
| pub struct ArchiveIteratorRequest<R> { |
| parameters: StreamParameters, |
| iterator: R, |
| } |
| |
| /// Translation trait used to support both remote and |
| /// local ArchiveAccessor implementations. |
| pub trait ArchiveAccessorTranslator { |
| type InnerDataRequestChannel; |
| fn next( |
| &mut self, |
| ) -> impl Future<Output = Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>>> + Send; |
| } |
| |
| impl ArchiveAccessorTranslator for fhost::ArchiveAccessorRequestStream { |
| type InnerDataRequestChannel = fuchsia_async::Socket; |
| |
| async fn next(&mut self) -> Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>> { |
| match StreamExt::next(self).await { |
| Some(Ok(fhost::ArchiveAccessorRequest::StreamDiagnostics { |
| parameters, |
| responder, |
| stream, |
| })) => { |
| // It's fine for the client to send us a socket |
| // and discard the channel without waiting for a response. |
| // Future communication takes place over the socket so |
| // the client may opt to use this as an optimization. |
| let _ = responder.send(); |
| Some(ArchiveIteratorRequest { |
| iterator: fuchsia_async::Socket::from_socket(stream), |
| parameters, |
| }) |
| } |
| _ => None, |
| } |
| } |
| } |
| |
| impl ArchiveAccessorTranslator for ArchiveAccessorRequestStream { |
| type InnerDataRequestChannel = Peekable<BatchIteratorRequestStream>; |
| |
| async fn next(&mut self) -> Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>> { |
| match StreamExt::next(self).await { |
| Some(Ok(ArchiveAccessorRequest::StreamDiagnostics { |
| control_handle: _, |
| result_stream, |
| stream_parameters, |
| })) => Some(ArchiveIteratorRequest { |
| iterator: result_stream.into_stream().unwrap().peekable(), |
| parameters: stream_parameters, |
| }), |
| _ => None, |
| } |
| } |
| } |
| struct SchemaTruncationCounter { |
| truncated_schemas: u64, |
| total_schemas: u64, |
| } |
| |
| impl SchemaTruncationCounter { |
| pub fn new() -> Arc<Mutex<Self>> { |
| Arc::new(Mutex::new(Self { truncated_schemas: 0, total_schemas: 0 })) |
| } |
| } |
| |
| pub(crate) struct BatchIterator<R> { |
| requests: R, |
| stats: Arc<BatchIteratorConnectionStats>, |
| data: FormattedStream, |
| truncation_counter: Option<Arc<Mutex<SchemaTruncationCounter>>>, |
| parent_trace_id: ftrace::Id, |
| } |
| |
| // Checks if a given schema is within a components budget, and if it is, updates the budget, |
| // then returns true. Otherwise, if the schema is not within budget, returns false. |
| fn maybe_update_budget( |
| budget_map: &mut HashMap<ImmutableString, usize>, |
| moniker: &str, |
| bytes: usize, |
| byte_limit: usize, |
| ) -> bool { |
| if let Some(remaining_budget) = budget_map.get_mut(&FlyStr::new(moniker)) { |
| if *remaining_budget + bytes > byte_limit { |
| false |
| } else { |
| *remaining_budget += bytes; |
| true |
| } |
| } else if bytes > byte_limit { |
| budget_map.insert(FlyStr::new(moniker), 0); |
| false |
| } else { |
| budget_map.insert(FlyStr::new(moniker), bytes); |
| true |
| } |
| } |
| |
| impl<R> BatchIterator<R> |
| where |
| R: ArchiveAccessorWriter + Send, |
| { |
| pub fn new<Items, D>( |
| data: Items, |
| requests: R, |
| mode: StreamMode, |
| stats: Arc<BatchIteratorConnectionStats>, |
| per_component_byte_limit_opt: Option<usize>, |
| parent_trace_id: ftrace::Id, |
| format: Format, |
| ) -> Result<Self, AccessorError> |
| where |
| Items: Stream<Item = Data<D>> + Send + 'static, |
| D: DiagnosticsData + 'static, |
| { |
| let result_stats_for_fut = Arc::clone(&stats); |
| |
| let budget_tracker_shared = Arc::new(Mutex::new(HashMap::new())); |
| |
| let truncation_counter = SchemaTruncationCounter::new(); |
| let stream_owned_counter_for_fut = Arc::clone(&truncation_counter); |
| |
| let data = data.then(move |mut d| { |
| let stream_owned_counter = Arc::clone(&stream_owned_counter_for_fut); |
| let result_stats = Arc::clone(&result_stats_for_fut); |
| let budget_tracker = Arc::clone(&budget_tracker_shared); |
| async move { |
| let trace_id = ftrace::Id::random(); |
| let _trace_guard = ftrace::async_enter!( |
| trace_id, |
| c"app", |
| c"BatchIterator::new.serialize", |
| // An async duration cannot have multiple concurrent child async durations |
| // so we include the nonce as metadata to manually determine relationship. |
| "parent_trace_id" => u64::from(parent_trace_id), |
| "trace_id" => u64::from(trace_id), |
| "moniker" => d.moniker.as_ref() |
| ); |
| let mut unlocked_counter = stream_owned_counter.lock(); |
| let mut tracker_guard = budget_tracker.lock(); |
| unlocked_counter.total_schemas += 1; |
| if d.metadata.has_errors() { |
| result_stats.add_result_error(); |
| } |
| |
| match SerializedVmo::serialize(&d, D::DATA_TYPE, format) { |
| Err(e) => { |
| result_stats.add_result_error(); |
| Err(e) |
| } |
| Ok(contents) => { |
| result_stats.add_result(); |
| match per_component_byte_limit_opt { |
| Some(x) => { |
| if maybe_update_budget( |
| &mut tracker_guard, |
| &d.moniker, |
| contents.size as usize, |
| x, |
| ) { |
| Ok(contents) |
| } else { |
| result_stats.add_schema_truncated(); |
| unlocked_counter.truncated_schemas += 1; |
| d.drop_payload(); |
| // TODO(66085): If a payload is truncated, cache the |
| // new schema so that we can reuse if other schemas from |
| // the same component get dropped. |
| SerializedVmo::serialize(&d, D::DATA_TYPE, format) |
| } |
| } |
| None => Ok(contents), |
| } |
| } |
| } |
| } |
| }); |
| |
| Self::new_inner( |
| new_batcher(data, Arc::clone(&stats), mode), |
| requests, |
| stats, |
| Some(truncation_counter), |
| parent_trace_id, |
| ) |
| } |
| |
| pub fn new_serving_arrays<D, S>( |
| data: S, |
| requests: R, |
| mode: StreamMode, |
| stats: Arc<BatchIteratorConnectionStats>, |
| parent_trace_id: ftrace::Id, |
| ) -> Result<Self, AccessorError> |
| where |
| D: Serialize + Send + 'static, |
| S: Stream<Item = D> + Send + Unpin + 'static, |
| { |
| let data = JsonPacketSerializer::new( |
| Arc::clone(&stats), |
| FORMATTED_CONTENT_CHUNK_SIZE_TARGET, |
| data, |
| ); |
| Self::new_inner( |
| new_batcher(data, Arc::clone(&stats), mode), |
| requests, |
| stats, |
| None, |
| parent_trace_id, |
| ) |
| } |
| |
| fn new_inner( |
| data: FormattedStream, |
| requests: R, |
| stats: Arc<BatchIteratorConnectionStats>, |
| truncation_counter: Option<Arc<Mutex<SchemaTruncationCounter>>>, |
| parent_trace_id: ftrace::Id, |
| ) -> Result<Self, AccessorError> { |
| stats.open_connection(); |
| Ok(Self { data, requests, stats, truncation_counter, parent_trace_id }) |
| } |
| |
| pub async fn run(mut self) -> Result<(), AccessorError> { |
| self.requests.maybe_respond_ready().await?; |
| while self.requests.wait_for_buffer().await.is_ok() { |
| self.stats.add_request(); |
| let start_time = zx::Time::get_monotonic(); |
| let trace_id = ftrace::Id::random(); |
| let _trace_guard = ftrace::async_enter!( |
| trace_id, |
| c"app", |
| c"BatchIterator::run.get_send_batch", |
| // An async duration cannot have multiple concurrent child async durations |
| // so we include the nonce as metadata to manually determine relationship. |
| "parent_trace_id" => u64::from(self.parent_trace_id), |
| "trace_id" => u64::from(trace_id) |
| ); |
| let batch = { |
| let wait_for_close = self.requests.wait_for_close(); |
| let next_data = self.data.next(); |
| pin_mut!(wait_for_close); |
| match select(next_data, wait_for_close).await { |
| // if we get None back, treat that as a terminal batch with an empty vec |
| Either::Left((batch_option, _)) => batch_option.unwrap_or_default(), |
| // if the client closes the channel, stop waiting and terminate. |
| Either::Right(_) => break, |
| } |
| }; |
| |
| // turn errors into epitaphs -- we drop intermediate items if there was an error midway |
| let batch = batch.into_iter().collect::<Result<Vec<_>, _>>()?; |
| |
| // increment counters |
| self.stats.add_response(); |
| if batch.is_empty() { |
| if let Some(truncation_count) = &self.truncation_counter { |
| let unlocked_count = truncation_count.lock(); |
| if unlocked_count.total_schemas > 0 { |
| self.stats.global_stats().record_percent_truncated_schemas( |
| ((unlocked_count.truncated_schemas as f32 |
| / unlocked_count.total_schemas as f32) |
| * 100.0) |
| .round() as u64, |
| ); |
| } |
| } |
| self.stats.add_terminal(); |
| } |
| self.stats.global_stats().record_batch_duration(zx::Time::get_monotonic() - start_time); |
| if self.requests.write(batch).await.is_err() { |
| // Peer closed, end the stream. |
| break; |
| } |
| } |
| Ok(()) |
| } |
| } |
| |
| impl<R> Drop for BatchIterator<R> { |
| fn drop(&mut self) { |
| self.stats.close_connection(); |
| } |
| } |
| |
| pub struct PerformanceConfig { |
| pub batch_timeout_sec: i64, |
| pub aggregated_content_limit_bytes: Option<u64>, |
| pub maximum_concurrent_snapshots_per_reader: u64, |
| } |
| |
| impl PerformanceConfig { |
| fn new( |
| params: &StreamParameters, |
| maximum_concurrent_snapshots_per_reader: u64, |
| default_batch_timeout_seconds: BatchRetrievalTimeout, |
| ) -> Result<PerformanceConfig, AccessorError> { |
| let batch_timeout = match params { |
| // If only nested batch retrieval timeout is definitely not set, |
| // use the optional outer field. |
| StreamParameters { |
| batch_retrieval_timeout_seconds, |
| performance_configuration: None, |
| .. |
| } |
| | StreamParameters { |
| batch_retrieval_timeout_seconds, |
| performance_configuration: |
| Some(PerformanceConfiguration { batch_retrieval_timeout_seconds: None, .. }), |
| .. |
| } => batch_retrieval_timeout_seconds, |
| // If the outer field is definitely not set, and the inner field might be, |
| // use the inner field. |
| StreamParameters { |
| batch_retrieval_timeout_seconds: None, |
| performance_configuration: |
| Some(PerformanceConfiguration { batch_retrieval_timeout_seconds, .. }), |
| .. |
| } => batch_retrieval_timeout_seconds, |
| // Both the inner and outer fields are set, which is an error. |
| _ => return Err(AccessorError::DuplicateBatchTimeout), |
| } |
| .map(BatchRetrievalTimeout::from_seconds) |
| .unwrap_or(default_batch_timeout_seconds); |
| |
| let aggregated_content_limit_bytes = match params { |
| StreamParameters { |
| performance_configuration: |
| Some(PerformanceConfiguration { max_aggregate_content_size_bytes, .. }), |
| .. |
| } => *max_aggregate_content_size_bytes, |
| _ => None, |
| }; |
| |
| Ok(PerformanceConfig { |
| batch_timeout_sec: batch_timeout.seconds(), |
| aggregated_content_limit_bytes, |
| maximum_concurrent_snapshots_per_reader, |
| }) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use crate::diagnostics::AccessorStats; |
| use assert_matches::assert_matches; |
| use fidl_fuchsia_diagnostics::{ArchiveAccessorMarker, BatchIteratorMarker}; |
| use fuchsia_inspect::{Inspector, Node}; |
| use fuchsia_zircon_status as zx_status; |
| use zx::AsHandleRef; |
| |
| #[fuchsia::test] |
| async fn logs_only_accept_basic_component_selectors() { |
| let (accessor, stream) = |
| fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>().unwrap(); |
| let pipeline = Arc::new(Pipeline::for_test(None)); |
| let inspector = Inspector::default(); |
| let log_repo = LogsRepository::new(1_000_000, inspector.root()); |
| let inspect_repo = Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)])); |
| let server = |
| ArchiveAccessorServer::new(inspect_repo, log_repo, 4, BatchRetrievalTimeout::max()); |
| server.spawn_server(pipeline, stream); |
| |
| // A selector of the form `component:node/path:property` is rejected. |
| let (batch_iterator, server_end) = |
| fidl::endpoints::create_proxy::<BatchIteratorMarker>().unwrap(); |
| assert!(accessor |
| .r#stream_diagnostics( |
| &StreamParameters { |
| data_type: Some(DataType::Logs), |
| stream_mode: Some(StreamMode::SnapshotThenSubscribe), |
| format: Some(Format::Json), |
| client_selector_configuration: Some(ClientSelectorConfiguration::Selectors( |
| vec![SelectorArgument::RawSelector("foo:root/bar:baz".to_string()),] |
| )), |
| ..Default::default() |
| }, |
| server_end |
| ) |
| .is_ok()); |
| assert_matches!( |
| batch_iterator.get_next().await, |
| Err(fidl::Error::ClientChannelClosed { status: zx_status::Status::INVALID_ARGS, .. }) |
| ); |
| |
| // A selector of the form `component:root` is accepted. |
| let (batch_iterator, server_end) = |
| fidl::endpoints::create_proxy::<BatchIteratorMarker>().unwrap(); |
| assert!(accessor |
| .r#stream_diagnostics( |
| &StreamParameters { |
| data_type: Some(DataType::Logs), |
| stream_mode: Some(StreamMode::Snapshot), |
| format: Some(Format::Json), |
| client_selector_configuration: Some(ClientSelectorConfiguration::Selectors( |
| vec![SelectorArgument::RawSelector("foo:root".to_string()),] |
| )), |
| ..Default::default() |
| }, |
| server_end |
| ) |
| .is_ok()); |
| |
| assert!(batch_iterator.get_next().await.is_ok()); |
| } |
| |
| #[fuchsia::test] |
| async fn accessor_skips_invalid_selectors() { |
| let (accessor, stream) = |
| fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>().unwrap(); |
| let pipeline = Arc::new(Pipeline::for_test(None)); |
| let inspector = Inspector::default(); |
| let log_repo = LogsRepository::new(1_000_000, inspector.root()); |
| let inspect_repo = Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)])); |
| let server = Arc::new(ArchiveAccessorServer::new( |
| inspect_repo, |
| log_repo, |
| 4, |
| BatchRetrievalTimeout::max(), |
| )); |
| server.spawn_server(pipeline, stream); |
| |
| // A selector of the form `component:node/path:property` is rejected. |
| let (batch_iterator, server_end) = |
| fidl::endpoints::create_proxy::<BatchIteratorMarker>().unwrap(); |
| |
| assert!(accessor |
| .r#stream_diagnostics( |
| &StreamParameters { |
| data_type: Some(DataType::Inspect), |
| stream_mode: Some(StreamMode::Snapshot), |
| format: Some(Format::Json), |
| client_selector_configuration: Some(ClientSelectorConfiguration::Selectors( |
| vec![ |
| SelectorArgument::RawSelector("invalid".to_string()), |
| SelectorArgument::RawSelector("valid:root".to_string()), |
| ] |
| )), |
| ..Default::default() |
| }, |
| server_end |
| ) |
| .is_ok()); |
| |
| // The batch iterator proxy should remain valid and providing responses regardless of the |
| // invalid selectors that were given. |
| assert!(batch_iterator.get_next().await.is_ok()); |
| } |
| |
| #[fuchsia::test] |
| async fn buffered_iterator_handles_two_consecutive_buffer_waits() { |
| let (client, server) = |
| fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>().unwrap(); |
| let _fut = client.get_next(); |
| let mut server = server.peekable(); |
| assert_matches!(server.wait_for_buffer().await, Ok(())); |
| assert_matches!(server.wait_for_buffer().await, Ok(())); |
| } |
| |
| #[fuchsia::test] |
| async fn buffered_iterator_handles_peer_closed() { |
| let (client, server) = |
| fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>().unwrap(); |
| let mut server = server.peekable(); |
| drop(client); |
| assert_matches!( |
| server |
| .write(vec![FormattedContent::Text(Buffer { |
| size: 1, |
| vmo: fuchsia_zircon::Vmo::create(1).unwrap(), |
| })]) |
| .await, |
| Err(IteratorError::PeerClosed) |
| ); |
| } |
| |
| #[fuchsia::test] |
| fn socket_writer_handles_text() { |
| let vmo = fuchsia_zircon::Vmo::create(1).unwrap(); |
| vmo.write(&[5u8], 0).unwrap(); |
| let koid = vmo.get_koid().unwrap(); |
| let text = FormattedContent::Text(Buffer { size: 1, vmo }); |
| let result = get_buffer_from_formatted_content(text).unwrap(); |
| assert_eq!(result.size, 1); |
| assert_eq!(result.vmo.get_koid().unwrap(), koid); |
| let mut buffer = [0]; |
| result.vmo.read(&mut buffer, 0).unwrap(); |
| assert_eq!(buffer[0], 5); |
| } |
| |
| #[fuchsia::test] |
| fn socket_writer_does_not_handle_cbor() { |
| let vmo = fuchsia_zircon::Vmo::create(1).unwrap(); |
| vmo.write(&[5u8], 0).unwrap(); |
| let text = FormattedContent::Cbor(vmo); |
| let result = get_buffer_from_formatted_content(text); |
| assert_matches!(result, Err(AccessorError::UnsupportedFormat)); |
| } |
| |
| #[fuchsia::test] |
| async fn socket_writer_handles_closed_socket() { |
| let (local, remote) = fuchsia_zircon::Socket::create_stream(); |
| drop(local); |
| let mut remote = fuchsia_async::Socket::from_socket(remote); |
| { |
| let result = ArchiveAccessorWriter::write( |
| &mut remote, |
| vec![FormattedContent::Text(Buffer { |
| size: 1, |
| vmo: fuchsia_zircon::Vmo::create(1).unwrap(), |
| })], |
| ) |
| .await; |
| assert_matches!(result, Ok(())); |
| } |
| remote.wait_for_close().await; |
| } |
| |
| #[fuchsia::test] |
| fn batch_iterator_terminates_on_client_disconnect() { |
| let mut executor = fasync::TestExecutor::new(); |
| let (batch_iterator_proxy, stream) = |
| fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>().unwrap(); |
| // Create a batch iterator that uses a hung stream to serve logs. |
| let batch_iterator = BatchIterator::new( |
| futures::stream::pending::<diagnostics_data::Data<diagnostics_data::Logs>>(), |
| stream.peekable(), |
| StreamMode::Subscribe, |
| Arc::new(AccessorStats::new(Node::default()).new_inspect_batch_iterator()), |
| None, |
| ftrace::Id::random(), |
| Format::Json, |
| ) |
| .expect("create batch iterator"); |
| |
| let mut batch_iterator_fut = batch_iterator.run().boxed(); |
| assert!(executor.run_until_stalled(&mut batch_iterator_fut).is_pending()); |
| |
| // After sending a request, the request should be unfulfilled. |
| let mut iterator_request_fut = batch_iterator_proxy.get_next(); |
| assert!(executor.run_until_stalled(&mut iterator_request_fut).is_pending()); |
| assert!(executor.run_until_stalled(&mut batch_iterator_fut).is_pending()); |
| assert!(executor.run_until_stalled(&mut iterator_request_fut).is_pending()); |
| |
| // After closing the client end of the channel, the server should terminate and release |
| // resources. |
| drop(iterator_request_fut); |
| drop(batch_iterator_proxy); |
| assert_matches!(executor.run_singlethreaded(&mut batch_iterator_fut), Ok(())); |
| } |
| |
| #[fuchsia::test] |
| async fn batch_iterator_on_ready_is_called() { |
| let (accessor, stream) = |
| fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>().unwrap(); |
| let pipeline = Arc::new(Pipeline::for_test(None)); |
| let inspector = Inspector::default(); |
| let log_repo = LogsRepository::new(1_000_000, inspector.root()); |
| let inspect_repo = Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)])); |
| let server = |
| ArchiveAccessorServer::new(inspect_repo, log_repo, 4, BatchRetrievalTimeout::max()); |
| server.spawn_server(pipeline, stream); |
| |
| // A selector of the form `component:node/path:property` is rejected. |
| let (batch_iterator, server_end) = |
| fidl::endpoints::create_proxy::<BatchIteratorMarker>().unwrap(); |
| assert!(accessor |
| .r#stream_diagnostics( |
| &StreamParameters { |
| data_type: Some(DataType::Logs), |
| stream_mode: Some(StreamMode::Subscribe), |
| format: Some(Format::Json), |
| client_selector_configuration: Some(ClientSelectorConfiguration::SelectAll( |
| true |
| )), |
| ..Default::default() |
| }, |
| server_end |
| ) |
| .is_ok()); |
| |
| // We receive a response for WaitForReady |
| assert!(batch_iterator.wait_for_ready().await.is_ok()); |
| } |
| } |