liburingutils: Benchmark to compare recvmsg+poll with liburingutils
Default there are 16 sender threads.
Each sender thread sends 1 Million messages. Each message payload is
4096 bytes.
On the receiver side, there is one receiver thread.
===================================
With io_uring:
Start server:
simpleperf stat -e cpu-cycles taskset -a f0 ./IOUringSocketHandlerReceiver -io-uring
Start sender:
./IOUringSocketHandlerSender
=======================================
With recvmsg:
Start server:
simpleperf stat -e cpu-cycles taskset -a f0 ./IOUringSocketHandlerReceiver -sync
Start sender:
./IOUringSocketHandlerSender
=========================================
Bug: 406299670
Test: On Pixel 8 Pro (Husky)
Change-Id: I3b4ca100b6b0a3b459b86a3f7b25957440d8ae84
Signed-off-by: Akilesh Kailash <akailash@google.com>
diff --git a/Android.bp b/Android.bp
index d1fe021..d1d3623 100644
--- a/Android.bp
+++ b/Android.bp
@@ -81,3 +81,37 @@
"-Werror",
],
}
+
+cc_benchmark {
+ name: "IOUringSocketHandlerReceiver",
+ srcs: ["src/IOUringSocketHandlerReceiver.cpp"],
+ shared_libs: [
+ "libbase",
+ "liblog",
+ ],
+ static_libs: [
+ "liburing",
+ "liburingutils",
+ ],
+ cflags: [
+ "-Wall",
+ "-Werror",
+ ],
+}
+
+cc_benchmark {
+ name: "IOUringSocketHandlerSender",
+ srcs: ["src/IOUringSocketHandlerSender.cpp"],
+ shared_libs: [
+ "libbase",
+ "liblog",
+ ],
+ static_libs: [
+ "liburing",
+ "liburingutils",
+ ],
+ cflags: [
+ "-Wall",
+ "-Werror",
+ ],
+}
diff --git a/src/IOUringSocketHandlerReceiver.cpp b/src/IOUringSocketHandlerReceiver.cpp
new file mode 100644
index 0000000..dd4763f
--- /dev/null
+++ b/src/IOUringSocketHandlerReceiver.cpp
@@ -0,0 +1,246 @@
+#include <iostream>
+#include <cstring>
+#include <vector>
+#include <thread>
+#include <chrono>
+#include <cstring>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <poll.h>
+#include <unistd.h>
+#include <random>
+
+#include <android-base/file.h>
+#include <android-base/logging.h>
+
+#include <android-base/scopeguard.h>
+#include <IOUringSocketHandler/IOUringSocketHandler.h>
+
+#include <benchmark/benchmark.h>
+
+// Registered buffers
+#define MAX_BUFFERS 256
+
+// Threads sending 4k payload - 1 Million times
+const int MESSAGE_SIZE = 4096; // 4KB
+const int NUM_MESSAGES_PER_THREAD = 1000000;
+
+static bool io_uring = false;
+static bool sync_receive = false;
+
+// The benchmark is set to run 4 times with
+// the following combinations:
+//
+// a: {1, 4, 8, 16} -> This is the number of sender threads
+// b: {0, 1} -> Whether sender is blocking or non-blocking
+#define BENCH_OPTIONS \
+ MeasureProcessCPUTime() \
+ ->Unit(benchmark::kSecond) \
+ ->Iterations(1) \
+ ->Repetitions(1) \
+ ->ReportAggregatesOnly(true) \
+ ->ArgsProduct({{16}, {0}});
+
+static void SetLabel(benchmark::State& state) {
+ std::string num_senders = std::to_string(state.range(0));
+ std::string type = state.range(1) == 0 ? "non-blocking" : "blocking";
+ state.SetLabel(num_senders + "-SendThreads" + "/" + type);
+}
+
+// Receive using io_uring
+bool receiveThreaduring(int sock_recv, int num_threads, uint64_t& total_bytes_received,
+ double& average_latency) {
+ std::unique_ptr<IOUringSocketHandler> async_listener_;
+ async_listener_ = std::make_unique<IOUringSocketHandler>(sock_recv);
+ if (!async_listener_->SetupIoUring(MAX_BUFFERS)) {
+ LOG(ERROR) << "SetupIoUring failed";
+ return false;
+ }
+ async_listener_->AllocateAndRegisterBuffers(
+ MAX_BUFFERS, MESSAGE_SIZE);
+
+ if (!async_listener_->EnqueueMultishotRecvmsg()) {
+ LOG(ERROR) << "EnqueueMultishotRecvmsg failed";
+ return false;
+ }
+
+ long long received_messages = 0;
+ auto start_time = std::chrono::high_resolution_clock::now();
+ long long total_latency = 0;
+
+ while (received_messages < num_threads * NUM_MESSAGES_PER_THREAD) {
+ struct ucred* cred = nullptr;
+ void* this_recv = nullptr;
+ size_t len = 0;
+ auto receive_time = std::chrono::high_resolution_clock::now();
+ async_listener_->ReceiveData(&this_recv, len, &cred);
+ // Release the buffer from here onwards
+ {
+ auto scope_guard =
+ android::base::make_scope_guard([&async_listener_]() -> void {
+ async_listener_->ReleaseBuffer(); });
+ auto end_receive_time = std::chrono::high_resolution_clock::now();
+ total_latency += std::chrono::duration_cast<std::chrono::microseconds>(
+ end_receive_time - receive_time).count();
+
+ if (len <= 0) {
+ LOG(DEBUG) << "Received zero length for: " << received_messages;
+ continue;
+ }
+ received_messages++;
+ total_bytes_received += len;
+ }
+ }
+
+ auto end_time = std::chrono::high_resolution_clock::now();
+ average_latency = static_cast<double>(total_latency) / received_messages;
+ return true;
+}
+
+// Function for receiving thread using recvmsg()
+void receiveThread(int sock_recv, int num_threads, uint64_t& total_bytes_received,
+ double& average_latency) {
+ char recv_buffer[MESSAGE_SIZE];
+ struct ucred cred;
+
+ struct iovec iov_recv;
+ iov_recv.iov_base = recv_buffer;
+ iov_recv.iov_len = MESSAGE_SIZE;
+
+ struct msghdr msg_recv;
+ memset(&msg_recv, 0, sizeof(msg_recv));
+ msg_recv.msg_iov = &iov_recv;
+ msg_recv.msg_iovlen = 1;
+
+ char control_buffer_recv[CMSG_SPACE(sizeof(cred))];
+ memset(control_buffer_recv, 0, sizeof(control_buffer_recv));
+ msg_recv.msg_control = control_buffer_recv;
+ msg_recv.msg_controllen = sizeof(control_buffer_recv);
+
+ struct pollfd pfd;
+ pfd.fd = sock_recv;
+ pfd.events = POLLIN;
+
+ long long received_messages = 0;
+ auto start_time = std::chrono::high_resolution_clock::now();
+ long long total_latency = 0;
+
+ while (received_messages < num_threads * NUM_MESSAGES_PER_THREAD) {
+ auto receive_time = std::chrono::high_resolution_clock::now();
+ if (poll(&pfd, 1, -1) > 0) {
+ ssize_t received_bytes = recvmsg(sock_recv, &msg_recv, 0);
+ if (received_bytes < 0) {
+ perror("recvmsg failed");
+ break;
+ }
+
+ auto end_receive_time = std::chrono::high_resolution_clock::now();
+ total_latency += std::chrono::duration_cast<std::chrono::microseconds>(
+ end_receive_time - receive_time).count();
+
+ received_messages++;
+ total_bytes_received += received_bytes;
+ }
+ }
+
+ auto end_time = std::chrono::high_resolution_clock::now();
+ average_latency = static_cast<double>(total_latency) / received_messages;
+}
+
+static int CreateServerSocket(std::string& path) {
+ int sock_recv = socket(AF_UNIX, SOCK_DGRAM, 0);
+ if (sock_recv < 0) {
+ PLOG(ERROR) << "socket failed";
+ return -1;
+ }
+
+ std::string tmp_path = android::base::GetExecutableDirectory();
+ std::string socket_path = tmp_path + "/temp.sock";
+ struct sockaddr_un addr_recv;
+ memset(&addr_recv, 0, sizeof(addr_recv));
+ addr_recv.sun_family = AF_UNIX;
+ strcpy(addr_recv.sun_path, socket_path.c_str());
+
+ unlink(socket_path.c_str()); // Remove existing socket file if any
+
+ if (bind(sock_recv, (struct sockaddr*)&addr_recv, sizeof(addr_recv)) < 0) {
+ PLOG(ERROR) << "bind failed";
+ close(sock_recv);
+ return -1;
+ }
+
+ path = socket_path;
+ return sock_recv;
+}
+
+static void SocketBenchMark(benchmark::State& state, const bool io_uring) {
+ state.PauseTiming();
+ while (state.KeepRunning()) {
+ std::string socket_path;
+ int sock_recv = CreateServerSocket(socket_path);
+ if (sock_recv < 0) {
+ LOG(ERROR) << "CreateServerSocket failed";
+ return;
+ }
+
+ const size_t num_sender_threads = state.range(0);
+ const size_t blocking = state.range(1);
+ uint64_t total_bytes_received = 0;
+ double average_latency = 0;
+ state.ResumeTiming();
+ if (io_uring) {
+ receiveThreaduring(sock_recv, num_sender_threads,
+ std::ref(total_bytes_received), std::ref(average_latency));
+ } else {
+ receiveThread(sock_recv, num_sender_threads,
+ std::ref(total_bytes_received), std::ref(average_latency));
+ }
+ state.PauseTiming();
+
+ state.counters["Total_Data"] = total_bytes_received;
+ state.counters["Latency(usec)"] = average_latency;
+ state.SetBytesProcessed(total_bytes_received);
+ state.SetItemsProcessed(num_sender_threads * NUM_MESSAGES_PER_THREAD);
+
+ // Cleanup
+ close(sock_recv);
+ unlink(socket_path.c_str()); // Remove the socket file
+ }
+ SetLabel(state);
+}
+
+static void BM_ReceiveIOUring(benchmark::State& state) {
+ if (io_uring) {
+ SocketBenchMark(state, true);
+ }
+}
+BENCHMARK(BM_ReceiveIOUring)->BENCH_OPTIONS
+
+static void BM_ReceiveSync(benchmark::State& state) {
+ if (sync_receive) {
+ SocketBenchMark(state, false);
+ }
+}
+BENCHMARK(BM_ReceiveSync)->BENCH_OPTIONS
+
+int main(int argc, char** argv) {
+ android::base::InitLogging(argv, &android::base::StderrLogger);
+
+ if (argc != 2) {
+ std::cerr << "IOUringSocketHandlerReceiver {-io-uring | -sync}\n";
+ return 0;
+ }
+ if (std::string(argv[1]) == "-io-uring") {
+ io_uring = true;
+ } else if (std::string(argv[1]) == "-sync") {
+ sync_receive = true;
+ } else {
+ std::cerr << "IOUringSocketHandlerReceiver {-io-uring | -sync}\n";
+ return 0;
+ }
+
+ benchmark::Initialize(&argc, argv);
+ benchmark::RunSpecifiedBenchmarks();
+ return 0;
+}
diff --git a/src/IOUringSocketHandlerSender.cpp b/src/IOUringSocketHandlerSender.cpp
new file mode 100644
index 0000000..3e9aae6
--- /dev/null
+++ b/src/IOUringSocketHandlerSender.cpp
@@ -0,0 +1,197 @@
+#include <iostream>
+#include <cstring>
+#include <vector>
+#include <thread>
+#include <chrono>
+#include <cstring>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <poll.h>
+#include <unistd.h>
+#include <random>
+
+#include <android-base/file.h>
+#include <android-base/logging.h>
+
+#include <android-base/scopeguard.h>
+#include <IOUringSocketHandler/IOUringSocketHandler.h>
+
+#include <benchmark/benchmark.h>
+
+// Registered buffers
+#define MAX_BUFFERS 256
+
+// Threads sending 4k payload - 1 Million times
+const int MESSAGE_SIZE = 4096; // 4KB
+const int NUM_MESSAGES_PER_THREAD = 1000000;
+
+// The benchmark is set to run 4 times with
+// the following combinations:
+//
+// a: {1, 4, 8, 16} -> This is the number of sender threads
+// b: {0, 1} -> Whether sender is non-blocking or blocking
+#define BENCH_OPTIONS \
+ MeasureProcessCPUTime() \
+ ->Unit(benchmark::kSecond) \
+ ->Iterations(1) \
+ ->Repetitions(1) \
+ ->ReportAggregatesOnly(true) \
+ ->ArgsProduct({{16}, {0}});
+
+// Function to generate a random string
+std::string generateRandomString(size_t length) {
+ static const char charset[] =
+ "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<> dis(0, sizeof(charset) - 2);
+
+ std::string str(length, 0);
+ for (size_t i = 0; i < length; ++i) {
+ str[i] = charset[dis(gen)];
+ }
+ return str;
+}
+
+static void SetLabel(benchmark::State& state) {
+ std::string num_senders = std::to_string(state.range(0));
+ std::string type = state.range(1) == 0 ? "non-blocking" : "blocking";
+ state.SetLabel(num_senders + "-SendThreads" + "/" + type);
+}
+
+// Function for sending thread
+void sendThread(int sock_send, int blocking) {
+ std::string message = generateRandomString(MESSAGE_SIZE);
+
+ struct ucred cred;
+ memset(&cred, 0, sizeof(cred));
+ cred.pid = getpid();
+ cred.uid = getuid();
+ cred.gid = getgid();
+
+ struct iovec iov_send;
+ iov_send.iov_base = const_cast<char*>(message.data());
+ iov_send.iov_len = MESSAGE_SIZE;
+
+ struct msghdr msg_send;
+ memset(&msg_send, 0, sizeof(msg_send));
+ msg_send.msg_iov = &iov_send;
+ msg_send.msg_iovlen = 1;
+
+ char control_buffer_send[CMSG_SPACE(sizeof(cred))];
+ memset(control_buffer_send, 0, sizeof(control_buffer_send));
+ msg_send.msg_control = control_buffer_send;
+ msg_send.msg_controllen = sizeof(control_buffer_send);
+
+ struct cmsghdr* cmsg_send = CMSG_FIRSTHDR(&msg_send);
+ cmsg_send->cmsg_level = SOL_SOCKET;
+ cmsg_send->cmsg_type = SCM_CREDENTIALS;
+ cmsg_send->cmsg_len = CMSG_LEN(sizeof(cred));
+ memcpy(CMSG_DATA(cmsg_send), &cred, sizeof(cred));
+
+ int flags = 0;
+ if (!blocking) {
+ flags = MSG_DONTWAIT;
+ }
+ for (int i = 0; i < NUM_MESSAGES_PER_THREAD; ++i) {
+ ssize_t sent_bytes;
+ while (true) {
+ sent_bytes = sendmsg(sock_send, &msg_send, flags);
+ if (sent_bytes >= 0) {
+ break; // Success
+ }
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ // Try again
+ continue;
+ } else {
+ perror("sendmsg failed");
+ return;
+ }
+ }
+ }
+ LOG(DEBUG) << "sendThread exiting";
+}
+
+static void SocketBenchMark(benchmark::State& state, const bool) {
+ state.PauseTiming();
+ while (state.KeepRunning()) {
+ std::string tmp_path = android::base::GetExecutableDirectory();
+ std::string socket_path = tmp_path + "/temp.sock";
+
+ const size_t num_sender_threads = state.range(0);
+ const size_t blocking = state.range(1);
+ std::vector<int> sender_sockets(num_sender_threads);
+ // Sender socket setup (for each thread)
+ for (int i = 0; i < num_sender_threads; ++i) {
+ int sock_send = socket(AF_UNIX, SOCK_DGRAM, 0);
+ if (sock_send < 0) {
+ perror("socket failed");
+ return;
+ }
+
+ if (!blocking) {
+ // Set non-blocking for the sender socket
+ int flags = fcntl(sock_send, F_GETFL, 0);
+ if (flags == -1) {
+ perror("fcntl F_GETFL failed");
+ close(sock_send);
+ for (int j = 0; j < i; ++j) { // Close previously opened sockets
+ close(sender_sockets[j]);
+ }
+ return;
+ }
+ if (fcntl(sock_send, F_SETFL, flags | O_NONBLOCK) == -1) {
+ perror("fcntl F_SETFL failed");
+ close(sock_send);
+ for (int j = 0; j < i; ++j) { // Close previously opened sockets
+ close(sender_sockets[j]);
+ }
+ return;
+ }
+ }
+
+ struct sockaddr_un addr_send;
+ memset(&addr_send, 0, sizeof(addr_send));
+ addr_send.sun_family = AF_UNIX;
+ strcpy(addr_send.sun_path, socket_path.c_str()); // Connect to the receiver
+
+ if (connect(sock_send, (struct sockaddr*)&addr_send, sizeof(addr_send)) < 0) {
+ perror("connect failed");
+ close(sock_send);
+ for (int j = 0; j < i; ++j) { // Close previously opened sockets
+ close(sender_sockets[j]);
+ }
+ return;
+ }
+
+ sender_sockets[i] = sock_send;
+ }
+
+ std::vector<std::thread> send_threads;
+ for (int i = 0; i < num_sender_threads; ++i) {
+ send_threads.emplace_back(sendThread, sender_sockets[i], blocking);
+ }
+ for (auto& thread : send_threads) {
+ thread.join();
+ }
+
+ for (int sock : sender_sockets) {
+ close(sock);
+ }
+ LOG(INFO) << "Sending data complete";
+ }
+ SetLabel(state);
+}
+
+static void BM_Sender(benchmark::State& state) {
+ SocketBenchMark(state, true);
+}
+BENCHMARK(BM_Sender)->BENCH_OPTIONS
+
+int main(int argc, char** argv) {
+ android::base::InitLogging(argv, &android::base::StderrLogger);
+ benchmark::Initialize(&argc, argv);
+ benchmark::RunSpecifiedBenchmarks();
+ return 0;
+}