blob: 4efdad71b908c35b0287e8630470d48b3b096492 [file] [log] [blame]
// Copyright 2018 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build linux
package sharedmem
import (
"bytes"
"io/ioutil"
"math/rand"
"os"
"strings"
"sync"
"syscall"
"testing"
"time"
"github.com/google/netstack/tcpip"
"github.com/google/netstack/tcpip/buffer"
"github.com/google/netstack/tcpip/header"
"github.com/google/netstack/tcpip/link/sharedmem/pipe"
"github.com/google/netstack/tcpip/link/sharedmem/queue"
"github.com/google/netstack/tcpip/stack"
)
const (
// localLinkAddr is the link address the tests pass to newTestContext for
// the endpoint under test.
localLinkAddr = "\xde\xad\xbe\xef\x56\x78"
// remoteLinkAddr is the remote link address the tests put in their routes.
remoteLinkAddr = "\xde\xad\xbe\xef\x12\x34"
// queueDataSize is the size, in bytes, requested for each queue's data file.
queueDataSize = 1024 * 1024
// queuePipeSize is the size, in bytes, requested for each tx and rx pipe
// file.
queuePipeSize = 4096
)
// queueBuffers holds the test's own buffers for one queue, as filled in by
// initQueue. The field types are deliberately crossed: the tests push into rx
// (a pipe.Tx) and pull from tx (a pipe.Rx).
type queueBuffers struct {
// data is the buffer obtained from the queue's DataFD.
data []byte
// rx is initialised from the queue's RxPipeFD; tests push completions into
// it.
rx pipe.Tx
// tx is initialised from the queue's TxPipeFD; tests pull posted
// descriptors from it.
tx pipe.Rx
}
// initQueue obtains the buffers behind the tx pipe, rx pipe and data file
// descriptors in c and stores them in q. The test is aborted if any of the
// three buffers cannot be obtained.
func initQueue(t *testing.T, q *queueBuffers, c *QueueConfig) {
	// mustGet returns the buffer backing fd, failing the test on error.
	mustGet := func(fd int) []byte {
		mem, err := getBuffer(fd)
		if err != nil {
			t.Fatalf("getBuffer failed: %v", err)
		}
		return mem
	}

	// The tx pipe is set up first, then the rx pipe, then the data slice.
	q.tx.Init(mustGet(c.TxPipeFD))
	q.rx.Init(mustGet(c.RxPipeFD))
	q.data = mustGet(c.DataFD)
}
// cleanup unmaps the tx pipe, rx pipe and data buffers held by q, in that
// order. Errors returned by munmap are ignored.
func (q *queueBuffers) cleanup() {
	txMem := q.tx.Bytes()
	_ = syscall.Munmap(txMem)

	rxMem := q.rx.Bytes()
	_ = syscall.Munmap(rxMem)

	_ = syscall.Munmap(q.data)
}
// packetInfo records one packet handed to testContext.DeliverNetworkPacket.
type packetInfo struct {
// addr is the remote link address reported with the packet.
addr tcpip.LinkAddress
// proto is the network protocol number reported with the packet.
proto tcpip.NetworkProtocolNumber
// vv is a clone of the delivered payload.
vv buffer.VectorisedView
}
// testContext bundles an endpoint under test with the test's own view of its
// queues and with the packets that have been delivered to the context.
type testContext struct {
// t is the test that owns this context; failures are reported through it.
t *testing.T
// ep is the endpoint under test.
ep *endpoint
// txCfg and rxCfg hold the file descriptors passed to New for each queue.
txCfg QueueConfig
rxCfg QueueConfig
// txq and rxq are the test's own buffers for those queues.
txq queueBuffers
rxq queueBuffers
// packetCh receives one element per call to DeliverNetworkPacket.
packetCh chan struct{}
// mu protects packets.
mu sync.Mutex
// packets accumulates the packets passed to DeliverNetworkPacket.
packets []packetInfo
}
// newTestContext creates the files backing a tx and an rx queue, sets up the
// test's own buffers for them, creates an endpoint on top of the queues with
// the given mtu, buffer size and link address, and attaches the returned
// context to that endpoint. Any failure aborts the test.
func newTestContext(t *testing.T, mtu, bufferSize uint32, addr tcpip.LinkAddress) *testContext {
	// Both queues are backed by identically sized files.
	sizes := queueSizes{
		dataSize:       queueDataSize,
		txPipeSize:     queuePipeSize,
		rxPipeSize:     queuePipeSize,
		sharedDataSize: 4096,
	}

	tc := &testContext{
		t:        t,
		packetCh: make(chan struct{}, 1000000),
	}
	tc.txCfg = createQueueFDs(t, sizes)
	tc.rxCfg = createQueueFDs(t, sizes)
	initQueue(t, &tc.txq, &tc.txCfg)
	initQueue(t, &tc.rxq, &tc.rxCfg)

	var err error
	id, err := New(mtu, bufferSize, addr, tc.txCfg, tc.rxCfg)
	if err != nil {
		t.Fatalf("New failed: %v", err)
	}

	ep := stack.FindLinkEndpoint(id).(*endpoint)
	tc.ep = ep
	ep.Attach(tc)
	return tc
}
// DeliverNetworkPacket records a delivered packet: it stores the remote link
// address, the protocol number and a clone of the payload in c.packets, then
// sends one element on c.packetCh.
func (c *testContext) DeliverNetworkPacket(_ stack.LinkEndpoint, remoteLinkAddr, localLinkAddr tcpip.LinkAddress, proto tcpip.NetworkProtocolNumber, vv buffer.VectorisedView) {
	pkt := packetInfo{
		addr:  remoteLinkAddr,
		proto: proto,
		vv:    vv.Clone(nil),
	}

	c.mu.Lock()
	c.packets = append(c.packets, pkt)
	c.mu.Unlock()

	// The notification is sent after the mutex has been released.
	c.packetCh <- struct{}{}
}
// cleanup closes the endpoint, then closes the file descriptors of both
// queues, then releases the test's own buffers for both queues.
func (c *testContext) cleanup() {
	c.ep.Close()

	for _, cfg := range []*QueueConfig{&c.txCfg, &c.rxCfg} {
		closeFDs(cfg)
	}
	for _, q := range []*queueBuffers{&c.txq, &c.rxq} {
		q.cleanup()
	}
}
// waitForPackets blocks until n notifications have been received on
// c.packetCh. If the channel to fires first, the test is failed with
// errorStr.
//
// The same timeout channel is shared by all n waits, so it bounds the total
// time spent here rather than the time per packet.
func (c *testContext) waitForPackets(n int, to <-chan time.Time, errorStr string) {
	// Report failures at the caller's line rather than inside this helper.
	c.t.Helper()

	for i := 0; i < n; i++ {
		select {
		case <-c.packetCh:
		case <-to:
			// errorStr is a complete message, not a format string, so it
			// must not go through Fatalf: any '%' in it would be
			// misinterpreted.
			c.t.Fatal(errorStr)
		}
	}
}
// pushRxCompletion pushes onto the rx queue's completion pipe a completion of
// the given size that refers to the buffers in bs. It does not flush the
// pipe; callers do that themselves.
func (c *testContext) pushRxCompletion(size uint32, bs []queue.RxBuffer) {
	count := len(bs)
	slot := c.rxq.rx.Push(queue.RxCompletionSize(count))

	// NOTE(review): the trailing 0 is presumably a reserved field — confirm
	// against queue.EncodeRxCompletion.
	queue.EncodeRxCompletion(slot, size, 0)

	for i, rb := range bs {
		entry := queue.RxBuffer{
			Offset: rb.Offset,
			Size:   rb.Size,
			ID:     rb.ID,
		}
		queue.EncodeRxCompletionBuffer(slot, i, entry)
	}
}
// randomFill overwrites every byte of b with a pseudo-random value drawn from
// the math/rand default source. A nil or empty slice is left untouched.
func randomFill(b []byte) {
	for i := 0; i < len(b); i++ {
		b[i] = byte(rand.Intn(256))
	}
}
// shuffle permutes b in place. It walks from the last position down to the
// first, swapping each position with a pseudo-randomly chosen position at or
// before it.
func shuffle(b []int) {
	for remaining := len(b); remaining > 0; remaining-- {
		last := remaining - 1
		pick := rand.Intn(remaining)
		b[last], b[pick] = b[pick], b[last]
	}
}
// createFile creates an unlinked temporary file of the given size and returns
// a duplicate of its file descriptor; the caller owns that descriptor. The
// file is created in $TEST_TMPDIR, or in $TMPDIR when the former is unset.
//
// When initQueue is true, eight bytes carrying the "slot-free" flag are
// written at offset 0 before the file is resized. Any failure aborts the
// test.
func createFile(t *testing.T, size int64, initQueue bool) int {
	dir := os.Getenv("TEST_TMPDIR")
	if len(dir) == 0 {
		dir = os.Getenv("TMPDIR")
	}

	tmp, err := ioutil.TempFile(dir, "sharedmem_test")
	if err != nil {
		t.Fatalf("TempFile failed: %v", err)
	}
	// Only the duplicated descriptor outlives this function.
	defer tmp.Close()

	// Remove the name right away; the open descriptors keep the file alive.
	syscall.Unlink(tmp.Name())

	if initQueue {
		// Write the "slot-free" flag in the initial queue.
		slotFree := []byte{0, 0, 0, 0, 0, 0, 0, 0x80}
		if _, err := tmp.WriteAt(slotFree, 0); err != nil {
			t.Fatalf("WriteAt failed: %v", err)
		}
	}

	dupFD, err := syscall.Dup(int(tmp.Fd()))
	if err != nil {
		t.Fatalf("Dup failed: %v", err)
	}

	if err := syscall.Ftruncate(dupFD, size); err != nil {
		syscall.Close(dupFD)
		t.Fatalf("Ftruncate failed: %v", err)
	}
	return dupFD
}
// closeFDs closes the five file descriptors held by c: data, event, tx pipe,
// rx pipe and shared data, in that order. Errors from close are ignored.
func closeFDs(c *QueueConfig) {
	fds := []int{
		c.DataFD,
		c.EventFD,
		c.TxPipeFD,
		c.RxPipeFD,
		c.SharedDataFD,
	}
	for _, fd := range fds {
		syscall.Close(fd)
	}
}
// queueSizes lists the sizes, in bytes, of the files that createQueueFDs
// creates for one queue.
type queueSizes struct {
	// dataSize is the size of the data file.
	dataSize int64
	// txPipeSize and rxPipeSize are the sizes of the tx and rx pipe files.
	txPipeSize, rxPipeSize int64
	// sharedDataSize is the size of the shared data file.
	sharedDataSize int64
}
// createQueueFDs creates an eventfd plus the four files described by s and
// returns them as a QueueConfig. The two pipe files are created with the
// initial "slot-free" flag; the data and shared data files are not. Any
// failure aborts the test.
func createQueueFDs(t *testing.T, s queueSizes) QueueConfig {
	efd, _, errno := syscall.RawSyscall(syscall.SYS_EVENTFD2, 0, 0, 0)
	if errno != 0 {
		t.Fatalf("eventfd failed: %v", error(errno))
	}

	var cfg QueueConfig
	cfg.EventFD = int(efd)
	cfg.DataFD = createFile(t, s.dataSize, false)
	cfg.TxPipeFD = createFile(t, s.txPipeSize, true)
	cfg.RxPipeFD = createFile(t, s.rxPipeSize, true)
	cfg.SharedDataFD = createFile(t, s.sharedDataSize, false)
	return cfg
}
// TestSimpleSend sends 1000 packets with random header and payload sizes,
// then checks that the right payload is received on the shared memory queues.
func TestSimpleSend(t *testing.T) {
	c := newTestContext(t, 20000, 1500, localLinkAddr)
	defer c.cleanup()

	// Every packet is sent over the same route.
	route := stack.Route{RemoteLinkAddress: remoteLinkAddr}

	// sendAndVerify writes one random packet, reads it back from the tx queue
	// and checks it. It is a function of its own so that the deferred
	// completion runs once per packet.
	sendAndVerify := func() {
		// Build a packet from a random header and a random payload.
		hdrLen := rand.Intn(10000)
		hdr := buffer.NewPrependable(hdrLen + int(c.ep.MaxHeaderLength()))
		hdrBuf := hdr.Prepend(hdrLen)
		randomFill(hdrBuf)

		payload := buffer.NewView(rand.Intn(10000))
		randomFill(payload)

		proto := tcpip.NetworkProtocolNumber(rand.Intn(0x10000))
		if err := c.ep.WritePacket(&route, hdr, payload.ToVectorisedView(), proto); err != nil {
			t.Fatalf("WritePacket failed: %v", err)
		}

		// Read the descriptor back and gather the bytes of every buffer it
		// references.
		desc := c.txq.tx.Pull()
		pi := queue.DecodeTxPacketHeader(desc)
		if pi.Reserved != 0 {
			t.Fatalf("Reserved value is non-zero: 0x%x", pi.Reserved)
		}

		contents := make([]byte, 0, pi.Size)
		for idx := 0; idx < pi.BufferCount; idx++ {
			bi := queue.DecodeTxBufferHeader(desc, idx)
			contents = append(contents, c.txq.data[bi.Offset:][:bi.Size]...)
		}
		c.txq.tx.Flush()

		// Whatever the checks below conclude, tell the endpoint that the
		// write has completed.
		defer func() {
			b := c.txq.rx.Push(8)
			queue.EncodeTxCompletion(b, pi.ID)
			c.txq.rx.Flush()
		}()

		// The packet must start with the ethernet header for this route and
		// protocol.
		wantEth := make(header.Ethernet, header.EthernetMinimumSize)
		wantEth.Encode(&header.EthernetFields{
			SrcAddr: localLinkAddr,
			DstAddr: remoteLinkAddr,
			Type:    proto,
		})
		if got := contents[:header.EthernetMinimumSize]; !bytes.Equal(got, []byte(wantEth)) {
			t.Fatalf("Bad ethernet header in packet: got %x, want %x", got, wantEth)
		}

		// What follows the ethernet header must be the header bytes followed
		// by the payload bytes.
		merged := append(hdrBuf, payload...)
		if uint32(len(contents)) < pi.Size {
			t.Fatalf("Sum of buffers is less than packet size: %v < %v", len(contents), pi.Size)
		}
		contents = contents[:pi.Size][header.EthernetMinimumSize:]
		if !bytes.Equal(contents, merged) {
			t.Fatalf("Buffers are different: got %x (%v bytes), want %x (%v bytes)", contents, len(contents), merged, len(merged))
		}
	}

	for sent := 0; sent < 1000; sent++ {
		sendAndVerify()
	}
}
// TestPreserveSrcAddressInSend calls WritePacket once with LocalLinkAddress
// set in Route (using much of the same code as TestSimpleSend), then checks
// that the encoded ethernet header received includes the correct SrcAddr.
func TestPreserveSrcAddressInSend(t *testing.T) {
	c := newTestContext(t, 20000, 1500, localLinkAddr)
	defer c.cleanup()

	// A six-byte link address made of 0xFE bytes. It differs from
	// localLinkAddr, so a match below proves that the address came from the
	// route. The escape must be "\xfe": the literal "0xFE" is four ASCII
	// characters and would yield a 24-character address.
	newLocalLinkAddress := tcpip.LinkAddress(strings.Repeat("\xfe", 6))

	// Set both remote and local link address in route.
	r := stack.Route{
		RemoteLinkAddress: remoteLinkAddr,
		LocalLinkAddress:  newLocalLinkAddress,
	}

	// WritePacket panics given a prependable with anything less than
	// the minimum size of the ethernet header.
	hdr := buffer.NewPrependable(header.EthernetMinimumSize)

	proto := tcpip.NetworkProtocolNumber(rand.Intn(0x10000))
	if err := c.ep.WritePacket(&r, hdr, buffer.VectorisedView{}, proto); err != nil {
		t.Fatalf("WritePacket failed: %v", err)
	}

	// Receive packet.
	desc := c.txq.tx.Pull()
	pi := queue.DecodeTxPacketHeader(desc)
	if pi.Reserved != 0 {
		t.Fatalf("Reserved value is non-zero: 0x%x", pi.Reserved)
	}
	contents := make([]byte, 0, pi.Size)
	for i := 0; i < pi.BufferCount; i++ {
		bi := queue.DecodeTxBufferHeader(desc, i)
		contents = append(contents, c.txq.data[bi.Offset:][:bi.Size]...)
	}
	c.txq.tx.Flush()

	defer func() {
		// Tell the endpoint about the completion of the write.
		b := c.txq.rx.Push(8)
		queue.EncodeTxCompletion(b, pi.ID)
		c.txq.rx.Flush()
	}()

	// Check that the ethernet header contains the expected SrcAddr.
	ethTemplate := make(header.Ethernet, header.EthernetMinimumSize)
	ethTemplate.Encode(&header.EthernetFields{
		SrcAddr: newLocalLinkAddress,
		DstAddr: remoteLinkAddr,
		Type:    proto,
	})
	if got := contents[:header.EthernetMinimumSize]; !bytes.Equal(got, []byte(ethTemplate)) {
		t.Fatalf("Bad ethernet header in packet: got %x, want %x", got, ethTemplate)
	}
}
// TestFillTxQueue sends packets until the queue is full.
func TestFillTxQueue(t *testing.T) {
	c := newTestContext(t, 20000, 1500, localLinkAddr)
	defer c.cleanup()

	// Every write uses the same route and the same 100-byte payload.
	route := stack.Route{RemoteLinkAddress: remoteLinkAddr}
	payload := buffer.NewView(100)

	// Each packet uses no more than 40 bytes of the pipe, so writing
	// queuePipeSize/40 packets fills the tx queue. The descriptors are pulled
	// but never flushed, so their space is not handed back.
	seen := make(map[uint64]struct{})
	for sent := 0; sent < queuePipeSize/40; sent++ {
		hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
		if err := c.ep.WritePacket(&route, hdr, payload.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
			t.Fatalf("WritePacket failed unexpectedly: %v", err)
		}

		// Every packet in flight must carry its own ID.
		pi := queue.DecodeTxPacketHeader(c.txq.tx.Pull())
		if _, dup := seen[pi.ID]; dup {
			t.Fatalf("ID (%v) reused", pi.ID)
		}
		seen[pi.ID] = struct{}{}
	}

	// With the queue full, one more write must be refused.
	hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
	err := c.ep.WritePacket(&route, hdr, payload.ToVectorisedView(), header.IPv4ProtocolNumber)
	if want := tcpip.ErrWouldBlock; err != want {
		t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
	}
}
// TestFillTxQueueAfterBadCompletion sends a bad completion, then sends packets
// until the queue is full. It also completes two real writes twice each, and
// then checks that filling the queue still hands out distinct IDs and still
// ends with ErrWouldBlock.
func TestFillTxQueueAfterBadCompletion(t *testing.T) {
c := newTestContext(t, 20000, 1500, localLinkAddr)
defer c.cleanup()
// Send a bad completion: no packet has been written yet, so ID 1 does not
// correspond to any outstanding write.
queue.EncodeTxCompletion(c.txq.rx.Push(8), 1)
c.txq.rx.Flush()
// Prepare to send a packet.
r := stack.Route{
RemoteLinkAddress: remoteLinkAddr,
}
buf := buffer.NewView(100)
// Send two packets so that the id slice has at least two slots.
for i := 2; i > 0; i-- {
hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
if err := c.ep.WritePacket(&r, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
t.Fatalf("WritePacket failed unexpectedly: %v", err)
}
}
// Complete the two writes twice: each ID is pushed as a completion two
// times, so the second completion of each is a duplicate.
for i := 2; i > 0; i-- {
pi := queue.DecodeTxPacketHeader(c.txq.tx.Pull())
queue.EncodeTxCompletion(c.txq.rx.Push(8), pi.ID)
queue.EncodeTxCompletion(c.txq.rx.Push(8), pi.ID)
c.txq.rx.Flush()
}
c.txq.tx.Flush()
// Each packet uses no more than 40 bytes, so write that many packets
// until the tx queue is full.
ids := make(map[uint64]struct{})
for i := queuePipeSize / 40; i > 0; i-- {
hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
if err := c.ep.WritePacket(&r, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
t.Fatalf("WritePacket failed unexpectedly: %v", err)
}
// Check that they have different IDs.
desc := c.txq.tx.Pull()
pi := queue.DecodeTxPacketHeader(desc)
if _, ok := ids[pi.ID]; ok {
t.Fatalf("ID (%v) reused", pi.ID)
}
ids[pi.ID] = struct{}{}
}
// Next attempt to write must fail.
hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
if want, err := tcpip.ErrWouldBlock, c.ep.WritePacket(&r, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != want {
t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
}
}
// TestFillTxMemory sends packets until we run out of shared memory.
func TestFillTxMemory(t *testing.T) {
	const bufferSize = 1500
	c := newTestContext(t, 20000, bufferSize, localLinkAddr)
	defer c.cleanup()

	// Every write uses the same route and the same 100-byte payload.
	route := stack.Route{RemoteLinkAddress: remoteLinkAddr}
	payload := buffer.NewView(100)

	// Each packet uses up one buffer, so queueDataSize/bufferSize writes
	// fill the data area. The descriptors are flushed as they are pulled, but
	// no completion is ever sent back, so the buffers are not released.
	seen := make(map[uint64]struct{})
	for sent := 0; sent < queueDataSize/bufferSize; sent++ {
		hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
		if err := c.ep.WritePacket(&route, hdr, payload.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
			t.Fatalf("WritePacket failed unexpectedly: %v", err)
		}

		// Every packet in flight must carry its own ID.
		pi := queue.DecodeTxPacketHeader(c.txq.tx.Pull())
		if _, dup := seen[pi.ID]; dup {
			t.Fatalf("ID (%v) reused", pi.ID)
		}
		seen[pi.ID] = struct{}{}
		c.txq.tx.Flush()
	}

	// With the memory exhausted, one more write must be refused.
	hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
	if want, err := tcpip.ErrWouldBlock, c.ep.WritePacket(&route, hdr, payload.ToVectorisedView(), header.IPv4ProtocolNumber); err != want {
		t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
	}
}
// TestFillTxMemoryWithMultiBuffer sends packets until we run out of shared
// memory for a 2-buffer packet, but still with room for a 1-buffer packet.
func TestFillTxMemoryWithMultiBuffer(t *testing.T) {
	const bufferSize = 1500
	c := newTestContext(t, 20000, bufferSize, localLinkAddr)
	defer c.cleanup()

	route := stack.Route{RemoteLinkAddress: remoteLinkAddr}
	small := buffer.NewView(100)

	// Each small packet uses up one buffer, so stop one write short of
	// filling the data area: exactly one buffer is left afterwards.
	for sent := 1; sent < queueDataSize/bufferSize; sent++ {
		hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
		if err := c.ep.WritePacket(&route, hdr, small.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
			t.Fatalf("WritePacket failed unexpectedly: %v", err)
		}

		// Pull the posted descriptor and release its pipe space.
		c.txq.tx.Pull()
		c.txq.tx.Flush()
	}

	// A packet whose payload alone is bufferSize bytes needs two buffers, so
	// writing it must fail.
	bigHdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
	big := buffer.NewView(bufferSize).ToVectorisedView()
	if want, err := tcpip.ErrWouldBlock, c.ep.WritePacket(&route, bigHdr, big, header.IPv4ProtocolNumber); err != want {
		t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
	}

	// The remaining buffer must still be usable by a one-buffer packet.
	smallHdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
	if err := c.ep.WritePacket(&route, smallHdr, small.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
		t.Fatalf("WritePacket failed unexpectedly: %v", err)
	}
}
// pollPull pulls from p until it yields a non-nil buffer and returns that
// buffer. Between attempts it sleeps for 10 milliseconds; if the channel to
// fires during such a sleep, the test is failed with errStr.
func pollPull(t *testing.T, p *pipe.Rx, to <-chan time.Time, errStr string) []byte {
	t.Helper()

	for {
		if b := p.Pull(); b != nil {
			return b
		}

		// Nothing available yet: wait for the next poll or the deadline.
		select {
		case <-to:
			t.Fatal(errStr)
		case <-time.After(10 * time.Millisecond):
		}
	}
}
// TestSimpleReceive completes 1000 different receives with random payload and
// random number of buffers. It checks that the contents match the expected
// values.
func TestSimpleReceive(t *testing.T) {
const bufferSize = 1500
c := newTestContext(t, 20000, bufferSize, localLinkAddr)
defer c.cleanup()
// Check that buffers have been posted. The i-th posted buffer is expected
// to have ID i, offset i*bufferSize and size bufferSize.
limit := c.ep.rx.q.PostedBuffersLimit()
for i := uint64(0); i < limit; i++ {
timeout := time.After(2 * time.Second)
bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for all buffers to be posted"))
if want := i * bufferSize; want != bi.Offset {
t.Fatalf("Bad posted offset: got %v, want %v", bi.Offset, want)
}
if want := i; want != bi.ID {
t.Fatalf("Bad posted ID: got %v, want %v", bi.ID, want)
}
if bufferSize != bi.Size {
t.Fatalf("Bad posted bufferSize: got %v, want %v", bi.Size, bufferSize)
}
}
c.rxq.tx.Flush()
// Create a slice with the indices 0..limit-1.
idx := make([]int, limit)
for i := range idx {
idx[i] = i
}
// Complete random packets 1000 times.
for iters := 1000; iters > 0; iters-- {
// This timeout covers the whole iteration's wait for reposted buffers.
timeout := time.After(2 * time.Second)
// Prepare a random packet: between 1 and 10 randomly chosen buffers, with
// the last one left up to 499 bytes short.
shuffle(idx)
n := 1 + rand.Intn(10)
bufs := make([]queue.RxBuffer, n)
contents := make([]byte, bufferSize*n-rand.Intn(500))
randomFill(contents)
for i := range bufs {
j := idx[i]
bufs[i].Size = bufferSize
bufs[i].Offset = uint64(bufferSize * j)
bufs[i].ID = uint64(j)
// Place the i-th chunk of the packet in the chosen buffer's region of the
// data area.
copy(c.rxq.data[bufs[i].Offset:][:bufferSize], contents[i*bufferSize:])
}
// Push completion, then write to the eventfd.
c.pushRxCompletion(uint32(len(contents)), bufs)
c.rxq.rx.Flush()
syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
// Wait for packet to be received, then check it.
c.waitForPackets(1, time.After(time.Second), "Error waiting for packet")
c.mu.Lock()
rcvd := []byte(c.packets[0].vv.First())
c.packets = c.packets[:0]
c.mu.Unlock()
// The delivered data is compared against the contents with the first
// header.EthernetMinimumSize bytes skipped.
if contents := contents[header.EthernetMinimumSize:]; !bytes.Equal(contents, rcvd) {
t.Fatalf("Unexpected buffer contents: got %x, want %x", rcvd, contents)
}
// Check that buffers have been reposted, in the order they were completed.
for i := range bufs {
bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffers to be reposted"))
if bi != bufs[i] {
t.Fatalf("Unexpected buffer reposted: got %x, want %x", bi, bufs[i])
}
}
c.rxq.tx.Flush()
}
}
// TestRxBuffersReposted tests that rx buffers get reposted after they have been
// completed.
func TestRxBuffersReposted(t *testing.T) {
	const bufferSize = 1500
	c := newTestContext(t, 20000, bufferSize, localLinkAddr)
	defer c.cleanup()

	// flushAndSignal flushes the pushed completions and writes to the rx
	// queue's eventfd.
	flushAndSignal := func() {
		c.rxq.rx.Flush()
		syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
	}

	// Collect every buffer the endpoint posts initially.
	limit := c.ep.rx.q.PostedBuffersLimit()
	posted := make([]queue.RxBuffer, 0, limit)
	for remaining := limit; remaining > 0; remaining-- {
		deadline := time.After(2 * time.Second)
		raw := pollPull(t, &c.rxq.tx, deadline, "Timeout waiting for all buffers")
		posted = append(posted, queue.DecodeRxBufferHeader(raw))
	}
	c.rxq.tx.Flush()

	// Complete the buffers one at a time; each must come straight back.
	for i := range posted {
		deadline := time.After(2 * time.Second)

		c.pushRxCompletion(posted[i].Size, posted[i:i+1])
		flushAndSignal()

		raw := pollPull(t, &c.rxq.tx, deadline, "Timeout waiting for buffer to be reposted")
		if bi := queue.DecodeRxBufferHeader(raw); bi != posted[i] {
			t.Fatalf("Different buffer posted: got %v, want %v", bi, posted[i])
		}
	}
	c.rxq.tx.Flush()

	// Complete the buffers two at a time; both must come back, in order. A
	// trailing unpaired buffer is left alone.
	for first := 0; first+1 < len(posted); first += 2 {
		deadline := time.After(2 * time.Second)

		c.pushRxCompletion(2*bufferSize, posted[first:first+2])
		flushAndSignal()

		for k := first; k < first+2; k++ {
			raw := pollPull(t, &c.rxq.tx, deadline, "Timeout waiting for buffer to be reposted")
			if bi := queue.DecodeRxBufferHeader(raw); bi != posted[k] {
				t.Fatalf("Different buffer posted: got %v, want %v", bi, posted[k])
			}
		}
	}
	c.rxq.tx.Flush()
}
// TestReceivePostingIsFull checks that the endpoint will properly handle the
// case when a received buffer cannot be immediately reposted because it hasn't
// been pulled from the tx pipe yet.
func TestReceivePostingIsFull(t *testing.T) {
const bufferSize = 1500
c := newTestContext(t, 20000, bufferSize, localLinkAddr)
defer c.cleanup()
// Complete first posted buffer before flushing it from the tx pipe.
first := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for first buffer to be posted"))
c.pushRxCompletion(first.Size, []queue.RxBuffer{first})
c.rxq.rx.Flush()
syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
// Check that packet is received.
c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet")
// Complete another buffer. The tx pipe has still not been flushed at this
// point.
second := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for second buffer to be posted"))
c.pushRxCompletion(second.Size, []queue.RxBuffer{second})
c.rxq.rx.Flush()
syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
// Check that no packet is received yet, as the worker is blocked trying
// to repost. The test passes this step only if nothing arrives within
// 500 milliseconds.
select {
case <-time.After(500 * time.Millisecond):
case <-c.packetCh:
t.Fatalf("Unexpected packet received")
}
// Flush tx queue, which will allow the first buffer to be reposted,
// and the second completion to be pulled.
c.rxq.tx.Flush()
syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
// Check that second packet completes.
c.waitForPackets(1, time.After(time.Second), "Timeout waiting for second completed packet")
}
// TestCloseWhileWaitingToPost closes the endpoint while it is waiting to
// repost a buffer. Make sure it backs out.
func TestCloseWhileWaitingToPost(t *testing.T) {
const bufferSize = 1500
c := newTestContext(t, 20000, bufferSize, localLinkAddr)
// cleaned records whether the explicit cleanup below has already run, so
// that the deferred function only cleans up when the test bails out early.
cleaned := false
defer func() {
if !cleaned {
c.cleanup()
}
}()
// Complete first posted buffer before flushing it from the tx pipe.
bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for initial buffer to be posted"))
c.pushRxCompletion(bi.Size, []queue.RxBuffer{bi})
c.rxq.rx.Flush()
syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
// Wait for packet to be indicated.
c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet")
// Cleanup and wait for worker to complete. The tx pipe was never flushed,
// so the test hangs here if the endpoint does not back out on close.
c.cleanup()
cleaned = true
c.ep.Wait()
}