blob: 51a79b8db7e371570cd2a64ba153e255ee7b60a4 [file] [log] [blame] [edit]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "lib/closure-queue/closure_queue.h"
#include <lib/async/cpp/task.h>
#include <lib/async/dispatcher.h>
#include <zircon/assert.h>
// Convenience constructor: equivalent to default construction followed by a
// SetDispatcher() call.
ClosureQueue::ClosureQueue(async_dispatcher_t* dispatcher, thrd_t dispatcher_thread) {
  SetDispatcher(dispatcher, dispatcher_thread);
}
// A default-constructed ClosureQueue has no Impl yet; SetDispatcher() must be
// called before any other method is used.
ClosureQueue::ClosureQueue() = default;
// Binds this queue to |dispatcher| / |dispatcher_thread| by creating the
// shared Impl. At most one SetDispatcher() call is permitted per instance.
void ClosureQueue::SetDispatcher(async_dispatcher_t* dispatcher, thrd_t dispatcher_thread) {
  // A second call would leak/replace the existing Impl's state; forbid it.
  ZX_DEBUG_ASSERT(!impl_);
  impl_ = ClosureQueue::Impl::Create(dispatcher, dispatcher_thread);
}
ClosureQueue::~ClosureQueue() {
  // Make sure the queue is stopped and all pending closures are dropped.
  // No-op if SetDispatcher() was never called, or if already stopped.
  if (impl_) {
    impl_->StopAndClear();
  }
  // Dropping impl_ only releases this refcount on Impl; the Impl itself is
  // fully deleted after every task posted by Impl::Enqueue() (each of which
  // runs TryRunAll() and holds its own shared_ptr) has also been deleted.
}
// Queues |to_run| to be executed on the dispatcher thread.
void ClosureQueue::Enqueue(fit::closure to_run) {
  ZX_DEBUG_ASSERT(impl_);
  // impl_ is passed along so Impl can keep itself alive (via a captured
  // shared_ptr) until any posted runner task has completed.
  impl_->Enqueue(impl_, std::move(to_run));
}
// Stops the queue and discards any closures that haven't run yet; forwards to
// Impl::StopAndClear() (which is idempotent).
void ClosureQueue::StopAndClear() {
  ZX_DEBUG_ASSERT(impl_);
  impl_->StopAndClear();
}
// Returns whether the queue has been stopped. A queue that hasn't been
// SetDispatcher()'ed yet is not considered stopped.
bool ClosureQueue::is_stopped() {
  return impl_ && impl_->is_stopped();
}
// Runs exactly one queued closure on the calling thread, blocking until one is
// available; forwards to Impl::RunOneHere().
void ClosureQueue::RunOneHere() {
  ZX_DEBUG_ASSERT(impl_);
  impl_->RunOneHere();
}
// Returns the thread this queue dispatches on; only valid after
// SetDispatcher() has been called.
thrd_t ClosureQueue::dispatcher_thread() {
  ZX_DEBUG_ASSERT(impl_);
  return impl_->dispatcher_thread();
}
// Factory for the ref-counted Impl.
std::shared_ptr<ClosureQueue::Impl> ClosureQueue::Impl::Create(async_dispatcher_t* dispatcher,
                                                               thrd_t dispatcher_thread) {
  // NOTE(review): plain shared_ptr construction rather than std::make_shared —
  // presumably because the Impl constructor isn't accessible to make_shared;
  // confirm against the class declaration before changing.
  return std::shared_ptr<ClosureQueue::Impl>(new ClosureQueue::Impl(dispatcher, dispatcher_thread));
}
ClosureQueue::Impl::Impl(async_dispatcher_t* dispatcher, thrd_t dispatcher_thread)
    : dispatcher_(dispatcher), dispatcher_thread_(dispatcher_thread) {
  // Both a dispatcher and its thread identity are required up front.
  ZX_DEBUG_ASSERT(dispatcher_);
  ZX_DEBUG_ASSERT(dispatcher_thread);
}
ClosureQueue::Impl::~Impl() {
  // dispatcher_ is set to nullptr by StopAndClear(), which ~ClosureQueue runs
  // before dropping its impl_ reference — so by the time the last shared
  // reference goes away, the queue must already have been stopped.
  ZX_DEBUG_ASSERT(!dispatcher_);
}
// Queues |to_run| for the dispatcher thread. |self_shared| is a shared
// reference to this Impl; it is captured by the posted runner lambda so the
// Impl stays alive until that task has run (or been destroyed).
void ClosureQueue::Impl::Enqueue(std::shared_ptr<Impl> self_shared, fit::closure to_run) {
  std::lock_guard<std::mutex> lock(lock_);
  if (!dispatcher_) {
    // Already stopped: silently drop the closure. This path avoids
    // ClosureQueue being overly picky about whether StopAndClear() is run
    // before vs. after stopping other threads from calling Enqueue(), but it's
    // of course still up to client code to ensure that ClosureQueue is
    // valid/alive at the time that Enqueue is called on ClosureQueue. In other
    // words, this path doesn't change the fact that lifetime protection in
    // ClosureQueue is limited to the self_shared that's captured by a posted
    // lambda further down in this method.
    //
    // ~to_run
    return;
  }
  bool was_empty = pending_.empty();
  pending_.emplace(std::move(to_run));
  // We intentionally re-post any time the queue bounced off empty, so that the
  // posted runner task isn't forced to keep re-checking-for/accepting
  // additional tasks, which might tend to starve out other work.
  if (was_empty) {
    // Wake any RunOneHere() caller blocked waiting for work.
    pending_not_empty_condition_.notify_all();
    // Posting to a dispatcher under a lock is necessary here because otherwise
    // the dispatcher can already be deleted.
    zx_status_t result = async::PostTask(dispatcher_, [self_shared] {
      // Will just return if StopAndClear() has already run.
      self_shared->TryRunAll();
      // ~self_shared, which may run ~Impl if StopAndClear() has run.
    });
    ZX_ASSERT(result == ZX_OK);
  }
  // ~lock
}
// Stops the queue and discards all pending closures without running them.
// Idempotent; the first (effective) call must happen on dispatcher_thread_.
void ClosureQueue::Impl::StopAndClear() {
  // Declared before |lock| on purpose: C++ destroys locals in reverse order,
  // so these queues (and the closures they hold) are destroyed only after the
  // lock is released — see the ordering comment at the bottom.
  std::queue<fit::closure> local_pending;
  std::queue<fit::closure> local_pending_on_dispatcher_thread;
  std::lock_guard<std::mutex> lock(lock_);
  if (!dispatcher_) {
    // Idempotent; already stopped and cleared.
    return;
  }
  // We only enforce that the first call to StopAndClear() that actually stops
  // is on the dispatcher_thread_. It's fine to ~ClosureQueue on a different
  // thread as long as we've previously run StopAndClear() on
  // dispatcher_thread_.
  ZX_DEBUG_ASSERT(thrd_current() == dispatcher_thread_);
  // Move pending closures out so their destructors run outside the lock.
  local_pending.swap(pending_);
  local_pending_on_dispatcher_thread.swap(pending_on_dispatcher_thread_);
  // Clearing dispatcher_ is what marks the queue as stopped.
  dispatcher_ = nullptr;
  // The order of these destructors is intentional, as we don't want to be
  // holding the lock while calling or deleting to_run(s):
  //
  // ~lock
  // ~local_pending_on_dispatcher_thread
  // ~local_pending
}
bool ClosureQueue::Impl::is_stopped() {
std::lock_guard<std::mutex> lock(lock_);
return !dispatcher_;
}
// We intentionally only pick up tasks that were in the queue at the start of
// TryRunAll(), and not tasks added while TryRunAll() is running, so that other
// unrelated tasks/work that needs to run on the current thread has a chance to
// run.
void ClosureQueue::Impl::TryRunAll() {
  ZX_DEBUG_ASSERT(thrd_current() == dispatcher_thread_);
  { // scope lock
    std::lock_guard<std::mutex> lock(lock_);
    // StopAndClear() can only be called on the dispatcher_thread_, so we're
    // actually safe from that method's actions (even without holding lock_),
    // but to make FXL_GUARDED_BY() happy we check dispatcher_ while holding
    // lock_.
    if (!dispatcher_) {
      return;
    }
    ZX_DEBUG_ASSERT(dispatcher_);
    ZX_DEBUG_ASSERT(pending_on_dispatcher_thread_.empty());
    // Snapshot the currently-pending tasks; new Enqueue()s go to pending_ and
    // will be handled by a later posted TryRunAll().
    pending_on_dispatcher_thread_.swap(pending_);
    // pending_on_dispatcher_thread_ can be empty at this point, but only if
    // RunOneHere() was used (it consumes directly from pending_).
  } // ~lock
  while (!pending_on_dispatcher_thread_.empty()) {
    fit::closure local_to_run =
        std::move(pending_on_dispatcher_thread_.front());
    pending_on_dispatcher_thread_.pop();
    // Run outside the lock; the closure may call back into this queue.
    local_to_run();
    // local_to_run() may have run StopAndClear().
    if (is_stopped()) {
      // StopAndClear() clears both pending_ and pending_on_dispatcher_thread_.
      ZX_DEBUG_ASSERT(pending_on_dispatcher_thread_.empty());
      break;
    }
  }
}
void ClosureQueue::Impl::RunOneHere() {
ZX_DEBUG_ASSERT(thrd_current() == dispatcher_thread_);
fit::closure local_to_run;
{ // scope lock
std::unique_lock<std::mutex> lock(lock_);
ZX_DEBUG_ASSERT(dispatcher_);
while (pending_.empty()) {
pending_not_empty_condition_.wait(lock);
}
local_to_run = std::move(pending_.front());
pending_.pop();
} // ~lock
local_to_run();
}
// Returns the thread this Impl dispatches on (fixed at construction).
thrd_t ClosureQueue::Impl::dispatcher_thread() {
  ZX_DEBUG_ASSERT(dispatcher_thread_);
  return dispatcher_thread_;
}