blob: cf2dbfb798075fdaaa1a8009d919a34ccd28901d [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/async-loop/cpp/loop.h>
#include <lib/fit/defer.h>
#include <lib/sync/cpp/completion.h>
#include <lib/zx/time.h>
#include <latch>
#include <list>
#include <semaphore>
#include <thread>
#include <gtest/gtest.h>
#include <sdk/lib/async_patterns/cpp/internal/task_queue.h>
#include "src/lib/testing/predicates/status.h"
namespace {
using async_patterns::internal::Task;
using async_patterns::internal::TaskQueue;
using async_patterns::internal::TaskQueueHandle;
// Destroying a |TaskQueue| without first calling |Stop| is a programming
// error: the death test expects the process to abort when |queue| goes out
// of scope un-stopped.
TEST(TaskQueue, MustStopBeforeDestruction) {
ASSERT_DEATH(
{
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
TaskQueue queue(loop.dispatcher(), "");
// |queue| is destroyed here without |Stop| — this should crash.
},
"");
}
// A task added to a live queue runs exactly once when the loop drains
// its pending work.
TEST(TaskQueue, Add) {
  async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
  TaskQueue queue(loop.dispatcher(), "");
  bool did_run = false;
  queue.Add(Task::Box([&did_run] { did_run = true; }));
  EXPECT_OK(loop.RunUntilIdle());
  EXPECT_TRUE(did_run);
  queue.Stop();
}
// Once the queue is stopped, any task added afterwards is discarded and
// never executes, no matter how many times we add and pump the loop.
TEST(TaskQueue, StopBeforeAdd) {
  async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
  TaskQueue queue(loop.dispatcher(), "");
  queue.Stop();
  for (size_t attempt = 0; attempt < 3; attempt++) {
    bool did_run = false;
    queue.Add(Task::Box([&did_run] { did_run = true; }));
    EXPECT_OK(loop.RunUntilIdle());
    EXPECT_FALSE(did_run);
  }
}
// Stopping the queue while a task is still pending discards that task:
// it must not run when the loop is subsequently drained.
TEST(TaskQueue, StopAfterAdd) {
  async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
  TaskQueue queue(loop.dispatcher(), "");
  bool did_run = false;
  queue.Add(Task::Box([&did_run] { did_run = true; }));
  queue.Stop();
  EXPECT_OK(loop.RunUntilIdle());
  EXPECT_FALSE(did_run);
}
// After the dispatcher has shut down, added tasks are destroyed without
// running. The |on_drop| deferred callback observes the destruction.
TEST(TaskQueue, DispatcherShutdownBeforeAdd) {
  async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
  TaskQueue queue(loop.dispatcher(), "");
  loop.Shutdown();
  for (size_t attempt = 0; attempt < 3; attempt++) {
    bool did_run = false;
    bool was_dropped = false;
    queue.Add(Task::Box(
        [&, on_drop = fit::defer([&] { was_dropped = true; })] { did_run = true; }));
    EXPECT_FALSE(did_run);
    EXPECT_TRUE(was_dropped);
  }
  queue.Stop();
}
// Shutting down the dispatcher while a task is pending destroys the task
// without running it; the |on_drop| deferred callback observes the drop.
TEST(TaskQueue, DispatcherShutdownAfterAdd) {
  async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
  TaskQueue queue(loop.dispatcher(), "");
  bool did_run = false;
  bool was_dropped = false;
  queue.Add(Task::Box(
      [&, on_drop = fit::defer([&] { was_dropped = true; })] { did_run = true; }));
  loop.Shutdown();
  EXPECT_FALSE(did_run);
  EXPECT_TRUE(was_dropped);
  queue.Stop();
}
// A task may be added from another thread via |TaskQueueHandle|, but it
// still executes on the thread that pumps the loop (here: this thread).
TEST(TaskQueue, AddFromOtherThread) {
  async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
  auto queue = std::make_shared<TaskQueue>(loop.dispatcher(), "");
  libsync::Completion added;
  libsync::Completion called;
  const std::thread::id main_thread_id = std::this_thread::get_id();
  std::optional<std::thread::id> task_thread_id;
  std::thread poster([handle = TaskQueueHandle(queue), &added, &called, &task_thread_id] {
    handle.Add([&called, &task_thread_id] {
      task_thread_id.emplace(std::this_thread::get_id());
      called.Signal();
    });
    added.Signal();
  });
  EXPECT_OK(added.Wait());
  // Adding alone must not run the task; it only runs once the loop is pumped.
  EXPECT_FALSE(called.signaled());
  EXPECT_OK(loop.RunUntilIdle());
  EXPECT_TRUE(called.signaled());
  // The task observed the main thread's id, not the poster thread's.
  EXPECT_EQ(main_thread_id, task_thread_id.value());
  queue->Stop();
  poster.join();
}
// Spins up |kNumThreads| worker threads that continuously add tasks to a
// |TaskQueue| through a shared |TaskQueueHandle|, to stress concurrent
// |Add| calls against the loop thread consuming (or dropping) them.
struct TaskAdderThreadPool {
public:
static constexpr size_t kNumThreads = 50;
// Tests run until at least this many tasks have executed (or been dropped).
static constexpr size_t kMinimumNumberOfTasks = kNumThreads * 1000;
// If the scheduler doesn't schedule the main thread as much, we might OOM
// the system as the worker threads keep queuing tasks. This is a backstop
// against that. Assuming each task is 64 bytes, we should not use more than
// 256 MiB of RAM.
static constexpr size_t kMaximumNumberOfOutstandingTasks = 256 * 1024 * 1024 / 64;
// Starts all workers and blocks until every worker is ready, then releases
// them simultaneously via |start_|.
explicit TaskAdderThreadPool(const TaskQueueHandle& handle) {
for (size_t i = 0; i < kNumThreads; i++) {
threads_.emplace_back([this, handle] {
entered_.count_down();
start_.Wait();
for (;;) {
// Bounds the number of tasks alive at once; the matching |release|
// happens in the task's |destroyed| deferred callback, which fires
// whether the task ran or was dropped.
outstanding_tasks_.acquire();
handle.Add([this, on_dropped = fit::defer([this] { dropped_count_.fetch_add(1); }),
destroyed = fit::defer([this] { outstanding_tasks_.release(); })]() mutable {
// Incremented only from inside a task; presumably all tasks run on
// the single loop-pumping thread, so a plain size_t suffices —
// TODO confirm against TaskQueue's threading contract.
ran_count_++;
// The task ran, so cancel the "dropped" accounting.
on_dropped.cancel();
});
if (quit_.load()) {
return;
}
}
});
}
entered_.wait();
start_.Signal();
}
// Signals workers to quit after their next |Add| and joins them. Relies on
// the queue continuing to drop (and thus destroy) tasks so that
// |outstanding_tasks_.acquire()| cannot block forever.
~TaskAdderThreadPool() {
quit_.store(true);
for (auto& t : threads_) {
t.join();
}
}
// Number of tasks that actually executed. Read from the loop thread.
size_t ran_count() const { return ran_count_; }
// Number of tasks destroyed without running.
const std::atomic_size_t& dropped_count() const { return dropped_count_; }
private:
std::latch entered_{kNumThreads};          // rendezvous: all workers started
libsync::Completion start_;                // gate releasing all workers at once
std::atomic_bool quit_{false};             // tells workers to stop adding
size_t ran_count_ = 0;
std::atomic_size_t dropped_count_{0};
std::counting_semaphore<> outstanding_tasks_{kMaximumNumberOfOutstandingTasks};
std::list<std::thread> threads_;           // std::list: no reallocation while joining
};
// A mini-stress test to attempt to exercise as many thread interleavings as possible.
//
// Worth-noting this test does not use RunUntilIdle. When many threads are posting
// work into the loop, RunUntilIdle may live lock where it is forever processing
// new tasks.
TEST(TaskQueue, AddFromManyThreads) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
auto queue = std::make_shared<TaskQueue>(loop.dispatcher(), "");
TaskAdderThreadPool pool{TaskQueueHandle{queue}};
// Run tasks in batches for a while.
for (;;) {
// A deadline in the past with |once| set processes pending work and
// returns rather than blocking indefinitely (see comment above about
// why RunUntilIdle is avoided here).
loop.Run(zx::time::infinite_past(), /* once */ true);
// Before |Stop|, no task should ever be dropped.
EXPECT_EQ(pool.dropped_count().load(), 0u);
if (pool.ran_count() >= TaskAdderThreadPool::kMinimumNumberOfTasks) {
break;
}
}
queue->Stop();
// Observe enough tasks dropped for a while.
for (;;) {
loop.Run(zx::time::infinite_past(), /* once */ true);
if (pool.dropped_count().load() >= TaskAdderThreadPool::kMinimumNumberOfTasks) {
break;
}
}
// ~TaskAdderThreadPool runs here: workers quit and are joined.
}
// Same stress shape as AddFromManyThreads, but the dispatcher shuts down
// (rather than the queue stopping) while workers are still adding tasks.
TEST(TaskQueue, AddFromManyThreadsButDispatcherShutsDown) {
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
auto queue = std::make_shared<TaskQueue>(loop.dispatcher(), "");
TaskAdderThreadPool pool{TaskQueueHandle{queue}};
// Run tasks in batches for a while.
for (;;) {
loop.Run(zx::time::infinite_past(), /* once */ true);
// While the dispatcher is alive, no task should be dropped.
EXPECT_EQ(pool.dropped_count().load(), 0u);
if (pool.ran_count() >= TaskAdderThreadPool::kMinimumNumberOfTasks) {
break;
}
}
loop.Shutdown();
// Observe enough tasks dropped for a while.
for (;;) {
// The loop is gone, so just poll the drop counter; adds are dropped
// immediately by the shut-down dispatcher.
zx::nanosleep(zx::deadline_after(zx::usec(1)));
if (pool.dropped_count().load() >= TaskAdderThreadPool::kMinimumNumberOfTasks) {
break;
}
}
queue->Stop();
}
} // namespace