| // Copyright 2023 Google Inc. All Rights Reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #ifndef NINJA_ASYNC_LOOP_POSIX_H |
| #define NINJA_ASYNC_LOOP_POSIX_H |
| |
| // Posix implementation of the AsyncLoop class. |
| // This contains specialized code paths for Linux ppoll() and MacOS kqueue() |
| |
| #include <assert.h> |
| #include <errno.h> |
| #include <fcntl.h> |
| #include <inttypes.h> |
| #include <poll.h> |
| #include <signal.h> |
| #include <stddef.h> |
| #include <string.h> |
| #include <sys/select.h> |
| #include <sys/socket.h> |
| #include <sys/un.h> |
| #include <unistd.h> |
| |
| #ifdef USE_KQUEUE |
| #include <sys/event.h> |
| #endif |
| |
| #include <algorithm> |
| #include <unordered_map> |
| |
| #include "async_loop.h" |
| #include "async_loop_timers.h" |
| #include "interrupt_handling.h" |
| #include "metrics.h" // For GetTimeMillis() |
| #include "util.h" // For Fatal() and ErrnoFatal(). |
| |
| #ifdef USE_KQUEUE |
// The known state of a kqueue read or write filter for a given file
// descriptor, as tracked by this process.
enum class KqueueFilterState {
  // The kernel queue does not know about this filter yet.
  Unknown = 0,
  // The filter has been registered, but no event is currently expected
  // from it.
  Disabled = 1,
  // The filter has been registered and an event is expected from it.
  Enabled = 2,
};
| #endif // USE_KQUEUE |
| |
| class AsyncHandle::State { |
| public: |
| enum class Kind { |
| None = 0, |
| Read, |
| Write, |
| Connect, |
| Accept, |
| }; |
| |
| // Constructors. |
| explicit State(AsyncLoop& async_loop) : async_loop_(async_loop) {} |
| |
| State(IpcHandle handle, AsyncLoop& async_loop, |
| AsyncHandle::Callback&& callback) |
| : fd_(std::move(handle)), callback_(std::move(callback)), |
| async_loop_(async_loop) { |
| if (fd_) |
| async_loop_.AttachHandle(this); |
| } |
| |
| // Destructor. Note that this type is unmoveable due to Win32 |
| // requirements. |
| ~State() { |
| if (fd_) |
| async_loop_.DetachHandle(this); |
| } |
| |
| // Disallow copy operations. Even though this is implied by |
| // the use of moveable-only fields like |fd_|. |
| State(const State&) = delete; |
| State& operator=(const State&) = delete; |
| |
| // Disallow move operations. The address of these instances are recorded |
| // in the |AsyncLoop::Impl::watches_| data structure. |
| State(State&&) noexcept = delete; |
| State& operator=(State&&) noexcept = delete; |
| |
| // Return AsyncLoop instance this instance belongs to. |
| AsyncLoop& async_loop() const { return async_loop_; } |
| |
| // Return AsyncLoop implementation instance. |
| AsyncLoop::Impl& loop() const { return *async_loop_.impl_; } |
| |
| // Return file descriptor. |
| int fd() const { return fd_.native_handle(); } |
| int native_handle() const { return fd(); } |
| |
| // Return true if the handle for this instnace is valid. |
| bool is_valid() const { return !!fd_; } |
| |
| // Release the native handle for this instance, making it invalid. |
| int ReleaseHandle() { |
| Reset(Kind::None); |
| if (fd_) |
| async_loop_.DetachHandle(this); |
| return fd_.ReleaseNativeHandle(); |
| } |
| |
| // Is an asynchronous operation running? |
| bool IsRunning() const { return kind_ != Kind::None && !completed_; } |
| |
| // Cancel current asynchronous operation, if any. |
| void Cancel() { |
| if (kind_ != Kind::None) { |
| async_loop_.CancelHandle(this); |
| Reset(Kind::None); |
| } |
| } |
| |
| // Reset the callback. |
| void ResetCallback(AsyncHandle::Callback&& cb) { callback_ = std::move(cb); } |
| |
| void ResetHandle(IpcHandle handle) { |
| Reset(Kind::None); |
| if (fd_) |
| async_loop_.DetachHandle(this); |
| fd_ = std::move(handle); |
| if (fd_) |
| async_loop_.AttachHandle(this); |
| } |
| |
| bool NeedsEvent() const { return !completed_ && kind_ != Kind::None; } |
| |
| bool NeedsWriteEvent() const { |
| return !completed_ && (kind_ == Kind::Write || kind_ == Kind::Connect); |
| } |
| |
| bool NeedsReadEvent() const { |
| return !completed_ && (kind_ == Kind::Read || kind_ == Kind::Accept); |
| } |
| |
| short poll_events() const { |
| if (completed_) |
| return 0; |
| if (kind_ == Kind::Write || kind_ == Kind::Connect) |
| return POLLOUT; |
| if (kind_ == Kind::Read || kind_ == Kind::Accept) |
| return POLLIN | POLLPRI; |
| return 0; |
| } |
| |
| short poll_revents() const { return poll_events() | POLLERR | POLLHUP; } |
| |
| // Reset state to start a new asynchronous operation. Cancels current one |
| // if any. |
| void Reset(Kind kind = Kind::None) { |
| kind_ = kind; |
| completed_ = false; |
| error_ = 0; |
| buffer_ = nullptr; |
| wanted_size_ = 0; |
| actual_size_ = 0; |
| accept_handle_.Close(); |
| } |
| |
| void StartRead(void* buffer, size_t size) { |
| Reset(Kind::Read); |
| buffer_ = buffer; |
| wanted_size_ = size; |
| if (!size) |
| completed_ = true; |
| else |
| async_loop().UpdateHandle(this); |
| } |
| |
| void StartWrite(const void* buffer, size_t size) { |
| Reset(Kind::Write); |
| buffer_ = const_cast<void*>(buffer); |
| wanted_size_ = size; |
| if (!size) |
| completed_ = true; |
| else |
| async_loop().UpdateHandle(this); |
| } |
| |
| void StartConnect() { |
| Reset(Kind::Connect); |
| async_loop().UpdateHandle(this); |
| } |
| |
| void StartAccept() { |
| Reset(Kind::Accept); |
| async_loop().UpdateHandle(this); |
| } |
| |
| bool OnEvent() { |
| error_ = 0; |
| actual_size_ = 0; |
| |
| assert(!completed_); |
| completed_ = true; |
| |
| switch (kind_) { |
| case Kind::Read: { |
| int ret; |
| do { |
| ret = ::read(fd_.native_handle(), buffer_, wanted_size_); |
| } while (ret < 0 && errno == EINTR); |
| if (ret < 0) { |
| error_ = errno; |
| if (error_ == EAGAIN || error_ == EWOULDBLOCK) { |
| // A spurious wakeup happened. |
| return false; |
| } |
| } else { |
| actual_size_ = static_cast<size_t>(ret); |
| } |
| break; |
| } |
| |
| case Kind::Write: { |
| int ret; |
| do { |
| ret = ::write(fd_.native_handle(), buffer_, wanted_size_); |
| } while (ret < 0 && errno == EINTR); |
| if (ret < 0) { |
| error_ = errno; |
| if (error_ == EAGAIN || error_ == EWOULDBLOCK) { |
| // A spurious wakeup happened. |
| return false; |
| } |
| } else { |
| actual_size_ = static_cast<size_t>(ret); |
| } |
| break; |
| } |
| |
| case Kind::Connect: { |
| int so_error = |
| IpcHandle::GetNativeAsyncConnectStatus(fd_.native_handle()); |
| if (so_error != 0) { |
| if (so_error == EAGAIN || so_error == EINPROGRESS) |
| return false; |
| |
| error_ = so_error; |
| } |
| break; |
| } |
| |
| case Kind::Accept: { |
| int ret; |
| do { |
| ret = ::accept(fd_.native_handle(), nullptr, 0); |
| } while (ret < 0 && errno == EINTR); |
| if (ret < 0) { |
| error_ = errno; |
| if (error_ == EAGAIN || error_ == EWOULDBLOCK) |
| return false; |
| } else { |
| accept_handle_ = IpcHandle(ret); |
| // NOTE: On Linux, client handle does not inherit O_NONBLOCK |
| // but it will on BSDs, so portable programs should set the |
| // flag explicitly. TODO(digit). |
| } |
| break; |
| } |
| |
| default: |
| assert(false && "Invalid runtime async op type!"); |
| return false; |
| } |
| async_loop_.UpdateHandle(this); |
| callback_(error_, actual_size_); |
| return true; |
| } |
| |
| IpcHandle TakeAcceptedHandle() { return std::move(accept_handle_); } |
| |
| IpcHandle fd_; |
| Kind kind_ = Kind::None; |
| bool completed_ = false; |
| #ifdef USE_KQUEUE |
| // These values are used exclusively by AsyncLoop::Impl::Watches |
| KqueueFilterState kqueue_read_filter_ = KqueueFilterState::Unknown; |
| KqueueFilterState kqueue_write_filter_ = KqueueFilterState::Unknown; |
| #endif // USE_KQUEUE |
| int error_ = 0; |
| void* buffer_ = nullptr; |
| size_t wanted_size_ = 0; |
| size_t actual_size_ = 0; |
| AsyncHandle::Callback callback_; |
| IpcHandle accept_handle_; |
| AsyncLoop& async_loop_; |
| }; |
| |
| class AsyncLoop::Impl { |
| /// The list of pending AsyncHandle::States that have completed but whose |
| /// Invoke() method wasn't called yet. Identified by their address. |
| struct PendingList : public std::vector<AsyncHandle::State*> { |
| // Remove one async state from the list. |
| bool Remove(AsyncHandle::State* astate) { |
| for (auto it = begin(); it != end(); ++it) { |
| if (*it == astate) { |
| // Order does not matter for this list, so just move last item |
| // to current location to avoid O(n) removal cost. |
| auto it_last = end() - 1; |
| if (it != it_last) |
| *it = *it_last; |
| pop_back(); |
| return true; |
| } |
| } |
| return false; |
| } |
| }; |
| |
| PendingList pending_ops_; |
| |
| #ifdef USE_KQUEUE |
| class Watches { |
| IpcHandle queue_ = -1; |
| std::vector<AsyncHandle::State*> states_; |
| std::vector<struct kevent> events_; |
| |
| static constexpr size_t npos = ~size_t(0); |
| |
| size_t FindState(AsyncHandle::State* state) const { |
| size_t result = 0; |
| for (auto it = states_.begin(); it != states_.end(); ++it, ++result) { |
| if (*it == state) |
| return result; |
| } |
| return npos; |
| } |
| |
| void InsertIntoQueue(AsyncHandle::State* state) { |
| // Do not create any filter yet, these will be added on demand |
| // during WaitForEvents(). |
| state->kqueue_read_filter_ = KqueueFilterState::Unknown; |
| state->kqueue_write_filter_ = KqueueFilterState::Unknown; |
| } |
| |
| void RemoveFromQueue(AsyncHandle::State* state) { |
| // Remove the filters directly, instead of delaying the operation |
| // to WaitForEvents(). |
| struct kevent events[2]; |
| int count = 0; |
| if (state->kqueue_read_filter_ != KqueueFilterState::Unknown) { |
| events[count++] = { |
| static_cast<uintptr_t>(state->fd()), |
| EVFILT_READ, |
| EV_DELETE, |
| 0, |
| 0, |
| state, |
| }; |
| } |
| if (state->kqueue_write_filter_ != KqueueFilterState::Unknown) { |
| events[count++] = { |
| static_cast<uintptr_t>(state->fd()), |
| EVFILT_WRITE, |
| EV_DELETE, |
| 0, |
| 0, |
| state, |
| }; |
| } |
| if (count > 0) { |
| int ret = kevent(queue_.native_handle(), events, count, NULL, 0, NULL); |
| if (ret < 0) |
| ErrnoFatal("kevent.Remove"); |
| } |
| } |
| |
| public: |
| Watches() : queue_(kqueue()) { |
| if (!queue_) |
| ErrnoFatal("kqueue()"); |
| } |
| |
| ~Watches() { |
| assert(states_.empty() && |
| "Destroying AsyncLoop before child AsyncHandle instances!"); |
| } |
| |
| bool HasWaiters() const { |
| for (const auto* state : states_) { |
| if (state->IsRunning()) |
| return true; |
| } |
| return false; |
| } |
| |
| void Insert(AsyncHandle::State* state) { |
| assert(FindState(state) == npos && "Async state already in the set!"); |
| states_.push_back(state); |
| InsertIntoQueue(state); |
| } |
| |
| void Remove(AsyncHandle::State* state) { |
| size_t state_pos = FindState(state); |
| assert(state_pos != npos && "Async state not in the set!"); |
| RemoveFromQueue(state); |
| // Order is not important |
| states_[state_pos] = states_.back(); |
| states_.pop_back(); |
| } |
| |
| void Update(AsyncHandle::State* state) { |
| assert(FindState(state) != npos && "Updating unknown async state!"); |
| // Do not do anything, changes will be computed on |
| // demand in the next WaitForEvents() call. |
| } |
| |
| int WaitForEvents(const struct timespec* ts, PendingList& pending_ops) { |
| events_.resize(2 * states_.size()); |
| |
| // Compute the changes to send to the kernel. |
| int num_changes = 0; |
| int num_events = 0; |
| |
| // Helper function to compute the set of flags to apply to |
| // a given Kqueue filter. Return 0 if there is no change to apply. |
| // Also updates num_events. |
| auto check_filter = [&num_events](KqueueFilterState& filter, |
| bool event_needed) -> uint16_t { |
| uint16_t flags = 0; |
| if (event_needed) { |
| num_events++; |
| if (filter != KqueueFilterState::Enabled) { |
| flags = EV_ENABLE | EV_CLEAR | |
| ((filter == KqueueFilterState::Unknown) ? EV_ADD : 0); |
| filter = KqueueFilterState::Enabled; |
| } |
| } else if (filter == KqueueFilterState::Enabled) { |
| flags = EV_DELETE; |
| filter = KqueueFilterState::Disabled; |
| } |
| return flags; |
| }; |
| |
| for (auto* state : states_) { |
| uint16_t flags = |
| check_filter(state->kqueue_read_filter_, state->NeedsReadEvent()); |
| if (flags) { |
| events_[num_changes++] = { |
| static_cast<uintptr_t>(state->fd()), |
| EVFILT_READ, |
| flags, |
| 0, |
| 0, |
| state, |
| }; |
| } |
| |
| flags = |
| check_filter(state->kqueue_write_filter_, state->NeedsWriteEvent()); |
| if (flags) { |
| events_[num_changes++] = { |
| static_cast<uintptr_t>(state->fd()), |
| EVFILT_WRITE, |
| flags, |
| 0, |
| 0, |
| state, |
| }; |
| } |
| } |
| |
| struct kevent* events = &events_.front(); |
| int ret = kevent(queue_.native_handle(), events, num_changes, events, |
| events_.size(), ts); |
| if (ret < 0) { |
| if (errno != EINTR) |
| ErrnoFatal("kevent.Wait"); |
| return -1; |
| } |
| |
| // kevent() always returns immediately if the size of the events array |
| // is 0, even if there is a timeout so use pselect() instead. |
| if (ret == 0 && num_events == 0) { |
| int ret = pselect(0, NULL, NULL, NULL, ts, NULL); |
| if (ret < 0 && errno != EINTR) |
| ErrnoFatal("pselect"); |
| return ret; |
| } |
| |
| struct kevent* event = &events_.front(); |
| for (int n = 0; n < ret; ++n, ++event) { |
| auto* state = static_cast<AsyncHandle::State*>(event->udata); |
| assert(state); |
| assert(static_cast<uintptr_t>(state->fd()) == event->ident); |
| |
| // Do not try to interpret EV_ERROR here, assume that the OnError() |
| // method will retry the syscall and get the error from it. |
| |
| if (event->filter == EVFILT_READ) { |
| assert(state->NeedsReadEvent()); |
| assert(state->kqueue_read_filter_ == KqueueFilterState::Enabled); |
| state->kqueue_read_filter_ = KqueueFilterState::Disabled; |
| pending_ops.push_back(state); |
| } else if (event->filter == EVFILT_WRITE) { |
| assert(state->NeedsWriteEvent()); |
| assert(state->kqueue_write_filter_ == KqueueFilterState::Enabled); |
| state->kqueue_write_filter_ = KqueueFilterState::Disabled; |
| pending_ops.push_back(state); |
| } |
| } |
| return ret; |
| } |
| }; |
| #elif defined(USE_PPOLL) |
| class Watches { |
| // Use two parallel vectors, where polls_[n] is an entry matching |
| // states_[n], since only one async operation can exist for each file |
| // descriptor. They must be separate because the address of polls_.front() |
| // will be passed to the kernel for ppoll(), which expects a contiguous |
| // array of pollfd items in memory. |
| std::vector<AsyncHandle::State*> states_; |
| std::vector<pollfd> polls_; |
| |
| size_t FindState(AsyncHandle::State* state) { |
| size_t result = 0; |
| for (auto* s : states_) { |
| if (s == state) |
| return result; |
| ++result; |
| } |
| return npos; |
| } |
| |
| public: |
| static constexpr size_t npos = ~size_t(0); |
| |
| ~Watches() { |
| assert(states_.empty() && |
| "Destroying AsyncLoop before child AsyncHandle instances!"); |
| } |
| |
| bool HasWaiters() const { |
| for (const auto& poll : polls_) { |
| if (poll.events != 0) |
| return true; |
| } |
| return false; |
| } |
| |
| void Insert(AsyncHandle::State* state) { |
| assert(FindState(state) == npos && "Async state already in the set!"); |
| states_.push_back(state); |
| polls_.push_back({ state->fd(), state->poll_events(), 0 }); |
| } |
| |
| void Remove(AsyncHandle::State* state) { |
| size_t state_pos = FindState(state); |
| assert(state_pos != npos && "Async state not in the set!"); |
| // Order is not important |
| size_t last_pos = states_.size() - 1; |
| if (state_pos != last_pos) { |
| states_[state_pos] = states_[last_pos]; |
| polls_[state_pos] = polls_[last_pos]; |
| } |
| states_.pop_back(); |
| polls_.pop_back(); |
| } |
| |
| void Update(AsyncHandle::State* state) { |
| size_t state_pos = FindState(state); |
| assert(state_pos != npos && "Updating unknown async state!"); |
| polls_[state_pos].events = state->poll_events(); |
| } |
| |
| int WaitForEvents(const struct timespec* ts, PendingList& pending_ops) { |
| int ret = ppoll(&polls_.front(), static_cast<nfds_t>(polls_.size()), ts, |
| nullptr); |
| if (ret < 0) { |
| if (errno != EINTR) |
| ErrnoFatal("ppoll"); |
| } else if (ret > 0) { |
| auto cur_poll = polls_.begin(); |
| auto cur_state = states_.begin(); |
| for (; cur_poll != polls_.end(); ++cur_poll, ++cur_state) { |
| AsyncHandle::State* state = *cur_state; |
| assert(state->fd() == cur_poll->fd); |
| if (cur_poll->revents & state->poll_revents()) |
| pending_ops.push_back(state); |
| } |
| } |
| return ret; |
| } |
| }; |
| |
| #else // !USE_KQUEUE && !USE_PPOLL |
| class Watches { |
| static constexpr int kInvalid = -2; |
| mutable int max_fds_ = kInvalid; |
| |
| using ListType = std::vector<AsyncHandle::State*>; |
| ListType states_; |
| fd_set read_fds_; |
| fd_set write_fds_; |
| fd_set event_read_fds_; |
| fd_set event_write_fds_; |
| |
| void Invalidate() { max_fds_ = kInvalid; } |
| |
| int num_fds() const { |
| // Recompute max file descriptor if needed. |
| if (max_fds_ == kInvalid) { |
| max_fds_ = -1; |
| for (AsyncHandle::State* state : states_) { |
| if (state->NeedsReadEvent() || state->NeedsWriteEvent()) { |
| if (state->fd() > max_fds_) |
| max_fds_ = state->fd(); |
| } |
| } |
| } |
| return max_fds_ + 1; |
| } |
| |
| ListType::iterator FindState(AsyncHandle::State* state) { |
| return std::find(states_.begin(), states_.end(), state); |
| } |
| |
| public: |
| Watches() { |
| FD_ZERO(&read_fds_); |
| FD_ZERO(&write_fds_); |
| } |
| |
| void Insert(AsyncHandle::State* state) { |
| if (state->fd() >= static_cast<int>(FD_SETSIZE)) |
| Fatal("File descriptor too large for pselect(): %d\n", state->fd()); |
| |
| auto it = FindState(state); |
| assert(it == states_.end() && "Async state already in the set!"); |
| if (it != states_.end()) |
| return; |
| states_.push_back(state); |
| if (state->NeedsReadEvent()) |
| FD_SET(state->fd(), &read_fds_); |
| if (state->NeedsWriteEvent()) |
| FD_SET(state->fd(), &write_fds_); |
| Invalidate(); |
| } |
| |
| void Remove(AsyncHandle::State* state) { |
| auto it = FindState(state); |
| assert(it != states_.end() && "Removing unknown Async state!"); |
| if (it == states_.end()) |
| return; |
| FD_CLR(state->fd(), &read_fds_); |
| FD_CLR(state->fd(), &write_fds_); |
| Invalidate(); |
| } |
| |
| void Update(AsyncHandle::State* state) { |
| int fd = state->fd(); |
| if (state->NeedsReadEvent()) { |
| FD_SET(fd, &read_fds_); |
| } else { |
| FD_CLR(fd, &read_fds_); |
| } |
| if (state->NeedsWriteEvent()) { |
| FD_SET(fd, &write_fds_); |
| } else { |
| FD_CLR(fd, &write_fds_); |
| } |
| } |
| |
| bool HasWaiters() const { |
| for (const auto* state : states_) { |
| if (state->IsRunning()) |
| return true; |
| } |
| return false; |
| } |
| |
| int WaitForEvents(const struct timespec* ts, PendingList& pending_ops) { |
| event_read_fds_ = read_fds_; |
| event_write_fds_ = write_fds_; |
| |
| int count = num_fds(); |
| int ret = pselect(count, &event_read_fds_, &event_write_fds_, nullptr, ts, |
| nullptr); |
| if (ret < 0) { |
| if (errno != EINTR) |
| ErrnoFatal("pselect"); |
| } else if (ret > 0) { |
| for (auto* state : states_) { |
| int fd = state->fd(); |
| bool has_event = false; |
| has_event |= |
| state->NeedsWriteEvent() && FD_ISSET(fd, &event_write_fds_); |
| has_event |= |
| state->NeedsReadEvent() && FD_ISSET(fd, &event_read_fds_); |
| if (has_event) |
| pending_ops.emplace_back(state); |
| } |
| } |
| return ret; |
| } |
| }; |
| #endif // !USE_KQUEUE && !USE_PPOLL |
| |
| public: |
| ~Impl() { |
| assert(!watches_.HasWaiters() && |
| "Destroying AsyncLoop before children AsyncHandle instances!"); |
| assert(timers_.empty() && |
| "Destroying AsyncLoop before children AsyncTimer instances!"); |
| } |
| |
| AsyncLoopTimers& timers() { return timers_; } |
| |
| void AttachHandle(AsyncHandle::State* state) { |
| assert(state->fd_ && "Trying to attach invalid handle"); |
| watches_.Insert(state); |
| } |
| |
| void DetachHandle(AsyncHandle::State* state) { |
| assert(state->fd_ && "Trying to detach invalid handle"); |
| watches_.Remove(state); |
| } |
| |
| void UpdateHandle(AsyncHandle::State* state) { watches_.Update(state); } |
| |
| void CancelHandle(AsyncHandle::State* state) { |
| pending_ops_.Remove(state); |
| watches_.Update(state); |
| } |
| |
| AsyncLoop::ExitStatus RunOnce(int64_t timeout_ms, AsyncLoop& loop) { |
| assert(pending_ops_.empty()); |
| |
| /// An interrupt occured outside of this loop, return immediately. |
| if (interrupt_catcher_.get() && interrupt_catcher_->interrupted()) { |
| return ExitInterrupted; |
| } |
| |
| /// Handle timeout. |
| bool has_timers = false; |
| int64_t timer_expiration_ms = timers_.ComputeNextExpiration(); |
| if (timer_expiration_ms >= 0) { |
| has_timers = true; |
| int64_t now_ms = NowMs(); |
| int64_t timer_timeout_ms = timer_expiration_ms - now_ms; |
| if (timer_timeout_ms < 0) { |
| timer_timeout_ms = 0; |
| } |
| |
| if (timeout_ms < 0) |
| timeout_ms = timer_timeout_ms; |
| else if (timeout_ms > timer_timeout_ms) |
| timeout_ms = timer_timeout_ms; |
| } |
| |
| const struct timespec* ts = NULL; |
| struct timespec timeout_ts = {}; |
| if (timeout_ms >= 0) { |
| timeout_ts.tv_sec = static_cast<time_t>(timeout_ms / 1000); |
| timeout_ts.tv_nsec = static_cast<long>((timeout_ms % 1000) * 1000000LL); |
| ts = &timeout_ts; |
| } |
| |
| /// Exit immediately if there is nothing to do and no timeout. |
| if (!watches_.HasWaiters() && pending_ops_.empty() && timeout_ms < 0) |
| return ExitIdle; |
| |
| int ret = watches_.WaitForEvents(ts, pending_ops_); |
| if (ret == -1) { |
| return ExitInterrupted; |
| } |
| |
| if (interrupt_catcher_.get()) { |
| interrupt_catcher_->HandlePendingInterrupt(); |
| if (interrupt_catcher_->interrupted()) |
| return ExitInterrupted; |
| } |
| |
| ExitStatus result = ExitTimeout; |
| |
| // Handle all pending operations. |
| // Note that OnEvent() may invoke a callback that changes the |
| // state of pending_ops_. |
| while (!pending_ops_.empty()) { |
| AsyncHandle::State* state = pending_ops_.back(); |
| pending_ops_.pop_back(); |
| if (state->OnEvent()) |
| result = ExitSuccess; |
| } |
| |
| if (has_timers && timers_.ProcessExpiration(NowMs())) |
| result = ExitSuccess; |
| |
| return result; |
| } |
| |
| void ClearInterrupt() { |
| if (interrupt_catcher_) |
| interrupt_catcher_->Clear(); |
| } |
| |
| int GetInterruptSignal() const { |
| if (!interrupt_catcher_) |
| return 0; |
| return interrupt_catcher_->interrupted(); |
| } |
| |
| sigset_t GetOldSignalMask() const { |
| if (!interrupt_catcher_) { |
| sigset_t old_mask; |
| sigset_t empty; |
| sigemptyset(&empty); |
| sigprocmask(SIG_BLOCK, &empty, &old_mask); |
| return old_mask; |
| } |
| return interrupt_catcher_->old_mask(); |
| } |
| |
| void EnableInterruptCatcher() { |
| interrupt_catcher_.reset(new InterruptCatcher()); |
| } |
| |
| void DisableInterruptCatcher() { interrupt_catcher_.reset(); } |
| |
| private: |
| std::unique_ptr<InterruptCatcher> interrupt_catcher_; |
| AsyncLoopTimers timers_; |
| Watches watches_; |
| }; |
| |
| #endif // NINJA_ASYNC_LOOP_POSIX_H |