blob: 9d609f92c8e8f83bfe0e261b8f3681ed7143a5a3 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <stdio.h>
#include <zircon/syscalls.h>
#include <zircon/types.h>
#include <memory>
#include <io-scheduler/io-scheduler.h>
#include <io-scheduler/worker.h>
namespace ioscheduler {
zx_status_t Worker::Create(Scheduler* sched, uint32_t id, std::unique_ptr<Worker>* out) {
fbl::AllocChecker ac;
std::unique_ptr<Worker> worker(new (&ac) Worker(sched, id));
if (!ac.check()) {
fprintf(stderr, "Failed to allocate worker.\n");
return ZX_ERR_NO_MEMORY;
}
if (thrd_create_with_name(&worker->thread_, worker->ThreadEntry, worker.get(), "io-worker") !=
thrd_success) {
fprintf(stderr, "Failed to create worker thread.\n");
return ZX_ERR_NO_MEMORY;
}
worker->thread_started_ = true;
*out = std::move(worker);
return ZX_OK;
}
Worker::Worker(Scheduler* sched, uint32_t id) : sched_(sched), id_(id) {}
Worker::~Worker() {
if (thread_started_) {
thrd_join(thread_, nullptr);
}
}
int Worker::ThreadEntry(void* arg) {
Worker* w = static_cast<Worker*>(arg);
w->WorkerLoop();
return 0;
}
void Worker::DoAcquire() {
ZX_DEBUG_ASSERT(!input_closed_);
SchedulerClient* client = sched_->client();
const size_t max_ops = 10;
zx_status_t status;
size_t acquire_count = 0;
StreamOp* op_list[max_ops];
status = client->Acquire(op_list, max_ops, &acquire_count, true);
if (status == ZX_ERR_CANCELED) {
// No more ops to read. Drain the streams and exit.
input_closed_ = true;
return;
}
if (status != ZX_OK) {
fprintf(stderr, "ioworker %u: Unexpected return status from Acquire() %d\n", id_, status);
client->Fatal();
input_closed_ = true;
return;
}
// Containerize all ops for safety.
UniqueOp uop_list[max_ops];
for (size_t i = 0; i < acquire_count; i++) {
uop_list[i].set(op_list[i]);
}
// Enqueue ops in the scheduler's priority queue.
size_t num_error = 0;
sched_->Enqueue(uop_list, acquire_count, uop_list, &num_error);
// Any ops remaining in the list have encountered an error and should be released.
for (size_t i = 0; i < num_error; i++) {
client->Release(uop_list[i].release());
}
}
void Worker::ExecuteLoop() {
ZX_DEBUG_ASSERT(!cancelled_);
SchedulerClient* client = sched_->client();
zx_status_t status;
for (;;) {
// Fetch an op.
UniqueOp op;
status = sched_->Dequeue(false, &op);
if (status == ZX_ERR_SHOULD_WAIT) {
// No more ops in scheduler, acquire more.
break;
} else if (status == ZX_ERR_CANCELED) {
// Shutdown initiated.
cancelled_ = true;
break;
}
ZX_DEBUG_ASSERT(status == ZX_OK);
if (op->is_deferred()) {
// Op completion has been deferred. Release it now.
sched_->ReleaseOp(std::move(op));
continue;
}
// Execute it.
status = client->Issue(op.get());
if (status == ZX_ERR_ASYNC) {
// Op queued for async completion. Released when completed.
// Op is retained in stream.
op.release();
continue;
}
if (status != ZX_OK) {
fprintf(stderr, "ioworker %u: Unexpected return status from Issue() %d\n", id_, status);
// Mark op as failed.
op->set_result(ZX_ERR_IO);
}
sched_->ReleaseOp(std::move(op));
}
}
void Worker::WorkerLoop() {
while ((!input_closed_) || (!cancelled_)) {
// Fetch ops from the client.
if (!input_closed_) {
DoAcquire();
}
// Drain the priority queue.
if (!cancelled_) {
ExecuteLoop();
}
}
}
} // namespace ioscheduler