blob: 1f7fe820bf86a37c7edd5dbdd240afabce4df954 [file] [log] [blame]
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::v1::*;
use sync::atomic::{AtomicUsize, Ordering};
use sync::{mutex, MutexGuard, PoisonError};
use sys_common::condvar as sys;
use sys_common::mutex as sys_mutex;
use sys_common::poison::{self, LockResult};
use time::{Instant, Duration};
/// A type indicating whether a timed wait on a condition variable returned
/// due to a time out or not.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
#[stable(feature = "wait_timeout", since = "1.5.0")]
pub struct WaitTimeoutResult(bool);
impl WaitTimeoutResult {
/// Returns whether the wait was known to have timed out.
#[stable(feature = "wait_timeout", since = "1.5.0")]
pub fn timed_out(&self) -> bool {
self.0
}
}
/// A Condition Variable
///
/// Condition variables represent the ability to block a thread such that it
/// consumes no CPU time while waiting for an event to occur. Condition
/// variables are typically associated with a boolean predicate (a condition)
/// and a mutex. The predicate is always verified inside of the mutex before
/// determining that thread must block.
///
/// Functions in this module will block the current **thread** of execution and
/// are bindings to system-provided condition variables where possible. Note
/// that this module places one additional restriction over the system condition
/// variables: each condvar can be used with precisely one mutex at runtime. Any
/// attempt to use multiple mutexes on the same condition variable will result
/// in a runtime panic. If this is not desired, then the unsafe primitives in
/// `sys` do not have this restriction but may result in undefined behavior.
///
/// # Examples
///
/// ```
/// use std::sync::{Arc, Mutex, Condvar};
/// use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// // Inside of our lock, spawn a new thread, and then wait for it to start
/// thread::spawn(move|| {
/// let &(ref lock, ref cvar) = &*pair2;
/// let mut started = lock.lock().unwrap();
/// *started = true;
/// cvar.notify_one();
/// });
///
/// // wait for the thread to start up
/// let &(ref lock, ref cvar) = &*pair;
/// let mut started = lock.lock().unwrap();
/// while !*started {
/// started = cvar.wait(started).unwrap();
/// }
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Condvar { inner: Box<StaticCondvar> }
/// Statically allocated condition variables.
///
/// This structure is identical to `Condvar` except that it is suitable for use
/// in static initializers for other structures.
///
/// # Examples
///
/// ```
/// #![feature(static_condvar)]
///
/// use std::sync::{StaticCondvar, CONDVAR_INIT};
///
/// static CVAR: StaticCondvar = CONDVAR_INIT;
/// ```
#[unstable(feature = "static_condvar",
reason = "may be merged with Condvar in the future",
issue = "27717")]
pub struct StaticCondvar {
inner: sys::Condvar,
mutex: AtomicUsize,
}
/// Constant initializer for a statically allocated condition variable.
#[unstable(feature = "static_condvar",
reason = "may be merged with Condvar in the future",
issue = "27717")]
pub const CONDVAR_INIT: StaticCondvar = StaticCondvar::new();
impl Condvar {
/// Creates a new condition variable which is ready to be waited on and
/// notified.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn new() -> Condvar {
Condvar {
inner: box StaticCondvar {
inner: sys::Condvar::new(),
mutex: AtomicUsize::new(0),
}
}
}
/// Blocks the current thread until this condition variable receives a
/// notification.
///
/// This function will atomically unlock the mutex specified (represented by
/// `mutex_guard`) and block the current thread. This means that any calls
/// to `notify_*()` which happen logically after the mutex is unlocked are
/// candidates to wake this thread up. When this function call returns, the
/// lock specified will have been re-acquired.
///
/// Note that this function is susceptible to spurious wakeups. Condition
/// variables normally have a boolean predicate associated with them, and
/// the predicate must always be checked each time this function returns to
/// protect against spurious wakeups.
///
/// # Failure
///
/// This function will return an error if the mutex being waited on is
/// poisoned when this thread re-acquires the lock. For more information,
/// see information about poisoning on the Mutex type.
///
/// # Panics
///
/// This function will `panic!()` if it is used with more than one mutex
/// over time. Each condition variable is dynamically bound to exactly one
/// mutex to ensure defined behavior across platforms. If this functionality
/// is not desired, then unsafe primitives in `sys` are provided.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>)
-> LockResult<MutexGuard<'a, T>> {
unsafe {
let me: &'static Condvar = &*(self as *const _);
me.inner.wait(guard)
}
}
/// Waits on this condition variable for a notification, timing out after a
/// specified duration.
///
/// The semantics of this function are equivalent to `wait()`
/// except that the thread will be blocked for roughly no longer
/// than `ms` milliseconds. This method should not be used for
/// precise timing due to anomalies such as preemption or platform
/// differences that may not cause the maximum amount of time
/// waited to be precisely `ms`.
///
/// The returned boolean is `false` only if the timeout is known
/// to have elapsed.
///
/// Like `wait`, the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
#[stable(feature = "rust1", since = "1.0.0")]
#[rustc_deprecated(since = "1.6.0", reason = "replaced by `std::sync::Condvar::wait_timeout`")]
#[allow(deprecated)]
pub fn wait_timeout_ms<'a, T>(&self, guard: MutexGuard<'a, T>, ms: u32)
-> LockResult<(MutexGuard<'a, T>, bool)> {
unsafe {
let me: &'static Condvar = &*(self as *const _);
me.inner.wait_timeout_ms(guard, ms)
}
}
/// Waits on this condition variable for a notification, timing out after a
/// specified duration.
///
/// The semantics of this function are equivalent to `wait()` except that
/// the thread will be blocked for roughly no longer than `dur`. This
/// method should not be used for precise timing due to anomalies such as
/// preemption or platform differences that may not cause the maximum
/// amount of time waited to be precisely `dur`.
///
/// The returned `WaitTimeoutResult` value indicates if the timeout is
/// known to have elapsed.
///
/// Like `wait`, the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
#[stable(feature = "wait_timeout", since = "1.5.0")]
pub fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>,
dur: Duration)
-> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> {
unsafe {
let me: &'static Condvar = &*(self as *const _);
me.inner.wait_timeout(guard, dur)
}
}
/// Waits on this condition variable for a notification, timing out after a
/// specified duration.
///
/// The semantics of this function are equivalent to `wait_timeout` except
/// that the implementation will repeatedly wait while the duration has not
/// passed and the provided function returns `false`.
#[unstable(feature = "wait_timeout_with",
reason = "unsure if this API is broadly needed or what form it should take",
issue = "27748")]
pub fn wait_timeout_with<'a, T, F>(&self,
guard: MutexGuard<'a, T>,
dur: Duration,
f: F)
-> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)>
where F: FnMut(LockResult<&mut T>) -> bool {
unsafe {
let me: &'static Condvar = &*(self as *const _);
me.inner.wait_timeout_with(guard, dur, f)
}
}
/// Wakes up one blocked thread on this condvar.
///
/// If there is a blocked thread on this condition variable, then it will
/// be woken up from its call to `wait` or `wait_timeout`. Calls to
/// `notify_one` are not buffered in any way.
///
/// To wake up all threads, see `notify_all()`.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn notify_one(&self) { unsafe { self.inner.inner.notify_one() } }
/// Wakes up all blocked threads on this condvar.
///
/// This method will ensure that any current waiters on the condition
/// variable are awoken. Calls to `notify_all()` are not buffered in any
/// way.
///
/// To wake up only one thread, see `notify_one()`.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn notify_all(&self) { unsafe { self.inner.inner.notify_all() } }
}
#[stable(feature = "rust1", since = "1.0.0")]
impl Drop for Condvar {
fn drop(&mut self) {
unsafe { self.inner.inner.destroy() }
}
}
impl StaticCondvar {
/// Creates a new condition variable
#[unstable(feature = "static_condvar",
reason = "may be merged with Condvar in the future",
issue = "27717")]
pub const fn new() -> StaticCondvar {
StaticCondvar {
inner: sys::Condvar::new(),
mutex: AtomicUsize::new(0),
}
}
/// Blocks the current thread until this condition variable receives a
/// notification.
///
/// See `Condvar::wait`.
#[unstable(feature = "static_condvar",
reason = "may be merged with Condvar in the future",
issue = "27717")]
pub fn wait<'a, T>(&'static self, guard: MutexGuard<'a, T>)
-> LockResult<MutexGuard<'a, T>> {
let poisoned = unsafe {
let lock = mutex::guard_lock(&guard);
self.verify(lock);
self.inner.wait(lock);
mutex::guard_poison(&guard).get()
};
if poisoned {
Err(PoisonError::new(guard))
} else {
Ok(guard)
}
}
/// Waits on this condition variable for a notification, timing out after a
/// specified duration.
///
/// See `Condvar::wait_timeout`.
#[unstable(feature = "static_condvar",
reason = "may be merged with Condvar in the future",
issue = "27717")]
#[rustc_deprecated(since = "1.6.0",
reason = "replaced by `std::sync::StaticCondvar::wait_timeout`")]
pub fn wait_timeout_ms<'a, T>(&'static self, guard: MutexGuard<'a, T>, ms: u32)
-> LockResult<(MutexGuard<'a, T>, bool)> {
match self.wait_timeout(guard, Duration::from_millis(ms as u64)) {
Ok((guard, timed_out)) => Ok((guard, !timed_out.timed_out())),
Err(poison) => {
let (guard, timed_out) = poison.into_inner();
Err(PoisonError::new((guard, !timed_out.timed_out())))
}
}
}
/// Waits on this condition variable for a notification, timing out after a
/// specified duration.
///
/// See `Condvar::wait_timeout`.
#[unstable(feature = "static_condvar",
reason = "may be merged with Condvar in the future",
issue = "27717")]
pub fn wait_timeout<'a, T>(&'static self,
guard: MutexGuard<'a, T>,
timeout: Duration)
-> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> {
let (poisoned, result) = unsafe {
let lock = mutex::guard_lock(&guard);
self.verify(lock);
let success = self.inner.wait_timeout(lock, timeout);
(mutex::guard_poison(&guard).get(), WaitTimeoutResult(!success))
};
if poisoned {
Err(PoisonError::new((guard, result)))
} else {
Ok((guard, result))
}
}
/// Waits on this condition variable for a notification, timing out after a
/// specified duration.
///
/// The implementation will repeatedly wait while the duration has not
/// passed and the function returns `false`.
///
/// See `Condvar::wait_timeout_with`.
#[unstable(feature = "static_condvar",
reason = "may be merged with Condvar in the future",
issue = "27717")]
pub fn wait_timeout_with<'a, T, F>(&'static self,
guard: MutexGuard<'a, T>,
dur: Duration,
mut f: F)
-> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)>
where F: FnMut(LockResult<&mut T>) -> bool {
// This could be made more efficient by pushing the implementation into
// sys::condvar
let start = Instant::now();
let mut guard_result: LockResult<MutexGuard<'a, T>> = Ok(guard);
while !f(guard_result
.as_mut()
.map(|g| &mut **g)
.map_err(|e| PoisonError::new(&mut **e.get_mut()))) {
let consumed = start.elapsed();
let guard = guard_result.unwrap_or_else(|e| e.into_inner());
let (new_guard_result, timed_out) = if consumed > dur {
(Ok(guard), WaitTimeoutResult(true))
} else {
match self.wait_timeout(guard, dur - consumed) {
Ok((new_guard, timed_out)) => (Ok(new_guard), timed_out),
Err(err) => {
let (new_guard, no_timeout) = err.into_inner();
(Err(PoisonError::new(new_guard)), no_timeout)
}
}
};
guard_result = new_guard_result;
if timed_out.timed_out() {
let result = f(guard_result
.as_mut()
.map(|g| &mut **g)
.map_err(|e| PoisonError::new(&mut **e.get_mut())));
let result = WaitTimeoutResult(!result);
return poison::map_result(guard_result, |g| (g, result));
}
}
poison::map_result(guard_result, |g| (g, WaitTimeoutResult(false)))
}
/// Wakes up one blocked thread on this condvar.
///
/// See `Condvar::notify_one`.
#[unstable(feature = "static_condvar",
reason = "may be merged with Condvar in the future",
issue = "27717")]
pub fn notify_one(&'static self) { unsafe { self.inner.notify_one() } }
/// Wakes up all blocked threads on this condvar.
///
/// See `Condvar::notify_all`.
#[unstable(feature = "static_condvar",
reason = "may be merged with Condvar in the future",
issue = "27717")]
pub fn notify_all(&'static self) { unsafe { self.inner.notify_all() } }
/// Deallocates all resources associated with this static condvar.
///
/// This method is unsafe to call as there is no guarantee that there are no
/// active users of the condvar, and this also doesn't prevent any future
/// users of the condvar. This method is required to be called to not leak
/// memory on all platforms.
#[unstable(feature = "static_condvar",
reason = "may be merged with Condvar in the future",
issue = "27717")]
pub unsafe fn destroy(&'static self) {
self.inner.destroy()
}
fn verify(&self, mutex: &sys_mutex::Mutex) {
let addr = mutex as *const _ as usize;
match self.mutex.compare_and_swap(0, addr, Ordering::SeqCst) {
// If we got out 0, then we have successfully bound the mutex to
// this cvar.
0 => {}
// If we get out a value that's the same as `addr`, then someone
// already beat us to the punch.
n if n == addr => {}
// Anything else and we're using more than one mutex on this cvar,
// which is currently disallowed.
_ => panic!("attempted to use a condition variable with two \
mutexes"),
}
}
}
#[cfg(test)]
mod tests {
use prelude::v1::*;
use super::StaticCondvar;
use sync::mpsc::channel;
use sync::{StaticMutex, Condvar, Mutex, Arc};
use sync::atomic::{AtomicUsize, Ordering};
use thread;
use time::Duration;
use u32;
#[test]
fn smoke() {
let c = Condvar::new();
c.notify_one();
c.notify_all();
}
#[test]
fn static_smoke() {
static C: StaticCondvar = StaticCondvar::new();
C.notify_one();
C.notify_all();
unsafe { C.destroy(); }
}
#[test]
fn notify_one() {
static C: StaticCondvar = StaticCondvar::new();
static M: StaticMutex = StaticMutex::new();
let g = M.lock().unwrap();
let _t = thread::spawn(move|| {
let _g = M.lock().unwrap();
C.notify_one();
});
let g = C.wait(g).unwrap();
drop(g);
unsafe { C.destroy(); M.destroy(); }
}
#[test]
fn notify_all() {
const N: usize = 10;
let data = Arc::new((Mutex::new(0), Condvar::new()));
let (tx, rx) = channel();
for _ in 0..N {
let data = data.clone();
let tx = tx.clone();
thread::spawn(move|| {
let &(ref lock, ref cond) = &*data;
let mut cnt = lock.lock().unwrap();
*cnt += 1;
if *cnt == N {
tx.send(()).unwrap();
}
while *cnt != 0 {
cnt = cond.wait(cnt).unwrap();
}
tx.send(()).unwrap();
});
}
drop(tx);
let &(ref lock, ref cond) = &*data;
rx.recv().unwrap();
let mut cnt = lock.lock().unwrap();
*cnt = 0;
cond.notify_all();
drop(cnt);
for _ in 0..N {
rx.recv().unwrap();
}
}
#[test]
fn wait_timeout_ms() {
static C: StaticCondvar = StaticCondvar::new();
static M: StaticMutex = StaticMutex::new();
let g = M.lock().unwrap();
let (g, _no_timeout) = C.wait_timeout(g, Duration::from_millis(1)).unwrap();
// spurious wakeups mean this isn't necessarily true
// assert!(!no_timeout);
let _t = thread::spawn(move || {
let _g = M.lock().unwrap();
C.notify_one();
});
let (g, timeout_res) = C.wait_timeout(g, Duration::from_millis(u32::MAX as u64)).unwrap();
assert!(!timeout_res.timed_out());
drop(g);
unsafe { C.destroy(); M.destroy(); }
}
#[test]
fn wait_timeout_with() {
static C: StaticCondvar = StaticCondvar::new();
static M: StaticMutex = StaticMutex::new();
static S: AtomicUsize = AtomicUsize::new(0);
let g = M.lock().unwrap();
let (g, timed_out) = C.wait_timeout_with(g, Duration::new(0, 1000), |_| {
false
}).unwrap();
assert!(timed_out.timed_out());
let (tx, rx) = channel();
let _t = thread::spawn(move || {
rx.recv().unwrap();
let g = M.lock().unwrap();
S.store(1, Ordering::SeqCst);
C.notify_one();
drop(g);
rx.recv().unwrap();
let g = M.lock().unwrap();
S.store(2, Ordering::SeqCst);
C.notify_one();
drop(g);
rx.recv().unwrap();
let _g = M.lock().unwrap();
S.store(3, Ordering::SeqCst);
C.notify_one();
});
let mut state = 0;
let day = 24 * 60 * 60;
let (_g, timed_out) = C.wait_timeout_with(g, Duration::new(day, 0), |_| {
assert_eq!(state, S.load(Ordering::SeqCst));
tx.send(()).unwrap();
state += 1;
match state {
1|2 => false,
_ => true,
}
}).unwrap();
assert!(!timed_out.timed_out());
}
#[test]
#[should_panic]
fn two_mutexes() {
static M1: StaticMutex = StaticMutex::new();
static M2: StaticMutex = StaticMutex::new();
static C: StaticCondvar = StaticCondvar::new();
let mut g = M1.lock().unwrap();
let _t = thread::spawn(move|| {
let _g = M1.lock().unwrap();
C.notify_one();
});
g = C.wait(g).unwrap();
drop(g);
let _ = C.wait(M2.lock().unwrap()).unwrap();
}
}