| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <lib/fit/thread_safety.h> |
| #include <lib/fpromise/single_threaded_executor.h> |
| |
| #include <condition_variable> |
| #include <mutex> |
| |
| namespace fpromise { |
| |
| // The dispatcher runs tasks and provides the suspended task resolver. |
| // |
| // The lifetime of this object is somewhat complex since there are pointers |
| // to it from multiple sources which are released in different ways. |
| // |
| // - |single_threaded_executor| holds a pointer in |dispatcher_| which it releases |
| // after calling |shutdown()| to inform the dispatcher of its own demise |
| // - |suspended_task| holds a pointer to the dispatcher's resolver |
| // interface and the number of outstanding pointers corresponds to the |
| // number of outstanding suspended task tickets tracked by |scheduler_|. |
| // |
| // The dispatcher deletes itself once all pointers have been released. |
| class single_threaded_executor::dispatcher_impl final : public suspended_task::resolver { |
| public: |
| dispatcher_impl(); |
| |
| void shutdown(); |
| void schedule_task(pending_task task); |
| void run(context_impl& context); |
| suspended_task suspend_current_task(); |
| |
| suspended_task::ticket duplicate_ticket(suspended_task::ticket ticket) override; |
| void resolve_ticket(suspended_task::ticket ticket, bool resume_task) override; |
| |
| private: |
| ~dispatcher_impl() override; |
| |
| void wait_for_runnable_tasks(fpromise::subtle::scheduler::task_queue* out_tasks); |
| void run_task(pending_task* task, context& context); |
| |
| suspended_task::ticket current_task_ticket_ = 0; |
| std::condition_variable wake_; |
| |
| // A bunch of state that is guarded by a mutex. |
| struct { |
| std::mutex mutex_; |
| bool was_shutdown_ FIT_GUARDED(mutex_) = false; |
| bool need_wake_ FIT_GUARDED(mutex_) = false; |
| fpromise::subtle::scheduler scheduler_ FIT_GUARDED(mutex_); |
| } guarded_; |
| }; |
| |
| single_threaded_executor::single_threaded_executor() |
| : context_(this), dispatcher_(new dispatcher_impl()) {} |
| |
| single_threaded_executor::~single_threaded_executor() { dispatcher_->shutdown(); } |
| |
| void single_threaded_executor::schedule_task(pending_task task) { |
| assert(task); |
| dispatcher_->schedule_task(std::move(task)); |
| } |
| |
| void single_threaded_executor::run() { dispatcher_->run(context_); } |
| |
| single_threaded_executor::context_impl::context_impl(single_threaded_executor* executor) |
| : executor_(executor) {} |
| |
| single_threaded_executor::context_impl::~context_impl() = default; |
| |
| single_threaded_executor* single_threaded_executor::context_impl::executor() const { |
| return executor_; |
| } |
| |
| suspended_task single_threaded_executor::context_impl::suspend_task() { |
| return executor_->dispatcher_->suspend_current_task(); |
| } |
| |
| single_threaded_executor::dispatcher_impl::dispatcher_impl() = default; |
| |
| single_threaded_executor::dispatcher_impl::~dispatcher_impl() { |
| std::lock_guard<std::mutex> lock(guarded_.mutex_); |
| assert(guarded_.was_shutdown_); |
| assert(!guarded_.scheduler_.has_runnable_tasks()); |
| assert(!guarded_.scheduler_.has_suspended_tasks()); |
| assert(!guarded_.scheduler_.has_outstanding_tickets()); |
| } |
| |
| void single_threaded_executor::dispatcher_impl::shutdown() { |
| fpromise::subtle::scheduler::task_queue tasks; // drop outside of the lock |
| { |
| std::lock_guard<std::mutex> lock(guarded_.mutex_); |
| assert(!guarded_.was_shutdown_); |
| guarded_.was_shutdown_ = true; |
| guarded_.scheduler_.take_all_tasks(&tasks); |
| if (guarded_.scheduler_.has_outstanding_tickets()) { |
| return; // can't delete self yet |
| } |
| } |
| |
| // Must destroy self outside of the lock. |
| delete this; |
| } |
| |
| void single_threaded_executor::dispatcher_impl::schedule_task(pending_task task) { |
| { |
| std::lock_guard<std::mutex> lock(guarded_.mutex_); |
| assert(!guarded_.was_shutdown_); |
| guarded_.scheduler_.schedule_task(std::move(task)); |
| if (!guarded_.need_wake_) { |
| return; // don't need to wake |
| } |
| guarded_.need_wake_ = false; |
| } |
| |
| // It is more efficient to notify outside the lock. |
| wake_.notify_one(); |
| } |
| |
| void single_threaded_executor::dispatcher_impl::run(context_impl& context) { |
| fpromise::subtle::scheduler::task_queue tasks; |
| for (;;) { |
| wait_for_runnable_tasks(&tasks); |
| if (tasks.empty()) { |
| return; // all done! |
| } |
| |
| do { |
| run_task(&tasks.front(), context); |
| tasks.pop(); // the task may be destroyed here if it was not suspended |
| } while (!tasks.empty()); |
| } |
| } |
| |
| // Must only be called while |run_task()| is running a task. |
| // This happens when the task's continuation calls |context::suspend_task()| |
| // upon the context it received as an argument. |
| suspended_task single_threaded_executor::dispatcher_impl::suspend_current_task() { |
| std::lock_guard<std::mutex> lock(guarded_.mutex_); |
| assert(!guarded_.was_shutdown_); |
| if (current_task_ticket_ == 0) { |
| current_task_ticket_ = guarded_.scheduler_.obtain_ticket(2 /*initial_refs*/); |
| } else { |
| guarded_.scheduler_.duplicate_ticket(current_task_ticket_); |
| } |
| return suspended_task(this, current_task_ticket_); |
| } |
| |
| // Unfortunately std::unique_lock does not support thread-safety annotations |
| void single_threaded_executor::dispatcher_impl::wait_for_runnable_tasks( |
| fpromise::subtle::scheduler::task_queue* out_tasks) FIT_NO_THREAD_SAFETY_ANALYSIS { |
| std::unique_lock<std::mutex> lock(guarded_.mutex_); |
| for (;;) { |
| assert(!guarded_.was_shutdown_); |
| guarded_.scheduler_.take_runnable_tasks(out_tasks); |
| if (!out_tasks->empty()) { |
| return; // got some tasks |
| } |
| if (!guarded_.scheduler_.has_suspended_tasks()) { |
| return; // all done! |
| } |
| guarded_.need_wake_ = true; |
| wake_.wait(lock); |
| guarded_.need_wake_ = false; |
| } |
| } |
| |
| void single_threaded_executor::dispatcher_impl::run_task(pending_task* task, context& context) { |
| assert(current_task_ticket_ == 0); |
| const bool finished = (*task)(context); |
| assert(!*task == finished); |
| (void)finished; |
| if (current_task_ticket_ == 0) { |
| return; // task was not suspended, no ticket was produced |
| } |
| |
| std::lock_guard<std::mutex> lock(guarded_.mutex_); |
| assert(!guarded_.was_shutdown_); |
| guarded_.scheduler_.finalize_ticket(current_task_ticket_, task); |
| current_task_ticket_ = 0; |
| } |
| |
| suspended_task::ticket single_threaded_executor::dispatcher_impl::duplicate_ticket( |
| suspended_task::ticket ticket) { |
| std::lock_guard<std::mutex> lock(guarded_.mutex_); |
| guarded_.scheduler_.duplicate_ticket(ticket); |
| return ticket; |
| } |
| |
| void single_threaded_executor::dispatcher_impl::resolve_ticket(suspended_task::ticket ticket, |
| bool resume_task) { |
| pending_task abandoned_task; // drop outside of the lock |
| bool do_wake = false; |
| { |
| std::lock_guard<std::mutex> lock(guarded_.mutex_); |
| if (resume_task) { |
| guarded_.scheduler_.resume_task_with_ticket(ticket); |
| } else { |
| abandoned_task = guarded_.scheduler_.release_ticket(ticket); |
| } |
| if (guarded_.was_shutdown_) { |
| assert(!guarded_.need_wake_); |
| if (guarded_.scheduler_.has_outstanding_tickets()) { |
| return; // can't shutdown yet |
| } |
| } else if (guarded_.need_wake_ && (guarded_.scheduler_.has_runnable_tasks() || |
| !guarded_.scheduler_.has_suspended_tasks())) { |
| guarded_.need_wake_ = false; |
| do_wake = true; |
| } else { |
| return; // nothing else to do |
| } |
| } |
| |
| // Must do this outside of the lock. |
| if (do_wake) { |
| wake_.notify_one(); |
| } else { |
| delete this; |
| } |
| } |
| |
| } // namespace fpromise |