// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <zircon/assert.h>
#include <zircon/syscalls.h>
#include <zircon/types.h>
#include <fbl/auto_call.h>
#include <fbl/auto_lock.h>
#include <dispatcher-pool/dispatcher-channel.h>
#include <dispatcher-pool/dispatcher-event-source.h>
#include <dispatcher-pool/dispatcher-execution-domain.h>
#include <dispatcher-pool/dispatcher-thread-pool.h>
#include <utility>
namespace dispatcher {
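// A quick usage sketch of the first Activate overload below ("my_channel" and
// "my_domain" are hypothetical names, and the lambdas assume the
// ProcessHandler and ChannelClosedHandler signatures declared in
// dispatcher-channel.h):
//
//   zx::channel client_endpoint;
//   zx_status_t res = my_channel->Activate(
//       &client_endpoint, my_domain,
//       [](dispatcher::Channel* ch) -> zx_status_t {
//           // Called from the execution domain when messages arrive.
//           // Returning anything other than ZX_OK closes the channel.
//           return ZX_OK;
//       },
//       [](const dispatcher::Channel* ch) {
//           // Called once as the channel shuts down; release any
//           // per-channel state here.
//       });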
zx_status_t Channel::Activate(zx::channel* client_channel_out,
fbl::RefPtr<ExecutionDomain> domain,
ProcessHandler process_handler,
ChannelClosedHandler channel_closed_handler) {
    // Argument and constant state checks come first
if ((client_channel_out == nullptr) || client_channel_out->is_valid())
return ZX_ERR_INVALID_ARGS;
if (domain == nullptr)
return ZX_ERR_INVALID_ARGS;
// Create the channel endpoints.
zx::channel channel;
    zx_status_t res = zx::channel::create(0u, &channel, client_channel_out);
if (res != ZX_OK)
return res;
// Attempt to activate.
res = Activate(std::move(channel),
std::move(domain),
std::move(process_handler),
std::move(channel_closed_handler));
// If something went wrong, make sure we close the channel endpoint we were
// going to give back to the caller.
if (res != ZX_OK)
client_channel_out->reset();
return res;
}
zx_status_t Channel::Activate(zx::channel channel,
fbl::RefPtr<ExecutionDomain> domain,
ProcessHandler process_handler,
ChannelClosedHandler channel_closed_handler) {
    // In order to activate, the supplied execution domain, channel, and
    // process handler must all be valid. Only the channel-closed handler is
    // optional.
if ((domain == nullptr) || !channel.is_valid() || (process_handler == nullptr))
return ZX_ERR_INVALID_ARGS;
zx_status_t ret;
{
fbl::AutoLock obj_lock(&obj_lock_);
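        // Refuse to activate twice; already-installed handlers indicate that
        // this channel has been activated before.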
if ((process_handler_ != nullptr) || (channel_closed_handler_ != nullptr))
return ZX_ERR_BAD_STATE;
ret = ActivateLocked(std::move(channel), std::move(domain));
// If we succeeded, take control of the handlers provided by our caller.
// Otherwise, wait until we are outside of our lock before we let the
// handler state go out of scope and destruct.
if (ret == ZX_OK) {
ZX_DEBUG_ASSERT(process_handler_ == nullptr);
ZX_DEBUG_ASSERT(channel_closed_handler_ == nullptr);
process_handler_ = std::move(process_handler);
channel_closed_handler_ = std::move(channel_closed_handler);
}
}
return ret;
}
void Channel::Deactivate() {
ProcessHandler old_process_handler;
ChannelClosedHandler old_channel_closed_handler;
{
fbl::AutoLock obj_lock(&obj_lock_);
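        // Release our channel handle and our execution domain reference, if
        // we are still holding them.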
InternalDeactivateLocked();
// If we are in the process of actively dispatching, do not discard our
// handlers just yet. They are currently being used by the dispatch
// thread. Instead, wait until the dispatch thread unwinds and allow it
// to clean up the handlers.
//
// Otherwise, transfer the handler state into local storage and let them
// destruct after we have released the object lock.
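        // (Destroying a handler can drop captured references whose destructors
        // might re-enter this object and attempt to acquire obj_lock_, which
        // is why destruction must happen outside of the lock.)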
if (dispatch_state() != DispatchState::Dispatching) {
ZX_DEBUG_ASSERT((dispatch_state() == DispatchState::Idle) ||
(dispatch_state() == DispatchState::WaitingOnPort));
old_process_handler = std::move(process_handler_);
old_channel_closed_handler = std::move(channel_closed_handler_);
}
}
}
zx_status_t Channel::ActivateLocked(zx::channel channel, fbl::RefPtr<ExecutionDomain> domain) {
ZX_DEBUG_ASSERT((domain != nullptr) && channel.is_valid());
// Take ownership of the channel resource and execution domain reference.
zx_status_t res = EventSource::ActivateLocked(std::move(channel), std::move(domain));
if (res != ZX_OK) {
return res;
}
// Setup our initial async wait operation on our thread pool's port.
res = WaitOnPortLocked();
if (res != ZX_OK) {
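        // We failed to arm the wait; undo the partial activation before
        // reporting the error.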
InternalDeactivateLocked();
return res;
}
return res;
}
void Channel::Dispatch(ExecutionDomain* domain) {
    // No one should be calling us unless there are signals to process.
ZX_DEBUG_ASSERT(domain != nullptr);
ZX_DEBUG_ASSERT(process_handler_ != nullptr);
ZX_DEBUG_ASSERT(pending_pkt_.signal.observed & process_signal_mask());
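    // Note whether our peer has already closed their end of the channel; a
    // processing error below will also set this flag.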
bool signal_channel_closed = (pending_pkt_.signal.observed & ZX_CHANNEL_PEER_CLOSED);
// Do we have messages to dispatch?
if (pending_pkt_.signal.observed & ZX_CHANNEL_READABLE) {
// Process all of the pending messages in the channel before re-joining
// the thread pool.
//
// TODO(johngro) : Start to establish some sort of fair scheduler-like
// behavior. We do not want to dominate the thread pool processing a
// single channel for a single client.
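        // For a ZX_CHANNEL_READABLE wait, the packet's signal count reflects
        // the number of messages which were pending when the port wait was
        // satisfied.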
ZX_DEBUG_ASSERT(pending_pkt_.signal.count);
for (uint64_t i = 0; i < pending_pkt_.signal.count; ++i) {
if (domain->deactivated())
break;
if (process_handler_(this) != ZX_OK) {
signal_channel_closed = true;
break;
}
}
}
    // If the other side has closed our channel, or there was an error during
    // dispatch, attempt to call our channel-closed handler (if it still
    // exists).
if (signal_channel_closed && (channel_closed_handler_ != nullptr)) {
channel_closed_handler_(this);
}
// Ok, for better or worse, dispatch is now complete. Enter the lock and
// deal with state transition. If things are still healthy, attempt to wait
// on our thread-pool's port. If things are not healthy, go through the
// process of deactivation.
ProcessHandler old_process_handler;
ChannelClosedHandler old_channel_closed_handler;
{
fbl::AutoLock obj_lock(&obj_lock_);
ZX_DEBUG_ASSERT(dispatch_state() == DispatchState::Dispatching);
dispatch_state_ = DispatchState::Idle;
// If we had an error during processing, or our peer closed their end of
// the channel, make sure that we have released our handle and our
// domain reference.
if (signal_channel_closed) {
InternalDeactivateLocked();
}
        // If we are still active, attempt to set up the next wait operation.
        // If this fails (it should never fail), automatically deactivate
        // ourselves.
if (is_active()) {
ZX_DEBUG_ASSERT(handle_.is_valid());
zx_status_t res = WaitOnPortLocked();
if (res != ZX_OK) {
// TODO(johngro) : Log something about this.
InternalDeactivateLocked();
} else {
ZX_DEBUG_ASSERT(dispatch_state() == DispatchState::WaitingOnPort);
}
}
// If we have become deactivated for any reason, transfer our handler
// state to local storage so that the handlers can destruct from outside
// of our main lock.
if (!is_active()) {
old_process_handler = std::move(process_handler_);
old_channel_closed_handler = std::move(channel_closed_handler_);
}
}
}
zx_status_t Channel::Read(void* buf,
uint32_t buf_len,
uint32_t* bytes_read_out,
zx::handle* rxed_handle) {
if (!buf || !buf_len || !bytes_read_out ||
((rxed_handle != nullptr) && rxed_handle->is_valid()))
return ZX_ERR_INVALID_ARGS;
fbl::AutoLock obj_lock(&obj_lock_);
if (!handle_.is_valid())
return ZX_ERR_BAD_HANDLE;
uint32_t rxed_handle_count = 0;
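    // zx_channel_read consumes a single message; provide space for at most
    // one handle, and only when our caller has asked to receive one.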
return zx_channel_read(handle_.get(),
0,
buf,
rxed_handle ? rxed_handle->reset_and_get_address() : nullptr,
buf_len,
rxed_handle ? 1 : 0,
bytes_read_out,
&rxed_handle_count);
}
zx_status_t Channel::Write(const void* buf,
uint32_t buf_len,
zx::handle&& tx_handle) {
if (!buf || !buf_len)
return ZX_ERR_INVALID_ARGS;
fbl::AutoLock obj_lock(&obj_lock_);
if (!handle_.is_valid())
return ZX_ERR_BAD_HANDLE;
if (!tx_handle.is_valid())
return zx_channel_write(handle_.get(), 0, buf, buf_len, nullptr, 0);
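    // Transfer ownership of the handle to the syscall layer; once released,
    // it is consumed by the kernel whether or not the write succeeds and must
    // not be touched again.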
zx_handle_t h = tx_handle.release();
return zx_channel_write(handle_.get(), 0, buf, buf_len, &h, 1);
}
} // namespace dispatcher