| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #pragma once |
| |
| #include <random> |
| #include <unordered_map> |
| #include "garnet/lib/overnet/environment/timer.h" |
| #include "garnet/lib/overnet/environment/trace.h" |
| #include "garnet/lib/overnet/protocol/routable_message.h" |
| #include "garnet/lib/overnet/routing/routing_table.h" |
| #include "garnet/lib/overnet/vocabulary/callback.h" |
| #include "garnet/lib/overnet/vocabulary/closed_ptr.h" |
| #include "garnet/lib/overnet/vocabulary/lazy_slice.h" |
| #include "garnet/lib/overnet/vocabulary/slice.h" |
| |
| namespace overnet { |
| namespace router_impl { |
| struct LocalStreamId { |
| NodeId peer; |
| StreamId stream_id; |
| uint64_t Hash() const { return peer.Hash() ^ stream_id.Hash(); } |
| }; |
| inline bool operator==(const LocalStreamId& lhs, const LocalStreamId& rhs) { |
| return lhs.peer == rhs.peer && lhs.stream_id == rhs.stream_id; |
| } |
| } // namespace router_impl |
| } // namespace overnet |
| |
| namespace std { |
| template <> |
| struct hash<overnet::router_impl::LocalStreamId> { |
| size_t operator()(const overnet::router_impl::LocalStreamId& id) const { |
| return id.Hash(); |
| } |
| }; |
| } // namespace std |
| |
| namespace overnet { |
| |
| inline auto ForwardingPayloadFactory(Slice payload) { |
| return [payload = std::move(payload)](auto args) mutable { |
| return std::move(payload); |
| }; |
| } |
| |
| struct Message final { |
| RoutableMessage header; |
| LazySlice make_payload; |
| TimeStamp received; |
| uint32_t mss = std::numeric_limits<uint32_t>::max(); |
| |
| static Message SimpleForwarder(RoutableMessage msg, Slice payload, |
| TimeStamp received) { |
| return Message{std::move(msg), ForwardingPayloadFactory(payload), received}; |
| } |
| }; |
| |
| class Link { |
| public: |
| virtual ~Link() {} |
| virtual void Close(Callback<void> quiesced) = 0; |
| virtual void Forward(Message message) = 0; |
| virtual fuchsia::overnet::protocol::LinkMetrics GetLinkMetrics() = 0; |
| }; |
| |
| template <class T = Link> |
| using LinkPtr = ClosedPtr<T, Link>; |
| template <class T, class... Args> |
| LinkPtr<T> MakeLink(Args&&... args) { |
| return MakeClosedPtr<T, Link>(std::forward<Args>(args)...); |
| } |
| |
| class Router { |
| public: |
| static constexpr inline auto kModule = Module::ROUTER; |
| |
| class StreamHandler { |
| public: |
| virtual ~StreamHandler() {} |
| virtual void RouterClose(Callback<void> quiesced) = 0; |
| virtual void HandleMessage(SeqNum seq, TimeStamp received, Slice data) = 0; |
| }; |
| |
| Router(Timer* timer, NodeId node_id, bool allow_non_determinism); |
| |
| virtual ~Router(); |
| |
| virtual void Close(Callback<void> quiesced); |
| |
| // Forward a message to either ourselves or a link |
| void Forward(Message message); |
| // Register a (locally handled) stream into this Router |
| Status RegisterStream(NodeId peer, StreamId stream_id, |
| StreamHandler* stream_handler); |
| Status UnregisterStream(NodeId peer, StreamId stream_id, |
| StreamHandler* stream_handler); |
| // Register a link to another router (usually on a different machine) |
| void RegisterLink(LinkPtr<> link); |
| |
| NodeId node_id() const { return node_id_; } |
| Timer* timer() const { return timer_; } |
| auto* rng() { return &rng_; } |
| |
| void UpdateRoutingTable( |
| std::vector<fuchsia::overnet::protocol::NodeMetrics> node_metrics, |
| std::vector<fuchsia::overnet::protocol::LinkMetrics> link_metrics) { |
| UpdateRoutingTable(std::move(node_metrics), std::move(link_metrics), false); |
| } |
| |
| void BlockUntilNoBackgroundUpdatesProcessing() { |
| routing_table_.BlockUntilNoBackgroundUpdatesProcessing(); |
| } |
| |
| // Return true if this router believes a route exists to a particular node. |
| bool HasRouteTo(NodeId node_id) { |
| return node_id == node_id_ || link_holder(node_id)->link() != nullptr; |
| } |
| |
| Optional<NodeId> SelectGossipPeer(); |
| Slice WriteGossipUpdate(Border desired_border, NodeId target); |
| Status ApplyGossipUpdate(Slice update, NodeId peer); |
| |
| void SetDescription(fuchsia::overnet::protocol::PeerDescription description); |
| |
| template <class F> |
| void ForEachNodeMetric(F mutator) { |
| routing_table_.ForEachNodeMetric(mutator); |
| } |
| |
| private: |
| Timer* const timer_; |
| const NodeId node_id_; |
| |
| void UpdateRoutingTable( |
| std::vector<fuchsia::overnet::protocol::NodeMetrics> node_metrics, |
| std::vector<fuchsia::overnet::protocol::LinkMetrics> link_metrics, |
| bool flush_old_nodes); |
| virtual void OnUnknownStream(NodeId peer, StreamId stream_id) {} |
| |
| void MaybeStartPollingLinkChanges(); |
| void MaybeStartFlushingOldEntries(); |
| |
| void CloseLinks(Callback<void> quiesced); |
| void CloseStreams(Callback<void> quiesced); |
| |
| class StreamHolder { |
| public: |
| StreamHolder(NodeId peer, StreamId id) : peer_(peer), stream_(id) {} |
| Status SetHandler(StreamHandler* handler); |
| [[nodiscard]] bool HandleMessage(SeqNum seq, TimeStamp received, |
| Slice payload); |
| Status ClearHandler(StreamHandler* handler); |
| void Close(Callback<void> quiesced) { |
| if (handler_ != nullptr) |
| handler_->RouterClose(std::move(quiesced)); |
| } |
| bool has_handler() { return handler_ != nullptr; } |
| |
| private: |
| const NodeId peer_; |
| const StreamId stream_; |
| StreamHandler* handler_ = nullptr; |
| // TODO(ctiller): globally cap the number of buffered packets within Router |
| struct BufferedPacket { |
| SeqNum seq; |
| TimeStamp received; |
| Slice payload; |
| }; |
| std::unique_ptr<BufferedPacket> buffered_; |
| }; |
| |
| class LinkHolder { |
| public: |
| LinkHolder(NodeId target) {} |
| void Forward(Message message); |
| void SetLink(Link* link, uint32_t path_mss, bool is_direct); |
| Link* link() { return link_; } |
| bool has_direct_link() const { return is_direct_; } |
| uint32_t path_mss() { return path_mss_; } |
| |
| uint64_t last_gossip_version() const { return last_gossip_version_; } |
| void set_last_gossip_version(uint64_t n) { last_gossip_version_ = n; } |
| |
| private: |
| Link* link_ = nullptr; |
| bool is_direct_ = false; |
| uint32_t path_mss_ = std::numeric_limits<uint32_t>::max(); |
| std::vector<Message> pending_; |
| uint64_t last_gossip_version_ = 0; |
| }; |
| |
| LinkHolder* link_holder(NodeId node_id) { |
| auto it = links_.find(node_id); |
| if (it != links_.end()) |
| return &it->second; |
| return &links_ |
| .emplace(std::piecewise_construct, |
| std::forward_as_tuple(node_id), |
| std::forward_as_tuple(node_id)) |
| .first->second; |
| } |
| |
| StreamHolder* stream_holder(NodeId node_id, StreamId stream_id) { |
| auto it = streams_.find(LocalStreamId{node_id, stream_id}); |
| if (it != streams_.end()) |
| return &it->second; |
| return &streams_ |
| .emplace( |
| std::piecewise_construct, |
| std::forward_as_tuple(LocalStreamId{node_id, stream_id}), |
| std::forward_as_tuple(node_id, stream_id)) |
| .first->second; |
| } |
| |
| typedef router_impl::LocalStreamId LocalStreamId; |
| |
| bool shutting_down_ = false; |
| std::unordered_map<uint64_t, LinkPtr<>> owned_links_; |
| |
| std::unordered_map<LocalStreamId, StreamHolder> streams_; |
| std::unordered_map<NodeId, LinkHolder> links_; |
| std::mt19937 rng_; |
| |
| RoutingTable routing_table_; |
| Optional<Timeout> poll_link_changes_timeout_; |
| Optional<Timeout> flush_old_nodes_timeout_; |
| fuchsia::overnet::protocol::NodeMetrics own_metrics_; |
| }; |
| |
| } // namespace overnet |