blob: 64508a1a099215db4f94ce983d6693be501c6304 [file] [log] [blame]
// Copyright 2013 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "lib/fidl/cpp/bindings/internal/connector.h"
#include <async/default.h>
#include <zx/time.h>
#include "lib/fxl/compiler_specific.h"
#include "lib/fxl/logging.h"
#include "lib/fxl/macros.h"
namespace fidl {
namespace internal {
namespace {
constexpr zx_signals_t kSignals = ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED;
} // namespace
// ----------------------------------------------------------------------------
Connector::Connector(zx::channel channel)
: channel_(std::move(channel)),
wait_(async_get_default(), channel_.get(), kSignals,
ASYNC_FLAG_HANDLE_SHUTDOWN),
incoming_receiver_(nullptr),
error_(false),
drop_writes_(false),
enforce_errors_from_incoming_receiver_(true),
destroyed_flag_(nullptr) {
wait_.set_handler(fbl::BindMember(this, &Connector::OnHandleReady));
// Even though we don't have an incoming receiver, we still want to monitor
// the channel to know if is closed or encounters an error.
zx_status_t status = wait_.Begin();
FXL_CHECK(status == ZX_OK);
}
Connector::~Connector() {
if (destroyed_flag_)
*destroyed_flag_ = true;
}
void Connector::CloseChannel() {
wait_.Cancel();
wait_.set_object(ZX_HANDLE_INVALID);
channel_.reset();
}
zx::channel Connector::PassChannel() {
wait_.Cancel();
wait_.set_object(ZX_HANDLE_INVALID);
return std::move(channel_);
}
bool Connector::WaitForIncomingMessage(fxl::TimeDelta timeout) {
if (error_)
return false;
zx_signals_t pending = ZX_SIGNAL_NONE;
zx_status_t rv = channel_.wait_one(kSignals,
timeout == fxl::TimeDelta::Max()
? ZX_TIME_INFINITE
: zx::deadline_after(timeout.ToNanoseconds()),
&pending);
if (rv == ZX_ERR_SHOULD_WAIT || rv == ZX_ERR_TIMED_OUT)
return false;
if (rv != ZX_OK) {
NotifyError();
return false;
}
if (pending & ZX_CHANNEL_READABLE) {
bool ok = ReadSingleMessage(&rv);
FXL_ALLOW_UNUSED_LOCAL(ok);
return (rv == ZX_OK);
}
FXL_DCHECK(pending & ZX_CHANNEL_PEER_CLOSED);
NotifyError();
return false;
}
bool Connector::Accept(Message* message) {
if (error_)
return false;
FXL_CHECK(channel_);
if (drop_writes_)
return true;
zx_status_t status = WriteMessage(channel_, message);
if (status == ZX_OK)
return true;
if (status == ZX_ERR_BAD_STATE) {
// There's no point in continuing to write to this channel since the other
// end is gone. Avoid writing any future messages. Hide write failures
// from the caller since we'd like them to continue consuming any backlog
// of incoming messages before regarding the channel as closed.
drop_writes_ = true;
return true;
}
// This particular write was rejected, presumably because of bad input.
// The channel is not necessarily in a bad state.
return false;
}
async_wait_result_t Connector::OnHandleReady(
async_t* async, zx_status_t status, const zx_packet_signal_t* signal) {
if (status != ZX_OK) {
NotifyError();
return ASYNC_WAIT_FINISHED;
}
FXL_DCHECK(!error_);
if (signal->observed & ZX_CHANNEL_READABLE) {
// Return immediately if |this| was destroyed. Do not touch any members!
zx_status_t rv;
for (uint64_t i = 0; i < signal->count; i++) {
if (!ReadSingleMessage(&rv))
return ASYNC_WAIT_FINISHED;
// If we get ZX_ERR_PEER_CLOSED (or another error), we'll already have
// notified the error and likely been destroyed.
FXL_DCHECK(rv == ZX_OK || rv == ZX_ERR_SHOULD_WAIT);
if (rv != ZX_OK)
break;
}
return channel_ ? ASYNC_WAIT_AGAIN : ASYNC_WAIT_FINISHED;
}
FXL_DCHECK(signal->observed & ZX_CHANNEL_PEER_CLOSED);
// Notice that we don't notify an error until we've drained all the messages
// out of the channel.
NotifyError();
return ASYNC_WAIT_FINISHED;
}
bool Connector::ReadSingleMessage(zx_status_t* read_result) {
bool receiver_result = false;
// Detect if |this| was destroyed during message dispatch. Allow for the
// possibility of re-entering ReadMore() through message dispatch.
bool was_destroyed_during_dispatch = false;
bool* previous_destroyed_flag = destroyed_flag_;
destroyed_flag_ = &was_destroyed_during_dispatch;
zx_status_t rv =
ReadAndDispatchMessage(channel_, incoming_receiver_, &receiver_result);
if (read_result)
*read_result = rv;
if (was_destroyed_during_dispatch) {
if (previous_destroyed_flag)
*previous_destroyed_flag = true; // Propagate flag.
return false;
}
destroyed_flag_ = previous_destroyed_flag;
if (rv == ZX_ERR_SHOULD_WAIT)
return true;
if (rv != ZX_OK ||
(enforce_errors_from_incoming_receiver_ && !receiver_result)) {
NotifyError();
return false;
}
return true;
}
void Connector::NotifyError() {
error_ = true;
CloseChannel();
if (connection_error_handler_)
connection_error_handler_();
}
} // namespace internal
} // namespace fidl