blob: 22786f107c4e9f282175e8e74fe00238ff813360 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/connectivity/overnet/lib/routing/routing_table.h"
#include <iostream>
#include <unordered_set>
#include "garnet/public/lib/fostr/fidl/fuchsia/overnet/protocol/formatting.h"
#include "src/connectivity/overnet/lib/protocol/varint.h"
using overnet::routing_table_impl::FullLinkLabel;
namespace overnet {
namespace {
template <class T>
void MoveInto(std::initializer_list<T> from, std::vector<T>* to) {
for (auto& val : from) {
to->emplace_back(fidl::Clone(val));
}
}
} // namespace
RoutingTable::RoutingTable(NodeId root_node, Timer* timer, bool allow_threading)
: root_node_(root_node), timer_(timer), allow_threading_(allow_threading) {}
RoutingTable::~RoutingTable() {
std::unique_lock<std::mutex> lock(mu_);
if (processing_changes_) {
std::thread pending_processing = std::move(*processing_changes_);
processing_changes_.Reset();
cv_.notify_all();
lock.unlock();
pending_processing.join();
}
for (auto& n : nodes_) {
RemoveOutgoingLinks(&n.second);
}
}
void RoutingTable::ProcessUpdate(
std::initializer_list<fuchsia::overnet::protocol::NodeStatus> nodes,
std::initializer_list<fuchsia::overnet::protocol::LinkStatus> links,
bool flush_old_nodes) {
if (nodes.size() == 0 && links.size() == 0 && !flush_old_nodes)
return;
std::unique_lock<std::mutex> lock(mu_);
last_update_ = timer_->Now();
const bool was_empty = change_log_.Empty() && !flush_requested_;
if (flush_old_nodes)
flush_requested_ = true;
MoveInto(nodes, &change_log_.nodes);
MoveInto(links, &change_log_.links);
if (!was_empty)
return;
if (processing_changes_)
return;
auto process_changes = [this, changes = std::move(change_log_),
flush = flush_requested_, now = last_update_,
renderer = ScopedRenderer::current(),
severity = ScopedSeverity::current()]() mutable {
ScopedRenderer scoped_renderer(renderer);
ScopedSeverity scoped_severity(severity);
while (true) {
ApplyChanges(now, changes, flush);
SelectedLinks new_selected_links = BuildForwardingTable();
// Publish changes. If change-log has grown, restart update.
std::lock_guard<std::mutex> lock(mu_);
if (selected_links_ != new_selected_links) {
selected_links_.swap(new_selected_links);
selected_links_version_++;
}
if (!processing_changes_) {
// Indicates that the owning RoutingTable instance is in its destruction
// sequence.
cv_.notify_all();
return;
} else if (change_log_.Empty() && !flush_requested_) {
processing_changes_->detach();
processing_changes_.Reset();
cv_.notify_all();
return;
} else {
changes = std::move(change_log_);
change_log_.Clear();
flush = flush_requested_;
flush_requested_ = false;
now = last_update_;
}
}
};
if (allow_threading_) {
processing_changes_.Reset(std::move(process_changes));
cv_.notify_all();
}
flush_requested_ = false;
change_log_.Clear();
if (!allow_threading_) {
lock.unlock();
process_changes();
}
}
void RoutingTable::ApplyChanges(TimeStamp now, const StatusVecs& changes,
bool flush) {
bool new_gossip_version = false;
// Update all metrics from changelogs.
for (const auto& m : changes.nodes) {
auto it = nodes_.find(NodeId(m.id));
const char* log_verb = "uninteresting";
if (it == nodes_.end()) {
if (m.version != fuchsia::overnet::protocol::METRIC_VERSION_TOMBSTONE) {
new_gossip_version = true;
nodes_.emplace(std::piecewise_construct,
std::forward_as_tuple(NodeId(m.id)),
std::forward_as_tuple(now, fidl::Clone(m)));
log_verb = "new";
}
} else if (m.version > it->second.status.version) {
new_gossip_version = true;
it->second.status = fidl::Clone(m);
it->second.last_updated = now;
log_verb = "updated";
}
OVERNET_TRACE(DEBUG) << "NODE UPDATE: " << log_verb << " " << m;
}
for (const auto& m : changes.links) {
auto report_drop = [&m](const char* why) {
OVERNET_TRACE(DEBUG) << "Drop link info: from=" << m.from
<< " to=" << m.to << " label=" << m.local_id
<< " version=" << m.version << ": " << why;
};
// Cannot add a link if the relevant nodes are unknown.
auto from_node = nodes_.find(NodeId(m.from));
if (from_node == nodes_.end()) {
report_drop("from node does not exist in routing table");
continue;
}
auto to_node = nodes_.find(NodeId(m.to));
if (to_node == nodes_.end()) {
report_drop("to node does not exist in routing table");
continue;
}
// Add the link.
const FullLinkLabel key = {NodeId(m.from), NodeId(m.to), m.local_id};
auto it = links_.find(key);
if (it == links_.end() &&
m.version != fuchsia::overnet::protocol::METRIC_VERSION_TOMBSTONE) {
new_gossip_version = true;
it = links_
.emplace(
std::piecewise_construct, std::forward_as_tuple(key),
std::forward_as_tuple(now, fidl::Clone(m),
&from_node->second, &to_node->second))
.first;
from_node->second.outgoing_links.PushBack(&it->second);
to_node->second.incoming_links.PushBack(&it->second);
OVERNET_TRACE(DEBUG) << "NEWLINK: " << m;
} else if (m.version > it->second.status.version) {
new_gossip_version = true;
it->second.status = fidl::Clone(m);
it->second.last_updated = now;
OVERNET_TRACE(DEBUG) << "UPDATELINK: " << m;
} else {
report_drop("old version");
continue; // Skip keep-alive.
}
// Keep-alive the nodes.
from_node->second.last_updated = now;
to_node->second.last_updated = now;
}
// Remove anything old if we've been asked to.
if (flush) {
for (auto it = nodes_.begin(); it != nodes_.end();) {
if (it->first != root_node_ &&
it->second.last_updated >= now + EntryExpiry()) {
RemoveOutgoingLinks(&it->second);
RemoveIncomingLinks(&it->second);
it = nodes_.erase(it);
} else {
++it;
}
}
}
// Publish out to the application for propagation.
if (new_gossip_version) {
std::vector<fuchsia::overnet::protocol::NodeStatus> publish_node_status;
std::vector<fuchsia::overnet::protocol::LinkStatus> publish_link_status;
for (const auto& np : nodes_) {
publish_node_status.push_back(fidl::Clone(np.second.status));
}
for (const auto& lp : links_) {
publish_link_status.push_back(fidl::Clone(lp.second.status));
}
std::vector<Callback<void>> notify_callbacks;
std::lock_guard<std::mutex> mutex(shared_table_mu_);
gossip_version_++;
shared_node_status_.swap(publish_node_status);
shared_link_status_.swap(publish_link_status);
// Places complete list of notification callbacks into notify_callbacks,
// which will be destroyed after mutex is released, forcing all callbacks to
// be called.
on_node_table_update_.swap(notify_callbacks);
}
}
void RoutingTable::RemoveOutgoingLinks(Node* node) {
while (Link* link = node->outgoing_links.PopFront()) {
link->to_node->incoming_links.Remove(link);
links_.erase(FullLinkLabel{NodeId(link->status.from),
NodeId(link->status.to), link->status.local_id});
}
}
void RoutingTable::RemoveIncomingLinks(Node* node) {
while (Link* link = node->incoming_links.PopFront()) {
link->from_node->outgoing_links.Remove(link);
links_.erase(FullLinkLabel{NodeId(link->status.from),
NodeId(link->status.to), link->status.local_id});
}
}
RoutingTable::SelectedLinks RoutingTable::BuildForwardingTable() {
OVERNET_TRACE(DEBUG) << "Rebuilding forwarding table";
auto node_it = nodes_.find(root_node_);
if (node_it == nodes_.end()) {
OVERNET_TRACE(DEBUG) << "No root known";
return SelectedLinks(); // Root node as yet unknown.
}
for (const auto& n : nodes_) {
OVERNET_TRACE(DEBUG) << n.first << " metrics\n" << n.second.status.metrics;
}
for (const auto& l : links_) {
OVERNET_TRACE(DEBUG) << l.first << " metrics\n" << l.second.status.metrics;
}
++path_finding_run_;
node_it->second.last_path_finding_run = path_finding_run_;
node_it->second.best_rtt = TimeDelta::Zero();
node_it->second.mss = std::numeric_limits<uint32_t>::max();
InternalList<Node, &Node::path_finding_node> todo;
auto enqueue = [&todo](Node* node) {
if (node->queued)
return;
node->queued = true;
todo.PushBack(node);
};
auto node_id_of = [this](const Node* n) {
for (const auto& np : nodes_) {
if (&np.second == n) {
return np.first;
}
}
return NodeId(0);
};
enqueue(&node_it->second);
while (!todo.Empty()) {
Node* src = todo.PopFront();
src->queued = false;
for (auto link : src->outgoing_links) {
if (link->status.version ==
fuchsia::overnet::protocol::METRIC_VERSION_TOMBSTONE)
continue;
TimeDelta rtt = std::min(
TimeDelta::FromHours(1),
src->best_rtt +
(src->status.metrics.has_forwarding_time()
? TimeDelta::FromMicroseconds(
src->status.metrics.forwarding_time())
: TimeDelta::PositiveInf()) +
(link->status.metrics.has_rtt()
? TimeDelta::FromMicroseconds(link->status.metrics.rtt())
: TimeDelta::PositiveInf()));
Node* dst = link->to_node;
// For now we order by RTT.
OVERNET_TRACE(DEBUG) << "RB[" << root_node_
<< "]: src=" << node_id_of(src)
<< " dst=" << node_id_of(dst)
<< " dst->last_path_finding_run="
<< dst->last_path_finding_run
<< " path_finding_run=" << path_finding_run_
<< " src->best_rtt=" << src->best_rtt
<< " dst->best_rtt=" << dst->best_rtt
<< " rtt=" << rtt << " src->mss=" << src->mss
<< " link->mss="
<< (link->status.metrics.has_mss()
? link->status.metrics.mss()
: std::numeric_limits<uint32_t>::max());
if (dst->last_path_finding_run != path_finding_run_ ||
dst->best_rtt > rtt) {
dst->last_path_finding_run = path_finding_run_;
dst->best_rtt = rtt;
dst->best_from = src;
dst->best_link = link;
dst->mss =
std::min(src->mss, link->status.metrics.has_mss()
? link->status.metrics.mss()
: std::numeric_limits<uint32_t>::max());
enqueue(dst);
}
}
}
SelectedLinks selected_links;
for (node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) {
if (node_it->first == root_node_) {
continue;
}
if (node_it->second.last_path_finding_run != path_finding_run_) {
OVERNET_TRACE(DEBUG) << "RB[" << root_node_ << "]: " << node_it->first
<< " unreachable";
continue; // Unreachable
}
Node* n = &node_it->second;
while (n->best_from->status.id != root_node_) {
n = n->best_from;
}
Link* link = n->best_link;
assert(link->status.from == root_node_);
selected_links.emplace(node_it->first,
SelectedLink{link->status.to, link->status.local_id,
node_it->second.mss});
}
return selected_links;
}
uint64_t RoutingTable::SendUpdate(fuchsia::overnet::protocol::Peer_Proxy* peer,
Optional<NodeId> exclude_node) const {
OVERNET_TRACE(DEBUG) << root_node_ << " SendUpdate exclude=" << exclude_node;
std::lock_guard<std::mutex> mutex(shared_table_mu_);
std::unordered_set<NodeId> version_zero_nodes;
for (const auto& m : shared_node_status_) {
OVERNET_TRACE(DEBUG) << "Consider node: " << m;
if (m.version == 0) {
version_zero_nodes.insert(NodeId(m.id));
continue;
}
if (NodeId(m.id) == exclude_node) {
continue;
}
OVERNET_TRACE(DEBUG) << "Send: " << m;
peer->UpdateNodeStatus(fidl::Clone(m));
}
for (const auto& m : shared_link_status_) {
OVERNET_TRACE(DEBUG) << "Consider link: " << m;
if (NodeId(m.from) == exclude_node ||
version_zero_nodes.count(NodeId(m.from)) > 0 ||
version_zero_nodes.count(NodeId(m.to)) > 0) {
continue;
}
OVERNET_TRACE(DEBUG) << "Send: " << m;
peer->UpdateLinkStatus(fidl::Clone(m));
}
return gossip_version_;
}
} // namespace overnet