blob: f1cca95bfe747cfdf67ad9360f1d925c2222dbea [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "routing_table.h"
#include <iostream>
#include <unordered_set>
#include "garnet/lib/overnet/protocol/varint.h"
using overnet::routing_table_impl::FullLinkLabel;
namespace overnet {
namespace {
// Transfers every element of |*from| to the back of |*to| by moving it.
// When |*to| holds nothing yet, the source vector is move-assigned wholesale
// instead of being transferred one element at a time.
template <class T>
void MoveInto(std::vector<T>* from, std::vector<T>* to) {
  if (to->empty()) {
    *to = std::move(*from);
    return;
  }
  for (auto src = from->begin(); src != from->end(); ++src) {
    to->push_back(std::move(*src));
  }
}
} // namespace
RoutingTable::RoutingTable(NodeId root_node, Timer* timer, bool allow_threading)
: root_node_(root_node), timer_(timer), allow_threading_(allow_threading) {}
// Tears down the routing table. If a change-processing thread is still
// registered, its handle is taken over and processing_changes_ is cleared
// (the worker lambda created in Update() treats an empty processing_changes_
// as its signal to stop), and the thread is joined before the link records
// are released.
RoutingTable::~RoutingTable() {
std::unique_lock<std::mutex> lock(mu_);
if (processing_changes_) {
// Move the handle out so it can still be joined once the slot is cleared.
std::thread pending_processing = std::move(*processing_changes_);
processing_changes_.Reset();
cv_.notify_all();
// Drop mu_ before joining: the worker acquires mu_ to publish its results.
lock.unlock();
pending_processing.join();
}
// Unlink and erase the outgoing link records of every node.
for (auto& n : node_metrics_) {
RemoveOutgoingLinks(n.second);
}
}
// Queues a batch of node/link metric updates and ensures they get processed.
//
// |node_metrics|, |link_metrics|: updates to merge into the table; they are
//     appended to change_log_ under mu_.
// |flush_old_nodes|: requests that the next processing pass also runs the
//     expiry sweep in ApplyChanges().
//
// If earlier changes are still pending, or a processing thread is registered,
// the call only appends to the change log and returns. Otherwise it takes a
// snapshot of the log and processes it, on a new thread when allow_threading_
// is set and inline on the calling thread when it is not.
void RoutingTable::Update(std::vector<NodeMetrics> node_metrics,
std::vector<LinkMetrics> link_metrics,
bool flush_old_nodes) {
// Nothing to record and no flush requested.
if (node_metrics.empty() && link_metrics.empty() && !flush_old_nodes)
return;
std::unique_lock<std::mutex> lock(mu_);
last_update_ = timer_->Now();
// Sampled before this call's changes are appended: true when no earlier call
// has left unprocessed work behind.
const bool was_empty = change_log_.Empty() && !flush_requested_;
if (flush_old_nodes)
flush_requested_ = true;
#ifndef NDEBUG
// Debug-only sanity checks: version 0 is only accepted on locally created
// node metrics, and never on link metrics.
for (const auto& m : node_metrics) {
assert(m.version() != 0 || m.created_locally());
}
for (const auto& m : link_metrics) {
assert(m.version() != 0);
}
#endif
MoveInto(&node_metrics, &change_log_.node_metrics);
MoveInto(&link_metrics, &change_log_.link_metrics);
// Work was already pending before this call; just leave the new entries in
// the change log.
if (!was_empty)
return;
// A worker is registered; its loop re-checks change_log_ and flush_requested_
// under mu_ before exiting, so it will pick these changes up.
if (processing_changes_)
return;
// The processing job. It starts from a snapshot of the change log, flush flag
// and timestamp taken here (under mu_), plus the caller's current renderer
// and severity so they can be re-established where the job runs.
auto process_changes = [this, changes = std::move(change_log_),
flush = flush_requested_, now = last_update_,
renderer = ScopedRenderer::current(),
severity = ScopedSeverity::current()]() mutable {
ScopedRenderer scoped_renderer(renderer);
ScopedSeverity scoped_severity(severity);
while (true) {
// Both calls below run without mu_ held.
ApplyChanges(now, changes, flush);
SelectedLinks new_selected_links = BuildForwardingTable();
// Publish changes. If change-log has grown, restart update.
std::lock_guard<std::mutex> lock(mu_);
// Only bump the version when the selection actually changed.
if (selected_links_ != new_selected_links) {
selected_links_.swap(new_selected_links);
selected_links_version_++;
}
if (!processing_changes_) {
// Indicates that the owning RoutingTable instance is in its destruction
// sequence.
// (This branch is also the exit path when the job runs inline, since
// processing_changes_ is never set in that mode.)
cv_.notify_all();
return;
} else if (change_log_.Empty() && !flush_requested_) {
// No further work queued: detach this thread, clear the slot so the next
// Update() can start a new job, and finish.
processing_changes_->detach();
processing_changes_.Reset();
cv_.notify_all();
return;
} else {
// More work arrived while processing: take a fresh snapshot and loop.
changes = std::move(change_log_);
change_log_.Clear();
flush = flush_requested_;
flush_requested_ = false;
now = last_update_;
}
}
};
if (allow_threading_) {
// Hand the job to processing_changes_ (presumably constructing the
// std::thread that runs it — confirm against Optional<>::Reset).
processing_changes_.Reset(std::move(process_changes));
cv_.notify_all();
}
// The snapshot now owns the pending work; reset the shared queue state.
flush_requested_ = false;
change_log_.Clear();
if (!allow_threading_) {
// Inline mode: run the job on this thread, releasing mu_ first because the
// job acquires it itself.
lock.unlock();
process_changes();
}
}
// Folds a batch of node and link metric updates into the routing table and,
// if anything changed, republishes the shared snapshot read by Write().
//
// |now|     timestamp stored as last_updated on every entry that is touched.
// |changes| node and link metrics to apply. An update replaces an existing
//           entry only when it carries a strictly greater version; a
//           tombstone for an entry that is not in the table is ignored.
// |flush|   when true, the node expiry sweep runs after the updates.
void RoutingTable::ApplyChanges(TimeStamp now, const Metrics& changes,
                                bool flush) {
  bool new_gossip_version = false;

  // Update all metrics from changelogs.
  for (const auto& m : changes.node_metrics) {
    OVERNET_TRACE(DEBUG) << "APPLY: " << m;
    if (m.node_id() == root_node_ && !m.created_locally()) {
      OVERNET_TRACE(WARNING) << "Dropping node update received to self: " << m;
      continue;
    }
    auto it = node_metrics_.find(m.node_id());
    if (it == node_metrics_.end()) {
      // Unknown node: record it unless the update is only a tombstone.
      if (m.version() != METRIC_VERSION_TOMBSTONE) {
        new_gossip_version = true;
        node_metrics_.emplace(std::piecewise_construct,
                              std::forward_as_tuple(m.node_id()),
                              std::forward_as_tuple(now, m));
      }
    } else if (m.version() > it->second.metrics.version()) {
      new_gossip_version = true;
      it->second.metrics = m;
      it->second.last_updated = now;
    }
  }

  for (const auto& m : changes.link_metrics) {
    OVERNET_TRACE(DEBUG) << "APPLY: " << m;
    auto report_drop = [&m](const char* why) {
      OVERNET_TRACE(INFO) << "Drop link info: from=" << m.from()
                          << " to=" << m.to() << " label=" << m.link_label()
                          << " version=" << m.version() << ": " << why;
    };
    if (m.from() == root_node_ && !m.created_locally()) {
      OVERNET_TRACE(WARNING)
          << "Dropping link update for link owned by self: " << m;
      continue;
    }
    // Cannot add a link if the relevant nodes are unknown.
    auto from_node = node_metrics_.find(m.from());
    if (from_node == node_metrics_.end()) {
      report_drop("from node does not exist in routing table");
      continue;
    }
    auto to_node = node_metrics_.find(m.to());
    if (to_node == node_metrics_.end()) {
      report_drop("to node does not exist in routing table");
      continue;
    }
    // Add or refresh the link.
    const FullLinkLabel key = {m.from(), m.to(), m.link_label()};
    auto it = link_metrics_.find(key);
    if (it == link_metrics_.end()) {
      // A tombstone for a link we never stored carries nothing to apply. It
      // must be rejected here: falling through to the version comparison
      // would dereference the end iterator.
      if (m.version() == METRIC_VERSION_TOMBSTONE) {
        report_drop("tombstone for unknown link");
        continue;  // Skip keep-alive.
      }
      new_gossip_version = true;
      it = link_metrics_
               .emplace(std::piecewise_construct, std::forward_as_tuple(key),
                        std::forward_as_tuple(now, m, &to_node->second))
               .first;
      from_node->second.outgoing_links.PushBack(&it->second);
    } else if (m.version() > it->second.metrics.version()) {
      new_gossip_version = true;
      it->second.metrics = m;
      it->second.last_updated = now;
    } else {
      report_drop("old version");
      continue;  // Skip keep-alive.
    }
    // Keep-alive the nodes.
    from_node->second.last_updated = now;
    to_node->second.last_updated = now;
  }

  // Remove anything old if we've been asked to.
  // NOTE(review): the test below matches entries stamped at or beyond
  // now + EntryExpiry(). Every stamp written in this function is |now|, so it
  // does not look able to match stale entries;
  // `last_updated + EntryExpiry() <= now` may have been intended. Left as is
  // because erasing a node here does not remove links held by other nodes
  // whose to_node points at it - confirm before changing.
  if (flush) {
    for (auto it = node_metrics_.begin(); it != node_metrics_.end();) {
      if (it->first != root_node_ &&
          it->second.last_updated >= now + EntryExpiry()) {
        RemoveOutgoingLinks(it->second);
        it = node_metrics_.erase(it);
      } else {
        ++it;
      }
    }
  }

  // Publish out to the application for propagation.
  if (new_gossip_version) {
    // Build the snapshot before taking shared_table_mu_ so the lock is only
    // held for the swap.
    std::vector<NodeMetrics> publish_node_metrics;
    std::vector<LinkMetrics> publish_link_metrics;
    for (const auto& np : node_metrics_) {
      publish_node_metrics.push_back(np.second.metrics);
    }
    for (const auto& lp : link_metrics_) {
      publish_link_metrics.push_back(lp.second.metrics);
    }
    std::lock_guard<std::mutex> mutex(shared_table_mu_);
    gossip_version_++;
    shared_node_metrics_.swap(publish_node_metrics);
    shared_link_metrics_.swap(publish_link_metrics);
  }
}
// Detaches every link from |node|'s outgoing list and erases the matching
// record from link_metrics_.
void RoutingTable::RemoveOutgoingLinks(Node& node) {
  for (;;) {
    Link* const detached = node.outgoing_links.PopFront();
    if (!detached) {
      break;  // List drained.
    }
    // The key is fully built from the link's own metrics before erase() runs.
    const auto& metrics = detached->metrics;
    link_metrics_.erase(
        FullLinkLabel{metrics.from(), metrics.to(), metrics.link_label()});
  }
}
// Recomputes, for every node reachable from the root, which of the root's
// outgoing links to use as the first hop. Paths are ranked by accumulated RTT
// (per-node forwarding time plus link RTT); tombstoned links are ignored.
// Returns an empty table when the root node itself is not yet known.
RoutingTable::SelectedLinks RoutingTable::BuildForwardingTable() {
  OVERNET_TRACE(DEBUG) << "Rebuilding forwarding table";
  const auto root_it = node_metrics_.find(root_node_);
  if (root_it == node_metrics_.end()) {
    OVERNET_TRACE(DEBUG) << "No root known";
    return SelectedLinks();  // Root node as yet unknown.
  }

  // Start a new run: nodes stamped with this run number have been reached.
  Node& root = root_it->second;
  ++path_finding_run_;
  root.last_path_finding_run = path_finding_run_;
  root.best_rtt = TimeDelta::Zero();
  root.mss = std::numeric_limits<uint32_t>::max();

  // Work list of nodes whose outgoing links still need relaxing; the queued
  // flag keeps a node from being listed twice at once.
  InternalList<Node, &Node::path_finding_node> pending;
  const auto schedule = [&pending](Node* node) {
    if (!node->queued) {
      node->queued = true;
      pending.PushBack(node);
    }
  };

  schedule(&root);
  while (!pending.Empty()) {
    Node* const src = pending.PopFront();
    OVERNET_TRACE(DEBUG) << "VISIT: " << src->metrics;
    src->queued = false;
    for (auto link : src->outgoing_links) {
      OVERNET_TRACE(DEBUG) << "VISIT_LINK: " << link->metrics;
      if (link->metrics.version() == METRIC_VERSION_TOMBSTONE) {
        continue;
      }
      const TimeDelta rtt =
          src->best_rtt + src->metrics.forwarding_time() + link->metrics.rtt();
      Node* const dst = link->to_node;
      // For now we order by RTT.
      if (dst->last_path_finding_run != path_finding_run_ ||
          dst->best_rtt > rtt) {
        dst->last_path_finding_run = path_finding_run_;
        dst->best_rtt = rtt;
        dst->best_from = src;
        dst->best_link = link;
        dst->mss = std::min(src->mss, link->metrics.mss());
        schedule(dst);
      }
    }
  }

  SelectedLinks selected_links;
  for (auto& entry : node_metrics_) {
    if (entry.second.last_path_finding_run != path_finding_run_) {
      continue;  // Unreachable
    }
    if (entry.first == root_node_) {
      continue;
    }
    // Walk back along best_from until reaching the node entered directly
    // from the root; its best_link is the root's outgoing link to use.
    Node* hop = &entry.second;
    while (hop->best_from->metrics.node_id() != root_node_) {
      hop = hop->best_from;
    }
    Link* const first_link = hop->best_link;
    assert(first_link->metrics.from() == root_node_);
    // NOTE(review): the mss stored here is the first-hop node's, not that of
    // the destination node this entry is for - confirm this is intended.
    selected_links[entry.first] =
        SelectedLink{first_link->metrics.link_label(), hop->mss};
  }
  return selected_links;
}
namespace {
// Segment keys used by RoutingTable::Write() and RoutingTable::Parse().
// The numeric values are emitted into serialized updates.
enum class Key : uint64_t {
// Begins a node record; payload is the node id.
NODE = 1,
// Begins a link record; payload is the from node id followed by the to node
// id.
LINK = 2,
// Version of the node or link record currently being described.
VERSION = 3,
// Node property: forwarding time in microseconds.
FORWARDING_TIME_US = 10,
// Node property: description bytes.
DESCRIPTION = 11,
// Link property: link label.
LABEL = 49,
// Link property: bw_link in bits per second.
BW_LINK = 50,
// Link property: bw_used in bits per second.
BW_USED = 51,
// Link property: rtt in microseconds.
RTT = 52,
// Link property: mss (Parse() rejects values that do not fit in 32 bits).
MSS = 53,
};
}
// Serializes the most recently published node and link metrics into a single
// Slice, returned together with the gossip version read under the same lock.
//
// Nodes whose version is 0 are never written, and neither are links that have
// such a node at either end. |exclude_node| additionally suppresses the record
// of the node it names and every link originating from that node.
//
// The output is a sequence of segments, each laid out as
//   varint(key) varint(payload length) payload
// |desired_border| is passed through to Slice::WithInitializerAndBorders.
std::pair<Slice, uint64_t> RoutingTable::Write(
Border desired_border, Optional<NodeId> exclude_node) const {
// One key/length/payload triple of the output. Construction only records the
// payload size and a callback that emits it; nothing is written until the
// output slice has been sized.
struct Segment {
// Payload: a single node id.
Segment(Key key, NodeId id)
: key(key),
length(id.wire_length()),
write([id](uint8_t* p) { id.Write(p); }) {}
// Payload: two node ids back to back (first, then second).
Segment(Key key, std::pair<NodeId, NodeId> p)
: key(key),
length(p.first.wire_length() + p.second.wire_length()),
write([p](uint8_t* out) { p.second.Write(p.first.Write(out)); }) {}
// Payload: a varint. The lambda copies the already-initialized length
// member (length is declared before write).
Segment(Key key, uint64_t x)
: key(key),
length(varint::WireSizeFor(x)),
write([x, length = length](uint8_t* p) {
varint::Write(x, length, p);
}) {}
// Payload: the raw bytes of |value|.
Segment(Key key, Slice value)
: key(key),
length(value.length()),
write([value = std::move(value)](uint8_t* p) {
memcpy(p, value.begin(), value.length());
}) {}
// NOTE: declaration order matters below - key_length and length_length are
// default-initialized from key and length, which must come first.
Key key;
uint8_t key_length = varint::WireSizeFor(static_cast<uint64_t>(key));
uint32_t length;
uint8_t length_length = varint::WireSizeFor(length);
std::function<void(uint8_t*)> write;
};
std::vector<Segment> segments;
uint64_t version;
{
// Nodes seen with version 0; links touching them are filtered out below.
std::unordered_set<NodeId> version_zero_nodes;
// Hold shared_table_mu_ only while reading the published snapshot.
std::lock_guard<std::mutex> mutex(shared_table_mu_);
version = gossip_version_;
for (const auto& m : shared_node_metrics_) {
if (m.version() == 0) {
version_zero_nodes.insert(m.node_id());
continue;
}
if (m.node_id() == exclude_node)
continue;
OVERNET_TRACE(DEBUG) << "WRITE: " << m;
segments.emplace_back(Key::NODE, m.node_id());
segments.emplace_back(Key::VERSION, m.version());
// Optional properties are omitted when they hold the value tested here.
if (m.forwarding_time() != TimeDelta::PositiveInf()) {
segments.emplace_back(Key::FORWARDING_TIME_US,
m.forwarding_time().as_us());
}
if (m.description().length() != 0) {
segments.emplace_back(Key::DESCRIPTION, m.description());
}
}
for (const auto& m : shared_link_metrics_) {
if (m.from() == exclude_node || version_zero_nodes.count(m.from()) > 0 ||
version_zero_nodes.count(m.to()) > 0)
continue;
OVERNET_TRACE(DEBUG) << "WRITE: " << m;
segments.emplace_back(Key::LINK, std::make_pair(m.from(), m.to()));
segments.emplace_back(Key::VERSION, m.version());
segments.emplace_back(Key::LABEL, m.link_label());
// Optional properties are omitted when they hold the value tested here.
if (m.bw_link() != Bandwidth::Zero()) {
segments.emplace_back(Key::BW_LINK, m.bw_link().bits_per_second());
}
if (m.bw_used() != Bandwidth::Zero()) {
segments.emplace_back(Key::BW_USED, m.bw_used().bits_per_second());
}
if (m.rtt() != TimeDelta::PositiveInf()) {
segments.emplace_back(Key::RTT, m.rtt().as_us());
}
if (m.mss() != std::numeric_limits<uint32_t>::max()) {
segments.emplace_back(Key::MSS, m.mss());
}
}
}
// Total encoded size: key varint + length varint + payload for each segment.
size_t slice_length = 0;
for (const auto& seg : segments) {
slice_length += seg.key_length;
slice_length += seg.length_length;
slice_length += seg.length;
}
return std::make_pair(
Slice::WithInitializerAndBorders(
slice_length, desired_border,
[&](uint8_t* out) {
// Emit the segments in the order they were collected.
for (const auto& seg : segments) {
out = varint::Write(static_cast<uint64_t>(seg.key),
seg.key_length, out);
out = varint::Write(seg.length, seg.length_length, out);
seg.write(out);
out += seg.length;
}
}),
version);
}
// Decodes a serialized routing update into |*node_metrics| and
// |*link_metrics|.
//
// |update| is read as a sequence of segments, each laid out as
//   varint(key) varint(payload length) payload
// A NODE or LINK segment appends a new record and makes it current; the
// property segments that follow are applied to the current record. Segments
// with an unrecognised key are skipped.
//
// Returns Status::Ok() on success. Returns FAILED_PRECONDITION for malformed
// framing (unreadable header, short segment, payload/length mismatch), for an
// unreadable value, or for a property applied to the wrong kind of record;
// returns INVALID_ARGUMENT when a version of zero is supplied or a record ends
// up without a version. On failure the output vectors may already contain
// partially populated records.
Status RoutingTable::Parse(Slice update, std::vector<NodeMetrics>* node_metrics,
                           std::vector<LinkMetrics>* link_metrics) const {
  const uint8_t* p = update.begin();
  const uint8_t* const end = update.end();
  // The record currently receiving properties. At most one of these is
  // non-null; each points at the back() of its output vector.
  NodeMetrics* adding_node = nullptr;
  LinkMetrics* adding_link = nullptr;
  uint64_t key;
  uint64_t length;

  // Value readers. Each consumes bytes at |p| and advances it.
  auto read_node = [&](const char* what) -> StatusOr<NodeId> {
    uint64_t id;
    if (!ParseLE64(&p, end, &id)) {
      return StatusOr<NodeId>(StatusCode::FAILED_PRECONDITION, what);
    }
    return NodeId(id);
  };
  auto read_uint64 = [&](const char* what) -> StatusOr<uint64_t> {
    uint64_t x;
    if (!varint::Read(&p, end, &x)) {
      return StatusOr<uint64_t>(StatusCode::FAILED_PRECONDITION, what);
    }
    return x;
  };
  // Takes the whole payload of the current segment (|length| bytes) as a
  // sub-slice of |update|.
  auto read_slice = [&](const char* what) -> Slice {
    auto out = update.FromPointer(p).ToOffset(length);
    p += length;
    return out;
  };
  auto read_bandwidth = [&](const char* what) -> StatusOr<Bandwidth> {
    return read_uint64(what).Then([](uint64_t x) {
      return StatusOr<Bandwidth>(Bandwidth::FromBitsPerSecond(x));
    });
  };
  auto read_time_delta = [&](const char* what) -> StatusOr<TimeDelta> {
    return read_uint64(what).Then([](uint64_t x) {
      return StatusOr<TimeDelta>(TimeDelta::FromMicroseconds(x));
    });
  };
  auto read_uint32 = [&](const char* what) -> StatusOr<uint32_t> {
    return read_uint64(what).Then([](uint64_t x) {
      if (x > std::numeric_limits<uint32_t>::max()) {
        return StatusOr<uint32_t>(StatusCode::FAILED_PRECONDITION,
                                  "Out of range");
      }
      return StatusOr<uint32_t>(static_cast<uint32_t>(x));
    });
  };
  auto read_node_pair =
      [&](const char* what) -> StatusOr<std::pair<NodeId, NodeId>> {
    return read_node(what).Then([&](NodeId n) {
      return read_node(what).Then(
          [n](NodeId m) -> StatusOr<std::pair<NodeId, NodeId>> {
            return std::make_pair(n, m);
          });
    });
  };

  // Adapters that turn a setter member pointer into a callable which applies
  // it to the current link/node, or fails when the current record is of the
  // other kind (or there is none).
  auto with_link = [&adding_link](auto setter) {
    return [&adding_link, setter](auto x) {
      if (!adding_link)
        return Status(StatusCode::FAILED_PRECONDITION,
                      "Property only on links");
      (adding_link->*setter)(x);
      return Status::Ok();
    };
  };
  auto with_node = [&adding_node](auto setter) {
    return [&adding_node, setter](auto x) {
      if (!adding_node)
        return Status(StatusCode::FAILED_PRECONDITION,
                      "Property only on nodes");
      (adding_node->*setter)(x);
      return Status::Ok();
    };
  };

  while (p != end) {
    if (!varint::Read(&p, end, &key) || !varint::Read(&p, end, &length)) {
      return Status(StatusCode::FAILED_PRECONDITION,
                    "Failed to read segment header");
    }
    if (uint64_t(end - p) < length) {
      return Status(StatusCode::FAILED_PRECONDITION, "Short segment");
    }
    Status status = Status::Ok();
    // Where this segment's payload must end; checked after decoding.
    const uint8_t* const next = p + length;
    switch (static_cast<Key>(key)) {
      case Key::NODE:
        status = read_node("Node::Id").Then([&](NodeId n) {
          adding_link = nullptr;
          node_metrics->emplace_back();
          adding_node = &node_metrics->back();
          adding_node->set_node_id(n);
          return Status::Ok();
        });
        break;
      case Key::LINK:
        status = read_node_pair("Link::Id").Then([&](auto p) {
          adding_node = nullptr;
          link_metrics->emplace_back();
          adding_link = &link_metrics->back();
          adding_link->set_from_to(p.first, p.second);
          return Status::Ok();
        });
        break;
      case Key::VERSION:
        status = read_uint64("Version").Then([&](uint64_t v) {
          if (v == 0) {
            return Status(StatusCode::INVALID_ARGUMENT,
                          "Zero version number is not allowed");
          }
          if (adding_link)
            adding_link->set_version(v);
          if (adding_node)
            adding_node->set_version(v);
          return Status::Ok();
        });
        break;
      case Key::FORWARDING_TIME_US:
        status = read_time_delta("ForwardingTime")
                     .Then(with_node(&NodeMetrics::set_forwarding_time));
        break;
      case Key::DESCRIPTION:
        status =
            with_node(&NodeMetrics::set_description)(read_slice("Description"));
        break;
      case Key::LABEL:
        status =
            read_uint64("Label").Then(with_link(&LinkMetrics::set_link_label));
        break;
      case Key::BW_LINK:
        status =
            read_bandwidth("BWLink").Then(with_link(&LinkMetrics::set_bw_link));
        break;
      case Key::BW_USED:
        // Must store into bw_used; routing this through set_bw_link would
        // overwrite the link bandwidth and leave bw_used unset.
        status =
            read_bandwidth("BWUsed").Then(with_link(&LinkMetrics::set_bw_used));
        break;
      case Key::RTT:
        status = read_time_delta("RTT").Then(with_link(&LinkMetrics::set_rtt));
        break;
      case Key::MSS:
        status = read_uint32("MSS").Then(with_link(&LinkMetrics::set_mss));
        break;
      default:
        // Unknown field: skip it.
        OVERNET_TRACE(DEBUG) << "Skipping unknown routing table key " << key;
        p = next;
        break;
    }
    if (status.is_error()) {
      return status;
    }
    // The decoder must have consumed exactly the declared payload.
    if (p != next) {
      return Status(StatusCode::FAILED_PRECONDITION,
                    "Length mismatch reading segment");
    }
    p = next;
  }

  // Verify everything is OK: every record must have received a version.
  for (const auto& m : *node_metrics) {
    if (m.version() == 0) {
      return Status(StatusCode::INVALID_ARGUMENT,
                    "Bad node metric: no version number");
    }
  }
  for (const auto& m : *link_metrics) {
    if (m.version() == 0) {
      return Status(StatusCode::INVALID_ARGUMENT,
                    "Bad link metric: no version number");
    }
  }
  return Status::Ok();
}
// Parses a serialized routing update and, if parsing succeeds, queues the
// decoded metrics via Update() with a flush of old nodes requested. Returns
// the Status produced by the Parse()/Then() chain.
Status RoutingTable::Read(Slice update) {
  OVERNET_TRACE(DEBUG) << "READ: " << update;
  std::vector<NodeMetrics> parsed_nodes;
  std::vector<LinkMetrics> parsed_links;
  auto apply_parsed = [&] {
    Update(std::move(parsed_nodes), std::move(parsed_links), true);
    return Status::Ok();
  };
  return Parse(std::move(update), &parsed_nodes, &parsed_links)
      .Then(apply_parsed);
}
// Streams a one-line, human-readable rendering of a link's metrics.
std::ostream& operator<<(std::ostream& out, const LinkMetrics& m) {
  out << "LINK{" << m.from() << "->" << m.to() << "/" << m.link_label();
  out << " @ " << m.version();
  out << "; bw_link=" << m.bw_link();
  out << ", bw_used=" << m.bw_used();
  out << ", rtt=" << m.rtt();
  out << ", mss=" << m.mss();
  out << "}";
  return out;
}
// Streams a one-line, human-readable rendering of a node's metrics.
std::ostream& operator<<(std::ostream& out, const NodeMetrics& m) {
  out << "NODE{" << m.node_id();
  out << " @ " << m.version();
  out << "; forwarding_time=" << m.forwarding_time();
  out << ", description=" << m.description();
  out << "}";
  return out;
}
} // namespace overnet