blob: c46dcdd01d71803c1776a5be31f2a6d4e680b9f6 [file] [log] [blame]
//! A concurrent work-stealing deque.
//!
//! This data structure is most commonly used in schedulers. The typical setup involves a number of
//! threads where each thread has its own deque containing tasks. A thread may push tasks into its
//! deque as well as pop tasks from it. Once it runs out of tasks, it may steal some from other
//! threads to help complete tasks more quickly. Therefore, work-stealing deques supports three
//! essential operations: *push*, *pop*, and *steal*.
//!
//! # Types of deques
//!
//! There are two types of deques, differing only in which order tasks get pushed and popped. The
//! two task ordering strategies are:
//!
//! * First-in first-out (FIFO)
//! * Last-in first-out (LIFO)
//!
//! A deque is a buffer with two ends, front and back. In a FIFO deque, tasks are pushed into the
//! back, popped from the front, and stolen from the front. However, in a LIFO deque, tasks are
//! popped from the back instead - that is the only difference.
//!
//! # Workers and stealers
//!
//! There are two functions that construct a deque: [`fifo`] and [`lifo`]. These functions return a
//! [`Worker`] and a [`Stealer`]. The thread which owns the deque is usually called *worker*, while
//! all other threads are *stealers*.
//!
//! [`Worker`] is able to push and pop tasks. It cannot be shared among multiple threads - only
//! one thread owns it.
//!
//! [`Stealer`] can only steal tasks. It can be shared among multiple threads by reference or by
//! cloning. Cloning a [`Stealer`] simply creates another one associated with the same deque.
//!
//! # Examples
//!
//! ```
//! use crossbeam_deque::{self as deque, Pop, Steal};
//! use std::thread;
//!
//! // Create a LIFO deque.
//! let (w, s) = deque::lifo();
//!
//! // Push several elements into the back.
//! w.push(1);
//! w.push(2);
//! w.push(3);
//!
//! // This is a LIFO deque, which means an element is popped from the back.
//! // If it was a FIFO deque, `w.pop()` would return `Some(1)`.
//! assert_eq!(w.pop(), Pop::Data(3));
//!
//! // Create a stealer thread.
//! thread::spawn(move || {
//! assert_eq!(s.steal(), Steal::Data(1));
//! assert_eq!(s.steal(), Steal::Data(2));
//! }).join().unwrap();
//! ```
//!
//! [`Worker`]: struct.Worker.html
//! [`Stealer`]: struct.Stealer.html
//! [`fifo`]: fn.fifo.html
//! [`lifo`]: fn.lifo.html
#![warn(missing_docs)]
#![warn(missing_debug_implementations)]
extern crate crossbeam_epoch as epoch;
extern crate crossbeam_utils as utils;
use std::cell::Cell;
use std::cmp;
use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::atomic::{self, AtomicIsize, Ordering};
use std::sync::Arc;
use epoch::{Atomic, Owned};
use utils::CachePadded;
/// Minimum buffer capacity for a deque.
const MIN_CAP: usize = 32;
/// Maximum number of additional elements that can be stolen in `steal_many`.
const MAX_BATCH: usize = 128;
/// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
/// deallocated as soon as possible.
const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
/// Creates a work-stealing deque with the first-in first-out strategy.
///
/// Elements are pushed into the back, popped from the front, and stolen from the front. In other
/// words, the worker side behaves as a FIFO queue.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::{self as deque, Pop, Steal};
///
/// let (w, s) = deque::fifo::<i32>();
/// w.push(1);
/// w.push(2);
/// w.push(3);
///
/// assert_eq!(s.steal(), Steal::Data(1));
/// assert_eq!(w.pop(), Pop::Data(2));
/// assert_eq!(w.pop(), Pop::Data(3));
/// ```
pub fn fifo<T>() -> (Worker<T>, Stealer<T>) {
let buffer = Buffer::alloc(MIN_CAP);
let inner = Arc::new(CachePadded::new(Inner {
front: AtomicIsize::new(0),
back: AtomicIsize::new(0),
buffer: Atomic::new(buffer),
}));
let w = Worker {
inner: inner.clone(),
cached_buffer: Cell::new(buffer),
flavor: Flavor::Fifo,
_marker: PhantomData,
};
let s = Stealer {
inner,
flavor: Flavor::Fifo,
};
(w, s)
}
/// Creates a work-stealing deque with the last-in first-out strategy.
///
/// Elements are pushed into the back, popped from the back, and stolen from the front. In other
/// words, the worker side behaves as a LIFO stack.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::{self as deque, Pop, Steal};
///
/// let (w, s) = deque::lifo::<i32>();
/// w.push(1);
/// w.push(2);
/// w.push(3);
///
/// assert_eq!(s.steal(), Steal::Data(1));
/// assert_eq!(w.pop(), Pop::Data(3));
/// assert_eq!(w.pop(), Pop::Data(2));
/// ```
pub fn lifo<T>() -> (Worker<T>, Stealer<T>) {
let buffer = Buffer::alloc(MIN_CAP);
let inner = Arc::new(CachePadded::new(Inner {
front: AtomicIsize::new(0),
back: AtomicIsize::new(0),
buffer: Atomic::new(buffer),
}));
let w = Worker {
inner: inner.clone(),
cached_buffer: Cell::new(buffer),
flavor: Flavor::Lifo,
_marker: PhantomData,
};
let s = Stealer {
inner,
flavor: Flavor::Lifo,
};
(w, s)
}
/// A buffer that holds elements in a deque.
///
/// This is just a pointer to the buffer and its length - dropping an instance of this struct will
/// *not* deallocate the buffer.
struct Buffer<T> {
/// Pointer to the allocated memory.
ptr: *mut T,
/// Capacity of the buffer. Always a power of two.
cap: usize,
}
unsafe impl<T> Send for Buffer<T> {}
impl<T> Buffer<T> {
/// Allocates a new buffer with the specified capacity.
fn alloc(cap: usize) -> Self {
debug_assert_eq!(cap, cap.next_power_of_two());
let mut v = Vec::with_capacity(cap);
let ptr = v.as_mut_ptr();
mem::forget(v);
Buffer { ptr, cap }
}
/// Deallocates the buffer.
unsafe fn dealloc(self) {
drop(Vec::from_raw_parts(self.ptr, 0, self.cap));
}
/// Returns a pointer to the element at the specified `index`.
unsafe fn at(&self, index: isize) -> *mut T {
// `self.cap` is always a power of two.
self.ptr.offset(index & (self.cap - 1) as isize)
}
/// Writes `value` into the specified `index`.
unsafe fn write(&self, index: isize, value: T) {
ptr::write_volatile(self.at(index), value)
}
/// Reads a value from the specified `index`.
unsafe fn read(&self, index: isize) -> T {
ptr::read_volatile(self.at(index))
}
}
impl<T> Clone for Buffer<T> {
fn clone(&self) -> Buffer<T> {
Buffer {
ptr: self.ptr,
cap: self.cap,
}
}
}
impl<T> Copy for Buffer<T> {}
/// Possible outcomes of a pop operation.
#[must_use]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
pub enum Pop<T> {
/// The deque was empty at the time of popping.
Empty,
/// Some data has been successfully popped.
Data(T),
/// Lost the race for popping data to another concurrent steal operation. Try again.
Retry,
}
/// Possible outcomes of a steal operation.
#[must_use]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
pub enum Steal<T> {
/// The deque was empty at the time of stealing.
Empty,
/// Some data has been successfully stolen.
Data(T),
/// Lost the race for stealing data to another concurrent steal or pop operation. Try again.
Retry,
}
/// Internal data that is shared between the worker and stealers.
///
/// The implementation is based on the following work:
///
/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
/// PPoPP 2013.][weak-mem]
/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
/// atomics. OOPSLA 2013.][checker]
///
/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
struct Inner<T> {
/// The front index.
front: AtomicIsize,
/// The back index.
back: AtomicIsize,
/// The underlying buffer.
buffer: Atomic<Buffer<T>>,
}
impl<T> Drop for Inner<T> {
fn drop(&mut self) {
// Load the back index, front index, and buffer.
let b = self.back.load(Ordering::Relaxed);
let f = self.front.load(Ordering::Relaxed);
unsafe {
let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
// Go through the buffer from front to back and drop all elements in the deque.
let mut i = f;
while i != b {
ptr::drop_in_place(buffer.deref().at(i));
i = i.wrapping_add(1);
}
// Free the memory allocated by the buffer.
buffer.into_owned().into_box().dealloc();
}
}
}
/// The flavor of a deque: FIFO or LIFO.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Flavor {
/// The first-in first-out flavor.
Fifo,
/// The last-in first-out flavor.
Lifo,
}
/// The worker side of a deque.
///
/// Workers push elements into the back and pop elements depending on the strategy:
///
/// * In FIFO deques, elements are popped from the front.
/// * In LIFO deques, elements are popped from the back.
///
/// A deque has only one worker. Workers are not intended to be shared among multiple threads.
pub struct Worker<T> {
/// A reference to the inner representation of the deque.
inner: Arc<CachePadded<Inner<T>>>,
/// A copy of `inner.buffer` for quick access.
cached_buffer: Cell<Buffer<T>>,
/// The flavor of the deque.
flavor: Flavor,
/// Indicates that the worker cannot be shared among threads.
_marker: PhantomData<*mut ()>, // !Send + !Sync
}
unsafe impl<T: Send> Send for Worker<T> {}
impl<T> Worker<T> {
/// Resizes the internal buffer to the new capacity of `new_cap`.
#[cold]
unsafe fn resize(&self, new_cap: usize) {
// Load the back index, front index, and buffer.
let b = self.inner.back.load(Ordering::Relaxed);
let f = self.inner.front.load(Ordering::Relaxed);
let buffer = self.cached_buffer.get();
// Allocate a new buffer.
let new = Buffer::alloc(new_cap);
self.cached_buffer.set(new);
// Copy data from the old buffer to the new one.
let mut i = f;
while i != b {
ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
i = i.wrapping_add(1);
}
let guard = &epoch::pin();
// Replace the old buffer with the new one.
let old =
self.inner
.buffer
.swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
// Destroy the old buffer later.
guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
// If the buffer is very large, then flush the thread-local garbage in order to deallocate
// it as soon as possible.
if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
guard.flush();
}
}
/// Reserves enough capacity so that `reserve_cap` elements can be pushed without growing the
/// buffer.
fn reserve(&self, reserve_cap: usize) {
if reserve_cap > 0 {
// Compute the current length.
let b = self.inner.back.load(Ordering::Relaxed);
let f = self.inner.front.load(Ordering::SeqCst);
let len = b.wrapping_sub(f) as usize;
// The current capacity.
let cap = self.cached_buffer.get().cap;
// Is there enough capacity to push `reserve_cap` elements?
if cap - len < reserve_cap {
// Keep doubling the capacity as much as is needed.
let mut new_cap = cap * 2;
while new_cap - len < reserve_cap {
new_cap *= 2;
}
// Resize the buffer.
unsafe {
self.resize(new_cap);
}
}
}
}
/// Returns `true` if the deque is empty.
///
/// ```
/// use crossbeam_deque as deque;
///
/// let (w, _) = deque::lifo();
/// assert!(w.is_empty());
/// w.push(1);
/// assert!(!w.is_empty());
/// ```
pub fn is_empty(&self) -> bool {
let b = self.inner.back.load(Ordering::Relaxed);
let f = self.inner.front.load(Ordering::SeqCst);
b.wrapping_sub(f) <= 0
}
/// Pushes an element into the back of the deque.
///
/// # Examples
///
/// ```
/// use crossbeam_deque as deque;
///
/// let (w, _) = deque::lifo();
/// w.push(1);
/// w.push(2);
/// ```
pub fn push(&self, value: T) {
// Load the back index, front index, and buffer.
let b = self.inner.back.load(Ordering::Relaxed);
let f = self.inner.front.load(Ordering::Acquire);
let mut buffer = self.cached_buffer.get();
// Calculate the length of the deque.
let len = b.wrapping_sub(f);
// Is the deque full?
if len >= buffer.cap as isize {
// Yes. Grow the underlying buffer.
unsafe {
self.resize(2 * buffer.cap);
}
buffer = self.cached_buffer.get();
}
// Write `value` into the slot.
unsafe {
buffer.write(b, value);
}
atomic::fence(Ordering::Release);
// Increment the back index.
//
// This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
// races because it doesn't understand fences.
self.inner.back.store(b.wrapping_add(1), Ordering::Release);
}
/// Pops an element from the deque.
///
/// Which end of the deque is used depends on the strategy:
///
/// * If this is a FIFO deque, an element is popped from the front.
/// * If this is a LIFO deque, an element is popped from the back.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::{self as deque, Pop};
///
/// let (w, _) = deque::fifo();
/// w.push(1);
/// w.push(2);
///
/// assert_eq!(w.pop(), Pop::Data(1));
/// assert_eq!(w.pop(), Pop::Data(2));
/// assert_eq!(w.pop(), Pop::Empty);
/// ```
pub fn pop(&self) -> Pop<T> {
// Load the back and front index.
let b = self.inner.back.load(Ordering::Relaxed);
let f = self.inner.front.load(Ordering::Relaxed);
// Calculate the length of the deque.
let len = b.wrapping_sub(f);
// Is the deque empty?
if len <= 0 {
return Pop::Empty;
}
match self.flavor {
// Pop from the front of the deque.
Flavor::Fifo => {
// Try incrementing the front index to pop the value.
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
unsafe {
// Read the popped value.
let buffer = self.cached_buffer.get();
let data = buffer.read(f);
// Shrink the buffer if `len - 1` is less than one fourth of the capacity.
if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
self.resize(buffer.cap / 2);
}
return Pop::Data(data);
}
}
Pop::Retry
}
// Pop from the back of the deque.
Flavor::Lifo => {
// Decrement the back index.
let b = b.wrapping_sub(1);
self.inner.back.store(b, Ordering::Relaxed);
atomic::fence(Ordering::SeqCst);
// Load the front index.
let f = self.inner.front.load(Ordering::Relaxed);
// Compute the length after the back index was decremented.
let len = b.wrapping_sub(f);
if len < 0 {
// The deque is empty. Restore the back index to the original value.
self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
Pop::Empty
} else {
// Read the value to be popped.
let buffer = self.cached_buffer.get();
let mut value = unsafe { Some(buffer.read(b)) };
// Are we popping the last element from the deque?
if len == 0 {
// Try incrementing the front index.
if self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(1),
Ordering::SeqCst,
Ordering::Relaxed,
).is_err()
{
// Failed. We didn't pop anything.
mem::forget(value.take());
}
// Restore the back index to the original value.
self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
} else {
// Shrink the buffer if `len` is less than one fourth of the capacity.
if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
unsafe {
self.resize(buffer.cap / 2);
}
}
}
match value {
None => Pop::Empty,
Some(data) => Pop::Data(data),
}
}
}
}
}
}
impl<T> fmt::Debug for Worker<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Worker {{ ... }}")
}
}
/// The stealer side of a deque.
///
/// Stealers can only steal elements from the front of the deque.
///
/// Stealers are cloneable so that they can be easily shared among multiple threads.
pub struct Stealer<T> {
/// A reference to the inner representation of the deque.
inner: Arc<CachePadded<Inner<T>>>,
/// The flavor of the deque.
flavor: Flavor,
}
unsafe impl<T: Send> Send for Stealer<T> {}
unsafe impl<T: Send> Sync for Stealer<T> {}
impl<T> Stealer<T> {
/// Returns `true` if the deque is empty.
///
/// ```
/// use crossbeam_deque as deque;
///
/// let (w, s) = deque::lifo();
/// assert!(s.is_empty());
/// w.push(1);
/// assert!(!s.is_empty());
/// ```
pub fn is_empty(&self) -> bool {
let f = self.inner.front.load(Ordering::Acquire);
atomic::fence(Ordering::SeqCst);
let b = self.inner.back.load(Ordering::Acquire);
b.wrapping_sub(f) <= 0
}
/// Steals an element from the front of the deque.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::{self as deque, Steal};
///
/// let (w, s) = deque::lifo();
/// w.push(1);
/// w.push(2);
///
/// assert_eq!(s.steal(), Steal::Data(1));
/// assert_eq!(s.steal(), Steal::Data(2));
/// assert_eq!(s.steal(), Steal::Empty);
/// ```
pub fn steal(&self) -> Steal<T> {
// Load the front index.
let f = self.inner.front.load(Ordering::Acquire);
// A SeqCst fence is needed here.
//
// If the current thread is already pinned (reentrantly), we must manually issue the
// fence. Otherwise, the following pinning will issue the fence anyway, so we don't
// have to.
if epoch::is_pinned() {
atomic::fence(Ordering::SeqCst);
}
let guard = &epoch::pin();
// Load the back index.
let b = self.inner.back.load(Ordering::Acquire);
// Is the deque empty?
if b.wrapping_sub(f) <= 0 {
return Steal::Empty;
}
// Load the buffer and read the value at the front.
let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
let value = unsafe { buffer.deref().read(f) };
// Try incrementing the front index to steal the value.
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
// We didn't steal this value, forget it.
mem::forget(value);
return Steal::Retry;
}
// Return the stolen value.
Steal::Data(value)
}
/// Steals elements from the front of the deque.
///
/// If at least one element can be stolen, it will be returned. Additionally, some of the
/// remaining elements will be stolen and pushed into the back of worker `dest` in order to
/// balance the work among deques. There is no hard guarantee on exactly how many elements will
/// be stolen, but it should be around half of the deque.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::{self as deque, Steal};
///
/// let (w1, s1) = deque::fifo();
/// let (w2, s2) = deque::fifo();
///
/// w1.push(1);
/// w1.push(2);
/// w1.push(3);
/// w1.push(4);
///
/// assert_eq!(s1.steal_many(&w2), Steal::Data(1));
/// assert_eq!(s2.steal(), Steal::Data(2));
/// ```
pub fn steal_many(&self, dest: &Worker<T>) -> Steal<T> {
// Load the front index.
let mut f = self.inner.front.load(Ordering::Acquire);
// A SeqCst fence is needed here.
//
// If the current thread is already pinned (reentrantly), we must manually issue the
// fence. Otherwise, the following pinning will issue the fence anyway, so we don't
// have to.
if epoch::is_pinned() {
atomic::fence(Ordering::SeqCst);
}
let guard = &epoch::pin();
// Load the back index.
let b = self.inner.back.load(Ordering::Acquire);
// Is the deque empty?
let len = b.wrapping_sub(f);
if len <= 0 {
return Steal::Empty;
}
// Reserve capacity for the stolen additional elements.
let additional = cmp::min((len as usize - 1) / 2, MAX_BATCH);
dest.reserve(additional);
let additional = additional as isize;
// Get the destination buffer and back index.
let dest_buffer = dest.cached_buffer.get();
let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
// Load the buffer and read the value at the front.
let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
let value = unsafe { buffer.deref().read(f) };
match self.flavor {
// Steal a batch of elements from the front at once.
Flavor::Fifo => {
// Copy the additional elements from the source to the destination buffer.
for i in 0..additional {
unsafe {
let value = buffer.deref().read(f.wrapping_add(i + 1));
dest_buffer.write(dest_b.wrapping_add(i), value);
}
}
// Try incrementing the front index to steal the batch.
if self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(additional + 1),
Ordering::SeqCst,
Ordering::Relaxed,
).is_err()
{
// We didn't steal this value, forget it.
mem::forget(value);
return Steal::Retry;
}
atomic::fence(Ordering::Release);
// Success! Update the back index in the destination deque.
//
// This ordering could be `Relaxed`, but then thread sanitizer would falsely report
// data races because it doesn't understand fences.
dest.inner
.back
.store(dest_b.wrapping_add(additional), Ordering::Release);
// Return the first stolen value.
Steal::Data(value)
}
// Steal a batch of elements from the front one by one.
Flavor::Lifo => {
// Try incrementing the front index to steal the value.
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
// We didn't steal this value, forget it.
mem::forget(value);
return Steal::Retry;
}
// Move the front index one step forward.
f = f.wrapping_add(1);
// Repeat the same procedure for the additional steals.
for _ in 0..additional {
// We've already got the current front index. Now execute the fence to
// synchronize with other threads.
atomic::fence(Ordering::SeqCst);
// Load the back index.
let b = self.inner.back.load(Ordering::Acquire);
// Is the deque empty?
if b.wrapping_sub(f) <= 0 {
break;
}
// Read the value at the front.
let value = unsafe { buffer.deref().read(f) };
// Try incrementing the front index to steal the value.
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
// We didn't steal this value, forget it and break from the loop.
mem::forget(value);
break;
}
// Write the stolen value into the destination buffer.
unsafe {
dest_buffer.write(dest_b, value);
}
// Move the source front index and the destination back index one step forward.
f = f.wrapping_add(1);
dest_b = dest_b.wrapping_add(1);
atomic::fence(Ordering::Release);
// Update the destination back index.
//
// This ordering could be `Relaxed`, but then thread sanitizer would falsely
// report data races because it doesn't understand fences.
dest.inner.back.store(dest_b, Ordering::Release);
}
// Return the first stolen value.
Steal::Data(value)
}
}
}
}
impl<T> Clone for Stealer<T> {
fn clone(&self) -> Stealer<T> {
Stealer {
inner: self.inner.clone(),
flavor: self.flavor,
}
}
}
impl<T> fmt::Debug for Stealer<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Stealer {{ ... }}")
}
}