blob: 8657d3dcd3d5a2be6d036df5ef902f98e29f1c3d [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef SRC_VIRTUALIZATION_LIB_GUEST_INTERACTION_SERVER_SERVER_OPERATION_STATE_H_
#define SRC_VIRTUALIZATION_LIB_GUEST_INTERACTION_SERVER_SERVER_OPERATION_STATE_H_
#include <filesystem>
#include <grpc/support/log.h>
#include "src/lib/fxl/logging.h"
#include "src/virtualization/lib/guest_interaction/common.h"
#include "src/virtualization/lib/guest_interaction/platform_interface/platform_interface.h"
#include "src/virtualization/lib/guest_interaction/proto/guest_interaction.grpc.pb.h"
#include <grpc++/grpc++.h>
// Manages the transfer of a file from the guest VM to the host.
//
// When the client requests a file, the server sends a stream of messages
// containing the file's contents until either gRPC breaks or the server
// hits the end of the requested file.
template <class T>
class GetCallData final : public CallData {
public:
GetCallData(GuestInteractionService::AsyncService* service, grpc::ServerCompletionQueue* cq);
void Proceed(bool ok) override;
T platform_interface_;
private:
void TryRead();
void Finish();
GuestInteractionService::AsyncService* service_;
grpc::ServerCompletionQueue* cq_;
grpc::ServerContext ctx_;
grpc::ServerAsyncWriter<GetResponse> writer_;
GetRequest request_;
int32_t fd_;
enum CallStatus { CREATE, INITIATE_TRANSFER, TRANSFER, END_TRANSFER, FINISH };
CallStatus status_;
};
template <class T>
GetCallData<T>::GetCallData(GuestInteractionService::AsyncService* service,
grpc::ServerCompletionQueue* cq)
: service_(service), cq_(cq), writer_(&ctx_), fd_(0), status_(CREATE) {
Proceed(true);
}
// Tracks the state of a file transfer from the guest VM to the host.
//
// When instantiated, GetCallData immediately calls Proceed which calls
// RequestGet so that the server can handle incoming Get requests.
//
// The client specifies the source file in its initial request and then the
// server streams the file contents back to the guest until either the gRPC
// channel breaks (ok == false) or it hits EOF on the source file.
//
// According to the gRPC docs for a server Write operation:
// ok means that the data/metadata/status/etc is going to go to the wire. If it
// is false, it not going to the wire because the call is already dead (i.e.,
// canceled, deadline expired, other side dropped the channel, etc).
template <class T>
void GetCallData<T>::Proceed(bool ok) {
if (!ok) {
Finish();
return;
}
switch (status_) {
case CREATE:
status_ = INITIATE_TRANSFER;
service_->RequestGet(&ctx_, &request_, &writer_, cq_, cq_, this);
return;
case INITIATE_TRANSFER:
// Allow new GetRequest streams to be handled.
new GetCallData<T>(service_, cq_);
if (!platform_interface_.FileExists(request_.source())) {
GetResponse get_response;
get_response.clear_data();
get_response.set_status(OperationStatus::SERVER_MISSING_FILE_FAILURE);
writer_.Write(get_response, this);
status_ = END_TRANSFER;
return;
}
fd_ = platform_interface_.OpenFile(request_.source(), READ);
if (fd_ < 0) {
GetResponse get_response;
get_response.clear_data();
get_response.set_status(OperationStatus::SERVER_FILE_READ_FAILURE);
writer_.Write(get_response, this);
status_ = END_TRANSFER;
return;
}
status_ = TRANSFER;
TryRead();
return;
case TRANSFER:
TryRead();
return;
case END_TRANSFER:
writer_.Finish(grpc::Status::OK, this);
status_ = FINISH;
return;
case FINISH:
Finish();
return;
}
}
template <class T>
void GetCallData<T>::TryRead() {
GetResponse get_response;
char data_chunk[CHUNK_SIZE];
int32_t data_read = platform_interface_.ReadFile(fd_, data_chunk, CHUNK_SIZE);
if (data_read < 0) {
if (-data_read == EAGAIN || -data_read == EWOULDBLOCK) {
// Reading would have caused blocking, so send back an empty message.
get_response.set_status(OperationStatus::OK);
get_response.clear_data();
writer_.Write(get_response, this);
} else {
// Reading failed in an unexpected way. Notify client and finish.
get_response.set_status(OperationStatus::SERVER_FILE_READ_FAILURE);
get_response.clear_data();
writer_.Write(get_response, this);
status_ = END_TRANSFER;
}
} else if (data_read == 0) {
// Read size of 0 indicates EOF.
get_response.set_status(OperationStatus::OK);
get_response.clear_data();
writer_.Write(get_response, this);
status_ = END_TRANSFER;
} else {
get_response.set_status(OperationStatus::OK);
get_response.set_data(data_chunk, data_read);
writer_.Write(get_response, this);
}
}
template <class T>
void GetCallData<T>::Finish() {
if (fd_ > 0) {
platform_interface_.CloseFile(fd_);
}
delete this;
}
template <class T>
class PutCallData final : public CallData {
public:
PutCallData(GuestInteractionService::AsyncService* service, grpc::ServerCompletionQueue* cq)
: service_(service), cq_(cq), reader_(&ctx_), status_(CREATE), fd_(0) {
Proceed(true);
}
void Proceed(bool ok) override;
T platform_interface_;
private:
// Attempt to read the latest message from the client and write it into
// the output file. If the output file stream has gone into a bad state
// or the client has sent a final empty byte
void TryWrite();
void SendFinalStatus(OperationStatus status);
// gRPC async boilerplate
GuestInteractionService::AsyncService* service_;
grpc::ServerCompletionQueue* cq_;
grpc::ServerContext ctx_;
grpc::ServerAsyncReader<PutResponse, PutRequest> reader_;
enum CallStatus { CREATE, INITIATE_TRANSFER, TRANSFER, FINISH };
CallStatus status_;
int32_t fd_;
// File transfer bits
PutRequest new_request_;
};
template <class T>
void PutCallData<T>::Proceed(bool ok) {
switch (status_) {
case CREATE:
status_ = INITIATE_TRANSFER;
service_->RequestPut(&ctx_, &reader_, cq_, cq_, this);
return;
case INITIATE_TRANSFER:
// Allow new PutRequest streams to be handled.
new PutCallData(service_, cq_);
reader_.Read(&new_request_, this);
status_ = TRANSFER;
return;
case TRANSFER:
if (!ok) {
SendFinalStatus(OperationStatus::OK);
return;
}
TryWrite();
return;
case FINISH:
if (fd_ > 0) {
platform_interface_.CloseFile(fd_);
}
delete this;
return;
}
}
template <class T>
void PutCallData<T>::SendFinalStatus(OperationStatus status) {
PutResponse put_response;
put_response.set_status(status);
reader_.Finish(put_response, grpc::Status::OK, this);
status_ = FINISH;
}
template <class T>
void PutCallData<T>::TryWrite() {
if (fd_ == 0) {
std::string destination = new_request_.destination();
std::filesystem::path outpath = destination;
if (platform_interface_.DirectoryExists(destination) ||
(destination.length() > 0 && destination[destination.length() - 1] == '/')) {
// If the client provides the path to a directory, return a failure.
SendFinalStatus(OperationStatus::SERVER_CREATE_FILE_FAILURE);
return;
} else if (!platform_interface_.DirectoryExists(outpath.parent_path().string())) {
// If the client wants to send the file to a nonexistent directory,
// create it for them.
if (!platform_interface_.CreateDirectory(outpath.parent_path().string())) {
SendFinalStatus(OperationStatus::SERVER_CREATE_FILE_FAILURE);
return;
}
}
fd_ = platform_interface_.OpenFile(destination, WRITE);
}
if (fd_ < 0) {
SendFinalStatus(OperationStatus::SERVER_FILE_WRITE_FAILURE);
return;
}
if (platform_interface_.WriteFile(fd_, new_request_.data().c_str(),
new_request_.data().length()) < 0) {
SendFinalStatus(OperationStatus::SERVER_FILE_WRITE_FAILURE);
}
reader_.Read(&new_request_, this);
}
// The Exec operation provides a bidirectional streaming interface for the
// client. The client can stream input to the running exec-ed program's stdin
// and the server replies with a stdout/stderr stream and ultimately provides a
// return code.
//
// The requested program needs to initially be exec-ed and then the input and
// output streams must be continually serviced by read and write routines.
// There are some complications imposed by the limitations of the gRPC
// CompletionQueue.
// 1. Only one outstanding read operation can be queued for a given stream.
// 2. Only one outstanding write operation can be queued for a given stream.
// 3. The completion queue makes no distinction between read and write
// operations when they complete.
//
// To ensure that multiple Reads or Writes are not enqueued, read and write
// state must be tracked in separate state machines. An initial ExecCallData
// services incoming client requests. ExecCallData forks a process and hands
// off a stdin pipe to an ExecReadCallData and hands stdout and stderr pipes to
// an ExecWriteCallData. The ExecReadCallData and ExecWriteCallData each
// manage one direction of the data stream and enforce the single read/single
// write rule.
//
// The writer is ultimately responsible for reporting the final exit status to
// the client, so it is the responsibility of the writer to reap the pid of the
// child process.
// While a program is running under the supervision of the guest interaction
// daemon, the caller may continue to stream input to the programs stdin. The
// ExecReadCallData is tasked with consuming the incoming stream of caller
// stdin and writing it to the running program's stdin.
template <class T>
class ExecReadCallData final : public CallData {
public:
ExecReadCallData(
std::shared_ptr<grpc::ServerContext> ctx,
const std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>& rw,
int child_pid, int stdin_fd);
void Proceed(bool ok) override;
void Finish();
T platform_interface_;
private:
std::shared_ptr<grpc::ServerContext> ctx_;
std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> reader_;
int child_pid_;
int stdin_fd_;
ExecRequest request_;
};
// While a child process is running, ExecWriteCallData streams its stdin/stderr
// back to the client. When the child process exits, the ExecWriteCallData
// sends all remaining stdout/stderr back to the client along with the return
// code.
template <class T>
class ExecWriteCallData final : public CallData {
public:
ExecWriteCallData(
std::shared_ptr<grpc::ServerContext> ctx,
const std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>& rw,
int child_pid, int stdout_fd, int stderr_fd);
// Read out of stdin and stderr. Send any new information back to the
// client. If the program has exited, also include the exit code.
void Proceed(bool ok) override;
void Finish();
T platform_interface_;
private:
std::string ReadFd(int fd);
std::string DrainFd(int fd);
std::shared_ptr<grpc::ServerContext> ctx_;
std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> writer_;
int child_pid_;
int stdout_fd_;
int stderr_fd_;
enum CallStatus { WRITE, FINISH };
CallStatus status_;
};
template <class T>
class ExecCallData final : public CallData {
public:
ExecCallData(GuestInteractionService::AsyncService* service, grpc::ServerCompletionQueue* cq);
void Proceed(bool ok) override;
T platform_interface_;
private:
std::vector<std::string> CreateEnv(const ExecRequest& exec_request);
GuestInteractionService::AsyncService* service_;
grpc::ServerCompletionQueue* cq_;
std::shared_ptr<grpc::ServerContext> ctx_;
std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>> stream_;
ExecRequest exec_request_;
enum CallStatus { INITIATE_READ, FORK, FINISH_IN_ERROR };
CallStatus status_;
};
template <class T>
ExecReadCallData<T>::ExecReadCallData(
std::shared_ptr<grpc::ServerContext> ctx,
const std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>& rw,
int child_pid, int stdin_fd)
: ctx_(ctx), reader_(rw), child_pid_(child_pid), stdin_fd_(stdin_fd) {
reader_->Read(&request_, this);
}
template <class T>
void ExecReadCallData<T>::Proceed(bool ok) {
// If not ok, then the client has finished the write stream. Clean up
// the file descriptors and delete the reader.
if (!ok) {
Finish();
return;
}
int kill_response = platform_interface_.KillPid(child_pid_, 0);
if (kill_response != 0) {
// The child process no longer exists and any new stdin from the
// client is meaningless.
Finish();
return;
}
// The command, arguments, and environment variables can only be used
// when initially forking the child process, so they are meaningless
// when the child process is running. Only handle the stdin here.
int32_t write_status =
platform_interface_.WriteFile(stdin_fd_, request_.std_in().c_str(), request_.std_in().size());
if (write_status < 0) {
Finish();
return;
}
reader_->Read(&request_, this);
}
template <class T>
void ExecReadCallData<T>::Finish() {
platform_interface_.CloseFile(stdin_fd_);
delete this;
}
template <class T>
ExecWriteCallData<T>::ExecWriteCallData(
std::shared_ptr<grpc::ServerContext> ctx,
const std::shared_ptr<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>& rw,
int child_pid, int stdout_fd, int stderr_fd)
: ctx_(ctx),
writer_(rw),
child_pid_(child_pid),
stdout_fd_(stdout_fd),
stderr_fd_(stderr_fd),
status_(WRITE) {
Proceed(true);
}
template <class T>
void ExecWriteCallData<T>::Proceed(bool ok) {
if (!ok) {
platform_interface_.KillPid(child_pid_, 9);
int status;
platform_interface_.WaitPid(child_pid_, &status, 0);
Finish();
return;
}
if (status_ != WRITE) {
GPR_ASSERT(status_ == FINISH);
Finish();
return;
}
int status;
int32_t poll_pid = platform_interface_.WaitPid(child_pid_, &status, WNOHANG);
std::string std_out;
std::string std_err;
ExecResponse exec_response;
if (poll_pid != 0) {
std_out = DrainFd(stdout_fd_);
std_err = DrainFd(stderr_fd_);
exec_response.set_std_out(std_out);
exec_response.set_std_err(std_err);
exec_response.set_ret_code(WEXITSTATUS(status));
exec_response.set_status(OperationStatus::OK);
writer_->WriteAndFinish(exec_response, grpc::WriteOptions(), grpc::Status::OK, this);
status_ = FINISH;
} else {
std_out = ReadFd(stdout_fd_);
std_err = ReadFd(stderr_fd_);
exec_response.set_std_out(std_out);
exec_response.set_std_err(std_err);
exec_response.clear_ret_code();
exec_response.clear_status();
writer_->Write(exec_response, this);
}
}
template <class T>
void ExecWriteCallData<T>::Finish() {
platform_interface_.CloseFile(stdout_fd_);
platform_interface_.CloseFile(stderr_fd_);
delete this;
}
template <class T>
std::string ExecWriteCallData<T>::ReadFd(int32_t fd) {
std::string out_string;
char read_buf[CHUNK_SIZE / 2];
int32_t bytes_read = platform_interface_.ReadFile(fd, read_buf, CHUNK_SIZE / 2 - 1);
if (bytes_read < 0) {
read_buf[0] = '\0';
} else {
read_buf[bytes_read] = '\0';
}
out_string = std::string(read_buf);
return out_string;
}
template <class T>
std::string ExecWriteCallData<T>::DrainFd(int32_t fd) {
std::string return_string = "";
while (true) {
std::string new_substring = ReadFd(fd);
if (new_substring.size() > 0) {
return_string += new_substring;
} else {
return return_string;
}
}
}
template <class T>
ExecCallData<T>::ExecCallData(GuestInteractionService::AsyncService* service,
grpc::ServerCompletionQueue* cq)
: service_(service), cq_(cq), status_(INITIATE_READ) {
// The ServerContext provides connection metadata to the streaming
// reader/writer. No context actually needs to be preserved, but
// ExecCallData is just the entry point to the exec process. Once the
// child is forked, the ExecCallData is deleted. To ensure that a valid
// ServerContext can always be read by the reader/writer, allocate one
// here and later pass a shared reference around to the read and write
// routines.
ctx_ = std::make_shared<grpc::ServerContext>();
stream_ = std::make_shared<grpc::ServerAsyncReaderWriter<ExecResponse, ExecRequest>>(ctx_.get());
service_->RequestExec(ctx_.get(), stream_.get(), cq_, cq_, this);
}
// Do one initial read to get the client's command. Upon receiving the
// first command, fork and hand off control to a dedicated reader/writer.
template <class T>
void ExecCallData<T>::Proceed(bool ok) {
if (status_ == INITIATE_READ) {
new ExecCallData(service_, cq_);
stream_->Read(&exec_request_, this);
status_ = FORK;
} else if (status_ != FORK) {
GPR_ASSERT(status_ == FINISH_IN_ERROR);
delete this;
return;
} else {
// Generate string forms of the argv and environment variables
std::vector<std::string> args = platform_interface_.ParseCommand(exec_request_.argv());
std::vector<std::string> env = CreateEnv(exec_request_);
// Repackage the string representations as C-strings
std::vector<char*> exec_args;
for (const std::string& arg : args) {
exec_args.push_back(const_cast<char*>(arg.c_str()));
}
exec_args.push_back(nullptr);
std::vector<char*> exec_env;
for (const std::string& env_pair : env) {
exec_env.push_back(const_cast<char*>(env_pair.c_str()));
}
exec_env.push_back(nullptr);
// Create the arguments to exec from the C-string vectors
char** args_ptr;
if (args.empty()) {
ExecResponse exec_response;
exec_response.clear_std_out();
exec_response.clear_std_err();
exec_response.clear_ret_code();
exec_response.set_status(OperationStatus::SERVER_EXEC_COMMAND_PARSE_FAILURE);
stream_->WriteAndFinish(exec_response, grpc::WriteOptions(), grpc::Status::OK, this);
status_ = FINISH_IN_ERROR;
return;
}
args_ptr = exec_args.data();
char** env_ptr;
if (exec_env.empty()) {
env_ptr = nullptr;
} else {
env_ptr = exec_env.data();
}
// File descriptors to be populated when exec-ing
int32_t std_in;
int32_t std_out;
int32_t std_err;
// fork and exec
int32_t child_pid = platform_interface_.Exec(args_ptr, env_ptr, &std_in, &std_out, &std_err);
if (child_pid < 0) {
ExecResponse exec_response;
exec_response.clear_std_out();
exec_response.clear_std_err();
exec_response.clear_ret_code();
exec_response.set_status(OperationStatus::SERVER_EXEC_FORK_FAILURE);
stream_->WriteAndFinish(exec_response, grpc::WriteOptions(), grpc::Status::OK, this);
platform_interface_.CloseFile(std_in);
platform_interface_.CloseFile(std_out);
platform_interface_.CloseFile(std_err);
status_ = FINISH_IN_ERROR;
return;
}
// Set read FD's to nonblocking mode.
platform_interface_.SetFileNonblocking(std_out);
platform_interface_.SetFileNonblocking(std_err);
// If the client specified any stdin, write that into the stdin FD.
platform_interface_.WriteFile(std_in, exec_request_.std_in().c_str(),
exec_request_.std_in().size());
new ExecReadCallData<T>(ctx_, stream_, child_pid, std_in);
new ExecWriteCallData<T>(ctx_, stream_, child_pid, std_out, std_err);
delete this;
return;
}
}
template <class T>
std::vector<std::string> ExecCallData<T>::CreateEnv(const ExecRequest& exec_request) {
std::vector<std::string> env;
for (const ExecEnv& env_var : exec_request.env_vars()) {
std::string env_pair = env_var.key() + "=" + env_var.value();
env.push_back(env_pair);
}
return env;
}
#endif // SRC_VIRTUALIZATION_LIB_GUEST_INTERACTION_SERVER_SERVER_OPERATION_STATE_H_