[virtualization] cleanup *_operation_test
In both client_operation_test and server_operation_test:
- Fix numerous leaks in test teardown.
- Improve error messages throughout.
- Replace sleep with async loop wait.
- Remove unnecessary auxiliary loop thread.
- Add stronger assertions on all GRPC completion queue operations.
- Update tests to run on qemu environments since they don't require
virtualization.
Fixed: 59639, 48556, 48557, 67086
Bug: 47487
Change-Id: Idff30a0182a3c9c6ecc65fabde35efe0b26b3706
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/473874
Commit-Queue: Bruno Dal Bo <brunodalbo@google.com>
Reviewed-by: Abdulla Kamar <abdulla@google.com>
Reviewed-by: Tamir Duberstein <tamird@google.com>
diff --git a/src/virtualization/lib/guest_interaction/BUILD.gn b/src/virtualization/lib/guest_interaction/BUILD.gn
index 37eb4b1..d0205a9 100644
--- a/src/virtualization/lib/guest_interaction/BUILD.gn
+++ b/src/virtualization/lib/guest_interaction/BUILD.gn
@@ -126,10 +126,14 @@
"//sdk/lib/sys/cpp",
"//src/lib/fxl",
"//src/lib/fxl/test:gtest_main",
+ "//src/lib/testing/predicates",
"//src/virtualization/lib/grpc:grpc",
"//third_party/grpc:grpc++",
"//zircon/system/ulib/async-loop:async-loop-cpp",
]
+
+ # TODO(fxbug.dev/68325): Fix UB within GRPC and re-enable ubsan.
+ exclude_toolchain_tags = [ "ubsan" ]
}
executable("server_operation_test") {
@@ -145,6 +149,9 @@
"//third_party/grpc:grpc++",
"//zircon/system/ulib/async-loop:async-loop-cpp",
]
+
+ # TODO(fxbug.dev/68325): Fix UB within GRPC and re-enable ubsan.
+ exclude_toolchain_tags = [ "ubsan" ]
}
test_package("guest_interaction_tests") {
@@ -165,41 +172,20 @@
tests = [
{
name = "client_operation_test"
-
- # TODO(fxbug.dev/47487): Switch to [ nuc_env ] once gRPC leaks are fixed.
- if (nuc_env_fails_on_asan != []) {
- nuc_scope = nuc_env_fails_on_asan[0]
-
- environments = [
- {
- dimensions = nuc_scope.dimensions
-
- # TODO(fxbug.dev/59639): This test appears to flake roughly once per month during the
- # Client_Exec_ImmediateFailure test.
- tags = [ "flaky" ]
- },
- ]
- } else {
- # Disable this test for all non-NUC environments.
- environments = []
- }
},
{
name = "server_operation_test"
-
- # TODO(fxbug.dev/48556): Reenable once gRPC leaks are fixed.
- environments = nuc_env_fails_on_asan
},
{
name = "server_daemon_test"
- # TODO(fxbug.dev/48127): Currently hangs in ASan. Fix and switch to [ nuc_env ].
+ # TODO(fxbug.dev/48129): Currently hangs in ASan. Fix and switch to [ nuc_env ].
environments = nuc_env_fails_on_asan
},
{
name = "guest_interaction_service_test"
- # TODO(fxbug.dev/48127): Currently hangs in ASan. Fix and switch to [ nuc_env ].
+ # TODO(fxbug.dev/48129): Currently hangs in ASan. Fix and switch to [ nuc_env ].
environments = nuc_env_fails_on_asan
},
]
diff --git a/src/virtualization/lib/guest_interaction/server/server_operation_state.h b/src/virtualization/lib/guest_interaction/server/server_operation_state.h
index df45612..0826f08 100644
--- a/src/virtualization/lib/guest_interaction/server/server_operation_state.h
+++ b/src/virtualization/lib/guest_interaction/server/server_operation_state.h
@@ -173,6 +173,7 @@
// or the client has sent a final empty byte
void TryWrite();
void SendFinalStatus(OperationStatus status);
+ void Finish();
// gRPC async boilerplate
GuestInteractionService::AsyncService* service_;
@@ -190,6 +191,10 @@
template <class T>
void PutCallData<T>::Proceed(bool ok) {
+ if (!ok && status_ != TRANSFER) {
+ Finish();
+ return;
+ }
switch (status_) {
case CREATE:
status_ = INITIATE_TRANSFER;
@@ -209,15 +214,20 @@
TryWrite();
return;
case FINISH:
- if (fd_ > 0) {
- platform_interface_.CloseFile(fd_);
- }
- delete this;
+ Finish();
return;
}
}
template <class T>
+void PutCallData<T>::Finish() {
+ if (fd_ > 0) {
+ platform_interface_.CloseFile(fd_);
+ }
+ delete this;
+}
+
+template <class T>
void PutCallData<T>::SendFinalStatus(OperationStatus status) {
PutResponse put_response;
put_response.set_status(status);
@@ -255,6 +265,7 @@
if (platform_interface_.WriteFile(fd_, new_request_.data().c_str(),
new_request_.data().length()) < 0) {
SendFinalStatus(OperationStatus::SERVER_FILE_WRITE_FAILURE);
+ return;
}
reader_.Read(&new_request_, this);
}
@@ -360,7 +371,7 @@
std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> stream_;
ExecRequest exec_request_;
- enum CallStatus { INITIATE_READ, FORK, FINISH_IN_ERROR };
+ enum class CallStatus { INITIATE_READ, FORK, FINISH_IN_ERROR };
CallStatus status_;
};
@@ -508,7 +519,7 @@
template <class T>
ExecCallData<T>::ExecCallData(GuestInteractionService::AsyncService* service,
grpc::ServerCompletionQueue* cq)
- : service_(service), cq_(cq), status_(INITIATE_READ) {
+ : service_(service), cq_(cq), status_(CallStatus::INITIATE_READ) {
// The ServerContext provides connection metadata to the streaming
// reader/writer. No context actually needs to be preserved, but
// ExecCallData is just the entry point to the exec process. Once the
@@ -525,92 +536,100 @@
// first command, fork and hand off control to a dedicated reader/writer.
template <class T>
void ExecCallData<T>::Proceed(bool ok) {
- if (status_ == INITIATE_READ) {
- new ExecCallData(service_, cq_);
- stream_->Read(&exec_request_, this);
- status_ = FORK;
- } else if (status_ != FORK) {
- GPR_ASSERT(status_ == FINISH_IN_ERROR);
- delete this;
- return;
- } else {
- // Generate string forms of the argv and environment variables
- std::vector<std::string> args = platform_interface_.ParseCommand(exec_request_.argv());
- std::vector<std::string> env = CreateEnv(exec_request_);
-
- // Repackage the string representations as C-strings
- std::vector<char*> exec_args;
- for (const std::string& arg : args) {
- exec_args.push_back(const_cast<char*>(arg.c_str()));
- }
- exec_args.push_back(nullptr);
-
- std::vector<char*> exec_env;
- for (const std::string& env_pair : env) {
- exec_env.push_back(const_cast<char*>(env_pair.c_str()));
- }
- exec_env.push_back(nullptr);
-
- // Create the arguments to exec from the C-string vectors
- char** args_ptr;
- if (args.empty()) {
- ExecResponse exec_response;
- exec_response.clear_std_out();
- exec_response.clear_std_err();
- exec_response.clear_ret_code();
- exec_response.set_status(OperationStatus::SERVER_EXEC_COMMAND_PARSE_FAILURE);
-
- stream_->WriteAndFinish(exec_response, grpc::WriteOptions(), grpc::Status::OK, this);
- status_ = FINISH_IN_ERROR;
-
- return;
- }
-
- args_ptr = exec_args.data();
-
- char** env_ptr;
- if (exec_env.empty()) {
- env_ptr = nullptr;
- } else {
- env_ptr = exec_env.data();
- }
-
- // File descriptors to be populated when exec-ing
- int32_t std_in;
- int32_t std_out;
- int32_t std_err;
-
- // fork and exec
- int32_t child_pid = platform_interface_.Exec(args_ptr, env_ptr, &std_in, &std_out, &std_err);
-
- if (child_pid < 0) {
- ExecResponse exec_response;
- exec_response.clear_std_out();
- exec_response.clear_std_err();
- exec_response.clear_ret_code();
- exec_response.set_status(OperationStatus::SERVER_EXEC_FORK_FAILURE);
-
- stream_->WriteAndFinish(exec_response, grpc::WriteOptions(), grpc::Status::OK, this);
-
- platform_interface_.CloseFile(std_in);
- platform_interface_.CloseFile(std_out);
- platform_interface_.CloseFile(std_err);
- status_ = FINISH_IN_ERROR;
- return;
- }
- // Set read FD's to nonblocking mode.
- platform_interface_.SetFileNonblocking(std_out);
- platform_interface_.SetFileNonblocking(std_err);
-
- // If the client specified any stdin, write that into the stdin FD.
- platform_interface_.WriteFile(std_in, exec_request_.std_in().c_str(),
- exec_request_.std_in().size());
-
- new ExecReadCallData<T>(ctx_, stream_, child_pid, std_in);
- new ExecWriteCallData<T>(ctx_, stream_, child_pid, std_out, std_err);
+ // State machine doesn't expect a not ok command queue event on this tag.
+ // Get rid of resource if that happens.
+ if (!ok) {
delete this;
return;
}
+ switch (status_) {
+ case CallStatus::INITIATE_READ:
+ new ExecCallData(service_, cq_);
+ stream_->Read(&exec_request_, this);
+ status_ = CallStatus::FORK;
+ return;
+ case CallStatus::FINISH_IN_ERROR:
+ delete this;
+ return;
+ case CallStatus::FORK:
+ break;
+ }
+
+ // Generate string forms of the argv and environment variables
+ std::vector<std::string> args = platform_interface_.ParseCommand(exec_request_.argv());
+ std::vector<std::string> env = CreateEnv(exec_request_);
+
+ // Repackage the string representations as C-strings
+ std::vector<char*> exec_args;
+ for (const std::string& arg : args) {
+ exec_args.push_back(const_cast<char*>(arg.c_str()));
+ }
+ exec_args.push_back(nullptr);
+
+ std::vector<char*> exec_env;
+ for (const std::string& env_pair : env) {
+ exec_env.push_back(const_cast<char*>(env_pair.c_str()));
+ }
+ exec_env.push_back(nullptr);
+
+ // Create the arguments to exec from the C-string vectors
+ char** args_ptr;
+ if (args.empty()) {
+ ExecResponse exec_response;
+ exec_response.clear_std_out();
+ exec_response.clear_std_err();
+ exec_response.clear_ret_code();
+ exec_response.set_status(OperationStatus::SERVER_EXEC_COMMAND_PARSE_FAILURE);
+
+ stream_->WriteAndFinish(exec_response, grpc::WriteOptions(), grpc::Status::OK, this);
+ status_ = CallStatus::FINISH_IN_ERROR;
+
+ return;
+ }
+
+ args_ptr = exec_args.data();
+
+ char** env_ptr;
+ if (exec_env.empty()) {
+ env_ptr = nullptr;
+ } else {
+ env_ptr = exec_env.data();
+ }
+
+ // File descriptors to be populated when exec-ing
+ int32_t std_in;
+ int32_t std_out;
+ int32_t std_err;
+
+ // fork and exec
+ int32_t child_pid = platform_interface_.Exec(args_ptr, env_ptr, &std_in, &std_out, &std_err);
+
+ if (child_pid < 0) {
+ ExecResponse exec_response;
+ exec_response.clear_std_out();
+ exec_response.clear_std_err();
+ exec_response.clear_ret_code();
+ exec_response.set_status(OperationStatus::SERVER_EXEC_FORK_FAILURE);
+
+ stream_->WriteAndFinish(exec_response, grpc::WriteOptions(), grpc::Status::OK, this);
+
+ platform_interface_.CloseFile(std_in);
+ platform_interface_.CloseFile(std_out);
+ platform_interface_.CloseFile(std_err);
+ status_ = CallStatus::FINISH_IN_ERROR;
+ return;
+ }
+ // Set read FD's to nonblocking mode.
+ platform_interface_.SetFileNonblocking(std_out);
+ platform_interface_.SetFileNonblocking(std_err);
+
+ // If the client specified any stdin, write that into the stdin FD.
+ platform_interface_.WriteFile(std_in, exec_request_.std_in().c_str(),
+ exec_request_.std_in().size());
+
+ new ExecReadCallData<T>(ctx_, stream_, child_pid, std_in);
+ new ExecWriteCallData<T>(ctx_, stream_, child_pid, std_out, std_err);
+ delete this;
}
template <class T>
diff --git a/src/virtualization/lib/guest_interaction/test/client_operation_test.cc b/src/virtualization/lib/guest_interaction/test/client_operation_test.cc
index b348302..f1031de 100644
--- a/src/virtualization/lib/guest_interaction/test/client_operation_test.cc
+++ b/src/virtualization/lib/guest_interaction/test/client_operation_test.cc
@@ -7,6 +7,7 @@
#include <gtest/gtest.h>
+#include "src/lib/testing/predicates/status.h"
#include "src/virtualization/lib/guest_interaction/client/client_operation_state.h"
#include "src/virtualization/lib/guest_interaction/test/operation_test_lib.h"
@@ -22,18 +23,13 @@
// 6. Server immediately hangs up on client at start of transfer.
TEST_F(AsyncEndToEndTest, GetMissingFile) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// Create a service that can accept incoming Get requests.
GetRequest incoming_request;
grpc::ServerContext srv_ctx;
grpc::ServerAsyncWriter<GetResponse> response_writer(&srv_ctx);
service_->RequestGet(&srv_ctx, &incoming_request, &response_writer, server_cq_.get(),
- server_cq_.get(), this);
+ server_cq_.get(), &srv_ctx);
// Create components required to perform a client Get request.
zx_status_t operation_status = ZX_OK;
@@ -48,48 +44,44 @@
stub_->AsyncGet(&(client_call_data->ctx_), get_request, client_cq_.get(), client_call_data);
// Wait for the request to go out.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// Server CompletionQueue should get the client request.
- server_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
GetResponse outgoing_response;
outgoing_response.clear_data();
outgoing_response.set_status(OperationStatus::SERVER_MISSING_FILE_FAILURE);
- response_writer.Write(outgoing_response, nullptr);
+ response_writer.Write(outgoing_response, &outgoing_response);
// Client should get the server's message and then wait for the server to
// call Finish.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// Server finishes.
- server_cq_->Next(&tag, &cq_status);
- response_writer.Finish(grpc::Status::OK, nullptr);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &outgoing_response, true);
+ response_writer.Finish(grpc::Status::OK, &outgoing_response);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &outgoing_response, true);
// Client gets final status from server, runs the callback, and then
// deletes itself.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// The client sets the operation status in the callback.
- ASSERT_EQ(operation_status, ZX_ERR_NOT_FOUND);
+ ASSERT_STATUS(operation_status, ZX_ERR_NOT_FOUND);
}
TEST_F(AsyncEndToEndTest, SmallFile) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// Create a service that can accept incoming Get requests.
GetRequest incoming_request;
grpc::ServerContext srv_ctx;
grpc::ServerAsyncWriter<GetResponse> response_writer(&srv_ctx);
service_->RequestGet(&srv_ctx, &incoming_request, &response_writer, server_cq_.get(),
- server_cq_.get(), this);
+ server_cq_.get(), &srv_ctx);
// Create components required to perform a client Get request.
zx_status_t operation_status = ZX_ERR_PEER_CLOSED;
@@ -103,8 +95,8 @@
stub_->AsyncGet(&(client_call_data->ctx_), get_request, client_cq_.get(), client_call_data);
// Wait for the request to go out.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// The mock will notify the client that all writes are successful.
client_call_data->platform_interface_.SetOpenFileReturn(1);
@@ -112,48 +104,44 @@
// Server CompletionQueue should get the client request.
// Send back a short message.
- server_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
GetResponse outgoing_response;
outgoing_response.set_data("Small file contents");
outgoing_response.set_status(OperationStatus::OK);
- response_writer.Write(outgoing_response, nullptr);
+ response_writer.Write(outgoing_response, &response_writer);
// Client should get the server's message and then wait for the server to
// send more data.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// Server finishes, indicating that there is no more data.
- server_cq_->Next(&tag, &cq_status);
- response_writer.Finish(grpc::Status::OK, nullptr);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
+ response_writer.Finish(grpc::Status::OK, &response_writer);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
// Client should get a bad status from the queue and then wait for the query
// of the finish status.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, false);
+ client_call_data->Proceed(false);
// Client gets final status, runs the callback, and then deletes itself.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// The client sets the operation status in the callback.
- ASSERT_EQ(operation_status, ZX_OK);
+ ASSERT_OK(operation_status);
}
TEST_F(AsyncEndToEndTest, LargeFile) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// Create a service that can accept incoming Get requests.
GetRequest incoming_request;
grpc::ServerContext srv_ctx;
grpc::ServerAsyncWriter<GetResponse> response_writer(&srv_ctx);
service_->RequestGet(&srv_ctx, &incoming_request, &response_writer, server_cq_.get(),
- server_cq_.get(), this);
+ server_cq_.get(), &response_writer);
// Create components required to perform a client Get request.
zx_status_t operation_status = ZX_ERR_PEER_CLOSED;
@@ -167,8 +155,8 @@
stub_->AsyncGet(&(client_call_data->ctx_), get_request, client_cq_.get(), client_call_data);
// Wait for the request to go out.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// The mock will notify the client that all writes are successful.
client_call_data->platform_interface_.SetOpenFileReturn(1);
@@ -176,61 +164,57 @@
// Server CompletionQueue should get the client request.
// Send back a short message.
- server_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
GetResponse outgoing_response;
outgoing_response.set_data("large file contents");
outgoing_response.set_status(OperationStatus::OK);
- response_writer.Write(outgoing_response, nullptr);
+ response_writer.Write(outgoing_response, &response_writer);
// Client should get the server's message and then wait for the server to
// send more data.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// Server CompletionQueue should get the client request.
// Send back a short message.
- server_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
outgoing_response.set_data("large file contents");
outgoing_response.set_status(OperationStatus::OK);
- response_writer.Write(outgoing_response, nullptr);
+ response_writer.Write(outgoing_response, &response_writer);
// Client should get the server's message and then wait for the server to
// send more data.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// Server finishes, indicating that there is no more data.
- server_cq_->Next(&tag, &cq_status);
- response_writer.Finish(grpc::Status::OK, nullptr);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
+ response_writer.Finish(grpc::Status::OK, &response_writer);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
// Client should get a bad status from the queue and then wait for the query
// the finish status.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, false);
+ client_call_data->Proceed(false);
// Client gets final status, runs the callback, and then deletes itself.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// The client sets the operation status in the callback.
- ASSERT_EQ(operation_status, ZX_OK);
+ ASSERT_OK(operation_status);
}
TEST_F(AsyncEndToEndTest, BrokenWrite) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// Create a service that can accept incoming Get requests.
GetRequest incoming_request;
grpc::ServerContext srv_ctx;
grpc::ServerAsyncWriter<GetResponse> response_writer(&srv_ctx);
service_->RequestGet(&srv_ctx, &incoming_request, &response_writer, server_cq_.get(),
- server_cq_.get(), this);
+ server_cq_.get(), &response_writer);
// Create components required to perform a client Get request.
zx_status_t operation_status = ZX_ERR_PEER_CLOSED;
@@ -244,8 +228,8 @@
stub_->AsyncGet(&(client_call_data->ctx_), get_request, client_cq_.get(), client_call_data);
// Wait for the request to go out.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// The mock will notify the client that write has failed.
client_call_data->platform_interface_.SetOpenFileReturn(1);
@@ -254,42 +238,38 @@
// Server CompletionQueue should get the client request.
// Send back a short message.
- server_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
GetResponse outgoing_response;
outgoing_response.set_data("Small file contents");
outgoing_response.set_status(OperationStatus::OK);
- response_writer.Write(outgoing_response, nullptr);
+ response_writer.Write(outgoing_response, &response_writer);
// Client should get the server's message, fail to write, and then finish.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// Server finishes, indicating that there is no more data.
- server_cq_->Next(&tag, &cq_status);
- response_writer.Finish(grpc::Status::OK, nullptr);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
+ response_writer.Finish(grpc::Status::OK, &response_writer);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
// Client finishes and deletes itself.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// The client sets the operation status in the callback.
- ASSERT_EQ(operation_status, ZX_ERR_IO);
+ ASSERT_STATUS(operation_status, ZX_ERR_IO);
}
TEST_F(AsyncEndToEndTest, GrpcFailure) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// Create a service that can accept incoming Get requests.
GetRequest incoming_request;
grpc::ServerContext srv_ctx;
grpc::ServerAsyncWriter<GetResponse> response_writer(&srv_ctx);
service_->RequestGet(&srv_ctx, &incoming_request, &response_writer, server_cq_.get(),
- server_cq_.get(), this);
+ server_cq_.get(), &response_writer);
// Create components required to perform a client Get request.
zx_status_t operation_status = ZX_OK;
@@ -304,19 +284,20 @@
// Wait for the request to go out and then tell the client that it was
// unsuccessful.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
client_call_data->Proceed(false);
// Server finishes, indicating that there is no more data.
- server_cq_->Next(&tag, &cq_status);
- response_writer.Finish(grpc::Status::OK, nullptr);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
+ response_writer.Finish(grpc::Status::OK, &response_writer);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &response_writer, true);
// Client finishes and deletes itself.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// The client sets the operation status in the callback.
- ASSERT_EQ(operation_status, ZX_ERR_PEER_CLOSED);
+ ASSERT_STATUS(operation_status, ZX_ERR_PEER_CLOSED);
}
// Client Put State Machine Test Cases
@@ -327,17 +308,12 @@
// 4. gRPC fails while the client is transferring the file.
TEST_F(AsyncEndToEndTest, PutReadFails) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// Create a service that can accept incoming Get requests.
zx_status_t operation_status = ZX_OK;
grpc::ServerContext srv_ctx;
grpc::ServerAsyncReader<PutResponse, PutRequest> request_reader(&srv_ctx);
- service_->RequestPut(&srv_ctx, &request_reader, server_cq_.get(), server_cq_.get(), this);
+ service_->RequestPut(&srv_ctx, &request_reader, server_cq_.get(), server_cq_.get(), &srv_ctx);
// Create components required to perform a client Put request.
int32_t fake_fd = 0;
@@ -350,9 +326,9 @@
client_call_data);
// Server should get the client request.
- server_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
PutRequest put_request;
- request_reader.Read(&put_request, nullptr);
+ request_reader.Read(&put_request, &request_reader);
// Set the mock up to inform the client that the source file exists but
// fails to open.
@@ -361,40 +337,35 @@
client_call_data->platform_interface_.SetReadFileReturn(-1);
// Wait for the request to go out.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// Server CompletionQueue should get the client's finish message.
- server_cq_->Next(&tag, &cq_status);
- ASSERT_FALSE(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, false);
PutResponse put_response;
put_response.set_status(OperationStatus::OK);
- request_reader.Finish(put_response, grpc::Status::OK, nullptr);
+ request_reader.Finish(put_response, grpc::Status::OK, &request_reader);
// Client should get the server's finish message and delete itself.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, true);
// The client sets the operation status in the callback.
- ASSERT_EQ(operation_status, ZX_ERR_IO);
+ ASSERT_STATUS(operation_status, ZX_ERR_IO);
}
TEST_F(AsyncEndToEndTest, PutOneFragment) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// Create a service that can accept incoming Get requests.
zx_status_t operation_status = ZX_ERR_IO;
grpc::ServerContext srv_ctx;
grpc::ServerAsyncReader<PutResponse, PutRequest> request_reader(&srv_ctx);
- service_->RequestPut(&srv_ctx, &request_reader, server_cq_.get(), server_cq_.get(), this);
+ service_->RequestPut(&srv_ctx, &request_reader, server_cq_.get(), server_cq_.get(), &srv_ctx);
// Create components required to perform a client Put request.
int32_t fake_fd = 0;
@@ -407,9 +378,9 @@
client_call_data);
// Server should get the client request.
- server_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
PutRequest put_request;
- request_reader.Read(&put_request, nullptr);
+ request_reader.Read(&put_request, &request_reader);
// Set the mock up to inform the client that the source file exists but
// fails to open.
@@ -418,50 +389,44 @@
client_call_data->platform_interface_.SetReadFileContents("test");
// Wait for the request to go out.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// Server CompletionQueue should get the client's message and request another
// file fragment.
- server_cq_->Next(&tag, &cq_status);
- ASSERT_TRUE(cq_status);
- request_reader.Read(&put_request, nullptr);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, true);
+ request_reader.Read(&put_request, &request_reader);
// Client hits the end of the file and finishes.
client_call_data->platform_interface_.SetReadFileContents("");
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// Server gets the finish and finishes with the client.
- server_cq_->Next(&tag, &cq_status);
- ASSERT_FALSE(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, false);
PutResponse put_response;
put_response.set_status(OperationStatus::OK);
- request_reader.Finish(put_response, grpc::Status::OK, nullptr);
+ request_reader.Finish(put_response, grpc::Status::OK, &request_reader);
// Client should get the server's finish message and delete itself.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, true);
// The client sets the operation status in the callback.
- ASSERT_EQ(operation_status, ZX_OK);
+ ASSERT_OK(operation_status);
}
TEST_F(AsyncEndToEndTest, PutMultipleFragments) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// Create a service that can accept incoming Get requests.
zx_status_t operation_status = ZX_ERR_IO;
grpc::ServerContext srv_ctx;
grpc::ServerAsyncReader<PutResponse, PutRequest> request_reader(&srv_ctx);
- service_->RequestPut(&srv_ctx, &request_reader, server_cq_.get(), server_cq_.get(), this);
+ service_->RequestPut(&srv_ctx, &request_reader, server_cq_.get(), server_cq_.get(), &srv_ctx);
// Create components required to perform a client Put request.
int32_t fake_fd = 0;
@@ -474,9 +439,9 @@
client_call_data);
// Server should get the client request.
- server_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
PutRequest put_request;
- request_reader.Read(&put_request, nullptr);
+ request_reader.Read(&put_request, &request_reader);
// Set the mock up to inform the client that the source file exists but
// fails to open.
@@ -485,60 +450,53 @@
client_call_data->platform_interface_.SetReadFileContents("test");
// Wait for the request to go out.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// Server CompletionQueue should get the client's message and request another
// file fragment.
- server_cq_->Next(&tag, &cq_status);
- ASSERT_TRUE(cq_status);
- request_reader.Read(&put_request, nullptr);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, true);
+ request_reader.Read(&put_request, &request_reader);
// Send a second file fragment.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// Server CompletionQueue should get the client's message and request another
// file fragment.
- server_cq_->Next(&tag, &cq_status);
- ASSERT_TRUE(cq_status);
- request_reader.Read(&put_request, nullptr);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, true);
+ request_reader.Read(&put_request, &request_reader);
// Client hits the end of the file and writes done.
client_call_data->platform_interface_.SetReadFileContents("");
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// Server gets the finish and finishes with the client.
- server_cq_->Next(&tag, &cq_status);
- ASSERT_FALSE(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, false);
PutResponse put_response;
put_response.set_status(OperationStatus::OK);
- request_reader.Finish(put_response, grpc::Status::OK, nullptr);
+ request_reader.Finish(put_response, grpc::Status::OK, &request_reader);
// Client should get the server's finish message and delete itself.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, true);
// The client sets the operation status in the callback.
- ASSERT_EQ(operation_status, ZX_OK);
+ ASSERT_OK(operation_status);
}
TEST_F(AsyncEndToEndTest, PutGrpcFailure) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// Create a service that can accept incoming Get requests.
zx_status_t operation_status = ZX_OK;
grpc::ServerContext srv_ctx;
grpc::ServerAsyncReader<PutResponse, PutRequest> request_reader(&srv_ctx);
- service_->RequestPut(&srv_ctx, &request_reader, server_cq_.get(), server_cq_.get(), this);
+ service_->RequestPut(&srv_ctx, &request_reader, server_cq_.get(), server_cq_.get(), &srv_ctx);
// Create components required to perform a client Put request.
int32_t fake_fd = 0;
@@ -551,9 +509,9 @@
client_call_data);
// Server should get the client request.
- server_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
PutRequest put_request;
- request_reader.Read(&put_request, nullptr);
+ request_reader.Read(&put_request, &request_reader);
// Set the mock up to inform the client that the source file exists but
// fails to open.
@@ -562,19 +520,21 @@
client_call_data->platform_interface_.SetReadFileContents("test");
// Wait for the request to go out.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// Server CompletionQueue should get the client's message.
- server_cq_->Next(&tag, &cq_status);
- request_reader.Read(&put_request, nullptr);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, true);
+ request_reader.Read(&put_request, &request_reader);
// Inject a gRPC failure into the client procedure.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
client_call_data->Proceed(false);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &request_reader, false);
+
// The client sets the operation status in the callback.
- ASSERT_EQ(operation_status, ZX_ERR_PEER_CLOSED);
+ ASSERT_STATUS(operation_status, ZX_ERR_PEER_CLOSED);
}
// Client Exec State Machine Tests
@@ -585,25 +545,24 @@
// 3. Server sends stdout/stderr and then terminates the transfer.
TEST_F(AsyncEndToEndTest, Client_Exec_ImmediateFailure) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// Create a service that can accept incoming Exec requests.
- zx_status_t operation_status = ZX_OK;
- zx_status_t termination_status = ZX_OK;
+ bool operation_status_done = false;
+ bool termination_status_done = false;
grpc::ServerContext srv_ctx;
grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest> rw(&srv_ctx);
- service_->RequestExec(&srv_ctx, &rw, server_cq_.get(), server_cq_.get(), this);
+ service_->RequestExec(&srv_ctx, &rw, server_cq_.get(), server_cq_.get(), &srv_ctx);
fuchsia::netemul::guest::CommandListenerPtr listener;
- listener.events().OnStarted = [&operation_status](zx_status_t status) {
- operation_status = status;
+ listener.events().OnStarted = [&operation_status_done](zx_status_t status) {
+ operation_status_done = true;
+ EXPECT_STATUS(status, ZX_ERR_INTERNAL);
+ operation_status_done = status;
};
- listener.events().OnTerminated = [&termination_status](zx_status_t status, int32_t exit_code) {
- termination_status = status;
+ listener.events().OnTerminated = [&termination_status_done](zx_status_t status,
+ int32_t exit_code) {
+ EXPECT_STATUS(status, ZX_ERR_PEER_CLOSED);
+ termination_status_done = true;
};
std::unique_ptr<ListenerInterface> listener_interface =
std::make_unique<ListenerInterface>(listener.NewRequest());
@@ -617,72 +576,69 @@
stub_->AsyncExec(client_call_data->ctx_.get(), client_cq_.get(), client_call_data);
// Server should get the new stub request.
- server_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
// Inject a failure into the client.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
client_call_data->Proceed(false);
// The client sets the operation status in the callback.
- WaitForCallback(&operation_status, ZX_ERR_INTERNAL);
- WaitForCallback(&termination_status, ZX_ERR_PEER_CLOSED);
+ RunLoopUntil([&operation_status_done, &termination_status_done]() {
+ return operation_status_done && termination_status_done;
+ });
}
TEST_F(AsyncEndToEndTest, Client_ExecWrite_Test) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// Create a service that can accept incoming Exec requests.
grpc::ServerContext srv_ctx;
std::shared_ptr<grpc::ClientContext> cli_ctx = std::make_shared<grpc::ClientContext>();
grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest> srv_rw(&srv_ctx);
std::shared_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
- service_->RequestExec(&srv_ctx, &srv_rw, server_cq_.get(), server_cq_.get(), this);
+ service_->RequestExec(&srv_ctx, &srv_rw, server_cq_.get(), server_cq_.get(), &srv_ctx);
// Create components required to perform a client Exec request.
std::string empty_argv = "echo hello";
std::vector<ExecEnv> empty_env;
ExecWriteCallData<FakePlatform>* client_call_data;
- cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), &client_call_data);
+ cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
// Clear the initial event that is generated by the stub creation. This
// would normally be handled by the top-level ExecCallData.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
client_call_data = new ExecWriteCallData<FakePlatform>(empty_argv, empty_env, 0, cli_ctx, cli_rw);
// Server should get the new stub request and begin reading.
ExecRequest exec_request;
- server_cq_->Next(&tag, &cq_status);
- srv_rw.Read(&exec_request, nullptr);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
+ srv_rw.Read(&exec_request, &exec_request);
// Client should read successfully from stdin and send a message to the
// server.
client_call_data->platform_interface_.SetReadFileContents("test");
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// Server should continue reading.
- server_cq_->Next(&tag, &cq_status);
- srv_rw.Read(&exec_request, nullptr);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &exec_request, true);
+ srv_rw.Read(&exec_request, &exec_request);
// Client should hit end of file on stdin.
client_call_data->platform_interface_.SetReadFileContents("");
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// Server should finish.
- server_cq_->Next(&tag, &cq_status);
- srv_rw.Finish(grpc::Status::OK, nullptr);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &exec_request, true);
+ srv_rw.Finish(grpc::Status::OK, &exec_request);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &exec_request, true);
// Client should get the finish message and delete itself.
int32_t initial_use_count = cli_rw.use_count();
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
int32_t final_use_count = cli_rw.use_count();
@@ -693,19 +649,16 @@
}
TEST_F(AsyncEndToEndTest, Client_ExecRead_Test) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
+ constexpr int32_t kReturnCode = 1234;
// Create a service that can accept incoming Exec requests.
- zx_status_t operation_status = ZX_ERR_PEER_CLOSED;
- int32_t return_code;
+ bool operation_status_done = false;
fuchsia::netemul::guest::CommandListenerPtr listener;
- listener.events().OnTerminated = [&operation_status, &return_code](zx_status_t status,
- int32_t ret_code) {
- operation_status = status;
- return_code = ret_code;
+ listener.events().OnTerminated = [&operation_status_done, kReturnCode](zx_status_t status,
+ int32_t ret_code) {
+ operation_status_done = true;
+ EXPECT_OK(status);
+ EXPECT_EQ(ret_code, kReturnCode);
};
std::unique_ptr<ListenerInterface> listener_interface =
std::make_unique<ListenerInterface>(listener.NewRequest());
@@ -715,44 +668,41 @@
grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest> srv_rw(&srv_ctx);
std::shared_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
- service_->RequestExec(&srv_ctx, &srv_rw, server_cq_.get(), server_cq_.get(), this);
+ service_->RequestExec(&srv_ctx, &srv_rw, server_cq_.get(), server_cq_.get(), &srv_ctx);
// Create components required to perform a client Exec request.
ExecReadCallData<FakePlatform>* client_call_data;
- cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), &client_call_data);
+ cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
// Clear the inital event that is generated by the stub creation. This would
// normally be handled by the top-level ExecCallData.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
client_call_data =
new ExecReadCallData<FakePlatform>(0, 0, cli_ctx, cli_rw, std::move(listener_interface));
// Server should get the new stub request and immediately finish.
- server_cq_->Next(&tag, &cq_status);
-
- int32_t ret_code = 1234;
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &srv_ctx, true);
ExecResponse exec_response;
exec_response.clear_std_out();
exec_response.clear_std_err();
- exec_response.set_ret_code(ret_code);
- srv_rw.WriteAndFinish(exec_response, grpc::WriteOptions(), grpc::Status::OK, nullptr);
+ exec_response.set_ret_code(kReturnCode);
+ srv_rw.WriteAndFinish(exec_response, grpc::WriteOptions(), grpc::Status::OK, &exec_response);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, &exec_response, true);
// Client should get the message.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// Client should get the finish message.
- client_cq_->Next(&tag, &cq_status);
- ASSERT_FALSE(cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, false);
+ client_call_data->Proceed(false);
// Client should run the callback and clean up.
- client_cq_->Next(&tag, &cq_status);
- client_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, client_call_data, true);
+ client_call_data->Proceed(true);
// The client sets the operation status in the callback.
- WaitForCallback(&operation_status, ZX_OK);
- ASSERT_EQ(return_code, ret_code);
+ RunLoopUntil([&operation_status_done]() { return operation_status_done; });
}
diff --git a/src/virtualization/lib/guest_interaction/test/operation_test_lib.h b/src/virtualization/lib/guest_interaction/test/operation_test_lib.h
index 90f44f9..a214382 100644
--- a/src/virtualization/lib/guest_interaction/test/operation_test_lib.h
+++ b/src/virtualization/lib/guest_interaction/test/operation_test_lib.h
@@ -9,11 +9,15 @@
#include <fcntl.h>
#include <lib/async-loop/cpp/loop.h>
#include <lib/async-loop/default.h>
+#include <lib/fit/function.h>
#include <lib/syslog/cpp/macros.h>
+#include <lib/zx/clock.h>
#include <netinet/in.h>
#include <sys/socket.h>
+#include <zircon/status.h>
#include <iostream>
+#include <memory>
#include <thread>
#include <grpc/support/log.h>
@@ -22,96 +26,66 @@
#include <grpc++/grpc++.h>
-// Adapted from gRPC's async_end2end_test.cc
-class TestScenario {
- public:
- TestScenario(bool non_block, bool inproc_stub, const grpc::string& creds_type,
- const grpc::string& content)
- : disable_blocking(non_block),
- inproc(inproc_stub),
- credentials_type(creds_type),
- message_content(content) {}
- void Log() const;
- bool disable_blocking;
- bool inproc;
- const grpc::string credentials_type;
- const grpc::string message_content;
-};
+// Helper macro used in tests to wait for next event on GRPC completion queues
+// and assert on the returned values.
+#define ASSERT_GRPC_CQ_NEXT(cq, expect_tag, expect_ok) \
+ { \
+ bool ok__; \
+ void* tag__; \
+ ASSERT_TRUE((cq)->Next(&tag__, &ok__)); \
+ ASSERT_EQ(tag__, static_cast<void*>(expect_tag)); \
+ ASSERT_EQ(ok__, expect_ok); \
+ }
-static std::ostream& operator<<(std::ostream& out, const TestScenario& scenario) {
- return out << "TestScenario{disable_blocking=" << (scenario.disable_blocking ? "true" : "false")
- << ", inproc=" << (scenario.inproc ? "true" : "false") << ", credentials='"
- << scenario.credentials_type << "', message_size=" << scenario.message_content.size()
- << "}";
-}
-
-void TestScenario::Log() const {
- std::ostringstream out;
- out << *this;
- gpr_log(GPR_DEBUG, "%s", out.str().c_str());
-}
-
-class AsyncEndToEndTest : public testing::TestWithParam<TestScenario> {
+class AsyncEndToEndTest : public testing::Test {
protected:
- AsyncEndToEndTest() {}
-
- void SetUp() override {
- client_cq_ = std::make_unique<grpc::CompletionQueue>();
- ASSERT_EQ(loop_.ResetQuit(), ZX_OK);
- ASSERT_EQ(loop_.StartThread(), ZX_OK);
-
+ AsyncEndToEndTest()
+ : client_cq_(std::make_unique<grpc::CompletionQueue>()),
+ service_(std::make_unique<GuestInteractionService::AsyncService>()) {
// Setup server
- BuildAndStartServer();
- }
-
- void TearDown() override {
- server_->Shutdown();
-
- // The server shutdown calls shutdown on the server's CompletionQueue. The
- // client's CompletionQueue needs to be cleaned up manually.
- void* ignored_tag;
- bool ignored_ok;
-
- client_cq_->Shutdown();
- while (client_cq_->Next(&ignored_tag, &ignored_ok))
- ;
-
- stub_.reset();
- loop_.Quit();
- loop_.JoinThreads();
- }
-
- void BuildAndStartServer() {
grpc::ServerBuilder builder;
- service_.reset(new GuestInteractionService::AsyncService());
builder.RegisterService(service_.get());
server_cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
- }
- void ResetStub() {
+ // Setup stub
grpc::ChannelArguments args;
std::shared_ptr<grpc::Channel> channel = server_->InProcessChannel(args);
stub_ = GuestInteractionService::NewStub(channel);
}
- void WaitForCallback(zx_status_t* returned_status, zx_status_t desired_status) {
- for (uint32_t i = 0; i < callback_wait_time_; i++) {
- if (*returned_status == desired_status) {
- break;
- }
- sleep(1);
+ void TearDown() override {
+ server_->Shutdown();
+ void* tag;
+ bool regular_event;
+
+ server_cq_->Shutdown();
+ while (server_cq_->Next(&tag, ®ular_event)) {
+ static_cast<CallData*>(tag)->Proceed(false);
+ EXPECT_FALSE(regular_event) << "Unexpected remaining event in server cq";
}
- ASSERT_EQ(*returned_status, desired_status);
+
+ client_cq_->Shutdown();
+ while (client_cq_->Next(&tag, ®ular_event)) {
+ EXPECT_FALSE(regular_event) << "Unexpected remaining event in client cq";
+ }
}
+ void RunLoopUntil(fit::function<bool()> check) {
+ constexpr zx::duration kLoopStep = zx::msec(10);
+ while (!check()) {
+ zx_status_t status = loop_.Run(zx::deadline_after(kLoopStep), true);
+ ASSERT_TRUE(status == ZX_ERR_TIMED_OUT || status == ZX_OK)
+ << "Failed to run loop " << zx_status_get_string(status);
+ }
+ }
+
+ const std::unique_ptr<grpc::CompletionQueue> client_cq_;
+ const std::unique_ptr<GuestInteractionService::AsyncService> service_;
std::unique_ptr<grpc::ServerCompletionQueue> server_cq_;
- std::unique_ptr<grpc::CompletionQueue> client_cq_;
std::unique_ptr<GuestInteractionService::Stub> stub_;
std::unique_ptr<grpc::Server> server_;
- std::unique_ptr<GuestInteractionService::AsyncService> service_;
async::Loop loop_ = async::Loop(&kAsyncLoopConfigAttachToCurrentThread);
- uint32_t callback_wait_time_ = 5; // Number of seconds to wait for Exec fidl responses to run.
};
#endif // SRC_VIRTUALIZATION_LIB_GUEST_INTERACTION_TEST_OPERATION_TEST_LIB_H_
diff --git a/src/virtualization/lib/guest_interaction/test/server_operation_test.cc b/src/virtualization/lib/guest_interaction/test/server_operation_test.cc
index 4d6ccac..d32a67d 100644
--- a/src/virtualization/lib/guest_interaction/test/server_operation_test.cc
+++ b/src/virtualization/lib/guest_interaction/test/server_operation_test.cc
@@ -21,11 +21,6 @@
// 5. The file the server is reading from goes into a bad state.
TEST_F(AsyncEndToEndTest, ServerMissingFile) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// The server's call to check if the requested file exists will return false.
GetCallData<FakePlatform>* server_call_data =
new GetCallData<FakePlatform>(service_.get(), server_cq_.get());
@@ -33,47 +28,44 @@
server_call_data->platform_interface_.SetFileExistsReturn(false);
// Create components required to perform a client Get request.
- grpc::ClientContext client_ctx_;
+ grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
GetResponse get_response;
GetRequest get_request;
get_request.set_source("/some/bogus/path");
- std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader_ =
- stub_->AsyncGet(&client_ctx_, get_request, client_cq_.get(), nullptr);
+ std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader =
+ stub_->AsyncGet(&client_ctx, get_request, client_cq_.get(), &client_ctx);
// Wait for the request to go out, and then request to read from the server.
- client_cq_->Next(&tag, &cq_status);
- reader_->Read(&get_response, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+ reader->Read(&get_response, reader.get());
// Server CompletionQueue should get the client request and reply that the
// requested file does not exist.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Client should get the server's message and then wait for the server to
// call Finish.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
op_status = get_response.status();
ASSERT_EQ(op_status, OperationStatus::SERVER_MISSING_FILE_FAILURE);
- reader_->Finish(&grpc_status, nullptr);
+ reader->Finish(&grpc_status, reader.get());
// Server finishes.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Client gets final status from server.
- client_cq_->Next(&tag, &cq_status);
- ASSERT_TRUE(grpc_status.ok());
+ ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
+ ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerFileOpenFailure) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// The server's call to check if the requested file exists will return false.
GetCallData<FakePlatform>* server_call_data =
new GetCallData<FakePlatform>(service_.get(), server_cq_.get());
@@ -82,47 +74,44 @@
server_call_data->platform_interface_.SetOpenFileReturn(-1);
// Create components required to perform a client Get request.
- grpc::ClientContext client_ctx_;
+ grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
GetResponse get_response;
GetRequest get_request;
get_request.set_source("/file/with/permissions/issues");
- std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader_ =
- stub_->AsyncGet(&client_ctx_, get_request, client_cq_.get(), nullptr);
+ std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader =
+ stub_->AsyncGet(&client_ctx, get_request, client_cq_.get(), &client_ctx);
// Wait for the request to go out, and then request to read from the server.
- client_cq_->Next(&tag, &cq_status);
- reader_->Read(&get_response, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+ reader->Read(&get_response, reader.get());
// Server CompletionQueue should get the client request and reply that the
// requested file does not exist.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Client should get the server's message and then wait for the server to
// call Finish.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
op_status = get_response.status();
ASSERT_EQ(op_status, OperationStatus::SERVER_FILE_READ_FAILURE);
- reader_->Finish(&grpc_status, nullptr);
+ reader->Finish(&grpc_status, reader.get());
// Server finishes.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Client gets final status from server.
- client_cq_->Next(&tag, &cq_status);
- ASSERT_TRUE(grpc_status.ok());
+ ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
+ ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerUnfragmentedRead) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// The server will be told that the file exists and that it was able to open
// open it. It will be notified that the file is not bad, and EOF will be
// false the first time and then true the second. The call to read from the
@@ -135,61 +124,57 @@
server_call_data->platform_interface_.SetReadFileContents("test");
// Create components required to perform a client Get request.
- grpc::ClientContext client_ctx_;
+ grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
GetResponse get_response;
GetRequest get_request;
get_request.set_source("/some/test/file");
- std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader_ =
- stub_->AsyncGet(&client_ctx_, get_request, client_cq_.get(), nullptr);
+ std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader =
+ stub_->AsyncGet(&client_ctx, get_request, client_cq_.get(), &client_ctx);
// Wait for the request to go out, and then request to read from the server.
- client_cq_->Next(&tag, &cq_status);
- reader_->Read(&get_response, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+ reader->Read(&get_response, reader.get());
// Server CompletionQueue should get the client request, open the file, and
// send back the first chunk of contents.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Client should get the server's message and then wait for more data.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
ASSERT_STREQ(get_response.data().c_str(), "test");
op_status = get_response.status();
- reader_->Read(&get_response, nullptr);
+ reader->Read(&get_response, reader.get());
// The server should hit EOF and send an empty chunk of data back to the
// client.
server_call_data->platform_interface_.SetReadFileContents("");
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
op_status = get_response.status();
- reader_->Read(&get_response, nullptr);
+ reader->Read(&get_response, reader.get());
// The server should then call Finish.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// The client read should fail and then it should finish.
- client_cq_->Next(&tag, &cq_status);
- ASSERT_FALSE(cq_status);
- reader_->Finish(&grpc_status, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), false);
+ reader->Finish(&grpc_status, reader.get());
// Client gets final status from server.
- client_cq_->Next(&tag, &cq_status);
- ASSERT_TRUE(grpc_status.ok());
+ ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
+ ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerFragmentedRead) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// The server will be told that the file exists and that it was able to open
// open it. It will be notified that the file is not bad, and EOF will be
// false the first time and then true the second. The call to read from the
@@ -202,70 +187,66 @@
server_call_data->platform_interface_.SetReadFileContents("test");
// Create components required to perform a client Get request.
- grpc::ClientContext client_ctx_;
+ grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
GetResponse get_response;
GetRequest get_request;
get_request.set_source("/some/test/file");
- std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader_ =
- stub_->AsyncGet(&client_ctx_, get_request, client_cq_.get(), nullptr);
+ std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader =
+ stub_->AsyncGet(&client_ctx, get_request, client_cq_.get(), &client_ctx);
// Wait for the request to go out, and then request to read from the server.
- client_cq_->Next(&tag, &cq_status);
- reader_->Read(&get_response, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+ reader->Read(&get_response, reader.get());
// Server CompletionQueue should get the client request, open the file, and
// send back the first chunk of contents.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Client should get the server's message and then wait for more data.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
ASSERT_EQ(get_response.data(), std::string("test"));
op_status = get_response.status();
- reader_->Read(&get_response, nullptr);
+ reader->Read(&get_response, reader.get());
// Repeat the process.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
ASSERT_EQ(get_response.data(), std::string("test"));
op_status = get_response.status();
- reader_->Read(&get_response, nullptr);
+ reader->Read(&get_response, reader.get());
// The server should hit EOF and send an empty chunk of data back to the
// client.
server_call_data->platform_interface_.SetReadFileContents("");
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
op_status = get_response.status();
- reader_->Read(&get_response, nullptr);
+ reader->Read(&get_response, reader.get());
// The server should then call Finish.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// The client read should fail and then it should finish.
- client_cq_->Next(&tag, &cq_status);
- ASSERT_FALSE(cq_status);
- reader_->Finish(&grpc_status, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), false);
+ reader->Finish(&grpc_status, reader.get());
// Client gets final status from server.
- client_cq_->Next(&tag, &cq_status);
- ASSERT_TRUE(grpc_status.ok());
+ ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
+ ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerBadFile) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// The server will be told that the file exists and that it was able to open
// open it. When the server goes to read from the file, it will be informed
// that the file is bad.
@@ -277,44 +258,45 @@
server_call_data->platform_interface_.SetReadFileReturn(-1);
// Create components required to perform a client Get request.
- grpc::ClientContext client_ctx_;
+ grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
GetResponse get_response;
GetRequest get_request;
get_request.set_source("/some/test/file");
- std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader_ =
- stub_->AsyncGet(&client_ctx_, get_request, client_cq_.get(), nullptr);
+ std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader =
+ stub_->AsyncGet(&client_ctx, get_request, client_cq_.get(), &client_ctx);
// Wait for the request to go out and then request to read from the server.
- client_cq_->Next(&tag, &cq_status);
- reader_->Read(&get_response, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+ reader->Read(&get_response, reader.get());
// Server CompletionQueue should get the client request, open the file, and
// send back a response indicating the read failed.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Client should get the server's message and then wait for more data.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
op_status = get_response.status();
- reader_->Read(&get_response, nullptr);
+ reader->Read(&get_response, reader.get());
// The server should finish.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// The client will get back a bad status and finish.
- client_cq_->Next(&tag, &cq_status);
- ASSERT_FALSE(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), false);
op_status = get_response.status();
ASSERT_EQ(op_status, OperationStatus::SERVER_FILE_READ_FAILURE);
- reader_->Finish(&grpc_status, nullptr);
+ reader->Finish(&grpc_status, reader.get());
// Client gets final status from server.
- client_cq_->Next(&tag, &cq_status);
- ASSERT_TRUE(grpc_status.ok());
+ ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
+ ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
// Server Put State Machine Test
@@ -327,11 +309,6 @@
// 6. Client sends multiple file fragments.
TEST_F(AsyncEndToEndTest, ServerPutDestinationIsDirectory) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// The server's call to check the destination will indicate that it is a
// directory.
PutCallData<FakePlatform>* server_call_data =
@@ -340,7 +317,7 @@
server_call_data->platform_interface_.SetDirectoryExistsReturn(true);
// Create components required to perform a client Put request.
- grpc::ClientContext client_ctx_;
+ grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
PutRequest put_request;
@@ -348,42 +325,41 @@
put_request.set_destination("/some/directory");
put_request.clear_data();
- std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer_ =
- stub_->AsyncPut(&client_ctx_, &put_response, client_cq_.get(), nullptr);
+ std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
+ stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
// Server should get initial client connection and begin reading.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Wait for the request to go out, and then try to write more data to the
// server.
- client_cq_->Next(&tag, &cq_status);
- writer_->Write(put_request, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+ writer->Write(put_request, writer.get());
// Server CompletionQueue should get the client request and reply that the
// requested file cannot be created.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Client will write the entire file even if the server is not listening.
// Call Finish to collect the final status.
// Bug: https://github.com/grpc/grpc/issues/14812
- client_cq_->Next(&tag, &cq_status);
- writer_->Finish(&grpc_status, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+ writer->Finish(&grpc_status, writer.get());
// Client gets final status from server.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
op_status = put_response.status();
+
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
+
ASSERT_EQ(op_status, OperationStatus::SERVER_CREATE_FILE_FAILURE);
- ASSERT_TRUE(grpc_status.ok());
+ ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerPutCreateDirectoryFailure) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// The server's call to check the destination will indicate that it is a
// directory.
PutCallData<FakePlatform>* server_call_data =
@@ -393,7 +369,7 @@
server_call_data->platform_interface_.SetCreateDirectoryReturn(false);
// Create components required to perform a client Put request.
- grpc::ClientContext client_ctx_;
+ grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
PutRequest put_request;
@@ -401,41 +377,40 @@
put_request.set_destination("/privilege/issues");
put_request.clear_data();
- std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer_ =
- stub_->AsyncPut(&client_ctx_, &put_response, client_cq_.get(), nullptr);
+ std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
+ stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
// Server CompletionQueue will notify that there is a new client stub.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Wait for the request to go out, and then try to write more data to the
// server.
- client_cq_->Next(&tag, &cq_status);
- writer_->Write(put_request, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+ writer->Write(put_request, writer.get());
// Server CompletionQueue should get the client request and reply that the
// requested file cannot be created.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Client should call finish to get final status messages for the transfer
// and gRPC channel.
- client_cq_->Next(&tag, &cq_status);
- writer_->Finish(&grpc_status, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+ writer->Finish(&grpc_status, writer.get());
// Client gets final status from server.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
op_status = put_response.status();
+
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
+
ASSERT_EQ(op_status, OperationStatus::SERVER_CREATE_FILE_FAILURE);
- ASSERT_TRUE(grpc_status.ok());
+ ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerPutCreateFileFailure) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// The server's call to check the destination will indicate that it is a
// directory.
PutCallData<FakePlatform>* server_call_data =
@@ -446,7 +421,7 @@
server_call_data->platform_interface_.SetOpenFileReturn(-1);
// Create components required to perform a client Put request.
- grpc::ClientContext client_ctx_;
+ grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
PutRequest put_request;
@@ -454,41 +429,40 @@
put_request.set_destination("/privilege/issues");
put_request.clear_data();
- std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer_ =
- stub_->AsyncPut(&client_ctx_, &put_response, client_cq_.get(), nullptr);
+ std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
+ stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
// Server CompletionQueue should notify that there is a new stub.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Wait for the request to go out, and then try to write more data to the
// server.
- client_cq_->Next(&tag, &cq_status);
- writer_->Write(put_request, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+ writer->Write(put_request, writer.get());
// Server CompletionQueue should get the client request and reply that the
// requested file cannot be created.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Client should call finish to get final status messages for the transfer
// and gRPC channel.
- client_cq_->Next(&tag, &cq_status);
- writer_->Finish(&grpc_status, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+ writer->Finish(&grpc_status, writer.get());
// Client gets final status from server.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
op_status = put_response.status();
+
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
+
ASSERT_EQ(op_status, OperationStatus::SERVER_FILE_WRITE_FAILURE);
- ASSERT_TRUE(grpc_status.ok());
+ ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerPutWriteFileFailure) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// The server's call to check the destination will indicate that it is a
// directory.
PutCallData<FakePlatform>* server_call_data =
@@ -500,7 +474,7 @@
server_call_data->platform_interface_.SetWriteFileReturn(-1);
// Create components required to perform a client Put request.
- grpc::ClientContext client_ctx_;
+ grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
PutRequest put_request;
@@ -508,41 +482,40 @@
put_request.set_destination("/write/fail/path");
put_request.clear_data();
- std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer_ =
- stub_->AsyncPut(&client_ctx_, &put_response, client_cq_.get(), nullptr);
+ std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
+ stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
// Server CompletionQueue should notify that there is a new stub.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Wait for the request to go out, and then try to write more data to the
// server.
- client_cq_->Next(&tag, &cq_status);
- writer_->Write(put_request, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+ writer->Write(put_request, writer.get());
// Server CompletionQueue should get the client request and reply that the
// requested file cannot be written.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Client should get the server's message and then call finish to get final
// status messages for the transfer and gRPC channel.
- client_cq_->Next(&tag, &cq_status);
- writer_->Finish(&grpc_status, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+ writer->Finish(&grpc_status, writer.get());
// Client gets final status from server.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
op_status = put_response.status();
+
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
+
ASSERT_EQ(op_status, OperationStatus::SERVER_FILE_WRITE_FAILURE);
- ASSERT_TRUE(grpc_status.ok());
+ ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerPutOneFragment) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// The server's call to check the destination will indicate that it is a
// directory.
PutCallData<FakePlatform>* server_call_data =
@@ -555,7 +528,7 @@
server_call_data->platform_interface_.SetCloseFileReturn(0);
// Create components required to perform a client Put request.
- grpc::ClientContext client_ctx_;
+ grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
PutRequest put_request;
@@ -563,51 +536,49 @@
put_request.set_destination("/destination/file");
put_request.clear_data();
- std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer_ =
- stub_->AsyncPut(&client_ctx_, &put_response, client_cq_.get(), nullptr);
+ std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
+ stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
// Server CompletionQueue should notify that there is a new stub.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Wait for the request to go out, and then try to write a fragment to the
// server.
- client_cq_->Next(&tag, &cq_status);
- writer_->Write(put_request, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+ writer->Write(put_request, writer.get());
// Server CompletionQueue should get the client request, write out the
// contents, and wait for more data.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Client should call WritesDone to indicate that the transfer is complete.
- client_cq_->Next(&tag, &cq_status);
- writer_->WritesDone(nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+ writer->WritesDone(writer.get());
// Server should get the final message from the client and report status
// information.
- server_cq_->Next(&tag, &cq_status);
- ASSERT_FALSE(cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, false);
+ server_call_data->Proceed(false);
// Client should get the server's message and then call finish to get final
// status messages for the transfer and gRPC channel.
- client_cq_->Next(&tag, &cq_status);
- writer_->Finish(&grpc_status, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+ writer->Finish(&grpc_status, writer.get());
// Client gets final status from server.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
op_status = put_response.status();
+
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
+
ASSERT_EQ(op_status, OperationStatus::OK);
- ASSERT_TRUE(grpc_status.ok());
+ ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerPutMultipleFragments) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// The server's call to check the destination will indicate that it is a
// directory.
PutCallData<FakePlatform>* server_call_data =
@@ -620,7 +591,7 @@
server_call_data->platform_interface_.SetCloseFileReturn(0);
// Create components required to perform a client Put request.
- grpc::ClientContext client_ctx_;
+ grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
PutRequest put_request;
@@ -628,51 +599,55 @@
put_request.set_destination("/destination/file");
put_request.clear_data();
- std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer_ =
- stub_->AsyncPut(&client_ctx_, &put_response, client_cq_.get(), nullptr);
+ std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
+ stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
// Server CompletionQueue gets new client stub.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Wait for the request to go out, and then try to write a fragment to the
// server.
- client_cq_->Next(&tag, &cq_status);
- writer_->Write(put_request, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
+ writer->Write(put_request, writer.get());
// Server CompletionQueue should get the client request, write out the
// contents, and wait for more data.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Write a second fragment to the server.
- client_cq_->Next(&tag, &cq_status);
- writer_->Write(put_request, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+ writer->Write(put_request, writer.get());
// Server processes the second fragment.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Client should get the server's message and then call WritesDone to
// indicate that the transfer is complete.
- client_cq_->Next(&tag, &cq_status);
- writer_->WritesDone(nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+ writer->WritesDone(writer.get());
// Server should get the final message from the client and report status
// information.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, false);
+ server_call_data->Proceed(false);
// Client should get the server's message and then call finish to get final
// status information.
- client_cq_->Next(&tag, &cq_status);
- writer_->Finish(&grpc_status, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
+ writer->Finish(&grpc_status, writer.get());
// Client gets final status from server.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
op_status = put_response.status();
+
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
+
ASSERT_EQ(op_status, OperationStatus::OK);
- ASSERT_TRUE(grpc_status.ok());
+ ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
// Server Exec State Machine Test Cases
@@ -683,11 +658,6 @@
// 5. Exec Write sends stdin/stderr to client until subprocess exits.
TEST_F(AsyncEndToEndTest, Server_Exec_ForkFail) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// The server's call to Exec should fail.
ExecCallData<FakePlatform>* server_call_data =
new ExecCallData<FakePlatform>(service_.get(), server_cq_.get());
@@ -697,7 +667,7 @@
std::unique_ptr<grpc::ClientContext> cli_ctx = std::make_unique<grpc::ClientContext>();
std::unique_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
- cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), nullptr);
+ cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
// Queue up a read to be populated eventually when the server finishes. Use
// the address of the exec response so we can enforce that the read operation
@@ -707,63 +677,61 @@
// Server should get the new stub request and issue a read.
server_call_data->platform_interface_.SetParseCommandReturn({"echo", "hello"});
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Client should be notified that its stub has been created.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
ExecRequest exec_request;
exec_request.clear_argv();
exec_request.clear_env_vars();
exec_request.clear_std_in();
- cli_rw->Write(exec_request, nullptr);
+ cli_rw->Write(exec_request, &exec_request);
// Server should get the write first
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Flush the write operation completion notification.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
// Ensure that the client queue has a new entry and that it is the response
// to the read request.
- client_cq_->Next(&tag, &cq_status);
- ASSERT_EQ(tag, &exec_response);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_response, true);
// The client calls Finish to get final status.
grpc::Status grpc_status;
- cli_rw->Finish(&grpc_status, nullptr);
+ cli_rw->Finish(&grpc_status, cli_rw.get());
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, cli_rw.get(), true);
- ASSERT_TRUE(grpc_status.ok());
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
+
+ ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
ASSERT_EQ(exec_response.status(), OperationStatus::SERVER_EXEC_FORK_FAILURE);
}
TEST_F(AsyncEndToEndTest, Server_ExecRead_StdinEof) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// Exec service boilerplate.
std::shared_ptr<grpc::ServerContext> srv_ctx = std::make_shared<grpc::ServerContext>();
std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> srv_rw =
std::make_shared<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>(srv_ctx.get());
- service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(), this);
+ service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(),
+ srv_ctx.get());
// Client creates a new stub.
std::unique_ptr<grpc::ClientContext> cli_ctx = std::make_unique<grpc::ClientContext>();
std::unique_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
- cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), nullptr);
+ cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
// The read side of the server exec routine should see that the subprocess
// is still alive, but should fail to write to its stdin and then delete
// itself.
- server_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, srv_ctx.get(), true);
ExecReadCallData<FakePlatform>* server_call_data =
new ExecReadCallData<FakePlatform>(srv_ctx, srv_rw, 0, 0);
@@ -771,49 +739,45 @@
server_call_data->platform_interface_.SetWriteFileReturn(-1);
// Client sends something to subprocess stdin.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
ExecRequest exec_request;
exec_request.clear_argv();
exec_request.clear_env_vars();
exec_request.clear_std_in();
- cli_rw->Write(exec_request, nullptr);
- client_cq_->Next(&tag, &cq_status);
+ cli_rw->Write(exec_request, &exec_request);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
// The server will get the request, attempt to write to the subprocess stdin,
// fail, and delete itself. The only indication of the failure will be that
// the reference count to the ServerAsyncReaderWriter decrements.
uint32_t initial_use_count = srv_rw.use_count();
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
uint32_t final_use_count = srv_rw.use_count();
ASSERT_LT(final_use_count, initial_use_count);
}
TEST_F(AsyncEndToEndTest, Server_ExecRead_ClientDone) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// Exec service boilerplate.
std::shared_ptr<grpc::ServerContext> srv_ctx = std::make_shared<grpc::ServerContext>();
std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> srv_rw =
std::make_shared<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>(srv_ctx.get());
- service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(), this);
+ service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(),
+ srv_ctx.get());
// Client creates a new stub.
std::unique_ptr<grpc::ClientContext> cli_ctx = std::make_unique<grpc::ClientContext>();
std::unique_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
- cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), nullptr);
+ cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
// The read side of the server exec routine should see that the subprocess
// is still alive and succeed in writing to it.
- server_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, srv_ctx.get(), true);
ExecReadCallData<FakePlatform>* server_call_data =
new ExecReadCallData<FakePlatform>(srv_ctx, srv_rw, 0, 0);
@@ -821,31 +785,32 @@
server_call_data->platform_interface_.SetWriteFileReturn(1);
// Client sends something to subprocess stdin.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
ExecRequest exec_request;
exec_request.clear_argv();
exec_request.clear_env_vars();
exec_request.clear_std_in();
- cli_rw->Write(exec_request, nullptr);
+ cli_rw->Write(exec_request, &exec_request);
// Server writes it into the subprocess.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// Client indicates that it is done writing.
- client_cq_->Next(&tag, &cq_status);
- cli_rw->WritesDone(nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
+ cli_rw->WritesDone(&exec_request);
// The server will get a false status from the completion queue and delete
// itself. The only indication of the failure will be that the reference
// count to the ServerAsyncReaderWriter decrements.
uint32_t initial_use_count = srv_rw.use_count();
- server_cq_->Next(&tag, &cq_status);
- ASSERT_FALSE(cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, false);
+ server_call_data->Proceed(false);
+
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
uint32_t final_use_count = srv_rw.use_count();
@@ -853,117 +818,108 @@
}
TEST_F(AsyncEndToEndTest, Server_ExecRead_SubprocessExits) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// Exec service boilerplate.
std::shared_ptr<grpc::ServerContext> srv_ctx = std::make_shared<grpc::ServerContext>();
std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> srv_rw =
std::make_shared<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>(srv_ctx.get());
- service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(), this);
+ service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(),
+ srv_ctx.get());
// Client creates a new stub.
std::unique_ptr<grpc::ClientContext> cli_ctx = std::make_unique<grpc::ClientContext>();
std::unique_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
- cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), nullptr);
+ cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
// The read side of the server exec routine should see that the subprocess
// has exited.
- server_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, srv_ctx.get(), true);
ExecReadCallData<FakePlatform>* server_call_data =
new ExecReadCallData<FakePlatform>(srv_ctx, srv_rw, 0, 0);
server_call_data->platform_interface_.SetKillPidReturn(-1);
// Client sends something to subprocess stdin.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
ExecRequest exec_request;
exec_request.clear_argv();
exec_request.clear_env_vars();
exec_request.clear_std_in();
- cli_rw->Write(exec_request, nullptr);
+ cli_rw->Write(exec_request, &exec_request);
// The server will get the request, realize the subprocess has exited, and
// delete itself. The only indication of the failure will be that the
// reference count to the ServerAsyncReaderWriter decrements.
uint32_t initial_use_count = srv_rw.use_count();
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
uint32_t final_use_count = srv_rw.use_count();
ASSERT_LT(final_use_count, initial_use_count);
// Issue a finish on behalf of the server.
- srv_rw->Finish(grpc::Status::OK, nullptr);
- server_cq_->Next(&tag, &cq_status);
+ srv_rw->Finish(grpc::Status::OK, srv_rw.get());
+ ASSERT_GRPC_CQ_NEXT(server_cq_, srv_rw.get(), true);
// Cleanup the client reader-writer.
- client_cq_->Next(&tag, &cq_status);
- cli_rw->WritesDone(nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
+ cli_rw->WritesDone(&exec_request);
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
grpc::Status grpc_status;
- cli_rw->Finish(&grpc_status, nullptr);
+ cli_rw->Finish(&grpc_status, &exec_request);
- client_cq_->Next(&tag, &cq_status);
- ASSERT_TRUE(grpc_status.ok());
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
+ ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, Server_ExecWrite_WriteUntilChildExits) {
- ResetStub();
- // Accounting bits for managing CompletionQueue state.
- void* tag;
- bool cq_status;
-
// Exec service boilerplate.
std::shared_ptr<grpc::ServerContext> srv_ctx = std::make_shared<grpc::ServerContext>();
std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> srv_rw =
std::make_shared<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>(srv_ctx.get());
- service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(), this);
+ service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(),
+ srv_ctx.get());
// Client creates a new stub.
std::unique_ptr<grpc::ClientContext> cli_ctx = std::make_unique<grpc::ClientContext>();
std::unique_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
- cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), nullptr);
+ cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
// The write side of the server exec routine will poll the child pid and see
// that it has exited.
- server_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, srv_ctx.get(), true);
ExecWriteCallData<FakePlatform>* server_call_data =
new ExecWriteCallData<FakePlatform>(srv_ctx, srv_rw, 0, 0, 0);
// Client reads from the server.
- client_cq_->Next(&tag, &cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
ExecResponse exec_response;
- cli_rw->Read(&exec_response, nullptr);
+ cli_rw->Read(&exec_response, &exec_response);
// The server will finish and delete itself.
- server_cq_->Next(&tag, &cq_status);
- server_call_data->Proceed(cq_status);
+ ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
+ server_call_data->Proceed(true);
// The client will get the initial server write and issue another read.
- client_cq_->Next(&tag, &cq_status);
- cli_rw->Read(&exec_response, nullptr);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_response, true);
+ cli_rw->Read(&exec_response, &exec_response);
// The client should see that the server has finished and request the finish
// status.
- client_cq_->Next(&tag, &cq_status);
- ASSERT_FALSE(cq_status);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_response, false);
grpc::Status grpc_status;
- cli_rw->Finish(&grpc_status, nullptr);
- client_cq_->Next(&tag, &cq_status);
+ cli_rw->Finish(&grpc_status, &exec_response);
+ ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_response, true);
- ASSERT_TRUE(grpc_status.ok());
+ ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}