blob: 5a2a0d2b04ddc087518cb507d79308086a740d9d [file] [log] [blame]
// Copyright 2018 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build linux
// Package sharedmem provides the implementation of data-link layer endpoints
// backed by shared memory.
//
// Shared memory endpoints can be used in the networking stack by calling New()
// to create a new endpoint, and then passing it as an argument to
// Stack.CreateNIC().
package sharedmem
import (
"sync"
"sync/atomic"
"syscall"
"github.com/google/netstack/tcpip"
"github.com/google/netstack/tcpip/buffer"
"github.com/google/netstack/tcpip/header"
"github.com/google/netstack/tcpip/link/sharedmem/queue"
"github.com/google/netstack/tcpip/stack"
"log"
)
// QueueConfig holds all the file descriptors needed to describe a tx or rx
// queue over shared memory. It is used when creating new shared memory
// endpoints to describe tx and rx queues; New takes one QueueConfig for each
// direction.
type QueueConfig struct {
// DataFD is a file descriptor for the file that contains the data to
// be transmitted via this queue. Descriptors contain offsets within
// this file.
DataFD int
// EventFD is a file descriptor for the event that is signaled when
// data becomes available in this queue.
EventFD int
// TxPipeFD is a file descriptor for the tx pipe associated with the
// queue.
TxPipeFD int
// RxPipeFD is a file descriptor for the rx pipe associated with the
// queue.
RxPipeFD int
// SharedDataFD is a file descriptor for the file that contains shared
// state between the two ends of the queue. This data specifies, for
// example, whether EventFD signaling is enabled or disabled.
SharedDataFD int
}
// endpoint is a data-link layer endpoint backed by a shared-memory tx queue
// and rx queue. Instances are created by New.
type endpoint struct {
// mtu (maximum transmission unit) is the maximum size of a packet. The
// MTU method reports this value less header.EthernetMinimumSize.
mtu uint32
// bufferSize is the size of each individual buffer.
bufferSize uint32
// addr is the local address of this endpoint. It is used as the source
// address of outbound frames when the route does not supply one.
addr tcpip.LinkAddress
// rx is the receive queue.
rx rx
// stopRequested is to be accessed atomically only, and determines if
// the worker goroutines should stop. Close sets it to 1.
stopRequested uint32
// completed is the wait group used to indicate that all workers have
// stopped.
completed sync.WaitGroup
// mu protects the following fields (tx and workerStarted).
mu sync.Mutex
// tx is the transmit queue.
tx tx
// workerStarted specifies whether the worker goroutine was started.
workerStarted bool
}
// New creates a new shared-memory-based endpoint and registers it via
// stack.RegisterLinkEndpoint, returning the resulting link endpoint ID.
// Buffers will be broken up into buffers of "bufferSize" bytes.
//
// tx and rx describe the transmit and receive queues respectively. If the rx
// queue fails to initialize, the already-initialized tx queue is cleaned up
// before the error is returned.
//
// A zero bufferSize is rejected with syscall.EINVAL: the dispatch loop divides
// the size of the rx data area by bufferSize, so a zero value would otherwise
// cause a division-by-zero panic once the endpoint is attached.
func New(mtu, bufferSize uint32, addr tcpip.LinkAddress, tx, rx QueueConfig) (tcpip.LinkEndpointID, error) {
	if bufferSize == 0 {
		return 0, syscall.EINVAL
	}

	e := &endpoint{
		mtu:        mtu,
		bufferSize: bufferSize,
		addr:       addr,
	}

	if err := e.tx.init(bufferSize, &tx); err != nil {
		return 0, err
	}

	if err := e.rx.init(bufferSize, &rx); err != nil {
		// Don't leak the tx queue resources acquired above.
		e.tx.cleanup()
		return 0, err
	}

	return stack.RegisterLinkEndpoint(e), nil
}
// Close frees all resources associated with the endpoint.
//
// When the dispatch goroutine has been started, it is only asked to stop here
// and releases the queues itself on its way out; otherwise the queues are
// cleaned up inline.
func (e *endpoint) Close() {
	// Request the stop first, then write to the rx eventfd so that a
	// dispatch goroutine that is currently sleeping wakes up and sees the
	// request. The result of the write is not checked.
	atomic.StoreUint32(&e.stopRequested, 1)
	wakeup := []byte{1, 0, 0, 0, 0, 0, 0, 0}
	syscall.Write(e.rx.eventFD, wakeup)

	e.mu.Lock()
	started := e.workerStarted
	e.mu.Unlock()

	if started {
		// The worker cleans up the queues when it exits.
		return
	}

	// The worker hasn't started, and it won't start from now on because
	// stopRequested is already set to 1, so clean up the queues here.
	e.tx.cleanup()
	e.rx.cleanup()
}
// Wait blocks until every worker goroutine has finished, which happens after
// Close() has been called. It waits on the endpoint's "completed" wait group.
func (e *endpoint) Wait() {
	e.completed.Wait()
}
// Attach implements stack.LinkEndpoint.Attach. It launches the goroutine that
// reads packets from the rx queue and hands them to dispatcher.
//
// The call is a no-op if the worker was already started or if a stop has
// already been requested.
func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
	e.mu.Lock()
	defer e.mu.Unlock()

	if e.workerStarted || atomic.LoadUint32(&e.stopRequested) != 0 {
		return
	}

	e.workerStarted = true
	e.completed.Add(1)

	// Link endpoints are not savable. When transportation endpoints are
	// saved, they stop sending outgoing packets and all incoming packets
	// are rejected.
	go e.dispatchLoop(dispatcher)
}
// IsAttached implements stack.LinkEndpoint.IsAttached. It reports whether the
// worker goroutine has been started by Attach.
func (e *endpoint) IsAttached() bool {
	e.mu.Lock()
	started := e.workerStarted
	e.mu.Unlock()
	return started
}
// MTU implements stack.LinkEndpoint.MTU. It returns the MTU supplied at
// construction minus the ethernet header size, i.e. the space left in a frame
// once the link header has been accounted for.
//
// If the configured MTU is smaller than the ethernet header, 0 is returned
// instead of letting the unsigned subtraction wrap around to a huge value.
func (e *endpoint) MTU() uint32 {
	if e.mtu < header.EthernetMinimumSize {
		return 0
	}
	return e.mtu - header.EthernetMinimumSize
}
// Capabilities implements stack.LinkEndpoint.Capabilities. It always returns
// the zero value of stack.LinkEndpointCapabilities.
func (*endpoint) Capabilities() stack.LinkEndpointCapabilities {
	return stack.LinkEndpointCapabilities(0)
}
// MaxHeaderLength implements stack.LinkEndpoint.MaxHeaderLength. The only
// header this endpoint adds is the ethernet frame header, so its size
// (header.EthernetMinimumSize) is returned.
func (*endpoint) MaxHeaderLength() uint16 {
	return uint16(header.EthernetMinimumSize)
}
// LinkAddress implements stack.LinkEndpoint.LinkAddress. It returns the local
// link address that was passed to New.
func (e *endpoint) LinkAddress() tcpip.LinkAddress {
	return e.addr
}
// WritePacket prepends an ethernet header to the outbound packet and hands it
// to the tx queue. If the queue does not accept the packet, the packet is
// dropped and tcpip.ErrWouldBlock is returned; otherwise nil is returned.
//
// The source address of the frame is the route's local link address when it
// is set, and the endpoint's own address otherwise.
func (e *endpoint) WritePacket(r *stack.Route, hdr buffer.Prependable, payload buffer.VectorisedView, protocol tcpip.NetworkProtocolNumber) *tcpip.Error {
	// Reserve room for the ethernet header at the front of hdr.
	eth := header.Ethernet(hdr.Prepend(header.EthernetMinimumSize))

	src := e.addr
	if r.LocalLinkAddress != "" {
		src = r.LocalLinkAddress
	}
	eth.Encode(&header.EthernetFields{
		SrcAddr: src,
		DstAddr: r.RemoteLinkAddress,
		Type:    protocol,
	})

	// Flatten the payload before taking the lock.
	data := payload.ToView()

	// The tx queue is protected by mu.
	e.mu.Lock()
	sent := e.tx.transmit(hdr.View(), data)
	e.mu.Unlock()

	if sent {
		return nil
	}
	return tcpip.ErrWouldBlock
}
// dispatchLoop reads packets from the rx queue in a loop and dispatches them
// to the network stack through d. It runs until a stop is requested, then
// cleans up both queues and marks the worker as completed.
func (e *endpoint) dispatchLoop(d stack.NetworkDispatcher) {
	// Post the initial set of buffers: as many as the queue allows, capped
	// by the number of bufferSize-sized slots that fit in the rx data area.
	count := e.rx.q.PostedBuffersLimit()
	if slots := uint64(len(e.rx.data)) / uint64(e.bufferSize); slots < count {
		count = slots
	}
	for id := uint64(0); id < count; id++ {
		posted := e.rx.q.PostBuffers([]queue.RxBuffer{{
			Offset: id * uint64(e.bufferSize),
			Size:   e.bufferSize,
			ID:     id,
		}})
		if !posted {
			log.Printf("Unable to post %v-th buffer", id)
		}
	}

	// Receive until a stop is requested.
	var bufs []queue.RxBuffer
	for atomic.LoadUint32(&e.stopRequested) == 0 {
		var size uint32
		bufs, size = e.rx.postAndReceive(bufs, &e.stopRequested)

		// Copy the packet out of the shared area into a private buffer,
		// and reset each buffer's size so that it can be reposted.
		pkt := make([]byte, size)
		var written uint32
		for i := range bufs {
			rb := &bufs[i]
			copy(pkt[written:], e.rx.data[rb.Offset:][:rb.Size])
			written += rb.Size
			rb.Size = e.bufferSize
		}

		// Only frames that hold at least a full ethernet header are sent
		// up the stack; anything shorter is skipped.
		if size >= header.EthernetMinimumSize {
			frame := header.Ethernet(pkt)
			d.DeliverNetworkPacket(e, frame.SourceAddress(), frame.DestinationAddress(), frame.Type(), buffer.View(pkt[header.EthernetMinimumSize:]).ToVectorisedView())
		}
	}

	// Clean state.
	e.tx.cleanup()
	e.rx.cleanup()
	e.completed.Done()
}