// blob: dadd7e0cad06f8baede5e22d2523cc8cdfafc703
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "rx_queue.h"
#include <lib/async/cpp/task.h>
#include <lib/fdf/cpp/env.h>
#include <zircon/assert.h>
#include "log.h"
#include "session.h"
namespace network::internal {
// Scheduler role name. `RxQueue::Create` passes it both to `fdf_env_set_thread_limit` and to the
// dispatcher builder for the rx watch dispatcher.
constexpr char kRxSchedulerRole[] = "fuchsia.devices.network.core.rx";
// Process-wide count of RxQueue instances. `RxQueue::Create` uses it as the thread limit requested
// for `kRxSchedulerRole`; `RxQueue::JoinThread` decrements it once the dispatcher is shut down.
std::atomic<uint32_t> RxQueue::num_instances_ = 0;
// Destructor. Asserts that the queue was stopped (see `RxQueue::JoinThread`) before destruction.
RxQueue::~RxQueue() {
// running_ is tied to the lifetime of the watch thread; it's cleared in `RxQueue::JoinThread`.
// This assertion protects us from destruction paths where `RxQueue::JoinThread` is not called.
ZX_ASSERT_MSG(!running_, "RxQueue destroyed without disposing of port and thread first.");
}
// Creates an RxQueue for `parent` and starts its watch task.
//
// Allocates the available-index ring queue and the in-flight slab (both sized to the parent's rx
// FIFO depth), a scratch array of rx space buffers (sized to the device's rx depth), the watch
// port, and a dedicated synchronized dispatcher onto which `WatchThread` is posted.
//
// Returns the new queue on success. On failure returns ZX_ERR_NO_MEMORY for allocation failures,
// or the status reported by the failing step (container creation, port creation, thread limit
// update, dispatcher creation). The global instance count is only left incremented on success.
zx::result<std::unique_ptr<RxQueue>> RxQueue::Create(DeviceInterface* parent) {
  fbl::AllocChecker ac;
  std::unique_ptr<RxQueue> queue(new (&ac) RxQueue(parent));
  if (!ac.check()) {
    return zx::error(ZX_ERR_NO_MEMORY);
  }
  fbl::AutoLock lock(&queue->parent_->rx_lock());
  // The RxQueue's capacity is the device's FIFO rx depth as opposed to the hardware's queue depth
  // so we can (possibly) reduce the amount of reads on the rx fifo during rx interrupts.
  auto capacity = parent->rx_fifo_depth();
  zx::result available_queue = RingQueue<uint32_t>::Create(capacity);
  if (available_queue.is_error()) {
    return available_queue.take_error();
  }
  queue->available_queue_ = std::move(available_queue.value());
  zx::result in_flight = IndexedSlab<InFlightBuffer>::Create(capacity);
  if (in_flight.is_error()) {
    return in_flight.take_error();
  }
  queue->in_flight_ = std::move(in_flight.value());
  // Scratch storage the watch task fills before handing rx space to the device.
  auto device_depth = parent->info().rx_depth().value_or(0);
  std::unique_ptr<fuchsia_hardware_network_driver::wire::RxSpaceBuffer[]> buffers(
      new (&ac) fuchsia_hardware_network_driver::wire::RxSpaceBuffer[device_depth]);
  if (!ac.check()) {
    return zx::error(ZX_ERR_NO_MEMORY);
  }
  if (zx_status_t status = zx::port::create(0, &queue->rx_watch_port_); status != ZX_OK) {
    LOGF_ERROR("failed to create rx watch port: %s", zx_status_get_string(status));
    return zx::error(status);
  }
  // Make sure the driver framework allows for the creation of the necessary threads. Keep track of
  // the number of RX queue instances globally.
  //
  // From here on, every failure return must undo this increment: `JoinThread` only decrements the
  // counter when a dispatcher exists, which is not the case on the error paths below.
  uint32_t instances = ++num_instances_;
  if (zx_status_t status =
          fdf_env_set_thread_limit(kRxSchedulerRole, strlen(kRxSchedulerRole), instances);
      status != ZX_OK && status != ZX_ERR_OUT_OF_RANGE) {
    // ZX_ERR_OUT_OF_RANGE indicates that the value is less than the current value. This can happen
    // if a number of threads have recently shut down or two RX queues are being created at the same
    // time and the loading of the atomic and setting of the limit are interleaved. It's safe to
    // ignore that in this context, the important part is that there are enough threads.
    LOGF_ERROR("failed to update thread limit: %s", zx_status_get_string(status));
    --num_instances_;
    return zx::error(status);
  }
  // In order to ensure that the async::PostTask below works this has to be a synchronized
  // dispatcher that allows synchronous calls. Any other combination of dispatcher and options could
  // lead to the async::PostTask call being inlined, meaning the task would run on the calling
  // thread, blocking it indefinitely. Use a unique owner to ensure inlining of calls from inside
  // the task. Calls to a dispatcher with the same owner might not be inlined.
  auto dispatcher = fdf_env::DispatcherBuilder::CreateSynchronizedWithOwner(
      queue.get(), fdf::SynchronizedDispatcher::Options::kAllowSyncCalls, "netdevice:rx_watch",
      [queue = queue.get()](fdf_dispatcher_t*) { queue->dispatcher_shutdown_.Signal(); },
      kRxSchedulerRole);
  if (dispatcher.is_error()) {
    LOGF_ERROR("rx queue failed to create dispatcher: %s", dispatcher.status_string());
    --num_instances_;
    return dispatcher.take_error();
  }
  queue->dispatcher_ = std::move(dispatcher.value());
  // The task takes ownership of the scratch buffers and runs the watch loop until told to quit.
  async::PostTask(queue->dispatcher_.async_dispatcher(),
                  [queue = queue.get(), rx_buffers = std::move(buffers)]() mutable {
                    queue->WatchThread(std::move(rx_buffers));
                  });
  queue->running_ = true;
  return zx::ok(std::move(queue));
}
// Wakes the watch loop by queueing a user packet keyed `kTriggerRxKey` on the watch port.
// Does nothing when the queue is not running; a failure to queue is logged and otherwise ignored.
void RxQueue::TriggerRxWatch() {
  if (!running_) {
    return;
  }
  // Zero-initialize so the unused payload bytes copied by the port are not stack garbage.
  zx_port_packet_t packet = {};
  packet.type = ZX_PKT_TYPE_USER;
  packet.key = kTriggerRxKey;
  packet.status = ZX_OK;
  if (zx_status_t status = rx_watch_port_.queue(&packet); status != ZX_OK) {
    LOGF_ERROR("TriggerRxWatch failed: %s", zx_status_get_string(status));
  }
}
// Tells the watch loop that the primary session changed by queueing a user packet keyed
// `kSessionSwitchKey` on the watch port. Does nothing when the queue is not running; a failure to
// queue is logged and otherwise ignored.
void RxQueue::TriggerSessionChanged() {
  if (!running_) {
    return;
  }
  // Zero-initialize so the unused payload bytes copied by the port are not stack garbage.
  zx_port_packet_t packet = {};
  packet.type = ZX_PKT_TYPE_USER;
  packet.key = kSessionSwitchKey;
  packet.status = ZX_OK;
  if (zx_status_t status = rx_watch_port_.queue(&packet); status != ZX_OK) {
    LOGF_ERROR("TriggerSessionChanged failed: %s", zx_status_get_string(status));
  }
}
// Stops the watch task and tears down its dispatcher. Does nothing if no dispatcher exists.
//
// Queues a `kQuitWatchKey` packet so the watch loop returns, clears `running_`, then shuts the
// dispatcher down and blocks until its shutdown callback has signalled completion. Finally drops
// this instance from the global instance count.
void RxQueue::JoinThread() {
  if (!dispatcher_.get()) {
    return;
  }
  // Fully initialize the packet: the original left `status` and the payload indeterminate.
  zx_port_packet_t packet = {};
  packet.type = ZX_PKT_TYPE_USER;
  packet.key = kQuitWatchKey;
  packet.status = ZX_OK;
  if (zx_status_t status = rx_watch_port_.queue(&packet); status != ZX_OK) {
    // NOTE(review): if the quit packet cannot be queued the watch loop is presumably still blocked
    // on the port, and the shutdown wait below may never complete - confirm.
    LOGF_ERROR("RxQueue::JoinThread failed to send quit key: %s", zx_status_get_string(status));
  }
  // Mark the queue as not running anymore.
  running_ = false;
  dispatcher_.ShutdownAsync();
  dispatcher_shutdown_.Wait();
  dispatcher_.reset();
  --num_instances_;
}
// Stops `session`'s rx path and drops every buffer in the available queue that belongs to it.
// Buffers owned by other sessions keep their relative order in the queue.
void RxQueue::PurgeSession(Session& session) {
  fbl::AutoLock lock(&parent_->rx_lock());
  session.AssertParentRxLock(*parent_);
  session.StopRx();
  // Rotate through the queue exactly once: each entry is popped from the front and either freed
  // (it belongs to `session`) or re-queued at the back.
  auto pending = available_queue_->count();
  while (pending > 0) {
    pending--;
    auto index = available_queue_->Pop();
    if (in_flight_->Get(index).session != &session) {
      available_queue_->Push(index);
      continue;
    }
    in_flight_->Free(index);
  }
}
std::tuple<RxQueue::InFlightBuffer*, uint32_t> RxQueue::GetBuffer() {
if (available_queue_->count() != 0) {
auto idx = available_queue_->Pop();
return std::make_tuple(&in_flight_->Get(idx), idx);
}
// Need to fetch more from the session.
if (in_flight_->available() == 0) {
// No more space to keep in flight buffers.
LOG_ERROR("can't fit more in-flight buffers");
return std::make_tuple(nullptr, 0);
}
RxSessionTransaction transaction(this);
switch (zx_status_t status = parent_->LoadRxDescriptors(transaction); status) {
case ZX_OK:
break;
default:
LOGF_ERROR("failed to load rx buffer descriptors: %s", zx_status_get_string(status));
__FALLTHROUGH;
case ZX_ERR_PEER_CLOSED: // Primary FIFO closed.
case ZX_ERR_SHOULD_WAIT: // No Rx buffers available in FIFO.
case ZX_ERR_BAD_STATE: // Primary session stopped or paused.
return std::make_tuple(nullptr, 0);
}
// LoadRxDescriptors can't return OK if it couldn't load any descriptors.
auto idx = available_queue_->Pop();
return std::make_tuple(&in_flight_->Get(idx), idx);
}
// Fills `buff` with rx space taken from the next available session buffer.
//
// Returns ZX_ERR_NO_RESOURCES when no buffer can be obtained. If the owning session fails to fill
// the space, the session is killed, the buffer index is re-queued, and the session's error is
// returned. On success the buffer is accounted as held by the device.
zx_status_t RxQueue::PrepareBuff(fuchsia_hardware_network_driver::wire::RxSpaceBuffer* buff) {
  const auto acquired = GetBuffer();
  InFlightBuffer* const entry = std::get<0>(acquired);
  const uint32_t slot = std::get<1>(acquired);
  if (entry == nullptr) {
    return ZX_ERR_NO_RESOURCES;
  }
  buff->id = slot;
  const zx_status_t fill_status = entry->session->FillRxSpace(entry->descriptor_index, buff);
  if (fill_status == ZX_OK) {
    entry->session->RxTaken();
    device_buffer_count_++;
    return ZX_OK;
  }
  // If the session can't fill Rx for any reason, kill it.
  entry->session->Kill();
  // Put the index back at the end of the available queue.
  available_queue_->Push(slot);
  return fill_status;
}
// Handles a batch of rx frames returned by the device.
//
// For every frame, maps each part back to its in-flight buffer and delivers the frame to the
// owning session. Frames that carry no data, or whose parts span more than one session, are
// returned to their sessions unfulfilled. Part buffers are re-queued as available when the
// session reports it is still valid and freed otherwise. Afterwards all sessions are committed
// and, if the device is running low on buffers, the watch loop is triggered to refill it.
void RxQueue::CompleteRxList(
    const fidl::VectorView<::fuchsia_hardware_network_driver::wire::RxBuffer>& rx_buffer_list) {
  fbl::AutoLock lock(&parent_->rx_lock());
  SharedAutoLock control_lock(&parent_->control_lock());
  // `device_buffer_count_` is incremented once per space buffer handed out by `PrepareBuff`, and
  // every part of a returned frame references one such space buffer. Account for the number of
  // parts, not the number of frames, so chained frames and frames without parts don't skew the
  // count of buffers still held by the device.
  size_t returned_space = 0;
  for (const auto& rx_buffer : rx_buffer_list.get()) {
    returned_space += rx_buffer.data.count();
  }
  device_buffer_count_ -= returned_space;
  for (const auto& rx_buffer : rx_buffer_list.get()) {
    ZX_ASSERT_MSG(rx_buffer.data.count() <= MAX_BUFFER_PARTS,
                  "too many buffer parts in rx buffer: %zu", rx_buffer.data.count());
    std::array<SessionRxBuffer, MAX_BUFFER_PARTS> session_parts;
    auto session_parts_iter = session_parts.begin();
    bool drop_frame = false;
    uint32_t total_length = 0;
    Session* primary_session = nullptr;
    cpp20::span rx_parts = rx_buffer.data.get();
    for (const fuchsia_hardware_network_driver::wire::RxBufferPart& rx_part : rx_parts) {
      InFlightBuffer& in_flight_buffer = in_flight_->Get(rx_part.id);
      // Check before the session pointer is dereferenced below.
      ZX_DEBUG_ASSERT(in_flight_buffer.session != nullptr);
      total_length += rx_part.length;
      *session_parts_iter++ = SessionRxBuffer{
          .descriptor = in_flight_buffer.descriptor_index,
          .offset = rx_part.offset,
          .length = rx_part.length,
      };
      if (primary_session && in_flight_buffer.session != primary_session) {
        // Received buffers from different sessions, meaning the primary session just changed and
        // the device chained things together.
        // If we don't want to drop this frame, we'd need to figure out which one is the new primary
        // session, try and allocate buffers from it and copy things.
        // That's complicated enough and this is unexpected enough that the current decision is to
        // drop the frame on the floor.
        LOGF_WARN(
            "dropping chained frame with %zu buffers spanning different sessions: "
            "%s, %s",
            rx_buffer.data.count(), primary_session->name(), in_flight_buffer.session->name());
        drop_frame = true;
      }
      primary_session = in_flight_buffer.session;
    }
    if (!primary_session) {
      // Buffer contained no parts.
      LOG_WARN("attempted to return an rx buffer with no parts");
      continue;
    }
    // Drop any frames containing no data or where inconsistencies were found above.
    if (total_length == 0 || drop_frame) {
      for (const fuchsia_hardware_network_driver::wire::RxBufferPart& rx_part : rx_parts) {
        InFlightBuffer& in_flight_buffer = in_flight_->Get(rx_part.id);
        in_flight_buffer.session->AssertParentRxLock(*parent_);
        if (in_flight_buffer.session->CompleteUnfulfilledRx()) {
          // Make buffer available again for reuse if session is still valid.
          available_queue_->Push(rx_part.id);
        } else {
          // Free it otherwise.
          in_flight_->Free(rx_part.id);
        }
      }
      continue;
    }
    primary_session->AssertParentControlLockShared(*parent_);
    parent_->NotifyPortRxFrame(rx_buffer.meta.port, total_length);
    const RxFrameInfo frame_info = {
        .meta = rx_buffer.meta,
        .port_id_salt = parent_->GetPortSalt(rx_buffer.meta.port),
        .buffers = cpp20::span(session_parts.begin(), session_parts_iter),
        .total_length = total_length,
    };
    primary_session->AssertParentRxLock(*parent_);
    // The session's answer decides whether the part buffers are recycled or released.
    if (primary_session->CompleteRx(frame_info)) {
      for (const fuchsia_hardware_network_driver::wire::RxBufferPart& rx_part : rx_parts) {
        available_queue_->Push(rx_part.id);
      }
    } else {
      for (const fuchsia_hardware_network_driver::wire::RxBufferPart& rx_part : rx_parts) {
        in_flight_->Free(rx_part.id);
      }
    }
  }
  parent_->CommitAllSessions();
  // Ask the watch loop for a refill once the device holds few enough buffers.
  if (device_buffer_count_ <= parent_->rx_notify_threshold()) {
    TriggerRxWatch();
  }
}
// Body of the rx watch task posted by `RxQueue::Create`.
//
// Blocks on `rx_watch_port_` and reacts to four packet keys:
//  - kQuitWatchKey: leave the loop.
//  - kSessionSwitchKey: stop observing the old primary rx FIFO and pick up the new one.
//  - kFifoWatchKey: the observed FIFO became readable, or was closed / failed.
//  - kTriggerRxKey: plain wake-up to try to hand more rx space to the device.
// After handling the packet it prepares as many rx space buffers as the device can still take,
// sends them to the parent in batches, and re-arms the FIFO wait when the device is not full.
//
// `space_buffers` is scratch storage for the buffers prepared in one iteration. Always returns 0;
// a loop error is only logged.
int RxQueue::WatchThread(
std::unique_ptr<fuchsia_hardware_network_driver::wire::RxSpaceBuffer[]> space_buffers) {
auto loop = [this, space_buffers = std::move(space_buffers)]() -> zx_status_t {
// FIFO of the current primary session, if any, and whether an async wait on it is pending.
fbl::RefPtr<RefCountedFifo> observed_fifo(nullptr);
bool waiting_on_fifo = false;
for (;;) {
zx_port_packet_t packet;
bool fifo_readable = false;
if (zx_status_t status = rx_watch_port_.wait(zx::time::infinite(), &packet);
status != ZX_OK) {
LOGF_ERROR("RxQueue::WatchThread port wait failed %s", zx_status_get_string(status));
return status;
}
parent_->NotifyRxQueuePacket(packet.key);
switch (packet.key) {
case kQuitWatchKey:
LOG_TRACE("RxQueue::WatchThread got quit key");
return ZX_OK;
case kSessionSwitchKey:
// Cancel a pending wait on the old FIFO before switching to the new primary FIFO.
if (observed_fifo && waiting_on_fifo) {
if (zx_status_t status = rx_watch_port_.cancel(observed_fifo->fifo, kFifoWatchKey);
status != ZX_OK) {
LOGF_ERROR("RxQueue::WatchThread port cancel failed %s",
zx_status_get_string(status));
return status;
}
waiting_on_fifo = false;
}
observed_fifo = parent_->primary_rx_fifo();
LOGF_TRACE("RxQueue primary FIFO changed, valid=%d", static_cast<bool>(observed_fifo));
break;
case kFifoWatchKey:
if ((packet.signal.observed & ZX_FIFO_PEER_CLOSED) || packet.status != ZX_OK) {
// If observing the FIFO fails, we're assuming that the session is being closed. We're
// just going to dispose of our reference to the observed FIFO and wait for
// `DeviceInterface` to signal us that a new primary session is available when that
// happens.
observed_fifo.reset();
LOGF_TRACE("RxQueue fifo closed or bad status %s", zx_status_get_string(packet.status));
} else {
fifo_readable = true;
}
// The async wait has fired, so it has to be re-armed below if still needed.
waiting_on_fifo = false;
break;
default:
ZX_ASSERT_MSG(packet.key == kTriggerRxKey, "Unrecognized packet in rx queue");
break;
}
size_t pushed = 0;
bool should_wait_on_fifo;
fbl::AutoLock rx_lock(&parent_->rx_lock());
SharedAutoLock control_lock(&parent_->control_lock());
const uint16_t rx_depth = parent_->info().rx_depth().value_or(0);
// Number of buffers the device can still take.
// NOTE(review): this subtraction assumes device_buffer_count_ never exceeds rx_depth; if it
// did, push_count would wrap and the loop below could index past space_buffers - confirm.
size_t push_count = rx_depth - device_buffer_count_;
if (parent_->IsDataPlaneOpen()) {
// Prepare buffers until the device is full or no more session buffers can be obtained.
for (; pushed < push_count; pushed++) {
if (PrepareBuff(&space_buffers[pushed]) != ZX_OK) {
break;
}
}
}
// The device is already full but the FIFO is readable: pull descriptors into the available
// queue anyway while there is in-flight room. The result is intentionally not checked.
if (fifo_readable && push_count == 0 && in_flight_->available()) {
RxSessionTransaction transaction(this);
parent_->LoadRxDescriptors(transaction);
}
// We only need to wait on the FIFO if we didn't get enough buffers.
// Otherwise, we'll trigger the loop again once the device calls CompleteRx.
//
// Similarly, we should not wait on the FIFO if the device has not started yet.
should_wait_on_fifo = device_buffer_count_ < rx_depth && parent_->IsDataPlaneOpen();
// We release the main rx queue and control locks before calling into the parent device so we
// don't cause a re-entrant deadlock.
rx_lock.release();
control_lock.release();
if (pushed != 0) {
// Send buffers in batches of at most |MAX_RX_SPACE_BUFFERS| at a time to stay within the
// FIDL channel maximum.
netdriver::wire::RxSpaceBuffer* buffers = space_buffers.get();
while (pushed > 0) {
const uint32_t batch = std::min(static_cast<uint32_t>(pushed), MAX_RX_SPACE_BUFFERS);
parent_->QueueRxSpace(cpp20::span(buffers, static_cast<uint32_t>(batch)));
buffers += batch;
pushed -= batch;
}
}
// No point waiting in RX fifo if we filled the device buffers, we'll get a signal to wait
// on the fifo later.
if (should_wait_on_fifo) {
if (!observed_fifo) {
// This can happen if we get triggered to fetch more buffers, but the primary session is
// already tearing down, it's fine to just proceed.
LOG_TRACE("RxQueue::WatchThread Should wait but no FIFO is here");
} else if (!waiting_on_fifo) {
// Arm a wait for readable/peer-closed; skipped when one is already pending.
zx_status_t status = observed_fifo->fifo.wait_async(
rx_watch_port_, kFifoWatchKey, ZX_FIFO_READABLE | ZX_FIFO_PEER_CLOSED, 0);
if (status == ZX_OK) {
waiting_on_fifo = true;
} else {
LOGF_ERROR("RxQueue::WatchThread wait_async failed: %s", zx_status_get_string(status));
return status;
}
}
}
}
};
zx_status_t status = loop();
if (status != ZX_OK) {
LOGF_ERROR("RxQueue::WatchThread finished loop with error: %s", zx_status_get_string(status));
}
LOG_TRACE("watch thread done");
return 0;
}
// Reports how many more in-flight buffers the parent queue can accept.
uint32_t RxQueue::SessionTransaction::remaining() __TA_REQUIRES(queue_->parent_->rx_lock()) {
  // NB: the __TA_REQUIRES annotation only records that a SessionTransaction always holds the lock
  // of its parent queue; misuse is prevented by the annotations on `SessionTransaction`'s
  // constructor and destructor.
  auto& slab = queue_->in_flight_;
  return slab->available();
}
// Registers `descriptor` from `session` as a new in-flight buffer in the parent queue and marks
// the resulting index as available.
void RxQueue::SessionTransaction::Push(Session* session, uint16_t descriptor)
    __TA_REQUIRES(queue_->parent_->rx_lock()) {
  // NB: the __TA_REQUIRES annotation only records that a SessionTransaction always holds the lock
  // of its parent queue; misuse is prevented by the annotations on `SessionTransaction`'s
  // constructor and destructor.
  InFlightBuffer entry(session, descriptor);
  const uint32_t slot = queue_->in_flight_->Push(entry);
  queue_->available_queue_->Push(slot);
}
} // namespace network::internal