blob: 5a638b4f87647ded4aba878182c1e9295e628608 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "packet_protocol.h"
#include <iostream>
namespace overnet {
static const char kClosing[] = "Closing";
static const char kMaybeScheduleAck[] = "MaybeScheduleAck";
static const char kMaybeSendAck[] = "MaybeSendAck";
static const char kRequestSendAck[] = "RequestSendAck";
static const char kRequestTransmit[] = "RequestTransmit";
static const char kScheduleRTO[] = "ScheduleRTO";
static const char kStartNext[] = "StartNext";
static const char kTransmitPacket[] = "TransmitPacket";
const char PacketProtocol::kProcessedPacket[] = "ProcessedPacket";
void PacketProtocol::Close(Callback<void> quiesced) {
assert(state_ == State::READY);
OVERNET_TRACE(DEBUG, trace_sink_)
<< "Close outstanding_ops=" << outstanding_ops_;
OutstandingOp<kClosing> op(this);
state_ = State::CLOSING;
quiesced_ = std::move(quiesced);
// Stop waiting for things.
rto_scheduler_.Reset();
ack_scheduler_.Reset();
outgoing_bbr_.CancelRequestTransmit();
NackAll();
decltype(queued_) queued;
queued.swap(queued_);
queued.clear();
}
void PacketProtocol::RequestSendAck() {
// Prevent quiescing during ack generation (otherwise cancelling a scheduled
// ack might cause us to quiesce).
OutstandingOp<kRequestSendAck> op(this);
MaybeForceAck();
}
void PacketProtocol::Send(LazySlice make_payload, SendCallback on_ack) {
OVERNET_TRACE(DEBUG, trace_sink_)
<< "Send state=" << static_cast<int>(state_)
<< " qsize=" << queued_.size() << " outstanding=" << outstanding_.size()
<< " sending=" << sending_.has_value();
if (state_ != State::READY) {
// Discard result, forcing callbacks to be made
return;
}
MaybeSendSlice(QueuedPacket{std::move(make_payload), std::move(on_ack)});
}
void PacketProtocol::MaybeSendSlice(QueuedPacket&& packet) {
if (!queued_.empty() || sending_) {
queued_.emplace_back(std::forward<QueuedPacket>(packet));
return;
}
SendSlice(std::forward<QueuedPacket>(packet));
}
void PacketProtocol::SendSlice(QueuedPacket&& packet) {
sending_.Reset(std::forward<QueuedPacket>(packet));
OVERNET_TRACE(DEBUG, trace_sink_) << "SendSlice send_tip=" << send_tip_
<< " outstanding=" << outstanding_.size();
outgoing_bbr_.RequestTransmit([self = OutstandingOp<kRequestTransmit>(this)](
const Status& status) mutable {
if (status.is_error()) {
self->sending_->on_ack(status);
self->sending_.Reset();
return;
}
self->TransmitPacket();
});
}
void PacketProtocol::TransmitPacket() {
const uint64_t seq_idx = send_tip_ + outstanding_.size();
if (seq_idx - send_tip_ > max_outstanding_size_) {
max_outstanding_size_ = seq_idx - send_tip_;
}
SeqNum seq_num(seq_idx, max_outstanding_size_);
OVERNET_TRACE(DEBUG, trace_sink_)
<< "TransmitPacket seq=" << seq_idx << " -> " << seq_num
<< " (send_tip=" << send_tip_ << ")";
if (outstanding_.empty()) {
KeepAlive();
}
outstanding_.emplace_back(
OutstandingPacket{max_seen_, Nothing, std::move(sending_->on_ack)});
auto send_fn = std::move(sending_->payload_factory);
send_fn.AddMutator([seq_idx, self = OutstandingOp<kTransmitPacket>(this)](
auto payload, LazySliceArgs args) {
OVERNET_TRACE(DEBUG, self->trace_sink_) << "GeneratePacket seq=" << seq_idx;
const auto outstanding_idx = seq_idx - self->send_tip_;
if (outstanding_idx >= self->outstanding_.size())
return Slice();
if (self->outstanding_[outstanding_idx].on_ack.empty())
return Slice();
auto slice = self->GeneratePacket(std::move(payload), args);
assert(!self->outstanding_[outstanding_idx].bbr_sent_packet.has_value());
self->outstanding_[outstanding_idx].bbr_sent_packet =
self->outgoing_bbr_.ScheduleTransmit(
args.delay_until_time,
BBR::OutgoingPacket{seq_idx, slice.length()});
return slice;
});
packet_sender_->SendPacket(seq_num, std::move(send_fn),
[self = OutstandingOp<kStartNext>(this)]() {
self->sending_.Reset();
self->ContinueSending();
});
}
Slice PacketProtocol::GeneratePacket(LazySlice payload, LazySliceArgs args) {
auto ack = GenerateAck();
if (ack) {
AckFrame::Writer ack_writer(ack.get());
const uint8_t ack_length_length =
varint::WireSizeFor(ack_writer.wire_length());
const uint64_t prefix_length = ack_length_length + ack_writer.wire_length();
auto payload_slice = payload(LazySliceArgs{
args.desired_prefix + prefix_length, args.max_length - prefix_length,
true, args.delay_until_time});
return payload_slice.WithPrefix(
prefix_length, [&ack_writer, ack_length_length](uint8_t* p) {
ack_writer.Write(
varint::Write(ack_writer.wire_length(), ack_length_length, p));
});
} else {
auto payload_slice =
payload(LazySliceArgs{args.desired_prefix + 1, args.max_length - 1,
args.has_other_content, args.delay_until_time});
return payload_slice.WithPrefix(1, [](uint8_t* p) { *p = 0; });
}
}
Status PacketProtocol::HandleAck(const AckFrame& ack) {
OVERNET_TRACE(DEBUG, trace_sink_) << "HandleAck: " << ack;
// TODO(ctiller): inline vectors to avoid allocations.
std::vector<SendCallback> acks;
std::vector<SendCallback> nacks;
BBR::Ack bbr_ack;
// Validate ack, and ignore if it's old.
if (ack.ack_to_seq() < send_tip_)
return Status::Ok();
if (ack.ack_to_seq() >= send_tip_ + outstanding_.size()) {
return Status(StatusCode::INVALID_ARGUMENT,
"Ack packet past sending sequence");
}
// Move receive window forward.
auto new_recv_tip = outstanding_[ack.ack_to_seq() - send_tip_].ack_to_seq;
if (new_recv_tip != recv_tip_) {
assert(new_recv_tip > recv_tip_);
while (!received_packets_.empty()) {
auto it = received_packets_.begin();
if (it->first >= new_recv_tip) {
break;
}
received_packets_.erase(it);
}
recv_tip_ = new_recv_tip;
}
// Fail any nacked packets.
for (auto nack_seq : ack.nack_seqs()) {
if (nack_seq < send_tip_) {
continue;
}
if (nack_seq >= send_tip_ + outstanding_.size()) {
return Status(StatusCode::INVALID_ARGUMENT, "Nack past sending sequence");
}
OutstandingPacket& pkt = outstanding_[nack_seq - send_tip_];
auto cb = std::move(pkt.on_ack);
if (!cb.empty()) {
nacks.emplace_back(std::move(cb));
}
if (pkt.bbr_sent_packet.has_value()) {
bbr_ack.nacked_packets.push_back(*pkt.bbr_sent_packet);
}
}
// Clear out outstanding packet references, propagating acks.
while (send_tip_ <= ack.ack_to_seq()) {
OutstandingPacket& pkt = outstanding_.front();
auto cb = std::move(pkt.on_ack);
send_tip_++;
if (!cb.empty()) {
bbr_ack.acked_packets.push_back(*pkt.bbr_sent_packet);
outstanding_.pop_front();
acks.emplace_back(std::move(cb));
} else {
outstanding_.pop_front();
}
}
outgoing_bbr_.OnAck(bbr_ack);
for (auto& cb : nacks) {
cb(Status::Cancelled());
}
for (auto& cb : acks) {
cb(Status::Ok());
}
// Continue sending if we can
ContinueSending();
return Status::Ok();
}
void PacketProtocol::ContinueSending() {
while (!queued_.empty() && !sending_ && state_ == State::READY) {
QueuedPacket p = std::move(queued_.front());
queued_.pop_front();
SendSlice(std::move(p));
}
if (ack_after_sending_) {
ack_after_sending_ = false;
MaybeSendAck();
}
}
PacketProtocol::ProcessedPacket PacketProtocol::Process(TimeStamp received,
SeqNum seq_num,
Slice slice) {
OVERNET_TRACE(DEBUG, trace_sink_) << "Process: " << slice;
using StatusType = StatusOr<Optional<Slice>>;
OutstandingOp<kProcessedPacket> op(this);
// Validate sequence number, ignore if it's old.
const auto seq_idx = seq_num.Reconstruct(recv_tip_);
OVERNET_TRACE(DEBUG, trace_sink_)
<< "Receive sequence " << seq_num << "=" << seq_idx << " recv_tip "
<< recv_tip_ << " max_seen=" << max_seen_;
if (seq_idx < recv_tip_) {
return ProcessedPacket(op, ProcessedPacket::Ack::NONE, Nothing);
}
// Keep track of the biggest valid sequence we've seen.
if (seq_idx > max_seen_) {
OVERNET_TRACE(DEBUG, trace_sink_) << "new max_seen";
max_seen_ = seq_idx;
max_seen_time_ = received;
}
KeepAlive();
const uint8_t* p = slice.begin();
const uint8_t* end = slice.end();
if (p == end) {
return ProcessedPacket(op, ProcessedPacket::Ack::NONE, Nothing);
}
uint64_t ack_length;
if (!varint::Read(&p, end, &ack_length)) {
return ProcessedPacket(
op, ProcessedPacket::Ack::NONE,
StatusType(StatusCode::INVALID_ARGUMENT,
"Failed to parse ack length from message"));
}
slice.TrimBegin(p - slice.begin());
OVERNET_TRACE(DEBUG, trace_sink_) << "ack_length=" << ack_length;
if (ack_length > slice.length()) {
return ProcessedPacket(
op, ProcessedPacket::Ack::NONE,
StatusType(StatusCode::INVALID_ARGUMENT,
"Ack frame claimed to be past end of message"));
}
ProcessedPacket::Ack ack = ProcessedPacket::Ack::NONE;
auto it = received_packets_.lower_bound(seq_idx);
if (it == received_packets_.end() || it->first != seq_idx) {
it = received_packets_.insert(
it, std::make_pair(seq_idx, ReceivedPacket{true, false}));
} else {
OVERNET_TRACE(DEBUG, trace_sink_)
<< "frozen as " << (it->second.received ? "received" : "nack");
return ProcessedPacket(op, ProcessedPacket::Ack::NONE, Nothing);
}
const bool is_pure_ack = ack_length > 0 && ack_length == slice.length();
bool suppress_ack = is_pure_ack;
bool prev_was_also_suppressed = false;
bool prev_was_discontiguous = false;
if (suppress_ack && it != received_packets_.begin()) {
auto prev = std::prev(it);
if (prev->first != it->first - 1) {
suppress_ack = false;
prev_was_discontiguous = true;
} else if (prev->second.suppressed_ack) {
suppress_ack = false;
prev_was_also_suppressed = true;
}
}
if (suppress_ack && it->first != received_packets_.rbegin()->first) {
suppress_ack = false;
}
it->second.suppressed_ack = suppress_ack;
OVERNET_TRACE(DEBUG, trace_sink_)
<< "pure_ack=" << is_pure_ack << " suppress_ack=" << suppress_ack
<< " is_last=" << (it->first == received_packets_.rbegin()->first)
<< " prev_was_also_suppressed=" << prev_was_also_suppressed
<< " prev_was_discontiguous=" << prev_was_discontiguous;
if (suppress_ack) {
OVERNET_TRACE(DEBUG, trace_sink_) << "ack suppressed";
} else {
if (seq_idx >= kMaxUnackedReceives &&
max_acked_ <= seq_idx - kMaxUnackedReceives) {
ack = ProcessedPacket::Ack::FORCE;
} else {
ack = ProcessedPacket::Ack::SCHEDULE;
}
}
if (ack_length == 0) {
return ProcessedPacket(op, ack, slice);
}
return ProcessedPacket(
op, ack,
AckFrame::Parse(slice.TakeUntilOffset(ack_length))
.Then([this](const AckFrame& frame) { return HandleAck(frame); })
.Then([&slice]() -> StatusType { return slice; }));
} // namespace overnet
bool PacketProtocol::AckIsNeeded() const { return max_seen_ > recv_tip_; }
Optional<AckFrame> PacketProtocol::GenerateAck() {
OVERNET_TRACE(DEBUG, trace_sink_)
<< "GenerateAck: max_seen=" << max_seen_ << " recv_tip=" << recv_tip_
<< " n=" << (max_seen_ - recv_tip_);
if (!AckIsNeeded()) {
return Nothing;
}
if (last_ack_send_ + QuarterRTT() > timer_->Now()) {
MaybeScheduleAck();
return Nothing;
}
const auto now = timer_->Now();
last_ack_send_ = now;
assert(max_seen_time_ <= now);
AckFrame ack(max_seen_, (now - max_seen_time_).as_us());
if (max_seen_ >= 1) {
for (uint64_t seq = max_seen_ - 1; seq > recv_tip_; seq--) {
auto it = received_packets_.lower_bound(seq);
if (it == received_packets_.end() || it->first != seq) {
received_packets_.insert(
it, std::make_pair(seq, ReceivedPacket{false, false}));
ack.AddNack(seq);
} else if (!it->second.received) {
ack.AddNack(seq);
}
}
}
OVERNET_TRACE(DEBUG, trace_sink_) << "GenerateAck generates:" << ack;
return std::move(ack);
}
void PacketProtocol::MaybeForceAck() {
if (ack_scheduler_.has_value()) {
ack_scheduler_->Cancel();
ack_scheduler_.Reset();
}
MaybeSendAck();
}
TimeDelta PacketProtocol::QuarterRTT() const {
auto est = outgoing_bbr_.rtt();
if (est == TimeDelta::PositiveInf()) {
est = TimeDelta::FromMilliseconds(100);
}
return est / 4;
}
void PacketProtocol::MaybeScheduleAck() {
if (!ack_scheduler_.has_value()) {
ack_scheduler_.Reset(
timer_, timer_->Now() + QuarterRTT(),
[self = OutstandingOp<kMaybeScheduleAck>(this)](const Status& status) {
if (status.is_error())
return;
self->ack_scheduler_.Reset();
self->MaybeSendAck();
});
}
}
void PacketProtocol::MaybeSendAck() {
OVERNET_TRACE(DEBUG, trace_sink_)
<< "MaybeSendAck: max_seen=" << max_seen_ << " recv_tip=" << recv_tip_
<< " n=" << (max_seen_ - recv_tip_) << " last_ack_send=" << last_ack_send_
<< " 1/4-rtt=" << QuarterRTT() << " now=" << timer_->Now()
<< " sending=" << (sending_ ? "true" : "false");
if (AckIsNeeded()) {
if (sending_) {
ack_after_sending_ = true;
} else if (last_ack_send_ + QuarterRTT() > timer_->Now()) {
MaybeScheduleAck();
} else {
Send([](auto) { return Slice(); },
[self = OutstandingOp<kMaybeSendAck>(this)](const Status& status) {
if (status.is_error() && self->state_ == State::READY) {
self->MaybeScheduleAck();
}
});
}
}
}
void PacketProtocol::KeepAlive() {
last_keepalive_event_ = timer_->Now();
OVERNET_TRACE(DEBUG, trace_sink_) << "KeepAlive " << last_keepalive_event_
<< " rto=" << RetransmissionDeadline();
if (!rto_scheduler_ && state_ == State::READY) {
ScheduleRTO();
}
}
void PacketProtocol::ScheduleRTO() {
assert(state_ == State::READY);
rto_scheduler_.Reset(
timer_, RetransmissionDeadline(),
StatusCallback(
ALLOCATED_CALLBACK, // TODO(ctiller): remove allocated
[self = OutstandingOp<kScheduleRTO>(this)](const Status& status) {
auto now = self->timer_->Now();
OVERNET_TRACE(DEBUG, self->trace_sink_)
<< " RTO check: now=" << now << " status=" << status
<< " rto=" << self->RetransmissionDeadline();
if (status.is_error()) {
if (now.after_epoch() == TimeDelta::PositiveInf()) {
// Shutting down - help by nacking everything.
self->NackAll();
}
return;
}
if (now >= self->RetransmissionDeadline()) {
self->rto_scheduler_.Reset();
self->NackAll();
} else {
self->ScheduleRTO();
}
}));
}
void PacketProtocol::NackAll() {
if (outstanding_.empty()) {
return;
}
auto last_sent = send_tip_ + outstanding_.size() - 1;
AckFrame f(last_sent, 0);
for (uint64_t i = last_sent; i >= send_tip_; i--) {
f.AddNack(i);
}
HandleAck(f);
}
TimeStamp PacketProtocol::RetransmissionDeadline() const {
auto rtt = std::min(outgoing_bbr_.rtt(), TimeDelta::FromSeconds(3));
return last_keepalive_event_ + 4 * rtt;
}
} // namespace overnet