blob: b60835cdbcd02ddccb339dd15ef72f5d9bd1a482 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "router.h"
#include <iostream>
namespace overnet {
static constexpr TimeDelta kPollLinkChangeTimeout =
TimeDelta::FromMilliseconds(100);
void Router::Forward(Message message) {
assert(!message.done.empty());
// There are three primary cases we care about here, that can be discriminated
// based on the destination count of the message:
// 1. If there are zero destinations, this is a malformed message (fail).
// 2. If there is one destination, forward the message on.
// 3. If there are multiple destinations, broadcast this message to all
// destinations.
// We separate 2 & 3 as the single forwarding case can be made
// (much) more efficient.
switch (message.wire.destinations().size()) {
case 0:
// Malformed message, bail
// TODO(ctiller): Log error: Routing header must have at least one
// destination
message.done(Status(StatusCode::INVALID_ARGUMENT,
"Routing header must have at least one destination"));
break;
case 1: {
// Single destination... it could be either a local stream or need to be
// forwarded to a remote node over some link.
const RoutableMessage::Destination& dst = message.wire.destinations()[0];
if (dst.dst() == node_id_) {
streams_[LocalStreamId{message.wire.src(), dst.stream_id()}]
.HandleMessage(dst.seq(), message.received,
std::move(*message.wire.mutable_payload()),
std::move(message.done));
} else {
links_[dst.dst()].Forward(std::move(message));
}
} break;
default: {
// Multiple destination:
// - Handle local streams directly.
// - For remote forwarding:
// 1. If we know the next hop, and that next hop is used for multiple of
// our destinations, keep the multicast group together for that set.
// 2. Separate the multicast if next hops are different.
// 3. Separate the multicast if we do not know about next hops yet.
std::unordered_map<Link*, std::vector<RoutableMessage::Destination>>
group_forward;
class Broadcaster {
public:
Broadcaster(StatusCallback cb) : cb_(std::move(cb)) {}
StatusCallback AddCallback() {
++n_;
return [this](const Status& status) {
if (!cb_.empty() && !status.is_ok()) {
cb_(status);
}
Step();
};
}
void Step() {
if (--n_ == 0) {
if (!cb_.empty()) cb_(Status::Ok());
delete this;
}
}
private:
int n_ = 1;
StatusCallback cb_;
};
Broadcaster* b = new Broadcaster(std::move(message.done));
for (const auto& dst : message.wire.destinations()) {
if (dst.dst() == node_id_) {
// Locally handled stream
streams_[LocalStreamId{message.wire.src(), dst.stream_id()}]
.HandleMessage(dst.seq(), message.received,
message.wire.payload(), b->AddCallback());
} else {
// Remote destination
LinkHolder& h = links_[dst.dst()];
if (h.link() == nullptr) {
// We don't know the next link, ask the LinkHolder to forward (which
// will continue forwarding the message when we know the next hop).
h.Forward(Message{message.wire.WithDestinations({dst}),
message.received, b->AddCallback()});
} else {
// We know the next link: gather destinations together by link so
// that we can (hopefully) keep multicast groups together
group_forward[h.link()].emplace_back(dst);
}
}
}
// Forward any grouped messages now that we've examined all destinations
for (auto& grp : group_forward) {
grp.first->Forward(
Message{message.wire.WithDestinations(std::move(grp.second)),
message.received, b->AddCallback()});
}
b->Step();
} break;
}
}
void Router::UpdateRoutingTable(std::vector<NodeMetrics> node_metrics,
std::vector<LinkMetrics> link_metrics,
bool flush_old_nodes) {
routing_table_.Update(std::move(node_metrics), std::move(link_metrics),
flush_old_nodes);
MaybeStartPollingLinkChanges();
}
void Router::MaybeStartPollingLinkChanges() {
if (poll_link_changes_timeout_) return;
poll_link_changes_timeout_.Reset(
timer_, timer_->Now() + kPollLinkChangeTimeout,
[this](const Status& status) {
poll_link_changes_timeout_.Reset();
if (status.is_ok()) {
const bool keep_polling = !routing_table_.PollLinkUpdates(
[this](const RoutingTable::SelectedLinks& selected_links) {
// Clear routing information for now unreachable links.
for (auto& lnk : links_) {
if (selected_links.count(lnk.first) == 0) {
lnk.second.SetLink(nullptr);
}
}
// Set routing information for other links.
for (const auto& sl : selected_links) {
std::cout << "Select: " << sl.first << " " << sl.second
<< "\n";
auto it = owned_links_.find(sl.second);
links_[sl.first].SetLink(
it == owned_links_.end() ? nullptr : it->second.get());
}
MaybeStartFlushingOldEntries();
});
if (keep_polling) MaybeStartPollingLinkChanges();
}
});
}
void Router::MaybeStartFlushingOldEntries() {
if (flush_old_nodes_timeout_) return;
flush_old_nodes_timeout_.Reset(timer_,
timer_->Now() + routing_table_.EntryExpiry(),
[this](const Status& status) {
flush_old_nodes_timeout_.Reset();
if (status.is_ok()) {
UpdateRoutingTable({}, {}, true);
}
});
}
Status Router::RegisterStream(NodeId peer, StreamId stream_id,
StreamHandler* stream_handler) {
return streams_[LocalStreamId{peer, stream_id}].SetHandler(stream_handler);
}
void Router::RegisterLink(std::unique_ptr<Link> link) {
const auto& metrics = link->GetLinkMetrics();
owned_links_.emplace(metrics.link_label(), std::move(link));
UpdateRoutingTable({NodeMetrics(metrics.to(), 0)}, {metrics}, false);
}
void Router::StreamHolder::HandleMessage(Optional<SeqNum> seq,
TimeStamp received, Slice data,
StatusCallback done) {
assert(!done.empty());
if (handler_ == nullptr) {
pending_.emplace_back(
Pending{seq, received, std::move(data), std::move(done)});
} else {
handler_->HandleMessage(seq, received, std::move(data), std::move(done));
}
}
Status Router::StreamHolder::SetHandler(StreamHandler* handler) {
if (handler_ != nullptr) {
return Status(StatusCode::FAILED_PRECONDITION, "Handler already set");
}
handler_ = handler;
std::vector<Pending> pending;
pending.swap(pending_);
for (auto& p : pending) {
handler_->HandleMessage(p.seq, p.received, std::move(p.data),
std::move(p.done));
}
return Status::Ok();
}
void Router::LinkHolder::Forward(Message message) {
assert(!message.done.empty());
if (link_ == nullptr) {
pending_.emplace_back(std::move(message));
} else {
link_->Forward(std::move(message));
}
}
void Router::LinkHolder::SetLink(Link* link) {
link_ = link;
std::vector<Message> pending;
pending.swap(pending_);
for (auto& p : pending) {
link_->Forward(std::move(p));
}
}
} // namespace overnet