blob: c719ba9fad3ac742a170a6af3fb97da10f9a8be6 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#pragma once
#include <random>
#include <unordered_map>
#include "src/connectivity/overnet/lib/environment/timer.h"
#include "src/connectivity/overnet/lib/environment/trace.h"
#include "src/connectivity/overnet/lib/protocol/routable_message.h"
#include "src/connectivity/overnet/lib/routing/routing_table.h"
#include "src/connectivity/overnet/lib/stats/link.h"
#include "src/connectivity/overnet/lib/vocabulary/callback.h"
#include "src/connectivity/overnet/lib/vocabulary/closed_ptr.h"
#include "src/connectivity/overnet/lib/vocabulary/lazy_slice.h"
#include "src/connectivity/overnet/lib/vocabulary/slice.h"
namespace overnet {
namespace router_impl {
struct LocalStreamId {
NodeId peer;
StreamId stream_id;
uint64_t Hash() const { return peer.Hash() ^ stream_id.Hash(); }
};
inline bool operator==(const LocalStreamId& lhs, const LocalStreamId& rhs) {
return lhs.peer == rhs.peer && lhs.stream_id == rhs.stream_id;
}
} // namespace router_impl
} // namespace overnet
namespace std {
template <>
struct hash<overnet::router_impl::LocalStreamId> {
size_t operator()(const overnet::router_impl::LocalStreamId& id) const { return id.Hash(); }
};
} // namespace std
namespace overnet {
inline auto ForwardingPayloadFactory(Slice payload) {
return [payload = std::move(payload)](auto args) mutable { return std::move(payload); };
}
struct Message final {
RoutableMessage header;
LazySlice make_payload;
TimeStamp received;
uint32_t mss = std::numeric_limits<uint32_t>::max();
static Message SimpleForwarder(RoutableMessage msg, Slice payload, TimeStamp received) {
return Message{std::move(msg), ForwardingPayloadFactory(payload), received};
}
};
class Link {
public:
virtual ~Link() {}
virtual void Close(Callback<void> quiesced) = 0;
virtual void Forward(Message message) = 0;
virtual fuchsia::overnet::protocol::LinkStatus GetLinkStatus() = 0;
virtual const LinkStats* GetStats() const = 0;
};
template <class T = Link>
using LinkPtr = ClosedPtr<T, Link>;
template <class T, class... Args>
LinkPtr<T> MakeLink(Args&&... args) {
return MakeClosedPtr<T, Link>(std::forward<Args>(args)...);
}
class Router {
public:
static constexpr inline auto kModule = Module::ROUTER;
class StreamHandler {
public:
virtual ~StreamHandler() {}
virtual void RouterClose(Callback<void> quiesced) = 0;
virtual void HandleMessage(SeqNum seq, TimeStamp received, Slice data) = 0;
};
Router(Timer* timer, NodeId node_id, bool allow_non_determinism);
virtual ~Router();
virtual void Close(Callback<void> quiesced);
// Forward a message to either ourselves or a link
void Forward(Message message);
// Register a (locally handled) stream into this Router
Status RegisterStream(NodeId peer, StreamId stream_id, StreamHandler* stream_handler);
Status UnregisterStream(NodeId peer, StreamId stream_id, StreamHandler* stream_handler);
// Register a link to another router (usually on a different machine)
void RegisterLink(LinkPtr<> link);
NodeId node_id() const { return node_id_; }
Timer* timer() const { return timer_; }
auto* rng() { return &rng_; }
void UpdateRoutingTable(
std::initializer_list<fuchsia::overnet::protocol::NodeStatus> node_status,
std::initializer_list<fuchsia::overnet::protocol::LinkStatus> link_status) {
UpdateRoutingTable(std::move(node_status), std::move(link_status), false);
}
void BlockUntilNoBackgroundUpdatesProcessing() {
routing_table_.BlockUntilNoBackgroundUpdatesProcessing();
}
// Return true if this router believes a route exists to a particular node.
bool HasRouteTo(NodeId node_id) {
return node_id == node_id_ || link_holder(node_id)->link() != nullptr;
}
Optional<NodeId> SelectGossipPeer();
void SendGossipUpdate(fuchsia::overnet::protocol::Peer_Proxy* peer, NodeId target);
void ApplyGossipUpdate(fuchsia::overnet::protocol::NodeStatus node_status) {
UpdateRoutingTable({std::move(node_status)}, {}, false);
}
void ApplyGossipUpdate(fuchsia::overnet::protocol::LinkStatus link_status) {
UpdateRoutingTable({}, {std::move(link_status)}, false);
}
template <class F>
void ForEachNodeMetric(F mutator) {
routing_table_.ForEachNodeMetric(mutator);
}
// Request notification that the node tables has been updated.
// This may be called back on an arbitrary thread (unlike most of overnet).
void OnNodeTableUpdate(uint64_t last_seen_version, Callback<void> callback) {
routing_table_.OnNodeTableUpdate(last_seen_version, std::move(callback));
}
template <class F>
void ForEachLink(F f) const {
for (const auto& [label, link] : owned_links_) {
f(label.target_node, link.get());
}
}
private:
Timer* const timer_;
const NodeId node_id_;
void UpdateRoutingTable(std::initializer_list<fuchsia::overnet::protocol::NodeStatus> node_status,
std::initializer_list<fuchsia::overnet::protocol::LinkStatus> link_status,
bool flush_old_nodes);
virtual void OnUnknownStream(NodeId peer, StreamId stream_id) {}
void MaybeStartPollingLinkChanges();
void MaybeStartFlushingOldEntries();
void CloseLinks(Callback<void> quiesced);
void CloseStreams(Callback<void> quiesced);
class StreamHolder {
public:
StreamHolder(NodeId peer, StreamId id) : peer_(peer), stream_(id) {}
Status SetHandler(StreamHandler* handler);
[[nodiscard]] bool HandleMessage(SeqNum seq, TimeStamp received, Slice payload);
Status ClearHandler(StreamHandler* handler);
void Close(Callback<void> quiesced) {
if (handler_ != nullptr)
handler_->RouterClose(std::move(quiesced));
}
bool has_handler() { return handler_ != nullptr; }
private:
const NodeId peer_;
const StreamId stream_;
StreamHandler* handler_ = nullptr;
// TODO(ctiller): globally cap the number of buffered packets within Router
struct BufferedPacket {
SeqNum seq;
TimeStamp received;
Slice payload;
};
std::unique_ptr<BufferedPacket> buffered_;
};
class LinkHolder {
public:
LinkHolder(NodeId target) {}
void Forward(Message message);
void SetLink(Link* link, uint32_t path_mss, bool is_direct);
Link* link() { return link_; }
const Link* link() const { return link_; }
bool has_direct_link() const { return is_direct_; }
uint32_t path_mss() { return path_mss_; }
uint64_t last_gossip_version() const { return last_gossip_version_; }
void set_last_gossip_version(uint64_t n) { last_gossip_version_ = n; }
private:
Link* link_ = nullptr;
bool is_direct_ = false;
uint32_t path_mss_ = std::numeric_limits<uint32_t>::max();
std::vector<Message> pending_;
uint64_t last_gossip_version_ = 0;
};
LinkHolder* link_holder(NodeId node_id) {
auto it = links_.find(node_id);
if (it != links_.end())
return &it->second;
return &links_
.emplace(std::piecewise_construct, std::forward_as_tuple(node_id),
std::forward_as_tuple(node_id))
.first->second;
}
StreamHolder* stream_holder(NodeId node_id, StreamId stream_id) {
auto it = streams_.find(LocalStreamId{node_id, stream_id});
if (it != streams_.end())
return &it->second;
return &streams_
.emplace(std::piecewise_construct,
std::forward_as_tuple(LocalStreamId{node_id, stream_id}),
std::forward_as_tuple(node_id, stream_id))
.first->second;
}
typedef router_impl::LocalStreamId LocalStreamId;
struct OwnedLabel {
NodeId target_node;
uint64_t target_nodes_label;
bool operator==(const OwnedLabel& other) const {
return target_node == other.target_node && target_nodes_label == other.target_nodes_label;
}
};
struct HashOwnedLabel {
size_t operator()(const OwnedLabel& label) const {
return label.target_node.get() ^ label.target_nodes_label;
}
};
bool shutting_down_ = false;
std::unordered_map<OwnedLabel, LinkPtr<>, HashOwnedLabel> owned_links_;
std::unordered_map<LocalStreamId, StreamHolder> streams_;
std::unordered_map<NodeId, LinkHolder> links_;
std::mt19937 rng_;
RoutingTable routing_table_;
Optional<Timeout> poll_link_changes_timeout_;
Optional<Timeout> flush_old_nodes_timeout_;
fuchsia::overnet::protocol::NodeStatus own_node_status_;
};
} // namespace overnet