// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use futures::future::FutureObj;
use futures::FutureExt;
use std::cell::UnsafeCell;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};
use std::task::{Context, Poll};

/// A lock-free thread-safe future.
pub struct AtomicFuture {
    // ACTIVE, INACTIVE, NOTIFIED, or DONE
    state: AtomicUsize,
    // `future` is safe to access only after a successful
    // compare-and-swap from INACTIVE to ACTIVE, and before
    // a transition from ACTIVE or NOTIFIED to INACTIVE or DONE.
    // INVARIANT: this value must be `Some(...)` so long as
    // `state` != `DONE`.
    future: UnsafeCell<Option<FutureObj<'static, ()>>>,
}

/// `AtomicFuture` is safe to access from multiple threads at once.
/// Its only method is `try_poll`, which itself is thread-safe.
/// (See comments on method implementation for details)
unsafe impl Sync for AtomicFuture {}
trait AssertSend: Send {}
impl AssertSend for AtomicFuture {}

/// No thread is currently polling the future.
const INACTIVE: usize = 0;

/// A thread is currently polling the future.
const ACTIVE: usize = 1;

/// A thread is currently polling the future, and will poll again once it completes.
const NOTIFIED: usize = 2;

/// The future has been completed and should not be polled again.
const DONE: usize = 3;

// Valid transitions are:
//
// INACTIVE => ACTIVE: a thread took control of the future
// ACTIVE => INACTIVE: a thread released control of the future
// ACTIVE => NOTIFIED: a thread is currently in control of the future
//                     and has been notified to poll again
// ACTIVE => DONE: the future has been completed
// NOTIFIED => ACTIVE: a thread saw "NOTIFIED" and is going to poll again
// NOTIFIED => DONE: the future has been completed

/// The result of a call to `try_poll`.
/// This indicates the result of attempting to `poll` the future.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum AttemptPollResult {
    /// The future was being polled by another thread, but it was notified
    /// to poll at least once more before yielding.
    Busy,
    /// The future was polled, but did not complete.
    Pending,
    /// The future was polled and finished by this thread.
    /// This result is normally used to trigger garbage-collection of the future.
    IFinished,
    /// The future was already completed by another thread.
    SomeoneElseFinished,
}

impl AtomicFuture {
    /// Create a new `AtomicFuture`.
    pub fn new(future: FutureObj<'static, ()>) -> Self {
        AtomicFuture { state: AtomicUsize::new(INACTIVE), future: UnsafeCell::new(Some(future)) }
    }

    /// Attempt to poll the underlying future.
    ///
    /// `try_poll` ensures that the future is polled at least once more
    /// unless it has already finished.
    pub fn try_poll(&self, cx: &mut Context<'_>) -> AttemptPollResult {
        // AcqRel is used instead of SeqCst in the following code.
        //
        // AcqRel behaves like Acquire on loads and Release on stores.
        // This means that a successful compare-and-swap will observe reads and
        // writes from other threads that happened before their successful
        // compare and swap operations.
        //
        // Prior to reading *or* mutating `self.future`, we must have
        // performed a successful transition from INACTIVE to ACTIVE.
        //
        // After any mutation to `self.future`, no other threads will read
        // `self.future` until the following two operations have occurred:
        //
        // - The mutating thread has transitioned from ACTIVE to (INACTIVE or DONE)
        // - The new reader has transitioned from INACTIVE to ACTIVE.
        //
        // This guarantees that any writes written by the mutating thread will
        // be observable by the reading thread.
        loop {
            // Attempt to acquire sole responsibility for polling the future
            match self.state.compare_exchange(INACTIVE, ACTIVE, AcqRel, Acquire) {
                Ok(INACTIVE) => {
                    // we are now the (only) active worker. proceed to poll!
                    loop {
                        let poll_res = {
                            // This `UnsafeCell` access is valid because `self.future.get()`
                            // is only called here, inside the critical section where
                            // we performed the transition from INACTIVE to ACTIVE.
                            let opt: &mut Option<FutureObj<'static, ()>> =
                                unsafe { &mut *self.future.get() };

                            // We know that the future is still there and hasn't completed
                            // because `state` != `DONE`
                            let future = opt.as_mut().expect("Missing future in AtomicFuture");
                            future.poll_unpin(cx)
                        };

                        match poll_res {
                            Poll::Ready(()) => {
                                // Take the future so that its innards can be dropped
                                let future_opt: &mut Option<FutureObj<'static, ()>> =
                                    unsafe { &mut *self.future.get() };
                                future_opt.take();

                                // No one else will read `future` unless they see
                                // `INACTIVE`, which will never happen again.
                                self.state.store(DONE, Relaxed);
                                return AttemptPollResult::IFinished;
                            }
                            Poll::Pending => {
                                // Continue on
                            }
                        }

                        match self.state.compare_exchange(ACTIVE, INACTIVE, AcqRel, Acquire) {
                            Ok(ACTIVE) => {
                                return AttemptPollResult::Pending;
                            }
                            Err(NOTIFIED) => {
                                // We were notified to poll again while we were busy.
                                // We are still the sole owner of the memory in `future`,
                                // so we don't need any specific memory ordering guarantees.
                                self.state.store(ACTIVE, Relaxed);
                                continue;
                            }
                            Err(INACTIVE) | Err(DONE) => {
                                panic!("Invalid data contention in AtomicFuture");
                            }
                            _ => {
                                panic!("Unexpected AtomicFuture state");
                            }
                        }
                    }
                }
                Err(ACTIVE) => {
                    // Someone else was already working on this.
                    // notify them to make sure they poll at least one more time
                    //
                    // We're not acquiring access to memory or releasing any writes,
                    // so we can use `Relaxed` memory ordering.
                    match self.state.compare_exchange(ACTIVE, NOTIFIED, Relaxed, Relaxed) {
                        Err(INACTIVE) => {
                            // Ooh, the worker finished before we could notify.
                            // Let's start over and try and become the new worker.
                            continue;
                        }
                        Ok(ACTIVE) | Err(NOTIFIED) => {
                            // Either we CAS'd to NOTIFIED or someone else did.
                            //
                            // Since this is a relaxed read, you might wonder how we know
                            // that our read of NOTIFIED isn't stale.
                            //
                            // Note that the event which triggered the call to `try_poll` must
                            // have occurred prior to the initial (failed) CAS from INACTIVE to
                            // ACTIVE. That event itself must have introduced a memory fence in
                            // order to ensure visibility on successive calls to `try_poll` on
                            // another thread. Common events include syscalls and mutex writes.
                            //
                            // Since this CAS cannot be reordered with the initial CAS of
                            // the same variable, we have following definite order:
                            //
                            // event (syscall, mutex write, etc.) happens before
                            // read of ACTIVE happens before
                            // read of NOTIFIED
                            //
                            // This means `state` was definitely NOTIFIED at some point
                            // after the initial EVENT.
                            return AttemptPollResult::Busy;
                        }
                        Err(DONE) => {
                            // The worker completed this future already
                            return AttemptPollResult::SomeoneElseFinished;
                        }
                        _ => {
                            panic!("Unexpected AtomicFuture state");
                        }
                    }
                }
                Err(NOTIFIED) => {
                    // The worker is already going to poll at least one more time.
                    return AttemptPollResult::Busy;
                }
                Err(DONE) => {
                    // Someone else completed this future already
                    return AttemptPollResult::SomeoneElseFinished;
                }
                _ => {
                    panic!("Unexpected AtomicFuture state");
                }
            }
        }
    }
}
