| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| futures::ready, |
| std::{ |
| cell::UnsafeCell, |
| future::Future, |
| marker::PhantomData, |
| mem::ManuallyDrop, |
| pin::Pin, |
| sync::atomic::{ |
| AtomicUsize, |
| Ordering::{Acquire, Relaxed, Release}, |
| }, |
| task::{Context, Poll}, |
| }, |
| }; |
| |
| /// A lock-free thread-safe future. |
| // The debugger knows the layout so that async backtraces work, so if this changes the debugger |
| // might need to be changed too. |
| // LINT.IfChange |
| pub struct AtomicFuture<'a> { |
| // A bitfield (holds the bits INACTIVE, READY or DONE). |
| state: AtomicUsize, |
| |
| // `future` is safe to access after successfully clearing the INACTIVE bit and the `DONE` bit |
| // isn't set. |
| future: UnsafeCell<Box<dyn FutureOrResultAccess<'a>>>, |
| } |
| // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc) |
| |
| trait FutureOrResultAccess<'a>: 'a { |
| /// Drops the future. |
| /// |
| /// # Safety |
| /// |
| /// The caller must ensure the future hasn't been dropped. |
| // zxdb uses this method to figure out the concrete type of the future and it currently assumes |
| // it is the first method in the trait. |
| // LINT.IfChange |
| unsafe fn drop_future(&mut self); |
| // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc) |
| |
| /// Polls the future. |
| /// |
| /// # Safety |
| /// |
| /// The caller must ensure the future hasn't been dropped. |
| unsafe fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()>; |
| |
| /// Gets the result. |
| /// |
| /// # Safety |
| /// |
| /// The caller must ensure the future is finished and the result hasn't been taken or dropped. |
| unsafe fn get_result(&self) -> *const (); |
| |
| /// Drops the result. |
| /// |
| /// # Safety |
| /// |
| /// The caller must ensure the future is finished and the result hasn't already been taken or |
| /// dropped. |
| unsafe fn drop_result(&mut self); |
| } |
| |
| union FutureOrResult<'a, F: 'a, R: 'a> { |
| future: ManuallyDrop<F>, |
| result: ManuallyDrop<R>, |
| lifetime: PhantomData<&'a ()>, |
| } |
| |
| impl<'a, F: Future<Output = R> + 'a, R: 'a> FutureOrResultAccess<'a> for FutureOrResult<'a, F, R> { |
| unsafe fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
| let result = ready!(Pin::new_unchecked(&mut *self.future).poll(cx)); |
| // This might panic which will leave ourselves in a bad state. We deal with this by |
| // aborting (see below). |
| ManuallyDrop::drop(&mut self.future); |
| self.result = ManuallyDrop::new(result); |
| Poll::Ready(()) |
| } |
| |
| unsafe fn drop_future(&mut self) { |
| ManuallyDrop::drop(&mut self.future); |
| } |
| |
| unsafe fn get_result(&self) -> *const () { |
| &*self.result as *const R as *const () |
| } |
| |
| unsafe fn drop_result(&mut self) { |
| ManuallyDrop::drop(&mut self.result); |
| } |
| } |
| |
| /// `AtomicFuture` is safe to access from multiple threads at once. |
| unsafe impl Sync for AtomicFuture<'_> {} |
| unsafe impl Send for AtomicFuture<'_> {} |
| |
| /// State Bits |
| |
| // Exclusive access is gained by clearing this bit. |
| const INACTIVE: usize = 1 << 0; |
| |
| // Set to indicate the future needs to be polled again. |
| const READY: usize = 1 << 1; |
| |
| // Terminal state: the future is dropped upon entry to this state. When in this state, other bits |
| // can be set, including READY (which has no meaning). |
| const DONE: usize = 1 << 2; |
| |
| // The task has been detached. |
| const DETACHED: usize = 1 << 3; |
| |
| // The task has been cancelled. |
| const CANCELLED: usize = 1 << 4; |
| |
| // The result has been taken. |
| const RESULT_TAKEN: usize = 1 << 5; |
| |
| /// The result of a call to `try_poll`. |
| /// This indicates the result of attempting to `poll` the future. |
| #[derive(Debug)] |
| pub enum AttemptPollResult { |
| /// The future was being polled by another thread, but it was notified |
| /// to poll at least once more before yielding. |
| Busy, |
| /// The future was polled, but did not complete. |
| Pending, |
| /// The future was polled and finished by this thread. |
| /// This result is normally used to trigger garbage-collection of the future. |
| IFinished, |
| /// The future was already completed by another thread. |
| SomeoneElseFinished, |
| /// The future was polled, did not complete, but it is woken whilst it is polled so it |
| /// should be polled again. |
| Yield, |
| /// The future was cancelled. |
| Cancelled, |
| } |
| |
| impl<'a> AtomicFuture<'a> { |
| /// Create a new `AtomicFuture`. |
| pub fn new<F: Future<Output = R> + Send + 'a, R: Send + 'a>(future: F, detached: bool) -> Self { |
| unsafe { Self::new_local(future, detached) } |
| } |
| |
| /// Create a new `AtomicFuture` from a !Send future. |
| /// |
| /// # Safety |
| /// |
| /// The caller must uphold the Send requirements. |
| pub unsafe fn new_local<F: Future<Output = R> + 'a, R: 'a>(future: F, detached: bool) -> Self { |
| AtomicFuture { |
| state: AtomicUsize::new( |
| INACTIVE + { |
| if detached { |
| DETACHED |
| } else { |
| 0 |
| } |
| }, |
| ), |
| future: UnsafeCell::new(Box::new(FutureOrResult { future: ManuallyDrop::new(future) })), |
| } |
| } |
| |
| /// Attempt to poll the underlying future. |
| /// |
| /// `try_poll` ensures that the future is polled at least once more |
| /// unless it has already finished. |
| pub fn try_poll(&self, cx: &mut Context<'_>) -> AttemptPollResult { |
| loop { |
| // Attempt to acquire sole responsibility for polling the future (by clearing the |
| // INACTIVE bit) and also clear the READY bit at the same time so that we track if it |
| // becomes READY again whilst we are polling. |
| let old = self.state.fetch_and(!(INACTIVE | READY), Acquire); |
| if old & DONE != 0 { |
| // Someone else completed this future already |
| return AttemptPollResult::SomeoneElseFinished; |
| } |
| if old & INACTIVE != 0 { |
| // We are now the (only) active worker, proceed to poll... |
| if old & CANCELLED != 0 { |
| // The future was cancelled. |
| // SAFETY: We have exclusive access. |
| unsafe { |
| self.drop_future_unchecked(); |
| } |
| return AttemptPollResult::Cancelled; |
| } |
| break; |
| } |
| // Future was already active; this shouldn't really happen because we shouldn't be |
| // polling it from multiple threads at the same time. Still, we handle it by setting |
| // the READY bit so that it gets polled again. We do this regardless of whether we |
| // cleared the READY bit above. |
| let old = self.state.fetch_or(READY, Relaxed); |
| // If the future is still active, or the future was already marked as ready, we can |
| // just return and it will get polled again. |
| if old & INACTIVE == 0 || old & READY != 0 { |
| return AttemptPollResult::Pending; |
| } |
| // The worker finished, and we marked the future as ready, so we must try again because |
| // the future won't be in a run queue. |
| } |
| |
| // We cannot recover from panics. |
| struct Bomb; |
| impl Drop for Bomb { |
| fn drop(&mut self) { |
| std::process::abort(); |
| } |
| } |
| |
| let bomb = Bomb; |
| |
| // This `UnsafeCell` access is valid because `self.future.get()` is only called here, inside |
| // the critical section where we performed the transition from INACTIVE to ACTIVE. |
| let result = unsafe { (*self.future.get()).poll(cx) }; |
| |
| std::mem::forget(bomb); |
| |
| if let Poll::Ready(()) = result { |
| // The future will have been dropped, so we just need to set the state. |
| self.state.fetch_or(DONE, Relaxed); |
| // No one else will read `future` unless they see `INACTIVE`, which will never |
| // happen again. |
| AttemptPollResult::IFinished |
| } else if self.state.fetch_or(INACTIVE, Release) & READY == 0 { |
| AttemptPollResult::Pending |
| } else { |
| // The future was marked ready whilst we were polling, so yield. |
| AttemptPollResult::Yield |
| } |
| } |
| |
| /// Marks the future as ready and returns true if it needs to be added to a run queue, i.e. |
| /// it isn't already ready, active or done. |
| #[must_use] |
| pub fn mark_ready(&self) -> bool { |
| self.state.fetch_or(READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE |
| } |
| |
| /// Drops the future without checking its current state. |
| /// |
| /// # Safety |
| /// |
| /// This doesn't check the current state, so this must only be called if it is known that there |
| /// is no concurrent access. This also does *not* include any memory barriers before dropping |
| /// the future. |
| pub unsafe fn drop_future_unchecked(&self) { |
| // Set the state first in case we panic when we drop. |
| assert!(self.state.fetch_or(DONE | RESULT_TAKEN, Relaxed) & DONE == 0); |
| (*self.future.get()).drop_future(); |
| } |
| |
| /// Drops the future if it is not currently being polled. Returns success if the future was |
| /// dropped or was already dropped. |
| pub fn try_drop(&self) -> Result<(), ()> { |
| let old = self.state.fetch_and(!INACTIVE, Acquire); |
| if old & DONE != 0 { |
| Ok(()) |
| } else if old & INACTIVE != 0 { |
| // SAFETY: We have exclusive access. |
| unsafe { |
| self.drop_future_unchecked(); |
| } |
| Ok(()) |
| } else { |
| Err(()) |
| } |
| } |
| |
| /// Cancels the task. Returns true if the task needs to be added to a run queue. |
| #[must_use] |
| pub fn cancel(&self) -> bool { |
| self.state.fetch_or(CANCELLED | READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE |
| } |
| |
| /// Marks the task as detached. |
| pub fn detach(&self) { |
| self.state.fetch_or(DETACHED, Relaxed); |
| } |
| |
| /// Returns true if the task is detached or cancelled. |
| pub fn is_detached_or_cancelled(&self) -> bool { |
| self.state.load(Relaxed) & (DETACHED | CANCELLED) != 0 |
| } |
| |
| /// Takes the result. |
| /// |
| /// # Safety |
| /// |
| /// The caller must guarantee that `R` is the correct type. |
| pub unsafe fn take_result<R>(&self) -> Option<R> { |
| if self.state.load(Relaxed) & (DONE | RESULT_TAKEN) == DONE |
| && self.state.fetch_or(RESULT_TAKEN, Relaxed) & RESULT_TAKEN == 0 |
| { |
| Some(((*self.future.get()).get_result() as *const R).read()) |
| } else { |
| None |
| } |
| } |
| } |
| |
| impl Drop for AtomicFuture<'_> { |
| fn drop(&mut self) { |
| let state = *self.state.get_mut(); |
| if state & DONE == 0 { |
| // SAFETY: The state isn't DONE so we must drop the future. |
| unsafe { |
| (*self.future.get()).drop_future(); |
| } |
| } else if state & RESULT_TAKEN == 0 { |
| // SAFETY: The result hasn't been taken so we must drop the result. |
| unsafe { |
| (*self.future.get()).drop_result(); |
| } |
| } |
| } |
| } |