| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| //! TCP state machine per [RFC 793](https://tools.ietf.org/html/rfc793). |
| // Note: All RFC quotes (with two extra spaces at the beginning of each line) in |
| // this file are from https://tools.ietf.org/html/rfc793#section-3.9 if not |
| // specified otherwise. |
| |
| use core::{ |
| convert::{Infallible, TryFrom as _}, |
| fmt::Debug, |
| num::{NonZeroU32, NonZeroU8, NonZeroUsize, TryFromIntError}, |
| time::Duration, |
| }; |
| |
| use assert_matches::assert_matches; |
| use const_unwrap::const_unwrap_option; |
| use derivative::Derivative; |
| use explicit::ResultExt as _; |
| use packet_formats::utils::NonZeroDuration; |
| use replace_with::{replace_with, replace_with_and}; |
| |
| use crate::{ |
| ip::icmp::IcmpErrorCode, |
| transport::tcp::{ |
| buffer::{Assembler, BufferLimits, IntoBuffers, ReceiveBuffer, SendBuffer, SendPayload}, |
| congestion::CongestionControl, |
| rtt::Estimator, |
| segment::{Options, Payload, Segment}, |
| seqnum::{SeqNum, UnscaledWindowSize, WindowScale, WindowSize}, |
| BufferSizes, ConnectionError, Control, KeepAlive, Mss, OptionalBufferSizes, SocketOptions, |
| TcpCountersInner, |
| }, |
| Instant, |
| }; |
| |
/// Per RFC 793 (https://tools.ietf.org/html/rfc793#page-81):
/// MSL
///       Maximum Segment Lifetime, the time a TCP segment can exist in
///       the internetwork system.  Arbitrarily defined to be 2 minutes.
pub(super) const MSL: Duration = Duration::from_secs(2 * 60);
// TODO(https://fxbug.dev/42069155): With the current usage of netstack3 on mostly
// link-local workloads, these values are large enough to accommodate most cases
// while still letting us detect failures faster. We should make them agree
// with other common implementations once we can configure them through socket
// options.
/// Default retry budget. NOTE(review): its use sites are outside this part of
/// the file; presumably it bounds data retransmissions -- confirm.
const DEFAULT_MAX_RETRIES: NonZeroU8 = const_unwrap_option(NonZeroU8::new(12));
/// User timeout applied when the `user_timeout` socket option is not set.
const DEFAULT_USER_TIMEOUT: Duration = Duration::from_secs(60 * 2);

/// Default maximum number of SYNs to send before giving up an attempt to
/// connect.
// TODO(https://fxbug.dev/42077087): Make these constants configurable.
pub(super) const DEFAULT_MAX_SYN_RETRIES: NonZeroU8 = const_unwrap_option(NonZeroU8::new(6));
/// Retry budget handed to the retransmission timer of a SYN-ACK.
const DEFAULT_MAX_SYNACK_RETRIES: NonZeroU8 = const_unwrap_option(NonZeroU8::new(5));

/// Per RFC 9293 (https://tools.ietf.org/html/rfc9293#section-3.8.6.3):
///   ... in particular, the delay MUST be less than 0.5 seconds.
const ACK_DELAY_THRESHOLD: Duration = Duration::from_millis(500);
/// Per RFC 9293 Section 3.8.6.2.1:
///   ... The override timeout should be in the range 0.1 - 1.0 seconds.
/// Note that we pick the lower end of the range because this case should be
/// rare, and having to install the timer at all indicates a high probability
/// that the receiver has reduced its window so that our MAX(SND.WND) is an
/// overestimation; a small value therefore avoids unnecessary delay.
const SWS_PROBE_TIMEOUT: Duration = Duration::from_millis(100);
/// Per RFC 9293 Section 3.8.6.2.2 and 3.8.6.2.1:
///   where Fr is a fraction whose recommended value is 1/2,
/// Note that we store the inverse (a divisor) since we want to avoid floating
/// point.
const SWS_BUFFER_FACTOR: u32 = 2;
| |
| /// A helper trait for duration socket options that use 0 to indicate default. |
| trait NonZeroDurationOptionExt { |
| fn get_or_default(&self, default: Duration) -> Duration; |
| } |
| |
| impl NonZeroDurationOptionExt for Option<NonZeroDuration> { |
| fn get_or_default(&self, default: Duration) -> Duration { |
| self.map(NonZeroDuration::get).unwrap_or(default) |
| } |
| } |
| |
/// Per RFC 793 (https://tools.ietf.org/html/rfc793#page-22):
///
///   CLOSED - represents no connection state at all.
///
/// Allowed operations:
///   - listen
///   - connect
/// Disallowed operations:
///   - send
///   - recv
///   - shutdown
///   - accept
///
/// `Error` is the type used to record why the connection is closed; see
/// [`Initial`] for the marker used when nothing has gone wrong yet.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct Closed<Error> {
    /// Describes a reason why the connection was closed.
    pub(crate) reason: Error,
}
| |
/// An uninhabited type used together with [`Closed`] to suggest that it is in
/// its initial condition and no errors have occurred yet.
pub(crate) enum Initial {}
| |
| impl Closed<Initial> { |
| /// Corresponds to the [OPEN](https://tools.ietf.org/html/rfc793#page-54) |
| /// user call. |
| /// |
| /// `iss`is The initial send sequence number. Which is effectively the |
| /// sequence number of SYN. |
| pub(crate) fn connect<I: Instant, ActiveOpen>( |
| iss: SeqNum, |
| now: I, |
| active_open: ActiveOpen, |
| buffer_sizes: BufferSizes, |
| device_mss: Mss, |
| default_mss: Mss, |
| SocketOptions { |
| keep_alive: _, |
| nagle_enabled: _, |
| user_timeout, |
| delayed_ack: _, |
| fin_wait2_timeout: _, |
| max_syn_retries, |
| }: &SocketOptions, |
| ) -> (SynSent<I, ActiveOpen>, Segment<()>) { |
| let user_timeout = user_timeout.get_or_default(DEFAULT_USER_TIMEOUT); |
| let rcv_wnd_scale = buffer_sizes.rwnd().scale(); |
| // RFC 7323 Section 2.2: |
| // The window field in a segment where the SYN bit is set (i.e., a |
| // <SYN> or <SYN,ACK>) MUST NOT be scaled. |
| let rwnd = buffer_sizes.rwnd_unscaled(); |
| ( |
| SynSent { |
| iss, |
| timestamp: Some(now), |
| retrans_timer: RetransTimer::new( |
| now, |
| Estimator::RTO_INIT, |
| user_timeout, |
| *max_syn_retries, |
| ), |
| active_open, |
| buffer_sizes, |
| device_mss, |
| default_mss, |
| rcv_wnd_scale, |
| }, |
| Segment::syn( |
| iss, |
| rwnd, |
| Options { mss: Some(device_mss), window_scale: Some(rcv_wnd_scale) }, |
| ), |
| ) |
| } |
| |
| pub(crate) fn listen( |
| iss: SeqNum, |
| buffer_sizes: BufferSizes, |
| device_mss: Mss, |
| default_mss: Mss, |
| user_timeout: Option<NonZeroDuration>, |
| ) -> Listen { |
| Listen { iss, buffer_sizes, device_mss, default_mss, user_timeout } |
| } |
| } |
| |
| impl<Error> Closed<Error> { |
| /// Processes an incoming segment in the CLOSED state. |
| /// |
| /// TCP will either drop the incoming segment or generate a RST. |
| pub(crate) fn on_segment( |
| &self, |
| Segment { seq: seg_seq, ack: seg_ack, wnd: _, contents, options: _ }: Segment<impl Payload>, |
| ) -> Option<Segment<()>> { |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-65): |
| // If the state is CLOSED (i.e., TCB does not exist) then |
| // all data in the incoming segment is discarded. An incoming |
| // segment containing a RST is discarded. An incoming segment |
| // not containing a RST causes a RST to be sent in response. |
| // The acknowledgment and sequence field values are selected to |
| // make the reset sequence acceptable to the TCP that sent the |
| // offending segment. |
| // If the ACK bit is off, sequence number zero is used, |
| // <SEQ=0><ACK=SEG.SEQ+SEG.LEN><CTL=RST,ACK> |
| // If the ACK bit is on, |
| // <SEQ=SEG.ACK><CTL=RST> |
| // Return. |
| if contents.control() == Some(Control::RST) { |
| return None; |
| } |
| Some(match seg_ack { |
| Some(seg_ack) => Segment::rst(seg_ack), |
| None => Segment::rst_ack(SeqNum::from(0), seg_seq + contents.len()), |
| }) |
| } |
| } |
| |
/// Per RFC 793 (https://tools.ietf.org/html/rfc793#page-21):
///
///   LISTEN - represents waiting for a connection request from any remote
///   TCP and port.
///
/// Allowed operations:
///   - send (queued until connection is established)
///   - recv (queued until connection is established)
///   - connect
///   - shutdown
///   - accept
/// Disallowed operations:
///   - listen
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct Listen {
    /// The sequence number used for the SYN-ACK sent in reply to a SYN.
    iss: SeqNum,
    /// Buffer sizes from which the advertised receive window and the receive
    /// window scale are derived.
    buffer_sizes: BufferSizes,
    /// Upper bound on the MSS negotiated with the peer.
    device_mss: Mss,
    /// The MSS assumed when the peer's SYN carries no MSS option.
    default_mss: Mss,
    /// The user timeout; `None` selects `DEFAULT_USER_TIMEOUT`.
    user_timeout: Option<NonZeroDuration>,
}
| |
/// Dispositions of [`Listen::on_segment`].
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
enum ListenOnSegmentDisposition<I: Instant> {
    /// A SYN arrived: send the given SYN-ACK and continue in the given
    /// SYN-RCVD state.
    SendSynAckAndEnterSynRcvd(Segment<()>, SynRcvd<I, Infallible>),
    /// The segment carried an ACK: reply with the given RST.
    SendRst(Segment<()>),
    /// Drop the segment without replying.
    Ignore,
}
| |
| impl Listen { |
| fn on_segment<I: Instant>( |
| &self, |
| Segment { seq, ack, wnd: _, contents, options }: Segment<impl Payload>, |
| now: I, |
| ) -> ListenOnSegmentDisposition<I> { |
| let Listen { iss, buffer_sizes, device_mss, default_mss, user_timeout } = *self; |
| let smss = options.mss.unwrap_or(default_mss).min(device_mss); |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-65): |
| // first check for an RST |
| // An incoming RST should be ignored. Return. |
| if contents.control() == Some(Control::RST) { |
| return ListenOnSegmentDisposition::Ignore; |
| } |
| if let Some(ack) = ack { |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-65): |
| // second check for an ACK |
| // Any acknowledgment is bad if it arrives on a connection still in |
| // the LISTEN state. An acceptable reset segment should be formed |
| // for any arriving ACK-bearing segment. The RST should be |
| // formatted as follows: |
| // <SEQ=SEG.ACK><CTL=RST> |
| // Return. |
| return ListenOnSegmentDisposition::SendRst(Segment::rst(ack)); |
| } |
| if contents.control() == Some(Control::SYN) { |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-65): |
| // third check for a SYN |
| // Set RCV.NXT to SEG.SEQ+1, IRS is set to SEG.SEQ and any other |
| // control or text should be queued for processing later. ISS |
| // should be selected and a SYN segment sent of the form: |
| // <SEQ=ISS><ACK=RCV.NXT><CTL=SYN,ACK> |
| // SND.NXT is set to ISS+1 and SND.UNA to ISS. The connection |
| // state should be changed to SYN-RECEIVED. Note that any other |
| // incoming control or data (combined with SYN) will be processed |
| // in the SYN-RECEIVED state, but processing of SYN and ACK should |
| // not be repeated. |
| // Note: We don't support data being tranmistted in this state, so |
| // there is no need to store these the RCV and SND variables. |
| let user_timeout = user_timeout.get_or_default(DEFAULT_USER_TIMEOUT); |
| let rcv_wnd_scale = buffer_sizes.rwnd().scale(); |
| // RFC 7323 Section 2.2: |
| // The window field in a segment where the SYN bit is set (i.e., a |
| // <SYN> or <SYN,ACK>) MUST NOT be scaled. |
| let rwnd = buffer_sizes.rwnd_unscaled(); |
| return ListenOnSegmentDisposition::SendSynAckAndEnterSynRcvd( |
| Segment::syn_ack( |
| iss, |
| seq + 1, |
| rwnd, |
| Options { |
| mss: Some(smss), |
| // Per RFC 7323 Section 2.3: |
| // If a TCP receives a <SYN> segment containing a |
| // Window Scale option, it SHOULD send its own Window |
| // Scale option in the <SYN,ACK> segment. |
| window_scale: options.window_scale.map(|_| rcv_wnd_scale), |
| }, |
| ), |
| SynRcvd { |
| iss, |
| irs: seq, |
| timestamp: Some(now), |
| retrans_timer: RetransTimer::new( |
| now, |
| Estimator::RTO_INIT, |
| user_timeout, |
| DEFAULT_MAX_SYNACK_RETRIES, |
| ), |
| simultaneous_open: None, |
| buffer_sizes, |
| smss, |
| rcv_wnd_scale, |
| snd_wnd_scale: options.window_scale, |
| }, |
| ); |
| } |
| ListenOnSegmentDisposition::Ignore |
| } |
| } |
| |
/// Per RFC 793 (https://tools.ietf.org/html/rfc793#page-21):
///
///   SYN-SENT - represents waiting for a matching connection request
///   after having sent a connection request.
///
/// Allowed operations:
///   - send (queued until connection is established)
///   - recv (queued until connection is established)
///   - shutdown
/// Disallowed operations:
///   - listen
///   - accept
///   - connect
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct SynSent<I, ActiveOpen> {
    /// The initial send sequence number, i.e. the sequence number of our SYN.
    iss: SeqNum,
    // The timestamp when the SYN segment was sent. A `None` here means that
    // the SYN segment was retransmitted so that it can't be used to estimate
    // RTT.
    timestamp: Option<I>,
    /// Retransmission timer for the SYN; it also carries the deadline of the
    /// user timeout.
    retrans_timer: RetransTimer<I>,
    /// Caller-supplied value carried along with the active open; it is not
    /// interpreted by `on_segment`.
    active_open: ActiveOpen,
    /// Buffer sizes from which the advertised receive window is derived.
    buffer_sizes: BufferSizes,
    /// Upper bound on the MSS negotiated with the peer.
    device_mss: Mss,
    /// The MSS assumed when the peer does not send an MSS option.
    default_mss: Mss,
    /// The window scale we offered in our SYN.
    rcv_wnd_scale: WindowScale,
}
| |
/// Dispositions of [`SynSent::on_segment`].
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
enum SynSentOnSegmentDisposition<I: Instant, ActiveOpen> {
    /// An acceptable SYN-ACK arrived: enter the given ESTABLISHED state (which
    /// has no buffers attached yet) and send an ACK.
    SendAckAndEnterEstablished(Established<I, (), ()>),
    /// A SYN without an ACK arrived (simultaneous open): send the given
    /// SYN-ACK and enter the given SYN-RCVD state.
    SendSynAckAndEnterSynRcvd(Segment<()>, SynRcvd<I, ActiveOpen>),
    /// The segment carried an unacceptable ACK: reply with the given RST.
    SendRst(Segment<()>),
    /// The connection is over: either it was reset by the peer, or a SYN
    /// arrived after the user timeout had already elapsed (no error recorded).
    EnterClosed(Closed<Option<ConnectionError>>),
    /// Drop the segment without replying.
    Ignore,
}
| |
impl<I: Instant + 'static, ActiveOpen> SynSent<I, ActiveOpen> {
    /// Processes an incoming segment in the SYN-SENT state.
    ///
    /// Transitions to ESTABLISHED if the incoming segment is a proper SYN-ACK.
    /// Transitions to SYN-RCVD if the incoming segment is a SYN. Otherwise,
    /// the segment is dropped or an RST is generated.
    ///
    /// This method only computes the disposition; `self` is not modified and
    /// the caller is responsible for acting on the returned value. `now` is
    /// used to sample the RTT and to (re)start timers.
    fn on_segment(
        &self,
        Segment { seq: seg_seq, ack: seg_ack, wnd: seg_wnd, contents, options }: Segment<
            impl Payload,
        >,
        now: I,
    ) -> SynSentOnSegmentDisposition<I, ActiveOpen> {
        let SynSent {
            iss,
            timestamp: syn_sent_ts,
            retrans_timer: RetransTimer { user_timeout_until, remaining_retries: _, at: _, rto: _ },
            active_open: _,
            buffer_sizes,
            device_mss,
            default_mss,
            rcv_wnd_scale,
        } = *self;
        // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-65):
        //   first check the ACK bit
        //   If the ACK bit is set
        //     If SEG.ACK =< ISS, or SEG.ACK > SND.NXT, send a reset (unless
        //     the RST bit is set, if so drop the segment and return)
        //       <SEQ=SEG.ACK><CTL=RST>
        //     and discard the segment.  Return.
        //     If SND.UNA =< SEG.ACK =< SND.NXT then the ACK is acceptable.
        let has_ack = match seg_ack {
            Some(ack) => {
                // In our implementation, because we don't carry data in our
                // initial SYN segment, SND.UNA == ISS, SND.NXT == ISS+1.
                if ack.before(iss) || ack.after(iss + 1) {
                    return if contents.control() == Some(Control::RST) {
                        SynSentOnSegmentDisposition::Ignore
                    } else {
                        SynSentOnSegmentDisposition::SendRst(Segment::rst(ack))
                    };
                }
                true
            }
            None => false,
        };

        match contents.control() {
            Some(Control::RST) => {
                // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-67):
                //   second check the RST bit
                //   If the RST bit is set
                //     If the ACK was acceptable then signal the user "error:
                //     connection reset", drop the segment, enter CLOSED state,
                //     delete TCB, and return.  Otherwise (no ACK) drop the
                //     segment and return.
                if has_ack {
                    SynSentOnSegmentDisposition::EnterClosed(Closed {
                        reason: Some(ConnectionError::ConnectionReset),
                    })
                } else {
                    SynSentOnSegmentDisposition::Ignore
                }
            }
            Some(Control::SYN) => {
                let smss = options.mss.unwrap_or(default_mss).min(device_mss);
                // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-67):
                //   fourth check the SYN bit
                //   This step should be reached only if the ACK is ok, or there
                //   is no ACK, and it [sic] the segment did not contain a RST.
                match seg_ack {
                    Some(seg_ack) => {
                        // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-67):
                        //   If the SYN bit is on and the security/compartment
                        //   and precedence are acceptable then, RCV.NXT is set
                        //   to SEG.SEQ+1, IRS is set to SEG.SEQ.  SND.UNA
                        //   should be advanced to equal SEG.ACK (if there is an
                        //   ACK), and any segments on the retransmission queue
                        //   which are thereby acknowledged should be removed.

                        //   If SND.UNA > ISS (our SYN has been ACKed), change
                        //   the connection state to ESTABLISHED, form an ACK
                        //   segment
                        //     <SEQ=SND.NXT><ACK=RCV.NXT><CTL=ACK>
                        //   and send it.  Data or controls which were queued
                        //   for transmission may be included.  If there are
                        //   other controls or text in the segment then
                        //   continue processing at the sixth step below where
                        //   the URG bit is checked, otherwise return.
                        if seg_ack.after(iss) {
                            let irs = seg_seq;
                            let mut rtt_estimator = Estimator::default();
                            // Only a SYN that was never retransmitted yields
                            // an RTT sample.
                            if let Some(syn_sent_ts) = syn_sent_ts {
                                rtt_estimator.sample(now.duration_since(syn_sent_ts));
                            }
                            // Window scaling is only in effect when the peer
                            // sent the option as well; otherwise both
                            // directions fall back to the default scale.
                            let (rcv_wnd_scale, snd_wnd_scale) = options
                                .window_scale
                                .map(|snd_wnd_scale| (rcv_wnd_scale, snd_wnd_scale))
                                .unwrap_or_default();
                            let established = Established {
                                snd: Send {
                                    nxt: iss + 1,
                                    max: iss + 1,
                                    una: seg_ack,
                                    // This segment has a SYN, do not scale.
                                    wnd: seg_wnd << WindowScale::default(),
                                    wl1: seg_seq,
                                    wl2: seg_ack,
                                    buffer: (),
                                    last_seq_ts: None,
                                    rtt_estimator,
                                    timer: None,
                                    congestion_control: CongestionControl::cubic_with_mss(smss),
                                    wnd_scale: snd_wnd_scale,
                                    wnd_max: seg_wnd << WindowScale::default(),
                                },
                                rcv: Recv {
                                    buffer: (),
                                    assembler: Assembler::new(irs + 1),
                                    timer: None,
                                    mss: smss,
                                    wnd_scale: rcv_wnd_scale,
                                    last_window_update: (irs + 1, buffer_sizes.rwnd()),
                                },
                            };
                            SynSentOnSegmentDisposition::SendAckAndEnterEstablished(established)
                        } else {
                            // The ACK does not cover our SYN; drop the segment.
                            SynSentOnSegmentDisposition::Ignore
                        }
                    }
                    // A SYN without an ACK: simultaneous open.
                    None => {
                        if now < user_timeout_until {
                            // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-68):
                            //   Otherwise enter SYN-RECEIVED, form a SYN,ACK
                            //   segment
                            //     <SEQ=ISS><ACK=RCV.NXT><CTL=SYN,ACK>
                            //   and send it.  If there are other controls or text
                            //   in the segment, queue them for processing after the
                            //   ESTABLISHED state has been reached, return.
                            let rcv_wnd_scale = buffer_sizes.rwnd().scale();
                            // RFC 7323 Section 2.2:
                            //  The window field in a segment where the SYN bit
                            //  is set (i.e., a <SYN> or <SYN,ACK>) MUST NOT be
                            //  scaled.
                            let rwnd = buffer_sizes.rwnd_unscaled();
                            SynSentOnSegmentDisposition::SendSynAckAndEnterSynRcvd(
                                Segment::syn_ack(
                                    iss,
                                    seg_seq + 1,
                                    rwnd,
                                    Options {
                                        mss: Some(smss),
                                        window_scale: options.window_scale.map(|_| rcv_wnd_scale),
                                    },
                                ),
                                SynRcvd {
                                    iss,
                                    irs: seg_seq,
                                    timestamp: Some(now),
                                    // The new timer only gets whatever is left
                                    // of the original user timeout.
                                    retrans_timer: RetransTimer::new(
                                        now,
                                        Estimator::RTO_INIT,
                                        user_timeout_until.duration_since(now),
                                        DEFAULT_MAX_SYNACK_RETRIES,
                                    ),
                                    // This should be set to active_open by the caller:
                                    simultaneous_open: None,
                                    buffer_sizes,
                                    smss,
                                    rcv_wnd_scale,
                                    snd_wnd_scale: options.window_scale,
                                },
                            )
                        } else {
                            // The user timeout has already elapsed: give up
                            // without recording a specific error.
                            SynSentOnSegmentDisposition::EnterClosed(Closed { reason: None })
                        }
                    }
                }
            }
            // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-68):
            //   fifth, if neither of the SYN or RST bits is set then drop the
            //   segment and return.
            Some(Control::FIN) | None => SynSentOnSegmentDisposition::Ignore,
        }
    }
}
| |
/// Per RFC 793 (https://tools.ietf.org/html/rfc793#page-21):
///
///   SYN-RECEIVED - represents waiting for a confirming connection
///   request acknowledgment after having both received and sent a
///   connection request.
///
/// Allowed operations:
///   - send (queued until connection is established)
///   - recv (queued until connection is established)
///   - shutdown
/// Disallowed operations:
///   - listen
///   - accept
///   - connect
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct SynRcvd<I, ActiveOpen> {
    /// The initial send sequence number, used as the sequence number of our
    /// SYN-ACK.
    iss: SeqNum,
    /// The initial receive sequence number, i.e. the sequence number of the
    /// peer's SYN.
    irs: SeqNum,
    /// The timestamp when the SYN segment was received, and consequently, our
    /// SYN-ACK segment was sent. A `None` here means that the SYN-ACK segment
    /// was retransmitted so that it can't be used to estimate RTT.
    timestamp: Option<I>,
    /// Retransmission timer for the SYN-ACK.
    retrans_timer: RetransTimer<I>,
    /// Indicates that we arrived at this state from [`SynSent`], i.e., this
    /// was an active open connection. Store this information so that we don't
    /// use the wrong routines to construct buffers.
    simultaneous_open: Option<ActiveOpen>,
    /// Buffer sizes from which the advertised receive window is derived.
    buffer_sizes: BufferSizes,
    /// The sender MSS negotiated as described in [RFC 9293 section 3.7.1].
    ///
    /// [RFC 9293 section 3.7.1]: https://datatracker.ietf.org/doc/html/rfc9293#name-maximum-segment-size-option
    smss: Mss,
    /// The window scale derived from our own receive window.
    rcv_wnd_scale: WindowScale,
    /// The window scale option received from the peer, if any.
    snd_wnd_scale: Option<WindowScale>,
}
| |
| impl<I: Instant, R: ReceiveBuffer, S: SendBuffer, ActiveOpen> From<SynRcvd<I, Infallible>> |
| for State<I, R, S, ActiveOpen> |
| { |
| fn from( |
| SynRcvd { |
| iss, |
| irs, |
| timestamp, |
| retrans_timer, |
| simultaneous_open, |
| buffer_sizes, |
| smss, |
| rcv_wnd_scale, |
| snd_wnd_scale, |
| }: SynRcvd<I, Infallible>, |
| ) -> Self { |
| match simultaneous_open { |
| None => State::SynRcvd(SynRcvd { |
| iss, |
| irs, |
| timestamp, |
| retrans_timer, |
| simultaneous_open: None, |
| buffer_sizes, |
| smss, |
| rcv_wnd_scale, |
| snd_wnd_scale, |
| }), |
| Some(infallible) => match infallible {}, |
| } |
| } |
| } |
/// A namespace for the named values of the `FIN_QUEUED` const generic
/// parameter of [`Send`]. The type is uninhabited and only its associated
/// constants are used.
enum FinQueued {}

impl FinQueued {
    // TODO(https://github.com/rust-lang/rust/issues/95174): Before we can use
    // enum for const generics, we define the following constants to give
    // meaning to the bools when used.
    /// Value of `FIN_QUEUED` when a FIN has been queued.
    const YES: bool = true;
    /// Value of `FIN_QUEUED` when no FIN has been queued.
    const NO: bool = false;
}
| |
/// TCP control block variables that are responsible for sending.
///
/// `FIN_QUEUED` takes one of the [`FinQueued`] constants.
#[derive(Derivative)]
#[derivative(Debug)]
#[cfg_attr(test, derivative(PartialEq, Eq))]
struct Send<I, S, const FIN_QUEUED: bool> {
    /// SND.NXT: the next sequence number to send. It is rewound to `una` when
    /// the retransmission timer fires.
    nxt: SeqNum,
    /// NOTE(review): presumably the highest sequence number sent so far, which
    /// stays put when `nxt` is rewound for a retransmission -- confirm at the
    /// update sites.
    max: SeqNum,
    /// SND.UNA: the oldest unacknowledged sequence number.
    una: SeqNum,
    /// SND.WND: the send window advertised by the peer.
    wnd: WindowSize,
    /// The window scale received from the peer.
    wnd_scale: WindowScale,
    /// NOTE(review): presumably the largest send window seen so far
    /// (MAX(SND.WND)) -- confirm at the update sites.
    wnd_max: WindowSize,
    /// SND.WL1; initialized from the sequence number of the peer's SYN-ACK.
    wl1: SeqNum,
    /// SND.WL2; initialized from the acknowledgment number of the peer's
    /// SYN-ACK.
    wl2: SeqNum,
    // The last sequence number sent out and its timestamp when sent.
    last_seq_ts: Option<(SeqNum, I)>,
    /// Round-trip time estimator.
    rtt_estimator: Estimator,
    /// The currently installed send-side timer, if any.
    timer: Option<SendTimer<I>>,
    /// Congestion control state; excluded from the test-only equality.
    #[derivative(PartialEq = "ignore")]
    congestion_control: CongestionControl<I>,
    /// The send buffer; `()` until a real buffer is attached.
    buffer: S,
}
| |
| impl<I> Send<I, (), false> { |
| fn with_buffer<S>(self, buffer: S) -> Send<I, S, false> { |
| let Self { |
| nxt, |
| max, |
| una, |
| wnd, |
| wnd_scale, |
| wnd_max, |
| wl1, |
| wl2, |
| last_seq_ts, |
| rtt_estimator, |
| timer, |
| congestion_control, |
| buffer: _, |
| } = self; |
| Send { |
| nxt, |
| max, |
| una, |
| wnd, |
| wnd_scale, |
| wnd_max, |
| wl1, |
| wl2, |
| last_seq_ts, |
| rtt_estimator, |
| timer, |
| congestion_control, |
| buffer, |
| } |
| } |
| } |
| |
/// A retransmission timer with exponential backoff, bounded by both a retry
/// budget and a user timeout.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(test, derive(PartialEq, Eq))]
struct RetransTimer<I> {
    /// The instant at which the user timeout elapses; from then on the timer
    /// counts as timed out regardless of the remaining retries.
    user_timeout_until: I,
    /// The number of retries left; `None` once the budget is used up.
    remaining_retries: Option<NonZeroU8>,
    /// The instant at which the timer fires next.
    at: I,
    /// The current retransmission timeout; doubled on every backoff.
    rto: Duration,
}
| |
| impl<I: Instant> RetransTimer<I> { |
| fn new(now: I, rto: Duration, user_timeout: Duration, max_retries: NonZeroU8) -> Self { |
| let wakeup_after = rto.min(user_timeout); |
| let at = now.add(wakeup_after); |
| let user_timeout_until = now.add(user_timeout); |
| Self { at, rto, user_timeout_until, remaining_retries: Some(max_retries) } |
| } |
| |
| fn backoff(&mut self, now: I) { |
| let Self { at, rto, user_timeout_until, remaining_retries } = self; |
| *remaining_retries = remaining_retries.and_then(|r| NonZeroU8::new(r.get() - 1)); |
| *rto = rto.saturating_mul(2); |
| let remaining = if now < *user_timeout_until { |
| user_timeout_until.duration_since(now) |
| } else { |
| // `now` has already passed `user_timeout_until`, just update the |
| // timer to expire as soon as possible. |
| Duration::ZERO |
| }; |
| *at = now.add(core::cmp::min(*rto, remaining)); |
| } |
| |
| fn rearm(&mut self, now: I) { |
| let Self { at, rto, user_timeout_until: _, remaining_retries: _ } = self; |
| *at = now.add(*rto); |
| } |
| |
| fn timed_out(&self, now: I) -> bool { |
| let RetransTimer { user_timeout_until, remaining_retries, at, rto: _ } = self; |
| (remaining_retries.is_none() && now >= *at) || now >= *user_timeout_until |
| } |
| } |
| |
/// Possible timers for a sender.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(test, derive(PartialEq, Eq))]
enum SendTimer<I> {
    /// A retransmission timer can only be installed when there is outstanding
    /// data.
    Retrans(RetransTimer<I>),
    /// A keep-alive timer can only be installed when the connection is idle,
    /// i.e., the connection must not have any outstanding data.
    KeepAlive(KeepAliveTimer<I>),
    /// A zero window probe timer is installed when the receiver advertises a
    /// zero window but we have data to send. RFC 9293 Section 3.8.6.1 suggests
    /// that:
    ///   The transmitting host SHOULD send the first zero-window probe when a
    ///   zero window has existed for the retransmission timeout period, and
    ///   SHOULD increase exponentially the interval between successive probes.
    /// So we choose a retransmission timer as its implementation.
    ZeroWindowProbe(RetransTimer<I>),
    /// A timer installed to override silly window avoidance. When the receiver
    /// reduces its buffer size to be below 1 MSS (which should happen very
    /// rarely), the connection could make no progress without such a timer.
    /// Per RFC 9293 Section 3.8.6.2.1:
    ///   To avoid a resulting deadlock, it is necessary to have a timeout to
    ///   force transmission of data, overriding the SWS avoidance algorithm.
    ///   In practice, this timeout should seldom occur.
    SWSProbe {
        /// The instant at which the timer fires.
        at: I,
    },
}
| |
/// Possible timers for a receiver.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(test, derive(PartialEq, Eq))]
enum ReceiveTimer<I> {
    /// An ACK is being delayed; it is sent once `at` has been reached.
    DelayedAck {
        /// The instant at which the delayed ACK becomes due.
        at: I,
        /// NOTE(review): presumably the number of bytes received while the ACK
        /// has been delayed -- confirm where the timer is installed.
        received_bytes: NonZeroU32,
    },
}
| |
/// State of the keep-alive timer.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(test, derive(PartialEq, Eq))]
struct KeepAliveTimer<I> {
    /// The instant at which the timer fires.
    at: I,
    /// A counter that starts at 0 and is compared against the configured
    /// keep-alive count to decide whether the connection has timed out.
    already_sent: u8,
}
| |
| impl<I: Instant> KeepAliveTimer<I> { |
| fn idle(now: I, keep_alive: &KeepAlive) -> Self { |
| let at = now.add(keep_alive.idle.into()); |
| Self { at, already_sent: 0 } |
| } |
| } |
| |
| impl<I: Instant> SendTimer<I> { |
| fn expiry(&self) -> I { |
| match self { |
| SendTimer::Retrans(RetransTimer { |
| at, |
| rto: _, |
| user_timeout_until: _, |
| remaining_retries: _, |
| }) |
| | SendTimer::KeepAlive(KeepAliveTimer { at, already_sent: _ }) |
| | SendTimer::ZeroWindowProbe(RetransTimer { |
| at, |
| rto: _, |
| user_timeout_until: _, |
| remaining_retries: _, |
| }) => *at, |
| SendTimer::SWSProbe { at } => *at, |
| } |
| } |
| } |
| |
| impl<I: Instant> ReceiveTimer<I> { |
| fn expiry(&self) -> I { |
| match self { |
| ReceiveTimer::DelayedAck { at, received_bytes: _ } => *at, |
| } |
| } |
| } |
| |
/// TCP control block variables that are responsible for receiving.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
struct Recv<I, R> {
    /// Reassembly state; its `nxt()` is RCV.NXT.
    assembler: Assembler,
    /// The currently installed receive-side timer, if any.
    timer: Option<ReceiveTimer<I>>,
    /// The MSS used as Eff.snd.MSS by the receiver-side SWS avoidance.
    mss: Mss,
    /// The scale applied to our window when it is put into a segment.
    wnd_scale: WindowScale,
    /// RCV.NXT at the time of the last window selection, paired with the
    /// window that was selected then.
    last_window_update: (SeqNum, WindowSize),
    /// The receive buffer; `()` until a real buffer is attached.
    buffer: R,
}
| |
| impl<I> Recv<I, ()> { |
| fn with_buffer<R>(self, buffer: R) -> Recv<I, R> { |
| let Self { assembler, timer, mss, wnd_scale, last_window_update, buffer: _ } = self; |
| Recv { assembler, timer, mss, wnd_scale, last_window_update, buffer } |
| } |
| } |
| |
| impl<I: Instant, R: ReceiveBuffer> Recv<I, R> { |
| fn select_window(&mut self) -> WindowSize { |
| let Self { |
| buffer, |
| assembler, |
| timer: _, |
| mss, |
| wnd_scale: _, |
| last_window_update: (rcv_wup, last_wnd), |
| } = self; |
| let rcv_nxt = assembler.nxt(); |
| // Per RFC 9293 Section 3.8.6.2.2: |
| // The suggested SWS avoidance algorithm for the receiver is to keep |
| // RCV.NXT+RCV.WND fixed until the reduction satisfies: |
| // RCV.BUFF - RCV.USER - RCV.WND >= |
| // min( Fr * RCV.BUFF, Eff.snd.MSS ) |
| // where Fr is a fraction whose recommended value is 1/2, and |
| // Eff.snd.MSS is the effective send MSS for the connection. |
| // When the inequality is satisfied, RCV.WND is set to RCV.BUFF-RCV.USER. |
| |
| // `len` and `capacity` are RCV.USER and RCV.BUFF respectively. |
| let BufferLimits { capacity, len } = buffer.limits(); |
| // `unused_window` is RCV.WND as described above. |
| let unused_window = WindowSize::from_u32(u32::try_from(*rcv_wup + *last_wnd - rcv_nxt).unwrap_or_else(|_: TryFromIntError| { |
| tracing::error!( |
| "we received more bytes than we advertised, rcv_nxt: {:?}, rcv_wup: {:?}, last_wnd: {:?}", |
| rcv_nxt, rcv_wup, last_wnd |
| ); |
| 0 |
| })).unwrap_or(WindowSize::MAX); |
| // Note: between the last window update and now, it's possible that we |
| // have reduced our receive buffer's capacity, so we need to use |
| // saturating arithmetic below. |
| let reduction = capacity.saturating_sub(len.saturating_add(usize::from(unused_window))); |
| *rcv_wup = rcv_nxt; |
| *last_wnd = if reduction >= usize::min(capacity / 2, usize::from(mss.get().get())) { |
| // We have enough reduction in the buffer space, advertise more. |
| WindowSize::new(capacity - len).unwrap_or(WindowSize::MAX) |
| } else { |
| // Keep the right edge fixed by only advertise whatever is unused in |
| // the last advertisement. |
| unused_window |
| }; |
| *last_wnd |
| } |
| |
| fn nxt(&self) -> SeqNum { |
| self.assembler.nxt() |
| } |
| |
| fn take(&mut self) -> Self { |
| let Self { buffer, assembler, timer, mss, wnd_scale, last_window_update } = self; |
| Self { |
| buffer: buffer.take(), |
| assembler: core::mem::replace(assembler, Assembler::new(SeqNum::new(0))), |
| timer: *timer, |
| mss: *mss, |
| wnd_scale: *wnd_scale, |
| last_window_update: *last_window_update, |
| } |
| } |
| |
| fn set_capacity(&mut self, size: usize) { |
| let Self { buffer, assembler: _, timer: _, mss: _, wnd_scale: _, last_window_update: _ } = |
| self; |
| buffer.request_capacity(size) |
| } |
| |
| fn target_capacity(&self) -> usize { |
| let Self { buffer, assembler: _, timer: _, mss: _, wnd_scale: _, last_window_update: _ } = |
| self; |
| buffer.target_capacity() |
| } |
| |
| fn poll_send(&mut self, snd_max: SeqNum, now: I) -> Option<Segment<()>> { |
| match self.timer { |
| Some(ReceiveTimer::DelayedAck { at, received_bytes: _ }) => (at <= now).then(|| { |
| self.timer = None; |
| Segment::ack(snd_max, self.nxt(), self.select_window() >> self.wnd_scale) |
| }), |
| None => None, |
| } |
| } |
| } |
| |
/// Per RFC 793 (https://tools.ietf.org/html/rfc793#page-22):
///
///   ESTABLISHED - represents an open connection, data received can be
///   delivered to the user.  The normal state for the data transfer phase
///   of the connection.
///
/// Allowed operations:
///   - send
///   - recv
///   - shutdown
/// Disallowed operations:
///   - listen
///   - accept
///   - connect
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct Established<I, R, S> {
    /// The sending half of the connection; no FIN has been queued.
    snd: Send<I, S, { FinQueued::NO }>,
    /// The receiving half of the connection.
    rcv: Recv<I, R>,
}
| |
/// Indicates whether at least one byte of data was acknowledged by the remote
/// in an incoming segment.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) enum DataAcked {
    /// At least one byte of data was acknowledged.
    Yes,
    /// No data was acknowledged.
    No,
}
| |
| impl<I: Instant, S: SendBuffer, const FIN_QUEUED: bool> Send<I, S, FIN_QUEUED> { |
| /// Returns true if the connection should still be alive per the send state. |
| fn timed_out(&self, now: I, keep_alive: &KeepAlive) -> bool { |
| match self.timer { |
| Some(SendTimer::KeepAlive(keep_alive_timer)) => { |
| keep_alive.enabled && keep_alive_timer.already_sent >= keep_alive.count.get() |
| } |
| Some(SendTimer::Retrans(timer)) | Some(SendTimer::ZeroWindowProbe(timer)) => { |
| timer.timed_out(now) |
| } |
| Some(SendTimer::SWSProbe { at: _ }) | None => false, |
| } |
| } |
| |
| /// Polls for new segments with enabled options. |
| /// |
| /// `limit` is the maximum bytes wanted in the TCP segment (if any). The |
| /// returned segment will have payload size up to the smaller of the given |
| /// limit or the calculated MSS for the connection. |
| fn poll_send( |
| &mut self, |
| counters: &TcpCountersInner, |
| rcv_nxt: SeqNum, |
| rcv_wnd: WindowSize, |
| limit: u32, |
| now: I, |
| SocketOptions { |
| keep_alive, |
| nagle_enabled, |
| user_timeout, |
| delayed_ack: _, |
| fin_wait2_timeout: _, |
| max_syn_retries: _, |
| }: &SocketOptions, |
| ) -> Option<Segment<SendPayload<'_>>> { |
| let Self { |
| nxt: snd_nxt, |
| max: snd_max, |
| una: snd_una, |
| wnd: snd_wnd, |
| buffer, |
| wl1: _, |
| wl2: _, |
| last_seq_ts, |
| rtt_estimator, |
| timer, |
| congestion_control, |
| wnd_scale, |
| wnd_max: snd_wnd_max, |
| } = self; |
| let BufferLimits { capacity: _, len: readable_bytes } = buffer.limits(); |
| let mss = u32::from(congestion_control.mss()); |
| let mut zero_window_probe = false; |
| let mut override_sws = false; |
| |
| let increment_retransmit_counters = |congestion_control: &CongestionControl<I>| { |
| counters.retransmits.increment(); |
| if congestion_control.in_fast_recovery() { |
| counters.fast_retransmits.increment(); |
| } |
| if congestion_control.in_slow_start() { |
| counters.slow_start_retransmits.increment(); |
| } |
| }; |
| |
| match timer { |
| Some(SendTimer::Retrans(retrans_timer)) => { |
| if retrans_timer.at <= now { |
| // Per https://tools.ietf.org/html/rfc6298#section-5: |
| // (5.4) Retransmit the earliest segment that has not |
| // been acknowledged by the TCP receiver. |
| // (5.5) The host MUST set RTO <- RTO * 2 ("back off |
| // the timer"). The maximum value discussed in |
| // (2.5) above may be used to provide an upper |
| // bound to this doubling operation. |
| // (5.6) Start the retransmission timer, such that it |
| // expires after RTO seconds (for the value of |
| // RTO after the doubling operation outlined in |
| // 5.5). |
| *snd_nxt = *snd_una; |
| retrans_timer.backoff(now); |
| congestion_control.on_retransmission_timeout(); |
| counters.timeouts.increment(); |
| increment_retransmit_counters(congestion_control); |
| } |
| } |
| Some(SendTimer::ZeroWindowProbe(retrans_timer)) => { |
| debug_assert!(readable_bytes > 0 || FIN_QUEUED); |
| if retrans_timer.at <= now { |
| zero_window_probe = true; |
| *snd_nxt = *snd_una; |
| // Per RFC 9293 Section 3.8.6.1: |
| // [...] SHOULD increase exponentially the interval |
| // between successive probes. |
| retrans_timer.backoff(now); |
| } |
| } |
| Some(SendTimer::KeepAlive(KeepAliveTimer { at, already_sent })) => { |
| // Per RFC 9293 Section 3.8.4: |
| // Keep-alive packets MUST only be sent when no sent data is |
| // outstanding, and no data or acknowledgment packets have |
| // been received for the connection within an interval. |
| if keep_alive.enabled && !FIN_QUEUED && readable_bytes == 0 { |
| if *at <= now { |
| *at = now.add(keep_alive.interval.into()); |
| *already_sent = already_sent.saturating_add(1); |
| // Per RFC 9293 Section 3.8.4: |
| // Such a segment generally contains SEG.SEQ = SND.NXT-1 |
| return Some( |
| Segment::ack(*snd_max - 1, rcv_nxt, rcv_wnd >> *wnd_scale).into(), |
| ); |
| } |
| } else { |
| *timer = None; |
| } |
| } |
| Some(SendTimer::SWSProbe { at }) => { |
| if *at <= now { |
| override_sws = true; |
| *timer = None; |
| } |
| } |
| None => {} |
| }; |
| // Find the sequence number for the next segment, we start with snd_nxt |
| // unless a fast retransmit is needed. |
| let next_seg = match congestion_control.fast_retransmit() { |
| None => *snd_nxt, |
| Some(seg) => { |
| increment_retransmit_counters(congestion_control); |
| seg |
| } |
| }; |
| // First calculate the open window, note that if our peer has shrank |
| // their window (it is strongly discouraged), the following conversion |
| // will fail and we return early. |
| let cwnd = congestion_control.cwnd(); |
| let swnd = WindowSize::min(*snd_wnd, cwnd); |
| let open_window = |
| u32::try_from(*snd_una + swnd - next_seg).ok_checked::<TryFromIntError>()?; |
| let offset = |
| usize::try_from(next_seg - *snd_una).unwrap_or_else(|TryFromIntError { .. }| { |
| panic!("next_seg({:?}) should never fall behind snd.una({:?})", *snd_nxt, *snd_una); |
| }); |
| let available = u32::try_from(readable_bytes + usize::from(FIN_QUEUED) - offset) |
| .unwrap_or(WindowSize::MAX.into()); |
| // We can only send the minimum of the open window and the bytes that |
| // are available, additionally, if in zero window probe mode, allow at |
| // least one byte past the limit to be sent. |
| let can_send = |
| open_window.min(available).min(mss).min(limit).max(u32::from(zero_window_probe)); |
| if can_send == 0 { |
| if available == 0 && offset == 0 && timer.is_none() && keep_alive.enabled { |
| *timer = Some(SendTimer::KeepAlive(KeepAliveTimer::idle(now, keep_alive))); |
| } |
| if available != 0 && offset == 0 && timer.is_none() && *snd_wnd == WindowSize::ZERO { |
| let user_timeout = user_timeout.get_or_default(DEFAULT_USER_TIMEOUT); |
| *timer = Some(SendTimer::ZeroWindowProbe(RetransTimer::new( |
| now, |
| rtt_estimator.rto(), |
| user_timeout, |
| DEFAULT_MAX_RETRIES, |
| ))) |
| } |
| return None; |
| } |
| let has_fin = FIN_QUEUED && can_send == available; |
| let seg = buffer.peek_with(offset, |readable| { |
| let bytes_to_send = u32::min( |
| can_send - u32::from(has_fin), |
| u32::try_from(readable.len()).unwrap_or(u32::MAX), |
| ); |
| let has_fin = has_fin && bytes_to_send == can_send - u32::from(has_fin); |
| // If we have more bytes to send than a MSS (enough to send) or the |
| // segment is a FIN (no need to wait for more), don't hold off. |
| if bytes_to_send < mss && !has_fin { |
| if bytes_to_send == 0 { |
| return None; |
| } |
| // First check if disallowed by nagle. |
| // Per RFC 9293 Section 3.7.4: |
| // If there is unacknowledged data (i.e., SND.NXT > SND.UNA), |
| // then the sending TCP endpoint buffers all user data |
| // (regardless of the PSH bit) until the outstanding data has |
| // been acknowledged or until the TCP endpoint can send a |
| // full-sized segment (Eff.snd.MSS bytes). |
| if *nagle_enabled && snd_nxt.after(*snd_una) { |
| return None; |
| } |
| // Otherwise check if disallowed by SWS avoidance. |
| // Per RFC 9293 Section 3.8.6.2.1: |
| // Send data: |
| // (1) if a maximum-sized segment can be sent, i.e., if: |
| // min(D,U) >= Eff.snd.MSS; |
| // (2) or if the data is pushed and all queued data can be |
| // sent now, i.e., if: |
| // [SND.NXT = SND.UNA and] PUSHed and D <= U |
| // (the bracketed condition is imposed by the Nagle algorithm); |
| // (3) or if at least a fraction Fs of the maximum window can |
| // be sent, i.e., if: |
| // [SND.NXT = SND.UNA and] min(D,U) >= Fs * Max(SND.WND); |
| // (4) or if the override timeout occurs. |
| // ... Here Fs is a fraction whose recommended value is 1/2 |
| // Explanation: |
| // To simplify the conditions, we can ignore the brackets since |
| // those are controlled by the nagle algorithm and is handled by |
| // the block above. Also we consider all data as PUSHed so for |
| // example (2) is now simply `D <= U`. Mapping into the code |
| // context, `D` is `available` and `U` is `open_window`. |
| // |
| // The RFC says when to send data, negating it, we will get the |
| // condition for when to hold off sending segments, that is: |
| // - negate (2) we get D > U, |
| // - negate (1) and combine with D > U, we get U < Eff.snd.MSS, |
| // - negate (3) and combine with D > U, we get U < Fs * Max(SND.WND). |
| // If the overriding timer fired or we are in zero window |
| // probing phase, we override it to send data anyways. |
| if available > open_window |
| && open_window < u32::min(mss, u32::from(*snd_wnd_max) / SWS_BUFFER_FACTOR) |
| && !override_sws |
| && !zero_window_probe |
| { |
| if timer.is_none() { |
| *timer = Some(SendTimer::SWSProbe { at: now.add(SWS_PROBE_TIMEOUT) }) |
| } |
| return None; |
| } |
| } |
| let (seg, discarded) = Segment::with_data( |
| next_seg, |
| Some(rcv_nxt), |
| has_fin.then(|| Control::FIN), |
| rcv_wnd >> *wnd_scale, |
| readable.slice(0..bytes_to_send), |
| ); |
| debug_assert_eq!(discarded, 0); |
| Some(seg) |
| })?; |
| let seq_max = next_seg + seg.contents.len(); |
| match *last_seq_ts { |
| Some((seq, _ts)) => { |
| if seq_max.after(seq) { |
| *last_seq_ts = Some((seq_max, now)); |
| } else { |
| // If the recorded sequence number is ahead of us, we are |
| // in retransmission, we should discard the timestamp and |
| // abort the estimation. |
| *last_seq_ts = None; |
| } |
| } |
| None => *last_seq_ts = Some((seq_max, now)), |
| } |
| if seq_max.after(*snd_nxt) { |
| *snd_nxt = seq_max; |
| } |
| if seq_max.after(*snd_max) { |
| *snd_max = seq_max; |
| } |
| // Per https://tools.ietf.org/html/rfc6298#section-5: |
| // (5.1) Every time a packet containing data is sent (including a |
| // retransmission), if the timer is not running, start it |
| // running so that it will expire after RTO seconds (for the |
| // current value of RTO). |
| match timer { |
| Some(SendTimer::Retrans(_)) | Some(SendTimer::ZeroWindowProbe(_)) => {} |
| Some(SendTimer::KeepAlive(_)) | Some(SendTimer::SWSProbe { at: _ }) | None => { |
| let user_timeout = user_timeout.get_or_default(DEFAULT_USER_TIMEOUT); |
| *timer = Some(SendTimer::Retrans(RetransTimer::new( |
| now, |
| rtt_estimator.rto(), |
| user_timeout, |
| DEFAULT_MAX_RETRIES, |
| ))) |
| } |
| } |
| Some(seg) |
| } |
| |
/// Processes an incoming ACK and returns a segment if one needs to be sent,
/// along with whether at least one byte of data was ACKed.
///
/// `seg_seq`, `seg_ack` and `seg_wnd` come from the incoming segment;
/// `seg_wnd` is the unscaled window and is shifted by this side's send window
/// scale before use. `pure_ack` is used as conditions (b) and (c) of the
/// duplicate-ACK definition quoted in the body. `rcv_nxt` and `rcv_wnd` are
/// only used to build the reply ACK.
///
/// Three cases are handled:
/// - `seg_ack` is after `snd.max`: reply with an ACK, nothing is acked;
/// - `seg_ack` is after `snd.una`: the acked bytes are removed from the send
///   buffer, `snd.una` (and possibly `snd.nxt`) advance, the send window may
///   be updated, and the RTT estimator and congestion control are notified;
/// - otherwise: the ACK is checked against the duplicate-ACK definition and
///   is otherwise ignored.
///
/// # Panics
///
/// Panics if the acked byte count is not a positive value that fits in a
/// `usize`.
fn process_ack(
    &mut self,
    counters: &TcpCountersInner,
    seg_seq: SeqNum,
    seg_ack: SeqNum,
    seg_wnd: UnscaledWindowSize,
    pure_ack: bool,
    rcv_nxt: SeqNum,
    rcv_wnd: WindowSize,
    now: I,
    keep_alive: &KeepAlive,
) -> (Option<Segment<()>>, DataAcked) {
    let Self {
        nxt: snd_nxt,
        max: snd_max,
        una: snd_una,
        wnd: snd_wnd,
        wl1: snd_wl1,
        wl2: snd_wl2,
        wnd_max,
        buffer,
        last_seq_ts,
        rtt_estimator,
        timer,
        congestion_control,
        wnd_scale,
    } = self;
    let seg_wnd = seg_wnd << *wnd_scale;
    match timer {
        // Any incoming ACK restarts the keep-alive idle period.
        Some(SendTimer::KeepAlive(_)) | None => {
            if keep_alive.enabled {
                *timer = Some(SendTimer::KeepAlive(KeepAliveTimer::idle(now, keep_alive)));
            }
        }
        Some(SendTimer::Retrans(retrans_timer)) => {
            // Per https://tools.ietf.org/html/rfc6298#section-5:
            //   (5.2) When all outstanding data has been acknowledged,
            //         turn off the retransmission timer.
            //   (5.3) When an ACK is received that acknowledges new
            //         data, restart the retransmission timer so that
            //         it will expire after RTO seconds (for the current
            //         value of RTO).
            if seg_ack == *snd_max {
                *timer = None;
            } else if seg_ack.before(*snd_max) && seg_ack.after(*snd_una) {
                retrans_timer.rearm(now);
            }
        }
        Some(SendTimer::ZeroWindowProbe(_)) | Some(SendTimer::SWSProbe { at: _ }) => {}
    }
    // Note: we rewind SND.NXT to SND.UNA on retransmission; if
    // `seg_ack` is after `snd.max`, it means the segment acks
    // something we never sent.
    if seg_ack.after(*snd_max) {
        // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-72):
        //   If the ACK acks something not yet sent (SEG.ACK >
        //   SND.NXT) then send an ACK, drop the segment, and
        //   return.
        (Some(Segment::ack(*snd_max, rcv_nxt, rcv_wnd >> *wnd_scale)), DataAcked::No)
    } else if seg_ack.after(*snd_una) {
        // The unwrap is safe because the result must be positive.
        let acked = u32::try_from(seg_ack - *snd_una)
            .ok_checked::<TryFromIntError>()
            .and_then(NonZeroU32::new)
            .unwrap_or_else(|| {
                panic!("seg_ack({:?}) - snd_una({:?}) must be positive", seg_ack, snd_una);
            });
        let BufferLimits { len, capacity: _ } = buffer.limits();
        // The FIN takes one sequence number after the last buffered byte.
        let fin_acked = FIN_QUEUED && seg_ack == *snd_una + len + 1;
        // Remove the acked bytes from the send buffer. The following
        // operation should not panic because we are in this branch
        // means seg_ack is not after snd.max, thus seg_ack - snd.una
        // cannot exceed the buffer length (plus one for a queued FIN,
        // which is subtracted below because it is not stored in the buffer).
        buffer.mark_read(
            NonZeroUsize::try_from(acked)
                .unwrap_or_else(|TryFromIntError { .. }| {
                    // we've checked that acked must be smaller than the outstanding
                    // bytes we have in the buffer; plus in Rust, any allocation can
                    // only have a size up to isize::MAX bytes.
                    panic!(
                        "acked({:?}) must be smaller than isize::MAX({:?})",
                        acked,
                        isize::MAX
                    )
                })
                .get()
                - usize::from(fin_acked),
        );
        *snd_una = seg_ack;
        // If the incoming segment acks something that has been sent
        // but not yet retransmitted (`snd.nxt < seg_ack <= snd.max`),
        // bump `snd.nxt` as well.
        if seg_ack.after(*snd_nxt) {
            *snd_nxt = seg_ack;
        }
        // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-72):
        //   If SND.UNA < SEG.ACK =< SND.NXT, the send window should be
        //   updated.  If (SND.WL1 < SEG.SEQ or (SND.WL1 = SEG.SEQ and
        //   SND.WL2 =< SEG.ACK)), set SND.WND <- SEG.WND, set
        //   SND.WL1 <- SEG.SEQ, and set SND.WL2 <- SEG.ACK.
        //
        // NOTE(review): RFC 9293 Section 3.10.7.4 states this condition as
        // SND.UNA =< SEG.ACK =< SND.NXT, which also covers ACKs that do not
        // advance SND.UNA. Here the window is only updated when SEG.ACK
        // advances SND.UNA - confirm that this is intended.
        if snd_wl1.before(seg_seq) || (seg_seq == *snd_wl1 && !snd_wl2.after(seg_ack)) {
            *snd_wnd = seg_wnd;
            *snd_wl1 = seg_seq;
            *snd_wl2 = seg_ack;
            *wnd_max = seg_wnd.max(*wnd_max);
            // The peer re-opened its window, so zero window probing is no
            // longer needed.
            if seg_wnd != WindowSize::ZERO
                && matches!(timer, Some(SendTimer::ZeroWindowProbe(_)))
            {
                *timer = None;
            }
        }
        // If the incoming segment acks the sequence number that we used
        // for RTT estimate, feed the sample to the estimator.
        if let Some((seq_max, timestamp)) = *last_seq_ts {
            if !seg_ack.before(seq_max) {
                rtt_estimator.sample(now.duration_since(timestamp));
            }
        }
        congestion_control.on_ack(acked, now, rtt_estimator.rto());
        // At least one byte of data was ACKed by the peer.
        (None, DataAcked::Yes)
    } else {
        // Per RFC 5681 (https://www.rfc-editor.org/rfc/rfc5681#section-2):
        //   DUPLICATE ACKNOWLEDGMENT: An acknowledgment is considered a
        //   "duplicate" in the following algorithms when (a) the receiver of
        //   the ACK has outstanding data, (b) the incoming acknowledgment
        //   carries no data, (c) the SYN and FIN bits are both off, (d) the
        //   acknowledgment number is equal to the greatest acknowledgment
        //   received on the given connection (TCP.UNA from [RFC793]) and (e)
        //   the advertised window in the incoming acknowledgment equals the
        //   advertised window in the last incoming acknowledgment.
        let is_dup_ack = {
            snd_nxt.after(*snd_una) // (a)
            && pure_ack // (b) & (c)
            && seg_ack == *snd_una // (d)
            && seg_wnd == *snd_wnd // (e)
        };
        if is_dup_ack {
            let fast_recovery_initiated = congestion_control.on_dup_ack(seg_ack);
            if fast_recovery_initiated {
                counters.fast_recovery.increment();
            }
        }
        // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-72):
        //   If the ACK is a duplicate (SEG.ACK < SND.UNA), it can be
        //   ignored.
        (None, DataAcked::No)
    }
}
| |
| fn take(&mut self) -> Self { |
| Self { |
| buffer: self.buffer.take(), |
| congestion_control: self.congestion_control.take(), |
| ..*self |
| } |
| } |
| |
| fn set_capacity(&mut self, size: usize) { |
| let Self { |
| nxt: _, |
| max: _, |
| una: _, |
| wnd: _, |
| wl1: _, |
| wl2: _, |
| buffer, |
| last_seq_ts: _, |
| rtt_estimator: _, |
| timer: _, |
| congestion_control: _, |
| wnd_scale: _, |
| wnd_max: _, |
| } = self; |
| buffer.request_capacity(size) |
| } |
| |
| fn target_capacity(&self) -> usize { |
| let Self { |
| nxt: _, |
| max: _, |
| una: _, |
| wnd: _, |
| wl1: _, |
| wl2: _, |
| buffer, |
| last_seq_ts: _, |
| rtt_estimator: _, |
| timer: _, |
| congestion_control: _, |
| wnd_scale: _, |
| wnd_max: _, |
| } = self; |
| buffer.target_capacity() |
| } |
| } |
| |
| impl<I: Instant, S: SendBuffer> Send<I, S, { FinQueued::NO }> { |
| fn queue_fin(self) -> Send<I, S, { FinQueued::YES }> { |
| let Self { |
| nxt, |
| max, |
| una, |
| wnd, |
| wl1, |
| wl2, |
| buffer, |
| last_seq_ts, |
| rtt_estimator, |
| timer, |
| congestion_control, |
| wnd_scale, |
| wnd_max, |
| } = self; |
| Send { |
| nxt, |
| max, |
| una, |
| wnd, |
| wl1, |
| wl2, |
| buffer, |
| last_seq_ts, |
| rtt_estimator, |
| timer, |
| congestion_control, |
| wnd_scale, |
| wnd_max, |
| } |
| } |
| } |
| |
/// Per RFC 793 (https://tools.ietf.org/html/rfc793#page-21):
///
///   CLOSE-WAIT - represents waiting for a connection termination request
///   from the local user.
///
/// Allowed operations:
/// - send
/// - recv (only leftovers and no new data will be accepted from the peer)
/// - shutdown
///
/// Disallowed operations:
/// - listen
/// - accept
/// - connect
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct CloseWait<I, S> {
    /// Send-side state; no FIN has been queued yet.
    snd: Send<I, S, { FinQueued::NO }>,
    /// `State::on_segment` uses this as RCV.NXT when checking and
    /// acknowledging incoming segments.
    last_ack: SeqNum,
    /// `State::on_segment` uses this as the receive window when checking and
    /// acknowledging incoming segments.
    last_wnd: WindowSize,
}
| |
/// Per RFC 793 (https://tools.ietf.org/html/rfc793#page-21):
///
///   LAST-ACK - represents waiting for an acknowledgment of the
///   connection termination request previously sent to the remote TCP
///   (which includes an acknowledgment of its connection termination
///   request).
///
/// Allowed operations:
/// - recv (only leftovers and no new data will be accepted from the peer)
///
/// Disallowed operations:
/// - send
/// - shutdown
/// - accept
/// - listen
/// - connect
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct LastAck<I, S> {
    /// Send-side state with a FIN queued.
    snd: Send<I, S, { FinQueued::YES }>,
    /// `State::on_segment` uses this as RCV.NXT when checking and
    /// acknowledging incoming segments.
    last_ack: SeqNum,
    /// `State::on_segment` uses this as the receive window when checking and
    /// acknowledging incoming segments.
    last_wnd: WindowSize,
}
| |
/// Per RFC 793 (https://tools.ietf.org/html/rfc793#page-21):
///
///   FIN-WAIT-1 - represents waiting for a connection termination request
///   from the remote TCP, or an acknowledgment of the connection
///   termination request previously sent.
///
/// Allowed operations:
/// - recv
///
/// Disallowed operations:
/// - send
/// - shutdown
/// - accept
/// - listen
/// - connect
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct FinWait1<I, R, S> {
    /// Send-side state with a FIN queued.
    snd: Send<I, S, { FinQueued::YES }>,
    /// Receive-side state.
    rcv: Recv<I, R>,
}
| |
/// Per RFC 793 (https://tools.ietf.org/html/rfc793#page-21):
///
///   FIN-WAIT-2 - represents waiting for a connection termination request
///   from the remote TCP.
///
/// Allowed operations:
/// - recv
///
/// Disallowed operations:
/// - send
/// - shutdown
/// - accept
/// - listen
/// - connect
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct FinWait2<I, R> {
    /// `State::on_segment` uses this as the sequence number of the ACKs it
    /// sends from this state.
    last_seq: SeqNum,
    /// Receive-side state.
    rcv: Recv<I, R>,
    /// Presumably the instant at which waiting in this state is given up,
    /// with `None` meaning no timeout - TODO confirm against the code that
    /// sets it.
    timeout_at: Option<I>,
}
| |
/// Per RFC 793 (https://tools.ietf.org/html/rfc793#page-21):
///
///   CLOSING - represents waiting for a connection termination request
///   acknowledgment from the remote TCP
///
/// Allowed operations:
/// - recv
///
/// Disallowed operations:
/// - send
/// - shutdown
/// - accept
/// - listen
/// - connect
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct Closing<I, S> {
    /// Send-side state with a FIN queued.
    snd: Send<I, S, { FinQueued::YES }>,
    /// `State::on_segment` uses this as RCV.NXT when checking and
    /// acknowledging incoming segments.
    last_ack: SeqNum,
    /// `State::on_segment` uses this as the receive window when checking and
    /// acknowledging incoming segments.
    last_wnd: WindowSize,
    /// Window scale stored alongside `last_wnd`.
    ///
    /// NOTE(review): the part of `State::on_segment` that reads this state
    /// ignores this field and uses `WindowScale::default()` instead - confirm
    /// that this is intended.
    last_wnd_scale: WindowScale,
}
| |
/// Per RFC 793 (https://tools.ietf.org/html/rfc793#page-22):
///
///   TIME-WAIT - represents waiting for enough time to pass to be sure
///   the remote TCP received the acknowledgment of its connection
///   termination request.
///
/// Allowed operations:
/// - recv (only leftovers and no new data will be accepted from the peer)
///
/// Disallowed operations:
/// - send
/// - shutdown
/// - accept
/// - listen
/// - connect
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct TimeWait<I> {
    /// `State::on_segment` uses this as the sequence number of the ACKs it
    /// sends from this state.
    pub(super) last_seq: SeqNum,
    /// `State::on_segment` uses this as RCV.NXT when checking and
    /// acknowledging incoming segments.
    pub(super) last_ack: SeqNum,
    /// `State::on_segment` uses this as the receive window when checking and
    /// acknowledging incoming segments.
    pub(super) last_wnd: WindowSize,
    /// Window scale stored alongside `last_wnd`.
    ///
    /// NOTE(review): the part of `State::on_segment` that reads this state
    /// ignores this field and uses `WindowScale::default()` instead - confirm
    /// that this is intended.
    pub(super) last_wnd_scale: WindowScale,
    /// The instant at which this state expires; presumably produced by
    /// `new_time_wait_expiry` - TODO confirm against the code that sets it.
    pub(super) expiry: I,
}
| |
| fn new_time_wait_expiry<I: Instant>(now: I) -> I { |
| now.add(MSL * 2) |
| } |
| |
/// The TCP connection state machine: one variant per connection state, each
/// carrying the data kept while in that state.
///
/// In the inherent `impl` of this type, `I` is bounded by `Instant`, `R` by
/// `ReceiveBuffer` and `S` by `SendBuffer`. `ActiveOpen` is the value carried
/// by `SynSent`/`SynRcvd` that is turned into buffers once the connection is
/// established.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum State<I, R, S, ActiveOpen> {
    /// No connection; optionally records the error that closed it.
    Closed(Closed<Option<ConnectionError>>),
    /// LISTEN state.
    Listen(Listen),
    /// SYN-RECEIVED state.
    SynRcvd(SynRcvd<I, ActiveOpen>),
    /// SYN-SENT state.
    SynSent(SynSent<I, ActiveOpen>),
    /// ESTABLISHED state.
    Established(Established<I, R, S>),
    /// CLOSE-WAIT state.
    CloseWait(CloseWait<I, S>),
    /// LAST-ACK state.
    LastAck(LastAck<I, S>),
    /// FIN-WAIT-1 state.
    FinWait1(FinWait1<I, R, S>),
    /// FIN-WAIT-2 state.
    FinWait2(FinWait2<I, R>),
    /// CLOSING state.
    Closing(Closing<I, S>),
    /// TIME-WAIT state.
    TimeWait(TimeWait<I>),
}
| |
| impl<I, R, S, ActiveOpen> core::fmt::Display for State<I, R, S, ActiveOpen> { |
| fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> { |
| let name = match self { |
| State::Closed(_) => "Closed", |
| State::Listen(_) => "Listen", |
| State::SynRcvd(_) => "SynRcvd", |
| State::SynSent(_) => "SynSent", |
| State::Established(_) => "Established", |
| State::CloseWait(_) => "CloseWait", |
| State::LastAck(_) => "LastAck", |
| State::FinWait1(_) => "FinWait1", |
| State::FinWait2(_) => "FinWait2", |
| State::Closing(_) => "Closing", |
| State::TimeWait(_) => "TimeWait", |
| }; |
| write!(f, "{name}") |
| } |
| } |
| |
/// Possible errors for closing a connection.
#[derive(Debug, PartialEq, Eq)]
pub(super) enum CloseError {
    /// The connection is already being closed.
    Closing,
    /// There is no connection to be closed.
    NoConnection,
}
| |
/// A provider that creates receive and send buffers when the connection
/// becomes established.
pub(crate) trait BufferProvider<R: ReceiveBuffer, S: SendBuffer> {
    /// An object that is returned when a passive open connection is
    /// established.
    type PassiveOpen;

    /// An object that is needed to initiate a connection which will be
    /// later used to create buffers once the connection is established.
    type ActiveOpen: IntoBuffers<R, S>;

    /// Creates new send and receive buffers and an object that needs to be
    /// vended to the application.
    ///
    /// The returned tuple is `(receive buffer, send buffer, passive open
    /// object)`, created for the given `buffer_sizes`.
    fn new_passive_open_buffers(buffer_sizes: BufferSizes) -> (R, S, Self::PassiveOpen);
}
| |
/// Allows the implementor to be replaced by an empty value. The semantics are
/// similar to [`core::mem::take`], except that for some types it is not
/// sensible to implement [`Default`] for the target type. This can be useful
/// in state transitions, but `replace_with` might be a better choice.
pub trait Takeable {
    /// Replaces `self` with an implementor-defined "empty" value and returns
    /// the previous value.
    fn take(&mut self) -> Self;
}
| |
| impl<T: Default> Takeable for T { |
| fn take(&mut self) -> Self { |
| core::mem::take(self) |
| } |
| } |
| |
| impl<I: Instant + 'static, R: ReceiveBuffer, S: SendBuffer, ActiveOpen: Debug> |
| State<I, R, S, ActiveOpen> |
| { |
| /// Updates this state to the provided new state. |
| fn transition_to_state( |
| &mut self, |
| counters: &TcpCountersInner, |
| new_state: State<I, R, S, ActiveOpen>, |
| ) { |
| if let State::Closed(Closed { reason }) = &new_state { |
| let was_established = match self { |
| State::Closed(_) | State::Listen(_) | State::SynRcvd(_) | State::SynSent(_) => { |
| false |
| } |
| State::Established(_) |
| | State::CloseWait(_) |
| | State::LastAck(_) |
| | State::FinWait1(_) |
| | State::FinWait2(_) |
| | State::Closing(_) |
| | State::TimeWait(_) => true, |
| }; |
| if was_established { |
| counters.established_closed.increment(); |
| match reason { |
| Some(ConnectionError::ConnectionReset) => { |
| counters.established_resets.increment(); |
| } |
| Some(ConnectionError::TimedOut) => { |
| counters.established_timedout.increment(); |
| } |
| _ => {} |
| } |
| } |
| } |
| *self = new_state |
| } |
| /// Processes an incoming segment and advances the state machine. |
| /// |
| /// Returns a segment if one needs to be sent; if a passive open connection |
| /// is newly established, the corresponding object that our client needs |
| /// will be returned. Also returns whether an at least one byte of data was |
| /// ACKed by the incoming segment. |
| pub(crate) fn on_segment<P: Payload, BP: BufferProvider<R, S, ActiveOpen = ActiveOpen>>( |
| &mut self, |
| counters: &TcpCountersInner, |
| incoming: Segment<P>, |
| now: I, |
| SocketOptions { |
| keep_alive, |
| nagle_enabled: _, |
| user_timeout: _, |
| delayed_ack, |
| fin_wait2_timeout, |
| max_syn_retries: _, |
| }: &SocketOptions, |
| defunct: bool, |
| ) -> (Option<Segment<()>>, Option<BP::PassiveOpen>, DataAcked) |
| where |
| BP::PassiveOpen: Debug, |
| ActiveOpen: IntoBuffers<R, S>, |
| { |
| let mut passive_open = None; |
| let mut data_acked = DataAcked::No; |
| let seg = (|| { |
| let (mut rcv_nxt, rcv_wnd, rcv_wnd_scale, snd_max) = match self { |
| State::Closed(closed) => return closed.on_segment(incoming), |
| State::Listen(listen) => { |
| return match listen.on_segment(incoming, now) { |
| ListenOnSegmentDisposition::SendSynAckAndEnterSynRcvd( |
| syn_ack, |
| SynRcvd { |
| iss, |
| irs, |
| timestamp, |
| retrans_timer, |
| simultaneous_open, |
| buffer_sizes, |
| smss, |
| rcv_wnd_scale, |
| snd_wnd_scale, |
| }, |
| ) => { |
| match simultaneous_open { |
| None => { |
| self.transition_to_state( |
| counters, |
| State::SynRcvd(SynRcvd { |
| iss, |
| irs, |
| timestamp, |
| retrans_timer, |
| simultaneous_open: None, |
| buffer_sizes, |
| smss, |
| rcv_wnd_scale, |
| snd_wnd_scale, |
| }), |
| ); |
| } |
| Some(infallible) => match infallible {}, |
| } |
| Some(syn_ack) |
| } |
| ListenOnSegmentDisposition::SendRst(rst) => Some(rst), |
| ListenOnSegmentDisposition::Ignore => None, |
| } |
| } |
| State::SynSent(synsent) => { |
| return match synsent.on_segment(incoming, now) { |
| SynSentOnSegmentDisposition::SendAckAndEnterEstablished(established) => { |
| replace_with_and(self, |this| { |
| assert_matches!(this, State::SynSent(SynSent { |
| active_open, |
| buffer_sizes, |
| rcv_wnd_scale, |
| .. |
| }) => { |
| let (rcv_buffer, snd_buffer) = |
| active_open.into_buffers(buffer_sizes); |
| let mut established = Established { |
| snd: established.snd.with_buffer(snd_buffer), |
| rcv: established.rcv.with_buffer(rcv_buffer), |
| }; |
| let ack = Some(Segment::ack( |
| established.snd.max, |
| established.rcv.nxt(), |
| established.rcv.select_window() >> rcv_wnd_scale, |
| )); |
| (State::Established(established), ack) |
| }) |
| }) |
| } |
| SynSentOnSegmentDisposition::SendSynAckAndEnterSynRcvd( |
| syn_ack, |
| mut syn_rcvd, |
| ) => { |
| replace_with(self, |this| { |
| assert_matches!(this, State::SynSent(SynSent { |
| active_open, |
| .. |
| }) => { |
| assert_matches!(syn_rcvd.simultaneous_open.replace(active_open), None); |
| State::SynRcvd(syn_rcvd) |
| }) |
| }); |
| Some(syn_ack) |
| } |
| SynSentOnSegmentDisposition::SendRst(rst) => Some(rst), |
| SynSentOnSegmentDisposition::EnterClosed(closed) => { |
| self.transition_to_state(counters, State::Closed(closed)); |
| None |
| } |
| SynSentOnSegmentDisposition::Ignore => None, |
| } |
| } |
| State::SynRcvd(SynRcvd { |
| iss, |
| irs, |
| timestamp: _, |
| retrans_timer: _, |
| simultaneous_open: _, |
| buffer_sizes, |
| smss: _, |
| rcv_wnd_scale: _, |
| snd_wnd_scale: _, |
| }) => { |
| // RFC 7323 Section 2.2: |
| // The window field in a segment where the SYN bit is set |
| // (i.e., a <SYN> or <SYN,ACK>) MUST NOT be scaled. |
| let advertised = buffer_sizes.rwnd_unscaled(); |
| ( |
| *irs + 1, |
| advertised << WindowScale::default(), |
| WindowScale::default(), |
| *iss + 1, |
| ) |
| } |
| State::Established(Established { rcv, snd }) => { |
| (rcv.nxt(), rcv.select_window(), rcv.wnd_scale, snd.max) |
| } |
| State::CloseWait(CloseWait { snd, last_ack, last_wnd }) => { |
| (*last_ack, *last_wnd, WindowScale::default(), snd.max) |
| } |
| State::LastAck(LastAck { snd, last_ack, last_wnd }) |
| | State::Closing(Closing { snd, last_ack, last_wnd, last_wnd_scale: _ }) => { |
| (*last_ack, *last_wnd, WindowScale::default(), snd.max) |
| } |
| State::FinWait1(FinWait1 { rcv, snd }) => { |
| (rcv.nxt(), rcv.select_window(), rcv.wnd_scale, snd.max) |
| } |
| State::FinWait2(FinWait2 { last_seq, rcv, timeout_at: _ }) => { |
| (rcv.nxt(), rcv.select_window(), rcv.wnd_scale, *last_seq) |
| } |
| State::TimeWait(TimeWait { |
| last_seq, |
| last_ack, |
| last_wnd, |
| expiry: _, |
| last_wnd_scale: _, |
| }) => (*last_ack, *last_wnd, WindowScale::default(), *last_seq), |
| }; |
| // Unreachable note(1): The above match returns early for states CLOSED, |
| // SYN_SENT and LISTEN, so it is impossible to have the above states |
| // past this line. |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-69): |
| // first check sequence number |
| let is_rst = incoming.contents.control() == Some(Control::RST); |
| // pure ACKs (empty segments) don't need to be ack'ed. |
| let pure_ack = incoming.contents.len() == 0; |
| let needs_ack = !pure_ack; |
| let Segment { seq: seg_seq, ack: seg_ack, wnd: seg_wnd, contents, options: _ } = |
| match incoming.overlap(rcv_nxt, rcv_wnd) { |
| Some(incoming) => incoming, |
| None => { |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-69): |
| // If an incoming segment is not acceptable, an acknowledgment |
| // should be sent in reply (unless the RST bit is set, if so drop |
| // the segment and return): |
| // <SEQ=SND.NXT><ACK=RCV.NXT><CTL=ACK> |
| // After sending the acknowledgment, drop the unacceptable segment |
| // and return. |
| return if is_rst { |
| None |
| } else { |
| Some(Segment::ack(snd_max, rcv_nxt, rcv_wnd >> rcv_wnd_scale)) |
| }; |
| } |
| }; |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-70): |
| // second check the RST bit |
| // If the RST bit is set then, any outstanding RECEIVEs and SEND |
| // should receive "reset" responses. All segment queues should be |
| // flushed. Users should also receive an unsolicited general |
| // "connection reset" signal. Enter the CLOSED state, delete the |
| // TCB, and return. |
| if contents.control() == Some(Control::RST) { |
| self.transition_to_state( |
| counters, |
| State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }), |
| ); |
| return None; |
| } |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-70): |
| // fourth, check the SYN bit |
| // If the SYN is in the window it is an error, send a reset, any |
| // outstanding RECEIVEs and SEND should receive "reset" responses, |
| // all segment queues should be flushed, the user should also |
| // receive an unsolicited general "connection reset" signal, enter |
| // the CLOSED state, delete the TCB, and return. |
| // If the SYN is not in the window this step would not be reached |
| // and an ack would have been sent in the first step (sequence |
| // number check). |
| if contents.control() == Some(Control::SYN) { |
| self.transition_to_state( |
| counters, |
| State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }), |
| ); |
| return Some(Segment::rst(snd_max)); |
| } |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-72): |
| // fifth check the ACK field |
| match seg_ack { |
| Some(seg_ack) => match self { |
| State::Closed(_) | State::Listen(_) | State::SynSent(_) => { |
| // This unreachable assert is justified by note (1). |
| unreachable!("encountered an already-handled state: {:?}", self) |
| } |
| State::SynRcvd(SynRcvd { |
| iss, |
| irs, |
| timestamp: syn_rcvd_ts, |
| retrans_timer: _, |
| simultaneous_open, |
| buffer_sizes, |
| smss, |
| rcv_wnd_scale, |
| snd_wnd_scale, |
| }) => { |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-72): |
| // if the ACK bit is on |
| // SYN-RECEIVED STATE |
| // If SND.UNA =< SEG.ACK =< SND.NXT then enter ESTABLISHED state |
| // and continue processing. |
| // If the segment acknowledgment is not acceptable, form a |
| // reset segment, |
| // <SEQ=SEG.ACK><CTL=RST> |
| // and send it. |
| // Note: We don't support sending data with SYN, so we don't |
| // store the `SND` variables because they can be easily derived |
| // from ISS: SND.UNA=ISS and SND.NXT=ISS+1. |
| if seg_ack != *iss + 1 { |
| return Some(Segment::rst(seg_ack)); |
| } else { |
| let mut rtt_estimator = Estimator::default(); |
| if let Some(syn_rcvd_ts) = syn_rcvd_ts { |
| rtt_estimator.sample(now.duration_since(*syn_rcvd_ts)); |
| } |
| let (rcv_buffer, snd_buffer) = match simultaneous_open.take() { |
| None => { |
| let (rcv_buffer, snd_buffer, client) = |
| BP::new_passive_open_buffers(*buffer_sizes); |
| assert_matches!(passive_open.replace(client), None); |
| (rcv_buffer, snd_buffer) |
| } |
| Some(active_open) => active_open.into_buffers(*buffer_sizes), |
| }; |
| let (snd_wnd_scale, rcv_wnd_scale) = snd_wnd_scale |
| .map(|snd_wnd_scale| (snd_wnd_scale, *rcv_wnd_scale)) |
| .unwrap_or_default(); |
| let established = Established { |
| snd: Send { |
| nxt: *iss + 1, |
| max: *iss + 1, |
| una: seg_ack, |
| wnd: seg_wnd << snd_wnd_scale, |
| wl1: seg_seq, |
| wl2: seg_ack, |
| buffer: snd_buffer, |
| last_seq_ts: None, |
| rtt_estimator, |
| timer: None, |
| congestion_control: CongestionControl::cubic_with_mss(*smss), |
| wnd_scale: snd_wnd_scale, |
| wnd_max: seg_wnd << snd_wnd_scale, |
| }, |
| rcv: Recv { |
| buffer: rcv_buffer, |
| assembler: Assembler::new(*irs + 1), |
| timer: None, |
| mss: *smss, |
| wnd_scale: rcv_wnd_scale, |
| last_window_update: (*irs + 1, buffer_sizes.rwnd()), |
| }, |
| }; |
| self.transition_to_state(counters, State::Established(established)); |
| } |
| // Unreachable note(2): Because we either return early or |
| // transition to Established for the ack processing, it is |
| // impossible for SYN_RCVD to appear past this line. |
| } |
| State::Established(Established { snd, rcv: _ }) |
| | State::CloseWait(CloseWait { snd, last_ack: _, last_wnd: _ }) => { |
| let (ack, segment_acked_data) = snd.process_ack( |
| counters, seg_seq, seg_ack, seg_wnd, pure_ack, rcv_nxt, rcv_wnd, now, |
| keep_alive, |
| ); |
| data_acked = segment_acked_data; |
| if let Some(ack) = ack { |
| return Some(ack); |
| } |
| } |
| State::LastAck(LastAck { snd, last_ack: _, last_wnd: _ }) => { |
| let BufferLimits { len, capacity: _ } = snd.buffer.limits(); |
| let fin_seq = snd.una + len + 1; |
| let (ack, segment_acked_data) = snd.process_ack( |
| counters, seg_seq, seg_ack, seg_wnd, pure_ack, rcv_nxt, rcv_wnd, now, |
| keep_alive, |
| ); |
| data_acked = segment_acked_data; |
| if let Some(ack) = ack { |
| return Some(ack); |
| } else if seg_ack == fin_seq { |
| self.transition_to_state( |
| counters, |
| State::Closed(Closed { reason: None }), |
| ); |
| return None; |
| } |
| } |
| State::FinWait1(FinWait1 { snd, rcv }) => { |
| let BufferLimits { len, capacity: _ } = snd.buffer.limits(); |
| let fin_seq = snd.una + len + 1; |
| let (ack, segment_acked_data) = snd.process_ack( |
| counters, seg_seq, seg_ack, seg_wnd, pure_ack, rcv_nxt, rcv_wnd, now, |
| keep_alive, |
| ); |
| data_acked = segment_acked_data; |
| if let Some(ack) = ack { |
| return Some(ack); |
| } else if seg_ack == fin_seq { |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-73): |
| // In addition to the processing for the ESTABLISHED |
| // state, if the FIN segment is now acknowledged then |
| // enter FIN-WAIT-2 and continue processing in that |
| // state |
| let last_seq = snd.nxt; |
| let finwait2 = FinWait2 { |
| last_seq, |
| rcv: rcv.take(), |
| // If the connection is already defunct, we set |
| // a timeout to reclaim, but otherwise, a later |
| // `close` call should set the timer. |
| timeout_at: fin_wait2_timeout |
| .and_then(|timeout| defunct.then_some(now.add(timeout))), |
| }; |
| self.transition_to_state(counters, State::FinWait2(finwait2)); |
| } |
| } |
| State::Closing(Closing { snd, last_ack, last_wnd, last_wnd_scale }) => { |
| let BufferLimits { len, capacity: _ } = snd.buffer.limits(); |
| let fin_seq = snd.una + len + 1; |
| let (ack, segment_acked_data) = snd.process_ack( |
| counters, seg_seq, seg_ack, seg_wnd, pure_ack, rcv_nxt, rcv_wnd, now, |
| keep_alive, |
| ); |
| data_acked = segment_acked_data; |
| if let Some(ack) = ack { |
| data_acked = segment_acked_data; |
| return Some(ack); |
| } else if seg_ack == fin_seq { |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-73): |
| // In addition to the processing for the ESTABLISHED state, if |
| // the ACK acknowledges our FIN then enter the TIME-WAIT state, |
| // otherwise ignore the segment. |
| let timewait = TimeWait { |
| last_seq: snd.nxt, |
| last_ack: *last_ack, |
| last_wnd: *last_wnd, |
| expiry: new_time_wait_expiry(now), |
| last_wnd_scale: *last_wnd_scale, |
| }; |
| self.transition_to_state(counters, State::TimeWait(timewait)); |
| } |
| } |
| State::FinWait2(_) | State::TimeWait(_) => {} |
| }, |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-72): |
| // if the ACK bit is off drop the segment and return |
| None => return None, |
| } |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-74): |
| // seventh, process the segment text |
| // Once in the ESTABLISHED state, it is possible to deliver segment |
| // text to user RECEIVE buffers. Text from segments can be moved |
| // into buffers until either the buffer is full or the segment is |
| // empty. If the segment empties and carries an PUSH flag, then |
| // the user is informed, when the buffer is returned, that a PUSH |
| // has been received. |
| // |
| // When the TCP takes responsibility for delivering the data to the |
| // user it must also acknowledge the receipt of the data. |
| // Once the TCP takes responsibility for the data it advances |
| // RCV.NXT over the data accepted, and adjusts RCV.WND as |
| // apporopriate to the current buffer availability. The total of |
| // RCV.NXT and RCV.WND should not be reduced. |
| // |
| // Please note the window management suggestions in section 3.7. |
| // Send an acknowledgment of the form: |
| // <SEQ=SND.NXT><ACK=RCV.NXT><CTL=ACK> |
| // This acknowledgment should be piggybacked on a segment being |
| // transmitted if possible without incurring undue delay. |
| let ack_to_text = if needs_ack { |
| match self { |
| State::Closed(_) | State::Listen(_) | State::SynRcvd(_) | State::SynSent(_) => { |
| // This unreachable assert is justified by note (1) and (2). |
| unreachable!("encountered an already-handled state: {:?}", self) |
| } |
| State::Established(Established { snd: _, rcv }) |
| | State::FinWait1(FinWait1 { snd: _, rcv }) |
| | State::FinWait2(FinWait2 { last_seq: _, rcv, timeout_at: _ }) => { |
| // Write the segment data in the buffer and keep track if it fills |
| // any hole in the assembler. |
| let had_out_of_order = rcv.assembler.has_out_of_order(); |
| if contents.data().len() > 0 { |
| let offset = usize::try_from(seg_seq - rcv.nxt()).unwrap_or_else(|TryFromIntError {..}| { |
| panic!("The segment was trimmed to fit the window, thus seg.seq({:?}) must not come before rcv.nxt({:?})", seg_seq, rcv.nxt()); |
| }); |
| let nwritten = rcv.buffer.write_at(offset, contents.data()); |
| let readable = rcv.assembler.insert(seg_seq..seg_seq + nwritten); |
| rcv.buffer.make_readable(readable); |
| rcv_nxt = rcv.nxt(); |
| } |
| // Per RFC 5681 Section 4.2: |
| // Out-of-order data segments SHOULD be acknowledged |
| // immediately, ... the receiver SHOULD send an |
| // immediate ACK when it receives a data segment that |
| // fills in all or part of a gap in the sequence space. |
| let immediate_ack = |
| !*delayed_ack || had_out_of_order || rcv.assembler.has_out_of_order(); |
| if immediate_ack { |
| rcv.timer = None; |
| } else { |
| match &mut rcv.timer { |
| Some(ReceiveTimer::DelayedAck { at: _, received_bytes }) => { |
| *received_bytes = received_bytes.saturating_add( |
| u32::try_from(contents.data().len()).unwrap_or(u32::MAX), |
| ); |
| // Per RFC 5681 Section 4.2: |
| // An implementation is deemed to comply |
| // with this requirement if it sends at |
| // least one acknowledgment every time it |
| // receives 2*RMSS bytes of new data from |
| // the sender |
| if received_bytes.get() >= 2 * u32::from(rcv.mss) { |
| rcv.timer = None; |
| } |
| } |
| None => { |
| if let Some(received_bytes) = NonZeroU32::new( |
| u32::try_from(contents.data().len()).unwrap_or(u32::MAX), |
| ) { |
| rcv.timer = Some(ReceiveTimer::DelayedAck { |
| at: now.add(ACK_DELAY_THRESHOLD), |
| received_bytes, |
| }) |
| } |
| } |
| } |
| } |
| (!matches!(rcv.timer, Some(ReceiveTimer::DelayedAck { .. }))).then_some( |
| Segment::ack(snd_max, rcv.nxt(), rcv.select_window() >> rcv.wnd_scale), |
| ) |
| } |
| State::CloseWait(_) |
| | State::LastAck(_) |
| | State::Closing(_) |
| | State::TimeWait(_) => { |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-75): |
| // This should not occur, since a FIN has been received from the |
| // remote side. Ignore the segment text. |
| None |
| } |
| } |
| } else { |
| None |
| }; |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-75): |
| // eighth, check the FIN bit |
| let ack_to_fin = if contents.control() == Some(Control::FIN) |
| && rcv_nxt == seg_seq + contents.data().len() |
| { |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-75): |
| // If the FIN bit is set, signal the user "connection closing" and |
| // return any pending RECEIVEs with same message, advance RCV.NXT |
| // over the FIN, and send an acknowledgment for the FIN. |
| match self { |
| State::Closed(_) | State::Listen(_) | State::SynRcvd(_) | State::SynSent(_) => { |
| // This unreachable assert is justified by note (1) and (2). |
| unreachable!("encountered an already-handled state: {:?}", self) |
| } |
| State::Established(Established { snd, rcv }) => { |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-75): |
| // Enter the CLOSE-WAIT state. |
| let last_ack = rcv.nxt() + 1; |
| let last_wnd = |
| rcv.select_window().checked_sub(1).unwrap_or(WindowSize::ZERO); |
| let scaled_wnd = last_wnd >> rcv.wnd_scale; |
| let closewait = CloseWait { snd: snd.take(), last_ack, last_wnd }; |
| self.transition_to_state(counters, State::CloseWait(closewait)); |
| Some(Segment::ack(snd_max, last_ack, scaled_wnd)) |
| } |
| State::CloseWait(_) | State::LastAck(_) | State::Closing(_) => { |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-75): |
| // CLOSE-WAIT STATE |
| // Remain in the CLOSE-WAIT state. |
| // CLOSING STATE |
| // Remain in the CLOSING state. |
| // LAST-ACK STATE |
| // Remain in the LAST-ACK state. |
| None |
| } |
| State::FinWait1(FinWait1 { snd, rcv }) => { |
| let last_ack = rcv.nxt() + 1; |
| let last_wnd = |
| rcv.select_window().checked_sub(1).unwrap_or(WindowSize::ZERO); |
| let scaled_wnd = last_wnd >> rcv.wnd_scale; |
| let closing = Closing { |
| snd: snd.take(), |
| last_ack, |
| last_wnd, |
| last_wnd_scale: rcv.wnd_scale, |
| }; |
| self.transition_to_state(counters, State::Closing(closing)); |
| Some(Segment::ack(snd_max, last_ack, scaled_wnd)) |
| } |
| State::FinWait2(FinWait2 { last_seq, rcv, timeout_at: _ }) => { |
| let last_ack = rcv.nxt() + 1; |
| let last_wnd = |
| rcv.select_window().checked_sub(1).unwrap_or(WindowSize::ZERO); |
| let scaled_window = last_wnd >> rcv.wnd_scale; |
| let timewait = TimeWait { |
| last_seq: *last_seq, |
| last_ack, |
| last_wnd, |
| expiry: new_time_wait_expiry(now), |
| last_wnd_scale: rcv.wnd_scale, |
| }; |
| self.transition_to_state(counters, State::TimeWait(timewait)); |
| Some(Segment::ack(snd_max, last_ack, scaled_window)) |
| } |
| State::TimeWait(TimeWait { |
| last_seq, |
| last_ack, |
| last_wnd, |
| expiry, |
| last_wnd_scale, |
| }) => { |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-76): |
| // TIME-WAIT STATE |
| // Remain in the TIME-WAIT state. Restart the 2 MSL time-wait |
| // timeout. |
| *expiry = new_time_wait_expiry(now); |
| Some(Segment::ack(*last_seq, *last_ack, *last_wnd >> *last_wnd_scale)) |
| } |
| } |
| } else { |
| None |
| }; |
| // If we generated an ACK to FIN, then because of the cumulative nature |
| // of ACKs, the ACK generated to text (if any) can be safely overridden. |
| ack_to_fin.or(ack_to_text) |
| })(); |
| (seg, passive_open, data_acked) |
| } |
| |
| /// Polls if there are any bytes available to send in the buffer. |
| /// |
| /// Forms one segment of at most `limit` available bytes, as long as the |
| /// receiver window allows. |
| pub(crate) fn poll_send( |
| &mut self, |
| counters: &TcpCountersInner, |
| limit: u32, |
| now: I, |
| socket_options: &SocketOptions, |
| ) -> Option<Segment<SendPayload<'_>>> { |
| if self.poll_close(counters, now, socket_options) { |
| return None; |
| } |
| fn poll_rcv_then_snd< |
| 'a, |
| I: Instant, |
| R: ReceiveBuffer, |
| S: SendBuffer, |
| const FIN_QUEUED: bool, |
| >( |
| counters: &TcpCountersInner, |
| snd: &'a mut Send<I, S, FIN_QUEUED>, |
| rcv: &'a mut Recv<I, R>, |
| limit: u32, |
| now: I, |
| socket_options: &SocketOptions, |
| ) -> Option<Segment<SendPayload<'a>>> { |
| // We favor `rcv` over `snd` if both are present. The alternative |
| // will also work (sending whatever is ready at the same time of |
| // sending the delayed ACK) but we need special treatment for |
| // FIN_WAIT_1 if we choose the alternative, because otherwise we |
| // will have a spurious retransmitted FIN. |
| let rcv_nxt = rcv.nxt(); |
| let rcv_wnd = rcv.select_window(); |
| let seg = rcv |
| .poll_send(snd.max, now) |
| .map(Into::into) |
| .or_else(|| snd.poll_send(counters, rcv_nxt, rcv_wnd, limit, now, socket_options)); |
| // We must have piggybacked an ACK so we can cancel the timer now. |
| if seg.is_some() && matches!(rcv.timer, Some(ReceiveTimer::DelayedAck { .. })) { |
| rcv.timer = None; |
| } |
| seg |
| } |
| match self { |
| State::SynSent(SynSent { |
| iss, |
| timestamp, |
| retrans_timer, |
| active_open: _, |
| buffer_sizes: _, |
| device_mss, |
| default_mss: _, |
| rcv_wnd_scale, |
| }) => (retrans_timer.at <= now).then(|| { |
| *timestamp = None; |
| retrans_timer.backoff(now); |
| Segment::syn( |
| *iss, |
| UnscaledWindowSize::from(u16::MAX), |
| Options { mss: Some(*device_mss), window_scale: Some(*rcv_wnd_scale) }, |
| ) |
| .into() |
| }), |
| State::SynRcvd(SynRcvd { |
| iss, |
| irs, |
| timestamp, |
| retrans_timer, |
| simultaneous_open: _, |
| buffer_sizes: _, |
| smss, |
| rcv_wnd_scale, |
| snd_wnd_scale, |
| }) => (retrans_timer.at <= now).then(|| { |
| *timestamp = None; |
| retrans_timer.backoff(now); |
| Segment::syn_ack( |
| *iss, |
| *irs + 1, |
| UnscaledWindowSize::from(u16::MAX), |
| Options { |
| mss: Some(*smss), |
| window_scale: snd_wnd_scale.map(|_| *rcv_wnd_scale), |
| }, |
| ) |
| .into() |
| }), |
| State::Established(Established { snd, rcv }) => { |
| poll_rcv_then_snd(counters, snd, rcv, limit, now, socket_options) |
| } |
| State::CloseWait(CloseWait { snd, last_ack, last_wnd }) => { |
| snd.poll_send(counters, *last_ack, *last_wnd, limit, now, socket_options) |
| } |
| State::LastAck(LastAck { snd, last_ack, last_wnd }) |
| | State::Closing(Closing { snd, last_ack, last_wnd, last_wnd_scale: _ }) => { |
| snd.poll_send(counters, *last_ack, *last_wnd, limit, now, socket_options) |
| } |
| State::FinWait1(FinWait1 { snd, rcv }) => { |
| poll_rcv_then_snd(counters, snd, rcv, limit, now, socket_options) |
| } |
| State::FinWait2(FinWait2 { last_seq, rcv, timeout_at: _ }) => { |
| rcv.poll_send(*last_seq, now).map(Into::into) |
| } |
| State::Closed(_) | State::Listen(_) | State::TimeWait(_) => None, |
| } |
| } |
| |
| /// Polls the state machine to check if the connection should be closed. |
| /// |
| /// Returns whether the connection has been closed. |
| fn poll_close( |
| &mut self, |
| counters: &TcpCountersInner, |
| now: I, |
| SocketOptions { |
| keep_alive, |
| nagle_enabled: _, |
| user_timeout: _, |
| delayed_ack: _, |
| fin_wait2_timeout: _, |
| max_syn_retries: _, |
| }: &SocketOptions, |
| ) -> bool { |
| let timed_out = match self { |
| State::Established(Established { snd, rcv: _ }) => snd.timed_out(now, keep_alive), |
| State::CloseWait(CloseWait { snd, last_ack: _, last_wnd: _ }) => { |
| snd.timed_out(now, keep_alive) |
| } |
| State::LastAck(LastAck { snd, last_ack: _, last_wnd: _ }) |
| | State::Closing(Closing { snd, last_ack: _, last_wnd: _, last_wnd_scale: _ }) => { |
| snd.timed_out(now, keep_alive) |
| } |
| State::FinWait1(FinWait1 { snd, rcv: _ }) => snd.timed_out(now, keep_alive), |
| State::SynSent(SynSent { |
| iss: _, |
| timestamp: _, |
| retrans_timer, |
| active_open: _, |
| buffer_sizes: _, |
| device_mss: _, |
| default_mss: _, |
| rcv_wnd_scale: _, |
| }) |
| | State::SynRcvd(SynRcvd { |
| iss: _, |
| irs: _, |
| timestamp: _, |
| retrans_timer, |
| simultaneous_open: _, |
| buffer_sizes: _, |
| smss: _, |
| rcv_wnd_scale: _, |
| snd_wnd_scale: _, |
| }) => retrans_timer.timed_out(now), |
| |
| State::Closed(_) | State::Listen(_) | State::TimeWait(_) => false, |
| State::FinWait2(FinWait2 { last_seq: _, rcv: _, timeout_at }) => { |
| timeout_at.map(|at| now >= at).unwrap_or(false) |
| } |
| }; |
| if timed_out { |
| self.transition_to_state( |
| counters, |
| State::Closed(Closed { reason: Some(ConnectionError::TimedOut) }), |
| ); |
| } else if let State::TimeWait(tw) = self { |
| if tw.expiry <= now { |
| self.transition_to_state(counters, State::Closed(Closed { reason: None })); |
| } |
| } |
| matches!(self, State::Closed(_)) |
| } |
| |
| /// Returns an instant at which the caller SHOULD make their best effort to |
| /// call [`poll_send`]. |
| /// |
| /// An example synchronous protocol loop would look like: |
| /// |
| /// ```ignore |
| /// loop { |
| /// let now = Instant::now(); |
| /// output(state.poll_send(now)); |
| /// let incoming = wait_until(state.poll_send_at()) |
| /// output(state.on_segment(incoming, Instant::now())); |
| /// } |
| /// ``` |
| /// |
| /// Note: When integrating asynchronously, the caller needs to install |
| /// timers (for example, by using `TimerContext`), then calls to |
| /// `poll_send_at` and to `install_timer`/`cancel_timer` should not |
| /// interleave, otherwise timers may be lost. |
| pub(crate) fn poll_send_at(&self) -> Option<I> { |
| let combine_expiry = |e1: Option<I>, e2: Option<I>| match (e1, e2) { |
| (None, None) => None, |
| (None, Some(e2)) => Some(e2), |
| (Some(e1), None) => Some(e1), |
| (Some(e1), Some(e2)) => Some(e1.min(e2)), |
| }; |
| match self { |
| State::Established(Established { snd, rcv }) => combine_expiry( |
| snd.timer.as_ref().map(SendTimer::expiry), |
| rcv.timer.as_ref().map(ReceiveTimer::expiry), |
| ), |
| State::CloseWait(CloseWait { snd, last_ack: _, last_wnd: _ }) => { |
| Some(snd.timer?.expiry()) |
| } |
| State::LastAck(LastAck { snd, last_ack: _, last_wnd: _ }) |
| | State::Closing(Closing { snd, last_ack: _, last_wnd: _, last_wnd_scale: _ }) => { |
| Some(snd.timer?.expiry()) |
| } |
| State::FinWait1(FinWait1 { snd, rcv }) => combine_expiry( |
| snd.timer.as_ref().map(SendTimer::expiry), |
| rcv.timer.as_ref().map(ReceiveTimer::expiry), |
| ), |
| State::FinWait2(FinWait2 { last_seq: _, rcv, timeout_at }) => { |
| combine_expiry(*timeout_at, rcv.timer.as_ref().map(ReceiveTimer::expiry)) |
| } |
| State::SynRcvd(syn_rcvd) => Some(syn_rcvd.retrans_timer.at), |
| State::SynSent(syn_sent) => Some(syn_sent.retrans_timer.at), |
| State::Closed(_) | State::Listen(_) => None, |
| State::TimeWait(TimeWait { |
| last_seq: _, |
| last_ack: _, |
| last_wnd: _, |
| expiry, |
| last_wnd_scale: _, |
| }) => Some(*expiry), |
| } |
| } |
| |
| /// Corresponds to the [CLOSE](https://tools.ietf.org/html/rfc793#page-60) |
| /// user call. |
| /// |
| /// The caller should provide the current time if this close call would make |
| /// the connection defunct, so that we can reclaim defunct connections based |
| /// on timeouts. |
| pub(super) fn close( |
| &mut self, |
| counters: &TcpCountersInner, |
| close_reason: CloseReason<I>, |
| socket_options: &SocketOptions, |
| ) -> Result<(), CloseError> |
| where |
| ActiveOpen: IntoBuffers<R, S>, |
| { |
| match self { |
| State::Closed(_) => Err(CloseError::NoConnection), |
| State::Listen(_) | State::SynSent(_) => { |
| self.transition_to_state(counters, State::Closed(Closed { reason: None })); |
| Ok(()) |
| } |
| State::SynRcvd(SynRcvd { |
| iss, |
| irs, |
| timestamp: _, |
| retrans_timer: _, |
| simultaneous_open, |
| buffer_sizes, |
| smss, |
| rcv_wnd_scale, |
| snd_wnd_scale, |
| }) => { |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-60): |
| // SYN-RECEIVED STATE |
| // If no SENDs have been issued and there is no pending data |
| // to send, then form a FIN segment and send it, and enter |
| // FIN-WAIT-1 state; otherwise queue for processing after |
| // entering ESTABLISHED state. |
| // Note: Per RFC, we should transition into FIN-WAIT-1, however |
| // popular implementations deviate from it - Freebsd resets the |
| // connection instead of a normal shutdown: |
| // https://github.com/freebsd/freebsd-src/blob/8fc80638496e620519b2585d9fab409494ea4b43/sys/netinet/tcp_subr.c#L2344-L2346 |
| // while Linux simply does not send anything: |
| // https://github.com/torvalds/linux/blob/68e77ffbfd06ae3ef8f2abf1c3b971383c866983/net/ipv4/inet_connection_sock.c#L1180-L1187 |
| // Here we choose the Linux's behavior, because it is more |
| // popular and it is still correct from the protocol's point of |
| // view: the peer will find out eventually when it retransmits |
| // its SYN - it will get a RST back because now the listener no |
| // longer exists - it is as if the initial SYN is lost. The |
| // following check makes sure we only proceed if we were |
| // actively opened, i.e., initiated by `connect`. |
| let (rcv_buffer, snd_buffer) = simultaneous_open |
| .take() |
| .expect( |
| "a SYN-RCVD state that is in the pending queue \ |
| should call abort instead of close", |
| ) |
| .into_buffers(*buffer_sizes); |
| // Note: `Send` in `FinWait1` always has a FIN queued. |
| // Since we don't support sending data when connection |
| // isn't established, so enter FIN-WAIT-1 immediately. |
| let (snd_wnd_scale, rcv_wnd_scale) = snd_wnd_scale |
| .map(|snd_wnd_scale| (snd_wnd_scale, *rcv_wnd_scale)) |
| .unwrap_or_default(); |
| let finwait1 = FinWait1 { |
| snd: Send { |
| nxt: *iss + 1, |
| max: *iss + 1, |
| una: *iss + 1, |
| wnd: WindowSize::DEFAULT, |
| wl1: *iss, |
| wl2: *irs, |
| buffer: snd_buffer, |
| last_seq_ts: None, |
| rtt_estimator: Estimator::NoSample, |
| timer: None, |
| congestion_control: CongestionControl::cubic_with_mss(*smss), |
| wnd_scale: snd_wnd_scale, |
| wnd_max: WindowSize::DEFAULT, |
| }, |
| rcv: Recv { |
| buffer: rcv_buffer, |
| assembler: Assembler::new(*irs + 1), |
| timer: None, |
| mss: *smss, |
| wnd_scale: rcv_wnd_scale, |
| last_window_update: (*irs + 1, buffer_sizes.rwnd()), |
| }, |
| }; |
| self.transition_to_state(counters, State::FinWait1(finwait1)); |
| Ok(()) |
| } |
| State::Established(Established { snd, rcv }) => { |
| // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-60): |
| // ESTABLISHED STATE |
| // Queue this until all preceding SENDs have been segmentized, |
| // then form a FIN segment and send it. In any case, enter |
| // FIN-WAIT-1 state. |
| let finwait1 = FinWait1 { snd: snd.take().queue_fin(), rcv: rcv.take() }; |
| self.transition_to_state(counters, State::FinWait1(finwait1)); |
| Ok(()) |
| } |
| State::CloseWait(CloseWait { snd, last_ack, last_wnd }) => { |
| let lastack = LastAck { |
| snd: snd.take().queue_fin(), |
| last_ack: *last_ack, |
| last_wnd: *last_wnd, |
| }; |
| self.transition_to_state(counters, State::LastAck(lastack)); |
| Ok(()) |
| } |
| State::LastAck(_) | State::FinWait1(_) | State::Closing(_) | State::TimeWait(_) => { |
| Err(CloseError::Closing) |
| } |
| State::FinWait2(FinWait2 { last_seq: _, rcv: _, timeout_at }) => { |
| if let (CloseReason::Close { now }, Some(fin_wait2_timeout)) = |
| (close_reason, socket_options.fin_wait2_timeout) |
| { |
| assert_eq!(timeout_at.replace(now.add(fin_wait2_timeout)), None); |
| } |
| Err(CloseError::Closing) |
| } |
| } |
| } |
| |
| /// Corresponds to [ABORT](https://tools.ietf.org/html/rfc9293#section-3.10.5) |
| /// user call. |
| pub(crate) fn abort(&mut self, counters: &TcpCountersInner) -> Option<Segment<()>> { |
| let reply = match self { |
| // LISTEN STATE |
| // * Any outstanding RECEIVEs should be returned with "error: |
| // connection reset" responses. Delete TCB, enter CLOSED state, and |
| // return. |
| // SYN-SENT STATE |
| // * All queued SENDs and RECEIVEs should be given "connection reset" |
| // notification. Delete the TCB, enter CLOSED state, and return. |
| // CLOSING STATE |
| // LAST-ACK STATE |
| // TIME-WAIT STATE |
| // * Respond with "ok" and delete the TCB, enter CLOSED state, and |
| // return. |
| State::Closed(_) |
| | State::Listen(_) |
| | State::SynSent(_) |
| | State::Closing(_) |
| | State::LastAck(_) |
| | State::TimeWait(_) => None, |
| // SYN-RECEIVED STATE |
| // ESTABLISHED STATE |
| // FIN-WAIT-1 STATE |
| // FIN-WAIT-2 STATE |
| // CLOSE-WAIT STATE |
| // * Send a reset segment: |
| // <SEQ=SND.NXT><CTL=RST> |
| // * All queued SENDs and RECEIVEs should be given "connection reset" |
| // notification; all segments queued for transmission (except for the |
| // RST formed above) or retransmission should be flushed. Delete the |
| // TCB, enter CLOSED state, and return. |
| State::SynRcvd(SynRcvd { |
| iss, |
| irs, |
| timestamp: _, |
| retrans_timer: _, |
| simultaneous_open: _, |
| buffer_sizes: _, |
| smss: _, |
| rcv_wnd_scale: _, |
| snd_wnd_scale: _, |
| }) => Some(Segment::rst_ack(*iss, *irs + 1)), |
| State::Established(Established { snd, rcv }) => { |
| Some(Segment::rst_ack(snd.nxt, rcv.nxt())) |
| } |
| State::FinWait1(FinWait1 { snd, rcv }) => Some(Segment::rst_ack(snd.nxt, rcv.nxt())), |
| State::FinWait2(FinWait2 { rcv, last_seq, timeout_at: _ }) => { |
| Some(Segment::rst_ack(*last_seq, rcv.nxt())) |
| } |
| State::CloseWait(CloseWait { snd, last_ack, last_wnd: _ }) => { |
| Some(Segment::rst_ack(snd.nxt, *last_ack)) |
| } |
| }; |
| self.transition_to_state( |
| counters, |
| State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }), |
| ); |
| reply |
| } |
| |
| pub(crate) fn set_send_buffer_size(&mut self, size: usize) { |
| match self { |
| State::FinWait2(_) | State::TimeWait(_) | State::Closed(_) => (), |
| State::Listen(Listen { |
| iss: _, |
| buffer_sizes: BufferSizes { send, receive: _ }, |
| device_mss: _, |
| default_mss: _, |
| user_timeout: _, |
| }) |
| | State::SynRcvd(SynRcvd { |
| iss: _, |
| irs: _, |
| timestamp: _, |
| retrans_timer: _, |
| simultaneous_open: _, |
| buffer_sizes: BufferSizes { send, receive: _ }, |
| smss: _, |
| rcv_wnd_scale: _, |
| snd_wnd_scale: _, |
| }) |
| | State::SynSent(SynSent { |
| iss: _, |
| timestamp: _, |
| retrans_timer: _, |
| active_open: _, |
| buffer_sizes: BufferSizes { send, receive: _ }, |
| device_mss: _, |
| default_mss: _, |
| rcv_wnd_scale: _, |
| }) => *send = size, |
| State::Established(Established { snd, rcv: _ }) => snd.set_capacity(size), |
| State::FinWait1(FinWait1 { snd, rcv: _ }) => snd.set_capacity(size), |
| State::Closing(Closing { snd, last_ack: _, last_wnd: _, last_wnd_scale: _ }) |
| | State::LastAck(LastAck { snd, last_ack: _, last_wnd: _ }) => snd.set_capacity(size), |
| State::CloseWait(CloseWait { snd, last_ack: _, last_wnd: _ }) => snd.set_capacity(size), |
| } |
| } |
| |
| pub(crate) fn set_receive_buffer_size(&mut self, size: usize) { |
| match self { |
| State::Closing(_) |
| | State::LastAck(_) |
| | State::CloseWait(_) |
| | State::TimeWait(_) |
| | State::Closed(_) => (), |
| State::Listen(Listen { |
| iss: _, |
| buffer_sizes: BufferSizes { send: _, receive }, |
| device_mss: _, |
| default_mss: _, |
| user_timeout: _, |
| }) |
| | State::SynRcvd(SynRcvd { |
| iss: _, |
| irs: _, |
| timestamp: _, |
| retrans_timer: _, |
| simultaneous_open: _, |
| buffer_sizes: BufferSizes { send: _, receive }, |
| smss: _, |
| rcv_wnd_scale: _, |
| snd_wnd_scale: _, |
| }) |
| | State::SynSent(SynSent { |
| iss: _, |
| timestamp: _, |
| retrans_timer: _, |
| active_open: _, |
| buffer_sizes: BufferSizes { send: _, receive }, |
| device_mss: _, |
| default_mss: _, |
| rcv_wnd_scale: _, |
| }) => *receive = size, |
| State::Established(Established { snd: _, rcv }) => rcv.set_capacity(size), |
| State::FinWait1(FinWait1 { snd: _, rcv }) |
| | State::FinWait2(FinWait2 { last_seq: _, rcv, timeout_at: _ }) => { |
| rcv.set_capacity(size) |
| } |
| } |
| } |
| |
| pub(crate) fn target_buffer_sizes(&self) -> OptionalBufferSizes { |
| match self { |
| State::TimeWait(_) | State::Closed(_) => { |
| OptionalBufferSizes { send: None, receive: None } |
| } |
| State::Listen(Listen { |
| iss: _, |
| buffer_sizes, |
| device_mss: _, |
| default_mss: _, |
| user_timeout: _, |
| }) |
| | State::SynRcvd(SynRcvd { |
| iss: _, |
| irs: _, |
| timestamp: _, |
| retrans_timer: _, |
| simultaneous_open: _, |
| buffer_sizes, |
| smss: _, |
| rcv_wnd_scale: _, |
| snd_wnd_scale: _, |
| }) |
| | State::SynSent(SynSent { |
| iss: _, |
| timestamp: _, |
| retrans_timer: _, |
| active_open: _, |
| buffer_sizes, |
| device_mss: _, |
| default_mss: _, |
| rcv_wnd_scale: _, |
| }) => buffer_sizes.into_optional(), |
| State::Established(Established { snd, rcv }) => OptionalBufferSizes { |
| send: Some(snd.target_capacity()), |
| receive: Some(rcv.target_capacity()), |
| }, |
| State::FinWait1(FinWait1 { snd, rcv }) => OptionalBufferSizes { |
| send: Some(snd.target_capacity()), |
| receive: Some(rcv.target_capacity()), |
| }, |
| State::FinWait2(FinWait2 { last_seq: _, rcv, timeout_at: _ }) => { |
| OptionalBufferSizes { send: None, receive: Some(rcv.target_capacity()) } |
| } |
| State::Closing(Closing { snd, last_ack: _, last_wnd: _, last_wnd_scale: _ }) |
| | State::LastAck(LastAck { snd, last_ack: _, last_wnd: _ }) => { |
| OptionalBufferSizes { send: Some(snd.target_capacity()), receive: None } |
| } |
| State::CloseWait(CloseWait { snd, last_ack: _, last_wnd: _ }) => { |
| OptionalBufferSizes { send: Some(snd.target_capacity()), receive: None } |
| } |
| } |
| } |
| |
| /// Processes an incoming ICMP error, returns an soft error that needs to be |
| /// recorded in the containing socket. |
| pub(super) fn on_icmp_error( |
| &mut self, |
| counters: &TcpCountersInner, |
| err: IcmpErrorCode, |
| seq: SeqNum, |
| ) -> Option<ConnectionError> { |
| let err = Option::<ConnectionError>::from(err)?; |
| // We consider the following RFC quotes when implementing this function. |
| // Per RFC 5927 Section 4.1: |
| // Many TCP implementations have incorporated a validation check such |
| // that they react only to those ICMP error messages that appear to |
| // relate to segments currently "in flight" to the destination system. |
| // These implementations check that the TCP sequence number contained |
| // in the payload of the ICMP error message is within the range |
| // SND.UNA =< SEG.SEQ < SND.NXT. |
| // Per RFC 5927 Section 5.2: |
| // Based on this analysis, most popular TCP implementations treat all |
| // ICMP "hard errors" received for connections in any of the |
| // synchronized states (ESTABLISHED, FIN-WAIT-1, FIN-WAIT-2, CLOSE-WAIT, |
| // CLOSING, LAST-ACK, or TIME-WAIT) as "soft errors". That is, they do |
| // not abort the corresponding connection upon receipt of them. |
| // Per RFC 5461 Section 4.1: |
| // A number of TCP implementations have modified their reaction to all |
| // ICMP soft errors and treat them as hard errors when they are received |
| // for connections in the SYN-SENT or SYN-RECEIVED states. For example, |
| // this workaround has been implemented in the Linux kernel since |
| // version 2.0.0 (released in 1996) [Linux] |
| match self { |
| State::Closed(_) => None, |
| State::Listen(listen) => unreachable!( |
| "ICMP errors should not be delivered on a listener, received code {:?} on {:?}", |
| err, listen |
| ), |
| State::SynRcvd(SynRcvd { |
| iss, |
| irs: _, |
| timestamp: _, |
| retrans_timer: _, |
| simultaneous_open: _, |
| buffer_sizes: _, |
| smss: _, |
| rcv_wnd_scale: _, |
| snd_wnd_scale: _, |
| }) |
| | State::SynSent(SynSent { |
| iss, |
| timestamp: _, |
| retrans_timer: _, |
| active_open: _, |
| buffer_sizes: _, |
| device_mss: _, |
| default_mss: _, |
| rcv_wnd_scale: _, |
| }) => { |
| if *iss == seq { |
| self.transition_to_state(counters, State::Closed(Closed { reason: Some(err) })); |
| } |
| None |
| } |
| State::Established(Established { snd, rcv: _ }) |
| | State::CloseWait(CloseWait { snd, last_ack: _, last_wnd: _ }) => { |
| (!snd.una.after(seq) && seq.before(snd.nxt)).then_some(err) |
| } |
| State::LastAck(LastAck { snd, last_ack: _, last_wnd: _ }) |
| | State::FinWait1(FinWait1 { snd, rcv: _ }) => { |
| (!snd.una.after(seq) && seq.before(snd.nxt)).then_some(err) |
| } |
| // The following states does not have any outstanding segments, so |
| // they don't expect any incoming ICMP error. |
| State::FinWait2(_) | State::Closing(_) | State::TimeWait(_) => None, |
| } |
| } |
| } |
| |
/// From the socket layer, both `close` and `shutdown` will result in a state
/// machine level `close` call. We need to differentiate between the two
/// because we may need to do extra work if it is a socket `close`.
pub(super) enum CloseReason<I: Instant> {
    /// The state machine `close` originates from a socket-level `shutdown`.
    Shutdown,
    /// The state machine `close` originates from a socket-level `close`.
    // NOTE(review): `now` is presumably the instant at which `close` was
    // called, used for the extra work mentioned above - confirm against the
    // code that consumes this enum.
    Close { now: I },
}
| |
| #[cfg(test)] |
| mod test { |
| use alloc::vec; |
| use core::{fmt::Debug, num::NonZeroU16, time::Duration}; |
| |
| use assert_matches::assert_matches; |
| use net_types::ip::Ipv4; |
| use test_case::test_case; |
| |
| use super::*; |
| use crate::{ |
| context::{ |
| testutil::{FakeInstant, FakeInstantCtx}, |
| InstantContext as _, |
| }, |
| transport::tcp::{ |
| buffer::{Buffer, RingBuffer}, |
| testutil::{ |
| DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE_USIZE, |
| }, |
| DEFAULT_FIN_WAIT2_TIMEOUT, |
| }, |
| }; |
| |
| const ISS_1: SeqNum = SeqNum::new(100); |
| const ISS_2: SeqNum = SeqNum::new(300); |
| |
| const RTT: Duration = Duration::from_millis(500); |
| |
| const DEVICE_MAXIMUM_SEGMENT_SIZE: Mss = Mss(const_unwrap_option(NonZeroU16::new(1400 as u16))); |
| |
| /// A buffer provider that doesn't need extra information to construct |
| /// buffers as this is only used in unit tests for the state machine only. |
| enum ClientlessBufferProvider {} |
| |
| impl<R: ReceiveBuffer + Default, S: SendBuffer + Default> BufferProvider<R, S> |
| for ClientlessBufferProvider |
| { |
| type PassiveOpen = (); |
| type ActiveOpen = (); |
| |
| fn new_passive_open_buffers(_buffer_sizes: BufferSizes) -> (R, S, Self::PassiveOpen) { |
| (R::default(), S::default(), ()) |
| } |
| } |
| |
| impl<P: Payload> Segment<P> { |
| fn data(seq: SeqNum, ack: SeqNum, wnd: UnscaledWindowSize, data: P) -> Segment<P> { |
| let (seg, truncated) = Segment::with_data(seq, Some(ack), None, wnd, data); |
| assert_eq!(truncated, 0); |
| seg |
| } |
| |
| fn piggybacked_fin( |
| seq: SeqNum, |
| ack: SeqNum, |
| wnd: UnscaledWindowSize, |
| data: P, |
| ) -> Segment<P> { |
| let (seg, truncated) = |
| Segment::with_data(seq, Some(ack), Some(Control::FIN), wnd, data); |
| assert_eq!(truncated, 0); |
| seg |
| } |
| } |
| |
| impl Segment<()> { |
| fn fin(seq: SeqNum, ack: SeqNum, wnd: UnscaledWindowSize) -> Self { |
| Segment::new(seq, Some(ack), Some(Control::FIN), wnd) |
| } |
| } |
| |
| impl RingBuffer { |
| fn with_data<'a>(cap: usize, data: &'a [u8]) -> Self { |
| let mut buffer = RingBuffer::new(cap); |
| let nwritten = buffer.write_at(0, &data); |
| assert_eq!(nwritten, data.len()); |
| buffer.make_readable(nwritten); |
| buffer |
| } |
| } |
| |
| impl UnscaledWindowSize { |
| fn from_usize(size: usize) -> Self { |
| UnscaledWindowSize::from(u16::try_from(size).unwrap()) |
| } |
| |
| fn from_u32(size: u32) -> Self { |
| UnscaledWindowSize::from(u16::try_from(size).unwrap()) |
| } |
| } |
| |
/// A buffer that can't read or write for test purpose.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
struct NullBuffer;

impl Buffer for NullBuffer {
    /// Always reports zero stored bytes and zero capacity.
    fn limits(&self) -> BufferLimits {
        BufferLimits { len: 0, capacity: 0 }
    }

    /// The target capacity is always zero.
    fn target_capacity(&self) -> usize {
        0
    }

    /// Capacity requests are accepted but have no effect.
    fn request_capacity(&mut self, _size: usize) {}
}
| |
impl ReceiveBuffer for NullBuffer {
    /// Discards the data and reports that zero bytes were written.
    fn write_at<P: Payload>(&mut self, _offset: usize, _data: &P) -> usize {
        0
    }

    /// Nothing can ever be written, so only a zero-byte commit is accepted;
    /// any other `count` panics.
    fn make_readable(&mut self, count: usize) {
        assert_eq!(count, 0);
    }
}
| |
| impl SendBuffer for NullBuffer { |
| fn mark_read(&mut self, count: usize) { |
| assert_eq!(count, 0); |
| } |
| |
| fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R |
| where |
| F: FnOnce(SendPayload<'a>) -> R, |
| { |
| assert_eq!(offset, 0); |
| f(SendPayload::Contiguous(&[])) |
| } |
| } |
| |
| impl<R: ReceiveBuffer, S: SendBuffer> State<FakeInstant, R, S, ()> { |
| fn poll_send_with_default_options( |
| &mut self, |
| mss: u32, |
| now: FakeInstant, |
| counters: &TcpCountersInner, |
| ) -> Option<Segment<SendPayload<'_>>> { |
| self.poll_send(counters, mss, now, &SocketOptions::default()) |
| } |
| |
| fn on_segment_with_default_options<P: Payload, BP: BufferProvider<R, S, ActiveOpen = ()>>( |
| &mut self, |
| incoming: Segment<P>, |
| now: FakeInstant, |
| counters: &TcpCountersInner, |
| ) -> (Option<Segment<()>>, Option<BP::PassiveOpen>) |
| where |
| BP::PassiveOpen: Debug, |
| R: Default, |
| S: Default, |
| { |
| // In testing, it is convenient to disable delayed ack by default. |
| let (segment, passive_open, _data_acked) = self.on_segment::<P, BP>( |
| counters, |
| incoming, |
| now, |
| &SocketOptions::default(), |
| false, /* defunct */ |
| ); |
| (segment, passive_open) |
| } |
| } |
| |
| impl<S: SendBuffer + Debug> State<FakeInstant, RingBuffer, S, ()> { |
| fn read_with(&mut self, f: impl for<'b> FnOnce(&'b [&'_ [u8]]) -> usize) -> usize { |
| match self { |
| State::Closed(_) |
| | State::Listen(_) |
| | State::SynRcvd(_) |
| | State::SynSent(_) |
| | State::CloseWait(_) |
| | State::LastAck(_) |
| | State::Closing(_) |
| | State::TimeWait(_) => { |
| panic!("No receive state in {:?}", self); |
| } |
| State::Established(e) => e.rcv.buffer.read_with(f), |
| State::FinWait1(FinWait1 { snd: _, rcv }) |
| | State::FinWait2(FinWait2 { last_seq: _, rcv, timeout_at: _ }) => { |
| rcv.buffer.read_with(f) |
| } |
| } |
| } |
| } |
| |
| #[test_case(Segment::rst(ISS_1) => None; "drop RST")] |
| #[test_case(Segment::rst_ack(ISS_1, ISS_2) => None; "drop RST|ACK")] |
| #[test_case(Segment::syn(ISS_1, UnscaledWindowSize::from(0), Options { mss: None, window_scale: None }) => Some(Segment::rst_ack(SeqNum::new(0), ISS_1 + 1)); "reset SYN")] |
| #[test_case(Segment::syn_ack(ISS_1, ISS_2, UnscaledWindowSize::from(0), Options { mss: None, window_scale: None }) => Some(Segment::rst(ISS_2)); "reset SYN|ACK")] |
| #[test_case(Segment::data(ISS_1, ISS_2, UnscaledWindowSize::from(0), &[0, 1, 2][..]) => Some(Segment::rst(ISS_2)); "reset data segment")] |
| fn segment_arrives_when_closed( |
| incoming: impl Into<Segment<&'static [u8]>>, |
| ) -> Option<Segment<()>> { |
| let closed = Closed { reason: () }; |
| closed.on_segment(incoming.into()) |
| } |
| |
// Table-driven check of `SynSent::on_segment`. Each case supplies the incoming
// segment and the duration used to build the arrival instant; the value after
// `=>` is the disposition the state is expected to return.
#[test_case(
    Segment::rst_ack(ISS_2, ISS_1 - 1), RTT
    => SynSentOnSegmentDisposition::Ignore; "unacceptable ACK with RST")]
#[test_case(
    Segment::ack(ISS_2, ISS_1 - 1, UnscaledWindowSize::from(u16::MAX)), RTT
    => SynSentOnSegmentDisposition::SendRst(
        Segment::rst(ISS_1-1),
    ); "unacceptable ACK without RST")]
#[test_case(
    Segment::rst_ack(ISS_2, ISS_1), RTT
    => SynSentOnSegmentDisposition::EnterClosed(
        Closed { reason: Some(ConnectionError::ConnectionReset) },
    ); "acceptable ACK(ISS) with RST")]
#[test_case(
    Segment::rst_ack(ISS_2, ISS_1 + 1), RTT
    => SynSentOnSegmentDisposition::EnterClosed(
        Closed { reason: Some(ConnectionError::ConnectionReset) },
    ); "acceptable ACK(ISS+1) with RST")]
#[test_case(
    Segment::rst(ISS_2), RTT
    => SynSentOnSegmentDisposition::Ignore; "RST without ack")]
#[test_case(
    Segment::syn(ISS_2, UnscaledWindowSize::from(u16::MAX), Options { mss: None, window_scale: Some(WindowScale::default()) }), RTT
    => SynSentOnSegmentDisposition::SendSynAckAndEnterSynRcvd(
        Segment::syn_ack(ISS_1, ISS_2 + 1, UnscaledWindowSize::from(u16::MAX), Options { mss: Some(Mss::default::<Ipv4>()), window_scale: Some(WindowScale::default()) }),
        SynRcvd {
            iss: ISS_1,
            irs: ISS_2,
            timestamp: Some(FakeInstant::from(RTT)),
            retrans_timer: RetransTimer::new(
                FakeInstant::from(RTT),
                Estimator::RTO_INIT,
                DEFAULT_USER_TIMEOUT - RTT,
                DEFAULT_MAX_SYNACK_RETRIES
            ),
            simultaneous_open: None,
            buffer_sizes: BufferSizes::default(),
            smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            rcv_wnd_scale: WindowScale::default(),
            snd_wnd_scale: Some(WindowScale::default()),
        }
    ); "SYN only")]
#[test_case(
    Segment::fin(ISS_2, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX)), RTT
    => SynSentOnSegmentDisposition::Ignore; "acceptable ACK with FIN")]
#[test_case(
    Segment::ack(ISS_2, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX)), RTT
    => SynSentOnSegmentDisposition::Ignore; "acceptable ACK(ISS+1) with nothing")]
#[test_case(
    Segment::ack(ISS_2, ISS_1, UnscaledWindowSize::from(u16::MAX)), RTT
    => SynSentOnSegmentDisposition::Ignore; "acceptable ACK(ISS) without RST")]
#[test_case(
    Segment::syn(ISS_2, UnscaledWindowSize::from(u16::MAX), Options { mss: None, window_scale: None }),
    DEFAULT_USER_TIMEOUT
    => SynSentOnSegmentDisposition::EnterClosed(Closed {
        reason: None
    }); "syn but timed out")]
fn segment_arrives_when_syn_sent(
    incoming: Segment<()>,
    delay: Duration,
) -> SynSentOnSegmentDisposition<FakeInstant, ()> {
    // The state under test uses ISS_1 and has its timestamp and retransmit
    // timer anchored at `FakeInstant::default()`.
    let syn_sent = SynSent {
        iss: ISS_1,
        timestamp: Some(FakeInstant::default()),
        retrans_timer: RetransTimer::new(
            FakeInstant::default(),
            Estimator::RTO_INIT,
            DEFAULT_USER_TIMEOUT,
            DEFAULT_MAX_RETRIES,
        ),
        active_open: (),
        buffer_sizes: BufferSizes::default(),
        default_mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        device_mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
        rcv_wnd_scale: WindowScale::default(),
    };
    // The segment is delivered at the instant built from `delay`.
    syn_sent.on_segment(incoming, FakeInstant::from(delay))
}
| |
// Table-driven check of `Listen::on_segment`: the value after `=>` is the
// disposition expected for the given incoming segment.
#[test_case(Segment::rst(ISS_2) => ListenOnSegmentDisposition::Ignore; "ignore RST")]
#[test_case(Segment::ack(ISS_2, ISS_1, UnscaledWindowSize::from(u16::MAX)) =>
    ListenOnSegmentDisposition::SendRst(Segment::rst(ISS_1)); "reject ACK")]
#[test_case(Segment::syn(ISS_2, UnscaledWindowSize::from(u16::MAX), Options { mss: None, window_scale: Some(WindowScale::default()) }) =>
    ListenOnSegmentDisposition::SendSynAckAndEnterSynRcvd(
        Segment::syn_ack(ISS_1, ISS_2 + 1, UnscaledWindowSize::from(u16::MAX), Options { mss: Some(Mss::default::<Ipv4>()), window_scale: Some(WindowScale::default()) }),
        SynRcvd {
            iss: ISS_1,
            irs: ISS_2,
            timestamp: Some(FakeInstant::default()),
            retrans_timer: RetransTimer::new(
                FakeInstant::default(),
                Estimator::RTO_INIT,
                DEFAULT_USER_TIMEOUT,
                DEFAULT_MAX_SYNACK_RETRIES,
            ),
            simultaneous_open: None,
            buffer_sizes: BufferSizes::default(),
            smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            rcv_wnd_scale: WindowScale::default(),
            snd_wnd_scale: Some(WindowScale::default()),
        }); "accept syn")]
fn segment_arrives_when_listen(
    incoming: Segment<()>,
) -> ListenOnSegmentDisposition<FakeInstant> {
    // Listener with ISS_1, default buffer sizes, the test device MSS and the
    // default IPv4 MSS; the last argument is left as `None`.
    let listen = Closed::<Initial>::listen(
        ISS_1,
        Default::default(),
        DEVICE_MAXIMUM_SEGMENT_SIZE,
        DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        None,
    );
    // The segment is delivered at `FakeInstant::default()`.
    listen.on_segment(incoming, FakeInstant::default())
}
| |
// Table-driven check of segment handling in SYN-RECEIVED. Each case supplies
// the incoming segment and the expected resulting state (`None` meaning "still
// `SynRcvd`"); the value after `=>` is the segment expected in reply, if any.
// In this fixture the local ISS is ISS_2 and the peer's ISS is ISS_1.
#[test_case(
    Segment::ack(ISS_1, ISS_2, UnscaledWindowSize::from(u16::MAX)),
    None
    => Some(
        Segment::ack(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX))
    ); "OTW segment")]
#[test_case(
    Segment::rst_ack(ISS_1, ISS_2),
    None
    => None; "OTW RST")]
#[test_case(
    Segment::rst_ack(ISS_1 + 1, ISS_2),
    Some(State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }))
    => None; "acceptable RST")]
#[test_case(
    Segment::syn(ISS_1 + 1, UnscaledWindowSize::from(u16::MAX), Options { mss: None, window_scale: Some(WindowScale::default()) }),
    Some(State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }))
    => Some(
        Segment::rst(ISS_2 + 1)
    ); "duplicate syn")]
#[test_case(
    Segment::ack(ISS_1 + 1, ISS_2, UnscaledWindowSize::from(u16::MAX)),
    None
    => Some(
        Segment::rst(ISS_2)
    ); "unacceptable ack (ISS)")]
#[test_case(
    Segment::ack(ISS_1 + 1, ISS_2 + 1, UnscaledWindowSize::from(u16::MAX)),
    Some(State::Established(
        Established {
            snd: Send {
                nxt: ISS_2 + 1,
                max: ISS_2 + 1,
                una: ISS_2 + 1,
                wnd: WindowSize::DEFAULT,
                wnd_max: WindowSize::DEFAULT,
                buffer: NullBuffer,
                wl1: ISS_1 + 1,
                wl2: ISS_2 + 1,
                rtt_estimator: Estimator::Measured {
                    srtt: RTT,
                    rtt_var: RTT / 2,
                },
                last_seq_ts: None,
                timer: None,
                congestion_control: CongestionControl::cubic_with_mss(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
                wnd_scale: WindowScale::default(),
            },
            rcv: Recv {
                buffer: RingBuffer::default(),
                assembler: Assembler::new(ISS_1 + 1),
                timer: None,
                mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
                wnd_scale: WindowScale::default(),
                last_window_update: (ISS_1 + 1, WindowSize::DEFAULT),
            },
        }
    ))
    => None; "acceptable ack (ISS + 1)")]
#[test_case(
    Segment::ack(ISS_1 + 1, ISS_2 + 2, UnscaledWindowSize::from(u16::MAX)),
    None
    => Some(
        Segment::rst(ISS_2 + 2)
    ); "unacceptable ack (ISS + 2)")]
#[test_case(
    Segment::ack(ISS_1 + 1, ISS_2 - 1, UnscaledWindowSize::from(u16::MAX)),
    None
    => Some(
        Segment::rst(ISS_2 - 1)
    ); "unacceptable ack (ISS - 1)")]
#[test_case(
    Segment::new(ISS_1 + 1, None, None, UnscaledWindowSize::from(u16::MAX)),
    None
    => None; "no ack")]
#[test_case(
    Segment::fin(ISS_1 + 1, ISS_2 + 1, UnscaledWindowSize::from(u16::MAX)),
    Some(State::CloseWait(CloseWait {
        snd: Send {
            nxt: ISS_2 + 1,
            max: ISS_2 + 1,
            una: ISS_2 + 1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer: NullBuffer,
            wl1: ISS_1 + 1,
            wl2: ISS_2 + 1,
            rtt_estimator: Estimator::Measured{
                srtt: RTT,
                rtt_var: RTT / 2,
            },
            last_seq_ts: None,
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
            wnd_scale: WindowScale::default(),
        },
        last_ack: ISS_1 + 2,
        last_wnd: WindowSize::from_u32(u32::from(u16::MAX - 1)).unwrap(),
    }))
    => Some(
        Segment::ack(ISS_2 + 1, ISS_1 + 2, UnscaledWindowSize::from(u16::MAX - 1))
    ); "fin")]
fn segment_arrives_when_syn_rcvd(
    incoming: Segment<()>,
    expected: Option<State<FakeInstant, RingBuffer, NullBuffer, ()>>,
) -> Option<Segment<()>> {
    let mut clock = FakeInstantCtx::default();
    let counters = TcpCountersInner::default();
    // The state is created at the clock's initial instant.
    let mut state = State::SynRcvd(SynRcvd {
        iss: ISS_2,
        irs: ISS_1,
        timestamp: Some(clock.now()),
        retrans_timer: RetransTimer::new(
            clock.now(),
            Estimator::RTO_INIT,
            DEFAULT_USER_TIMEOUT,
            DEFAULT_MAX_RETRIES,
        ),
        simultaneous_open: Some(()),
        buffer_sizes: Default::default(),
        smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        rcv_wnd_scale: WindowScale::default(),
        snd_wnd_scale: Some(WindowScale::default()),
    });
    // Let one RTT pass before the segment arrives; the cases that reach a
    // synchronized state expect an RTT estimate of exactly `RTT`.
    clock.sleep(RTT);
    let (seg, _passive_open) = state
        .on_segment_with_default_options::<_, ClientlessBufferProvider>(
            incoming,
            clock.now(),
            &counters,
        );
    // `None` means the segment must not have moved the state machine out of
    // SYN-RECEIVED.
    match expected {
        Some(new_state) => assert_eq!(new_state, state),
        None => assert_matches!(state, State::SynRcvd(_)),
    };
    seg
}
| |
// Table-driven check of segment handling in ESTABLISHED. Each case supplies
// the incoming segment and the expected resulting state (`None` meaning "still
// `Established`"); the value after `=>` is the segment expected in reply, if
// any. The fixture's receive buffer holds only 2 bytes, which is what the
// window values in the expectations are derived from.
#[test_case(
    Segment::syn(ISS_2 + 1, UnscaledWindowSize::from(u16::MAX), Options { mss: None, window_scale: None }),
    Some(State::Closed (
        Closed { reason: Some(ConnectionError::ConnectionReset) },
    ))
    => Some(Segment::rst(ISS_1 + 1)); "duplicate syn")]
#[test_case(
    Segment::rst(ISS_2 + 1),
    Some(State::Closed (
        Closed { reason: Some(ConnectionError::ConnectionReset) },
    ))
    => None; "accepatable rst")]
#[test_case(
    Segment::ack(ISS_2 + 1, ISS_1 + 2, UnscaledWindowSize::from(u16::MAX)),
    None
    => Some(
        Segment::ack(ISS_1 + 1, ISS_2 + 1, UnscaledWindowSize::from(2))
    ); "unacceptable ack")]
#[test_case(
    Segment::ack(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX)),
    None
    => None; "pure ack")]
#[test_case(
    Segment::fin(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX)),
    Some(State::CloseWait(CloseWait {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer: NullBuffer,
            wl1: ISS_2 + 1,
            wl2: ISS_1 + 1,
            rtt_estimator: Estimator::default(),
            last_seq_ts: None,
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
            wnd_scale: WindowScale::default(),
        },
        last_ack: ISS_2 + 2,
        last_wnd: WindowSize::new(1).unwrap(),
    }))
    => Some(
        Segment::ack(ISS_1 + 1, ISS_2 + 2, UnscaledWindowSize::from(1))
    ); "pure fin")]
#[test_case(
    Segment::piggybacked_fin(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX), "A".as_bytes()),
    Some(State::CloseWait(CloseWait {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer: NullBuffer,
            wl1: ISS_2 + 1,
            wl2: ISS_1 + 1,
            rtt_estimator: Estimator::default(),
            last_seq_ts: None,
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
            wnd_scale: WindowScale::default(),
        },
        last_ack: ISS_2 + 3,
        last_wnd: WindowSize::ZERO,
    }))
    => Some(
        Segment::ack(ISS_1 + 1, ISS_2 + 3, UnscaledWindowSize::from(0))
    ); "fin with 1 byte")]
#[test_case(
    Segment::piggybacked_fin(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX), "AB".as_bytes()),
    None
    => Some(
        Segment::ack(ISS_1 + 1, ISS_2 + 3, UnscaledWindowSize::from(0))
    ); "fin with 2 bytes")]
fn segment_arrives_when_established(
    incoming: Segment<impl Payload>,
    expected: Option<State<FakeInstant, RingBuffer, NullBuffer, ()>>,
) -> Option<Segment<()>> {
    let counters = TcpCountersInner::default();
    // Local ISS is ISS_1, the peer's is ISS_2; nothing is in flight and the
    // receive buffer has room for exactly 2 bytes.
    let mut state = State::Established(Established {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer: NullBuffer,
            wl1: ISS_2 + 1,
            wl2: ISS_1 + 1,
            rtt_estimator: Estimator::default(),
            last_seq_ts: None,
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(
                DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            ),
            wnd_scale: WindowScale::default(),
        },
        rcv: Recv {
            buffer: RingBuffer::new(2),
            assembler: Assembler::new(ISS_2 + 1),
            timer: None,
            mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2 + 1, WindowSize::new(2).unwrap()),
        },
    });
    let (seg, passive_open) = state
        .on_segment_with_default_options::<_, ClientlessBufferProvider>(
            incoming,
            FakeInstant::default(),
            &counters,
        );
    // None of the cases may report a passive open.
    assert_eq!(passive_open, None);
    // `None` means the segment must not have moved the state machine out of
    // ESTABLISHED.
    match expected {
        Some(new_state) => assert_eq!(new_state, state),
        None => assert_matches!(state, State::Established(_)),
    };
    seg
}
| |
#[test]
fn common_rcv_data_segment_arrives() {
    let counters = TcpCountersInner::default();
    // Tests the common behavior when data segment arrives in states that
    // have a receive state.
    // Send side with nothing buffered and nothing in flight.
    let new_snd = || Send {
        nxt: ISS_1 + 1,
        max: ISS_1 + 1,
        una: ISS_1 + 1,
        wnd: WindowSize::DEFAULT,
        wnd_max: WindowSize::DEFAULT,
        buffer: NullBuffer,
        wl1: ISS_2 + 1,
        wl2: ISS_1 + 1,
        rtt_estimator: Estimator::default(),
        last_seq_ts: None,
        timer: None,
        congestion_control: CongestionControl::cubic_with_mss(
            DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        ),
        wnd_scale: WindowScale::default(),
    };
    // Receive side whose buffer has room for exactly `TEST_BYTES`.
    let new_rcv = || Recv {
        buffer: RingBuffer::new(TEST_BYTES.len()),
        assembler: Assembler::new(ISS_2 + 1),
        timer: None,
        mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        wnd_scale: WindowScale::default(),
        last_window_update: (ISS_2 + 1, WindowSize::new(TEST_BYTES.len()).unwrap()),
    };
    for mut state in [
        State::Established(Established { snd: new_snd(), rcv: new_rcv() }),
        State::FinWait1(FinWait1 { snd: new_snd().queue_fin(), rcv: new_rcv() }),
        State::FinWait2(FinWait2 { last_seq: ISS_1 + 1, rcv: new_rcv(), timeout_at: None }),
    ] {
        // An in-order data segment carrying `TEST_BYTES` must be acknowledged
        // in full, advertising a zero window since the buffer is now full.
        assert_eq!(
            state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
                Segment::data(
                    ISS_2 + 1,
                    ISS_1 + 1,
                    UnscaledWindowSize::from(u16::MAX),
                    TEST_BYTES
                ),
                FakeInstant::default(),
                &counters,
            ),
            (
                Some(Segment::ack(
                    ISS_1 + 1,
                    ISS_2 + 1 + TEST_BYTES.len(),
                    UnscaledWindowSize::from(0)
                )),
                None
            )
        );
        // The received bytes must be readable from the receive buffer.
        assert_eq!(
            state.read_with(|bytes| {
                assert_eq!(bytes.concat(), TEST_BYTES);
                TEST_BYTES.len()
            }),
            TEST_BYTES.len()
        );
    }
}
| |
#[test]
fn common_snd_ack_segment_arrives() {
    let counters = TcpCountersInner::default();
    // Tests the common behavior when ack segment arrives in states that
    // have a send state.
    // Send side whose buffer is pre-filled with `TEST_BYTES`, none of it sent
    // yet.
    let new_snd = || Send {
        nxt: ISS_1 + 1,
        max: ISS_1 + 1,
        una: ISS_1 + 1,
        wnd: WindowSize::DEFAULT,
        wnd_max: WindowSize::DEFAULT,
        buffer: RingBuffer::with_data(TEST_BYTES.len(), TEST_BYTES),
        wl1: ISS_2 + 1,
        wl2: ISS_1 + 1,
        rtt_estimator: Estimator::default(),
        last_seq_ts: None,
        timer: None,
        congestion_control: CongestionControl::cubic_with_mss(
            DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        ),
        wnd_scale: WindowScale::default(),
    };
    // Receive side that cannot accept any data.
    let new_rcv = || Recv {
        buffer: NullBuffer,
        assembler: Assembler::new(ISS_2 + 1),
        timer: None,
        mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        wnd_scale: WindowScale::default(),
        last_window_update: (ISS_2 + 1, WindowSize::ZERO),
    };
    for mut state in [
        State::Established(Established { snd: new_snd(), rcv: new_rcv() }),
        State::FinWait1(FinWait1 { snd: new_snd().queue_fin(), rcv: new_rcv() }),
        State::Closing(Closing {
            snd: new_snd().queue_fin(),
            last_ack: ISS_2 + 1,
            last_wnd: WindowSize::ZERO,
            last_wnd_scale: WindowScale::default(),
        }),
        State::CloseWait(CloseWait {
            snd: new_snd(),
            last_ack: ISS_2 + 1,
            last_wnd: WindowSize::ZERO,
        }),
        State::LastAck(LastAck {
            snd: new_snd().queue_fin(),
            last_ack: ISS_2 + 1,
            last_wnd: WindowSize::ZERO,
        }),
    ] {
        // Polling with an MSS equal to the buffered length must emit all of
        // `TEST_BYTES` in one data segment.
        assert_eq!(
            state.poll_send_with_default_options(
                u32::try_from(TEST_BYTES.len()).unwrap(),
                FakeInstant::default(),
                &counters,
            ),
            Some(Segment::data(
                ISS_1 + 1,
                ISS_2 + 1,
                UnscaledWindowSize::from(0),
                SendPayload::Contiguous(TEST_BYTES)
            ))
        );
        // The peer acknowledges everything that was sent; no reply segment
        // and no passive open are expected.
        assert_eq!(
            state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
                Segment::ack(
                    ISS_2 + 1,
                    ISS_1 + 1 + TEST_BYTES.len(),
                    UnscaledWindowSize::from(u16::MAX)
                ),
                FakeInstant::default(),
                &counters,
            ),
            (None, None),
        );
        assert_eq!(state.poll_send_at(), None);
        // Extract the send state. `queue_fin()` is applied in the arms whose
        // `snd` was built without it, so that all arms produce the same type.
        let snd = match state {
            State::Closed(_)
            | State::Listen(_)
            | State::SynRcvd(_)
            | State::SynSent(_)
            | State::FinWait2(_)
            | State::TimeWait(_) => unreachable!("Unexpected state {:?}", state),
            State::Established(e) => e.snd.queue_fin(),
            State::CloseWait(c) => c.snd.queue_fin(),
            State::LastAck(l) => l.snd,
            State::FinWait1(f) => f.snd,
            State::Closing(c) => c.snd,
        };
        // All send pointers must have advanced past the data and the buffer
        // must have been drained.
        assert_eq!(snd.nxt, ISS_1 + 1 + TEST_BYTES.len());
        assert_eq!(snd.max, ISS_1 + 1 + TEST_BYTES.len());
        assert_eq!(snd.una, ISS_1 + 1 + TEST_BYTES.len());
        assert_eq!(snd.buffer.limits().len, 0);
    }
}
| |
// Table-driven check of segment handling in CLOSE-WAIT. Each case supplies the
// incoming segment and the expected resulting state (`None` meaning "still
// `CloseWait`"); the value after `=>` is the segment expected in reply, if any.
#[test_case(
    Segment::syn(ISS_2 + 2, UnscaledWindowSize::from(u16::MAX), Options { mss: None, window_scale: None }),
    Some(State::Closed (
        Closed { reason: Some(ConnectionError::ConnectionReset) },
    ))
    => Some(Segment::rst(ISS_1 + 1)); "syn")]
#[test_case(
    Segment::rst(ISS_2 + 2),
    Some(State::Closed (
        Closed { reason: Some(ConnectionError::ConnectionReset) },
    ))
    => None; "rst")]
#[test_case(
    Segment::fin(ISS_2 + 2, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX)),
    None
    => None; "ignore fin")]
#[test_case(
    Segment::data(ISS_2 + 2, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX), "Hello".as_bytes()),
    None
    => None; "ignore data")]
fn segment_arrives_when_close_wait(
    incoming: Segment<impl Payload>,
    expected: Option<State<FakeInstant, RingBuffer, NullBuffer, ()>>,
) -> Option<Segment<()>> {
    let counters = TcpCountersInner::default();
    // Local ISS is ISS_1; the last acknowledgement recorded for the peer is
    // ISS_2 + 2, which is also the sequence number every case above uses.
    let mut state = State::CloseWait(CloseWait {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer: NullBuffer,
            wl1: ISS_2 + 1,
            wl2: ISS_1 + 1,
            rtt_estimator: Estimator::default(),
            last_seq_ts: None,
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(
                DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            ),
            wnd_scale: WindowScale::default(),
        },
        last_ack: ISS_2 + 2,
        last_wnd: WindowSize::DEFAULT,
    });
    let (seg, _passive_open) = state
        .on_segment_with_default_options::<_, ClientlessBufferProvider>(
            incoming,
            FakeInstant::default(),
            &counters,
        );
    // `None` means the segment must not have moved the state machine out of
    // CLOSE-WAIT.
    match expected {
        Some(new_state) => assert_eq!(new_state, state),
        None => assert_matches!(state, State::CloseWait(_)),
    };
    seg
}
| |
// Walks a full three-way handshake between an active opener (ISS_1) and a
// passive listener (ISS_2), advancing the fake clock by RTT / 2 before each
// delivery and checking the segments and resulting states at every step.
#[test]
fn active_passive_open() {
    let mut clock = FakeInstantCtx::default();
    let counters = TcpCountersInner::default();
    // Step 1: the active side connects and produces a SYN.
    let (syn_sent, syn_seg) = Closed::<Initial>::connect(
        ISS_1,
        clock.now(),
        (),
        Default::default(),
        DEVICE_MAXIMUM_SEGMENT_SIZE,
        DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        &SocketOptions::default(),
    );
    assert_eq!(
        syn_seg,
        Segment::syn(
            ISS_1,
            UnscaledWindowSize::from(u16::MAX),
            Options {
                mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE),
                window_scale: Some(WindowScale::default())
            }
        )
    );
    assert_eq!(
        syn_sent,
        SynSent {
            iss: ISS_1,
            timestamp: Some(clock.now()),
            retrans_timer: RetransTimer::new(
                clock.now(),
                Estimator::RTO_INIT,
                DEFAULT_USER_TIMEOUT,
                DEFAULT_MAX_SYN_RETRIES,
            ),
            active_open: (),
            buffer_sizes: BufferSizes::default(),
            default_mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            device_mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
            rcv_wnd_scale: WindowScale::default(),
        }
    );
    let mut active = State::SynSent(syn_sent);
    let mut passive = State::Listen(Closed::<Initial>::listen(
        ISS_2,
        Default::default(),
        DEVICE_MAXIMUM_SEGMENT_SIZE,
        DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        None,
    ));
    // Step 2: half an RTT later the SYN reaches the listener, which must
    // answer with a SYN-ACK and move to SYN-RECEIVED.
    clock.sleep(RTT / 2);
    let (seg, passive_open) = passive
        .on_segment_with_default_options::<_, ClientlessBufferProvider>(
            syn_seg,
            clock.now(),
            &counters,
        );
    let syn_ack = seg.expect("failed to generate a syn-ack segment");
    assert_eq!(passive_open, None);
    assert_eq!(
        syn_ack,
        Segment::syn_ack(
            ISS_2,
            ISS_1 + 1,
            UnscaledWindowSize::from(u16::MAX),
            Options {
                mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE),
                window_scale: Some(WindowScale::default())
            }
        )
    );
    assert_matches!(passive, State::SynRcvd(ref syn_rcvd) if syn_rcvd == &SynRcvd {
        iss: ISS_2,
        irs: ISS_1,
        timestamp: Some(clock.now()),
        retrans_timer: RetransTimer::new(
            clock.now(),
            Estimator::RTO_INIT,
            DEFAULT_USER_TIMEOUT,
            DEFAULT_MAX_SYNACK_RETRIES,
        ),
        simultaneous_open: None,
        buffer_sizes: Default::default(),
        smss: DEVICE_MAXIMUM_SEGMENT_SIZE,
        rcv_wnd_scale: WindowScale::default(),
        snd_wnd_scale: Some(WindowScale::default()),
    });
    // Step 3: another half RTT later the SYN-ACK reaches the active side,
    // which must reply with an ACK and become ESTABLISHED with a measured RTT
    // of exactly `RTT`.
    clock.sleep(RTT / 2);
    let (seg, passive_open) = active
        .on_segment_with_default_options::<_, ClientlessBufferProvider>(
            syn_ack,
            clock.now(),
            &counters,
        );
    let ack_seg = seg.expect("failed to generate a ack segment");
    assert_eq!(passive_open, None);
    assert_eq!(ack_seg, Segment::ack(ISS_1 + 1, ISS_2 + 1, UnscaledWindowSize::from(u16::MAX)));
    assert_matches!(active, State::Established(ref established) if established == &Established {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer: RingBuffer::default(),
            wl1: ISS_2,
            wl2: ISS_1 + 1,
            rtt_estimator: Estimator::Measured {
                srtt: RTT,
                rtt_var: RTT / 2,
            },
            last_seq_ts: None,
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
            wnd_scale: WindowScale::default(),
        },
        rcv: Recv {
            buffer: RingBuffer::default(),
            assembler: Assembler::new(ISS_2 + 1),
            timer: None,

            mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2 + 1, WindowSize::DEFAULT),
        }
    });
    // Step 4: the final ACK reaches the passive side, which must report the
    // passive open and become ESTABLISHED without sending anything back.
    clock.sleep(RTT / 2);
    assert_eq!(
        passive.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            ack_seg,
            clock.now(),
            &counters,
        ),
        (None, Some(())),
    );
    assert_matches!(passive, State::Established(ref established) if established == &Established {
        snd: Send {
            nxt: ISS_2 + 1,
            max: ISS_2 + 1,
            una: ISS_2 + 1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer: RingBuffer::default(),
            wl1: ISS_1 + 1,
            wl2: ISS_2 + 1,
            rtt_estimator: Estimator::Measured {
                srtt: RTT,
                rtt_var: RTT / 2,
            },
            last_seq_ts: None,
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
            wnd_scale: WindowScale::default(),
        },
        rcv: Recv {
            buffer: RingBuffer::default(),
            assembler: Assembler::new(ISS_1 + 1),
            timer: None,
            mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_1 + 1, WindowSize::DEFAULT),
        }
    })
}
| |
| // Drives two state machines that both start in SYN-SENT against each other: |
| // each side's SYN is fed to the other, then each side's SYN-ACK, and both |
| // sides are expected to end up in ESTABLISHED. |
| #[test] |
| fn simultaneous_open() { |
| let mut clock = FakeInstantCtx::default(); |
| let counters = TcpCountersInner::default(); |
| let start = clock.now(); |
| let (syn_sent1, syn1) = Closed::<Initial>::connect( |
| ISS_1, |
| clock.now(), |
| (), |
| Default::default(), |
| DEVICE_MAXIMUM_SEGMENT_SIZE, |
| DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| &SocketOptions::default(), |
| ); |
| let (syn_sent2, syn2) = Closed::<Initial>::connect( |
| ISS_2, |
| clock.now(), |
| (), |
| Default::default(), |
| DEVICE_MAXIMUM_SEGMENT_SIZE, |
| DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| &SocketOptions::default(), |
| ); |
| |
| // Both SYNs are expected to carry the device MSS and the default window |
| // scale. |
| assert_eq!( |
| syn1, |
| Segment::syn( |
| ISS_1, |
| UnscaledWindowSize::from(u16::MAX), |
| Options { |
| mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE), |
| window_scale: Some(WindowScale::default()) |
| } |
| ) |
| ); |
| assert_eq!( |
| syn2, |
| Segment::syn( |
| ISS_2, |
| UnscaledWindowSize::from(u16::MAX), |
| Options { |
| mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE), |
| window_scale: Some(WindowScale::default()) |
| } |
| ) |
| ); |
| |
| let mut state1 = State::SynSent(syn_sent1); |
| let mut state2 = State::SynSent(syn_sent2); |
| |
| // Cross the SYNs: each side receives the other's SYN while in SYN-SENT. |
| clock.sleep(RTT); |
| let (seg, passive_open) = state1 |
| .on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| syn2, |
| clock.now(), |
| &counters, |
| ); |
| let syn_ack1 = seg.expect("failed to generate syn ack"); |
| assert_eq!(passive_open, None); |
| let (seg, passive_open) = state2 |
| .on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| syn1, |
| clock.now(), |
| &counters, |
| ); |
| let syn_ack2 = seg.expect("failed to generate syn ack"); |
| assert_eq!(passive_open, None); |
| |
| // Each reply is a SYN-ACK that reuses the sender's own ISS and acks the |
| // peer's SYN (peer ISS + 1). |
| assert_eq!( |
| syn_ack1, |
| Segment::syn_ack( |
| ISS_1, |
| ISS_2 + 1, |
| UnscaledWindowSize::from(u16::MAX), |
| Options { |
| mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE), |
| window_scale: Some(WindowScale::default()) |
| } |
| ) |
| ); |
| assert_eq!( |
| syn_ack2, |
| Segment::syn_ack( |
| ISS_2, |
| ISS_1 + 1, |
| UnscaledWindowSize::from(u16::MAX), |
| Options { |
| mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE), |
| window_scale: Some(WindowScale::default()) |
| } |
| ) |
| ); |
| |
| // Both sides must now be in SYN-RCVD with `simultaneous_open` set. The |
| // expected retransmission timer is built with the user timeout reduced by |
| // the time elapsed since `start`. |
| let elapsed = clock.now() - start; |
| assert_matches!(state1, State::SynRcvd(ref syn_rcvd) if syn_rcvd == &SynRcvd { |
| iss: ISS_1, |
| irs: ISS_2, |
| timestamp: Some(clock.now()), |
| retrans_timer: RetransTimer::new( |
| clock.now(), |
| Estimator::RTO_INIT, |
| DEFAULT_USER_TIMEOUT - elapsed, |
| DEFAULT_MAX_SYNACK_RETRIES, |
| ), |
| simultaneous_open: Some(()), |
| buffer_sizes: BufferSizes::default(), |
| smss: DEVICE_MAXIMUM_SEGMENT_SIZE, |
| rcv_wnd_scale: WindowScale::default(), |
| snd_wnd_scale: Some(WindowScale::default()), |
| }); |
| assert_matches!(state2, State::SynRcvd(ref syn_rcvd) if syn_rcvd == &SynRcvd { |
| iss: ISS_2, |
| irs: ISS_1, |
| timestamp: Some(clock.now()), |
| retrans_timer: RetransTimer::new( |
| clock.now(), |
| Estimator::RTO_INIT, |
| DEFAULT_USER_TIMEOUT - elapsed, |
| DEFAULT_MAX_SYNACK_RETRIES, |
| ), |
| simultaneous_open: Some(()), |
| buffer_sizes: BufferSizes::default(), |
| smss: DEVICE_MAXIMUM_SEGMENT_SIZE, |
| rcv_wnd_scale: WindowScale::default(), |
| snd_wnd_scale: Some(WindowScale::default()), |
| }); |
| |
| // Cross the SYN-ACKs; each side is expected to answer with a plain ACK. |
| clock.sleep(RTT); |
| assert_eq!( |
| state1.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| syn_ack2, |
| clock.now(), |
| &counters, |
| ), |
| (Some(Segment::ack(ISS_1 + 1, ISS_2 + 1, UnscaledWindowSize::from(u16::MAX))), None) |
| ); |
| assert_eq!( |
| state2.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| syn_ack1, |
| clock.now(), |
| &counters, |
| ), |
| (Some(Segment::ack(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX))), None) |
| ); |
| |
| // Both sides must be ESTABLISHED, with mirrored sequence numbers and an |
| // RTT estimator of `Measured { srtt: RTT, rtt_var: RTT / 2 }`. |
| assert_matches!(state1, State::Established(established) if established == Established { |
| snd: Send { |
| nxt: ISS_1 + 1, |
| max: ISS_1 + 1, |
| una: ISS_1 + 1, |
| wnd: WindowSize::DEFAULT, |
| wnd_max: WindowSize::DEFAULT, |
| buffer: RingBuffer::default(), |
| wl1: ISS_2 + 1, |
| wl2: ISS_1 + 1, |
| rtt_estimator: Estimator::Measured { |
| srtt: RTT, |
| rtt_var: RTT / 2, |
| }, |
| last_seq_ts: None, |
| timer: None, |
| congestion_control: CongestionControl::cubic_with_mss(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE), |
| wnd_scale: WindowScale::default(), |
| }, |
| rcv: Recv { |
| buffer: RingBuffer::default(), |
| assembler: Assembler::new(ISS_2 + 1), |
| timer: None, |
| mss: DEVICE_MAXIMUM_SEGMENT_SIZE, |
| wnd_scale: WindowScale::default(), |
| last_window_update: (ISS_2 + 1, WindowSize::DEFAULT) |
| } |
| }); |
| |
| assert_matches!(state2, State::Established(established) if established == Established { |
| snd: Send { |
| nxt: ISS_2 + 1, |
| max: ISS_2 + 1, |
| una: ISS_2 + 1, |
| wnd: WindowSize::DEFAULT, |
| wnd_max: WindowSize::DEFAULT, |
| buffer: RingBuffer::default(), |
| wl1: ISS_1 + 1, |
| wl2: ISS_2 + 1, |
| rtt_estimator: Estimator::Measured { |
| srtt: RTT, |
| rtt_var: RTT / 2, |
| }, |
| last_seq_ts: None, |
| timer: None, |
| congestion_control: CongestionControl::cubic_with_mss(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE), |
| wnd_scale: WindowScale::default(), |
| }, |
| rcv: Recv { |
| buffer: RingBuffer::default(), |
| assembler: Assembler::new(ISS_1 + 1), |
| timer: None, |
| mss: DEVICE_MAXIMUM_SEGMENT_SIZE, |
| wnd_scale: WindowScale::default(), |
| last_window_update: (ISS_1 + 1, WindowSize::DEFAULT) |
| } |
| }); |
| } |
| |
| // Capacity, in bytes, passed to `RingBuffer::new` by the tests below. |
| const BUFFER_SIZE: usize = 16; |
| // Five-byte payload used as segment data and send-buffer content by the |
| // tests below. |
| const TEST_BYTES: &[u8] = "Hello".as_bytes(); |
| |
| // Feeds an in-order segment, an out-of-order segment and a hole-filling |
| // segment to an ESTABLISHED state, checking the ACK produced for each and the |
| // bytes made readable through `read_with`. |
| #[test] |
| fn established_receive() { |
| let clock = FakeInstantCtx::default(); |
| let counters = TcpCountersInner::default(); |
| let mut established = State::Established(Established { |
| snd: Send { |
| nxt: ISS_1 + 1, |
| max: ISS_1 + 1, |
| una: ISS_1 + 1, |
| wnd: WindowSize::ZERO, |
| wnd_max: WindowSize::ZERO, |
| buffer: NullBuffer, |
| wl1: ISS_2 + 1, |
| wl2: ISS_1 + 1, |
| rtt_estimator: Estimator::default(), |
| last_seq_ts: None, |
| timer: None, |
| congestion_control: CongestionControl::cubic_with_mss(Mss( |
| NonZeroU16::new(5).unwrap() |
| )), |
| wnd_scale: WindowScale::default(), |
| }, |
| rcv: Recv { |
| buffer: RingBuffer::new(BUFFER_SIZE), |
| assembler: Assembler::new(ISS_2 + 1), |
| timer: None, |
| mss: Mss(NonZeroU16::new(5).unwrap()), |
| wnd_scale: WindowScale::default(), |
| last_window_update: (ISS_2 + 1, WindowSize::new(BUFFER_SIZE).unwrap()), |
| }, |
| }); |
| |
| // Received an expected segment at rcv.nxt. |
| // The ACK is expected to advertise BUFFER_SIZE minus the bytes just |
| // received, since nothing has been read out yet. |
| assert_eq!( |
| established.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::data(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(0), TEST_BYTES,), |
| clock.now(), |
| &counters, |
| ), |
| ( |
| Some(Segment::ack( |
| ISS_1 + 1, |
| ISS_2 + 1 + TEST_BYTES.len(), |
| UnscaledWindowSize::from((BUFFER_SIZE - TEST_BYTES.len()) as u16), |
| )), |
| None |
| ), |
| ); |
| // Consume everything that is readable (the closure returns the number of |
| // bytes it reports as read). |
| assert_eq!( |
| established.read_with(|available| { |
| assert_eq!(available, &[TEST_BYTES]); |
| available[0].len() |
| }), |
| TEST_BYTES.len() |
| ); |
| |
| // Receive an out-of-order segment. |
| // It starts one payload length past rcv.nxt, leaving a hole; the ACK is |
| // expected to repeat the old ack number and advertise the full buffer. |
| assert_eq!( |
| established.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::data( |
| ISS_2 + 1 + TEST_BYTES.len() * 2, |
| ISS_1 + 1, |
| UnscaledWindowSize::from(0), |
| TEST_BYTES, |
| ), |
| clock.now(), |
| &counters |
| ), |
| ( |
| Some(Segment::ack( |
| ISS_1 + 1, |
| ISS_2 + 1 + TEST_BYTES.len(), |
| UnscaledWindowSize::from(u16::try_from(BUFFER_SIZE).unwrap()), |
| )), |
| None |
| ), |
| ); |
| // Nothing is readable while the hole is open. |
| assert_eq!( |
| established.read_with(|available| { |
| assert_eq!(available, &[&[][..]]); |
| 0 |
| }), |
| 0 |
| ); |
| |
| // Receive the next segment that fills the hole. |
| // The ACK is expected to jump past both payloads at once. |
| assert_eq!( |
| established.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::data( |
| ISS_2 + 1 + TEST_BYTES.len(), |
| ISS_1 + 1, |
| UnscaledWindowSize::from(0), |
| TEST_BYTES, |
| ), |
| clock.now(), |
| &counters |
| ), |
| ( |
| Some(Segment::ack( |
| ISS_1 + 1, |
| ISS_2 + 1 + 3 * TEST_BYTES.len(), |
| UnscaledWindowSize::from_usize(BUFFER_SIZE - 2 * TEST_BYTES.len()), |
| )), |
| None |
| ), |
| ); |
| // Both payloads are readable back to back; 10 is 2 * TEST_BYTES.len(). |
| assert_eq!( |
| established.read_with(|available| { |
| assert_eq!(available, &[[TEST_BYTES, TEST_BYTES].concat()]); |
| available[0].len() |
| }), |
| 10 |
| ); |
| } |
| |
| // Checks what `poll_send` emits from an ESTABLISHED state as the peer's |
| // advertised window is opened step by step, until the send buffer is |
| // drained. |
| #[test] |
| fn established_send() { |
| let clock = FakeInstantCtx::default(); |
| let counters = TcpCountersInner::default(); |
| let mut send_buffer = RingBuffer::new(BUFFER_SIZE); |
| assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5); |
| let mut established = State::Established(Established { |
| snd: Send { |
| nxt: ISS_1 + 1, |
| max: ISS_1 + 1, |
| una: ISS_1, |
| wnd: WindowSize::ZERO, |
| wnd_max: WindowSize::ZERO, |
| buffer: send_buffer, |
| wl1: ISS_2, |
| wl2: ISS_1, |
| last_seq_ts: None, |
| rtt_estimator: Estimator::default(), |
| timer: None, |
| congestion_control: CongestionControl::cubic_with_mss( |
| DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| ), |
| wnd_scale: WindowScale::default(), |
| }, |
| rcv: Recv { |
| buffer: RingBuffer::new(BUFFER_SIZE), |
| assembler: Assembler::new(ISS_2 + 1), |
| timer: None, |
| mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| wnd_scale: WindowScale::default(), |
| last_window_update: (ISS_2 + 1, WindowSize::new(BUFFER_SIZE).unwrap()), |
| }, |
| }); |
| // Data queued but the window is not opened, nothing to send. |
| assert_eq!( |
| established.poll_send_with_default_options(u32::MAX, clock.now(), &counters), |
| None |
| ); |
| // Helper: feeds a pure ACK with ack number `ack` and advertised window |
| // `win` to the state, asserting that no reply segment is produced. |
| let open_window = |established: &mut State<FakeInstant, RingBuffer, RingBuffer, ()>, |
| ack: SeqNum, |
| win: usize, |
| now: FakeInstant, |
| counters: &TcpCountersInner| { |
| assert_eq!( |
| established.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::ack(ISS_2 + 1, ack, UnscaledWindowSize::from_usize(win)), |
| now, |
| counters |
| ), |
| (None, None), |
| ); |
| }; |
| // Open up the window by 1 byte. |
| open_window(&mut established, ISS_1 + 1, 1, clock.now(), &counters); |
| assert_eq!( |
| established.poll_send_with_default_options(u32::MAX, clock.now(), &counters), |
| Some(Segment::data( |
| ISS_1 + 1, |
| ISS_2 + 1, |
| UnscaledWindowSize::from_usize(BUFFER_SIZE), |
| SendPayload::Contiguous(&TEST_BYTES[1..2]), |
| )) |
| ); |
| |
| // Open up the window by 10 bytes, but the MSS is limited to 2 bytes. |
| open_window(&mut established, ISS_1 + 2, 10, clock.now(), &counters); |
| assert_eq!( |
| established.poll_send_with_default_options(2, clock.now(), &counters), |
| Some(Segment::data( |
| ISS_1 + 2, |
| ISS_2 + 1, |
| UnscaledWindowSize::from_usize(BUFFER_SIZE), |
| SendPayload::Contiguous(&TEST_BYTES[2..4]), |
| )) |
| ); |
| |
| // Poll again with a limit of 1 and Nagle explicitly disabled in the socket |
| // options; the last remaining byte is expected. |
| assert_eq!( |
| established.poll_send( |
| &counters, |
| 1, |
| clock.now(), |
| &SocketOptions { nagle_enabled: false, ..SocketOptions::default() } |
| ), |
| Some(Segment::data( |
| ISS_1 + 4, |
| ISS_2 + 1, |
| UnscaledWindowSize::from_usize(BUFFER_SIZE), |
| SendPayload::Contiguous(&TEST_BYTES[4..5]), |
| )) |
| ); |
| |
| // We've exhausted our send buffer. |
| assert_eq!(established.poll_send_with_default_options(1, clock.now(), &counters), None); |
| } |
| |
| // Connects a state machine to itself (every segment it emits is fed back |
| // into it) and checks the retransmission timer in SYN-SENT, in SYN-RCVD and |
| // while data is outstanding, including exponential backoff and the |
| // `retransmits` / `timeouts` counters. |
| #[test] |
| fn self_connect_retransmission() { |
| let mut clock = FakeInstantCtx::default(); |
| let counters = TcpCountersInner::default(); |
| let (syn_sent, syn) = Closed::<Initial>::connect( |
| ISS_1, |
| clock.now(), |
| (), |
| Default::default(), |
| DEVICE_MAXIMUM_SEGMENT_SIZE, |
| DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| &SocketOptions::default(), |
| ); |
| let mut state = State::<_, RingBuffer, RingBuffer, ()>::SynSent(syn_sent); |
| // Retransmission timer should be installed. |
| assert_eq!(state.poll_send_at(), Some(FakeInstant::from(Estimator::RTO_INIT))); |
| clock.sleep(Estimator::RTO_INIT); |
| // The SYN segment should be retransmitted. |
| assert_eq!( |
| state.poll_send_with_default_options(u32::MAX, clock.now(), &counters), |
| Some(syn.into()) |
| ); |
| |
| // Bring the state to SYNRCVD. |
| let (seg, passive_open) = state |
| .on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| syn, |
| clock.now(), |
| &counters, |
| ); |
| let syn_ack = seg.expect("expected SYN-ACK"); |
| assert_eq!(passive_open, None); |
| // Retransmission timer should be installed. |
| assert_eq!(state.poll_send_at(), Some(clock.now() + Estimator::RTO_INIT)); |
| clock.sleep(Estimator::RTO_INIT); |
| // The SYN-ACK segment should be retransmitted. |
| assert_eq!( |
| state.poll_send_with_default_options(u32::MAX, clock.now(), &counters), |
| Some(syn_ack.into()) |
| ); |
| |
| // Bring the state to ESTABLISHED and write some data. |
| assert_eq!( |
| state.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| syn_ack, |
| clock.now(), |
| &counters, |
| ), |
| (Some(Segment::ack(ISS_1 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX))), None) |
| ); |
| // The match lists every variant instead of using a wildcard. Only the two |
| // arms that bind `snd` enqueue the payload; every other state fails the |
| // test. |
| match state { |
| State::Closed(_) |
| | State::Listen(_) |
| | State::SynRcvd(_) |
| | State::SynSent(_) |
| | State::LastAck(_) |
| | State::FinWait1(_) |
| | State::FinWait2(_) |
| | State::Closing(_) |
| | State::TimeWait(_) => { |
| panic!("expected that we have entered established state, but got {:?}", state) |
| } |
| State::Established(Established { ref mut snd, rcv: _ }) |
| | State::CloseWait(CloseWait { ref mut snd, last_ack: _, last_wnd: _ }) => { |
| assert_eq!(snd.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len()); |
| } |
| } |
| // We have no outstanding segments, so there is no retransmission timer. |
| assert_eq!(state.poll_send_at(), None); |
| // The retransmission timer should backoff exponentially. |
| // Iteration 0 is the first transmission; after the poll in iteration `i` |
| // both counters are expected to equal `i`. |
| for i in 0..3 { |
| assert_eq!( |
| state.poll_send_with_default_options(u32::MAX, clock.now(), &counters), |
| Some(Segment::data( |
| ISS_1 + 1, |
| ISS_1 + 1, |
| UnscaledWindowSize::from(u16::MAX), |
| SendPayload::Contiguous(TEST_BYTES), |
| )) |
| ); |
| assert_eq!(state.poll_send_at(), Some(clock.now() + (1 << i) * Estimator::RTO_INIT)); |
| clock.sleep((1 << i) * Estimator::RTO_INIT); |
| assert_eq!(counters.retransmits.get(), i); |
| assert_eq!(counters.timeouts.get(), i); |
| } |
| // The receiver acks the first byte of the payload. |
| assert_eq!( |
| state.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::ack( |
| ISS_1 + 1 + TEST_BYTES.len(), |
| ISS_1 + 1 + 1, |
| UnscaledWindowSize::from(u16::MAX) |
| ), |
| clock.now(), |
| &counters, |
| ), |
| (None, None), |
| ); |
| // The timer is rearmed, and the current RTO after 2 retransmissions |
| // should be 4s (1s, 2s, 4s). |
| assert_eq!(state.poll_send_at(), Some(clock.now() + 4 * Estimator::RTO_INIT)); |
| clock.sleep(4 * Estimator::RTO_INIT); |
| assert_eq!( |
| state.poll_send_with_default_options(1, clock.now(), &counters,), |
| Some(Segment::data( |
| ISS_1 + 1 + 1, |
| ISS_1 + 1, |
| UnscaledWindowSize::from(u16::MAX), |
| SendPayload::Contiguous(&TEST_BYTES[1..2]), |
| )) |
| ); |
| // Currently, snd.nxt = ISS_1 + 2, snd.max = ISS_1 + 5, a segment |
| // with ack number ISS_1 + 4 should bump snd.nxt immediately. |
| assert_eq!( |
| state.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::ack( |
| ISS_1 + 1 + TEST_BYTES.len(), |
| ISS_1 + 1 + 3, |
| UnscaledWindowSize::from(u16::MAX) |
| ), |
| clock.now(), |
| &counters, |
| ), |
| (None, None) |
| ); |
| // Since we retransmitted once more, the RTO is now 8s. |
| assert_eq!(counters.retransmits.get(), 3); |
| assert_eq!(counters.timeouts.get(), 3); |
| assert_eq!(state.poll_send_at(), Some(clock.now() + 8 * Estimator::RTO_INIT)); |
| // The next poll is expected to resume at the newly acked position |
| // (payload byte 3) rather than at the byte after the last retransmission. |
| assert_eq!( |
| state.poll_send_with_default_options(1, clock.now(), &counters), |
| Some(Segment::data( |
| ISS_1 + 1 + 3, |
| ISS_1 + 1, |
| UnscaledWindowSize::from(u16::MAX), |
| SendPayload::Contiguous(&TEST_BYTES[3..4]), |
| )) |
| ); |
| // Finally the receiver ACKs all the outstanding data. |
| assert_eq!( |
| state.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::ack( |
| ISS_1 + 1 + TEST_BYTES.len(), |
| ISS_1 + 1 + TEST_BYTES.len(), |
| UnscaledWindowSize::from(u16::MAX) |
| ), |
| clock.now(), |
| &counters |
| ), |
| (None, None) |
| ); |
| // The retransmission timer should be removed. |
| assert_eq!(state.poll_send_at(), None); |
| } |
| |
| // The peer closes first: an incoming FIN followed by a local `close` must |
| // leave the state in LAST-ACK, the queued data and our FIN must be sent (and |
| // the FIN retransmitted when only the data is acked), and the final ACK must |
| // close the connection. |
| #[test] |
| fn passive_close() { |
| let mut clock = FakeInstantCtx::default(); |
| let counters = TcpCountersInner::default(); |
| let mut send_buffer = RingBuffer::new(BUFFER_SIZE); |
| assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5); |
| // Set up the state machine to start with Established. |
| let mut state = State::Established(Established { |
| snd: Send { |
| nxt: ISS_1 + 1, |
| max: ISS_1 + 1, |
| una: ISS_1 + 1, |
| wnd: WindowSize::DEFAULT, |
| wnd_max: WindowSize::DEFAULT, |
| buffer: send_buffer.clone(), |
| wl1: ISS_2, |
| wl2: ISS_1, |
| last_seq_ts: None, |
| rtt_estimator: Estimator::default(), |
| timer: None, |
| congestion_control: CongestionControl::cubic_with_mss( |
| DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| ), |
| wnd_scale: WindowScale::default(), |
| }, |
| rcv: Recv { |
| buffer: RingBuffer::new(BUFFER_SIZE), |
| assembler: Assembler::new(ISS_2 + 1), |
| timer: None, |
| mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| wnd_scale: WindowScale::default(), |
| last_window_update: (ISS_2 + 1, WindowSize::new(BUFFER_SIZE).unwrap()), |
| }, |
| }); |
| // Same value as the window advertised in the ACK of the peer's FIN below; |
| // it is expected to be stored in the LAST-ACK state and reused in every |
| // later outgoing segment. |
| let last_wnd = WindowSize::new(BUFFER_SIZE - 1).unwrap(); |
| // Transition the state machine to CloseWait by sending a FIN. |
| assert_eq!( |
| state.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::fin(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX)), |
| clock.now(), |
| &counters, |
| ), |
| ( |
| Some(Segment::ack( |
| ISS_1 + 1, |
| ISS_2 + 2, |
| UnscaledWindowSize::from_usize(BUFFER_SIZE - 1) |
| )), |
| None |
| ) |
| ); |
| // Then call CLOSE to transition the state machine to LastAck. |
| assert_eq!( |
| state.close(&counters, CloseReason::Shutdown, &SocketOptions::default()), |
| Ok(()) |
| ); |
| // The send half is expected to be carried over unchanged (the queued data |
| // is still unsent); only `last_ack` and `last_wnd` remain of the receive |
| // half. |
| assert_eq!( |
| state, |
| State::LastAck(LastAck { |
| snd: Send { |
| nxt: ISS_1 + 1, |
| max: ISS_1 + 1, |
| una: ISS_1 + 1, |
| wnd: WindowSize::DEFAULT, |
| wnd_max: WindowSize::DEFAULT, |
| buffer: send_buffer, |
| wl1: ISS_2, |
| wl2: ISS_1, |
| last_seq_ts: None, |
| rtt_estimator: Estimator::default(), |
| timer: None, |
| congestion_control: CongestionControl::cubic_with_mss( |
| DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE |
| ), |
| wnd_scale: WindowScale::default(), |
| }, |
| last_ack: ISS_2 + 2, |
| last_wnd, |
| }) |
| ); |
| // When the send window is not big enough, there should be no FIN. |
| assert_eq!( |
| state.poll_send_with_default_options(2, clock.now(), &counters), |
| Some(Segment::data( |
| ISS_1 + 1, |
| ISS_2 + 2, |
| last_wnd >> WindowScale::default(), |
| SendPayload::Contiguous(&TEST_BYTES[..2]), |
| )) |
| ); |
| // We should be able to send out all remaining bytes together with a FIN. |
| assert_eq!( |
| state.poll_send_with_default_options(u32::MAX, clock.now(), &counters), |
| Some(Segment::piggybacked_fin( |
| ISS_1 + 3, |
| ISS_2 + 2, |
| last_wnd >> WindowScale::default(), |
| SendPayload::Contiguous(&TEST_BYTES[2..]), |
| )) |
| ); |
| // Now let's test we retransmit correctly by only acking the data. |
| clock.sleep(RTT); |
| assert_eq!( |
| state.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::ack( |
| ISS_2 + 2, |
| ISS_1 + 1 + TEST_BYTES.len(), |
| UnscaledWindowSize::from(u16::MAX) |
| ), |
| clock.now(), |
| &counters, |
| ), |
| (None, None) |
| ); |
| assert_eq!(state.poll_send_at(), Some(clock.now() + Estimator::RTO_INIT)); |
| clock.sleep(Estimator::RTO_INIT); |
| // The FIN should be retransmitted. |
| assert_eq!( |
| state.poll_send_with_default_options(u32::MAX, clock.now(), &counters), |
| Some( |
| Segment::fin( |
| ISS_1 + 1 + TEST_BYTES.len(), |
| ISS_2 + 2, |
| last_wnd >> WindowScale::default() |
| ) |
| .into() |
| ) |
| ); |
| |
| // Finally, our FIN is acked. |
| assert_eq!( |
| state.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::ack( |
| ISS_2 + 2, |
| ISS_1 + 1 + TEST_BYTES.len() + 1, |
| UnscaledWindowSize::from(u16::MAX), |
| ), |
| clock.now(), |
| &counters, |
| ), |
| (None, None) |
| ); |
| // The connection is closed. |
| assert_eq!(state, State::Closed(Closed { reason: None })); |
| assert_eq!(counters.established_closed.get(), 1); |
| assert_eq!(counters.established_timedout.get(), 0); |
| assert_eq!(counters.established_resets.get(), 0); |
| } |
| |
| // Calling `close` while in SYN-RCVD must move the state to FIN-WAIT-1, and |
| // the next `poll_send` must emit a FIN. |
| #[test] |
| fn syn_rcvd_active_close() { |
| let counters = TcpCountersInner::default(); |
| let mut state: State<_, RingBuffer, NullBuffer, ()> = State::SynRcvd(SynRcvd { |
| iss: ISS_1, |
| irs: ISS_2, |
| timestamp: None, |
| retrans_timer: RetransTimer { |
| at: FakeInstant::default(), |
| rto: Duration::new(0, 0), |
| user_timeout_until: FakeInstant::from(DEFAULT_USER_TIMEOUT), |
| remaining_retries: Some(DEFAULT_MAX_RETRIES), |
| }, |
| simultaneous_open: Some(()), |
| buffer_sizes: Default::default(), |
| smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| rcv_wnd_scale: WindowScale::default(), |
| snd_wnd_scale: Some(WindowScale::default()), |
| }); |
| assert_eq!( |
| state.close(&counters, CloseReason::Shutdown, &SocketOptions::default()), |
| Ok(()) |
| ); |
| assert_matches!(state, State::FinWait1(_)); |
| // The FIN is expected at ISS_1 + 1 (right after our SYN) and to ack the |
| // peer's SYN with ISS_2 + 1. |
| assert_eq!( |
| state.poll_send_with_default_options(u32::MAX, FakeInstant::default(), &counters), |
| Some(Segment::fin(ISS_1 + 1, ISS_2 + 1, UnscaledWindowSize::from(u16::MAX)).into()) |
| ); |
| } |
| |
| // The local side closes first. Walks ESTABLISHED -> FIN-WAIT-1 -> |
| // FIN-WAIT-2 -> TIME-WAIT -> CLOSED, checking that data can still be |
| // received in both FIN-WAIT states, that a partially acked data + FIN |
| // segment is retransmitted, and that TIME-WAIT expires after 2 * MSL. |
| #[test] |
| fn established_active_close() { |
| let mut clock = FakeInstantCtx::default(); |
| let counters = TcpCountersInner::default(); |
| let mut send_buffer = RingBuffer::new(BUFFER_SIZE); |
| assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5); |
| // Set up the state machine to start with Established. |
| let mut state = State::Established(Established { |
| snd: Send { |
| nxt: ISS_1 + 1, |
| max: ISS_1 + 1, |
| una: ISS_1 + 1, |
| wnd: WindowSize::DEFAULT, |
| wnd_max: WindowSize::DEFAULT, |
| buffer: send_buffer.clone(), |
| wl1: ISS_2, |
| wl2: ISS_1, |
| last_seq_ts: None, |
| rtt_estimator: Estimator::default(), |
| timer: None, |
| congestion_control: CongestionControl::cubic_with_mss(Mss( |
| NonZeroU16::new(5).unwrap() |
| )), |
| wnd_scale: WindowScale::default(), |
| }, |
| rcv: Recv { |
| buffer: RingBuffer::new(BUFFER_SIZE), |
| assembler: Assembler::new(ISS_2 + 1), |
| timer: None, |
| mss: Mss(NonZeroU16::new(5).unwrap()), |
| wnd_scale: WindowScale::default(), |
| last_window_update: (ISS_2 + 1, WindowSize::new(BUFFER_SIZE).unwrap()), |
| }, |
| }); |
| assert_eq!( |
| state.close(&counters, CloseReason::Shutdown, &SocketOptions::default()), |
| Ok(()) |
| ); |
| assert_matches!(state, State::FinWait1(_)); |
| // A second close while already closing must be rejected. |
| assert_eq!( |
| state.close(&counters, CloseReason::Shutdown, &SocketOptions::default()), |
| Err(CloseError::Closing) |
| ); |
| |
| // Poll for 2 bytes. |
| assert_eq!( |
| state.poll_send_with_default_options(2, clock.now(), &counters), |
| Some(Segment::data( |
| ISS_1 + 1, |
| ISS_2 + 1, |
| UnscaledWindowSize::from_usize(BUFFER_SIZE), |
| SendPayload::Contiguous(&TEST_BYTES[..2]) |
| )) |
| ); |
| |
| // And we should send the rest of the buffer together with the FIN. |
| assert_eq!( |
| state.poll_send_with_default_options(u32::MAX, clock.now(), &counters), |
| Some(Segment::piggybacked_fin( |
| ISS_1 + 3, |
| ISS_2 + 1, |
| UnscaledWindowSize::from_usize(BUFFER_SIZE), |
| SendPayload::Contiguous(&TEST_BYTES[2..]) |
| )) |
| ); |
| |
| // Test that the recv state works in FIN_WAIT_1. |
| // The incoming segment acks only the first byte we sent (ISS_1 + 1 + 1). |
| assert_eq!( |
| state.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::data( |
| ISS_2 + 1, |
| ISS_1 + 1 + 1, |
| UnscaledWindowSize::from(u16::MAX), |
| TEST_BYTES |
| ), |
| clock.now(), |
| &counters, |
| ), |
| ( |
| Some(Segment::ack( |
| ISS_1 + TEST_BYTES.len() + 2, |
| ISS_2 + TEST_BYTES.len() + 1, |
| UnscaledWindowSize::from_usize(BUFFER_SIZE - TEST_BYTES.len()), |
| )), |
| None |
| ) |
| ); |
| |
| // Drain the received payload so the receive buffer is empty again. |
| assert_eq!( |
| state.read_with(|avail| { |
| let got = avail.concat(); |
| assert_eq!(got, TEST_BYTES); |
| got.len() |
| }), |
| TEST_BYTES.len() |
| ); |
| |
| // The retrans timer should be installed correctly. |
| assert_eq!(state.poll_send_at(), Some(clock.now() + Estimator::RTO_INIT)); |
| |
| // Because only the first byte was acked, we need to retransmit. |
| clock.sleep(Estimator::RTO_INIT); |
| assert_eq!( |
| state.poll_send_with_default_options(u32::MAX, clock.now(), &counters), |
| Some(Segment::piggybacked_fin( |
| ISS_1 + 2, |
| ISS_2 + TEST_BYTES.len() + 1, |
| UnscaledWindowSize::from_usize(BUFFER_SIZE), |
| SendPayload::Contiguous(&TEST_BYTES[1..]), |
| )) |
| ); |
| |
| // Now our FIN is acked, we should transition to FinWait2. |
| assert_eq!( |
| state.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::ack( |
| ISS_2 + TEST_BYTES.len() + 1, |
| ISS_1 + TEST_BYTES.len() + 2, |
| UnscaledWindowSize::from(u16::MAX) |
| ), |
| clock.now(), |
| &counters, |
| ), |
| (None, None) |
| ); |
| assert_matches!(state, State::FinWait2(_)); |
| |
| // Test that the recv state works in FIN_WAIT_2. |
| assert_eq!( |
| state.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::data( |
| ISS_2 + 1 + TEST_BYTES.len(), |
| ISS_1 + TEST_BYTES.len() + 2, |
| UnscaledWindowSize::from(u16::MAX), |
| TEST_BYTES |
| ), |
| clock.now(), |
| &counters, |
| ), |
| ( |
| Some(Segment::ack( |
| ISS_1 + TEST_BYTES.len() + 2, |
| ISS_2 + 2 * TEST_BYTES.len() + 1, |
| UnscaledWindowSize::from_usize(BUFFER_SIZE - TEST_BYTES.len()), |
| )), |
| None |
| ) |
| ); |
| |
| // Drain the second payload as well. |
| assert_eq!( |
| state.read_with(|avail| { |
| let got = avail.concat(); |
| assert_eq!(got, TEST_BYTES); |
| got.len() |
| }), |
| TEST_BYTES.len() |
| ); |
| |
| // Should ack the FIN and transition to TIME_WAIT. |
| assert_eq!( |
| state.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::fin( |
| ISS_2 + 2 * TEST_BYTES.len() + 1, |
| ISS_1 + TEST_BYTES.len() + 2, |
| UnscaledWindowSize::from(u16::MAX) |
| ), |
| clock.now(), |
| &counters, |
| ), |
| ( |
| Some(Segment::ack( |
| ISS_1 + TEST_BYTES.len() + 2, |
| ISS_2 + 2 * TEST_BYTES.len() + 2, |
| UnscaledWindowSize::from_usize(BUFFER_SIZE - 1), |
| )), |
| None |
| ) |
| ); |
| |
| assert_matches!(state, State::TimeWait(_)); |
| |
| // Step used to stop just short of the TIME-WAIT expiry and then cross it. |
| const SMALLEST_DURATION: Duration = Duration::from_secs(1); |
| assert_eq!(state.poll_send_at(), Some(clock.now() + MSL * 2)); |
| clock.sleep(MSL * 2 - SMALLEST_DURATION); |
| // The state should still be in time wait before the time out. |
| assert_eq!(state.poll_send_with_default_options(u32::MAX, clock.now(), &counters), None); |
| assert_matches!(state, State::TimeWait(_)); |
| clock.sleep(SMALLEST_DURATION); |
| // The state should become closed. |
| assert_eq!(state.poll_send_with_default_options(u32::MAX, clock.now(), &counters), None); |
| assert_eq!(state, State::Closed(Closed { reason: None })); |
| assert_eq!(counters.established_closed.get(), 1); |
| assert_eq!(counters.established_timedout.get(), 0); |
| assert_eq!(counters.established_resets.get(), 0); |
| } |
| |
| #[test] |
| fn fin_wait_1_fin_ack_to_time_wait() { |
| let counters = TcpCountersInner::default(); |
| // Test that we can transition from FIN-WAIT-1 to TIME-WAIT directly |
| // with one FIN-ACK segment. |
| // The state below has our FIN outstanding: snd.una is ISS_1 + 1 while |
| // snd.nxt is ISS_1 + 2. |
| let mut state = State::FinWait1(FinWait1 { |
| snd: Send { |
| nxt: ISS_1 + 2, |
| max: ISS_1 + 2, |
| una: ISS_1 + 1, |
| wnd: WindowSize::DEFAULT, |
| wnd_max: WindowSize::DEFAULT, |
| buffer: NullBuffer, |
| wl1: ISS_2, |
| wl2: ISS_1, |
| last_seq_ts: None, |
| rtt_estimator: Estimator::default(), |
| timer: None, |
| congestion_control: CongestionControl::cubic_with_mss( |
| DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| ), |
| wnd_scale: WindowScale::default(), |
| }, |
| rcv: Recv { |
| buffer: RingBuffer::new(BUFFER_SIZE), |
| assembler: Assembler::new(ISS_2 + 1), |
| timer: None, |
| mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| wnd_scale: WindowScale::default(), |
| last_window_update: (ISS_2 + 1, WindowSize::new(BUFFER_SIZE).unwrap()), |
| }, |
| }); |
| // The peer's FIN carries ack number ISS_1 + 2, i.e. it also acks our FIN. |
| assert_eq!( |
| state.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::fin(ISS_2 + 1, ISS_1 + 2, UnscaledWindowSize::from(u16::MAX)), |
| FakeInstant::default(), |
| &counters, |
| ), |
| ( |
| Some(Segment::ack( |
| ISS_1 + 2, |
| ISS_2 + 2, |
| UnscaledWindowSize::from_usize(BUFFER_SIZE - 1) |
| )), |
| None |
| ), |
| ); |
| assert_matches!(state, State::TimeWait(_)); |
| } |
| |
| // Self-connected close: the FIN segment this state emits is fed back into |
| // it, then the ACK it produced for that FIN. Expected path is FIN-WAIT-1 -> |
| // CLOSING -> TIME-WAIT -> CLOSED. |
| #[test] |
| fn simultaneous_close() { |
| let mut clock = FakeInstantCtx::default(); |
| let counters = TcpCountersInner::default(); |
| let mut send_buffer = RingBuffer::new(BUFFER_SIZE); |
| assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5); |
| // Set up the state machine to start with Established. |
| // Both the send and the receive side are based on ISS_1, because the |
| // state talks to itself. |
| let mut state = State::Established(Established { |
| snd: Send { |
| nxt: ISS_1 + 1, |
| max: ISS_1 + 1, |
| una: ISS_1 + 1, |
| wnd: WindowSize::DEFAULT, |
| wnd_max: WindowSize::DEFAULT, |
| buffer: send_buffer.clone(), |
| wl1: ISS_2, |
| wl2: ISS_1, |
| last_seq_ts: None, |
| rtt_estimator: Estimator::default(), |
| timer: None, |
| congestion_control: CongestionControl::cubic_with_mss( |
| DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| ), |
| wnd_scale: WindowScale::default(), |
| }, |
| rcv: Recv { |
| buffer: RingBuffer::new(BUFFER_SIZE), |
| assembler: Assembler::new(ISS_1 + 1), |
| timer: None, |
| mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| wnd_scale: WindowScale::default(), |
| last_window_update: (ISS_1 + 1, WindowSize::new(BUFFER_SIZE).unwrap()), |
| }, |
| }); |
| assert_eq!( |
| state.close(&counters, CloseReason::Shutdown, &SocketOptions::default()), |
| Ok(()) |
| ); |
| assert_matches!(state, State::FinWait1(_)); |
| // A second close while already closing must be rejected. |
| assert_eq!( |
| state.close(&counters, CloseReason::Shutdown, &SocketOptions::default()), |
| Err(CloseError::Closing) |
| ); |
| |
| // The whole payload is expected to go out in one segment with the FIN. |
| let fin = state.poll_send_with_default_options(u32::MAX, clock.now(), &counters); |
| assert_eq!( |
| fin, |
| Some(Segment::piggybacked_fin( |
| ISS_1 + 1, |
| ISS_1 + 1, |
| UnscaledWindowSize::from_usize(BUFFER_SIZE), |
| SendPayload::Contiguous(TEST_BYTES), |
| )) |
| ); |
| // Feed an identical data + FIN segment back in, as if it came from the |
| // peer. |
| assert_eq!( |
| state.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::piggybacked_fin( |
| ISS_1 + 1, |
| ISS_1 + 1, |
| UnscaledWindowSize::from_usize(BUFFER_SIZE), |
| SendPayload::Contiguous(TEST_BYTES), |
| ), |
| clock.now(), |
| &counters, |
| ), |
| ( |
| Some(Segment::ack( |
| ISS_1 + TEST_BYTES.len() + 2, |
| ISS_1 + TEST_BYTES.len() + 2, |
| UnscaledWindowSize::from_usize(BUFFER_SIZE - TEST_BYTES.len() - 1), |
| )), |
| None |
| ) |
| ); |
| |
| // We have a self connection, so having fed back the FIN packet we |
| // generated (above) should have made us transition to CLOSING. |
| assert_matches!(state, State::Closing(_)); |
| assert_eq!( |
| state.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::ack( |
| ISS_1 + TEST_BYTES.len() + 2, |
| ISS_1 + TEST_BYTES.len() + 2, |
| UnscaledWindowSize::from_usize(BUFFER_SIZE - TEST_BYTES.len() - 1), |
| ), |
| clock.now(), |
| &counters, |
| ), |
| (None, None) |
| ); |
| |
| // And having fed back the ACK we produced for the FIN (above) should have |
| // made us transition to TIME-WAIT. |
| assert_matches!(state, State::TimeWait(_)); |
| |
| // Step used to stop just short of the TIME-WAIT expiry and then cross it. |
| const SMALLEST_DURATION: Duration = Duration::from_secs(1); |
| assert_eq!(state.poll_send_at(), Some(clock.now() + MSL * 2)); |
| clock.sleep(MSL * 2 - SMALLEST_DURATION); |
| // The state should still be in time wait before the time out. |
| assert_eq!(state.poll_send_with_default_options(u32::MAX, clock.now(), &counters), None); |
| assert_matches!(state, State::TimeWait(_)); |
| clock.sleep(SMALLEST_DURATION); |
| // The state should become closed. |
| assert_eq!(state.poll_send_with_default_options(u32::MAX, clock.now(), &counters), None); |
| assert_eq!(state, State::Closed(Closed { reason: None })); |
| assert_eq!(counters.established_closed.get(), 1); |
| assert_eq!(counters.established_timedout.get(), 0); |
| assert_eq!(counters.established_resets.get(), 0); |
| } |
| |
| // A FIN received while in TIME-WAIT must be answered with an ACK and must |
| // push the expiry back to 2 * MSL from the time the FIN was received. |
| #[test] |
| fn time_wait_restarts_timer() { |
| let mut clock = FakeInstantCtx::default(); |
| let counters = TcpCountersInner::default(); |
| let mut time_wait = State::<_, NullBuffer, NullBuffer, ()>::TimeWait(TimeWait { |
| last_seq: ISS_1 + 2, |
| last_ack: ISS_2 + 2, |
| last_wnd: WindowSize::DEFAULT, |
| last_wnd_scale: WindowScale::default(), |
| expiry: new_time_wait_expiry(clock.now()), |
| }); |
| |
| assert_eq!(time_wait.poll_send_at(), Some(clock.now() + MSL * 2)); |
| // Let one second pass so that a restarted timer is distinguishable from |
| // the original one. |
| clock.sleep(Duration::from_secs(1)); |
| assert_eq!( |
| time_wait.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::fin(ISS_2 + 2, ISS_1 + 2, UnscaledWindowSize::from(u16::MAX)), |
| clock.now(), |
| &counters, |
| ), |
| (Some(Segment::ack(ISS_1 + 2, ISS_2 + 2, UnscaledWindowSize::from(u16::MAX))), None), |
| ); |
| // The expiry is now measured from the advanced clock. |
| assert_eq!(time_wait.poll_send_at(), Some(clock.now() + MSL * 2)); |
| } |
| |
| #[test_case( |
| State::Established(Established { |
| snd: Send { |
| nxt: ISS_1, |
| max: ISS_1, |
| una: ISS_1, |
| wnd: WindowSize::DEFAULT, |
| wnd_max: WindowSize::DEFAULT, |
| buffer: NullBuffer, |
| wl1: ISS_2, |
| wl2: ISS_1, |
| rtt_estimator: Estimator::Measured { srtt: RTT, rtt_var: RTT / 2 }, |
| last_seq_ts: None, |
| timer: None, |
| congestion_control: CongestionControl::cubic_with_mss(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE), |
| wnd_scale: WindowScale::default(), |
| }, |
| rcv: Recv { |
| buffer: RingBuffer::default(), |
| assembler: Assembler::new(ISS_2 + 5), |
| timer: None, |
| mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| wnd_scale: WindowScale::default(), |
| last_window_update: (ISS_2 + 5, WindowSize::DEFAULT), |
| }, |
| }), |
| Segment::data(ISS_2, ISS_1, UnscaledWindowSize::from(u16::MAX), TEST_BYTES) => |
| Some(Segment::ack(ISS_1, ISS_2 + 5, UnscaledWindowSize::from(u16::MAX))); "retransmit data" |
| )] |
| #[test_case( |
| State::SynRcvd(SynRcvd { |
| iss: ISS_1, |
| irs: ISS_2, |
| timestamp: None, |
| retrans_timer: RetransTimer { |
| at: FakeInstant::default(), |
| rto: Duration::new(0, 0), |
| user_timeout_until: FakeInstant::from(DEFAULT_USER_TIMEOUT), |
| remaining_retries: Some(DEFAULT_MAX_RETRIES), |
| }, |
| simultaneous_open: None, |
| buffer_sizes: BufferSizes::default(), |
| smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| rcv_wnd_scale: WindowScale::default(), |
| snd_wnd_scale: Some(WindowScale::default()), |
| }), |
| Segment::syn_ack(ISS_2, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX), Options { mss: None, window_scale: Some(WindowScale::default()) }).into() => |
| Some(Segment::ack(ISS_1 + 1, ISS_2 + 1, UnscaledWindowSize::from(u16::MAX))); "retransmit syn_ack" |
| )] |
| // Regression test for https://fxbug.dev/42058963 |
| fn ack_to_retransmitted_segment( |
| mut state: State<FakeInstant, RingBuffer, NullBuffer, ()>, |
| seg: Segment<&[u8]>, |
| ) -> Option<Segment<()>> { |
| let counters = TcpCountersInner::default(); |
| let (reply, _): (_, Option<()>) = state |
| .on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| seg, |
| FakeInstant::default(), |
| &counters, |
| ); |
| reply |
| } |
| |
// Drives an established connection holding four MSS-sized chunks ('A'..='D')
// through a series of duplicate ACKs and checks which chunk is emitted after
// each one, how the fast-retransmit / fast-recovery counters move, and that
// the congestion window is 2 * MSS once the first chunk is acknowledged.
#[test]
fn fast_retransmit() {
    let mss = u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE);
    let counters = TcpCountersInner::default();
    let mut clock = FakeInstantCtx::default();

    // Queue one full-sized chunk per letter.
    let mut send_buffer = RingBuffer::default();
    (b'A'..=b'D').for_each(|fill| {
        assert_eq!(
            send_buffer.enqueue_data(&[fill; DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE_USIZE]),
            DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE_USIZE
        );
    });

    let mut state: State<_, _, _, ()> = State::Established(Established {
        snd: Send {
            nxt: ISS_1,
            max: ISS_1,
            una: ISS_1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer: send_buffer,
            wl1: ISS_2,
            wl2: ISS_1,
            rtt_estimator: Estimator::Measured { srtt: RTT, rtt_var: RTT / 2 },
            last_seq_ts: None,
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(
                DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            ),
            wnd_scale: WindowScale::default(),
        },
        rcv: Recv {
            buffer: RingBuffer::default(),
            assembler: Assembler::new(ISS_2),
            timer: None,
            mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2, WindowSize::DEFAULT),
        },
    });

    // The very first poll sends chunk 'A'.
    assert_eq!(
        state.poll_send_with_default_options(mss, clock.now(), &counters),
        Some(Segment::data(
            ISS_1,
            ISS_2,
            UnscaledWindowSize::from(u16::MAX),
            SendPayload::Contiguous(&[b'A'; DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE_USIZE])
        ))
    );

    // Advances the clock by 10ms, delivers an ACK that acknowledges nothing
    // new, and then expects the next polled segment to carry the chunk filled
    // with `fill`, at the sequence number that chunk occupies.
    let mut dup_ack_then_expect = |fill: u8, counters: &TcpCountersInner| {
        clock.sleep(Duration::from_millis(10));
        assert_eq!(
            state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
                Segment::ack(ISS_2, ISS_1, UnscaledWindowSize::from(u16::MAX)),
                clock.now(),
                counters,
            ),
            (None, None)
        );

        assert_eq!(
            state.poll_send_with_default_options(mss, clock.now(), counters),
            Some(Segment::data(
                ISS_1 + u32::from(fill - b'A') * mss,
                ISS_2,
                UnscaledWindowSize::from(u16::MAX),
                SendPayload::Contiguous(&[fill; DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE_USIZE])
            ))
        );
    };

    // Nothing has been counted before any duplicate ACK arrives.
    assert_eq!(counters.fast_retransmits.get(), 0);
    assert_eq!(counters.fast_recovery.get(), 0);

    // The first two dup acks each let one previously unsent chunk out.
    dup_ack_then_expect(b'B', &counters);
    assert_eq!(counters.fast_retransmits.get(), 0);
    assert_eq!(counters.fast_recovery.get(), 1);
    dup_ack_then_expect(b'C', &counters);
    assert_eq!(counters.fast_retransmits.get(), 0);
    assert_eq!(counters.fast_recovery.get(), 1);

    // The third dup ack triggers a fast retransmit of the chunk at snd.una.
    dup_ack_then_expect(b'A', &counters);
    assert_eq!(counters.fast_retransmits.get(), 1);
    assert_eq!(counters.fast_recovery.get(), 1);

    // Further dup acks go back to releasing previously unsent data.
    dup_ack_then_expect(b'D', &counters);
    assert_eq!(counters.fast_retransmits.get(), 1);
    assert_eq!(counters.fast_recovery.get(), 1);

    // Acknowledge the first chunk and make sure the window is deflated once
    // the loss is recovered.
    clock.sleep(Duration::from_millis(10));
    assert_eq!(
        state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            Segment::ack(ISS_2, ISS_1 + mss, UnscaledWindowSize::from(u16::MAX)),
            clock.now(),
            &counters,
        ),
        (None, None)
    );
    let established = assert_matches!(state, State::Established(established) => established);
    assert_eq!(u32::from(established.snd.congestion_control.cwnd()), 2 * mss);
}
| |
// With keep-alive enabled on an idle established connection, checks that the
// idle timer is armed by `poll_send`, re-armed by an incoming ACK, that
// `count` probes are emitted `interval` apart, and that the connection then
// closes with `ConnectionError::TimedOut`.
#[test]
fn keep_alive() {
    let mss = u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE);
    let counters = TcpCountersInner::default();
    let mut clock = FakeInstantCtx::default();
    let mut state: State<_, _, _, ()> = State::Established(Established {
        snd: Send {
            nxt: ISS_1,
            max: ISS_1,
            una: ISS_1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer: RingBuffer::default(),
            wl1: ISS_2,
            wl2: ISS_1,
            rtt_estimator: Estimator::Measured { srtt: RTT, rtt_var: RTT / 2 },
            last_seq_ts: None,
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(
                DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            ),
            wnd_scale: WindowScale::default(),
        },
        rcv: Recv {
            buffer: RingBuffer::default(),
            assembler: Assembler::new(ISS_2),
            timer: None,
            mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2, WindowSize::DEFAULT),
        },
    });

    // Default options, except that keep-alive is switched on.
    let mut options = SocketOptions::default();
    options.keep_alive.enabled = true;
    let socket_options = &options;
    let keep_alive = &socket_options.keep_alive;
    let idle: Duration = keep_alive.idle.into();
    let interval: Duration = keep_alive.interval.into();

    // There is nothing to send yet,
    assert_eq!(state.poll_send(&counters, mss, clock.now(), socket_options), None);
    // but polling installs a timer that fires after `keep_alive.idle`.
    assert_eq!(state.poll_send_at(), Some(clock.now().add(idle)));

    // An ACK is received an hour later.
    clock.sleep(Duration::from_secs(60 * 60));
    assert_eq!(
        state.on_segment::<&[u8], ClientlessBufferProvider>(
            &counters,
            Segment::ack(ISS_2, ISS_1, UnscaledWindowSize::from(u16::MAX)).into(),
            clock.now(),
            socket_options,
            false, /* defunct */
        ),
        (None, None, DataAcked::No)
    );
    // The timer is now `keep_alive.idle` away from the time of that ACK.
    assert_eq!(state.poll_send_at(), Some(clock.now().add(idle)));
    clock.sleep(idle);

    // `count` probes follow, one per `interval`; the connection stays
    // established throughout.
    for _ in 0..keep_alive.count.get() {
        assert_eq!(
            state.poll_send(&counters, mss, clock.now(), socket_options),
            Some(Segment::ack(ISS_1 - 1, ISS_2, UnscaledWindowSize::from(u16::MAX)).into())
        );
        clock.sleep(interval);
        assert_matches!(state, State::Established(_));
    }

    // The next poll sends nothing and leaves the connection closed.
    assert_eq!(state.poll_send(&counters, mss, clock.now(), socket_options), None);
    assert_eq!(state, State::Closed(Closed { reason: Some(ConnectionError::TimedOut) }));
    assert_eq!(counters.established_closed.get(), 1);
    assert_eq!(counters.established_timedout.get(), 1);
    assert_eq!(counters.established_resets.get(), 0);
}
| |
/// A `SendBuffer` wrapper that withholds some number of bytes from peeking.
///
/// Everything else is forwarded to the wrapped buffer.
#[derive(Debug)]
struct ReservingBuffer<B> {
    /// The wrapped buffer that actually stores the data.
    buffer: B,
    /// How many bytes are cut off the end of each payload handed to a
    /// `peek_with` callback.
    reserved_bytes: usize,
}
| |
| impl<B: Buffer> Buffer for ReservingBuffer<B> { |
| fn limits(&self) -> BufferLimits { |
| self.buffer.limits() |
| } |
| |
| fn target_capacity(&self) -> usize { |
| self.buffer.target_capacity() |
| } |
| |
| fn request_capacity(&mut self, size: usize) { |
| self.buffer.request_capacity(size) |
| } |
| } |
| |
| impl<B: Takeable> Takeable for ReservingBuffer<B> { |
| fn take(&mut self) -> Self { |
| let Self { buffer, reserved_bytes } = self; |
| Self { buffer: buffer.take(), reserved_bytes: *reserved_bytes } |
| } |
| } |
| |
| impl<B: SendBuffer> SendBuffer for ReservingBuffer<B> { |
| fn mark_read(&mut self, count: usize) { |
| self.buffer.mark_read(count) |
| } |
| |
| fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R |
| where |
| F: FnOnce(SendPayload<'a>) -> R, |
| { |
| let Self { buffer, reserved_bytes } = self; |
| buffer.peek_with(offset, |payload| { |
| let len = payload.len(); |
| let new_len = len.saturating_sub(*reserved_bytes); |
| f(payload.slice(0..new_len.try_into().unwrap_or(u32::MAX))) |
| }) |
| } |
| } |
| |
| #[test_case(true, 0)] |
| #[test_case(false, 0)] |
| #[test_case(true, 1)] |
| #[test_case(false, 1)] |
| fn poll_send_len(has_fin: bool, reserved_bytes: usize) { |
| const VALUE: u8 = 0xaa; |
| |
| fn with_poll_send_result<const HAS_FIN: bool>( |
| f: impl FnOnce(Segment<SendPayload<'_>>), |
| reserved_bytes: usize, |
| ) { |
| const DATA_LEN: usize = 40; |
| let buffer = ReservingBuffer { |
| buffer: RingBuffer::with_data(DATA_LEN, &vec![VALUE; DATA_LEN]), |
| reserved_bytes, |
| }; |
| assert_eq!(buffer.limits().len, DATA_LEN); |
| |
| let mut snd = Send::<FakeInstant, _, HAS_FIN> { |
| nxt: ISS_1, |
| max: ISS_1, |
| una: ISS_1, |
| wnd: WindowSize::DEFAULT, |
| wnd_max: WindowSize::DEFAULT, |
| buffer, |
| wl1: ISS_2, |
| wl2: ISS_1, |
| rtt_estimator: Estimator::Measured { srtt: RTT, rtt_var: RTT / 2 }, |
| last_seq_ts: None, |
| timer: None, |
| congestion_control: CongestionControl::cubic_with_mss( |
| DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| ), |
| wnd_scale: WindowScale::default(), |
| }; |
| |
| let counters = TcpCountersInner::default(); |
| |
| f(snd |
| .poll_send( |
| &counters, |
| ISS_1, |
| WindowSize::DEFAULT, |
| u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE), |
| FakeInstant::default(), |
| &SocketOptions::default(), |
| ) |
| .expect("has data")) |
| } |
| |
| let f = |segment: Segment<SendPayload<'_>>| { |
| let Segment { contents, ack: _, seq: _, wnd: _, options: _ } = segment; |
| let contents_len = contents.data().len(); |
| |
| if has_fin && reserved_bytes == 0 { |
| assert_eq!( |
| contents.len(), |
| u32::try_from(contents_len + 1).unwrap(), |
| "FIN not accounted for" |
| ); |
| } else { |
| assert_eq!(contents.len(), u32::try_from(contents_len).unwrap()); |
| } |
| |
| let mut target = vec![0; contents_len]; |
| contents.data().partial_copy(0, target.as_mut_slice()); |
| assert_eq!(target, vec![VALUE; contents_len]); |
| }; |
| match has_fin { |
| true => with_poll_send_result::<true>(f, reserved_bytes), |
| false => with_poll_send_result::<false>(f, reserved_bytes), |
| } |
| } |
| |
// Starts with a zero send window and five bytes queued, and checks that
// one-byte probes go out when the timer fires, that the timer doubles after a
// zero-window ACK, and that the rest of the data is sent once the peer
// advertises a non-zero window.
#[test]
fn zero_window_probe() {
    let rto = Estimator::RTO_INIT;
    let counters = TcpCountersInner::default();
    let mut clock = FakeInstantCtx::default();
    let mut send_buffer = RingBuffer::new(BUFFER_SIZE);
    assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5);

    // Begin in Established with the peer's window closed.
    let mut state = State::Established(Established {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: WindowSize::ZERO,
            wnd_max: WindowSize::ZERO,
            buffer: send_buffer.clone(),
            wl1: ISS_2,
            wl2: ISS_1,
            last_seq_ts: None,
            rtt_estimator: Estimator::default(),
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(
                DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            ),
            wnd_scale: WindowScale::default(),
        },
        rcv: Recv {
            buffer: RingBuffer::new(BUFFER_SIZE),
            assembler: Assembler::new(ISS_2 + 1),
            timer: None,
            mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2 + 1, WindowSize::new(BUFFER_SIZE).unwrap()),
        },
    });

    // Nothing can be sent yet; a timer one initial RTO away is installed.
    assert_eq!(state.poll_send_with_default_options(u32::MAX, clock.now(), &counters), None);
    assert_eq!(state.poll_send_at(), Some(clock.now().add(rto)));

    // After the first RTO the first probe, carrying a single byte, goes out.
    clock.sleep(rto);
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters),
        Some(Segment::data(
            ISS_1 + 1,
            ISS_2 + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            SendPayload::Contiguous(&TEST_BYTES[0..1])
        ))
    );

    // The peer answers with a window that is still zero.
    assert_eq!(
        state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            Segment::ack(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(0)),
            clock.now(),
            &counters,
        ),
        (None, None)
    );
    // The timer backs off exponentially: the next deadline is two RTOs away.
    assert_eq!(state.poll_send_with_default_options(u32::MAX, clock.now(), &counters), None);
    assert_eq!(state.poll_send_at(), Some(clock.now().add(rto * 2)));

    // Halfway to the deadline no probe is sent.
    clock.sleep(rto);
    assert_eq!(state.poll_send_with_default_options(u32::MAX, clock.now(), &counters), None);

    // Once the deadline is reached the same probe is sent again.
    clock.sleep(rto);
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters),
        Some(Segment::data(
            ISS_1 + 1,
            ISS_2 + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            SendPayload::Contiguous(&TEST_BYTES[0..1])
        ))
    );

    // The peer acknowledges the probed byte and opens its receive window.
    assert_eq!(
        state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            Segment::ack(ISS_2 + 1, ISS_1 + 2, UnscaledWindowSize::from(u16::MAX)),
            clock.now(),
            &counters,
        ),
        (None, None)
    );
    // No timer is pending and the remaining bytes are sent in one segment.
    assert_eq!(state.poll_send_at(), None);
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters),
        Some(Segment::data(
            ISS_1 + 2,
            ISS_2 + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            SendPayload::Contiguous(&TEST_BYTES[1..])
        ))
    );
}
| |
// With five bytes queued and a 3-byte segment limit, checks that under the
// default socket options the trailing two bytes are held back after the first
// segment, and that they are released once `nagle_enabled` is turned off.
#[test]
fn nagle() {
    let limit: u32 = 3;
    let counters = TcpCountersInner::default();
    let clock = FakeInstantCtx::default();
    let mut send_buffer = RingBuffer::new(BUFFER_SIZE);
    assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5);

    // Begin in Established with an open send window.
    let mut state: State<_, _, _, ()> = State::Established(Established {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer: send_buffer.clone(),
            wl1: ISS_2,
            wl2: ISS_1,
            last_seq_ts: None,
            rtt_estimator: Estimator::default(),
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(
                DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            ),
            wnd_scale: WindowScale::default(),
        },
        rcv: Recv {
            buffer: RingBuffer::new(BUFFER_SIZE),
            assembler: Assembler::new(ISS_2 + 1),
            timer: None,
            mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2 + 1, WindowSize::new(BUFFER_SIZE).unwrap()),
        },
    });

    let mut socket_options = SocketOptions::default();

    // The first poll yields a full 3-byte segment.
    assert_eq!(
        state.poll_send(&counters, limit, clock.now(), &socket_options),
        Some(Segment::data(
            ISS_1 + 1,
            ISS_2 + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            SendPayload::Contiguous(&TEST_BYTES[0..3])
        ))
    );
    // The remaining two bytes are not sent under the default options.
    assert_eq!(state.poll_send(&counters, limit, clock.now(), &socket_options), None);

    // With Nagle disabled the short tail goes out.
    socket_options.nagle_enabled = false;
    assert_eq!(
        state.poll_send(&counters, limit, clock.now(), &socket_options),
        Some(Segment::data(
            ISS_1 + 4,
            ISS_2 + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            SendPayload::Contiguous(&TEST_BYTES[3..5])
        ))
    );
}
| |
// Connects with an `Mss` of 1, loops the generated SYN and SYN-ACK back into
// the same state machine until it can accept data, and checks that a poll
// with an unlimited segment limit still emits only one byte.
#[test]
fn mss_option() {
    let counters = TcpCountersInner::default();
    let clock = FakeInstantCtx::default();
    let (syn_sent, syn) = Closed::<Initial>::connect(
        ISS_1,
        clock.now(),
        (),
        Default::default(),
        Mss(const_unwrap_option(NonZeroU16::new(1))),
        DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        &SocketOptions::default(),
    );
    let mut state = State::<_, RingBuffer, RingBuffer, ()>::SynSent(syn_sent);

    // Feeding our own SYN back brings the state to SYNRCVD.
    let (reply, passive_open) = state
        .on_segment_with_default_options::<_, ClientlessBufferProvider>(
            syn,
            clock.now(),
            &counters,
        );
    let syn_ack = reply.expect("expected SYN-ACK");
    assert_eq!(passive_open, None);

    // Feeding the SYN-ACK back brings the state to ESTABLISHED.
    assert_eq!(
        state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            syn_ack,
            clock.now(),
            &counters,
        ),
        (Some(Segment::ack(ISS_1 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX))), None)
    );

    // Write some data; only states that own a send buffer are acceptable.
    match state {
        State::Established(Established { ref mut snd, rcv: _ })
        | State::CloseWait(CloseWait { ref mut snd, last_ack: _, last_wnd: _ }) => {
            assert_eq!(snd.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
        }
        State::Closed(_)
        | State::Listen(_)
        | State::SynRcvd(_)
        | State::SynSent(_)
        | State::LastAck(_)
        | State::FinWait1(_)
        | State::FinWait2(_)
        | State::Closing(_)
        | State::TimeWait(_) => {
            panic!("expected that we have entered established state, but got {:?}", state)
        }
    }

    // Because the connection's MSS is 1, only the first byte comes out.
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters),
        Some(Segment::data(
            ISS_1 + 1,
            ISS_1 + 1,
            UnscaledWindowSize::from(u16::MAX),
            SendPayload::Contiguous(&TEST_BYTES[..1]),
        ))
    );
}
| |
| // We can use a smaller and a larger RTT so that when using the smaller one, |
| // we can reach maximum retransmit retires and when using the larger one, we |
| // timeout before reaching maximum retries. |
| #[test_case(Duration::from_millis(1), false, true; "retrans_max_retries")] |
| #[test_case(Duration::from_secs(1), false, false; "retrans_no_max_retries")] |
| #[test_case(Duration::from_millis(1), true, true; "zwp_max_retries")] |
| #[test_case(Duration::from_secs(1), true, false; "zwp_no_max_retires")] |
| fn user_timeout(rtt: Duration, zero_window_probe: bool, max_retries: bool) { |
| let mut clock = FakeInstantCtx::default(); |
| let counters = TcpCountersInner::default(); |
| let mut send_buffer = RingBuffer::new(BUFFER_SIZE); |
| assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5); |
| // Set up the state machine to start with Established. |
| let mut state: State<_, _, _, ()> = State::Established(Established { |
| snd: Send { |
| nxt: ISS_1 + 1, |
| max: ISS_1 + 1, |
| una: ISS_1 + 1, |
| wnd: WindowSize::DEFAULT, |
| wnd_max: WindowSize::DEFAULT, |
| buffer: send_buffer.clone(), |
| wl1: ISS_2, |
| wl2: ISS_1, |
| last_seq_ts: None, |
| rtt_estimator: Estimator::Measured { srtt: rtt, rtt_var: Duration::ZERO }, |
| timer: None, |
| congestion_control: CongestionControl::cubic_with_mss( |
| DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| ), |
| wnd_scale: WindowScale::default(), |
| }, |
| rcv: Recv { |
| buffer: RingBuffer::new(BUFFER_SIZE), |
| assembler: Assembler::new(ISS_2 + 1), |
| timer: None, |
| mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| wnd_scale: WindowScale::default(), |
| last_window_update: (ISS_2 + 1, WindowSize::DEFAULT), |
| }, |
| }); |
| let mut times = 1; |
| let start = clock.now(); |
| while let Some(seg) = state.poll_send_with_default_options(u32::MAX, clock.now(), &counters) |
| { |
| if zero_window_probe { |
| let zero_window_ack = |
| Segment::ack(seg.ack.unwrap(), seg.seq, UnscaledWindowSize::from(0)); |
| assert_eq!( |
| state.on_segment_with_default_options::<(), ClientlessBufferProvider>( |
| zero_window_ack, |
| clock.now(), |
| &counters, |
| ), |
| (None, None) |
| ); |
| } |
| let deadline = state.poll_send_at().expect("must have a retransmission timer"); |
| clock.sleep(deadline.duration_since(clock.now())); |
| times += 1; |
| } |
| let elapsed = clock.now().duration_since(start); |
| assert_eq!(elapsed, DEFAULT_USER_TIMEOUT); |
| if max_retries { |
| assert_eq!(times, DEFAULT_MAX_RETRIES.get()); |
| } else { |
| assert!(times < DEFAULT_MAX_RETRIES.get()); |
| } |
| assert_eq!(state, State::Closed(Closed { reason: Some(ConnectionError::TimedOut) })); |
| assert_eq!(counters.established_closed.get(), 1); |
| assert_eq!(counters.established_timedout.get(), 1); |
| assert_eq!(counters.established_resets.get(), 0); |
| } |
| |
// Checks the expiry chosen by `RetransTimer::backoff` when it is called at,
// and then after, the user timeout deadline.
#[test]
fn retrans_timer_backoff() {
    let mut clock = FakeInstantCtx::default();
    let mut retrans = RetransTimer::new(
        clock.now(),
        Duration::from_secs(1),
        DEFAULT_USER_TIMEOUT,
        DEFAULT_MAX_RETRIES,
    );

    // Backing off exactly at the user timeout leaves the expiry at that
    // deadline.
    clock.sleep(DEFAULT_USER_TIMEOUT);
    retrans.backoff(clock.now());
    assert_eq!(retrans.at, FakeInstant::from(DEFAULT_USER_TIMEOUT));

    // One second later the current time is past the timeout deadline;
    clock.sleep(Duration::from_secs(1));
    retrans.backoff(clock.now());
    // the expiry becomes the current time and the call does not panic.
    assert_eq!(retrans.at, clock.now());
}
| |
| #[test_case( |
| State::Established(Established { |
| snd: Send { |
| nxt: ISS_1 + 1, |
| max: ISS_1 + 1, |
| una: ISS_1 + 1, |
| wnd: WindowSize::DEFAULT, |
| wnd_max: WindowSize::DEFAULT, |
| buffer: RingBuffer::new(BUFFER_SIZE), |
| wl1: ISS_2, |
| wl2: ISS_1, |
| last_seq_ts: None, |
| rtt_estimator: Estimator::Measured { |
| srtt: Estimator::RTO_INIT, |
| rtt_var: Duration::ZERO, |
| }, |
| timer: None, |
| congestion_control: CongestionControl::cubic_with_mss( |
| DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| ), |
| wnd_scale: WindowScale::default(), |
| }, |
| rcv: Recv { |
| buffer: RingBuffer::new( |
| TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize, |
| ), |
| assembler: Assembler::new(ISS_2 + 1), |
| timer: None, |
| mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| wnd_scale: WindowScale::default(), |
| last_window_update: ( |
| ISS_2 + 1, |
| WindowSize::new(TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize).unwrap() |
| ), |
| }, |
| }); "established")] |
| #[test_case( |
| State::FinWait1(FinWait1 { |
| snd: Send { |
| nxt: ISS_1 + 1, |
| max: ISS_1 + 1, |
| una: ISS_1 + 1, |
| wnd: WindowSize::DEFAULT, |
| wnd_max: WindowSize::DEFAULT, |
| buffer: RingBuffer::new(BUFFER_SIZE), |
| wl1: ISS_2, |
| wl2: ISS_1, |
| last_seq_ts: None, |
| rtt_estimator: Estimator::Measured { |
| srtt: Estimator::RTO_INIT, |
| rtt_var: Duration::ZERO, |
| }, |
| timer: None, |
| congestion_control: CongestionControl::cubic_with_mss( |
| DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| ), |
| wnd_scale: WindowScale::default(), |
| }, |
| rcv: Recv { |
| buffer: RingBuffer::new( |
| TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize, |
| ), |
| assembler: Assembler::new(ISS_2 + 1), |
| timer: None, |
| mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| wnd_scale: WindowScale::default(), |
| last_window_update: ( |
| ISS_2 + 1, |
| WindowSize::new(TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize).unwrap() |
| ), |
| }, |
| }); "fin_wait_1")] |
| #[test_case( |
| State::FinWait2(FinWait2 { |
| last_seq: ISS_1 + 1, |
| rcv: Recv { |
| buffer: RingBuffer::new( |
| TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize, |
| ), |
| assembler: Assembler::new(ISS_2 + 1), |
| timer: None, |
| mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| wnd_scale: WindowScale::default(), |
| last_window_update: ( |
| ISS_2 + 1, |
| WindowSize::new(TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize).unwrap() |
| ), |
| }, |
| timeout_at: None, |
| }); "fin_wait_2")] |
| fn delayed_ack(mut state: State<FakeInstant, RingBuffer, RingBuffer, ()>) { |
| let mut clock = FakeInstantCtx::default(); |
| let counters = TcpCountersInner::default(); |
| // TODO(https://fxbug.dev/42075191): Enable delayed ack by default. |
| let socket_options = SocketOptions { delayed_ack: true, ..SocketOptions::default() }; |
| assert_eq!( |
| state.on_segment::<_, ClientlessBufferProvider>( |
| &counters, |
| Segment::data( |
| ISS_2 + 1, |
| ISS_1 + 1, |
| UnscaledWindowSize::from(u16::MAX), |
| SendPayload::Contiguous(TEST_BYTES), |
| ), |
| clock.now(), |
| &socket_options, |
| false, /* defunct */ |
| ), |
| (None, None, DataAcked::No) |
| ); |
| assert_eq!(state.poll_send_at(), Some(clock.now().add(ACK_DELAY_THRESHOLD))); |
| clock.sleep(ACK_DELAY_THRESHOLD); |
| assert_eq!( |
| state.poll_send(&counters, u32::MAX, clock.now(), &socket_options), |
| Some( |
| Segment::ack( |
| ISS_1 + 1, |
| ISS_2 + 1 + TEST_BYTES.len(), |
| UnscaledWindowSize::from_u32(2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE)), |
| ) |
| .into() |
| ) |
| ); |
| let full_segment_sized_payload = |
| vec![b'0'; u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize]; |
| // The first full sized segment should not trigger an immediate ACK, |
| assert_eq!( |
| state.on_segment::<_, ClientlessBufferProvider>( |
| &counters, |
| Segment::data( |
| ISS_2 + 1 + TEST_BYTES.len(), |
| ISS_1 + 1, |
| UnscaledWindowSize::from(u16::MAX), |
| SendPayload::Contiguous(&full_segment_sized_payload[..]), |
| ), |
| clock.now(), |
| &socket_options, |
| false, /* defunct */ |
| ), |
| (None, None, DataAcked::No) |
| ); |
| // ... but just a timer. |
| assert_eq!(state.poll_send_at(), Some(clock.now().add(ACK_DELAY_THRESHOLD))); |
| // Now the second full sized segment arrives, an ACK should be sent |
| // immediately. |
| assert_eq!( |
| state.on_segment::<_, ClientlessBufferProvider>( |
| &counters, |
| Segment::data( |
| ISS_2 + 1 + TEST_BYTES.len() + full_segment_sized_payload.len(), |
| ISS_1 + 1, |
| UnscaledWindowSize::from(u16::MAX), |
| SendPayload::Contiguous(&full_segment_sized_payload[..]), |
| ), |
| clock.now(), |
| &socket_options, |
| false, /* defunct */ |
| ), |
| ( |
| Some(Segment::ack( |
| ISS_1 + 1, |
| ISS_2 + 1 + TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE), |
| UnscaledWindowSize::from(0), |
| )), |
| None, |
| DataAcked::No, |
| ) |
| ); |
| assert_eq!(state.poll_send_at(), None); |
| } |
| |
// With `delayed_ack` enabled, checks that an out-of-order segment, a segment
// that fills a gap, and a FIN are each ACKed immediately, and that no timer
// is pending after any of them.
#[test]
fn immediate_ack_if_out_of_order_or_fin() {
    let counters = TcpCountersInner::default();
    let clock = FakeInstantCtx::default();
    // TODO(https://fxbug.dev/42075191): Enable delayed ack by default.
    let socket_options = SocketOptions { delayed_ack: true, ..SocketOptions::default() };
    // The receive buffer has room for the test bytes plus one more byte.
    let rcv_capacity = TEST_BYTES.len() + 1;
    let mut state: State<_, _, _, ()> = State::Established(Established {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer: RingBuffer::new(BUFFER_SIZE),
            wl1: ISS_2,
            wl2: ISS_1,
            last_seq_ts: None,
            rtt_estimator: Estimator::Measured {
                srtt: Estimator::RTO_INIT,
                rtt_var: Duration::ZERO,
            },
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(
                DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            ),
            wnd_scale: WindowScale::default(),
        },
        rcv: Recv {
            buffer: RingBuffer::new(rcv_capacity),
            assembler: Assembler::new(ISS_2 + 1),
            timer: None,
            mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2 + 1, WindowSize::new(rcv_capacity).unwrap()),
        },
    });

    // An out-of-order segment (everything but the first byte) is answered
    // immediately with an ACK for the byte that is still missing.
    assert_eq!(
        state.on_segment::<_, ClientlessBufferProvider>(
            &counters,
            Segment::data(
                ISS_2 + 2,
                ISS_1 + 1,
                UnscaledWindowSize::from(u16::MAX),
                SendPayload::Contiguous(&TEST_BYTES[1..])
            ),
            clock.now(),
            &socket_options,
            false, /* defunct */
        ),
        (
            Some(Segment::ack(
                ISS_1 + 1,
                ISS_2 + 1,
                UnscaledWindowSize::from(u16::try_from(TEST_BYTES.len() + 1).unwrap())
            )),
            None,
            DataAcked::No,
        )
    );
    assert_eq!(state.poll_send_at(), None);

    // The missing first byte fills the gap, which also triggers an immediate
    // ACK, now covering all of the test bytes.
    assert_eq!(
        state.on_segment::<_, ClientlessBufferProvider>(
            &counters,
            Segment::data(
                ISS_2 + 1,
                ISS_1 + 1,
                UnscaledWindowSize::from(u16::MAX),
                SendPayload::Contiguous(&TEST_BYTES[..1])
            ),
            clock.now(),
            &socket_options,
            false, /* defunct */
        ),
        (
            Some(Segment::ack(
                ISS_1 + 1,
                ISS_2 + 1 + TEST_BYTES.len(),
                UnscaledWindowSize::from(1),
            )),
            None,
            DataAcked::No,
        )
    );
    assert_eq!(state.poll_send_at(), None);

    // A FIN is likewise acknowledged right away.
    assert_eq!(
        state.on_segment::<_, ClientlessBufferProvider>(
            &counters,
            Segment::fin(
                ISS_2 + 1 + TEST_BYTES.len(),
                ISS_1 + 1,
                UnscaledWindowSize::from(u16::MAX),
            ),
            clock.now(),
            &socket_options,
            false, /* defunct */
        ),
        (
            Some(Segment::ack(
                ISS_1 + 1,
                ISS_2 + 1 + TEST_BYTES.len() + 1,
                UnscaledWindowSize::from(0),
            )),
            None,
            DataAcked::No,
        )
    );
    assert_eq!(state.poll_send_at(), None);
}
| |
// Checks that calling `close` on a FIN-WAIT-2 state arms a timer of
// `DEFAULT_FIN_WAIT2_TIMEOUT`, and that polling once it has elapsed closes
// the connection with `ConnectionError::TimedOut`.
#[test]
fn fin_wait2_timeout() {
    let counters = TcpCountersInner::default();
    let mut clock = FakeInstantCtx::default();
    let mut state: State<_, _, NullBuffer, ()> = State::FinWait2(FinWait2 {
        last_seq: ISS_1,
        rcv: Recv {
            buffer: NullBuffer,
            assembler: Assembler::new(ISS_2),
            timer: None,
            mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2, WindowSize::DEFAULT),
        },
        timeout_at: None,
    });

    // Closing in this state reports `CloseError::Closing`...
    let close_result = state.close(
        &counters,
        CloseReason::Close { now: clock.now() },
        &SocketOptions::default(),
    );
    assert_eq!(close_result, Err(CloseError::Closing));
    // ...and leaves a timer pending.
    assert_eq!(state.poll_send_at(), Some(clock.now().add(DEFAULT_FIN_WAIT2_TIMEOUT)));

    // After the timeout, polling sends nothing and the state is closed.
    clock.sleep(DEFAULT_FIN_WAIT2_TIMEOUT);
    assert_eq!(state.poll_send_with_default_options(u32::MAX, clock.now(), &counters), None);
    assert_eq!(state, State::Closed(Closed { reason: Some(ConnectionError::TimedOut) }));
    assert_eq!(counters.established_closed.get(), 1);
    assert_eq!(counters.established_timedout.get(), 1);
    assert_eq!(counters.established_resets.get(), 0);
}
| |
| #[test_case(RetransTimer { |
| user_timeout_until: FakeInstant::from(Duration::from_secs(100)), |
| remaining_retries: None, |
| at: FakeInstant::from(Duration::from_secs(1)), |
| rto: Duration::from_secs(1), |
| }, FakeInstant::from(Duration::from_secs(1)) => true)] |
| #[test_case(RetransTimer { |
| user_timeout_until: FakeInstant::from(Duration::from_secs(100)), |
| remaining_retries: None, |
| at: FakeInstant::from(Duration::from_secs(2)), |
| rto: Duration::from_secs(1), |
| }, FakeInstant::from(Duration::from_secs(1)) => false)] |
| #[test_case(RetransTimer { |
| user_timeout_until: FakeInstant::from(Duration::from_secs(100)), |
| remaining_retries: Some(NonZeroU8::new(1).unwrap()), |
| at: FakeInstant::from(Duration::from_secs(2)), |
| rto: Duration::from_secs(1), |
| }, FakeInstant::from(Duration::from_secs(1)) => false)] |
| #[test_case(RetransTimer { |
| user_timeout_until: FakeInstant::from(Duration::from_secs(1)), |
| remaining_retries: Some(NonZeroU8::new(1).unwrap()), |
| at: FakeInstant::from(Duration::from_secs(1)), |
| rto: Duration::from_secs(1), |
| }, FakeInstant::from(Duration::from_secs(1)) => true)] |
| fn send_timed_out(timer: RetransTimer<FakeInstant>, now: FakeInstant) -> bool { |
| timer.timed_out(now) |
| } |
| |
| #[test_case( |
| State::SynSent(SynSent{ |
| iss: ISS_1, |
| timestamp: Some(FakeInstant::default()), |
| retrans_timer: RetransTimer::new( |
| FakeInstant::default(), |
| Duration::from_millis(1), |
| Duration::from_secs(60), |
| DEFAULT_MAX_SYN_RETRIES, |
| ), |
| active_open: (), |
| buffer_sizes: Default::default(), |
| device_mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| default_mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| rcv_wnd_scale: WindowScale::default(), |
| }) |
| => DEFAULT_MAX_SYN_RETRIES.get(); "syn_sent")] |
| #[test_case( |
| State::SynRcvd(SynRcvd{ |
| iss: ISS_1, |
| irs: ISS_2, |
| timestamp: Some(FakeInstant::default()), |
| retrans_timer: RetransTimer::new( |
| FakeInstant::default(), |
| Duration::from_millis(1), |
| Duration::from_secs(60), |
| DEFAULT_MAX_SYNACK_RETRIES, |
| ), |
| simultaneous_open: None, |
| buffer_sizes: BufferSizes::default(), |
| smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| rcv_wnd_scale: WindowScale::default(), |
| snd_wnd_scale: Some(WindowScale::default()), |
| }) |
| => DEFAULT_MAX_SYNACK_RETRIES.get(); "syn_rcvd")] |
| fn handshake_timeout(mut state: State<FakeInstant, RingBuffer, RingBuffer, ()>) -> u8 { |
| let mut clock = FakeInstantCtx::default(); |
| let counters = TcpCountersInner::default(); |
| let mut retransmissions = 0; |
| clock.sleep(Duration::from_millis(1)); |
| while let Some(_seg) = |
| state.poll_send_with_default_options(u32::MAX, clock.now(), &counters) |
| { |
| let deadline = state.poll_send_at().expect("must have a retransmission timer"); |
| clock.sleep(deadline.duration_since(clock.now())); |
| retransmissions += 1; |
| } |
| assert_eq!(state, State::Closed(Closed { reason: Some(ConnectionError::TimedOut) })); |
| assert_eq!(counters.established_closed.get(), 0); |
| assert_eq!(counters.established_timedout.get(), 0); |
| assert_eq!(counters.established_resets.get(), 0); |
| retransmissions |
| } |
| |
| #[test_case( |
| u16::MAX as usize, WindowScale::default(), Some(WindowScale::default()) |
| => (WindowScale::default(), WindowScale::default()))] |
| #[test_case( |
| u16::MAX as usize + 1, WindowScale::new(1).unwrap(), Some(WindowScale::default()) |
| => (WindowScale::new(1).unwrap(), WindowScale::default()))] |
| #[test_case( |
| u16::MAX as usize + 1, WindowScale::new(1).unwrap(), None |
| => (WindowScale::default(), WindowScale::default()))] |
| #[test_case( |
| u16::MAX as usize, WindowScale::default(), Some(WindowScale::new(1).unwrap()) |
| => (WindowScale::default(), WindowScale::new(1).unwrap()))] |
| fn window_scale( |
| buffer_size: usize, |
| syn_window_scale: WindowScale, |
| syn_ack_window_scale: Option<WindowScale>, |
| ) -> (WindowScale, WindowScale) { |
| let mut clock = FakeInstantCtx::default(); |
| let counters = TcpCountersInner::default(); |
| let (syn_sent, syn_seg) = Closed::<Initial>::connect( |
| ISS_1, |
| clock.now(), |
| (), |
| BufferSizes { receive: buffer_size, ..Default::default() }, |
| DEVICE_MAXIMUM_SEGMENT_SIZE, |
| DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| &SocketOptions::default(), |
| ); |
| assert_eq!( |
| syn_seg, |
| Segment::syn( |
| ISS_1, |
| UnscaledWindowSize::from(u16::try_from(buffer_size).unwrap_or(u16::MAX)), |
| Options { |
| mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE), |
| window_scale: Some(syn_window_scale) |
| }, |
| ) |
| ); |
| let mut active = State::SynSent(syn_sent); |
| clock.sleep(RTT / 2); |
| let (seg, passive_open) = active |
| .on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| Segment::syn_ack( |
| ISS_2, |
| ISS_1 + 1, |
| UnscaledWindowSize::from(u16::MAX), |
| Options { |
| mss: Some(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE), |
| window_scale: syn_ack_window_scale, |
| }, |
| ), |
| clock.now(), |
| &counters, |
| ); |
| assert_eq!(passive_open, None); |
| assert_matches!(seg, Some(_)); |
| |
| let established: Established<FakeInstant, RingBuffer, NullBuffer> = |
| assert_matches!(active, State::Established(established) => established); |
| |
| assert_eq!(established.snd.wnd, WindowSize::DEFAULT); |
| |
| (established.rcv.wnd_scale, established.snd.wnd_scale) |
| } |
| |
| #[test_case( |
| u16::MAX as usize, |
| Segment::syn_ack( |
| ISS_2 + 1 + u16::MAX as usize, |
| ISS_1 + 1, |
| UnscaledWindowSize::from(u16::MAX), |
| Options::default(), |
| ) |
| )] |
| #[test_case( |
| u16::MAX as usize + 1, |
| Segment::syn_ack( |
| ISS_2 + 1 + u16::MAX as usize, |
| ISS_1 + 1, |
| UnscaledWindowSize::from(u16::MAX), |
| Options::default(), |
| ) |
| )] |
| #[test_case( |
| u16::MAX as usize, |
| Segment::data( |
| ISS_2 + 1 + u16::MAX as usize, |
| ISS_1 + 1, |
| UnscaledWindowSize::from(u16::MAX), |
| &TEST_BYTES[..], |
| ) |
| )] |
| #[test_case( |
| u16::MAX as usize + 1, |
| Segment::data( |
| ISS_2 + 1 + u16::MAX as usize, |
| ISS_1 + 1, |
| UnscaledWindowSize::from(u16::MAX), |
| &TEST_BYTES[..], |
| ) |
| )] |
| fn window_scale_otw_seq(receive_buf_size: usize, otw_seg: impl Into<Segment<&'static [u8]>>) { |
| let counters = TcpCountersInner::default(); |
| let buffer_sizes = BufferSizes { send: 0, receive: receive_buf_size }; |
| let rcv_wnd_scale = buffer_sizes.rwnd().scale(); |
| let mut syn_rcvd: State<_, RingBuffer, RingBuffer, ()> = State::SynRcvd(SynRcvd { |
| iss: ISS_1, |
| irs: ISS_2, |
| timestamp: None, |
| retrans_timer: RetransTimer::new( |
| FakeInstant::default(), |
| Estimator::RTO_INIT, |
| Duration::from_secs(10), |
| DEFAULT_MAX_SYNACK_RETRIES, |
| ), |
| simultaneous_open: None, |
| buffer_sizes: BufferSizes { send: 0, receive: receive_buf_size }, |
| smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, |
| rcv_wnd_scale, |
| snd_wnd_scale: WindowScale::new(1), |
| }); |
| |
| assert_eq!( |
| syn_rcvd.on_segment_with_default_options::<_, ClientlessBufferProvider>( |
| otw_seg.into(), |
| FakeInstant::default(), |
| &counters |
| ), |
| (Some(Segment::ack(ISS_1 + 1, ISS_2 + 1, buffer_sizes.rwnd_unscaled())), None), |
| ) |
| } |
| |
// When the send buffer is a `ReservingBuffer` holding back `RESERVED_BYTES`,
// `poll_send` must emit everything except the reserved tail and advance
// `snd.nxt` by exactly the number of bytes emitted.
#[test]
fn poll_send_reserving_buffer() {
    const RESERVED_BYTES: usize = 3;
    // Number of bytes expected to go out on the wire.
    let sendable_len = TEST_BYTES.len() - RESERVED_BYTES;

    let counters = TcpCountersInner::default();
    let socket_options = SocketOptions::default();

    let mut snd: Send<FakeInstant, _, false> = Send {
        nxt: ISS_1 + 1,
        max: ISS_1 + 1,
        una: ISS_1 + 1,
        wnd: WindowSize::DEFAULT,
        wnd_max: WindowSize::DEFAULT,
        wnd_scale: WindowScale::default(),
        wl1: ISS_1,
        wl2: ISS_2,
        buffer: ReservingBuffer {
            buffer: RingBuffer::with_data(TEST_BYTES.len(), TEST_BYTES),
            reserved_bytes: RESERVED_BYTES,
        },
        last_seq_ts: None,
        rtt_estimator: Estimator::default(),
        timer: None,
        congestion_control: CongestionControl::cubic_with_mss(
            DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        ),
    };

    let expected = Segment::data(
        ISS_1 + 1,
        ISS_2 + 1,
        WindowSize::DEFAULT >> WindowScale::default(),
        SendPayload::Contiguous(&TEST_BYTES[..sendable_len]),
    );
    assert_eq!(
        snd.poll_send(
            &counters,
            ISS_2 + 1,
            WindowSize::DEFAULT,
            u32::MAX,
            FakeInstant::default(),
            &socket_options,
        ),
        Some(expected)
    );

    assert_eq!(snd.nxt, ISS_1 + 1 + sendable_len);
}
| |
// Receiver-side silly window avoidance: once the advertised window has
// dropped to zero, a larger window is only advertised again after at least
// one MSS worth of buffer space has been freed.
#[test]
fn rcv_silly_window_avoidance() {
    const MULTIPLE: usize = 3;
    const CAP: usize = TEST_BYTES.len() * MULTIPLE;
    // The MSS is exactly one copy of `TEST_BYTES`. Convert with `try_from`
    // instead of an `as` cast so an oversized `TEST_BYTES` fails loudly
    // rather than being silently truncated.
    let mss = Mss(NonZeroU16::new(u16::try_from(TEST_BYTES.len()).unwrap()).unwrap());
    let mut rcv: Recv<FakeInstant, _> = Recv {
        buffer: RingBuffer::new(CAP),
        assembler: Assembler::new(ISS_1),
        timer: None,
        mss,
        wnd_scale: WindowScale::default(),
        last_window_update: (ISS_1, WindowSize::new(CAP).unwrap()),
    };
    // Initially the entire buffer is advertised.
    assert_eq!(rcv.select_window(), WindowSize::new(CAP).unwrap());

    // Fill the buffer completely with `MULTIPLE` copies of `TEST_BYTES`.
    for _ in 0..MULTIPLE {
        assert_eq!(rcv.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
    }
    assert_eq!(rcv.assembler.insert(ISS_1..ISS_1 + CAP), CAP);
    // Since the buffer is full, we now get a zero window.
    assert_eq!(rcv.select_window(), WindowSize::ZERO);

    // The user reads 1 byte, but our implementation should not advertise
    // a new window because it is too small.
    assert_eq!(rcv.buffer.read_with(|_| 1), 1);
    assert_eq!(rcv.select_window(), WindowSize::ZERO);

    // Now there is at least 1 MSS worth of free space in the buffer, so
    // advertise all of it (the MSS just read plus the earlier single byte).
    assert_eq!(rcv.buffer.read_with(|_| TEST_BYTES.len()), TEST_BYTES.len());
    assert_eq!(rcv.select_window(), WindowSize::new(TEST_BYTES.len() + 1).unwrap());
}
| |
// Sender-side silly window avoidance: after a zero window and a one-byte
// probe, a window update smaller than one MSS does not trigger an immediate
// send; the data only goes out after `SWS_PROBE_TIMEOUT` has elapsed.
#[test]
fn snd_silly_window_avoidance() {
    const CAP: usize = TEST_BYTES.len() * 2;
    // The MSS is exactly one copy of `TEST_BYTES`. Convert with `try_from`
    // instead of an `as` cast so an oversized `TEST_BYTES` fails loudly
    // rather than being silently truncated.
    let mss = Mss(NonZeroU16::new(u16::try_from(TEST_BYTES.len()).unwrap()).unwrap());
    let mut snd: Send<FakeInstant, RingBuffer, false> = Send {
        nxt: ISS_1 + 1,
        max: ISS_1 + 1,
        una: ISS_1 + 1,
        wnd: WindowSize::new(CAP).unwrap(),
        wnd_scale: WindowScale::default(),
        wnd_max: WindowSize::DEFAULT,
        wl1: ISS_1,
        wl2: ISS_2,
        buffer: RingBuffer::new(CAP),
        last_seq_ts: None,
        rtt_estimator: Estimator::default(),
        timer: None,
        congestion_control: CongestionControl::cubic_with_mss(mss),
    };

    let mut clock = FakeInstantCtx::default();
    let counters = TcpCountersInner::default();
    // Every call below uses the default options, so build them once.
    let socket_options = SocketOptions::default();
    let keep_alive = KeepAlive::default();

    // We enqueue two copies of TEST_BYTES.
    assert_eq!(snd.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
    assert_eq!(snd.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());

    // The first copy should be sent out since the receiver has the space.
    assert_eq!(
        snd.poll_send(
            &counters,
            ISS_2 + 1,
            WindowSize::DEFAULT,
            u32::MAX,
            clock.now(),
            &socket_options,
        ),
        Some(Segment::data(
            ISS_1 + 1,
            ISS_2 + 1,
            UnscaledWindowSize::from(u16::MAX),
            SendPayload::Contiguous(TEST_BYTES),
        )),
    );

    // The peer acknowledges the first copy and advertises a zero window.
    assert_eq!(
        snd.process_ack(
            &counters,
            ISS_2 + 1,
            ISS_1 + 1 + TEST_BYTES.len(),
            UnscaledWindowSize::from(0),
            true,
            ISS_2 + 1,
            WindowSize::DEFAULT,
            clock.now(),
            &keep_alive,
        ),
        (None, DataAcked::Yes)
    );

    // Now that we received a zero window, we should start probing for the
    // window reopening.
    assert_eq!(
        snd.poll_send(
            &counters,
            ISS_2 + 1,
            WindowSize::DEFAULT,
            u32::MAX,
            clock.now(),
            &socket_options,
        ),
        None
    );

    assert_eq!(
        snd.timer,
        Some(SendTimer::ZeroWindowProbe(RetransTimer::new(
            clock.now(),
            snd.rtt_estimator.rto(),
            DEFAULT_USER_TIMEOUT,
            DEFAULT_MAX_RETRIES,
        )))
    );

    clock.sleep(Duration::from_millis(100));

    // The probe carries a single byte of the second copy.
    assert_eq!(
        snd.poll_send(
            &counters,
            ISS_2 + 1,
            WindowSize::DEFAULT,
            u32::MAX,
            clock.now(),
            &socket_options,
        ),
        Some(Segment::data(
            ISS_1 + 1 + TEST_BYTES.len(),
            ISS_2 + 1,
            UnscaledWindowSize::from(u16::MAX),
            SendPayload::Contiguous(&TEST_BYTES[..1]),
        ))
    );

    // Now the receiver sends back a window update, but not enough for a
    // full MSS.
    assert_eq!(
        snd.process_ack(
            &counters,
            ISS_2 + 1,
            ISS_1 + 1 + TEST_BYTES.len() + 1,
            UnscaledWindowSize::from(3),
            true,
            ISS_2 + 1,
            WindowSize::DEFAULT,
            clock.now(),
            &keep_alive,
        ),
        (None, DataAcked::Yes)
    );

    // We would then transition into SWS avoidance.
    assert_eq!(
        snd.poll_send(
            &counters,
            ISS_2 + 1,
            WindowSize::DEFAULT,
            u32::MAX,
            clock.now(),
            &socket_options,
        ),
        None,
    );
    assert_eq!(snd.timer, Some(SendTimer::SWSProbe { at: clock.now().add(SWS_PROBE_TIMEOUT) }));
    clock.sleep(SWS_PROBE_TIMEOUT);

    // After the overriding timeout, we should push out whatever the
    // receiver is willing to receive (here, the 3 bytes it advertised).
    assert_eq!(
        snd.poll_send(
            &counters,
            ISS_2 + 1,
            WindowSize::DEFAULT,
            u32::MAX,
            clock.now(),
            &socket_options,
        ),
        Some(Segment::data(
            ISS_1 + 1 + TEST_BYTES.len() + 1,
            ISS_2 + 1,
            UnscaledWindowSize::from(u16::MAX),
            SendPayload::Contiguous(&TEST_BYTES[1..4]),
        ))
    );
}
| |
// Regression test for https://fxbug.dev/334926865.
//
// After a retransmission, an ACK generated in response to incoming data must
// carry `snd.max` as its sequence number.
#[test]
fn ack_uses_snd_max() {
    let counters = TcpCountersInner::default();
    let mss = Mss(NonZeroU16::new(u16::try_from(TEST_BYTES.len()).unwrap()).unwrap());
    // `BUFFER_SIZE` as it appears in the 16-bit window field of a segment.
    let buffer_size_u16 = u16::try_from(BUFFER_SIZE).unwrap();

    let mut clock = FakeInstantCtx::default();

    // Queue two MSS-sized chunks for sending.
    let mut buffer = RingBuffer::new(BUFFER_SIZE);
    assert_eq!(buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
    assert_eq!(buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());

    // This connection has the same send and receive seqnum space.
    let mut state: State<_, _, _, ()> = State::Established(Established {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer,
            wl1: ISS_1,
            wl2: ISS_1,
            last_seq_ts: None,
            rtt_estimator: Estimator::NoSample,
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(mss),
            wnd_scale: WindowScale::default(),
        },
        rcv: Recv {
            buffer: RingBuffer::new(BUFFER_SIZE),
            assembler: Assembler::new(ISS_1 + 1),
            timer: None,
            mss,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_1 + 1, WindowSize::new(BUFFER_SIZE).unwrap()),
        },
    });

    // Send the first two data segments.
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters),
        Some(Segment::data(
            ISS_1 + 1,
            ISS_1 + 1,
            UnscaledWindowSize::from(buffer_size_u16),
            SendPayload::Contiguous(TEST_BYTES),
        )),
    );
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters),
        Some(Segment::data(
            ISS_1 + 1 + TEST_BYTES.len(),
            ISS_1 + 1,
            UnscaledWindowSize::from(buffer_size_u16),
            SendPayload::Contiguous(TEST_BYTES),
        )),
    );

    // Retransmit, now snd.nxt = TEST_BYTES.len() + 1.
    clock.sleep(Estimator::RTO_INIT);
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters),
        Some(Segment::data(
            ISS_1 + 1,
            ISS_1 + 1,
            UnscaledWindowSize::from(buffer_size_u16),
            SendPayload::Contiguous(TEST_BYTES),
        )),
    );

    // The ACK sent should have seq = snd.max (2 * TEST_BYTES.len() + 1) to
    // avoid getting stuck in an ACK cycle.
    assert_eq!(
        state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            Segment::data(
                ISS_1 + 1,
                ISS_1 + 1,
                UnscaledWindowSize::from(buffer_size_u16),
                SendPayload::Contiguous(TEST_BYTES),
            ),
            clock.now(),
            &counters,
        ),
        (
            Some(Segment::ack(
                ISS_1 + 1 + 2 * TEST_BYTES.len(),
                ISS_1 + 1 + TEST_BYTES.len(),
                UnscaledWindowSize::from(
                    u16::try_from(BUFFER_SIZE - TEST_BYTES.len()).unwrap()
                ),
            )),
            None,
        )
    );
}
| } |