blob: 78df48586c8d37ac0b71059fd57a6038f4139281 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chunk_input_stream.h"
#include <algorithm>
ChunkInputStream::ChunkInputStream(size_t chunk_size,
TimestampExtrapolator&& timestamp_extrapolator,
InputBlockProcessor&& input_block_processor)
: chunk_size_(chunk_size),
timestamp_extrapolator_(std::move(timestamp_extrapolator)),
input_block_processor_(std::move(input_block_processor)) {
ZX_DEBUG_ASSERT_MSG(chunk_size_ != 0, "A chunk size of zero will never make progress.");
ZX_DEBUG_ASSERT(input_block_processor_);
scratch_block_.data.resize(chunk_size_);
}
ChunkInputStream::Status ChunkInputStream::ProcessInputPacket(
const CodecPacket* input_codec_packet) {
ZX_DEBUG_ASSERT(input_codec_packet);
ZX_DEBUG_ASSERT_MSG(!early_terminated_, "This stream was terminated by the user.");
if (input_codec_packet->has_timestamp_ish()) {
timestamp_extrapolator_.Inform(BytesSeen(), input_codec_packet->timestamp_ish());
}
InputPacket input_packet = {.packet = input_codec_packet};
if (!scratch_block_.empty()) {
AppendToScratchBlock(&input_packet);
}
if (scratch_block_.full()) {
Status status;
if ((status = EmitBlock(scratch_block_.data.data(), chunk_size_)) != kOk) {
return status;
}
scratch_block_.len = 0;
}
if (input_packet.bytes_unread() > 0) {
ZX_DEBUG_ASSERT_MSG(!next_output_timestamp_,
"Any stashed timestamp should have been used when "
"emitting the scratch block.");
auto [timestamp, status] = ExtrapolateTimestamp();
if (status != kOk) {
return status;
}
next_output_timestamp_ = timestamp;
}
while (input_packet.bytes_unread() >= chunk_size_) {
Status status;
if ((status = EmitBlock(input_packet.data_at_offset(), chunk_size_)) != kOk) {
return status;
}
input_packet.offset += chunk_size_;
}
AppendToScratchBlock(&input_packet);
ZX_DEBUG_ASSERT_MSG(input_packet.bytes_unread() == 0,
"We should leave no bytes unread in the input packet.");
return kOk;
}
ChunkInputStream::Status ChunkInputStream::Flush() {
ZX_DEBUG_ASSERT_MSG(!early_terminated_, "This stream was terminated by the user.");
if (scratch_block_.empty_bytes_left() > 0) {
memset(scratch_block_.empty_start(), 0, scratch_block_.empty_bytes_left());
}
auto result = EmitBlock(scratch_block_.data.data(), scratch_block_.len,
/*is_end_of_stream=*/true);
scratch_block_.len = 0;
return result;
}
void ChunkInputStream::AppendToScratchBlock(InputPacket* input_packet) {
ZX_DEBUG_ASSERT(input_packet);
const size_t n = std::min(input_packet->bytes_unread(), scratch_block_.empty_bytes_left());
if (n == 0) {
return;
}
memcpy(scratch_block_.empty_start(), input_packet->data_at_offset(), n);
input_packet->offset += n;
scratch_block_.len += n;
}
ChunkInputStream::Status ChunkInputStream::EmitBlock(const uint8_t* data,
const size_t non_padding_len,
const bool is_end_of_stream) {
stream_index_ += chunk_size_;
std::optional<uint64_t> timestamp_ish;
std::swap(timestamp_ish, next_output_timestamp_);
std::optional<uint64_t> flush_timestamp;
if (is_end_of_stream) {
auto [timestamp, status] = ExtrapolateTimestamp();
if (status != kOk) {
return status;
}
flush_timestamp = timestamp;
}
InputBlock input_block;
input_block.data = data;
input_block.len = chunk_size_;
input_block.non_padding_len = non_padding_len;
input_block.is_end_of_stream = is_end_of_stream;
input_block.timestamp_ish = timestamp_ish;
input_block.flush_timestamp_ish = flush_timestamp;
if (input_block_processor_(input_block) == kTerminate) {
early_terminated_ = true;
return kUserTerminated;
}
return kOk;
}
std::pair<std::optional<uint64_t>, ChunkInputStream::Status>
ChunkInputStream::ExtrapolateTimestamp() {
std::optional<uint64_t> timestamp;
if (timestamp_extrapolator_.has_information() &&
!(timestamp = timestamp_extrapolator_.Extrapolate(stream_index_))) {
return {std::nullopt, kExtrapolationFailedWithoutTimebase};
}
return {timestamp, kOk};
}
size_t ChunkInputStream::BytesSeen() const { return stream_index_ + scratch_block_.len; }