blob: 05158345cd111c3a2380b4462f0b6c16fd5470af [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/connectivity/overnet/lib/packet_protocol/packet_protocol.h"
#include <iostream>
namespace overnet {
namespace {
class AckSendRequest final : public PacketProtocol::SendRequest {
public:
AckSendRequest() : SendRequest(true) {}
Slice GenerateBytes(LazySliceArgs args) override { return Slice(); }
void Ack(const Status& status) override {}
};
AckSendRequest ack_send_request;
} // namespace
///////////////////////////////////////////////////////////////////////////////
// Initialization/shutdown/control paths
PacketProtocol::PacketProtocol(Timer* timer, RandFunc rand,
PacketSender* packet_sender, const Codec* codec,
uint64_t mss)
: codec_(codec),
timer_(timer),
packet_sender_(packet_sender),
maximum_send_size_(mss) {
state_.Reset(this, std::move(rand));
}
void PacketProtocol::Close(Callback<void> quiesced) {
ScopedModule<PacketProtocol> in_pp(this);
OVERNET_TRACE(DEBUG) << "Close";
InTransaction([&](Transaction* transaction) {
transaction->QuiesceOnCompletion(std::move(quiesced));
});
}
PacketProtocol::AckSender::AckSender(PacketProtocol* protocol)
: protocol_(protocol) {}
PacketProtocol::OutstandingMessages::OutstandingMessages(
PacketProtocol* protocol)
: protocol_(protocol) {}
void PacketProtocol::Quiesce() {
ScopedModule<PacketProtocol> in_pp(this);
OVERNET_TRACE(DEBUG) << "Quiesce";
state_.Reset();
master_ref_.Drop();
}
PacketProtocol::Transaction::Transaction(PacketProtocol* protocol)
: protocol_(protocol) {
assert(protocol_->active_transaction_ == nullptr);
protocol_->active_transaction_ = this;
}
PacketProtocol::Transaction::~Transaction() {
for (;;) {
if (quiesce_) {
assert(protocol_->active_transaction_ == this);
protocol_->active_transaction_ = nullptr;
protocol_->Quiesce();
return;
}
if (schedule_send_queue_ && protocol_->state_.has_value()) {
schedule_send_queue_ = false;
protocol_->state_->send_queue->Schedule();
continue;
}
assert(protocol_->active_transaction_ == this);
protocol_->active_transaction_ = nullptr;
return;
}
}
void PacketProtocol::Transaction::Send(SendRequestHdl hdl) {
if (auto* q = send_queue()) {
schedule_send_queue_ |= q->Add(std::move(hdl));
}
}
void PacketProtocol::Transaction::QuiesceOnCompletion(Callback<void> callback) {
OVERNET_TRACE(DEBUG) << "Schedule Quiesce";
assert(!quiesce_);
assert(protocol_->quiesce_.empty());
protocol_->quiesce_ = std::move(callback);
quiesce_ = true;
}
PacketProtocol::SendQueue* PacketProtocol::Transaction::send_queue() {
if (quiesce_ || !protocol_->state_.has_value()) {
return nullptr;
}
if (!protocol_->state_->send_queue.has_value()) {
protocol_->state_->send_queue.Reset(protocol_);
}
return protocol_->state_->send_queue.get();
}
///////////////////////////////////////////////////////////////////////////////
// PacketProtocol::Send and friends.
// Defines the send path.
void PacketProtocol::Send(SendRequestHdl send_request) {
ScopedModule<PacketProtocol> in_pp(this);
OVERNET_TRACE(DEBUG) << "Send";
InTransaction([&](Transaction* transaction) {
transaction->Send(std::move(send_request));
});
}
bool PacketProtocol::SendQueue::Add(SendRequestHdl hdl) {
ScopedModule<PacketProtocol> in_pp(protocol_);
OVERNET_TRACE(DEBUG) << "SendQueue.Add";
scheduled_tail_loss_probe_.Reset();
requests_.push(std::move(hdl));
return !scheduled_;
}
void PacketProtocol::SendQueue::Schedule() {
ScopedModule<PacketProtocol> in_pp(protocol_);
OVERNET_TRACE(DEBUG) << "SendQueue.Schedule";
assert(!scheduled_);
scheduled_ = true;
assert(!transmit_request_.has_value());
transmit_request_.Reset(
&protocol_->state_->bbr_, [this](const Status& status) {
ScopedModule<PacketProtocol> scoped_module(protocol_);
OVERNET_TRACE(DEBUG) << "SendQueue.Schedule --> " << status
<< " requests=" << requests_.size();
auto transmit_request = transmit_request_.Take();
if (status.is_error()) {
return;
}
SendRequestHdl request;
if (requests_.empty()) {
last_send_was_tail_loss_probe_ = true;
protocol_->state_->outstanding_messages.Send(
std::move(transmit_request), SendRequestHdl(&ack_send_request));
} else {
last_send_was_tail_loss_probe_ = false;
request = std::move(requests_.front());
requests_.pop();
protocol_->state_->outstanding_messages.Send(
std::move(transmit_request), std::move(request));
}
});
}
void PacketProtocol::OutstandingMessages::Send(BBR::TransmitRequest bbr_request,
SendRequestHdl request) {
assert(protocol_->state_);
const uint64_t seq_idx = send_tip_ + outstanding_.size();
OVERNET_TRACE(DEBUG) << "OutstandingMessages.Send seq_idx=" << seq_idx;
SeqNum seq_num(seq_idx, max_outstanding_size_ + 1);
outstanding_.emplace_back(
OutstandingPacket{OutstandingPacketState::PENDING,
protocol_->state_->received_queue.max_seen_sequence(),
Nothing, std::move(request)});
max_outstanding_size_ =
std::max(outstanding_.size(), size_t(max_outstanding_size_));
protocol_->packet_sender_->SendPacket(
seq_num, PacketSend(protocol_, seq_idx, std::move(bbr_request)));
}
PacketProtocol::PacketSend::PacketSend(PacketProtocol* protocol,
uint64_t seq_idx,
BBR::TransmitRequest bbr_request)
: protocol_(protocol),
seq_idx_(seq_idx),
bbr_request_(std::move(bbr_request)) {
assert(protocol);
}
Slice PacketProtocol::PacketSend::operator()(LazySliceArgs args) {
auto protocol = std::move(protocol_);
auto slice =
protocol->InTransaction([=, protocol = protocol.get()](Transaction* t) {
ScopedModule<PacketProtocol> in_pp(protocol);
if (protocol->state_.has_value()) {
return protocol->state_->outstanding_messages.GeneratePacket(
std::move(bbr_request_), seq_idx_, args);
} else {
return Slice();
}
});
if (protocol->state_.has_value()) {
protocol->state_->send_queue->SentMessage();
}
return slice;
}
PacketProtocol::PacketSend::~PacketSend() {
if (protocol_.has_value() && protocol_->state_.has_value()) {
protocol_->state_->outstanding_messages.CancelPacket(seq_idx_);
}
}
void PacketProtocol::OutstandingMessages::CancelPacket(uint64_t seq_idx) {
protocol_->state_->send_queue->SentMessage();
AckProcessor(this, TimeDelta::Zero()).Nack(seq_idx, Status::Cancelled());
}
Slice PacketProtocol::OutstandingMessages::GeneratePacket(
BBR::TransmitRequest bbr_request, uint64_t seq_idx, LazySliceArgs args) {
if (seq_idx < send_tip_) {
// Frame was nacked before sending (probably due to shutdown).
OVERNET_TRACE(DEBUG) << "Seq " << seq_idx << " nacked before sending";
return Slice();
}
auto* outstanding_packet = &outstanding_[seq_idx - send_tip_];
auto send = protocol_->codec_->Encode(
seq_idx, protocol_->FormatPacket(
seq_idx, outstanding_packet->request.borrow(), args));
if (send.is_error()) {
OVERNET_TRACE(ERROR) << "Failed to encode packet: " << send.AsStatus();
}
// outstanding_ should not have changed
assert(outstanding_packet == &outstanding_[seq_idx - send_tip_]);
outstanding_packet->state = OutstandingPacketState::SENT;
outstanding_packet->bbr_sent_packet =
bbr_request.Sent(BBR::OutgoingPacket{seq_idx, send->length()});
ScheduleRetransmit();
return std::move(*send);
}
Slice PacketProtocol::FormatPacket(uint64_t seq_idx, SendRequest* request,
LazySliceArgs args) {
assert(state_.has_value());
const auto max_length = std::min(static_cast<uint64_t>(args.max_length),
static_cast<uint64_t>(maximum_send_size_));
auto make_args = [=](uint64_t prefix_length, bool has_other_content) {
auto maxlen = max_length;
auto subtract_from_max =
std::min(maxlen, prefix_length + codec_->border.Total());
maxlen -= subtract_from_max;
return LazySliceArgs{args.desired_border.WithAddedPrefix(
prefix_length + codec_->border.prefix),
maxlen, has_other_content || args.has_other_content};
};
if (state_->received_queue.CanBuildAck() &&
(request->must_send_ack() || state_->ack_sender.ShouldSendAck())) {
auto ack = state_->received_queue.BuildAck(
seq_idx, timer_->Now(), varint::MaximumLengthWithPrefix(max_length),
&state_->ack_sender);
AckFrame::Writer ack_writer(&ack);
const uint8_t ack_length_length =
varint::WireSizeFor(ack_writer.wire_length());
const uint64_t prefix_length = ack_length_length + ack_writer.wire_length();
return request->GenerateBytes(make_args(prefix_length, true))
.WithPrefix(prefix_length,
[&ack_writer, ack_length_length](uint8_t* p) {
ack_writer.Write(varint::Write(ack_writer.wire_length(),
ack_length_length, p));
});
} else {
return request->GenerateBytes(make_args(1, false))
.WithPrefix(1, [](uint8_t* p) { *p = 0; });
}
}
uint64_t PacketProtocol::ReceivedQueue::max_seen_sequence() const {
if (received_packets_.empty()) {
return received_tip_;
}
size_t idx = received_packets_.size() - 1;
while (idx > 0 && received_packets_[idx].state == ReceiveState::UNKNOWN) {
idx--;
}
return received_tip_ + idx;
}
AckFrame PacketProtocol::ReceivedQueue::BuildAck(uint64_t seq_idx,
TimeStamp now,
uint32_t max_length,
AckSender* ack_sender) {
OVERNET_TRACE(DEBUG) << "BuildAck seq:" << seq_idx
<< " from received packets: " << ReceivedPacketSummary();
auto packet_delay = [this, now](uint64_t seq_idx) -> uint64_t {
const auto& received_packet = received_packets_[seq_idx - received_tip_];
assert(received_packet.state != ReceiveState::UNKNOWN);
return (now - received_packet.when).as_us();
};
const auto max_seen = max_seen_sequence();
assert(max_seen > 0);
AckFrame ack(max_seen, packet_delay(max_seen));
for (uint64_t seq_idx = max_seen; seq_idx > received_tip_; seq_idx--) {
auto& received_packet = received_packets_[seq_idx - received_tip_];
switch (received_packet.state) {
case ReceiveState::UNKNOWN:
OVERNET_TRACE(DEBUG)
<< "Mark unseen packet " << seq_idx << " as NOT_RECEIVED";
received_packet = ReceivedPacket{ReceiveState::NOT_RECEIVED, now};
[[fallthrough]];
case ReceiveState::NOT_RECEIVED:
ack.AddNack(seq_idx);
break;
case ReceiveState::RECEIVED:
break;
}
}
ack.AdjustForMSS(max_length, packet_delay);
ack_sender->AckSent(seq_idx, ack.partial());
OVERNET_TRACE(DEBUG) << "BuildAck generates: " << ack << " bytes="
<< Slice::FromWriters(AckFrame::Writer(&ack));
return ack;
}
void PacketProtocol::SendQueue::SentMessage() {
ScopedModule<PacketProtocol> in_pp(protocol_);
OVERNET_TRACE(DEBUG) << "SendQueue.SentMessage: requests=" << requests_.size()
<< " scheduled_tail_loss_probe="
<< scheduled_tail_loss_probe_.Map(
[](const ScheduledTailLossProbe& tlp) {
return tlp.when;
});
assert(scheduled_);
scheduled_ = false;
protocol_->InTransaction([this](Transaction* t) {
if (!requests_.empty()) {
Schedule();
return;
}
if (!last_send_was_tail_loss_probe_ && !t->Closing()) {
ScheduleTailLossProbe();
return;
}
protocol_->state_->send_queue.Reset();
});
}
void PacketProtocol::SendQueue::ScheduleTailLossProbe() {
const auto when = protocol_->timer_->Now() + protocol_->TailLossProbeDelay();
OVERNET_TRACE(DEBUG) << "SendQueue.ScheduleTailLossProbe: requests="
<< requests_.size() << " scheduled_tail_loss_probe="
<< scheduled_tail_loss_probe_.Map(
[](const ScheduledTailLossProbe& tlp) {
return tlp.when;
})
<< " when=" << when;
if (!requests_.empty()) {
return;
}
if (scheduled_tail_loss_probe_.has_value() &&
scheduled_tail_loss_probe_->when <= when) {
return;
}
scheduled_tail_loss_probe_.Reset(
protocol_->timer_, when, [this](const Status& status) {
ScopedModule<PacketProtocol> in_pp(protocol_);
OVERNET_TRACE(DEBUG) << "SendQueue.ScheduleTailLossProbe --> " << status
<< " requests=" << requests_.size();
if (status.is_error()) {
return;
}
scheduled_tail_loss_probe_.Reset();
ForceSendAck();
});
}
void PacketProtocol::SendQueue::ForceSendAck() {
if (!scheduled_) {
protocol_->InTransaction([](Transaction* t) { t->ScheduleForcedAck(); });
}
}
///////////////////////////////////////////////////////////////////////////////
// PacketProtocol::Process and friends.
// Defines the receive path.
void PacketProtocol::Process(TimeStamp received, SeqNum seq_num, Slice slice,
ProcessCallback handle_message) {
ScopedModule<PacketProtocol> scoped_module(this);
Transaction transaction(this);
OVERNET_TRACE(DEBUG) << "Process: " << slice;
if (!state_.has_value()) {
return;
}
state_->ack_sender.NeedAck(
state_->received_queue.Received(seq_num, received, [&](uint64_t seq_idx) {
return ProcessMessage(seq_idx, std::move(slice), received,
std::move(handle_message));
}));
state_->outstanding_messages.ReceivedPacket();
}
template <class F>
PacketProtocol::AckUrgency PacketProtocol::ReceivedQueue::Received(
SeqNum seq_num, TimeStamp received, F logic) {
const auto seq_idx =
seq_num.Reconstruct(received_tip_ + received_packets_.size());
OVERNET_TRACE(DEBUG) << "Process seq:" << seq_idx;
if (seq_idx < received_tip_) {
return AckUrgency::NOT_REQUIRED;
}
if (!EnsureValidReceivedPacket(seq_idx, received)) {
return AckUrgency::NOT_REQUIRED;
}
auto* received_packet = &received_packets_[seq_idx - received_tip_];
if (received_packet->state != ReceiveState::UNKNOWN) {
OVERNET_TRACE(DEBUG) << "frozen as " << received_packet->state;
return AckUrgency::NOT_REQUIRED;
}
auto pmr = logic(seq_idx);
OVERNET_TRACE(DEBUG) << "Process seq:" << seq_idx
<< " process_message_result=" << pmr << " is_last="
<< (seq_idx ==
received_tip_ + received_packets_.size() - 1)
<< " optional_ack_run_length="
<< optional_ack_run_length_
<< " received_packets=" << ReceivedPacketSummary();
// received_packet may not be valid anymore, but the packet *must* exist
assert(seq_idx >= received_tip_);
received_packet = &received_packets_[seq_idx - received_tip_];
switch (pmr) {
case ProcessMessageResult::NOT_PROCESSED:
// Failed processing packets shouldn't generate traffic.
return AckUrgency::NOT_REQUIRED;
case ProcessMessageResult::NACK:
optional_ack_run_length_ = 0;
*received_packet = ReceivedPacket{ReceiveState::NOT_RECEIVED, received};
// Always send a nack as soon as we realize one is necessary.
return AckUrgency::SEND_IMMEDIATELY;
case ProcessMessageResult::OPTIONAL_ACK:
// If we get a single ack without a payload, we suppress sending a reply
// ack.
optional_ack_run_length_++;
if (optional_ack_run_length_ < 5) {
*received_packet = ReceivedPacket{ReceiveState::RECEIVED, received};
return AckUrgency::NOT_REQUIRED;
}
[[fallthrough]];
case ProcessMessageResult::ACK: {
optional_ack_run_length_ = 0;
*received_packet = ReceivedPacket{ReceiveState::RECEIVED, received};
int num_received = 0;
for (const auto& pkt : received_packets_) {
switch (pkt.state) {
case ReceiveState::RECEIVED:
num_received++;
if (num_received >= 3) {
return AckUrgency::SEND_IMMEDIATELY;
}
break;
default:
break;
}
}
// Got some data, make sure there's an ack scheduled soon.
return AckUrgency::SEND_SOON;
}
case ProcessMessageResult::ACK_URGENTLY: {
optional_ack_run_length_ = 0;
*received_packet = ReceivedPacket{ReceiveState::RECEIVED, received};
return AckUrgency::SEND_IMMEDIATELY;
}
}
}
std::string PacketProtocol::ReceivedQueue::ReceivedPacketSummary() const {
std::ostringstream out;
out << "{";
uint64_t seq_idx = received_tip_;
for (const auto& pkt : received_packets_) {
if (seq_idx != received_tip_) {
out << ", ";
}
out << "[" << seq_idx << "] " << pkt.state;
seq_idx++;
}
out << "}";
return out.str();
}
PacketProtocol::ProcessMessageResult PacketProtocol::ProcessMessage(
uint64_t seq_idx, Slice slice, TimeStamp received,
ProcessCallback handle_message) {
using StatusType = StatusOr<IncomingMessage*>;
// Decode slice
auto slice_status = codec_->Decode(seq_idx, std::move(slice));
if (slice_status.is_error()) {
handle_message(slice_status.AsStatus());
return ProcessMessageResult::NOT_PROCESSED;
}
slice = std::move(*slice_status);
const uint8_t* p = slice.begin();
const uint8_t* end = slice.end();
if (p == end) {
// Empty packet
handle_message(nullptr);
return ProcessMessageResult::OPTIONAL_ACK;
}
uint64_t ack_length;
if (!varint::Read(&p, end, &ack_length)) {
handle_message(StatusType(StatusCode::INVALID_ARGUMENT,
"Failed to parse ack length from message"));
return ProcessMessageResult::NOT_PROCESSED;
}
slice.TrimBegin(p - slice.begin());
OVERNET_TRACE(DEBUG) << "ack_length=" << ack_length;
if (ack_length > slice.length()) {
handle_message(StatusType(StatusCode::INVALID_ARGUMENT,
"Ack frame claimed to be past end of message"));
return ProcessMessageResult::NOT_PROCESSED;
}
Optional<AckFrame> ack;
ProcessMessageResult ack_result = ProcessMessageResult::OPTIONAL_ACK;
if (ack_length > 0) {
auto ack_status = AckFrame::Parse(slice.TakeUntilOffset(ack_length));
if (ack_status.is_error()) {
handle_message(ack_status.AsStatus());
return ProcessMessageResult::NOT_PROCESSED;
}
if (auto ack_valid = state_->outstanding_messages.ValidateAck(*ack_status);
ack_valid.is_error()) {
handle_message(ack_valid);
return ProcessMessageResult::NACK;
}
OVERNET_TRACE(DEBUG) << "Process seq:" << seq_idx
<< " got-ack:" << *ack_status;
ack = std::move(*ack_status);
if (ack->partial()) {
ack_result = ProcessMessageResult::ACK_URGENTLY;
}
}
if (slice.length() > 0) {
IncomingMessage msg(std::move(slice));
// Process the message body:
handle_message(&msg);
if (msg.was_nacked()) {
// Note: ack not processed
return ProcessMessageResult::NACK;
} else if (ack_result != ProcessMessageResult::ACK_URGENTLY) {
ack_result = ProcessMessageResult::ACK;
}
} else {
// Handle no message:
handle_message(nullptr);
}
if (ack.has_value()) {
state_->outstanding_messages.ProcessValidAck(ack.Take(), received);
}
return ack_result;
}
Status PacketProtocol::OutstandingMessages::ValidateAck(
const AckFrame& ack) const {
if (ack.ack_to_seq() < send_tip_) {
return Status::Ok();
}
if (ack.ack_to_seq() >= send_tip_ + outstanding_.size()) {
std::ostringstream msg;
msg << "Ack packet past sending sequence: ack_seq=" << ack.ack_to_seq()
<< " max_sent=" << (send_tip_ + outstanding_.size() - 1)
<< " outstanding_window=" << outstanding_.size();
return Status(StatusCode::INVALID_ARGUMENT, msg.str());
}
for (auto nack_seq : ack.nack_seqs()) {
if (nack_seq < send_tip_) {
continue;
}
const OutstandingPacket& pkt = outstanding_[nack_seq - send_tip_];
if (pkt.state == OutstandingPacketState::ACKED) {
// Previously acked packet becomes nacked: this is an error.
return Status(StatusCode::INVALID_ARGUMENT,
"Previously acked packet becomes nacked");
}
}
for (size_t i = 0; i < ack.ack_to_seq() - send_tip_; i++) {
if (!outstanding_[i].bbr_sent_packet.has_value()) {
return Status(StatusCode::INVALID_ARGUMENT, "Ack/nack unsent sequence");
}
}
return Status::Ok();
}
void PacketProtocol::OutstandingMessages::ProcessValidAck(AckFrame ack,
TimeStamp received) {
// Basic validation. Can assert anything that should be an error because
// ValidateAck should have been called prior.
if (ack.ack_to_seq() < send_tip_) {
return;
}
assert(ack.ack_to_seq() < send_tip_ + outstanding_.size());
// Move receive window forward.
const auto max_seen_sequence_at_send =
outstanding_[ack.ack_to_seq() - send_tip_].max_seen_sequence_at_send;
if (max_seen_sequence_at_send > 0) {
protocol_->state_->received_queue.SetTip(max_seen_sequence_at_send,
received);
}
AckProcessor ack_processor(this,
TimeDelta::FromMicroseconds(ack.ack_delay_us()));
// Fail any nacked packets.
// Iteration is from oldest packet to newest, such that the OLDEST nacked
// message is the most likely to be sent first. This has the important
// consequence that if the packet was a fragment of a large message that was
// rejected due to buffering, the earlier pieces (that are more likely to fit)
// are retransmitted first.
for (auto nack_seq : ack.nack_seqs()) {
ack_processor.Nack(nack_seq, Status::Unavailable());
}
// Clear out outstanding packet references, propagating acks.
for (size_t i = send_tip_; i <= ack.ack_to_seq(); i++) {
OutstandingPacket& pkt = outstanding_[i - send_tip_];
if (!pkt.request.empty()) {
ack_processor.Ack(i);
}
}
}
void PacketProtocol::OutstandingMessages::AckProcessor::Ack(uint64_t ack_seq) {
OutstandingPacket& pkt =
outstanding_->outstanding_[ack_seq - outstanding_->send_tip_];
auto request = std::move(pkt.request);
if (!request.empty()) {
assert(pkt.state == OutstandingPacketState::SENT);
pkt.state = OutstandingPacketState::ACKED;
bbr_ack_.acked_packets.push_back(*pkt.bbr_sent_packet);
request.Ack(Status::Ok());
}
}
void PacketProtocol::OutstandingMessages::AckProcessor::Nack(
uint64_t nack_seq, const Status& status) {
OVERNET_TRACE(DEBUG) << "AckProcessor.Nack: seq=" << nack_seq
<< " status=" << status
<< " send_tip=" << outstanding_->send_tip_;
assert(status.is_error());
if (outstanding_->protocol_->state_.has_value()) {
outstanding_->protocol_->state_->ack_sender.OnNack(nack_seq);
}
if (nack_seq < outstanding_->send_tip_) {
return;
}
OutstandingPacket& pkt =
outstanding_->outstanding_[nack_seq - outstanding_->send_tip_];
auto request = std::move(pkt.request);
OVERNET_TRACE(DEBUG) << "AckProcessor.Nack: seq=" << nack_seq
<< " has_request=" << !request.empty()
<< " state=" << pkt.state;
if (request.empty()) {
return;
}
switch (pkt.state) {
case OutstandingPacketState::PENDING:
pkt.state = OutstandingPacketState::NACKED;
break;
case OutstandingPacketState::SENT:
assert(pkt.bbr_sent_packet.has_value());
assert(pkt.bbr_sent_packet->outgoing.sequence == nack_seq);
bbr_ack_.nacked_packets.push_back(*pkt.bbr_sent_packet);
pkt.state = OutstandingPacketState::NACKED;
break;
case OutstandingPacketState::NACKED:
break;
default:
// Previously acked packet becomes nacked: this is an error.
abort();
}
request.Ack(status);
}
PacketProtocol::OutstandingMessages::AckProcessor::~AckProcessor() {
bool empty = true;
// Offset send_time to account for queuing delay on peer.
for (auto& pkt : bbr_ack_.acked_packets) {
empty = false;
pkt.send_time = pkt.send_time + queue_delay_;
}
for (auto& pkt : bbr_ack_.nacked_packets) {
empty = false;
pkt.send_time = pkt.send_time + queue_delay_;
}
if (!empty && outstanding_->protocol_->state_.has_value()) {
outstanding_->protocol_->state_->bbr_.OnAck(bbr_ack_);
}
while (!outstanding_->outstanding_.empty() &&
outstanding_->outstanding_.front().request.empty()) {
outstanding_->send_tip_++;
outstanding_->outstanding_.pop_front();
}
}
void PacketProtocol::ReceivedQueue::SetTip(uint64_t seq_idx,
TimeStamp received) {
assert(seq_idx >= 1);
uint64_t tip_idx = seq_idx - 1;
if (tip_idx <= received_tip_) {
return;
}
if (!EnsureValidReceivedPacket(tip_idx, received)) {
abort();
}
received_packets_.erase(
received_packets_.begin(),
received_packets_.begin() + (tip_idx - received_tip_));
received_tip_ = tip_idx;
}
///////////////////////////////////////////////////////////////////////////////
// Ack scheduling
std::string PacketProtocol::AckSender::SentFullAcksString() const {
std::ostringstream out;
bool first = true;
for (uint64_t ack : sent_full_acks_) {
if (!first) {
out << ",";
}
first = false;
out << ack;
}
return "{" + out.str() + "}";
}
void PacketProtocol::AckSender::NeedAck(AckUrgency urgency) {
if (urgency <= urgency_) {
return;
}
OVERNET_TRACE(DEBUG) << "AckSender.NeedAck"
<< " urgency=" << urgency
<< " all_acks_acknowledged=" << all_acks_acknowledged_
<< " sent_full_acks=" << SentFullAcksString();
urgency_ = urgency;
sent_full_acks_.clear();
all_acks_acknowledged_ = false;
assert(protocol_->state_.has_value());
if (!protocol_->state_->send_queue.has_value()) {
protocol_->state_->send_queue.Reset(protocol_);
}
switch (urgency_) {
case AckUrgency::NOT_REQUIRED:
abort();
case AckUrgency::SEND_SOON:
protocol_->state_->send_queue->ScheduleTailLossProbe();
break;
case AckUrgency::SEND_IMMEDIATELY:
protocol_->state_->send_queue->ForceSendAck();
break;
}
}
void PacketProtocol::AckSender::AckSent(uint64_t seq_idx, bool partial) {
OVERNET_TRACE(DEBUG) << "AckSender.AckSent seq_idx=" << seq_idx
<< " partial=" << partial
<< " all_acks_acknowledged=" << all_acks_acknowledged_
<< " sent_full_acks=" << SentFullAcksString();
if (!sent_full_acks_.empty()) {
assert(seq_idx > sent_full_acks_.back());
}
urgency_ = AckUrgency::NOT_REQUIRED;
if (!partial) {
sent_full_acks_.push_back(seq_idx);
} else if (sent_full_acks_.empty()) {
NeedAck(AckUrgency::SEND_SOON);
}
}
void PacketProtocol::AckSender::OnNack(uint64_t seq) {
OVERNET_TRACE(DEBUG) << "AckSender.OnNack"
<< " sent_full_acks=" << SentFullAcksString()
<< " seq=" << seq
<< " all_acks_acknowledged=" << all_acks_acknowledged_;
auto it =
std::lower_bound(sent_full_acks_.begin(), sent_full_acks_.end(), seq);
if (it == sent_full_acks_.end() || *it != seq) {
return;
}
sent_full_acks_.erase(it);
if (sent_full_acks_.empty()) {
NeedAck(AckUrgency::SEND_SOON);
}
}
void PacketProtocol::AckSender::OnAck(uint64_t seq) {
OVERNET_TRACE(DEBUG) << "AckSender.OnAck"
<< " sent_full_acks=" << SentFullAcksString()
<< " seq=" << seq
<< " all_acks_acknowledged=" << all_acks_acknowledged_;
auto it =
std::lower_bound(sent_full_acks_.begin(), sent_full_acks_.end(), seq);
if (it == sent_full_acks_.end() || *it != seq) {
return;
}
sent_full_acks_.clear();
all_acks_acknowledged_ = true;
}
///////////////////////////////////////////////////////////////////////////////
// Retransmit scheduling
void PacketProtocol::OutstandingMessages::ScheduleRetransmit() {
OVERNET_TRACE(DEBUG) << "OutstandingMessages.ScheduleRetransmit: rto_timer="
<< rto_timer_.has_value()
<< " deadline=" << RetransmitDeadline();
if (rto_timer_.has_value()) {
return;
}
if (auto timeout = RetransmitDeadline(); timeout.has_value()) {
rto_timer_.Reset(
protocol_->timer_, *timeout,
[protocol = protocol_](const Status& status) {
ScopedModule in_pp(protocol);
protocol->InTransaction([=](Transaction* transaction) {
OVERNET_TRACE(DEBUG)
<< "OutstandingMessages.ScheduleRetransmit --> " << status
<< " protocol_open=" << !transaction->Closing();
if (transaction->Closing()) {
return;
}
if (status.is_error()) {
protocol->state_->outstanding_messages.NackAll();
return;
}
protocol->state_->outstanding_messages.CheckRetransmit();
});
});
}
}
Optional<TimeStamp> PacketProtocol::OutstandingMessages::RetransmitDeadline() {
for (const auto& outstanding : outstanding_) {
if (outstanding.bbr_sent_packet.has_value() &&
outstanding.state == OutstandingPacketState::SENT) {
return outstanding.bbr_sent_packet->send_time +
protocol_->RetransmitDelay();
}
}
return Nothing;
}
void PacketProtocol::OutstandingMessages::CheckRetransmit() {
if (!protocol_->state_.has_value()) {
return;
}
rto_timer_.Reset();
const auto nack_before =
protocol_->timer_->Now() - protocol_->RetransmitDelay();
OVERNET_TRACE(DEBUG) << "OutstandingMessages.CheckRetransmit: nack_before="
<< nack_before
<< " (current_rtt=" << protocol_->CurrentRTT() << ")";
AckProcessor ack_processor(this, TimeDelta::Zero());
for (size_t i = 0; i < outstanding_.size(); i++) {
if (!outstanding_[i].bbr_sent_packet.has_value()) {
OVERNET_TRACE(DEBUG) << "OutstandingMessages.CheckRetransmit: seq "
<< (send_tip_ + i) << " not sent: STOP";
break;
}
if (outstanding_[i].bbr_sent_packet->send_time > nack_before) {
OVERNET_TRACE(DEBUG) << "OutstandingMessages.CheckRetransmit: seq "
<< (send_tip_ + i) << " sent at "
<< outstanding_[i].bbr_sent_packet->send_time
<< ": STOP";
break;
}
OVERNET_TRACE(DEBUG) << "OutstandingMessages.CheckRetransmit: seq "
<< (send_tip_ + i) << " sent at "
<< outstanding_[i].bbr_sent_packet->send_time
<< ": NACK";
ack_processor.Nack(send_tip_ + i, Status::Unavailable());
}
ScheduleRetransmit();
}
void PacketProtocol::OutstandingMessages::NackAll() {
OVERNET_TRACE(DEBUG) << "OutstandingMessages.NackAll";
AckProcessor ack_processor(this, TimeDelta::Zero());
for (uint64_t i = send_tip_, end = send_tip_ + outstanding_.size(); i < end;
i++) {
if (outstanding_[i - send_tip_].request.empty()) {
continue;
}
ack_processor.Nack(i, Status::Cancelled());
}
}
///////////////////////////////////////////////////////////////////////////////
// Utilities
TimeDelta PacketProtocol::CurrentRTT() const {
return std::max(TimeDelta::FromMilliseconds(1), state_->bbr_.rtt());
}
TimeDelta PacketProtocol::RetransmitDelay() const {
constexpr const auto kMinRetransmitDelay = TimeDelta::FromSeconds(1);
constexpr const int kRTTScaling = 4;
auto rtt = CurrentRTT();
if (rtt == TimeDelta::PositiveInf() ||
rtt < kMinRetransmitDelay / kRTTScaling) {
return kMinRetransmitDelay;
}
return kRTTScaling * rtt;
}
TimeDelta PacketProtocol::TailLossProbeDelay() const {
constexpr const auto kMinTailLossProbeDelay = TimeDelta::FromMilliseconds(1);
constexpr const int kRTTScaling = 4;
auto rtt = CurrentRTT();
if (rtt == TimeDelta::PositiveInf() ||
rtt < kRTTScaling * kMinTailLossProbeDelay) {
return kMinTailLossProbeDelay;
}
return rtt / kRTTScaling;
}
PacketProtocol::Codec* PacketProtocol::NullCodec() {
class NullCodec final : public Codec {
public:
NullCodec() : Codec(Border::None()) {}
StatusOr<Slice> Encode(uint64_t seq_idx, Slice src) const override {
return src;
}
StatusOr<Slice> Decode(uint64_t seq_idx, Slice src) const override {
return src;
}
};
static NullCodec null_codec;
return &null_codec;
}
} // namespace overnet