blob: cc25348a31cc53657b549c7de54fc1066ec8e495 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#pragma once
#include <tuple>
#include <vector>
#include "node_id.h"
#include "reliability_and_ordering.h"
#include "seq_num.h"
#include "status.h"
#include "stream_id.h"
#include "varint.h"
namespace overnet {
// Routing headers are passed over links between nodes in a (potentially)
// non-private way. They should expose a minimal amount of information to route
// a message to the correct destination.
//
// A routing header contains source and (potentially multiple) destination
// information - multiple in the case of multicasting.
//
// Additionally it specifies control vs payload - allowing two channels per
// stream, one being intended only for control information.
//
// Finally it specifies the reliability/ordering data. This is redundant
// information (as it should be known by each node at the endpoint of the stream
// - and indeed should be verified there), but can be used by intermediatories
// to provide better back-pressure behavior.
class RoutingHeader {
public:
// A single destination for a message... a triplet of:
// - dst - the destination node
// - stream_id - which stream is this message for
// - seq - the sequence number of this message within its stream
class Destination {
friend class RoutingHeader;
public:
Destination(NodeId dst, StreamId stream_id, SeqNum seq)
: dst_(dst), stream_id_(stream_id), seq_(seq) {}
NodeId dst() const { return dst_; }
StreamId stream_id() const { return stream_id_; }
SeqNum seq() const { return seq_; }
friend bool operator==(const Destination& a, const Destination& b) {
return std::tie(a.dst_, a.stream_id_, a.seq_) ==
std::tie(b.dst_, b.stream_id_, b.seq_);
}
private:
NodeId dst_;
StreamId stream_id_;
SeqNum seq_;
};
// Since this object is complicated to write, we do it in two steps:
// - a Writer object is constructed, which can be used to measure the length
// of the eventually written bytes
// - the Writer object is used to generate the bytes for the wire
// In doing so we give the caller the flexibility to *not write* should the
// measured length be too long.
class Writer {
public:
explicit Writer(const RoutingHeader* hdr, NodeId writer, NodeId target);
Writer(const Writer&) = delete;
Writer& operator=(const Writer&) = delete;
size_t wire_length() const { return wire_length_; }
uint8_t* Write(uint8_t* bytes) const;
private:
bool IsLocal() const { return (flags_value_ & 1) != 0; }
const RoutingHeader* const hdr_;
const uint64_t flags_value_;
const uint8_t flags_length_;
const uint8_t payload_length_length_;
struct Destination {
uint8_t stream_len;
};
// TODO(ctiller): the expected length of this vector is one, avoid this
// allocation
std::vector<Destination> dsts_;
size_t wire_length_;
};
// Since these objects are potentially expensive to copy, we disable the
// implicit copy operations to avoid doing so by mistake
RoutingHeader(const RoutingHeader&) = delete;
RoutingHeader& operator=(const RoutingHeader&) = delete;
// Move construction
RoutingHeader(RoutingHeader&& other)
: src_(other.src_),
is_control_(other.is_control_),
reliability_and_ordering_(other.reliability_and_ordering_),
dsts_(std::move(other.dsts_)),
payload_length_(other.payload_length_) {}
// Payload message header constructor
explicit RoutingHeader(NodeId src, uint64_t payload_length,
ReliabilityAndOrdering reliability_and_ordering)
: src_(src),
is_control_(false),
reliability_and_ordering_(reliability_and_ordering),
payload_length_(payload_length) {}
// Control message header constructor
enum ControlMessage { CONTROL_MESSAGE };
explicit RoutingHeader(NodeId src, uint64_t payload_length, ControlMessage)
: src_(src),
is_control_(true),
reliability_and_ordering_(ReliabilityAndOrdering::ReliableOrdered),
payload_length_(payload_length) {}
static StatusOr<RoutingHeader> Parse(const uint8_t** bytes,
const uint8_t* end, NodeId reader,
NodeId writer);
RoutingHeader& AddDestination(NodeId peer, StreamId stream, SeqNum seq) {
dsts_.emplace_back(peer, stream, seq);
return *this;
}
NodeId src() const { return src_; }
uint64_t payload_length() const { return payload_length_; }
bool is_control() const { return is_control_; }
ReliabilityAndOrdering reliability_and_ordering() const {
return reliability_and_ordering_;
}
// Return a new RoutingHeader with a different set of destinations (but
// otherwise equal)
RoutingHeader WithDestinations(std::vector<Destination> dsts) const {
return RoutingHeader(src_, is_control_, reliability_and_ordering_,
std::move(dsts), payload_length_);
}
const std::vector<Destination>& destinations() const { return dsts_; }
friend bool operator==(const RoutingHeader& a, const RoutingHeader& b) {
return std::tie(a.src_, a.is_control_, a.reliability_and_ordering_,
a.payload_length_, a.dsts_) ==
std::tie(b.src_, b.is_control_, b.reliability_and_ordering_,
b.payload_length_, b.dsts_);
}
private:
RoutingHeader(NodeId src, bool is_control,
ReliabilityAndOrdering reliability_and_ordering,
std::vector<Destination> dsts, uint64_t payload_length)
: src_(src),
is_control_(is_control),
reliability_and_ordering_(reliability_and_ordering),
dsts_(std::move(dsts)),
payload_length_(payload_length) {}
NodeId src_;
bool is_control_;
ReliabilityAndOrdering reliability_and_ordering_;
// TODO(ctiller): small vector optimization
std::vector<Destination> dsts_;
uint64_t payload_length_ = 0;
// Flags format:
// bit 0: is_local -- is this a single destination message whos src is
// this node and whos dst is the peer we're sending
// to?
// bit 1: channel - 1 -> control channel, 0 -> payload channel
// bits 2,3,4: reliability/ordering mode (must be 0 for control channel)
// bits 5: reserved (must be zero)
// bit 6...: destination count
static constexpr uint64_t kFlagIsLocal = 1;
static constexpr uint64_t kFlagIsControl = 2;
static constexpr uint64_t kFlagReservedMask = 32;
static constexpr uint64_t kFlagsReliabilityAndOrderingShift = 2;
static constexpr uint64_t kFlagsDestinationCountShift = 6;
// all reliability and orderings must fit within this mask
static constexpr uint64_t kReliabilityAndOrderingMask = 0x07;
uint64_t DeriveFlags(NodeId writer, NodeId target) const;
};
std::ostream& operator<<(std::ostream& out, const RoutingHeader& h);
} // namespace overnet
namespace std {
template <>
struct hash<overnet::NodeId> {
size_t operator()(overnet::NodeId id) const { return id.Hash(); }
};
template <>
struct hash<overnet::StreamId> {
size_t operator()(overnet::StreamId id) const { return id.Hash(); }
};
} // namespace std