| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "dispatcher.h" |
| |
| #include <fidl/fuchsia.scheduler/cpp/wire.h> |
| #include <lib/async/dispatcher.h> |
| #include <lib/async/irq.h> |
| #include <lib/async/receiver.h> |
| #include <lib/async/sequence_id.h> |
| #include <lib/async/task.h> |
| #include <lib/async/trap.h> |
| #include <lib/component/incoming/cpp/protocol.h> |
| #include <lib/fdf/dispatcher.h> |
| #include <lib/fit/defer.h> |
| #include <lib/trace/event.h> |
| #include <lib/zx/clock.h> |
| #include <lib/zx/thread.h> |
| #include <stdlib.h> |
| #include <threads.h> |
| #include <zircon/assert.h> |
| #include <zircon/errors.h> |
| #include <zircon/listnode.h> |
| #include <zircon/status.h> |
| #include <zircon/syscalls.h> |
| |
| #include <memory> |
| #include <string> |
| |
| #include <fbl/auto_lock.h> |
| #include <fbl/intrusive_double_list.h> |
| #include <fbl/ref_ptr.h> |
| |
| #include "src/devices/bin/driver_runtime/callback_request.h" |
| #include "src/devices/bin/driver_runtime/driver_context.h" |
| #include "src/devices/lib/log/log.h" |
| |
| namespace driver_runtime { |
| |
| namespace { |
| |
| const async_ops_t g_dispatcher_ops = { |
| .version = ASYNC_OPS_V3, |
| .reserved = 0, |
| .v1 = { |
| .now = |
| [](async_dispatcher_t* dispatcher) { |
| return static_cast<Dispatcher*>(dispatcher)->GetTime(); |
| }, |
| .begin_wait = |
| [](async_dispatcher_t* dispatcher, async_wait_t* wait) { |
| return static_cast<Dispatcher*>(dispatcher)->BeginWait(wait); |
| }, |
| .cancel_wait = |
| [](async_dispatcher_t* dispatcher, async_wait_t* wait) { |
| return static_cast<Dispatcher*>(dispatcher)->CancelWait(wait); |
| }, |
| .post_task = |
| [](async_dispatcher_t* dispatcher, async_task_t* task) { |
| return static_cast<Dispatcher*>(dispatcher)->PostTask(task); |
| }, |
| .cancel_task = |
| [](async_dispatcher_t* dispatcher, async_task_t* task) { |
| return static_cast<Dispatcher*>(dispatcher)->CancelTask(task); |
| }, |
| .queue_packet = |
| [](async_dispatcher_t* dispatcher, async_receiver_t* receiver, |
| const zx_packet_user_t* data) { |
| return static_cast<Dispatcher*>(dispatcher)->QueuePacket(receiver, data); |
| }, |
| .set_guest_bell_trap = [](async_dispatcher_t* dispatcher, async_guest_bell_trap_t* trap, |
| zx_handle_t guest, zx_vaddr_t addr, |
| size_t length) { return ZX_ERR_NOT_SUPPORTED; }, |
| }, |
| .v2 = { |
| .bind_irq = |
| [](async_dispatcher_t* dispatcher, async_irq_t* irq) { |
| return static_cast<Dispatcher*>(dispatcher)->BindIrq(irq); |
| }, |
| .unbind_irq = |
| [](async_dispatcher_t* dispatcher, async_irq_t* irq) { |
| return static_cast<Dispatcher*>(dispatcher)->UnbindIrq(irq); |
| }, |
| .create_paged_vmo = [](async_dispatcher_t* dispatcher, async_paged_vmo_t* paged_vmo, |
| uint32_t options, zx_handle_t pager, uint64_t vmo_size, |
| zx_handle_t* vmo_out) { return ZX_ERR_NOT_SUPPORTED; }, |
| .detach_paged_vmo = [](async_dispatcher_t* dispatcher, |
| async_paged_vmo_t* paged_vmo) { return ZX_ERR_NOT_SUPPORTED; }, |
| }, |
| .v3 = { |
| .get_sequence_id = |
| [](async_dispatcher_t* dispatcher, async_sequence_id_t* out_sequence_id, |
| const char** out_error) { |
| return static_cast<Dispatcher*>(dispatcher) |
| ->GetSequenceId(out_sequence_id, out_error); |
| }, |
| .check_sequence_id = |
| [](async_dispatcher_t* dispatcher, async_sequence_id_t sequence_id, |
| const char** out_error) { |
| return static_cast<Dispatcher*>(dispatcher)->CheckSequenceId(sequence_id, out_error); |
| }, |
| }, |
| }; |
| |
| } // namespace |
| |
| DispatcherCoordinator& GetDispatcherCoordinator() { |
| static DispatcherCoordinator shared_loop; |
| return shared_loop; |
| } |
| |
| Dispatcher::AsyncWait::AsyncWait(async_wait_t* original_wait, Dispatcher& dispatcher) |
| : CallbackRequest(CallbackRequest::RequestType::kWait), |
| async_wait_t{{ASYNC_STATE_INIT}, |
| &Dispatcher::AsyncWait::Handler, |
| original_wait->object, |
| original_wait->trigger, |
| 0}, |
| original_wait_(original_wait) { |
| // Use one of the async_wait_t's reserved fields to stash a pointer to the AsyncWait object. |
| original_wait_->state.reserved[0] = reinterpret_cast<uintptr_t>(this); |
| |
| auto async_dispatcher = dispatcher.GetAsyncDispatcher(); |
| driver_runtime::Callback callback = |
| [this, async_dispatcher](std::unique_ptr<driver_runtime::CallbackRequest> callback_request, |
| zx_status_t status) { |
| // Clear the pointer to the AsyncWait object. |
| original_wait_->state.reserved[0] = 0; |
| zx_packet_signal_t* signal_packet = signal_packet_ ? &signal_packet_.value() : nullptr; |
| original_wait_->handler(async_dispatcher, original_wait_, status, signal_packet); |
| }; |
| // Note that this callback is called *after* |OnSignal|, which is the immediate callback that is |
| // invoked when the async wait is signaled. |
| SetCallback(static_cast<fdf_dispatcher_t*>(&dispatcher), std::move(callback), original_wait_); |
| } |
| |
| Dispatcher::AsyncWait::~AsyncWait() { |
  // This shouldn't destruct until the wait has been canceled or completed.
| ZX_ASSERT(dispatcher_ref_ == nullptr); |
| } |
| |
| // static |
| zx_status_t Dispatcher::AsyncWait::BeginWait(std::unique_ptr<AsyncWait> wait, |
| Dispatcher& dispatcher) { |
  // Purposefully create a cycle which is broken in |Cancel| or |OnSignal|.
  // This needs to be done before starting the async wait, in case the wait is
  // signaled on another dispatcher thread before we return.
| auto dispatcher_ref = fbl::RefPtr(&dispatcher); |
| wait->dispatcher_ref_ = fbl::ExportToRawPtr(&dispatcher_ref); |
| auto* wait_ref = wait.get(); |
| dispatcher.AddWaitLocked(std::move(wait)); |
| |
| zx_status_t status = async_begin_wait(dispatcher.process_shared_dispatcher_, wait_ref); |
| if (status != ZX_OK) { |
| dispatcher.RemoveWaitLocked(wait_ref); |
| fbl::ImportFromRawPtr(wait_ref->dispatcher_ref_.exchange(nullptr)); |
| return status; |
| } |
| return ZX_OK; |
| } |
| |
| bool Dispatcher::AsyncWait::Cancel() { |
  // We do a load here rather than an exchange, as |OnSignal| may still be triggered and
  // must still be able to access |dispatcher_ref_|.
| auto* dispatcher_ref = dispatcher_ref_.load(); |
| if (dispatcher_ref == nullptr) { |
| // OnSignal was triggered in another thread. |
| return false; |
| } |
| auto dispatcher = fbl::RefPtr(dispatcher_ref); |
| auto status = async_cancel_wait(dispatcher->process_shared_dispatcher_, this); |
| if (status != ZX_OK) { |
| // OnSignal was triggered in another thread, or is about to be. |
| ZX_DEBUG_ASSERT(status == ZX_ERR_NOT_FOUND); |
| return false; |
| } |
| // It is now safe to recover the dispatcher reference. |
| dispatcher_ref = dispatcher_ref_.exchange(nullptr); |
| ZX_DEBUG_ASSERT(dispatcher_ref != nullptr); |
| fbl::ImportFromRawPtr(dispatcher_ref); |
| |
| return true; |
| } |
| |
| // static |
| void Dispatcher::AsyncWait::Handler(async_dispatcher_t* dispatcher, async_wait_t* wait, |
| zx_status_t status, const zx_packet_signal_t* signal) { |
| static_cast<AsyncWait*>(wait)->OnSignal(dispatcher, status, signal); |
| } |
| |
| void Dispatcher::AsyncWait::OnSignal(async_dispatcher_t* async_dispatcher, zx_status_t status, |
| const zx_packet_signal_t* signal) { |
| auto* dispatcher_ref = dispatcher_ref_.exchange(nullptr); |
| ZX_DEBUG_ASSERT(dispatcher_ref != nullptr); |
| auto dispatcher = fbl::ImportFromRawPtr(dispatcher_ref); |
| |
| if (signal) { |
| signal_packet_ = *signal; |
| } else { |
| signal_packet_ = std::nullopt; |
| } |
| |
| dispatcher->QueueWait(this, status); |
| dispatcher->thread_pool()->OnThreadWakeup(); |
| } |
| |
| Dispatcher::AsyncIrq::AsyncIrq(async_irq_t* original_irq, Dispatcher& dispatcher) |
| : async_irq_t{{ASYNC_STATE_INIT}, &Dispatcher::AsyncIrq::Handler, original_irq->object}, |
| original_irq_(original_irq) { |
  // Store a pointer to our IRQ wrapper, so |UnbindIrq| can map back from the user's IRQ object.
| original_irq_->state.reserved[0] = reinterpret_cast<uintptr_t>(this); |
| } |
| |
| Dispatcher::AsyncIrq::~AsyncIrq() { |
| // This shouldn't destruct until after the irq has been unbound, either by the user or |
| // |ShutdownAsync|. |
| ZX_ASSERT(dispatcher_ == nullptr); |
| } |
| |
| // static |
| zx_status_t Dispatcher::AsyncIrq::Bind(std::unique_ptr<AsyncIrq> irq, Dispatcher& dispatcher) { |
| // The AsyncIrq will hold the dispatcher reference until the irq is unbound. |
| irq->SetDispatcherRef(fbl::RefPtr(&dispatcher)); |
| |
| auto* irq_ref = irq.get(); |
| dispatcher.AddIrqLocked(std::move(irq)); |
| |
| zx_status_t status = async_bind_irq(dispatcher.process_shared_dispatcher_, irq_ref); |
| if (status != ZX_OK) { |
    // |irq| was moved into the dispatcher's list above, so operate through
    // |irq_ref|. Clear the dispatcher reference before the wrapper is
    // destroyed, as |~AsyncIrq| asserts that it has been dropped.
    irq_ref->SetDispatcherRef(nullptr);
    ZX_ASSERT(dispatcher.RemoveIrqLocked(irq_ref) != nullptr);
    return status;
| } |
| return ZX_OK; |
| } |
| |
| bool Dispatcher::AsyncIrq::Unbind() { |
| auto dispatcher = GetDispatcherRef(); |
| if (!dispatcher) { |
| return false; |
| } |
| auto status = async_unbind_irq(dispatcher->process_shared_dispatcher_, this); |
| if (status != ZX_OK) { |
| return false; |
| } |
| SetDispatcherRef(nullptr); |
| original_irq_->state.reserved[0] = 0; |
| return true; |
| } |
| |
| std::unique_ptr<driver_runtime::CallbackRequest> Dispatcher::AsyncIrq::CreateCallbackRequest( |
| Dispatcher& dispatcher) { |
| auto async_dispatcher = dispatcher.GetAsyncDispatcher(); |
| |
| // TODO(https://fxbug.dev/42052990): We should consider something more efficient than creating a |
| // callback request each time the irq is triggered. This is complex due to an AsyncIrq not having |
| // a 1:1 mapping to interrupt callbacks, and we cannot easily return ownership of a |
| // |CallbackRequest| after dispatching it. See https://fxbug.dev/42052990 for a longer |
| // explanation. |
| auto callback_request = |
| std::make_unique<driver_runtime::CallbackRequest>(CallbackRequest::RequestType::kIrq); |
| driver_runtime::Callback callback = |
| [this, async_dispatcher](std::unique_ptr<driver_runtime::CallbackRequest> callback_request, |
| zx_status_t status) { |
| // We should not clear the reserved state, as this AsyncIrq object is still bound for |
| // future interrupts. |
| original_irq_->handler(async_dispatcher, original_irq_, status, &interrupt_packet_); |
| }; |
| callback_request->SetCallback(static_cast<fdf_dispatcher_t*>(&dispatcher), std::move(callback), |
| this); |
| return callback_request; |
| } |
| |
| // static |
| void Dispatcher::AsyncIrq::Handler(async_dispatcher_t* dispatcher, async_irq_t* irq, |
| zx_status_t status, const zx_packet_interrupt_t* packet) { |
| static_cast<AsyncIrq*>(irq)->OnSignal(dispatcher, status, packet); |
| } |
| |
| void Dispatcher::AsyncIrq::OnSignal(async_dispatcher_t* global_dispatcher, zx_status_t status, |
| const zx_packet_interrupt_t* packet) { |
| fbl::RefPtr<Dispatcher> dispatcher = GetDispatcherRef(); |
  // The dispatcher reference may already be cleared if the irq has been unbound, but this irq
  // packet had already been pulled from the port. If so, we should not deliver the irq to the
  // user.
| if (!dispatcher) { |
| return; |
| } |
| interrupt_packet_ = *packet; |
| |
| // We do not hold the irq lock before calling |QueueIrq|, as it would cause |
| // incorrect lock ordering. |
| dispatcher->QueueIrq(this, status); |
| dispatcher->thread_pool()->OnThreadWakeup(); |
| } |
| |
| Dispatcher::Dispatcher(uint32_t options, std::string_view name, bool unsynchronized, |
| bool allow_sync_calls, const void* owner, ThreadPool* thread_pool, |
| async_dispatcher_t* process_shared_dispatcher, |
| fdf_dispatcher_shutdown_observer_t* observer) |
| : async_dispatcher_t{&g_dispatcher_ops}, |
| options_(options), |
| unsynchronized_(unsynchronized), |
| allow_sync_calls_(allow_sync_calls), |
| owner_(owner), |
| thread_pool_(thread_pool), |
| process_shared_dispatcher_(process_shared_dispatcher), |
| timer_(this), |
| shutdown_observer_(observer) { |
| name_.Append(name); |
| } |
| |
| // static |
| zx_status_t Dispatcher::CreateWithAdder(uint32_t options, std::string_view name, |
| std::string_view scheduler_role, const void* owner, |
| ThreadPool* thread_pool, |
| async_dispatcher_t* parent_dispatcher, ThreadAdder adder, |
| fdf_dispatcher_shutdown_observer_t* observer, |
| Dispatcher** out_dispatcher) { |
| ZX_DEBUG_ASSERT(out_dispatcher); |
| |
| bool unsynchronized = options & FDF_DISPATCHER_OPTION_UNSYNCHRONIZED; |
| bool allow_sync_calls = options & FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS; |
| if (unsynchronized && allow_sync_calls) { |
| return ZX_ERR_NOT_SUPPORTED; |
| } |
| if (!owner) { |
| return ZX_ERR_INVALID_ARGS; |
| } |
| |
| if (allow_sync_calls) { |
| zx_status_t status = adder(); |
| if (status != ZX_OK) { |
| return status; |
| } |
| } |
| |
| auto dispatcher = |
| fbl::MakeRefCounted<Dispatcher>(options, name, unsynchronized, allow_sync_calls, owner, |
| thread_pool, parent_dispatcher, observer); |
| |
| zx::event event; |
| if (zx_status_t status = zx::event::create(0, &event); status != ZX_OK) { |
| return status; |
| } |
| |
| auto self = dispatcher.get(); |
| auto event_waiter = std::make_unique<EventWaiter>( |
| std::move(event), |
| [self](std::unique_ptr<EventWaiter> event_waiter, fbl::RefPtr<Dispatcher> dispatcher_ref) { |
| auto ref = dispatcher_ref; |
| self->DispatchCallbacks(std::move(event_waiter), std::move(dispatcher_ref)); |
| ref->thread_pool()->OnThreadWakeup(); |
| }); |
| dispatcher->SetEventWaiter(event_waiter.get()); |
| zx_status_t status = EventWaiter::BeginWaitWithRef(std::move(event_waiter), dispatcher); |
| if (status == ZX_ERR_BAD_STATE) { |
| dispatcher->SetEventWaiter(nullptr); |
| return status; |
| } |
| if (scheduler_role != ThreadPool::kNoSchedulerRole) { |
| if (thread_pool->num_dispatchers() == 0) { |
| // Each thread in the thread pool will check whether it needs to set the scheduler |
| // role when it wakes up. If this is the first dispatcher, we might as well |
| // post a task so we can get the scheduler role set ASAP. Otherwise, if there |
| // are multiple dispatchers and threads, it becomes less likely that we would |
| // happen to post the task to a thread that doesn't already have the scheduler role set, |
      // so we'll leave any unset thread to set its role on next wakeup.
| status = async::PostTask(thread_pool->loop()->dispatcher(), [dispatcher]() mutable { |
| // This task is intentionally empty, the actual work takes place in the async loop's |
| // prologue function. See |Dispatcher::ThreadPool::ThreadWakeupPrologue|. |
| }); |
| if (status != ZX_OK) { |
        // Posting a task would only fail if the global loop (and probably the process) was
        // shutting down.
| LOGF(ERROR, "Failed to post task to set scheduler role"); |
| return status; |
| } |
| } |
| } |
| |
| // This may fail if the entire driver is being shut down by the driver host. |
| status = GetDispatcherCoordinator().AddDispatcher(dispatcher); |
| if (status != ZX_OK) { |
| dispatcher->SetEventWaiter(nullptr); |
| return status; |
| } |
| |
| // This reference will be recovered in |Destroy|. |
| *out_dispatcher = fbl::ExportToRawPtr(&dispatcher); |
| return ZX_OK; |
| } |
| |
| // fdf_dispatcher_t implementation |
| |
| // static |
| zx_status_t Dispatcher::Create(uint32_t options, std::string_view name, |
| std::string_view scheduler_role, |
| fdf_dispatcher_shutdown_observer_t* observer, |
| Dispatcher** out_dispatcher) { |
| auto thread_pool = GetDispatcherCoordinator().default_thread_pool(); |
| if (scheduler_role != ThreadPool::kNoSchedulerRole) { |
| auto result = GetDispatcherCoordinator().GetOrCreateThreadPool(scheduler_role); |
| if (result.is_error()) { |
| return result.status_value(); |
| } |
| thread_pool = *result; |
| } |
| return CreateWithAdder( |
| options, name, scheduler_role, driver_context::GetCurrentDriver(), thread_pool, |
| thread_pool->loop()->dispatcher(), [thread_pool]() { return thread_pool->AddThread(); }, |
| observer, out_dispatcher); |
| } |
| |
| zx_status_t Dispatcher::CreateUnmanagedDispatcher( |
| uint32_t options, std::string_view name, fdf_dispatcher_shutdown_observer_t* shutdown_observer, |
| Dispatcher** out_dispatcher) { |
| auto unmanaged_thread_pool = GetDispatcherCoordinator().GetOrCreateUnmanagedThreadPool(); |
| return CreateWithAdder( |
| options, name, ThreadPool::kNoSchedulerRole, driver_context::GetCurrentDriver(), |
| unmanaged_thread_pool, unmanaged_thread_pool->loop()->dispatcher(), |
| [unmanaged_thread_pool]() { return unmanaged_thread_pool->AddThread(); }, shutdown_observer, |
| out_dispatcher); |
| } |
| |
| // static |
| Dispatcher* Dispatcher::DowncastAsyncDispatcher(async_dispatcher_t* dispatcher) { |
| auto ret = static_cast<Dispatcher*>(dispatcher); |
| ret->canary_.Assert(); |
| return ret; |
| } |
| |
| async_dispatcher_t* Dispatcher::GetAsyncDispatcher() { |
  // Note: We inherit from async_dispatcher_t so we can upcast to it.
| return static_cast<async_dispatcher_t*>(this); |
| } |
| |
| void Dispatcher::ShutdownAsync() { |
| { |
| fbl::AutoLock lock(&callback_lock_); |
| |
| switch (state_) { |
| case DispatcherState::kRunning: |
| state_ = DispatcherState::kShuttingDown; |
| break; |
| case DispatcherState::kShuttingDown: |
| case DispatcherState::kShutdown: |
| case DispatcherState::kDestroyed: |
| return; |
| default: |
| ZX_ASSERT_MSG(false, "Dispatcher::ShutdownAsync got unknown dispatcher state %d", |
| static_cast<int>(state_)); |
| } |
| |
| // Move the requests into a separate queue so we will be able to enter an idle state. |
| // This queue will be processed by |CompleteShutdown|. |
| shutdown_queue_ = std::move(callback_queue_); |
| shutdown_queue_.splice(shutdown_queue_.end(), registered_callbacks_); |
| |
    // Try to cancel all outstanding waits. Successfully canceled waits should have their
    // callbacks triggered.
| auto waits = std::move(waits_); |
| for (auto wait = waits.pop_front(); wait; wait = waits.pop_front()) { |
| // It's possible that the wait has already been cancelled but not yet pulled |
| // from the |waits_| list, in which case the user may have already freed |
| // the handle they were waiting on, so we should not try to cancel it again. |
| if (!wait->is_pending_cancellation() && wait->Cancel()) { |
        // We were successful. Let's queue this up to be processed by |CompleteShutdown|.
| shutdown_queue_.push_back(std::move(wait)); |
| } else { |
| // We weren't successful, |wait| is being run or queued to run and will want to remove this |
| // from the |waits_| list. |
| waits_.push_back(std::move(wait)); |
| } |
| } |
| |
    // It's easier to handle |irqs_| in |CompleteShutdown|, so unbinding will only
    // ever happen on one thread at a time. If the irq gets triggered in the meantime,
    // |QueueIrq| will return early.
| |
| zx_status_t status = timer_.Cancel(); |
| // If we could not cancel the timer, it is going to run / is already running in another |
| // thread, and we don't want |CompleteShutdown| to run until after that completes. |
| if (status != ZX_OK) { |
| shutdown_waiting_for_timer_ = true; |
| } |
| shutdown_queue_.splice(shutdown_queue_.end(), delayed_tasks_); |
| |
| // To avoid race conditions with attempting to cancel a wait that might be scheduled to |
    // run, we will cancel the event waiter in the |CompleteShutdown| callback. This is because
    // |async::Wait::Cancel| is not thread safe.
| } |
| |
| auto dispatcher_ref = fbl::RefPtr<Dispatcher>(this); |
| |
| // The dispatcher shutdown API specifies that on shutdown, tasks and cancellation |
| // callbacks should run serialized. Wait for all active threads to |
| // complete before calling the cancellation callbacks. |
| auto event = RegisterForCompleteShutdownEvent(); |
| ZX_ASSERT(event.status_value() == ZX_OK); |
| |
| // Don't use async::WaitOnce as it sets the handler in a thread unsafe way. |
| auto wait = std::make_unique<async::Wait>( |
| event->get(), ZX_EVENT_SIGNALED, 0, |
| [dispatcher_ref = std::move(dispatcher_ref), event = std::move(*event)]( |
| async_dispatcher_t* dispatcher, async::Wait* wait, zx_status_t status, |
| const zx_packet_signal_t* signal) mutable { |
| ZX_ASSERT(status == ZX_OK || status == ZX_ERR_CANCELED); |
| dispatcher_ref->CompleteShutdown(); |
| delete wait; |
| }); |
| ZX_ASSERT(wait->Begin(process_shared_dispatcher_) == ZX_OK); |
| wait.release(); // This will be deleted by the wait handler once it is called. |
| } |
| |
| void Dispatcher::CompleteShutdown() { |
| fbl::DoublyLinkedList<std::unique_ptr<AsyncIrq>> unbound_irqs; |
| std::unordered_set<fdf_token_t*> registered_tokens; |
| { |
| fbl::AutoLock lock(&callback_lock_); |
| |
| ZX_ASSERT(state_ == DispatcherState::kShuttingDown); |
| |
| ZX_ASSERT_MSG(num_active_threads_ == 0, "CompleteShutdown called but there are active threads"); |
| ZX_ASSERT_MSG(callback_queue_.is_empty(), |
| "CompleteShutdown called but callback queue has %lu items", |
| callback_queue_.size_slow()); |
| ZX_ASSERT_MSG((!event_waiter_ || !event_waiter_->signaled()), |
| "CompleteShutdown called but event waiter is still signaled"); |
| ZX_ASSERT(IsIdleLocked()); |
| |
| ZX_ASSERT_MSG(!HasFutureOpsScheduledLocked(), |
| "CompleteShutdown called but future ops are scheduled"); |
| |
| if (event_waiter_) { |
| // Since the event waiter holds a reference to the dispatcher, |
| // we need to cancel it to reclaim it. |
| // This should always succeed, as there should be no other threads processing |
| // tasks for this dispatcher, and we should have cleared |event_waiter_| if |
| // the AsyncLoopOwned event waiter was dropped. |
| ZX_ASSERT(event_waiter_->Cancel() != nullptr); |
| event_waiter_ = nullptr; |
| } |
| |
| unbound_irqs = std::move(irqs_); |
| for (auto& irq : unbound_irqs) { |
| // It's possible that a callback request may be queued for a triggered irq. |
| // We should only queue an additional cancellation callback if one does not |
| // already exist. |
| auto iter = |
| shutdown_queue_.find_if([operation = &irq](const CallbackRequest& callback_request) { |
| return callback_request.holds_async_operation(operation); |
| }); |
| if (iter == shutdown_queue_.end()) { |
| auto callback_request = irq.CreateCallbackRequest(*this); |
| shutdown_queue_.push_back(std::move(callback_request)); |
| } |
| // If the irq is still in the list, unbinding shouldn't fail. |
| // The only case would be if the async loop is also shutting down, |
| // but we shouldn't do that before all the driver dispatchers have completed shutdown. |
| ZX_ASSERT_MSG(irq.Unbind(), "Dispatcher::ShutdownAsync failed to unbind irq"); |
| } |
| registered_tokens = std::move(registered_tokens_); |
| } |
| |
| for (auto irq = unbound_irqs.pop_front(); irq; irq = unbound_irqs.pop_front()) { |
| // Though the irq has been unbound, it's possible that another |process_shared_dispatcher| |
| // thread has already pulled an irq packet from the port and may attempt to call the irq |
| // handler. Delay destroying our irq wrapper for a bit in case this race condition happens. |
| thread_pool_->CacheUnboundIrq(std::move(irq)); |
| } |
| |
| // We want |fdf_dispatcher_get_current_dispatcher| to work in cancellation and shutdown callbacks. |
| driver_context::PushDriver(owner_, this); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
| // We remove one item at a time from the shutdown queue, in case someone |
| // tries to cancel a wait (which has not been canceled yet) from within a |
| // canceled callback. We don't use fbl::AutoLock as we want to be able to |
| // release and re-acquire the lock in the loop. |
| callback_lock_.Acquire(); |
| while (!shutdown_queue_.is_empty()) { |
| auto callback_request = shutdown_queue_.pop_front(); |
| ZX_ASSERT(callback_request); |
| // Call the callbacks outside the lock. |
| callback_lock_.Release(); |
| callback_request->Call(std::move(callback_request), ZX_ERR_CANCELED); |
| callback_lock_.Acquire(); |
| } |
| callback_lock_.Release(); |
| |
| for (auto token : registered_tokens) { |
| token->handler(static_cast<fdf_dispatcher_t*>(this), token, ZX_ERR_CANCELED, |
| FDF_HANDLE_INVALID); |
| } |
| |
| fdf_dispatcher_shutdown_observer_t* shutdown_observer = nullptr; |
| { |
| fbl::AutoLock lock(&callback_lock_); |
| state_ = DispatcherState::kShutdown; |
| shutdown_observer = shutdown_observer_; |
| } |
| GetDispatcherCoordinator().NotifyDispatcherShutdown(*this, std::move(shutdown_observer)); |
| } |
| |
| void Dispatcher::Destroy() { |
| { |
| fbl::AutoLock lock(&callback_lock_); |
| ZX_ASSERT(state_ == DispatcherState::kShutdown); |
| state_ = DispatcherState::kDestroyed; |
| } |
| // Recover the reference created in |CreateWithAdder|. |
| auto dispatcher_ref = fbl::ImportFromRawPtr(this); |
| GetDispatcherCoordinator().RemoveDispatcher(*this); |
| } |
| |
| zx_status_t Dispatcher::Seal(uint32_t option) { |
| if (option != FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS) { |
| return ZX_ERR_INVALID_ARGS; |
| } |
| |
| { |
| fbl::AutoLock lock(&callback_lock_); |
| |
| if (driver_context::GetCurrentDispatcher() != this || !IsRunningLocked()) { |
| return ZX_ERR_BAD_STATE; |
| } |
| |
| if (!allow_sync_calls_) { |
| return ZX_ERR_ALREADY_EXISTS; |
| } |
| |
    // Clear the flag; the extra thread it reserved is removed below.
| allow_sync_calls_ = false; |
| } |
| |
| // Tell our thread pool to remove a thread as we no longer have allow_sync_calls_, which caused |
| // an extra thread to be added when this dispatcher was initially created. |
| return thread_pool()->RemoveThread(); |
| } |
| |
| // async_dispatcher_t implementation |
| |
| zx_time_t Dispatcher::GetTime() { return zx_clock_get_monotonic(); } |
| |
| zx_status_t Dispatcher::BeginWait(async_wait_t* wait) { |
| fbl::AutoLock lock(&callback_lock_); |
| if (!IsRunningLocked()) { |
| return ZX_ERR_BAD_STATE; |
| } |
  // TODO(92740): we should do something more efficient than creating a new
  // AsyncWait each time.
| auto async_wait = std::make_unique<AsyncWait>(wait, *this); |
| return AsyncWait::BeginWait(std::move(async_wait), *this); |
| } |
| |
| zx_status_t Dispatcher::CancelWait(async_wait_t* wait) { |
| // The implementation of this method has to be more complicated than simply |
| // |
| // return async_cancel_wait(wait); |
| // |
| // because the dispatcher wraps the wait's callback with its own custom callback, |
| // |OnSignal|, so there is an interval between the wait being pulled off the port and the wait's |
| // callback being invoked, during which we need to implement custom logic to cancel the wait. |
| |
| // First, try to cancel the async wait from the shared dispatcher. |
| auto* async_wait = reinterpret_cast<AsyncWait*>(wait->state.reserved[0]); |
| if (async_wait != nullptr) { |
| if (async_wait->Cancel()) { |
      // We shouldn't have to worry about racing anyone if cancellation was successful.
| ZX_ASSERT(RemoveWait(async_wait) != nullptr); |
| return ZX_OK; |
| } |
| |
| // async_wait->Cancel() will fail in the case that the wait has already been pulled off the |
| // port. |
| } |
| |
| // Second, try to cancel it from the callback queue. |
| fbl::AutoLock lock(&callback_lock_); |
| auto callback_request = CancelAsyncOperationLocked(wait); |
| if (callback_request) { |
| return ZX_OK; |
| } else if (unsynchronized()) { |
| return ZX_ERR_NOT_FOUND; |
| } else { |
| // The async_wait is set to null right before the callback is invoked, so if it is null it's too |
| // late to cancel. If the caller of |CancelWait| is not a dispatcher-managed thread then we |
| // can't guarantee the dispatcher isn't currently invoking the callback. |
| if (async_wait == nullptr || driver_context::GetCurrentDispatcher() != this) { |
| return ZX_ERR_NOT_FOUND; |
| } |
| |
| // If we failed to cancel it from the callback queue and we are a synchronized dispatcher, |
| // then another thread must have pulled the packet from the port and is about to queue the |
| // callback (i.e., it is sitting in |OnSignal| right before |QueueWait|). We mark the wait as |
| // pending cancellation so that it is cancelled rather than invoked when |QueueWait| is called. |
| async_wait->MarkPendingCancellation(); |
| return ZX_OK; |
| } |
| } |
| |
| zx::time Dispatcher::GetNextTimeoutLocked() const { |
  // Check delayed tasks only when |callback_queue_| is empty. We routinely check whether delayed
  // tasks can be moved into the callback queue anyway, and reset the timer whenever the callback
  // queue is empty.
| if (callback_queue_.is_empty()) { |
| if (delayed_tasks_.is_empty()) { |
| return zx::time::infinite(); |
| } |
| return static_cast<const DelayedTask*>(&delayed_tasks_.front())->deadline; |
| } |
| return zx::time::infinite(); |
| } |
| |
| void Dispatcher::ResetTimerLocked() { |
| zx::time deadline = GetNextTimeoutLocked(); |
| if (deadline == zx::time::infinite()) { |
| // Nothing is left on the queue to fire. |
| timer_.Cancel(); |
| return; |
| } |
| |
| // The tradeoff of using a task instead of a dedicated timer is that we need to cancel the task |
  // every time a task with a shorter deadline comes in. This isn't really too bad, assuming there
  // are at least two delayed tasks scheduled; otherwise the timer will simply be canceled. If we
  // used a custom implementation for our shared process loop, then we could also have an
  // "UpdateTaskDeadline" method on tasks, which would allow us to shift the deadline as necessary
  // without risking the need to cancel the task.
| |
| if (timer_.current_deadline() > deadline && timer_.Cancel() == ZX_OK) { |
| timer_.BeginWait(deadline); |
| } |
| } |
| |
| void Dispatcher::InsertDelayedTaskSortedLocked(std::unique_ptr<DelayedTask> task) { |
| // Find the first node that is bigger and insert before it. fbl::DoublyLinkedList handles all of |
| // the edge cases for us. |
| auto iter = delayed_tasks_.find_if([&](const CallbackRequest& other) { |
| return static_cast<const DelayedTask*>(&other)->deadline > task->deadline; |
| }); |
| delayed_tasks_.insert(iter, std::move(task)); |
| } |
| |
| void Dispatcher::CheckDelayedTasksLocked() { |
| if (!IsRunningLocked()) { |
| IdleCheckLocked(); |
| return; |
| } |
| zx::time now = zx::clock::get_monotonic(); |
| auto iter = delayed_tasks_.find_if([&](const CallbackRequest& task) { |
| return static_cast<const DelayedTask*>(&task)->deadline > now; |
| }); |
| if (iter != delayed_tasks_.begin()) { |
| fbl::DoublyLinkedList<std::unique_ptr<CallbackRequest>> done_tasks; |
| done_tasks = delayed_tasks_.split_after(--iter); |
| // split_after removes the tasks which are *not* done, so we must swap the lists to get desired |
| // result. |
| std::swap(delayed_tasks_, done_tasks); |
| callback_queue_.splice(callback_queue_.end(), done_tasks); |
| if (event_waiter_ && !event_waiter_->signaled()) { |
| event_waiter_->signal(); |
| } |
| } else { |
| ResetTimerLocked(); |
| } |
| } |
| |
| void Dispatcher::Timer::Handler() { |
| { |
| fbl::AutoLock al(&dispatcher_->callback_lock_); |
| current_deadline_ = zx::time::infinite(); |
| dispatcher_->CheckDelayedTasksLocked(); |
| } |
| dispatcher_->thread_pool()->OnThreadWakeup(); |
| { |
| fbl::AutoLock lock(&dispatcher_->callback_lock_); |
| // Check if the dispatcher is shutting down and waiting for the handler to complete. |
| if (!dispatcher_->IsRunningLocked()) { |
| dispatcher_->shutdown_waiting_for_timer_ = false; |
| dispatcher_->IdleCheckLocked(); |
| } |
| } |
| } |
| |
| zx_status_t Dispatcher::PostTask(async_task_t* task) { |
| driver_runtime::Callback callback = |
| [this, task](std::unique_ptr<driver_runtime::CallbackRequest> callback_request, |
| zx_status_t status) { task->handler(this, task, status); }; |
| |
| const zx::time now = zx::clock::get_monotonic(); |
| if (zx::time(task->deadline) <= now) { |
    // TODO(92740): we should do something more efficient than creating a new
    // callback request each time.
| auto callback_request = |
| std::make_unique<driver_runtime::CallbackRequest>(CallbackRequest::RequestType::kTask); |
| callback_request->SetCallback(static_cast<fdf_dispatcher_t*>(this), std::move(callback), task); |
| CallbackRequest* callback_ptr = callback_request.get(); |
| callback_request = RegisterCallbackWithoutQueueing(std::move(callback_request)); |
    // The dispatcher returned the callback request because queueing failed.
| if (callback_request) { |
| return ZX_ERR_BAD_STATE; |
| } |
| QueueRegisteredCallback(callback_ptr, ZX_OK); |
| } else { |
| if (task->deadline == ZX_TIME_INFINITE) { |
      // Tasks must eventually run, which an infinite deadline would prevent.
| return ZX_ERR_INVALID_ARGS; |
| } |
| auto delayed_task = std::make_unique<DelayedTask>(zx::time(task->deadline)); |
| delayed_task->SetCallback(static_cast<fdf_dispatcher_t*>(this), std::move(callback), task); |
| |
| fbl::AutoLock al(&callback_lock_); |
| InsertDelayedTaskSortedLocked(std::move(delayed_task)); |
| ResetTimerLocked(); |
| } |
| return ZX_OK; |
| } |
| |
| zx_status_t Dispatcher::CancelTask(async_task_t* task) { |
| fbl::AutoLock lock(&callback_lock_); |
| auto callback_request = CancelAsyncOperationLocked(task); |
| return callback_request ? ZX_OK : ZX_ERR_NOT_FOUND; |
| } |
| |
| zx_status_t Dispatcher::QueuePacket(async_receiver_t* receiver, const zx_packet_user_t* data) { |
| fbl::AutoLock lock(&callback_lock_); |
| if (!IsRunningLocked()) { |
| return ZX_ERR_BAD_STATE; |
| } |
| return async_queue_packet(process_shared_dispatcher_, receiver, data); |
| } |
| |
| zx_status_t Dispatcher::BindIrq(async_irq_t* irq) { |
| if (unsynchronized()) { |
| // TODO(https://fxbug.dev/42052791): support interrupts on unsynchronized dispatchers. |
| return ZX_ERR_NOT_SUPPORTED; |
| } |
| |
| fbl::AutoLock lock(&callback_lock_); |
| if (!IsRunningLocked()) { |
| return ZX_ERR_BAD_STATE; |
| } |
| auto async_irq = std::make_unique<AsyncIrq>(irq, *this); |
| return AsyncIrq::Bind(std::move(async_irq), *this); |
| } |
| |
| zx_status_t Dispatcher::UnbindIrq(async_irq_t* irq) { |
| if (unsynchronized()) { |
| // TODO(https://fxbug.dev/42052791): support interrupts on unsynchronized dispatchers. |
| return ZX_ERR_NOT_SUPPORTED; |
| } |
| |
| auto* async_irq = reinterpret_cast<AsyncIrq*>(irq->state.reserved[0]); |
| if (!async_irq) { |
| return ZX_ERR_NOT_FOUND; |
| } |
| // Check that the irq is unbound from the same dispatcher it was bound to. |
| auto cur_dispatcher = driver_context::GetCurrentDispatcher(); |
| if (!cur_dispatcher || (cur_dispatcher != async_irq->GetDispatcherRef().get())) { |
| return ZX_ERR_BAD_STATE; |
| } |
| |
| std::unique_ptr<AsyncIrq> unbound_irq; |
| { |
| // The |callback_lock_| must be held across clearing the |dispatcher_ref_| in |
| // the irq, and removing any queued callback request for the irq. |
| fbl::AutoLock lock(&callback_lock_); |
| if (!async_irq->Unbind()) { |
| return ZX_ERR_NOT_FOUND; |
| } |
| unbound_irq = RemoveIrqLocked(async_irq); |
| ZX_ASSERT(unbound_irq != nullptr); |
| // If the irq has been triggered, there may be a callback request queued. |
| CancelAsyncOperationLocked(async_irq); |
| } |
| // Though the irq has been unbound, it's possible that another |process_shared_dispatcher| |
| // thread has already pulled an irq packet from the port and may attempt to call the irq |
| // handler. Delay destroying our irq wrapper for a bit in case this race condition happens. |
| thread_pool_->CacheUnboundIrq(std::move(unbound_irq)); |
| return ZX_OK; |
| } |
| |
| namespace { |
| |
| const char kSequenceIdWrongDispatcherType[] = |
| "A synchronized fdf_dispatcher_t is required. " |
| "Ensure the fdf_dispatcher_t does not have the |FDF_DISPATCHER_OPTION_UNSYNCHRONIZED| option."; |
| |
| const char kSequenceIdUnknownThread[] = |
| "The current thread is not managed by a driver dispatcher. " |
| "Ensure the object is always used from a dispatcher managed thread."; |
| |
| const char kSequenceIdWrongDispatcherInstance[] = |
| "Access from multiple driver dispatchers detected. " |
| "This is not allowed. Ensure the object is used from the same |fdf_dispatcher_t|."; |
| |
| } // namespace |
| |
| zx_status_t Dispatcher::GetSequenceId(async_sequence_id_t* out_sequence_id, |
| const char** out_error) { |
| if (unsynchronized()) { |
| *out_error = kSequenceIdWrongDispatcherType; |
| return ZX_ERR_WRONG_TYPE; |
| } |
| auto* current_dispatcher = driver_context::GetCurrentDispatcher(); |
| if (current_dispatcher == nullptr) { |
| *out_error = kSequenceIdUnknownThread; |
| return ZX_ERR_INVALID_ARGS; |
| } |
| if (current_dispatcher != this) { |
| *out_error = kSequenceIdWrongDispatcherInstance; |
| return ZX_ERR_INVALID_ARGS; |
| } |
| out_sequence_id->value = reinterpret_cast<uint64_t>(this); |
| return ZX_OK; |
| } |
| |
| zx_status_t Dispatcher::CheckSequenceId(async_sequence_id_t sequence_id, const char** out_error) { |
| async_sequence_id_t current_sequence_id; |
  zx_status_t status = GetSequenceId(&current_sequence_id, out_error);
| if (status != ZX_OK) { |
| return status; |
| } |
| if (current_sequence_id.value != sequence_id.value) { |
| *out_error = kSequenceIdWrongDispatcherInstance; |
| return ZX_ERR_OUT_OF_RANGE; |
| } |
| return ZX_OK; |
| } |
| |
| std::unique_ptr<driver_runtime::CallbackRequest> Dispatcher::RegisterCallbackWithoutQueueing( |
| std::unique_ptr<driver_runtime::CallbackRequest> callback_request) { |
| fbl::AutoLock lock(&callback_lock_); |
| if (!IsRunningLocked()) { |
| return callback_request; |
| } |
| registered_callbacks_.push_back(std::move(callback_request)); |
| return nullptr; |
| } |
| |
| fit::result<Dispatcher::NonInlinedReason> Dispatcher::ShouldInline( |
| std::unique_ptr<CallbackRequest>& callback_request) { |
| auto req_type = callback_request->request_type(); |
| |
| if (!unsynchronized_) { |
| // Blocking dispatchers are required to queue all callbacks onto the async loop. |
| if (allow_sync_calls_) { |
| return fit::error(NonInlinedReason::kAllowSyncCalls); |
| } |
| // Synchronous dispatchers do not allow parallel callbacks. If we are already |
| // dispatching a request on another thread, we will have to queue this request for later. |
| if (dispatching_sync_) { |
| return fit::error(NonInlinedReason::kDispatchingOnAnotherThread); |
| } |
| // TODO(https://fxbug.dev/42180471): we should be able to remove the task check once we track |
    // drivers through banjo calls, or start each DFv2 driver with an ALLOW_SYNC_CALLS
    // dispatcher.
| if (req_type == CallbackRequest::RequestType::kTask) { |
| return fit::error(NonInlinedReason::kTask); |
| } |
| } |
| // Callbacks that are for waits or irqs can skip the reentrancy check. |
  // This is because they are always first registered on the global async loop, which
  // will initiate the callback when ready. At that point the driver call stack
  // will be empty, but we still want to consider it not reentrant and directly
  // call into the driver.
| bool is_global_loop_callback = (req_type == CallbackRequest::RequestType::kIrq) || |
| (req_type == CallbackRequest::RequestType::kWait); |
| if (is_global_loop_callback) { |
| return fit::ok(); |
| } |
| // Check if the call would be reentrant, in which case we will queue it up to be run |
| // later. |
| // |
| // If it is unknown which driver is calling this function, it is considered |
| // to be potentially reentrant. |
| // |
| // The call stack may be empty if the user writes to a channel, or registers a |
| // read callback on a thread not managed by the driver runtime. |
| // We use |GetCurrentDriver| rather than |IsCallStackEmpty| as this also |
| // handles the case where the testing dispatcher is set as the thread's default dispatcher. |
| if (!driver_context::GetCurrentDriver()) { |
| return fit::error(NonInlinedReason::kUnknownThread); |
| } |
| if ((driver_context::GetCurrentDriver() == owner_) || |
| driver_context::IsDriverInCallStack(owner_)) { |
| return fit::error(NonInlinedReason::kReentrant); |
| } |
| return fit::ok(); |
| } |
| |
| void Dispatcher::QueueRegisteredCallback(driver_runtime::CallbackRequest* request, |
| zx_status_t callback_reason, bool was_deferred) { |
| TRACE_DURATION("driver_runtime", "Dispatcher::QueueRegisteredCallback", "dispatcher_name", |
| name_.c_str(), "callback_reason", zx_status_get_string(callback_reason), |
| "was_deferred", TA_BOOL(was_deferred)); |
| |
| ZX_ASSERT(request); |
| |
| auto decrement_and_idle_check = fit::defer([this]() { |
| fbl::AutoLock lock(&callback_lock_); |
| ZX_ASSERT(num_active_threads_ > 0); |
| num_active_threads_--; |
| IdleCheckLocked(); |
| }); |
| |
| std::unique_ptr<driver_runtime::CallbackRequest> callback_request; |
| { |
| fbl::AutoLock lock(&callback_lock_); |
| // It's possible that we are being called from a |Channel::Write| on the peer of a channel |
| // that is registered on this dispatcher. This means that there is no guarantee that the |
| // dispatcher will not enter |CompleteShutdown| between when we return from this check |
| // and when we decrement |num_active_threads_| in |decrement_and_idle_check|. |
| // Instead do not increment |num_active_threads_| until after this check. |
| if (!IsRunningLocked()) { |
| decrement_and_idle_check.cancel(); |
| // We still want to do |IdleCheckLocked|, in case this is a completed |Wait| being processed. |
| IdleCheckLocked(); |
| return; |
| } |
| num_active_threads_++; |
| |
| // Finding the callback request may fail if the request was cancelled in the meanwhile. |
| // This is possible if the channel was about to queue the registered callback (in response |
| // to a channel write or a peer channel closing), but the client cancelled the callback. |
| // |
| // Calling |request->InContainer| may crash if the callback request was destructed between |
| // when we called |RegisterCallbackWithoutQueueing| and now. |
| // TODO(https://fxbug.dev/42053744): if we change CallbackRequests to use RefPtrs, we should be |
| // able to switch this back to an |InContainer| check. |
| callback_request = |
| registered_callbacks_.erase_if([request](const CallbackRequest& callback_request) { |
| return &callback_request == request; |
| }); |
| if (!callback_request) { |
| return; |
| } |
| callback_request->SetCallbackReason(callback_reason); |
| |
| // Whether we want to call the callback now, or queue it to be run on the async loop. |
| fit::result<NonInlinedReason> should_inline = ShouldInline(callback_request); |
| debug_stats_.num_total_requests++; |
| if (should_inline.is_error()) { |
| callback_queue_.push_back(std::move(callback_request)); |
| if (event_waiter_ && !event_waiter_->signaled()) { |
| event_waiter_->signal(); |
| } |
| |
      // If the request was not inlined earlier because the wait had not yet been
      // registered, we should make sure this reason is recorded even if any of
      // the other reasons also applies.
| if (was_deferred) { |
| debug_stats_.non_inlined.channel_wait_not_yet_registered++; |
| } else { |
        switch (should_inline.error_value()) {
          case NonInlinedReason::kAllowSyncCalls:
            debug_stats_.non_inlined.allow_sync_calls++;
            break;
          case NonInlinedReason::kDispatchingOnAnotherThread:
            debug_stats_.non_inlined.parallel_dispatch++;
            break;
          case NonInlinedReason::kTask:
            debug_stats_.non_inlined.task++;
            break;
          case NonInlinedReason::kUnknownThread:
            debug_stats_.non_inlined.unknown_thread++;
            break;
          case NonInlinedReason::kReentrant:
            debug_stats_.non_inlined.reentrant++;
            break;
          default:
            LOGF(ERROR, "Unhandled NonInlinedReason");
        }
| } |
| return; |
| } |
    // If the request was deferred earlier, we don't count it as inlined in the stats,
    // even though it is being inlined in this specific instance.
| if (was_deferred) { |
| debug_stats_.non_inlined.channel_wait_not_yet_registered++; |
| } else { |
| debug_stats_.num_inlined_requests++; |
| } |
| dispatching_sync_ = true; |
| } |
| DispatchCallback(std::move(callback_request)); |
| |
| fbl::AutoLock lock(&callback_lock_); |
| dispatching_sync_ = false; |
| if (!callback_queue_.is_empty() && event_waiter_ && !event_waiter_->signaled() && |
| IsRunningLocked()) { |
| event_waiter_->signal(); |
| } |
| } |
| |
| void Dispatcher::AddWaitLocked(std::unique_ptr<Dispatcher::AsyncWait> wait) { |
| ZX_DEBUG_ASSERT(!fbl::InContainer<AsyncWaitTag>(*wait)); |
| waits_.push_back(std::move(wait)); |
| } |
| |
| std::unique_ptr<Dispatcher::AsyncWait> Dispatcher::RemoveWait(Dispatcher::AsyncWait* wait) { |
| fbl::AutoLock al(&callback_lock_); |
| return RemoveWaitLocked(wait); |
| } |
| |
| std::unique_ptr<Dispatcher::AsyncWait> Dispatcher::RemoveWaitLocked(Dispatcher::AsyncWait* wait) { |
| ZX_DEBUG_ASSERT(fbl::InContainer<AsyncWaitTag>(*wait)); |
| auto ret = waits_.erase(*wait); |
| IdleCheckLocked(); |
| return ret; |
| } |
| |
| void Dispatcher::QueueWait(Dispatcher::AsyncWait* wait, zx_status_t status) { |
| fbl::AutoLock al(&callback_lock_); |
| |
| ZX_DEBUG_ASSERT(fbl::InContainer<AsyncWaitTag>(*wait)); |
| if (wait->is_pending_cancellation()) { |
| // Wait was cancelled so we return immediately without invoking the callback. |
| waits_.erase(*wait); |
| // In case this is the last wait that shutdown is waiting on. |
| IdleCheckLocked(); |
| return; |
| } |
| |
| if (!IsRunningLocked()) { |
    // We are waiting for all outstanding waits to be completed. They will be serviced in
    // |CompleteShutdown|.
| shutdown_queue_.push_back(waits_.erase(*wait)); |
| IdleCheckLocked(); |
| } else { |
| registered_callbacks_.push_back(waits_.erase(*wait)); |
| al.release(); |
| QueueRegisteredCallback(wait, status); |
| } |
| } |
| |
| void Dispatcher::AddIrqLocked(std::unique_ptr<Dispatcher::AsyncIrq> irq) { |
| ZX_DEBUG_ASSERT(!irq->InContainer()); |
| irqs_.push_back(std::move(irq)); |
| } |
| |
| std::unique_ptr<Dispatcher::AsyncIrq> Dispatcher::RemoveIrqLocked(Dispatcher::AsyncIrq* irq) { |
| ZX_DEBUG_ASSERT(irq->InContainer()); |
| return irqs_.erase(*irq); |
| } |
| |
| void Dispatcher::QueueIrq(AsyncIrq* irq, zx_status_t status) { |
| auto callback_request = irq->CreateCallbackRequest(*this); |
| CallbackRequest* callback_ptr = callback_request.get(); |
| |
| { |
| fbl::AutoLock al(&callback_lock_); |
| |
| // If the dispatcher is shutting down, we will not deliver any more irqs to the user. |
| // |CompleteShutdown| will call the irq handler with |ZX_ERR_CANCELED|. |
| if (!IsRunningLocked()) { |
| return; |
| } |
| if (!irq->GetDispatcherRef()) { |
| // It's possible that the irq was unbound before we acquired the |callback_lock_|. |
| return; |
| } |
| // Unbinding only happens while the |callback_lock_| is held, so we don't |
| // need to hold the irq lock while we register this callback request. |
| registered_callbacks_.push_back(std::move(callback_request)); |
| } |
| // If the irq is unbound before calling this, it will remove the callback request from |
| // |registered_callbacks_|. |
| QueueRegisteredCallback(callback_ptr, status); |
| } |
| |
| std::unique_ptr<CallbackRequest> Dispatcher::CancelCallback(CallbackRequest& request_to_cancel) { |
| fbl::AutoLock lock(&callback_lock_); |
| |
| // The request can be in |registered_callbacks_|, |callback_queue_| or |shutdown_queue_|. |
| if (request_to_cancel.InContainer()) { |
| return request_to_cancel.RemoveFromContainer(); |
| } |
| return nullptr; |
| } |
| |
| bool Dispatcher::SetCallbackReason(CallbackRequest* callback_to_update, |
| zx_status_t callback_reason) { |
| fbl::AutoLock lock(&callback_lock_); |
| auto iter = callback_queue_.find_if( |
| [callback_to_update](auto& callback) -> bool { return &callback == callback_to_update; }); |
| if (iter == callback_queue_.end()) { |
| return false; |
| } |
| callback_to_update->SetCallbackReason(callback_reason); |
| return true; |
| } |
| |
| std::unique_ptr<CallbackRequest> Dispatcher::CancelAsyncOperationLocked(void* operation) { |
| auto iter = registered_callbacks_.erase_if([operation](const CallbackRequest& callback_request) { |
| return callback_request.holds_async_operation(operation); |
| }); |
| if (iter) { |
| return iter; |
| } |
| iter = callback_queue_.erase_if([operation](const CallbackRequest& callback_request) { |
| return callback_request.holds_async_operation(operation); |
| }); |
| if (iter) { |
| return iter; |
| } |
| iter = shutdown_queue_.erase_if([operation](const CallbackRequest& callback_request) { |
| return callback_request.holds_async_operation(operation); |
| }); |
| if (iter) { |
| return iter; |
| } |
| iter = delayed_tasks_.erase_if([operation](const CallbackRequest& callback_request) { |
| return callback_request.holds_async_operation(operation); |
| }); |
| if (iter) { |
| ResetTimerLocked(); |
| } |
| return iter; |
| } |
| |
| void Dispatcher::DispatchCallback( |
| std::unique_ptr<driver_runtime::CallbackRequest> callback_request) { |
| TRACE_DURATION("driver_runtime", "Dispatcher::DispatchCallback", "dispatcher_name", |
| name_.c_str()); |
| |
| driver_context::PushDriver(owner_, this); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
| callback_request->Call(std::move(callback_request), ZX_OK); |
| } |
| |
| void Dispatcher::DispatchCallbacks(std::unique_ptr<EventWaiter> event_waiter, |
| fbl::RefPtr<Dispatcher> dispatcher_ref) { |
| ZX_ASSERT(dispatcher_ref != nullptr); |
| |
| auto defer = fit::defer([&]() { |
| fbl::AutoLock lock(&callback_lock_); |
| |
| if (event_waiter) { |
| // We call |BeginWaitWithRef| even when shutting down so that the |event_waiter| |
| // stays alive until the dispatcher is destroyed. This allows |IsIdleLocked| to |
| // correctly check the state of the event waiter. |CompleteShutdown| will cancel |
| // and drop the event waiter. |
| zx_status_t status = event_waiter->BeginWaitWithRef(std::move(event_waiter), dispatcher_ref); |
| if (status == ZX_ERR_BAD_STATE) { |
| event_waiter_ = nullptr; |
| } |
| } |
| ZX_ASSERT(num_active_threads_ > 0); |
| num_active_threads_--; |
| IdleCheckLocked(); |
| }); |
| |
| uint32_t num_callbacks_dispatched = 0; |
| |
| fbl::DoublyLinkedList<std::unique_ptr<CallbackRequest>> to_call; |
| { |
| fbl::AutoLock lock(&callback_lock_); |
| num_active_threads_++; |
| |
| // Parallel callbacks are not allowed in synchronized dispatchers. |
| // We should not be scheduled to run on two different dispatcher threads, |
| // but it's possible we could still get here if we are currently doing a |
| // direct call into the driver. In this case, we should designal the event |
| // waiter, and once the direct call completes it will signal it again. |
| if ((!unsynchronized_ && dispatching_sync_) || !IsRunningLocked()) { |
| event_waiter->designal(); |
| return; |
| } |
| dispatching_sync_ = true; |
| |
| num_callbacks_dispatched += TakeNextCallbacks(&to_call); |
| |
| // Check if there are callbacks left to process and we should wake up an additional |
| // thread. For synchronized dispatchers, parallel callbacks are disallowed. |
| if (unsynchronized_ && !callback_queue_.is_empty()) { |
| zx_status_t status = event_waiter->BeginWaitWithRef(std::move(event_waiter), dispatcher_ref); |
| if (status == ZX_ERR_BAD_STATE) { |
| event_waiter_ = nullptr; |
| } |
| } |
| } |
| |
| while (true) { |
| // Call the callbacks outside of the lock. |
| while (!to_call.is_empty()) { |
| auto callback_request = to_call.pop_front(); |
| ZX_ASSERT(callback_request); |
| DispatchCallback(std::move(callback_request)); |
| } |
| |
| { |
| fbl::AutoLock lock(&callback_lock_); |
| // Check if there are any more callbacks to dispatch. This may be the case |
| // if we were dispatching an async operation, or if the user queued more |
| // operations during the last callback. |
| if (!callback_queue_.is_empty() && (num_callbacks_dispatched < kBatchSize)) { |
| num_callbacks_dispatched += TakeNextCallbacks(&to_call); |
| // Time to dispatch more callbacks. |
| continue; |
| } |
| |
| // If we woke up an additional thread, that thread will update the |
| // event waiter signals as necessary. |
| if (!event_waiter) { |
| return; |
| } |
| dispatching_sync_ = false; |
| ResetTimerLocked(); |
| if (callback_queue_.is_empty() && event_waiter->signaled()) { |
| event_waiter->designal(); |
| } |
| return; |
| } |
| } |
| } |
| |
| uint32_t Dispatcher::TakeNextCallbacks( |
| fbl::DoublyLinkedList<std::unique_ptr<CallbackRequest>>* out_callbacks) { |
| // For synchronized dispatchers, cancellation of ChannelReads are guaranteed to succeed. |
| // Since cancellation may be called from the ChannelRead, or from another async operation |
| // (like a task), we need to make sure that if we are calling an async operation |
| // that is the only callback request pulled from the callback queue. |
| // This will guarantee that cancellation will always succeed without having to lock |
| // |to_call|. |
| bool has_async_op = false; |
| uint32_t n = 0; |
| while ((n < kBatchSize) && !callback_queue_.is_empty() && !has_async_op) { |
| std::unique_ptr<CallbackRequest> callback_request = callback_queue_.pop_front(); |
| ZX_ASSERT(callback_request); |
| has_async_op = !unsynchronized_ && callback_request->has_async_operation(); |
| // For synchronized dispatchers, an async operation should be the only member in |
| // |to_call|. |
| if (has_async_op && n > 0) { |
| callback_queue_.push_front(std::move(callback_request)); |
| break; |
| } |
| out_callbacks->push_back(std::move(callback_request)); |
| n++; |
| } |
| return n; |
| } |
| |
| zx::result<zx::event> Dispatcher::RegisterForCompleteShutdownEvent() { |
  fbl::AutoLock lock(&callback_lock_);
| auto event = complete_shutdown_event_manager_.GetEvent(); |
| if (event.is_error()) { |
| return event; |
| } |
| if (IsIdleLocked() && !HasFutureOpsScheduledLocked()) { |
| zx_status_t status = complete_shutdown_event_manager_.Signal(); |
| if (status != ZX_OK) { |
| return zx::error(status); |
| } |
| } |
| return event; |
| } |
| |
| void Dispatcher::WaitUntilIdle() { |
| ZX_ASSERT(!IsRuntimeManagedThread()); |
| |
  fbl::AutoLock lock(&callback_lock_);
| if (IsIdleLocked()) { |
| return; |
| } |
| idle_event_.Wait(&callback_lock_); |
| return; |
| } |
| |
| bool Dispatcher::IsIdleLocked() { |
| // If the event waiter was signaled, the thread will be scheduled to run soon. |
| return (num_active_threads_ == 0) && callback_queue_.is_empty() && |
| (!event_waiter_ || !event_waiter_->signaled()); |
| } |
| |
| bool Dispatcher::HasFutureOpsScheduledLocked() { |
| return !waits_.is_empty() || timer_.is_armed() || shutdown_waiting_for_timer_; |
| } |
| |
| void Dispatcher::IdleCheckLocked() { |
| if (IsIdleLocked()) { |
| idle_event_.Broadcast(); |
| if (!HasFutureOpsScheduledLocked()) { |
| complete_shutdown_event_manager_.Signal(); |
| } |
| } |
| } |
| |
| bool Dispatcher::HasQueuedTasks() { |
| fbl::AutoLock lock(&callback_lock_); |
| |
| for (auto& callback_request : callback_queue_) { |
| if (callback_request.request_type() == CallbackRequest::RequestType::kTask) { |
| return true; |
| } |
| } |
| for (auto& callback_request : shutdown_queue_) { |
| if (callback_request.request_type() == CallbackRequest::RequestType::kTask) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| void Dispatcher::EventWaiter::HandleEvent(std::unique_ptr<EventWaiter> event_waiter, |
| async_dispatcher_t* dispatcher, async::WaitBase* wait, |
| zx_status_t status, const zx_packet_signal_t* signal) { |
| if (status == ZX_ERR_CANCELED) { |
| LOGF(TRACE, "Dispatcher: event waiter shutting down\n"); |
| event_waiter->dispatcher_ref_->SetEventWaiter(nullptr); |
| event_waiter->dispatcher_ref_ = nullptr; |
| return; |
| } else if (status != ZX_OK) { |
| LOGF(ERROR, "Dispatcher: event waiter error: %d\n", status); |
| event_waiter->dispatcher_ref_->SetEventWaiter(nullptr); |
| event_waiter->dispatcher_ref_ = nullptr; |
| return; |
| } |
| |
| if (signal->observed & ZX_USER_SIGNAL_0) { |
| // The callback is in charge of calling |BeginWaitWithRef| on the event waiter. |
| fbl::RefPtr<Dispatcher> dispatcher_ref = std::move(event_waiter->dispatcher_ref_); |
| event_waiter->InvokeCallback(std::move(event_waiter), dispatcher_ref); |
| } else { |
| LOGF(ERROR, "Dispatcher: event waiter got unexpected signals: %x\n", signal->observed); |
| } |
| } |
| |
| // static |
| zx_status_t Dispatcher::EventWaiter::BeginWaitWithRef(std::unique_ptr<EventWaiter> event, |
| fbl::RefPtr<Dispatcher> dispatcher) { |
| ZX_ASSERT(dispatcher != nullptr); |
| event->dispatcher_ref_ = dispatcher; |
| return BeginWait(std::move(event), dispatcher->process_shared_dispatcher_); |
| } |
| |
| zx::result<zx::event> Dispatcher::CompleteShutdownEventManager::GetEvent() { |
| if (!event_.is_valid()) { |
    // If this is the first waiter to register, we need to create the
    // complete shutdown event manager's event.
| zx_status_t status = zx::event::create(0, &event_); |
| if (status != ZX_OK) { |
| return zx::error(status); |
| } |
| } |
| zx::event dup; |
| zx_status_t status = event_.duplicate(ZX_RIGHTS_BASIC, &dup); |
| if (status != ZX_OK) { |
| return zx::error(status); |
| } |
| return zx::ok(std::move(dup)); |
| } |
| |
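| // Signals the event (if any waiter has registered) and resets it, so that a |
| // later |GetEvent| call begins a fresh signaling cycle. |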
| zx_status_t Dispatcher::CompleteShutdownEventManager::Signal() { |
| if (!event_.is_valid()) { |
| return ZX_OK; // No one is waiting for the complete shutdown event. |
| } |
| zx_status_t status = event_.signal(0u, ZX_EVENT_SIGNALED); |
| event_.reset(); |
| return status; |
| } |
| |
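| // Registers |token| as waiting for a peer transfer. Fails if the dispatcher |
| // is not running or the token is already registered. |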
| zx_status_t Dispatcher::RegisterPendingToken(fdf_token_t* token) { |
| fbl::AutoLock lock(&callback_lock_); |
| if (!IsRunningLocked()) { |
| return ZX_ERR_BAD_STATE; |
| } |
| if (registered_tokens_.find(token) != registered_tokens_.end()) { |
| return ZX_ERR_BAD_STATE; |
| } |
| registered_tokens_.insert(token); |
| return ZX_OK; |
| } |
| |
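| // Queues a callback that delivers |channel| to the handler registered with |
| // |token|. The request is tracked in |registered_callbacks_| so that it can be |
| // completed with |ZX_ERR_CANCELED| if the dispatcher begins shutting down |
| // before it runs. |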
| zx_status_t Dispatcher::ScheduleTokenCallback(fdf_token_t* token, zx_status_t status, |
| fdf::Channel channel) { |
| CallbackRequest* callback_request_ptr = nullptr; |
| |
| { |
| fbl::AutoLock lock(&callback_lock_); |
| if (!IsRunningLocked()) { |
| return ZX_ERR_BAD_STATE; |
| } |
| |
| auto callback_request = std::make_unique<CallbackRequest>(); |
| driver_runtime::Callback callback = |
| [dispatcher = this, token, channel = std::move(channel)]( |
| std::unique_ptr<driver_runtime::CallbackRequest> callback_request, |
| zx_status_t status) mutable { |
| token->handler(static_cast<fdf_dispatcher_t*>(dispatcher), token, status, |
| channel.release()); |
| }; |
| callback_request->SetCallback(static_cast<fdf_dispatcher_t*>(this), std::move(callback)); |
| |
| callback_request_ptr = callback_request.get(); |
| |
| registered_callbacks_.push_back(std::move(callback_request)); |
| registered_tokens_.erase(token); |
| } |
| |
| // If the dispatcher is shut down in the meantime, the callback request will be completed |
| // with |ZX_ERR_CANCELED| in |CompleteShutdown|. |
| QueueRegisteredCallback(callback_request_ptr, status); |
| |
| return ZX_OK; |
| } |
| |
| // static |
| void DispatcherCoordinator::WaitUntilDispatchersIdle() { |
| std::vector<fbl::RefPtr<Dispatcher>> dispatchers; |
| { |
| fbl::AutoLock lock(&(GetDispatcherCoordinator().lock_)); |
| for (auto& driver : GetDispatcherCoordinator().drivers_) { |
| driver.GetDispatchers(dispatchers); |
| } |
| } |
| for (auto& d : dispatchers) { |
| d->WaitUntilIdle(); |
| } |
| } |
| |
| // static |
| void DispatcherCoordinator::WaitUntilDispatchersDestroyed() { |
| auto& coordinator = GetDispatcherCoordinator(); |
| fbl::AutoLock lock(&coordinator.lock_); |
| if (coordinator.AreAllDriversDestroyedLocked()) { |
| return; |
| } |
| coordinator.drivers_destroyed_event_.Wait(&coordinator.lock_); |
| } |
| |
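| // The |Testing*| helpers below drive the unmanaged thread pool's loop |
| // directly. They only apply when the runtime was configured with an unmanaged |
| // thread pool (as a test environment would do); the status-returning helpers |
| // report |ZX_ERR_BAD_STATE| when no unmanaged pool exists. |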
| // static |
| zx_status_t DispatcherCoordinator::TestingRun(zx::time deadline, bool once) { |
| std::optional<Dispatcher::ThreadPool>& unmanaged_thread_pool = |
| GetDispatcherCoordinator().unmanaged_thread_pool_; |
| if (unmanaged_thread_pool.has_value()) { |
| return unmanaged_thread_pool.value().loop()->Run(deadline, once); |
| } |
| |
| return ZX_ERR_BAD_STATE; |
| } |
| |
| // static |
| zx_status_t DispatcherCoordinator::TestingRunUntilIdle() { |
| std::optional<Dispatcher::ThreadPool>& unmanaged_thread_pool = |
| GetDispatcherCoordinator().unmanaged_thread_pool_; |
| if (unmanaged_thread_pool.has_value()) { |
| return unmanaged_thread_pool.value().loop()->RunUntilIdle(); |
| } |
| |
| return ZX_ERR_BAD_STATE; |
| } |
| |
| // static |
| void DispatcherCoordinator::TestingQuit() { |
| std::optional<Dispatcher::ThreadPool>& unmanaged_thread_pool = |
| GetDispatcherCoordinator().unmanaged_thread_pool_; |
| if (unmanaged_thread_pool.has_value()) { |
| unmanaged_thread_pool.value().loop()->Quit(); |
| } |
| } |
| |
| // static |
| zx_status_t DispatcherCoordinator::TestingResetQuit() { |
| std::optional<Dispatcher::ThreadPool>& unmanaged_thread_pool = |
| GetDispatcherCoordinator().unmanaged_thread_pool_; |
| if (unmanaged_thread_pool.has_value()) { |
| return unmanaged_thread_pool.value().loop()->ResetQuit(); |
| } |
| |
| return ZX_ERR_BAD_STATE; |
| } |
| |
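| // Begins an asynchronous shutdown of all of |driver|'s dispatchers. |
| // |observer| is notified once every dispatcher has finished shutting down; if |
| // the driver currently has no dispatchers, the notification is posted |
| // immediately. |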
| // static |
| zx_status_t DispatcherCoordinator::ShutdownDispatchersAsync( |
| const void* driver, fdf_env_driver_shutdown_observer_t* observer) { |
| DriverState::DriverShutdownCallback shutdown_callback; |
| |
| std::vector<fbl::RefPtr<Dispatcher>> dispatchers; |
| { |
| fbl::AutoLock lock(&(GetDispatcherCoordinator().lock_)); |
| auto driver_state_iter = GetDispatcherCoordinator().drivers_.find(driver); |
| if (!driver_state_iter.IsValid()) { |
| return ZX_ERR_INVALID_ARGS; |
| } |
| driver_state_iter->GetDispatchers(dispatchers); |
| |
| auto driver_state_ref = fbl::RefPtr<DriverState>(&(*driver_state_iter)); |
| shutdown_callback = [driver, observer, driver_state_ref = std::move(driver_state_ref)]() { |
| observer->handler(driver, observer); |
| driver_state_ref->SetDriverShutdownComplete(); |
| }; |
| // Set the driver state so that attempts to create new dispatchers on the driver |
| // return an error. |
| // If there are no dispatchers to shut down, we will post a task to call the callback |
| // immediately rather than set it in the driver state. |
| auto status = driver_state_iter->SetDriverShuttingDown( |
| dispatchers.empty() ? nullptr : std::move(shutdown_callback)); |
| if (status != ZX_OK) { |
| return status; |
| } |
| } |
| for (auto& dispatcher : dispatchers) { |
| async::PostTask(dispatcher->GetAsyncDispatcher(), [=]() { dispatcher->ShutdownAsync(); }); |
| } |
| if (dispatchers.empty()) { |
| auto thread_pool = GetDispatcherCoordinator().default_thread_pool(); |
| ZX_ASSERT(shutdown_callback); |
| // The dispatchers have already been shut down and no calls to |NotifyDispatcherShutdown| |
| // will occur, so we need to schedule the handler to be called. |
| async::PostTask(thread_pool->loop()->dispatcher(), |
| [callback = std::move(shutdown_callback)]() mutable { callback(); }); |
| } |
| return ZX_OK; |
| } |
| |
| // static |
| void DispatcherCoordinator::DestroyAllDispatchers() { |
| std::vector<fbl::RefPtr<Dispatcher>> dispatchers; |
| { |
| fbl::AutoLock lock(&(GetDispatcherCoordinator().lock_)); |
| |
| for (auto& driver_state : GetDispatcherCoordinator().drivers_) { |
| // We should have already shut down all dispatchers. |
| ZX_ASSERT(driver_state.CompletedShutdown()); |
| driver_state.GetShutdownDispatchers(dispatchers); |
| } |
| } |
| |
| for (auto& dispatcher : dispatchers) { |
| dispatcher->Destroy(); |
| } |
| |
| WaitUntilDispatchersDestroyed(); |
| } |
| |
| // static |
| zx_status_t DispatcherCoordinator::TokenRegister(zx_handle_t token, fdf_dispatcher_t* dispatcher, |
| fdf_token_t* handler) { |
| DispatcherCoordinator& coordinator = GetDispatcherCoordinator(); |
| return coordinator.token_manager_.Register(token, dispatcher, handler); |
| } |
| |
| // static |
| zx_status_t DispatcherCoordinator::TokenTransfer(zx_handle_t token, fdf_handle_t handle) { |
| DispatcherCoordinator& coordinator = GetDispatcherCoordinator(); |
| return coordinator.token_manager_.Transfer(token, handle); |
| } |
| |
| zx_status_t DispatcherCoordinator::AddDispatcher(fbl::RefPtr<Dispatcher> dispatcher) { |
| fbl::AutoLock lock(&lock_); |
| |
| // Check if we already have a driver state object. |
| auto driver_state = drivers_.find(dispatcher->owner()); |
| if (driver_state == drivers_.end()) { |
| auto new_driver_state = fbl::AdoptRef(new DriverState(dispatcher->owner())); |
| drivers_.insert(new_driver_state); |
| driver_state = drivers_.find(dispatcher->owner()); |
| } else { |
| // If the driver is shutting down, we should not allow creating new dispatchers. |
| if (driver_state->IsShuttingDown()) { |
| return ZX_ERR_BAD_STATE; |
| } |
| } |
| driver_state->AddDispatcher(dispatcher); |
| dispatcher->thread_pool()->OnDispatcherAdded(); |
| return ZX_OK; |
| } |
| |
| // static |
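| // Returns the thread limit for |scheduler_role|, or 0 if the role's thread |
| // pool could not be created. |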
| uint32_t DispatcherCoordinator::GetThreadLimit(std::string_view scheduler_role) { |
| auto thread_pool = GetDispatcherCoordinator().default_thread_pool(); |
| if (scheduler_role != Dispatcher::ThreadPool::kNoSchedulerRole) { |
| auto result = GetDispatcherCoordinator().GetOrCreateThreadPool(scheduler_role); |
| if (result.is_error()) { |
| return 0; |
| } |
| thread_pool = *result; |
| } |
| return thread_pool->max_threads(); |
| } |
| |
| // static |
| zx_status_t DispatcherCoordinator::SetThreadLimit(std::string_view scheduler_role, |
| uint32_t max_threads) { |
| auto thread_pool = GetDispatcherCoordinator().default_thread_pool(); |
| if (scheduler_role != Dispatcher::ThreadPool::kNoSchedulerRole) { |
| auto result = GetDispatcherCoordinator().GetOrCreateThreadPool(scheduler_role); |
| if (result.is_error()) { |
| return result.status_value(); |
| } |
| thread_pool = *result; |
| } |
| return thread_pool->set_max_threads(max_threads); |
| } |
| |
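| // Called by a dispatcher once it has completed its shutdown. The sequence is |
| // roughly: |
| //   1. Mark |dispatcher| as shut down in its driver's state. |
| //   2. Invoke the dispatcher's own shutdown observer, if one was provided. |
| //   3. If this was the driver's last dispatcher and all other observer calls |
| //      have returned, invoke the driver-wide shutdown callback from the |
| //      context of the driver's initial dispatcher. |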
| void DispatcherCoordinator::NotifyDispatcherShutdown( |
| Dispatcher& dispatcher, fdf_dispatcher_shutdown_observer_t* dispatcher_shutdown_observer) { |
| DriverState::DriverShutdownCallback shutdown_callback = nullptr; |
| fbl::RefPtr<Dispatcher> initial_dispatcher; |
| fbl::RefPtr<DriverState> driver_state; |
| |
| auto dec = fit::defer([&]() { |
| fbl::AutoLock lock(&lock_); |
| num_notify_shutdown_threads_--; |
| // The last dispatcher may have been destroyed during a shutdown handler, so check |
| // if all drivers have been destroyed. |
| if (AreAllDriversDestroyedLocked()) { |
| drivers_destroyed_event_.Broadcast(); |
| } |
| }); |
| |
| { |
| fbl::AutoLock lock(&lock_); |
| num_notify_shutdown_threads_++; |
| |
| auto driver_state_iter = drivers_.find(dispatcher.owner()); |
| ZX_ASSERT(driver_state_iter != drivers_.end()); |
| driver_state = fbl::RefPtr<DriverState>(&(*driver_state_iter)); |
| // Prepare to call the dispatcher's shutdown observer. |
| // We need to set the dispatcher as shutdown beforehand, in case the user tries to |
| // destroy the dispatcher. |
| driver_state->SetDispatcherShutdown(dispatcher); |
| driver_state->ObserverCallStarted(); |
| } |
| |
| // We need to call the dispatcher shutdown observer before calling the driver shutdown observer |
| // (if any). |
| if (dispatcher_shutdown_observer) { |
| // We should have already set up the driver call stack before calling |
| // |NotifyDispatcherShutdown|. |
| ZX_ASSERT(driver_context::GetCurrentDispatcher() == &dispatcher); |
| dispatcher_shutdown_observer->handler(static_cast<fdf_dispatcher_t*>(&dispatcher), |
| dispatcher_shutdown_observer); |
| } |
| { |
| fbl::AutoLock lock(&lock_); |
| driver_state->ObserverCallComplete(); |
| // Check if we are still waiting for dispatchers to complete shutting down. |
| if (!driver_state->CompletedShutdown()) { |
| return; |
| } |
| // Check that we are the last shutdown handler, so we don't |
| // call any driver shutdown observer before all dispatcher shutdown observers have returned. |
| if (driver_state->num_pending_observer_calls() > 0) { |
| return; |
| } |
| // We should take ownership of the driver shutdown callback before dropping the lock. |
| // This ensures we do not attempt to call it multiple times. |
| shutdown_callback = driver_state->take_driver_shutdown_callback(); |
| if (!shutdown_callback) { |
| // No one to notify. |
| return; |
| } |
| // There should always be an initial dispatcher, as the dispatcher is the one that calls |
| // |NotifyDispatcherShutdown|. |
| initial_dispatcher = driver_state->initial_dispatcher(); |
| ZX_ASSERT(initial_dispatcher != nullptr); |
| } |
| { |
| // Make sure the shutdown context looks like it is happening from the initial |
| // dispatcher's thread. |
| driver_context::PushDriver(initial_dispatcher->owner(), initial_dispatcher.get()); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
| shutdown_callback(); |
| } |
| } |
| |
| void DispatcherCoordinator::RemoveDispatcher(Dispatcher& dispatcher) { |
| fbl::AutoLock lock(&lock_); |
| |
| auto driver_state = drivers_.find(dispatcher.owner()); |
| ZX_ASSERT(driver_state != drivers_.end()); |
| |
| auto thread_pool = dispatcher.thread_pool(); |
| thread_pool->OnDispatcherRemoved(dispatcher); |
| if (thread_pool->num_dispatchers() == 0) { |
| DestroyThreadPool(thread_pool); |
| } |
| |
| driver_state->RemoveDispatcher(dispatcher); |
| // If all dispatchers have been destroyed, the driver can be removed from the |
| // driver state map. |
| if (!driver_state->HasDispatchers()) { |
| drivers_.erase(driver_state); |
| } |
| |
| if (AreAllDriversDestroyedLocked()) { |
| drivers_destroyed_event_.Broadcast(); |
| } |
| } |
| |
| zx_status_t DispatcherCoordinator::Start() { |
| DispatcherCoordinator& coordinator = GetDispatcherCoordinator(); |
| fbl::AutoLock lock(&coordinator.lock_); |
| auto thread_pool = coordinator.default_thread_pool(); |
| if (thread_pool->num_threads() != 0) { |
| return ZX_ERR_BAD_STATE; |
| } |
| return thread_pool->AddThread(); |
| } |
| |
| // static |
| void DispatcherCoordinator::EnvReset() { |
| DispatcherCoordinator& coordinator = GetDispatcherCoordinator(); |
| coordinator.Reset(); |
| } |
| |
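| // Restores the coordinator to its initial state. Only valid once all drivers |
| // have been destroyed. |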
| void DispatcherCoordinator::Reset() { |
| { |
| fbl::AutoLock al(&lock_); |
| ZX_ASSERT(drivers_.is_empty()); |
| } |
| |
| default_thread_pool()->Reset(); |
| if (unmanaged_thread_pool_.has_value()) { |
| unmanaged_thread_pool_.value().Reset(); |
| } |
| unmanaged_thread_pool_.reset(); |
| } |
| |
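| // Returns the thread pool servicing |scheduler_role|, creating it (and |
| // starting its first thread) if it does not exist yet. |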
| zx::result<Dispatcher::ThreadPool*> DispatcherCoordinator::GetOrCreateThreadPool( |
| std::string_view scheduler_role) { |
| fbl::AutoLock al(&lock_); |
| auto iter = role_to_thread_pool_.find(std::string(scheduler_role)); |
| if (iter != role_to_thread_pool_.end()) { |
| return zx::ok(&(*iter)); |
| } |
| auto thread_pool = std::make_unique<Dispatcher::ThreadPool>(scheduler_role); |
| zx_status_t status = thread_pool->AddThread(); |
| if (status != ZX_OK) { |
| return zx::error(status); |
| } |
| auto* thread_pool_ptr = thread_pool.get(); |
| role_to_thread_pool_.insert(std::move(thread_pool)); |
| return zx::ok(thread_pool_ptr); |
| } |
| |
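| // Tears down a role-specific thread pool once its last dispatcher has been |
| // removed. The default and unmanaged pools are never destroyed here. The |
| // destruction is posted to the default pool's loop so that no thread pool |
| // thread ever attempts to join itself. |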
| void DispatcherCoordinator::DestroyThreadPool(Dispatcher::ThreadPool* thread_pool) { |
| if (thread_pool == default_thread_pool()) { |
| return; |
| } |
| |
| if (unmanaged_thread_pool_.has_value() && thread_pool == &unmanaged_thread_pool_.value()) { |
| return; |
| } |
| |
| // We should immediately remove the thread pool from the coordinator |
| // map, so that a new driver doesn't try to use a destructing thread pool. |
| std::unique_ptr<Dispatcher::ThreadPool> owned_thread_pool = |
| role_to_thread_pool_.erase(*thread_pool); |
| ZX_ASSERT(owned_thread_pool != nullptr); |
| |
| // Ensure we are running on a default thread pool thread. |
| async::PostTask(default_thread_pool()->loop()->dispatcher(), |
| [thread_pool = std::move(owned_thread_pool)]() { |
| // This will destruct the thread pool and join with its threads. |
| }); |
| } |
| |
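| // Runs when a thread pool thread wakes up to process work. Applies the pool's |
| // scheduler role to the thread on its first wakeup; the resulting status is |
| // cached per thread so the role is applied at most once. |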
| void Dispatcher::ThreadPool::ThreadWakeupPrologue() { |
| if (driver_context::GetRoleProfileStatus().has_value()) { |
| // We have already attempted to set the role profile for the current thread. |
| return; |
| } |
| zx_status_t status = SetRoleProfile(); |
| if (status != ZX_OK) { |
| // Failing to set the role profile is not a fatal error. |
| LOGF(WARNING, "Failed to set scheduler role: %d\n", status); |
| } |
| driver_context::SetRoleProfileStatus(status); |
| } |
| |
| zx_status_t Dispatcher::ThreadPool::SetRoleProfile() { |
| #if __Fuchsia_API_level__ >= 20 |
| zx::result client_end = component::Connect<fuchsia_scheduler::RoleManager>(); |
| if (client_end.is_error()) { |
| return client_end.status_value(); |
| } |
| auto role_manager = *std::move(client_end); |
| |
| const zx_rights_t kRights = ZX_RIGHT_TRANSFER | ZX_RIGHT_MANAGE_THREAD; |
| zx::thread duplicate; |
| |
| zx_status_t status = zx::thread::self()->duplicate(kRights, &duplicate); |
| if (status != ZX_OK) { |
| return status; |
| } |
| |
| fidl::Arena arena; |
| auto request = |
| fuchsia_scheduler::wire::RoleManagerSetRoleRequest::Builder(arena) |
| .target(fuchsia_scheduler::wire::RoleTarget::WithThread(std::move(duplicate))) |
| .role(fuchsia_scheduler::wire::RoleName{fidl::StringView::FromExternal(scheduler_role())}) |
| .Build(); |
| auto result = fidl::WireCall(role_manager)->SetRole(request); |
| if (result.status() != ZX_OK) { |
| return result.status(); |
| } |
| if (!result.value().is_ok()) { |
| return result.value().error_value(); |
| } |
| return ZX_OK; |
| #else |
| return ZX_ERR_NOT_SUPPORTED; |
| #endif |
| } |
| |
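| // Records that one more dispatcher thread is needed, and starts a new loop |
| // thread only if the pool is below both the needed count and |max_threads_|. |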
| zx_status_t Dispatcher::ThreadPool::AddThread() { |
| if (is_unmanaged_) { |
| // No-op for the unmanaged thread-pool. |
| return ZX_OK; |
| } |
| |
| fbl::AutoLock lock(&lock_); |
| dispatcher_threads_needed_++; |
| |
| // This avoids starting an additional thread when there is only 1 dispatcher and |
| // we have already started an initial thread for the thread pool. |
| // Note this check is before we have incremented |num_dispatchers_| for the current dispatcher. |
| // TODO(https://fxbug.dev/42076454): we should be able to remove the scheduler role check. |
| if ((scheduler_role_ != kNoSchedulerRole) && (num_threads_ > num_dispatchers_)) [[likely]] { |
| return ZX_OK; |
| } |
| |
| if (num_threads_ >= dispatcher_threads_needed_ || num_threads_ == max_threads_) { |
| return ZX_OK; |
| } |
| auto name = "fdf-dispatcher-thread-" + std::to_string(num_threads_); |
| if (scheduler_role() != kNoSchedulerRole) { |
| name += ":"; |
| name += scheduler_role(); |
| } |
| zx_status_t status = loop_.StartThread(name.c_str()); |
| if (status == ZX_OK) { |
| num_threads_++; |
| } |
| return status; |
| } |
| |
| zx_status_t Dispatcher::ThreadPool::RemoveThread() { |
| if (is_unmanaged_) { |
| // No-op for the unmanaged thread-pool. |
| return ZX_OK; |
| } |
| |
| fbl::AutoLock lock(&lock_); |
| ZX_ASSERT(dispatcher_threads_needed_ > 0); |
| dispatcher_threads_needed_--; |
| return ZX_OK; |
| } |
| |
| void Dispatcher::ThreadPool::OnDispatcherAdded() { |
| fbl::AutoLock lock(&lock_); |
| num_dispatchers_++; |
| } |
| |
| void Dispatcher::ThreadPool::OnDispatcherRemoved(Dispatcher& dispatcher) { |
| fbl::AutoLock lock(&lock_); |
| |
| // We need to check the process shared dispatcher matches as tests inject their own. |
| if (!is_unmanaged_ && dispatcher.allow_sync_calls() && |
| dispatcher.process_shared_dispatcher() == loop()->dispatcher()) { |
| ZX_ASSERT(dispatcher_threads_needed_ > 0); |
| dispatcher_threads_needed_--; |
| } |
| |
| ZX_ASSERT(num_dispatchers_ > 0); |
| num_dispatchers_--; |
| } |
| |
| void Dispatcher::ThreadPool::Reset() { |
| { |
| fbl::AutoLock lock(&lock_); |
| ZX_ASSERT_MSG(dispatcher_threads_needed_ <= 1, "Too many dispatcher threads to reset: %u", |
| dispatcher_threads_needed_); |
| } |
| |
| loop_.Quit(); |
| loop_.JoinThreads(); |
| loop_.ResetQuit(); |
| loop_.RunUntilIdle(); |
| |
| { |
| fbl::AutoLock al(&lock_); |
| max_threads_ = 10; // Restore the initial default limit. |
| num_threads_ = 0; |
| dispatcher_threads_needed_ = 0; |
| num_dispatchers_ = 0; |
| } |
| } |
| |
| void Dispatcher::ThreadPool::CacheUnboundIrq(std::unique_ptr<Dispatcher::AsyncIrq> irq) { |
| fbl::AutoLock lock(&lock_); |
| cached_irqs_.AddIrqLocked(std::move(irq)); |
| } |
| |
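| // Retires cached unbound irqs using a generation scheme: once every thread in |
| // the pool has woken up at least once since a generation of irqs was cached, |
| // no thread can still be holding an in-flight packet for those irqs, so the |
| // generation can be dropped. |
| // |
| // A worked example, assuming a pool of two threads: an irq is unbound and |
| // cached in generation G1. Thread A wakes up (wakeup count 1); another irq is |
| // then unbound and deferred to the next generation G2, since A may already |
| // hold a packet for it. Thread B wakes up (wakeup count 2 == thread count), so |
| // G1 is dropped, G2 becomes the current generation, the generation id is |
| // incremented, and the wakeup count resets to zero. |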
| void Dispatcher::ThreadPool::OnThreadWakeup() { |
| uint32_t thread_irq_generation_id = driver_context::GetIrqGenerationId(); |
| // Check if we have already tracked this thread wakeup for the current generation of irqs. |
| // |cur_generation_id| is atomic - we do not acquire the lock here to avoid unnecessary lock |
| // contention per thread wakeup. If the generation id changes in the meantime, the next wakeup |
| // of this thread can handle that. |
| if (thread_irq_generation_id == cached_irqs_.cur_generation_id()) { |
| return; |
| } |
| |
| fbl::AutoLock lock(&lock_); |
| // We should set this first, as |cached_irqs_.NewThreadWakeupLocked| may increment the generation |
| // id if it clears the current generation. |
| driver_context::SetIrqGenerationId(cached_irqs_.cur_generation_id()); |
| cached_irqs_.NewThreadWakeupLocked(num_threads_); |
| } |
| |
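| // Adds an unbound irq to the current generation if no thread has woken up |
| // since that generation was populated; otherwise defers it to the next |
| // generation, since already-woken threads may hold packets for the newly |
| // unbound irq. |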
| void Dispatcher::ThreadPool::CachedIrqs::AddIrqLocked(std::unique_ptr<Dispatcher::AsyncIrq> irq) { |
| // Check if we are tracking a new generation of irqs. |
| if (cur_generation_.is_empty()) { |
| IncrementGenerationId(); |
| } |
| // We should only add to the current generation of cached irqs if no thread has woken up yet. |
| if (threads_wakeup_count_ == 0) { |
| cur_generation_.push_back(std::move(irq)); |
| } else { |
| next_generation_.push_back(std::move(irq)); |
| } |
| } |
| |
| void Dispatcher::ThreadPool::CachedIrqs::NewThreadWakeupLocked(uint32_t total_number_threads) { |
| threads_wakeup_count_++; |
| // If all threads have woken up since the current generation of cached irqs was populated, |
| // we can be sure that no thread still has a pending irq packet corresponding to these |
| // unbound irqs. |
| if (threads_wakeup_count_ < total_number_threads) { |
| return; |
| } |
| // Drop the current generation of irqs, and begin tracking thread wakeups for the next generation. |
| cur_generation_ = std::move(next_generation_); |
| // If the new current generation (just moved from the next generation) already has irqs, we |
| // need to increment the generation counter so that thread wakeups will be tracked. |
| if (!cur_generation_.is_empty()) { |
| IncrementGenerationId(); |
| } |
| threads_wakeup_count_ = 0; |
| } |
| |
| } // namespace driver_runtime |