blob: 23c859d00b2899a77bd41f8ed9528a09d96236ca [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#pragma once
#include <deque>
#include <map>
#include <queue>
#include "src/connectivity/overnet/lib/environment/timer.h"
#include "src/connectivity/overnet/lib/environment/trace.h"
#include "src/connectivity/overnet/lib/labels/seq_num.h"
#include "src/connectivity/overnet/lib/packet_protocol/bbr.h"
#include "src/connectivity/overnet/lib/protocol/ack_frame.h"
#include "src/connectivity/overnet/lib/protocol/varint.h"
#include "src/connectivity/overnet/lib/vocabulary/callback.h"
#include "src/connectivity/overnet/lib/vocabulary/lazy_slice.h"
#include "src/connectivity/overnet/lib/vocabulary/once_fn.h"
#include "src/connectivity/overnet/lib/vocabulary/optional.h"
#include "src/connectivity/overnet/lib/vocabulary/slice.h"
#include "src/connectivity/overnet/lib/vocabulary/status.h"
// Enable for indepth refcount debugging for packet protocol ops.
//#define OVERNET_TRACE_PACKET_PROTOCOL_OPS
namespace overnet {
class PacketProtocol {
public:
static constexpr inline auto kModule = Module::PACKET_PROTOCOL;
static constexpr size_t kMaxUnackedReceives = 3;
/////////////////////////////////////////////////////////////////////////////
// Collaborating types.
public:
// PacketSender defines how a packet protocol sends it's data.
class PacketSender {
public:
virtual void SendPacket(SeqNum seq, LazySlice send) = 0;
};
// Codec describes the transformation to apply to *payload* bytes that are
// sent.
class Codec {
public:
// How much will this codec expand a message? (maximums only).
const Border border;
virtual StatusOr<Slice> Encode(uint64_t seq_idx, Slice src) const = 0;
virtual StatusOr<Slice> Decode(uint64_t seq_idx, Slice src) const = 0;
protected:
explicit Codec(Border border) : border(border) {}
};
class SendRequestHdl;
// SendRequest is a request to send one message, and an ack function for when
// it's received.
class SendRequest {
public:
SendRequest(bool must_send_ack = false) : must_send_ack_(must_send_ack) {}
// Called at most once, and always before Ack.
virtual Slice GenerateBytes(LazySliceArgs args) = 0;
// Called exactly once.
virtual void Ack(const Status& status) = 0;
bool must_send_ack() const { return must_send_ack_; }
template <class GB, class A>
static SendRequestHdl FromFunctors(GB gb, A a);
private:
const bool must_send_ack_;
};
// A smart pointer around SendRequest. The held request must stay valid until
// Ack() is called.
class SendRequestHdl {
public:
explicit SendRequestHdl(SendRequest* req) : req_(req) {}
SendRequestHdl() : SendRequestHdl(nullptr) {}
SendRequestHdl(const SendRequestHdl&) = delete;
SendRequestHdl& operator=(const SendRequestHdl&) = delete;
SendRequestHdl(SendRequestHdl&& other) : req_(other.req_) {
other.req_ = nullptr;
}
SendRequestHdl& operator=(SendRequestHdl&& other) {
this->~SendRequestHdl();
req_ = other.req_;
other.req_ = nullptr;
return *this;
}
~SendRequestHdl() {
if (req_) {
req_->Ack(Status::Cancelled());
}
}
bool empty() const { return req_ == nullptr; }
Slice GenerateBytes(LazySliceArgs args) {
return req_->GenerateBytes(args);
}
void Ack(const Status& status) {
SendRequest* req = req_;
req_ = nullptr;
req->Ack(status);
}
SendRequest* borrow() { return req_; }
private:
SendRequest* req_;
};
// A smart pointer to prevent a PacketProtocol from quiescing until it's
// either explicitly dropped or destroyed.
class ProtocolRef {
public:
ProtocolRef(PacketProtocol* protocol) : protocol_(protocol) {
protocol_->refs_++;
}
~ProtocolRef() {
if (protocol_) {
Drop();
}
}
void Drop() {
assert(protocol_ != nullptr);
auto protocol = protocol_;
protocol_ = nullptr;
if (0 == --protocol->refs_) {
auto cb = std::move(protocol->quiesce_);
cb();
}
}
ProtocolRef(const ProtocolRef& other) = delete;
ProtocolRef& operator=(const ProtocolRef& other) = delete;
ProtocolRef(ProtocolRef&& other) : protocol_(other.protocol_) {
other.protocol_ = nullptr;
}
ProtocolRef& operator=(ProtocolRef&& other) {
std::swap(protocol_, other.protocol_);
return *this;
}
PacketProtocol* operator->() const { return protocol_; }
PacketProtocol* get() const { return protocol_; }
bool has_value() const { return protocol_ != nullptr; }
private:
PacketProtocol* protocol_;
};
/////////////////////////////////////////////////////////////////////////////
// Internal interfaces.
private:
enum class ProcessMessageResult : uint8_t {
NOT_PROCESSED,
NACK,
OPTIONAL_ACK,
ACK,
ACK_URGENTLY,
};
friend std::ostream& operator<<(std::ostream& out, ProcessMessageResult pmr) {
switch (pmr) {
case ProcessMessageResult::NOT_PROCESSED:
return out << "NOT_PROCESSED";
case ProcessMessageResult::NACK:
return out << "NACK";
case ProcessMessageResult::ACK:
return out << "ACK";
case ProcessMessageResult::ACK_URGENTLY:
return out << "ACK_URGENTLY";
case ProcessMessageResult::OPTIONAL_ACK:
return out << "OPTIONAL_ACK";
}
abort();
}
class PacketSend;
// OutstandingMessages tracks messages that are sent but not yet acknowledged.
class OutstandingMessages {
public:
OutstandingMessages(PacketProtocol* protocol);
~OutstandingMessages() { NackAll(); }
void Send(BBR::TransmitRequest bbr_request, SendRequestHdl request);
Status ValidateAck(const AckFrame& ack) const;
void ProcessValidAck(AckFrame ack, TimeStamp received);
void ReceivedPacket() { ScheduleRetransmit(); }
private:
friend class PacketSend;
Slice GeneratePacket(BBR::TransmitRequest bbr_request, uint64_t seq_idx,
LazySliceArgs args);
void CancelPacket(uint64_t seq_idx);
void FinishedSending();
void ScheduleRetransmit();
Optional<TimeStamp> RetransmitDeadline();
void CheckRetransmit();
void NackAll();
enum class OutstandingPacketState : uint8_t {
PENDING,
SENT,
ACKED,
NACKED,
};
friend std::ostream& operator<<(std::ostream& out,
OutstandingPacketState state) {
switch (state) {
case OutstandingPacketState::PENDING:
return out << "PENDING";
case OutstandingPacketState::SENT:
return out << "SENT";
case OutstandingPacketState::ACKED:
return out << "ACKED";
case OutstandingPacketState::NACKED:
return out << "NACKED";
}
}
struct OutstandingPacket {
OutstandingPacketState state;
uint64_t max_seen_sequence_at_send;
Optional<BBR::SentPacket> bbr_sent_packet;
SendRequestHdl request;
};
// Assists processing a set of acks/nacks
class AckProcessor {
public:
AckProcessor(OutstandingMessages* outstanding, TimeDelta queue_delay)
: outstanding_(outstanding), queue_delay_(queue_delay) {}
~AckProcessor();
void Ack(uint64_t seq);
void Nack(uint64_t seq, const Status& status);
private:
OutstandingMessages* const outstanding_;
const TimeDelta queue_delay_;
BBR::Ack bbr_ack_;
};
PacketProtocol* const protocol_;
uint64_t send_tip_ = 1;
uint64_t max_outstanding_size_ = 1;
uint64_t last_sent_ack_ = 0;
std::deque<OutstandingPacket> outstanding_;
Optional<Timeout> rto_timer_;
};
// SendQueue groups together pending sends in the protocol and manages writing
// them out.
class SendQueue {
public:
SendQueue(PacketProtocol* protocol) : protocol_(protocol) {}
// Schedule this queue to be sent. Should only be used as directed.
void Schedule();
// A message was just sent: possibly schedule the next one.
void SentMessage();
// Ensure a tail loss probe is scheduled: used to force acks to be sent on
// idle connections.
void ScheduleTailLossProbe();
// Force a packet to be sent with an ack.
void ForceSendAck();
// Returns true if send request addition triggers the need for
// Schedule to be called.
[[nodiscard]] bool Add(SendRequestHdl hdl);
private:
PacketProtocol* const protocol_;
bool scheduled_ = false;
bool last_send_was_tail_loss_probe_ = false;
Optional<BBR::TransmitRequest> transmit_request_;
std::queue<SendRequestHdl> requests_;
struct ScheduledTailLossProbe {
template <class F>
ScheduledTailLossProbe(Timer* timer, TimeStamp when, F f)
: when(when), timeout(timer, when, std::move(f)) {}
TimeStamp when;
Timeout timeout;
};
Optional<ScheduledTailLossProbe> scheduled_tail_loss_probe_;
};
enum class AckUrgency { NOT_REQUIRED, SEND_SOON, SEND_IMMEDIATELY };
friend std::ostream& operator<<(std::ostream& out, AckUrgency urgency) {
switch (urgency) {
case AckUrgency::NOT_REQUIRED:
return out << "NOT_REQUIRED";
case AckUrgency::SEND_SOON:
return out << "SEND_SOON";
case AckUrgency::SEND_IMMEDIATELY:
return out << "SEND_IMMEDIATELY";
}
}
class AckSender {
public:
AckSender(PacketProtocol* protocol);
void NeedAck(AckUrgency urgency);
bool ShouldSendAck() const {
return !all_acks_acknowledged_ && sent_full_acks_.empty();
}
void AckSent(uint64_t seq_idx, bool partial);
void OnNack(uint64_t seq_idx);
void OnAck(uint64_t seq_idx);
private:
PacketProtocol* const protocol_;
std::vector<uint64_t> sent_full_acks_;
bool all_acks_acknowledged_ = true;
AckUrgency urgency_ = AckUrgency::NOT_REQUIRED;
std::string SentFullAcksString() const;
};
// ReceivedQueue tracks which packets we've received (or not), and state we
// need to acknowledge them.
class ReceivedQueue {
public:
// Return true if an ack should be sent now.
template <class F>
[[nodiscard]] AckUrgency Received(SeqNum seq_num, TimeStamp received,
F logic);
uint64_t max_seen_sequence() const;
bool CanBuildAck() const { return max_seen_sequence() > 0; }
AckFrame BuildAck(uint64_t seq_idx, TimeStamp now, uint32_t max_length,
AckSender* ack_sender);
void SetTip(uint64_t seq_idx, TimeStamp received);
private:
[[nodiscard]] bool EnsureValidReceivedPacket(uint64_t seq_idx,
TimeStamp received) {
constexpr uint64_t kMaxSkip = 65536;
if (seq_idx > received_tip_ && seq_idx - received_tip_ > kMaxSkip) {
return false;
}
while (received_tip_ + received_packets_.size() <= seq_idx) {
received_packets_.emplace_back(
ReceivedPacket{ReceiveState::UNKNOWN, received});
}
return true;
}
std::string ReceivedPacketSummary() const;
uint64_t received_tip_ = 0;
uint64_t optional_ack_run_length_ = 0;
enum class ReceiveState {
UNKNOWN,
NOT_RECEIVED,
RECEIVED,
};
friend std::ostream& operator<<(std::ostream& out, ReceiveState state) {
switch (state) {
case ReceiveState::UNKNOWN:
return out << "UNKNOWN";
case ReceiveState::NOT_RECEIVED:
return out << "NOT_RECEIVED";
case ReceiveState::RECEIVED:
return out << "RECEIVED";
}
}
// TODO(ctiller): Find a more efficient data structure.
struct ReceivedPacket {
ReceiveState state;
TimeStamp when;
};
std::deque<ReceivedPacket> received_packets_;
};
// A Transaction is created to describe a set of changes to a PacketProtocol.
// One is created in response to every Process() call, and in response to
// sends that are outside of incoming messages.
// Only one Transaction can be active at a time.
// A Transaction can process only one incoming message.
// A Transaction may process any number (including zero) sends.
class Transaction {
public:
Transaction(PacketProtocol* protocol);
~Transaction();
void Send(SendRequestHdl hdl);
void ScheduleForcedAck() { schedule_send_queue_ = true; }
void QuiesceOnCompletion(Callback<void> callback);
bool Closing() const { return quiesce_ || !protocol_->state_.has_value(); }
private:
SendQueue* send_queue();
PacketProtocol* const protocol_;
bool schedule_send_queue_ = false;
bool quiesce_ = false;
};
class PacketSend final {
public:
PacketSend(PacketProtocol* protocol, uint64_t seq_idx,
BBR::TransmitRequest bbr_request);
~PacketSend();
PacketSend(const PacketSend&) = delete;
PacketSend(PacketSend&&) = default;
Slice operator()(LazySliceArgs args);
private:
ProtocolRef protocol_;
uint64_t seq_idx_;
BBR::TransmitRequest bbr_request_;
};
/////////////////////////////////////////////////////////////////////////////
// PacketProtocol interface
public:
using RandFunc = BBR::RandFunc;
PacketProtocol(Timer* timer, RandFunc rand, PacketSender* packet_sender,
const Codec* codec, uint64_t mss);
// Request that a single message be sent.
void Send(SendRequestHdl send_request);
template <class GB, class A>
void Send(GB gb, A a) {
Send(SendRequest::FromFunctors(std::move(gb), std::move(a)));
}
class IncomingMessage {
public:
explicit IncomingMessage(Slice payload) : payload(std::move(payload)) {}
IncomingMessage(const IncomingMessage&) = delete;
IncomingMessage& operator=(const IncomingMessage&) = delete;
Slice payload;
void Nack() { nack_ = true; }
bool was_nacked() const { return nack_; }
private:
bool nack_ = false;
};
using ProcessCallback =
Callback<StatusOr<IncomingMessage*>, 4 * sizeof(void*)>;
void Process(TimeStamp received, SeqNum seq, Slice slice,
ProcessCallback handle_message);
void Close(Callback<void> quiesced);
uint32_t maximum_send_size() const { return maximum_send_size_; }
TimeDelta round_trip_time() const {
return state_.has_value() ? state_->bbr_.rtt() : TimeDelta::PositiveInf();
}
Bandwidth bottleneck_bandwidth() const {
return state_.has_value() ? state_->bbr_.bottleneck_bandwidth()
: Bandwidth::Zero();
}
static Codec* NullCodec();
/////////////////////////////////////////////////////////////////////////////
// Internal methods.
private:
TimeDelta CurrentRTT() const;
TimeDelta RetransmitDelay() const;
TimeDelta TailLossProbeDelay() const;
Slice FormatPacket(uint64_t seq_idx, SendRequest* request,
LazySliceArgs args);
ProcessMessageResult ProcessMessage(uint64_t seq_idx, Slice slice,
TimeStamp received,
ProcessCallback handle_message);
// Run closure f in a transaction (creating one if necessary)
template <class F>
auto InTransaction(F f) {
if (active_transaction_ != nullptr) {
return f(active_transaction_);
} else {
Transaction transaction(this);
return f(&transaction);
}
}
void Quiesce();
/////////////////////////////////////////////////////////////////////////////
// Internal state.
private:
const Codec* const codec_;
Timer* const timer_;
PacketSender* const packet_sender_;
Transaction* active_transaction_ = nullptr;
Callback<void> quiesce_;
const uint32_t maximum_send_size_;
uint32_t refs_ = 0;
ProtocolRef master_ref_{this};
struct OpenState {
OpenState(PacketProtocol* protocol, RandFunc rand)
: ack_sender(protocol),
outstanding_messages(protocol),
bbr_(protocol->timer_, std::move(rand), protocol->maximum_send_size_,
Nothing) {}
AckSender ack_sender;
Optional<SendQueue> send_queue;
ReceivedQueue received_queue;
OutstandingMessages outstanding_messages;
BBR bbr_;
};
Optional<OpenState> state_;
};
template <class GB, class A>
PacketProtocol::SendRequestHdl PacketProtocol::SendRequest::FromFunctors(GB gb,
A a) {
class Send final : public SendRequest {
public:
Send(GB generate_bytes, A ack)
: generate_bytes_(std::move(generate_bytes)), ack_(std::move(ack)) {}
Slice GenerateBytes(LazySliceArgs args) { return generate_bytes_(args); }
void Ack(const Status& status) {
ack_(status);
delete this;
}
private:
GB generate_bytes_;
A ack_;
};
return SendRequestHdl(new Send(std::move(gb), std::move(a)));
}
} // namespace overnet