blob: 67c91ab863ca20256f01fb648b84be2e67c2c728 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#pragma once
#include <random>
#include <unordered_map>
#include "garnet/lib/overnet/environment/timer.h"
#include "garnet/lib/overnet/environment/trace.h"
#include "garnet/lib/overnet/protocol/routable_message.h"
#include "garnet/lib/overnet/routing/routing_table.h"
#include "garnet/lib/overnet/vocabulary/callback.h"
#include "garnet/lib/overnet/vocabulary/closed_ptr.h"
#include "garnet/lib/overnet/vocabulary/lazy_slice.h"
#include "garnet/lib/overnet/vocabulary/slice.h"
namespace overnet {
namespace router_impl {
struct LocalStreamId {
NodeId peer;
StreamId stream_id;
uint64_t Hash() const { return peer.Hash() ^ stream_id.Hash(); }
};
// Two LocalStreamIds are equal when both the peer and the stream id match.
inline bool operator==(const LocalStreamId& lhs, const LocalStreamId& rhs) {
  if (!(lhs.peer == rhs.peer)) {
    return false;
  }
  return lhs.stream_id == rhs.stream_id;
}
} // namespace router_impl
} // namespace overnet
namespace std {
// Lets LocalStreamId be used as a key in the unordered containers by
// delegating to LocalStreamId::Hash().
template <>
struct hash<overnet::router_impl::LocalStreamId> {
  size_t operator()(const overnet::router_impl::LocalStreamId& key) const {
    return static_cast<size_t>(key.Hash());
  }
};
}  // namespace std
namespace overnet {
// Builds a callable that owns `payload` and hands it back (by move) when
// invoked; the argument passed to the callable is ignored.
inline auto ForwardingPayloadFactory(Slice payload) {
  auto factory = [held = std::move(payload)](auto args) mutable {
    return std::move(held);
  };
  return factory;
}
// A message handed to Router::Forward() / Link::Forward(): a routing header
// plus a payload factory, the receive timestamp and an mss value.
struct Message final {
  RoutableMessage header;
  // Produces the payload slice; SimpleForwarder() fills this with a factory
  // that returns a pre-built slice.
  LazySlice make_payload;
  TimeStamp received;
  // Defaults to the largest uint32_t value.
  // NOTE(review): presumably "maximum segment size, unlimited by default" --
  // confirm against the users of this field.
  uint32_t mss = std::numeric_limits<uint32_t>::max();

  // Builds a Message whose payload is the already-serialized `payload`.
  // `payload` is a sink parameter, so it is moved (not copied) into the
  // payload factory.
  static Message SimpleForwarder(RoutableMessage msg, Slice payload,
                                 TimeStamp received) {
    return Message{std::move(msg),
                   ForwardingPayloadFactory(std::move(payload)), received};
  }
};
// Abstract interface for a link that the Router can forward messages over
// (see Router::RegisterLink).
class Link {
 public:
  virtual ~Link() = default;

  // Closes the link; `quiesced` is handed to the implementation.
  // NOTE(review): presumably invoked once the link has fully shut down --
  // confirm with the implementations.
  virtual void Close(Callback<void> quiesced) = 0;

  // Sends `message` over this link.
  virtual void Forward(Message message) = 0;

  // Returns the metrics describing this link.
  virtual fuchsia::overnet::protocol::LinkMetrics GetLinkMetrics() = 0;
};
// Pointer type used to hold links: a ClosedPtr<T, Link>, with T defaulting to
// Link itself so that `LinkPtr<>` names a pointer to the base interface.
// NOTE(review): ClosedPtr presumably calls Close() before destruction --
// confirm in vocabulary/closed_ptr.h.
template <class T = Link>
using LinkPtr = ClosedPtr<T, Link>;
// Constructs a link of concrete type T, perfectly forwarding `args` to
// MakeClosedPtr<T, Link>, and returns it as a LinkPtr<T>.
template <typename T, typename... Args>
LinkPtr<T> MakeLink(Args&&... args) {
  return MakeClosedPtr<T, Link>(std::forward<Args>(args)...);
}
// Routes messages either to streams registered locally on this node or to
// links that lead towards other nodes.
class Router {
 public:
  static constexpr inline auto kModule = Module::ROUTER;

  // Receiver interface for a stream registered with RegisterStream().
  class StreamHandler {
   public:
    virtual ~StreamHandler() = default;
    // Invoked when the owning stream entry is closed; receives the `quiesced`
    // callback that was passed to the close request.
    virtual void RouterClose(Callback<void> quiesced) = 0;
    virtual void HandleMessage(SeqNum seq, TimeStamp received, Slice data) = 0;
  };

  Router(Timer* timer, NodeId node_id, bool allow_non_determinism);
  virtual ~Router();

  virtual void Close(Callback<void> quiesced);

  // Forward a message to either ourselves or a link
  void Forward(Message message);

  // Register a (locally handled) stream into this Router
  Status RegisterStream(NodeId peer, StreamId stream_id,
                        StreamHandler* stream_handler);
  Status UnregisterStream(NodeId peer, StreamId stream_id,
                          StreamHandler* stream_handler);

  // Register a link to another router (usually on a different machine)
  void RegisterLink(LinkPtr<> link);

  NodeId node_id() const { return node_id_; }
  Timer* timer() const { return timer_; }
  auto* rng() { return &rng_; }

  // Convenience overload: forwards to the private three-argument overload
  // with flush_old_nodes == false.
  void UpdateRoutingTable(
      std::vector<fuchsia::overnet::protocol::NodeMetrics> node_metrics,
      std::vector<fuchsia::overnet::protocol::LinkMetrics> link_metrics) {
    UpdateRoutingTable(std::move(node_metrics), std::move(link_metrics), false);
  }

  void BlockUntilNoBackgroundUpdatesProcessing() {
    routing_table_.BlockUntilNoBackgroundUpdatesProcessing();
  }

  // Return true if this router believes a route exists to a particular node.
  // This is a pure lookup: asking about a node that has no LinkHolder yet
  // reports "no route" without creating an entry for it, so repeated queries
  // for unknown nodes cannot grow links_.
  bool HasRouteTo(NodeId node_id) const {
    if (node_id == node_id_) {
      return true;
    }
    const auto it = links_.find(node_id);
    return it != links_.end() && it->second.link() != nullptr;
  }

  Optional<NodeId> SelectGossipPeer();
  Slice WriteGossipUpdate(Border desired_border, NodeId target);
  Status ApplyGossipUpdate(Slice update, NodeId peer);
  void SetDescription(fuchsia::overnet::protocol::PeerDescription description);

  // Applies `mutator` to the routing table's node metrics by delegating to
  // RoutingTable::ForEachNodeMetric.
  template <class F>
  void ForEachNodeMetric(F mutator) {
    routing_table_.ForEachNodeMetric(mutator);
  }

 private:
  Timer* const timer_;
  const NodeId node_id_;

  void UpdateRoutingTable(
      std::vector<fuchsia::overnet::protocol::NodeMetrics> node_metrics,
      std::vector<fuchsia::overnet::protocol::LinkMetrics> link_metrics,
      bool flush_old_nodes);

  // Hook for subclasses; the default implementation does nothing.
  virtual void OnUnknownStream(NodeId peer, StreamId stream_id) {}

  void MaybeStartPollingLinkChanges();
  void MaybeStartFlushingOldEntries();

  void CloseLinks(Callback<void> quiesced);
  void CloseStreams(Callback<void> quiesced);

  // Per-(peer, stream id) state: the registered handler (if any) plus at most
  // one buffered packet.
  class StreamHolder {
   public:
    StreamHolder(NodeId peer, StreamId id) : peer_(peer), stream_(id) {}

    Status SetHandler(StreamHandler* handler);
    [[nodiscard]] bool HandleMessage(SeqNum seq, TimeStamp received,
                                     Slice payload);
    Status ClearHandler(StreamHandler* handler);

    // Passes `quiesced` on to the handler when one is registered. With no
    // handler, `quiesced` is not explicitly invoked here and is simply
    // destroyed at the end of the call.
    // NOTE(review): confirm Callback's destructor makes that path safe.
    void Close(Callback<void> quiesced) {
      if (handler_ != nullptr) {
        handler_->RouterClose(std::move(quiesced));
      }
    }

    bool has_handler() const { return handler_ != nullptr; }

   private:
    const NodeId peer_;
    const StreamId stream_;
    StreamHandler* handler_ = nullptr;

    // TODO(ctiller): globally cap the number of buffered packets within Router
    struct BufferedPacket {
      SeqNum seq;
      TimeStamp received;
      Slice payload;
    };
    std::unique_ptr<BufferedPacket> buffered_;
  };

  // Per-destination-node state: the link currently used to reach the node
  // (nullptr when there is none), its path mss, messages pending on it and the
  // last gossip version recorded for it.
  class LinkHolder {
   public:
    // The target node id is accepted for symmetry with the map key but is not
    // stored.
    explicit LinkHolder(NodeId /*target*/) {}

    void Forward(Message message);
    void SetLink(Link* link, uint32_t path_mss, bool is_direct);

    Link* link() const { return link_; }
    bool has_direct_link() const { return is_direct_; }
    uint32_t path_mss() const { return path_mss_; }
    uint64_t last_gossip_version() const { return last_gossip_version_; }
    void set_last_gossip_version(uint64_t n) { last_gossip_version_ = n; }

   private:
    Link* link_ = nullptr;
    bool is_direct_ = false;
    uint32_t path_mss_ = std::numeric_limits<uint32_t>::max();
    std::vector<Message> pending_;
    uint64_t last_gossip_version_ = 0;
  };

  // Returns the LinkHolder for `node_id`, creating an empty one on first use.
  // try_emplace performs a single lookup and only constructs the holder when
  // the key is absent.
  LinkHolder* link_holder(NodeId node_id) {
    return &links_.try_emplace(node_id, node_id).first->second;
  }

  // Returns the StreamHolder for (`node_id`, `stream_id`), creating one with
  // no handler on first use.
  StreamHolder* stream_holder(NodeId node_id, StreamId stream_id) {
    return &streams_
                .try_emplace(LocalStreamId{node_id, stream_id}, node_id,
                             stream_id)
                .first->second;
  }

  using LocalStreamId = router_impl::LocalStreamId;

  bool shutting_down_ = false;
  std::unordered_map<uint64_t, LinkPtr<>> owned_links_;
  std::unordered_map<LocalStreamId, StreamHolder> streams_;
  std::unordered_map<NodeId, LinkHolder> links_;
  std::mt19937 rng_;
  RoutingTable routing_table_;
  Optional<Timeout> poll_link_changes_timeout_;
  Optional<Timeout> flush_old_nodes_timeout_;
  fuchsia::overnet::protocol::NodeMetrics own_metrics_;
};
} // namespace overnet