blob: 4ed29a421d5806d4d171188e5dad79e6ad315c09 [file] [log] [blame]
#include <iostream>
#include <cstring>
#include <vector>
#include <thread>
#include <chrono>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <poll.h>
#include <unistd.h>
#include <random>
#include <android-base/file.h>
#include <android-base/logging.h>
#include <android-base/scopeguard.h>
#include <IOUringSocketHandler/IOUringSocketHandler.h>
#include <benchmark/benchmark.h>
// Registered buffers
#define MAX_BUFFERS 256
// Threads sending 4k payload - 1 Million times
const int MESSAGE_SIZE = 4096; // 4KB
const int NUM_MESSAGES_PER_THREAD = 1000000;
static bool io_uring = false;
static bool sync_receive = false;
// The benchmark is set to run 4 times with
// the following combinations:
//
// a: {1, 4, 8, 16} -> This is the number of sender threads
// b: {0, 1} -> Whether sender is blocking or non-blocking
#define BENCH_OPTIONS \
MeasureProcessCPUTime() \
->Unit(benchmark::kSecond) \
->Iterations(1) \
->Repetitions(1) \
->ReportAggregatesOnly(true) \
->ArgsProduct({{16}, {0}});
static void SetLabel(benchmark::State& state) {
std::string num_senders = std::to_string(state.range(0));
std::string type = state.range(1) == 0 ? "non-blocking" : "blocking";
state.SetLabel(num_senders + "-SendThreads" + "/" + type);
}
// Receive using io_uring
bool receiveThreaduring(int sock_recv, int num_threads, uint64_t& total_bytes_received,
double& average_latency) {
std::unique_ptr<IOUringSocketHandler> async_listener_;
async_listener_ = std::make_unique<IOUringSocketHandler>(sock_recv);
if (!async_listener_->SetupIoUring(MAX_BUFFERS)) {
LOG(ERROR) << "SetupIoUring failed";
return false;
}
async_listener_->AllocateAndRegisterBuffers(
MAX_BUFFERS, MESSAGE_SIZE);
if (!async_listener_->EnqueueMultishotRecvmsg()) {
LOG(ERROR) << "EnqueueMultishotRecvmsg failed";
return false;
}
long long received_messages = 0;
long long total_latency = 0;
while (received_messages < num_threads * NUM_MESSAGES_PER_THREAD) {
struct ucred* cred = nullptr;
void* this_recv = nullptr;
size_t len = 0;
auto receive_time = std::chrono::high_resolution_clock::now();
async_listener_->ReceiveData(&this_recv, len, &cred);
// Release the buffer from here onwards
{
auto scope_guard =
android::base::make_scope_guard([&async_listener_]() -> void {
async_listener_->ReleaseBuffer(); });
auto end_receive_time = std::chrono::high_resolution_clock::now();
total_latency += std::chrono::duration_cast<std::chrono::microseconds>(
end_receive_time - receive_time).count();
if (len <= 0) {
LOG(DEBUG) << "Received zero length for: " << received_messages;
continue;
}
received_messages++;
total_bytes_received += len;
}
}
average_latency = static_cast<double>(total_latency) / received_messages;
return true;
}
// Function for receiving thread using recvmsg()
void receiveThread(int sock_recv, int num_threads, uint64_t& total_bytes_received,
double& average_latency) {
char recv_buffer[MESSAGE_SIZE];
struct ucred cred;
struct iovec iov_recv;
iov_recv.iov_base = recv_buffer;
iov_recv.iov_len = MESSAGE_SIZE;
struct msghdr msg_recv;
memset(&msg_recv, 0, sizeof(msg_recv));
msg_recv.msg_iov = &iov_recv;
msg_recv.msg_iovlen = 1;
char control_buffer_recv[CMSG_SPACE(sizeof(cred))];
memset(control_buffer_recv, 0, sizeof(control_buffer_recv));
msg_recv.msg_control = control_buffer_recv;
msg_recv.msg_controllen = sizeof(control_buffer_recv);
struct pollfd pfd;
pfd.fd = sock_recv;
pfd.events = POLLIN;
long long received_messages = 0;
long long total_latency = 0;
while (received_messages < num_threads * NUM_MESSAGES_PER_THREAD) {
auto receive_time = std::chrono::high_resolution_clock::now();
if (poll(&pfd, 1, -1) > 0) {
ssize_t received_bytes = recvmsg(sock_recv, &msg_recv, 0);
if (received_bytes < 0) {
perror("recvmsg failed");
break;
}
auto end_receive_time = std::chrono::high_resolution_clock::now();
total_latency += std::chrono::duration_cast<std::chrono::microseconds>(
end_receive_time - receive_time).count();
received_messages++;
total_bytes_received += received_bytes;
}
}
average_latency = static_cast<double>(total_latency) / received_messages;
}
static int CreateServerSocket(std::string& path) {
int sock_recv = socket(AF_UNIX, SOCK_DGRAM, 0);
if (sock_recv < 0) {
PLOG(ERROR) << "socket failed";
return -1;
}
std::string tmp_path = android::base::GetExecutableDirectory();
std::string socket_path = tmp_path + "/temp.sock";
struct sockaddr_un addr_recv;
memset(&addr_recv, 0, sizeof(addr_recv));
addr_recv.sun_family = AF_UNIX;
strcpy(addr_recv.sun_path, socket_path.c_str());
unlink(socket_path.c_str()); // Remove existing socket file if any
if (bind(sock_recv, (struct sockaddr*)&addr_recv, sizeof(addr_recv)) < 0) {
PLOG(ERROR) << "bind failed";
close(sock_recv);
return -1;
}
path = socket_path;
return sock_recv;
}
static void SocketBenchMark(benchmark::State& state, const bool io_uring) {
state.PauseTiming();
while (state.KeepRunning()) {
std::string socket_path;
int sock_recv = CreateServerSocket(socket_path);
if (sock_recv < 0) {
LOG(ERROR) << "CreateServerSocket failed";
return;
}
const size_t num_sender_threads = state.range(0);
uint64_t total_bytes_received = 0;
double average_latency = 0;
state.ResumeTiming();
if (io_uring) {
receiveThreaduring(sock_recv, num_sender_threads,
std::ref(total_bytes_received), std::ref(average_latency));
} else {
receiveThread(sock_recv, num_sender_threads,
std::ref(total_bytes_received), std::ref(average_latency));
}
state.PauseTiming();
state.counters["Total_Data"] = total_bytes_received;
state.counters["Latency(usec)"] = average_latency;
state.SetBytesProcessed(total_bytes_received);
state.SetItemsProcessed(num_sender_threads * NUM_MESSAGES_PER_THREAD);
// Cleanup
close(sock_recv);
unlink(socket_path.c_str()); // Remove the socket file
}
SetLabel(state);
}
static void BM_ReceiveIOUring(benchmark::State& state) {
if (io_uring) {
SocketBenchMark(state, true);
}
}
BENCHMARK(BM_ReceiveIOUring)->BENCH_OPTIONS
static void BM_ReceiveSync(benchmark::State& state) {
if (sync_receive) {
SocketBenchMark(state, false);
}
}
BENCHMARK(BM_ReceiveSync)->BENCH_OPTIONS
int main(int argc, char** argv) {
android::base::InitLogging(argv, &android::base::StderrLogger);
if (argc != 2) {
std::cerr << "IOUringSocketHandlerReceiver {-io-uring | -sync}\n";
return 0;
}
if (std::string(argv[1]) == "-io-uring") {
io_uring = true;
} else if (std::string(argv[1]) == "-sync") {
sync_receive = true;
} else {
std::cerr << "IOUringSocketHandlerReceiver {-io-uring | -sync}\n";
return 0;
}
benchmark::Initialize(&argc, argv);
benchmark::RunSpecifiedBenchmarks();
return 0;
}