| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "tx_queue.h" |
| |
| #include <lib/async/cpp/task.h> |
| #include <lib/fdf/cpp/env.h> |
| #include <zircon/assert.h> |
| #include <zircon/compiler.h> |
| #include <zircon/errors.h> |
| #include <zircon/types.h> |
| |
| #include "device_interface.h" |
| #include "fbl/auto_lock.h" |
| #include "lib/fit/defer.h" |
| #include "log.h" |
| #include "session.h" |
| #include "zircon/status.h" |
| |
| namespace network::internal { |
| |
| constexpr char kTxSchedulerRole[] = "fuchsia.devices.network.core.tx"; |
| |
| std::atomic<uint32_t> TxQueue::num_instances_ = 0; |
| |
| TxQueue::~TxQueue() { |
| // running_ is tied to the lifetime of the watch thread, it's cleared in`TxQueue::JoinThread`. |
| // This assertion protects us from destruction paths where `TxQueue::JoinThread` is not called. |
| ZX_ASSERT_MSG(!running_, "TxQueue destroyed without disposing of port and thread first."); |
| } |
| |
| void TxQueue::JoinThread() { |
| if (!dispatcher_.get()) { |
| return; |
| } |
| |
| if (zx_status_t status = EnqueueUserPacket(kQuitKey); status != ZX_OK) { |
| LOGF_ERROR("TxQueue::JoinThread failed to send quit key: %s", zx_status_get_string(status)); |
| } |
| |
| // Mark the queue as not running anymore. |
| running_ = false; |
| |
| dispatcher_.ShutdownAsync(); |
| dispatcher_shutdown_.Wait(); |
| dispatcher_.reset(); |
| --num_instances_; |
| } |
| |
| zx_status_t TxQueue::EnqueueUserPacket(uint64_t key) { |
| zx_port_packet_t packet; |
| packet.type = ZX_PKT_TYPE_USER; |
| packet.key = key; |
| packet.status = ZX_OK; |
| return port_.queue(&packet); |
| } |
| |
| void TxQueue::SessionTransaction::Commit() { |
| // when we destroy a session transaction, we commit all the queued buffers to |
| // the device. |
| if (queued_ != 0) { |
| // Send buffers in batches of at most |MAX_TX_BUFFERS| at a time to stay within the FIDL |
| // channel maximum. |
| netdriver::wire::TxBuffer* buffers = buffers_.data(); |
| while (queued_ > 0) { |
| const uint32_t batch = std::min(static_cast<uint32_t>(queued_), MAX_TX_BUFFERS); |
| queue_->parent_->QueueTx(cpp20::span(buffers, batch)); |
| buffers += batch; |
| queued_ -= batch; |
| } |
| } |
| } |
| |
| fuchsia_hardware_network_driver::wire::TxBuffer* TxQueue::SessionTransaction::GetBuffer() { |
| ZX_ASSERT(available_ != 0); |
| return &buffers_[queued_]; |
| } |
| |
| void TxQueue::SessionTransaction::Push(uint16_t descriptor) { |
| ZX_ASSERT(available_ != 0); |
| session_->TxTaken(); |
| buffers_[queued_].id = queue_->Enqueue(session_, descriptor); |
| available_--; |
| queued_++; |
| } |
| |
| TxQueue::SessionTransaction::SessionTransaction( |
| cpp20::span<fuchsia_hardware_network_driver::wire::TxBuffer> buffers, TxQueue* parent, |
| Session* session) |
| : buffers_(buffers), queue_(parent), session_(session), queued_(0) { |
| // only get available slots after lock is acquired: |
| // 0 available slots if parent is not enabled. |
| available_ = queue_->parent_->IsDataPlaneOpen() ? queue_->in_flight_->available() : 0; |
| } |
| |
| zx::result<std::unique_ptr<TxQueue>> TxQueue::Create(DeviceInterface* parent) { |
| // The Tx queue capacity is based on the underlying device's tx queue capacity. |
| auto capacity = parent->info().tx_depth().value_or(0); |
| |
| fbl::AllocChecker ac; |
| std::unique_ptr<TxQueue> queue(new (&ac) TxQueue(parent)); |
| if (!ac.check()) { |
| return zx::error(ZX_ERR_NO_MEMORY); |
| } |
| |
| fbl::AutoLock lock(&queue->parent_->tx_lock()); |
| |
| zx::result return_queue = RingQueue<uint32_t>::Create(capacity); |
| if (return_queue.is_error()) { |
| return return_queue.take_error(); |
| } |
| queue->return_queue_ = std::move(return_queue.value()); |
| |
| zx::result in_flight = IndexedSlab<InFlightBuffer>::Create(capacity); |
| if (in_flight.is_error()) { |
| return in_flight.take_error(); |
| } |
| queue->in_flight_ = std::move(in_flight.value()); |
| std::unique_ptr<fuchsia_hardware_network_driver::wire::TxBuffer[]> tx_buffers( |
| new (&ac) fuchsia_hardware_network_driver::wire::TxBuffer[capacity]); |
| if (!ac.check()) { |
| return zx::error(ZX_ERR_NO_MEMORY); |
| } |
| // TODO(https://github.com/llvm/llvm-project/issues/54497): remove unnecessary |
| // type extraction once clang-format can deal with [] in template parameter. |
| using BufferParts_t = BufferParts<fuchsia_hardware_network_driver::wire::BufferRegion>[]; |
| std::unique_ptr<BufferParts_t> buffer_parts( |
| new (&ac) BufferParts<fuchsia_hardware_network_driver::wire::BufferRegion>[capacity]); |
| if (!ac.check()) { |
| return zx::error(ZX_ERR_NO_MEMORY); |
| } |
| for (int i = 0; i < capacity; ++i) { |
| tx_buffers[i].data = |
| fidl::VectorView<fuchsia_hardware_network_driver::wire::BufferRegion>::FromExternal( |
| buffer_parts[i].data(), 0); |
| } |
| |
| if (zx_status_t status = zx::port::create(0, &queue->port_); status != ZX_OK) { |
| LOGF_ERROR("failed to create tx queue port"); |
| return zx::error(status); |
| } |
| |
| // Make sure the driver framework allows for the creation of the necessary threads. Keep track of |
| // the number of TX queue instances globally. |
| uint32_t instances = ++num_instances_; |
| if (zx_status_t status = |
| fdf_env_set_thread_limit(kTxSchedulerRole, strlen(kTxSchedulerRole), instances); |
| status != ZX_OK && status != ZX_ERR_OUT_OF_RANGE) { |
| // ZX_ERR_OUT_OF_RANGE indicates that the value is less than the current value. This can happen |
| // if a number of threads have recently shut down or two TX queues are being created at the same |
| // time and the loading of the atomic and setting of the limit are interleaved. It's safe to |
| // ignore that in this context, the important part is that there are enough threads. |
| LOGF_ERROR("failed to update thread limit: %s", zx_status_get_string(status)); |
| return zx::error(status); |
| } |
| |
| // In order to ensure that the async::PostTask below works this has to be a synchronized |
| // dispatcher that allows synchronous calls. Any other combination of dispatcher and options could |
| // lead to the async::PostTask call being inlined, meaning the task would run on the calling |
| // thread, blocking it indefinitely. Use a unique owner to ensure inlining of calls from inside |
| // the task. Calls to a dispatcher with the same owner might not be inlined. |
| auto dispatcher = fdf_env::DispatcherBuilder::CreateSynchronizedWithOwner( |
| queue.get(), fdf::SynchronizedDispatcher::Options::kAllowSyncCalls, "netdevice:tx_watch", |
| [queue = queue.get()](fdf_dispatcher_t*) { queue->dispatcher_shutdown_.Signal(); }, |
| kTxSchedulerRole); |
| if (dispatcher.is_error()) { |
| LOGF_ERROR("Failed to create tx dispatcher: %s", dispatcher.status_string()); |
| return dispatcher.take_error(); |
| } |
| queue->dispatcher_ = std::move(dispatcher.value()); |
| |
| async::PostTask(queue->dispatcher_.async_dispatcher(), |
| [queue = queue.get(), tx_buffers = std::move(tx_buffers), |
| buffer_parts = std::move(buffer_parts), |
| capacity]() { queue->Thread(cpp20::span(&tx_buffers[0], capacity)); }); |
| queue->running_ = true; |
| |
| return zx::ok(std::move(queue)); |
| } |
| |
| void TxQueue::Thread(cpp20::span<fuchsia_hardware_network_driver::wire::TxBuffer> buffers) { |
| for (;;) { |
| zx_port_packet_t packet; |
| zx_status_t status = port_.wait(zx::time::infinite(), &packet); |
| if (status != ZX_OK) { |
| LOGF_ERROR("tx thread port wait failed: %s", zx_status_get_string(status)); |
| return; |
| } |
| switch (packet.type) { |
| case ZX_PKT_TYPE_USER: |
| switch (packet.key) { |
| case kResumeKey: |
| if (zx_status_t status = UpdateFifoWatches(); status != ZX_OK) { |
| LOGF_ERROR("failed to install FIFO watches: %s", zx_status_get_string(status)); |
| return; |
| } |
| break; |
| case kQuitKey: |
| return; |
| default: |
| ZX_PANIC("unexpected user packet key %ld", packet.key); |
| } |
| break; |
| case ZX_PKT_TYPE_SIGNAL_ONE: |
| if (zx_status_t status = HandleFifoSignal(buffers, packet.key, packet.signal.observed); |
| status != ZX_OK) { |
| LOGF_ERROR("failed to handle FIFO signal: %s", zx_status_get_string(status)); |
| return; |
| } |
| break; |
| default: |
| ZX_PANIC("unexpected packet type %d", packet.type); |
| } |
| } |
| } |
| |
| zx_status_t TxQueue::UpdateFifoWatches() { |
| fbl::AutoLock lock(&parent_->tx_lock()); |
| for (auto it = sessions_.begin(); it != sessions_.end(); it++) { |
| SessionWaiter& waiter = *it; |
| Session& session = *waiter.session; |
| |
| if (session.IsPaused()) { |
| if (!waiter.wait_installed) { |
| continue; |
| } |
| zx_status_t status = port_.cancel(session.tx_fifo(), it.key()); |
| switch (status) { |
| case ZX_OK: |
| waiter.wait_installed = false; |
| continue; |
| default: |
| LOGF_ERROR("failed to cancel FIFO wait for session %s: %s", session.name(), |
| zx_status_get_string(status)); |
| return status; |
| } |
| } |
| |
| if (waiter.wait_installed) { |
| continue; |
| } |
| |
| if (zx_status_t status = session.tx_fifo().wait_async( |
| port_, it.key(), ZX_FIFO_PEER_CLOSED | ZX_FIFO_READABLE, 0); |
| status != ZX_OK) { |
| LOGF_ERROR("failed to start FIFO wait for session %s: %s", session.name(), |
| zx_status_get_string(status)); |
| return status; |
| } |
| |
| waiter.wait_installed = true; |
| } |
| |
| return ZX_OK; |
| } |
| |
| zx_status_t TxQueue::HandleFifoSignal( |
| cpp20::span<fuchsia_hardware_network_driver::wire::TxBuffer> buffers, SessionKey key, |
| zx_signals_t signals) { |
| fbl::AutoLock lock(&parent_->tx_lock()); |
| SessionWaiter* find_session = sessions_.Get(key); |
| // Session already removed from Tx queue, packet was lingering in the port. |
| if (find_session == nullptr) { |
| return ZX_OK; |
| } |
| |
| SessionWaiter& waiter = *find_session; |
| waiter.wait_installed = false; |
| Session& session = *waiter.session; |
| const zx::fifo& fifo = session.tx_fifo(); |
| SessionTransaction transaction(buffers, this, &session); |
| |
| // TA really doesn't like defers or its interplay with AutoLock. |
| auto defer = fit::defer([&transaction, &lock]() __TA_NO_THREAD_SAFETY_ANALYSIS { |
| // Committing the transaction must not be holding any locks, because we call |
| // into the device. |
| lock.release(); |
| transaction.Commit(); |
| }); |
| |
| if (signals & ZX_FIFO_READABLE) { |
| session.AssertParentTxLock(*parent_); |
| zx_status_t status = session.FetchTx(transaction); |
| switch (status) { |
| case ZX_OK: |
| case ZX_ERR_SHOULD_WAIT: |
| break; |
| case ZX_ERR_PEER_CLOSED: |
| // FIFO is closed, don't reinstall the wait. |
| case ZX_ERR_IO_OVERRUN: |
| // We've run out of device buffers, don't reinstall the wait until we're |
| // notified of new buffers. |
| return ZX_OK; |
| default: |
| LOGF_ERROR("unexpected error for session %s: %s; killing it", session.name(), |
| zx_status_get_string(status)); |
| session.Kill(); |
| return ZX_OK; |
| } |
| } |
| |
| if (signals & ZX_FIFO_PEER_CLOSED) { |
| // FIFO is closed, don't reinstall the wait. |
| return ZX_OK; |
| } |
| |
| if (zx_status_t status = fifo.wait_async(port_, key, ZX_FIFO_PEER_CLOSED | ZX_FIFO_READABLE, 0); |
| status != ZX_OK) { |
| LOGF_ERROR("failed to start FIFO wait for session %s: %s", session.name(), |
| zx_status_get_string(status)); |
| return status; |
| } |
| |
| waiter.wait_installed = true; |
| |
| return ZX_OK; |
| } |
| |
| void TxQueue::Resume() { |
| if (!running_) { |
| return; |
| } |
| |
| if (zx_status_t status = EnqueueUserPacket(kResumeKey); status != ZX_OK) { |
| LOGF_ERROR("TxQueue::Resume failed: %s", zx_status_get_string(status)); |
| } |
| } |
| |
| uint32_t TxQueue::Enqueue(Session* session, uint16_t descriptor) { |
| return in_flight_->Push(InFlightBuffer(session, ZX_OK, descriptor)); |
| } |
| |
| bool TxQueue::ReturnBuffers() { |
| size_t count = 0; |
| |
| uint16_t desc_buffer[kMaxFifoDepth]; |
| ZX_ASSERT(return_queue_->count() <= kMaxFifoDepth); |
| |
| uint16_t* descs = desc_buffer; |
| Session* cur_session = nullptr; |
| // record if the queue was full, meaning that sessions may have halted fetching tx frames due |
| // to overruns: |
| bool was_full = in_flight_->available() == 0; |
| while (return_queue_->count()) { |
| auto& buff = in_flight_->Get(return_queue_->Peek()); |
| if (cur_session != nullptr && buff.session != cur_session) { |
| // dispatch accumulated to session. This should only happen if we have outstanding |
| // return buffers from different sessions. |
| cur_session->ReturnTxDescriptors(desc_buffer, count); |
| count = 0; |
| descs = desc_buffer; |
| } |
| cur_session = buff.session; |
| cur_session->MarkTxReturnResult(buff.descriptor_index, buff.result); |
| *descs++ = buff.descriptor_index; |
| count++; |
| // pop from return queue and free space in in_flight queue: |
| in_flight_->Free(return_queue_->Pop()); |
| } |
| |
| if (cur_session && count != 0) { |
| cur_session->ReturnTxDescriptors(desc_buffer, count); |
| } |
| |
| return was_full; |
| } |
| |
| void TxQueue::CompleteTxList( |
| const fidl::VectorView<::fuchsia_hardware_network_driver::wire::TxResult>& tx_results) { |
| fbl::AutoLock lock(&parent_->tx_lock()); |
| for (const auto& tx : tx_results.get()) { |
| InFlightBuffer& buff = in_flight_->Get(tx.id); |
| buff.result = tx.status; |
| return_queue_->Push(tx.id); |
| } |
| bool was_full = ReturnBuffers(); |
| parent_->NotifyTxReturned(was_full); |
| parent_->NotifyTxComplete(); |
| } |
| |
| TxQueue::SessionKey TxQueue::AddSession(Session* session) { |
| sessions_.Grow(); |
| std::optional new_key = sessions_.Push(SessionWaiter{ |
| .session = session, |
| .wait_installed = false, |
| }); |
| // We grew the slab before pushing, we must have space. |
| ZX_ASSERT(new_key.has_value()); |
| |
| TxQueue::SessionKey key = new_key.value(); |
| |
| if (zx_status_t status = EnqueueUserPacket(kResumeKey); status != ZX_OK) { |
| LOGF_ERROR("failed to notify of new session %s with key %ld: %s", session->name(), key, |
| zx_status_get_string(status)); |
| } |
| |
| return key; |
| } |
| |
| void TxQueue::RemoveSession(SessionKey key) { |
| std::optional s = sessions_.Erase(key); |
| ZX_ASSERT_MSG(s.has_value(), "attempted to remove unknown session %ld", key); |
| } |
| |
| } // namespace network::internal |