// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "router.h"
#include <iostream>
namespace overnet {
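// Delay between a routing table update and polling the table for link
// selection changes.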
static constexpr TimeDelta kPollLinkChangeTimeout =
TimeDelta::FromMilliseconds(100);
Router::~Router() { shutting_down_ = true; }
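// Begins an orderly shutdown: cancels pending timeouts, drops per-node
// routing state, then closes every owned link and registered stream before
// handing |quiesced| down the chain.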
void Router::Close(Callback<void> quiesced) {
OVERNET_TRACE(DEBUG, trace_sink_) << "Close";
shutting_down_ = true;
poll_link_changes_timeout_.Reset();
flush_old_nodes_timeout_.Reset();
links_.clear();
CloseLinks(std::move(quiesced));
}
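// Closes owned links one at a time, re-entering itself from each link's
// completion callback until none remain, then moves on to CloseStreams.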
void Router::CloseLinks(Callback<void> quiesced) {
OVERNET_TRACE(DEBUG, trace_sink_)
<< "CloseLinks remaining=" << owned_links_.size();
if (owned_links_.empty()) {
CloseStreams(std::move(quiesced));
return;
}
auto it = owned_links_.begin();
auto p = it->second.release();
owned_links_.erase(it);
p->Close(Callback<void>(ALLOCATED_CALLBACK,
[this, p, quiesced = std::move(quiesced)]() mutable {
delete p;
CloseLinks(std::move(quiesced));
}));
}
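// Closes registered streams one at a time; each completion callback erases
// the stream from streams_ and recurses until the map is empty.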
void Router::CloseStreams(Callback<void> quiesced) {
OVERNET_TRACE(DEBUG, trace_sink_)
<< "CloseStreams remaining=" << streams_.size();
if (streams_.empty()) {
OVERNET_TRACE(DEBUG, trace_sink_) << "Closed";
return;
}
auto it = streams_.begin();
auto id = it->first;
it->second.Close(Callback<void>(
ALLOCATED_CALLBACK, [this, id, quiesced = std::move(quiesced)]() mutable {
auto it = streams_.find(id);
if (it != streams_.end()) {
assert(!it->second.has_handler());
streams_.erase(it);
}
assert(streams_.count(id) == 0);
CloseStreams(std::move(quiesced));
}));
}
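// Routes |message| towards its destination(s): destinations addressed to
// this node are delivered to the matching local stream, remote destinations
// are handed to the link selected for the target node (or queued by its
// LinkHolder until one is known).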
void Router::Forward(Message message) {
OVERNET_TRACE(DEBUG, trace_sink_) << "Forward " << message.header;
if (shutting_down_) {
return;
}
assert(!message.make_payload.empty());
// There are three cases to handle here, distinguished by the number of
// destinations in the message:
// 1. If there are zero destinations, this is a malformed message (fail).
// 2. If there is one destination, forward the message on.
// 3. If there are multiple destinations, broadcast this message to all
//    destinations.
// Cases 2 and 3 are handled separately because the single-destination path
// can be made (much) more efficient.
switch (message.header.destinations().size()) {
case 0:
// Malformed message, bail
// TODO(ctiller): Log error: Routing header must have at least one
// destination
break;
case 1: {
// Single destination... it could be either a local stream or need to be
// forwarded to a remote node over some link.
const RoutableMessage::Destination& dst =
message.header.destinations()[0];
if (dst.dst() == node_id_) {
streams_[LocalStreamId{message.header.src(), dst.stream_id()}]
.HandleMessage(dst.seq(), message.received,
message.make_payload(LazySliceArgs{
0, std::numeric_limits<uint32_t>::max()}));
} else {
link_holder(dst.dst())->Forward(std::move(message));
}
} break;
default: {
// Multiple destinations:
// - Handle local streams directly.
// - For remote forwarding:
//   1. If we know the next hop, and that next hop serves several of our
//      destinations, keep that subset together as one multicast group.
//   2. Split the multicast when next hops differ.
//   3. Split the multicast for destinations whose next hop is not yet
//      known.
std::unordered_map<Link*, std::vector<RoutableMessage::Destination>>
group_forward;
std::vector<std::pair<RoutableMessage::Destination, LinkHolder*>>
disconnected_holders;
Optional<std::pair<RoutableMessage::Destination, StreamHolder*>>
handle_locally;
uint32_t overall_mss = std::numeric_limits<uint32_t>::max();
for (const auto& dst : message.header.destinations()) {
if (dst.dst() == node_id_) {
// Locally handled stream
if (!handle_locally.has_value()) {
handle_locally = std::make_pair(
dst, &streams_[LocalStreamId{message.header.src(),
dst.stream_id()}]);
}
} else {
// Remote destination
LinkHolder* h = link_holder(dst.dst());
if (h->link() == nullptr) {
// We don't know the next link yet: ask the LinkHolder to forward (it
// queues the message and sends it once the next hop is known).
disconnected_holders.push_back(std::make_pair(dst, h));
} else {
// We know the next link: gather destinations together by link so
// that we can (hopefully) keep multicast groups together
group_forward[h->link()].emplace_back(dst);
overall_mss = std::min(overall_mss, h->path_mss());
}
}
}
Slice payload = message.make_payload(LazySliceArgs{0, overall_mss});
// Forward any grouped messages now that we've examined all destinations
for (auto& grp : group_forward) {
grp.first->Forward(Message::SimpleForwarder(
message.header.WithDestinations(std::move(grp.second)), payload,
message.received));
}
for (auto& lh : disconnected_holders) {
lh.second->Forward(Message::SimpleForwarder(
message.header.WithDestinations({lh.first}), payload,
message.received));
}
if (handle_locally.has_value()) {
handle_locally->second->HandleMessage(
handle_locally->first.seq(), message.received, std::move(payload));
}
} break;
}
}
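// Feeds fresh node and link metrics into the routing table and (re)starts
// polling for the resulting link selection changes.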
void Router::UpdateRoutingTable(std::vector<NodeMetrics> node_metrics,
std::vector<LinkMetrics> link_metrics,
bool flush_old_nodes) {
routing_table_.Update(std::move(node_metrics), std::move(link_metrics),
flush_old_nodes);
MaybeStartPollingLinkChanges();
}
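// Arms a kPollLinkChangeTimeout timer that polls the routing table for link
// selection updates, installs the selected next-hop link and path MSS on
// each node's LinkHolder, and re-arms itself while further polling is
// required.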
void Router::MaybeStartPollingLinkChanges() {
if (shutting_down_ || poll_link_changes_timeout_)
return;
poll_link_changes_timeout_.Reset(
timer_, timer_->Now() + kPollLinkChangeTimeout,
[this](const Status& status) {
if (status.is_ok()) {
poll_link_changes_timeout_.Reset();
const bool keep_polling = !routing_table_.PollLinkUpdates(
[this](const RoutingTable::SelectedLinks& selected_links) {
// Clear routing information for now-unreachable links.
for (auto& lnk : links_) {
if (selected_links.count(lnk.first) == 0) {
lnk.second.SetLink(nullptr, 0);
}
}
// Set routing information for other links.
for (const auto& sl : selected_links) {
OVERNET_TRACE(INFO, trace_sink_)
<< "Select: " << sl.first << " " << sl.second.link_id
<< " (route_mss=" << sl.second.route_mss << ")";
auto it = owned_links_.find(sl.second.link_id);
link_holder(sl.first)->SetLink(
it == owned_links_.end() ? nullptr : it->second.get(),
sl.second.route_mss);
}
MaybeStartFlushingOldEntries();
});
if (keep_polling)
MaybeStartPollingLinkChanges();
}
});
}
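// Arms a timer that fires after the routing table's entry expiry period and
// triggers a routing table update with flush_old_nodes set.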
void Router::MaybeStartFlushingOldEntries() {
if (flush_old_nodes_timeout_)
return;
flush_old_nodes_timeout_.Reset(timer_,
timer_->Now() + routing_table_.EntryExpiry(),
[this](const Status& status) {
if (status.is_ok()) {
flush_old_nodes_timeout_.Reset();
UpdateRoutingTable({}, {}, true);
}
});
}
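// Registers |stream_handler| to receive messages addressed to the local
// stream identified by (|peer|, |stream_id|). Messages that arrived before
// registration are buffered by the StreamHolder and replayed once the
// handler is set. Hypothetical usage sketch (handler type and IDs are
// illustrative only):
//   MyStreamHandler handler;
//   router.RegisterStream(NodeId(1), StreamId(1), &handler);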
Status Router::RegisterStream(NodeId peer, StreamId stream_id,
StreamHandler* stream_handler) {
OVERNET_TRACE(DEBUG, trace_sink_) << "RegisterStream: " << peer << "/"
<< stream_id << " at " << stream_handler;
return streams_[LocalStreamId{peer, stream_id}].SetHandler(stream_handler);
}
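// Detaches |stream_handler| from (|peer|, |stream_id|) and forgets the
// stream; fails if the stream was never registered or a different handler
// is attached.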
Status Router::UnregisterStream(NodeId peer, StreamId stream_id,
StreamHandler* stream_handler) {
OVERNET_TRACE(DEBUG, trace_sink_) << "UnregisterStream: " << peer << "/"
<< stream_id << " at " << stream_handler;
auto it = streams_.find(LocalStreamId{peer, stream_id});
if (it == streams_.end()) {
return Status(StatusCode::FAILED_PRECONDITION, "Stream not registered");
}
Status status = it->second.ClearHandler(stream_handler);
streams_.erase(it);
return status;
}
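// Takes ownership of |link|, keyed by its link label, and publishes its
// metrics to the routing table so the link becomes eligible for selection.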
void Router::RegisterLink(LinkPtr<> link) {
const auto& metrics = link->GetLinkMetrics();
owned_links_.emplace(metrics.link_label(), std::move(link));
UpdateRoutingTable({NodeMetrics(metrics.to(), 0)}, {metrics}, false);
}
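// Delivers an incoming message to the registered handler, or buffers it
// until a handler is attached via SetHandler.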
void Router::StreamHolder::HandleMessage(SeqNum seq, TimeStamp received,
Slice payload) {
if (handler_ == nullptr) {
pending_.emplace_back(Pending{seq, received, std::move(payload)});
} else {
handler_->HandleMessage(seq, received, std::move(payload));
}
}
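// Attaches |handler| and replays any messages that were buffered while no
// handler was registered.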
Status Router::StreamHolder::SetHandler(StreamHandler* handler) {
if (handler_ != nullptr) {
return Status(StatusCode::FAILED_PRECONDITION, "Handler already set");
}
handler_ = handler;
std::vector<Pending> pending;
pending.swap(pending_);
for (auto& p : pending) {
handler_->HandleMessage(p.seq, p.received, std::move(p.payload));
}
return Status::Ok();
}
Status Router::StreamHolder::ClearHandler(StreamHandler* handler) {
if (handler_ != handler) {
return Status(StatusCode::FAILED_PRECONDITION, "Invalid clear handler");
}
handler_ = nullptr;
return Status::Ok();
}
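// Sends |message| over the currently selected link, or queues it until
// SetLink provides one.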
void Router::LinkHolder::Forward(Message message) {
if (link_ == nullptr) {
OVERNET_TRACE(DEBUG, trace_sink_) << "Queue: " << message.header;
pending_.emplace_back(std::move(message));
} else {
link_->Forward(std::move(message));
}
}
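// Installs the newly selected link (possibly nullptr when the node becomes
// unreachable) and its path MSS, then flushes any queued messages over it.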
void Router::LinkHolder::SetLink(Link* link, uint32_t path_mss) {
  link_ = link;
  // Assumes a path_mss_ member backs the path_mss() accessor used in
  // Router::Forward.
  path_mss_ = path_mss;
  if (link_ == nullptr) {
    // No next hop selected: keep queued messages until a link is chosen.
    return;
  }
  std::vector<Message> pending;
  pending.swap(pending_);
  for (auto& p : pending) {
    link_->Forward(std::move(p));
  }
}
} // namespace overnet