Fix settings ack race (#1630)
* First commit.
* fixed a bug
* mend
* remove debug residual
* Apply settings in the reader goroutine instead of writer.
* Post-review updates
* Post-review update.
diff --git a/transport/control.go b/transport/control.go
index dd1a8d4..3ccd746 100644
--- a/transport/control.go
+++ b/transport/control.go
@@ -89,12 +89,16 @@
func (*windowUpdate) item() {}
type settings struct {
- ack bool
- ss []http2.Setting
+ ss []http2.Setting
}
func (*settings) item() {}
+type settingsAck struct {
+}
+
+func (*settingsAck) item() {}
+
type resetStream struct {
streamID uint32
code http2.ErrCode
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 90cb590..983a2fb 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -805,7 +805,6 @@
t.mu.Unlock()
t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n)})
t.controlBuf.put(&settings{
- ack: false,
ss: []http2.Setting{
{
ID: http2.SettingInitialWindowSize,
@@ -922,26 +921,44 @@
if f.IsAck() {
return
}
- var ss []http2.Setting
+ var rs []http2.Setting
+ var ps []http2.Setting
isMaxConcurrentStreamsMissing := true
f.ForeachSetting(func(s http2.Setting) error {
if s.ID == http2.SettingMaxConcurrentStreams {
isMaxConcurrentStreamsMissing = false
}
- ss = append(ss, s)
+ if t.isRestrictive(s) {
+ rs = append(rs, s)
+ } else {
+ ps = append(ps, s)
+ }
return nil
})
if isFirst && isMaxConcurrentStreamsMissing {
// This means the server is imposing no limit on the
// maximum number of concurrent streams initiated by the client,
// so we must remove our self-imposed limit.
- ss = append(ss, http2.Setting{
+ ps = append(ps, http2.Setting{
ID: http2.SettingMaxConcurrentStreams,
Val: math.MaxUint32,
})
}
- // The settings will be applied once the ack is sent.
- t.controlBuf.put(&settings{ack: true, ss: ss})
+ t.applySettings(rs)
+ t.controlBuf.put(&settingsAck{})
+ t.applySettings(ps)
+}
+
+func (t *http2Client) isRestrictive(s http2.Setting) bool {
+ switch s.ID {
+ case http2.SettingMaxConcurrentStreams:
+ return int(s.Val) < t.maxStreams
+ case http2.SettingInitialWindowSize:
+ // Note: we don't acquire a lock here to read streamSendQuota
+ // because the same goroutine updates it later.
+ return s.Val < t.streamSendQuota
+ }
+ return false
}
func (t *http2Client) handlePing(f *http2.PingFrame) {
@@ -1194,10 +1211,8 @@
if s.Val > math.MaxInt32 {
s.Val = math.MaxInt32
}
- t.mu.Lock()
ms := t.maxStreams
t.maxStreams = int(s.Val)
- t.mu.Unlock()
t.streamsQuota.add(int(s.Val) - ms)
case http2.SettingInitialWindowSize:
t.mu.Lock()
@@ -1216,6 +1231,11 @@
// The transport layer needs to be refactored to take care of this.
func (t *http2Client) itemHandler(i item) error {
var err error
+ defer func() {
+ if err != nil {
+ errorf(" error in itemHandler: %v", err)
+ }
+ }()
switch i := i.(type) {
case *dataFrame:
err = t.framer.fr.WriteData(i.streamID, i.endStream, i.d)
@@ -1258,12 +1278,9 @@
case *windowUpdate:
err = t.framer.fr.WriteWindowUpdate(i.streamID, i.increment)
case *settings:
- if i.ack {
- t.applySettings(i.ss)
- err = t.framer.fr.WriteSettingsAck()
- } else {
- err = t.framer.fr.WriteSettings(i.ss...)
- }
+ err = t.framer.fr.WriteSettings(i.ss...)
+ case *settingsAck:
+ err = t.framer.fr.WriteSettingsAck()
case *resetStream:
// If the server needs to be intimated about stream closing,
// then we need to make sure the RST_STREAM frame is written to
diff --git a/transport/http2_server.go b/transport/http2_server.go
index 6582024..61f5c7c 100644
--- a/transport/http2_server.go
+++ b/transport/http2_server.go
@@ -496,7 +496,6 @@
t.mu.Unlock()
t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n)})
t.controlBuf.put(&settings{
- ack: false,
ss: []http2.Setting{
{
ID: http2.SettingInitialWindowSize,
@@ -594,12 +593,29 @@
if f.IsAck() {
return
}
- var ss []http2.Setting
+ var rs []http2.Setting
+ var ps []http2.Setting
f.ForeachSetting(func(s http2.Setting) error {
- ss = append(ss, s)
+ if t.isRestrictive(s) {
+ rs = append(rs, s)
+ } else {
+ ps = append(ps, s)
+ }
return nil
})
- t.controlBuf.put(&settings{ack: true, ss: ss})
+ t.applySettings(rs)
+ t.controlBuf.put(&settingsAck{})
+ t.applySettings(ps)
+}
+
+func (t *http2Server) isRestrictive(s http2.Setting) bool {
+ switch s.ID {
+ case http2.SettingInitialWindowSize:
+ // Note: we don't acquire a lock here to read streamSendQuota
+ // because the same goroutine updates it later.
+ return s.Val < t.streamSendQuota
+ }
+ return false
}
func (t *http2Server) applySettings(ss []http2.Setting) {
@@ -666,7 +682,7 @@
if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
- errorf("transport: Got to too many pings from the client, closing the connection.")
+ errorf("transport: Got too many pings from the client, closing the connection.")
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
}
}
@@ -1050,11 +1066,9 @@
case *windowUpdate:
return t.framer.fr.WriteWindowUpdate(i.streamID, i.increment)
case *settings:
- if i.ack {
- t.applySettings(i.ss)
- return t.framer.fr.WriteSettingsAck()
- }
return t.framer.fr.WriteSettings(i.ss...)
+ case *settingsAck:
+ return t.framer.fr.WriteSettingsAck()
case *resetStream:
return t.framer.fr.WriteRSTStream(i.streamID, i.code)
case *goAway:
diff --git a/transport/transport.go b/transport/transport.go
index e0651c4..9f4dbb2 100644
--- a/transport/transport.go
+++ b/transport/transport.go
@@ -745,6 +745,7 @@
case i := <-cbuf.get():
cbuf.load()
if err := handler(i); err != nil {
+ errorf("transport: Error while handling item. Err: %v", err)
return
}
case <-ctx.Done():
@@ -756,12 +757,14 @@
case i := <-cbuf.get():
cbuf.load()
if err := handler(i); err != nil {
+ errorf("transport: Error while handling item. Err: %v", err)
return
}
case <-ctx.Done():
return
default:
if err := handler(&flushIO{}); err != nil {
+ errorf("transport: Error while flushing. Err: %v", err)
return
}
break hasData
diff --git a/transport/transport_test.go b/transport/transport_test.go
index 6960254..89fdaa5 100644
--- a/transport/transport_test.go
+++ b/transport/transport_test.go
@@ -1093,31 +1093,32 @@
}
}
}()
+ var failureReason string
+ // Test these conditions until they pass or
+ // we reach the deadline (failure case).
for {
select {
case <-ch:
case <-done:
- t.Fatalf("Client has not received the max stream setting in 5 seconds.")
+ t.Fatalf(failureReason)
}
- cc.mu.Lock()
- // cc.maxStreams should be equal to 1 after having received settings frame from
- // server.
- if cc.maxStreams == 1 {
- cc.mu.Unlock()
- select {
- case <-cc.streamsQuota.acquire():
- t.Fatalf("streamsQuota.acquire() becomes readable mistakenly.")
- default:
- cc.streamsQuota.mu.Lock()
- quota := cc.streamsQuota.quota
- cc.streamsQuota.mu.Unlock()
- if quota != 0 {
- t.Fatalf("streamsQuota.quota got non-zero quota mistakenly.")
- }
+ select {
+ case q := <-cc.streamsQuota.acquire():
+ failureReason = "streamsQuota.acquire() becomes readable mistakenly."
+ cc.streamsQuota.add(q)
+ default:
+ cc.streamsQuota.mu.Lock()
+ quota := cc.streamsQuota.quota
+ cc.streamsQuota.mu.Unlock()
+ if quota != 0 {
+ failureReason = "streamsQuota.quota got non-zero quota mistakenly."
+ } else {
+ failureReason = ""
}
+ }
+ if failureReason == "" {
break
}
- cc.mu.Unlock()
}
close(ready)
// Close the pending stream so that the streams quota becomes available for the next new stream.