//! A concurrent multi-producer multi-consumer queue.
//! There are two kinds of queues:
//! 1. [Bounded] queue with limited capacity.
//! 2. [Unbounded] queue with unlimited capacity.
//! Queues also have the capability to get [closed] at any point. When closed, no more items can be
//! pushed into the queue, although the remaining items can still be popped.
//! These features make it easy to build channels similar to [`std::sync::mpsc`] on top of this
//! crate.
//! # Examples
//! ```
//! use concurrent_queue::ConcurrentQueue;
//! let q = ConcurrentQueue::unbounded();
//! q.push(1).unwrap();
//! q.push(2).unwrap();
//! assert_eq!(q.pop(), Ok(1));
//! assert_eq!(q.pop(), Ok(2));
//! ```
//! # Features
//! `concurrent-queue` uses an `std` default feature. With this feature enabled, this crate will
//! use [`std::thread::yield_now`] to avoid busy waiting in tight loops. However, with this
//! feature disabled, [`core::hint::spin_loop`] will be used instead. Disabling `std` will allow
//! this crate to be used on `no_std` platforms at the potential expense of more busy waiting.
//! There is also a `portable-atomic` feature, which uses a polyfill from the
//! [`portable-atomic`] crate to provide atomic operations on platforms that do not support them.
//! See the [`README`] for the [`portable-atomic`] crate for more information on how to use it.
//! Note that even with this feature enabled, `concurrent-queue` still requires a global allocator
//! to be available. See the documentation for the [`std::alloc::GlobalAlloc`] trait for more
//! information.
//! [Bounded]: `ConcurrentQueue::bounded()`
//! [Unbounded]: `ConcurrentQueue::unbounded()`
//! [closed]: `ConcurrentQueue::close()`
//! [`portable-atomic`]:
//! [`README`]:
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
extern crate alloc;
#[cfg(feature = "std")]
extern crate std;
use alloc::boxed::Box;
use core::fmt;
use sync::atomic::{self, AtomicUsize, Ordering};
#[cfg(feature = "std")]
use std::error;
#[cfg(feature = "std")]
use std::panic::{RefUnwindSafe, UnwindSafe};
use crate::bounded::Bounded;
use crate::single::Single;
use crate::sync::busy_wait;
use crate::unbounded::Unbounded;
mod bounded;
mod single;
mod unbounded;
mod sync;
/// A concurrent queue.
/// # Examples
/// ```
/// use concurrent_queue::{ConcurrentQueue, PopError, PushError};
/// let q = ConcurrentQueue::bounded(2);
/// assert_eq!(q.push('a'), Ok(()));
/// assert_eq!(q.push('b'), Ok(()));
/// assert_eq!(q.push('c'), Err(PushError::Full('c')));
/// assert_eq!(q.pop(), Ok('a'));
/// assert_eq!(q.pop(), Ok('b'));
/// assert_eq!(q.pop(), Err(PopError::Empty));
/// ```
pub struct ConcurrentQueue<T>(Inner<T>);
unsafe impl<T: Send> Send for ConcurrentQueue<T> {}
unsafe impl<T: Send> Sync for ConcurrentQueue<T> {}
#[cfg(feature = "std")]
impl<T> UnwindSafe for ConcurrentQueue<T> {}
#[cfg(feature = "std")]
impl<T> RefUnwindSafe for ConcurrentQueue<T> {}
enum Inner<T> {
impl<T> ConcurrentQueue<T> {
/// Creates a new bounded queue.
/// The queue allocates enough space for `cap` items.
/// # Panics
/// If the capacity is zero, this constructor will panic.
/// # Examples
/// ```
/// use concurrent_queue::ConcurrentQueue;
/// let q = ConcurrentQueue::<i32>::bounded(100);
/// ```
pub fn bounded(cap: usize) -> ConcurrentQueue<T> {
if cap == 1 {
} else {
/// Creates a new unbounded queue.
/// # Examples
/// ```
/// use concurrent_queue::ConcurrentQueue;
/// let q = ConcurrentQueue::<i32>::unbounded();
/// ```
pub fn unbounded() -> ConcurrentQueue<T> {
/// Attempts to push an item into the queue.
/// If the queue is full or closed, the item is returned back as an error.
/// # Examples
/// ```
/// use concurrent_queue::{ConcurrentQueue, PushError};
/// let q = ConcurrentQueue::bounded(1);
/// // Push succeeds because there is space in the queue.
/// assert_eq!(q.push(10), Ok(()));
/// // Push errors because the queue is now full.
/// assert_eq!(q.push(20), Err(PushError::Full(20)));
/// // Close the queue, which will prevent further pushes.
/// q.close();
/// // Pushing now errors indicating the queue is closed.
/// assert_eq!(q.push(20), Err(PushError::Closed(20)));
/// // Pop the single item in the queue.
/// assert_eq!(q.pop(), Ok(10));
/// // Even though there is space, no more items can be pushed.
/// assert_eq!(q.push(20), Err(PushError::Closed(20)));
/// ```
pub fn push(&self, value: T) -> Result<(), PushError<T>> {
match &self.0 {
Inner::Single(q) => q.push(value),
Inner::Bounded(q) => q.push(value),
Inner::Unbounded(q) => q.push(value),
/// Attempts to pop an item from the queue.
/// If the queue is empty, an error is returned.
/// # Examples
/// ```
/// use concurrent_queue::{ConcurrentQueue, PopError};
/// let q = ConcurrentQueue::bounded(1);
/// // Pop errors when the queue is empty.
/// assert_eq!(q.pop(), Err(PopError::Empty));
/// // Push one item and close the queue.
/// assert_eq!(q.push(10), Ok(()));
/// q.close();
/// // Remaining items can be popped.
/// assert_eq!(q.pop(), Ok(10));
/// // Again, pop errors when the queue is empty,
/// // but now also indicates that the queue is closed.
/// assert_eq!(q.pop(), Err(PopError::Closed));
/// ```
pub fn pop(&self) -> Result<T, PopError> {
match &self.0 {
Inner::Single(q) => q.pop(),
Inner::Bounded(q) => q.pop(),
Inner::Unbounded(q) => q.pop(),
/// Returns `true` if the queue is empty.
/// # Examples
/// ```
/// use concurrent_queue::ConcurrentQueue;
/// let q = ConcurrentQueue::<i32>::unbounded();
/// assert!(q.is_empty());
/// q.push(1).unwrap();
/// assert!(!q.is_empty());
/// ```
pub fn is_empty(&self) -> bool {
match &self.0 {
Inner::Single(q) => q.is_empty(),
Inner::Bounded(q) => q.is_empty(),
Inner::Unbounded(q) => q.is_empty(),
/// Returns `true` if the queue is full.
/// An unbounded queue is never full.
/// # Examples
/// ```
/// use concurrent_queue::ConcurrentQueue;
/// let q = ConcurrentQueue::bounded(1);
/// assert!(!q.is_full());
/// q.push(1).unwrap();
/// assert!(q.is_full());
/// ```
pub fn is_full(&self) -> bool {
match &self.0 {
Inner::Single(q) => q.is_full(),
Inner::Bounded(q) => q.is_full(),
Inner::Unbounded(q) => q.is_full(),
/// Returns the number of items in the queue.
/// # Examples
/// ```
/// use concurrent_queue::ConcurrentQueue;
/// let q = ConcurrentQueue::unbounded();
/// assert_eq!(q.len(), 0);
/// assert_eq!(q.push(10), Ok(()));
/// assert_eq!(q.len(), 1);
/// assert_eq!(q.push(20), Ok(()));
/// assert_eq!(q.len(), 2);
/// ```
pub fn len(&self) -> usize {
match &self.0 {
Inner::Single(q) => q.len(),
Inner::Bounded(q) => q.len(),
Inner::Unbounded(q) => q.len(),
/// Returns the capacity of the queue.
/// Unbounded queues have infinite capacity, represented as [`None`].
/// # Examples
/// ```
/// use concurrent_queue::ConcurrentQueue;
/// let q = ConcurrentQueue::<i32>::bounded(7);
/// assert_eq!(q.capacity(), Some(7));
/// let q = ConcurrentQueue::<i32>::unbounded();
/// assert_eq!(q.capacity(), None);
/// ```
pub fn capacity(&self) -> Option<usize> {
match &self.0 {
Inner::Single(_) => Some(1),
Inner::Bounded(q) => Some(q.capacity()),
Inner::Unbounded(_) => None,
/// Closes the queue.
/// Returns `true` if this call closed the queue, or `false` if it was already closed.
/// When a queue is closed, no more items can be pushed but the remaining items can still be
/// popped.
/// # Examples
/// ```
/// use concurrent_queue::{ConcurrentQueue, PopError, PushError};
/// let q = ConcurrentQueue::unbounded();
/// assert_eq!(q.push(10), Ok(()));
/// assert!(q.close()); // `true` because this call closes the queue.
/// assert!(!q.close()); // `false` because the queue is already closed.
/// // Cannot push any more items when closed.
/// assert_eq!(q.push(20), Err(PushError::Closed(20)));
/// // Remaining items can still be popped.
/// assert_eq!(q.pop(), Ok(10));
/// // When no more items are present, the error is `Closed`.
/// assert_eq!(q.pop(), Err(PopError::Closed));
/// ```
pub fn close(&self) -> bool {
match &self.0 {
Inner::Single(q) => q.close(),
Inner::Bounded(q) => q.close(),
Inner::Unbounded(q) => q.close(),
/// Returns `true` if the queue is closed.
/// # Examples
/// ```
/// use concurrent_queue::ConcurrentQueue;
/// let q = ConcurrentQueue::<i32>::unbounded();
/// assert!(!q.is_closed());
/// q.close();
/// assert!(q.is_closed());
/// ```
pub fn is_closed(&self) -> bool {
match &self.0 {
Inner::Single(q) => q.is_closed(),
Inner::Bounded(q) => q.is_closed(),
Inner::Unbounded(q) => q.is_closed(),
impl<T> fmt::Debug for ConcurrentQueue<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
.field("len", &self.len())
.field("capacity", &self.capacity())
.field("is_closed", &self.is_closed())
/// Error which occurs when popping from an empty queue.
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum PopError {
/// The queue is empty but not closed.
/// The queue is empty and closed.
impl PopError {
/// Returns `true` if the queue is empty but not closed.
pub fn is_empty(&self) -> bool {
match self {
PopError::Empty => true,
PopError::Closed => false,
/// Returns `true` if the queue is empty and closed.
pub fn is_closed(&self) -> bool {
match self {
PopError::Empty => false,
PopError::Closed => true,
#[cfg(feature = "std")]
impl error::Error for PopError {}
impl fmt::Debug for PopError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PopError::Empty => write!(f, "Empty"),
PopError::Closed => write!(f, "Closed"),
impl fmt::Display for PopError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PopError::Empty => write!(f, "Empty"),
PopError::Closed => write!(f, "Closed"),
/// Error which occurs when pushing into a full or closed queue.
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum PushError<T> {
/// The queue is full but not closed.
/// The queue is closed.
impl<T> PushError<T> {
/// Unwraps the item that couldn't be pushed.
pub fn into_inner(self) -> T {
match self {
PushError::Full(t) => t,
PushError::Closed(t) => t,
/// Returns `true` if the queue is full but not closed.
pub fn is_full(&self) -> bool {
match self {
PushError::Full(_) => true,
PushError::Closed(_) => false,
/// Returns `true` if the queue is closed.
pub fn is_closed(&self) -> bool {
match self {
PushError::Full(_) => false,
PushError::Closed(_) => true,
#[cfg(feature = "std")]
impl<T: fmt::Debug> error::Error for PushError<T> {}
impl<T: fmt::Debug> fmt::Debug for PushError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PushError::Full(t) => f.debug_tuple("Full").field(t).finish(),
PushError::Closed(t) => f.debug_tuple("Closed").field(t).finish(),
impl<T> fmt::Display for PushError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PushError::Full(_) => write!(f, "Full"),
PushError::Closed(_) => write!(f, "Closed"),
/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
fn full_fence() {
if cfg!(all(
any(target_arch = "x86", target_arch = "x86_64"),
)) {
// HACK(stjepang): On x86 architectures there are two different ways of executing
// a `SeqCst` fence.
// 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
// 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg` instruction.
// Both instructions have the effect of a full barrier, but empirical benchmarks have shown
// that the second one is sometimes a bit faster.
// The ideal solution here would be to use inline assembly, but we're instead creating a
// temporary atomic variable and compare-and-exchanging its value. No sane compiler to
// x86 platforms is going to optimize this away.
let a = AtomicUsize::new(0);
let _ = a.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst);
} else {