// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// Code generated by go generate; DO NOT EDIT.

package {{ .Pkg }}

import (
	"context"
	"fmt"
	"reflect"
	"sync"
	"syscall/zx"
	"syscall/zx/zxwait"
	"unsafe"

	"go.fuchsia.dev/fuchsia/src/connectivity/network/netstack/link/fifo"

	"gvisor.dev/gvisor/pkg/tcpip"
	"gvisor.dev/gvisor/pkg/tcpip/stack"

{{ range .Imports }}
	"{{ . }}"{{ end }}
)

// {{ .Handler }} binds a pair of zircon fifos to the entry rings that track
// buffers as they move between the netstack and the device driver.
type {{ .Handler }} struct {
	// TxDepth and RxDepth are the depths of the TX and RX fifos.
	TxDepth, RxDepth uint32
	RxFifo, TxFifo   zx.Handle
	rx               {{ .Entries }}
	tx               struct {
		mu struct {
			sync.Mutex
			waiters int

			entries {{ .Entries }}

			// detached signals to incoming writes that the receiver is unable
			// to service them.
			detached bool
		}
		cond sync.Cond
	}
	Stats struct {
		Rx fifo.RxStats
		Tx fifo.TxStats
	}
}

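// The three loops below are expected to run concurrently for the lifetime of
// the device. A minimal wiring sketch (illustrative only; the goroutine and
// callback plumbing shown here is an assumption, not part of this package,
// and wasSent/process are placeholders for device-specific callbacks):
//
//	var h {{ .Handler }}
//	// ... populate fifos and depths, call InitRx/InitTx, push initial entries ...
//	go func() { _ = h.TxReceiverLoop(wasSent) }()
//	go func() { _ = h.TxSenderLoop() }()
//	go func() { _ = h.RxLoop(process{{ if .Signal }}, signal, onSignal{{ end }}) }()
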
// TxReceiverLoop reads completed TX entries from the TX fifo, reports each to
// wasSent (counting entries for which wasSent returns false as drops), and
// returns the buffers to the readied pool for reuse by blocked writers.
//
// It returns nil once DetachTx has been called.
func (h *{{ .Handler }}) TxReceiverLoop(wasSent func(entry *{{ .EntryType }}) bool) error {
	scratch := make([]{{ .EntryType }}, h.TxDepth)
	for {
		h.tx.mu.Lock()
		detached := h.tx.mu.detached
		h.tx.mu.Unlock()
		if detached {
			return nil
		}

		if err := zxwait.WithRetryContext(context.Background(), func() error {
			status, count := FifoRead(h.TxFifo, scratch)
			if status != zx.ErrOk {
				return &zx.Error{Status: status, Text: "FifoRead(TX)"}
			}
			var notSent uint64
			for i := range scratch[:count] {
				if !wasSent(&scratch[i]) {
					notSent++
				}
			}
			h.Stats.Tx.Drops.IncrementBy(notSent)
			h.Stats.Tx.Reads(count).Increment()
			h.tx.mu.Lock()
			n := h.tx.mu.entries.AddReadied(scratch[:count])
			h.tx.mu.entries.IncrementReadied(uint16(n))
			h.tx.mu.Unlock()
			// Wake any writers blocked in ProcessWrite waiting for readied
			// entries.
			h.tx.cond.Broadcast()

			if n := uint32(n); count != n {
				return fmt.Errorf("FifoRead(TX): tx_depth invariant violation; observed=%d expected=%d", h.TxDepth-n+count, h.TxDepth)
			}
			return nil
		}, h.TxFifo, zx.SignalFIFOReadable, zx.SignalFIFOPeerClosed); err != nil {
			return err
		}
	}
}

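// A sketch of a wasSent callback, assuming a hypothetical Status field on the
// entry type; the real field names depend on {{ .EntryType }}:
//
//	wasSent := func(entry *{{ .EntryType }}) bool {
//		return entry.Status == 0 // hypothetical: zero status means the entry was sent
//	}
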
// TxSenderLoop writes queued TX entries to the TX fifo in batches.
//
// It returns nil once DetachTx has been called.
func (h *{{ .Handler }}) TxSenderLoop() error {
	scratch := make([]{{ .EntryType }}, h.TxDepth)
	for {
		var batch []{{ .EntryType }}
		h.tx.mu.Lock()
		for {
			if h.tx.mu.detached {
				h.tx.mu.Unlock()
				return nil
			}
			if batchSize := len(scratch) - int(h.tx.mu.entries.InFlight()); batchSize != 0 {
				if h.tx.mu.entries.HaveQueued() {
					if h.tx.mu.waiters == 0 || !h.tx.mu.entries.HaveReadied() {
						// No application threads are waiting OR application threads are
						// waiting on the reader.
						//
						// This condition is an optimization; if application threads are
						// waiting when buffers are available then we were probably just
						// woken up by the reader having retrieved buffers from the fifo.
						// We avoid creating a batch until the application threads have
						// all been satisfied, or until the buffers have all been used up.
						batch = scratch[:batchSize]
						break
					}
				}
			}
			h.tx.cond.Wait()
		}
		n := h.tx.mu.entries.GetQueued(batch)
		h.tx.mu.entries.IncrementSent(uint16(n))
		h.tx.mu.Unlock()

		switch status, count := FifoWrite(h.TxFifo, batch[:n]); status {
		case zx.ErrOk:
			if n := uint32(n); count != n {
				return fmt.Errorf("FifoWrite(TX): tx_depth invariant violation; observed=%d expected=%d", h.TxDepth-n+count, h.TxDepth)
			}
			h.Stats.Tx.Writes(count).Increment()
		default:
			return &zx.Error{Status: status, Text: "FifoWrite(TX)"}
		}
	}
}

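// Taken together, the TX functions cycle each entry through the states
// tracked by {{ .Entries }}: ProcessWrite claims readied entries and queues
// them, TxSenderLoop moves queued entries into flight by writing them to the
// fifo, and TxReceiverLoop reads completions back and re-readies them for the
// next write.
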
// RxLoop drives the RX fifo: it writes queued buffers to the fifo for the
// device to fill, passes each readied (filled) buffer to process, and queues
// it again. It returns when the peer closes the fifo or a fifo operation
// fails.
func (h *{{ .Handler }}) RxLoop(process func(entry *{{ .EntryType }}){{ if .Signal }}, signal zx.Signals, onSignal func(){{ end }}) error {
	scratch := make([]{{ .EntryType }}, h.RxDepth)
	for {
		if batchSize := len(scratch) - int(h.rx.InFlight()); batchSize != 0 && h.rx.HaveQueued() {
			n := h.rx.GetQueued(scratch[:batchSize])
			h.rx.IncrementSent(uint16(n))

			status, count := FifoWrite(h.RxFifo, scratch[:n])
			switch status {
			case zx.ErrOk:
				h.Stats.Rx.Writes(count).Increment()
				if n := uint32(n); count != n {
					return fmt.Errorf("FifoWrite(RX): rx_depth invariant violation; observed=%d expected=%d", h.RxDepth-n+count, h.RxDepth)
				}
			default:
				return &zx.Error{Status: status, Text: "FifoWrite(RX)"}
			}
		}

		for h.rx.HaveReadied() {
			entry := h.rx.GetReadied()
			process(entry)
			h.rx.IncrementQueued(1)
		}

		for {
			signals := zx.Signals(zx.SignalFIFOReadable | zx.SignalFIFOPeerClosed{{ if .Signal }} | signal{{ end }})
			// Only wait for writability if there are queued entries to write.
			if int(h.rx.InFlight()) != len(scratch) && h.rx.HaveQueued() {
				signals |= zx.SignalFIFOWritable
			}
			obs, err := zxwait.WaitContext(context.Background(), h.RxFifo, signals)
			if err != nil {
				return err
			}
{{ if .Signal }}
			if obs&signal != 0 {
				onSignal()
			}
{{ end }}
			if obs&zx.SignalFIFOPeerClosed != 0 {
				return fmt.Errorf("FifoRead(RX): peer closed")
			}
			if obs&zx.SignalFIFOReadable != 0 {
				switch status, count := FifoRead(h.RxFifo, scratch); status {
				case zx.ErrOk:
					h.Stats.Rx.Reads(count).Increment()
					n := h.rx.AddReadied(scratch[:count])
					h.rx.IncrementReadied(uint16(n))

					if n := uint32(n); count != n {
						return fmt.Errorf("FifoRead(RX): rx_depth invariant violation; observed=%d expected=%d", h.RxDepth-n+count, h.RxDepth)
					}
				default:
					return &zx.Error{Status: status, Text: "FifoRead(RX)"}
				}
				break
			}
			if obs&zx.SignalFIFOWritable != 0 {
				break
			}
		}
	}
}

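// A sketch of an RX process callback that hands the filled buffer to higher
// layers; deliverPacket is a hypothetical helper, and the buffer accessors
// depend on {{ .EntryType }}:
//
//	process := func(entry *{{ .EntryType }}) {
//		deliverPacket(entry) // hypothetical: hand the filled buffer up the stack
//	}
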
// ProcessWrite fills one TX entry per packet in pkts via processor, blocking
// until readied entries become available. It returns the number of packets
// queued, and a non-nil error if the handler was detached mid-write.
func (h *{{ .Handler }}) ProcessWrite(pkts stack.PacketBufferList, processor func(*{{ .EntryType }}, *stack.PacketBuffer)) (int, tcpip.Error) {
	i := 0

	for pkt := pkts.Front(); pkt != nil; {
		h.tx.mu.Lock()
		for {
			if h.tx.mu.detached {
				h.tx.mu.Unlock()
				return i, &tcpip.ErrClosedForSend{}
			}

			if h.tx.mu.entries.HaveReadied() {
				break
			}

			h.tx.mu.waiters++
			h.tx.cond.Wait()
			h.tx.mu.waiters--
		}

		// Queue as many remaining packets as possible; if we run out of space,
		// we'll return to the waiting state in the outer loop.
		for {
			entry := h.tx.mu.entries.GetReadied()
			processor(entry, pkt)
			h.tx.mu.entries.IncrementQueued(1)

			i++
			pkt = pkt.Next()
			if pkt == nil || !h.tx.mu.entries.HaveReadied() {
				break
			}
		}
		h.tx.mu.Unlock()
		// Wake the sender loop; entries were queued above.
		h.tx.cond.Broadcast()
	}

	return i, nil
}

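// A sketch of a ProcessWrite call, assuming a hypothetical fillEntry helper
// that serializes the packet's bytes into the buffer the entry describes; the
// copy mechanics depend on {{ .EntryType }}:
//
//	n, err := h.ProcessWrite(pkts, func(entry *{{ .EntryType }}, pkt *stack.PacketBuffer) {
//		fillEntry(entry, pkt) // hypothetical: copy pkt into entry's buffer
//	})
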
// DetachTx marks the TX direction closed and wakes all goroutines blocked on
// the TX condition variable so they can observe the detachment.
func (h *{{ .Handler }}) DetachTx() {
	h.tx.mu.Lock()
	h.tx.mu.detached = true
	h.tx.mu.Unlock()
	h.tx.cond.Broadcast()
}

// InitRx initializes the RX entries ring with the given capacity.
func (h *{{ .Handler }}) InitRx(capacity uint16) uint16 {
	return h.rx.Init(capacity)
}

// InitTx initializes the TX entries ring with the given capacity and binds
// the condition variable to the TX mutex.
func (h *{{ .Handler }}) InitTx(capacity uint16) uint16 {
	h.tx.cond.L = &h.tx.mu.Mutex
	return h.tx.mu.entries.Init(capacity)
}

// PushInitialRx seeds the RX ring with an entry that is immediately eligible
// to be written to the RX fifo.
func (h *{{ .Handler }}) PushInitialRx(entry {{ .EntryType }}) {
	h.rx.storage[h.rx.readied] = entry
	h.rx.IncrementReadied(1)
	h.rx.IncrementQueued(1)
}

// PushInitialTx seeds the TX ring with an entry that is immediately available
// to writers.
func (h *{{ .Handler }}) PushInitialTx(entry {{ .EntryType }}) {
	h.tx.mu.entries.storage[h.tx.mu.entries.readied] = entry
	h.tx.mu.entries.IncrementReadied(1)
}

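// PushInitialRx and PushInitialTx touch ring internals without holding the TX
// mutex, so initialization is expected to happen before any of the loops are
// started, in this order (a sketch; the depth and entry values are
// placeholders):
//
//	h.InitRx(rxDepth)
//	h.InitTx(txDepth)
//	for _, e := range rxEntries {
//		h.PushInitialRx(e)
//	}
//	for _, e := range txEntries {
//		h.PushInitialTx(e)
//	}
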
// FifoWrite writes up to len(b) entries to the fifo, returning the status and
// the number of entries written.
func FifoWrite(handle zx.Handle, b []{{ .EntryType }}) (zx.Status, uint32) {
	var actual uint
	var _x {{ .EntryType }}
	// Extract the slice's backing-array pointer to hand to the syscall.
	data := unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&b)).Data)
	status := zx.Sys_fifo_write(handle, uint(unsafe.Sizeof(_x)), data, uint(len(b)), &actual)
	return status, uint32(actual)
}

// FifoRead reads up to len(b) entries from the fifo into b, returning the
// status and the number of entries read.
func FifoRead(handle zx.Handle, b []{{ .EntryType }}) (zx.Status, uint32) {
	var actual uint
	var _x {{ .EntryType }}
	data := unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&b)).Data)
	status := zx.Sys_fifo_read(handle, uint(unsafe.Sizeof(_x)), data, uint(len(b)), &actual)
	return status, uint32(actual)
}
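
// On Go 1.20 and later, the same backing-array pointer can be obtained
// without reflect via unsafe.SliceData (a sketch; this file predates that
// API):
//
//	data := unsafe.Pointer(unsafe.SliceData(b))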