blob: 89ef723ddf37a4f0108ee65505a47ec64c311bc8 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/connectivity/overnet/lib/links/stream_link.h"
namespace overnet {
StreamLink::StreamLink(Router *router, NodeId peer,
std::unique_ptr<StreamFramer> framer, uint64_t label)
: router_(router),
framer_(std::move(framer)),
peer_(peer),
local_id_(label),
packet_stuffer_(router->node_id(), peer) {}
void StreamLink::Forward(Message message) {
if (closed_) {
return;
}
if (packet_stuffer_.Forward(std::move(message)) && !emitting_) {
EmitOne();
}
}
void StreamLink::SetClosed() {
closed_ = true;
packet_stuffer_.DropPendingMessages();
}
void StreamLink::EmitOne() {
static uint64_t num_forwards = 0;
auto n = num_forwards++;
OVERNET_TRACE(TRACE) << "StreamLink::EmitOne[" << n
<< "]: forward with emitting=" << emitting_
<< " closed=" << closed_;
assert(!emitting_);
assert(!closed_);
assert(packet_stuffer_.HasPendingMessages());
auto packet = packet_stuffer_.BuildPacket(
LazySliceArgs{framer_->desired_border, framer_->maximum_segment_size});
OVERNET_TRACE(TRACE) << "StreamLink::EmitOne[" << n << "]: emit " << packet;
emitting_ = true;
stats_.outgoing_packet_count++;
Emit(framer_->Frame(std::move(packet)),
StatusCallback(ALLOCATED_CALLBACK, [this, n](const Status &status) {
if (status.is_error()) {
OVERNET_TRACE(ERROR) << "Write failed: " << status;
SetClosed();
}
emitting_ = false;
OVERNET_TRACE(TRACE) << "StreamLink::EmitOne[" << n << "]: emitted";
if (closed_) {
MaybeQuiesce();
} else if (packet_stuffer_.HasPendingMessages()) {
EmitOne();
}
}));
}
void StreamLink::Process(TimeStamp received, Slice bytes) {
OVERNET_TRACE(TRACE) << "StreamLink.Read: " << bytes;
if (closed_) {
return;
}
framer_->Push(std::move(bytes));
for (;;) {
auto input = framer_->Pop();
if (input.is_error()) {
OVERNET_TRACE(ERROR) << input.AsStatus();
SetClosed();
return;
}
if (!input->has_value()) {
return;
}
stats_.incoming_packet_count++;
if (auto status = packet_stuffer_.ParseAndForwardTo(
received, std::move(**input), router_);
status.is_error()) {
OVERNET_TRACE(ERROR) << input.AsStatus();
SetClosed();
return;
}
}
}
void StreamLink::Close(Callback<void> quiesced) {
OVERNET_TRACE(DEBUG) << "Stream link closed by router";
SetClosed();
on_quiesced_ = std::move(quiesced);
MaybeQuiesce();
}
void StreamLink::MaybeQuiesce() {
if (closed_ && !emitting_ && !on_quiesced_.empty()) {
auto cb = std::move(on_quiesced_);
cb();
}
}
fuchsia::overnet::protocol::LinkStatus StreamLink::GetLinkStatus() {
// Advertise MSS as smaller than it is to account for some bugs that exist
// right now.
// TODO(ctiller): eliminate this - we should be precise.
constexpr size_t kUnderadvertiseMaximumSendSize = 32;
fuchsia::overnet::protocol::LinkMetrics m;
m.set_mss(std::max(kUnderadvertiseMaximumSendSize, maximum_segment_size()) -
kUnderadvertiseMaximumSendSize);
return fuchsia::overnet::protocol::LinkStatus{router_->node_id().as_fidl(),
peer_.as_fidl(), local_id_, 1,
std::move(m)};
}
} // namespace overnet