blob: 75283ab0846b9b106ecf6bba7a41a67473840228 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "garnet/lib/overnet/endpoint/router_endpoint.h"
#include <iostream>
#include <memory>
#include "garnet/lib/overnet/protocol/fidl.h"
namespace overnet {
static const auto kOvernetSystemNamespace =
std::string("fuchsia.overnet.system.");
static const auto kOvernetGossipService =
std::string("fuchsia.overnet.system.gossip");
void RouterEndpoint::NewStream::Fail(const Status& status) {
auto* s = new Stream(std::move(*this));
s->Close(status, [s] { delete s; });
}
RouterEndpoint::RouterEndpoint(Timer* timer, NodeId node_id,
bool allow_non_determinism)
: Router(timer, node_id, allow_non_determinism) {
StartGossipTimer();
}
RouterEndpoint::~RouterEndpoint() { assert(connection_streams_.empty()); }
void RouterEndpoint::StartGossipTimer() {
Timer* timer = this->timer();
gossip_timer_.Reset(
timer, timer->Now() + gossip_interval_, [this](const Status& status) {
if (status.is_error())
return;
auto node = SelectGossipPeer();
if (!node) {
gossip_interval_ =
std::min(3 * gossip_interval_ / 2, TimeDelta::FromMinutes(30));
StartGossipTimer();
} else {
gossip_interval_ = InitialGossipInterval();
SendGossipTo(*node, [this] {
if (!closing_) {
StartGossipTimer();
}
});
}
});
}
void RouterEndpoint::SendGossipTo(NodeId target, Callback<void> done) {
// Are we still gossiping?
if (!gossip_timer_.get()) {
return;
}
auto con = connection_streams_.find(target);
if (con == connection_streams_.end()) {
return;
}
auto* stream = con->second.GossipStream();
if (stream == nullptr) {
return;
}
auto slice = WriteGossipUpdate(Border::None(), target);
OVERNET_TRACE(DEBUG) << "SEND_GOSSIP_TO:" << target << " " << slice;
Stream::SendOp(stream, slice.length()).Push(slice, std::move(done));
}
void RouterEndpoint::Close(Callback<void> done) {
closing_ = true;
gossip_timer_.Reset();
if (connection_streams_.empty()) {
Router::Close(std::move(done));
return;
}
auto it = connection_streams_.begin();
OVERNET_TRACE(INFO) << "Closing peer " << it->first;
Callback<void> after_close(
ALLOCATED_CALLBACK, [this, it, done = std::move(done)]() mutable {
OVERNET_TRACE(INFO) << "Closed peer " << it->first;
connection_streams_.erase(it);
Close(std::move(done));
});
it->second.Close(Status::Cancelled(), std::move(after_close));
}
void RouterEndpoint::RegisterPeer(NodeId peer) {
GetOrCreateConnectionStream(peer);
}
RouterEndpoint::ConnectionStream* RouterEndpoint::GetOrCreateConnectionStream(
NodeId peer) {
assert(peer != node_id());
auto it = connection_streams_.find(peer);
if (it != connection_streams_.end()) {
return &it->second;
}
OVERNET_TRACE(DEBUG) << "Creating connection stream for peer " << peer;
auto* stream =
&connection_streams_
.emplace(std::piecewise_construct, std::forward_as_tuple(peer),
std::forward_as_tuple(this, peer))
.first->second;
stream->Register();
return stream;
}
RouterEndpoint::Stream::Stream(NewStream introduction)
: DatagramStream(introduction.creator_, introduction.peer_,
introduction.reliability_and_ordering_,
introduction.stream_id_) {
auto it = introduction.creator_->connection_streams_.find(introduction.peer_);
if (it == introduction.creator_->connection_streams_.end()) {
OVERNET_TRACE(DEBUG) << "Failed to find connection " << introduction.peer_;
Close(Status(StatusCode::FAILED_PRECONDITION,
"Connection closed before stream creation"),
Callback<void>::Ignored());
} else {
connection_stream_ = &it->second;
connection_stream_->forked_streams_.PushBack(this);
}
introduction.creator_ = nullptr;
Register();
}
void RouterEndpoint::Stream::Close(const Status& status,
Callback<void> quiesced) {
if (connection_stream_ != nullptr) {
connection_stream_->forked_streams_.Remove(this);
connection_stream_ = nullptr;
}
DatagramStream::Close(status, std::move(quiesced));
}
RouterEndpoint::ConnectionStream::ConnectionStream(RouterEndpoint* endpoint,
NodeId peer)
: DatagramStream(
endpoint, peer,
fuchsia::overnet::protocol::ReliabilityAndOrdering::ReliableUnordered,
StreamId(0)),
endpoint_(endpoint),
next_stream_id_(peer < endpoint->node_id() ? 2 : 1) {
BeginForkRead();
}
RouterEndpoint::ConnectionStream::~ConnectionStream() {
if (fork_read_state_ == ReadState::Reading) {
fork_read_->Close(Status::Cancelled());
}
assert(fork_read_state_ == ReadState::Stopped);
fork_read_.Destroy();
if (gossip_read_state_ == ReadState::Reading) {
gossip_read_->Close(Status::Cancelled());
}
if (gossip_read_state_ != ReadState::Waiting) {
assert(gossip_read_state_ == ReadState::Stopped);
gossip_read_.Destroy();
}
}
void RouterEndpoint::ConnectionStream::BeginGossipRead() {
ScopedModule<DatagramStream> dgstream(gossip_stream_.get());
OVERNET_TRACE(DEBUG) << "BEGIN_GOSSIP_READ";
gossip_read_state_ = ReadState::Reading;
gossip_read_.Init(gossip_stream_.get());
gossip_read_->PullAll(StatusOrCallback<Optional<std::vector<Slice>>>(
[this](StatusOr<Optional<std::vector<Slice>>>&& read_status) {
ScopedModule<DatagramStream> dgstream(gossip_stream_.get());
OVERNET_TRACE(DEBUG) << "GOSSIP_READ:" << read_status;
assert(gossip_read_state_ == ReadState::Reading);
if (read_status.is_error()) {
gossip_read_state_ = ReadState::Stopped;
Close(read_status.AsStatus(), Callback<void>::Ignored());
return;
} else if (!read_status->has_value()) {
gossip_read_state_ = ReadState::Stopped;
Close(Status::Ok(), Callback<void>::Ignored());
return;
}
auto apply_status = endpoint_->ApplyGossipUpdate(
Slice::Join((*read_status)->begin(), (*read_status)->end()),
peer());
if (apply_status.is_error()) {
gossip_read_state_ = ReadState::Stopped;
Close(apply_status, Callback<void>::Ignored());
return;
}
gossip_read_.Destroy();
gossip_read_state_ = ReadState::Waiting;
BeginGossipRead();
}));
}
void RouterEndpoint::ConnectionStream::BeginForkRead() {
fork_read_state_ = ReadState::Reading;
fork_read_.Init(this);
fork_read_->PullAll(StatusOrCallback<Optional<std::vector<Slice>>>(
[this](StatusOr<Optional<std::vector<Slice>>>&& read_status) {
assert(fork_read_state_ == ReadState::Reading);
if (read_status.is_error()) {
fork_read_state_ = ReadState::Stopped;
Close(read_status.AsStatus(), Callback<void>::Ignored());
return;
} else if (!read_status->has_value()) {
fork_read_state_ = ReadState::Stopped;
Close(Status::Ok(), Callback<void>::Ignored());
return;
}
auto fork_frame_status = Decode<fuchsia::overnet::protocol::ForkFrame>(
Slice::Join((*read_status)->begin(), (*read_status)->end()));
if (fork_frame_status.is_error()) {
fork_read_state_ = ReadState::Stopped;
Close(fork_frame_status.AsStatus(), Callback<void>::Ignored());
return;
}
if (fork_frame_status->introduction.has_service_name() &&
fork_frame_status->introduction.service_name()->find(
kOvernetSystemNamespace) == 0) {
const auto& svc = fork_frame_status->introduction.service_name();
enum class SystemService {
NO_IDEA,
GOSSIP,
};
auto svc_type = SystemService::NO_IDEA;
if (*svc == kOvernetGossipService) {
svc_type = SystemService::GOSSIP;
}
// fork_frame_status, and therefore svc is no longer valid after this
// line
auto received_intro =
endpoint_->UnwrapForkFrame(peer(), std::move(*fork_frame_status));
if (received_intro.is_error()) {
fork_read_state_ = ReadState::Stopped;
Close(received_intro.AsStatus(), Callback<void>::Ignored());
return;
}
switch (svc_type) {
case SystemService::NO_IDEA:
received_intro->new_stream.Fail(
Status(StatusCode::FAILED_PRECONDITION, "Unknown service"));
break;
case SystemService::GOSSIP:
if (IsGossipStreamInitiator()) {
received_intro->new_stream.Fail(
Status(StatusCode::FAILED_PRECONDITION,
"Not gossip stream initiator"));
} else if (gossip_stream_) {
received_intro->new_stream.Fail(
Status(StatusCode::FAILED_PRECONDITION,
"Gossip channel already exists"));
} else {
InstantiateGossipStream(std::move(received_intro->new_stream));
}
break;
}
fork_read_.Destroy();
fork_read_state_ = ReadState::Waiting;
BeginForkRead();
} else {
fork_frame_.Init(std::move(*fork_frame_status));
endpoint_->incoming_forks_.PushBack(this);
fork_read_.Destroy();
fork_read_state_ = ReadState::Waiting;
if (this == endpoint_->incoming_forks_.Front()) {
endpoint_->MaybeContinueIncomingForks();
}
}
}));
}
void RouterEndpoint::SendIntro(
NodeId peer,
fuchsia::overnet::protocol::ReliabilityAndOrdering reliability_and_ordering,
fuchsia::overnet::protocol::Introduction introduction,
StatusOrCallback<NewStream> new_stream_ready) {
GetOrCreateConnectionStream(peer)->Fork(reliability_and_ordering,
std::move(introduction),
std::move(new_stream_ready));
}
StatusOr<RouterEndpoint::OutgoingFork> RouterEndpoint::Stream::Fork(
fuchsia::overnet::protocol::ReliabilityAndOrdering reliability_and_ordering,
fuchsia::overnet::protocol::Introduction introduction) {
if (connection_stream_ == nullptr) {
return StatusOr<OutgoingFork>(StatusCode::FAILED_PRECONDITION,
"Closed stream");
}
return connection_stream_->MakeFork(reliability_and_ordering,
std::move(introduction));
}
void RouterEndpoint::ConnectionStream::Close(const Status& status,
Callback<void> quiesced) {
if (status.is_error()) {
OVERNET_TRACE(ERROR) << "Connection to " << peer()
<< " closed with error: " << status;
}
if (gossip_read_state_ == ReadState::Reading) {
gossip_read_->Close(status);
}
if (gossip_stream_) {
gossip_stream_->Close(status, Callback<void>::Ignored());
}
gossip_stream_.reset();
if (!closing_status_) {
closing_status_.Reset(status);
}
if (forked_streams_.Empty()) {
DatagramStream::Close(status, std::move(quiesced));
} else {
forked_streams_.Front()->Close(
status,
Callback<void>(ALLOCATED_CALLBACK,
[this, status, quiesced{std::move(quiesced)}]() mutable {
this->Close(status, std::move(quiesced));
}));
}
assert(quiesced.empty());
}
RouterEndpoint::Stream* RouterEndpoint::ConnectionStream::GossipStream() {
if (gossip_stream_ == nullptr && !closing_status_.has_value() &&
IsGossipStreamInitiator() && !forking_gossip_stream_) {
OVERNET_TRACE(DEBUG) << "Initiate gossip stream: ep="
<< endpoint_->node_id() << " peer=" << peer();
forking_gossip_stream_ = true;
fuchsia::overnet::protocol::Introduction introduction;
introduction.set_service_name(kOvernetGossipService);
Fork(fuchsia::overnet::protocol::ReliabilityAndOrdering::
ReliableOrdered /* TODO(ctiller): should be UnreliableUnordered */,
std::move(introduction), [this](StatusOr<NewStream> new_stream) {
OVERNET_TRACE(DEBUG)
<< "Forked gossip stream: ep=" << endpoint_->node_id()
<< " peer=" << peer();
assert(forking_gossip_stream_);
forking_gossip_stream_ = false;
if (new_stream.is_error()) {
Close(new_stream.AsStatus().WithContext("Opening gossip stream"),
Callback<void>::Ignored());
} else {
InstantiateGossipStream(std::move(*new_stream));
}
});
}
return gossip_stream_.get();
}
void RouterEndpoint::ConnectionStream::InstantiateGossipStream(NewStream ns) {
OVERNET_TRACE(DEBUG) << "Instantiate gossip stream: ep="
<< endpoint_->node_id() << " peer=" << peer();
assert(gossip_stream_ == nullptr);
gossip_stream_.reset(new Stream(std::move(ns)));
BeginGossipRead();
}
StatusOr<RouterEndpoint::OutgoingFork>
RouterEndpoint::ConnectionStream::MakeFork(
fuchsia::overnet::protocol::ReliabilityAndOrdering reliability_and_ordering,
fuchsia::overnet::protocol::Introduction introduction) {
if (closing_status_) {
return *closing_status_;
}
StreamId id(next_stream_id_);
next_stream_id_ += 2;
return OutgoingFork{
NewStream{endpoint_, peer(), reliability_and_ordering, id},
fuchsia::overnet::protocol::ForkFrame{
id.as_fidl(), reliability_and_ordering, std::move(introduction)}};
}
void RouterEndpoint::ConnectionStream::Fork(
fuchsia::overnet::protocol::ReliabilityAndOrdering reliability_and_ordering,
fuchsia::overnet::protocol::Introduction introduction,
StatusOrCallback<NewStream> new_stream_ready) {
auto outgoing_fork =
MakeFork(reliability_and_ordering, std::move(introduction));
if (outgoing_fork.is_error()) {
new_stream_ready(outgoing_fork.AsStatus());
return;
}
std::vector<uint8_t> bytes;
auto encoded = Encode(&outgoing_fork->fork_frame);
if (encoded.is_error()) {
new_stream_ready(encoded.AsStatus());
return;
}
Slice payload = std::move(*encoded);
SendOp send_op(this, payload.length());
send_op.Push(payload, Callback<void>::Ignored());
send_op.Push(Slice(),
Callback<void>(
ALLOCATED_CALLBACK,
[new_stream = std::move(outgoing_fork->new_stream),
new_stream_ready = std::move(new_stream_ready)]() mutable {
new_stream_ready(std::move(new_stream));
}));
}
void RouterEndpoint::RecvIntro(StatusOrCallback<ReceivedIntroduction> ready) {
recv_intro_ready_ = std::move(ready);
MaybeContinueIncomingForks();
}
void RouterEndpoint::MaybeContinueIncomingForks() {
if (recv_intro_ready_.empty() || incoming_forks_.Empty())
return;
auto* incoming_fork = incoming_forks_.Front();
incoming_forks_.Remove(incoming_fork);
assert(incoming_fork->fork_read_state_ ==
ConnectionStream::ReadState::Waiting);
recv_intro_ready_(UnwrapForkFrame(
incoming_fork->peer(), std::move(*incoming_fork->fork_frame_.get())));
incoming_fork->fork_frame_.Destroy();
incoming_fork->BeginForkRead();
}
StatusOr<RouterEndpoint::ReceivedIntroduction> RouterEndpoint::UnwrapForkFrame(
NodeId peer, fuchsia::overnet::protocol::ForkFrame fork_frame) {
return ReceivedIntroduction{
NewStream{this, peer, fork_frame.reliability_and_ordering,
StreamId(fork_frame.stream_id)},
std::move(fork_frame.introduction)};
}
void RouterEndpoint::OnUnknownStream(NodeId node_id, StreamId stream_id) {
if (stream_id == StreamId(0)) {
GetOrCreateConnectionStream(node_id);
}
}
} // namespace overnet