| // Copyright 2023 The Pigweed Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| // use this file except in compliance with the License. You may obtain a copy of |
| // the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| // License for the specific language governing permissions and limitations under |
| // the License. |
| |
| #include "pw_async2/dispatcher_base.h" |
| |
| #include <mutex> |
| |
| #include "pw_assert/check.h" |
| #include "pw_sync/lock_annotations.h" |
| |
| namespace pw::async2 { |
| |
| void Context::ReEnqueue() { |
| Waker waker; |
| waker_->InternalCloneInto(waker); |
| std::move(waker).Wake(); |
| } |
| |
| void Context::InternalStoreWaker(Waker& waker_out) { |
| waker_->InternalCloneInto(waker_out); |
| } |
| |
| void Task::RemoveAllWakersLocked() { |
| while (wakers_ != nullptr) { |
| Waker* current = wakers_; |
| wakers_ = current->next_; |
| current->task_ = nullptr; |
| current->next_ = nullptr; |
| } |
| } |
| |
| void Task::AddWakerLocked(Waker& waker) { |
| waker.task_ = this; |
| waker.next_ = wakers_; |
| wakers_ = &waker; |
| } |
| |
| void Task::RemoveWakerLocked(Waker& waker) { |
| if (&waker == wakers_) { |
| wakers_ = wakers_->next_; |
| } else { |
| Waker* current = wakers_; |
| while (current->next_ != &waker) { |
| current = current->next_; |
| } |
| current->next_ = current->next_->next_; |
| } |
| waker.task_ = nullptr; |
| waker.next_ = nullptr; |
| } |
| |
| bool Task::IsRegistered() const { |
| std::lock_guard lock(dispatcher_lock()); |
| return state_ != Task::State::kUnposted; |
| } |
| |
| void Task::Deregister() { |
| pw::sync::Mutex* task_execution_lock; |
| { |
| // Fast path: the task is not running. |
| std::lock_guard lock(dispatcher_lock()); |
| if (TryDeregister()) { |
| return; |
| } |
| // The task was running, so we have to wait for the task to stop being |
| // run by acquiring the `task_lock`. |
| task_execution_lock = &dispatcher_->task_execution_lock_; |
| } |
| |
| // NOTE: there is a race here where `task_execution_lock_` may be |
| // invalidated by concurrent destruction of the dispatcher. |
| // |
| // This restriction is documented above, but is still fairly footgun-y. |
| std::lock_guard task_lock(*task_execution_lock); |
| std::lock_guard lock(dispatcher_lock()); |
| PW_CHECK(TryDeregister()); |
| } |
| |
| bool Task::TryDeregister() { |
| switch (state_) { |
| case Task::State::kUnposted: |
| return true; |
| case Task::State::kSleeping: |
| dispatcher_->RemoveSleepingTaskLocked(*this); |
| break; |
| case Task::State::kRunning: |
| return false; |
| case Task::State::kWoken: |
| dispatcher_->RemoveWokenTaskLocked(*this); |
| break; |
| } |
| state_ = Task::State::kUnposted; |
| RemoveAllWakersLocked(); |
| |
| // Wake the dispatcher up if this was the last task so that it can see that |
| // all tasks have completed. |
| if (dispatcher_->first_woken_ == nullptr && |
| dispatcher_->sleeping_ == nullptr && dispatcher_->wants_wake_) { |
| dispatcher_->DoWake(); |
| } |
| dispatcher_ = nullptr; |
| return true; |
| } |
| |
| Waker::Waker(Waker&& other) noexcept { |
| std::lock_guard lock(dispatcher_lock()); |
| if (other.task_ == nullptr) { |
| return; |
| } |
| Task& task = *other.task_; |
| task.RemoveWakerLocked(other); |
| task.AddWakerLocked(*this); |
| } |
| |
| Waker& Waker::operator=(Waker&& other) noexcept { |
| std::lock_guard lock(dispatcher_lock()); |
| RemoveFromTaskWakerListLocked(); |
| if (other.task_ == nullptr) { |
| return *this; |
| } |
| Task& task = *other.task_; |
| task.RemoveWakerLocked(other); |
| task.AddWakerLocked(*this); |
| return *this; |
| } |
| |
| void Waker::Wake() && { |
| std::lock_guard lock(dispatcher_lock()); |
| if (task_ != nullptr) { |
| task_->dispatcher_->WakeTask(*task_); |
| RemoveFromTaskWakerListLocked(); |
| } |
| } |
| |
| void Waker::InternalCloneInto(Waker& out) & { |
| std::lock_guard lock(dispatcher_lock()); |
| // The `out` waker already points to this task, so no work is necessary. |
| if (out.task_ == task_) { |
| return; |
| } |
| // Remove the output waker from its existing task's list. |
| out.RemoveFromTaskWakerListLocked(); |
| out.task_ = task_; |
| // Only add if the waker being cloned is actually associated with a task. |
| if (task_ != nullptr) { |
| task_->AddWakerLocked(out); |
| } |
| } |
| |
| bool Waker::IsEmpty() const { |
| std::lock_guard lock(dispatcher_lock()); |
| return task_ == nullptr; |
| } |
| |
| void Waker::InsertIntoTaskWakerList() { |
| std::lock_guard lock(dispatcher_lock()); |
| InsertIntoTaskWakerListLocked(); |
| } |
| |
| void Waker::InsertIntoTaskWakerListLocked() { |
| if (task_ != nullptr) { |
| task_->AddWakerLocked(*this); |
| } |
| } |
| |
| void Waker::RemoveFromTaskWakerList() { |
| std::lock_guard lock(dispatcher_lock()); |
| RemoveFromTaskWakerListLocked(); |
| } |
| |
| void Waker::RemoveFromTaskWakerListLocked() { |
| if (task_ != nullptr) { |
| task_->RemoveWakerLocked(*this); |
| } |
| } |
| |
| void NativeDispatcherBase::Deregister() { |
| std::lock_guard lock(dispatcher_lock()); |
| UnpostTaskList(first_woken_); |
| first_woken_ = nullptr; |
| last_woken_ = nullptr; |
| UnpostTaskList(sleeping_); |
| sleeping_ = nullptr; |
| } |
| |
| void NativeDispatcherBase::Post(Task& task) { |
| bool wake_dispatcher = false; |
| { |
| std::lock_guard lock(dispatcher_lock()); |
| PW_DASSERT(task.state_ == Task::State::kUnposted); |
| PW_DASSERT(task.dispatcher_ == nullptr); |
| task.state_ = Task::State::kWoken; |
| task.dispatcher_ = this; |
| AddTaskToWokenList(task); |
| if (wants_wake_) { |
| wake_dispatcher = true; |
| wants_wake_ = false; |
| } |
| } |
| // Note: unlike in ``WakeTask``, here we know that the ``Dispatcher`` will |
| // not be destroyed out from under our feet because we're in a method being |
| // called on the ``Dispatcher`` by a user. |
| if (wake_dispatcher) { |
| DoWake(); |
| } |
| } |
| |
| NativeDispatcherBase::SleepInfo NativeDispatcherBase::AttemptRequestWake( |
| bool allow_empty) { |
| std::lock_guard lock(dispatcher_lock()); |
| // Don't allow sleeping if there are already tasks waiting to be run. |
| if (first_woken_ != nullptr) { |
| return SleepInfo::DontSleep(); |
| } |
| if (!allow_empty && sleeping_ == nullptr) { |
| return SleepInfo::DontSleep(); |
| } |
| /// Indicate that the ``Dispatcher`` is sleeping and will need a ``DoWake`` |
| /// call once more work can be done. |
| wants_wake_ = true; |
| // Once timers are added, this should check them. |
| return SleepInfo::Indefinitely(); |
| } |
| |
| NativeDispatcherBase::RunOneTaskResult NativeDispatcherBase::RunOneTask( |
| Dispatcher& dispatcher, Task* task_to_look_for) { |
| std::lock_guard task_lock(task_execution_lock_); |
| Task* task; |
| { |
| std::lock_guard lock(dispatcher_lock()); |
| task = PopWokenTask(); |
| if (task == nullptr) { |
| bool all_complete = first_woken_ == nullptr && sleeping_ == nullptr; |
| return RunOneTaskResult( |
| /*completed_all_tasks=*/all_complete, |
| /*completed_main_task=*/false, |
| /*ran_a_task=*/false); |
| } |
| task->state_ = Task::State::kRunning; |
| } |
| |
| bool complete; |
| { |
| Waker waker(*task); |
| Context context(dispatcher, waker); |
| complete = task->Pend(context).IsReady(); |
| } |
| if (complete) { |
| bool all_complete; |
| { |
| std::lock_guard lock(dispatcher_lock()); |
| switch (task->state_) { |
| case Task::State::kUnposted: |
| case Task::State::kSleeping: |
| PW_DASSERT(false); |
| PW_UNREACHABLE; |
| case Task::State::kRunning: |
| break; |
| case Task::State::kWoken: |
| RemoveWokenTaskLocked(*task); |
| break; |
| } |
| task->state_ = Task::State::kUnposted; |
| task->dispatcher_ = nullptr; |
| task->RemoveAllWakersLocked(); |
| all_complete = first_woken_ == nullptr && sleeping_ == nullptr; |
| } |
| task->DoDestroy(); |
| return RunOneTaskResult( |
| /*completed_all_tasks=*/all_complete, |
| /*completed_main_task=*/task == task_to_look_for, |
| /*ran_a_task=*/true); |
| } else { |
| std::lock_guard lock(dispatcher_lock()); |
| if (task->state_ == Task::State::kRunning) { |
| task->state_ = Task::State::kSleeping; |
| AddTaskToSleepingList(*task); |
| } |
| return RunOneTaskResult( |
| /*completed_all_tasks=*/false, |
| /*completed_main_task=*/false, |
| /*ran_a_task=*/true); |
| } |
| } |
| |
| void NativeDispatcherBase::UnpostTaskList(Task* task) { |
| while (task != nullptr) { |
| task->state_ = Task::State::kUnposted; |
| task->dispatcher_ = nullptr; |
| task->prev_ = nullptr; |
| Task* next = task->next_; |
| task->next_ = nullptr; |
| task->RemoveAllWakersLocked(); |
| task = next; |
| } |
| } |
| |
| void NativeDispatcherBase::RemoveTaskFromList(Task& task) { |
| if (task.prev_ != nullptr) { |
| task.prev_->next_ = task.next_; |
| } |
| if (task.next_ != nullptr) { |
| task.next_->prev_ = task.prev_; |
| } |
| task.prev_ = nullptr; |
| task.next_ = nullptr; |
| } |
| |
| void NativeDispatcherBase::RemoveWokenTaskLocked(Task& task) { |
| if (first_woken_ == &task) { |
| first_woken_ = task.next_; |
| } |
| if (last_woken_ == &task) { |
| last_woken_ = task.prev_; |
| } |
| RemoveTaskFromList(task); |
| } |
| |
| void NativeDispatcherBase::RemoveSleepingTaskLocked(Task& task) { |
| if (sleeping_ == &task) { |
| sleeping_ = task.next_; |
| } |
| RemoveTaskFromList(task); |
| } |
| |
| void NativeDispatcherBase::AddTaskToWokenList(Task& task) { |
| if (first_woken_ == nullptr) { |
| first_woken_ = &task; |
| } else { |
| last_woken_->next_ = &task; |
| task.prev_ = last_woken_; |
| } |
| last_woken_ = &task; |
| } |
| |
| void NativeDispatcherBase::AddTaskToSleepingList(Task& task) { |
| if (sleeping_ != nullptr) { |
| sleeping_->prev_ = &task; |
| } |
| task.next_ = sleeping_; |
| sleeping_ = &task; |
| } |
| |
| void NativeDispatcherBase::WakeTask(Task& task) { |
| switch (task.state_) { |
| case Task::State::kWoken: |
| // Do nothing-- this has already been woken. |
| return; |
| case Task::State::kUnposted: |
| // This should be unreachable. |
| PW_CHECK(false); |
| case Task::State::kRunning: |
| // Wake again to indicate that this task should be run once more, |
| // as the state of the world may have changed since the task |
| // started running. |
| break; |
| case Task::State::kSleeping: |
| RemoveSleepingTaskLocked(task); |
| // Wake away! |
| break; |
| } |
| task.state_ = Task::State::kWoken; |
| AddTaskToWokenList(task); |
| if (wants_wake_) { |
| // Note: it's quite annoying to make this call under the lock, as it can |
| // result in extra thread wakeup/sleep cycles. |
| // |
| // However, releasing the lock first would allow for the possibility that |
| // the ``Dispatcher`` has been destroyed, making the call invalid. |
| DoWake(); |
| } |
| } |
| |
| Task* NativeDispatcherBase::PopWokenTask() { |
| if (first_woken_ == nullptr) { |
| return nullptr; |
| } |
| Task& task = *first_woken_; |
| if (task.next_ != nullptr) { |
| task.next_->prev_ = nullptr; |
| } else { |
| last_woken_ = nullptr; |
| } |
| first_woken_ = task.next_; |
| task.prev_ = nullptr; |
| task.next_ = nullptr; |
| return &task; |
| } |
| |
| } // namespace pw::async2 |