// Copyright 2018 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tcp

import (
"encoding/binary"
"gvisor.dev/gvisor/pkg/rand"
"gvisor.dev/gvisor/pkg/sleep"
"gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/tcpip/hash/jenkins"
"gvisor.dev/gvisor/pkg/tcpip/header"
"gvisor.dev/gvisor/pkg/tcpip/stack"
)

// epQueue is a queue of endpoints.
type epQueue struct {
mu sync.Mutex
list endpointList
}

// enqueue adds e to the queue if it is not already on the queue.
func (q *epQueue) enqueue(e *endpoint) {
q.mu.Lock()
if e.pendingProcessing {
q.mu.Unlock()
return
}
q.list.PushBack(e)
e.pendingProcessing = true
q.mu.Unlock()
}

// dequeue removes and returns the first element from the queue if available;
// otherwise it returns nil.
func (q *epQueue) dequeue() *endpoint {
q.mu.Lock()
if e := q.list.Front(); e != nil {
q.list.Remove(e)
e.pendingProcessing = false
q.mu.Unlock()
return e
}
q.mu.Unlock()
return nil
}

// empty returns true if the queue is empty, false otherwise.
func (q *epQueue) empty() bool {
q.mu.Lock()
v := q.list.Empty()
q.mu.Unlock()
return v
}

// processor is responsible for processing packets queued to a TCP endpoint.
type processor struct {
epQ epQueue
sleeper sleep.Sleeper
newEndpointWaker sleep.Waker
closeWaker sleep.Waker
}
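
// close asserts the closeWaker, signalling the processor goroutine to stop
// processing and exit.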
func (p *processor) close() {
p.closeWaker.Assert()
}

// queueEndpoint queues an endpoint for processing by the processor goroutine.
func (p *processor) queueEndpoint(ep *endpoint) {
	p.epQ.enqueue(ep)
p.newEndpointWaker.Assert()
}
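
// Waker IDs for the processor's sleeper, used to distinguish why the
// processor goroutine was woken.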
const (
newEndpointWaker = 1
closeWaker = 2
)
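
// start is the processor goroutine's main loop. It sleeps until woken, then
// drains the endpoint queue, handling any queued segments for each endpoint,
// until the close waker is asserted.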
func (p *processor) start(wg *sync.WaitGroup) {
defer wg.Done()
defer p.sleeper.Done()
for {
if id, _ := p.sleeper.Fetch(true); id == closeWaker {
break
}
for {
ep := p.epQ.dequeue()
if ep == nil {
break
}
if ep.segmentQueue.empty() {
continue
}
			// If the socket has transitioned out of the connected state then just
			// let the worker goroutine handle the packet.
			//
			// NOTE: We read the state outside of the e.mu lock, which means that
			// by the time we get to handleSegments the endpoint may no longer be
			// in ESTABLISHED. But this should be fine as all normal shutdown
			// states are handled by handleSegments, and if the endpoint moves to
			// a CLOSED/ERROR state then handleSegments is a no-op.
if ep.EndpointState() == StateEstablished && ep.mu.TryLock() {
// If the endpoint is in a connected state then we do direct delivery
// to ensure low latency and avoid scheduler interactions.
switch err := ep.handleSegments(true /* fastPath */); {
case err != nil:
// Send any active resets if required.
ep.resetConnectionLocked(err)
fallthrough
case ep.EndpointState() == StateClose:
ep.notifyProtocolGoroutine(notifyTickleWorker)
case !ep.segmentQueue.empty():
p.epQ.enqueue(ep)
}
ep.mu.Unlock()
} else {
ep.newSegmentWaker.Assert()
}
}
}
}

// dispatcher manages a pool of TCP endpoint processors which are responsible
// for the processing of inbound segments. This fixed pool of processor
// goroutines does full TCP processing. The processor is selected based on the
// hash of the endpoint ID to ensure that delivery for the same endpoint
// happens in-order.
type dispatcher struct {
processors []processor
seed uint32
wg sync.WaitGroup
}
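
// init initializes the dispatcher with nProcessors processors, first signalling
// any previously started processors to stop and waiting for them to exit.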
func (d *dispatcher) init(nProcessors int) {
d.close()
d.wait()
d.processors = make([]processor, nProcessors)
d.seed = generateRandUint32()
for i := range d.processors {
p := &d.processors[i]
p.sleeper.AddWaker(&p.newEndpointWaker, newEndpointWaker)
p.sleeper.AddWaker(&p.closeWaker, closeWaker)
d.wg.Add(1)
// NB: sleeper-waker registration must happen synchronously to avoid races
// with `close`. It's possible to pull all this logic into `start`, but
// that results in a heap-allocated function literal.
go p.start(&d.wg)
}
}
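
// close signals every processor goroutine to stop.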
func (d *dispatcher) close() {
for i := range d.processors {
d.processors[i].close()
}
}
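
// wait blocks until all processor goroutines have exited.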
func (d *dispatcher) wait() {
d.wg.Wait()
}
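
// queuePacket parses and validates an inbound segment, queues it on the
// endpoint and, for endpoints in the established state, hands the endpoint to
// a processor; otherwise it wakes the endpoint's worker goroutine.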
func (d *dispatcher) queuePacket(stackEP stack.TransportEndpoint, id stack.TransportEndpointID, pkt *stack.PacketBuffer) {
ep := stackEP.(*endpoint)
s := newIncomingSegment(id, pkt)
if !s.parse(pkt.RXTransportChecksumValidated) {
ep.stack.Stats().MalformedRcvdPackets.Increment()
ep.stack.Stats().TCP.InvalidSegmentsReceived.Increment()
ep.stats.ReceiveErrors.MalformedPacketsReceived.Increment()
s.decRef()
return
}
if !s.csumValid {
ep.stack.Stats().MalformedRcvdPackets.Increment()
ep.stack.Stats().TCP.ChecksumErrors.Increment()
ep.stats.ReceiveErrors.ChecksumErrors.Increment()
s.decRef()
return
}
ep.stack.Stats().TCP.ValidSegmentsReceived.Increment()
ep.stats.SegmentsReceived.Increment()
if (s.flags & header.TCPFlagRst) != 0 {
ep.stack.Stats().TCP.ResetsReceived.Increment()
}
if !ep.enqueueSegment(s) {
s.decRef()
return
}
	// For sockets not in the established state, let the worker goroutine
	// handle the packets.
if ep.EndpointState() != StateEstablished {
ep.newSegmentWaker.Assert()
return
}
d.selectProcessor(id).queueEndpoint(ep)
}
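
// generateRandUint32 returns a uint32 read from a secure random source, used
// to seed the processor-selection hash.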
func generateRandUint32() uint32 {
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
panic(err)
}
return binary.LittleEndian.Uint32(b)
}
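
// selectProcessor returns the processor responsible for the given endpoint ID.
// It hashes the 4-tuple (ports and addresses) together with the dispatcher's
// seed so that all segments for a given connection always map to the same
// processor, preserving in-order delivery.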
func (d *dispatcher) selectProcessor(id stack.TransportEndpointID) *processor {
var payload [4]byte
binary.LittleEndian.PutUint16(payload[0:], id.LocalPort)
binary.LittleEndian.PutUint16(payload[2:], id.RemotePort)
h := jenkins.Sum32(d.seed)
h.Write(payload[:])
h.Write([]byte(id.LocalAddress))
h.Write([]byte(id.RemoteAddress))
return &d.processors[h.Sum32()%uint32(len(d.processors))]
}