blob: ec2e7e5ebaa90b530b9c99548788ceffaa2206d4 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
    crate::{diagnostics::DiagnosticsServerStats, server::ServerError},
    anyhow::Context as _,
    fidl_fuchsia_diagnostics::{FormattedContent, MAXIMUM_ENTRIES_PER_BATCH},
    fuchsia_zircon as zx,
    futures::prelude::*,
    log::{error, warn},
    serde::Serialize,
    std::{
        convert::{TryFrom, TryInto},
        ops::Deref,
        pin::Pin,
        sync::Arc,
        task::{Context, Poll},
    },
};
/// Serialize the `contents` to JSON in a VMO, returned as a `FormattedContent`.
///
/// Returns an error if serialization fails or if the VMO cannot be created
/// or written.
pub fn serialize_to_formatted_json_content(
    contents: impl Serialize,
) -> Result<FormattedContent, anyhow::Error> {
    make_json_formatted_content(&serde_json::to_string_pretty(&contents)?)
}
/// Produces a `FormattedContent` with the provided JSON string as its contents. Does not validate
/// that `content_string` is JSON.
fn make_json_formatted_content(content_string: &str) -> Result<FormattedContent, anyhow::Error> {
    let bytes = content_string.as_bytes();
    let size = bytes.len() as u64;
    // Back the content with a VMO sized exactly to the payload.
    let vmo = zx::Vmo::create(size).context("error creating buffer")?;
    vmo.write(bytes, 0).context("error writing buffer")?;
    Ok(FormattedContent::Json(fidl_fuchsia_mem::Buffer { size, vmo }))
}
/// Chunks a fallible stream of serialized items into batches sized for a
/// single FIDL read (see `FormattedContentBatcher::new`, which caps chunks at
/// `MAXIMUM_ENTRIES_PER_BATCH`), tallying per-item errors in the server stats.
#[pin_project::pin_project]
pub struct FormattedContentBatcher<C> {
    /// The inner chunked stream of serialization results.
    #[pin]
    items: C,
    /// Shared accounting for this connection; conversion errors are recorded
    /// here as items are batched.
    stats: Arc<DiagnosticsServerStats>,
}
impl<I, E> FormattedContentBatcher<futures::stream::ReadyChunks<I>>
where
    I: Stream<Item = Result<JsonString, E>>,
    E: Into<ServerError>,
{
    /// Wraps `items` so that ready results are gathered into chunks of at most
    /// `MAXIMUM_ENTRIES_PER_BATCH` entries.
    pub fn new(items: I, stats: Arc<DiagnosticsServerStats>) -> Self {
        let items = items.ready_chunks(MAXIMUM_ENTRIES_PER_BATCH as usize);
        Self { stats, items }
    }
}
impl<I, T, E> Stream for FormattedContentBatcher<futures::stream::ReadyChunks<I>>
where
    I: Stream<Item = Result<T, E>>,
    T: TryInto<FormattedContent, Error = ServerError>,
    E: Into<ServerError>,
{
    type Item = Vec<Result<FormattedContent, ServerError>>;

    /// Forwards the inner stream's chunks, converting each item into a
    /// `FormattedContent` and recording each failed item in the stats.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        // Bind the stats borrow before `this.items` is consumed below so the
        // conversion closure can use it without moving the projection.
        let stats = this.stats;

        let chunk = match this.items.poll_next(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(None) => return Poll::Ready(None),
            Poll::Ready(Some(chunk)) => chunk,
        };

        let batch: Vec<Result<FormattedContent, ServerError>> = chunk
            .into_iter()
            .map(|item| {
                item.map_err(|e| {
                    stats.add_result_error();
                    e.into()
                })
                .and_then(|i| i.try_into())
            })
            .collect();
        Poll::Ready(Some(batch))
    }
}
/// A string whose contents are valid JSON.
///
/// The inner `String` is private; within this module instances are only built
/// from `serde_json` output (see [`JsonString::serialize`] and the packet
/// serializer below), so the invariant holds by construction.
pub struct JsonString(String);
impl JsonString {
pub fn serialize(source: &impl Serialize) -> Result<Self, serde_json::Error> {
serde_json::to_string_pretty(source).map(JsonString)
}
}
impl TryInto<FormattedContent> for JsonString {
type Error = ServerError;
fn try_into(self) -> Result<FormattedContent, Self::Error> {
let size = self.len() as u64;
let vmo = zx::Vmo::create(size).map_err(ServerError::VmoCreate)?;
vmo.write(self.as_bytes(), 0).map_err(ServerError::VmoWrite)?;
Ok(FormattedContent::Json(fidl_fuchsia_mem::Buffer { vmo, size }))
}
}
impl Deref for JsonString {
    type Target = str;

    /// Borrows the underlying JSON text.
    fn deref(&self) -> &Self::Target {
        self.0.as_str()
    }
}
/// Wraps a stream of serializable items and yields `JsonString` packets,
/// packing items into a JSON array in each packet up to the size limit
/// provided. (Packets are converted to VMO-backed `FormattedContent`
/// downstream, e.g. via `FormattedContentBatcher`.)
pub struct JsonPacketSerializer<I> {
    // Source stream of serializable items.
    items: I,
    // Shared accounting; incremented once per item written into a packet.
    stats: Arc<DiagnosticsServerStats>,
    // Upper bound, in bytes, for each serialized JSON array.
    max_packet_size: usize,
    // A serialized item that didn't fit in the previous packet; it opens the
    // next one.
    overflow: Option<String>,
}
impl<I> JsonPacketSerializer<I> {
    /// Creates a serializer that packs items from `items` into JSON-array
    /// packets targeting at most `max_packet_size` bytes each, reporting
    /// progress to `stats`.
    pub fn new(stats: Arc<DiagnosticsServerStats>, max_packet_size: usize, items: I) -> Self {
        Self { overflow: None, items, stats, max_packet_size }
    }
}
impl<I, P, S> Stream for JsonPacketSerializer<I>
where
    I: Stream<Item = P> + Unpin,
    P: Deref<Target = S>,
    S: Serialize,
{
    type Item = Result<JsonString, serde_json::Error>;

    /// Serialize log messages in a JSON array up to the maximum size provided. Returns Ok(None)
    /// when there are no more messages to serialize.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut batch = String::from("[");

        // An item that didn't fit in the previous packet always opens this one,
        // even if it alone exceeds `max_packet_size`.
        if let Some(item) = self.overflow.take() {
            batch.push_str(&item);
            self.stats.add_result();
        }

        let mut items_is_pending = false;
        loop {
            let item = match self.items.poll_next_unpin(cx) {
                Poll::Ready(Some(item)) => item,
                Poll::Ready(None) => break,
                Poll::Pending => {
                    items_is_pending = true;
                    break;
                }
            };

            let item = serde_json::to_string(&*item)?;
            if item.len() >= self.max_packet_size {
                warn!(
                    "serializing oversize item into packet (limit={} actual={})",
                    self.max_packet_size,
                    item.len()
                );
            }

            let is_first = batch.len() == 1;
            // items after the first will have a comma *and* newline *and* ending array bracket
            let pending_len = item.len() + if is_first { 1 } else { 3 };

            // If the item won't fit alongside existing batch contents, save it for the next
            // packet. An item at the start of an empty batch is packed unconditionally: stashing
            // it in `overflow` here would leave `batch == "[]"` below, and with the inner stream
            // not pending we would return `Ready(None)` — ending the stream and silently
            // dropping the stashed item (and everything after it). Packing it oversize matches
            // the warn above and the unconditional overflow push at the top of this fn.
            if !is_first && batch.len() + pending_len > self.max_packet_size {
                self.overflow = Some(item);
                break;
            }

            if !is_first {
                batch.push_str(",\n");
            }
            batch.push_str(&item);
            self.stats.add_result();
        }

        batch.push_str("]");
        if batch.len() > self.max_packet_size {
            error!(
                "returned a string longer than maximum specified (actual {}, max {})",
                batch.len(),
                self.max_packet_size
            );
        }

        // we only want to return an item if we wrote more than opening & closing brackets,
        // and as a string the batch's length is measured in bytes
        if batch.len() > 2 {
            Poll::Ready(Some(Ok(JsonString(batch))))
        } else if items_is_pending {
            // The waker was registered by the `Pending` poll above; we'll be re-polled.
            Poll::Pending
        } else {
            Poll::Ready(None)
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnostics::ArchiveAccessorStats;
    use futures::stream::iter;

    #[fuchsia_async::run_singlethreaded(test)]
    async fn two_items_joined_and_split() {
        let inputs = &[&"FFFFFFFFFF", &"GGGGGGGGGG"];
        let joined = &["[\"FFFFFFFFFF\",\n\"GGGGGGGGGG\"]"];
        let split = &[r#"["FFFFFFFFFF"]"#, r#"["GGGGGGGGGG"]"#];
        let smallest_possible_joined_len = joined[0].len();

        // Collects every packet the serializer emits for the given size limit.
        let make_packets = |max| async move {
            let node = fuchsia_inspect::Node::default();
            let accessor_stats = Arc::new(ArchiveAccessorStats::new(node));
            let test_stats = Arc::new(DiagnosticsServerStats::for_logs(accessor_stats));
            let packets: Vec<_> =
                JsonPacketSerializer::new(test_stats, max, iter(inputs.iter())).collect().await;
            packets.into_iter().map(|p| p.unwrap().0).collect::<Vec<String>>()
        };

        // A limit equal to the joined length packs both items into one packet…
        let actual_joined = make_packets(smallest_possible_joined_len).await;
        assert_eq!(&actual_joined[..], joined);
        // …while one byte less forces each item into its own packet.
        let actual_split = make_packets(smallest_possible_joined_len - 1).await;
        assert_eq!(&actual_split[..], split);
    }
}