use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::{self, ManuallyDrop};
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};

use err::PopError;

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;
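// A slot normally transitions from 0 (empty) to `WRITE` (value written) to
// `WRITE | READ` (value consumed). If a block is being torn down while a slot
// has not been read yet, `destroy` additionally sets `DESTROY` on it, and the
// thread still using that slot finishes destroying the block (see `pop`).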

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;
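// For example, with `LAP = 32` and `SHIFT = 1`, the raw index `66 = 33 << 1`
// decodes to position `66 >> SHIFT = 33`, which falls into the second block at
// offset `33 % LAP = 1`. The last offset in each lap (`offset == BLOCK_CAP`)
// holds no value; reaching it means the end of the block.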

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<ManuallyDrop<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        // A zeroed block is a valid starting state: the `next` pointer is null
        // and every slot's state is zero. Slot values are only read after the
        // `WRITE` bit has been set.
        unsafe { mem::zeroed() }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
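    ///
    /// If another thread is still reading one of those slots, that thread
    /// observes the `DESTROY` bit and takes over destruction of the block
    /// once it finishes (see `pop`).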
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded multi-producer multi-consumer queue.
///
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
/// this queue is somewhat slower than [`ArrayQueue`].
///
/// [`ArrayQueue`]: struct.ArrayQueue.html
///
/// # Examples
///
/// ```
/// use crossbeam_queue::{PopError, SegQueue};
///
/// let q = SegQueue::new();
///
/// q.push('a');
/// q.push('b');
///
/// assert_eq!(q.pop(), Ok('a'));
/// assert_eq!(q.pop(), Ok('b'));
/// assert_eq!(q.pop(), Err(PopError));
/// ```
pub struct SegQueue<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

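// SAFETY: accesses to slot values are synchronized through the atomic state
// bits and the head/tail indices, so the queue can be shared between and sent
// across threads whenever `T` itself can be sent.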
unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}

impl<T> SegQueue<T> {
    /// Creates a new unbounded queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::<i32>::new();
    /// ```
    pub fn new() -> SegQueue<T> {
        SegQueue {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }

    /// Pushes an element into the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// ```
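    ///
    /// Since the queue is `Send` and `Sync`, it can also be pushed to from
    /// several threads, for example behind an `Arc` (a minimal sketch):
    ///
    /// ```
    /// use std::sync::Arc;
    /// use std::thread;
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = Arc::new(SegQueue::new());
    /// let q2 = Arc::clone(&q);
    ///
    /// let handle = thread::spawn(move || q2.push(1));
    /// q.push(2);
    /// handle.join().unwrap();
    /// assert_eq!(q.len(), 2);
    /// ```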
    pub fn push(&self, value: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first push operation, we need to allocate the first block.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_and_swap(block, new, Ordering::Release)
                    == block
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    // Another thread installed the first block; reuse our allocation later.
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        // Skip the last (unused) slot by advancing the index one extra step.
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(ManuallyDrop::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Pops an element from the queue.
    ///
    /// If the queue is empty, an error is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{PopError, SegQueue};
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// assert_eq!(q.pop(), Ok(10));
    /// assert_eq!(q.pop(), Err(PopError));
    /// ```
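    ///
    /// Repeated pops drain the queue in FIFO order (a minimal usage sketch):
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    /// q.push(1);
    /// q.push(2);
    ///
    /// let mut drained = Vec::new();
    /// while let Ok(x) = q.pop() {
    ///     drained.push(x);
    /// }
    /// assert_eq!(drained, [1, 2]);
    /// ```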
    pub fn pop(&self) -> Result<T, PopError> {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & HAS_NEXT == 0 {
                // Ensure the following load of the tail cannot be reordered before the
                // load of the head above.
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    return Err(PopError);
                }

                // If head and tail are not in the same block, set `HAS_NEXT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= HAS_NEXT;
                }
            }

            // The block can be null here only if the first push operation is in progress. In that
            // case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= HAS_NEXT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let m = slot.value.get().read();
                    let value = ManuallyDrop::into_inner(m);

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Ok(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// assert!(q.is_empty());
    /// q.push(1);
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10);
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20);
    /// assert_eq!(q.len(), 2);
    /// ```
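    // Implementation note: positions advance by one extra step per block,
    // because the last slot of each block holds no value. For example, after 33
    // pushes the head position is 0 and the tail position is 34 (position 31 is
    // skipped when the second block is installed), so the arithmetic below
    // yields 34 - 0 - 34 / 32 = 33.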
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Fix up indices if they fall onto block ends.
                if head == BLOCK_CAP {
                    head = 0;
                    tail -= LAP;
                }
                if tail == BLOCK_CAP {
                    tail += 1;
                }

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }
}

impl<T> Drop for SegQueue<T> {
    fn drop(&mut self) {
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    ManuallyDrop::drop(&mut *(*slot).value.get());
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

impl<T> fmt::Debug for SegQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.pad("SegQueue { .. }")
    }
}

impl<T> Default for SegQueue<T> {
    fn default() -> SegQueue<T> {
        SegQueue::new()
    }
}