Remove self-imposed limit on max concurrent streams if the server doesn't impose any. (#1624)

* Remove self-imposed limit on max concurrent streams if the server allows it.

* Remove test necessitating buggy behavior.
diff --git a/test/end2end_test.go b/test/end2end_test.go
index be9eb27..6d59d29 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -3675,52 +3675,6 @@
 	}
 }
 
-const defaultMaxStreamsClient = 100
-
-func TestExceedDefaultMaxStreamsLimit(t *testing.T) {
-	defer leakcheck.Check(t)
-	for _, e := range listTestEnv() {
-		if e.name == "handler-tls" {
-			// The default max stream limit in handler_server is not 100?
-			continue
-		}
-		testExceedDefaultMaxStreamsLimit(t, e)
-	}
-}
-
-func testExceedDefaultMaxStreamsLimit(t *testing.T, e env) {
-	te := newTest(t, e)
-	te.declareLogNoise(
-		"http2Client.notifyError got notified that the client transport was broken",
-		"Conn.resetTransport failed to create client transport",
-		"grpc: the connection is closing",
-	)
-	// When masStream is set to 0 the server doesn't send a settings frame for
-	// MaxConcurrentStreams, essentially allowing infinite (math.MaxInt32) streams.
-	// In such a case, there should be a default cap on the client-side.
-	te.maxStream = 0
-	te.startServer(&testServer{security: e.security})
-	defer te.tearDown()
-
-	cc := te.clientConn()
-	tc := testpb.NewTestServiceClient(cc)
-
-	// Create as many streams as a client can.
-	for i := 0; i < defaultMaxStreamsClient; i++ {
-		if _, err := tc.StreamingInputCall(te.ctx); err != nil {
-			t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
-		}
-	}
-
-	// Trying to create one more should timeout.
-	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
-	defer cancel()
-	_, err := tc.StreamingInputCall(ctx)
-	if err == nil || grpc.Code(err) != codes.DeadlineExceeded {
-		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
-	}
-}
-
 func TestStreamsQuotaRecovery(t *testing.T) {
 	defer leakcheck.Check(t)
 	for _, e := range listTestEnv() {
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 1057512..90cb590 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -918,15 +918,28 @@
 	s.write(recvMsg{err: io.EOF})
 }
 
-func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
+func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
 	if f.IsAck() {
 		return
 	}
 	var ss []http2.Setting
+	isMaxConcurrentStreamsMissing := true
 	f.ForeachSetting(func(s http2.Setting) error {
+		if s.ID == http2.SettingMaxConcurrentStreams {
+			isMaxConcurrentStreamsMissing = false
+		}
 		ss = append(ss, s)
 		return nil
 	})
+	if isFirst && isMaxConcurrentStreamsMissing {
+		// This means server is imposing no limits on
+		// maximum number of concurrent streams initiated by client.
+		// So we must remove our self-imposed limit.
+		ss = append(ss, http2.Setting{
+			ID:  http2.SettingMaxConcurrentStreams,
+			Val: math.MaxUint32,
+		})
+	}
 	// The settings will be applied once the ack is sent.
 	t.controlBuf.put(&settings{ack: true, ss: ss})
 }
@@ -1125,7 +1138,7 @@
 		t.Close()
 		return
 	}
-	t.handleSettings(sf)
+	t.handleSettings(sf, true)
 
 	// loop to keep reading incoming messages on this transport.
 	for {
@@ -1158,7 +1171,7 @@
 		case *http2.RSTStreamFrame:
 			t.handleRSTStream(frame)
 		case *http2.SettingsFrame:
-			t.handleSettings(frame)
+			t.handleSettings(frame, false)
 		case *http2.PingFrame:
 			t.handlePing(frame)
 		case *http2.GoAwayFrame: