blob: 14a94f4cc2ca481024c44299a4800bb28bce5835 [file] [log] [blame]
///===--- Actor.cpp - Standard actor implementation ------------------------===///
///
/// This source file is part of the Swift.org open source project
///
/// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
/// Licensed under Apache License v2.0 with Runtime Library Exception
///
/// See https:///swift.org/LICENSE.txt for license information
/// See https:///swift.org/CONTRIBUTORS.txt for the list of Swift project authors
///
///===----------------------------------------------------------------------===///
///
/// The default actor implementation for Swift actors, plus related
/// routines such as generic executor enqueuing and switching.
///
///===----------------------------------------------------------------------===///
#include "swift/Runtime/Concurrency.h"
#include "swift/Runtime/Atomic.h"
#include "swift/Runtime/Mutex.h"
#include "swift/Runtime/ThreadLocal.h"
#include "swift/ABI/Actor.h"
#include "llvm/ADT/PointerIntPair.h"
using namespace swift;
/// Should we yield the thread?
static bool shouldYieldThread() {
// FIXME: system scheduler integration
return false;
}
/*****************************************************************************/
/*********************** DEFAULT ACTOR IMPLEMENTATION ************************/
/*****************************************************************************/
namespace {
class DefaultActorImpl;
/// A job to process a default actor. Allocated inline in the actor.
class ProcessInlineJob : public Job {
public:
ProcessInlineJob(JobPriority priority)
: Job({JobKind::DefaultActorInline, priority}, &process) {}
SWIFT_CC(swiftasync)
static void process(Job *job, ExecutorRef executor);
static bool classof(const Job *job) {
return job->Flags.getKind() == JobKind::DefaultActorInline;
}
};
/// A job to process a default actor that's allocated separately from
/// the actor but doesn't need the override mechanics.
class ProcessOutOfLineJob : public Job {
DefaultActorImpl *Actor;
public:
ProcessOutOfLineJob(DefaultActorImpl *actor, JobPriority priority)
: Job({JobKind::DefaultActorSeparate, priority}, &process),
Actor(actor) {}
SWIFT_CC(swiftasync)
static void process(Job *job, ExecutorRef executor);
static bool classof(const Job *job) {
return job->Flags.getKind() == JobKind::DefaultActorSeparate;
}
};
/// A job to process a default actor with a new priority; allocated
/// separately from the actor.
class ProcessOverrideJob;
/// Information about the currently-running processing job.
struct RunningJobInfo {
enum KindType : uint8_t {
Inline, Override, Other
};
KindType Kind;
JobPriority Priority;
ProcessOverrideJob *OverrideJob;
bool wasInlineJob() const {
return Kind == Inline;
}
static RunningJobInfo forOther(JobPriority priority) {
return {Other, priority, nullptr};
}
static RunningJobInfo forInline(JobPriority priority) {
return {Inline, priority, nullptr};
}
static RunningJobInfo forOverride(ProcessOverrideJob *job);
void setAbandoned();
void setRunning();
bool waitForActivation();
};
class JobRef {
enum : uintptr_t {
NeedsPreprocessing = 0x1,
IsOverride = 0x2,
JobMask = ~uintptr_t(NeedsPreprocessing | IsOverride)
};
/// A Job* that may have one of the two bits above mangled into it.
uintptr_t Value;
JobRef(Job *job, unsigned flags)
: Value(reinterpret_cast<uintptr_t>(job) | flags) {}
public:
constexpr JobRef() : Value(0) {}
/// Return a reference to a job that's been properly preprocessed.
static JobRef getPreprocessed(Job *job) {
/// We allow null pointers here.
return { job, 0 };
}
/// Return a reference to a job that hasn't been preprocesssed yet.
static JobRef getUnpreprocessed(Job *job) {
assert(job && "passing a null job");
return { job, NeedsPreprocessing };
}
/// Return a reference to an override job, which needs special
/// treatment during preprocessing.
static JobRef getOverride(ProcessOverrideJob *job);
/// Is this a null reference?
operator bool() const { return Value != 0; }
/// Does this job need to be pre-processed before we can treat
/// the job queue as a proper queue?
bool needsPreprocessing() const {
return Value & NeedsPreprocessing;
}
/// Is this an unpreprocessed override job?
bool isOverride() const {
return Value & IsOverride;
}
/// Given that this is an override job, return it.
ProcessOverrideJob *getAsOverride() const {
assert(isOverride());
return reinterpret_cast<ProcessOverrideJob*>(Value & JobMask);
}
ProcessOverrideJob *getAsPreprocessedOverride() const;
Job *getAsJob() const {
assert(!isOverride());
return reinterpret_cast<Job*>(Value & JobMask);
}
Job *getAsPreprocessedJob() const {
assert(!isOverride() && !needsPreprocessing());
return reinterpret_cast<Job*>(Value);
}
bool operator==(JobRef other) const {
return Value == other.Value;
}
bool operator!=(JobRef other) const {
return Value != other.Value;
}
};
/// The default actor implementation.
///
/// Ownership of the actor is subtle. Jobs are assumed to keep the actor
/// alive as long as they're executing on it; this allows us to avoid
/// retaining and releasing whenever threads are scheduled to run a job.
/// While jobs are enqueued on the actor, there is a conceptual shared
/// ownership of the currently-enqueued jobs which is passed around
/// between threads and processing jobs and managed using extra retains
/// and releases of the actor. The basic invariant is as follows:
///
/// - Let R be 1 if there are jobs enqueued on the actor or if a job
/// is currently running on the actor; otherwise let R be 0.
/// - Let N be the number of active processing jobs for the actor.
/// - N >= R
/// - There are N - R extra retains of the actor.
///
/// We can think of this as there being one "owning" processing job
/// and K "extra" jobs. If there is a processing job that is actively
/// running the actor, it is always the owning job; otherwise, any of
/// the N jobs may win the race to become the owning job.
///
/// We then have the following ownership rules:
///
/// - When we enqueue the first job on an actor, then R becomes 1, and
/// we must create a processing job so that N >= R. We do not need to
/// retain the actor.
/// - When we create an extra job to process an actor (e.g. because of
/// priority overrides), N increases but R remains the same. We must
/// retain the actor.
/// - When we start running an actor, our job definitively becomes the
/// owning job, but neither N nor R changes. We do not need to retain
/// the actor.
/// - When we go to start running an actor and for whatever reason we
/// don't actually do so, we are eliminating an extra processing job,
/// and so N decreases but R remains the same. We must release the
/// actor.
/// - When we are running an actor and give it up, and there are no
/// remaining jobs on it, then R becomes 0 and N decreases by 1.
/// We do not need to release the actor.
/// - When we are running an actor and give it up, and there are jobs
/// remaining on it, then R remains 1 but N is decreasing by 1.
/// We must either release the actor or create a new processing job
/// for it to maintain the balance.
class DefaultActorImpl : public HeapObject {
enum class Status {
/// The actor is not currently scheduled. Completely redundant
/// with the job list being empty.
Idle,
/// There is currently a job scheduled to process the actor at the
/// stored max priority.
Scheduled,
/// There is currently a thread processing the actor at the stored
/// max priority.
Running
};
struct Flags : public FlagSet<size_t> {
enum : size_t {
Status_offset = 0,
Status_width = 2,
HasActiveInlineJob = 2,
MaxPriority = 8,
MaxPriority_width = JobFlags::Priority_width,
// FIXME: add a reference to the running thread ID so that we
// can boost priorities.
};
/// What is the current high-level status of this actor?
FLAGSET_DEFINE_FIELD_ACCESSORS(Status_offset, Status_width, Status,
getStatus, setStatus)
/// Is there currently an active processing job allocated inline
/// in the actor?
FLAGSET_DEFINE_FLAG_ACCESSORS(HasActiveInlineJob,
hasActiveInlineJob, setHasActiveInlineJob)
/// What is the maximum priority of jobs that are currently running
/// or enqueued on this actor?
///
/// Note that the above isn't quite correct: we don't actually
/// lower this after we finish processing higher-priority tasks.
/// (Doing so introduces some subtleties around kicking off
/// lower-priority processing jobs.)
FLAGSET_DEFINE_FIELD_ACCESSORS(MaxPriority, MaxPriority_width,
JobPriority,
getMaxPriority, setMaxPriority)
};
/// This is designed to fit into two words, which can generally be
/// done lock-free on all our supported platforms.
struct alignas(2 * sizeof(void*)) State {
JobRef FirstJob;
struct Flags Flags;
};
swift::atomic<State> CurrentState;
friend class ProcessInlineJob;
union {
ProcessInlineJob JobStorage;
};
public:
/// Properly construct an actor, except for the heap header.
void initialize() {
new (&CurrentState) std::atomic<State>(State{JobRef(), Flags()});
}
/// Properly destruct an actor, except for the heap header.
void destroy() {
assert(CurrentState.load(std::memory_order_relaxed).Flags.getStatus()
== Status::Idle && "actor not idle during destruction?");
}
/// Add a job to this actor.
void enqueue(Job *job);
/// Take over running this actor in the current thread, if possible.
bool tryAssumeThread(RunningJobInfo runner);
/// Give up running this actor in the current thread.
void giveUpThread(RunningJobInfo runner);
/// Claim the next job off the actor or give it up.
Job *claimNextJobOrGiveUp(bool actorIsOwned, RunningJobInfo runner);
private:
/// Schedule an inline processing job. This can generally only be
/// done if we know nobody else is trying to do it at the same time,
/// e.g. if this thread just sucessfully transitioned the actor from
/// Idle to Scheduled.
void scheduleNonOverrideProcessJob(JobPriority priority,
bool hasActiveInlineJob);
static DefaultActorImpl *fromInlineJob(Job *job) {
assert(isa<ProcessInlineJob>(job));
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Winvalid-offsetof"
return reinterpret_cast<DefaultActorImpl*>(
reinterpret_cast<char*>(job) - offsetof(DefaultActorImpl, JobStorage));
#pragma clang diagnostic pop
}
class OverrideJobCache {
ProcessOverrideJob *Job = nullptr;
bool IsNeeded = false;
#ifndef NDEBUG
bool WasCommitted = false;
#endif
public:
OverrideJobCache() = default;
OverrideJobCache(const OverrideJobCache &) = delete;
OverrideJobCache &operator=(const OverrideJobCache &) = delete;
~OverrideJobCache() {
assert(WasCommitted && "didn't commit override job!");
}
void addToState(DefaultActorImpl *actor, State &newState);
void setNotNeeded() { IsNeeded = false; }
void commit();
};
};
} /// end anonymous namespace
static_assert(sizeof(DefaultActorImpl) <= sizeof(DefaultActor) &&
alignof(DefaultActorImpl) <= alignof(DefaultActor),
"DefaultActorImpl doesn't fit in DefaultActor");
static DefaultActorImpl *asImpl(DefaultActor *actor) {
return reinterpret_cast<DefaultActorImpl*>(actor);
}
static DefaultActor *asAbstract(DefaultActorImpl *actor) {
return reinterpret_cast<DefaultActor*>(actor);
}
/*****************************************************************************/
/************************** DEFAULT ACTOR TRACKING ***************************/
/*****************************************************************************/
namespace {
enum Mode {
/// Shadow any existing frame, leaving it untouched.
ShadowExistingFrame,
/// Update any existing frame if possible.
UpdateExistingFrame
};
/// A little class for tracking whether there's a frame processing
/// default actors in the current thread.
///
/// The goal of this class is to encapsulate uses of the central variable.
/// We want to potentially use a more efficient access pattern than
/// ordinary thread-locals when that's available.
class DefaultActorProcessingFrame {
using ValueType = llvm::PointerIntPair<DefaultActorImpl*, 1, bool>;
/// The active default actor on the current thread, if any.
/// This may still need to be tracked separately from the active
/// executor, if/when we start tracking that in thread-local storage.
static SWIFT_RUNTIME_DECLARE_THREAD_LOCAL(ValueType, ThreadLocalValue);
ValueType SavedValue;
bool IsNeeded;
public:
/// Flag that this thread is processing the given actor (or null,
/// for generic processing) and set up a processing frame if we
/// don't already have one.
DefaultActorProcessingFrame(DefaultActorImpl *actor, Mode mode) {
// If we should shadow an existing frame, save any value that
// it might have set.
if (mode == ShadowExistingFrame) {
SavedValue = ThreadLocalValue.get();
IsNeeded = true;
// If we should update an existing frame, just replace any value
// that it might have set.
} else {
IsNeeded = !ThreadLocalValue.get().getInt();
SavedValue = ValueType();
}
ThreadLocalValue.set(ValueType(actor, true));
}
DefaultActorProcessingFrame(const DefaultActorProcessingFrame &) = delete;
DefaultActorProcessingFrame &operator=(
const DefaultActorProcessingFrame &) = delete;
/// Return the currently active actor.
DefaultActorImpl *getActiveActor() {
return ThreadLocalValue.get().getPointer();
}
/// Exit the frame. This isn't a destructor intentionally, because
/// we need to be able to tail-call out of frames that might have
/// optimistically made one of these.
void exit() {
ThreadLocalValue.set(SavedValue);
}
/// Return whether this frame was needed; if it was not, then it's
/// okay to abandon it without calling exit(). This is only meaningful
/// when constructed in the UpdateExistingFrame mode.
bool isNeeded() {
return IsNeeded;
}
};
/// Define the thread-local.
SWIFT_RUNTIME_DECLARE_THREAD_LOCAL(
DefaultActorProcessingFrame::ValueType,
DefaultActorProcessingFrame::ThreadLocalValue);
} /// end anonymous namespace
/*****************************************************************************/
/*********************** DEFAULT ACTOR IMPLEMENTATION ************************/
/*****************************************************************************/
/// Given that a job is enqueued normally on a default actor, get/set
/// the next job in the actor's queue.
///
/// Note that this must not be used on the override jobs that can appear
/// in the queue; those jobs are not actually in the actor's queue (they're
/// on the global execution queues). So the actor's actual queue flows
/// through the NextJob field on those objects rather than through
/// the SchedulerPrivate fields.
static JobRef getNextJobInQueue(Job *job) {
return *reinterpret_cast<JobRef*>(job->SchedulerPrivate);
}
static void setNextJobInQueue(Job *job, JobRef next) {
*reinterpret_cast<JobRef*>(job->SchedulerPrivate) = next;
}
/// Schedule a processing job that doesn't have to be an override job.
///
/// We can either do this with inline storage or heap-allocated.
/// To ues inline storage, we need to verify that the hasActiveInlineJob
/// flag is not set in the state and then successfully set it. The
/// argument reports that this has happened correctly.
///
/// We should only schedule a non-override processing job at all if
/// we're transferring ownership of the jobs in it; see the ownership
/// comment on DefaultActorImpl.
void DefaultActorImpl::scheduleNonOverrideProcessJob(JobPriority priority,
bool hasActiveInlineJob) {
Job *job;
if (hasActiveInlineJob) {
job = new ProcessOutOfLineJob(this, priority);
} else {
job = new (&JobStorage) ProcessInlineJob(priority);
}
swift_task_enqueueGlobal(job);
}
namespace {
/// A job to process a specific default actor at a higher priority than
/// it was previously running at.
///
/// When an override job is successfully registered with an actor
/// (not enqueued there), the thread processing the actor and the
/// thread processing the override job coordinate by each calling
/// one of a set of methods on the object.
class ProcessOverrideJob : public Job {
DefaultActorImpl *Actor;
ConditionVariable::Mutex Lock;
ConditionVariable Queue;
/// Has the actor made a decision about this job yet?
bool IsResolvedByActor = false;
/// Has the job made a decision about itself yet?
bool IsResolvedByJob = false;
/// Has this job been abandoned?
bool IsAbandoned = false;
public:
/// SchedulerPrivate in an override job is used for actually scheduling
/// the job, so the actor queue goes through this instead.
///
/// We also use this temporarily for the list of override jobs on
/// the actor that we need to wake up.
JobRef NextJob;
public:
ProcessOverrideJob(DefaultActorImpl *actor, JobPriority priority,
JobRef nextJob)
: Job({JobKind::DefaultActorOverride, priority}, &process),
Actor(actor), NextJob(nextJob) {}
DefaultActorImpl *getActor() const { return Actor; }
/// Called by the job to notify the actor that the job has chosen
/// to abandon its work. This is irrevocable: the job is not going
/// to have a thread behind it.
///
/// This may delete the job or cause it to be deleted on another thread.
void setAbandoned() {
bool shouldDelete = false;
Lock.withLock([&] {
assert(!IsResolvedByJob && "job already resolved itself");
IsResolvedByJob = true;
IsAbandoned = true;
shouldDelete = IsResolvedByJob && IsResolvedByActor;
});
if (shouldDelete) delete this;
}
/// Called by the job to notify the actor that the job has successfully
/// taken over the actor and is now running it.
///
/// This may delete the job object or cause it to be deleted on
/// another thread.
void setRunning() {
bool shouldDelete = false;
Lock.withLock([&] {
assert(!IsResolvedByJob && "job already resolved itself");
IsResolvedByJob = true;
shouldDelete = IsResolvedByJob && IsResolvedByActor;
});
if (shouldDelete) delete this;
}
/// Called by the job to wait for the actor to resolve what the job
/// should do.
bool waitForActivation() {
bool isActivated = false;
Lock.withLockOrWait(Queue, [&] {
assert(!IsResolvedByJob && "job already resolved itself");
if (IsResolvedByActor) {
isActivated = !IsAbandoned;
IsResolvedByJob = true;
return true;
}
return false;
});
delete this;
return isActivated;
}
/// Called by the actor to notify this job that the actor thinks it
/// should try to take over the actor. It's okay if that doesn't
/// succeed (as long as that's because some other job is going to
/// take over).
///
/// This may delete the job or cause it to be deleted on another
/// thread.
bool wakeAndActivate() {
bool shouldDelete = false;
bool mayHaveBeenActivated = false;
Lock.withLockThenNotifyAll(Queue, [&] {
assert(!IsResolvedByActor && "actor already resolved this sjob");
IsResolvedByActor = true;
mayHaveBeenActivated = IsResolvedByJob && !IsAbandoned;
shouldDelete = IsResolvedByJob && IsResolvedByActor;
});
if (shouldDelete) delete this;
return mayHaveBeenActivated;
}
/// Called by the actor to notify this job that the actor does not
/// think it should try to take over the actor. It's okay if the
/// job successfully takes over the actor anyway.
///
/// This may delete the job or cause it to be deleted on another
/// thread.
void wakeAndAbandon() {
bool shouldDelete = false;
Lock.withLockThenNotifyAll(Queue, [&] {
assert(!IsResolvedByActor && "actor already resolved this job");
IsResolvedByActor = true;
IsAbandoned = true;
shouldDelete = IsResolvedByJob && IsResolvedByActor;
});
if (shouldDelete) delete this;
}
SWIFT_CC(swiftasync)
static void process(Job *job, ExecutorRef _executor);
static bool classof(const Job *job) {
return job->Flags.getKind() == JobKind::DefaultActorOverride;
}
};
} /// end anonymous namespace
JobRef JobRef::getOverride(ProcessOverrideJob *job) {
return JobRef(job, NeedsPreprocessing | IsOverride);
}
ProcessOverrideJob *JobRef::getAsPreprocessedOverride() const {
return cast_or_null<ProcessOverrideJob>(getAsPreprocessedJob());
}
RunningJobInfo RunningJobInfo::forOverride(ProcessOverrideJob *job) {
return {Override, job->getPriority(), job};
}
/// Flag that the current processing job has been abandoned
/// and will not be running the actor.
void RunningJobInfo::setAbandoned() {
if (OverrideJob) {
OverrideJob->setAbandoned();
OverrideJob = nullptr;
}
}
/// Flag that the current processing job is now running the actor.
void RunningJobInfo::setRunning() {
if (OverrideJob) {
OverrideJob->setRunning();
OverrideJob = nullptr;
}
}
/// Try to wait for the current processing job to be activated,
/// if that's possible. It's okay to call this multiple times
/// (or to call setAbandoned/setRunning after it) as long as
/// it's all on a single value.
bool RunningJobInfo::waitForActivation() {
if (Kind == Override) {
// If we don't have an override job, it's because we've already
// waited for activation successfully.
if (!OverrideJob) return true;
bool result = OverrideJob->waitForActivation();
OverrideJob = nullptr;
return result;
}
return false;
}
/// Wake all the overrides in the given list, activating the first
/// that exactly matches the target priority, if any.
static void wakeOverrides(ProcessOverrideJob *nextOverride,
Optional<JobPriority> targetPriority) {
bool hasAlreadyActivated = false;
while (nextOverride) {
// We have to advance to the next override before we call one of
// the wake methods because they can delete the job immediately
// (and even if they don't, we'd still be racing with deletion).
auto cur = nextOverride;
nextOverride = cur->NextJob.getAsPreprocessedOverride();
if (hasAlreadyActivated ||
!targetPriority ||
cur->getPriority() != *targetPriority)
cur->wakeAndAbandon();
else
hasAlreadyActivated = cur->wakeAndActivate();
}
}
/// Flag that an override job is needed and create it.
void DefaultActorImpl::OverrideJobCache::addToState(DefaultActorImpl *actor,
State &newState) {
IsNeeded = true;
auto newPriority = newState.Flags.getMaxPriority();
auto nextJob = newState.FirstJob;
if (Job) {
Job->Flags.setPriority(newPriority);
Job->NextJob = nextJob;
} else {
// Override jobs are always "extra" from the perspective of our
// ownership rules and so require a retain of the actor. We must
// do this before changing the actor state because other jobs may
// race to release the actor as soon as we change the actor state.
swift_retain(actor);
Job = new ProcessOverrideJob(actor, newPriority, nextJob);
}
newState.FirstJob = JobRef::getOverride(Job);
}
/// Schedule the override job if we created one and still need it.
/// If we created one but didn't end up needing it (which can happen
/// with a race to override), destroy it.
void DefaultActorImpl::OverrideJobCache::commit() {
#ifndef NDEBUG
assert(!WasCommitted && "committing override job multiple timee");
WasCommitted = true;
#endif
if (Job) {
if (IsNeeded) {
swift_task_enqueueGlobal(Job);
} else {
swift_release(Job->getActor());
delete Job;
}
}
}
/// Preprocess the prefix of the actor's queue that hasn't already
/// been preprocessed:
///
/// - Split the jobs into registered overrides and actual jobs.
/// - Append the actual jobs to any already-preprocessed job list.
///
/// The returned job should become the new root of the job queue
/// (or may be immediately dequeued, in which its successor should).
/// All of the jobs in this list are guaranteed to be non-override jobs.
static Job *preprocessQueue(JobRef first,
JobRef previousFirst,
Job *previousFirstNewJob,
ProcessOverrideJob *&overridesToWake) {
assert(previousFirst || previousFirstNewJob == nullptr);
if (!first.needsPreprocessing())
return first.getAsPreprocessedJob();
Job *firstNewJob = nullptr;
while (first != previousFirst) {
// If we find something that doesn't need preprocessing, it must've
// been left by a previous queue-processing, which means that
// this must be our first attempt to preprocess in this processing.
// Just treat the queue from this point as a well-formed whole
// to which we need to add any new items we might've just found.
if (!first.needsPreprocessing()) {
assert(!previousFirst && !previousFirstNewJob);
previousFirstNewJob = first.getAsPreprocessedJob();
break;
}
// If the job is an override, add it to the list of override jobs
// that we need to wake up. Note that the list of override jobs
// flows through NextJob; we must not use getNextJobInQueue because
// that touches queue-private state, and the override job is
// not enqueued on the actor, merely registered with it.
if (first.isOverride()) {
auto overrideJob = first.getAsOverride();
first = overrideJob->NextJob;
overrideJob->NextJob = JobRef::getPreprocessed(overridesToWake);
overridesToWake = overrideJob;
continue;
}
// If the job isn't an override, add it to the front of the list of
// jobs we're building up. Note that this reverses the order of
// jobs; since enqueue() always adds jobs to the front, reversing
// the order effectively makes the actor queue FIFO, which is what
// we want.
// FIXME: but we should also sort by priority
auto job = first.getAsJob();
first = getNextJobInQueue(job);
setNextJobInQueue(job, JobRef::getPreprocessed(firstNewJob));
firstNewJob = job;
}
// If there are jobs already in the queue, put the new jobs at the end.
if (!firstNewJob) {
firstNewJob = previousFirstNewJob;
} else if (previousFirstNewJob) {
auto cur = previousFirstNewJob;
while (true) {
auto next = getNextJobInQueue(cur).getAsPreprocessedJob();
if (!next) {
setNextJobInQueue(cur, JobRef::getPreprocessed(firstNewJob));
break;
}
cur = next;
}
firstNewJob = previousFirstNewJob;
}
return firstNewJob;
}
void DefaultActorImpl::giveUpThread(RunningJobInfo runner) {
auto oldState = CurrentState.load(std::memory_order_acquire);
assert(oldState.Flags.getStatus() == Status::Running);
ProcessOverrideJob *overridesToWake = nullptr;
auto firstNewJob = preprocessQueue(oldState.FirstJob, JobRef(), nullptr,
overridesToWake);
while (true) {
State newState = oldState;
newState.FirstJob = JobRef::getPreprocessed(firstNewJob);
if (firstNewJob) {
newState.Flags.setStatus(Status::Scheduled);
} else {
newState.Flags.setStatus(Status::Idle);
}
// If the runner was an inline job, it's no longer active.
if (runner.wasInlineJob()) {
newState.Flags.setHasActiveInlineJob(false);
}
bool hasMoreJobs = (bool) newState.FirstJob;
bool hasOverrideAtNewPriority =
(runner.Priority < oldState.Flags.getMaxPriority());
bool hasActiveInlineJob = newState.Flags.hasActiveInlineJob();
bool needsNewProcessJob = hasMoreJobs && !hasOverrideAtNewPriority;
// If we want to create a new inline job below, be sure to claim that
// in the new state.
if (needsNewProcessJob && !hasActiveInlineJob) {
newState.Flags.setHasActiveInlineJob(true);
}
auto firstPreprocessed = oldState.FirstJob;
if (!CurrentState.compare_exchange_weak(oldState, newState,
/*success*/ std::memory_order_release,
/*failure*/ std::memory_order_acquire)) {
// Preprocess any new queue items.
firstNewJob = preprocessQueue(oldState.FirstJob,
firstPreprocessed,
firstNewJob,
overridesToWake);
// Try again.
continue;
}
// The priority of the remaining work.
auto newPriority = newState.Flags.getMaxPriority();
// Wake any overrides.
wakeOverrides(overridesToWake, newPriority);
// This is the actor's owning job; per the ownership rules (see
// the comment on DefaultActorImpl), if there are remaining
// jobs, we need to balance out our ownership one way or another.
// We also, of course, need to ensure that there's a thread that's
// actually going to process the actor.
if (hasMoreJobs) {
// If we know that there's an override job at the new priority,
// we can let it become the owning job. We just need to release.
if (hasOverrideAtNewPriority) {
swift_release(this);
// Otherwies, enqueue a job that will try to take over running
// with the new priority. This also ensures that there's a job
// at that priority which will actually take over the actor.
} else {
scheduleNonOverrideProcessJob(newPriority, hasActiveInlineJob);
}
}
return;
}
}
/// Claim the next job on the actor or give it up forever.
///
/// The running thread doesn't need to already own the actor to do this.
/// It does need to be participating correctly in the ownership
/// scheme as a "processing job"; see the comment on DefaultActorImpl.
Job *DefaultActorImpl::claimNextJobOrGiveUp(bool actorIsOwned,
RunningJobInfo runner) {
auto oldState = CurrentState.load(std::memory_order_acquire);
// The status had better be Running unless we're trying to acquire
// our first job.
assert(oldState.Flags.getStatus() == Status::Running ||
!actorIsOwned);
// If we don't yet own the actor, we need to try to claim the actor
// first; we cannot safely access the queue memory yet because other
// threads may concurrently be trying to do this.
if (!actorIsOwned) {
while (true) {
// A helper function when the only change we need to try is to
// update for an inline runner.
auto tryUpdateForInlineRunner = [&]{
if (!runner.wasInlineJob()) return true;
auto newState = oldState;
newState.Flags.setHasActiveInlineJob(false);
return CurrentState.compare_exchange_weak(oldState, newState,
/*success*/ std::memory_order_relaxed,
/*failure*/ std::memory_order_acquire);
};
// If the actor is out of work, or its priority doesn't match our
// priority, don't try to take over the actor.
if (!oldState.FirstJob ||
oldState.Flags.getMaxPriority() != runner.Priority) {
// The only change we need here is inline-runner bookkeeping.
if (!tryUpdateForInlineRunner())
continue;
// We're eliminating a processing thread; balance ownership.
swift_release(this);
runner.setAbandoned();
return nullptr;
}
// If the actor is currently running, we'd need to wait for
// it to stop. We can do this if we're an override job;
// otherwise we need to exit.
if (oldState.Flags.getStatus() == Status::Running) {
if (!runner.waitForActivation()) {
// The only change we need here is inline-runner bookkeeping.
if (!tryUpdateForInlineRunner())
continue;
swift_release(this);
return nullptr;
}
// Fall through into the compare-exchange below, but anticipate
// that the actor is now Scheduled instead of Running.
oldState.Flags.setStatus(Status::Scheduled);
}
// Try to set the state as Running.
assert(oldState.Flags.getStatus() == Status::Scheduled);
auto newState = oldState;
newState.Flags.setStatus(Status::Running);
// Also do our inline-runner bookkeeping.
if (runner.wasInlineJob())
newState.Flags.setHasActiveInlineJob(false);
if (!CurrentState.compare_exchange_weak(oldState, newState,
/*success*/ std::memory_order_relaxed,
/*failure*/ std::memory_order_acquire))
continue;
// If that succeeded, we can proceed to the main body.
oldState = newState;
runner.setRunning();
break;
}
}
assert(oldState.Flags.getStatus() == Status::Running);
// We should have taken care of the inline-job bookkeeping now.
assert(!oldState.Flags.hasActiveInlineJob() || !runner.wasInlineJob());
// Okay, now it's safe to look at queue state.
// Preprocess any queue items at the front of the queue.
ProcessOverrideJob *overridesToWake = nullptr;
auto newFirstJob = preprocessQueue(oldState.FirstJob, JobRef(),
nullptr, overridesToWake);
Optional<JobPriority> remainingJobPriority;
while (true) {
State newState = oldState;
// If the priority we're currently running with is adqeuate for
// all the remaining jobs, try to dequeue something.
// FIXME: should this be an exact match in priority instead of
// potentially running jobs with too high a priority?
Job *jobToRun;
if (oldState.Flags.getMaxPriority() <= runner.Priority &&
newFirstJob) {
jobToRun = newFirstJob;
newState.FirstJob = getNextJobInQueue(newFirstJob);
newState.Flags.setStatus(Status::Running);
// Otherwise, we should give up the thread.
} else {
jobToRun = nullptr;
newState.FirstJob = JobRef::getPreprocessed(newFirstJob);
newState.Flags.setStatus(newFirstJob ? Status::Scheduled
: Status::Idle);
}
// Try to update the queue. The changes we've made to the queue
// structure need to be made visible even if we aren't dequeuing
// anything.
auto firstPreprocessed = oldState.FirstJob;
if (!CurrentState.compare_exchange_weak(oldState, newState,
/*success*/ std::memory_order_release,
/*failure*/ std::memory_order_acquire)) {
// Preprocess any new queue items, which will have been formed
// into a linked list leading to the last head we observed.
// (The fact that that job may not be the head anymore doesn't
// matter; we're looking for an exact match with that.)
newFirstJob = preprocessQueue(oldState.FirstJob,
firstPreprocessed,
newFirstJob,
overridesToWake);
// Loop to retry updating the state.
continue;
}
// We successfully updated the state.
// If we're giving up the thread with jobs remaining, we need
// to release the actor, and we should wake overrides with the
// right priority.
Optional<JobPriority> remainingJobPriority;
if (!jobToRun && newFirstJob) {
remainingJobPriority = newState.Flags.getMaxPriority();
}
// Wake the overrides.
wakeOverrides(overridesToWake, remainingJobPriority);
// Per the ownership rules (see the comment on DefaultActorImpl),
// release the actor if we're giving up the thread with jobs
// remaining. We intentionally do this after wakeOverrides to
// try to get the overrides running a little faster.
if (remainingJobPriority)
swift_release(this);
return jobToRun;
}
}
/// The primary function for processing an actor on a thread. Start
/// processing the given default actor as the active default actor on
/// the current thread, and keep processing whatever actor we're
/// running when code returns back to us until we're not processing
/// any actors anymore.
static void processDefaultActor(DefaultActorImpl *currentActor,
RunningJobInfo runner) {
// Register that we're processing a default actor in this frame.
DefaultActorProcessingFrame frame(currentActor, ShadowExistingFrame);
bool threadIsRunningActor = false;
while (true) {
assert(currentActor);
// Immediately check if we've been asked to yield the thread.
if (shouldYieldThread())
break;
// Claim another job from the current actor.
auto job = currentActor->claimNextJobOrGiveUp(threadIsRunningActor,
runner);
// If we failed to claim a job, we have nothing to do.
if (!job) {
// We also gave up the actor as part of failing to claim it.
// Make sure we don't try to give up the actor again.
currentActor = nullptr;
break;
}
// Run the job.
job->run(ExecutorRef::forDefaultActor(asAbstract(currentActor)));
// The current actor may have changed after the job.
// If it's become nil, we have nothing to do.
currentActor = frame.getActiveActor();
if (!currentActor)
break;
// Otherwise, we know that we're running the actor on this thread.
threadIsRunningActor = true;
}
frame.exit();
// If we still have an active actor, we should give it up.
if (currentActor)
currentActor->giveUpThread(runner);
}
void ProcessInlineJob::process(Job *job, ExecutorRef _executor) {
DefaultActorImpl *actor = DefaultActorImpl::fromInlineJob(job);
// Pull the priority out of the job before we do anything that might
// invalidate it.
auto targetPriority = job->getPriority();
auto runner = RunningJobInfo::forInline(targetPriority);
// FIXME: force tail call
return processDefaultActor(actor, runner);
}
void ProcessOutOfLineJob::process(Job *job, ExecutorRef _executor) {
auto self = cast<ProcessOutOfLineJob>(job);
DefaultActorImpl *actor = self->Actor;
// Pull the priority out of the job before we do anything that might
// invalidate it.
auto targetPriority = job->getPriority();
auto runner = RunningJobInfo::forOther(targetPriority);
delete self;
// FIXME: force tail call
return processDefaultActor(actor, runner);
}
void ProcessOverrideJob::process(Job *job, ExecutorRef _executor) {
auto self = cast<ProcessOverrideJob>(job);
// Pull the actor and priority out of the job.
auto actor = self->Actor;
auto runner = RunningJobInfo::forOverride(self);
// FIXME: force tail call
return processDefaultActor(actor, runner);
}
void DefaultActorImpl::enqueue(Job *job) {
auto oldState = CurrentState.load(std::memory_order_relaxed);
OverrideJobCache overrideJob;
while (true) {
auto newState = oldState;
// Put the job at the front of the job list (which will get
// reversed during preprocessing).
setNextJobInQueue(job, oldState.FirstJob);
newState.FirstJob = JobRef::getUnpreprocessed(job);
auto oldStatus = oldState.Flags.getStatus();
bool wasIdle = oldStatus == Status::Idle;
// Update the priority: the prriority of the job we're adding
// if the actor was idle, or the max if not. Only the running
// thread can decrease the actor's priority once it's non-idle.
// (But note that the job we enqueue can still observe a
// lowered priority.)
auto oldPriority = oldState.Flags.getMaxPriority();
auto newPriority =
wasIdle ? job->getPriority()
: std::max(oldPriority, job->getPriority());
newState.Flags.setMaxPriority(newPriority);
// If we need an override job, create it (if necessary) and
// register it with the queue.
bool needsOverride = !wasIdle && newPriority != oldPriority;
if (needsOverride) {
overrideJob.addToState(this, newState);
} else {
overrideJob.setNotNeeded();
}
// If we don't need an override job, then we might be able to
// create an inline job; flag that.
bool hasActiveInlineJob = newState.Flags.hasActiveInlineJob();
if (wasIdle && !hasActiveInlineJob)
newState.Flags.setHasActiveInlineJob(true);
// Make sure the status is at least Scheduled. We'll actually
// schedule the job below, if we succeed at this.
if (wasIdle) {
newState.Flags.setStatus(Status::Scheduled);
}
// Try the compare-exchange, and try again if it fails.
if (!CurrentState.compare_exchange_weak(oldState, newState,
/*success*/ std::memory_order_release,
/*failure*/ std::memory_order_relaxed))
continue;
// Okay, we successfully updated the status. Schedule a job to
// process the actor if necessary.
// Commit the override job if we created one.
overrideJob.commit();
// If the actor is currently idle, schedule it using the
// invasive job.
if (wasIdle) {
assert(!needsOverride);
scheduleNonOverrideProcessJob(newPriority, hasActiveInlineJob);
}
return;
}
}
bool DefaultActorImpl::tryAssumeThread(RunningJobInfo runner) {
// We have to load-acquire in order to properly order accesses to
// the actor's state for the new task.
auto oldState = CurrentState.load(std::memory_order_acquire);
// If the actor is currently idle, try to mark it as running.
while (oldState.Flags.getStatus() == Status::Idle) {
assert(!oldState.FirstJob);
auto newState = oldState;
newState.Flags.setStatus(Status::Running);
newState.Flags.setMaxPriority(runner.Priority);
if (CurrentState.compare_exchange_weak(oldState, newState,
/*success*/ std::memory_order_relaxed,
/*failure*/ std::memory_order_acquire))
return true;
}
return false;
}
void swift::swift_defaultActor_initialize(DefaultActor *_actor) {
asImpl(_actor)->initialize();
}
void swift::swift_defaultActor_destroy(DefaultActor *_actor) {
asImpl(_actor)->destroy();
}
void swift::swift_defaultActor_enqueue(Job *job, DefaultActor *_actor) {
asImpl(_actor)->enqueue(job);
}
/*****************************************************************************/
/****************************** ACTOR SWITCHING ******************************/
/*****************************************************************************/
/// Can the current executor give up its thread?
static bool canGiveUpThreadForSwitch(ExecutorRef currentExecutor) {
// We can certainly "give up" a generic executor to try to run
// a task for an actor.
if (currentExecutor.isGeneric())
return true;
// If the current executor is a default actor, we know how to make
// it give up its thread.
if (currentExecutor.isDefaultActor())
return true;
return false;
}
/// Tell the current executor to give up its thread, given that it
/// returned true from canGiveUpThreadForSwitch().
///
/// Note that we don't update DefaultActorProcessingFrame here; we'll
/// do that in runOnAssumedThread.
static void giveUpThreadForSwitch(ExecutorRef currentExecutor,
RunningJobInfo runner) {
if (currentExecutor.isGeneric())
return;
asImpl(currentExecutor.getDefaultActor())->giveUpThread(runner);
}
/// Try to assume control of the current thread for the given executor
/// in order to run the given job.
///
/// This doesn't actually run the job yet.
///
/// Note that we don't update DefaultActorProcessingFrame here; we'll
/// do that in runOnAssumedThread.
static bool tryAssumeThreadForSwitch(ExecutorRef newExecutor,
RunningJobInfo runner) {
// If the new executor is generic, we don't need to do anything.
if (newExecutor.isGeneric()) {
return true;
}
// If the new executor is a default actor, ask it to assume the thread.
if (newExecutor.isDefaultActor()) {
return asImpl(newExecutor.getDefaultActor())->tryAssumeThread(runner);
}
return false;
}
/// Given that we've assumed control of an executor on this thread,
/// run the given task on it.
SWIFT_CC(swiftasync)
static void runOnAssumedThread(AsyncTask *task, ExecutorRef newExecutor,
RunningJobInfo runner) {
assert(newExecutor.isGeneric() || newExecutor.isDefaultActor());
DefaultActorImpl *actor = newExecutor.isGeneric()
? nullptr
: asImpl(newExecutor.getDefaultActor());
// Set that this actor is now the active default actor on this thread,
// and set up an actor-processing frame if there wasn't one already.
DefaultActorProcessingFrame frame(actor, UpdateExistingFrame);
// If one already existed, we should just tail-call the task; we don't
// want these frames to potentially accumulate linearly.
if (!frame.isNeeded()) {
// FIXME: force tail call
return task->run(newExecutor);
}
// Otherwise, run the new task.
task->run(newExecutor);
// Leave the processing frame, and give up the current actor if
// we have one.
//
// In principle, we could execute more tasks here, but that's probably
// not a reasonable thing to do in an assumed context rather than a
// dedicated actor-processing job.
actor = frame.getActiveActor();
frame.exit();
if (actor)
actor->giveUpThread(runner);
}
void swift::swift_task_switch(AsyncTask *task, ExecutorRef currentExecutor,
ExecutorRef newExecutor) {
assert(task && "no task provided");
// If the current executor is compatible with running the new executor,
// just continue running.
if (!currentExecutor.mustSwitchToRun(newExecutor)) {
// FIXME: force tail call
return task->run(currentExecutor);
}
// Okay, we semantically need to switch.
auto runner = RunningJobInfo::forOther(task->getPriority());
// If the current executor can give up its thread, and the new executor
// can take over a thread, try to do so; but don't do this if we've
// been asked to yield the thread.
if (canGiveUpThreadForSwitch(currentExecutor) &&
!shouldYieldThread() &&
tryAssumeThreadForSwitch(newExecutor, runner)) {
giveUpThreadForSwitch(currentExecutor, runner);
// FIXME: force tail call
return runOnAssumedThread(task, newExecutor, runner);
}
// Otherwise, just asynchronously enqueue the task on the given
// executor.
swift_task_enqueue(task, newExecutor);
}
/*****************************************************************************/
/************************* GENERIC ACTOR INTERFACES **************************/
/*****************************************************************************/
void swift::swift_task_enqueue(Job *job, ExecutorRef executor) {
assert(job && "no job provided");
if (executor.isGeneric())
return swift_task_enqueueGlobal(job);
if (executor.isDefaultActor())
return asImpl(executor.getDefaultActor())->enqueue(job);
// Just assume it's actually a default actor that we haven't tagged
// properly.
// FIXME: call the general method.
return asImpl(reinterpret_cast<DefaultActor*>(executor.getRawValue()))
->enqueue(job);
}