blob: 18c35845a6f66d596903a7640312b21eae6befc4 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::atomic_future::AtomicFuture;
use crossbeam::queue::SegQueue;
use fuchsia_zircon::{self as zx, AsHandleRef};
use futures::future::{self, FutureObj, LocalFutureObj};
use futures::task::{waker_ref, ArcWake, AtomicWaker, Spawn, SpawnError};
use futures::FutureExt;
use parking_lot::{Condvar, Mutex};
use pin_utils::pin_mut;
use std::cell::RefCell;
use std::collections::{BinaryHeap, HashMap};
use std::future::Future;
use std::marker::Unpin;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::task::{Context, Poll, Waker};
use std::{cmp, fmt, mem, ops, thread, u64, usize};
// Port packet key queued by `Inner::notify_empty`. The single-threaded run loop
// re-polls the main future when it sees this key; worker threads treat it as a bare
// wakeup and simply loop around (re-checking the `done` flag).
const EMPTY_WAKEUP_ID: u64 = u64::MAX;
// Port packet key queued by `Inner::notify_task_ready` to signal that a task has been
// pushed onto `ready_tasks` and should be polled. Any other key is treated as the key
// of a registered `PacketReceiver`.
const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;
/// Schedule `future` as a new task on the executor installed for the current thread.
///
/// The future must be `Send`, since the executor may be driven either by a single
/// thread or by a pool of worker threads.
///
/// # Panics
///
/// Panics if no executor has been created on the calling thread.
pub fn spawn<F>(future: F)
where
    F: Future<Output = ()> + Send + 'static,
{
    let handle = EHandle::local();
    let task = FutureObj::new(Box::new(future));
    Inner::spawn(&handle.inner, task);
}
/// Schedule a possibly non-`Send` `future` as a new task on the current thread's executor.
///
/// Unlike `spawn`, the future does not have to be threadsafe. The price is that the
/// executor is pinned to single-threaded operation from then on: only
/// `run_singlethreaded` may be used to drive it.
///
/// # Panics
///
/// Panics if no executor has been created on the calling thread, or if the executor
/// has already been run in multithreaded mode.
pub fn spawn_local<F>(future: F)
where
    F: Future<Output = ()> + 'static,
{
    let handle = EHandle::local();
    let task = LocalFutureObj::new(Box::new(future));
    Inner::spawn_local(&handle.inner, task);
}
/// A point in time measured on the executor's clock.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[repr(transparent)]
pub struct Time(zx::Time);

impl Time {
    /// The maximum time.
    pub const INFINITE: Time = Time(zx::Time::INFINITE);

    /// The minimum time.
    pub const INFINITE_PAST: Time = Time(zx::Time::INFINITE_PAST);

    /// Return the current time as reported by the thread-local executor.
    ///
    /// An executor must have been set up on the calling thread.
    pub fn now() -> Self {
        let handle = EHandle::local();
        handle.inner.now()
    }

    /// Compute the deadline that lies `duration` after the executor's current time.
    ///
    /// Like `zx::Time::after`, the addition saturates on overflow rather than
    /// wrapping around.
    ///
    /// An executor must have been set up on the calling thread.
    pub fn after(duration: zx::Duration) -> Self {
        let base_nanos = Self::now().into_nanos();
        let deadline_nanos = base_nanos.saturating_add(duration.into_nanos());
        Self::from_nanos(deadline_nanos)
    }

    /// Wrap a `zx::Time`. This is only meaningful when the time comes from the same
    /// source as the executor's clock (for the real clock, `zx::ClockId::Monotonic`).
    pub fn from_zx(t: zx::Time) -> Self {
        Self(t)
    }

    /// Unwrap into a `zx::Time`. For the real clock this is a monotonic time.
    pub fn into_zx(self) -> zx::Time {
        let Time(raw) = self;
        raw
    }

    /// Build a `Time` from a nanosecond count.
    pub fn from_nanos(nanos: i64) -> Self {
        Time(zx::Time::from_nanos(nanos))
    }

    /// Return this time as a nanosecond count.
    pub fn into_nanos(self) -> i64 {
        self.into_zx().into_nanos()
    }
}

impl From<zx::Time> for Time {
    fn from(t: zx::Time) -> Time {
        Time::from_zx(t)
    }
}

impl From<Time> for zx::Time {
    fn from(t: Time) -> zx::Time {
        t.into_zx()
    }
}

impl ops::Add<zx::Duration> for Time {
    type Output = Time;
    fn add(self, d: zx::Duration) -> Time {
        Time::from_zx(self.into_zx() + d)
    }
}

impl ops::Add<Time> for zx::Duration {
    type Output = Time;
    fn add(self, t: Time) -> Time {
        Time::from_zx(self + t.into_zx())
    }
}

impl ops::Sub<zx::Duration> for Time {
    type Output = Time;
    fn sub(self, d: zx::Duration) -> Time {
        Time::from_zx(self.into_zx() - d)
    }
}

impl ops::Sub<Time> for Time {
    type Output = zx::Duration;
    fn sub(self, t: Time) -> zx::Duration {
        self.into_zx() - t.into_zx()
    }
}

impl ops::AddAssign<zx::Duration> for Time {
    fn add_assign(&mut self, d: zx::Duration) {
        self.0 += d;
    }
}

impl ops::SubAssign<zx::Duration> for Time {
    fn sub_assign(&mut self, d: zx::Duration) {
        self.0 -= d;
    }
}
/// An extension trait to provide `after_now` on `zx::Duration`.
pub trait DurationExt {
/// Return the `Time` that lies this `Duration` after the executor's current time.
/// `duration.after_now()` is equivalent to `Time::after(duration)`.
///
/// This method requires that an executor has been set up.
fn after_now(self) -> Time;
}
impl DurationExt for zx::Duration {
fn after_now(self) -> Time {
// Delegates to `Time::after`, which saturates on overflow.
Time::after(self)
}
}
/// A trait for handling the arrival of a packet on a `zx::Port`.
///
/// This trait should be implemented by users who wish to write their own
/// types which receive asynchronous notifications from a `zx::Port`.
/// Implementors of this trait generally contain a `futures::task::AtomicWaker` which
/// is used to wake up the task which can make progress due to the arrival of
/// the packet.
///
/// `PacketReceiver`s should be registered with the executor using
/// `EHandle::register_receiver`.
/// Upon registration, users will receive a `ReceiverRegistration`
/// which provides `key` and `port` methods. These methods can be used to wait on
/// asynchronous signals.
///
/// Note that `PacketReceiver`s may receive false notifications intended for a
/// previous receiver, and should handle these gracefully.
pub trait PacketReceiver: Send + Sync + 'static {
/// Receive a packet when one arrives.
fn receive_packet(&self, packet: zx::Packet);
}
/// Arrange for the current task to be woken when `signal` is raised on `handle`.
///
/// Registers the task's waker on `task`, clears `signal` (plus `OBJECT_PEER_CLOSED`
/// when `clear_closed` is set) from `atomic_signals`, and re-arms an async wait on
/// `port` under `key` for whichever of those bits were set beforehand.
///
/// Returns any error reported while scheduling the wait.
pub(crate) fn need_signal(
    cx: &mut Context<'_>,
    task: &AtomicWaker,
    atomic_signals: &AtomicU32,
    signal: zx::Signals,
    clear_closed: bool,
    handle: zx::HandleRef<'_>,
    port: &zx::Port,
    key: u64,
) -> Result<(), zx::Status> {
    let peer_closed = zx::Signals::OBJECT_PEER_CLOSED;

    // The waker must be in place before the bits are cleared.
    task.register(cx.waker());

    let mut mask = signal;
    if clear_closed {
        mask |= peer_closed;
    }
    let previous_bits = atomic_signals.fetch_and(!mask.bits(), Ordering::SeqCst);
    let previous = zx::Signals::from_bits_truncate(previous_bits);

    // A new packet is only needed if one isn't already scheduled; bits that were
    // already clear mean a packet is already on its way.
    let had_signal = previous.contains(signal);
    let had_closed = previous.contains(peer_closed);

    let mut rearm = zx::Signals::empty();
    if had_signal {
        rearm |= signal;
    }
    if had_closed && clear_closed {
        rearm |= peer_closed;
    }
    if had_signal || had_closed {
        schedule_packet(handle, port, key, rearm)?;
    }

    if had_closed && !clear_closed {
        // A close was observed but deliberately left set: poll again right away.
        cx.waker().wake_by_ref();
    }
    Ok(())
}
/// Register a one-shot async wait for `signals` on `handle`, to be delivered to `port`
/// as a packet carrying `key`.
pub(crate) fn schedule_packet(
    handle: zx::HandleRef<'_>,
    port: &zx::Port,
    key: u64,
    signals: zx::Signals,
) -> Result<(), zx::Status> {
    let one_shot = zx::WaitAsyncOpts::Once;
    handle.wait_async_handle(port, key, signals, one_shot)
}
/// Ties a `PacketReceiver` to the key it was registered under on an executor.
/// Dropping the registration deregisters the receiver automatically.
// NOTE: purposefully does not implement `Clone`.
#[derive(Debug)]
pub struct ReceiverRegistration<T: PacketReceiver> {
    receiver: Arc<T>,
    ehandle: EHandle,
    key: u64,
}

impl<T> ReceiverRegistration<T>
where
    T: PacketReceiver,
{
    /// The key that packets queued on the `zx::Port` must carry to reach this receiver.
    pub fn key(&self) -> u64 {
        self.key
    }

    /// Borrow the wrapped `PacketReceiver`.
    pub fn receiver(&self) -> &T {
        &self.receiver
    }

    /// The `zx::Port` on which packets for this receiver should be queued.
    pub fn port(&self) -> &zx::Port {
        self.ehandle.port()
    }
}

impl<T: PacketReceiver> Deref for ReceiverRegistration<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.receiver
    }
}

impl<T> Drop for ReceiverRegistration<T>
where
    T: PacketReceiver,
{
    fn drop(&mut self) {
        let key = self.key;
        self.ehandle.deregister_receiver(key);
    }
}
/// A port-based executor for Fuchsia OS.
// NOTE: intentionally does not implement `Clone`.
pub struct Executor {
    // State shared with handles, tasks, and worker threads.
    inner: Arc<Inner>,
    // A packet that was dequeued from the port but not yet processed; filled in by
    // `next_step` and drained by `consume_packet`.
    next_packet: Option<zx::Packet>,
}

impl fmt::Debug for Executor {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Executor");
        out.field("port", &self.inner.port);
        out.finish()
    }
}
// Heap of pending timers. `TimeWaker` reverses its ordering, so the earliest deadline
// sits at the top.
type TimerHeap = BinaryHeap<TimeWaker>;
// The executor installed on the current thread together with that thread's timer heap.
// `None` until `EHandle::set_local` runs, and again after `EHandle::rm_local`.
thread_local!(
static EXECUTOR: RefCell<Option<(Arc<Inner>, TimerHeap)>> = RefCell::new(None)
);
/// Run `f` with mutable access to the current thread's timer heap and return its result.
///
/// The thread-local executor slot stays mutably borrowed for the duration of `f`.
///
/// # Panics
///
/// Panics if no executor has been installed on the calling thread.
fn with_local_timer_heap<F, R>(f: F) -> R
where
    F: FnOnce(&mut TimerHeap) -> R,
{
    EXECUTOR.with(|cell| {
        let mut slot = cell.borrow_mut();
        let (_, heap) = slot
            .as_mut()
            .expect("can't get timer heap before fuchsia_async::Executor is initialized");
        f(heap)
    })
}
impl Executor {
/// Build an executor that uses `time` as its clock and install it as the calling
/// thread's executor with an empty timer heap.
///
/// Returns the error from port creation if that fails.
fn new_with_time(time: ExecutorTime) -> Result<Self, zx::Status> {
    let port = zx::Port::create()?;
    let inner = Inner {
        port,
        done: AtomicBool::new(false),
        threadiness: Threadiness::default(),
        threads: Mutex::new(Vec::new()),
        receivers: Mutex::new(PacketReceiverMap::new()),
        ready_tasks: SegQueue::new(),
        time,
    };
    let executor = Executor { inner: Arc::new(inner), next_packet: None };
    executor.ehandle().set_local(TimerHeap::new());
    Ok(executor)
}
/// Create a new executor running with actual time.
///
/// The executor is also installed as the calling thread's local executor.
///
/// # Errors
///
/// Returns the `zx::Status` from port creation if that fails.
///
/// # Panics
///
/// Panics if an executor is already installed on the calling thread.
pub fn new() -> Result<Self, zx::Status> {
Self::new_with_time(ExecutorTime::RealTime)
}
/// Create a new executor running with fake time.
///
/// The fake clock starts at `Time::INFINITE_PAST`; move it with `set_fake_time`.
pub fn new_with_fake_time() -> Result<Self, zx::Status> {
    let initial = AtomicI64::new(Time::INFINITE_PAST.into_nanos());
    Self::new_with_time(ExecutorTime::FakeTime(initial))
}
/// Return the current time according to the executor: the monotonic clock for a
/// real-time executor, or the last value set for a fake-time executor.
pub fn now(&self) -> Time {
self.inner.now()
}
/// Set the fake time to a given value.
///
/// # Panics
///
/// Panics if the executor was created with real time.
pub fn set_fake_time(&self, t: Time) {
self.inner.set_fake_time(t)
}
/// Return a handle to the executor. The handle shares this executor's inner state.
pub fn ehandle(&self) -> EHandle {
EHandle { inner: self.inner.clone() }
}
/// Build the waker handed to the main future in single-threaded modes; waking it
/// queues an empty wakeup packet on the executor's port.
fn singlethreaded_main_task_wake(&self) -> Waker {
    let wake_target = SingleThreadedMainTaskWake(Arc::clone(&self.inner));
    futures::task::waker(Arc::new(wake_target))
}
/// Drive `main_future` to completion on the calling thread and return its output.
///
/// Between polls of the main future the loop blocks on the port until the next timer
/// deadline, firing the timer on timeout and otherwise dispatching the packet it
/// received (main-future wakeup, ready task, or receiver packet).
///
/// # Panics
///
/// Panics if the executor uses fake time, if a packet dequeued by the stepwise API
/// is still pending, or if waiting on the port fails unexpectedly.
// Takes `&mut self` to ensure that only one thread-manager is running at a time.
pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
where
    F: Future,
{
    self.inner
        .require_real_time()
        .expect("Error: called `run_singlethreaded` on an executor using fake time");
    if self.next_packet.is_some() {
        panic!("Error: called `run_singlethreaded` on an executor with a packet waiting");
    }

    pin_mut!(main_future);
    let waker = self.singlethreaded_main_task_wake();
    let mut main_cx = Context::from_waker(&waker);
    let mut poll_state = main_future.as_mut().poll(&mut main_cx);

    loop {
        if let Poll::Ready(output) = poll_state {
            return output;
        }

        let received = with_local_timer_heap(|heap| {
            let deadline = match next_deadline(heap) {
                Some(timer) => timer.time,
                None => Time::INFINITE,
            };
            // into_zx: we are using real time, so the time is a monotonic time.
            match self.inner.port.wait(deadline.into_zx()) {
                Ok(packet) => Some(packet),
                Err(zx::Status::TIMED_OUT) => {
                    // The wait ran into the earliest deadline: fire that timer.
                    heap.pop().unwrap().wake();
                    None
                }
                Err(status) => panic!("Error calling port wait: {:?}", status),
            }
        });

        let packet = match received {
            Some(packet) => packet,
            None => continue,
        };
        match packet.key() {
            EMPTY_WAKEUP_ID => poll_state = main_future.as_mut().poll(&mut main_cx),
            TASK_READY_WAKEUP_ID => self.inner.poll_ready_tasks(),
            receiver_key => self.inner.deliver_packet(receiver_key as usize, packet),
        }
    }
}
/// Poll the future. If it is not ready, dispatch available packets and possibly try again.
/// Timers will not fire. Never blocks.
///
/// Returns `Poll::Ready` as soon as the main future completes, or `Poll::Pending` once
/// no more packets are immediately available.
///
/// This function is for testing. DO NOT use this function in tests or applications that
/// involve any interaction with other threads or processes, as those interactions
/// may become stalled waiting for signals from "the outside world" which is beyond
/// the knowledge of the executor.
///
/// Unpin: this function requires all futures to be `Unpin`able, so any `!Unpin`
/// futures must first be pinned using the `pin_mut!` macro from the `pin-utils` crate.
pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
where
F: Future + Unpin,
{
// Queue a wakeup so the main future is polled at least once.
self.wake_main_future();
while let NextStep::NextPacket = self.next_step(/*fire_timers:*/ false) {
// Will not fail, because NextPacket means there is a
// packet ready to be processed.
let res = self.consume_packet(main_future);
if res.is_ready() {
return res;
}
}
Poll::Pending
}
/// Schedule the main future for being woken up. This is useful in conjunction with
/// `run_one_step`.
///
/// Implemented by queueing an empty wakeup packet on the executor's port.
pub fn wake_main_future(&mut self) {
self.inner.notify_empty()
}
/// Dispatch at most one event: the first available packet or expired timer.
///
/// Returns `None` when nothing was dispatched, `Some(Poll::Pending)` when progress
/// was made but the main future has not completed, and `Some(Poll::Ready(_))` when
/// the main future completed during this step.
///
/// The main future only runs if `wake_main_future` has been called beforehand.
/// Timers whose deadline is in the past are fired, but the executor's time is never
/// advanced.
///
/// Unpin: this function requires all futures to be `Unpin`able, so any `!Unpin`
/// futures must first be pinned using the `pin_mut!` macro from the `pin-utils` crate.
///
/// Intended for reproducible integration tests, where several async processes are
/// advanced one event at a time in a controlled (e.g. seeded-random) order.
pub fn run_one_step<F>(&mut self, main_future: &mut F) -> Option<Poll<F::Output>>
where
    F: Future + Unpin,
{
    let step = self.next_step(/*fire_timers:*/ true);
    match step {
        // `NextPacket` guarantees a packet is waiting, so consuming cannot fail.
        NextStep::NextPacket => Some(self.consume_packet(main_future)),
        NextStep::NextTimer => {
            // unwrap: `NextTimer` guarantees there is a timer in the heap.
            let expired = with_local_timer_heap(|heap| heap.pop().unwrap());
            expired.wake();
            Some(Poll::Pending)
        }
        NextStep::WaitUntil(_) => None,
    }
}
/// Consumes a packet that has already been dequeued from the port.
/// This must only be called when there is a packet available.
fn consume_packet<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
where
F: Future + Unpin,
{
let packet =
self.next_packet.take().expect("consume_packet called but no packet available");
match packet.key() {
EMPTY_WAKEUP_ID => self.poll_main_future(main_future),
TASK_READY_WAKEUP_ID => {
if let Some(task) = self.inner.ready_tasks.try_pop() {
let lw = waker_ref(&task);
task.future.try_poll(&mut Context::from_waker(&lw));
};
Poll::Pending
}
receiver_key => {
self.inner.deliver_packet(receiver_key as usize, packet);
Poll::Pending
}
}
}
/// Poll `main_future` once, using a waker that queues an empty wakeup packet on the
/// executor's port when the future asks to be woken.
fn poll_main_future<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
where
F: Future + Unpin,
{
let waker = self.singlethreaded_main_task_wake();
let main_cx = &mut Context::from_waker(&waker);
main_future.poll_unpin(main_cx)
}
/// Decide what the stepwise API should do next, without blocking.
///
/// A packet left over from an earlier call takes priority. Otherwise, when
/// `fire_timers` is set and the earliest deadline has passed, a timer is next.
/// Failing that, the port is polled once: a packet is stashed in `next_packet`, and
/// an empty port yields `WaitUntil` with the earliest deadline (`Time::INFINITE` if
/// there are no timers).
///
/// # Panics
///
/// Panics if the port wait fails with anything other than a timeout.
fn next_step(&mut self, fire_timers: bool) -> NextStep {
    // If a packet is queued from a previous call to next_step, it must be executed first.
    if self.next_packet.is_some() {
        return NextStep::NextPacket;
    }

    let deadline = with_local_timer_heap(|heap| match next_deadline(heap) {
        Some(timer) => timer.time,
        None => Time::INFINITE,
    });

    // If we are past a deadline, run the corresponding timer.
    if fire_timers && deadline <= self.inner.now() {
        return NextStep::NextTimer;
    }

    // Try to unqueue a packet from the port.
    match self.inner.port.wait(zx::Time::INFINITE_PAST) {
        Ok(packet) => {
            self.next_packet = Some(packet);
            NextStep::NextPacket
        }
        Err(zx::Status::TIMED_OUT) => NextStep::WaitUntil(deadline),
        Err(status) => panic!("Error calling port wait: {:?}", status),
    }
}
/// Report whether the executor has work it can do right now.
///
/// Returns `Ready` if a packet or an expired timer is available, in which case
/// `run_one_step` will return `Some(_)`. Otherwise returns `Waiting(next_deadline)`:
/// nothing will happen before `next_deadline` or an external event. With no pending
/// packet or timer the result is `Waiting(Time::INFINITE)`.
pub fn is_waiting(&mut self) -> WaitState {
    let step = self.next_step(/*fire_timers:*/ true);
    match step {
        NextStep::WaitUntil(deadline) => WaitState::Waiting(deadline),
        NextStep::NextTimer | NextStep::NextPacket => WaitState::Ready,
    }
}
/// Wake every task whose timer deadline is at or before the executor's current time.
/// Returns `true` if at least one task was woken.
///
/// This is intended for use in test code in conjunction with fake time.
pub fn wake_expired_timers(&mut self) -> bool {
    let now = self.now();
    with_local_timer_heap(|heap| {
        let mut woke_any = false;
        loop {
            match next_deadline(heap) {
                Some(timer) if timer.time <= now => timer.wake(),
                _ => break,
            }
            heap.pop();
            woke_any = true;
        }
        woke_any
    })
}
/// Wake the task waiting on the earliest pending timer, if there is one, and return
/// the time that timer was scheduled for.
///
/// This is intended for use in test code in conjunction with `run_until_stalled`.
/// For example, here is how one could test that the Timer future fires after the
/// given timeout:
///
/// ```ignore
/// let deadline = 5.seconds().after_now();
/// let mut future = Timer::<Never>::new(deadline);
/// assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
/// assert_eq!(Some(deadline), exec.wake_next_timer());
/// assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
/// ```
pub fn wake_next_timer(&mut self) -> Option<Time> {
    with_local_timer_heap(|heap| {
        let fired_at = match next_deadline(heap) {
            Some(timer) => {
                timer.wake();
                timer.time
            }
            None => return None,
        };
        heap.pop();
        Some(fired_at)
    })
}
/// Run a single future to completion using multiple threads.
///
/// The future is spawned as a task, `num_threads` worker threads are started to drive
/// the executor, and the calling thread blocks until the future's result is available.
/// The workers are then signalled to stop and joined before the result is returned.
///
/// # Panics
///
/// Panics if the executor uses fake time, if `spawn_local` was used on it earlier, or
/// if a packet dequeued by the stepwise API is still pending.
///
/// NOTE(review): with `num_threads == 0` no worker is started, so it looks like this
/// call would block forever — confirm.
// Takes `&mut self` to ensure that only one thread-manager is running at a time.
pub fn run<F>(&mut self, future: F, num_threads: usize) -> F::Output
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.inner.require_real_time().expect("Error: called `run` on an executor using fake time");
self.inner.threadiness.require_multithreaded().expect(
"Error: called `run` on executor after using `spawn_local`. \
Use `run_singlethreaded` instead.",
);
if let Some(_) = self.next_packet {
panic!("Error: called `run` on an executor with a packet waiting");
}
// Slot for the future's output plus a condvar to signal that it has been filled.
let pair = Arc::new((Mutex::new(None), Condvar::new()));
let pair2 = pair.clone();
// Spawn a future which will set the result upon completion.
Inner::spawn(
&self.inner,
FutureObj::new(Box::new(future.then(move |fut_result| {
let (lock, cvar) = &*pair2;
let mut result = lock.lock();
*result = Some(fut_result);
cvar.notify_one();
future::ready(())
}))),
);
// Start worker threads, handing off timers from the current thread.
self.inner.done.store(false, Ordering::SeqCst);
with_local_timer_heap(|timer_heap| {
let timer_heap = mem::replace(timer_heap, TimerHeap::new());
self.create_worker_threads(num_threads, Some(timer_heap));
});
// Wait until signaled that the future has completed.
let (lock, cvar) = &*pair;
let mut result = lock.lock();
while result.is_none() {
cvar.wait(&mut result);
}
// Spin down worker threads
self.inner.done.store(true, Ordering::SeqCst);
self.join_all();
// Unwrap is fine because of the check to `is_none` above.
result.take().unwrap()
}
/// Add `num_workers` worker threads to the executor's thread pool.
///
/// `timers` holds the timers of the "master" thread, which would otherwise be lost;
/// the first worker created takes them and every later worker starts with none.
fn create_worker_threads(&self, num_workers: usize, mut timers: Option<TimerHeap>) {
    let mut pool = self.inner.threads.lock();
    pool.extend((0..num_workers).map(|_| self.new_worker(timers.take())));
}
/// Wake every worker thread and wait for all of them to exit.
///
/// # Panics
///
/// Panics if a worker thread cannot be joined.
fn join_all(&self) {
    let mut pool = self.inner.threads.lock();

    // Queue one wakeup packet per worker so each blocked thread gets to run again.
    (0..pool.len()).for_each(|_| self.inner.notify_empty());

    // Join the worker threads, emptying the pool.
    for worker in pool.drain(..) {
        worker.join().expect("Couldn't join worker thread.");
    }
}
/// Spawn one worker thread that runs `worker_lifecycle` against this executor's shared
/// state, seeded with `timers` if any, and return its join handle.
fn new_worker(&self, timers: Option<TimerHeap>) -> thread::JoinHandle<()> {
let inner = self.inner.clone();
thread::spawn(move || Self::worker_lifecycle(inner, timers))
}
/// Body of a worker thread.
///
/// Installs `inner` as the thread-local executor (with `timers`, or an empty heap),
/// then repeatedly waits on the port until the next timer deadline, firing timers on
/// timeout and dispatching packets otherwise. When the `done` flag is observed the
/// thread-local executor is removed and the function returns.
///
/// # Panics
///
/// Panics if the port wait fails with anything other than a timeout.
fn worker_lifecycle(inner: Arc<Inner>, timers: Option<TimerHeap>) {
    let local_heap = timers.unwrap_or_else(TimerHeap::new);
    EHandle { inner: inner.clone() }.set_local(local_heap);

    while !inner.done.load(Ordering::SeqCst) {
        let received = with_local_timer_heap(|heap| {
            let deadline = match next_deadline(heap) {
                Some(timer) => timer.time,
                None => Time::INFINITE,
            };
            // into_zx: we are using real time, so the time is a monotonic time.
            match inner.port.wait(deadline.into_zx()) {
                Ok(packet) => Some(packet),
                Err(zx::Status::TIMED_OUT) => {
                    // The wait ran into the earliest deadline: fire that timer.
                    heap.pop().unwrap().wake();
                    None
                }
                Err(status) => panic!("Error calling port wait: {:?}", status),
            }
        });

        let packet = match received {
            Some(packet) => packet,
            None => continue,
        };
        match packet.key() {
            // Bare wakeup: nothing to do except loop around and re-check `done`.
            EMPTY_WAKEUP_ID => {}
            TASK_READY_WAKEUP_ID => inner.poll_ready_tasks(),
            receiver_key => inner.deliver_packet(receiver_key as usize, packet),
        }
    }

    EHandle::rm_local();
}
}
// Outcome of `Executor::next_step`: what the stepwise API should do next.
enum NextStep {
// Nothing is ready; the payload is the earliest timer deadline
// (`Time::INFINITE` when there are no timers).
WaitUntil(Time),
// A packet has been dequeued and is waiting in `Executor::next_packet`.
NextPacket,
// The earliest timer's deadline has passed and it should be fired.
NextTimer,
}
/// Indicates whether the executor can run, or is stuck waiting.
/// Returned by `Executor::is_waiting`.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum WaitState {
/// The executor can run immediately.
Ready,
/// The executor will wait for the given time or an external event.
/// The time is `Time::INFINITE` when no timer is pending.
Waiting(Time),
}
/// Discard timers at the top of `heap` whose waiters have gone away, then return the
/// earliest remaining timer, if any.
fn next_deadline(heap: &mut TimerHeap) -> Option<&TimeWaker> {
    loop {
        if !is_defunct_timer(heap.peek()) {
            break;
        }
        heap.pop();
    }
    heap.peek()
}
/// Return `true` if `timer` is present but its waker can no longer be upgraded, i.e.
/// nothing is waiting on it any more. A missing timer is not defunct.
fn is_defunct_timer(timer: Option<&TimeWaker>) -> bool {
    timer.map_or(false, |entry| entry.waker_and_bool.upgrade().is_none())
}
// Waker for the main future in single-threaded modes. With no worker threads running,
// `EMPTY_WAKEUP_ID` is not needed to nudge workers, so it doubles as the "re-poll the
// main future" wakeup id.
struct SingleThreadedMainTaskWake(Arc<Inner>);

impl ArcWake for SingleThreadedMainTaskWake {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let SingleThreadedMainTaskWake(inner) = &**arc_self;
        inner.notify_empty();
    }
}
impl Drop for Executor {
    /// Shut the executor down: stop and join workers, release receivers and pending
    /// tasks, and uninstall the thread-local executor.
    fn drop(&mut self) {
        let inner = &self.inner;

        // The done flag must be set before the packet receivers are dropped, so that
        // receivers deregistering themselves afterwards know a missing entry is fine.
        inner.done.store(true, Ordering::SeqCst);

        // Wake the worker threads so they notice the flag and exit, then join them.
        self.join_all();

        // Drop all of the packet receivers.
        inner.receivers.lock().clear();

        // Drop all of the uncompleted tasks.
        while inner.ready_tasks.try_pop().is_some() {}

        // Remove the thread-local executor set in `new`.
        EHandle::rm_local();
    }
}
/// A cloneable handle to an executor.
#[derive(Clone)]
pub struct EHandle {
    inner: Arc<Inner>,
}

impl fmt::Debug for EHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("EHandle");
        out.field("port", &self.inner.port);
        out.finish()
    }
}

impl Spawn for EHandle {
    fn spawn_obj(&self, f: FutureObj<'static, ()>) -> Result<(), SpawnError> {
        Inner::spawn(&self.inner, f);
        Ok(())
    }
}

impl EHandle {
    /// Returns a handle to the executor installed on the calling thread.
    ///
    /// # Panics
    ///
    /// Panics if no executor has been installed on this thread.
    pub fn local() -> Self {
        let current =
            EXECUTOR.with(|slot| slot.borrow().as_ref().map(|(inner, _)| Arc::clone(inner)));
        EHandle { inner: current.expect("Fuchsia Executor must be created first") }
    }

    /// Install this handle's executor, together with `timers`, as the calling
    /// thread's executor. Panics if one is already installed.
    fn set_local(self, timers: TimerHeap) {
        let EHandle { inner } = self;
        EXECUTOR.with(|slot| {
            let mut current = slot.borrow_mut();
            assert!(current.is_none(), "Cannot create multiple Fuchsia Executors");
            *current = Some((inner, timers));
        });
    }

    /// Uninstall the calling thread's executor, if any.
    fn rm_local() {
        EXECUTOR.with(|slot| {
            let mut current = slot.borrow_mut();
            *current = None;
        });
    }

    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
    pub fn port(&self) -> &zx::Port {
        let inner = &self.inner;
        &inner.port
    }

    /// Registers a `PacketReceiver` with the executor and returns a registration.
    /// The `PacketReceiver` is deregistered when the registration is dropped.
    pub fn register_receiver<T>(&self, receiver: Arc<T>) -> ReceiverRegistration<T>
    where
        T: PacketReceiver,
    {
        let shared: Arc<dyn PacketReceiver> = receiver.clone();
        let key = self.inner.receivers.lock().insert(shared) as u64;
        ReceiverRegistration { ehandle: self.clone(), key, receiver }
    }

    /// Remove the receiver registered under `key`.
    ///
    /// A missing entry is only acceptable while the executor is shutting down;
    /// otherwise this panics.
    fn deregister_receiver(&self, key: u64) {
        let slot = key as usize;
        let mut receivers = self.inner.receivers.lock();
        if !receivers.contains(slot) {
            // The executor is shutting down and already removed the entry.
            assert!(self.inner.done.load(Ordering::SeqCst), "Missing receiver to deregister");
            return;
        }
        receivers.remove(slot);
    }

    /// Add a timer for `time` to the calling thread's timer heap. Only a weak
    /// reference to `waker_and_bool` is stored.
    pub(crate) fn register_timer(
        &self,
        time: Time,
        waker_and_bool: &Arc<(AtomicWaker, AtomicBool)>,
    ) {
        let weak = Arc::downgrade(waker_and_bool);
        with_local_timer_heap(|heap| heap.push(TimeWaker { time, waker_and_bool: weak }))
    }
}
/// Neither a multithreaded run nor a thread-unsafe spawn has happened yet, so the
/// executor may still go either way.
const THREADINESS_ANY: usize = 0;
/// Thread-unsafe futures have been spawned without a multithreaded run, so the
/// executor can never be run in multithreaded mode.
const THREADINESS_SINGLE: usize = 1;
/// The executor has been run in multithreaded mode, so thread-unsafe futures can no
/// longer be spawned.
const THREADINESS_MULTI: usize = 2;

/// Tracks the multithreaded-compatibility state of the executor.
struct Threadiness(AtomicUsize);

impl Default for Threadiness {
    fn default() -> Self {
        Self(AtomicUsize::new(THREADINESS_ANY))
    }
}

impl Threadiness {
    /// Move from `THREADINESS_ANY` to `target`. Succeeds if the switch happens or the
    /// state already equals `target`; fails if a different mode was chosen earlier.
    fn try_become(&self, target: usize) -> Result<(), ()> {
        let swapped = self.0.compare_exchange(
            THREADINESS_ANY,
            target,
            Ordering::Relaxed,
            Ordering::Relaxed,
        );
        match swapped {
            Ok(_) => Ok(()),
            Err(observed) => {
                if observed == target {
                    Ok(())
                } else {
                    Err(())
                }
            }
        }
    }

    /// Attempts to switch the threadiness to singlethreaded-only mode.
    /// Will fail iff a prior call to `require_multithreaded` was made.
    fn require_singlethreaded(&self) -> Result<(), ()> {
        self.try_become(THREADINESS_SINGLE)
    }

    /// Attempts to switch the threadiness to multithreaded mode.
    /// Will fail iff a prior call to `require_singlethreaded` was made.
    fn require_multithreaded(&self) -> Result<(), ()> {
        self.try_become(THREADINESS_MULTI)
    }
}
// The clock an executor reads its current time from.
enum ExecutorTime {
// Use the monotonic system clock.
RealTime,
// Use a manually controlled time, stored in nanoseconds and changed through
// `Inner::set_fake_time`.
FakeTime(AtomicI64),
}
// Simple slab::Slab replacement that doesn't re-use keys
// TODO(43101): figure out how to safely cancel async waits so we can re-use keys again.
struct PacketReceiverMap<T> {
    // Key handed out by the next `insert`; only ever grows.
    next_key: usize,
    mapping: HashMap<usize, T>,
}

impl<T> PacketReceiverMap<T> {
    /// Create an empty map whose first key is 0.
    fn new() -> Self {
        PacketReceiverMap { mapping: HashMap::new(), next_key: 0 }
    }

    /// Remove every entry. Keys handed out so far are still never reused.
    fn clear(&mut self) {
        self.mapping.clear();
    }

    /// Look up the value stored under `key`.
    fn get(&self, key: usize) -> Option<&T> {
        self.mapping.get(&key)
    }

    /// Store `val` under a fresh key and return that key.
    /// Panics if the key space is exhausted.
    fn insert(&mut self, val: T) -> usize {
        let assigned = self.next_key;
        self.next_key = assigned.checked_add(1).expect("ran out of keys");
        self.mapping.insert(assigned, val);
        assigned
    }

    /// Remove and return the value stored under `key`. Panics if there is none.
    fn remove(&mut self, key: usize) -> T {
        match self.mapping.remove(&key) {
            Some(val) => val,
            None => panic!("invalid key"),
        }
    }

    /// Return whether a value is stored under `key`.
    fn contains(&self, key: usize) -> bool {
        self.mapping.get(&key).is_some()
    }
}
// Executor state shared between the `Executor`, its `EHandle`s, tasks, and workers.
struct Inner {
// Port that wakeups and receiver packets are queued on and waited for.
port: zx::Port,
// Set when worker threads should exit; also checked when a receiver is deregistered.
done: AtomicBool,
// Whether the executor is committed to single- or multi-threaded use.
threadiness: Threadiness,
// Join handles of the running worker threads.
threads: Mutex<Vec<thread::JoinHandle<()>>>,
// Registered packet receivers, looked up by packet key.
receivers: Mutex<PacketReceiverMap<Arc<dyn PacketReceiver>>>,
// Tasks waiting to be polled.
ready_tasks: SegQueue<Arc<Task>>,
// Clock source: real or fake time.
time: ExecutorTime,
}
// A timer-heap entry: a deadline plus a weak reference to the waiter's state.
struct TimeWaker {
    time: Time,
    waker_and_bool: Weak<(AtomicWaker, AtomicBool)>,
}

impl TimeWaker {
    /// Set the waiter's flag and wake its task, provided the waiter still exists.
    fn wake(&self) {
        let shared = match self.waker_and_bool.upgrade() {
            Some(shared) => shared,
            None => return,
        };
        let (waker, fired) = &*shared;
        fired.store(true, Ordering::SeqCst);
        waker.wake();
    }
}

impl Ord for TimeWaker {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        // Compare in reverse so that `BinaryHeap` (a max-heap) acts as a min-heap.
        other.time.cmp(&self.time)
    }
}

impl PartialOrd for TimeWaker {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Eq for TimeWaker {}

// N.B.: two TimeWakers can be equal even if they don't have the same
// waker_and_bool. This is fine since BinaryHeap doesn't deduplicate.
impl PartialEq for TimeWaker {
    fn eq(&self, other: &Self) -> bool {
        other.time == self.time
    }
}
impl Inner {
/// Pop at most one task from the ready queue and poll it, using the task itself as
/// the waker.
fn poll_ready_tasks(&self) {
    // TODO: loop but don't starve
    let task = match self.ready_tasks.try_pop() {
        Some(task) => task,
        None => return,
    };
    let waker = waker_ref(&task);
    let mut cx = Context::from_waker(&waker);
    task.future.try_poll(&mut cx);
}
/// Wrap `future` in a new task, push it onto the ready queue, and queue a task-ready
/// wakeup on the port.
fn spawn(arc_self: &Arc<Self>, future: FutureObj<'static, ()>) {
    let executor = Arc::clone(arc_self);
    let future = AtomicFuture::new(future);
    arc_self.ready_tasks.push(Arc::new(Task { future, executor }));
    arc_self.notify_task_ready();
}
/// Spawn a non-`Send` future, first committing the executor to single-threaded use.
///
/// # Panics
///
/// Panics if the executor has already been run in multithreaded mode.
fn spawn_local(arc_self: &Arc<Self>, future: LocalFutureObj<'static, ()>) {
    let single_threaded = arc_self.threadiness.require_singlethreaded();
    single_threaded.expect(
        "Error: called `spawn_local` after calling `run` on executor. \
         Use `spawn` or `run_singlethreaded` instead.",
    );
    // SAFETY: the check above confirmed that the boxed futures here will never be
    // used across multiple threads, so converting from a non-`Send`able future to a
    // `Send`able one is sound.
    let sendable = unsafe { future.into_future_obj() };
    Inner::spawn(arc_self, sendable)
}
/// Queue a `TASK_READY_WAKEUP_ID` packet on the port so that a run loop polls the
/// ready-task queue.
fn notify_task_ready(&self) {
// TODO: optimize so that this function doesn't push new items onto
// the queue if all worker threads are already awake
self.notify_id(TASK_READY_WAKEUP_ID);
}
/// Queue an `EMPTY_WAKEUP_ID` packet on the port: a bare wakeup for worker threads,
/// and the "re-poll the main future" signal for single-threaded run loops.
fn notify_empty(&self) {
self.notify_id(EMPTY_WAKEUP_ID);
}
/// Queue a zero-filled user packet with key `id` on the port. A failure to queue is
/// reported on stderr and otherwise ignored.
fn notify_id(&self, id: u64) {
    let payload = zx::UserPacket::from_u8_array([0; 32]);
    let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, payload);
    match self.port.queue(&packet) {
        Ok(_) => {}
        Err(e) => {
            // TODO: logging
            eprintln!("Failed to queue notify in port: {:?}", e);
        }
    }
}
/// Hand `packet` to the receiver registered under `key`. The packet is silently
/// dropped if no such receiver exists.
fn deliver_packet(&self, key: usize, packet: zx::Packet) {
    // Clone the `Arc` out of the map so the lock is released before the receiver
    // runs; the `receive_packet` impl may be arbitrarily complex.
    let receiver = self.receivers.lock().get(key).cloned();
    if let Some(receiver) = receiver {
        receiver.receive_packet(packet);
    }
}
/// Return the current time: the monotonic clock for real time, or the stored value
/// for fake time.
fn now(&self) -> Time {
    match self.time {
        ExecutorTime::FakeTime(ref nanos) => Time::from_nanos(nanos.load(Ordering::Relaxed)),
        ExecutorTime::RealTime => Time::from_zx(zx::Time::get(zx::ClockId::Monotonic)),
    }
}
/// Set the fake clock to `new`.
///
/// # Panics
///
/// Panics if the executor runs on real time, since there is no fake clock to set.
fn set_fake_time(&self, new: Time) {
    match &self.time {
        ExecutorTime::RealTime => {
            // Name the function callers actually invoked.
            panic!("Error: called `set_fake_time` on an executor using actual time.")
        }
        ExecutorTime::FakeTime(t) => t.store(new.into_nanos(), Ordering::Relaxed),
    }
}
/// Return `Ok(())` if the executor runs on real time and `Err(())` if it uses fake time.
fn require_real_time(&self) -> Result<(), ()> {
    if let ExecutorTime::RealTime = self.time {
        Ok(())
    } else {
        Err(())
    }
}
}
// A spawned future together with the executor that polls it.
struct Task {
    future: AtomicFuture,
    executor: Arc<Inner>,
}

impl ArcWake for Task {
    /// Put the task back on its executor's ready queue and queue a task-ready wakeup.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let executor = &arc_self.executor;
        executor.ready_tasks.push(Arc::clone(arc_self));
        executor.notify_task_ready();
    }
}
#[cfg(test)]
mod tests {
use core::task::{Context, Waker};
use fuchsia_zircon::{self as zx, AsHandleRef, DurationNum};
use futures::{future::poll_fn, Future};
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::task::Poll;
use super::*;
use crate::{on_signals::OnSignals, timer::Timer};
fn time_operations_param(zxt1: zx::Time, zxt2: zx::Time, d: zx::Duration) {
let t1 = Time::from_zx(zxt1);
let t2 = Time::from_zx(zxt2);
assert_eq!(t1.into_zx(), zxt1);
assert_eq!(Time::from_zx(zx::Time::INFINITE), Time::INFINITE);
assert_eq!(Time::from_zx(zx::Time::INFINITE_PAST), Time::INFINITE_PAST);
assert_eq!(zxt1 - zxt2, t1 - t2);
assert_eq!(zxt1 + d, (t1 + d).into_zx());
assert_eq!(d + zxt1, (d + t1).into_zx());
assert_eq!(zxt1 - d, (t1 - d).into_zx());
let mut zxt = zxt1;
let mut t = t1;
t += d;
zxt += d;
assert_eq!(zxt, t.into_zx());
t -= d;
zxt -= d;
assert_eq!(zxt, t.into_zx());
}
#[test]
fn time_operations() {
time_operations_param(zx::Time::from_nanos(0), zx::Time::from_nanos(1000), 12.seconds());
time_operations_param(
zx::Time::from_nanos(-100000),
zx::Time::from_nanos(65324),
(-785).hours(),
);
}
#[test]
fn time_now_real_time() {
let _executor = Executor::new().unwrap();
let t1 = zx::Time::after(0.seconds());
let t2 = Time::now().into_zx();
let t3 = zx::Time::after(0.seconds());
assert!(t1 <= t2);
assert!(t2 <= t3);
}
#[test]
fn time_now_fake_time() {
let executor = Executor::new_with_fake_time().unwrap();
let t1 = Time::from_zx(zx::Time::from_nanos(0));
executor.set_fake_time(t1);
assert_eq!(Time::now(), t1);
let t2 = Time::from_zx(zx::Time::from_nanos(1000));
executor.set_fake_time(t2);
assert_eq!(Time::now(), t2);
}
#[test]
fn time_after_overflow() {
let executor = Executor::new_with_fake_time().unwrap();
executor.set_fake_time(Time::INFINITE - 100.nanos());
assert_eq!(Time::after(200.seconds()), Time::INFINITE);
executor.set_fake_time(Time::INFINITE_PAST + 100.nanos());
assert_eq!(Time::after((-200).seconds()), Time::INFINITE_PAST);
}
fn run_until_stalled<F>(executor: &mut Executor, fut: &mut F)
where
F: Future + Unpin,
{
loop {
match executor.run_one_step(fut) {
None => return,
Some(Poll::Pending) => { /* continue */ }
Some(Poll::Ready(_)) => panic!("executor stopped"),
}
}
}
/// Repeatedly calls `executor.run_one_step(fut)` until a step reports
/// `Poll::Ready`, and returns the value carried by that step.
///
/// # Panics
///
/// Panics with "executor stalled" if `run_one_step` returns `None` before the
/// future has produced a value.
fn run_until_done<F>(executor: &mut Executor, fut: &mut F) -> F::Output
where
    F: Future + Unpin,
{
    loop {
        let step = match executor.run_one_step(fut) {
            Some(step) => step,
            None => panic!("executor stalled"),
        };
        // `Pending` falls through to the next iteration.
        if let Poll::Ready(output) = step {
            return output;
        }
    }
}
// Steps a future that returns `Pending` on its first poll and `Ready` on its
// second, checking the executor's reported wait state around each step.
#[test]
fn stepwise_two_steps() {
    let polls_seen = Cell::new(0);
    let saved_waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));

    let fut = poll_fn(|cx: &mut Context<'_>| {
        // Keep the latest waker so the test body can wake the future by hand.
        *saved_waker.borrow_mut() = Some(cx.waker().clone());
        let (next_step, outcome) = match polls_seen.get() {
            0 => (1, Poll::Pending),
            1 => (2, Poll::Ready(())),
            _ => panic!("future called after done"),
        };
        polls_seen.set(next_step);
        outcome
    });
    pin_mut!(fut);

    let mut executor = Executor::new_with_fake_time().unwrap();
    executor.wake_main_future();
    assert_eq!(executor.is_waiting(), WaitState::Ready);
    assert_eq!(polls_seen.get(), 0);

    // First step: the future is polled once and suspends.
    assert_eq!(executor.run_one_step(&mut fut), Some(Poll::Pending));
    assert_eq!(executor.is_waiting(), WaitState::Waiting(Time::INFINITE));
    // Nothing has woken the future, so a further step reports `None`.
    assert_eq!(executor.run_one_step(&mut fut), None);
    assert_eq!(polls_seen.get(), 1);

    // Wake it manually; the next step completes the future.
    saved_waker.borrow_mut().take().unwrap().wake();
    assert_eq!(executor.is_waiting(), WaitState::Ready);
    assert_eq!(executor.run_one_step(&mut fut), Some(Poll::Ready(())));
    assert_eq!(polls_seen.get(), 2);
}
// Steps a future that waits on a timer, advancing fake time to make it fire.
#[test]
fn stepwise_timer() {
    let start = Time::from_nanos(0);
    let deadline = Time::from_nanos(1000);

    let mut executor = Executor::new_with_fake_time().unwrap();
    executor.set_fake_time(start);

    let timer = Timer::new(Time::after(1000.nanos()));
    pin_mut!(timer);

    // Before the deadline: the executor stalls and reports the timer's deadline
    // as the time it is waiting for, while the clock has not moved.
    executor.wake_main_future();
    run_until_stalled(&mut executor, &mut timer);
    assert_eq!(Time::now(), start);
    assert_eq!(executor.is_waiting(), WaitState::Waiting(deadline));

    // At the deadline: the executor reports ready and the timer completes.
    executor.set_fake_time(deadline);
    assert_eq!(Time::now(), deadline);
    assert_eq!(executor.is_waiting(), WaitState::Ready);
    assert_eq!(run_until_done(&mut executor, &mut timer), ());
}
// Steps a future that waits for `USER_0` to be signalled on a zircon event.
#[test]
fn stepwise_event() {
    let watched = zx::Signals::USER_0;

    let mut executor = Executor::new_with_fake_time().unwrap();
    let event = zx::Event::create().unwrap();
    let on_signal = OnSignals::new(&event, watched);
    pin_mut!(on_signal);

    // With the signal not yet raised, the executor stalls and reports an
    // unbounded wait.
    executor.wake_main_future();
    run_until_stalled(&mut executor, &mut on_signal);
    assert_eq!(executor.is_waiting(), WaitState::Waiting(Time::INFINITE));

    // Raise the signal (clearing nothing); the future must now resolve to `Ok`.
    event.signal_handle(zx::Signals::NONE, watched).unwrap();
    assert!(run_until_done(&mut executor, &mut on_signal).is_ok());
}
// Driving the executor with `run_until_stalled` must not change the order in
// which events are handled compared to normal execution.
#[test]
fn run_until_stalled_preserves_order() {
    let mut executor = Executor::new_with_fake_time().unwrap();

    // Flag set by the spawned task once its 5-second timer has elapsed.
    let background_done = Arc::new(AtomicBool::new(false));
    let background_done_setter = Arc::clone(&background_done);
    let background = Box::pin(async move {
        Timer::new(Time::after(5.seconds())).await;
        background_done_setter.store(true, Ordering::SeqCst);
    });

    let main_fut = async {
        Timer::new(Time::after(10.seconds())).await;
    };
    pin_mut!(main_fut);

    spawn(background);
    assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);

    // Move fake time past both deadlines and wake whatever has expired.
    executor.set_fake_time(Time::after(15.seconds()));
    executor.wake_expired_timers();

    // The spawned task's timer should fire first, then the one in `main_fut`,
    // so by the time `main_fut` is ready the flag has already been set.
    assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
    assert_eq!(background_done.load(Ordering::SeqCst), true);
}
// `PacketReceiverMap::insert` must hand out a fresh key on every call, even
// after an entry has been removed or the whole map has been cleared.
#[test]
fn packet_receiver_map_does_not_reuse_keys() {
    #[derive(Debug, Copy, Clone, PartialEq)]
    struct DummyPacketReceiver {
        id: i32,
    }

    let mut receivers: PacketReceiverMap<DummyPacketReceiver> = PacketReceiverMap::new();
    let receiver = DummyPacketReceiver { id: 1 };

    // Consecutive inserts get consecutive keys.
    assert_eq!(receivers.insert(receiver), 0);
    assert_eq!(receivers.insert(receiver), 1);

    // Removing key 1 does not make it available again.
    receivers.remove(1);
    assert_eq!(receivers.insert(receiver), 2);

    // Clearing the map does not reset the key counter either.
    receivers.clear();
    assert_eq!(receivers.insert(receiver), 3);
}
}