blob: 6b04db478692ae98488807c2f8898c9def2e59af [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#pragma once
#include <map>
#include "optional.h"
#include "sink.h"
#include "slice.h"
namespace overnet {
class Linearizer final : public Sink<Chunk>, public Source<Slice> {
public:
explicit Linearizer(uint64_t max_buffer);
// Override Sink<Chunk>.
void Push(Chunk chunk, StatusCallback done) override;
void Close(const Status& status, Callback<void> quiesced) override;
// Override Source<Slice>.
void Pull(StatusOrCallback<Optional<Slice>> ready) override;
void Close(const Status& status) override;
private:
void IntegratePush(Chunk chunk, StatusCallback done);
void ValidateInternals() const;
const uint64_t max_buffer_;
uint64_t offset_ = 0;
Optional<uint64_t> length_;
std::map<uint64_t, std::pair<Slice, StatusCallback>> pending_push_;
StatusOrCallback<Optional<Slice>> ready_;
bool closed_ = false;
Status closed_error_{Status::Ok()};
};
// Expects one Close() from Sink, and one from Source, then deletes itself.
class ReffedLinearizer final : public Sink<Chunk>, public Source<Slice> {
public:
static ReffedLinearizer* Make(uint64_t max_buffer) {
return new ReffedLinearizer(max_buffer);
}
// Override Sink<Chunk> and Source<Slice>.
// Override Sink<Chunk>.
void Push(Chunk chunk, StatusCallback done) override {
impl_.Push(std::move(chunk), std::move(done));
}
void Close(const Status& status, Callback<void> quiesced) override {
assert(quiesced_.empty());
assert(!quiesced.empty());
quiesced_ = std::move(quiesced);
Close(status);
}
// Override Souce<Slice>.
void Pull(StatusOrCallback<Optional<Slice>> ready) override {
impl_.Pull(std::move(ready));
}
void Close(const Status& status) override {
impl_.Close(status);
if (--refs_ == 0) {
quiesced_();
delete this;
}
}
private:
explicit ReffedLinearizer(uint64_t max_buffer) : impl_(max_buffer) {}
int refs_ = 2;
Linearizer impl_;
Callback<void> quiesced_;
};
} // namespace overnet