// blob: 338a7427b1a34b02705a24be63cd82f49798ecae [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#pragma once
#include <queue> // TODO(ctiller): switch to a short queue (inlined 1-2 elems, linked list)
#include "ack_frame.h"
#include "internal_list.h"
#include "linearizer.h"
#include "packet_protocol.h"
#include "receive_mode.h"
#include "reliability_and_ordering.h"
#include "router.h"
#include "seq_num.h"
#include "sink.h"
#include "slice.h"
#include "timer.h"
#include "trace.h"
namespace overnet {
// One fragment of a stream message: either a Chunk of payload data, or a
// Status that aborts a message (MessageAbort) or ends the stream (StreamEnd).
//
// The payload is stored in a hand-managed union. type_ records which union
// member is currently alive, so construction, move and destruction all
// dispatch on it. The class is move-only.
class MessageFragment {
 public:
  // Discriminator for the payload union.
  enum class Type : uint8_t { Chunk = 0, MessageAbort = 1, StreamEnd = 2 };

  MessageFragment(const MessageFragment&) = delete;
  MessageFragment& operator=(const MessageFragment&) = delete;

  // Builds a Chunk fragment for message id |message|, which must be non-zero.
  MessageFragment(uint64_t message, Chunk chunk)
      : message_(message), type_(Type::Chunk) {
    assert(message > 0);
    new (&payload_.chunk) Chunk(std::move(chunk));
  }

  // Move-constructs whichever payload member |other| currently holds.
  MessageFragment(MessageFragment&& other)
      : message_(other.message_), type_(other.type_) {
    switch (type_) {
      case Type::Chunk:
        new (&payload_.chunk) Chunk(std::move(other.payload_.chunk));
        break;
      case Type::MessageAbort:
      case Type::StreamEnd:
        new (&payload_.status) Status(std::move(other.payload_.status));
        break;
    }
  }

  // Move assignment. Afterwards *this carries |other|'s message id, type and
  // payload.
  MessageFragment& operator=(MessageFragment&& other) {
    if (this == &other) {
      // Self-assignment: nothing to transfer; avoid self-moving the payload.
      return *this;
    }
    if (type_ != other.type_) {
      // The live union member changes, so tear down and rebuild in place.
      // The move constructor also copies message_ and type_.
      this->~MessageFragment();
      new (this) MessageFragment(std::move(other));
    } else {
      // Same live member: assign it directly. The message id must be carried
      // over as well, otherwise the fragment would keep its stale id.
      message_ = other.message_;
      switch (type_) {
        case Type::Chunk:
          payload_.chunk = std::move(other.payload_.chunk);
          break;
        case Type::MessageAbort:
        case Type::StreamEnd:
          payload_.status = std::move(other.payload_.status);
          break;
      }
    }
    return *this;
  }

  // Destroys whichever payload member is alive (the union's own destructor
  // is a no-op).
  ~MessageFragment() {
    switch (type_) {
      case Type::Chunk:
        payload_.chunk.~Chunk();
        break;
      case Type::MessageAbort:
      case Type::StreamEnd:
        payload_.status.~Status();
        break;
    }
  }

  // Builds a MessageAbort fragment for |message_id| carrying |status|.
  static MessageFragment Abort(uint64_t message_id, Status status) {
    return MessageFragment(message_id, Type::MessageAbort, std::move(status));
  }

  // Builds a StreamEnd fragment carrying |last_message_id| and |status|.
  static MessageFragment EndOfStream(uint64_t last_message_id, Status status) {
    return MessageFragment(last_message_id, Type::StreamEnd, std::move(status));
  }

  // Serializes this fragment to a Slice. Defined out of line.
  // NOTE(review): the meaning of |desired_prefix| is not visible in this
  // header - confirm against the definition.
  Slice Write(uint64_t desired_prefix) const;

  // Parses |incoming| into a fragment, or returns an error status. Defined
  // out of line.
  static StatusOr<MessageFragment> Parse(Slice incoming);

  Type type() const { return type_; }
  uint64_t message() const { return message_; }

  // Payload accessors. Each asserts that the matching union member is alive.
  const Chunk& chunk() const {
    assert(type_ == Type::Chunk);
    return payload_.chunk;
  }
  Chunk* mutable_chunk() {
    assert(type_ == Type::Chunk);
    return &payload_.chunk;
  }
  const Status& status() const {
    assert(type_ == Type::MessageAbort || type_ == Type::StreamEnd);
    return payload_.status;
  }

 private:
  // Bit masks over a single flags byte: the low nibble is a type field, the
  // top bit marks end-of-message, and everything else is reserved.
  // NOTE(review): presumably consumed by Write()/Parse() - confirm.
  static constexpr uint8_t kFlagEndOfMessage = 0x80;
  static constexpr uint8_t kFlagTypeMask = 0x0f;
  static constexpr uint8_t kReservedFlags =
      static_cast<uint8_t>(~(kFlagTypeMask | kFlagEndOfMessage));

  // Shared constructor for the two status-carrying fragment types; used by
  // Abort() and EndOfStream().
  MessageFragment(uint64_t message, Type type, Status status)
      : message_(message), type_(type) {
    assert(type == Type::MessageAbort || type == Type::StreamEnd);
    assert(message != 0);
    new (&payload_.status) Status(std::move(status));
  }

  uint64_t message_;

  // Storage for the payload. The empty constructor/destructor leave member
  // lifetime entirely to MessageFragment.
  union Payload {
    Payload() {}
    ~Payload() {}
    Chunk chunk;
    Status status;
  };

  Type type_;
  Payload payload_;
};
// A datagram stream to a single peer. It privately implements
// Router::StreamHandler (inbound messages, close requests) and
// PacketProtocol::PacketSender (outbound packets). Neither copyable nor
// movable.
class DatagramStream : private Router::StreamHandler,
                       private PacketProtocol::PacketSender {
  // State for one inbound message. Every operation is forwarded to the
  // wrapped Linearizer.
  class IncomingMessage {
   public:
    // TODO(ctiller): 1MB stubbed in for the moment until something better
    // The trace sink is decorated so each log line is prefixed with this
    // object's address.
    explicit IncomingMessage(TraceSink trace_sink)
        : linearizer_(1024 * 1024,
                      trace_sink.Decorate([this](const std::string& msg) {
                        std::ostringstream decorated;
                        decorated << "Msg[" << this << "] " << msg;
                        return decorated.str();
                      })) {}

    // Forwards to Linearizer::Pull, handing over ownership of |done|.
    void Pull(StatusOrCallback<Optional<Slice>>&& done) {
      linearizer_.Pull(std::move(done));
    }

    // Forwards to Linearizer::PullAll, handing over ownership of |done|.
    void PullAll(StatusOrCallback<std::vector<Slice>>&& done) {
      linearizer_.PullAll(std::move(done));
    }

    // Forwards |chunk| to Linearizer::Push.
    void Push(Chunk&& chunk) { linearizer_.Push(std::move(chunk)); }

    // Forwards to Linearizer::Close with |status|.
    void Close(const Status& status) { linearizer_.Close(status); }

    // Intrusive hook used by DatagramStream::unclaimed_messages_.
    InternalListNode<IncomingMessage> incoming_link;

   private:
    Linearizer linearizer_;
  };

 public:
  DatagramStream(Router* router, TraceSink sink, NodeId peer,
                 ReliabilityAndOrdering reliability_and_ordering,
                 StreamId stream_id);
  ~DatagramStream();

  DatagramStream(const DatagramStream&) = delete;
  DatagramStream& operator=(const DatagramStream&) = delete;
  DatagramStream(DatagramStream&&) = delete;
  DatagramStream& operator=(DatagramStream&&) = delete;

  // Closes the stream with |status|; |quiesced| is the completion callback.
  // Defined out of line.
  void Close(const Status& status, Callback<void> quiesced);

  // StreamHandler override: closes with an OK status.
  void Close(Callback<void> quiesced) override {
    Close(Status::Ok(), std::move(quiesced));
  }

  // Sink through which Slices are pushed onto this stream.
  // NOTE(review): each SendOp carries a single message_id_, so presumably one
  // SendOp corresponds to one outgoing message - confirm in the .cc file.
  class SendOp final : public Sink<Slice> {
   public:
    SendOp(DatagramStream* stream, uint64_t payload_length);

    void Push(Slice item) override;
    void Close(const Status& status, Callback<void> quiesced) override;

    // Convenience overload: closes with an OK status.
    void Close(Callback<void> quiesced) {
      Close(Status::Ok(), std::move(quiesced));
    }

   private:
    void SetClosed(const Status& status);
    void SendChunk(Chunk chunk);
    void SendError(const Status& status);
    void CompleteReliable(const Status& status, Chunk chunk);
    void CompleteUnreliable(const Status& status);

    // Registers one more in-flight operation.
    void BeginOp() { ++outstanding_ops_; }

    // Retires one in-flight operation. When the last one finishes, invokes
    // the quiesced callback if one has been set.
    void EndOp() {
      if (--outstanding_ops_ != 0) {
        return;
      }
      if (quiesced_.empty()) {
        return;
      }
      quiesced_();
    }

    PacketProtocol::SendCallback MakeAckCallback(const Chunk& chunk);

    // Copyable handle that keeps a SendOp's outstanding-operation count
    // raised for as long as the handle exists: BeginOp() on construction and
    // copy, EndOp() on destruction.
    class OutstandingOp {
     public:
      OutstandingOp() = delete;
      OutstandingOp(SendOp* op) : op_(op) { op_->BeginOp(); }
      OutstandingOp(const OutstandingOp& other) : op_(other.op_) {
        op_->BeginOp();
      }

      // Copy-and-swap: the by-value parameter already holds a count on the
      // new SendOp; after the swap its destructor releases the old one.
      OutstandingOp& operator=(OutstandingOp other) {
        Swap(&other);
        return *this;
      }

      void Swap(OutstandingOp* other) { std::swap(op_, other->op_); }

      ~OutstandingOp() { op_->EndOp(); }

      SendOp* operator->() const { return op_; }

     private:
      // TODO(ctiller): perhaps consider packing these into one word.
      SendOp* op_;
    };

    enum class State : uint8_t {
      OPEN,
      CLOSED_OK,
      CLOSED_WITH_ERROR,
    };

    DatagramStream* const stream_;
    const TraceSink trace_sink_;
    // Number of operations currently between BeginOp() and EndOp().
    int outstanding_ops_ = 0;
    const uint64_t payload_length_;
    const uint64_t message_id_;
    const uint8_t message_id_length_;
    State state_ = State::OPEN;
    uint64_t push_offset_ = 0;
    // Invoked by EndOp() once outstanding_ops_ reaches zero (if non-empty).
    Callback<void> quiesced_;
  };

  // Source from which the Slices of an incoming message are pulled.
  class ReceiveOp final : public Source<Slice> {
    friend class DatagramStream;

   public:
    explicit ReceiveOp(DatagramStream* stream);

    void Pull(StatusOrCallback<Optional<Slice>> ready) override;
    void PullAll(StatusOrCallback<std::vector<Slice>> ready) override;
    void Close(const Status& status) override;

   private:
    const TraceSink trace_sink_;
    // Null until a message is attached.
    // NOTE(review): presumably set by DatagramStream (a friend) - confirm.
    IncomingMessage* incoming_message_ = nullptr;
    StatusOrCallback<Optional<Slice>> pending_pull_;
    StatusOrCallback<std::vector<Slice>> pending_pull_all_;
    Optional<Status> pending_close_reason_;
    // Intrusive hook used by DatagramStream::unclaimed_receives_.
    InternalListNode<ReceiveOp> waiting_link_;
  };

  NodeId peer() const { return peer_; }

 private:
  // Router::StreamHandler / PacketProtocol::PacketSender overrides.
  void HandleMessage(SeqNum seq, TimeStamp received, Slice data) override final;
  void SendPacket(SeqNum seq, LazySlice data,
                  Callback<void> done) override final;

  void SendCloseAndFlushQuiesced(const Status& status, int retry_number);
  void FinishClosing();
  void MaybeContinueReceive();

  Timer* const timer_;
  Router* const router_;
  const TraceSink trace_sink_;
  const NodeId peer_;
  const StreamId stream_id_;
  const ReliabilityAndOrdering reliability_and_ordering_;
  // Starts at 1; MessageFragment asserts that message ids are non-zero.
  uint64_t next_message_id_ = 1;
  uint64_t largest_incoming_message_id_seen_ = 0;
  receive_mode::ParameterizedReceiveMode receive_mode_;
  PacketProtocol packet_protocol_;

  // Stages of the stream's close sequence.
  enum class CloseState : uint8_t {
    OPEN,
    LOCAL_CLOSE_REQUESTED,
    REMOTE_CLOSED,
    CLOSING_PROTOCOL,
    CLOSED,
  };

  // Streams the enumerator's name; values outside the enum print "UNKNOWN".
  friend inline std::ostream& operator<<(std::ostream& out, CloseState state) {
    const char* label = "UNKNOWN";
    switch (state) {
      case CloseState::OPEN:
        label = "OPEN";
        break;
      case CloseState::LOCAL_CLOSE_REQUESTED:
        label = "LOCAL_CLOSE_REQUESTED";
        break;
      case CloseState::REMOTE_CLOSED:
        label = "REMOTE_CLOSED";
        break;
      case CloseState::CLOSING_PROTOCOL:
        label = "CLOSING_PROTOCOL";
        break;
      case CloseState::CLOSED:
        label = "CLOSED";
        break;
    }
    return out << label;
  }

  CloseState close_state_ = CloseState::OPEN;

  // Parameters of a requested close. Two requests compare equal when they
  // name the same last message id and their statuses have the same code;
  // other Status details are not compared.
  struct RequestedClose {
    uint64_t last_message_id;
    Status status;

    bool operator==(const RequestedClose& other) const {
      if (last_message_id != other.last_message_id) {
        return false;
      }
      return status.code() == other.status.code();
    }
    bool operator!=(const RequestedClose& other) const {
      return !(*this == other);
    }
  };
  Optional<RequestedClose> requested_close_;

  // TODO(ctiller): a custom allocator here would be worthwhile, especially one
  // that could remove allocations for the common case of few entries.
  std::unordered_map<uint64_t, IncomingMessage> messages_;
  InternalList<IncomingMessage, &IncomingMessage::incoming_link>
      unclaimed_messages_;
  InternalList<ReceiveOp, &ReceiveOp::waiting_link_> unclaimed_receives_;
  std::vector<Callback<void>> on_quiesced_;
};
} // namespace overnet