//! An asynchronously awaitable `CancellationToken`.
//! The token allows signaling a cancellation request to one or more tasks.
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::intrusive_double_linked_list::{LinkedList, ListNode};
use core::future::Future;
use core::pin::Pin;
use core::ptr::NonNull;
use core::sync::atomic::Ordering;
use core::task::{Context, Poll, Waker};
/// A token which can be used to signal a cancellation request to one or more
/// tasks.
///
/// Tasks can call [`CancellationToken::cancelled()`] in order to
/// obtain a Future which will be resolved when cancellation is requested.
///
/// Cancellation can be requested through the [`CancellationToken::cancel`] method.
///
/// # Examples
///
/// ```ignore
/// use tokio::select;
/// use tokio::scope::CancellationToken;
///
/// #[tokio::main]
/// async fn main() {
/// let token = CancellationToken::new();
/// let cloned_token = token.clone();
///
/// let join_handle = tokio::spawn(async move {
/// // Wait for either cancellation or a very long time
/// select! {
/// _ = cloned_token.cancelled() => {
/// // The token was cancelled
/// 5
/// }
/// _ = tokio::time::delay_for(std::time::Duration::from_secs(9999)) => {
/// 99
/// }
/// }
/// });
///
/// tokio::spawn(async move {
/// tokio::time::delay_for(std::time::Duration::from_millis(10)).await;
/// token.cancel();
/// });
///
/// assert_eq!(5, join_handle.await.unwrap());
/// }
/// ```
pub struct CancellationToken {
inner: NonNull<CancellationTokenState>,
}
// Safety: The CancellationToken is thread-safe and can be moved between threads,
// since all methods are internally synchronized.
unsafe impl Send for CancellationToken {}
unsafe impl Sync for CancellationToken {}
/// A Future that is resolved once the corresponding [`CancellationToken`]
/// has been cancelled
#[must_use = "futures do nothing unless polled"]
pub struct WaitForCancellationFuture<'a> {
/// The CancellationToken that is associated with this WaitForCancellationFuture
cancellation_token: Option<&'a CancellationToken>,
/// Node for waiting at the cancellation_token
wait_node: ListNode<WaitQueueEntry>,
/// Whether this future has already been registered at the token as a waiter
is_registered: bool,
}
// Safety: Futures can be sent between threads as long as the underlying
// cancellation_token is thread-safe (Sync),
// which allows polling/registering/unregistering it from a different thread.
unsafe impl<'a> Send for WaitForCancellationFuture<'a> {}
// ===== impl CancellationToken =====
impl core::fmt::Debug for CancellationToken {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("CancellationToken")
.field("is_cancelled", &self.is_cancelled())
.finish()
}
}
impl Clone for CancellationToken {
fn clone(&self) -> Self {
// Safety: The state inside a `CancellationToken` is always valid, since
// it is reference counted
let inner = self.state();
// Tokens are cloned by increasing their refcount
let current_state = inner.snapshot();
inner.increment_refcount(current_state);
CancellationToken { inner: self.inner }
}
}
impl Drop for CancellationToken {
fn drop(&mut self) {
let token_state_pointer = self.inner;
// Safety: The state inside a `CancellationToken` is always valid, since
// it is reference counted
let inner = unsafe { &mut *self.inner.as_ptr() };
let mut current_state = inner.snapshot();
// We need to save the parent, since the state might be released by the
// next call
let parent = inner.parent;
// Drop our own refcount
current_state = inner.decrement_refcount(current_state);
// If this was the last reference, unregister from the parent
if current_state.refcount == 0 {
if let Some(mut parent) = parent {
// Safety: Since we still retain a reference on the parent, it must be valid.
let parent = unsafe { parent.as_mut() };
parent.unregister_child(token_state_pointer, current_state);
}
}
}
}
impl CancellationToken {
/// Creates a new CancellationToken in the non-cancelled state.
pub fn new() -> CancellationToken {
let state = Box::new(CancellationTokenState::new(
None,
StateSnapshot {
cancel_state: CancellationState::NotCancelled,
has_parent_ref: false,
refcount: 1,
},
));
// Safety: We just created the Box. The pointer is guaranteed to be
// not null
CancellationToken {
inner: unsafe { NonNull::new_unchecked(Box::into_raw(state)) },
}
}
/// Returns a reference to the utilized `CancellationTokenState`.
fn state(&self) -> &CancellationTokenState {
// Safety: The state inside a `CancellationToken` is always valid, since
// it is reference counted
unsafe { &*self.inner.as_ptr() }
}
/// Creates a `CancellationToken` which will get cancelled whenever the
/// current token gets cancelled.
///
/// If the current token is already cancelled, the child token will be
/// returned in the cancelled state.
///
/// # Examples
///
/// ```ignore
/// use tokio::select;
/// use tokio::scope::CancellationToken;
///
/// #[tokio::main]
/// async fn main() {
/// let token = CancellationToken::new();
/// let child_token = token.child_token();
///
/// let join_handle = tokio::spawn(async move {
/// // Wait for either cancellation or a very long time
/// select! {
/// _ = child_token.cancelled() => {
/// // The token was cancelled
/// 5
/// }
/// _ = tokio::time::delay_for(std::time::Duration::from_secs(9999)) => {
/// 99
/// }
/// }
/// });
///
/// tokio::spawn(async move {
/// tokio::time::delay_for(std::time::Duration::from_millis(10)).await;
/// token.cancel();
/// });
///
/// assert_eq!(5, join_handle.await.unwrap());
/// }
/// ```
pub fn child_token(&self) -> CancellationToken {
let inner = self.state();
// Increment the refcount of this token. It will be referenced by the
// child, independent of whether the child is immediately cancelled or
// not.
let _current_state = inner.increment_refcount(inner.snapshot());
let mut unpacked_child_state = StateSnapshot {
has_parent_ref: true,
refcount: 1,
cancel_state: CancellationState::NotCancelled,
};
let mut child_token_state = Box::new(CancellationTokenState::new(
Some(self.inner),
unpacked_child_state,
));
{
let mut guard = inner.synchronized.lock().unwrap();
if guard.is_cancelled {
// This token was already cancelled. In this case we should not
// insert the child into the list, since it would never get removed
// from the list.
(*child_token_state.synchronized.lock().unwrap()).is_cancelled = true;
unpacked_child_state.cancel_state = CancellationState::Cancelled;
// Since it's not in the list, the parent doesn't need to retain
// a reference to it.
unpacked_child_state.has_parent_ref = false;
child_token_state
.state
.store(unpacked_child_state.pack(), Ordering::SeqCst);
} else {
if let Some(mut first_child) = guard.first_child {
child_token_state.from_parent.next_peer = Some(first_child);
// Safety: We manipulate other child task inside the Mutex
// and retain a parent reference on it. The child token can't
// get invalidated while the Mutex is held.
unsafe {
first_child.as_mut().from_parent.prev_peer =
Some((&mut *child_token_state).into())
};
}
guard.first_child = Some((&mut *child_token_state).into());
}
};
let child_token_ptr = Box::into_raw(child_token_state);
// Safety: We just created the pointer from a `Box`
CancellationToken {
inner: unsafe { NonNull::new_unchecked(child_token_ptr) },
}
}
/// Cancel the [`CancellationToken`] and all child tokens which have been
/// derived from it.
///
/// This will wake up all tasks which are waiting for cancellation.
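///
/// A minimal sketch of the propagation to child tokens, marked `ignore` like
/// this module's other examples (the `tokio::scope` path mirrors them):
///
/// ```ignore
/// use tokio::scope::CancellationToken;
///
/// let token = CancellationToken::new();
/// let child_token = token.child_token();
///
/// // Cancelling the parent also cancels the child before `cancel()` returns.
/// token.cancel();
/// assert!(child_token.is_cancelled());
/// ```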
pub fn cancel(&self) {
self.state().cancel();
}
/// Returns `true` if the `CancellationToken` has been cancelled
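///
/// A minimal sketch, marked `ignore` like this module's other examples:
///
/// ```ignore
/// use tokio::scope::CancellationToken;
///
/// let token = CancellationToken::new();
/// assert!(!token.is_cancelled());
///
/// token.cancel();
/// assert!(token.is_cancelled());
/// ```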
pub fn is_cancelled(&self) -> bool {
self.state().is_cancelled()
}
/// Returns a `Future` that gets fulfilled when cancellation is requested.
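///
/// A minimal awaiting sketch, marked `ignore` like this module's other
/// examples (`tokio::spawn` and `#[tokio::main]` are assumed to be available):
///
/// ```ignore
/// use tokio::scope::CancellationToken;
///
/// #[tokio::main]
/// async fn main() {
/// let token = CancellationToken::new();
/// let cloned_token = token.clone();
///
/// tokio::spawn(async move {
/// cloned_token.cancel();
/// });
///
/// // Resolves once `cancel()` was called on any clone of the token.
/// token.cancelled().await;
/// assert!(token.is_cancelled());
/// }
/// ```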
pub fn cancelled(&self) -> WaitForCancellationFuture<'_> {
WaitForCancellationFuture {
cancellation_token: Some(self),
wait_node: ListNode::new(WaitQueueEntry::new()),
is_registered: false,
}
}
unsafe fn register(
&self,
wait_node: &mut ListNode<WaitQueueEntry>,
cx: &mut Context<'_>,
) -> Poll<()> {
self.state().register(wait_node, cx)
}
fn check_for_cancellation(
&self,
wait_node: &mut ListNode<WaitQueueEntry>,
cx: &mut Context<'_>,
) -> Poll<()> {
self.state().check_for_cancellation(wait_node, cx)
}
fn unregister(&self, wait_node: &mut ListNode<WaitQueueEntry>) {
self.state().unregister(wait_node)
}
}
// ===== impl WaitForCancellationFuture =====
impl<'a> core::fmt::Debug for WaitForCancellationFuture<'a> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("WaitForCancellationFuture").finish()
}
}
impl<'a> Future for WaitForCancellationFuture<'a> {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// Safety: We do not move anything out of `WaitForCancellationFuture`
let mut_self: &mut WaitForCancellationFuture<'_> = unsafe { Pin::get_unchecked_mut(self) };
let cancellation_token = mut_self
.cancellation_token
.expect("polled WaitForCancellationFuture after completion");
let poll_res = if !mut_self.is_registered {
// Safety: The `ListNode` is pinned through the Future,
// and we will unregister it in `WaitForCancellationFuture::drop`
// before the Future is dropped and the memory reference is invalidated.
unsafe { cancellation_token.register(&mut mut_self.wait_node, cx) }
} else {
cancellation_token.check_for_cancellation(&mut mut_self.wait_node, cx)
};
if let Poll::Ready(()) = poll_res {
// The cancellation_token was signalled
mut_self.cancellation_token = None;
// A signalled Token means the Waker won't be enqueued anymore
mut_self.is_registered = false;
mut_self.wait_node.task = None;
} else {
// This `Future` and its stored `Waker` stay registered at the
// `CancellationToken`
mut_self.is_registered = true;
}
poll_res
}
}
impl<'a> Drop for WaitForCancellationFuture<'a> {
fn drop(&mut self) {
// If this WaitForCancellationFuture has been polled and it was added to the
// wait queue at the cancellation_token, it must be removed before dropping.
// Otherwise the cancellation_token would access invalid memory.
if let Some(token) = self.cancellation_token {
if self.is_registered {
token.unregister(&mut self.wait_node);
}
}
}
}
/// Tracks how the future has interacted with the [`CancellationToken`]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum PollState {
/// The task has never interacted with the [`CancellationToken`].
New,
/// The task was added to the wait queue at the [`CancellationToken`].
Waiting,
/// The task has been polled to completion.
Done,
}
/// Tracks the WaitForCancellationFuture waiting state.
/// Access to this struct is synchronized through the mutex in the CancellationToken.
struct WaitQueueEntry {
/// The task handle of the waiting task
task: Option<Waker>,
/// Current polling state. This state is only updated inside the Mutex of
/// the CancellationToken.
state: PollState,
}
impl WaitQueueEntry {
/// Creates a new WaitQueueEntry
fn new() -> WaitQueueEntry {
WaitQueueEntry {
task: None,
state: PollState::New,
}
}
}
struct SynchronizedState {
waiters: LinkedList<WaitQueueEntry>,
first_child: Option<NonNull<CancellationTokenState>>,
is_cancelled: bool,
}
impl SynchronizedState {
fn new() -> Self {
Self {
waiters: LinkedList::new(),
first_child: None,
is_cancelled: false,
}
}
}
/// Information embedded in child tokens which is synchronized through the Mutex
/// in their parent.
struct SynchronizedThroughParent {
next_peer: Option<NonNull<CancellationTokenState>>,
prev_peer: Option<NonNull<CancellationTokenState>>,
}
/// Possible states of a `CancellationToken`
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum CancellationState {
NotCancelled = 0,
Cancelling = 1,
Cancelled = 2,
}
impl CancellationState {
fn pack(self) -> usize {
self as usize
}
fn unpack(value: usize) -> Self {
match value {
0 => CancellationState::NotCancelled,
1 => CancellationState::Cancelling,
2 => CancellationState::Cancelled,
_ => unreachable!("Invalid value"),
}
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct StateSnapshot {
/// The number of references to this particular `CancellationTokenState`.
/// References are held by `CancellationToken` structs and by the state of
/// each child token.
refcount: usize,
/// Whether the state is still referenced by its parent and can therefore
/// not be freed.
has_parent_ref: bool,
/// Whether the token is cancelled
cancel_state: CancellationState,
}
impl StateSnapshot {
/// Packs the snapshot into a `usize`
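///
/// Layout sketch (matching the shifts and masks used below): bits 0..=1 hold
/// the `cancel_state`, bit 2 holds `has_parent_ref`, and the remaining upper
/// bits hold the `refcount`.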
fn pack(self) -> usize {
self.refcount << 3 | if self.has_parent_ref { 4 } else { 0 } | self.cancel_state.pack()
}
/// Unpacks the snapshot from a `usize`
fn unpack(value: usize) -> Self {
let refcount = value >> 3;
let has_parent_ref = value & 4 != 0;
let cancel_state = CancellationState::unpack(value & 0x03);
StateSnapshot {
refcount,
has_parent_ref,
cancel_state,
}
}
/// Whether this `CancellationTokenState` is still referenced by any
/// `CancellationToken`.
fn has_refs(&self) -> bool {
self.refcount != 0 || self.has_parent_ref
}
}
/// The maximum permitted number of references to a CancellationToken. This
/// is derived from the intent to never use more than 32 bits in the packed
/// `StateSnapshot`.
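/// With the lower 3 bits of the packed state reserved for flags, this works
/// out to `(u32::MAX - 7) >> 3`, i.e. `2^29 - 1` references.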
const MAX_REFS: u32 = (std::u32::MAX - 7) >> 3;
/// Internal state of the `CancellationToken` pair above
struct CancellationTokenState {
state: AtomicUsize,
parent: Option<NonNull<CancellationTokenState>>,
from_parent: SynchronizedThroughParent,
synchronized: Mutex<SynchronizedState>,
}
impl CancellationTokenState {
fn new(
parent: Option<NonNull<CancellationTokenState>>,
state: StateSnapshot,
) -> CancellationTokenState {
CancellationTokenState {
parent,
from_parent: SynchronizedThroughParent {
prev_peer: None,
next_peer: None,
},
state: AtomicUsize::new(state.pack()),
synchronized: Mutex::new(SynchronizedState::new()),
}
}
/// Returns a snapshot of the current atomic state of the token
fn snapshot(&self) -> StateSnapshot {
StateSnapshot::unpack(self.state.load(Ordering::SeqCst))
}
fn atomic_update_state<F>(&self, mut current_state: StateSnapshot, func: F) -> StateSnapshot
where
F: Fn(StateSnapshot) -> StateSnapshot,
{
let mut current_packed_state = current_state.pack();
loop {
let next_state = func(current_state);
match self.state.compare_exchange(
current_packed_state,
next_state.pack(),
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => {
return next_state;
}
Err(actual) => {
current_packed_state = actual;
current_state = StateSnapshot::unpack(actual);
}
}
}
}
fn increment_refcount(&self, current_state: StateSnapshot) -> StateSnapshot {
self.atomic_update_state(current_state, |mut state: StateSnapshot| {
if state.refcount >= MAX_REFS as usize {
eprintln!("[ERROR] Maximum reference count for CancellationToken was exceeded");
std::process::abort();
}
state.refcount += 1;
state
})
}
fn decrement_refcount(&self, current_state: StateSnapshot) -> StateSnapshot {
let current_state = self.atomic_update_state(current_state, |mut state: StateSnapshot| {
state.refcount -= 1;
state
});
// Drop the State if it is not referenced anymore
if !current_state.has_refs() {
// Safety: `CancellationTokenState` is always stored in refcounted
// Boxes
let _ = unsafe { Box::from_raw(self as *const Self as *mut Self) };
}
current_state
}
fn remove_parent_ref(&self, current_state: StateSnapshot) -> StateSnapshot {
let current_state = self.atomic_update_state(current_state, |mut state: StateSnapshot| {
state.has_parent_ref = false;
state
});
// Drop the State if it is not referenced anymore
if !current_state.has_refs() {
// Safety: `CancellationTokenState` is always stored in refcounted
// Boxes
let _ = unsafe { Box::from_raw(self as *const Self as *mut Self) };
}
current_state
}
/// Unregisters a child from the parent token.
/// The child token's state is not exactly known at this point in time.
/// If the parent token is cancelled, the child token gets removed from the
/// parent's list, and might therefore already have been freed. If the parent
/// token is not cancelled, the child token is still valid.
fn unregister_child(
&mut self,
mut child_state: NonNull<CancellationTokenState>,
current_child_state: StateSnapshot,
) {
let removed_child = {
// Remove the child token from the parent's linked list
let mut guard = self.synchronized.lock().unwrap();
if !guard.is_cancelled {
// Safety: Since the token was not cancelled, the child must
// still be in the list and valid.
let mut child_state = unsafe { child_state.as_mut() };
debug_assert!(child_state.snapshot().has_parent_ref);
if guard.first_child == Some(child_state.into()) {
guard.first_child = child_state.from_parent.next_peer;
}
// Safety: If peers wouldn't be valid anymore, they would try
// to remove themselves from the list. This would require locking
// the Mutex that we currently own.
unsafe {
if let Some(mut prev_peer) = child_state.from_parent.prev_peer {
prev_peer.as_mut().from_parent.next_peer =
child_state.from_parent.next_peer;
}
if let Some(mut next_peer) = child_state.from_parent.next_peer {
next_peer.as_mut().from_parent.prev_peer =
child_state.from_parent.prev_peer;
}
}
child_state.from_parent.prev_peer = None;
child_state.from_parent.next_peer = None;
// The child is no longer referenced by the parent, since we were able
// to remove its reference from the parent's list.
true
} else {
// Do not touch the linked list anymore. If the parent is cancelled
// it will move all children outside of the Mutex and manipulate
// the pointers there. Manipulating the pointers here too could
// lead to races. Therefore leave them just as they are and let the
// parent deal with it. The parent will make sure to retain a
// reference to this state as long as it manipulates the list
// pointers. Therefore the pointers are not dangling.
false
}
};
if removed_child {
// If the token removed itself from the parents list, it can reset
// the parent ref status. If it isn't able to do so, because the
// parent removed it from the list, there is no need to do this.
// The parent ref acts as another reference count. Therefore
// removing this reference can free the object.
// Safety: The token was in the list. This means the parent wasn't
// cancelled before, and the token must still be alive.
unsafe { child_state.as_mut().remove_parent_ref(current_child_state) };
}
// Decrement the refcount on the parent and free it if necessary
self.decrement_refcount(self.snapshot());
}
fn cancel(&self) {
// Move the state of the CancellationToken from `NotCancelled` to `Cancelling`
let mut current_state = self.snapshot();
let state_after_cancellation = loop {
if current_state.cancel_state != CancellationState::NotCancelled {
// Another task already initiated the cancellation
return;
}
let mut next_state = current_state;
next_state.cancel_state = CancellationState::Cancelling;
match self.state.compare_exchange(
current_state.pack(),
next_state.pack(),
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => break next_state,
Err(actual) => current_state = StateSnapshot::unpack(actual),
}
};
// This task cancelled the token
// Take the task list out of the Token
// We do not want to cancel child tokens inside this lock. If one of the
// child tokens had additional child tokens, we would recursively
// take locks.
// Doing this action has an impact if the child token is dropped concurrently:
// It will try to deregister itself from the parent token, but can not find
// itself in the child list anymore. Therefore it needs to assume the parent
// has extracted the list and will process it. It may not modify the list.
// This is OK from a memory safety perspective, since the parent still
// retains a reference to the child state until it has finished iterating over
// it.
let mut first_child = {
let mut guard = self.synchronized.lock().unwrap();
// Save the cancellation also inside the Mutex
// This allows child tokens which want to detach themselves to detect
// that this is no longer required since the parent cleared the list.
guard.is_cancelled = true;
// Wakeup all waiters
// This happens inside the lock to make cancellation reliable
// If we accessed the waiters outside of the lock, the pointers
// may no longer be valid.
// Typically this shouldn't be an issue, since waking a task should
// only move it from the blocked into the ready state and not have
// further side effects.
// Use a reverse iterator, so that the oldest waiter gets
// scheduled first
guard.waiters.reverse_drain(|waiter| {
// We are not allowed to move the `Waker` out of the list node.
// The `Future` relies on the fact that the old `Waker` stays there
// as long as the `Future` has not completed in order to perform
// the `will_wake()` check.
// Therefore `wake_by_ref` is used instead of `wake()`
if let Some(handle) = &mut waiter.task {
handle.wake_by_ref();
}
// Mark the waiter to have been removed from the list.
waiter.state = PollState::Done;
});
guard.first_child.take()
};
while let Some(mut child) = first_child {
// Safety: We know this is a valid pointer since it is in our child pointer
// list. It can't have been freed in between, since we retain a reference
// to each child.
let mut_child = unsafe { child.as_mut() };
// Get the next child and clean up list pointers
first_child = mut_child.from_parent.next_peer;
mut_child.from_parent.prev_peer = None;
mut_child.from_parent.next_peer = None;
// Cancel the child task
mut_child.cancel();
// Drop the parent reference. This `CancellationToken` is not interested
// in interacting with the child anymore.
// This is ONLY allowed once we promised not to touch the state anymore
// after this interaction.
mut_child.remove_parent_ref(mut_child.snapshot());
}
// The cancellation has completed
// At this point in time tasks which registered a wait node can be sure
// that their wait node has already been dequeued from the list without
// needing to inspect the list.
self.atomic_update_state(state_after_cancellation, |mut state| {
state.cancel_state = CancellationState::Cancelled;
state
});
}
/// Returns `true` if the `CancellationToken` has been cancelled
fn is_cancelled(&self) -> bool {
let current_state = self.snapshot();
current_state.cancel_state != CancellationState::NotCancelled
}
/// Registers a waiting task at the `CancellationToken`.
/// Safety: This method is only safe as long as the waiting task
/// will properly unregister the wait node before it gets moved.
unsafe fn register(
&self,
wait_node: &mut ListNode<WaitQueueEntry>,
cx: &mut Context<'_>,
) -> Poll<()> {
debug_assert_eq!(PollState::New, wait_node.state);
let current_state = self.snapshot();
// Perform an optimistic cancellation check first. This is not strictly
// necessary since we also check for cancellation inside the Mutex, but it
// reduces the work that needs to be performed if the token has already
// been cancelled.
if current_state.cancel_state != CancellationState::NotCancelled {
return Poll::Ready(());
}
// So far the token is not cancelled. However it could be cancelled before
// we get the chance to store the `Waker`. Therefore we need to check
// for cancellation again inside the mutex.
let mut guard = self.synchronized.lock().unwrap();
if guard.is_cancelled {
// Cancellation was signalled
wait_node.state = PollState::Done;
Poll::Ready(())
} else {
// Add the task to the wait queue
wait_node.task = Some(cx.waker().clone());
wait_node.state = PollState::Waiting;
guard.waiters.add_front(wait_node);
Poll::Pending
}
}
fn check_for_cancellation(
&self,
wait_node: &mut ListNode<WaitQueueEntry>,
cx: &mut Context<'_>,
) -> Poll<()> {
debug_assert!(
wait_node.task.is_some(),
"Method can only be called after task had been registered"
);
let current_state = self.snapshot();
if current_state.cancel_state != CancellationState::NotCancelled {
// If the cancellation had been fully completed we know that our `Waker`
// is no longer registered at the `CancellationToken`.
// Otherwise the cancel call may or may not yet have iterated
// through the waiters list and removed the wait nodes.
// If it hasn't yet, we need to remove it. Otherwise the `wait_node`
// might get freed (when the `WaitForCancellationFuture` gets dropped)
// before the cancellation call has interacted with it.
if current_state.cancel_state != CancellationState::Cancelled {
self.unregister(wait_node);
}
Poll::Ready(())
} else {
// Check if we need to swap the `Waker`. This will make the check more
// expensive, since the `Waker` is synchronized through the Mutex.
// If we don't need to perform a `Waker` update, an atomic check for
// cancellation is sufficient.
let need_waker_update = wait_node
.task
.as_ref()
.map(|waker| !waker.will_wake(cx.waker()))
.unwrap_or(true);
if need_waker_update {
let guard = self.synchronized.lock().unwrap();
if guard.is_cancelled {
// Cancellation was signalled. Since this cancellation signal
// is set inside the Mutex, the old waiter must already have
// been removed from the waiting list
debug_assert_eq!(PollState::Done, wait_node.state);
wait_node.task = None;
Poll::Ready(())
} else {
// The WaitForCancellationFuture is already in the queue.
// The CancellationToken can't have been cancelled,
// since this would change the is_cancelled flag inside the mutex.
// Therefore we just have to update the Waker. A follow-up
// cancellation will always use the new waker.
wait_node.task = Some(cx.waker().clone());
Poll::Pending
}
} else {
// Do nothing. If the token gets cancelled, this task will get
// woken again and can fetch the cancellation.
Poll::Pending
}
}
}
fn unregister(&self, wait_node: &mut ListNode<WaitQueueEntry>) {
debug_assert!(
wait_node.task.is_some(),
"waiter can not be active without task"
);
let mut guard = self.synchronized.lock().unwrap();
// WaitForCancellationFuture only needs to get removed if it has been added to
// the wait queue of the CancellationToken.
// This has happened in the PollState::Waiting case.
if let PollState::Waiting = wait_node.state {
// Safety: Due to the state, we know that the node must be part
// of the waiter list
if !unsafe { guard.waiters.remove(wait_node) } {
// Panic if the address isn't found. This can only happen if the contract was
// violated, e.g. the WaitQueueEntry got moved after the initial poll.
panic!("Future could not be removed from wait queue");
}
wait_node.state = PollState::Done;
}
wait_node.task = None;
}
}