| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "socket_channel_relay.h" |
| |
| #include <utility> |
| |
| #include <lib/async/default.h> |
| #include <zircon/assert.h> |
| |
| #include "garnet/drivers/bluetooth/lib/common/byte_buffer.h" |
| #include "garnet/drivers/bluetooth/lib/common/log.h" |
| |
| namespace btlib::data::internal { |
| |
// Constructs a relay that shuttles data between |socket| and |channel| on the
// calling thread's default async dispatcher. |deactivation_cb|, if non-null,
// is invoked when the relay tears itself down (see
// DeactivateAndRequestDestruction()). The relay starts in the kActivating
// state; Activate() must be called before any data flows.
template <typename ChannelT, typename RxDataT>
SocketChannelRelay<ChannelT, RxDataT>::SocketChannelRelay(
    zx::socket socket, fbl::RefPtr<ChannelT> channel,
    DeactivationCallback deactivation_cb)
    : state_(RelayState::kActivating),
      socket_(std::move(socket)),
      channel_(channel),
      dispatcher_(async_get_default_dispatcher()),
      deactivation_cb_(std::move(deactivation_cb)),
      weak_ptr_factory_(this) {
  ZX_DEBUG_ASSERT(dispatcher_);
  ZX_DEBUG_ASSERT(socket_);
  ZX_DEBUG_ASSERT(channel_);

  // Note: binding |this| is safe, as BindWait() wraps the bound method inside
  // of a lambda which verifies that |this| hasn't been destroyed.
  BindWait(ZX_SOCKET_READABLE, "socket read waiter", &sock_read_waiter_,
           fit::bind_member(this, &SocketChannelRelay::OnSocketReadable));
  BindWait(ZX_SOCKET_WRITE_THRESHOLD, "socket write waiter",
           &sock_write_waiter_,
           fit::bind_member(this, &SocketChannelRelay::OnSocketWritable));
  BindWait(ZX_SOCKET_PEER_CLOSED, "socket close waiter", &sock_close_waiter_,
           fit::bind_member(this, &SocketChannelRelay::OnSocketClosed));
}
| |
| template <typename ChannelT, typename RxDataT> |
| SocketChannelRelay<ChannelT, RxDataT>::~SocketChannelRelay() { |
| ZX_DEBUG_ASSERT(thread_checker_.IsCreationThreadCurrent()); |
| |
| if (state_ != RelayState::kDeactivated) { |
| bt_log(DEBUG, "l2cap", |
| "Deactivating relay for channel %u in dtor; will require Channel's " |
| "mutex", |
| channel_->id()); |
| Deactivate(); |
| } |
| } |
| |
// Starts the relay: arms the socket-close and socket-read waits, then
// activates |channel_| with data-received and closure callbacks. Returns
// false if any wait fails to arm or channel activation fails (e.g. the
// dispatcher is shutting down); on success, transitions state_ to kActivated.
template <typename ChannelT, typename RxDataT>
bool SocketChannelRelay<ChannelT, RxDataT>::Activate() {
  ZX_DEBUG_ASSERT(state_ == RelayState::kActivating);

  // Note: we assume that BeginWait() does not synchronously dispatch any
  // events. The wait handler will assert otherwise.
  if (!BeginWait("socket close waiter", &sock_close_waiter_)) {
    // Perhaps |dispatcher| is already stopped.
    return false;
  }

  if (!BeginWait("socket read waiter", &sock_read_waiter_)) {
    // Perhaps |dispatcher| is already stopped.
    return false;
  }

  // Capture a weak pointer rather than |this|: the channel may invoke these
  // callbacks after the relay has been destroyed. |channel_id| is copied so
  // the log messages below remain valid even then.
  const auto self = weak_ptr_factory_.GetWeakPtr();
  const auto channel_id = channel_->id();
  const bool activate_success = channel_->Activate(
      [self, channel_id](RxDataT rx_data) {
        // Note: this lambda _may_ be invoked synchronously.
        if (self) {
          self->OnChannelDataReceived(std::move(rx_data));
        } else {
          bt_log(DEBUG, "l2cap",
                 "Ignoring data received on destroyed relay (channel_id=%#.4x)",
                 channel_id);
        }
      },
      [self, channel_id] {
        if (self) {
          self->OnChannelClosed();
        } else {
          bt_log(
              DEBUG, "l2cap",
              "Ignoring channel closure on destroyed relay (channel_id=%#.4x)",
              channel_id);
        }
      },
      dispatcher_);
  if (!activate_success) {
    return false;
  }

  // Mark activated only after everything above succeeded; the wait handlers
  // assert that they never fire while state_ is still kActivating.
  state_ = RelayState::kActivated;
  return true;
}
| |
// Shuts the relay down: drops any queued outbound data, deactivates the
// channel, cancels all waits, and closes the socket. Transitions state_
// through kDeactivating (so callbacks that land mid-teardown are ignored)
// to kDeactivated. Must not be called on an already-deactivated relay.
template <typename ChannelT, typename RxDataT>
void SocketChannelRelay<ChannelT, RxDataT>::Deactivate() {
  ZX_DEBUG_ASSERT(state_ != RelayState::kDeactivated);

  state_ = RelayState::kDeactivating;
  if (!socket_write_queue_.empty()) {
    bt_log(TRACE, "l2cap",
           "Dropping %zu packets from channel %u due to channel closure",
           socket_write_queue_.size(), channel_->id());
    socket_write_queue_.clear();
  }
  channel_->Deactivate();

  // We assume that UnbindAndCancelWait() will not trigger a re-entrant call
  // into Deactivate(). And the RelayIsDestroyedWhenDispatcherIsShutDown test
  // verifies that to be the case. (If we had re-entrant calls, a
  // ZX_DEBUG_ASSERT() in the lambda bound by BindWait() would cause an abort.)
  UnbindAndCancelWait(&sock_read_waiter_);
  UnbindAndCancelWait(&sock_write_waiter_);
  UnbindAndCancelWait(&sock_close_waiter_);
  socket_.reset();

  // Any further callbacks are bugs. Update state_, to help us detect
  // those bugs.
  state_ = RelayState::kDeactivated;
}
| |
| template <typename ChannelT, typename RxDataT> |
| void SocketChannelRelay<ChannelT, RxDataT>::DeactivateAndRequestDestruction() { |
| Deactivate(); |
| if (deactivation_cb_) { |
| // NOTE: deactivation_cb_ is expected to destroy |this|. Since |this| |
| // owns deactivation_cb_, we move() deactivation_cb_ outside of |this| |
| // before invoking the callback. |
| auto moved_deactivation_cb = std::move(deactivation_cb_); |
| moved_deactivation_cb(); |
| } |
| } |
| |
| template <typename ChannelT, typename RxDataT> |
| void SocketChannelRelay<ChannelT, RxDataT>::OnSocketReadable( |
| zx_status_t status) { |
| ZX_DEBUG_ASSERT(state_ == RelayState::kActivated); |
| if (!CopyFromSocketToChannel() || |
| !BeginWait("socket read waiter", &sock_read_waiter_)) { |
| DeactivateAndRequestDestruction(); |
| } |
| } |
| |
// Wait handler for ZX_SOCKET_WRITE_THRESHOLD: the socket now has room for at
// least the first queued datagram (the threshold set by
// ServiceSocketWriteQueue()), so resume draining the write queue.
template <typename ChannelT, typename RxDataT>
void SocketChannelRelay<ChannelT, RxDataT>::OnSocketWritable(
    zx_status_t status) {
  ZX_DEBUG_ASSERT(state_ == RelayState::kActivated);
  // This wait is only armed while data is queued, so the queue must be
  // non-empty here.
  ZX_DEBUG_ASSERT(!socket_write_queue_.empty());
  ServiceSocketWriteQueue();
}
| |
// Wait handler for ZX_SOCKET_PEER_CLOSED: the socket's peer went away, so the
// relay has no further purpose; tear down and request destruction.
template <typename ChannelT, typename RxDataT>
void SocketChannelRelay<ChannelT, RxDataT>::OnSocketClosed(zx_status_t status) {
  ZX_DEBUG_ASSERT(state_ == RelayState::kActivated);
  DeactivateAndRequestDestruction();
}
| |
// Channel callback: queues |rx_data| for delivery to the socket and services
// the queue. Data arriving while the relay is deactivating is dropped (the
// socket is going away anyway).
template <typename ChannelT, typename RxDataT>
void SocketChannelRelay<ChannelT, RxDataT>::OnChannelDataReceived(
    RxDataT rx_data) {
  ZX_DEBUG_ASSERT(thread_checker_.IsCreationThreadCurrent());
  // Note: kActivating is deliberately permitted, as ChannelImpl::Activate()
  // will synchronously deliver any queued frames.
  ZX_DEBUG_ASSERT(state_ != RelayState::kDeactivated);

  if (state_ == RelayState::kDeactivating) {
    bt_log(TRACE, "l2cap",
           "Ignorning %s on socket for channel %u while deactivating", __func__,
           channel_->id());
    return;
  }

  socket_write_queue_.push_back(std::move(rx_data));
  ServiceSocketWriteQueue();
}
| |
// Channel callback: the channel closed underneath us. Makes a best-effort
// attempt to flush any queued data to the socket, then tears the relay down.
template <typename ChannelT, typename RxDataT>
void SocketChannelRelay<ChannelT, RxDataT>::OnChannelClosed() {
  ZX_DEBUG_ASSERT(thread_checker_.IsCreationThreadCurrent());
  ZX_DEBUG_ASSERT(state_ != RelayState::kActivating);
  ZX_DEBUG_ASSERT(state_ != RelayState::kDeactivated);

  // A closure notification can land while Deactivate() is already in
  // progress; in that case there is nothing more to do.
  if (state_ == RelayState::kDeactivating) {
    bt_log(TRACE, "l2cap",
           "Ignorning %s on socket for channel %u while deactivating", __func__,
           channel_->id());
    return;
  }

  ZX_DEBUG_ASSERT(state_ == RelayState::kActivated);
  if (!socket_write_queue_.empty()) {
    ServiceSocketWriteQueue();
  }
  DeactivateAndRequestDestruction();
}
| |
// Reads datagrams from the socket and forwards each to the channel, until the
// socket is drained (ZX_ERR_SHOULD_WAIT). Returns true if the relay should
// keep running, false on a fatal condition: the peer closed the socket, or a
// datagram exceeded the channel's TX MTU. Note that a failed channel Send()
// is logged but NOT treated as fatal.
template <typename ChannelT, typename RxDataT>
bool SocketChannelRelay<ChannelT, RxDataT>::CopyFromSocketToChannel() {
  // Subtle: we make the read buffer larger than the TX MTU, so that we can
  // detect truncated datagrams.
  const size_t read_buf_size = channel_->tx_mtu() + 1;

  // TODO(NET-1390): Consider yielding occasionally. As-is, we run the risk of
  // starving other SocketChannelRelays on the same |dispatcher| (and anyone
  // else on |dispatcher|), if a misbehaving process spams its zx::socket. And
  // even if starvation isn't an issue, latency/jitter might be.
  zx_status_t read_res;
  // NOTE(review): this is a variable-length array, a compiler extension in
  // C++ (tx_mtu() is not a constant). Stack usage scales with the MTU —
  // presumably bounded by the L2CAP MTU limits; confirm before reusing this
  // pattern with larger MTUs.
  uint8_t read_buf[read_buf_size];
  do {
    size_t n_bytes_read = 0;
    read_res = socket_.read(0, read_buf, read_buf_size, &n_bytes_read);
    ZX_DEBUG_ASSERT_MSG(read_res == ZX_OK || read_res == ZX_ERR_SHOULD_WAIT ||
                            read_res == ZX_ERR_PEER_CLOSED,
                        "%s", zx_status_get_string(read_res));
    ZX_DEBUG_ASSERT_MSG(n_bytes_read <= read_buf_size,
                        "(n_bytes_read=%zu, read_buf_size=%zu)", n_bytes_read,
                        read_buf_size);
    // Socket is drained; wait for more data.
    if (read_res == ZX_ERR_SHOULD_WAIT) {
      bt_log(DEBUG, "l2cap", "Failed to read from socket for channel %u: %s",
             channel_->id(), zx_status_get_string(read_res));
      return true;
    }

    if (read_res == ZX_ERR_PEER_CLOSED) {
      bt_log(DEBUG, "l2cap", "Failed to read from socket for channel %u: %s",
             channel_->id(), zx_status_get_string(read_res));
      return false;
    }

    ZX_DEBUG_ASSERT(n_bytes_read > 0);
    // A read that filled the extra byte means the datagram was larger than
    // the TX MTU; treat that as fatal rather than forwarding truncated data.
    if (n_bytes_read > channel_->tx_mtu()) {
      return false;
    }

    // TODO(NET-1391): For low latency and low jitter, IWBN to avoid allocating
    // dynamic memory on every read.
    bool write_success =
        channel_->Send(std::make_unique<common::DynamicByteBuffer>(
            common::BufferView(read_buf, n_bytes_read)));
    if (!write_success) {
      bt_log(DEBUG, "l2cap", "Failed to write %zu bytes to channel %u",
             n_bytes_read, channel_->id());
    }
  } while (read_res == ZX_OK);

  return true;
}
| |
| template <typename ChannelT, typename RxDataT> |
| void SocketChannelRelay<ChannelT, RxDataT>::ServiceSocketWriteQueue() { |
| // TODO(NET-1477): Similarly to CopyFromSocketToChannel(), we may want to |
| // consider yielding occasionally. The data-rate from the Channel into the |
| // socket write queue should be bounded by PHY layer data rates, which are |
| // much lower than the CPU's data processing throughput, so starvation |
| // shouldn't be an issue. However, latency might be. |
| zx_status_t write_res; |
| do { |
| ZX_DEBUG_ASSERT(!socket_write_queue_.empty()); |
| ZX_DEBUG_ASSERT(ValidateRxData(socket_write_queue_.front())); |
| ZX_DEBUG_ASSERT(GetRxDataLen(socket_write_queue_.front())); |
| |
| const RxDataT& rx_data = socket_write_queue_.front(); |
| const bool copy_success = InvokeWithRxData( |
| [&](const common::ByteBuffer& rx_bytes) { |
| size_t n_bytes_written = 0; |
| write_res = socket_.write(0, rx_bytes.data(), rx_bytes.size(), |
| &n_bytes_written); |
| ZX_DEBUG_ASSERT_MSG(write_res == ZX_OK || |
| write_res == ZX_ERR_SHOULD_WAIT || |
| write_res == ZX_ERR_PEER_CLOSED, |
| "%s", zx_status_get_string(write_res)); |
| if (write_res != ZX_OK) { |
| ZX_DEBUG_ASSERT(n_bytes_written == 0); |
| bt_log(SPEW, "l2cap", |
| "Failed to write %zu bytes to socket for channel %u: %s", |
| rx_bytes.size(), channel_->id(), |
| zx_status_get_string(write_res)); |
| return; |
| } |
| ZX_DEBUG_ASSERT_MSG(n_bytes_written == rx_bytes.size(), |
| "(n_bytes_written=%zu, rx_bytes.size()=%zu)", |
| n_bytes_written, rx_bytes.size()); |
| socket_write_queue_.pop_front(); |
| }, |
| rx_data); |
| ZX_DEBUG_ASSERT(copy_success); |
| } while (write_res == ZX_OK && !socket_write_queue_.empty()); |
| |
| if (!socket_write_queue_.empty() && write_res == ZX_ERR_SHOULD_WAIT) { |
| // Since we hava data to write, we want to be woken when the socket has free |
| // space in its buffer. And, to avoid spinning, we want to be woken only |
| // when the free space is large enough for our first pending RxDataT. |
| // |
| // Note: it is safe to leave TX_THRESHOLD set, even when our queue is empty, |
| // because we will only be woken if we also have an active Wait for |
| // ZX_SOCKET_WRITE_THRESHOLD, and Waits are one-shot. |
| const size_t rx_data_len = GetRxDataLen(socket_write_queue_.front()); |
| const auto prop_set_res = socket_.set_property( |
| ZX_PROP_SOCKET_TX_THRESHOLD, &rx_data_len, sizeof(rx_data_len)); |
| switch (prop_set_res) { |
| case ZX_OK: |
| if (!BeginWait("socket write waiter", &sock_write_waiter_)) { |
| DeactivateAndRequestDestruction(); |
| } |
| break; |
| case ZX_ERR_PEER_CLOSED: |
| // Peer closed the socket after the while loop above. Nothing to do |
| // here, as closure event will be handled by OnSocketClosed(). |
| break; |
| default: |
| ZX_PANIC("Unexpected zx_object_set_property() result: %s", |
| zx_status_get_string(prop_set_res)); |
| break; |
| } |
| } |
| } |
| |
// Configures |wait| to watch |socket_| for |trigger|, routing dispatches to
// |handler| via a guard lambda. The guard asserts the invariants every wait
// handler relies on (correct dispatcher/wait/thread/state), tears the relay
// down if the wait was canceled by dispatcher shutdown, and swallows events
// that land while the relay is deactivating. |wait_name| is used only for
// log/assert messages. Does not arm the wait; see BeginWait().
template <typename ChannelT, typename RxDataT>
void SocketChannelRelay<ChannelT, RxDataT>::BindWait(
    zx_signals_t trigger, const char* wait_name, async::Wait* wait,
    fit::function<void(zx_status_t)> handler) {
  wait->set_object(socket_.get());
  wait->set_trigger(trigger);
  // |channel_id| is captured by value so the messages below stay valid even
  // if |this| is gone; |self| is a weak pointer for the same reason.
  wait->set_handler([self = weak_ptr_factory_.GetWeakPtr(),
                     channel_id = channel_->id(), wait_name,
                     expected_wait = wait, handler = std::move(handler)](
                        async_dispatcher_t* actual_dispatcher,
                        async::WaitBase* actual_wait, zx_status_t status,
                        const zx_packet_signal_t* signal) {
    ZX_DEBUG_ASSERT_MSG(self, "(%s, channel_id=%u)", wait_name, channel_id);
    ZX_DEBUG_ASSERT_MSG(actual_dispatcher == self->dispatcher_,
                        "(%s, channel_id=%u)", wait_name, channel_id);
    ZX_DEBUG_ASSERT_MSG(actual_wait == expected_wait, "(%s, channel_id=%u)",
                        wait_name, channel_id);
    ZX_DEBUG_ASSERT_MSG(status == ZX_OK || status == ZX_ERR_CANCELED,
                        "(%s, channel_id=%u)", wait_name, channel_id);

    if (status == ZX_ERR_CANCELED) {  // Dispatcher is shutting down.
      bt_log(TRACE, "l2cap", "%s canceled on socket for channel %u", wait_name,
             channel_id);
      self->DeactivateAndRequestDestruction();
      return;
    }

    ZX_DEBUG_ASSERT_MSG(signal, "(%s, channel_id=%u)", wait_name, channel_id);
    ZX_DEBUG_ASSERT_MSG(signal->trigger == expected_wait->trigger(),
                        "(%s, channel_id=%u)", wait_name, channel_id);
    ZX_DEBUG_ASSERT_MSG(self->thread_checker_.IsCreationThreadCurrent(),
                        "(%s, channel_id=%u)", wait_name, channel_id);
    ZX_DEBUG_ASSERT_MSG(self->state_ != RelayState::kActivating,
                        "(%s, channel_id=%u)", wait_name, channel_id);
    ZX_DEBUG_ASSERT_MSG(self->state_ != RelayState::kDeactivated,
                        "(%s, channel_id=%u)", wait_name, channel_id);

    if (self->state_ == RelayState::kDeactivating) {
      bt_log(TRACE, "l2cap",
             "Ignorning %s on socket for channel %u while deactivating",
             wait_name, channel_id);
      return;
    }
    handler(status);
  });
}
| |
| template <typename ChannelT, typename RxDataT> |
| bool SocketChannelRelay<ChannelT, RxDataT>::BeginWait(const char* wait_name, |
| async::Wait* wait) { |
| ZX_DEBUG_ASSERT(state_ != RelayState::kDeactivating); |
| ZX_DEBUG_ASSERT(state_ != RelayState::kDeactivated); |
| |
| if (wait->is_pending()) { |
| return true; |
| } |
| |
| zx_status_t wait_res = wait->Begin(dispatcher_); |
| ZX_DEBUG_ASSERT(wait_res == ZX_OK || wait_res == ZX_ERR_BAD_STATE); |
| |
| if (wait_res != ZX_OK) { |
| bt_log(ERROR, "l2cap", "Failed to enable waiting on %s: ", wait_name, |
| zx_status_get_string(wait_res)); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| template <typename ChannelT, typename RxDataT> |
| void SocketChannelRelay<ChannelT, RxDataT>::UnbindAndCancelWait( |
| async::Wait* wait) { |
| ZX_DEBUG_ASSERT(state_ != RelayState::kActivating); |
| ZX_DEBUG_ASSERT(state_ != RelayState::kDeactivated); |
| zx_status_t cancel_res; |
| wait->set_handler(nullptr); |
| cancel_res = wait->Cancel(); |
| ZX_DEBUG_ASSERT_MSG(cancel_res == ZX_OK || cancel_res == ZX_ERR_NOT_FOUND, |
| "Cancel failed: %s", zx_status_get_string(cancel_res)); |
| } |
| |
| } // namespace btlib::data::internal |