| // Copyright 2023 Google Inc. All Rights Reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #ifndef NINJA_ASYNC_LOOP_WIN32_H |
| #define NINJA_ASYNC_LOOP_WIN32_H |
| |
| /// Win32 implementation of the AsyncLoop class. |
| |
| #include <assert.h> |
| |
| #include <list> |
| #include <unordered_map> |
| |
| #include "async_loop.h" |
| #include "async_loop_timers.h" |
| #include "interrupt_handling.h" |
| #include "metrics.h" // GetTimeMillis() |
| #include "util.h" // Win32Fatal() |
| #include "win32port.h" // PRId64 |
| |
| // Set to 1 to add special code paths to add extra debugging checks. |
| #define DEBUG 0 |
| |
| #if DEBUG |
| #define DEBUG_OVERLAPPED_POINTERS 1 |
| #else |
| #define DEBUG_OVERLAPPED_POINTERS 0 |
| #endif |
| |
| // Technical note on Win32 overlapped i/o: |
| // |
| // - Calling CreateIoCompletionPort() on a handle that |
| // was already associated with the same port returns |
| // ERROR_INVALID_PARAMETER. |
| // |
| // This means it is impossible to change the completion_key |
| // value for a handle once it has been set. |
| // |
| // - If a handle is associated with a completion port, |
| // calling DuplicateHandle() will create a new handle |
| // that is _already_ associated with the same completion port. |
| // |
| // This means that calling CreateIoCompletionPort(new_handle, ...) |
| // will fail with ERROR_INVALID_PARAMETER. |
| // |
| // But it also means the completion_key for both handles will |
| // always be the same. Thus this value becomes useless to |
| // distinguish handles in general. The only reliable way is to |
| // use the OVERLAPPED* pointer. |
| // |
| // - Even if an overlapped ReadFile() call succeeds immediately, |
| // a completion event will still be sent to the port for it. |
| // Same with WriteFile(), ConnectNamedPipe() and other Win32 I/O functions. |
| // |
| |
| // The Win32-specific state of an AsyncHandle instance, without any |
| // AsyncLoop reference. Wraps an OVERLAPPED struct |
| struct AsyncHandle::State { |
| enum class Kind { |
| None = 0, |
| Read, |
| Write, |
| Connect, |
| Accept, |
| }; |
| |
| // Constructors. |
| explicit State(AsyncLoop& async_loop) : async_loop_(async_loop) {} |
| |
| State(IpcHandle handle, AsyncLoop& async_loop, AsyncHandle::Callback&& callback) |
| : handle_(std::move(handle)), async_loop_(async_loop), callback_(std::move(callback)) { |
| if (handle_) |
| async_loop_.AttachHandle(this); |
| } |
| |
| // Destructor. |
| ~State() { |
| if (!completed_) |
| Cancel(); |
| if (handle_) { |
| async_loop_.DetachHandle(this); |
| } |
| } |
| |
| // Disallow copy operations. Even though this is implied by |
| // the use of moveable-only fields like |handle_|. |
| State(const State&) = delete; |
| State& operator=(const State&) = delete; |
| |
| // Disable move operations, since the address of this instance's |
| // |overlapped_| field (which should equal the instance's address) is |
| // passed to the Windows kernel, and thus cannot change. |
| State(State&&) noexcept = delete; |
| State& operator=(State&&) noexcept = delete; |
| |
| // Return AsyncLoop this instance belongs to. |
| AsyncLoop& async_loop() const { return async_loop_; } |
| AsyncLoop::Impl& loop() const { return *async_loop_.impl_; } |
| |
| bool is_valid() const { return !!handle_; } |
| |
| // Is an asynchronous operation running? |
| bool IsRunning() const { return !completed_; } |
| |
| HANDLE native_handle() const { return handle_.native_handle(); } |
| |
| // Cancel current asynchronous operation, if any. |
| void Cancel() { |
| if (!completed_ && kind_ != Kind::None) { |
| async_loop_.CancelHandle(this); |
| ::CancelIoEx(handle_.native_handle(), &overlapped_); |
| completed_ = true; |
| } |
| } |
| |
| // Reset the callback. |
| void ResetCallback(AsyncHandle::Callback&& cb) { callback_ = std::move(cb); } |
| |
| void ResetHandle(IpcHandle handle) { |
| if (handle_) { |
| Cancel(); |
| async_loop_.DetachHandle(this); |
| } |
| handle_ = std::move(handle); |
| if (handle_) |
| async_loop_.AttachHandle(this); |
| } |
| |
| HANDLE ReleaseHandle() { |
| if (!handle_) |
| return INVALID_HANDLE_VALUE; |
| |
| Cancel(); |
| async_loop_.DetachHandle(this); |
| return handle_.ReleaseNativeHandle(); |
| } |
| |
| // Reset state to start a new asynchronous operation. Cancels current one |
| // if any. |
| void Reset(Kind kind = Kind::None) { |
| kind_ = kind; |
| if (!completed_) { |
| ::CancelIoEx(handle_.native_handle(), &overlapped_); |
| completed_ = true; |
| } |
| error_ = 0; |
| actual_bytes_ = 0; |
| accept_handle_.Close(); |
| } |
| |
| // Start async read operation. |
| void StartRead(void* buffer, size_t size) { |
| Reset(Kind::Read); |
| completed_ = false; |
| if (!ReadFile(handle_.native_handle(), buffer, size, &actual_bytes_, |
| &overlapped_)) { |
| error_ = GetLastError(); |
| completed_ = (error_ != ERROR_IO_PENDING); |
| } |
| async_loop_.UpdateHandle(this); |
| } |
| |
| // Start async write operation. |
| void StartWrite(const void* buffer, size_t size) { |
| Reset(Kind::Write); |
| completed_ = false; |
| if (!::WriteFile(handle_.native_handle(), buffer, size, &actual_bytes_, |
| &overlapped_)) { |
| error_ = GetLastError(); |
| completed_ = (error_ != ERROR_IO_PENDING); |
| } |
| async_loop_.UpdateHandle(this); |
| } |
| |
| // Start async connect operation. |
| void StartConnect() { |
| Reset(Kind::Connect); |
| // Connection to named pipes is immediate on Win32. |
| completed_ = true; |
| async_loop_.UpdateHandle(this); |
| } |
| |
| // Start async accept operation. |
| void StartAccept() { |
| // Note: duplicate the server handle here to be able to pass |
| // the result to the final AsyncHandle::TakeAcceptedHandle() |
| // call as a separate client connection handle. |
| // |
| // The duplicate is already associated with the i/o completion port |
| // and does not need to be attached to the AsyncLoop instance. |
| Reset(Kind::Accept); |
| completed_ = false; |
| accept_handle_ = handle_.Clone(); |
| |
| if (!ConnectNamedPipe(accept_handle_.native_handle(), &overlapped_)) { |
| error_ = GetLastError(); |
| completed_ = (error_ != ERROR_IO_PENDING); |
| } |
| async_loop_.UpdateHandle(this); |
| } |
| |
| // Called when a completion event is received. Do not invoke the callback |
| // yet though. |
| void OnCompletion(AsyncError error, size_t transfer_size) { |
| completed_ = true; |
| error_ = error; |
| actual_bytes_ = transfer_size; |
| } |
| |
| // Invoke the callback. |
| void InvokeCallback() { callback_(error_, actual_bytes_); } |
| |
| // Return accepted handle, if any. |
| IpcHandle TakeAcceptedHandle() { return std::move(accept_handle_); } |
| |
| OVERLAPPED overlapped_ = {}; // Must be first! |
| IpcHandle handle_; |
| Kind kind_ = Kind::None; |
| bool completed_ = false; |
| DWORD error_ = 0; |
| DWORD actual_bytes_ = 0; |
| IpcHandle accept_handle_; |
| AsyncLoop& async_loop_; |
| AsyncHandle::Callback callback_; |
| }; |
| |
| // Win32-specific implementation of the AsyncLoop internal interface. |
| class AsyncLoop::Impl { |
| /// The list of pending AsyncHandle::States that have completed but whose |
| /// Invoke() method wasn't called yet. Identified by their address. |
| struct PendingList : public std::vector<AsyncHandle::State*> { |
| // Remove one async_id from the list. |
| bool Remove(AsyncHandle::State* async_op) { |
| for (auto it = begin(); it != end(); ++it) { |
| if (*it == async_op) { |
| // Order does not matter for this list, so just move last item |
| // to current location to avoid O(n) removal cost. |
| auto it_last = end() - 1; |
| if (it != it_last) |
| *it = *it_last; |
| pop_back(); |
| return true; |
| } |
| } |
| return false; |
| } |
| }; |
| |
| PendingList pending_ops_; |
| |
| // The number of AsyncHandle instances attached to this loop. |
| size_t attached_count_ = 0; |
| |
| // The number of AsyncHandle instances waiting for an event. |
| size_t waiting_count_ = 0; |
| |
| #if DEBUG_OVERLAPPED_POINTERS |
| // A map used to convert OVERLAPPED* pointer values to the address |
| // of a known AsyncHandle::State. Used for safety. |
| using OverlappedMap = std::unordered_map<OVERLAPPED*, AsyncHandle::State*>; |
| OverlappedMap overlapped_map_; |
| #endif |
| |
| public: |
| ~Impl() { |
| #if DEBUG_OVERLAPPED_POINTERS |
| assert(overlapped_map_.empty() && |
| "Destroying AsyncLop before AsyncHandle!"); |
| #endif |
| assert(timers_.empty() && |
| "Destroying AsyncLoop before children AsyncTimer instances!"); |
| } |
| |
| AsyncLoopTimers& timers() { return timers_; } |
| |
| void AttachHandle(AsyncHandle::State* async_op) { |
| assert(async_op && "Attaching null async op!"); |
| assert(async_op->handle_ && "Attaching invalid handle!"); |
| |
| HANDLE handle = async_op->handle_.native_handle(); |
| if (handle == INVALID_HANDLE_VALUE) |
| return; |
| |
| auto key = reinterpret_cast<ULONG_PTR>(handle); |
| if (!CreateIoCompletionPort(handle, ioport_.get(), key, 0)) { |
| DWORD error = GetLastError(); |
| // CreateIoCompletionPort() will return invalid parameter |
| // when trying to call it with the duplicate of a handle that |
| // was already associated with the port. Ignore it. |
| if (error != ERROR_INVALID_PARAMETER) |
| Win32Fatal("CreateIoCompletionPortRead"); |
| } |
| |
| #if DEBUG_OVERLAPPED_POINTERS |
| auto ret = overlapped_map_.emplace(&async_op->overlapped_, async_op); |
| assert(ret.second && |
| "Adding AsyncHandle::State for known OVERLAPPED pointer!"); |
| #endif // DEBUG_OVERLAPPED_POINTERS |
| |
| attached_count_ += 1; |
| } |
| |
| void DetachHandle(AsyncHandle::State* async_op) { |
| assert(async_op->handle_ && "Detaching invalid handle!"); |
| if (!async_op->handle_) |
| return; |
| |
| assert(attached_count_ > 0 && |
| "Detaching AsyncHandle::State from empty AsyncLoop!"); |
| attached_count_--; |
| |
| #if DEBUG_OVERLAPPED_POINTERS |
| auto overlapped_it = overlapped_map_.find(&async_op->overlapped_); |
| assert(overlapped_it != overlapped_map_.end() && |
| "Releasing AsyncHandle::State for unknown OVERLAPPED pointer!"); |
| overlapped_map_.erase(overlapped_it); |
| #endif // DEBUG_OVERLAPPED_POINTERS |
| } |
| |
| void UpdateHandle(AsyncHandle::State* state) { |
| if (state->completed_) |
| pending_ops_.push_back(state); |
| else |
| waiting_count_++; |
| } |
| |
| void CancelHandle(AsyncHandle::State* astate) { |
| pending_ops_.Remove(astate); |
| assert(waiting_count_ > 0); |
| waiting_count_--; |
| } |
| |
| ExitStatus RunOnce(int64_t timeout_ms, AsyncLoop& loop) { |
| // Any pending operations (e.g. from StartAsyncConnect()) means |
| // no timeout is needed. |
| if (!pending_ops_.empty()) |
| timeout_ms = 0; |
| |
| // Handle timeout. |
| bool has_timers = false; |
| int64_t timer_expiration_ms = timers_.ComputeNextExpiration(); |
| if (timer_expiration_ms >= 0) { |
| has_timers = true; |
| int64_t timer_timeout_ms = timer_expiration_ms - NowMs(); |
| if (timer_timeout_ms < 0) |
| timer_timeout_ms = 0; |
| |
| if (timeout_ms < 0) |
| timeout_ms = timer_timeout_ms; |
| else if (timeout_ms > timer_timeout_ms) |
| timeout_ms = timer_timeout_ms; |
| } |
| |
| // Do we have any async operation here? |
| if (timeout_ms < 0 && !waiting_count_) |
| return ExitIdle; |
| |
| DWORD error = 0; |
| DWORD transfer_size = 0; |
| ULONG_PTR completion_key = 0; |
| bool received_key = true; |
| OVERLAPPED* overlapped_ptr = nullptr; |
| DWORD timeout = timeout_ms < 0 ? INFINITE : static_cast<DWORD>(timeout_ms); |
| if (!GetQueuedCompletionStatus(ioport_.get(), &transfer_size, |
| &completion_key, &overlapped_ptr, timeout)) { |
| error = GetLastError(); |
| if (error == ERROR_BROKEN_PIPE) { |
| // Pass the error to the callback. |
| } else if (error == WAIT_TIMEOUT) { |
| received_key = false; |
| if (pending_ops_.empty() && !has_timers) |
| return ExitTimeout; |
| } else { |
| Win32Fatal("GetQueuedCompletionStatus"); |
| } |
| } |
| |
| if (received_key) { |
| // NotifyInterrupted is the only thing that posts a NULL completion key. |
| if (!completion_key) |
| return ExitInterrupted; |
| |
| #if DEBUG_OVERLAPPED_POINTERS |
| auto overlapped_it = overlapped_map_.find(overlapped_ptr); |
| assert(overlapped_it != overlapped_map_.end() && |
| "Received completion for unknown OVERLAPPED pointer!"); |
| AsyncHandle::State* async_op = overlapped_it->second; |
| assert(async_op == |
| reinterpret_cast<AsyncHandle::State*>(overlapped_ptr) && |
| "Inconsistent OVERLAPPED pointer value!"); |
| #else // !DEBUG_OVERLAPPED_POINTERS |
| AsyncHandle::State* async_op = |
| reinterpret_cast<AsyncHandle::State*>(overlapped_ptr); |
| #endif |
| async_op->OnCompletion(error, transfer_size); |
| assert(waiting_count_ > 0); |
| waiting_count_ -= 1; |
| pending_ops_.push_back(async_op); |
| } |
| |
| // Handle all pending operations. |
| // Note that InvokeCallback() may invoke a callback that changes the |
| // state of pending_ops_. |
| while (!pending_ops_.empty()) { |
| AsyncHandle::State* async_op = pending_ops_.back(); |
| pending_ops_.pop_back(); |
| async_op->InvokeCallback(); |
| } |
| |
| if (has_timers && !timers_.ProcessExpiration(NowMs())) |
| return ExitTimeout; |
| |
| return ExitSuccess; |
| } |
| |
| void ClearInterrupt() { |
| // Nothing to do here. |
| } |
| |
| void EnableInterruptCatcher() { |
| interrupt_handler_.reset(new InterruptCompletionPortHandler(ioport_.get())); |
| } |
| |
| void DisableInterruptCatcher() { interrupt_handler_.reset(); } |
| |
| private: |
| /// Helper class to create and close an I/O completion port handle |
| /// in the right order. |
| struct ScopedIoPort { |
| ScopedIoPort() |
| : handle_(::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1)) { |
| if (!handle_) |
| Win32Fatal("CreateIoCompletionPort"); |
| } |
| |
| ~ScopedIoPort() { ::CloseHandle(handle_); } |
| |
| HANDLE get() const { return handle_; } |
| |
| private: |
| HANDLE handle_; |
| }; |
| |
| ScopedIoPort ioport_; |
| std::unique_ptr<InterruptCompletionPortHandler> interrupt_handler_; |
| AsyncLoopTimers timers_; |
| }; |
| |
| #endif // NINJA_ASYNC_LOOP_WIN32_H |