// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use fuchsia_async as fasync;
use fuchsia_zircon_status as zx_status;
use fuchsia_zircon_types::{
    zx_handle_t, zx_packet_signal_t, zx_signals_t, zx_status_t, zx_time_t, ZX_ERR_NOT_SUPPORTED,
    ZX_OK,
};
use futures::channel::oneshot;
use parking_lot::Mutex;
use std::time::Duration;

/// Thin wrapper around a raw `Executor` pointer received over the C ABI.
///
/// The wrapper exists so the pointer can be declared `Send`/`Sync`; it is moved into the future
/// spawned by `fasync_executor_post_task`.
struct EPtr(*mut Executor);
// NOTE(review): these impls assert thread-safety the compiler cannot check; they presumably rely
// on the C caller keeping the executor alive and using it from the executor's thread — confirm.
unsafe impl Send for EPtr {}
unsafe impl Sync for EPtr {}

impl EPtr {
    /// Reborrows the wrapped pointer as a shared reference.
    ///
    /// Panics if the pointer is null. The caller must guarantee that a non-null pointer refers to
    /// a live `Executor`.
    unsafe fn as_ref(&self) -> &Executor {
        let shared: Option<&Executor> = self.0.as_ref();
        shared.unwrap()
    }

    /// Reborrows the wrapped pointer as an exclusive reference.
    ///
    /// Panics if the pointer is null. The caller must guarantee that a non-null pointer refers to
    /// a live `Executor`.
    unsafe fn as_mut(&mut self) -> &mut Executor {
        let exclusive: Option<&mut Executor> = self.0.as_mut();
        exclusive.unwrap()
    }
}

#[cfg(target_os = "fuchsia")]
mod fuchsia_details {

    use super::*;
    use fuchsia_zircon as zx;

    // Lifecycle of the receiver registration owned by a `WaitEnder`:
    //
    // Unregistered -+-> Registered -+-> Finished
    //               |               |
    //               +---------------+
    //
    // Every wait starts out Unregistered and ends up Finished. It passes through Registered only
    // when `record_registration` runs before `receive_packet`; if the packet callback wins that
    // race, the Registered state is skipped entirely.
    enum Registration {
        // The registration has not been handed to the WaitEnder yet.
        Unregistered,
        // The registration is stored; a receive_packet call is expected in the future.
        Registered(fasync::ReceiverRegistration<WaitEnder>),
        // The receive_packet callback has already been delivered.
        Finished,
    }

    // Packet receiver for a single `async_wait_t`. It is registered with the executor and gets a
    // packet once the trigger signals are observed. It stores its own registration because
    // dropping the registration would deregister the receiver.
    pub(crate) struct WaitEnder {
        // The C wait structure whose handler is invoked when the packet arrives.
        wait: *mut async_wait_t,
        // Opaque pointer passed through as the handler's first argument.
        dispatcher: *mut std::ffi::c_void,
        // Current position in the registration state machine described above.
        registration: Mutex<Registration>,
    }

    unsafe impl Send for WaitEnder {}
    unsafe impl Sync for WaitEnder {}

    impl fasync::PacketReceiver for WaitEnder {
        // Marks the wait as finished and forwards the signal packet to the C handler.
        // Panics if the packet is not a `SignalOne` packet.
        fn receive_packet(&self, packet: zx::Packet) {
            {
                // Finish the state machine first. The lock is only needed for as long as it takes
                // to store the new state (which also releases any stored registration).
                let mut state = self.registration.lock();
                *state = Registration::Finished;
            }
            match packet.contents() {
                zx::PacketContents::SignalOne(sig) => unsafe {
                    let handler = (*self.wait).handler;
                    handler(self.dispatcher, self.wait, packet.status(), sig.raw_packet())
                },
                _ => panic!("Expected a signal packet"),
            }
        }
    }

    impl WaitEnder {
        // Builds a receiver for `wait` in the Unregistered state; `dispatcher` is forwarded
        // verbatim to the wait's handler.
        pub(crate) fn new(dispatcher: *mut std::ffi::c_void, wait: *mut async_wait_t) -> WaitEnder {
            let registration = Mutex::new(Registration::Unregistered);
            WaitEnder { wait, dispatcher, registration }
        }

        // Stores `registration` so that it lives until the packet is received. If the packet has
        // already been received, the registration is released immediately instead.
        pub(crate) fn record_registration(
            &self,
            registration: fasync::ReceiverRegistration<WaitEnder>,
        ) {
            let mut state = self.registration.lock();
            if matches!(*state, Registration::Finished) {
                // The callback beat us here: stay Finished and let go of the registration while
                // the lock is still held.
                drop(registration);
                return;
            }
            *state = Registration::Registered(registration);
        }
    }
}

/// Executor state shared with C callers through the `fasync_executor_*` functions.
pub struct Executor {
    /// The underlying local executor; locked and driven by `run_singlethreaded`.
    executor: Mutex<fasync::LocalExecutor>,
    /// Sender installed by `run_singlethreaded` and consumed by `quit` to stop the run loop.
    quit_tx: Mutex<Option<oneshot::Sender<()>>>,
    /// Time captured at construction; `now()` and posted task deadlines are measured from it.
    start: fasync::Time,
    /// Opaque pointer supplied to `fasync_executor_create`; passed as the first argument to task
    /// handlers.
    cb_executor: *mut std::ffi::c_void,
}

impl Executor {
    /// Allocates a new executor on the heap.
    ///
    /// `cb_executor` is stored untouched and later handed to task handlers as their first
    /// argument. Panics if the underlying `LocalExecutor` cannot be created.
    fn new(cb_executor: *mut std::ffi::c_void) -> Box<Executor> {
        Box::new(Executor {
            executor: Mutex::new(fasync::LocalExecutor::new().unwrap()),
            quit_tx: Mutex::new(None),
            start: fasync::Time::now(),
            cb_executor,
        })
    }

    /// Runs the executor on the calling thread until `quit` is called.
    ///
    /// Installs a fresh quit channel (replacing any previous sender) and blocks on its receiving
    /// end. Panics if the sender is dropped without a message being sent.
    fn run_singlethreaded(&self) {
        let (tx, rx) = oneshot::channel();
        *self.quit_tx.lock() = Some(tx);
        self.executor.lock().run_singlethreaded(async move { rx.await }).unwrap();
    }

    /// Signals the current `run_singlethreaded` call to return.
    ///
    /// Panics if no quit sender is installed (no run started, or `quit` already called for it) or
    /// if the receiving end has been dropped.
    fn quit(&self) {
        self.quit_tx.lock().take().unwrap().send(()).unwrap();
    }

    /// Starts an asynchronous wait described by `wait` on the local executor's port.
    ///
    /// Returns `INVALID_ARGS` if `trigger` or `options` contain unknown bits, or the error
    /// reported by the asynchronous wait call itself.
    ///
    /// # Safety
    ///
    /// `wait` must point to a valid `async_wait_t` that stays valid until its handler has run.
    #[cfg(target_os = "fuchsia")]
    unsafe fn begin_wait(&mut self, wait: *mut async_wait_t) -> Result<(), zx_status::Status> {
        // TODO: figure out how to change fasync::Executor such that this can be done without allocation.

        use fuchsia_details::*;
        use fuchsia_zircon as zx;
        use std::sync::Arc;
        use zx::AsHandleRef;

        // Validate the arguments first so that a bad request never registers a receiver.
        let signals =
            zx::Signals::from_bits((*wait).trigger).ok_or(zx_status::Status::INVALID_ARGS)?;
        let options =
            zx::WaitAsyncOpts::from_bits((*wait).options).ok_or(zx_status::Status::INVALID_ARGS)?;

        let handle = zx::HandleRef::from_raw_handle((*wait).object);
        // NOTE(review): wait handlers receive this Rust `Executor` pointer as their dispatcher
        // argument, whereas task handlers in `fasync_executor_post_task` receive `cb_executor` —
        // confirm which one C callers expect.
        let dispatcher: *mut Executor = &mut *self;
        let wait_ender = Arc::new(WaitEnder::new(dispatcher as *mut std::ffi::c_void, wait));
        let registration = fasync::EHandle::local().register_receiver(wait_ender.clone());

        // If the wait could not be started, no packet will ever arrive to finish the
        // registration state machine. Bail out here so that `registration` is dropped (which
        // deregisters the receiver) instead of being parked inside the `WaitEnder` forever.
        handle.wait_async_handle(registration.port(), registration.key(), signals, options)?;

        // The wait is armed: hand the registration to the receiver so it stays registered until
        // the packet is delivered.
        wait_ender.record_registration(registration);

        Ok(())
    }

    /// Asynchronous waits are only available on Fuchsia; always returns `NOT_SUPPORTED`.
    #[cfg(not(target_os = "fuchsia"))]
    unsafe fn begin_wait(&self, _wait: *mut async_wait_t) -> Result<(), zx_status::Status> {
        Err(zx_status::Status::NOT_SUPPORTED)
    }

    /// Returns the number of nanoseconds elapsed since this executor was created.
    fn now(&self) -> zx_time_t {
        let dur = fasync::Time::now() - self.start;
        #[cfg(target_os = "fuchsia")]
        let r = dur.into_nanos();
        #[cfg(not(target_os = "fuchsia"))]
        let r = dur.as_nanos() as zx_time_t;
        r
    }
}

/// Duplicated from //zircon/system/ulib/async/include/lib/async/dispatcher.h
///
/// Opaque two-word header embedded at the start of `async_wait_t` and `async_task_t`.
#[repr(C)]
struct async_state_t {
    // Never read or written in this file; presumably dispatcher-private storage — confirm
    // against the C header.
    _reserved: [usize; 2],
}

/// Duplicated from //zircon/system/ulib/async/include/lib/async/wait.h
///
/// Describes one asynchronous wait requested through `fasync_executor_begin_wait`.
#[repr(C)]
pub(crate) struct async_wait_t {
    // Opaque header; not accessed by this file.
    _state: async_state_t,
    // Callback invoked when the signal packet arrives, with the dispatcher pointer, this wait,
    // the packet status and the raw signal packet.
    handler: extern "C" fn(
        *mut std::ffi::c_void,
        *mut async_wait_t,
        zx_status_t,
        *const zx_packet_signal_t,
    ),
    // Raw handle of the object being waited on.
    object: zx_handle_t,
    // Signal bits to wait for; rejected with INVALID_ARGS if they contain unknown bits.
    trigger: zx_signals_t,
    // Wait-async option bits; rejected with INVALID_ARGS if they contain unknown bits.
    options: u32,
}

/// Duplicated from //zircon/system/ulib/async/include/lib/async/task.h
///
/// Describes one task posted through `fasync_executor_post_task`.
#[repr(C)]
struct async_task_t {
    // Opaque header; not accessed by this file.
    _state: async_state_t,
    // Callback invoked once the deadline has passed, with the callback executor pointer, this
    // task and a status.
    handler: extern "C" fn(*mut std::ffi::c_void, *mut async_task_t, zx_status_t),
    // Time at which the task becomes due, in nanoseconds measured from the executor's start time.
    deadline: zx_time_t,
}

/// Thin wrapper around a raw `async_task_t` pointer received over the C ABI.
///
/// Declared `Send`/`Sync` so that it can be moved into the future spawned by
/// `fasync_executor_post_task`.
struct TaskPtr(*mut async_task_t);
// NOTE(review): these impls assert thread-safety the compiler cannot check; they presumably rely
// on the C caller keeping the task alive until its handler has run — confirm.
unsafe impl Send for TaskPtr {}
unsafe impl Sync for TaskPtr {}
impl TaskPtr {
    /// Reborrows the wrapped pointer as a shared reference.
    ///
    /// Panics if the pointer is null. The caller must guarantee that a non-null pointer refers to
    /// a live `async_task_t`.
    unsafe fn as_ref(&self) -> &async_task_t {
        let task: Option<&async_task_t> = self.0.as_ref();
        task.unwrap()
    }
}

/// Creates an executor and returns an owning raw pointer to it.
///
/// `cb_executor` is stored as-is and later passed to task handlers. The returned pointer must be
/// released with `fasync_executor_destroy`.
#[no_mangle]
pub extern "C" fn fasync_executor_create(cb_executor: *mut std::ffi::c_void) -> *mut Executor {
    let boxed: Box<Executor> = Executor::new(cb_executor);
    Box::into_raw(boxed)
}

/// Runs `executor` on the calling thread until `fasync_executor_quit` is called.
///
/// Panics if `executor` is null. A non-null `executor` must point to a live executor obtained
/// from `fasync_executor_create`.
#[allow(clippy::missing_safety_doc)] // TODO(fxbug.dev/99059)
#[no_mangle]
pub unsafe extern "C" fn fasync_executor_run_singlethreaded(executor: *mut Executor) {
    let handle = EPtr(executor);
    handle.as_ref().run_singlethreaded();
}

/// Asks the run loop of `executor` to return.
///
/// Panics if `executor` is null or if there is no pending quit sender to consume. A non-null
/// `executor` must point to a live executor obtained from `fasync_executor_create`.
#[allow(clippy::missing_safety_doc)] // TODO(fxbug.dev/99059)
#[no_mangle]
pub unsafe extern "C" fn fasync_executor_quit(executor: *mut Executor) {
    let handle = EPtr(executor);
    handle.as_ref().quit();
}

/// Destroys an executor previously returned by `fasync_executor_create`.
///
/// Passing a null pointer is a no-op. A non-null `executor` must have come from
/// `fasync_executor_create` and must not be used again after this call.
#[allow(clippy::missing_safety_doc)] // TODO(fxbug.dev/99059)
#[no_mangle]
pub unsafe extern "C" fn fasync_executor_destroy(executor: *mut Executor) {
    // `Box::from_raw` on a null pointer is undefined behaviour, so treat null like `free(NULL)`.
    if executor.is_null() {
        return;
    }
    drop(Box::from_raw(executor))
}

/// Returns the nanoseconds elapsed since `executor` was created.
///
/// Panics if `executor` is null. A non-null `executor` must point to a live executor obtained
/// from `fasync_executor_create`.
#[no_mangle]
unsafe extern "C" fn fasync_executor_now(executor: *mut Executor) -> zx_time_t {
    let handle = EPtr(executor);
    handle.as_ref().now()
}

/// Starts the asynchronous wait described by `wait` and returns the resulting raw status.
///
/// Panics if `executor` is null. A non-null `executor` must point to a live executor obtained
/// from `fasync_executor_create`, and `wait` must point to a valid `async_wait_t`.
#[no_mangle]
unsafe extern "C" fn fasync_executor_begin_wait(
    executor: *mut Executor,
    wait: *mut async_wait_t,
) -> zx_status_t {
    let mut handle = EPtr(executor);
    let outcome = handle.as_mut().begin_wait(wait);
    zx_status::Status::from_result(outcome).into_raw()
}

/// Cancelling a wait is not implemented: both arguments are ignored and
/// `ZX_ERR_NOT_SUPPORTED` is always returned.
#[no_mangle]
unsafe extern "C" fn fasync_executor_cancel_wait(
    _executor: *mut Executor,
    _wait: *mut async_wait_t,
) -> zx_status_t {
    ZX_ERR_NOT_SUPPORTED
}

/// Schedules `task` to have its handler invoked once its deadline has passed.
///
/// The deadline is interpreted as nanoseconds measured from the executor's start time (the same
/// clock `fasync_executor_now` reports). A spawned, detached future waits for the deadline and
/// then calls the handler with the callback executor pointer, the task and `ZX_OK`. Always
/// returns `ZX_OK`.
///
/// `executor` and `task` must be non-null (the spawned future panics otherwise) and must stay
/// valid until the handler has run.
#[no_mangle]
unsafe extern "C" fn fasync_executor_post_task(
    executor: *mut Executor,
    task: *mut async_task_t,
) -> zx_status_t {
    let executor = EPtr(executor);
    let task = TaskPtr(task);
    fasync::Task::spawn(async move {
        // A negative deadline lies before the executor's start time and is therefore already
        // due. Clamp it to zero: casting it straight to `u64` would wrap around into a deadline
        // centuries in the future and the task would never run.
        let deadline = Duration::from_nanos(task.as_ref().deadline.max(0) as u64);
        let start = executor.as_ref().start;
        fasync::Timer::new(start + deadline.into()).await;
        (task.as_ref().handler)(executor.as_ref().cb_executor, task.0, ZX_OK)
    })
    .detach();
    ZX_OK
}

/// Cancelling a posted task is not implemented: both arguments are ignored and
/// `ZX_ERR_NOT_SUPPORTED` is always returned.
#[no_mangle]
unsafe extern "C" fn fasync_executor_cancel_task(
    _executor: *mut Executor,
    _task: *mut async_task_t,
) -> zx_status_t {
    ZX_ERR_NOT_SUPPORTED
}
