Decouple transport flow control from application read. (#1265)
* Decouple transport flow control from application read.
* post-review update
* Added comment in http2_server as well.
* Added another test
* Fixed typos in comments.
diff --git a/transport/control.go b/transport/control.go
index 68dfdd5..3db471a 100644
--- a/transport/control.go
+++ b/transport/control.go
@@ -240,11 +240,3 @@
}
return 0
}
-
-func (f *inFlow) resetPendingData() uint32 {
- f.mu.Lock()
- defer f.mu.Unlock()
- n := f.pendingData
- f.pendingData = 0
- return n
-}
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 713f762..d27199b 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -595,11 +595,6 @@
s.mu.Lock()
rstStream = s.rstStream
rstError = s.rstError
- if q := s.fc.resetPendingData(); q > 0 {
- if n := t.fc.onRead(q); n > 0 {
- t.controlBuf.put(&windowUpdate{0, n})
- }
- }
if s.state == streamDone {
s.mu.Unlock()
return
@@ -831,9 +826,6 @@
if s.state == streamDone {
return
}
- if w := t.fc.onRead(n); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
if w := s.fc.onRead(n); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
@@ -845,22 +837,26 @@
t.notifyError(connectionErrorf(true, err, "%v", err))
return
}
+ // Decouple connection's flow control from application's read.
+ // An update on connection's flow control should not depend on
+ // whether user application has read the data or not. Such a
+ // restriction is already imposed on the stream's flow control,
+ // and therefore the sender will be blocked anyways.
+ // Decoupling the connection flow control will prevent other
+ // active(fast) streams from starving in presence of slow or
+ // inactive streams.
+ if w := t.fc.onRead(uint32(size)); w > 0 {
+ t.controlBuf.put(&windowUpdate{0, w})
+ }
// Select the right stream to dispatch.
s, ok := t.getStream(f)
if !ok {
- if w := t.fc.onRead(uint32(size)); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
return
}
if size > 0 {
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
- // The stream has been closed. Release the corresponding quota.
- if w := t.fc.onRead(uint32(size)); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
return
}
if err := s.fc.onData(uint32(size)); err != nil {
@@ -872,9 +868,6 @@
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
- if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
diff --git a/transport/http2_server.go b/transport/http2_server.go
index 559d28d..130a39d 100644
--- a/transport/http2_server.go
+++ b/transport/http2_server.go
@@ -462,9 +462,6 @@
if s.state == streamDone {
return
}
- if w := t.fc.onRead(n); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
if w := s.fc.onRead(n); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
@@ -477,22 +474,26 @@
t.Close()
return
}
+ // Decouple connection's flow control from application's read.
+ // An update on connection's flow control should not depend on
+ // whether user application has read the data or not. Such a
+ // restriction is already imposed on the stream's flow control,
+ // and therefore the sender will be blocked anyways.
+ // Decoupling the connection flow control will prevent other
+ // active(fast) streams from starving in presence of slow or
+ // inactive streams.
+ if w := t.fc.onRead(uint32(size)); w > 0 {
+ t.controlBuf.put(&windowUpdate{0, w})
+ }
// Select the right stream to dispatch.
s, ok := t.getStream(f)
if !ok {
- if w := t.fc.onRead(uint32(size)); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
return
}
if size > 0 {
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
- // The stream has been closed. Release the corresponding quota.
- if w := t.fc.onRead(uint32(size)); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
return
}
if err := s.fc.onData(uint32(size)); err != nil {
@@ -502,9 +503,6 @@
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
- if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
@@ -1066,11 +1064,6 @@
// called to interrupt the potential blocking on other goroutines.
s.cancel()
s.mu.Lock()
- if q := s.fc.resetPendingData(); q > 0 {
- if w := t.fc.onRead(q); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
- }
if s.state == streamDone {
s.mu.Unlock()
return
diff --git a/transport/transport_test.go b/transport/transport_test.go
index 72bd104..ec45727 100644
--- a/transport/transport_test.go
+++ b/transport/transport_test.go
@@ -481,7 +481,7 @@
}
}
-// TestKeepaliveServer tests that a server closes conneciton with a client that doesn't respond to keepalive pings.
+// TestKeepaliveServer tests that a server closes connection with a client that doesn't respond to keepalive pings.
func TestKeepaliveServer(t *testing.T) {
serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
@@ -1164,6 +1164,186 @@
server.stop()
}
+func TestClientConnDecoupledFromApplicationRead(t *testing.T) {
+ connectOptions := ConnectOptions{
+ InitialWindowSize: defaultWindowSize,
+ InitialConnWindowSize: defaultWindowSize,
+ }
+ server, client := setUpWithOptions(t, 0, &ServerConfig{}, suspended, connectOptions)
+ defer server.stop()
+ defer client.Close()
+
+ waitWhileTrue(t, func() (bool, error) {
+ server.mu.Lock()
+ defer server.mu.Unlock()
+
+ if len(server.conns) == 0 {
+ return true, fmt.Errorf("timed-out while waiting for connection to be created on the server")
+ }
+ return false, nil
+ })
+
+ var st *http2Server
+ server.mu.Lock()
+ for k := range server.conns {
+ st = k.(*http2Server)
+ }
+ server.mu.Unlock()
+ cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
+ if err != nil {
+ t.Fatalf("Client failed to create first stream. Err: %v", err)
+ }
+
+ var sstream1 *Stream
+ // Access stream on the server.
+ waitWhileTrue(t, func() (bool, error) {
+ st.mu.Lock()
+ defer st.mu.Unlock()
+
+ if len(st.activeStreams) != 1 {
+ return true, fmt.Errorf("timed-out while waiting for server to have created a stream")
+ }
+ for _, v := range st.activeStreams {
+ sstream1 = v
+ }
+ return false, nil
+ })
+
+ // Exhaust client's connection window.
+ <-st.writableChan
+ if err := st.framer.writeData(true, sstream1.id, true, make([]byte, defaultWindowSize)); err != nil {
+ st.writableChan <- 0
+ t.Fatalf("Server failed to write data. Err: %v", err)
+ }
+ st.writableChan <- 0
+ // Create another stream on client.
+ cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
+ if err != nil {
+ t.Fatalf("Client failed to create second stream. Err: %v", err)
+ }
+
+ var sstream2 *Stream
+ waitWhileTrue(t, func() (bool, error) {
+ st.mu.Lock()
+ defer st.mu.Unlock()
+
+ if len(st.activeStreams) != 2 {
+ return true, fmt.Errorf("timed-out while waiting for server to have created the second stream")
+ }
+ for _, v := range st.activeStreams {
+ if v.id == cstream2.id {
+ sstream2 = v
+ }
+ }
+ if sstream2 == nil {
+ return true, fmt.Errorf("didn't find stream corresponding to client cstream.id: %v on the server", cstream2.id)
+ }
+ return false, nil
+ })
+
+ // Server should be able to send data on the new stream, even though the client hasn't read anything on the first stream.
+ <-st.writableChan
+ if err := st.framer.writeData(true, sstream2.id, true, make([]byte, defaultWindowSize)); err != nil {
+ st.writableChan <- 0
+ t.Fatalf("Server failed to write data. Err: %v", err)
+ }
+ st.writableChan <- 0
+
+ // Client should be able to read data on second stream.
+ if _, err := cstream2.Read(make([]byte, defaultWindowSize)); err != nil {
+ t.Fatalf("_.Read(_) = _, %v, want _, <nil>", err)
+ }
+
+ // Client should be able to read data on first stream.
+ if _, err := cstream1.Read(make([]byte, defaultWindowSize)); err != nil {
+ t.Fatalf("_.Read(_) = _, %v, want _, <nil>", err)
+ }
+}
+
+func TestServerConnDecoupledFromApplicationRead(t *testing.T) {
+ serverConfig := &ServerConfig{
+ InitialWindowSize: defaultWindowSize,
+ InitialConnWindowSize: defaultWindowSize,
+ }
+ server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+ defer server.stop()
+ defer client.Close()
+ waitWhileTrue(t, func() (bool, error) {
+ server.mu.Lock()
+ defer server.mu.Unlock()
+
+ if len(server.conns) == 0 {
+ return true, fmt.Errorf("timed-out while waiting for connection to be created on the server")
+ }
+ return false, nil
+ })
+ var st *http2Server
+ server.mu.Lock()
+ for k := range server.conns {
+ st = k.(*http2Server)
+ }
+ server.mu.Unlock()
+ cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
+ if err != nil {
+ t.Fatalf("Failed to create 1st stream. Err: %v", err)
+ }
+ // Exhaust server's connection window.
+ if err := client.Write(cstream1, make([]byte, defaultWindowSize), &Options{Last: true}); err != nil {
+ t.Fatalf("Client failed to write data. Err: %v", err)
+ }
+ //Client should be able to create another stream and send data on it.
+ cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
+ if err != nil {
+ t.Fatalf("Failed to create 2nd stream. Err: %v", err)
+ }
+ if err := client.Write(cstream2, make([]byte, defaultWindowSize), &Options{}); err != nil {
+ t.Fatalf("Client failed to write data. Err: %v", err)
+ }
+ // Get the streams on server.
+ waitWhileTrue(t, func() (bool, error) {
+ st.mu.Lock()
+ defer st.mu.Unlock()
+
+ if len(st.activeStreams) != 2 {
+ return true, fmt.Errorf("timed-out while waiting for server to have created the streams")
+ }
+ return false, nil
+ })
+ var sstream1 *Stream
+ st.mu.Lock()
+ for _, v := range st.activeStreams {
+ if v.id == 1 {
+ sstream1 = v
+ }
+ }
+ st.mu.Unlock()
+ // Trying to write more on a max-ed out stream should result in a RST_STREAM from the server.
+ ct := client.(*http2Client)
+ <-ct.writableChan
+ if err := ct.framer.writeData(true, cstream2.id, true, make([]byte, 1)); err != nil {
+ t.Fatalf("Client failed to write. Err: %v", err)
+ }
+ ct.writableChan <- 0
+ code := http2ErrConvTab[http2.ErrCodeFlowControl]
+ waitWhileTrue(t, func() (bool, error) {
+ cstream2.mu.Lock()
+ defer cstream2.mu.Unlock()
+ if cstream2.status.Code() != code {
+ return true, fmt.Errorf("want code = %v, got %v", code, cstream2.status.Code())
+ }
+ return false, nil
+ })
+ // Reading from the stream on server should succeed.
+ if _, err := sstream1.Read(make([]byte, defaultWindowSize)); err != nil {
+ t.Fatalf("_.Read(_) = %v, want <nil>", err)
+ }
+
+ if _, err := sstream1.Read(make([]byte, 1)); err != io.EOF {
+ t.Fatalf("_.Read(_) = %v, want io.EOF", err)
+ }
+
+}
+
func TestServerWithMisbehavedClient(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, suspended)
callHdr := &CallHdr{
@@ -1224,7 +1404,7 @@
}
ss.fc.mu.Unlock()
}
- if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != http2MaxFrameLen || sc.fc.pendingUpdate != 0 {
+ if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != http2MaxFrameLen {
t.Fatalf("Server mistakenly updates inbound flow control params: got %d, %d, %d, %d; want %d, %d, %d, %d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, http2MaxFrameLen, 0, http2MaxFrameLen, 0)
}
// Keep sending until the server inbound window is drained for that stream.
@@ -1245,24 +1425,10 @@
t.Fatalf("%v got status %v; want Code=%v", s, s.status, code)
}
- if ss.fc.pendingData != 0 || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate <= initialWindowSize {
- t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, >%d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, initialWindowSize)
+ if sc.fc.pendingData != 0 || sc.fc.pendingUpdate <= initialWindowSize {
+ t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d; want 0, >%d", sc.fc.pendingData, sc.fc.pendingUpdate, initialWindowSize)
}
ct.CloseStream(s, nil)
- // Test server behavior for violation of connection flow control window size restriction.
- //
- // Keep creating new streams until the connection window is drained on the server and
- // the server tears down the connection.
- for {
- s, err := ct.NewStream(context.Background(), callHdr)
- if err != nil {
- // The server tears down the connection.
- break
- }
- <-cc.writableChan
- cc.framer.writeData(true, s.id, true, make([]byte, http2MaxFrameLen))
- cc.writableChan <- 0
- }
ct.Close()
server.stop()
}
@@ -1293,7 +1459,7 @@
break
}
}
- if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData <= initialWindowSize || conn.fc.pendingUpdate != 0 {
+ if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize {
t.Fatalf("Client mistakenly updates inbound flow control params: got %d, %d, %d, %d; want >%d, %d, >%d, %d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize, 0, initialWindowSize, 0)
}
@@ -1305,25 +1471,9 @@
}
conn.CloseStream(s, err)
- if s.fc.pendingData != 0 || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize {
- t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, >%d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize)
+ if conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize {
+ t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d; want 0, >%d", conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize)
}
- // Test the logic for the violation of the connection flow control window size restriction.
- //
- // Generate enough streams to drain the connection window. Make the server flood the traffic
- // to violate flow control window size of the connection.
- callHdr.Method = "foo.Connection"
- for i := 0; i < int(initialConnWindowSize/initialWindowSize+10); i++ {
- s, err := ct.NewStream(context.Background(), callHdr)
- if err != nil {
- break
- }
- if err := ct.Write(s, d, &Options{Last: true, Delay: false}); err != nil {
- break
- }
- }
- // http2Client.errChan is closed due to connection flow control window size violation.
- <-conn.Error()
ct.Close()
server.stop()
}