blob: 81ea85fa3102b6e4e0fa2de3b6f6184d2d476d2e [file] [log] [blame]
use std::cell::RefCell;
use std::fmt;
use std::mem;
use std::rc::Rc;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::io;
use {builder_spawn_unsafe, FnBox};
use sync::AtomicOption;
/// A scope in which deferred destructors (and therefore scoped threads) can be registered.
///
/// Values of this type are created by `scope`, which hands a shared reference to the user
/// closure and runs every registered destructor once that closure has returned.
pub struct Scope<'a> {
// Head of the linked chain of deferred destructors; `None` when nothing is pending.
// `defer` pushes onto the head and `drop_all` pops from it.
dtors: RefCell<Option<DtorChain<'a>>>
}
// One node of the singly linked list of deferred destructors owned by a `Scope`.
struct DtorChain<'a> {
// The deferred closure stored in this node; `drop_all` invokes it through `call_box`.
dtor: Box<FnBox + 'a>,
// The node that was the head before this one was pushed, if any.
next: Option<Box<DtorChain<'a>>>
}
// Tracks whether a scoped thread still has to be joined.
enum JoinState {
// The thread has not been joined yet; holds the handle needed to do so.
Running(thread::JoinHandle<()>),
// `JoinState::join` has already taken the handle; joining again is a no-op.
Joined,
}
impl JoinState {
    // Joins the underlying thread if that has not happened yet, leaving `self` as `Joined`.
    //
    // A panic from the child thread is re-raised here via `unwrap`, unless the current thread
    // is already panicking, in which case the child's result is silently discarded.
    fn join(&mut self) {
        // Mark the state as joined up front so a second call can never see the handle again.
        match mem::replace(self, JoinState::Joined) {
            JoinState::Running(handle) => {
                let outcome = handle.join();
                if thread::panicking() {
                    return;
                }
                outcome.unwrap();
            }
            JoinState::Joined => {}
        }
    }
}
/// A handle to a scoped thread.
///
/// Returned by the scoped `spawn` methods; `join` waits for the thread and yields its result.
pub struct ScopedJoinHandle<T> {
// Join state shared with the destructor that `ScopedThreadBuilder::spawn` defers on the
// scope, so the thread is joined exactly once whichever side gets there first.
inner: Rc<RefCell<JoinState>>,
// Slot shared with the spawned thread, which stores the closure's return value in it.
packet: Arc<AtomicOption<T>>,
// Clone of the spawned thread's `thread::Thread` handle, exposed through `thread()`.
thread: thread::Thread,
}
/// Creates a new scope for deferred destructors, runs `f` inside it, and returns `f`'s result.
///
/// Every destructor registered on the scope is executed after `f` returns and before this
/// function does, most recently registered first. Scopes, in particular, support
/// [*scoped thread spawning*](struct.Scope.html#method.spawn).
///
/// # Examples
///
/// Creating and using a scope:
///
/// ```
/// crossbeam::scope(|scope| {
///     scope.defer(|| println!("Exiting scope"));
///     scope.spawn(|| println!("Running child thread in scope"))
/// });
/// // Prints messages in the reverse order written
/// ```
pub fn scope<'a, F, R>(f: F) -> R
    where F: FnOnce(&Scope<'a>) -> R
{
    let mut pending = Scope { dtors: RefCell::new(None) };
    let result = f(&pending);
    // Run the destructors explicitly on the normal path; if `f` unwinds instead, the `Drop`
    // implementation of `Scope` takes care of them.
    pending.drop_all();
    result
}
impl<'a> fmt::Debug for Scope<'a> {
    /// Writes an opaque placeholder; the deferred closures themselves cannot be printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Scope { ... }")
    }
}
impl<T> fmt::Debug for ScopedJoinHandle<T> {
    /// Writes an opaque placeholder, so no `Debug` bound on `T` is required.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("ScopedJoinHandle { ... }")
    }
}
impl<'a> Scope<'a> {
    // Runs every pending deferred destructor, most recently registered first.
    //
    // The method is written in a transactional style: each destructor is unlinked from the
    // chain *before* it is invoked. It can therefore be called directly and, if a destructor
    // panics, be resumed (from `Drop`) during the unwinding this causes, continuing with the
    // remaining entries. By initially running the method outside of any destructor, we avoid
    // any leakage problems due to @rust-lang/rust#14875.
    fn drop_all(&mut self) {
        loop {
            // Pop the head inside its own block so that the `RefCell` borrow is relinquished
            // before the destructor runs.
            let popped = {
                let mut chain = self.dtors.borrow_mut();
                chain.take().map(|mut head| {
                    *chain = head.next.take().map(|boxed| *boxed);
                    head.dtor
                })
            };
            match popped {
                Some(dtor) => dtor.call_box(),
                None => return,
            }
        }
    }

    /// Schedule code to be executed when exiting the scope.
    ///
    /// This is akin to having a destructor on the stack, except that it is
    /// *guaranteed* to be run. Deferred closures run in the reverse of the order in which
    /// they were registered.
    pub fn defer<F>(&self, f: F) where F: FnOnce() + 'a {
        let mut chain = self.dtors.borrow_mut();
        // The new node becomes the head and links to whatever was the head before.
        let previous = chain.take().map(Box::new);
        *chain = Some(DtorChain {
            dtor: Box::new(f),
            next: previous,
        });
    }

    /// Create a scoped thread.
    ///
    /// `spawn` is similar to the [`spawn`][spawn] function in Rust's standard library. The
    /// difference is that this thread is scoped, meaning that it's guaranteed to terminate
    /// before the current stack frame goes away, allowing you to reference the parent stack
    /// frame directly. This is ensured by having the parent thread join on the child thread
    /// before the scope exits.
    ///
    /// [spawn]: https://doc.rust-lang.org/std/thread/fn.spawn.html
    ///
    /// # Panics
    ///
    /// Panics if the thread cannot be spawned. Use [`builder`](#method.builder) to obtain an
    /// `io::Result` instead.
    ///
    /// # Examples
    ///
    /// A basic scoped thread:
    ///
    /// ```
    /// crossbeam::scope(|scope| {
    ///     scope.spawn(|| {
    ///         println!("Hello from a scoped thread!");
    ///     });
    /// });
    /// ```
    ///
    /// When writing concurrent Rust programs, you'll sometimes see a pattern like this, using
    /// [`std::thread::spawn`][spawn]:
    ///
    /// ```ignore
    /// let array = [1, 2, 3];
    /// let mut guards = vec![];
    ///
    /// for i in &array {
    ///     let guard = std::thread::spawn(move || {
    ///         println!("element: {}", i);
    ///     });
    ///
    ///     guards.push(guard);
    /// }
    ///
    /// for guard in guards {
    ///     guard.join().unwrap();
    /// }
    /// ```
    ///
    /// The basic pattern is:
    ///
    /// 1. Iterate over some collection.
    /// 2. Spin up a thread to operate on each part of the collection.
    /// 3. Join all the threads.
    ///
    /// However, this code actually gives an error:
    ///
    /// ```text
    /// error: `array` does not live long enough
    /// for i in &array {
    ///           ^~~~~
    /// in expansion of for loop expansion
    /// note: expansion site
    /// note: reference must be valid for the static lifetime...
    /// note: ...but borrowed value is only valid for the block suffix following statement 0 at ...
    ///     let array = [1, 2, 3];
    ///     let mut guards = vec![];
    ///
    ///     for i in &array {
    ///         let guard = std::thread::spawn(move || {
    ///             println!("element: {}", i);
    /// ...
    /// error: aborting due to previous error
    /// ```
    ///
    /// Because [`std::thread::spawn`][spawn] doesn't know about this scope, it requires a
    /// `'static` lifetime. One way of giving it a proper lifetime is to use an [`Arc`][arc]:
    ///
    /// [arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
    ///
    /// ```
    /// use std::sync::Arc;
    ///
    /// let array = Arc::new([1, 2, 3]);
    /// let mut guards = vec![];
    ///
    /// for i in 0..array.len() {
    ///     let a = array.clone();
    ///
    ///     let guard = std::thread::spawn(move || {
    ///         println!("element: {}", a[i]);
    ///     });
    ///
    ///     guards.push(guard);
    /// }
    ///
    /// for guard in guards {
    ///     guard.join().unwrap();
    /// }
    /// ```
    ///
    /// But this introduces unnecessary allocation, as `Arc<T>` puts its data on the heap, and
    /// we also end up dealing with reference counts. We know that we're joining the threads
    /// before our function returns, so just taking a reference _should_ be safe. Rust can't
    /// know that, though.
    ///
    /// Enter scoped threads. Here's our original example, using `spawn` from crossbeam rather
    /// than from `std::thread`:
    ///
    /// ```
    /// let array = [1, 2, 3];
    ///
    /// crossbeam::scope(|scope| {
    ///     for i in &array {
    ///         scope.spawn(move || {
    ///             println!("element: {}", i);
    ///         });
    ///     }
    /// });
    /// ```
    ///
    /// Much more straightforward.
    pub fn spawn<F, T>(&self, f: F) -> ScopedJoinHandle<T> where
        F: FnOnce() -> T + Send + 'a, T: Send + 'a
    {
        let configured = self.builder();
        let spawned = configured.spawn(f);
        spawned.unwrap()
    }

    /// Generates the base configuration for spawning a scoped thread, from which
    /// configuration methods can be chained.
    pub fn builder<'s>(&'s self) -> ScopedThreadBuilder<'s, 'a> {
        let defaults = thread::Builder::new();
        ScopedThreadBuilder {
            scope: self,
            builder: defaults,
        }
    }
}
/// Scoped thread configuration. Provides detailed control over the properties and behavior of new
/// scoped threads.
///
/// Obtained from `Scope::builder`; consumed by `spawn`.
pub struct ScopedThreadBuilder<'s, 'a: 's> {
// The scope on which the join of the spawned thread will be deferred.
scope: &'s Scope<'a>,
// The standard-library builder that accumulates the name and stack-size settings.
builder: thread::Builder,
}
impl<'s, 'a: 's> ScopedThreadBuilder<'s, 'a> {
/// Names the thread-to-be. Currently the name is used for identification only in panic
/// messages.
pub fn name(mut self, name: String) -> ScopedThreadBuilder<'s, 'a> {
self.builder = self.builder.name(name);
self
}
/// Sets the size of the stack for the new thread.
pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'s, 'a> {
self.builder = self.builder.stack_size(size);
self
}
/// Spawns a new thread, and returns a join handle for it.
pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<T>>
where F: FnOnce() -> T + Send + 'a, T: Send + 'a
{
let their_packet = Arc::new(AtomicOption::new());
let my_packet = their_packet.clone();
let join_handle = try!(unsafe {
builder_spawn_unsafe(self.builder, move || {
their_packet.swap(f(), Ordering::Relaxed);
})
});
let thread = join_handle.thread().clone();
let deferred_handle = Rc::new(RefCell::new(JoinState::Running(join_handle)));
let my_handle = deferred_handle.clone();
self.scope.defer(move || {
let mut state = deferred_handle.borrow_mut();
state.join();
});
Ok(ScopedJoinHandle {
inner: my_handle,
packet: my_packet,
thread: thread,
})
}
}
impl<T> ScopedJoinHandle<T> {
    /// Join the scoped thread, returning the result it produced.
    ///
    /// # Panics
    ///
    /// Panics if the scoped thread panicked, or if no result is available in the shared slot.
    pub fn join(self) -> T {
        {
            // No-op if the thread has already been joined.
            let mut state = self.inner.borrow_mut();
            state.join();
        }
        let produced = self.packet.take(Ordering::Relaxed);
        produced.unwrap()
    }

    /// Get the underlying thread handle.
    pub fn thread(&self) -> &thread::Thread {
        let handle = &self.thread;
        handle
    }
}
impl<'a> Drop for Scope<'a> {
fn drop(&mut self) {
self.drop_all()
}
}