blob: 644ac4e45f6569e62ee82b890fd0f23d3898e5a0 [file] [log] [blame]
// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Homing I/O implementation
//!
//! In libuv, whenever a handle is created on an I/O loop it is illegal to use
//! that handle outside of that I/O loop. We use libuv I/O with our green
//! scheduler, and each green scheduler corresponds to a different I/O loop on a
//! different OS thread. Green tasks are also free to roam among schedulers,
//! which implies that it is possible to create an I/O handle on one event loop
//! and then attempt to use it on another.
//!
//! In order to solve this problem, this module implements the notion of a
//! "homing operation" which will transplant a task from its currently running
//! scheduler back onto the original I/O loop. This is accomplished entirely at
//! the librustuv layer with very little cooperation from the scheduler (which
//! we don't even know exists technically).
//!
//! These homing operations are completed by first realizing that we're on the
//! wrong I/O loop, then descheduling ourselves, sending ourselves to the
//! correct I/O loop, and then waking up the I/O loop in order to process its
//! local queue of tasks which need to run.
//!
//! This enqueueing is done with a concurrent queue from libstd, and the
//! signalling is achieved with an async handle.
#![allow(dead_code)]
use std::mem;
use std::rt::local::Local;
use std::rt::rtio::LocalIo;
use std::rt::task::{Task, BlockedTask};
use ForbidUnwind;
use queue::{Queue, QueuePool};
/// A handle to a remote libuv event loop. This handle will keep the event loop
/// alive while active in order to ensure that a homing operation can always be
/// completed.
///
/// Handles are clone-able in order to derive new handles from existing handles
/// (very useful for when accepting a socket from a server).
pub struct HomeHandle {
queue: Queue,
id: uint,
}
impl HomeHandle {
pub fn new(id: uint, pool: &mut QueuePool) -> HomeHandle {
HomeHandle { queue: pool.queue(), id: id }
}
fn send(&mut self, task: BlockedTask) {
self.queue.push(task);
}
}
impl Clone for HomeHandle {
fn clone(&self) -> HomeHandle {
HomeHandle {
queue: self.queue.clone(),
id: self.id,
}
}
}
pub fn local_id() -> uint {
let mut io = match LocalIo::borrow() {
Some(io) => io, None => return 0,
};
let io = io.get();
unsafe {
let (_vtable, ptr): (uint, uint) = mem::transmute(io);
return ptr;
}
}
#[doc(hidden)]
pub trait HomingIO {
fn home<'r>(&'r mut self) -> &'r mut HomeHandle;
/// This function will move tasks to run on their home I/O scheduler. Note
/// that this function does *not* pin the task to the I/O scheduler, but
/// rather it simply moves it to running on the I/O scheduler.
fn go_to_io_home(&mut self) -> uint {
let _f = ForbidUnwind::new("going home");
let cur_loop_id = local_id();
let destination = self.home().id;
// Try at all costs to avoid the homing operation because it is quite
// expensive. Hence, we only deschedule/send if we're not on the correct
// event loop. If we're already on the home event loop, then we're good
// to go (remember we have no preemption, so we're guaranteed to stay on
// this event loop as long as we avoid the scheduler).
if cur_loop_id != destination {
let cur_task: Box<Task> = Local::take();
cur_task.deschedule(1, |task| {
self.home().send(task);
Ok(())
});
// Once we wake up, assert that we're in the right location
assert_eq!(local_id(), destination);
}
return destination;
}
/// Fires a single homing missile, returning another missile targeted back
/// at the original home of this task. In other words, this function will
/// move the local task to its I/O scheduler and then return an RAII wrapper
/// which will return the task home.
fn fire_homing_missile(&mut self) -> HomingMissile {
HomingMissile { io_home: self.go_to_io_home() }
}
}
/// After a homing operation has been completed, this will return the current
/// task back to its appropriate home (if applicable). The field is used to
/// assert that we are where we think we are.
pub struct HomingMissile {
io_home: uint,
}
impl HomingMissile {
/// Check at runtime that the task has *not* transplanted itself to a
/// different I/O loop while executing.
pub fn check(&self, msg: &'static str) {
assert!(local_id() == self.io_home, "{}", msg);
}
}
impl Drop for HomingMissile {
fn drop(&mut self) {
let _f = ForbidUnwind::new("leaving home");
// It would truly be a sad day if we had moved off the home I/O
// scheduler while we were doing I/O.
self.check("task moved away from the home scheduler");
}
}
#[cfg(test)]
mod test {
use green::sched;
use green::{SchedPool, PoolConfig};
use std::rt::rtio::RtioUdpSocket;
use std::rt::task::TaskOpts;
use net::UdpWatcher;
use super::super::local_loop;
// On one thread, create a udp socket. Then send that socket to another
// thread and destroy the socket on the remote thread. This should make sure
// that homing kicks in for the socket to go back home to the original
// thread, close itself, and then come back to the last thread.
#[test]
fn test_homing_closes_correctly() {
let (tx, rx) = channel();
let mut pool = SchedPool::new(PoolConfig {
threads: 1,
event_loop_factory: ::event_loop,
});
pool.spawn(TaskOpts::new(), proc() {
let listener = UdpWatcher::bind(local_loop(), ::next_test_ip4());
tx.send(listener.unwrap());
});
let task = pool.task(TaskOpts::new(), proc() {
drop(rx.recv());
});
pool.spawn_sched().send(sched::TaskFromFriend(task));
pool.shutdown();
}
#[test]
fn test_homing_read() {
let (tx, rx) = channel();
let mut pool = SchedPool::new(PoolConfig {
threads: 1,
event_loop_factory: ::event_loop,
});
pool.spawn(TaskOpts::new(), proc() {
let addr1 = ::next_test_ip4();
let addr2 = ::next_test_ip4();
let listener = UdpWatcher::bind(local_loop(), addr2);
tx.send((listener.unwrap(), addr1));
let mut listener = UdpWatcher::bind(local_loop(), addr1).unwrap();
listener.sendto([1, 2, 3, 4], addr2).ok().unwrap();
});
let task = pool.task(TaskOpts::new(), proc() {
let (mut watcher, addr) = rx.recv();
let mut buf = [0, ..10];
assert!(watcher.recvfrom(buf).ok().unwrap() == (4, addr));
});
pool.spawn_sched().send(sched::TaskFromFriend(task));
pool.shutdown();
}
}