blob: 9f65a7f6239a013fdf0c522e0d5f501e5006fd80 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use super::super::timer::{TimeWaker, TimerHandle, TimerHeap};
use super::{
instrumentation::{Collector, LocalCollector, WakeupReason},
packets::{PacketReceiver, PacketReceiverMap, ReceiverRegistration},
use crate::atomic_future::{AtomicFuture, AttemptPollResult};
use crossbeam::queue::SegQueue;
use fuchsia_sync::Mutex;
use fuchsia_zircon::{self as zx};
use rustc_hash::FxHashMap as HashMap;
use std::{
sync::atomic::{AtomicBool, AtomicI64, AtomicU16, AtomicU64, AtomicUsize, Ordering},
sync::{Arc, Weak},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
u64, usize,
/// Port packet key reserved for task-ready wake-ups: `try_notify` queues a user packet under
/// this key, and `worker_lifecycle` recognises a packet carrying it as a notification.
pub(crate) const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;
/// The id of the main task, which is a virtual task that lives from construction
/// to destruction of the executor. The main task may correspond to multiple
/// main futures, in cases where the executor runs multiple times during its lifetime.
pub(crate) const MAIN_TASK_ID: usize = 0;
static EXECUTOR: RefCell<Option<(Arc<Inner>, TimerHeap)>> = RefCell::new(None)
/// Runs `f` with mutable access to the timer heap stored next to the current thread's executor
/// in `EXECUTOR`, and returns whatever `f` returns.
///
/// # Panics
///
/// Panics with the `expect` message below when no executor has been installed on this thread.
pub(crate) fn with_local_timer_heap<F, R>(f: F) -> R
F: FnOnce(&mut TimerHeap) -> R,
EXECUTOR.with(|e| {
(f)(&mut e
.expect("can't get timer heap before fuchsia_async::Executor is initialized")
/// The clock an executor reads its notion of "now" from (see `Inner::now`).
pub enum ExecutorTime {
    /// Read the monotonic clock.
    RealTime,
    /// Read a manually controlled time, stored atomically as nanoseconds.
    FakeTime(AtomicI64),
}
/// Outcome of one call to `Inner::poll_ready_tasks`.
enum PollReadyTasksResult {
    /// The ready queue was empty.
    NoneReady,
    /// A batch was polled and no thread is sleeping; more tasks may still be queued.
    MoreReady,
    /// The main task ran to completion.
    MainTaskCompleted,
}
// -- Helpers for threads_state below --
/// Extracts the number of sleeping threads from a packed `threads_state` value.
///
/// The count lives in the low byte, so the truncating cast is intentional.
fn threads_sleeping(state: u16) -> u8 {
    state as u8
}
/// Extracts the number of pending wake-up notifications from a packed `threads_state` value.
///
/// The count lives in the high byte.
fn threads_notified(state: u16) -> u8 {
    (state >> 8) as u8
}
/// Packs a sleeping-thread count (low byte) and a pending-notification count (high byte) into
/// a single `threads_state` value.
fn make_threads_state(sleeping: u8, notified: u8) -> u16 {
    u16::from(sleeping) | (u16::from(notified) << 8)
}
pub(super) struct Inner {
pub(super) port: zx::Port,
pub(super) done: AtomicBool,
is_local: bool,
receivers: Mutex<PacketReceiverMap<Arc<dyn PacketReceiver>>>,
task_count: AtomicUsize,
task_state: Mutex<TaskState>,
pub(super) ready_tasks: SegQueue<Arc<Task>>,
time: ExecutorTime,
pub(super) collector: Collector,
pub(super) source: Option<&'static Location<'static>>,
// The low byte is the number of threads currently sleeping. The high byte is the number of
// of wake-up notifications pending.
pub(super) threads_state: AtomicU16,
pub(super) num_threads: u8,
pub(super) polled: AtomicU64,
// Data that belongs to the user that can be accessed via EHandle::local(). See
// `TestExecutor::poll_until_stalled`.
pub(super) owner_data: Mutex<Option<Box<dyn Any + Send>>>,
struct TaskState {
all_tasks: HashMap<usize, Arc<Task>>,
join_wakers: HashMap<usize, Waker>,
impl Inner {
/// Creates the shared executor state.
///
/// The visible initialisers create a fresh port, clear the `done` flag, start with empty
/// receiver and task tables and an empty ready queue, and begin task ids at
/// `MAIN_TASK_ID + 1` so that `MAIN_TASK_ID` stays reserved for the main task.
#[cfg_attr(trace_level_logging, track_caller)]
pub fn new(time: ExecutorTime, is_local: bool, num_threads: u8) -> Self {
// NOTE(review): as listed, the second binding shadows the first, so `source` is always
// `None`; presumably each binding is gated on `trace_level_logging` — confirm.
let source = Some(Location::caller());
let source = None;
let collector = Collector::new();
Inner {
port: zx::Port::create(),
done: AtomicBool::new(false),
receivers: Mutex::new(PacketReceiverMap::new()),
// Id 0 (`MAIN_TASK_ID`) is reserved, so spawned tasks start at 1.
task_count: AtomicUsize::new(MAIN_TASK_ID + 1),
task_state: Mutex::new(TaskState {
all_tasks: HashMap::default(),
join_wakers: HashMap::default(),
ready_tasks: SegQueue::new(),
// No thread sleeping, no notification pending.
threads_state: AtomicU16::new(0),
polled: AtomicU64::new(0),
owner_data: Mutex::new(None),
pub fn set_local(self: Arc<Self>, timers: TimerHeap) {
EXECUTOR.with(|e| {
let mut e = e.borrow_mut();
assert!(e.is_none(), "Cannot create multiple Fuchsia Executors");
*e = Some((self, timers));
/// Drains the ready queue, polling tasks in batches of up to 16.
///
/// Returns `NoneReady` once the queue is empty and `MainTaskCompleted` when the main task
/// finishes. After each batch it inspects `threads_state`: if no thread is sleeping it returns
/// `MoreReady` (to prevent starvation), otherwise it tries to wake a sleeping thread that has
/// not been notified yet and keeps polling.
fn poll_ready_tasks(&self, local_collector: &mut LocalCollector<'_>) -> PollReadyTasksResult {
loop {
for _ in 0..16 {
let Some(task) = self.ready_tasks.pop() else {
return PollReadyTasksResult::NoneReady;
let complete = self.try_poll(&task);
// NOTE(review): the left-hand operand of `==` is missing in this listing; presumably the
// id of the task that was just polled — confirm.
if complete && == MAIN_TASK_ID {
return PollReadyTasksResult::MainTaskCompleted;
self.polled.fetch_add(1, Ordering::Relaxed);
// We didn't finish all the ready tasks. If there are sleeping threads, post a
// notification to wake one up.
let mut threads_state = self.threads_state.load(Ordering::Relaxed);
// Retry loop: `try_notify` hands back the current state when its update loses a race.
loop {
if threads_sleeping(threads_state) == 0 {
// All threads are awake now. Prevent starvation.
return PollReadyTasksResult::MoreReady;
if threads_notified(threads_state) >= threads_sleeping(threads_state) {
// All sleeping threads have been notified. Keep going and poll more tasks.
match self.try_notify(threads_state) {
Ok(()) => break,
Err(s) => threads_state = s,
/// Registers `future` as a new task on this executor.
///
/// Allocates the next id from `task_count`, reports the new task to the collector and inserts
/// it into `all_tasks`. When the executor is already marked done, no task is created and
/// `usize::MAX` is returned instead.
#[cfg_attr(trace_level_logging, track_caller)]
pub fn spawn(self: &Arc<Self>, future: AtomicFuture<'static>) -> usize {
// Prevent a deadlock in `.all_tasks` when a task is spawned from a custom
// Drop impl while the executor is being torn down.
if self.done.load(Ordering::SeqCst) {
return usize::MAX;
let next_id = self.task_count.fetch_add(1, Ordering::Relaxed);
let task = Task::new(next_id, future, self.clone());
self.collector.task_created(next_id, task.source());
self.task_state.lock().all_tasks.insert(next_id, task.clone());
/// Spawns a future that does not have to be `Send`, forwarding to `spawn`.
///
/// `detached` is passed through to `AtomicFuture::new_local`. The `!self.is_local` branch
/// carries an error message telling callers on a multithreaded executor to use `spawn` or a
/// `LocalExecutor` instead.
#[cfg_attr(trace_level_logging, track_caller)]
pub fn spawn_local<F: Future<Output = R> + 'static, R: 'static>(
self: &Arc<Self>,
future: F,
detached: bool,
) -> usize {
if !self.is_local {
"Error: called `spawn_local` on multithreaded executor. \
Use `spawn` or a `LocalExecutor` instead."
// SAFETY: We've confirmed that the futures here will never be used across multiple threads,
// so the Send requirements that `new_local` requires should be met.
self.spawn(unsafe { AtomicFuture::new_local(future, detached) })
/// Spawns the main future.
///
/// The task is created under `MAIN_TASK_ID`, reported to the collector and inserted into
/// `all_tasks`. The "Existing main task" message refers to the case where an entry for
/// `MAIN_TASK_ID` was already present.
pub fn spawn_main(self: &Arc<Self>, future: AtomicFuture<'static>) {
let task = Task::new(MAIN_TASK_ID, future, self.clone());
self.collector.task_created(MAIN_TASK_ID, task.source());
self.task_state.lock().all_tasks.insert(MAIN_TASK_ID, task.clone()).is_none(),
"Existing main task"
pub fn notify_task_ready(&self) {
// Only post if there's no thread running (or soon to be running). If we happen to be
// running on a thread for this executor, then threads_state won't be equal to num_threads,
// which means notifications only get fired if this is from a non-async thread, or a thread
// that belongs to a different executor. We use SeqCst ordering here to make sure this load
// happens *after* the change to ready_tasks and to synchronize with worker_lifecycle.
let mut threads_state = self.threads_state.load(Ordering::SeqCst);
// We compare threads_state directly against self.num_threads (which means that
// notifications must be zero) because we only want to notify if there are no pending
// notifications and *all* threads are currently asleep.
while threads_state == self.num_threads as u16 {
match self.try_notify(threads_state) {
Ok(()) => break,
Err(s) => threads_state = s,
/// Tries to notify a thread to wake up. Returns threads_state if it fails.
///
/// The replacement state is `old_threads_state + 0x100`, i.e. one more pending notification in
/// the high byte. On success a `TASK_READY_WAKEUP_ID` packet is queued through `notify_id`.
fn try_notify(&self, old_threads_state: u16) -> Result<(), u16> {
old_threads_state + 0x100, // <- Add one to notifications.
.map(|_| self.notify_id(TASK_READY_WAKEUP_ID))
/// Posts a wake-up notification for one sleeping thread.
///
/// `try_notify` is retried for as long as no notification is pending and the number of
/// sleeping threads has not dropped below the count observed on entry.
pub fn wake_one_thread(&self) {
let mut threads_state = self.threads_state.load(Ordering::Relaxed);
let current_sleeping = threads_sleeping(threads_state);
if current_sleeping == 0 {
while threads_notified(threads_state) == 0
&& threads_sleeping(threads_state) >= current_sleeping
match self.try_notify(threads_state) {
Ok(()) => break,
Err(s) => threads_state = s,
pub fn notify_id(&self, id: u64) {
let up = zx::UserPacket::from_u8_array([0; 32]);
let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, up);
if let Err(e) = self.port.queue(&packet) {
// TODO: logging
eprintln!("Failed to queue notify in port: {:?}", e);
/// Looks up the receiver registered under `key` so that `packet` can be handed to it.
///
/// Returns without doing anything when no receiver is registered for `key`.
pub fn deliver_packet(&self, key: usize, packet: zx::Packet) {
let receiver = match self.receivers.lock().get(key) {
// Clone the `Arc` so that we don't hold the lock
// any longer than absolutely necessary.
// The `receive_packet` impl may be arbitrarily complex.
Some(receiver) => receiver.clone(),
None => return,
pub fn now(&self) -> Time {
match &self.time {
ExecutorTime::RealTime => Time::from_zx(zx::Time::get_monotonic()),
ExecutorTime::FakeTime(t) => Time::from_nanos(t.load(Ordering::Relaxed)),
/// Sets the executor's fake time to `new`.
///
/// # Panics
///
/// Panics if the executor uses real time.
pub fn set_fake_time(&self, new: Time) {
match &self.time {
ExecutorTime::RealTime => {
panic!("Error: called `set_fake_time` on an executor using actual time.")
ExecutorTime::FakeTime(t) =>, Ordering::Relaxed),
pub fn is_real_time(&self) -> bool {
matches!(self.time, ExecutorTime::RealTime)
/// Marks the executor as done and makes sure every worker thread has a wake-up notification
/// pending.
///
/// Must be called before `on_parent_drop`.
/// Done flag must be set before dropping packet receivers
/// so that future receivers that attempt to deregister themselves
/// know that it's okay if their entries are already missing.
pub fn mark_done(&self) {, Ordering::SeqCst);
// Make sure there's at least one notification outstanding per thread to wake up all
// workers. This might be more notifications than required, but this way we don't have to
// worry about races where threads are just about to sleep; when a thread receives the
// notification, it will check done and terminate.
let mut threads_state = self.threads_state.load(Ordering::Relaxed);
let num_threads = self.num_threads;
loop {
let notified = threads_notified(threads_state);
if notified >= num_threads {
match self.threads_state.compare_exchange_weak(
// New state: same sleeping count, notification count raised to `num_threads`.
make_threads_state(threads_sleeping(threads_state), num_threads),
) {
Ok(_) => {
// One iteration per notification that was not already pending.
for _ in notified..num_threads {
Err(old) => threads_state = old,
/// Tears down executor state when the owning executor is dropped: drops every task's future,
/// empties the ready queue and reports the main task as completed.
///
/// Notes about the lifecycle of an Executor.
/// a) The Executor stands as the only way to run a reactor based on a Fuchsia port, but the
/// lifecycle of the port itself is not currently tied to it. Executor vends clones of its
/// inner Arc structure to all receivers, so we don't have a type-safe way of ensuring that
/// the port is dropped alongside the Executor as it should.
/// TODO: Ensure the port goes away with the executor.
/// b) The Executor's lifetime is also tied to the thread-local variable pointing to the
/// "current" executor being set, and that's unset when the executor is dropped.
/// Point (a) is related to "what happens if I use a receiver after the executor is dropped",
/// and point (b) is related to "what happens when I try to create a new receiver when there
/// is no executor".
/// Tokio, for example, encodes the lifetime of the reactor separately from the thread-local
/// storage [1]. And the reactor discourages usage of strong references to it by vending weak
/// references to it [2] instead of strong.
/// There are pros and cons to both strategies. For (a), tokio encourages (but doesn't
/// enforce [3]) type-safety by vending weak pointers, but those add runtime overhead when
/// upgrading pointers. For (b) the difference mostly stands for "when is it safe to use IO
/// objects/receivers". Tokio says it's only safe to use them whenever a guard is in scope.
/// Fuchsia-async says it's safe to use them when a fuchsia_async::Executor is still in scope
/// in that thread.
/// This acts as a prelude to the panic encoded in Executor::drop when receivers haven't
/// unregistered themselves when the executor drops. The choice to panic was made based on
/// patterns in fuchsia-async that may come to change:
/// - Executor vends strong references to itself and those references are *stored* by most
/// receiver implementations (as opposed to reached out on TLS every time).
/// - Fuchsia-async objects return zx::Status on wait calls, there isn't an appropriate and
/// easy to understand error to return when polling on an extinct executor.
/// - All receivers are implemented in this crate and well-known.
/// [1]:
/// [2]:
/// [3]: by returning an upgraded Arc, tokio trusts callers to not "use it for too long", an
/// opaque non-clone-copy-or-send guard would be stronger than this. See:
pub fn on_parent_drop(&self) {
// Drop all tasks
// `mem::take` empties the map while the lock is held only for this statement.
let all_tasks = std::mem::take(&mut self.task_state.lock().all_tasks);
// Any use of fasync::unblock can involve a waker. Wakers hold weak references to tasks, but
// as part of waking, there's an upgrade to a strong reference, so for a small amount of
// time `fasync::unblock` can hold a strong reference to a task which in turn holds the
// future for the task which in turn could hold references to receivers, which, if we did
// nothing about it, would trip the assertion below. For that reason, we forcibly drop the
// task futures here.
for (_, task) in all_tasks {
task.future.try_drop().expect("Failed to drop task");
// Drop all of the uncompleted tasks
while let Some(_) = self.ready_tasks.pop() {}
// Synthetic main task marked completed
self.collector.task_completed(MAIN_TASK_ID, self.source);
// Do not allow any receivers to outlive the executor. That's very likely a bug waiting to
// happen. See discussion above.
// If you're here because you hit this panic check your code for:
// - A struct that contains a fuchsia_async::Executor NOT in the last position (last
// position gets dropped last:
// - A function scope that contains a fuchsia_async::Executor NOT in the first position
// (first position in function scope gets dropped last:
// - A function that holds a `fuchsia_async::Executor` in scope and whose last statement
// contains a temporary (temporaries are dropped after the function scope:
// This usually
// looks like a `match` statement at the end of the function without a semicolon.
// - Storing channel and FIDL objects in static variables.
// - fuchsia_async::unblock calls that move channels or FIDL objects to another thread.
"receivers must not outlive their executor"
// Remove the thread-local executor set in `new`.
/// Main loop of an executor thread: polls ready tasks, then waits on the port and dispatches
/// whatever arrived (a packet for a receiver or an expired timer).
///
/// The loop returns when the main task completes or when the port wait yields
/// `Work::Stalled`. The `UNTIL_STALLED` parameter feeds into the deadline selection and the
/// timed-out handling below.
// The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
// the debugger needs to be updated.
// LINT.IfChange
pub fn worker_lifecycle<const UNTIL_STALLED: bool>(self: &Arc<Inner>) {
// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/
let mut local_collector = self.collector.create_local_collector();
loop {
// Keep track of whether we are considered asleep.
let mut sleeping = false;
match self.poll_ready_tasks(&mut local_collector) {
PollReadyTasksResult::NoneReady => {
// No more tasks, indicate we are sleeping. We use SeqCst ordering because we
// want this change here to happen *before* we check ready_tasks below. This
// synchronizes with notify_task_ready which is called *after* a task is added
// to ready_tasks.
self.threads_state.fetch_add(1, Ordering::SeqCst);
// Check ready tasks again. If a task got posted, wake up. This has to be done
// because a notification won't get sent if there is at least one active thread
// so there's a window between the preceding two lines where a task could be
// made ready and a notification is not sent because it looks like there is at
// least one thread running.
if self.ready_tasks.is_empty() {
sleeping = true;
} else {
// We lost a race, we're no longer sleeping.
self.threads_state.fetch_sub(1, Ordering::Relaxed);
PollReadyTasksResult::MoreReady => {}
PollReadyTasksResult::MainTaskCompleted => return,
// Check done here after updating threads_state to avoid shutdown races.
if self.done.load(Ordering::SeqCst) {
enum Work {
// Set when the port wait returns a `TASK_READY_WAKEUP_ID` packet.
let mut notified = false;
let work = with_local_timer_heap(|timer_heap| {
// If we're considered awake choose INFINITE_PAST which will make the wait call
// return immediately. Otherwise choose a deadline from the timers.
let deadline = if !sleeping || UNTIL_STALLED {
} else {
timer_heap.next_deadline().map(|t| t.time()).unwrap_or(Time::INFINITE)
// into_zx: we are using real time, so the time is a monotonic time.
match self.port.wait(deadline.into_zx()) {
Ok(packet) => {
if packet.key() == TASK_READY_WAKEUP_ID {
notified = true;
} else {
Err(zx::Status::TIMED_OUT) => {
if !sleeping {
} else if UNTIL_STALLED {
// Fire timers if using fake time.
if !self.is_real_time() {
if let Some(deadline) = timer_heap.next_deadline().map(|t| t.time())
if deadline <= {
return Work::Timer(timer_heap.pop().unwrap());
} else {
Err(status) => {
panic!("Error calling port wait: {:?}", status);
// Undo this thread's contribution to `threads_state` in one subtraction: its sleeping
// count (low byte) and the notification it consumed (high byte).
let threads_state_sub = make_threads_state(sleeping as u8, notified as u8);
if threads_state_sub > 0 {
self.threads_state.fetch_sub(threads_state_sub, Ordering::Relaxed);
match work {
Work::Packet(packet) => {
self.deliver_packet(packet.key() as usize, packet);
Work::Timer(timer) => {
Work::None => {}
Work::Stalled => return,
/// Drops the main task.
///
/// Removes the entry for `MAIN_TASK_ID` from `all_tasks`, if there is one.
///
/// # Safety
/// The caller must guarantee that the executor isn't running.
pub(super) unsafe fn drop_main_task(&self) {
if let Some(task) = self.task_state.lock().all_tasks.remove(&MAIN_TASK_ID) {
// Even though we've removed the task from active tasks, it could still be in
// ready_tasks, so we have to drop the future here. At time of writing, this is only
// used by the local executor and there could only be something in ready_tasks if
// there's a panic.
/// Polls for a join result for the given task ID.
///
/// Returns `Poll::Pending` when `task_id` is not in `all_tasks`. When the task has no result
/// available yet, the caller's waker is stored in `join_wakers` under `task_id`.
///
/// # Safety
/// The caller must guarantee that `R` is the correct type.
pub unsafe fn poll_join_result<R>(&self, task_id: usize, cx: &mut Context<'_>) -> Poll<R> {
let mut tasks = self.task_state.lock();
let Some(task) = tasks.all_tasks.get(&task_id) else { return Poll::Pending };
if let Some(result) = task.future.take_result() {
} else {
tasks.join_wakers.insert(task_id, cx.waker().clone());
/// Polls `task` once and returns a `bool` that `poll_ready_tasks` treats as "the task
/// completed".
///
/// The waker passed to the future is built from the task's own `Arc` pointer together with
/// `BORROWED_VTABLE`, so creating it takes no additional reference.
fn try_poll(&self, task: &Arc<Task>) -> bool {
// SAFETY: We meet the contract for RawWaker/RawWakerVtable.
let task_waker = unsafe {
Waker::from_raw(RawWaker::new(Arc::as_ptr(task) as *const (), &BORROWED_VTABLE))
match task.future.try_poll(&mut Context::from_waker(&task_waker)) {
AttemptPollResult::Yield => {
AttemptPollResult::IFinished => {
// Join waker taken out of `join_wakers` under the lock (when the task is neither detached
// nor cancelled).
let mut waker = None;
let mut tasks = self.task_state.lock();
if !task.future.is_detached_or_cancelled() {
waker = tasks.join_wakers.remove(&;
} else if != MAIN_TASK_ID {
if let Some(waker) = waker {
AttemptPollResult::Cancelled => {
_ => false,
/// A handle to an executor.
pub struct EHandle {
pub(super) inner: Arc<Inner>,
impl fmt::Debug for EHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EHandle").field("port", &self.inner.port).finish()
impl EHandle {
/// Returns the thread-local executor.
/// # Panics
/// If called outside the context of an active async executor.
pub fn local() -> Self {
let inner = EXECUTOR
.with(|e| e.borrow().as_ref().map(|x| x.0.clone()))
.expect("Fuchsia Executor must be created first");
EHandle { inner }
pub(super) fn rm_local() {
EXECUTOR.with(|e| *e.borrow_mut() = None);
/// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
///
/// The returned reference borrows from this handle.
pub fn port(&self) -> &zx::Port {
/// Registers a `PacketReceiver` with the executor and returns a registration.
/// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
pub fn register_receiver<T>(&self, receiver: Arc<T>) -> ReceiverRegistration<T>
T: PacketReceiver,
let key = self.inner.receivers.lock().insert(receiver.clone()) as u64;
ReceiverRegistration { ehandle: self.clone(), key, receiver }
/// Deregisters the receiver stored under `key`.
///
/// # Panics
///
/// Panics with "Missing receiver to deregister" when the key is absent and the executor has
/// not been marked done.
pub(crate) fn deregister_receiver(&self, key: u64) {
let key = key as usize;
let mut lock = self.inner.receivers.lock();
if lock.contains(key) {
} else {
// The executor is shutting down and already removed the entry.
assert!(self.inner.done.load(Ordering::SeqCst), "Missing receiver to deregister");
pub(crate) fn register_timer(time: Time, handle: TimerHandle) {
with_local_timer_heap(|timer_heap| {
timer_heap.add_timer(time, handle);
/// See `Inner::spawn`.
///
/// Wraps `future` with `AtomicFuture::new(future, false)` and returns the value returned by
/// `Inner::spawn`.
#[cfg_attr(trace_level_logging, track_caller)]
pub(crate) fn spawn<R: Send + 'static>(
future: impl Future<Output = R> + Send + 'static,
) -> usize {
self.inner.spawn(AtomicFuture::new(future, false))
/// Spawn a new task to be run on this executor.
/// Tasks spawned using this method must be thread-safe (implement the `Send` trait), as they
/// may be run on either a singlethreaded or multithreaded executor.
#[cfg_attr(trace_level_logging, track_caller)]
pub fn spawn_detached(&self, future: impl Future<Output = ()> + Send + 'static) {
self.inner.spawn(AtomicFuture::new(future, true));
/// See `Inner::spawn_local`.
///
/// Calls it with `detached` set to `false`.
#[cfg_attr(trace_level_logging, track_caller)]
pub(crate) fn spawn_local<R: 'static>(
future: impl Future<Output = R> + 'static,
) -> usize {
self.inner.spawn_local(future, false)
/// Spawn a new task to be run on this executor.
/// This is similar to the `spawn_detached` method, but tasks spawned using this method do not
/// have to be threads-safe (implement the `Send` trait). In return, this method requires that
/// this executor is a LocalExecutor.
#[cfg_attr(trace_level_logging, track_caller)]
pub fn spawn_local_detached(&self, future: impl Future<Output = ()> + 'static) {
self.inner.spawn_local(future, true);
/// Marks the task as detached.
///
/// Does nothing when `task_id` is not present in `all_tasks`.
pub(crate) fn detach(&self, task_id: usize) {
let mut tasks = self.inner.task_state.lock();
if let Some(task) = tasks.all_tasks.get(&task_id) {
/// Cancels the task.
///
/// Returns `None` when `task_id` is not present in `all_tasks`.
///
/// # Safety
/// The caller must guarantee that `R` is the correct type.
pub(crate) unsafe fn cancel<R>(&self, task_id: usize) -> Option<R> {
let mut tasks = self.inner.task_state.lock();
tasks.all_tasks.get(&task_id).and_then(|task| {
if task.future.cancel() {
/// See `Inner::poll_join_result`.
///
/// # Safety
/// The caller must guarantee that `R` is the correct type, as required by
/// `Inner::poll_join_result`.
pub(crate) unsafe fn poll_join_result<R>(
task_id: usize,
cx: &mut Context<'_>,
) -> Poll<R> {
self.inner.poll_join_result(task_id, cx)
pub(super) struct Task {
id: usize,
future: AtomicFuture<'static>,
executor: Arc<Inner>,
source: &'static Location<'static>,
impl Task {
/// Creates a reference-counted task and leaks one weak reference to it (`into_raw`), which
/// the waker vtable functions later address by raw pointer. `Drop for Task` balances the leak.
#[cfg_attr(trace_level_logging, track_caller)]
fn new(id: usize, future: AtomicFuture<'static>, executor: Arc<Inner>) -> Arc<Self> {
let this = Arc::new(Self {
source: Location::caller(),
// Take a weak reference now to be used as a waker.
let _ = Arc::downgrade(&this).into_raw();
/// Marks the task's future as ready; the branch below runs only when `mark_ready` returns
/// `true`.
fn wake(self: &Arc<Self>) {
if self.future.mark_ready() {
/// Returns the source location associated with this task, if any.
fn source(&self) -> Option<&'static Location<'static>> {
/// Releases the weak reference that `Task::new` leaked for use by wakers.
impl Drop for Task {
fn drop(&mut self) {
// SAFETY: This balances the `into_raw` in `new`.
unsafe {
// TODO: We might need to revisit this when pointer
// provenance lands.
// This vtable is used for the waker that exists for the lifetime of the task, which gets dropped
// above, so these functions never drop.
// Its `wake` entry is `waker_wake_by_ref` and its `drop` entry is `waker_noop`, so using the
// borrowed waker never releases a reference.
static BORROWED_VTABLE: RawWakerVTable =
RawWakerVTable::new(waker_clone, waker_wake_by_ref, waker_wake_by_ref, waker_noop);
// Vtable for wakers produced by `waker_clone`. Each of those owns one weak reference, which
// `waker_wake` consumes and `waker_drop` releases.
static VTABLE: RawWakerVTable =
RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop);
fn waker_clone(weak_raw: *const ()) -> RawWaker {
// SAFETY: `weak_raw` comes from a previous call to `into_raw`.
let weak = ManuallyDrop::new(unsafe { Weak::from_raw(weak_raw as *const Task) });
RawWaker::new((*weak).clone().into_raw() as *const _, &VTABLE)
/// `wake` entry of `VTABLE`: takes ownership of the waker's weak reference (so it is released
/// when this function returns) and acts only if the task can still be upgraded.
fn waker_wake(weak_raw: *const ()) {
// SAFETY: `weak_raw` comes from a previous call to `into_raw`.
if let Some(task) = unsafe { Weak::from_raw(weak_raw as *const Task) }.upgrade() {
/// `wake_by_ref` entry: like `waker_wake`, but wraps the weak reference in `ManuallyDrop` so
/// the waker keeps ownership of it.
fn waker_wake_by_ref(weak_raw: *const ()) {
// SAFETY: `weak_raw` comes from a previous call to `into_raw`.
if let Some(task) =
ManuallyDrop::new(unsafe { Weak::from_raw(weak_raw as *const Task) }).upgrade()
/// `drop` entry of `BORROWED_VTABLE`: intentionally does nothing, because the borrowed waker
/// does not own a reference.
fn waker_noop(_weak_raw: *const ()) {}
fn waker_drop(weak_raw: *const ()) {
// SAFETY: `weak_raw` comes from a previous call to `into_raw`.
unsafe {
Weak::from_raw(weak_raw as *const Task);
// Tests for task detaching and cancellation on a `LocalExecutor`.
mod tests {
use {
crate::{LocalExecutor, Task},
atomic::{AtomicU32, Ordering},
// Helper future built on `poll_fn`; the `done` flag distinguishes the first poll from later
// ones.
async fn yield_to_executor() {
let mut done = false;
poll_fn(|cx| {
if done {
} else {
done = true;
// A spawned task increments a shared counter five times; the main future loops until the
// counter reads 5.
fn test_detach() {
let mut e = LocalExecutor::new();
e.run_singlethreaded(async {
let counter = Arc::new(AtomicU32::new(0));
let counter = counter.clone();
Task::spawn(async move {
for _ in 0..5 {
counter.fetch_add(1, Ordering::Relaxed);
while counter.load(Ordering::Relaxed) != 5 {
// Uses the strong count of a shared `Arc<()>` to observe when a task's future (which owns a
// clone) has been dropped.
fn test_cancel() {
let mut e = LocalExecutor::new();
e.run_singlethreaded(async {
let ref_count = Arc::new(());
// First, just drop the task.
let ref_count = ref_count.clone();
let _ = Task::spawn(async move {
let _ref_count = ref_count;
let _: () = std::future::pending().await;
while Arc::strong_count(&ref_count) != 1 {
// Now try explicitly cancelling.
let task = {
let ref_count = ref_count.clone();
Task::spawn(async move {
let _ref_count = ref_count;
let _: () = std::future::pending().await;
// The task never finished, so cancelling yields no result.
assert_eq!(task.cancel(), None);
while Arc::strong_count(&ref_count) != 1 {
// Now cancel a task that has already finished.
let task = {
let ref_count = ref_count.clone();
Task::spawn(async move {
let _ref_count = ref_count;
// Wait for it to finish.
while Arc::strong_count(&ref_count) != 1 {
// The task already produced its `()` output, which `cancel` hands back.
assert_eq!(task.cancel(), Some(()));