| #include <iostream> |
| #include <cstring> |
| #include <vector> |
| #include <thread> |
| #include <chrono> |
| #include <cstring> |
| #include <unistd.h> |
| #include <sys/socket.h> |
| #include <sys/un.h> |
| #include <poll.h> |
| #include <unistd.h> |
| #include <random> |
| |
| #include <android-base/file.h> |
| #include <android-base/logging.h> |
| |
| #include <android-base/scopeguard.h> |
| #include <IOUringSocketHandler/IOUringSocketHandler.h> |
| |
| #include <benchmark/benchmark.h> |
| |
| // Registered buffers |
| #define MAX_BUFFERS 256 |
| |
| // Threads sending 4k payload - 1 Million times |
| const int MESSAGE_SIZE = 4096; // 4KB |
| const int NUM_MESSAGES_PER_THREAD = 1000000; |
| |
| // The benchmark is set to run 4 times with |
| // the following combinations: |
| // |
| // a: {1, 4, 8, 16} -> This is the number of sender threads |
| // b: {0, 1} -> Whether sender is non-blocking or blocking |
| #define BENCH_OPTIONS \ |
| MeasureProcessCPUTime() \ |
| ->Unit(benchmark::kSecond) \ |
| ->Iterations(1) \ |
| ->Repetitions(1) \ |
| ->ReportAggregatesOnly(true) \ |
| ->ArgsProduct({{16}, {0}}); |
| |
| // Function to generate a random string |
| std::string generateRandomString(size_t length) { |
| static const char charset[] = |
| "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; |
| std::random_device rd; |
| std::mt19937 gen(rd()); |
| std::uniform_int_distribution<> dis(0, sizeof(charset) - 2); |
| |
| std::string str(length, 0); |
| for (size_t i = 0; i < length; ++i) { |
| str[i] = charset[dis(gen)]; |
| } |
| return str; |
| } |
| |
| static void SetLabel(benchmark::State& state) { |
| std::string num_senders = std::to_string(state.range(0)); |
| std::string type = state.range(1) == 0 ? "non-blocking" : "blocking"; |
| state.SetLabel(num_senders + "-SendThreads" + "/" + type); |
| } |
| |
| // Function for sending thread |
| void sendThread(int sock_send, int blocking) { |
| std::string message = generateRandomString(MESSAGE_SIZE); |
| |
| struct ucred cred; |
| memset(&cred, 0, sizeof(cred)); |
| cred.pid = getpid(); |
| cred.uid = getuid(); |
| cred.gid = getgid(); |
| |
| struct iovec iov_send; |
| iov_send.iov_base = const_cast<char*>(message.data()); |
| iov_send.iov_len = MESSAGE_SIZE; |
| |
| struct msghdr msg_send; |
| memset(&msg_send, 0, sizeof(msg_send)); |
| msg_send.msg_iov = &iov_send; |
| msg_send.msg_iovlen = 1; |
| |
| char control_buffer_send[CMSG_SPACE(sizeof(cred))]; |
| memset(control_buffer_send, 0, sizeof(control_buffer_send)); |
| msg_send.msg_control = control_buffer_send; |
| msg_send.msg_controllen = sizeof(control_buffer_send); |
| |
| struct cmsghdr* cmsg_send = CMSG_FIRSTHDR(&msg_send); |
| cmsg_send->cmsg_level = SOL_SOCKET; |
| cmsg_send->cmsg_type = SCM_CREDENTIALS; |
| cmsg_send->cmsg_len = CMSG_LEN(sizeof(cred)); |
| memcpy(CMSG_DATA(cmsg_send), &cred, sizeof(cred)); |
| |
| int flags = 0; |
| if (!blocking) { |
| flags = MSG_DONTWAIT; |
| } |
| for (int i = 0; i < NUM_MESSAGES_PER_THREAD; ++i) { |
| ssize_t sent_bytes; |
| while (true) { |
| sent_bytes = sendmsg(sock_send, &msg_send, flags); |
| if (sent_bytes >= 0) { |
| break; // Success |
| } |
| if (errno == EAGAIN || errno == EWOULDBLOCK) { |
| // Try again |
| continue; |
| } else { |
| perror("sendmsg failed"); |
| return; |
| } |
| } |
| } |
| LOG(DEBUG) << "sendThread exiting"; |
| } |
| |
| static void SocketBenchMark(benchmark::State& state, const bool) { |
| state.PauseTiming(); |
| while (state.KeepRunning()) { |
| std::string tmp_path = android::base::GetExecutableDirectory(); |
| std::string socket_path = tmp_path + "/temp.sock"; |
| |
| const size_t num_sender_threads = state.range(0); |
| const size_t blocking = state.range(1); |
| std::vector<int> sender_sockets(num_sender_threads); |
| // Sender socket setup (for each thread) |
| for (int i = 0; i < num_sender_threads; ++i) { |
| int sock_send = socket(AF_UNIX, SOCK_DGRAM, 0); |
| if (sock_send < 0) { |
| perror("socket failed"); |
| return; |
| } |
| |
| if (!blocking) { |
| // Set non-blocking for the sender socket |
| int flags = fcntl(sock_send, F_GETFL, 0); |
| if (flags == -1) { |
| perror("fcntl F_GETFL failed"); |
| close(sock_send); |
| for (int j = 0; j < i; ++j) { // Close previously opened sockets |
| close(sender_sockets[j]); |
| } |
| return; |
| } |
| if (fcntl(sock_send, F_SETFL, flags | O_NONBLOCK) == -1) { |
| perror("fcntl F_SETFL failed"); |
| close(sock_send); |
| for (int j = 0; j < i; ++j) { // Close previously opened sockets |
| close(sender_sockets[j]); |
| } |
| return; |
| } |
| } |
| |
| struct sockaddr_un addr_send; |
| memset(&addr_send, 0, sizeof(addr_send)); |
| addr_send.sun_family = AF_UNIX; |
| strcpy(addr_send.sun_path, socket_path.c_str()); // Connect to the receiver |
| |
| if (connect(sock_send, (struct sockaddr*)&addr_send, sizeof(addr_send)) < 0) { |
| perror("connect failed"); |
| close(sock_send); |
| for (int j = 0; j < i; ++j) { // Close previously opened sockets |
| close(sender_sockets[j]); |
| } |
| return; |
| } |
| |
| sender_sockets[i] = sock_send; |
| } |
| |
| std::vector<std::thread> send_threads; |
| for (int i = 0; i < num_sender_threads; ++i) { |
| send_threads.emplace_back(sendThread, sender_sockets[i], blocking); |
| } |
| for (auto& thread : send_threads) { |
| thread.join(); |
| } |
| |
| for (int sock : sender_sockets) { |
| close(sock); |
| } |
| LOG(INFO) << "Sending data complete"; |
| } |
| SetLabel(state); |
| } |
| |
| static void BM_Sender(benchmark::State& state) { |
| SocketBenchMark(state, true); |
| } |
| BENCHMARK(BM_Sender)->BENCH_OPTIONS |
| |
| int main(int argc, char** argv) { |
| android::base::InitLogging(argv, &android::base::StderrLogger); |
| benchmark::Initialize(&argc, argv); |
| benchmark::RunSpecifiedBenchmarks(); |
| return 0; |
| } |