Merge pull request #903 from improbable-io/blocking-graceful-shutdown-fix
Make concurrent Server.GracefulStop calls all behave equivalently.
diff --git a/server.go b/server.go
index 2f73cca..e0bb187 100644
--- a/server.go
+++ b/server.go
@@ -527,7 +527,7 @@
defer s.mu.Unlock()
if s.conns != nil {
delete(s.conns, c)
- s.cv.Signal()
+ s.cv.Broadcast()
}
}
@@ -828,7 +828,7 @@
st := s.conns
s.conns = nil
// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
- s.cv.Signal()
+ s.cv.Broadcast()
s.mu.Unlock()
for lis := range listeners {
@@ -852,17 +852,19 @@
func (s *Server) GracefulStop() {
s.mu.Lock()
defer s.mu.Unlock()
- if s.drain == true || s.conns == nil {
+ if s.conns == nil {
return
}
- s.drain = true
for lis := range s.lis {
lis.Close()
}
s.lis = nil
s.cancel()
- for c := range s.conns {
- c.(transport.ServerTransport).Drain()
+ if !s.drain {
+ for c := range s.conns {
+ c.(transport.ServerTransport).Drain()
+ }
+ s.drain = true
}
for len(s.conns) != 0 {
s.cv.Wait()
diff --git a/test/end2end_test.go b/test/end2end_test.go
index c4178ef..7307129 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -765,6 +765,93 @@
awaitNewConnLogOutput()
}
+func TestServerMultipleGoAwayPendingRPC(t *testing.T) {
+ defer leakCheck(t)()
+ for _, e := range listTestEnv() {
+ if e.name == "handler-tls" {
+ continue
+ }
+ testServerMultipleGoAwayPendingRPC(t, e)
+ }
+}
+
+func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
+ te := newTest(t, e)
+ te.userAgent = testAppUA
+ te.declareLogNoise(
+ "transport: http2Client.notifyError got notified that the client transport was broken EOF",
+ "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
+ "grpc: addrConn.resetTransport failed to create client transport: connection error",
+ )
+ te.startServer(&testServer{security: e.security})
+ defer te.tearDown()
+
+ cc := te.clientConn()
+ tc := testpb.NewTestServiceClient(cc)
+ ctx, cancel := context.WithCancel(context.Background())
+ stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
+ if err != nil {
+ t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
+ }
+ // Finish an RPC to make sure the connection is good.
+ if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+ t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
+ }
+ ch1 := make(chan struct{})
+ go func() {
+ te.srv.GracefulStop()
+ close(ch1)
+ }()
+ ch2 := make(chan struct{})
+ go func() {
+ te.srv.GracefulStop()
+ close(ch2)
+ }()
+ // Loop until the server side GoAway signal is propagated to the client.
+ for {
+ ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
+ if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil {
+ continue
+ }
+ break
+ }
+ select {
+ case <-ch1:
+ t.Fatal("GracefulStop() terminated early")
+ case <-ch2:
+ t.Fatal("GracefulStop() terminated early")
+ default:
+ }
+ respParam := []*testpb.ResponseParameters{
+ {
+ Size: proto.Int32(1),
+ },
+ }
+ payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
+ if err != nil {
+ t.Fatal(err)
+ }
+ req := &testpb.StreamingOutputCallRequest{
+ ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
+ ResponseParameters: respParam,
+ Payload: payload,
+ }
+ // The existing RPC should be still good to proceed.
+ if err := stream.Send(req); err != nil {
+ t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
+ }
+ if _, err := stream.Recv(); err != nil {
+ t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
+ }
+ if err := stream.CloseSend(); err != nil {
+ t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
+ }
+ <-ch1
+ <-ch2
+ cancel()
+ awaitNewConnLogOutput()
+}
+
func TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {