transport: fix error handling on Stream deletion (#1275)
This patch writes client-side error before closing the active stream
to fix blocking `RecvMsg` issue on `grpc.ClientStream` [1].
Previous gRPC client stream just exits on `ClientTransport.Error` [2].
And latest gRPC added another select case on client connection context
cancel [3]. Now when client stream closes from client connection context
cancel, it calls `CloseStream` with `ErrClientConnClosing` error. And then
the stream gets deleted from `*http2Client.activeStreams`, without processing
the error [4]. Then in-flight `RecvMsg` call on this client will block on
`*parser.Reader.recvMsg` [5].
In short,
1. `ClientConn.Close`.
2. in-flight streams will receive case `<-cc.ctx.Done()`
https://github.com/grpc/grpc-go/blob/master/stream.go#L253-L255.
3. `cs.closeTransportStream(ErrClientConnClosing)` calls `cs.t.CloseStream(cs.s, err)`.
4. `CloseStream(cs.s, err)` calls `delete(t.activeStreams, s.id)`
without handling the error.
5. in-flight streams will never receive error, left hanging.
I can reproduce in etcd tests with in-flight `recvMsg` calls to `Observe` RPC.
---
[1] https://github.com/coreos/etcd/pull/7896#issuecomment-305241742
[2] https://github.com/grpc/grpc-go/blob/v1.2.x/stream.go#L235-L238
[3] https://github.com/grpc/grpc-go/pull/1136
[4] https://github.com/grpc/grpc-go/blob/master/transport/http2_client.go#L569
[5] https://github.com/grpc/grpc-go/blob/master/rpc_util.go#L280
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 92b5180..52c9610 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -566,6 +566,10 @@
t.mu.Unlock()
return
}
+ if err != nil {
+ // notify in-flight streams, before the deletion
+ s.write(recvMsg{err: err})
+ }
delete(t.activeStreams, s.id)
if t.state == draining && len(t.activeStreams) == 0 {
// The transport is draining and s is the last live stream on t.
diff --git a/transport/transport_test.go b/transport/transport_test.go
index ec45727..d1dce6f 100644
--- a/transport/transport_test.go
+++ b/transport/transport_test.go
@@ -397,6 +397,43 @@
return tr
}
+// TestInflightStreamClosing ensures that closing in-flight stream
+// sends StreamError to concurrent stream reader.
+func TestInflightStreamClosing(t *testing.T) {
+ serverConfig := &ServerConfig{}
+ server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+ defer server.stop()
+ defer client.Close()
+
+ stream, err := client.NewStream(context.Background(), &CallHdr{})
+ if err != nil {
+ t.Fatalf("Client failed to create RPC request: %v", err)
+ }
+
+ donec := make(chan struct{})
+ serr := StreamError{Desc: "client connection is closing"}
+ go func() {
+ defer close(donec)
+ if _, err := stream.Read(make([]byte, defaultWindowSize)); err != serr {
+ t.Errorf("unexpected Stream error %v, expected %v", err, serr)
+ }
+ }()
+
+ // should unblock concurrent stream.Read
+ client.CloseStream(stream, serr)
+
+ // wait for stream.Read error
+ timeout := time.NewTimer(5 * time.Second)
+ select {
+ case <-donec:
+ if !timeout.Stop() {
+ <-timeout.C
+ }
+ case <-timeout.C:
+ t.Fatalf("Test timed out, expected a StreamError.")
+ }
+}
+
// TestMaxConnectionIdle tests that a server will send GoAway to a idle client.
// An idle client is one who doesn't make any RPC calls for a duration of
// MaxConnectionIdle time.