| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <lib/syslog/cpp/macros.h> |
| #include <zircon/assert.h> |
| |
| #include <algorithm> |
| |
| #include <fbl/algorithm.h> |
| #include <src/lib/chunked-compression/chunked-archive.h> |
| #include <src/lib/chunked-compression/chunked-compressor.h> |
| #include <src/lib/chunked-compression/status.h> |
| #include <src/lib/chunked-compression/streaming-chunked-compressor.h> |
| #include <zstd/zstd.h> |
| |
| namespace chunked_compression { |
| |
| struct StreamingChunkedCompressor::CompressionContext { |
| CompressionContext() = default; |
| explicit CompressionContext(ZSTD_CCtx* ctx) : inner_(ctx) {} |
| ~CompressionContext() { ZSTD_freeCCtx(inner_); } |
| |
| size_t current_output_frame_start_ = 0ul; |
| size_t current_output_frame_relative_pos_ = 0ul; |
| |
| ZSTD_CCtx* inner_; |
| }; |
| |
| StreamingChunkedCompressor::StreamingChunkedCompressor() |
| : StreamingChunkedCompressor(CompressionParams{}) {} |
| |
| StreamingChunkedCompressor::StreamingChunkedCompressor(CompressionParams params) |
| : params_(params), context_(std::make_unique<CompressionContext>(ZSTD_createCCtx())) { |
| // TODO: create factory method, return status instead of asserting. |
| ZX_ASSERT(params.IsValid()); |
| } |
| |
| StreamingChunkedCompressor::~StreamingChunkedCompressor() {} |
| |
| StreamingChunkedCompressor::StreamingChunkedCompressor(StreamingChunkedCompressor&& o) { |
| MoveFrom(std::move(o)); |
| } |
| |
| StreamingChunkedCompressor& StreamingChunkedCompressor::operator=(StreamingChunkedCompressor&& o) { |
| MoveFrom(std::move(o)); |
| return *this; |
| } |
| |
| void StreamingChunkedCompressor::MoveFrom(StreamingChunkedCompressor&& o) { |
| compressed_output_ = o.compressed_output_; |
| o.compressed_output_ = nullptr; |
| |
| compressed_output_len_ = o.compressed_output_len_; |
| compressed_output_offset_ = o.compressed_output_offset_; |
| o.compressed_output_offset_ = 0ul; |
| |
| input_len_ = o.input_len_; |
| input_offset_ = o.input_offset_; |
| o.input_offset_ = 0ul; |
| |
| header_writer_ = std::move(o.header_writer_); |
| |
| progress_callback_ = std::move(o.progress_callback_); |
| |
| params_ = o.params_; |
| |
| context_ = std::move(o.context_); |
| } |
| |
| Status StreamingChunkedCompressor::Init(size_t stream_len, void* output, size_t output_len) { |
| size_t num_frames = HeaderWriter::NumFramesForDataSize(stream_len, params_.chunk_size); |
| size_t metadata_size = HeaderWriter::MetadataSizeForNumFrames(num_frames); |
| if (metadata_size > output_len) { |
| return kStatusErrBufferTooSmall; |
| } |
| |
| size_t r = ZSTD_initCStream(context_->inner_, params_.compression_level); |
| if (ZSTD_isError(r)) { |
| FX_LOGS(ERROR) << "Failed to init stream"; |
| return kStatusErrInternal; |
| } |
| if (params_.frame_checksum) { |
| r = ZSTD_CCtx_setParameter(context_->inner_, ZSTD_c_checksumFlag, 1); |
| if (ZSTD_isError(r)) { |
| FX_LOGS(ERROR) << "Failed to init stream"; |
| return kStatusErrInternal; |
| } |
| } |
| |
| compressed_output_ = static_cast<uint8_t*>(output); |
| compressed_output_len_ = output_len; |
| compressed_output_offset_ = metadata_size; |
| |
| input_len_ = stream_len; |
| input_offset_ = 0ul; |
| |
| Status status = StartFrame(); |
| if (status != kStatusOk) { |
| compressed_output_ = nullptr; |
| return status; |
| } |
| |
| status = HeaderWriter::Create(output, metadata_size, num_frames, &header_writer_); |
| if (status != kStatusOk) { |
| compressed_output_ = nullptr; |
| return status; |
| } |
| |
| return kStatusOk; |
| } |
| |
| Status StreamingChunkedCompressor::Update(const void* input, size_t len) { |
| if (compressed_output_ == nullptr) { |
| return kStatusErrBadState; |
| } else if (len > input_len_ - input_offset_) { |
| // |len| takes us past the expected end of the input stream |
| return kStatusErrInvalidArgs; |
| } |
| |
| size_t consumed = 0; |
| // Consume input up to one input frame at a time. |
| while (consumed < len) { |
| const size_t bytes_left = len - consumed; |
| |
| const size_t current_frame_start = fbl::round_down(input_offset_, params_.chunk_size); |
| const size_t current_frame_end = std::min(current_frame_start + params_.chunk_size, input_len_); |
| const size_t bytes_left_in_current_frame = current_frame_end - input_offset_; |
| |
| const size_t bytes_to_consume = std::min(bytes_left, bytes_left_in_current_frame); |
| |
| Status status = AppendToFrame(static_cast<const uint8_t*>(input) + consumed, bytes_to_consume); |
| if (status != kStatusOk) { |
| return status; |
| } |
| consumed += bytes_to_consume; |
| } |
| return kStatusOk; |
| } |
| |
| Status StreamingChunkedCompressor::Final(size_t* compressed_size_out) { |
| if (compressed_output_ == nullptr) { |
| return kStatusErrBadState; |
| } |
| |
| if (input_offset_ < input_len_) { |
| // Final() was called before the entire input was processed. |
| return kStatusErrBadState; |
| } |
| // There should not be any pending output frames. |
| ZX_DEBUG_ASSERT(context_->current_output_frame_relative_pos_ == 0ul); |
| |
| Status status = header_writer_.Finalize(); |
| if (status == kStatusOk) { |
| *compressed_size_out = compressed_output_offset_; |
| } |
| return status; |
| } |
| |
| Status StreamingChunkedCompressor::StartFrame() { |
| ZX_DEBUG_ASSERT(context_->current_output_frame_relative_pos_ == 0ul); |
| |
| context_->current_output_frame_start_ = compressed_output_offset_; |
| |
| // Since we know the data size in advance we can optimize compression by hinting the size |
| // to zstd. This will make the entire chunk be written as a single data frame |
| size_t next_chunk_size = std::min(params_.chunk_size, input_len_ - input_offset_); |
| size_t r = ZSTD_CCtx_reset(context_->inner_, ZSTD_reset_session_only); |
| if (ZSTD_isError(r)) { |
| return kStatusErrInternal; |
| } |
| r = ZSTD_CCtx_setPledgedSrcSize(context_->inner_, next_chunk_size); |
| if (ZSTD_isError(r)) { |
| return kStatusErrInternal; |
| } |
| |
| return kStatusOk; |
| } |
| |
| Status StreamingChunkedCompressor::EndFrame(size_t uncompressed_frame_start, |
| size_t uncompressed_frame_len) { |
| ZX_DEBUG_ASSERT(uncompressed_frame_start % params_.chunk_size == 0); |
| |
| SeekTableEntry entry; |
| entry.decompressed_offset = uncompressed_frame_start; |
| entry.decompressed_size = uncompressed_frame_len; |
| entry.compressed_offset = context_->current_output_frame_start_; |
| entry.compressed_size = compressed_output_offset_ - context_->current_output_frame_start_; |
| Status status = header_writer_.AddEntry(entry); |
| if (status != kStatusOk) { |
| return status; |
| } |
| |
| context_->current_output_frame_relative_pos_ = 0ul; |
| |
| if (progress_callback_) { |
| (*progress_callback_)(input_offset_, input_len_, compressed_output_offset_); |
| } |
| |
| return kStatusOk; |
| } |
| |
| Status StreamingChunkedCompressor::AppendToFrame(const void* data, size_t len) { |
| const size_t current_frame_start = fbl::round_down(input_offset_, params_.chunk_size); |
| const size_t current_frame_end = std::min(current_frame_start + params_.chunk_size, input_len_); |
| |
| const size_t bytes_left_in_current_frame = current_frame_end - input_offset_; |
| ZX_DEBUG_ASSERT(len <= bytes_left_in_current_frame); |
| |
| const bool will_finish_frame = bytes_left_in_current_frame == len; |
| |
| ZSTD_inBuffer in_buf; |
| in_buf.src = data; |
| in_buf.size = len; |
| in_buf.pos = 0ul; |
| |
| // |out_buf| is set up to be relative to the current output frame we are processing. |
| ZSTD_outBuffer out_buf; |
| // dst is the start of the frame. |
| out_buf.dst = static_cast<uint8_t*>(compressed_output_) + context_->current_output_frame_start_; |
| // size is the total number of bytes left in the output buffer, from the start of the current |
| // frame. |
| out_buf.size = compressed_output_len_ - context_->current_output_frame_start_; |
| // pos is the progress past the start of the frame so far. |
| out_buf.pos = context_->current_output_frame_relative_pos_; |
| |
| ZX_DEBUG_ASSERT(context_->current_output_frame_start_ + out_buf.size <= compressed_output_len_); |
| ZX_DEBUG_ASSERT(out_buf.pos <= out_buf.size); |
| |
| size_t r = ZSTD_compressStream(context_->inner_, &out_buf, &in_buf); |
| if (ZSTD_isError(r)) { |
| FX_LOGS(ERROR) << "ZSTD_compressStream failed: " << ZSTD_getErrorName(r); |
| return kStatusErrInternal; |
| } else if (in_buf.pos < in_buf.size) { |
| FX_LOGS(ERROR) << "Partial read"; |
| return kStatusErrInternal; |
| } |
| |
| if (will_finish_frame) { |
| r = ZSTD_endStream(context_->inner_, &out_buf); |
| if (ZSTD_isError(r)) { |
| FX_LOGS(ERROR) << "ZSTD_endStream failed: " << ZSTD_getErrorName(r); |
| return kStatusErrInternal; |
| } |
| } |
| |
| input_offset_ += len; |
| compressed_output_offset_ += (out_buf.pos - context_->current_output_frame_relative_pos_); |
| |
| if (will_finish_frame) { |
| // Case 1: The frame is finished. Write the seek table entry and advance to the next output |
| // frame. |
| Status status = EndFrame(current_frame_start, current_frame_end - current_frame_start); |
| if (status != kStatusOk) { |
| FX_LOGS(ERROR) << "Failed to finalize frame"; |
| return status; |
| } |
| |
| if (input_offset_ < input_len_) { |
| status = StartFrame(); |
| if (status != kStatusOk) { |
| FX_LOGS(ERROR) << "Failed to start next frame"; |
| } |
| } |
| } else { |
| // Case 2: The frame isn't complete yet. Mark our progress. |
| context_->current_output_frame_relative_pos_ = out_buf.pos; |
| } |
| |
| return kStatusOk; |
| } |
| |
| } // namespace chunked_compression |