// blob: 29e40e1183dcec0c3cbe125cf07e907a7da864f5 [file] [log] [blame]
// Copyright 2016 The Netstack Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sharedmem
import (
"sync/atomic"
"syscall"
"github.com/google/netstack/tcpip/link/rawfile"
"github.com/google/netstack/tcpip/link/sharedmem/queue"
)
// rx bundles everything that belongs to a single receive queue: the mapped
// memory regions, the queue object itself and the notification descriptor.
type rx struct {
	// data and sharedData are the buffers obtained in init() from
	// c.DataFD and c.SharedDataFD respectively. Both are unmapped again by
	// cleanup(). sharedData is additionally handed to the queue (via
	// sharedDataPointer) when it is initialized.
	data, sharedData []byte

	// q is the receive queue, initialized in init() with the tx and rx
	// pipe buffers.
	q queue.Rx

	// eventFD is this queue's own duplicate of the caller's event file
	// descriptor. init() switches it to non-blocking mode, postAndReceive()
	// waits on it, and cleanup() closes it.
	eventFD int
}
// init sets up the rx queue from the descriptors carried in c.
//
// It maps the tx pipe, rx pipe, data and shared-data buffers (in that order),
// duplicates c.EventFD and makes the duplicate non-blocking, and only then
// initializes the queue and stores the results in r. If any step fails, every
// resource acquired by the earlier steps is released and the error from the
// failing step is returned unchanged; r is not modified in that case.
//
// Ownership of every file descriptor in c stays with the caller. The queue
// implementation duplicates any descriptor it needs to keep using.
//
// mtu is accepted but not referenced by this function.
func (r *rx) init(mtu uint32, c *QueueConfig) error {
	// mapped remembers each successful mapping in creation order so that a
	// later failure can undo exactly what has been done so far.
	var mapped [][]byte
	unmapAll := func() {
		for _, region := range mapped {
			syscall.Munmap(region)
		}
	}

	txBuf, err := getBuffer(c.TxPipeFD)
	if err != nil {
		// Nothing has been acquired yet.
		return err
	}
	mapped = append(mapped, txBuf)

	rxBuf, err := getBuffer(c.RxPipeFD)
	if err != nil {
		unmapAll()
		return err
	}
	mapped = append(mapped, rxBuf)

	dataBuf, err := getBuffer(c.DataFD)
	if err != nil {
		unmapAll()
		return err
	}
	mapped = append(mapped, dataBuf)

	sharedBuf, err := getBuffer(c.SharedDataFD)
	if err != nil {
		unmapAll()
		return err
	}
	mapped = append(mapped, sharedBuf)

	// Take a private copy of the event descriptor: the caller remains free
	// to close its own while this queue keeps using the duplicate.
	dupFD, err := syscall.Dup(c.EventFD)
	if err != nil {
		unmapAll()
		return err
	}

	// The duplicate must operate in non-blocking mode.
	if err = syscall.SetNonblock(dupFD, true); err != nil {
		unmapAll()
		syscall.Close(dupFD)
		return err
	}

	// Everything was acquired successfully; commit the state.
	r.q.Init(txBuf, rxBuf, sharedDataPointer(sharedBuf))
	r.data = dataBuf
	r.sharedData = sharedBuf
	r.eventFD = dupFD

	return nil
}
// cleanup gives back everything a successful init() acquired: the two byte
// slices reported by r.q.Bytes(), the data and shared-data buffers, and the
// duplicated event file descriptor. Errors from the individual release calls
// are ignored.
//
// It must not be called unless init() has completed successfully beforehand.
func (r *rx) cleanup() {
	first, second := r.q.Bytes()
	for _, region := range [][]byte{first, second, r.data, r.sharedData} {
		syscall.Munmap(region)
	}
	syscall.Close(r.eventFD)
}
// postAndReceive hands the buffers in b (if there are any) to the queue and
// then returns the next set of buffers dequeued from it, together with the
// uint32 that Dequeue reported for them.
//
// The storage behind b is offered back to Dequeue (as b[:0]), so capacity
// permitting the returned slice reuses it.
//
// The call blocks while no packets are available. Every time it wakes up from
// a wait on the eventfd it atomically loads *stopRequested; if the value is
// non-zero it disables notifications and returns (nil, 0).
func (r *rx) postAndReceive(b []queue.RxBuffer, stopRequested *uint32) ([]queue.RxBuffer, uint32) {
	// wokenByStop performs one blocking read on the eventfd and afterwards
	// reports whether a stop has been requested. scratch merely receives
	// whatever is read; its contents are never looked at.
	wokenByStop := func() bool {
		var scratch [8]byte
		rawfile.BlockingRead(r.eventFD, scratch[:])
		return atomic.LoadUint32(stopRequested) != 0
	}

	// Posting phase. When the first attempt fails, turn notifications on
	// and retry *before* sleeping, then keep alternating between retrying
	// and waiting. We never post more than fits concurrently, so waiting
	// for room to free up is safe.
	if len(b) > 0 && !r.q.PostBuffers(b) {
		r.q.EnableNotification()
		for {
			if r.q.PostBuffers(b) {
				break
			}
			if wokenByStop() {
				r.q.DisableNotification()
				return nil, 0
			}
		}
		r.q.DisableNotification()
	}

	// Fast path: something is already waiting in the queue.
	b, n := r.q.Dequeue(b[:0])
	if len(b) > 0 {
		return b, n
	}

	// Slow path: nothing was ready. Enable eventfd notifications and try
	// again before each wait, so that the queue is always re-checked after
	// notifications have been switched on.
	r.q.EnableNotification()
	for b, n = r.q.Dequeue(b); len(b) == 0; b, n = r.q.Dequeue(b) {
		if wokenByStop() {
			r.q.DisableNotification()
			return nil, 0
		}
	}
	r.q.DisableNotification()

	return b, n
}