blob: fe6f702ca8c0d55bf062e585628fc35c4f9b0e1a [file] [log] [blame]
// Copyright 2023 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef NINJA_ASYNC_LOOP_WIN32_H
#define NINJA_ASYNC_LOOP_WIN32_H
/// Win32 implementation of the AsyncLoop class.
#include <assert.h>
#include <list>
#include <unordered_map>
#include "async_loop.h"
#include "async_loop_timers.h"
#include "interrupt_handling.h"
#include "metrics.h" // GetTimeMillis()
#include "util.h" // Win32Fatal()
#include "win32port.h" // PRId64
// Set DEBUG to 1 to compile in the extra debugging checks of this file.
#define DEBUG 0
// DEBUG_OVERLAPPED_POINTERS follows DEBUG. When it is 1, AsyncLoop::Impl keeps
// a map of the OVERLAPPED pointer of every attached AsyncHandle::State and
// asserts that each completion event refers to a pointer found in that map.
#if DEBUG
#define DEBUG_OVERLAPPED_POINTERS 1
#else
#define DEBUG_OVERLAPPED_POINTERS 0
#endif
// Technical note on Win32 overlapped i/o:
//
// - Calling CreateIoCompletionPort() on a handle that
// was already associated with the same port returns
// ERROR_INVALID_PARAMETER.
//
// This means it is impossible to change the completion_key
// value for a handle once it has been set.
//
// - If a handle is associated with a completion port,
// calling DuplicateHandle() will create a new handle
// that is _already_ associated with the same completion port.
//
// This means that calling CreateIoCompletionPort(new_handle, ...)
// will fail with ERROR_INVALID_PARAMETER.
//
// But it also means the completion_key for both handles will
// always be the same. Thus this value becomes useless to
// distinguish handles in general. The only reliable way is to
// use the OVERLAPPED* pointer.
//
// - Even if an overlapped ReadFile() call succeeds immediately,
// a completion event will still be sent to the port for it.
// Same with WriteFile(), ConnectNamedPipe() and other Win32 I/O functions.
//
// The Win32-specific state of an AsyncHandle instance.
//
// Groups the OVERLAPPED struct passed to the Win32 I/O functions, the
// IpcHandle the operations run on, the result of the last operation
// (|error_|, |actual_bytes_|, |accept_handle_|) and the user callback.
//
// The AsyncLoop is told about every state change through AttachHandle(),
// DetachHandle(), UpdateHandle() and CancelHandle(). The invariant kept by
// this struct is that an instance with |completed_ == false| and
// |kind_ != Kind::None| has been reported exactly once to the loop as
// waiting (through UpdateHandle()).
struct AsyncHandle::State {
  // The type of the asynchronous operation last started on this instance.
  enum class Kind {
    None = 0,
    Read,
    Write,
    Connect,
    Accept,
  };

  // Create an instance without any handle or callback.
  explicit State(AsyncLoop& async_loop) : async_loop_(async_loop) {}

  // Create an instance that takes ownership of |handle| and stores
  // |callback|. A valid |handle| is attached to |async_loop| immediately.
  State(IpcHandle handle, AsyncLoop& async_loop,
        AsyncHandle::Callback&& callback)
      : handle_(std::move(handle)), async_loop_(async_loop),
        callback_(std::move(callback)) {
    if (handle_)
      async_loop_.AttachHandle(this);
  }

  // Cancel any operation still in flight, then detach the handle (if any)
  // from the loop.
  ~State() {
    if (!completed_)
      Cancel();
    if (handle_)
      async_loop_.DetachHandle(this);
  }

  // Disallow copy operations. Even though this is implied by
  // the use of moveable-only fields like |handle_|.
  State(const State&) = delete;
  State& operator=(const State&) = delete;

  // Disable move operations, since the address of this instance's
  // |overlapped_| field (which should equal the instance's address) is
  // passed to the Windows kernel, and thus cannot change.
  State(State&&) noexcept = delete;
  State& operator=(State&&) noexcept = delete;

  // Return the AsyncLoop this instance belongs to.
  AsyncLoop& async_loop() const { return async_loop_; }

  // Return the Win32 implementation object of that AsyncLoop.
  AsyncLoop::Impl& loop() const { return *async_loop_.impl_; }

  // Return true if this instance holds a valid handle.
  bool is_valid() const { return !!handle_; }

  // Return true while |completed_| is false. Note that |completed_| starts
  // as false, so this is also true for an instance that never started an
  // operation.
  bool IsRunning() const { return !completed_; }

  // Return the underlying Win32 handle.
  HANDLE native_handle() const { return handle_.native_handle(); }

  // Cancel the current asynchronous operation, if any. Does nothing when no
  // operation was started or when the last one already completed.
  void Cancel() {
    if (completed_ || kind_ == Kind::None)
      return;
    // Tell the loop first so that it stops counting this instance as waiting.
    async_loop_.CancelHandle(this);
    ::CancelIoEx(handle_.native_handle(), &overlapped_);
    completed_ = true;
  }

  // Replace the callback.
  void ResetCallback(AsyncHandle::Callback&& cb) { callback_ = std::move(cb); }

  // Replace the handle: the current one (if any) has its operation cancelled
  // and is detached from the loop, then |handle| is attached if it is valid.
  void ResetHandle(IpcHandle handle) {
    if (handle_) {
      Cancel();
      async_loop_.DetachHandle(this);
    }
    handle_ = std::move(handle);
    if (handle_)
      async_loop_.AttachHandle(this);
  }

  // Cancel the current operation, detach from the loop and give the native
  // handle back to the caller. Returns INVALID_HANDLE_VALUE if there is no
  // handle.
  HANDLE ReleaseHandle() {
    if (!handle_)
      return INVALID_HANDLE_VALUE;
    Cancel();
    async_loop_.DetachHandle(this);
    return handle_.ReleaseNativeHandle();
  }

  // Reset state to start a new asynchronous operation of type |kind|.
  // Cancels the current one if any. On return |completed_| is true and the
  // previous results are cleared.
  void Reset(Kind kind = Kind::None) {
    // Cancel() must run before |kind_| is overwritten: it uses the previous
    // kind to tell a real in-flight operation from an instance that never
    // started one, and it notifies the loop so that the waiting count stays
    // balanced when an operation is restarted while another one is pending.
    Cancel();
    kind_ = kind;
    completed_ = true;
    error_ = 0;
    actual_bytes_ = 0;
    accept_handle_.Close();
  }

  // Start an async read of up to |size| bytes into |buffer|. ReadFile()
  // takes a DWORD size, so larger requests are clamped to the largest DWORD
  // value; the number of bytes actually transferred is reported through
  // the callback.
  void StartRead(void* buffer, size_t size) {
    Reset(Kind::Read);
    completed_ = false;
    if (!::ReadFile(handle_.native_handle(), buffer, ClampIoSize(size),
                    &actual_bytes_, &overlapped_)) {
      error_ = ::GetLastError();
      // Anything but ERROR_IO_PENDING is an immediate failure.
      completed_ = (error_ != ERROR_IO_PENDING);
    }
    async_loop_.UpdateHandle(this);
  }

  // Start an async write of up to |size| bytes from |buffer|. The size is
  // clamped in the same way as in StartRead().
  void StartWrite(const void* buffer, size_t size) {
    Reset(Kind::Write);
    completed_ = false;
    if (!::WriteFile(handle_.native_handle(), buffer, ClampIoSize(size),
                     &actual_bytes_, &overlapped_)) {
      error_ = ::GetLastError();
      // Anything but ERROR_IO_PENDING is an immediate failure.
      completed_ = (error_ != ERROR_IO_PENDING);
    }
    async_loop_.UpdateHandle(this);
  }

  // Start async connect operation.
  void StartConnect() {
    Reset(Kind::Connect);
    // Connection to named pipes is immediate on Win32.
    completed_ = true;
    async_loop_.UpdateHandle(this);
  }

  // Start async accept operation.
  void StartAccept() {
    // Note: duplicate the server handle here to be able to pass
    // the result to the final AsyncHandle::TakeAcceptedHandle()
    // call as a separate client connection handle.
    //
    // The duplicate is already associated with the i/o completion port
    // and does not need to be attached to the AsyncLoop instance.
    Reset(Kind::Accept);
    completed_ = false;
    accept_handle_ = handle_.Clone();
    if (!::ConnectNamedPipe(accept_handle_.native_handle(), &overlapped_)) {
      error_ = ::GetLastError();
      if (error_ == ERROR_PIPE_CONNECTED) {
        // A client connected before ConnectNamedPipe() was called. Win32
        // documents this as a good connection even though the function
        // returned zero, so report it as an immediate success.
        error_ = 0;
        completed_ = true;
      } else {
        completed_ = (error_ != ERROR_IO_PENDING);
      }
    }
    async_loop_.UpdateHandle(this);
  }

  // Called when a completion event is received. Records the result but does
  // not invoke the callback yet.
  void OnCompletion(AsyncError error, size_t transfer_size) {
    completed_ = true;
    error_ = error;
    actual_bytes_ = static_cast<DWORD>(transfer_size);
  }

  // Invoke the callback with the recorded error and byte count.
  void InvokeCallback() { callback_(error_, actual_bytes_); }

  // Return accepted handle, if any.
  IpcHandle TakeAcceptedHandle() { return std::move(accept_handle_); }

  // Convert a buffer size to the DWORD expected by ReadFile()/WriteFile(),
  // clamping instead of silently wrapping around on overflow.
  static DWORD ClampIoSize(size_t size) {
    const DWORD kMaxIoSize = ~static_cast<DWORD>(0);
    return size > kMaxIoSize ? kMaxIoSize : static_cast<DWORD>(size);
  }

  OVERLAPPED overlapped_ = {};  // Must be first!
  IpcHandle handle_;
  Kind kind_ = Kind::None;
  bool completed_ = false;
  DWORD error_ = 0;
  DWORD actual_bytes_ = 0;
  IpcHandle accept_handle_;
  AsyncLoop& async_loop_;
  AsyncHandle::Callback callback_;
};
// Win32-specific implementation of the AsyncLoop internal interface.
//
// Owns an I/O completion port. AsyncHandle::State instances are associated
// with the port through AttachHandle() and report their operations through
// UpdateHandle() / CancelHandle(). RunOnce() waits on the port, records the
// result of the completed operation, then invokes the callbacks of all
// completed operations and processes timers.
class AsyncLoop::Impl {
  /// The list of pending AsyncHandle::States that have completed but whose
  /// InvokeCallback() method wasn't called yet. Identified by their address.
  struct PendingList : public std::vector<AsyncHandle::State*> {
    // Remove the first occurrence of |async_op| from the list.
    // Returns true if it was found.
    bool Remove(AsyncHandle::State* async_op) {
      for (auto it = begin(); it != end(); ++it) {
        if (*it != async_op)
          continue;
        // Order does not matter for this list, so overwrite the match with
        // the last item and shrink, avoiding an O(n) erase.
        *it = back();
        pop_back();
        return true;
      }
      return false;
    }
  };

  PendingList pending_ops_;

  // The number of AsyncHandle instances attached to this loop.
  size_t attached_count_ = 0;

  // The number of AsyncHandle instances waiting for an event.
  size_t waiting_count_ = 0;

#if DEBUG_OVERLAPPED_POINTERS
  // A map used to convert OVERLAPPED* pointer values to the address
  // of a known AsyncHandle::State. Used for safety.
  using OverlappedMap = std::unordered_map<OVERLAPPED*, AsyncHandle::State*>;
  OverlappedMap overlapped_map_;
#endif

 public:
  ~Impl() {
#if DEBUG_OVERLAPPED_POINTERS
    assert(overlapped_map_.empty() &&
           "Destroying AsyncLoop before AsyncHandle!");
#endif
    assert(timers_.empty() &&
           "Destroying AsyncLoop before children AsyncTimer instances!");
  }

  // Return the timer list of this loop.
  AsyncLoopTimers& timers() { return timers_; }

  // Associate the handle of |async_op| with the completion port and count
  // it as attached.
  void AttachHandle(AsyncHandle::State* async_op) {
    assert(async_op && "Attaching null async op!");
    assert(async_op->handle_ && "Attaching invalid handle!");
    HANDLE handle = async_op->handle_.native_handle();
    if (handle == INVALID_HANDLE_VALUE)
      return;

    // The handle value is used as completion key. RunOnce() only relies on
    // it being non-zero.
    const auto key = reinterpret_cast<ULONG_PTR>(handle);
    if (!::CreateIoCompletionPort(handle, ioport_.get(), key, 0)) {
      // CreateIoCompletionPort() will return invalid parameter
      // when trying to call it with the duplicate of a handle that
      // was already associated with the port. Ignore it.
      if (::GetLastError() != ERROR_INVALID_PARAMETER)
        Win32Fatal("CreateIoCompletionPort");
    }
#if DEBUG_OVERLAPPED_POINTERS
    auto ret = overlapped_map_.emplace(&async_op->overlapped_, async_op);
    assert(ret.second &&
           "Adding AsyncHandle::State for known OVERLAPPED pointer!");
#endif  // DEBUG_OVERLAPPED_POINTERS
    attached_count_ += 1;
  }

  // Stop counting |async_op| as attached. Does nothing (besides asserting)
  // if it has no valid handle.
  void DetachHandle(AsyncHandle::State* async_op) {
    assert(async_op->handle_ && "Detaching invalid handle!");
    if (!async_op->handle_)
      return;
    assert(attached_count_ > 0 &&
           "Detaching AsyncHandle::State from empty AsyncLoop!");
    attached_count_--;
#if DEBUG_OVERLAPPED_POINTERS
    auto overlapped_it = overlapped_map_.find(&async_op->overlapped_);
    assert(overlapped_it != overlapped_map_.end() &&
           "Releasing AsyncHandle::State for unknown OVERLAPPED pointer!");
    overlapped_map_.erase(overlapped_it);
#endif  // DEBUG_OVERLAPPED_POINTERS
  }

  // Called after an operation was started on |state|: an operation that
  // already completed is queued for callback invocation, otherwise the
  // state is counted as waiting for a completion event.
  void UpdateHandle(AsyncHandle::State* state) {
    if (state->completed_)
      pending_ops_.push_back(state);
    else
      waiting_count_++;
  }

  // Called when the waiting operation of |astate| is cancelled: drop it
  // from the pending list (if present) and from the waiting count.
  void CancelHandle(AsyncHandle::State* astate) {
    pending_ops_.Remove(astate);
    assert(waiting_count_ > 0);
    waiting_count_--;
  }

  // Run one iteration of the loop, waiting at most |timeout_ms| milliseconds
  // (a negative value means no limit) for a completion event.
  //
  // Returns ExitIdle when there is nothing to wait for, ExitInterrupted
  // when an event with a null completion key is received, ExitTimeout when
  // the wait timed out with no pending operation and no timer (or when
  // timers_.ProcessExpiration() returns false), and ExitSuccess otherwise.
  ExitStatus RunOnce(int64_t timeout_ms, AsyncLoop& loop) {
    // Any pending operations (e.g. from StartAsyncConnect()) means
    // no timeout is needed.
    if (!pending_ops_.empty())
      timeout_ms = 0;

    // Reduce the timeout to the delay before the next timer, if any.
    bool has_timers = false;
    const int64_t timer_expiration_ms = timers_.ComputeNextExpiration();
    if (timer_expiration_ms >= 0) {
      has_timers = true;
      int64_t timer_timeout_ms = timer_expiration_ms - NowMs();
      if (timer_timeout_ms < 0)
        timer_timeout_ms = 0;
      if (timeout_ms < 0 || timeout_ms > timer_timeout_ms)
        timeout_ms = timer_timeout_ms;
    }

    // Do we have any async operation here?
    if (timeout_ms < 0 && !waiting_count_)
      return ExitIdle;

    // Convert to a DWORD without wrapping around, and without turning a
    // large finite timeout into INFINITE by accident.
    DWORD timeout = INFINITE;
    if (timeout_ms >= 0) {
      const int64_t kMaxFiniteTimeoutMs = static_cast<int64_t>(INFINITE) - 1;
      timeout = static_cast<DWORD>(
          timeout_ms > kMaxFiniteTimeoutMs ? kMaxFiniteTimeoutMs : timeout_ms);
    }

    DWORD error = 0;
    DWORD transfer_size = 0;
    ULONG_PTR completion_key = 0;
    OVERLAPPED* overlapped_ptr = nullptr;
    bool received_key = true;
    if (!::GetQueuedCompletionStatus(ioport_.get(), &transfer_size,
                                     &completion_key, &overlapped_ptr,
                                     timeout)) {
      error = ::GetLastError();
      if (error == ERROR_BROKEN_PIPE) {
        // Pass the error to the callback.
      } else if (error == WAIT_TIMEOUT) {
        received_key = false;
        if (pending_ops_.empty() && !has_timers)
          return ExitTimeout;
      } else if (error == ERROR_OPERATION_ABORTED && overlapped_ptr) {
        // Completion packet queued by CancelIoEx() for an operation that
        // AsyncHandle::State::Cancel() cancelled. That call already removed
        // the operation from the waiting count, and the State may have been
        // destroyed since, so drop the packet without touching the pointer.
        received_key = false;
      } else {
        Win32Fatal("GetQueuedCompletionStatus");
      }
    }

    if (received_key) {
      // NotifyInterrupted is the only thing that posts a NULL completion key.
      if (!completion_key)
        return ExitInterrupted;

      assert(overlapped_ptr && "Received completion without OVERLAPPED!");
#if DEBUG_OVERLAPPED_POINTERS
      auto overlapped_it = overlapped_map_.find(overlapped_ptr);
      assert(overlapped_it != overlapped_map_.end() &&
             "Received completion for unknown OVERLAPPED pointer!");
      AsyncHandle::State* async_op = overlapped_it->second;
      assert(async_op ==
                 reinterpret_cast<AsyncHandle::State*>(overlapped_ptr) &&
             "Inconsistent OVERLAPPED pointer value!");
#else   // !DEBUG_OVERLAPPED_POINTERS
      // |overlapped_| is the first field of AsyncHandle::State, so its
      // address is used as the address of the State itself.
      AsyncHandle::State* async_op =
          reinterpret_cast<AsyncHandle::State*>(overlapped_ptr);
#endif  // !DEBUG_OVERLAPPED_POINTERS
      async_op->OnCompletion(error, transfer_size);
      assert(waiting_count_ > 0);
      waiting_count_ -= 1;
      pending_ops_.push_back(async_op);
    }

    // Handle all pending operations.
    // Note that InvokeCallback() may invoke a callback that changes the
    // state of pending_ops_, hence the pop-then-invoke order.
    while (!pending_ops_.empty()) {
      AsyncHandle::State* async_op = pending_ops_.back();
      pending_ops_.pop_back();
      async_op->InvokeCallback();
    }

    if (has_timers && !timers_.ProcessExpiration(NowMs()))
      return ExitTimeout;

    return ExitSuccess;
  }

  void ClearInterrupt() {
    // Nothing to do here.
  }

  // Install an interrupt handler bound to the completion port.
  void EnableInterruptCatcher() {
    interrupt_handler_.reset(
        new InterruptCompletionPortHandler(ioport_.get()));
  }

  // Remove the interrupt handler, if any.
  void DisableInterruptCatcher() { interrupt_handler_.reset(); }

 private:
  /// Helper class to create and close an I/O completion port handle
  /// in the right order.
  struct ScopedIoPort {
    ScopedIoPort()
        : handle_(
              ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 1)) {
      if (!handle_)
        Win32Fatal("CreateIoCompletionPort");
    }
    ~ScopedIoPort() { ::CloseHandle(handle_); }

    // The port handle is closed by the destructor, so a copy would close
    // it twice.
    ScopedIoPort(const ScopedIoPort&) = delete;
    ScopedIoPort& operator=(const ScopedIoPort&) = delete;

    HANDLE get() const { return handle_; }

   private:
    HANDLE handle_;
  };

  // Declared before |interrupt_handler_|, which receives the port handle,
  // so that the port is destroyed after it.
  ScopedIoPort ioport_;
  std::unique_ptr<InterruptCompletionPortHandler> interrupt_handler_;
  AsyncLoopTimers timers_;
};
#endif // NINJA_ASYNC_LOOP_WIN32_H