|  | //   Copyright 2015 Colin Sherratt | 
|  | // | 
|  | //   Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | //   you may not use this file except in compliance with the License. | 
|  | //   You may obtain a copy of the License at | 
|  | // | 
|  | //       http://www.apache.org/licenses/LICENSE-2.0 | 
|  | // | 
|  | //   Unless required by applicable law or agreed to in writing, software | 
|  | //   distributed under the License is distributed on an "AS IS" BASIS, | 
|  | //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | //   See the License for the specific language governing permissions and | 
|  | //   limitations under the License. | 
|  |  | 
|  | extern crate atom; | 
|  | extern crate time; | 
|  |  | 
|  | use std::sync::atomic::{AtomicUsize}; | 
|  | use std::thread; | 
|  | use std::mem; | 
|  | use std::fmt; | 
|  | use std::ops::Deref; | 
|  | use std::sync::atomic::Ordering; | 
|  | use std::cell::RefCell; | 
|  |  | 
|  | use atom::*; | 
|  | use time::precise_time_s; | 
|  | use fnbox::FnBox; | 
|  |  | 
|  | pub use select::{Select, SelectMap}; | 
|  | pub use barrier::Barrier; | 
|  | mod select; | 
|  | mod barrier; | 
|  | mod fnbox; | 
|  |  | 
|  | /// Drop rules | 
|  | /// This may be freed iff state is Signald | Dropped | 
|  | /// and Waiting is Dropped | 
|  | struct Inner { | 
|  | state: AtomicUsize, | 
|  | waiting: Atom<Box<Waiting>> | 
|  | } | 
|  |  | 
|  | // TODO 64bit sized, probably does not matter now | 
|  | const PULSED: usize = 0x8000_0000; | 
|  | const TX_DROP: usize = 0x4000_0000; | 
|  | const TX_FLAGS: usize = PULSED | TX_DROP; | 
|  | const REF_COUNT: usize = !TX_FLAGS; | 
|  |  | 
|  | struct Waiting { | 
|  | next: Option<Box<Waiting>>, | 
|  | wake: Wake | 
|  | } | 
|  |  | 
|  | impl GetNextMut for Box<Waiting> { | 
|  | type NextPtr = Option<Box<Waiting>>; | 
|  |  | 
|  | fn get_next(&mut self) -> &mut Option<Box<Waiting>> { | 
|  | &mut self.next | 
|  | } | 
|  | } | 
|  |  | 
|  | enum Wake { | 
|  | Thread(thread::Thread), | 
|  | Select(select::Handle), | 
|  | Barrier(barrier::Handle), | 
|  | Callback(Box<FnBox>) | 
|  | } | 
|  |  | 
|  | impl Waiting { | 
|  | fn wake(s: Box<Self>, id: usize) { | 
|  | let mut next = Some(s); | 
|  | while let Some(s) = next { | 
|  | // There must be a better way to do this... | 
|  | let s = *s; | 
|  | let Waiting { next: n, wake } = s; | 
|  | next = n; | 
|  | match wake { | 
|  | Wake::Thread(thread) => thread.unpark(), | 
|  | Wake::Select(select) => { | 
|  | let trigger = { | 
|  | let mut guard = select.0.lock().unwrap(); | 
|  | guard.ready.push(id); | 
|  | guard.trigger.take() | 
|  | }; | 
|  | trigger.map(|x| x.pulse()); | 
|  | } | 
|  | Wake::Barrier(barrier) => { | 
|  | let count = barrier.0.count.fetch_sub(1, Ordering::Relaxed); | 
|  | if count == 1 { | 
|  | let mut guard = barrier.0.trigger.lock().unwrap(); | 
|  | if let Some(t) = guard.take() { | 
|  | t.pulse(); | 
|  | } | 
|  | } | 
|  | } | 
|  | Wake::Callback(cb) => cb.call_box(), | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | fn id(&self) -> usize { | 
|  | unsafe { mem::transmute(self) } | 
|  | } | 
|  |  | 
|  | fn thread() -> Box<Waiting> { | 
|  | Box::new(Waiting { | 
|  | next: None, | 
|  | wake: Wake::Thread(thread::current()) | 
|  | }) | 
|  | } | 
|  |  | 
|  | fn select(handle: select::Handle) -> Box<Waiting> { | 
|  | Box::new(Waiting{ | 
|  | next: None, | 
|  | wake: Wake::Select(handle) | 
|  | }) | 
|  | } | 
|  |  | 
|  | fn barrier(handle: barrier::Handle) -> Box<Waiting> { | 
|  | Box::new(Waiting{ | 
|  | next: None, | 
|  | wake: Wake::Barrier(handle) | 
|  | }) | 
|  | } | 
|  |  | 
|  | fn callback<F>(cb: F) -> Box<Waiting> where F: FnOnce() + 'static { | 
|  | Box::new(Waiting{ | 
|  | next: None, | 
|  | wake: Wake::Callback(Box::new(cb)) | 
|  | }) | 
|  | } | 
|  | } | 
|  |  | 
|  | unsafe impl Send for Pulse {} | 
|  | // This should be safe a pulse requires ownership to | 
|  | // actually `pulse` | 
|  | unsafe impl Sync for Pulse {} | 
|  |  | 
|  | /// A `Pulse` is represents an unfired signal. It is the tx side of Signal | 
|  | /// A `Pulse` can only purpose it to be fired, and then it will be moved | 
|  | /// as to never allow it to fire again. `Dropping` a pulse will `pulse` | 
|  | /// The signal, but the signal will enter an error state. | 
|  | pub struct Pulse { | 
|  | inner: *mut Inner | 
|  | } | 
|  |  | 
|  | impl fmt::Debug for Pulse { | 
|  | fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { | 
|  | let id: usize = unsafe { mem::transmute(self.inner) }; | 
|  | write!(f, "Pulse({:?})", id) | 
|  | } | 
|  | } | 
|  |  | 
|  | fn delete_inner(state: usize, inner: *mut Inner) { | 
|  | if state & REF_COUNT == 1 { | 
|  | let inner: Box<Inner> = unsafe { | 
|  | mem::transmute(inner) | 
|  | }; | 
|  | drop(inner); | 
|  | } | 
|  | } | 
|  |  | 
|  | impl Drop for Pulse { | 
|  | fn drop(&mut self) { | 
|  | self.set(TX_DROP); | 
|  | self.wake(); | 
|  | let state = self.inner().state.fetch_sub(1, Ordering::Relaxed); | 
|  | delete_inner(state, self.inner) | 
|  | } | 
|  | } | 
|  |  | 
|  | impl Pulse { | 
|  | /// Create a Pulse from a usize. This is naturally unsafe. | 
|  | #[inline] | 
|  | pub unsafe fn cast_from_usize(ptr: usize) -> Pulse { | 
|  | Pulse { | 
|  | inner: mem::transmute(ptr) | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Convert a trigger to a `usize`, This is unsafe | 
|  | /// and it will kill your kittens if you are not careful | 
|  | #[inline] | 
|  | pub unsafe fn cast_to_usize(self) -> usize { | 
|  | let us = mem::transmute(self.inner); | 
|  | mem::forget(self); | 
|  | us | 
|  | } | 
|  |  | 
|  | #[inline] | 
|  | fn inner(&self) -> &Inner { | 
|  | unsafe { mem::transmute(self.inner) } | 
|  | } | 
|  |  | 
|  | #[inline] | 
|  | fn set(&self, state: usize) -> usize { | 
|  | self.inner().state.fetch_or(state, Ordering::Relaxed) | 
|  | } | 
|  |  | 
|  | #[inline] | 
|  | fn wake(&self) { | 
|  | let id = unsafe { mem::transmute(self.inner) }; | 
|  | match self.inner().waiting.take() { | 
|  | None => (), | 
|  | Some(v) => Waiting::wake(v, id) | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Pulse the `pulse` which will transition the `Signal` out from pending | 
|  | /// to ready. This moves the pulse so that it can only be fired once. | 
|  | #[inline] | 
|  | pub fn pulse(self) { | 
|  | self.set(PULSED); | 
|  | self.wake(); | 
|  |  | 
|  | let state = self.inner().state.fetch_sub(1, Ordering::Relaxed); | 
|  | delete_inner(state, self.inner); | 
|  | mem::forget(self) | 
|  | } | 
|  | } | 
|  |  | 
|  |  | 
|  | unsafe impl Send for Signal {} | 
|  | // This should be safe a signal requires ownership to do anything | 
|  | // the inner is all atomically modified data anyhow | 
|  | unsafe impl Sync for Signal {} | 
|  |  | 
|  | /// A `Signal` represents listens for a `pulse` to occur in the system. A | 
|  | /// `Signal` has one of three states. Pending, Pulsed, or Errored. Pending | 
|  | /// means the pulse has not fired, but still exists. Pulsed meaning the | 
|  | /// pulse has fired, and no longer exists. Errored means the pulse was dropped | 
|  | /// without firing. This normally means a programming error of some sort. | 
|  | pub struct Signal { | 
|  | inner: *mut Inner | 
|  | } | 
|  |  | 
|  | impl fmt::Debug for Signal { | 
|  | fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { | 
|  | write!(f, "Signal(id={:?}, pending={:?})", self.id(), self.is_pending()) | 
|  | } | 
|  | } | 
|  |  | 
|  | impl Clone for Signal { | 
|  | #[inline(always)] | 
|  | fn clone(&self) -> Signal { | 
|  | self.inner().state.fetch_add(1, Ordering::Relaxed); | 
|  | Signal { inner: self.inner } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl Drop for Signal { | 
|  | #[inline] | 
|  | fn drop(&mut self) { | 
|  | let flag = self.inner().state.fetch_sub(1, Ordering::Relaxed); | 
|  | delete_inner(flag, self.inner); | 
|  | } | 
|  | } | 
|  |  | 
|  | impl Signal { | 
|  | /// Create a Signal and a Pulse that are associated. | 
|  | pub fn new() -> (Signal, Pulse) { | 
|  | let inner = Box::new(Inner { | 
|  | state: AtomicUsize::new(2), | 
|  | waiting: Atom::empty() | 
|  | }); | 
|  |  | 
|  | let inner = unsafe {mem::transmute(inner)}; | 
|  |  | 
|  | (Signal { | 
|  | inner: inner | 
|  | }, | 
|  | Pulse { | 
|  | inner: inner, | 
|  | }) | 
|  | } | 
|  |  | 
|  | /// Create a signal that is already pulsed | 
|  | pub fn pulsed() -> Signal { | 
|  | let inner = Box::new(Inner { | 
|  | state: AtomicUsize::new(1 | PULSED), | 
|  | waiting: Atom::empty() | 
|  | }); | 
|  |  | 
|  | let inner = unsafe {mem::transmute(inner)}; | 
|  |  | 
|  | Signal { inner: inner } | 
|  | } | 
|  |  | 
|  | #[inline] | 
|  | fn inner(&self) -> &Inner { | 
|  | unsafe { mem::transmute(self.inner) } | 
|  | } | 
|  |  | 
|  | /// Read out the state of the Signal | 
|  | #[inline] | 
|  | pub fn state(&self) -> SignalState { | 
|  | let flags = self.inner().state.load(Ordering::Relaxed); | 
|  | match (flags & TX_DROP == TX_DROP, flags & PULSED == PULSED) { | 
|  | (_, true) => SignalState::Pulsed, | 
|  | (true, _) => SignalState::Dropped, | 
|  | (_, _) => SignalState::Pending | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Check to see if the signal is pending. A signal | 
|  | #[inline] | 
|  | pub fn is_pending(&self) -> bool { | 
|  | self.state() == SignalState::Pending | 
|  | } | 
|  |  | 
|  | /// Add a waiter to a waitlist | 
|  | fn add_to_waitlist(&self, waiter: Box<Waiting>) -> usize { | 
|  | let id = waiter.id(); | 
|  |  | 
|  | if !self.is_pending() { | 
|  | Waiting::wake(waiter, self.id()); | 
|  | return id; | 
|  | } | 
|  |  | 
|  | self.inner().waiting.replace_and_set_next(waiter); | 
|  |  | 
|  | // if armed fire now | 
|  | if !self.is_pending() { | 
|  | if let Some(t) = self.inner().waiting.take() { | 
|  | Waiting::wake(t, self.id()); | 
|  | } | 
|  | } | 
|  | id | 
|  | } | 
|  |  | 
|  | /// Remove Waiter with `id` from the waitlist | 
|  | fn remove_from_waitlist(&self, id: usize) { | 
|  | let mut wl = self.inner().waiting.take(); | 
|  | while let Some(mut w) = wl { | 
|  | let next = w.next.take(); | 
|  | if w.id() != id { | 
|  | self.add_to_waitlist(w); | 
|  | } | 
|  | wl = next; | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Arm a pulse to wake | 
|  | fn arm(self, waiter: Box<Waiting>) -> ArmedSignal { | 
|  | let id = self.add_to_waitlist(waiter); | 
|  | ArmedSignal { | 
|  | id: id, | 
|  | pulse: self | 
|  | } | 
|  | } | 
|  |  | 
|  | /// This is a unique id that can be used to identify the signal from others | 
|  | /// See `Select` for how this api is useful. | 
|  | pub fn id(&self) -> usize { | 
|  | unsafe { mem::transmute_copy(&self.inner) } | 
|  | } | 
|  |  | 
|  | /// Block the current thread until a `pulse` is ready. | 
|  | /// This will block indefinably if the pulse never fires. | 
|  | #[inline] | 
|  | pub fn wait(self) -> Result<(), WaitError> { | 
|  | match self.state() { | 
|  | SignalState::Pulsed => Ok(()), | 
|  | SignalState::Dropped => Err(WaitError::Dropped), | 
|  | SignalState::Pending => { | 
|  | let s = take_scheduler().expect("no scheduler found"); | 
|  | let res = s.wait(self); | 
|  | swap_scheduler(s); | 
|  | res | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Block until either the pulse is sent, or the timeout is reached | 
|  | pub fn wait_timeout_ms(self, ms: u32) -> Result<(), TimeoutError> { | 
|  | SCHED.with(|sched| { | 
|  | let s = sched.borrow_mut().take().expect("Waited while: no scheduler installed"); | 
|  | let res = s.wait_timeout_ms(self, ms); | 
|  | *sched.borrow_mut() = Some(s); | 
|  | res | 
|  | }) | 
|  | } | 
|  |  | 
|  | pub fn callback<F>(self, cb: F) where F: FnOnce() + 'static { | 
|  | self.add_to_waitlist(Waiting::callback(cb)); | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Described the possible states of a Signal | 
|  | #[derive(Debug, PartialEq, Eq)] | 
|  | pub enum SignalState { | 
|  | Pending, | 
|  | Pulsed, | 
|  | Dropped | 
|  | } | 
|  |  | 
|  | impl IntoRawPtr for Pulse { | 
|  | #[inline(always)] | 
|  | unsafe fn into_raw(self) -> *mut () { | 
|  | let inner = self.inner; | 
|  | mem::forget(self); | 
|  | mem::transmute(inner) | 
|  | } | 
|  | } | 
|  |  | 
|  | impl FromRawPtr for Pulse { | 
|  | #[inline(always)] | 
|  | unsafe fn from_raw(ptr: *mut ()) -> Pulse { | 
|  | Pulse { inner: mem::transmute(ptr) } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl IntoRawPtr for Signal { | 
|  | #[inline(always)] | 
|  | unsafe fn into_raw(self) -> *mut () { | 
|  | let inner = self.inner; | 
|  | mem::forget(self); | 
|  | mem::transmute(inner) | 
|  | } | 
|  | } | 
|  |  | 
|  | impl FromRawPtr for Signal { | 
|  | #[inline(always)] | 
|  | unsafe fn from_raw(ptr: *mut ()) -> Signal { | 
|  | Signal { inner: mem::transmute(ptr) } | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Represents the possible errors that can occur on a `Signal` | 
|  | #[derive(Debug, PartialEq, Eq)] | 
|  | pub enum WaitError { | 
|  | /// The `Pulse` was dropped before it could `Pulse` | 
|  | Dropped | 
|  | } | 
|  |  | 
|  | /// Represents the possible errors from a wait timeout | 
|  | #[derive(Debug, PartialEq, Eq)] | 
|  | pub enum TimeoutError { | 
|  | /// A `WaitError` has occurred | 
|  | Error(WaitError), | 
|  | /// The `Signal` timed-out before a `Pulse` was observed. | 
|  | Timeout | 
|  | } | 
|  |  | 
|  | struct ArmedSignal { | 
|  | pulse: Signal, | 
|  | id: usize | 
|  | } | 
|  |  | 
|  | impl Deref for ArmedSignal { | 
|  | type Target = Signal; | 
|  |  | 
|  | fn deref(&self) -> &Signal { &self.pulse } | 
|  | } | 
|  |  | 
|  | impl ArmedSignal { | 
|  | fn disarm(self) -> Signal { | 
|  | self.remove_from_waitlist(self.id); | 
|  | self.pulse | 
|  | } | 
|  | } | 
|  |  | 
|  | /// allows an object to assert a wait signal | 
|  | pub trait Signals { | 
|  | /// Get a signal from a object | 
|  | fn signal(&self) -> Signal; | 
|  |  | 
|  | /// Block the current thread until the object | 
|  | /// assets a pulse. | 
|  | fn wait(&self) -> Result<(), WaitError> { | 
|  | let signal = self.signal(); | 
|  | signal.wait() | 
|  | } | 
|  |  | 
|  | /// Block the current thread until the object | 
|  | /// assets a pulse. Or until the timeout has been asserted. | 
|  | fn wait_timeout_ms(&self, ms: u32) -> Result<(), TimeoutError> { | 
|  | let signal = self.signal(); | 
|  | signal.wait_timeout_ms(ms) | 
|  | } | 
|  | } | 
|  |  | 
|  | /// This is the hook into the async wait methods provided | 
|  | /// by `pulse`. It is required for the user to override | 
|  | /// the current system scheduler. | 
|  | pub trait Scheduler: std::fmt::Debug { | 
|  | /// Wait until the signal is made `ready` or `errored` | 
|  | fn wait(&self, signal: Signal) -> Result<(), WaitError>; | 
|  |  | 
|  | /// Wait until the signal is made `ready` or `errored` or the | 
|  | /// timeout has been reached. | 
|  | fn wait_timeout_ms(&self, signal: Signal, timeout: u32) -> Result<(), TimeoutError>; | 
|  | } | 
|  |  | 
|  | /// This is the `default` system scheduler that is used if no | 
|  | /// user provided scheduler is installed. It is very basic | 
|  | /// and will block the OS thread using `thread::park` | 
|  | #[derive(Debug)] | 
|  | pub struct ThreadScheduler; | 
|  |  | 
|  | impl Scheduler for ThreadScheduler { | 
|  | fn wait(&self, signal: Signal) -> Result<(), WaitError> { | 
|  | loop { | 
|  | let id = signal.add_to_waitlist(Waiting::thread()); | 
|  | if signal.is_pending() { | 
|  | thread::park(); | 
|  | } | 
|  | signal.remove_from_waitlist(id); | 
|  |  | 
|  | match signal.state() { | 
|  | SignalState::Pending => (), | 
|  | SignalState::Pulsed => return Ok(()), | 
|  | SignalState::Dropped => return Err(WaitError::Dropped) | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | fn wait_timeout_ms(&self, signal: Signal, ms: u32) -> Result<(), TimeoutError> { | 
|  | let mut now = (precise_time_s() * 1000.) as u64; | 
|  | let end = now + ms as u64; | 
|  |  | 
|  | loop { | 
|  | let id = signal.add_to_waitlist(Waiting::thread()); | 
|  | if signal.is_pending() { | 
|  | now = (precise_time_s() * 1000.) as u64; | 
|  | if now > end { | 
|  | return Err(TimeoutError::Timeout) | 
|  | } | 
|  | thread::park_timeout_ms((end - now) as u32); | 
|  | } | 
|  | signal.remove_from_waitlist(id); | 
|  |  | 
|  | match signal.state() { | 
|  | SignalState::Pending => (), | 
|  | SignalState::Pulsed => return Ok(()), | 
|  | SignalState::Dropped => return Err(TimeoutError::Error(WaitError::Dropped)) | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | /// The TLS scheduler | 
|  | thread_local!(static SCHED: RefCell<Option<Box<Scheduler>>> = RefCell::new(Some(Box::new(ThreadScheduler)))); | 
|  |  | 
|  | // this is inline never to avoid the SCHED pointer being cached | 
|  | #[inline(never)] | 
|  | fn take_scheduler() -> Option<Box<Scheduler>> { | 
|  | use std::mem; | 
|  | let mut sched = None; | 
|  | SCHED.with(|s| mem::swap(&mut *s.borrow_mut(), &mut sched)); | 
|  | sched | 
|  | } | 
|  |  | 
|  | /// Replace the current Scheduler with your own supplied scheduler. | 
|  | /// all `wait()` commands will be run through this scheduler now. | 
|  | /// | 
|  | /// This will return the current TLS scheduler, which may be useful | 
|  | /// to restore it later. | 
|  | #[inline(never)] | 
|  | pub fn swap_scheduler(sched: Box<Scheduler>) -> Option<Box<Scheduler>> { | 
|  | use std::mem; | 
|  | let mut sched = Some(sched); | 
|  | SCHED.with(|s| mem::swap(&mut *s.borrow_mut(), &mut sched)); | 
|  | sched | 
|  | } | 
|  |  | 
|  | /// Call the suppled closure using the supplied schedulee | 
|  | pub fn with_scheduler<F>(f: F, sched: Box<Scheduler>) -> Option<Box<Scheduler>> where F: FnOnce() { | 
|  | let old = swap_scheduler(sched); | 
|  | f(); | 
|  | old.and_then(|o| swap_scheduler(o)) | 
|  | } |