blob: d3aa4f68edc3826c687bf0910cdcffdeeda3a1ba [file] [log] [blame]
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::isize;
use std::marker::PhantomData;
use std::mem;
use std::pin::Pin;
use std::process;
use std::ptr;
use std::sync::atomic::{self, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use crossbeam_utils::Backoff;
use futures_util::future;
use futures_sink::Sink;
use futures_util::stream::Stream;
use crate::event::{Event, EventListener};
/// Creates a bounded multi-producer multi-consumer channel.
///
/// This channel has a buffer that holds at most `cap` messages at a time. If `cap` is zero, no
/// messages can be stored in the channel, which means send operations must pair with receive
/// operations in order to pass messages over.
///
/// Senders and receivers can be cloned. When all senders associated with a channel are dropped,
/// remaining messages can still be received, but after that receive operations will return
/// [`None`]. On the other hand, when all receivers are dropped, further send operation block
/// forever.
///
/// # Examples
///
/// ```
/// use smol::{Task, Timer};
/// use std::time::Duration;
///
/// # smol::run(async {
/// // Create a channel that can hold 1 message at a time.
/// let (s, r) = piper::chan(1);
///
/// // Sending completes immediately because there is enough space in the channel.
/// s.send(1).await;
///
/// let t = Task::spawn(async move {
/// // This send operation is blocked because the channel is full.
/// // It will be able to complete only after the first message is received.
/// s.send(2).await;
/// });
///
/// // Sleep for a second and then receive both messages.
/// Timer::after(Duration::from_secs(1)).await;
/// assert_eq!(r.recv().await, Some(1));
/// assert_eq!(r.recv().await, Some(2));
/// # })
/// ```
pub fn chan<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
let channel = Arc::new(Channel::with_capacity(cap));
let s = Sender {
channel: channel.clone(),
buffer: VecDeque::new(),
listener: None,
};
let r = Receiver {
channel,
listener: None,
};
(s, r)
}
/// The sending side of a channel.
///
/// This struct is created by the [`chan`] function. See its documentation for more.
///
/// Senders can be cloned and implement [`Sink`].
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # smol::run(async {
/// let (s1, r) = piper::chan(100);
/// let s2 = s1.clone();
///
/// let t1 = Task::spawn(async move { s1.send(1).await });
/// let t2 = Task::spawn(async move { s2.send(2).await });
///
/// let msg1 = r.recv().await.unwrap();
/// let msg2 = r.recv().await.unwrap();
/// assert_eq!(msg1 + msg2, 3);
/// # })
/// ```
pub struct Sender<T> {
/// The inner channel.
channel: Arc<Channel<T>>,
/// The sink buffer.
///
/// Messages sent into this sender as a sink are first buffered, and then they get sent into
/// the channel when the sink is flushed.
buffer: VecDeque<T>,
/// Listens for a receive or handoff event that unblocks this sink.
listener: Option<EventListener>,
}
impl<T> Unpin for Sender<T> {}
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}
impl<T> Sender<T> {
/// Sends a message into the channel.
///
/// If the channel is full, this method waits until there is space for a message in the
/// channel. If there are no receivers on this channel, sending waits forever.
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # smol::run(async {
/// let (s, r) = piper::chan(1);
///
/// let t = Task::spawn(async move {
/// s.send(1).await;
/// s.send(2).await;
/// });
///
/// assert_eq!(r.recv().await, Some(1));
/// assert_eq!(r.recv().await, Some(2));
/// assert_eq!(r.recv().await, None);
/// # })
/// ```
pub async fn send(&self, msg: T) {
self.channel.send(msg).await
}
/// Returns the channel capacity.
///
/// # Examples
///
/// ```
/// let (s, _) = piper::chan::<i32>(5);
/// assert_eq!(s.capacity(), 5);
/// ```
pub fn capacity(&self) -> usize {
// If this channel does handoff, the capacity is 0, even though the internal buffer's
// capacity is 1.
if self.channel.handoff.is_some() {
0
} else {
self.channel.cap
}
}
/// Returns `true` if the channel is empty.
///
/// If the channel's capacity is zero, it is always empty.
///
/// # Examples
///
/// ```
/// # smol::run(async {
/// let (s, r) = piper::chan(1);
///
/// assert!(s.is_empty());
/// s.send(0).await;
/// assert!(!s.is_empty());
/// # })
/// ```
pub fn is_empty(&self) -> bool {
if self.capacity() == 0 {
true
} else {
self.channel.is_empty()
}
}
/// Returns `true` if the channel is full.
///
/// If the channel's capacity is zero, it is always full.
///
/// # Examples
///
/// ```
///
/// # smol::run(async {
/// let (s, r) = piper::chan(1);
///
/// assert!(!s.is_full());
/// s.send(0).await;
/// assert!(s.is_full());
/// #
/// # })
/// ```
pub fn is_full(&self) -> bool {
if self.capacity() == 0 {
true
} else {
self.channel.is_full()
}
}
/// Returns the number of messages in the channel.
///
/// # Examples
///
/// ```
/// # smol::run(async {
/// let (s, r) = piper::chan(2);
/// assert_eq!(s.len(), 0);
///
/// s.send(1).await;
/// s.send(2).await;
/// assert_eq!(s.len(), 2);
/// # })
/// ```
pub fn len(&self) -> usize {
self.channel.len()
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
// Decrement the sender count and disconnect the channel if it drops down to zero.
if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self.channel.disconnect();
}
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
// Make sure the count never overflows, even if lots of sender clones are leaked.
if count > isize::MAX as usize {
process::abort();
}
Sender {
channel: self.channel.clone(),
buffer: VecDeque::new(),
listener: None,
}
}
}
impl<T> Sink<T> for Sender<T> {
type Error = std::convert::Infallible;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.buffer.is_empty() {
Poll::Ready(Ok(()))
} else {
// If there are messages in the buffer, we should encourage the user to flush the sink
// rather than sending more messages into the sink.
Poll::Pending
}
}
fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
// Sending simply involves pushing the message into the sink's buffer.
self.buffer.push_back(msg);
Ok(())
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
// If this sink is blocked on an event, first make sure it is unblocked.
if let Some(listener) = self.listener.as_mut() {
futures_util::ready!(Pin::new(listener).poll(cx));
self.listener = None;
}
loop {
// Get the next message from the buffer.
let msg = match self.buffer.pop_front() {
None => return Poll::Ready(Ok(())),
Some(msg) => msg,
};
// Attempt to send the message.
match self.channel.try_send(msg) {
Ok(stamp) => {
// The sink is not blocked on an event - drop the listener.
self.listener = None;
// If this is a zero-capacity channel, we need to wait for the receiver to
// confirm the message was received.
if let Some(h) = &self.channel.handoff {
// The internal buffer's capacity is 1, which means the pointer to the
// buffer matches the pointer to the first (and only) slot in it.
let slot_stamp = unsafe { &(*self.channel.buffer).stamp };
// If the stamp didn't change, the message was not received yet.
if slot_stamp.load(Ordering::SeqCst) == stamp {
// Listen for a handoff event.
let listener = h.listen();
// Check the stamp again.
if slot_stamp.load(Ordering::SeqCst) == stamp {
// Now we're really blocked on handoff - store this listener
// and go back to the outer loop.
self.listener = Some(listener);
break;
}
}
}
// Continue the inner loop to send the next message...
}
Err(TrySendError::Disconnected(_)) => {
// The sink is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Pending;
}
Err(TrySendError::Full(m)) => {
// The buffer is full - put the message back into the buffer.
self.buffer.push_front(m);
// Listen for a receive event.
match self.listener.as_mut() {
None => {
// Create a listener and try sending the message again.
self.listener = Some(self.channel.sink_ops.listen());
}
Some(_) => {
// Go back to the outer loop to poll the listener.
break;
}
}
}
}
}
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}
}
impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Sender { .. }")
}
}
/// The receiving side of a channel.
///
/// This struct is created by the [`chan`] function. See its documentation for more.
///
/// Receivers can be cloned and implement [`Stream`]. Note if a message is sent into a channel and
/// there are multiple receivers, only one of them will receive the message.
///
/// # Examples
///
/// ```
/// use smol::{Task, Timer};
/// use std::time::Duration;
///
/// # smol::run(async {
/// let (s, r) = piper::chan(100);
///
/// let t = Task::spawn((async move {
/// s.send(1).await;
/// Timer::after(Duration::from_secs(1)).await;
/// s.send(2).await;
/// });
///
/// assert_eq!(r.recv().await, Some(1)); // Received immediately.
/// assert_eq!(r.recv().await, Some(2)); // Received after 1 second.
/// #
/// # })
/// ```
pub struct Receiver<T> {
/// The inner channel.
channel: Arc<Channel<T>>,
/// Listens for a send or disconnect event that unblocks this stream.
listener: Option<EventListener>,
}
impl<T> Unpin for Receiver<T> {}
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
impl<T> Receiver<T> {
/// TODO
pub fn try_recv(&self) -> Option<T> {
self.channel.try_recv().ok()
}
/// Receives a message from the channel.
///
/// If the channel is empty and still has senders, this method will wait until a message is
/// sent into the channel or until all senders get dropped.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::sync::channel;
/// use async_std::task;
///
/// let (s, r) = channel(1);
///
/// task::spawn(async move {
/// s.send(1).await;
/// s.send(2).await;
/// });
///
/// assert_eq!(r.recv().await, Some(1));
/// assert_eq!(r.recv().await, Some(2));
/// assert_eq!(r.recv().await, None);
/// #
/// # })
/// ```
pub async fn recv(&self) -> Option<T> {
self.channel.recv().await
}
/// Returns the channel capacity.
///
/// # Examples
///
/// ```
/// use async_std::sync::channel;
///
/// let (_, r) = channel::<i32>(5);
/// assert_eq!(r.capacity(), 5);
/// ```
pub fn capacity(&self) -> usize {
// If this channel does handoff, the capacity is 0, even though the internal buffer's
// capacity is 1.
if self.channel.handoff.is_some() {
0
} else {
self.channel.cap
}
}
/// Returns `true` if the channel is empty.
///
/// If the channel's capacity is zero, it is always empty.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::sync::channel;
///
/// let (s, r) = channel(1);
///
/// assert!(r.is_empty());
/// s.send(0).await;
/// assert!(!r.is_empty());
/// #
/// # })
/// ```
pub fn is_empty(&self) -> bool {
if self.capacity() == 0 {
true
} else {
self.channel.is_empty()
}
}
/// Returns `true` if the channel is full.
///
/// If the channel's capacity is zero, it is always full.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::sync::channel;
///
/// let (s, r) = channel(1);
///
/// assert!(!r.is_full());
/// s.send(0).await;
/// assert!(r.is_full());
/// #
/// # })
/// ```
pub fn is_full(&self) -> bool {
if self.capacity() == 0 {
true
} else {
self.channel.is_full()
}
}
/// Returns the number of messages in the channel.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::sync::channel;
///
/// let (s, r) = channel(2);
/// assert_eq!(r.len(), 0);
///
/// s.send(1).await;
/// s.send(2).await;
/// assert_eq!(r.len(), 2);
/// #
/// # })
/// ```
pub fn len(&self) -> usize {
self.channel.len()
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
// Decrement the receiver count and disconnect the channel if it drops down to zero.
if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self.channel.disconnect();
}
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Receiver<T> {
let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
// Make sure the count never overflows, even if lots of receiver clones are leaked.
if count > isize::MAX as usize {
process::abort();
}
Receiver {
channel: self.channel.clone(),
listener: None,
}
}
}
impl<T> Stream for Receiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// If this stream is blocked on an event, first make sure it is unblocked.
if let Some(listener) = self.listener.as_mut() {
futures_util::ready!(Pin::new(listener).poll(cx));
self.listener = None;
}
loop {
// Attempt to receive a message.
match self.channel.try_recv() {
Ok(msg) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(Some(msg));
}
Err(TryRecvError::Disconnected) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(None);
}
Err(TryRecvError::Empty) => {}
}
// Listen for a send event.
match self.listener.as_mut() {
None => {
// Create a listener and try sending the message again.
self.listener = Some(self.channel.next_ops.listen());
}
Some(_) => {
// Go back to the outer loop to poll the listener.
break;
}
}
}
}
}
}
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Receiver { .. }")
}
}
/// A slot in a channel.
struct Slot<T> {
/// The current stamp.
stamp: AtomicUsize,
/// The message in this slot.
msg: UnsafeCell<T>,
}
/// Bounded channel based on a preallocated array.
struct Channel<T> {
/// The head of the channel.
///
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
/// packed into a single `usize`. The lower bits represent the index, while the upper bits
/// represent the lap. The mark bit in the head is always zero.
///
/// Messages are popped from the head of the channel.
head: AtomicUsize,
/// The tail of the channel.
///
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
/// packed into a single `usize`. The lower bits represent the index, while the upper bits
/// represent the lap. The mark bit indicates that the channel is disconnected.
///
/// Messages are pushed into the tail of the channel.
tail: AtomicUsize,
/// The buffer holding slots.
buffer: *mut Slot<T>,
/// The channel capacity.
cap: usize,
/// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
one_lap: usize,
/// If this bit is set in the tail, that means either all senders were dropped or all receivers
/// were dropped.
mark_bit: usize,
/// Send operations waiting while the channel is full.
send_ops: Event,
/// Sink operations waiting while the channel is full.
sink_ops: Event,
/// TODO
handoff: Option<Event>,
/// Receive operations waiting while the channel is empty and not disconnected.
recv_ops: Event,
/// Stream operations while the channel is empty and not disconnected.
next_ops: Event,
/// The number of currently active `Sender`s.
sender_count: AtomicUsize,
/// The number of currently active `Receivers`s.
receiver_count: AtomicUsize,
/// Indicates that dropping a `Channel<T>` may drop values of type `T`.
_marker: PhantomData<T>,
}
impl<T> Unpin for Channel<T> {}
unsafe impl<T: Send> Send for Channel<T> {}
unsafe impl<T: Send> Sync for Channel<T> {}
impl<T> Channel<T> {
/// Creates a bounded channel of capacity `cap`.
fn with_capacity(cap: usize) -> Self {
let handoff = if cap == 0 { Some(Event::new()) } else { None };
let cap = cap.max(1);
// Compute constants `mark_bit` and `one_lap`.
let mark_bit = (cap + 1).next_power_of_two();
let one_lap = mark_bit * 2;
// Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
let head = 0;
// Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
let tail = 0;
// Allocate a buffer of `cap` slots.
let buffer = {
let mut v = Vec::<Slot<T>>::with_capacity(cap);
let ptr = v.as_mut_ptr();
mem::forget(v);
ptr
};
// Initialize stamps in the slots.
for i in 0..cap {
unsafe {
// Set the stamp to `{ lap: 0, mark: 0, index: i }`.
let slot = buffer.add(i);
ptr::write(&mut (*slot).stamp, AtomicUsize::new(i));
}
}
Channel {
buffer,
cap,
one_lap,
mark_bit,
head: AtomicUsize::new(head),
tail: AtomicUsize::new(tail),
send_ops: Event::new(),
sink_ops: Event::new(),
handoff,
recv_ops: Event::new(),
next_ops: Event::new(),
sender_count: AtomicUsize::new(1),
receiver_count: AtomicUsize::new(1),
_marker: PhantomData,
}
}
/// Attempts to send a message.
fn try_send(&self, msg: T) -> Result<usize, TrySendError<T>> {
let backoff = Backoff::new();
let mut tail = self.tail.load(Ordering::Relaxed);
loop {
// Extract mark bit from the tail and unset it.
//
// If the mark bit was set (which means all receivers have been dropped), we will still
// send the message into the channel if there is enough capacity. The message will get
// dropped when the channel is dropped (which means when all senders are also dropped).
let mark_bit = tail & self.mark_bit;
tail ^= mark_bit;
// Deconstruct the tail.
let index = tail & (self.mark_bit - 1);
let lap = tail & !(self.one_lap - 1);
// Inspect the corresponding slot.
let slot = unsafe { &*self.buffer.add(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
// If the tail and the stamp match, we may attempt to push.
if tail == stamp {
let new_tail = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
lap.wrapping_add(self.one_lap)
};
// Try moving the tail.
match self.tail.compare_exchange_weak(
tail | mark_bit,
new_tail | mark_bit,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => {
// Write the message into the slot and update the stamp.
unsafe { slot.msg.get().write(msg) };
let stamp = tail + 1;
slot.stamp.store(stamp, Ordering::Release);
// Wake a blocked receive operation.
self.recv_ops.notify_one();
// Wake all blocked streams.
self.next_ops.notify_all();
return Ok(stamp);
}
Err(t) => {
tail = t;
backoff.spin();
}
}
} else if stamp.wrapping_add(self.one_lap) == tail + 1 {
full_fence();
let head = self.head.load(Ordering::Relaxed);
// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the channel is full.
// Check if the channel is disconnected.
if mark_bit != 0 {
return Err(TrySendError::Disconnected(msg));
} else {
return Err(TrySendError::Full(msg));
}
}
backoff.spin();
tail = self.tail.load(Ordering::Relaxed);
} else {
// Snooze because we need to wait for the stamp to get updated.
backoff.snooze();
tail = self.tail.load(Ordering::Relaxed);
}
}
}
async fn send(&self, mut msg: T) {
let mut listener = None;
let stamp = loop {
match self.try_send(msg) {
Ok(stamp) => break stamp,
Err(TrySendError::Disconnected(_)) => return future::pending().await,
Err(TrySendError::Full(m)) => msg = m,
}
match listener.take() {
None => listener = Some(self.send_ops.listen()),
Some(l) => {
l.await;
if self.cap > 1 {
self.send_ops.notify_one();
}
}
}
};
let listener = match &self.handoff {
None => return,
Some(h) => {
let slot_stamp = unsafe { &(*self.buffer).stamp };
if slot_stamp.load(Ordering::SeqCst) != stamp {
return;
}
let listener = h.listen();
if slot_stamp.load(Ordering::SeqCst) != stamp {
return;
}
listener
}
};
listener.await;
}
/// Attempts to receive a message.
fn try_recv(&self) -> Result<T, TryRecvError> {
let backoff = Backoff::new();
let mut head = self.head.load(Ordering::Relaxed);
loop {
// Deconstruct the head.
let index = head & (self.mark_bit - 1);
let lap = head & !(self.one_lap - 1);
// Inspect the corresponding slot.
let slot = unsafe { &*self.buffer.add(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
// If the the stamp is ahead of the head by 1, we may attempt to pop.
if head + 1 == stamp {
let new = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
head + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
lap.wrapping_add(self.one_lap)
};
// Try moving the head.
match self.head.compare_exchange_weak(
head,
new,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => {
// Read the message from the slot and update the stamp.
let msg = unsafe { slot.msg.get().read() };
let stamp = head.wrapping_add(self.one_lap);
slot.stamp.store(stamp, Ordering::Release);
// Wake a blocked send operation.
self.send_ops.notify_one();
// Wake all blocked sinks.
self.sink_ops.notify_all();
// Notify send operations waiting for handoff.
if let Some(h) = &self.handoff {
h.notify_all();
}
return Ok(msg);
}
Err(h) => {
head = h;
backoff.spin();
}
}
} else if stamp == head {
full_fence();
let tail = self.tail.load(Ordering::Relaxed);
// If the tail equals the head, that means the channel is empty.
if (tail & !self.mark_bit) == head {
// If the channel is disconnected...
if tail & self.mark_bit != 0 {
return Err(TryRecvError::Disconnected);
} else {
// Otherwise, the receive operation is not ready.
return Err(TryRecvError::Empty);
}
}
backoff.spin();
head = self.head.load(Ordering::Relaxed);
} else {
// Snooze because we need to wait for the stamp to get updated.
backoff.snooze();
head = self.head.load(Ordering::Relaxed);
}
}
}
async fn recv(&self) -> Option<T> {
let mut listener = None;
loop {
match self.try_recv() {
Ok(msg) => return Some(msg),
Err(TryRecvError::Disconnected) => return None,
Err(TryRecvError::Empty) => {}
}
match listener.take() {
None => listener = Some(self.recv_ops.listen()),
Some(l) => {
l.await;
if self.cap > 1 {
self.recv_ops.notify_one();
}
}
}
}
}
/// Returns the current number of messages inside the channel.
fn len(&self) -> usize {
loop {
// Load the tail, then load the head.
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
// If the tail didn't change, we've got consistent values to work with.
if self.tail.load(Ordering::SeqCst) == tail {
let hix = head & (self.mark_bit - 1);
let tix = tail & (self.mark_bit - 1);
return if hix < tix {
tix - hix
} else if hix > tix {
self.cap - hix + tix
} else if (tail & !self.mark_bit) == head {
0
} else {
self.cap
};
}
}
}
/// Returns `true` if the channel is empty.
fn is_empty(&self) -> bool {
let head = self.head.load(Ordering::SeqCst);
let tail = self.tail.load(Ordering::SeqCst);
// Is the tail equal to the head?
//
// Note: If the head changes just before we load the tail, that means there was a moment
// when the channel was not empty, so it is safe to just return `false`.
(tail & !self.mark_bit) == head
}
/// Returns `true` if the channel is full.
fn is_full(&self) -> bool {
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
// Is the head lagging one lap behind tail?
//
// Note: If the tail changes just before we load the head, that means there was a moment
// when the channel was not full, so it is safe to just return `false`.
head.wrapping_add(self.one_lap) == tail & !self.mark_bit
}
/// Disconnects the channel and wakes up all blocked operations.
fn disconnect(&self) {
let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
if tail & self.mark_bit == 0 {
// Notify everyone blocked on this channel.
self.send_ops.notify_all();
self.sink_ops.notify_all();
self.recv_ops.notify_all();
self.next_ops.notify_all();
if let Some(h) = &self.handoff {
h.notify_all();
}
}
}
}
impl<T> Drop for Channel<T> {
fn drop(&mut self) {
// Get the index of the head.
let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
// Loop over all slots that hold a message and drop them.
for i in 0..self.len() {
// Compute the index of the next slot holding a message.
let index = if hix + i < self.cap {
hix + i
} else {
hix + i - self.cap
};
unsafe {
self.buffer.add(index).drop_in_place();
}
}
// Finally, deallocate the buffer, but don't run any destructors.
unsafe {
Vec::from_raw_parts(self.buffer, 0, self.cap);
}
}
}
/// An error returned from the `try_send()` method.
enum TrySendError<T> {
/// The channel is full but not disconnected.
Full(T),
/// The channel is full and disconnected.
Disconnected(T),
}
/// An error returned from the `try_recv()` method.
enum TryRecvError {
/// The channel is empty but not disconnected.
Empty,
/// The channel is empty and disconnected.
Disconnected,
}
/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
#[inline]
fn full_fence() {
if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
// HACK(stjepang): On x86 architectures there are two different ways of executing
// a `SeqCst` fence.
//
// 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
// 2. `_.compare_and_swap(_, _, SeqCst)`, which compiles into a `lock cmpxchg` instruction.
//
// Both instructions have the effect of a full barrier, but empirical benchmarks have shown
// that the second one is sometimes a bit faster.
//
// The ideal solution here would be to use inline assembly, but we're instead creating a
// temporary atomic variable and compare-and-exchanging its value. No sane compiler to
// x86 platforms is going to optimize this away.
let a = AtomicUsize::new(0);
a.compare_and_swap(0, 1, Ordering::SeqCst);
} else {
atomic::fence(Ordering::SeqCst);
}
}