// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <threads.h>

#include <magenta/syscalls.h>
#include <magenta/syscalls/port.h>
#include <mxio/dispatcher.h>
#include <magenta/listnode.h>

// Eventually we want to use the repeating version of mx_object_wait_async,
// but it is not ready for prime time yet.  This feature flag enables testing.
#define USE_WAIT_ONCE 1

#define VERBOSE_DEBUG 0

#if VERBOSE_DEBUG
#define xprintf(fmt...) printf(fmt)
#else
#define xprintf(...) do {} while (0)
#endif

typedef struct {
    list_node_t node;
    mx_handle_t h;
    uint32_t flags;
    mxio_dispatcher_cb_t cb;
    void* func;
    void* cookie;
} handler_t;

#if !USE_WAIT_ONCE
#define FLAG_DISCONNECTED 1
#endif

struct mxio_dispatcher {
    mtx_t lock;
    list_node_t list;
    mx_handle_t port;
    mxio_dispatcher_cb_t default_cb;
    thrd_t t;
};

static void mxio_dispatcher_destroy(mxio_dispatcher_t* md) {
    mx_handle_close(md->port);
    free(md);
}

static void destroy_handler(mxio_dispatcher_t* md, handler_t* handler, bool need_close_cb) {
    if (need_close_cb) {
        handler->cb(0, handler->func, handler->cookie);
    }

    mx_handle_close(handler->h);
    handler->h = MX_HANDLE_INVALID;

    mtx_lock(&md->lock);
    list_delete(&handler->node);
    mtx_unlock(&md->lock);
    free(handler);
}

// synthetic signal bit for synthetic packet
// used during teardown.
#define SIGNAL_NEEDS_CLOSE_CB 1u

static void disconnect_handler(mxio_dispatcher_t* md, handler_t* handler, bool need_close_cb) {
    xprintf("dispatcher: disconnect: %p / %x\n", handler, handler->h);

#if USE_WAIT_ONCE
    destroy_handler(md, handler, need_close_cb);
#else
    // Cancel the async wait operations.
    mx_status_t r = mx_port_cancel(md->port, handler->h, (uint64_t)(uintptr_t)handler);
    if (r) {
        printf("dispatcher: CANCEL FAILED %d\n", r);
    }

    // send a synthetic message so we know when it's safe to destroy
    // TODO: once cancel guarantees no packets will arrive after,
    // we can just destroy the object here instead of doing this...
    mx_port_packet_t packet;
    packet.key = (uint64_t)(uintptr_t)handler;
    packet.signal.observed = need_close_cb ? SIGNAL_NEEDS_CLOSE_CB : 0;
    r = mx_port_queue(md->port, &packet, 0);
    if (r) {
        printf("dispatcher: PORT QUEUE FAILED %d\n", r);
    }

    // flag so we know to ignore further events
    handler->flags |= FLAG_DISCONNECTED;
#endif
}

static int mxio_dispatcher_thread(void* _md) {
    mxio_dispatcher_t* md = _md;
    mx_status_t r;
    xprintf("dispatcher: start %p\n", md);

    for (;;) {
        mx_port_packet_t packet;
        if ((r = mx_port_wait(md->port, MX_TIME_INFINITE, &packet, 0)) < 0) {
            printf("dispatcher: port wait failed %d\n", r);
            break;
        }
        handler_t* handler = (void*)(uintptr_t)packet.key;
#if !USE_WAIT_ONCE
        if (handler->flags & FLAG_DISCONNECTED) {
            // handler is awaiting gc
            // ignore events for it until we get the synthetic "destroy" event
            if (packet.type == MX_PKT_TYPE_USER) {
                destroy_handler(md, handler, packet.signal.observed & SIGNAL_NEEDS_CLOSE_CB);
                printf("dispatcher: destroy %p\n", handler);
            } else {
                printf("dispatcher: spurious packet for %p\n", handler);
            }
            continue;
        }
#endif
        if (packet.signal.observed & MX_CHANNEL_READABLE) {
            if ((r = handler->cb(handler->h, handler->func, handler->cookie)) != 0) {
                if (r == ERR_DISPATCHER_NO_WORK) {
                    printf("mxio: dispatcher found no work to do!\n");
                } else {
                    disconnect_handler(md, handler, r != ERR_DISPATCHER_DONE);
                    continue;
                }
            }
#if USE_WAIT_ONCE
            if ((r = mx_object_wait_async(handler->h, md->port, (uint64_t)(uintptr_t)handler,
                                          MX_CHANNEL_READABLE | MX_CHANNEL_PEER_CLOSED,
                                          MX_WAIT_ASYNC_ONCE)) < 0) {
                printf("dispatcher: could not re-arm: %p\n", handler);
            }
#endif
            continue;
        }
        if (packet.signal.observed & MX_CHANNEL_PEER_CLOSED) {
            // synthesize a close
            disconnect_handler(md, handler, true);
        }
    }

    xprintf("dispatcher: FATAL ERROR, EXITING\n");
    mxio_dispatcher_destroy(md);
    return MX_OK;
}

mx_status_t mxio_dispatcher_create(mxio_dispatcher_t** out, mxio_dispatcher_cb_t cb) {
    mxio_dispatcher_t* md;
    if ((md = calloc(1, sizeof(*md))) == NULL) {
        return MX_ERR_NO_MEMORY;
    }
    xprintf("mxio_dispatcher_create: %p\n", md);
    list_initialize(&md->list);
    mtx_init(&md->lock, mtx_plain);
    mx_status_t status;
    if ((status = mx_port_create(0, &md->port)) < 0) {
        free(md);
        return status;
    }
    md->default_cb = cb;
    *out = md;
    return MX_OK;
}

mx_status_t mxio_dispatcher_start(mxio_dispatcher_t* md, const char* name) {
    mx_status_t r;
    mtx_lock(&md->lock);
    if (md->t == NULL) {
        if (thrd_create_with_name(&md->t, mxio_dispatcher_thread, md, name) != thrd_success) {
            mxio_dispatcher_destroy(md);
            r = MX_ERR_NO_RESOURCES;
        } else {
            thrd_detach(md->t);
            r = MX_OK;
        }
    } else {
        r = MX_ERR_BAD_STATE;
    }
    mtx_unlock(&md->lock);
    return r;
}

void mxio_dispatcher_run(mxio_dispatcher_t* md) {
    mxio_dispatcher_thread(md);
}

mx_status_t mxio_dispatcher_add(mxio_dispatcher_t* md, mx_handle_t h, void* func, void* cookie) {
    return mxio_dispatcher_add_etc(md, h, md->default_cb, func, cookie);
}

mx_status_t mxio_dispatcher_add_etc(mxio_dispatcher_t* md, mx_handle_t h,
                                    mxio_dispatcher_cb_t cb,
                                    void* func, void* cookie) {
    handler_t* handler;
    mx_status_t r;

    if ((handler = malloc(sizeof(handler_t))) == NULL) {
        return MX_ERR_NO_MEMORY;
    }
    handler->h = h;
    handler->flags = 0;
    handler->cb = cb;
    handler->func = func;
    handler->cookie = cookie;

    mtx_lock(&md->lock);
    list_add_tail(&md->list, &handler->node);
#if USE_WAIT_ONCE
    if ((r = mx_object_wait_async(h, md->port, (uint64_t)(uintptr_t)handler,
                                  MX_CHANNEL_READABLE | MX_CHANNEL_PEER_CLOSED,
                                  MX_WAIT_ASYNC_ONCE)) < 0) {
        list_delete(&handler->node);
    }
#else
    if ((r = mx_object_wait_async(h, md->port, (uint64_t)(uintptr_t)handler,
                                  MX_CHANNEL_READABLE | MX_CHANNEL_PEER_CLOSED,
                                  MX_WAIT_ASYNC_REPEATING)) < 0) {
        list_delete(&handler->node);
    }
#endif
    mtx_unlock(&md->lock);

    if (r < 0) {
        printf("dispatcher: failed to bind: %d\n", r);
        free(handler);
    } else {
        xprintf("dispatcher: added %p / %x\n", handler, h);
    }
    return r;
}
