blob: 488b2b3c963dbd97e4e442c8ccafd5d659f606f9 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/connectivity/overnet/lib/links/packet_link.h"
#include <iostream>
#include <sstream>
namespace overnet {
PacketLink::PacketLink(Router* router, NodeId peer, uint32_t mss,
uint64_t label)
: router_(router),
timer_(router->timer()),
peer_(peer),
label_(label),
protocol_{router_->timer(), [router] { return (*router->rng())(); }, this,
PacketProtocol::NullCodec(), mss} {}
void PacketLink::Close(Callback<void> quiesced) {
ScopedModule<PacketLink> scoped_module(this);
stashed_.Reset();
while (!outgoing_.empty()) {
outgoing_.pop();
}
protocol_.Close(std::move(quiesced));
}
void PacketLink::Forward(Message message) {
// TODO(ctiller): do some real thinking about what this value should be
constexpr size_t kMaxBufferedMessages = 32;
ScopedModule<PacketLink> scoped_module(this);
if (outgoing_.size() >= kMaxBufferedMessages) {
auto drop = std::move(outgoing_.front());
outgoing_.pop();
}
bool send_immediately = !sending_ && outgoing_.empty();
OVERNET_TRACE(DEBUG) << "Forward sending=" << sending_
<< " outgoing=" << outgoing_.size()
<< " imm=" << send_immediately;
outgoing_.emplace(std::move(message));
if (send_immediately) {
SchedulePacket();
}
}
void PacketLink::Tombstone() {
metrics_version_ = fuchsia::overnet::protocol::METRIC_VERSION_TOMBSTONE;
}
fuchsia::overnet::protocol::LinkStatus PacketLink::GetLinkStatus() {
ScopedModule<PacketLink> scoped_module(this);
if (metrics_version_ ==
fuchsia::overnet::protocol::METRIC_VERSION_TOMBSTONE) {
return fuchsia::overnet::protocol::LinkStatus{router_->node_id().as_fidl(),
peer_.as_fidl(), label_,
metrics_version_};
}
// Advertise MSS as smaller than it is to account for some bugs that exist
// right now.
// TODO(ctiller): eliminate this - we should be precise.
constexpr uint32_t kUnderadvertiseMaximumSendSize = 32;
fuchsia::overnet::protocol::LinkStatus m{router_->node_id().as_fidl(),
peer_.as_fidl(), label_,
metrics_version_++};
m.metrics.set_bw_link(protocol_.bottleneck_bandwidth().bits_per_second());
m.metrics.set_rtt(protocol_.round_trip_time().as_us());
m.metrics.set_mss(
std::max(kUnderadvertiseMaximumSendSize, protocol_.maximum_send_size()) -
kUnderadvertiseMaximumSendSize);
return m;
}
void PacketLink::SchedulePacket() {
assert(!sending_);
assert(!outgoing_.empty() || stashed_.has_value());
auto r = new LinkSendRequest(this);
OVERNET_TRACE(DEBUG) << "Schedule " << r;
protocol_.Send(PacketProtocol::SendRequestHdl(r));
}
PacketLink::LinkSendRequest::LinkSendRequest(PacketLink* link) : link_(link) {
assert(!link->sending_);
link->sending_ = true;
}
PacketLink::LinkSendRequest::~LinkSendRequest() { assert(!blocking_sends_); }
Slice PacketLink::LinkSendRequest::GenerateBytes(LazySliceArgs args) {
auto link = link_;
ScopedModule<PacketLink> scoped_module(link_);
ScopedOp scoped_op(op_);
OVERNET_TRACE(DEBUG) << "LinkSendRequest[" << this << "]: GenerateBytes";
assert(blocking_sends_);
assert(link->sending_);
blocking_sends_ = false;
auto pkt = link->BuildPacket(args);
link->sending_ = false;
OVERNET_TRACE(DEBUG) << "LinkSendRequest[" << this << "]: Generated " << pkt;
if (link->stashed_.has_value() || !link->outgoing_.empty()) {
link->SchedulePacket();
}
return pkt;
}
void PacketLink::LinkSendRequest::Ack(const Status& status) {
ScopedModule<PacketLink> scoped_module(link_);
ScopedOp scoped_op(op_);
OVERNET_TRACE(DEBUG) << "LinkSendRequest[" << this
<< "]: Ack status=" << status
<< " blocking_sends=" << blocking_sends_;
if (blocking_sends_) {
assert(status.is_error());
assert(link_->sending_);
link_->sending_ = false;
blocking_sends_ = false;
}
delete this;
}
Slice PacketLink::BuildPacket(LazySliceArgs args) {
OVERNET_TRACE(DEBUG)
<< "Build outgoing=" << outgoing_.size() << " stashed="
<< (stashed_ ? [&](){
std::ostringstream fmt;
fmt << stashed_->message << "+" << stashed_->payload.length() << "b";
return fmt.str();
}() : "nil");
auto remaining_length = args.max_length;
auto add_serialized_msg = [&remaining_length, this](
const RoutableMessage& wire,
Slice payload) -> bool {
auto serialized = wire.Write(router_->node_id(), peer_, std::move(payload));
const auto serialized_length = serialized.length();
const auto length_length = varint::WireSizeFor(serialized_length);
const auto segment_length = length_length + serialized_length;
OVERNET_TRACE(DEBUG) << "AddMsg segment_length=" << segment_length
<< " remaining_length=" << remaining_length
<< (segment_length > remaining_length ? " => SKIP"
: "")
<< "; serialized:" << serialized;
if (segment_length > remaining_length) {
return false;
}
send_slices_.push_back(serialized.WithPrefix(
length_length, [length_length, serialized_length](uint8_t* p) {
varint::Write(serialized_length, length_length, p);
}));
remaining_length -= segment_length;
return true;
};
static const uint32_t kMinMSS = 64;
if (stashed_.has_value()) {
if (add_serialized_msg(stashed_->message, stashed_->payload)) {
stashed_.Reset();
} else {
if (args.has_other_content) {
// Skip sending any other messages: we'll retry this message
// without an ack momentarily.
remaining_length = 0;
} else {
// There's no chance we'll ever send this message: drop it.
abort();
stashed_.Reset();
OVERNET_TRACE(DEBUG) << "drop stashed";
}
}
}
while (!outgoing_.empty() && remaining_length > kMinMSS) {
// Ensure there's space with the routing header included.
Optional<size_t> max_len_before_prefix =
outgoing_.front().header.MaxPayloadLength(router_->node_id(), peer_,
remaining_length);
if (!max_len_before_prefix.has_value() || *max_len_before_prefix <= 1) {
break;
}
// And ensure there's space with the segment length header.
auto max_len = varint::MaximumLengthWithPrefix(*max_len_before_prefix);
// Pull out the message.
Message msg = std::move(outgoing_.front());
outgoing_.pop();
// Serialize it.
auto payload = msg.make_payload(LazySliceArgs{
Border::None(), std::min(msg.mss, static_cast<uint32_t>(max_len)),
args.has_other_content || !send_slices_.empty()});
if (payload.length() == 0) {
continue;
}
// Add the serialized version to the outgoing queue.
if (!add_serialized_msg(msg.header, payload)) {
// If it fails, stash it, and retry the next loop around.
// This may happen if the sender is unable to trim to the maximum length.
OVERNET_TRACE(DEBUG) << "stash too long";
stashed_.Reset(std::move(msg.header), std::move(payload));
break;
}
}
Slice send =
Slice::Join(send_slices_.begin(), send_slices_.end(),
args.desired_border.WithAddedPrefix(SeqNum::kMaxWireLength));
send_slices_.clear();
return send;
}
void PacketLink::SendPacket(SeqNum seq, LazySlice data) {
if (send_packet_queue_ != nullptr) {
send_packet_queue_->emplace(std::move(data));
return;
}
PacketProtocol::ProtocolRef protocol_ref(&protocol_);
std::queue<LazySlice> send_packet_queue;
send_packet_queue_ = &send_packet_queue;
for (;;) {
const auto prefix_length = 1 + seq.wire_length();
auto data_slice = data(
LazySliceArgs{Border::Prefix(prefix_length),
protocol_.maximum_send_size() - prefix_length, false});
auto send_slice = data_slice.WithPrefix(prefix_length, [seq](uint8_t* p) {
*p++ = 0;
seq.Write(p);
});
OVERNET_TRACE(DEBUG) << "Emit " << send_slice;
stats_.outgoing_packet_count++;
Emit(std::move(send_slice));
if (send_packet_queue.empty()) {
break;
}
data = std::move(send_packet_queue.front());
send_packet_queue.pop();
}
send_packet_queue_ = nullptr;
}
void PacketLink::Process(TimeStamp received, Slice packet) {
stats_.incoming_packet_count++;
ScopedModule<PacketLink> scoped_module(this);
const uint8_t* const begin = packet.begin();
const uint8_t* p = begin;
const uint8_t* const end = packet.end();
if (p == end) {
OVERNET_TRACE(WARNING) << "Empty packet";
return;
}
if (*p != 0) {
OVERNET_TRACE(WARNING) << "Non-zero op-code received in PacketLink";
return;
}
++p;
// Packets without sequence numbers are used to end the three way handshake.
if (p == end)
return;
auto seq_status = SeqNum::Parse(&p, end);
if (seq_status.is_error()) {
OVERNET_TRACE(WARNING) << "Packet seqnum parse failure: "
<< seq_status.AsStatus();
return;
}
packet.TrimBegin(p - begin);
// begin, p, end are no longer valid.
protocol_.Process(
received, *seq_status.get(), std::move(packet),
[this, received](auto packet_status) {
if (packet_status.is_error()) {
if (packet_status.code() != StatusCode::CANCELLED) {
OVERNET_TRACE(WARNING)
<< "Packet header parse failure: " << packet_status.AsStatus();
}
return;
}
if (auto* msg = *packet_status) {
auto body_status = ProcessBody(received, std::move(msg->payload));
if (body_status.is_error()) {
OVERNET_TRACE(WARNING)
<< "Packet body parse failure: " << body_status;
return;
}
}
});
}
Status PacketLink::ProcessBody(TimeStamp received, Slice packet) {
while (packet.length()) {
const uint8_t* const begin = packet.begin();
const uint8_t* p = begin;
const uint8_t* const end = packet.end();
uint64_t serialized_length;
if (!varint::Read(&p, end, &serialized_length)) {
return Status(StatusCode::INVALID_ARGUMENT,
"Failed to parse segment length");
}
assert(end >= p);
if (static_cast<uint64_t>(end - p) < serialized_length) {
return Status(StatusCode::INVALID_ARGUMENT,
"Message body extends past end of packet");
}
packet.TrimBegin(p - begin);
auto msg_status = RoutableMessage::Parse(
packet.TakeUntilOffset(serialized_length), router_->node_id(), peer_);
if (msg_status.is_error()) {
return msg_status.AsStatus();
}
router_->Forward(Message::SimpleForwarder(std::move(msg_status->message),
std::move(msg_status->payload),
received));
}
return Status::Ok();
}
} // namespace overnet