blob: 29ea9e93b2c418d716c28f021996f689d6f4ee4d [file] [log] [blame]
// Copyright 2015 Colin Sherratt
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
extern crate atom;
extern crate time;
use std::sync::atomic::{AtomicUsize};
use std::thread;
use std::mem;
use std::fmt;
use std::ops::Deref;
use std::sync::atomic::Ordering;
use std::cell::RefCell;
use atom::*;
use time::precise_time_s;
use fnbox::FnBox;
pub use select::{Select, SelectMap};
pub use barrier::Barrier;
mod select;
mod barrier;
mod fnbox;
/// State shared between a `Pulse` and the `Signal`s associated with it.
///
/// Drop rules:
/// this may be freed iff the state is Signaled | Dropped
/// and `Waiting` is dropped.
// NOTE(review): the visible release path (`delete_inner`) frees on the
// reference count alone — confirm the rules above still describe it.
struct Inner {
// Reference count (bits selected by `REF_COUNT`) combined with the
// `PULSED` / `TX_DROP` flag bits.
state: AtomicUsize,
// Registered waiters; each `Waiting` links to the next one.
waiting: Atom<Box<Waiting>>
// TODO: the flag bits sit at the top of a 32-bit word even where `usize`
// is 64 bits wide; probably does not matter for now.
/// Flag bit that `Signal::state` reports as `SignalState::Pulsed`.
const PULSED: usize = 1 << 31;
/// Flag bit that `Signal::state` reports as `SignalState::Dropped`.
const TX_DROP: usize = 1 << 30;
/// Both flag bits together.
const TX_FLAGS: usize = TX_DROP | PULSED;
/// Every bit that is not a flag: the reference count.
const REF_COUNT: usize = !(TX_DROP | PULSED);
/// One node of the waiter list: how to notify the waiter, plus the link
/// to the next node.
struct Waiting {
// Next waiter on the list, if any.
next: Option<Box<Waiting>>,
// How this waiter gets notified (see `Waiting::wake`).
wake: Wake
// Exposes the `next` link of a boxed `Waiting` through `GetNextMut`.
// NOTE(review): `GetNextMut` presumably comes from the glob-imported
// `atom` crate — confirm.
impl GetNextMut for Box<Waiting> {
type NextPtr = Option<Box<Waiting>>;
// NOTE(review): the body of this method is not visible in this view.
fn get_next(&mut self) -> &mut Option<Box<Waiting>> {
/// The ways a waiter can be notified.
// NOTE(review): the variant list is not visible in this view; `Thread`,
// `Select`, `Barrier` and `Callback` are the variants matched in
// `Waiting::wake`.
enum Wake {
impl Waiting {
// Walks the list that starts at `s`, consuming each node and notifying
// its waiter according to the node's `Wake` variant.
// `id` is supplied by the caller (`Pulse::wake` passes the address of the
// shared `Inner`).
fn wake(s: Box<Self>, id: usize) {
let mut next = Some(s);
while let Some(s) = next {
// There must be a better way to do this...
// Move the node out of its box so the fields can be taken by value.
let s = *s;
let Waiting { next: n, wake } = s;
next = n;
match wake {
// A thread waiter is resumed with `unpark`.
Wake::Thread(thread) => thread.unpark(),
// NOTE(review): this arm is incomplete in this view; what is left locks
// the select handle's mutex and pulses a trigger.
Wake::Select(select) => {
let trigger = {
let mut guard = select.0.lock().unwrap();
};|x| x.pulse());
// The barrier counter is decremented; `fetch_sub` returns the previous
// value, so `count == 1` means this was the last outstanding signal.
// NOTE(review): the rest of this arm is not visible; presumably the
// taken trigger is pulsed — confirm.
Wake::Barrier(barrier) => {
let count = barrier.0.count.fetch_sub(1, Ordering::Relaxed);
if count == 1 {
let mut guard = barrier.0.trigger.lock().unwrap();
if let Some(t) = guard.take() {
// A callback waiter is simply invoked.
Wake::Callback(cb) => cb.call_box(),
// Identifies this waiter by the address of `self`.
fn id(&self) -> usize {
unsafe { mem::transmute(self) }
// Builds a waiter that unparks the thread that created it.
fn thread() -> Box<Waiting> {
Box::new(Waiting {
next: None,
wake: Wake::Thread(thread::current())
// Builds a waiter that notifies a `Select` through `handle`.
fn select(handle: select::Handle) -> Box<Waiting> {
next: None,
wake: Wake::Select(handle)
// Builds a waiter that notifies a `Barrier` through `handle`.
fn barrier(handle: barrier::Handle) -> Box<Waiting> {
next: None,
wake: Wake::Barrier(handle)
// Builds a waiter that runs the boxed closure `cb` when woken.
fn callback<F>(cb: F) -> Box<Waiting> where F: FnOnce() + 'static {
next: None,
wake: Wake::Callback(Box::new(cb))
// `Pulse` holds a raw `*mut Inner`, so `Send` is asserted by hand.
// NOTE(review): this relies on every access to `Inner` being atomic —
// confirm against `Atom`'s guarantees.
unsafe impl Send for Pulse {}
// This should be safe: a pulse requires ownership to
// actually `pulse`.
unsafe impl Sync for Pulse {}
/// A `Pulse` represents an unfired signal. It is the tx side of a `Signal`.
/// The only purpose of a `Pulse` is to be fired; firing moves it, so
/// it can never fire again. Dropping a pulse will also `pulse`
/// the signal, but the signal will enter an error state.
pub struct Pulse {
// Raw pointer to the state shared with the associated `Signal`s.
inner: *mut Inner
impl fmt::Debug for Pulse {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
let id: usize = unsafe { mem::transmute(self.inner) };
write!(f, "Pulse({:?})", id)
// Releases the shared `Inner` once the last reference is gone.
//
// `state` is the value returned by the caller's `fetch_sub(1)`, i.e. the
// state word *before* the decrement, so a reference count of 1 means the
// caller has just given up the final reference.
fn delete_inner(state: usize, inner: *mut Inner) {
if state & REF_COUNT == 1 {
// NOTE(review): the rest of this block is not visible in this view;
// presumably the raw pointer is turned back into a `Box<Inner>` so that
// it is dropped — confirm.
let inner: Box<Inner> = unsafe {
impl Drop for Pulse {
fn drop(&mut self) {
let state = self.inner().state.fetch_sub(1, Ordering::Relaxed);
delete_inner(state, self.inner)
impl Pulse {
/// Create a Pulse from a usize. This is naturally unsafe.
pub unsafe fn cast_from_usize(ptr: usize) -> Pulse {
Pulse {
inner: mem::transmute(ptr)
/// Convert a trigger to a `usize`, This is unsafe
/// and it will kill your kittens if you are not careful
pub unsafe fn cast_to_usize(self) -> usize {
let us = mem::transmute(self.inner);
fn inner(&self) -> &Inner {
unsafe { mem::transmute(self.inner) }
fn set(&self, state: usize) -> usize {
self.inner().state.fetch_or(state, Ordering::Relaxed)
fn wake(&self) {
let id = unsafe { mem::transmute(self.inner) };
match self.inner().waiting.take() {
None => (),
Some(v) => Waiting::wake(v, id)
/// Pulse the `pulse` which will transition the `Signal` out from pending
/// to ready. This moves the pulse so that it can only be fired once.
pub fn pulse(self) {
let state = self.inner().state.fetch_sub(1, Ordering::Relaxed);
delete_inner(state, self.inner);
// `Signal` holds a raw `*mut Inner`, so `Send` is asserted by hand.
unsafe impl Send for Signal {}
// This should be safe: a signal requires ownership to do anything, and
// the inner is all atomically modified data anyhow.
unsafe impl Sync for Signal {}
/// A `Signal` listens for a `pulse` to occur in the system. A
/// `Signal` has one of three states: Pending, Pulsed, or Errored. Pending
/// means the pulse has not fired, but still exists. Pulsed means the
/// pulse has fired, and no longer exists. Errored means the pulse was dropped
/// without firing (`Signal::state` reports this as `SignalState::Dropped`).
/// This normally means a programming error of some sort.
pub struct Signal {
// Raw pointer to the state shared with the `Pulse` and with clones of
// this signal.
inner: *mut Inner
impl fmt::Debug for Signal {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Signal(id={:?}, pending={:?})",, self.is_pending())
impl Clone for Signal {
fn clone(&self) -> Signal {
self.inner().state.fetch_add(1, Ordering::Relaxed);
Signal { inner: self.inner }
impl Drop for Signal {
fn drop(&mut self) {
let flag = self.inner().state.fetch_sub(1, Ordering::Relaxed);
delete_inner(flag, self.inner);
impl Signal {
/// Create a Signal and a Pulse that are associated.
// The state word starts at 2: the two handles returned here each hold one
// reference (each handle's `Drop` subtracts 1).
pub fn new() -> (Signal, Pulse) {
let inner = Box::new(Inner {
state: AtomicUsize::new(2),
waiting: Atom::empty()
// Turn the box into the raw pointer both handles share.
let inner = unsafe {mem::transmute(inner)};
(Signal {
inner: inner
Pulse {
inner: inner,
/// Create a signal that is already pulsed
// One reference (the returned `Signal`) with the `PULSED` flag preset; no
// `Pulse` is created.
pub fn pulsed() -> Signal {
let inner = Box::new(Inner {
state: AtomicUsize::new(1 | PULSED),
waiting: Atom::empty()
let inner = unsafe {mem::transmute(inner)};
Signal { inner: inner }
// Borrow the shared state behind the raw pointer.
fn inner(&self) -> &Inner {
unsafe { mem::transmute(self.inner) }
/// Read out the state of the Signal
// `PULSED` takes precedence: a state word with both flags set reads as
// `Pulsed`.
pub fn state(&self) -> SignalState {
let flags = self.inner().state.load(Ordering::Relaxed);
match (flags & TX_DROP == TX_DROP, flags & PULSED == PULSED) {
(_, true) => SignalState::Pulsed,
(true, _) => SignalState::Dropped,
(_, _) => SignalState::Pending
/// Check to see if the signal is pending. A signal is pending while
/// neither the `PULSED` nor the `TX_DROP` flag is set.
pub fn is_pending(&self) -> bool {
self.state() == SignalState::Pending
/// Add a waiter to a waitlist
// Returns the id of the waiter.
// NOTE(review): several expressions of this method are missing in this
// view (the right-hand side of `id`, the body of the first `if`, the
// insertion into the list); presumably `id` is `waiter.id()` — confirm
// against the upstream source.
fn add_to_waitlist(&self, waiter: Box<Waiting>) -> usize {
let id =;
// Already fired or dropped: nothing to wait for.
if !self.is_pending() {
return id;
// if armed fire now
// Re-check after registration: the pulse may have fired in between, in
// which case the list is taken back out here.
if !self.is_pending() {
if let Some(t) = self.inner().waiting.take() {
/// Remove Waiter with `id` from the waitlist
// Takes the whole list and walks it node by node.
// NOTE(review): the expressions for `next` and for the comparison are
// missing in this view; presumably every node whose id differs from `id`
// is put back on the list — confirm.
fn remove_from_waitlist(&self, id: usize) {
let mut wl = self.inner().waiting.take();
while let Some(mut w) = wl {
let next =;
if != id {
wl = next;
/// Arm a pulse to wake
// Registers `waiter` and wraps the signal together with the waiter id.
fn arm(self, waiter: Box<Waiting>) -> ArmedSignal {
let id = self.add_to_waitlist(waiter);
ArmedSignal {
id: id,
pulse: self
/// This is a unique id that can be used to identify the signal from others
/// See `Select` for how this api is useful.
// The id is the bit pattern of the `inner` pointer, so clones of one
// signal share an id.
pub fn id(&self) -> usize {
unsafe { mem::transmute_copy(&self.inner) }
/// Block the current thread until a `pulse` is ready.
/// This will block indefinitely if the pulse never fires.
// Only a pending signal goes through the scheduler; the other two states
// return immediately.
// NOTE(review): the tail of the `Pending` arm is not visible; presumably
// the scheduler is put back and `res` returned — confirm.
pub fn wait(self) -> Result<(), WaitError> {
match self.state() {
SignalState::Pulsed => Ok(()),
SignalState::Dropped => Err(WaitError::Dropped),
SignalState::Pending => {
let s = take_scheduler().expect("no scheduler found");
let res = s.wait(self);
/// Block until either the pulse is sent, or the timeout is reached
// The scheduler is taken out of the thread-local slot for the duration of
// the wait and stored back afterwards.
pub fn wait_timeout_ms(self, ms: u32) -> Result<(), TimeoutError> {
SCHED.with(|sched| {
let s = sched.borrow_mut().take().expect("Waited while: no scheduler installed");
let res = s.wait_timeout_ms(self, ms);
*sched.borrow_mut() = Some(s);
// NOTE(review): the body of `callback` is not visible in this view;
// presumably `cb` is registered as a `Waiting::callback` waiter — confirm.
pub fn callback<F>(self, cb: F) where F: FnOnce() + 'static {
/// Describes the possible states of a `Signal`.
// NOTE(review): the variant list is not visible in this view; `Pending`,
// `Pulsed` and `Dropped` are the variants used elsewhere in this file.
#[derive(Debug, PartialEq, Eq)]
pub enum SignalState {
impl IntoRawPtr for Pulse {
unsafe fn into_raw(self) -> *mut () {
let inner = self.inner;
impl FromRawPtr for Pulse {
unsafe fn from_raw(ptr: *mut ()) -> Pulse {
Pulse { inner: mem::transmute(ptr) }
impl IntoRawPtr for Signal {
unsafe fn into_raw(self) -> *mut () {
let inner = self.inner;
impl FromRawPtr for Signal {
unsafe fn from_raw(ptr: *mut ()) -> Signal {
Signal { inner: mem::transmute(ptr) }
/// Represents the possible errors that can occur on a `Signal`.
#[derive(Debug, PartialEq, Eq)]
pub enum WaitError {
/// The `Pulse` was dropped before it could `pulse`.
// NOTE(review): the variant this comment documents is not visible in this
// view; `WaitError::Dropped` is the name used elsewhere in this file.
/// Represents the possible errors from a wait with a timeout.
#[derive(Debug, PartialEq, Eq)]
pub enum TimeoutError {
// NOTE(review): the variants these two comments document are not visible
// in this view; `TimeoutError::Error(..)` and `TimeoutError::Timeout` are
// the names used elsewhere in this file.
/// A `WaitError` has occurred
/// The `Signal` timed out before a `Pulse` was observed.
/// A `Signal` with a waiter already registered on it (built by
/// `Signal::arm`).
struct ArmedSignal {
// The signal that was armed.
pulse: Signal,
// Id returned by `add_to_waitlist` for the registered waiter.
id: usize
// Lets an `ArmedSignal` be used wherever a `&Signal` is expected.
impl Deref for ArmedSignal {
type Target = Signal;
// Borrow the wrapped signal.
fn deref(&self) -> &Signal { &self.pulse }
impl ArmedSignal {
// Consumes the armed signal and gives back the plain `Signal`.
// NOTE(review): the body is not visible in this view; presumably the
// waiter `id` is removed from the waitlist first — confirm.
fn disarm(self) -> Signal {
/// allows an object to assert a wait signal
pub trait Signals {
/// Get a signal from a object
fn signal(&self) -> Signal;
/// Block the current thread until the object
/// assets a pulse.
fn wait(&self) -> Result<(), WaitError> {
let signal = self.signal();
/// Block the current thread until the object
/// assets a pulse. Or until the timeout has been asserted.
fn wait_timeout_ms(&self, ms: u32) -> Result<(), TimeoutError> {
let signal = self.signal();
/// This is the hook into the async wait methods provided
/// by `pulse`. A user implements it to override
/// the current system scheduler (see `swap_scheduler`).
///
/// Implementors must also be `Debug`.
pub trait Scheduler: std::fmt::Debug {
/// Wait until the signal is made `ready` or `errored`
fn wait(&self, signal: Signal) -> Result<(), WaitError>;
/// Wait until the signal is made `ready` or `errored` or the
/// timeout has been reached. `timeout` is in milliseconds.
fn wait_timeout_ms(&self, signal: Signal, timeout: u32) -> Result<(), TimeoutError>;
/// This is the `default` system scheduler that is used if no
/// user provided scheduler is installed. It is very basic
/// and will block the OS thread using `thread::park`.
// `Debug` is a supertrait of `Scheduler`, so the type has to provide it.
#[derive(Debug)]
pub struct ThreadScheduler;
impl Scheduler for ThreadScheduler {
fn wait(&self, signal: Signal) -> Result<(), WaitError> {
loop {
let id = signal.add_to_waitlist(Waiting::thread());
if signal.is_pending() {
match signal.state() {
SignalState::Pending => (),
SignalState::Pulsed => return Ok(()),
SignalState::Dropped => return Err(WaitError::Dropped)
fn wait_timeout_ms(&self, signal: Signal, ms: u32) -> Result<(), TimeoutError> {
let mut now = (precise_time_s() * 1000.) as u64;
let end = now + ms as u64;
loop {
let id = signal.add_to_waitlist(Waiting::thread());
if signal.is_pending() {
now = (precise_time_s() * 1000.) as u64;
if now > end {
return Err(TimeoutError::Timeout)
thread::park_timeout_ms((end - now) as u32);
match signal.state() {
SignalState::Pending => (),
SignalState::Pulsed => return Ok(()),
SignalState::Dropped => return Err(TimeoutError::Error(WaitError::Dropped))
/// The TLS scheduler
// Per-thread slot holding the installed scheduler; it starts out as a
// `ThreadScheduler` and is `None` while a wait has taken the scheduler out.
thread_local!(static SCHED: RefCell<Option<Box<Scheduler>>> = RefCell::new(Some(Box::new(ThreadScheduler))));
// this is inline never to avoid the SCHED pointer being cached
fn take_scheduler() -> Option<Box<Scheduler>> {
use std::mem;
let mut sched = None;
SCHED.with(|s| mem::swap(&mut *s.borrow_mut(), &mut sched));
/// Replace the current Scheduler with your own supplied scheduler.
/// all `wait()` commands will be run through this scheduler now.
/// This will return the current TLS scheduler, which may be useful
/// to restore it later.
pub fn swap_scheduler(sched: Box<Scheduler>) -> Option<Box<Scheduler>> {
use std::mem;
let mut sched = Some(sched);
SCHED.with(|s| mem::swap(&mut *s.borrow_mut(), &mut sched));
/// Call the suppled closure using the supplied schedulee
pub fn with_scheduler<F>(f: F, sched: Box<Scheduler>) -> Option<Box<Scheduler>> where F: FnOnce() {
let old = swap_scheduler(sched);
old.and_then(|o| swap_scheduler(o))