blob: bca655c8177cfa284e8297327db575021c3951f4 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "packet_link.h"
#include <iostream>
namespace overnet {
// Hands out link labels: 1 on the first call, then 2, 3, ... on each
// subsequent call. The counter is a plain function-local static with no
// synchronization.
static uint64_t GenerateLabel() {
  static uint64_t last_issued = 0;
  last_issued += 1;
  return last_issued;
}
// Creates a link owned by `router` that talks to the remote node `peer`.
//
// The link receives a fresh label from GenerateLabel(), and its packet
// protocol is constructed from the router's timer, this link, and `mss`
// (presumably the maximum segment size - confirm against PacketProtocol).
PacketLink::PacketLink(Router* router, NodeId peer, uint32_t mss)
: router_(router),
peer_(peer),
label_(GenerateLabel()),
// NOTE(review): this reads the member router_, not the parameter, so router_
// must be declared before protocol_ in the class - confirm in the header.
protocol_{router_->timer(), this, mss} {}
// Queues `message` for transmission to the peer.
//
// A send is started right away only when the link was idle, i.e. no packet
// was in flight and nothing was queued before this message. Otherwise the
// message simply waits in the queue.
void PacketLink::Forward(Message message) {
  // Sample the idle state before enqueueing; afterwards the queue is never
  // empty.
  const bool was_idle = outgoing_.empty() && !sending_;
  outgoing_.emplace(std::move(message));
  if (!was_idle) {
    return;
  }
  BuildAndSendPacket();
}
// Builds a metrics snapshot for the link from this router's node to the peer.
//
// Every call consumes one metrics version number (the counter is
// post-incremented), and the snapshot carries the bottleneck bandwidth and
// round trip time currently reported by the packet protocol.
LinkMetrics PacketLink::GetLinkMetrics() {
  const auto version = metrics_version_++;
  LinkMetrics snapshot(router_->node_id(), peer_, version, label_);
  snapshot.set_bw_link(protocol_.BottleneckBandwidth());
  snapshot.set_rtt(protocol_.RoundTripTime());
  return snapshot;
}
void PacketLink::BuildAndSendPacket() {
assert(!sending_ && !outgoing_.empty());
sending_ = true;
protocol_.Send([this](uint64_t desired_prefix, uint64_t max_length) {
while (!outgoing_.empty()) {
Message& msg = outgoing_.front();
if (msg.wire.payload().length() > max_length) {
break;
}
auto serialized = msg.wire.Write(router_->node_id(), peer_);
const auto serialized_length = serialized.length();
const auto length_length = varint::WireSizeFor(serialized_length);
const auto segment_length = length_length + serialized_length;
if (segment_length > max_length) {
break;
}
send_slices_.push_back(serialized.WithPrefix(
length_length, [length_length, serialized_length](uint8_t* p) {
varint::Write(serialized_length, length_length, p);
}));
max_length -= segment_length;
sending_callbacks_.emplace_back(std::move(msg.done));
outgoing_.pop();
}
Slice send = Slice::Join(send_slices_.begin(), send_slices_.end(),
desired_prefix + SeqNum::kMaxWireLength);
send_slices_.clear();
return PacketProtocol::SendData{
send,
[this](const Status& status) {
for (auto& cb : sending_callbacks_) {
cb(status);
}
sending_callbacks_.clear();
sending_ = false;
if (status.is_ok() && !sending_ && !outgoing_.empty()) {
BuildAndSendPacket();
}
},
PacketProtocol::SendCallback::Ignored()};
});
}
// Frames `data` for the wire and emits it.
//
// The frame header is a single zero byte (the op-code that Process() checks
// for) followed by the wire encoding of `seq`. `done` is invoked with
// Status::Ok() as soon as Emit() returns.
void PacketLink::SendPacket(SeqNum seq, Slice data, StatusCallback done) {
  const auto header_length = 1 + seq.wire_length();
  auto framed = data.WithPrefix(header_length, [seq](uint8_t* out) {
    out[0] = 0;
    seq.Write(out + 1);
  });
  Emit(std::move(framed));
  done(Status::Ok());
}
// Handles one raw packet that arrived from the peer at time `received`.
//
// Expected layout: a zero op-code byte, then (optionally) a sequence number,
// then the data consumed by the packet protocol. Malformed packets are logged
// to stderr and dropped; nothing is reported back to the caller. If the
// protocol yields a body, it is passed on to ProcessBody().
void PacketLink::Process(TimeStamp received, Slice packet) {
  const uint8_t* const begin = packet.begin();
  const uint8_t* const end = packet.end();
  if (begin == end) {
    std::cerr << "Short packet received (no op code)\n";
    return;
  }
  if (*begin != 0) {
    std::cerr << "Non-zero op-code received in PacketLink\n";
    return;
  }
  const uint8_t* p = begin + 1;
  // Packets without sequence numbers are used to end the three way handshake.
  if (p == end) return;
  auto seq_status = SeqNum::Parse(&p, end);
  if (seq_status.is_error()) {
    std::cerr << "Packet seqnum parse failure: " << seq_status.AsStatus()
              << "\n";
    return;
  }
  // Drop the op-code and sequence number so only the protocol data remains.
  packet.TrimBegin(p - begin);
  // begin, p, end are no longer valid.
  auto packet_status =
      protocol_.Process(received, *seq_status.get(), std::move(packet));
  if (packet_status.is_error()) {
    std::cerr << "Packet header parse failure: " << packet_status.AsStatus()
              << "\n";
    return;
  }
  // The protocol may accept a packet without handing back a body.
  if (*packet_status.get()) {
    auto body_status =
        ProcessBody(received, std::move(*packet_status.get()->get()));
    if (body_status.is_error()) {
      // Ends with "\n" like the other diagnostics; std::cerr is unit-buffered,
      // so an explicit std::endl flush adds nothing.
      std::cerr << "Packet body parse failure: " << body_status << "\n";
    }
  }
}
// Splits a packet body into its length-prefixed segments and forwards the
// message contained in each one to the router.
//
// A segment is a varint byte count followed by that many bytes holding one
// serialized RoutableMessage. Returns INVALID_ARGUMENT when a length prefix
// cannot be read or claims more bytes than remain, and the parser's status
// when a message fails to parse; messages from earlier segments have already
// been forwarded at that point. Returns Ok once the body is fully consumed.
Status PacketLink::ProcessBody(TimeStamp received, Slice packet) {
  while (packet.length() != 0) {
    const uint8_t* const segment_start = packet.begin();
    const uint8_t* const packet_end = packet.end();
    const uint8_t* cursor = segment_start;
    uint64_t message_length = 0;
    if (!varint::Read(&cursor, packet_end, &message_length)) {
      return Status(StatusCode::INVALID_ARGUMENT,
                    "Failed to parse segment length");
    }
    assert(packet_end >= cursor);
    const auto bytes_left = static_cast<uint64_t>(packet_end - cursor);
    if (message_length > bytes_left) {
      return Status(StatusCode::INVALID_ARGUMENT,
                    "Message body extends past end of packet");
    }
    // Skip the length prefix, then carve the message bytes off the front of
    // the packet; whatever follows is handled by the next iteration.
    packet.TrimBegin(cursor - segment_start);
    auto parsed = RoutableMessage::Parse(
        packet.TakeUntilOffset(message_length), router_->node_id(), peer_);
    if (parsed.is_error()) {
      return parsed.AsStatus();
    }
    router_->Forward(Message{std::move(*parsed.get()), received,
                             StatusCallback::Ignored()});
  }
  return Status::Ok();
}
} // namespace overnet