blob: 062ad907dffda5e1210f4582a064979182da7795 [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/async-loop/cpp/loop.h>
#include <lib/fpromise/promise.h>
#include <lib/fpromise/single_threaded_executor.h>
#include <lib/sync/cpp/completion.h>
#include <thread>
#include <gtest/gtest.h>
#include <sdk/lib/async_patterns/cpp/callback.h>
#include <sdk/lib/async_patterns/cpp/receiver.h>
#include "src/lib/testing/predicates/status.h"
namespace {
struct Owner {
public:
explicit Owner(async_dispatcher_t* dispatcher, int& count)
: receiver_{this, dispatcher}, count_{count} {}
void OnCallback(int arg) {
EXPECT_EQ(arg, 42);
count_++;
}
int OnCallbackWithCounter(int arg) {
EXPECT_EQ(arg, 42);
count_++;
return count_;
}
async_patterns::Callback<void(int)> GetCallback() { return receiver_.Once(&Owner::OnCallback); }
async_patterns::Callback<int(int)> GetCallbackWithCounter() {
return receiver_.Once(&Owner::OnCallbackWithCounter);
}
void OnCallbackMoveOnly(std::unique_ptr<int> arg) {
EXPECT_EQ(*arg, 42);
count_++;
}
async_patterns::Callback<void(std::unique_ptr<int>)> GetCallbackMoveOnly() {
return receiver_.Once(&Owner::OnCallbackMoveOnly);
}
private:
async_patterns::Receiver<Owner> receiver_;
int& count_;
};
TEST(Receiver, Receive) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
int count = 0;
Owner owner{loop.dispatcher(), count};
async_patterns::Callback<void(int)> callback = owner.GetCallback();
EXPECT_EQ(count, 0);
callback(42);
EXPECT_EQ(count, 0);
ASSERT_OK(loop.RunUntilIdle());
EXPECT_EQ(count, 1);
}
TEST(Receiver, ReceiveMoveOnly) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
int count = 0;
Owner owner{loop.dispatcher(), count};
async_patterns::Callback callback = owner.GetCallbackMoveOnly();
EXPECT_EQ(count, 0);
callback(std::make_unique<int>(42));
EXPECT_EQ(count, 0);
ASSERT_OK(loop.RunUntilIdle());
EXPECT_EQ(count, 1);
}
TEST(Receiver, ReceiveConvertToFitCallback) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
int count = 0;
Owner owner{loop.dispatcher(), count};
async_patterns::Callback<void(int)> callback = owner.GetCallback();
fit::callback<void(int)> fit_callback = std::move(callback).ignore_result();
EXPECT_EQ(count, 0);
fit_callback(42);
EXPECT_EQ(count, 0);
ASSERT_OK(loop.RunUntilIdle());
EXPECT_EQ(count, 1);
}
TEST(Receiver, OnceToVoidPromise) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
int count = 0;
Owner owner{loop.dispatcher(), count};
async_patterns::Callback<void(int)> callback = owner.GetCallback();
// Fire-and-forget
{
EXPECT_EQ(count, 0);
callback(42);
EXPECT_EQ(count, 0);
ASSERT_OK(loop.RunUntilIdle());
EXPECT_EQ(count, 1);
}
// Get promise.
{
count = 0;
callback = owner.GetCallback();
EXPECT_EQ(count, 0);
fpromise::promise<> promise = callback(42).promise();
EXPECT_EQ(count, 0);
fpromise::single_threaded_executor executor;
bool called = false;
executor.schedule_task(promise.and_then([&called]() { called = true; }));
ASSERT_OK(loop.RunUntilIdle());
ASSERT_FALSE(called);
executor.run();
ASSERT_TRUE(called);
EXPECT_EQ(count, 1);
}
}
TEST(Receiver, OnceToPromise) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
int count = 0;
Owner owner{loop.dispatcher(), count};
async_patterns::Callback<int(int)> callback = owner.GetCallbackWithCounter();
// Fire-and-forget
{
EXPECT_EQ(count, 0);
callback(42);
EXPECT_EQ(count, 0);
ASSERT_OK(loop.RunUntilIdle());
EXPECT_EQ(count, 1);
}
// Get promise.
{
count = 0;
callback = owner.GetCallbackWithCounter();
EXPECT_EQ(count, 0);
fpromise::promise<int> promise = callback(42).promise();
EXPECT_EQ(count, 0);
fpromise::single_threaded_executor executor;
bool called = false;
executor.schedule_task(promise.and_then([&called](int& count) {
called = true;
ASSERT_EQ(count, 1);
}));
ASSERT_OK(loop.RunUntilIdle());
ASSERT_FALSE(called);
executor.run();
ASSERT_TRUE(called);
EXPECT_EQ(count, 1);
}
}
TEST(Receiver, CannotReceiveAfterDestruction) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
int count = 0;
std::optional<Owner> owner_store;
owner_store.emplace(loop.dispatcher(), count);
Owner& owner = *owner_store;
async_patterns::Callback<void(int)> callback = owner.GetCallback();
EXPECT_EQ(count, 0);
callback(42);
EXPECT_EQ(count, 0);
owner_store.reset();
ASSERT_OK(loop.RunUntilIdle());
EXPECT_EQ(count, 0);
}
TEST(Receiver, CannotReceiveAfterDispatcherShutdown) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
int count = 0;
Owner owner{loop.dispatcher(), count};
async_patterns::Callback<void(int)> callback = owner.GetCallback();
EXPECT_EQ(count, 0);
callback(42);
EXPECT_EQ(count, 0);
loop.Shutdown();
EXPECT_EQ(count, 0);
}
TEST(Receiver, CannotReceiveAfterDispatcherShutdownAndBothGoAway) {
std::unique_ptr loop = std::make_unique<async::Loop>(&kAsyncLoopConfigNeverAttachToThread);
int count = 0;
std::unique_ptr owner = std::make_unique<Owner>(loop->dispatcher(), count);
async_patterns::Callback<void(int)> callback = owner->GetCallback();
owner.reset();
loop.reset();
EXPECT_EQ(count, 0);
callback(42);
EXPECT_EQ(count, 0);
}
TEST(Receiver, CheckSynchronization) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
int count = 0;
Owner owner{loop.dispatcher(), count};
async_patterns::Callback<void(int)> callback = owner.GetCallback();
callback(42);
// If |Owner| lives on the main thread, we cannot dispatch tasks to it
// from an arbitrary thread.
std::thread([&] { ASSERT_DEATH(loop.RunUntilIdle(), "thread-unsafe"); }).join();
}
TEST(Receiver, BindLambda) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
struct Owner {
public:
explicit Owner(async_dispatcher_t* dispatcher) : receiver_{this, dispatcher} {}
async_patterns::Callback<void(int)> GetCallback() {
return receiver_.Once([](Owner* owner, int arg) {
EXPECT_EQ(arg, 42);
owner->count_++;
});
}
// Won't compile due to static assertion.
#if 0
async_patterns::Callback<int> GetCallbackWithStatefulLambda() {
return receiver_.Once([this](Owner* owner, int arg) {
EXPECT_EQ(arg, 42);
owner->count_++;
});
}
#endif
int count() const { return count_; }
private:
int count_ = 0;
async_patterns::Receiver<Owner> receiver_;
};
Owner owner{loop.dispatcher()};
EXPECT_EQ(owner.count(), 0);
async_patterns::Callback<void(int)> callback = owner.GetCallback();
EXPECT_EQ(owner.count(), 0);
callback(42);
EXPECT_EQ(owner.count(), 0);
ASSERT_OK(loop.RunUntilIdle());
EXPECT_EQ(owner.count(), 1);
}
class DiagnosticsExample {
public:
explicit DiagnosticsExample(async_dispatcher_t* dispatcher) : receiver_{this, dispatcher} {}
async_patterns::Function<void(std::string)> StreamLogs() {
return receiver_.Repeating(&DiagnosticsExample::OnLog);
}
async_patterns::Function<size_t(std::string)> StreamLogsWithCounter() {
return receiver_.Repeating(&DiagnosticsExample::OnLogWithCounter);
}
async_patterns::Function<void(std::string)> StreamLogsWithLambda() {
return receiver_.Repeating(
[](DiagnosticsExample* self, std::string log) { self->OnLog(std::move(log)); });
}
async_patterns::Function<void(std::unique_ptr<std::string>)> StreamUniqueLogs() {
return receiver_.Repeating(&DiagnosticsExample::OnUniqueLog);
}
async_patterns::Function<void(std::unique_ptr<std::string>)> StreamUniqueLogsWithLambda() {
return receiver_.Repeating([](DiagnosticsExample* self, std::unique_ptr<std::string> log) {
self->OnUniqueLog(std::move(log));
});
}
const std::vector<std::string>& logs() const { return logs_; }
private:
void OnLog(std::string log) { logs_.push_back(std::move(log)); }
size_t OnLogWithCounter(std::string log) {
logs_.push_back(std::move(log));
return logs_.size();
}
void OnUniqueLog(std::unique_ptr<std::string> log) { OnLog(std::move(*log)); }
async_patterns::Receiver<DiagnosticsExample> receiver_;
std::vector<std::string> logs_;
};
TEST(Receiver, RepeatingConstruction) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
DiagnosticsExample diag{loop.dispatcher()};
async_patterns::Function<void(std::string)> log = diag.StreamLogs();
static_assert(std::is_move_constructible_v<decltype(log)>);
static_assert(std::is_copy_constructible_v<decltype(log)>);
}
TEST(Receiver, Repeating) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
DiagnosticsExample diag{loop.dispatcher()};
async_patterns::Function<void(std::string)> log = diag.StreamLogs();
{
log(std::string{"hello"});
EXPECT_EQ(diag.logs().size(), 0u);
ASSERT_OK(loop.RunUntilIdle());
std::vector<std::string> expected{"hello"};
EXPECT_EQ(diag.logs(), expected);
}
// |Function| can be copied.
{
async_patterns::Function<void(std::string)> log2 = log;
log2(std::string{"world"});
log(std::string{"abc"});
ASSERT_OK(loop.RunUntilIdle());
std::vector<std::string> expected{"hello", "world", "abc"};
EXPECT_EQ(diag.logs(), expected);
}
}
TEST(Receiver, RepeatingMoveOnly) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
DiagnosticsExample diag{loop.dispatcher()};
async_patterns::Function<void(std::unique_ptr<std::string>)> log = diag.StreamUniqueLogs();
log(std::make_unique<std::string>("abc"));
ASSERT_OK(loop.RunUntilIdle());
std::vector<std::string> expected{"abc"};
EXPECT_EQ(diag.logs(), expected);
}
TEST(Receiver, RepeatingLambda) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
DiagnosticsExample diag{loop.dispatcher()};
async_patterns::Function<void(std::string)> log = diag.StreamLogsWithLambda();
log(std::string{"hello"});
EXPECT_EQ(diag.logs().size(), 0u);
ASSERT_OK(loop.RunUntilIdle());
std::vector<std::string> expected{"hello"};
EXPECT_EQ(diag.logs(), expected);
}
TEST(Receiver, RepeatingLambdaMoveOnly) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
DiagnosticsExample diag{loop.dispatcher()};
async_patterns::Function<void(std::unique_ptr<std::string>)> log =
diag.StreamUniqueLogsWithLambda();
log(std::make_unique<std::string>("hello"));
EXPECT_EQ(diag.logs().size(), 0u);
ASSERT_OK(loop.RunUntilIdle());
std::vector<std::string> expected{"hello"};
EXPECT_EQ(diag.logs(), expected);
}
TEST(Receiver, RepeatingConvertToFitFunction) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
DiagnosticsExample diag{loop.dispatcher()};
async_patterns::Function<void(std::string)> function = diag.StreamLogs();
fit::function<void(std::string)> fit_function = std::move(function).ignore_result();
fit_function(std::string("abc"));
EXPECT_EQ(diag.logs().size(), 0u);
ASSERT_OK(loop.RunUntilIdle());
std::vector<std::string> expected{"abc"};
EXPECT_EQ(diag.logs(), expected);
}
TEST(Receiver, RepeatingToVoidPromise) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
DiagnosticsExample diag{loop.dispatcher()};
async_patterns::Function<void(std::string)> function = diag.StreamLogs();
// Fire-and-forget.
{
function(std::string("abc"));
EXPECT_EQ(diag.logs().size(), 0u);
ASSERT_OK(loop.RunUntilIdle());
std::vector<std::string> expected{"abc"};
EXPECT_EQ(diag.logs(), expected);
}
// Get a promise.
{
fpromise::promise<> promise = function(std::string("def")).promise();
fpromise::single_threaded_executor executor;
bool called = false;
executor.schedule_task(promise.and_then([&called]() { called = true; }));
ASSERT_OK(loop.RunUntilIdle());
ASSERT_FALSE(called);
executor.run();
ASSERT_TRUE(called);
std::vector<std::string> expected{"abc", "def"};
EXPECT_EQ(diag.logs(), expected);
}
}
TEST(Receiver, RepeatingToPromise) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
DiagnosticsExample diag{loop.dispatcher()};
async_patterns::Function<size_t(std::string)> function = diag.StreamLogsWithCounter();
// Fire-and-forget.
{
function(std::string("abc"));
EXPECT_EQ(diag.logs().size(), 0u);
ASSERT_OK(loop.RunUntilIdle());
std::vector<std::string> expected{"abc"};
EXPECT_EQ(diag.logs(), expected);
}
// Get a promise.
{
fpromise::promise<size_t> promise = function(std::string("def")).promise();
fpromise::single_threaded_executor executor;
bool called = false;
executor.schedule_task(promise.and_then([&called](size_t& size) {
called = true;
ASSERT_EQ(size, 2u);
}));
ASSERT_OK(loop.RunUntilIdle());
ASSERT_FALSE(called);
executor.run();
ASSERT_TRUE(called);
std::vector<std::string> expected{"abc", "def"};
EXPECT_EQ(diag.logs(), expected);
}
}
TEST(Receiver, OnceManyArgs) {
class Owner {
public:
explicit Owner(async_dispatcher_t* dispatcher) : receiver_{this, dispatcher} {}
async_patterns::Callback<void(int, const std::string&, float)> Test() {
return receiver_.Once(&Owner::OnTest);
}
int count() const { return count_; }
private:
void OnTest(int a, const std::string& b, float c) {
EXPECT_EQ(a, 1);
EXPECT_EQ(b, "2");
EXPECT_EQ(c, 3.0);
count_++;
}
async_patterns::Receiver<Owner> receiver_;
int count_ = 0;
};
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
Owner owner{loop.dispatcher()};
async_patterns::Callback f = owner.Test();
EXPECT_EQ(owner.count(), 0);
f(1, std::string{"2"}, 3.0);
EXPECT_EQ(owner.count(), 0);
ASSERT_OK(loop.RunUntilIdle());
EXPECT_EQ(owner.count(), 1);
ASSERT_DEATH(f(1, std::string{"2"}, 3.0), "");
}
TEST(Receiver, RepeatingManyArgs) {
class Owner {
public:
explicit Owner(async_dispatcher_t* dispatcher) : receiver_{this, dispatcher} {}
async_patterns::Function<void(int, const std::string&, float)> Test() {
return receiver_.Repeating(&Owner::OnTest);
}
int count() const { return count_; }
private:
void OnTest(int a, const std::string& b, float c) {
EXPECT_EQ(a, 1);
EXPECT_EQ(b, "2");
EXPECT_EQ(c, 3.0);
count_++;
}
async_patterns::Receiver<Owner> receiver_;
int count_ = 0;
};
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
Owner owner{loop.dispatcher()};
async_patterns::Function f = owner.Test();
EXPECT_EQ(owner.count(), 0);
f(1, std::string{"2"}, 3.0);
EXPECT_EQ(owner.count(), 0);
ASSERT_OK(loop.RunUntilIdle());
EXPECT_EQ(owner.count(), 1);
f(1, std::string{"2"}, 3.0);
ASSERT_OK(loop.RunUntilIdle());
EXPECT_EQ(owner.count(), 2);
}
TEST(Receiver, OrderingAcrossCallbacksAndFunctions) {
class Owner {
public:
explicit Owner(async_dispatcher_t* dispatcher) : receiver_{this, dispatcher} {}
async_patterns::Function<void(const std::string&)> Method1() {
return receiver_.Repeating(&Owner::LogMethodCall);
}
async_patterns::Callback<void(const std::string&)> Method2() {
return receiver_.Once(&Owner::LogMethodCall);
}
async_patterns::Function<void(const std::string&)> Method3() {
return receiver_.Repeating(&Owner::LogMethodCall);
}
const std::vector<std::string>& logs() const { return logs_; }
private:
void LogMethodCall(const std::string& b) { logs_.push_back(b); }
async_patterns::Receiver<Owner> receiver_;
std::vector<std::string> logs_;
};
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
Owner owner{loop.dispatcher()};
async_patterns::Function method_1 = owner.Method1();
async_patterns::Callback method_2 = owner.Method2();
async_patterns::Function method_3 = owner.Method3();
method_3("1");
method_1("2");
method_1("3");
method_3("4");
method_1("5");
method_2("6");
method_3("7");
ASSERT_OK(loop.RunUntilIdle());
std::vector<std::string> expected{"1", "2", "3", "4", "5", "6", "7"};
EXPECT_EQ(owner.logs(), expected);
}
} // namespace