Merge pull request #903 from improbable-io/blocking-graceful-shutdown-fix

Make concurrent Server.GracefulStop calls all behave equivalently.
diff --git a/server.go b/server.go
index 2f73cca..e0bb187 100644
--- a/server.go
+++ b/server.go
@@ -527,7 +527,7 @@
 	defer s.mu.Unlock()
 	if s.conns != nil {
 		delete(s.conns, c)
-		s.cv.Signal()
+		s.cv.Broadcast()
 	}
 }
 
@@ -828,7 +828,7 @@
 	st := s.conns
 	s.conns = nil
 	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
-	s.cv.Signal()
+	s.cv.Broadcast()
 	s.mu.Unlock()
 
 	for lis := range listeners {
@@ -852,17 +852,19 @@
 func (s *Server) GracefulStop() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	if s.drain == true || s.conns == nil {
+	if s.conns == nil {
 		return
 	}
-	s.drain = true
 	for lis := range s.lis {
 		lis.Close()
 	}
 	s.lis = nil
 	s.cancel()
-	for c := range s.conns {
-		c.(transport.ServerTransport).Drain()
+	if !s.drain {
+		for c := range s.conns {
+			c.(transport.ServerTransport).Drain()
+		}
+		s.drain = true
 	}
 	for len(s.conns) != 0 {
 		s.cv.Wait()
diff --git a/test/end2end_test.go b/test/end2end_test.go
index c4178ef..7307129 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -765,6 +765,93 @@
 	awaitNewConnLogOutput()
 }
 
+// TestServerMultipleGoAwayPendingRPC covers every listed env except "handler-tls".
+func TestServerMultipleGoAwayPendingRPC(t *testing.T) {
+	defer leakCheck(t)()
+	for _, tenv := range listTestEnv() {
+		if tenv.name != "handler-tls" {
+			testServerMultipleGoAwayPendingRPC(t, tenv)
+		}
+	}
+}
+
+// testServerMultipleGoAwayPendingRPC opens a streaming RPC, starts two concurrent
+// GracefulStop calls, checks that neither has returned while the stream is open,
+// then finishes the stream and waits for both GracefulStop calls to return.
+func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
+	te := newTest(t, e)
+	te.userAgent = testAppUA
+	te.declareLogNoise(
+		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
+		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
+		"grpc: addrConn.resetTransport failed to create client transport: connection error",
+	)
+	te.startServer(&testServer{security: e.security})
+	defer te.tearDown()
+
+	cc := te.clientConn()
+	tc := testpb.NewTestServiceClient(cc)
+	ctx, cancel := context.WithCancel(context.Background())
+	stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
+	if err != nil {
+		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
+	}
+	// Finish an RPC to make sure the connection is good.
+	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
+	}
+	ch1 := make(chan struct{})
+	go func() {
+		te.srv.GracefulStop()
+		close(ch1)
+	}()
+	ch2 := make(chan struct{})
+	go func() {
+		te.srv.GracefulStop()
+		close(ch2)
+	}()
+	// Loop until the server side GoAway signal is propagated to the client.
+	for {
+		callCtx, cancelCall := context.WithTimeout(context.Background(), 10*time.Millisecond)
+		_, callErr := tc.EmptyCall(callCtx, &testpb.Empty{}, grpc.FailFast(false))
+		cancelCall() // Release the per-call context now instead of leaking it until the deadline.
+		if callErr != nil {
+			break
+		}
+	}
+	select {
+	case <-ch1:
+		t.Fatal("GracefulStop() terminated early")
+	case <-ch2:
+		t.Fatal("GracefulStop() terminated early")
+	default:
+	}
+	respParam := []*testpb.ResponseParameters{{Size: proto.Int32(1)}}
+	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
+	if err != nil {
+		t.Fatal(err)
+	}
+	req := &testpb.StreamingOutputCallRequest{
+		ResponseType:       testpb.PayloadType_COMPRESSABLE.Enum(),
+		ResponseParameters: respParam,
+		Payload:            payload,
+	}
+	// The existing RPC should be still good to proceed.
+	if err := stream.Send(req); err != nil {
+		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
+	}
+	if _, err := stream.Recv(); err != nil {
+		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
+	}
+	if err := stream.CloseSend(); err != nil {
+		t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
+	}
+	<-ch1
+	<-ch2
+	cancel()
+	awaitNewConnLogOutput()
+}
+
 func TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) {
 	defer leakCheck(t)()
 	for _, e := range listTestEnv() {