| // Copyright 2013 The Rust Project Developers. See the COPYRIGHT |
| // file at the top-level directory of this distribution and at |
| // http://rust-lang.org/COPYRIGHT. |
| // |
| // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or |
| // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license |
| // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your |
| // option. This file may not be copied, modified, or distributed |
| // except according to those terms. |
| |
| //! The "green scheduling" library |
| //! |
//! This library provides M:N threading for Rust programs. Internally it
//! implements a green scheduler along with context switching and a
//! stack-allocation strategy. It can be optionally linked into Rust programs
//! in order to provide M:N functionality inside of 1:1 programs.
| //! |
| //! # Architecture |
| //! |
//! An M:N scheduling library implies that there are N OS threads upon which M
| //! "green threads" are multiplexed. In other words, a set of green threads are |
| //! all run inside a pool of OS threads. |
| //! |
| //! With this design, you can achieve _concurrency_ by spawning many green |
| //! threads, and you can achieve _parallelism_ by running the green threads |
| //! simultaneously on multiple OS threads. Each OS thread is a candidate for |
| //! being scheduled on a different core (the source of parallelism), and then |
| //! all of the green threads cooperatively schedule amongst one another (the |
| //! source of concurrency). |
| //! |
| //! ## Schedulers |
| //! |
//! In order to coordinate among green threads, each OS thread primarily runs
//! something which we call a Scheduler. Whenever a reference to a Scheduler is
//! made, it is synonymous with referencing one OS thread. Each scheduler is
//! bound to exactly one OS thread, and the thread that it is bound to never
//! changes.
| //! |
//! Each scheduler is connected to a pool of other schedulers (a `SchedPool`),
//! which is the thread pool mentioned above. A pool of schedulers all share the
| //! work that they create. Furthermore, whenever a green thread is created (also |
| //! synonymously referred to as a green task), it is associated with a |
| //! `SchedPool` forevermore. A green thread cannot leave its scheduler pool. |
| //! |
| //! Schedulers can have at most one green thread running on them at a time. When |
| //! a scheduler is asleep on its event loop, there are no green tasks running on |
//! the OS thread or the scheduler. The term "context switch" is used when the
//! running green thread is swapped out; this simply changes which green thread
//! is running on the scheduler.
| //! |
| //! ## Green Threads |
| //! |
| //! A green thread can largely be summarized by a stack and a register context. |
| //! Whenever a green thread is spawned, it allocates a stack, and then prepares |
| //! a register context for execution. The green task may be executed across |
| //! multiple OS threads, but it will always use the same stack and it will carry |
| //! its register context across OS threads. |
| //! |
| //! Each green thread is cooperatively scheduled with other green threads. |
| //! Primarily, this means that there is no pre-emption of a green thread. The |
| //! major consequence of this design is that a green thread stuck in an infinite |
| //! loop will prevent all other green threads from running on that particular |
| //! scheduler. |
| //! |
//! Scheduling events for green threads occur on communication and I/O
//! boundaries. For example, if a green task blocks waiting for a message on a
//! channel, some other green thread can then run on the scheduler. This also
//! has the consequence that until a green thread triggers a scheduling event,
//! it is guaranteed to keep running on the same OS thread.
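//!
//! As an illustration, here is a minimal sketch of such a scheduling event,
//! assuming the code is already running inside a scheduler pool (set up as
//! shown in the examples below):
//!
//! ```rust
//! let (tx, rx) = channel();
//! spawn(proc() {
//!     tx.send(5i); // this may run on any scheduler in the pool
//! });
//! // Blocking on `recv` is a scheduling event: this green thread is swapped
//! // out, and another green thread may run on this OS thread in the interim.
//! assert_eq!(rx.recv(), 5i);
//! ```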
| //! |
| //! ## Work Stealing |
| //! |
| //! With a pool of schedulers, a new green task has a number of options when |
| //! deciding where to run initially. The current implementation uses a concept |
| //! called work stealing in order to spread out work among schedulers. |
| //! |
//! In a work-stealing model, each scheduler maintains a local queue of tasks to
//! run, and this queue is stolen from by other schedulers. Implementation-wise,
//! work stealing has some hairy parts, but from a user perspective, work
//! stealing simply implies that with M green threads and N schedulers, where
//! M > N, it is very likely that all schedulers will be busy executing work.
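//!
//! The heart of this mechanism is a per-scheduler work-stealing deque. A
//! minimal sketch follows, using the `std::sync::deque` module which this
//! library builds upon (the exact API is experimental and subject to change):
//!
//! ```rust
//! use std::sync::deque::{BufferPool, Data};
//!
//! let mut pool = BufferPool::new();
//! let (mut worker, mut stealer) = pool.deque();
//!
//! worker.push(1i); // a scheduler pushes work onto its own queue...
//! match stealer.steal() { // ...and another scheduler may steal it
//!     Data(n) => assert_eq!(n, 1),
//!     _ => {} // the queue was empty, or the steal raced and aborted
//! }
//! ```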
| //! |
| //! # Considerations when using libgreen |
| //! |
//! An M:N runtime has both pros and cons, and there is no one answer as to
//! whether M:N or 1:1 is appropriate to use; each has its own advantages and
//! disadvantages. Regardless of the workload, however, there are some aspects
//! of using green threads which you should be aware of:
| //! |
//! * The largest concern when using libgreen is interoperating with native
//! code. Care should be taken when calling native code that will block the OS
//! thread, as this will prevent further green tasks from being scheduled on
//! that OS thread.
| //! |
| //! * Native code using thread-local-storage should be approached |
| //! with care. Green threads may migrate among OS threads at any time, so |
| //! native libraries using thread-local state may not always work. |
| //! |
//! * Native synchronization primitives (e.g. pthread mutexes) will also not
//! work for green threads. This is because native primitives often operate
//! at an _OS thread_ granularity, whereas green threads operate on a more
//! granular unit of work.
| //! |
//! * A green threading runtime is not fork-safe. If the process calls
//! `fork()`, it cannot expect to make reasonable progress by continuing to
//! use green threads.
| //! |
//! Note that these concerns do not mean that operating with native code is a
//! lost cause. These are simply concerns which should be considered when
//! invoking native code.
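//!
//! One way to cope with a blocking native call is to give it a dedicated
//! scheduler (and hence a dedicated OS thread) via the `spawn_sched` and
//! `green_pinned` APIs described below. A minimal sketch, where
//! `blocking_native_call` is a hypothetical stand-in for an FFI call that
//! blocks the OS thread:
//!
//! ```rust
//! use std::task::TaskBuilder;
//! use green::{SchedPool, PoolConfig, GreenTaskBuilder};
//!
//! fn blocking_native_call() { /* a long-running FFI call */ }
//!
//! let mut pool = SchedPool::new(PoolConfig::new());
//! let mut handle = pool.spawn_sched(); // a dedicated OS thread
//! TaskBuilder::new().green_pinned(&mut pool, &mut handle).spawn(proc() {
//!     // Only this scheduler's OS thread blocks; the other schedulers in
//!     // the pool keep running green tasks.
//!     blocking_native_call();
//! });
//! drop(handle);
//! pool.shutdown();
//! ```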
| //! |
| //! # Starting with libgreen |
| //! |
| //! ```rust |
| //! extern crate green; |
| //! |
| //! #[start] |
| //! fn start(argc: int, argv: **u8) -> int { |
| //! green::start(argc, argv, green::basic::event_loop, main) |
| //! } |
| //! |
| //! fn main() { |
| //! // this code is running in a pool of schedulers |
| //! } |
| //! ``` |
| //! |
//! > **Note**: The `main` function in this example does *not* have I/O
//! > support. The basic event loop does not provide any I/O support.
| //! |
| //! # Starting with I/O support in libgreen |
| //! |
| //! ```rust |
| //! extern crate green; |
| //! extern crate rustuv; |
| //! |
| //! #[start] |
| //! fn start(argc: int, argv: **u8) -> int { |
| //! green::start(argc, argv, rustuv::event_loop, main) |
| //! } |
| //! |
| //! fn main() { |
| //! // this code is running in a pool of schedulers all powered by libuv |
| //! } |
| //! ``` |
| //! |
| //! The above code can also be shortened with a macro from libgreen. |
| //! |
| //! ``` |
| //! #![feature(phase)] |
| //! #[phase(plugin)] extern crate green; |
| //! |
| //! green_start!(main) |
| //! |
| //! fn main() { |
| //! // run inside of a green pool |
| //! } |
| //! ``` |
| //! |
| //! # Using a scheduler pool |
| //! |
| //! This library adds a `GreenTaskBuilder` trait that extends the methods |
| //! available on `std::task::TaskBuilder` to allow spawning a green task, |
| //! possibly pinned to a particular scheduler thread: |
| //! |
| //! ```rust |
| //! use std::task::TaskBuilder; |
| //! use green::{SchedPool, PoolConfig, GreenTaskBuilder}; |
| //! |
| //! let config = PoolConfig::new(); |
| //! let mut pool = SchedPool::new(config); |
| //! |
| //! // Spawn tasks into the pool of schedulers |
| //! TaskBuilder::new().green(&mut pool).spawn(proc() { |
| //! // this code is running inside the pool of schedulers |
| //! |
| //! spawn(proc() { |
| //! // this code is also running inside the same scheduler pool |
| //! }); |
| //! }); |
| //! |
| //! // Dynamically add a new scheduler to the scheduler pool. This adds another |
| //! // OS thread that green threads can be multiplexed on to. |
| //! let mut handle = pool.spawn_sched(); |
| //! |
| //! // Pin a task to the spawned scheduler |
| //! TaskBuilder::new().green_pinned(&mut pool, &mut handle).spawn(proc() { |
| //! /* ... */ |
| //! }); |
| //! |
| //! // Handles keep schedulers alive, so be sure to drop all handles before |
| //! // destroying the sched pool |
| //! drop(handle); |
| //! |
| //! // Required to shut down this scheduler pool. |
| //! // The task will fail if `shutdown` is not called. |
| //! pool.shutdown(); |
| //! ``` |
| |
| #![crate_id = "green#0.11.0"] |
| #![experimental] |
| #![license = "MIT/ASL2"] |
| #![crate_type = "rlib"] |
| #![crate_type = "dylib"] |
| #![doc(html_logo_url = "http://www.rust-lang.org/logos/rust-logo-128x128-blk-v2.png", |
| html_favicon_url = "http://www.rust-lang.org/favicon.ico", |
| html_root_url = "http://doc.rust-lang.org/0.11.0/", |
| html_playground_url = "http://play.rust-lang.org/")] |
| |
| // NB this does *not* include globs, please keep it that way. |
| #![feature(macro_rules, phase)] |
| #![allow(visible_private_types)] |
| #![allow(deprecated)] |
| #![feature(default_type_params)] |
| |
| #[cfg(test)] #[phase(plugin, link)] extern crate log; |
| #[cfg(test)] extern crate rustuv; |
| extern crate libc; |
| extern crate alloc; |
| |
| use alloc::arc::Arc; |
| use std::mem::replace; |
| use std::os; |
| use std::rt::rtio; |
| use std::rt::thread::Thread; |
| use std::rt::task::TaskOpts; |
| use std::rt; |
| use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT}; |
| use std::sync::deque; |
| use std::task::{TaskBuilder, Spawner}; |
| |
| use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, PinnedTask, NewNeighbor}; |
| use sleeper_list::SleeperList; |
| use stack::StackPool; |
| use task::GreenTask; |
| |
| mod macros; |
| mod simple; |
| mod message_queue; |
| |
| pub mod basic; |
| pub mod context; |
| pub mod coroutine; |
| pub mod sched; |
| pub mod sleeper_list; |
| pub mod stack; |
| pub mod task; |
| |
| /// A helper macro for booting a program with libgreen |
| /// |
| /// # Example |
| /// |
| /// ``` |
| /// #![feature(phase)] |
| /// #[phase(plugin)] extern crate green; |
| /// |
| /// green_start!(main) |
| /// |
| /// fn main() { |
| /// // running with libgreen |
| /// } |
| /// ``` |
| #[macro_export] |
| macro_rules! green_start( ($f:ident) => ( |
| mod __start { |
| extern crate green; |
| extern crate rustuv; |
| |
| #[start] |
| fn start(argc: int, argv: **u8) -> int { |
| green::start(argc, argv, rustuv::event_loop, super::$f) |
| } |
| } |
| ) ) |
| |
| /// Set up a default runtime configuration, given compiler-supplied arguments. |
| /// |
/// This function will block until the entire pool of M:N schedulers has
/// exited. This function also requires a local task to be available.
| /// |
| /// # Arguments |
| /// |
| /// * `argc` & `argv` - The argument vector. On Unix this information is used |
| /// by os::args. |
| /// * `main` - The initial procedure to run inside of the M:N scheduling pool. |
| /// Once this procedure exits, the scheduling pool will begin to shut |
| /// down. The entire pool (and this function) will only return once |
| /// all child tasks have finished executing. |
| /// |
| /// # Return value |
| /// |
| /// The return value is used as the process return code. 0 on success, 101 on |
| /// error. |
| pub fn start(argc: int, argv: **u8, |
| event_loop_factory: fn() -> Box<rtio::EventLoop + Send>, |
| main: proc():Send) -> int { |
| rt::init(argc, argv); |
| let mut main = Some(main); |
| let mut ret = None; |
| simple::task().run(|| { |
| ret = Some(run(event_loop_factory, main.take_unwrap())); |
| }).destroy(); |
| // unsafe is ok b/c we're sure that the runtime is gone |
| unsafe { rt::cleanup() } |
| ret.unwrap() |
| } |
| |
| /// Execute the main function in a pool of M:N schedulers. |
| /// |
| /// Configures the runtime according to the environment, by default using a task |
| /// scheduler with the same number of threads as cores. Returns a process exit |
| /// code. |
| /// |
| /// This function will not return until all schedulers in the associated pool |
| /// have returned. |
| pub fn run(event_loop_factory: fn() -> Box<rtio::EventLoop + Send>, |
| main: proc():Send) -> int { |
| // Create a scheduler pool and spawn the main task into this pool. We will |
| // get notified over a channel when the main task exits. |
| let mut cfg = PoolConfig::new(); |
| cfg.event_loop_factory = event_loop_factory; |
| let mut pool = SchedPool::new(cfg); |
| let (tx, rx) = channel(); |
| let mut opts = TaskOpts::new(); |
| opts.on_exit = Some(proc(r) tx.send(r)); |
| opts.name = Some("<main>".into_maybe_owned()); |
| pool.spawn(opts, main); |
| |
| // Wait for the main task to return, and set the process error code |
| // appropriately. |
| if rx.recv().is_err() { |
| os::set_exit_status(rt::DEFAULT_ERROR_CODE); |
| } |
| |
| // Now that we're sure all tasks are dead, shut down the pool of schedulers, |
| // waiting for them all to return. |
| pool.shutdown(); |
| os::get_exit_status() |
| } |
| |
| /// Configuration of how an M:N pool of schedulers is spawned. |
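///
/// # Example
///
/// A minimal sketch which overrides the number of scheduler threads while
/// keeping the default event loop factory:
///
/// ```
/// use green::{PoolConfig, SchedPool};
///
/// let mut config = PoolConfig::new();
/// config.threads = 4; // spawn four OS threads into this pool
/// let pool = SchedPool::new(config);
/// // ... spawn green tasks into `pool` ...
/// pool.shutdown();
/// ```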
| pub struct PoolConfig { |
| /// The number of schedulers (OS threads) to spawn into this M:N pool. |
| pub threads: uint, |
| /// A factory function used to create new event loops. If this is not |
| /// specified then the default event loop factory is used. |
| pub event_loop_factory: fn() -> Box<rtio::EventLoop + Send>, |
| } |
| |
| impl PoolConfig { |
/// Returns the default configuration, as determined by the environment
/// variables of this process.
| pub fn new() -> PoolConfig { |
| PoolConfig { |
| threads: rt::default_sched_threads(), |
| event_loop_factory: basic::event_loop, |
| } |
| } |
| } |
| |
| /// A structure representing a handle to a pool of schedulers. This handle is |
| /// used to keep the pool alive and also reap the status from the pool. |
| pub struct SchedPool { |
| id: uint, |
| threads: Vec<Thread<()>>, |
| handles: Vec<SchedHandle>, |
| stealers: Vec<deque::Stealer<Box<task::GreenTask>>>, |
| next_friend: uint, |
| stack_pool: StackPool, |
| deque_pool: deque::BufferPool<Box<task::GreenTask>>, |
| sleepers: SleeperList, |
| factory: fn() -> Box<rtio::EventLoop + Send>, |
| task_state: TaskState, |
| tasks_done: Receiver<()>, |
| } |
| |
/// This is an internal state shared among a pool of schedulers. It is used to
/// keep track of how many tasks are currently running in the pool, and to send
/// on a channel once the entire pool has been drained of all tasks.
| #[deriving(Clone)] |
| struct TaskState { |
| cnt: Arc<AtomicUint>, |
| done: Sender<()>, |
| } |
| |
| impl SchedPool { |
/// Creates a new pool of schedulers.
///
/// This will configure the pool according to the `config` parameter, and
/// each scheduler will be spawned onto its own OS thread.
| pub fn new(config: PoolConfig) -> SchedPool { |
| static mut POOL_ID: AtomicUint = INIT_ATOMIC_UINT; |
| |
| let PoolConfig { |
| threads: nscheds, |
| event_loop_factory: factory |
| } = config; |
| assert!(nscheds > 0); |
| |
| // The pool of schedulers that will be returned from this function |
| let (p, state) = TaskState::new(); |
| let mut pool = SchedPool { |
| threads: vec![], |
| handles: vec![], |
| stealers: vec![], |
| id: unsafe { POOL_ID.fetch_add(1, SeqCst) }, |
| sleepers: SleeperList::new(), |
| stack_pool: StackPool::new(), |
| deque_pool: deque::BufferPool::new(), |
| next_friend: 0, |
| factory: factory, |
| task_state: state, |
| tasks_done: p, |
| }; |
| |
// Create a work queue (worker/stealer pair) for each of the `nscheds`
// schedulers. The stealer halves are shared among all schedulers.
| let mut workers = Vec::with_capacity(nscheds); |
| let mut stealers = Vec::with_capacity(nscheds); |
| |
| for _ in range(0, nscheds) { |
| let (w, s) = pool.deque_pool.deque(); |
| workers.push(w); |
| stealers.push(s); |
| } |
| pool.stealers = stealers; |
| |
| // Now that we've got all our work queues, create one scheduler per |
| // queue, spawn the scheduler into a thread, and be sure to keep a |
| // handle to the scheduler and the thread to keep them alive. |
| for worker in workers.move_iter() { |
| rtdebug!("inserting a regular scheduler"); |
| |
| let mut sched = box Scheduler::new(pool.id, |
| (pool.factory)(), |
| worker, |
| pool.stealers.clone(), |
| pool.sleepers.clone(), |
| pool.task_state.clone()); |
| pool.handles.push(sched.make_handle()); |
| pool.threads.push(Thread::start(proc() { sched.bootstrap(); })); |
| } |
| |
| return pool; |
| } |
| |
/// Creates a new task configured to run inside of this pool of schedulers.
/// This is useful for creating a task which can then be sent to a specific
/// scheduler created by `spawn_sched` (and possibly pinned to that
/// scheduler).
| #[deprecated = "use the green and green_pinned methods of GreenTaskBuilder instead"] |
| pub fn task(&mut self, opts: TaskOpts, f: proc():Send) -> Box<GreenTask> { |
| GreenTask::configure(&mut self.stack_pool, opts, f) |
| } |
| |
| /// Spawns a new task into this pool of schedulers, using the specified |
| /// options to configure the new task which is spawned. |
| /// |
| /// New tasks are spawned in a round-robin fashion to the schedulers in this |
| /// pool, but tasks can certainly migrate among schedulers once they're in |
| /// the pool. |
| #[deprecated = "use the green and green_pinned methods of GreenTaskBuilder instead"] |
| pub fn spawn(&mut self, opts: TaskOpts, f: proc():Send) { |
| let task = self.task(opts, f); |
| |
| // Figure out someone to send this task to |
| let idx = self.next_friend; |
| self.next_friend += 1; |
| if self.next_friend >= self.handles.len() { |
| self.next_friend = 0; |
| } |
| |
| // Jettison the task away! |
| self.handles.get_mut(idx).send(TaskFromFriend(task)); |
| } |
| |
| /// Spawns a new scheduler into this M:N pool. A handle is returned to the |
| /// scheduler for use. The scheduler will not exit as long as this handle is |
| /// active. |
| /// |
| /// The scheduler spawned will participate in work stealing with all of the |
| /// other schedulers currently in the scheduler pool. |
| pub fn spawn_sched(&mut self) -> SchedHandle { |
| let (worker, stealer) = self.deque_pool.deque(); |
| self.stealers.push(stealer.clone()); |
| |
| // Tell all existing schedulers about this new scheduler so they can all |
| // steal work from it |
| for handle in self.handles.mut_iter() { |
| handle.send(NewNeighbor(stealer.clone())); |
| } |
| |
| // Create the new scheduler, using the same sleeper list as all the |
| // other schedulers as well as having a stealer handle to all other |
| // schedulers. |
| let mut sched = box Scheduler::new(self.id, |
| (self.factory)(), |
| worker, |
| self.stealers.clone(), |
| self.sleepers.clone(), |
| self.task_state.clone()); |
| let ret = sched.make_handle(); |
| self.handles.push(sched.make_handle()); |
| self.threads.push(Thread::start(proc() { sched.bootstrap() })); |
| |
| return ret; |
| } |
| |
| /// Consumes the pool of schedulers, waiting for all tasks to exit and all |
| /// schedulers to shut down. |
| /// |
/// This function must be called in order to drop a pool of schedulers; it
/// is considered an error to drop a pool without calling this method.
///
/// This only waits for all tasks in *this pool* of schedulers to exit; any
/// native tasks or external pools will not be waited on.
| pub fn shutdown(mut self) { |
| self.stealers = vec![]; |
| |
| // Wait for everyone to exit. We may have reached a 0-task count |
| // multiple times in the past, meaning there could be several buffered |
| // messages on the `tasks_done` port. We're guaranteed that after *some* |
| // message the current task count will be 0, so we just receive in a |
| // loop until everything is totally dead. |
| while self.task_state.active() { |
| self.tasks_done.recv(); |
| } |
| |
| // Now that everyone's gone, tell everything to shut down. |
| for mut handle in replace(&mut self.handles, vec![]).move_iter() { |
| handle.send(Shutdown); |
| } |
| for thread in replace(&mut self.threads, vec![]).move_iter() { |
| thread.join(); |
| } |
| } |
| } |
| |
| impl TaskState { |
| fn new() -> (Receiver<()>, TaskState) { |
| let (tx, rx) = channel(); |
| (rx, TaskState { |
| cnt: Arc::new(AtomicUint::new(0)), |
| done: tx, |
| }) |
| } |
| |
| fn increment(&mut self) { |
| self.cnt.fetch_add(1, SeqCst); |
| } |
| |
| fn active(&self) -> bool { |
| self.cnt.load(SeqCst) != 0 |
| } |
| |
| fn decrement(&mut self) { |
| let prev = self.cnt.fetch_sub(1, SeqCst); |
| if prev == 1 { |
| self.done.send(()); |
| } |
| } |
| } |
| |
| impl Drop for SchedPool { |
| fn drop(&mut self) { |
| if self.threads.len() > 0 { |
| fail!("dropping a M:N scheduler pool that wasn't shut down"); |
| } |
| } |
| } |
| |
| /// A spawner for green tasks |
| pub struct GreenSpawner<'a>{ |
| pool: &'a mut SchedPool, |
| handle: Option<&'a mut SchedHandle> |
| } |
| |
| impl<'a> Spawner for GreenSpawner<'a> { |
| #[inline] |
| fn spawn(self, opts: TaskOpts, f: proc():Send) { |
| let GreenSpawner { pool, handle } = self; |
| match handle { |
| None => pool.spawn(opts, f), |
| Some(h) => h.send(PinnedTask(pool.task(opts, f))) |
| } |
| } |
| } |
| |
| /// An extension trait adding `green` configuration methods to `TaskBuilder`. |
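///
/// # Example
///
/// A minimal sketch, mirroring the test at the bottom of this module:
///
/// ```
/// use std::task::TaskBuilder;
/// use green::{SchedPool, PoolConfig, GreenTaskBuilder};
///
/// let mut pool = SchedPool::new(PoolConfig::new());
/// TaskBuilder::new().green(&mut pool).spawn(proc() {
///     // this closure runs as a green task inside `pool`
/// });
/// pool.shutdown();
/// ```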
| pub trait GreenTaskBuilder { |
| fn green<'a>(self, &'a mut SchedPool) -> TaskBuilder<GreenSpawner<'a>>; |
| fn green_pinned<'a>(self, &'a mut SchedPool, &'a mut SchedHandle) |
| -> TaskBuilder<GreenSpawner<'a>>; |
| } |
| |
| impl<S: Spawner> GreenTaskBuilder for TaskBuilder<S> { |
| fn green<'a>(self, pool: &'a mut SchedPool) -> TaskBuilder<GreenSpawner<'a>> { |
| self.spawner(GreenSpawner {pool: pool, handle: None}) |
| } |
| |
| fn green_pinned<'a>(self, pool: &'a mut SchedPool, handle: &'a mut SchedHandle) |
| -> TaskBuilder<GreenSpawner<'a>> { |
| self.spawner(GreenSpawner {pool: pool, handle: Some(handle)}) |
| } |
| } |
| |
| #[cfg(test)] |
| mod test { |
| use std::task::TaskBuilder; |
| use super::{SchedPool, PoolConfig, GreenTaskBuilder}; |
| |
| #[test] |
| fn test_green_builder() { |
| let mut pool = SchedPool::new(PoolConfig::new()); |
| let res = TaskBuilder::new().green(&mut pool).try(proc() { |
| "Success!".to_string() |
| }); |
| assert_eq!(res.ok().unwrap(), "Success!".to_string()); |
| pool.shutdown(); |
| } |
| } |