| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <lib/async-testutils/test_loop_dispatcher.h> |
| |
| #include <zircon/assert.h> |
| #include <zircon/status.h> |
| #include <zircon/syscalls.h> |
| |
| #define TO_NODE(type, ptr) ((list_node_t*)&ptr->state) |
| #define FROM_NODE(type, ptr) ((type*)((char*)(ptr)-offsetof(type, state))) |
| |
| namespace async { |
| |
| namespace { |
| |
| // The packet key used to signal timer expirations. |
| constexpr uint64_t kTimerExpirationKey = 0u; |
| |
| // Convenience functions for task, wait, and list node management. |
| inline list_node_t* WaitToNode(async_wait_t* wait) { |
| return TO_NODE(async_wait_t, wait); |
| } |
| |
| inline async_wait_t* NodeToWait(list_node_t* node) { |
| return FROM_NODE(async_wait_t, node); |
| } |
| |
| inline list_node_t* TaskToNode(async_task_t* task) { |
| return TO_NODE(async_task_t, task); |
| } |
| |
| inline async_task_t* NodeToTask(list_node_t* node) { |
| return FROM_NODE(async_task_t, node); |
| } |
| |
| inline void InsertTask(list_node_t* task_list, async_task_t* task) { |
| list_node_t* node; |
| for (node = task_list->prev; node != task_list; node = node->prev) { |
| if (task->deadline >= NodeToTask(node)->deadline) { |
| break; |
| } |
| } |
| list_add_after(node, TaskToNode(task)); |
| } |
| } // namespace |
| |
| TestLoopDispatcher::TestLoopDispatcher(TimeKeeper* time_keeper) |
| : time_keeper_(time_keeper) { |
| ZX_DEBUG_ASSERT(time_keeper_); |
| list_initialize(&wait_list_); |
| list_initialize(&task_list_); |
| list_initialize(&due_list_); |
| zx_status_t status = zx::port::create(0u, &port_); |
| ZX_ASSERT_MSG(status == ZX_OK, |
| "zx_port_create: %s", |
| zx_status_get_string(status)); |
| } |
| |
| TestLoopDispatcher::~TestLoopDispatcher() { |
| Shutdown(); |
| time_keeper_->CancelTimers(this); |
| }; |
| |
| zx::time TestLoopDispatcher::Now() { return time_keeper_->Now(); } |
| |
| // TODO(ZX-2390): Return ZX_ERR_CANCELED if dispatcher is shutting down. |
| zx_status_t TestLoopDispatcher::BeginWait(async_wait_t* wait) { |
| ZX_DEBUG_ASSERT(wait); |
| |
| // Along with the above assertion, the following check guarantees that the |
| // packet to be sent to |port_| on completion of this wait will not be |
| // mistaken for a timer expiration. |
| static_assert(0u == kTimerExpirationKey, |
| "Timer expirations must be signaled with a packet key of 0"); |
| |
| list_add_head(&wait_list_, WaitToNode(wait)); |
| zx_status_t status = zx_object_wait_async(wait->object, port_.get(), |
| reinterpret_cast<uintptr_t>(wait), |
| wait->trigger, |
| ZX_WAIT_ASYNC_ONCE); |
| |
| if (status != ZX_OK) { |
| // In this rare condition, the wait failed. Since a dispatched handler will |
| // never be invoked on the wait object, we remove it ourselves. |
| list_delete(WaitToNode(wait)); |
| } |
| return status; |
| } |
| |
| zx_status_t TestLoopDispatcher::CancelWait(async_wait_t* wait) { |
| ZX_DEBUG_ASSERT(wait); |
| |
| list_node_t* node = WaitToNode(wait); |
| if (!list_in_list(node)) { |
| return ZX_ERR_NOT_FOUND; |
| } |
| |
| // |wait| already might be encoded in |due_packet_|. |
| if (due_packet_ && due_packet_->key != kTimerExpirationKey) { |
| if (wait == reinterpret_cast<async_wait_t*>(due_packet_->key)) { |
| due_packet_.reset(); |
| list_delete(node); |
| return ZX_OK; |
| } |
| } |
| |
| zx_status_t status = port_.cancel(*zx::unowned_handle(wait->object), |
| reinterpret_cast<uintptr_t>(wait)); |
| if (status == ZX_OK) { |
| list_delete(node); |
| } |
| return status; |
| } |
| |
| // TODO(ZX-2390): Return ZX_ERR_CANCELED if dispatcher is shutting down. |
| zx_status_t TestLoopDispatcher::PostTask(async_task_t* task) { |
| ZX_DEBUG_ASSERT(task); |
| |
| InsertTask(&task_list_, task); |
| if (NodeToTask(list_peek_head(&task_list_)) == task) { |
| time_keeper_->RegisterTimer(GetNextTaskDueTime(), this); |
| } |
| return ZX_OK; |
| } |
| |
| zx_status_t TestLoopDispatcher::CancelTask(async_task_t* task) { |
| ZX_DEBUG_ASSERT(task); |
| list_node_t* node = TaskToNode(task); |
| if (!list_in_list(node)) { |
| return ZX_ERR_NOT_FOUND; |
| } |
| list_delete(node); |
| return ZX_OK; |
| } |
| |
| void TestLoopDispatcher::FireTimer() { |
| zx_port_packet_t timer_packet{}; |
| timer_packet.key = kTimerExpirationKey; |
| timer_packet.type = ZX_PKT_TYPE_USER; |
| zx_status_t status = port_.queue(&timer_packet); |
| ZX_ASSERT_MSG(status == ZX_OK, |
| "zx_port_queue: %s", |
| zx_status_get_string(status)); |
| } |
| |
| zx::time TestLoopDispatcher::GetNextTaskDueTime() { |
| list_node_t* node = list_is_empty(&due_list_) ? |
| list_peek_head(&task_list_) : |
| list_peek_head(&due_list_); |
| if (!node) { |
| return zx::time::infinite(); |
| } |
| return zx::time(NodeToTask(node)->deadline); |
| } |
| |
| |
| void TestLoopDispatcher::ExtractNextDuePacket() { |
| ZX_DEBUG_ASSERT(!due_packet_); |
| bool tasks_are_due = GetNextTaskDueTime() <= Now(); |
| |
| // If no tasks are due, flush all timer expiration packets until either |
| // there are no more packets to dequeue or a wait packet is reached. |
| do { |
| auto packet = fbl::make_unique<zx_port_packet_t>(); |
| if (ZX_OK != port_.wait(zx::time(0), packet.get())) { return; } |
| due_packet_.swap(packet); |
| } while (!tasks_are_due && due_packet_->key == kTimerExpirationKey); |
| } |
| |
| bool TestLoopDispatcher::HasPendingWork() { |
| if (GetNextTaskDueTime() <= Now()) { return true; } |
| if (!due_packet_) { ExtractNextDuePacket(); } |
| return !!due_packet_; |
| } |
| |
| void TestLoopDispatcher::DispatchNextDueTask() { |
| // if something is already in the due list, dispatch that. |
| list_node_t* node = list_peek_head(&due_list_); |
| if (node) { |
| list_delete(node); |
| async_task_t* task = NodeToTask(node); |
| task->handler(this, task, ZX_OK); |
| |
| // If the due list is now empty and there are still pending tasks, |
| // register a timer for the next due time. |
| if (list_is_empty(&due_list_) && !list_is_empty(&task_list_)) { |
| time_keeper_->RegisterTimer(GetNextTaskDueTime(), this); |
| } |
| } |
| } |
| |
| bool TestLoopDispatcher::DispatchNextDueMessage() { |
| if (!list_is_empty(&due_list_)) { |
| DispatchNextDueTask(); |
| return true; |
| } |
| |
| if (!due_packet_) { ExtractNextDuePacket(); } |
| |
| if (!due_packet_) { |
| return false; |
| } else if (due_packet_->key == kTimerExpirationKey) { |
| ExtractDueTasks(); |
| DispatchNextDueTask(); |
| due_packet_.reset(); |
| } else { // |due_packet_| encodes a finished wait. |
| // Move the next due packet to the stack, as invoking the associated |
| // wait's handler might try to extract another. |
| zx_port_packet_t packet = *due_packet_; |
| due_packet_.reset(); |
| async_wait_t* wait = reinterpret_cast<async_wait_t*>(packet.key); |
| list_delete(WaitToNode(wait)); |
| wait->handler(this, wait, ZX_OK, &packet.signal); |
| } |
| return true; |
| } |
| |
| void TestLoopDispatcher::ExtractDueTasks() { |
| list_node_t* node; |
| list_node_t* tail = nullptr; |
| zx::time current_time = time_keeper_->Now(); |
| list_for_every(&task_list_, node) { |
| if (NodeToTask(node)->deadline > current_time.get()) { break; } |
| tail = node; |
| } |
| if (tail) { |
| list_node_t* head = task_list_.next; |
| task_list_.next = tail->next; |
| tail->next->prev = &task_list_; |
| due_list_.next = head; |
| head->prev = &due_list_; |
| due_list_.prev = tail; |
| tail->next = &due_list_; |
| } |
| } |
| |
| void TestLoopDispatcher::Shutdown() { |
| list_node_t* node; |
| while ((node = list_remove_head(&wait_list_))) { |
| async_wait_t* wait = NodeToWait(node); |
| wait->handler(this, wait, ZX_ERR_CANCELED, nullptr); |
| } |
| while ((node = list_remove_head(&due_list_))) { |
| async_task_t* task = NodeToTask(node); |
| task->handler(this, task, ZX_ERR_CANCELED); |
| } |
| while ((node = list_remove_head(&task_list_))) { |
| async_task_t* task = NodeToTask(node); |
| task->handler(this, task, ZX_ERR_CANCELED); |
| } |
| } |
| |
| } // namespace async |