// blob: 60e0184b9ea057d5d9a65a51d8118a01a67d3f2e
use token::Token;
use util::Slab;
use clock_ticks::precise_time_ns;
use std::{usize, iter};
use std::cmp::max;
use self::TimerErrorKind::TimerOverflow;
const EMPTY: Token = Token(usize::MAX);
const NS_PER_MS: u64 = 1_000_000;
/// Coarse-grained timer that implements timeouts using an algorithm based on
/// hashed timing wheels by Varghese & Lauck.
///
/// Every pending timeout carries a caller-supplied value of type `T`; `tick_to`
/// hands that value back once the timeout's tick has been reached.
//
// TODO:
// * Handle the case when the timer falls more than an entire wheel behind. There
//   is no point in looping multiple times around the wheel in one go.
// * New type for tick, now() -> Tick
#[derive(Debug)]
pub struct Timer<T> {
    /// Size of each tick in milliseconds. Used as a divisor when converting
    /// times to ticks, so it must not be zero.
    tick_ms: u64,
    /// Slab of timeout entries. Wheel slots and entry links refer to entries by
    /// their slab `Token`.
    entries: Slab<Entry<T>>,
    /// Timeout wheel: one list head per slot, `EMPTY` when the slot holds no
    /// entries. Each tick, the timer looks at the next slot for timeouts that
    /// match the current tick.
    wheel: Vec<Token>,
    /// Tick 0's time in milliseconds.
    start: u64,
    /// The current tick.
    tick: u64,
    /// The next entry that may time out, or `EMPTY` when the current slot has
    /// been fully examined.
    next: Token,
    /// Masks the target tick to get the slot (`slots - 1`, where `slots` is a
    /// power of two).
    mask: u64,
}
/// Handle to a pending timeout, returned by `Timer::timeout_ms` and
/// `Timer::timeout_at_ms` and accepted by `Timer::clear`.
///
/// `Debug` is derived so the handle can be logged and so that
/// `Result<Timeout, _>` supports helpers such as `unwrap_err`.
#[derive(Copy, Clone, Debug)]
pub struct Timeout {
    /// Reference into the timer entry slab.
    token: Token,
    /// Tick the timeout is scheduled for; `Timer::clear` compares it with the
    /// tick stored in the slab entry before removing anything.
    tick: u64,
}
impl<T> Timer<T> {
pub fn new(tick_ms: u64, mut slots: usize, mut capacity: usize) -> Timer<T> {
slots = slots.next_power_of_two();
capacity = capacity.next_power_of_two();
Timer {
tick_ms: tick_ms,
entries: Slab::new(capacity),
wheel: iter::repeat(EMPTY).take(slots).collect(),
start: 0,
tick: 0,
next: EMPTY,
mask: (slots as u64) - 1
}
}
/// Returns the entry count reported by the slab, i.e. the number of timeouts
/// that are still pending. Only compiled for tests.
#[cfg(test)]
pub fn count(&self) -> usize {
    self.entries.count()
}
/// Number of milliseconds remaining until the time of tick `self.tick + 1`,
/// or 0 when that time is now or already in the past.
pub fn next_tick_in_ms(&self) -> u64 {
    let now = self.now_ms();
    let following_tick = self.tick + 1;
    let deadline = self.start + following_tick * self.tick_ms;

    // Saturating subtraction yields 0 whenever `deadline <= now`.
    deadline.saturating_sub(now)
}
/*
*
* ===== Initialization =====
*
*/
/// Anchors tick 0 at the current system time.
///
/// Panics (inside `set_start_ms`) if the timer has already been initialized.
pub fn setup(&mut self) {
    // Read the clock first; the result is then handed to the `&mut self` setter.
    let started_at = self.now_ms();
    self.set_start_ms(started_at);
}
/// Sets the time of tick 0 to `start` (milliseconds).
///
/// Panics if the timer is already initialized, because moving the origin
/// would change the meaning of ticks that are already scheduled.
fn set_start_ms(&mut self, start: u64) {
    if self.is_initialized() {
        panic!("the timer has already started");
    }
    self.start = start;
}
/*
*
* ===== Timeout create / cancel =====
*
*/
/// Schedules `token` to fire `delay` milliseconds from now.
///
/// Returns the `Timeout` handle for the new entry, or a `TimerError` when the
/// entry slab rejects the insertion.
pub fn timeout_ms(&mut self, token: T, delay: u64) -> TimerResult<Timeout> {
    // `delay` is unsigned, so it needs no clamping at zero. Saturate instead
    // of wrapping so a huge delay cannot turn into a deadline in the past.
    let at = self.now_ms().saturating_add(delay);
    self.timeout_at_ms(token, at)
}

/// Schedules `token` to fire at the absolute time `at` (milliseconds, on the
/// same clock as the timer's start time).
///
/// The deadline is rounded up to a whole tick and is always placed at least
/// one tick after the current tick. A deadline earlier than the timer's start
/// is treated as already due rather than underflowing.
///
/// Returns the `Timeout` handle for the new entry, or a `TimerError` when the
/// entry slab rejects the insertion.
pub fn timeout_at_ms(&mut self, token: T, at: u64) -> TimerResult<Timeout> {
    // Make relative to start
    let elapsed = at.saturating_sub(self.start);

    // Calculate tick, rounding up. Done with quotient + remainder because
    // `elapsed + tick_ms - 1` can overflow for very large deadlines.
    let mut ticks = elapsed / self.tick_ms;
    if elapsed % self.tick_ms != 0 {
        ticks += 1;
    }

    // Always target at least 1 tick in the future
    let tick = max(ticks, self.tick + 1);

    self.insert(token, tick)
}
/// Cancels a pending timeout.
///
/// Returns `true` when the entry was found and removed. Returns `false` when
/// the slab has no entry for the handle's token, or when the stored entry is
/// scheduled for a different tick than the handle records.
pub fn clear(&mut self, timeout: Timeout) -> bool {
    let links = match self.entries.get(timeout.token).map(|entry| entry.links) {
        // Sanity check: the stored entry must be scheduled for the same tick
        // as the handle.
        Some(found) if found.tick == timeout.tick => found,
        _ => return false,
    };

    self.unlink(&links, timeout.token);
    self.entries.remove(timeout.token);
    true
}
fn insert(&mut self, token: T, tick: u64) -> TimerResult<Timeout> {
// Get the slot for the requested tick
let slot = (tick & self.mask) as usize;
let curr = self.wheel[slot];
// Insert the new entry
let token = try!(
self.entries.insert(Entry::new(token, tick, curr))
.map_err(|_| TimerError::overflow()));
if curr != EMPTY {
// If there was a previous entry, set its prev pointer to the new
// entry
self.entries[curr].links.prev = token;
}
// Update the head slot
self.wheel[slot] = token;
trace!("inserted timout; slot={}; token={:?}", slot, token);
// Return the new timeout
Ok(Timeout {
token: token,
tick: tick
})
}
/// Detaches the entry identified by `token` (whose links are `links`) from
/// its slot list. The slab entry itself is not removed.
///
/// If the detached entry is the one `self.next` points at, `self.next` moves
/// on to the entry's successor (or `EMPTY`), so an in-progress `tick_to` scan
/// never follows a stale token.
fn unlink(&mut self, links: &EntryLinks, token: Token) {
    let slot = self.slot_for(links.tick);
    trace!("unlinking timeout; slot={}; token={:?}", slot, token);

    // Bypass the entry going forward: either its predecessor or, for a list
    // head, the wheel slot now points at its successor.
    if links.prev != EMPTY {
        self.entries[links.prev].links.next = links.next;
    } else {
        self.wheel[slot] = links.next;
    }

    // Bypass the entry going backward.
    if links.next != EMPTY {
        self.entries[links.next].links.prev = links.prev;
    }

    // Keep the scan cursor valid; `links.next` is `EMPTY` for a tail entry.
    if token == self.next {
        self.next = links.next;
    }
}
/*
*
* ===== Advance time =====
*
*/
/// Returns the current time, as read by `now_ms`, converted to a tick number.
pub fn now(&self) -> u64 {
    let current_ms = self.now_ms();
    self.ms_to_tick(current_ms)
}
/// Advances the timer towards tick `now` and returns the value of the first
/// expired timeout it encounters, if any.
///
/// At most one value is returned per call, so callers drain expired timeouts
/// by calling again with the same `now` until `None` comes back. `None` means
/// the current tick has moved past `now` without finding anything due.
pub fn tick_to(&mut self, now: u64) -> Option<T> {
    trace!("tick_to; now={}; tick={}", now, self.tick);

    while self.tick <= now {
        let cursor = self.next;

        trace!("ticking; curr={:?}", cursor);

        if cursor == EMPTY {
            // Current slot is exhausted: move to the next tick and start at
            // the head of its slot.
            self.tick += 1;
            let slot = self.slot_for(self.tick);
            self.next = self.wheel[slot];
            continue;
        }

        let links = self.entries[cursor].links;

        if links.tick > self.tick {
            // Shares the slot but belongs to a later trip around the wheel.
            self.next = links.next;
            continue;
        }

        trace!("triggering; token={:?}", cursor);

        // Unlink will also advance self.next
        self.unlink(&links, cursor);

        // Remove the entry and hand its value back to the caller
        return self.entries.remove(cursor).map(|entry| entry.token);
    }

    None
}
/*
*
* ===== Misc =====
*
*/
/// A timer counts as initialized once the current tick has advanced past 0 or
/// at least one timeout entry is stored.
#[inline]
fn is_initialized(&self) -> bool {
    let untouched = self.tick == 0 && self.entries.is_empty();
    !untouched
}
/// Maps a tick to the index of its wheel slot by masking off the high bits.
#[inline]
fn slot_for(&self, tick: u64) -> usize {
    let masked = tick & self.mask;
    masked as usize
}
/// Converts an absolute time in milliseconds into a tick number.
///
/// The result rounds down, i.e. it is the tick that contains `ms`. Times
/// earlier than the timer's start map to tick 0 instead of underflowing.
#[inline]
fn ms_to_tick(&self, ms: u64) -> u64 {
    ms.saturating_sub(self.start) / self.tick_ms
}
/// Current reading of `precise_time_ns`, converted from nanoseconds to whole
/// milliseconds (truncating).
#[inline]
fn now_ms(&self) -> u64 {
    let nanos = precise_time_ns();
    nanos / NS_PER_MS
}
}
/// One node of a doubly linked list of timer entries. The linked structure
/// allows for efficient insertion / removal of timeouts.
struct Entry<T> {
    /// Caller-supplied value handed back when the timeout fires.
    token: T,
    /// Scheduled tick plus the neighbouring entries in the same slot list.
    links: EntryLinks,
}
impl<T> Entry<T> {
    /// Creates an entry for `token` due at `tick`, ready to be placed at the
    /// head of a slot list: it has no predecessor and `next` as its successor.
    fn new(token: T, tick: u64, next: Token) -> Entry<T> {
        let links = EntryLinks {
            tick: tick,
            prev: EMPTY,
            next: next,
        };

        Entry {
            token: token,
            links: links,
        }
    }
}
/// Scheduling and list-position data of an entry. `Copy`, so callers can take
/// a snapshot and then mutate the slab without holding a borrow on the entry.
#[derive(Copy, Clone)]
struct EntryLinks {
    /// Tick at which the entry is due.
    tick: u64,
    /// Previous entry in the slot list, or `EMPTY` when this entry is the head.
    prev: Token,
    /// Next entry in the slot list, or `EMPTY` when this entry is the tail.
    next: Token
}
/// Result type of fallible timer operations; the error is always `TimerError`.
pub type TimerResult<T> = Result<T, TimerError>;
/// Error returned by fallible timer operations.
///
/// Implements `Display` and `std::error::Error` so it can be printed and used
/// with generic error-handling code.
#[derive(Debug)]
pub struct TimerError {
    /// Category of the failure.
    kind: TimerErrorKind,
    /// Human-readable description; this is the text produced by `Display`.
    desc: &'static str,
}

impl TimerError {
    /// Builds the error reported when a new timeout entry cannot be stored.
    fn overflow() -> TimerError {
        TimerError {
            kind: TimerOverflow,
            desc: "too many timer entries"
        }
    }
}

impl ::std::fmt::Display for TimerError {
    /// Writes the error's description.
    fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        fmt.write_str(self.desc)
    }
}

impl ::std::error::Error for TimerError {
    // Provided explicitly because older toolchains require this method.
    fn description(&self) -> &str {
        self.desc
    }
}

/// Categories of `TimerError`.
#[derive(Debug)]
pub enum TimerErrorKind {
    /// The timer could not store another timeout entry.
    TimerOverflow,
}
#[cfg(test)]
mod test {
    use super::Timer;

    // Tick length in milliseconds and wheel size shared by every test timer.
    const TICK: u64 = 100;
    const SLOTS: usize = 16;

    // Builds the timer used by all tests. Its start time stays at 0, so the
    // millisecond values below are relative to tick 0.
    fn timer() -> Timer<&'static str> {
        Timer::new(TICK, SLOTS, 32)
    }

    // Advances `t` to the tick containing `ms` and returns what fires, if
    // anything. The tick is computed first because `tick_to` needs `&mut`.
    fn fire_at(t: &mut Timer<&'static str>, ms: u64) -> Option<&'static str> {
        let tick = t.ms_to_tick(ms);
        t.tick_to(tick)
    }

    // A single timeout fires exactly once, and only when its tick is reached.
    #[test]
    pub fn test_timeout_next_tick() {
        let mut t = timer();

        t.timeout_at_ms("a", 100).unwrap();

        assert_eq!(None, fire_at(&mut t, 50));

        assert_eq!(Some("a"), fire_at(&mut t, 100));
        assert_eq!(None, fire_at(&mut t, 100));

        assert_eq!(None, fire_at(&mut t, 150));
        assert_eq!(None, fire_at(&mut t, 200));

        assert_eq!(t.count(), 0);
    }

    // A cleared timeout never fires and leaves no entry behind.
    #[test]
    pub fn test_clearing_timeout() {
        let mut t = timer();

        let to = t.timeout_at_ms("a", 100).unwrap();
        assert!(t.clear(to));

        assert_eq!(None, fire_at(&mut t, 100));
        assert_eq!(None, fire_at(&mut t, 200));

        assert_eq!(t.count(), 0);
    }

    // Two timeouts on the same tick both fire, one per `tick_to` call.
    #[test]
    pub fn test_multiple_timeouts_same_tick() {
        let mut t = timer();

        t.timeout_at_ms("a", 100).unwrap();
        t.timeout_at_ms("b", 100).unwrap();

        let mut rcv = vec![];
        for _ in 0..2 {
            rcv.push(fire_at(&mut t, 100).unwrap());
        }
        assert_eq!(None, fire_at(&mut t, 100));

        // Firing order within a tick is not part of the assertion.
        rcv.sort();
        assert!(rcv == ["a", "b"], "actual={:?}", rcv);

        assert_eq!(None, fire_at(&mut t, 200));

        assert_eq!(t.count(), 0);
    }

    // Timeouts on different ticks fire as time reaches their rounded-up tick.
    #[test]
    pub fn test_multiple_timeouts_diff_tick() {
        let mut t = timer();

        t.timeout_at_ms("a", 110).unwrap();
        t.timeout_at_ms("b", 220).unwrap();
        t.timeout_at_ms("c", 230).unwrap();
        t.timeout_at_ms("d", 440).unwrap();

        assert_eq!(None, fire_at(&mut t, 100));

        assert_eq!(Some("a"), fire_at(&mut t, 200));
        assert_eq!(None, fire_at(&mut t, 200));

        assert_eq!(Some("c"), fire_at(&mut t, 300));
        assert_eq!(Some("b"), fire_at(&mut t, 300));
        assert_eq!(None, fire_at(&mut t, 300));

        assert_eq!(None, fire_at(&mut t, 400));

        assert_eq!(Some("d"), fire_at(&mut t, 500));
        assert_eq!(None, fire_at(&mut t, 500));

        assert_eq!(None, fire_at(&mut t, 600));
    }

    // Jumping far ahead delivers every overdue timeout, one per call.
    #[test]
    pub fn test_catching_up() {
        let mut t = timer();

        t.timeout_at_ms("a", 110).unwrap();
        t.timeout_at_ms("b", 220).unwrap();
        t.timeout_at_ms("c", 230).unwrap();
        t.timeout_at_ms("d", 440).unwrap();

        assert_eq!(Some("a"), fire_at(&mut t, 600));
        assert_eq!(Some("c"), fire_at(&mut t, 600));
        assert_eq!(Some("b"), fire_at(&mut t, 600));
        assert_eq!(Some("d"), fire_at(&mut t, 600));
        assert_eq!(None, fire_at(&mut t, 600));
    }

    // Two timeouts that share a wheel slot but are one full wheel revolution
    // apart fire on their own ticks.
    #[test]
    pub fn test_timeout_hash_collision() {
        let mut t = timer();
        let one_revolution_later = 100 + TICK * (SLOTS as u64);

        t.timeout_at_ms("a", 100).unwrap();
        t.timeout_at_ms("b", one_revolution_later).unwrap();

        assert_eq!(Some("a"), fire_at(&mut t, 100));
        assert_eq!(1, t.count());

        assert_eq!(None, fire_at(&mut t, 200));
        assert_eq!(1, t.count());

        assert_eq!(Some("b"), fire_at(&mut t, one_revolution_later));
        assert_eq!(0, t.count());
    }

    // Clearing an entry between two `tick_to` calls on the same tick must not
    // break the scan.
    #[test]
    pub fn test_clearing_timeout_between_triggers() {
        let mut t = timer();

        let a = t.timeout_at_ms("a", 100).unwrap();
        t.timeout_at_ms("b", 100).unwrap();
        t.timeout_at_ms("c", 200).unwrap();

        assert_eq!(Some("b"), fire_at(&mut t, 100));
        assert_eq!(2, t.count());

        t.clear(a);
        assert_eq!(1, t.count());

        assert_eq!(None, fire_at(&mut t, 100));

        assert_eq!(Some("c"), fire_at(&mut t, 200));
        assert_eq!(0, t.count());
    }
}