Decouple transport flow control from application read. (#1265)

* Decouple transport flow control from application read.

* Post-review update.

* Added comment in http2_server as well.

* Added another test.

* Fixed typos in comments.
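
For reviewers, a minimal, self-contained Go sketch of the behavior after this change. It is illustrative only: the names `inFlow`, `onData`, `onRead`, `pendingData`, and `pendingUpdate` mirror transport/control.go (including the send-an-update-once-a-quarter-of-the-window-is-pending heuristic), but the demo `main`, the 64KB windows, and the printed messages are invented for this example; the real transport queues window updates on `controlBuf` rather than printing them.

```go
package main

import (
	"fmt"
	"sync"
)

// inFlow models receive-side flow-control accounting (simplified sketch).
type inFlow struct {
	mu            sync.Mutex
	limit         uint32 // window advertised to the sender
	pendingData   uint32 // bytes received but not yet read by the application
	pendingUpdate uint32 // bytes read but not yet announced via WINDOW_UPDATE
}

// onData accounts for n bytes arriving from the wire.
func (f *inFlow) onData(n uint32) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.pendingData+f.pendingUpdate+n > f.limit {
		return fmt.Errorf("received %d bytes, exceeding the window size %d", n, f.limit)
	}
	f.pendingData += n
	return nil
}

// onRead accounts for n bytes being consumed and returns the increment to
// send in a WINDOW_UPDATE frame once enough of the window has been freed.
func (f *inFlow) onRead(n uint32) uint32 {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.pendingData == 0 {
		return 0
	}
	f.pendingData -= n
	f.pendingUpdate += n
	if f.pendingUpdate >= f.limit/4 {
		w := f.pendingUpdate
		f.pendingUpdate = 0
		return w
	}
	return 0
}

func main() {
	conn := &inFlow{limit: 64 * 1024}   // connection-level window (stream 0)
	stream := &inFlow{limit: 64 * 1024} // per-stream window

	const frame = 16 * 1024 // size of an incoming DATA frame

	// After this change: the transport credits the connection window back
	// immediately on receipt, so a slow reader on one stream cannot pin
	// connection-level quota and starve the other streams.
	_ = conn.onData(frame)
	if w := conn.onRead(frame); w > 0 {
		fmt.Printf("WINDOW_UPDATE{stream 0, increment %d} sent on receipt\n", w)
	}

	// Unchanged: the stream window is credited back only when the
	// application actually reads, so the sender still blocks per stream.
	_ = stream.onData(frame)
	// ... later, from the application's Read:
	if w := stream.onRead(frame); w > 0 {
		fmt.Printf("WINDOW_UPDATE{stream N, increment %d} sent after Read\n", w)
	}
}
```
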
diff --git a/transport/control.go b/transport/control.go
index 68dfdd5..3db471a 100644
--- a/transport/control.go
+++ b/transport/control.go
@@ -240,11 +240,3 @@
 	}
 	return 0
 }
-
-func (f *inFlow) resetPendingData() uint32 {
-	f.mu.Lock()
-	defer f.mu.Unlock()
-	n := f.pendingData
-	f.pendingData = 0
-	return n
-}
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 713f762..d27199b 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -595,11 +595,6 @@
 	s.mu.Lock()
 	rstStream = s.rstStream
 	rstError = s.rstError
-	if q := s.fc.resetPendingData(); q > 0 {
-		if n := t.fc.onRead(q); n > 0 {
-			t.controlBuf.put(&windowUpdate{0, n})
-		}
-	}
 	if s.state == streamDone {
 		s.mu.Unlock()
 		return
@@ -831,9 +826,6 @@
 	if s.state == streamDone {
 		return
 	}
-	if w := t.fc.onRead(n); w > 0 {
-		t.controlBuf.put(&windowUpdate{0, w})
-	}
 	if w := s.fc.onRead(n); w > 0 {
 		t.controlBuf.put(&windowUpdate{s.id, w})
 	}
@@ -845,22 +837,26 @@
 		t.notifyError(connectionErrorf(true, err, "%v", err))
 		return
 	}
+	// Decouple the connection's flow control from the application's read.
+	// An update on the connection's flow control should not depend on
+	// whether the user application has read the data or not. Such a
+	// restriction is already imposed on the stream's flow control, and
+	// therefore the sender will be blocked on that stream anyway.
+	// Decoupling the connection's flow control prevents other active
+	// (fast) streams from being starved when some streams are slow or
+	// inactive.
+	if w := t.fc.onRead(uint32(size)); w > 0 {
+		t.controlBuf.put(&windowUpdate{0, w})
+	}
 	// Select the right stream to dispatch.
 	s, ok := t.getStream(f)
 	if !ok {
-		if w := t.fc.onRead(uint32(size)); w > 0 {
-			t.controlBuf.put(&windowUpdate{0, w})
-		}
 		return
 	}
 	if size > 0 {
 		s.mu.Lock()
 		if s.state == streamDone {
 			s.mu.Unlock()
-			// The stream has been closed. Release the corresponding quota.
-			if w := t.fc.onRead(uint32(size)); w > 0 {
-				t.controlBuf.put(&windowUpdate{0, w})
-			}
 			return
 		}
 		if err := s.fc.onData(uint32(size)); err != nil {
@@ -872,9 +868,6 @@
 			return
 		}
 		if f.Header().Flags.Has(http2.FlagDataPadded) {
-			if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
-				t.controlBuf.put(&windowUpdate{0, w})
-			}
 			if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
 				t.controlBuf.put(&windowUpdate{s.id, w})
 			}
diff --git a/transport/http2_server.go b/transport/http2_server.go
index 559d28d..130a39d 100644
--- a/transport/http2_server.go
+++ b/transport/http2_server.go
@@ -462,9 +462,6 @@
 	if s.state == streamDone {
 		return
 	}
-	if w := t.fc.onRead(n); w > 0 {
-		t.controlBuf.put(&windowUpdate{0, w})
-	}
 	if w := s.fc.onRead(n); w > 0 {
 		t.controlBuf.put(&windowUpdate{s.id, w})
 	}
@@ -477,22 +474,26 @@
 		t.Close()
 		return
 	}
+	// Decouple the connection's flow control from the application's read.
+	// An update on the connection's flow control should not depend on
+	// whether the user application has read the data or not. Such a
+	// restriction is already imposed on the stream's flow control, and
+	// therefore the sender will be blocked on that stream anyway.
+	// Decoupling the connection's flow control prevents other active
+	// (fast) streams from being starved when some streams are slow or
+	// inactive.
+	if w := t.fc.onRead(uint32(size)); w > 0 {
+		t.controlBuf.put(&windowUpdate{0, w})
+	}
 	// Select the right stream to dispatch.
 	s, ok := t.getStream(f)
 	if !ok {
-		if w := t.fc.onRead(uint32(size)); w > 0 {
-			t.controlBuf.put(&windowUpdate{0, w})
-		}
 		return
 	}
 	if size > 0 {
 		s.mu.Lock()
 		if s.state == streamDone {
 			s.mu.Unlock()
-			// The stream has been closed. Release the corresponding quota.
-			if w := t.fc.onRead(uint32(size)); w > 0 {
-				t.controlBuf.put(&windowUpdate{0, w})
-			}
 			return
 		}
 		if err := s.fc.onData(uint32(size)); err != nil {
@@ -502,9 +503,6 @@
 			return
 		}
 		if f.Header().Flags.Has(http2.FlagDataPadded) {
-			if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
-				t.controlBuf.put(&windowUpdate{0, w})
-			}
 			if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
 				t.controlBuf.put(&windowUpdate{s.id, w})
 			}
@@ -1066,11 +1064,6 @@
 	// called to interrupt the potential blocking on other goroutines.
 	s.cancel()
 	s.mu.Lock()
-	if q := s.fc.resetPendingData(); q > 0 {
-		if w := t.fc.onRead(q); w > 0 {
-			t.controlBuf.put(&windowUpdate{0, w})
-		}
-	}
 	if s.state == streamDone {
 		s.mu.Unlock()
 		return
diff --git a/transport/transport_test.go b/transport/transport_test.go
index 72bd104..ec45727 100644
--- a/transport/transport_test.go
+++ b/transport/transport_test.go
@@ -481,7 +481,7 @@
 	}
 }
 
-// TestKeepaliveServer tests that a server closes conneciton with a client that doesn't respond to keepalive pings.
+// TestKeepaliveServer tests that a server closes connection with a client that doesn't respond to keepalive pings.
 func TestKeepaliveServer(t *testing.T) {
 	serverConfig := &ServerConfig{
 		KeepaliveParams: keepalive.ServerParameters{
@@ -1164,6 +1164,185 @@
 	server.stop()
 }
 
+func TestClientConnDecoupledFromApplicationRead(t *testing.T) {
+	connectOptions := ConnectOptions{
+		InitialWindowSize:     defaultWindowSize,
+		InitialConnWindowSize: defaultWindowSize,
+	}
+	server, client := setUpWithOptions(t, 0, &ServerConfig{}, suspended, connectOptions)
+	defer server.stop()
+	defer client.Close()
+
+	waitWhileTrue(t, func() (bool, error) {
+		server.mu.Lock()
+		defer server.mu.Unlock()
+
+		if len(server.conns) == 0 {
+			return true, fmt.Errorf("timed-out while waiting for connection to be created on the server")
+		}
+		return false, nil
+	})
+
+	var st *http2Server
+	server.mu.Lock()
+	for k := range server.conns {
+		st = k.(*http2Server)
+	}
+	server.mu.Unlock()
+	cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
+	if err != nil {
+		t.Fatalf("Client failed to create first stream. Err: %v", err)
+	}
+
+	var sstream1 *Stream
+	// Access stream on the server.
+	waitWhileTrue(t, func() (bool, error) {
+		st.mu.Lock()
+		defer st.mu.Unlock()
+
+		if len(st.activeStreams) != 1 {
+			return true, fmt.Errorf("timed-out while waiting for server to have created a stream")
+		}
+		for _, v := range st.activeStreams {
+			sstream1 = v
+		}
+		return false, nil
+	})
+
+	// Exhaust client's connection window.
+	<-st.writableChan
+	if err := st.framer.writeData(true, sstream1.id, true, make([]byte, defaultWindowSize)); err != nil {
+		st.writableChan <- 0
+		t.Fatalf("Server failed to write data. Err: %v", err)
+	}
+	st.writableChan <- 0
+	// Create another stream on client.
+	cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
+	if err != nil {
+		t.Fatalf("Client failed to create second stream. Err: %v", err)
+	}
+
+	var sstream2 *Stream
+	waitWhileTrue(t, func() (bool, error) {
+		st.mu.Lock()
+		defer st.mu.Unlock()
+
+		if len(st.activeStreams) != 2 {
+			return true, fmt.Errorf("timed-out while waiting for server to have created the second stream")
+		}
+		for _, v := range st.activeStreams {
+			if v.id == cstream2.id {
+				sstream2 = v
+			}
+		}
+		if sstream2 == nil {
+			return true, fmt.Errorf("didn't find stream corresponding to client cstream.id: %v on the server", cstream2.id)
+		}
+		return false, nil
+	})
+
+	// Server should be able to send data on the new stream, even though the client hasn't read anything on the first stream.
+	<-st.writableChan
+	if err := st.framer.writeData(true, sstream2.id, true, make([]byte, defaultWindowSize)); err != nil {
+		st.writableChan <- 0
+		t.Fatalf("Server failed to write data. Err: %v", err)
+	}
+	st.writableChan <- 0
+
+	// Client should be able to read data on second stream.
+	if _, err := cstream2.Read(make([]byte, defaultWindowSize)); err != nil {
+		t.Fatalf("_.Read(_) = _, %v, want _, <nil>", err)
+	}
+
+	// Client should be able to read data on first stream.
+	if _, err := cstream1.Read(make([]byte, defaultWindowSize)); err != nil {
+		t.Fatalf("_.Read(_) = _, %v, want _, <nil>", err)
+	}
+}
+
+func TestServerConnDecoupledFromApplicationRead(t *testing.T) {
+	serverConfig := &ServerConfig{
+		InitialWindowSize:     defaultWindowSize,
+		InitialConnWindowSize: defaultWindowSize,
+	}
+	server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+	defer server.stop()
+	defer client.Close()
+	waitWhileTrue(t, func() (bool, error) {
+		server.mu.Lock()
+		defer server.mu.Unlock()
+
+		if len(server.conns) == 0 {
+			return true, fmt.Errorf("timed-out while waiting for connection to be created on the server")
+		}
+		return false, nil
+	})
+	var st *http2Server
+	server.mu.Lock()
+	for k := range server.conns {
+		st = k.(*http2Server)
+	}
+	server.mu.Unlock()
+	cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
+	if err != nil {
+		t.Fatalf("Failed to create 1st stream. Err: %v", err)
+	}
+	// Exhaust server's connection window.
+	if err := client.Write(cstream1, make([]byte, defaultWindowSize), &Options{Last: true}); err != nil {
+		t.Fatalf("Client failed to write data. Err: %v", err)
+	}
+	// Client should be able to create another stream and send data on it.
+	cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
+	if err != nil {
+		t.Fatalf("Failed to create 2nd stream. Err: %v", err)
+	}
+	if err := client.Write(cstream2, make([]byte, defaultWindowSize), &Options{}); err != nil {
+		t.Fatalf("Client failed to write data. Err: %v", err)
+	}
+	// Get the streams on server.
+	waitWhileTrue(t, func() (bool, error) {
+		st.mu.Lock()
+		defer st.mu.Unlock()
+
+		if len(st.activeStreams) != 2 {
+			return true, fmt.Errorf("timed-out while waiting for server to have created the streams")
+		}
+		return false, nil
+	})
+	var sstream1 *Stream
+	st.mu.Lock()
+	for _, v := range st.activeStreams {
+		if v.id == 1 {
+			sstream1 = v
+		}
+	}
+	st.mu.Unlock()
+	// Writing more on a maxed-out stream should result in a RST_STREAM from the server.
+	ct := client.(*http2Client)
+	<-ct.writableChan
+	if err := ct.framer.writeData(true, cstream2.id, true, make([]byte, 1)); err != nil {
+		t.Fatalf("Client failed to write. Err: %v", err)
+	}
+	ct.writableChan <- 0
+	code := http2ErrConvTab[http2.ErrCodeFlowControl]
+	waitWhileTrue(t, func() (bool, error) {
+		cstream2.mu.Lock()
+		defer cstream2.mu.Unlock()
+		if cstream2.status.Code() != code {
+			return true, fmt.Errorf("want code = %v, got %v", code, cstream2.status.Code())
+		}
+		return false, nil
+	})
+	// Reading from the stream on server should succeed.
+	if _, err := sstream1.Read(make([]byte, defaultWindowSize)); err != nil {
+		t.Fatalf("_.Read(_) = %v, want <nil>", err)
+	}
+
+	if _, err := sstream1.Read(make([]byte, 1)); err != io.EOF {
+		t.Fatalf("_.Read(_) = %v, want io.EOF", err)
+	}
+}
+
 func TestServerWithMisbehavedClient(t *testing.T) {
 	server, ct := setUp(t, 0, math.MaxUint32, suspended)
 	callHdr := &CallHdr{
@@ -1224,7 +1404,7 @@
 		}
 		ss.fc.mu.Unlock()
 	}
-	if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != http2MaxFrameLen || sc.fc.pendingUpdate != 0 {
-		t.Fatalf("Server mistakenly updates inbound flow control params: got %d, %d, %d, %d; want %d, %d, %d, %d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, http2MaxFrameLen, 0, http2MaxFrameLen, 0)
+	if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != http2MaxFrameLen {
+		t.Fatalf("Server mistakenly updates inbound flow control params: got %d, %d, %d, %d; want %d, %d, %d, %d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, http2MaxFrameLen, 0, 0, http2MaxFrameLen)
 	}
 	// Keep sending until the server inbound window is drained for that stream.
@@ -1245,24 +1425,10 @@
 		t.Fatalf("%v got status %v; want Code=%v", s, s.status, code)
 	}
 
-	if ss.fc.pendingData != 0 || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate <= initialWindowSize {
-		t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, >%d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, initialWindowSize)
+	if sc.fc.pendingData != 0 || sc.fc.pendingUpdate <= initialWindowSize {
+		t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d; want 0, >%d", sc.fc.pendingData, sc.fc.pendingUpdate, initialWindowSize)
 	}
 	ct.CloseStream(s, nil)
-	// Test server behavior for violation of connection flow control window size restriction.
-	//
-	// Keep creating new streams until the connection window is drained on the server and
-	// the server tears down the connection.
-	for {
-		s, err := ct.NewStream(context.Background(), callHdr)
-		if err != nil {
-			// The server tears down the connection.
-			break
-		}
-		<-cc.writableChan
-		cc.framer.writeData(true, s.id, true, make([]byte, http2MaxFrameLen))
-		cc.writableChan <- 0
-	}
 	ct.Close()
 	server.stop()
 }
@@ -1293,7 +1459,7 @@
 			break
 		}
 	}
-	if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData <= initialWindowSize || conn.fc.pendingUpdate != 0 {
-		t.Fatalf("Client mistakenly updates inbound flow control params: got %d, %d, %d, %d; want >%d, %d, >%d, %d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize, 0, initialWindowSize, 0)
+	if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize {
+		t.Fatalf("Client mistakenly updates inbound flow control params: got %d, %d, %d, %d; want >%d, %d, %d, >%d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize, 0, 0, initialWindowSize)
 	}
 
@@ -1305,25 +1471,9 @@
 	}
 
 	conn.CloseStream(s, err)
-	if s.fc.pendingData != 0 || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize {
-		t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, >%d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize)
+	if conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize {
+		t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d; want 0, >%d", conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize)
 	}
-	// Test the logic for the violation of the connection flow control window size restriction.
-	//
-	// Generate enough streams to drain the connection window. Make the server flood the traffic
-	// to violate flow control window size of the connection.
-	callHdr.Method = "foo.Connection"
-	for i := 0; i < int(initialConnWindowSize/initialWindowSize+10); i++ {
-		s, err := ct.NewStream(context.Background(), callHdr)
-		if err != nil {
-			break
-		}
-		if err := ct.Write(s, d, &Options{Last: true, Delay: false}); err != nil {
-			break
-		}
-	}
-	// http2Client.errChan is closed due to connection flow control window size violation.
-	<-conn.Error()
 	ct.Close()
 	server.stop()
 }