blob: d5f9181361207871eb8d28aa0de99b63492f2bb8 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#pragma once
#include <unordered_map>
#include "callback.h"
#include "closed_ptr.h"
#include "lazy_slice.h"
#include "once_fn.h"
#include "routable_message.h"
#include "routing_table.h"
#include "sink.h"
#include "slice.h"
#include "timer.h"
#include "trace.h"
namespace overnet {
namespace router_impl {
struct LocalStreamId {
NodeId peer;
StreamId stream_id;
uint64_t Hash() const { return peer.Hash() ^ stream_id.Hash(); }
};
inline bool operator==(const LocalStreamId& lhs, const LocalStreamId& rhs) {
return lhs.peer == rhs.peer && lhs.stream_id == rhs.stream_id;
}
} // namespace router_impl
} // namespace overnet
namespace std {
template <>
struct hash<overnet::router_impl::LocalStreamId> {
size_t operator()(const overnet::router_impl::LocalStreamId& id) const {
return id.Hash();
}
};
} // namespace std
namespace overnet {
inline auto ForwardingPayloadFactory(Slice payload) {
return [payload = std::move(payload)](auto args) mutable {
return std::move(payload);
};
}
struct Message final {
RoutableMessage header;
LazySlice make_payload;
TimeStamp received;
static Message SimpleForwarder(RoutableMessage msg, Slice payload,
TimeStamp received) {
return Message{std::move(msg), ForwardingPayloadFactory(payload), received};
}
};
class Link {
public:
virtual ~Link() {}
virtual void Close(Callback<void> quiesced) = 0;
virtual void Forward(Message message) = 0;
virtual LinkMetrics GetLinkMetrics() = 0;
};
template <class T = Link>
using LinkPtr = ClosedPtr<T, Link>;
template <class T, class... Args>
LinkPtr<T> MakeLink(Args&&... args) {
return MakeClosedPtr<T, Link>(std::forward<Args>(args)...);
}
class Router final {
public:
class StreamHandler {
public:
virtual ~StreamHandler() {}
virtual void Close(Callback<void> quiesced) = 0;
virtual void HandleMessage(SeqNum seq, TimeStamp received, Slice data) = 0;
};
Router(Timer* timer, TraceSink trace_sink, NodeId node_id,
bool allow_threading)
: timer_(timer),
trace_sink_(trace_sink.Decorate([this](const std::string& msg) {
std::ostringstream out;
out << "Router[" << this << "] " << msg;
return out.str();
})),
node_id_(node_id),
routing_table_(node_id, timer, trace_sink_, allow_threading) {
UpdateRoutingTable({NodeMetrics(node_id, 0)}, {}, false);
}
~Router();
void Close(Callback<void> quiesced);
// Forward a message to either ourselves or a link
void Forward(Message message);
// Register a (locally handled) stream into this Router
Status RegisterStream(NodeId peer, StreamId stream_id,
StreamHandler* stream_handler);
Status UnregisterStream(NodeId peer, StreamId stream_id,
StreamHandler* stream_handler);
// Register a link to another router (usually on a different machine)
void RegisterLink(LinkPtr<> link);
NodeId node_id() const { return node_id_; }
Timer* timer() const { return timer_; }
void UpdateRoutingTable(std::vector<NodeMetrics> node_metrics,
std::vector<LinkMetrics> link_metrics) {
UpdateRoutingTable(std::move(node_metrics), std::move(link_metrics), false);
}
void BlockUntilNoBackgroundUpdatesProcessing() {
routing_table_.BlockUntilNoBackgroundUpdatesProcessing();
}
// Return true if this router believes a route exists to a particular node.
bool HasRouteTo(NodeId node_id) {
return node_id == node_id_ || link_holder(node_id)->link() != nullptr;
}
TraceSink trace_sink() const { return trace_sink_; }
private:
Timer* const timer_;
const TraceSink trace_sink_;
const NodeId node_id_;
void UpdateRoutingTable(std::vector<NodeMetrics> node_metrics,
std::vector<LinkMetrics> link_metrics,
bool flush_old_nodes);
void MaybeStartPollingLinkChanges();
void MaybeStartFlushingOldEntries();
void CloseLinks(Callback<void> quiesced);
void CloseStreams(Callback<void> quiesced);
class StreamHolder {
public:
void HandleMessage(SeqNum seq, TimeStamp received, Slice payload);
Status SetHandler(StreamHandler* handler);
Status ClearHandler(StreamHandler* handler);
void Close(Callback<void> quiesced) {
if (handler_ != nullptr)
handler_->Close(std::move(quiesced));
}
bool has_handler() { return handler_ != nullptr; }
private:
struct Pending {
SeqNum seq;
TimeStamp received;
Slice payload;
};
StreamHandler* handler_ = nullptr;
std::vector<Pending> pending_;
};
class LinkHolder {
public:
LinkHolder(NodeId target, TraceSink trace_sink)
: trace_sink_(
trace_sink.Decorate([this, target](const std::string& msg) {
std::ostringstream out;
out << "Link[" << this << ";to=" << target << "] " << msg;
return out.str();
})) {}
void Forward(Message message);
void SetLink(Link* link, uint32_t path_mss);
Link* link() { return link_; }
uint32_t path_mss() { return path_mss_; }
private:
const TraceSink trace_sink_;
Link* link_ = nullptr;
uint32_t path_mss_ = std::numeric_limits<uint32_t>::max();
std::vector<Message> pending_;
};
LinkHolder* link_holder(NodeId node_id) {
auto it = links_.find(node_id);
if (it != links_.end())
return &it->second;
return &links_
.emplace(std::piecewise_construct,
std::forward_as_tuple(node_id),
std::forward_as_tuple(node_id, trace_sink_))
.first->second;
}
typedef router_impl::LocalStreamId LocalStreamId;
bool shutting_down_ = false;
std::unordered_map<uint64_t, LinkPtr<>> owned_links_;
std::unordered_map<LocalStreamId, StreamHolder> streams_;
std::unordered_map<NodeId, LinkHolder> links_;
RoutingTable routing_table_;
Optional<Timeout> poll_link_changes_timeout_;
Optional<Timeout> flush_old_nodes_timeout_;
};
} // namespace overnet