blob: 6d9ae52500c8a96aaf395869b40bf74faecaebbc [file] [log] [blame]
//! Run closures in parallel.
//!
//! This is a simple primitive for spawning threads in bulk and waiting for them to complete.
//! Threads are allowed to borrow local variables from the main thread.
//!
//! # Examples
//!
//! Run two threads that increment a number:
//!
//! ```
//! use easy_parallel::Parallel;
//! use std::sync::Mutex;
//!
//! let mut m = Mutex::new(0);
//!
//! Parallel::new()
//! .add(|| *m.lock().unwrap() += 1)
//! .add(|| *m.lock().unwrap() += 1)
//! .run();
//!
//! assert_eq!(*m.get_mut().unwrap(), 2);
//! ```
//!
//! Square each number of a vector on a different thread:
//!
//! ```
//! use easy_parallel::Parallel;
//!
//! let v = vec![10, 20, 30];
//!
//! let squares = Parallel::new()
//! .each(0..v.len(), |i| v[i] * v[i])
//! .run();
//!
//! assert_eq!(squares, [100, 400, 900]);
//! ```
//!
//! Compute the sum of numbers in an array:
//!
//! ```
//! use easy_parallel::Parallel;
//!
//! fn par_sum(v: &[i32]) -> i32 {
//! const THRESHOLD: usize = 2;
//!
//! if v.len() <= THRESHOLD {
//! v.iter().copied().sum()
//! } else {
//! let half = (v.len() + 1) / 2;
//! let sums = Parallel::new().each(v.chunks(half), par_sum).run();
//! sums.into_iter().sum()
//! }
//! }
//!
//! let v = [1, 25, -4, 10, 8];
//! assert_eq!(par_sum(&v), 40);
//! ```
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::fmt;
use std::mem;
use std::panic;
use std::process;
use std::sync::mpsc;
use std::thread;
/// A builder that runs closures in parallel.
#[derive(Default)]
#[must_use]
pub struct Parallel<'a, T> {
/// Closures to run.
closures: Vec<Box<dyn FnOnce() -> T + Send + 'a>>,
}
impl<'a, T> Parallel<'a, T> {
/// Creates a builder for running closures in parallel.
///
/// # Examples
///
/// ```
/// use easy_parallel::Parallel;
///
/// let p = Parallel::<()>::new();
/// ```
pub fn new() -> Parallel<'a, T> {
Parallel {
closures: Vec::new(),
}
}
/// Adds a closure to the list.
///
/// # Examples
///
/// ```
/// use easy_parallel::Parallel;
///
/// Parallel::new()
/// .add(|| println!("hello from a thread"))
/// .run();
/// ```
pub fn add<F>(mut self, f: F) -> Parallel<'a, T>
where
F: FnOnce() -> T + Send + 'a,
T: Send + 'a,
{
self.closures.push(Box::new(f));
self
}
/// Adds a cloned closure for each item in an iterator.
///
/// Each clone of the closure takes an item as an argument.
///
/// # Examples
///
/// ```
/// use easy_parallel::Parallel;
///
/// Parallel::new()
/// .each(0..5, |i| println!("hello from thread #{}", i))
/// .run();
/// ```
pub fn each<A, I, F>(mut self, iter: I, f: F) -> Parallel<'a, T>
where
I: IntoIterator<Item = A>,
F: FnOnce(A) -> T + Clone + Send + 'a,
A: Send + 'a,
T: Send + 'a,
{
for t in iter.into_iter() {
let f = f.clone();
self.closures.push(Box::new(|| f(t)));
}
self
}
/// Runs each closure on a separate thread and collects their results.
///
/// Results are collected in the order in which closures were added. One of the closures always
/// runs on the main thread because there is no point in spawning an extra thread for it.
///
/// If a closure panics, panicking will resume in the main thread after all threads are joined.
///
/// # Examples
///
/// ```
/// use easy_parallel::Parallel;
/// use std::thread;
/// use std::time::Duration;
///
/// let res = Parallel::new()
/// .each(1..=3, |i| 10 * i)
/// .add(|| 100)
/// .run();
///
/// assert_eq!(res, [10, 20, 30, 100]);
/// ```
pub fn run(mut self) -> Vec<T>
where
T: Send + 'a,
{
// Get the last closure.
let f = match self.closures.pop() {
None => return Vec::new(),
Some(f) => f,
};
// Spawn threads, run the last closure on the current thread.
let (mut results, r) = self.finish(f);
results.push(r);
results
}
/// Finishes with a closure to run on the main thread, starts threads, and collects results.
///
/// Results are collected in the order in which closures were added.
///
/// If a closure panics, panicking will resume in the main thread after all threads are joined.
///
/// # Examples
///
/// ```
/// use easy_parallel::Parallel;
/// use std::thread;
/// use std::time::Duration;
///
/// let (res, ()) = Parallel::new()
/// .each(1..=3, |i| 10 * i)
/// .finish(|| println!("Waiting for results"));
///
/// assert_eq!(res, [10, 20, 30]);
/// ```
pub fn finish<F, R>(self, f: F) -> (Vec<T>, R)
where
F: FnOnce() -> R,
T: Send + 'a,
{
// Set up a guard that aborts on panic.
let guard = NoPanic;
// Join handles for spawned threads.
let mut handles = Vec::new();
// Channels to collect results from spawned threads.
let mut receivers = Vec::new();
// Spawn a thread for each closure after the first one.
for f in self.closures.into_iter() {
// Wrap into a closure that sends the result back.
let (sender, receiver) = mpsc::channel();
let f = move || sender.send(f()).unwrap();
// Erase the `'a` lifetime.
let f: Box<dyn FnOnce() + Send + 'a> = Box::new(f);
let f: Box<dyn FnOnce() + Send + 'static> = unsafe { mem::transmute(f) };
// Spawn a thread for the closure.
handles.push(thread::spawn(f));
receivers.push(receiver);
}
let mut last_err = None;
// Run the main closure on the main thread.
let res = panic::catch_unwind(panic::AssertUnwindSafe(f));
// Join threads and save the last panic if there was one.
for h in handles {
if let Err(err) = h.join() {
last_err = Some(err);
}
}
// Drop the guard because we may resume a panic now.
drop(guard);
// If a thread has panicked, resume the last collected panic.
if let Some(err) = last_err {
panic::resume_unwind(err);
}
// Collect the results from threads.
let mut results = Vec::new();
for receiver in receivers {
results.push(receiver.recv().unwrap());
}
// If the main closure panicked, resume its panic.
match res {
Ok(r) => (results, r),
Err(err) => panic::resume_unwind(err),
}
}
}
impl<T> fmt::Debug for Parallel<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Parallel")
.field("len", &self.closures.len())
.finish()
}
}
/// Aborts the process if dropped while panicking.
struct NoPanic;
impl Drop for NoPanic {
fn drop(&mut self) {
if thread::panicking() {
process::abort();
}
}
}