//! An unbounded set of futures.
use std::cell::UnsafeCell;
use std::fmt::{self, Debug};
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel};
use std::sync::atomic::{AtomicPtr, AtomicBool};
use std::sync::{Arc, Weak};
use std::usize;
use {task, Stream, Future, Poll, Async};
use executor::{Notify, UnsafeNotify, NotifyHandle};
use task_impl::{self, AtomicTask};
/// An unbounded set of futures.
///
/// This "combinator" also serves a special function in this library, providing
/// the ability to maintain a set of futures and to manage driving them all
/// to completion.
///
/// Futures are pushed into this set and their realized values are yielded as
/// they are ready. This structure is optimized to manage a large number of
/// futures. Futures managed by `FuturesUnordered` will only be polled when they
/// generate notifications. This reduces the required amount of work needed to
/// coordinate large numbers of futures.
///
/// When a `FuturesUnordered` is first created, it does not contain any futures.
/// Calling `poll` in this state will result in `Ok(Async::Ready(None))` being
/// returned. Futures are submitted to the set using `push`; however, the
/// future will **not** be polled at this point. `FuturesUnordered` will only
/// poll managed futures when `FuturesUnordered::poll` is called. As such, it
/// is important to call `poll` after pushing new futures.
///
/// If `FuturesUnordered::poll` returns `Ok(Async::Ready(None))` this means that
/// the set is currently not managing any futures. A future may be submitted
/// to the set at a later time. At that point, a call to
/// `FuturesUnordered::poll` will either return the future's resolved value
/// **or** `Ok(Async::NotReady)` if the future has not yet completed.
///
/// Note that you can create a ready-made `FuturesUnordered` via the
/// `futures_unordered` function in the `stream` module, or you can start with an
/// empty set with the `FuturesUnordered::new` constructor.
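///
/// # Examples
///
/// A minimal usage sketch, assuming the blocking `Stream::wait` helper is
/// acceptable for demonstration purposes:
///
/// ```
/// use futures::future;
/// use futures::stream::{FuturesUnordered, Stream};
///
/// let mut set = FuturesUnordered::new();
/// set.push(future::ok::<u32, ()>(1));
/// set.push(future::ok::<u32, ()>(2));
///
/// // `FuturesUnordered` is itself a `Stream`, yielding each future's value
/// // as it completes, in completion order rather than insertion order.
/// let results: Vec<u32> = set.wait().map(|r| r.unwrap()).collect();
/// assert_eq!(results.len(), 2);
/// ```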
#[must_use = "streams do nothing unless polled"]
pub struct FuturesUnordered<F> {
inner: Arc<Inner<F>>,
len: usize,
head_all: *const Node<F>,
}
unsafe impl<T: Send> Send for FuturesUnordered<T> {}
unsafe impl<T: Sync> Sync for FuturesUnordered<T> {}
// FuturesUnordered is implemented using two linked lists. One which links all
// futures managed by a `FuturesUnordered` and one that tracks futures that have
// been scheduled for polling. The first linked list is not thread safe and is
// only accessed by the thread that owns the `FuturesUnordered` value. The
// second linked list is an implementation of the intrusive MPSC queue algorithm
// described by 1024cores.net.
//
// When a future is submitted to the set a node is allocated and inserted in
// both linked lists. The next call to `poll` will (eventually) see this node
// and call `poll` on the future.
//
// Before a managed future is polled, the current task's `Notify` is replaced
// with one that is aware of the specific future being run. This ensures that
// task notifications generated by that specific future are visible to
// `FuturesUnordered`. When a notification is received, the node is scheduled
// for polling by being inserted into the concurrent linked list.
//
// Each node uses an `AtomicBool` to track whether it is currently inserted
// in the atomic queue; its reference count (the number of outstanding handles
// to the node) is managed through `Arc`. When the
// future is notified, it will only insert itself into the linked list if it
// isn't currently inserted.
#[allow(missing_debug_implementations)]
struct Inner<T> {
// The task using `FuturesUnordered`.
parent: AtomicTask,
// Head/tail of the readiness queue
head_readiness: AtomicPtr<Node<T>>,
tail_readiness: UnsafeCell<*const Node<T>>,
stub: Arc<Node<T>>,
}
struct Node<T> {
// The future
future: UnsafeCell<Option<T>>,
// Next pointer for linked list tracking all active nodes
next_all: UnsafeCell<*const Node<T>>,
// Previous node in linked list tracking all active nodes
prev_all: UnsafeCell<*const Node<T>>,
// Next pointer in readiness queue
next_readiness: AtomicPtr<Node<T>>,
// Queue that we'll be enqueued to when notified
queue: Weak<Inner<T>>,
// Whether or not this node is currently in the mpsc queue.
queued: AtomicBool,
}
enum Dequeue<T> {
Data(*const Node<T>),
Empty,
Inconsistent,
}
impl<T> FuturesUnordered<T>
where T: Future,
{
/// Constructs a new, empty `FuturesUnordered`
///
/// The returned `FuturesUnordered` does not contain any futures and, in this
/// state, `FuturesUnordered::poll` will return `Ok(Async::Ready(None))`.
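///
/// # Examples
///
/// A brief sketch of the empty state:
///
/// ```
/// use futures::future::FutureResult;
/// use futures::stream::FuturesUnordered;
///
/// let set: FuturesUnordered<FutureResult<(), ()>> = FuturesUnordered::new();
/// assert!(set.is_empty());
/// assert_eq!(set.len(), 0);
/// ```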
pub fn new() -> FuturesUnordered<T> {
let stub = Arc::new(Node {
future: UnsafeCell::new(None),
next_all: UnsafeCell::new(ptr::null()),
prev_all: UnsafeCell::new(ptr::null()),
next_readiness: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
queue: Weak::new(),
});
let stub_ptr = &*stub as *const Node<T>;
let inner = Arc::new(Inner {
parent: AtomicTask::new(),
head_readiness: AtomicPtr::new(stub_ptr as *mut _),
tail_readiness: UnsafeCell::new(stub_ptr),
stub: stub,
});
FuturesUnordered {
len: 0,
head_all: ptr::null_mut(),
inner: inner,
}
}
}
impl<T> FuturesUnordered<T> {
/// Returns the number of futures contained in the set.
///
/// This represents the total number of in-flight futures.
pub fn len(&self) -> usize {
self.len
}
/// Returns `true` if the set contains no futures
pub fn is_empty(&self) -> bool {
self.len == 0
}
/// Push a future into the set.
///
/// This function submits the given future to the set for managing. This
/// function will not call `poll` on the submitted future. The caller must
/// ensure that `FuturesUnordered::poll` is called in order to receive task
/// notifications.
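///
/// # Examples
///
/// A sketch showing that a pushed future is only driven once the set itself
/// is polled, here via the blocking `Stream::wait` helper:
///
/// ```
/// use futures::future;
/// use futures::stream::{FuturesUnordered, Stream};
///
/// let mut set = FuturesUnordered::new();
/// set.push(future::ok::<&str, ()>("hello"));
///
/// // The pushed future has not been polled yet; driving the set as a
/// // stream polls it and yields its value.
/// assert_eq!(set.wait().next(), Some(Ok("hello")));
/// ```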
pub fn push(&mut self, future: T) {
let node = Arc::new(Node {
future: UnsafeCell::new(Some(future)),
next_all: UnsafeCell::new(ptr::null_mut()),
prev_all: UnsafeCell::new(ptr::null_mut()),
next_readiness: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
queue: Arc::downgrade(&self.inner),
});
// Right now our node has a strong reference count of 1. We transfer
// ownership of this reference count to our internal linked list
// and we'll reclaim ownership through the `unlink` function below.
let ptr = self.link(node);
// We'll need to get the future "into the system" to start tracking it,
// e.g. getting its unpark notifications routed to us so we can track which
// futures are ready. To do that we unconditionally enqueue it for
// polling here.
self.inner.enqueue(ptr);
}
/// Returns an iterator that allows modifying each future in the set.
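///
/// # Examples
///
/// A brief sketch:
///
/// ```
/// use futures::future;
/// use futures::stream::FuturesUnordered;
///
/// let mut set = FuturesUnordered::new();
/// set.push(future::ok::<u32, ()>(1));
/// set.push(future::ok::<u32, ()>(2));
///
/// // The iterator yields `&mut F` for each future currently in the set.
/// assert_eq!(set.iter_mut().count(), 2);
/// ```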
pub fn iter_mut(&mut self) -> IterMut<T> {
IterMut {
node: self.head_all,
len: self.len,
_marker: PhantomData
}
}
fn release_node(&mut self, node: Arc<Node<T>>) {
// The future is done, so set the queued flag. This will prevent
// `notify` from doing any further work with this node.
let prev = node.queued.swap(true, SeqCst);
// Drop the future, even if it hasn't finished yet. This is safe
// because we're dropping the future on the thread that owns
// `FuturesUnordered`, which correctly tracks T's lifetimes and such.
unsafe {
drop((*node.future.get()).take());
}
// If the queued flag was previously set then it means that this node
// is still in our internal mpsc queue. We then transfer ownership
// of our reference count to the mpsc queue, and it'll come along and
// free it later, noticing that the future is `None`.
//
// If, however, the queued flag was *not* set then we're safe to
// release our reference count on the internal node. The queued flag
// was set above so all future `enqueue` operations will not actually
// enqueue the node, so our node will never see the mpsc queue again.
// The node itself will be deallocated once all reference counts have
// been dropped by the various owning tasks elsewhere.
if prev {
mem::forget(node);
}
}
/// Insert a new node into the internal linked list.
fn link(&mut self, node: Arc<Node<T>>) -> *const Node<T> {
let ptr = arc2ptr(node);
unsafe {
*(*ptr).next_all.get() = self.head_all;
if !self.head_all.is_null() {
*(*self.head_all).prev_all.get() = ptr;
}
}
self.head_all = ptr;
self.len += 1;
return ptr
}
/// Remove the node from the linked list tracking all nodes currently
/// managed by `FuturesUnordered`.
unsafe fn unlink(&mut self, node: *const Node<T>) -> Arc<Node<T>> {
let node = ptr2arc(node);
let next = *node.next_all.get();
let prev = *node.prev_all.get();
*node.next_all.get() = ptr::null_mut();
*node.prev_all.get() = ptr::null_mut();
if !next.is_null() {
*(*next).prev_all.get() = prev;
}
if !prev.is_null() {
*(*prev).next_all.get() = next;
} else {
self.head_all = next;
}
self.len -= 1;
return node
}
}
impl<T> Stream for FuturesUnordered<T>
where T: Future
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
// Ensure `parent` is correctly set.
self.inner.parent.register();
loop {
let node = match unsafe { self.inner.dequeue() } {
Dequeue::Empty => {
if self.is_empty() {
return Ok(Async::Ready(None));
} else {
return Ok(Async::NotReady)
}
}
Dequeue::Inconsistent => {
// At this point, it may be worth yielding the thread &
// spinning a few times... but for now, just yield using the
// task system.
task::current().notify();
return Ok(Async::NotReady);
}
Dequeue::Data(node) => node,
};
debug_assert!(node != self.inner.stub());
unsafe {
let mut future = match (*(*node).future.get()).take() {
Some(future) => future,
// If the future has already gone away then we're just
// cleaning out this node. See the comment in
// `release_node` for more information, but we're basically
// just taking ownership of our reference count here.
None => {
let node = ptr2arc(node);
assert!((*node.next_all.get()).is_null());
assert!((*node.prev_all.get()).is_null());
continue
}
};
// Unset queued flag... this must be done before
// polling. This ensures that the future gets
// rescheduled if it is notified **during** a call
// to `poll`.
let prev = (*node).queued.swap(false, SeqCst);
assert!(prev);
// We're going to need to be very careful if the `poll`
// function below panics. We need to (a) not leak memory and
// (b) ensure that we still don't have any use-after-frees. To
// manage this we do a few things:
//
// * This "bomb" here will call `release_node` if dropped
// abnormally. That way we'll be sure the memory management
// of the `node` is managed correctly.
// * The future was extracted above (taken ownership). That way
// if it panics we're guaranteed that the future is
// dropped on this thread and doesn't accidentally get
// dropped on a different thread (bad).
// * We unlink the node from our internal queue to preemptively
// assume it'll panic, in which case we'll want to discard it
// regardless.
struct Bomb<'a, T: 'a> {
queue: &'a mut FuturesUnordered<T>,
node: Option<Arc<Node<T>>>,
}
impl<'a, T> Drop for Bomb<'a, T> {
fn drop(&mut self) {
if let Some(node) = self.node.take() {
self.queue.release_node(node);
}
}
}
let mut bomb = Bomb {
node: Some(self.unlink(node)),
queue: self,
};
// Poll the underlying future with the appropriate `notify`
// implementation. This is where a large bit of the unsafety
// starts to stem from internally. The `notify` instance itself
// is basically just our `Arc<Node<T>>` and tracks the mpsc
// queue of ready futures.
//
// Critically though `Node<T>` won't actually access `T`, the
// future, while it's floating around inside of `Task`
// instances. These structs will basically just use `T` to size
// the internal allocation, appropriately accessing fields and
// deallocating the node if need be.
let res = {
let notify = NodeToHandle(bomb.node.as_ref().unwrap());
task_impl::with_notify(&notify, 0, || {
future.poll()
})
};
let ret = match res {
Ok(Async::NotReady) => {
let node = bomb.node.take().unwrap();
*node.future.get() = Some(future);
bomb.queue.link(node);
continue
}
Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))),
Err(e) => Err(e),
};
return ret
}
}
}
}
impl<T: Debug> Debug for FuturesUnordered<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "FuturesUnordered {{ ... }}")
}
}
impl<T> Drop for FuturesUnordered<T> {
fn drop(&mut self) {
// When a `FuturesUnordered` is dropped we want to drop all futures associated
// with it. At the same time though there may be tons of `Task` handles
// flying around which contain `Node<T>` references inside them. We'll
// let those naturally get deallocated when the `Task` itself goes out
// of scope or gets notified.
unsafe {
while !self.head_all.is_null() {
let head = self.head_all;
let node = self.unlink(head);
self.release_node(node);
}
}
// Note that at this point we could still have a bunch of nodes in the
// mpsc queue. None of those nodes, however, have futures associated
// with them so they're safe to destroy on any thread. At this point
// the `FuturesUnordered` struct, the owner of the one strong reference
// to `Inner<T>`, will drop that strong reference. At that point
// whichever thread releases the strong refcount last (be it this
// thread or some other thread as part of an `upgrade`) will clear out
// the mpsc queue and free all remaining nodes.
//
// While that freeing operation isn't guaranteed to happen here, it's
// guaranteed to happen "promptly" as no more "blocking work" will
// happen while there's a strong refcount held.
}
}
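// `FromIterator` lets a set be built directly with `collect()`, for example
// (sketch): `let set: FuturesUnordered<_> =
//     vec![future::ok::<u32, ()>(1), future::ok::<u32, ()>(2)].into_iter().collect();`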
impl<F: Future> FromIterator<F> for FuturesUnordered<F> {
fn from_iter<T>(iter: T) -> Self
where T: IntoIterator<Item = F>
{
let mut new = FuturesUnordered::new();
for future in iter.into_iter() {
new.push(future);
}
new
}
}
#[derive(Debug)]
/// Mutable iterator over all futures in the unordered set.
pub struct IterMut<'a, F: 'a> {
node: *const Node<F>,
len: usize,
_marker: PhantomData<&'a mut FuturesUnordered<F>>
}
impl<'a, F> Iterator for IterMut<'a, F> {
type Item = &'a mut F;
fn next(&mut self) -> Option<&'a mut F> {
if self.node.is_null() {
return None;
}
unsafe {
let future = (*(*self.node).future.get()).as_mut().unwrap();
let next = *(*self.node).next_all.get();
self.node = next;
self.len -= 1;
return Some(future);
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.len, Some(self.len))
}
}
impl<'a, F> ExactSizeIterator for IterMut<'a, F> {}
impl<T> Inner<T> {
/// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
fn enqueue(&self, node: *const Node<T>) {
unsafe {
debug_assert!((*node).queued.load(Relaxed));
// This action does not require any coordination
(*node).next_readiness.store(ptr::null_mut(), Relaxed);
// Note that these atomic orderings come from 1024cores
let node = node as *mut _;
let prev = self.head_readiness.swap(node, AcqRel);
(*prev).next_readiness.store(node, Release);
}
}
/// The dequeue function from the 1024cores intrusive MPSC queue algorithm
///
/// Note that this is unsafe as it requires mutual exclusion (only one thread
/// can call this) to be guaranteed elsewhere.
unsafe fn dequeue(&self) -> Dequeue<T> {
let mut tail = *self.tail_readiness.get();
let mut next = (*tail).next_readiness.load(Acquire);
if tail == self.stub() {
if next.is_null() {
return Dequeue::Empty;
}
*self.tail_readiness.get() = next;
tail = next;
next = (*next).next_readiness.load(Acquire);
}
if !next.is_null() {
*self.tail_readiness.get() = next;
debug_assert!(tail != self.stub());
return Dequeue::Data(tail);
}
if self.head_readiness.load(Acquire) as *const _ != tail {
return Dequeue::Inconsistent;
}
self.enqueue(self.stub());
next = (*tail).next_readiness.load(Acquire);
if !next.is_null() {
*self.tail_readiness.get() = next;
return Dequeue::Data(tail);
}
Dequeue::Inconsistent
}
fn stub(&self) -> *const Node<T> {
&*self.stub
}
}
impl<T> Drop for Inner<T> {
fn drop(&mut self) {
// Once we're in the destructor for `Inner<T>` we need to clear out the
// mpsc queue of nodes if there's anything left in there.
//
// Note that each node has a strong reference count associated with it
// which is owned by the mpsc queue. All nodes should have had their
// futures dropped already by the `FuturesUnordered` destructor above,
// so we're just pulling out nodes and dropping their refcounts.
unsafe {
loop {
match self.dequeue() {
Dequeue::Empty => break,
Dequeue::Inconsistent => abort("inconsistent in drop"),
Dequeue::Data(ptr) => drop(ptr2arc(ptr)),
}
}
}
}
}
#[allow(missing_debug_implementations)]
struct NodeToHandle<'a, T: 'a>(&'a Arc<Node<T>>);
impl<'a, T> Clone for NodeToHandle<'a, T> {
fn clone(&self) -> Self {
NodeToHandle(self.0)
}
}
impl<'a, T> From<NodeToHandle<'a, T>> for NotifyHandle {
fn from(handle: NodeToHandle<'a, T>) -> NotifyHandle {
unsafe {
let ptr = handle.0.clone();
let ptr = mem::transmute::<Arc<Node<T>>, *mut ArcNode<T>>(ptr);
NotifyHandle::new(hide_lt(ptr))
}
}
}
struct ArcNode<T>(PhantomData<T>);
// We should never touch `T` on any thread other than the one owning
// `FuturesUnordered`, so this should be a safe operation.
unsafe impl<T> Send for ArcNode<T> {}
unsafe impl<T> Sync for ArcNode<T> {}
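// `ArcNode<T>` is a zero-sized marker type: the `*mut ArcNode<T>` handed to
// `NotifyHandle` is really the pointer carried inside an `Arc<Node<T>>`
// (transferred via `transmute` in the `From` impl above). Because
// `Arc<Node<T>>` is itself just a single pointer, taking the address of a
// local copy of that raw pointer and reinterpreting it as
// `*const Arc<Node<T>>` recovers a borrowed view of the original `Arc`, which
// is what `notify`, `clone_raw`, and `drop_raw` below rely on.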
impl<T> Notify for ArcNode<T> {
fn notify(&self, _id: usize) {
unsafe {
let me: *const ArcNode<T> = self;
let me: *const *const ArcNode<T> = &me;
let me = me as *const Arc<Node<T>>;
Node::notify(&*me)
}
}
}
unsafe impl<T> UnsafeNotify for ArcNode<T> {
unsafe fn clone_raw(&self) -> NotifyHandle {
let me: *const ArcNode<T> = self;
let me: *const *const ArcNode<T> = &me;
let me = &*(me as *const Arc<Node<T>>);
NodeToHandle(me).into()
}
unsafe fn drop_raw(&self) {
let mut me: *const ArcNode<T> = self;
let me = &mut me as *mut *const ArcNode<T> as *mut Arc<Node<T>>;
ptr::drop_in_place(me);
}
}
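// Erase the (inferred) lifetime from the `UnsafeNotify` trait-object pointer
// so that it can be handed off to a `NotifyHandle`.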
unsafe fn hide_lt<T>(p: *mut ArcNode<T>) -> *mut UnsafeNotify {
mem::transmute(p as *mut UnsafeNotify)
}
impl<T> Node<T> {
fn notify(me: &Arc<Node<T>>) {
let inner = match me.queue.upgrade() {
Some(inner) => inner,
None => return,
};
// It's our job to notify the node that it's ready to get polled,
// meaning that we need to enqueue it into the readiness queue. To
// do this we flag that we're ready to be queued, and if successful
// we then do the literal queueing operation, ensuring that we're
// only queued once.
//
// Once the node is inserted we make sure to notify the parent task,
// as it'll want to come along and pick up our node now.
//
// Note that we don't change the reference count of the node here,
// we're just enqueueing the raw pointer. The `FuturesUnordered`
// implementation guarantees that if we set the `queued` flag to true then
// there's a reference count held by the main `FuturesUnordered` queue
// still.
let prev = me.queued.swap(true, SeqCst);
if !prev {
inner.enqueue(&**me);
inner.parent.notify();
}
}
}
impl<T> Drop for Node<T> {
fn drop(&mut self) {
// Currently a `Node<T>` may be sent to, and kept alive on, arbitrary
// threads, regardless of `T`. This means that for memory safety we can't
// actually touch `T` at any time except when we have a reference to the
// `FuturesUnordered` itself.
//
// Consequently it *should* be the case that we always drop futures from
// the `FuturesUnordered` instance, but this is a bomb in place to catch
// any bugs in that logic.
unsafe {
if (*self.future.get()).is_some() {
abort("future still here when dropping");
}
}
}
}
fn arc2ptr<T>(ptr: Arc<T>) -> *const T {
let addr = &*ptr as *const T;
mem::forget(ptr);
return addr
}
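// Reconstruct an `Arc<T>` from a raw pointer previously produced by
// `arc2ptr` (a hand-rolled scheme that presumably predates a usable
// `Arc::from_raw`). A dummy `Arc` "anchor" is conjured at the bogus address
// 0x10 to measure the offset between an `Arc`'s internal allocation pointer
// and the `&T` it hands out; subtracting that offset from `ptr` recovers the
// original allocation pointer, which is then transmuted back into `Arc<T>`.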
unsafe fn ptr2arc<T>(ptr: *const T) -> Arc<T> {
let anchor = mem::transmute::<usize, Arc<T>>(0x10);
let addr = &*anchor as *const T;
mem::forget(anchor);
let offset = addr as isize - 0x10;
mem::transmute::<isize, Arc<T>>(ptr as isize - offset)
}
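// Aborts the process by panicking while a panic is already unwinding: the
// `DoublePanic` guard panics from its destructor during unwinding of the
// outer panic, and a second panic while unwinding aborts the program.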
fn abort(s: &str) -> ! {
struct DoublePanic;
impl Drop for DoublePanic {
fn drop(&mut self) {
panic!("panicking twice to abort the program");
}
}
let _bomb = DoublePanic;
panic!("{}", s);
}