blob: a0b5aa5a68070d3c07384453b951cc71644ef251 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "router_endpoint.h"
#include <iostream>
#include <memory>
#include "fork_frame.h"
#include "linearizer.h"
namespace overnet {
namespace {
// Pulls every slice out of |src|, joins them into a single Slice and passes
// the result of T::Parse() on that Slice to |ready|.
//
// If the pull itself reports an error, |ready| is invoked with that error
// status instead and T::Parse() is never called.
template <class T>
void ReadAllAndParse(Source<Slice>* src, StatusOrCallback<T> ready) {
  // |ready| is moved into the closure; the closure is `mutable` so that the
  // stored callback can be invoked through a non-const call operator.
  auto on_pulled = [ready{std::move(ready)}](
                       StatusOr<std::vector<Slice>>&& slices) mutable {
    if (!slices.is_error()) {
      ready(T::Parse(Slice::Join(slices->begin(), slices->end())));
    } else {
      ready(slices.AsStatus());
    }
  };
  src->PullAll(StatusOrCallback<std::vector<Slice>>(ALLOCATED_CALLBACK,
                                                    std::move(on_pulled)));
}
} // namespace
// Builds an endpoint whose router is constructed from |timer|, |node_id| and
// |allow_threading|. The three arguments are forwarded unchanged to the
// router; this constructor initializes nothing else explicitly.
RouterEndpoint::RouterEndpoint(Timer* timer, NodeId node_id,
bool allow_threading)
: router_(timer, node_id, allow_threading) {}
// Makes sure a connection stream to |peer| exists, creating it on first use.
//
// Registering a peer that is already known is a no-op. |peer| must differ
// from this endpoint's own node id; that is enforced by an assert only.
void RouterEndpoint::RegisterPeer(NodeId peer) {
  assert(peer != router_.node_id());

  const bool already_registered =
      connection_streams_.find(peer) != connection_streams_.end();
  if (already_registered) {
    return;
  }

  // Piecewise construction builds the ConnectionStream directly inside the
  // container from (this, peer) instead of constructing it first and then
  // copying or moving it in.
  connection_streams_.emplace(std::piecewise_construct,
                              std::forward_as_tuple(peer),
                              std::forward_as_tuple(this, peer));
}
// Creates the DatagramStream described by |introduction|: it is attached to
// the router of the endpoint recorded in the NewStream, and uses the peer,
// reliability/ordering mode and stream id carried by that same record.
RouterEndpoint::Stream::Stream(NewStream introduction)
: DatagramStream(&introduction.creator_->router_, introduction.peer_,
introduction.reliability_and_ordering_,
introduction.stream_id_) {}
// Opens the per-peer connection stream: a ReliableUnordered DatagramStream on
// stream id 0 of |endpoint|'s router, addressed to |peer|.
//
// Ids for locally forked streams start at 2 when |peer| compares less than
// this endpoint's node id and at 1 otherwise. Fork() advances the counter by
// 2, so each side stays on a single parity (presumably so that the two ends
// of a connection never allocate the same id; confirm against the protocol
// documentation).
RouterEndpoint::ConnectionStream::ConnectionStream(RouterEndpoint* endpoint,
NodeId peer)
: DatagramStream(&endpoint->router_, peer,
ReliabilityAndOrdering::ReliableUnordered, StreamId(0)),
endpoint_(endpoint),
next_stream_id_(peer < endpoint->node_id() ? 2 : 1) {
// Start waiting for the peer's first fork frame right away.
BeginRead();
}
// Shuts down the fork reader of this connection stream.
//
// A read that is still in flight is closed with Status::Cancelled(). The
// trailing assert then requires the state to be Stopped; within this file only
// the error paths of the read callback set Stopped, so this presumably relies
// on Close() running that callback before it returns (TODO: confirm).
// NOTE(review): if the stream is destroyed in any other non-Stopped state
// (for example Waiting), nothing here changes the state and the assert fails
// in builds where asserts are enabled.
RouterEndpoint::ConnectionStream::~ConnectionStream() {
  const bool read_in_flight = (fork_read_state_ == ForkReadState::Reading);
  if (read_in_flight) {
    fork_read_->Close(Status::Cancelled());
  }
  assert(fork_read_state_ == ForkReadState::Stopped);
}
// Starts (or restarts) the read of one fork frame from this connection stream.
//
// The state moves to Reading, |fork_read_| is initialized over this stream and
// asked for all of its slices. When the pull completes:
//  - on a pull error, or when the joined bytes do not parse as a ForkFrame,
//    the state becomes Stopped and the stream is closed with that status;
//  - otherwise the parsed frame is stored in |fork_frame_|, this stream is
//    appended to the endpoint's |incoming_forks_| queue, the read object is
//    destroyed and the state becomes Waiting.
// NOTE(review): both error paths return without calling fork_read_.Destroy();
// confirm that leaving the read object initialized there is intended.
void RouterEndpoint::ConnectionStream::BeginRead() {
fork_read_state_ = ForkReadState::Reading;
fork_read_.Init(this);
// The completion callback captures |this|; the destructor closes a read that
// is still in the Reading state.
fork_read_->PullAll(StatusOrCallback<std::vector<Slice>>(
[this](StatusOr<std::vector<Slice>>&& read_status) {
assert(fork_read_state_ == ForkReadState::Reading);
// Failure while pulling the slices: stop reading and close the stream.
if (read_status.is_error()) {
fork_read_state_ = ForkReadState::Stopped;
Close(read_status.AsStatus());
return;
}
// Concatenate everything that was read and decode it as one fork frame.
auto fork_frame_status = ForkFrame::Parse(
Slice::Join(read_status->begin(), read_status->end()));
// Malformed frame: treated the same way as a failed read.
if (fork_frame_status.is_error()) {
fork_read_state_ = ForkReadState::Stopped;
Close(fork_frame_status.AsStatus());
return;
}
// Keep the decoded frame and queue this stream for delivery by the endpoint.
fork_frame_.Init(std::move(*fork_frame_status));
endpoint_->incoming_forks_.PushBack(this);
fork_read_.Destroy();
fork_read_state_ = ForkReadState::Waiting;
// Only ask the endpoint to deliver when this stream is at the head of the
// queue.
if (this == endpoint_->incoming_forks_.Front()) {
endpoint_->MaybeContinueIncomingForks();
}
}));
}
// Asks the connection stream for |peer| to fork a new stream that uses
// |reliability_and_ordering| and carries |introduction| to the remote side.
//
// Returns whatever ConnectionStream::Fork() returns for a registered peer, or
// a FAILED_PRECONDITION status when |peer| has no connection stream (i.e. it
// was never passed to RegisterPeer()).
StatusOr<RouterEndpoint::NewStream> RouterEndpoint::SendIntro(
    NodeId peer, ReliabilityAndOrdering reliability_and_ordering,
    Slice introduction) {
  auto stream_it = connection_streams_.find(peer);
  if (stream_it != connection_streams_.end()) {
    return stream_it->second.Fork(reliability_and_ordering,
                                  std::move(introduction));
  }
  return StatusOr<NewStream>(StatusCode::FAILED_PRECONDITION,
                             "Remote peer not registered with this endpoint");
}
// Announces a new stream to the peer and returns its description.
//
// Takes the next locally allocated stream id (the counter then advances by
// 2), serializes a ForkFrame holding that id, |reliability_and_ordering| and
// |introduction|, and pushes the bytes on this connection stream through a
// heap-allocated SendOp. When the push completes the SendOp is closed with the
// push status, and it is deleted from the callback handed to Close().
// The NewStream is returned immediately; this function does not wait for the
// push to finish.
StatusOr<RouterEndpoint::NewStream> RouterEndpoint::ConnectionStream::Fork(
    ReliabilityAndOrdering reliability_and_ordering, Slice introduction) {
  StreamId forked_id(next_stream_id_);
  next_stream_id_ += 2;

  Slice frame_bytes =
      ForkFrame(forked_id, reliability_and_ordering, std::move(introduction))
          .Write();

  // TODO(ctiller): Don't allocate.
  auto* op = new SendOp(this, frame_bytes.length());
  // The op owns no one: it frees itself once Close() reports completion.
  auto on_pushed = [op](const Status& push_status) {
    op->Close(push_status, [op]() { delete op; });
  };
  op->Push(frame_bytes, StatusCallback(std::move(on_pushed)));

  return NewStream{endpoint_, peer(), reliability_and_ordering, forked_id};
}
// Stores |ready| as the callback that will receive the next incoming
// introduction, then immediately tries to deliver a fork that is already
// queued.
// NOTE(review): the assignment replaces any callback stored earlier without
// invoking it here; what happens to a still-pending one depends on the
// callback type's move assignment. Confirm callers never have two
// outstanding RecvIntro() requests.
void RouterEndpoint::RecvIntro(StatusOrCallback<ReceivedIntroduction> ready) {
recv_intro_ready_ = std::move(ready);
MaybeContinueIncomingForks();
}
// Delivers at most one queued fork to the callback stored by RecvIntro().
//
// Nothing happens unless a callback is stored and at least one connection
// stream is queued with a parsed fork frame. Otherwise the stream at the front
// of the queue is dequeued, its frame is turned into a ReceivedIntroduction
// and handed to the callback, the frame is destroyed, and the stream starts
// reading its next fork frame.
void RouterEndpoint::MaybeContinueIncomingForks() {
  if (recv_intro_ready_.empty()) return;
  if (incoming_forks_.Empty()) return;

  auto* waiting_stream = incoming_forks_.Front();
  incoming_forks_.Remove(waiting_stream);
  assert(waiting_stream->fork_read_state_ ==
         ConnectionStream::ForkReadState::Waiting);

  // The callback runs while the frame is still alive; the frame is destroyed
  // only after the callback has returned.
  recv_intro_ready_(ReceivedIntroduction{
      NewStream{this, waiting_stream->peer(),
                waiting_stream->fork_frame_->reliability_and_ordering(),
                waiting_stream->fork_frame_->stream_id()},
      waiting_stream->fork_frame_->introduction()});

  waiting_stream->fork_frame_.Destroy();
  waiting_stream->BeginRead();
}
} // namespace overnet