latency: sleep in Write when BDP is exceeded to avoid buffer bloat (#1330)

diff --git a/benchmark/latency/latency.go b/benchmark/latency/latency.go
index fc70695..66a8a0d 100644
--- a/benchmark/latency/latency.go
+++ b/benchmark/latency/latency.go
@@ -105,6 +105,13 @@
 		} else {
 			p = nil
 		}
+		if c.network.Kbps > 0 {
+			if congestion := c.lastSendEnd.Sub(tNow) - c.delay; congestion > 0 {
+				// The network is full; sleep until this packet can be sent.
+				sleep(congestion)
+				tNow = tNow.Add(congestion)
+			}
+		}
 		c.lastSendEnd = c.lastSendEnd.Add(c.network.pktTime(len(pkt)))
 		hdr := header{ReadTime: c.lastSendEnd.Add(c.delay).UnixNano(), Sz: int32(len(pkt))}
 		if err := binary.Write(c.Conn, binary.BigEndian, hdr); err != nil {
diff --git a/benchmark/latency/latency_test.go b/benchmark/latency/latency_test.go
index e09d6a8..2298b13 100644
--- a/benchmark/latency/latency_test.go
+++ b/benchmark/latency/latency_test.go
@@ -20,6 +20,7 @@
 
 import (
 	"bytes"
+	"fmt"
 	"net"
 	"reflect"
 	"sync"
@@ -65,7 +66,9 @@
 		sleepTimes = nil
 	}
 
-	latency := 10 * time.Millisecond
+	// Use a fairly high latency to cause a large BDP and avoid sleeps while
+	// writing due to simulation of full buffers.
+	latency := 1 * time.Second
 	c, err := (&Network{Kbps: 1, Latency: latency, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
 	if err != nil {
 		t.Fatalf("Unexpected error creating connection: %v", err)
@@ -175,7 +178,9 @@
 		return tn
 	}
 
-	n := &Network{Kbps: 2, Latency: 20 * time.Millisecond, MTU: 10}
+	// Use a fairly high latency to cause a large BDP and avoid sleeps while
+	// writing due to simulation of full buffers.
+	n := &Network{Kbps: 2, Latency: 1 * time.Second, MTU: 10}
 	// 2 kbps = .25 kBps = 256 Bps
 	byteLatency := func(n int) time.Duration {
 		return time.Duration(n) * time.Second / 256
@@ -286,3 +291,63 @@
 	sleep(10 * time.Second)
 	read(clientConn, len(pkt1), pkt1, tn)
 }
+
+func TestBufferBloat(t *testing.T) {
+	defer restoreHooks()()
+
+	// Infinitely fast CPU: time doesn't pass unless sleep is called.
+	tn := time.Unix(123, 0)
+	now = func() time.Time { return tn }
+	// Capture sleep times for checking later.
+	var sleepTimes []time.Duration
+	sleep = func(d time.Duration) {
+		sleepTimes = append(sleepTimes, d)
+		tn = tn.Add(d)
+	}
+
+	wantSleeps := func(want ...time.Duration) error {
+		if !reflect.DeepEqual(want, sleepTimes) {
+			return fmt.Errorf("sleepTimes = %v; want %v", sleepTimes, want)
+		}
+		sleepTimes = nil
+		return nil
+	}
+
+	n := &Network{Kbps: 8 /* 1KBps */, Latency: time.Second, MTU: 8}
+	bdpBytes := (n.Kbps * 1024 / 8) * int(n.Latency/time.Second) // 1024
+	c, err := n.Conn(bufConn{&bytes.Buffer{}})
+	if err != nil {
+		t.Fatalf("Unexpected error creating connection: %v", err)
+	}
+	wantSleeps(n.Latency) // Connection creation delay.
+
+	write := func(n int, sleeps ...time.Duration) {
+		if wt, err := c.Write(make([]byte, n)); err != nil || wt != n {
+			t.Fatalf("c.Write(<%v bytes>) = %v, %v; want %v, nil", n, wt, err, n)
+		}
+		if err := wantSleeps(sleeps...); err != nil {
+			t.Fatalf("After writing %v bytes: %v", n, err)
+		}
+	}
+
+	read := func(n int, sleeps ...time.Duration) {
+		if rd, err := c.Read(make([]byte, n)); err != nil || rd != n {
+			t.Fatalf("c.Read(_) = %v, %v; want %v, nil", rd, err, n)
+		}
+		if err := wantSleeps(sleeps...); err != nil {
+			t.Fatalf("After reading %v bytes: %v", n, err)
+		}
+	}
+
+	write(8) // No reads and buffer not full, so no sleeps yet.
+	read(8, time.Second+n.pktTime(8))
+
+	write(bdpBytes)            // Fill the buffer.
+	write(1)                   // We can send one extra packet even when the buffer is full.
+	write(n.MTU, n.pktTime(1)) // Make sure we sleep to clear the previous write.
+	write(1, n.pktTime(n.MTU))
+	write(n.MTU+1, n.pktTime(1), n.pktTime(n.MTU))
+
+	tn = tn.Add(10 * time.Second) // Wait long enough for the buffer to clear.
+	write(bdpBytes)               // No sleeps required.
+}