// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <threads.h>

#include <zircon/process.h>
#include <zircon/syscalls.h>
#include <zircon/syscalls/exception.h>

#include <lib/async-loop/cpp/loop.h>
#include <lib/async/cpp/time.h>
#include <lib/async/exception.h>
#include <lib/async/receiver.h>
#include <lib/async/task.h>
#include <lib/async/time.h>
#include <lib/async/wait.h>

#include <fbl/atomic.h>
#include <fbl/auto_lock.h>
#include <fbl/function.h>
#include <fbl/mutex.h>
#include <lib/zx/event.h>
#include <unittest/unittest.h>
#include <zircon/status.h>
#include <zircon/threads.h>

namespace {

class TestWait : public async_wait_t {
public:
    TestWait(zx_handle_t object, zx_signals_t trigger)
        : async_wait_t{{ASYNC_STATE_INIT}, &TestWait::CallHandler, object, trigger} {
    }

    virtual ~TestWait() = default;

    uint32_t run_count = 0u;
    zx_status_t last_status = ZX_ERR_INTERNAL;
    const zx_packet_signal_t* last_signal = nullptr;

    zx_status_t Begin(async_dispatcher_t* dispatcher) {
        return async_begin_wait(dispatcher, this);
    }

    zx_status_t Cancel(async_dispatcher_t* dispatcher) {
        return async_cancel_wait(dispatcher, this);
    }

protected:
    virtual void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
                        const zx_packet_signal_t* signal) {
        run_count++;
        last_status = status;
        if (signal) {
            last_signal_storage_ = *signal;
            last_signal = &last_signal_storage_;
        } else {
            last_signal = nullptr;
        }
    }

private:
    static void CallHandler(async_dispatcher_t* dispatcher, async_wait_t* wait,
                            zx_status_t status, const zx_packet_signal_t* signal) {
        static_cast<TestWait*>(wait)->Handle(dispatcher, status, signal);
    }

    zx_packet_signal_t last_signal_storage_;
};

class CascadeWait : public TestWait {
public:
    CascadeWait(zx_handle_t object, zx_signals_t trigger,
                zx_signals_t signals_to_clear, zx_signals_t signals_to_set,
                bool repeat)
        : TestWait(object, trigger),
          signals_to_clear_(signals_to_clear),
          signals_to_set_(signals_to_set),
          repeat_(repeat) {}

protected:
    zx_signals_t signals_to_clear_;
    zx_signals_t signals_to_set_;
    bool repeat_;

    void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
                const zx_packet_signal_t* signal) override {
        TestWait::Handle(dispatcher, status, signal);
        zx_object_signal(object, signals_to_clear_, signals_to_set_);
        if (repeat_ && status == ZX_OK) {
            Begin(dispatcher);
        }
    }
};

class SelfCancelingWait : public TestWait {
public:
    SelfCancelingWait(zx_handle_t object, zx_signals_t trigger)
        : TestWait(object, trigger) {}

    zx_status_t cancel_result = ZX_ERR_INTERNAL;

protected:
    void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
                const zx_packet_signal_t* signal) override {
        TestWait::Handle(dispatcher, status, signal);
        cancel_result = Cancel(dispatcher);
    }
};

class TestTask : public async_task_t {
public:
    TestTask()
        : async_task_t{{ASYNC_STATE_INIT}, &TestTask::CallHandler, ZX_TIME_INFINITE} {}

    virtual ~TestTask() = default;

    zx_status_t Post(async_dispatcher_t* dispatcher) {
        this->deadline = async_now(dispatcher);
        return async_post_task(dispatcher, this);
    }

    zx_status_t PostForTime(async_dispatcher_t* dispatcher, zx::time deadline) {
        this->deadline = deadline.get();
        return async_post_task(dispatcher, this);
    }

    zx_status_t Cancel(async_dispatcher_t* dispatcher) {
        return async_cancel_task(dispatcher, this);
    }

    uint32_t run_count = 0u;
    zx_status_t last_status = ZX_ERR_INTERNAL;

protected:
    virtual void Handle(async_dispatcher_t* dispatcher, zx_status_t status) {
        run_count++;
        last_status = status;
    }

private:
    static void CallHandler(async_dispatcher_t* dispatcher, async_task_t* task, zx_status_t status) {
        static_cast<TestTask*>(task)->Handle(dispatcher, status);
    }
};

class QuitTask : public TestTask {
public:
    QuitTask() = default;

protected:
    void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override {
        TestTask::Handle(dispatcher, status);
        async_loop_quit(async_loop_from_dispatcher(dispatcher));
    }
};

class ResetQuitTask : public TestTask {
public:
    ResetQuitTask() = default;

    zx_status_t result = ZX_ERR_INTERNAL;

protected:
    void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override {
        TestTask::Handle(dispatcher, status);
        result = async_loop_reset_quit(async_loop_from_dispatcher(dispatcher));
    }
};

class RepeatingTask : public TestTask {
public:
    RepeatingTask(zx::duration interval, uint32_t repeat_count)
        : interval_(interval), repeat_count_(repeat_count) {}

    void set_finish_callback(fbl::Closure callback) {
        finish_callback_ = fbl::move(callback);
    }

protected:
    zx::duration interval_;
    uint32_t repeat_count_;
    fbl::Closure finish_callback_;

    void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override {
        TestTask::Handle(dispatcher, status);
        if (repeat_count_ == 0) {
            if (finish_callback_)
                finish_callback_();
        } else {
            repeat_count_ -= 1;
            if (status == ZX_OK) {
                deadline = zx_time_add_duration(deadline, interval_.get());
                Post(dispatcher);
            }
        }
    }
};

class SelfCancelingTask : public TestTask {
public:
    SelfCancelingTask() = default;

    zx_status_t cancel_result = ZX_ERR_INTERNAL;

protected:
    void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override {
        TestTask::Handle(dispatcher, status);
        cancel_result = Cancel(dispatcher);
    }
};

class TestReceiver : async_receiver_t {
public:
    TestReceiver()
        : async_receiver_t{{ASYNC_STATE_INIT}, &TestReceiver::CallHandler} {
    }

    virtual ~TestReceiver() = default;

    zx_status_t QueuePacket(async_dispatcher_t* dispatcher, const zx_packet_user_t* data) {
        return async_queue_packet(dispatcher, this, data);
    }

    uint32_t run_count = 0u;
    zx_status_t last_status = ZX_ERR_INTERNAL;
    const zx_packet_user_t* last_data;

protected:
    virtual void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
                        const zx_packet_user_t* data) {
        run_count++;
        last_status = status;
        if (data) {
            last_data_storage_ = *data;
            last_data = &last_data_storage_;
        } else {
            last_data = nullptr;
        }
    }

private:
    static void CallHandler(async_dispatcher_t* dispatcher, async_receiver_t* receiver,
                            zx_status_t status, const zx_packet_user_t* data) {
        static_cast<TestReceiver*>(receiver)->Handle(dispatcher, status, data);
    }

    zx_packet_user_t last_data_storage_{};
};

class TestException : public async_exception_t {
public:
    TestException(async_dispatcher_t* dispatcher, zx_handle_t task, uint32_t options)
        : async_exception_t{{ASYNC_STATE_INIT}, &TestException::CallHandler, task, options},
          dispatcher_(dispatcher) {
    }

    ~TestException() = default;

    uint32_t run_count = 0u;
    zx_status_t last_status = ZX_ERR_INTERNAL;
    const zx_port_packet_t* last_report = nullptr;

    zx_status_t Bind() {
        return async_bind_exception_port(dispatcher_, this);
    }

    zx_status_t Unbind() {
        return async_unbind_exception_port(dispatcher_, this);
    }

    zx_status_t ResumeFromException(zx_handle_t task, uint32_t options) {
        return async_resume_from_exception(dispatcher_, this, task, options);
    }

protected:
    virtual void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
                        const zx_port_packet_t* report) = 0;

    // To be called by |Handle()| to update recorded exception state.
    void UpdateState(zx_status_t status, const zx_port_packet_t* report) {
        run_count++;
        last_status = status;
        if (report) {
            last_report_storage_ = *report;
            last_report = &last_report_storage_;
        } else {
            last_report = nullptr;
        }

        // We don't resume the task here, leaving that to the test.
    }

private:
    static void CallHandler(async_dispatcher_t* dispatcher,
                            async_exception_t* receiver,
                            zx_status_t status,
                            const zx_port_packet_t* report) {
        static_cast<TestException*>(receiver)->Handle(dispatcher, status, report);
    }

    async_dispatcher_t* dispatcher_;
    zx_port_packet_t last_report_storage_;
};

class TestThreadException : public TestException {
public:
    TestThreadException(async_dispatcher_t* dispatcher, zx_handle_t task, uint32_t options)
        : TestException(dispatcher, task, options) {}

private:
    void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
                const zx_port_packet_t* report) override {
        UpdateState(status, report);
    }
};

// When we bind to a process's exception port we may get exceptions on
// threads we're not expecting, e.g., if a test bug causes a crash.
// If we don't forward on such requests the exception will just sit there.
// |test_thread| is the thread we're interested in, everything else is
// forwarded on.
class TestProcessException : public TestException {
public:
    TestProcessException(async_dispatcher_t* dispatcher, zx_handle_t task, uint32_t options,
                         zx_koid_t test_thread_tid)
        : TestException(dispatcher, task, options),
          test_thread_tid_(test_thread_tid) {
    }

private:
    void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
                const zx_port_packet_t* report) override {
        UpdateState(status, report);

        // The only exceptions we are interested in are from crashing
        // threads, see |start_thread_to_crash()|. If we get something
        // else pass it on. This is useful in order to get backtraces from
        // things like assert failures while running the test. Though note
        // that this only works if the exception async loop is running.
        if (report && report->exception.tid != test_thread_tid_) {
            ResumeTryNext(report->exception.tid);
        }
    }

    // Resume thread |tid| giving the next handler a try.
    void ResumeTryNext(zx_koid_t tid) {
        // Alas we need the thread's handle to resume it.
        zx_handle_t thread;
        auto status = zx_object_get_child(zx_process_self(), tid,
                                          ZX_RIGHT_SAME_RIGHTS, &thread);
        switch (status) {
        case ZX_OK:
            status = ResumeFromException(thread, ZX_RESUME_TRY_NEXT);
            if (status != ZX_OK) {
                CrashFromBadStatus("zx_task_resume_from_exception", status);
            }
            break;
        case ZX_ERR_NOT_FOUND:
            // This could happen if the thread no longer exists.
            break;
        default:
            CrashFromBadStatus("zx_object_get_child", status);
        }
    }

    // This is called when we want to assert-fail, but we can't until we
    // unbind the exception port bound to the process.
    __NO_RETURN void CrashFromBadStatus(const char* msg, zx_status_t status) {
        // Make sure we don't get in the way of an exception generated by
        // the following assert.
        Unbind();

        ZX_DEBUG_ASSERT_MSG(status == ZX_OK, "%s: status = %d/%s", msg, status,
                            zx_status_get_string(status));
        // Just in case.
        exit(1);
    }

    zx_koid_t test_thread_tid_;
};

// The C++ loop wrapper is one-to-one with the underlying C API so for the
// most part we will test through that interface but here we make sure that
// the C API actually exists but we don't comprehensively test what it does.
bool c_api_basic_test() {
    BEGIN_TEST;

    async_loop_t* loop;
    ASSERT_EQ(ZX_OK, async_loop_create(&kAsyncLoopConfigNoAttachToThread, &loop), "create");
    ASSERT_NONNULL(loop, "loop");

    EXPECT_EQ(ASYNC_LOOP_RUNNABLE, async_loop_get_state(loop), "runnable");

    async_loop_quit(loop);
    EXPECT_EQ(ASYNC_LOOP_QUIT, async_loop_get_state(loop), "quitting");
    async_loop_run(loop, ZX_TIME_INFINITE, false);
    EXPECT_EQ(ZX_OK, async_loop_reset_quit(loop));

    thrd_t thread{};
    EXPECT_EQ(ZX_OK, async_loop_start_thread(loop, "name", &thread), "thread start");
    EXPECT_NE(thrd_t{}, thread, "thread ws initialized");
    async_loop_quit(loop);
    async_loop_join_threads(loop);

    async_loop_shutdown(loop);
    EXPECT_EQ(ASYNC_LOOP_SHUTDOWN, async_loop_get_state(loop), "shutdown");

    async_loop_destroy(loop);

    END_TEST;
}

bool make_default_false_test() {
    BEGIN_TEST;

    {
        async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
        EXPECT_NULL(async_get_default_dispatcher(), "not default");
    }
    EXPECT_NULL(async_get_default_dispatcher(), "still not default");

    END_TEST;
}

bool make_default_true_test() {
    BEGIN_TEST;

    async_loop_config_t config{};
    config.make_default_for_current_thread = true;
    {
        async::Loop loop(&config);
        EXPECT_EQ(loop.dispatcher(), async_get_default_dispatcher(), "became default");
    }
    EXPECT_NULL(async_get_default_dispatcher(), "no longer default");

    END_TEST;
}

bool create_default_test() {
    BEGIN_TEST;

    {
        async::Loop loop(&kAsyncLoopConfigAttachToThread);
        EXPECT_EQ(loop.dispatcher(), async_get_default_dispatcher(), "became default");
    }
    EXPECT_NULL(async_get_default_dispatcher(), "no longer default");

    END_TEST;
}

bool quit_test() {
    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
    EXPECT_EQ(ASYNC_LOOP_RUNNABLE, loop.GetState(), "initially not quitting");

    loop.Quit();
    EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState(), "quitting when quit");
    EXPECT_EQ(ZX_ERR_CANCELED, loop.Run(), "run returns immediately");
    EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState(), "still quitting");

    ResetQuitTask reset_quit_task;
    EXPECT_EQ(ZX_OK, reset_quit_task.Post(loop.dispatcher()), "can post tasks even after quit");
    QuitTask quit_task;
    EXPECT_EQ(ZX_OK, quit_task.Post(loop.dispatcher()), "can post tasks even after quit");

    EXPECT_EQ(ZX_OK, loop.ResetQuit());
    EXPECT_EQ(ASYNC_LOOP_RUNNABLE, loop.GetState(), "not quitting after reset");

    EXPECT_EQ(ZX_OK, loop.Run(zx::time::infinite(), true /*once*/), "run tasks");

    EXPECT_EQ(1u, reset_quit_task.run_count, "reset quit task ran");
    EXPECT_EQ(ZX_ERR_BAD_STATE, reset_quit_task.result, "can't reset quit while loop is running");

    EXPECT_EQ(1u, quit_task.run_count, "quit task ran");
    EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState(), "quitted");

    EXPECT_EQ(ZX_ERR_CANCELED, loop.Run(), "runs returns immediately when quitted");

    loop.Shutdown();
    EXPECT_EQ(ASYNC_LOOP_SHUTDOWN, loop.GetState(), "shut down");
    EXPECT_EQ(ZX_ERR_BAD_STATE, loop.Run(), "run returns immediately when shut down");
    EXPECT_EQ(ZX_ERR_BAD_STATE, loop.ResetQuit());

    END_TEST;
}

bool time_test() {
    BEGIN_TEST;

    // Verify that the dispatcher's time-telling is strictly monotonic,
    // which is constent with ZX_CLOCK_MONOTONIC.
    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
    zx::time t0 = zx::clock::get_monotonic();
    zx::time t1 = async::Now(loop.dispatcher());
    zx::time t2 = async::Now(loop.dispatcher());
    zx::time t3 = zx::clock::get_monotonic();

    EXPECT_LE(t0.get(), t1.get());
    EXPECT_LE(t1.get(), t2.get());
    EXPECT_LE(t2.get(), t3.get());

    END_TEST;
}

bool wait_test() {
    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
    zx::event event;
    EXPECT_EQ(ZX_OK, zx::event::create(0u, &event), "create event");

    CascadeWait wait1(event.get(), ZX_USER_SIGNAL_1,
                      0u, ZX_USER_SIGNAL_2, false);
    CascadeWait wait2(event.get(), ZX_USER_SIGNAL_2,
                      ZX_USER_SIGNAL_1 | ZX_USER_SIGNAL_2, 0u, true);
    CascadeWait wait3(event.get(), ZX_USER_SIGNAL_3,
                      ZX_USER_SIGNAL_3, 0u, true);
    EXPECT_EQ(ZX_OK, wait1.Begin(loop.dispatcher()), "wait 1");
    EXPECT_EQ(ZX_OK, wait2.Begin(loop.dispatcher()), "wait 2");
    EXPECT_EQ(ZX_OK, wait3.Begin(loop.dispatcher()), "wait 3");

    // Initially nothing is signaled.
    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
    EXPECT_EQ(0u, wait1.run_count, "run count 1");
    EXPECT_EQ(0u, wait2.run_count, "run count 2");
    EXPECT_EQ(0u, wait3.run_count, "run count 3");

    // Set signal 1: notifies |wait1| which sets signal 2 and notifies |wait2|
    // which clears signal 1 and 2 again.
    EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_1), "signal 1");
    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
    EXPECT_EQ(1u, wait1.run_count, "run count 1");
    EXPECT_EQ(ZX_OK, wait1.last_status, "status 1");
    EXPECT_NONNULL(wait1.last_signal);
    EXPECT_EQ(ZX_USER_SIGNAL_1, wait1.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 1");
    EXPECT_EQ(ZX_USER_SIGNAL_1, wait1.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 1");
    EXPECT_EQ(1u, wait1.last_signal->count, "count 1");
    EXPECT_EQ(1u, wait2.run_count, "run count 2");
    EXPECT_EQ(ZX_OK, wait2.last_status, "status 2");
    EXPECT_NONNULL(wait2.last_signal);
    EXPECT_EQ(ZX_USER_SIGNAL_2, wait2.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 2");
    EXPECT_EQ(ZX_USER_SIGNAL_1 | ZX_USER_SIGNAL_2, wait2.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 2");
    EXPECT_EQ(1u, wait2.last_signal->count, "count 2");
    EXPECT_EQ(0u, wait3.run_count, "run count 3");

    // Set signal 1 again: does nothing because |wait1| was a one-shot.
    EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_1), "signal 1");
    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
    EXPECT_EQ(1u, wait1.run_count, "run count 1");
    EXPECT_EQ(1u, wait2.run_count, "run count 2");
    EXPECT_EQ(0u, wait3.run_count, "run count 3");

    // Set signal 2 again: notifies |wait2| which clears signal 1 and 2 again.
    EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_2), "signal 2");
    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
    EXPECT_EQ(1u, wait1.run_count, "run count 1");
    EXPECT_EQ(2u, wait2.run_count, "run count 2");
    EXPECT_EQ(ZX_OK, wait2.last_status, "status 2");
    EXPECT_NONNULL(wait2.last_signal);
    EXPECT_EQ(ZX_USER_SIGNAL_2, wait2.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 2");
    EXPECT_EQ(ZX_USER_SIGNAL_1 | ZX_USER_SIGNAL_2, wait2.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 2");
    EXPECT_EQ(1u, wait2.last_signal->count, "count 2");
    EXPECT_EQ(0u, wait3.run_count, "run count 3");

    // Set signal 3: notifies |wait3| which clears signal 3.
    // Do this a couple of times.
    for (uint32_t i = 0; i < 3; i++) {
        EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_3), "signal 3");
        EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
        EXPECT_EQ(1u, wait1.run_count, "run count 1");
        EXPECT_EQ(2u, wait2.run_count, "run count 2");
        EXPECT_EQ(i + 1u, wait3.run_count, "run count 3");
        EXPECT_EQ(ZX_OK, wait3.last_status, "status 3");
        EXPECT_NONNULL(wait3.last_signal);
        EXPECT_EQ(ZX_USER_SIGNAL_3, wait3.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 3");
        EXPECT_EQ(ZX_USER_SIGNAL_3, wait3.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 3");
        EXPECT_EQ(1u, wait3.last_signal->count, "count 3");
    }

    // Cancel wait 3 then set signal 3 again: nothing happens this time.
    EXPECT_EQ(ZX_OK, wait3.Cancel(loop.dispatcher()), "cancel");
    EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_3), "signal 3");
    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
    EXPECT_EQ(1u, wait1.run_count, "run count 1");
    EXPECT_EQ(2u, wait2.run_count, "run count 2");
    EXPECT_EQ(3u, wait3.run_count, "run count 3");

    // Redundant cancel returns an error.
    EXPECT_EQ(ZX_ERR_NOT_FOUND, wait3.Cancel(loop.dispatcher()), "cancel again");
    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
    EXPECT_EQ(1u, wait1.run_count, "run count 1");
    EXPECT_EQ(2u, wait2.run_count, "run count 2");
    EXPECT_EQ(3u, wait3.run_count, "run count 3");

    loop.Shutdown();

    END_TEST;
}

bool wait_unwaitable_handle_test() {
    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
    zx::event event;
    EXPECT_EQ(ZX_OK, zx::event::create(0u, &event), "create event");
    event.replace(ZX_RIGHT_NONE, &event);

    TestWait wait(event.get(), ZX_USER_SIGNAL_0);
    EXPECT_EQ(ZX_ERR_ACCESS_DENIED, wait.Begin(loop.dispatcher()), "begin");
    EXPECT_EQ(ZX_ERR_NOT_FOUND, wait.Cancel(loop.dispatcher()), "cancel");
    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
    EXPECT_EQ(0u, wait.run_count, "run count");

    END_TEST;
}

bool wait_shutdown_test() {
    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
    zx::event event;
    EXPECT_EQ(ZX_OK, zx::event::create(0u, &event), "create event");

    CascadeWait wait1(event.get(), ZX_USER_SIGNAL_0, 0u, 0u, false);
    CascadeWait wait2(event.get(), ZX_USER_SIGNAL_0, ZX_USER_SIGNAL_0, 0u, true);
    TestWait wait3(event.get(), ZX_USER_SIGNAL_1);
    SelfCancelingWait wait4(event.get(), ZX_USER_SIGNAL_0);
    SelfCancelingWait wait5(event.get(), ZX_USER_SIGNAL_1);

    EXPECT_EQ(ZX_OK, wait1.Begin(loop.dispatcher()), "begin 1");
    EXPECT_EQ(ZX_OK, wait2.Begin(loop.dispatcher()), "begin 2");
    EXPECT_EQ(ZX_OK, wait3.Begin(loop.dispatcher()), "begin 3");
    EXPECT_EQ(ZX_OK, wait4.Begin(loop.dispatcher()), "begin 4");
    EXPECT_EQ(ZX_OK, wait5.Begin(loop.dispatcher()), "begin 5");

    // Nothing signaled so nothing happens at first.
    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
    EXPECT_EQ(0u, wait1.run_count, "run count 1");
    EXPECT_EQ(0u, wait2.run_count, "run count 2");
    EXPECT_EQ(0u, wait3.run_count, "run count 3");
    EXPECT_EQ(0u, wait4.run_count, "run count 4");
    EXPECT_EQ(0u, wait5.run_count, "run count 5");

    // Set signal 1: notifies both waiters, |wait2| clears the signal and repeats
    EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_0), "signal 1");
    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
    EXPECT_EQ(1u, wait1.run_count, "run count 1");
    EXPECT_EQ(ZX_OK, wait1.last_status, "status 1");
    EXPECT_NONNULL(wait1.last_signal);
    EXPECT_EQ(ZX_USER_SIGNAL_0, wait1.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 1");
    EXPECT_EQ(ZX_USER_SIGNAL_0, wait1.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 1");
    EXPECT_EQ(1u, wait1.last_signal->count, "count 1");
    EXPECT_EQ(1u, wait2.run_count, "run count 2");
    EXPECT_EQ(ZX_OK, wait2.last_status, "status 2");
    EXPECT_NONNULL(wait2.last_signal);
    EXPECT_EQ(ZX_USER_SIGNAL_0, wait2.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 2");
    EXPECT_EQ(ZX_USER_SIGNAL_0, wait2.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 2");
    EXPECT_EQ(1u, wait2.last_signal->count, "count 2");
    EXPECT_EQ(0u, wait3.run_count, "run count 3");
    EXPECT_EQ(1u, wait4.run_count, "run count 4");
    EXPECT_EQ(ZX_USER_SIGNAL_0, wait4.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 4");
    EXPECT_EQ(ZX_USER_SIGNAL_0, wait4.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 4");
    EXPECT_EQ(ZX_ERR_NOT_FOUND, wait4.cancel_result, "cancel result 4");
    EXPECT_EQ(0u, wait5.run_count, "run count 5");

    // When the loop shuts down:
    //   |wait1| not notified because it was serviced and didn't repeat
    //   |wait2| notified because it repeated
    //   |wait3| notified because it was not yet serviced
    //   |wait4| not notified because it was serviced
    //   |wait5| notified because it was not yet serviced
    loop.Shutdown();
    EXPECT_EQ(1u, wait1.run_count, "run count 1");
    EXPECT_EQ(2u, wait2.run_count, "run count 2");
    EXPECT_EQ(ZX_ERR_CANCELED, wait2.last_status, "status 2");
    EXPECT_NULL(wait2.last_signal, "signal 2");
    EXPECT_EQ(1u, wait3.run_count, "run count 3");
    EXPECT_EQ(ZX_ERR_CANCELED, wait3.last_status, "status 3");
    EXPECT_NULL(wait3.last_signal, "signal 3");
    EXPECT_EQ(1u, wait4.run_count, "run count 4");
    EXPECT_EQ(1u, wait5.run_count, "run count 5");
    EXPECT_EQ(ZX_ERR_CANCELED, wait5.last_status, "status 5");
    EXPECT_NULL(wait5.last_signal, "signal 5");
    EXPECT_EQ(ZX_ERR_NOT_FOUND, wait5.cancel_result, "cancel result 5");

    // Try to add or cancel work after shutdown.
    TestWait wait6(event.get(), ZX_USER_SIGNAL_0);
    EXPECT_EQ(ZX_ERR_BAD_STATE, wait6.Begin(loop.dispatcher()), "begin after shutdown");
    EXPECT_EQ(ZX_ERR_NOT_FOUND, wait6.Cancel(loop.dispatcher()), "cancel after shutdown");
    EXPECT_EQ(0u, wait6.run_count, "run count 6");

    END_TEST;
}

bool task_test() {
    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);

    zx::time start_time = async::Now(loop.dispatcher());
    TestTask task1;
    RepeatingTask task2(zx::msec(1), 3u);
    TestTask task3;
    QuitTask task4;
    TestTask task5; // posted after quit

    EXPECT_EQ(ZX_OK, task1.PostForTime(loop.dispatcher(), start_time + zx::msec(1)), "post 1");
    EXPECT_EQ(ZX_OK, task2.PostForTime(loop.dispatcher(), start_time + zx::msec(1)), "post 2");
    EXPECT_EQ(ZX_OK, task3.PostForTime(loop.dispatcher(), start_time), "post 3");
    task2.set_finish_callback([&loop, &task4, &task5, start_time] {
        task4.PostForTime(loop.dispatcher(), start_time + zx::msec(10));
        task5.PostForTime(loop.dispatcher(), start_time + zx::msec(10));
    });

    // Cancel task 3.
    EXPECT_EQ(ZX_OK, task3.Cancel(loop.dispatcher()), "cancel 3");

    // Run until quit.
    EXPECT_EQ(ZX_ERR_CANCELED, loop.Run(), "run loop");
    EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState(), "quitting");
    EXPECT_EQ(1u, task1.run_count, "run count 1");
    EXPECT_EQ(ZX_OK, task1.last_status, "status 1");
    EXPECT_EQ(4u, task2.run_count, "run count 2");
    EXPECT_EQ(ZX_OK, task2.last_status, "status 2");
    EXPECT_EQ(0u, task3.run_count, "run count 3");
    EXPECT_EQ(1u, task4.run_count, "run count 4");
    EXPECT_EQ(ZX_OK, task4.last_status, "status 4");
    EXPECT_EQ(0u, task5.run_count, "run count 5");

    // Reset quit and keep running, now task5 should go ahead followed
    // by any subsequently posted tasks even if they have earlier deadlines.
    QuitTask task6;
    TestTask task7;
    EXPECT_EQ(ZX_OK, task6.PostForTime(loop.dispatcher(), start_time), "post 6");
    EXPECT_EQ(ZX_OK, task7.PostForTime(loop.dispatcher(), start_time), "post 7");
    EXPECT_EQ(ZX_OK, loop.ResetQuit());
    EXPECT_EQ(ZX_ERR_CANCELED, loop.Run(), "run loop");
    EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState(), "quitting");

    EXPECT_EQ(1u, task5.run_count, "run count 5");
    EXPECT_EQ(ZX_OK, task5.last_status, "status 5");
    EXPECT_EQ(1u, task6.run_count, "run count 6");
    EXPECT_EQ(ZX_OK, task6.last_status, "status 6");
    EXPECT_EQ(0u, task7.run_count, "run count 7");

    loop.Shutdown();

    END_TEST;
}

bool task_shutdown_test() {
    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);

    zx::time start_time = async::Now(loop.dispatcher());
    TestTask task1;
    RepeatingTask task2(zx::msec(1000), 1u);
    TestTask task3;
    TestTask task4;
    QuitTask task5;
    SelfCancelingTask task6;
    SelfCancelingTask task7;

    EXPECT_EQ(ZX_OK, task1.PostForTime(loop.dispatcher(), start_time + zx::msec(1)), "post 1");
    EXPECT_EQ(ZX_OK, task2.PostForTime(loop.dispatcher(), start_time + zx::msec(1)), "post 2");
    EXPECT_EQ(ZX_OK, task3.PostForTime(loop.dispatcher(), zx::time::infinite()), "post 3");
    EXPECT_EQ(ZX_OK, task4.PostForTime(loop.dispatcher(), zx::time::infinite()), "post 4");
    EXPECT_EQ(ZX_OK, task5.PostForTime(loop.dispatcher(), start_time + zx::msec(1)), "post 5");
    EXPECT_EQ(ZX_OK, task6.PostForTime(loop.dispatcher(), start_time), "post 6");
    EXPECT_EQ(ZX_OK, task7.PostForTime(loop.dispatcher(), zx::time::infinite()), "post 7");

    // Run tasks which are due up to the time when the quit task runs.
    EXPECT_EQ(ZX_ERR_CANCELED, loop.Run(), "run loop");
    EXPECT_EQ(1u, task1.run_count, "run count 1");
    EXPECT_EQ(ZX_OK, task1.last_status, "status 1");
    EXPECT_EQ(1u, task2.run_count, "run count 2");
    EXPECT_EQ(ZX_OK, task2.last_status, "status 2");
    EXPECT_EQ(0u, task3.run_count, "run count 3");
    EXPECT_EQ(0u, task4.run_count, "run count 4");
    EXPECT_EQ(1u, task5.run_count, "run count 5");
    EXPECT_EQ(ZX_OK, task5.last_status, "status 5");
    EXPECT_EQ(1u, task6.run_count, "run count 6");
    EXPECT_EQ(ZX_OK, task6.last_status, "status 6");
    EXPECT_EQ(ZX_ERR_NOT_FOUND, task6.cancel_result, "cancel result 6");
    EXPECT_EQ(0u, task7.run_count, "run count 7");

    // Cancel task 4.
    EXPECT_EQ(ZX_OK, task4.Cancel(loop.dispatcher()), "cancel 4");

    // When the loop shuts down:
    //   |task1| not notified because it was serviced
    //   |task2| notified because it requested a repeat
    //   |task3| notified because it was not yet serviced
    //   |task4| not notified because it was canceled
    //   |task5| not notified because it was serviced
    //   |task6| not notified because it was serviced
    //   |task7| notified because it was not yet serviced
    loop.Shutdown();
    EXPECT_EQ(1u, task1.run_count, "run count 1");
    EXPECT_EQ(2u, task2.run_count, "run count 2");
    EXPECT_EQ(ZX_ERR_CANCELED, task2.last_status, "status 2");
    EXPECT_EQ(1u, task3.run_count, "run count 3");
    EXPECT_EQ(ZX_ERR_CANCELED, task3.last_status, "status 3");
    EXPECT_EQ(0u, task4.run_count, "run count 4");
    EXPECT_EQ(1u, task5.run_count, "run count 5");
    EXPECT_EQ(1u, task6.run_count, "run count 6");
    EXPECT_EQ(1u, task7.run_count, "run count 7");
    EXPECT_EQ(ZX_ERR_CANCELED, task7.last_status, "status 7");
    EXPECT_EQ(ZX_ERR_NOT_FOUND, task7.cancel_result, "cancel result 7");

    // Try to add or cancel work after shutdown.
    TestTask task8;
    EXPECT_EQ(ZX_ERR_BAD_STATE, task8.PostForTime(loop.dispatcher(), zx::time::infinite()), "post after shutdown");
    EXPECT_EQ(ZX_ERR_NOT_FOUND, task8.Cancel(loop.dispatcher()), "cancel after shutdown");
    EXPECT_EQ(0u, task8.run_count, "run count 8");

    END_TEST;
}

bool receiver_test() {
    const zx_packet_user_t data1{.u64 = {11, 12, 13, 14}};
    const zx_packet_user_t data2{.u64 = {21, 22, 23, 24}};
    const zx_packet_user_t data3{.u64 = {31, 32, 33, 34}};
    const zx_packet_user_t data_default{};

    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);

    TestReceiver receiver1;
    TestReceiver receiver2;
    TestReceiver receiver3;

    EXPECT_EQ(ZX_OK, receiver1.QueuePacket(loop.dispatcher(), &data1), "queue 1");
    EXPECT_EQ(ZX_OK, receiver1.QueuePacket(loop.dispatcher(), &data3), "queue 1, again");
    EXPECT_EQ(ZX_OK, receiver2.QueuePacket(loop.dispatcher(), &data2), "queue 2");
    EXPECT_EQ(ZX_OK, receiver3.QueuePacket(loop.dispatcher(), nullptr), "queue 3");

    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
    EXPECT_EQ(2u, receiver1.run_count, "run count 1");
    EXPECT_EQ(ZX_OK, receiver1.last_status, "status 1");
    EXPECT_NONNULL(receiver1.last_data);
    EXPECT_EQ(0, memcmp(&data3, receiver1.last_data, sizeof(zx_packet_user_t)), "data 1");
    EXPECT_EQ(1u, receiver2.run_count, "run count 2");
    EXPECT_EQ(ZX_OK, receiver2.last_status, "status 2");
    EXPECT_NONNULL(receiver2.last_data);
    EXPECT_EQ(0, memcmp(&data2, receiver2.last_data, sizeof(zx_packet_user_t)), "data 2");
    EXPECT_EQ(1u, receiver3.run_count, "run count 3");
    EXPECT_EQ(ZX_OK, receiver3.last_status, "status 3");
    EXPECT_NONNULL(receiver3.last_data);
    EXPECT_EQ(0, memcmp(&data_default, receiver3.last_data, sizeof(zx_packet_user_t)), "data 3");

    END_TEST;
}

bool receiver_shutdown_test() {
    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
    loop.Shutdown();

    // Try to add work after shutdown.
    TestReceiver receiver;
    EXPECT_EQ(ZX_ERR_BAD_STATE, receiver.QueuePacket(loop.dispatcher(), nullptr), "queue after shutdown");
    EXPECT_EQ(0u, receiver.run_count, "run count 1");

    END_TEST;
}

uint32_t get_thread_state(zx_handle_t thread) {
    zx_info_thread_t info;
    __UNUSED zx_status_t status =
        zx_object_get_info(thread, ZX_INFO_THREAD,
                           &info, sizeof(info), NULL, NULL);
    ZX_DEBUG_ASSERT(status == ZX_OK);
    return info.state;
}

uint32_t get_thread_exception_port_type(zx_handle_t thread) {
    zx_info_thread_t info;
    __UNUSED zx_status_t status =
        zx_object_get_info(thread, ZX_INFO_THREAD,
                           &info, sizeof(info), NULL, NULL);
    ZX_DEBUG_ASSERT(status == ZX_OK);
    return info.wait_exception_port_type;
}

zx_koid_t get_koid(zx_handle_t handle) {
    zx_info_handle_basic_t info;
    __UNUSED zx_status_t status =
        zx_object_get_info(handle, ZX_INFO_HANDLE_BASIC,
                           &info, sizeof(info), NULL, NULL);
    ZX_DEBUG_ASSERT(status == ZX_OK);
    return info.koid;
}

zx_status_t create_thread(zx_handle_t* out_thread) {
    static const char kThreadName[] = "crasher";
    // Use zx_thread_create() so that the only cleanup we need to do is
    // zx_task_kill/zx_handle_close.
    return zx_thread_create(zx_process_self(), kThreadName,
                            strlen(kThreadName), 0, out_thread);
}

zx_status_t start_thread_to_crash(zx_handle_t thread) {
    // We want the thread to crash so we'll get an exception report.
    // Easiest to just pass crashing values for pc,sp.
    return zx_thread_start(thread, 0u, 0u, 0u, 0u);
}

bool exception_test() {
    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);

    // We need an exception that we can resume from without the exception
    // being passed on to higher level exceptions.
    // To keep things simple we bind to our process's exception port, it is
    // the next exception port to be tried after the thread's. An alternative
    // would be to bind to our debugger exception port and process thread
    // start synthetic exceptions, but then we couldn't run this test under
    // a debugger. Another alternative would be to cause architectural
    // exceptions that can be recovered from, but it requires
    // architecture-specific code which is nice to avoid if we can.

    zx_handle_t crashing_thread = ZX_HANDLE_INVALID;
    EXPECT_EQ(ZX_OK, create_thread(&crashing_thread));
    TestThreadException thread_exception(loop.dispatcher(), crashing_thread, 0);
    EXPECT_EQ(ZX_OK, thread_exception.Bind());

    zx_koid_t self_pid = get_koid(zx_process_self());
    zx_koid_t crashing_tid = get_koid(crashing_thread);

    zx_handle_t self = zx_process_self();
    TestProcessException process_exception(loop.dispatcher(), self, 0, crashing_tid);

    EXPECT_EQ(ZX_OK, process_exception.Bind());

    // Initially nothing is signaled.
    EXPECT_EQ(ZX_OK, loop.RunUntilIdle());
    EXPECT_EQ(0u, process_exception.run_count);

    EXPECT_EQ(ZX_OK, start_thread_to_crash(crashing_thread));

    // Wait until thread has crashed.
    uint32_t state;
    do {
        zx_nanosleep(zx_deadline_after(ZX_MSEC(1)));
        state = get_thread_state(crashing_thread);
    } while (state != ZX_THREAD_STATE_BLOCKED_EXCEPTION);

    // There should now be an exception to read on the thread exception port.
    EXPECT_EQ(ZX_EXCEPTION_PORT_TYPE_THREAD, get_thread_exception_port_type(crashing_thread));
    EXPECT_EQ(ZX_OK, loop.RunUntilIdle());
    EXPECT_EQ(1u, thread_exception.run_count);
    EXPECT_EQ(0u, process_exception.run_count);
    EXPECT_EQ(ZX_OK, thread_exception.last_status);
    ASSERT_NONNULL(thread_exception.last_report);
    EXPECT_EQ(ZX_EXCP_FATAL_PAGE_FAULT, thread_exception.last_report->type);
    EXPECT_EQ(self_pid, thread_exception.last_report->exception.pid);
    EXPECT_EQ(crashing_tid, thread_exception.last_report->exception.tid);

    // Resume this exception (which in this case means pass the exception
    // on to the next handler).
    EXPECT_EQ(ZX_OK, thread_exception.ResumeFromException(crashing_thread, ZX_RESUME_TRY_NEXT));

    // Wait until thread has moved on to try the next exception handler.
    do {
        zx_nanosleep(zx_deadline_after(ZX_MSEC(1)));
        state = get_thread_exception_port_type(crashing_thread);
    } while (state != ZX_EXCEPTION_PORT_TYPE_PROCESS);

    // There should now be an exception to read on the process exception port.
    EXPECT_EQ(ZX_OK, loop.RunUntilIdle());
    EXPECT_EQ(1u, thread_exception.run_count);
    EXPECT_EQ(1u, process_exception.run_count);
    EXPECT_EQ(ZX_OK, process_exception.last_status);
    ASSERT_NONNULL(process_exception.last_report);
    EXPECT_EQ(ZX_EXCP_FATAL_PAGE_FAULT, process_exception.last_report->type);
    EXPECT_EQ(self_pid, process_exception.last_report->exception.pid);
    EXPECT_EQ(crashing_tid, process_exception.last_report->exception.tid);

    // Kill the thread, we don't want the exception propagating further.
    zx_task_kill(crashing_thread);

    loop.Shutdown();
    EXPECT_EQ(ZX_ERR_CANCELED, thread_exception.last_status);
    EXPECT_EQ(ZX_ERR_CANCELED, process_exception.last_status);

    zx_handle_close(crashing_thread);

    END_TEST;
}

bool exception_shutdown_test() {
    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
    loop.Shutdown();

    // Try to bind a port after shutdown.
    TestThreadException exception(loop.dispatcher(), zx_process_self(), 0);
    EXPECT_EQ(ZX_ERR_BAD_STATE, exception.Bind());

    END_TEST;
}

class GetDefaultDispatcherTask : public QuitTask {
public:
    async_dispatcher_t* last_default_dispatcher;

protected:
    void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override {
        QuitTask::Handle(dispatcher, status);
        last_default_dispatcher = async_get_default_dispatcher();
    }
};

class ConcurrencyMeasure {
public:
    ConcurrencyMeasure(uint32_t end)
        : end_(end) {}

    uint32_t max_threads() const { return fbl::atomic_load(&max_threads_, fbl::memory_order_acquire); }
    uint32_t count() const { return fbl::atomic_load(&count_, fbl::memory_order_acquire); }

    void Tally(async_dispatcher_t* dispatcher) {
        // Increment count of concurrently active threads.  Update maximum if needed.
        uint32_t active = 1u + fbl::atomic_fetch_add(&active_threads_, 1u,
                                                     fbl::memory_order_acq_rel);
        uint32_t old_max;
        do {
            old_max = fbl::atomic_load(&max_threads_, fbl::memory_order_acquire);
        } while (active > old_max &&
                 !fbl::atomic_compare_exchange_weak(&max_threads_, &old_max, active,
                                                    fbl::memory_order_acq_rel,
                                                    fbl::memory_order_acquire));

        // Pretend to do work.
        zx::nanosleep(zx::deadline_after(zx::msec(1)));

        // Decrement count of active threads.
        fbl::atomic_fetch_sub(&active_threads_, 1u, fbl::memory_order_acq_rel);

        // Quit when last item processed.
        if (1u + fbl::atomic_fetch_add(&count_, 1u, fbl::memory_order_acq_rel) == end_)
            async_loop_quit(async_loop_from_dispatcher(dispatcher));
    }

private:
    const uint32_t end_;
    fbl::atomic_uint32_t count_{};
    fbl::atomic_uint32_t active_threads_{};
    fbl::atomic_uint32_t max_threads_{};
};

class ThreadAssertWait : public TestWait {
public:
    ThreadAssertWait(zx_handle_t object, zx_signals_t trigger, ConcurrencyMeasure* measure)
        : TestWait(object, trigger), measure_(measure) {}

protected:
    ConcurrencyMeasure* measure_;

    void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
                const zx_packet_signal_t* signal) override {
        TestWait::Handle(dispatcher, status, signal);
        measure_->Tally(dispatcher);
    }
};

class ThreadAssertTask : public TestTask {
public:
    ThreadAssertTask(ConcurrencyMeasure* measure)
        : measure_(measure) {}

protected:
    ConcurrencyMeasure* measure_;

    void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override {
        TestTask::Handle(dispatcher, status);
        measure_->Tally(dispatcher);
    }
};

class ThreadAssertReceiver : public TestReceiver {
public:
    ThreadAssertReceiver(ConcurrencyMeasure* measure)
        : measure_(measure) {}

protected:
    ConcurrencyMeasure* measure_;

    // This receiver's handler will run concurrently on multiple threads
    // (unlike the Waits and Tasks) so we must guard its state.
    fbl::Mutex mutex_;

    void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
                const zx_packet_user_t* data) override {
        {
            fbl::AutoLock lock(&mutex_);
            TestReceiver::Handle(dispatcher, status, data);
        }
        measure_->Tally(dispatcher);
    }
};

class ThreadAssertException : public TestException {
public:
    ThreadAssertException(async_dispatcher_t* dispatcher, zx_handle_t task, uint32_t options,
                          ConcurrencyMeasure* measure)
        : TestException(dispatcher, task, options), measure_(measure) {}

protected:
    ConcurrencyMeasure* measure_;

    // This receiver's handler will run concurrently on multiple threads
    // (unlike the Waits and Tasks) so we must guard its state.
    fbl::Mutex mutex_;

    void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
                const zx_port_packet_t* report) override {
        {
            fbl::AutoLock lock(&mutex_);
            UpdateState(status, report);
        }
        measure_->Tally(dispatcher);
    }
};

bool threads_have_default_dispatcher() {
    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
    EXPECT_EQ(ZX_OK, loop.StartThread(), "start thread");

    GetDefaultDispatcherTask task;
    EXPECT_EQ(ZX_OK, task.Post(loop.dispatcher()), "post task");
    loop.JoinThreads();

    EXPECT_EQ(1u, task.run_count, "run count");
    EXPECT_EQ(ZX_OK, task.last_status, "status");
    EXPECT_EQ(loop.dispatcher(), task.last_default_dispatcher, "default dispatcher");

    END_TEST;
}

// The goal here is to ensure that threads stop when Quit() is called.
bool threads_quit() {
    const size_t num_threads = 4;

    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
    for (size_t i = 0; i < num_threads; i++) {
        EXPECT_EQ(ZX_OK, loop.StartThread());
    }
    loop.Quit();
    loop.JoinThreads();
    EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState());

    END_TEST;
}

// The goal here is to ensure that threads stop when Shutdown() is called.
bool threads_shutdown() {
    const size_t num_threads = 4;

    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
    for (size_t i = 0; i < num_threads; i++) {
        EXPECT_EQ(ZX_OK, loop.StartThread());
    }
    loop.Shutdown();
    EXPECT_EQ(ASYNC_LOOP_SHUTDOWN, loop.GetState());

    loop.JoinThreads(); // should be a no-op

    EXPECT_EQ(ZX_ERR_BAD_STATE, loop.StartThread(), "can't start threads after shutdown");

    END_TEST;
}

// The goal here is to schedule a lot of work and see whether it runs
// on as many threads as we expected it to.
bool threads_waits_run_concurrently_test() {
    const size_t num_threads = 4;
    const size_t num_items = 100;

    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
    for (size_t i = 0; i < num_threads; i++) {
        EXPECT_EQ(ZX_OK, loop.StartThread(), "start thread");
    }

    ConcurrencyMeasure measure(num_items);
    zx::event event;
    EXPECT_EQ(ZX_OK, zx::event::create(0u, &event), "create event");
    EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_0), "signal");

    // Post a number of work items to run all at once.
    ThreadAssertWait* items[num_items];
    for (size_t i = 0; i < num_items; i++) {
        items[i] = new ThreadAssertWait(event.get(), ZX_USER_SIGNAL_0, &measure);
        EXPECT_EQ(ZX_OK, items[i]->Begin(loop.dispatcher()), "begin wait");
    }

    // Wait until quitted.
    loop.JoinThreads();

    // Ensure all work items completed.
    EXPECT_EQ(num_items, measure.count(), "item count");
    for (size_t i = 0; i < num_items; i++) {
        EXPECT_EQ(1u, items[i]->run_count, "run count");
        EXPECT_EQ(ZX_OK, items[i]->last_status, "status");
        EXPECT_NONNULL(items[i]->last_signal, "signal");
        EXPECT_EQ(ZX_USER_SIGNAL_0, items[i]->last_signal->observed & ZX_USER_SIGNAL_ALL, "observed");
        delete items[i];
    }

    // Ensure that we actually ran many waits concurrently on different threads.
    EXPECT_NE(1u, measure.max_threads(), "waits handled concurrently");

    END_TEST;
}

// The goal here is to schedule a lot of work and see whether it runs
// on as many threads as we expected it to.
bool threads_tasks_run_sequentially_test() {
    const size_t num_threads = 4;
    const size_t num_items = 100;

    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
    for (size_t i = 0; i < num_threads; i++) {
        EXPECT_EQ(ZX_OK, loop.StartThread(), "start thread");
    }

    ConcurrencyMeasure measure(num_items);

    // Post a number of work items to run all at once.
    ThreadAssertTask* items[num_items];
    zx::time start_time = async::Now(loop.dispatcher());
    for (size_t i = 0; i < num_items; i++) {
        items[i] = new ThreadAssertTask(&measure);
        EXPECT_EQ(ZX_OK, items[i]->PostForTime(loop.dispatcher(), start_time + zx::msec(i)), "post task");
    }

    // Wait until quitted.
    loop.JoinThreads();

    // Ensure all work items completed.
    EXPECT_EQ(num_items, measure.count(), "item count");
    for (size_t i = 0; i < num_items; i++) {
        EXPECT_EQ(1u, items[i]->run_count, "run count");
        EXPECT_EQ(ZX_OK, items[i]->last_status, "status");
        delete items[i];
    }

    // Ensure that we actually ran tasks sequentially despite having many
    // threads available.
    EXPECT_EQ(1u, measure.max_threads(), "tasks handled sequentially");

    END_TEST;
}

// The goal here is to schedule a lot of work and see whether it runs
// on as many threads as we expected it to.
bool threads_receivers_run_concurrently_test() {
    const size_t num_threads = 4;
    const size_t num_items = 100;

    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
    for (size_t i = 0; i < num_threads; i++) {
        EXPECT_EQ(ZX_OK, loop.StartThread(), "start thread");
    }

    ConcurrencyMeasure measure(num_items);

    // Post a number of packets all at once.
    ThreadAssertReceiver receiver(&measure);
    for (size_t i = 0; i < num_items; i++) {
        EXPECT_EQ(ZX_OK, receiver.QueuePacket(loop.dispatcher(), nullptr), "queue packet");
    }

    // Wait until quitted.
    loop.JoinThreads();

    // Ensure all work items completed.
    EXPECT_EQ(num_items, measure.count(), "item count");
    EXPECT_EQ(num_items, receiver.run_count, "run count");
    EXPECT_EQ(ZX_OK, receiver.last_status, "status");

    // Ensure that we actually processed many packets concurrently on different threads.
    EXPECT_NE(1u, measure.max_threads(), "packets handled concurrently");

    END_TEST;
}

// The goal here is to schedule a lot of work and see whether it runs
// on as many threads as we expected it to.
bool threads_exceptions_run_concurrently_test() {
    const size_t num_threads = 4;
    // We generate this number of exceptions, and therefore this number of
    // crashing threads, so this number isn't that large (e.g., not 100).
    const size_t num_items = 10;

    BEGIN_TEST;

    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
    ConcurrencyMeasure measure(num_items);

    ThreadAssertException receiver(loop.dispatcher(), zx_process_self(), 0,
                                   &measure);

    zx_handle_t crashing_threads[num_items];
    EXPECT_EQ(ZX_OK, receiver.Bind());

    for (size_t i = 0; i < num_threads; i++) {
        EXPECT_EQ(ZX_OK, loop.StartThread());
    }

    // Post a number of packets all at once.
    for (size_t i = 0; i < num_items; i++) {
        EXPECT_EQ(ZX_OK, create_thread(&crashing_threads[i]));
        EXPECT_EQ(ZX_OK, start_thread_to_crash(crashing_threads[i]));
    }
    // We don't need to wait for the threads to crash here as the loop
    // will continue until |measure| receives |num_items|.

    // Wait until quitted.
    loop.JoinThreads();

    // Make sure the threads are gone before we unbind the exception port,
    // otherwise the global crash-handler will see the exceptions.
    for (size_t i = 0; i < num_items; i++) {
        zx_task_kill(crashing_threads[i]);
        zx_handle_close(crashing_threads[i]);
    }

    // Ensure all work items completed.
    // When |loop| goes out of scope |receiver| will get ZX_ERR_CANCELED,
    // which will add one to the packet received count. Do these tests
    // here before |loop| goes out of scope.
    EXPECT_EQ(num_items, measure.count());
    EXPECT_EQ(num_items, receiver.run_count);
    EXPECT_EQ(ZX_OK, receiver.last_status);

    // Now we can shutdown.
    loop.Shutdown();

    // Loop shutdown -> ZX_ERR_CANCELED.
    EXPECT_EQ(ZX_ERR_CANCELED, receiver.last_status);

    // Ensure that we actually processed many packets concurrently on different threads.
    EXPECT_NE(1u, measure.max_threads());

    END_TEST;
}

} // namespace

BEGIN_TEST_CASE(loop_tests)
RUN_TEST(c_api_basic_test)
RUN_TEST(make_default_false_test)
RUN_TEST(make_default_true_test)
RUN_TEST(quit_test)
RUN_TEST(time_test)
RUN_TEST(wait_test)
RUN_TEST(wait_unwaitable_handle_test)
RUN_TEST(wait_shutdown_test)
RUN_TEST(task_test)
RUN_TEST(task_shutdown_test)
RUN_TEST(receiver_test)
RUN_TEST(receiver_shutdown_test)
RUN_TEST(exception_test)
RUN_TEST(exception_shutdown_test)
RUN_TEST(threads_have_default_dispatcher)
for (int i = 0; i < 3; i++) {
    RUN_TEST(threads_quit)
    RUN_TEST(threads_shutdown)
    RUN_TEST(threads_waits_run_concurrently_test)
    RUN_TEST(threads_tasks_run_sequentially_test)
    RUN_TEST(threads_receivers_run_concurrently_test)
    RUN_TEST(threads_exceptions_run_concurrently_test)
}
END_TEST_CASE(loop_tests)
