blob: d32a67df6e1c51cfd2384fdc3d164d3011d468d6 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/syslog/cpp/macros.h>
#include <grpc/support/log.h>
#include <gtest/gtest.h>
#include "src/virtualization/lib/guest_interaction/server/server_operation_state.h"
#include "src/virtualization/lib/guest_interaction/test/operation_test_lib.h"
#include <grpc++/grpc++.h>
// Server Get State Machine Test Cases
//
// 1. Requested file does not exist.
// 2. Server fails to open the requested file.
// 3. Requested file is below the fragmentation size.
// 4. Requested file is above the fragmentation size.
// 5. The file the server is reading from goes into a bad state.
TEST_F(AsyncEndToEndTest, ServerMissingFile) {
// The server's call to check if the requested file exists will return false.
GetCallData<FakePlatform>* server_call_data =
new GetCallData<FakePlatform>(service_.get(), server_cq_.get());
server_call_data->platform_interface_.SetFileExistsReturn(false);
// Create components required to perform a client Get request.
grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
GetResponse get_response;
GetRequest get_request;
get_request.set_source("/some/bogus/path");
std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader =
stub_->AsyncGet(&client_ctx, get_request, client_cq_.get(), &client_ctx);
// Wait for the request to go out, and then request to read from the server.
ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
reader->Read(&get_response, reader.get());
// Server CompletionQueue should get the client request and reply that the
// requested file does not exist.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Client should get the server's message and then wait for the server to
// call Finish.
ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
op_status = get_response.status();
ASSERT_EQ(op_status, OperationStatus::SERVER_MISSING_FILE_FAILURE);
reader->Finish(&grpc_status, reader.get());
// Server finishes.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Client gets final status from server.
ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerFileOpenFailure) {
// The server's call to check if the requested file exists will return false.
GetCallData<FakePlatform>* server_call_data =
new GetCallData<FakePlatform>(service_.get(), server_cq_.get());
server_call_data->platform_interface_.SetFileExistsReturn(true);
server_call_data->platform_interface_.SetOpenFileReturn(-1);
// Create components required to perform a client Get request.
grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
GetResponse get_response;
GetRequest get_request;
get_request.set_source("/file/with/permissions/issues");
std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader =
stub_->AsyncGet(&client_ctx, get_request, client_cq_.get(), &client_ctx);
// Wait for the request to go out, and then request to read from the server.
ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
reader->Read(&get_response, reader.get());
// Server CompletionQueue should get the client request and reply that the
// requested file does not exist.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Client should get the server's message and then wait for the server to
// call Finish.
ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
op_status = get_response.status();
ASSERT_EQ(op_status, OperationStatus::SERVER_FILE_READ_FAILURE);
reader->Finish(&grpc_status, reader.get());
// Server finishes.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Client gets final status from server.
ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerUnfragmentedRead) {
// The server will be told that the file exists and that it was able to open
// open it. It will be notified that the file is not bad, and EOF will be
// false the first time and then true the second. The call to read from the
// file will return empty.
GetCallData<FakePlatform>* server_call_data =
new GetCallData<FakePlatform>(service_.get(), server_cq_.get());
server_call_data->platform_interface_.SetFileExistsReturn(true);
server_call_data->platform_interface_.SetOpenFileReturn(1);
server_call_data->platform_interface_.SetReadFileContents("test");
// Create components required to perform a client Get request.
grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
GetResponse get_response;
GetRequest get_request;
get_request.set_source("/some/test/file");
std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader =
stub_->AsyncGet(&client_ctx, get_request, client_cq_.get(), &client_ctx);
// Wait for the request to go out, and then request to read from the server.
ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
reader->Read(&get_response, reader.get());
// Server CompletionQueue should get the client request, open the file, and
// send back the first chunk of contents.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Client should get the server's message and then wait for more data.
ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
ASSERT_STREQ(get_response.data().c_str(), "test");
op_status = get_response.status();
reader->Read(&get_response, reader.get());
// The server should hit EOF and send an empty chunk of data back to the
// client.
server_call_data->platform_interface_.SetReadFileContents("");
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
op_status = get_response.status();
reader->Read(&get_response, reader.get());
// The server should then call Finish.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// The client read should fail and then it should finish.
ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), false);
reader->Finish(&grpc_status, reader.get());
// Client gets final status from server.
ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerFragmentedRead) {
// The server will be told that the file exists and that it was able to open
// open it. It will be notified that the file is not bad, and EOF will be
// false the first time and then true the second. The call to read from the
// file will return empty.
GetCallData<FakePlatform>* server_call_data =
new GetCallData<FakePlatform>(service_.get(), server_cq_.get());
server_call_data->platform_interface_.SetFileExistsReturn(true);
server_call_data->platform_interface_.SetOpenFileReturn(1);
server_call_data->platform_interface_.SetReadFileContents("test");
// Create components required to perform a client Get request.
grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
GetResponse get_response;
GetRequest get_request;
get_request.set_source("/some/test/file");
std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader =
stub_->AsyncGet(&client_ctx, get_request, client_cq_.get(), &client_ctx);
// Wait for the request to go out, and then request to read from the server.
ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
reader->Read(&get_response, reader.get());
// Server CompletionQueue should get the client request, open the file, and
// send back the first chunk of contents.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Client should get the server's message and then wait for more data.
ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
ASSERT_EQ(get_response.data(), std::string("test"));
op_status = get_response.status();
reader->Read(&get_response, reader.get());
// Repeat the process.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
ASSERT_EQ(get_response.data(), std::string("test"));
op_status = get_response.status();
reader->Read(&get_response, reader.get());
// The server should hit EOF and send an empty chunk of data back to the
// client.
server_call_data->platform_interface_.SetReadFileContents("");
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
op_status = get_response.status();
reader->Read(&get_response, reader.get());
// The server should then call Finish.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// The client read should fail and then it should finish.
ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), false);
reader->Finish(&grpc_status, reader.get());
// Client gets final status from server.
ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerBadFile) {
// The server will be told that the file exists and that it was able to open
// open it. When the server goes to read from the file, it will be informed
// that the file is bad.
GetCallData<FakePlatform>* server_call_data =
new GetCallData<FakePlatform>(service_.get(), server_cq_.get());
server_call_data->platform_interface_.SetFileExistsReturn(true);
server_call_data->platform_interface_.SetOpenFileReturn(1);
server_call_data->platform_interface_.SetReadFileReturn(-1);
// Create components required to perform a client Get request.
grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
GetResponse get_response;
GetRequest get_request;
get_request.set_source("/some/test/file");
std::unique_ptr<grpc::ClientAsyncReader<GetResponse>> reader =
stub_->AsyncGet(&client_ctx, get_request, client_cq_.get(), &client_ctx);
// Wait for the request to go out and then request to read from the server.
ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
reader->Read(&get_response, reader.get());
// Server CompletionQueue should get the client request, open the file, and
// send back a response indicating the read failed.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Client should get the server's message and then wait for more data.
ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
op_status = get_response.status();
reader->Read(&get_response, reader.get());
// The server should finish.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// The client will get back a bad status and finish.
ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), false);
op_status = get_response.status();
ASSERT_EQ(op_status, OperationStatus::SERVER_FILE_READ_FAILURE);
reader->Finish(&grpc_status, reader.get());
// Client gets final status from server.
ASSERT_GRPC_CQ_NEXT(client_cq_, reader.get(), true);
ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
// Server Put State Machine Test
//
// 1. Destination is a directory.
// 2. Server fails to create destination directory.
// 3. Server fails to open destination file.
// 4. Server fails to write to the destination file.
// 5. Client sends one file fragment and then finishes.
// 6. Client sends multiple file fragments.
TEST_F(AsyncEndToEndTest, ServerPutDestinationIsDirectory) {
// The server's call to check the destination will indicate that it is a
// directory.
PutCallData<FakePlatform>* server_call_data =
new PutCallData<FakePlatform>(service_.get(), server_cq_.get());
server_call_data->platform_interface_.SetDirectoryExistsReturn(true);
// Create components required to perform a client Put request.
grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
PutRequest put_request;
PutResponse put_response;
put_request.set_destination("/some/directory");
put_request.clear_data();
std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
// Server should get initial client connection and begin reading.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Wait for the request to go out, and then try to write more data to the
// server.
ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
writer->Write(put_request, writer.get());
// Server CompletionQueue should get the client request and reply that the
// requested file cannot be created.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Client will write the entire file even if the server is not listening.
// Call Finish to collect the final status.
// Bug: https://github.com/grpc/grpc/issues/14812
ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
writer->Finish(&grpc_status, writer.get());
// Client gets final status from server.
ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
op_status = put_response.status();
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
ASSERT_EQ(op_status, OperationStatus::SERVER_CREATE_FILE_FAILURE);
ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerPutCreateDirectoryFailure) {
// The server's call to check the destination will indicate that it is a
// directory.
PutCallData<FakePlatform>* server_call_data =
new PutCallData<FakePlatform>(service_.get(), server_cq_.get());
server_call_data->platform_interface_.SetDirectoryExistsReturn(false);
server_call_data->platform_interface_.SetCreateDirectoryReturn(false);
// Create components required to perform a client Put request.
grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
PutRequest put_request;
PutResponse put_response;
put_request.set_destination("/privilege/issues");
put_request.clear_data();
std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
// Server CompletionQueue will notify that there is a new client stub.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Wait for the request to go out, and then try to write more data to the
// server.
ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
writer->Write(put_request, writer.get());
// Server CompletionQueue should get the client request and reply that the
// requested file cannot be created.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Client should call finish to get final status messages for the transfer
// and gRPC channel.
ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
writer->Finish(&grpc_status, writer.get());
// Client gets final status from server.
ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
op_status = put_response.status();
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
ASSERT_EQ(op_status, OperationStatus::SERVER_CREATE_FILE_FAILURE);
ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerPutCreateFileFailure) {
// The server's call to check the destination will indicate that it is a
// directory.
PutCallData<FakePlatform>* server_call_data =
new PutCallData<FakePlatform>(service_.get(), server_cq_.get());
server_call_data->platform_interface_.SetDirectoryExistsReturn(false);
server_call_data->platform_interface_.SetCreateDirectoryReturn(true);
server_call_data->platform_interface_.SetOpenFileReturn(-1);
// Create components required to perform a client Put request.
grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
PutRequest put_request;
PutResponse put_response;
put_request.set_destination("/privilege/issues");
put_request.clear_data();
std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
// Server CompletionQueue should notify that there is a new stub.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Wait for the request to go out, and then try to write more data to the
// server.
ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
writer->Write(put_request, writer.get());
// Server CompletionQueue should get the client request and reply that the
// requested file cannot be created.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Client should call finish to get final status messages for the transfer
// and gRPC channel.
ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
writer->Finish(&grpc_status, writer.get());
// Client gets final status from server.
ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
op_status = put_response.status();
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
ASSERT_EQ(op_status, OperationStatus::SERVER_FILE_WRITE_FAILURE);
ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerPutWriteFileFailure) {
// The server's call to check the destination will indicate that it is a
// directory.
PutCallData<FakePlatform>* server_call_data =
new PutCallData<FakePlatform>(service_.get(), server_cq_.get());
server_call_data->platform_interface_.SetDirectoryExistsReturn(false);
server_call_data->platform_interface_.SetCreateDirectoryReturn(true);
server_call_data->platform_interface_.SetOpenFileReturn(1);
server_call_data->platform_interface_.SetWriteFileReturn(-1);
// Create components required to perform a client Put request.
grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
PutRequest put_request;
PutResponse put_response;
put_request.set_destination("/write/fail/path");
put_request.clear_data();
std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
// Server CompletionQueue should notify that there is a new stub.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Wait for the request to go out, and then try to write more data to the
// server.
ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
writer->Write(put_request, writer.get());
// Server CompletionQueue should get the client request and reply that the
// requested file cannot be written.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Client should get the server's message and then call finish to get final
// status messages for the transfer and gRPC channel.
ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
writer->Finish(&grpc_status, writer.get());
// Client gets final status from server.
ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
op_status = put_response.status();
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
ASSERT_EQ(op_status, OperationStatus::SERVER_FILE_WRITE_FAILURE);
ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerPutOneFragment) {
// The server's call to check the destination will indicate that it is a
// directory.
PutCallData<FakePlatform>* server_call_data =
new PutCallData<FakePlatform>(service_.get(), server_cq_.get());
server_call_data->platform_interface_.SetDirectoryExistsReturn(false);
server_call_data->platform_interface_.SetCreateDirectoryReturn(true);
server_call_data->platform_interface_.SetOpenFileReturn(1);
server_call_data->platform_interface_.SetWriteFileReturn(1);
server_call_data->platform_interface_.SetCloseFileReturn(0);
// Create components required to perform a client Put request.
grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
PutRequest put_request;
PutResponse put_response;
put_request.set_destination("/destination/file");
put_request.clear_data();
std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
// Server CompletionQueue should notify that there is a new stub.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Wait for the request to go out, and then try to write a fragment to the
// server.
ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
writer->Write(put_request, writer.get());
// Server CompletionQueue should get the client request, write out the
// contents, and wait for more data.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Client should call WritesDone to indicate that the transfer is complete.
ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
writer->WritesDone(writer.get());
// Server should get the final message from the client and report status
// information.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, false);
server_call_data->Proceed(false);
// Client should get the server's message and then call finish to get final
// status messages for the transfer and gRPC channel.
ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
writer->Finish(&grpc_status, writer.get());
// Client gets final status from server.
ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
op_status = put_response.status();
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
ASSERT_EQ(op_status, OperationStatus::OK);
ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, ServerPutMultipleFragments) {
// The server's call to check the destination will indicate that it is a
// directory.
PutCallData<FakePlatform>* server_call_data =
new PutCallData<FakePlatform>(service_.get(), server_cq_.get());
server_call_data->platform_interface_.SetDirectoryExistsReturn(false);
server_call_data->platform_interface_.SetCreateDirectoryReturn(true);
server_call_data->platform_interface_.SetOpenFileReturn(1);
server_call_data->platform_interface_.SetWriteFileReturn(1);
server_call_data->platform_interface_.SetCloseFileReturn(0);
// Create components required to perform a client Put request.
grpc::ClientContext client_ctx;
grpc::Status grpc_status;
OperationStatus op_status;
PutRequest put_request;
PutResponse put_response;
put_request.set_destination("/destination/file");
put_request.clear_data();
std::unique_ptr<grpc::ClientAsyncWriter<PutRequest>> writer =
stub_->AsyncPut(&client_ctx, &put_response, client_cq_.get(), &client_ctx);
// Server CompletionQueue gets new client stub.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Wait for the request to go out, and then try to write a fragment to the
// server.
ASSERT_GRPC_CQ_NEXT(client_cq_, &client_ctx, true);
writer->Write(put_request, writer.get());
// Server CompletionQueue should get the client request, write out the
// contents, and wait for more data.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Write a second fragment to the server.
ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
writer->Write(put_request, writer.get());
// Server processes the second fragment.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Client should get the server's message and then call WritesDone to
// indicate that the transfer is complete.
ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
writer->WritesDone(writer.get());
// Server should get the final message from the client and report status
// information.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, false);
server_call_data->Proceed(false);
// Client should get the server's message and then call finish to get final
// status information.
ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
writer->Finish(&grpc_status, writer.get());
// Client gets final status from server.
ASSERT_GRPC_CQ_NEXT(client_cq_, writer.get(), true);
op_status = put_response.status();
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
ASSERT_EQ(op_status, OperationStatus::OK);
ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
// Server Exec State Machine Test Cases
// 1. Server fails to create subprocess.
// 2. Exec Read fails to write to subprocess stdin.
// 3. Exec Read finishes when client is done writing to stdin.
// 4. Exec Read finishes when child process has exited.
// 5. Exec Write sends stdin/stderr to client until subprocess exits.
TEST_F(AsyncEndToEndTest, Server_Exec_ForkFail) {
// The server's call to Exec should fail.
ExecCallData<FakePlatform>* server_call_data =
new ExecCallData<FakePlatform>(service_.get(), server_cq_.get());
server_call_data->platform_interface_.SetExecReturn(-1);
// Client creates a new stub.
std::unique_ptr<grpc::ClientContext> cli_ctx = std::make_unique<grpc::ClientContext>();
std::unique_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
// Queue up a read to be populated eventually when the server finishes. Use
// the address of the exec response so we can enforce that the read operation
// has been performed later in the test.
ExecResponse exec_response;
cli_rw->Read(&exec_response, &exec_response);
// Server should get the new stub request and issue a read.
server_call_data->platform_interface_.SetParseCommandReturn({"echo", "hello"});
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Client should be notified that its stub has been created.
ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
ExecRequest exec_request;
exec_request.clear_argv();
exec_request.clear_env_vars();
exec_request.clear_std_in();
cli_rw->Write(exec_request, &exec_request);
// Server should get the write first
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Flush the write operation completion notification.
ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
// Ensure that the client queue has a new entry and that it is the response
// to the read request.
ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_response, true);
// The client calls Finish to get final status.
grpc::Status grpc_status;
cli_rw->Finish(&grpc_status, cli_rw.get());
ASSERT_GRPC_CQ_NEXT(client_cq_, cli_rw.get(), true);
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
ASSERT_EQ(exec_response.status(), OperationStatus::SERVER_EXEC_FORK_FAILURE);
}
TEST_F(AsyncEndToEndTest, Server_ExecRead_StdinEof) {
// Exec service boilerplate.
std::shared_ptr<grpc::ServerContext> srv_ctx = std::make_shared<grpc::ServerContext>();
std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> srv_rw =
std::make_shared<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>(srv_ctx.get());
service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(),
srv_ctx.get());
// Client creates a new stub.
std::unique_ptr<grpc::ClientContext> cli_ctx = std::make_unique<grpc::ClientContext>();
std::unique_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
// The read side of the server exec routine should see that the subprocess
// is still alive, but should fail to write to its stdin and then delete
// itself.
ASSERT_GRPC_CQ_NEXT(server_cq_, srv_ctx.get(), true);
ExecReadCallData<FakePlatform>* server_call_data =
new ExecReadCallData<FakePlatform>(srv_ctx, srv_rw, 0, 0);
server_call_data->platform_interface_.SetKillPidReturn(0);
server_call_data->platform_interface_.SetWriteFileReturn(-1);
// Client sends something to subprocess stdin.
ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
ExecRequest exec_request;
exec_request.clear_argv();
exec_request.clear_env_vars();
exec_request.clear_std_in();
cli_rw->Write(exec_request, &exec_request);
ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
// The server will get the request, attempt to write to the subprocess stdin,
// fail, and delete itself. The only indication of the failure will be that
// the reference count to the ServerAsyncReaderWriter decrements.
uint32_t initial_use_count = srv_rw.use_count();
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
uint32_t final_use_count = srv_rw.use_count();
ASSERT_LT(final_use_count, initial_use_count);
}
TEST_F(AsyncEndToEndTest, Server_ExecRead_ClientDone) {
// Exec service boilerplate.
std::shared_ptr<grpc::ServerContext> srv_ctx = std::make_shared<grpc::ServerContext>();
std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> srv_rw =
std::make_shared<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>(srv_ctx.get());
service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(),
srv_ctx.get());
// Client creates a new stub.
std::unique_ptr<grpc::ClientContext> cli_ctx = std::make_unique<grpc::ClientContext>();
std::unique_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
// The read side of the server exec routine should see that the subprocess
// is still alive and succeed in writing to it.
ASSERT_GRPC_CQ_NEXT(server_cq_, srv_ctx.get(), true);
ExecReadCallData<FakePlatform>* server_call_data =
new ExecReadCallData<FakePlatform>(srv_ctx, srv_rw, 0, 0);
server_call_data->platform_interface_.SetKillPidReturn(0);
server_call_data->platform_interface_.SetWriteFileReturn(1);
// Client sends something to subprocess stdin.
ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
ExecRequest exec_request;
exec_request.clear_argv();
exec_request.clear_env_vars();
exec_request.clear_std_in();
cli_rw->Write(exec_request, &exec_request);
// Server writes it into the subprocess.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// Client indicates that it is done writing.
ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
cli_rw->WritesDone(&exec_request);
// The server will get a false status from the completion queue and delete
// itself. The only indication of the failure will be that the reference
// count to the ServerAsyncReaderWriter decrements.
uint32_t initial_use_count = srv_rw.use_count();
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, false);
server_call_data->Proceed(false);
ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
uint32_t final_use_count = srv_rw.use_count();
ASSERT_LT(final_use_count, initial_use_count);
}
TEST_F(AsyncEndToEndTest, Server_ExecRead_SubprocessExits) {
// Exec service boilerplate.
std::shared_ptr<grpc::ServerContext> srv_ctx = std::make_shared<grpc::ServerContext>();
std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> srv_rw =
std::make_shared<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>(srv_ctx.get());
service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(),
srv_ctx.get());
// Client creates a new stub.
std::unique_ptr<grpc::ClientContext> cli_ctx = std::make_unique<grpc::ClientContext>();
std::unique_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
// The read side of the server exec routine should see that the subprocess
// has exited.
ASSERT_GRPC_CQ_NEXT(server_cq_, srv_ctx.get(), true);
ExecReadCallData<FakePlatform>* server_call_data =
new ExecReadCallData<FakePlatform>(srv_ctx, srv_rw, 0, 0);
server_call_data->platform_interface_.SetKillPidReturn(-1);
// Client sends something to subprocess stdin.
ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
ExecRequest exec_request;
exec_request.clear_argv();
exec_request.clear_env_vars();
exec_request.clear_std_in();
cli_rw->Write(exec_request, &exec_request);
// The server will get the request, realize the subprocess has exited, and
// delete itself. The only indication of the failure will be that the
// reference count to the ServerAsyncReaderWriter decrements.
uint32_t initial_use_count = srv_rw.use_count();
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
uint32_t final_use_count = srv_rw.use_count();
ASSERT_LT(final_use_count, initial_use_count);
// Issue a finish on behalf of the server.
srv_rw->Finish(grpc::Status::OK, srv_rw.get());
ASSERT_GRPC_CQ_NEXT(server_cq_, srv_rw.get(), true);
// Cleanup the client reader-writer.
ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
cli_rw->WritesDone(&exec_request);
ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
grpc::Status grpc_status;
cli_rw->Finish(&grpc_status, &exec_request);
ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_request, true);
ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}
TEST_F(AsyncEndToEndTest, Server_ExecWrite_WriteUntilChildExits) {
// Exec service boilerplate.
std::shared_ptr<grpc::ServerContext> srv_ctx = std::make_shared<grpc::ServerContext>();
std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> srv_rw =
std::make_shared<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>(srv_ctx.get());
service_->RequestExec(srv_ctx.get(), srv_rw.get(), server_cq_.get(), server_cq_.get(),
srv_ctx.get());
// Client creates a new stub.
std::unique_ptr<grpc::ClientContext> cli_ctx = std::make_unique<grpc::ClientContext>();
std::unique_ptr<grpc::ClientAsyncReaderWriter<ExecRequest, ExecResponse>> cli_rw;
cli_rw = stub_->AsyncExec(cli_ctx.get(), client_cq_.get(), cli_ctx.get());
// The write side of the server exec routine will poll the child pid and see
// that it has exited.
ASSERT_GRPC_CQ_NEXT(server_cq_, srv_ctx.get(), true);
ExecWriteCallData<FakePlatform>* server_call_data =
new ExecWriteCallData<FakePlatform>(srv_ctx, srv_rw, 0, 0, 0);
// Client reads from the server.
ASSERT_GRPC_CQ_NEXT(client_cq_, cli_ctx.get(), true);
ExecResponse exec_response;
cli_rw->Read(&exec_response, &exec_response);
// The server will finish and delete itself.
ASSERT_GRPC_CQ_NEXT(server_cq_, server_call_data, true);
server_call_data->Proceed(true);
// The client will get the initial server write and issue another read.
ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_response, true);
cli_rw->Read(&exec_response, &exec_response);
// The client should see that the server has finished and request the finish
// status.
ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_response, false);
grpc::Status grpc_status;
cli_rw->Finish(&grpc_status, &exec_response);
ASSERT_GRPC_CQ_NEXT(client_cq_, &exec_response, true);
ASSERT_TRUE(grpc_status.ok()) << grpc_status.error_message();
}