Make window size configurable. (#1210)
* Make window size configurable.
diff --git a/clientconn.go b/clientconn.go
index 0b09b3f..434cab4 100644
--- a/clientconn.go
+++ b/clientconn.go
@@ -105,6 +105,22 @@
// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)
+// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func WithInitialWindowSize(s int32) DialOption {
+ return func(o *dialOptions) {
+ o.copts.InitialWindowSize = s
+ }
+}
+
+// WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func WithInitialConnWindowSize(s int32) DialOption {
+ return func(o *dialOptions) {
+ o.copts.InitialConnWindowSize = s
+ }
+}
+
// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive.
func WithMaxMsgSize(s int) DialOption {
return func(o *dialOptions) {
diff --git a/server.go b/server.go
index 54085ee..8bae298 100644
--- a/server.go
+++ b/server.go
@@ -107,20 +107,22 @@
}
type options struct {
- creds credentials.TransportCredentials
- codec Codec
- cp Compressor
- dc Decompressor
- maxMsgSize int
- unaryInt UnaryServerInterceptor
- streamInt StreamServerInterceptor
- inTapHandle tap.ServerInHandle
- statsHandler stats.Handler
- maxConcurrentStreams uint32
- useHandlerImpl bool // use http.Handler-based server
- unknownStreamDesc *StreamDesc
- keepaliveParams keepalive.ServerParameters
- keepalivePolicy keepalive.EnforcementPolicy
+ creds credentials.TransportCredentials
+ codec Codec
+ cp Compressor
+ dc Decompressor
+ maxMsgSize int
+ unaryInt UnaryServerInterceptor
+ streamInt StreamServerInterceptor
+ inTapHandle tap.ServerInHandle
+ statsHandler stats.Handler
+ maxConcurrentStreams uint32
+ useHandlerImpl bool // use http.Handler-based server
+ unknownStreamDesc *StreamDesc
+ keepaliveParams keepalive.ServerParameters
+ keepalivePolicy keepalive.EnforcementPolicy
+ initialWindowSize int32
+ initialConnWindowSize int32
}
var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
@@ -128,6 +130,22 @@
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption func(*options)
+// InitialWindowSize returns a ServerOption that sets window size for stream.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func InitialWindowSize(s int32) ServerOption {
+ return func(o *options) {
+ o.initialWindowSize = s
+ }
+}
+
+// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func InitialConnWindowSize(s int32) ServerOption {
+ return func(o *options) {
+ o.initialConnWindowSize = s
+ }
+}
+
// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
return func(o *options) {
@@ -483,12 +501,14 @@
// transport.NewServerTransport).
func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
config := &transport.ServerConfig{
- MaxStreams: s.opts.maxConcurrentStreams,
- AuthInfo: authInfo,
- InTapHandle: s.opts.inTapHandle,
- StatsHandler: s.opts.statsHandler,
- KeepaliveParams: s.opts.keepaliveParams,
- KeepalivePolicy: s.opts.keepalivePolicy,
+ MaxStreams: s.opts.maxConcurrentStreams,
+ AuthInfo: authInfo,
+ InTapHandle: s.opts.inTapHandle,
+ StatsHandler: s.opts.statsHandler,
+ KeepaliveParams: s.opts.keepaliveParams,
+ KeepalivePolicy: s.opts.keepalivePolicy,
+ InitialWindowSize: s.opts.initialWindowSize,
+ InitialConnWindowSize: s.opts.initialConnWindowSize,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
diff --git a/test/end2end_test.go b/test/end2end_test.go
index 7c83591..01b3e4f 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -431,20 +431,24 @@
cancel context.CancelFunc
// Configurable knobs, after newTest returns:
- testServer testpb.TestServiceServer // nil means none
- healthServer *health.Server // nil means disabled
- maxStream uint32
- tapHandle tap.ServerInHandle
- maxMsgSize int
- userAgent string
- clientCompression bool
- serverCompression bool
- unaryClientInt grpc.UnaryClientInterceptor
- streamClientInt grpc.StreamClientInterceptor
- unaryServerInt grpc.UnaryServerInterceptor
- streamServerInt grpc.StreamServerInterceptor
- unknownHandler grpc.StreamHandler
- sc <-chan grpc.ServiceConfig
+ testServer testpb.TestServiceServer // nil means none
+ healthServer *health.Server // nil means disabled
+ maxStream uint32
+ tapHandle tap.ServerInHandle
+ maxMsgSize int
+ userAgent string
+ clientCompression bool
+ serverCompression bool
+ unaryClientInt grpc.UnaryClientInterceptor
+ streamClientInt grpc.StreamClientInterceptor
+ unaryServerInt grpc.UnaryServerInterceptor
+ streamServerInt grpc.StreamServerInterceptor
+ unknownHandler grpc.StreamHandler
+ sc <-chan grpc.ServiceConfig
+ serverInitialWindowSize int32
+ serverInitialConnWindowSize int32
+ clientInitialWindowSize int32
+ clientInitialConnWindowSize int32
// srv and srvAddr are set once startServer is called.
srv *grpc.Server
@@ -512,6 +516,12 @@
if te.unknownHandler != nil {
sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler))
}
+ if te.serverInitialWindowSize > 0 {
+ sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize))
+ }
+ if te.serverInitialConnWindowSize > 0 {
+ sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize))
+ }
la := "localhost:0"
switch te.e.network {
case "unix":
@@ -605,6 +615,12 @@
if te.e.balancer {
opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil)))
}
+ if te.clientInitialWindowSize > 0 {
+ opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))
+ }
+ if te.clientInitialConnWindowSize > 0 {
+ opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize))
+ }
var err error
te.cc, err = grpc.Dial(te.srvAddr, opts...)
if err != nil {
@@ -3881,3 +3897,90 @@
t.Fatalf("doFDC(_, proxy.client) = %v; want nil", err)
}
}
+
+type windowSizeConfig struct {
+ serverStream int32
+ serverConn int32
+ clientStream int32
+ clientConn int32
+}
+
+func max(a, b int32) int32 {
+ if a > b {
+ return a
+ }
+ return b
+}
+
+func TestConfigurableWindowSizeWithLargeWindow(t *testing.T) {
+ defer leakCheck(t)()
+ wc := windowSizeConfig{
+ serverStream: 8 * 1024 * 1024,
+ serverConn: 12 * 1024 * 1024,
+ clientStream: 6 * 1024 * 1024,
+ clientConn: 8 * 1024 * 1024,
+ }
+ for _, e := range listTestEnv() {
+ testConfigurableWindowSize(t, e, wc)
+ }
+}
+
+func TestConfigurableWindowSizeWithSmallWindow(t *testing.T) {
+ defer leakCheck(t)()
+ wc := windowSizeConfig{
+ serverStream: 1,
+ serverConn: 1,
+ clientStream: 1,
+ clientConn: 1,
+ }
+ for _, e := range listTestEnv() {
+ testConfigurableWindowSize(t, e, wc)
+ }
+}
+
+func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) {
+ te := newTest(t, e)
+ te.serverInitialWindowSize = wc.serverStream
+ te.serverInitialConnWindowSize = wc.serverConn
+ te.clientInitialWindowSize = wc.clientStream
+ te.clientInitialConnWindowSize = wc.clientConn
+
+ te.startServer(&testServer{security: e.security})
+ defer te.tearDown()
+
+ cc := te.clientConn()
+ tc := testpb.NewTestServiceClient(cc)
+ stream, err := tc.FullDuplexCall(context.Background())
+ if err != nil {
+ t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
+ }
+ numOfIter := 11
+ // Set message size to exhaust largest of window sizes.
+ messageSize := max(max(wc.serverStream, wc.serverConn), max(wc.clientStream, wc.clientConn)) / int32(numOfIter-1)
+ messageSize = max(messageSize, 64*1024)
+ payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, messageSize)
+ if err != nil {
+ t.Fatal(err)
+ }
+ respParams := []*testpb.ResponseParameters{
+ {
+ Size: proto.Int32(messageSize),
+ },
+ }
+ req := &testpb.StreamingOutputCallRequest{
+ ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
+ ResponseParameters: respParams,
+ Payload: payload,
+ }
+ for i := 0; i < numOfIter; i++ {
+ if err := stream.Send(req); err != nil {
+ t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
+ }
+ if _, err := stream.Recv(); err != nil {
+ t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
+ }
+ }
+ if err := stream.CloseSend(); err != nil {
+ t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
+ }
+}
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 380fff6..bc202df 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -110,6 +110,8 @@
statsHandler stats.Handler
+ initialWindowSize int32
+
mu sync.Mutex // guard the following variables
state transportState // the state of underlying connection
activeStreams map[uint32]*Stream
@@ -198,6 +200,10 @@
if kp.Timeout == 0 {
kp.Timeout = defaultClientKeepaliveTimeout
}
+ icwz := int32(initialConnWindowSize)
+ if opts.InitialConnWindowSize >= defaultWindowSize {
+ icwz = opts.InitialConnWindowSize
+ }
var buf bytes.Buffer
t := &http2Client{
ctx: ctx,
@@ -209,27 +215,31 @@
localAddr: conn.LocalAddr(),
authInfo: authInfo,
// The client initiated stream id is odd starting from 1.
- nextID: 1,
- writableChan: make(chan int, 1),
- shutdownChan: make(chan struct{}),
- errorChan: make(chan struct{}),
- goAway: make(chan struct{}),
- awakenKeepalive: make(chan struct{}, 1),
- framer: newFramer(conn),
- hBuf: &buf,
- hEnc: hpack.NewEncoder(&buf),
- controlBuf: newRecvBuffer(),
- fc: &inFlow{limit: initialConnWindowSize},
- sendQuotaPool: newQuotaPool(defaultWindowSize),
- scheme: scheme,
- state: reachable,
- activeStreams: make(map[uint32]*Stream),
- creds: opts.PerRPCCredentials,
- maxStreams: defaultMaxStreamsClient,
- streamsQuota: newQuotaPool(defaultMaxStreamsClient),
- streamSendQuota: defaultWindowSize,
- kp: kp,
- statsHandler: opts.StatsHandler,
+ nextID: 1,
+ writableChan: make(chan int, 1),
+ shutdownChan: make(chan struct{}),
+ errorChan: make(chan struct{}),
+ goAway: make(chan struct{}),
+ awakenKeepalive: make(chan struct{}, 1),
+ framer: newFramer(conn),
+ hBuf: &buf,
+ hEnc: hpack.NewEncoder(&buf),
+ controlBuf: newRecvBuffer(),
+ fc: &inFlow{limit: uint32(icwz)},
+ sendQuotaPool: newQuotaPool(defaultWindowSize),
+ scheme: scheme,
+ state: reachable,
+ activeStreams: make(map[uint32]*Stream),
+ creds: opts.PerRPCCredentials,
+ maxStreams: defaultMaxStreamsClient,
+ streamsQuota: newQuotaPool(defaultMaxStreamsClient),
+ streamSendQuota: defaultWindowSize,
+ kp: kp,
+ statsHandler: opts.StatsHandler,
+ initialWindowSize: initialWindowSize,
+ }
+ if opts.InitialWindowSize >= defaultWindowSize {
+ t.initialWindowSize = opts.InitialWindowSize
}
// Make sure awakenKeepalive can't be written upon.
// keepalive routine will make it writable, if need be.
@@ -258,10 +268,10 @@
t.Close()
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
}
- if initialWindowSize != defaultWindowSize {
+ if t.initialWindowSize != defaultWindowSize {
err = t.framer.writeSettings(true, http2.Setting{
ID: http2.SettingInitialWindowSize,
- Val: uint32(initialWindowSize),
+ Val: uint32(t.initialWindowSize),
})
} else {
err = t.framer.writeSettings(true)
@@ -271,7 +281,7 @@
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
- if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
+ if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: %v", err)
@@ -294,7 +304,7 @@
method: callHdr.Method,
sendCompress: callHdr.SendCompress,
buf: newRecvBuffer(),
- fc: &inFlow{limit: initialWindowSize},
+ fc: &inFlow{limit: uint32(t.initialWindowSize)},
sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
headerChan: make(chan struct{}),
}
diff --git a/transport/http2_server.go b/transport/http2_server.go
index 14cd19c..94bd4b5 100644
--- a/transport/http2_server.go
+++ b/transport/http2_server.go
@@ -113,6 +113,8 @@
// 1 means yes.
resetPingStrikes uint32 // Accessed atomically.
+ initialWindowSize int32
+
mu sync.Mutex // guard the following
state transportState
activeStreams map[uint32]*Stream
@@ -142,16 +144,24 @@
Val: maxStreams,
})
}
- if initialWindowSize != defaultWindowSize {
+ iwz := int32(initialWindowSize)
+ if config.InitialWindowSize >= defaultWindowSize {
+ iwz = config.InitialWindowSize
+ }
+ icwz := int32(initialConnWindowSize)
+ if config.InitialConnWindowSize >= defaultWindowSize {
+ icwz = config.InitialConnWindowSize
+ }
+ if iwz != defaultWindowSize {
settings = append(settings, http2.Setting{
ID: http2.SettingInitialWindowSize,
- Val: uint32(initialWindowSize)})
+ Val: uint32(iwz)})
}
if err := framer.writeSettings(true, settings...); err != nil {
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
- if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
+ if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
return nil, connectionErrorf(true, err, "transport: %v", err)
}
@@ -180,28 +190,29 @@
}
var buf bytes.Buffer
t := &http2Server{
- ctx: context.Background(),
- conn: conn,
- remoteAddr: conn.RemoteAddr(),
- localAddr: conn.LocalAddr(),
- authInfo: config.AuthInfo,
- framer: framer,
- hBuf: &buf,
- hEnc: hpack.NewEncoder(&buf),
- maxStreams: maxStreams,
- inTapHandle: config.InTapHandle,
- controlBuf: newRecvBuffer(),
- fc: &inFlow{limit: initialConnWindowSize},
- sendQuotaPool: newQuotaPool(defaultWindowSize),
- state: reachable,
- writableChan: make(chan int, 1),
- shutdownChan: make(chan struct{}),
- activeStreams: make(map[uint32]*Stream),
- streamSendQuota: defaultWindowSize,
- stats: config.StatsHandler,
- kp: kp,
- idle: time.Now(),
- kep: kep,
+ ctx: context.Background(),
+ conn: conn,
+ remoteAddr: conn.RemoteAddr(),
+ localAddr: conn.LocalAddr(),
+ authInfo: config.AuthInfo,
+ framer: framer,
+ hBuf: &buf,
+ hEnc: hpack.NewEncoder(&buf),
+ maxStreams: maxStreams,
+ inTapHandle: config.InTapHandle,
+ controlBuf: newRecvBuffer(),
+ fc: &inFlow{limit: uint32(icwz)},
+ sendQuotaPool: newQuotaPool(defaultWindowSize),
+ state: reachable,
+ writableChan: make(chan int, 1),
+ shutdownChan: make(chan struct{}),
+ activeStreams: make(map[uint32]*Stream),
+ streamSendQuota: defaultWindowSize,
+ stats: config.StatsHandler,
+ kp: kp,
+ idle: time.Now(),
+ kep: kep,
+ initialWindowSize: iwz,
}
if t.stats != nil {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
@@ -224,7 +235,7 @@
id: frame.Header().StreamID,
st: t,
buf: buf,
- fc: &inFlow{limit: initialWindowSize},
+ fc: &inFlow{limit: uint32(t.initialWindowSize)},
}
var state decodeState
diff --git a/transport/transport.go b/transport/transport.go
index 88c2c98..c22333c 100644
--- a/transport/transport.go
+++ b/transport/transport.go
@@ -392,12 +392,14 @@
// ServerConfig consists of all the configurations to establish a server transport.
type ServerConfig struct {
- MaxStreams uint32
- AuthInfo credentials.AuthInfo
- InTapHandle tap.ServerInHandle
- StatsHandler stats.Handler
- KeepaliveParams keepalive.ServerParameters
- KeepalivePolicy keepalive.EnforcementPolicy
+ MaxStreams uint32
+ AuthInfo credentials.AuthInfo
+ InTapHandle tap.ServerInHandle
+ StatsHandler stats.Handler
+ KeepaliveParams keepalive.ServerParameters
+ KeepalivePolicy keepalive.EnforcementPolicy
+ InitialWindowSize int32
+ InitialConnWindowSize int32
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
@@ -425,6 +427,10 @@
KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
+ // InitialWindowSize sets the intial window size for a stream.
+ InitialWindowSize int32
+ // InitialConnWindowSize sets the intial window size for a connection.
+ InitialConnWindowSize int32
}
// TargetInfo contains the information of the target such as network address and metadata.
diff --git a/transport/transport_test.go b/transport/transport_test.go
index 4e986e5..7429f2e 100644
--- a/transport/transport_test.go
+++ b/transport/transport_test.go
@@ -1266,3 +1266,153 @@
}
}
}
+
+type windowSizeConfig struct {
+ serverStream int32
+ serverConn int32
+ clientStream int32
+ clientConn int32
+}
+
+func TestAccountCheckWindowSizeWithLargeWindow(t *testing.T) {
+ wc := windowSizeConfig{
+ serverStream: 10 * 1024 * 1024,
+ serverConn: 12 * 1024 * 1024,
+ clientStream: 6 * 1024 * 1024,
+ clientConn: 8 * 1024 * 1024,
+ }
+ testAccountCheckWindowSize(t, wc)
+}
+
+func TestAccountCheckWindowSizeWithSmallWindow(t *testing.T) {
+ wc := windowSizeConfig{
+ serverStream: defaultWindowSize,
+ // Note this is smaller than initialConnWindowSize which is the current default.
+ serverConn: defaultWindowSize,
+ clientStream: defaultWindowSize,
+ clientConn: defaultWindowSize,
+ }
+ testAccountCheckWindowSize(t, wc)
+}
+
+func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) {
+ serverConfig := &ServerConfig{
+ InitialWindowSize: wc.serverStream,
+ InitialConnWindowSize: wc.serverConn,
+ }
+ connectOptions := ConnectOptions{
+ InitialWindowSize: wc.clientStream,
+ InitialConnWindowSize: wc.clientConn,
+ }
+ server, client := setUpWithOptions(t, 0, serverConfig, suspended, connectOptions)
+ defer server.stop()
+ defer client.Close()
+
+ // Wait for server conns to be populated with new server transport.
+ waitWhileTrue(t, func() (bool, error) {
+ server.mu.Lock()
+ defer server.mu.Unlock()
+ if len(server.conns) == 0 {
+ return true, fmt.Errorf("timed out waiting for server transport to be created")
+ }
+ return false, nil
+ })
+ var st *http2Server
+ server.mu.Lock()
+ for k := range server.conns {
+ st = k.(*http2Server)
+ }
+ server.mu.Unlock()
+ ct := client.(*http2Client)
+ cstream, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
+ if err != nil {
+ t.Fatalf("Failed to create stream. Err: %v", err)
+ }
+ // Wait for server to receive headers.
+ waitWhileTrue(t, func() (bool, error) {
+ st.mu.Lock()
+ defer st.mu.Unlock()
+ if len(st.activeStreams) == 0 {
+ return true, fmt.Errorf("timed out waiting for server to receive headers")
+ }
+ return false, nil
+ })
+ // Sleeping to make sure the settings are applied in case of negative test.
+ time.Sleep(time.Second)
+
+ waitWhileTrue(t, func() (bool, error) {
+ if lim := st.fc.limit; lim != uint32(serverConfig.InitialConnWindowSize) {
+ return true, fmt.Errorf("Server transport flow control window size: got %v, want %v", lim, serverConfig.InitialConnWindowSize)
+ }
+ return false, nil
+ })
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ serverSendQuota, err := wait(ctx, nil, nil, nil, st.sendQuotaPool.acquire())
+ if err != nil {
+ t.Fatalf("Error while acquiring sendQuota on server. Err: %v", err)
+ }
+ cancel()
+ st.sendQuotaPool.add(serverSendQuota)
+ if serverSendQuota != int(connectOptions.InitialConnWindowSize) {
+ t.Fatalf("Server send quota(%v) not equal to client's window size(%v) on conn.", serverSendQuota, connectOptions.InitialConnWindowSize)
+ }
+ st.mu.Lock()
+ if st.streamSendQuota != uint32(connectOptions.InitialWindowSize) {
+ t.Fatalf("Server stream send quota(%v) not equal to client's window size(%v) on stream.", ct.streamSendQuota, connectOptions.InitialWindowSize)
+ }
+ st.mu.Unlock()
+ if ct.fc.limit != uint32(connectOptions.InitialConnWindowSize) {
+ t.Fatalf("Client transport flow control window size is %v, want %v", ct.fc.limit, connectOptions.InitialConnWindowSize)
+ }
+ ctx, cancel = context.WithTimeout(context.Background(), time.Second)
+ clientSendQuota, err := wait(ctx, nil, nil, nil, ct.sendQuotaPool.acquire())
+ if err != nil {
+ t.Fatalf("Error while acquiring sendQuota on client. Err: %v", err)
+ }
+ cancel()
+ ct.sendQuotaPool.add(clientSendQuota)
+ if clientSendQuota != int(serverConfig.InitialConnWindowSize) {
+ t.Fatalf("Client send quota(%v) not equal to server's window size(%v) on conn.", clientSendQuota, serverConfig.InitialConnWindowSize)
+ }
+ ct.mu.Lock()
+ if ct.streamSendQuota != uint32(serverConfig.InitialWindowSize) {
+ t.Fatalf("Client stream send quota(%v) not equal to server's window size(%v) on stream.", ct.streamSendQuota, serverConfig.InitialWindowSize)
+ }
+ ct.mu.Unlock()
+ if cstream.fc.limit != uint32(connectOptions.InitialWindowSize) {
+ t.Fatalf("Client stream flow control window size is %v, want %v", cstream.fc.limit, connectOptions.InitialWindowSize)
+ }
+ var sstream *Stream
+ st.mu.Lock()
+ for _, v := range st.activeStreams {
+ sstream = v
+ }
+ st.mu.Unlock()
+ if sstream.fc.limit != uint32(serverConfig.InitialWindowSize) {
+ t.Fatalf("Server stream flow control window size is %v, want %v", sstream.fc.limit, serverConfig.InitialWindowSize)
+ }
+}
+
+func waitWhileTrue(t *testing.T, condition func() (bool, error)) {
+ var (
+ wait bool
+ err error
+ )
+ timer := time.NewTimer(time.Second * 5)
+ for {
+ wait, err = condition()
+ if wait {
+ select {
+ case <-timer.C:
+ t.Fatalf(err.Error())
+ default:
+ time.Sleep(50 * time.Millisecond)
+ continue
+ }
+ }
+ if !timer.Stop() {
+ <-timer.C
+ }
+ break
+ }
+}