blob: da2d9cc219da8e2837374531d5a8b31f1f91d154 [file] [log] [blame]
// Copyright (C) 2022, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//! Pacer provides the timestamp at which the next packet should be sent,
//! based on the current send_quantum, pacing rate and last
//! updated time. It is a kind of leaky bucket algorithm
//! (RFC 9002 7.7 Pacing), but it takes the max burst
//! (send_quantum, in bytes) into account and provides the same timestamp
//! for same-sized packets (except the last one) to be
//! GSO friendly, assuming we send packets using multiple sendmsg(),
//! a sendmmsg(), or sendmsg() with GSO without waiting for
//! a new I/O event. After a burst of packets is sent,
//! the next timestamp is updated based on the current
//! pacing rate. This keeps the actual send time and
//! the recorded timestamp (Sent.time_sent) as
//! close as possible. If GSO is not used,
//! it still tries to provide a close timestamp if send
//! bursts are implemented.
use std::time::Duration;
use std::time::Instant;
/// Burst-oriented pacer: packets belonging to the same burst share one send
/// timestamp, and the timestamp only advances once a burst is complete.
#[derive(Debug)]
pub struct Pacer {
    /// Bucket capacity (bytes), rounded down to a multiple of
    /// `max_datagram_size`.
    capacity: usize,

    /// Bytes accounted to the current burst.
    used: usize,

    /// Sending pacing rate (bytes/sec).
    rate: u64,

    /// Timestamp at which the current burst accounting was last restarted.
    last_update: Instant,

    /// Timestamp of next packet to be sent.
    next_time: Instant,

    /// Current MSS (bytes).
    max_datagram_size: usize,

    /// Size of the previous packet in the current burst, `None` at the
    /// start of a burst.
    last_packet_size: Option<usize>,

    /// Interval to be added to `next_time` when the next burst starts.
    iv: Duration,
}

impl Pacer {
    /// Creates a pacer with bucket `capacity` (bytes), pacing `rate`
    /// (bytes/sec) and `max_datagram_size` (bytes).
    ///
    /// The capacity is rounded down to a whole number of datagrams.
    pub fn new(capacity: usize, rate: u64, max_datagram_size: usize) -> Self {
        let capacity = Self::round_capacity(capacity, max_datagram_size);

        // Read the clock once so both timestamps start out identical.
        let now = Instant::now();

        Pacer {
            capacity,
            used: 0,
            rate,
            last_update: now,
            next_time: now,
            max_datagram_size,
            last_packet_size: None,
            iv: Duration::ZERO,
        }
    }

    /// Rounds `capacity` down to a multiple of `max_datagram_size`.
    ///
    /// A zero `max_datagram_size` leaves `capacity` unchanged instead of
    /// dividing by zero.
    fn round_capacity(capacity: usize, max_datagram_size: usize) -> usize {
        match capacity.checked_div(max_datagram_size) {
            Some(datagrams) => datagrams * max_datagram_size,
            None => capacity,
        }
    }

    /// Returns the current pacing rate (bytes/sec).
    pub fn rate(&self) -> u64 {
        self.rate
    }

    /// Updates the bucket capacity and the pacing rate.
    ///
    /// The pacer is reset (see [`Pacer::reset`]) when the rounded capacity
    /// differs from the current one; the rate is always stored.
    pub fn update(&mut self, capacity: usize, rate: u64, now: Instant) {
        let capacity = Self::round_capacity(capacity, self.max_datagram_size);

        if self.capacity != capacity {
            self.reset(now);
        }

        self.capacity = capacity;
        self.rate = rate;
    }

    /// Resets the pacer for the next burst.
    ///
    /// Burst accounting is cleared and restarted at `now`. `next_time` is
    /// never moved backwards: it becomes the later of its current value and
    /// `now`.
    pub fn reset(&mut self, now: Instant) {
        self.used = 0;
        self.last_update = now;
        self.next_time = self.next_time.max(now);
        self.last_packet_size = None;
        self.iv = Duration::ZERO;
    }

    /// Accounts for a packet of `packet_size` bytes sent at `now` and
    /// updates the timestamp for the following packets.
    ///
    /// With a zero pacing rate the pacer is simply reset.
    pub fn send(&mut self, packet_size: usize, now: Instant) {
        if self.rate == 0 {
            self.reset(now);
            return;
        }

        // The previous burst is complete: this packet starts a new one, so
        // push the send time out by the interval computed for that burst.
        if !self.iv.is_zero() {
            self.next_time = self.next_time.max(now) + self.iv;
            self.iv = Duration::ZERO;
        }

        // Time needed to drain a full bucket at the current rate.
        let interval =
            Duration::from_secs_f64(self.capacity as f64 / self.rate as f64);
        let elapsed = now.saturating_duration_since(self.last_update);

        // If the current burst is too old, reset it.
        if elapsed > interval {
            self.reset(now);
        }

        // Saturate rather than overflow on absurdly large inputs.
        self.used = self.used.saturating_add(packet_size);

        // The first packet of a burst always counts as "same size".
        let same_size = self
            .last_packet_size
            .map_or(true, |last| last == packet_size);
        self.last_packet_size = Some(packet_size);

        // The burst ends when the bucket is full or the packet size changes
        // (only the last packet of a burst may have a different size).
        if self.used >= self.capacity || !same_size {
            self.iv =
                Duration::from_secs_f64(self.used as f64 / self.rate as f64);
            self.used = 0;
            self.last_update = now;
            self.last_packet_size = None;
        }
    }

    /// Returns the timestamp at which the next packet should be sent.
    pub fn next_time(&self) -> Instant {
        self.next_time
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Parameters shared by every test below.
    const DATAGRAM_SIZE: usize = 1200;
    const MAX_BURST: usize = DATAGRAM_SIZE * 10;
    const PACING_RATE: u64 = 100_000;

    // Builds the pacer under test.
    fn make_pacer() -> Pacer {
        Pacer::new(MAX_BURST, PACING_RATE, DATAGRAM_SIZE)
    }

    // The pacer's timestamp and `now` come from separate clock reads, so
    // they may not be exactly the same; allow up to 1ms of difference.
    fn assert_not_advanced(pacer: &Pacer, now: Instant) {
        assert!(
            now.duration_since(pacer.next_time()) < Duration::from_millis(1)
        );
    }

    #[test]
    fn pacer_update() {
        let mut pacer = make_pacer();
        let start = Instant::now();

        // First half of max_burst: the timestamp must not move yet.
        pacer.send(6000, start);
        assert_not_advanced(&pacer, start);

        // Second half: max_burst is now filled, still no timestamp change.
        pacer.send(6000, start);
        assert_not_advanced(&pacer, start);

        // A new burst begins 5ms later.
        let later = start + Duration::from_millis(5);

        // Sending 1000 bytes moves next_time by one full burst interval.
        pacer.send(1000, later);

        let expected =
            Duration::from_secs_f64(MAX_BURST as f64 / PACING_RATE as f64);
        assert_eq!(pacer.next_time() - later, expected);
    }

    #[test]
    fn pacer_idle() {
        // Like pacer_update(), except that an idle period between the two
        // transfers causes the pacer to reset.
        let mut pacer = make_pacer();
        let start = Instant::now();

        // First half of max_burst: the timestamp must not move yet.
        pacer.send(6000, start);
        assert_not_advanced(&pacer, start);

        // Idle for 200ms, longer than the 120ms needed to reset the pacer.
        let later = start + Duration::from_millis(200);

        // The idle reset kicks in and a fresh burst starts at `later`.
        pacer.send(6000, later);
        assert_eq!(pacer.next_time(), later);
    }
}