blob: ce56839f80472c076c65a7a01f1d0ccc6513f931 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef LIB_CLOSURE_QUEUE_CLOSURE_QUEUE_H_
#define LIB_CLOSURE_QUEUE_CLOSURE_QUEUE_H_
#include <lib/async/dispatcher.h>
#include <lib/fit/function.h>
#include <threads.h>
#include <zircon/compiler.h>
#include <memory>
#include <mutex>
#include <queue>
class ClosureQueue {
public:
// This can be called on any thread. Do not call SetDispatcher() after this
// constructor.
ClosureQueue(async_dispatcher_t* dispatcher, thrd_t dispatcher_thread);
// Must call SetDispatcher() before using the queue.
ClosureQueue();
void SetDispatcher(async_dispatcher_t* dispatcher, thrd_t dispatcher_thread);
// This must be called only on dispatcher_thread.
~ClosureQueue();
// If StopAndClear() isn't called yet, runs to_run on dispatcher.
// If StopAndClear() has already been called, deletes to_run on this thread.
// If StopAndClear() is called after Enqueue() returns but before to_run has
// been run on dispatcher, deletes to_run on thread that calls StopAndClear().
//
// If run, to_run will run using the dispatcher, on the dispatcher_thread.
//
// This can be called on any thread.
//
// TODO(dustingreen): Consider adding an over-full threshold that permits
// client code to notice when the queue is more full than expected (more full
// than makes any sense for a given protocol).
void Enqueue(fit::closure to_run);
// This can only be called on the dispatcher_thread. This prevents any
// additional calls to Enqueue() from actually enqueing anything, and deletes
// any previously-queued tasks that haven't already run.
//
// This is idempotent, and will run automatically at the start of
// ~ClosureQueue, but client code is encouraged to call StopAndClear() earlier
// than ~ClosureQueue if that helps ensure that all the captures of all the
// queued lambdas will still be fully usable up to the point where
// StopAndClear() is called.
//
// This must be called only on dispatcher_thread.
void StopAndClear();
bool is_stopped();
// The DDK has some sync calls such as DdkSuspend(). By allowing the DDK's
// main dispatcher thread to pump the queue, the code that calls Enqueue() can
// be simpler.
//
// The calling thread must be the dispatcher_thread.
//
// This method will wait for at least one task to be in the queue, and then
// run exactly one task (here on the dispatcher_thread that's calling this
// method), then will return to the caller.
//
// TODO(dustingreen): If DdkSuspend() and similar become async, consider
// removing this method.
void RunOneHere();
thrd_t dispatcher_thread();
private:
class Impl {
public:
static std::shared_ptr<Impl> Create(async_dispatcher_t* dispatcher, thrd_t dispatcher_thread);
~Impl();
void Enqueue(std::shared_ptr<Impl> self_shared, fit::closure to_run);
void StopAndClear();
bool is_stopped();
void RunOneHere();
thrd_t dispatcher_thread();
private:
Impl(async_dispatcher_t* dispatcher, thrd_t dispatcher_thread);
void TryRunAll();
std::mutex lock_;
// Starts non-nullptr. Set to nullptr to indicate that StopAndClear() has
// run.
//
// TODO(dustingreen): __TA_GUARDED(lock_), when I can figure out why it doesn't seem to work.
// __TA_GUARDED(lock_)
async_dispatcher_t* dispatcher_ = {};
const thrd_t dispatcher_thread_ = {};
// TODO(dustingreen): __TA_GUARDED(lock_), when I can figure out why it doesn't seem to work.
// __TA_GUARDED(lock_)
std::queue<fit::closure> pending_;
// Only touched on dispatcher_thread_. This is a member so that StopAndClear() will really
// clear synchronously.
std::queue<fit::closure> pending_on_dispatcher_thread_;
std::condition_variable pending_not_empty_condition_;
};
std::shared_ptr<Impl> impl_;
};
#endif // LIB_CLOSURE_QUEUE_CLOSURE_QUEUE_H_