liburingutils: Add API to handle socket messages asynchronously
Bug: 385143770
Test: Unit test
Change-Id: I32ed7511e4e94e43766acb9ef3ddffc5d2eb0320
Signed-off-by: Akilesh Kailash <akailash@google.com>
diff --git a/Android.bp b/Android.bp
index 5fa331e..f625107 100644
--- a/Android.bp
+++ b/Android.bp
@@ -6,15 +6,19 @@
name: "liburingutils",
srcs: [
- "src/LibUringUtils.cpp",
+ "src/IOUringSocketHandler.cpp",
],
cflags: ["-Werror"],
export_include_dirs: ["include"],
+ static_libs: [
+ "liburing",
+ ],
shared_libs: [
"libbase",
+ "liblog",
],
tidy: true,
@@ -32,10 +36,13 @@
}
cc_test {
- name: "liburingutils_tests",
+ name: "IOUringSocketHandler_tests",
test_suites: ["device-tests"],
srcs: [
- "src/LibUringUtils_test.cpp",
+ "src/IOUringSocketHandler_test.cpp",
+ ],
+ static_libs: [
+ "liburing",
],
shared_libs: [
"liburingutils",
diff --git a/include/IOUringSocketHandler/IOUringSocketHandler.h b/include/IOUringSocketHandler/IOUringSocketHandler.h
new file mode 100644
index 0000000..226a5b2
--- /dev/null
+++ b/include/IOUringSocketHandler/IOUringSocketHandler.h
@@ -0,0 +1,174 @@
+/*
+ * Copyright (C) 2025 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include <liburing.h>
+
+/*
+ * IOUringSocketHandler is a helper class for using io_uring with a socket.
+ *
+ * Typical usage from a given thread:
+ *
+ * As a one time setup:
+ * 1. Create an instance of IOUringSocketHandler with the socket file descriptor.
+ * 2. Setup io_uring ring buffer.
+ * 3. Allocate buffers for the ring buffer.
+ * 4. Register buffers with io_uring.
+ * 5. EnqueueMultishotRecvmsg() will submit the SQE to receive the data
+ *
+ * In the I/O path:
+ *
+ * 6. Receive data from the socket through ReceiveData()
+ * 7. Release the buffer to io_uring.
+ *
+ * Note that the thread which sets up the io_uring instance should handle the
+ * I/O through ReceiveData() call.
+ */
+
+class IOUringSocketHandler {
+public:
+    explicit IOUringSocketHandler(int socket_fd);
+    ~IOUringSocketHandler();
+
+    // Setup io_uring ring buffer
+    // queue_size: The size of the io_uring submission queue.
+    //             Determines the maximum number of outstanding I/O requests.
+    // return: true on success, false on failure (e.g., if io_uring_queue_init_params fails).
+    //
+    // This function initializes the io_uring context and sets up the submission
+    // and completion queues. It prepares the io_uring instance for I/O operations.
+    // A larger queue_size allows for more concurrent I/O operations but consumes
+    // more memory.
+    bool SetupIoUring(int queue_size);
+
+    // Allocate 'num_buffers' of size 'buf_size'
+    //
+    // num_buffers: The number of buffers to allocate.
+    // buf_size: The payload size of each buffer in bytes.
+    //
+    // This function allocates a set of buffers that will be used for I/O operations
+    // with io_uring. The buffers hold the data received from the socket. The
+    // allocated buffers are managed internally and are registered with io_uring
+    // before this function returns.
+    //
+    // buf_size is the payload capacity seen by the caller. Internally, every
+    // buffer is enlarged to also hold additional metadata:
+    //    a: CMSG_ALIGN(sizeof(struct ucred)) + sizeof(struct cmsghdr)
+    //    b: sizeof(struct io_uring_recvmsg_out)
+    // This allows sender to send the ucred credential information if required.
+    //
+    // The buffers are handed to the io_uring instance as a buffer ring, so that
+    // the kernel can select one of them for every message it receives.
+    //
+    // return: true on success, false on failure.
+    //
+    // Please see additional details on how num_buffers will be used
+    // by the io_uring: https://man7.org/linux/man-pages/man3/io_uring_setup_buf_ring.3.html
+    bool AllocateAndRegisterBuffers(size_t num_buffers, size_t buf_size);
+
+    // Free up registered buffers with the io_uring instance.
+    //
+    // All the buffers allocated using AllocateAndRegisterBuffers() API will be
+    // freed and de-registered. Callers can then call
+    // AllocateAndRegisterBuffers() to re-register a new set of buffers with the
+    // ring.
+    void DeRegisterBuffers();
+
+    // Arm the io_uring multishot recvmsg request
+    //
+    // return: true on success, false on failure (e.g., if submission queue is full).
+    //
+    // This function enqueues a "multishot recvmsg" operation into the io_uring submission queue.
+    // Multishot recvmsg allows receiving multiple messages from a socket with a single
+    // io_uring submission. The function prepares the submission queue
+    // entry (SQE) for the recvmsg operation and submits it.
+    bool EnqueueMultishotRecvmsg();
+
+    // Release the buffer to io_uring
+    //
+    // This function releases the buffer handed out by the last ReceiveData() call back
+    // to the io_uring subsystem. This makes the buffer available for reuse in subsequent
+    // I/O operations. It is a no-op when no buffer is currently handed out.
+    //
+    // Additionally, when the buffer is released, the completion is checked for the
+    // IORING_CQE_F_MORE flag. If the flag is not set, EnqueueMultishotRecvmsg()
+    // is invoked so that the SQE submission is done for receiving next set of
+    // I/O.
+    void ReleaseBuffer();
+
+    // Receive the payload of one message. Additionally, receive
+    // credential data.
+    //
+    // payload: A pointer to a void pointer. This will be set to point to the received
+    //          payload data.
+    //
+    // payload_len: A reference to a size_t. This will be set to the length of the
+    //              received payload data.
+    //
+    // cred: A pointer to a struct ucred pointer. This will be set to point to the
+    //       user credentials associated with the received data (if available).
+    //       If the sender doesn't have credential information in the payload,
+    //       then nullptr will be returned.
+    //
+    // This function retrieves the data received from a recvmsg operation. It extracts the payload
+    // data and its length, as well as the user credentials associated with the sender. The
+    // returned pointers refer to an internal buffer owned by this class: the caller must not
+    // free them, and should call ReleaseBuffer() once the data is no longer needed.
+    void ReceiveData(void** payload, size_t& payload_len, struct ucred** cred);
+
+    // check if io_uring is supported
+    //
+    // return: true if io_uring is supported by the kernel, false otherwise.
+    //
+    // The decision is based on the release version reported by the running Linux kernel.
+    static bool isIouringEnabled();
+
+private:
+    static bool isIouringSupportedByKernel();  // True for kernel 6.1 and newer.
+    // Register buffers with io_uring
+    //
+    // return: true on success, false on failure (e.g., if io_uring_setup_buf_ring fails).
+    //
+    // This function registers the previously allocated buffers with the io_uring instance
+    // by creating a buffer ring for group bgid_ and adding every buffer to it.
+    // The kernel can then select one of these buffers for each received
+    // message.
+    bool RegisterBuffers();
+
+    struct uring_context {
+        struct io_uring ring;
+    };
+    // Socket fd
+    int socket_;
+    std::unique_ptr<uring_context> mCtx;               // Created by SetupIoUring().
+    std::vector<std::unique_ptr<uint8_t[]>> buffers_;  // Backing storage, indexed by buffer id.
+    struct msghdr msg = {};                            // Template passed with the recvmsg SQE.
+    int control_len_ = 0;                              // Bytes reserved for control data.
+    size_t num_buffers_ = 0;
+    int buffer_size_ = 0;                              // Header + control + payload, per buffer.
+    int active_buffer_id_ = -1;                        // -1 when no buffer is handed out.
+    struct io_uring_cqe* cqe = nullptr;                // Completion seen by the last ReceiveData().
+    // A constant buffer group id as we don't support multiple buffer groups
+    // yet.
+    const int bgid_ = 7;
+    struct io_uring_buf_ring* br_ = nullptr;
+    bool registered_buffers_ = false;
+    bool ring_setup_ = false;
+};
diff --git a/include/liburingutils/LibUringUtils.h b/include/liburingutils/LibUringUtils.h
deleted file mode 100644
index a6dbefc..0000000
--- a/include/liburingutils/LibUringUtils.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (C) 2025 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __LIBURING_UTILS_H
-#define __LIBURING_UTILS_H
-
-class LibUringUtils {
-public:
- static bool isIouringEnabled();
-
-private:
- static bool isIouringSupportedByKernel();
-};
-
-#endif
diff --git a/src/IOUringSocketHandler.cpp b/src/IOUringSocketHandler.cpp
new file mode 100644
index 0000000..0c68281
--- /dev/null
+++ b/src/IOUringSocketHandler.cpp
@@ -0,0 +1,206 @@
+/*
+ * Copyright (C) 2025 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "IOUringSocketHandler"
+
+#include <sys/resource.h>
+#include <sys/utsname.h>
+#include <unistd.h>
+
+#include <limits.h>
+#include <linux/time_types.h>
+#include <sys/cdefs.h>
+#include <sys/prctl.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include <chrono>
+#include <thread>
+
+#include <cutils/sockets.h>
+#include <private/android_logger.h>
+
+#include <IOUringSocketHandler/IOUringSocketHandler.h>
+
+#include <android-base/logging.h>
+#include <android-base/scopeguard.h>
+
+bool IOUringSocketHandler::isIouringEnabled() {  // Kernel version is the only criterion.
+    return IOUringSocketHandler::isIouringSupportedByKernel();
+}
+
+bool IOUringSocketHandler::isIouringSupportedByKernel() {
+    // Read "<major>.<minor>" out of the running kernel's release string.
+    struct utsname kernel_info {};
+    unsigned int kernel_major = 0, kernel_minor = 0;
+    uname(&kernel_info);
+    const int parsed = sscanf(kernel_info.release, "%u.%u", &kernel_major, &kernel_minor);
+    if (parsed != 2) {
+        return false;  // Release string is not in the expected form.
+    }
+    // We will only support kernels from 6.1 and higher.
+    return (kernel_major == 6) ? (kernel_minor >= 1) : (kernel_major > 6);
+}
+
+IOUringSocketHandler::IOUringSocketHandler(int socket_fd) : socket_{socket_fd} {}  // Records the fd only.
+
+IOUringSocketHandler::~IOUringSocketHandler() {
+    // Buffers are released before the ring they were registered with is torn down.
+    DeRegisterBuffers();
+    if (!ring_setup_) return;  // SetupIoUring() never succeeded: there is no ring to exit.
+    io_uring_queue_exit(&mCtx->ring);
+}
+
+bool IOUringSocketHandler::EnqueueMultishotRecvmsg() {
+    // Arms a multishot recvmsg on socket_. io_uring_get_sqe() yields nullptr when the
+    // submission queue is full, so the entry must be checked before it is filled in.
+    struct io_uring_sqe* sqe = ring_setup_ ? io_uring_get_sqe(&mCtx->ring) : nullptr;
+    if (sqe == nullptr) return false;  // Ring not set up, or no free submission entry.
+    memset(&msg, 0, sizeof(msg));
+    msg.msg_controllen = control_len_;  // Room reserved for the sender's control data.
+    io_uring_prep_recvmsg_multishot(sqe, socket_, &msg, 0);
+    sqe->flags |= IOSQE_BUFFER_SELECT;  // Kernel selects the buffer from group bgid_.
+    sqe->buf_group = bgid_;
+    const int ret = io_uring_submit(&mCtx->ring);
+    if (ret < 0) LOG(ERROR) << "EnqueueMultishotRecvmsg failed: ret: " << ret;
+    return ret >= 0;
+}
+
+bool IOUringSocketHandler::AllocateAndRegisterBuffers(size_t num_buffers, size_t buf_size) {
+    if (!ring_setup_) return false;  // RegisterBuffers() needs an initialised mCtx->ring.
+    num_buffers_ = num_buffers;
+    control_len_ = CMSG_ALIGN(sizeof(struct ucred)) + sizeof(struct cmsghdr);
+    // Per-buffer size: io_uring_recvmsg_out header + control data + the caller's payload.
+    buffer_size_ = sizeof(struct io_uring_recvmsg_out) + control_len_ + buf_size;
+    buffers_.reserve(buffers_.size() + num_buffers_);
+    for (size_t i = 0; i < num_buffers_; i++) {
+        buffers_.push_back(std::make_unique<uint8_t[]>(buffer_size_));
+    }
+    return RegisterBuffers();
+}
+
+bool IOUringSocketHandler::RegisterBuffers() {
+    int err = 0;
+    auto* new_br = io_uring_setup_buf_ring(&mCtx->ring, num_buffers_, bgid_, 0, &err);
+    if (new_br == nullptr) {
+        LOG(ERROR) << "io_uring_setup_buf_ring failed with error: " << err;
+        return false;
+    }
+    br_ = new_br;  // Only replaced on success, so a failed call never clobbers a live ring.
+    const int mask = io_uring_buf_ring_mask(num_buffers_);
+    for (size_t i = 0; i < num_buffers_; i++) {  // Buffer id == index into buffers_.
+        io_uring_buf_ring_add(br_, buffers_[i].get(), buffer_size_, i, mask, i);
+    }
+    io_uring_buf_ring_advance(br_, num_buffers_);  // Make all added entries visible at once.
+    LOG(DEBUG) << "RegisterBuffers success: " << num_buffers_;
+    registered_buffers_ = true;
+    return true;
+}
+
+void IOUringSocketHandler::DeRegisterBuffers() {
+    if (registered_buffers_) {
+        io_uring_free_buf_ring(&mCtx->ring, br_, num_buffers_, bgid_);
+        registered_buffers_ = false;  // The destructor calls this again; never free twice.
+    }
+    buffers_.clear();
+    active_buffer_id_ = -1;  // A buffer id handed out earlier no longer refers to anything.
+    num_buffers_ = control_len_ = buffer_size_ = 0;
+}
+
+bool IOUringSocketHandler::SetupIoUring(int queue_size) {
+    if (ring_setup_) {  // Replacing mCtx here would leak the ring that is already live.
+        LOG(ERROR) << "SetupIoUring failed: ring is already set up";
+        return false;
+    }
+    mCtx = std::make_unique<uring_context>();
+    struct io_uring_params params = {};
+    // COOP_TASKRUN - No IPI to logd
+    // SINGLE_ISSUER - Only one thread is doing the work on the ring
+    // TASKRUN_FLAG - we use peek_cqe - Hence, trigger task work if required
+    // DEFER_TASKRUN - trigger task work when CQE is explicitly polled
+    params.flags |= (IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER |
+                     IORING_SETUP_TASKRUN_FLAG | IORING_SETUP_DEFER_TASKRUN);
+    const int ret = io_uring_queue_init_params(queue_size + 1, &mCtx->ring, &params);
+    if (ret) {
+        LOG(ERROR) << "io_uring_queue_init_params failed with ret: " << ret;
+        return false;
+    }
+    LOG(INFO) << "io_uring_queue_init_params success";
+    ring_setup_ = true;  // From here on the destructor is responsible for io_uring_queue_exit().
+    return true;
+}
+
+void IOUringSocketHandler::ReleaseBuffer() {
+    if (active_buffer_id_ == -1) {
+        return;  // No buffer is currently handed out by ReceiveData().
+    }
+    // Sample the flag first: once the CQ head is advanced below, the CQE slot belongs
+    // to the kernel again and cqe must not be dereferenced any more.
+    const bool is_more_cqe = (cqe->flags & IORING_CQE_F_MORE) != 0;
+    // Put the buffer back to the pool and mark the CQE as consumed
+    io_uring_buf_ring_add(br_, buffers_[active_buffer_id_].get(), buffer_size_, active_buffer_id_,
+                          io_uring_buf_ring_mask(num_buffers_), 0);
+    io_uring_buf_ring_cq_advance(&mCtx->ring, br_, 1);
+    active_buffer_id_ = -1;
+    // If there are no more CQE data, re-arm the SQE
+    if (!is_more_cqe) {
+        EnqueueMultishotRecvmsg();
+    }
+}
+
+void IOUringSocketHandler::ReceiveData(void** payload, size_t& payload_len, struct ucred** cred) {
+    // Give the caller well-defined outputs on every early-return path below.
+    *payload = nullptr;
+    payload_len = 0;
+    *cred = nullptr;
+
+    // Take a completion that is already posted; otherwise block until one arrives.
+    if (io_uring_peek_cqe(&mCtx->ring, &cqe) < 0) {
+        const int ret = io_uring_wait_cqe(&mCtx->ring, &cqe);
+        if (ret) {
+            LOG(ERROR) << "WaitCqe failed: " << ret;
+            EnqueueMultishotRecvmsg();
+            return;
+        }
+    }
+
+    if (cqe->res < 0) {  // The receive itself failed: consume the CQE and re-arm.
+        io_uring_cqe_seen(&mCtx->ring, cqe);
+        EnqueueMultishotRecvmsg();
+        return;
+    }
+
+    // The id of the buffer that was filled is carried in the upper bits of the CQE flags.
+    active_buffer_id_ = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
+    void* this_recv = buffers_[active_buffer_id_].get();
+    struct io_uring_recvmsg_out* o = io_uring_recvmsg_validate(this_recv, cqe->res, &msg);
+    if (!o) {
+        return;  // Validation failed; the buffer stays active until ReleaseBuffer().
+    }
+
+    // Look for SCM_CREDENTIALS among the control messages; *cred stays null if absent.
+    for (struct cmsghdr* cmsg = io_uring_recvmsg_cmsg_firsthdr(o, &msg); cmsg != nullptr;
+         cmsg = io_uring_recvmsg_cmsg_nexthdr(o, &msg, cmsg)) {
+        if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_CREDENTIALS) {
+            *cred = reinterpret_cast<struct ucred*>(CMSG_DATA(cmsg));
+            break;
+        }
+    }
+    *payload = io_uring_recvmsg_payload(o, &msg);
+    payload_len = io_uring_recvmsg_payload_length(o, cqe->res, &msg);
+}
diff --git a/src/LibUringUtils_test.cpp b/src/IOUringSocketHandler_test.cpp
similarity index 62%
rename from src/LibUringUtils_test.cpp
rename to src/IOUringSocketHandler_test.cpp
index aff5206..67361f6 100644
--- a/src/LibUringUtils_test.cpp
+++ b/src/IOUringSocketHandler_test.cpp
@@ -14,20 +14,18 @@
* limitations under the License.
*/
-#include <liburingutils/LibUringUtils.h>
+#include <IOUringSocketHandler/IOUringSocketHandler.h>
#include <gtest/gtest.h>
-class LibUringUtilsTest : public testing::Test {
+class IOUringSocketHandlerTest : public testing::Test {
public:
void testIsIouringEnabled(bool expectedResult) {
- EXPECT_EQ(LibUringUtils::isIouringEnabled(), expectedResult);
+ EXPECT_EQ(IOUringSocketHandler::isIouringEnabled(), expectedResult);
}
};
-TEST_F(LibUringUtilsTest, ReturnsIouringNotEnabled) {
- // TODO: b/385143770 - Change this test to base on the real OS version,
- // this default expected value is true for the binary built from the
- // latest version of source code.
- testIsIouringEnabled(true);
+TEST_F(IOUringSocketHandlerTest, ReturnsIouringNotEnabled) {
+ // TODO: b/385143770 - Derive the expected value from the OS and liburing versions. NOTE(review): isIouringEnabled() returns true on kernel 6.1+, so this hard-coded false presumably only holds on older kernels - confirm.
+ testIsIouringEnabled(false);
}
diff --git a/src/LibUringUtils.cpp b/src/LibUringUtils.cpp
deleted file mode 100644
index 25151d3..0000000
--- a/src/LibUringUtils.cpp
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (C) 2025 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#define LOG_TAG "LibUringUtils"
-
-#include <android-base/strings.h>
-#include <liburingutils/LibUringUtils.h>
-#include <sys/resource.h>
-#include <sys/utsname.h>
-#include <unistd.h>
-
-bool LibUringUtils::isIouringEnabled() {
- // TODO: b/385143770 - Change this behavior to also check the Liburing version.
- return isIouringSupportedByKernel();
-}
-
-bool LibUringUtils::isIouringSupportedByKernel() {
- struct utsname uts {};
- unsigned int major, minor;
-
- uname(&uts);
- if (sscanf(uts.release, "%u.%u", &major, &minor) != 2) {
- return false;
- }
-
- // We will only support kernels from 6.1 and higher.
- return major > 6 || (major == 6 && minor >= 1);
-}