blob: 6400d218adeeddebfb187878d7d1db9c72d6c54f [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "data-assigner.h"
#include "minfs-private.h"
namespace minfs {
DataBlockAssigner::~DataBlockAssigner() {
if (thrd_) {
{
fbl::AutoLock lock(&lock_);
unmounting_ = true;
data_cvar_.Signal();
}
int r;
thrd_join(thrd_.value(), &r);
ZX_DEBUG_ASSERT(r == 0);
}
ZX_DEBUG_ASSERT(IsEmpty());
}
zx_status_t DataBlockAssigner::Create(TransactionalFs* minfs,
fbl::unique_ptr<DataBlockAssigner>* out) {
fbl::unique_ptr<DataBlockAssigner> processor(new DataBlockAssigner(minfs));
processor->thrd_ = std::make_optional<thrd_t>();
if (thrd_create_with_name(&processor->thrd_.value(), DataThread, processor.get(),
"minfs-data-async") != thrd_success) {
return ZX_ERR_NO_RESOURCES;
}
*out = std::move(processor);
return ZX_OK;
}
void DataBlockAssigner::EnqueueCallback(TaskCallback task) {
fbl::AutoLock lock(&lock_);
ReserveTask(std::move(task));
data_cvar_.Signal();
}
bool DataBlockAssigner::TasksWaiting() const {
fbl::AutoLock lock(&lock_);
return waiting_ > 0;
}
void DataBlockAssigner::ProcessNext() {
ZX_DEBUG_ASSERT(!IsEmpty());
std::optional<TaskCallback> task;
task_queue_[start_].swap(task);
ZX_DEBUG_ASSERT(task.has_value());
lock_.Release();
(*task)(minfs_);
task = nullptr;
lock_.Acquire();
if (waiting_ > 0) {
sync_cvar_.Signal();
}
// Update the queue.
start_ = (start_ + 1) % kMaxQueued;
count_--;
}
void DataBlockAssigner::ReserveTask(TaskCallback task) {
EnsureQueueSpace();
ZX_DEBUG_ASSERT(count_ < kMaxQueued);
count_++;
uint32_t task_index = (start_ + count_ - 1) % kMaxQueued;
ZX_DEBUG_ASSERT(!task_queue_[task_index].has_value());
task_queue_[task_index] = std::move(task);
ZX_DEBUG_ASSERT(task_queue_[task_index].has_value());
}
bool DataBlockAssigner::IsEmpty() const {
return count_ == 0;
}
void DataBlockAssigner::EnsureQueueSpace() {
ZX_DEBUG_ASSERT(count_ <= kMaxQueued);
while (count_ == kMaxQueued) {
waiting_++;
sync_cvar_.Wait(&lock_);
waiting_--;
}
ZX_DEBUG_ASSERT(count_ < kMaxQueued);
}
void DataBlockAssigner::ProcessLoop() {
fbl::AutoLock lock(&lock_);
while (true) {
while (!IsEmpty()) {
ProcessNext();
}
if (unmounting_) {
// Verify that the queue is empty.
ZX_DEBUG_ASSERT(IsEmpty());
break;
}
if (IsEmpty()) {
// If no updates have been queued, wait indefinitely until we are signalled.
data_cvar_.Wait(&lock_);
}
}
}
int DataBlockAssigner::DataThread(void* arg) {
DataBlockAssigner* assigner = reinterpret_cast<DataBlockAssigner*>(arg);
assigner->ProcessLoop();
return 0;
}
} // namespace minfs