blob: 9e126e6ab415dd10f09c582d189a7602a200409f [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/async-testutils/test_loop_dispatcher.h>
#include <zircon/assert.h>
#include <zircon/status.h>
#include <zircon/syscalls.h>
// Converts between an async operation (e.g. async_wait_t / async_task_t) and
// the list node stored in its |state| member.
//
// TO_NODE reinterprets the address of |ptr->state| as a list_node_t*. The
// |type| argument is unused and kept only so both macros share a signature.
// FROM_NODE is the inverse: it subtracts the offset of |state| within |type|
// from the node address to recover the enclosing object.
//
// Both macros fully parenthesize |ptr| so that arbitrary pointer expressions
// (e.g. |p + 1|) expand correctly.
#define TO_NODE(type, ptr) (reinterpret_cast<list_node_t*>(&(ptr)->state))
#define FROM_NODE(type, ptr) \
    (reinterpret_cast<type*>(reinterpret_cast<char*>(ptr) - offsetof(type, state)))
namespace async {
namespace {
// The packet key used to signal timer expirations.
//
// FireTimer() queues a user packet with this key on the dispatcher's port.
// Wait completions use the address of their async_wait_t as the key instead
// (see BeginWait()), so a key of zero can only come from a timer as long as
// waits are non-null.
constexpr uint64_t kTimerExpirationKey = 0u;
// Convenience functions for task, wait, and list node management.
// Returns the list node embedded in |wait|, i.e. its |state| member viewed as
// a list_node_t.
inline list_node_t* WaitToNode(async_wait_t* wait) {
    return reinterpret_cast<list_node_t*>(&wait->state);
}
// Recovers the async_wait_t whose |state| member is |node| by stepping back
// from the node address by the offset of |state| within async_wait_t.
inline async_wait_t* NodeToWait(list_node_t* node) {
    char* const node_address = reinterpret_cast<char*>(node);
    return reinterpret_cast<async_wait_t*>(node_address - offsetof(async_wait_t, state));
}
// Returns the list node embedded in |task|, i.e. its |state| member viewed as
// a list_node_t.
inline list_node_t* TaskToNode(async_task_t* task) {
    return reinterpret_cast<list_node_t*>(&task->state);
}
// Recovers the async_task_t whose |state| member is |node| by stepping back
// from the node address by the offset of |state| within async_task_t.
inline async_task_t* NodeToTask(list_node_t* node) {
    char* const node_address = reinterpret_cast<char*>(node);
    return reinterpret_cast<async_task_t*>(node_address - offsetof(async_task_t, state));
}
// Inserts |task| into |task_list|, which is kept ordered by ascending
// deadline.
//
// The list is scanned from the tail towards the head for the last entry whose
// deadline is not later than |task|'s; |task| is linked in right after it. A
// task therefore lands behind already-queued tasks with the same deadline. If
// every queued task is due later (or the list is empty), the scan ends at the
// list head sentinel and |task| becomes the first entry.
inline void InsertTask(list_node_t* task_list, async_task_t* task) {
    list_node_t* predecessor = task_list->prev;
    while (predecessor != task_list &&
           NodeToTask(predecessor)->deadline > task->deadline) {
        predecessor = predecessor->prev;
    }
    list_add_after(predecessor, TaskToNode(task));
}
} // namespace
// Constructs a dispatcher that takes its notion of time from |time_keeper|,
// which must be non-null (checked in debug builds).
//
// Initializes the three intrusive lists as empty and creates the port that
// carries wait completions and timer expirations. Failure to create the port
// is fatal.
TestLoopDispatcher::TestLoopDispatcher(TimeKeeper* time_keeper)
    : time_keeper_(time_keeper) {
    ZX_DEBUG_ASSERT(time_keeper_);

    list_initialize(&due_list_);
    list_initialize(&task_list_);
    list_initialize(&wait_list_);

    const zx_status_t port_status = zx::port::create(0u, &port_);
    ZX_ASSERT_MSG(port_status == ZX_OK,
                  "zx_port_create: %s",
                  zx_status_get_string(port_status));
}
// Cancels all outstanding waits and tasks (their handlers are invoked with
// ZX_ERR_CANCELED by Shutdown()) and then asks the time keeper to drop any
// timers registered on behalf of this dispatcher.
TestLoopDispatcher::~TestLoopDispatcher() {
    Shutdown();
    // Done after Shutdown(): PostTask() registers timers, so presumably this
    // also clears any timer a handler registered while being cancelled.
    time_keeper_->CancelTimers(this);
}
zx::time TestLoopDispatcher::Now() { return time_keeper_->Now(); }
// TODO(ZX-2390): Return ZX_ERR_CANCELED if dispatcher is shutting down.
// Starts an asynchronous wait on |wait->object| for |wait->trigger|.
//
// The wait is tracked in |wait_list_| and registered on |port_| with the
// address of |wait| as the packet key. Returns the status of
// zx_object_wait_async(); on failure the wait is no longer tracked.
zx_status_t TestLoopDispatcher::BeginWait(async_wait_t* wait) {
    ZX_DEBUG_ASSERT(wait);
    // The packet key is the (non-null) address of |wait|. Together with the
    // requirement below that timer expirations use key 0, this ensures that a
    // wait-completion packet cannot be mistaken for a timer expiration.
    static_assert(0u == kTimerExpirationKey,
                  "Timer expirations must be signaled with a packet key of 0");

    list_node_t* const entry = WaitToNode(wait);
    const auto packet_key = reinterpret_cast<uintptr_t>(wait);

    // Track the wait before registering it with the port.
    list_add_head(&wait_list_, entry);
    const zx_status_t status = zx_object_wait_async(wait->object, port_.get(),
                                                    packet_key,
                                                    wait->trigger,
                                                    ZX_WAIT_ASYNC_ONCE);
    if (status == ZX_OK) {
        return ZX_OK;
    }

    // Registration failed, so no handler will ever be dispatched for this
    // wait; stop tracking it ourselves.
    list_delete(entry);
    return status;
}
// Cancels a wait previously started with BeginWait().
//
// Returns ZX_ERR_NOT_FOUND if |wait| is not currently in a list. If the
// completion packet for |wait| has already been dequeued into |due_packet_|,
// that packet is discarded and ZX_OK is returned. Otherwise the result of
// cancelling the wait on |port_| is returned, and the wait stops being tracked
// only if that cancellation succeeded.
zx_status_t TestLoopDispatcher::CancelWait(async_wait_t* wait) {
    ZX_DEBUG_ASSERT(wait);

    list_node_t* const entry = WaitToNode(wait);
    if (!list_in_list(entry)) {
        return ZX_ERR_NOT_FOUND;
    }

    // The completion of |wait| may already have been pulled off the port and
    // be sitting in |due_packet_|; in that case there is nothing left to
    // cancel on the port itself.
    const bool already_dequeued =
        due_packet_ &&
        due_packet_->key != kTimerExpirationKey &&
        reinterpret_cast<async_wait_t*>(due_packet_->key) == wait;
    if (already_dequeued) {
        due_packet_.reset();
        list_delete(entry);
        return ZX_OK;
    }

    const zx_status_t status = port_.cancel(*zx::unowned_handle(wait->object),
                                            reinterpret_cast<uintptr_t>(wait));
    if (status != ZX_OK) {
        return status;
    }
    list_delete(entry);
    return ZX_OK;
}
// TODO(ZX-2390): Return ZX_ERR_CANCELED if dispatcher is shutting down.
// Queues |task| in deadline order. If |task| ends up first in |task_list_|, a
// timer is registered with the time keeper for the next task due time.
// Always returns ZX_OK.
zx_status_t TestLoopDispatcher::PostTask(async_task_t* task) {
    ZX_DEBUG_ASSERT(task);

    InsertTask(&task_list_, task);

    const bool is_earliest_pending =
        list_peek_head(&task_list_) == TaskToNode(task);
    if (is_earliest_pending) {
        time_keeper_->RegisterTimer(GetNextTaskDueTime(), this);
    }
    return ZX_OK;
}
// Removes |task| from whichever list currently holds it.
//
// Returns ZX_OK if the task was queued and has been unlinked, or
// ZX_ERR_NOT_FOUND if it was not in a list. Any timer registered for the task
// is left untouched.
zx_status_t TestLoopDispatcher::CancelTask(async_task_t* task) {
    ZX_DEBUG_ASSERT(task);

    list_node_t* const entry = TaskToNode(task);
    if (list_in_list(entry)) {
        list_delete(entry);
        return ZX_OK;
    }
    return ZX_ERR_NOT_FOUND;
}
void TestLoopDispatcher::FireTimer() {
zx_port_packet_t timer_packet{};
timer_packet.key = kTimerExpirationKey;
timer_packet.type = ZX_PKT_TYPE_USER;
zx_status_t status = port_.queue(&timer_packet);
ZX_ASSERT_MSG(status == ZX_OK,
"zx_port_queue: %s",
zx_status_get_string(status));
}
// Returns the deadline of the next task to run: the head of |due_list_| when
// that list has entries, otherwise the head of |task_list_|. Returns
// zx::time::infinite() when both lists are empty.
zx::time TestLoopDispatcher::GetNextTaskDueTime() {
    list_node_t* const source =
        list_is_empty(&due_list_) ? &task_list_ : &due_list_;
    list_node_t* const head = list_peek_head(source);
    if (head) {
        return zx::time(NodeToTask(head)->deadline);
    }
    return zx::time::infinite();
}
// Polls |port_| for the next packet that needs dispatching and, if there is
// one, stores it in |due_packet_|. Must only be called while |due_packet_| is
// empty.
//
// Timer-expiration packets are only meaningful while a task is actually due.
// When no task is due, such packets are stale and are discarded; polling
// continues until a wait-completion packet is found or the port has nothing
// more to deliver. A stale timer expiration is never left in |due_packet_|, so
// callers that test |due_packet_| do not see phantom work.
void TestLoopDispatcher::ExtractNextDuePacket() {
    ZX_DEBUG_ASSERT(!due_packet_);
    const bool tasks_are_due = GetNextTaskDueTime() <= Now();

    // One buffer is enough: a discarded packet is simply overwritten by the
    // next poll.
    auto packet = fbl::make_unique<zx_port_packet_t>();

    // A deadline of zero is used so the port is polled rather than waited on.
    while (port_.wait(zx::time(0), packet.get()) == ZX_OK) {
        const bool is_stale_timer =
            !tasks_are_due && packet->key == kTimerExpirationKey;
        if (!is_stale_timer) {
            due_packet_.swap(packet);
            return;
        }
        // Stale timer expiration: drop it and keep draining the port.
    }
    // Nothing dispatchable was found; |due_packet_| stays empty.
}
// Reports whether there is something to dispatch right now: either a task
// whose deadline has been reached, or a packet that is (or can be) dequeued
// from the port into |due_packet_|.
bool TestLoopDispatcher::HasPendingWork() {
    const bool task_is_due = GetNextTaskDueTime() <= Now();
    if (task_is_due) {
        return true;
    }

    // No due task; see whether the port has a packet for us.
    if (!due_packet_) {
        ExtractNextDuePacket();
    }
    return static_cast<bool>(due_packet_);
}
void TestLoopDispatcher::DispatchNextDueTask() {
// if something is already in the due list, dispatch that.
list_node_t* node = list_peek_head(&due_list_);
if (node) {
list_delete(node);
async_task_t* task = NodeToTask(node);
task->handler(this, task, ZX_OK);
// If the due list is now empty and there are still pending tasks,
// register a timer for the next due time.
if (list_is_empty(&due_list_) && !list_is_empty(&task_list_)) {
time_keeper_->RegisterTimer(GetNextTaskDueTime(), this);
}
}
}
bool TestLoopDispatcher::DispatchNextDueMessage() {
if (!list_is_empty(&due_list_)) {
DispatchNextDueTask();
return true;
}
if (!due_packet_) { ExtractNextDuePacket(); }
if (!due_packet_) {
return false;
} else if (due_packet_->key == kTimerExpirationKey) {
ExtractDueTasks();
DispatchNextDueTask();
due_packet_.reset();
} else { // |due_packet_| encodes a finished wait.
// Move the next due packet to the stack, as invoking the associated
// wait's handler might try to extract another.
zx_port_packet_t packet = *due_packet_;
due_packet_.reset();
async_wait_t* wait = reinterpret_cast<async_wait_t*>(packet.key);
list_delete(WaitToNode(wait));
wait->handler(this, wait, ZX_OK, &packet.signal);
}
return true;
}
void TestLoopDispatcher::ExtractDueTasks() {
list_node_t* node;
list_node_t* tail = nullptr;
zx::time current_time = time_keeper_->Now();
list_for_every(&task_list_, node) {
if (NodeToTask(node)->deadline > current_time.get()) { break; }
tail = node;
}
if (tail) {
list_node_t* head = task_list_.next;
task_list_.next = tail->next;
tail->next->prev = &task_list_;
due_list_.next = head;
head->prev = &due_list_;
due_list_.prev = tail;
tail->next = &due_list_;
}
}
void TestLoopDispatcher::Shutdown() {
list_node_t* node;
while ((node = list_remove_head(&wait_list_))) {
async_wait_t* wait = NodeToWait(node);
wait->handler(this, wait, ZX_ERR_CANCELED, nullptr);
}
while ((node = list_remove_head(&due_list_))) {
async_task_t* task = NodeToTask(node);
task->handler(this, task, ZX_ERR_CANCELED);
}
while ((node = list_remove_head(&task_list_))) {
async_task_t* task = NodeToTask(node);
task->handler(this, task, ZX_ERR_CANCELED);
}
}
} // namespace async