| // Copyright 2016 Amanieu d'Antras |
| // |
| // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or |
| // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or |
| // http://opensource.org/licenses/MIT>, at your option. This file may not be |
| // copied, modified, or distributed except according to those terms. |
| |
| use crate::spinwait::SpinWait; |
| use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT}; |
| use core::{ |
| cell::Cell, |
| mem, ptr, |
| sync::atomic::{fence, AtomicUsize, Ordering}, |
| }; |
| |
| struct ThreadData { |
| parker: ThreadParker, |
| |
| // Linked list of threads in the queue. The queue is split into two parts: |
| // the processed part and the unprocessed part. When new nodes are added to |
| // the list, they only have the next pointer set, and queue_tail is null. |
| // |
| // Nodes are processed with the queue lock held, which consists of setting |
| // the prev pointer for each node and setting the queue_tail pointer on the |
| // first processed node of the list. |
| // |
| // This setup allows nodes to be added to the queue without a lock, while |
| // still allowing O(1) removal of nodes from the processed part of the list. |
| // The only cost is the O(n) processing, but this only needs to be done |
| // once for each node, and therefore isn't too expensive. |
| queue_tail: Cell<*const ThreadData>, |
| prev: Cell<*const ThreadData>, |
| next: Cell<*const ThreadData>, |
| } |
| |
| impl ThreadData { |
| #[inline] |
| fn new() -> ThreadData { |
| assert!(mem::align_of::<ThreadData>() > !QUEUE_MASK); |
| ThreadData { |
| parker: ThreadParker::new(), |
| queue_tail: Cell::new(ptr::null()), |
| prev: Cell::new(ptr::null()), |
| next: Cell::new(ptr::null()), |
| } |
| } |
| } |
| |
| // Invokes the given closure with a reference to the current thread `ThreadData`. |
| #[inline] |
| fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T { |
| let mut thread_data_ptr = ptr::null(); |
| // If ThreadData is expensive to construct, then we want to use a cached |
| // version in thread-local storage if possible. |
| if !ThreadParker::IS_CHEAP_TO_CONSTRUCT { |
| thread_local!(static THREAD_DATA: ThreadData = ThreadData::new()); |
| if let Ok(tls_thread_data) = THREAD_DATA.try_with(|x| x as *const ThreadData) { |
| thread_data_ptr = tls_thread_data; |
| } |
| } |
| // Otherwise just create a ThreadData on the stack |
| let mut thread_data_storage = None; |
| if thread_data_ptr.is_null() { |
| thread_data_ptr = thread_data_storage.get_or_insert_with(ThreadData::new); |
| } |
| |
| f(unsafe { &*thread_data_ptr }) |
| } |
| |
| const LOCKED_BIT: usize = 1; |
| const QUEUE_LOCKED_BIT: usize = 2; |
| const QUEUE_MASK: usize = !3; |
| |
| // Word-sized lock that is used to implement the parking_lot API. Since this |
| // can't use parking_lot, it instead manages its own queue of waiting threads. |
| pub struct WordLock { |
| state: AtomicUsize, |
| } |
| |
| impl WordLock { |
| /// Returns a new, unlocked, WordLock. |
| pub const fn new() -> Self { |
| WordLock { |
| state: AtomicUsize::new(0), |
| } |
| } |
| |
| #[inline] |
| pub fn lock(&self) { |
| if self |
| .state |
| .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed) |
| .is_ok() |
| { |
| return; |
| } |
| self.lock_slow(); |
| } |
| |
| /// Must not be called on an already unlocked `WordLock`! |
| #[inline] |
| pub unsafe fn unlock(&self) { |
| let state = self.state.fetch_sub(LOCKED_BIT, Ordering::Release); |
| if state.is_queue_locked() || state.queue_head().is_null() { |
| return; |
| } |
| self.unlock_slow(); |
| } |
| |
| #[cold] |
| fn lock_slow(&self) { |
| let mut spinwait = SpinWait::new(); |
| let mut state = self.state.load(Ordering::Relaxed); |
| loop { |
| // Grab the lock if it isn't locked, even if there is a queue on it |
| if !state.is_locked() { |
| match self.state.compare_exchange_weak( |
| state, |
| state | LOCKED_BIT, |
| Ordering::Acquire, |
| Ordering::Relaxed, |
| ) { |
| Ok(_) => return, |
| Err(x) => state = x, |
| } |
| continue; |
| } |
| |
| // If there is no queue, try spinning a few times |
| if state.queue_head().is_null() && spinwait.spin() { |
| state = self.state.load(Ordering::Relaxed); |
| continue; |
| } |
| |
| // Get our thread data and prepare it for parking |
| state = with_thread_data(|thread_data| { |
| // The pthread implementation is still unsafe, so we need to surround `prepare_park` |
| // with `unsafe {}`. |
| #[allow(unused_unsafe)] |
| unsafe { |
| thread_data.parker.prepare_park(); |
| } |
| |
| // Add our thread to the front of the queue |
| let queue_head = state.queue_head(); |
| if queue_head.is_null() { |
| thread_data.queue_tail.set(thread_data); |
| thread_data.prev.set(ptr::null()); |
| } else { |
| thread_data.queue_tail.set(ptr::null()); |
| thread_data.prev.set(ptr::null()); |
| thread_data.next.set(queue_head); |
| } |
| if let Err(x) = self.state.compare_exchange_weak( |
| state, |
| state.with_queue_head(thread_data), |
| Ordering::Release, |
| Ordering::Relaxed, |
| ) { |
| return x; |
| } |
| |
| // Sleep until we are woken up by an unlock |
| // Ignoring unused unsafe, since it's only a few platforms where this is unsafe. |
| #[allow(unused_unsafe)] |
| unsafe { |
| thread_data.parker.park(); |
| } |
| |
| // Loop back and try locking again |
| spinwait.reset(); |
| self.state.load(Ordering::Relaxed) |
| }); |
| } |
| } |
| |
| #[cold] |
| fn unlock_slow(&self) { |
| let mut state = self.state.load(Ordering::Relaxed); |
| loop { |
| // We just unlocked the WordLock. Just check if there is a thread |
| // to wake up. If the queue is locked then another thread is already |
| // taking care of waking up a thread. |
| if state.is_queue_locked() || state.queue_head().is_null() { |
| return; |
| } |
| |
| // Try to grab the queue lock |
| match self.state.compare_exchange_weak( |
| state, |
| state | QUEUE_LOCKED_BIT, |
| Ordering::Acquire, |
| Ordering::Relaxed, |
| ) { |
| Ok(_) => break, |
| Err(x) => state = x, |
| } |
| } |
| |
| // Now we have the queue lock and the queue is non-empty |
| 'outer: loop { |
| // First, we need to fill in the prev pointers for any newly added |
| // threads. We do this until we reach a node that we previously |
| // processed, which has a non-null queue_tail pointer. |
| let queue_head = state.queue_head(); |
| let mut queue_tail; |
| let mut current = queue_head; |
| loop { |
| queue_tail = unsafe { (*current).queue_tail.get() }; |
| if !queue_tail.is_null() { |
| break; |
| } |
| unsafe { |
| let next = (*current).next.get(); |
| (*next).prev.set(current); |
| current = next; |
| } |
| } |
| |
| // Set queue_tail on the queue head to indicate that the whole list |
| // has prev pointers set correctly. |
| unsafe { |
| (*queue_head).queue_tail.set(queue_tail); |
| } |
| |
| // If the WordLock is locked, then there is no point waking up a |
| // thread now. Instead we let the next unlocker take care of waking |
| // up a thread. |
| if state.is_locked() { |
| match self.state.compare_exchange_weak( |
| state, |
| state & !QUEUE_LOCKED_BIT, |
| Ordering::Release, |
| Ordering::Relaxed, |
| ) { |
| Ok(_) => return, |
| Err(x) => state = x, |
| } |
| |
| // Need an acquire fence before reading the new queue |
| fence(Ordering::Acquire); |
| continue; |
| } |
| |
| // Remove the last thread from the queue and unlock the queue |
| let new_tail = unsafe { (*queue_tail).prev.get() }; |
| if new_tail.is_null() { |
| loop { |
| match self.state.compare_exchange_weak( |
| state, |
| state & LOCKED_BIT, |
| Ordering::Release, |
| Ordering::Relaxed, |
| ) { |
| Ok(_) => break, |
| Err(x) => state = x, |
| } |
| |
| // If the compare_exchange failed because a new thread was |
| // added to the queue then we need to re-scan the queue to |
| // find the previous element. |
| if state.queue_head().is_null() { |
| continue; |
| } else { |
| // Need an acquire fence before reading the new queue |
| fence(Ordering::Acquire); |
| continue 'outer; |
| } |
| } |
| } else { |
| unsafe { |
| (*queue_head).queue_tail.set(new_tail); |
| } |
| self.state.fetch_and(!QUEUE_LOCKED_BIT, Ordering::Release); |
| } |
| |
| // Finally, wake up the thread we removed from the queue. Note that |
| // we don't need to worry about any races here since the thread is |
| // guaranteed to be sleeping right now and we are the only one who |
| // can wake it up. |
| unsafe { |
| (*queue_tail).parker.unpark_lock().unpark(); |
| } |
| break; |
| } |
| } |
| } |
| |
| trait LockState { |
| fn is_locked(self) -> bool; |
| fn is_queue_locked(self) -> bool; |
| fn queue_head(self) -> *const ThreadData; |
| fn with_queue_head(self, thread_data: *const ThreadData) -> Self; |
| } |
| |
| impl LockState for usize { |
| #[inline] |
| fn is_locked(self) -> bool { |
| self & LOCKED_BIT != 0 |
| } |
| |
| #[inline] |
| fn is_queue_locked(self) -> bool { |
| self & QUEUE_LOCKED_BIT != 0 |
| } |
| |
| #[inline] |
| fn queue_head(self) -> *const ThreadData { |
| (self & QUEUE_MASK) as *const ThreadData |
| } |
| |
| #[inline] |
| fn with_queue_head(self, thread_data: *const ThreadData) -> Self { |
| (self & !QUEUE_MASK) | thread_data as *const _ as usize |
| } |
| } |