| /* |
| * Copyright (C) 2025 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #define LOG_TAG "IOUringSocketHandler" |
| |
| #include <sys/resource.h> |
| #include <sys/utsname.h> |
| #include <unistd.h> |
| |
| #include <limits.h> |
| #include <linux/time_types.h> |
| #include <sys/cdefs.h> |
| #include <sys/prctl.h> |
| #include <sys/socket.h> |
| #include <sys/types.h> |
| #include <sys/un.h> |
| #include <unistd.h> |
| |
| #include <chrono> |
| #include <thread> |
| |
| #include <cutils/sockets.h> |
| #include <private/android_logger.h> |
| |
| #include <IOUringSocketHandler/IOUringSocketHandler.h> |
| |
| #include <android-base/logging.h> |
| #include <android-base/scopeguard.h> |
| |
// Public entry point: reports whether this device can use the io_uring-backed
// socket path. Currently gated solely on the kernel-version check below.
bool IOUringSocketHandler::IsIouringSupported() {
    return IsIouringSupportedByKernel();
}
| |
| bool IOUringSocketHandler::IsIouringSupportedByKernel() { |
| struct utsname uts {}; |
| unsigned int major, minor; |
| |
| uname(&uts); |
| if (sscanf(uts.release, "%u.%u", &major, &minor) != 2) { |
| return false; |
| } |
| |
| // We will only support kernels from 6.1 and higher. |
| return major > 6 || (major == 6 && minor >= 1); |
| } |
| |
// Stores the (non-owning) socket fd that recvmsg operations will target.
// Ring setup happens separately in SetupIoUring(); the destructor does not
// close this fd.
IOUringSocketHandler::IOUringSocketHandler(int socket_fd) : socket_(socket_fd) {}
| |
// Tear down in reverse order of setup: drop the registered ring fd first,
// then release the provided-buffer ring and backing memory, and finally
// destroy the io_uring instance itself. The socket fd passed to the
// constructor is intentionally left open for the caller.
IOUringSocketHandler::~IOUringSocketHandler() {
    if (registered_ring_fd_) {
        io_uring_unregister_ring_fd(&mCtx->ring);
    }

    DeRegisterBuffers();

    if (ring_setup_) {
        io_uring_queue_exit(&mCtx->ring);
    }
}
| |
| bool IOUringSocketHandler::EnqueueMultishotRecvmsg() { |
| struct io_uring_sqe* sqe = io_uring_get_sqe(&mCtx->ring); |
| memset(&msg, 0, sizeof(msg)); |
| msg.msg_controllen = control_len_; |
| |
| io_uring_prep_recvmsg_multishot(sqe, socket_, &msg, 0); |
| sqe->flags |= IOSQE_BUFFER_SELECT; |
| |
| sqe->buf_group = bgid_; |
| int ret = io_uring_submit(&mCtx->ring); |
| if (ret < 0) { |
| LOG(ERROR) << "EnqueueMultishotRecvmsg failed: ret: " << ret; |
| return false; |
| } |
| return true; |
| } |
| |
| bool IOUringSocketHandler::AllocateAndRegisterBuffers(size_t num_buffers, size_t buf_size) { |
| num_buffers_ = num_buffers; |
| control_len_ = CMSG_ALIGN(sizeof(struct ucred)) + sizeof(struct cmsghdr); |
| buffer_size_ = sizeof(struct io_uring_recvmsg_out) + control_len_ + buf_size; |
| |
| for (size_t i = 0; i < num_buffers_; i++) { |
| std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(buffer_size_); |
| buffers_.push_back(std::move(buffer)); |
| } |
| |
| cqe_vector_.resize(num_buffers_); |
| return RegisterBuffers(); |
| } |
| |
| bool IOUringSocketHandler::RegisterBuffers() { |
| int ret = 0; |
| br_ = io_uring_setup_buf_ring(&mCtx->ring, num_buffers_, bgid_, 0, &ret); |
| if (!br_) { |
| LOG(ERROR) << "io_uring_setup_buf_ring failed with error: " << ret; |
| return false; |
| } |
| for (size_t i = 0; i < num_buffers_; i++) { |
| void* buffer = buffers_[i].get(); |
| io_uring_buf_ring_add(br_, buffer, buffer_size_, i, io_uring_buf_ring_mask(num_buffers_), |
| i); |
| } |
| io_uring_buf_ring_advance(br_, num_buffers_); |
| LOG(DEBUG) << "RegisterBuffers success: " << num_buffers_; |
| registered_buffers_ = true; |
| return true; |
| } |
| |
| void IOUringSocketHandler::DeRegisterBuffers() { |
| if (registered_buffers_) { |
| io_uring_free_buf_ring(&mCtx->ring, br_, num_buffers_, bgid_); |
| registered_buffers_ = false; |
| } |
| buffers_.clear(); |
| num_buffers_ = 0; |
| control_len_ = 0; |
| buffer_size_ = 0; |
| } |
| |
| bool IOUringSocketHandler::SetupIoUring(int queue_size) { |
| mCtx = std::unique_ptr<uring_context>(new uring_context()); |
| struct io_uring_params params = {}; |
| |
| // COOP_TASKRUN - Do not send IPI to process |
| // SINGLE_ISSUER - Only one thread is doing the work on the ring |
| // DEFER_TASKRUN - trigger task work when CQE is explicitly polled |
| params.flags |= (IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER | |
| IORING_SETUP_DEFER_TASKRUN); |
| |
| int ret = io_uring_queue_init_params(queue_size + 1, &mCtx->ring, ¶ms); |
| if (ret) { |
| LOG(ERROR) << "io_uring_queue_init_params failed with ret: " << ret; |
| return false; |
| } else { |
| LOG(DEBUG) << "io_uring_queue_init_params success"; |
| } |
| |
| ring_setup_ = true; |
| |
| ret = io_uring_register_ring_fd(&mCtx->ring); |
| if (ret < 0) { |
| LOG(ERROR) << "io_uring_register_ring_fd failed: " << ret; |
| } else { |
| registered_ring_fd_ = true; |
| } |
| |
| unsigned int values[2]; |
| values[0] = values[1] = 1; |
| ret = io_uring_register_iowq_max_workers(&mCtx->ring, values); |
| if (ret) { |
| LOG(ERROR) << "io_uring_register_iowq_max_workers failed: " << ret; |
| } |
| |
| return true; |
| } |
| |
// Returns the buffer consumed by the most recently processed CQE to the
// provided-buffer pool and retires that CQE. If the kernel signalled that the
// multishot request produces no further completions (IORING_CQE_F_MORE
// clear), re-arms the multishot recvmsg. No-op when no buffer is outstanding.
void IOUringSocketHandler::ReleaseBuffer() {
    // Nothing to release: either no CQE has been consumed yet, or the buffer
    // for the last one was already returned.
    if (active_buffer_id_ == -1) {
        return;
    }

    // If there are no more CQE data, re-arm the SQE
    bool is_more_cqe = (cqe->flags & IORING_CQE_F_MORE);

    // Put the buffer back to the pool
    io_uring_buf_ring_add(br_, buffers_[active_buffer_id_].get(), buffer_size_, active_buffer_id_,
                          io_uring_buf_ring_mask(num_buffers_), 0);
    // Advance the CQE pointer and buffer ring.
    io_uring_buf_ring_cq_advance(&mCtx->ring, br_, 1);
    active_buffer_id_ = -1;

    if (!is_more_cqe) {
        EnqueueMultishotRecvmsg();
    }
}
| |
// Blocks until a datagram with a non-empty payload is available. On return,
// *payload points into one of the registered buffers, payload_len is the
// payload size, and *cred is the sender's SCM_CREDENTIALS (or nullptr if the
// sender attached none). The pointed-to memory belongs to the buffer
// identified by active_buffer_id_ and remains valid until ReleaseBuffer() is
// invoked — the caller is expected to trigger that after consuming the data.
void IOUringSocketHandler::ReceiveData(void** payload, size_t& payload_len, struct ucred** cred) {
    while (true) {
        if (active_count_ > 0) {
            // Consume next CQE from the existing active batch
            cqe = cqe_vector_[active_index_];
            active_count_ -= 1;
            active_index_ += 1;
        } else {
            // No active batch, try to get new CQEs
            active_index_ = 0;
            // Try to peek a batch without blocking
            int count = io_uring_peek_batch_cqe(&mCtx->ring, cqe_vector_.data(), num_buffers_);
            if (count > 0 ) {
                // Peek successful, store the count and process the first CQE now
                active_count_ = count;
                cqe = cqe_vector_[active_index_]; // Get the first one (index 0)
                active_count_ -= 1;
                active_index_ += 1;
            } else {
                // No batch is active
                active_index_ = -1;
                active_count_ = 0;
                // Peek failed (no CQEs ready), block waiting for a single CQE
                // Since DEFER_TASK_RUN flag is set for the ring, this
                // will trigger the task and initiate the receive of packets
                int ret = io_uring_wait_cqe(&mCtx->ring, &cqe);
                if (ret) {
                    // Wait itself failed; re-arm the multishot recvmsg and
                    // retry from the top rather than touching a stale cqe.
                    EnqueueMultishotRecvmsg();
                    continue;
                }
            }
        }

        // Set when the kernel actually selected one of our provided buffers.
        bool cqe_f_buffer = (cqe->flags & IORING_CQE_F_BUFFER);

        // A failure here would most likely be related to ENOBUFS.
        // However, for every failure, we need to re-arm the multishot sqe.
        if ((cqe->res < 0) || !cqe_f_buffer) {
            // No buffers were selected from registered buffers even
            // though we had valid payload.
            if ((cqe->res > 0) && !cqe_f_buffer) {
                LOG(ERROR) << "No buffers selected. cqe->res: " << cqe->res
                           << " cqe_flags: " << cqe->flags;
            }
            // ENOBUFS (pool exhausted) is expected under load; don't log it.
            if (cqe->res != -ENOBUFS) {
                LOG(ERROR) << "cqe failed with error: " << cqe->res;
            }
            io_uring_cqe_seen(&mCtx->ring, cqe);
            EnqueueMultishotRecvmsg();
            continue;
        }

        // Pick the buffer-id where the payload data is sent.
        active_buffer_id_ = cqe->flags >> IORING_CQE_BUFFER_SHIFT;

        void* this_recv = buffers_[active_buffer_id_].get();
        // Validate the multishot recvmsg header at the front of the buffer;
        // returns nullptr on a malformed/truncated completion.
        struct io_uring_recvmsg_out* out = io_uring_recvmsg_validate(this_recv, cqe->res, &msg);

        if (!out) {
            ReleaseBuffer();
            continue;
        }

        // Fetch ucred control data from cmsg
        struct cmsghdr* cmsg;
        cmsg = io_uring_recvmsg_cmsg_firsthdr(out, &msg);

        struct ucred* cr = nullptr;
        while (cmsg != nullptr) {
            if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_CREDENTIALS) {
                cr = (struct ucred*)CMSG_DATA(cmsg);
                break;
            }
            cmsg = io_uring_recvmsg_cmsg_nexthdr(out, &msg, cmsg);
        }

        *payload = io_uring_recvmsg_payload(out, &msg);
        payload_len = io_uring_recvmsg_payload_length(out, cqe->res, &msg);
        *cred = cr;

        // We have the valid data. Return it to the client.
        // Note: We don't check "cred" pointer as senders can just send
        // payload without credentials. It is up to the caller on how
        // to handle it.
        if ((*payload != nullptr) && (payload_len > 0)) {
            break;
        } else {
            // Release the buffer and re-check the CQE buffers in the ring.
            ReleaseBuffer();
        }
    }
}