blob: e8f9abb57e785173f3d0ffa44821e6b5a2b8a7c3 [file]
// Copyright 2026 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "thread_pool.h"
#include <fidl/fuchsia.scheduler/cpp/wire.h>
#include <lib/component/incoming/cpp/protocol.h>
#include <lib/zx/thread.h>
#include "dispatcher_coordinator.h"
#include "dispatcher_internals.h"
#include "src/devices/lib/log/log.h"
namespace driver_runtime {
void ThreadPool::ThreadWakeupPrologue() {
zx_instant_mono_t entry_time = zx_clock_get_monotonic();
std::pair<zx_koid_t, std::atomic_int64_t*> task_entry_slot =
thread_context::GetTaskEntryTimeSlot();
if (unlikely(*task_entry_slot.second == -1)) {
// Store the pointer to our thread's entry time slot in the thread pool's list so that it can be
// checked for stalled threads by the environment.
//
// Thread safety note: While the pointer stored in `thread_entry_time_slots_` may outlive the
// runtime of the thread, and so the TLS variable it's stored in, the dispatcher will remove the
// thread pool from the list consulted to check for stalled tasks before scheduling the
// destruction of the pool, so it should not be accessed after the thread has stopped.
fbl::AutoLock guard(&lock_);
thread_entry_time_slots_.push_back(task_entry_slot);
}
task_entry_slot.second->store(entry_time);
if (++threads_entered_ == num_threads()) {
driver_runtime::GetDispatcherCoordinator().TriggerStallScanner();
}
if (scheduler_role_ != kNoSchedulerRole) {
if (thread_context::GetRoleProfileStatus().has_value()) {
// We have already attempted to set the role profile for the current thread.
return;
}
zx_status_t status = SetRoleProfile();
if (status != ZX_OK) {
// Failing to set the role profile is not a fatal error.
LOGF(WARNING, "Failed to set scheduler role: %d", status);
}
thread_context::SetRoleProfileStatus(status);
}
}
void ThreadPool::ThreadWakeupEpilogue() {
thread_context::GetTaskEntryTimeSlot().second->store(0);
--threads_entered_;
}
bool ThreadPool::ScanThreadsForStalls() {
fbl::AutoLock lock(&lock_);
zx::time current_time = zx::time(zx_clock_get_monotonic());
zx::time stalled_time = current_time - zx::msec(kStallTimeMs);
zx::time stale_time = current_time - zx::msec(kStaleTimeMs);
uint32_t stalled_threads = 0;
// TODO(468352723): Make these logs less annoying so they can be raised above DEBUG
for (auto& slot : thread_entry_time_slots_) {
zx::time timestamp(slot.second->load());
if (timestamp != zx::time(0) && timestamp < stalled_time) {
if (timestamp > stale_time) {
LOGF(DEBUG, "Found a thread (id: %u, role: '%s') that has been stalled for %ld ms",
slot.first, scheduler_role_.c_str(), (current_time - timestamp).to_msecs());
}
stalled_threads++;
}
}
if (num_threads_ > 0 && num_threads_ < MaxThreadsLocked() && stalled_threads >= num_threads_) {
// if we weren't already stalled try to spawn a new thread.
if (!stalled_) {
LOGF(
DEBUG,
"All threads on thread pool (role: '%s') are stalled (%d/%d). Spawning a new thread, if possible (max threads: %d).",
scheduler_role_.c_str(), stalled_threads, num_threads_, MaxThreadsLocked());
stalled_ = true;
AddThreadLocked();
}
return false;
} else {
// clear the stalled flag if it was set so we can warn if we become stuck again.
stalled_ = false;
return num_threads_ == threads_entered_.load();
}
}
zx_status_t ThreadPool::SetRoleProfile() {
#if FUCHSIA_API_LEVEL_AT_LEAST(20)
zx::result client_end = component::Connect<fuchsia_scheduler::RoleManager>();
if (client_end.is_error()) {
return client_end.status_value();
}
auto role_manager = *std::move(client_end);
const zx_rights_t kRights = ZX_RIGHT_TRANSFER | ZX_RIGHT_MANAGE_THREAD;
zx::thread duplicate;
zx_status_t status = zx::thread::self()->duplicate(kRights, &duplicate);
if (status != ZX_OK) {
return status;
}
fidl::Arena arena;
auto request =
fuchsia_scheduler::wire::RoleManagerSetRoleRequest::Builder(arena)
.target(fuchsia_scheduler::wire::RoleTarget::WithThread(std::move(duplicate)))
.role(fuchsia_scheduler::wire::RoleName{fidl::StringView::FromExternal(scheduler_role())})
.Build();
auto result = fidl::WireCall(role_manager)->SetRole(request);
if (result.status() != ZX_OK) {
return result.status();
}
if (!result.value().is_ok()) {
return result.value().error_value();
}
return ZX_OK;
#endif
return ZX_ERR_NOT_SUPPORTED;
}
zx_status_t ThreadPool::AddThread() {
if (is_unmanaged_) {
// No-op for the unmanaged thread-pool.
return ZX_OK;
}
fbl::AutoLock lock(&lock_);
return AddThreadLocked();
}
zx_status_t ThreadPool::AddThreadLocked() {
if (is_unmanaged_) {
// No-op for the unmanaged thread-pool.
return ZX_OK;
}
auto name = "fdf-dispatcher-thread-" + std::to_string(num_threads_);
if (scheduler_role() != kNoSchedulerRole) {
name += ":";
name += scheduler_role();
}
zx_status_t status = loop_.StartThread(name.c_str());
if (status == ZX_OK) {
num_threads_++;
}
return status;
}
zx_status_t ThreadPool::OnDispatcherSealed() {
fbl::AutoLock lock(&lock_);
ZX_ASSERT(allow_sync_call_dispatchers_ > 0);
allow_sync_call_dispatchers_--;
return ZX_OK;
}
zx_status_t ThreadPool::OnDispatcherAdded(Dispatcher& dispatcher) {
fbl::AutoLock lock(&lock_);
if (dispatcher.allow_sync_calls()) {
if ((allow_sync_call_dispatchers_ + 1) > thread_limit_) {
LOGF(WARNING,
"Dispatcher that allows sync calls created when already at thread pool thread limit");
return ZX_ERR_NO_RESOURCES;
}
++allow_sync_call_dispatchers_;
}
++num_dispatchers_;
if (DispatcherCoordinator::dynamic_thread_spawning()) {
// only start a new thread if we're not dynamically managing threads
return ZX_OK;
}
// We only want to spawn a thread if we don't have more threads than we should need to service
// the types of dispatchers we have. See |MaxThreadsLocked| for the criteria we use to decide
// on the maximum thread count.
if (num_threads_ >= MaxThreadsLocked()) [[likely]] {
LOGF(
DEBUG,
"Not spawning thread on thread pool (role: %s) because it would add more threads (%d) than dispatchers (%d) or max threads (%d)",
scheduler_role_.c_str(), num_threads_, num_dispatchers_, MaxThreadsLocked());
return ZX_OK;
}
return AddThreadLocked();
}
void ThreadPool::OnDispatcherRemoved(Dispatcher& dispatcher) {
fbl::AutoLock lock(&lock_);
if (dispatcher.allow_sync_calls()) {
ZX_ASSERT(allow_sync_call_dispatchers_ > 0);
allow_sync_call_dispatchers_--;
}
ZX_ASSERT(num_dispatchers_ > 0);
num_dispatchers_--;
}
void ThreadPool::Reset() {
{
fbl::AutoLock lock(&lock_);
ZX_ASSERT_MSG(allow_sync_call_dispatchers_ == 0,
"Resetting thread pool with sync dispatchers still active: %d",
allow_sync_call_dispatchers_);
}
loop_.Quit();
loop_.JoinThreads();
loop_.ResetQuit();
loop_.RunUntilIdle();
{
fbl::AutoLock al(&lock_);
thread_limit_ = kDefaultThreadLimit;
num_threads_ = 0;
allow_sync_call_dispatchers_ = 0;
num_dispatchers_ = 0;
}
}
void ThreadPool::CacheUnboundIrq(std::unique_ptr<AsyncIrq> irq) {
fbl::AutoLock lock(&lock_);
cached_irqs_.AddIrqLocked(std::move(irq));
}
void ThreadPool::OnThreadWakeup() {
uint32_t thread_irq_generation_id = thread_context::GetIrqGenerationId();
// Check if we have already tracked this thread wakeup for the current generation of irqs.
// |cur_generatiom_id| is atomic - we do not acquire the lock here to avoid unnecessary lock
// contention per thread wakeup. If the generation id changes in the meanwhile, the next wakeuup
// of this thread can handle that.
if (thread_irq_generation_id == cached_irqs_.cur_generation_id()) {
return;
}
fbl::AutoLock lock(&lock_);
// We should set this first, as |cached_irqs_.NewThreadWakeupLocked| may increment the generation
// id if it clears the current generation.
thread_context::SetIrqGenerationId(cached_irqs_.cur_generation_id());
cached_irqs_.NewThreadWakeupLocked(num_threads_);
}
void ThreadPool::CachedIrqs::AddIrqLocked(std::unique_ptr<AsyncIrq> irq) {
// Check if we are tracking a new generation of irqs.
if (cur_generation_.is_empty()) {
IncrementGenerationId();
}
// We should only add to the current generation of cached irqs if no thread has woken up yet.
if (threads_wakeup_count_ == 0) {
cur_generation_.push_back(std::move(irq));
} else {
next_generation_.push_back(std::move(irq));
}
}
void ThreadPool::CachedIrqs::NewThreadWakeupLocked(uint32_t total_number_threads) {
threads_wakeup_count_++;
// If all threads have woken up since the current generation of cached irqs was populated,
// we can be sure that no threads have a pending irq packet that correspond to these unbound irqs.
if (threads_wakeup_count_ < total_number_threads) {
return;
}
// Drop the current generation of irqs, and begin tracking thread wakeups for the next generation.
cur_generation_ = std::move(next_generation_);
// If the next generation already has irqs, we need to increment the generation counter
// so that thread wakeups will be tracked.
if (cur_generation_.size() > 0) {
IncrementGenerationId();
}
threads_wakeup_count_ = 0;
}
} // namespace driver_runtime