// Copyright 2023 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef NINJA_ASYNC_LOOP_POSIX_H
#define NINJA_ASYNC_LOOP_POSIX_H
// POSIX implementation of the AsyncLoop class.
// This contains specialized code paths for Linux ppoll() and macOS/BSD
// kqueue(), plus a generic pselect() fallback.
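//
// Rough usage sketch (hypothetical; the real public API is declared in
// async_loop.h):
//
//   AsyncLoop loop;
//   AsyncHandle handle = /* wrap a non-blocking file descriptor */;
//   handle.StartRead(buffer, size);  // arms an asynchronous read
//   loop.RunOnce(timeout_ms);        // waits for events, runs callbacks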
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <poll.h>
#include <signal.h>
#include <stddef.h>
#include <string.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#ifdef USE_KQUEUE
#include <sys/event.h>
#endif
#include <algorithm>
#include <unordered_map>
#include "async_loop.h"
#include "async_loop_timers.h"
#include "interrupt_handling.h"
#include "metrics.h" // For GetTimeMillis()
#include "util.h" // For Fatal() and Win32Fatal().
#ifdef USE_KQUEUE
// The known state of a kqueue read or write filter for a given file descriptor.
// Unknown means that the kernel queue does not know about this filter yet.
enum class KqueueFilterState {
Unknown = 0,
Disabled,
Enabled,
};
#endif // USE_KQUEUE
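// Internal state for an AsyncHandle instance. Tracks the file descriptor,
// the kind of asynchronous operation in flight, its buffer and its
// completion callback. Instances attach themselves to the owning AsyncLoop
// when they hold a valid handle, and detach on destruction.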
class AsyncHandle::State {
public:
enum class Kind {
None = 0,
Read,
Write,
Connect,
Accept,
};
// Constructors.
explicit State(AsyncLoop& async_loop) : async_loop_(async_loop) {}
State(IpcHandle handle, AsyncLoop& async_loop,
AsyncHandle::Callback&& callback)
: fd_(std::move(handle)), callback_(std::move(callback)),
async_loop_(async_loop) {
if (fd_)
async_loop_.AttachHandle(this);
}
  // Destructor. Note that this type is intentionally unmoveable; see the
  // deleted move operations below.
~State() {
if (fd_)
async_loop_.DetachHandle(this);
}
  // Disallow copy operations, even though this is already implied by
  // the use of move-only fields like |fd_|.
State(const State&) = delete;
State& operator=(const State&) = delete;
  // Disallow move operations. The address of each instance is recorded
  // in the |AsyncLoop::Impl::watches_| data structure.
State(State&&) noexcept = delete;
State& operator=(State&&) noexcept = delete;
// Return AsyncLoop instance this instance belongs to.
AsyncLoop& async_loop() const { return async_loop_; }
// Return AsyncLoop implementation instance.
AsyncLoop::Impl& loop() const { return *async_loop_.impl_; }
// Return file descriptor.
int fd() const { return fd_.native_handle(); }
int native_handle() const { return fd(); }
  // Return true if the handle for this instance is valid.
bool is_valid() const { return !!fd_; }
// Release the native handle for this instance, making it invalid.
int ReleaseHandle() {
Reset(Kind::None);
if (fd_)
async_loop_.DetachHandle(this);
return fd_.ReleaseNativeHandle();
}
// Is an asynchronous operation running?
bool IsRunning() const { return kind_ != Kind::None && !completed_; }
// Cancel current asynchronous operation, if any.
void Cancel() {
if (kind_ != Kind::None) {
async_loop_.CancelHandle(this);
Reset(Kind::None);
}
}
// Reset the callback.
void ResetCallback(AsyncHandle::Callback&& cb) { callback_ = std::move(cb); }
void ResetHandle(IpcHandle handle) {
Reset(Kind::None);
if (fd_)
async_loop_.DetachHandle(this);
fd_ = std::move(handle);
if (fd_)
async_loop_.AttachHandle(this);
}
bool NeedsEvent() const { return !completed_ && kind_ != Kind::None; }
bool NeedsWriteEvent() const {
return !completed_ && (kind_ == Kind::Write || kind_ == Kind::Connect);
}
bool NeedsReadEvent() const {
return !completed_ && (kind_ == Kind::Read || kind_ == Kind::Accept);
}
short poll_events() const {
if (completed_)
return 0;
if (kind_ == Kind::Write || kind_ == Kind::Connect)
return POLLOUT;
if (kind_ == Kind::Read || kind_ == Kind::Accept)
return POLLIN | POLLPRI;
return 0;
}
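  // Note that poll() can report POLLERR and POLLHUP even when they were not
  // requested, so include them when matching returned revents values.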
short poll_revents() const { return poll_events() | POLLERR | POLLHUP; }
// Reset state to start a new asynchronous operation. Cancels current one
// if any.
void Reset(Kind kind = Kind::None) {
kind_ = kind;
completed_ = false;
error_ = 0;
buffer_ = nullptr;
wanted_size_ = 0;
actual_size_ = 0;
accept_handle_.Close();
}
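  // The Start*() methods below reset this state for a new operation, then
  // register interest in the corresponding event with the loop. Zero-sized
  // reads and writes complete immediately.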
void StartRead(void* buffer, size_t size) {
Reset(Kind::Read);
buffer_ = buffer;
wanted_size_ = size;
if (!size)
completed_ = true;
else
async_loop().UpdateHandle(this);
}
void StartWrite(const void* buffer, size_t size) {
Reset(Kind::Write);
buffer_ = const_cast<void*>(buffer);
wanted_size_ = size;
if (!size)
completed_ = true;
else
async_loop().UpdateHandle(this);
}
void StartConnect() {
Reset(Kind::Connect);
async_loop().UpdateHandle(this);
}
void StartAccept() {
Reset(Kind::Accept);
async_loop().UpdateHandle(this);
}
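  // Called when the loop signals an event for this handle. Performs the
  // matching non-blocking syscall and invokes the callback on completion.
  // Returns false if the wakeup was spurious and the operation keeps
  // waiting, true otherwise.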
bool OnEvent() {
error_ = 0;
actual_size_ = 0;
assert(!completed_);
completed_ = true;
switch (kind_) {
case Kind::Read: {
      ssize_t ret;
do {
ret = ::read(fd_.native_handle(), buffer_, wanted_size_);
} while (ret < 0 && errno == EINTR);
if (ret < 0) {
error_ = errno;
if (error_ == EAGAIN || error_ == EWOULDBLOCK) {
          // A spurious wakeup happened; keep waiting for the next event.
          completed_ = false;
          return false;
}
} else {
actual_size_ = static_cast<size_t>(ret);
}
break;
}
case Kind::Write: {
      ssize_t ret;
do {
ret = ::write(fd_.native_handle(), buffer_, wanted_size_);
} while (ret < 0 && errno == EINTR);
if (ret < 0) {
error_ = errno;
if (error_ == EAGAIN || error_ == EWOULDBLOCK) {
          // A spurious wakeup happened; keep waiting for the next event.
          completed_ = false;
          return false;
}
} else {
actual_size_ = static_cast<size_t>(ret);
}
break;
}
case Kind::Connect: {
int so_error =
IpcHandle::GetNativeAsyncConnectStatus(fd_.native_handle());
      if (so_error != 0) {
        if (so_error == EAGAIN || so_error == EINPROGRESS) {
          // The connection is still in progress; keep waiting.
          completed_ = false;
          return false;
        }
        error_ = so_error;
      }
break;
}
case Kind::Accept: {
int ret;
do {
        ret = ::accept(fd_.native_handle(), nullptr, nullptr);
} while (ret < 0 && errno == EINTR);
if (ret < 0) {
error_ = errno;
        if (error_ == EAGAIN || error_ == EWOULDBLOCK) {
          // A spurious wakeup happened; keep waiting for the next event.
          completed_ = false;
          return false;
        }
} else {
accept_handle_ = IpcHandle(ret);
        // NOTE: On Linux, the client handle does not inherit O_NONBLOCK,
        // but on BSDs it does, so portable programs should set the flag
        // explicitly. TODO(digit).
}
break;
}
default:
assert(false && "Invalid runtime async op type!");
return false;
}
async_loop_.UpdateHandle(this);
callback_(error_, actual_size_);
return true;
}
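  // Take ownership of the handle created by a completed Accept operation.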
IpcHandle TakeAcceptedHandle() { return std::move(accept_handle_); }
IpcHandle fd_;
Kind kind_ = Kind::None;
bool completed_ = false;
#ifdef USE_KQUEUE
// These values are used exclusively by AsyncLoop::Impl::Watches
KqueueFilterState kqueue_read_filter_ = KqueueFilterState::Unknown;
KqueueFilterState kqueue_write_filter_ = KqueueFilterState::Unknown;
#endif // USE_KQUEUE
int error_ = 0;
void* buffer_ = nullptr;
size_t wanted_size_ = 0;
size_t actual_size_ = 0;
AsyncHandle::Callback callback_;
IpcHandle accept_handle_;
AsyncLoop& async_loop_;
};
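// Platform-specific implementation of AsyncLoop. Owns the backend-specific
// set of watched handles (|watches_|), the list of pending completions
// (|pending_ops_|), the loop timers and the optional interrupt catcher.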
class AsyncLoop::Impl {
  /// The list of pending AsyncHandle::States that received an event but whose
  /// OnEvent() method wasn't called yet. Identified by their address.
struct PendingList : public std::vector<AsyncHandle::State*> {
// Remove one async state from the list.
bool Remove(AsyncHandle::State* astate) {
for (auto it = begin(); it != end(); ++it) {
if (*it == astate) {
          // Order does not matter for this list, so just move the last item
          // into the removed slot to avoid shifting every later element.
auto it_last = end() - 1;
if (it != it_last)
*it = *it_last;
pop_back();
return true;
}
}
return false;
}
};
PendingList pending_ops_;
#ifdef USE_KQUEUE
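  // kqueue()-based watch set. Filters are registered lazily: Insert() only
  // marks them Unknown, and the next WaitForEvents() call computes the
  // changelist to submit to the kernel. EV_CLEAR makes event delivery
  // edge-triggered, so a delivered filter is marked Disabled locally and
  // only re-enabled when a pending operation still needs it.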
class Watches {
IpcHandle queue_ = -1;
std::vector<AsyncHandle::State*> states_;
std::vector<struct kevent> events_;
static constexpr size_t npos = ~size_t(0);
size_t FindState(AsyncHandle::State* state) const {
size_t result = 0;
for (auto it = states_.begin(); it != states_.end(); ++it, ++result) {
if (*it == state)
return result;
}
return npos;
}
void InsertIntoQueue(AsyncHandle::State* state) {
      // Do not create any filters yet; they will be added on demand
      // during WaitForEvents().
state->kqueue_read_filter_ = KqueueFilterState::Unknown;
state->kqueue_write_filter_ = KqueueFilterState::Unknown;
}
void RemoveFromQueue(AsyncHandle::State* state) {
// Remove the filters directly, instead of delaying the operation
// to WaitForEvents().
struct kevent events[2];
int count = 0;
if (state->kqueue_read_filter_ != KqueueFilterState::Unknown) {
events[count++] = {
static_cast<uintptr_t>(state->fd()),
EVFILT_READ,
EV_DELETE,
0,
0,
state,
};
}
if (state->kqueue_write_filter_ != KqueueFilterState::Unknown) {
events[count++] = {
static_cast<uintptr_t>(state->fd()),
EVFILT_WRITE,
EV_DELETE,
0,
0,
state,
};
}
if (count > 0) {
int ret = kevent(queue_.native_handle(), events, count, NULL, 0, NULL);
if (ret < 0)
ErrnoFatal("kevent.Remove");
}
}
public:
Watches() : queue_(kqueue()) {
if (!queue_)
ErrnoFatal("kqueue()");
}
~Watches() {
assert(states_.empty() &&
"Destroying AsyncLoop before child AsyncHandle instances!");
}
bool HasWaiters() const {
for (const auto* state : states_) {
if (state->IsRunning())
return true;
}
return false;
}
void Insert(AsyncHandle::State* state) {
assert(FindState(state) == npos && "Async state already in the set!");
states_.push_back(state);
InsertIntoQueue(state);
}
void Remove(AsyncHandle::State* state) {
size_t state_pos = FindState(state);
assert(state_pos != npos && "Async state not in the set!");
RemoveFromQueue(state);
// Order is not important
states_[state_pos] = states_.back();
states_.pop_back();
}
void Update(AsyncHandle::State* state) {
assert(FindState(state) != npos && "Updating unknown async state!");
// Do not do anything, changes will be computed on
// demand in the next WaitForEvents() call.
}
int WaitForEvents(const struct timespec* ts, PendingList& pending_ops) {
events_.resize(2 * states_.size());
// Compute the changes to send to the kernel.
int num_changes = 0;
int num_events = 0;
      // Helper lambda to compute the set of flags to apply to a given kqueue
      // filter. Returns 0 if there is no change to apply. Also updates
      // |num_events|.
auto check_filter = [&num_events](KqueueFilterState& filter,
bool event_needed) -> uint16_t {
uint16_t flags = 0;
if (event_needed) {
num_events++;
if (filter != KqueueFilterState::Enabled) {
flags = EV_ENABLE | EV_CLEAR |
((filter == KqueueFilterState::Unknown) ? EV_ADD : 0);
filter = KqueueFilterState::Enabled;
}
} else if (filter == KqueueFilterState::Enabled) {
flags = EV_DELETE;
filter = KqueueFilterState::Disabled;
}
return flags;
};
for (auto* state : states_) {
uint16_t flags =
check_filter(state->kqueue_read_filter_, state->NeedsReadEvent());
if (flags) {
events_[num_changes++] = {
static_cast<uintptr_t>(state->fd()),
EVFILT_READ,
flags,
0,
0,
state,
};
}
flags =
check_filter(state->kqueue_write_filter_, state->NeedsWriteEvent());
if (flags) {
events_[num_changes++] = {
static_cast<uintptr_t>(state->fd()),
EVFILT_WRITE,
flags,
0,
0,
state,
};
}
}
      struct kevent* events = events_.data();
      int ret = kevent(queue_.native_handle(), events, num_changes, events,
                       static_cast<int>(events_.size()), ts);
if (ret < 0) {
if (errno != EINTR)
ErrnoFatal("kevent.Wait");
return -1;
}
      // kevent() returns immediately, without waiting, when the output event
      // array is empty, even if a timeout was given, so use pselect() to
      // sleep instead.
if (ret == 0 && num_events == 0) {
int ret = pselect(0, NULL, NULL, NULL, ts, NULL);
if (ret < 0 && errno != EINTR)
ErrnoFatal("pselect");
return ret;
}
struct kevent* event = &events_.front();
for (int n = 0; n < ret; ++n, ++event) {
auto* state = static_cast<AsyncHandle::State*>(event->udata);
assert(state);
assert(static_cast<uintptr_t>(state->fd()) == event->ident);
        // Do not try to interpret EV_ERROR here; assume that OnEvent()
        // will retry the syscall and pick up the error from it.
if (event->filter == EVFILT_READ) {
assert(state->NeedsReadEvent());
assert(state->kqueue_read_filter_ == KqueueFilterState::Enabled);
state->kqueue_read_filter_ = KqueueFilterState::Disabled;
pending_ops.push_back(state);
} else if (event->filter == EVFILT_WRITE) {
assert(state->NeedsWriteEvent());
assert(state->kqueue_write_filter_ == KqueueFilterState::Enabled);
state->kqueue_write_filter_ = KqueueFilterState::Disabled;
pending_ops.push_back(state);
}
}
return ret;
}
};
#elif defined(USE_PPOLL)
class Watches {
// Use two parallel vectors, where polls_[n] is an entry matching
// states_[n], since only one async operation can exist for each file
// descriptor. They must be separate because the address of polls_.front()
// will be passed to the kernel for ppoll(), which expects a contiguous
// array of pollfd items in memory.
std::vector<AsyncHandle::State*> states_;
std::vector<pollfd> polls_;
size_t FindState(AsyncHandle::State* state) {
size_t result = 0;
for (auto* s : states_) {
if (s == state)
return result;
++result;
}
return npos;
}
public:
static constexpr size_t npos = ~size_t(0);
~Watches() {
assert(states_.empty() &&
"Destroying AsyncLoop before child AsyncHandle instances!");
}
bool HasWaiters() const {
for (const auto& poll : polls_) {
if (poll.events != 0)
return true;
}
return false;
}
void Insert(AsyncHandle::State* state) {
assert(FindState(state) == npos && "Async state already in the set!");
states_.push_back(state);
polls_.push_back({ state->fd(), state->poll_events(), 0 });
}
void Remove(AsyncHandle::State* state) {
size_t state_pos = FindState(state);
assert(state_pos != npos && "Async state not in the set!");
// Order is not important
size_t last_pos = states_.size() - 1;
if (state_pos != last_pos) {
states_[state_pos] = states_[last_pos];
polls_[state_pos] = polls_[last_pos];
}
states_.pop_back();
polls_.pop_back();
}
void Update(AsyncHandle::State* state) {
size_t state_pos = FindState(state);
assert(state_pos != npos && "Updating unknown async state!");
polls_[state_pos].events = state->poll_events();
}
int WaitForEvents(const struct timespec* ts, PendingList& pending_ops) {
      int ret = ppoll(polls_.data(), static_cast<nfds_t>(polls_.size()), ts,
                      nullptr);
if (ret < 0) {
if (errno != EINTR)
ErrnoFatal("ppoll");
} else if (ret > 0) {
auto cur_poll = polls_.begin();
auto cur_state = states_.begin();
for (; cur_poll != polls_.end(); ++cur_poll, ++cur_state) {
AsyncHandle::State* state = *cur_state;
assert(state->fd() == cur_poll->fd);
if (cur_poll->revents & state->poll_revents())
pending_ops.push_back(state);
}
}
return ret;
}
};
#else // !USE_KQUEUE && !USE_PPOLL
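  // pselect()-based fallback watch set, used when neither kqueue() nor
  // ppoll() is available. Only supports file descriptors smaller than
  // FD_SETSIZE.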
class Watches {
static constexpr int kInvalid = -2;
mutable int max_fds_ = kInvalid;
using ListType = std::vector<AsyncHandle::State*>;
ListType states_;
fd_set read_fds_;
fd_set write_fds_;
fd_set event_read_fds_;
fd_set event_write_fds_;
void Invalidate() { max_fds_ = kInvalid; }
int num_fds() const {
// Recompute max file descriptor if needed.
if (max_fds_ == kInvalid) {
max_fds_ = -1;
for (AsyncHandle::State* state : states_) {
if (state->NeedsReadEvent() || state->NeedsWriteEvent()) {
if (state->fd() > max_fds_)
max_fds_ = state->fd();
}
}
}
return max_fds_ + 1;
}
ListType::iterator FindState(AsyncHandle::State* state) {
return std::find(states_.begin(), states_.end(), state);
}
public:
Watches() {
FD_ZERO(&read_fds_);
FD_ZERO(&write_fds_);
}
void Insert(AsyncHandle::State* state) {
if (state->fd() >= static_cast<int>(FD_SETSIZE))
Fatal("File descriptor too large for pselect(): %d\n", state->fd());
auto it = FindState(state);
assert(it == states_.end() && "Async state already in the set!");
if (it != states_.end())
return;
states_.push_back(state);
if (state->NeedsReadEvent())
FD_SET(state->fd(), &read_fds_);
if (state->NeedsWriteEvent())
FD_SET(state->fd(), &write_fds_);
Invalidate();
}
    void Remove(AsyncHandle::State* state) {
      auto it = FindState(state);
      assert(it != states_.end() && "Removing unknown Async state!");
      if (it == states_.end())
        return;
      FD_CLR(state->fd(), &read_fds_);
      FD_CLR(state->fd(), &write_fds_);
      // Remove from |states_|; order is not important, so move the last
      // item into the freed slot.
      *it = states_.back();
      states_.pop_back();
      Invalidate();
    }
void Update(AsyncHandle::State* state) {
int fd = state->fd();
if (state->NeedsReadEvent()) {
FD_SET(fd, &read_fds_);
} else {
FD_CLR(fd, &read_fds_);
}
if (state->NeedsWriteEvent()) {
FD_SET(fd, &write_fds_);
} else {
FD_CLR(fd, &write_fds_);
}
}
bool HasWaiters() const {
for (const auto* state : states_) {
if (state->IsRunning())
return true;
}
return false;
}
int WaitForEvents(const struct timespec* ts, PendingList& pending_ops) {
event_read_fds_ = read_fds_;
event_write_fds_ = write_fds_;
int count = num_fds();
int ret = pselect(count, &event_read_fds_, &event_write_fds_, nullptr, ts,
nullptr);
if (ret < 0) {
if (errno != EINTR)
ErrnoFatal("pselect");
} else if (ret > 0) {
for (auto* state : states_) {
int fd = state->fd();
bool has_event = false;
has_event |=
state->NeedsWriteEvent() && FD_ISSET(fd, &event_write_fds_);
has_event |=
state->NeedsReadEvent() && FD_ISSET(fd, &event_read_fds_);
if (has_event)
pending_ops.emplace_back(state);
}
}
return ret;
}
};
#endif // !USE_KQUEUE && !USE_PPOLL
public:
~Impl() {
    assert(!watches_.HasWaiters() &&
           "Destroying AsyncLoop before child AsyncHandle instances!");
    assert(timers_.empty() &&
           "Destroying AsyncLoop before child AsyncTimer instances!");
}
AsyncLoopTimers& timers() { return timers_; }
void AttachHandle(AsyncHandle::State* state) {
assert(state->fd_ && "Trying to attach invalid handle");
watches_.Insert(state);
}
void DetachHandle(AsyncHandle::State* state) {
assert(state->fd_ && "Trying to detach invalid handle");
watches_.Remove(state);
}
void UpdateHandle(AsyncHandle::State* state) { watches_.Update(state); }
void CancelHandle(AsyncHandle::State* state) {
pending_ops_.Remove(state);
watches_.Update(state);
}
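  // Run one iteration of the loop: wait for I/O events or timer expiration
  // for at most |timeout_ms| milliseconds (a negative value waits forever),
  // then dispatch completed operations and expired timers.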
AsyncLoop::ExitStatus RunOnce(int64_t timeout_ms, AsyncLoop& loop) {
assert(pending_ops_.empty());
    // An interrupt occurred outside of this loop, return immediately.
if (interrupt_catcher_.get() && interrupt_catcher_->interrupted()) {
return ExitInterrupted;
}
    // Clamp the timeout to the next timer expiration, if any.
bool has_timers = false;
int64_t timer_expiration_ms = timers_.ComputeNextExpiration();
if (timer_expiration_ms >= 0) {
has_timers = true;
int64_t now_ms = NowMs();
int64_t timer_timeout_ms = timer_expiration_ms - now_ms;
if (timer_timeout_ms < 0) {
timer_timeout_ms = 0;
}
if (timeout_ms < 0)
timeout_ms = timer_timeout_ms;
else if (timeout_ms > timer_timeout_ms)
timeout_ms = timer_timeout_ms;
}
const struct timespec* ts = NULL;
struct timespec timeout_ts = {};
if (timeout_ms >= 0) {
timeout_ts.tv_sec = static_cast<time_t>(timeout_ms / 1000);
timeout_ts.tv_nsec = static_cast<long>((timeout_ms % 1000) * 1000000LL);
ts = &timeout_ts;
}
    // Exit immediately if there is nothing to do and no timeout.
if (!watches_.HasWaiters() && pending_ops_.empty() && timeout_ms < 0)
return ExitIdle;
int ret = watches_.WaitForEvents(ts, pending_ops_);
if (ret == -1) {
return ExitInterrupted;
}
if (interrupt_catcher_.get()) {
interrupt_catcher_->HandlePendingInterrupt();
if (interrupt_catcher_->interrupted())
return ExitInterrupted;
}
ExitStatus result = ExitTimeout;
// Handle all pending operations.
// Note that OnEvent() may invoke a callback that changes the
// state of pending_ops_.
while (!pending_ops_.empty()) {
AsyncHandle::State* state = pending_ops_.back();
pending_ops_.pop_back();
if (state->OnEvent())
result = ExitSuccess;
}
if (has_timers && timers_.ProcessExpiration(NowMs()))
result = ExitSuccess;
return result;
}
void ClearInterrupt() {
if (interrupt_catcher_)
interrupt_catcher_->Clear();
}
int GetInterruptSignal() const {
if (!interrupt_catcher_)
return 0;
return interrupt_catcher_->interrupted();
}
sigset_t GetOldSignalMask() const {
if (!interrupt_catcher_) {
sigset_t old_mask;
sigset_t empty;
sigemptyset(&empty);
sigprocmask(SIG_BLOCK, &empty, &old_mask);
return old_mask;
}
return interrupt_catcher_->old_mask();
}
void EnableInterruptCatcher() {
interrupt_catcher_.reset(new InterruptCatcher());
}
void DisableInterruptCatcher() { interrupt_catcher_.reset(); }
private:
std::unique_ptr<InterruptCatcher> interrupt_catcher_;
AsyncLoopTimers timers_;
Watches watches_;
};
#endif // NINJA_ASYNC_LOOP_POSIX_H