Piggyback window updates for connection with those of a stream. (#1273)
diff --git a/transport/control.go b/transport/control.go
index 3db471a..beff034 100644
--- a/transport/control.go
+++ b/transport/control.go
@@ -68,6 +68,7 @@
type windowUpdate struct {
streamID uint32
increment uint32
+ flush bool
}
func (*windowUpdate) item() {}
@@ -240,3 +241,11 @@
}
return 0
}
+
+func (f *inFlow) resetPendingUpdate() uint32 {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ n := f.pendingUpdate
+ f.pendingUpdate = 0
+ return n
+}
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 27a72a0..92b5180 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -813,7 +813,11 @@
return
}
if w := s.fc.maybeAdjust(n); w > 0 {
- t.controlBuf.put(&windowUpdate{s.id, w})
+ // Piggyback conneciton's window update along.
+ if cw := t.fc.resetPendingUpdate(); cw > 0 {
+ t.controlBuf.put(&windowUpdate{0, cw, false})
+ }
+ t.controlBuf.put(&windowUpdate{s.id, w, true})
}
}
@@ -827,7 +831,10 @@
return
}
if w := s.fc.onRead(n); w > 0 {
- t.controlBuf.put(&windowUpdate{s.id, w})
+ if cw := t.fc.resetPendingUpdate(); cw > 0 {
+ t.controlBuf.put(&windowUpdate{0, cw, false})
+ }
+ t.controlBuf.put(&windowUpdate{s.id, w, true})
}
}
@@ -846,7 +853,7 @@
// active(fast) streams from starving in presence of slow or
// inactive streams.
if w := t.fc.onRead(uint32(size)); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
+ t.controlBuf.put(&windowUpdate{0, w, true})
}
// Select the right stream to dispatch.
s, ok := t.getStream(f)
@@ -869,7 +876,7 @@
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
- t.controlBuf.put(&windowUpdate{s.id, w})
+ t.controlBuf.put(&windowUpdate{s.id, w, true})
}
}
s.mu.Unlock()
@@ -1185,7 +1192,7 @@
case <-t.writableChan:
switch i := i.(type) {
case *windowUpdate:
- t.framer.writeWindowUpdate(true, i.streamID, i.increment)
+ t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment)
case *settings:
if i.ack {
t.framer.writeSettingsAck(true)
diff --git a/transport/http2_server.go b/transport/http2_server.go
index 85590d2..3a18f30 100644
--- a/transport/http2_server.go
+++ b/transport/http2_server.go
@@ -449,7 +449,10 @@
return
}
if w := s.fc.maybeAdjust(n); w > 0 {
- t.controlBuf.put(&windowUpdate{s.id, w})
+ if cw := t.fc.resetPendingUpdate(); cw > 0 {
+ t.controlBuf.put(&windowUpdate{0, cw, false})
+ }
+ t.controlBuf.put(&windowUpdate{s.id, w, true})
}
}
@@ -463,7 +466,10 @@
return
}
if w := s.fc.onRead(n); w > 0 {
- t.controlBuf.put(&windowUpdate{s.id, w})
+ if cw := t.fc.resetPendingUpdate(); cw > 0 {
+ t.controlBuf.put(&windowUpdate{0, cw, false})
+ }
+ t.controlBuf.put(&windowUpdate{s.id, w, true})
}
}
@@ -483,7 +489,7 @@
// active(fast) streams from starving in presence of slow or
// inactive streams.
if w := t.fc.onRead(uint32(size)); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
+ t.controlBuf.put(&windowUpdate{0, w, true})
}
// Select the right stream to dispatch.
s, ok := t.getStream(f)
@@ -504,7 +510,7 @@
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
- t.controlBuf.put(&windowUpdate{s.id, w})
+ t.controlBuf.put(&windowUpdate{s.id, w, true})
}
}
s.mu.Unlock()
@@ -979,7 +985,7 @@
case <-t.writableChan:
switch i := i.(type) {
case *windowUpdate:
- t.framer.writeWindowUpdate(true, i.streamID, i.increment)
+ t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment)
case *settings:
if i.ack {
t.framer.writeSettingsAck(true)