blob: 29ea9e93b2c418d716c28f021996f689d6f4ee4d [file] [log] [blame]
// Copyright 2015 Colin Sherratt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
extern crate atom;
extern crate time;
use std::sync::atomic::{AtomicUsize};
use std::thread;
use std::mem;
use std::fmt;
use std::ops::Deref;
use std::sync::atomic::Ordering;
use std::cell::RefCell;
use atom::*;
use time::precise_time_s;
use fnbox::FnBox;
pub use select::{Select, SelectMap};
pub use barrier::Barrier;
mod select;
mod barrier;
mod fnbox;
/// Drop rules
/// This may be freed iff state is Signald | Dropped
/// and Waiting is Dropped
struct Inner {
state: AtomicUsize,
waiting: Atom<Box<Waiting>>
}
// TODO 64bit sized, probably does not matter now
const PULSED: usize = 0x8000_0000;
const TX_DROP: usize = 0x4000_0000;
const TX_FLAGS: usize = PULSED | TX_DROP;
const REF_COUNT: usize = !TX_FLAGS;
struct Waiting {
next: Option<Box<Waiting>>,
wake: Wake
}
impl GetNextMut for Box<Waiting> {
type NextPtr = Option<Box<Waiting>>;
fn get_next(&mut self) -> &mut Option<Box<Waiting>> {
&mut self.next
}
}
enum Wake {
Thread(thread::Thread),
Select(select::Handle),
Barrier(barrier::Handle),
Callback(Box<FnBox>)
}
impl Waiting {
fn wake(s: Box<Self>, id: usize) {
let mut next = Some(s);
while let Some(s) = next {
// There must be a better way to do this...
let s = *s;
let Waiting { next: n, wake } = s;
next = n;
match wake {
Wake::Thread(thread) => thread.unpark(),
Wake::Select(select) => {
let trigger = {
let mut guard = select.0.lock().unwrap();
guard.ready.push(id);
guard.trigger.take()
};
trigger.map(|x| x.pulse());
}
Wake::Barrier(barrier) => {
let count = barrier.0.count.fetch_sub(1, Ordering::Relaxed);
if count == 1 {
let mut guard = barrier.0.trigger.lock().unwrap();
if let Some(t) = guard.take() {
t.pulse();
}
}
}
Wake::Callback(cb) => cb.call_box(),
}
}
}
fn id(&self) -> usize {
unsafe { mem::transmute(self) }
}
fn thread() -> Box<Waiting> {
Box::new(Waiting {
next: None,
wake: Wake::Thread(thread::current())
})
}
fn select(handle: select::Handle) -> Box<Waiting> {
Box::new(Waiting{
next: None,
wake: Wake::Select(handle)
})
}
fn barrier(handle: barrier::Handle) -> Box<Waiting> {
Box::new(Waiting{
next: None,
wake: Wake::Barrier(handle)
})
}
fn callback<F>(cb: F) -> Box<Waiting> where F: FnOnce() + 'static {
Box::new(Waiting{
next: None,
wake: Wake::Callback(Box::new(cb))
})
}
}
unsafe impl Send for Pulse {}
// This should be safe a pulse requires ownership to
// actually `pulse`
unsafe impl Sync for Pulse {}
/// A `Pulse` is represents an unfired signal. It is the tx side of Signal
/// A `Pulse` can only purpose it to be fired, and then it will be moved
/// as to never allow it to fire again. `Dropping` a pulse will `pulse`
/// The signal, but the signal will enter an error state.
pub struct Pulse {
inner: *mut Inner
}
impl fmt::Debug for Pulse {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
let id: usize = unsafe { mem::transmute(self.inner) };
write!(f, "Pulse({:?})", id)
}
}
fn delete_inner(state: usize, inner: *mut Inner) {
if state & REF_COUNT == 1 {
let inner: Box<Inner> = unsafe {
mem::transmute(inner)
};
drop(inner);
}
}
impl Drop for Pulse {
fn drop(&mut self) {
self.set(TX_DROP);
self.wake();
let state = self.inner().state.fetch_sub(1, Ordering::Relaxed);
delete_inner(state, self.inner)
}
}
impl Pulse {
/// Create a Pulse from a usize. This is naturally unsafe.
#[inline]
pub unsafe fn cast_from_usize(ptr: usize) -> Pulse {
Pulse {
inner: mem::transmute(ptr)
}
}
/// Convert a trigger to a `usize`, This is unsafe
/// and it will kill your kittens if you are not careful
#[inline]
pub unsafe fn cast_to_usize(self) -> usize {
let us = mem::transmute(self.inner);
mem::forget(self);
us
}
#[inline]
fn inner(&self) -> &Inner {
unsafe { mem::transmute(self.inner) }
}
#[inline]
fn set(&self, state: usize) -> usize {
self.inner().state.fetch_or(state, Ordering::Relaxed)
}
#[inline]
fn wake(&self) {
let id = unsafe { mem::transmute(self.inner) };
match self.inner().waiting.take() {
None => (),
Some(v) => Waiting::wake(v, id)
}
}
/// Pulse the `pulse` which will transition the `Signal` out from pending
/// to ready. This moves the pulse so that it can only be fired once.
#[inline]
pub fn pulse(self) {
self.set(PULSED);
self.wake();
let state = self.inner().state.fetch_sub(1, Ordering::Relaxed);
delete_inner(state, self.inner);
mem::forget(self)
}
}
unsafe impl Send for Signal {}
// This should be safe a signal requires ownership to do anything
// the inner is all atomically modified data anyhow
unsafe impl Sync for Signal {}
/// A `Signal` represents listens for a `pulse` to occur in the system. A
/// `Signal` has one of three states. Pending, Pulsed, or Errored. Pending
/// means the pulse has not fired, but still exists. Pulsed meaning the
/// pulse has fired, and no longer exists. Errored means the pulse was dropped
/// without firing. This normally means a programming error of some sort.
pub struct Signal {
inner: *mut Inner
}
impl fmt::Debug for Signal {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Signal(id={:?}, pending={:?})", self.id(), self.is_pending())
}
}
impl Clone for Signal {
#[inline(always)]
fn clone(&self) -> Signal {
self.inner().state.fetch_add(1, Ordering::Relaxed);
Signal { inner: self.inner }
}
}
impl Drop for Signal {
#[inline]
fn drop(&mut self) {
let flag = self.inner().state.fetch_sub(1, Ordering::Relaxed);
delete_inner(flag, self.inner);
}
}
impl Signal {
/// Create a Signal and a Pulse that are associated.
pub fn new() -> (Signal, Pulse) {
let inner = Box::new(Inner {
state: AtomicUsize::new(2),
waiting: Atom::empty()
});
let inner = unsafe {mem::transmute(inner)};
(Signal {
inner: inner
},
Pulse {
inner: inner,
})
}
/// Create a signal that is already pulsed
pub fn pulsed() -> Signal {
let inner = Box::new(Inner {
state: AtomicUsize::new(1 | PULSED),
waiting: Atom::empty()
});
let inner = unsafe {mem::transmute(inner)};
Signal { inner: inner }
}
#[inline]
fn inner(&self) -> &Inner {
unsafe { mem::transmute(self.inner) }
}
/// Read out the state of the Signal
#[inline]
pub fn state(&self) -> SignalState {
let flags = self.inner().state.load(Ordering::Relaxed);
match (flags & TX_DROP == TX_DROP, flags & PULSED == PULSED) {
(_, true) => SignalState::Pulsed,
(true, _) => SignalState::Dropped,
(_, _) => SignalState::Pending
}
}
/// Check to see if the signal is pending. A signal
#[inline]
pub fn is_pending(&self) -> bool {
self.state() == SignalState::Pending
}
/// Add a waiter to a waitlist
fn add_to_waitlist(&self, waiter: Box<Waiting>) -> usize {
let id = waiter.id();
if !self.is_pending() {
Waiting::wake(waiter, self.id());
return id;
}
self.inner().waiting.replace_and_set_next(waiter);
// if armed fire now
if !self.is_pending() {
if let Some(t) = self.inner().waiting.take() {
Waiting::wake(t, self.id());
}
}
id
}
/// Remove Waiter with `id` from the waitlist
fn remove_from_waitlist(&self, id: usize) {
let mut wl = self.inner().waiting.take();
while let Some(mut w) = wl {
let next = w.next.take();
if w.id() != id {
self.add_to_waitlist(w);
}
wl = next;
}
}
/// Arm a pulse to wake
fn arm(self, waiter: Box<Waiting>) -> ArmedSignal {
let id = self.add_to_waitlist(waiter);
ArmedSignal {
id: id,
pulse: self
}
}
/// This is a unique id that can be used to identify the signal from others
/// See `Select` for how this api is useful.
pub fn id(&self) -> usize {
unsafe { mem::transmute_copy(&self.inner) }
}
/// Block the current thread until a `pulse` is ready.
/// This will block indefinably if the pulse never fires.
#[inline]
pub fn wait(self) -> Result<(), WaitError> {
match self.state() {
SignalState::Pulsed => Ok(()),
SignalState::Dropped => Err(WaitError::Dropped),
SignalState::Pending => {
let s = take_scheduler().expect("no scheduler found");
let res = s.wait(self);
swap_scheduler(s);
res
}
}
}
/// Block until either the pulse is sent, or the timeout is reached
pub fn wait_timeout_ms(self, ms: u32) -> Result<(), TimeoutError> {
SCHED.with(|sched| {
let s = sched.borrow_mut().take().expect("Waited while: no scheduler installed");
let res = s.wait_timeout_ms(self, ms);
*sched.borrow_mut() = Some(s);
res
})
}
pub fn callback<F>(self, cb: F) where F: FnOnce() + 'static {
self.add_to_waitlist(Waiting::callback(cb));
}
}
/// Described the possible states of a Signal
#[derive(Debug, PartialEq, Eq)]
pub enum SignalState {
Pending,
Pulsed,
Dropped
}
impl IntoRawPtr for Pulse {
#[inline(always)]
unsafe fn into_raw(self) -> *mut () {
let inner = self.inner;
mem::forget(self);
mem::transmute(inner)
}
}
impl FromRawPtr for Pulse {
#[inline(always)]
unsafe fn from_raw(ptr: *mut ()) -> Pulse {
Pulse { inner: mem::transmute(ptr) }
}
}
impl IntoRawPtr for Signal {
#[inline(always)]
unsafe fn into_raw(self) -> *mut () {
let inner = self.inner;
mem::forget(self);
mem::transmute(inner)
}
}
impl FromRawPtr for Signal {
#[inline(always)]
unsafe fn from_raw(ptr: *mut ()) -> Signal {
Signal { inner: mem::transmute(ptr) }
}
}
/// Represents the possible errors that can occur on a `Signal`
#[derive(Debug, PartialEq, Eq)]
pub enum WaitError {
/// The `Pulse` was dropped before it could `Pulse`
Dropped
}
/// Represents the possible errors from a wait timeout
#[derive(Debug, PartialEq, Eq)]
pub enum TimeoutError {
/// A `WaitError` has occurred
Error(WaitError),
/// The `Signal` timed-out before a `Pulse` was observed.
Timeout
}
struct ArmedSignal {
pulse: Signal,
id: usize
}
impl Deref for ArmedSignal {
type Target = Signal;
fn deref(&self) -> &Signal { &self.pulse }
}
impl ArmedSignal {
fn disarm(self) -> Signal {
self.remove_from_waitlist(self.id);
self.pulse
}
}
/// allows an object to assert a wait signal
pub trait Signals {
/// Get a signal from a object
fn signal(&self) -> Signal;
/// Block the current thread until the object
/// assets a pulse.
fn wait(&self) -> Result<(), WaitError> {
let signal = self.signal();
signal.wait()
}
/// Block the current thread until the object
/// assets a pulse. Or until the timeout has been asserted.
fn wait_timeout_ms(&self, ms: u32) -> Result<(), TimeoutError> {
let signal = self.signal();
signal.wait_timeout_ms(ms)
}
}
/// This is the hook into the async wait methods provided
/// by `pulse`. It is required for the user to override
/// the current system scheduler.
pub trait Scheduler: std::fmt::Debug {
/// Wait until the signal is made `ready` or `errored`
fn wait(&self, signal: Signal) -> Result<(), WaitError>;
/// Wait until the signal is made `ready` or `errored` or the
/// timeout has been reached.
fn wait_timeout_ms(&self, signal: Signal, timeout: u32) -> Result<(), TimeoutError>;
}
/// This is the `default` system scheduler that is used if no
/// user provided scheduler is installed. It is very basic
/// and will block the OS thread using `thread::park`
#[derive(Debug)]
pub struct ThreadScheduler;
impl Scheduler for ThreadScheduler {
fn wait(&self, signal: Signal) -> Result<(), WaitError> {
loop {
let id = signal.add_to_waitlist(Waiting::thread());
if signal.is_pending() {
thread::park();
}
signal.remove_from_waitlist(id);
match signal.state() {
SignalState::Pending => (),
SignalState::Pulsed => return Ok(()),
SignalState::Dropped => return Err(WaitError::Dropped)
}
}
}
fn wait_timeout_ms(&self, signal: Signal, ms: u32) -> Result<(), TimeoutError> {
let mut now = (precise_time_s() * 1000.) as u64;
let end = now + ms as u64;
loop {
let id = signal.add_to_waitlist(Waiting::thread());
if signal.is_pending() {
now = (precise_time_s() * 1000.) as u64;
if now > end {
return Err(TimeoutError::Timeout)
}
thread::park_timeout_ms((end - now) as u32);
}
signal.remove_from_waitlist(id);
match signal.state() {
SignalState::Pending => (),
SignalState::Pulsed => return Ok(()),
SignalState::Dropped => return Err(TimeoutError::Error(WaitError::Dropped))
}
}
}
}
/// The TLS scheduler
thread_local!(static SCHED: RefCell<Option<Box<Scheduler>>> = RefCell::new(Some(Box::new(ThreadScheduler))));
// this is inline never to avoid the SCHED pointer being cached
#[inline(never)]
fn take_scheduler() -> Option<Box<Scheduler>> {
use std::mem;
let mut sched = None;
SCHED.with(|s| mem::swap(&mut *s.borrow_mut(), &mut sched));
sched
}
/// Replace the current Scheduler with your own supplied scheduler.
/// all `wait()` commands will be run through this scheduler now.
///
/// This will return the current TLS scheduler, which may be useful
/// to restore it later.
#[inline(never)]
pub fn swap_scheduler(sched: Box<Scheduler>) -> Option<Box<Scheduler>> {
use std::mem;
let mut sched = Some(sched);
SCHED.with(|s| mem::swap(&mut *s.borrow_mut(), &mut sched));
sched
}
/// Call the suppled closure using the supplied schedulee
pub fn with_scheduler<F>(f: F, sched: Box<Scheduler>) -> Option<Box<Scheduler>> where F: FnOnce() {
let old = swap_scheduler(sched);
f();
old.and_then(|o| swap_scheduler(o))
}