| // Copyright 2022 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| //! Async gzip decompression for a [`Stream`] of bytes. |
| |
| use bytes::Bytes; |
| use errors::wrap_error; |
| use futures::prelude::*; |
| use futures::stream::FusedStream; |
| use std::time::Duration; |
| mod asyncbufread_to_stream; |
| mod errors; |
| |
| pub use errors::{DecodeError, Error}; |
| |
/// Size of decompressed chunks (except possibly the last chunk,
/// which may be smaller).
///
/// This is an internal tuning knob; callers must not rely on the exact
/// chunk size (see the note on [`decode`]).
const OUTPUT_CHUNK_SIZE: usize = 32 * 1024;
| |
| /// Decode a stream of gzip-compressed data. |
| /// |
| /// The input stream is a sequence of chunks, each of which may contain |
| /// any number of bytes. |
| /// |
| /// Output chunks are typically 32 KiB each, but this behavior should not |
| /// be relied upon. |
| /// |
| /// # Errors |
| /// |
| /// If the input stream yields an error, or we detect that the input is not |
| /// well-formed gzip, we stop decoding immediately and yield an error. |
| /// |
| /// In this case, some output chunks may already have been yielded. However, |
| /// we will _not_ attempt to yield an additional (partial) output chunk before |
| /// yielding the error. |
| /// |
| /// No more output chunks or errors are yielded after the first error. |
| /// Subsequent calls to `.next()` return `None`. |
| pub fn decode<B, E>( |
| compressed_input: impl Stream<Item = Result<B, E>>, |
| ) -> impl FusedStream<Item = Result<Bytes, Error<E>>> |
| where |
| B: AsRef<[u8]>, |
| E: std::error::Error + Send + Sync + 'static, |
| { |
| let output = decode_with_stats(compressed_input); |
| |
| output.map(|i| match i { |
| Ok((bytes, _)) => Ok(bytes), |
| Err(x) => Err(x), |
| }) |
| } |
| |
| /// An additional wrapper to decode a stream of gzip-compressed data. |
| /// |
| /// This API returns a tuple of decompressed bytes and a struct containing |
| /// statistics related to input size, time spent reading input, and time |
| /// spent decompressing it. |
| pub fn decode_with_stats<B, E>( |
| compressed_input: impl Stream<Item = Result<B, E>>, |
| ) -> impl FusedStream<Item = Result<(Bytes, ChunkStats), Error<E>>> |
| where |
| B: AsRef<[u8]>, |
| E: std::error::Error + Send + Sync + 'static, |
| { |
| // TODO(kevan): when https://github.com/rust-lang/futures-rs/pull/2599 gets merged, |
| // we can remove the Box. |
| let compressed_input = Box::pin(compressed_input); |
| |
| // Wrap each error in an io::Error, so we can use the AsyncBufRead-based implementation. |
| let compressed_input = compressed_input.map_err(wrap_error).into_async_read(); |
| |
| let output = asyncbufread_to_stream::decode(compressed_input, OUTPUT_CHUNK_SIZE); |
| |
| // Unwrap each io::Error that contains an error from the underlying stream. |
| output.map_err(Error::unwrap_inner_error) |
| } |
| |
/// Per-chunk statistics describing how a yielded chunk was produced.
#[derive(Clone, Debug, Default)]
pub struct ChunkStats {
    /// Time spent in the gzip decoder to produce this chunk.
    decode_time: Duration,

    /// Input consumed in the process of yielding a chunk.
    bytes_read: usize,
}

impl ChunkStats {
    /// Creates an empty record (zero time, zero bytes read).
    pub fn new() -> Self {
        Self::default()
    }

    /// Resets both counters back to zero.
    pub fn clear(&mut self) {
        *self = Self::new();
    }

    /// Time spent in the gzip decoder to produce this chunk.
    pub fn decode_time(&self) -> Duration {
        self.decode_time
    }

    /// Accumulates additional decoder time into this record.
    pub(crate) fn add_decode_time(&mut self, val: Duration) {
        self.decode_time += val;
    }

    /// Amount of compressed input consumed for this chunk.
    pub fn bytes_read(&self) -> usize {
        self.bytes_read
    }

    /// Accumulates additional consumed input into this record.
    pub(crate) fn add_bytes_read(&mut self, val: usize) {
        self.bytes_read += val;
    }
}
| |
#[cfg(test)]
mod tests {
    use super::*;
    use flate2::bufread::GzEncoder;
    use flate2::Compression;
    use futures::{executor, pin_mut};
    use rand::{Fill, SeedableRng};
    use rand_xorshift::XorShiftRng;
    use std::cmp::min;
    use std::fmt::Debug;
    use std::io::Read;

    /// Error type used to simulate a failure in the input stream.
    #[derive(Debug, thiserror::Error)]
    pub(crate) enum MockError {
        #[error("bad thing happened: {0}")]
        BadThing(String),
    }

    /// Gzip-compress `uncompressed` and yield the result as a stream of
    /// `chunk_size`-byte chunks (the final chunk may be smaller).
    fn mock_input_stream(
        uncompressed: &[u8],
        chunk_size: usize,
    ) -> impl Stream<Item = Result<Vec<u8>, MockError>> {
        // `uncompressed` is already a `&[u8]`; the previous `&uncompressed`
        // re-borrow was redundant (clippy::needless_borrow).
        let compressed = gzip_compress(uncompressed);
        let chunks = split_into_chunks(&compressed, chunk_size);
        stream::iter(chunks).map(Ok)
    }

    /// Gzip-compress `bytes` at the default compression level.
    pub(crate) fn gzip_compress(bytes: &[u8]) -> Vec<u8> {
        let mut out = vec![];
        GzEncoder::new(bytes, Compression::default()).read_to_end(&mut out).unwrap();
        out
    }

    /// Split `bytes` into consecutive owned chunks of at most `chunk_size`
    /// bytes each.
    pub(crate) fn split_into_chunks(mut bytes: &[u8], chunk_size: usize) -> Vec<Vec<u8>> {
        let mut chunks = Vec::with_capacity(ceil_div(bytes.len(), chunk_size));

        while !bytes.is_empty() {
            let len = min(bytes.len(), chunk_size);
            chunks.push(Vec::from(&bytes[..len]));
            bytes = &bytes[len..];
        }

        chunks
    }

    /// Integer division of `x` by `y`, rounding up.
    fn ceil_div(x: usize, y: usize) -> usize {
        let extra = if x % y != 0 { 1 } else { 0 };
        x / y + extra
    }

    /// Compress some data, then decompress it using [`decode`].
    fn assert_round_trip(uncompressed: &[u8], input_chunk_size: usize) {
        let input_stream = mock_input_stream(uncompressed, input_chunk_size);

        let output_stream = decode(input_stream).map(Result::unwrap);
        pin_mut!(output_stream);
        let decompressed: Vec<u8> = executor::block_on_stream(output_stream).flatten().collect();

        assert_eq!(uncompressed, &decompressed);
    }

    #[test]
    fn test_small_examples() {
        let tests: Vec<&[u8]> = vec![b"Hello world!", b"abc", b"A", b""];

        for uncompressed in tests {
            assert_round_trip(uncompressed, 3);
        }
    }

    /// Deterministically generate a "random-looking" input, which won't
    /// compress much when gzipped.
    ///
    /// NOTE: every time this function is called, you'll get the same bytes
    /// (or a prefix thereof).
    pub(crate) fn random_looking_bytes(num_bytes: usize) -> Vec<u8> {
        let mut fixed_seed_rng = XorShiftRng::seed_from_u64(0);

        let mut buf = vec![0; num_bytes];
        buf.try_fill(&mut fixed_seed_rng).unwrap();
        buf
    }

    #[test]
    fn test_random_input() {
        assert_round_trip(&random_looking_bytes(100), 40);
    }

    /// Test that an error in the input stream is propagated to the output stream.
    #[test]
    fn test_input_stream_error() {
        let input_stream = mock_input_stream(&random_looking_bytes(100), 20);

        // Simulate an error in the input stream.
        let error = MockError::BadThing("oh no!".into());
        let suffix = stream::once(future::ready(Err(error)));
        let failing_stream = input_stream.take(3).chain(suffix);

        let output_stream = decode(failing_stream);
        pin_mut!(output_stream);
        let result: Result<Vec<Bytes>, _> = executor::block_on_stream(output_stream).collect();

        assert!(matches!(result, Err(Error::Input(MockError::BadThing(..)))));
    }

    /// Test that a corrupt gzip payload results in a DEFLATE error.
    #[test]
    fn test_corrupt_gzip_payload() {
        let gzip_blob = gzip_compress(b"");

        // Wrap random garbage in a gzip header and footer.
        let header = &gzip_blob[..10];
        let garbage = &random_looking_bytes(100);
        let footer = &gzip_blob[gzip_blob.len() - 8..];

        let slices = vec![header, garbage, footer].into_iter();
        let corrupted: Vec<u8> = slices.flatten().copied().collect();

        let chunks = split_into_chunks(&corrupted, 20);
        let input_stream = stream::iter(chunks).map(Ok::<_, MockError>);

        let output_stream = decode(input_stream);
        pin_mut!(output_stream);
        let result: Result<Vec<Bytes>, _> = executor::block_on_stream(output_stream).collect();

        assert!(matches!(result, Err(Error::Decode(DecodeError::Deflate(..)))));
    }

    /// Test that [`decode_with_stats`] returns a list of stats with a cumulative time value
    /// that is non-zero.
    #[test]
    fn test_decode_with_stats_time() {
        let input_stream = mock_input_stream(&random_looking_bytes(100), 20);

        let output_stream = decode_with_stats(input_stream).map(Result::unwrap);
        pin_mut!(output_stream);
        let result = executor::block_on_stream(output_stream).map(|(_, stats)| stats);

        let cumulative_decode_time =
            result.fold(Duration::default(), |accumulator, val| accumulator + val.decode_time());

        assert!(cumulative_decode_time.as_nanos() > 0);
    }

    /// Test that [`decode_with_stats`] returns a list of stats with a cumulative bytes read value
    /// that is non-zero.
    #[test]
    fn test_decode_with_stats_bytes() {
        let input_stream = mock_input_stream(&random_looking_bytes(100), 20);

        let output_stream = decode_with_stats(input_stream).map(Result::unwrap);
        pin_mut!(output_stream);
        let result = executor::block_on_stream(output_stream).map(|(_, stats)| stats);

        let bytes_read = result.fold(0usize, |accumulator, val| accumulator + val.bytes_read());

        assert!(bytes_read > 0);
    }
}