blob: 3e9aae6612b5823ece87f0c32587646d1e9cb4b2 [file] [log] [blame]
#include <iostream>
#include <cstring>
#include <vector>
#include <thread>
#include <chrono>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <poll.h>
#include <unistd.h>
#include <random>
#include <android-base/file.h>
#include <android-base/logging.h>
#include <android-base/scopeguard.h>
#include <IOUringSocketHandler/IOUringSocketHandler.h>
#include <benchmark/benchmark.h>
// Registered buffers
#define MAX_BUFFERS 256
// Threads sending 4k payload - 1 Million times
const int MESSAGE_SIZE = 4096; // 4KB
const int NUM_MESSAGES_PER_THREAD = 1000000;
// The benchmark is set to run 4 times with
// the following combinations:
//
// a: {1, 4, 8, 16} -> This is the number of sender threads
// b: {0, 1} -> Whether sender is non-blocking or blocking
#define BENCH_OPTIONS \
MeasureProcessCPUTime() \
->Unit(benchmark::kSecond) \
->Iterations(1) \
->Repetitions(1) \
->ReportAggregatesOnly(true) \
->ArgsProduct({{16}, {0}});
// Function to generate a random string
std::string generateRandomString(size_t length) {
static const char charset[] =
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(0, sizeof(charset) - 2);
std::string str(length, 0);
for (size_t i = 0; i < length; ++i) {
str[i] = charset[dis(gen)];
}
return str;
}
static void SetLabel(benchmark::State& state) {
std::string num_senders = std::to_string(state.range(0));
std::string type = state.range(1) == 0 ? "non-blocking" : "blocking";
state.SetLabel(num_senders + "-SendThreads" + "/" + type);
}
// Function for sending thread
void sendThread(int sock_send, int blocking) {
std::string message = generateRandomString(MESSAGE_SIZE);
struct ucred cred;
memset(&cred, 0, sizeof(cred));
cred.pid = getpid();
cred.uid = getuid();
cred.gid = getgid();
struct iovec iov_send;
iov_send.iov_base = const_cast<char*>(message.data());
iov_send.iov_len = MESSAGE_SIZE;
struct msghdr msg_send;
memset(&msg_send, 0, sizeof(msg_send));
msg_send.msg_iov = &iov_send;
msg_send.msg_iovlen = 1;
char control_buffer_send[CMSG_SPACE(sizeof(cred))];
memset(control_buffer_send, 0, sizeof(control_buffer_send));
msg_send.msg_control = control_buffer_send;
msg_send.msg_controllen = sizeof(control_buffer_send);
struct cmsghdr* cmsg_send = CMSG_FIRSTHDR(&msg_send);
cmsg_send->cmsg_level = SOL_SOCKET;
cmsg_send->cmsg_type = SCM_CREDENTIALS;
cmsg_send->cmsg_len = CMSG_LEN(sizeof(cred));
memcpy(CMSG_DATA(cmsg_send), &cred, sizeof(cred));
int flags = 0;
if (!blocking) {
flags = MSG_DONTWAIT;
}
for (int i = 0; i < NUM_MESSAGES_PER_THREAD; ++i) {
ssize_t sent_bytes;
while (true) {
sent_bytes = sendmsg(sock_send, &msg_send, flags);
if (sent_bytes >= 0) {
break; // Success
}
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// Try again
continue;
} else {
perror("sendmsg failed");
return;
}
}
}
LOG(DEBUG) << "sendThread exiting";
}
static void SocketBenchMark(benchmark::State& state, const bool) {
state.PauseTiming();
while (state.KeepRunning()) {
std::string tmp_path = android::base::GetExecutableDirectory();
std::string socket_path = tmp_path + "/temp.sock";
const size_t num_sender_threads = state.range(0);
const size_t blocking = state.range(1);
std::vector<int> sender_sockets(num_sender_threads);
// Sender socket setup (for each thread)
for (int i = 0; i < num_sender_threads; ++i) {
int sock_send = socket(AF_UNIX, SOCK_DGRAM, 0);
if (sock_send < 0) {
perror("socket failed");
return;
}
if (!blocking) {
// Set non-blocking for the sender socket
int flags = fcntl(sock_send, F_GETFL, 0);
if (flags == -1) {
perror("fcntl F_GETFL failed");
close(sock_send);
for (int j = 0; j < i; ++j) { // Close previously opened sockets
close(sender_sockets[j]);
}
return;
}
if (fcntl(sock_send, F_SETFL, flags | O_NONBLOCK) == -1) {
perror("fcntl F_SETFL failed");
close(sock_send);
for (int j = 0; j < i; ++j) { // Close previously opened sockets
close(sender_sockets[j]);
}
return;
}
}
struct sockaddr_un addr_send;
memset(&addr_send, 0, sizeof(addr_send));
addr_send.sun_family = AF_UNIX;
strcpy(addr_send.sun_path, socket_path.c_str()); // Connect to the receiver
if (connect(sock_send, (struct sockaddr*)&addr_send, sizeof(addr_send)) < 0) {
perror("connect failed");
close(sock_send);
for (int j = 0; j < i; ++j) { // Close previously opened sockets
close(sender_sockets[j]);
}
return;
}
sender_sockets[i] = sock_send;
}
std::vector<std::thread> send_threads;
for (int i = 0; i < num_sender_threads; ++i) {
send_threads.emplace_back(sendThread, sender_sockets[i], blocking);
}
for (auto& thread : send_threads) {
thread.join();
}
for (int sock : sender_sockets) {
close(sock);
}
LOG(INFO) << "Sending data complete";
}
SetLabel(state);
}
static void BM_Sender(benchmark::State& state) {
SocketBenchMark(state, true);
}
BENCHMARK(BM_Sender)->BENCH_OPTIONS
int main(int argc, char** argv) {
android::base::InitLogging(argv, &android::base::StderrLogger);
benchmark::Initialize(&argc, argv);
benchmark::RunSpecifiedBenchmarks();
return 0;
}