blob: 4bd62282fe9185040f0ecdde6bae843dcc9c25ed [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/developer/debug/shared/worker_pool.h"
#include <lib/syslog/cpp/macros.h>
#include <iostream>
#include <gtest/gtest.h>
#include "src/developer/debug/shared/logging/logging.h"
using namespace std::chrono_literals;
namespace debug_ipc {
namespace {
class TestWorkerPoolObserver : public WorkerPool::Observer {
public:
TestWorkerPoolObserver(std::mutex* mutex, std::condition_variable* cv)
: mutex_(mutex), task_done_cv_(cv) {}
void OnWorkerCreation() override;
void OnWorkerExiting() override;
void OnExecutingTask() override;
void OnShutdown() override;
void Reset();
int workers_created() const { return workers_created_; }
int workers_exited() const { return workers_exited_; }
int tasks_executed() const { return tasks_executed_; }
bool shutdown_called() const { return shutdown_called_; }
bool all_tasks_done() const { return all_tasks_done_; }
std::mutex& mutex() { return *mutex_; }
std::condition_variable& task_done_cv() { return *task_done_cv_; }
private:
std::atomic<int> workers_created_ = 0;
std::atomic<int> workers_exited_ = 0;
std::atomic<int> tasks_executed_ = 0;
std::atomic<bool> all_tasks_done_ = false;
std::atomic<bool> shutdown_called_ = false;
std::mutex* mutex_;
std::condition_variable* task_done_cv_;
};
constexpr int kWorkerCount = 5;
void WaitForTasksToBeDone(TestWorkerPoolObserver* observer) {
auto timeout = 1s;
auto threshold = std::chrono::system_clock::now() + timeout;
std::unique_lock<std::mutex> lock(observer->mutex());
DEBUG_LOG(Test) << std::this_thread::get_id() << " Waiting for tasks.";
observer->task_done_cv().wait_for(lock, timeout, [&observer, threshold] {
// Either the tasks are done or we wait till the timeout.
if (observer->all_tasks_done())
return true;
return threshold < std::chrono::system_clock::now();
});
}
TEST(WorkerPool, PostTasks) {
// Enable for debugging the test.
// debug_ipc::SetDebugMode(true);
std::mutex mutex;
std::condition_variable task_done_cv;
TestWorkerPoolObserver observer(&mutex, &task_done_cv);
WorkerPool task_queue(kWorkerCount, &observer);
std::vector<bool> tasks_called;
for (int i = 0; i < kWorkerCount; i++) {
std::lock_guard<std::mutex> lock(observer.mutex());
tasks_called.push_back(false);
bool posted = task_queue.PostTask([i, &tasks_called, &observer]() {
std::lock_guard<std::mutex> guard(observer.mutex());
tasks_called[i] = true;
DEBUG_LOG(Test) << std::this_thread::get_id() << " First round: Task " << i << " executed.";
});
ASSERT_TRUE(posted) << "First round: while posting task " << i;
}
// Since we're not running, no tasks should've been run.
ASSERT_EQ(observer.workers_created(), 0);
// Will run until all the tasks have been completed or timeout.
task_queue.Run();
WaitForTasksToBeDone(&observer);
ASSERT_TRUE(observer.all_tasks_done());
ASSERT_EQ(observer.tasks_executed(), kWorkerCount);
// Should've created workers and not exited any.
ASSERT_TRUE(observer.workers_created() > 0);
ASSERT_EQ(observer.workers_exited(), 0);
// Should've run all the tasks.
ASSERT_EQ(observer.tasks_executed(), kWorkerCount);
for (int i = 0; i < kWorkerCount; i++) {
EXPECT_TRUE(tasks_called[i]) << "First Round: Task " << i << " not called.";
}
// We append some more tasks.
observer.Reset();
tasks_called.clear();
for (int i = 0; i < kWorkerCount; i++) {
std::lock_guard<std::mutex> lock(observer.mutex());
tasks_called.push_back(false);
bool posted = task_queue.PostTask([i, &tasks_called, &observer]() {
std::lock_guard<std::mutex> lock(observer.mutex());
tasks_called[i] = true;
DEBUG_LOG(Test) << std::this_thread::get_id() << " Second round: Task " << i << " executed.";
});
ASSERT_TRUE(posted) << "Second round: while posting task " << i;
}
WaitForTasksToBeDone(&observer);
ASSERT_TRUE(observer.all_tasks_done());
ASSERT_EQ(observer.tasks_executed(), kWorkerCount);
// Should've run all the tasks.
for (int i = 0; i < kWorkerCount; i++) {
EXPECT_TRUE(tasks_called[i]) << "Second Round: Task " << i << " not called.";
}
// No workers should've exited.
ASSERT_EQ(observer.workers_exited(), 0);
task_queue.Shutdown();
// Should've join all the workers.
EXPECT_TRUE(observer.shutdown_called());
EXPECT_EQ(observer.workers_exited(), observer.workers_created());
// Shouldn't be able to create tasks when shutdown.
EXPECT_FALSE(task_queue.PostTask({}));
}
// TestWorkerPoolObserver
// -------------------------------------------------------
void TestWorkerPoolObserver::OnWorkerCreation() { workers_created_++; }
void TestWorkerPoolObserver::OnWorkerExiting() { workers_exited_++; }
void TestWorkerPoolObserver::OnExecutingTask() {
std::lock_guard<std::mutex> lock(*mutex_);
tasks_executed_++;
DEBUG_LOG(Test) << std::this_thread::get_id() << " executed task count: " << tasks_executed_;
if (tasks_executed_ == kWorkerCount) {
DEBUG_LOG(Test) << "All tasks are done!";
all_tasks_done_ = true;
task_done_cv_->notify_one();
}
}
void TestWorkerPoolObserver::OnShutdown() { shutdown_called_ = true; };
void TestWorkerPoolObserver::Reset() {
std::lock_guard<std::mutex> lock(*mutex_);
tasks_executed_ = 0;
all_tasks_done_ = false;
}
} // namespace
} // namespace debug_ipc