Make window size configurable. (#1210)

* Make window size configurable.
diff --git a/clientconn.go b/clientconn.go
index 0b09b3f..434cab4 100644
--- a/clientconn.go
+++ b/clientconn.go
@@ -105,6 +105,22 @@
 // DialOption configures how we set up the connection.
 type DialOption func(*dialOptions)
 
+// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func WithInitialWindowSize(s int32) DialOption {
+	return func(o *dialOptions) {
+		o.copts.InitialWindowSize = s
+	}
+}
+
+// WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func WithInitialConnWindowSize(s int32) DialOption {
+	return func(o *dialOptions) {
+		o.copts.InitialConnWindowSize = s
+	}
+}
+
 // WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive.
 func WithMaxMsgSize(s int) DialOption {
 	return func(o *dialOptions) {
diff --git a/server.go b/server.go
index 54085ee..8bae298 100644
--- a/server.go
+++ b/server.go
@@ -107,20 +107,22 @@
 }
 
 type options struct {
-	creds                credentials.TransportCredentials
-	codec                Codec
-	cp                   Compressor
-	dc                   Decompressor
-	maxMsgSize           int
-	unaryInt             UnaryServerInterceptor
-	streamInt            StreamServerInterceptor
-	inTapHandle          tap.ServerInHandle
-	statsHandler         stats.Handler
-	maxConcurrentStreams uint32
-	useHandlerImpl       bool // use http.Handler-based server
-	unknownStreamDesc    *StreamDesc
-	keepaliveParams      keepalive.ServerParameters
-	keepalivePolicy      keepalive.EnforcementPolicy
+	creds                 credentials.TransportCredentials
+	codec                 Codec
+	cp                    Compressor
+	dc                    Decompressor
+	maxMsgSize            int
+	unaryInt              UnaryServerInterceptor
+	streamInt             StreamServerInterceptor
+	inTapHandle           tap.ServerInHandle
+	statsHandler          stats.Handler
+	maxConcurrentStreams  uint32
+	useHandlerImpl        bool // use http.Handler-based server
+	unknownStreamDesc     *StreamDesc
+	keepaliveParams       keepalive.ServerParameters
+	keepalivePolicy       keepalive.EnforcementPolicy
+	initialWindowSize     int32
+	initialConnWindowSize int32
 }
 
 var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
@@ -128,6 +130,22 @@
 // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
 type ServerOption func(*options)
 
+// InitialWindowSize returns a ServerOption that sets window size for stream.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func InitialWindowSize(s int32) ServerOption {
+	return func(o *options) {
+		o.initialWindowSize = s
+	}
+}
+
+// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func InitialConnWindowSize(s int32) ServerOption {
+	return func(o *options) {
+		o.initialConnWindowSize = s
+	}
+}
+
 // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
 func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
 	return func(o *options) {
@@ -483,12 +501,14 @@
 // transport.NewServerTransport).
 func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
 	config := &transport.ServerConfig{
-		MaxStreams:      s.opts.maxConcurrentStreams,
-		AuthInfo:        authInfo,
-		InTapHandle:     s.opts.inTapHandle,
-		StatsHandler:    s.opts.statsHandler,
-		KeepaliveParams: s.opts.keepaliveParams,
-		KeepalivePolicy: s.opts.keepalivePolicy,
+		MaxStreams:            s.opts.maxConcurrentStreams,
+		AuthInfo:              authInfo,
+		InTapHandle:           s.opts.inTapHandle,
+		StatsHandler:          s.opts.statsHandler,
+		KeepaliveParams:       s.opts.keepaliveParams,
+		KeepalivePolicy:       s.opts.keepalivePolicy,
+		InitialWindowSize:     s.opts.initialWindowSize,
+		InitialConnWindowSize: s.opts.initialConnWindowSize,
 	}
 	st, err := transport.NewServerTransport("http2", c, config)
 	if err != nil {
diff --git a/test/end2end_test.go b/test/end2end_test.go
index 7c83591..01b3e4f 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -431,20 +431,24 @@
 	cancel context.CancelFunc
 
 	// Configurable knobs, after newTest returns:
-	testServer        testpb.TestServiceServer // nil means none
-	healthServer      *health.Server           // nil means disabled
-	maxStream         uint32
-	tapHandle         tap.ServerInHandle
-	maxMsgSize        int
-	userAgent         string
-	clientCompression bool
-	serverCompression bool
-	unaryClientInt    grpc.UnaryClientInterceptor
-	streamClientInt   grpc.StreamClientInterceptor
-	unaryServerInt    grpc.UnaryServerInterceptor
-	streamServerInt   grpc.StreamServerInterceptor
-	unknownHandler    grpc.StreamHandler
-	sc                <-chan grpc.ServiceConfig
+	testServer                  testpb.TestServiceServer // nil means none
+	healthServer                *health.Server           // nil means disabled
+	maxStream                   uint32
+	tapHandle                   tap.ServerInHandle
+	maxMsgSize                  int
+	userAgent                   string
+	clientCompression           bool
+	serverCompression           bool
+	unaryClientInt              grpc.UnaryClientInterceptor
+	streamClientInt             grpc.StreamClientInterceptor
+	unaryServerInt              grpc.UnaryServerInterceptor
+	streamServerInt             grpc.StreamServerInterceptor
+	unknownHandler              grpc.StreamHandler
+	sc                          <-chan grpc.ServiceConfig
+	serverInitialWindowSize     int32
+	serverInitialConnWindowSize int32
+	clientInitialWindowSize     int32
+	clientInitialConnWindowSize int32
 
 	// srv and srvAddr are set once startServer is called.
 	srv     *grpc.Server
@@ -512,6 +516,12 @@
 	if te.unknownHandler != nil {
 		sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler))
 	}
+	if te.serverInitialWindowSize > 0 {
+		sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize))
+	}
+	if te.serverInitialConnWindowSize > 0 {
+		sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize))
+	}
 	la := "localhost:0"
 	switch te.e.network {
 	case "unix":
@@ -605,6 +615,12 @@
 	if te.e.balancer {
 		opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil)))
 	}
+	if te.clientInitialWindowSize > 0 {
+		opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))
+	}
+	if te.clientInitialConnWindowSize > 0 {
+		opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize))
+	}
 	var err error
 	te.cc, err = grpc.Dial(te.srvAddr, opts...)
 	if err != nil {
@@ -3881,3 +3897,90 @@
 		t.Fatalf("doFDC(_, proxy.client) = %v; want nil", err)
 	}
 }
+
+type windowSizeConfig struct {
+	serverStream int32
+	serverConn   int32
+	clientStream int32
+	clientConn   int32
+}
+
+func max(a, b int32) int32 {
+	if a > b {
+		return a
+	}
+	return b
+}
+
+func TestConfigurableWindowSizeWithLargeWindow(t *testing.T) {
+	defer leakCheck(t)()
+	wc := windowSizeConfig{
+		serverStream: 8 * 1024 * 1024,
+		serverConn:   12 * 1024 * 1024,
+		clientStream: 6 * 1024 * 1024,
+		clientConn:   8 * 1024 * 1024,
+	}
+	for _, e := range listTestEnv() {
+		testConfigurableWindowSize(t, e, wc)
+	}
+}
+
+func TestConfigurableWindowSizeWithSmallWindow(t *testing.T) {
+	defer leakCheck(t)()
+	wc := windowSizeConfig{
+		serverStream: 1,
+		serverConn:   1,
+		clientStream: 1,
+		clientConn:   1,
+	}
+	for _, e := range listTestEnv() {
+		testConfigurableWindowSize(t, e, wc)
+	}
+}
+
+func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) {
+	te := newTest(t, e)
+	te.serverInitialWindowSize = wc.serverStream
+	te.serverInitialConnWindowSize = wc.serverConn
+	te.clientInitialWindowSize = wc.clientStream
+	te.clientInitialConnWindowSize = wc.clientConn
+
+	te.startServer(&testServer{security: e.security})
+	defer te.tearDown()
+
+	cc := te.clientConn()
+	tc := testpb.NewTestServiceClient(cc)
+	stream, err := tc.FullDuplexCall(context.Background())
+	if err != nil {
+		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
+	}
+	numOfIter := 11
+	// Set message size to exhaust largest of window sizes.
+	messageSize := max(max(wc.serverStream, wc.serverConn), max(wc.clientStream, wc.clientConn)) / int32(numOfIter-1)
+	messageSize = max(messageSize, 64*1024)
+	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, messageSize)
+	if err != nil {
+		t.Fatal(err)
+	}
+	respParams := []*testpb.ResponseParameters{
+		{
+			Size: proto.Int32(messageSize),
+		},
+	}
+	req := &testpb.StreamingOutputCallRequest{
+		ResponseType:       testpb.PayloadType_COMPRESSABLE.Enum(),
+		ResponseParameters: respParams,
+		Payload:            payload,
+	}
+	for i := 0; i < numOfIter; i++ {
+		if err := stream.Send(req); err != nil {
+			t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
+		}
+		if _, err := stream.Recv(); err != nil {
+			t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
+		}
+	}
+	if err := stream.CloseSend(); err != nil {
+		t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
+	}
+}
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 380fff6..bc202df 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -110,6 +110,8 @@
 
 	statsHandler stats.Handler
 
+	initialWindowSize int32
+
 	mu            sync.Mutex     // guard the following variables
 	state         transportState // the state of underlying connection
 	activeStreams map[uint32]*Stream
@@ -198,6 +200,10 @@
 	if kp.Timeout == 0 {
 		kp.Timeout = defaultClientKeepaliveTimeout
 	}
+	icwz := int32(initialConnWindowSize)
+	if opts.InitialConnWindowSize >= defaultWindowSize {
+		icwz = opts.InitialConnWindowSize
+	}
 	var buf bytes.Buffer
 	t := &http2Client{
 		ctx:        ctx,
@@ -209,27 +215,31 @@
 		localAddr:  conn.LocalAddr(),
 		authInfo:   authInfo,
 		// The client initiated stream id is odd starting from 1.
-		nextID:          1,
-		writableChan:    make(chan int, 1),
-		shutdownChan:    make(chan struct{}),
-		errorChan:       make(chan struct{}),
-		goAway:          make(chan struct{}),
-		awakenKeepalive: make(chan struct{}, 1),
-		framer:          newFramer(conn),
-		hBuf:            &buf,
-		hEnc:            hpack.NewEncoder(&buf),
-		controlBuf:      newRecvBuffer(),
-		fc:              &inFlow{limit: initialConnWindowSize},
-		sendQuotaPool:   newQuotaPool(defaultWindowSize),
-		scheme:          scheme,
-		state:           reachable,
-		activeStreams:   make(map[uint32]*Stream),
-		creds:           opts.PerRPCCredentials,
-		maxStreams:      defaultMaxStreamsClient,
-		streamsQuota:    newQuotaPool(defaultMaxStreamsClient),
-		streamSendQuota: defaultWindowSize,
-		kp:              kp,
-		statsHandler:    opts.StatsHandler,
+		nextID:            1,
+		writableChan:      make(chan int, 1),
+		shutdownChan:      make(chan struct{}),
+		errorChan:         make(chan struct{}),
+		goAway:            make(chan struct{}),
+		awakenKeepalive:   make(chan struct{}, 1),
+		framer:            newFramer(conn),
+		hBuf:              &buf,
+		hEnc:              hpack.NewEncoder(&buf),
+		controlBuf:        newRecvBuffer(),
+		fc:                &inFlow{limit: uint32(icwz)},
+		sendQuotaPool:     newQuotaPool(defaultWindowSize),
+		scheme:            scheme,
+		state:             reachable,
+		activeStreams:     make(map[uint32]*Stream),
+		creds:             opts.PerRPCCredentials,
+		maxStreams:        defaultMaxStreamsClient,
+		streamsQuota:      newQuotaPool(defaultMaxStreamsClient),
+		streamSendQuota:   defaultWindowSize,
+		kp:                kp,
+		statsHandler:      opts.StatsHandler,
+		initialWindowSize: initialWindowSize,
+	}
+	if opts.InitialWindowSize >= defaultWindowSize {
+		t.initialWindowSize = opts.InitialWindowSize
 	}
 	// Make sure awakenKeepalive can't be written upon.
 	// keepalive routine will make it writable, if need be.
@@ -258,10 +268,10 @@
 		t.Close()
 		return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
 	}
-	if initialWindowSize != defaultWindowSize {
+	if t.initialWindowSize != defaultWindowSize {
 		err = t.framer.writeSettings(true, http2.Setting{
 			ID:  http2.SettingInitialWindowSize,
-			Val: uint32(initialWindowSize),
+			Val: uint32(t.initialWindowSize),
 		})
 	} else {
 		err = t.framer.writeSettings(true)
@@ -271,7 +281,7 @@
 		return nil, connectionErrorf(true, err, "transport: %v", err)
 	}
 	// Adjust the connection flow control window if needed.
-	if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
+	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
 		if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
 			t.Close()
 			return nil, connectionErrorf(true, err, "transport: %v", err)
@@ -294,7 +304,7 @@
 		method:        callHdr.Method,
 		sendCompress:  callHdr.SendCompress,
 		buf:           newRecvBuffer(),
-		fc:            &inFlow{limit: initialWindowSize},
+		fc:            &inFlow{limit: uint32(t.initialWindowSize)},
 		sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
 		headerChan:    make(chan struct{}),
 	}
diff --git a/transport/http2_server.go b/transport/http2_server.go
index 14cd19c..94bd4b5 100644
--- a/transport/http2_server.go
+++ b/transport/http2_server.go
@@ -113,6 +113,8 @@
 	// 1 means yes.
 	resetPingStrikes uint32 // Accessed atomically.
 
+	initialWindowSize int32
+
 	mu            sync.Mutex // guard the following
 	state         transportState
 	activeStreams map[uint32]*Stream
@@ -142,16 +144,24 @@
 			Val: maxStreams,
 		})
 	}
-	if initialWindowSize != defaultWindowSize {
+	iwz := int32(initialWindowSize)
+	if config.InitialWindowSize >= defaultWindowSize {
+		iwz = config.InitialWindowSize
+	}
+	icwz := int32(initialConnWindowSize)
+	if config.InitialConnWindowSize >= defaultWindowSize {
+		icwz = config.InitialConnWindowSize
+	}
+	if iwz != defaultWindowSize {
 		settings = append(settings, http2.Setting{
 			ID:  http2.SettingInitialWindowSize,
-			Val: uint32(initialWindowSize)})
+			Val: uint32(iwz)})
 	}
 	if err := framer.writeSettings(true, settings...); err != nil {
 		return nil, connectionErrorf(true, err, "transport: %v", err)
 	}
 	// Adjust the connection flow control window if needed.
-	if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
+	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
 		if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
 			return nil, connectionErrorf(true, err, "transport: %v", err)
 		}
@@ -180,28 +190,29 @@
 	}
 	var buf bytes.Buffer
 	t := &http2Server{
-		ctx:             context.Background(),
-		conn:            conn,
-		remoteAddr:      conn.RemoteAddr(),
-		localAddr:       conn.LocalAddr(),
-		authInfo:        config.AuthInfo,
-		framer:          framer,
-		hBuf:            &buf,
-		hEnc:            hpack.NewEncoder(&buf),
-		maxStreams:      maxStreams,
-		inTapHandle:     config.InTapHandle,
-		controlBuf:      newRecvBuffer(),
-		fc:              &inFlow{limit: initialConnWindowSize},
-		sendQuotaPool:   newQuotaPool(defaultWindowSize),
-		state:           reachable,
-		writableChan:    make(chan int, 1),
-		shutdownChan:    make(chan struct{}),
-		activeStreams:   make(map[uint32]*Stream),
-		streamSendQuota: defaultWindowSize,
-		stats:           config.StatsHandler,
-		kp:              kp,
-		idle:            time.Now(),
-		kep:             kep,
+		ctx:               context.Background(),
+		conn:              conn,
+		remoteAddr:        conn.RemoteAddr(),
+		localAddr:         conn.LocalAddr(),
+		authInfo:          config.AuthInfo,
+		framer:            framer,
+		hBuf:              &buf,
+		hEnc:              hpack.NewEncoder(&buf),
+		maxStreams:        maxStreams,
+		inTapHandle:       config.InTapHandle,
+		controlBuf:        newRecvBuffer(),
+		fc:                &inFlow{limit: uint32(icwz)},
+		sendQuotaPool:     newQuotaPool(defaultWindowSize),
+		state:             reachable,
+		writableChan:      make(chan int, 1),
+		shutdownChan:      make(chan struct{}),
+		activeStreams:     make(map[uint32]*Stream),
+		streamSendQuota:   defaultWindowSize,
+		stats:             config.StatsHandler,
+		kp:                kp,
+		idle:              time.Now(),
+		kep:               kep,
+		initialWindowSize: iwz,
 	}
 	if t.stats != nil {
 		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
@@ -224,7 +235,7 @@
 		id:  frame.Header().StreamID,
 		st:  t,
 		buf: buf,
-		fc:  &inFlow{limit: initialWindowSize},
+		fc:  &inFlow{limit: uint32(t.initialWindowSize)},
 	}
 
 	var state decodeState
diff --git a/transport/transport.go b/transport/transport.go
index 88c2c98..c22333c 100644
--- a/transport/transport.go
+++ b/transport/transport.go
@@ -392,12 +392,14 @@
 
 // ServerConfig consists of all the configurations to establish a server transport.
 type ServerConfig struct {
-	MaxStreams      uint32
-	AuthInfo        credentials.AuthInfo
-	InTapHandle     tap.ServerInHandle
-	StatsHandler    stats.Handler
-	KeepaliveParams keepalive.ServerParameters
-	KeepalivePolicy keepalive.EnforcementPolicy
+	MaxStreams            uint32
+	AuthInfo              credentials.AuthInfo
+	InTapHandle           tap.ServerInHandle
+	StatsHandler          stats.Handler
+	KeepaliveParams       keepalive.ServerParameters
+	KeepalivePolicy       keepalive.EnforcementPolicy
+	InitialWindowSize     int32
+	InitialConnWindowSize int32
 }
 
 // NewServerTransport creates a ServerTransport with conn or non-nil error
@@ -425,6 +427,10 @@
 	KeepaliveParams keepalive.ClientParameters
 	// StatsHandler stores the handler for stats.
 	StatsHandler stats.Handler
+	// InitialWindowSize sets the intial window size for a stream.
+	InitialWindowSize int32
+	// InitialConnWindowSize sets the intial window size for a connection.
+	InitialConnWindowSize int32
 }
 
 // TargetInfo contains the information of the target such as network address and metadata.
diff --git a/transport/transport_test.go b/transport/transport_test.go
index 4e986e5..7429f2e 100644
--- a/transport/transport_test.go
+++ b/transport/transport_test.go
@@ -1266,3 +1266,153 @@
 		}
 	}
 }
+
+type windowSizeConfig struct {
+	serverStream int32
+	serverConn   int32
+	clientStream int32
+	clientConn   int32
+}
+
+func TestAccountCheckWindowSizeWithLargeWindow(t *testing.T) {
+	wc := windowSizeConfig{
+		serverStream: 10 * 1024 * 1024,
+		serverConn:   12 * 1024 * 1024,
+		clientStream: 6 * 1024 * 1024,
+		clientConn:   8 * 1024 * 1024,
+	}
+	testAccountCheckWindowSize(t, wc)
+}
+
+func TestAccountCheckWindowSizeWithSmallWindow(t *testing.T) {
+	wc := windowSizeConfig{
+		serverStream: defaultWindowSize,
+		// Note this is smaller than initialConnWindowSize which is the current default.
+		serverConn:   defaultWindowSize,
+		clientStream: defaultWindowSize,
+		clientConn:   defaultWindowSize,
+	}
+	testAccountCheckWindowSize(t, wc)
+}
+
+func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) {
+	serverConfig := &ServerConfig{
+		InitialWindowSize:     wc.serverStream,
+		InitialConnWindowSize: wc.serverConn,
+	}
+	connectOptions := ConnectOptions{
+		InitialWindowSize:     wc.clientStream,
+		InitialConnWindowSize: wc.clientConn,
+	}
+	server, client := setUpWithOptions(t, 0, serverConfig, suspended, connectOptions)
+	defer server.stop()
+	defer client.Close()
+
+	// Wait for server conns to be populated with new server transport.
+	waitWhileTrue(t, func() (bool, error) {
+		server.mu.Lock()
+		defer server.mu.Unlock()
+		if len(server.conns) == 0 {
+			return true, fmt.Errorf("timed out waiting for server transport to be created")
+		}
+		return false, nil
+	})
+	var st *http2Server
+	server.mu.Lock()
+	for k := range server.conns {
+		st = k.(*http2Server)
+	}
+	server.mu.Unlock()
+	ct := client.(*http2Client)
+	cstream, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
+	if err != nil {
+		t.Fatalf("Failed to create stream. Err: %v", err)
+	}
+	// Wait for server to receive headers.
+	waitWhileTrue(t, func() (bool, error) {
+		st.mu.Lock()
+		defer st.mu.Unlock()
+		if len(st.activeStreams) == 0 {
+			return true, fmt.Errorf("timed out waiting for server to receive headers")
+		}
+		return false, nil
+	})
+	// Sleeping to make sure the settings are applied in case of negative test.
+	time.Sleep(time.Second)
+
+	waitWhileTrue(t, func() (bool, error) {
+		if lim := st.fc.limit; lim != uint32(serverConfig.InitialConnWindowSize) {
+			return true, fmt.Errorf("Server transport flow control window size: got %v, want %v", lim, serverConfig.InitialConnWindowSize)
+		}
+		return false, nil
+	})
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	serverSendQuota, err := wait(ctx, nil, nil, nil, st.sendQuotaPool.acquire())
+	if err != nil {
+		t.Fatalf("Error while acquiring sendQuota on server. Err: %v", err)
+	}
+	cancel()
+	st.sendQuotaPool.add(serverSendQuota)
+	if serverSendQuota != int(connectOptions.InitialConnWindowSize) {
+		t.Fatalf("Server send quota(%v) not equal to client's window size(%v) on conn.", serverSendQuota, connectOptions.InitialConnWindowSize)
+	}
+	st.mu.Lock()
+	if st.streamSendQuota != uint32(connectOptions.InitialWindowSize) {
+		t.Fatalf("Server stream send quota(%v) not equal to client's window size(%v) on stream.", ct.streamSendQuota, connectOptions.InitialWindowSize)
+	}
+	st.mu.Unlock()
+	if ct.fc.limit != uint32(connectOptions.InitialConnWindowSize) {
+		t.Fatalf("Client transport flow control window size is %v, want %v", ct.fc.limit, connectOptions.InitialConnWindowSize)
+	}
+	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
+	clientSendQuota, err := wait(ctx, nil, nil, nil, ct.sendQuotaPool.acquire())
+	if err != nil {
+		t.Fatalf("Error while acquiring sendQuota on client. Err: %v", err)
+	}
+	cancel()
+	ct.sendQuotaPool.add(clientSendQuota)
+	if clientSendQuota != int(serverConfig.InitialConnWindowSize) {
+		t.Fatalf("Client send quota(%v) not equal to server's window size(%v) on conn.", clientSendQuota, serverConfig.InitialConnWindowSize)
+	}
+	ct.mu.Lock()
+	if ct.streamSendQuota != uint32(serverConfig.InitialWindowSize) {
+		t.Fatalf("Client stream send quota(%v) not equal to server's window size(%v) on stream.", ct.streamSendQuota, serverConfig.InitialWindowSize)
+	}
+	ct.mu.Unlock()
+	if cstream.fc.limit != uint32(connectOptions.InitialWindowSize) {
+		t.Fatalf("Client stream flow control window size is %v, want %v", cstream.fc.limit, connectOptions.InitialWindowSize)
+	}
+	var sstream *Stream
+	st.mu.Lock()
+	for _, v := range st.activeStreams {
+		sstream = v
+	}
+	st.mu.Unlock()
+	if sstream.fc.limit != uint32(serverConfig.InitialWindowSize) {
+		t.Fatalf("Server stream flow control window size is %v, want %v", sstream.fc.limit, serverConfig.InitialWindowSize)
+	}
+}
+
+func waitWhileTrue(t *testing.T, condition func() (bool, error)) {
+	var (
+		wait bool
+		err  error
+	)
+	timer := time.NewTimer(time.Second * 5)
+	for {
+		wait, err = condition()
+		if wait {
+			select {
+			case <-timer.C:
+				t.Fatalf(err.Error())
+			default:
+				time.Sleep(50 * time.Millisecond)
+				continue
+			}
+		}
+		if !timer.Stop() {
+			<-timer.C
+		}
+		break
+	}
+}