blob: d4656eb43b9dee90ec07790c87ea1f5ea7dce75c [file] [log] [blame]
//===--- Task.cpp - Task object and management ----------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
//
// Object management routines for asynchronous task objects.
//
//===----------------------------------------------------------------------===//
#include "swift/Runtime/Concurrency.h"
#include "swift/ABI/Task.h"
#include "swift/ABI/Metadata.h"
#include "swift/Runtime/Mutex.h"
#include "swift/Runtime/HeapObject.h"
#include "TaskPrivate.h"
#include "AsyncCall.h"
using namespace swift;
using FutureFragment = AsyncTask::FutureFragment;
/// Destroy the result stored in a completed future, if any.
///
/// The wait-queue head's status records how the task finished:
/// - Success: a result value was constructed in the fragment's storage and
///   must be destroyed via the result type's value witnesses.
/// - Error: the fragment holds a retained error object that must be released.
/// - Executing: the task never completed, so there is nothing valid to
///   destroy.
void FutureFragment::destroy() {
  auto queueHead = waitQueue.load(std::memory_order_acquire);
  switch (queueHead.getStatus()) {
  case Status::Executing:
    assert(false && "destroying a task that never completed");
    // In release builds the assert above compiles away; bail out rather
    // than falling through into the Success case and destroying storage
    // that was never initialized.
    break;

  case Status::Success:
    resultType->vw_destroy(getStoragePtr());
    break;

  case Status::Error:
    swift_unknownObjectRelease(reinterpret_cast<OpaqueValue *>(getError()));
    break;
  }
}
/// Register `waitingTask` as a waiter on this future task, or report that
/// the future has already completed.
///
/// Returns Success or Error if the future already finished (the caller can
/// read the result immediately), or Executing if the waiting task was pushed
/// onto the wait queue and will be resumed later by completeFuture().
FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask) {
using Status = FutureFragment::Status;
using WaitQueueItem = FutureFragment::WaitQueueItem;
assert(isFuture());
auto fragment = futureFragment();
auto queueHead = fragment->waitQueue.load(std::memory_order_acquire);
while (true) {
switch (queueHead.getStatus()) {
case Status::Error:
case Status::Success:
// The task is done; we don't need to wait.
return queueHead.getStatus();
case Status::Executing:
// Task is not yet complete. We'll need to add ourselves to the queue.
break;
}
// Put the waiting task at the beginning of the wait queue.
waitingTask->getNextWaitingTask() = queueHead.getTask();
auto newQueueHead = WaitQueueItem::get(Status::Executing, waitingTask);
// On success the release ordering publishes the waiter's next-pointer to
// the completing task; on failure queueHead is reloaded (acquire) and we
// retry, which also re-checks whether the task completed meanwhile.
if (fragment->waitQueue.compare_exchange_weak(
queueHead, newQueueHead, std::memory_order_release,
std::memory_order_acquire)) {
// Escalate the priority of this task based on the priority
// of the waiting task.
swift_task_escalate(this, waitingTask->Flags.getPriority());
return FutureFragment::Status::Executing;
}
}
}
namespace {
/// An asynchronous context within a task that describes a general "Future"
/// task.
///
/// This type matches the ABI of a function `<T> () async throws -> T`, which
/// is the type used by `Task.runDetached` and `Task.group.add` to create
/// futures.
class TaskFutureWaitAsyncContext : public AsyncContext {
public:
// Error result is always present.
SwiftError *errorResult = nullptr;
// No indirect results.
TaskFutureWaitResult result;
// FIXME: Currently, this is always here, but it isn't technically
// necessary.
void* Self;
// Arguments.
AsyncTask *task;
using AsyncContext::AsyncContext;
};
} // end anonymous namespace
/// Run the given task, providing it with the result of the future.
///
/// Copies the future's outcome (either the error object or a pointer to the
/// stored result) into the waiting task's resume context, then runs it.
static void runTaskWithFutureResult(
    AsyncTask *waitingTask, ExecutorRef executor,
    FutureFragment *futureFragment, bool hadErrorResult) {
  auto resumeContext =
      static_cast<TaskFutureWaitAsyncContext *>(waitingTask->ResumeContext);
  resumeContext->result.hadErrorResult = hadErrorResult;
  resumeContext->result.storage =
      hadErrorResult
          ? reinterpret_cast<OpaqueValue *>(futureFragment->getError())
          : futureFragment->getStoragePtr();

  // TODO: schedule this task on the executor rather than running it
  // directly.
  waitingTask->run(executor);
}
/// Complete this future task with the result (or error) left in `context`
/// by the task function, then resume every waiter that waitFuture() queued.
void AsyncTask::completeFuture(AsyncContext *context, ExecutorRef executor) {
using Status = FutureFragment::Status;
using WaitQueueItem = FutureFragment::WaitQueueItem;
assert(isFuture());
auto fragment = futureFragment();
// If an error was thrown, save it in the future fragment.
auto futureContext = static_cast<FutureAsyncContext *>(context);
bool hadErrorResult = false;
if (auto errorObject = futureContext->errorResult) {
fragment->getError() = errorObject;
hadErrorResult = true;
}
// Update the status to signal completion.
// NOTE(review): this exchange publishes the stored result/error to waiters
// but uses only acquire ordering -- confirm that release semantics are not
// required here to make the result visible to tasks dequeued below.
auto newQueueHead = WaitQueueItem::get(
hadErrorResult ? Status::Error : Status::Success,
nullptr
);
auto queueHead = fragment->waitQueue.exchange(
newQueueHead, std::memory_order_acquire);
assert(queueHead.getStatus() == Status::Executing);
// Schedule every waiting task on the executor.
auto waitingTask = queueHead.getTask();
while (waitingTask) {
// Find the next waiting task before resuming this one; running the task
// may invalidate its queue link.
auto nextWaitingTask = waitingTask->getNextWaitingTask();
// Run the task.
runTaskWithFutureResult(waitingTask, executor, fragment, hadErrorResult);
// Move to the next task.
waitingTask = nextWaitingTask;
}
}
/// Heap-object destructor for AsyncTask, installed in taskHeapMetadata.
SWIFT_CC(swift)
static void destroyTask(SWIFT_CONTEXT HeapObject *obj) {
  auto *task = static_cast<AsyncTask *>(obj);

  // A future owns its stored result or error; tear that down before
  // freeing the task itself.
  if (task->isFuture())
    task->futureFragment()->destroy();

  // The task execution itself should always hold a reference to it, so
  // if we get here, we know the task has finished running, which means
  // swift_task_complete should have been run, which will have torn down
  // the task-local allocator. There's actually nothing else to clean up
  // here.
  free(task);
}
/// Heap metadata for an asynchronous task.
///
/// Tasks are heap objects destroyed by destroyTask(); they carry no value
/// witness table.
static FullMetadata<HeapMetadata> taskHeapMetadata = {
{
{
&destroyTask
},
{
/*value witness table*/ nullptr
}
},
{
MetadataKind::Task
}
};
/// The function that we put in the context of a simple task
/// to handle the final return.
///
/// Installed as ResumeParent of the initial context in
/// swift_task_create_future_f, so it runs when the task function returns.
SWIFT_CC(swift)
static void completeTask(AsyncTask *task, ExecutorRef executor,
AsyncContext *context) {
// Tear down the task-local allocator immediately; there's no need
// to wait for the object to be destroyed.
_swift_task_alloc_destroy(task);
// Complete the future, publishing the result and resuming any waiters.
if (task->isFuture()) {
task->completeFuture(context, executor);
}
// TODO: set something in the status?
// TODO: notify the parent somehow?
// TODO: remove this task from the child-task chain?
// Release the task, balancing the retain that a running task
// has on itself.
swift_release(task);
}
/// Create a task from an async function pointer record, forwarding to the
/// raw-function-pointer entry point with the record's expected context size.
AsyncTaskAndContext
swift::swift_task_create(JobFlags flags, AsyncTask *parent,
const ThinNullaryAsyncSignature::FunctionPointer *function) {
return swift_task_create_f(flags, parent, function->Function.get(),
function->ExpectedContextSize);
}
/// Create a non-future task running `function`; implemented as future
/// creation with a null result type.
AsyncTaskAndContext
swift::swift_task_create_f(JobFlags flags, AsyncTask *parent,
ThinNullaryAsyncSignature::FunctionType *function,
size_t initialContextSize) {
return swift_task_create_future_f(
flags, parent, nullptr, function, initialContextSize);
}
/// Create a future task from an async function pointer record, forwarding
/// to the raw-function-pointer entry point.
AsyncTaskAndContext swift::swift_task_create_future(
JobFlags flags, AsyncTask *parent, const Metadata *futureResultType,
const FutureAsyncSignature::FunctionPointer *function) {
return swift_task_create_future_f(
flags, parent, futureResultType, function->Function.get(),
function->ExpectedContextSize);
}
/// Shared task-creation slow path: allocate and initialize an AsyncTask
/// (optionally with child and/or future fragments) together with its
/// initial async context, in a single heap allocation.
AsyncTaskAndContext swift::swift_task_create_future_f(
JobFlags flags, AsyncTask *parent, const Metadata *futureResultType,
FutureAsyncSignature::FunctionType *function, size_t initialContextSize) {
assert((futureResultType != nullptr) == flags.task_isFuture());
assert(!flags.task_isFuture() ||
initialContextSize >= sizeof(FutureAsyncContext));
assert((parent != nullptr) == flags.task_isChildTask());
// Figure out the size of the header: the task object plus any optional
// fragments selected by the flags.
size_t headerSize = sizeof(AsyncTask);
if (parent) headerSize += sizeof(AsyncTask::ChildFragment);
if (futureResultType) {
headerSize += FutureFragment::fragmentSize(futureResultType);
}
// The initial context is placed right after the header and must be
// suitably aligned.
headerSize = llvm::alignTo(headerSize, llvm::Align(alignof(AsyncContext)));
// Allocate the initial context together with the job.
// This means that we never get rid of this allocation.
size_t amountToAllocate = headerSize + initialContextSize;
// NOTE(review): assumes the aligned header plus initialContextSize always
// totals a multiple of MaximumAlignment -- confirm callers guarantee this.
assert(amountToAllocate % MaximumAlignment == 0);
// FIXME(review): the malloc result is not checked for null.
void *allocation = malloc(amountToAllocate);
AsyncContext *initialContext =
reinterpret_cast<AsyncContext*>(
reinterpret_cast<char*>(allocation) + headerSize);
// Initialize the task so that resuming it will run the given
// function on the initial context.
AsyncTask *task =
new(allocation) AsyncTask(&taskHeapMetadata, flags,
function, initialContext);
// Initialize the child fragment if applicable.
// TODO: propagate information from the parent?
if (parent) {
auto childFragment = task->childFragment();
new (childFragment) AsyncTask::ChildFragment(parent);
}
// Initialize the future fragment if applicable.
if (futureResultType) {
auto futureFragment = task->futureFragment();
new (futureFragment) FutureFragment(futureResultType);
// Set up the context for the future so there is no error, and a successful
// result will be written into the future fragment's storage.
auto futureContext = static_cast<FutureAsyncContext *>(initialContext);
futureContext->errorResult = nullptr;
futureContext->indirectResult = futureFragment->getStoragePtr();
}
// Configure the initial context.
//
// FIXME: if we store a null pointer here using the standard ABI for
// signed null pointers, then we'll have to authenticate context pointers
// as if they might be null, even though the only time they ever might
// be is the final hop. Store a signed null instead.
initialContext->Parent = nullptr;
// Returning from `function` resumes into completeTask(), which finishes
// the task (and any future) and releases its self-reference.
initialContext->ResumeParent = &completeTask;
initialContext->ResumeParentExecutor = ExecutorRef::generic();
initialContext->Flags = AsyncContextKind::Ordinary;
initialContext->Flags.setShouldNotDeallocateInCallee(true);
// Initialize the task-local allocator.
// TODO: consider providing an initial pre-allocated first slab to the
// allocator.
_swift_task_alloc_initialize(task);
return {task, initialContext};
}
/// Runtime entry point for awaiting a future task: suspend `waitingTask`
/// until the future in its context completes, or resume it immediately if
/// the future already finished.
void swift::swift_task_future_wait(
AsyncTask *waitingTask, ExecutorRef executor,
AsyncContext *rawContext) {
// Suspend the waiting task; resuming it re-enters the awaiting function
// at its continuation.
waitingTask->ResumeTask = rawContext->ResumeParent;
waitingTask->ResumeContext = rawContext;
// Wait on the future.
auto context = static_cast<TaskFutureWaitAsyncContext *>(rawContext);
auto task = context->task;
assert(task->isFuture());
switch (task->waitFuture(waitingTask)) {
case FutureFragment::Status::Executing:
// The waiting task has been queued on the future; completeFuture()
// will resume it.
return;
case FutureFragment::Status::Success:
// Run the task with a successful result.
// FIXME: Want to guarantee a tail call here
runTaskWithFutureResult(
waitingTask, executor, task->futureFragment(),
/*hadErrorResult=*/false);
return;
case FutureFragment::Status::Error:
// Run the task with an error result.
// FIXME: Want to guarantee a tail call here
runTaskWithFutureResult(
waitingTask, executor, task->futureFragment(),
/*hadErrorResult=*/true);
return;
}
}
namespace {
/// The header of a function context (closure captures) of
/// a thick async function with a non-null context.
struct ThickAsyncFunctionContext: HeapObject {
// Size of the async context the thick function expects to be called with.
uint32_t ExpectedContextSize;
};
#if SWIFT_CONCURRENCY_COOPERATIVE_GLOBAL_EXECUTOR
/// Blocking primitive for the cooperative global executor: instead of
/// sleeping, the waiting thread is donated to the executor to run queued
/// tasks until the flag flips.
class RunAndBlockSemaphore {
// NOTE(review): plain non-atomic flag -- presumably wait() and signal()
// both run on the donated thread, so no synchronization is needed; confirm.
bool Finished = false;
public:
void wait() {
donateThreadToGlobalExecutorUntil([](void *context) {
return *reinterpret_cast<bool*>(context);
}, &Finished);
assert(Finished && "ran out of tasks before we were signalled");
}
void signal() {
Finished = true;
}
};
#else
/// Blocking primitive for a threaded global executor: a condition-variable
/// guarded flag.
class RunAndBlockSemaphore {
ConditionVariable Queue;
ConditionVariable::Mutex Lock;
bool Finished = false;
public:
/// Wait for a signal.
void wait() {
Lock.withLockOrWait(Queue, [&] {
return Finished;
});
}
void signal() {
Lock.withLockThenNotifyAll(Queue, [&]{
Finished = true;
});
}
};
#endif
/// Signature of the function run by swift_task_runAndBlockThread:
/// `(HeapObject*) async -> void`, non-throwing.
using RunAndBlockSignature =
AsyncSignature<void(HeapObject*), /*throws*/ false>;
/// Context for the task created by swift_task_runAndBlockThread.
struct RunAndBlockContext: AsyncContext {
// Either an ordinary function pointer (when FunctionContext is non-null)
// or an async function pointer record (when it is null); decoded in
// runAndBlock_start.
const void *Function;
HeapObject *FunctionContext;
RunAndBlockSemaphore *Semaphore;
};
using RunAndBlockCalleeContext =
AsyncCalleeContext<RunAndBlockContext, RunAndBlockSignature>;
} // end anonymous namespace
/// Second half of the runAndBlock async function: resumed when the user's
/// async function returns.
SWIFT_CC(swiftasync)
static void runAndBlock_finish(AsyncTask *task, ExecutorRef executor,
AsyncContext *_context) {
auto calleeContext = static_cast<RunAndBlockCalleeContext*>(_context);
// Pop back to the RunAndBlockContext that runAndBlock_start pushed.
auto context = popAsyncContext(task, calleeContext);
// Wake the thread blocked in swift_task_runAndBlockThread().
// NOTE(review): the semaphore lives on that thread's stack; confirm
// nothing after this point touches it once the waiter may have returned.
context->Semaphore->signal();
return context->ResumeParent(task, executor, context);
}
/// First half of the runAndBlock async function: decode the stored function
/// pointer and call it with a freshly pushed callee context.
SWIFT_CC(swiftasync)
static void runAndBlock_start(AsyncTask *task, ExecutorRef executor,
AsyncContext *_context) {
auto callerContext = static_cast<RunAndBlockContext*>(_context);
size_t calleeContextSize;
RunAndBlockSignature::FunctionType *function;
// If the function context is non-null, then the function pointer is
// an ordinary function pointer.
auto functionContext = callerContext->FunctionContext;
if (functionContext) {
function = reinterpret_cast<RunAndBlockSignature::FunctionType*>(
const_cast<void*>(callerContext->Function));
calleeContextSize =
static_cast<ThickAsyncFunctionContext*>(functionContext)
->ExpectedContextSize;
// Otherwise, the function pointer is an async function pointer.
} else {
auto fnPtr = reinterpret_cast<const RunAndBlockSignature::FunctionPointer*>(
callerContext->Function);
function = fnPtr->Function;
calleeContextSize = fnPtr->ExpectedContextSize;
}
// Push a callee context that resumes into runAndBlock_finish, which
// signals the blocked thread when the function returns.
auto calleeContext =
pushAsyncContext<RunAndBlockSignature>(task, executor, callerContext,
calleeContextSize,
&runAndBlock_finish,
functionContext);
return function(task, executor, calleeContext);
}
// TODO: Remove this hack.
/// Run an async function on a freshly created detached task and block the
/// calling thread until it completes.
void swift::swift_task_runAndBlockThread(const void *function,
HeapObject *functionContext) {
// Stack-allocated; signalled from runAndBlock_finish when the task is done.
RunAndBlockSemaphore semaphore;
// Set up a task that runs the runAndBlock async function above.
auto pair = swift_task_create_f(JobFlags(JobKind::Task,
JobPriority::Default),
/*parent*/ nullptr,
&runAndBlock_start,
sizeof(RunAndBlockContext));
auto context = static_cast<RunAndBlockContext*>(pair.InitialContext);
context->Function = function;
context->FunctionContext = functionContext;
context->Semaphore = &semaphore;
// Enqueue the task.
swift_task_enqueueGlobal(pair.Task);
// Wait until the task completes.
semaphore.wait();
}
/// Return the task's job flags as an opaque word (runtime entry point).
size_t swift::swift_task_getJobFlags(AsyncTask *task) {
return task->Flags.getOpaqueValue();
}
namespace {
/// Structure that gets filled in when a task is suspended by `withUnsafeContinuation`.
/// NOTE(review): presumably this mirrors the layout of the suspended task's
/// resume context, with only the fields needed for resumption named; the
/// reinterpret_casts below depend on that -- confirm against the compiler.
struct AsyncContinuationContext {
// These fields are unnecessary for resuming a continuation.
void *Unused1;
void *Unused2;
// Storage slot for the error result, if any.
SwiftError *ErrorResult;
// Pointer to where to store a normal result.
OpaqueValue *NormalResult;
// Executor on which to resume execution.
ExecutorRef ResumeExecutor;
};
/// Re-enqueue a task suspended by a continuation on its resume executor.
static void resumeTaskAfterContinuation(AsyncTask *task,
AsyncContinuationContext *context) {
swift_task_enqueue(task, context->ResumeExecutor);
}
} // end anonymous namespace
/// Resume a (non-throwing) continuation with a success value.
///
/// Takes the +1 `result`, moves it into the slot the awaiting code reads,
/// and enqueues the task on its resume executor.
SWIFT_CC(swift)
void swift::swift_continuation_resume(/* +1 */ OpaqueValue *result,
void *continuation,
const Metadata *resumeType) {
auto task = reinterpret_cast<AsyncTask*>(continuation);
auto context = reinterpret_cast<AsyncContinuationContext*>(task->ResumeContext);
// Move (take) the caller's +1 value into the result slot.
resumeType->vw_initializeWithTake(context->NormalResult, result);
resumeTaskAfterContinuation(task, context);
}
/// Resume a throwing continuation with a success value.
///
/// The success path is identical to the non-throwing case, so forward.
SWIFT_CC(swift)
void swift::swift_continuation_throwingResume(/* +1 */ OpaqueValue *result,
void *continuation,
const Metadata *resumeType) {
return swift_continuation_resume(result, continuation, resumeType);
}
/// Resume a throwing continuation with an error.
///
/// Stores the +1 `error` into the context's error slot and enqueues the
/// task on its resume executor; the awaiting code rethrows from that slot.
SWIFT_CC(swift)
void swift::swift_continuation_throwingResumeWithError(/* +1 */ SwiftError *error,
void *continuation,
const Metadata *resumeType) {
auto task = reinterpret_cast<AsyncTask*>(continuation);
auto context = reinterpret_cast<AsyncContinuationContext*>(task->ResumeContext);
context->ErrorResult = error;
resumeTaskAfterContinuation(task, context);
}