blob: 5870ca463d9201ec6b692d24e5a500421536a3b8 [file] [log] [blame] [edit]
// Copyright 2016 The Fuchsia Authors
//
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT
#include "object/channel_dispatcher.h"
#include <assert.h>
#include <lib/counters.h>
#include <platform.h>
#include <string.h>
#include <trace.h>
#include <zircon/errors.h>
#include <zircon/rights.h>
#include <zircon/syscalls/object.h>
#include <zircon/types.h>
#include <fbl/alloc_checker.h>
#include <kernel/event.h>
#include <ktl/algorithm.h>
#include <object/handle.h>
#include <object/message_packet.h>
#include <object/process_dispatcher.h>
#include <object/thread_dispatcher.h>
#define LOCAL_TRACE 0
// Histogram of per-channel peak message-queue depth; one bucket is bumped per
// channel at destruction time (see ~ChannelDispatcher).
KCOUNTER(channel_packet_depth_1, "channel.depth.1")
KCOUNTER(channel_packet_depth_4, "channel.depth.4")
KCOUNTER(channel_packet_depth_16, "channel.depth.16")
KCOUNTER(channel_packet_depth_64, "channel.depth.64")
KCOUNTER(channel_packet_depth_256, "channel.depth.256")
KCOUNTER(channel_packet_depth_unbounded, "channel.depth.unbounded")
// Count of writes that pushed a channel past kMaxPendingMessageCount (see WriteSelf).
KCOUNTER(channel_full, "channel.full")
// Lifetime counters for channel dispatcher objects.
KCOUNTER(dispatcher_channel_create_count, "dispatcher.channel.create")
KCOUNTER(dispatcher_channel_destroy_count, "dispatcher.channel.destroy")
namespace {
// Temporary hack to chase down bugs like https://fxbug.dev/42123699 where upwards of 250MB of ipc
// memory is consumed. The bet is that even if each message is at max size there
// should be one or two channels with thousands of messages. If so, this check adds
// no overhead to the existing code. See https://fxbug.dev/42124465.
// TODO(cpu): This limit can be lower but mojo's ChannelTest.PeerStressTest sends
// about 3K small messages. Switching to size limit is more reasonable.
// Hard cap on queued messages; exceeding it raises a policy exception on the
// writer (see WriteSelf).
constexpr size_t kMaxPendingMessageCount = 3500;
// At half of the hard cap we log a one-shot warning to aid diagnosis.
constexpr size_t kWarnPendingMessageCount = kMaxPendingMessageCount / 2;
// This value is part of the zx_channel_call contract.
constexpr uint32_t kMinKernelGeneratedTxid = 0x80000000u;
// The maximum number of handles to include in channel message traces.
constexpr uint32_t kMaxTraceHandles = 4;
// True if |txid| is in the range the kernel reserves for itself (top bit set);
// userspace-chosen txids live below kMinKernelGeneratedTxid.
bool IsKernelGeneratedTxid(zx_txid_t txid) { return txid >= kMinKernelGeneratedTxid; }
// Randomly generated multilinear hash coefficients. These should be sufficient for non-user builds
// where tracing syscalls are enabled. In the future, if we elect to enable tracing facilities in
// user builds, this can be strengthened by generating the coefficients during boot.
constexpr uint64_t kHashCoefficients[] = {
    0xa573c3ccbd7e2010ULL, 0x165cbcf3a0de8544ULL, 0x8b975f576f025514ULL,
    0xabc406ce862c9a1dULL, 0xf292bea1a3fe6bedULL, 0x1c7c06b8b02b4585ULL,
};

// Compresses a 64-bit value to 32 bits using the multilinear hash family
// a*x + b*y + c, where x and y are the low and high halves of |value|.
inline uint32_t HashValue(uint64_t a, uint64_t b, uint64_t c, uint64_t value) {
  const uint32_t lo_half = static_cast<uint32_t>(value);
  const uint32_t hi_half = static_cast<uint32_t>(value >> 32);
  const uint64_t mixed = a * lo_half + b * hi_half + c;
  return static_cast<uint32_t>(mixed >> 32);
}

// Two independent members of the family, built from disjoint coefficient triples.
inline uint32_t HashA(uint64_t value) {
  return HashValue(kHashCoefficients[0], kHashCoefficients[1], kHashCoefficients[2], value);
}

inline uint32_t HashB(uint64_t value) {
  return HashValue(kHashCoefficients[3], kHashCoefficients[4], kHashCoefficients[5], value);
}

// Convenience overload: hashes the (high, low) pair as one 64-bit value.
inline uint32_t HashB(uint32_t high, uint32_t low) {
  const uint64_t combined = (static_cast<uint64_t>(high) << 32) | low;
  return HashB(combined);
}
// Generates a flow id using a universal hash function of the minimum endpoint koid and the txid or
// message packet address, depending on whether the txid is non-zero.
//
// In general, koids are guaranteed to be unique over the lifetime of a particular system boot.
// Using the min endpoint koid ensures both endpoints use the same hash input. A txid is shared
// between sender and receiver is expected to be unique (guaranteed for kernel-generated txids)
// among the set of txids for messages pending in a particular channel. Likewise, the message packet
// address is shared between the sender and receiver and is guaranteed to be unique among the set of
// pointers to pending messages.
//
// Given that the (koid, txid) or (koid, &msg) pair is likely to be unique over the span of the
// flow, the likelihood of id confusion is equivalent to the likelihood of hash collisions by
// temporally overlapping flows.
uint64_t ChannelMessageFlowId(const MessagePacket& msg, const ChannelDispatcher* channel) {
  // Both endpoints compute the same id because both see the same minimum koid.
  const zx_koid_t endpoint_koid = ktl::min(channel->get_koid(), channel->get_related_koid());
  // The top bit of the low word records whether the hash input was a txid
  // (which a later response can be correlated against) or only the message
  // packet address (which cannot). The txid is combined with the low 32 bits
  // of the endpoint koid before hashing to improve uniqueness across channels.
  constexpr uint32_t kIsTxidMask = 1u << 31;
  const zx_txid_t txid = msg.fidl_header().txid;
  uint32_t low_word;
  if (txid == 0) {
    low_word = HashB(reinterpret_cast<uint64_t>(&msg)) & ~kIsTxidMask;
  } else {
    low_word = HashB(txid, static_cast<uint32_t>(endpoint_koid)) | kIsTxidMask;
  }
  return static_cast<uint64_t>(HashA(endpoint_koid)) << 32 | low_word;
}
// Which channel operation a trace event describes; drives the flow begin/step/end
// selection in TraceMessage.
enum class MessageOp : uint8_t {
  Write,                    // plain zx_channel_write
  Read,                     // plain zx_channel_read
  ChannelCallWriteRequest,  // request half of zx_channel_call
  ChannelCallReadResponse,  // response half of zx_channel_call
};
// Emits a ktrace duration (plus flow begin/step/end) for one channel message
// operation. Does nothing observable beyond tracing.
inline void TraceMessage(const MessagePacket& msg, const ChannelDispatcher* channel,
                         MessageOp message_op) {
  // We emit these trace events non-standardly to work around some compatibility issues:
  //
  // 1) We partially inline the trace macro so that we can purposely emit 0-length durations.
  //
  //    chrome://tracing requires flow events to be contained in a duration. Perfetto requires flows
  //    events to be attached to a "slice". However, the Perfetto viewer treats instant events as
  //    0-length slices. This means that we can assign flows to them, and they get a special easy to
  //    click on arrow instead of a tiny duration bar. Using a 0-length duration gets us nice
  //    instant events in the Perfetto viewer, while still supporting flows in chrome://tracing.
  //
  // 2) Even though we know exactly when the duration ends, we emit a Begin/End pair instead of
  //    using a duration-complete event.
  //
  //    Because we do so little work between creating the duration-complete scope and then emitting
  //    the flow event, if we emit a duration-complete event, the two events may be created with the
  //    same timestamp. Since the duration-complete event is only written when the scope ends, it is
  //    written _after_ the flow event in the trace, causing the flow to not be associated with the
  //    previous event, not it. By using a Begin/End pair, we ensure that though the events have the
  //    same timestamp, they will be read in the correct order and the flow events will be
  //    associated correctly.
  //
  // |ts| is captured once by the first macro invocation and reused so the
  // Begin/End pair and the flow event share one timestamp.
  uint64_t ts;
  const auto get_timestamp = [&ts] { return ts = KTrace::Timestamp(); };
  if constexpr (CHANNEL_MESSAGE_BODY_TRACING_ENABLED) {
    // Snapshot basic info for up to kMaxTraceHandles of the message's handles.
    zx_info_handle_basic_t handle_info[kMaxTraceHandles];
    const auto get_handle_info = [&handle_info, &msg]() -> ktl::span<const uint8_t> {
      size_t num_handles = ktl::min(msg.num_handles(), kMaxTraceHandles);
      for (size_t i = 0; i < num_handles; ++i) {
        const auto& handle = msg.handles()[i];
        handle_info[i] = {
            .koid = handle->dispatcher()->get_koid(),
            .rights = handle->rights(),
            .type = handle->dispatcher()->get_type(),
            .related_koid = handle->dispatcher()->get_related_koid(),
            .reserved = 0u,
            .padding1 = {},
        };
      }
      return ktl::span<const uint8_t>(reinterpret_cast<const uint8_t*>(handle_info),
                                      sizeof(zx_info_handle_basic_t) * num_handles);
    };
    if (message_op == MessageOp::Write || message_op == MessageOp::ChannelCallWriteRequest) {
      // Record message body when sending.
      KTRACE_DURATION_BEGIN_TIMESTAMP(
          "kernel:ipc", "ChannelMessage", get_timestamp(), ("ordinal", msg.fidl_header().ordinal),
          ("bytes", msg.start_of_payload()), ("handles", get_handle_info()));
    } else {
      // Don't record message body when receiving.
      KTRACE_DURATION_BEGIN_TIMESTAMP("kernel:ipc", "ChannelMessage", get_timestamp(),
                                      ("ordinal", msg.fidl_header().ordinal));
    }
  } else {
    KTRACE_DURATION_BEGIN_TIMESTAMP("kernel:ipc", "ChannelMessage", get_timestamp(),
                                    ("ordinal", msg.fidl_header().ordinal));
  }
  // When the txid is kernel-generated, Read and Write message ops are just steps in the overall
  // flow that is bounded by ChannelCallWriteRequest and ChannelCallReadResponse message ops.
  switch (message_op) {
    case MessageOp::Write:
      if (IsKernelGeneratedTxid(msg.fidl_header().txid)) {
        KTRACE_FLOW_STEP_TIMESTAMP("kernel:ipc", "ChannelFlow", ts,
                                   ChannelMessageFlowId(msg, channel));
        break;
      }
      [[fallthrough]];
    case MessageOp::ChannelCallWriteRequest:
      KTRACE_FLOW_BEGIN_TIMESTAMP("kernel:ipc", "ChannelFlow", ts,
                                  ChannelMessageFlowId(msg, channel));
      break;
    case MessageOp::Read:
      if (IsKernelGeneratedTxid(msg.fidl_header().txid)) {
        KTRACE_FLOW_STEP_TIMESTAMP("kernel:ipc", "ChannelFlow", ts,
                                   ChannelMessageFlowId(msg, channel));
        break;
      }
      [[fallthrough]];
    case MessageOp::ChannelCallReadResponse:
      KTRACE_FLOW_END_TIMESTAMP("kernel:ipc", "ChannelFlow", ts,
                                ChannelMessageFlowId(msg, channel));
      break;
  }
  // Close the purposely 0-length duration at the same timestamp it began.
  KTRACE_DURATION_END_TIMESTAMP("kernel:ipc", "ChannelMessage", ts);
}
} // namespace
// static
// Total number of writes (system-wide) that hit the kMaxPendingMessageCount cap.
int64_t ChannelDispatcher::get_channel_full_count() { return channel_full.SumAcrossAllCpus(); }
// static
// Allocates a connected pair of channel endpoints. On success, fills in both
// kernel handles and the default rights; on allocation failure nothing is
// leaked and ZX_ERR_NO_MEMORY is returned.
zx_status_t ChannelDispatcher::Create(KernelHandle<ChannelDispatcher>* handle0,
                                      KernelHandle<ChannelDispatcher>* handle1,
                                      zx_rights_t* rights) {
  fbl::AllocChecker ac;
  // Both dispatchers share a single PeerHolder.
  auto shared_holder = fbl::AdoptRef(new (&ac) PeerHolder<ChannelDispatcher>());
  if (!ac.check()) {
    return ZX_ERR_NO_MEMORY;
  }
  auto holder_for_peer = shared_holder;
  KernelHandle end0(fbl::AdoptRef(new (&ac) ChannelDispatcher(ktl::move(shared_holder))));
  if (!ac.check()) {
    return ZX_ERR_NO_MEMORY;
  }
  KernelHandle end1(fbl::AdoptRef(new (&ac) ChannelDispatcher(ktl::move(holder_for_peer))));
  if (!ac.check()) {
    return ZX_ERR_NO_MEMORY;
  }
  // Cross-link the endpoints before handing them out.
  end0.dispatcher()->InitPeer(end1.dispatcher());
  end1.dispatcher()->InitPeer(end0.dispatcher());
  *rights = default_rights();
  *handle0 = ktl::move(end0);
  *handle1 = ktl::move(end1);
  return ZX_OK;
}
// A freshly created channel is writable; READABLE is raised later by WriteSelf.
ChannelDispatcher::ChannelDispatcher(fbl::RefPtr<PeerHolder<ChannelDispatcher>> holder)
    : PeeredDispatcher(ktl::move(holder), ZX_CHANNEL_WRITABLE) {
  kcounter_add(dispatcher_channel_create_count, 1);
}
ChannelDispatcher::~ChannelDispatcher() {
  kcounter_add(dispatcher_channel_destroy_count, 1);
  // By the time the destructor runs, the peer endpoint no longer holds a
  // reference to us, so no new messages can arrive and discarding the queue
  // is safe. Doing this in on_zero_handles() would not be safe.
  messages_.clear();
  // Record the high-water message count in the depth histogram.
  const auto peak = max_message_count_;
  if (peak <= 1) {
    kcounter_add(channel_packet_depth_1, 1);
  } else if (peak <= 4) {
    kcounter_add(channel_packet_depth_4, 1);
  } else if (peak <= 16) {
    kcounter_add(channel_packet_depth_16, 1);
  } else if (peak <= 64) {
    kcounter_add(channel_packet_depth_64, 1);
  } else if (peak <= 256) {
    kcounter_add(channel_packet_depth_256, 1);
  } else {
    kcounter_add(channel_packet_depth_unbounded, 1);
  }
}
// Unlinks |waiter| from the waiter list if it is still queued. A waiter that
// was already delivered to or canceled is no longer in the container, in which
// case this is a no-op.
void ChannelDispatcher::RemoveWaiter(MessageWaiter* waiter) {
  Guard<CriticalMutex> guard{get_lock()};
  if (waiter->InContainer()) {
    waiters_.erase(*waiter);
  }
}
// Drains the waiter list, failing every pending Call with |status|.
void ChannelDispatcher::CancelMessageWaitersLocked(zx_status_t status) {
  while (!waiters_.is_empty()) {
    waiters_.pop_front()->Cancel(status);
  }
}
// Invoked when the last local handle to this endpoint goes away.
void ChannelDispatcher::on_zero_handles_locked() {
  canary_.Assert();
  // (3A) Abort any waiting Call operations because we've been canceled by reason of our local
  // handle going away.
  CancelMessageWaitersLocked(ZX_ERR_CANCELED);
}
// Records the koid of the process that now owns this endpoint's handle.
void ChannelDispatcher::set_owner(zx_koid_t new_owner) {
  // Testing for ZX_KOID_INVALID is an optimization so we don't
  // pay the cost of grabbing the lock when the endpoint moves
  // from the process to channel; the one that we must get right
  // is from channel to new owner.
  if (new_owner == ZX_KOID_INVALID) {
    return;
  }
  Guard<CriticalMutex> dispatcher_guard{get_lock()};
  Guard<CriticalMutex> owner_guard{&channel_lock_};
  owner_ = new_owner;
}
// This requires holding the shared channel lock. The thread analysis
// can reason about repeated calls to get_lock() on the shared object,
// but cannot reason about the aliasing between left->get_lock() and
// right->get_lock(), which occurs above in on_zero_handles.
void ChannelDispatcher::OnPeerZeroHandlesLocked() {
  canary_.Assert();
  {
    // peer_has_closed_ is read by Read() under channel_lock_, so flip it under
    // the same lock before updating the dispatcher signals.
    Guard<CriticalMutex> messages_guard{&channel_lock_};
    peer_has_closed_ = true;
  }
  // Clear WRITABLE and raise PEER_CLOSED in one update.
  UpdateStateLocked(ZX_CHANNEL_WRITABLE, ZX_CHANNEL_PEER_CLOSED);
  // (3B) Abort any waiting Call operations because we've been canceled by reason of the opposing
  // endpoint going away.
  CancelMessageWaitersLocked(ZX_ERR_PEER_CLOSED);
}
// This method should never acquire |get_lock()|. See the comment at |channel_lock_| for details.
//
// Pops the front message into |*msg|. On entry |*msg_size|/|*msg_handle_count|
// hold the caller's buffer capacities; on exit they hold the actual message
// size and handle count. When the buffers are too small and |may_discard| is
// set, the message is still popped (and dropped by the caller) and
// ZX_ERR_BUFFER_TOO_SMALL is returned.
zx_status_t ChannelDispatcher::Read(zx_koid_t owner, uint32_t* msg_size, uint32_t* msg_handle_count,
                                    MessagePacketPtr* msg, bool may_discard) {
  canary_.Assert();
  const uint32_t byte_capacity = *msg_size;
  const uint32_t handle_capacity = *msg_handle_count;
  Guard<CriticalMutex> guard{&channel_lock_};
  // Only the process that currently owns the handle may read.
  if (owner != owner_) {
    return ZX_ERR_BAD_HANDLE;
  }
  if (messages_.is_empty()) {
    return peer_has_closed_ ? ZX_ERR_PEER_CLOSED : ZX_ERR_SHOULD_WAIT;
  }
  // Report the real size of the front message back to the caller.
  *msg_size = messages_.front().data_size();
  *msg_handle_count = messages_.front().num_handles();
  const bool too_small = *msg_size > byte_capacity || *msg_handle_count > handle_capacity;
  if (too_small && !may_discard) {
    // Leave the message queued so the caller can retry with bigger buffers.
    return ZX_ERR_BUFFER_TOO_SMALL;
  }
  *msg = messages_.pop_front();
  if (messages_.is_empty()) {
    ClearSignals(ZX_CHANNEL_READABLE);
  }
  if (too_small) {
    // Message was popped (discarded); don't trace failed reads.
    return ZX_ERR_BUFFER_TOO_SMALL;
  }
  // Successfully popped a non-null message.
  TraceMessage(**msg, this, MessageOp::Read);
  return ZX_OK;
}
// Queues |msg| on the opposing endpoint, or hands it directly to a waiting
// Call if its txid matches. Returns ZX_ERR_BAD_HANDLE if |owner| no longer
// owns this endpoint and ZX_ERR_PEER_CLOSED if the peer is gone.
// Note: the trace event is emitted before the owner/peer validation below.
zx_status_t ChannelDispatcher::Write(zx_koid_t owner, MessagePacketPtr msg) {
  canary_.Assert();
  Guard<CriticalMutex> guard{get_lock()};
  DEBUG_ASSERT(msg);
  TraceMessage(*msg, this, MessageOp::Write);
  // Failing this test is only possible if this process has two threads racing:
  // one thread is issuing channel_write() and one thread is moving the handle
  // to another process.
  if (owner != owner_) {
    return ZX_ERR_BAD_HANDLE;
  }
  if (!peer()) {
    return ZX_ERR_PEER_CLOSED;
  }
  // peer()->get_lock() aliases our own get_lock() (shared PeerHolder lock), so
  // it is already held here; tell the thread analysis so.
  AssertHeld(*peer()->get_lock());
  // Fast path: deliver straight to a blocked zx_channel_call waiter.
  if (peer()->TryWriteToMessageWaiter(msg)) {
    return ZX_OK;
  }
  peer()->WriteSelf(ktl::move(msg), nullptr);
  return ZX_OK;
}
// Produces the next kernel txid for a zx_channel_call. Setting the top bit
// keeps kernel ids out of the 1..kMinKernelGeneratedTxid range reserved for
// userspace.
zx_txid_t ChannelDispatcher::GenerateTxid() {
  txid_ += 1;
  return txid_ | kMinKernelGeneratedTxid;
}
// Implements zx_channel_call: writes |msg| to the peer and blocks until a
// reply with a matching txid arrives, the deadline passes, or the call is
// canceled. |*reply| receives the response on success.
zx_status_t ChannelDispatcher::Call(zx_koid_t owner, MessagePacketPtr msg,
                                    zx_instant_mono_t deadline, MessagePacketPtr* reply) {
  canary_.Assert();
  ChannelDispatcher::MessageWaiter* waiter = ThreadDispatcher::GetCurrent()->GetMessageWaiter();
  if (unlikely(waiter->BeginWait(fbl::RefPtr(this)) != ZX_OK)) {
    // If a thread tries BeginWait'ing twice, the VDSO contract around retrying
    // channel calls has been violated. Shoot the misbehaving process.
    ProcessDispatcher::GetCurrent()->Kill(ZX_TASK_RETCODE_VDSO_KILL);
    return ZX_ERR_BAD_STATE;
  }
  {
    // Use time limited preemption deferral while we hold this lock. If our
    // server is running with a deadline profile, (and we are not) then after we
    // queue the message and signal the server, it is possible that the server
    // thread:
    //
    // 1) Gets assigned to our core.
    // 2) It reads the message we just sent.
    // 3) It processes the message and responds with a write to this channel
    //    before we get a chance to drop the lock.
    //
    // This will result in an undesirable thrash sequence where:
    //
    // 1) The server thread contests the lock we are holding.
    // 2) It suffers through the adaptive mutex spin (but it is on our CPU, so
    //    it will never discover that the lock is available)
    // 3) It will then drop into a block transmitting its profile pressure, and
    //    allowing us to run again.
    // 4) we will run for a very short time until we finish our notifications.
    // 5) As soon as we drop the lock, we will immediately bounce back to the
    //    server thread which will complete its operation.
    //
    // Hard disabling preemption helps to avoid this thrash, but comes with a
    // caveat. It may be that the observer list we need to notify is Very Long
    // and takes a significant amount of time to filter and signal. We _really_
    // do not want to be running with preemption disabled for very long as it
    // can hold off time critical tasks. So instead of hard disabling
    // preemption we use CriticalMutex and rely on it to provide time-limited
    // preemption deferral.
    //
    // TODO(johngro): Even with time-limited preemption deferral, this
    // mitigation is not ideal. We would much prefer an approach where we do
    // something like move the notification step outside of the lock, or break
    // the locks protecting the two message and waiter queues into two locks
    // instead of a single shared lock, so that we never have to defer
    // preemption. Such a solution gets complicated however, owning to
    // lifecycle issues for the various SignalObservers, and the common locking
    // structure of PeeredDispatchers. See https://fxbug.dev/42050802. TL;DR - someday, when
    // we have had the time to carefully refactor the locking here, come back
    // and remove the use of CriticalMutex.
    //
    Guard<CriticalMutex> guard{get_lock()};
    // See Write() for an explanation of this test.
    if (owner != owner_) {
      // Fix: tear down the wait we began above, mirroring the !peer() path
      // below. Returning without EndWait() left the per-thread MessageWaiter
      // holding its channel_ reference, so the thread's next channel_call()
      // would fail BeginWait() (killing the process) and the RefPtr taken in
      // BeginWait() would keep this channel alive indefinitely.
      waiter->EndWait(reply);
      return ZX_ERR_BAD_HANDLE;
    }
    if (!peer()) {
      waiter->EndWait(reply);
      return ZX_ERR_PEER_CLOSED;
    }
  alloc_txid:
    const zx_txid_t txid = GenerateTxid();
    // If there are waiting messages, ensure we have not allocated a txid
    // that's already in use. This is unlikely. It's atypical for multiple
    // threads to be invoking channel_call() on the same channel at once, so
    // the waiter list is most commonly empty.
    for (ChannelDispatcher::MessageWaiter& w : waiters_) {
      if (w.get_txid() == txid) {
        goto alloc_txid;
      }
    }
    // Install our txid in the waiter and the outbound message
    waiter->set_txid(txid);
    msg->set_txid(txid);
    TraceMessage(*msg, this, MessageOp::ChannelCallWriteRequest);
    // (0) Before writing the outbound message and waiting, add our
    // waiter to the list.
    waiters_.push_back(waiter);
    // (1) Write outbound message to opposing endpoint.
    AssertHeld(*peer()->get_lock());
    peer()->WriteSelf(ktl::move(msg), waiter->get_wait_queue());
  }
  auto process = ProcessDispatcher::GetCurrent();
  const TimerSlack slack = process->GetTimerSlackPolicy();
  const Deadline slackDeadline(deadline, slack);
  // Reuse the code from the half-call used for retrying a Call after thread
  // suspend.
  return ResumeInterruptedCall(waiter, slackDeadline, reply);
}
// Second half of a zx_channel_call: blocks on the waiter until a reply is
// delivered, the call is canceled, the deadline expires, or the thread is
// interrupted (suspend). Also used to re-enter the wait after an interrupt.
zx_status_t ChannelDispatcher::ResumeInterruptedCall(MessageWaiter* waiter,
                                                     const Deadline& deadline,
                                                     MessagePacketPtr* reply) {
  canary_.Assert();
  // (2) Wait for notification via waiter's event or for the
  // deadline to hit.
  {
    ThreadDispatcher::AutoBlocked by(ThreadDispatcher::Blocked::CHANNEL);
    zx_status_t status = waiter->Wait(deadline);
    if (status == ZX_ERR_INTERNAL_INTR_RETRY) {
      // If we got interrupted, return out to usermode, but
      // do not clear the waiter.
      return status;
    }
  }
  // (3) see (3A), (3B) above or (3C) below for paths where
  // the waiter could be signaled and removed from the list.
  //
  // If the deadline hits, the waiter is not removed
  // from the list *but* another thread could still
  // cause (3A), (3B), or (3C) before the lock below.
  {
    Guard<CriticalMutex> guard{get_lock()};
    // (4) If any of (3A), (3B), or (3C) have occurred,
    // we were removed from the waiters list already
    // and EndWait() returns a non-ZX_ERR_TIMED_OUT status.
    // Otherwise, the status is ZX_ERR_TIMED_OUT and it
    // is our job to remove the waiter from the list.
    zx_status_t status = waiter->EndWait(reply);
    if (status == ZX_ERR_TIMED_OUT) {
      waiters_.erase(*waiter);
    }
    // Trace the response only if one was actually delivered.
    if (*reply) {
      TraceMessage(**reply, this, MessageOp::ChannelCallReadResponse);
    }
    return status;
  }
}
// Attempts to route |msg| directly to a blocked zx_channel_call waiter whose
// txid matches. Returns true (consuming |msg|) on delivery, false otherwise.
bool ChannelDispatcher::TryWriteToMessageWaiter(MessagePacketPtr& msg) {
  canary_.Assert();
  // Call waiters always use kernel-generated txids, so a message without one
  // (or an empty waiter list, the common case) cannot match anything.
  const zx_txid_t txid = msg->get_txid();
  if (waiters_.is_empty() || !IsKernelGeneratedTxid(txid)) {
    return false;
  }
  for (auto& candidate : waiters_) {
    if (candidate.get_txid() != txid) {
      continue;
    }
    // (3C) Matching waiter found: unlink it from the list, then hand it the
    // message and wake it.
    waiters_.erase(candidate);
    candidate.Deliver(ktl::move(msg));
    return true;
  }
  return false;
}
// Appends |msg| to this endpoint's queue, raises READABLE, and (outside the
// channel lock) notifies observers. |queue_to_own| is forwarded to the
// notification path (used by zx_channel_call to assign wait-queue ownership).
void ChannelDispatcher::WriteSelf(MessagePacketPtr msg, OwnedWaitQueue* queue_to_own) {
  canary_.Assert();
  // Once we've acquired the channel_lock_ we're going to make a copy of the previously active
  // signals and raise the READABLE signal before dropping the lock. After we've dropped the lock,
  // we'll notify observers using the previously active signals plus READABLE.
  //
  // There are several things to note about this sequence:
  //
  // 1. We must hold channel_lock_ while updating the stored signals (RaiseSignalsLocked) to
  // synchronize with thread adding, removing, or canceling observers otherwise we may create a
  // spurious READABLE signal (see NoSpuriousReadableSignalWhenRacing test).
  //
  // 2. We must release the channel_lock_ before notifying observers to ensure that Read can execute
  // concurrently with NotifyObserversLocked, which is a potentially long running call.
  //
  // 3. We can skip the call to NotifyObserversLocked if the previously active signals contained
  // READABLE (because there can't be any observers still waiting for READABLE if that signal is
  // already active).
  zx_signals_t previous_signals;
  {
    Guard<CriticalMutex> guard{&channel_lock_};
    messages_.push_back(ktl::move(msg));
    previous_signals = RaiseSignalsLocked(ZX_CHANNEL_READABLE);
    const size_t size = messages_.size();
    // Track the high-water mark for the depth histogram in the destructor.
    max_message_count_ = ktl::max(size, max_message_count_);
    // TODO(cpu): Remove this hack. See comment in kMaxPendingMessageCount definition.
    if (size >= kWarnPendingMessageCount) {
      if (size == kWarnPendingMessageCount) {
        // Warn exactly once, when the queue first reaches the warning level.
        const auto* process = ProcessDispatcher::GetCurrent();
        char pname[ZX_MAX_NAME_LEN];
        [[maybe_unused]] zx_status_t status = process->get_name(pname);
        DEBUG_ASSERT(status == ZX_OK);
        printf("KERN: warning! channel (%zu) has %zu messages (%s) (peer: %zu) (write).\n",
               get_koid(), size, pname, peer()->owner_);
      } else if (size > kMaxPendingMessageCount) {
        // Over the hard cap: log, raise a policy exception on the writing
        // thread, and count the event.
        const auto* process = ProcessDispatcher::GetCurrent();
        char pname[ZX_MAX_NAME_LEN];
        [[maybe_unused]] zx_status_t status = process->get_name(pname);
        DEBUG_ASSERT(status == ZX_OK);
        printf(
            "KERN: channel (%zu) has %zu messages (%s) (peer: %zu) (write). Raising exception.\n",
            get_koid(), size, pname, peer()->owner_);
        Thread::Current::SignalPolicyException(ZX_EXCP_POLICY_CODE_CHANNEL_FULL_WRITE, 0u);
        kcounter_add(channel_full, 1);
      }
    }
  }
  // Don't bother waking observers if ZX_CHANNEL_READABLE was already active.
  if ((previous_signals & ZX_CHANNEL_READABLE) == 0) {
    NotifyObserversLocked(previous_signals | ZX_CHANNEL_READABLE, queue_to_own);
  }
}
ChannelDispatcher::MessageWaiter::~MessageWaiter() {
  // A still-set channel_ means EndWait() never ran (e.g. the thread died
  // mid-call); make sure we are unlinked from the channel's waiter list.
  if (unlikely(channel_)) {
    channel_->RemoveWaiter(this);
  }
#if EXPERIMENTAL_CHANNEL_CALL_PROPAGATION_ENABLED
  wait_queue_.ResetOwnerIfNoWaiters();
#endif
  DEBUG_ASSERT(!InContainer());
}
// Arms this waiter for a new call on |channel|. Fails with ZX_ERR_BAD_STATE if
// the waiter is already armed (a prior call never reached EndWait), which the
// caller treats as a VDSO-contract violation.
zx_status_t ChannelDispatcher::MessageWaiter::BeginWait(fbl::RefPtr<ChannelDispatcher> channel) {
  if (unlikely(channel_)) {
    return ZX_ERR_BAD_STATE;
  }
  DEBUG_ASSERT(!InContainer());
  // Default outcome if nothing signals us before the deadline.
  status_ = ZX_ERR_TIMED_OUT;
  channel_ = ktl::move(channel);
#if EXPERIMENTAL_CHANNEL_CALL_PROPAGATION_ENABLED
  // Reset the signaled flag under the wait queue's chain lock.
  const auto do_transaction =
      [&]() TA_REQ(chainlock_transaction_token) -> ChainLockTransaction::Result<> {
    ChainLockGuard guard(wait_queue_.get_lock());
    signaled_ = false;
    return ChainLockTransaction::Done;
  };
  ChainLockTransaction::UntilDone(EagerReschedDisableAndIrqSaveOption,
                                  CLT_TAG("ChannelDispatcher::MessageWaiter::BeginWait"),
                                  do_transaction);
#else
  event_.Unsignal();
#endif
  return ZX_OK;
}
#if EXPERIMENTAL_CHANNEL_CALL_PROPAGATION_ENABLED
// Marks the waiter signaled and wakes all threads blocked on its wait queue,
// retrying the whole transaction if the chain locks cannot be acquired.
void ChannelDispatcher::MessageWaiter::Signal() {
  // TODO(https://fxbug.dev/477068635): Consider merging this logic back into OwnedWaitQueue.
  auto& wake_hooks = OwnedWaitQueue::default_wake_hooks();
  const auto do_transaction = [&]()
      TA_REQ(chainlock_transaction_token,
             preempt_disabled_token) -> ChainLockTransaction::Result<> {
    if (Thread::UnblockList threads;
        wait_queue_.LockForWakeOperationOrBackoff(UINT32_MAX, wake_hooks, threads)) {
      ChainLockTransaction::Finalize();
      // Set signaled_ only once the wake operation is committed, so a racing
      // Wait() observes a consistent state.
      signaled_ = true;
      wait_queue_.WakeThreadsLocked(ktl::move(threads), wake_hooks,
                                    OwnedWaitQueue::WakeOption::AssignOwner, ForceInheritance::Yes);
      wait_queue_.get_lock().Release();
      return ChainLockTransaction::Done;
    }
    // Lock acquisition failed; back off and retry the transaction.
    return ChainLockTransaction::Action::Backoff;
  };
  ChainLockTransaction::UntilDone(EagerReschedDisableAndIrqSaveOption,
                                  CLT_TAG("ChannelDispatcher::MessageWaiter::Signal"),
                                  do_transaction);
}
#endif
// Hands the reply message to this waiter and wakes the blocked caller with
// ZX_OK. Caller must have already removed the waiter from the waiter list.
void ChannelDispatcher::MessageWaiter::Deliver(MessagePacketPtr msg) {
  DEBUG_ASSERT(channel_);
  msg_ = ktl::move(msg);
  status_ = ZX_OK;
#if EXPERIMENTAL_CHANNEL_CALL_PROPAGATION_ENABLED
  Signal();
#else
  event_.Signal(ZX_OK);
#endif
}
// Fails the pending call with |status| and wakes the blocked caller. The
// waiter must already be unlinked from the channel's waiter list.
void ChannelDispatcher::MessageWaiter::Cancel(zx_status_t status) {
  DEBUG_ASSERT(!InContainer());
  DEBUG_ASSERT(channel_);
  status_ = status;
#if EXPERIMENTAL_CHANNEL_CALL_PROPAGATION_ENABLED
  Signal();
#else
  event_.Signal(status);
#endif
}
// Blocks the calling thread until the waiter is signaled (Deliver/Cancel) or
// |deadline| passes. Returns the wait result; the call outcome itself is
// retrieved later via EndWait().
zx_status_t ChannelDispatcher::MessageWaiter::Wait(const Deadline& deadline) {
  if (unlikely(!channel_)) {
    return ZX_ERR_BAD_STATE;
  }
#if EXPERIMENTAL_CHANNEL_CALL_PROPAGATION_ENABLED
  // TODO(https://fxbug.dev/477068635): Consider merging this logic back into OwnedWaitQueue.
  Thread* current_thread = Thread::Current::Get();
  const auto do_transaction =
      [&]() TA_REQ(chainlock_transaction_token,
                   preempt_disabled_token) -> ChainLockTransaction::Result<zx_status_t> {
    wait_queue_.get_lock().AcquireFirstInChain();
    // If Signal() already ran, there is nothing to block on.
    if (signaled_) {
      wait_queue_.get_lock().Release();
      return ZX_OK;
    }
    Thread* new_owner = wait_queue_.owner();
    if (OwnedWaitQueue::BAAOLockingDetails details;
        wait_queue_.TryLockForBAAOOperationLocked(current_thread, new_owner, details)) {
      ChainLockTransaction::Finalize();
      // Block and assign wait-queue ownership so profile pressure propagates
      // to the owner while we sleep.
      const zx_status_t result = wait_queue_.BlockAndAssignOwnerLocked(
          current_thread, deadline, details, ResourceOwnership::Normal, Interruptible::Yes,
          ForceInheritance::Yes);
      current_thread->get_lock().Release();
      return result;
    }
    // Could not acquire the needed locks; back off and retry the transaction.
    wait_queue_.get_lock().Release();
    return ChainLockTransaction::Action::Backoff;
  };
  zx_status_t status = ChainLockTransaction::UntilDone(
      EagerReschedDisableAndIrqSaveOption, CLT_TAG("ChannelDispatcher::MessageWaiter::Wait"),
      do_transaction);
  if (status != ZX_OK) {
    return status;
  }
  // The wait completed; report the outcome set by Deliver()/Cancel().
  return status_;
#else
  return event_.Wait(deadline);
#endif
}
// Returns any delivered message via out and the status.
// Also disarms the waiter by dropping its channel reference, making it
// reusable for the thread's next zx_channel_call.
zx_status_t ChannelDispatcher::MessageWaiter::EndWait(MessagePacketPtr* out) {
  if (unlikely(!channel_)) {
    return ZX_ERR_BAD_STATE;
  }
  const zx_status_t result = status_;
  *out = ktl::move(msg_);
  channel_ = nullptr;
  return result;
}