// blob: 06d89845200e339a9f4832e8ee9b9ddf19d096c4 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/connectivity/overnet/lib/embedded/omdp_nub.h"
#include <net/if.h>
#include "garnet/public/lib/fostr/fidl/fuchsia/overnet/omdp/formatting.h"
#include "src/connectivity/overnet/lib/omdp/omdp.h"
#include "src/connectivity/overnet/lib/vocabulary/ip_addr.h"
#include "src/connectivity/overnet/lib/vocabulary/socket.h"
namespace overnet {
// Shared random source for this file; the OmdpNub constructor gives Omdp a
// callable that returns rng_dev() on every invocation.
std::random_device rng_dev;
// Maximum packet size passed to RecvFrom() when reading from the inbound OMDP
// socket.
static constexpr size_t kMaximumPacketSize = 1400;
// Constructs the nub from |root| and |udp_nub|.
//
// The Actor base is initialized with |root|; the Omdp base receives the node
// id and timer taken from |root| plus a random-number callable. The reactor
// and timer obtained from |root|, and |udp_nub| itself, are stored in members
// for use by the other methods.
OmdpNub::OmdpNub(BasicOvernetEmbedded* root, UdpNub* udp_nub)
    : BasicOvernetEmbedded::Actor(root),
      // Each call of the callable returns a fresh value from rng_dev.
      Omdp(root->node_id().get(), root->timer(), [] { return rng_dev(); }),
      reactor_(root->reactor()),
      udp_nub_(udp_nub),
      timer_(root->timer()) {}
// Defaulted: no hand-written teardown; members and base classes are destroyed
// by the compiler-generated destructor.
OmdpNub::~OmdpNub() = default;
// Sets up the inbound OMDP socket and starts discovery.
//
// Steps, chained through Then() in this order:
//   1. create an AF_INET6 / SOCK_DGRAM socket,
//   2. enable port reuse on it,
//   3. bind it to the any-IPv6 address on the multicast group's port,
//   4. join the multicast group (IPV6_JOIN_GROUP, interface index 0),
//   5. schedule the first broadcast and start waiting for inbound packets.
// Returns the status produced by the chain. NOTE(review): Then() presumably
// skips the remaining steps after the first error -- confirm against Status.
Status OmdpNub::Start() {
  auto enable_port_reuse = [this] { return incoming_.SetOptReusePort(true); };

  auto bind_to_group_port = [this] {
    auto addr = *IpAddr::AnyIpv6().WithPort(kMulticastGroupAddr.port());
    OVERNET_TRACE(DEBUG) << "Bind incoming OMDP socket to: " << addr;
    return incoming_.Bind(addr);
  };

  auto join_multicast_group = [this] {
    return incoming_.SetOpt(
        IPPROTO_IPV6, IPV6_JOIN_GROUP,
        ipv6_mreq{kMulticastGroupAddr.ipv6.sin6_addr, 0});
  };

  auto begin_discovery = [this] {
    ScheduleBroadcast();
    WaitForInbound();
    return Status::Ok();
  };

  return incoming_.Create(AF_INET6, SOCK_DGRAM, 0)
      .Then(enable_port_reuse)
      .Then(bind_to_group_port)
      .Then(join_multicast_group)
      .Then(begin_discovery);
}
// Sends |data| to kMulticastGroupAddr once per entry returned by
// if_nameindex(), using the socket owned by udp_nub_.
//
// For each interface the socket's IPV6_MULTICAST_IF option is set to that
// interface's index before the send. A failed attempt is remembered together
// with an "On <interface name>" context; the collected failures are logged
// only if no interface succeeded at all. If the interface list cannot be
// retrieved, a warning is logged and nothing is sent.
void OmdpNub::Broadcast(Slice data) {
  auto& out_socket = *udp_nub_->socket();

  auto* const if_list = if_nameindex();
  if (if_list == nullptr) {
    OVERNET_TRACE(WARNING) << "Failed to retrieve interface list";
    return;
  }

  OVERNET_TRACE(DEBUG) << "BROADCAST " << data << " to " << kMulticastGroupAddr;

  std::vector<Status> send_errors;
  bool delivered = false;

  // The loop ends at the first entry whose index is 0 and whose name is null.
  for (auto* iface = if_list;
       iface->if_index != 0 || iface->if_name != nullptr; ++iface) {
    auto attempt =
        out_socket.SetOpt(IPPROTO_IPV6, IPV6_MULTICAST_IF, iface->if_index)
            .Then([&] {
              return out_socket.SendTo(data, 0, kMulticastGroupAddr);
            })
            .WithLazyContext([iface] {
              std::ostringstream text;
              text << "On " << iface->if_name;
              return text.str();
            });

    if (attempt.is_ok()) {
      delivered = true;
      continue;
    }
    send_errors.emplace_back(std::move(attempt));
  }

  // Per-interface failures are only reported when nothing got through.
  if (!delivered) {
    for (const auto& error : send_errors) {
      OVERNET_TRACE(WARNING) << "Omdp broadcast failed: " << error;
    }
  }

  if_freenameindex(if_list);
}
void OmdpNub::OnNewNode(uint64_t node_id, IpAddr addr) {
udp_nub_->Initiate({addr}, NodeId(node_id));
}
// Registers a read callback for the inbound socket with the reactor. The
// callback forwards the status it is given to InboundReady().
void OmdpNub::WaitForInbound() {
  auto on_read = [this](const Status& read_status) {
    InboundReady(read_status);
  };
  reactor_->OnRead(incoming_.get(), std::move(on_read));
}
void OmdpNub::InboundReady(const Status& status) {
OVERNET_TRACE(DEBUG) << "InboundReady gets status: " << status;
if (status.is_error()) {
return;
}
auto data_and_addr = incoming_.RecvFrom(kMaximumPacketSize, 0);
if (data_and_addr.is_error()) {
OVERNET_TRACE(ERROR) << data_and_addr.AsStatus();
// Wait a bit before trying again to avoid spamming the log.
timer_->At(timer_->Now() + TimeDelta::FromSeconds(10),
[this] { WaitForInbound(); });
return;
}
ScopedOp scoped_op(Op::New(overnet::OpType::INCOMING_PACKET));
OVERNET_TRACE(TRACE) << "Got omdp packet " << data_and_addr->data << " from "
<< data_and_addr->addr;
if (auto status = Process(std::move(data_and_addr->addr),
std::move(data_and_addr->data));
status.is_error()) {
OVERNET_TRACE(ERROR) << "Omdp process failed: " << status;
}
WaitForInbound();
}
} // namespace overnet