Remove self-imposed limit on max concurrent streams if the server doesn't impose any. (#1624)
* Remove self-imposed limit on max concurrent streams if the server allows it.
* Remove test necessitating buggy behavior.
diff --git a/test/end2end_test.go b/test/end2end_test.go
index be9eb27..6d59d29 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -3675,52 +3675,6 @@
}
}
-const defaultMaxStreamsClient = 100
-
-func TestExceedDefaultMaxStreamsLimit(t *testing.T) {
- defer leakcheck.Check(t)
- for _, e := range listTestEnv() {
- if e.name == "handler-tls" {
- // The default max stream limit in handler_server is not 100?
- continue
- }
- testExceedDefaultMaxStreamsLimit(t, e)
- }
-}
-
-func testExceedDefaultMaxStreamsLimit(t *testing.T, e env) {
- te := newTest(t, e)
- te.declareLogNoise(
- "http2Client.notifyError got notified that the client transport was broken",
- "Conn.resetTransport failed to create client transport",
- "grpc: the connection is closing",
- )
- // When masStream is set to 0 the server doesn't send a settings frame for
- // MaxConcurrentStreams, essentially allowing infinite (math.MaxInt32) streams.
- // In such a case, there should be a default cap on the client-side.
- te.maxStream = 0
- te.startServer(&testServer{security: e.security})
- defer te.tearDown()
-
- cc := te.clientConn()
- tc := testpb.NewTestServiceClient(cc)
-
- // Create as many streams as a client can.
- for i := 0; i < defaultMaxStreamsClient; i++ {
- if _, err := tc.StreamingInputCall(te.ctx); err != nil {
- t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
- }
- }
-
- // Trying to create one more should timeout.
- ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
- defer cancel()
- _, err := tc.StreamingInputCall(ctx)
- if err == nil || grpc.Code(err) != codes.DeadlineExceeded {
- t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
- }
-}
-
func TestStreamsQuotaRecovery(t *testing.T) {
defer leakcheck.Check(t)
for _, e := range listTestEnv() {
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 1057512..90cb590 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -918,15 +918,28 @@
s.write(recvMsg{err: io.EOF})
}
-func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
+func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
if f.IsAck() {
return
}
var ss []http2.Setting
+ isMaxConcurrentStreamsMissing := true
f.ForeachSetting(func(s http2.Setting) error {
+ if s.ID == http2.SettingMaxConcurrentStreams {
+ isMaxConcurrentStreamsMissing = false
+ }
ss = append(ss, s)
return nil
})
+ if isFirst && isMaxConcurrentStreamsMissing {
+ // This means server is imposing no limits on
+ // maximum number of concurrent streams initiated by client.
+ // So we must remove our self-imposed limit.
+ ss = append(ss, http2.Setting{
+ ID: http2.SettingMaxConcurrentStreams,
+ Val: math.MaxUint32,
+ })
+ }
// The settings will be applied once the ack is sent.
t.controlBuf.put(&settings{ack: true, ss: ss})
}
@@ -1125,7 +1138,7 @@
t.Close()
return
}
- t.handleSettings(sf)
+ t.handleSettings(sf, true)
// loop to keep reading incoming messages on this transport.
for {
@@ -1158,7 +1171,7 @@
case *http2.RSTStreamFrame:
t.handleRSTStream(frame)
case *http2.SettingsFrame:
- t.handleSettings(frame)
+ t.handleSettings(frame, false)
case *http2.PingFrame:
t.handlePing(frame)
case *http2.GoAwayFrame: