| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <lib/fpromise/scheduler.h> |
| |
| #include <map> |
| #include <queue> |
| #include <utility> |
| |
| namespace fpromise { |
| namespace subtle { |
| |
| scheduler::scheduler() = default; |
| |
| scheduler::~scheduler() = default; |
| |
| void scheduler::schedule_task(pending_task task) { |
| assert(task); |
| runnable_tasks_.push(std::move(task)); |
| } |
| |
| suspended_task::ticket scheduler::obtain_ticket(uint32_t initial_refs) { |
| suspended_task::ticket ticket = next_ticket_++; |
| tickets_.emplace(ticket, ticket_record(initial_refs)); |
| return ticket; |
| } |
| |
| void scheduler::finalize_ticket(suspended_task::ticket ticket, pending_task* task) { |
| auto it = tickets_.find(ticket); |
| assert(it != tickets_.end()); |
| assert(!it->second.task); |
| assert(it->second.ref_count > 0); |
| assert(task); |
| |
| it->second.ref_count--; |
| if (!*task) { |
| // task already finished |
| } else if (it->second.was_resumed) { |
| // task immediately became runnable |
| runnable_tasks_.push(std::move(*task)); |
| } else if (it->second.ref_count > 0) { |
| // task remains suspended |
| it->second.task = std::move(*task); |
| suspended_task_count_++; |
| } // else, task was abandoned and caller retains ownership of it |
| if (it->second.ref_count == 0) { |
| tickets_.erase(it); |
| } |
| } |
| |
| void scheduler::duplicate_ticket(suspended_task::ticket ticket) { |
| auto it = tickets_.find(ticket); |
| assert(it != tickets_.end()); |
| assert(it->second.ref_count > 0); |
| |
| it->second.ref_count++; |
| assert(it->second.ref_count != 0); // did we really make 4 billion refs?! |
| } |
| |
| pending_task scheduler::release_ticket(suspended_task::ticket ticket) { |
| auto it = tickets_.find(ticket); |
| assert(it != tickets_.end()); |
| assert(it->second.ref_count > 0); |
| |
| it->second.ref_count--; |
| if (it->second.ref_count == 0) { |
| pending_task task = std::move(it->second.task); |
| if (task) { |
| assert(suspended_task_count_ > 0); |
| suspended_task_count_--; |
| } |
| tickets_.erase(it); |
| return task; |
| } |
| return pending_task(); |
| } |
| |
| bool scheduler::resume_task_with_ticket(suspended_task::ticket ticket) { |
| auto it = tickets_.find(ticket); |
| assert(it != tickets_.end()); |
| assert(it->second.ref_count > 0); |
| |
| bool did_resume = false; |
| it->second.ref_count--; |
| if (!it->second.was_resumed) { |
| it->second.was_resumed = true; |
| if (it->second.task) { |
| did_resume = true; |
| assert(suspended_task_count_ > 0); |
| suspended_task_count_--; |
| runnable_tasks_.push(std::move(it->second.task)); |
| } |
| } |
| if (it->second.ref_count == 0) { |
| tickets_.erase(it); |
| } |
| return did_resume; |
| } |
| |
| void scheduler::take_runnable_tasks(task_queue* tasks) { |
| assert(tasks && tasks->empty()); |
| runnable_tasks_.swap(*tasks); |
| } |
| |
| void scheduler::take_all_tasks(task_queue* tasks) { |
| assert(tasks && tasks->empty()); |
| |
| runnable_tasks_.swap(*tasks); |
| if (suspended_task_count_ > 0) { |
| for (auto& item : tickets_) { |
| if (item.second.task) { |
| assert(suspended_task_count_ > 0); |
| suspended_task_count_--; |
| tasks->push(std::move(item.second.task)); |
| } |
| } |
| } |
| } |
| |
| } // namespace subtle |
| } // namespace fpromise |