blob: 58988d7d316bba12689b58ce1926cf2f4ebbd14f [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/sys/fuzzing/common/child-process.h"
#include <lib/async/cpp/executor.h>
#include <lib/fdio/spawn.h>
#include <lib/syslog/cpp/macros.h>
#include <poll.h>
#include <stdio.h>
#include <unistd.h>
#include <zircon/process.h>
#include <zircon/processargs.h>
#include <zircon/status.h>
#include <algorithm>
#include "src/lib/files/eintr_wrapper.h"
namespace fuzzing {
namespace {
// Size of the stack buffer used to read from a file descriptor.
constexpr const size_t kBufSize = 0x400;

// Reads from `fd` until end-of-file or an error, forwarding the data to `sender` one line at a
// time. Newlines are not included in the strings sent. Lines longer than the read buffer are
// accumulated across reads and sent once their newline (or end-of-file) is seen.
//
// Returns:
//   * ZX_ERR_INVALID_ARGS if `fd` is negative.
//   * ZX_ERR_PEER_CLOSED when `read` reports end-of-file or fails with EBADF.
//   * ZX_ERR_IO on any other `read` failure.
//   * The status from `sender.Send` if it fails with anything other than ZX_ERR_PEER_CLOSED while
//     forwarding a complete line, or with any error while forwarding the final partial line.
//
// This function does not close `fd`.
zx_status_t ReadAndSend(int fd, AsyncSender<std::string> sender) {
  if (fd < 0) {
    FX_LOGS(ERROR) << "Invalid fd: " << fd;
    return ZX_ERR_INVALID_ARGS;
  }
  std::array<char, kBufSize> buf;
  // `[start, end)` is the portion of `buf` holding data that has been read but not yet consumed.
  auto start = buf.begin();
  auto end = start;
  // Holds the beginning of a line that did not fit in a single buffer.
  std::string line;
  while (true) {
    // Has data; repeatedly send strings up to a newline.
    while (start != end) {
      auto newline = std::find(start, end, '\n');
      if (newline == end) {
        break;
      }
      line.append(start, newline);
      start = newline + 1;
      // Forward the data. If the receiver has closed, just keep draining the pipe.
      if (auto status = sender.Send(std::move(line));
          status != ZX_OK && status != ZX_ERR_PEER_CLOSED) {
        return status;
      }
      // `line` was moved from; put it back into a known (empty) state.
      line.clear();
    }
    // Need more data. First move any remaining data to the start of the buffer.
    if (start != buf.begin()) {
      // The destination precedes the source, so a forward copy is safe even when they overlap.
      end = std::copy(start, end, buf.begin());
      start = buf.begin();
    } else if (end == buf.end()) {
      // A log line filled the buffer. Add it to `line` and keep going.
      line.append(start, end);
      end = start;
    }
    // Now try to read more data from the file descriptor. At this point `end` is always strictly
    // before `buf.end()`, so there is room for at least one byte.
    auto bytes_read = HANDLE_EINTR(read(fd, &*end, buf.end() - end));
    if (bytes_read < 0) {
      if (errno == EBADF) {
        // Stream was closed because process exited.
        return ZX_ERR_PEER_CLOSED;
      }
      FX_LOGS(ERROR) << "Failed to read output from process (fd=" << fd << "): " << strerror(errno);
      return ZX_ERR_IO;
    }
    if (bytes_read == 0) {
      // File descriptor is closed; just send whatever is left. This includes any data previously
      // moved into `line` by the buffer-full case above, even if the buffer itself is now empty.
      line.append(start, end);
      if (!line.empty()) {
        if (auto status = sender.Send(std::move(line)); status != ZX_OK) {
          return status;
        }
      }
      return ZX_ERR_PEER_CLOSED;
    }
    end += bytes_read;
  }
}
} // namespace
// Takes ownership of the given executor pointer and puts the object into its initial state by
// calling `Reset`.
ChildProcess::ChildProcess(ExecutorPtr executor) : executor_(std::move(executor)) {
  Reset();
}
// Kills the process (if any) and joins the stdio threads via `KillSync` before destruction.
ChildProcess::~ChildProcess() {
  KillSync();
}
// Appends a command line argument for the process to be spawned.
//
// The first argument added is prefixed with "/pkg/"; subsequent arguments are added as-is. The
// length of the stored argument is counted against the byte budget tracked by `ReserveBytes`.
//
// Returns ZX_OK on success, or the error from `ReserveBytes` if the budget would be exceeded, in
// which case the argument is not added.
zx_status_t ChildProcess::AddArg(std::string_view arg) {
  static const char* kPkgPrefix = "/pkg/";

  // Build the string exactly as it will be stored, so its size is the amount to reserve.
  std::string stored;
  if (args_.empty()) {
    stored = kPkgPrefix;
  }
  stored += arg;

  const auto status = ReserveBytes(stored.size());
  if (status != ZX_OK) {
    FX_LOGS(WARNING) << "Failed to add argument: '" << arg << "'";
    return status;
  }
  args_.emplace_back(std::move(stored));
  return ZX_OK;
}
// Appends each of `args` in order using `AddArg`.
//
// This is all-or-nothing: if any argument fails to be added, the argument list and the reserved
// byte count are restored to what they were on entry and the failing status is returned.
zx_status_t ChildProcess::AddArgs(std::initializer_list<const char*> args) {
  const auto saved_num_args = args_.size();
  const auto saved_num_bytes = num_bytes_;

  zx_status_t status = ZX_OK;
  for (auto it = args.begin(); it != args.end() && status == ZX_OK; ++it) {
    status = AddArg(*it);
  }
  if (status != ZX_OK) {
    // Roll back any arguments added before the failure.
    args_.resize(saved_num_args);
    num_bytes_ = saved_num_bytes;
  }
  return status;
}
// Sets the environment variable `name` to `value` for the process to be spawned, replacing any
// value previously set for `name`.
//
// The length of "name=value" is counted against the byte budget tracked by `ReserveBytes`.
// Returns ZX_OK on success, or the error from `ReserveBytes`, in which case nothing is set.
zx_status_t ChildProcess::SetEnvVar(std::string_view name, std::string_view value) {
  // One extra byte accounts for the '=' separator.
  const size_t needed = name.size() + 1 + value.size();
  const auto status = ReserveBytes(needed);
  if (status != ZX_OK) {
    FX_LOGS(ERROR) << "Failed to add environment variable: '" << name << "=" << value << "'";
    return status;
  }
  envvars_[std::string(name)] = value;
  return ZX_OK;
}
// Creates a pipe to the spawned process's stdin and starts a thread that writes to it.
//
// The thread sleeps on `input_cond_` until either lines have been queued in `input_lines_` or
// `input_closed_` has been set. It then writes any queued lines to the pipe. It exits, closing
// the write end of the pipe, once input has been closed or a write fails.
//
// Returns ZX_OK on success, or the error from `AddPipe`.
zx_status_t ChildProcess::AddStdinPipe() {
  int wpipe = -1;
  if (auto status = AddPipe(STDIN_FILENO, nullptr, &wpipe); status != ZX_OK) {
    FX_LOGS(ERROR) << "Failed to create pipe to process stdin: " << zx_status_get_string(status);
    return status;
  }
  {
    std::lock_guard lock(mutex_);
    input_closed_ = false;
  }
  stdin_thread_ = std::thread([this, wpipe] {
    // Writes all of `line` to the pipe, retrying on partial writes. Returns false on error.
    auto write_line = [wpipe](const std::string& line) {
      const char* buf = line.data();
      size_t off = 0;
      size_t len = line.size();
      while (off < len) {
        auto num_written = HANDLE_EINTR(write(wpipe, &buf[off], len - off));
        if (num_written < 0) {
          FX_LOGS(ERROR) << "Failed to write input to process: " << strerror(errno);
          return false;
        }
        off += num_written;
      }
      return true;
    };

    bool close_input = false;
    std::vector<std::string> input_lines;
    while (!close_input) {
      {
        std::unique_lock lock(mutex_);
        // Block until there is something to do. The shared state is consumed inside the
        // predicate, which runs with `mutex_` held.
        input_cond_.wait(lock, [this, &close_input, &input_lines]() FXL_REQUIRE(mutex_) {
          if (!input_closed_ && input_lines_.empty()) {
            return false;
          }
          close_input = input_closed_;
          input_lines = std::move(input_lines_);
          input_lines_.clear();
          return true;
        });
      }
      // Write outside the lock so that callers queuing more input are not blocked on the pipe.
      for (const auto& line : input_lines) {
        if (!write_line(line)) {
          // The pipe is unusable; skip the remaining lines and shut down.
          close_input = true;
          break;
        }
      }
    }
    close(wpipe);
  });
  return ZX_OK;
}
// Creates a pipe from the spawned process's stdout and starts a thread that forwards its output.
//
// The thread runs `ReadAndSend` on the read end of the pipe, delivering lines to the receiver
// stored in `stdout_`. When `ReadAndSend` returns, its status is recorded in `stdout_result_` and
// the read end of the pipe is closed.
//
// Returns ZX_OK on success, or the error from `AddPipe`.
zx_status_t ChildProcess::AddStdoutPipe() {
  int rpipe = -1;
  if (auto status = AddPipe(STDOUT_FILENO, &rpipe, nullptr); status != ZX_OK) {
    FX_LOGS(ERROR) << "Failed to create pipe from process stdout: " << zx_status_get_string(status);
    return status;
  }
  AsyncSender<std::string> sender;
  stdout_ = AsyncReceiver<std::string>::MakePtr(&sender);
  stdout_thread_ = std::thread([this, rpipe, sender = std::move(sender)]() mutable {
    stdout_result_ = ReadAndSend(rpipe, std::move(sender));
    // This thread is the only user of the read end; close it so the descriptor is not leaked.
    close(rpipe);
  });
  return ZX_OK;
}
// Creates a pipe from the spawned process's stderr and starts a thread that forwards its output.
//
// The thread runs `ReadAndSend` on the read end of the pipe, delivering lines to the receiver
// stored in `stderr_`. When `ReadAndSend` returns, its status is recorded in `stderr_result_` and
// the read end of the pipe is closed.
//
// Returns ZX_OK on success, or the error from `AddPipe`.
zx_status_t ChildProcess::AddStderrPipe() {
  int rpipe = -1;
  if (auto status = AddPipe(STDERR_FILENO, &rpipe, nullptr); status != ZX_OK) {
    FX_LOGS(ERROR) << "Failed to create pipe from process stderr: " << zx_status_get_string(status);
    return status;
  }
  AsyncSender<std::string> sender;
  stderr_ = AsyncReceiver<std::string>::MakePtr(&sender);
  stderr_thread_ = std::thread([this, rpipe, sender = std::move(sender)]() mutable {
    stderr_result_ = ReadAndSend(rpipe, std::move(sender));
    // This thread is the only user of the read end; close it so the descriptor is not leaked.
    close(rpipe);
  });
  return ZX_OK;
}
// Creates a pipe and records a spawn action that transfers one end of it to the spawned process
// as `target_fd`. The other end is returned to the caller.
//
// Exactly one of `out_rpipe` and `out_wpipe` must be non-null:
//   * If `out_rpipe` is set, it receives the read end and the process gets the write end.
//   * If `out_wpipe` is set, it receives the write end and the process gets the read end.
//
// Returns:
//   * ZX_OK on success.
//   * The error from `ReserveHandles` if the handle budget would be exceeded.
//   * ZX_ERR_BAD_STATE if the process has already been spawned.
//   * ZX_ERR_INVALID_ARGS if the output parameters are not as described above.
//   * ZX_ERR_IO if the pipe cannot be created.
//
// On any failure, no pipe is left open and the handle reservation is released.
zx_status_t ChildProcess::AddPipe(int target_fd, int* out_rpipe, int* out_wpipe) {
  if (auto status = ReserveHandles(1); status != ZX_OK) {
    FX_LOGS(ERROR) << "Failed to add pipe with fd=" << target_fd;
    return status;
  }
  if (spawned_) {
    num_handles_ -= 1;
    FX_LOGS(ERROR) << "Cannot add stdio pipes after spawning.";
    return ZX_ERR_BAD_STATE;
  }
  // Validate the arguments before creating the pipe so that a bad call cannot leak descriptors.
  const bool wants_read_end = out_rpipe != nullptr;
  const bool wants_write_end = out_wpipe != nullptr;
  if (wants_read_end == wants_write_end) {
    num_handles_ -= 1;
    FX_NOTREACHED() << "Exactly one of [out_rpipe, out_wpipe] must be non-null";
    return ZX_ERR_INVALID_ARGS;
  }
  int fds[2];
  if (pipe(fds) != 0) {
    num_handles_ -= 1;
    FX_LOGS(ERROR) << "Failed to transfer file descriptor: " << strerror(errno);
    return ZX_ERR_IO;
  }
  fdio_spawn_action_t action = {.action = FDIO_SPAWN_ACTION_TRANSFER_FD,
                                .fd = {
                                    .local_fd = -1,
                                    .target_fd = target_fd,
                                }};
  // `fds[0]` is the read end of the pipe and `fds[1]` is the write end.
  if (wants_read_end) {
    *out_rpipe = fds[0];
    action.fd.local_fd = fds[1];
  } else {
    action.fd.local_fd = fds[0];
    *out_wpipe = fds[1];
  }
  actions_.emplace_back(std::move(action));
  return ZX_OK;
}
// Records a spawn action that passes `channel` to the spawned process as the startup handle
// `PA_HND(PA_USER0, id)`.
//
// Returns ZX_OK on success, or the error from `ReserveHandles` if the handle budget would be
// exceeded, in which case no action is recorded.
zx_status_t ChildProcess::AddChannel(uint32_t id, zx::channel channel) {
  const auto status = ReserveHandles(1);
  if (status != ZX_OK) {
    FX_LOGS(ERROR) << "Failed to add channel with id=" << id;
    return status;
  }
  // Ownership of the raw handle moves from `channel` into the action.
  fdio_spawn_action_t add_handle{.action = FDIO_SPAWN_ACTION_ADD_HANDLE,
                                 .h = {
                                     .id = PA_HND(PA_USER0, id),
                                     .handle = channel.release(),
                                 }};
  actions_.emplace_back(std::move(add_handle));
  return ZX_OK;
}
// Adds `num_bytes` to the running total of bytes reserved for spawning.
//
// The total may not exceed half of ZX_CHANNEL_MAX_MSG_BYTES. Returns ZX_OK and updates the total
// if the request fits; otherwise logs a warning, leaves the total unchanged, and returns
// ZX_ERR_OUT_OF_RANGE.
zx_status_t ChildProcess::ReserveBytes(size_t num_bytes) {
  const auto requested_total = num_bytes_ + num_bytes;
  if (requested_total <= ZX_CHANNEL_MAX_MSG_BYTES / 2) {
    num_bytes_ = requested_total;
    return ZX_OK;
  }
  FX_LOGS(WARNING) << "Spawn message bytes limit exceeded; " << num_bytes_
                   << " bytes previously added";
  return ZX_ERR_OUT_OF_RANGE;
}
// Adds `num_handles` to the running total of handles reserved for spawning.
//
// The total may not exceed half of ZX_CHANNEL_MAX_MSG_HANDLES. Returns ZX_OK and updates the
// total if the request fits; otherwise logs a warning, leaves the total unchanged, and returns
// ZX_ERR_OUT_OF_RANGE.
zx_status_t ChildProcess::ReserveHandles(size_t num_handles) {
  const auto requested_total = num_handles_ + num_handles;
  if (requested_total <= ZX_CHANNEL_MAX_MSG_HANDLES / 2) {
    num_handles_ = requested_total;
    return ZX_OK;
  }
  FX_LOGS(WARNING) << "Spawn message handles limit exceeded; " << num_handles_
                   << " handles previously added";
  return ZX_ERR_OUT_OF_RANGE;
}
// Spawns the process using the arguments, environment variables, and actions added so far.
//
// The environment and command line are logged before spawning. The process is spawned with
// `FDIO_SPAWN_CLONE_ALL` minus `FDIO_SPAWN_CLONE_STDIO`, and the first argument is used as the
// path of the executable.
//
// Returns:
//   * ZX_OK on success; `process_` then holds the handle written by `fdio_spawn_etc`.
//   * ZX_ERR_BAD_STATE if already spawned; `Reset` must be called before spawning again.
//   * The error from `fdio_spawn_etc` otherwise. The object is still marked as spawned.
zx_status_t ChildProcess::Spawn() {
  // Convert args and envvars to C-style strings.
  // The envvars vector holds the constructed strings backing the pointers in envp. All of the
  // strings are built before any pointer is taken: growing the vector may relocate its elements,
  // which would invalidate pointers into short strings that store their characters inline.
  std::vector<std::string> envvars;
  envvars.reserve(envvars_.size());
  std::ostringstream log_str;
  for (const auto& [key, value] : envvars_) {
    std::ostringstream oss;
    oss << key << "=" << value;
    envvars.push_back(oss.str());
    log_str << envvars.back() << " ";
  }
  std::vector<const char*> envp;
  envp.reserve(envvars.size() + 1);
  for (const auto& envvar : envvars) {
    envp.push_back(envvar.c_str());
  }
  envp.push_back(nullptr);

  std::vector<const char*> argv;
  argv.reserve(args_.size() + 1);
  for (const auto& arg : args_) {
    log_str << arg << " ";
    argv.push_back(arg.c_str());
  }
  argv.push_back(nullptr);
  FX_LOGS(INFO) << log_str.str();

  // A process may only be spawned once per reset.
  if (spawned_) {
    FX_LOGS(ERROR) << "ChildProcess must be reset before it can be respawned.";
    return ZX_ERR_BAD_STATE;
  }
  spawned_ = true;

  // Spawn the process!
  auto flags = FDIO_SPAWN_CLONE_ALL & (~FDIO_SPAWN_CLONE_STDIO);
  auto* handle = process_.reset_and_get_address();
  char err_msg[FDIO_SPAWN_ERR_MSG_MAX_LENGTH];
  if (auto status = fdio_spawn_etc(ZX_HANDLE_INVALID, flags, argv[0], argv.data(), envp.data(),
                                   actions_.size(), actions_.data(), handle, err_msg);
      status != ZX_OK) {
    FX_LOGS(ERROR) << "Failed to spawn process: " << err_msg << " (" << zx_status_get_string(status)
                   << ")";
    return status;
  }
  return ZX_OK;
}
bool ChildProcess::IsAlive() {
if (!process_) {
return false;
}
auto status = process_.get_info(ZX_INFO_PROCESS, &info_, sizeof(info_), nullptr, nullptr);
if (status == ZX_ERR_BAD_HANDLE) {
return false;
}
FX_CHECK(status == ZX_OK);
return (info_.flags & ZX_INFO_PROCESS_FLAG_EXITED) == 0;
}
// Duplicates the process handle into `out` with the same rights, returning the status of the
// duplication.
zx_status_t ChildProcess::Duplicate(zx::process* out) {
  const zx_status_t status = process_.duplicate(ZX_RIGHT_SAME_RIGHTS, out);
  return status;
}
// Queues `line` to be written to the process's stdin and wakes a waiter on `input_cond_`.
//
// Returns:
//   * ZX_OK if the line was queued.
//   * ZX_ERR_BAD_STATE if the process is not alive.
//   * ZX_ERR_PEER_CLOSED if stdin has been closed.
zx_status_t ChildProcess::WriteToStdin(std::string_view line) {
  if (!IsAlive()) {
    FX_LOGS(WARNING) << "Cannot write to process standard input: not running";
    return ZX_ERR_BAD_STATE;
  }
  // Copy the caller's data before taking the lock to keep the critical section short.
  std::string queued(line);
  bool closed = false;
  {
    std::lock_guard lock(mutex_);
    closed = input_closed_;
    if (!closed) {
      input_lines_.emplace_back(std::move(queued));
    }
  }
  if (closed) {
    FX_LOGS(WARNING) << "Cannot write to process standard input: closed";
    return ZX_ERR_PEER_CLOSED;
  }
  input_cond_.notify_one();
  return ZX_OK;
}
// Returns a promise for the next string received from the process's stdout.
//
// If the process has been killed or no stdout receiver exists, the promise completes with
// ZX_ERR_BAD_STATE. If receiving fails, the stdout thread is joined and the promise completes with
// the status that thread recorded in `stdout_result_`.
ZxPromise<std::string> ChildProcess::ReadFromStdout() {
  const bool unavailable = killed_ || !stdout_;
  if (unavailable) {
    return fpromise::make_promise(
        []() -> ZxResult<std::string> { return fpromise::error(ZX_ERR_BAD_STATE); });
  }
  // Join before reading `stdout_result_`, since the thread assigns it as its last step.
  auto on_receive_error = [this]() -> ZxResult<std::string> {
    if (stdout_thread_.joinable()) {
      stdout_thread_.join();
    }
    return fpromise::error(stdout_result_);
  };
  return stdout_->Receive().or_else(std::move(on_receive_error)).wrap_with(scope_);
}
// Returns a promise for the next string received from the process's stderr.
//
// If the process has been killed or no stderr receiver exists, the promise completes with
// ZX_ERR_BAD_STATE. If receiving fails, the stderr thread is joined and the promise completes with
// the status that thread recorded in `stderr_result_`.
ZxPromise<std::string> ChildProcess::ReadFromStderr() {
  const bool unavailable = killed_ || !stderr_;
  if (unavailable) {
    return fpromise::make_promise(
        []() -> ZxResult<std::string> { return fpromise::error(ZX_ERR_BAD_STATE); });
  }
  // Join before reading `stderr_result_`, since the thread assigns it as its last step.
  auto on_receive_error = [this]() -> ZxResult<std::string> {
    if (stderr_thread_.joinable()) {
      stderr_thread_.join();
    }
    return fpromise::error(stderr_result_);
  };
  return stderr_->Receive().or_else(std::move(on_receive_error)).wrap_with(scope_);
}
// Marks the process's stdin as closed and wakes a waiter on `input_cond_` so it can observe the
// change.
void ChildProcess::CloseStdin() {
  {
    // Only the flag update needs the lock; notify after releasing it.
    std::lock_guard guard(mutex_);
    input_closed_ = true;
  }
  input_cond_.notify_one();
}
// Collects statistics for the process via `GetStatsForProcess`.
//
// Returns the stats on success, or the error status reported by `GetStatsForProcess`.
ZxResult<ProcessStats> ChildProcess::GetStats() {
  ProcessStats collected;
  const auto status = GetStatsForProcess(process_, &collected);
  if (status == ZX_OK) {
    return fpromise::ok(std::move(collected));
  }
  return fpromise::error(status);
}
// Returns a promise that completes with the process's return code once it has terminated.
//
// If the process is already not alive when the promise runs, it completes immediately with the
// return code in `info_`. Otherwise it waits on the process handle for ZX_PROCESS_TERMINATED. The
// promise completes with an error if that wait fails, or with ZX_ERR_BAD_STATE if the process is
// still alive after the wait completes.
ZxPromise<int64_t> ChildProcess::Wait() {
  auto task = [this, exit_signal = ZxFuture<zx_packet_signal_t>()](
                  Context& context) mutable -> ZxResult<int64_t> {
    // `IsAlive` refreshes `info_`, so the return code read below is current.
    if (!IsAlive()) {
      return fpromise::ok(info_.return_code);
    }
    // Lazily start waiting for termination the first time the process is found alive.
    if (!exit_signal) {
      exit_signal = executor_->MakePromiseWaitHandle(zx::unowned_handle(process_.get()),
                                                     ZX_PROCESS_TERMINATED);
    }
    if (!exit_signal(context)) {
      return fpromise::pending();
    }
    if (exit_signal.is_error()) {
      auto status = exit_signal.error();
      FX_LOGS(WARNING) << "Failed to wait for process to terminate: "
                       << zx_status_get_string(status);
      return fpromise::error(status);
    }
    if (IsAlive()) {
      FX_LOGS(WARNING) << "Failed to terminate process.";
      return fpromise::error(ZX_ERR_BAD_STATE);
    }
    return fpromise::ok(info_.return_code);
  };
  return fpromise::make_promise(std::move(task)).wrap_with(scope_);
}
// Returns a promise that kills the process and completes once it has terminated.
//
// The first time the promise runs it calls `KillSync` and then starts waiting via `Wait`. The
// promise completes with the error from `Wait`, if any; the return code is discarded.
ZxPromise<> ChildProcess::Kill() {
  auto task = [this, exited = ZxFuture<int64_t>()](Context& context) mutable -> ZxResult<> {
    // An empty future means this is the first run: kill, then begin waiting.
    if (!exited) {
      KillSync();
      exited = Wait();
    }
    if (!exited(context)) {
      return fpromise::pending();
    }
    if (exited.is_error()) {
      return fpromise::error(exited.take_error());
    }
    return fpromise::ok();
  };
  return fpromise::make_promise(std::move(task)).wrap_with(scope_);
}
void ChildProcess::KillSync() {
process_.kill();
CloseStdin();
if (stdin_thread_.joinable()) {
stdin_thread_.join();
}
if (stdout_thread_.joinable()) {
stdout_thread_.join();
}
if (stderr_thread_.joinable()) {
stderr_thread_.join();
}
killed_ = true;
}
// Returns the object to its initial state so that a new process can be configured and spawned.
//
// Any current process is killed and the stdio threads are joined first (see `KillSync`). All
// arguments, environment variables, spawn actions, reserved byte/handle counts, queued input, and
// stdio receivers and results are then discarded.
void ChildProcess::Reset() {
  KillSync();

  // Lifecycle flags.
  spawned_ = false;
  killed_ = false;

  // Spawn configuration and the budgets tracked for it.
  // NOTE(review): clearing `actions_` does not itself close any fd or handle stored in an action;
  // confirm whether actions that were never passed to `Spawn` need explicit cleanup.
  args_.clear();
  envvars_.clear();
  actions_.clear();
  num_bytes_ = 0;
  num_handles_ = 0;

  // Process handle and cached process info.
  process_.reset();
  memset(&info_, 0, sizeof(info_));

  // Standard input state, which is guarded by `mutex_`.
  {
    std::lock_guard guard(mutex_);
    input_closed_ = false;
    input_lines_.clear();
  }

  // Standard output and standard error receivers and thread results.
  stdout_.reset();
  stdout_result_ = ZX_OK;
  stderr_.reset();
  stderr_result_ = ZX_OK;
}
} // namespace fuzzing