blob: 4289b2b19b0826b14f6e60d0a76dd854b8f354af [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "dispatcher.h"
#include <fidl/fuchsia.scheduler/cpp/wire.h>
#include <lib/async/dispatcher.h>
#include <lib/async/irq.h>
#include <lib/async/receiver.h>
#include <lib/async/sequence_id.h>
#include <lib/async/task.h>
#include <lib/async/trap.h>
#include <lib/component/incoming/cpp/protocol.h>
#include <lib/fdf/dispatcher.h>
#include <lib/fit/defer.h>
#include <lib/trace/event.h>
#include <lib/zx/clock.h>
#include <lib/zx/thread.h>
#include <stdlib.h>
#include <threads.h>
#include <zircon/assert.h>
#include <zircon/errors.h>
#include <zircon/listnode.h>
#include <zircon/status.h>
#include <zircon/syscalls.h>
#include <memory>
#include <string>
#include <fbl/auto_lock.h>
#include <fbl/intrusive_double_list.h>
#include <fbl/ref_ptr.h>
#include "src/devices/bin/driver_runtime/callback_request.h"
#include "src/devices/bin/driver_runtime/driver_context.h"
#include "src/devices/lib/log/log.h"
namespace driver_runtime {
namespace {
// Operation table handed to the async library for every driver runtime |Dispatcher|.
// Each supported entry downcasts the |async_dispatcher_t| back to the |Dispatcher| and forwards
// to the matching member function. Guest bell traps and paged VMOs are reported as unsupported.
const async_ops_t g_dispatcher_ops = {
    .version = ASYNC_OPS_V3,
    .reserved = 0,
    .v1 =
        {
            .now = [](async_dispatcher_t* self) { return static_cast<Dispatcher*>(self)->GetTime(); },
            .begin_wait =
                [](async_dispatcher_t* self, async_wait_t* wait) {
                  return static_cast<Dispatcher*>(self)->BeginWait(wait);
                },
            .cancel_wait =
                [](async_dispatcher_t* self, async_wait_t* wait) {
                  return static_cast<Dispatcher*>(self)->CancelWait(wait);
                },
            .post_task =
                [](async_dispatcher_t* self, async_task_t* task) {
                  return static_cast<Dispatcher*>(self)->PostTask(task);
                },
            .cancel_task =
                [](async_dispatcher_t* self, async_task_t* task) {
                  return static_cast<Dispatcher*>(self)->CancelTask(task);
                },
            .queue_packet =
                [](async_dispatcher_t* self, async_receiver_t* receiver,
                   const zx_packet_user_t* data) {
                  return static_cast<Dispatcher*>(self)->QueuePacket(receiver, data);
                },
            // Not supported: every argument is ignored.
            .set_guest_bell_trap = [](async_dispatcher_t*, async_guest_bell_trap_t*, zx_handle_t,
                                      zx_vaddr_t, size_t) { return ZX_ERR_NOT_SUPPORTED; },
        },
    .v2 =
        {
            .bind_irq =
                [](async_dispatcher_t* self, async_irq_t* irq) {
                  return static_cast<Dispatcher*>(self)->BindIrq(irq);
                },
            .unbind_irq =
                [](async_dispatcher_t* self, async_irq_t* irq) {
                  return static_cast<Dispatcher*>(self)->UnbindIrq(irq);
                },
            // Not supported: every argument is ignored.
            .create_paged_vmo = [](async_dispatcher_t*, async_paged_vmo_t*, uint32_t, zx_handle_t,
                                   uint64_t, zx_handle_t*) { return ZX_ERR_NOT_SUPPORTED; },
            .detach_paged_vmo = [](async_dispatcher_t*,
                                   async_paged_vmo_t*) { return ZX_ERR_NOT_SUPPORTED; },
        },
    .v3 =
        {
            .get_sequence_id =
                [](async_dispatcher_t* self, async_sequence_id_t* out_sequence_id,
                   const char** out_error) {
                  return static_cast<Dispatcher*>(self)->GetSequenceId(out_sequence_id, out_error);
                },
            .check_sequence_id =
                [](async_dispatcher_t* self, async_sequence_id_t sequence_id,
                   const char** out_error) {
                  return static_cast<Dispatcher*>(self)->CheckSequenceId(sequence_id, out_error);
                },
        },
};
} // namespace
// Returns the single |DispatcherCoordinator| instance, constructing it on first use.
DispatcherCoordinator& GetDispatcherCoordinator() {
  static DispatcherCoordinator coordinator;
  return coordinator;
}
// Wraps the user's |original_wait| in a runtime-owned wait that watches the same object and
// trigger, but whose handler is |AsyncWait::Handler|. The user's handler is invoked later, when
// the callback request registered here is dispatched.
Dispatcher::AsyncWait::AsyncWait(async_wait_t* original_wait, Dispatcher& dispatcher)
    : CallbackRequest(CallbackRequest::RequestType::kWait),
      async_wait_t{{ASYNC_STATE_INIT},
                   &Dispatcher::AsyncWait::Handler,
                   original_wait->object,
                   original_wait->trigger,
                   0},
      original_wait_(original_wait) {
  // Stash a back-pointer to this wrapper in a reserved slot of the user's wait, so the wrapper
  // can be found again from the user's |async_wait_t|.
  original_wait_->state.reserved[0] = reinterpret_cast<uintptr_t>(this);

  async_dispatcher_t* const user_dispatcher = dispatcher.GetAsyncDispatcher();
  driver_runtime::Callback invoke_user_handler =
      [this, user_dispatcher](std::unique_ptr<driver_runtime::CallbackRequest> request,
                              zx_status_t status) {
        // The wrapper is no longer reachable from the user's wait once the handler runs.
        original_wait_->state.reserved[0] = 0;
        zx_packet_signal_t* packet = nullptr;
        if (signal_packet_.has_value()) {
          packet = &signal_packet_.value();
        }
        original_wait_->handler(user_dispatcher, original_wait_, status, packet);
      };

  // This callback runs *after* |OnSignal|, which is the immediate callback invoked when the
  // async wait is signaled.
  SetCallback(static_cast<fdf_dispatcher_t*>(&dispatcher), std::move(invoke_user_handler),
              original_wait_);
}
// Asserts that the dispatcher reference was already released, i.e. the wait was either canceled
// or completed before being destroyed.
Dispatcher::AsyncWait::~AsyncWait() { ZX_ASSERT(dispatcher_ref_.load() == nullptr); }
// static
// Hands |wait| over to |dispatcher|'s wait list and starts it on the process shared dispatcher.
//
// The caller must hold the dispatcher's callback lock (the *Locked helpers are used).
//
// Returns ZX_OK if the wait was started. On failure the status of |async_begin_wait| is returned,
// the dispatcher reference held by the wait is released and the wait is removed from the
// dispatcher again.
zx_status_t Dispatcher::AsyncWait::BeginWait(std::unique_ptr<AsyncWait> wait,
                                             Dispatcher& dispatcher) {
  // Purposefully create a cycle which is broken in Cancel or OnSignal.
  // This needs to be done ahead of starting the async wait in case another thread on the dispatcher
  // signals the dispatcher.
  auto dispatcher_ref = fbl::RefPtr(&dispatcher);
  wait->dispatcher_ref_ = fbl::ExportToRawPtr(&dispatcher_ref);

  auto* wait_ref = wait.get();
  dispatcher.AddWaitLocked(std::move(wait));

  zx_status_t status = async_begin_wait(dispatcher.process_shared_dispatcher_, wait_ref);
  if (status != ZX_OK) {
    // Release the dispatcher reference *before* removing the wait from the dispatcher: removing
    // it drops the last owner of |wait_ref|, and the destructor asserts that the reference has
    // already been cleared. Touching |wait_ref| after the removal would be a use-after-free.
    fbl::ImportFromRawPtr(wait_ref->dispatcher_ref_.exchange(nullptr));
    dispatcher.RemoveWaitLocked(wait_ref);
    return status;
  }
  return ZX_OK;
}
bool Dispatcher::AsyncWait::Cancel() {
// We do a load here rather than an exchange as OnSignal may still be triggered and we need to
// avoid preventing it from accessing the |dispatcher_ref_|.
auto* dispatcher_ref = dispatcher_ref_.load();
if (dispatcher_ref == nullptr) {
// OnSignal was triggered in another thread.
return false;
}
auto dispatcher = fbl::RefPtr(dispatcher_ref);
auto status = async_cancel_wait(dispatcher->process_shared_dispatcher_, this);
if (status != ZX_OK) {
// OnSignal was triggered in another thread, or is about to be.
ZX_DEBUG_ASSERT(status == ZX_ERR_NOT_FOUND);
return false;
}
// It is now safe to recover the dispatcher reference.
dispatcher_ref = dispatcher_ref_.exchange(nullptr);
ZX_DEBUG_ASSERT(dispatcher_ref != nullptr);
fbl::ImportFromRawPtr(dispatcher_ref);
return true;
}
// static
// Trampoline installed as the |async_wait_t| handler; forwards to |OnSignal| on the wrapper.
void Dispatcher::AsyncWait::Handler(async_dispatcher_t* dispatcher, async_wait_t* wait,
                                    zx_status_t status, const zx_packet_signal_t* signal) {
  auto* const self = static_cast<AsyncWait*>(wait);
  self->OnSignal(dispatcher, status, signal);
}
// Immediate callback for a signaled wait: takes back the dispatcher reference stored in
// |dispatcher_ref_|, records the signal packet (if any) and queues the wait on the dispatcher.
void Dispatcher::AsyncWait::OnSignal(async_dispatcher_t* async_dispatcher, zx_status_t status,
                                     const zx_packet_signal_t* signal) {
  Dispatcher* const raw_dispatcher = dispatcher_ref_.exchange(nullptr);
  ZX_DEBUG_ASSERT(raw_dispatcher != nullptr);
  auto owner = fbl::ImportFromRawPtr(raw_dispatcher);

  if (signal != nullptr) {
    signal_packet_ = *signal;
  } else {
    signal_packet_.reset();
  }

  owner->QueueWait(this, status);
  owner->thread_pool()->OnThreadWakeup();
}
// Wraps the user's |original_irq| in a runtime-owned irq bound to the same object, whose handler
// is |AsyncIrq::Handler|.
Dispatcher::AsyncIrq::AsyncIrq(async_irq_t* original_irq, Dispatcher& dispatcher)
    : async_irq_t{{ASYNC_STATE_INIT}, &Dispatcher::AsyncIrq::Handler, original_irq->object},
      original_irq_(original_irq) {
  // Leave a back-pointer in the user's irq so |UnbindIrq| can map it to this wrapper.
  const auto self_address = reinterpret_cast<uintptr_t>(this);
  original_irq_->state.reserved[0] = self_address;
}
// Destroys the irq wrapper. Asserts that the wrapper no longer holds a dispatcher reference.
Dispatcher::AsyncIrq::~AsyncIrq() {
  // This shouldn't destruct until after the irq has been unbound, either by the user or
  // |ShutdownAsync|.
  ZX_ASSERT(dispatcher_ == nullptr);
}
// static
// Hands |irq| over to |dispatcher|'s irq list and binds it on the process shared dispatcher.
//
// The caller must hold the dispatcher's callback lock (the *Locked helpers are used).
//
// Returns ZX_OK if the irq was bound. On failure the status of |async_bind_irq| is returned and
// the irq wrapper is removed from the dispatcher and destroyed.
zx_status_t Dispatcher::AsyncIrq::Bind(std::unique_ptr<AsyncIrq> irq, Dispatcher& dispatcher) {
  // The AsyncIrq will hold the dispatcher reference until the irq is unbound.
  irq->SetDispatcherRef(fbl::RefPtr(&dispatcher));

  // |irq| is moved into the dispatcher below; only |irq_ref| may be used afterwards.
  auto* irq_ref = irq.get();
  dispatcher.AddIrqLocked(std::move(irq));

  zx_status_t status = async_bind_irq(dispatcher.process_shared_dispatcher_, irq_ref);
  if (status != ZX_OK) {
    // Take ownership back and keep the wrapper alive until its dispatcher reference has been
    // cleared, as the destructor asserts that no reference is held.
    std::unique_ptr<AsyncIrq> failed_irq = dispatcher.RemoveIrqLocked(irq_ref);
    ZX_ASSERT(failed_irq != nullptr);
    failed_irq->SetDispatcherRef(nullptr);
    return status;
  }
  return ZX_OK;
}
// Unbinds the irq from the process shared dispatcher.
// Returns false if the irq holds no dispatcher reference or if |async_unbind_irq| fails.
// On success the dispatcher reference and the back-pointer in the user's irq are cleared.
bool Dispatcher::AsyncIrq::Unbind() {
  auto bound_dispatcher = GetDispatcherRef();
  if (!bound_dispatcher) {
    return false;
  }
  if (async_unbind_irq(bound_dispatcher->process_shared_dispatcher_, this) != ZX_OK) {
    return false;
  }
  SetDispatcherRef(nullptr);
  original_irq_->state.reserved[0] = 0;
  return true;
}
// Builds a fresh irq-type callback request which, when called, invokes the user's irq handler
// with the most recently stored interrupt packet.
std::unique_ptr<driver_runtime::CallbackRequest> Dispatcher::AsyncIrq::CreateCallbackRequest(
    Dispatcher& dispatcher) {
  // TODO(https://fxbug.dev/42052990): We should consider something more efficient than creating a
  // callback request each time the irq is triggered. This is complex due to an AsyncIrq not having
  // a 1:1 mapping to interrupt callbacks, and we cannot easily return ownership of a
  // |CallbackRequest| after dispatching it. See https://fxbug.dev/42052990 for a longer
  // explanation.
  auto request =
      std::make_unique<driver_runtime::CallbackRequest>(CallbackRequest::RequestType::kIrq);

  async_dispatcher_t* const user_dispatcher = dispatcher.GetAsyncDispatcher();
  driver_runtime::Callback invoke_user_handler =
      [this, user_dispatcher](std::unique_ptr<driver_runtime::CallbackRequest> dispatched_request,
                              zx_status_t status) {
        // The reserved state is deliberately left intact: this AsyncIrq stays bound for future
        // interrupts.
        original_irq_->handler(user_dispatcher, original_irq_, status, &interrupt_packet_);
      };

  request->SetCallback(static_cast<fdf_dispatcher_t*>(&dispatcher), std::move(invoke_user_handler),
                       this);
  return request;
}
// static
// Trampoline installed as the |async_irq_t| handler; forwards to |OnSignal| on the wrapper.
void Dispatcher::AsyncIrq::Handler(async_dispatcher_t* dispatcher, async_irq_t* irq,
                                   zx_status_t status, const zx_packet_interrupt_t* packet) {
  auto* const self = static_cast<AsyncIrq*>(irq);
  self->OnSignal(dispatcher, status, packet);
}
// Immediate callback for a triggered irq: stores the interrupt packet and queues the irq on the
// owning dispatcher. Does nothing if the irq no longer holds a dispatcher reference.
void Dispatcher::AsyncIrq::OnSignal(async_dispatcher_t* global_dispatcher, zx_status_t status,
                                    const zx_packet_interrupt_t* packet) {
  fbl::RefPtr<Dispatcher> owner = GetDispatcherRef();
  if (owner) {
    interrupt_packet_ = *packet;
    // The irq lock is not held while calling |QueueIrq|, as that would cause incorrect lock
    // ordering.
    owner->QueueIrq(this, status);
    owner->thread_pool()->OnThreadWakeup();
  }
  // Otherwise the reference was cleared because the irq has already been unbound, but this irq
  // packet had already been pulled from the port; it must not be delivered to the user.
}
// Constructs a dispatcher that exposes |g_dispatcher_ops| through its |async_dispatcher_t| base.
//
// All arguments are stored as given; |name| is appended to |name_|. The constructor does not
// start any waits or register the dispatcher anywhere.
Dispatcher::Dispatcher(uint32_t options, std::string_view name, bool unsynchronized,
                       bool allow_sync_calls, const void* owner, ThreadPool* thread_pool,
                       async_dispatcher_t* process_shared_dispatcher,
                       fdf_dispatcher_shutdown_observer_t* observer)
    : async_dispatcher_t{&g_dispatcher_ops},
      options_(options),
      unsynchronized_(unsynchronized),
      allow_sync_calls_(allow_sync_calls),
      owner_(owner),
      thread_pool_(thread_pool),
      process_shared_dispatcher_(process_shared_dispatcher),
      // The timer keeps a pointer back to this dispatcher.
      timer_(this),
      shutdown_observer_(observer) {
  name_.Append(name);
}
// static
// Creates a dispatcher, starts its event waiter and registers it with the dispatcher coordinator.
//
// |options| may contain FDF_DISPATCHER_OPTION_UNSYNCHRONIZED or
// FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS, but not both. |adder| is invoked once, before the
// dispatcher is constructed, when sync calls are allowed. On success |*out_dispatcher| receives a
// raw pointer that carries one reference, which is recovered in |Destroy|.
//
// Returns:
//   ZX_ERR_NOT_SUPPORTED if both options above are set.
//   ZX_ERR_INVALID_ARGS if |owner| is null.
//   Otherwise the first failure reported by |adder|, event creation, starting the event waiter
//   (only ZX_ERR_BAD_STATE is treated as a failure), posting the scheduler role task or
//   |AddDispatcher|.
zx_status_t Dispatcher::CreateWithAdder(uint32_t options, std::string_view name,
                                        std::string_view scheduler_role, const void* owner,
                                        ThreadPool* thread_pool,
                                        async_dispatcher_t* parent_dispatcher, ThreadAdder adder,
                                        fdf_dispatcher_shutdown_observer_t* observer,
                                        Dispatcher** out_dispatcher) {
  ZX_DEBUG_ASSERT(out_dispatcher);
  bool unsynchronized = options & FDF_DISPATCHER_OPTION_UNSYNCHRONIZED;
  bool allow_sync_calls = options & FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS;
  if (unsynchronized && allow_sync_calls) {
    return ZX_ERR_NOT_SUPPORTED;
  }
  if (!owner) {
    return ZX_ERR_INVALID_ARGS;
  }
  // NOTE(review): none of the failure paths below undo a successful |adder| call — confirm
  // whether the added thread should be removed again on failure.
  if (allow_sync_calls) {
    zx_status_t status = adder();
    if (status != ZX_OK) {
      return status;
    }
  }
  auto dispatcher =
      fbl::MakeRefCounted<Dispatcher>(options, name, unsynchronized, allow_sync_calls, owner,
                                      thread_pool, parent_dispatcher, observer);
  zx::event event;
  if (zx_status_t status = zx::event::create(0, &event); status != ZX_OK) {
    return status;
  }
  auto self = dispatcher.get();
  auto event_waiter = std::make_unique<EventWaiter>(
      std::move(event),
      [self](std::unique_ptr<EventWaiter> event_waiter, fbl::RefPtr<Dispatcher> dispatcher_ref) {
        // Keep a second reference so the thread pool is still reachable after |dispatcher_ref|
        // has been moved into |DispatchCallbacks|.
        auto ref = dispatcher_ref;
        self->DispatchCallbacks(std::move(event_waiter), std::move(dispatcher_ref));
        ref->thread_pool()->OnThreadWakeup();
      });
  // The dispatcher only keeps a raw pointer; ownership of the waiter is passed to
  // |BeginWaitWithRef| below.
  dispatcher->SetEventWaiter(event_waiter.get());
  zx_status_t status = EventWaiter::BeginWaitWithRef(std::move(event_waiter), dispatcher);
  // NOTE(review): only ZX_ERR_BAD_STATE is handled here; any other error from
  // |BeginWaitWithRef| falls through as if the wait had started — confirm this is intended.
  if (status == ZX_ERR_BAD_STATE) {
    dispatcher->SetEventWaiter(nullptr);
    return status;
  }
  if (scheduler_role != ThreadPool::kNoSchedulerRole) {
    if (thread_pool->num_dispatchers() == 0) {
      // Each thread in the thread pool will check whether it needs to set the scheduler
      // role when it wakes up. If this is the first dispatcher, we might as well
      // post a task so we can get the scheduler role set ASAP. Otherwise, if there
      // are multiple dispatchers and threads, it becomes less likely that we would
      // happen to post the task to a thread that doesn't already have the scheduler role set,
      // so we'll leave any unset thread to set its role on next wakeup.
      status = async::PostTask(thread_pool->loop()->dispatcher(), [dispatcher]() mutable {
        // This task is intentionally empty, the actual work takes place in the async loop's
        // prologue function. See |Dispatcher::ThreadPool::ThreadWakeupPrologue|.
      });
      if (status != ZX_OK) {
        // Posting a task would only fail if global loop (and probably process) was shutting down.
        // NOTE(review): unlike the other failure paths, the event waiter pointer is not cleared
        // here — confirm whether that is intentional.
        LOGF(ERROR, "Failed to post task to set scheduler role");
        return status;
      }
    }
  }
  // This may fail if the entire driver is being shut down by the driver host.
  status = GetDispatcherCoordinator().AddDispatcher(dispatcher);
  if (status != ZX_OK) {
    dispatcher->SetEventWaiter(nullptr);
    return status;
  }
  // This reference will be recovered in |Destroy|.
  *out_dispatcher = fbl::ExportToRawPtr(&dispatcher);
  return ZX_OK;
}
// fdf_dispatcher_t implementation
// static
// Creates a dispatcher owned by the driver that is current on the calling thread.
// The default thread pool is used unless |scheduler_role| names a role, in which case the thread
// pool for that role is looked up or created. Sync-call threads are added to the chosen pool.
zx_status_t Dispatcher::Create(uint32_t options, std::string_view name,
                               std::string_view scheduler_role,
                               fdf_dispatcher_shutdown_observer_t* observer,
                               Dispatcher** out_dispatcher) {
  auto pool = GetDispatcherCoordinator().default_thread_pool();
  const bool has_scheduler_role = scheduler_role != ThreadPool::kNoSchedulerRole;
  if (has_scheduler_role) {
    auto role_pool = GetDispatcherCoordinator().GetOrCreateThreadPool(scheduler_role);
    if (role_pool.is_error()) {
      return role_pool.status_value();
    }
    pool = *role_pool;
  }

  auto add_thread = [pool]() { return pool->AddThread(); };
  return CreateWithAdder(options, name, scheduler_role, driver_context::GetCurrentDriver(), pool,
                         pool->loop()->dispatcher(), add_thread, observer, out_dispatcher);
}
// Creates a dispatcher on the coordinator's unmanaged thread pool, with no scheduler role and
// owned by the driver that is current on the calling thread.
zx_status_t Dispatcher::CreateUnmanagedDispatcher(
    uint32_t options, std::string_view name, fdf_dispatcher_shutdown_observer_t* shutdown_observer,
    Dispatcher** out_dispatcher) {
  auto pool = GetDispatcherCoordinator().GetOrCreateUnmanagedThreadPool();
  auto add_thread = [pool]() { return pool->AddThread(); };
  return CreateWithAdder(options, name, ThreadPool::kNoSchedulerRole,
                         driver_context::GetCurrentDriver(), pool, pool->loop()->dispatcher(),
                         add_thread, shutdown_observer, out_dispatcher);
}
// static
// Casts |dispatcher| to a |Dispatcher| and checks the result's canary before returning it.
Dispatcher* Dispatcher::DowncastAsyncDispatcher(async_dispatcher_t* dispatcher) {
  auto* const downcast = static_cast<Dispatcher*>(dispatcher);
  downcast->canary_.Assert();
  return downcast;
}
// Returns this dispatcher viewed as its |async_dispatcher_t| base.
async_dispatcher_t* Dispatcher::GetAsyncDispatcher() {
  // |Dispatcher| derives from |async_dispatcher_t|, so this is a plain upcast.
  async_dispatcher_t* const as_async = this;
  return as_async;
}
void Dispatcher::ShutdownAsync() {
{
fbl::AutoLock lock(&callback_lock_);
switch (state_) {
case DispatcherState::kRunning:
state_ = DispatcherState::kShuttingDown;
break;
case DispatcherState::kShuttingDown:
case DispatcherState::kShutdown:
case DispatcherState::kDestroyed:
return;
default:
ZX_ASSERT_MSG(false, "Dispatcher::ShutdownAsync got unknown dispatcher state %d",
static_cast<int>(state_));
}
// Move the requests into a separate queue so we will be able to enter an idle state.
// This queue will be processed by |CompleteShutdown|.
shutdown_queue_ = std::move(callback_queue_);
shutdown_queue_.splice(shutdown_queue_.end(), registered_callbacks_);
// Try to cancel all outstanding waits. Successfully canceled waits should be have their
// callbacks triggered.
auto waits = std::move(waits_);
for (auto wait = waits.pop_front(); wait; wait = waits.pop_front()) {
// It's possible that the wait has already been cancelled but not yet pulled
// from the |waits_| list, in which case the user may have already freed
// the handle they were waiting on, so we should not try to cancel it again.
if (!wait->is_pending_cancellation() && wait->Cancel()) {
// We were successful. Lets queue this up to be processed by |CompleteDestroy|.
shutdown_queue_.push_back(std::move(wait));
} else {
// We weren't successful, |wait| is being run or queued to run and will want to remove this
// from the |waits_| list.
waits_.push_back(std::move(wait));
}
}
// It's easier to handle |irqs_| in |CompleteShutdown|, so unbinding will only
// ever happen on a thread at once. If the irq gets triggered in the meanwhile,
// |QueueIrq| will return early.
zx_status_t status = timer_.Cancel();
// If we could not cancel the timer, it is going to run / is already running in another
// thread, and we don't want |CompleteShutdown| to run until after that completes.
if (status != ZX_OK) {
shutdown_waiting_for_timer_ = true;
}
shutdown_queue_.splice(shutdown_queue_.end(), delayed_tasks_);
// To avoid race conditions with attempting to cancel a wait that might be scheduled to
// run, we will cancel the event waiter in the |CompleteShutdown| callback. This is as
// |async::Wait::Cancel| is not thread safe.
}
auto dispatcher_ref = fbl::RefPtr<Dispatcher>(this);
// The dispatcher shutdown API specifies that on shutdown, tasks and cancellation
// callbacks should run serialized. Wait for all active threads to
// complete before calling the cancellation callbacks.
auto event = RegisterForCompleteShutdownEvent();
ZX_ASSERT(event.status_value() == ZX_OK);
// Don't use async::WaitOnce as it sets the handler in a thread unsafe way.
auto wait = std::make_unique<async::Wait>(
event->get(), ZX_EVENT_SIGNALED, 0,
[dispatcher_ref = std::move(dispatcher_ref), event = std::move(*event)](
async_dispatcher_t* dispatcher, async::Wait* wait, zx_status_t status,
const zx_packet_signal_t* signal) mutable {
ZX_ASSERT(status == ZX_OK || status == ZX_ERR_CANCELED);
dispatcher_ref->CompleteShutdown();
delete wait;
});
ZX_ASSERT(wait->Begin(process_shared_dispatcher_) == ZX_OK);
wait.release(); // This will be deleted by the wait handler once it is called.
}
// Finishes the shutdown started by |ShutdownAsync|.
//
// Expects the dispatcher to be in |kShuttingDown| with no active threads and nothing queued.
// Cancels the event waiter, unbinds all irqs, calls every request on |shutdown_queue_| and every
// registered token handler with ZX_ERR_CANCELED, then moves to |kShutdown| and notifies the
// dispatcher coordinator.
void Dispatcher::CompleteShutdown() {
  fbl::DoublyLinkedList<std::unique_ptr<AsyncIrq>> unbound_irqs;
  std::unordered_set<fdf_token_t*> registered_tokens;
  {
    fbl::AutoLock lock(&callback_lock_);
    ZX_ASSERT(state_ == DispatcherState::kShuttingDown);
    ZX_ASSERT_MSG(num_active_threads_ == 0, "CompleteShutdown called but there are active threads");
    ZX_ASSERT_MSG(callback_queue_.is_empty(),
                  "CompleteShutdown called but callback queue has %lu items",
                  callback_queue_.size_slow());
    ZX_ASSERT_MSG((!event_waiter_ || !event_waiter_->signaled()),
                  "CompleteShutdown called but event waiter is still signaled");
    ZX_ASSERT(IsIdleLocked());
    ZX_ASSERT_MSG(!HasFutureOpsScheduledLocked(),
                  "CompleteShutdown called but future ops are scheduled");
    if (event_waiter_) {
      // Since the event waiter holds a reference to the dispatcher,
      // we need to cancel it to reclaim it.
      // This should always succeed, as there should be no other threads processing
      // tasks for this dispatcher, and we should have cleared |event_waiter_| if
      // the AsyncLoopOwned event waiter was dropped.
      ZX_ASSERT(event_waiter_->Cancel() != nullptr);
      event_waiter_ = nullptr;
    }
    // Take the irqs out of the dispatcher; they are unbound here and cached further below,
    // after the lock has been released.
    unbound_irqs = std::move(irqs_);
    for (auto& irq : unbound_irqs) {
      // It's possible that a callback request may be queued for a triggered irq.
      // We should only queue an additional cancellation callback if one does not
      // already exist.
      auto iter =
          shutdown_queue_.find_if([operation = &irq](const CallbackRequest& callback_request) {
            return callback_request.holds_async_operation(operation);
          });
      if (iter == shutdown_queue_.end()) {
        auto callback_request = irq.CreateCallbackRequest(*this);
        shutdown_queue_.push_back(std::move(callback_request));
      }
      // If the irq is still in the list, unbinding shouldn't fail.
      // The only case would be if the async loop is also shutting down,
      // but we shouldn't do that before all the driver dispatchers have completed shutdown.
      ZX_ASSERT_MSG(irq.Unbind(), "Dispatcher::ShutdownAsync failed to unbind irq");
    }
    // The token handlers are invoked below, outside the lock.
    registered_tokens = std::move(registered_tokens_);
  }
  for (auto irq = unbound_irqs.pop_front(); irq; irq = unbound_irqs.pop_front()) {
    // Though the irq has been unbound, it's possible that another |process_shared_dispatcher|
    // thread has already pulled an irq packet from the port and may attempt to call the irq
    // handler. Delay destroying our irq wrapper for a bit in case this race condition happens.
    thread_pool_->CacheUnboundIrq(std::move(irq));
  }
  // We want |fdf_dispatcher_get_current_dispatcher| to work in cancellation and shutdown callbacks.
  driver_context::PushDriver(owner_, this);
  auto pop_driver = fit::defer([]() { driver_context::PopDriver(); });
  // We remove one item at a time from the shutdown queue, in case someone
  // tries to cancel a wait (which has not been canceled yet) from within a
  // canceled callback. We don't use fbl::AutoLock as we want to be able to
  // release and re-acquire the lock in the loop.
  callback_lock_.Acquire();
  while (!shutdown_queue_.is_empty()) {
    auto callback_request = shutdown_queue_.pop_front();
    ZX_ASSERT(callback_request);
    // Call the callbacks outside the lock.
    callback_lock_.Release();
    // Ownership of the request is handed to the call itself.
    callback_request->Call(std::move(callback_request), ZX_ERR_CANCELED);
    callback_lock_.Acquire();
  }
  callback_lock_.Release();
  // Report cancellation to every token handler that was still registered.
  for (auto token : registered_tokens) {
    token->handler(static_cast<fdf_dispatcher_t*>(this), token, ZX_ERR_CANCELED,
                   FDF_HANDLE_INVALID);
  }
  fdf_dispatcher_shutdown_observer_t* shutdown_observer = nullptr;
  {
    fbl::AutoLock lock(&callback_lock_);
    state_ = DispatcherState::kShutdown;
    shutdown_observer = shutdown_observer_;
  }
  // The coordinator is notified without |callback_lock_| held.
  GetDispatcherCoordinator().NotifyDispatcherShutdown(*this, std::move(shutdown_observer));
}
// Marks a shut-down dispatcher as destroyed, takes back the reference handed out by
// |CreateWithAdder| and removes the dispatcher from the coordinator.
// Asserts that the dispatcher is in the |kShutdown| state.
void Dispatcher::Destroy() {
  {
    fbl::AutoLock guard(&callback_lock_);
    ZX_ASSERT(state_ == DispatcherState::kShutdown);
    state_ = DispatcherState::kDestroyed;
  }

  // Held until the end of this function, so |this| stays valid for the call below.
  auto creation_ref = fbl::ImportFromRawPtr(this);
  GetDispatcherCoordinator().RemoveDispatcher(*this);
}
// Permanently turns off the ALLOW_SYNC_CALLS option for this dispatcher.
//
// Returns:
//   ZX_ERR_INVALID_ARGS if |option| is anything other than FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS.
//   ZX_ERR_BAD_STATE if not called from this dispatcher, or the dispatcher is not running.
//   ZX_ERR_ALREADY_EXISTS if sync calls are already disallowed.
//   Otherwise the result of removing a thread from the thread pool.
zx_status_t Dispatcher::Seal(uint32_t option) {
  const bool option_is_sealable = (option == FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS);
  if (!option_is_sealable) {
    return ZX_ERR_INVALID_ARGS;
  }

  {
    fbl::AutoLock guard(&callback_lock_);
    if (driver_context::GetCurrentDispatcher() != this) {
      return ZX_ERR_BAD_STATE;
    }
    if (!IsRunningLocked()) {
      return ZX_ERR_BAD_STATE;
    }
    if (!allow_sync_calls_) {
      return ZX_ERR_ALREADY_EXISTS;
    }
    allow_sync_calls_ = false;
  }

  // Sync calls caused an extra thread to be added when this dispatcher was created; now that
  // they are disallowed, ask the thread pool to remove a thread again.
  return thread_pool()->RemoveThread();
}
// async_dispatcher_t implementation
// Returns the current monotonic clock reading.
zx_time_t Dispatcher::GetTime() {
  const zx_time_t monotonic_now = zx_clock_get_monotonic();
  return monotonic_now;
}
// Starts |wait| on behalf of the user by wrapping it in a new |AsyncWait|.
// Returns ZX_ERR_BAD_STATE if the dispatcher is not running, otherwise the result of
// |AsyncWait::BeginWait|.
zx_status_t Dispatcher::BeginWait(async_wait_t* wait) {
  fbl::AutoLock guard(&callback_lock_);
  if (IsRunningLocked()) {
    // TODO(92740): we should do something more efficient rather than creating a new
    // AsyncWait each time.
    return AsyncWait::BeginWait(std::make_unique<AsyncWait>(wait, *this), *this);
  }
  return ZX_ERR_BAD_STATE;
}
// Cancels a wait previously started with |BeginWait|.
//
// Returns ZX_OK if the wait was canceled on the shared dispatcher, removed from the callback
// queue, or (synchronized dispatchers only, when called from this dispatcher) marked as pending
// cancellation. Returns ZX_ERR_NOT_FOUND otherwise.
zx_status_t Dispatcher::CancelWait(async_wait_t* wait) {
  // The implementation of this method has to be more complicated than simply
  //
  //   return async_cancel_wait(wait);
  //
  // because the dispatcher wraps the wait's callback with its own custom callback,
  // |OnSignal|, so there is an interval between the wait being pulled off the port and the wait's
  // callback being invoked, during which we need to implement custom logic to cancel the wait.
  // First, try to cancel the async wait from the shared dispatcher.
  // The wrapper pointer was stashed in the user's wait by the |AsyncWait| constructor.
  auto* async_wait = reinterpret_cast<AsyncWait*>(wait->state.reserved[0]);
  if (async_wait != nullptr) {
    if (async_wait->Cancel()) {
      // We shouldn't have to worry about racing anyone if cancelation was successful.
      ZX_ASSERT(RemoveWait(async_wait) != nullptr);
      return ZX_OK;
    }
    // async_wait->Cancel() will fail in the case that the wait has already been pulled off the
    // port.
  }
  // Second, try to cancel it from the callback queue.
  fbl::AutoLock lock(&callback_lock_);
  auto callback_request = CancelAsyncOperationLocked(wait);
  if (callback_request) {
    return ZX_OK;
  } else if (unsynchronized()) {
    return ZX_ERR_NOT_FOUND;
  } else {
    // The async_wait is set to null right before the callback is invoked, so if it is null it's too
    // late to cancel. If the caller of |CancelWait| is not a dispatcher-managed thread then we
    // can't guarantee the dispatcher isn't currently invoking the callback.
    if (async_wait == nullptr || driver_context::GetCurrentDispatcher() != this) {
      return ZX_ERR_NOT_FOUND;
    }
    // If we failed to cancel it from the callback queue and we are a synchronized dispatcher,
    // then another thread must have pulled the packet from the port and is about to queue the
    // callback (i.e., it is sitting in |OnSignal| right before |QueueWait|). We mark the wait as
    // pending cancellation so that it is cancelled rather than invoked when |QueueWait| is called.
    async_wait->MarkPendingCancellation();
    return ZX_OK;
  }
}
// Returns the deadline of the first delayed task, or |zx::time::infinite()| if the callback queue
// is non-empty or there are no delayed tasks.
zx::time Dispatcher::GetNextTimeoutLocked() const {
  // Delayed tasks only matter while |callback_queue_| is empty: delayed tasks are routinely
  // re-checked and the timer reset whenever the callback queue drains.
  const bool nothing_to_time = !callback_queue_.is_empty() || delayed_tasks_.is_empty();
  if (nothing_to_time) {
    return zx::time::infinite();
  }
  const auto* earliest = static_cast<const DelayedTask*>(&delayed_tasks_.front());
  return earliest->deadline;
}
void Dispatcher::ResetTimerLocked() {
zx::time deadline = GetNextTimeoutLocked();
if (deadline == zx::time::infinite()) {
// Nothing is left on the queue to fire.
timer_.Cancel();
return;
}
// The tradeoff of using a task instead of a dedicated timer is that we need to cancel the task
// every time a task with a shorter deadline comes in. This isn't really too bad, assuming there
// is at least two delayed tasks scheduled, otherwise the timer will be canceled. If we used a
// custom implementation for our shared process loop, then we could also have an
// "UpdateTaskDeadline" method on tasks which would allow us to shift the deadline as necessary,
// without risking the need to cancel the task.
if (timer_.current_deadline() > deadline && timer_.Cancel() == ZX_OK) {
timer_.BeginWait(deadline);
}
}
void Dispatcher::InsertDelayedTaskSortedLocked(std::unique_ptr<DelayedTask> task) {
// Find the first node that is bigger and insert before it. fbl::DoublyLinkedList handles all of
// the edge cases for us.
auto iter = delayed_tasks_.find_if([&](const CallbackRequest& other) {
return static_cast<const DelayedTask*>(&other)->deadline > task->deadline;
});
delayed_tasks_.insert(iter, std::move(task));
}
void Dispatcher::CheckDelayedTasksLocked() {
if (!IsRunningLocked()) {
IdleCheckLocked();
return;
}
zx::time now = zx::clock::get_monotonic();
auto iter = delayed_tasks_.find_if([&](const CallbackRequest& task) {
return static_cast<const DelayedTask*>(&task)->deadline > now;
});
if (iter != delayed_tasks_.begin()) {
fbl::DoublyLinkedList<std::unique_ptr<CallbackRequest>> done_tasks;
done_tasks = delayed_tasks_.split_after(--iter);
// split_after removes the tasks which are *not* done, so we must swap the lists to get desired
// result.
std::swap(delayed_tasks_, done_tasks);
callback_queue_.splice(callback_queue_.end(), done_tasks);
if (event_waiter_ && !event_waiter_->signaled()) {
event_waiter_->signal();
}
} else {
ResetTimerLocked();
}
}
// Runs when the dispatcher's timer fires: clears the recorded deadline, processes due delayed
// tasks, notifies the thread pool and, if the dispatcher is no longer running, reports that
// shutdown need not wait for the timer anymore.
void Dispatcher::Timer::Handler() {
  {
    fbl::AutoLock tasks_guard(&dispatcher_->callback_lock_);
    current_deadline_ = zx::time::infinite();
    dispatcher_->CheckDelayedTasksLocked();
  }

  // Called without the callback lock held.
  dispatcher_->thread_pool()->OnThreadWakeup();

  fbl::AutoLock shutdown_guard(&dispatcher_->callback_lock_);
  if (dispatcher_->IsRunningLocked()) {
    return;
  }
  // The dispatcher is shutting down and may have been waiting for this handler to complete.
  dispatcher_->shutdown_waiting_for_timer_ = false;
  dispatcher_->IdleCheckLocked();
}
// Posts |task| to the dispatcher.
//
// A task whose deadline has already passed is registered and queued immediately; any other task
// is inserted into the deadline-ordered delayed task list and the timer is reset.
//
// Returns:
//   ZX_OK if the task was accepted.
//   ZX_ERR_BAD_STATE if the dispatcher is not running.
//   ZX_ERR_INVALID_ARGS if the deadline is ZX_TIME_INFINITE.
zx_status_t Dispatcher::PostTask(async_task_t* task) {
  driver_runtime::Callback callback =
      [this, task](std::unique_ptr<driver_runtime::CallbackRequest> callback_request,
                   zx_status_t status) { task->handler(this, task, status); };

  const zx::time now = zx::clock::get_monotonic();
  if (zx::time(task->deadline) <= now) {
    // TODO(92740): we should do something more efficient rather than creating a new
    // callback request each time.
    auto callback_request =
        std::make_unique<driver_runtime::CallbackRequest>(CallbackRequest::RequestType::kTask);
    callback_request->SetCallback(static_cast<fdf_dispatcher_t*>(this), std::move(callback), task);
    CallbackRequest* callback_ptr = callback_request.get();
    callback_request = RegisterCallbackWithoutQueueing(std::move(callback_request));
    // Dispatcher returned callback request as queueing failed.
    if (callback_request) {
      return ZX_ERR_BAD_STATE;
    }
    QueueRegisteredCallback(callback_ptr, ZX_OK);
  } else {
    if (task->deadline == ZX_TIME_INFINITE) {
      // Tasks must complete.
      return ZX_ERR_INVALID_ARGS;
    }
    auto delayed_task = std::make_unique<DelayedTask>(zx::time(task->deadline));
    delayed_task->SetCallback(static_cast<fdf_dispatcher_t*>(this), std::move(callback), task);

    fbl::AutoLock al(&callback_lock_);
    // Refuse delayed tasks once shutdown has begun, just like immediate tasks. |ShutdownAsync|
    // has already moved |delayed_tasks_| onto the shutdown queue and dealt with the timer, so a
    // task accepted now would neither run nor be canceled, and would re-arm the timer.
    if (!IsRunningLocked()) {
      return ZX_ERR_BAD_STATE;
    }
    InsertDelayedTaskSortedLocked(std::move(delayed_task));
    ResetTimerLocked();
  }
  return ZX_OK;
}
// Cancels a previously posted |task|.
// Returns ZX_OK if a matching request was canceled, ZX_ERR_NOT_FOUND otherwise.
zx_status_t Dispatcher::CancelTask(async_task_t* task) {
  fbl::AutoLock guard(&callback_lock_);
  auto canceled_request = CancelAsyncOperationLocked(task);
  if (canceled_request) {
    return ZX_OK;
  }
  return ZX_ERR_NOT_FOUND;
}
// Forwards the packet to the process shared dispatcher.
// Returns ZX_ERR_BAD_STATE if this dispatcher is not running.
zx_status_t Dispatcher::QueuePacket(async_receiver_t* receiver, const zx_packet_user_t* data) {
  fbl::AutoLock guard(&callback_lock_);
  if (IsRunningLocked()) {
    return async_queue_packet(process_shared_dispatcher_, receiver, data);
  }
  return ZX_ERR_BAD_STATE;
}
// Binds |irq| to this dispatcher by wrapping it in a new |AsyncIrq|.
// Returns ZX_ERR_NOT_SUPPORTED on unsynchronized dispatchers and ZX_ERR_BAD_STATE if the
// dispatcher is not running; otherwise the result of |AsyncIrq::Bind|.
zx_status_t Dispatcher::BindIrq(async_irq_t* irq) {
  // TODO(https://fxbug.dev/42052791): support interrupts on unsynchronized dispatchers.
  if (unsynchronized()) {
    return ZX_ERR_NOT_SUPPORTED;
  }

  fbl::AutoLock guard(&callback_lock_);
  if (IsRunningLocked()) {
    return AsyncIrq::Bind(std::make_unique<AsyncIrq>(irq, *this), *this);
  }
  return ZX_ERR_BAD_STATE;
}
// Unbinds an irq previously bound with |BindIrq|.
//
// Returns:
//   ZX_ERR_NOT_SUPPORTED on unsynchronized dispatchers.
//   ZX_ERR_NOT_FOUND if |irq| has no wrapper recorded, or unbinding the wrapper fails.
//   ZX_ERR_BAD_STATE if the calling thread's current dispatcher is not the one the irq holds.
//   ZX_OK once the irq has been unbound and any queued callback request for it removed.
zx_status_t Dispatcher::UnbindIrq(async_irq_t* irq) {
  // TODO(https://fxbug.dev/42052791): support interrupts on unsynchronized dispatchers.
  if (unsynchronized()) {
    return ZX_ERR_NOT_SUPPORTED;
  }

  auto* wrapper = reinterpret_cast<AsyncIrq*>(irq->state.reserved[0]);
  if (wrapper == nullptr) {
    return ZX_ERR_NOT_FOUND;
  }

  // The irq must be unbound from the same dispatcher it was bound to.
  auto calling_dispatcher = driver_context::GetCurrentDispatcher();
  if (!calling_dispatcher) {
    return ZX_ERR_BAD_STATE;
  }
  if (calling_dispatcher != wrapper->GetDispatcherRef().get()) {
    return ZX_ERR_BAD_STATE;
  }

  std::unique_ptr<AsyncIrq> detached_irq;
  {
    // |callback_lock_| has to cover both clearing the dispatcher reference in the irq and
    // removing any callback request queued for it.
    fbl::AutoLock guard(&callback_lock_);
    if (!wrapper->Unbind()) {
      return ZX_ERR_NOT_FOUND;
    }
    detached_irq = RemoveIrqLocked(wrapper);
    ZX_ASSERT(detached_irq != nullptr);
    // A callback request may already be queued if the irq has been triggered.
    CancelAsyncOperationLocked(wrapper);
  }

  // Even though the irq is unbound, another |process_shared_dispatcher| thread may already have
  // pulled an irq packet from the port and may still call the irq handler. Hand the wrapper to
  // the thread pool instead of destroying it right away, in case that race happens.
  thread_pool_->CacheUnboundIrq(std::move(detached_irq));
  return ZX_OK;
}
namespace {

// Error strings reported through |out_error| by |GetSequenceId| and |CheckSequenceId|.

// The dispatcher is unsynchronized.
constexpr char kSequenceIdWrongDispatcherType[] =
    "A synchronized fdf_dispatcher_t is required. "
    "Ensure the fdf_dispatcher_t does not have the |FDF_DISPATCHER_OPTION_UNSYNCHRONIZED| option.";

// The calling thread has no current driver dispatcher.
constexpr char kSequenceIdUnknownThread[] =
    "The current thread is not managed by a driver dispatcher. "
    "Ensure the object is always used from a dispatcher managed thread.";

// The calling thread's current dispatcher, or the supplied sequence id, belongs to a different
// dispatcher.
constexpr char kSequenceIdWrongDispatcherInstance[] =
    "Access from multiple driver dispatchers detected. "
    "This is not allowed. Ensure the object is used from the same |fdf_dispatcher_t|.";

}  // namespace
// Writes this dispatcher's sequence id (its own address) to |out_sequence_id|.
//
// On failure |out_error| is pointed at a static description and one of these is returned:
//   ZX_ERR_WRONG_TYPE   - this dispatcher is unsynchronized.
//   ZX_ERR_INVALID_ARGS - the calling thread has no current dispatcher, or its current
//                         dispatcher is not this one.
zx_status_t Dispatcher::GetSequenceId(async_sequence_id_t* out_sequence_id,
                                      const char** out_error) {
  // Records |message| for the caller and passes |status| through.
  const auto fail = [out_error](const char* message, zx_status_t status) {
    *out_error = message;
    return status;
  };

  if (unsynchronized()) {
    return fail(kSequenceIdWrongDispatcherType, ZX_ERR_WRONG_TYPE);
  }

  auto* calling_dispatcher = driver_context::GetCurrentDispatcher();
  if (calling_dispatcher == nullptr) {
    return fail(kSequenceIdUnknownThread, ZX_ERR_INVALID_ARGS);
  }
  if (calling_dispatcher != this) {
    return fail(kSequenceIdWrongDispatcherInstance, ZX_ERR_INVALID_ARGS);
  }

  out_sequence_id->value = reinterpret_cast<uint64_t>(this);
  return ZX_OK;
}
// Verifies that |sequence_id| matches the id |GetSequenceId| yields for the calling thread.
//
// Any |GetSequenceId| failure is returned unchanged. A mismatching id yields
// ZX_ERR_OUT_OF_RANGE with |out_error| describing the multi-dispatcher access.
zx_status_t Dispatcher::CheckSequenceId(async_sequence_id_t sequence_id, const char** out_error) {
  async_sequence_id_t expected_id;
  if (zx_status_t status = GetSequenceId(&expected_id, out_error); status != ZX_OK) {
    return status;
  }
  if (expected_id.value == sequence_id.value) {
    return ZX_OK;
  }
  *out_error = kSequenceIdWrongDispatcherInstance;
  return ZX_ERR_OUT_OF_RANGE;
}
// Stores |callback_request| in |registered_callbacks_| without queueing it for dispatch.
//
// Returns nullptr when the request was stored. If the dispatcher is not running, ownership
// is handed straight back to the caller instead.
std::unique_ptr<driver_runtime::CallbackRequest> Dispatcher::RegisterCallbackWithoutQueueing(
    std::unique_ptr<driver_runtime::CallbackRequest> callback_request) {
  fbl::AutoLock guard(&callback_lock_);
  if (IsRunningLocked()) {
    registered_callbacks_.push_back(std::move(callback_request));
    return nullptr;
  }
  return callback_request;
}
// Decides whether |callback_request| may be called directly on the current thread.
//
// Returns fit::ok() when the callback can be inlined, or the |NonInlinedReason| explaining
// why it has to be queued onto the async loop instead.
fit::result<Dispatcher::NonInlinedReason> Dispatcher::ShouldInline(
    std::unique_ptr<CallbackRequest>& callback_request) {
  using RequestType = CallbackRequest::RequestType;
  const auto type = callback_request->request_type();

  if (!unsynchronized_) {
    // Dispatchers that allow blocking calls must run every callback from the async loop.
    if (allow_sync_calls_) {
      return fit::error(NonInlinedReason::kAllowSyncCalls);
    }
    // A synchronized dispatcher never runs callbacks in parallel, so a request arriving
    // while another thread is dispatching has to wait its turn.
    if (dispatching_sync_) {
      return fit::error(NonInlinedReason::kDispatchingOnAnotherThread);
    }
    // TODO(https://fxbug.dev/42180471): we should be able to remove the task check once we track
    // drivers through banjo calls, or start each DFv2 driver with a ALLOW_SYNC_CALLS
    // dispatcher.
    if (type == RequestType::kTask) {
      return fit::error(NonInlinedReason::kTask);
    }
  }

  // Waits and irqs are first registered on the global async loop, which initiates the
  // callback with an empty driver call stack. They are treated as non-reentrant and are
  // called directly, so the reentrancy check below is skipped for them.
  if (type == RequestType::kIrq || type == RequestType::kWait) {
    return fit::ok();
  }

  // Reentrancy check. A caller whose driver is unknown is treated as potentially reentrant;
  // this happens when the user writes to a channel, or registers a read callback, from a
  // thread the driver runtime does not manage. |GetCurrentDriver| is used rather than
  // |IsCallStackEmpty| because it also covers the case where the testing dispatcher is set
  // as the thread's default dispatcher.
  const auto current_driver = driver_context::GetCurrentDriver();
  if (!current_driver) {
    return fit::error(NonInlinedReason::kUnknownThread);
  }
  if ((current_driver == owner_) || driver_context::IsDriverInCallStack(owner_)) {
    return fit::error(NonInlinedReason::kReentrant);
  }
  return fit::ok();
}
// Takes |request| out of |registered_callbacks_| and either calls it directly on this
// thread or appends it to |callback_queue_| for the async loop.
//
// |request| identifies a request previously placed in |registered_callbacks_|; it is only
// compared by address and never dereferenced before it is found in the list.
// |callback_reason| is stored on the request before it is queued or dispatched.
// |was_deferred| only affects which |debug_stats_| counter is incremented.
//
// Returns without dispatching if the dispatcher is not running, or if |request| is no
// longer present in |registered_callbacks_|.
void Dispatcher::QueueRegisteredCallback(driver_runtime::CallbackRequest* request,
zx_status_t callback_reason, bool was_deferred) {
TRACE_DURATION("driver_runtime", "Dispatcher::QueueRegisteredCallback", "dispatcher_name",
name_.c_str(), "callback_reason", zx_status_get_string(callback_reason),
"was_deferred", TA_BOOL(was_deferred));
ZX_ASSERT(request);
// Runs on every exit path unless cancelled below; balances the |num_active_threads_++|
// performed once the dispatcher is known to be running.
auto decrement_and_idle_check = fit::defer([this]() {
fbl::AutoLock lock(&callback_lock_);
ZX_ASSERT(num_active_threads_ > 0);
num_active_threads_--;
IdleCheckLocked();
});
std::unique_ptr<driver_runtime::CallbackRequest> callback_request;
{
fbl::AutoLock lock(&callback_lock_);
// It's possible that we are being called from a |Channel::Write| on the peer of a channel
// that is registered on this dispatcher. This means that there is no guarantee that the
// dispatcher will not enter |CompleteShutdown| between when we return from this check
// and when we decrement |num_active_threads_| in |decrement_and_idle_check|.
// Instead do not increment |num_active_threads_| until after this check.
if (!IsRunningLocked()) {
decrement_and_idle_check.cancel();
// We still want to do |IdleCheckLocked|, in case this is a completed |Wait| being processed.
IdleCheckLocked();
return;
}
num_active_threads_++;
// Finding the callback request may fail if the request was cancelled in the meanwhile.
// This is possible if the channel was about to queue the registered callback (in response
// to a channel write or a peer channel closing), but the client cancelled the callback.
//
// Calling |request->InContainer| may crash if the callback request was destructed between
// when we called |RegisterCallbackWithoutQueueing| and now.
// TODO(https://fxbug.dev/42053744): if we change CallbackRequests to use RefPtrs, we should be
// able to switch this back to an |InContainer| check.
callback_request =
registered_callbacks_.erase_if([request](const CallbackRequest& callback_request) {
return &callback_request == request;
});
if (!callback_request) {
return;
}
callback_request->SetCallbackReason(callback_reason);
// Whether we want to call the callback now, or queue it to be run on the async loop.
fit::result<NonInlinedReason> should_inline = ShouldInline(callback_request);
debug_stats_.num_total_requests++;
if (should_inline.is_error()) {
// Queue path: hand the request to the async loop and signal the event waiter if it
// is not already signaled.
callback_queue_.push_back(std::move(callback_request));
if (event_waiter_ && !event_waiter_->signaled()) {
event_waiter_->signal();
}
// If the message was not inlined earlier due to the wait not yet been ready,
// we should make sure this reason is displayed even if any other of the
// reasons also apply.
if (was_deferred) {
debug_stats_.non_inlined.channel_wait_not_yet_registered++;
} else {
switch (should_inline.error_value()) {
case kAllowSyncCalls:
debug_stats_.non_inlined.allow_sync_calls++;
break;
case kDispatchingOnAnotherThread:
debug_stats_.non_inlined.parallel_dispatch++;
break;
case kTask:
debug_stats_.non_inlined.task++;
break;
case kUnknownThread:
debug_stats_.non_inlined.unknown_thread++;
break;
case kReentrant:
debug_stats_.non_inlined.reentrant++;
break;
default:
LOGF(ERROR, "Unhandled NonInlinedReason");
};
}
return;
}
// The request was not queued earlier, so we don't count it as inlined in the stats,
// even though it is getting inlined in this specific instance.
if (was_deferred) {
debug_stats_.non_inlined.channel_wait_not_yet_registered++;
} else {
debug_stats_.num_inlined_requests++;
}
// Mark that a direct call is in progress; cleared again after |DispatchCallback| returns.
dispatching_sync_ = true;
}
// Inline path: the callback is invoked with |callback_lock_| released.
DispatchCallback(std::move(callback_request));
fbl::AutoLock lock(&callback_lock_);
dispatching_sync_ = false;
// Requests may have been queued while the direct call was running; signal the event
// waiter so they get picked up.
if (!callback_queue_.is_empty() && event_waiter_ && !event_waiter_->signaled() &&
IsRunningLocked()) {
event_waiter_->signal();
}
}
void Dispatcher::AddWaitLocked(std::unique_ptr<Dispatcher::AsyncWait> wait) {
ZX_DEBUG_ASSERT(!fbl::InContainer<AsyncWaitTag>(*wait));
waits_.push_back(std::move(wait));
}
std::unique_ptr<Dispatcher::AsyncWait> Dispatcher::RemoveWait(Dispatcher::AsyncWait* wait) {
fbl::AutoLock al(&callback_lock_);
return RemoveWaitLocked(wait);
}
std::unique_ptr<Dispatcher::AsyncWait> Dispatcher::RemoveWaitLocked(Dispatcher::AsyncWait* wait) {
ZX_DEBUG_ASSERT(fbl::InContainer<AsyncWaitTag>(*wait));
auto ret = waits_.erase(*wait);
IdleCheckLocked();
return ret;
}
void Dispatcher::QueueWait(Dispatcher::AsyncWait* wait, zx_status_t status) {
fbl::AutoLock al(&callback_lock_);
ZX_DEBUG_ASSERT(fbl::InContainer<AsyncWaitTag>(*wait));
if (wait->is_pending_cancellation()) {
// Wait was cancelled so we return immediately without invoking the callback.
waits_.erase(*wait);
// In case this is the last wait that shutdown is waiting on.
IdleCheckLocked();
return;
}
if (!IsRunningLocked()) {
// We are waiting for all outstanding waits to be completed. They will be serviced in
// CompleteDestroy.
shutdown_queue_.push_back(waits_.erase(*wait));
IdleCheckLocked();
} else {
registered_callbacks_.push_back(waits_.erase(*wait));
al.release();
QueueRegisteredCallback(wait, status);
}
}
void Dispatcher::AddIrqLocked(std::unique_ptr<Dispatcher::AsyncIrq> irq) {
ZX_DEBUG_ASSERT(!irq->InContainer());
irqs_.push_back(std::move(irq));
}
std::unique_ptr<Dispatcher::AsyncIrq> Dispatcher::RemoveIrqLocked(Dispatcher::AsyncIrq* irq) {
ZX_DEBUG_ASSERT(irq->InContainer());
return irqs_.erase(*irq);
}
// Creates a callback request for a triggered |irq|, registers it, and passes it to
// |QueueRegisteredCallback| with |status|.
//
// Nothing is delivered if the dispatcher is not running or the irq no longer holds a
// dispatcher reference.
void Dispatcher::QueueIrq(AsyncIrq* irq, zx_status_t status) {
  auto irq_request = irq->CreateCallbackRequest(*this);
  CallbackRequest* irq_request_ptr = irq_request.get();
  {
    fbl::AutoLock guard(&callback_lock_);
    // Two reasons to drop the irq:
    //  - the dispatcher is shutting down, in which case |CompleteShutdown| will call the
    //    irq handler with |ZX_ERR_CANCELED|;
    //  - the irq was unbound before |callback_lock_| was acquired.
    if (!IsRunningLocked() || !irq->GetDispatcherRef()) {
      return;
    }
    // Unbinding only happens while |callback_lock_| is held, so the irq lock is not
    // needed while registering this callback request.
    registered_callbacks_.push_back(std::move(irq_request));
  }
  // If the irq gets unbound before this call, the unbind removes the request from
  // |registered_callbacks_|.
  QueueRegisteredCallback(irq_request_ptr, status);
}
// Removes |request_to_cancel| from whichever container currently holds it
// (|registered_callbacks_|, |callback_queue_| or |shutdown_queue_|).
//
// Returns the removed request, or nullptr if it was not in any container.
std::unique_ptr<CallbackRequest> Dispatcher::CancelCallback(CallbackRequest& request_to_cancel) {
  fbl::AutoLock guard(&callback_lock_);
  if (!request_to_cancel.InContainer()) {
    return nullptr;
  }
  return request_to_cancel.RemoveFromContainer();
}
// Updates the callback reason of |callback_to_update|, but only if that request is
// currently sitting in |callback_queue_|.
//
// Returns true if the request was found and updated, false otherwise.
bool Dispatcher::SetCallbackReason(CallbackRequest* callback_to_update,
                                   zx_status_t callback_reason) {
  fbl::AutoLock guard(&callback_lock_);
  // Requests are matched by address.
  auto position = callback_queue_.find_if(
      [callback_to_update](auto& queued) -> bool { return &queued == callback_to_update; });
  const bool is_queued = position != callback_queue_.end();
  if (is_queued) {
    callback_to_update->SetCallbackReason(callback_reason);
  }
  return is_queued;
}
// Removes and returns the first request holding |operation|, searching in order:
// |registered_callbacks_|, |callback_queue_|, |shutdown_queue_|, then |delayed_tasks_|.
//
// If the request is taken from |delayed_tasks_|, the timer is reset.
// Returns nullptr if no container holds a matching request.
std::unique_ptr<CallbackRequest> Dispatcher::CancelAsyncOperationLocked(void* operation) {
  const auto holds_operation = [operation](const CallbackRequest& callback_request) {
    return callback_request.holds_async_operation(operation);
  };

  auto cancelled = registered_callbacks_.erase_if(holds_operation);
  if (cancelled) {
    return cancelled;
  }
  cancelled = callback_queue_.erase_if(holds_operation);
  if (cancelled) {
    return cancelled;
  }
  cancelled = shutdown_queue_.erase_if(holds_operation);
  if (cancelled) {
    return cancelled;
  }

  cancelled = delayed_tasks_.erase_if(holds_operation);
  if (cancelled) {
    ResetTimerLocked();
  }
  return cancelled;
}
// Invokes |callback_request| with ZX_OK, with this dispatcher's owner pushed onto the
// driver context for the duration of the call.
void Dispatcher::DispatchCallback(
    std::unique_ptr<driver_runtime::CallbackRequest> callback_request) {
  TRACE_DURATION("driver_runtime", "Dispatcher::DispatchCallback", "dispatcher_name",
                 name_.c_str());

  driver_context::PushDriver(owner_, this);
  auto pop_on_exit = fit::defer([]() { driver_context::PopDriver(); });

  // Grab the raw pointer first: ownership of the request is passed into |Call| itself.
  driver_runtime::CallbackRequest* target = callback_request.get();
  target->Call(std::move(callback_request), ZX_OK);
}
// Drains |callback_queue_| in batches, calling each request outside of |callback_lock_|.
//
// |event_waiter| is the waiter that woke this thread. This function re-arms it via
// |BeginWaitWithRef|: either early (unsynchronized dispatchers with more work queued) or
// from the deferred action on exit.
// |dispatcher_ref| must be non-null; it is passed along to |BeginWaitWithRef|.
//
// At most |kBatchSize| callbacks are taken per invocation (see the check in the loop).
void Dispatcher::DispatchCallbacks(std::unique_ptr<EventWaiter> event_waiter,
fbl::RefPtr<Dispatcher> dispatcher_ref) {
ZX_ASSERT(dispatcher_ref != nullptr);
// Runs on every exit path: re-arms the waiter if this thread still owns it, then
// balances the |num_active_threads_++| below and re-evaluates the idle state.
auto defer = fit::defer([&]() {
fbl::AutoLock lock(&callback_lock_);
if (event_waiter) {
// We call |BeginWaitWithRef| even when shutting down so that the |event_waiter|
// stays alive until the dispatcher is destroyed. This allows |IsIdleLocked| to
// correctly check the state of the event waiter. |CompleteShutdown| will cancel
// and drop the event waiter.
zx_status_t status = event_waiter->BeginWaitWithRef(std::move(event_waiter), dispatcher_ref);
if (status == ZX_ERR_BAD_STATE) {
event_waiter_ = nullptr;
}
}
ZX_ASSERT(num_active_threads_ > 0);
num_active_threads_--;
IdleCheckLocked();
});
uint32_t num_callbacks_dispatched = 0;
// Callbacks taken from |callback_queue_| that this thread is about to call.
fbl::DoublyLinkedList<std::unique_ptr<CallbackRequest>> to_call;
{
fbl::AutoLock lock(&callback_lock_);
num_active_threads_++;
// Parallel callbacks are not allowed in synchronized dispatchers.
// We should not be scheduled to run on two different dispatcher threads,
// but it's possible we could still get here if we are currently doing a
// direct call into the driver. In this case, we should designal the event
// waiter, and once the direct call completes it will signal it again.
if ((!unsynchronized_ && dispatching_sync_) || !IsRunningLocked()) {
event_waiter->designal();
return;
}
dispatching_sync_ = true;
num_callbacks_dispatched += TakeNextCallbacks(&to_call);
// Check if there are callbacks left to process and we should wake up an additional
// thread. For synchronized dispatchers, parallel callbacks are disallowed.
if (unsynchronized_ && !callback_queue_.is_empty()) {
// Ownership of |event_waiter| is moved into |BeginWaitWithRef| here; the checks on
// |event_waiter| later in this function and in |defer| depend on that.
zx_status_t status = event_waiter->BeginWaitWithRef(std::move(event_waiter), dispatcher_ref);
if (status == ZX_ERR_BAD_STATE) {
event_waiter_ = nullptr;
}
}
}
while (true) {
// Call the callbacks outside of the lock.
while (!to_call.is_empty()) {
auto callback_request = to_call.pop_front();
ZX_ASSERT(callback_request);
DispatchCallback(std::move(callback_request));
}
{
fbl::AutoLock lock(&callback_lock_);
// Check if there are any more callbacks to dispatch. This may be the case
// if we were dispatching an async operation, or if the user queued more
// operations during the last callback.
if (!callback_queue_.is_empty() && (num_callbacks_dispatched < kBatchSize)) {
num_callbacks_dispatched += TakeNextCallbacks(&to_call);
// Time to dispatch more callbacks.
continue;
}
// If we woke up an additional thread, that thread will update the
// event waiter signals as necessary.
if (!event_waiter) {
return;
}
dispatching_sync_ = false;
ResetTimerLocked();
// Nothing left to run: clear the signal so this dispatcher is not woken again
// until new work is queued.
if (callback_queue_.is_empty() && event_waiter->signaled()) {
event_waiter->designal();
}
return;
}
}
}
// Moves up to |kBatchSize| requests from the front of |callback_queue_| into
// |out_callbacks| and returns how many were moved.
//
// For synchronized dispatchers, cancelling a ChannelRead must always succeed, and the
// cancellation may come from the ChannelRead itself or from another async operation such
// as a task. To guarantee that without locking the caller's list, a request that has an
// async operation is only ever handed out on its own: the batch stops before it if other
// requests were already taken, and stops right after it otherwise.
uint32_t Dispatcher::TakeNextCallbacks(
    fbl::DoublyLinkedList<std::unique_ptr<CallbackRequest>>* out_callbacks) {
  uint32_t taken = 0;
  while ((taken < kBatchSize) && !callback_queue_.is_empty()) {
    std::unique_ptr<CallbackRequest> next = callback_queue_.pop_front();
    ZX_ASSERT(next);

    const bool must_run_alone = !unsynchronized_ && next->has_async_operation();
    if (must_run_alone && taken > 0) {
      // Put it back; it will be the sole member of a later batch.
      callback_queue_.push_front(std::move(next));
      break;
    }

    out_callbacks->push_back(std::move(next));
    ++taken;
    if (must_run_alone) {
      break;
    }
  }
  return taken;
}
// Returns an event obtained from |complete_shutdown_event_manager_|.
//
// If the dispatcher is already idle with no future operations scheduled, the manager is
// signaled right away. Errors from obtaining the event or from signaling are returned.
zx::result<zx::event> Dispatcher::RegisterForCompleteShutdownEvent() {
  fbl::AutoLock guard(&callback_lock_);
  auto event = complete_shutdown_event_manager_.GetEvent();
  if (event.is_error()) {
    return event;
  }

  const bool already_complete = IsIdleLocked() && !HasFutureOpsScheduledLocked();
  if (already_complete) {
    if (zx_status_t status = complete_shutdown_event_manager_.Signal(); status != ZX_OK) {
      return zx::error(status);
    }
  }
  return event;
}
// Blocks the caller until |idle_event_| is signaled, unless the dispatcher is already idle.
// Asserts that the caller is not a runtime managed thread.
void Dispatcher::WaitUntilIdle() {
  ZX_ASSERT(!IsRuntimeManagedThread());
  fbl::AutoLock guard(&callback_lock_);
  if (!IsIdleLocked()) {
    idle_event_.Wait(&callback_lock_);
  }
}
// Returns true when no thread is active in the dispatcher, |callback_queue_| is empty and
// the event waiter (if any) is not signaled.
bool Dispatcher::IsIdleLocked() {
  if (num_active_threads_ != 0) {
    return false;
  }
  if (!callback_queue_.is_empty()) {
    return false;
  }
  // A signaled event waiter means a thread will be scheduled to run soon.
  return !event_waiter_ || !event_waiter_->signaled();
}
// Returns true if there are outstanding waits, the timer is armed, or shutdown is waiting
// on the timer.
bool Dispatcher::HasFutureOpsScheduledLocked() {
  if (!waits_.is_empty()) {
    return true;
  }
  return timer_.is_armed() || shutdown_waiting_for_timer_;
}
// If the dispatcher is idle, wakes everyone blocked on |idle_event_|. If additionally no
// future operations are scheduled, signals |complete_shutdown_event_manager_|.
void Dispatcher::IdleCheckLocked() {
  if (!IsIdleLocked()) {
    return;
  }
  idle_event_.Broadcast();
  if (HasFutureOpsScheduledLocked()) {
    return;
  }
  complete_shutdown_event_manager_.Signal();
}
bool Dispatcher::HasQueuedTasks() {
fbl::AutoLock lock(&callback_lock_);
for (auto& callback_request : callback_queue_) {
if (callback_request.request_type() == CallbackRequest::RequestType::kTask) {
return true;
}
}
for (auto& callback_request : shutdown_queue_) {
if (callback_request.request_type() == CallbackRequest::RequestType::kTask) {
return true;
}
}
return false;
}
// Handler invoked when the event waiter's wait completes.
//
// On any non-OK |status| the waiter detaches itself from its dispatcher and is dropped.
// On ZX_OK with ZX_USER_SIGNAL_0 observed, the waiter's callback is invoked and becomes
// responsible for re-arming the wait. Any other observed signal is only logged.
void Dispatcher::EventWaiter::HandleEvent(std::unique_ptr<EventWaiter> event_waiter,
                                          async_dispatcher_t* dispatcher, async::WaitBase* wait,
                                          zx_status_t status, const zx_packet_signal_t* signal) {
  if (status != ZX_OK) {
    if (status == ZX_ERR_CANCELED) {
      LOGF(TRACE, "Dispatcher: event waiter shutting down\n");
    } else {
      LOGF(ERROR, "Dispatcher: event waiter error: %d\n", status);
    }
    event_waiter->dispatcher_ref_->SetEventWaiter(nullptr);
    event_waiter->dispatcher_ref_ = nullptr;
    return;
  }

  const bool wakeup_requested = (signal->observed & ZX_USER_SIGNAL_0) != 0;
  if (!wakeup_requested) {
    LOGF(ERROR, "Dispatcher: event waiter got unexpected signals: %x\n", signal->observed);
    return;
  }

  // The callback is in charge of calling |BeginWaitWithRef| on the event waiter.
  fbl::RefPtr<Dispatcher> dispatcher_ref = std::move(event_waiter->dispatcher_ref_);
  event_waiter->InvokeCallback(std::move(event_waiter), dispatcher_ref);
}
// static
// Stores |dispatcher| in |event| and begins waiting on the dispatcher's
// |process_shared_dispatcher_|. |dispatcher| must be non-null.
// Returns the status of |BeginWait|.
zx_status_t Dispatcher::EventWaiter::BeginWaitWithRef(std::unique_ptr<EventWaiter> event,
                                                      fbl::RefPtr<Dispatcher> dispatcher) {
  ZX_ASSERT(dispatcher != nullptr);

  event->dispatcher_ref_ = dispatcher;
  return BeginWait(std::move(event), dispatcher->process_shared_dispatcher_);
}
// Returns a duplicate (with ZX_RIGHTS_BASIC) of the manager's event, creating the event
// first if it does not exist yet. Creation or duplication failures are returned as errors.
zx::result<zx::event> Dispatcher::CompleteShutdownEventManager::GetEvent() {
  if (!event_.is_valid()) {
    // First waiter to register: the manager's event has to be created.
    if (zx_status_t status = zx::event::create(0, &event_); status != ZX_OK) {
      return zx::error(status);
    }
  }

  zx::event duplicate;
  if (zx_status_t status = event_.duplicate(ZX_RIGHTS_BASIC, &duplicate); status != ZX_OK) {
    return zx::error(status);
  }
  return zx::ok(std::move(duplicate));
}
// Signals ZX_EVENT_SIGNALED on the manager's event and then releases it.
// Returns ZX_OK if there is no event (nobody registered), else the status of the signal call.
zx_status_t Dispatcher::CompleteShutdownEventManager::Signal() {
  if (event_.is_valid()) {
    const zx_status_t signal_status = event_.signal(0u, ZX_EVENT_SIGNALED);
    event_.reset();
    return signal_status;
  }
  // No-one is waiting for idle events.
  return ZX_OK;
}
// Records |token| in |registered_tokens_|.
// Returns ZX_ERR_BAD_STATE if the dispatcher is not running or |token| is already recorded.
zx_status_t Dispatcher::RegisterPendingToken(fdf_token_t* token) {
  fbl::AutoLock guard(&callback_lock_);
  if (!IsRunningLocked() || registered_tokens_.find(token) != registered_tokens_.end()) {
    return ZX_ERR_BAD_STATE;
  }
  registered_tokens_.insert(token);
  return ZX_OK;
}
// Schedules a call to |token|'s handler, handing it |channel|.
//
// A callback request is registered for the handler, |token| is removed from
// |registered_tokens_|, and the request is passed to |QueueRegisteredCallback| with
// |status| as the callback reason.
//
// Returns ZX_ERR_BAD_STATE if the dispatcher is not running, ZX_OK otherwise.
zx_status_t Dispatcher::ScheduleTokenCallback(fdf_token_t* token, zx_status_t status,
                                              fdf::Channel channel) {
  CallbackRequest* token_request_ptr = nullptr;
  {
    fbl::AutoLock guard(&callback_lock_);
    if (!IsRunningLocked()) {
      return ZX_ERR_BAD_STATE;
    }

    auto token_request = std::make_unique<CallbackRequest>();
    // The handler receives the status the request is eventually called with, which is
    // not necessarily the |status| passed to this function.
    driver_runtime::Callback invoke_handler =
        [dispatcher = this, token, channel = std::move(channel)](
            std::unique_ptr<driver_runtime::CallbackRequest> callback_request,
            zx_status_t callback_status) mutable {
          token->handler(static_cast<fdf_dispatcher_t*>(dispatcher), token, callback_status,
                         channel.release());
        };
    token_request->SetCallback(static_cast<fdf_dispatcher_t*>(this), std::move(invoke_handler));

    token_request_ptr = token_request.get();
    registered_callbacks_.push_back(std::move(token_request));
    registered_tokens_.erase(token);
  }
  // If the dispatcher is shutdown in the meanwhile, the callback request will be completed
  // with |ZX_ERR_CANCELED| in |CompleteShutdown|.
  QueueRegisteredCallback(token_request_ptr, status);
  return ZX_OK;
}
// static
void DispatcherCoordinator::WaitUntilDispatchersIdle() {
std::vector<fbl::RefPtr<Dispatcher>> dispatchers;
{
fbl::AutoLock lock(&(GetDispatcherCoordinator().lock_));
for (auto& driver : GetDispatcherCoordinator().drivers_) {
driver.GetDispatchers(dispatchers);
}
}
for (auto& d : dispatchers) {
d->WaitUntilIdle();
}
}
// static
void DispatcherCoordinator::WaitUntilDispatchersDestroyed() {
auto& coordinator = GetDispatcherCoordinator();
fbl::AutoLock lock(&coordinator.lock_);
if (coordinator.AreAllDriversDestroyedLocked()) {
return;
}
coordinator.drivers_destroyed_event_.Wait(&coordinator.lock_);
}
// static
// Runs the unmanaged thread pool's loop with the given |deadline| and |once| arguments.
// Returns ZX_ERR_BAD_STATE if there is no unmanaged thread pool.
zx_status_t DispatcherCoordinator::TestingRun(zx::time deadline, bool once) {
  auto& pool = GetDispatcherCoordinator().unmanaged_thread_pool_;
  if (!pool.has_value()) {
    return ZX_ERR_BAD_STATE;
  }
  return pool->loop()->Run(deadline, once);
}
// static
// Calls |RunUntilIdle| on the unmanaged thread pool's loop.
// Returns ZX_ERR_BAD_STATE if there is no unmanaged thread pool.
zx_status_t DispatcherCoordinator::TestingRunUntilIdle() {
  auto& pool = GetDispatcherCoordinator().unmanaged_thread_pool_;
  if (!pool.has_value()) {
    return ZX_ERR_BAD_STATE;
  }
  return pool->loop()->RunUntilIdle();
}
// static
// Calls |Quit| on the unmanaged thread pool's loop; does nothing if there is no such pool.
void DispatcherCoordinator::TestingQuit() {
  auto& pool = GetDispatcherCoordinator().unmanaged_thread_pool_;
  if (!pool.has_value()) {
    return;
  }
  pool->loop()->Quit();
}
// static
// Calls |ResetQuit| on the unmanaged thread pool's loop.
// Returns ZX_ERR_BAD_STATE if there is no unmanaged thread pool.
zx_status_t DispatcherCoordinator::TestingResetQuit() {
  auto& pool = GetDispatcherCoordinator().unmanaged_thread_pool_;
  if (!pool.has_value()) {
    return ZX_ERR_BAD_STATE;
  }
  return pool->loop()->ResetQuit();
}
// static
// Begins asynchronous shutdown of every dispatcher owned by |driver|.
//
// |observer|'s handler is invoked through a shutdown callback that also marks the driver's
// shutdown as complete. When the driver has dispatchers, the callback is stored in the
// driver state; when it has none, the callback is posted to the default thread pool's loop.
//
// Returns:
//   ZX_ERR_INVALID_ARGS if |driver| has no entry in |drivers_|.
//   The status of |SetDriverShuttingDown| if that call fails.
//   ZX_OK otherwise.
zx_status_t DispatcherCoordinator::ShutdownDispatchersAsync(
const void* driver, fdf_env_driver_shutdown_observer_t* observer) {
DriverState::DriverShutdownCallback shutdown_callback;
std::vector<fbl::RefPtr<Dispatcher>> dispatchers;
{
fbl::AutoLock lock(&(GetDispatcherCoordinator().lock_));
auto driver_state_iter = GetDispatcherCoordinator().drivers_.find(driver);
if (!driver_state_iter.IsValid()) {
return ZX_ERR_INVALID_ARGS;
}
driver_state_iter->GetDispatchers(dispatchers);
// The callback holds its own reference to the driver state.
auto driver_state_ref = fbl::RefPtr<DriverState>(&(*driver_state_iter));
shutdown_callback = [driver, observer, driver_state_ref = std::move(driver_state_ref)]() {
observer->handler(driver, observer);
driver_state_ref->SetDriverShutdownComplete();
};
// Set the driver state so that attempts to create new dispatchers on the driver
// return an error.
// If there are no dispatchers to shutdown, we will post a task to call the callback
// immediately rather than set it in the driver state.
// Note: |shutdown_callback| is only moved from when |dispatchers| is non-empty, so it is
// still valid for the empty case handled after the lock is released.
auto status = driver_state_iter->SetDriverShuttingDown(
dispatchers.empty() ? nullptr : std::move(shutdown_callback));
if (status != ZX_OK) {
return status;
}
}
// Each shutdown is posted to the dispatcher's own async dispatcher; the lambda keeps a
// copy of the dispatcher reference.
for (auto& dispatcher : dispatchers) {
async::PostTask(dispatcher->GetAsyncDispatcher(), [=]() { dispatcher->ShutdownAsync(); });
}
if (dispatchers.empty()) {
auto thread_pool = GetDispatcherCoordinator().default_thread_pool();
ZX_ASSERT(shutdown_callback);
// The dispatchers have already been shutdown and no calls to |NotifyDispatcherShutdown|
// will occur, so we need to schedule the handler to be called.
async::PostTask(thread_pool->loop()->dispatcher(),
[callback = std::move(shutdown_callback)]() mutable { callback(); });
}
return ZX_OK;
}
// static
void DispatcherCoordinator::DestroyAllDispatchers() {
std::vector<fbl::RefPtr<Dispatcher>> dispatchers;
{
fbl::AutoLock lock(&(GetDispatcherCoordinator().lock_));
for (auto& driver_state : GetDispatcherCoordinator().drivers_) {
// We should have already shutdown all dispatchers.
ZX_ASSERT(driver_state.CompletedShutdown());
driver_state.GetShutdownDispatchers(dispatchers);
}
}
for (auto& dispatcher : dispatchers) {
dispatcher->Destroy();
}
WaitUntilDispatchersDestroyed();
}
// static
// Forwards to the coordinator's token manager |Register| and returns its status.
zx_status_t DispatcherCoordinator::TokenRegister(zx_handle_t token, fdf_dispatcher_t* dispatcher,
                                                 fdf_token_t* handler) {
  return GetDispatcherCoordinator().token_manager_.Register(token, dispatcher, handler);
}
// static
// Forwards to the coordinator's token manager |Transfer| and returns its status.
zx_status_t DispatcherCoordinator::TokenTransfer(zx_handle_t token, fdf_handle_t handle) {
  return GetDispatcherCoordinator().token_manager_.Transfer(token, handle);
}
// Registers |dispatcher| with the state of its owning driver, creating that state on first
// use, and notifies the dispatcher's thread pool.
//
// Returns ZX_ERR_BAD_STATE if the owning driver is shutting down, ZX_OK otherwise.
zx_status_t DispatcherCoordinator::AddDispatcher(fbl::RefPtr<Dispatcher> dispatcher) {
  fbl::AutoLock guard(&lock_);

  const auto owner = dispatcher->owner();
  auto state_iter = drivers_.find(owner);
  if (state_iter != drivers_.end()) {
    // New dispatchers may not be created once the driver has started shutting down.
    if (state_iter->IsShuttingDown()) {
      return ZX_ERR_BAD_STATE;
    }
  } else {
    // First dispatcher for this driver: create its state object.
    auto created_state = fbl::AdoptRef(new DriverState(owner));
    drivers_.insert(created_state);
    state_iter = drivers_.find(owner);
  }

  state_iter->AddDispatcher(dispatcher);
  dispatcher->thread_pool()->OnDispatcherAdded();
  return ZX_OK;
}
// static
uint32_t DispatcherCoordinator::GetThreadLimit(std::string_view scheduler_role) {
auto thread_pool = GetDispatcherCoordinator().default_thread_pool();
if (scheduler_role != Dispatcher::ThreadPool::kNoSchedulerRole) {
auto result = GetDispatcherCoordinator().GetOrCreateThreadPool(scheduler_role);
if (result.is_error()) {
return 0;
}
thread_pool = *result;
}
return thread_pool->max_threads();
}
// static
// Sets the maximum thread count of the thread pool serving |scheduler_role|.
//
// |kNoSchedulerRole| selects the default thread pool; any other role selects (creating it
// if needed) the pool for that role.
//
// Returns the error from |GetOrCreateThreadPool| if the role's pool cannot be obtained,
// otherwise the status of |set_max_threads|.
zx_status_t DispatcherCoordinator::SetThreadLimit(std::string_view scheduler_role,
                                                  uint32_t max_threads) {
  auto thread_pool = GetDispatcherCoordinator().default_thread_pool();
  if (scheduler_role != Dispatcher::ThreadPool::kNoSchedulerRole) {
    auto result = GetDispatcherCoordinator().GetOrCreateThreadPool(scheduler_role);
    if (result.is_error()) {
      // Propagate the failure. Returning 0 here would be ZX_OK and would tell the caller
      // the limit had been applied when no pool was available.
      return result.status_value();
    }
    thread_pool = *result;
  }
  return thread_pool->set_max_threads(max_threads);
}
// Called when |dispatcher| has shut down. Marks it as shut down in its driver's state,
// invokes |dispatcher_shutdown_observer| (if non-null), and, if this was the last pending
// observer call of a driver that has completed shutdown, invokes the driver's shutdown
// callback.
//
// Both observers are called with |lock_| released. Asserts that |dispatcher|'s owner has
// an entry in |drivers_|.
void DispatcherCoordinator::NotifyDispatcherShutdown(
Dispatcher& dispatcher, fdf_dispatcher_shutdown_observer_t* dispatcher_shutdown_observer) {
DriverState::DriverShutdownCallback shutdown_callback = nullptr;
fbl::RefPtr<Dispatcher> initial_dispatcher;
fbl::RefPtr<DriverState> driver_state;
// Runs on every exit path; balances the |num_notify_shutdown_threads_++| below.
auto dec = fit::defer([&]() {
fbl::AutoLock lock(&lock_);
num_notify_shutdown_threads_--;
// The last dispatcher may have been destroyed during a shutdown handler, so check
// if all drivers have been destroyed.
if (AreAllDriversDestroyedLocked()) {
drivers_destroyed_event_.Broadcast();
}
});
{
fbl::AutoLock lock(&lock_);
num_notify_shutdown_threads_++;
auto driver_state_iter = drivers_.find(dispatcher.owner());
ZX_ASSERT(driver_state_iter != drivers_.end());
// Hold a reference so the driver state can be used after the lock is dropped.
driver_state = fbl::RefPtr<DriverState>(&(*driver_state_iter));
// Prepare to call the dispatcher's shutdown observer.
// We need to set the dispatcher as shutdown beforehand, in case the user tries to
// destroy the dispatcher.
driver_state->SetDispatcherShutdown(dispatcher);
driver_state->ObserverCallStarted();
}
// We need to call the dispatcher shutdown observer before calling the driver shutdown observer
// (if any).
if (dispatcher_shutdown_observer) {
// We should have already set up the driver call stack before calling
// |NotifyDispatcherShutdown|.
ZX_ASSERT(driver_context::GetCurrentDispatcher() == &dispatcher);
dispatcher_shutdown_observer->handler(static_cast<fdf_dispatcher_t*>(&dispatcher),
dispatcher_shutdown_observer);
}
{
fbl::AutoLock lock(&lock_);
driver_state->ObserverCallComplete();
// Check if we are still waiting for dispatchers to complete shutting down.
if (!driver_state->CompletedShutdown()) {
return;
}
// Check that we are the last shutdown handler, so we don't
// call any driver shutdown observer before all dispatcher shutdown observers have returned.
if (driver_state->num_pending_observer_calls() > 0) {
return;
}
// We should take ownership of the driver shutdown callback before dropping the lock.
// This ensures we do not attempt to call it multiple times.
shutdown_callback = driver_state->take_driver_shutdown_callback();
if (!shutdown_callback) {
// No one to notify.
return;
}
// There should always be an initial dispatcher, as the dispatcher is the one that calls
// |NotifyDispatcherShutdown|.
initial_dispatcher = driver_state->initial_dispatcher();
ZX_ASSERT(initial_dispatcher != nullptr);
}
{
// Make sure the shutdown context looks like it is happening from the initial
// dispatcher's thread.
driver_context::PushDriver(initial_dispatcher->owner(), initial_dispatcher.get());
auto pop_driver = fit::defer([]() { driver_context::PopDriver(); });
shutdown_callback();
}
}
// Detaches |dispatcher| from its thread pool and from its owning driver's state.
//
// A thread pool left without dispatchers is passed to |DestroyThreadPool|, and a driver
// left without dispatchers is dropped from |drivers_|. Waiters on
// |drivers_destroyed_event_| are woken once all drivers are destroyed.
void DispatcherCoordinator::RemoveDispatcher(Dispatcher& dispatcher) {
  fbl::AutoLock guard(&lock_);

  auto state_iter = drivers_.find(dispatcher.owner());
  ZX_ASSERT(state_iter != drivers_.end());

  auto pool = dispatcher.thread_pool();
  pool->OnDispatcherRemoved(dispatcher);
  if (pool->num_dispatchers() == 0) {
    DestroyThreadPool(pool);
  }

  state_iter->RemoveDispatcher(dispatcher);
  // Once a driver has no dispatchers left, its entry can be removed from the map.
  if (!state_iter->HasDispatchers()) {
    drivers_.erase(state_iter);
  }

  if (AreAllDriversDestroyedLocked()) {
    drivers_destroyed_event_.Broadcast();
  }
}
// Adds the first thread to the default thread pool.
// Returns ZX_ERR_BAD_STATE if the pool already has threads, else the status of |AddThread|.
zx_status_t DispatcherCoordinator::Start() {
  DispatcherCoordinator& coordinator = GetDispatcherCoordinator();
  fbl::AutoLock guard(&coordinator.lock_);

  auto default_pool = coordinator.default_thread_pool();
  if (default_pool->num_threads() == 0) {
    return default_pool->AddThread();
  }
  return ZX_ERR_BAD_STATE;
}
// static
// Static entry point that resets the process-wide coordinator via |Reset|.
void DispatcherCoordinator::EnvReset() { GetDispatcherCoordinator().Reset(); }
// Resets the default thread pool, and resets and discards the unmanaged thread pool if one
// exists. Asserts that no drivers remain registered.
void DispatcherCoordinator::Reset() {
  {
    fbl::AutoLock guard(&lock_);
    ZX_ASSERT(drivers_.is_empty());
  }

  default_thread_pool()->Reset();

  if (unmanaged_thread_pool_.has_value()) {
    unmanaged_thread_pool_->Reset();
  }
  unmanaged_thread_pool_.reset();
}
// Returns the thread pool registered for |scheduler_role|, creating it (and adding its
// first thread) if none exists yet.
//
// Returns the |AddThread| error if the new pool's thread cannot be added; in that case
// nothing is inserted into |role_to_thread_pool_|.
zx::result<Dispatcher::ThreadPool*> DispatcherCoordinator::GetOrCreateThreadPool(
    std::string_view scheduler_role) {
  fbl::AutoLock guard(&lock_);

  if (auto existing = role_to_thread_pool_.find(std::string(scheduler_role));
      existing != role_to_thread_pool_.end()) {
    return zx::ok(&(*existing));
  }

  auto new_pool = std::make_unique<Dispatcher::ThreadPool>(scheduler_role);
  if (zx_status_t status = new_pool->AddThread(); status != ZX_OK) {
    return zx::error(status);
  }

  // Keep a raw pointer: ownership moves into the map on the next line.
  auto* new_pool_ptr = new_pool.get();
  role_to_thread_pool_.insert(std::move(new_pool));
  return zx::ok(new_pool_ptr);
}
// Removes |thread_pool| from |role_to_thread_pool_| and schedules its destruction on the
// default thread pool's loop. The default and unmanaged thread pools are never destroyed
// through this path.
void DispatcherCoordinator::DestroyThreadPool(Dispatcher::ThreadPool* thread_pool) {
  if (thread_pool == default_thread_pool() ||
      (unmanaged_thread_pool_.has_value() && thread_pool == &unmanaged_thread_pool_.value())) {
    return;
  }

  // Take the pool out of the coordinator map right away, so that a new driver doesn't try
  // to use a thread pool that is being destructed.
  std::unique_ptr<Dispatcher::ThreadPool> doomed_pool = role_to_thread_pool_.erase(*thread_pool);
  ZX_ASSERT(doomed_pool != nullptr);

  // Ensure we are running on a default thread pool thread.
  async::PostTask(default_thread_pool()->loop()->dispatcher(),
                  [thread_pool = std::move(doomed_pool)]() {
                    // This will destruct the thread pool and join with its threads.
                  });
}
void Dispatcher::ThreadPool::ThreadWakeupPrologue() {
if (driver_context::GetRoleProfileStatus().has_value()) {
// We have already attempted to set the role profile for the current thread.
return;
}
zx_status_t status = SetRoleProfile();
if (status != ZX_OK) {
// Failing to set the role profile is not a fatal error.
LOGF(WARNING, "Failed to set scheduler role: %d\n", status);
}
driver_context::SetRoleProfileStatus(status);
}
// Asks fuchsia.scheduler.RoleManager to apply |scheduler_role()| to the calling thread.
//
// Returns:
//   ZX_OK                 - the role was applied.
//   ZX_ERR_NOT_SUPPORTED  - built for a Fuchsia API level below 20.
//   any other status      - failure connecting to RoleManager, duplicating the
//                           thread handle, or making the SetRole call (either the
//                           transport status or the error returned by the server).
zx_status_t Dispatcher::ThreadPool::SetRoleProfile() {
#if __Fuchsia_API_level__ >= 20
  zx::result client_end = component::Connect<fuchsia_scheduler::RoleManager>();
  if (client_end.is_error()) {
    return client_end.status_value();
  }
  auto role_manager = *std::move(client_end);

  // Duplicate a handle to the current thread with the rights passed to RoleManager.
  const zx_rights_t kRights = ZX_RIGHT_TRANSFER | ZX_RIGHT_MANAGE_THREAD;
  zx::thread duplicate;
  zx_status_t status = zx::thread::self()->duplicate(kRights, &duplicate);
  if (status != ZX_OK) {
    return status;
  }

  fidl::Arena arena;
  auto request =
      fuchsia_scheduler::wire::RoleManagerSetRoleRequest::Builder(arena)
          .target(fuchsia_scheduler::wire::RoleTarget::WithThread(std::move(duplicate)))
          .role(fuchsia_scheduler::wire::RoleName{fidl::StringView::FromExternal(scheduler_role())})
          .Build();
  auto result = fidl::WireCall(role_manager)->SetRole(request);
  // Transport-level failure.
  if (result.status() != ZX_OK) {
    return result.status();
  }
  // Application-level failure reported by RoleManager.
  if (!result.value().is_ok()) {
    return result.value().error_value();
  }
  return ZX_OK;
#else
  // Only one of the two branches is compiled, so no unreachable return is left
  // behind when the RoleManager path is available.
  return ZX_ERR_NOT_SUPPORTED;
#endif
}
// Records that one more dispatcher thread is needed and starts a new loop thread
// if the pool does not already have enough.
//
// Returns ZX_OK when no new thread had to be started (including for the unmanaged
// pool, where this is a no-op), otherwise the status of |loop_.StartThread|.
// |num_threads_| is only incremented when the thread started successfully.
zx_status_t Dispatcher::ThreadPool::AddThread() {
  if (is_unmanaged_) {
    // The unmanaged thread pool does not start threads here.
    return ZX_OK;
  }

  fbl::AutoLock guard(&lock_);
  dispatcher_threads_needed_++;

  // Skip starting an extra thread when there is only one dispatcher and the pool's
  // initial thread is already running. |num_dispatchers_| has not yet been
  // incremented for the current dispatcher at this point.
  // TODO(https://fxbug.dev/42076454): we should be able to remove the scheduler role check.
  if ((scheduler_role_ != kNoSchedulerRole) && (num_threads_ > num_dispatchers_)) [[likely]] {
    return ZX_OK;
  }

  // Enough threads already exist, or the pool has reached its thread cap.
  if (num_threads_ >= dispatcher_threads_needed_ || num_threads_ == max_threads_) {
    return ZX_OK;
  }

  // Thread name: "fdf-dispatcher-thread-<index>", plus ":<role>" when a role is set.
  std::string thread_name = "fdf-dispatcher-thread-" + std::to_string(num_threads_);
  if (scheduler_role() != kNoSchedulerRole) {
    thread_name.append(":").append(scheduler_role());
  }

  const zx_status_t start_status = loop_.StartThread(thread_name.c_str());
  if (start_status != ZX_OK) {
    return start_status;
  }
  num_threads_++;
  return ZX_OK;
}
// Records that one fewer dispatcher thread is needed.
//
// This function only decrements |dispatcher_threads_needed_| (asserting that it
// is non-zero first); it does not stop a thread itself. It is a no-op for the
// unmanaged thread pool. Always returns ZX_OK.
zx_status_t Dispatcher::ThreadPool::RemoveThread() {
  if (!is_unmanaged_) {
    fbl::AutoLock guard(&lock_);
    ZX_ASSERT(dispatcher_threads_needed_ > 0);
    dispatcher_threads_needed_--;
  }
  return ZX_OK;
}
// Counts one more dispatcher as belonging to this thread pool.
void Dispatcher::ThreadPool::OnDispatcherAdded() {
  fbl::AutoLock guard(&lock_);
  ++num_dispatchers_;
}
// Updates the pool's bookkeeping after |dispatcher| has been removed.
//
// Always decrements |num_dispatchers_|. Additionally decrements
// |dispatcher_threads_needed_| when this pool is managed, the dispatcher allowed
// sync calls, and the dispatcher was running on this pool's loop.
void Dispatcher::ThreadPool::OnDispatcherRemoved(Dispatcher& dispatcher) {
  fbl::AutoLock guard(&lock_);

  // Tests inject their own process shared dispatcher, so confirm it is really
  // this pool's loop before giving back the thread that was requested for it.
  const bool releases_thread = !is_unmanaged_ && dispatcher.allow_sync_calls() &&
                               dispatcher.process_shared_dispatcher() == loop()->dispatcher();
  if (releases_thread) {
    ZX_ASSERT(dispatcher_threads_needed_ > 0);
    dispatcher_threads_needed_--;
  }

  ZX_ASSERT(num_dispatchers_ > 0);
  num_dispatchers_--;
}
// Stops the pool's loop threads and restores the pool's counters to their
// initial values.
//
// Asserts that at most one dispatcher thread is still needed when called.
void Dispatcher::ThreadPool::Reset() {
  {
    fbl::AutoLock guard(&lock_);
    ZX_ASSERT_MSG(dispatcher_threads_needed_ <= 1, "Too many dispatcher threads to reset: %d",
                  dispatcher_threads_needed_);
  }

  // The lock is not held across these calls. Quit the loop and wait for its
  // threads to exit, then clear the quit state and run the loop until idle.
  loop_.Quit();
  loop_.JoinThreads();
  loop_.ResetQuit();
  loop_.RunUntilIdle();

  {
    fbl::AutoLock guard(&lock_);
    max_threads_ = 10;
    num_threads_ = 0;
    dispatcher_threads_needed_ = 0;
    num_dispatchers_ = 0;
  }
}
// Takes ownership of |irq| and stores it in |cached_irqs_| while holding |lock_|.
void Dispatcher::ThreadPool::CacheUnboundIrq(std::unique_ptr<Dispatcher::AsyncIrq> irq) {
  fbl::AutoLock guard(&lock_);
  cached_irqs_.AddIrqLocked(std::move(irq));
}
// Records that the current thread has woken up for the current generation of
// cached irqs, at most once per generation per thread.
void Dispatcher::ThreadPool::OnThreadWakeup() {
  // Fast path: this thread's wakeup has already been tracked for the current
  // generation of irqs. The generation id is atomic, so it is read without taking
  // the lock to avoid unnecessary lock contention on every thread wakeup. If the
  // generation id changes in the meantime, the next wakeup of this thread will
  // handle it.
  const uint32_t seen_generation_id = driver_context::GetIrqGenerationId();
  if (cached_irqs_.cur_generation_id() == seen_generation_id) {
    return;
  }

  fbl::AutoLock guard(&lock_);
  // Store the generation id for this thread before reporting the wakeup:
  // |cached_irqs_.NewThreadWakeupLocked| may increment the generation id if it
  // clears the current generation.
  driver_context::SetIrqGenerationId(cached_irqs_.cur_generation_id());
  cached_irqs_.NewThreadWakeupLocked(num_threads_);
}
// Takes ownership of |irq| and files it under the current or the next generation
// of cached irqs. The visible caller holds the thread pool's lock.
void Dispatcher::ThreadPool::CachedIrqs::AddIrqLocked(std::unique_ptr<Dispatcher::AsyncIrq> irq) {
  // An empty current generation means a new generation of irqs is starting.
  if (cur_generation_.is_empty()) {
    IncrementGenerationId();
  }

  // Once any thread has woken up for the current generation, new irqs must wait
  // for the next generation.
  if (threads_wakeup_count_ != 0) {
    next_generation_.push_back(std::move(irq));
  } else {
    cur_generation_.push_back(std::move(irq));
  }
}
// Counts one thread wakeup for the current generation of cached irqs and, once
// |total_number_threads| wakeups have been counted, retires that generation.
// The visible caller holds the thread pool's lock.
void Dispatcher::ThreadPool::CachedIrqs::NewThreadWakeupLocked(uint32_t total_number_threads) {
  ++threads_wakeup_count_;

  // Only when every thread has woken up since the current generation was populated
  // can we be sure that no thread still has a pending irq packet corresponding to
  // one of these unbound irqs.
  const bool all_threads_woke_up = threads_wakeup_count_ >= total_number_threads;
  if (!all_threads_woke_up) {
    return;
  }

  // Replace the current generation with the next one, dropping the irqs it held,
  // and start counting wakeups from zero again.
  cur_generation_ = std::move(next_generation_);
  // If the promoted generation already holds irqs, bump the generation counter so
  // that thread wakeups are tracked for it.
  if (!cur_generation_.is_empty()) {
    IncrementGenerationId();
  }
  threads_wakeup_count_ = 0;
}
} // namespace driver_runtime