| // Copyright (C) 2020-2022, Cloudflare, Inc. |
| // All rights reserved. |
| // |
| // Redistribution and use in source and binary forms, with or without |
| // modification, are permitted provided that the following conditions are |
| // met: |
| // |
| // * Redistributions of source code must retain the above copyright notice, |
| // this list of conditions and the following disclaimer. |
| // |
| // * Redistributions in binary form must reproduce the above copyright |
| // notice, this list of conditions and the following disclaimer in the |
| // documentation and/or other materials provided with the distribution. |
| // |
| // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS |
| // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, |
| // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
| // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR |
| // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
| // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
| // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
| // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
| // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
| // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| //! Delivery rate estimation. |
| //! |
| //! This implements the algorithm for estimating delivery rate as described in |
| //! <https://tools.ietf.org/html/draft-cheng-iccrg-delivery-rate-estimation-01> |
| |
| use std::time::Duration; |
| use std::time::Instant; |
| |
| use crate::recovery::Acked; |
| use crate::recovery::Sent; |
| |
| #[derive(Debug)] |
| pub struct Rate { |
| delivered: usize, |
| |
| delivered_time: Instant, |
| |
| first_sent_time: Instant, |
| |
| // Packet number of the last sent packet with app limited. |
| end_of_app_limited: u64, |
| |
| // Packet number of the last sent packet. |
| last_sent_packet: u64, |
| |
| // Packet number of the largest acked packet. |
| largest_acked: u64, |
| |
| // Sample of rate estimation. |
| rate_sample: RateSample, |
| } |
| |
| impl Default for Rate { |
| fn default() -> Self { |
| let now = Instant::now(); |
| |
| Rate { |
| delivered: 0, |
| |
| delivered_time: now, |
| |
| first_sent_time: now, |
| |
| end_of_app_limited: 0, |
| |
| last_sent_packet: 0, |
| |
| largest_acked: 0, |
| |
| rate_sample: RateSample::default(), |
| } |
| } |
| } |
| |
| impl Rate { |
| pub fn on_packet_sent( |
| &mut self, pkt: &mut Sent, bytes_in_flight: usize, now: Instant, |
| ) { |
| // No packets in flight yet? |
| if bytes_in_flight == 0 { |
| self.first_sent_time = now; |
| self.delivered_time = now; |
| } |
| |
| pkt.first_sent_time = self.first_sent_time; |
| pkt.delivered_time = self.delivered_time; |
| pkt.delivered = self.delivered; |
| pkt.is_app_limited = self.app_limited(); |
| |
| self.last_sent_packet = pkt.pkt_num; |
| } |
| |
| // Update the delivery rate sample when a packet is acked. |
| pub fn update_rate_sample(&mut self, pkt: &Acked, now: Instant) { |
| self.delivered += pkt.size; |
| self.delivered_time = now; |
| |
| // Update info using the newest packet. If rate_sample is not yet |
| // initialized, initialize with the first packet. |
| if self.rate_sample.prior_time.is_none() || |
| pkt.delivered > self.rate_sample.prior_delivered |
| { |
| self.rate_sample.prior_delivered = pkt.delivered; |
| self.rate_sample.prior_time = Some(pkt.delivered_time); |
| self.rate_sample.is_app_limited = pkt.is_app_limited; |
| self.rate_sample.send_elapsed = |
| pkt.time_sent.saturating_duration_since(pkt.first_sent_time); |
| self.rate_sample.rtt = pkt.rtt; |
| self.rate_sample.ack_elapsed = self |
| .delivered_time |
| .saturating_duration_since(pkt.delivered_time); |
| |
| self.first_sent_time = pkt.time_sent; |
| } |
| |
| self.largest_acked = self.largest_acked.max(pkt.pkt_num); |
| } |
| |
| pub fn generate_rate_sample(&mut self, min_rtt: Duration) { |
| // End app-limited phase if bubble is ACKed and gone. |
| if self.app_limited() && self.largest_acked > self.end_of_app_limited { |
| self.update_app_limited(false); |
| } |
| |
| if self.rate_sample.prior_time.is_some() { |
| let interval = self |
| .rate_sample |
| .send_elapsed |
| .max(self.rate_sample.ack_elapsed); |
| |
| self.rate_sample.delivered = |
| self.delivered - self.rate_sample.prior_delivered; |
| self.rate_sample.interval = interval; |
| |
| if interval < min_rtt { |
| self.rate_sample.interval = Duration::ZERO; |
| |
| // No reliable sample. |
| return; |
| } |
| |
| if !interval.is_zero() { |
| // Fill in rate_sample with a rate sample. |
| self.rate_sample.delivery_rate = |
| (self.rate_sample.delivered as f64 / interval.as_secs_f64()) |
| as u64; |
| } |
| } |
| } |
| |
| pub fn update_app_limited(&mut self, v: bool) { |
| self.end_of_app_limited = if v { self.last_sent_packet.max(1) } else { 0 } |
| } |
| |
| pub fn app_limited(&mut self) -> bool { |
| self.end_of_app_limited != 0 |
| } |
| |
| pub fn delivered(&self) -> usize { |
| self.delivered |
| } |
| |
| pub fn sample_delivery_rate(&self) -> u64 { |
| self.rate_sample.delivery_rate |
| } |
| |
| pub fn sample_rtt(&self) -> Duration { |
| self.rate_sample.rtt |
| } |
| |
| pub fn sample_is_app_limited(&self) -> bool { |
| self.rate_sample.is_app_limited |
| } |
| } |
| |
| #[derive(Default, Debug)] |
| struct RateSample { |
| delivery_rate: u64, |
| |
| is_app_limited: bool, |
| |
| interval: Duration, |
| |
| delivered: usize, |
| |
| prior_delivered: usize, |
| |
| prior_time: Option<Instant>, |
| |
| send_elapsed: Duration, |
| |
| ack_elapsed: Duration, |
| |
| rtt: Duration, |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| |
| use crate::recovery::*; |
| |
| #[test] |
| fn rate_check() { |
| let config = Config::new(0xbabababa).unwrap(); |
| let mut r = Recovery::new(&config); |
| |
| let now = Instant::now(); |
| let mss = r.max_datagram_size(); |
| |
| // Send 2 packets. |
| for pn in 0..2 { |
| let pkt = Sent { |
| pkt_num: pn, |
| frames: vec![], |
| time_sent: now, |
| time_acked: None, |
| time_lost: None, |
| size: mss, |
| ack_eliciting: true, |
| in_flight: true, |
| delivered: 0, |
| delivered_time: now, |
| first_sent_time: now, |
| is_app_limited: false, |
| has_data: false, |
| }; |
| |
| r.on_packet_sent( |
| pkt, |
| packet::EPOCH_APPLICATION, |
| HandshakeStatus::default(), |
| now, |
| "", |
| ); |
| } |
| |
| let rtt = Duration::from_millis(50); |
| let now = now + rtt; |
| |
| // Ack 2 packets. |
| for pn in 0..2 { |
| let acked = Acked { |
| pkt_num: pn, |
| time_sent: now, |
| size: mss, |
| rtt, |
| delivered: 0, |
| delivered_time: now, |
| first_sent_time: now - rtt, |
| is_app_limited: false, |
| }; |
| |
| r.delivery_rate.update_rate_sample(&acked, now); |
| } |
| |
| // Update rate sample after 1 rtt. |
| r.delivery_rate.generate_rate_sample(rtt); |
| |
| // Bytes acked so far. |
| assert_eq!(r.delivery_rate.delivered(), 2400); |
| |
| // Estimated delivery rate = (1200 x 2) / 0.05s = 48000. |
| assert_eq!(r.delivery_rate(), 48000); |
| } |
| |
| #[test] |
| fn app_limited_cwnd_full() { |
| let config = Config::new(0xbabababa).unwrap(); |
| let mut r = Recovery::new(&config); |
| |
| let now = Instant::now(); |
| let mss = r.max_datagram_size(); |
| |
| // Send 10 packets to fill cwnd. |
| for pn in 0..10 { |
| let pkt = Sent { |
| pkt_num: pn, |
| frames: vec![], |
| time_sent: now, |
| time_acked: None, |
| time_lost: None, |
| size: mss, |
| ack_eliciting: true, |
| in_flight: true, |
| delivered: 0, |
| delivered_time: now, |
| first_sent_time: now, |
| is_app_limited: false, |
| has_data: false, |
| }; |
| |
| r.on_packet_sent( |
| pkt, |
| packet::EPOCH_APPLICATION, |
| HandshakeStatus::default(), |
| now, |
| "", |
| ); |
| } |
| |
| assert_eq!(r.app_limited(), false); |
| assert_eq!(r.delivery_rate.sample_is_app_limited(), false); |
| } |
| |
| #[test] |
| fn app_limited_check() { |
| let config = Config::new(0xbabababa).unwrap(); |
| let mut r = Recovery::new(&config); |
| |
| let now = Instant::now(); |
| let mss = r.max_datagram_size(); |
| |
| // Send 5 packets. |
| for pn in 0..5 { |
| let pkt = Sent { |
| pkt_num: pn, |
| frames: vec![], |
| time_sent: now, |
| time_acked: None, |
| time_lost: None, |
| size: mss, |
| ack_eliciting: true, |
| in_flight: true, |
| delivered: 0, |
| delivered_time: now, |
| first_sent_time: now, |
| is_app_limited: false, |
| has_data: false, |
| }; |
| |
| r.on_packet_sent( |
| pkt, |
| packet::EPOCH_APPLICATION, |
| HandshakeStatus::default(), |
| now, |
| "", |
| ); |
| } |
| |
| let rtt = Duration::from_millis(50); |
| let now = now + rtt; |
| |
| let mut acked = ranges::RangeSet::default(); |
| acked.insert(0..5); |
| |
| assert_eq!( |
| r.on_ack_received( |
| &acked, |
| 25, |
| packet::EPOCH_APPLICATION, |
| HandshakeStatus::default(), |
| now, |
| "", |
| ), |
| Ok(()), |
| ); |
| |
| assert_eq!(r.app_limited(), true); |
| // Rate sample is not app limited (all acked). |
| assert_eq!(r.delivery_rate.sample_is_app_limited(), false); |
| assert_eq!(r.delivery_rate.sample_rtt(), rtt); |
| } |
| } |