liburingutils: Use io_uring_peek_batch_cqe API

1: Use the io_uring_peek_batch_cqe() API to check whether CQEs are
pending in the ring. If so, consume them before invoking the next
syscall. This ensures that CQEs already waiting in the ring are
processed quickly.

2: Remove the IORING_SETUP_TASKRUN_FLAG setup flag. Since
io_uring_wait_cqe() ensures a syscall is issued, this flag is not
required.

3: Add a unit test which generates messages of different sizes and
verifies data integrity.

4: Always cap the maximum number of io-wq workers to 1
(via io_uring_register_iowq_max_workers()).

Bug: 406299670
Test: ./IOUringSocketHandler_tests

[----------] 1024 tests from Io/IOUringSocketHandlerTest (122980 ms total)

[----------] Global test environment tear-down
[==========] 1027 tests from 2 test suites ran. (122986 ms total)
[  PASSED  ] 1027 tests.

Change-Id: Ib205018b200f482c211fad3d181dfbe386a737e4
Signed-off-by: Akilesh Kailash <akailash@google.com>
diff --git a/include/IOUringSocketHandler/IOUringSocketHandler.h b/include/IOUringSocketHandler/IOUringSocketHandler.h
index 61b843d..be91bec 100644
--- a/include/IOUringSocketHandler/IOUringSocketHandler.h
+++ b/include/IOUringSocketHandler/IOUringSocketHandler.h
@@ -173,4 +173,12 @@
     bool registered_buffers_ = false;
     bool registered_ring_fd_ = false;
     bool ring_setup_ = false;
+
+    // Vector of cqe entries obtained after peek.
+    std::vector<struct io_uring_cqe*> cqe_vector_;
+    // Count of cqe entries which are not consumed yet.
+    int active_count_ = 0;
+    // Index into the cqe_vector_ to process the entries
+    // which are not consumed yet.
+    int active_index_ = -1;
 };
diff --git a/src/IOUringSocketHandler.cpp b/src/IOUringSocketHandler.cpp
index cbcf225..a2efd18 100644
--- a/src/IOUringSocketHandler.cpp
+++ b/src/IOUringSocketHandler.cpp
@@ -78,6 +78,7 @@
 
     io_uring_prep_recvmsg_multishot(sqe, socket_, &msg, 0);
     sqe->flags |= IOSQE_BUFFER_SELECT;
+
     sqe->buf_group = bgid_;
     int ret = io_uring_submit(&mCtx->ring);
     if (ret < 0) {
@@ -97,6 +98,7 @@
         buffers_.push_back(std::move(buffer));
     }
 
+    cqe_vector_.resize(num_buffers_);
     return RegisterBuffers();
 }
 
@@ -135,10 +137,9 @@
 
     // COOP_TASKRUN - Do not send IPI to process
     // SINGLE_ISSUER - Only one thread is doing the work on the ring
-    // TASKRUN_FLAG - we use peek_cqe - Hence, trigger task work if required
     // DEFER_TASKRUN - trigger task work when CQE is explicitly polled
     params.flags |= (IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER |
-                     IORING_SETUP_TASKRUN_FLAG | IORING_SETUP_DEFER_TASKRUN);
+                     IORING_SETUP_DEFER_TASKRUN);
 
     int ret = io_uring_queue_init_params(queue_size + 1, &mCtx->ring, &params);
     if (ret) {
@@ -157,6 +158,13 @@
         registered_ring_fd_ = true;
     }
 
+    unsigned int values[2];
+    values[0] = values[1] = 1;
+    ret = io_uring_register_iowq_max_workers(&mCtx->ring, values);
+    if (ret) {
+        LOG(ERROR) << "io_uring_register_iowq_max_workers failed: " << ret;
+    }
+
     return true;
 }
 
@@ -182,73 +190,93 @@
 
 void IOUringSocketHandler::ReceiveData(void** payload, size_t& payload_len, struct ucred** cred) {
   while (true) {
-      // Try to get the data if CQE is already present in the ring buffer.
-      // Note: Since IORING_SETUP_TASKRUN_FLAG flag is set, this will trigger
-      // a task run if there are no outstanding CQEs to be reaped.
-      if (io_uring_peek_cqe(&mCtx->ring, &cqe) < 0) {
-          int ret = io_uring_wait_cqe(&mCtx->ring, &cqe);
-          if (ret) {
-              EnqueueMultishotRecvmsg();
-              continue;
-          }
-      }
-
-      bool cqe_f_buffer = (cqe->flags & IORING_CQE_F_BUFFER);
-
-      // A failure here would most likely be related to ENOBUFS.
-      // However, for every failure, we need to re-arm the multishot sqe.
-      if ((cqe->res < 0) || !cqe_f_buffer) {
-          // No buffers were selected from registered buffers even
-          // though we had valid payload.
-          if ((cqe->res > 0) && !cqe_f_buffer) {
-              LOG(ERROR) << "No buffers selected. cqe->res: " << cqe->res
-                         << " cqe_flags: " << cqe->flags;
-          }
-          if (cqe->res != -ENOBUFS) {
-              LOG(ERROR) << "cqe failed with error: " << cqe->res;
-          }
-          io_uring_cqe_seen(&mCtx->ring, cqe);
+    if (active_count_ > 0) {
+      // Consume next CQE from the existing active batch
+      cqe = cqe_vector_[active_index_];
+      active_count_ -= 1;
+      active_index_ += 1;
+    } else {
+      // No active batch, try to get new CQEs
+      active_index_ = 0;
+      // Try to peek a batch without blocking
+      int count = io_uring_peek_batch_cqe(&mCtx->ring, cqe_vector_.data(), num_buffers_);
+      if (count > 0 ) {
+        // Peek successful, store the count and process the first CQE now
+        active_count_ = count;
+        cqe = cqe_vector_[active_index_]; // Get the first one (index 0)
+        active_count_ -= 1;
+        active_index_ += 1;
+      } else {
+        // No batch is active
+        active_index_ = -1;
+        active_count_ = 0;
+        // Peek found no ready CQEs; block waiting for a single CQE.
+        // Since IORING_SETUP_DEFER_TASKRUN is set for the ring, this
+        // will trigger the task work and initiate the receive of packets.
+        int ret = io_uring_wait_cqe(&mCtx->ring, &cqe);
+        if (ret) {
           EnqueueMultishotRecvmsg();
           continue;
-      }
-
-      // Pick the buffer-id where the payload data is sent.
-      active_buffer_id_ = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
-
-      void* this_recv = buffers_[active_buffer_id_].get();
-      struct io_uring_recvmsg_out* out = io_uring_recvmsg_validate(this_recv, cqe->res, &msg);
-
-      if (!out) {
-          ReleaseBuffer();
-          continue;
-      }
-
-      // Fetch ucred control data from cmsg
-      struct cmsghdr* cmsg;
-      cmsg = io_uring_recvmsg_cmsg_firsthdr(out, &msg);
-
-      struct ucred* cr = nullptr;
-      while (cmsg != nullptr) {
-          if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_CREDENTIALS) {
-              cr = (struct ucred*)CMSG_DATA(cmsg);
-              break;
-          }
-          cmsg = io_uring_recvmsg_cmsg_nexthdr(out, &msg, cmsg);
-      }
-
-      *payload = io_uring_recvmsg_payload(out, &msg);
-      payload_len = io_uring_recvmsg_payload_length(out, cqe->res, &msg);
-      *cred = cr;
-
-      // We have the valid data. Return it to the client.
-      // Note: We don't check "cred" pointer as senders can just send
-      // payload without credentials. It is up to the caller on how
-      // to handle it.
-      if ((*payload != nullptr) && (payload_len > 0)) {
-          break;
-      } else {
-          // Release the buffer and re-check the CQE buffers in the ring.
-          ReleaseBuffer();
+        }
       }
     }
+
+    bool cqe_f_buffer = (cqe->flags & IORING_CQE_F_BUFFER);
+
+    // A failure here would most likely be related to ENOBUFS.
+    // However, for every failure, we need to re-arm the multishot sqe.
+    if ((cqe->res < 0) || !cqe_f_buffer) {
+      // No buffers were selected from registered buffers even
+      // though we had valid payload.
+      if ((cqe->res > 0) && !cqe_f_buffer) {
+        LOG(ERROR) << "No buffers selected. cqe->res: " << cqe->res
+                   << " cqe_flags: " << cqe->flags;
+      }
+      if (cqe->res != -ENOBUFS) {
+        LOG(ERROR) << "cqe failed with error: " << cqe->res;
+      }
+      io_uring_cqe_seen(&mCtx->ring, cqe);
+      EnqueueMultishotRecvmsg();
+      continue;
+    }
+
+    // Pick the buffer-id where the payload data is sent.
+    active_buffer_id_ = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
+
+    void* this_recv = buffers_[active_buffer_id_].get();
+    struct io_uring_recvmsg_out* out = io_uring_recvmsg_validate(this_recv, cqe->res, &msg);
+
+    if (!out) {
+      ReleaseBuffer();
+      continue;
+    }
+
+    // Fetch ucred control data from cmsg
+    struct cmsghdr* cmsg;
+    cmsg = io_uring_recvmsg_cmsg_firsthdr(out, &msg);
+
+    struct ucred* cr = nullptr;
+    while (cmsg != nullptr) {
+      if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_CREDENTIALS) {
+        cr = (struct ucred*)CMSG_DATA(cmsg);
+        break;
+      }
+      cmsg = io_uring_recvmsg_cmsg_nexthdr(out, &msg, cmsg);
+    }
+
+    *payload = io_uring_recvmsg_payload(out, &msg);
+    payload_len = io_uring_recvmsg_payload_length(out, cqe->res, &msg);
+    *cred = cr;
+
+    // We have the valid data. Return it to the client.
+    // Note: We don't check "cred" pointer as senders can just send
+    // payload without credentials. It is up to the caller on how
+    // to handle it.
+    if ((*payload != nullptr) && (payload_len > 0)) {
+      break;
+    } else {
+      // Release the buffer and re-check the CQE buffers in the ring.
+      ReleaseBuffer();
+    }
+  }
 }
diff --git a/src/IOUringSocketHandler_test.cpp b/src/IOUringSocketHandler_test.cpp
index 6ff012f..2135891 100644
--- a/src/IOUringSocketHandler_test.cpp
+++ b/src/IOUringSocketHandler_test.cpp
@@ -41,6 +41,7 @@
 struct TestParam {
     int queue_depth;
     int numMessages;
+    int messageSize;
 };
 
 class IOUringSocketHandlerTest : public ::testing::TestWithParam<TestParam> {
@@ -65,7 +66,6 @@
     int queue_depth_ = 1;
     int sock_recv_;
     std::string socket_path_;
-    const int kMessageSize = 4096;
     std::vector<std::string> sent_messages; // Store sent messages for comparison
 };
 
@@ -161,7 +161,7 @@
 void IOUringSocketHandlerTest::SendMsg(int sock_send, const bool non_block) {
    const TestParam params = GetParam();
    for (int i = 0; i < params.numMessages; ++i) {
-     std::string message = generateRandomString(kMessageSize);
+     std::string message = generateRandomString(params.messageSize);
 
      sent_messages.push_back(message);
      struct ucred cred;
@@ -172,7 +172,7 @@
 
      struct iovec iov_send;
      iov_send.iov_base = const_cast<char*>(message.data());
-     iov_send.iov_len = kMessageSize;
+     iov_send.iov_len = params.messageSize;
 
      struct msghdr msg_send;
      memset(&msg_send, 0, sizeof(msg_send));
@@ -217,7 +217,7 @@
     const TestParam params = GetParam();
     ASSERT_TRUE(uring_listener->SetupIoUring(params.queue_depth));
     uring_listener->AllocateAndRegisterBuffers(
-        params.queue_depth, kMessageSize);
+        params.queue_depth, params.messageSize);
 
     ASSERT_TRUE(uring_listener->EnqueueMultishotRecvmsg());
 
@@ -311,22 +311,26 @@
 }
 
 std::vector<TestParam> GetConfigs() {
-    std::vector<TestParam> testParams;
+  std::vector<TestParam> testParams;
 
-    std::vector<int> queue_depth = {1, 8, 16, 32, 64, 128, 256, 512};
-    std::vector<int> num_messages = {1, 100, 250, 500, 1000, 1500, 2000, 5000};
+  std::vector<int> queue_depth = {1, 8, 16, 32, 64, 128, 256, 512};
+  std::vector<int> num_messages = {1, 100, 250, 500, 1000, 1500, 2000, 5000};
+  std::vector<int> message_sizes = {1, 100, 520, 1024, 2042, 3168, 4068, 4096};
 
-    // This will test 64 combinations
+  // This will test 512 combinations
+  for (auto message_size: message_sizes) {
     for (auto q_depth : queue_depth) {
       for (auto n_messages : num_messages) {
         TestParam param;
         param.queue_depth = q_depth;
         param.numMessages = n_messages;
+        param.messageSize = message_size;
         testParams.push_back(std::move(param));
       }
     }
+  }
 
-    return testParams;
+  return testParams;
 }
 
 INSTANTIATE_TEST_SUITE_P(Io, IOUringSocketHandlerTest,