#include <poll.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include <cerrno>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <functional>
#include <iostream>
#include <memory>
#include <random>
#include <string>
#include <thread>
#include <vector>

#include <android-base/file.h>
#include <android-base/logging.h>
#include <android-base/scopeguard.h>
#include <IOUringSocketHandler/IOUringSocketHandler.h>

#include <benchmark/benchmark.h>
| |
// Number of buffers registered with io_uring. The same value is also passed
// to SetupIoUring() by the io_uring receiver.
constexpr int MAX_BUFFERS = 256;

// The receivers expect NUM_MESSAGES_PER_THREAD payloads of MESSAGE_SIZE bytes
// from every sender thread (4k payload - 1 Million times).
constexpr int MESSAGE_SIZE = 4096;  // 4KB
constexpr int NUM_MESSAGES_PER_THREAD = 1000000;

// Receive mode flags; main() sets one of them from the command line and the
// BM_Receive* benchmarks read them.
static bool io_uring = false;
static bool sync_receive = false;
| |
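// Every benchmark is registered with two arguments, expanded as the
// cartesian product of the lists given to ArgsProduct():
//
// a: number of sender threads (currently only {16}; e.g. {1, 4, 8, 16} to sweep)
// b: 0 = non-blocking sender, 1 = blocking sender (currently only {0})
//
// With the current lists each benchmark runs once: 16 sender threads, non-blocking.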
// Shared registration options. The expansion ends with ';', so a registration
// is written as `BENCHMARK(fn)->BENCH_OPTIONS` with no trailing semicolon.
#define BENCH_OPTIONS                \
    MeasureProcessCPUTime()          \
        ->ArgsProduct({{16}, {0}})   \
        ->Iterations(1)              \
        ->Repetitions(1)             \
        ->ReportAggregatesOnly(true) \
        ->Unit(benchmark::kSecond);
| |
| static void SetLabel(benchmark::State& state) { |
| std::string num_senders = std::to_string(state.range(0)); |
| std::string type = state.range(1) == 0 ? "non-blocking" : "blocking"; |
| state.SetLabel(num_senders + "-SendThreads" + "/" + type); |
| } |
| |
| // Receive using io_uring |
| bool receiveThreaduring(int sock_recv, int num_threads, uint64_t& total_bytes_received, |
| double& average_latency) { |
| std::unique_ptr<IOUringSocketHandler> async_listener_; |
| async_listener_ = std::make_unique<IOUringSocketHandler>(sock_recv); |
| if (!async_listener_->SetupIoUring(MAX_BUFFERS)) { |
| LOG(ERROR) << "SetupIoUring failed"; |
| return false; |
| } |
| async_listener_->AllocateAndRegisterBuffers( |
| MAX_BUFFERS, MESSAGE_SIZE); |
| |
| if (!async_listener_->EnqueueMultishotRecvmsg()) { |
| LOG(ERROR) << "EnqueueMultishotRecvmsg failed"; |
| return false; |
| } |
| |
| long long received_messages = 0; |
| long long total_latency = 0; |
| |
| while (received_messages < num_threads * NUM_MESSAGES_PER_THREAD) { |
| struct ucred* cred = nullptr; |
| void* this_recv = nullptr; |
| size_t len = 0; |
| auto receive_time = std::chrono::high_resolution_clock::now(); |
| async_listener_->ReceiveData(&this_recv, len, &cred); |
| // Release the buffer from here onwards |
| { |
| auto scope_guard = |
| android::base::make_scope_guard([&async_listener_]() -> void { |
| async_listener_->ReleaseBuffer(); }); |
| auto end_receive_time = std::chrono::high_resolution_clock::now(); |
| total_latency += std::chrono::duration_cast<std::chrono::microseconds>( |
| end_receive_time - receive_time).count(); |
| |
| if (len <= 0) { |
| LOG(DEBUG) << "Received zero length for: " << received_messages; |
| continue; |
| } |
| received_messages++; |
| total_bytes_received += len; |
| } |
| } |
| |
| average_latency = static_cast<double>(total_latency) / received_messages; |
| return true; |
| } |
| |
| // Function for receiving thread using recvmsg() |
| void receiveThread(int sock_recv, int num_threads, uint64_t& total_bytes_received, |
| double& average_latency) { |
| char recv_buffer[MESSAGE_SIZE]; |
| struct ucred cred; |
| |
| struct iovec iov_recv; |
| iov_recv.iov_base = recv_buffer; |
| iov_recv.iov_len = MESSAGE_SIZE; |
| |
| struct msghdr msg_recv; |
| memset(&msg_recv, 0, sizeof(msg_recv)); |
| msg_recv.msg_iov = &iov_recv; |
| msg_recv.msg_iovlen = 1; |
| |
| char control_buffer_recv[CMSG_SPACE(sizeof(cred))]; |
| memset(control_buffer_recv, 0, sizeof(control_buffer_recv)); |
| msg_recv.msg_control = control_buffer_recv; |
| msg_recv.msg_controllen = sizeof(control_buffer_recv); |
| |
| struct pollfd pfd; |
| pfd.fd = sock_recv; |
| pfd.events = POLLIN; |
| |
| long long received_messages = 0; |
| long long total_latency = 0; |
| |
| while (received_messages < num_threads * NUM_MESSAGES_PER_THREAD) { |
| auto receive_time = std::chrono::high_resolution_clock::now(); |
| if (poll(&pfd, 1, -1) > 0) { |
| ssize_t received_bytes = recvmsg(sock_recv, &msg_recv, 0); |
| if (received_bytes < 0) { |
| perror("recvmsg failed"); |
| break; |
| } |
| |
| auto end_receive_time = std::chrono::high_resolution_clock::now(); |
| total_latency += std::chrono::duration_cast<std::chrono::microseconds>( |
| end_receive_time - receive_time).count(); |
| |
| received_messages++; |
| total_bytes_received += received_bytes; |
| } |
| } |
| |
| average_latency = static_cast<double>(total_latency) / received_messages; |
| } |
| |
| static int CreateServerSocket(std::string& path) { |
| int sock_recv = socket(AF_UNIX, SOCK_DGRAM, 0); |
| if (sock_recv < 0) { |
| PLOG(ERROR) << "socket failed"; |
| return -1; |
| } |
| |
| std::string tmp_path = android::base::GetExecutableDirectory(); |
| std::string socket_path = tmp_path + "/temp.sock"; |
| struct sockaddr_un addr_recv; |
| memset(&addr_recv, 0, sizeof(addr_recv)); |
| addr_recv.sun_family = AF_UNIX; |
| strcpy(addr_recv.sun_path, socket_path.c_str()); |
| |
| unlink(socket_path.c_str()); // Remove existing socket file if any |
| |
| if (bind(sock_recv, (struct sockaddr*)&addr_recv, sizeof(addr_recv)) < 0) { |
| PLOG(ERROR) << "bind failed"; |
| close(sock_recv); |
| return -1; |
| } |
| |
| path = socket_path; |
| return sock_recv; |
| } |
| |
| static void SocketBenchMark(benchmark::State& state, const bool io_uring) { |
| state.PauseTiming(); |
| while (state.KeepRunning()) { |
| std::string socket_path; |
| int sock_recv = CreateServerSocket(socket_path); |
| if (sock_recv < 0) { |
| LOG(ERROR) << "CreateServerSocket failed"; |
| return; |
| } |
| |
| const size_t num_sender_threads = state.range(0); |
| uint64_t total_bytes_received = 0; |
| double average_latency = 0; |
| state.ResumeTiming(); |
| if (io_uring) { |
| receiveThreaduring(sock_recv, num_sender_threads, |
| std::ref(total_bytes_received), std::ref(average_latency)); |
| } else { |
| receiveThread(sock_recv, num_sender_threads, |
| std::ref(total_bytes_received), std::ref(average_latency)); |
| } |
| state.PauseTiming(); |
| |
| state.counters["Total_Data"] = total_bytes_received; |
| state.counters["Latency(usec)"] = average_latency; |
| state.SetBytesProcessed(total_bytes_received); |
| state.SetItemsProcessed(num_sender_threads * NUM_MESSAGES_PER_THREAD); |
| |
| // Cleanup |
| close(sock_recv); |
| unlink(socket_path.c_str()); // Remove the socket file |
| } |
| SetLabel(state); |
| } |
| |
| static void BM_ReceiveIOUring(benchmark::State& state) { |
| if (io_uring) { |
| SocketBenchMark(state, true); |
| } |
| } |
| BENCHMARK(BM_ReceiveIOUring)->BENCH_OPTIONS |
| |
| static void BM_ReceiveSync(benchmark::State& state) { |
| if (sync_receive) { |
| SocketBenchMark(state, false); |
| } |
| } |
| BENCHMARK(BM_ReceiveSync)->BENCH_OPTIONS |
| |
| int main(int argc, char** argv) { |
| android::base::InitLogging(argv, &android::base::StderrLogger); |
| |
| if (argc != 2) { |
| std::cerr << "IOUringSocketHandlerReceiver {-io-uring | -sync}\n"; |
| return 0; |
| } |
| if (std::string(argv[1]) == "-io-uring") { |
| io_uring = true; |
| } else if (std::string(argv[1]) == "-sync") { |
| sync_receive = true; |
| } else { |
| std::cerr << "IOUringSocketHandlerReceiver {-io-uring | -sync}\n"; |
| return 0; |
| } |
| |
| benchmark::Initialize(&argc, argv); |
| benchmark::RunSpecifiedBenchmarks(); |
| return 0; |
| } |