blob: 276f90f3226a5a0126ccb028227da5e15c325aa7 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Code generated by go generate; DO NOT EDIT.
package {{ .Pkg }}
import (
"context"
"fmt"
"sync"
"syscall/zx"
"syscall/zx/zxwait"
"reflect"
"unsafe"
"go.fuchsia.dev/fuchsia/src/connectivity/network/netstack/link/fifo"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/stack"
{{ range .Imports }}
"{{ . }}"{{ end }}
)
type {{ .Handler }} struct {
TxDepth, RxDepth uint32
RxFifo, TxFifo zx.Handle
rx {{ .Entries }}
tx struct {
mu struct {
sync.Mutex
waiters int
entries {{ .Entries }}
// detached signals to incoming writes that the receiver is unable
// to service them.
detached bool
}
cond sync.Cond
}
Stats struct {
Rx fifo.RxStats
Tx fifo.TxStats
}
}
func (h *{{ .Handler }}) TxReceiverLoop(wasSent func(entry *{{ .EntryType }}) bool) error {
scratch := make([]{{ .EntryType }}, h.TxDepth)
for {
h.tx.mu.Lock()
detached := h.tx.mu.detached
h.tx.mu.Unlock()
if detached {
return nil
}
if err := zxwait.WithRetryContext(context.Background(), func() error {
status, count := FifoRead(h.TxFifo, scratch)
if status != zx.ErrOk {
return &zx.Error{Status: status, Text: "FifoRead(TX)"}
}
var notSent uint64
for i := range scratch[:count] {
if !wasSent(&scratch[i]) {
notSent++
}
}
h.Stats.Tx.Drops.IncrementBy(notSent)
h.Stats.Tx.Reads(count).Increment()
h.tx.mu.Lock()
n := h.tx.mu.entries.AddReadied(scratch[:count])
h.tx.mu.entries.IncrementReadied(uint16(n))
h.tx.mu.Unlock()
h.tx.cond.Broadcast()
if n := uint32(n); count != n {
return fmt.Errorf("FifoRead(TX): tx_depth invariant violation; observed=%d expected=%d", h.TxDepth-n+count, h.TxDepth)
}
return nil
}, h.TxFifo, zx.SignalFIFOReadable, zx.SignalFIFOPeerClosed); err != nil {
return err
}
}
}
func (h *{{ .Handler }}) TxSenderLoop() error {
scratch := make([]{{ .EntryType }}, h.TxDepth)
for {
var batch []{{ .EntryType }}
h.tx.mu.Lock()
for {
if h.tx.mu.detached {
h.tx.mu.Unlock()
return nil
}
if batchSize := len(scratch) - int(h.tx.mu.entries.InFlight()); batchSize != 0 {
if h.tx.mu.entries.HaveQueued() {
if h.tx.mu.waiters == 0 || !h.tx.mu.entries.HaveReadied() {
// No application threads are waiting OR application threads are waiting on the
// reader.
//
// This condition is an optimization; if application threads are waiting when
// buffers are available then we were probably just woken up by the reader having
// retrieved buffers from the fifo. We avoid creating a batch until the application
// threads have all been satisfied, or until the buffers have all been used up.
batch = scratch[:batchSize]
break
}
}
}
h.tx.cond.Wait()
}
n := h.tx.mu.entries.GetQueued(batch)
h.tx.mu.entries.IncrementSent(uint16(n))
h.tx.mu.Unlock()
switch status, count := FifoWrite(h.TxFifo, batch[:n]); status {
case zx.ErrOk:
if n := uint32(n); count != n {
return fmt.Errorf("FifoWrite(TX): tx_depth invariant violation; observed=%d expected=%d", h.TxDepth-n+count, h.TxDepth)
}
h.Stats.Tx.Writes(count).Increment()
default:
return &zx.Error{Status: status, Text: "FifoWrite(TX)"}
}
}
}
func (h *{{ .Handler }}) RxLoop(process func(entry *{{ .EntryType }}) {{ if .Signal }} , signal zx.Signals, onSignal func() {{ end }} ) error {
scratch := make([]{{ .EntryType }}, h.RxDepth)
for {
if batchSize := len(scratch) - int(h.rx.InFlight()); batchSize != 0 && h.rx.HaveQueued() {
n := h.rx.GetQueued(scratch[:batchSize])
h.rx.IncrementSent(uint16(n))
status, count := FifoWrite(h.RxFifo, scratch[:n])
switch status {
case zx.ErrOk:
h.Stats.Rx.Writes(count).Increment()
if n := uint32(n); count != n {
return fmt.Errorf("FifoWrite(RX): rx_depth invariant violation; observed=%d expected=%d", h.RxDepth-n+count, h.RxDepth)
}
default:
return &zx.Error{Status: status, Text: "FifoWrite(RX)"}
}
}
for h.rx.HaveReadied() {
entry := h.rx.GetReadied()
process(entry)
h.rx.IncrementQueued(1)
}
for {
signals := zx.Signals(zx.SignalFIFOReadable | zx.SignalFIFOPeerClosed {{ if .Signal }} | signal {{ end }})
if int(h.rx.InFlight()) != len(scratch) && h.rx.HaveQueued() {
signals |= zx.SignalFIFOWritable
}
obs, err := zxwait.WaitContext(context.Background(), h.RxFifo, signals)
if err != nil {
return err
}
{{ if .Signal }}
if obs&signal != 0 {
onSignal()
}
{{ end }}
if obs&zx.SignalFIFOPeerClosed != 0 {
return fmt.Errorf("FifoRead(RX): peer closed")
}
if obs&zx.SignalFIFOReadable != 0 {
switch status, count := FifoRead(h.RxFifo, scratch); status {
case zx.ErrOk:
h.Stats.Rx.Reads(count).Increment()
n := h.rx.AddReadied(scratch[:count])
h.rx.IncrementReadied(uint16(n))
if n := uint32(n); count != n {
return fmt.Errorf("FifoRead(RX): rx_depth invariant violation; observed=%d expected=%d", h.RxDepth-n+count, h.RxDepth)
}
default:
return &zx.Error{Status: status, Text: "FifoRead(RX)"}
}
break
}
if obs&zx.SignalFIFOWritable != 0 {
break
}
}
}
}
func (h *{{ .Handler }}) ProcessWrite(pkts stack.PacketBufferList, processor func(*{{ .EntryType }}, *stack.PacketBuffer)) (int, tcpip.Error) {
i := 0
for pkt := pkts.Front(); pkt != nil; {
h.tx.mu.Lock()
for {
if h.tx.mu.detached {
h.tx.mu.Unlock()
return i, &tcpip.ErrClosedForSend{}
}
if h.tx.mu.entries.HaveReadied() {
break
}
h.tx.mu.waiters++
h.tx.cond.Wait()
h.tx.mu.waiters--
}
// Queue as many remaining packets as possible; if we run out of space,
// we'll return to the waiting state in the outer loop.
for {
entry := h.tx.mu.entries.GetReadied()
processor(entry, pkt)
h.tx.mu.entries.IncrementQueued(1)
i++
pkt = pkt.Next()
if pkt == nil || !h.tx.mu.entries.HaveReadied() {
break
}
}
h.tx.mu.Unlock()
h.tx.cond.Broadcast()
}
return i, nil
}
func (h *{{ .Handler }}) DetachTx() {
h.tx.mu.Lock()
h.tx.mu.detached = true
h.tx.mu.Unlock()
h.tx.cond.Broadcast()
}
func (h *{{ .Handler }}) InitRx(capacity uint16) uint16 {
return h.rx.Init(capacity)
}
func (h *{{ .Handler }}) InitTx(capacity uint16) uint16 {
h.tx.cond.L = &h.tx.mu.Mutex
return h.tx.mu.entries.Init(capacity)
}
func (h *{{ .Handler }}) PushInitialRx(entry {{ .EntryType }}) {
h.rx.storage[h.rx.readied] = entry
h.rx.IncrementReadied(1)
h.rx.IncrementQueued(1)
}
func (h *{{ .Handler }}) PushInitialTx(entry {{ .EntryType }}) {
h.tx.mu.entries.storage[h.tx.mu.entries.readied] = entry
h.tx.mu.entries.IncrementReadied(1)
}
func FifoWrite(handle zx.Handle, b []{{ .EntryType }}) (zx.Status, uint32) {
var actual uint
var _x {{ .EntryType }}
data := unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&b)).Data)
status := zx.Sys_fifo_write(handle, uint(unsafe.Sizeof(_x)), data, uint(len(b)), &actual)
return status, uint32(actual)
}
func FifoRead(handle zx.Handle, b []{{ .EntryType }}) (zx.Status, uint32) {
var actual uint
var _x {{ .EntryType }}
data := unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&b)).Data)
status := zx.Sys_fifo_read(handle, uint(unsafe.Sizeof(_x)), data, uint(len(b)), &actual)
return status, uint32(actual)
}