| use token::Token; |
| use util::Slab; |
| use clock_ticks::precise_time_ns; |
| use std::{usize, iter}; |
| use std::cmp::max; |
| |
| use self::TimerErrorKind::TimerOverflow; |
| |
| const EMPTY: Token = Token(usize::MAX); |
| const NS_PER_MS: u64 = 1_000_000; |
| |
| // Implements coarse-grained timeouts using an algorithm based on hashed timing |
| // wheels by Varghese & Lauck. |
| // |
| // TODO: |
| // * Handle the case when the timer falls more than an entire wheel behind. There |
| // is no point to loop multiple times around the wheel in one go. |
| // * New type for tick, now() -> Tick |
| #[derive(Debug)] |
| pub struct Timer<T> { |
| // Size of each tick in milliseconds |
| tick_ms: u64, |
| // Slab of timeout entries |
| entries: Slab<Entry<T>>, |
| // Timeout wheel. Each tick, the timer will look at the next slot for |
| // timeouts that match the current tick. |
| wheel: Vec<Token>, |
| // Tick 0's time in milliseconds |
| start: u64, |
| // The current tick |
| tick: u64, |
| // The next entry to possibly timeout |
| next: Token, |
| // Masks the target tick to get the slot |
| mask: u64, |
| } |
| |
| #[derive(Copy, Clone)] |
| pub struct Timeout { |
| // Reference into the timer entry slab |
| token: Token, |
| // Tick that it should matchup with |
| tick: u64, |
| } |
| |
| impl<T> Timer<T> { |
| pub fn new(tick_ms: u64, mut slots: usize, mut capacity: usize) -> Timer<T> { |
| slots = slots.next_power_of_two(); |
| capacity = capacity.next_power_of_two(); |
| |
| Timer { |
| tick_ms: tick_ms, |
| entries: Slab::new(capacity), |
| wheel: iter::repeat(EMPTY).take(slots).collect(), |
| start: 0, |
| tick: 0, |
| next: EMPTY, |
| mask: (slots as u64) - 1 |
| } |
| } |
| |
| #[cfg(test)] |
| pub fn count(&self) -> usize { |
| self.entries.count() |
| } |
| |
| // Number of ms remaining until the next tick |
| pub fn next_tick_in_ms(&self) -> u64 { |
| let now = self.now_ms(); |
| let nxt = self.start + (self.tick + 1) * self.tick_ms; |
| |
| if nxt <= now { |
| return 0; |
| } |
| |
| nxt - now |
| } |
| |
| /* |
| * |
| * ===== Initialization ===== |
| * |
| */ |
| |
| // Sets the starting time of the timer using the current system time |
| pub fn setup(&mut self) { |
| let now = self.now_ms(); |
| self.set_start_ms(now); |
| } |
| |
| fn set_start_ms(&mut self, start: u64) { |
| assert!(!self.is_initialized(), "the timer has already started"); |
| self.start = start; |
| } |
| |
| /* |
| * |
| * ===== Timeout create / cancel ===== |
| * |
| */ |
| |
| pub fn timeout_ms(&mut self, token: T, delay: u64) -> TimerResult<Timeout> { |
| let at = self.now_ms() + max(0, delay); |
| self.timeout_at_ms(token, at) |
| } |
| |
| pub fn timeout_at_ms(&mut self, token: T, mut at: u64) -> TimerResult<Timeout> { |
| // Make relative to start |
| at -= self.start; |
| // Calculate tick |
| let mut tick = (at + self.tick_ms - 1) / self.tick_ms; |
| |
| // Always target at least 1 tick in the future |
| if tick <= self.tick { |
| tick = self.tick + 1; |
| } |
| |
| self.insert(token, tick) |
| } |
| |
| pub fn clear(&mut self, timeout: Timeout) -> bool { |
| let links = match self.entries.get(timeout.token) { |
| Some(e) => e.links, |
| None => return false |
| }; |
| |
| // Sanity check |
| if links.tick != timeout.tick { |
| return false; |
| } |
| |
| self.unlink(&links, timeout.token); |
| self.entries.remove(timeout.token); |
| true |
| } |
| |
| fn insert(&mut self, token: T, tick: u64) -> TimerResult<Timeout> { |
| // Get the slot for the requested tick |
| let slot = (tick & self.mask) as usize; |
| let curr = self.wheel[slot]; |
| |
| // Insert the new entry |
| let token = try!( |
| self.entries.insert(Entry::new(token, tick, curr)) |
| .map_err(|_| TimerError::overflow())); |
| |
| if curr != EMPTY { |
| // If there was a previous entry, set its prev pointer to the new |
| // entry |
| self.entries[curr].links.prev = token; |
| } |
| |
| // Update the head slot |
| self.wheel[slot] = token; |
| |
| trace!("inserted timout; slot={}; token={:?}", slot, token); |
| |
| // Return the new timeout |
| Ok(Timeout { |
| token: token, |
| tick: tick |
| }) |
| } |
| |
| fn unlink(&mut self, links: &EntryLinks, token: Token) { |
| trace!("unlinking timeout; slot={}; token={:?}", |
| self.slot_for(links.tick), token); |
| |
| if links.prev == EMPTY { |
| let slot = self.slot_for(links.tick); |
| self.wheel[slot] = links.next; |
| } else { |
| self.entries[links.prev].links.next = links.next; |
| } |
| |
| if links.next != EMPTY { |
| self.entries[links.next].links.prev = links.prev; |
| |
| if token == self.next { |
| self.next = links.next; |
| } |
| } else if token == self.next { |
| self.next = EMPTY; |
| } |
| } |
| |
| /* |
| * |
| * ===== Advance time ===== |
| * |
| */ |
| |
| pub fn now(&self) -> u64 { |
| self.ms_to_tick(self.now_ms()) |
| } |
| |
| pub fn tick_to(&mut self, now: u64) -> Option<T> { |
| trace!("tick_to; now={}; tick={}", now, self.tick); |
| |
| while self.tick <= now { |
| let curr = self.next; |
| |
| trace!("ticking; curr={:?}", curr); |
| |
| if curr == EMPTY { |
| self.tick += 1; |
| self.next = self.wheel[self.slot_for(self.tick)]; |
| } else { |
| let links = self.entries[curr].links; |
| |
| if links.tick <= self.tick { |
| trace!("triggering; token={:?}", curr); |
| |
| // Unlink will also advance self.next |
| self.unlink(&links, curr); |
| |
| // Remove and return the token |
| return self.entries.remove(curr) |
| .map(|e| e.token); |
| } else { |
| self.next = links.next; |
| } |
| } |
| } |
| |
| None |
| } |
| |
| /* |
| * |
| * ===== Misc ===== |
| * |
| */ |
| |
| // Timers are initialized when either the current time has been advanced or a timeout has been set |
| #[inline] |
| fn is_initialized(&self) -> bool { |
| self.tick > 0 || !self.entries.is_empty() |
| } |
| |
| #[inline] |
| fn slot_for(&self, tick: u64) -> usize { |
| (self.mask & tick) as usize |
| } |
| |
| // Convert a ms duration into a number of ticks, rounds up |
| #[inline] |
| fn ms_to_tick(&self, ms: u64) -> u64 { |
| (ms - self.start) / self.tick_ms |
| } |
| |
| #[inline] |
| fn now_ms(&self) -> u64 { |
| precise_time_ns() / NS_PER_MS |
| } |
| } |
| |
| // Doubly linked list of timer entries. Allows for efficient insertion / |
| // removal of timeouts. |
| struct Entry<T> { |
| token: T, |
| links: EntryLinks, |
| } |
| |
| impl<T> Entry<T> { |
| fn new(token: T, tick: u64, next: Token) -> Entry<T> { |
| Entry { |
| token: token, |
| links: EntryLinks { |
| tick: tick, |
| prev: EMPTY, |
| next: next, |
| }, |
| } |
| } |
| } |
| |
| #[derive(Copy, Clone)] |
| struct EntryLinks { |
| tick: u64, |
| prev: Token, |
| next: Token |
| } |
| |
| pub type TimerResult<T> = Result<T, TimerError>; |
| |
| #[derive(Debug)] |
| pub struct TimerError { |
| kind: TimerErrorKind, |
| desc: &'static str, |
| } |
| |
| impl TimerError { |
| fn overflow() -> TimerError { |
| TimerError { |
| kind: TimerOverflow, |
| desc: "too many timer entries" |
| } |
| } |
| } |
| |
| #[derive(Debug)] |
| pub enum TimerErrorKind { |
| TimerOverflow, |
| } |
| |
#[cfg(test)]
mod test {
    use super::Timer;

    // Tick length (ms) and wheel size shared by every test timer.
    const TICK: u64 = 100;
    const SLOTS: usize = 16;

    // Builds the timer used by all tests.
    fn timer() -> Timer<&'static str> {
        Timer::new(TICK, SLOTS, 32)
    }

    // Advances `t` to the tick containing `ms` and returns the next expired
    // token, if any.
    fn poll_at(t: &mut Timer<&'static str>, ms: u64) -> Option<&'static str> {
        let tick = t.ms_to_tick(ms);
        t.tick_to(tick)
    }

    #[test]
    pub fn test_timeout_next_tick() {
        let mut t = timer();

        t.timeout_at_ms("a", 100).unwrap();

        assert_eq!(None, poll_at(&mut t, 50));

        assert_eq!(Some("a"), poll_at(&mut t, 100));
        assert_eq!(None, poll_at(&mut t, 100));

        assert_eq!(None, poll_at(&mut t, 150));

        assert_eq!(None, poll_at(&mut t, 200));

        assert_eq!(t.count(), 0);
    }

    #[test]
    pub fn test_clearing_timeout() {
        let mut t = timer();

        let to = t.timeout_at_ms("a", 100).unwrap();
        assert!(t.clear(to));

        assert_eq!(None, poll_at(&mut t, 100));

        assert_eq!(None, poll_at(&mut t, 200));

        assert_eq!(t.count(), 0);
    }

    #[test]
    pub fn test_multiple_timeouts_same_tick() {
        let mut t = timer();

        t.timeout_at_ms("a", 100).unwrap();
        t.timeout_at_ms("b", 100).unwrap();

        // Both timeouts fire on the same tick; their order is not asserted.
        let mut rcv = vec![];
        rcv.push(poll_at(&mut t, 100).unwrap());
        rcv.push(poll_at(&mut t, 100).unwrap());

        assert_eq!(None, poll_at(&mut t, 100));

        rcv.sort();
        assert!(rcv == ["a", "b"], "actual={:?}", rcv);

        assert_eq!(None, poll_at(&mut t, 200));

        assert_eq!(t.count(), 0);
    }

    #[test]
    pub fn test_multiple_timeouts_diff_tick() {
        let mut t = timer();

        t.timeout_at_ms("a", 110).unwrap();
        t.timeout_at_ms("b", 220).unwrap();
        t.timeout_at_ms("c", 230).unwrap();
        t.timeout_at_ms("d", 440).unwrap();

        assert_eq!(None, poll_at(&mut t, 100));

        assert_eq!(Some("a"), poll_at(&mut t, 200));
        assert_eq!(None, poll_at(&mut t, 200));

        assert_eq!(Some("c"), poll_at(&mut t, 300));
        assert_eq!(Some("b"), poll_at(&mut t, 300));
        assert_eq!(None, poll_at(&mut t, 300));

        assert_eq!(None, poll_at(&mut t, 400));

        assert_eq!(Some("d"), poll_at(&mut t, 500));
        assert_eq!(None, poll_at(&mut t, 500));

        assert_eq!(None, poll_at(&mut t, 600));
    }

    #[test]
    pub fn test_catching_up() {
        let mut t = timer();

        t.timeout_at_ms("a", 110).unwrap();
        t.timeout_at_ms("b", 220).unwrap();
        t.timeout_at_ms("c", 230).unwrap();
        t.timeout_at_ms("d", 440).unwrap();

        // A single far-ahead target drains every pending timeout.
        assert_eq!(Some("a"), poll_at(&mut t, 600));
        assert_eq!(Some("c"), poll_at(&mut t, 600));
        assert_eq!(Some("b"), poll_at(&mut t, 600));
        assert_eq!(Some("d"), poll_at(&mut t, 600));
        assert_eq!(None, poll_at(&mut t, 600));
    }

    #[test]
    pub fn test_timeout_hash_collision() {
        let mut t = timer();

        // One full wheel revolution later: same slot as "a", later tick.
        let wrapped = 100 + TICK * (SLOTS as u64);

        t.timeout_at_ms("a", 100).unwrap();
        t.timeout_at_ms("b", wrapped).unwrap();

        assert_eq!(Some("a"), poll_at(&mut t, 100));
        assert_eq!(1, t.count());

        assert_eq!(None, poll_at(&mut t, 200));
        assert_eq!(1, t.count());

        assert_eq!(Some("b"), poll_at(&mut t, wrapped));
        assert_eq!(0, t.count());
    }

    #[test]
    pub fn test_clearing_timeout_between_triggers() {
        let mut t = timer();

        let a = t.timeout_at_ms("a", 100).unwrap();
        let _ = t.timeout_at_ms("b", 100).unwrap();
        let _ = t.timeout_at_ms("c", 200).unwrap();

        assert_eq!(Some("b"), poll_at(&mut t, 100));
        assert_eq!(2, t.count());

        // Cancel "a" while the timer is part-way through its slot.
        t.clear(a);
        assert_eq!(1, t.count());

        assert_eq!(None, poll_at(&mut t, 100));

        assert_eq!(Some("c"), poll_at(&mut t, 200));
        assert_eq!(0, t.count());
    }
}