blob: 41c76a4990a9e7d019112c3b22e6cb72271ba838 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/syslog/cpp/macros.h>
#include <zircon/assert.h>
#include <algorithm>
#include <fbl/algorithm.h>
#include <src/lib/chunked-compression/chunked-archive.h>
#include <src/lib/chunked-compression/chunked-compressor.h>
#include <src/lib/chunked-compression/status.h>
#include <src/lib/chunked-compression/streaming-chunked-compressor.h>
#include <zstd/zstd.h>
namespace chunked_compression {
// Owns the zstd compression context together with the bookkeeping needed to resume an output
// frame across multiple Update() calls.
struct StreamingChunkedCompressor::CompressionContext {
  CompressionContext() = default;
  // Takes ownership of |ctx|, which is released with ZSTD_freeCCtx() on destruction.
  explicit CompressionContext(ZSTD_CCtx* ctx) : inner_(ctx) {}
  // |inner_| is uniquely owned; a copy would free it twice.
  CompressionContext(const CompressionContext&) = delete;
  CompressionContext& operator=(const CompressionContext&) = delete;
  // ZSTD_freeCCtx() does nothing for nullptr, so a default-constructed context is safe to destroy.
  ~CompressionContext() { ZSTD_freeCCtx(inner_); }

  // Offset into the compressed output buffer at which the current output frame begins.
  size_t current_output_frame_start_ = 0ul;
  // Compressed bytes already written for the current frame, relative to
  // |current_output_frame_start_|.
  size_t current_output_frame_relative_pos_ = 0ul;
  // Defaults to nullptr so the destructor never frees an indeterminate pointer.
  ZSTD_CCtx* inner_ = nullptr;
};
// Constructs a compressor using value-initialized (default) compression parameters by delegating
// to the parameterized constructor.
StreamingChunkedCompressor::StreamingChunkedCompressor()
    : StreamingChunkedCompressor(CompressionParams()) {}
// Constructs a compressor with |params| and a freshly allocated zstd compression context.
//
// Asserts that |params| is valid and that the zstd context could be allocated.
StreamingChunkedCompressor::StreamingChunkedCompressor(CompressionParams params)
    : params_(params), context_(std::make_unique<CompressionContext>(ZSTD_createCCtx())) {
  // TODO: create factory method, return status instead of asserting.
  ZX_ASSERT(params.IsValid());
  // ZSTD_createCCtx() returns nullptr on allocation failure; every later ZSTD_* call would then
  // dereference a null context, so fail loudly here instead of crashing later in Init().
  ZX_ASSERT(context_->inner_ != nullptr);
}
// Defined in this file, where CompressionContext is a complete type, so the owned context (and
// with it the zstd CCtx) can be destroyed.
StreamingChunkedCompressor::~StreamingChunkedCompressor() = default;
// Move-constructs from |other|. All state, including the zstd context, is transferred by
// MoveFrom(), which leaves |other| without an output buffer.
StreamingChunkedCompressor::StreamingChunkedCompressor(StreamingChunkedCompressor&& other) {
  MoveFrom(std::move(other));
}
// Move-assigns from |o|, transferring all compressor state via MoveFrom().
//
// Self-move is a no-op: MoveFrom() clears |o|'s output pointer after copying it, which would
// otherwise leave this object without its output buffer.
StreamingChunkedCompressor& StreamingChunkedCompressor::operator=(StreamingChunkedCompressor&& o) {
  if (this != &o) {
    MoveFrom(std::move(o));
  }
  return *this;
}
void StreamingChunkedCompressor::MoveFrom(StreamingChunkedCompressor&& o) {
compressed_output_ = o.compressed_output_;
o.compressed_output_ = nullptr;
compressed_output_len_ = o.compressed_output_len_;
compressed_output_offset_ = o.compressed_output_offset_;
o.compressed_output_offset_ = 0ul;
input_len_ = o.input_len_;
input_offset_ = o.input_offset_;
o.input_offset_ = 0ul;
header_writer_ = std::move(o.header_writer_);
progress_callback_ = std::move(o.progress_callback_);
params_ = o.params_;
context_ = std::move(o.context_);
}
// Begins a new compression session for an input stream of exactly |stream_len| bytes, writing
// compressed data into |output| (capacity |output_len| bytes).
//
// The first HeaderWriter::MetadataSizeForNumFrames() bytes of |output| are reserved for the
// archive header; compressed frames are written after them.
//
// Returns:
//  - kStatusErrBufferTooSmall if |output_len| cannot even hold the metadata,
//  - kStatusErrInvalidArgs if |output| is null,
//  - kStatusErrInternal if zstd could not be initialized,
//  - otherwise the status of starting the first frame / creating the header writer.
// On any failure the compressor is left without an output buffer, so Update()/Final() report
// kStatusErrBadState until Init() succeeds.
Status StreamingChunkedCompressor::Init(size_t stream_len, void* output, size_t output_len) {
  const size_t num_frames = HeaderWriter::NumFramesForDataSize(stream_len, params_.chunk_size);
  const size_t metadata_size = HeaderWriter::MetadataSizeForNumFrames(num_frames);
  if (metadata_size > output_len) {
    return kStatusErrBufferTooSmall;
  }
  if (output == nullptr) {
    return kStatusErrInvalidArgs;
  }
  // Resetting the zstd stream below destroys any previous session, so invalidate it first; a
  // failure past this point must not leave the old output buffer usable with a reset context.
  compressed_output_ = nullptr;
  size_t r = ZSTD_initCStream(context_->inner_, params_.compression_level);
  if (ZSTD_isError(r)) {
    FX_SLOG(ERROR, "Failed to init stream");
    return kStatusErrInternal;
  }
  if (params_.frame_checksum) {
    r = ZSTD_CCtx_setParameter(context_->inner_, ZSTD_c_checksumFlag, 1);
    if (ZSTD_isError(r)) {
      FX_SLOG(ERROR, "Failed to enable frame checksums");
      return kStatusErrInternal;
    }
  }
  compressed_output_ = static_cast<uint8_t*>(output);
  compressed_output_len_ = output_len;
  compressed_output_offset_ = metadata_size;
  input_len_ = stream_len;
  input_offset_ = 0ul;
  // A previous session may have been abandoned part-way through a frame (e.g. a failed Update());
  // discard that partial-frame progress so the first frame starts from a clean state.
  context_->current_output_frame_relative_pos_ = 0ul;
  Status status = StartFrame();
  if (status != kStatusOk) {
    compressed_output_ = nullptr;
    return status;
  }
  status = HeaderWriter::Create(output, metadata_size, num_frames, &header_writer_);
  if (status != kStatusOk) {
    compressed_output_ = nullptr;
    return status;
  }
  return kStatusOk;
}
// Feeds |len| bytes of |input| into the compressor. The data is split at chunk boundaries so that
// each output frame covers exactly one chunk of the uncompressed stream.
//
// Returns:
//  - kStatusErrBadState if there is no active session (Init() has not succeeded),
//  - kStatusErrInvalidArgs if |len| would run past the stream length given to Init(), or if
//    |input| is null while |len| is non-zero,
//  - otherwise the status of the first failing AppendToFrame() call, or kStatusOk.
Status StreamingChunkedCompressor::Update(const void* input, size_t len) {
  if (compressed_output_ == nullptr) {
    return kStatusErrBadState;
  }
  if (len > input_len_ - input_offset_) {
    // |len| takes us past the expected end of the input stream
    return kStatusErrInvalidArgs;
  }
  if (input == nullptr && len > 0) {
    // zstd would otherwise read from a null source buffer.
    return kStatusErrInvalidArgs;
  }
  const uint8_t* const src = static_cast<const uint8_t*>(input);
  size_t consumed = 0;
  // Consume input up to one input frame at a time.
  while (consumed < len) {
    const size_t bytes_left = len - consumed;
    const size_t current_frame_start = fbl::round_down(input_offset_, params_.chunk_size);
    const size_t current_frame_end = std::min(current_frame_start + params_.chunk_size, input_len_);
    const size_t bytes_left_in_current_frame = current_frame_end - input_offset_;
    const size_t bytes_to_consume = std::min(bytes_left, bytes_left_in_current_frame);
    const Status status = AppendToFrame(src + consumed, bytes_to_consume);
    if (status != kStatusOk) {
      return status;
    }
    consumed += bytes_to_consume;
  }
  return kStatusOk;
}
// Completes the session by finalizing the archive header.
//
// On success, |*compressed_size_out| receives the total number of bytes written to the output
// buffer (metadata plus all compressed frames).
//
// Returns:
//  - kStatusErrBadState if there is no active session or not all input has been supplied,
//  - kStatusErrInvalidArgs if |compressed_size_out| is null,
//  - otherwise the status of HeaderWriter::Finalize().
Status StreamingChunkedCompressor::Final(size_t* compressed_size_out) {
  if (compressed_output_ == nullptr) {
    return kStatusErrBadState;
  }
  if (input_offset_ < input_len_) {
    // Final() was called before the entire input was processed.
    return kStatusErrBadState;
  }
  if (compressed_size_out == nullptr) {
    return kStatusErrInvalidArgs;
  }
  // There should not be any pending output frames.
  ZX_DEBUG_ASSERT(context_->current_output_frame_relative_pos_ == 0ul);
  const Status status = header_writer_.Finalize();
  if (status == kStatusOk) {
    *compressed_size_out = compressed_output_offset_;
  }
  return status;
}
// Prepares the zstd context to emit a new frame at the current compressed output offset.
//
// Resets the zstd session (keeping the configured parameters) and pledges the exact size of the
// upcoming chunk. Returns kStatusErrInternal if zstd rejects either call.
Status StreamingChunkedCompressor::StartFrame() {
  ZX_DEBUG_ASSERT(context_->current_output_frame_relative_pos_ == 0ul);
  context_->current_output_frame_start_ = compressed_output_offset_;
  // Since we know the data size in advance we can optimize compression by hinting the size
  // to zstd. This will make the entire chunk be written as a single data frame
  const size_t next_chunk_size = std::min(params_.chunk_size, input_len_ - input_offset_);
  size_t r = ZSTD_CCtx_reset(context_->inner_, ZSTD_reset_session_only);
  if (ZSTD_isError(r)) {
    FX_SLOG(ERROR, "ZSTD_CCtx_reset failed", KV("status", r),
            KV("status_str", ZSTD_getErrorName(r)));
    return kStatusErrInternal;
  }
  r = ZSTD_CCtx_setPledgedSrcSize(context_->inner_, next_chunk_size);
  if (ZSTD_isError(r)) {
    FX_SLOG(ERROR, "ZSTD_CCtx_setPledgedSrcSize failed", KV("status", r),
            KV("status_str", ZSTD_getErrorName(r)));
    return kStatusErrInternal;
  }
  return kStatusOk;
}
// Records the just-completed frame in the seek table and clears the per-frame output progress.
//
// |uncompressed_frame_start| must be chunk-aligned; |uncompressed_frame_len| is the frame's
// decompressed size. The compressed extent spans from the frame's recorded start to the current
// compressed output offset. Invokes the progress callback (if set) after a successful add.
// Returns the status of HeaderWriter::AddEntry() on failure, otherwise kStatusOk.
Status StreamingChunkedCompressor::EndFrame(size_t uncompressed_frame_start,
                                            size_t uncompressed_frame_len) {
  ZX_DEBUG_ASSERT(uncompressed_frame_start % params_.chunk_size == 0);
  const size_t frame_begin = context_->current_output_frame_start_;
  SeekTableEntry entry;
  entry.decompressed_offset = uncompressed_frame_start;
  entry.decompressed_size = uncompressed_frame_len;
  entry.compressed_offset = frame_begin;
  entry.compressed_size = compressed_output_offset_ - frame_begin;
  const Status add_status = header_writer_.AddEntry(entry);
  if (add_status != kStatusOk) {
    return add_status;
  }
  context_->current_output_frame_relative_pos_ = 0ul;
  if (!progress_callback_) {
    return kStatusOk;
  }
  (*progress_callback_)(input_offset_, input_len_, compressed_output_offset_);
  return kStatusOk;
}
// Compresses |len| bytes of |data| into the current output frame. |len| must not extend past the
// end of the current input chunk.
//
// When |data| completes the chunk, the zstd frame is ended, its seek table entry is recorded, and
// (if input remains) the next frame is started. Otherwise the partial progress within the output
// frame is saved so the next call can resume it.
//
// Returns:
//  - kStatusErrInternal if zstd fails or does not consume all of |data|,
//  - kStatusErrBufferTooSmall if the output buffer cannot hold the end of the frame,
//  - otherwise the status of EndFrame()/StartFrame(), or kStatusOk.
Status StreamingChunkedCompressor::AppendToFrame(const void* data, size_t len) {
  const size_t current_frame_start = fbl::round_down(input_offset_, params_.chunk_size);
  const size_t current_frame_end = std::min(current_frame_start + params_.chunk_size, input_len_);
  const size_t bytes_left_in_current_frame = current_frame_end - input_offset_;
  ZX_DEBUG_ASSERT(len <= bytes_left_in_current_frame);
  const bool will_finish_frame = bytes_left_in_current_frame == len;
  ZSTD_inBuffer in_buf;
  in_buf.src = data;
  in_buf.size = len;
  in_buf.pos = 0ul;
  // |out_buf| is set up to be relative to the current output frame we are processing.
  ZSTD_outBuffer out_buf;
  // dst is the start of the frame.
  out_buf.dst = static_cast<uint8_t*>(compressed_output_) + context_->current_output_frame_start_;
  // size is the total number of bytes left in the output buffer, from the start of the current
  // frame.
  out_buf.size = compressed_output_len_ - context_->current_output_frame_start_;
  // pos is the progress past the start of the frame so far.
  out_buf.pos = context_->current_output_frame_relative_pos_;
  ZX_DEBUG_ASSERT(context_->current_output_frame_start_ + out_buf.size <= compressed_output_len_);
  ZX_DEBUG_ASSERT(out_buf.pos <= out_buf.size);
  size_t r = ZSTD_compressStream(context_->inner_, &out_buf, &in_buf);
  if (ZSTD_isError(r)) {
    FX_SLOG(ERROR, "ZSTD_compressStream failed", KV("status", r),
            KV("status_str", ZSTD_getErrorName(r)));
    return kStatusErrInternal;
  } else if (in_buf.pos < in_buf.size) {
    FX_SLOG(ERROR, "Partial read");
    return kStatusErrInternal;
  }
  if (will_finish_frame) {
    r = ZSTD_endStream(context_->inner_, &out_buf);
    if (ZSTD_isError(r)) {
      FX_SLOG(ERROR, "ZSTD_endStream failed", KV("status", r),
              KV("status_str", ZSTD_getErrorName(r)));
      return kStatusErrInternal;
    }
    if (r != 0) {
      // ZSTD_endStream() returns the number of bytes still waiting to be flushed. A non-zero
      // value means the output buffer filled up before the frame epilogue was written; recording
      // this frame would produce a truncated, undecodable archive.
      FX_SLOG(ERROR, "ZSTD_endStream could not flush the frame", KV("remaining", r));
      return kStatusErrBufferTooSmall;
    }
  }
  input_offset_ += len;
  compressed_output_offset_ += (out_buf.pos - context_->current_output_frame_relative_pos_);
  if (will_finish_frame) {
    // Case 1: The frame is finished. Write the seek table entry and advance to the next output
    // frame.
    Status status = EndFrame(current_frame_start, current_frame_end - current_frame_start);
    if (status != kStatusOk) {
      FX_SLOG(ERROR, "Failed to finalize frame");
      return status;
    }
    if (input_offset_ < input_len_) {
      status = StartFrame();
      if (status != kStatusOk) {
        FX_SLOG(ERROR, "Failed to start next frame");
        // Propagate the failure; continuing would feed data into an unprepared zstd session.
        return status;
      }
    }
  } else {
    // Case 2: The frame isn't complete yet. Mark our progress.
    context_->current_output_frame_relative_pos_ = out_buf.pos;
  }
  return kStatusOk;
}
} // namespace chunked_compression