blob: 7c9779f96749a922f2cd307584eec8c9d00cd0e8 [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/async-loop/cpp/loop.h>
#include <lib/async_patterns/cpp/dispatcher_bound.h>
#include <lib/sync/cpp/completion.h>
#include <thread>
#include <gtest/gtest.h>
#include <sdk/lib/async_patterns/cpp/receiver.h>
#include "src/lib/testing/predicates/status.h"
namespace {
// Test fixture owning an |async::Loop| configured with
// |kAsyncLoopConfigNeverAttachToThread|. Nothing in the fixture spawns a loop
// thread: tests are expected to drive the loop themselves, always from the
// main thread. |SetUp| records the id of the thread it runs on so that tests
// can hand it to objects that verify which thread they execute on.
class DispatcherBound : public testing::Test {
 protected:
  // Remembers the calling thread's id as the "loop thread".
  void SetUp() override {
    loop_thread_id_ = std::this_thread::get_id();
  }

  // Intentionally empty.
  void TearDown() override {}

  // The loop whose dispatcher the objects under test are bound to.
  async::Loop& loop() { return loop_; }

  // The thread id captured by |SetUp|.
  std::thread::id loop_thread_id() const { return loop_thread_id_; }

 private:
  std::thread::id loop_thread_id_;
  async::Loop loop_{&kAsyncLoopConfigNeverAttachToThread};
};
// A reference-counted atomic integer. The tests share one between the test
// body and the objects it creates (possibly on other threads).
using ArcAtomic = std::shared_ptr<std::atomic_int>;

// Allocates a new, value-initialized shared atomic integer. The tests rely on
// a freshly made counter reading as 0.
ArcAtomic make_arc_atomic() {
  ArcAtomic fresh = std::make_shared<ArcAtomic::element_type>();
  return fresh;
}
// |ThreadAffine| checks in every member function, including its constructor
// and destructor, that the calling thread is the one whose id was supplied at
// construction. The tests use it to verify that |DispatcherBound| schedules
// all work on the dispatcher thread.
//
// The shared |counter| is incremented by the constructor and decremented by
// the destructor, which lets a test observe whether an instance is alive.
class ThreadAffine {
 public:
  explicit ThreadAffine(std::thread::id expected, ArcAtomic counter)
      : expected_(expected), counter_(std::move(counter)) {
    EXPECT_EQ(expected_, std::this_thread::get_id());
    ++(*counter_);
  }

  ~ThreadAffine() {
    EXPECT_EQ(expected_, std::this_thread::get_id());
    --(*counter_);
  }

  // The operations below each verify the calling thread and accept a different
  // kind of argument; the tests further down exercise them through
  // |AsyncCall|.

  // Takes no arguments.
  void NoArg() {
    EXPECT_EQ(expected_, std::this_thread::get_id());
  }

  // Stores |a + b| into |result|.
  void Add(int a, int b, const ArcAtomic& result) {
    EXPECT_EQ(expected_, std::this_thread::get_id());
    const int sum = a + b;
    result->store(sum);
  }

  // Consumes a move-only argument, which must be non-null.
  void PassMoveOnly(std::unique_ptr<int> p) {
    EXPECT_EQ(expected_, std::this_thread::get_id());
    EXPECT_NE(nullptr, p.get());
  }

  // Expects a non-empty string behind |s| and empties it in place.
  void PassPointer(std::string* s) {
    EXPECT_EQ(expected_, std::this_thread::get_id());
    EXPECT_NE("", *s);
    s->clear();
  }

  // Expects a non-empty string and empties the caller's string.
  void PassReference(std::string& s) {
    EXPECT_EQ(expected_, std::this_thread::get_id());
    EXPECT_NE("", s);
    s.clear();
  }

  // Expects a non-empty string; does not modify it.
  void PassConstReference(const std::string& s) {
    EXPECT_EQ(expected_, std::this_thread::get_id());
    EXPECT_NE("", s);
  }

  // Expects a non-empty vector, then empties its own by-value copy.
  void PassValue(std::vector<int> v) {
    EXPECT_EQ(expected_, std::this_thread::get_id());
    EXPECT_GT(v.size(), 0u);
    v.clear();
  }

 private:
  std::thread::id expected_;
  ArcAtomic counter_;
};
// A |DispatcherBound| constructed from just a dispatcher holds no object.
TEST_F(DispatcherBound, ConstructDisengaged) {
  async_patterns::DispatcherBound<ThreadAffine> bound{loop().dispatcher()};
  EXPECT_FALSE(bound.has_value());
}
// In-place construction and |reset| issued from a foreign thread only take
// effect once the loop is run on the main thread.
TEST_F(DispatcherBound, Construct) {
  libsync::Completion create;
  libsync::Completion destroy;
  auto object_count = make_arc_atomic();

  // From a foreign thread |t1|, we schedule the creation and destruction of a
  // |ThreadAffine| object onto the loop.
  std::thread t1{[&] {
    async_patterns::DispatcherBound<ThreadAffine> obj{loop().dispatcher(), std::in_place,
                                                      loop_thread_id(), object_count};
    EXPECT_TRUE(obj.has_value());
    create.Signal();
    // Check the wait status here as well, matching how the main thread checks
    // |create.Wait()| below, instead of silently dropping it.
    EXPECT_OK(destroy.Wait());
    obj.reset();
    EXPECT_FALSE(obj.has_value());
  }};

  // The object is only constructed once the loop runs.
  EXPECT_OK(create.Wait());
  EXPECT_EQ(0, object_count->load());
  EXPECT_OK(loop().RunUntilIdle());
  EXPECT_EQ(1, object_count->load());

  // Likewise, the object is only destroyed once the loop runs again.
  destroy.Signal();
  t1.join();
  EXPECT_EQ(1, object_count->load());
  EXPECT_OK(loop().RunUntilIdle());
  EXPECT_EQ(0, object_count->load());
}
// |emplace| on a disengaged and then on an engaged |DispatcherBound|: each
// construction, the replacement of the first object, and the final
// destruction only take effect when the loop is run on the main thread.
TEST_F(DispatcherBound, Emplace) {
  libsync::Completion did_create_1;
  libsync::Completion will_create_2;
  libsync::Completion did_create_2;
  libsync::Completion will_destroy;
  auto object_count_1 = make_arc_atomic();
  auto object_count_2 = make_arc_atomic();

  // From a foreign thread |t1|, we schedule the creation and destruction of
  // two |ThreadAffine| objects onto the loop.
  std::thread t1{[&] {
    async_patterns::DispatcherBound<ThreadAffine> obj{loop().dispatcher()};
    obj.emplace(loop_thread_id(), object_count_1);
    EXPECT_TRUE(obj.has_value());
    did_create_1.Signal();
    // Check the wait statuses on this thread too, matching the main thread's
    // |EXPECT_OK(...Wait())| calls, instead of silently dropping them.
    EXPECT_OK(will_create_2.Wait());
    obj.emplace(loop_thread_id(), object_count_2);
    EXPECT_TRUE(obj.has_value());
    did_create_2.Signal();
    EXPECT_OK(will_destroy.Wait());
    // |obj| goes out of scope when this thread function returns.
  }};

  // First emplace: nothing is constructed until the loop runs.
  EXPECT_OK(did_create_1.Wait());
  EXPECT_EQ(0, object_count_1->load());
  EXPECT_EQ(0, object_count_2->load());
  EXPECT_OK(loop().RunUntilIdle());
  EXPECT_EQ(1, object_count_1->load());
  EXPECT_EQ(0, object_count_2->load());

  // Second emplace: the first object stays alive until the loop runs, at which
  // point it has been destroyed and the second one constructed.
  will_create_2.Signal();
  EXPECT_OK(did_create_2.Wait());
  EXPECT_EQ(1, object_count_1->load());
  EXPECT_EQ(0, object_count_2->load());
  EXPECT_OK(loop().RunUntilIdle());
  EXPECT_EQ(0, object_count_1->load());
  EXPECT_EQ(1, object_count_2->load());

  // After |t1| finishes, the second object stays alive until the loop runs.
  will_destroy.Signal();
  t1.join();
  EXPECT_EQ(0, object_count_1->load());
  EXPECT_EQ(1, object_count_2->load());
  EXPECT_OK(loop().RunUntilIdle());
  EXPECT_EQ(0, object_count_1->load());
  EXPECT_EQ(0, object_count_2->load());
}
// Exercises |AsyncCall| with a range of argument-passing styles. Each scenario
// queues one call and then runs the loop so the call executes. The |#if 0|
// sections record usages that are intentionally rejected.
TEST_F(DispatcherBound, AsyncCall) {
  auto live_objects = make_arc_atomic();
  async_patterns::DispatcherBound<ThreadAffine> bound{loop().dispatcher(), std::in_place,
                                                      loop_thread_id(), live_objects};
  EXPECT_OK(loop().RunUntilIdle());

  // A member function without arguments.
  {
    bound.AsyncCall(&ThreadAffine::NoArg);
    EXPECT_OK(loop().RunUntilIdle());
  }

  // Plain values plus a shared result slot: the sum only becomes visible after
  // the loop has run.
  {
    auto sum = make_arc_atomic();
    bound.AsyncCall(&ThreadAffine::Add, 1, 2, sum);
    EXPECT_EQ(0, sum->load());
    EXPECT_OK(loop().RunUntilIdle());
    EXPECT_EQ(3, sum->load());
  }

  // A move-only argument is moved out of the caller's variable right away.
  {
    auto boxed = std::make_unique<int>(42);
    bound.AsyncCall(&ThreadAffine::PassMoveOnly, std::move(boxed));
    EXPECT_EQ(nullptr, boxed);
    EXPECT_OK(loop().RunUntilIdle());
  }

  // Copy a |T| to a receiver that expects a |const T&|.
  {
    std::string text = "abc";
    bound.AsyncCall(&ThreadAffine::PassConstReference, text);
    EXPECT_EQ("abc", text);
    EXPECT_OK(loop().RunUntilIdle());
    EXPECT_EQ("abc", text);
  }

  // Move a |T| to a receiver that expects a |const T&|.
  {
    std::string text = "abc";
    bound.AsyncCall(&ThreadAffine::PassConstReference, std::move(text));
    EXPECT_EQ("", text);
    EXPECT_OK(loop().RunUntilIdle());
    EXPECT_EQ("", text);
  }

  // Copy a |T&| to a receiver that expects a |const T&|.
  {
    std::optional<std::string> holder = std::string{"abc"};
    std::string& alias = holder.value();
    bound.AsyncCall(&ThreadAffine::PassConstReference, alias);
    // The queued call should own a private copy of the string by now, so
    // destroying the original before the call runs should be fine.
    holder.reset();
    EXPECT_OK(loop().RunUntilIdle());
  }

  // Move a |T| to a receiver that expects a |T|.
  {
    std::vector<int> numbers{1, 2, 3};
    bound.AsyncCall(&ThreadAffine::PassValue, std::move(numbers));
    EXPECT_EQ(0u, numbers.size());
    EXPECT_OK(loop().RunUntilIdle());
    EXPECT_EQ(0u, numbers.size());
  }

  // Copy a |T&| to a receiver that expects a |T|.
  {
    std::vector<int> numbers{1, 2, 3};
    std::vector<int>& numbers_ref = numbers;
    bound.AsyncCall(&ThreadAffine::PassValue, numbers_ref);
    EXPECT_EQ(3u, numbers_ref.size());
    EXPECT_OK(loop().RunUntilIdle());
    EXPECT_EQ(3u, numbers_ref.size());
  }

  // Calling a function that consumes a move-only type with |T&| is not allowed.
#if 0
  {
    auto boxed = std::make_unique<int>(42);
    bound.AsyncCall(&ThreadAffine::PassMoveOnly, boxed);
    EXPECT_EQ(nullptr, boxed);
    EXPECT_OK(loop().RunUntilIdle());
  }
#endif

  // Pass-through a |T&| to a receiver that expects a |T&| is not supported.
#if 0
  {
    std::string text = "abc";
    std::string& text_ref = text;
    bound.AsyncCall(&ThreadAffine::PassReference, text_ref);
    EXPECT_EQ("abc", text_ref);
    EXPECT_OK(loop().RunUntilIdle());
    EXPECT_EQ("", text_ref);
  }
#endif

  // Pointer arguments are not supported.
#if 0
  {
    std::string text = "abc";
    bound.AsyncCall(&ThreadAffine::PassPointer, &text);
    EXPECT_EQ("abc", text);
    EXPECT_OK(loop().RunUntilIdle());
    EXPECT_EQ("", text);
  }
#endif
}
// An |AsyncCall| whose return value is routed back to the caller via
// |Then| and a |Receiver|. Two loops are involved: the fixture's loop, which
// the |Owner| receives replies on, and a second loop owned by |Owner| that
// the |Background| object is bound to.
TEST_F(DispatcherBound, AsyncCallWithReply) {
  // Holds a base string and returns it with the argument appended.
  class Background {
   public:
    explicit Background(std::string base) : base_(std::move(base)) {}

    std::string Concat(const std::string& arg) {
      std::string joined = base_ + arg;
      return joined;
    }

   private:
    std::string base_;
  };

  // |Owner| asynchronously asks |Background| to concatenate a string and
  // verifies the outcome in |DoneConcat|.
  class Owner {
   public:
    explicit Owner(async_dispatcher_t* owner_dispatcher) : receiver_{this, owner_dispatcher} {
      background_.AsyncCall(&Background::Concat, std::string("def"))
          .Then(receiver_.Once(&Owner::DoneConcat));

      // Passing incompatible types is not allowed.
#if 0
      background_.AsyncCall(&Background::Concat, std::string("def"))
          .Then(receiver_.Once([] (Owner*, int not_a_string) {}));
#endif
    }

    bool got_result() const { return got_result_; }
    async::Loop& background_loop() { return background_loop_; }

   private:
    // Reply handler; expected to be invoked exactly once with the
    // concatenated string.
    void DoneConcat(const std::string& result) {
      EXPECT_FALSE(got_result_);
      EXPECT_EQ(result, "abcdef");
      got_result_ = true;
    }

    async::Loop background_loop_{&kAsyncLoopConfigNeverAttachToThread};
    async_patterns::DispatcherBound<Background> background_{background_loop_.dispatcher(),
                                                            std::in_place, std::string("abc")};
    async_patterns::Receiver<Owner> receiver_;
    bool got_result_ = false;
  };

  Owner owner{loop().dispatcher()};
  EXPECT_FALSE(owner.got_result());

  // The main loop has nothing to process yet.
  ASSERT_OK(loop().RunUntilIdle());
  EXPECT_FALSE(owner.got_result());

  // The background loop runs |Concat| and posts the result back.
  ASSERT_OK(owner.background_loop().RunUntilIdle());
  EXPECT_FALSE(owner.got_result());

  // The main loop then delivers the result to |DoneConcat|.
  ASSERT_OK(loop().RunUntilIdle());
  EXPECT_TRUE(owner.got_result());
}
// With the loop shut down up front, construction, calls and destruction are
// all expected to take effect immediately, without the loop being run.
TEST_F(DispatcherBound, SynchronouslyRunIfShutdown) {
  loop().Shutdown();
  auto live_objects = make_arc_atomic();
  {
    async_patterns::DispatcherBound<ThreadAffine> bound{loop().dispatcher()};
    EXPECT_EQ(0, live_objects->load());

    // The object exists as soon as |emplace| returns.
    bound.emplace(loop_thread_id(), live_objects);
    EXPECT_EQ(1, live_objects->load());

    // The call has produced its result as soon as |AsyncCall| returns.
    auto sum = make_arc_atomic();
    bound.AsyncCall(&ThreadAffine::Add, 1, 2, sum);
    EXPECT_EQ(3, sum->load());
  }
  // Leaving the scope destroyed the object.
  EXPECT_EQ(0, live_objects->load());
}
// Shutting the loop down after |emplace| was issued, but before the loop was
// ever run, is expected to leave the object constructed; the object is then
// destroyed when the |DispatcherBound| goes out of scope.
TEST_F(DispatcherBound, ShutdownAfterConstruction) {
  auto live_objects = make_arc_atomic();
  {
    async_patterns::DispatcherBound<ThreadAffine> bound{loop().dispatcher()};
    EXPECT_EQ(0, live_objects->load());

    bound.emplace(loop_thread_id(), live_objects);
    loop().Shutdown();
    EXPECT_EQ(1, live_objects->load());
  }
  EXPECT_EQ(0, live_objects->load());
}
// A call queued with |AsyncCall| before the loop is shut down is expected to
// have run by the time |Shutdown| returns, and the object is destroyed when
// the |DispatcherBound| goes out of scope afterwards.
TEST_F(DispatcherBound, ShutdownAfterAsyncCall) {
  auto object_count = make_arc_atomic();
  {
    async_patterns::DispatcherBound<ThreadAffine> obj{loop().dispatcher()};
    EXPECT_EQ(0, object_count->load());
    obj.emplace(loop_thread_id(), object_count);
    // Run the loop so that the object gets constructed. The status is checked
    // like in the other tests rather than being silently dropped.
    EXPECT_OK(loop().RunUntilIdle());
    EXPECT_EQ(1, object_count->load());

    auto result = make_arc_atomic();
    obj.AsyncCall(&ThreadAffine::Add, 1, 2, result);
    // The loop is not run again; the result must be there after |Shutdown|.
    loop().Shutdown();
    EXPECT_EQ(3, result->load());
  }
  EXPECT_EQ(0, object_count->load());
}
// The |DispatcherBound| is destroyed before the loop ever runs. Running the
// loop afterwards must succeed and leave no |ThreadAffine| instance alive.
TEST_F(DispatcherBound, DispatcherOutlivesDispatcherBound) {
  auto count = make_arc_atomic();
  {
    async_patterns::DispatcherBound<ThreadAffine> obj{loop().dispatcher()};
    obj.emplace(loop_thread_id(), count);
  }
  // Check the status like the other tests do, rather than silently dropping
  // it.
  EXPECT_OK(loop().RunUntilIdle());
  EXPECT_EQ(0, count->load());
}
// |MakeDispatcherBound| forwards its arguments to the object's constructor;
// as with in-place construction, the object only exists once the loop has run.
TEST_F(DispatcherBound, MakeDispatcherBound) {
  auto object_count = make_arc_atomic();
  auto obj = async_patterns::MakeDispatcherBound<ThreadAffine>(loop().dispatcher(),
                                                               loop_thread_id(), object_count);
  EXPECT_EQ(0, object_count->load());
  // Check the status like the other tests do, rather than silently dropping
  // it.
  EXPECT_OK(loop().RunUntilIdle());
  EXPECT_EQ(1, object_count->load());
}
} // namespace