latency: sleep in Write when BDP is exceeded to avoid buffer bloat (#1330)
diff --git a/benchmark/latency/latency.go b/benchmark/latency/latency.go
index fc70695..66a8a0d 100644
--- a/benchmark/latency/latency.go
+++ b/benchmark/latency/latency.go
@@ -105,6 +105,13 @@
} else {
p = nil
}
+ if c.network.Kbps > 0 {
+ if congestion := c.lastSendEnd.Sub(tNow) - c.delay; congestion > 0 {
+ // The network is full; sleep until this packet can be sent.
+ sleep(congestion)
+ tNow = tNow.Add(congestion)
+ }
+ }
c.lastSendEnd = c.lastSendEnd.Add(c.network.pktTime(len(pkt)))
hdr := header{ReadTime: c.lastSendEnd.Add(c.delay).UnixNano(), Sz: int32(len(pkt))}
if err := binary.Write(c.Conn, binary.BigEndian, hdr); err != nil {
diff --git a/benchmark/latency/latency_test.go b/benchmark/latency/latency_test.go
index e09d6a8..2298b13 100644
--- a/benchmark/latency/latency_test.go
+++ b/benchmark/latency/latency_test.go
@@ -20,6 +20,7 @@
import (
"bytes"
+ "fmt"
"net"
"reflect"
"sync"
@@ -65,7 +66,9 @@
sleepTimes = nil
}
- latency := 10 * time.Millisecond
+ // Use a fairly high latency to cause a large BDP and avoid sleeps while
+ // writing due to simulation of full buffers.
+ latency := 1 * time.Second
c, err := (&Network{Kbps: 1, Latency: latency, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
if err != nil {
t.Fatalf("Unexpected error creating connection: %v", err)
@@ -175,7 +178,9 @@
return tn
}
- n := &Network{Kbps: 2, Latency: 20 * time.Millisecond, MTU: 10}
+ // Use a fairly high latency to cause a large BDP and avoid sleeps while
+ // writing due to simulation of full buffers.
+ n := &Network{Kbps: 2, Latency: 1 * time.Second, MTU: 10}
// 2 kbps = .25 kBps = 256 Bps
byteLatency := func(n int) time.Duration {
return time.Duration(n) * time.Second / 256
@@ -286,3 +291,63 @@
sleep(10 * time.Second)
read(clientConn, len(pkt1), pkt1, tn)
}
+
+func TestBufferBloat(t *testing.T) {
+ defer restoreHooks()()
+
+ // Infinitely fast CPU: time doesn't pass unless sleep is called.
+ tn := time.Unix(123, 0)
+ now = func() time.Time { return tn }
+ // Capture sleep times for checking later.
+ var sleepTimes []time.Duration
+ sleep = func(d time.Duration) {
+ sleepTimes = append(sleepTimes, d)
+ tn = tn.Add(d)
+ }
+
+ wantSleeps := func(want ...time.Duration) error {
+ if !reflect.DeepEqual(want, sleepTimes) {
+ return fmt.Errorf("sleepTimes = %v; want %v", sleepTimes, want)
+ }
+ sleepTimes = nil
+ return nil
+ }
+
+ n := &Network{Kbps: 8 /* 1KBps */, Latency: time.Second, MTU: 8}
+ bdpBytes := (n.Kbps * 1024 / 8) * int(n.Latency/time.Second) // 1024
+ c, err := n.Conn(bufConn{&bytes.Buffer{}})
+ if err != nil {
+ t.Fatalf("Unexpected error creating connection: %v", err)
+ }
+ wantSleeps(n.Latency) // Connection creation delay.
+
+ write := func(n int, sleeps ...time.Duration) {
+ if wt, err := c.Write(make([]byte, n)); err != nil || wt != n {
+ t.Fatalf("c.Write(<%v bytes>) = %v, %v; want %v, nil", n, wt, err, n)
+ }
+ if err := wantSleeps(sleeps...); err != nil {
+ t.Fatalf("After writing %v bytes: %v", n, err)
+ }
+ }
+
+ read := func(n int, sleeps ...time.Duration) {
+ if rd, err := c.Read(make([]byte, n)); err != nil || rd != n {
+ t.Fatalf("c.Read(_) = %v, %v; want %v, nil", rd, err, n)
+ }
+ if err := wantSleeps(sleeps...); err != nil {
+ t.Fatalf("After reading %v bytes: %v", n, err)
+ }
+ }
+
+ write(8) // No reads and buffer not full, so no sleeps yet.
+ read(8, time.Second+n.pktTime(8))
+
+ write(bdpBytes) // Fill the buffer.
+ write(1) // We can send one extra packet even when the buffer is full.
+ write(n.MTU, n.pktTime(1)) // Make sure we sleep to clear the previous write.
+ write(1, n.pktTime(n.MTU))
+ write(n.MTU+1, n.pktTime(1), n.pktTime(n.MTU))
+
+ tn = tn.Add(10 * time.Second) // Wait long enough for the buffer to clear.
+ write(bdpBytes) // No sleeps required.
+}