blob: ec7518157c67b5aab021d9ea0905cdfb893986c8 [file] [log] [blame]
//! The core reactor driving all I/O
//!
//! This module contains the `Core` type which is the reactor for all I/O
//! happening in `tokio-core`. This reactor (or event loop) is used to run
//! futures, schedule tasks, issue I/O requests, etc.
use std::cell::RefCell;
use std::cmp;
use std::fmt;
use std::io;
use std::mem;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicBool, ATOMIC_USIZE_INIT, Ordering};
use std::time::{Instant, Duration};
use tokio;
use tokio::executor::current_thread::{CurrentThread, TaskExecutor};
use tokio_executor;
use tokio_executor::park::{Park, Unpark, ParkThread, UnparkThread};
use futures::{Future, IntoFuture, Async};
use futures::future::{self, Executor, ExecuteError};
use futures::executor::{self, Spawn, Notify};
use futures::sync::mpsc;
use futures::task::Task;
use mio;
use slab::Slab;
use heap::{Heap, Slot};
mod timeout_token;
mod poll_evented;
mod poll_evented2;
mod timeout;
mod interval;
pub use self::poll_evented::PollEvented;
pub(crate) use self::poll_evented2::PollEvented as PollEvented2;
pub use self::timeout::Timeout;
pub use self::interval::Interval;
// Source of reactor IDs: `Core::new` takes the next value with `fetch_add`.
static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT;
// The `Core` that is currently executing, made visible only for the duration
// of the closures passed to `CURRENT_LOOP.set`. `Remote` consults it to decide
// whether it can talk to the loop directly instead of going through the queue.
scoped_thread_local!(static CURRENT_LOOP: Core);
/// An event loop.
///
/// The event loop is the main source of blocking in an application which drives
/// all other I/O events and notifications happening. Each event loop can have
/// multiple handles pointing to it, each of which can then be used to create
/// various I/O objects to interact with the event loop in interesting ways.
// TODO: expand this
//
// NOTE: fields are dropped in declaration order, so reordering them changes
// the order in which the runtime, executor and channel are torn down.
pub struct Core {
/// Uniquely identifies the reactor; assigned from `NEXT_LOOP_ID` in `new`
id: usize,
/// Handle to the Tokio runtime; supplies the reactor handle and the
/// thread-pool executor that are installed as defaults while the loop runs
rt: tokio::runtime::Runtime,
/// Executes tasks spawned onto this loop
executor: RefCell<CurrentThread>,
/// Wakes up the thread when the `run` future is notified
notify_future: Arc<MyNotify>,
/// Wakes up the thread when a message is posted to `rx`
notify_rx: Arc<MyNotify>,
/// Send messages across threads to the core
tx: mpsc::UnboundedSender<Message>,
/// Receive messages; drained by `consume_queue`
rx: RefCell<Spawn<mpsc::UnboundedReceiver<Message>>>,
// Shared inner state (pending spawns and timers). `Handle` holds a `Weak`
// reference to the same allocation.
inner: Rc<RefCell<Inner>>,
}
// State shared between a `Core` and the `Handle`s created from it.
struct Inner {
// Tasks that need to be spawned onto the executor. `Handle::spawn` pushes
// here and `Core::poll` drains the list on every iteration.
pending_spawn: Vec<Box<Future<Item = (), Error = ()>>>,
// Timer heap keeping track of all timeouts. The `usize` stored next to each
// deadline in the heap is an index into the slab below.
//
// The slab below keeps track of the timeouts themselves: the heap slot the
// timeout currently occupies (taken, i.e. `None`, once it has fired) as well
// as the state of the timeout itself. The `TimeoutToken` type is an index
// into the `timeouts` slab.
timer_heap: Heap<(Instant, usize)>,
timeouts: Slab<(Option<Slot>, TimeoutState)>,
}
/// A unique ID for a `Core`.
///
/// Distinguishes one core from another. The ID supports equality comparison
/// and hashing, so it can serve as a key in a `HashMap`.
///
/// The ID is globally unique and never reused.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct CoreId(usize);
/// Handle to an event loop, used to construct I/O objects, send messages, and
/// otherwise interact indirectly with the event loop itself.
///
/// Handles can be cloned, and when cloned they will still refer to the
/// same underlying event loop.
#[derive(Clone)]
pub struct Remote {
// ID of the `Core` this remote was created from; compared against the
// currently running loop to detect "same loop" calls.
id: usize,
// Sending half of the core's message queue.
tx: mpsc::UnboundedSender<Message>,
// Handle to the reactor of the Tokio runtime backing the core.
new_handle: tokio::reactor::Handle,
}
/// A non-sendable handle to an event loop, used to spawn futures onto the loop
/// and passed to functions that create I/O objects bound to it.
///
/// The handle only holds a weak reference to the loop's shared state, so it
/// does not keep the `Core` alive.
#[derive(Clone)]
pub struct Handle {
// Sendable counterpart of this handle (loop ID, message queue, reactor handle).
remote: Remote,
// Weak reference to the core's shared state; upgrading fails once the `Core`
// has been dropped.
inner: Weak<RefCell<Inner>>,
// Executor of the runtime's thread pool, used by `spawn_send`.
thread_pool: ::tokio::runtime::TaskExecutor,
}
// Lifecycle of a single timeout stored in `Inner::timeouts`.
enum TimeoutState {
// The deadline has not been reached and no task is waiting on it.
NotFired,
// The deadline has been reached; `fire` has already run for this timeout.
Fired,
// The deadline has not been reached and the contained task wants to be
// notified once it is.
Waiting(Task),
}
// Requests delivered to the core, either directly (when issued from within the
// running loop) or through the `tx`/`rx` queue. The `usize` payloads are
// indices into `Inner::timeouts`.
enum Message {
// Register the task as the waiter of the given timeout.
UpdateTimeout(usize, Task),
// Re-arm the given timeout for a new deadline.
ResetTimeout(usize, Instant),
// Remove the given timeout entirely.
CancelTimeout(usize),
// Run a closure with access to the `Core`.
Run(Box<FnBox>),
}
// ===== impl Core =====
impl Core {
/// Creates a new event loop.
///
/// # Errors
///
/// Returns the error produced by `tokio::runtime::Runtime::new` when the
/// backing runtime cannot be created.
pub fn new() -> io::Result<Core> {
    // The parker is what the current-thread executor blocks on. Each notifier
    // gets its own unpark handle so that it can wake the blocked thread.
    let parker = ParkThread::new();
    let future_notifier = Arc::new(MyNotify::new(parker.unpark()));
    let queue_notifier = Arc::new(MyNotify::new(parker.unpark()));

    // Tokio reactor + thread pool backing this loop.
    let runtime = tokio::runtime::Runtime::new()?;

    // Executor for `!Send` futures; it takes ownership of the parker.
    let current_thread = CurrentThread::new_with_park(parker);

    // Channel used to send messages across threads. The receiving half is
    // wrapped in a future context so it can be polled with a notifier.
    let (sender, receiver) = mpsc::unbounded();
    let receiver = executor::spawn(receiver);

    let shared = Inner {
        pending_spawn: Vec::new(),
        timeouts: Slab::with_capacity(1),
        timer_heap: Heap::new(),
    };

    Ok(Core {
        id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed),
        rt: runtime,
        notify_future: future_notifier,
        notify_rx: queue_notifier,
        tx: sender,
        rx: RefCell::new(receiver),
        executor: RefCell::new(current_thread),
        inner: Rc::new(RefCell::new(shared)),
    })
}
/// Returns a handle to this event loop.
///
/// The handle cannot be sent across threads, but acts as a proxy to the
/// event loop itself. Handles are cloneable and every clone refers to the
/// same event loop. A handle is typically passed into functions that create
/// I/O objects in order to bind them to this event loop.
pub fn handle(&self) -> Handle {
    let remote = self.remote();
    let inner = Rc::downgrade(&self.inner);
    let thread_pool = self.rt.executor().clone();
    Handle { remote, inner, thread_pool }
}
/// Returns a reference to the Tokio runtime backing this instance.
///
/// This provides access to the newer features of Tokio.
pub fn runtime(&self) -> &tokio::runtime::Runtime {
&self.rt
}
/// Generates a remote handle to this event loop.
///
/// The remote carries the loop's ID, a clone of the message sender and a
/// clone of the runtime's reactor handle, and can be used to spawn tasks
/// from other threads into this event loop.
pub fn remote(&self) -> Remote {
    let id = self.id;
    let tx = self.tx.clone();
    let new_handle = self.rt.handle().clone();
    Remote { id, tx, new_handle }
}
/// Runs a future until completion, driving the event loop while we're
/// otherwise waiting for the future to complete.
///
/// This function will begin executing the event loop and will finish once
/// the provided future is resolved. Note that the future argument here
/// crucially does not require the `'static` nor `Send` bounds. As a result
/// the future will be "pinned" to not only this thread but also this stack
/// frame.
///
/// This function will return the value that the future resolves to once
/// the future has finished. If the future never resolves then this function
/// will never return.
///
/// # Errors
///
/// If polling the future yields an error, that error is returned
/// immediately.
///
/// # Panics
///
/// This method will **not** catch panics from polling the future `f`. If
/// the future panics then it's the responsibility of the caller to catch
/// that panic and handle it as appropriate.
///
/// It also panics with "cannot recursively call into `Core`" when
/// `tokio_executor::enter` reports an error.
pub fn run<F>(&mut self, f: F) -> Result<F::Item, F::Error>
where F: Future,
{
let mut task = executor::spawn(f);
// Two copies of the reactor handle and executor are needed: one pair is
// borrowed by the closures that poll `f`, the other is passed to `poll`.
let handle1 = self.rt.handle().clone();
let handle2 = self.rt.handle().clone();
let mut executor1 = self.rt.executor().clone();
let mut executor2 = self.rt.executor().clone();
// Make sure the future will run at least once on enter
self.notify_future.notify(0);
loop {
// Only poll `f` when it has been notified since the last poll.
if self.notify_future.take() {
let mut enter = tokio_executor::enter()
.ok().expect("cannot recursively call into `Core`");
let notify = &self.notify_future;
let mut current_thread = self.executor.borrow_mut();
// Poll `f` exactly once with this core published as `CURRENT_LOOP` and
// the runtime's reactor/executor installed as defaults. The poll is
// wrapped in a `lazy` future so that it runs inside `block_on`; the
// wrapper itself never fails, hence the `unwrap`. `try!` propagates an
// error produced by `f`.
let res = try!(CURRENT_LOOP.set(self, || {
::tokio_reactor::with_default(&handle1, &mut enter, |enter| {
tokio_executor::with_default(&mut executor1, enter, |enter| {
current_thread.enter(enter)
.block_on(future::lazy(|| {
Ok::<_, ()>(task.poll_future_notify(notify, 0))
})).unwrap()
})
})
}));
if let Async::Ready(e) = res {
return Ok(e)
}
}
// `f` is not ready yet: run one iteration of the event loop without an
// upper bound on the wait.
self.poll(None, &handle2, &mut executor2);
}
}
/// Performs one iteration of the event loop, blocking on waiting for events
/// for at most `max_wait` (forever if `None`).
///
/// Calling this method only makes sense after a future has been spawned
/// onto this event loop.
///
/// `loop { lp.turn(None) }` is equivalent to calling `run` with an
/// empty future (one that never finishes).
pub fn turn(&mut self, max_wait: Option<Duration>) {
    let reactor = self.rt.handle().clone();
    let mut pool = self.rt.executor().clone();
    self.poll(max_wait, &reactor, &mut pool);
}
/// Runs a single iteration of the event loop.
///
/// With `handle` and `sender` installed as the default reactor and executor,
/// this drains the message queue (if it was notified), spawns every future
/// in `pending_spawn`, turns the current-thread executor once (waiting for
/// at most the computed timeout) and finally fires all timeouts that are
/// due.
///
/// # Panics
///
/// Panics with "cannot recursively call into `Core`" when
/// `tokio_executor::enter` reports an error, and with
/// "error in `CurrentThread::turn`" when turning the executor fails.
fn poll(&mut self, max_wait: Option<Duration>,
handle: &tokio::reactor::Handle,
sender: &mut tokio::runtime::TaskExecutor) {
let mut enter = tokio_executor::enter()
.ok().expect("cannot recursively call into `Core`");
::tokio_reactor::with_default(handle, &mut enter, |enter| {
tokio_executor::with_default(sender, enter, |enter| {
// Given the `max_wait` variable specified, figure out the actual
// timeout that we're going to pass to `turn`. This involves taking a
// look at active timers on our heap as well.
//
// NOTE(review): the timeout is computed before the message queue is
// drained below, so a timer armed or reset by a queued message in this
// iteration is not reflected in `timeout` - confirm that this cannot
// delay such a timer.
let start = Instant::now();
let timeout = self.inner.borrow_mut().timer_heap.peek().map(|t| {
// A deadline in the past means "do not wait at all".
if t.0 < start {
Duration::new(0, 0)
} else {
t.0 - start
}
});
// Wait for the shorter of the two limits; if only one is present use it,
// and if neither is present wait without a limit.
let timeout = match (max_wait, timeout) {
(Some(d1), Some(d2)) => Some(cmp::min(d1, d2)),
(max_wait, timeout) => max_wait.or(timeout),
};
// Process all the messages that came in, dispatching appropriately
if self.notify_rx.take() {
CURRENT_LOOP.set(self, || self.consume_queue());
}
// Drain any futures pending spawn
{
let mut e = self.executor.borrow_mut();
let mut i = self.inner.borrow_mut();
for f in i.pending_spawn.drain(..) {
// Little hack: `TaskExecutor::current()` is used from within
// `block_on` on the current-thread executor, so each pending future is
// spawned from inside a throwaway `lazy` future.
e.enter(enter).block_on(future::lazy(|| {
TaskExecutor::current().spawn_local(f).unwrap();
Ok::<_, ()>(())
})).unwrap();
}
}
// Run the executor once, waiting for at most `timeout`, with this core
// published as `CURRENT_LOOP`.
CURRENT_LOOP.set(self, || {
self.executor.borrow_mut()
.enter(enter)
.turn(timeout)
.ok().expect("error in `CurrentThread::turn`");
});
let after_poll = Instant::now();
debug!("loop poll - {:?}", after_poll - start);
debug!("loop time - {:?}", after_poll);
// Process all timeouts that may have just occurred, using the time
// sampled right after the executor returned as "now".
self.consume_timeouts(after_poll);
debug!("loop process, {:?}", after_poll.elapsed());
});
});
}
/// Fires every timeout whose deadline is at or before `now`.
///
/// Timeouts are popped from the heap one at a time. The shared state is
/// released before a waiting task is notified, so the notification never
/// runs while `inner` is mutably borrowed.
fn consume_timeouts(&mut self, now: Instant) {
    loop {
        let waiter = {
            let mut state = self.inner.borrow_mut();

            // Stop as soon as the head of the heap is not due (or the heap
            // is empty).
            let due = match state.timer_heap.peek() {
                Some(head) => head.0 <= now,
                None => false,
            };
            if !due {
                break;
            }

            let (_, slab_idx) = state.timer_heap.pop().unwrap();
            trace!("firing timeout: {}", slab_idx);
            // The timeout no longer occupies a heap slot.
            state.timeouts[slab_idx].0.take().unwrap();
            let fired = state.timeouts[slab_idx].1.fire();
            fired
        };

        if let Some(task) = waiter {
            self.notify_handle(task);
        }
    }
}
/// Notifies a task handle with this core published as `CURRENT_LOOP`.
///
/// Use this instead of calling `handle.notify()` directly so that the
/// `CURRENT_LOOP` variable is set appropriately during the notification.
fn notify_handle(&self, handle: Task) {
    debug!("notifying a task handle");
    let wake = || handle.notify();
    CURRENT_LOOP.set(self, wake);
}
/// Drains the message queue, dispatching every message that is ready.
///
/// Stops when the receiver reports that nothing is ready or that the
/// stream has ended.
fn consume_queue(&self) {
    debug!("consuming notification queue");
    // TODO: can we do better than `.unwrap()` here?
    loop {
        // The borrow of `rx` ends with this statement, i.e. before the
        // message is dispatched.
        let polled = self.rx.borrow_mut().poll_stream_notify(&self.notify_rx, 0).unwrap();
        if let Async::Ready(Some(msg)) = polled {
            self.notify(msg);
        } else {
            break;
        }
    }
}
/// Dispatches a single message to the part of the core that handles it.
fn notify(&self, msg: Message) {
    match msg {
        Message::Run(job) => job.call_box(self),
        Message::CancelTimeout(token) => {
            self.inner.borrow_mut().cancel_timeout(token);
        }
        Message::ResetTimeout(token, deadline) => {
            self.inner.borrow_mut().reset_timeout(token, deadline);
        }
        Message::UpdateTimeout(token, waiter) => {
            // If the timeout has already fired the task is handed straight
            // back and must be woken right away. The borrow of `inner` ends
            // with the `let` statement, before the task is notified.
            let ready = self.inner.borrow_mut().update_timeout(token, waiter);
            if let Some(task) = ready {
                self.notify_handle(task);
            }
        }
    }
}
/// Returns the ID of this loop, wrapped in a `CoreId`.
pub fn id(&self) -> CoreId {
CoreId(self.id)
}
}
// Executing a future on the core is the same as executing it on a freshly
// created `Handle` to the core.
impl<F> Executor<F> for Core
    where F: Future<Item = (), Error = ()> + 'static,
{
    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
        let handle = self.handle();
        handle.execute(future)
    }
}
// Only the ID is printed; the remaining fields are deliberately left out.
impl fmt::Debug for Core {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let id = self.id();
        f.debug_struct("Core").field("id", &id).finish()
    }
}
impl Inner {
// Registers a new timeout firing at `at` and returns its index in the
// `timeouts` slab. The timeout starts out in the `NotFired` state.
fn add_timeout(&mut self, at: Instant) -> usize {
    // When the slab is full, make room for as many additional entries as it
    // currently holds.
    let occupied = self.timeouts.len();
    if occupied == self.timeouts.capacity() {
        self.timeouts.reserve_exact(occupied);
    }

    // The slab key is needed for the heap entry and the heap slot is needed
    // for the slab entry, so reserve the slab entry before filling it.
    let vacancy = self.timeouts.vacant_entry();
    let token = vacancy.key();
    let heap_slot = self.timer_heap.push((at, token));
    vacancy.insert((Some(heap_slot), TimeoutState::NotFired));
    debug!("added a timeout: {}", token);
    token
}
// Registers `handle` as the task waiting on the timeout `token`. Returns the
// task back when the timeout has already fired, in which case the caller is
// expected to notify it.
fn update_timeout(&mut self, token: usize, handle: Task) -> Option<Task> {
    debug!("updating a timeout: {}", token);
    let state = &mut self.timeouts[token].1;
    state.block(handle)
}
// Re-arms the timeout `token` so that it fires at `at`, putting it back into
// the `NotFired` state.
fn reset_timeout(&mut self, token: usize, at: Instant) {
    let entry = &mut self.timeouts[token];
    // TODO: avoid remove + push and instead just do one sift of the heap?
    // In theory we could update it in place and then do the percolation
    // as necessary
    if let Some(stale) = entry.0.take() {
        self.timer_heap.remove(stale);
    }
    entry.0 = Some(self.timer_heap.push((at, token)));
    entry.1 = TimeoutState::NotFired;
    debug!("set a timeout: {}", token);
}
// Removes the timeout `token` from the slab and, if it still occupies a heap
// slot, from the timer heap as well.
fn cancel_timeout(&mut self, token: usize) {
    debug!("cancel a timeout: {}", token);
    let (heap_slot, _state) = self.timeouts.remove(token);
    if let Some(slot) = heap_slot {
        self.timer_heap.remove(slot);
    }
}
}
impl Remote {
// Delivers `msg` to the core: directly when called from within the running
// loop this remote belongs to, through the message queue otherwise.
fn send(&self, msg: Message) {
    self.with_loop(|current| {
        let lp = match current {
            Some(lp) => lp,
            None => {
                // TODO: this error should punt upwards and we should
                //       notify the caller that the message wasn't
                //       received. This is tokio-core#17
                let _ = self.tx.unbounded_send(msg);
                return;
            }
        };

        // We want to make sure that all messages are received in order, so
        // we need to consume pending messages before delivering this message
        // to the core. The actual `consume_queue` function, however, can be
        // somewhat slow right now where receiving on a channel will acquire
        // a lock and block the current task.
        //
        // To speed this up check the message queue's readiness as a sort of
        // preflight check to see if we've actually got any messages. This
        // should just involve some atomics and if it comes back false then
        // we know for sure there are no pending messages, so we can
        // immediately deliver our message.
        if lp.notify_rx.take() {
            lp.consume_queue();
        }
        lp.notify(msg);
    })
}
// Calls `f` with the currently running core if, and only if, that core is the
// one this remote belongs to; calls it with `None` in every other case.
fn with_loop<F, R>(&self, f: F) -> R
    where F: FnOnce(Option<&Core>) -> R
{
    if !CURRENT_LOOP.is_set() {
        return f(None);
    }
    CURRENT_LOOP.with(|current| {
        let ours = if current.id == self.id { Some(current) } else { None };
        f(ours)
    })
}
/// Spawns a new future into the event loop this remote is associated with.
///
/// The closure `f` is executed within the context of the I/O loop itself
/// and receives a `Handle` to it. The future it returns is then scheduled
/// on the event loop and run to completion.
///
/// Note that while the closure, `F`, requires the `Send` bound as it might
/// cross threads, the future `R` does not.
///
/// # Panics
///
/// This method will **not** catch panics from polling the future `f`. If
/// the future panics then it's the responsibility of the caller to catch
/// that panic and handle it as appropriate.
pub fn spawn<F, R>(&self, f: F)
    where F: FnOnce(&Handle) -> R + Send + 'static,
          R: IntoFuture<Item=(), Error=()>,
          R::Future: 'static,
{
    let job = move |lp: &Core| {
        let handle = lp.handle();
        let future = f(&handle).into_future();
        handle.spawn(future);
    };
    self.send(Message::Run(Box::new(job)));
}
/// Returns the ID of the `Core` this remote refers to.
pub fn id(&self) -> CoreId {
CoreId(self.id)
}
/// Attempts to "promote" this remote to a handle, if possible.
///
/// This function is intended for structures which typically work through a
/// `Remote` but want to optimize runtime when the remote doesn't actually
/// leave the thread of the original reactor. This will attempt to return a
/// handle if the `Remote` is on the same thread as the event loop and the
/// event loop is running.
///
/// If this `Remote` has moved to a different thread or if the event loop is
/// running, then `None` may be returned. If you need to guarantee access to
/// a `Handle`, then you can call this function and fall back to using
/// `spawn` above if it returns `None`.
pub fn handle(&self) -> Option<Handle> {
if CURRENT_LOOP.is_set() {
CURRENT_LOOP.with(|lp| {
let same = lp.id == self.id;
if same {
Some(lp.handle())
} else {
None
}
})
} else {
None
}
}
}
// Executing a future through a `Remote` spawns it on the associated loop. The
// future must be `Send` because it may have to cross threads. This never
// reports an error.
impl<F> Executor<F> for Remote
    where F: Future<Item = (), Error = ()> + Send + 'static,
{
    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
        self.spawn(move |_handle| future);
        Ok(())
    }
}
// Only the ID of the referenced core is printed.
impl fmt::Debug for Remote {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let id = self.id();
        f.debug_struct("Remote").field("id", &id).finish()
    }
}
impl Handle {
/// Returns a reference to the new Tokio reactor handle, i.e. the handle of
/// the runtime backing the event loop.
pub fn new_tokio_handle(&self) -> &::tokio::reactor::Handle {
&self.remote.new_handle
}
/// Returns a reference to the underlying remote handle to the event loop.
///
/// Clone the returned `Remote` to obtain an owned value.
pub fn remote(&self) -> &Remote {
&self.remote
}
/// Spawns a new future on the event loop this handle is associated with.
///
/// # Panics
///
/// This method will **not** catch panics from polling the future `f`. If
/// the future panics then it's the responsibility of the caller to catch
/// that panic and handle it as appropriate.
pub fn spawn<F>(&self, f: F)
where F: Future<Item=(), Error=()> + 'static,
{
let inner = match self.inner.upgrade() {
Some(inner) => inner,
None => {
return;
}
};
// Try accessing the executor directly
if let Ok(mut inner) = inner.try_borrow_mut() {
inner.pending_spawn.push(Box::new(f));
return;
}
// If that doesn't work, the executor is probably active, so spawn using
// the global fn.
let _ = TaskExecutor::current().spawn_local(Box::new(f));
}
/// Spawns a new future onto the threadpool of the backing Tokio runtime
/// instead of onto the event loop itself, which is why the future has to be
/// `Send`.
///
/// # Panics
///
/// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future.
pub fn spawn_send<F>(&self, f: F)
where F: Future<Item=(), Error=()> + Send + 'static,
{
self.thread_pool.spawn(f);
}
/// Spawns a closure on this event loop.
///
/// This is a convenience wrapper around the `spawn` function above: the
/// closure `f` is wrapped in `future::lazy`, so it is invoked on the event
/// loop, and the future it returns continues to run on the event loop as
/// well.
///
/// # Panics
///
/// This method will **not** catch panics from polling the future `f`. If
/// the future panics then it's the responsibility of the caller to catch
/// that panic and handle it as appropriate.
pub fn spawn_fn<F, R>(&self, f: F)
    where F: FnOnce() -> R + 'static,
          R: IntoFuture<Item=(), Error=()> + 'static,
{
    let deferred = future::lazy(f);
    self.spawn(deferred)
}
/// Returns the ID of the `Core` this handle refers to, as reported by the
/// underlying `Remote`.
pub fn id(&self) -> CoreId {
self.remote.id()
}
}
// Executing a future through a `Handle` spawns it on the event loop. This
// never reports an error.
impl<F> Executor<F> for Handle
    where F: Future<Item = (), Error = ()> + 'static,
{
    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
        let () = self.spawn(future);
        Ok(())
    }
}
// Only the ID of the referenced core is printed.
impl fmt::Debug for Handle {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let id = self.id();
        f.debug_struct("Handle").field("id", &id).finish()
    }
}
impl TimeoutState {
    // Registers `handle` as the task waiting on this timeout.
    //
    // If the timeout has already fired the state is left untouched and the
    // task is returned so that the caller can notify it. Otherwise the task
    // is stored (replacing any earlier waiter) and `None` is returned.
    fn block(&mut self, handle: Task) -> Option<Task> {
        if let TimeoutState::Fired = *self {
            return Some(handle);
        }
        *self = TimeoutState::Waiting(handle);
        None
    }

    // Marks the timeout as fired and returns the task that was waiting on
    // it, if any.
    //
    // Panics with "fired twice?" if the timeout had already fired.
    fn fire(&mut self) -> Option<Task> {
        let previous = mem::replace(self, TimeoutState::Fired);
        match previous {
            TimeoutState::Waiting(waiter) => Some(waiter),
            TimeoutState::NotFired => None,
            TimeoutState::Fired => panic!("fired twice?"),
        }
    }
}
// A `Notify` implementation that records that a notification happened and
// wakes up the parked event loop thread.
struct MyNotify {
// Used to wake the thread the loop is parked on.
unpark: UnparkThread,
// Set by `notify`, cleared (and reported) by `take`.
notified: AtomicBool,
}
impl MyNotify {
    // Creates a notifier that starts out in the "notified" state, so the
    // first call to `take` reports `true`.
    fn new(unpark: UnparkThread) -> Self {
        let notified = AtomicBool::new(true);
        MyNotify { unpark, notified }
    }

    // Clears the "notified" flag and returns whether it was set.
    fn take(&self) -> bool {
        let was_notified = self.notified.swap(false, Ordering::SeqCst);
        was_notified
    }
}
impl Notify for MyNotify {
// Records the notification and wakes the loop's thread. The ID passed in is
// ignored.
fn notify(&self, _: usize) {
// Set the flag before unparking so that the woken thread observes it when
// it calls `take`.
self.notified.store(true, Ordering::SeqCst);
self.unpark.unpark();
}
}
// A boxed, sendable closure that is called exactly once with the `Core`. This
// is what `Message::Run` carries.
trait FnBox: Send + 'static {
    // Consumes the box and invokes the closure with `lp`.
    fn call_box(self: Box<Self>, lp: &Core);
}

impl<F> FnBox for F
    where F: FnOnce(&Core) + Send + 'static
{
    fn call_box(self: Box<Self>, lp: &Core) {
        // Move the closure out of the box so that it can be called by value.
        let callback = *self;
        callback(lp)
    }
}
// Bits used when a `mio::Ready` set is packed into a `usize`. Bits from
// `1 << 2` upwards are assigned by the `platform` module.
const READ: usize = 1 << 0;
const WRITE: usize = 1 << 1;
// Packs a readiness set into a `usize` bit mask: `READ`/`WRITE` for the
// portable events plus whatever bits the `platform` module contributes.
fn ready2usize(ready: mio::Ready) -> usize {
    let read_bit = if ready.is_readable() { READ } else { 0 };
    let write_bit = if ready.is_writable() { WRITE } else { 0 };
    read_bit | write_bit | platform::ready2usize(ready)
}
// Inverse of `ready2usize`: rebuilds a readiness set from a bit mask.
fn usize2ready(bits: usize) -> mio::Ready {
    let mut portable = mio::Ready::empty();
    if bits & READ != 0 {
        portable.insert(mio::Ready::readable());
    }
    if bits & WRITE != 0 {
        portable.insert(mio::Ready::writable());
    }
    let platform_specific = platform::usize2ready(bits);
    portable | platform_specific
}
// Unix-specific readiness bits (hang-up, error and, where available, AIO).
#[cfg(all(unix, not(target_os = "fuchsia")))]
mod platform {
    use mio::Ready;
    use mio::unix::UnixReady;

    const HUP: usize = 1 << 2;
    const ERROR: usize = 1 << 3;
    const AIO: usize = 1 << 4;

    // NOTE(review): `is_aio` is only wired up on dragonfly/freebsd, whereas
    // `usize2ready_aio` below also covers ios/macos - confirm that this
    // asymmetry is intended.
    #[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
    fn is_aio(ready: &Ready) -> bool {
        UnixReady::from(*ready).is_aio()
    }

    #[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))]
    fn is_aio(_ready: &Ready) -> bool {
        false
    }

    // Packs the unix-only events of `ready` into their bits.
    pub fn ready2usize(ready: Ready) -> usize {
        let ready = UnixReady::from(ready);
        let aio_bit = if is_aio(&ready) { AIO } else { 0 };
        let error_bit = if ready.is_error() { ERROR } else { 0 };
        let hup_bit = if ready.is_hup() { HUP } else { 0 };
        aio_bit | error_bit | hup_bit
    }

    #[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "ios",
              target_os = "macos"))]
    fn usize2ready_aio(ready: &mut UnixReady) {
        ready.insert(UnixReady::aio());
    }

    #[cfg(not(any(target_os = "dragonfly",
                  target_os = "freebsd", target_os = "ios", target_os = "macos")))]
    fn usize2ready_aio(_ready: &mut UnixReady) {
        // AIO readiness is not available on this platform, so there is
        // nothing to insert.
    }

    // Rebuilds the unix-only part of a readiness set from its bits.
    pub fn usize2ready(bits: usize) -> Ready {
        let mut ready = UnixReady::from(Ready::empty());
        if bits & AIO != 0 {
            usize2ready_aio(&mut ready);
        }
        if bits & HUP != 0 {
            ready.insert(UnixReady::hup());
        }
        if bits & ERROR != 0 {
            ready.insert(UnixReady::error());
        }
        Ready::from(ready)
    }
}
// On Windows and Fuchsia there are no platform-specific readiness bits: the
// conversion to bits contributes nothing and the conversion back always
// yields the empty set.
#[cfg(any(windows, target_os = "fuchsia"))]
mod platform {
use mio::Ready;
// Always contributes no bits.
pub fn ready2usize(_r: Ready) -> usize {
0
}
// Always yields the empty readiness set, whatever bits are passed in.
pub fn usize2ready(_r: usize) -> Ready {
Ready::empty()
}
}