blob: c142c7a1ce8582fa454dd2e890f39b4ef47f0d74 [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/async/time.h>
#include <lib/async_patterns/cpp/internal/task_queue.h>
#include <lib/sync/cpp/mutex.h>
#include <threads.h>
#include <zircon/assert.h>
#include <mutex>
namespace async_patterns::internal {
std::shared_ptr<TaskQueue> TaskQueue::Create(async_dispatcher_t* dispatcher,
const char* description) {
return std::make_shared<TaskQueue>(dispatcher, description);
}
TaskQueue::TaskQueue(async_dispatcher_t* dispatcher, const char* description)
: async_task_t({{ASYNC_STATE_INIT},
[](async_dispatcher_t*, async_task_t* task, zx_status_t status) {
static_cast<TaskQueue*>(task)->OnWake(status);
},
async_now(dispatcher)}),
dispatcher_(dispatcher),
checker_(dispatcher, description) {
std::lock_guard guard(checker_);
ZX_DEBUG_ASSERT(dispatcher_ != nullptr);
}
TaskQueue::~TaskQueue() {
std::lock_guard guard(mutex_);
ZX_ASSERT(stopped_);
ZX_ASSERT(!wake_pending_);
ZX_ASSERT(list_is_empty(&task_list_));
}
void TaskQueue::Add(std::unique_ptr<Task> task) {
list_node_t tasks;
bool ok;
{
std::lock_guard guard(mutex_);
if (stopped_) {
return;
}
list_add_tail(&task_list_, task.release());
ok = WakeDispatcher();
if (!ok) {
StopLocked();
list_move(&task_list_, &tasks);
}
}
if (!ok) {
DropTasks(&tasks);
}
}
void TaskQueue::Stop() {
std::lock_guard guard(checker_);
list_node_t tasks;
{
std::lock_guard mutex_guard(mutex_);
if (stopped_) {
return;
}
StopLocked();
list_move(&task_list_, &tasks);
}
DropTasks(&tasks);
}
void TaskQueue::OnWake(zx_status_t status) {
std::lock_guard outer(checker_);
list_node_t tasks;
// Harvest tasks under the lock.
{
std::lock_guard inner(mutex_);
wake_pending_ = false;
if (status != ZX_OK) {
StopLocked();
}
list_move(&task_list_, &tasks);
}
if (status != ZX_OK) {
DropTasks(&tasks);
} else {
RunTasks(&tasks);
}
}
void TaskQueue::StopLocked() {
ZX_DEBUG_ASSERT(!stopped_);
stopped_ = true;
CancelWakeDispatcher();
}
void TaskQueue::DropTasks(list_node_t* tasks) {
list_node_t* node = nullptr;
list_node_t* temp_node = nullptr;
list_for_every_safe(tasks, node, temp_node) {
list_delete(node);
auto* task = static_cast<Task*>(node);
delete task;
}
}
void TaskQueue::RunTasks(list_node_t* tasks) {
list_node_t* node = nullptr;
list_node_t* temp_node = nullptr;
list_for_every_safe(tasks, node, temp_node) {
list_delete(node);
auto* task = static_cast<Task*>(node);
task->Run();
delete task;
}
}
bool TaskQueue::WakeDispatcher() {
if (wake_pending_) {
return true;
}
zx_status_t status = async_post_task(dispatcher_, this);
ZX_DEBUG_ASSERT(status == ZX_OK || status == ZX_ERR_BAD_STATE);
return wake_pending_ = status == ZX_OK;
}
void TaskQueue::CancelWakeDispatcher() {
if (!wake_pending_) {
return;
}
zx_status_t status = async_cancel_task(dispatcher_, this);
ZX_DEBUG_ASSERT(status == ZX_OK);
wake_pending_ = false;
}
} // namespace async_patterns::internal