/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package test
import (
"context"
"io"
"net"
"strings"
"testing"
"time"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing"
)
// TestGracefulClientOnGoAway attempts to ensure that when the server sends a
// GOAWAY (in this test, by configuring max connection age on the server), a
// client will never see an error. This requires that the client is apprised
// of the GOAWAY and updates its state accordingly before the transport stops
// accepting new streams. If a subconn is chosen by a picker and receives the
// GOAWAY before creating the stream, an error will occur, but upon transparent
// retry, the clientconn will ensure a ready subconn is chosen.
func (s) TestGracefulClientOnGoAway(t *testing.T) {
const maxConnAge = 100 * time.Millisecond
const testTime = maxConnAge * 10
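// testTime spans ten connection lifetimes, so the connection is recycled by
// GOAWAY several times while RPCs are issued continuously below.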
ss := &stubserver.StubServer{
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
}
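// MaxConnectionAge causes the server to send a GOAWAY and gracefully drain
// each connection once it reaches maxConnAge.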
s := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: maxConnAge}))
defer s.Stop()
testpb.RegisterTestServiceServer(s, ss)
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to create listener: %v", err)
}
go s.Serve(lis)
cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("Failed to dial server: %v", err)
}
defer cc.Close()
c := testpb.NewTestServiceClient(cc)
endTime := time.Now().Add(testTime)
for time.Now().Before(endTime) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall(_, _) = _, %v; want _, <nil>", err)
}
cancel()
}
}
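// TestDetailedGoAwayErrorOnGracefulClosePropagatesToRPCError verifies that
// when the server gracefully closes the connection via max connection age,
// the GOAWAY details (code NO_ERROR) show up in the failed RPC's status.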
func (s) TestDetailedGoAwayErrorOnGracefulClosePropagatesToRPCError(t *testing.T) {
rpcDoneOnClient := make(chan struct{})
ss := &stubserver.StubServer{
FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
<-rpcDoneOnClient
return status.Error(codes.Internal, "arbitrary status")
},
}
sopts := []grpc.ServerOption{
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionAge: time.Millisecond * 100,
MaxConnectionAgeGrace: time.Nanosecond, // ~instantaneously, but non-zero to avoid default
}),
}
if err := ss.Start(sopts); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := ss.Client.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", ss.Client, err)
}
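// A graceful drain sends a GOAWAY with code NO_ERROR; the client transport
// is expected to include those details in the error of the in-flight RPC.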
const expectedErrorMessageSubstring = "received prior goaway: code: NO_ERROR"
_, err = stream.Recv()
close(rpcDoneOnClient)
if err == nil || !strings.Contains(err.Error(), expectedErrorMessageSubstring) {
t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: %q", stream, err, expectedErrorMessageSubstring)
}
}
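// TestDetailedGoAwayErrorOnAbruptClosePropagatesToRPCError verifies that
// when the server abruptly closes the connection for a keepalive violation,
// the GOAWAY details (ENHANCE_YOUR_CALM, "too_many_pings") show up in the
// failed RPC's status.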
func (s) TestDetailedGoAwayErrorOnAbruptClosePropagatesToRPCError(t *testing.T) {
// Set the min keepalive time very low so that this test completes in a
// reasonable amount of time.
prev := internal.KeepaliveMinPingTime
internal.KeepaliveMinPingTime = time.Millisecond
defer func() { internal.KeepaliveMinPingTime = prev }()
rpcDoneOnClient := make(chan struct{})
ss := &stubserver.StubServer{
FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
<-rpcDoneOnClient
return status.Error(codes.Internal, "arbitrary status")
},
}
sopts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: time.Second * 1000, /* arbitrary, large value */
}),
}
dopts := []grpc.DialOption{
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Millisecond, /* should trigger "too many pings" error quickly */
Timeout: time.Second * 1000, /* arbitrary, large value */
PermitWithoutStream: false,
}),
}
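// The client pings every millisecond while the server permits at most one
// ping per 1000 seconds, so the server soon responds with a GOAWAY carrying
// ENHANCE_YOUR_CALM and "too_many_pings" debug data, then closes abruptly.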
if err := ss.Start(sopts, dopts...); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := ss.Client.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", ss.Client, err)
}
const expectedErrorMessageSubstring = `received prior goaway: code: ENHANCE_YOUR_CALM, debug data: "too_many_pings"`
_, err = stream.Recv()
close(rpcDoneOnClient)
if err == nil || !strings.Contains(err.Error(), expectedErrorMessageSubstring) {
t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: |%v|", stream, err, expectedErrorMessageSubstring)
}
}
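// TestClientConnCloseAfterGoAwayWithActiveStream verifies that closing the
// ClientConn while the server is draining a connection with an active stream
// does not prevent GracefulStop from returning.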
func (s) TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) {
for _, e := range listTestEnv() {
if e.name == "handler-tls" {
continue
}
testClientConnCloseAfterGoAwayWithActiveStream(t, e)
}
}
func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if _, err := tc.FullDuplexCall(ctx); err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
}
done := make(chan struct{})
go func() {
te.srv.GracefulStop()
close(done)
}()
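// Give GracefulStop a moment to send the GOAWAY while the stream is still
// active, then close the ClientConn out from under it.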
time.Sleep(50 * time.Millisecond)
cc.Close()
timeout := time.NewTimer(time.Second)
select {
case <-done:
case <-timeout.C:
t.Fatalf("Test timed-out.")
}
}
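// TestServerGoAway verifies that once the GoAway sent by GracefulStop
// reaches the client, new RPCs fail.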
func (s) TestServerGoAway(t *testing.T) {
for _, e := range listTestEnv() {
if e.name == "handler-tls" {
continue
}
testServerGoAway(t, e)
}
}
func testServerGoAway(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
// Finish an RPC to make sure the connection is good.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
ch := make(chan struct{})
go func() {
te.srv.GracefulStop()
close(ch)
}()
// Loop until the server-side GoAway signal is propagated to the client.
for {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) != codes.DeadlineExceeded {
cancel()
break
}
cancel()
}
// A new RPC should fail; depending on timing, the failure surfaces as
// Unavailable or Internal.
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal)
}
<-ch
awaitNewConnLogOutput()
}
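// TestServerGoAwayPendingRPC verifies that an RPC started before
// GracefulStop keeps working after the GoAway reaches the client.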
func (s) TestServerGoAwayPendingRPC(t *testing.T) {
for _, e := range listTestEnv() {
if e.name == "handler-tls" {
continue
}
testServerGoAwayPendingRPC(t, e)
}
}
func testServerGoAwayPendingRPC(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
te.declareLogNoise(
"transport: http2Client.notifyError got notified that the client transport was broken EOF",
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
"grpc: addrConn.resetTransport failed to create client transport: connection error",
)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
// Finish an RPC to make sure the connection is good.
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
}
ch := make(chan struct{})
go func() {
te.srv.GracefulStop()
close(ch)
}()
// Loop until the server-side GoAway signal is propagated to the client.
start := time.Now()
errored := false
for time.Since(start) < time.Second {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
cancel()
if err != nil {
errored = true
break
}
}
if !errored {
t.Fatalf("GoAway never received by client")
}
respParam := []*testpb.ResponseParameters{{Size: 1}}
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
if err != nil {
t.Fatal(err)
}
req := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseParameters: respParam,
Payload: payload,
}
// The existing RPC should still be able to proceed.
if err := stream.Send(req); err != nil {
t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
}
if _, err := stream.Recv(); err != nil {
t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
}
// The RPC will run until canceled.
cancel()
<-ch
awaitNewConnLogOutput()
}
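// TestServerMultipleGoAwayPendingRPC verifies that concurrent GracefulStop
// calls both block until the pending stream completes, and that the stream
// keeps working in the meantime.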
func (s) TestServerMultipleGoAwayPendingRPC(t *testing.T) {
for _, e := range listTestEnv() {
if e.name == "handler-tls" {
continue
}
testServerMultipleGoAwayPendingRPC(t, e)
}
}
func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
te.declareLogNoise(
"transport: http2Client.notifyError got notified that the client transport was broken EOF",
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
"grpc: addrConn.resetTransport failed to create client transport: connection error",
)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
ctx, cancel := context.WithCancel(context.Background())
stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
// Finish an RPC to make sure the connection is good.
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
}
ch1 := make(chan struct{})
go func() {
te.srv.GracefulStop()
close(ch1)
}()
ch2 := make(chan struct{})
go func() {
te.srv.GracefulStop()
close(ch2)
}()
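// The second concurrent GracefulStop exercises the duplicate-GoAway path;
// both calls must block until the pending stream finishes.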
// Loop until the server-side GoAway signal is propagated to the client.
for {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
cancel()
break
}
cancel()
}
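// Neither GracefulStop call should have returned yet, since the
// FullDuplexCall stream is still active.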
select {
case <-ch1:
t.Fatal("GracefulStop() terminated early")
case <-ch2:
t.Fatal("GracefulStop() terminated early")
default:
}
respParam := []*testpb.ResponseParameters{
{
Size: 1,
},
}
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
if err != nil {
t.Fatal(err)
}
req := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseParameters: respParam,
Payload: payload,
}
// The existing RPC should still be able to proceed.
if err := stream.Send(req); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
}
if _, err := stream.Recv(); err != nil {
t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
}
if err := stream.CloseSend(); err != nil {
t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
}
<-ch1
<-ch2
cancel()
awaitNewConnLogOutput()
}
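// TestConcurrentClientConnCloseAndServerGoAway verifies that closing the
// ClientConn concurrently with GracefulStop does not deadlock.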
func (s) TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) {
for _, e := range listTestEnv() {
if e.name == "handler-tls" {
continue
}
testConcurrentClientConnCloseAndServerGoAway(t, e)
}
}
func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
te.declareLogNoise(
"transport: http2Client.notifyError got notified that the client transport was broken EOF",
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
"grpc: addrConn.resetTransport failed to create client transport: connection error",
)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
}
ch := make(chan struct{})
// Close ClientConn and Server concurrently.
go func() {
te.srv.GracefulStop()
close(ch)
}()
go func() {
cc.Close()
}()
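// The test passes if GracefulStop returns; a deadlock between Close and the
// drain would hang here until the test times out.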
<-ch
}
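// TestConcurrentServerStopAndGoAway verifies that calling Stop during a
// GracefulStop-initiated drain kills the pending stream: Send eventually
// returns io.EOF and Recv returns a non-EOF error.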
func (s) TestConcurrentServerStopAndGoAway(t *testing.T) {
for _, e := range listTestEnv() {
if e.name == "handler-tls" {
continue
}
testConcurrentServerStopAndGoAway(t, e)
}
}
func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
te.declareLogNoise(
"transport: http2Client.notifyError got notified that the client transport was broken EOF",
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
"grpc: addrConn.resetTransport failed to create client transport: connection error",
)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
// Finish an RPC to make sure the connection is good.
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
}
ch := make(chan struct{})
go func() {
te.srv.GracefulStop()
close(ch)
}()
// Loop until the server-side GoAway signal is propagated to the client.
for {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
cancel()
break
}
cancel()
}
// Stop the server and close all the connections.
te.srv.Stop()
respParam := []*testpb.ResponseParameters{
{
Size: 1,
},
}
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
if err != nil {
t.Fatal(err)
}
req := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseParameters: respParam,
Payload: payload,
}
sendStart := time.Now()
for {
if err := stream.Send(req); err == io.EOF {
// stream.Send should eventually return io.EOF.
break
} else if err != nil {
// Send should never return a transport-level error.
t.Fatalf("stream.Send(%v) = %v; want <nil or io.EOF>", req, err)
}
if time.Since(sendStart) > 2*time.Second {
t.Fatalf("stream.Send(_) did not return io.EOF after 2s")
}
time.Sleep(time.Millisecond)
}
if _, err := stream.Recv(); err == nil || err == io.EOF {
t.Fatalf("%v.Recv() = _, %v, want _, <non-nil, non-EOF>", stream, err)
}
<-ch
awaitNewConnLogOutput()
}
// Proxies typically send GO_AWAY followed by connection closure a minute or so later. This
// test ensures that the connection is re-created after GO_AWAY and not affected by the
// subsequent (old) connection closure.
func (s) TestGoAwayThenClose(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
lis1, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
s1 := grpc.NewServer()
defer s1.Stop()
ts := &funcServer{
unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
if err := stream.Send(&testpb.StreamingOutputCallResponse{}); err != nil {
t.Errorf("unexpected error from send: %v", err)
return err
}
// Wait forever.
_, err := stream.Recv()
if err == nil {
t.Error("expected to never receive any message")
}
return err
},
}
testpb.RegisterTestServiceServer(s1, ts)
go s1.Serve(lis1)
conn2Established := grpcsync.NewEvent()
lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established)
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
s2 := grpc.NewServer()
defer s2.Stop()
testpb.RegisterTestServiceServer(s2, ts)
r := manual.NewBuilderWithScheme("whatever")
r.InitialState(resolver.State{Addresses: []resolver.Address{
{Addr: lis1.Addr().String()},
{Addr: lis2.Addr().String()},
}})
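// The manual resolver hands the client both addresses up front so that,
// after server 1 sends GO_AWAY, the client can fail over to server 2.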
cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer cc.Close()
client := testpb.NewTestServiceClient(cc)
t.Log("Waiting for the ClientConn to enter READY state.")
state := cc.GetState()
for ; state != connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
}
if ctx.Err() != nil {
t.Fatalf("timed out waiting for READY channel state; last state = %v", state)
}
// We make a streaming RPC and do a one-message round trip to make sure
// it's created on connection 1.
//
// We use a long-lived RPC because it will cause GracefulStop to send
// GO_AWAY, but the connection won't get closed until the server stops and
// the client receives the error.
t.Log("Creating first streaming RPC to server 1.")
stream, err := client.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("FullDuplexCall(_) = _, %v; want _, nil", err)
}
if _, err = stream.Recv(); err != nil {
t.Fatalf("unexpected error from first recv: %v", err)
}
go s2.Serve(lis2)
t.Log("Gracefully stopping server 1.")
go s1.GracefulStop()
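// GracefulStop sends the GO_AWAY right away but blocks until the stream
// above completes, so it must run in its own goroutine.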
t.Log("Waiting for the ClientConn to enter IDLE state.")
for ; state != connectivity.Idle && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
}
if ctx.Err() != nil {
t.Fatalf("timed out waiting for IDLE channel state; last state = %v", state)
}
t.Log("Performing another RPC to create a connection to server 2.")
if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
}
t.Log("Waiting for a connection to server 2.")
select {
case <-conn2Established.Done():
case <-ctx.Done():
t.Fatalf("timed out waiting for connection 2 to be established")
}
// Close the listener for server2 to prevent it from allowing new connections.
lis2.Close()
t.Log("Hard closing connection 1.")
s1.Stop()
t.Log("Waiting for the first stream to error.")
if _, err = stream.Recv(); err == nil {
t.Fatal("expected the stream to die, but got a successful Recv")
}
t.Log("Ensuring connection 2 is stable.")
for i := 0; i < 10; i++ {
if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
}
}
}
// TestGoAwayStreamIDSmallerThanCreatedStreams tests the scenario where a
// server sends a GOAWAY whose last-stream-ID is smaller than the IDs of some
// streams already created on the client, while the client is simultaneously
// creating new streams. This should not induce a deadlock.
func (s) TestGoAwayStreamIDSmallerThanCreatedStreams(t *testing.T) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}
ctCh := testutils.NewChannel()
go func() {
conn, err := lis.Accept()
if err != nil {
t.Errorf("error in lis.Accept(): %v", err)
}
ct := newClientTester(t, conn)
ctCh.Send(ct)
}()
cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("error dialing: %v", err)
}
defer cc.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
val, err := ctCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout waiting for client transport (should be given after http2 creation)")
}
ct := val.(*clientTester)
tc := testpb.NewTestServiceClient(cc)
someStreamsCreated := grpcsync.NewEvent()
goAwayWritten := grpcsync.NewEvent()
go func() {
for i := 0; i < 20; i++ {
if i == 10 {
<-goAwayWritten.Done()
}
tc.FullDuplexCall(ctx) // Returned stream and error are intentionally ignored; only stream creation matters here.
if i == 4 {
someStreamsCreated.Fire()
}
}
}()
<-someStreamsCreated.Done()
ct.writeGoAway(1, http2.ErrCodeNo, []byte{})
goAwayWritten.Fire()
}
// TestTwoGoAwayPingFrames tests the scenario where the server receives two
// acks of its GOAWAY ping from the client during graceful shutdown. This
// should not crash the server.
func (s) TestTwoGoAwayPingFrames(t *testing.T) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to listen: %v", err)
}
defer lis.Close()
s := grpc.NewServer()
defer s.Stop()
go s.Serve(lis)
conn, err := net.DialTimeout("tcp", lis.Addr().String(), defaultTestTimeout)
if err != nil {
t.Fatalf("Failed to dial: %v", err)
}
st := newServerTesterFromConn(t, conn)
st.greet()
pingReceivedClientSide := testutils.NewChannel()
go func() {
for {
f, err := st.readFrame()
if err != nil {
return
}
switch f.(type) {
case *http2.GoAwayFrame:
case *http2.PingFrame:
pingReceivedClientSide.Send(nil)
default:
t.Errorf("server tester received unexpected frame type %T", f)
}
}
}()
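// During graceful shutdown the server sends a GOAWAY followed by a ping,
// and waits for the ping ack before sending the final GOAWAY; the goroutine
// above waits to observe that ping on the client side.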
gsDone := testutils.NewChannel()
go func() {
s.GracefulStop()
gsDone.Send(nil)
}()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if _, err := pingReceivedClientSide.Receive(ctx); err != nil {
t.Fatalf("Error waiting for ping frame client side from graceful shutdown: %v", err)
}
// Write two acks for the server's GOAWAY ping (the payload echoes the ping
// data).
st.writePing(true, [8]byte{1, 6, 1, 8, 0, 3, 3, 9})
st.writePing(true, [8]byte{1, 6, 1, 8, 0, 3, 3, 9})
// Close the conn to finish up the graceful shutdown process.
conn.Close()
if _, err := gsDone.Receive(ctx); err != nil {
t.Fatalf("Error waiting for graceful shutdown of the server: %v", err)
}
}