transport: fix error handling on Stream deletion (#1275)

This patch writes client-side error before closing the active stream
to fix blocking `RecvMsg` issue on `grpc.ClientStream` [1].

Previous gRPC client stream just exits on `ClientTransport.Error` [2].
And latest gRPC added another select case on client connection context
cancel [3]. Now when client stream closes from client connection context
cancel, it calls `CloseStream` with `ErrClientConnClosing` error. And then
the stream gets deleted from `*http2Client.activeStreams`, without processing
the error [4]. Then in-flight `RecvMsg` call on this client will block on
`*parser.Reader.recvMsg` [5].

In short,

1. `ClientConn.Close`.
2. in-flight streams will receive case `<-cc.ctx.Done()`
   https://github.com/grpc/grpc-go/blob/master/stream.go#L253-L255.
3. `cs.closeTransportStream(ErrClientConnClosing)` calls `cs.t.CloseStream(cs.s, err)`.
4. `CloseStream(cs.s, err)` calls `delete(t.activeStreams, s.id)`
   without handling the error.
5. in-flight streams will never receive error, left hanging.

I can reproduce in etcd tests with in-flight `recvMsg` calls to `Observe` RPC.

---
[1] https://github.com/coreos/etcd/pull/7896#issuecomment-305241742
[2] https://github.com/grpc/grpc-go/blob/v1.2.x/stream.go#L235-L238
[3] https://github.com/grpc/grpc-go/pull/1136
[4] https://github.com/grpc/grpc-go/blob/master/transport/http2_client.go#L569
[5] https://github.com/grpc/grpc-go/blob/master/rpc_util.go#L280

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 92b5180..52c9610 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -566,6 +566,10 @@
 		t.mu.Unlock()
 		return
 	}
+	if err != nil {
+		// notify in-flight streams, before the deletion
+		s.write(recvMsg{err: err})
+	}
 	delete(t.activeStreams, s.id)
 	if t.state == draining && len(t.activeStreams) == 0 {
 		// The transport is draining and s is the last live stream on t.
diff --git a/transport/transport_test.go b/transport/transport_test.go
index ec45727..d1dce6f 100644
--- a/transport/transport_test.go
+++ b/transport/transport_test.go
@@ -397,6 +397,43 @@
 	return tr
 }
 
+// TestInflightStreamClosing ensures that closing in-flight stream
+// sends StreamError to concurrent stream reader.
+func TestInflightStreamClosing(t *testing.T) {
+	serverConfig := &ServerConfig{}
+	server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+	defer server.stop()
+	defer client.Close()
+
+	stream, err := client.NewStream(context.Background(), &CallHdr{})
+	if err != nil {
+		t.Fatalf("Client failed to create RPC request: %v", err)
+	}
+
+	donec := make(chan struct{})
+	serr := StreamError{Desc: "client connection is closing"}
+	go func() {
+		defer close(donec)
+		if _, err := stream.Read(make([]byte, defaultWindowSize)); err != serr {
+			t.Errorf("unexpected Stream error %v, expected %v", err, serr)
+		}
+	}()
+
+	// should unblock concurrent stream.Read
+	client.CloseStream(stream, serr)
+
+	// wait for stream.Read error
+	timeout := time.NewTimer(5 * time.Second)
+	select {
+	case <-donec:
+		if !timeout.Stop() {
+			<-timeout.C
+		}
+	case <-timeout.C:
+		t.Fatalf("Test timed out, expected a StreamError.")
+	}
+}
+
 // TestMaxConnectionIdle tests that a server will send GoAway to a idle client.
 // An idle client is one who doesn't make any RPC calls for a duration of
 // MaxConnectionIdle time.