| // Copyright 2016 The Netstack Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package sharedmem |
| |
| import ( |
| "sync/atomic" |
| "syscall" |
| |
| "github.com/google/netstack/tcpip/link/rawfile" |
| "github.com/google/netstack/tcpip/link/sharedmem/queue" |
| ) |
| |
| // rx holds all state associated with an rx queue. |
| type rx struct { |
| 	data []byte // mapped region backing packet data (from QueueConfig.DataFD) |
| 	sharedData []byte // mapped region shared with the peer (from QueueConfig.SharedDataFD) |
| 	q queue.Rx // receive queue built on the tx/rx pipe mappings |
| 	eventFD int // duplicated, non-blocking eventfd used to wait for queue activity |
| } |
| |
| // init initializes all state needed by the rx queue based on the information |
| // provided. |
| // |
| // The caller always retains ownership of all file descriptors passed in. The |
| // queue implementation will duplicate any that it may need in the future. |
| func (r *rx) init(mtu uint32, c *QueueConfig) error { |
| // Map in all buffers. |
| txPipe, err := getBuffer(c.TxPipeFD) |
| if err != nil { |
| return err |
| } |
| |
| rxPipe, err := getBuffer(c.RxPipeFD) |
| if err != nil { |
| syscall.Munmap(txPipe) |
| return err |
| } |
| |
| data, err := getBuffer(c.DataFD) |
| if err != nil { |
| syscall.Munmap(txPipe) |
| syscall.Munmap(rxPipe) |
| return err |
| } |
| |
| sharedData, err := getBuffer(c.SharedDataFD) |
| if err != nil { |
| syscall.Munmap(txPipe) |
| syscall.Munmap(rxPipe) |
| syscall.Munmap(data) |
| return err |
| } |
| |
| // Duplicate the eventFD so that caller can close it but we can still |
| // use it. |
| efd, err := syscall.Dup(c.EventFD) |
| if err != nil { |
| syscall.Munmap(txPipe) |
| syscall.Munmap(rxPipe) |
| syscall.Munmap(data) |
| syscall.Munmap(sharedData) |
| return err |
| } |
| |
| // Set the eventfd as non-blocking. |
| if err := syscall.SetNonblock(efd, true); err != nil { |
| syscall.Munmap(txPipe) |
| syscall.Munmap(rxPipe) |
| syscall.Munmap(data) |
| syscall.Munmap(sharedData) |
| syscall.Close(efd) |
| return err |
| } |
| |
| // Initialize state based on buffers. |
| r.q.Init(txPipe, rxPipe, sharedDataPointer(sharedData)) |
| r.data = data |
| r.eventFD = efd |
| r.sharedData = sharedData |
| |
| return nil |
| } |
| |
| // cleanup releases all resources allocated during init(). It must only be |
| // called if init() has previously succeeded. |
| func (r *rx) cleanup() { |
| a, b := r.q.Bytes() |
| syscall.Munmap(a) |
| syscall.Munmap(b) |
| |
| syscall.Munmap(r.data) |
| syscall.Munmap(r.sharedData) |
| syscall.Close(r.eventFD) |
| } |
| |
| // postAndReceive posts the provided buffers (if any), and then tries to read |
| // from the receive queue. |
| // |
| // Capacity permitting, it reuses the posted buffer slice to store the buffers |
| // that were read as well. |
| // |
| // This function will block if there aren't any available packets. |
| func (r *rx) postAndReceive(b []queue.RxBuffer, stopRequested *uint32) ([]queue.RxBuffer, uint32) { |
| // Post the buffers first. If we cannot post, sleep until we can. We |
| // never post more than will fit concurrently, so it's safe to wait |
| // until enough room is available. |
| if len(b) != 0 && !r.q.PostBuffers(b) { |
| r.q.EnableNotification() |
| for !r.q.PostBuffers(b) { |
| var tmp [8]byte |
| rawfile.BlockingRead(r.eventFD, tmp[:]) |
| if atomic.LoadUint32(stopRequested) != 0 { |
| r.q.DisableNotification() |
| return nil, 0 |
| } |
| } |
| r.q.DisableNotification() |
| } |
| |
| // Read the next set of descriptors. |
| b, n := r.q.Dequeue(b[:0]) |
| if len(b) != 0 { |
| return b, n |
| } |
| |
| // Data isn't immediately available. Enable eventfd notifications. |
| r.q.EnableNotification() |
| for { |
| b, n = r.q.Dequeue(b) |
| if len(b) != 0 { |
| break |
| } |
| |
| // Wait for notification. |
| var tmp [8]byte |
| rawfile.BlockingRead(r.eventFD, tmp[:]) |
| if atomic.LoadUint32(stopRequested) != 0 { |
| r.q.DisableNotification() |
| return nil, 0 |
| } |
| } |
| r.q.DisableNotification() |
| |
| return b, n |
| } |