blob: 7857d54cf7ab1ec6ce7c14e8b8fac29ce71403dd [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::atomic_future::AtomicFuture;
use crossbeam::queue::SegQueue;
use fuchsia_zircon as zx;
use futures::future::{self, FutureObj, LocalFutureObj};
use futures::task::{
local_waker_from_nonlocal, local_waker_ref_from_nonlocal, AtomicWaker, Spawn, SpawnError,
};
use futures::{task, Future, FutureExt, Poll};
use parking_lot::{Condvar, Mutex};
use pin_utils::pin_mut;
use slab::Slab;
use std::cell::RefCell;
use std::collections::BinaryHeap;
use std::marker::Unpin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::thread;
use std::{cmp, fmt, mem};
use std::{u64, usize};
const EMPTY_WAKEUP_ID: u64 = u64::MAX;
const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;
/// Spawn a new task to be run on the global executor.
///
/// Tasks spawned using this method must be threadsafe (implement the `Send` trait),
/// as they may be run on either a singlethreaded or multithreaded executor.
pub fn spawn<F>(future: F)
where
F: Future<Output = ()> + Send + 'static,
{
Inner::spawn(&EHandle::local().inner, FutureObj::new(Box::new(future)));
}
/// Spawn a new task to be run on the global executor.
///
/// This is similar to the `spawn` function, but tasks spawned using this method
/// do not have to be threadsafe (implement the `Send` trait). In return, this method
/// requires that the current executor never be run in a multithreaded mode-- only
/// `run_singlethreaded` can be used.
pub fn spawn_local<F>(future: F)
where
F: Future<Output = ()> + 'static,
{
Inner::spawn_local(
&EHandle::local().inner,
LocalFutureObj::new(Box::new(future)),
);
}
/// A trait for handling the arrival of a packet on a `zx::Port`.
///
/// This trait should be implemented by users who wish to write their own
/// types which receive asynchronous notifications from a `zx::Port`.
/// Implementors of this trait generally contain a `futures::task::AtomicWaker` which
/// is used to wake up the task which can make progress due to the arrival of
/// the packet.
///
/// `PacketReceiver`s should be registered with a `Core` using the
/// `register_receiver` method on `Core`, `Handle`, or `Remote`.
/// Upon registration, users will receive a `ReceiverRegistration`
/// which provides `key` and `port` methods. These methods can be used to wait on
/// asynchronous signals.
pub trait PacketReceiver: Send + Sync + 'static {
/// Receive a packet when one arrives.
fn receive_packet(&self, packet: zx::Packet);
}
/// A registration of a `PacketReceiver`.
/// When dropped, it will automatically deregister the `PacketReceiver`.
// NOTE: purposefully does not implement `Clone`.
#[derive(Debug)]
pub struct ReceiverRegistration<T: PacketReceiver> {
receiver: Arc<T>,
ehandle: EHandle,
key: u64,
}
impl<T> ReceiverRegistration<T>
where
T: PacketReceiver,
{
/// The key with which `Packet`s destined for this receiver should be sent on the `zx::Port`.
pub fn key(&self) -> u64 {
self.key
}
/// The internal `PacketReceiver`.
pub fn receiver(&self) -> &T {
&*self.receiver
}
/// The `zx::Port` on which packets destined for this `PacketReceiver` should be queued.
pub fn port(&self) -> &zx::Port {
self.ehandle.port()
}
}
impl<T> Drop for ReceiverRegistration<T>
where
T: PacketReceiver,
{
fn drop(&mut self) {
self.ehandle.deregister_receiver(self.key);
}
}
/// A port-based executor for Fuchsia OS.
// NOTE: intentionally does not implement `Clone`.
pub struct Executor {
inner: Arc<Inner>,
}
impl fmt::Debug for Executor {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Executor")
.field("port", &self.inner.port)
.finish()
}
}
type TimerHeap = BinaryHeap<TimeWaker>;
thread_local!(
static EXECUTOR: RefCell<Option<(Arc<Inner>, TimerHeap)>> = RefCell::new(None)
);
fn with_local_timer_heap<F, R>(f: F) -> R
where
F: FnOnce(&mut TimerHeap) -> R,
{
EXECUTOR.with(|e| {
(f)(&mut e
.borrow_mut()
.as_mut()
.expect("can't get timer heap before fuchsia_async::Executor is initialized")
.1)
})
}
impl Executor {
/// Creates a new executor.
pub fn new() -> Result<Self, zx::Status> {
let executor = Executor {
inner: Arc::new(Inner {
port: zx::Port::create()?,
done: AtomicBool::new(false),
threadiness: Threadiness::default(),
threads: Mutex::new(Vec::new()),
receivers: Mutex::new(Slab::new()),
ready_tasks: SegQueue::new(),
}),
};
executor.ehandle().set_local(TimerHeap::new());
Ok(executor)
}
/// Returns a handle to the executor.
pub fn ehandle(&self) -> EHandle {
EHandle {
inner: self.inner.clone(),
}
}
/// Run a single future to completion on a single thread.
// Takes `&mut self` to ensure that only one thread-manager is running at a time.
pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
where
F: Future,
{
pin_mut!(main_future);
let lw =
local_waker_from_nonlocal(Arc::new(SingleThreadedMainTaskWake(self.inner.clone())));
let mut res = main_future.as_mut().poll(&lw);
loop {
if let Poll::Ready(res) = res {
return res;
}
let packet = with_local_timer_heap(|timer_heap| {
let deadline = next_deadline(timer_heap)
.map(|t| t.time)
.unwrap_or(zx::Time::INFINITE);
match self.inner.port.wait(deadline) {
Ok(packet) => Some(packet),
Err(zx::Status::TIMED_OUT) => {
let time_waker = timer_heap.pop().unwrap();
time_waker.wake();
None
}
Err(status) => {
panic!("Error calling port wait: {:?}", status);
}
}
});
if let Some(packet) = packet {
match packet.key() {
EMPTY_WAKEUP_ID => {
res = main_future.as_mut().poll(&lw);
}
TASK_READY_WAKEUP_ID => {
// TODO: loop but don't starve
if let Some(task) = self.inner.ready_tasks.try_pop() {
let lw = local_waker_ref_from_nonlocal(&task);
task.future.try_poll(&lw);
}
}
receiver_key => {
self.inner.deliver_packet(receiver_key as usize, packet);
}
}
}
}
}
/// PollResult the future. If it is not ready, dispatch available packets and possibly try again.
/// Timers will not fire. Never blocks.
///
/// Task-local data will not be persisted across different calls to this method.
///
/// This is mainly intended for testing.
///
/// Unpin: this function requires all futures to be `Unpin`able, so any `!Unpin`
/// futures must first be pinned using the `pin_mut!` macro from the `pin-utils` crate.
pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
where
F: Future + Unpin,
{
let lw =
local_waker_from_nonlocal(Arc::new(SingleThreadedMainTaskWake(self.inner.clone())));
let mut res = main_future.poll_unpin(&lw);
loop {
if res.is_ready() {
return res;
}
let packet = match self.inner.port.wait(zx::Time::from_nanos(0)) {
Ok(packet) => packet,
Err(zx::Status::TIMED_OUT) => return Poll::Pending,
Err(status) => panic!("Error calling port wait: {:?}", status),
};
match packet.key() {
EMPTY_WAKEUP_ID => {
res = main_future.poll_unpin(&lw);
}
TASK_READY_WAKEUP_ID => {
if let Some(task) = self.inner.ready_tasks.try_pop() {
let lw = local_waker_ref_from_nonlocal(&task);
task.future.try_poll(&lw);
}
}
receiver_key => {
self.inner.deliver_packet(receiver_key as usize, packet);
}
}
}
}
/// Wake up the next task waiting for a timer, if any, and return the time for which the
/// timer was scheduled.
///
/// This is intended for use in test code in conjunction with `run_until_stalled`.
/// For example, here is how one could test that the Timer future fires after the given
/// timeout:
///
/// let deadline = 5.seconds().after_now();
/// let mut future = Timer::<Never>::new(deadline);
/// assert_eq!(Ok(Poll::Pending), exec.run_until_stalled(&mut future));
/// assert_eq!(Some(deadline), exec.wake_next_timer());
/// assert_eq!(Ok(Poll::Ready(())), exec.run_until_stalled(&mut future));
pub fn wake_next_timer(&mut self) -> Option<zx::Time> {
with_local_timer_heap(|timer_heap| {
let deadline = next_deadline(timer_heap).map(|waker| {
waker.wake();
waker.time
});
if deadline.is_some() {
timer_heap.pop();
}
deadline
})
}
/// Run a single future to completion using multiple threads.
// Takes `&mut self` to ensure that only one thread-manager is running at a time.
pub fn run<F>(&mut self, future: F, num_threads: usize) -> F::Output
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.inner.threadiness.require_multithreaded().expect(
"Error: called `run` on executor after using `spawn_local`. \
Use `run_singlethreaded` instead.",
);
let pair = Arc::new((Mutex::new(None), Condvar::new()));
let pair2 = pair.clone();
// Spawn a future which will set the result upon completion.
Inner::spawn(
&self.inner,
FutureObj::new(Box::new(future.then(move |fut_result| {
let (lock, cvar) = &*pair2;
let mut result = lock.lock();
*result = Some(fut_result);
cvar.notify_one();
future::ready(())
}))),
);
// Start worker threads, handing off timers from the current thread.
self.inner.done.store(false, Ordering::SeqCst);
with_local_timer_heap(|timer_heap| {
let timer_heap = mem::replace(timer_heap, TimerHeap::new());
self.create_worker_threads(num_threads, Some(timer_heap));
});
// Wait until the signal the future has completed.
let (lock, cvar) = &*pair;
let mut result = lock.lock();
while result.is_none() {
cvar.wait(&mut result);
}
// Spin down worker threads
self.inner.done.store(true, Ordering::SeqCst);
self.join_all();
// Unwrap is fine because of the check to `is_none` above.
result.take().unwrap()
}
/// Add `num_workers` worker threads to the executor's thread pool.
/// `timers`: timers from the "master" thread which would otherwise be lost.
fn create_worker_threads(&self, num_workers: usize, mut timers: Option<TimerHeap>) {
let mut threads = self.inner.threads.lock();
for _ in 0..num_workers {
threads.push(self.new_worker(timers.take()));
}
}
fn join_all(&self) {
let mut threads = self.inner.threads.lock();
// Send a user packet to wake up all the threads
for _thread in threads.iter() {
self.inner.notify_empty();
}
// Join the worker threads
for thread in threads.drain(..) {
thread.join().expect("Couldn't join worker thread.");
}
}
fn new_worker(&self, timers: Option<TimerHeap>) -> thread::JoinHandle<()> {
let inner = self.inner.clone();
thread::spawn(move || Self::worker_lifecycle(inner, timers))
}
fn worker_lifecycle(inner: Arc<Inner>, timers: Option<TimerHeap>) {
let executor: EHandle = EHandle {
inner: inner.clone(),
};
executor.set_local(timers.unwrap_or(TimerHeap::new()));
loop {
if inner.done.load(Ordering::SeqCst) {
EHandle::rm_local();
return;
}
let packet = with_local_timer_heap(|timer_heap| {
let deadline = next_deadline(timer_heap)
.map(|t| t.time)
.unwrap_or(zx::Time::INFINITE);
match inner.port.wait(deadline) {
Ok(packet) => Some(packet),
Err(zx::Status::TIMED_OUT) => {
let time_waker = timer_heap.pop().unwrap();
time_waker.wake();
None
}
Err(status) => {
panic!("Error calling port wait: {:?}", status);
}
}
});
if let Some(packet) = packet {
match packet.key() {
EMPTY_WAKEUP_ID => {}
TASK_READY_WAKEUP_ID => {
// TODO: loop but don't starve
if let Some(task) = inner.ready_tasks.try_pop() {
let lw = local_waker_ref_from_nonlocal(&task);
task.future.try_poll(&lw);
}
}
receiver_key => {
inner.deliver_packet(receiver_key as usize, packet);
}
}
}
}
}
}
fn next_deadline(heap: &mut TimerHeap) -> Option<&TimeWaker> {
while is_defunct_timer(heap.peek()) {
heap.pop();
}
heap.peek()
}
fn is_defunct_timer(timer: Option<&TimeWaker>) -> bool {
match timer {
None => false,
Some(timer) => timer.waker_and_bool.upgrade().is_none(),
}
}
// Since there are no other threads running, we don't have to use the EMPTY_WAKEUP_ID,
// so instead we save it for use as the main task wakeup id.
struct SingleThreadedMainTaskWake(Arc<Inner>);
impl task::Wake for SingleThreadedMainTaskWake {
fn wake(arc_self: &Arc<Self>) {
arc_self.0.notify_empty();
}
}
impl Drop for Executor {
fn drop(&mut self) {
// Done flag must be set before dropping packet recievers
// so that future receivers that attempt to deregister themselves
// know that it's okay if their entries are already missing.
self.inner.done.store(true, Ordering::SeqCst);
// Wake the threads so they can kill themselves.
self.join_all();
// Drop all of the packet receivers
self.inner.receivers.lock().clear();
// Drop all of the uncompleted tasks
while let Some(_) = self.inner.ready_tasks.try_pop() {}
// Remove the thread-local executor set in `new`.
EHandle::rm_local();
}
}
/// A handle to an executor.
#[derive(Clone)]
pub struct EHandle {
inner: Arc<Inner>,
}
impl fmt::Debug for EHandle {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("EHandle")
.field("port", &self.inner.port)
.finish()
}
}
impl Spawn for EHandle {
fn spawn_obj(&mut self, f: FutureObj<'static, ()>) -> Result<(), SpawnError> {
<&EHandle>::spawn_obj(&mut &*self, f)
}
}
impl<'a> Spawn for &'a EHandle {
fn spawn_obj(&mut self, f: FutureObj<'static, ()>) -> Result<(), SpawnError> {
Inner::spawn(&self.inner, f);
Ok(())
}
}
impl EHandle {
/// Returns the thread-local executor.
pub fn local() -> Self {
let inner = EXECUTOR
.with(|e| e.borrow().as_ref().map(|x| x.0.clone()))
.expect("Fuchsia Executor must be created first");
EHandle { inner }
}
fn set_local(self, timers: TimerHeap) {
let inner = self.inner.clone();
EXECUTOR.with(|e| {
let mut e = e.borrow_mut();
assert!(e.is_none());
*e = Some((inner, timers));
});
}
fn rm_local() {
EXECUTOR.with(|e| *e.borrow_mut() = None);
}
/// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
pub fn port(&self) -> &zx::Port {
&self.inner.port
}
/// Registers a `PacketReceiver` with the executor and returns a registration.
/// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
pub fn register_receiver<T>(&self, receiver: Arc<T>) -> ReceiverRegistration<T>
where
T: PacketReceiver,
{
let key = self.inner.receivers.lock().insert(receiver.clone()) as u64;
ReceiverRegistration {
ehandle: self.clone(),
key,
receiver,
}
}
fn deregister_receiver(&self, key: u64) {
let key = key as usize;
let mut lock = self.inner.receivers.lock();
if lock.contains(key) {
lock.remove(key);
} else {
// The executor is shutting down and already removed the entry.
assert!(
self.inner.done.load(Ordering::SeqCst),
"Missing receiver to deregister"
);
}
}
pub(crate) fn register_timer(
&self, time: zx::Time, waker_and_bool: &Arc<(AtomicWaker, AtomicBool)>,
) {
with_local_timer_heap(|timer_heap| {
let waker_and_bool = Arc::downgrade(waker_and_bool);
timer_heap.push(TimeWaker {
time,
waker_and_bool,
})
})
}
}
/// The executor has not been run in multithreaded mode and no thread-unsafe
/// futures have been spawned.
const THREADINESS_ANY: usize = 0;
/// The executor has not been run in multithreaded mode, but thread-unsafe
/// futures have been spawned, so it cannot ever be run in multithreaded mode.
const THREADINESS_SINGLE: usize = 1;
/// The executor has been run in multithreaded mode.
/// No thread-unsafe futures can be spawned.
const THREADINESS_MULTI: usize = 2;
/// Tracks the multihthreaded-compatibility state of the executor.
struct Threadiness(AtomicUsize);
impl Default for Threadiness {
fn default() -> Self {
Threadiness(AtomicUsize::new(THREADINESS_ANY))
}
}
impl Threadiness {
fn try_become(&self, target: usize) -> Result<(), ()> {
match self.0.compare_exchange(
/* current */ THREADINESS_ANY,
/* new */ target,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => Ok(()),
Err(x) if x == target => Ok(()),
Err(_) => Err(()),
}
}
/// Attempts to switch the threadiness to singlethreaded-only mode.
/// Will fail iff a prior call to `require_multithreaded` was made.
fn require_singlethreaded(&self) -> Result<(), ()> {
self.try_become(THREADINESS_SINGLE)
}
/// Attempts to switch the threadiness to multithreaded mode.
/// Will fail iff a prior call to `require_singlethreaded` was made.
fn require_multithreaded(&self) -> Result<(), ()> {
self.try_become(THREADINESS_MULTI)
}
}
struct Inner {
port: zx::Port,
done: AtomicBool,
threadiness: Threadiness,
threads: Mutex<Vec<thread::JoinHandle<()>>>,
receivers: Mutex<Slab<Arc<PacketReceiver>>>,
ready_tasks: SegQueue<Arc<Task>>,
}
struct TimeWaker {
time: zx::Time,
waker_and_bool: Weak<(AtomicWaker, AtomicBool)>,
}
impl TimeWaker {
fn wake(&self) {
if let Some(wb) = self.waker_and_bool.upgrade() {
wb.1.store(true, Ordering::SeqCst);
wb.0.wake();
}
}
}
impl Ord for TimeWaker {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.time.cmp(&other.time).reverse() // Reverse to get min-heap rather than max
}
}
impl PartialOrd for TimeWaker {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Eq for TimeWaker {}
// N.B.: two TimerWakers can be equal even if they don't have the same
// waker_and_bool. This is fine since BinaryHeap doesn't deduplicate.
impl PartialEq for TimeWaker {
fn eq(&self, other: &Self) -> bool {
self.time == other.time
}
}
impl Inner {
fn spawn(arc_self: &Arc<Self>, future: FutureObj<'static, ()>) {
let task = Arc::new(Task {
future: AtomicFuture::new(future),
executor: arc_self.clone(),
});
arc_self.ready_tasks.push(task);
arc_self.notify_task_ready();
}
fn spawn_local(arc_self: &Arc<Self>, future: LocalFutureObj<'static, ()>) {
arc_self.threadiness.require_singlethreaded().expect(
"Error: called `spawn_local` after calling `run` on executor. \
Use `spawn` or `run_singlethreaded` instead.",
);
Inner::spawn(
arc_self,
// Unsafety: we've confirmed that the boxed futures here will never be used
// across multiple threads, so we can safely convert from a non-`Send`able
// future to a `Send`able one.
unsafe { future.into_future_obj() },
)
}
fn notify_task_ready(&self) {
// TODO: optimize so that this function doesn't push new items onto
// the queue if all worker threads are already awake
self.notify_id(TASK_READY_WAKEUP_ID);
}
fn notify_empty(&self) {
self.notify_id(EMPTY_WAKEUP_ID);
}
fn notify_id(&self, id: u64) {
let up = zx::UserPacket::from_u8_array([0; 32]);
let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, up);
if let Err(e) = self.port.queue(&packet) {
// TODO: logging
eprintln!("Failed to queue notify in port: {:?}", e);
}
}
fn deliver_packet(&self, key: usize, packet: zx::Packet) {
let receiver = match self.receivers.lock().get(key) {
// Clone the `Arc` so that we don't hold the lock
// any longer than absolutely necessary.
// The `receive_packet` impl may be arbitrarily complex.
Some(receiver) => receiver.clone(),
None => return,
};
receiver.receive_packet(packet);
}
}
struct Task {
future: AtomicFuture,
executor: Arc<Inner>,
}
impl task::Wake for Task {
fn wake(arc_self: &Arc<Self>) {
arc_self.executor.ready_tasks.push(arc_self.clone());
arc_self.executor.notify_task_ready();
}
}