liburingutils: Benchmark to compare recvmsg+poll with liburingutils

By default, there are 16 sender threads.
Each sender thread sends 1 million messages. Each message payload is
4096 bytes.

On the receiver side, there is one receiver thread.

===================================
With io_uring:

Start server:

simpleperf stat -e cpu-cycles taskset -a f0 ./IOUringSocketHandlerReceiver -io-uring

Start sender:

./IOUringSocketHandlerSender

=======================================

With recvmsg:

Start server:

simpleperf stat -e cpu-cycles taskset -a f0 ./IOUringSocketHandlerReceiver -sync

Start sender:

./IOUringSocketHandlerSender

=========================================

Bug: 406299670
Test: On Pixel 8 Pro (Husky)
Change-Id: I3b4ca100b6b0a3b459b86a3f7b25957440d8ae84
Signed-off-by: Akilesh Kailash <akailash@google.com>
diff --git a/Android.bp b/Android.bp
index d1fe021..d1d3623 100644
--- a/Android.bp
+++ b/Android.bp
@@ -81,3 +81,37 @@
         "-Werror",
     ],
 }
+
+cc_benchmark {
+    name: "IOUringSocketHandlerReceiver", // receiver side; takes -io-uring or -sync
+    srcs: ["src/IOUringSocketHandlerReceiver.cpp"],
+    shared_libs: [
+        "libbase",
+        "liblog",
+    ],
+    static_libs: [
+        "liburing",
+        "liburingutils", // provides the IOUringSocketHandler header included by the source
+    ],
+    cflags: [
+        "-Wall",
+        "-Werror",
+    ],
+}
+
+cc_benchmark {
+    name: "IOUringSocketHandlerSender", // sender side; start it after the receiver
+    srcs: ["src/IOUringSocketHandlerSender.cpp"],
+    shared_libs: [
+        "libbase",
+        "liblog",
+    ],
+    static_libs: [
+        "liburing", // NOTE(review): the sender only includes the handler header - confirm this is needed
+        "liburingutils",
+    ],
+    cflags: [
+        "-Wall",
+        "-Werror",
+    ],
+}
diff --git a/src/IOUringSocketHandlerReceiver.cpp b/src/IOUringSocketHandlerReceiver.cpp
new file mode 100644
index 0000000..dd4763f
--- /dev/null
+++ b/src/IOUringSocketHandlerReceiver.cpp
@@ -0,0 +1,246 @@
+#include <iostream>
+#include <cstring>
+#include <vector>
+#include <thread>
+#include <chrono>
+#include <cstring>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <poll.h>
+#include <unistd.h>
+#include <random>
+
+#include <android-base/file.h>
+#include <android-base/logging.h>
+
+#include <android-base/scopeguard.h>
+#include <IOUringSocketHandler/IOUringSocketHandler.h>
+
+#include <benchmark/benchmark.h>
+
+// Buffer count handed to SetupIoUring() and AllocateAndRegisterBuffers()
+#define MAX_BUFFERS 256
+
+// Each sender thread sends a 4 KiB payload one million times
+const int MESSAGE_SIZE = 4096; // 4KB
+const int NUM_MESSAGES_PER_THREAD = 1000000;
+
+static bool io_uring = false; // set by main() when "-io-uring" is passed
+static bool sync_receive = false; // set by main() when "-sync" is passed
+
+// The benchmark currently runs once, with the single argument
+// combination given to ArgsProduct() below:
+//
+// a: {16} -> This is the number of sender threads
+// b: {0} -> 0 labels the sender non-blocking, any other value blocking
+#define BENCH_OPTIONS                 \
+  MeasureProcessCPUTime()             \
+      ->Unit(benchmark::kSecond) \
+      ->Iterations(1)                \
+      ->Repetitions(1)                \
+      ->ReportAggregatesOnly(true) \
+      ->ArgsProduct({{16}, {0}});
+
+// Labels the run "<senders>-SendThreads/<non-blocking|blocking>" from the two range args.
+static void SetLabel(benchmark::State& state) {
+    const char* mode = (state.range(1) != 0) ? "blocking" : "non-blocking";
+    state.SetLabel(std::to_string(state.range(0)) + "-SendThreads" + "/" + mode);
+}
+
+// Receives num_threads * NUM_MESSAGES_PER_THREAD datagrams from sock_recv via
+// io_uring multishot recvmsg. Adds each payload length to total_bytes_received
+// and stores the mean ReceiveData() time per message (usec) in average_latency.
+// Returns false if ring setup or the multishot submission fails, else true.
+bool receiveThreaduring(int sock_recv, int num_threads, uint64_t& total_bytes_received,
+                        double& average_latency) {
+    auto handler = std::make_unique<IOUringSocketHandler>(sock_recv);
+    if (!handler->SetupIoUring(MAX_BUFFERS)) {
+        LOG(ERROR) << "SetupIoUring failed";
+        return false;
+    }
+    // NOTE(review): the outcome of this call is not checked - confirm it cannot fail.
+    handler->AllocateAndRegisterBuffers(MAX_BUFFERS, MESSAGE_SIZE);
+    if (!handler->EnqueueMultishotRecvmsg()) {
+        LOG(ERROR) << "EnqueueMultishotRecvmsg failed";
+        return false;
+    }
+
+    // Widen before multiplying so a large sender count cannot overflow int.
+    const long long expected_messages =
+        static_cast<long long>(num_threads) * NUM_MESSAGES_PER_THREAD;
+    long long received_messages = 0;
+    long long total_latency = 0;
+
+    while (received_messages < expected_messages) {
+        struct ucred* cred = nullptr;
+        void* payload = nullptr;
+        size_t len = 0;
+        const auto receive_start = std::chrono::high_resolution_clock::now();
+        handler->ReceiveData(&payload, len, &cred);
+        const auto receive_end = std::chrono::high_resolution_clock::now();
+        // Hand the buffer back on every exit from this iteration, "continue" included.
+        auto release_buffer =
+            android::base::make_scope_guard([&handler]() { handler->ReleaseBuffer(); });
+        total_latency += std::chrono::duration_cast<std::chrono::microseconds>(
+                             receive_end - receive_start).count();
+        if (len == 0) {  // len is unsigned, so "<= 0" could only ever mean "== 0"
+            LOG(DEBUG) << "Received zero length for: " << received_messages;
+            continue;
+        }
+        received_messages++;
+        total_bytes_received += len;
+    }
+
+    // expected_messages may be 0 (no senders); avoid producing 0/0 in that case.
+    average_latency =
+        received_messages > 0 ? static_cast<double>(total_latency) / received_messages : 0.0;
+    return true;
+}
+
+// Receives num_threads * NUM_MESSAGES_PER_THREAD datagrams from sock_recv with
+// poll() + recvmsg(). Adds each payload length to total_bytes_received and
+// stores the mean poll()+recvmsg() time per message (usec) in average_latency.
+// Stops early if recvmsg() fails.
+void receiveThread(int sock_recv, int num_threads, uint64_t& total_bytes_received,
+                   double& average_latency) {
+    char recv_buffer[MESSAGE_SIZE];
+    struct iovec iov_recv;
+    iov_recv.iov_base = recv_buffer;
+    iov_recv.iov_len = MESSAGE_SIZE;
+    char control_buffer_recv[CMSG_SPACE(sizeof(struct ucred))];
+    memset(control_buffer_recv, 0, sizeof(control_buffer_recv));
+    struct msghdr msg_recv;
+    memset(&msg_recv, 0, sizeof(msg_recv));
+    msg_recv.msg_iov = &iov_recv;
+    msg_recv.msg_iovlen = 1;
+    msg_recv.msg_control = control_buffer_recv;
+    struct pollfd pfd;
+    pfd.fd = sock_recv;
+    pfd.events = POLLIN;
+
+    // Widen before multiplying so a large sender count cannot overflow int.
+    const long long expected_messages =
+        static_cast<long long>(num_threads) * NUM_MESSAGES_PER_THREAD;
+    long long received_messages = 0;
+    long long total_latency = 0;
+
+    while (received_messages < expected_messages) {
+        auto receive_time = std::chrono::high_resolution_clock::now();
+        if (poll(&pfd, 1, -1) > 0) {
+            // recvmsg() overwrites msg_controllen with the length it actually
+            // stored, so restore the full buffer size before every call.
+            msg_recv.msg_controllen = sizeof(control_buffer_recv);
+            ssize_t received_bytes = recvmsg(sock_recv, &msg_recv, 0);
+            if (received_bytes < 0) {
+                perror("recvmsg failed");
+                break;
+            }
+            auto end_receive_time = std::chrono::high_resolution_clock::now();
+            total_latency += std::chrono::duration_cast<std::chrono::microseconds>(
+                              end_receive_time - receive_time).count();
+            received_messages++;
+            total_bytes_received += received_bytes;
+        }
+    }
+    // The loop can end before anything was received; avoid producing 0/0 then.
+    average_latency =
+        received_messages > 0 ? static_cast<double>(total_latency) / received_messages : 0.0;
+}
+
+// Binds an AF_UNIX datagram socket to "<executable dir>/temp.sock". Returns the
+// fd and stores the path in |path| on success; logs and returns -1 on failure.
+static int CreateServerSocket(std::string& path) {
+    const std::string socket_path = android::base::GetExecutableDirectory() + "/temp.sock";
+    struct sockaddr_un addr_recv = {};
+    addr_recv.sun_family = AF_UNIX;
+    if (socket_path.size() >= sizeof(addr_recv.sun_path)) {  // would overflow sun_path
+        LOG(ERROR) << "socket path too long: " << socket_path;
+        return -1;
+    }
+    memcpy(addr_recv.sun_path, socket_path.c_str(), socket_path.size() + 1);
+    int sock_recv = socket(AF_UNIX, SOCK_DGRAM, 0);
+    if (sock_recv < 0) {
+        PLOG(ERROR) << "socket failed";
+        return -1;
+    }
+    unlink(socket_path.c_str()); // Remove existing socket file if any
+    if (bind(sock_recv, (struct sockaddr*)&addr_recv, sizeof(addr_recv)) < 0) {
+        PLOG(ERROR) << "bind failed";
+        close(sock_recv);
+        return -1;
+    }
+    path = socket_path;
+    return sock_recv;
+}
+
+// Runs one receiver pass per benchmark iteration: binds the server socket,
+// times only the receive loop (io_uring or poll()+recvmsg(), per |io_uring|),
+// then publishes the byte/latency counters and removes the socket file.
+static void SocketBenchMark(benchmark::State& state, const bool io_uring) {
+  while (state.KeepRunning()) {
+    // KeepRunning() leaves the timer running; pause it so setup is not measured.
+    state.PauseTiming();
+    std::string socket_path;
+    int sock_recv = CreateServerSocket(socket_path);
+    if (sock_recv < 0) { state.SkipWithError("CreateServerSocket failed"); return; }
+
+    const size_t num_sender_threads = state.range(0);
+    uint64_t total_bytes_received = 0;
+    double average_latency = 0;
+    bool receive_ok = true;
+    state.ResumeTiming();
+    if (io_uring) {
+        receive_ok = receiveThreaduring(sock_recv, num_sender_threads, total_bytes_received,
+                                        average_latency);
+    } else {
+        receiveThread(sock_recv, num_sender_threads, total_bytes_received, average_latency);
+    }
+    state.PauseTiming();
+    close(sock_recv);
+    unlink(socket_path.c_str()); // Remove the socket file
+    if (!receive_ok) { state.SkipWithError("io_uring receiver setup failed"); return; }
+    state.counters["Total_Data"] = total_bytes_received;
+    state.counters["Latency(usec)"] = average_latency;
+    state.SetBytesProcessed(total_bytes_received);
+    state.SetItemsProcessed(num_sender_threads * NUM_MESSAGES_PER_THREAD);
+    // The timer has to be running again when control returns to KeepRunning().
+    state.ResumeTiming();
+  }
+  SetLabel(state);
+}
+
+// Runs only with "-io-uring"; otherwise the run is marked skipped rather than silently returning.
+static void BM_ReceiveIOUring(benchmark::State& state) {
+    if (!io_uring) { state.SkipWithError("not selected: pass -io-uring"); return; }
+    SocketBenchMark(state, true);
+}
+BENCHMARK(BM_ReceiveIOUring)->BENCH_OPTIONS
+
+// Runs only with "-sync"; otherwise the run is marked skipped rather than silently returning.
+static void BM_ReceiveSync(benchmark::State& state) {
+    if (!sync_receive) { state.SkipWithError("not selected: pass -sync"); return; }
+    SocketBenchMark(state, false);
+}
+BENCHMARK(BM_ReceiveSync)->BENCH_OPTIONS
+
+// Picks the receiver flavour from the single argument ("-io-uring" or "-sync")
+// and runs the registered benchmarks. Any other invocation prints the usage
+// line and exits with status 1 so that scripts can detect the mistake.
+int main(int argc, char** argv) {
+    android::base::InitLogging(argv, &android::base::StderrLogger);
+
+    const std::string mode = (argc == 2) ? argv[1] : "";
+    if (mode == "-io-uring") {
+        io_uring = true;
+    } else if (mode == "-sync") {
+        sync_receive = true;
+    } else {
+        std::cerr << "IOUringSocketHandlerReceiver {-io-uring | -sync}\n";
+        return 1;
+    }
+
+    benchmark::Initialize(&argc, argv);
+    benchmark::RunSpecifiedBenchmarks();
+    return 0;
+}
diff --git a/src/IOUringSocketHandlerSender.cpp b/src/IOUringSocketHandlerSender.cpp
new file mode 100644
index 0000000..3e9aae6
--- /dev/null
+++ b/src/IOUringSocketHandlerSender.cpp
@@ -0,0 +1,197 @@
+#include <iostream>
+#include <cstring>
+#include <vector>
+#include <thread>
+#include <chrono>
+#include <cstring>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <poll.h>
+#include <unistd.h>
+#include <random>
+
+#include <android-base/file.h>
+#include <android-base/logging.h>
+
+#include <android-base/scopeguard.h>
+#include <IOUringSocketHandler/IOUringSocketHandler.h>
+
+#include <benchmark/benchmark.h>
+
+// NOTE(review): MAX_BUFFERS is not referenced anywhere in this sender file
+#define MAX_BUFFERS 256
+
+// Each sender thread sends a 4 KiB payload one million times
+const int MESSAGE_SIZE = 4096; // 4KB
+const int NUM_MESSAGES_PER_THREAD = 1000000;
+
+// The benchmark currently runs once, with the single argument
+// combination given to ArgsProduct() below:
+//
+// a: {16} -> This is the number of sender threads
+// b: {0} -> 0 makes the senders non-blocking, any other value blocking
+#define BENCH_OPTIONS                 \
+  MeasureProcessCPUTime()             \
+      ->Unit(benchmark::kSecond) \
+      ->Iterations(1)                \
+      ->Repetitions(1)                \
+      ->ReportAggregatesOnly(true) \
+      ->ArgsProduct({{16}, {0}});
+
+// Builds a string of |length| characters drawn uniformly from [a-zA-Z0-9].
+std::string generateRandomString(size_t length) {
+    static const char kAlphabet[] =
+        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+    // sizeof counts the trailing NUL, so the last valid index is sizeof - 2.
+    constexpr int kLastIndex = sizeof(kAlphabet) - 2;
+    std::random_device seed_source;
+    std::mt19937 engine(seed_source());
+    std::uniform_int_distribution<> pick(0, kLastIndex);
+
+    std::string result(length, 0);
+    for (char& ch : result) ch = kAlphabet[pick(engine)];
+    return result;
+}
+
+static void SetLabel(benchmark::State& state) {  // "<senders>-SendThreads/<mode>"
+    std::string label = std::to_string(state.range(0)) + "-SendThreads" + "/";
+    label += (state.range(1) == 0) ? "non-blocking" : "blocking";
+    state.SetLabel(label);
+}
+
+// Sends NUM_MESSAGES_PER_THREAD datagrams of MESSAGE_SIZE random bytes on
+// sock_send, each carrying this process's pid/uid/gid as SCM_CREDENTIALS.
+// A zero |blocking| sends with MSG_DONTWAIT and retries while the socket
+// reports EAGAIN; any other sendmsg() error ends the thread early.
+void sendThread(int sock_send, int blocking) {
+    std::string message = generateRandomString(MESSAGE_SIZE);
+
+    // Credentials attached to every message as ancillary data.
+    struct ucred cred;
+    memset(&cred, 0, sizeof(cred));
+    cred.pid = getpid();
+    cred.uid = getuid();
+    cred.gid = getgid();
+
+    struct iovec iov_send;
+    iov_send.iov_base = &message[0];
+    iov_send.iov_len = MESSAGE_SIZE;
+
+    // Room for exactly one control message holding |cred|.
+    char control_buffer_send[CMSG_SPACE(sizeof(cred))];
+    memset(control_buffer_send, 0, sizeof(control_buffer_send));
+
+    struct msghdr msg_send;
+    memset(&msg_send, 0, sizeof(msg_send));
+    msg_send.msg_iov = &iov_send;
+    msg_send.msg_iovlen = 1;
+    msg_send.msg_control = control_buffer_send;
+    msg_send.msg_controllen = sizeof(control_buffer_send);
+
+    // Fill in the single SCM_CREDENTIALS control message.
+    struct cmsghdr* cmsg_send = CMSG_FIRSTHDR(&msg_send);
+    cmsg_send->cmsg_level = SOL_SOCKET;
+    cmsg_send->cmsg_type = SCM_CREDENTIALS;
+    cmsg_send->cmsg_len = CMSG_LEN(sizeof(cred));
+    memcpy(CMSG_DATA(cmsg_send), &cred, sizeof(cred));
+
+    // The same message is sent every time; only the flags depend on |blocking|.
+    const int flags = blocking ? 0 : MSG_DONTWAIT;
+    for (int i = 0; i < NUM_MESSAGES_PER_THREAD; ++i) {
+        while (sendmsg(sock_send, &msg_send, flags) < 0) {
+            // EAGAIN: the send would have blocked; EINTR: a signal interrupted
+            // the call. Neither is a failure, so just retry the same message.
+            if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+                continue;
+            }
+            perror("sendmsg failed");
+            return;
+        }
+    }
+
+    LOG(DEBUG) << "sendThread exiting";
+}
+
+// Runs one sender pass per benchmark iteration: opens state.range(0) datagram
+// sockets connected to "<executable dir>/temp.sock" (non-blocking when
+// state.range(1) is 0), runs one sendThread per socket and joins them all.
+// Setup failures close every socket opened so far and skip the run.
+static void SocketBenchMark(benchmark::State& state, const bool) {
+  while (state.KeepRunning()) {  // starts the timer; it must not be paused before this
+    const std::string socket_path = android::base::GetExecutableDirectory() + "/temp.sock";
+    const size_t num_sender_threads = state.range(0);
+    const size_t blocking = state.range(1);
+
+    // Every sender connects to the same address, so build it once.
+    struct sockaddr_un addr_send;
+    memset(&addr_send, 0, sizeof(addr_send));
+    addr_send.sun_family = AF_UNIX;
+    if (socket_path.size() >= sizeof(addr_send.sun_path)) {
+        state.SkipWithError("socket path too long for sockaddr_un");
+        return;
+    }
+    memcpy(addr_send.sun_path, socket_path.c_str(), socket_path.size() + 1);
+
+    std::vector<int> sender_sockets;
+    sender_sockets.reserve(num_sender_threads);
+    // Reports |what| (perror first, before close() can disturb errno), closes
+    // |failed_sock| if valid plus every socket opened so far, and skips the run.
+    auto fail = [&state, &sender_sockets](const char* what, int failed_sock) {
+        perror(what);
+        if (failed_sock >= 0) close(failed_sock);
+        for (int sock : sender_sockets) close(sock);
+        state.SkipWithError(what);
+    };
+
+    // Sender socket setup (for each thread)
+    for (size_t i = 0; i < num_sender_threads; ++i) {
+        int sock_send = socket(AF_UNIX, SOCK_DGRAM, 0);
+        if (sock_send < 0) {
+            fail("socket failed", -1);
+            return;
+        }
+        if (!blocking) {
+            // Set non-blocking for the sender socket
+            const int flags = fcntl(sock_send, F_GETFL, 0);
+            if (flags == -1) {
+                fail("fcntl F_GETFL failed", sock_send);
+                return;
+            }
+            if (fcntl(sock_send, F_SETFL, flags | O_NONBLOCK) == -1) {
+                fail("fcntl F_SETFL failed", sock_send);
+                return;
+            }
+        }
+        if (connect(sock_send, (struct sockaddr*)&addr_send, sizeof(addr_send)) < 0) {
+            fail("connect failed", sock_send);
+            return;
+        }
+        sender_sockets.push_back(sock_send);
+    }
+
+    std::vector<std::thread> send_threads;
+    send_threads.reserve(num_sender_threads);
+    for (int sock : sender_sockets) {
+        send_threads.emplace_back(sendThread, sock, blocking);
+    }
+    for (auto& thread : send_threads) {
+        thread.join();
+    }
+    for (int sock : sender_sockets) close(sock);
+    LOG(INFO) << "Sending data complete";
+  }
+  SetLabel(state);
+}
+
+static void BM_Sender(benchmark::State& state) {
+    SocketBenchMark(state, true); // the bool parameter is unnamed and unused by SocketBenchMark
+}
+BENCHMARK(BM_Sender)->BENCH_OPTIONS // BENCH_OPTIONS supplies the terminating ';'
+
+int main(int argc, char** argv) {
+    android::base::InitLogging(argv, &android::base::StderrLogger); // StderrLogger is the log sink
+    benchmark::Initialize(&argc, argv);
+    benchmark::RunSpecifiedBenchmarks(); // BM_Sender is the only benchmark registered in this file
+    return 0;
+}