| // Copyright 2018 The gVisor Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package tcp |
| |
| import ( |
| "encoding/binary" |
| "math" |
| "time" |
| |
| "gvisor.dev/gvisor/pkg/rand" |
| "gvisor.dev/gvisor/pkg/sleep" |
| "gvisor.dev/gvisor/pkg/sync" |
| "gvisor.dev/gvisor/pkg/tcpip" |
| "gvisor.dev/gvisor/pkg/tcpip/buffer" |
| "gvisor.dev/gvisor/pkg/tcpip/hash/jenkins" |
| "gvisor.dev/gvisor/pkg/tcpip/header" |
| "gvisor.dev/gvisor/pkg/tcpip/seqnum" |
| "gvisor.dev/gvisor/pkg/tcpip/stack" |
| "gvisor.dev/gvisor/pkg/waiter" |
| ) |
| |
// maxSegmentsPerWake is the maximum number of segments to process in the main
// protocol goroutine per wake-up. Yielding [after this number of segments are
// processed] allows other events to be processed as well (e.g., timeouts,
// resets, etc.).
//
// During the handshake it bounds the number of segments handled by a single
// call to handshake.processSegments.
const maxSegmentsPerWake = 100
| |
// handshakeState identifies the phase a handshake object is currently in.
type handshakeState int

// The following are the possible states of the TCP connection during a 3-way
// handshake. A depiction of the states and transitions can be found in RFC 793,
// page 23.
const (
	// handshakeSynSent is the state installed by handshake.resetState.
	handshakeSynSent handshakeState = iota
	// handshakeSynRcvd is entered once a SYN from the peer has been recorded
	// (see handshake.resetToSynRcvd and handshake.synSentState).
	handshakeSynRcvd
	// handshakeCompleted is set right before the endpoint is transitioned to
	// the established state.
	handshakeCompleted
)
| |
// The following are used to set up sleepers. Each value is the ID under which
// the corresponding waker is registered with the sleeper in
// handshake.complete, and the ID that loop switches on after a fetch.
const (
	wakerForNotification = iota
	wakerForNewSegment
	wakerForResend
)
| |
const (
	// Maximum space available for options. This is also the length, in
	// bytes, of the arrays handed out by optionPool.
	maxOptionSize = 40
)
| |
// handshake holds the state used during a TCP 3-way handshake.
//
// NOTE: handshake.ep.mu is held during handshake processing. It is released if
// we are going to block and reacquired when we start processing an event.
type handshake struct {
	// ep is the endpoint this handshake is being performed for.
	ep *endpoint

	// state is the current position in the handshake state machine.
	state handshakeState

	// active is true for handshakes created by newHandshake and is cleared
	// by resetToSynRcvd, i.e. for passive handshakes.
	active bool

	// flags are the TCP flags placed on the SYN/SYN-ACK segments sent by the
	// handshake.
	flags header.TCPFlags

	// ackNum is the acknowledgement number sent to the peer; once the peer's
	// SYN is known it is the peer's initial sequence number plus one.
	ackNum seqnum.Value

	// iss is the initial send sequence number, as defined in RFC 793.
	iss seqnum.Value

	// rcvWnd is the receive window, as defined in RFC 793.
	rcvWnd seqnum.Size

	// sndWnd is the send window, as defined in RFC 793.
	sndWnd seqnum.Size

	// mss is the maximum segment size received from the peer.
	mss uint16

	// sndWndScale is the send window scale, as defined in RFC 1323. A
	// negative value means no scaling is supported by the peer.
	sndWndScale int

	// rcvWndScale is the receive window scale, as defined in RFC 1323.
	rcvWndScale int

	// startTime is the time at which the first SYN/SYN-ACK was sent.
	startTime time.Time

	// deferAccept if non-zero will drop the final ACK for a passive
	// handshake till an ACK segment with data is received or the timeout is
	// hit.
	deferAccept time.Duration

	// acked is true if the final ACK for a 3-way handshake has
	// been received. This is required to stop retransmitting the
	// original SYN-ACK when deferAccept is enabled.
	acked bool

	// sendSYNOpts is the cached values for the SYN options to be sent. It is
	// stored by start and reused for retransmissions in complete.
	sendSYNOpts header.TCPSynOptions
}
| |
| func (e *endpoint) newHandshake() *handshake { |
| h := &handshake{ |
| ep: e, |
| active: true, |
| rcvWnd: seqnum.Size(e.initialReceiveWindow()), |
| rcvWndScale: e.rcvWndScaleForHandshake(), |
| } |
| h.resetState() |
| // Store reference to handshake state in endpoint. |
| e.h = h |
| return h |
| } |
| |
| func (e *endpoint) newPassiveHandshake(isn, irs seqnum.Value, opts *header.TCPSynOptions, deferAccept time.Duration) *handshake { |
| h := e.newHandshake() |
| h.resetToSynRcvd(isn, irs, opts, deferAccept) |
| return h |
| } |
| |
| // FindWndScale determines the window scale to use for the given maximum window |
| // size. |
| func FindWndScale(wnd seqnum.Size) int { |
| if wnd < 0x10000 { |
| return 0 |
| } |
| |
| max := seqnum.Size(math.MaxUint16) |
| s := 0 |
| for wnd > max && s < header.MaxWndScale { |
| s++ |
| max <<= 1 |
| } |
| |
| return s |
| } |
| |
| // resetState resets the state of the handshake object such that it becomes |
| // ready for a new 3-way handshake. |
| func (h *handshake) resetState() { |
| b := make([]byte, 4) |
| if _, err := rand.Read(b); err != nil { |
| panic(err) |
| } |
| |
| h.state = handshakeSynSent |
| h.flags = header.TCPFlagSyn |
| h.ackNum = 0 |
| h.mss = 0 |
| h.iss = generateSecureISN(h.ep.ID, h.ep.stack.Seed()) |
| } |
| |
| // generateSecureISN generates a secure Initial Sequence number based on the |
| // recommendation here https://tools.ietf.org/html/rfc6528#page-3. |
| func generateSecureISN(id stack.TransportEndpointID, seed uint32) seqnum.Value { |
| isnHasher := jenkins.Sum32(seed) |
| isnHasher.Write([]byte(id.LocalAddress)) |
| isnHasher.Write([]byte(id.RemoteAddress)) |
| portBuf := make([]byte, 2) |
| binary.LittleEndian.PutUint16(portBuf, id.LocalPort) |
| isnHasher.Write(portBuf) |
| binary.LittleEndian.PutUint16(portBuf, id.RemotePort) |
| isnHasher.Write(portBuf) |
| // The time period here is 64ns. This is similar to what linux uses |
| // generate a sequence number that overlaps less than one |
| // time per MSL (2 minutes). |
| // |
| // A 64ns clock ticks 10^9/64 = 15625000) times in a second. |
| // To wrap the whole 32 bit space would require |
| // 2^32/1562500 ~ 274 seconds. |
| // |
| // Which sort of guarantees that we won't reuse the ISN for a new |
| // connection for the same tuple for at least 274s. |
| isn := isnHasher.Sum32() + uint32(time.Now().UnixNano()>>6) |
| return seqnum.Value(isn) |
| } |
| |
| // effectiveRcvWndScale returns the effective receive window scale to be used. |
| // If the peer doesn't support window scaling, the effective rcv wnd scale is |
| // zero; otherwise it's the value calculated based on the initial rcv wnd. |
| func (h *handshake) effectiveRcvWndScale() uint8 { |
| if h.sndWndScale < 0 { |
| return 0 |
| } |
| return uint8(h.rcvWndScale) |
| } |
| |
| // resetToSynRcvd resets the state of the handshake object to the SYN-RCVD |
| // state. |
| func (h *handshake) resetToSynRcvd(iss seqnum.Value, irs seqnum.Value, opts *header.TCPSynOptions, deferAccept time.Duration) { |
| h.active = false |
| h.state = handshakeSynRcvd |
| h.flags = header.TCPFlagSyn | header.TCPFlagAck |
| h.iss = iss |
| h.ackNum = irs + 1 |
| h.mss = opts.MSS |
| h.sndWndScale = opts.WS |
| h.deferAccept = deferAccept |
| h.ep.setEndpointState(StateSynRecv) |
| } |
| |
| // checkAck checks if the ACK number, if present, of a segment received during |
| // a TCP 3-way handshake is valid. If it's not, a RST segment is sent back in |
| // response. |
| func (h *handshake) checkAck(s *segment) bool { |
| if s.flagIsSet(header.TCPFlagAck) && s.ackNumber != h.iss+1 { |
| // RFC 793, page 36, states that a reset must be generated when |
| // the connection is in any non-synchronized state and an |
| // incoming segment acknowledges something not yet sent. The |
| // connection remains in the same state. |
| ack := s.sequenceNumber.Add(s.logicalLen()) |
| h.ep.sendRaw(buffer.VectorisedView{}, header.TCPFlagRst|header.TCPFlagAck, s.ackNumber, ack, 0) |
| return false |
| } |
| |
| return true |
| } |
| |
| // synSentState handles a segment received when the TCP 3-way handshake is in |
| // the SYN-SENT state. |
| func (h *handshake) synSentState(s *segment) tcpip.Error { |
| // RFC 793, page 37, states that in the SYN-SENT state, a reset is |
| // acceptable if the ack field acknowledges the SYN. |
| if s.flagIsSet(header.TCPFlagRst) { |
| if s.flagIsSet(header.TCPFlagAck) && s.ackNumber == h.iss+1 { |
| // RFC 793, page 67, states that "If the RST bit is set [and] If the ACK |
| // was acceptable then signal the user "error: connection reset", drop |
| // the segment, enter CLOSED state, delete TCB, and return." |
| h.ep.workerCleanup = true |
| // Although the RFC above calls out ECONNRESET, Linux actually returns |
| // ECONNREFUSED here so we do as well. |
| return &tcpip.ErrConnectionRefused{} |
| } |
| return nil |
| } |
| |
| if !h.checkAck(s) { |
| return nil |
| } |
| |
| // We are in the SYN-SENT state. We only care about segments that have |
| // the SYN flag. |
| if !s.flagIsSet(header.TCPFlagSyn) { |
| return nil |
| } |
| |
| // Parse the SYN options. |
| rcvSynOpts := parseSynSegmentOptions(s) |
| |
| // Remember if the Timestamp option was negotiated. |
| h.ep.maybeEnableTimestamp(&rcvSynOpts) |
| |
| // Remember if the SACKPermitted option was negotiated. |
| h.ep.maybeEnableSACKPermitted(&rcvSynOpts) |
| |
| // Remember the sequence we'll ack from now on. |
| h.ackNum = s.sequenceNumber + 1 |
| h.flags |= header.TCPFlagAck |
| h.mss = rcvSynOpts.MSS |
| h.sndWndScale = rcvSynOpts.WS |
| |
| // If this is a SYN ACK response, we only need to acknowledge the SYN |
| // and the handshake is completed. |
| if s.flagIsSet(header.TCPFlagAck) { |
| h.state = handshakeCompleted |
| |
| h.ep.transitionToStateEstablishedLocked(h) |
| |
| h.ep.sendRaw(buffer.VectorisedView{}, header.TCPFlagAck, h.iss+1, h.ackNum, h.rcvWnd>>h.effectiveRcvWndScale()) |
| return nil |
| } |
| |
| // A SYN segment was received, but no ACK in it. We acknowledge the SYN |
| // but resend our own SYN and wait for it to be acknowledged in the |
| // SYN-RCVD state. |
| h.state = handshakeSynRcvd |
| ttl := h.ep.ttl |
| amss := h.ep.amss |
| h.ep.setEndpointState(StateSynRecv) |
| synOpts := header.TCPSynOptions{ |
| WS: int(h.effectiveRcvWndScale()), |
| TS: rcvSynOpts.TS, |
| TSVal: h.ep.timestamp(), |
| TSEcr: h.ep.recentTimestamp(), |
| |
| // We only send SACKPermitted if the other side indicated it |
| // permits SACK. This is not explicitly defined in the RFC but |
| // this is the behaviour implemented by Linux. |
| SACKPermitted: rcvSynOpts.SACKPermitted, |
| MSS: amss, |
| } |
| if ttl == 0 { |
| ttl = h.ep.route.DefaultTTL() |
| } |
| h.ep.sendSynTCP(h.ep.route, tcpFields{ |
| id: h.ep.ID, |
| ttl: ttl, |
| tos: h.ep.sendTOS, |
| flags: h.flags, |
| seq: h.iss, |
| ack: h.ackNum, |
| rcvWnd: h.rcvWnd, |
| }, synOpts) |
| return nil |
| } |
| |
| // synRcvdState handles a segment received when the TCP 3-way handshake is in |
| // the SYN-RCVD state. |
| func (h *handshake) synRcvdState(s *segment) tcpip.Error { |
| if s.flagIsSet(header.TCPFlagRst) { |
| // RFC 793, page 37, states that in the SYN-RCVD state, a reset |
| // is acceptable if the sequence number is in the window. |
| if s.sequenceNumber.InWindow(h.ackNum, h.rcvWnd) { |
| return &tcpip.ErrConnectionRefused{} |
| } |
| return nil |
| } |
| |
| if !h.checkAck(s) { |
| return nil |
| } |
| |
| // RFC 793, Section 3.9, page 69, states that in the SYN-RCVD state, a |
| // sequence number outside of the window causes an ACK with the proper seq |
| // number and "After sending the acknowledgment, drop the unacceptable |
| // segment and return." |
| if !s.sequenceNumber.InWindow(h.ackNum, h.rcvWnd) { |
| if h.ep.allowOutOfWindowAck() { |
| h.ep.sendRaw(buffer.VectorisedView{}, header.TCPFlagAck, h.iss+1, h.ackNum, h.rcvWnd) |
| } |
| return nil |
| } |
| |
| if s.flagIsSet(header.TCPFlagSyn) && s.sequenceNumber != h.ackNum-1 { |
| // We received two SYN segments with different sequence |
| // numbers, so we reset this and restart the whole |
| // process, except that we don't reset the timer. |
| ack := s.sequenceNumber.Add(s.logicalLen()) |
| seq := seqnum.Value(0) |
| if s.flagIsSet(header.TCPFlagAck) { |
| seq = s.ackNumber |
| } |
| h.ep.sendRaw(buffer.VectorisedView{}, header.TCPFlagRst|header.TCPFlagAck, seq, ack, 0) |
| |
| if !h.active { |
| return &tcpip.ErrInvalidEndpointState{} |
| } |
| |
| h.resetState() |
| synOpts := header.TCPSynOptions{ |
| WS: h.rcvWndScale, |
| TS: h.ep.sendTSOk, |
| TSVal: h.ep.timestamp(), |
| TSEcr: h.ep.recentTimestamp(), |
| SACKPermitted: h.ep.sackPermitted, |
| MSS: h.ep.amss, |
| } |
| h.ep.sendSynTCP(h.ep.route, tcpFields{ |
| id: h.ep.ID, |
| ttl: h.ep.ttl, |
| tos: h.ep.sendTOS, |
| flags: h.flags, |
| seq: h.iss, |
| ack: h.ackNum, |
| rcvWnd: h.rcvWnd, |
| }, synOpts) |
| return nil |
| } |
| |
| // We have previously received (and acknowledged) the peer's SYN. If the |
| // peer acknowledges our SYN, the handshake is completed. |
| if s.flagIsSet(header.TCPFlagAck) { |
| // If deferAccept is not zero and this is a bare ACK and the |
| // timeout is not hit then drop the ACK. |
| if h.deferAccept != 0 && s.data.Size() == 0 && time.Since(h.startTime) < h.deferAccept { |
| h.acked = true |
| h.ep.stack.Stats().DroppedPackets.Increment() |
| return nil |
| } |
| |
| // If the timestamp option is negotiated and the segment does |
| // not carry a timestamp option then the segment must be dropped |
| // as per https://tools.ietf.org/html/rfc7323#section-3.2. |
| if h.ep.sendTSOk && !s.parsedOptions.TS { |
| h.ep.stack.Stats().DroppedPackets.Increment() |
| return nil |
| } |
| |
| // Update timestamp if required. See RFC7323, section-4.3. |
| if h.ep.sendTSOk && s.parsedOptions.TS { |
| h.ep.updateRecentTimestamp(s.parsedOptions.TSVal, h.ackNum, s.sequenceNumber) |
| } |
| h.state = handshakeCompleted |
| |
| h.ep.transitionToStateEstablishedLocked(h) |
| |
| // If the segment has data then requeue it for the receiver |
| // to process it again once main loop is started. |
| if s.data.Size() > 0 { |
| s.incRef() |
| h.ep.enqueueSegment(s) |
| } |
| return nil |
| } |
| |
| return nil |
| } |
| |
| func (h *handshake) handleSegment(s *segment) tcpip.Error { |
| h.sndWnd = s.window |
| if !s.flagIsSet(header.TCPFlagSyn) && h.sndWndScale > 0 { |
| h.sndWnd <<= uint8(h.sndWndScale) |
| } |
| |
| switch h.state { |
| case handshakeSynRcvd: |
| return h.synRcvdState(s) |
| case handshakeSynSent: |
| return h.synSentState(s) |
| } |
| return nil |
| } |
| |
| // processSegments goes through the segment queue and processes up to |
| // maxSegmentsPerWake (if they're available). |
| func (h *handshake) processSegments() tcpip.Error { |
| for i := 0; i < maxSegmentsPerWake; i++ { |
| s := h.ep.segmentQueue.dequeue() |
| if s == nil { |
| return nil |
| } |
| |
| err := h.handleSegment(s) |
| s.decRef() |
| if err != nil { |
| return err |
| } |
| |
| // We stop processing packets once the handshake is completed, |
| // otherwise we may process packets meant to be processed by |
| // the main protocol goroutine. |
| if h.state == handshakeCompleted { |
| break |
| } |
| } |
| |
| // If the queue is not empty, make sure we'll wake up in the next |
| // iteration. |
| if !h.ep.segmentQueue.empty() { |
| h.ep.newSegmentWaker.Assert() |
| } |
| |
| return nil |
| } |
| |
| // start sends the first SYN/SYN-ACK. It does not block, even if link address |
| // resolution is required. |
| func (h *handshake) start() { |
| h.startTime = time.Now() |
| h.ep.amss = calculateAdvertisedMSS(h.ep.userMSS, h.ep.route) |
| var sackEnabled tcpip.TCPSACKEnabled |
| if err := h.ep.stack.TransportProtocolOption(ProtocolNumber, &sackEnabled); err != nil { |
| // If stack returned an error when checking for SACKEnabled |
| // status then just default to switching off SACK negotiation. |
| sackEnabled = false |
| } |
| |
| synOpts := header.TCPSynOptions{ |
| WS: h.rcvWndScale, |
| TS: true, |
| TSVal: h.ep.timestamp(), |
| TSEcr: h.ep.recentTimestamp(), |
| SACKPermitted: bool(sackEnabled), |
| MSS: h.ep.amss, |
| } |
| |
| // start() is also called in a listen context so we want to make sure we only |
| // send the TS/SACK option when we received the TS/SACK in the initial SYN. |
| if h.state == handshakeSynRcvd { |
| synOpts.TS = h.ep.sendTSOk |
| synOpts.SACKPermitted = h.ep.sackPermitted && bool(sackEnabled) |
| if h.sndWndScale < 0 { |
| // Disable window scaling if the peer did not send us |
| // the window scaling option. |
| synOpts.WS = -1 |
| } |
| } |
| |
| h.sendSYNOpts = synOpts |
| h.ep.sendSynTCP(h.ep.route, tcpFields{ |
| id: h.ep.ID, |
| ttl: h.ep.ttl, |
| tos: h.ep.sendTOS, |
| flags: h.flags, |
| seq: h.iss, |
| ack: h.ackNum, |
| rcvWnd: h.rcvWnd, |
| }, synOpts) |
| } |
| |
| // complete completes the TCP 3-way handshake initiated by h.start(). |
| func (h *handshake) complete() tcpip.Error { |
| // Set up the wakers. |
| var s sleep.Sleeper |
| resendWaker := sleep.Waker{} |
| s.AddWaker(&resendWaker, wakerForResend) |
| s.AddWaker(&h.ep.notificationWaker, wakerForNotification) |
| s.AddWaker(&h.ep.newSegmentWaker, wakerForNewSegment) |
| defer s.Done() |
| |
| // Initialize the resend timer. |
| timer, err := newBackoffTimer(time.Second, MaxRTO, resendWaker.Assert) |
| if err != nil { |
| return err |
| } |
| defer timer.stop() |
| for h.state != handshakeCompleted { |
| // Unlock before blocking, and reacquire again afterwards (h.ep.mu is held |
| // throughout handshake processing). |
| h.ep.mu.Unlock() |
| index, _ := s.Fetch(true /* block */) |
| h.ep.mu.Lock() |
| switch index { |
| |
| case wakerForResend: |
| if err := timer.reset(); err != nil { |
| return err |
| } |
| // Resend the SYN/SYN-ACK only if the following conditions hold. |
| // - It's an active handshake (deferAccept does not apply) |
| // - It's a passive handshake and we have not yet got the final-ACK. |
| // - It's a passive handshake and we got an ACK but deferAccept is |
| // enabled and we are now past the deferAccept duration. |
| // The last is required to provide a way for the peer to complete |
| // the connection with another ACK or data (as ACKs are never |
| // retransmitted on their own). |
| if h.active || !h.acked || h.deferAccept != 0 && time.Since(h.startTime) > h.deferAccept { |
| h.ep.sendSynTCP(h.ep.route, tcpFields{ |
| id: h.ep.ID, |
| ttl: h.ep.ttl, |
| tos: h.ep.sendTOS, |
| flags: h.flags, |
| seq: h.iss, |
| ack: h.ackNum, |
| rcvWnd: h.rcvWnd, |
| }, h.sendSYNOpts) |
| } |
| |
| case wakerForNotification: |
| n := h.ep.fetchNotifications() |
| if (n¬ifyClose)|(n¬ifyAbort) != 0 { |
| return &tcpip.ErrAborted{} |
| } |
| if n¬ifyDrain != 0 { |
| for !h.ep.segmentQueue.empty() { |
| s := h.ep.segmentQueue.dequeue() |
| err := h.handleSegment(s) |
| s.decRef() |
| if err != nil { |
| return err |
| } |
| if h.state == handshakeCompleted { |
| return nil |
| } |
| } |
| close(h.ep.drainDone) |
| h.ep.mu.Unlock() |
| <-h.ep.undrain |
| h.ep.mu.Lock() |
| } |
| if n¬ifyError != 0 { |
| return h.ep.lastErrorLocked() |
| } |
| case wakerForNewSegment: |
| if err := h.processSegments(); err != nil { |
| return err |
| } |
| } |
| } |
| |
| return nil |
| } |
| |
// backoffTimer wraps a time.Timer whose timeout doubles on every reset, up to
// a maximum.
type backoffTimer struct {
	// timeout is the duration the timer was most recently armed with (or
	// attempted to be armed with, if that exceeded maxTimeout).
	timeout time.Duration

	// maxTimeout is the largest timeout reset is allowed to arm.
	maxTimeout time.Duration

	// t is the underlying timer created with time.AfterFunc.
	t *time.Timer
}
| |
| func newBackoffTimer(timeout, maxTimeout time.Duration, f func()) (*backoffTimer, tcpip.Error) { |
| if timeout > maxTimeout { |
| return nil, &tcpip.ErrTimeout{} |
| } |
| bt := &backoffTimer{timeout: timeout, maxTimeout: maxTimeout} |
| bt.t = time.AfterFunc(timeout, f) |
| return bt, nil |
| } |
| |
| func (bt *backoffTimer) reset() tcpip.Error { |
| bt.timeout *= 2 |
| if bt.timeout > bt.maxTimeout { |
| return &tcpip.ErrTimeout{} |
| } |
| bt.t.Reset(bt.timeout) |
| return nil |
| } |
| |
| func (bt *backoffTimer) stop() { |
| bt.t.Stop() |
| } |
| |
| func parseSynSegmentOptions(s *segment) header.TCPSynOptions { |
| synOpts := header.ParseSynOptions(s.options, s.flagIsSet(header.TCPFlagAck)) |
| if synOpts.TS { |
| s.parsedOptions.TSVal = synOpts.TSVal |
| s.parsedOptions.TSEcr = synOpts.TSEcr |
| } |
| return synOpts |
| } |
| |
// optionPool recycles the fixed-size byte arrays used as scratch space for
// encoding TCP options. Arrays are obtained with getOptions and handed back
// with putOptions.
var optionPool = sync.Pool{
	New: func() interface{} {
		return &[maxOptionSize]byte{}
	},
}
| |
| func getOptions() []byte { |
| return (*optionPool.Get().(*[maxOptionSize]byte))[:] |
| } |
| |
| func putOptions(options []byte) { |
| // Reslice to full capacity. |
| optionPool.Put(optionsToArray(options)) |
| } |
| |
| func makeSynOptions(opts header.TCPSynOptions) []byte { |
| // Emulate linux option order. This is as follows: |
| // |
| // if md5: NOP NOP MD5SIG 18 md5sig(16) |
| // if mss: MSS 4 mss(2) |
| // if ts and sack_advertise: |
| // SACK 2 TIMESTAMP 2 timestamp(8) |
| // elif ts: NOP NOP TIMESTAMP 10 timestamp(8) |
| // elif sack: NOP NOP SACK 2 |
| // if wscale: NOP WINDOW 3 ws(1) |
| // if sack_blocks: NOP NOP SACK ((2 + (#blocks * 8)) |
| // [for each block] start_seq(4) end_seq(4) |
| // if fastopen_cookie: |
| // if exp: EXP (4 + len(cookie)) FASTOPEN_MAGIC(2) |
| // else: FASTOPEN (2 + len(cookie)) |
| // cookie(variable) [padding to four bytes] |
| // |
| options := getOptions() |
| |
| // Always encode the mss. |
| offset := header.EncodeMSSOption(uint32(opts.MSS), options) |
| |
| // Special ordering is required here. If both TS and SACK are enabled, |
| // then the SACK option precedes TS, with no padding. If they are |
| // enabled individually, then we see padding before the option. |
| if opts.TS && opts.SACKPermitted { |
| offset += header.EncodeSACKPermittedOption(options[offset:]) |
| offset += header.EncodeTSOption(opts.TSVal, opts.TSEcr, options[offset:]) |
| } else if opts.TS { |
| offset += header.EncodeNOP(options[offset:]) |
| offset += header.EncodeNOP(options[offset:]) |
| offset += header.EncodeTSOption(opts.TSVal, opts.TSEcr, options[offset:]) |
| } else if opts.SACKPermitted { |
| offset += header.EncodeNOP(options[offset:]) |
| offset += header.EncodeNOP(options[offset:]) |
| offset += header.EncodeSACKPermittedOption(options[offset:]) |
| } |
| |
| // Initialize the WS option. |
| if opts.WS >= 0 { |
| offset += header.EncodeNOP(options[offset:]) |
| offset += header.EncodeWSOption(opts.WS, options[offset:]) |
| } |
| |
| // Padding to the end; note that this never apply unless we add a |
| // fastopen option, we always expect the offset to remain the same. |
| if delta := header.AddTCPOptionPadding(options, offset); delta != 0 { |
| panic("unexpected option encoding") |
| } |
| |
| return options[:offset] |
| } |
| |
// tcpFields is a struct to carry different parameters required by the
// send*TCP variant functions below.
type tcpFields struct {
	// id supplies the local and remote ports written into the TCP header.
	id stack.TransportEndpointID

	// ttl is the TTL passed to the network layer. Zero means "use the
	// route's default TTL".
	ttl uint8

	// tos is the TOS value passed to the network layer.
	tos uint8

	// flags are the TCP flags of the segment.
	flags header.TCPFlags

	// seq and ack are the sequence and acknowledgement numbers.
	seq seqnum.Value
	ack seqnum.Value

	// rcvWnd is the receive window to advertise. It is clamped to
	// math.MaxUint16 before being encoded.
	rcvWnd seqnum.Size

	// opts holds the already-encoded TCP options.
	opts []byte

	// txHash is copied into the Hash field of outgoing packet buffers.
	txHash uint32
}
| |
| func (e *endpoint) sendSynTCP(r *stack.Route, tf tcpFields, opts header.TCPSynOptions) tcpip.Error { |
| tf.opts = makeSynOptions(opts) |
| // We ignore SYN send errors and let the callers re-attempt send. |
| if err := e.sendTCP(r, tf, buffer.VectorisedView{}, nil); err != nil { |
| e.stats.SendErrors.SynSendToNetworkFailed.Increment() |
| } |
| putOptions(tf.opts) |
| return nil |
| } |
| |
| func (e *endpoint) sendTCP(r *stack.Route, tf tcpFields, data buffer.VectorisedView, gso *stack.GSO) tcpip.Error { |
| tf.txHash = e.txHash |
| if err := sendTCP(r, tf, data, gso, e.owner); err != nil { |
| e.stats.SendErrors.SegmentSendToNetworkFailed.Increment() |
| return err |
| } |
| e.stats.SegmentsSent.Increment() |
| return nil |
| } |
| |
| func buildTCPHdr(r *stack.Route, tf tcpFields, pkt *stack.PacketBuffer, gso *stack.GSO) { |
| optLen := len(tf.opts) |
| tcp := header.TCP(pkt.TransportHeader().Push(header.TCPMinimumSize + optLen)) |
| pkt.TransportProtocolNumber = header.TCPProtocolNumber |
| tcp.Encode(&header.TCPFields{ |
| SrcPort: tf.id.LocalPort, |
| DstPort: tf.id.RemotePort, |
| SeqNum: uint32(tf.seq), |
| AckNum: uint32(tf.ack), |
| DataOffset: uint8(header.TCPMinimumSize + optLen), |
| Flags: tf.flags, |
| WindowSize: uint16(tf.rcvWnd), |
| }) |
| copy(tcp[header.TCPMinimumSize:], tf.opts) |
| |
| xsum := r.PseudoHeaderChecksum(ProtocolNumber, uint16(pkt.Size())) |
| // Only calculate the checksum if offloading isn't supported. |
| if gso != nil && gso.NeedsCsum { |
| // This is called CHECKSUM_PARTIAL in the Linux kernel. We |
| // calculate a checksum of the pseudo-header and save it in the |
| // TCP header, then the kernel calculate a checksum of the |
| // header and data and get the right sum of the TCP packet. |
| tcp.SetChecksum(xsum) |
| } else if r.RequiresTXTransportChecksum() { |
| xsum = header.ChecksumCombine(xsum, pkt.Data().AsRange().Checksum()) |
| tcp.SetChecksum(^tcp.CalculateChecksum(xsum)) |
| } |
| } |
| |
| func sendTCPBatch(r *stack.Route, tf tcpFields, data buffer.VectorisedView, gso *stack.GSO, owner tcpip.PacketOwner) tcpip.Error { |
| // We need to shallow clone the VectorisedView here as ReadToView will |
| // split the VectorisedView and Trim underlying views as it splits. Not |
| // doing the clone here will cause the underlying views of data itself |
| // to be altered. |
| data = data.Clone(nil) |
| |
| optLen := len(tf.opts) |
| if tf.rcvWnd > math.MaxUint16 { |
| tf.rcvWnd = math.MaxUint16 |
| } |
| |
| mss := int(gso.MSS) |
| n := (data.Size() + mss - 1) / mss |
| |
| size := data.Size() |
| hdrSize := header.TCPMinimumSize + int(r.MaxHeaderLength()) + optLen |
| var pkts stack.PacketBufferList |
| for i := 0; i < n; i++ { |
| packetSize := mss |
| if packetSize > size { |
| packetSize = size |
| } |
| size -= packetSize |
| pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{ |
| ReserveHeaderBytes: hdrSize, |
| }) |
| pkt.Hash = tf.txHash |
| pkt.Owner = owner |
| pkt.Data().ReadFromVV(&data, packetSize) |
| buildTCPHdr(r, tf, pkt, gso) |
| tf.seq = tf.seq.Add(seqnum.Size(packetSize)) |
| pkts.PushBack(pkt) |
| } |
| |
| if tf.ttl == 0 { |
| tf.ttl = r.DefaultTTL() |
| } |
| sent, err := r.WritePackets(gso, pkts, stack.NetworkHeaderParams{Protocol: ProtocolNumber, TTL: tf.ttl, TOS: tf.tos}) |
| if err != nil { |
| r.Stats().TCP.SegmentSendErrors.IncrementBy(uint64(n - sent)) |
| } |
| r.Stats().TCP.SegmentsSent.IncrementBy(uint64(sent)) |
| return err |
| } |
| |
| // sendTCP sends a TCP segment with the provided options via the provided |
| // network endpoint and under the provided identity. |
| func sendTCP(r *stack.Route, tf tcpFields, data buffer.VectorisedView, gso *stack.GSO, owner tcpip.PacketOwner) tcpip.Error { |
| optLen := len(tf.opts) |
| if tf.rcvWnd > math.MaxUint16 { |
| tf.rcvWnd = math.MaxUint16 |
| } |
| |
| if r.Loop&stack.PacketLoop == 0 && gso != nil && gso.Type == stack.GSOSW && int(gso.MSS) < data.Size() { |
| return sendTCPBatch(r, tf, data, gso, owner) |
| } |
| |
| pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{ |
| ReserveHeaderBytes: header.TCPMinimumSize + int(r.MaxHeaderLength()) + optLen, |
| Data: data, |
| }) |
| pkt.Hash = tf.txHash |
| pkt.Owner = owner |
| buildTCPHdr(r, tf, pkt, gso) |
| |
| if tf.ttl == 0 { |
| tf.ttl = r.DefaultTTL() |
| } |
| if err := r.WritePacket(gso, stack.NetworkHeaderParams{Protocol: ProtocolNumber, TTL: tf.ttl, TOS: tf.tos}, pkt); err != nil { |
| r.Stats().TCP.SegmentSendErrors.Increment() |
| return err |
| } |
| r.Stats().TCP.SegmentsSent.Increment() |
| if (tf.flags & header.TCPFlagRst) != 0 { |
| r.Stats().TCP.ResetsSent.Increment() |
| } |
| return nil |
| } |
| |
| // makeOptions makes an options slice. |
| func (e *endpoint) makeOptions(sackBlocks []header.SACKBlock) []byte { |
| options := getOptions() |
| offset := 0 |
| |
| // N.B. the ordering here matches the ordering used by Linux internally |
| // and described in the raw makeOptions function. We don't include |
| // unnecessary cases here (post connection.) |
| if e.sendTSOk { |
| // Embed the timestamp if timestamp has been enabled. |
| // |
| // We only use the lower 32 bits of the unix time in |
| // milliseconds. This is similar to what Linux does where it |
| // uses the lower 32 bits of the jiffies value in the tsVal |
| // field of the timestamp option. |
| // |
| // Further, RFC7323 section-5.4 recommends millisecond |
| // resolution as the lowest recommended resolution for the |
| // timestamp clock. |
| // |
| // Ref: https://tools.ietf.org/html/rfc7323#section-5.4. |
| offset += header.EncodeNOP(options[offset:]) |
| offset += header.EncodeNOP(options[offset:]) |
| offset += header.EncodeTSOption(e.timestamp(), e.recentTimestamp(), options[offset:]) |
| } |
| if e.sackPermitted && len(sackBlocks) > 0 { |
| offset += header.EncodeNOP(options[offset:]) |
| offset += header.EncodeNOP(options[offset:]) |
| offset += header.EncodeSACKBlocks(sackBlocks, options[offset:]) |
| } |
| |
| // We expect the above to produce an aligned offset. |
| if delta := header.AddTCPOptionPadding(options, offset); delta != 0 { |
| panic("unexpected option encoding") |
| } |
| |
| return options[:offset] |
| } |
| |
| // sendRaw sends a TCP segment to the endpoint's peer. |
| func (e *endpoint) sendRaw(data buffer.VectorisedView, flags header.TCPFlags, seq, ack seqnum.Value, rcvWnd seqnum.Size) tcpip.Error { |
| var sackBlocks []header.SACKBlock |
| if e.EndpointState() == StateEstablished && e.rcv.pendingRcvdSegments.Len() > 0 && (flags&header.TCPFlagAck != 0) { |
| sackBlocks = e.sack.Blocks[:e.sack.NumBlocks] |
| } |
| options := e.makeOptions(sackBlocks) |
| err := e.sendTCP(e.route, tcpFields{ |
| id: e.ID, |
| ttl: e.ttl, |
| tos: e.sendTOS, |
| flags: flags, |
| seq: seq, |
| ack: ack, |
| rcvWnd: rcvWnd, |
| opts: options, |
| }, data, e.gso) |
| putOptions(options) |
| return err |
| } |
| |
| func (e *endpoint) handleWrite() { |
| e.sndBufMu.Lock() |
| next := e.drainSendQueueLocked() |
| e.sndBufMu.Unlock() |
| |
| e.sendData(next) |
| } |
| |
| // Move packets from send queue to send list. |
| // |
| // Precondition: e.sndBufMu must be locked. |
| func (e *endpoint) drainSendQueueLocked() *segment { |
| first := e.sndQueue.Front() |
| if first != nil { |
| e.snd.writeList.PushBackList(&e.sndQueue) |
| e.sndBufInQueue = 0 |
| } |
| return first |
| } |
| |
| // Precondition: e.mu must be locked. |
| func (e *endpoint) sendData(next *segment) { |
| // Initialize the next segment to write if it's currently nil. |
| if e.snd.writeNext == nil { |
| e.snd.writeNext = next |
| } |
| |
| // Push out any new packets. |
| e.snd.sendData() |
| } |
| |
| func (e *endpoint) handleClose() { |
| if !e.EndpointState().connected() { |
| return |
| } |
| // Drain the send queue. |
| e.handleWrite() |
| |
| // Mark send side as closed. |
| e.snd.closed = true |
| } |
| |
| // resetConnectionLocked puts the endpoint in an error state with the given |
| // error code and sends a RST if and only if the error is not ErrConnectionReset |
| // indicating that the connection is being reset due to receiving a RST. This |
| // method must only be called from the protocol goroutine. |
| func (e *endpoint) resetConnectionLocked(err tcpip.Error) { |
| // Only send a reset if the connection is being aborted for a reason |
| // other than receiving a reset. |
| e.setEndpointState(StateError) |
| e.hardError = err |
| switch err.(type) { |
| case *tcpip.ErrConnectionReset, *tcpip.ErrTimeout: |
| default: |
| // The exact sequence number to be used for the RST is the same as the |
| // one used by Linux. We need to handle the case of window being shrunk |
| // which can cause sndNxt to be outside the acceptable window on the |
| // receiver. |
| // |
| // See: https://www.snellman.net/blog/archive/2016-02-01-tcp-rst/ for more |
| // information. |
| sndWndEnd := e.snd.sndUna.Add(e.snd.sndWnd) |
| resetSeqNum := sndWndEnd |
| if !sndWndEnd.LessThan(e.snd.sndNxt) || e.snd.sndNxt.Size(sndWndEnd) < (1<<e.snd.sndWndScale) { |
| resetSeqNum = e.snd.sndNxt |
| } |
| e.sendRaw(buffer.VectorisedView{}, header.TCPFlagAck|header.TCPFlagRst, resetSeqNum, e.rcv.rcvNxt, 0) |
| } |
| } |
| |
| // completeWorkerLocked is called by the worker goroutine when it's about to |
| // exit. |
| func (e *endpoint) completeWorkerLocked() { |
| // Worker is terminating(either due to moving to |
| // CLOSED or ERROR state, ensure we release all |
| // registrations port reservations even if the socket |
| // itself is not yet closed by the application. |
| e.workerRunning = false |
| if e.workerCleanup { |
| e.cleanupLocked() |
| } |
| } |
| |
| // transitionToStateEstablisedLocked transitions a given endpoint |
| // to an established state using the handshake parameters provided. |
| // It also initializes sender/receiver. |
| func (e *endpoint) transitionToStateEstablishedLocked(h *handshake) { |
| // Transfer handshake state to TCP connection. We disable |
| // receive window scaling if the peer doesn't support it |
| // (indicated by a negative send window scale). |
| e.snd = newSender(e, h.iss, h.ackNum-1, h.sndWnd, h.mss, h.sndWndScale) |
| |
| e.rcvListMu.Lock() |
| e.rcv = newReceiver(e, h.ackNum-1, h.rcvWnd, h.effectiveRcvWndScale()) |
| // Bootstrap the auto tuning algorithm. Starting at zero will |
| // result in a really large receive window after the first auto |
| // tuning adjustment. |
| e.rcvAutoParams.prevCopied = int(h.rcvWnd) |
| e.rcvListMu.Unlock() |
| |
| e.setEndpointState(StateEstablished) |
| } |
| |
| // transitionToStateCloseLocked ensures that the endpoint is |
| // cleaned up from the transport demuxer, "before" moving to |
| // StateClose. This will ensure that no packet will be |
| // delivered to this endpoint from the demuxer when the endpoint |
| // is transitioned to StateClose. |
| func (e *endpoint) transitionToStateCloseLocked() { |
| s := e.EndpointState() |
| if s == StateClose { |
| return |
| } |
| |
| if s.connected() { |
| e.stack.Stats().TCP.CurrentConnected.Decrement() |
| e.stack.Stats().TCP.EstablishedClosed.Increment() |
| } |
| |
| // Mark the endpoint as fully closed for reads/writes. |
| e.cleanupLocked() |
| e.setEndpointState(StateClose) |
| } |
| |
| // tryDeliverSegmentFromClosedEndpoint attempts to deliver the parsed |
| // segment to any other endpoint other than the current one. This is called |
| // only when the endpoint is in StateClose and we want to deliver the segment |
| // to any other listening endpoint. We reply with RST if we cannot find one. |
| func (e *endpoint) tryDeliverSegmentFromClosedEndpoint(s *segment) { |
| ep := e.stack.FindTransportEndpoint(e.NetProto, e.TransProto, e.ID, s.nicID) |
| if ep == nil && e.NetProto == header.IPv6ProtocolNumber && e.TransportEndpointInfo.ID.LocalAddress.To4() != "" { |
| // Dual-stack socket, try IPv4. |
| ep = e.stack.FindTransportEndpoint(header.IPv4ProtocolNumber, e.TransProto, e.ID, s.nicID) |
| } |
| if ep == nil { |
| replyWithReset(e.stack, s, stack.DefaultTOS, 0 /* ttl */) |
| s.decRef() |
| return |
| } |
| |
| if e == ep { |
| panic("current endpoint not removed from demuxer, enqueing segments to itself") |
| } |
| |
| if ep := ep.(*endpoint); ep.enqueueSegment(s) { |
| ep.newSegmentWaker.Assert() |
| } |
| } |
| |
| // Drain segment queue from the endpoint and try to re-match the segment to a |
| // different endpoint. This is used when the current endpoint is transitioned to |
| // StateClose and has been unregistered from the transport demuxer. |
| func (e *endpoint) drainClosingSegmentQueue() { |
| for { |
| s := e.segmentQueue.dequeue() |
| if s == nil { |
| break |
| } |
| |
| e.tryDeliverSegmentFromClosedEndpoint(s) |
| } |
| } |
| |
| func (e *endpoint) handleReset(s *segment) (ok bool, err tcpip.Error) { |
| if e.rcv.acceptable(s.sequenceNumber, 0) { |
| // RFC 793, page 37 states that "in all states |
| // except SYN-SENT, all reset (RST) segments are |
| // validated by checking their SEQ-fields." So |
| // we only process it if it's acceptable. |
| switch e.EndpointState() { |
| // In case of a RST in CLOSE-WAIT linux moves |
| // the socket to closed state with an error set |
| // to indicate EPIPE. |
| // |
| // Technically this seems to be at odds w/ RFC. |
| // As per https://tools.ietf.org/html/rfc793#section-2.7 |
| // page 69 the behavior for a segment arriving |
| // w/ RST bit set in CLOSE-WAIT is inlined below. |
| // |
| // ESTABLISHED |
| // FIN-WAIT-1 |
| // FIN-WAIT-2 |
| // CLOSE-WAIT |
| |
| // If the RST bit is set then, any outstanding RECEIVEs and |
| // SEND should receive "reset" responses. All segment queues |
| // should be flushed. Users should also receive an unsolicited |
| // general "connection reset" signal. Enter the CLOSED state, |
| // delete the TCB, and return. |
| case StateCloseWait: |
| e.transitionToStateCloseLocked() |
| e.hardError = &tcpip.ErrAborted{} |
| e.notifyProtocolGoroutine(notifyTickleWorker) |
| return false, nil |
| default: |
| // RFC 793, page 37 states that "in all states |
| // except SYN-SENT, all reset (RST) segments are |
| // validated by checking their SEQ-fields." So |
| // we only process it if it's acceptable. |
| |
| // Notify protocol goroutine. This is required when |
| // handleSegment is invoked from the processor goroutine |
| // rather than the worker goroutine. |
| e.notifyProtocolGoroutine(notifyResetByPeer) |
| return false, &tcpip.ErrConnectionReset{} |
| } |
| } |
| return true, nil |
| } |
| |
| // handleSegments processes all inbound segments. |
| func (e *endpoint) handleSegments(fastPath bool) tcpip.Error { |
| checkRequeue := true |
| for i := 0; i < maxSegmentsPerWake; i++ { |
| if e.EndpointState().closed() { |
| return nil |
| } |
| s := e.segmentQueue.dequeue() |
| if s == nil { |
| checkRequeue = false |
| break |
| } |
| |
| cont, err := e.handleSegment(s) |
| s.decRef() |
| if err != nil { |
| return err |
| } |
| if !cont { |
| return nil |
| } |
| } |
| |
| // When fastPath is true we don't want to wake up the worker |
| // goroutine. If the endpoint has more segments to process the |
| // dispatcher will call handleSegments again anyway. |
| if !fastPath && checkRequeue && !e.segmentQueue.empty() { |
| e.newSegmentWaker.Assert() |
| } |
| |
| // Send an ACK for all processed packets if needed. |
| if e.rcv.rcvNxt != e.snd.maxSentAck { |
| e.snd.sendAck() |
| } |
| |
| e.resetKeepaliveTimer(true /* receivedData */) |
| |
| return nil |
| } |
| |
| func (e *endpoint) probeSegment() { |
| if e.probe != nil { |
| e.probe(e.completeState()) |
| } |
| } |
| |
| // handleSegment handles a given segment and notifies the worker goroutine if |
| // if the connection should be terminated. |
| func (e *endpoint) handleSegment(s *segment) (cont bool, err tcpip.Error) { |
| // Invoke the tcp probe if installed. The tcp probe function will update |
| // the TCPEndpointState after the segment is processed. |
| defer e.probeSegment() |
| |
| if s.flagIsSet(header.TCPFlagRst) { |
| if ok, err := e.handleReset(s); !ok { |
| return false, err |
| } |
| } else if s.flagIsSet(header.TCPFlagSyn) { |
| // See: https://tools.ietf.org/html/rfc5961#section-4.1 |
| // 1) If the SYN bit is set, irrespective of the sequence number, TCP |
| // MUST send an ACK (also referred to as challenge ACK) to the remote |
| // peer: |
| // |
| // <SEQ=SND.NXT><ACK=RCV.NXT><CTL=ACK> |
| // |
| // After sending the acknowledgment, TCP MUST drop the unacceptable |
| // segment and stop processing further. |
| // |
| // By sending an ACK, the remote peer is challenged to confirm the loss |
| // of the previous connection and the request to start a new connection. |
| // A legitimate peer, after restart, would not have a TCB in the |
| // synchronized state. Thus, when the ACK arrives, the peer should send |
| // a RST segment back with the sequence number derived from the ACK |
| // field that caused the RST. |
| |
| // This RST will confirm that the remote peer has indeed closed the |
| // previous connection. Upon receipt of a valid RST, the local TCP |
| // endpoint MUST terminate its connection. The local TCP endpoint |
| // should then rely on SYN retransmission from the remote end to |
| // re-establish the connection. |
| e.snd.maybeSendOutOfWindowAck(s) |
| } else if s.flagIsSet(header.TCPFlagAck) { |
| // Patch the window size in the segment according to the |
| // send window scale. |
| s.window <<= e.snd.sndWndScale |
| |
| // RFC 793, page 41 states that "once in the ESTABLISHED |
| // state all segments must carry current acknowledgment |
| // information." |
| drop, err := e.rcv.handleRcvdSegment(s) |
| if err != nil { |
| return false, err |
| } |
| if drop { |
| return true, nil |
| } |
| |
| // Now check if the received segment has caused us to transition |
| // to a CLOSED state, if yes then terminate processing and do |
| // not invoke the sender. |
| state := e.state |
| if state == StateClose { |
| // When we get into StateClose while processing from the queue, |
| // return immediately and let the protocolMainloop handle it. |
| // |
| // We can reach StateClose only while processing a previous segment |
| // or a notification from the protocolMainLoop (caller goroutine). |
| // This means that with this return, the segment dequeue below can |
| // never occur on a closed endpoint. |
| return false, nil |
| } |
| |
| e.snd.handleRcvdSegment(s) |
| } |
| |
| return true, nil |
| } |
| |
| // keepaliveTimerExpired is called when the keepaliveTimer fires. We send TCP |
| // keepalive packets periodically when the connection is idle. If we don't hear |
| // from the other side after a number of tries, we terminate the connection. |
| func (e *endpoint) keepaliveTimerExpired() tcpip.Error { |
| userTimeout := e.userTimeout |
| |
| e.keepalive.Lock() |
| if !e.SocketOptions().GetKeepAlive() || !e.keepalive.timer.checkExpiration() { |
| e.keepalive.Unlock() |
| return nil |
| } |
| |
| // If a userTimeout is set then abort the connection if it is |
| // exceeded. |
| if userTimeout != 0 && time.Since(e.rcv.lastRcvdAckTime) >= userTimeout && e.keepalive.unacked > 0 { |
| e.keepalive.Unlock() |
| e.stack.Stats().TCP.EstablishedTimedout.Increment() |
| return &tcpip.ErrTimeout{} |
| } |
| |
| if e.keepalive.unacked >= e.keepalive.count { |
| e.keepalive.Unlock() |
| e.stack.Stats().TCP.EstablishedTimedout.Increment() |
| return &tcpip.ErrTimeout{} |
| } |
| |
| // RFC1122 4.2.3.6: TCP keepalive is a dataless ACK with |
| // seg.seq = snd.nxt-1. |
| e.keepalive.unacked++ |
| e.keepalive.Unlock() |
| e.snd.sendSegmentFromView(buffer.VectorisedView{}, header.TCPFlagAck, e.snd.sndNxt-1) |
| e.resetKeepaliveTimer(false) |
| return nil |
| } |
| |
| // resetKeepaliveTimer restarts or stops the keepalive timer, depending on |
| // whether it is enabled for this endpoint. |
| func (e *endpoint) resetKeepaliveTimer(receivedData bool) { |
| e.keepalive.Lock() |
| if receivedData { |
| e.keepalive.unacked = 0 |
| } |
| // Start the keepalive timer IFF it's enabled and there is no pending |
| // data to send. |
| if !e.SocketOptions().GetKeepAlive() || e.snd == nil || e.snd.sndUna != e.snd.sndNxt { |
| e.keepalive.timer.disable() |
| e.keepalive.Unlock() |
| return |
| } |
| if e.keepalive.unacked > 0 { |
| e.keepalive.timer.enable(e.keepalive.interval) |
| } else { |
| e.keepalive.timer.enable(e.keepalive.idle) |
| } |
| e.keepalive.Unlock() |
| } |
| |
| // disableKeepaliveTimer stops the keepalive timer. |
| func (e *endpoint) disableKeepaliveTimer() { |
| e.keepalive.Lock() |
| e.keepalive.timer.disable() |
| e.keepalive.Unlock() |
| } |
| |
| // protocolMainLoop is the main loop of the TCP protocol. It runs in its own |
| // goroutine and is responsible for sending segments and handling received |
| // segments. |
| func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{}) tcpip.Error { |
| e.mu.Lock() |
| var closeTimer *time.Timer |
| var closeWaker sleep.Waker |
| |
| epilogue := func() { |
| // e.mu is expected to be hold upon entering this section. |
| if e.snd != nil { |
| e.snd.resendTimer.cleanup() |
| e.snd.probeTimer.cleanup() |
| e.snd.reorderTimer.cleanup() |
| } |
| |
| if closeTimer != nil { |
| closeTimer.Stop() |
| } |
| |
| e.completeWorkerLocked() |
| |
| if e.drainDone != nil { |
| close(e.drainDone) |
| } |
| |
| e.mu.Unlock() |
| |
| e.drainClosingSegmentQueue() |
| |
| // When the protocol loop exits we should wake up our waiters. |
| e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) |
| } |
| |
| if handshake { |
| if err := e.h.complete(); err != nil { |
| e.lastErrorMu.Lock() |
| e.lastError = err |
| e.lastErrorMu.Unlock() |
| |
| e.setEndpointState(StateError) |
| e.hardError = err |
| |
| e.workerCleanup = true |
| // Lock released below. |
| epilogue() |
| return err |
| } |
| } |
| |
| // Reaching this point means that we successfully completed the 3-way |
| // handshake with our peer. |
| // |
| // Completing the 3-way handshake is an indication that the route is valid |
| // and the remote is reachable as the only way we can complete a handshake |
| // is if our SYN reached the remote and their ACK reached us. |
| e.route.ConfirmReachable() |
| |
| drained := e.drainDone != nil |
| if drained { |
| close(e.drainDone) |
| e.mu.Unlock() |
| <-e.undrain |
| e.mu.Lock() |
| } |
| |
| // Set up the functions that will be called when the main protocol loop |
| // wakes up. |
| funcs := []struct { |
| w *sleep.Waker |
| f func() tcpip.Error |
| }{ |
| { |
| w: &e.sndWaker, |
| f: func() tcpip.Error { |
| e.handleWrite() |
| return nil |
| }, |
| }, |
| { |
| w: &e.sndCloseWaker, |
| f: func() tcpip.Error { |
| e.handleClose() |
| return nil |
| }, |
| }, |
| { |
| w: &closeWaker, |
| f: func() tcpip.Error { |
| // This means the socket is being closed due |
| // to the TCP-FIN-WAIT2 timeout was hit. Just |
| // mark the socket as closed. |
| e.transitionToStateCloseLocked() |
| e.workerCleanup = true |
| return nil |
| }, |
| }, |
| { |
| w: &e.snd.resendWaker, |
| f: func() tcpip.Error { |
| if !e.snd.retransmitTimerExpired() { |
| e.stack.Stats().TCP.EstablishedTimedout.Increment() |
| return &tcpip.ErrTimeout{} |
| } |
| return nil |
| }, |
| }, |
| { |
| w: &e.snd.probeWaker, |
| f: e.snd.probeTimerExpired, |
| }, |
| { |
| w: &e.newSegmentWaker, |
| f: func() tcpip.Error { |
| return e.handleSegments(false /* fastPath */) |
| }, |
| }, |
| { |
| w: &e.keepalive.waker, |
| f: e.keepaliveTimerExpired, |
| }, |
| { |
| w: &e.notificationWaker, |
| f: func() tcpip.Error { |
| n := e.fetchNotifications() |
| if n¬ifyNonZeroReceiveWindow != 0 { |
| e.rcv.nonZeroWindow() |
| } |
| |
| if n¬ifyMTUChanged != 0 { |
| e.sndBufMu.Lock() |
| count := e.packetTooBigCount |
| e.packetTooBigCount = 0 |
| mtu := e.sndMTU |
| e.sndBufMu.Unlock() |
| |
| e.snd.updateMaxPayloadSize(mtu, count) |
| } |
| |
| if n¬ifyReset != 0 || n¬ifyAbort != 0 { |
| return &tcpip.ErrConnectionAborted{} |
| } |
| |
| if n¬ifyResetByPeer != 0 { |
| return &tcpip.ErrConnectionReset{} |
| } |
| |
| if n¬ifyClose != 0 && closeTimer == nil { |
| if e.EndpointState() == StateFinWait2 && e.closed { |
| // The socket has been closed and we are in FIN_WAIT2 |
| // so start the FIN_WAIT2 timer. |
| closeTimer = time.AfterFunc(e.tcpLingerTimeout, closeWaker.Assert) |
| } |
| } |
| |
| if n¬ifyKeepaliveChanged != 0 { |
| // The timer could fire in background |
| // when the endpoint is drained. That's |
| // OK. See above. |
| e.resetKeepaliveTimer(true) |
| } |
| |
| if n¬ifyDrain != 0 { |
| for !e.segmentQueue.empty() { |
| if err := e.handleSegments(false /* fastPath */); err != nil { |
| return err |
| } |
| } |
| if !e.EndpointState().closed() { |
| // Only block the worker if the endpoint |
| // is not in closed state or error state. |
| close(e.drainDone) |
| e.mu.Unlock() |
| <-e.undrain |
| e.mu.Lock() |
| } |
| } |
| |
| if n¬ifyTickleWorker != 0 { |
| // Just a tickle notification. No need to do |
| // anything. |
| return nil |
| } |
| |
| return nil |
| }, |
| }, |
| { |
| w: &e.snd.reorderWaker, |
| f: e.snd.rc.reorderTimerExpired, |
| }, |
| } |
| |
| // Initialize the sleeper based on the wakers in funcs. |
| var s sleep.Sleeper |
| for i := range funcs { |
| s.AddWaker(funcs[i].w, i) |
| } |
| |
| // Notify the caller that the waker initialization is complete and the |
| // endpoint is ready. |
| if wakerInitDone != nil { |
| close(wakerInitDone) |
| } |
| |
| // Tell waiters that the endpoint is connected and writable. |
| e.waiterQueue.Notify(waiter.EventOut) |
| |
| // The following assertions and notifications are needed for restored |
| // endpoints. Fresh newly created endpoints have empty states and should |
| // not invoke any. |
| if !e.segmentQueue.empty() { |
| e.newSegmentWaker.Assert() |
| } |
| |
| e.rcvListMu.Lock() |
| if !e.rcvList.Empty() { |
| e.waiterQueue.Notify(waiter.EventIn) |
| } |
| e.rcvListMu.Unlock() |
| |
| if e.workerCleanup { |
| e.notifyProtocolGoroutine(notifyClose) |
| } |
| |
| // Main loop. Handle segments until both send and receive ends of the |
| // connection have completed. |
| cleanupOnError := func(err tcpip.Error) { |
| e.stack.Stats().TCP.CurrentConnected.Decrement() |
| e.workerCleanup = true |
| if err != nil { |
| e.resetConnectionLocked(err) |
| } |
| // Lock released below. |
| epilogue() |
| } |
| |
| loop: |
| for { |
| switch e.EndpointState() { |
| case StateTimeWait, StateClose, StateError: |
| break loop |
| } |
| |
| e.mu.Unlock() |
| v, _ := s.Fetch(true /* block */) |
| e.mu.Lock() |
| |
| // We need to double check here because the notification may be |
| // stale by the time we got around to processing it. |
| switch e.EndpointState() { |
| case StateError: |
| // If the endpoint has already transitioned to an ERROR |
| // state just pass nil here as any reset that may need |
| // to be sent etc should already have been done and we |
| // just want to terminate the loop and cleanup the |
| // endpoint. |
| cleanupOnError(nil) |
| return nil |
| case StateTimeWait: |
| fallthrough |
| case StateClose: |
| break loop |
| default: |
| if err := funcs[v].f(); err != nil { |
| cleanupOnError(err) |
| return nil |
| } |
| } |
| } |
| |
| var reuseTW func() |
| if e.EndpointState() == StateTimeWait { |
| // Disable close timer as we now entering real TIME_WAIT. |
| if closeTimer != nil { |
| closeTimer.Stop() |
| } |
| // Mark the current sleeper done so as to free all associated |
| // wakers. |
| s.Done() |
| // Wake up any waiters before we enter TIME_WAIT. |
| e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) |
| e.workerCleanup = true |
| reuseTW = e.doTimeWait() |
| } |
| |
| // Handle any StateError transition from StateTimeWait. |
| if e.EndpointState() == StateError { |
| cleanupOnError(nil) |
| return nil |
| } |
| |
| e.transitionToStateCloseLocked() |
| |
| // Lock released below. |
| epilogue() |
| |
| // A new SYN was received during TIME_WAIT and we need to abort |
| // the timewait and redirect the segment to the listener queue |
| if reuseTW != nil { |
| reuseTW() |
| } |
| |
| return nil |
| } |
| |
| // handleTimeWaitSegments processes segments received during TIME_WAIT |
| // state. |
| func (e *endpoint) handleTimeWaitSegments() (extendTimeWait bool, reuseTW func()) { |
| checkRequeue := true |
| for i := 0; i < maxSegmentsPerWake; i++ { |
| s := e.segmentQueue.dequeue() |
| if s == nil { |
| checkRequeue = false |
| break |
| } |
| extTW, newSyn := e.rcv.handleTimeWaitSegment(s) |
| if newSyn { |
| info := e.TransportEndpointInfo |
| newID := info.ID |
| newID.RemoteAddress = "" |
| newID.RemotePort = 0 |
| netProtos := []tcpip.NetworkProtocolNumber{info.NetProto} |
| // If the local address is an IPv4 address then also |
| // look for IPv6 dual stack endpoints that might be |
| // listening on the local address. |
| if newID.LocalAddress.To4() != "" { |
| netProtos = []tcpip.NetworkProtocolNumber{header.IPv4ProtocolNumber, header.IPv6ProtocolNumber} |
| } |
| for _, netProto := range netProtos { |
| if listenEP := e.stack.FindTransportEndpoint(netProto, info.TransProto, newID, s.nicID); listenEP != nil { |
| tcpEP := listenEP.(*endpoint) |
| if EndpointState(tcpEP.State()) == StateListen { |
| reuseTW = func() { |
| if !tcpEP.enqueueSegment(s) { |
| s.decRef() |
| return |
| } |
| tcpEP.newSegmentWaker.Assert() |
| } |
| // We explicitly do not decRef |
| // the segment as it's still |
| // valid and being reflected to |
| // a listening endpoint. |
| return false, reuseTW |
| } |
| } |
| } |
| } |
| if extTW { |
| extendTimeWait = true |
| } |
| s.decRef() |
| } |
| if checkRequeue && !e.segmentQueue.empty() { |
| e.newSegmentWaker.Assert() |
| } |
| return extendTimeWait, nil |
| } |
| |
| // doTimeWait is responsible for handling the TCP behaviour once a socket |
| // enters the TIME_WAIT state. Optionally it can return a closure that |
| // should be executed after releasing the endpoint registrations. This is |
| // done in cases where a new SYN is received during TIME_WAIT that carries |
| // a sequence number larger than one see on the connection. |
| func (e *endpoint) doTimeWait() (twReuse func()) { |
| // Trigger a 2 * MSL time wait state. During this period |
| // we will drop all incoming segments. |
| // NOTE: On Linux this is not configurable and is fixed at 60 seconds. |
| timeWaitDuration := DefaultTCPTimeWaitTimeout |
| |
| // Get the stack wide configuration. |
| var tcpTW tcpip.TCPTimeWaitTimeoutOption |
| if err := e.stack.TransportProtocolOption(ProtocolNumber, &tcpTW); err == nil { |
| timeWaitDuration = time.Duration(tcpTW) |
| } |
| |
| const newSegment = 1 |
| const notification = 2 |
| const timeWaitDone = 3 |
| |
| var s sleep.Sleeper |
| defer s.Done() |
| s.AddWaker(&e.newSegmentWaker, newSegment) |
| s.AddWaker(&e.notificationWaker, notification) |
| |
| var timeWaitWaker sleep.Waker |
| s.AddWaker(&timeWaitWaker, timeWaitDone) |
| timeWaitTimer := time.AfterFunc(timeWaitDuration, timeWaitWaker.Assert) |
| defer timeWaitTimer.Stop() |
| |
| for { |
| e.mu.Unlock() |
| v, _ := s.Fetch(true /* block */) |
| e.mu.Lock() |
| switch v { |
| case newSegment: |
| extendTimeWait, reuseTW := e.handleTimeWaitSegments() |
| if reuseTW != nil { |
| return reuseTW |
| } |
| if extendTimeWait { |
| timeWaitTimer.Reset(timeWaitDuration) |
| } |
| case notification: |
| n := e.fetchNotifications() |
| if n¬ifyAbort != 0 { |
| return nil |
| } |
| if n¬ifyDrain != 0 { |
| for !e.segmentQueue.empty() { |
| // Ignore extending TIME_WAIT during a |
| // save. For sockets in TIME_WAIT we just |
| // terminate the TIME_WAIT early. |
| e.handleTimeWaitSegments() |
| } |
| close(e.drainDone) |
| e.mu.Unlock() |
| <-e.undrain |
| e.mu.Lock() |
| return nil |
| } |
| case timeWaitDone: |
| return nil |
| } |
| } |
| } |