blob: c42c1bd8173b9e9096b4973d754950d8cf229267 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/connectivity/overnet/lib/links/packet_link.h"
#include <iostream>
#include <sstream>
namespace overnet {
PacketLink::PacketLink(Router* router, NodeId peer, uint32_t mss,
uint64_t label)
: router_(router),
timer_(router->timer()),
peer_(peer),
label_(label),
protocol_{router_->timer(),
[router] { return (*router->rng())(); },
this,
PacketProtocol::NullCodec(),
mss,
false},
packet_stuffer_(router->node_id(), peer) {}
void PacketLink::Close(Callback<void> quiesced) {
ScopedModule<PacketLink> scoped_module(this);
closed_ = true;
packet_stuffer_.DropPendingMessages();
protocol_.Close(std::move(quiesced));
}
void PacketLink::Forward(Message message) {
ScopedModule<PacketLink> scoped_module(this);
const bool send_immediately =
packet_stuffer_.Forward(std::move(message)) && !sending_;
OVERNET_TRACE(DEBUG) << "Forward sending=" << sending_
<< " imm=" << send_immediately;
if (send_immediately) {
SchedulePacket();
}
}
void PacketLink::Tombstone() {
metrics_version_ = fuchsia::overnet::protocol::METRIC_VERSION_TOMBSTONE;
}
fuchsia::overnet::protocol::LinkStatus PacketLink::GetLinkStatus() {
ScopedModule<PacketLink> scoped_module(this);
if (metrics_version_ ==
fuchsia::overnet::protocol::METRIC_VERSION_TOMBSTONE) {
return fuchsia::overnet::protocol::LinkStatus{router_->node_id().as_fidl(),
peer_.as_fidl(), label_,
metrics_version_};
}
// Advertise MSS as smaller than it is to account for some bugs that exist
// right now.
// TODO(ctiller): eliminate this - we should be precise.
constexpr uint32_t kUnderadvertiseMaximumSendSize = 32;
fuchsia::overnet::protocol::LinkStatus m{router_->node_id().as_fidl(),
peer_.as_fidl(), label_,
metrics_version_++};
m.metrics.set_bw_link(protocol_.bottleneck_bandwidth().bits_per_second());
m.metrics.set_rtt(protocol_.round_trip_time().as_us());
m.metrics.set_mss(
std::max(kUnderadvertiseMaximumSendSize, protocol_.maximum_send_size()) -
kUnderadvertiseMaximumSendSize);
return m;
}
void PacketLink::SchedulePacket() {
assert(!sending_);
assert(packet_stuffer_.HasPendingMessages());
auto r = new LinkSendRequest(this);
OVERNET_TRACE(DEBUG) << "Schedule " << r;
protocol_.Send(PacketProtocol::SendRequestHdl(r));
}
PacketLink::LinkSendRequest::LinkSendRequest(PacketLink* link) : link_(link) {
OVERNET_TRACE(DEBUG) << "LinkSendRequest[" << this << "]: Create";
assert(!link->sending_);
link->sending_ = true;
}
PacketLink::LinkSendRequest::~LinkSendRequest() {
assert(!blocking_sends_);
OVERNET_TRACE(DEBUG) << "LinkSendRequest[" << this << "]: Destroy";
}
Slice PacketLink::LinkSendRequest::GenerateBytes(LazySliceArgs args) {
auto link = link_;
ScopedModule<PacketLink> scoped_module(link_);
ScopedOp scoped_op(op_);
OVERNET_TRACE(DEBUG) << "LinkSendRequest[" << this << "]: GenerateBytes";
assert(blocking_sends_);
assert(link->sending_);
blocking_sends_ = false;
auto pkt = link->packet_stuffer_.BuildPacket(args);
link->sending_ = false;
OVERNET_TRACE(DEBUG) << "LinkSendRequest[" << this << "]: Generated " << pkt;
if (link->packet_stuffer_.HasPendingMessages()) {
link->SchedulePacket();
}
return pkt;
}
void PacketLink::LinkSendRequest::Ack(const Status& status) {
ScopedModule<PacketLink> scoped_module(link_);
ScopedOp scoped_op(op_);
OVERNET_TRACE(DEBUG) << "LinkSendRequest[" << this
<< "]: Ack status=" << status
<< " blocking_sends=" << blocking_sends_;
if (blocking_sends_) {
assert(status.is_error());
assert(link_->sending_);
link_->sending_ = false;
blocking_sends_ = false;
}
delete this;
}
void PacketLink::SendPacket(SeqNum seq, LazySlice data) {
if (send_packet_queue_ != nullptr) {
send_packet_queue_->emplace(std::move(data));
return;
}
PacketProtocol::ProtocolRef protocol_ref(&protocol_);
std::queue<LazySlice> send_packet_queue;
send_packet_queue_ = &send_packet_queue;
for (;;) {
const auto prefix_length = 1 + seq.wire_length();
auto data_slice = data(
LazySliceArgs{Border::Prefix(prefix_length),
protocol_.maximum_send_size() - prefix_length, false});
auto send_slice = data_slice.WithPrefix(prefix_length, [seq](uint8_t* p) {
*p++ = 0;
seq.Write(p);
});
OVERNET_TRACE(DEBUG) << "Emit " << send_slice;
Emit(std::move(send_slice));
if (send_packet_queue.empty()) {
break;
}
data = std::move(send_packet_queue.front());
send_packet_queue.pop();
}
send_packet_queue_ = nullptr;
}
void PacketLink::Process(TimeStamp received, Slice packet) {
if (closed_) {
return;
}
ScopedModule<PacketLink> scoped_module(this);
const uint8_t* const begin = packet.begin();
const uint8_t* p = begin;
const uint8_t* const end = packet.end();
if (p == end) {
OVERNET_TRACE(WARNING) << "Empty packet";
return;
}
if (*p != 0) {
OVERNET_TRACE(WARNING) << "Non-zero op-code received in PacketLink";
return;
}
++p;
// Packets without sequence numbers are used to end the three way handshake.
if (p == end) {
return;
}
auto seq_status = SeqNum::Parse(&p, end);
if (seq_status.is_error()) {
OVERNET_TRACE(WARNING) << "Packet seqnum parse failure: "
<< seq_status.AsStatus();
return;
}
packet.TrimBegin(p - begin);
// begin, p, end are no longer valid.
protocol_.Process(
received, *seq_status.get(), std::move(packet),
[this, received](auto packet_status) {
if (packet_status.is_error()) {
if (packet_status.code() != StatusCode::CANCELLED) {
OVERNET_TRACE(WARNING)
<< "Packet header parse failure: " << packet_status.AsStatus();
}
return;
}
if (auto* msg = *packet_status) {
auto body_status = ProcessBody(received, std::move(msg->payload));
if (body_status.is_error()) {
OVERNET_TRACE(WARNING)
<< "Packet body parse failure: " << body_status;
return;
}
}
});
}
Status PacketLink::ProcessBody(TimeStamp received, Slice packet) {
return packet_stuffer_.ParseAndForwardTo(received, std::move(packet),
router_);
}
} // namespace overnet