blob: 962a449cbaf39139466b4b1872214e4ce2e7249c [file] [log] [blame]
// Copyright 2016 The Netstack Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package sharedmem provides the implemention of data-link layer endpoints
// backed by shared memory.
//
// Shared memory endpoints can be used in the networking stack by calling New()
// to create a new endpoint, and then passing it as an argument to
// Stack.CreateNIC().
package sharedmem
import (
"sync"
"sync/atomic"
"syscall"
"github.com/google/netstack/tcpip"
"github.com/google/netstack/tcpip/buffer"
"github.com/google/netstack/tcpip/header"
"github.com/google/netstack/tcpip/link/sharedmem/queue"
"github.com/google/netstack/tcpip/stack"
"log"
)
// QueueConfig holds all the file descriptors needed to describe a tx or rx
// queue over shared memory. It is used when creating new shared memory
// endpoints to describe tx and rx queues.
type QueueConfig struct {
// DataFD is a file descriptor for the file that contains the data to
// be transmitted via this queue. Descriptors contain offsets within
// this file.
DataFD int
// EventFD is a file descriptor for the event that is signaled when
// data is becomes available in this queue.
EventFD int
// TxPipeFD is a file descriptor for the tx pipe associated with the
// queue.
TxPipeFD int
// RxPipeFD is a file descriptor for the rx pipe associated with the
// queue.
RxPipeFD int
// SharedDataFD is a file descriptor for the file that contains shared
// state between the two ends of the queue. This data specifies, for
// example, whether EventFD signaling is enabled or disabled.
SharedDataFD int
}
type endpoint struct {
// mtu (maximum transmission unit) is the maximum size of a packet.
mtu uint32
// bufferSize is the size of each individual buffer.
bufferSize uint32
// addr is the local address of this endpoint.
addr tcpip.LinkAddress
// rx is the receive queue.
rx rx
// stopRequested is to be accessed atomically only, and determines if
// the worker goroutines should stop.
stopRequested uint32
// Wait group used to indicate that all workers have stopped.
completed sync.WaitGroup
// mu protects the following fields.
mu sync.Mutex
// tx is the transmit queue.
tx tx
// workerStarted specifies whether the worker goroutine was started.
workerStarted bool
}
// New creates a new shared-memory-based endpoint. Buffers will be broken up
// into buffers of "bufferSize" bytes.
func New(mtu, bufferSize uint32, addr tcpip.LinkAddress, tx, rx QueueConfig) (tcpip.LinkEndpointID, error) {
e := &endpoint{
mtu: mtu,
bufferSize: bufferSize,
addr: addr,
}
if err := e.tx.init(bufferSize, &tx); err != nil {
return 0, err
}
if err := e.rx.init(bufferSize, &rx); err != nil {
e.tx.cleanup()
return 0, err
}
return stack.RegisterLinkEndpoint(e), nil
}
// Close frees all resources associated with the endpoint.
func (e *endpoint) Close() {
// Tell dispatch goroutine to stop, then write to the eventfd so that
// it wakes up in case it's sleeping.
atomic.StoreUint32(&e.stopRequested, 1)
syscall.Write(e.rx.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
// Cleanup the queues inline if the worker hasn't started yet; we also
// know it won't start from now on because stopRequested is set to 1.
e.mu.Lock()
workerPresent := e.workerStarted
e.mu.Unlock()
if !workerPresent {
e.tx.cleanup()
e.rx.cleanup()
}
}
// Wait waits until all workers have stopped after a Close() call.
func (e *endpoint) Wait() {
e.completed.Wait()
}
// Attach implements stack.LinkEndpoint.Attach. It launches the goroutine that
// reads packets from the rx queue.
func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
e.mu.Lock()
if !e.workerStarted && atomic.LoadUint32(&e.stopRequested) == 0 {
e.workerStarted = true
e.completed.Add(1)
go e.dispatchLoop(dispatcher)
}
e.mu.Unlock()
}
// MTU implements stack.LinkEndpoint.MTU. It returns the value initialized
// during construction.
func (e *endpoint) MTU() uint32 {
return e.mtu - header.EthernetMinimumSize
}
// MaxHeaderLength implements stack.LinkEndpoint.MaxHeaderLength. It returns the
// ethernet frame header size.
func (*endpoint) MaxHeaderLength() uint16 {
return header.EthernetMinimumSize
}
// LinkAddress implements stack.LinkEndpoint.LinkAddress. It returns the local
// link address.
func (e *endpoint) LinkAddress() tcpip.LinkAddress {
return e.addr
}
// WritePacket writes outbound packets to the file descriptor. If it is not
// currently writable, the packet is dropped.
func (e *endpoint) WritePacket(r *stack.Route, hdr *buffer.Prependable, payload buffer.View, protocol tcpip.NetworkProtocolNumber) *tcpip.Error {
// Add the ethernet header here.
eth := header.Ethernet(hdr.Prepend(header.EthernetMinimumSize))
eth.Encode(&header.EthernetFields{
DstAddr: r.RemoteLinkAddress,
SrcAddr: e.addr,
Type: protocol,
})
// Transmit the packet.
e.mu.Lock()
ok := e.tx.transmit(hdr.UsedBytes(), payload)
e.mu.Unlock()
if !ok {
return tcpip.ErrWouldBlock
}
return nil
}
// dispatchLoop reads packets from the rx queue in a loop and dispatches them
// to the network stack.
func (e *endpoint) dispatchLoop(d stack.NetworkDispatcher) {
// Post initial set of buffers.
limit := e.rx.q.PostedBuffersLimit()
if l := uint64(len(e.rx.data)) / uint64(e.bufferSize); limit > l {
limit = l
}
for i := uint64(0); i < limit; i++ {
b := queue.RxBuffer{
Offset: i * uint64(e.bufferSize),
Size: e.bufferSize,
ID: i,
}
if !e.rx.q.PostBuffers([]queue.RxBuffer{b}) {
log.Printf("Unable to post %v-th buffer", i)
}
}
// Read in a loop until a stop is requested.
var rxb []queue.RxBuffer
views := []buffer.View{nil}
vv := buffer.NewVectorisedView(0, views)
for atomic.LoadUint32(&e.stopRequested) == 0 {
var n uint32
rxb, n = e.rx.postAndReceive(rxb, &e.stopRequested)
// Copy data from the shared area to its own buffer, then
// prepare to repost the buffer.
b := make([]byte, n)
offset := uint32(0)
for i := range rxb {
copy(b[offset:], e.rx.data[rxb[i].Offset:][:rxb[i].Size])
offset += rxb[i].Size
rxb[i].Size = e.bufferSize
}
if n < header.EthernetMinimumSize {
continue
}
// Send packet up the stack.
eth := header.Ethernet(b)
views[0] = b[header.EthernetMinimumSize:]
vv.SetSize(int(n) - header.EthernetMinimumSize)
d.DeliverNetworkPacket(e, e.addr, eth.SourceAddress(), eth.Type(), &vv)
}
// Clean state.
e.tx.cleanup()
e.rx.cleanup()
e.completed.Done()
}