blob: ce322258a1a44f99b63eaaef7b60b72dacdf4fed [file] [log] [blame]
/*
* Copyright (c) 2018 Apple Inc. All rights reserved.
*
* @APPLE_APACHE_LICENSE_HEADER_START@
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @APPLE_APACHE_LICENSE_HEADER_END@
*/
#include "internal.h"
#if DISPATCH_EVENT_BACKEND_WINDOWS
// I/O completion port shared by the whole event loop. It stays NULL until
// _dispatch_windows_port_init() creates it.
static HANDLE hPort = NULL;
// Completion keys attached to packets posted to hPort.
// _dispatch_event_loop_drain() switches on the key to decide how a dequeued
// packet is merged.
enum _dispatch_windows_port {
DISPATCH_PORT_POKE = 0,
DISPATCH_PORT_TIMER_CLOCK_WALL,
DISPATCH_PORT_TIMER_CLOCK_UPTIME,
DISPATCH_PORT_TIMER_CLOCK_MONOTONIC,
DISPATCH_PORT_FILE_HANDLE,
DISPATCH_PORT_PIPE_HANDLE_READ,
DISPATCH_PORT_PIPE_HANDLE_WRITE,
DISPATCH_PORT_SOCKET_READ,
DISPATCH_PORT_SOCKET_WRITE,
};
// Bitmask of the I/O directions a muxnote is armed for.
enum _dispatch_muxnote_events {
DISPATCH_MUXNOTE_EVENT_READ = 1 << 0,
DISPATCH_MUXNOTE_EVENT_WRITE = 1 << 1,
};
#pragma mark dispatch_unote_t
// A muxnote multiplexes every unote registered on one handle: it owns the
// per-handle monitoring state and the lists of read and write unotes that
// are notified when a completion packet for the handle is drained.
typedef struct dispatch_muxnote_s {
// Linkage in the _dispatch_sources hash bucket for this ident
LIST_ENTRY(dispatch_muxnote_s) dmn_list;
// Unotes registered for read events on this handle
LIST_HEAD(, dispatch_unote_linkage_s) dmn_readers_head;
// Unotes registered for write events on this handle
LIST_HEAD(, dispatch_unote_linkage_s) dmn_writers_head;
// This refcount solves a race condition that can happen with I/O completion
// ports. When we enqueue packets with muxnote pointers associated with
// them, it's possible that those packets might not be processed until after
// the event has been unregistered. We increment this upon creating a
// muxnote or posting to a completion port, and we decrement it upon
// unregistering the event or processing a packet. When it hits zero, we
// dispose the muxnote.
os_atomic(uintptr_t) dmn_refcount;
// The handle (file, pipe or socket) being monitored, stored as an integer
dispatch_unote_ident_t dmn_ident;
// Filter recorded at creation; compared by _dispatch_unote_muxnote_find()
int8_t dmn_filter;
// Kind of handle, determined once in _dispatch_muxnote_create()
enum _dispatch_muxnote_handle_type {
DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID,
DISPATCH_MUXNOTE_HANDLE_TYPE_FILE,
DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE,
DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET,
} dmn_handle_type;
// Directions currently armed on this muxnote
enum _dispatch_muxnote_events dmn_events;
// For pipes, this event is used to synchronize the monitoring thread with
// I/O completion port processing. For sockets, this is the event used with
// WSAEventSelect().
HANDLE dmn_event;
// Pipe monitoring thread control
HANDLE dmn_thread;
os_atomic(bool) dmn_stop;
// Socket events registered with WSAEventSelect()
long dmn_network_events;
// Threadpool wait handle for socket events
PTP_WAIT dmn_threadpool_wait;
} *dispatch_muxnote_t;
// Hash table of registered muxnotes; a bucket is selected with DSL_HASH() of
// the unote ident (see _dispatch_unote_muxnote_bucket()).
static LIST_HEAD(dispatch_muxnote_bucket_s, dispatch_muxnote_s)
_dispatch_sources[DSL_HASH_SIZE];
// Returns the _dispatch_sources bucket in which a muxnote for `ident` lives.
DISPATCH_ALWAYS_INLINE
static inline struct dispatch_muxnote_bucket_s *
_dispatch_unote_muxnote_bucket(uint32_t ident)
{
	struct dispatch_muxnote_bucket_s *bucket;

	bucket = _dispatch_sources + DSL_HASH(ident);
	return bucket;
}
// Searches bucket `dmb` for the muxnote matching `ident` and `filter`.
// Write lookups are performed under the read filter. Returns NULL when no
// muxnote matches.
DISPATCH_ALWAYS_INLINE
static inline dispatch_muxnote_t
_dispatch_unote_muxnote_find(struct dispatch_muxnote_bucket_s *dmb,
		dispatch_unote_ident_t ident, int8_t filter)
{
	const int8_t wanted = (filter == EVFILT_WRITE) ? EVFILT_READ : filter;
	dispatch_muxnote_t candidate;

	LIST_FOREACH(candidate, dmb, dmn_list) {
		if (candidate->dmn_ident != ident) {
			continue;
		}
		if (candidate->dmn_filter != wanted) {
			continue;
		}
		return candidate;
	}
	return NULL;
}
// Allocates a muxnote for the handle identified by `du`, armed for `events`.
//
// The new muxnote starts with a refcount of 1 and empty reader/writer lists,
// and its handle type is classified with GetFileType(). Returns NULL (after
// WIN_PORT_ERROR()) for configurations that are not implemented: the signal
// filter and character devices. Unknown/remote handle types and unexpected
// filters crash.
static dispatch_muxnote_t
_dispatch_muxnote_create(dispatch_unote_t du,
		enum _dispatch_muxnote_events events)
{
	dispatch_muxnote_t dmn;
	int8_t filter = du._du->du_filter;
	HANDLE handle = (HANDLE)du._du->du_ident;

	dmn = _dispatch_calloc(1, sizeof(*dmn));
	if (dmn == NULL) {
		DISPATCH_INTERNAL_CRASH(0, "_dispatch_calloc");
	}
	os_atomic_store(&dmn->dmn_refcount, 1, relaxed);
	dmn->dmn_ident = (dispatch_unote_ident_t)handle;
	// _dispatch_unote_muxnote_find() looks write registrations up under the
	// read filter so that readers and writers of one handle share a muxnote.
	// Store the same normalized filter here; otherwise a muxnote first
	// created for a write source could never be found again and every later
	// registration on the handle would create a competing duplicate.
	dmn->dmn_filter = (filter == EVFILT_WRITE) ? EVFILT_READ : filter;
	dmn->dmn_events = events;
	LIST_INIT(&dmn->dmn_readers_head);
	LIST_INIT(&dmn->dmn_writers_head);

	switch (filter) {
	case EVFILT_SIGNAL:
		WIN_PORT_ERROR();
		free(dmn);
		return NULL;
	case EVFILT_WRITE:
	case EVFILT_READ:
		switch (GetFileType(handle)) {
		case FILE_TYPE_UNKNOWN:
			// ensure that an invalid handle was not passed
			(void)dispatch_assume(GetLastError() == NO_ERROR);
			DISPATCH_INTERNAL_CRASH(0, "unknown handle type");
		case FILE_TYPE_REMOTE:
			DISPATCH_INTERNAL_CRASH(0, "unused handle type");
		case FILE_TYPE_CHAR:
			// The specified file is a character file, typically a
			// LPT device or a console.
			WIN_PORT_ERROR();
			free(dmn);
			return NULL;
		case FILE_TYPE_DISK:
			// The specified file is a disk file
			dmn->dmn_handle_type = DISPATCH_MUXNOTE_HANDLE_TYPE_FILE;
			break;
		case FILE_TYPE_PIPE:
			// The specified file is a socket, a named pipe, or an
			// anonymous pipe.
			dmn->dmn_handle_type = _dispatch_handle_is_socket(handle)
					? DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET
					: DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE;
			break;
		}
		break;
	default:
		DISPATCH_INTERNAL_CRASH(0, "unexpected filter");
	}
	return dmn;
}
// Stops monitoring `events` on `dmn`: removes them from dmn_events and tears
// down whatever was delivering them for the muxnote's handle type.
//  - files: nothing to undo.
//  - pipes: when read monitoring is dropped, the monitor thread is stopped
//    and joined.
//  - sockets: WSAEventSelect() is re-issued with the reduced event set, and
//    the threadpool wait is closed once no events remain.
static void
_dispatch_muxnote_disarm_events(dispatch_muxnote_t dmn,
enum _dispatch_muxnote_events events)
{
long lNetworkEvents;
dmn->dmn_events &= ~events;
switch (dmn->dmn_handle_type) {
case DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID:
DISPATCH_INTERNAL_CRASH(0, "invalid handle");
case DISPATCH_MUXNOTE_HANDLE_TYPE_FILE:
break;
case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE:
if ((events & DISPATCH_MUXNOTE_EVENT_READ) && dmn->dmn_thread) {
// Keep trying to cancel ReadFile() until the thread exits
// The stop flag is set first and dmn_event is signaled so a thread that
// is parked in WaitForSingleObject(dmn_event) wakes up and sees it.
os_atomic_store(&dmn->dmn_stop, true, relaxed);
SetEvent(dmn->dmn_event);
do {
CancelIoEx((HANDLE)dmn->dmn_ident, /* lpOverlapped */ NULL);
} while (WaitForSingleObject(dmn->dmn_thread, 1) == WAIT_TIMEOUT);
CloseHandle(dmn->dmn_thread);
dmn->dmn_thread = NULL;
}
break;
case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET:
// Compute the network event mask that remains after the disarm
lNetworkEvents = dmn->dmn_network_events;
if (events & DISPATCH_MUXNOTE_EVENT_READ) {
lNetworkEvents &= ~FD_READ;
}
if (events & DISPATCH_MUXNOTE_EVENT_WRITE) {
lNetworkEvents &= ~FD_WRITE;
}
// Nothing changed: leave the current WSAEventSelect() registration alone
if (lNetworkEvents == dmn->dmn_network_events) {
break;
}
int iResult;
if (lNetworkEvents & (FD_READ | FD_WRITE)) {
iResult = WSAEventSelect((SOCKET)dmn->dmn_ident,
(WSAEVENT)dmn->dmn_event, lNetworkEvents);
} else {
// Neither direction is left (only FD_CLOSE would remain), so the
// registration is cleared entirely.
lNetworkEvents = 0;
iResult = WSAEventSelect((SOCKET)dmn->dmn_ident, NULL, 0);
}
if (iResult != 0) {
DISPATCH_INTERNAL_CRASH(WSAGetLastError(), "WSAEventSelect");
}
dmn->dmn_network_events = lNetworkEvents;
if (!lNetworkEvents && dmn->dmn_threadpool_wait) {
// Detach the wait from the event, let any running callback finish
// (pending ones are not cancelled), then release the wait object.
SetThreadpoolWait(dmn->dmn_threadpool_wait, NULL, NULL);
WaitForThreadpoolWaitCallbacks(dmn->dmn_threadpool_wait,
/* fCancelPendingCallbacks */ FALSE);
CloseThreadpoolWait(dmn->dmn_threadpool_wait);
dmn->dmn_threadpool_wait = NULL;
}
break;
}
}
// Frees a muxnote and closes its event handle, if any. The pipe monitor
// thread and the socket threadpool wait must already have been torn down;
// disposing while either is still set is a fatal error.
static void
_dispatch_muxnote_dispose(dispatch_muxnote_t dmn)
{
	bool still_monitored = (dmn->dmn_thread != NULL) ||
			(dmn->dmn_threadpool_wait != NULL);

	if (still_monitored) {
		DISPATCH_INTERNAL_CRASH(0, "disposed a muxnote with an active thread");
	}
	if (dmn->dmn_event != NULL) {
		CloseHandle(dmn->dmn_event);
	}
	free(dmn);
}
// Takes a reference on the muxnote. Crashes if the counter wraps around to
// zero or if the muxnote had no references left before the increment.
static void
_dispatch_muxnote_retain(dispatch_muxnote_t dmn)
{
	switch (os_atomic_inc(&dmn->dmn_refcount, relaxed)) {
	case 0:
		DISPATCH_INTERNAL_CRASH(0, "muxnote refcount overflow");
	case 1:
		DISPATCH_INTERNAL_CRASH(0, "retained a disposing muxnote");
	default:
		break;
	}
}
// Drops a reference on the muxnote and disposes it when the last reference
// goes away. Crashes if the counter underflows.
static void
_dispatch_muxnote_release(dispatch_muxnote_t dmn)
{
	uintptr_t remaining = os_atomic_dec(&dmn->dmn_refcount, relaxed);

	if (remaining == UINTPTR_MAX) {
		DISPATCH_INTERNAL_CRASH(0, "muxnote refcount underflow");
	}
	if (remaining == 0) {
		_dispatch_muxnote_dispose(dmn);
	}
}
// Thread entry point that watches a pipe for readable data on behalf of the
// muxnote passed in `context`.
//
// Each iteration issues a zero-byte ReadFile() on the pipe, then asks
// PeekNamedPipe() how many bytes are available and posts a
// DISPATCH_PORT_PIPE_HANDLE_READ packet carrying that count (0 on error) to
// hPort. It then waits on dmn_event before looping, and exits once dmn_stop
// is observed. Always returns 0.
static unsigned WINAPI
_dispatch_pipe_monitor_thread(void *context)
{
dispatch_muxnote_t dmn = (dispatch_muxnote_t)context;
HANDLE hPipe = (HANDLE)dmn->dmn_ident;
do {
char cBuffer[1];
DWORD dwNumberOfBytesTransferred;
OVERLAPPED ov = {0};
// Zero-byte read: nothing is consumed from the pipe here
BOOL bSuccess = ReadFile(hPipe, cBuffer, /* nNumberOfBytesToRead */ 0,
&dwNumberOfBytesTransferred, &ov);
DWORD dwBytesAvailable;
DWORD dwError = GetLastError();
if (!bSuccess && dwError == ERROR_IO_PENDING) {
// The read was queued asynchronously; block until it completes
bSuccess = GetOverlappedResult(hPipe, &ov,
&dwNumberOfBytesTransferred, /* bWait */ TRUE);
dwError = GetLastError();
}
if (bSuccess) {
bSuccess = PeekNamedPipe(hPipe, NULL, 0, NULL, &dwBytesAvailable,
NULL);
dwError = GetLastError();
}
if (bSuccess) {
if (dwBytesAvailable == 0) {
// This can happen with a zero-byte write. Try again.
continue;
}
} else if (dwError == ERROR_NO_DATA) {
// The pipe is nonblocking. Try again.
Sleep(0);
continue;
} else {
_dispatch_debug("pipe[0x%llx]: GetLastError() returned %lu",
(long long)hPipe, dwError);
if (dwError == ERROR_OPERATION_ABORTED) {
// The read was cancelled (see CancelIoEx() in
// _dispatch_muxnote_disarm_events()); `continue` re-evaluates
// the loop condition, so the thread exits if dmn_stop is set.
continue;
}
// Any other error: report zero bytes and stop monitoring after
// this packet has been posted.
os_atomic_store(&dmn->dmn_stop, true, relaxed);
dwBytesAvailable = 0;
}
// Make sure the muxnote stays alive until the packet is dequeued
_dispatch_muxnote_retain(dmn);
// The lpOverlapped parameter does not actually need to point to an
// OVERLAPPED struct. It's really just a pointer to pass back to
// GetQueuedCompletionStatus().
bSuccess = PostQueuedCompletionStatus(hPort,
dwBytesAvailable, (ULONG_PTR)DISPATCH_PORT_PIPE_HANDLE_READ,
(LPOVERLAPPED)dmn);
if (!bSuccess) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"PostQueuedCompletionStatus");
}
// If data is written into the pipe and not read right away, ReadFile()
// will keep returning immediately and we'll flood the completion port.
// This event lets us synchronize with _dispatch_event_loop_drain() so
// that we only post events when it's ready for them.
WaitForSingleObject(dmn->dmn_event, INFINITE);
} while (!os_atomic_load(&dmn->dmn_stop, relaxed));
_dispatch_debug("pipe[0x%llx]: monitor exiting", (long long)hPipe);
return 0;
}
// Returns the write quota currently available on `hPipe` as reported by
// NtQueryInformationFile(FilePipeLocalInformation), or 1 when the query
// fails.
static DWORD
_dispatch_pipe_write_availability(HANDLE hPipe)
{
	FILE_PIPE_LOCAL_INFORMATION pipeInfo;
	IO_STATUS_BLOCK ioStatus;
	NTSTATUS queryStatus;

	queryStatus = _dispatch_NtQueryInformationFile(hPipe, &ioStatus,
			&pipeInfo, sizeof(pipeInfo), FilePipeLocalInformation);
	if (NT_SUCCESS(queryStatus)) {
		return pipeInfo.WriteQuotaAvailable;
	}
	return 1;
}
// Threadpool wait callback that runs when the socket's WSAEventSelect()
// event (dmn_event) is signaled. `context` is the muxnote.
//
// It enumerates the pending network events and posts one completion packet
// per direction (DISPATCH_PORT_SOCKET_READ / DISPATCH_PORT_SOCKET_WRITE) to
// hPort, retaining the muxnote for each packet. The packet payload is:
//  - 0 for both directions when FD_CLOSE was reported (both directions are
//    notified in that case);
//  - the FIONREAD byte count for read packets (1 if the ioctl leaves the
//    default untouched);
//  - 1 for write packets.
// The wait is re-armed before returning.
static VOID CALLBACK
_dispatch_socket_callback(PTP_CALLBACK_INSTANCE inst, void *context,
		PTP_WAIT pwa, TP_WAIT_RESULT res)
{
	dispatch_muxnote_t dmn = (dispatch_muxnote_t)context;
	SOCKET sock = (SOCKET)dmn->dmn_ident;
	WSANETWORKEVENTS events;

	if (WSAEnumNetworkEvents(sock, (WSAEVENT)dmn->dmn_event, &events) == 0) {
		long lNetworkEvents = events.lNetworkEvents;
		// Read and write payloads are tracked separately: the number of
		// readable bytes says nothing about writability, and a payload of 0
		// is interpreted as "closed" when the packet is merged, so a
		// FIONREAD result of 0 must not leak into the write packet.
		DWORD dwReadAvailable = 1;
		DWORD dwWriteAvailable = 1;
		if (lNetworkEvents & FD_CLOSE) {
			dwReadAvailable = 0;
			dwWriteAvailable = 0;
			// Post to all registered read and write handlers
			lNetworkEvents |= FD_READ | FD_WRITE;
		} else if (lNetworkEvents & FD_READ) {
			ioctlsocket(sock, FIONREAD, &dwReadAvailable);
		}
		if (lNetworkEvents & FD_READ) {
			// Keep the muxnote alive until the packet is dequeued
			_dispatch_muxnote_retain(dmn);
			if (!PostQueuedCompletionStatus(hPort, dwReadAvailable,
					(ULONG_PTR)DISPATCH_PORT_SOCKET_READ, (LPOVERLAPPED)dmn)) {
				DISPATCH_INTERNAL_CRASH(GetLastError(),
						"PostQueuedCompletionStatus");
			}
		}
		if (lNetworkEvents & FD_WRITE) {
			// Keep the muxnote alive until the packet is dequeued
			_dispatch_muxnote_retain(dmn);
			if (!PostQueuedCompletionStatus(hPort, dwWriteAvailable,
					(ULONG_PTR)DISPATCH_PORT_SOCKET_WRITE, (LPOVERLAPPED)dmn)) {
				DISPATCH_INTERNAL_CRASH(GetLastError(),
						"PostQueuedCompletionStatus");
			}
		}
	} else {
		_dispatch_debug("socket[0x%llx]: WSAEnumNetworkEvents() failed (%d)",
				(long long)sock, WSAGetLastError());
	}
	// Re-arm the wait so the next signal on dmn_event is delivered
	SetThreadpoolWait(pwa, dmn->dmn_event, /* pftTimeout */ NULL);
}
// Arms (or re-arms) monitoring for the events currently set in
// dmn->dmn_events, according to the muxnote's handle type:
//  - files: posts a DISPATCH_PORT_FILE_HANDLE packet right away.
//  - pipes: starts the read monitor thread if reads are requested and no
//    thread is running, and posts a DISPATCH_PORT_PIPE_HANDLE_WRITE packet
//    with the available write quota if writes are requested.
//  - sockets: lazily creates the event and threadpool wait, registers the
//    wanted network events with WSAEventSelect(), and posts an immediate
//    write packet if a zero-byte send() shows the socket is writable.
// Every posted packet holds a reference on the muxnote. Failures of the
// underlying system calls crash; the function always returns TRUE.
static BOOL
_dispatch_io_trigger(dispatch_muxnote_t dmn)
{
	BOOL bSuccess;
	long lNetworkEvents;

	switch (dmn->dmn_handle_type) {
	case DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID:
		DISPATCH_INTERNAL_CRASH(0, "invalid handle");
	case DISPATCH_MUXNOTE_HANDLE_TYPE_FILE:
		_dispatch_muxnote_retain(dmn);
		bSuccess = PostQueuedCompletionStatus(hPort, 0,
				(ULONG_PTR)DISPATCH_PORT_FILE_HANDLE, (LPOVERLAPPED)dmn);
		if (bSuccess == FALSE) {
			DISPATCH_INTERNAL_CRASH(GetLastError(),
					"PostQueuedCompletionStatus");
		}
		break;
	case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE:
		if ((dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_READ) &&
				!dmn->dmn_thread) {
			// The monitor thread waits on dmn_event, so the event has to
			// exist before the thread is started. If read monitoring was
			// disarmed earlier, the event from that round is still around:
			// reuse it (instead of leaking the handle) and clear any stale
			// signal left behind by the shutdown.
			if (!dmn->dmn_event) {
				dmn->dmn_event = CreateEventW(NULL, /* bManualReset */ FALSE,
						/* bInitialState */ FALSE, NULL);
				if (!dmn->dmn_event) {
					DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateEventW");
				}
			} else {
				ResetEvent(dmn->dmn_event);
			}
			// A previous disarm leaves dmn_stop set. No monitor thread is
			// running at this point, so it is safe to clear it; otherwise
			// the new thread would exit after its first iteration.
			os_atomic_store(&dmn->dmn_stop, false, relaxed);
			dmn->dmn_thread = (HANDLE)_beginthreadex(/* security */ NULL,
					/* stack_size */ 1, _dispatch_pipe_monitor_thread,
					(void *)dmn, /* initflag */ 0, /* thrdaddr */ NULL);
			if (!dmn->dmn_thread) {
				DISPATCH_INTERNAL_CRASH(errno, "_beginthread");
			}
		}
		if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) {
			_dispatch_muxnote_retain(dmn);
			DWORD available;
			if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_READ) {
				// We can't query a pipe which has a read source open on it
				// because the ReadFile() in the background thread might cause
				// NtQueryInformationFile() to block
				available = 1;
			} else {
				available =
						_dispatch_pipe_write_availability((HANDLE)dmn->dmn_ident);
			}
			bSuccess = PostQueuedCompletionStatus(hPort, available,
					(ULONG_PTR)DISPATCH_PORT_PIPE_HANDLE_WRITE,
					(LPOVERLAPPED)dmn);
			if (bSuccess == FALSE) {
				DISPATCH_INTERNAL_CRASH(GetLastError(),
						"PostQueuedCompletionStatus");
			}
		}
		break;
	case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET:
		if (!dmn->dmn_event) {
			dmn->dmn_event = CreateEventW(NULL, /* bManualReset */ FALSE,
					/* bInitialState */ FALSE, NULL);
			if (!dmn->dmn_event) {
				DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateEventW");
			}
		}
		if (!dmn->dmn_threadpool_wait) {
			dmn->dmn_threadpool_wait = CreateThreadpoolWait(
					_dispatch_socket_callback, dmn,
					/* PTP_CALLBACK_ENVIRON */ NULL);
			if (!dmn->dmn_threadpool_wait) {
				DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateThreadpoolWait");
			}
			SetThreadpoolWait(dmn->dmn_threadpool_wait, dmn->dmn_event,
					/* pftTimeout */ NULL);
		}
		// FD_CLOSE is always monitored so that both directions can be told
		// when the connection goes away.
		lNetworkEvents = FD_CLOSE;
		if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_READ) {
			lNetworkEvents |= FD_READ;
		}
		if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) {
			lNetworkEvents |= FD_WRITE;
		}
		if (dmn->dmn_network_events != lNetworkEvents) {
			if (WSAEventSelect((SOCKET)dmn->dmn_ident, (WSAEVENT)dmn->dmn_event,
					lNetworkEvents) != 0) {
				DISPATCH_INTERNAL_CRASH(WSAGetLastError(), "WSAEventSelect");
			}
			dmn->dmn_network_events = lNetworkEvents;
		}
		if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) {
			// FD_WRITE is edge-triggered, not level-triggered, so it will only
			// be signaled if the socket becomes writable after a send() fails
			// with WSAEWOULDBLOCK. We can work around this by performing a
			// zero-byte send(). If the socket is writable, the send() will
			// succeed and we can immediately post a packet, and if it isn't, it
			// will fail with WSAEWOULDBLOCK and WSAEventSelect() will report
			// the next time it becomes available.
			if (send((SOCKET)dmn->dmn_ident, "", 0, 0) == 0) {
				_dispatch_muxnote_retain(dmn);
				bSuccess = PostQueuedCompletionStatus(hPort, 1,
						(ULONG_PTR)DISPATCH_PORT_SOCKET_WRITE,
						(LPOVERLAPPED)dmn);
				if (bSuccess == FALSE) {
					DISPATCH_INTERNAL_CRASH(GetLastError(),
							"PostQueuedCompletionStatus");
				}
			}
		}
		break;
	}
	return TRUE;
}
// Maps a unote's filter to the muxnote events it needs: none for the custom
// filters, WRITE for EVFILT_WRITE, and READ for everything else.
DISPATCH_ALWAYS_INLINE
static inline enum _dispatch_muxnote_events
_dispatch_unote_required_events(dispatch_unote_t du)
{
	const int8_t filter = du._du->du_filter;

	if (filter == DISPATCH_EVFILT_CUSTOM_ADD ||
			filter == DISPATCH_EVFILT_CUSTOM_OR ||
			filter == DISPATCH_EVFILT_CUSTOM_REPLACE) {
		return 0;
	}
	if (filter == EVFILT_WRITE) {
		return DISPATCH_MUXNOTE_EVENT_WRITE;
	}
	return DISPATCH_MUXNOTE_EVENT_READ;
}
// Registers `du` with the muxnote for its handle, creating and arming the
// muxnote when this is the first registration, or arming the additional
// direction when an existing muxnote does not monitor it yet. On success the
// unote is linked into the reader or writer list and marked armed. Returns
// false if the muxnote could not be created or triggered.
bool
_dispatch_unote_register_muxed(dispatch_unote_t du)
{
	enum _dispatch_muxnote_events wanted = _dispatch_unote_required_events(du);
	struct dispatch_muxnote_bucket_s *bucket =
			_dispatch_unote_muxnote_bucket(du._du->du_ident);
	dispatch_muxnote_t dmn = _dispatch_unote_muxnote_find(bucket,
			du._du->du_ident, du._du->du_filter);

	if (!dmn) {
		// First registration on this handle
		dmn = _dispatch_muxnote_create(du, wanted);
		if (!dmn) {
			return false;
		}
		if (_dispatch_io_trigger(dmn) == FALSE) {
			_dispatch_muxnote_release(dmn);
			return false;
		}
		LIST_INSERT_HEAD(bucket, dmn, dmn_list);
	} else if (wanted & ~dmn->dmn_events) {
		// Existing muxnote that is not yet armed for this direction
		dmn->dmn_events |= wanted;
		if (_dispatch_io_trigger(dmn) == FALSE) {
			return false;
		}
	}

	dispatch_unote_linkage_t dul = _dispatch_unote_get_linkage(du);
	if (dmn->dmn_handle_type == DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID) {
		DISPATCH_INTERNAL_CRASH(0, "invalid handle");
	}
	if (wanted & DISPATCH_MUXNOTE_EVENT_READ) {
		LIST_INSERT_HEAD(&dmn->dmn_readers_head, dul, du_link);
	} else if (wanted & DISPATCH_MUXNOTE_EVENT_WRITE) {
		LIST_INSERT_HEAD(&dmn->dmn_writers_head, dul, du_link);
	}
	dul->du_muxnote = dmn;
	_dispatch_unote_state_set(du, DISPATCH_WLH_ANON, DU_STATE_ARMED);
	return true;
}
// Re-arms monitoring for an already registered unote by re-triggering its
// muxnote.
void
_dispatch_unote_resume_muxed(dispatch_unote_t du)
{
	dispatch_muxnote_t dmn = _dispatch_unote_get_linkage(du)->du_muxnote;

	dispatch_assert(_dispatch_unote_registered(du));
	(void)_dispatch_io_trigger(dmn);
}
// Detaches `du` from its muxnote. Directions that no longer have any
// registered unote are disarmed, and when nothing remains armed the muxnote
// is removed from the hash table and its registration reference is dropped.
// Always returns true.
bool
_dispatch_unote_unregister_muxed(dispatch_unote_t du)
{
	dispatch_unote_linkage_t dul = _dispatch_unote_get_linkage(du);
	dispatch_muxnote_t dmn = dul->du_muxnote;

	if (dmn->dmn_handle_type == DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID) {
		DISPATCH_INTERNAL_CRASH(0, "invalid handle");
	}
	LIST_REMOVE(dul, du_link);
	_LIST_TRASH_ENTRY(dul, du_link);
	dul->du_muxnote = NULL;

	// Work out which directions just lost their last listener
	enum _dispatch_muxnote_events idle = 0;
	if (LIST_EMPTY(&dmn->dmn_readers_head)) {
		idle |= DISPATCH_MUXNOTE_EVENT_READ;
	}
	if (LIST_EMPTY(&dmn->dmn_writers_head)) {
		idle |= DISPATCH_MUXNOTE_EVENT_WRITE;
	}
	_dispatch_muxnote_disarm_events(dmn, idle);

	if (dmn->dmn_events == 0) {
		LIST_REMOVE(dmn, dmn_list);
		_dispatch_muxnote_release(dmn);
	}
	_dispatch_unote_state_set(du, DU_STATE_UNREGISTERED);
	return true;
}
// Handles a DISPATCH_PORT_FILE_HANDLE packet: every reader and then every
// writer registered on the muxnote is disarmed and notified with a data
// value of 1. Drops the reference taken when the packet was posted.
static void
_dispatch_event_merge_file_handle(dispatch_muxnote_t dmn)
{
	dispatch_unote_linkage_t link, next_link;

	LIST_FOREACH_SAFE(link, &dmn->dmn_readers_head, du_link, next_link) {
		dispatch_unote_t reader = _dispatch_unote_linkage_get_unote(link);
		// consumed by dux_merge_evt()
		_dispatch_retain_unote_owner(reader);
		dispatch_assert(dux_needs_rearm(reader._du));
		_dispatch_unote_state_clear_bit(reader, DU_STATE_ARMED);
		os_atomic_store2o(reader._dr, ds_pending_data, ~1, relaxed);
		dux_merge_evt(reader._du, EV_ADD | EV_ENABLE | EV_DISPATCH, 1, 0);
	}

	LIST_FOREACH_SAFE(link, &dmn->dmn_writers_head, du_link, next_link) {
		dispatch_unote_t writer = _dispatch_unote_linkage_get_unote(link);
		// consumed by dux_merge_evt()
		_dispatch_retain_unote_owner(writer);
		dispatch_assert(dux_needs_rearm(writer._du));
		_dispatch_unote_state_clear_bit(writer, DU_STATE_ARMED);
		os_atomic_store2o(writer._dr, ds_pending_data, ~1, relaxed);
		dux_merge_evt(writer._du, EV_ADD | EV_ENABLE | EV_DISPATCH, 1, 0);
	}

	// Balances the retain done when the completion packet was posted
	_dispatch_muxnote_release(dmn);
}
// Handles a DISPATCH_PORT_PIPE_HANDLE_READ packet. Each reader is disarmed
// and notified with the number of bytes available; a count of zero marks the
// unote as needing deletion and is delivered as EV_DELETE. Afterwards the
// monitor thread is released via dmn_event and the packet's muxnote
// reference is dropped.
static void
_dispatch_event_merge_pipe_handle_read(dispatch_muxnote_t dmn,
		DWORD dwBytesAvailable)
{
	dispatch_unote_linkage_t link, next_link;

	LIST_FOREACH_SAFE(link, &dmn->dmn_readers_head, du_link, next_link) {
		dispatch_unote_t reader = _dispatch_unote_linkage_get_unote(link);
		// consumed by dux_merge_evt()
		_dispatch_retain_unote_owner(reader);

		dispatch_unote_state_t state = _dispatch_unote_state(reader);
		uintptr_t data = dwBytesAvailable;
		uint32_t flags = EV_ADD | EV_ENABLE | EV_DISPATCH;

		state &= ~DU_STATE_ARMED;
		if (dwBytesAvailable == 0) {
			state |= DU_STATE_NEEDS_DELETE;
			flags = EV_DELETE | EV_DISPATCH;
		}
		_dispatch_unote_state_set(reader, state);
		os_atomic_store2o(reader._dr, ds_pending_data, ~data, relaxed);
		dux_merge_evt(reader._du, flags, data, 0);
	}

	// Let the monitor thread post its next packet
	SetEvent(dmn->dmn_event);
	// Balances the retain done when the completion packet was posted
	_dispatch_muxnote_release(dmn);
}
// Handles a DISPATCH_PORT_PIPE_HANDLE_WRITE packet. Each writer is disarmed
// and notified with the available byte count; the pending data is stored as
// the complement of the count, or as 0 when no space is available. Drops the
// reference taken when the packet was posted.
static void
_dispatch_event_merge_pipe_handle_write(dispatch_muxnote_t dmn,
		DWORD dwBytesAvailable)
{
	dispatch_unote_linkage_t link, next_link;

	LIST_FOREACH_SAFE(link, &dmn->dmn_writers_head, du_link, next_link) {
		dispatch_unote_t writer = _dispatch_unote_linkage_get_unote(link);
		// consumed by dux_merge_evt()
		_dispatch_retain_unote_owner(writer);
		_dispatch_unote_state_clear_bit(writer, DU_STATE_ARMED);

		uintptr_t data = dwBytesAvailable;
		if (dwBytesAvailable == 0) {
			os_atomic_store2o(writer._dr, ds_pending_data, 0, relaxed);
		} else {
			os_atomic_store2o(writer._dr, ds_pending_data, ~data, relaxed);
		}
		dux_merge_evt(writer._du, EV_ADD | EV_ENABLE | EV_DISPATCH, data, 0);
	}

	// Balances the retain done when the completion packet was posted
	_dispatch_muxnote_release(dmn);
}
// Delivers one socket event to a single unote. The unote is disarmed and
// notified with `dwBytesAvailable`; a value of zero marks it as needing
// deletion and is delivered as EV_DELETE.
static void
_dispatch_event_merge_socket(dispatch_unote_t du, DWORD dwBytesAvailable)
{
	// consumed by dux_merge_evt()
	_dispatch_retain_unote_owner(du);

	dispatch_unote_state_t state = _dispatch_unote_state(du);
	uintptr_t data = dwBytesAvailable;
	uint32_t flags = EV_ADD | EV_ENABLE | EV_DISPATCH;

	state &= ~DU_STATE_ARMED;
	if (dwBytesAvailable == 0) {
		state |= DU_STATE_NEEDS_DELETE;
		flags = EV_DELETE | EV_DISPATCH;
	}
	_dispatch_unote_state_set(du, state);
	os_atomic_store2o(du._dr, ds_pending_data, ~data, relaxed);
	dux_merge_evt(du._du, flags, data, 0);
}
// Handles a DISPATCH_PORT_SOCKET_READ packet by notifying every reader on the
// muxnote, then drops the reference taken when the packet was posted.
static void
_dispatch_event_merge_socket_read(dispatch_muxnote_t dmn,
		DWORD dwBytesAvailable)
{
	dispatch_unote_linkage_t link, next_link;

	LIST_FOREACH_SAFE(link, &dmn->dmn_readers_head, du_link, next_link) {
		_dispatch_event_merge_socket(_dispatch_unote_linkage_get_unote(link),
				dwBytesAvailable);
	}
	// Balances the retain done when the completion packet was posted
	_dispatch_muxnote_release(dmn);
}
// Handles a DISPATCH_PORT_SOCKET_WRITE packet by notifying every writer on
// the muxnote, then drops the reference taken when the packet was posted.
static void
_dispatch_event_merge_socket_write(dispatch_muxnote_t dmn,
		DWORD dwBytesAvailable)
{
	dispatch_unote_linkage_t link, next_link;

	LIST_FOREACH_SAFE(link, &dmn->dmn_writers_head, du_link, next_link) {
		_dispatch_event_merge_socket(_dispatch_unote_linkage_get_unote(link),
				dwBytesAvailable);
	}
	// Balances the retain done when the completion packet was posted
	_dispatch_muxnote_release(dmn);
}
#pragma mark timers
// Per-clock timer state: the threadpool timer backing the clock (created on
// first arm), the completion key posted to hPort when it fires, and whether
// the timer is currently armed.
typedef struct _dispatch_windows_timeout_s {
PTP_TIMER pTimer;
enum _dispatch_windows_port ullIdent;
bool bArmed;
} *dispatch_windows_timeout_t;
// Expands to the designated initializer for the table slot of
// DISPATCH_CLOCK_<clock>, wiring it to DISPATCH_PORT_TIMER_CLOCK_<clock>.
#define DISPATCH_WINDOWS_TIMEOUT_INITIALIZER(clock) \
[DISPATCH_CLOCK_##clock] = { \
.pTimer = NULL, \
.ullIdent = DISPATCH_PORT_TIMER_CLOCK_##clock, \
.bArmed = FALSE, \
}
// Timer state table, indexed by dispatch clock.
static struct _dispatch_windows_timeout_s _dispatch_windows_timeout[] = {
DISPATCH_WINDOWS_TIMEOUT_INITIALIZER(WALL),
DISPATCH_WINDOWS_TIMEOUT_INITIALIZER(UPTIME),
DISPATCH_WINDOWS_TIMEOUT_INITIALIZER(MONOTONIC),
};
// Handles a timer completion packet for `clock`: records that the Windows
// timer is no longer armed, marks the corresponding timer heap dirty, and
// flags it as needing to be reprogrammed.
static void
_dispatch_event_merge_timer(dispatch_clock_t clock)
{
	const uint32_t heap_index = DISPATCH_TIMER_INDEX(clock, 0);

	_dispatch_windows_timeout[clock].bArmed = FALSE;
	_dispatch_timers_heap_dirty(_dispatch_timers_heap, heap_index);

	dispatch_timer_heap_t heap = &_dispatch_timers_heap[heap_index];
	heap->dth_needs_program = true;
	heap->dth_armed = false;
}
// Threadpool timer callback. `Context` carries the completion key of the
// clock that fired; it is forwarded to the event loop by posting an empty
// packet with that key to hPort.
static void CALLBACK
_dispatch_timer_callback(PTP_CALLBACK_INSTANCE Instance, PVOID Context,
		PTP_TIMER Timer)
{
	if (PostQueuedCompletionStatus(hPort, 0, (ULONG_PTR)Context,
			NULL) == FALSE) {
		DISPATCH_INTERNAL_CRASH(GetLastError(),
				"PostQueuedCompletionStatus");
	}
}
// Programs the threadpool timer for the clock of `tidx`, creating it on
// first use. The wall clock is given an absolute due time (cached "now" plus
// the delay); the uptime and monotonic clocks are given a negative, i.e.
// relative, due time of the delay rounded up to 100ns units.
// NOTE(review): the wall-clock due time is passed through in whatever unit
// _dispatch_time_now_cached() uses; confirm it matches what
// SetThreadpoolTimer() expects for absolute times.
void
_dispatch_event_loop_timer_arm(dispatch_timer_heap_t dth DISPATCH_UNUSED,
		uint32_t tidx, dispatch_timer_delay_s range,
		dispatch_clock_now_cache_t nows)
{
	dispatch_windows_timeout_t slot;
	LARGE_INTEGER liDue;

	switch (DISPATCH_TIMER_CLOCK(tidx)) {
	case DISPATCH_CLOCK_WALL:
		slot = &_dispatch_windows_timeout[DISPATCH_CLOCK_WALL];
		liDue.QuadPart = range.delay +
				_dispatch_time_now_cached(DISPATCH_TIMER_CLOCK(tidx), nows);
		break;
	case DISPATCH_CLOCK_UPTIME:
	case DISPATCH_CLOCK_MONOTONIC:
		slot = &_dispatch_windows_timeout[DISPATCH_TIMER_CLOCK(tidx)];
		liDue.QuadPart = -((range.delay + 99) / 100);
		break;
	}

	if (!slot->pTimer) {
		slot->pTimer = CreateThreadpoolTimer(_dispatch_timer_callback,
				(LPVOID)slot->ullIdent, NULL);
		if (!slot->pTimer) {
			DISPATCH_INTERNAL_CRASH(GetLastError(),
					"CreateThreadpoolTimer");
		}
	}

	FILETIME ftDue;
	ftDue.dwLowDateTime = liDue.LowPart;
	ftDue.dwHighDateTime = liDue.HighPart;
	SetThreadpoolTimer(slot->pTimer, &ftDue, /*msPeriod=*/0,
			/*msWindowLength=*/0);
	slot->bArmed = TRUE;
}
// Cancels the threadpool timer for the clock of `tidx` by setting it with no
// due time, and records it as disarmed.
void
_dispatch_event_loop_timer_delete(dispatch_timer_heap_t dth DISPATCH_UNUSED,
		uint32_t tidx)
{
	dispatch_windows_timeout_t slot;

	switch (DISPATCH_TIMER_CLOCK(tidx)) {
	case DISPATCH_CLOCK_WALL:
		slot = &_dispatch_windows_timeout[DISPATCH_CLOCK_WALL];
		break;
	case DISPATCH_CLOCK_UPTIME:
	case DISPATCH_CLOCK_MONOTONIC:
		slot = &_dispatch_windows_timeout[DISPATCH_TIMER_CLOCK(tidx)];
		break;
	}

	SetThreadpoolTimer(slot->pTimer, NULL, /*msPeriod=*/0,
			/*msWindowLength=*/0);
	slot->bArmed = FALSE;
}
#pragma mark dispatch_loop
// One-time initializer (run through dispatch_once_f() from
// _dispatch_event_loop_poke()) that creates the I/O completion port and,
// when a manager thread is in use, pushes the manager queue onto its target
// queue.
static void
_dispatch_windows_port_init(void *context DISPATCH_UNUSED)
{
	hPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, /* existing port */ NULL,
			/* completion key */ 0, /* concurrent threads */ 1);
	if (!hPort) {
		DISPATCH_INTERNAL_CRASH(GetLastError(),
				"CreateIoCompletionPort");
	}

#if DISPATCH_USE_MGR_THREAD
	_dispatch_trace_item_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q);
	dx_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q, 0);
#endif
}
// Wakes the event loop by posting a DISPATCH_PORT_POKE packet to the
// completion port, initializing the port on first use.
void
_dispatch_event_loop_poke(dispatch_wlh_t wlh DISPATCH_UNUSED,
		uint64_t dq_state DISPATCH_UNUSED, uint32_t flags DISPATCH_UNUSED)
{
	static dispatch_once_t port_init_once;

	dispatch_once_f(&port_init_once, NULL, _dispatch_windows_port_init);

	BOOL posted = PostQueuedCompletionStatus(hPort, 0, DISPATCH_PORT_POKE,
			NULL);
	(void)dispatch_assume(posted);
}
// Dequeues completion packets from hPort and dispatches each one according
// to its completion key. The first dequeue blocks indefinitely unless
// KEVENT_FLAG_IMMEDIATE is set in `flags`; after that the port is polled
// without waiting until it is empty. For I/O keys the packet's lpOverlapped
// value is the muxnote pointer and the byte count is the event payload.
DISPATCH_NOINLINE
void
_dispatch_event_loop_drain(uint32_t flags)
{
DWORD dwNumberOfBytesTransferred;
ULONG_PTR ulCompletionKey;
LPOVERLAPPED pOV;
BOOL bSuccess;
// Seed pOV with a non-NULL sentinel (its own address) so that the check
// after the loop can tell whether a failing call cleared it.
pOV = (LPOVERLAPPED)&pOV;
bSuccess = GetQueuedCompletionStatus(hPort, &dwNumberOfBytesTransferred,
&ulCompletionKey, &pOV,
(flags & KEVENT_FLAG_IMMEDIATE) ? 0 : INFINITE);
while (bSuccess) {
switch (ulCompletionKey) {
case DISPATCH_PORT_POKE:
// Wake-up only; there is nothing to merge
break;
case DISPATCH_PORT_TIMER_CLOCK_WALL:
_dispatch_event_merge_timer(DISPATCH_CLOCK_WALL);
break;
case DISPATCH_PORT_TIMER_CLOCK_UPTIME:
_dispatch_event_merge_timer(DISPATCH_CLOCK_UPTIME);
break;
case DISPATCH_PORT_TIMER_CLOCK_MONOTONIC:
_dispatch_event_merge_timer(DISPATCH_CLOCK_MONOTONIC);
break;
case DISPATCH_PORT_FILE_HANDLE:
_dispatch_event_merge_file_handle((dispatch_muxnote_t)pOV);
break;
case DISPATCH_PORT_PIPE_HANDLE_READ:
_dispatch_event_merge_pipe_handle_read((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
break;
case DISPATCH_PORT_PIPE_HANDLE_WRITE:
_dispatch_event_merge_pipe_handle_write((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
break;
case DISPATCH_PORT_SOCKET_READ:
_dispatch_event_merge_socket_read((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
break;
case DISPATCH_PORT_SOCKET_WRITE:
_dispatch_event_merge_socket_write((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
break;
default:
DISPATCH_INTERNAL_CRASH(ulCompletionKey,
"unsupported completion key");
}
// Poll for further packets without blocking
bSuccess = GetQueuedCompletionStatus(hPort,
&dwNumberOfBytesTransferred, &ulCompletionKey, &pOV, 0);
}
// A failed dequeue that left pOV non-NULL is treated as fatal; a failure
// with pOV set to NULL simply ends the drain.
if (bSuccess == FALSE && pOV != NULL) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"GetQueuedCompletionStatus");
}
}
// Not implemented for the Windows backend; only reports the missing port via
// WIN_PORT_ERROR().
void
_dispatch_event_loop_cancel_waiter(dispatch_sync_context_t dsc DISPATCH_UNUSED)
{
WIN_PORT_ERROR();
}
// No-op in the Windows backend; all arguments are ignored.
void
_dispatch_event_loop_wake_owner(dispatch_sync_context_t dsc,
		dispatch_wlh_t wlh, uint64_t old_state, uint64_t new_state)
{
	(void)dsc;
	(void)wlh;
	(void)old_state;
	(void)new_state;
}
// Releases the queue storage referenced by the sync context when the context
// asks for it; otherwise does nothing.
void
_dispatch_event_loop_wait_for_ownership(dispatch_sync_context_t dsc)
{
	if (!dsc->dsc_release_storage) {
		return;
	}
	_dispatch_queue_release_storage(dsc->dc_data);
}
// No-op in the Windows backend; all arguments are ignored.
void
_dispatch_event_loop_end_ownership(dispatch_wlh_t wlh, uint64_t old_state,
		uint64_t new_state, uint32_t flags)
{
	(void)wlh;
	(void)old_state;
	(void)new_state;
	(void)flags;
}
#if DISPATCH_WLH_DEBUG
// Debug-only hook; the Windows backend has nothing to check, so `wlh` is
// ignored.
void
_dispatch_event_loop_assert_not_owned(dispatch_wlh_t wlh)
{
	(void)wlh;
}
#endif
// No-op in the Windows backend; the queue state is intentionally ignored.
void
_dispatch_event_loop_leave_immediate(uint64_t dq_state)
{
	(void)dq_state;
}
#endif // DISPATCH_EVENT_BACKEND_WINDOWS