// Copyright 2018 The Fuchsia Authors
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file or at
#include "kernel/owned_wait_queue.h"
#include <lib/counters.h>
#include <lib/fit/defer.h>
#include <zircon/compiler.h>
#include <arch/mp.h>
#include <fbl/algorithm.h>
#include <fbl/auto_lock.h>
#include <fbl/enum_bits.h>
#include <kernel/auto_preempt_disabler.h>
#include <kernel/scheduler.h>
#include <kernel/wait_queue_internal.h>
#include <ktl/algorithm.h>
#include <ktl/bit.h>
#include <ktl/type_traits.h>
#include <object/thread_dispatcher.h>
#include <ktl/enforce.h>
// Notes on the defined kernel counters.
// Adjustments (aka promotions and demotions)
// The number of times that a thread either gained or lost inherited profile
// pressure as a result of a PI event.
// Note that the number of promotions does not have to equal the number of
// demotions in the system. For example, a thread could slowly gain weight as
// fair scheduled threads join a wait queue it owns, then suddenly drop
// back down to its base profile when the thread releases ownership of the
// queue.
// In addition to simple promotions and demotions, the number of threads whose
// effective profile changed as a result of another thread's base profile
// changing is also tracked, although whether these changes amount to a
// promotion or a demotion is not computed.
// Max chain traversal.
// The maximum traversed length of a PI chain during execution of the propagation
// algorithm.
// The length of a propagation chain is defined as the number of nodes in an
// inheritance graph which are affected by a propagation event. For example, if
// a thread (T1) blocks in an owned wait queue (OWQ1), adding an edge between
// them, and the wait queue has no owner, then the propagation event's chain
// length is 1. This is regardless of whether or not the blocking thread
// currently owns one or more wait queues upstream from it. The OWQ1's IPVs were
// updated, but no other nodes in the graph were affected. If the OWQ1 had been
// owned by a running/runnable thread (T2), then the chain length of the
// operation would have been two instead, since both OWQ1 and T2 needed to be
// visited and updated.
// Priority-inheritance (PI) accounting counters.  The notes above describe
// what each of these measures.
KCOUNTER(pi_promotions, "kernel.pi.adj.promotions")
KCOUNTER(pi_demotions, "kernel.pi.adj.demotions")
KCOUNTER(pi_bp_changed, "kernel.pi.adj.bp_changed")
// Declared with the Max counter type; ChainLengthTracker's destructor compares
// its visited-node count against this counter's current-CPU value.
KCOUNTER_DECLARE(max_pi_chain_traverse, "kernel.pi.max_chain_traverse", Max)
namespace {
// Bit-flag options which select, at compile time, what a ChainLengthTracker
// does while a PI chain is being traversed.
enum class ChainLengthTrackerOpt : uint32_t {
// Track nothing at all.
None = 0,
// Maintain the max-chain-length statistic.
RecordMaxLength = 1,
// ASSERT if a traversal visits more nodes than the guard limit permits.
EnforceLengthGuard = 2,
// By default, we always maintain the max length counter, and we enforce the
// length guard in everything but release builds.
constexpr ChainLengthTrackerOpt kEnablePiChainGuards =
((LK_DEBUGLEVEL > 0) ? ChainLengthTrackerOpt::EnforceLengthGuard : ChainLengthTrackerOpt::None);
constexpr ChainLengthTrackerOpt kDefaultChainLengthTrackerOpt =
ChainLengthTrackerOpt::RecordMaxLength | kEnablePiChainGuards;
// Scoped helper which counts how many nodes a PI-graph traversal visits.
//
// |Options| selects the behavior at compile time:
//   RecordMaxLength    : on destruction, compare the number of nodes visited
//                        against the max_pi_chain_traverse counter.
//   EnforceLengthGuard : ASSERT from NodeVisited() if the traversal grows
//                        beyond kMaxChainLen nodes.
template <ChainLengthTrackerOpt Options = kDefaultChainLengthTrackerOpt>
class ChainLengthTracker {
using Opt = ChainLengthTrackerOpt;
ChainLengthTracker() {
if constexpr (Options != Opt::None) {
nodes_visited_ = 0;
~ChainLengthTracker() {
// The max-length statistic is controlled by RecordMaxLength, not by the
// length guard.  Gating on RecordMaxLength keeps the counter maintained in
// release builds (where the guard is compiled out), and keeps guard-only
// trackers, such as the one used during cycle detection, from being recorded
// as chain traversals.
if constexpr ((Options & Opt::RecordMaxLength) != Opt::None) {
// Note, the only real reason that this is an accurate max at all is
// because the counter is effectively protected by the thread lock
// (although there is no real good way to annotate that fact). When we
// finally remove the thread lock, we are going to need to do better than
// this.
auto old = max_pi_chain_traverse.ValueCurrCpu();
if (nodes_visited_ > old) {
// Called once for each node visited during a traversal.
void NodeVisited() {
if constexpr (Options != Opt::None) {
if constexpr ((Options & Opt::EnforceLengthGuard) != Opt::None) {
// Upper bound on the number of nodes a single traversal may visit before we
// conclude that something has gone wrong.
constexpr uint32_t kMaxChainLen = 2048;
ASSERT_MSG(nodes_visited_ <= kMaxChainLen, "visited %u", nodes_visited_);
// Number of nodes visited so far by this tracker.
uint32_t nodes_visited_ = 0;
// Short aliases for the types of OwnedWaitQueue's propagation-operation tag
// objects.  They are used to select the matching Propagate overload below.
using AddSingleEdgeTag = decltype(OwnedWaitQueue::AddSingleEdgeOp);
using RemoveSingleEdgeTag = decltype(OwnedWaitQueue::RemoveSingleEdgeOp);
using BaseProfileChangedTag = decltype(OwnedWaitQueue::BaseProfileChangedOp);
// Reports whether |ipvs| describes any pressure which is worth propagating.
//
// A set of inherited profile values is "consequential" when it exists and
// carries either a non-zero total fair weight or a non-zero uncapped deadline
// utilization.  A null pointer is never consequential.
inline bool IpvsAreConsequential(const SchedulerState::InheritedProfileValues* ipvs) {
  if (ipvs == nullptr) {
    return false;
  }

  const bool has_fair_weight = (ipvs->total_weight != SchedWeight{0});
  return has_fair_weight || (ipvs->uncapped_utilization != SchedUtilization{0});
}
// Propagate overload selected for add-single-edge operations.  Hands the
// (upstream, downstream) pair to Scheduler::JoinNodeToPiGraph, with an
// additional compile-time branch for the case where the downstream node is a
// Thread.  Callers must hold the thread lock with preemption disabled (see the
// TA_REQ annotation).
template <typename UpstreamType, typename DownstreamType>
void Propagate(UpstreamType& upstream, DownstreamType& downstream, AddSingleEdgeTag)
TA_REQ(thread_lock, preempt_disabled_token) {
Scheduler::JoinNodeToPiGraph(upstream, downstream);
if constexpr (ktl::is_same_v<DownstreamType, Thread>) {
// Propagate overload selected for remove-single-edge operations.  Hands the
// (upstream, downstream) pair to Scheduler::SplitNodeFromPiGraph, with an
// additional compile-time branch for the case where the downstream node is a
// Thread.  Callers must hold the thread lock with preemption disabled (see the
// TA_REQ annotation).
template <typename UpstreamType, typename DownstreamType>
void Propagate(UpstreamType& upstream, DownstreamType& downstream, RemoveSingleEdgeTag)
TA_REQ(thread_lock, preempt_disabled_token) {
Scheduler::SplitNodeFromPiGraph(upstream, downstream);
if constexpr (ktl::is_same_v<DownstreamType, Thread>) {
// Propagate overload selected for base-profile-changed operations.  Hands the
// (upstream, downstream) pair to Scheduler::UpstreamThreadBaseProfileChanged,
// with an additional compile-time branch for the case where the downstream
// node is a Thread.  Callers must hold the thread lock with preemption
// disabled (see the TA_REQ annotation).
template <typename UpstreamType, typename DownstreamType>
void Propagate(UpstreamType& upstream, DownstreamType& downstream, BaseProfileChangedTag)
TA_REQ(thread_lock, preempt_disabled_token) {
Scheduler::UpstreamThreadBaseProfileChanged(upstream, downstream);
if constexpr (ktl::is_same_v<DownstreamType, Thread>) {
} // namespace
// Destructor.  The queue must have been disowned before it is destroyed.
OwnedWaitQueue::~OwnedWaitQueue() {
// Something is very very wrong if we have been allowed to destruct while we
// still have an owner.
DEBUG_ASSERT(owner_ == nullptr);
// Clears the owner link of every wait queue currently owned by |t|.
void OwnedWaitQueue::DisownAllQueues(Thread* t) {
// It is important that this thread not be blocked by any other wait queues
// during this operation. If it was possible for the thread to be blocked,
// we would need to update all of the PI chain bookkeeping too.
DEBUG_ASSERT(t->wait_queue_state_.blocking_wait_queue_ == nullptr);
// Walk the thread's list of owned queues, confirming that each one really
// does name |t| as its owner before clearing the link.
for (auto& q : t->wait_queue_state_.owned_wait_queues_) {
DEBUG_ASSERT(q.owner_ == t);
q.owner_ = nullptr;
// Wakes up to |wake_count| threads from this queue.
//
// For each candidate thread (as chosen by Peek(now)), |on_thread_wake_hook| is
// consulted and its returned Action decides whether to stop, to wake the
// thread and keep going, or to wake the thread and select it as the new owner.
void OwnedWaitQueue::WakeThreadsInternal(uint32_t wake_count, zx_time_t now,
Hook on_thread_wake_hook) {
DEBUG_ASSERT(magic() == kOwnedMagic);
// Re-validate the scheduler state storage bookkeeping when we leave this
// scope.
auto post_op_validate = fit::defer([this]() { ValidateSchedStateStorage(); });
// Start by removing any existing owner. We will either select a new owner
// based on what the user-provided hook tells us to do, or we should end up
// with no owner.
uint32_t woken = 0;
while (woken < wake_count) {
// Consider the thread that the queue considers to be the most important to
// wake right now. If there are no threads left in the queue, then we are
// done.
Thread* t = Peek(now);
if (t == nullptr) {
// Call the user supplied hook and let them decide what to do with this
// thread (updating their own bookkeeping in the process)
using Action = Hook::Action;
Action action = on_thread_wake_hook(t);
// If we should stop, just return. We are done.
if (action == Action::Stop) {
// All other choices involve waking up this thread, so go ahead and do that now.
// If this is a wake-and-assign-owner operation, then the number of threads
// we have woken so far should be exactly one.
DEBUG_ASSERT((action != Action::SelectAndAssignOwner) || (woken == 1));
// Dequeue the thread and propagate the PI consequences. Do not actually unblock the
// thread from the scheduler's perspective just yet.
DequeueThread(t, ZX_OK);
BeginPropagate(*t, *this, RemoveSingleEdgeOp);
// Now go ahead and unblock the thread we just removed from the wait queue.
// TODO(johngro) : instead of unblocking the thread now, it might be better
// to put it on a local list, then batch unblock all of the threads we woke
// at the end of everything.
// If this thread was selected to become the new queue owner, and it still
// has waiters, make the assignment and we are done.
if (action == Action::SelectAndAssignOwner) {
if (!IsEmpty()) {
// Looks like this is just a simple wake operation. Keep going as long as
// there are more threads to wake.
DEBUG_ASSERT(action == Action::SelectAndKeepGoing);
// If there are no threads left waiting in this queue, then it cannot have any owner.
DEBUG_ASSERT((owner_ == nullptr) || !IsEmpty());
// Consistency check for |inherited_scheduler_state_storage_|.
//
// When the storage pointer is set, scan the threads waiting in this queue
// looking for the one whose wait-queue state holds the storage it points at.
void OwnedWaitQueue::ValidateSchedStateStorageUnconditional() {
if (inherited_scheduler_state_storage_ != nullptr) {
bool found = false;
for (const Thread& t : this->collection_.threads()) {
// Compare by address: the storage lives inside a waiting thread's
// wait-queue state.
if (&t.wait_queue_state().inherited_scheduler_state_storage_ ==
inherited_scheduler_state_storage_) {
found = true;
} else {
// Computes the inherited profile values which |thread| transmits downstream.
//
// The result starts as a copy of the thread's current inherited profile
// values.  If the thread's base profile is inheritable, its contribution is
// folded in as well: fair profiles add their weight, while deadline profiles
// add their utilization and may tighten the minimum relative deadline.
SchedulerState::InheritedProfileValues OwnedWaitQueue::SnapshotThreadIpv(Thread& thread) {
  const SchedulerState& sched_state = thread.scheduler_state();
  const SchedulerState::BaseProfile& base_profile = sched_state.base_profile_;
  SchedulerState::InheritedProfileValues ret = sched_state.inherited_profile_values_;

  // A non-inheritable base profile contributes nothing to the snapshot.
  if (!base_profile.inheritable) {
    return ret;
  }

  // Fair profiles contribute only their weight.
  if (base_profile.discipline == SchedDiscipline::Fair) {
    ret.total_weight += base_profile.fair.weight;
    return ret;
  }

  // Everything else contributes utilization, and may lower the minimum
  // relative deadline.
  DEBUG_ASSERT(ret.min_deadline != SchedDuration{0});
  ret.uncapped_utilization += base_profile.deadline.utilization;
  if (base_profile.deadline.deadline_ns < ret.min_deadline) {
    ret.min_deadline = base_profile.deadline.deadline_ns;
  }
  return ret;
}
// Applies a change in inherited profile values (IPVs) to |thread|.
//
// |old_ipv| : the values being removed from the thread, or nullptr if none.
// |new_ipv| : the values being added to the thread, or nullptr if none.
//
// At least one of the two must be non-null.  The thread's total weight and
// uncapped utilization are adjusted by (new - old), and its minimum relative
// deadline is recomputed as needed.
void OwnedWaitQueue::ApplyIpvDeltaToThread(const SchedulerState::InheritedProfileValues* old_ipv,
const SchedulerState::InheritedProfileValues* new_ipv,
Thread& thread) {
DEBUG_ASSERT((old_ipv != nullptr) || (new_ipv != nullptr));
// Start the deltas from the values being added (zero if there are none), then
// subtract out the values being removed.
SchedWeight weight_delta = new_ipv ? new_ipv->total_weight : SchedWeight{0};
SchedUtilization util_delta = new_ipv ? new_ipv->uncapped_utilization : SchedUtilization{0};
if (old_ipv != nullptr) {
weight_delta -= old_ipv->total_weight;
util_delta -= old_ipv->uncapped_utilization;
SchedulerState& tss = thread.scheduler_state();
SchedulerState::InheritedProfileValues& thread_ipv = tss.inherited_profile_values_;
thread_ipv.total_weight += weight_delta;
thread_ipv.uncapped_utilization += util_delta;
// Neither accumulated total is allowed to go negative.
DEBUG_ASSERT(thread_ipv.total_weight >= SchedWeight{0});
DEBUG_ASSERT(thread_ipv.uncapped_utilization >= SchedUtilization{0});
// If a set of IPVs is going away, and the value which is going away was the
// minimum, then we need to recompute the new minimum by checking the
// minimum across all of this thread's owned wait queues.
// TODO(johngro): Consider keeping the set of owned wait queues as a WAVL
// tree, indexed by minimum relative deadline, so that this can be
// maintained in O(1) time instead of O(N).
if ((new_ipv != nullptr) && (new_ipv->min_deadline <= thread_ipv.min_deadline)) {
// The incoming minimum is at least as tight as the current one; simply take
// it.
thread_ipv.min_deadline = ktl::min(thread_ipv.min_deadline, new_ipv->min_deadline);
} else {
if ((old_ipv != nullptr) && (old_ipv->min_deadline <= thread_ipv.min_deadline)) {
// The departing values may have been supplying the minimum.  Rebuild it from
// the non-empty queues owned by this thread.
SchedDuration new_min_deadline{SchedDuration::Max()};
for (auto& other_queue : thread.wait_queue_state().owned_wait_queues_) {
if (!other_queue.IsEmpty()) {
DEBUG_ASSERT(other_queue.inherited_scheduler_state_storage_ != nullptr);
const SchedulerState::InheritedProfileValues& other_ipvs =
new_min_deadline = ktl::min(new_min_deadline, other_ipvs.min_deadline);
thread_ipv.min_deadline = new_min_deadline;
// Finally, account for the incoming minimum, if there is one.
if (new_ipv != nullptr) {
thread_ipv.min_deadline = ktl::min(thread_ipv.min_deadline, new_ipv->min_deadline);
DEBUG_ASSERT(thread_ipv.min_deadline > SchedDuration{0});
// Applies a change in inherited profile values (IPVs) to |owq|.
//
// |old_ipv| : the values being removed from the queue, or nullptr if none.
// |new_ipv| : the values being added to the queue, or nullptr if none.
//
// The queue's total weight and uncapped utilization are adjusted by
// (new - old).  The minimum relative deadline is not adjusted incrementally;
// it is re-read from the queue's collection of waiters.  |owq| must already
// have inherited scheduler state storage assigned.
void OwnedWaitQueue::ApplyIpvDeltaToOwq(const SchedulerState::InheritedProfileValues* old_ipv,
const SchedulerState::InheritedProfileValues* new_ipv,
OwnedWaitQueue& owq) {
// Start the deltas from the values being added (zero if there are none), then
// subtract out the values being removed.
SchedWeight weight_delta = new_ipv ? new_ipv->total_weight : SchedWeight{0};
SchedUtilization util_delta = new_ipv ? new_ipv->uncapped_utilization : SchedUtilization{0};
if (old_ipv != nullptr) {
weight_delta -= old_ipv->total_weight;
util_delta -= old_ipv->uncapped_utilization;
DEBUG_ASSERT(owq.inherited_scheduler_state_storage_ != nullptr);
SchedulerState::WaitQueueInheritedSchedulerState& iss = *owq.inherited_scheduler_state_storage_;
iss.ipvs.total_weight += weight_delta;
iss.ipvs.uncapped_utilization += util_delta;
iss.ipvs.min_deadline = owq.collection_.MinInheritableRelativeDeadline();
// Sanity check the results: totals may not go negative, and the minimum
// deadline must remain positive.
DEBUG_ASSERT(iss.ipvs.total_weight >= SchedWeight{0});
DEBUG_ASSERT(iss.ipvs.uncapped_utilization >= SchedUtilization{0});
DEBUG_ASSERT(iss.ipvs.min_deadline > SchedDuration{0});
// Determines whether making |owner_thread| the owner of |owq| would create a
// cycle in the inheritance graph.
//
// |owq|             : the queue whose ownership is being considered.
// |owner_thread|    : the proposed owner; nullptr means "no owner".
// |blocking_thread| : a thread which is about to block in |owq|, when there is
//                     one.
//
// Returns true if a cycle would be formed, false otherwise.
bool OwnedWaitQueue::CheckForCycle(const OwnedWaitQueue* owq, const Thread* owner_thread,
const Thread* blocking_thread) {
// If the owner of OWQ is being set to nullptr, then there cannot be a cycle.
if (owner_thread == nullptr) {
return false;
// ASSERT if this operation goes on for way too long, but do not record this
// as a chain traversal for kcounter purposes.
ChainLengthTracker<kEnablePiChainGuards> inf_loop_guard;
// Trace the downstream path from the proposed owner thread. If it leads back
// to either |owq| or |blocking_thread|, then we would have a cycle if
// |blocking_thread| blocked in |owq|, and |owner_thread| were to become the
// owner.
const Thread* thread_iter = owner_thread;
while (true) {
// Peek at the wait queue which is blocking this thread.
const OwnedWaitQueue* next_owq =
// If there is no blocking wait queue, or it is not an owned wait queue,
// then there is no cycle and we are finished.
if (next_owq == nullptr) {
return false;
// If the OWQ blocking this thread is the OWQ involved in cycle detection,
// then we have found a cycle.
if (next_owq == owq) {
return true;
// Move on to the next thread. If there is no next thread, then there is no
// cycle. Alternatively, if we are considering blocking a thread, and this
// is the same thread as the OWQ's owner, then we have detected a cycle.
thread_iter = next_owq->owner_;
if (thread_iter == nullptr) {
return false;
if (thread_iter == blocking_thread) {
return true;
// Begins propagation of an edge add/remove which starts at a thread
// (|upstream_node|) and flows into the wait queue (|downstream_node|).
//
// The thread's transmitted IPVs are snapshotted and handed to FinishPropagate
// as the "lost" values for a remove operation, or as the "added" values for
// an add operation.
template <OwnedWaitQueue::PropagateOp OpType>
void OwnedWaitQueue::BeginPropagate(Thread& upstream_node, OwnedWaitQueue& downstream_node,
PropagateOpTag<OpType> op) {
// When needed, base profile changes will directly call FinishPropagate.
static_assert(OpType != PropagateOp::BaseProfileChanged);
SchedulerState::InheritedProfileValues ipv_snapshot;
// Are we starting from a thread during an edge remove operation? If so,
// and we were the last thread to leave the queue, then there is no longer
// any IPV storage for our downstream wait queue which needs to be updated.
// If the wait queue has no owner either, then we are done with propagation.
if constexpr (OpType == PropagateOp::RemoveSingleEdge) {
if (downstream_node.IsEmpty() && (downstream_node.owner_ == nullptr)) {
ipv_snapshot = SnapshotThreadIpv(upstream_node);
// FinishPropagate takes (added_ipv, lost_ipv), in that order.
if constexpr (OpType == PropagateOp::RemoveSingleEdge) {
FinishPropagate(upstream_node, downstream_node, nullptr, &ipv_snapshot, op);
} else if constexpr (OpType == PropagateOp::AddSingleEdge) {
FinishPropagate(upstream_node, downstream_node, &ipv_snapshot, nullptr, op);
// Begins propagation of an edge add/remove which starts at a wait queue
// (|upstream_node|) and flows into its owner thread (|downstream_node|).
//
// The queue's owner link is updated first.  If the queue has waiters, the
// queue's IPVs are then handed to FinishPropagate as the "lost" values for a
// remove operation, or as the "added" values for an add operation.
template <OwnedWaitQueue::PropagateOp OpType>
void OwnedWaitQueue::BeginPropagate(OwnedWaitQueue& upstream_node, Thread& downstream_node,
PropagateOpTag<OpType> op) {
// When needed, base profile changes will directly call FinishPropagate.
static_assert(OpType != PropagateOp::BaseProfileChanged);
if constexpr (OpType == PropagateOp::AddSingleEdge) {
// If we are adding an owner to this OWQ, we should be able to assert that
// it does not currently have one.
DEBUG_ASSERT(upstream_node.owner_ == nullptr);
upstream_node.owner_ = &downstream_node;
} else if constexpr (OpType == PropagateOp::RemoveSingleEdge) {
// If we are removing the owner of this OWQ, or we are updating the base
// profile of the immediately upstream thread, we should be able to assert
// that the owq's current owner is the thread passed to this method.
DEBUG_ASSERT(upstream_node.owner_ == &downstream_node);
upstream_node.owner_ = nullptr;
// If the OWQ we are starting from has no active waiters, then there are no
// IPV deltas to propagate. After updating the links, we are finished.
if (upstream_node.IsEmpty()) {
DEBUG_ASSERT(upstream_node.inherited_scheduler_state_storage_ != nullptr);
SchedulerState::InheritedProfileValues& ipvs =
// FinishPropagate takes (added_ipv, lost_ipv), in that order.
if constexpr (OpType == PropagateOp::RemoveSingleEdge) {
FinishPropagate(upstream_node, downstream_node, nullptr, &ipvs, op);
} else if constexpr (OpType == PropagateOp::AddSingleEdge) {
FinishPropagate(upstream_node, downstream_node, &ipvs, nullptr, op);
// Walks the inheritance graph downstream from the given starting edge,
// applying a change in inherited profile values (IPVs) to each node visited.
//
// |upstream_node|   : the node the propagation starts from.
// |downstream_node| : the node immediately downstream of |upstream_node|.
// |added_ipv|       : the IPVs being added to downstream nodes, or nullptr.
// |lost_ipv|        : the IPVs being removed from downstream nodes, or nullptr.
// |op|              : tag selecting the operation (add edge, remove edge, or
//                     base profile changed).
//
// The walk alternates between owned wait queues and their owner threads.
template <OwnedWaitQueue::PropagateOp OpType, typename UpstreamNodeType,
typename DownstreamNodeType>
void OwnedWaitQueue::FinishPropagate(UpstreamNodeType& upstream_node,
DownstreamNodeType& downstream_node,
const SchedulerState::InheritedProfileValues* added_ipv,
const SchedulerState::InheritedProfileValues* lost_ipv,
PropagateOpTag<OpType> op) {
// Propagation must start from a(n) (OWQ|Thread) and proceed to a(n) (Thread|OWQ).
static_assert((ktl::is_same_v<UpstreamNodeType, OwnedWaitQueue> &&
ktl::is_same_v<DownstreamNodeType, Thread>) ||
(ktl::is_same_v<UpstreamNodeType, Thread> &&
ktl::is_same_v<DownstreamNodeType, OwnedWaitQueue>),
"Bad types for FinishPropagate. Must be either OWQ -> Thread, or Thread -> OWQ");
constexpr bool kStartingFromThread = ktl::is_same_v<UpstreamNodeType, Thread>;
// If neither the IPVs we are adding, nor the IPVs we are removing, are
// "consequential" (meaning, they have either some fair weight, or some
// deadline capacity, or both), then we can just get out now. There are no
// effective changes to propagate.
if (!IpvsAreConsequential(added_ipv) && !IpvsAreConsequential(lost_ipv)) {
// Set up the pointers we will use as iterators for traversing the inheritance
// graph. Snapshot the starting node's current inherited profile values which
// we need to propagate.
OwnedWaitQueue* owq_iter;
Thread* thread_iter;
if constexpr (kStartingFromThread) {
thread_iter = &upstream_node;
owq_iter = &downstream_node;
// Is this a base profile changed operation? If so, we should already have
// a link between our thread and the downstream owned wait queue. Go ahead
// and ASSERT this. We don't need to bother to check the other
// combinations; those have already been asserted during
// BeginPropagate.
if constexpr (OpType == PropagateOp::BaseProfileChanged) {
thread_iter->wait_queue_state().blocking_wait_queue_ == static_cast<WaitQueue*>(owq_iter),
"blocking wait queue %p owq_iter %p",
thread_iter->wait_queue_state().blocking_wait_queue_, static_cast<WaitQueue*>(owq_iter));
} else {
// Base profile changes should never start from OWQs.
static_assert(OpType != PropagateOp::BaseProfileChanged);
owq_iter = &upstream_node;
thread_iter = &downstream_node;
// Check that the combination of added/lost IPVs supplied by the caller is
// consistent with the operation being performed.
if constexpr (OpType == PropagateOp::AddSingleEdge) {
DEBUG_ASSERT(added_ipv != nullptr);
DEBUG_ASSERT(lost_ipv == nullptr);
} else if constexpr (OpType == PropagateOp::RemoveSingleEdge) {
DEBUG_ASSERT(added_ipv == nullptr);
DEBUG_ASSERT(lost_ipv != nullptr);
} else if constexpr (OpType == PropagateOp::BaseProfileChanged) {
"Base profile propagation changes may only start from Threads, not OwnedWaitQueues");
DEBUG_ASSERT(added_ipv != nullptr);
DEBUG_ASSERT(lost_ipv != nullptr);
} else {
static_assert(OpType != OpType, "Unrecognized propagation operation");
// When we have finally finished updating everything, make sure to update
// our max traversal statistic.
ChainLengthTracker len_tracker;
// OK - we are finally ready to get to work. Use a slightly-evil(tm) goto in
// order to start our propagate loop with the proper phase (either
// thread-to-OWQ first, or OWQ-to-thread first)
if constexpr (kStartingFromThread == false) {
goto start_from_owq;
} else if constexpr (OpType == PropagateOp::RemoveSingleEdge) {
// Are we starting from a thread during an edge remove operation? If so,
// and if we were the last thread to leave our wait queue, then we don't
// need to bother to update its IPVs anymore (it cannot have any IPVs if it
// has no waiters), so we can just skip it and move on to its owner thread.
// Additionally, we know that it must have an owner thread at this point in
// time. If it didn't, BeginPropagate would have already bailed out.
if (owq_iter->IsEmpty()) {
thread_iter = owq_iter->owner_;
DEBUG_ASSERT(thread_iter != nullptr);
goto start_from_owq;
while (true) {
// We should not be here if this OWQ has no waiters, or if we have not
// found a place to store our ISS. That special case was handled above.
DEBUG_ASSERT(owq_iter->Count() > 0);
DEBUG_ASSERT(owq_iter->inherited_scheduler_state_storage_ != nullptr);
// Record what our deadline pressure was before we accumulate the upstream
// pressure into this node. We will need it to reason about what to do
// with our dynamic scheduling parameters after IPV accumulation.
// OWQs which are receiving deadline pressure have defined dynamic
// scheduler parameters (start_time, finish_time, time_slice_ns), which they
// inherited from their upstream deadline threads. Fair
// threads do not have things like a defined start time while they are
// blocked, they will get a new set of dynamic parameters the next time
// they unblock and are scheduled to run.
// After we have finished accumulating the IPV deltas, we have a few different
// potential situations:
// 1) The utilization (deadline pressure) has not changed. Therefore,
// nothing about the dynamic parameters needs to change either.
// 2) The utilization has changed, and it was 0 before. This is the first
// deadline thread to join the queue, so we can just copy its dynamic
// parameters.
// 3) The utilization has changed, and it is now 0. The final deadline
// thread has left this queue, and our dynamic params are now
// undefined. Strictly speaking, we don't have to do anything, but in
// builds with extra checks enabled, we reset the dynamic parameters
// so that they have a deterministic value.
// 4) The utilization has changed, but it was not zero before, and isn't
// zero now either. We call into the scheduler code to compute what
// the new dynamic parameters should be.
SchedulerState::WaitQueueInheritedSchedulerState& owq_iss =
const SchedUtilization utilization_before = owq_iss.ipvs.uncapped_utilization;
ApplyIpvDeltaToOwq(lost_ipv, added_ipv, *owq_iter);
const SchedUtilization utilization_after = owq_iss.ipvs.uncapped_utilization;
if (utilization_before != utilization_after) {
if (utilization_before == SchedUtilization{0}) {
// First deadline thread just arrived, copy its parameters.
const SchedulerState& ss = thread_iter->scheduler_state();
owq_iss.start_time = ss.start_time_;
owq_iss.finish_time = ss.finish_time_;
owq_iss.time_slice_ns = ss.time_slice_ns_;
} else if (utilization_after == SchedUtilization{0}) {
// Last deadline thread just left, reset our dynamic params.
} else {
// The overall utilization has changed, but there was deadline
// pressure both before and after. We need to recompute the dynamic
// scheduler parameters.
Propagate(upstream_node, *owq_iter, op);
// If we no longer have any deadline pressure, our parameters should now
// be reset to initialization defaults.
if (utilization_after == SchedUtilization{0}) {
// Advance to the next thread, if any. If there isn't another thread,
// then we are finished, simply break out of the propagation loop.
thread_iter = owq_iter->owner_;
if (thread_iter == nullptr) {
// clang-format off
[[maybe_unused]] start_from_owq:
// clang-format on
// Propagate from the current owq_iter to the current thread_iter.
// Apply the change in pressure to the next thread in the chain.
ApplyIpvDeltaToThread(lost_ipv, added_ipv, *thread_iter);
Propagate(upstream_node, *thread_iter, op);
// Advance to the owned wait queue (if any) which is blocking this thread.
owq_iter = DowncastToOwq(thread_iter->wait_queue_state().blocking_wait_queue_);
if (owq_iter == nullptr) {
// Replaces |thread|'s base profile with |profile| and, if the thread is
// currently blocked in an owned wait queue, propagates the resulting change
// in its transmitted IPVs downstream.
void OwnedWaitQueue::SetThreadBaseProfileAndPropagate(Thread& thread,
const SchedulerState::BaseProfile& profile) {
AutoEagerReschedDisabler eager_resched_disabler;
SchedulerState& state = thread.scheduler_state();
SchedulerState::InheritedProfileValues old_ipvs;
// Non-null only when the thread is blocked in an owned wait queue.
OwnedWaitQueue* owq = DowncastToOwq(thread.wait_queue_state().blocking_wait_queue_);
// If our thread is blocked in an owned wait queue, we need to observe the
// thread's transmitted IPVs before and after the base profile change in order
// to properly handle propagation.
if (owq != nullptr) {
old_ipvs = SnapshotThreadIpv(thread);
// Regardless of the state of the thread whose base profile has changed, we
// need to update the base profile and let the scheduler know. The scheduler
// code will handle:
// 1) Updating our effective profile
// 2) Repositioning us in our wait queue (if we are blocked)
// 3) Updating our dynamic scheduling parameters (if we are either runnable or
// a blocked deadline thread)
// 4) Updating the scheduler's state (if we happen to be a runnable thread).
state.base_profile_ = profile;
// Now, if we are blocked in an owned wait queue, propagate the consequences
// of the base profile change downstream.
if (owq != nullptr) {
SchedulerState::InheritedProfileValues new_ipvs = SnapshotThreadIpv(thread);
// The new snapshot is the "added" set, the old snapshot is the "lost" set.
FinishPropagate(thread, *owq, &new_ipvs, &old_ipvs, BaseProfileChangedOp);
// Requests that |new_owner| become the owner of this queue (nullptr means no
// owner).  A change of owner which would introduce a cycle into the
// inheritance graph is rejected.
zx_status_t OwnedWaitQueue::AssignOwner(Thread* new_owner) {
DEBUG_ASSERT(magic() == kOwnedMagic);
// If there is a change of owners, and the change would produce a cycle, then
// fail the call instead.
if ((owner_ != new_owner) && CheckForCycle(this, new_owner)) {
return ZX_OK;
// Changes the owner of this queue to |new_owner| (nullptr means no owner),
// propagating the PI effects of both dropping the old owner and adding the
// new one.  No cycle check is made in this method.
void OwnedWaitQueue::AssignOwnerInternal(Thread* new_owner) {
// If there is no change, then we are done already.
if (owner_ == new_owner) {
// Start by releasing the old owner (if any) and propagating the PI effects.
if (owner_ != nullptr) {
BeginPropagate(*this, *owner_, RemoveSingleEdgeOp);
// If there is a new owner to assign, do so now and propagate the PI effects.
if (new_owner != nullptr) {
BeginPropagate(*this, *new_owner, AddSingleEdgeOp);
// Blocks the current thread in this queue and assigns |new_owner| as the
// queue's owner.
//
// |deadline|           : the deadline for the wait.
// |new_owner|          : the requested owner; nullptr means no owner.
// |resource_ownership| : forwarded to BlockEtcPreamble.
// |interruptible|      : forwarded to BlockEtcPreamble.
//
// Returns the status produced by the block operation.
zx_status_t OwnedWaitQueue::BlockAndAssignOwner(const Deadline& deadline, Thread* new_owner,
ResourceOwnership resource_ownership,
Interruptible interruptible) {
Thread* current_thread = Thread::Current::Get();
DEBUG_ASSERT(magic() == kOwnedMagic);
DEBUG_ASSERT(current_thread->state() == THREAD_RUNNING);
// TODO(johngro) : when we start to use distributed locking, start by
// obtaining the queue lock for the current thread, followed by the queue lock
// for this OWQ. Then validate that the operation is still technically legal.
if (CheckForCycle(this, new_owner, current_thread)) {
// TODO(johngro) : Change this when we reach the point that we are ready to
// propagate errors in the case that someone attempts to create a cycle.
// For now, we avoid the cycle by forcing the OWQ to become unowned.
#if 0
new_owner = nullptr;
// If there is a change of owner happening, start by releasing the current
// owner.
const bool owner_changed = (owner_ != new_owner);
if (owner_changed) {
// Perform the first half of the BlockEtc operation. This will attempt to add
// an edge between the thread which is blocking, and the OWQ it is blocking in
// (this). We know that this cannot produce a cycle in the graph because we
// know that this OWQ does not currently have an owner.
// If the block preamble fails, then the state of the actual wait queue is
// unchanged and we can just get out now.
zx_status_t res = BlockEtcPreamble(deadline, 0u, resource_ownership, interruptible);
if (res != ZX_OK) {
// There are only three reasons why the pre-wait operation should ever fail.
// 1) ZX_ERR_TIMED_OUT : The timeout has already expired.
// 2) ZX_ERR_INTERNAL_INTR_KILLED : The thread has been signaled for death.
// 3) ZX_ERR_INTERNAL_INTR_RETRY : The thread has been signaled for suspend.
// No matter what, we are not actually going to block in the wait queue.
// Even so, however, we still need to assign the owner to what was
// requested by the thread. Just because we didn't manage to block does
// not mean that ownership assignment gets skipped.
if (owner_changed) {
return res;
// We succeeded in placing our thread into our wait collection. Make sure we
// update the scheduler state storage location if needed, then propagate the
// effects down the chain.
BeginPropagate(*current_thread, *this, AddSingleEdgeOp);
// Finally, assign the new owner (if we have one).
if (owner_changed && (new_owner != nullptr)) {
DEBUG_ASSERT(owner_ == nullptr);
// Finally, go ahead and run the second half of the BlockEtc operation.
// This will actually block our thread and handle setting any timeout timers
// in the process.
// It is very important that no attempts to access |this| are made after
// either of the calls to BlockEtcPostamble (below). When someone eventually
// comes along and unblocks us from the queue, they have already taken care of
// removing us from this wait queue. It is totally possible that the wait
// queue we were blocking in has been destroyed by the time we make it out of
// BlockEtcPostamble, making |this| no longer a valid pointer.
res = BlockEtcPostamble(deadline);
return res;
// Wakes up to |wake_count| threads from this queue, consulting
// |on_thread_wake_hook| for each candidate.  This samples the current time
// and forwards to WakeThreadsInternal.
void OwnedWaitQueue::WakeThreads(uint32_t wake_count, Hook on_thread_wake_hook) {
DEBUG_ASSERT(magic() == kOwnedMagic);
zx_time_t now = current_time();
WakeThreadsInternal(wake_count, now, on_thread_wake_hook);
// Wakes up to |wake_count| threads from this queue, then moves up to
// |requeue_count| of the remaining threads over to |requeue_target|.
//
// |requeue_target|         : the queue which receives requeued threads; must
//                            not be null.
// |requeue_owner|          : the proposed owner of |requeue_target|; may be
//                            null, and is ignored if the thread is dead.
// |on_thread_wake_hook|    : consulted for each thread considered for wake.
// |on_thread_requeue_hook| : consulted for each thread considered for
//                            requeue; it may not request owner assignment.
void OwnedWaitQueue::WakeAndRequeue(uint32_t wake_count, OwnedWaitQueue* requeue_target,
uint32_t requeue_count, Thread* requeue_owner,
Hook on_thread_wake_hook, Hook on_thread_requeue_hook) {
DEBUG_ASSERT(magic() == kOwnedMagic);
DEBUG_ASSERT(requeue_target != nullptr);
DEBUG_ASSERT(requeue_target->magic() == kOwnedMagic);
zx_time_t now = current_time();
// Re-validate the scheduler state storage bookkeeping when we leave this
// scope.
auto post_op_validate = fit::defer([this]() { ValidateSchedStateStorage(); });
// If the potential new owner of the requeue wait queue is already dead,
// then it cannot become the owner of the requeue wait queue.
if (requeue_owner != nullptr) {
// It should not be possible for a thread which is not yet running to be
// declared as the owner of an OwnedWaitQueue. Any attempts to assign
// ownership to a thread which is not yet started should have been rejected
// by layers of code above us, and a proper status code returned to the
// user.
DEBUG_ASSERT(requeue_owner->state() != THREAD_INITIAL);
if (requeue_owner->state() == THREAD_DEATH) {
requeue_owner = nullptr;
// Wake the specified number of threads and assign a new owner if needed.
WakeThreadsInternal(wake_count, now, on_thread_wake_hook);
// If the requeue target currently has an owner, and the owner is changing,
// start by clearing the owner from the queue.
if (requeue_target->owner_ && (requeue_target->owner_ != requeue_owner)) {
// If there are still threads left in the wake queue (this), and we were asked to
// requeue threads, then do so.
uint32_t requeued = 0;
if (!this->IsEmpty()) {
while (requeued < requeue_count) {
// Consider the thread that the queue considers to be the most important to
// wake right now. If there are no threads left in the queue, then we are
// done.
Thread* t = Peek(now);
if (t == nullptr) {
// Call the user's requeue hook so that we can decide what to do
// with this thread.
using Action = Hook::Action;
Action action = on_thread_requeue_hook(t);
// It is illegal to ask for a requeue operation to assign ownership.
DEBUG_ASSERT(action != Action::SelectAndAssignOwner);
// If we are supposed to stop, do so now.
if (action == Action::Stop) {
// If attempting to move this thread to the requeue target would create a
// cycle with the new owner, then clear ownership of the queue instead.
// TODO(johngro): Change this when we switch to propagating the error
// instead of simply removing ownership.
if (CheckForCycle(requeue_target, requeue_owner, t)) {
requeue_owner = nullptr;
// Actually move the thread from this to the requeue_target.
t->wait_queue_state().blocking_wait_queue_ = nullptr;
BeginPropagate(*t, *this, RemoveSingleEdgeOp);
t->wait_queue_state().blocking_wait_queue_ = requeue_target;
BeginPropagate(*t, *requeue_target, AddSingleEdgeOp);
// SelectAndKeepGoing is the only legal choice left.
DEBUG_ASSERT(action == Action::SelectAndKeepGoing);
// If there is no one waiting in the requeue target, then it is not allowed to
// have an owner.
if (requeue_target->IsEmpty()) {
requeue_owner = nullptr;
// Explicit instantiation of the variant of the generic BeginPropagate method
// which is used during thread unblock operations (thread -> OWQ, remove
// single edge).
template void OwnedWaitQueue::BeginPropagate(Thread&, OwnedWaitQueue&, RemoveSingleEdgeTag);