| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "work-queue.h" |
| #include "minfs-private.h" |
| |
| namespace minfs { |
| |
| WorkQueue::~WorkQueue() { |
| if (thrd_) { |
| { |
| fbl::AutoLock lock(&lock_); |
| unmounting_ = true; |
| data_cvar_.Signal(); |
| } |
| |
| int r; |
| thrd_join(thrd_.value(), &r); |
| ZX_DEBUG_ASSERT(r == 0); |
| } |
| |
| ZX_DEBUG_ASSERT(IsEmpty()); |
| } |
| |
| zx_status_t WorkQueue::Create(TransactionalFs* minfs, fbl::unique_ptr<WorkQueue>* out) { |
| fbl::unique_ptr<WorkQueue> processor(new WorkQueue(minfs)); |
| |
| processor->thrd_ = std::make_optional<thrd_t>(); |
| |
| if (thrd_create_with_name(&processor->thrd_.value(), DataThread, processor.get(), |
| "minfs-data-async") != thrd_success) { |
| return ZX_ERR_NO_RESOURCES; |
| } |
| |
| *out = std::move(processor); |
| return ZX_OK; |
| } |
| |
| void WorkQueue::EnqueueCallback(TaskCallback task) { |
| fbl::AutoLock lock(&lock_); |
| ReserveTask(std::move(task)); |
| data_cvar_.Signal(); |
| } |
| |
| bool WorkQueue::TasksWaiting() const { |
| fbl::AutoLock lock(&lock_); |
| return waiting_ > 0; |
| } |
| |
| void WorkQueue::ProcessNext() { |
| ZX_DEBUG_ASSERT(!IsEmpty()); |
| std::optional<TaskCallback> task; |
| task_queue_[start_].swap(task); |
| ZX_DEBUG_ASSERT(task.has_value()); |
| |
| lock_.Release(); |
| (*task)(minfs_); |
| task = nullptr; |
| lock_.Acquire(); |
| |
| if (waiting_ > 0) { |
| sync_cvar_.Signal(); |
| } |
| |
| // Update the queue. |
| start_ = (start_ + 1) % kMaxQueued; |
| count_--; |
| } |
| |
| void WorkQueue::ReserveTask(TaskCallback task) { |
| EnsureQueueSpace(); |
| ZX_DEBUG_ASSERT(count_ < kMaxQueued); |
| count_++; |
| uint32_t task_index = (start_ + count_ - 1) % kMaxQueued; |
| ZX_DEBUG_ASSERT(!task_queue_[task_index].has_value()); |
| task_queue_[task_index] = std::move(task); |
| ZX_DEBUG_ASSERT(task_queue_[task_index].has_value()); |
| } |
| |
| bool WorkQueue::IsEmpty() const { |
| return count_ == 0; |
| } |
| |
| void WorkQueue::EnsureQueueSpace() { |
| ZX_DEBUG_ASSERT(count_ <= kMaxQueued); |
| while (count_ == kMaxQueued) { |
| waiting_++; |
| sync_cvar_.Wait(&lock_); |
| waiting_--; |
| } |
| ZX_DEBUG_ASSERT(count_ < kMaxQueued); |
| } |
| |
| void WorkQueue::ProcessLoop() { |
| fbl::AutoLock lock(&lock_); |
| while (true) { |
| while (!IsEmpty()) { |
| ProcessNext(); |
| } |
| |
| if (unmounting_) { |
| // Verify that the queue is empty. |
| ZX_DEBUG_ASSERT(IsEmpty()); |
| break; |
| } |
| |
| if (IsEmpty()) { |
| // If no updates have been queued, wait indefinitely until we are signalled. |
| data_cvar_.Wait(&lock_); |
| } |
| } |
| } |
| |
| int WorkQueue::DataThread(void* arg) { |
| WorkQueue* assigner = static_cast<WorkQueue*>(arg); |
| assigner->ProcessLoop(); |
| return 0; |
| } |
| |
| } // namespace minfs |