blob: df4fe4f4fefec3ab274515f2791c9a9dc6b7ab55 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/storage/blobfs/compression/streaming_chunked_decompressor.h"
#include <algorithm>
#include "src/storage/blobfs/compression/chunked.h"
namespace {
const size_t kSystemPageSize = zx_system_get_page_size();
}
namespace blobfs {
zx::result<std::unique_ptr<StreamingChunkedDecompressor>> StreamingChunkedDecompressor::Create(
DecompressorCreatorConnector& connector, const chunked_compression::SeekTable& seek_table,
StreamingChunkedDecompressor::StreamCallback stream_callback) {
ZX_DEBUG_ASSERT(stream_callback != nullptr);
zx_status_t status;
fzl::OwnedVmoMapper decompression_buff;
zx::vmo compression_buff;
if (seek_table.Entries().empty()) {
return zx::error(ZX_ERR_INVALID_ARGS);
}
using chunked_compression::SeekTableEntry;
SeekTableEntry* largest_decompressed_entry =
std::max_element(seek_table.Entries().begin(), seek_table.Entries().end(),
[](const SeekTableEntry& a, const SeekTableEntry& b) {
return a.decompressed_size < b.decompressed_size;
});
status = decompression_buff.CreateAndMap(largest_decompressed_entry->decompressed_size,
"blobfs-write-decomp-buff");
if (status != ZX_OK) {
return zx::error(status);
}
status = zx::vmo::create(seek_table.CompressedSize(), 0, &compression_buff);
if (status != ZX_OK) {
return zx::error(status);
}
zx::result client_or =
ExternalDecompressorClient::Create(&connector, decompression_buff.vmo(), compression_buff);
if (client_or.is_error()) {
return client_or.take_error();
}
std::unique_ptr<StreamingChunkedDecompressor> out(new StreamingChunkedDecompressor(
seek_table, std::move(client_or.value()), std::move(decompression_buff),
std::move(compression_buff), std::move(stream_callback)));
return zx::ok(std::move(out));
}
StreamingChunkedDecompressor::StreamingChunkedDecompressor(
const chunked_compression::SeekTable& seek_table,
std::unique_ptr<ExternalDecompressorClient> decompressor_client,
fzl::OwnedVmoMapper decompression_buff, zx::vmo compression_buff,
StreamingChunkedDecompressor::StreamCallback stream_callback)
: seek_table_(seek_table),
decompressor_client_(std::move(decompressor_client)),
decompressor_(decompressor_client_.get(), CompressionAlgorithm::kChunked),
decompression_buff_(std::move(decompression_buff)),
compression_buff_(std::move(compression_buff)),
stream_callback_(std::move(stream_callback)) {}
zx::result<> StreamingChunkedDecompressor::Update(cpp20::span<const uint8_t> data) {
if (compressed_bytes_ + data.size_bytes() > seek_table_.CompressedSize()) {
return zx::error(ZX_ERR_OUT_OF_RANGE);
}
if (curr_entry_ == seek_table_.Entries().end()) {
return zx::error(ZX_ERR_OUT_OF_RANGE);
}
compression_buff_.write(data.data(), compressed_bytes_, data.size_bytes());
compressed_bytes_ += data.size_bytes();
// If we don't have enough bytes to decode the next seek table entry, wait for more data.
if (compressed_bytes_ < curr_entry_->compressed_offset + curr_entry_->compressed_size) {
return zx::ok();
}
while (curr_entry_ != seek_table_.Entries().end() &&
compressed_bytes_ >= curr_entry_->compressed_offset + curr_entry_->compressed_size) {
zx_status_t status =
decompressor_.DecompressRange(curr_entry_->compressed_offset, curr_entry_->compressed_size,
curr_entry_->decompressed_size);
if (status != ZX_OK) {
return zx::error(status);
}
zx::result result = stream_callback_(
{static_cast<const uint8_t*>(decompression_buff_.start()), curr_entry_->decompressed_size});
if (result.is_error()) {
return result.take_error();
}
++curr_entry_;
}
// Decommit unused pages from compressed buffer VMO.
if (curr_entry_ != seek_table_.Entries().end()) {
// We have more seek table entries to process, decommit all pages behind the current entry.
zx_status_t status = compression_buff_.op_range(
ZX_VMO_OP_DECOMMIT, 0, fbl::round_down(curr_entry_->compressed_offset, kSystemPageSize),
nullptr, 0);
if (status != ZX_OK) {
return zx::error(status);
}
} else {
// We processed all seek table entries, we can just destroy the buffer VMOs.
compression_buff_.reset();
decompression_buff_.Reset();
}
return zx::ok();
}
} // namespace blobfs