use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::thread;

use cache_padded::CachePadded;

use crate::{PopError, PushError};

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;
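// For example, a slot whose state is `WRITE | READ` (0b011) has had its value
// written by a `push` and then consumed by a `pop`.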

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of items a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the queue is closed.
const MARK_BIT: usize = 1;
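// For illustration: with `SHIFT = 1` and `LAP = 32`, an index of 70 has the mark bit
// clear (70 & MARK_BIT == 0), refers to offset (70 >> SHIFT) % LAP == 3 within its
// block, and lies on lap (70 >> SHIFT) / LAP == 1.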

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    const UNINIT: Slot<T> = Slot {
        value: UnsafeCell::new(MaybeUninit::uninit()),
        state: AtomicUsize::new(0),
    };

    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            thread::yield_now();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        Block {
            next: AtomicPtr::new(ptr::null_mut()),
            slots: [Slot::UNINIT; BLOCK_CAP],
        }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            thread::yield_now();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded queue.
pub struct Unbounded<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,
}

impl<T> Unbounded<T> {
    /// Creates a new unbounded queue.
    pub fn new() -> Unbounded<T> {
        Unbounded {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
        }
    }

    /// Pushes an item into the queue.
    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the queue is closed.
            if tail & MARK_BIT != 0 {
                return Err(PushError::Closed(value));
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                thread::yield_now();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first value to be pushed into the queue, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);
                    return Ok(());
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                }
            }
        }
    }

    /// Pops an item from the queue.
    pub fn pop(&self) -> Result<T, PopError> {
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                thread::yield_now();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
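                // A full fence keeps the relaxed tail load below from being reordered before the
                // head load above, so the emptiness check sees a consistent snapshot.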
                crate::full_fence();
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // Check if the queue is closed.
                    if tail & MARK_BIT != 0 {
                        return Err(PopError::Closed);
                    } else {
                        return Err(PopError::Empty);
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first push operation is in progress.
            if block.is_null() {
                thread::yield_now();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Ok(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                }
            }
        }
    }

    /// Returns the number of items in the queue.
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

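                // For example, a head of 3 and a tail of 40 at this point cover offsets 3..=30
                // of the first block plus offsets 0..=7 of the second, which is
                // 40 - 3 - 40 / 32 = 36 items.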
                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }

    /// Returns `true` if the queue is empty.
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns `true` if the queue is full.
    pub fn is_full(&self) -> bool {
        false
    }

    /// Closes the queue.
    ///
    /// Returns `true` if this call closed the queue.
    pub fn close(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
        tail & MARK_BIT == 0
    }

    /// Returns `true` if the queue is closed.
    pub fn is_closed(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }
}

impl<T> Drop for Unbounded<T> {
    fn drop(&mut self) {
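        // `drop` takes `&mut self`, so no other thread can access the queue concurrently
        // and relaxed loads are sufficient here.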
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let value = slot.value.get().read().assume_init();
                    drop(value);
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}
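
// A minimal usage sketch, assuming the `PushError::Closed`, `PopError::Empty`, and
// `PopError::Closed` variants referenced above; illustrative only, not part of the
// crate's original test suite.
#[cfg(test)]
mod usage_sketch {
    use super::Unbounded;
    use crate::{PopError, PushError};

    #[test]
    fn push_pop_close() {
        let q = Unbounded::new();

        // Values come out in FIFO order.
        assert!(q.push(1).is_ok());
        assert!(q.push(2).is_ok());
        assert!(matches!(q.pop(), Ok(1)));
        assert!(matches!(q.pop(), Ok(2)));
        assert!(matches!(q.pop(), Err(PopError::Empty)));

        // Only the first call to `close` reports having closed the queue.
        assert!(q.close());
        assert!(!q.close());

        // After closing, pushes are rejected and popping an empty queue reports `Closed`.
        assert!(matches!(q.push(3), Err(PushError::Closed(3))));
        assert!(matches!(q.pop(), Err(PopError::Closed)));
    }
}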