[export]
Batch pending RPCs on the client and server side before sending them down to TCP. If multiple RPCs are queued up in chaotic good, they should go out over a single TCP endpoint write.
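
At a high level, the writer loop moves from one-frame-per-write to draining the MPSC queue and coalescing every serialized frame into a single control/data endpoint write. A condensed sketch of the client-side loop (it mirrors the new client_transport.cc code below; the server side is symmetric, and this is illustrative rather than the exact final code):

```cpp
return Loop([this, transport = std::move(transport)] {
  return TrySeq(
      // Resolve to every frame currently queued, not just the next one.
      outgoing_frames_.AllNext(),
      [transport = transport.get()](std::vector<ClientFrame> client_frames) {
        SliceBuffer control_buffer;
        SliceBuffer data_buffer;
        // Serialize each queued frame into the shared buffers...
        for (auto& frame : client_frames) {
          transport->SerializeFrameIntoBuffers(GetFrameInterface(frame),
                                               control_buffer, data_buffer);
        }
        // ...then issue one write per TCP endpoint for the whole batch.
        return transport->WriteSerializedFrames(control_buffer, data_buffer);
      },
      // Write failures are caught by TrySeq and exit the loop.
      []() -> LoopCtl<absl::Status> { return Continue{}; });
});
```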

----
DO NOT SUBMIT. This PR is for testing purposes only. [cl/627143927](http://cl/627143927)

PiperOrigin-RevId: 627143927
diff --git a/src/core/ext/transport/chaotic_good/chaotic_good_transport.h b/src/core/ext/transport/chaotic_good/chaotic_good_transport.h
index f89d2d6..38db73d 100644
--- a/src/core/ext/transport/chaotic_good/chaotic_good_transport.h
+++ b/src/core/ext/transport/chaotic_good/chaotic_good_transport.h
@@ -67,6 +67,30 @@
         data_endpoint_.Write(std::move(buffers.data)));
   }
 
+  void SerializeFrameIntoBuffers(const FrameInterface& frame,
+                                 SliceBuffer& control_buffer,
+                                 SliceBuffer& data_buffer) {
+    auto buffers = frame.Serialize(&encoder_);
+    if (grpc_chaotic_good_trace.enabled()) {
+      gpr_log(GPR_INFO, "CHAOTIC_GOOD: WriteFrame to:%s %s",
+              ResolvedAddressToString(control_endpoint_.GetPeerAddress())
+                  .value_or("<<unknown peer address>>")
+                  .c_str(),
+              frame.ToString().c_str());
+    }
+    buffers.control.MoveFirstNBytesIntoSliceBuffer(buffers.control.Length(),
+                                                   control_buffer);
+    buffers.data.MoveFirstNBytesIntoSliceBuffer(buffers.data.Length(),
+                                                data_buffer);
+  }
+
+  auto WriteSerializedFrames(SliceBuffer& control_buffer,
+                             SliceBuffer& data_buffer) {
+    return TryJoin<absl::StatusOr>(
+        control_endpoint_.Write(std::move(control_buffer)),
+        data_endpoint_.Write(std::move(data_buffer)));
+  }
+
   // Read frame header and payloads for control and data portions of one frame.
   // Resolves to StatusOr<tuple<FrameHeader, BufferPair>>.
   auto ReadFrameBytes() {
diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc
index 6b3160e..3ac4f1e 100644
--- a/src/core/ext/transport/chaotic_good/client_transport.cc
+++ b/src/core/ext/transport/chaotic_good/client_transport.cc
@@ -59,11 +59,17 @@
     RefCountedPtr<ChaoticGoodTransport> transport) {
   return Loop([this, transport = std::move(transport)] {
     return TrySeq(
-        // Get next outgoing frame.
-        outgoing_frames_.Next(),
+        // Get the next batch of outgoing frames.
+        outgoing_frames_.AllNext(),
         // Serialize and write it out.
-        [transport = transport.get()](ClientFrame client_frame) {
-          return transport->WriteFrame(GetFrameInterface(client_frame));
+        [transport = transport.get()](std::vector<ClientFrame> client_frames) {
+          SliceBuffer control_buffer;
+          SliceBuffer data_buffer;
+          for (auto& frame : client_frames) {
+            transport->SerializeFrameIntoBuffers(GetFrameInterface(frame),
+                                                 control_buffer, data_buffer);
+          }
+          return transport->WriteSerializedFrames(control_buffer, data_buffer);
         },
         []() -> LoopCtl<absl::Status> {
           // The write failures will be caught in TrySeq and exit loop.
diff --git a/src/core/ext/transport/chaotic_good/server_transport.cc b/src/core/ext/transport/chaotic_good/server_transport.cc
index 7975975..bbd0b3b 100644
--- a/src/core/ext/transport/chaotic_good/server_transport.cc
+++ b/src/core/ext/transport/chaotic_good/server_transport.cc
@@ -54,11 +54,17 @@
     RefCountedPtr<ChaoticGoodTransport> transport) {
   return Loop([this, transport = std::move(transport)] {
     return TrySeq(
-        // Get next outgoing frame.
-        outgoing_frames_.Next(),
+        // Get the next batch of outgoing frames.
+        outgoing_frames_.AllNext(),
         // Serialize and write it out.
-        [transport = transport.get()](ServerFrame client_frame) {
-          return transport->WriteFrame(GetFrameInterface(client_frame));
+        [transport = transport.get()](std::vector<ServerFrame> server_frames) {
+          SliceBuffer control_buffer;
+          SliceBuffer data_buffer;
+          for (auto& frame : server_frames) {
+            transport->SerializeFrameIntoBuffers(GetFrameInterface(frame),
+                                                 control_buffer, data_buffer);
+          }
+          return transport->WriteSerializedFrames(control_buffer, data_buffer);
         },
         []() -> LoopCtl<absl::Status> {
           // The write failures will be caught in TrySeq and exit loop.
diff --git a/src/core/lib/promise/mpsc.h b/src/core/lib/promise/mpsc.h
index 2362de3..0978d8f 100644
--- a/src/core/lib/promise/mpsc.h
+++ b/src/core/lib/promise/mpsc.h
@@ -204,6 +204,27 @@
     };
   }
 
+  // Return a promise that will resolve to a vector containing all items
+  // currently available from the queue.
+  auto AllNext() {
+    return [this]() -> Poll<std::vector<T>> {
+      if (buffer_it_ == buffer_.end()) {
+        // Nothing buffered locally: poll the center for a fresh batch.
+        if (!center_->PollReceiveBatch(buffer_) || buffer_.empty()) {
+          return Pending{};
+        }
+        buffer_it_ = buffer_.begin();
+      }
+      // Drain everything buffered but not yet handed out; items before
+      // buffer_it_ were already consumed by Next().
+      std::vector<T> dest;
+      while (buffer_it_ != buffer_.end()) {
+        dest.push_back(std::move(*buffer_it_++));
+      }
+      return std::move(dest);
+    };
+  }
+
  private:
   // Received items. We move out of here one by one, but don't resize the
   // vector. Instead, when we run out of items, we poll the center for more -
diff --git a/test/core/promise/mpsc_test.cc b/test/core/promise/mpsc_test.cc
index 3d6e669..1b4a528 100644
--- a/test/core/promise/mpsc_test.cc
+++ b/test/core/promise/mpsc_test.cc
@@ -175,6 +175,30 @@
   activity.Deactivate();
 }
 
+TEST(MpscTest, AllNextReceiveWorks) {
+  StrictMock<MockActivity> activity;
+  MpscReceiver<Payload> receiver(1);
+  MpscSender<Payload> sender = receiver.MakeSender();
+
+  EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(1)), true);
+  EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(2)), true);
+  EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(3)), true);
+  EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(4)), true);
+  EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(5)), true);
+  EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(6)), true);
+  EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(7)), true);
+
+  activity.Activate();
+  auto payloads = NowOrNever(receiver.AllNext());
+  EXPECT_TRUE(payloads.has_value());
+  EXPECT_EQ(payloads.value().size(), 7);
+  int i = 1;
+  for (const auto& payload : *payloads) {
+    EXPECT_EQ(payload, MakePayload(i++));
+  }
+  activity.Deactivate();
+}
+
 }  // namespace
 }  // namespace grpc_core
 
diff --git a/test/core/transport/chaotic_good/client_transport_test.cc b/test/core/transport/chaotic_good/client_transport_test.cc
index b5dc5f8..da9a42a 100644
--- a/test/core/transport/chaotic_good/client_transport_test.cc
+++ b/test/core/transport/chaotic_good/client_transport_test.cc
@@ -123,14 +123,14 @@
   transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack());
   StrictMock<MockFunction<void()>> on_done;
   EXPECT_CALL(on_done, Call());
+  // With batching enabled, the control endpoint is expected to write
+  // the headers and message length frame in one go.
   control_endpoint.ExpectWrite(
       {SerializedFrameHeader(FrameType::kFragment, 1, 1,
                              sizeof(kPathDemoServiceStep), 0, 0, 0),
        EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep,
-                                          sizeof(kPathDemoServiceStep))},
-      nullptr);
-  control_endpoint.ExpectWrite(
-      {SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0)},
+                                          sizeof(kPathDemoServiceStep)),
+       SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0)},
       nullptr);
   data_endpoint.ExpectWrite(
       {EventEngineSlice::FromCopiedString("0"), Zeros(63)}, nullptr);
@@ -213,20 +213,19 @@
       {SerializedFrameHeader(FrameType::kFragment, 1, 1,
                              sizeof(kPathDemoServiceStep), 0, 0, 0),
        EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep,
-                                          sizeof(kPathDemoServiceStep))},
-      nullptr);
-  control_endpoint.ExpectWrite(
-      {SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0)},
+                                          sizeof(kPathDemoServiceStep)),
+       SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0)},
       nullptr);
   data_endpoint.ExpectWrite(
       {EventEngineSlice::FromCopiedString("0"), Zeros(63)}, nullptr);
+  // With batching enabled, the control endpoint is expected to write
+  // the message length frame of the second message and trailers in one go.
   control_endpoint.ExpectWrite(
-      {SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0)},
+      {SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0),
+       SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)},
       nullptr);
   data_endpoint.ExpectWrite(
       {EventEngineSlice::FromCopiedString("1"), Zeros(63)}, nullptr);
-  control_endpoint.ExpectWrite(
-      {SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)}, nullptr);
   call.initiator.SpawnGuarded("test-send",
                               [initiator = call.initiator]() mutable {
                                 return SendClientToServerMessages(initiator, 2);
diff --git a/test/core/transport/chaotic_good/server_transport_test.cc b/test/core/transport/chaotic_good/server_transport_test.cc
index a5dd000..c73902f 100644
--- a/test/core/transport/chaotic_good/server_transport_test.cc
+++ b/test/core/transport/chaotic_good/server_transport_test.cc
@@ -170,14 +170,14 @@
   EXPECT_CALL(*control_endpoint.endpoint, Read)
       .InSequence(control_endpoint.read_sequence)
       .WillOnce(Return(false));
+  // With batching enabled, the control endpoint is expected to write
+  // the headers and message length frame in one go.
   control_endpoint.ExpectWrite(
       {SerializedFrameHeader(FrameType::kFragment, 1, 1,
                              sizeof(kPathDemoServiceStep), 0, 0, 0),
        EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep,
-                                          sizeof(kPathDemoServiceStep))},
-      nullptr);
-  control_endpoint.ExpectWrite(
-      {SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 8, 56, 0)},
+                                          sizeof(kPathDemoServiceStep)),
+       SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 8, 56, 0)},
       nullptr);
   data_endpoint.ExpectWrite(
       {EventEngineSlice::FromCopiedString("87654321"), Zeros(56)}, nullptr);