blob: 126975e74ba873a894c8087b399bff538dde41f3 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/async-testing/dispatcher_stub.h>
#include <lib/async-testing/test_loop_dispatcher.h>
#include <lib/async/default.h>
#include <lib/async/dispatcher.h>
#include <lib/async/task.h>
#include <lib/async/wait.h>
#include <lib/fit/defer.h>
#include <lib/zx/port.h>
#include <lib/zx/time.h>
#include <zircon/assert.h>
#include <zircon/compiler.h>
#include <zircon/errors.h>
#include <zircon/status.h>
#include <zircon/syscalls.h>
#include <zircon/syscalls/port.h>
#include <list>
#include <memory>
#include <mutex>
#include <set>
namespace async {
namespace {
// An asynchronous dispatcher with an abstracted sense of time, controlled by an
// external time-keeping object, for use in testing.
class TestLoopDispatcher : public DispatcherStub, public async_test_subloop_t {
public:
TestLoopDispatcher();
~TestLoopDispatcher();
TestLoopDispatcher(const TestLoopDispatcher&) = delete;
TestLoopDispatcher& operator=(const TestLoopDispatcher&) = delete;
// async_dispatcher_t operation implementations.
zx::time Now() override __TA_EXCLUDES(&dispatcher_mtx_) {
std::lock_guard<std::mutex> lock(dispatcher_mtx_);
return NowLocked();
}
zx_status_t BeginWait(async_wait_t* wait) __TA_EXCLUDES(&dispatcher_mtx_) override;
zx_status_t CancelWait(async_wait_t* wait) __TA_EXCLUDES(&dispatcher_mtx_) override;
zx_status_t PostTask(async_task_t* task) __TA_EXCLUDES(&dispatcher_mtx_) override;
zx_status_t CancelTask(async_task_t* task) __TA_EXCLUDES(&dispatcher_mtx_) override;
// async_test_loop_provider_t operations implementations.
static void AdvanceTimeTo(async_test_subloop_t* subloop, zx_time_t time)
__TA_EXCLUDES(&dispatcher_mtx_);
static uint8_t DispatchNextDueMessage(async_test_subloop_t* subloop)
__TA_EXCLUDES(&dispatcher_mtx_);
static uint8_t HasPendingWork(async_test_subloop_t* subloop) __TA_EXCLUDES(&dispatcher_mtx_);
static zx_time_t GetNextTaskDueTime(async_test_subloop_t* subloop)
__TA_EXCLUDES(&dispatcher_mtx_);
static void Finalize(async_test_subloop_t* subloop) __TA_EXCLUDES(&dispatcher_mtx_);
private:
class Activated;
class TaskActivated;
class WaitActivated;
class AsyncTaskComparator {
public:
bool operator()(async_task_t* t1, async_task_t* t2) const {
return t1->deadline < t2->deadline;
}
};
// async_test_loop_provider_t operations implementations.
void AdvanceTimeTo(zx::time time) __TA_EXCLUDES(&dispatcher_mtx_);
bool DispatchNextDueMessage() __TA_EXCLUDES(&dispatcher_mtx_);
bool HasPendingWork() __TA_EXCLUDES(&dispatcher_mtx_);
zx::time GetNextTaskDueTime() __TA_EXCLUDES(&dispatcher_mtx_);
zx::time NowLocked() const __TA_REQUIRES(&dispatcher_mtx_) { return now_; }
// Extracts activated tasks and waits to |activated_|.
void ExtractActivatedLocked() __TA_REQUIRES(&dispatcher_mtx_);
// Removes the given task or wait from |activables_| and |activated_|.
zx_status_t CancelActivatedTaskOrWaitLocked(void* task_or_wait) __TA_REQUIRES(&dispatcher_mtx_);
// Dispatches all remaining posted waits and tasks, invoking their handlers
// with status ZX_ERR_CANCELED.
void Shutdown() __TA_EXCLUDES(&dispatcher_mtx_);
// Whether the loop is shutting down.
bool in_shutdown_ __TA_GUARDED(&dispatcher_mtx_) = false;
std::mutex dispatcher_mtx_;
// The current time.
zx::time now_ __TA_GUARDED(&dispatcher_mtx_) = zx::time::infinite_past();
// Pending tasks activable in the future.
// The ordering of the set is based on the task timeline. Multiple tasks
// with the same deadline will be equivalent, and be ordered by order of
// insertion.
std::multiset<async_task_t*, AsyncTaskComparator> future_tasks_ __TA_GUARDED(&dispatcher_mtx_);
// Pending waits.
std::set<async_wait_t*> pending_waits_ __TA_GUARDED(&dispatcher_mtx_);
// Activated elements, ready to be dispatched.
std::list<std::unique_ptr<Activated>> activated_ __TA_GUARDED(&dispatcher_mtx_);
// Port used to register waits.
zx::port port_;
};
const async_test_subloop_ops_t subloop_ops = {
TestLoopDispatcher::AdvanceTimeTo, TestLoopDispatcher::DispatchNextDueMessage,
TestLoopDispatcher::HasPendingWork, TestLoopDispatcher::GetNextTaskDueTime,
TestLoopDispatcher::Finalize,
};
// An element in the loop that can be activated. It is either a task or a wait.
class TestLoopDispatcher::Activated {
public:
virtual ~Activated() {}
// Dispatch the element, calling its handler.
virtual void Dispatch() const = 0;
// Cancel the element, calling its handler with a canceled status.
virtual void Cancel() const = 0;
// Returns whether this |Activated| corresponds to the given task or wait.
virtual bool Matches(void* task_or_wait) const = 0;
// Returns the due time for this |Activable|. If the |Activable| is a task,
// this corresponds to its deadline, otherwise this is an infinite time in
// the future.
virtual zx::time DueTime() const = 0;
};
class TestLoopDispatcher::TaskActivated : public Activated {
public:
TaskActivated(async_dispatcher_t* dispatcher, async_task_t* task)
: dispatcher_(dispatcher), task_(task) {}
void Dispatch() const override { task_->handler(dispatcher_, task_, ZX_OK); }
void Cancel() const override { task_->handler(dispatcher_, task_, ZX_ERR_CANCELED); }
bool Matches(void* task_or_wait) const override { return task_or_wait == task_; }
zx::time DueTime() const override { return zx::time(task_->deadline); }
private:
async_dispatcher_t* const dispatcher_;
async_task_t* const task_;
};
class TestLoopDispatcher::WaitActivated : public Activated {
public:
WaitActivated(async_dispatcher_t* dispatcher, async_wait_t* wait, zx_port_packet_t packet)
: dispatcher_(dispatcher), wait_(wait), packet_(std::move(packet)) {}
void Dispatch() const override {
wait_->handler(dispatcher_, wait_, packet_.status, &packet_.signal);
}
void Cancel() const override { wait_->handler(dispatcher_, wait_, ZX_ERR_CANCELED, nullptr); }
bool Matches(void* task_or_wait) const override { return task_or_wait == wait_; }
zx::time DueTime() const override { return zx::time::infinite(); }
private:
async_dispatcher_t* const dispatcher_;
async_wait_t* const wait_;
zx_port_packet_t const packet_;
};
TestLoopDispatcher::TestLoopDispatcher() : async_test_subloop_t{&subloop_ops}, in_shutdown_(false) {
zx_status_t status = zx::port::create(0u, &port_);
ZX_ASSERT_MSG(status == ZX_OK, "zx_port_create: %s", zx_status_get_string(status));
}
TestLoopDispatcher::~TestLoopDispatcher() { Shutdown(); }
zx_status_t TestLoopDispatcher::BeginWait(async_wait_t* wait) {
ZX_DEBUG_ASSERT(wait);
std::lock_guard<std::mutex> lock(dispatcher_mtx_);
if (in_shutdown_) {
return ZX_ERR_CANCELED;
}
zx_status_t status = zx_object_wait_async(
wait->object, port_.get(), reinterpret_cast<uintptr_t>(wait), wait->trigger, wait->options);
if (status != ZX_OK) {
return status;
}
pending_waits_.insert(wait);
return ZX_OK;
}
zx_status_t TestLoopDispatcher::CancelWait(async_wait_t* wait) {
ZX_DEBUG_ASSERT(wait);
std::lock_guard<std::mutex> lock(dispatcher_mtx_);
auto it = pending_waits_.find(wait);
if (it != pending_waits_.end()) {
pending_waits_.erase(it);
return zx_port_cancel(port_.get(), wait->object, reinterpret_cast<uintptr_t>(wait));
}
return CancelActivatedTaskOrWaitLocked(wait);
}
zx_status_t TestLoopDispatcher::PostTask(async_task_t* task) {
ZX_DEBUG_ASSERT(task);
std::lock_guard<std::mutex> lock(dispatcher_mtx_);
if (in_shutdown_) {
return ZX_ERR_CANCELED;
}
if (task->deadline <= NowLocked().get()) {
ExtractActivatedLocked();
activated_.push_back(std::make_unique<TaskActivated>(this, task));
return ZX_OK;
}
future_tasks_.insert(task);
return ZX_OK;
}
zx_status_t TestLoopDispatcher::CancelTask(async_task_t* task) {
ZX_DEBUG_ASSERT(task);
std::lock_guard<std::mutex> lock(dispatcher_mtx_);
auto task_it = std::find(future_tasks_.begin(), future_tasks_.end(), task);
if (task_it != future_tasks_.end()) {
future_tasks_.erase(task_it);
return ZX_OK;
}
return CancelActivatedTaskOrWaitLocked(task);
}
void TestLoopDispatcher::AdvanceTimeTo(zx::time time) {
std::lock_guard<std::mutex> lock(dispatcher_mtx_);
ZX_DEBUG_ASSERT(now_ <= time);
now_ = time;
}
zx::time TestLoopDispatcher::GetNextTaskDueTime() {
std::lock_guard<std::mutex> lock(dispatcher_mtx_);
for (const auto& activated : activated_) {
if (activated->DueTime() < zx::time::infinite()) {
return activated->DueTime();
}
}
if (!future_tasks_.empty()) {
return zx::time((*future_tasks_.begin())->deadline);
}
return zx::time::infinite();
}
bool TestLoopDispatcher::HasPendingWork() {
std::lock_guard<std::mutex> lock(dispatcher_mtx_);
ExtractActivatedLocked();
return !activated_.empty();
}
bool TestLoopDispatcher::DispatchNextDueMessage() {
std::unique_ptr<Activated> activated_element = nullptr;
{
std::lock_guard<std::mutex> lock(dispatcher_mtx_);
ExtractActivatedLocked();
if (activated_.empty()) {
return false;
}
activated_element = std::move(activated_.front());
activated_.erase(activated_.begin());
}
// Release the lock to avoid deadlocking on reentrant tasks.
async_dispatcher_t* previous_dispatcher = async_get_default_dispatcher();
async_set_default_dispatcher(this);
activated_element->Dispatch();
async_set_default_dispatcher(previous_dispatcher);
return true;
}
void TestLoopDispatcher::ExtractActivatedLocked() {
zx_port_packet_t packet;
while (port_.wait(zx::time(0), &packet) == ZX_OK) {
async_wait_t* wait = reinterpret_cast<async_wait_t*>(packet.key);
pending_waits_.erase(wait);
activated_.push_back(std::make_unique<WaitActivated>(this, wait, std::move(packet)));
}
// Move all tasks that reach their deadline to the activated list.
while (!future_tasks_.empty() && (*future_tasks_.begin())->deadline <= NowLocked().get()) {
activated_.push_back(std::make_unique<TaskActivated>(this, (*future_tasks_.begin())));
future_tasks_.erase(future_tasks_.begin());
}
}
// Unique lock does not support TA annotations.
// Lock needs to be released for reentrant handlers.
void TestLoopDispatcher::Shutdown() __TA_NO_THREAD_SAFETY_ANALYSIS {
std::unique_lock<std::mutex> lock(dispatcher_mtx_);
if (in_shutdown_) {
return;
}
in_shutdown_ = true;
while (!future_tasks_.empty()) {
auto task = *future_tasks_.begin();
future_tasks_.erase(future_tasks_.begin());
lock.unlock();
task->handler(this, task, ZX_ERR_CANCELED);
lock.lock();
}
while (!pending_waits_.empty()) {
auto wait = *pending_waits_.begin();
pending_waits_.erase(pending_waits_.begin());
lock.unlock();
wait->handler(this, wait, ZX_ERR_CANCELED, nullptr);
lock.lock();
}
while (!activated_.empty()) {
auto activated = std::move(activated_.front());
activated_.erase(activated_.begin());
lock.unlock();
activated->Cancel();
lock.lock();
}
}
zx_status_t TestLoopDispatcher::CancelActivatedTaskOrWaitLocked(void* task_or_wait) {
auto activated_it =
std::find_if(activated_.begin(), activated_.end(),
[&](const auto& activated) { return activated->Matches(task_or_wait); });
if (activated_it != activated_.end()) {
activated_.erase(activated_it);
return ZX_OK;
}
return ZX_ERR_NOT_FOUND;
}
void TestLoopDispatcher::AdvanceTimeTo(async_test_subloop_t* subloop, zx_time_t time) {
TestLoopDispatcher* self = static_cast<TestLoopDispatcher*>(subloop);
return self->AdvanceTimeTo(zx::time(time));
}
uint8_t TestLoopDispatcher::DispatchNextDueMessage(async_test_subloop_t* subloop) {
TestLoopDispatcher* self = static_cast<TestLoopDispatcher*>(subloop);
return self->DispatchNextDueMessage();
}
uint8_t TestLoopDispatcher::HasPendingWork(async_test_subloop_t* subloop) {
TestLoopDispatcher* self = static_cast<TestLoopDispatcher*>(subloop);
return self->HasPendingWork();
}
zx_time_t TestLoopDispatcher::GetNextTaskDueTime(async_test_subloop_t* subloop) {
TestLoopDispatcher* self = static_cast<TestLoopDispatcher*>(subloop);
return self->GetNextTaskDueTime().get();
}
void TestLoopDispatcher::Finalize(async_test_subloop_t* subloop) {
auto self = std::unique_ptr<TestLoopDispatcher>(static_cast<TestLoopDispatcher*>(subloop));
}
} // namespace
void NewTestLoopDispatcher(async_dispatcher_t** dispatcher, async_test_subloop_t** loop) {
auto dispatcher_loop = std::make_unique<TestLoopDispatcher>();
*dispatcher = dispatcher_loop.get();
*loop = dispatcher_loop.release();
}
} // namespace async