| /// Synchronous channels/ports |
| /// |
| /// This channel implementation differs significantly from the asynchronous |
| /// implementations found next to it (oneshot/stream/share). This is an |
| /// implementation of a synchronous, bounded buffer channel. |
| /// |
| /// Each channel is created with some amount of backing buffer, and sends will |
| /// *block* until buffer space becomes available. A buffer size of 0 is valid, |
| /// which means that every successful send is paired with a successful recv. |
| /// |
/// Unlike the asynchronous flavors, `send` here returns a `Result` instead of
/// panicking the thread when a message cannot be delivered.
///
/// Another major difference is that `send()` will *always* hand back the data
/// if it couldn't be sent. This is possible because it is deterministically
/// known when the data is received and when it is not.
| /// |
| /// Implementation-wise, it can all be summed up with "use a mutex plus some |
| /// logic". The mutex used here is an OS native mutex, meaning that no user code |
| /// is run inside of the mutex (to prevent context switching). This |
| /// implementation shares almost all code for the buffered and unbuffered cases |
| /// of a synchronous channel. There are a few branches for the unbuffered case, |
| /// but they're mostly just relevant to blocking senders. |
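///
/// As an illustrative sketch (through the public `sync_channel` API that this
/// module backs), a capacity-0 channel acts as a rendezvous point:
///
/// ```
/// use std::sync::mpsc::sync_channel;
/// use std::thread;
///
/// // With capacity 0, `send` blocks until the receiver calls `recv`.
/// let (tx, rx) = sync_channel::<i32>(0);
/// let handle = thread::spawn(move || {
///     tx.send(1).unwrap(); // completes only once paired with the `recv`
/// });
/// assert_eq!(rx.recv().unwrap(), 1);
/// handle.join().unwrap();
/// ```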
pub use self::Failure::*;
use self::Blocker::*;
| |
| use core::intrinsics::abort; |
| use core::mem; |
| use core::ptr; |
| |
| use crate::sync::atomic::{AtomicUsize, Ordering}; |
| use crate::sync::mpsc::blocking::{self, SignalToken, WaitToken}; |
| use crate::sync::{Mutex, MutexGuard}; |
| use crate::time::Instant; |
| |
| const MAX_REFCOUNT: usize = (isize::MAX) as usize; |
| |
| pub struct Packet<T> { |
    /// The only field outside of the mutex. Kept atomic so that
    /// `clone_chan`/`drop_chan` can adjust the reference count without taking
    /// the lock (mirroring the shared channel implementation).
| channels: AtomicUsize, |
| |
| lock: Mutex<State<T>>, |
| } |
| |
| unsafe impl<T: Send> Send for Packet<T> {} |
| |
| unsafe impl<T: Send> Sync for Packet<T> {} |
| |
| struct State<T> { |
| disconnected: bool, // Is the channel disconnected yet? |
| queue: Queue, // queue of senders waiting to send data |
| blocker: Blocker, // currently blocked thread on this channel |
| buf: Buffer<T>, // storage for buffered messages |
| cap: usize, // capacity of this channel |
| |
    /// A curious flag used to indicate whether a sender failed or succeeded in
    /// blocking. This is used to tell a blocked sender that it must dequeue its
    /// message from the buffer because it was not received. This is only
    /// relevant in the 0-capacity case. The `'static` lifetime is a lie: the
    /// reference actually points at a `bool` on a blocked sender's stack, but
    /// that sender stays blocked (keeping the pointer valid) until this field
    /// is cleared or consumed.
| canceled: Option<&'static mut bool>, |
| } |
| |
| unsafe impl<T: Send> Send for State<T> {} |
| |
/// Possible flavors of threads that can be blocked on this channel.
| enum Blocker { |
| BlockedSender(SignalToken), |
| BlockedReceiver(SignalToken), |
| NoneBlocked, |
| } |
| |
/// Simple queue for threading threads together. Nodes are stack-allocated in
/// blocked senders, so this structure is only sound while those senders remain
/// blocked.
| struct Queue { |
| head: *mut Node, |
| tail: *mut Node, |
| } |
| |
| struct Node { |
| token: Option<SignalToken>, |
| next: *mut Node, |
| } |
| |
| unsafe impl Send for Node {} |
| |
| /// A simple ring-buffer |
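/// (`start` indexes the oldest element; the next enqueue lands at slot
/// `(start + size) % buf.len()`, e.g. with `len == 3`, `start == 2` and
/// `size == 1` the next write goes to slot `(2 + 1) % 3 == 0`)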
| struct Buffer<T> { |
| buf: Vec<Option<T>>, |
| start: usize, |
| size: usize, |
| } |
| |
| #[derive(Debug)] |
| pub enum Failure { |
| Empty, |
| Disconnected, |
| } |
| |
/// Atomically blocks the current thread, registering it in the state's
/// `blocker` slot via `f` and unlocking `lock` in the meantime. This re-locks
/// the mutex upon returning.
| fn wait<'a, 'b, T>( |
| lock: &'a Mutex<State<T>>, |
| mut guard: MutexGuard<'b, State<T>>, |
| f: fn(SignalToken) -> Blocker, |
| ) -> MutexGuard<'a, State<T>> { |
| let (wait_token, signal_token) = blocking::tokens(); |
| match mem::replace(&mut guard.blocker, f(signal_token)) { |
| NoneBlocked => {} |
| _ => unreachable!(), |
| } |
| drop(guard); // unlock |
| wait_token.wait(); // block |
| lock.lock().unwrap() // relock |
| } |
| |
/// Same as `wait`, but waits at most until `deadline`.
| fn wait_timeout_receiver<'a, 'b, T>( |
| lock: &'a Mutex<State<T>>, |
| deadline: Instant, |
| mut guard: MutexGuard<'b, State<T>>, |
| success: &mut bool, |
| ) -> MutexGuard<'a, State<T>> { |
| let (wait_token, signal_token) = blocking::tokens(); |
| match mem::replace(&mut guard.blocker, BlockedReceiver(signal_token)) { |
| NoneBlocked => {} |
| _ => unreachable!(), |
| } |
| drop(guard); // unlock |
| *success = wait_token.wait_max_until(deadline); // block |
| let mut new_guard = lock.lock().unwrap(); // relock |
| if !*success { |
| abort_selection(&mut new_guard); |
| } |
| new_guard |
| } |
| |
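/// Called when a receiver's timed wait expires: removes this receiver's entry
/// from the `blocker` slot if it is still registered. Returns `false` if a
/// blocked receiver was removed, and `true` if the slot was empty or held a
/// sender (meaning a signal for us was already sent or in flight).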
| fn abort_selection<T>(guard: &mut MutexGuard<'_, State<T>>) -> bool { |
| match mem::replace(&mut guard.blocker, NoneBlocked) { |
| NoneBlocked => true, |
| BlockedSender(token) => { |
| guard.blocker = BlockedSender(token); |
| true |
| } |
| BlockedReceiver(token) => { |
| drop(token); |
| false |
| } |
| } |
| } |
| |
| /// Wakes up a thread, dropping the lock at the correct time |
| fn wakeup<T>(token: SignalToken, guard: MutexGuard<'_, State<T>>) { |
| // We need to be careful to wake up the waiting thread *outside* of the mutex |
| // in case it incurs a context switch. |
| drop(guard); |
| token.signal(); |
| } |
| |
| impl<T> Packet<T> { |
| pub fn new(capacity: usize) -> Packet<T> { |
| Packet { |
| channels: AtomicUsize::new(1), |
| lock: Mutex::new(State { |
| disconnected: false, |
| blocker: NoneBlocked, |
| cap: capacity, |
| canceled: None, |
| queue: Queue { head: ptr::null_mut(), tail: ptr::null_mut() }, |
| buf: Buffer { |
| buf: (0..capacity + if capacity == 0 { 1 } else { 0 }).map(|_| None).collect(), |
| start: 0, |
| size: 0, |
| }, |
| }), |
| } |
| } |
| |
| // wait until a send slot is available, returning locked access to |
| // the channel state. |
| fn acquire_send_slot(&self) -> MutexGuard<'_, State<T>> { |
| let mut node = Node { token: None, next: ptr::null_mut() }; |
| loop { |
| let mut guard = self.lock.lock().unwrap(); |
| // are we ready to go? |
| if guard.disconnected || guard.buf.size() < guard.buf.capacity() { |
| return guard; |
| } |
| // no room; actually block |
| let wait_token = guard.queue.enqueue(&mut node); |
| drop(guard); |
| wait_token.wait(); |
| } |
| } |
| |
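    // Blocking send. With a nonzero capacity this returns once the message is
    // buffered; with capacity 0 it returns only after a receiver has taken the
    // message. The data is handed back via `Err` if the port disconnects
    // first.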
| pub fn send(&self, t: T) -> Result<(), T> { |
| let mut guard = self.acquire_send_slot(); |
| if guard.disconnected { |
| return Err(t); |
| } |
| guard.buf.enqueue(t); |
| |
| match mem::replace(&mut guard.blocker, NoneBlocked) { |
| // if our capacity is 0, then we need to wait for a receiver to be |
| // available to take our data. After waiting, we check again to make |
| // sure the port didn't go away in the meantime. If it did, we need |
| // to hand back our data. |
| NoneBlocked if guard.cap == 0 => { |
| let mut canceled = false; |
| assert!(guard.canceled.is_none()); |
| guard.canceled = Some(unsafe { mem::transmute(&mut canceled) }); |
| let mut guard = wait(&self.lock, guard, BlockedSender); |
| if canceled { Err(guard.buf.dequeue()) } else { Ok(()) } |
| } |
| |
| // success, we buffered some data |
| NoneBlocked => Ok(()), |
| |
| // success, someone's about to receive our buffered data. |
| BlockedReceiver(token) => { |
| wakeup(token, guard); |
| Ok(()) |
| } |
| |
            BlockedSender(..) => unreachable!(),
| } |
| } |
| |
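    // Nonblocking send: fails with `Full` rather than blocking. Note the
    // capacity-0 subtlety handled below: on a rendezvous channel a `try_send`
    // succeeds only if a receiver is already blocked in `recv`. Illustrative
    // sketch via the public API:
    //
    //     let (tx, _rx) = std::sync::mpsc::sync_channel::<i32>(0);
    //     assert!(tx.try_send(1).is_err()); // no receiver waiting => `Full`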
| pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> { |
| let mut guard = self.lock.lock().unwrap(); |
| if guard.disconnected { |
| Err(super::TrySendError::Disconnected(t)) |
| } else if guard.buf.size() == guard.buf.capacity() { |
| Err(super::TrySendError::Full(t)) |
| } else if guard.cap == 0 { |
| // With capacity 0, even though we have buffer space we can't |
| // transfer the data unless there's a receiver waiting. |
| match mem::replace(&mut guard.blocker, NoneBlocked) { |
| NoneBlocked => Err(super::TrySendError::Full(t)), |
| BlockedSender(..) => unreachable!(), |
| BlockedReceiver(token) => { |
| guard.buf.enqueue(t); |
| wakeup(token, guard); |
| Ok(()) |
| } |
| } |
| } else { |
| // If the buffer has some space and the capacity isn't 0, then we |
| // just enqueue the data for later retrieval, ensuring to wake up |
| // any blocked receiver if there is one. |
| assert!(guard.buf.size() < guard.buf.capacity()); |
| guard.buf.enqueue(t); |
| match mem::replace(&mut guard.blocker, NoneBlocked) { |
| BlockedReceiver(token) => wakeup(token, guard), |
| NoneBlocked => {} |
| BlockedSender(..) => unreachable!(), |
| } |
| Ok(()) |
| } |
| } |
| |
    // Receives a message from this channel, blocking (at most until `deadline`
    // if one is given) while the buffer is empty.
    //
    // When reading this, remember that there can only ever be one receiver at
    // a time.
| pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> { |
| let mut guard = self.lock.lock().unwrap(); |
| |
| let mut woke_up_after_waiting = false; |
| // Wait for the buffer to have something in it. No need for a |
| // while loop because we're the only receiver. |
| if !guard.disconnected && guard.buf.size() == 0 { |
| if let Some(deadline) = deadline { |
| guard = |
| wait_timeout_receiver(&self.lock, deadline, guard, &mut woke_up_after_waiting); |
| } else { |
| guard = wait(&self.lock, guard, BlockedReceiver); |
| woke_up_after_waiting = true; |
| } |
| } |
| |
| // N.B., channel could be disconnected while waiting, so the order of |
| // these conditionals is important. |
| if guard.disconnected && guard.buf.size() == 0 { |
| return Err(Disconnected); |
| } |
| |
| // Pick up the data, wake up our neighbors, and carry on |
| assert!(guard.buf.size() > 0 || (deadline.is_some() && !woke_up_after_waiting)); |
| |
| if guard.buf.size() == 0 { |
| return Err(Empty); |
| } |
| |
| let ret = guard.buf.dequeue(); |
| self.wakeup_senders(woke_up_after_waiting, guard); |
| Ok(ret) |
| } |
| |
| pub fn try_recv(&self) -> Result<T, Failure> { |
| let mut guard = self.lock.lock().unwrap(); |
| |
| // Easy cases first |
| if guard.disconnected && guard.buf.size() == 0 { |
| return Err(Disconnected); |
| } |
| if guard.buf.size() == 0 { |
| return Err(Empty); |
| } |
| |
| // Be sure to wake up neighbors |
| let ret = Ok(guard.buf.dequeue()); |
| self.wakeup_senders(false, guard); |
| ret |
| } |
| |
| // Wake up pending senders after some data has been received |
| // |
    // * `waited` - whether the receiver blocked waiting for the data, or just
    //   picked it up on the way out
| // * `guard` - the lock guard that is held over this channel's lock |
| fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<'_, State<T>>) { |
| let pending_sender1: Option<SignalToken> = guard.queue.dequeue(); |
| |
| // If this is a no-buffer channel (cap == 0), then if we didn't wait we |
| // need to ACK the sender. If we waited, then the sender waking us up |
| // was already the ACK. |
| let pending_sender2 = if guard.cap == 0 && !waited { |
| match mem::replace(&mut guard.blocker, NoneBlocked) { |
| NoneBlocked => None, |
| BlockedReceiver(..) => unreachable!(), |
| BlockedSender(token) => { |
| guard.canceled.take(); |
| Some(token) |
| } |
| } |
| } else { |
| None |
| }; |
| mem::drop(guard); |
| |
| // only outside of the lock do we wake up the pending threads |
| if let Some(token) = pending_sender1 { |
| token.signal(); |
| } |
| if let Some(token) = pending_sender2 { |
| token.signal(); |
| } |
| } |
| |
| // Prepares this shared packet for a channel clone, essentially just bumping |
| // a refcount. |
| pub fn clone_chan(&self) { |
| let old_count = self.channels.fetch_add(1, Ordering::SeqCst); |
| |
        // See the comments on `Arc::clone()` for why we do this check
        // (guarding against refcount overflow via `mem::forget`).
| if old_count > MAX_REFCOUNT { |
| abort(); |
| } |
| } |
| |
| pub fn drop_chan(&self) { |
| // Only flag the channel as disconnected if we're the last channel |
| match self.channels.fetch_sub(1, Ordering::SeqCst) { |
| 1 => {} |
| _ => return, |
| } |
| |
| // Not much to do other than wake up a receiver if one's there |
| let mut guard = self.lock.lock().unwrap(); |
| if guard.disconnected { |
| return; |
| } |
| guard.disconnected = true; |
| match mem::replace(&mut guard.blocker, NoneBlocked) { |
| NoneBlocked => {} |
| BlockedSender(..) => unreachable!(), |
| BlockedReceiver(token) => wakeup(token, guard), |
| } |
| } |
| |
| pub fn drop_port(&self) { |
| let mut guard = self.lock.lock().unwrap(); |
| |
| if guard.disconnected { |
| return; |
| } |
| guard.disconnected = true; |
| |
| // If the capacity is 0, then the sender may want its data back after |
| // we're disconnected. Otherwise it's now our responsibility to destroy |
| // the buffered data. As with many other portions of this code, this |
| // needs to be careful to destroy the data *outside* of the lock to |
| // prevent deadlock. |
| let _data = if guard.cap != 0 { mem::take(&mut guard.buf.buf) } else { Vec::new() }; |
| let mut queue = |
| mem::replace(&mut guard.queue, Queue { head: ptr::null_mut(), tail: ptr::null_mut() }); |
| |
| let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) { |
| NoneBlocked => None, |
| BlockedSender(token) => { |
| *guard.canceled.take().unwrap() = true; |
| Some(token) |
| } |
| BlockedReceiver(..) => unreachable!(), |
| }; |
| mem::drop(guard); |
| |
| while let Some(token) = queue.dequeue() { |
| token.signal(); |
| } |
| if let Some(token) = waiter { |
| token.signal(); |
| } |
| } |
| } |
| |
| impl<T> Drop for Packet<T> { |
| fn drop(&mut self) { |
| assert_eq!(self.channels.load(Ordering::SeqCst), 0); |
| let mut guard = self.lock.lock().unwrap(); |
| assert!(guard.queue.dequeue().is_none()); |
| assert!(guard.canceled.is_none()); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Buffer, a simple ring buffer backed by Vec<T> |
| //////////////////////////////////////////////////////////////////////////////// |
| |
| impl<T> Buffer<T> { |
    fn enqueue(&mut self, t: T) {
        // Write at the logical end of the ring, wrapping around the backing
        // vector when the index passes the end.
        let pos = (self.start + self.size) % self.buf.len();
        self.size += 1;
        let prev = mem::replace(&mut self.buf[pos], Some(t));
        assert!(prev.is_none());
    }
| |
    fn dequeue(&mut self) -> T {
        // Take the oldest element and advance `start`, wrapping around.
        let start = self.start;
        self.size -= 1;
        self.start = (self.start + 1) % self.buf.len();
        let result = &mut self.buf[start];
        result.take().unwrap()
    }
| |
| fn size(&self) -> usize { |
| self.size |
| } |
| fn capacity(&self) -> usize { |
| self.buf.len() |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| // Queue, a simple queue to enqueue threads with (stack-allocated nodes) |
| //////////////////////////////////////////////////////////////////////////////// |
| |
| impl Queue { |
| fn enqueue(&mut self, node: &mut Node) -> WaitToken { |
| let (wait_token, signal_token) = blocking::tokens(); |
| node.token = Some(signal_token); |
| node.next = ptr::null_mut(); |
| |
| if self.tail.is_null() { |
| self.head = node as *mut Node; |
| self.tail = node as *mut Node; |
| } else { |
| unsafe { |
| (*self.tail).next = node as *mut Node; |
| self.tail = node as *mut Node; |
| } |
| } |
| |
| wait_token |
| } |
| |
| fn dequeue(&mut self) -> Option<SignalToken> { |
| if self.head.is_null() { |
| return None; |
| } |
| let node = self.head; |
| self.head = unsafe { (*node).next }; |
| if self.head.is_null() { |
| self.tail = ptr::null_mut(); |
| } |
| unsafe { |
| (*node).next = ptr::null_mut(); |
| Some((*node).token.take().unwrap()) |
| } |
| } |
| } |