[virtualization] cleanup *_operation_test

In both client_operation_test and server_operation_test:
- Fix numerous leaks in test teardown.
- Improve error messages throughout.
- Replace sleep with async loop wait.
- Remove unnecessary auxiliary loop thread.
- Add stronger assertions on all GRPC completion queue operations.
- Update tests to run on qemu environments since they don't require
  virtualization.

Fixed: 59639, 48556, 48557, 67086
Bug: 47487
Change-Id: Idff30a0182a3c9c6ecc65fabde35efe0b26b3706
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/473874
Commit-Queue: Bruno Dal Bo <brunodalbo@google.com>
Reviewed-by: Abdulla Kamar <abdulla@google.com>
Reviewed-by: Tamir Duberstein <tamird@google.com>
diff --git a/src/virtualization/lib/guest_interaction/BUILD.gn b/src/virtualization/lib/guest_interaction/BUILD.gn
index 37eb4b1..d0205a9 100644
--- a/src/virtualization/lib/guest_interaction/BUILD.gn
+++ b/src/virtualization/lib/guest_interaction/BUILD.gn
@@ -126,10 +126,14 @@
     "//sdk/lib/sys/cpp",
     "//src/lib/fxl",
     "//src/lib/fxl/test:gtest_main",
+    "//src/lib/testing/predicates",
     "//src/virtualization/lib/grpc:grpc",
     "//third_party/grpc:grpc++",
     "//zircon/system/ulib/async-loop:async-loop-cpp",
   ]
+
+  # TODO(fxbug.dev/68325): Fix UB within GRPC and re-enable ubsan.
+  exclude_toolchain_tags = [ "ubsan" ]
 }
 
 executable("server_operation_test") {
@@ -145,6 +149,9 @@
     "//third_party/grpc:grpc++",
     "//zircon/system/ulib/async-loop:async-loop-cpp",
   ]
+
+  # TODO(fxbug.dev/68325): Fix UB within GRPC and re-enable ubsan.
+  exclude_toolchain_tags = [ "ubsan" ]
 }
 
 test_package("guest_interaction_tests") {
@@ -165,41 +172,20 @@
   tests = [
     {
       name = "client_operation_test"
-
-      # TODO(fxbug.dev/47487): Switch to [ nuc_env ] once gRPC leaks are fixed.
-      if (nuc_env_fails_on_asan != []) {
-        nuc_scope = nuc_env_fails_on_asan[0]
-
-        environments = [
-          {
-            dimensions = nuc_scope.dimensions
-
-            # TODO(fxbug.dev/59639): This test appears to flake roughly once per month during the
-            # Client_Exec_ImmediateFailure test.
-            tags = [ "flaky" ]
-          },
-        ]
-      } else {
-        # Disable this test for all non-NUC environments.
-        environments = []
-      }
     },
     {
       name = "server_operation_test"
-
-      # TODO(fxbug.dev/48556): Reenable once gRPC leaks are fixed.
-      environments = nuc_env_fails_on_asan
     },
     {
       name = "server_daemon_test"
 
-      # TODO(fxbug.dev/48127): Currently hangs in ASan. Fix and switch to [ nuc_env ].
+      # TODO(fxbug.dev/48129): Currently hangs in ASan. Fix and switch to [ nuc_env ].
       environments = nuc_env_fails_on_asan
     },
     {
       name = "guest_interaction_service_test"
 
-      # TODO(fxbug.dev/48127): Currently hangs in ASan. Fix and switch to [ nuc_env ].
+      # TODO(fxbug.dev/48129): Currently hangs in ASan. Fix and switch to [ nuc_env ].
       environments = nuc_env_fails_on_asan
     },
   ]
diff --git a/src/virtualization/lib/guest_interaction/server/server_operation_state.h b/src/virtualization/lib/guest_interaction/server/server_operation_state.h
index df45612..0826f08 100644
--- a/src/virtualization/lib/guest_interaction/server/server_operation_state.h
+++ b/src/virtualization/lib/guest_interaction/server/server_operation_state.h
@@ -173,6 +173,7 @@
   // or the client has sent a final empty byte
   void TryWrite();
   void SendFinalStatus(OperationStatus status);
+  void Finish();
 
   // gRPC async boilerplate
   GuestInteractionService::AsyncService* service_;
@@ -190,6 +191,10 @@
 
 template <class T>
 void PutCallData<T>::Proceed(bool ok) {
+  if (!ok && status_ != TRANSFER) {
+    Finish();
+    return;
+  }
   switch (status_) {
     case CREATE:
       status_ = INITIATE_TRANSFER;
@@ -209,15 +214,20 @@
       TryWrite();
       return;
     case FINISH:
-      if (fd_ > 0) {
-        platform_interface_.CloseFile(fd_);
-      }
-      delete this;
+      Finish();
       return;
   }
 }
 
 template <class T>
+void PutCallData<T>::Finish() {
+  if (fd_ > 0) {
+    platform_interface_.CloseFile(fd_);
+  }
+  delete this;
+}
+
+template <class T>
 void PutCallData<T>::SendFinalStatus(OperationStatus status) {
   PutResponse put_response;
   put_response.set_status(status);
@@ -255,6 +265,7 @@
   if (platform_interface_.WriteFile(fd_, new_request_.data().c_str(),
                                     new_request_.data().length()) < 0) {
     SendFinalStatus(OperationStatus::SERVER_FILE_WRITE_FAILURE);
+    return;
   }
   reader_.Read(&new_request_, this);
 }
@@ -360,7 +371,7 @@
   std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> stream_;
   ExecRequest exec_request_;
 
-  enum CallStatus { INITIATE_READ, FORK, FINISH_IN_ERROR };
+  enum class CallStatus { INITIATE_READ, FORK, FINISH_IN_ERROR };
   CallStatus status_;
 };
 
@@ -508,7 +519,7 @@
 template <class T>
 ExecCallData<T>::ExecCallData(GuestInteractionService::AsyncService* service,
                               grpc::ServerCompletionQueue* cq)
-    : service_(service), cq_(cq), status_(INITIATE_READ) {
+    : service_(service), cq_(cq), status_(CallStatus::INITIATE_READ) {
   // The ServerContext provides connection metadata to the streaming
   // reader/writer.  No context actually needs to be preserved, but
   // ExecCallData is just the entry point to the exec process.  Once the
@@ -525,92 +536,100 @@
 // first command, fork and hand off control to a dedicated reader/writer.
 template <class T>
 void ExecCallData<T>::Proceed(bool ok) {
-  if (status_ == INITIATE_READ) {
-    new ExecCallData(service_, cq_);
-    stream_->Read(&exec_request_, this);
-    status_ = FORK;
-  } else if (status_ != FORK) {
-    GPR_ASSERT(status_ == FINISH_IN_ERROR);
-    delete this;
-    return;
-  } else {
-    // Generate string forms of the argv and environment variables
-    std::vector<std::string> args = platform_interface_.ParseCommand(exec_request_.argv());
-    std::vector<std::string> env = CreateEnv(exec_request_);
-
-    // Repackage the string representations as C-strings
-    std::vector<char*> exec_args;
-    for (const std::string& arg : args) {
-      exec_args.push_back(const_cast<char*>(arg.c_str()));
-    }
-    exec_args.push_back(nullptr);
-
-    std::vector<char*> exec_env;
-    for (const std::string& env_pair : env) {
-      exec_env.push_back(const_cast<char*>(env_pair.c_str()));
-    }
-    exec_env.push_back(nullptr);
-
-    // Create the arguments to exec from the C-string vectors
-    char** args_ptr;
-    if (args.empty()) {
-      ExecResponse exec_response;
-      exec_response.clear_std_out();
-      exec_response.clear_std_err();
-      exec_response.clear_ret_code();
-      exec_response.set_status(OperationStatus::SERVER_EXEC_COMMAND_PARSE_FAILURE);
-
-      stream_->WriteAndFinish(exec_response, grpc::WriteOptions(), grpc::Status::OK, this);
-      status_ = FINISH_IN_ERROR;
-
-      return;
-    }
-
-    args_ptr = exec_args.data();
-
-    char** env_ptr;
-    if (exec_env.empty()) {
-      env_ptr = nullptr;
-    } else {
-      env_ptr = exec_env.data();
-    }
-
-    // File descriptors to be populated when exec-ing
-    int32_t std_in;
-    int32_t std_out;
-    int32_t std_err;
-
-    // fork and exec
-    int32_t child_pid = platform_interface_.Exec(args_ptr, env_ptr, &std_in, &std_out, &std_err);
-
-    if (child_pid < 0) {
-      ExecResponse exec_response;
-      exec_response.clear_std_out();
-      exec_response.clear_std_err();
-      exec_response.clear_ret_code();
-      exec_response.set_status(OperationStatus::SERVER_EXEC_FORK_FAILURE);
-
-      stream_->WriteAndFinish(exec_response, grpc::WriteOptions(), grpc::Status::OK, this);
-
-      platform_interface_.CloseFile(std_in);
-      platform_interface_.CloseFile(std_out);
-      platform_interface_.CloseFile(std_err);
-      status_ = FINISH_IN_ERROR;
-      return;
-    }
-    // Set read FD's to nonblocking mode.
-    platform_interface_.SetFileNonblocking(std_out);
-    platform_interface_.SetFileNonblocking(std_err);
-
-    // If the client specified any stdin, write that into the stdin FD.
-    platform_interface_.WriteFile(std_in, exec_request_.std_in().c_str(),
-                                  exec_request_.std_in().size());
-
-    new ExecReadCallData<T>(ctx_, stream_, child_pid, std_in);
-    new ExecWriteCallData<T>(ctx_, stream_, child_pid, std_out, std_err);
+  // State machine doesn't expect a not ok command queue event on this tag.
+  // Get rid of resource if that happens.
+  if (!ok) {
     delete this;
     return;
   }
+  switch (status_) {
+    case CallStatus::INITIATE_READ:
+      new ExecCallData(service_, cq_);
+      stream_->Read(&exec_request_, this);
+      status_ = CallStatus::FORK;
+      return;
+    case CallStatus::FINISH_IN_ERROR:
+      delete this;
+      return;
+    case CallStatus::FORK:
+      break;
+  }
+
+  // Generate string forms of the argv and environment variables
+  std::vector<std::string> args = platform_interface_.ParseCommand(exec_request_.argv());
+  std::vector<std::string> env = CreateEnv(exec_request_);
+
+  // Repackage the string representations as C-strings
+  std::vector<char*> exec_args;
+  for (const std::string& arg : args) {
+    exec_args.push_back(const_cast<char*>(arg.c_str()));
+  }
+  exec_args.push_back(nullptr);
+
+  std::vector<char*> exec_env;
+  for (const std::string& env_pair : env) {
+    exec_env.push_back(const_cast<char*>(env_pair.c_str()));
+  }
+  exec_env.push_back(nullptr);
+
+  // Create the arguments to exec from the C-string vectors
+  char** args_ptr;
+  if (args.empty()) {
+    ExecResponse exec_response;
+    exec_response.clear_std_out();
+    exec_response.clear_std_err();
+    exec_response.clear_ret_code();
+    exec_response.set_status(OperationStatus::SERVER_EXEC_COMMAND_PARSE_FAILURE);
+
+    stream_->WriteAndFinish(exec_response, grpc::WriteOptions(), grpc::Status::OK, this);
+    status_ = CallStatus::FINISH_IN_ERROR;
+
+    return;
+  }
+
+  args_ptr = exec_args.data();
+
+  char** env_ptr;
+  if (exec_env.empty()) {
+    env_ptr = nullptr;
+  } else {
+    env_ptr = exec_env.data();
+  }
+
+  // File descriptors to be populated when exec-ing
+  int32_t std_in;
+  int32_t std_out;
+  int32_t std_err;
+
+  // fork and exec
+  int32_t child_pid = platform_interface_.Exec(args_ptr, env_ptr, &std_in, &std_out, &std_err);
+
+  if (child_pid < 0) {
+    ExecResponse exec_response;
+    exec_response.clear_std_out();
+    exec_response.clear_std_err();
+    exec_response.clear_ret_code();
+    exec_response.set_status(OperationStatus::SERVER_EXEC_FORK_FAILURE);
+
+    stream_->WriteAndFinish(exec_response, grpc::WriteOptions(), grpc::Status::OK, this);
+
+    platform_interface_.CloseFile(std_in);
+    platform_interface_.CloseFile(std_out);
+    platform_interface_.CloseFile(std_err);
+    status_ = CallStatus::FINISH_IN_ERROR;
+    return;
+  }
+  // Set read FD's to nonblocking mode.
+  platform_interface_.SetFileNonblocking(std_out);
+  platform_interface_.SetFileNonblocking(std_err);
+
+  // If the client specified any stdin, write that into the stdin FD.
+  platform_interface_.WriteFile(std_in, exec_request_.std_in().c_str(),
+                                exec_request_.std_in().size());
+
+  new ExecReadCallData<T>(ctx_, stream_, child_pid, std_in);
+  new ExecWriteCallData<T>(ctx_, stream_, child_pid, std_out, std_err);
+  delete this;
 }
 
 template <class T>
diff --git a/src/virtualization/lib/guest_interaction/test/client_operation_test.cc b/src/virtualization/lib/guest_interaction/test/client_operation_test.cc
index b348302..f1031de 100644
--- a/src/virtualization/lib/guest_interaction/test/client_operation_test.cc
+++ b/src/virtualization/lib/guest_interaction/test/client_operation_test.cc
@@ -7,6 +7,7 @@
 
 #include <gtest/gtest.h>
 
+#include "src/lib/testing/predicates/status.h"
 #include "src/virtualization/lib/guest_interaction/client/client_operation_state.h"
 #include "src/virtualization/lib/guest_interaction/test/operation_test_lib.h"
 
@@ -22,18 +23,13 @@
 // 6. Server immediately hangs up on client at start of transfer.
 
 TEST_F(AsyncEndToEndTest, GetMissingFile) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // Create a service that can accept incoming Get requests.
   GetRequest incoming_request;
   grpc::ServerContext srv_ctx;
   grpc::ServerAsyncWriter<GetResponse> response_writer(&srv_ctx);
 
   service_->RequestGet(&srv_ctx, &incoming_request, &response_writer, server_cq_.get(),
-                       server_cq_.get(), this);
+                       server_cq_.get(), &srv_ctx);
 
   // Create components required to perform a client Get request.
   zx_status_t operation_status = ZX_OK;
@@ -48,48 +44,44 @@
       stub_->AsyncGet(&(client_call_data->ctx_), get_request, client_cq_.get(), client_call_data);
 
   // Wait for the request to go out.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // Server CompletionQueue should get the client request.
-  server_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
 
   GetResponse outgoing_response;
   outgoing_response.clear_data();
   outgoing_response.set_status(OperationStatus::SERVER_MISSING_FILE_FAILURE);
-  response_writer.Write(outgoing_response, nullptr);
+  response_writer.Write(outgoing_response, &outgoing_response);
 
   // Client should get the server's message and then wait for the server to
   // call Finish.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // Server finishes.
-  server_cq_->Next(&tag, &cq_status);
-  response_writer.Finish(grpc::Status::OK, nullptr);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &outgoing_response, true);
+  response_writer.Finish(grpc::Status::OK, &outgoing_response);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &outgoing_response, true);
 
   // Client gets final status from server, runs the callback, and then
   // deletes itself.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // The client sets the operation status in the callback.
-  ASSERT_EQ(operation_status, ZX_ERR_NOT_FOUND);
+  ASSERT_STATUS(operation_status, ZX_ERR_NOT_FOUND);
 }
 
 TEST_F(AsyncEndToEndTest, SmallFile) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // Create a service that can accept incoming Get requests.
   GetRequest incoming_request;
   grpc::ServerContext srv_ctx;
   grpc::ServerAsyncWriter<GetResponse> response_writer(&srv_ctx);
 
   service_->RequestGet(&srv_ctx, &incoming_request, &response_writer, server_cq_.get(),
-                       server_cq_.get(), this);
+                       server_cq_.get(), &srv_ctx);
 
   // Create components required to perform a client Get request.
   zx_status_t operation_status = ZX_ERR_PEER_CLOSED;
@@ -103,8 +95,8 @@
       stub_->AsyncGet(&(client_call_data->ctx_), get_request, client_cq_.get(), client_call_data);
 
   // Wait for the request to go out.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // The mock will notify the client that all writes are successful.
   client_call_data->platform_interface_.SetOpenFileReturn(1);
@@ -112,48 +104,44 @@
 
   // Server CompletionQueue should get the client request.
   // Send back a short message.
-  server_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
 
   GetResponse outgoing_response;
   outgoing_response.set_data("Small file contents");
   outgoing_response.set_status(OperationStatus::OK);
-  response_writer.Write(outgoing_response, nullptr);
+  response_writer.Write(outgoing_response, &response_writer);
 
   // Client should get the server's message and then wait for the server to
   // send more data.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // Server finishes, indicating that there is no more data.
-  server_cq_->Next(&tag, &cq_status);
-  response_writer.Finish(grpc::Status::OK, nullptr);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
+  response_writer.Finish(grpc::Status::OK, &response_writer);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
 
   // Client should get a bad status from the queue and then wait for the query
   // of the finish status.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, false);
+  client_call_data->Proceed(false);
 
   // Client gets final status, runs the callback, and then deletes itself.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // The client sets the operation status in the callback.
-  ASSERT_EQ(operation_status, ZX_OK);
+  ASSERT_OK(operation_status);
 }
 
 TEST_F(AsyncEndToEndTest, LargeFile) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // Create a service that can accept incoming Get requests.
   GetRequest incoming_request;
   grpc::ServerContext srv_ctx;
   grpc::ServerAsyncWriter<GetResponse> response_writer(&srv_ctx);
 
   service_->RequestGet(&srv_ctx, &incoming_request, &response_writer, server_cq_.get(),
-                       server_cq_.get(), this);
+                       server_cq_.get(), &response_writer);
 
   // Create components required to perform a client Get request.
   zx_status_t operation_status = ZX_ERR_PEER_CLOSED;
@@ -167,8 +155,8 @@
       stub_->AsyncGet(&(client_call_data->ctx_), get_request, client_cq_.get(), client_call_data);
 
   // Wait for the request to go out.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // The mock will notify the client that all writes are successful.
   client_call_data->platform_interface_.SetOpenFileReturn(1);
@@ -176,61 +164,57 @@
 
   // Server CompletionQueue should get the client request.
   // Send back a short message.
-  server_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
 
   GetResponse outgoing_response;
   outgoing_response.set_data("large file contents");
   outgoing_response.set_status(OperationStatus::OK);
-  response_writer.Write(outgoing_response, nullptr);
+  response_writer.Write(outgoing_response, &response_writer);
 
   // Client should get the server's message and then wait for the server to
   // send more data.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // Server CompletionQueue should get the client request.
   // Send back a short message.
-  server_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
 
   outgoing_response.set_data("large file contents");
   outgoing_response.set_status(OperationStatus::OK);
-  response_writer.Write(outgoing_response, nullptr);
+  response_writer.Write(outgoing_response, &response_writer);
 
   // Client should get the server's message and then wait for the server to
   // send more data.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // Server finishes, indicating that there is no more data.
-  server_cq_->Next(&tag, &cq_status);
-  response_writer.Finish(grpc::Status::OK, nullptr);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
+  response_writer.Finish(grpc::Status::OK, &response_writer);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
 
   // Client should get a bad status from the queue and then wait for the query
   // the finish status.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, false);
+  client_call_data->Proceed(false);
 
   // Client gets final status, runs the callback, and then deletes itself.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // The client sets the operation status in the callback.
-  ASSERT_EQ(operation_status, ZX_OK);
+  ASSERT_OK(operation_status);
 }
 
 TEST_F(AsyncEndToEndTest, BrokenWrite) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // Create a service that can accept incoming Get requests.
   GetRequest incoming_request;
   grpc::ServerContext srv_ctx;
   grpc::ServerAsyncWriter<GetResponse> response_writer(&srv_ctx);
 
   service_->RequestGet(&srv_ctx, &incoming_request, &response_writer, server_cq_.get(),
-                       server_cq_.get(), this);
+                       server_cq_.get(), &response_writer);
 
   // Create components required to perform a client Get request.
   zx_status_t operation_status = ZX_ERR_PEER_CLOSED;
@@ -244,8 +228,8 @@
       stub_->AsyncGet(&(client_call_data->ctx_), get_request, client_cq_.get(), client_call_data);
 
   // Wait for the request to go out.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // The mock will notify the client that write has failed.
   client_call_data->platform_interface_.SetOpenFileReturn(1);
@@ -254,42 +238,38 @@
 
   // Server CompletionQueue should get the client request.
   // Send back a short message.
-  server_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
 
   GetResponse outgoing_response;
   outgoing_response.set_data("Small file contents");
   outgoing_response.set_status(OperationStatus::OK);
-  response_writer.Write(outgoing_response, nullptr);
+  response_writer.Write(outgoing_response, &response_writer);
 
   // Client should get the server's message, fail to write, and then finish.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // Server finishes, indicating that there is no more data.
-  server_cq_->Next(&tag, &cq_status);
-  response_writer.Finish(grpc::Status::OK, nullptr);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
+  response_writer.Finish(grpc::Status::OK, &response_writer);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
 
   // Client finishes and deletes itself.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // The client sets the operation status in the callback.
-  ASSERT_EQ(operation_status, ZX_ERR_IO);
+  ASSERT_STATUS(operation_status, ZX_ERR_IO);
 }
 
 TEST_F(AsyncEndToEndTest, GrpcFailure) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // Create a service that can accept incoming Get requests.
   GetRequest incoming_request;
   grpc::ServerContext srv_ctx;
   grpc::ServerAsyncWriter<GetResponse> response_writer(&srv_ctx);
 
   service_->RequestGet(&srv_ctx, &incoming_request, &response_writer, server_cq_.get(),
-                       server_cq_.get(), this);
+                       server_cq_.get(), &response_writer);
 
   // Create components required to perform a client Get request.
   zx_status_t operation_status = ZX_OK;
@@ -304,19 +284,20 @@
 
   // Wait for the request to go out and then tell the client that it was
   // unsuccessful.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
   client_call_data->Proceed(false);
 
   // Server finishes, indicating that there is no more data.
-  server_cq_->Next(&tag, &cq_status);
-  response_writer.Finish(grpc::Status::OK, nullptr);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
+  response_writer.Finish(grpc::Status::OK, &response_writer);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
 
   // Client finishes and deletes itself.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // The client sets the operation status in the callback.
-  ASSERT_EQ(operation_status, ZX_ERR_PEER_CLOSED);
+  ASSERT_STATUS(operation_status, ZX_ERR_PEER_CLOSED);
 }
 
 // Client Put State Machine Test Cases
@@ -327,17 +308,12 @@
 // 4. gRPC fails while the client is transferring the file.
 
 TEST_F(AsyncEndToEndTest, PutReadFails) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // Create a service that can accept incoming Get requests.
   zx_status_t operation_status = ZX_OK;
   grpc::ServerContext srv_ctx;
   grpc::ServerAsyncReader<PutResponse, PutRequest> request_reader(&srv_ctx);
 
-  service_->RequestPut(&srv_ctx, &request_reader, server_cq_.get(), server_cq_.get(), this);
+  service_->RequestPut(&srv_ctx, &request_reader, server_cq_.get(), server_cq_.get(), &srv_ctx);
 
   // Create components required to perform a client Put request.
   int32_t fake_fd = 0;
@@ -350,9 +326,9 @@
                       client_call_data);
 
   // Server should get the client request.
-  server_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
   PutRequest put_request;
-  request_reader.Read(&put_request, nullptr);
+  request_reader.Read(&put_request, &request_reader);
 
   // Set the mock up to inform the client that the source file exists but
   // fails to open.
@@ -361,40 +337,35 @@
   client_call_data->platform_interface_.SetReadFileReturn(-1);
 
   // Wait for the request to go out.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // Server CompletionQueue should get the client's finish message.
-  server_cq_->Next(&tag, &cq_status);
-  ASSERT_FALSE(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, false);
 
   PutResponse put_response;
   put_response.set_status(OperationStatus::OK);
-  request_reader.Finish(put_response, grpc::Status::OK, nullptr);
+  request_reader.Finish(put_response, grpc::Status::OK, &request_reader);
 
   // Client should get the server's finish message and delete itself.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, true);
 
   // The client sets the operation status in the callback.
-  ASSERT_EQ(operation_status, ZX_ERR_IO);
+  ASSERT_STATUS(operation_status, ZX_ERR_IO);
 }
 
 TEST_F(AsyncEndToEndTest, PutOneFragment) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // Create a service that can accept incoming Get requests.
   zx_status_t operation_status = ZX_ERR_IO;
   grpc::ServerContext srv_ctx;
   grpc::ServerAsyncReader<PutResponse, PutRequest> request_reader(&srv_ctx);
 
-  service_->RequestPut(&srv_ctx, &request_reader, server_cq_.get(), server_cq_.get(), this);
+  service_->RequestPut(&srv_ctx, &request_reader, server_cq_.get(), server_cq_.get(), &srv_ctx);
 
   // Create components required to perform a client Put request.
   int32_t fake_fd = 0;
@@ -407,9 +378,9 @@
                       client_call_data);
 
   // Server should get the client request.
-  server_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
   PutRequest put_request;
-  request_reader.Read(&put_request, nullptr);
+  request_reader.Read(&put_request, &request_reader);
 
   // Set the mock up to inform the client that the source file exists but
   // fails to open.
@@ -418,50 +389,44 @@
   client_call_data->platform_interface_.SetReadFileContents("test");
 
   // Wait for the request to go out.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // Server CompletionQueue should get the client's message and request another
   // file fragment.
-  server_cq_->Next(&tag, &cq_status);
-  ASSERT_TRUE(cq_status);
-  request_reader.Read(&put_request, nullptr);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, true);
+  request_reader.Read(&put_request, &request_reader);
 
   // Client hits the end of the file and finishes.
   client_call_data->platform_interface_.SetReadFileContents("");
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // Server gets the finish and finishes with the client.
-  server_cq_->Next(&tag, &cq_status);
-  ASSERT_FALSE(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, false);
   PutResponse put_response;
   put_response.set_status(OperationStatus::OK);
-  request_reader.Finish(put_response, grpc::Status::OK, nullptr);
+  request_reader.Finish(put_response, grpc::Status::OK, &request_reader);
 
   // Client should get the server's finish message and delete itself.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, true);
 
   // The client sets the operation status in the callback.
-  ASSERT_EQ(operation_status, ZX_OK);
+  ASSERT_OK(operation_status);
 }
 
 TEST_F(AsyncEndToEndTest, PutMultipleFragments) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // Create a service that can accept incoming Get requests.
   zx_status_t operation_status = ZX_ERR_IO;
   grpc::ServerContext srv_ctx;
   grpc::ServerAsyncReader<PutResponse, PutRequest> request_reader(&srv_ctx);
 
-  service_->RequestPut(&srv_ctx, &request_reader, server_cq_.get(), server_cq_.get(), this);
+  service_->RequestPut(&srv_ctx, &request_reader, server_cq_.get(), server_cq_.get(), &srv_ctx);
 
   // Create components required to perform a client Put request.
   int32_t fake_fd = 0;
@@ -474,9 +439,9 @@
                       client_call_data);
 
   // Server should get the client request.
-  server_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
   PutRequest put_request;
-  request_reader.Read(&put_request, nullptr);
+  request_reader.Read(&put_request, &request_reader);
 
   // Set the mock up to inform the client that the source file exists but
   // fails to open.
@@ -485,60 +450,53 @@
   client_call_data->platform_interface_.SetReadFileContents("test");
 
   // Wait for the request to go out.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // Server CompletionQueue should get the client's message and request another
   // file fragment.
-  server_cq_->Next(&tag, &cq_status);
-  ASSERT_TRUE(cq_status);
-  request_reader.Read(&put_request, nullptr);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, true);
+  request_reader.Read(&put_request, &request_reader);
 
   // Send a second file fragment.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // Server CompletionQueue should get the client's message and request another
   // file fragment.
-  server_cq_->Next(&tag, &cq_status);
-  ASSERT_TRUE(cq_status);
-  request_reader.Read(&put_request, nullptr);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, true);
+  request_reader.Read(&put_request, &request_reader);
 
   // Client hits the end of the file and writes done.
   client_call_data->platform_interface_.SetReadFileContents("");
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // Server gets the finish and finishes with the client.
-  server_cq_->Next(&tag, &cq_status);
-  ASSERT_FALSE(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, false);
   PutResponse put_response;
   put_response.set_status(OperationStatus::OK);
-  request_reader.Finish(put_response, grpc::Status::OK, nullptr);
+  request_reader.Finish(put_response, grpc::Status::OK, &request_reader);
 
   // Client should get the server's finish message and delete itself.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, true);
 
   // The client sets the operation status in the callback.
-  ASSERT_EQ(operation_status, ZX_OK);
+  ASSERT_OK(operation_status);
 }
 
 TEST_F(AsyncEndToEndTest, PutGrpcFailure) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // Create a service that can accept incoming Get requests.
   zx_status_t operation_status = ZX_OK;
   grpc::ServerContext srv_ctx;
   grpc::ServerAsyncReader<PutResponse, PutRequest> request_reader(&srv_ctx);
 
-  service_->RequestPut(&srv_ctx, &request_reader, server_cq_.get(), server_cq_.get(), this);
+  service_->RequestPut(&srv_ctx, &request_reader, server_cq_.get(), server_cq_.get(), &srv_ctx);
 
   // Create components required to perform a client Put request.
   int32_t fake_fd = 0;
@@ -551,9 +509,9 @@
                       client_call_data);
 
   // Server should get the client request.
-  server_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
   PutRequest put_request;
-  request_reader.Read(&put_request, nullptr);
+  request_reader.Read(&put_request, &request_reader);
 
   // Set the mock up to inform the client that the source file exists but
   // fails to open.
@@ -562,19 +520,21 @@
   client_call_data->platform_interface_.SetReadFileContents("test");
 
   // Wait for the request to go out.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // Server CompletionQueue should get the client's message.
-  server_cq_->Next(&tag, &cq_status);
-  request_reader.Read(&put_request, nullptr);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, true);
+  request_reader.Read(&put_request, &request_reader);
 
   // Inject a gRPC failure into the client procedure.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
   client_call_data->Proceed(false);
 
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, false);
+
   // The client sets the operation status in the callback.
-  ASSERT_EQ(operation_status, ZX_ERR_PEER_CLOSED);
+  ASSERT_STATUS(operation_status, ZX_ERR_PEER_CLOSED);
 }
 
 // Client Exec State Machine Tests
@@ -585,25 +545,24 @@
 // 3. Server sends stdout/stderr and then terminates the transfer.
 
 TEST_F(AsyncEndToEndTest, Client_Exec_ImmediateFailure) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // Create a service that can accept incoming Exec requests.
-  zx_status_t operation_status = ZX_OK;
-  zx_status_t termination_status = ZX_OK;
+  bool operation_status_done = false;
+  bool termination_status_done = false;
   grpc::ServerContext srv_ctx;
   grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest> rw(&srv_ctx);
 
-  service_->RequestExec(&srv_ctx, &rw, server_cq_.get(), server_cq_.get(), this);
+  service_->RequestExec(&srv_ctx, &rw, server_cq_.get(), server_cq_.get(), &srv_ctx);
 
   fuchsia::netemul::guest::CommandListenerPtr listener;
-  listener.events().OnStarted = [&operation_status](zx_status_t status) {
-    operation_status = status;
+  listener.events().OnStarted = [&operation_status_done](zx_status_t status) {
+    operation_status_done = true;
+    EXPECT_STATUS(status, ZX_ERR_INTERNAL);
+    operation_status_done = status;
   };
-  listener.events().OnTerminated = [&termination_status](zx_status_t status, int32_t exit_code) {
-    termination_status = status;
+  listener.events().OnTerminated = [&termination_status_done](zx_status_t status,
+                                                              int32_t exit_code) {
+    EXPECT_STATUS(status, ZX_ERR_PEER_CLOSED);
+    termination_status_done = true;
   };
   std::unique_ptr<ListenerInterface> listener_interface =
       std::make_unique<ListenerInterface>(listener.NewRequest());
@@ -617,72 +576,69 @@
       stub_->AsyncExec(client_call_data->ctx_.get(), client_cq_.get(), client_call_data);
 
   // Server should get the new stub request.
-  server_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
 
   // Inject a failure into the client.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
   client_call_data->Proceed(false);
 
   // The client sets the operation status in the callback.
-  WaitForCallback(&operation_status, ZX_ERR_INTERNAL);
-  WaitForCallback(&termination_status, ZX_ERR_PEER_CLOSED);
+  RunLoopUntil([&operation_status_done, &termination_status_done]() {
+    return operation_status_done && termination_status_done;
+  });
 }
 
 TEST_F(AsyncEndToEndTest, Client_ExecWrite_Test) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // Create a service that can accept incoming Exec requests.
   grpc::ServerContext srv_ctx;
   std::shared_ptr<grpc::ClientContext> cli_ctx = std::make_shared<grpc::ClientContext>();
   grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest> srv_rw(&srv_ctx);
   std::shared_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
 
-  service_->RequestExec(&srv_ctx, &srv_rw, server_cq_.get(), server_cq_.get(), this);
+  service_->RequestExec(&srv_ctx, &srv_rw, server_cq_.get(), server_cq_.get(), &srv_ctx);
 
   // Create components required to perform a client Exec request.
   std::string empty_argv = "echo hello";
   std::vector<ExecEnv> empty_env;
   ExecWriteCallData<FakePlatform>* client_call_data;
-  cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), &client_call_data);
+  cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
 
   // Clear the initial event that is generated by the stub creation.  This
   // would normally be handled by the top-level ExecCallData.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
 
   client_call_data = new ExecWriteCallData<FakePlatform>(empty_argv, empty_env, 0, cli_ctx, cli_rw);
 
   // Server should get the new stub request and begin reading.
   ExecRequest exec_request;
-  server_cq_->Next(&tag, &cq_status);
-  srv_rw.Read(&exec_request, nullptr);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
+  srv_rw.Read(&exec_request, &exec_request);
 
   // Client should read successfully from stdin and send a message to the
   // server.
   client_call_data->platform_interface_.SetReadFileContents("test");
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // Server should continue reading.
-  server_cq_->Next(&tag, &cq_status);
-  srv_rw.Read(&exec_request, nullptr);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &exec_request, true);
+  srv_rw.Read(&exec_request, &exec_request);
 
   // Client should hit end of file on stdin.
   client_call_data->platform_interface_.SetReadFileContents("");
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // Server should finish.
-  server_cq_->Next(&tag, &cq_status);
-  srv_rw.Finish(grpc::Status::OK, nullptr);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &exec_request, true);
+  srv_rw.Finish(grpc::Status::OK, &exec_request);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &exec_request, true);
 
   // Client should get the finish message and delete itself.
   int32_t initial_use_count = cli_rw.use_count();
 
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   int32_t final_use_count = cli_rw.use_count();
 
@@ -693,19 +649,16 @@
 }
 
 TEST_F(AsyncEndToEndTest, Client_ExecRead_Test) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
+  constexpr int32_t kReturnCode = 1234;
 
   // Create a service that can accept incoming Exec requests.
-  zx_status_t operation_status = ZX_ERR_PEER_CLOSED;
-  int32_t return_code;
+  bool operation_status_done = false;
   fuchsia::netemul::guest::CommandListenerPtr listener;
-  listener.events().OnTerminated = [&operation_status, &return_code](zx_status_t status,
-                                                                     int32_t ret_code) {
-    operation_status = status;
-    return_code = ret_code;
+  listener.events().OnTerminated = [&operation_status_done, kReturnCode](zx_status_t status,
+                                                                         int32_t ret_code) {
+    operation_status_done = true;
+    EXPECT_OK(status);
+    EXPECT_EQ(ret_code, kReturnCode);
   };
   std::unique_ptr<ListenerInterface> listener_interface =
       std::make_unique<ListenerInterface>(listener.NewRequest());
@@ -715,44 +668,41 @@
   grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest> srv_rw(&srv_ctx);
   std::shared_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
 
-  service_->RequestExec(&srv_ctx, &srv_rw, server_cq_.get(), server_cq_.get(), this);
+  service_->RequestExec(&srv_ctx, &srv_rw, server_cq_.get(), server_cq_.get(), &srv_ctx);
 
   // Create components required to perform a client Exec request.
   ExecReadCallData<FakePlatform>* client_call_data;
-  cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), &client_call_data);
+  cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
 
   // Clear the inital event that is generated by the stub creation.  This would
   // normally be handled by the top-level ExecCallData.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
 
   client_call_data =
       new ExecReadCallData<FakePlatform>(0, 0, cli_ctx, cli_rw, std::move(listener_interface));
 
   // Server should get the new stub request and immediately finish.
-  server_cq_->Next(&tag, &cq_status);
-
-  int32_t ret_code = 1234;
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
 
   ExecResponse exec_response;
   exec_response.clear_std_out();
   exec_response.clear_std_err();
-  exec_response.set_ret_code(ret_code);
-  srv_rw.WriteAndFinish(exec_response, grpc::WriteOptions(), grpc::Status::OK, nullptr);
+  exec_response.set_ret_code(kReturnCode);
+  srv_rw.WriteAndFinish(exec_response, grpc::WriteOptions(), grpc::Status::OK, &exec_response);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, &exec_response, true);
 
   // Client should get the message.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // Client should get the finish message.
-  client_cq_->Next(&tag, &cq_status);
-  ASSERT_FALSE(cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, false);
+  client_call_data->Proceed(false);
 
   // Client should run the callback and clean up.
-  client_cq_->Next(&tag, &cq_status);
-  client_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+  client_call_data->Proceed(true);
 
   // The client sets the operation status in the callback.
-  WaitForCallback(&operation_status, ZX_OK);
-  ASSERT_EQ(return_code, ret_code);
+  RunLoopUntil([&operation_status_done]() { return operation_status_done; });
 }
diff --git a/src/virtualization/lib/guest_interaction/test/operation_test_lib.h b/src/virtualization/lib/guest_interaction/test/operation_test_lib.h
index 90f44f9..a214382 100644
--- a/src/virtualization/lib/guest_interaction/test/operation_test_lib.h
+++ b/src/virtualization/lib/guest_interaction/test/operation_test_lib.h
@@ -9,11 +9,15 @@
 #include <fcntl.h>
 #include <lib/async-loop/cpp/loop.h>
 #include <lib/async-loop/default.h>
+#include <lib/fit/function.h>
 #include <lib/syslog/cpp/macros.h>
+#include <lib/zx/clock.h>
 #include <netinet/in.h>
 #include <sys/socket.h>
+#include <zircon/status.h>
 
 #include <iostream>
+#include <memory>
 #include <thread>
 
 #include <grpc/support/log.h>
@@ -22,96 +26,66 @@
 
 #include <grpc++/grpc++.h>
 
-// Adapted from gRPC's async_end2end_test.cc
-class TestScenario {
- public:
-  TestScenario(bool non_block, bool inproc_stub, const grpc::string& creds_type,
-               const grpc::string& content)
-      : disable_blocking(non_block),
-        inproc(inproc_stub),
-        credentials_type(creds_type),
-        message_content(content) {}
-  void Log() const;
-  bool disable_blocking;
-  bool inproc;
-  const grpc::string credentials_type;
-  const grpc::string message_content;
-};
+// Helper macro used in tests to wait for next event on GRPC completion queues
+// and assert on the returned values.
+#define ASSERT_GRPC_CQ_NEXT(cq, expect_tag, expect_ok) \
+  {                                                    \
+    bool ok__;                                         \
+    void* tag__;                                       \
+    ASSERT_TRUE((cq)->Next(&tag__, &ok__));            \
+    ASSERT_EQ(tag__, static_cast<void*>(expect_tag));  \
+    ASSERT_EQ(ok__, expect_ok);                        \
+  }
 
-static std::ostream& operator<<(std::ostream& out, const TestScenario& scenario) {
-  return out << "TestScenario{disable_blocking=" << (scenario.disable_blocking ? "true" : "false")
-             << ", inproc=" << (scenario.inproc ? "true" : "false") << ", credentials='"
-             << scenario.credentials_type << "', message_size=" << scenario.message_content.size()
-             << "}";
-}
-
-void TestScenario::Log() const {
-  std::ostringstream out;
-  out << *this;
-  gpr_log(GPR_DEBUG, "%s", out.str().c_str());
-}
-
-class AsyncEndToEndTest : public testing::TestWithParam<TestScenario> {
+class AsyncEndToEndTest : public testing::Test {
  protected:
-  AsyncEndToEndTest() {}
-
-  void SetUp() override {
-    client_cq_ = std::make_unique<grpc::CompletionQueue>();
-    ASSERT_EQ(loop_.ResetQuit(), ZX_OK);
-    ASSERT_EQ(loop_.StartThread(), ZX_OK);
-
+  AsyncEndToEndTest()
+      : client_cq_(std::make_unique<grpc::CompletionQueue>()),
+        service_(std::make_unique<GuestInteractionService::AsyncService>()) {
     // Setup server
-    BuildAndStartServer();
-  }
-
-  void TearDown() override {
-    server_->Shutdown();
-
-    // The server shutdown calls shutdown on the server's CompletionQueue.  The
-    // client's CompletionQueue needs to be cleaned up manually.
-    void* ignored_tag;
-    bool ignored_ok;
-
-    client_cq_->Shutdown();
-    while (client_cq_->Next(&ignored_tag, &ignored_ok))
-      ;
-
-    stub_.reset();
-    loop_.Quit();
-    loop_.JoinThreads();
-  }
-
-  void BuildAndStartServer() {
     grpc::ServerBuilder builder;
-    service_.reset(new GuestInteractionService::AsyncService());
     builder.RegisterService(service_.get());
     server_cq_ = builder.AddCompletionQueue();
     server_ = builder.BuildAndStart();
-  }
 
-  void ResetStub() {
+    // Setup stub
     grpc::ChannelArguments args;
     std::shared_ptr<grpc::Channel> channel = server_->InProcessChannel(args);
     stub_ = GuestInteractionService::NewStub(channel);
   }
 
-  void WaitForCallback(zx_status_t* returned_status, zx_status_t desired_status) {
-    for (uint32_t i = 0; i < callback_wait_time_; i++) {
-      if (*returned_status == desired_status) {
-        break;
-      }
-      sleep(1);
+  void TearDown() override {
+    server_->Shutdown();
+    void* tag;
+    bool regular_event;
+
+    server_cq_->Shutdown();
+    while (server_cq_->Next(&tag, &regular_event)) {
+      static_cast<CallData*>(tag)->Proceed(false);
+      EXPECT_FALSE(regular_event) << "Unexpected remaining event in server cq";
     }
-    ASSERT_EQ(*returned_status, desired_status);
+
+    client_cq_->Shutdown();
+    while (client_cq_->Next(&tag, &regular_event)) {
+      EXPECT_FALSE(regular_event) << "Unexpected remaining event in client cq";
+    }
   }
 
+  void RunLoopUntil(fit::function<bool()> check) {
+    constexpr zx::duration kLoopStep = zx::msec(10);
+    while (!check()) {
+      zx_status_t status = loop_.Run(zx::deadline_after(kLoopStep), true);
+      ASSERT_TRUE(status == ZX_ERR_TIMED_OUT || status == ZX_OK)
+          << "Failed to run loop " << zx_status_get_string(status);
+    }
+  }
+
+  const std::unique_ptr<grpc::CompletionQueue> client_cq_;
+  const std::unique_ptr<GuestInteractionService::AsyncService> service_;
   std::unique_ptr<grpc::ServerCompletionQueue> server_cq_;
-  std::unique_ptr<grpc::CompletionQueue> client_cq_;
   std::unique_ptr<GuestInteractionService::Stub> stub_;
   std::unique_ptr<grpc::Server> server_;
-  std::unique_ptr<GuestInteractionService::AsyncService> service_;
   async::Loop loop_ = async::Loop(&kAsyncLoopConfigAttachToCurrentThread);
-  uint32_t callback_wait_time_ = 5;  // Number of seconds to wait for Exec fidl responses to run.
 };
 
 #endif  // SRC_VIRTUALIZATION_LIB_GUEST_INTERACTION_TEST_OPERATION_TEST_LIB_H_
diff --git a/src/virtualization/lib/guest_interaction/test/server_operation_test.cc b/src/virtualization/lib/guest_interaction/test/server_operation_test.cc
index 4d6ccac..d32a67d 100644
--- a/src/virtualization/lib/guest_interaction/test/server_operation_test.cc
+++ b/src/virtualization/lib/guest_interaction/test/server_operation_test.cc
@@ -21,11 +21,6 @@
 // 5. The file the server is reading from goes into a bad state.
 
 TEST_F(AsyncEndToEndTest, ServerMissingFile) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // The server's call to check if the requested file exists will return false.
   GetCallData<FakePlatform>* server_call_data =
       new GetCallData<FakePlatform>(service_.get(), server_cq_.get());
@@ -33,47 +28,44 @@
   server_call_data->platform_interface_.SetFileExistsReturn(false);
 
   // Create components required to perform a client Get request.
-  grpc::ClientContext client_ctx_;
+  grpc::ClientContext client_ctx;
   grpc::Status grpc_status;
   OperationStatus op_status;
   GetResponse get_response;
   GetRequest get_request;
   get_request.set_source("/some/bogus/path");
 
-  std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader_ =
-      stub_->AsyncGet(&client_ctx_, get_request, client_cq_.get(), nullptr);
+  std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader =
+      stub_->AsyncGet(&client_ctx, get_request, client_cq_.get(), &client_ctx);
 
   // Wait for the request to go out, and then request to read from the server.
-  client_cq_->Next(&tag, &cq_status);
-  reader_->Read(&get_response, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+  reader->Read(&get_response, reader.get());
 
   // Server CompletionQueue should get the client request and reply that the
   // requested file does not exist.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Client should get the server's message and then wait for the server to
   // call Finish.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
   op_status = get_response.status();
   ASSERT_EQ(op_status, OperationStatus::SERVER_MISSING_FILE_FAILURE);
-  reader_->Finish(&grpc_status, nullptr);
+  reader->Finish(&grpc_status, reader.get());
 
   // Server finishes.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Client gets final status from server.
-  client_cq_->Next(&tag, &cq_status);
-  ASSERT_TRUE(grpc_status.ok());
+  ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
+  ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
 }
 
 TEST_F(AsyncEndToEndTest, ServerFileOpenFailure) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // The server's call to check if the requested file exists will return false.
   GetCallData<FakePlatform>* server_call_data =
       new GetCallData<FakePlatform>(service_.get(), server_cq_.get());
@@ -82,47 +74,44 @@
   server_call_data->platform_interface_.SetOpenFileReturn(-1);
 
   // Create components required to perform a client Get request.
-  grpc::ClientContext client_ctx_;
+  grpc::ClientContext client_ctx;
   grpc::Status grpc_status;
   OperationStatus op_status;
   GetResponse get_response;
   GetRequest get_request;
   get_request.set_source("/file/with/permissions/issues");
 
-  std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader_ =
-      stub_->AsyncGet(&client_ctx_, get_request, client_cq_.get(), nullptr);
+  std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader =
+      stub_->AsyncGet(&client_ctx, get_request, client_cq_.get(), &client_ctx);
 
   // Wait for the request to go out, and then request to read from the server.
-  client_cq_->Next(&tag, &cq_status);
-  reader_->Read(&get_response, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+  reader->Read(&get_response, reader.get());
 
   // Server CompletionQueue should get the client request and reply that the
   // requested file does not exist.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Client should get the server's message and then wait for the server to
   // call Finish.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
   op_status = get_response.status();
   ASSERT_EQ(op_status, OperationStatus::SERVER_FILE_READ_FAILURE);
-  reader_->Finish(&grpc_status, nullptr);
+  reader->Finish(&grpc_status, reader.get());
 
   // Server finishes.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Client gets final status from server.
-  client_cq_->Next(&tag, &cq_status);
-  ASSERT_TRUE(grpc_status.ok());
+  ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
+  ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
 }
 
 TEST_F(AsyncEndToEndTest, ServerUnfragmentedRead) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // The server will be told that the file exists and that it was able to open
   // open it.  It will be notified that the file is not bad, and EOF will be
   // false the first time and then true the second.  The call to read from the
@@ -135,61 +124,57 @@
   server_call_data->platform_interface_.SetReadFileContents("test");
 
   // Create components required to perform a client Get request.
-  grpc::ClientContext client_ctx_;
+  grpc::ClientContext client_ctx;
   grpc::Status grpc_status;
   OperationStatus op_status;
   GetResponse get_response;
   GetRequest get_request;
   get_request.set_source("/some/test/file");
 
-  std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader_ =
-      stub_->AsyncGet(&client_ctx_, get_request, client_cq_.get(), nullptr);
+  std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader =
+      stub_->AsyncGet(&client_ctx, get_request, client_cq_.get(), &client_ctx);
 
   // Wait for the request to go out, and then request to read from the server.
-  client_cq_->Next(&tag, &cq_status);
-  reader_->Read(&get_response, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+  reader->Read(&get_response, reader.get());
 
   // Server CompletionQueue should get the client request, open the file, and
   // send back the first chunk of contents.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Client should get the server's message and then wait for more data.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
   ASSERT_STREQ(get_response.data().c_str(), "test");
   op_status = get_response.status();
-  reader_->Read(&get_response, nullptr);
+  reader->Read(&get_response, reader.get());
 
   // The server should hit EOF and send an empty chunk of data back to the
   // client.
   server_call_data->platform_interface_.SetReadFileContents("");
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
   op_status = get_response.status();
-  reader_->Read(&get_response, nullptr);
+  reader->Read(&get_response, reader.get());
 
   // The server should then call Finish.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // The client read should fail and then it should finish.
-  client_cq_->Next(&tag, &cq_status);
-  ASSERT_FALSE(cq_status);
-  reader_->Finish(&grpc_status, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), false);
+  reader->Finish(&grpc_status, reader.get());
 
   // Client gets final status from server.
-  client_cq_->Next(&tag, &cq_status);
-  ASSERT_TRUE(grpc_status.ok());
+  ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
+  ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
 }
 
 TEST_F(AsyncEndToEndTest, ServerFragmentedRead) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // The server will be told that the file exists and that it was able to open
   // open it.  It will be notified that the file is not bad, and EOF will be
   // false the first time and then true the second.  The call to read from the
@@ -202,70 +187,66 @@
   server_call_data->platform_interface_.SetReadFileContents("test");
 
   // Create components required to perform a client Get request.
-  grpc::ClientContext client_ctx_;
+  grpc::ClientContext client_ctx;
   grpc::Status grpc_status;
   OperationStatus op_status;
   GetResponse get_response;
   GetRequest get_request;
   get_request.set_source("/some/test/file");
 
-  std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader_ =
-      stub_->AsyncGet(&client_ctx_, get_request, client_cq_.get(), nullptr);
+  std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader =
+      stub_->AsyncGet(&client_ctx, get_request, client_cq_.get(), &client_ctx);
 
   // Wait for the request to go out, and then request to read from the server.
-  client_cq_->Next(&tag, &cq_status);
-  reader_->Read(&get_response, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+  reader->Read(&get_response, reader.get());
 
   // Server CompletionQueue should get the client request, open the file, and
   // send back the first chunk of contents.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Client should get the server's message and then wait for more data.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
   ASSERT_EQ(get_response.data(), std::string("test"));
   op_status = get_response.status();
-  reader_->Read(&get_response, nullptr);
+  reader->Read(&get_response, reader.get());
 
   // Repeat the process.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
   ASSERT_EQ(get_response.data(), std::string("test"));
   op_status = get_response.status();
-  reader_->Read(&get_response, nullptr);
+  reader->Read(&get_response, reader.get());
 
   // The server should hit EOF and send an empty chunk of data back to the
   // client.
   server_call_data->platform_interface_.SetReadFileContents("");
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
   op_status = get_response.status();
-  reader_->Read(&get_response, nullptr);
+  reader->Read(&get_response, reader.get());
 
   // The server should then call Finish.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // The client read should fail and then it should finish.
-  client_cq_->Next(&tag, &cq_status);
-  ASSERT_FALSE(cq_status);
-  reader_->Finish(&grpc_status, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), false);
+  reader->Finish(&grpc_status, reader.get());
 
   // Client gets final status from server.
-  client_cq_->Next(&tag, &cq_status);
-  ASSERT_TRUE(grpc_status.ok());
+  ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
+  ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
 }
 
 TEST_F(AsyncEndToEndTest, ServerBadFile) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // The server will be told that the file exists and that it was able to open
   // open it.  When the server goes to read from the file, it will be informed
   // that the file is bad.
@@ -277,44 +258,45 @@
   server_call_data->platform_interface_.SetReadFileReturn(-1);
 
   // Create components required to perform a client Get request.
-  grpc::ClientContext client_ctx_;
+  grpc::ClientContext client_ctx;
   grpc::Status grpc_status;
   OperationStatus op_status;
   GetResponse get_response;
   GetRequest get_request;
   get_request.set_source("/some/test/file");
 
-  std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader_ =
-      stub_->AsyncGet(&client_ctx_, get_request, client_cq_.get(), nullptr);
+  std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader =
+      stub_->AsyncGet(&client_ctx, get_request, client_cq_.get(), &client_ctx);
 
   // Wait for the request to go out and then request to read from the server.
-  client_cq_->Next(&tag, &cq_status);
-  reader_->Read(&get_response, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+  reader->Read(&get_response, reader.get());
 
   // Server CompletionQueue should get the client request, open the file, and
   // send back a response indicating the read failed.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Client should get the server's message and then wait for more data.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
   op_status = get_response.status();
-  reader_->Read(&get_response, nullptr);
+  reader->Read(&get_response, reader.get());
 
   // The server should finish.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // The client will get back a bad status and finish.
-  client_cq_->Next(&tag, &cq_status);
-  ASSERT_FALSE(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), false);
   op_status = get_response.status();
   ASSERT_EQ(op_status, OperationStatus::SERVER_FILE_READ_FAILURE);
-  reader_->Finish(&grpc_status, nullptr);
+  reader->Finish(&grpc_status, reader.get());
 
   // Client gets final status from server.
-  client_cq_->Next(&tag, &cq_status);
-  ASSERT_TRUE(grpc_status.ok());
+  ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
+  ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
 }
 
 // Server Put State Machine Test
@@ -327,11 +309,6 @@
 // 6. Client sends multiple file fragments.
 
 TEST_F(AsyncEndToEndTest, ServerPutDestinationIsDirectory) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // The server's call to check the destination will indicate that it is a
   // directory.
   PutCallData<FakePlatform>* server_call_data =
@@ -340,7 +317,7 @@
   server_call_data->platform_interface_.SetDirectoryExistsReturn(true);
 
   // Create components required to perform a client Put request.
-  grpc::ClientContext client_ctx_;
+  grpc::ClientContext client_ctx;
   grpc::Status grpc_status;
   OperationStatus op_status;
   PutRequest put_request;
@@ -348,42 +325,41 @@
   put_request.set_destination("/some/directory");
   put_request.clear_data();
 
-  std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer_ =
-      stub_->AsyncPut(&client_ctx_, &put_response, client_cq_.get(), nullptr);
+  std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
+      stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
 
   // Server should get initial client connection and begin reading.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Wait for the request to go out, and then try to write more data to the
   // server.
-  client_cq_->Next(&tag, &cq_status);
-  writer_->Write(put_request, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+  writer->Write(put_request, writer.get());
 
   // Server CompletionQueue should get the client request and reply that the
   // requested file cannot be created.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Client will write the entire file even if the server is not listening.
   // Call Finish to collect the final status.
   // Bug: https://github.com/grpc/grpc/issues/14812
-  client_cq_->Next(&tag, &cq_status);
-  writer_->Finish(&grpc_status, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+  writer->Finish(&grpc_status, writer.get());
 
   // Client gets final status from server.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
   op_status = put_response.status();
+
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
+
   ASSERT_EQ(op_status, OperationStatus::SERVER_CREATE_FILE_FAILURE);
-  ASSERT_TRUE(grpc_status.ok());
+  ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
 }
 
 TEST_F(AsyncEndToEndTest, ServerPutCreateDirectoryFailure) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // The server's call to check the destination will indicate that it is a
   // directory.
   PutCallData<FakePlatform>* server_call_data =
@@ -393,7 +369,7 @@
   server_call_data->platform_interface_.SetCreateDirectoryReturn(false);
 
   // Create components required to perform a client Put request.
-  grpc::ClientContext client_ctx_;
+  grpc::ClientContext client_ctx;
   grpc::Status grpc_status;
   OperationStatus op_status;
   PutRequest put_request;
@@ -401,41 +377,40 @@
   put_request.set_destination("/privilege/issues");
   put_request.clear_data();
 
-  std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer_ =
-      stub_->AsyncPut(&client_ctx_, &put_response, client_cq_.get(), nullptr);
+  std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
+      stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
 
   // Server CompletionQueue will notify that there is a new client stub.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Wait for the request to go out, and then try to write more data to the
   // server.
-  client_cq_->Next(&tag, &cq_status);
-  writer_->Write(put_request, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+  writer->Write(put_request, writer.get());
 
   // Server CompletionQueue should get the client request and reply that the
   // requested file cannot be created.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Client should call finish to get final status messages for the transfer
   // and gRPC channel.
-  client_cq_->Next(&tag, &cq_status);
-  writer_->Finish(&grpc_status, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+  writer->Finish(&grpc_status, writer.get());
 
   // Client gets final status from server.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
   op_status = put_response.status();
+
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
+
   ASSERT_EQ(op_status, OperationStatus::SERVER_CREATE_FILE_FAILURE);
-  ASSERT_TRUE(grpc_status.ok());
+  ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
 }
 
 TEST_F(AsyncEndToEndTest, ServerPutCreateFileFailure) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // The server's call to check the destination will indicate that it is a
   // directory.
   PutCallData<FakePlatform>* server_call_data =
@@ -446,7 +421,7 @@
   server_call_data->platform_interface_.SetOpenFileReturn(-1);
 
   // Create components required to perform a client Put request.
-  grpc::ClientContext client_ctx_;
+  grpc::ClientContext client_ctx;
   grpc::Status grpc_status;
   OperationStatus op_status;
   PutRequest put_request;
@@ -454,41 +429,40 @@
   put_request.set_destination("/privilege/issues");
   put_request.clear_data();
 
-  std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer_ =
-      stub_->AsyncPut(&client_ctx_, &put_response, client_cq_.get(), nullptr);
+  std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
+      stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
 
   // Server CompletionQueue should notify that there is a new stub.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Wait for the request to go out, and then try to write more data to the
   // server.
-  client_cq_->Next(&tag, &cq_status);
-  writer_->Write(put_request, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+  writer->Write(put_request, writer.get());
 
   // Server CompletionQueue should get the client request and reply that the
   // requested file cannot be created.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Client should call finish to get final status messages for the transfer
   // and gRPC channel.
-  client_cq_->Next(&tag, &cq_status);
-  writer_->Finish(&grpc_status, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+  writer->Finish(&grpc_status, writer.get());
 
   // Client gets final status from server.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
   op_status = put_response.status();
+
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
+
   ASSERT_EQ(op_status, OperationStatus::SERVER_FILE_WRITE_FAILURE);
-  ASSERT_TRUE(grpc_status.ok());
+  ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
 }
 
 TEST_F(AsyncEndToEndTest, ServerPutWriteFileFailure) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // The server's call to check the destination will indicate that it is a
   // directory.
   PutCallData<FakePlatform>* server_call_data =
@@ -500,7 +474,7 @@
   server_call_data->platform_interface_.SetWriteFileReturn(-1);
 
   // Create components required to perform a client Put request.
-  grpc::ClientContext client_ctx_;
+  grpc::ClientContext client_ctx;
   grpc::Status grpc_status;
   OperationStatus op_status;
   PutRequest put_request;
@@ -508,41 +482,40 @@
   put_request.set_destination("/write/fail/path");
   put_request.clear_data();
 
-  std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer_ =
-      stub_->AsyncPut(&client_ctx_, &put_response, client_cq_.get(), nullptr);
+  std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
+      stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
 
   // Server CompletionQueue should notify that there is a new stub.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Wait for the request to go out, and then try to write more data to the
   // server.
-  client_cq_->Next(&tag, &cq_status);
-  writer_->Write(put_request, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+  writer->Write(put_request, writer.get());
 
   // Server CompletionQueue should get the client request and reply that the
   // requested file cannot be written.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Client should get the server's message and then call finish to get final
   // status messages for the transfer and gRPC channel.
-  client_cq_->Next(&tag, &cq_status);
-  writer_->Finish(&grpc_status, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+  writer->Finish(&grpc_status, writer.get());
 
   // Client gets final status from server.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
   op_status = put_response.status();
+
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
+
   ASSERT_EQ(op_status, OperationStatus::SERVER_FILE_WRITE_FAILURE);
-  ASSERT_TRUE(grpc_status.ok());
+  ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
 }
 
 TEST_F(AsyncEndToEndTest, ServerPutOneFragment) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // The server's call to check the destination will indicate that it is a
   // directory.
   PutCallData<FakePlatform>* server_call_data =
@@ -555,7 +528,7 @@
   server_call_data->platform_interface_.SetCloseFileReturn(0);
 
   // Create components required to perform a client Put request.
-  grpc::ClientContext client_ctx_;
+  grpc::ClientContext client_ctx;
   grpc::Status grpc_status;
   OperationStatus op_status;
   PutRequest put_request;
@@ -563,51 +536,49 @@
   put_request.set_destination("/destination/file");
   put_request.clear_data();
 
-  std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer_ =
-      stub_->AsyncPut(&client_ctx_, &put_response, client_cq_.get(), nullptr);
+  std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
+      stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
 
   // Server CompletionQueue should notify that there is a new stub.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Wait for the request to go out, and then try to write a fragment to the
   // server.
-  client_cq_->Next(&tag, &cq_status);
-  writer_->Write(put_request, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+  writer->Write(put_request, writer.get());
 
   // Server CompletionQueue should get the client request, write out the
   // contents, and wait for more data.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Client should call WritesDone to indicate that the transfer is complete.
-  client_cq_->Next(&tag, &cq_status);
-  writer_->WritesDone(nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+  writer->WritesDone(writer.get());
 
   // Server should get the final message from the client and report status
   // information.
-  server_cq_->Next(&tag, &cq_status);
-  ASSERT_FALSE(cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, false);
+  server_call_data->Proceed(false);
 
   // Client should get the server's message and then call finish to get final
   // status messages for the transfer and gRPC channel.
-  client_cq_->Next(&tag, &cq_status);
-  writer_->Finish(&grpc_status, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+  writer->Finish(&grpc_status, writer.get());
 
   // Client gets final status from server.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
   op_status = put_response.status();
+
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
+
   ASSERT_EQ(op_status, OperationStatus::OK);
-  ASSERT_TRUE(grpc_status.ok());
+  ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
 }
 
 TEST_F(AsyncEndToEndTest, ServerPutMultipleFragments) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // The server's call to check the destination will indicate that it is a
   // directory.
   PutCallData<FakePlatform>* server_call_data =
@@ -620,7 +591,7 @@
   server_call_data->platform_interface_.SetCloseFileReturn(0);
 
   // Create components required to perform a client Put request.
-  grpc::ClientContext client_ctx_;
+  grpc::ClientContext client_ctx;
   grpc::Status grpc_status;
   OperationStatus op_status;
   PutRequest put_request;
@@ -628,51 +599,55 @@
   put_request.set_destination("/destination/file");
   put_request.clear_data();
 
-  std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer_ =
-      stub_->AsyncPut(&client_ctx_, &put_response, client_cq_.get(), nullptr);
+  std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
+      stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
 
   // Server CompletionQueue gets new client stub.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Wait for the request to go out, and then try to write a fragment to the
   // server.
-  client_cq_->Next(&tag, &cq_status);
-  writer_->Write(put_request, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+  writer->Write(put_request, writer.get());
 
   // Server CompletionQueue should get the client request, write out the
   // contents, and wait for more data.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Write a second fragment to the server.
-  client_cq_->Next(&tag, &cq_status);
-  writer_->Write(put_request, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+  writer->Write(put_request, writer.get());
 
   // Server processes the second fragment.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Client should get the server's message and then call WritesDone to
   // indicate that the transfer is complete.
-  client_cq_->Next(&tag, &cq_status);
-  writer_->WritesDone(nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+  writer->WritesDone(writer.get());
 
   // Server should get the final message from the client and report status
   // information.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, false);
+  server_call_data->Proceed(false);
 
   // Client should get the server's message and then call finish to get final
   // status information.
-  client_cq_->Next(&tag, &cq_status);
-  writer_->Finish(&grpc_status, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+  writer->Finish(&grpc_status, writer.get());
 
   // Client gets final status from server.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
   op_status = put_response.status();
+
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
+
   ASSERT_EQ(op_status, OperationStatus::OK);
-  ASSERT_TRUE(grpc_status.ok());
+  ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
 }
 
 // Server Exec State Machine Test Cases
@@ -683,11 +658,6 @@
 // 5. Exec Write sends stdin/stderr to client until subprocess exits.
 
 TEST_F(AsyncEndToEndTest, Server_Exec_ForkFail) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // The server's call to Exec should fail.
   ExecCallData<FakePlatform>* server_call_data =
       new ExecCallData<FakePlatform>(service_.get(), server_cq_.get());
@@ -697,7 +667,7 @@
   std::unique_ptr<grpc::ClientContext> cli_ctx = std::make_unique<grpc::ClientContext>();
   std::unique_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
 
-  cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), nullptr);
+  cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
 
   // Queue up a read to be populated eventually when the server finishes.  Use
   // the address of the exec response so we can enforce that the read operation
@@ -707,63 +677,61 @@
 
   // Server should get the new stub request and issue a read.
   server_call_data->platform_interface_.SetParseCommandReturn({"echo", "hello"});
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Client should be notified that its stub has been created.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
 
   ExecRequest exec_request;
   exec_request.clear_argv();
   exec_request.clear_env_vars();
   exec_request.clear_std_in();
-  cli_rw->Write(exec_request, nullptr);
+  cli_rw->Write(exec_request, &exec_request);
 
   // Server should get the write first
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Flush the write operation completion notification.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
 
   // Ensure that the client queue has a new entry and that it is the response
   // to the read request.
-  client_cq_->Next(&tag, &cq_status);
-  ASSERT_EQ(tag, &exec_response);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_response, true);
 
   // The client calls Finish to get final status.
   grpc::Status grpc_status;
-  cli_rw->Finish(&grpc_status, nullptr);
+  cli_rw->Finish(&grpc_status, cli_rw.get());
 
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, cli_rw.get(), true);
 
-  ASSERT_TRUE(grpc_status.ok());
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
+
+  ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
   ASSERT_EQ(exec_response.status(), OperationStatus::SERVER_EXEC_FORK_FAILURE);
 }
 
 TEST_F(AsyncEndToEndTest, Server_ExecRead_StdinEof) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // Exec service boilerplate.
   std::shared_ptr<grpc::ServerContext> srv_ctx = std::make_shared<grpc::ServerContext>();
   std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> srv_rw =
       std::make_shared<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>(srv_ctx.get());
 
-  service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(), this);
+  service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(),
+                        srv_ctx.get());
 
   // Client creates a new stub.
   std::unique_ptr<grpc::ClientContext> cli_ctx = std::make_unique<grpc::ClientContext>();
   std::unique_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
 
-  cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), nullptr);
+  cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
 
   // The read side of the server exec routine should see that the subprocess
   // is still alive, but should fail to write to its stdin and then delete
   // itself.
-  server_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, srv_ctx.get(), true);
 
   ExecReadCallData<FakePlatform>* server_call_data =
       new ExecReadCallData<FakePlatform>(srv_ctx, srv_rw, 0, 0);
@@ -771,49 +739,45 @@
   server_call_data->platform_interface_.SetWriteFileReturn(-1);
 
   // Client sends something to subprocess stdin.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
 
   ExecRequest exec_request;
   exec_request.clear_argv();
   exec_request.clear_env_vars();
   exec_request.clear_std_in();
 
-  cli_rw->Write(exec_request, nullptr);
-  client_cq_->Next(&tag, &cq_status);
+  cli_rw->Write(exec_request, &exec_request);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
 
   // The server will get the request, attempt to write to the subprocess stdin,
   // fail, and delete itself.  The only indication of the failure will be that
   // the reference count to the ServerAsyncReaderWriter decrements.
   uint32_t initial_use_count = srv_rw.use_count();
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
   uint32_t final_use_count = srv_rw.use_count();
 
   ASSERT_LT(final_use_count, initial_use_count);
 }
 
 TEST_F(AsyncEndToEndTest, Server_ExecRead_ClientDone) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // Exec service boilerplate.
   std::shared_ptr<grpc::ServerContext> srv_ctx = std::make_shared<grpc::ServerContext>();
   std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> srv_rw =
       std::make_shared<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>(srv_ctx.get());
 
-  service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(), this);
+  service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(),
+                        srv_ctx.get());
 
   // Client creates a new stub.
   std::unique_ptr<grpc::ClientContext> cli_ctx = std::make_unique<grpc::ClientContext>();
   std::unique_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
 
-  cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), nullptr);
+  cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
 
   // The read side of the server exec routine should see that the subprocess
   // is still alive and succeed in writing to it.
-  server_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, srv_ctx.get(), true);
 
   ExecReadCallData<FakePlatform>* server_call_data =
       new ExecReadCallData<FakePlatform>(srv_ctx, srv_rw, 0, 0);
@@ -821,31 +785,32 @@
   server_call_data->platform_interface_.SetWriteFileReturn(1);
 
   // Client sends something to subprocess stdin.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
 
   ExecRequest exec_request;
   exec_request.clear_argv();
   exec_request.clear_env_vars();
   exec_request.clear_std_in();
 
-  cli_rw->Write(exec_request, nullptr);
+  cli_rw->Write(exec_request, &exec_request);
 
   // Server writes it into the subprocess.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // Client indicates that it is done writing.
-  client_cq_->Next(&tag, &cq_status);
-  cli_rw->WritesDone(nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
+  cli_rw->WritesDone(&exec_request);
 
   // The server will get a false status from the completion queue and delete
   // itself.  The only indication of the failure will be that the reference
   // count to the ServerAsyncReaderWriter decrements.
   uint32_t initial_use_count = srv_rw.use_count();
 
-  server_cq_->Next(&tag, &cq_status);
-  ASSERT_FALSE(cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, false);
+  server_call_data->Proceed(false);
+
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
 
   uint32_t final_use_count = srv_rw.use_count();
 
@@ -853,117 +818,108 @@
 }
 
 TEST_F(AsyncEndToEndTest, Server_ExecRead_SubprocessExits) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // Exec service boilerplate.
   std::shared_ptr<grpc::ServerContext> srv_ctx = std::make_shared<grpc::ServerContext>();
   std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> srv_rw =
       std::make_shared<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>(srv_ctx.get());
 
-  service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(), this);
+  service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(),
+                        srv_ctx.get());
 
   // Client creates a new stub.
   std::unique_ptr<grpc::ClientContext> cli_ctx = std::make_unique<grpc::ClientContext>();
   std::unique_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
 
-  cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), nullptr);
+  cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
 
   // The read side of the server exec routine should see that the subprocess
   // has exited.
-  server_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, srv_ctx.get(), true);
 
   ExecReadCallData<FakePlatform>* server_call_data =
       new ExecReadCallData<FakePlatform>(srv_ctx, srv_rw, 0, 0);
   server_call_data->platform_interface_.SetKillPidReturn(-1);
 
   // Client sends something to subprocess stdin.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
 
   ExecRequest exec_request;
   exec_request.clear_argv();
   exec_request.clear_env_vars();
   exec_request.clear_std_in();
 
-  cli_rw->Write(exec_request, nullptr);
+  cli_rw->Write(exec_request, &exec_request);
 
   // The server will get the request, realize the subprocess has exited, and
   // delete itself.  The only indication of the failure will be that the
   // reference count to the ServerAsyncReaderWriter decrements.
   uint32_t initial_use_count = srv_rw.use_count();
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
   uint32_t final_use_count = srv_rw.use_count();
 
   ASSERT_LT(final_use_count, initial_use_count);
 
   // Issue a finish on behalf of the server.
-  srv_rw->Finish(grpc::Status::OK, nullptr);
-  server_cq_->Next(&tag, &cq_status);
+  srv_rw->Finish(grpc::Status::OK, srv_rw.get());
+  ASSERT_GRPC_CQ_NEXT(server_cq_, srv_rw.get(), true);
 
   // Cleanup the client reader-writer.
-  client_cq_->Next(&tag, &cq_status);
-  cli_rw->WritesDone(nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
+  cli_rw->WritesDone(&exec_request);
 
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
   grpc::Status grpc_status;
-  cli_rw->Finish(&grpc_status, nullptr);
+  cli_rw->Finish(&grpc_status, &exec_request);
 
-  client_cq_->Next(&tag, &cq_status);
-  ASSERT_TRUE(grpc_status.ok());
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
+  ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
 }
 
 TEST_F(AsyncEndToEndTest, Server_ExecWrite_WriteUntilChildExits) {
-  ResetStub();
-  // Accounting bits for managing CompletionQueue state.
-  void* tag;
-  bool cq_status;
-
   // Exec service boilerplate.
   std::shared_ptr<grpc::ServerContext> srv_ctx = std::make_shared<grpc::ServerContext>();
   std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> srv_rw =
       std::make_shared<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>(srv_ctx.get());
 
-  service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(), this);
+  service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(),
+                        srv_ctx.get());
 
   // Client creates a new stub.
   std::unique_ptr<grpc::ClientContext> cli_ctx = std::make_unique<grpc::ClientContext>();
   std::unique_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
 
-  cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), nullptr);
+  cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
 
   // The write side of the server exec routine will poll the child pid and see
   // that it has exited.
-  server_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, srv_ctx.get(), true);
 
   ExecWriteCallData<FakePlatform>* server_call_data =
       new ExecWriteCallData<FakePlatform>(srv_ctx, srv_rw, 0, 0, 0);
 
   // Client reads from the server.
-  client_cq_->Next(&tag, &cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
 
   ExecResponse exec_response;
-  cli_rw->Read(&exec_response, nullptr);
+  cli_rw->Read(&exec_response, &exec_response);
 
   // The server will finish and delete itself.
-  server_cq_->Next(&tag, &cq_status);
-  server_call_data->Proceed(cq_status);
+  ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+  server_call_data->Proceed(true);
 
   // The client will get the initial server write and issue another read.
-  client_cq_->Next(&tag, &cq_status);
-  cli_rw->Read(&exec_response, nullptr);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_response, true);
+  cli_rw->Read(&exec_response, &exec_response);
 
   // The client should see that the server has finished and request the finish
   // status.
-  client_cq_->Next(&tag, &cq_status);
-  ASSERT_FALSE(cq_status);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_response, false);
 
   grpc::Status grpc_status;
 
-  cli_rw->Finish(&grpc_status, nullptr);
-  client_cq_->Next(&tag, &cq_status);
+  cli_rw->Finish(&grpc_status, &exec_response);
+  ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_response, true);
 
-  ASSERT_TRUE(grpc_status.ok());
+  ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
 }