transport: Set buffer pool in tests (#8688)
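This PR explicitly sets the buffer pool on test clients and servers that
are not created through the public gRPC API, and removes the nil-pool
fallback to mem.DefaultBufferPool() from newFramer. With that fallback
gone, non-test code can assume the buffer pool is always present.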
RELEASE NOTES: N/A
diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go
index 6209eb2..5bbb641 100644
--- a/internal/transport/http_util.go
+++ b/internal/transport/http_util.go
@@ -411,12 +411,6 @@
var writeBufferMutex sync.Mutex
func newFramer(conn io.ReadWriter, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32, memPool mem.BufferPool) *framer {
- if memPool == nil {
- // Note that this is only supposed to be nil in tests. Otherwise, stream
- // is always initialized with a BufferPool.
- memPool = mem.DefaultBufferPool()
- }
-
if writeBufferSize < 0 {
writeBufferSize = 0
}
diff --git a/internal/transport/keepalive_test.go b/internal/transport/keepalive_test.go
index 962db8d..51ba4d7 100644
--- a/internal/transport/keepalive_test.go
+++ b/internal/transport/keepalive_test.go
@@ -52,11 +52,15 @@
// of MaxConnectionIdle time.
func (s) TestMaxConnectionIdle(t *testing.T) {
serverConfig := &ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
KeepaliveParams: keepalive.ServerParameters{
MaxConnectionIdle: 30 * time.Millisecond,
},
}
- server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+ copts := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
+ }
+ server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, copts)
defer func() {
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
@@ -95,7 +99,10 @@
MaxConnectionIdle: 100 * time.Millisecond,
},
}
- server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+ copts := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
+ }
+ server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, copts)
defer func() {
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
@@ -125,12 +132,16 @@
func (s) TestMaxConnectionAge(t *testing.T) {
maxConnAge := 100 * time.Millisecond
serverConfig := &ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
KeepaliveParams: keepalive.ServerParameters{
MaxConnectionAge: maxConnAge,
MaxConnectionAgeGrace: 10 * time.Millisecond,
},
}
- server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+ copts := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
+ }
+ server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, copts)
defer func() {
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
@@ -171,12 +182,16 @@
// clientPreface and the initial Settings frame, and then remains unresponsive.
func (s) TestKeepaliveServerClosesUnresponsiveClient(t *testing.T) {
serverConfig := &ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
KeepaliveParams: keepalive.ServerParameters{
Time: 100 * time.Millisecond,
Timeout: 10 * time.Millisecond,
},
}
- server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+ copts := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
+ }
+ server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, copts)
defer func() {
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
@@ -231,12 +246,16 @@
// the connection with a client that responds to keepalive pings.
func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) {
serverConfig := &ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
KeepaliveParams: keepalive.ServerParameters{
Time: 100 * time.Millisecond,
Timeout: 100 * time.Millisecond,
},
}
- server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+ copts := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
+ }
+ server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, copts)
defer func() {
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
@@ -269,6 +288,7 @@
func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
connCh := make(chan net.Conn, 1)
copts := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
ChannelzParent: channelzSubChannel(t),
KeepaliveParams: keepalive.ClientParameters{
Time: 10 * time.Millisecond,
@@ -299,6 +319,7 @@
func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
connCh := make(chan net.Conn, 1)
copts := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
ChannelzParent: channelzSubChannel(t),
KeepaliveParams: keepalive.ClientParameters{
Time: 10 * time.Millisecond,
@@ -329,6 +350,7 @@
func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
connCh := make(chan net.Conn, 1)
copts := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
ChannelzParent: channelzSubChannel(t),
KeepaliveParams: keepalive.ClientParameters{
Time: 500 * time.Millisecond,
@@ -365,6 +387,7 @@
func (s) TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
server, client, cancel := setUpWithOptions(t, 0,
&ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 50 * time.Millisecond,
PermitWithoutStream: true,
@@ -372,6 +395,7 @@
},
normal,
ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
KeepaliveParams: keepalive.ClientParameters{
Time: 55 * time.Millisecond,
Timeout: time.Second,
@@ -402,12 +426,14 @@
grpctest.ExpectError("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\"")
serverConfig := &ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 100 * time.Millisecond,
PermitWithoutStream: true,
},
}
clientOptions := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
KeepaliveParams: keepalive.ClientParameters{
Time: 50 * time.Millisecond,
Timeout: time.Second,
@@ -434,11 +460,13 @@
grpctest.ExpectError("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\"")
serverConfig := &ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: time.Second,
},
}
clientOptions := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
KeepaliveParams: keepalive.ClientParameters{
Time: 20 * time.Millisecond,
Timeout: 100 * time.Millisecond,
@@ -465,11 +493,13 @@
grpctest.ExpectError("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\"")
serverConfig := &ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: time.Second,
},
}
clientOptions := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
KeepaliveParams: keepalive.ClientParameters{
Time: 50 * time.Millisecond,
Timeout: 100 * time.Millisecond,
@@ -499,12 +529,14 @@
// EnforcementPolicy.
func (s) TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
serverConfig := &ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 40 * time.Millisecond,
PermitWithoutStream: true,
},
}
clientOptions := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
KeepaliveParams: keepalive.ClientParameters{
Time: 50 * time.Millisecond,
Timeout: time.Second,
@@ -533,11 +565,13 @@
// EnforcementPolicy.
func (s) TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
serverConfig := &ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 40 * time.Millisecond,
},
}
clientOptions := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
KeepaliveParams: keepalive.ClientParameters{
Time: 50 * time.Millisecond,
},
@@ -569,11 +603,13 @@
// side enters a dormant state.
func (s) TestKeepaliveServerEnforcementWithDormantKeepaliveOnClient(t *testing.T) {
serverConfig := &ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 100 * time.Millisecond,
},
}
clientOptions := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
KeepaliveParams: keepalive.ClientParameters{
Time: 10 * time.Millisecond,
Timeout: 10 * time.Millisecond,
@@ -649,6 +685,7 @@
}
for _, tt := range tests {
sopts := &ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
KeepaliveParams: keepalive.ServerParameters{
Time: tt.time,
Timeout: tt.timeout,
@@ -656,6 +693,7 @@
}
copts := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
KeepaliveParams: keepalive.ClientParameters{
Time: tt.time,
Timeout: tt.timeout,
diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go
index 7fc5fa0..0e8cd8e 100644
--- a/internal/transport/transport_test.go
+++ b/internal/transport/transport_test.go
@@ -516,7 +516,10 @@
}
func setUp(t *testing.T, port int, ht hType) (*server, *http2Client, func()) {
- return setUpWithOptions(t, port, &ServerConfig{}, ht, ConnectOptions{})
+ copts := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
+ }
+ return setUpWithOptions(t, port, &ServerConfig{BufferPool: mem.DefaultBufferPool()}, ht, copts)
}
func setUpWithOptions(t *testing.T, port int, sc *ServerConfig, ht hType, copts ConnectOptions) (*server, *http2Client, func()) {
@@ -576,8 +579,13 @@
// TestInflightStreamClosing ensures that closing in-flight stream
// sends status error to concurrent stream reader.
func (s) TestInflightStreamClosing(t *testing.T) {
- serverConfig := &ServerConfig{}
- server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+ serverConfig := &ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
+ }
+ copts := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
+ }
+ server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, copts)
defer cancel()
defer server.stop()
defer client.Close(fmt.Errorf("closed manually by test"))
@@ -785,6 +793,7 @@
func (s) TestLargeMessageWithDelayRead(t *testing.T) {
// Disable dynamic flow control.
sc := &ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
InitialWindowSize: defaultWindowSize,
InitialConnWindowSize: defaultWindowSize,
StaticWindowSize: true,
@@ -793,6 +802,7 @@
InitialWindowSize: defaultWindowSize,
InitialConnWindowSize: defaultWindowSize,
StaticWindowSize: true,
+ BufferPool: mem.DefaultBufferPool(),
}
server, ct, cancel := setUpWithOptions(t, 0, sc, delayRead, co)
defer cancel()
@@ -990,9 +1000,13 @@
func (s) TestMaxStreams(t *testing.T) {
serverConfig := &ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
MaxStreams: 1,
}
- server, ct, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+ copts := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
+ }
+ server, ct, cancel := setUpWithOptions(t, 0, serverConfig, suspended, copts)
defer cancel()
defer ct.Close(fmt.Errorf("closed manually by test"))
defer server.stop()
@@ -1131,8 +1145,10 @@
connectOptions := ConnectOptions{
InitialWindowSize: defaultWindowSize,
InitialConnWindowSize: defaultWindowSize,
+ BufferPool: mem.DefaultBufferPool(),
}
- server, client, cancel := setUpWithOptions(t, 0, &ServerConfig{}, notifyCall, connectOptions)
+ serverConfig := &ServerConfig{BufferPool: mem.DefaultBufferPool()}
+ server, client, cancel := setUpWithOptions(t, 0, serverConfig, notifyCall, connectOptions)
defer cancel()
defer server.stop()
defer client.Close(fmt.Errorf("closed manually by test"))
@@ -1218,10 +1234,14 @@
func (s) TestServerConnDecoupledFromApplicationRead(t *testing.T) {
serverConfig := &ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
InitialWindowSize: defaultWindowSize,
InitialConnWindowSize: defaultWindowSize,
}
- server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+ copts := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
+ }
+ server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, copts)
defer cancel()
defer server.stop()
defer client.Close(fmt.Errorf("closed manually by test"))
@@ -1288,7 +1308,7 @@
}
func (s) TestServerWithMisbehavedClient(t *testing.T) {
- server := setUpServerOnly(t, 0, &ServerConfig{}, suspended)
+ server := setUpServerOnly(t, 0, &ServerConfig{BufferPool: mem.DefaultBufferPool()}, suspended)
defer server.stop()
// Create a client that can override server stream quota.
mconn, err := net.Dial("tcp", server.lis.Addr().String())
@@ -1420,7 +1440,10 @@
time.AfterFunc(100*time.Millisecond, cancel)
parent := channelzSubChannel(t)
- copts := ConnectOptions{ChannelzParent: parent}
+ copts := ConnectOptions{
+ ChannelzParent: parent,
+ BufferPool: mem.DefaultBufferPool(),
+ }
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
_, err = NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
@@ -1514,7 +1537,10 @@
defer cancel()
parent := channelzSubChannel(t)
- copts := ConnectOptions{ChannelzParent: parent}
+ copts := ConnectOptions{
+ ChannelzParent: parent,
+ BufferPool: mem.DefaultBufferPool(),
+ }
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
ct, err := NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
@@ -1700,11 +1726,13 @@
InitialWindowSize: wc.serverStream,
InitialConnWindowSize: wc.serverConn,
StaticWindowSize: true,
+ BufferPool: mem.DefaultBufferPool(),
}
co := ConnectOptions{
InitialWindowSize: wc.clientStream,
InitialConnWindowSize: wc.clientConn,
StaticWindowSize: true,
+ BufferPool: mem.DefaultBufferPool(),
}
server, client, cancel := setUpWithOptions(t, 0, sc, pingpong, co)
defer cancel()
@@ -1949,7 +1977,7 @@
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- server := setUpServerOnly(t, 0, &ServerConfig{}, suspended)
+ server := setUpServerOnly(t, 0, &ServerConfig{BufferPool: mem.DefaultBufferPool()}, suspended)
defer server.stop()
// Create a client directly to not tie what you can send to API of
// http2_client.go (i.e. control headers being sent).
@@ -2149,7 +2177,7 @@
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- server := setUpServerOnly(t, 0, &ServerConfig{}, suspended)
+ server := setUpServerOnly(t, 0, &ServerConfig{BufferPool: mem.DefaultBufferPool()}, suspended)
defer server.stop()
// Create a client directly to not tie what you can send to API of
// http2_client.go (i.e. control headers being sent).
@@ -2516,7 +2544,7 @@
// NewHTTP2Client and verifies that these attributes are received by the
// transport credential handshaker.
func (s) TestClientHandshakeInfo(t *testing.T) {
- server := setUpServerOnly(t, 0, &ServerConfig{}, pingpong)
+ server := setUpServerOnly(t, 0, &ServerConfig{BufferPool: mem.DefaultBufferPool()}, pingpong)
defer server.stop()
const (
@@ -2534,6 +2562,7 @@
copts := ConnectOptions{
TransportCredentials: creds,
ChannelzParent: channelzSubChannel(t),
+ BufferPool: mem.DefaultBufferPool(),
}
tr, err := NewHTTP2Client(ctx, ctx, addr, copts, func(GoAwayReason) {})
if err != nil {
@@ -2551,7 +2580,7 @@
// NewHTTP2Client and verifies that these attributes are received by a custom
// dialer.
func (s) TestClientHandshakeInfoDialer(t *testing.T) {
- server := setUpServerOnly(t, 0, &ServerConfig{}, pingpong)
+ server := setUpServerOnly(t, 0, &ServerConfig{BufferPool: mem.DefaultBufferPool()}, pingpong)
defer server.stop()
const (
@@ -2575,6 +2604,7 @@
copts := ConnectOptions{
Dialer: dialer,
ChannelzParent: channelzSubChannel(t),
+ BufferPool: mem.DefaultBufferPool(),
}
tr, err := NewHTTP2Client(ctx, ctx, addr, copts, func(GoAwayReason) {})
if err != nil {
@@ -2930,7 +2960,10 @@
}
}()
- ct, err := NewHTTP2Client(ctx, ctx, resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{}, func(GoAwayReason) {})
+ cOpts := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
+ }
+ ct, err := NewHTTP2Client(ctx, ctx, resolver.Address{Addr: lis.Addr().String()}, cOpts, func(GoAwayReason) {})
if err != nil {
t.Fatalf("Error while creating client transport: %v", err)
}
@@ -2979,13 +3012,14 @@
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- server := setUpServerOnly(t, 0, &ServerConfig{}, normal)
+ server := setUpServerOnly(t, 0, &ServerConfig{BufferPool: mem.DefaultBufferPool()}, normal)
defer server.stop()
addr := resolver.Address{Addr: "localhost:" + server.port}
isReaderHanging := &atomic.Bool{}
readHangConn := make(chan struct{})
copts := ConnectOptions{
+ BufferPool: mem.DefaultBufferPool(),
Dialer: func(_ context.Context, addr string) (net.Conn, error) {
conn, err := net.Dial("tcp", addr)
if err != nil {
@@ -3069,7 +3103,7 @@
// Create the server set up.
connectCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- server := setUpServerOnly(t, 0, &ServerConfig{}, normal)
+ server := setUpServerOnly(t, 0, &ServerConfig{BufferPool: mem.DefaultBufferPool()}, normal)
defer server.stop()
addr := resolver.Address{Addr: "localhost:" + server.port}
isGreetingDone := &atomic.Bool{}
@@ -3082,7 +3116,10 @@
}
return &hangingConn{Conn: conn, hangConn: hangConn, startHanging: isGreetingDone}, nil
}
- copts := ConnectOptions{Dialer: dialer}
+ copts := ConnectOptions{
+ Dialer: dialer,
+ BufferPool: mem.DefaultBufferPool(),
+ }
copts.ChannelzParent = channelzSubChannel(t)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
@@ -3139,9 +3176,10 @@
return &deadlineTestConn{Conn: conn}, nil
}
co := ConnectOptions{
- Dialer: dialer,
+ Dialer: dialer,
+ BufferPool: mem.DefaultBufferPool(),
}
- server, client, cancel := setUpWithOptions(t, 0, &ServerConfig{}, normal, co)
+ server, client, cancel := setUpWithOptions(t, 0, &ServerConfig{BufferPool: mem.DefaultBufferPool()}, normal, co)
defer cancel()
defer server.stop()
dConn := client.conn.(*deadlineTestConn)
@@ -3197,7 +3235,7 @@
// configured deadline is reached. The test verifies that the server sends an
// RST stream only after the deadline is reached.
func (s) TestServerSendsRSTAfterDeadlineToMisbehavedClient(t *testing.T) {
- server := setUpServerOnly(t, 0, &ServerConfig{}, suspended)
+ server := setUpServerOnly(t, 0, &ServerConfig{BufferPool: mem.DefaultBufferPool()}, suspended)
defer server.stop()
// Create a client that can override server stream quota.
mconn, err := net.Dial("tcp", server.lis.Addr().String())
@@ -3390,12 +3428,13 @@
t.Run(test.name, func(t *testing.T) {
// Setup server configuration with channelz support
serverConfig := &ServerConfig{
+ BufferPool: mem.DefaultBufferPool(),
ChannelzParent: channelz.RegisterServer(t.Name()),
}
defer channelz.RemoveEntry(serverConfig.ChannelzParent.ID)
// Create server and client with normal handler (not notifyCall)
- server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, ConnectOptions{})
+ server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, ConnectOptions{BufferPool: mem.DefaultBufferPool()})
defer func() {
client.Close(fmt.Errorf("test cleanup"))
server.stop()