blob: 2c2cc7e8e7331982de8d393c2c6fa7d8cb705111 [file] [log] [blame]
// Copyright 2018 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package queue provides the implementation of transmit and receive queues
// based on shared memory ring buffers.
package queue
import (
"encoding/binary"
"sync/atomic"
"github.com/google/netstack/tcpip/link/sharedmem/pipe"
"log"
)
const (
// Offsets within a posted buffer.
postedOffset = 0
postedSize = 8
postedRemainingInGroup = 12
postedUserData = 16
postedID = 24
sizeOfPostedBuffer = 32
// Offsets within a received packet header.
consumedPacketSize = 0
consumedPacketReserved = 4
sizeOfConsumedPacketHeader = 8
// Offsets within a consumed buffer.
consumedOffset = 0
consumedSize = 8
consumedUserData = 12
consumedID = 20
sizeOfConsumedBuffer = 28
// The following are the allowed states of the shared data area.
eventFDUninitialized = 0
eventFDDisabled = 1
eventFDEnabled = 2
)
// RxBuffer is the descriptor of a receive buffer.
type RxBuffer struct {
Offset uint64
Size uint32
ID uint64
UserData uint64
}
// Rx is a receive queue. It is implemented with one tx and one rx pipe: the tx
// pipe is used to "post" buffers, while the rx pipe is used to receive packets
// whose contents have been written to previously posted buffers.
//
// This struct is thread-compatible.
type Rx struct {
tx pipe.Tx
rx pipe.Rx
sharedEventFDState *uint32
}
// Init initializes the receive queue with the given pipes, and shared state
// pointer -- the latter is used to enable/disable eventfd notifications.
func (r *Rx) Init(tx, rx []byte, sharedEventFDState *uint32) {
r.sharedEventFDState = sharedEventFDState
r.tx.Init(tx)
r.rx.Init(rx)
}
// EnableNotification updates the shared state such that the peer will notify
// the eventfd when there are packets to be dequeued.
func (r *Rx) EnableNotification() {
atomic.StoreUint32(r.sharedEventFDState, eventFDEnabled)
}
// DisableNotification updates the shared state such that the peer will not
// notify the eventfd.
func (r *Rx) DisableNotification() {
atomic.StoreUint32(r.sharedEventFDState, eventFDDisabled)
}
// PostedBuffersLimit returns the maximum number of buffers that can be posted
// before the tx queue fills up.
func (r *Rx) PostedBuffersLimit() uint64 {
return r.tx.Capacity(sizeOfPostedBuffer)
}
// PostBuffers makes the given buffers available for receiving data from the
// peer. Once they are posted, the peer is free to write to them and will
// eventually post them back for consumption.
func (r *Rx) PostBuffers(buffers []RxBuffer) bool {
for i := range buffers {
b := r.tx.Push(sizeOfPostedBuffer)
if b == nil {
r.tx.Abort()
return false
}
pb := &buffers[i]
binary.LittleEndian.PutUint64(b[postedOffset:], pb.Offset)
binary.LittleEndian.PutUint32(b[postedSize:], pb.Size)
binary.LittleEndian.PutUint32(b[postedRemainingInGroup:], 0)
binary.LittleEndian.PutUint64(b[postedUserData:], pb.UserData)
binary.LittleEndian.PutUint64(b[postedID:], pb.ID)
}
r.tx.Flush()
return true
}
// Dequeue receives buffers that have been previously posted by PostBuffers()
// and that have been filled by the peer and posted back.
//
// This is similar to append() in that new buffers are appended to "bufs", with
// reallocation only if "bufs" doesn't have enough capacity.
func (r *Rx) Dequeue(bufs []RxBuffer) ([]RxBuffer, uint32) {
for {
outBufs := bufs
// Pull the next descriptor from the rx pipe.
b := r.rx.Pull()
if b == nil {
return bufs, 0
}
if len(b) < sizeOfConsumedPacketHeader {
log.Printf("Ignoring packet header: size (%v) is less than header size (%v)", len(b), sizeOfConsumedPacketHeader)
r.rx.Flush()
continue
}
totalDataSize := binary.LittleEndian.Uint32(b[consumedPacketSize:])
// Calculate the number of buffer descriptors and copy them
// over to the output.
count := (len(b) - sizeOfConsumedPacketHeader) / sizeOfConsumedBuffer
offset := sizeOfConsumedPacketHeader
buffersSize := uint32(0)
for i := count; i > 0; i-- {
s := binary.LittleEndian.Uint32(b[offset+consumedSize:])
buffersSize += s
if buffersSize < s {
// The buffer size overflows an unsigned 32-bit
// integer, so break out and force it to be
// ignored.
totalDataSize = 1
buffersSize = 0
break
}
outBufs = append(outBufs, RxBuffer{
Offset: binary.LittleEndian.Uint64(b[offset+consumedOffset:]),
Size: s,
ID: binary.LittleEndian.Uint64(b[offset+consumedID:]),
})
offset += sizeOfConsumedBuffer
}
r.rx.Flush()
if buffersSize < totalDataSize {
// The descriptor is corrupted, ignore it.
log.Printf("Ignoring packet: actual data size (%v) less than expected size (%v)", buffersSize, totalDataSize)
continue
}
return outBufs, totalDataSize
}
}
// Bytes returns the byte slices on which the queue operates.
func (r *Rx) Bytes() (tx, rx []byte) {
return r.tx.Bytes(), r.rx.Bytes()
}
// DecodeRxBufferHeader decodes the header of a buffer posted on an rx queue.
func DecodeRxBufferHeader(b []byte) RxBuffer {
return RxBuffer{
Offset: binary.LittleEndian.Uint64(b[postedOffset:]),
Size: binary.LittleEndian.Uint32(b[postedSize:]),
ID: binary.LittleEndian.Uint64(b[postedID:]),
UserData: binary.LittleEndian.Uint64(b[postedUserData:]),
}
}
// RxCompletionSize returns the number of bytes needed to encode an rx
// completion containing "count" buffers.
func RxCompletionSize(count int) uint64 {
return sizeOfConsumedPacketHeader + uint64(count)*sizeOfConsumedBuffer
}
// EncodeRxCompletion encodes an rx completion header.
func EncodeRxCompletion(b []byte, size, reserved uint32) {
binary.LittleEndian.PutUint32(b[consumedPacketSize:], size)
binary.LittleEndian.PutUint32(b[consumedPacketReserved:], reserved)
}
// EncodeRxCompletionBuffer encodes the i-th rx completion buffer header.
func EncodeRxCompletionBuffer(b []byte, i int, rxb RxBuffer) {
b = b[RxCompletionSize(i):]
binary.LittleEndian.PutUint64(b[consumedOffset:], rxb.Offset)
binary.LittleEndian.PutUint32(b[consumedSize:], rxb.Size)
binary.LittleEndian.PutUint64(b[consumedUserData:], rxb.UserData)
binary.LittleEndian.PutUint64(b[consumedID:], rxb.ID)
}