Merge pull request #15697 from srini100/backport15690

Backport #15690 from master to v1.13.x
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 98391a1..ea6775a 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -817,7 +817,6 @@
   // For intercepting recv_trailing_metadata.
   grpc_metadata_batch recv_trailing_metadata;
   grpc_transport_stream_stats collect_stats;
-  grpc_closure recv_trailing_metadata_ready;
   // For intercepting on_complete.
   grpc_closure on_complete;
 } subchannel_batch_data;
@@ -1193,24 +1192,35 @@
             "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
             elem->channel_data, calld, num_batches, grpc_error_string(error));
   }
-  grpc_core::CallCombinerClosureList closures;
+  grpc_transport_stream_op_batch*
+      batches[GPR_ARRAY_SIZE(calld->pending_batches)];
+  size_t num_batches = 0;
   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
     pending_batch* pending = &calld->pending_batches[i];
     grpc_transport_stream_op_batch* batch = pending->batch;
     if (batch != nullptr) {
-      batch->handler_private.extra_arg = calld;
-      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
-                        fail_pending_batch_in_call_combiner, batch,
-                        grpc_schedule_on_exec_ctx);
-      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
-                   "pending_batches_fail");
+      batches[num_batches++] = batch;
       pending_batch_clear(calld, pending);
     }
   }
+  for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) {
+    grpc_transport_stream_op_batch* batch = batches[i];
+    batch->handler_private.extra_arg = calld;
+    GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+                      fail_pending_batch_in_call_combiner, batch,
+                      grpc_schedule_on_exec_ctx);
+    GRPC_CALL_COMBINER_START(calld->call_combiner,
+                             &batch->handler_private.closure,
+                             GRPC_ERROR_REF(error), "pending_batches_fail");
+  }
   if (yield_call_combiner) {
-    closures.RunClosures(calld->call_combiner);
-  } else {
-    closures.RunClosuresWithoutYielding(calld->call_combiner);
+    if (num_batches > 0) {
+      // Note: This will release the call combiner.
+      grpc_transport_stream_op_batch_finish_with_failure(
+          batches[0], GRPC_ERROR_REF(error), calld->call_combiner);
+    } else {
+      GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail");
+    }
   }
   GRPC_ERROR_UNREF(error);
 }
@@ -1245,22 +1255,30 @@
             " pending batches on subchannel_call=%p",
             chand, calld, num_batches, calld->subchannel_call);
   }
-  grpc_core::CallCombinerClosureList closures;
+  grpc_transport_stream_op_batch*
+      batches[GPR_ARRAY_SIZE(calld->pending_batches)];
+  size_t num_batches = 0;
   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
     pending_batch* pending = &calld->pending_batches[i];
     grpc_transport_stream_op_batch* batch = pending->batch;
     if (batch != nullptr) {
-      batch->handler_private.extra_arg = calld->subchannel_call;
-      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
-                        resume_pending_batch_in_call_combiner, batch,
-                        grpc_schedule_on_exec_ctx);
-      closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
-                   "pending_batches_resume");
+      batches[num_batches++] = batch;
       pending_batch_clear(calld, pending);
     }
   }
+  for (size_t i = 1; i < num_batches; ++i) {
+    grpc_transport_stream_op_batch* batch = batches[i];
+    batch->handler_private.extra_arg = calld->subchannel_call;
+    GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+                      resume_pending_batch_in_call_combiner, batch,
+                      grpc_schedule_on_exec_ctx);
+    GRPC_CALL_COMBINER_START(calld->call_combiner,
+                             &batch->handler_private.closure, GRPC_ERROR_NONE,
+                             "pending_batches_resume");
+  }
+  GPR_ASSERT(num_batches > 0);
   // Note: This will release the call combiner.
-  closures.RunClosures(calld->call_combiner);
+  grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
 }
 
 static void maybe_clear_pending_batch(grpc_call_element* elem,
@@ -1275,10 +1293,7 @@
        batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
            nullptr) &&
       (!batch->recv_message ||
-       batch->payload->recv_message.recv_message_ready == nullptr) &&
-      (!batch->recv_trailing_metadata ||
-       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
-           nullptr)) {
+       batch->payload->recv_message.recv_message_ready == nullptr)) {
     if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
               calld);
@@ -1287,27 +1302,75 @@
   }
 }
 
-// Returns a pointer to the first pending batch for which predicate(batch)
-// returns true, or null if not found.
-template <typename Predicate>
-static pending_batch* pending_batch_find(grpc_call_element* elem,
-                                         const char* log_message,
-                                         Predicate predicate) {
-  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
-  call_data* calld = static_cast<call_data*>(elem->call_data);
-  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
-    pending_batch* pending = &calld->pending_batches[i];
-    grpc_transport_stream_op_batch* batch = pending->batch;
-    if (batch != nullptr && predicate(batch)) {
-      if (grpc_client_channel_trace.enabled()) {
-        gpr_log(GPR_INFO,
-                "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
-                calld, log_message, i);
-      }
-      return pending;
-    }
+// Returns true if all ops in the pending batch have been completed.
+static bool pending_batch_is_completed(
+    pending_batch* pending, call_data* calld,
+    subchannel_call_retry_state* retry_state) {
+  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
+    return false;
   }
-  return nullptr;
+  if (pending->batch->send_initial_metadata &&
+      !retry_state->completed_send_initial_metadata) {
+    return false;
+  }
+  if (pending->batch->send_message &&
+      retry_state->completed_send_message_count <
+          calld->send_messages->size()) {
+    return false;
+  }
+  if (pending->batch->send_trailing_metadata &&
+      !retry_state->completed_send_trailing_metadata) {
+    return false;
+  }
+  if (pending->batch->recv_initial_metadata &&
+      !retry_state->completed_recv_initial_metadata) {
+    return false;
+  }
+  if (pending->batch->recv_message &&
+      retry_state->completed_recv_message_count <
+          retry_state->started_recv_message_count) {
+    return false;
+  }
+  if (pending->batch->recv_trailing_metadata &&
+      !retry_state->completed_recv_trailing_metadata) {
+    return false;
+  }
+  return true;
+}
+
+// Returns true if any op in the batch was not yet started.
+static bool pending_batch_is_unstarted(
+    pending_batch* pending, call_data* calld,
+    subchannel_call_retry_state* retry_state) {
+  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
+    return false;
+  }
+  if (pending->batch->send_initial_metadata &&
+      !retry_state->started_send_initial_metadata) {
+    return true;
+  }
+  if (pending->batch->send_message &&
+      retry_state->started_send_message_count < calld->send_messages->size()) {
+    return true;
+  }
+  if (pending->batch->send_trailing_metadata &&
+      !retry_state->started_send_trailing_metadata) {
+    return true;
+  }
+  if (pending->batch->recv_initial_metadata &&
+      !retry_state->started_recv_initial_metadata) {
+    return true;
+  }
+  if (pending->batch->recv_message &&
+      retry_state->completed_recv_message_count ==
+          retry_state->started_recv_message_count) {
+    return true;
+  }
+  if (pending->batch->recv_trailing_metadata &&
+      !retry_state->started_recv_trailing_metadata) {
+    return true;
+  }
+  return false;
 }
 
 //
@@ -1494,13 +1557,8 @@
 // subchannel_batch_data
 //
 
-// Creates a subchannel_batch_data object on the call's arena with the
-// specified refcount.  If set_on_complete is true, the batch's
-// on_complete callback will be set to point to on_complete();
-// otherwise, the batch's on_complete callback will be null.
 static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
-                                                int refcount,
-                                                bool set_on_complete) {
+                                                int refcount) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
   subchannel_call_retry_state* retry_state =
       static_cast<subchannel_call_retry_state*>(
@@ -1513,11 +1571,9 @@
       GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
   batch_data->batch.payload = &retry_state->batch_payload;
   gpr_ref_init(&batch_data->refs, refcount);
-  if (set_on_complete) {
-    GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
-                      grpc_schedule_on_exec_ctx);
-    batch_data->batch.on_complete = &batch_data->on_complete;
-  }
+  GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
+                    grpc_schedule_on_exec_ctx);
+  batch_data->batch.on_complete = &batch_data->on_complete;
   GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
   return batch_data;
 }
@@ -1550,14 +1606,26 @@
 static void invoke_recv_initial_metadata_callback(void* arg,
                                                   grpc_error* error) {
   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+  channel_data* chand =
+      static_cast<channel_data*>(batch_data->elem->channel_data);
+  call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
   // Find pending batch.
-  pending_batch* pending = pending_batch_find(
-      batch_data->elem, "invoking recv_initial_metadata_ready for",
-      [](grpc_transport_stream_op_batch* batch) {
-        return batch->recv_initial_metadata &&
-               batch->payload->recv_initial_metadata
-                       .recv_initial_metadata_ready != nullptr;
-      });
+  pending_batch* pending = nullptr;
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+    grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
+    if (batch != nullptr && batch->recv_initial_metadata &&
+        batch->payload->recv_initial_metadata.recv_initial_metadata_ready !=
+            nullptr) {
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_INFO,
+                "chand=%p calld=%p: invoking recv_initial_metadata_ready for "
+                "pending batch at index %" PRIuPTR,
+                chand, calld, i);
+      }
+      pending = &calld->pending_batches[i];
+      break;
+    }
+  }
   GPR_ASSERT(pending != nullptr);
   // Return metadata.
   grpc_metadata_batch_move(
@@ -1593,19 +1661,10 @@
       static_cast<subchannel_call_retry_state*>(
           grpc_connected_subchannel_call_get_parent_data(
               batch_data->subchannel_call));
-  retry_state->completed_recv_initial_metadata = true;
-  // If a retry was already dispatched, then we're not going to use the
-  // result of this recv_initial_metadata op, so do nothing.
-  if (retry_state->retry_dispatched) {
-    GRPC_CALL_COMBINER_STOP(
-        calld->call_combiner,
-        "recv_initial_metadata_ready after retry dispatched");
-    return;
-  }
   // If we got an error or a Trailers-Only response and have not yet gotten
-  // the recv_trailing_metadata_ready callback, then defer propagating this
-  // callback back to the surface.  We can evaluate whether to retry when
-  // recv_trailing_metadata comes back.
+  // the recv_trailing_metadata on_complete callback, then defer
+  // propagating this callback back to the surface.  We can evaluate whether
+  // to retry when recv_trailing_metadata comes back.
   if (GPR_UNLIKELY((batch_data->trailing_metadata_available ||
                     error != GRPC_ERROR_NONE) &&
                    !retry_state->completed_recv_trailing_metadata)) {
@@ -1630,9 +1689,9 @@
   }
   // Received valid initial metadata, so commit the call.
   retry_commit(elem, retry_state);
-  // Invoke the callback to return the result to the surface.
   // Manually invoking a callback function; it does not take ownership of error.
   invoke_recv_initial_metadata_callback(batch_data, error);
+  GRPC_ERROR_UNREF(error);
 }
 
 //
@@ -1642,13 +1701,25 @@
 // Invokes recv_message_ready for a subchannel batch.
 static void invoke_recv_message_callback(void* arg, grpc_error* error) {
   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+  channel_data* chand =
+      static_cast<channel_data*>(batch_data->elem->channel_data);
+  call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
   // Find pending op.
-  pending_batch* pending = pending_batch_find(
-      batch_data->elem, "invoking recv_message_ready for",
-      [](grpc_transport_stream_op_batch* batch) {
-        return batch->recv_message &&
-               batch->payload->recv_message.recv_message_ready != nullptr;
-      });
+  pending_batch* pending = nullptr;
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+    grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
+    if (batch != nullptr && batch->recv_message &&
+        batch->payload->recv_message.recv_message_ready != nullptr) {
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_INFO,
+                "chand=%p calld=%p: invoking recv_message_ready for "
+                "pending batch at index %" PRIuPTR,
+                chand, calld, i);
+      }
+      pending = &calld->pending_batches[i];
+      break;
+    }
+  }
   GPR_ASSERT(pending != nullptr);
   // Return payload.
   *pending->batch->payload->recv_message.recv_message =
@@ -1680,18 +1751,10 @@
       static_cast<subchannel_call_retry_state*>(
           grpc_connected_subchannel_call_get_parent_data(
               batch_data->subchannel_call));
-  ++retry_state->completed_recv_message_count;
-  // If a retry was already dispatched, then we're not going to use the
-  // result of this recv_message op, so do nothing.
-  if (retry_state->retry_dispatched) {
-    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
-                            "recv_message_ready after retry dispatched");
-    return;
-  }
   // If we got an error or the payload was nullptr and we have not yet gotten
-  // the recv_trailing_metadata_ready callback, then defer propagating this
-  // callback back to the surface.  We can evaluate whether to retry when
-  // recv_trailing_metadata comes back.
+  // the recv_trailing_metadata on_complete callback, then defer
+  // propagating this callback back to the surface.  We can evaluate whether
+  // to retry when recv_trailing_metadata comes back.
   if (GPR_UNLIKELY(
           (batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
           !retry_state->completed_recv_trailing_metadata)) {
@@ -1714,241 +1777,133 @@
   }
   // Received a valid message, so commit the call.
   retry_commit(elem, retry_state);
-  // Invoke the callback to return the result to the surface.
   // Manually invoking a callback function; it does not take ownership of error.
   invoke_recv_message_callback(batch_data, error);
-}
-
-//
-// recv_trailing_metadata handling
-//
-
-// Adds recv_trailing_metadata_ready closure to closures.
-static void add_closure_for_recv_trailing_metadata_ready(
-    grpc_call_element* elem, subchannel_batch_data* batch_data,
-    grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
-  // Find pending batch.
-  pending_batch* pending = pending_batch_find(
-      elem, "invoking recv_trailing_metadata for",
-      [](grpc_transport_stream_op_batch* batch) {
-        return batch->recv_trailing_metadata &&
-               batch->payload->recv_trailing_metadata
-                       .recv_trailing_metadata_ready != nullptr;
-      });
-  // If we generated the recv_trailing_metadata op internally via
-  // start_internal_recv_trailing_metadata(), then there will be no
-  // pending batch.
-  if (pending == nullptr) {
-    GRPC_ERROR_UNREF(error);
-    return;
-  }
-  // Return metadata.
-  grpc_metadata_batch_move(
-      &batch_data->recv_trailing_metadata,
-      pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
-  // Add closure.
-  closures->Add(pending->batch->payload->recv_trailing_metadata
-                    .recv_trailing_metadata_ready,
-                error, "recv_trailing_metadata_ready for pending batch");
-  // Update bookkeeping.
-  pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
-      nullptr;
-  maybe_clear_pending_batch(elem, pending);
-}
-
-// Adds any necessary closures for deferred recv_initial_metadata and
-// recv_message callbacks to closures.
-static void add_closures_for_deferred_recv_callbacks(
-    subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
-    grpc_core::CallCombinerClosureList* closures) {
-  if (batch_data->batch.recv_trailing_metadata) {
-    // Add closure for deferred recv_initial_metadata_ready.
-    if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
-                     nullptr)) {
-      GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
-                        invoke_recv_initial_metadata_callback,
-                        retry_state->recv_initial_metadata_ready_deferred_batch,
-                        grpc_schedule_on_exec_ctx);
-      closures->Add(&batch_data->recv_initial_metadata_ready,
-                    retry_state->recv_initial_metadata_error,
-                    "resuming recv_initial_metadata_ready");
-      retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
-    }
-    // Add closure for deferred recv_message_ready.
-    if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
-                     nullptr)) {
-      GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
-                        invoke_recv_message_callback,
-                        retry_state->recv_message_ready_deferred_batch,
-                        grpc_schedule_on_exec_ctx);
-      closures->Add(&batch_data->recv_message_ready,
-                    retry_state->recv_message_error,
-                    "resuming recv_message_ready");
-      retry_state->recv_message_ready_deferred_batch = nullptr;
-    }
-  }
-}
-
-// Returns true if any op in the batch was not yet started.
-// Only looks at send ops, since recv ops are always started immediately.
-static bool pending_batch_is_unstarted(
-    pending_batch* pending, call_data* calld,
-    subchannel_call_retry_state* retry_state) {
-  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
-    return false;
-  }
-  if (pending->batch->send_initial_metadata &&
-      !retry_state->started_send_initial_metadata) {
-    return true;
-  }
-  if (pending->batch->send_message &&
-      retry_state->started_send_message_count < calld->send_messages->size()) {
-    return true;
-  }
-  if (pending->batch->send_trailing_metadata &&
-      !retry_state->started_send_trailing_metadata) {
-    return true;
-  }
-  return false;
-}
-
-// For any pending batch containing an op that has not yet been started,
-// adds the pending batch's completion closures to closures.
-static void add_closures_to_fail_unstarted_pending_batches(
-    grpc_call_element* elem, subchannel_call_retry_state* retry_state,
-    grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
-  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
-  call_data* calld = static_cast<call_data*>(elem->call_data);
-  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
-    pending_batch* pending = &calld->pending_batches[i];
-    if (pending_batch_is_unstarted(pending, calld, retry_state)) {
-      if (grpc_client_channel_trace.enabled()) {
-        gpr_log(GPR_INFO,
-                "chand=%p calld=%p: failing unstarted pending batch at index "
-                "%" PRIuPTR,
-                chand, calld, i);
-      }
-      closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
-                    "failing on_complete for pending batch");
-      pending->batch->on_complete = nullptr;
-      maybe_clear_pending_batch(elem, pending);
-    }
-  }
   GRPC_ERROR_UNREF(error);
 }
 
-// Intercepts recv_trailing_metadata_ready callback for retries.
-// Commits the call and returns the trailing metadata up the stack.
-static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
-  subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
-  grpc_call_element* elem = batch_data->elem;
+//
+// list of closures to execute in call combiner
+//
+
+// Represents a closure that needs to run in the call combiner as part of
+// starting or completing a batch.
+typedef struct {
+  grpc_closure* closure;
+  grpc_error* error;
+  const char* reason;
+  bool free_reason = false;
+} closure_to_execute;
+
+static void execute_closures_in_call_combiner(grpc_call_element* elem,
+                                              const char* caller,
+                                              closure_to_execute* closures,
+                                              size_t num_closures) {
   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
-  if (grpc_client_channel_trace.enabled()) {
-    gpr_log(GPR_INFO,
-            "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
-            chand, calld, grpc_error_string(error));
-  }
-  subchannel_call_retry_state* retry_state =
-      static_cast<subchannel_call_retry_state*>(
-          grpc_connected_subchannel_call_get_parent_data(
-              batch_data->subchannel_call));
-  retry_state->completed_recv_trailing_metadata = true;
-  // Get the call's status and check for server pushback metadata.
-  grpc_status_code status = GRPC_STATUS_OK;
-  grpc_mdelem* server_pushback_md = nullptr;
-  if (error != GRPC_ERROR_NONE) {
-    grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
-                          nullptr);
+  // Note that the call combiner will be yielded for each closure that
+  // we schedule.  We're already running in the call combiner, so one of
+  // the closures can be scheduled directly, but the others will
+  // have to re-enter the call combiner.
+  if (num_closures > 0) {
+    if (grpc_client_channel_trace.enabled()) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: %s starting closure: %s", chand,
+              calld, caller, closures[0].reason);
+    }
+    GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
+    if (closures[0].free_reason) {
+      gpr_free(const_cast<char*>(closures[0].reason));
+    }
+    for (size_t i = 1; i < num_closures; ++i) {
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_INFO,
+                "chand=%p calld=%p: %s starting closure in call combiner: %s",
+                chand, calld, caller, closures[i].reason);
+      }
+      GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
+                               closures[i].error, closures[i].reason);
+      if (closures[i].free_reason) {
+        gpr_free(const_cast<char*>(closures[i].reason));
+      }
+    }
   } else {
-    grpc_metadata_batch* md_batch =
-        batch_data->batch.payload->recv_trailing_metadata
-            .recv_trailing_metadata;
-    GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
-    status =
-        grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
-    if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
-      server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
+    if (grpc_client_channel_trace.enabled()) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: no closures to run for %s", chand,
+              calld, caller);
     }
+    GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run");
   }
-  if (grpc_client_channel_trace.enabled()) {
-    gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
-            calld, grpc_status_code_to_string(status));
-  }
-  // Check if we should retry.
-  if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
-    // Unref batch_data for deferred recv_initial_metadata_ready or
-    // recv_message_ready callbacks, if any.
-    if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
-      batch_data_unref(batch_data);
-      GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
-    }
-    if (retry_state->recv_message_ready_deferred_batch != nullptr) {
-      batch_data_unref(batch_data);
-      GRPC_ERROR_UNREF(retry_state->recv_message_error);
-    }
-    batch_data_unref(batch_data);
-    return;
-  }
-  // Not retrying, so commit the call.
-  retry_commit(elem, retry_state);
-  // Construct list of closures to execute.
-  grpc_core::CallCombinerClosureList closures;
-  // First, add closure for recv_trailing_metadata_ready.
-  add_closure_for_recv_trailing_metadata_ready(
-      elem, batch_data, GRPC_ERROR_REF(error), &closures);
-  // If there are deferred recv_initial_metadata_ready or recv_message_ready
-  // callbacks, add them to closures.
-  add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
-  // Add closures to fail any pending batches that have not yet been started.
-  add_closures_to_fail_unstarted_pending_batches(
-      elem, retry_state, GRPC_ERROR_REF(error), &closures);
-  // Don't need batch_data anymore.
-  batch_data_unref(batch_data);
-  // Schedule all of the closures identified above.
-  // Note: This will release the call combiner.
-  closures.RunClosures(calld->call_combiner);
 }
 
 //
 // on_complete callback handling
 //
 
-// For any pending batch completed in batch_data, adds the necessary
-// completion closures to closures.
-static void add_closure_for_completed_pending_batch(
-    grpc_call_element* elem, subchannel_batch_data* batch_data,
-    subchannel_call_retry_state* retry_state, grpc_error* error,
-    grpc_core::CallCombinerClosureList* closures) {
-  pending_batch* pending = pending_batch_find(
-      elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
-        return batch->on_complete != nullptr &&
-               batch_data->batch.send_initial_metadata ==
-                   batch->send_initial_metadata &&
-               batch_data->batch.send_message == batch->send_message &&
-               batch_data->batch.send_trailing_metadata ==
-                   batch->send_trailing_metadata;
-      });
-  // If batch_data is a replay batch, then there will be no pending
-  // batch to complete.
-  if (pending == nullptr) {
-    GRPC_ERROR_UNREF(error);
-    return;
+// Updates retry_state to reflect the ops completed in batch_data.
+static void update_retry_state_for_completed_batch(
+    subchannel_batch_data* batch_data,
+    subchannel_call_retry_state* retry_state) {
+  if (batch_data->batch.send_initial_metadata) {
+    retry_state->completed_send_initial_metadata = true;
   }
-  // Add closure.
-  closures->Add(pending->batch->on_complete, error,
-                "on_complete for pending batch");
-  pending->batch->on_complete = nullptr;
-  maybe_clear_pending_batch(elem, pending);
+  if (batch_data->batch.send_message) {
+    ++retry_state->completed_send_message_count;
+  }
+  if (batch_data->batch.send_trailing_metadata) {
+    retry_state->completed_send_trailing_metadata = true;
+  }
+  if (batch_data->batch.recv_initial_metadata) {
+    retry_state->completed_recv_initial_metadata = true;
+  }
+  if (batch_data->batch.recv_message) {
+    ++retry_state->completed_recv_message_count;
+  }
+  if (batch_data->batch.recv_trailing_metadata) {
+    retry_state->completed_recv_trailing_metadata = true;
+  }
+}
+
+// Adds any necessary closures for deferred recv_initial_metadata and
+// recv_message callbacks to closures, updating *num_closures as needed.
+static void add_closures_for_deferred_recv_callbacks(
+    subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
+    closure_to_execute* closures, size_t* num_closures) {
+  if (batch_data->batch.recv_trailing_metadata) {
+    // Add closure for deferred recv_initial_metadata_ready.
+    if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
+                     nullptr)) {
+      closure_to_execute* closure = &closures[(*num_closures)++];
+      closure->closure = GRPC_CLOSURE_INIT(
+          &batch_data->recv_initial_metadata_ready,
+          invoke_recv_initial_metadata_callback,
+          retry_state->recv_initial_metadata_ready_deferred_batch,
+          grpc_schedule_on_exec_ctx);
+      closure->error = retry_state->recv_initial_metadata_error;
+      closure->reason = "resuming recv_initial_metadata_ready";
+      retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
+    }
+    // Add closure for deferred recv_message_ready.
+    if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
+                     nullptr)) {
+      closure_to_execute* closure = &closures[(*num_closures)++];
+      closure->closure = GRPC_CLOSURE_INIT(
+          &batch_data->recv_message_ready, invoke_recv_message_callback,
+          retry_state->recv_message_ready_deferred_batch,
+          grpc_schedule_on_exec_ctx);
+      closure->error = retry_state->recv_message_error;
+      closure->reason = "resuming recv_message_ready";
+      retry_state->recv_message_ready_deferred_batch = nullptr;
+    }
+  }
 }
 
 // If there are any cached ops to replay or pending ops to start on the
 // subchannel call, adds a closure to closures to invoke
-// start_retriable_subchannel_batches().
+// start_retriable_subchannel_batches(), updating *num_closures as needed.
 static void add_closures_for_replay_or_pending_send_ops(
     grpc_call_element* elem, subchannel_batch_data* batch_data,
-    subchannel_call_retry_state* retry_state,
-    grpc_core::CallCombinerClosureList* closures) {
+    subchannel_call_retry_state* retry_state, closure_to_execute* closures,
+    size_t* num_closures) {
   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
   bool have_pending_send_message_ops =
@@ -1974,14 +1929,95 @@
               "chand=%p calld=%p: starting next batch for pending send op(s)",
               chand, calld);
     }
-    GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
-                      start_retriable_subchannel_batches, elem,
-                      grpc_schedule_on_exec_ctx);
-    closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
-                  "starting next batch for send_* op(s)");
+    closure_to_execute* closure = &closures[(*num_closures)++];
+    closure->closure = GRPC_CLOSURE_INIT(
+        &batch_data->batch.handler_private.closure,
+        start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx);
+    closure->error = GRPC_ERROR_NONE;
+    closure->reason = "starting next batch for send_* op(s)";
   }
 }
 
+// For any pending batch completed in batch_data, adds the necessary
+// completion closures to closures, updating *num_closures as needed.
+static void add_closures_for_completed_pending_batches(
+    grpc_call_element* elem, subchannel_batch_data* batch_data,
+    subchannel_call_retry_state* retry_state, grpc_error* error,
+    closure_to_execute* closures, size_t* num_closures) {
+  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+  call_data* calld = static_cast<call_data*>(elem->call_data);
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+    pending_batch* pending = &calld->pending_batches[i];
+    if (pending_batch_is_completed(pending, calld, retry_state)) {
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_INFO,
+                "chand=%p calld=%p: pending batch completed at index %" PRIuPTR,
+                chand, calld, i);
+      }
+      // Copy the trailing metadata to return it to the surface.
+      if (batch_data->batch.recv_trailing_metadata) {
+        grpc_metadata_batch_move(&batch_data->recv_trailing_metadata,
+                                 pending->batch->payload->recv_trailing_metadata
+                                     .recv_trailing_metadata);
+      }
+      closure_to_execute* closure = &closures[(*num_closures)++];
+      closure->closure = pending->batch->on_complete;
+      closure->error = GRPC_ERROR_REF(error);
+      closure->reason = "on_complete for pending batch";
+      pending->batch->on_complete = nullptr;
+      maybe_clear_pending_batch(elem, pending);
+    }
+  }
+  GRPC_ERROR_UNREF(error);
+}
+
+// For any pending batch containing an op that has not yet been started,
+// adds the pending batch's completion closures to closures, updating
+// *num_closures as needed.
+static void add_closures_to_fail_unstarted_pending_batches(
+    grpc_call_element* elem, subchannel_call_retry_state* retry_state,
+    grpc_error* error, closure_to_execute* closures, size_t* num_closures) {
+  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+  call_data* calld = static_cast<call_data*>(elem->call_data);
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+    pending_batch* pending = &calld->pending_batches[i];
+    if (pending_batch_is_unstarted(pending, calld, retry_state)) {
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_INFO,
+                "chand=%p calld=%p: failing unstarted pending batch at index "
+                "%" PRIuPTR,
+                chand, calld, i);
+      }
+      if (pending->batch->recv_initial_metadata) {
+        closure_to_execute* closure = &closures[(*num_closures)++];
+        closure->closure = pending->batch->payload->recv_initial_metadata
+                               .recv_initial_metadata_ready;
+        closure->error = GRPC_ERROR_REF(error);
+        closure->reason =
+            "failing recv_initial_metadata_ready for pending batch";
+        pending->batch->payload->recv_initial_metadata
+            .recv_initial_metadata_ready = nullptr;
+      }
+      if (pending->batch->recv_message) {
+        *pending->batch->payload->recv_message.recv_message = nullptr;
+        closure_to_execute* closure = &closures[(*num_closures)++];
+        closure->closure =
+            pending->batch->payload->recv_message.recv_message_ready;
+        closure->error = GRPC_ERROR_REF(error);
+        closure->reason = "failing recv_message_ready for pending batch";
+        pending->batch->payload->recv_message.recv_message_ready = nullptr;
+      }
+      closure_to_execute* closure = &closures[(*num_closures)++];
+      closure->closure = pending->batch->on_complete;
+      closure->error = GRPC_ERROR_REF(error);
+      closure->reason = "failing on_complete for pending batch";
+      pending->batch->on_complete = nullptr;
+      maybe_clear_pending_batch(elem, pending);
+    }
+  }
+  GRPC_ERROR_UNREF(error);
+}
+
 // Callback used to intercept on_complete from subchannel calls.
 // Called only when retries are enabled.
 static void on_complete(void* arg, grpc_error* error) {
@@ -1999,48 +2035,135 @@
       static_cast<subchannel_call_retry_state*>(
           grpc_connected_subchannel_call_get_parent_data(
               batch_data->subchannel_call));
+  // If we have previously completed recv_trailing_metadata, then the
+  // call is finished.
+  bool call_finished = retry_state->completed_recv_trailing_metadata;
+  // Record whether we were already committed before receiving this callback.
+  const bool previously_committed = calld->retry_committed;
   // Update bookkeeping in retry_state.
-  if (batch_data->batch.send_initial_metadata) {
-    retry_state->completed_send_initial_metadata = true;
+  update_retry_state_for_completed_batch(batch_data, retry_state);
+  if (call_finished) {
+    if (grpc_client_channel_trace.enabled()) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: call already finished", chand,
+              calld);
+    }
+  } else {
+    // Check if this batch finished the call, and if so, get its status.
+    // The call is finished if either (a) this callback was invoked with
+    // an error or (b) we receive status.
+    grpc_status_code status = GRPC_STATUS_OK;
+    grpc_mdelem* server_pushback_md = nullptr;
+    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {  // Case (a).
+      call_finished = true;
+      grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
+                            nullptr);
+    } else if (batch_data->batch.recv_trailing_metadata) {  // Case (b).
+      call_finished = true;
+      grpc_metadata_batch* md_batch =
+          batch_data->batch.payload->recv_trailing_metadata
+              .recv_trailing_metadata;
+      GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
+      status = grpc_get_status_code_from_metadata(
+          md_batch->idx.named.grpc_status->md);
+      if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
+        server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
+      }
+    }
+    // If the call just finished, check if we should retry.
+    if (call_finished) {
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
+                calld, grpc_status_code_to_string(status));
+      }
+      if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
+        // Unref batch_data for deferred recv_initial_metadata_ready or
+        // recv_message_ready callbacks, if any.
+        if (batch_data->batch.recv_trailing_metadata &&
+            retry_state->recv_initial_metadata_ready_deferred_batch !=
+                nullptr) {
+          batch_data_unref(batch_data);
+          GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
+        }
+        if (batch_data->batch.recv_trailing_metadata &&
+            retry_state->recv_message_ready_deferred_batch != nullptr) {
+          batch_data_unref(batch_data);
+          GRPC_ERROR_UNREF(retry_state->recv_message_error);
+        }
+        // Track number of pending subchannel send batches and determine if
+        // this was the last one.
+        bool last_callback_complete = false;
+        if (batch_data->batch.send_initial_metadata ||
+            batch_data->batch.send_message ||
+            batch_data->batch.send_trailing_metadata) {
+          --calld->num_pending_retriable_subchannel_send_batches;
+          last_callback_complete =
+              calld->num_pending_retriable_subchannel_send_batches == 0;
+        }
+        batch_data_unref(batch_data);
+        // If we just completed the last subchannel send batch, unref the
+        // call stack.
+        if (last_callback_complete) {
+          GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
+        }
+        return;
+      }
+      // Not retrying, so commit the call.
+      retry_commit(elem, retry_state);
+    }
   }
-  if (batch_data->batch.send_message) {
-    ++retry_state->completed_send_message_count;
-  }
-  if (batch_data->batch.send_trailing_metadata) {
-    retry_state->completed_send_trailing_metadata = true;
-  }
-  // If the call is committed, free cached data for send ops that we've just
-  // completed.
-  if (calld->retry_committed) {
+  // If we were already committed before receiving this callback, free
+  // cached data for send ops that we've just completed.  (If the call has
+  // just now finished, the call to retry_commit() above will have freed all
+  // cached send ops, so we don't need to do it here.)
+  if (previously_committed) {
     free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
   }
+  // Call not being retried.
   // Construct list of closures to execute.
-  grpc_core::CallCombinerClosureList closures;
-  // If a retry was already dispatched, that means we saw
-  // recv_trailing_metadata before this, so we do nothing here.
-  // Otherwise, invoke the callback to return the result to the surface.
-  if (!retry_state->retry_dispatched) {
-    // Add closure for the completed pending batch, if any.
-    add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
-                                            GRPC_ERROR_REF(error), &closures);
-    // If needed, add a callback to start_retriable_subchannel_batches() to
-    // start any replay or pending send ops on the subchannel call.
-    if (!retry_state->completed_recv_trailing_metadata) {
-      add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
-                                                  &closures);
-    }
+  // Max number of closures is number of pending batches plus one for
+  // each of:
+  // - recv_initial_metadata_ready (either deferred or unstarted)
+  // - recv_message_ready (either deferred or unstarted)
+  // - starting a new batch for pending send ops
+  closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3];
+  size_t num_closures = 0;
+  // If there are deferred recv_initial_metadata_ready or recv_message_ready
+  // callbacks, add them to closures.
+  add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures,
+                                           &num_closures);
+  // Find pending batches whose ops are now complete and add their
+  // on_complete callbacks to closures.
+  add_closures_for_completed_pending_batches(elem, batch_data, retry_state,
+                                             GRPC_ERROR_REF(error), closures,
+                                             &num_closures);
+  // Add closures to handle any pending batches that have not yet been started.
+  // If the call is finished, we fail these batches; otherwise, we add a
+  // callback to start_retriable_subchannel_batches() to start them on
+  // the subchannel call.
+  if (call_finished) {
+    add_closures_to_fail_unstarted_pending_batches(
+        elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures);
+  } else {
+    add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
+                                                closures, &num_closures);
   }
   // Track number of pending subchannel send batches and determine if this
   // was the last one.
-  --calld->num_pending_retriable_subchannel_send_batches;
-  const bool last_callback_complete =
-      calld->num_pending_retriable_subchannel_send_batches == 0;
+  bool last_callback_complete = false;
+  if (batch_data->batch.send_initial_metadata ||
+      batch_data->batch.send_message ||
+      batch_data->batch.send_trailing_metadata) {
+    --calld->num_pending_retriable_subchannel_send_batches;
+    last_callback_complete =
+        calld->num_pending_retriable_subchannel_send_batches == 0;
+  }
   // Don't need batch_data anymore.
   batch_data_unref(batch_data);
   // Schedule all of the closures identified above.
   // Note: This yeilds the call combiner.
-  closures.RunClosures(calld->call_combiner);
-  // If this was the last subchannel send batch, unref the call stack.
+  execute_closures_in_call_combiner(elem, "on_complete", closures,
+                                    num_closures);
+  // If we just completed the last subchannel send batch, unref the call stack.
   if (last_callback_complete) {
     GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
   }
@@ -2062,22 +2185,27 @@
 
 // Adds a closure to closures that will execute batch in the call combiner.
 static void add_closure_for_subchannel_batch(
-    grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
-    grpc_core::CallCombinerClosureList* closures) {
-  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
-  call_data* calld = static_cast<call_data*>(elem->call_data);
+    call_data* calld, grpc_transport_stream_op_batch* batch,
+    closure_to_execute* closures, size_t* num_closures) {
   batch->handler_private.extra_arg = calld->subchannel_call;
   GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                     start_batch_in_call_combiner, batch,
                     grpc_schedule_on_exec_ctx);
+  closure_to_execute* closure = &closures[(*num_closures)++];
+  closure->closure = &batch->handler_private.closure;
+  closure->error = GRPC_ERROR_NONE;
+  // If the tracer is enabled, we log a more detailed message, which
+  // requires dynamic allocation.  This will be freed in
+  // start_retriable_subchannel_batches().
   if (grpc_client_channel_trace.enabled()) {
     char* batch_str = grpc_transport_stream_op_batch_string(batch);
-    gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
-            calld, batch_str);
+    gpr_asprintf(const_cast<char**>(&closure->reason),
+                 "starting batch in call combiner: %s", batch_str);
     gpr_free(batch_str);
+    closure->free_reason = true;
+  } else {
+    closure->reason = "start_subchannel_batch";
   }
-  closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
-                "start_subchannel_batch");
 }
 
 // Adds retriable send_initial_metadata op to batch_data.
@@ -2213,13 +2341,9 @@
   grpc_metadata_batch_init(&batch_data->recv_trailing_metadata);
   batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
       &batch_data->recv_trailing_metadata;
-  batch_data->batch.payload->recv_trailing_metadata.collect_stats =
+  batch_data->batch.collect_stats = true;
+  batch_data->batch.payload->collect_stats.collect_stats =
       &batch_data->collect_stats;
-  GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready,
-                    recv_trailing_metadata_ready, batch_data,
-                    grpc_schedule_on_exec_ctx);
-  batch_data->batch.payload->recv_trailing_metadata
-      .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready;
 }
 
 // Helper function used to start a recv_trailing_metadata batch.  This
@@ -2240,11 +2364,9 @@
           grpc_connected_subchannel_call_get_parent_data(
               calld->subchannel_call));
   // Create batch_data with 2 refs, since this batch will be unreffed twice:
-  // once for the recv_trailing_metadata_ready callback when the subchannel
-  // batch returns, and again when we actually get a recv_trailing_metadata
-  // op from the surface.
-  subchannel_batch_data* batch_data =
-      batch_data_create(elem, 2, false /* set_on_complete */);
+  // once when the subchannel batch returns, and again when we actually get
+  // a recv_trailing_metadata op from the surface.
+  subchannel_batch_data* batch_data = batch_data_create(elem, 2);
   add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
   retry_state->recv_trailing_metadata_internal_batch = batch_data;
   // Note: This will release the call combiner.
@@ -2269,7 +2391,7 @@
               "send_initial_metadata op",
               chand, calld);
     }
-    replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
+    replay_batch_data = batch_data_create(elem, 1);
     add_retriable_send_initial_metadata_op(calld, retry_state,
                                            replay_batch_data);
   }
@@ -2286,8 +2408,7 @@
               chand, calld);
     }
     if (replay_batch_data == nullptr) {
-      replay_batch_data =
-          batch_data_create(elem, 1, true /* set_on_complete */);
+      replay_batch_data = batch_data_create(elem, 1);
     }
     add_retriable_send_message_op(elem, retry_state, replay_batch_data);
   }
@@ -2306,8 +2427,7 @@
               chand, calld);
     }
     if (replay_batch_data == nullptr) {
-      replay_batch_data =
-          batch_data_create(elem, 1, true /* set_on_complete */);
+      replay_batch_data = batch_data_create(elem, 1);
     }
     add_retriable_send_trailing_metadata_op(calld, retry_state,
                                             replay_batch_data);
@@ -2319,7 +2439,7 @@
 // *num_batches as needed.
 static void add_subchannel_batches_for_pending_batches(
     grpc_call_element* elem, subchannel_call_retry_state* retry_state,
-    grpc_core::CallCombinerClosureList* closures) {
+    closure_to_execute* closures, size_t* num_closures) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
     pending_batch* pending = &calld->pending_batches[i];
@@ -2375,11 +2495,13 @@
         if (retry_state->completed_recv_trailing_metadata) {
           subchannel_batch_data* batch_data =
               retry_state->recv_trailing_metadata_internal_batch;
+          closure_to_execute* closure = &closures[(*num_closures)++];
+          closure->closure = &batch_data->on_complete;
           // Batches containing recv_trailing_metadata always succeed.
-          closures->Add(
-              &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
-              "re-executing recv_trailing_metadata_ready to propagate "
-              "internally triggered result");
+          closure->error = GRPC_ERROR_NONE;
+          closure->reason =
+              "re-executing on_complete for recv_trailing_metadata "
+              "to propagate internally triggered result";
         } else {
           batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
         }
@@ -2391,19 +2513,14 @@
     if (calld->method_params == nullptr ||
         calld->method_params->retry_policy() == nullptr ||
         calld->retry_committed) {
-      add_closure_for_subchannel_batch(elem, batch, closures);
+      add_closure_for_subchannel_batch(calld, batch, closures, num_closures);
       pending_batch_clear(calld, pending);
       continue;
     }
     // Create batch with the right number of callbacks.
-    const bool has_send_ops = batch->send_initial_metadata ||
-                              batch->send_message ||
-                              batch->send_trailing_metadata;
-    const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
-                              batch->recv_message +
-                              batch->recv_trailing_metadata;
-    subchannel_batch_data* batch_data = batch_data_create(
-        elem, num_callbacks, has_send_ops /* set_on_complete */);
+    const int num_callbacks =
+        1 + batch->recv_initial_metadata + batch->recv_message;
+    subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks);
     // Cache send ops if needed.
     maybe_cache_send_ops_for_batch(calld, pending);
     // send_initial_metadata.
@@ -2430,9 +2547,11 @@
     }
     // recv_trailing_metadata.
     if (batch->recv_trailing_metadata) {
+      GPR_ASSERT(batch->collect_stats);
       add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
     }
-    add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
+    add_closure_for_subchannel_batch(calld, &batch_data->batch, closures,
+                                     num_closures);
     // Track number of pending subchannel send batches.
     // If this is the first one, take a ref to the call stack.
     if (batch->send_initial_metadata || batch->send_message ||
@@ -2460,13 +2579,15 @@
           grpc_connected_subchannel_call_get_parent_data(
               calld->subchannel_call));
   // Construct list of closures to execute, one for each pending batch.
-  grpc_core::CallCombinerClosureList closures;
+  // We can start up to 6 batches.
+  closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)];
+  size_t num_closures = 0;
   // Replay previously-returned send_* ops if needed.
   subchannel_batch_data* replay_batch_data =
       maybe_create_subchannel_batch_for_replay(elem, retry_state);
   if (replay_batch_data != nullptr) {
-    add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
-                                     &closures);
+    add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures,
+                                     &num_closures);
     // Track number of pending subchannel send batches.
     // If this is the first one, take a ref to the call stack.
     if (calld->num_pending_retriable_subchannel_send_batches == 0) {
@@ -2475,16 +2596,17 @@
     ++calld->num_pending_retriable_subchannel_send_batches;
   }
   // Now add pending batches.
-  add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
+  add_subchannel_batches_for_pending_batches(elem, retry_state, closures,
+                                             &num_closures);
   // Start batches on subchannel call.
   if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_INFO,
             "chand=%p calld=%p: starting %" PRIuPTR
             " retriable batches on subchannel_call=%p",
-            chand, calld, closures.size(), calld->subchannel_call);
+            chand, calld, num_closures, calld->subchannel_call);
   }
-  // Note: This will yield the call combiner.
-  closures.RunClosures(calld->call_combiner);
+  execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches",
+                                    closures, num_closures);
 }
 
 //
diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc
index d575d2d..27d3eac 100644
--- a/src/core/ext/filters/deadline/deadline_filter.cc
+++ b/src/core/ext/filters/deadline/deadline_filter.cc
@@ -128,25 +128,21 @@
   }
 }
 
-// Callback run when we receive trailing metadata.
-static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
+// Callback run when the call is complete.
+static void on_complete(void* arg, grpc_error* error) {
   grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
   cancel_timer_if_needed(deadline_state);
-  // Invoke the original callback.
-  GRPC_CLOSURE_RUN(deadline_state->original_recv_trailing_metadata_ready,
-                   GRPC_ERROR_REF(error));
+  // Invoke the next callback.
+  GRPC_CLOSURE_RUN(deadline_state->next_on_complete, GRPC_ERROR_REF(error));
 }
 
-// Inject our own recv_trailing_metadata_ready callback into op.
-static void inject_recv_trailing_metadata_ready(
-    grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
-  deadline_state->original_recv_trailing_metadata_ready =
-      op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
-  GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
-                    recv_trailing_metadata_ready, deadline_state,
+// Inject our own on_complete callback into op.
+static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
+                                  grpc_transport_stream_op_batch* op) {
+  deadline_state->next_on_complete = op->on_complete;
+  GRPC_CLOSURE_INIT(&deadline_state->on_complete, on_complete, deadline_state,
                     grpc_schedule_on_exec_ctx);
-  op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
-      &deadline_state->recv_trailing_metadata_ready;
+  op->on_complete = &deadline_state->on_complete;
 }
 
 // Callback and associated state for starting the timer after call stack
@@ -230,7 +226,7 @@
     // Make sure we know when the call is complete, so that we can cancel
     // the timer.
     if (op->recv_trailing_metadata) {
-      inject_recv_trailing_metadata_ready(deadline_state, op);
+      inject_on_complete_cb(deadline_state, op);
     }
   }
 }
@@ -327,7 +323,7 @@
     // the client never sends trailing metadata, because this is the
     // hook that tells us when the call is complete on the server side.
     if (op->recv_trailing_metadata) {
-      inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op);
+      inject_on_complete_cb(&calld->base.deadline_state, op);
     }
   }
   // Chain to next filter.
diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h
index 1d797f4..13207cb 100644
--- a/src/core/ext/filters/deadline/deadline_filter.h
+++ b/src/core/ext/filters/deadline/deadline_filter.h
@@ -37,12 +37,12 @@
   grpc_deadline_timer_state timer_state;
   grpc_timer timer;
   grpc_closure timer_callback;
-  // Closure to invoke when we receive trailing metadata.
+  // Closure to invoke when the call is complete.
   // We use this to cancel the timer.
-  grpc_closure recv_trailing_metadata_ready;
-  // The original recv_trailing_metadata_ready closure, which we chain to
-  // after our own closure is invoked.
-  grpc_closure* original_recv_trailing_metadata_ready;
+  grpc_closure on_complete;
+  // The original on_complete closure, which we chain to after our own
+  // closure is invoked.
+  grpc_closure* next_on_complete;
 } grpc_deadline_state;
 
 //
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc
index 1678051..ae94ce4 100644
--- a/src/core/ext/filters/http/client/http_client_filter.cc
+++ b/src/core/ext/filters/http/client/http_client_filter.cc
@@ -55,8 +55,8 @@
   grpc_closure recv_initial_metadata_ready;
   // State for handling recv_trailing_metadata ops.
   grpc_metadata_batch* recv_trailing_metadata;
-  grpc_closure* original_recv_trailing_metadata_ready;
-  grpc_closure recv_trailing_metadata_ready;
+  grpc_closure* original_recv_trailing_metadata_on_complete;
+  grpc_closure recv_trailing_metadata_on_complete;
   // State for handling send_message ops.
   grpc_transport_stream_op_batch* send_message_batch;
   size_t send_message_bytes_read;
@@ -153,7 +153,8 @@
   GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error);
 }
 
-static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
+static void recv_trailing_metadata_on_complete(void* user_data,
+                                               grpc_error* error) {
   grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
   if (error == GRPC_ERROR_NONE) {
@@ -162,7 +163,7 @@
   } else {
     GRPC_ERROR_REF(error);
   }
-  GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
+  GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_on_complete, error);
 }
 
 static void send_message_on_complete(void* arg, grpc_error* error) {
@@ -311,10 +312,8 @@
     /* substitute our callback for the higher callback */
     calld->recv_trailing_metadata =
         batch->payload->recv_trailing_metadata.recv_trailing_metadata;
-    calld->original_recv_trailing_metadata_ready =
-        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
-    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
-        &calld->recv_trailing_metadata_ready;
+    calld->original_recv_trailing_metadata_on_complete = batch->on_complete;
+    batch->on_complete = &calld->recv_trailing_metadata_on_complete;
   }
 
   grpc_error* error = GRPC_ERROR_NONE;
@@ -421,8 +420,8 @@
   GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
                     recv_initial_metadata_ready, elem,
                     grpc_schedule_on_exec_ctx);
-  GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
-                    recv_trailing_metadata_ready, elem,
+  GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete,
+                    recv_trailing_metadata_on_complete, elem,
                     grpc_schedule_on_exec_ctx);
   GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
                     elem, grpc_schedule_on_exec_ctx);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 0d6b72c..a8090d1 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1149,10 +1149,12 @@
   }
 }
 
+/* Flag that this closure barrier wants stats to be updated before finishing */
+#define CLOSURE_BARRIER_STATS_BIT (1 << 0)
 /* Flag that this closure barrier may be covering a write in a pollset, and so
    we should not complete this closure until we can prove that the write got
    scheduled */
-#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
+#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 1)
 /* First bit of the reference count, stored in the high order bits (with the low
    bits being used for flags defined above) */
 #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
@@ -1204,6 +1206,10 @@
         grpc_error_add_child(closure->error_data.error, error);
   }
   if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
+    if (closure->next_data.scratch & CLOSURE_BARRIER_STATS_BIT) {
+      grpc_transport_move_stats(&s->stats, s->collecting_stats);
+      s->collecting_stats = nullptr;
+    }
     if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
         !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
       GRPC_CLOSURE_RUN(closure, closure->error_data.error);
@@ -1345,14 +1351,9 @@
   }
 
   grpc_closure* on_complete = op->on_complete;
-  // TODO(roth): This is a hack needed because we use data inside of the
-  // closure itself to do the barrier calculation (i.e., to ensure that
-  // we don't schedule the closure until all ops in the batch have been
-  // completed).  This can go away once we move to a new C++ closure API
-  // that provides the ability to create a barrier closure.
   if (on_complete == nullptr) {
-    on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
-                                    nullptr, grpc_schedule_on_exec_ctx);
+    on_complete =
+        GRPC_CLOSURE_CREATE(do_nothing, nullptr, grpc_schedule_on_exec_ctx);
   }
 
   /* use final_data as a barrier until enqueue time; the inital counter is
@@ -1360,6 +1361,12 @@
   on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
   on_complete->error_data.error = GRPC_ERROR_NONE;
 
+  if (op->collect_stats) {
+    GPR_ASSERT(s->collecting_stats == nullptr);
+    s->collecting_stats = op_payload->collect_stats.collect_stats;
+    on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT;
+  }
+
   if (op->cancel_stream) {
     GRPC_STATS_INC_HTTP2_OP_CANCEL();
     grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
@@ -1593,11 +1600,8 @@
 
   if (op->recv_trailing_metadata) {
     GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
-    GPR_ASSERT(s->collecting_stats == nullptr);
-    s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
     GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
-    s->recv_trailing_metadata_finished =
-        op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+    s->recv_trailing_metadata_finished = add_closure_barrier(on_complete);
     s->recv_trailing_metadata =
         op_payload->recv_trailing_metadata.recv_trailing_metadata;
     s->final_metadata_requested = true;
@@ -1956,12 +1960,11 @@
     }
     if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
         s->recv_trailing_metadata_finished != nullptr) {
-      grpc_transport_move_stats(&s->stats, s->collecting_stats);
-      s->collecting_stats = nullptr;
       grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
                                                    s->recv_trailing_metadata);
-      null_then_run_closure(&s->recv_trailing_metadata_finished,
-                            GRPC_ERROR_NONE);
+      grpc_chttp2_complete_closure_step(
+          t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE,
+          "recv_trailing_metadata_finished");
     }
   }
 }
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index 4a252d9..420c2d1 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -925,10 +925,6 @@
       result = false;
     }
     /* Check if every op that was asked for is done. */
-    /* TODO(muxi): We should not consider the recv ops here, since they
-     * have their own callbacks.  We should invoke a batch's on_complete
-     * as soon as all of the batch's send ops are complete, even if
-     * there are still recv ops pending. */
     else if (curr_op->send_initial_metadata &&
              !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
       CRONET_LOG(GPR_DEBUG, "Because");
@@ -1284,20 +1280,12 @@
              op_can_be_run(stream_op, s, &oas->state,
                            OP_RECV_TRAILING_METADATA)) {
     CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_TRAILING_METADATA", oas);
-    grpc_error* error = GRPC_ERROR_NONE;
-    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
-      error = GRPC_ERROR_REF(stream_state->cancel_error);
-    } else if (stream_state->state_op_done[OP_FAILED]) {
-      error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
-    } else if (oas->s->state.rs.trailing_metadata_valid) {
+    if (oas->s->state.rs.trailing_metadata_valid) {
       grpc_chttp2_incoming_metadata_buffer_publish(
           &oas->s->state.rs.trailing_metadata,
           stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
       stream_state->rs.trailing_metadata_valid = false;
     }
-    GRPC_CLOSURE_SCHED(
-        stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
-        error);
     stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
     result = ACTION_TAKEN_NO_CALLBACK;
   } else if (stream_op->cancel_stream &&
@@ -1410,11 +1398,6 @@
       GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
                          GRPC_ERROR_CANCELLED);
     }
-    if (op->recv_trailing_metadata) {
-      GRPC_CLOSURE_SCHED(
-          op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
-          GRPC_ERROR_CANCELLED);
-    }
     GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
     return;
   }
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index b0ca7f8..2c3bff5 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -120,6 +120,7 @@
   struct inproc_stream* stream_list_next;
 } inproc_stream;
 
+static grpc_closure do_nothing_closure;
 static bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
 static void op_state_machine(void* arg, grpc_error* error);
 
@@ -372,10 +373,6 @@
                                          const char* msg) {
   int is_sm = static_cast<int>(op == s->send_message_op);
   int is_stm = static_cast<int>(op == s->send_trailing_md_op);
-  // TODO(vjpai): We should not consider the recv ops here, since they
-  // have their own callbacks.  We should invoke a batch's on_complete
-  // as soon as all of the batch's send ops are complete, even if there
-  // are still recv ops pending.
   int is_rim = static_cast<int>(op == s->recv_initial_md_op);
   int is_rm = static_cast<int>(op == s->recv_message_op);
   int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
@@ -499,11 +496,6 @@
     s->send_trailing_md_op = nullptr;
   }
   if (s->recv_trailing_md_op) {
-    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
-               s, error);
-    GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
-                           .recv_trailing_metadata_ready,
-                       GRPC_ERROR_REF(error));
     INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
                s, error);
     complete_if_batch_end_locked(
@@ -647,12 +639,6 @@
       s->trailing_md_sent = true;
       if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
         INPROC_LOG(GPR_INFO,
-                   "op_state_machine %p scheduling trailing-metadata-ready", s);
-        GRPC_CLOSURE_SCHED(
-            s->recv_trailing_md_op->payload->recv_trailing_metadata
-                .recv_trailing_metadata_ready,
-            GRPC_ERROR_NONE);
-        INPROC_LOG(GPR_INFO,
                    "op_state_machine %p scheduling trailing-md-on-complete", s);
         GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
                            GRPC_ERROR_NONE);
@@ -725,12 +711,6 @@
   }
   if (s->recv_trailing_md_op && s->t->is_client && other &&
       other->send_message_op) {
-    INPROC_LOG(GPR_INFO,
-               "op_state_machine %p scheduling trailing-metadata-ready %p", s,
-               GRPC_ERROR_NONE);
-    GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
-                           .recv_trailing_metadata_ready,
-                       GRPC_ERROR_NONE);
     maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
   }
   if (s->to_read_trailing_md_filled) {
@@ -786,10 +766,6 @@
         INPROC_LOG(GPR_INFO,
                    "op_state_machine %p scheduling trailing-md-on-complete %p",
                    s, new_err);
-        GRPC_CLOSURE_SCHED(
-            s->recv_trailing_md_op->payload->recv_trailing_metadata
-                .recv_trailing_metadata_ready,
-            GRPC_ERROR_REF(new_err));
         GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
                            GRPC_ERROR_REF(new_err));
         s->recv_trailing_md_op = nullptr;
@@ -883,9 +859,6 @@
     // couldn't complete that because we hadn't yet sent out trailing
     // md, now's the chance
     if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
-      GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
-                             .recv_trailing_metadata_ready,
-                         GRPC_ERROR_REF(s->cancel_self_error));
       complete_if_batch_end_locked(
           s, s->cancel_self_error, s->recv_trailing_md_op,
           "cancel_stream scheduling trailing-md-on-complete");
@@ -900,8 +873,6 @@
   return ret;
 }
 
-static void do_nothing(void* arg, grpc_error* error) {}
-
 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
                               grpc_transport_stream_op_batch* op) {
   INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
@@ -921,14 +892,8 @@
   }
   grpc_error* error = GRPC_ERROR_NONE;
   grpc_closure* on_complete = op->on_complete;
-  // TODO(roth): This is a hack needed because we use data inside of the
-  // closure itself to do the barrier calculation (i.e., to ensure that
-  // we don't schedule the closure until all ops in the batch have been
-  // completed).  This can go away once we move to a new C++ closure API
-  // that provides the ability to create a barrier closure.
   if (on_complete == nullptr) {
-    on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
-                                    nullptr, grpc_schedule_on_exec_ctx);
+    on_complete = &do_nothing_closure;
   }
 
   if (op->cancel_stream) {
@@ -1061,15 +1026,6 @@
         GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
                            GRPC_ERROR_REF(error));
       }
-      if (op->recv_trailing_metadata) {
-        INPROC_LOG(
-            GPR_INFO,
-            "perform_stream_op error %p scheduling trailing-metadata-ready %p",
-            s, error);
-        GRPC_CLOSURE_SCHED(
-            op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
-            GRPC_ERROR_REF(error));
-      }
     }
     INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
                error);
@@ -1173,8 +1129,12 @@
 /*******************************************************************************
  * GLOBAL INIT AND DESTROY
  */
+static void do_nothing(void* arg, grpc_error* error) {}
+
 void grpc_inproc_transport_init(void) {
   grpc_core::ExecCtx exec_ctx;
+  GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, nullptr,
+                    grpc_schedule_on_exec_ctx);
   g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0);
 
   grpc_slice key_tmp = grpc_slice_from_static_string(":path");
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index e2ea334..ddd3029 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -51,7 +51,6 @@
   callback_state on_complete[6];  // Max number of pending batches.
   callback_state recv_initial_metadata_ready;
   callback_state recv_message_ready;
-  callback_state recv_trailing_metadata_ready;
 } call_data;
 
 static void run_in_call_combiner(void* arg, grpc_error* error) {
@@ -112,12 +111,6 @@
     intercept_callback(calld, state, false, "recv_message_ready",
                        &batch->payload->recv_message.recv_message_ready);
   }
-  if (batch->recv_trailing_metadata) {
-    callback_state* state = &calld->recv_trailing_metadata_ready;
-    intercept_callback(
-        calld, state, false, "recv_trailing_metadata_ready",
-        &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
-  }
   if (batch->cancel_stream) {
     // There can be more than one cancellation batch in flight at any
     // given time, so we can't just pick out a fixed index into
@@ -128,7 +121,7 @@
         static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
     intercept_callback(calld, state, true, "on_complete (cancel_stream)",
                        &batch->on_complete);
-  } else if (batch->on_complete != nullptr) {
+  } else {
     callback_state* state = get_state_for_batch(calld, batch);
     intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
   }
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
index f9ce29f..0ccd08e 100644
--- a/src/core/lib/iomgr/call_combiner.h
+++ b/src/core/lib/iomgr/call_combiner.h
@@ -26,7 +26,6 @@
 #include <grpc/support/atm.h>
 
 #include "src/core/lib/gpr/mpscq.h"
-#include "src/core/lib/gprpp/inlined_vector.h"
 #include "src/core/lib/iomgr/closure.h"
 
 // A simple, lock-free mechanism for serializing activity related to a
@@ -110,83 +109,4 @@
 void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
                                grpc_error* error);
 
-namespace grpc_core {
-
-// Helper for running a list of closures in a call combiner.
-//
-// Each callback running in the call combiner will eventually be
-// returned to the surface, at which point the surface will yield the
-// call combiner.  So when we are running in the call combiner and have
-// more than one callback to return to the surface, we need to re-enter
-// the call combiner for all but one of those callbacks.
-class CallCombinerClosureList {
- public:
-  CallCombinerClosureList() {}
-
-  // Adds a closure to the list.  The closure must eventually result in
-  // the call combiner being yielded.
-  void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
-    closures_.emplace_back(closure, error, reason);
-  }
-
-  // Runs all closures in the call combiner and yields the call combiner.
-  //
-  // All but one of the closures in the list will be scheduled via
-  // GRPC_CALL_COMBINER_START(), and the remaining closure will be
-  // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
-  // yielding the call combiner.  If the list is empty, then the call
-  // combiner will be yielded immediately.
-  void RunClosures(grpc_call_combiner* call_combiner) {
-    for (size_t i = 1; i < closures_.size(); ++i) {
-      auto& closure = closures_[i];
-      GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
-                               closure.reason);
-    }
-    if (closures_.size() > 0) {
-      if (grpc_call_combiner_trace.enabled()) {
-        gpr_log(GPR_INFO,
-                "CallCombinerClosureList executing closure while already "
-                "holding call_combiner %p: closure=%p error=%s reason=%s",
-                call_combiner, closures_[0].closure,
-                grpc_error_string(closures_[0].error), closures_[0].reason);
-      }
-      // This will release the call combiner.
-      GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
-    } else {
-      GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
-    }
-    closures_.clear();
-  }
-
-  // Runs all closures in the call combiner, but does NOT yield the call
-  // combiner.  All closures will be scheduled via GRPC_CALL_COMBINER_START().
-  void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) {
-    for (size_t i = 0; i < closures_.size(); ++i) {
-      auto& closure = closures_[i];
-      GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
-                               closure.reason);
-    }
-    closures_.clear();
-  }
-
-  size_t size() const { return closures_.size(); }
-
- private:
-  struct CallCombinerClosure {
-    grpc_closure* closure;
-    grpc_error* error;
-    const char* reason;
-
-    CallCombinerClosure(grpc_closure* closure, grpc_error* error,
-                        const char* reason)
-        : closure(closure), error(error), reason(reason) {}
-  };
-
-  // There are generally a maximum of 6 closures to run in the call
-  // combiner, one for each pending op.
-  InlinedVector<CallCombinerClosure, 6> closures_;
-};
-
-}  // namespace grpc_core
-
 #endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index f14c723..34a4944 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -283,10 +283,9 @@
     if (c->scheduled) {
       gpr_log(GPR_ERROR,
               "Closure already scheduled. (closure: %p, created: [%s:%d], "
-              "previously scheduled at: [%s: %d], newly scheduled at [%s: %d], "
-              "run?: %s",
+              "previously scheduled at: [%s: %d] run?: %s",
               c, c->file_created, c->line_created, c->file_initiated,
-              c->line_initiated, file, line, c->run ? "true" : "false");
+              c->line_initiated, c->run ? "true" : "false");
       abort();
     }
     c->scheduled = true;
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index d44846c..1cf8ea9 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -233,7 +233,6 @@
   grpc_closure receiving_slice_ready;
   grpc_closure receiving_stream_ready;
   grpc_closure receiving_initial_metadata_ready;
-  grpc_closure receiving_trailing_metadata_ready;
   uint32_t test_only_last_message_flags;
 
   grpc_closure release_call;
@@ -1210,6 +1209,7 @@
 
   if (bctl->op.send_initial_metadata) {
     grpc_metadata_batch_destroy(
+
         &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
   }
   if (bctl->op.send_message) {
@@ -1217,9 +1217,14 @@
   }
   if (bctl->op.send_trailing_metadata) {
     grpc_metadata_batch_destroy(
+
         &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
   }
   if (bctl->op.recv_trailing_metadata) {
+    grpc_metadata_batch* md =
+        &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
+    recv_trailing_filter(call, md);
+
     /* propagate cancellation to any interested children */
     gpr_atm_rel_store(&call->received_final_op_atm, 1);
     parent_call* pc = get_parent_call(call);
@@ -1241,6 +1246,7 @@
       }
       gpr_mu_unlock(&pc->child_list_mu);
     }
+
     if (call->is_client) {
       get_final_status(call, set_status_value_directly,
                        call->final_op.client.status,
@@ -1250,6 +1256,7 @@
       get_final_status(call, set_cancelled_value,
                        call->final_op.server.cancelled, nullptr, nullptr);
     }
+
     GRPC_ERROR_UNREF(error);
     error = GRPC_ERROR_NONE;
   }
@@ -1531,19 +1538,6 @@
   finish_batch_step(bctl);
 }
 
-static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
-  batch_control* bctl = static_cast<batch_control*>(bctlp);
-  grpc_call* call = bctl->call;
-  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
-  add_batch_error(bctl, GRPC_ERROR_REF(error), false);
-  if (error == GRPC_ERROR_NONE) {
-    grpc_metadata_batch* md =
-        &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
-    recv_trailing_filter(call, md);
-  }
-  finish_batch_step(bctl);
-}
-
 static void finish_batch(void* bctlp, grpc_error* error) {
   batch_control* bctl = static_cast<batch_control*>(bctlp);
   grpc_call* call = bctl->call;
@@ -1564,8 +1558,7 @@
   size_t i;
   const grpc_op* op;
   batch_control* bctl;
-  bool has_send_ops = false;
-  int num_recv_ops = 0;
+  int num_completion_callbacks_needed = 1;
   grpc_call_error error = GRPC_CALL_OK;
   grpc_transport_stream_op_batch* stream_op;
   grpc_transport_stream_op_batch_payload* stream_op_payload;
@@ -1671,7 +1664,6 @@
           stream_op_payload->send_initial_metadata.peer_string =
               &call->peer_string;
         }
-        has_send_ops = true;
         break;
       }
       case GRPC_OP_SEND_MESSAGE: {
@@ -1701,7 +1693,6 @@
             &op->data.send_message.send_message->data.raw.slice_buffer, flags);
         stream_op_payload->send_message.send_message.reset(
             call->sending_stream.get());
-        has_send_ops = true;
         break;
       }
       case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
@@ -1722,7 +1713,6 @@
         call->sent_final_op = true;
         stream_op_payload->send_trailing_metadata.send_trailing_metadata =
             &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
-        has_send_ops = true;
         break;
       }
       case GRPC_OP_SEND_STATUS_FROM_SERVER: {
@@ -1787,7 +1777,6 @@
         }
         stream_op_payload->send_trailing_metadata.send_trailing_metadata =
             &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
-        has_send_ops = true;
         break;
       }
       case GRPC_OP_RECV_INITIAL_METADATA: {
@@ -1815,7 +1804,7 @@
           stream_op_payload->recv_initial_metadata.peer_string =
               &call->peer_string;
         }
-        ++num_recv_ops;
+        num_completion_callbacks_needed++;
         break;
       }
       case GRPC_OP_RECV_MESSAGE: {
@@ -1837,7 +1826,7 @@
                           grpc_schedule_on_exec_ctx);
         stream_op_payload->recv_message.recv_message_ready =
             &call->receiving_stream_ready;
-        ++num_recv_ops;
+        num_completion_callbacks_needed++;
         break;
       }
       case GRPC_OP_RECV_STATUS_ON_CLIENT: {
@@ -1863,16 +1852,11 @@
         call->final_op.client.error_string =
             op->data.recv_status_on_client.error_string;
         stream_op->recv_trailing_metadata = true;
+        stream_op->collect_stats = true;
         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
             &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
-        stream_op_payload->recv_trailing_metadata.collect_stats =
+        stream_op_payload->collect_stats.collect_stats =
             &call->final_info.stats.transport_stream_stats;
-        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
-                          receiving_trailing_metadata_ready, bctl,
-                          grpc_schedule_on_exec_ctx);
-        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
-            &call->receiving_trailing_metadata_ready;
-        ++num_recv_ops;
         break;
       }
       case GRPC_OP_RECV_CLOSE_ON_SERVER: {
@@ -1893,16 +1877,11 @@
         call->final_op.server.cancelled =
             op->data.recv_close_on_server.cancelled;
         stream_op->recv_trailing_metadata = true;
+        stream_op->collect_stats = true;
         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
             &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
-        stream_op_payload->recv_trailing_metadata.collect_stats =
+        stream_op_payload->collect_stats.collect_stats =
             &call->final_info.stats.transport_stream_stats;
-        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
-                          receiving_trailing_metadata_ready, bctl,
-                          grpc_schedule_on_exec_ctx);
-        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
-            &call->receiving_trailing_metadata_ready;
-        ++num_recv_ops;
         break;
       }
     }
@@ -1912,15 +1891,13 @@
   if (!is_notify_tag_closure) {
     GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
   }
-  gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops);
+  gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
 
-  if (has_send_ops) {
-    GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
-                      grpc_schedule_on_exec_ctx);
-    stream_op->on_complete = &bctl->finish_batch;
-  }
-
+  GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
+                    grpc_schedule_on_exec_ctx);
+  stream_op->on_complete = &bctl->finish_batch;
   gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
+
   execute_batch(call, stream_op, &bctl->start_batch);
 
 done:
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc
index cbdb77c..039d603 100644
--- a/src/core/lib/transport/transport.cc
+++ b/src/core/lib/transport/transport.cc
@@ -212,32 +212,21 @@
   if (batch->send_message) {
     batch->payload->send_message.send_message.reset();
   }
-  if (batch->cancel_stream) {
-    GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
+  if (batch->recv_message) {
+    GRPC_CALL_COMBINER_START(
+        call_combiner, batch->payload->recv_message.recv_message_ready,
+        GRPC_ERROR_REF(error), "failing recv_message_ready");
   }
-  // Construct a list of closures to execute.
-  grpc_core::CallCombinerClosureList closures;
   if (batch->recv_initial_metadata) {
-    closures.Add(
+    GRPC_CALL_COMBINER_START(
+        call_combiner,
         batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
         GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
   }
-  if (batch->recv_message) {
-    closures.Add(batch->payload->recv_message.recv_message_ready,
-                 GRPC_ERROR_REF(error), "failing recv_message_ready");
+  GRPC_CLOSURE_SCHED(batch->on_complete, error);
+  if (batch->cancel_stream) {
+    GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
   }
-  if (batch->recv_trailing_metadata) {
-    closures.Add(
-        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
-        GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
-  }
-  if (batch->on_complete != nullptr) {
-    closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
-                 "failing on_complete");
-  }
-  // Execute closures.
-  closures.RunClosures(call_combiner);
-  GRPC_ERROR_UNREF(error);
 }
 
 typedef struct {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 585b9df..b2e252d 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -122,15 +122,9 @@
 /* Transport stream op: a set of operations to perform on a transport
    against a single stream */
 typedef struct grpc_transport_stream_op_batch {
-  /** Should be scheduled when all of the non-recv operations in the batch
-      are complete.
-
-      The recv ops (recv_initial_metadata, recv_message, and
-      recv_trailing_metadata) each have their own callbacks.  If a batch
-      contains both recv ops and non-recv ops, on_complete should be
-      scheduled as soon as the non-recv ops are complete, regardless of
-      whether or not the recv ops are complete.  If a batch contains
-      only recv ops, on_complete can be null. */
+  /** Should be enqueued when all requested operations in a given batch have
+      been completed (excluding recv_message and recv_initial_metadata, which
+      have their own closures). */
   grpc_closure* on_complete;
 
   /** Values for the stream op (fields set are determined by flags above) */
@@ -155,6 +149,9 @@
    */
   bool recv_trailing_metadata : 1;
 
+  /** Collect stats into the provided buffer and zero internal stat counters */
+  bool collect_stats : 1;
+
   /** Cancel this stream with the provided error */
   bool cancel_stream : 1;
 
@@ -222,11 +219,12 @@
 
   struct {
     grpc_metadata_batch* recv_trailing_metadata;
-    grpc_transport_stream_stats* collect_stats;
-    /** Should be enqueued when initial metadata is ready to be processed. */
-    grpc_closure* recv_trailing_metadata_ready;
   } recv_trailing_metadata;
 
+  struct {
+    grpc_transport_stream_stats* collect_stats;
+  } collect_stats;
+
   /** Forcefully close this stream.
       The HTTP2 semantics should be:
       - server side: if cancel_error has GRPC_ERROR_INT_GRPC_STATUS, and
diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc
index 8c7db64..25ab492 100644
--- a/src/core/lib/transport/transport_op_string.cc
+++ b/src/core/lib/transport/transport_op_string.cc
@@ -120,6 +120,13 @@
     gpr_strvec_add(&b, tmp);
   }
 
+  if (op->collect_stats) {
+    gpr_strvec_add(&b, gpr_strdup(" "));
+    gpr_asprintf(&tmp, "COLLECT_STATS:%p",
+                 op->payload->collect_stats.collect_stats);
+    gpr_strvec_add(&b, tmp);
+  }
+
   out = gpr_strvec_flatten(&b, nullptr);
   gpr_strvec_destroy(&b);
 
diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc
index dd1610d..831b29c 100644
--- a/test/cpp/microbenchmarks/bm_call_create.cc
+++ b/test/cpp/microbenchmarks/bm_call_create.cc
@@ -621,26 +621,18 @@
 static void StartTransportStreamOp(grpc_call_element* elem,
                                    grpc_transport_stream_op_batch* op) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
-  // Construct list of closures to return.
-  grpc_core::CallCombinerClosureList closures;
   if (op->recv_initial_metadata) {
-    closures.Add(op->payload->recv_initial_metadata.recv_initial_metadata_ready,
-                 GRPC_ERROR_NONE, "recv_initial_metadata");
+    GRPC_CALL_COMBINER_START(
+        calld->call_combiner,
+        op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+        GRPC_ERROR_NONE, "recv_initial_metadata");
   }
   if (op->recv_message) {
-    closures.Add(op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE,
-                 "recv_message");
+    GRPC_CALL_COMBINER_START(calld->call_combiner,
+                             op->payload->recv_message.recv_message_ready,
+                             GRPC_ERROR_NONE, "recv_message");
   }
-  if (op->recv_trailing_metadata) {
-    closures.Add(
-        op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
-        GRPC_ERROR_NONE, "recv_trailing_metadata");
-  }
-  if (op->on_complete != nullptr) {
-    closures.Add(op->on_complete, GRPC_ERROR_NONE, "on_complete");
-  }
-  // Execute closures.
-  closures.RunClosures(calld->call_combiner);
+  GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_NONE);
 }
 
 static void StartTransportOp(grpc_channel_element* elem,