blob: df707c38edc28d2c71197c865729b24d0f537d44 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "rx_queue.h"
#include <zircon/assert.h>
#include "device_interface.h"
#include "log.h"
namespace network::internal {
namespace {
constexpr uint64_t kTriggerRxKey = 1;
constexpr uint64_t kSessionSwitchKey = 2;
constexpr uint64_t kFifoWatchKey = 3;
constexpr uint64_t kQuitWatchKey = 4;
} // namespace
RxQueue::~RxQueue() {
// We tie rx_watch_port_ to the lifetime of the watch thread, and dispose of it alongside the
// thread in `RxQueue::JoinThread`. This assertion protects us from destruction paths where
// `RxQueue::JoinThread` is not called.
"RxQueue destroyed without disposing of port and thread first.");
zx_status_t RxQueue::Create(DeviceInterface* parent, std::unique_ptr<RxQueue>* out) {
fbl::AllocChecker ac;
std::unique_ptr<RxQueue> queue(new (&ac) RxQueue(parent));
if (!ac.check()) {
fbl::AutoLock lock(&queue->lock_);
// The RxQueue's capacity is the device's FIFO rx depth as opposed to the hardware's queue depth
// so we can (possibly) reduce the amount of reads on the rx fifo during rx interrupts.
auto capacity = parent->rx_fifo_depth();
zx_status_t status;
if ((status = IndexedSlab<InFlightBuffer>::Create(capacity, &queue->in_flight_)) != ZX_OK) {
return status;
if ((status = RingQueue<uint32_t>::Create(capacity, &queue->available_queue_)) != ZX_OK) {
return status;
auto device_depth = parent->info().rx_depth;
queue->space_buffers_.reset(new (&ac) rx_space_buffer_t[device_depth]);
if (!ac.check()) {
queue->buffer_parts_.reset(new (&ac) BufferParts[device_depth]);
if (!ac.check()) {
for (uint32_t i = 0; i < device_depth; i++) {
queue->space_buffers_[i].data.parts_list = queue->buffer_parts_[i].data();
if ((status = zx::port::create(0, &queue->rx_watch_port_)) != ZX_OK) {
LOGF_ERROR("network-device: failed to create rx watch port: %s", zx_status_get_string(status));
return status;
thrd_t watch_thread;
if (thrd_create_with_name(
&watch_thread, [](void* ctx) { return reinterpret_cast<RxQueue*>(ctx)->WatchThread(); },
reinterpret_cast<void*>(queue.get()), "netdevice:rx_watch") != thrd_success) {
LOG_ERROR("network-device: rx queue failed to create thread");
queue->rx_watch_thread_ = watch_thread;
*out = std::move(queue);
return ZX_OK;
void RxQueue::TriggerRxWatch() {
zx_port_packet_t packet;
packet.type = ZX_PKT_TYPE_USER;
packet.key = kTriggerRxKey;
packet.status = ZX_OK;
zx_status_t status = rx_watch_port_.queue(&packet);
if (status != ZX_OK) {
LOGF_ERROR("network-device: TriggerRxWatch failed: %s", zx_status_get_string(status));
void RxQueue::TriggerSessionChanged() {
zx_port_packet_t packet;
packet.type = ZX_PKT_TYPE_USER;
packet.key = kSessionSwitchKey;
packet.status = ZX_OK;
zx_status_t status = rx_watch_port_.queue(&packet);
if (status != ZX_OK) {
LOGF_ERROR("network-device: TriggerSessionChanged failed: %s", zx_status_get_string(status));
void RxQueue::JoinThread() {
if (rx_watch_thread_.has_value()) {
zx_port_packet_t packet;
packet.type = ZX_PKT_TYPE_USER;
packet.key = kQuitWatchKey;
zx_status_t status = rx_watch_port_.queue(&packet);
if (status != ZX_OK) {
LOGF_ERROR("network-device: RxQueue::JoinThread failed to send quit key: %s",
thrd_join(*rx_watch_thread_, nullptr);
// Dispose of the port and the thread handle.
void RxQueue::PurgeSession(Session* session) {
fbl::AutoLock lock(&lock_);
// Get rid of all available buffers that belong to the session and stop its rx path.
for (auto nu = available_queue_->count(); nu > 0; nu--) {
auto b = available_queue_->Pop();
if (in_flight_->Get(b).session == session) {
} else {
// Push back to the end of the queue.
void RxQueue::Reclaim() {
for (auto i = in_flight_->begin(); i != in_flight_->end(); ++i) {
auto& buff = in_flight_->Get(*i);
// reclaim the buffer if the device has it
if (buff.flags & kDeviceHasBuffer) {
void RxQueue::ReclaimBuffer(uint32_t id) {
auto& buff = in_flight_->Get(id);
if (buff.session->RxReturned()) {
buff.flags &= static_cast<uint16_t>(~kDeviceHasBuffer);
} else {
std::tuple<RxQueue::InFlightBuffer*, uint32_t> RxQueue::GetBuffer() {
if (available_queue_->count() != 0) {
auto idx = available_queue_->Pop();
return std::make_tuple(&in_flight_->Get(idx), idx);
// need to fetch more from the session
if (in_flight_->available() == 0) {
// no more space to keep in flight buffers
LOG_ERROR("network-device: can't fit more in-flight buffers");
return std::make_tuple(nullptr, 0);
SessionTransaction transaction(this);
if (parent_->LoadRxDescriptors(&transaction) != ZX_OK) {
// failed to load descriptors from session.
return std::make_tuple(nullptr, 0);
// LoadRxDescriptors can't return OK if it couldn't load any descriptors.
auto idx = available_queue_->Pop();
return std::make_tuple(&in_flight_->Get(idx), idx);
zx_status_t RxQueue::PrepareBuff(rx_space_buffer_t* buff) {
auto res = GetBuffer();
auto session_buffer = std::get<0>(res);
auto index = std::get<1>(res);
if (session_buffer == nullptr) {
zx_status_t status;
buff->id = index;
if ((status = session_buffer->session->FillRxSpace(session_buffer->descriptor_index, buff)) !=
ZX_OK) {
// if the session can't fill Rx for any reason, kill it.
// put the index back at the end of the available queue
return status;
// mark that this buffer belongs to implementation.
session_buffer->flags |= kDeviceHasBuffer;
return ZX_OK;
void RxQueue::CompleteRxList(const rx_buffer_t* rx, size_t count) {
fbl::AutoLock lock(&lock_);
device_buffer_count_ -= count;
while (count--) {
auto& session_buffer = in_flight_->Get(rx->id);
// mark that the buffer does not belong to device anymore:
session_buffer.flags &= static_cast<uint16_t>(~kDeviceHasBuffer);
if (session_buffer.session->CompleteRx(session_buffer.descriptor_index, rx)) {
// we can reuse the descriptor
} else {
if (device_buffer_count_ <= parent_->rx_notify_threshold()) {
void RxQueue::ReturnBuffers() { parent_->CommitAllSessions(); }
int RxQueue::WatchThread() {
auto loop = [this]() -> zx_status_t {
fbl::RefPtr<RefCountedFifo> observed_fifo(nullptr);
bool waiting_on_fifo = false;
for (;;) {
zx_port_packet_t packet;
zx_status_t status;
bool fifo_readable = false;
if ((status = rx_watch_port_.wait(zx::time::infinite(), &packet)) != ZX_OK) {
LOGF_ERROR("RxQueue::WatchThread port wait failed %s", zx_status_get_string(status));
return status;
switch (packet.key) {
case kQuitWatchKey:
LOG_TRACE("RxQueue::WatchThread got quit key");
return ZX_OK;
case kSessionSwitchKey:
if (observed_fifo && waiting_on_fifo) {
status = rx_watch_port_.cancel(observed_fifo->fifo, kFifoWatchKey);
if (status != ZX_OK) {
LOGF_ERROR("RxQueue::WatchThread port cancel failed %s",
return status;
observed_fifo = parent_->primary_rx_fifo();
LOGF_TRACE("RxQueue primary FIFO changed, valid=%d", static_cast<bool>(observed_fifo));
case kFifoWatchKey:
if ((packet.signal.observed & ZX_FIFO_PEER_CLOSED) || packet.status != ZX_OK) {
// If observing the FIFO fails, we're assuming that the sesions is being closed. We're
// just going to dispose of our reference to the observed FIFO and wait for
// `DeviceInterface` to signal us that a new primary session is available when that
// happens.
LOGF_TRACE("RxQueue fifo closed or bad status %s", zx_status_get_string(packet.status));
} else {
fifo_readable = true;
waiting_on_fifo = false;
ZX_ASSERT_MSG(packet.key == kTriggerRxKey, "Unrecognized packet in rx queue");
size_t pushed = 0;
bool should_wait_on_fifo;
fbl::AutoLock lock(&lock_);
size_t push_count = parent_->info().rx_depth - device_buffer_count_;
if (parent_->IsDataPlaneOpen()) {
for (; pushed < push_count; pushed++) {
if (PrepareBuff(&space_buffers_[pushed]) != ZX_OK) {
if (fifo_readable && push_count == 0 && in_flight_->available()) {
SessionTransaction transaction(this);
// We only need to wait on the FIFO if we didn't get enough buffers.
// Otherwise, we'll trigger the loop again once the device calls CompleteRx.
should_wait_on_fifo = device_buffer_count_ < parent_->info().rx_depth;
// We release the main rx queue lock before calling into the parent device,
// so we don't cause a re-entrant deadlock.
// We keep the buffers_lock_ since those are tied to our lifecycle.
if (pushed != 0) {
parent_->QueueRxSpace(space_buffers_.get(), static_cast<uint32_t>(pushed));
// No point waiting in RX fifo if we filled the device buffers, we'll get a signal to wait
// on the fifo later.
if (should_wait_on_fifo) {
if (!observed_fifo) {
// This can happen if we get triggered to fetch more buffers, but the primary session is
// already tearing down, it's fine to just proceed.
LOG_TRACE("RxQueue::WatchThread Should wait but no FIFO is here");
} else if (!waiting_on_fifo) {
status = observed_fifo->fifo.wait_async(rx_watch_port_, kFifoWatchKey,
if (status == ZX_OK) {
waiting_on_fifo = true;
} else {
LOGF_ERROR("RxQueue::WatchThread wait_async failed: %s", zx_status_get_string(status));
return status;
zx_status_t status = loop();
if (status != ZX_OK) {
LOGF_ERROR("network-device: RxQueue::WatchThread finished loop with error: %s",
LOG_TRACE("network-device: watch thread done");
return 0;
uint32_t RxQueue::SessionTransaction::remaining() __TA_REQUIRES(queue_->lock_) {
// NB: __TA_REQUIRES here is just encoding that a SessionTransaction always holds a lock for
// its parent queue, the protection from misuse comes from the annotations on
// `SessionTransaction`'s constructor and destructor.
return queue_->in_flight_->available();
void RxQueue::SessionTransaction::Push(Session* session, uint16_t descriptor)
__TA_REQUIRES(queue_->lock_) {
// NB: __TA_REQUIRES here is just encoding that a SessionTransaction always holds a lock for
// its parent queue, the protection from misuse comes from the annotations on
// `SessionTransaction`'s constructor and destructor.
uint32_t idx = queue_->in_flight_->Push(InFlightBuffer(session, descriptor));
} // namespace network::internal