blob: 615efc1f703872dfd546f75f7c1a62843e8aace2 [file] [log] [blame]
//! Execution of futures on a single thread
//!
//! This module has no special handling of any blocking operations other than
//! futures-aware inter-thread communications, and is not intended to be used to
//! manage I/O. For futures that do I/O you'll likely want to use `tokio-core`.
use std::cell::{Cell, RefCell};
use std::sync::{Arc, Mutex, mpsc};
use futures::executor::{self, Spawn, Notify};
use futures::future::{Executor, ExecuteError};
use futures::{Future, Async};
/// Main loop object
pub struct Core {
tx: mpsc::Sender<usize>,
rx: mpsc::Receiver<usize>,
notify: Arc<MyNotify>,
// Slab of running futures used to track what's running and what slots are
// empty. Slot indexes are then sent along tx/rx above to indicate which
// future is ready to get polled.
tasks: RefCell<Vec<Slot>>,
next_vacant: Cell<usize>,
}
enum Slot {
Vacant { next_vacant: usize },
Running(Option<Spawn<Box<Future<Item = (), Error = ()>>>>),
}
impl Core {
/// Create a new `Core`.
pub fn new() -> Self {
let (tx, rx) = mpsc::channel();
Core {
notify: Arc::new(MyNotify {
tx: Mutex::new(tx.clone()),
}),
tx: tx,
rx: rx,
next_vacant: Cell::new(0),
tasks: RefCell::new(Vec::new()),
}
}
/// Spawn a future to be executed by a future call to `run`.
///
/// The future `f` provided will not be executed until `run` is called
/// below. While futures passed to `run` are executing, the future provided
/// here will be executed concurrently as well.
pub fn spawn<F>(&self, f: F)
where F: Future<Item=(), Error=()> + 'static
{
let idx = self.next_vacant.get();
let mut tasks = self.tasks.borrow_mut();
match tasks.get_mut(idx) {
Some(&mut Slot::Vacant { next_vacant }) => {
self.next_vacant.set(next_vacant);
}
Some(&mut Slot::Running (_)) => {
panic!("vacant points to running future")
}
None => {
assert_eq!(idx, tasks.len());
tasks.push(Slot::Vacant { next_vacant: 0 });
self.next_vacant.set(idx + 1);
}
}
tasks[idx] = Slot::Running(Some(executor::spawn(Box::new(f))));
self.tx.send(idx).unwrap();
}
/// Run the loop until the future `f` completes.
///
/// This method will block the current thread until the future `f` has
/// resolved. While waiting on `f` to finish it will also execute any
/// futures spawned via `spawn` above.
pub fn run<F>(&self, f: F) -> Result<F::Item, F::Error>
where F: Future,
{
let id = usize::max_value();
self.tx.send(id).unwrap();
let mut f = executor::spawn(f);
loop {
if self.turn() {
match f.poll_future_notify(&self.notify, id)? {
Async::Ready(e) => return Ok(e),
Async::NotReady => {}
}
}
}
}
/// "Turns" this event loop one tick.
///
/// This'll block the current thread until something happens, and once an
/// event happens this will act on that event.
///
/// # Return value
///
/// Returns `true` if the future passed to `run` should be polled or `false`
/// otherwise.
fn turn(&self) -> bool {
let task_id = self.rx.recv().unwrap();
if task_id == usize::max_value() {
return true
}
// This may be a spurious wakeup so we're not guaranteed to have a
// future associated with `task_id`, so do a fallible lookup.
//
// Note that we don't want to borrow `self.tasks` for too long so we
// try to extract the future here and leave behind a tombstone future
// which'll get replaced or removed later. This is how we support
// spawn-in-run.
let mut future = match self.tasks.borrow_mut().get_mut(task_id) {
Some(&mut Slot::Running(ref mut future)) => future.take().unwrap(),
Some(&mut Slot::Vacant { .. }) => return false,
None => return false,
};
// Drive this future forward. If it's done we remove it and if it's not
// done then we put it back in the tasks array.
let done = match future.poll_future_notify(&self.notify, task_id) {
Ok(Async::Ready(())) | Err(()) => true,
Ok(Async::NotReady) => false,
};
let mut tasks = self.tasks.borrow_mut();
if done {
tasks[task_id] = Slot::Vacant { next_vacant: self.next_vacant.get() };
self.next_vacant.set(task_id);
} else {
tasks[task_id] = Slot::Running(Some(future));
}
return false
}
}
impl<F> Executor<F> for Core
where F: Future<Item = (), Error = ()> + 'static,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
self.spawn(future);
Ok(())
}
}
struct MyNotify {
// TODO: it's pretty unfortunate to use a `Mutex` here where the `Sender`
// itself is basically `Sync` as-is. Ideally this'd use something like
// an off-the-shelf mpsc queue as well as `thread::park` and
// `Thread::unpark`.
tx: Mutex<mpsc::Sender<usize>>,
}
impl Notify for MyNotify {
fn notify(&self, id: usize) {
drop(self.tx.lock().unwrap().send(id));
}
}