blob: c0a78c107128472374d2d759e55e5b18ecb9ceed [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
constants::FORMATTED_CONTENT_CHUNK_SIZE_TARGET,
diagnostics::DiagnosticsServerStats,
formatter::{FormattedContentBatcher, JsonPacketSerializer, JsonString},
},
diagnostics_data::{Data, DiagnosticsData},
fidl_fuchsia_diagnostics::{
self, BatchIteratorControlHandle, BatchIteratorRequest, BatchIteratorRequestStream,
FormattedContent,
},
fuchsia_zircon_status::Status as ZxStatus,
futures::prelude::*,
log::warn,
serde::Serialize,
std::{ops::Deref, pin::Pin, sync::Arc},
thiserror::Error,
};
pub struct AccessorServer {
requests: BatchIteratorRequestStream,
stats: Arc<DiagnosticsServerStats>,
data: FormattedStream,
}
type FormattedStream =
Pin<Box<dyn Stream<Item = Vec<Result<FormattedContent, ServerError>>> + Send>>;
impl AccessorServer {
pub fn new<Items, D>(
data: Items,
requests: BatchIteratorRequestStream,
stats: Arc<DiagnosticsServerStats>,
) -> Result<Self, ServerError>
where
Items: Stream<Item = Data<D>> + Send + 'static,
D: DiagnosticsData,
{
let result_stats = stats.clone();
let data = data.map(move |d| {
if D::has_errors(&d.metadata) {
result_stats.add_result_error();
}
let res = JsonString::serialize(&d);
if res.is_err() {
result_stats.add_result_error();
}
result_stats.add_result();
res
});
let data = Box::pin(FormattedContentBatcher::new(data, stats.clone()));
Self::new_inner(data, requests, stats)
}
pub fn new_serving_arrays<D, P, S>(
data: S,
requests: BatchIteratorRequestStream,
stats: Arc<DiagnosticsServerStats>,
) -> Result<Self, ServerError>
where
P: Deref<Target = D>,
D: Serialize,
S: Stream<Item = P> + Send + Unpin + 'static,
{
let serialized =
JsonPacketSerializer::new(stats.clone(), FORMATTED_CONTENT_CHUNK_SIZE_TARGET, data);
let data = Box::pin(FormattedContentBatcher::new(serialized, stats.clone()));
Self::new_inner(data, requests, stats)
}
fn new_inner(
data: FormattedStream,
requests: BatchIteratorRequestStream,
stats: Arc<DiagnosticsServerStats>,
) -> Result<Self, ServerError> {
stats.open_connection();
Ok(Self { data, requests, stats })
}
pub async fn run(mut self) -> Result<(), ServerError> {
while let Some(res) = self.requests.next().await {
let BatchIteratorRequest::GetNext { responder } = res?;
self.stats.add_request();
// if we get None back, treat that as a terminal batch with an empty vec
let batch = self.data.next().await.unwrap_or(vec![]);
// turn errors into epitaphs -- we drop intermediate items if there was an error midway
let batch = batch.into_iter().collect::<Result<Vec<_>, _>>()?;
// increment counters
self.stats.add_response();
if batch.is_empty() {
self.stats.add_terminal();
}
let mut response = Ok(batch);
responder.send(&mut response)?;
}
Ok(())
}
}
impl Drop for AccessorServer {
fn drop(&mut self) {
self.stats.close_connection();
}
}
#[derive(Debug, Error)]
pub enum ServerError {
#[error("data_type must be set")]
MissingDataType,
#[error("client_selector_configuration must be set")]
MissingSelectors,
#[error("no selectors were provided")]
EmptySelectors,
#[error("requested selectors are unsupported: {}", .0)]
InvalidSelectors(&'static str),
#[error("couldn't parse/validate the provided selectors")]
ParseSelectors(#[source] anyhow::Error),
#[error("format must be set")]
MissingFormat,
#[error("only JSON supported right now")]
UnsupportedFormat,
#[error("stream_mode must be set")]
MissingMode,
#[error("only snapshot supported right now")]
UnsupportedMode,
#[error("IPC failure")]
Ipc {
#[from]
source: fidl::Error,
},
#[error("Unable to create a VMO -- extremely unusual!")]
VmoCreate(#[source] ZxStatus),
#[error("Unable to write to VMO -- we may be OOMing")]
VmoWrite(#[source] ZxStatus),
#[error("JSON serialization failure: {}", source)]
Serialization {
#[from]
source: serde_json::Error,
},
}
impl ServerError {
pub fn close(self, control: BatchIteratorControlHandle) {
warn!("Closing BatchIterator: {}", &self);
let epitaph = match self {
ServerError::MissingDataType => ZxStatus::INVALID_ARGS,
ServerError::EmptySelectors
| ServerError::MissingSelectors
| ServerError::InvalidSelectors(_)
| ServerError::ParseSelectors(_) => ZxStatus::INVALID_ARGS,
ServerError::VmoCreate(status) | ServerError::VmoWrite(status) => status,
ServerError::MissingFormat | ServerError::MissingMode => ZxStatus::INVALID_ARGS,
ServerError::UnsupportedFormat | ServerError::UnsupportedMode => ZxStatus::WRONG_TYPE,
ServerError::Serialization { .. } => ZxStatus::BAD_STATE,
ServerError::Ipc { .. } => ZxStatus::IO,
};
control.shutdown_with_epitaph(epitaph);
}
}