// blob: e1ac5867c2962a14c661dcdce8177b340ad43a92 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#pragma once
#include <cassert>
#include <cstdint>
#include <map>
#include <ostream>
#include <utility>
#include <vector>
#include "optional.h"
#include "sink.h"
#include "slice.h"
#include "trace.h"
namespace overnet {
// Accepts Chunks through the Sink<Chunk> interface and hands out Slices
// through the Source<Slice> interface.
// NOTE(review): the member functions are defined outside this header; judging
// by the names, chunks may arrive out of order and are re-assembled into an
// in-order stream of slices -- confirm against the implementation.
class Linearizer final : public Sink<Chunk>, public Source<Slice> {
 public:
  // `max_buffer` and `trace_sink` are stored in max_buffer_ / trace_sink_.
  explicit Linearizer(uint64_t max_buffer, TraceSink trace_sink);
  ~Linearizer();

  // Not copyable. (The copy operations are already implicitly deleted because
  // ReadData is a union with non-trivially-copyable members; spelling it out
  // gives a clearer diagnostic.)
  Linearizer(const Linearizer&) = delete;
  Linearizer& operator=(const Linearizer&) = delete;

  // Override Sink<Chunk>.
  void Push(Chunk chunk) override;
  // Sink-side close: takes the final status plus a `quiesced` callback.
  // NOTE(review): presumably `quiesced` fires once the sink side has finished
  // with the object -- confirm in the implementation.
  void Close(const Status& status, Callback<void> quiesced) override;

  // Override Source<Slice>.
  // Requests a single slice; the result is delivered through `ready`.
  void Pull(StatusOrCallback<Optional<Slice>> ready) override;
  // Requests a vector of slices; the result is delivered through `ready`.
  void PullAll(StatusOrCallback<std::vector<Slice>> ready) override;
  // Source-side close.
  void Close(const Status& status) override;

 private:
  void IntegratePush(Chunk chunk);
  // Const consistency check of the internal state (defined out of line).
  void ValidateInternals() const;

  // Buffer limit supplied at construction.
  // NOTE(review): presumably caps the amount of data held in pending_push_.
  const uint64_t max_buffer_;
  const TraceSink trace_sink_;
  // NOTE(review): presumably the stream position of the next slice to be
  // handed to the reader -- confirm.
  uint64_t offset_ = 0;
  // Unset until known.
  // NOTE(review): presumably the total stream length -- confirm.
  Optional<uint64_t> length_;
  // Slices keyed by a uint64_t.
  // NOTE(review): presumably the key is the slice's stream offset -- confirm.
  std::map<uint64_t, Slice> pending_push_;

  // Discriminator for read_data_: which kind of read (if any) is outstanding.
  enum class ReadMode {
    Closed,
    Idle,
    ReadSlice,
    ReadAll,
  };

  // Streams the enumerator name of `m` (for logging/debugging).
  inline friend std::ostream& operator<<(std::ostream& out, ReadMode m) {
    // No `default:` label on purpose, so the compiler can still warn when a
    // new enumerator is added but not handled here.
    switch (m) {
      case ReadMode::Closed:
        return out << "Closed";
      case ReadMode::Idle:
        return out << "Idle";
      case ReadMode::ReadSlice:
        return out << "ReadSlice";
      case ReadMode::ReadAll:
        return out << "ReadAll";
    }
    // Reached only if `m` holds a value that is not a declared enumerator.
    // Without this return, control would flow off the end of a
    // value-returning function, which is undefined behaviour.
    return out << "Unknown(" << static_cast<int>(m) << ")";
  }

  // Payload types, one per ReadMode that carries data.
  struct Closed {
    Status status;
  };
  struct ReadSlice {
    StatusOrCallback<Optional<Slice>> done;
  };
  struct ReadAll {
    std::vector<Slice> building;
    StatusOrCallback<std::vector<Slice>> done;
  };

  // Raw storage for the payload of the current read mode. The constructor
  // and destructor are intentionally empty, so no member is ever constructed
  // or destroyed automatically.
  // NOTE(review): presumably the transition helpers below construct/destroy
  // the member matching read_mode_ -- confirm in the implementation.
  union ReadData {
    ReadData() {}
    ~ReadData() {}
    Closed closed;
    ReadSlice read_slice;
    ReadAll read_all;
  };

  ReadMode read_mode_ = ReadMode::Idle;
  ReadData read_data_;

  // Read-mode transition helpers; each name spells out the source and
  // destination mode. The *ToIdle variants return the payload struct of the
  // mode being left.
  void IdleToClosed(const Status& status);
  void IdleToReadSlice(StatusOrCallback<Optional<Slice>> done);
  void IdleToReadAll(StatusOrCallback<std::vector<Slice>> done);
  ReadSlice ReadSliceToIdle();
  ReadAll ReadAllToIdle();
  void ContinueReadAll();
};
// Self-deleting wrapper around a Linearizer.
// Expects one Close() from Sink, and one from Source, then deletes itself.
class ReffedLinearizer final : public Sink<Chunk>, public Source<Slice> {
 public:
  // Allocates an instance with `new`. Callers never delete it themselves:
  // the object frees itself inside Close() once both expected closes have
  // arrived.
  static ReffedLinearizer* Make(uint64_t max_buffer, TraceSink trace_sink) {
    // `trace_sink` is a by-value parameter, so it can be moved onwards.
    return new ReffedLinearizer(max_buffer, std::move(trace_sink));
  }

  // Override Sink<Chunk>.
  void Push(Chunk chunk) override { impl_.Push(std::move(chunk)); }

  // Sink-side close. Remembers `quiesced` (which must be non-empty and may be
  // supplied only once) and then runs the shared Close(status) path, which
  // drops one reference. `this` may already be deleted when this returns.
  void Close(const Status& status, Callback<void> quiesced) override {
    assert(quiesced_.empty());
    assert(!quiesced.empty());
    quiesced_ = std::move(quiesced);
    Close(status);
  }

  // Override Source<Slice>.
  // NOTE(review): PullAll() is not overridden here, so whatever
  // Source<Slice> provides is used rather than impl_.PullAll() -- confirm
  // that this is intended.
  void Pull(StatusOrCallback<Optional<Slice>> ready) override {
    impl_.Pull(std::move(ready));
  }

  // Source-side close, also reached from the sink-side overload above.
  // Forwards the status to the wrapped Linearizer and drops one reference;
  // when the last reference goes, invokes the stored quiesced callback and
  // then deletes the object. No member may be touched after that point.
  void Close(const Status& status) override {
    impl_.Close(status);
    if (--refs_ == 0) {
      // Reaching zero without a sink-side Close() means the "one Close from
      // each side" contract was broken and quiesced_ was never set; catch
      // that here instead of invoking an empty callback.
      assert(!quiesced_.empty());
      quiesced_();
      delete this;
    }
  }

 private:
  // Private so that instances can only be created on the heap via Make().
  explicit ReffedLinearizer(uint64_t max_buffer, TraceSink trace_sink)
      : impl_(max_buffer, std::move(trace_sink)) {}

  // Outstanding Close() calls still expected: one from the Sink side and one
  // from the Source side.
  int refs_ = 2;
  Linearizer impl_;
  // Set by the sink-side Close(); invoked just before self-deletion.
  Callback<void> quiesced_;
};
} // namespace overnet