// blob: 994e3c561a9721010527e0ef438b1c9198184a5e [file] [log] [blame]
//! Threads that can borrow variables from the stack.
//!
//! Create a scope when spawned threads need to access variables on the stack:
//!
//! ```
//! use crossbeam_utils::thread;
//!
//! let people = vec![
//! "Alice".to_string(),
//! "Bob".to_string(),
//! "Carol".to_string(),
//! ];
//!
//! thread::scope(|scope| {
//! for person in &people {
//! scope.spawn(move |_| {
//! println!("Hello, {}!", person);
//! });
//! }
//! }).unwrap();
//! ```
//!
//! # Why scoped threads?
//!
//! Suppose we wanted to re-write the previous example using plain threads:
//!
//! ```ignore
//! use std::thread;
//!
//! let people = vec![
//! "Alice".to_string(),
//! "Bob".to_string(),
//! "Carol".to_string(),
//! ];
//!
//! let mut threads = Vec::new();
//!
//! for person in &people {
//! threads.push(thread::spawn(move || {
//! println!("Hello, {}!", person);
//! }));
//! }
//!
//! for thread in threads {
//! thread.join().unwrap();
//! }
//! ```
//!
//! This doesn't work because the borrow checker complains about `people` not living long enough:
//!
//! ```text
//! error[E0597]: `people` does not live long enough
//! --> src/main.rs:12:20
//! |
//! 12 | for person in &people {
//! | ^^^^^^ borrowed value does not live long enough
//! ...
//! 21 | }
//! | - borrowed value only lives until here
//! |
//! = note: borrowed value must be valid for the static lifetime...
//! ```
//!
//! The problem here is that spawned threads are not allowed to borrow variables on the stack
//! because the compiler cannot prove they will be joined before `people` is destroyed.
//!
//! Scoped threads are a mechanism to guarantee to the compiler that spawned threads will be joined
//! before the scope ends.
//!
//! # How scoped threads work
//!
//! If a variable is borrowed by a thread, the thread must complete before the variable is
//! destroyed. Threads spawned using [`std::thread::spawn`] can only borrow variables with the
//! `'static` lifetime because the borrow checker cannot be sure when the thread will complete.
//!
//! A scope creates a clear boundary between variables outside the scope and threads inside the
//! scope. Whenever a scope spawns a thread, it promises to join the thread before the scope ends.
//! This way we guarantee to the borrow checker that scoped threads only live within the scope and
//! can safely access variables outside it.
//!
//! [`std::thread::spawn`]: https://doc.rust-lang.org/std/thread/fn.spawn.html
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::panic;
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
/// A list shared between threads: a `Vec<T>` behind a `Mutex`, reference-counted with `Arc`.
type SharedVec<T> = Arc<Mutex<Vec<T>>>;
/// An optional value shared between threads: an `Option<T>` behind a `Mutex`, reference-counted
/// with `Arc`.
type SharedOption<T> = Arc<Mutex<Option<T>>>;
/// Creates a new `Scope` for [*scoped thread spawning*](struct.Scope.html#method.spawn).
///
/// The closure `f` receives a reference to the scope and may spawn threads through it. Before
/// this function returns, every thread spawned in the scope that has not already been joined
/// through its handle is joined, no matter whether `f` or any of the threads panicked.
///
/// # Return value
///
/// * `Ok(r)`, where `r` is the value returned by `f`, if none of the threads joined here
///   panicked.
/// * `Err(e)` if at least one of them panicked. The boxed value `e` is a `Vec` holding the panic
///   payload of every such thread.
///
/// # Panics
///
/// If `f` itself panics, the panic is resumed once the remaining threads have been joined.
///
/// # Examples
///
/// Creating and using a scope:
///
/// ```
/// crossbeam_utils::thread::scope(|scope| {
///     scope.spawn(|_| println!("Exiting scope"));
///     scope.spawn(|_| println!("Running child thread in scope"));
/// }).unwrap();
/// ```
pub fn scope<'env, F, R>(f: F) -> thread::Result<R>
where
    F: FnOnce(&Scope<'env>) -> R,
{
    let (sender, receiver) = mpsc::channel();
    let root = Scope::<'env> {
        handles: SharedVec::default(),
        chan: sender,
        _marker: PhantomData,
    };

    // Run the user's closure, capturing a panic instead of unwinding past the joins below.
    let outcome = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&root)));

    // Give up the root's sender, then block until every clone held by a nested scope is gone:
    // `recv` only returns (with an error) once no sender is left.
    drop(root.chan);
    let _ = receiver.recv();

    // Join every thread whose handle is still in place and remember the panic payloads.
    let mut panics = Vec::new();
    {
        let mut registered = root.handles.lock().unwrap();
        for slot in registered.drain(..) {
            // Take the handle in its own statement so the slot's lock is released before joining.
            let unjoined = slot.lock().unwrap().take();
            if let Some(join_handle) = unjoined {
                if let Err(payload) = join_handle.join() {
                    panics.push(payload);
                }
            }
        }
    }

    // A panic in `f` takes precedence and keeps unwinding.
    let value = match outcome {
        Ok(value) => value,
        Err(payload) => panic::resume_unwind(payload),
    };

    // Otherwise report child panics, or hand back the result of `f` if there were none.
    if panics.is_empty() {
        Ok(value)
    } else {
        Err(Box::new(panics))
    }
}
/// A scope for spawning threads.
///
/// Threads are spawned through [`spawn`](#method.spawn) or [`builder`](#method.builder).
pub struct Scope<'env> {
/// The list of the thread join handles.
handles: SharedVec<SharedOption<thread::JoinHandle<()>>>,
/// Used to wait until all subscopes are dropped.
chan: mpsc::Sender<()>,
/// Borrows data with invariant lifetime `'env`.
_marker: PhantomData<&'env mut &'env ()>,
}
// `Scope` is declared `Sync` by hand instead of deriving it from its fields.
// NOTE(review): `handles` is guarded by a mutex, so the claim rests on `chan`: the visible code
// only clones or drops that sender through a shared reference. Confirm that `mpsc::Sender<()>`
// tolerates this on every supported toolchain.
unsafe impl<'env> Sync for Scope<'env> {}
impl<'env> Scope<'env> {
    /// Spawns a scoped thread running `f` and returns a handle to it.
    ///
    /// This works like the [`spawn`] function in Rust's standard library, except that the thread
    /// is scoped: it is guaranteed to terminate before the current stack frame goes away, so it
    /// may reference the parent stack frame directly. The guarantee is upheld by having the
    /// parent thread join the child thread before the scope exits.
    ///
    /// The closure receives a reference to a scope, through which it can spawn further threads.
    ///
    /// # Panics
    ///
    /// Panics if the thread cannot be created, i.e. if spawning through [`builder`] returns an
    /// error.
    ///
    /// [`spawn`]: https://doc.rust-lang.org/std/thread/fn.spawn.html
    /// [`builder`]: #method.builder
    pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
    where
        F: FnOnce(&Scope<'env>) -> T + Send + 'env,
        T: Send + 'env,
    {
        let spawned = self.builder().spawn(f);
        spawned.unwrap()
    }

    /// Returns a builder with the default thread configuration for this scope; configuration
    /// methods can be chained on it before spawning.
    pub fn builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env> {
        let builder = thread::Builder::new();
        ScopedThreadBuilder {
            scope: self,
            builder,
        }
    }
}
impl<'env> fmt::Debug for Scope<'env> {
    /// Writes an opaque placeholder; none of the scope's fields are shown.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        // `pad` (rather than `write_str`) applies the formatter's width and alignment options.
        formatter.pad("Scope { .. }")
    }
}
/// Scoped thread configuration. Provides detailed control over the properties and behavior of new
/// scoped threads.
#[derive(Debug)]
pub struct ScopedThreadBuilder<'scope, 'env: 'scope> {
/// The scope in which the new thread will be spawned.
scope: &'scope Scope<'env>,
/// The standard-library builder that accumulates the thread's name and stack size.
builder: thread::Builder,
}
impl<'scope, 'env> ScopedThreadBuilder<'scope, 'env> {
/// Names the thread-to-be. Currently the name is used for identification only in panic
/// messages.
pub fn name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env> {
self.builder = self.builder.name(name);
self
}
/// Sets the size of the stack for the new thread.
pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env> {
self.builder = self.builder.stack_size(size);
self
}
/// Spawns a new thread, and returns a join handle for it.
pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>>
where
F: FnOnce(&Scope<'env>) -> T,
F: Send + 'env,
T: Send + 'env,
{
// The result of `f` will be stored here.
let result = SharedOption::default();
// Spawn the thread and grab its join handle and thread handle.
let (handle, thread) = {
let result = Arc::clone(&result);
// A clone of the scope that will be moved into the new thread.
let scope = Scope::<'env> {
handles: Arc::clone(&self.scope.handles),
chan: self.scope.chan.clone(),
_marker: PhantomData,
};
// Spawn the thread.
let handle = {
let closure = move || {
// Make sure the scope is inside the closure with the proper `'env` lifetime.
let scope: Scope<'env> = scope;
// Run the closure.
let res = f(&scope);
// Store the result if the closure didn't panic.
*result.lock().unwrap() = Some(res);
};
// Change the type of `closure` from `FnOnce() -> T` to `FnMut() -> T`.
let mut closure = Some(closure);
let closure = move || closure.take().unwrap()();
// Allocate `clsoure` on the heap and erase the `'env` bound.
let closure: Box<FnMut() + Send + 'env> = Box::new(closure);
let closure: Box<FnMut() + Send + 'static> = unsafe { mem::transmute(closure) };
// Finally, spawn the closure.
let mut closure = closure;
self.builder.spawn(move || closure())?
};
let thread = handle.thread().clone();
let handle = Arc::new(Mutex::new(Some(handle)));
(handle, thread)
};
// Add the handle to the shared list of join handles.
self.scope.handles.lock().unwrap().push(Arc::clone(&handle));
Ok(ScopedJoinHandle {
handle,
result,
thread,
_marker: PhantomData,
})
}
}
// NOTE(review): these impls carry no `T: Send` bound. In the code visible here a
// `ScopedJoinHandle<T>` is only constructed by `ScopedThreadBuilder::spawn`, which requires
// `T: Send`, and the value of type `T` is only stored behind `Arc<Mutex<Option<T>>>`. Confirm
// that no other constructor exists before relying on this.
unsafe impl<'scope, T> Send for ScopedJoinHandle<'scope, T> {}
unsafe impl<'scope, T> Sync for ScopedJoinHandle<'scope, T> {}
/// A handle to a scoped thread.
///
/// Calling [`join`](#method.join) waits for the thread and yields the value its closure returned.
pub struct ScopedJoinHandle<'scope, T> {
/// A join handle to the spawned thread.
handle: SharedOption<thread::JoinHandle<()>>,
/// Holds the result of the inner closure.
result: SharedOption<T>,
/// A handle to the spawned thread.
thread: thread::Thread,
/// Borrows the parent scope with lifetime `'scope`.
_marker: PhantomData<&'scope ()>,
}
impl<'scope, T> ScopedJoinHandle<'scope, T> {
    /// Waits for the associated thread to finish and returns the value produced by its closure.
    ///
    /// If the child thread panics, [`Err`] is returned with the parameter given to [`panic`].
    ///
    /// [`Err`]: https://doc.rust-lang.org/std/result/enum.Result.html#variant.Err
    /// [`panic`]: https://doc.rust-lang.org/std/macro.panic.html
    ///
    /// # Panics
    ///
    /// This function may panic on some platforms if a thread attempts to join itself or otherwise
    /// may create a deadlock with joining threads.
    pub fn join(self) -> thread::Result<T> {
        // Take out the handle. The handle will surely be available because the root scope waits
        // for nested scopes before joining remaining threads.
        let join_handle = self.handle.lock().unwrap().take().unwrap();

        match join_handle.join() {
            Ok(()) => {
                // The thread ran to completion, so its closure has stored the result.
                let value = self.result.lock().unwrap().take().unwrap();
                Ok(value)
            }
            Err(payload) => Err(payload),
        }
    }

    /// Gets the underlying [`std::thread::Thread`] handle.
    ///
    /// [`std::thread::Thread`]: https://doc.rust-lang.org/std/thread/struct.Thread.html
    pub fn thread(&self) -> &thread::Thread {
        let thread = &self.thread;
        thread
    }
}
impl<'scope, T> fmt::Debug for ScopedJoinHandle<'scope, T> {
    /// Writes an opaque placeholder; none of the handle's fields are shown.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        // `pad` (rather than `write_str`) applies the formatter's width and alignment options.
        formatter.pad("ScopedJoinHandle { .. }")
    }
}