// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
    crate::{diagnostics::BatchIteratorConnectionStats, error::AccessorError},
    fidl_fuchsia_diagnostics::{
        DataType, Format, FormattedContent, StreamMode, MAXIMUM_ENTRIES_PER_BATCH,
    },
    fuchsia_sync::Mutex,
    fuchsia_zircon as zx,
    futures::prelude::*,
    serde::Serialize,
    std::{
        io::{BufWriter, Result as IoResult, Write},
        pin::Pin,
        sync::Arc,
        task::{Context, Poll},
    },
    tracing::{error, warn},
};

pub type FormattedStream =
    Pin<Box<dyn Stream<Item = Vec<Result<FormattedContent, AccessorError>>> + Send>>;

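/// Wraps a chunked stream of results, converting each `Ok` item into a `FormattedContent` and
/// each error into an `AccessorError`, recording result errors on the connection stats as it goes.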
#[pin_project::pin_project]
pub struct FormattedContentBatcher<C> {
    #[pin]
    items: C,
    stats: Arc<BatchIteratorConnectionStats>,
}

/// Make a new `FormattedContentBatcher` with a chunking strategy depending on stream mode.
///
/// In snapshot mode, batched items will not be flushed to the client until the batch is complete
/// or the underlying stream has terminated.
///
/// In subscribe or snapshot-then-subscribe mode, batched items will be flushed whenever the
/// underlying stream is pending, ensuring clients always receive the latest results.
pub fn new_batcher<I, T, E>(
    items: I,
    stats: Arc<BatchIteratorConnectionStats>,
    mode: StreamMode,
) -> FormattedStream
where
    I: Stream<Item = Result<T, E>> + Send + 'static,
    T: Into<FormattedContent> + Send,
    E: Into<AccessorError> + Send,
{
    match mode {
        StreamMode::Subscribe | StreamMode::SnapshotThenSubscribe => {
            Box::pin(FormattedContentBatcher {
                items: items.ready_chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
                stats,
            })
        }
        StreamMode::Snapshot => Box::pin(FormattedContentBatcher {
            items: items.chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
            stats,
        }),
    }
}

impl<I, T, E> Stream for FormattedContentBatcher<I>
where
    I: Stream<Item = Vec<Result<T, E>>>,
    T: Into<FormattedContent>,
    E: Into<AccessorError>,
{
    type Item = Vec<Result<FormattedContent, AccessorError>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        match this.items.poll_next(cx) {
            Poll::Ready(Some(chunk)) => {
                // loop over chunk instead of into_iter/map because we can't move `this`
                let mut batch = vec![];
                for item in chunk {
                    let result = match item {
                        Ok(i) => Ok(i.into()),
                        Err(e) => {
                            this.stats.add_result_error();
                            Err(e.into())
                        }
                    };
                    batch.push(result);
                }
                Poll::Ready(Some(batch))
            }
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

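/// A cloneable handle to a shared, resizable VMO that appends bytes at a tracked tail offset,
/// growing the VMO whenever a write would exceed its current capacity.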
#[derive(Clone)]
struct VmoWriter {
    inner: Arc<Mutex<InnerVmoWriter>>,
}

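/// State for a `VmoWriter`: `Active` while data can still be appended, `Done` once the VMO has
/// been handed off via `finalize`.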
enum InnerVmoWriter {
    Active { vmo: zx::Vmo, capacity: u64, tail: u64 },
    Done,
}

impl VmoWriter {
    // TODO(https://fxbug.dev/42125551): take the name of the VMO as well.
    fn new(start_size: u64) -> Self {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, start_size)
            .expect("can always create resizable VMOs");
        let capacity = vmo.get_size().expect("can always read vmo size");
        Self { inner: Arc::new(Mutex::new(InnerVmoWriter::Active { vmo, capacity, tail: 0 })) }
    }

    fn tail(&self) -> u64 {
        let guard = self.inner.lock();
        match &*guard {
            InnerVmoWriter::Done => 0,
            InnerVmoWriter::Active { tail, .. } => *tail,
        }
    }

    fn capacity(&self) -> u64 {
        let guard = self.inner.lock();
        match &*guard {
            InnerVmoWriter::Done => 0,
            InnerVmoWriter::Active { capacity, .. } => *capacity,
        }
    }

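    /// Marks the shared writer as done, returning the VMO and the number of valid bytes written,
    /// or `None` if the writer was already finalized.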
    fn finalize(self) -> Option<(zx::Vmo, u64)> {
        let mut inner = self.inner.lock();
        let mut swapped = InnerVmoWriter::Done;
        std::mem::swap(&mut *inner, &mut swapped);
        match swapped {
            InnerVmoWriter::Done => None,
            InnerVmoWriter::Active { vmo, tail, .. } => Some((vmo, tail)),
        }
    }

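    /// Resizes the VMO back to `new_capacity` and rewinds the tail to `new_tail`, used to roll
    /// back a partially written item that does not fit in the current packet.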
    fn reset(&mut self, new_tail: u64, new_capacity: u64) {
        let mut inner = self.inner.lock();
        match &mut *inner {
            InnerVmoWriter::Done => {}
            InnerVmoWriter::Active { vmo, capacity, tail } => {
                vmo.set_size(new_capacity).expect("can always resize a plain vmo");
                *capacity = new_capacity;
                *tail = new_tail;
            }
        }
    }
}

impl Write for VmoWriter {
    fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
        match &mut *self.inner.lock() {
            InnerVmoWriter::Done => Ok(0),
            InnerVmoWriter::Active { vmo, tail, capacity } => {
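                // Grow the VMO if this write would run past the current capacity, then append
                // the bytes at the tail.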
                let new_tail = *tail + buf.len() as u64;
                if new_tail > *capacity {
                    vmo.set_size(new_tail).expect("can always resize a plain vmo");
                    *capacity = new_tail;
                }
                vmo.write(buf, *tail)?;
                *tail = new_tail;
                Ok(buf.len())
            }
        }
    }

    fn flush(&mut self) -> IoResult<()> {
        Ok(())
    }
}

/// Holds a VMO containing valid serialized data as well as the size of that data.
pub struct SerializedVmo {
    pub vmo: zx::Vmo,
    pub size: u64,
    format: Format,
}

impl SerializedVmo {
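    /// Serializes `source` into a new resizable VMO in the requested `format`, returning the VMO
    /// along with the number of bytes written.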
    pub fn serialize(
        source: &impl Serialize,
        data_type: DataType,
        format: Format,
    ) -> Result<Self, AccessorError> {
        let writer = VmoWriter::new(match data_type {
            DataType::Inspect => inspect_format::constants::DEFAULT_VMO_SIZE_BYTES as u64,
            // Logs won't go through this codepath anyway, but in case we ever want to serialize a
            // single log instance it makes sense to start at the page size.
            DataType::Logs => 4096, // page size
        });
        let batch_writer = BufWriter::new(writer.clone());
        match format {
            Format::Json => {
                serde_json::to_writer(batch_writer, source).map_err(AccessorError::Serialization)?
            }
            Format::Cbor => serde_cbor::to_writer(batch_writer, source)
                .map_err(AccessorError::CborSerialization)?,
            Format::Text => unreachable!("We'll never get Text"),
        }
        // Safe to unwrap: we just created the writer above, so the VMO is still active.
        let (vmo, tail) = writer.finalize().unwrap();
        Ok(Self { vmo, size: tail, format })
    }
}

impl From<SerializedVmo> for FormattedContent {
    fn from(content: SerializedVmo) -> FormattedContent {
        match content.format {
            Format::Json => {
                // set_content_size() is redundant, but consumers may expect the size there.
                content
                    .vmo
                    .set_content_size(&content.size)
                    .expect("set_content_size always returns Ok");
                FormattedContent::Json(fidl_fuchsia_mem::Buffer {
                    vmo: content.vmo,
                    size: content.size,
                })
            }
            Format::Cbor => {
                content
                    .vmo
                    .set_content_size(&content.size)
                    .expect("set_content_size always returns Ok");
                FormattedContent::Cbor(content.vmo)
            }
            Format::Text => unreachable!("We'll never get Text"),
        }
    }
}

/// Wraps an iterator over serializable items and yields FormattedContents, packing items
/// into a JSON array in each VMO up to the size limit provided.
#[pin_project::pin_project]
pub struct JsonPacketSerializer<I, S> {
    #[pin]
    items: I,
    stats: Option<Arc<BatchIteratorConnectionStats>>,
    max_packet_size: u64,
    overflow: Option<S>,
}

impl<I, S> JsonPacketSerializer<I, S> {
    pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
        Self { items, stats: Some(stats), max_packet_size, overflow: None }
    }

    pub fn new_without_stats(max_packet_size: u64, items: I) -> Self {
        Self { items, max_packet_size, overflow: None, stats: None }
    }
}

impl<I, S> Stream for JsonPacketSerializer<I, S>
where
    I: Stream<Item = S> + Unpin,
    S: Serialize,
{
    type Item = Result<SerializedVmo, AccessorError>;

    /// Serialize log messages into a JSON array up to the maximum size provided. Returns
    /// `Poll::Ready(None)` when there are no more messages to serialize.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let mut writer = VmoWriter::new(*this.max_packet_size);
        writer.write_all(&[b'['])?;

        if let Some(item) = this.overflow.take() {
            let batch_writer = BufWriter::new(writer.clone());
            serde_json::to_writer(batch_writer, &item)?;
            if let Some(stats) = &this.stats {
                stats.add_result();
            }
        }

        let mut items_is_pending = false;
        loop {
            let item = match this.items.poll_next_unpin(cx) {
                Poll::Ready(Some(item)) => item,
                Poll::Ready(None) => break,
                Poll::Pending => {
                    items_is_pending = true;
                    break;
                }
            };

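            // A tail of 1 means only the opening '[' has been written, so this item is the
            // first in the array and needs no separator before it.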
            let writer_tail = writer.tail();
            let is_first = writer_tail == 1;
            let (last_tail, previous_size) = (writer_tail, writer.capacity());
            if !is_first {
                writer.write_all(",\n".as_bytes())?;
            }
            let batch_writer = BufWriter::new(writer.clone());
            serde_json::to_writer(batch_writer, &item)?;
            let writer_tail = writer.tail();
            let item_len = writer_tail - last_tail;

            // +1 for the ending bracket
            if item_len + 1 >= *this.max_packet_size {
                warn!(
                    "serializing oversize item into packet (limit={} actual={})",
                    *this.max_packet_size,
                    writer_tail - last_tail,
                );
            }

            // existing batch + item + array end bracket
            if writer_tail + 1 > *this.max_packet_size {
                writer.reset(last_tail, previous_size);
                *this.overflow = Some(item);
                break;
            }

            if let Some(stats) = &this.stats {
                stats.add_result();
            }
        }

        writer.write_all(&[b']'])?;
        let writer_tail = writer.tail();
        if writer_tail > *this.max_packet_size {
            error!(
                actual = writer_tail,
                max = *this.max_packet_size,
                "returned a string longer than maximum specified",
            )
        }

        // We only want to return an item if we wrote more than the opening and closing brackets;
        // the tail is a byte count, so a value above 2 means at least one item was serialized.
        if writer_tail > 2 {
            // safe to unwrap, the vmo is guaranteed to be present.
            let (vmo, size) = writer.finalize().unwrap();
            Poll::Ready(Some(Ok(SerializedVmo { vmo, size, format: Format::Json })))
        } else if items_is_pending {
            Poll::Pending
        } else {
            Poll::Ready(None)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnostics::AccessorStats;
    use futures::stream::iter;

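    // A minimal usage sketch for `new_batcher` in snapshot mode, assuming the logs batch
    // iterator stats from `AccessorStats` are a reasonable stand-in for a real connection.
    // A single serialized item should come back as one batch containing one entry.
    #[fuchsia::test]
    async fn snapshot_batcher_emits_single_batch() {
        let node = fuchsia_inspect::Node::default();
        let accessor_stats = Arc::new(AccessorStats::new(node));
        let stats = Arc::new(accessor_stats.new_logs_batch_iterator());
        let item = SerializedVmo::serialize(&"hello", DataType::Logs, Format::Json)
            .expect("serialization succeeds");
        let batches =
            new_batcher(iter(vec![Ok::<_, AccessorError>(item)]), stats, StreamMode::Snapshot)
                .collect::<Vec<_>>()
                .await;
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].len(), 1);
        assert!(batches[0][0].is_ok());
    }
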
    #[fuchsia::test]
    async fn two_items_joined_and_split() {
        let inputs = &[&"FFFFFFFFFF", &"GGGGGGGGGG"];
        let joined = &["[\"FFFFFFFFFF\",\n\"GGGGGGGGGG\"]"];
        let split = &[r#"["FFFFFFFFFF"]"#, r#"["GGGGGGGGGG"]"#];
        let smallest_possible_joined_len = joined[0].len() as u64;

        let make_packets = |max| async move {
            let node = fuchsia_inspect::Node::default();
            let accessor_stats = Arc::new(AccessorStats::new(node));
            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    std::str::from_utf8(&buf).unwrap().to_string()
                })
                .collect::<Vec<_>>()
        };

        let actual_joined = make_packets(smallest_possible_joined_len).await;
        assert_eq!(&actual_joined[..], joined);

        let actual_split = make_packets(smallest_possible_joined_len - 1).await;
        assert_eq!(&actual_split[..], split);
    }
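
    // A minimal sketch covering `VmoWriter` growth: writing past the initial capacity should
    // resize the backing VMO and advance the tail. Only the tail, reported size, and readback
    // are asserted here, since the exact capacity depends on page-size rounding.
    #[fuchsia::test]
    fn vmo_writer_grows_and_reports_tail() {
        let mut writer = VmoWriter::new(8);
        let payload = vec![1u8; 8192];
        writer.write_all(&payload).expect("write succeeds");
        assert_eq!(writer.tail(), payload.len() as u64);
        assert!(writer.capacity() >= payload.len() as u64);
        let (vmo, size) = writer.finalize().expect("vmo still active");
        assert_eq!(size, payload.len() as u64);
        let mut buf = vec![0u8; size as usize];
        vmo.read(&mut buf, 0).expect("reading vmo");
        assert_eq!(buf, payload);
    }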
}