blob: 15340da45fd509e4208d8fcc3d7d4732aa07d996 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef SRC_VIRTUALIZATION_LIB_GUEST_INTERACTION_CLIENT_CLIENT_OPERATION_STATE_H_
#define SRC_VIRTUALIZATION_LIB_GUEST_INTERACTION_CLIENT_CLIENT_OPERATION_STATE_H_
#include <lib/fidl/cpp/binding.h>
#include <lib/syslog/cpp/macros.h>
#include <zircon/system/ulib/fit/include/lib/fit/function.h>
#include <grpc/support/log.h>
#include "fuchsia/netemul/guest/cpp/fidl.h"
#include "src/virtualization/lib/guest_interaction/common.h"
#include "src/virtualization/lib/guest_interaction/platform_interface/platform_interface.h"
#include "src/virtualization/lib/guest_interaction/proto/guest_interaction.grpc.pb.h"
#include <grpc++/grpc++.h>
using TransferCallback = fit::function<void(zx_status_t status)>;
static inline zx_status_t translate_rpc_status(OperationStatus status) {
switch (status) {
case OperationStatus::OK:
return ZX_OK;
case OperationStatus::GRPC_FAILURE:
return ZX_ERR_PEER_CLOSED;
case OperationStatus::CLIENT_MISSING_FILE_FAILURE:
return ZX_ERR_NOT_FOUND;
case OperationStatus::CLIENT_CREATE_FILE_FAILURE:
return ZX_ERR_ACCESS_DENIED;
case OperationStatus::CLIENT_FILE_READ_FAILURE:
return ZX_ERR_IO;
case OperationStatus::CLIENT_FILE_WRITE_FAILURE:
return ZX_ERR_IO;
case OperationStatus::SERVER_MISSING_FILE_FAILURE:
return ZX_ERR_NOT_FOUND;
case OperationStatus::SERVER_CREATE_FILE_FAILURE:
return ZX_ERR_ACCESS_DENIED;
case OperationStatus::SERVER_FILE_READ_FAILURE:
return ZX_ERR_IO;
case OperationStatus::SERVER_FILE_WRITE_FAILURE:
return ZX_ERR_IO;
case OperationStatus::SERVER_EXEC_COMMAND_PARSE_FAILURE:
return ZX_ERR_INVALID_ARGS;
case OperationStatus::SERVER_EXEC_FORK_FAILURE:
return ZX_ERR_INTERNAL;
default:
FX_LOGS(ERROR) << "Unknown gRPC transfer status: " << status;
return ZX_ERR_BAD_STATE;
}
}
// Manages the transfer of a file from the guest VM to the Fuchsia host.
//
// GetCallData will continually write new data from the guest into the
// specified destination location. When the gRPC channel is terminated, the
// termination status is queried and final status is reported through the
// caller-supplied callback.
template <class T>
class GetCallData final : public CallData {
public:
GetCallData<T>(int32_t fd, TransferCallback callback)
: status_(CREATE),
callback_(std::move(callback)),
fd_(fd),
exit_status_(OperationStatus::OK) {}
void Proceed(bool ok) override;
grpc::ClientContext ctx_;
std::unique_ptr<grpc::ClientAsyncReaderInterface<GetResponse>> reader_;
GetResponse response_;
T platform_interface_;
private:
enum CallStatus { CREATE, TRANSFER, FINISH };
CallStatus status_;
TransferCallback callback_;
int32_t fd_;
grpc::Status termination_status_;
OperationStatus exit_status_;
};
// Proceed is called when the completion queue signals that the most recent
// Read operation has completed and there is new data that can be processed.
//
// From the gRPC documentation for a client Read operation:
// `ok` indicates whether there is a valid message that got read. If not, you
// know that there are certainly no more messages that can ever be read from
// this stream. For the client-side operations, this only happens because the
// call is dead.
//
// The client attempts to write incoming data into the open file until gRPC
// indicates that the call is dead at which point it queries for final status
// and reports the transfer status back to the caller through the callback.
template <class T>
void GetCallData<T>::Proceed(bool ok) {
switch (status_) {
case CREATE:
if (!ok) {
reader_->Finish(&termination_status_, this);
exit_status_ = OperationStatus::GRPC_FAILURE;
status_ = FINISH;
return;
}
reader_->Read(&response_, this);
status_ = TRANSFER;
return;
case TRANSFER:
if (!ok) {
reader_->Finish(&termination_status_, this);
status_ = FINISH;
return;
}
if (response_.status() != OperationStatus::OK) {
exit_status_ = response_.status();
reader_->Finish(&termination_status_, this);
status_ = FINISH;
return;
}
if (platform_interface_.WriteFile(fd_, response_.data().c_str(), response_.data().size()) <
0) {
exit_status_ = OperationStatus::CLIENT_FILE_WRITE_FAILURE;
reader_->Finish(&termination_status_, this);
status_ = FINISH;
return;
}
reader_->Read(&response_, this);
status_ = TRANSFER;
return;
case FINISH:
platform_interface_.CloseFile(fd_);
if (ok || exit_status_ != OperationStatus::OK) {
callback_(translate_rpc_status(exit_status_));
} else {
callback_(translate_rpc_status(OperationStatus::GRPC_FAILURE));
}
delete this;
return;
}
}
template <class T>
class PutCallData final : public CallData {
public:
PutCallData(int32_t fd, std::string destination, TransferCallback callback)
: status_(TRANSFER),
destination_(destination),
callback_(std::move(callback)),
exit_status_(OperationStatus::OK),
fd_(fd) {}
void Proceed(bool ok) override;
grpc::ClientContext ctx_;
std::unique_ptr<grpc::ClientAsyncWriterInterface<PutRequest>> writer_;
PutResponse response_;
T platform_interface_;
private:
void Finish();
enum CallStatus { TRANSFER, END_TRANSFER, FINISH };
CallStatus status_;
std::string destination_;
TransferCallback callback_;
OperationStatus exit_status_;
int32_t fd_;
grpc::Status finish_status_;
};
template <class T>
void PutCallData<T>::Proceed(bool ok) {
// If the client gets a bad status while performing a streaming write, then
// the call is dead and no future messages will be sent.
if (!ok) {
exit_status_ = OperationStatus::GRPC_FAILURE;
Finish();
return;
}
switch (status_) {
case TRANSFER: {
PutRequest req;
req.set_destination(destination_);
char read_buf[CHUNK_SIZE];
ssize_t data_read = platform_interface_.ReadFile(fd_, read_buf, CHUNK_SIZE);
if (data_read < 0) {
if (data_read != -EAGAIN && data_read != -EWOULDBLOCK) {
// Read failed.
exit_status_ = OperationStatus::CLIENT_FILE_READ_FAILURE;
status_ = END_TRANSFER;
writer_->WritesDone(this);
return;
}
// Read would have caused to block, send empty data.
req.clear_data();
writer_->Write(req, this);
return;
}
if (data_read == 0) {
// Read hit EOF.
status_ = END_TRANSFER;
req.clear_data();
writer_->WritesDone(this);
return;
}
req.set_data(read_buf, data_read);
writer_->Write(req, this);
return;
}
case END_TRANSFER:
writer_->Finish(&finish_status_, this);
status_ = FINISH;
return;
case FINISH:
if (response_.status() != OperationStatus::OK) {
exit_status_ = response_.status();
}
Finish();
return;
}
}
template <class T>
void PutCallData<T>::Finish() {
if (fd_ > 0) {
platform_interface_.CloseFile(fd_);
}
callback_(translate_rpc_status(exit_status_));
delete this;
}
class ListenerInterface : fuchsia::netemul::guest::CommandListener {
public:
explicit ListenerInterface(fidl::InterfaceRequest<fuchsia::netemul::guest::CommandListener> req)
: binding_(this, std::move(req)) {}
void OnStarted(zx_status_t status) { binding_.events().OnStarted(status); }
void OnTerminated(zx_status_t status, int32_t ret_code) {
binding_.events().OnTerminated(status, ret_code);
}
private:
fidl::Binding<fuchsia::netemul::guest::CommandListener> binding_;
};
// Pump incoming stdin into the child process managed by the guest service.
template <class T>
class ExecWriteCallData final : public CallData {
public:
ExecWriteCallData(
const std::string& command, const std::vector<ExecEnv>& env, int32_t std_in,
const std::shared_ptr<grpc::ClientContext>& ctx,
const std::shared_ptr<grpc::ClientAsyncReaderWriterInterface<ExecRequest, ExecResponse>>& rw);
void Proceed(bool ok) override;
T platform_interface_;
private:
void Finish();
int32_t stdin_;
std::shared_ptr<grpc::ClientContext> ctx_;
std::shared_ptr<grpc::ClientAsyncReaderWriterInterface<ExecRequest, ExecResponse>> writer_;
enum CallStatus { WRITING, FINISH };
CallStatus status_;
};
template <class T>
class ExecReadCallData final : public CallData {
public:
ExecReadCallData(
int32_t std_out, int32_t std_err, const std::shared_ptr<grpc::ClientContext>& ctx,
const std::shared_ptr<grpc::ClientAsyncReaderWriterInterface<ExecRequest, ExecResponse>>& rw,
std::unique_ptr<ListenerInterface> listener);
void Proceed(bool ok) override;
T platform_interface_;
private:
void Finish();
int32_t stdout_;
int32_t stderr_;
std::shared_ptr<grpc::ClientContext> ctx_;
std::shared_ptr<grpc::ClientAsyncReaderWriterInterface<ExecRequest, ExecResponse>> reader_;
std::unique_ptr<ListenerInterface> listener_;
int32_t ret_val_;
ExecResponse response_;
OperationStatus operation_status_;
enum CallStatus { READ, FINISH };
CallStatus status_;
grpc::Status grpc_stream_status_;
};
template <class T>
class ExecCallData final : public CallData {
public:
ExecCallData(const std::string& command, const std::map<std::string, std::string>& env_vars,
int32_t std_in, int32_t std_out, int32_t std_err,
std::unique_ptr<ListenerInterface> listener);
void Proceed(bool ok) override;
std::shared_ptr<grpc::ClientContext> ctx_;
std::shared_ptr<grpc::ClientAsyncReaderWriterInterface<ExecRequest, ExecResponse>> rw_;
T platform_interface_;
private:
std::vector<ExecEnv> EnvMapToVector(const std::map<std::string, std::string>& env_vars);
int32_t stdin_;
int32_t stdout_;
int32_t stderr_;
std::unique_ptr<ListenerInterface> listener_;
std::string command_;
std::vector<ExecEnv> env_;
};
template <class T>
ExecWriteCallData<T>::ExecWriteCallData(
const std::string& command, const std::vector<ExecEnv>& env, int32_t std_in,
const std::shared_ptr<grpc::ClientContext>& ctx,
const std::shared_ptr<grpc::ClientAsyncReaderWriterInterface<ExecRequest, ExecResponse>>& rw)
: stdin_(std_in), ctx_(ctx), writer_(rw), status_(WRITING) {
// Send over the initial command request to get the child process
// running. stdin will be pumped once the first write request finishes.
ExecRequest exec_request;
exec_request.set_argv(command);
for (const ExecEnv& key_val : env) {
ExecEnv* new_env = exec_request.add_env_vars();
*new_env = key_val;
}
exec_request.clear_std_in();
writer_->Write(exec_request, this);
}
template <class T>
void ExecWriteCallData<T>::Proceed(bool ok) {
if (!ok) {
// gRPC has shut down the connection.
Finish();
return;
}
if (status_ != WRITING) {
GPR_ASSERT(status_ == FINISH);
Finish();
return;
}
char read_buf[CHUNK_SIZE];
ssize_t read_status = platform_interface_.ReadFile(stdin_, read_buf, CHUNK_SIZE);
if (read_status == -EAGAIN || read_status == -EWOULDBLOCK) {
// Reading would have caused blocking, so send back an empty message.
ExecRequest exec_request;
exec_request.clear_argv();
exec_request.clear_env_vars();
exec_request.clear_std_in();
writer_->Write(exec_request, this);
} else if (read_status <= 0) {
// Reading failed in an unexpected way. Notify client and finish.
writer_->WritesDone(this);
status_ = FINISH;
} else {
std::string new_stdin(read_buf, read_status);
ExecRequest exec_request;
exec_request.clear_argv();
exec_request.clear_env_vars();
exec_request.set_std_in(new_stdin);
writer_->Write(exec_request, this);
}
}
template <class T>
void ExecWriteCallData<T>::Finish() {
platform_interface_.CloseFile(stdin_);
delete this;
}
template <class T>
ExecReadCallData<T>::ExecReadCallData(
int32_t std_out, int32_t std_err, const std::shared_ptr<grpc::ClientContext>& ctx,
const std::shared_ptr<grpc::ClientAsyncReaderWriterInterface<ExecRequest, ExecResponse>>& rw,
std::unique_ptr<ListenerInterface> listener)
: stdout_(std_out),
stderr_(std_err),
ctx_(ctx),
reader_(rw),
listener_(std::move(listener)),
ret_val_(0),
status_(READ) {
reader_->Read(&response_, this);
}
template <class T>
void ExecReadCallData<T>::Proceed(bool ok) {
if (!ok) {
reader_->Finish(&grpc_stream_status_, this);
status_ = FINISH;
return;
}
if (status_ != READ) {
GPR_ASSERT(status_ == FINISH);
if (!grpc_stream_status_.ok() && operation_status_ == OperationStatus::OK) {
operation_status_ = OperationStatus::GRPC_FAILURE;
}
Finish();
listener_->OnTerminated(translate_rpc_status(operation_status_), ret_val_);
delete this;
return;
}
// Record the statuses at every report. The last responses will be
// passed as arguments to the supplied callback.
ret_val_ = response_.ret_code();
operation_status_ = response_.status();
std::string new_stdout = response_.std_out();
std::string new_stderr = response_.std_err();
platform_interface_.WriteFile(stdout_, new_stdout.c_str(), new_stdout.size());
platform_interface_.WriteFile(stderr_, new_stderr.c_str(), new_stderr.size());
reader_->Read(&response_, this);
}
template <class T>
void ExecReadCallData<T>::Finish() {
platform_interface_.CloseFile(stdout_);
platform_interface_.CloseFile(stderr_);
}
template <class T>
ExecCallData<T>::ExecCallData(const std::string& command,
const std::map<std::string, std::string>& env_vars, int32_t std_in,
int32_t std_out, int32_t std_err,
std::unique_ptr<ListenerInterface> listener)
: ctx_(std::make_shared<grpc::ClientContext>()),
stdin_(std_in),
stdout_(std_out),
stderr_(std_err),
listener_(std::move(listener)),
command_(std::move(command)),
env_(std::move(EnvMapToVector(env_vars))) {}
template <class T>
void ExecCallData<T>::Proceed(bool ok) {
if (!ok) {
platform_interface_.CloseFile(stdin_);
platform_interface_.CloseFile(stdout_);
platform_interface_.CloseFile(stderr_);
listener_->OnStarted(ZX_ERR_INTERNAL);
listener_->OnTerminated(translate_rpc_status(OperationStatus::GRPC_FAILURE), 0);
} else {
listener_->OnStarted(ZX_OK);
new ExecWriteCallData<T>(std::move(command_), std::move(env_), stdin_, ctx_, rw_);
new ExecReadCallData<T>(stdout_, stderr_, ctx_, rw_, std::move(listener_));
}
delete this;
}
template <class T>
std::vector<ExecEnv> ExecCallData<T>::EnvMapToVector(
const std::map<std::string, std::string>& env_vars) {
std::vector<ExecEnv> env;
for (auto const& [key, value] : env_vars) {
ExecEnv& env_var = env.emplace_back();
env_var.set_key(key);
env_var.set_value(value);
}
return env;
}
#endif // SRC_VIRTUALIZATION_LIB_GUEST_INTERACTION_CLIENT_CLIENT_OPERATION_STATE_H_