blob: 3a8876a798f56b02919b66c96b579ff96dd27485 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/async/time.h>
#include <lib/fidl/llcpp/async_binding.h>
#include <lib/fidl/llcpp/async_transaction.h>
#include <lib/fidl/llcpp/client_base.h>
#include <lib/fidl/epitaph.h>
#include <zircon/syscalls.h>
#include <type_traits>
namespace fidl {
namespace internal {
AsyncBinding::AsyncBinding(async_dispatcher_t* dispatcher, zx::channel channel, void* impl,
bool is_server, TypeErasedOnUnboundFn on_unbound_fn,
DispatchFn dispatch_fn)
: wait_({{ASYNC_STATE_INIT},
&AsyncBinding::OnMessage,
channel.get(),
ZX_CHANNEL_PEER_CLOSED | ZX_CHANNEL_READABLE,
0}),
dispatcher_(dispatcher),
channel_(std::move(channel)),
interface_(impl),
on_unbound_fn_(std::move(on_unbound_fn)),
dispatch_fn_(std::move(dispatch_fn)),
is_server_(is_server) {
ZX_ASSERT(channel_);
}
AsyncBinding::~AsyncBinding() {
ZX_ASSERT(channel_);
if (on_delete_) {
if (out_channel_)
*out_channel_ = std::move(channel_);
sync_completion_signal(on_delete_);
}
}
void AsyncBinding::OnUnbind(zx_status_t status, UnboundReason reason) {
ZX_ASSERT(keep_alive_);
// Move the internal reference into this scope.
auto binding = std::move(keep_alive_);
{
std::scoped_lock lock(lock_);
// Indicate that no other thread should wait for unbind.
unbind_ = true;
// If the peer was not closed, and the user invoked Close() or there was a dispatch error,
// overwrite the unbound reason and recover the epitaph or error status. Note that
// UnboundReason::kUnbind is simply the default value for unbind_info_.reason.
if (reason != UnboundReason::kPeerClosed && unbind_info_.reason != UnboundReason::kUnbind) {
reason = unbind_info_.reason;
status = unbind_info_.status;
}
}
// Store the error handler and interface pointers before the binding is deleted.
auto on_unbound_fn = std::move(on_unbound_fn_);
auto* intf = interface_;
// Release the internal reference and wait for the deleter to run.
auto channel = WaitForDelete(std::move(binding));
// If required, send the epitaph.
if (channel && reason == UnboundReason::kClose) {
status = fidl_epitaph_write(channel.get(), status);
}
// Execute the unbound hook if specified.
if (on_unbound_fn) {
on_unbound_fn(intf, reason, status, std::move(channel));
}
// With no unbound callback, we close the channel here.
}
void AsyncBinding::MessageHandler(zx_status_t status, const zx_packet_signal_t* signal) {
if (status != ZX_OK)
return OnUnbind(status, UnboundReason::kInternalError);
if (signal->observed & ZX_CHANNEL_READABLE) {
char bytes[ZX_CHANNEL_MAX_MSG_BYTES];
zx_handle_t handles[ZX_CHANNEL_MAX_MSG_HANDLES];
for (uint64_t i = 0; i < signal->count; i++) {
fidl_msg_t msg = {
.bytes = bytes,
.handles = handles,
.num_bytes = 0u,
.num_handles = 0u,
};
status = channel_.read(0, bytes, handles, ZX_CHANNEL_MAX_MSG_BYTES,
ZX_CHANNEL_MAX_MSG_HANDLES, &msg.num_bytes, &msg.num_handles);
if (status != ZX_OK || msg.num_bytes < sizeof(fidl_message_header_t)) {
if (status == ZX_OK)
status = ZX_ERR_INTERNAL;
return OnUnbind(status, UnboundReason::kInternalError);
}
// Flag indicating whether this thread still has access to the binding.
bool binding_released = false;
// Dispatch the message. If binding_released is not set, keep_alive_ is still valid and this
// thread will continue to read messages on this binding.
dispatch_fn_(keep_alive_, &msg, &binding_released, &status);
if (binding_released)
return;
// If there was any error enabling dispatch, destroy the binding.
if (status != ZX_OK)
return OnDispatchError(status);
}
// Add the wait back to the dispatcher.
if ((status = EnableNextDispatch()) != ZX_OK)
return OnDispatchError(status);
} else {
ZX_DEBUG_ASSERT(signal->observed & ZX_CHANNEL_PEER_CLOSED);
OnUnbind(ZX_ERR_PEER_CLOSED, UnboundReason::kPeerClosed);
}
}
zx_status_t AsyncBinding::BeginWait() {
std::scoped_lock lock(lock_);
ZX_ASSERT(!begun_);
begun_ = true;
auto status = async_begin_wait(dispatcher_, &wait_);
// On error, release the internal reference so it can be destroyed.
if (status != ZX_OK)
keep_alive_ = nullptr;
return status;
}
zx_status_t AsyncBinding::EnableNextDispatch() {
std::scoped_lock lock(lock_);
if (unbind_)
return ZX_ERR_CANCELED;
auto status = async_begin_wait(dispatcher_, &wait_);
if (status != ZX_OK && unbind_info_.status == ZX_OK)
unbind_info_ = {UnboundReason::kInternalError, status};
return status;
}
void AsyncBinding::UnbindInternal(std::shared_ptr<AsyncBinding>&& calling_ref,
zx_status_t* epitaph) {
ZX_ASSERT(calling_ref);
// Move the calling reference into this scope.
auto binding = std::move(calling_ref);
{
std::scoped_lock lock(lock_);
// Another thread has entered this critical section already via Unbind(), Close(), or
// OnUnbind(). Release our reference and return to unblock that caller.
if (unbind_)
return;
unbind_ = true; // Indicate that waits should no longer be added to the dispatcher.
// Attempt to cancel the current wait. On failure, a dispatcher thread will invoke OnUnbind().
if (async_cancel_wait(dispatcher_, &wait_) != ZX_OK) {
if (epitaph) // Store the epitaph in binding state.
unbind_info_ = {is_server_ ? UnboundReason::kClose : UnboundReason::kPeerClosed, *epitaph};
return;
}
}
keep_alive_ = nullptr; // No one left to access the internal reference.
// Stash data which must outlive the AsyncBinding.
auto on_unbound_fn = std::move(on_unbound_fn_);
auto* intf = interface_;
auto* dispatcher = dispatcher_;
UnboundReason reason = UnboundReason::kUnbind;
if (epitaph) {
// For a client binding, epitaph is only non-null when the epitaph message is received. As this
// function will have been invoked from the message handler, the async_cancel_wait() above will
// necessarily fail. As such, this code should only be executed on a server binding.
ZX_ASSERT(is_server_);
// TODO(madhaviyengar): Once Transaction::Reply() returns a status instead of invoking Close(),
// reason should only ever be UnboundReason::kClose.
reason = *epitaph == ZX_ERR_PEER_CLOSED ? UnboundReason::kPeerClosed : UnboundReason::kClose;
}
// Wait for deletion and take the channel. This will only wait on internal code which will not
// block indefinitely.
auto channel = WaitForDelete(std::move(binding));
// If required, send the epitaph. UnboundReason::kClose is passed to the channel unbound hook
// indicating that the epitaph was sent as well as the return status of the send.
if (channel && reason == UnboundReason::kClose)
*epitaph = fidl_epitaph_write(channel.get(), *epitaph);
if (!on_unbound_fn)
return; // channel goes out of scope here and gets closed.
// Send the error handler as part of a new task on the dispatcher. This avoids nesting user code
// in the same thread context which could cause deadlock.
auto task = new UnboundTask{
.task = {{ASYNC_STATE_INIT}, &AsyncBinding::OnUnboundTask, async_now(dispatcher)},
.on_unbound_fn = std::move(on_unbound_fn),
.intf = intf,
.channel = std::move(channel),
.status = epitaph ? *epitaph : ZX_OK,
.reason = reason};
ZX_ASSERT(async_post_task(dispatcher, &task->task) == ZX_OK);
}
zx::channel AsyncBinding::WaitForDelete(std::shared_ptr<AsyncBinding>&& calling_ref) {
sync_completion_t on_delete;
calling_ref->on_delete_ = &on_delete;
zx::channel channel;
calling_ref->out_channel_ = &channel;
calling_ref.reset();
ZX_ASSERT(sync_completion_wait(&on_delete, ZX_TIME_INFINITE) == ZX_OK);
return channel;
}
void AsyncBinding::OnDispatchError(zx_status_t error) {
ZX_ASSERT(error != ZX_OK);
if (error == ZX_ERR_CANCELED) {
OnUnbind(ZX_OK, UnboundReason::kUnbind);
return;
}
OnUnbind(error, UnboundReason::kInternalError);
}
std::shared_ptr<AsyncBinding> AsyncBinding::CreateServerBinding(
async_dispatcher_t* dispatcher, zx::channel channel, void* impl,
TypeErasedServerDispatchFn dispatch_fn, TypeErasedOnUnboundFn on_unbound_fn) {
auto ret = std::shared_ptr<AsyncBinding>(new AsyncBinding(
dispatcher, std::move(channel), impl, true, std::move(on_unbound_fn),
[dispatch_fn](std::shared_ptr<AsyncBinding>& binding, fidl_msg_t* msg,
bool* binding_released, zx_status_t* status) {
auto hdr = reinterpret_cast<fidl_message_header_t*>(msg->bytes);
AsyncTransaction txn(hdr->txid, dispatch_fn, binding_released, status);
txn.Dispatch(std::move(binding), msg);
}));
ret->keep_alive_ = ret; // We keep the binding alive until somebody decides to close the channel.
return ret;
}
std::shared_ptr<AsyncBinding> AsyncBinding::CreateClientBinding(
async_dispatcher_t* dispatcher, zx::channel channel, TypeErasedOnUnboundFn on_unbound_fn) {
auto ret = std::shared_ptr<AsyncBinding>(new AsyncBinding(
dispatcher, std::move(channel), nullptr, false, std::move(on_unbound_fn), nullptr));
ret->keep_alive_ = ret; // Keep the binding alive until an unbind operation or channel error.
return ret;
}
} // namespace internal
} // namespace fidl