blob: 3718ef845cdf60b26d59d2adbff13fc8bd9a125e [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <array>
#include <tuple>
#include "garnet/lib/overnet/endpoint/router_endpoint.h"
#include "garnet/lib/overnet/links/packet_link.h"
#include "garnet/lib/overnet/protocol/fidl.h"
#include "garnet/lib/overnet/testing/test_timer.h"
#include "garnet/lib/overnet/testing/trace_cout.h"
#include "gtest/gtest.h"
//////////////////////////////////////////////////////////////////////////////
// Two node fling
namespace overnet {
namespace router_endpoint2node {
static const bool kTraceEverything = false;
struct LinkState {
uint64_t id;
int outstanding_packets;
};
class DeliverySimulator {
public:
virtual ~DeliverySimulator() {}
// Returns Nothing if the slice should be dropped, or a delay if it should be
// delivered.
virtual Optional<TimeDelta> ChoosePacketDelivery(LinkState link_state,
size_t slice_size) const = 0;
virtual std::string name() = 0;
virtual Bandwidth SimulatedBandwidth() const = 0;
};
// Very fast reliable packet delivery: it's often easier to debug problems
// with HappyDelivery than with packet loss enabled (assuming it shows up).
class HappyDelivery : public DeliverySimulator {
public:
Optional<TimeDelta> ChoosePacketDelivery(LinkState link_state,
size_t slice_size) const override {
return TimeDelta::FromMicroseconds(1);
}
std::string name() override { return "HappyDelivery"; }
Bandwidth SimulatedBandwidth() const override {
return Bandwidth::FromKilobitsPerSecond(1024 * 1024);
}
};
// Windowed number of outstanding packets
class WindowedDelivery : public DeliverySimulator {
public:
WindowedDelivery(int max_outstanding, TimeDelta window)
: max_outstanding_(max_outstanding), window_(window) {}
Optional<TimeDelta> ChoosePacketDelivery(LinkState link_state,
size_t slice_size) const override {
if (link_state.outstanding_packets >= max_outstanding_)
return Nothing;
return window_;
}
std::string name() override {
std::ostringstream out;
out << "WindowedDelivery{max:" << max_outstanding_ << " over " << window_
<< "}";
return out.str();
}
Bandwidth SimulatedBandwidth() const override {
return Bandwidth::BytesPerTime(256 * max_outstanding_, window_);
}
private:
const int max_outstanding_;
const TimeDelta window_;
};
class InProcessLinkImpl final
: public PacketLink,
public std::enable_shared_from_this<InProcessLinkImpl> {
public:
InProcessLinkImpl(RouterEndpoint* src, RouterEndpoint* dest, uint64_t link_id,
const DeliverySimulator* simulator)
: PacketLink(src, dest->node_id(), 256, link_id),
timer_(dest->timer()),
link_id_(link_id),
simulator_(simulator),
from_(src->node_id()) {
src->RegisterPeer(dest->node_id());
}
~InProcessLinkImpl() {
auto strong_partner = partner_.lock();
if (strong_partner != nullptr) {
strong_partner->partner_.reset();
}
}
void Partner(std::shared_ptr<InProcessLinkImpl> other) {
partner_ = other;
other->partner_ = shared_from_this();
}
void Emit(Slice packet) {
auto delay = simulator_->ChoosePacketDelivery(
LinkState{link_id_, outstanding_packets_}, packet.length());
OVERNET_TRACE(DEBUG) << "Packet sim says " << delay << " for " << packet;
if (!delay.has_value()) {
return;
}
outstanding_packets_++;
timer_->At(
timer_->Now() + *delay,
Callback<void>(ALLOCATED_CALLBACK, [self = shared_from_this(),
now = timer_->Now(), packet]() {
ScopedOp scoped_op(Op::New(OpType::INCOMING_PACKET));
auto strong_partner = self->partner_.lock();
OVERNET_TRACE(DEBUG)
<< (strong_partner == nullptr ? "DROP" : "EMIT")
<< " PACKET from " << self->from_ << " " << packet;
self->outstanding_packets_--;
if (strong_partner) {
strong_partner->Process(now, packet);
}
}));
}
private:
Timer* const timer_;
const uint64_t link_id_;
const DeliverySimulator* const simulator_;
std::weak_ptr<InProcessLinkImpl> partner_;
const NodeId from_;
int outstanding_packets_ = 0;
};
class InProcessLink final : public Link {
public:
InProcessLink(RouterEndpoint* src, RouterEndpoint* dest, uint64_t link_id,
const DeliverySimulator* simulator)
: impl_(new InProcessLinkImpl(src, dest, link_id, simulator)) {}
std::shared_ptr<InProcessLinkImpl> get() { return impl_; }
void Close(Callback<void> quiesced) {
impl_->Close(Callback<void>(
ALLOCATED_CALLBACK,
[this, quiesced = std::move(quiesced)]() mutable { impl_.reset(); }));
}
void Forward(Message message) { impl_->Forward(std::move(message)); }
fuchsia::overnet::protocol::LinkMetrics GetLinkMetrics() {
return impl_->GetLinkMetrics();
}
private:
std::shared_ptr<InProcessLinkImpl> impl_;
};
class Env {
public:
virtual ~Env() {}
virtual TimeDelta AllowedTime(uint64_t length) const = 0;
virtual RouterEndpoint* endpoint1() = 0;
virtual RouterEndpoint* endpoint2() = 0;
void AwaitConnected() {
while (!endpoint1()->HasRouteTo(endpoint2()->node_id()) ||
!endpoint2()->HasRouteTo(endpoint1()->node_id())) {
endpoint1()->BlockUntilNoBackgroundUpdatesProcessing();
endpoint2()->BlockUntilNoBackgroundUpdatesProcessing();
test_timer_.StepUntilNextEvent();
}
}
void FlushTodo(
std::function<bool()> until,
TimeStamp deadline = TimeStamp::AfterEpoch(TimeDelta::FromHours(1))) {
bool stepped = false;
while (test_timer_.Now() < deadline + TimeDelta::FromHours(1)) {
if (until())
break;
if (!test_timer_.StepUntilNextEvent())
break;
stepped = true;
}
if (!stepped) {
test_timer_.Step(TimeDelta::FromMilliseconds(1).as_us());
}
ASSERT_LT(test_timer_.Now(), deadline);
}
Timer* timer() { return &test_timer_; }
private:
TestTimer test_timer_;
TraceCout trace_cout_{&test_timer_};
ScopedRenderer scoped_renderer{&trace_cout_};
ScopedSeverity scoped_severity{kTraceEverything ? Severity::DEBUG
: Severity::INFO};
};
class TwoNode final : public Env {
public:
TwoNode(const DeliverySimulator* simulator, uint64_t node_id_1,
uint64_t node_id_2)
: simulator_(simulator) {
endpoint1_ = new RouterEndpoint(timer(), NodeId(node_id_1), false);
endpoint2_ = new RouterEndpoint(timer(), NodeId(node_id_2), false);
auto link1 = MakeLink<InProcessLink>(endpoint1_, endpoint2_, 1, simulator);
auto link2 = MakeLink<InProcessLink>(endpoint2_, endpoint1_, 2, simulator);
link1->get()->Partner(link2->get());
endpoint1_->RegisterLink(std::move(link1));
endpoint2_->RegisterLink(std::move(link2));
}
virtual ~TwoNode() {
if (!testing::Test::HasFailure()) {
bool done = false;
endpoint1_->Close(Callback<void>(ALLOCATED_CALLBACK, [&done, this]() {
endpoint2_->Close(Callback<void>(ALLOCATED_CALLBACK, [&done, this]() {
delete endpoint1_;
delete endpoint2_;
done = true;
}));
}));
FlushTodo([&done] { return done; });
}
}
TimeDelta AllowedTime(uint64_t data_length) const override {
// TODO(ctiller): make this just
// 'simulator_->SimulatedBandwidth().SendTimeForBytes(data_length)'
return TimeDelta::FromSeconds(1) +
simulator_->SimulatedBandwidth().SendTimeForBytes(3 * data_length);
}
RouterEndpoint* endpoint1() override { return endpoint1_; }
RouterEndpoint* endpoint2() override { return endpoint2_; }
private:
const DeliverySimulator* const simulator_;
RouterEndpoint* endpoint1_;
RouterEndpoint* endpoint2_;
};
class ThreeNode final : public Env {
public:
ThreeNode(const DeliverySimulator* simulator, uint64_t node_id_1,
uint64_t node_id_2, uint64_t node_id_h)
: simulator_(simulator) {
endpoint1_ = new RouterEndpoint(timer(), NodeId(node_id_1), false);
endpointH_ = new RouterEndpoint(timer(), NodeId(node_id_h), false);
endpoint2_ = new RouterEndpoint(timer(), NodeId(node_id_2), false);
auto link1H = MakeLink<InProcessLink>(endpoint1_, endpointH_, 1, simulator);
auto linkH1 = MakeLink<InProcessLink>(endpointH_, endpoint1_, 2, simulator);
auto link2H = MakeLink<InProcessLink>(endpoint2_, endpointH_, 3, simulator);
auto linkH2 = MakeLink<InProcessLink>(endpointH_, endpoint2_, 4, simulator);
link1H->get()->Partner(linkH1->get());
link2H->get()->Partner(linkH2->get());
endpoint1_->RegisterLink(std::move(link1H));
endpoint2_->RegisterLink(std::move(link2H));
endpointH_->RegisterLink(std::move(linkH1));
endpointH_->RegisterLink(std::move(linkH2));
}
virtual ~ThreeNode() {
if (!testing::Test::HasFailure()) {
bool done = false;
endpointH_->Close(Callback<void>(ALLOCATED_CALLBACK, [this, &done]() {
endpoint1_->Close(Callback<void>(ALLOCATED_CALLBACK, [this, &done]() {
endpoint2_->Close(Callback<void>(ALLOCATED_CALLBACK, [this, &done]() {
delete endpoint1_;
delete endpoint2_;
delete endpointH_;
done = true;
}));
}));
}));
FlushTodo([&done] { return done; });
}
}
TimeDelta AllowedTime(uint64_t data_length) const override {
// TODO(ctiller): make this just
// 'simulator_->SimulatedBandwidth().SendTimeForBytes(data_length)'
return TimeDelta::FromSeconds(4) +
simulator_->SimulatedBandwidth().SendTimeForBytes(5 * data_length);
}
RouterEndpoint* endpoint1() override { return endpoint1_; }
RouterEndpoint* endpoint2() override { return endpoint2_; }
private:
const DeliverySimulator* const simulator_;
RouterEndpoint* endpoint1_;
RouterEndpoint* endpointH_;
RouterEndpoint* endpoint2_;
};
class MakeEnvInterface {
public:
virtual const char* name() const = 0;
virtual std::shared_ptr<Env> Make() const = 0;
};
using MakeEnv = std::shared_ptr<MakeEnvInterface>;
template <class T, class... Arg>
MakeEnv MakeMakeEnv(const char* name, Arg&&... args) {
class Impl final : public MakeEnvInterface {
public:
Impl(const char* name, std::tuple<Arg...> args)
: name_(name), args_(args) {}
const char* name() const { return name_.c_str(); }
std::shared_ptr<Env> Make() const {
return std::apply(
[](Arg... args) { return std::make_shared<T>(args...); }, args_);
}
private:
const std::string name_;
const std::tuple<Arg...> args_;
};
return MakeEnv(
new Impl(name, std::tuple<Arg...>(std::forward<Arg>(args)...)));
}
static const auto kSimulators = [] {
std::vector<std::unique_ptr<DeliverySimulator>> out;
out.emplace_back(new HappyDelivery());
out.emplace_back(new WindowedDelivery(3, TimeDelta::FromMilliseconds(3)));
return out;
}();
template <class Env, int N>
void AddVariations(const char* base_name, std::vector<MakeEnv>* envs) {
for (const auto& simulator : kSimulators) {
std::array<int, N> ary;
for (int i = 0; i < N; i++) {
ary[i] = i + 1;
}
do {
std::ostringstream name;
name << base_name;
for (auto i : ary)
name << i;
name << ":";
name << simulator->name();
envs->emplace_back(std::apply(
[&name, simulator = simulator.get()](auto... args) {
return MakeMakeEnv<Env>(
name.str().c_str(),
std::forward<DeliverySimulator* const>(simulator),
std::forward<int>(args)...);
},
ary));
} while (std::next_permutation(ary.begin(), ary.end()));
}
}
const auto kEnvVariations = [] {
std::vector<MakeEnv> envs;
AddVariations<TwoNode, 2>("TwoNode", &envs);
AddVariations<ThreeNode, 3>("ThreeNode", &envs);
return envs;
}();
class RouterEndpoint_IntegrationEnv : public ::testing::TestWithParam<MakeEnv> {
};
std::ostream& operator<<(std::ostream& out, MakeEnv env) {
return out << env->name();
}
TEST_P(RouterEndpoint_IntegrationEnv, NoOp) {
std::cout << "Param: " << GetParam() << std::endl;
GetParam()->Make()->AwaitConnected();
}
TEST_P(RouterEndpoint_IntegrationEnv, NodeDescriptionPropagation) {
std::cout << "Param: " << GetParam() << std::endl;
auto env = GetParam()->Make();
fuchsia::overnet::protocol::PeerDescription desc;
desc.mutable_services()->push_back("#ff00ff");
env->endpoint1()->SetDescription(std::move(desc));
env->AwaitConnected();
auto start_wait = env->timer()->Now();
auto idle_time_done = [&] {
return env->timer()->Now() - start_wait >= TimeDelta::FromSeconds(5);
};
while (!idle_time_done()) {
env->FlushTodo(idle_time_done);
}
bool found = false;
env->endpoint2()->ForEachNodeMetric(
[env = env.get(),
&found](const fuchsia::overnet::protocol::NodeMetrics& m) {
fuchsia::overnet::protocol::PeerDescription want_desc;
want_desc.mutable_services()->push_back("#ff00ff");
if (m.label()->id == env->endpoint1()->node_id()) {
found = true;
EXPECT_EQ(*m.description(), want_desc);
}
});
EXPECT_TRUE(found);
}
INSTANTIATE_TEST_CASE_P(RouterEndpoint_IntegrationEnv_Instance,
RouterEndpoint_IntegrationEnv,
::testing::ValuesIn(kEnvVariations.begin(),
kEnvVariations.end()));
struct OneMessageArgs {
MakeEnv make_env;
Slice body;
};
std::unique_ptr<fuchsia::overnet::protocol::Introduction> AbcIntro() {
fuchsia::overnet::protocol::Introduction intro;
intro.set_service_name("abc");
return std::make_unique<fuchsia::overnet::protocol::Introduction>(
std::move(intro));
}
std::ostream& operator<<(std::ostream& out, OneMessageArgs args) {
return out << "env=" << args.make_env->name() << " body=" << args.body;
}
class RouterEndpoint_OneMessageIntegration
: public ::testing::TestWithParam<OneMessageArgs> {};
TEST_P(RouterEndpoint_OneMessageIntegration, Works) {
std::cout << "Param: " << GetParam() << std::endl;
auto env = GetParam().make_env->Make();
const TimeDelta kAllowedTime = env->AllowedTime(
Encode(AbcIntro().get())->length() + GetParam().body.length());
std::cout << "Allowed time for body: " << kAllowedTime << std::endl;
env->AwaitConnected();
auto got_pull_cb = std::make_shared<bool>(false);
auto got_push_cb = std::make_shared<bool>(false);
env->endpoint2()->RecvIntro(StatusOrCallback<
RouterEndpoint::ReceivedIntroduction>(
ALLOCATED_CALLBACK,
[this,
got_pull_cb](StatusOr<RouterEndpoint::ReceivedIntroduction>&& status) {
OVERNET_TRACE(INFO) << "ep2: recv_intro status=" << status.AsStatus();
ASSERT_TRUE(status.is_ok()) << status.AsStatus();
auto intro = std::move(*status);
EXPECT_EQ(*AbcIntro(), intro.introduction)
<< Encode(&intro.introduction);
auto stream =
MakeClosedPtr<RouterEndpoint::Stream>(std::move(intro.new_stream));
OVERNET_TRACE(INFO) << "ep2: start pull_all";
auto* op = new RouterEndpoint::ReceiveOp(stream.get());
OVERNET_TRACE(INFO) << "ep2: op=" << op;
op->PullAll(StatusOrCallback<Optional<std::vector<Slice>>>(
ALLOCATED_CALLBACK,
[this, got_pull_cb, stream{std::move(stream)},
op](const StatusOr<Optional<std::vector<Slice>>>& status) mutable {
OVERNET_TRACE(INFO)
<< "ep2: pull_all status=" << status.AsStatus();
EXPECT_TRUE(status.is_ok()) << status.AsStatus();
EXPECT_TRUE(status->has_value());
auto pull_text =
Slice::Join((*status)->begin(), (*status)->end());
EXPECT_EQ(GetParam().body, pull_text) << pull_text.AsStdString();
delete op;
*got_pull_cb = true;
}));
}));
ScopedOp scoped_op(Op::New(OpType::OUTGOING_REQUEST));
env->endpoint1()->SendIntro(
env->endpoint2()->node_id(),
fuchsia::overnet::protocol::ReliabilityAndOrdering::ReliableOrdered,
std::move(*AbcIntro()),
StatusOrCallback<RouterEndpoint::NewStream>(
ALLOCATED_CALLBACK,
[this,
got_push_cb](StatusOr<RouterEndpoint::NewStream> intro_status) {
OVERNET_TRACE(INFO) << "ep1: send_intro status=" << intro_status;
ASSERT_TRUE(intro_status.is_ok());
auto stream = MakeClosedPtr<RouterEndpoint::Stream>(
std::move(*intro_status.get()));
RouterEndpoint::SendOp(stream.get(), GetParam().body.length())
.Push(GetParam().body, Callback<void>::Ignored());
*got_push_cb = true;
}));
env->FlushTodo(
[got_pull_cb, got_push_cb]() { return *got_pull_cb && *got_push_cb; },
env->timer()->Now() + kAllowedTime);
ASSERT_TRUE(*got_push_cb);
ASSERT_TRUE(*got_pull_cb);
}
const std::vector<OneMessageArgs> kOneMessageArgTests = [] {
std::vector<OneMessageArgs> out;
const std::vector<size_t> kLengths = {1, 32, 1024, 32768, 1048576};
for (MakeEnv make_env : kEnvVariations) {
for (auto payload_length : kLengths) {
out.emplace_back(
OneMessageArgs{make_env, Slice::RepeatedChar(payload_length, 'a')});
}
}
return out;
}();
INSTANTIATE_TEST_CASE_P(RouterEndpoint_OneMessageIntegration_Instance,
RouterEndpoint_OneMessageIntegration,
::testing::ValuesIn(kOneMessageArgTests.begin(),
kOneMessageArgTests.end()));
} // namespace router_endpoint2node
} // namespace overnet