blob: db97b18c4972d748c1a610080d776741b9887366 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef SRC_CONNECTIVITY_OVERNET_OVERNETSTACK_UDP_NUB_H_
#define SRC_CONNECTIVITY_OVERNET_OVERNETSTACK_UDP_NUB_H_
#include <lib/async-loop/cpp/loop.h>
#include <lib/async/cpp/task.h>
#include <lib/fsl/tasks/fd_waiter.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>

#include <cassert>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <sstream>
#include <string>
#include <utility>

#include "src/connectivity/overnet/lib/links/packet_nub.h"
#include "src/connectivity/overnet/lib/vocabulary/ip_addr.h"
#include "src/connectivity/overnet/lib/vocabulary/socket.h"
#include "src/connectivity/overnet/overnetstack/overnet_app.h"
namespace overnetstack {
// Datagram size, in bytes, that this nub assumes for UDP traffic. It is the
// size parameter of UdpNubBase and the length requested for every receive.
// Declared `inline` (C++17) so that all translation units including this
// header share one definition instead of each getting an internal-linkage copy.
inline constexpr uint32_t kAssumedUDPPacketSize = 1500;
// PacketNub specialization used by UdpNub: peers are addressed by
// overnet::IpAddr (hashed and compared with HashIpAddr / EqIpAddr), and
// kAssumedUDPPacketSize is passed as the nub's size parameter
// (presumably the maximum packet size -- confirm against packet_nub.h).
using UdpNubBase = overnet::PacketNub<overnet::IpAddr, kAssumedUDPPacketSize,
overnet::HashIpAddr, overnet::EqIpAddr>;
class UdpNub final : public UdpNubBase, public OvernetApp::Actor {
public:
explicit UdpNub(OvernetApp* app)
: UdpNubBase(app->endpoint()),
endpoint_(app->endpoint()),
timer_(app->timer()) {}
overnet::Status Start() override {
return CreateFD()
.Then([this]() { return SetOptionSharePort(); })
.Then([this]() { return SetOptionReceiveAnything(); })
.Then([this]() { return Bind(); })
.Then([this]() {
WaitForInbound();
return overnet::Status::Ok();
});
}
uint16_t port() { return port_; }
overnet::NodeId node_id() { return endpoint_->node_id(); }
void SendTo(overnet::IpAddr addr, overnet::Slice slice) override {
OVERNET_TRACE(TRACE) << "sending packet " << slice << " to " << addr;
if (auto status = socket_.SendTo(std::move(slice), 0, *addr.AsIpv6());
status.is_error()) {
OVERNET_TRACE(WARNING) << "sendto fails: " << status;
}
}
void Publish(overnet::LinkPtr<> link) override {
overnet::NodeId node = overnet::NodeId(link->GetLinkStatus().to);
OVERNET_TRACE(DEBUG) << "NewLink: " << node << "\n";
endpoint_->RegisterPeer(node);
endpoint_->RegisterLink(std::move(link));
}
overnet::Socket* socket() { return &socket_; }
private:
overnet::RouterEndpoint* const endpoint_;
overnet::Timer* const timer_;
overnet::Socket socket_;
uint16_t port_ = -1;
fsl::FDWaiter fd_waiter_;
void WaitForInbound() {
assert(socket_.IsValid());
if (!fd_waiter_.Wait(
[this](zx_status_t status, uint32_t events) {
InboundReady(status, events);
},
socket_.get(), POLLIN)) {
OVERNET_TRACE(DEBUG) << "fd_waiter_.Wait() failed\n";
}
}
void InboundReady(zx_status_t status, uint32_t events) {
auto now = timer_->Now();
auto data_and_addr = socket_.RecvFrom(kAssumedUDPPacketSize, 0);
if (data_and_addr.is_error()) {
OVERNET_TRACE(ERROR) << data_and_addr.AsStatus();
// Wait a bit before trying again to avoid spamming the log.
async::PostDelayedTask(
async_get_default_dispatcher(), [this]() { WaitForInbound(); },
zx::sec(10));
return;
}
overnet::ScopedOp scoped_op(
overnet::Op::New(overnet::OpType::INCOMING_PACKET));
OVERNET_TRACE(TRACE) << "Got packet " << data_and_addr->data << " from "
<< data_and_addr->addr;
Process(now, data_and_addr->addr, std::move(data_and_addr->data));
WaitForInbound();
}
overnet::Status CreateFD() {
socket_ = overnet::Socket(::socket(AF_INET6, SOCK_DGRAM, 0));
if (!socket_.IsValid()) {
return StatusFromErrno("Failed to create socket");
}
return overnet::Status::Ok();
}
overnet::Status SetOptionSharePort() { return socket_.SetOptReusePort(true); }
overnet::Status SetOptionReceiveAnything() {
return overnet::Status::Ok();
// return SetSockOpt(IPPROTO_IPV6, IPV6_V6ONLY, 0, "IPV6_ONLY");
}
overnet::Status Bind() {
sockaddr_in6 addr;
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
addr.sin6_addr = in6addr_any;
int result =
bind(socket_.get(), reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
if (result < 0) {
return StatusFromErrno("Failed to bind() to in6addr_any");
}
socklen_t len = sizeof(addr);
result =
getsockname(socket_.get(), reinterpret_cast<sockaddr*>(&addr), &len);
if (result < 0) {
return StatusFromErrno("Failed to getsockname() for new socket");
}
port_ = ntohs(addr.sin6_port);
return overnet::Status::Ok();
}
overnet::Status StatusFromErrno(const std::string& why) {
int err = errno;
std::ostringstream msg;
msg << why << ", errno=" << err;
// TODO(ctiller): Choose an appropriate status code based upon errno?
return overnet::Status(overnet::StatusCode::UNKNOWN, msg.str());
}
};
} // namespace overnetstack
#endif // SRC_CONNECTIVITY_OVERNET_OVERNETSTACK_UDP_NUB_H_