cc: fix hystart++ bugs and excessive cwnd growth
Current hystart++ implementation has a few issues:
- hystart.on_packet_acked() returns a new cwnd/ssthresh and
decides if it needs to enter LSS. However, during slow start
and congestion avoidance, it will confuse the CC's ABC
based cwnd calculation, resulting in higher cwnd growth
with HyStart++ during slow start and the LSS episode.
(CUBIC and Reno)
- hystart LSS cwnd calculation is not ABC based, which
can result in higher cwnd growth than expected (no
2 x ABC_L x MSS upper limit)
- Reno slow start: mixed use of bytes_acked in slow start
and congestion avoidance. It will result in miscalculation
of cwnd.
Changes:
- hystart: remove on_packet_acked() and add try_enter_lss()
for checking if the CC can enter LSS. cwnd/ssthresh will be
changed by the caller. Also added in_lss() as a
shortcut for checking whether we are currently in
LSS. lss_cwnd() will provide a new cwnd in LSS.
- Separate bytes_acked for slow start (bytes_acked_sl)
and congestion avoidance (bytes_acked_ca for Reno;
CUBIC uses cwnd_inc).
- CUBIC: fix a bug where cwnd can grow by more than 1 MSS
per cwnd acknowledged in congestion avoidance. cwnd_inc
will be reset after increasing cwnd by 1 MSS, to prevent
cwnd from growing further when cwnd_inc > 2 MSS.
(Future work: follow up the recent CUBIC RFC update,
which addresses this problem better)
- Follows HyStart++ draft 03, especially around L based
cwnd growth limit:
https://tools.ietf.org/html/draft-balasubramanian-tcpm-hystartplusplus-03#section-4.2
diff --git a/src/recovery/cubic.rs b/src/recovery/cubic.rs
index ec323d7..adde769 100644
--- a/src/recovery/cubic.rs
+++ b/src/recovery/cubic.rs
@@ -170,35 +170,39 @@
if r.congestion_window < r.ssthresh {
// Slow start.
- let new_cwnd;
-
- if r.hystart.enabled() && epoch == packet::EPOCH_APPLICATION {
- let (cwnd, ssthresh) = r.hystart_on_packet_acked(packet, now);
-
- new_cwnd = cwnd;
- r.ssthresh = ssthresh;
- } else {
- // Reno Slow Start.
- new_cwnd = r.congestion_window + packet.size;
- }
-
let cwnd_inc = cmp::min(
- new_cwnd - r.congestion_window,
+ packet.size,
r.max_datagram_size * recovery::ABC_L -
- cmp::min(r.bytes_acked, r.max_datagram_size * recovery::ABC_L),
+ cmp::min(
+ r.bytes_acked_sl,
+ r.max_datagram_size * recovery::ABC_L,
+ ),
);
- r.bytes_acked += packet.size;
+ // In Slow slart, bytes_acked_sl is used for counting
+ // acknowledged bytes.
+ r.bytes_acked_sl += packet.size;
+
r.congestion_window += cwnd_inc;
+
+ if r.hystart.enabled() &&
+ epoch == packet::EPOCH_APPLICATION &&
+ r.hystart.try_enter_lss(
+ packet,
+ r.latest_rtt,
+ r.congestion_window,
+ now,
+ r.max_datagram_size,
+ )
+ {
+ r.ssthresh = r.congestion_window;
+ }
} else {
// Congestion avoidance.
let ca_start_time;
// In LSS, use lss_start_time instead of congestion_recovery_start_time.
- if r.hystart.enabled() &&
- epoch == packet::EPOCH_APPLICATION &&
- r.hystart.lss_start_time().is_some()
- {
+ if r.hystart.in_lss(epoch) {
ca_start_time = r.hystart.lss_start_time().unwrap();
// Reset w_max and k when LSS started.
@@ -244,11 +248,16 @@
// When in Limited Slow Start, take the max of CA cwnd and
// LSS cwnd.
- if r.hystart.enabled() &&
- epoch == packet::EPOCH_APPLICATION &&
- r.hystart.lss_start_time().is_some()
- {
- let (lss_cwnd, _) = r.hystart_on_packet_acked(packet, now);
+ if r.hystart.in_lss(epoch) {
+ let lss_cwnd = r.hystart.lss_cwnd(
+ packet.size,
+ r.bytes_acked_sl,
+ r.congestion_window,
+ r.ssthresh,
+ r.max_datagram_size,
+ );
+
+ r.bytes_acked_sl += packet.size;
cubic_cwnd = cmp::max(cubic_cwnd, lss_cwnd);
}
@@ -261,7 +270,7 @@
// the increase of cwnd to 1 max_datagram_size per cwnd acknowledged.
if r.cubic_state.cwnd_inc >= r.max_datagram_size {
r.congestion_window += r.max_datagram_size;
- r.cubic_state.cwnd_inc -= r.max_datagram_size;
+ r.cubic_state.cwnd_inc = 0;
}
}
}
@@ -297,7 +306,7 @@
r.cubic_state.cwnd_inc =
(r.cubic_state.cwnd_inc as f64 * BETA_CUBIC) as usize;
- if r.hystart.enabled() && epoch == packet::EPOCH_APPLICATION {
+ if r.hystart.in_lss(epoch) {
r.hystart.congestion_event();
}
}
@@ -581,13 +590,13 @@
assert_eq!(r.hystart.lss_start_time().is_some(), false);
// 2nd round.
- r.hystart.start_round(pkts_1st_round * 2 + 1);
+ r.hystart.start_round(pkts_1st_round * 2);
let mut rtt_2nd = 100;
let now = now + Duration::from_millis(rtt_2nd);
// Send 2nd round packets.
- for _ in 0..n_rtt_sample + 1 {
+ for _ in 0..n_rtt_sample {
r.on_packet_sent_cc(p.size, now);
}
@@ -595,7 +604,7 @@
// Last ack will cause to exit to LSS.
let mut cwnd_prev = r.cwnd();
- for _ in 0..n_rtt_sample + 1 {
+ for _ in 0..n_rtt_sample {
cwnd_prev = r.cwnd();
r.update_rtt(
Duration::from_millis(rtt_2nd),
@@ -617,10 +626,14 @@
// Now we are in LSS.
assert_eq!(r.hystart.lss_start_time().is_some(), true);
- assert_eq!(r.cwnd(), cwnd_prev);
+ assert_eq!(r.cwnd(), cwnd_prev + r.max_datagram_size);
- // Ack'ing more packet to increase cwnd by 1 MSS
- for _ in 0..3 {
+ // Send a full cwnd.
+ r.on_packet_sent_cc(r.cwnd(), now);
+
+ // Ack'ing 4 packets to increase cwnd by 1 MSS during LSS
+ cwnd_prev = r.cwnd();
+ for _ in 0..4 {
let acked = vec![Acked {
pkt_num: p.pkt_num,
time_sent: p.time_sent,
@@ -629,6 +642,7 @@
r.on_packets_acked(acked, epoch, now);
}
+ // During LSS cwnd will be increased less than usual slow start.
assert_eq!(r.cwnd(), cwnd_prev + r.max_datagram_size);
}
}
diff --git a/src/recovery/hystart.rs b/src/recovery/hystart.rs
index fc60e89..e599b61 100644
--- a/src/recovery/hystart.rs
+++ b/src/recovery/hystart.rs
@@ -28,12 +28,13 @@
//!
//! This implementation is based on the following I-D:
//!
-//! https://tools.ietf.org/html/draft-balasubramanian-tcpm-hystartplusplus-02
+//! https://tools.ietf.org/html/draft-balasubramanian-tcpm-hystartplusplus-03
use std::cmp;
use std::time::Duration;
use std::time::Instant;
+use crate::packet;
use crate::recovery;
/// Constants from I-D.
@@ -91,6 +92,12 @@
self.lss_start_time
}
+ pub fn in_lss(&self, epoch: packet::Epoch) -> bool {
+ self.enabled &&
+ epoch == packet::EPOCH_APPLICATION &&
+ self.lss_start_time().is_some()
+ }
+
pub fn start_round(&mut self, pkt_num: u64) {
if self.window_end.is_none() {
*self = Hystart {
@@ -109,18 +116,12 @@
}
}
- // Returns a new (ssthresh, cwnd) during slow start.
- pub fn on_packet_acked(
+ // Returns true if LSS started.
+ pub fn try_enter_lss(
&mut self, packet: &recovery::Acked, rtt: Duration, cwnd: usize,
- ssthresh: usize, now: Instant, max_datagram_size: usize,
- ) -> (usize, usize) {
- let mut ssthresh = ssthresh;
- let mut cwnd = cwnd;
-
+ now: Instant, max_datagram_size: usize,
+ ) -> bool {
if self.lss_start_time().is_none() {
- // Reno Slow Start.
- cwnd += packet.size;
-
if let Some(current_round_min_rtt) = self.current_round_min_rtt {
self.current_round_min_rtt =
Some(cmp::min(current_round_min_rtt, rtt));
@@ -147,8 +148,6 @@
if self.current_round_min_rtt.unwrap() >=
(self.last_round_min_rtt.unwrap() + rtt_thresh)
{
- ssthresh = cwnd;
-
self.lss_start_time = Some(now);
}
}
@@ -160,23 +159,29 @@
self.window_end = None;
}
}
- } else {
- // LSS (Limited Slow Start).
- let k = cwnd as f64 / (LSS_DIVISOR * ssthresh as f64);
-
- cwnd += (packet.size as f64 / k) as usize;
}
- (cwnd, ssthresh)
+ self.lss_start_time.is_some()
+ }
+
+ // Return a new cwnd during LSS (Limited Slow Start).
+ pub fn lss_cwnd(
+ &self, pkt_size: usize, bytes_acked: usize, cwnd: usize, ssthresh: usize,
+ max_datagram_size: usize,
+ ) -> usize {
+ let k = cwnd as f64 / (LSS_DIVISOR * ssthresh as f64);
+
+ cwnd + cmp::min(
+ pkt_size,
+ max_datagram_size * recovery::ABC_L -
+ cmp::min(bytes_acked, max_datagram_size * recovery::ABC_L),
+ ) / k as usize
}
// Exit HyStart++ when entering congestion avoidance.
pub fn congestion_event(&mut self) {
- if self.window_end.is_some() {
- self.window_end = None;
-
- self.lss_start_time = None;
- }
+ self.window_end = None;
+ self.lss_start_time = None;
}
}
@@ -196,124 +201,35 @@
}
#[test]
- fn reno_slow_start() {
- let mut hspp = Hystart::default();
- let pkt_num = 100;
- let size = 1000;
- let now = Instant::now();
+ fn lss_cwnd() {
+ let hspp = Hystart::default();
- hspp.start_round(pkt_num);
+ let datagram_size = 1200;
+ let mut cwnd = 24000;
+ let ssthresh = 24000;
- assert_eq!(hspp.window_end, Some(pkt_num));
+ let lss_cwnd =
+ hspp.lss_cwnd(datagram_size, 0, cwnd, ssthresh, datagram_size);
- let p = recovery::Acked {
- pkt_num,
- time_sent: now + Duration::from_millis(10),
- size,
- };
-
- let init_cwnd = 30000;
- let init_ssthresh = 1000000;
-
- let (cwnd, ssthresh) = hspp.on_packet_acked(
- &p,
- Duration::from_millis(10),
- init_cwnd,
- init_ssthresh,
- now,
- crate::MAX_SEND_UDP_PAYLOAD_SIZE,
+ assert_eq!(
+ cwnd + (datagram_size as f64 * LSS_DIVISOR) as usize,
+ lss_cwnd
);
- // Expecting Reno slow start.
- assert_eq!(hspp.lss_start_time().is_some(), false);
- assert_eq!((cwnd, ssthresh), (init_cwnd + size, init_ssthresh));
- }
+ cwnd = lss_cwnd;
- #[test]
- fn limited_slow_start() {
- let mut hspp = Hystart::default();
- let size = 1000;
- let now = Instant::now();
+ let lss_cwnd = hspp.lss_cwnd(
+ datagram_size,
+ datagram_size,
+ cwnd,
+ ssthresh,
+ datagram_size,
+ );
- // 1st round rtt = 50ms
- let rtt_1st = 50;
-
- // end of 1st round
- let pkt_1st = N_RTT_SAMPLE as u64;
-
- hspp.start_round(pkt_1st);
-
- assert_eq!(hspp.window_end, Some(pkt_1st));
-
- let (mut cwnd, mut ssthresh) = (30000, 1000000);
- let mut pkt_num = 0;
-
- // 1st round.
- for _ in 0..N_RTT_SAMPLE + 1 {
- let p = recovery::Acked {
- pkt_num,
- time_sent: now + Duration::from_millis(pkt_num),
- size,
- };
-
- // We use a fixed rtt for 1st round.
- let rtt = Duration::from_millis(rtt_1st);
-
- let (new_cwnd, new_ssthresh) = hspp.on_packet_acked(
- &p,
- rtt,
- cwnd,
- ssthresh,
- now,
- crate::MAX_SEND_UDP_PAYLOAD_SIZE,
- );
-
- cwnd = new_cwnd;
- ssthresh = new_ssthresh;
-
- pkt_num += 1;
- }
-
- // 2nd round. rtt = 100ms to trigger LSS.
- let rtt_2nd = 100;
-
- hspp.start_round(pkt_1st * 2 + 1);
-
- for _ in 0..N_RTT_SAMPLE + 1 {
- let p = recovery::Acked {
- pkt_num,
- time_sent: now + Duration::from_millis(pkt_num),
- size,
- };
-
- // Keep increasing rtt to simulate buffer queueing delay
- // This is to exit from slow slart to LSS.
- let rtt = Duration::from_millis(rtt_2nd + pkt_num * 4);
-
- let (new_cwnd, new_ssthresh) = hspp.on_packet_acked(
- &p,
- rtt,
- cwnd,
- ssthresh,
- now,
- crate::MAX_SEND_UDP_PAYLOAD_SIZE,
- );
-
- cwnd = new_cwnd;
- ssthresh = new_ssthresh;
-
- pkt_num += 1;
- }
-
- // At this point, cwnd exits to LSS mode.
- assert_eq!(hspp.lss_start_time().is_some(), true);
-
- // Check if current cwnd is in LSS.
- let cur_ssthresh = 47000;
- let k = cur_ssthresh as f64 / (LSS_DIVISOR * cur_ssthresh as f64);
- let lss_cwnd = cur_ssthresh as f64 + size as f64 / k;
-
- assert_eq!((cwnd, ssthresh), (lss_cwnd as usize, cur_ssthresh));
+ assert_eq!(
+ cwnd + (datagram_size as f64 * LSS_DIVISOR) as usize,
+ lss_cwnd
+ );
}
#[test]
diff --git a/src/recovery/mod.rs b/src/recovery/mod.rs
index a8e4138..92304c5 100644
--- a/src/recovery/mod.rs
+++ b/src/recovery/mod.rs
@@ -117,7 +117,9 @@
ssthresh: usize,
- bytes_acked: usize,
+ bytes_acked_sl: usize,
+
+ bytes_acked_ca: usize,
congestion_recovery_start_time: Option<Instant>,
@@ -179,7 +181,9 @@
ssthresh: std::usize::MAX,
- bytes_acked: 0,
+ bytes_acked_sl: 0,
+
+ bytes_acked_ca: 0,
congestion_recovery_start_time: None,
@@ -351,8 +355,6 @@
self.drain_packets(epoch);
- trace!("{} {:?}", trace_id, self);
-
Ok(())
}
@@ -718,9 +720,7 @@
(self.cc_ops.on_packet_acked)(self, &pkt, epoch, now);
}
- if self.congestion_window < self.ssthresh {
- self.bytes_acked = 0;
- }
+ self.bytes_acked_sl = 0;
}
fn in_congestion_recovery(&self, sent_time: Instant) -> bool {
@@ -768,19 +768,6 @@
}
}
- fn hystart_on_packet_acked(
- &mut self, packet: &Acked, now: Instant,
- ) -> (usize, usize) {
- self.hystart.on_packet_acked(
- packet,
- self.latest_rtt,
- self.congestion_window,
- self.ssthresh,
- now,
- self.max_datagram_size,
- )
- }
-
pub fn update_app_limited(&mut self, v: bool) {
self.app_limited = v;
}
diff --git a/src/recovery/reno.rs b/src/recovery/reno.rs
index a42937c..f7397e4 100644
--- a/src/recovery/reno.rs
+++ b/src/recovery/reno.rs
@@ -64,48 +64,61 @@
if r.congestion_window < r.ssthresh {
// Slow start.
- let new_cwnd;
-
- if r.hystart.enabled() && epoch == packet::EPOCH_APPLICATION {
- let (cwnd, ssthresh) = r.hystart_on_packet_acked(packet, now);
-
- new_cwnd = cwnd;
- r.ssthresh = ssthresh;
- } else {
- new_cwnd = r.congestion_window + packet.size;
- }
-
let cwnd_inc = cmp::min(
- new_cwnd - r.congestion_window,
+ packet.size,
r.max_datagram_size * recovery::ABC_L -
- cmp::min(r.bytes_acked, r.max_datagram_size * recovery::ABC_L),
+ cmp::min(
+ r.bytes_acked_sl,
+ r.max_datagram_size * recovery::ABC_L,
+ ),
);
- r.bytes_acked += packet.size;
+ // In Slow slart, bytes_acked_sl is used for counting
+ // acknowledged bytes.
+ r.bytes_acked_sl += packet.size;
+
r.congestion_window += cwnd_inc;
+
+ if r.hystart.enabled() &&
+ epoch == packet::EPOCH_APPLICATION &&
+ r.hystart.try_enter_lss(
+ packet,
+ r.latest_rtt,
+ r.congestion_window,
+ now,
+ r.max_datagram_size,
+ )
+ {
+ r.ssthresh = r.congestion_window;
+ }
} else {
// Congestion avoidance.
let mut reno_cwnd = r.congestion_window;
- r.bytes_acked += packet.size;
+ r.bytes_acked_ca += packet.size;
- if r.bytes_acked >= r.congestion_window {
- r.bytes_acked -= r.congestion_window;
+ if r.bytes_acked_ca >= r.congestion_window {
+ r.bytes_acked_ca -= r.congestion_window;
reno_cwnd += r.max_datagram_size;
}
// When in Limited Slow Start, take the max of CA cwnd and
// LSS cwnd.
- if r.hystart.enabled() &&
- epoch == packet::EPOCH_APPLICATION &&
- r.hystart.lss_start_time().is_some()
- {
- let (lss_cwnd, _) = r.hystart_on_packet_acked(packet, now);
+ if r.hystart.in_lss(epoch) {
+ let lss_cwnd = r.hystart.lss_cwnd(
+ packet.size,
+ r.bytes_acked_sl,
+ r.congestion_window,
+ r.ssthresh,
+ r.max_datagram_size,
+ );
- reno_cwnd = cmp::max(reno_cwnd, lss_cwnd);
+ r.bytes_acked_sl += packet.size;
+
+ r.congestion_window = cmp::max(reno_cwnd, lss_cwnd);
+ } else {
+ r.congestion_window = reno_cwnd;
}
-
- r.congestion_window = reno_cwnd;
}
}
@@ -126,12 +139,12 @@
r.max_datagram_size * recovery::MINIMUM_WINDOW_PACKETS,
);
- r.bytes_acked = (r.congestion_window as f64 *
+ r.bytes_acked_ca = (r.congestion_window as f64 *
recovery::LOSS_REDUCTION_FACTOR) as usize;
r.ssthresh = r.congestion_window;
- if r.hystart.enabled() && epoch == packet::EPOCH_APPLICATION {
+ if r.hystart.in_lss(epoch) {
r.hystart.congestion_event();
}
}
@@ -139,7 +152,8 @@
pub fn collapse_cwnd(r: &mut Recovery) {
r.congestion_window = r.max_datagram_size * recovery::MINIMUM_WINDOW_PACKETS;
- r.bytes_acked = 0;
+ r.bytes_acked_sl = 0;
+ r.bytes_acked_ca = 0;
}
#[cfg(test)]