blob: 8c76ce7f8a61f705984ef968ea19240f7c1085e1 [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef LIB_ASYNC_PATTERNS_CPP_INTERNAL_TASK_QUEUE_H_
#define LIB_ASYNC_PATTERNS_CPP_INTERNAL_TASK_QUEUE_H_
#include <lib/async/cpp/sequence_checker.h>
#include <lib/async/dispatcher.h>
#include <lib/async/task.h>
#include <lib/fit/function.h>
#include <lib/sync/cpp/mutex.h>
#include <zircon/listnode.h>
#include <atomic>
#include <memory>
namespace async_patterns::internal {
class TaskQueueHandle;
// |Task| implementations allocate their captured state along with an intrusive
// list node header on the heap. Since each task is of indeterminate size, this
// design ensures that each task uses exactly one heap allocation.
class Task : public list_node_t {
public:
Task() : list_node_t(LIST_INITIAL_CLEARED_VALUE) {}
virtual ~Task() = default;
virtual void Run() = 0;
template <typename Callable>
static std::unique_ptr<Task> Box(Callable&& callable) {
class TaskImpl final : public Task {
public:
explicit TaskImpl(Callable&& callable) : callable_(std::move(callable)) {}
void Run() final { callable_(); }
private:
Callable callable_;
};
return std::make_unique<TaskImpl>(std::forward<Callable>(callable));
}
};
// A task queue that offers thread-safe task insertion and inversion of control:
// tasks added to the queue are not immediately/synchronously run. Rather, they
// are run (or canceled) when the synchronize dispatcher comes around to
// process them.
//
// If the dispatcher is shut down, all tasks will be silently destroyed.
class TaskQueue : private async_task_t {
public:
// Constructs an empty task queue. |dispatcher| will be used to run
// tasks added to the queue.
//
// Thread safety: |TaskQueue| must be constructed on the associated
// synchronized dispatcher.
static std::shared_ptr<TaskQueue> Create(async_dispatcher_t* dispatcher, const char* description);
// Destroys the queue.
//
// Invariant: |Stop| must be called before destruction.
// Thread safety: |TaskQueue| may be destroyed on any thread.
~TaskQueue();
// Schedules a task onto the queue. Wakes the dispatcher if not already.
//
// Thread safety: |Add| can be called from any threads.
void Add(std::unique_ptr<Task> task);
// Atomically puts the queue into a state where no future tasks will be run.
// Tasks that are pending will be destroyed during |Stop|. Tasks added after
// |Stop| will be immediately destroyed in their respective |Add| calls.
//
// Cancels the pending wake in the dispatcher if any.
//
// Thread safety: |Stop| must be called from the synchronized dispatcher that
// is running the tasks.
void Stop();
explicit TaskQueue(async_dispatcher_t* dispatcher, const char* description);
TaskQueue(const TaskQueue&) = delete;
TaskQueue& operator=(const TaskQueue&) = delete;
TaskQueue(TaskQueue&&) = delete;
TaskQueue& operator=(TaskQueue&&) = delete;
private:
// |OnWake| is called from the synchronized dispatcher when there are tasks
// in the queue that need running.
void OnWake(zx_status_t status);
void StopLocked() __TA_REQUIRES(mutex_);
static void DropTasks(list_node_t* tasks) __TA_EXCLUDES(mutex_);
static void RunTasks(list_node_t* tasks);
bool WakeDispatcher() __TA_REQUIRES(mutex_);
void CancelWakeDispatcher() __TA_REQUIRES(mutex_);
async_dispatcher_t* const dispatcher_;
const async::synchronization_checker checker_;
libsync::Mutex mutex_;
// A list of |Task|.
list_node_t task_list_ __TA_GUARDED(mutex_) = LIST_INITIAL_VALUE(task_list_);
bool stopped_ __TA_GUARDED(mutex_) = false;
// Whether |OnWake| will be called by the dispatcher at some point.
bool wake_pending_ __TA_GUARDED(mutex_) = false;
};
// |TaskQueueHandle| are references vended out to possibly arbitrary threads
// that let them add tasks to the underlying queue. |TaskQueueHandle|s may be
// held on to indefinitely. The added tasks might be discarded if the underlying
// queue is stopped.
class TaskQueueHandle {
public:
// Adds a |task| to the referenced task queue.
template <typename Callable>
void Add(Callable&& task) const {
queue_->Add(Task::Box(std::forward<Callable>(task)));
}
bool has_value() const { return queue_ != nullptr; }
void reset() { queue_.reset(); }
explicit TaskQueueHandle(std::shared_ptr<TaskQueue> queue) : queue_(std::move(queue)) {}
private:
// The |TaskQueue| will clear out all of its expensive internal state before
// destruction. Thus holding onto a stopped |TaskQueue| object is cheap and
// reduces the atomic overhead of regular task posting compared to |weak_ptr|.
std::shared_ptr<TaskQueue> queue_;
};
// A type that submits tasks using |TaskQueueHandle| that is meant to work
// together with |PendingCall|.
class SubmitWithTaskQueueHandle {
public:
explicit SubmitWithTaskQueueHandle(TaskQueueHandle handle) : handle_(std::move(handle)) {}
template <typename Task>
void operator()(Task&& task) {
handle_.Add(std::forward<Task>(task));
reset();
}
bool has_value() const { return handle_.has_value(); }
void reset() { handle_.reset(); }
private:
TaskQueueHandle handle_;
};
} // namespace async_patterns::internal
#endif // LIB_ASYNC_PATTERNS_CPP_INTERNAL_TASK_QUEUE_H_