blob: 45a0303bf67ec411db421651f165c3f5d5d60432 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors
//
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT
#include "object/channel_dispatcher.h"
#include <assert.h>
#include <lib/counters.h>
#include <platform.h>
#include <string.h>
#include <trace.h>
#include <zircon/errors.h>
#include <zircon/rights.h>
#include <zircon/syscalls/object.h>
#include <zircon/types.h>
#include <fbl/alloc_checker.h>
#include <fbl/auto_lock.h>
#include <kernel/event.h>
#include <object/handle.h>
#include <object/message_packet.h>
#include <object/process_dispatcher.h>
#include <object/thread_dispatcher.h>
// Per-file trace switch; presumably consumed by the LTRACE-style macros from <trace.h>.
#define LOCAL_TRACE 0

// Histogram of each endpoint's peak queue depth (|max_message_count_|). Exactly one bucket is
// bumped when an endpoint is destroyed; see ~ChannelDispatcher for the bucket boundaries.
KCOUNTER(channel_packet_depth_1, "channel.depth.1")
KCOUNTER(channel_packet_depth_4, "channel.depth.4")
KCOUNTER(channel_packet_depth_16, "channel.depth.16")
KCOUNTER(channel_packet_depth_64, "channel.depth.64")
KCOUNTER(channel_packet_depth_256, "channel.depth.256")
KCOUNTER(channel_packet_depth_unbounded, "channel.depth.unbounded")

// Bumped each time a write leaves a queue deeper than kMaxPendingMessageCount (see WriteSelf).
KCOUNTER(channel_full, "channel.full")

// Endpoint lifetime counters, bumped by the constructor and destructor respectively.
KCOUNTER(dispatcher_channel_create_count, "dispatcher.channel.create")
KCOUNTER(dispatcher_channel_destroy_count, "dispatcher.channel.destroy")
namespace {

// Temporary hack to chase down bugs like fxbug.dev/47000, where upwards of 250MB of IPC memory
// was consumed. The bet is that even if every message is at the maximum size, only one or two
// channels would ever hold thousands of messages, so this check adds no overhead to the existing
// code. See fxbug.dev/47691.
// TODO(cpu): This limit could be lower, but mojo's ChannelTest.PeerStressTest sends about 3K
// small messages. Switching to a size-based limit would be more reasonable.
constexpr size_t kMaxPendingMessageCount = 3500;

// Queue depth at which WriteSelf() logs a warning: half of the hard limit above.
constexpr size_t kWarnPendingMessageCount = kMaxPendingMessageCount / 2;

// Part of the zx_channel_call() contract: txids at or above this value are generated by the
// kernel, and smaller ones belong to userspace.
constexpr uint32_t kMinKernelGeneratedTxid = 0x80000000u;

// Returns true when |txid| falls in the kernel-generated range.
bool IsKernelGeneratedTxid(zx_txid_t txid) {
  const bool kernel_generated = txid >= kMinKernelGeneratedTxid;
  return kernel_generated;
}

}  // namespace
// static
// Number of writes, summed over all CPUs, that pushed a channel's queue past
// kMaxPendingMessageCount and raised the channel-full policy exception.
int64_t ChannelDispatcher::get_channel_full_count() {
  const int64_t full_count = channel_full.SumAcrossAllCpus();
  return full_count;
}
// static
// Creates a connected pair of channel endpoints.
//
// On success, stores the two endpoint handles in |*handle0| and |*handle1|, stores the default
// rights in |*rights|, and returns ZX_OK. If any allocation fails, returns ZX_ERR_NO_MEMORY
// without touching the outputs.
zx_status_t ChannelDispatcher::Create(KernelHandle<ChannelDispatcher>* handle0,
                                      KernelHandle<ChannelDispatcher>* handle1,
                                      zx_rights_t* rights) {
  fbl::AllocChecker ac;

  // Both endpoints share one PeerHolder; each endpoint gets its own reference to it.
  auto left_holder = fbl::AdoptRef(new (&ac) PeerHolder<ChannelDispatcher>());
  if (!ac.check()) {
    return ZX_ERR_NO_MEMORY;
  }
  auto right_holder = left_holder;

  KernelHandle left(fbl::AdoptRef(new (&ac) ChannelDispatcher(ktl::move(left_holder))));
  if (!ac.check()) {
    return ZX_ERR_NO_MEMORY;
  }

  KernelHandle right(fbl::AdoptRef(new (&ac) ChannelDispatcher(ktl::move(right_holder))));
  if (!ac.check()) {
    return ZX_ERR_NO_MEMORY;
  }

  // Cross-link the endpoints so each one knows its peer.
  left.dispatcher()->InitPeer(right.dispatcher());
  right.dispatcher()->InitPeer(left.dispatcher());

  *rights = default_rights();
  *handle0 = ktl::move(left);
  *handle1 = ktl::move(right);
  return ZX_OK;
}
// Constructs one endpoint of a channel pair. |holder| is the PeerHolder shared with the other
// endpoint (see Create). ZX_CHANNEL_WRITABLE is passed to PeeredDispatcher, presumably as the
// endpoint's initial signal state.
ChannelDispatcher::ChannelDispatcher(fbl::RefPtr<PeerHolder<ChannelDispatcher>> holder)
    : PeeredDispatcher(ktl::move(holder), ZX_CHANNEL_WRITABLE) {
  kcounter_add(dispatcher_channel_create_count, 1);
}
// Destroys an endpoint: discards any messages still queued on it and records its peak queue
// depth in the channel.depth.* histogram.
ChannelDispatcher::~ChannelDispatcher() {
  kcounter_add(dispatcher_channel_destroy_count, 1);

  // By now the other endpoint no longer holds a reference to us, so the remaining messages can
  // be discarded safely. Doing this in on_zero_handles() would not be safe.
  messages_.clear();

  // Bucket the high-water mark. The thresholds are inclusive upper bounds: 1, 4, 16, 64, 256,
  // and anything larger goes to "unbounded". (Assumes |max_message_count_| is unsigned, as
  // implied by its assignment from messages_.size() in WriteSelf.)
  const auto peak_depth = max_message_count_;
  if (peak_depth <= 1) {
    kcounter_add(channel_packet_depth_1, 1);
  } else if (peak_depth <= 4) {
    kcounter_add(channel_packet_depth_4, 1);
  } else if (peak_depth <= 16) {
    kcounter_add(channel_packet_depth_16, 1);
  } else if (peak_depth <= 64) {
    kcounter_add(channel_packet_depth_64, 1);
  } else if (peak_depth <= 256) {
    kcounter_add(channel_packet_depth_256, 1);
  } else {
    kcounter_add(channel_packet_depth_unbounded, 1);
  }
}
// Unlinks |waiter| from this endpoint's list of pending calls, if it is still linked.
// ~MessageWaiter calls this for a waiter that is destroyed before reaching EndWait().
void ChannelDispatcher::RemoveWaiter(MessageWaiter* waiter) {
  Guard<Mutex> guard{get_lock()};
  if (waiter->InContainer()) {
    waiters_.erase(*waiter);
  }
}
void ChannelDispatcher::on_zero_handles_locked() {
  canary_.Assert();

  // (3A) Our local handle is going away, so every zx_channel_call() still waiting on this
  // endpoint is aborted with ZX_ERR_CANCELED. Each waiter is unlinked before it is signaled.
  while (!waiters_.is_empty()) {
    waiters_.pop_front()->Cancel(ZX_ERR_CANCELED);
  }
}
// Records |new_owner| as the process that owns this endpoint's handle.
//
// Updates to ZX_KOID_INVALID (the endpoint moving from a process into a channel) are skipped as
// an optimization, which avoids taking the locks. The transition that must be published
// correctly is the one from a channel to a new owning process.
void ChannelDispatcher::set_owner(zx_koid_t new_owner) {
  if (new_owner != ZX_KOID_INVALID) {
    Guard<Mutex> dispatcher_guard{get_lock()};
    Guard<Mutex> channel_guard{&channel_lock_};
    owner_ = new_owner;
  }
}
// This requires holding the shared channel lock. The thread analysis
// can reason about repeated calls to get_lock() on the shared object,
// but cannot reason about the aliasing between left->get_lock() and
// right->get_lock(), which occurs above in on_zero_handles.
void ChannelDispatcher::OnPeerZeroHandlesLocked() {
canary_.Assert();
{
Guard<Mutex> messages_guard{&channel_lock_};
peer_has_closed_ = true;
}
UpdateStateLocked(ZX_CHANNEL_WRITABLE, ZX_CHANNEL_PEER_CLOSED);
// (3B) Abort any waiting Call operations
// because we've been canceled by reason
// of the opposing endpoint going away.
// Remove waiter from list.
while (!waiters_.is_empty()) {
auto waiter = waiters_.pop_front();
waiter->Cancel(ZX_ERR_PEER_CLOSED);
}
}
// Dequeues the next message for the process |owner|.
//
// On entry, |*msg_size| and |*msg_handle_count| hold the caller's buffer capacities. When a
// message is queued, they are overwritten with that message's actual byte size and handle count.
// If the message does not fit, ZX_ERR_BUFFER_TOO_SMALL is returned. The message stays queued
// unless |may_discard| is set, in which case it is still dequeued into |*msg|.
//
// This method must never acquire |get_lock()|; see the comment at |channel_lock_| for details.
zx_status_t ChannelDispatcher::Read(zx_koid_t owner, uint32_t* msg_size, uint32_t* msg_handle_count,
                                    MessagePacketPtr* msg, bool may_discard) {
  canary_.Assert();

  const uint32_t size_capacity = *msg_size;
  const uint32_t handle_capacity = *msg_handle_count;

  Guard<Mutex> guard{&channel_lock_};

  if (owner != owner_) {
    return ZX_ERR_BAD_HANDLE;
  }

  if (messages_.is_empty()) {
    if (peer_has_closed_) {
      return ZX_ERR_PEER_CLOSED;
    }
    return ZX_ERR_SHOULD_WAIT;
  }

  // Report the head message's requirements back to the caller.
  auto& head = messages_.front();
  *msg_size = head.data_size();
  *msg_handle_count = head.num_handles();

  const bool fits = *msg_size <= size_capacity && *msg_handle_count <= handle_capacity;
  if (!fits && !may_discard) {
    return ZX_ERR_BUFFER_TOO_SMALL;
  }

  *msg = messages_.pop_front();
  if (messages_.is_empty()) {
    ClearSignals(ZX_CHANNEL_READABLE);
  }
  return fits ? ZX_OK : ZX_ERR_BUFFER_TOO_SMALL;
}
// Sends |msg| to the peer endpoint on behalf of the process |owner|.
//
// If the peer has a zx_channel_call() waiting for this message's txid, the message goes straight
// to that waiter. Otherwise it is queued on the peer.
zx_status_t ChannelDispatcher::Write(zx_koid_t owner, MessagePacketPtr msg) {
  canary_.Assert();

  // ChannelDispatcher::Call explains why preemption is disabled here. |no_preempt| is declared
  // before |lock_guard|, so the guard is destroyed (and the lock released) first.
  AutoPreemptDisabler no_preempt;
  Guard<Mutex> lock_guard{get_lock()};

  // This only fails if two threads of this process race: one issues channel_write() while the
  // other moves the handle to another process.
  if (owner != owner_) {
    return ZX_ERR_BAD_HANDLE;
  }
  if (!peer()) {
    return ZX_ERR_PEER_CLOSED;
  }

  AssertHeld(*peer()->get_lock());
  const bool handed_to_caller = peer()->TryWriteToMessageWaiter(msg);
  if (!handed_to_caller) {
    // Notifying a long observer list is expensive; let preemption back in before doing it.
    if (peer()->GetObserverListSizeLocked() > kLongObserverListThreshold) {
      no_preempt.Enable();
    }
    peer()->WriteSelf(ktl::move(msg));
  }
  return ZX_OK;
}
// Produces the next candidate txid for a kernel-issued call.
//
// The counter is advanced and then the high bit is forced on, so the result always lies at or
// above kMinKernelGeneratedTxid. Values 1..kMinKernelGeneratedTxid-1 are reserved for
// userspace. Call() checks the result against txids already in flight.
zx_txid_t ChannelDispatcher::GenerateTxid() {
  const auto next = ++txid_;
  return next | kMinKernelGeneratedTxid;
}
// Implements zx_channel_call(): writes |msg| to the peer under a fresh kernel-generated txid,
// then blocks until a reply carrying that txid arrives, the call is canceled, or |deadline|
// (adjusted by the process's timer slack policy) passes.
//
// Returns ZX_ERR_BAD_HANDLE if the handle is no longer owned by |owner|, and ZX_ERR_PEER_CLOSED
// if the peer is already gone. Otherwise it returns whatever ResumeInterruptedCall() reports.
// On every early error return after BeginWait(), the current thread's MessageWaiter is detached
// again via EndWait(), so a later call from the same thread can BeginWait() successfully.
zx_status_t ChannelDispatcher::Call(zx_koid_t owner, MessagePacketPtr msg, zx_time_t deadline,
                                    MessagePacketPtr* reply) {
  canary_.Assert();

  ChannelDispatcher::MessageWaiter* waiter = ThreadDispatcher::GetCurrent()->GetMessageWaiter();
  if (unlikely(waiter->BeginWait(fbl::RefPtr(this)) != ZX_OK)) {
    // If a thread tries BeginWait'ing twice, the VDSO contract around retrying
    // channel calls has been violated. Shoot the misbehaving process.
    ProcessDispatcher::GetCurrent()->Kill(ZX_TASK_RETCODE_VDSO_KILL);
    return ZX_ERR_BAD_STATE;
  }

  {
    // Disable preemption while we hold this lock. If our server is running
    // with a deadline profile (and we are not), then after we queue the message
    // and signal the server, it is possible that the server thread:
    //
    // 1) Gets assigned to our core.
    // 2) Reads the message we just sent.
    // 3) Processes the message and responds with a write to this channel
    //    before we get a chance to drop the lock.
    //
    // This results in an undesirable thrash sequence where:
    //
    // 1) The server thread contests the lock we are holding.
    // 2) It suffers through the adaptive mutex spin (but it is on our CPU, so
    //    it will never discover that the lock is available).
    // 3) It then drops into a block, transmitting its profile pressure and
    //    allowing us to run again.
    // 4) We run for a very short time until we finish our notifications.
    // 5) As soon as we drop the lock, we immediately bounce back to the
    //    server thread, which completes its operation.
    //
    // Disabling preemption helps to avoid this thrash, but comes with a caveat.
    // The observer list we need to notify may be Very Long and take a
    // significant amount of time to filter and signal. We _really_ do not
    // want to run with preemption disabled for very long, as it can hold off
    // time-critical tasks. So, if our observer queue is "long" (as defined by
    // |kLongObserverListThreshold|), we drop the APD before performing the
    // notify. This is not great, and could result in lock thrash, but at least
    // we will not hold off time-critical tasks.
    //
    // TODO(johngro): This mitigation is really not great. We would much prefer
    // an approach where we do something like move the notification step outside
    // of the lock, or break the single shared lock protecting the message and
    // waiter queues into two locks, so that we never have to disable preemption
    // no matter how long the list of waiters is. These solutions get
    // complicated, however, owing to lifecycle issues for the various
    // SignalObservers and the common locking structure of PeeredDispatchers.
    // See fxb/100122. TL;DR - someday, when we have had the time to carefully
    // refactor the locking here, come back and remove this APD mitigation.
    //
    AutoPreemptDisabler preempt_disabler;
    Guard<Mutex> guard{get_lock()};

    // See Write() for an explanation of this test. BeginWait() above attached the waiter to this
    // channel, so it must be detached before bailing out. Otherwise the waiter keeps a reference
    // to this channel, and this thread's next channel call fails BeginWait(), killing the process.
    if (owner != owner_) {
      waiter->EndWait(reply);
      return ZX_ERR_BAD_HANDLE;
    }
    if (!peer()) {
      waiter->EndWait(reply);
      return ZX_ERR_PEER_CLOSED;
    }

    // Pick a txid that no other waiter on this endpoint is using. A collision is unlikely: it's
    // atypical for multiple threads to be invoking channel_call() on the same channel at once, so
    // the waiter list is most commonly empty.
    zx_txid_t txid = 0;
    bool txid_in_use = false;
    do {
      txid = GenerateTxid();
      txid_in_use = false;
      for (ChannelDispatcher::MessageWaiter& w : waiters_) {
        if (w.get_txid() == txid) {
          txid_in_use = true;
          break;
        }
      }
    } while (txid_in_use);

    // Install our txid in the waiter and the outbound message.
    waiter->set_txid(txid);
    msg->set_txid(txid);

    // (0) Before writing the outbound message and waiting, add our
    // waiter to the list.
    waiters_.push_back(waiter);

    // (1) Write the outbound message to the opposing endpoint. Re-enable
    // preemption if our current wait observer queue is "long".
    AssertHeld(*peer()->get_lock());
    if (peer()->GetObserverListSizeLocked() > kLongObserverListThreshold) {
      preempt_disabler.Enable();
    }
    peer()->WriteSelf(ktl::move(msg));
  }

  auto process = ProcessDispatcher::GetCurrent();
  const TimerSlack slack = process->GetTimerSlackPolicy();
  const Deadline slackDeadline(deadline, slack);

  // Reuse the code from the half-call used for retrying a Call after thread
  // suspend.
  return ResumeInterruptedCall(waiter, slackDeadline, reply);
}
// Second half of Call(): blocks until |waiter| is signaled or |deadline| passes. It is also used
// on its own to resume a Call() that was interrupted by thread suspension.
//
// Returns ZX_ERR_INTERNAL_INTR_RETRY, leaving the waiter attached, if the wait was interrupted.
// Otherwise it detaches the waiter, moves any delivered reply into |*reply|, and returns the
// waiter's final status.
zx_status_t ChannelDispatcher::ResumeInterruptedCall(MessageWaiter* waiter,
                                                     const Deadline& deadline,
                                                     MessagePacketPtr* reply) {
  canary_.Assert();

  // (2) Sleep on the waiter's event until it is signaled or the deadline hits.
  zx_status_t wait_status;
  {
    ThreadDispatcher::AutoBlocked blocked(ThreadDispatcher::Blocked::CHANNEL);
    wait_status = waiter->Wait(deadline);
  }
  if (wait_status == ZX_ERR_INTERNAL_INTR_RETRY) {
    // Interrupted: return to usermode, but keep the waiter attached so the call can be retried.
    return wait_status;
  }

  // (3) Three paths can signal the waiter and unlink it from the list:
  // (3A) on_zero_handles_locked(), (3B) OnPeerZeroHandlesLocked(), and
  // (3C) TryWriteToMessageWaiter(). If the deadline hit instead, the waiter is still linked, but
  // any of those paths may still run before we take the lock below.
  Guard<Mutex> guard{get_lock()};

  // (4) If (3A), (3B) or (3C) happened, the waiter is already unlinked and EndWait() reports a
  // status other than ZX_ERR_TIMED_OUT. Otherwise the status is ZX_ERR_TIMED_OUT and unlinking
  // the waiter is our job.
  const zx_status_t end_status = waiter->EndWait(reply);
  if (end_status == ZX_ERR_TIMED_OUT) {
    waiters_.erase(*waiter);
  }
  return end_status;
}
// Tries to hand |msg| directly to a zx_channel_call() waiter on this endpoint whose txid
// matches the message's txid.
//
// On a match, the waiter is unlinked, |msg| is moved into it, and true is returned. Otherwise
// |msg| is left untouched and false is returned.
bool ChannelDispatcher::TryWriteToMessageWaiter(MessagePacketPtr& msg) {
  canary_.Assert();

  // Common case: nobody is blocked in a call on this endpoint.
  if (waiters_.is_empty()) {
    return false;
  }

  // Call waiters always use kernel-generated txids, so a message carrying a userspace txid
  // cannot be a reply to any of them and the list scan can be skipped.
  const zx_txid_t wanted = msg->get_txid();
  if (!IsKernelGeneratedTxid(wanted)) {
    return false;
  }

  for (ChannelDispatcher::MessageWaiter& candidate : waiters_) {
    if (candidate.get_txid() != wanted) {
      continue;
    }
    // (3C) Unlink the matching waiter, then deliver the reply to it. We return immediately, so
    // erasing during the range-for is safe.
    waiters_.erase(candidate);
    candidate.Deliver(ktl::move(msg));
    return true;
  }
  return false;
}
// Appends |msg| to this endpoint's queue, raises ZX_CHANNEL_READABLE, and notifies observers if
// READABLE was not already active.
//
// Sequence: while holding |channel_lock_|, enqueue the message, take a snapshot of the
// previously active signals, and raise READABLE. After dropping |channel_lock_|, notify
// observers with the previous signals plus READABLE. The reasons for this shape:
//
// 1. The stored signals must be updated (RaiseSignalsLocked) while |channel_lock_| is held, so
//    the update is synchronized with threads adding, removing, or canceling observers.
//    Otherwise a spurious READABLE signal can result (see the
//    NoSpuriousReadableSignalWhenRacing test).
//
// 2. |channel_lock_| must be released before notifying, so that Read() can run concurrently
//    with NotifyObserversLocked(), which is a potentially long-running call.
//
// 3. If READABLE was already active, notification is skipped: no observer can still be waiting
//    for a signal that is already asserted.
void ChannelDispatcher::WriteSelf(MessagePacketPtr msg) {
  canary_.Assert();

  zx_signals_t prior_signals;
  {
    Guard<Mutex> channel_guard{&channel_lock_};
    messages_.push_back(ktl::move(msg));
    prior_signals = RaiseSignalsLocked(ZX_CHANNEL_READABLE);

    // Track the high-water mark reported by ~ChannelDispatcher.
    const size_t depth = messages_.size();
    if (depth > max_message_count_) {
      max_message_count_ = depth;
    }

    // TODO(cpu): Remove this hack. See comment in kMaxPendingMessageCount definition.
    if (depth >= kWarnPendingMessageCount) {
      const bool reached_warning = depth == kWarnPendingMessageCount;
      const bool over_limit = !reached_warning && depth > kMaxPendingMessageCount;
      if (reached_warning || over_limit) {
        // Both reports name the writing process, so look it up once.
        const auto* process = ProcessDispatcher::GetCurrent();
        char pname[ZX_MAX_NAME_LEN];
        process->get_name(pname);
        if (reached_warning) {
          printf("KERN: warning! channel (%zu) has %zu messages (%s) (write).\n", get_koid(), depth,
                 pname);
        } else {
          printf("KERN: channel (%zu) has %zu messages (%s) (write). Raising exception.\n",
                 get_koid(), depth, pname);
          Thread::Current::SignalPolicyException(ZX_EXCP_POLICY_CODE_CHANNEL_FULL_WRITE, 0u);
          kcounter_add(channel_full, 1);
        }
      }
    }
  }

  // Don't bother waking observers if ZX_CHANNEL_READABLE was already active.
  const bool was_readable = (prior_signals & ZX_CHANNEL_READABLE) != 0;
  if (!was_readable) {
    NotifyObserversLocked(prior_signals | ZX_CHANNEL_READABLE);
  }
}
// Destroys the waiter. If it is still attached to a channel (EndWait() was never reached, so
// |channel_| is still set), it is first unlinked from that channel's waiter list.
ChannelDispatcher::MessageWaiter::~MessageWaiter() {
  if (unlikely(channel_)) {
    channel_->RemoveWaiter(this);
  }
  // Whichever path ran, the waiter must no longer be on any list.
  DEBUG_ASSERT(!InContainer());
}
// Attaches the waiter to |channel| for a new call.
//
// Returns ZX_ERR_BAD_STATE if the waiter is still attached from a previous call (EndWait() not
// yet run); otherwise resets the waiter's status and event and returns ZX_OK.
zx_status_t ChannelDispatcher::MessageWaiter::BeginWait(fbl::RefPtr<ChannelDispatcher> channel) {
  if (likely(!channel_)) {
    DEBUG_ASSERT(!InContainer());
    // The status stays ZX_ERR_TIMED_OUT unless Deliver() or Cancel() overwrites it.
    status_ = ZX_ERR_TIMED_OUT;
    channel_ = ktl::move(channel);
    event_.Unsignal();
    return ZX_OK;
  }
  return ZX_ERR_BAD_STATE;
}
// Completes the pending call successfully with the reply |msg|, then wakes the waiting thread.
void ChannelDispatcher::MessageWaiter::Deliver(MessagePacketPtr msg) {
  DEBUG_ASSERT(channel_);
  status_ = ZX_OK;
  msg_ = ktl::move(msg);
  // Signal only after the result has been recorded.
  event_.Signal(ZX_OK);
}
// Completes the pending call with the failure |status| (no reply message is set here) and wakes
// the waiting thread. The caller must already have unlinked the waiter from the channel's list,
// and the waiter must be attached to a channel; both are checked below.
void ChannelDispatcher::MessageWaiter::Cancel(zx_status_t status) {
  DEBUG_ASSERT(!InContainer());
  DEBUG_ASSERT(channel_);
  status_ = status;
  event_.Signal(status);
}
// Blocks on the waiter's event until it is signaled or |deadline| passes, returning the event's
// wait result. Returns ZX_ERR_BAD_STATE if the waiter is not attached to a channel (no
// BeginWait() in effect).
zx_status_t ChannelDispatcher::MessageWaiter::Wait(const Deadline& deadline) {
  return unlikely(!channel_) ? ZX_ERR_BAD_STATE : event_.Wait(deadline);
}
// Finishes the current wait and detaches the waiter from its channel.
//
// Moves |msg_| into |*out|. That is the reply if Deliver() ran; otherwise presumably empty
// (TODO confirm MessagePacketPtr's moved-from state). Returns the recorded status: ZX_OK after
// Deliver(), the Cancel() status, or ZX_ERR_TIMED_OUT (set by BeginWait()) if neither ran.
// Returns ZX_ERR_BAD_STATE if the waiter is not attached to a channel.
zx_status_t ChannelDispatcher::MessageWaiter::EndWait(MessagePacketPtr* out) {
  if (unlikely(!channel_)) {
    return ZX_ERR_BAD_STATE;
  }
  *out = ktl::move(msg_);
  // Dropping the channel reference lets a later BeginWait() succeed.
  channel_ = nullptr;
  return status_;
}