blob: 3c51c1385e39764063434e2fc1c5224e83db1bbf [file] [log] [blame]
/*
* Copyright (c) 2019 The Fuchsia Authors
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
* SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
* OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
* CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include "workqueue.h"
#include "debug.h"
#define WORKQUEUE_SIGNAL ZX_USER_SIGNAL_0
WorkQueue::WorkQueue(const char* name) : list_{}, current_(nullptr), work_ready_{}, thread_{} {
if (name == nullptr) {
strlcpy(this->name_, "nameless", sizeof(name_));
} else {
strlcpy(this->name_, name, sizeof(name_));
}
StartWorkQueue();
}
WorkQueue::~WorkQueue() {
auto work = WorkItem([](WorkItem* work) { thrd_exit(0); });
Schedule(&work);
thrd_join(thread_, nullptr);
}
WorkQueue& WorkQueue::DefaultInstance() {
static WorkQueue default_workqueue_("default_workqueue");
return default_workqueue_;
}
void WorkQueue::ScheduleDefault(WorkItem* work) {
if (work == nullptr) {
return;
}
DefaultInstance().Schedule(work);
}
void WorkQueue::Flush() {
auto work = WorkItem([](WorkItem* work) {});
zx_status_t result;
result = zx_event_create(0, &work.signaler);
if (result != ZX_OK) {
BRCMF_ERR("Failed to create signal (work not canceled)");
return;
}
Schedule(&work);
zx_signals_t observed;
result = zx_object_wait_one(work.signaler, WORKQUEUE_SIGNAL, ZX_TIME_INFINITE, &observed);
if (result != ZX_OK || (observed & WORKQUEUE_SIGNAL) == 0) {
BRCMF_ERR("Bad return from wait (work likely not flushed): result %d, observed %x", result,
observed);
}
zx_handle_close(work.signaler);
}
void WorkQueue::Schedule(WorkItem* work) {
if (work == nullptr) {
return;
}
list_node_t* node;
lock_.lock();
if (current_ == work) {
lock_.unlock();
return;
}
list_for_every(&list_, node) {
if (node == &work->item) {
lock_.unlock();
return;
}
}
work->workqueue = this;
list_add_tail(&list_, &work->item);
sync_completion_signal(&work_ready_);
lock_.unlock();
}
int WorkQueue::Runner() {
while (true) {
// When all the works are consumed, these two lines will block the thread.
sync_completion_wait(&work_ready_, ZX_TIME_INFINITE);
sync_completion_reset(&work_ready_);
list_node_t* item;
lock_.lock();
item = list_remove_head(&list_);
current_ = (item == nullptr) ? nullptr : containerof(item, WorkItem, item);
lock_.unlock();
while (current_ != nullptr) {
current_->handler(current_);
lock_.lock();
if (current_->signaler != ZX_HANDLE_INVALID) {
zx_object_signal(current_->signaler, 0, WORKQUEUE_SIGNAL);
}
item = list_remove_head(&list_);
current_ = (item == nullptr) ? nullptr : containerof(item, WorkItem, item);
lock_.unlock();
}
}
}
void WorkQueue::StartWorkQueue() {
work_ready_ = {};
list_initialize(&list_);
auto thread_func = [](void* arg) { return reinterpret_cast<WorkQueue*>(arg)->Runner(); };
thrd_create_with_name(&thread_, thread_func, this, name_);
}
WorkItem::WorkItem() : WorkItem(nullptr) {}
WorkItem::WorkItem(void (*handler)(WorkItem* work))
: handler(handler), signaler(ZX_HANDLE_INVALID), workqueue(nullptr) {
list_initialize(&item);
}
void WorkItem::Cancel() {
WorkQueue* wq = workqueue;
if (wq == nullptr) {
return;
}
zx_status_t result;
wq->lock_.lock();
if (wq->current_ == this) {
result = zx_event_create(0, &signaler);
wq->lock_.unlock();
if (result != ZX_OK) {
BRCMF_ERR("Failed to create signal (work not canceled)");
return;
}
zx_signals_t observed;
result = zx_object_wait_one(signaler, WORKQUEUE_SIGNAL, ZX_TIME_INFINITE, &observed);
if (result != ZX_OK || (observed & WORKQUEUE_SIGNAL) == 0) {
BRCMF_ERR("Bad return from wait (work likely not canceled): result %d, observed %x", result,
observed);
}
wq->lock_.lock();
zx_handle_close(signaler);
signaler = ZX_HANDLE_INVALID;
wq->lock_.unlock();
return;
} else {
list_node_t* node;
list_node_t* temp_node;
list_for_every_safe(&(wq->list_), node, temp_node) {
if (node == &item) {
list_delete(node);
wq->lock_.unlock();
return;
}
}
wq->lock_.unlock();
BRCMF_DBG(TEMP, "Work to be canceled not found");
}
}