blob: 0748f523b4c9313a81bb0fd480c8dac4efbfefbe [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/connectivity/overnet/lib/endpoint/router_endpoint.h"
#include <iostream>
#include <memory>
#include "garnet/public/lib/fostr/fidl/fuchsia/overnet/protocol/formatting.h"
#include "src/connectivity/overnet/lib/protocol/coding.h"
#include "src/connectivity/overnet/lib/protocol/fidl.h"
namespace overnet {
void RouterEndpoint::NewStream::Fail(const Status& status) {
auto* s = new Stream(std::move(*this));
s->Close(status, [s] { delete s; });
}
// Forwards all arguments to the Router base class and arms the first gossip
// timer.
RouterEndpoint::RouterEndpoint(Timer* timer, NodeId node_id,
                               bool allow_non_determinism)
    : Router(timer, node_id, allow_non_determinism) {
  this->StartGossipTimer();
}
// Destruction requires that no connection streams remain registered.
RouterEndpoint::~RouterEndpoint() {
  assert(connection_streams_.empty());
}
void RouterEndpoint::StartGossipTimer() {
Timer* timer = this->timer();
gossip_timer_.Reset(
timer, timer->Now() + gossip_interval_, [this](const Status& status) {
if (status.is_error())
return;
auto node = SelectGossipPeer();
if (!node) {
gossip_interval_ =
std::min(3 * gossip_interval_ / 2, TimeDelta::FromMinutes(30));
} else {
gossip_interval_ = InitialGossipInterval();
SendGossipTo(*node);
}
StartGossipTimer();
});
}
// Sends a gossip update to |target| over its connection stream. Does nothing
// if the gossip timer has been cleared or no connection stream exists for
// |target|.
void RouterEndpoint::SendGossipTo(NodeId target) {
  OVERNET_TRACE(DEBUG) << node_id() << " send gossip to " << target;
  // A cleared gossip timer means we are no longer gossiping.
  if (!gossip_timer_.get()) {
    return;
  }
  const auto found = connection_streams_.find(target);
  if (found != connection_streams_.end()) {
    SendGossipUpdate(found->second.proxy(), target);
  }
}
// Shuts the endpoint down: marks it as closing, clears the gossip and
// description timers, then closes the connection streams one at a time. Once
// none remain, the underlying Router is closed and |done| is handed to it.
void RouterEndpoint::Close(Callback<void> done) {
closing_ = true;
gossip_timer_.Reset();
description_timer_.Reset();
if (connection_streams_.empty()) {
// Nothing left to tear down at this level: finish by closing the Router.
Router::Close(std::move(done));
return;
}
// Only the first connection stream is closed per invocation; its completion
// callback erases it from the map and calls Close() again for the remainder.
auto it = connection_streams_.begin();
OVERNET_TRACE(DEBUG) << "Closing peer " << it->first;
Callback<void> after_close(
ALLOCATED_CALLBACK, [this, it, done = std::move(done)]() mutable {
OVERNET_TRACE(DEBUG) << "Closed peer " << it->first;
connection_streams_.erase(it);
// The set of known peers changed, so bump the description table version.
NewNodeDescriptionTableVersion();
Close(std::move(done));
});
it->second.Close(Status::Cancelled(), std::move(after_close));
}
// Makes sure a connection stream exists for |peer|; the returned stream
// pointer is intentionally not used.
void RouterEndpoint::RegisterPeer(NodeId peer) {
  static_cast<void>(GetOrCreateConnectionStream(peer));
}
// Registers |service| under its fully qualified name. A description update is
// scheduled only when the name was not already bound.
void RouterEndpoint::Bind(Service* service) {
  const bool newly_bound =
      services_.emplace(service->fully_qualified_name, service).second;
  if (!newly_bound) {
    return;
  }
  UpdatedDescription();
}
// Removes |service| from the bound services, but only if the entry stored
// under its fully qualified name is this very object. A description update is
// scheduled when something was removed.
void RouterEndpoint::Unbind(Service* service) {
  const auto found = services_.find(service->fully_qualified_name);
  if (found == services_.end() || found->second != service) {
    return;
  }
  services_.erase(found);
  UpdatedDescription();
}
// Schedules a broadcast of this node's description to every connected peer
// after a 200ms delay. Does nothing if a broadcast is already scheduled or the
// endpoint is closing.
void RouterEndpoint::UpdatedDescription() {
  if (description_timer_.has_value() || closing_) {
    return;
  }
  OVERNET_TRACE(DEBUG) << "Schedule send update";
  description_timer_.Reset(
      timer(), timer()->Now() + TimeDelta::FromMilliseconds(200),
      [this](const Status& status) {
        if (status.is_error()) {
          return;
        }
        description_timer_.Reset();
        // Build the description once and reuse it for both the trace line and
        // the per-peer clone, rather than rebuilding it for every peer.
        auto description = BuildDescription();
        for (auto& id_conn_pair : connection_streams_) {
          OVERNET_TRACE(DEBUG)
              << node_id() << " send description to " << id_conn_pair.first
              << ": " << description;
          id_conn_pair.second.proxy()->UpdateNodeDescription(
              fidl::Clone(description));
        }
      });
}
// Produces a PeerDescription listing the name of every bound service.
fuchsia::overnet::protocol::PeerDescription RouterEndpoint::BuildDescription()
    const {
  fuchsia::overnet::protocol::PeerDescription result;
  for (auto entry = services_.begin(); entry != services_.end(); ++entry) {
    // mutable_services() is deliberately only reached when there is at least
    // one service to record.
    result.mutable_services()->push_back(entry->first);
  }
  return result;
}
// Returns the connection stream for |peer|, creating and registering it if it
// does not exist yet. Returns nullptr when a new stream would be needed but
// the endpoint is closing. |peer| must not be this node.
RouterEndpoint::ConnectionStream* RouterEndpoint::GetOrCreateConnectionStream(
    NodeId peer) {
  assert(peer != node_id());

  const auto existing = connection_streams_.find(peer);
  if (existing != connection_streams_.end()) {
    return &existing->second;
  }

  if (closing_) {
    OVERNET_TRACE(DEBUG) << node_id()
                         << " skip creating connection stream for peer " << peer
                         << " as we're closing";
    return nullptr;
  }

  OVERNET_TRACE(DEBUG) << node_id() << " creating connection stream for peer "
                       << peer;
  auto inserted = connection_streams_.emplace(
      std::piecewise_construct, std::forward_as_tuple(peer),
      std::forward_as_tuple(this, peer));
  ConnectionStream* const created = &inserted.first->second;
  created->Register();

  // With a description timer pending, the new peer is covered by the upcoming
  // broadcast; otherwise send it our description right away.
  if (!description_timer_.has_value()) {
    OVERNET_TRACE(DEBUG) << node_id() << " send description to " << peer << ": "
                         << BuildDescription();
    created->proxy()->UpdateNodeDescription(BuildDescription());
  }

  NewNodeDescriptionTableVersion();
  return created;
}
// Builds a Stream from |introduction| and attaches it to the connection stream
// of the introduced peer. If that connection stream no longer exists the new
// Stream is closed immediately with FAILED_PRECONDITION.
RouterEndpoint::Stream::Stream(NewStream introduction)
    : DatagramStream(introduction.creator_, introduction.peer_,
                     introduction.reliability_and_ordering_,
                     introduction.stream_id_) {
  auto& connections = introduction.creator_->connection_streams_;
  const auto found = connections.find(introduction.peer_);
  if (found != connections.end()) {
    connection_stream_ = &found->second;
    connection_stream_->forked_streams_.PushBack(this);
  } else {
    OVERNET_TRACE(DEBUG) << "Failed to find connection " << introduction.peer_;
    Close(Status(StatusCode::FAILED_PRECONDITION,
                 "Connection closed before stream creation"),
          Callback<void>::Ignored());
  }
  // The introduction has been consumed; clear its creator pointer.
  introduction.creator_ = nullptr;
  Register();
}
// Detaches this stream from its connection stream's fork list (if it is still
// attached) and then closes the underlying DatagramStream.
void RouterEndpoint::Stream::Close(const Status& status,
                                   Callback<void> quiesced) {
  if (auto* const parent = connection_stream_; parent != nullptr) {
    parent->forked_streams_.Remove(this);
    connection_stream_ = nullptr;
  }
  DatagramStream::Close(status, std::move(quiesced));
}
// Creates the connection stream to |peer|: a reliable-unordered datagram
// stream with stream id 0. Reading starts as soon as construction finishes.
RouterEndpoint::ConnectionStream::ConnectionStream(RouterEndpoint* endpoint,
                                                   NodeId peer)
    : DatagramStream(
          endpoint, peer,
          fuchsia::overnet::protocol::ReliabilityAndOrdering::ReliableUnordered,
          StreamId(0)),
      endpoint_(endpoint),
      // The first locally allocated stream id is 2 when the peer's node id is
      // lower than ours and 1 otherwise; MakeFork() then advances it by 2.
      next_stream_id_(peer < endpoint->node_id() ? 2 : 1),
      proxy_(this),
      stub_(this) {
  this->BeginReading();
}
// No custom teardown is required.
RouterEndpoint::ConnectionStream::~ConnectionStream() = default;
// Encodes the bytes of |message| and pushes them onto this connection stream
// as a single send operation. If encoding fails the error is logged and the
// message is dropped.
void RouterEndpoint::ConnectionStream::SendFidl(fidl::Message message) {
  auto slice = Encode(Slice::FromContainer(message.bytes()));
  if (slice.is_error()) {
    OVERNET_TRACE(ERROR) << "Failed to encode connection stream message: "
                         << slice.AsStatus();
    // There is no encoded payload in the error case, so |slice| must not be
    // dereferenced below.
    return;
  }
  SendOp(this, slice->length())
      .Push(std::move(*slice), Callback<void>::Ignored());
}
// Starts reading the next message from this connection stream. When the whole
// message has been pulled it is decoded and offered to the proxy and, if the
// proxy reports ZX_ERR_NOT_SUPPORTED, to the stub; then the next read is
// started. A read error, an empty read result, or a decode failure closes the
// connection stream instead.
void RouterEndpoint::ConnectionStream::BeginReading() {
reader_.Reset(this);
reader_->PullAll(StatusOrCallback<Optional<std::vector<Slice>>>(
[this](StatusOr<Optional<std::vector<Slice>>>&& read_status) {
if (read_status.is_error()) {
Close(read_status.AsStatus(), Callback<void>::Ignored());
return;
} else if (!read_status->has_value()) {
// The read completed without a value: close with an OK status.
Close(Status::Ok(), Callback<void>::Ignored());
return;
}
// Join all received slices into one and decode it.
auto bytes =
Decode(Slice::Join((*read_status)->begin(), (*read_status)->end()));
if (bytes.is_error()) {
Close(bytes.AsStatus(), Callback<void>::Ignored());
return;
}
// Each processing attempt works on its own fresh copy of the decoded bytes;
// the message carries no handles.
auto process_with = [&bytes](auto& with) {
std::vector<uint8_t> copy(bytes->begin(), bytes->end());
return with.Process_(fidl::Message(
fidl::BytePart(copy.data(), copy.size(), copy.size()),
fidl::HandlePart(nullptr, 0)));
};
// Offer the message to the proxy first and fall back to the stub only when
// the proxy does not support it.
auto status = process_with(proxy_);
if (status == ZX_ERR_NOT_SUPPORTED) {
status = process_with(stub_);
}
if (status != ZX_OK) {
// Processing failures are only logged; reading continues regardless.
OVERNET_TRACE(ERROR) << "Failed to process message: " << bytes
<< " ; coded: " << *read_status;
}
BeginReading();
}));
}
// Opens a new stream to |service_name| on |peer| by forking the connection
// stream to that peer, creating the connection stream first if needed.
//
// Returns the new stream, the error reported by the fork, or
// FAILED_PRECONDITION when no connection stream is available because the
// endpoint is closing.
StatusOr<RouterEndpoint::NewStream> RouterEndpoint::InitiateStream(
    NodeId peer,
    fuchsia::overnet::protocol::ReliabilityAndOrdering reliability_and_ordering,
    const std::string& service_name) {
  // GetOrCreateConnectionStream() returns nullptr while the endpoint is
  // closing, so the result has to be checked before it is used.
  ConnectionStream* const connection = GetOrCreateConnectionStream(peer);
  if (connection == nullptr) {
    return StatusOr<NewStream>(StatusCode::FAILED_PRECONDITION,
                               "Endpoint is closing");
  }
  return connection->Fork(reliability_and_ordering, service_name);
}
// Requests a new locally numbered fork from the owning connection stream.
// Fails with FAILED_PRECONDITION once this stream has been detached from its
// connection stream.
StatusOr<RouterEndpoint::NewStream> RouterEndpoint::Stream::InitiateFork(
    fuchsia::overnet::protocol::ReliabilityAndOrdering
        reliability_and_ordering) {
  if (connection_stream_ != nullptr) {
    return connection_stream_->MakeFork(reliability_and_ordering);
  }
  return StatusOr<NewStream>(StatusCode::FAILED_PRECONDITION, "Closed stream");
}
// Closes this connection stream with |status|. Forked streams are closed
// first, one per invocation; once none remain the underlying DatagramStream is
// closed and receives |quiesced|.
void RouterEndpoint::ConnectionStream::Close(const Status& status,
Callback<void> quiesced) {
if (status.is_error()) {
OVERNET_TRACE(ERROR) << "Connection to " << peer()
<< " closed with error: " << status;
}
// Only the first status passed to Close() is remembered; MakeFork() reports
// it to callers that try to fork after closing has begun.
if (!closing_status_) {
closing_status_.Reset(status);
}
reader_.Reset();
if (forked_streams_.Empty()) {
DatagramStream::Close(status, std::move(quiesced));
} else {
// Close the front forked stream and re-enter Close() from its completion
// callback to deal with whatever is left.
forked_streams_.Front()->Close(
status,
Callback<void>(ALLOCATED_CALLBACK,
[this, status, quiesced{std::move(quiesced)}]() mutable {
this->Close(status, std::move(quiesced));
}));
}
// Both branches above moved |quiesced| onward, so it must be empty here.
assert(quiesced.empty());
}
// Allocates the next local stream id and describes a new stream using it.
// Once closing has begun, the recorded closing status is returned instead.
StatusOr<RouterEndpoint::NewStream> RouterEndpoint::ConnectionStream::MakeFork(
    fuchsia::overnet::protocol::ReliabilityAndOrdering
        reliability_and_ordering) {
  if (closing_status_) {
    return *closing_status_;
  }
  // Ids are handed out in steps of two starting from |next_stream_id_|.
  const auto allocated = next_stream_id_;
  next_stream_id_ += 2;
  return MakeFork(StreamId(allocated), reliability_and_ordering);
}
// Describes a new stream to this connection's peer using the caller-supplied
// stream |id|. No closing check is performed by this overload.
StatusOr<RouterEndpoint::NewStream> RouterEndpoint::ConnectionStream::MakeFork(
    StreamId id,
    fuchsia::overnet::protocol::ReliabilityAndOrdering
        reliability_and_ordering) {
  return NewStream{endpoint_, peer(), reliability_and_ordering, id};
}
// Allocates a new outgoing stream and sends the peer a ConnectToService
// request for |service_name| carrying the new stream's id. Returns the new
// stream, or the allocation error.
StatusOr<RouterEndpoint::NewStream> RouterEndpoint::ConnectionStream::Fork(
    fuchsia::overnet::protocol::ReliabilityAndOrdering reliability_and_ordering,
    std::string service_name) {
  auto outgoing_fork = MakeFork(reliability_and_ordering);
  if (!outgoing_fork.is_error()) {
    proxy_.ConnectToService(std::move(service_name),
                            outgoing_fork->stream_id().as_fidl());
    return outgoing_fork;
  }
  return outgoing_fork.AsStatus();
}
// Describes a fork whose |stream_id| was supplied by the caller rather than
// allocated locally. Fails with FAILED_PRECONDITION once this stream has been
// detached from its connection stream.
StatusOr<RouterEndpoint::NewStream> RouterEndpoint::Stream::ReceiveFork(
    fuchsia::overnet::protocol::StreamId stream_id,
    fuchsia::overnet::protocol::ReliabilityAndOrdering
        reliability_and_ordering) {
  if (connection_stream_ != nullptr) {
    return connection_stream_->MakeFork(stream_id, reliability_and_ordering);
  }
  return StatusOr<NewStream>(StatusCode::FAILED_PRECONDITION, "Closed stream");
}
// Handles traffic for a stream that is not registered. Only stream id 0 (the
// id used by connection streams) is acted upon: the connection stream for
// |node_id| is created if it is missing.
void RouterEndpoint::OnUnknownStream(NodeId node_id, StreamId stream_id) {
  const bool is_connection_stream = (stream_id == StreamId(0));
  if (!is_connection_stream) {
    return;
  }
  GetOrCreateConnectionStream(node_id);
}
// Handles a ConnectToService request: describes a reliable-ordered stream
// with the requested |stream_id| and hands it to the service bound under
// |service_name|. The stream is failed with INVALID_ARGUMENT when no such
// service is bound.
void RouterEndpoint::ConnectionStream::Stub::ConnectToService(
    std::string service_name, fuchsia::overnet::protocol::StreamId stream_id) {
  auto new_stream = connection_stream_->MakeFork(
      stream_id,
      fuchsia::overnet::protocol::ReliabilityAndOrdering::ReliableOrdered);
  if (new_stream.is_error()) {
    OVERNET_TRACE(ERROR) << "Failed to process ConnectToService NewStream: "
                         << new_stream.AsStatus();
    return;
  }

  auto& bound_services = connection_stream_->endpoint_->services_;
  const auto found = bound_services.find(service_name);
  if (found == bound_services.end()) {
    new_stream->Fail(
        Status(StatusCode::INVALID_ARGUMENT, "Service not supported"));
    return;
  }
  found->second->AcceptStream(std::move(*new_stream));
}
// Answers a ping by passing the current time to |callback|: the ZX_CLOCK_UTC
// reading on Fuchsia, CLOCK_REALTIME converted to nanoseconds elsewhere.
void RouterEndpoint::ConnectionStream::Stub::Ping(PingCallback callback) {
#ifdef __Fuchsia__
  zx_time_t utc_now = 0;
  zx_clock_get_new(ZX_CLOCK_UTC, &utc_now);
  callback(utc_now);
#else
  timespec wall_clock;
  clock_gettime(CLOCK_REALTIME, &wall_clock);
  // Whole seconds scaled to nanoseconds, plus the sub-second remainder.
  const uint64_t whole_seconds_ns = uint64_t(wall_clock.tv_sec) * 1000000000;
  callback(whole_seconds_ns + wall_clock.tv_nsec);
#endif
}
// Handles a gossiped node status. A status that claims to describe this node
// closes the connection with INVALID_ARGUMENT; otherwise the described node is
// registered as a peer and the update is applied to the endpoint.
void RouterEndpoint::ConnectionStream::Stub::UpdateNodeStatus(
    fuchsia::overnet::protocol::NodeStatus status) {
  auto* const endpoint = connection_stream_->endpoint_;
  OVERNET_TRACE(DEBUG) << "Got: UpdateNodeStatus " << status;

  const bool describes_self = (status.id == endpoint->node_id());
  if (describes_self) {
    connection_stream_->Close(Status(StatusCode::INVALID_ARGUMENT,
                                     "Attempt to set this nodes status"),
                              Callback<void>::Ignored());
    return;
  }

  endpoint->RegisterPeer(NodeId(status.id));
  endpoint->ApplyGossipUpdate(std::move(status));
}
// Handles a gossiped link status. A status whose |from| is this node closes
// the connection with INVALID_ARGUMENT; any other update is applied to the
// endpoint.
void RouterEndpoint::ConnectionStream::Stub::UpdateLinkStatus(
    fuchsia::overnet::protocol::LinkStatus status) {
  auto* const endpoint = connection_stream_->endpoint_;
  OVERNET_TRACE(DEBUG) << "Got: UpdateLinkStatus " << status;

  const bool originates_here = (status.from == endpoint->node_id());
  if (originates_here) {
    connection_stream_->Close(Status(StatusCode::INVALID_ARGUMENT,
                                     "Attempt to set this nodes link status"),
                              Callback<void>::Ignored());
    return;
  }

  endpoint->ApplyGossipUpdate(std::move(status));
}
// Stores the peer's latest |description| on the connection stream and bumps
// the endpoint's node description table version.
void RouterEndpoint::ConnectionStream::Stub::UpdateNodeDescription(
    fuchsia::overnet::protocol::PeerDescription description) {
  auto* const conn = connection_stream_;
  OVERNET_TRACE(DEBUG) << conn->endpoint_->node_id()
                       << " update node description for " << conn->peer()
                       << ": " << description;
  conn->description_ = std::move(description);
  conn->endpoint_->NewNodeDescriptionTableVersion();
}
// Registers |on_change| to be notified when the node description table moves
// past |last_seen_version|.
void RouterEndpoint::OnNodeDescriptionTableChange(uint64_t last_seen_version,
                                                  Callback<void> on_change) {
  if (last_seen_version != node_description_table_version_) {
    // The table has already changed since the caller last looked. |on_change|
    // is not stored, so its destructor runs on return, which forces the
    // callback to be called.
    return;
  }
  on_node_description_table_change_.emplace_back(std::move(on_change));
}
void RouterEndpoint::NewNodeDescriptionTableVersion() {
++node_description_table_version_;
std::vector<Callback<void>>().swap(on_node_description_table_change_);
}
} // namespace overnet