blob: 9f59b514e2a731b15df0df367e7c85c16638cdb1 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/sys/fuzzing/libfuzzer/process.h"
#include <lib/async/cpp/executor.h>
#include <lib/fdio/spawn.h>
#include <lib/syslog/cpp/macros.h>
#include <poll.h>
#include <stdio.h>
#include <unistd.h>
#include <zircon/status.h>
#include <iostream>
#include "src/lib/files/eintr_wrapper.h"
namespace fuzzing {
namespace {
ZxResult<fdio_spawn_action_t> MakeSpawnAction(SpawnAction action, int target_fd,
int* piped_fd = nullptr) {
fdio_spawn_action_t tmp;
tmp.action = action;
switch (action) {
case kTransfer: {
int fds[2];
if (pipe(fds) != 0) {
FX_LOGS(ERROR) << "Failed to transfer file descriptor: " << strerror(errno);
return fpromise::error(ZX_ERR_IO);
int local_fd = target_fd == STDIN_FILENO ? fds[0] : fds[1];
*piped_fd = target_fd == STDIN_FILENO ? fds[1] : fds[0];
tmp.fd.local_fd = local_fd;
tmp.fd.target_fd = target_fd;
case kClone: {
tmp.fd.local_fd = target_fd;
tmp.fd.target_fd = target_fd;
return fpromise::ok(std::move(tmp));
} // namespace
Process::Process(ExecutorPtr executor) : executor_(std::move(executor)) {
static_assert(STDERR_FILENO + 1 == kNumStreams);
streams_[STDOUT_FILENO].buf = std::make_unique<Stream::Buffer>();
streams_[STDERR_FILENO].buf = std::make_unique<Stream::Buffer>();
void Process::SetStdoutSpawnAction(SpawnAction action) {
streams_[STDOUT_FILENO].spawn_action = action;
void Process::SetStderrSpawnAction(SpawnAction action) {
streams_[STDERR_FILENO].spawn_action = action;
ZxPromise<> Process::Spawn(const std::vector<std::string>& args) {
return fpromise::make_promise([this, args = std::vector<std::string>(args)]() -> ZxResult<> {
// Convert args to C-style strings.
std::vector<const char*> argv;
for (const auto& arg : args) {
if (verbose_) {
std::cerr << arg << " ";
if (verbose_) {
std::cerr << std::endl;
// Build spawn actions
std::vector<fdio_spawn_action_t> actions;
for (int fileno = STDIN_FILENO; fileno < kNumStreams; ++fileno) {
auto& stream = streams_[fileno];
if (!stream.on_spawn) {
FX_LOGS(ERROR) << "Process must be reset before it can be respawned.";
return fpromise::error(ZX_ERR_BAD_STATE);
auto result = MakeSpawnAction(stream.spawn_action, fileno, &stream.fd);
if (result.is_error()) {
return fpromise::error(result.error());
// Spawn the process!
auto* handle = process_.reset_and_get_address();
auto result =
AsZxResult(fdio_spawn_etc(ZX_HANDLE_INVALID, flags, argv[0], &argv[0], nullptr,
actions.size(),, handle, err_msg));
if (result.is_error()) {
FX_LOGS(ERROR) << "Failed to spawn process: " << err_msg;
return result;
.inspect([this](const ZxResult<>& result) {
for (auto& stream : streams_) {
if (result.is_ok()) {
} else if (stream.on_spawn) {
ZxPromise<size_t> Process::WriteToStdin(const void* buf, size_t count) {
ZxBridge<> bridge;
return AwaitPrevious(STDIN_FILENO, std::move(bridge.consumer))
.and_then([this, buf, count]() -> ZxResult<size_t> {
auto fd = streams_[STDIN_FILENO].fd;
auto num_written = HANDLE_EINTR(write(fd, buf, count));
if (num_written < 0) {
FX_LOGS(ERROR) << "Failed to write input to process: " << strerror(errno);
return fpromise::error(ZX_ERR_IO);
return fpromise::ok(static_cast<size_t>(num_written));
.inspect([completer = std::move(bridge.completer)](const ZxResult<size_t>& result) mutable {
ZxPromise<size_t> Process::WriteAndCloseStdin(const void* buf, size_t count) {
return WriteToStdin(buf, count).inspect([this](const ZxResult<size_t>& result) { CloseStdin(); });
void Process::CloseStdin() {
auto& stream = streams_[STDIN_FILENO];
auto discarded = std::move(stream.previous);
ZxPromise<std::string> Process::ReadFromStdout() { return ReadLine(STDOUT_FILENO); }
ZxPromise<std::string> Process::ReadFromStderr() { return ReadLine(STDERR_FILENO); }
ZxPromise<std::string> Process::ReadLine(int fd) {
ZxBridge<> bridge;
auto& stream = streams_[fd];
return AwaitPrevious(fd, std::move(bridge.consumer))
.and_then([&stream, ready = ZxFuture<>()](Context& context) mutable -> ZxResult<std::string> {
auto fd = stream.fd;
auto* buf = stream.buf.get();
if (fd < 0 || !buf) {
FX_LOGS(ERROR) << "Invalid fd: " << fd;
return fpromise::error(ZX_ERR_INVALID_ARGS);
while (true) {
if (stream.start != stream.end) {
auto newline = std::find(stream.start, stream.end, '\n');
if (newline != stream.end) {
// Found a newline; return the string up to it and search again.
std::string line(stream.start, newline - stream.start);
stream.start = newline + 1;
return fpromise::ok(std::move(line));
if (!ready) {
// Need more data. First move any remaining data to the start of the buffer.
if (stream.start != buf->begin()) {
auto tmp = stream.start;
stream.start = buf->begin();
memmove(&*stream.start, &*tmp, stream.end - tmp);
stream.end -= tmp - stream.start;
} else if (stream.end == buf->end()) {
FX_LOGS(WARNING) << "a single log line exceeds " << buf->size()
<< " characters; skipping...";
stream.end = stream.start;
// Now create a future to wait for the file descriptor to be readable.
ZxBridge<> bridge;
auto task = [completer = std::move(bridge.completer)](zx_status_t status,
uint32_t events) mutable {
if (status != ZX_OK) {
stream.fd_waiter->Wait(std::move(task), fd, POLLIN);
ready = bridge.consumer.promise_or(fpromise::error(ZX_ERR_CANCELED));
if (!ready(context)) {
return fpromise::pending();
if (ready.is_error()) {
auto status = ready.error();
if (status == ZX_ERR_CANCELED) {
// Stream was closed due to process exiting.
return fpromise::error(ZX_ERR_STOP);
FX_LOGS(ERROR) << "Failed to wait for output from process: "
<< zx_status_get_string(ready.error());
return fpromise::error(ready.error());
// File descriptor is readable; read from it!
auto bytes_read = HANDLE_EINTR(read(fd, &*stream.end, buf->end() - stream.end));
if (bytes_read < 0) {
if (errno == EBADF) {
// Stream was closed due to process exiting.
return fpromise::error(ZX_ERR_STOP);
FX_LOGS(ERROR) << "Failed to read output from process: " << strerror(errno);
return fpromise::error(ZX_ERR_IO);
if (bytes_read == 0 && stream.start != stream.end) {
// File descriptor is closed; just return whatever's left.
std::string line(stream.start, stream.end - stream.start);
stream.start = stream.end;
return fpromise::ok(std::move(line));
if (bytes_read == 0) {
// Return an error to let the caller know there's no more data available.
return fpromise::error(ZX_ERR_STOP);
stream.end += bytes_read;
ready = nullptr;
.inspect([completer = std::move(bridge.completer),
verbose = verbose_](const ZxResult<std::string>& result) mutable {
if (result.is_error()) {
if (verbose) {
std::cerr << result.value() << std::endl;
ZxPromise<> Process::AwaitPrevious(int fd, ZxConsumer<> consumer) {
auto previous = std::move(streams_[fd].previous);
if (!previous) {
return fpromise::make_promise([]() -> ZxResult<> {
FX_LOGS(ERROR) << "Stream has been closed.";
return fpromise::error(ZX_ERR_BAD_STATE);
streams_[fd].previous = std::move(consumer);
return previous.promise();
ZxPromise<> Process::Kill() {
return fpromise::make_promise(
[this, kill = ZxFuture<zx_packet_signal_t>()](Context& context) mutable -> ZxResult<> {
if (!process_) {
return fpromise::ok();
if (!kill) {
for (auto& stream : streams_) {
kill = executor_->MakePromiseWaitHandle(zx::unowned_handle(process_.get()),
if (!kill(context)) {
return fpromise::pending();
if (kill.is_error()) {
return fpromise::error(kill.error());
for (auto& stream : streams_) {
auto discarded = std::move(stream.previous);
return fpromise::ok();
void Process::Reset() {
for (auto& stream : streams_) {
if (stream.buf) {
stream.start = stream.buf->begin();
stream.end = stream.start;
ZxBridge<> bridge;
stream.fd = -1;
stream.on_spawn = std::move(bridge.completer);
stream.previous = std::move(bridge.consumer);
stream.fd_waiter = std::make_unique<fsl::FDWaiter>(executor_->dispatcher());
} // namespace fuzzing