// blob: 167f5743c7e32c27bc5300d5aca5f28c7b2e9e3c [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#pragma once
#include <cstddef>
#include <cstdint>
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>

#include "callback.h"
#include "routable_message.h"
#include "routing_table.h"
#include "sink.h"
#include "slice.h"
#include "timer.h"
namespace overnet {
namespace router_impl {
struct LocalStreamId {
NodeId peer;
StreamId stream_id;
uint64_t Hash() const { return peer.Hash() ^ stream_id.Hash(); }
};
// Two LocalStreamIds compare equal only when both the peer and the stream id
// compare equal; the stream id is not examined if the peers already differ.
inline bool operator==(const LocalStreamId& lhs, const LocalStreamId& rhs) {
  if (!(lhs.peer == rhs.peer)) {
    return false;
  }
  return lhs.stream_id == rhs.stream_id;
}
} // namespace router_impl
} // namespace overnet
namespace std {
// std::hash specialization so LocalStreamId can key unordered containers.
// Delegates to LocalStreamId::Hash() and converts the result to size_t.
template <>
struct hash<overnet::router_impl::LocalStreamId> {
  size_t operator()(const overnet::router_impl::LocalStreamId& id) const {
    const auto digest = id.Hash();
    return static_cast<size_t>(digest);
  }
};
}  // namespace std
namespace overnet {
// Unit of work passed to Router::Forward() and Link::Forward(): a routable
// message bundled with a timestamp and a status callback.
struct Message final {
// The routable message itself.
RoutableMessage wire;
// Timestamp associated with the message (presumably when it was received;
// confirm against callers).
TimeStamp received;
// Status callback travelling with the message (presumably invoked once the
// message has been handled or dropped; confirm against the implementation).
StatusCallback done;
};
// Abstract connection to another router. Implementations are handed to
// Router::RegisterLink(), which takes ownership via std::unique_ptr.
class Link {
 public:
  virtual ~Link() = default;

  // Accepts a message (taken by value) for transmission over this link.
  virtual void Forward(Message message) = 0;

  // Reports the current metrics describing this link.
  virtual LinkMetrics GetLinkMetrics() = 0;
};
// Router for a single overnet node. Messages handed to Forward() go either to
// a locally registered stream or to a registered Link; a RoutingTable member
// is kept up to date via UpdateRoutingTable().
class Router final {
 public:
  // Receiver interface for messages addressed to a locally registered stream.
  // Handlers are registered by raw pointer (see RegisterStream); this header
  // does not show the Router taking ownership of them.
  class StreamHandler {
   public:
    virtual ~StreamHandler() = default;

    // Delivers one message for the stream: an optional sequence number, a
    // timestamp, the payload slice and a status callback.
    virtual void HandleMessage(Optional<SeqNum> seq, TimeStamp received,
                               Slice data, StatusCallback done) = 0;
  };

  // Creates a router identified by |node_id|. |timer| is stored (not owned)
  // and, together with |allow_threading|, passed on to the routing table.
  // The routing table is then seeded with a NodeMetrics entry for this node.
  Router(Timer* timer, NodeId node_id, bool allow_threading)
      : timer_(timer),
        node_id_(node_id),
        routing_table_(node_id, timer, allow_threading) {
    UpdateRoutingTable({NodeMetrics(node_id, 0)}, {}, false);
  }

  // Forward a message to either ourselves or a link.
  void Forward(Message message);

  // Register a (locally handled) stream into this Router.
  Status RegisterStream(NodeId peer, StreamId stream_id,
                        StreamHandler* stream_handler);

  // Register a link to another router (usually on a different machine).
  // Ownership of |link| transfers to the Router.
  void RegisterLink(std::unique_ptr<Link> link);

  // Identifier of the node this router represents.
  NodeId node_id() const { return node_id_; }

  // Timer supplied at construction.
  Timer* timer() const { return timer_; }

  // Feeds new node and link metrics into the routing table. This public
  // overload never requests flushing of old nodes.
  void UpdateRoutingTable(std::vector<NodeMetrics> node_metrics,
                          std::vector<LinkMetrics> link_metrics) {
    UpdateRoutingTable(std::move(node_metrics), std::move(link_metrics), false);
  }

  // Delegates to RoutingTable::BlockUntilNoBackgroundUpdatesProcessing().
  void BlockUntilNoBackgroundUpdatesProcessing() {
    routing_table_.BlockUntilNoBackgroundUpdatesProcessing();
  }

  // Return true if this router believes a route exists to a particular node:
  // either the node is this router itself, or a non-null Link is currently
  // recorded for it.
  bool HasRouteTo(NodeId node_id) const {
    if (node_id == node_id_) {
      return true;
    }
    // Look up with find() rather than operator[]: operator[] would insert an
    // empty LinkHolder for every node that is merely asked about, growing the
    // map (and possibly rehashing it) as a side effect of a query.
    const auto it = links_.find(node_id);
    return it != links_.end() && it->second.link() != nullptr;
  }

 private:
  Timer* const timer_;
  const NodeId node_id_;

  // Full form of UpdateRoutingTable; |flush_old_nodes| is false for every
  // call visible in this header.
  void UpdateRoutingTable(std::vector<NodeMetrics> node_metrics,
                          std::vector<LinkMetrics> link_metrics,
                          bool flush_old_nodes);
  void MaybeStartPollingLinkChanges();
  void MaybeStartFlushingOldEntries();

  // Per-stream state: the registered handler (null until SetHandler is
  // called) plus a list of pending messages.
  // NOTE(review): presumably messages arriving before a handler is set are
  // queued in |pending_| and replayed by SetHandler; confirm in the .cc file.
  class StreamHolder {
   public:
    void HandleMessage(Optional<SeqNum> seq, TimeStamp received, Slice data,
                       StatusCallback done);
    Status SetHandler(StreamHandler* handler);

   private:
    // Arguments of one HandleMessage call, stored for later.
    struct Pending {
      Optional<SeqNum> seq;
      TimeStamp received;
      Slice data;
      StatusCallback done;
    };
    StreamHandler* handler_ = nullptr;
    std::vector<Pending> pending_;
  };

  // Per-destination state: the link currently selected for a node (null until
  // SetLink is called) plus a list of pending messages.
  // NOTE(review): presumably messages forwarded while no link is set are
  // queued in |pending_|; confirm in the .cc file.
  class LinkHolder {
   public:
    void Forward(Message message);
    void SetLink(Link* link);
    // Non-owning pointer to the current link, or nullptr if none is set.
    Link* link() const { return link_; }

   private:
    Link* link_ = nullptr;
    std::vector<Message> pending_;
  };

  using LocalStreamId = router_impl::LocalStreamId;

  // Links owned by this router.
  // NOTE(review): the meaning of the uint64_t key is not visible here.
  std::unordered_map<uint64_t, std::unique_ptr<Link>> owned_links_;
  // Locally handled streams, keyed by (peer, stream id).
  std::unordered_map<LocalStreamId, StreamHolder> streams_;
  // Link state per destination node.
  std::unordered_map<NodeId, LinkHolder> links_;
  RoutingTable routing_table_;
  // Timeouts that are only present while the corresponding timer is armed
  // (presumably managed by the MaybeStart* helpers above; confirm).
  Optional<Timeout> poll_link_changes_timeout_;
  Optional<Timeout> flush_old_nodes_timeout_;
};
} // namespace overnet