// blob: acf36c2056d63d58e55d375d714e1d6184671fe8 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "router_endpoint.h"
#include <iostream>
#include <memory>
#include "garnet/lib/overnet/protocol/fork_frame.h"
namespace overnet {
// Prefix shared by the service names that Overnet handles internally.
// BeginForkRead() checks incoming introductions against it to decide whether
// a fork is a system service or should be queued for RecvIntro().
static const auto kOvernetSystemNamespace =
Slice::FromStaticString("fuchsia.overnet.system.");
// Service name placed in the introduction that opens the gossip stream
// between two peers (see ConnectionStream::GossipStream()).
static const auto kOvernetGossipService =
Slice::FromStaticString("fuchsia.overnet.system.gossip");
void RouterEndpoint::NewStream::Fail(const Status& status) {
auto* s = new Stream(std::move(*this));
s->Close(status, [s] { delete s; });
}
// Constructs the endpoint on top of Router (forwarding all three arguments)
// and arms the first gossip timer.
RouterEndpoint::RouterEndpoint(Timer* timer, NodeId node_id,
                               bool allow_non_determinism)
    : Router(timer, node_id, allow_non_determinism) {
  // Gossip scheduling begins as soon as the endpoint exists.
  StartGossipTimer();
}
// Destroying the endpoint requires that no connection streams remain; Close()
// is what drains connection_streams_.
RouterEndpoint::~RouterEndpoint() {
  assert(connection_streams_.empty());
}
void RouterEndpoint::StartGossipTimer() {
Timer* timer = this->timer();
gossip_timer_.Reset(
timer, timer->Now() + gossip_interval_, [this](const Status& status) {
if (status.is_error())
return;
auto node = SelectGossipPeer();
if (!node) {
gossip_interval_ =
std::min(3 * gossip_interval_ / 2, TimeDelta::FromMinutes(30));
StartGossipTimer();
} else {
gossip_interval_ = InitialGossipInterval();
SendGossipTo(*node, [this] {
if (!closing_) {
StartGossipTimer();
}
});
}
});
}
// Writes a gossip update for |target| and pushes it on that peer's gossip
// stream, handing |done| to the push.
//
// Nothing is sent (and |done| is not passed on) when gossiping has stopped,
// when there is no connection stream for |target|, or when that connection
// has no gossip stream available yet.
void RouterEndpoint::SendGossipTo(NodeId target, Callback<void> done) {
  // Are we still gossiping?
  if (!gossip_timer_.get()) {
    return;
  }
  auto connection = connection_streams_.find(target);
  if (connection == connection_streams_.end()) {
    return;
  }
  // GossipStream() may kick off creation of the stream and still return
  // nullptr for now.
  Stream* gossip = connection->second.GossipStream();
  if (gossip == nullptr) {
    return;
  }
  auto update = WriteGossipUpdate(Border::None(), target);
  OVERNET_TRACE(DEBUG) << "SEND_GOSSIP_TO:" << target << " " << update;
  Stream::SendOp(gossip, update.length()).Push(update, std::move(done));
}
// Shuts the endpoint down: marks it as closing, stops the gossip timer, then
// closes the connection streams one at a time. Each completed close erases
// that peer's entry and re-enters Close() for the next one. Once no
// connection streams remain, |done| is forwarded to Router::Close().
void RouterEndpoint::Close(Callback<void> done) {
  closing_ = true;
  gossip_timer_.Reset();
  if (connection_streams_.empty()) {
    Router::Close(std::move(done));
    return;
  }
  auto it = connection_streams_.begin();
  OVERNET_TRACE(INFO) << "Closing peer " << it->first;
  // The completion callback remembers the peer's key rather than the
  // iterator: connection_streams_ can gain entries while the close is pending
  // (e.g. via OnUnknownStream()), and for a hash-based container such an
  // insertion may invalidate outstanding iterators. Erasing by key is safe
  // either way.
  Callback<void> after_close(
      ALLOCATED_CALLBACK,
      [this, peer = it->first, done = std::move(done)]() mutable {
        OVERNET_TRACE(INFO) << "Closed peer " << peer;
        connection_streams_.erase(peer);
        Close(std::move(done));
      });
  it->second.Close(Status::Cancelled(), std::move(after_close));
}
// Makes sure a connection stream to |peer| exists, creating it if needed.
// The returned stream pointer is intentionally unused.
void RouterEndpoint::RegisterPeer(NodeId peer) {
  GetOrCreateConnectionStream(peer);
}
// Returns the connection stream for |peer|, constructing it in place inside
// connection_streams_ and calling Register() on it the first time the peer is
// seen. |peer| must not be this node.
RouterEndpoint::ConnectionStream* RouterEndpoint::GetOrCreateConnectionStream(
    NodeId peer) {
  assert(peer != node_id());
  auto existing = connection_streams_.find(peer);
  if (existing == connection_streams_.end()) {
    OVERNET_TRACE(DEBUG) << "Creating connection stream for peer " << peer;
    // Piecewise construction builds the ConnectionStream directly in the map
    // from (this, peer).
    auto inserted = connection_streams_.emplace(
        std::piecewise_construct, std::forward_as_tuple(peer),
        std::forward_as_tuple(this, peer));
    auto* created = &inserted.first->second;
    created->Register();
    return created;
  }
  return &existing->second;
}
// Turns a NewStream introduction into a live stream.
//
// The stream attaches itself to the creator's connection stream for the peer
// (so it is tracked in forked_streams_). If that connection no longer exists
// the stream is closed immediately with FAILED_PRECONDITION. Either way the
// introduction's creator_ is cleared and the stream is registered.
// NOTE(review): Register() also runs on the failure path, after Close() -
// confirm that is intended.
RouterEndpoint::Stream::Stream(NewStream introduction)
    : DatagramStream(introduction.creator_, introduction.peer_,
                     introduction.reliability_and_ordering_,
                     introduction.stream_id_) {
  auto& connections = introduction.creator_->connection_streams_;
  auto found = connections.find(introduction.peer_);
  if (found != connections.end()) {
    connection_stream_ = &found->second;
    connection_stream_->forked_streams_.PushBack(this);
  } else {
    OVERNET_TRACE(DEBUG) << "Failed to find connection " << introduction.peer_;
    Close(Status(StatusCode::FAILED_PRECONDITION,
                 "Connection closed before stream creation"),
          Callback<void>::Ignored());
  }
  introduction.creator_ = nullptr;
  Register();
}
// Closes the stream with |status|. The stream first detaches from its
// connection stream's forked_streams_ list (if still attached), then defers
// to DatagramStream::Close(), which receives |quiesced|.
void RouterEndpoint::Stream::Close(const Status& status,
                                   Callback<void> quiesced) {
  if (auto* parent = connection_stream_) {
    parent->forked_streams_.Remove(this);
    // Cleared so a repeated Close() does not try to detach again.
    connection_stream_ = nullptr;
  }
  DatagramStream::Close(status, std::move(quiesced));
}
// Creates the per-peer connection stream: a reliable, unordered datagram
// stream on stream id 0, which immediately starts reading fork frames.
//
// Stream ids for forks start at 2 when |peer| orders before this node and at
// 1 otherwise; MakeFork() then advances in steps of 2 (presumably so the two
// ends of a connection allocate from disjoint id sets).
RouterEndpoint::ConnectionStream::ConnectionStream(RouterEndpoint* endpoint,
                                                   NodeId peer)
    : DatagramStream(endpoint, peer, ReliabilityAndOrdering::ReliableUnordered,
                     StreamId(0)),
      endpoint_(endpoint),
      next_stream_id_(peer < endpoint->node_id() ? 2 : 1) {
  BeginForkRead();
}
// Tears down the fork and gossip readers. A reader that is still mid-read is
// cancelled first; its read callback is expected to leave the state at
// Stopped, which the asserts check before the reader object is destroyed.
RouterEndpoint::ConnectionStream::~ConnectionStream() {
  // --- Fork reader ---
  // NOTE(review): unlike the gossip reader below, there is no allowance for
  // ReadState::Waiting here - confirm a connection can never be destroyed
  // while its fork frame is still queued.
  if (fork_read_state_ == ReadState::Reading) {
    fork_read_->Close(Status::Cancelled());
  }
  assert(fork_read_state_ == ReadState::Stopped);
  fork_read_.Destroy();

  // --- Gossip reader ---
  if (gossip_read_state_ == ReadState::Reading) {
    gossip_read_->Close(Status::Cancelled());
  }
  if (gossip_read_state_ == ReadState::Waiting) {
    // Presumably no reader object is live in this state (BeginGossipRead()
    // destroys it before entering Waiting), so there is nothing to destroy.
    return;
  }
  assert(gossip_read_state_ == ReadState::Stopped);
  gossip_read_.Destroy();
}
void RouterEndpoint::ConnectionStream::BeginGossipRead() {
OVERNET_TRACE(DEBUG) << "BEGIN_GOSSIP_READ";
gossip_read_state_ = ReadState::Reading;
gossip_read_.Init(gossip_stream_.get());
gossip_read_->PullAll(StatusOrCallback<std::vector<Slice>>(
[this](StatusOr<std::vector<Slice>>&& read_status) {
OVERNET_TRACE(DEBUG) << "GOSSIP_READ:" << read_status;
assert(gossip_read_state_ == ReadState::Reading);
if (read_status.is_error()) {
gossip_read_state_ = ReadState::Stopped;
Close(read_status.AsStatus(), Callback<void>::Ignored());
return;
} else if (read_status->size() == 0) {
gossip_read_state_ = ReadState::Stopped;
Close(Status::Ok(), Callback<void>::Ignored());
return;
}
auto apply_status = endpoint_->ApplyGossipUpdate(
Slice::Join(read_status->begin(), read_status->end()), peer());
if (apply_status.is_error()) {
gossip_read_state_ = ReadState::Stopped;
Close(apply_status, Callback<void>::Ignored());
return;
}
gossip_read_.Destroy();
gossip_read_state_ = ReadState::Waiting;
BeginGossipRead();
}));
}
void RouterEndpoint::ConnectionStream::BeginForkRead() {
fork_read_state_ = ReadState::Reading;
fork_read_.Init(this);
fork_read_->PullAll(StatusOrCallback<std::vector<Slice>>(
[this](StatusOr<std::vector<Slice>>&& read_status) {
assert(fork_read_state_ == ReadState::Reading);
if (read_status.is_error()) {
fork_read_state_ = ReadState::Stopped;
Close(read_status.AsStatus(), Callback<void>::Ignored());
return;
} else if (read_status->size() == 0) {
fork_read_state_ = ReadState::Stopped;
Close(Status::Ok(), Callback<void>::Ignored());
return;
}
auto fork_frame_status = ForkFrame::Parse(
Slice::Join(read_status->begin(), read_status->end()));
if (fork_frame_status.is_error()) {
fork_read_state_ = ReadState::Stopped;
Close(fork_frame_status.AsStatus(), Callback<void>::Ignored());
return;
}
const auto& svc =
fork_frame_status->introduction()[Introduction::Key::ServiceName];
if (svc.has_value() && svc->StartsWith(kOvernetSystemNamespace)) {
enum class SystemService {
NO_IDEA,
GOSSIP,
};
auto svc_type = SystemService::NO_IDEA;
if (svc == kOvernetGossipService) {
svc_type = SystemService::GOSSIP;
}
// svc is no longer valid after this line
auto received_intro =
endpoint_->UnwrapForkFrame(peer(), std::move(*fork_frame_status));
switch (svc_type) {
case SystemService::NO_IDEA:
received_intro.new_stream.Fail(
Status(StatusCode::FAILED_PRECONDITION, "Unknown service"));
break;
case SystemService::GOSSIP:
if (IsGossipStreamInitiator()) {
received_intro.new_stream.Fail(
Status(StatusCode::FAILED_PRECONDITION,
"Not gossip stream initiator"));
} else if (gossip_stream_) {
received_intro.new_stream.Fail(
Status(StatusCode::FAILED_PRECONDITION,
"Gossip channel already exists"));
} else {
InstantiateGossipStream(std::move(received_intro.new_stream));
}
break;
}
fork_read_.Destroy();
fork_read_state_ = ReadState::Waiting;
BeginForkRead();
} else {
fork_frame_.Init(std::move(*fork_frame_status));
endpoint_->incoming_forks_.PushBack(this);
fork_read_.Destroy();
fork_read_state_ = ReadState::Waiting;
if (this == endpoint_->incoming_forks_.Front()) {
endpoint_->MaybeContinueIncomingForks();
}
}
}));
}
// Opens a new stream to |peer| by forking that peer's connection stream
// (created on demand). |new_stream_ready| is handed to the fork and receives
// the resulting NewStream or an error status.
void RouterEndpoint::SendIntro(NodeId peer,
                               ReliabilityAndOrdering reliability_and_ordering,
                               Introduction introduction,
                               StatusOrCallback<NewStream> new_stream_ready) {
  ConnectionStream* connection = GetOrCreateConnectionStream(peer);
  connection->Fork(reliability_and_ordering, std::move(introduction),
                   std::move(new_stream_ready));
}
// Creates an outgoing fork (new stream plus the fork frame describing it) on
// the connection this stream belongs to. Returns FAILED_PRECONDITION when the
// stream is no longer attached to a connection, i.e. after Close() or when
// the connection was already missing at construction.
StatusOr<RouterEndpoint::OutgoingFork> RouterEndpoint::Stream::Fork(
    ReliabilityAndOrdering reliability_and_ordering,
    Introduction introduction) {
  if (connection_stream_ != nullptr) {
    return connection_stream_->MakeFork(reliability_and_ordering,
                                        std::move(introduction));
  }
  return StatusOr<OutgoingFork>(StatusCode::FAILED_PRECONDITION,
                                "Closed stream");
}
void RouterEndpoint::ConnectionStream::Close(const Status& status,
Callback<void> quiesced) {
if (status.is_error()) {
OVERNET_TRACE(ERROR) << "Connection to " << peer()
<< " closed with error: " << status;
}
if (gossip_read_state_ == ReadState::Reading) {
gossip_read_->Close(status);
}
if (gossip_stream_) {
gossip_stream_->Close(status, Callback<void>::Ignored());
}
gossip_stream_.reset();
if (!closing_status_) {
closing_status_.Reset(status);
}
if (forked_streams_.Empty()) {
DatagramStream::Close(status, std::move(quiesced));
} else {
forked_streams_.Front()->Close(
status,
Callback<void>(ALLOCATED_CALLBACK,
[this, status, quiesced{std::move(quiesced)}]() mutable {
this->Close(status, std::move(quiesced));
}));
}
}
// Returns the gossip stream for this connection, or nullptr if there is none
// yet.
//
// If no stream exists, the connection is not closing, this side is the gossip
// stream initiator and no fork is already in flight, a fork for the gossip
// service is started. The stream only becomes available once that fork's
// callback has run, so the call that starts the fork may still return
// nullptr. A failed fork closes the connection.
RouterEndpoint::Stream* RouterEndpoint::ConnectionStream::GossipStream() {
  const bool should_open = gossip_stream_ == nullptr &&
                           !closing_status_.has_value() &&
                           IsGossipStreamInitiator() && !forking_gossip_stream_;
  if (should_open) {
    forking_gossip_stream_ = true;
    Introduction gossip_intro;
    gossip_intro[Introduction::Key::ServiceName] = kOvernetGossipService;
    Fork(ReliabilityAndOrdering::UnreliableUnordered, std::move(gossip_intro),
         [this](StatusOr<NewStream> forked) {
           assert(forking_gossip_stream_);
           forking_gossip_stream_ = false;
           if (!forked.is_error()) {
             InstantiateGossipStream(std::move(*forked));
             return;
           }
           Close(forked.AsStatus().WithContext("Opening gossip stream"),
                 Callback<void>::Ignored());
         });
  }
  return gossip_stream_.get();
}
// Installs |ns| as this connection's gossip stream and starts reading gossip
// from it. There must not already be a gossip stream.
void RouterEndpoint::ConnectionStream::InstantiateGossipStream(NewStream ns) {
  assert(gossip_stream_ == nullptr);
  // gossip_stream_ takes ownership of the freshly built Stream.
  gossip_stream_.reset(new Stream(std::move(ns)));
  BeginGossipRead();
}
// Allocates the next stream id on this connection and returns the pair of
// (NewStream, ForkFrame) describing the fork. If the connection has begun
// closing, the recorded closing status is returned instead and no id is
// consumed.
StatusOr<RouterEndpoint::OutgoingFork>
RouterEndpoint::ConnectionStream::MakeFork(
    ReliabilityAndOrdering reliability_and_ordering,
    Introduction introduction) {
  if (closing_status_) {
    return *closing_status_;
  }
  // Ids advance in steps of two from the starting value chosen in the
  // constructor.
  const StreamId allocated(next_stream_id_);
  next_stream_id_ += 2;
  return OutgoingFork{
      NewStream{endpoint_, peer(), reliability_and_ordering, allocated},
      ForkFrame(allocated, reliability_and_ordering, std::move(introduction))};
}
// Forks a new stream off this connection and announces it to the peer.
//
// The fork frame is serialized and sent on this connection stream. The
// NewStream is passed to |new_stream_ready| from the completion callback of a
// trailing empty push on the same send operation. If the fork cannot be
// created, |new_stream_ready| receives the error status straight away.
void RouterEndpoint::ConnectionStream::Fork(
    ReliabilityAndOrdering reliability_and_ordering, Introduction introduction,
    StatusOrCallback<NewStream> new_stream_ready) {
  auto outgoing = MakeFork(reliability_and_ordering, std::move(introduction));
  if (outgoing.is_error()) {
    new_stream_ready(outgoing.AsStatus());
    return;
  }
  Slice frame_bytes = outgoing->fork_frame.Write(Border::None());
  SendOp frame_send(this, frame_bytes.length());
  frame_send.Push(frame_bytes, Callback<void>::Ignored());
  // The empty slice carries no data; it exists to attach the callback that
  // delivers the new stream.
  frame_send.Push(
      Slice(),
      Callback<void>(ALLOCATED_CALLBACK,
                     [new_stream = std::move(outgoing->new_stream),
                      new_stream_ready =
                          std::move(new_stream_ready)]() mutable {
                       new_stream_ready(std::move(new_stream));
                     }));
}
// Registers |ready| as the receiver of the next incoming introduction,
// replacing any previously stored callback, and delivers a queued fork right
// away if one is waiting.
void RouterEndpoint::RecvIntro(StatusOrCallback<ReceivedIntroduction> ready) {
  recv_intro_ready_ = std::move(ready);
  MaybeContinueIncomingForks();
}
// Delivers at most one queued incoming fork to the pending RecvIntro()
// callback. Does nothing unless both a callback and a queued fork are
// present. After delivery, the connection that supplied the fork resumes
// reading fork frames.
void RouterEndpoint::MaybeContinueIncomingForks() {
  if (recv_intro_ready_.empty()) {
    return;
  }
  if (incoming_forks_.Empty()) {
    return;
  }
  // Take the oldest queued connection off the list before invoking the
  // callback.
  auto* incoming_fork = incoming_forks_.Front();
  incoming_forks_.Remove(incoming_fork);
  assert(incoming_fork->fork_read_state_ ==
         ConnectionStream::ReadState::Waiting);
  recv_intro_ready_(UnwrapForkFrame(
      incoming_fork->peer(), std::move(*incoming_fork->fork_frame_.get())));
  // The parked frame has been consumed; release its storage and read the
  // next one.
  incoming_fork->fork_frame_.Destroy();
  incoming_fork->BeginForkRead();
}
// Converts a fork frame received from |peer| into a ReceivedIntroduction: a
// NewStream built from the frame's reliability/ordering and stream id, plus
// the frame's introduction (moved out of |fork_frame|).
RouterEndpoint::ReceivedIntroduction RouterEndpoint::UnwrapForkFrame(
    NodeId peer, ForkFrame fork_frame) {
  // Read the scalar fields before the introduction is moved out of the frame.
  const auto reliability = fork_frame.reliability_and_ordering();
  const auto stream_id = fork_frame.stream_id();
  return ReceivedIntroduction{NewStream{this, peer, reliability, stream_id},
                              std::move(fork_frame.introduction())};
}
// Handles a reference to a stream this endpoint does not know about. Stream
// id 0 is the connection stream, so one is created for |node_id| on demand;
// any other unknown stream id is ignored.
void RouterEndpoint::OnUnknownStream(NodeId node_id, StreamId stream_id) {
  const bool is_connection_stream = (stream_id == StreamId(0));
  if (is_connection_stream) {
    GetOrCreateConnectionStream(node_id);
  }
}
} // namespace overnet