blob: cbcf225fb46a7f75fe1ae1177935c77997a57999 [file] [log] [blame]
/*
* Copyright (C) 2025 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define LOG_TAG "IOUringSocketHandler"

#include <limits.h>
#include <linux/time_types.h>
#include <sys/cdefs.h>
#include <sys/prctl.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <sys/utsname.h>
#include <unistd.h>

#include <chrono>
#include <cstdio>
#include <cstring>
#include <memory>
#include <thread>

#include <android-base/logging.h>
#include <android-base/scopeguard.h>
#include <cutils/sockets.h>
#include <private/android_logger.h>

#include <IOUringSocketHandler/IOUringSocketHandler.h>
// Public entry point for io_uring availability. Support is currently
// decided solely by the kernel-version check.
bool IOUringSocketHandler::IsIouringSupported() {
    const bool kernel_supports_uring = IsIouringSupportedByKernel();
    return kernel_supports_uring;
}
// Returns true when the running kernel version is at least 6.1, the
// minimum this handler supports (required for the io_uring features used
// here, e.g. multishot recvmsg with ring-mapped provided buffers).
//
// Fix: the original ignored uname()'s return value; on failure
// uts.release is indeterminate and would be fed to sscanf. Treat an
// uname() failure as "unsupported".
bool IOUringSocketHandler::IsIouringSupportedByKernel() {
    struct utsname uts {};
    if (uname(&uts) != 0) {
        // Cannot determine the kernel version; be conservative.
        return false;
    }
    unsigned int major, minor;
    if (sscanf(uts.release, "%u.%u", &major, &minor) != 2) {
        return false;
    }
    // We will only support kernels from 6.1 and higher.
    return major > 6 || (major == 6 && minor >= 1);
}
// Wraps an existing socket fd to receive on. NOTE(review): the destructor
// in this file does not close socket_fd, so ownership appears to remain
// with the caller — confirm against the header's contract.
IOUringSocketHandler::IOUringSocketHandler(int socket_fd) : socket_(socket_fd) {}
// Tears down io_uring state in reverse order of setup: unregister the
// ring fd first (only if registration succeeded), release the
// provided-buffer ring and its backing allocations, and finally destroy
// the ring itself (only if it was ever created).
IOUringSocketHandler::~IOUringSocketHandler() {
    if (registered_ring_fd_) {
        io_uring_unregister_ring_fd(&mCtx->ring);
    }
    DeRegisterBuffers();
    if (ring_setup_) {
        io_uring_queue_exit(&mCtx->ring);
    }
}
bool IOUringSocketHandler::EnqueueMultishotRecvmsg() {
struct io_uring_sqe* sqe = io_uring_get_sqe(&mCtx->ring);
memset(&msg, 0, sizeof(msg));
msg.msg_controllen = control_len_;
io_uring_prep_recvmsg_multishot(sqe, socket_, &msg, 0);
sqe->flags |= IOSQE_BUFFER_SELECT;
sqe->buf_group = bgid_;
int ret = io_uring_submit(&mCtx->ring);
if (ret < 0) {
LOG(ERROR) << "EnqueueMultishotRecvmsg failed: ret: " << ret;
return false;
}
return true;
}
bool IOUringSocketHandler::AllocateAndRegisterBuffers(size_t num_buffers, size_t buf_size) {
num_buffers_ = num_buffers;
control_len_ = CMSG_ALIGN(sizeof(struct ucred)) + sizeof(struct cmsghdr);
buffer_size_ = sizeof(struct io_uring_recvmsg_out) + control_len_ + buf_size;
for (size_t i = 0; i < num_buffers_; i++) {
std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(buffer_size_);
buffers_.push_back(std::move(buffer));
}
return RegisterBuffers();
}
bool IOUringSocketHandler::RegisterBuffers() {
int ret = 0;
br_ = io_uring_setup_buf_ring(&mCtx->ring, num_buffers_, bgid_, 0, &ret);
if (!br_) {
LOG(ERROR) << "io_uring_setup_buf_ring failed with error: " << ret;
return false;
}
for (size_t i = 0; i < num_buffers_; i++) {
void* buffer = buffers_[i].get();
io_uring_buf_ring_add(br_, buffer, buffer_size_, i, io_uring_buf_ring_mask(num_buffers_),
i);
}
io_uring_buf_ring_advance(br_, num_buffers_);
LOG(DEBUG) << "RegisterBuffers success: " << num_buffers_;
registered_buffers_ = true;
return true;
}
// Releases the provided-buffer ring (if one was registered), frees all
// buffer allocations and resets the bookkeeping fields. Safe to call
// repeatedly; subsequent calls are no-ops on the ring.
void IOUringSocketHandler::DeRegisterBuffers() {
    if (registered_buffers_) {
        io_uring_free_buf_ring(&mCtx->ring, br_, num_buffers_, bgid_);
        registered_buffers_ = false;
    }
    buffers_.clear();
    buffer_size_ = 0;
    control_len_ = 0;
    num_buffers_ = 0;
}
bool IOUringSocketHandler::SetupIoUring(int queue_size) {
mCtx = std::unique_ptr<uring_context>(new uring_context());
struct io_uring_params params = {};
// COOP_TASKRUN - Do not send IPI to process
// SINGLE_ISSUER - Only one thread is doing the work on the ring
// TASKRUN_FLAG - we use peek_cqe - Hence, trigger task work if required
// DEFER_TASKRUN - trigger task work when CQE is explicitly polled
params.flags |= (IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER |
IORING_SETUP_TASKRUN_FLAG | IORING_SETUP_DEFER_TASKRUN);
int ret = io_uring_queue_init_params(queue_size + 1, &mCtx->ring, &params);
if (ret) {
LOG(ERROR) << "io_uring_queue_init_params failed with ret: " << ret;
return false;
} else {
LOG(DEBUG) << "io_uring_queue_init_params success";
}
ring_setup_ = true;
ret = io_uring_register_ring_fd(&mCtx->ring);
if (ret < 0) {
LOG(ERROR) << "io_uring_register_ring_fd failed: " << ret;
} else {
registered_ring_fd_ = true;
}
return true;
}
// Returns the buffer currently holding a received message (selected by
// ReceiveData()) to the kernel's provided-buffer pool and consumes the
// corresponding CQE. If that CQE signalled the end of the multishot
// request (no IORING_CQE_F_MORE), re-arms the recvmsg SQE so reception
// continues. No-op when no buffer is outstanding.
void IOUringSocketHandler::ReleaseBuffer() {
    if (active_buffer_id_ == -1) {
        return;
    }
    // If there are no more CQE data, re-arm the SQE
    bool is_more_cqe = (cqe->flags & IORING_CQE_F_MORE);
    // Put the buffer back to the pool
    io_uring_buf_ring_add(br_, buffers_[active_buffer_id_].get(), buffer_size_, active_buffer_id_,
                          io_uring_buf_ring_mask(num_buffers_), 0);
    // Advance the CQE pointer and buffer ring.
    io_uring_buf_ring_cq_advance(&mCtx->ring, br_, 1);
    active_buffer_id_ = -1;
    if (!is_more_cqe) {
        EnqueueMultishotRecvmsg();
    }
}
// Blocks until one complete datagram is available and returns, via the
// out-parameters, pointers INTO the kernel-selected receive buffer:
//   *payload    - start of the message payload
//   payload_len - payload length in bytes
//   *cred       - sender credentials from SCM_CREDENTIALS, or nullptr if
//                 the sender attached none
// The pointers remain valid only until the caller invokes
// ReleaseBuffer(), which returns the buffer to the pool.
void IOUringSocketHandler::ReceiveData(void** payload, size_t& payload_len, struct ucred** cred) {
    while (true) {
        // Try to get the data if CQE is already present in the ring buffer.
        // Note: Since IORING_SETUP_TASKRUN_FLAG flag is set, this will trigger
        // a task run if there are no outstanding CQEs to be reaped.
        if (io_uring_peek_cqe(&mCtx->ring, &cqe) < 0) {
            int ret = io_uring_wait_cqe(&mCtx->ring, &cqe);
            if (ret) {
                // Wait failed (e.g. interrupted); re-arm defensively and retry.
                EnqueueMultishotRecvmsg();
                continue;
            }
        }
        bool cqe_f_buffer = (cqe->flags & IORING_CQE_F_BUFFER);
        // A failure here would most likely be related to ENOBUFS.
        // However, for every failure, we need to re-arm the multishot sqe.
        // NOTE(review): this path re-arms even when IORING_CQE_F_MORE was
        // set on the failed CQE — confirm the kernel terminates the
        // multishot request on every error CQE, else this could double-arm.
        if ((cqe->res < 0) || !cqe_f_buffer) {
            // No buffers were selected from registered buffers even
            // though we had valid payload.
            if ((cqe->res > 0) && !cqe_f_buffer) {
                LOG(ERROR) << "No buffers selected. cqe->res: " << cqe->res
                           << " cqe_flags: " << cqe->flags;
            }
            if (cqe->res != -ENOBUFS) {
                LOG(ERROR) << "cqe failed with error: " << cqe->res;
            }
            io_uring_cqe_seen(&mCtx->ring, cqe);
            EnqueueMultishotRecvmsg();
            continue;
        }
        // Pick the buffer-id where the payload data is sent.
        active_buffer_id_ = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
        void* this_recv = buffers_[active_buffer_id_].get();
        // Validate the recvmsg header the kernel wrote into the buffer;
        // a null result means a malformed/truncated completion.
        struct io_uring_recvmsg_out* out = io_uring_recvmsg_validate(this_recv, cqe->res, &msg);
        if (!out) {
            ReleaseBuffer();
            continue;
        }
        // Fetch ucred control data from cmsg
        struct cmsghdr* cmsg;
        cmsg = io_uring_recvmsg_cmsg_firsthdr(out, &msg);
        struct ucred* cr = nullptr;
        while (cmsg != nullptr) {
            if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_CREDENTIALS) {
                cr = (struct ucred*)CMSG_DATA(cmsg);
                break;
            }
            cmsg = io_uring_recvmsg_cmsg_nexthdr(out, &msg, cmsg);
        }
        *payload = io_uring_recvmsg_payload(out, &msg);
        payload_len = io_uring_recvmsg_payload_length(out, cqe->res, &msg);
        *cred = cr;
        // We have the valid data. Return it to the client.
        // Note: We don't check "cred" pointer as senders can just send
        // payload without credentials. It is up to the caller on how
        // to handle it.
        if ((*payload != nullptr) && (payload_len > 0)) {
            break;
        } else {
            // Release the buffer and re-check the CQE buffers in the ring.
            ReleaseBuffer();
        }
    }
}