blob: fa668d28404c87a6e90ae1fd0ef73771aa2a2cbe [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors.All rights reserved.
// Use of this source code is governed by a BSD - style license that can be
// found in the LICENSE file.
#include <lib/async-testutils/test_loop_dispatcher.h>
#include <zircon/assert.h>
#include <zircon/syscalls.h>
#define TO_NODE(type, ptr) ((list_node_t*)&ptr->state)
#define FROM_NODE(type, ptr) ((type*)((char*)(ptr)-offsetof(type, state)))
namespace async {
namespace {
// Convenience functions for task, wait, and list node management.
inline list_node_t* WaitToNode(async_wait_t* wait) {
return TO_NODE(async_wait_t, wait);
}
inline async_wait_t* NodeToWait(list_node_t* node) {
return FROM_NODE(async_wait_t, node);
}
inline list_node_t* TaskToNode(async_task_t* task) {
return TO_NODE(async_task_t, task);
}
inline async_task_t* NodeToTask(list_node_t* node) {
return FROM_NODE(async_task_t, node);
}
inline void InsertTask(list_node_t* task_list, async_task_t* task) {
list_node_t* node;
for (node = task_list->prev; node != task_list; node = node->prev) {
if (task->deadline >= NodeToTask(node)->deadline) break;
}
list_add_after(node, TaskToNode(task));
}
} // namespace
TestLoopDispatcher::TestLoopDispatcher() {
list_initialize(&wait_list_);
list_initialize(&task_list_);
list_initialize(&due_list_);
zx_status_t status = zx::port::create(0u, &port_);
ZX_ASSERT_MSG(status == ZX_OK, "status=%d", status);
}
TestLoopDispatcher::~TestLoopDispatcher() {
Shutdown();
};
zx_status_t TestLoopDispatcher::BeginWait(async_wait_t* wait) {
ZX_DEBUG_ASSERT(wait);
list_add_head(&wait_list_, WaitToNode(wait));
zx_status_t status = zx_object_wait_async(wait->object, port_.get(),
reinterpret_cast<uintptr_t>(wait),
wait->trigger,
ZX_WAIT_ASYNC_ONCE);
if (status != ZX_OK) {
// In this rare condition, the wait failed. Since a dispatched handler will
// never be invoked on the wait object, we remove it ourselves.
list_delete(WaitToNode(wait));
}
return status;
}
zx_status_t TestLoopDispatcher::CancelWait(async_wait_t* wait) {
ZX_DEBUG_ASSERT(wait);
zx_status_t status = port_.cancel(wait->object, reinterpret_cast<uintptr_t>(wait));
if (status == ZX_OK) {
list_delete(WaitToNode(wait));
}
return status;
}
zx_status_t TestLoopDispatcher::PostTask(async_task_t* task) {
ZX_DEBUG_ASSERT(task);
InsertTask(&task_list_, task);
return ZX_OK;
}
zx_status_t TestLoopDispatcher::CancelTask(async_task_t* task) {
ZX_DEBUG_ASSERT(task);
list_node_t* node = TaskToNode(task);
if (!list_in_list(node)) {
return ZX_ERR_NOT_FOUND;
}
list_delete(node);
return ZX_OK;
}
void TestLoopDispatcher::AdvanceTimeTo(zx::time time) {
ZX_ASSERT_MSG(!is_dispatching_, "Cannot advance time while dispatching.");
if (current_time_ < time) {
current_time_ = time;
}
}
bool TestLoopDispatcher::RunUntil(zx::time deadline) {
ZX_ASSERT(!is_dispatching_);
bool did_work = false;
for (;;) {
bool ran_handler = false;
is_dispatching_ = true;
if (has_quit_) break;
ran_handler |= DispatchPendingTasks();
did_work |= ran_handler;
if (has_quit_) break;
ran_handler |= DispatchPendingWaits();
did_work |= ran_handler;
if (ran_handler) continue;
is_dispatching_ = false;
zx::time next_due_time = GetNextTaskDueTime();
if (next_due_time > deadline) {
AdvanceTimeTo(deadline);
break;
}
AdvanceTimeTo(next_due_time);
}
is_dispatching_ = false;
has_quit_ = false;
return did_work;
}
zx::time TestLoopDispatcher::GetNextTaskDueTime() {
list_node_t* node = list_peek_head(&task_list_);
if (!node) {
return zx::time::infinite();
}
return zx::time(NodeToTask(node)->deadline);
}
bool TestLoopDispatcher::DispatchPendingWaits() {
// First, enqueue a user packet into |port_| to mark the tail of the waits
// queued at the time of this method call.
// Set the key of this packet to |wait_id|, to uniquely identify this packet
// from others that might be left queued at the port from previous iterations
// that prematurely broke due to a quit.
zx_port_packet_t user_packet{};
user_packet.key = wait_id_;
user_packet.type = ZX_PKT_TYPE_USER;
ZX_ASSERT(ZX_OK == port_.queue(&user_packet, 1u));
bool did_work = false;
for (;;) {
if (has_quit_) break;
zx_port_packet_t packet;
// Grace of the user packet, |port_| should always have a queued wait.
ZX_ASSERT(ZX_OK == port_.wait(zx::time(0), &packet, 1));
if (packet.type == ZX_PKT_TYPE_USER) {
if (packet.key == wait_id_) {
// The packet is one we queued at the beginning of this call:
// all pending waits have been dispatched.
break;
} else {
// The packet is a leftover from a previously quit iteration:
// carry on.
continue;
}
}
async_wait_t* wait = reinterpret_cast<async_wait_t*>(packet.key);
list_delete(WaitToNode(wait));
wait->handler(this, wait, ZX_OK, &packet.signal);
did_work = true;
}
++wait_id_;
return did_work;
}
bool TestLoopDispatcher::DispatchPendingTasks() {
ExtractDueTasks();
// Dequeue and dispatch due tasks one at a time.
bool did_work = false;
list_node_t* node;
while ((node = list_peek_head(&due_list_))) {
if (has_quit_) break;
list_delete(node);
async_task_t* task = NodeToTask(node);
task->handler(this, task, ZX_OK);
did_work = true;
}
return did_work;
}
void TestLoopDispatcher::ExtractDueTasks() {
list_node_t* node;
list_node_t* tail = NULL;
list_for_every(&task_list_, node) {
if (NodeToTask(node)->deadline > current_time_.get()) break;
tail = node;
}
if (tail) {
list_node_t* head = task_list_.next;
task_list_.next = tail->next;
tail->next->prev = &task_list_;
due_list_.next = head;
head->prev = &due_list_;
due_list_.prev = tail;
tail->next = &due_list_;
}
}
void TestLoopDispatcher::Shutdown() {
list_node_t* node;
while ((node = list_remove_head(&wait_list_))) {
async_wait_t* wait = NodeToWait(node);
wait->handler(this, wait, ZX_ERR_CANCELED, NULL);
}
while ((node = list_remove_head(&due_list_))) {
async_task_t* task = NodeToTask(node);
task->handler(this, task, ZX_ERR_CANCELED);
}
while ((node = list_remove_head(&task_list_))) {
async_task_t* task = NodeToTask(node);
task->handler(this, task, ZX_ERR_CANCELED);
}
}
} // namespace async