Fix settings ack race (#1630)

* First commit.

* fixed a bug

* mend

* remove debug residual

* Apply settings in the reader goroutine instead of writer.

* Post-review updates

* Post-review update.
diff --git a/transport/control.go b/transport/control.go
index dd1a8d4..3ccd746 100644
--- a/transport/control.go
+++ b/transport/control.go
@@ -89,12 +89,16 @@
 func (*windowUpdate) item() {}
 
 type settings struct {
-	ack bool
-	ss  []http2.Setting
+	ss []http2.Setting
 }
 
 func (*settings) item() {}
 
+type settingsAck struct {
+}
+
+func (*settingsAck) item() {}
+
 type resetStream struct {
 	streamID uint32
 	code     http2.ErrCode
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 90cb590..983a2fb 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -805,7 +805,6 @@
 	t.mu.Unlock()
 	t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n)})
 	t.controlBuf.put(&settings{
-		ack: false,
 		ss: []http2.Setting{
 			{
 				ID:  http2.SettingInitialWindowSize,
@@ -922,26 +921,44 @@
 	if f.IsAck() {
 		return
 	}
-	var ss []http2.Setting
+	var rs []http2.Setting
+	var ps []http2.Setting
 	isMaxConcurrentStreamsMissing := true
 	f.ForeachSetting(func(s http2.Setting) error {
 		if s.ID == http2.SettingMaxConcurrentStreams {
 			isMaxConcurrentStreamsMissing = false
 		}
-		ss = append(ss, s)
+		if t.isRestrictive(s) {
+			rs = append(rs, s)
+		} else {
+			ps = append(ps, s)
+		}
 		return nil
 	})
 	if isFirst && isMaxConcurrentStreamsMissing {
 		// This means server is imposing no limits on
 		// maximum number of concurrent streams initiated by client.
 		// So we must remove our self-imposed limit.
-		ss = append(ss, http2.Setting{
+		ps = append(ps, http2.Setting{
 			ID:  http2.SettingMaxConcurrentStreams,
 			Val: math.MaxUint32,
 		})
 	}
-	// The settings will be applied once the ack is sent.
-	t.controlBuf.put(&settings{ack: true, ss: ss})
+	t.applySettings(rs)
+	t.controlBuf.put(&settingsAck{})
+	t.applySettings(ps)
+}
+
+func (t *http2Client) isRestrictive(s http2.Setting) bool {
+	switch s.ID {
+	case http2.SettingMaxConcurrentStreams:
+		return int(s.Val) < t.maxStreams
+	case http2.SettingInitialWindowSize:
+		// Note: we don't acquire a lock here to read streamSendQuota
+		// because the same goroutine updates it later.
+		return s.Val < t.streamSendQuota
+	}
+	return false
 }
 
 func (t *http2Client) handlePing(f *http2.PingFrame) {
@@ -1194,10 +1211,8 @@
 			if s.Val > math.MaxInt32 {
 				s.Val = math.MaxInt32
 			}
-			t.mu.Lock()
 			ms := t.maxStreams
 			t.maxStreams = int(s.Val)
-			t.mu.Unlock()
 			t.streamsQuota.add(int(s.Val) - ms)
 		case http2.SettingInitialWindowSize:
 			t.mu.Lock()
@@ -1216,6 +1231,11 @@
 // The transport layer needs to be refactored to take care of this.
 func (t *http2Client) itemHandler(i item) error {
 	var err error
+	defer func() {
+		if err != nil {
+			errorf(" error in itemHandler: %v", err)
+		}
+	}()
 	switch i := i.(type) {
 	case *dataFrame:
 		err = t.framer.fr.WriteData(i.streamID, i.endStream, i.d)
@@ -1258,12 +1278,9 @@
 	case *windowUpdate:
 		err = t.framer.fr.WriteWindowUpdate(i.streamID, i.increment)
 	case *settings:
-		if i.ack {
-			t.applySettings(i.ss)
-			err = t.framer.fr.WriteSettingsAck()
-		} else {
-			err = t.framer.fr.WriteSettings(i.ss...)
-		}
+		err = t.framer.fr.WriteSettings(i.ss...)
+	case *settingsAck:
+		err = t.framer.fr.WriteSettingsAck()
 	case *resetStream:
 		// If the server needs to be to intimated about stream closing,
 		// then we need to make sure the RST_STREAM frame is written to
diff --git a/transport/http2_server.go b/transport/http2_server.go
index 6582024..61f5c7c 100644
--- a/transport/http2_server.go
+++ b/transport/http2_server.go
@@ -496,7 +496,6 @@
 	t.mu.Unlock()
 	t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n)})
 	t.controlBuf.put(&settings{
-		ack: false,
 		ss: []http2.Setting{
 			{
 				ID:  http2.SettingInitialWindowSize,
@@ -594,12 +593,29 @@
 	if f.IsAck() {
 		return
 	}
-	var ss []http2.Setting
+	var rs []http2.Setting
+	var ps []http2.Setting
 	f.ForeachSetting(func(s http2.Setting) error {
-		ss = append(ss, s)
+		if t.isRestrictive(s) {
+			rs = append(rs, s)
+		} else {
+			ps = append(ps, s)
+		}
 		return nil
 	})
-	t.controlBuf.put(&settings{ack: true, ss: ss})
+	t.applySettings(rs)
+	t.controlBuf.put(&settingsAck{})
+	t.applySettings(ps)
+}
+
+func (t *http2Server) isRestrictive(s http2.Setting) bool {
+	switch s.ID {
+	case http2.SettingInitialWindowSize:
+		// Note: we don't acquire a lock here to read streamSendQuota
+		// because the same goroutine updates it later.
+		return s.Val < t.streamSendQuota
+	}
+	return false
 }
 
 func (t *http2Server) applySettings(ss []http2.Setting) {
@@ -666,7 +682,7 @@
 
 	if t.pingStrikes > maxPingStrikes {
 		// Send goaway and close the connection.
-		errorf("transport: Got to too many pings from the client, closing the connection.")
+		errorf("transport: Got too many pings from the client, closing the connection.")
 		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
 	}
 }
@@ -1050,11 +1066,9 @@
 	case *windowUpdate:
 		return t.framer.fr.WriteWindowUpdate(i.streamID, i.increment)
 	case *settings:
-		if i.ack {
-			t.applySettings(i.ss)
-			return t.framer.fr.WriteSettingsAck()
-		}
 		return t.framer.fr.WriteSettings(i.ss...)
+	case *settingsAck:
+		return t.framer.fr.WriteSettingsAck()
 	case *resetStream:
 		return t.framer.fr.WriteRSTStream(i.streamID, i.code)
 	case *goAway:
diff --git a/transport/transport.go b/transport/transport.go
index e0651c4..9f4dbb2 100644
--- a/transport/transport.go
+++ b/transport/transport.go
@@ -745,6 +745,7 @@
 		case i := <-cbuf.get():
 			cbuf.load()
 			if err := handler(i); err != nil {
+				errorf("transport: Error while handling item. Err: %v", err)
 				return
 			}
 		case <-ctx.Done():
@@ -756,12 +757,14 @@
 			case i := <-cbuf.get():
 				cbuf.load()
 				if err := handler(i); err != nil {
+					errorf("transport: Error while handling item. Err: %v", err)
 					return
 				}
 			case <-ctx.Done():
 				return
 			default:
 				if err := handler(&flushIO{}); err != nil {
+					errorf("transport: Error while flushing. Err: %v", err)
 					return
 				}
 				break hasData
diff --git a/transport/transport_test.go b/transport/transport_test.go
index 6960254..89fdaa5 100644
--- a/transport/transport_test.go
+++ b/transport/transport_test.go
@@ -1093,31 +1093,32 @@
 			}
 		}
 	}()
+	var failureReason string
+	// Test these conditions untill they pass or
+	// we reach the deadline (failure case).
 	for {
 		select {
 		case <-ch:
 		case <-done:
-			t.Fatalf("Client has not received the max stream setting in 5 seconds.")
+			t.Fatalf(failureReason)
 		}
-		cc.mu.Lock()
-		// cc.maxStreams should be equal to 1 after having received settings frame from
-		// server.
-		if cc.maxStreams == 1 {
-			cc.mu.Unlock()
-			select {
-			case <-cc.streamsQuota.acquire():
-				t.Fatalf("streamsQuota.acquire() becomes readable mistakenly.")
-			default:
-				cc.streamsQuota.mu.Lock()
-				quota := cc.streamsQuota.quota
-				cc.streamsQuota.mu.Unlock()
-				if quota != 0 {
-					t.Fatalf("streamsQuota.quota got non-zero quota mistakenly.")
-				}
+		select {
+		case q := <-cc.streamsQuota.acquire():
+			failureReason = "streamsQuota.acquire() becomes readable mistakenly."
+			cc.streamsQuota.add(q)
+		default:
+			cc.streamsQuota.mu.Lock()
+			quota := cc.streamsQuota.quota
+			cc.streamsQuota.mu.Unlock()
+			if quota != 0 {
+				failureReason = "streamsQuota.quota got non-zero quota mistakenly."
+			} else {
+				failureReason = ""
 			}
+		}
+		if failureReason == "" {
 			break
 		}
-		cc.mu.Unlock()
 	}
 	close(ready)
 	// Close the pending stream so that the streams quota becomes available for the next new stream.