blob: 3132f64a8a1cfdfb53206b64cfc1668427b4182a [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#pragma once
#include <unordered_map>
#include "broadcast_sink.h"
#include "callback.h"
#include "routing_header.h"
#include "sink.h"
#include "slice.h"
namespace overnet {
namespace router_impl {
struct LocalStreamId {
NodeId peer;
StreamId stream_id;
uint64_t Hash() const { return peer.Hash() ^ stream_id.Hash(); }
};
inline bool operator==(const LocalStreamId& lhs, const LocalStreamId& rhs) {
return lhs.peer == rhs.peer && lhs.stream_id == rhs.stream_id;
}
} // namespace router_impl
} // namespace overnet
namespace std {
template <>
struct hash<overnet::router_impl::LocalStreamId> {
size_t operator()(const overnet::router_impl::LocalStreamId& id) const {
return id.Hash();
}
};
} // namespace std
namespace overnet {
struct Message final {
RoutingHeader routing_header;
StatusOrCallback<Sink<Chunk>*> ready_for_data;
};
class Link {
public:
virtual ~Link() {}
virtual void Forward(Message message) = 0;
};
class Router final {
public:
class StreamHandler {
public:
virtual ~StreamHandler() {}
virtual void HandleMessage(
SeqNum seq, uint64_t payload_length, bool is_control,
ReliabilityAndOrdering reliability_and_ordering,
StatusOrCallback<Sink<Chunk>*> ready_for_data) = 0;
};
Router(NodeId node_id) : node_id_(node_id) {}
// Forward a message to either ourselves or a link
void Forward(Message message);
// Register a (locally handled) stream into this Router
Status RegisterStream(NodeId peer, StreamId stream_id,
StreamHandler* stream_handler);
// Register a link to another router (usually on a different machine)
Status RegisterLink(NodeId peer, Link* link);
NodeId node_id() const { return node_id_; }
private:
const NodeId node_id_;
class StreamHolder {
public:
void HandleMessage(SeqNum seq, uint64_t payload_length, bool is_control,
ReliabilityAndOrdering reliability_and_ordering,
StatusOrCallback<Sink<Chunk>*> ready_for_data);
Status SetHandler(StreamHandler* handler);
private:
struct Pending {
SeqNum seq;
uint64_t length;
bool is_control;
ReliabilityAndOrdering reliability_and_ordering;
StatusOrCallback<Sink<Chunk>*> ready_for_data;
};
StreamHandler* handler_ = nullptr;
std::vector<Pending> pending_;
};
class LinkHolder {
public:
void Forward(Message message);
void SetLink(Link* link);
Link* link() { return link_; }
private:
Link* link_ = nullptr;
std::vector<Message> pending_;
};
typedef router_impl::LocalStreamId LocalStreamId;
std::unordered_map<LocalStreamId, StreamHolder> streams_;
std::unordered_map<NodeId, LinkHolder> links_;
};
} // namespace overnet