/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc_test

import (
	"context"
	"io"
	"runtime"
	"sync"
	"testing"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/stubserver"
	"google.golang.org/grpc/status"

	testgrpc "google.golang.org/grpc/interop/grpc_testing"
	testpb "google.golang.org/grpc/interop/grpc_testing"
)

// TestServer_MaxHandlers ensures that no more than MaxConcurrentStreams server
// handlers are active at one time.
func (s) TestServer_MaxHandlers(t *testing.T) {
started := make(chan struct{})
blockCalls := grpcsync.NewEvent()
// This stub server does not properly respect the stream context, so it will
// not exit when the context is canceled.
ss := stubserver.StubServer{
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
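			// Tell the test the handler has started, then block
			// until the test fires blockCalls.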
started <- struct{}{}
<-blockCalls.Done()
return nil
},
}
if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}); err != nil {
t.Fatal("Error starting server:", err)
}
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Start one RPC to the server.
ctx1, cancel1 := context.WithCancel(ctx)
_, err := ss.Client.FullDuplexCall(ctx1)
if err != nil {
t.Fatal("Error staring call:", err)
}
// Wait for the handler to be invoked.
select {
case <-started:
case <-ctx.Done():
t.Fatalf("Timed out waiting for RPC to start on server.")
}
// Cancel it on the client. The server handler will still be running.
cancel1()
ctx2, cancel2 := context.WithCancel(ctx)
defer cancel2()
s, err := ss.Client.FullDuplexCall(ctx2)
if err != nil {
t.Fatal("Error staring call:", err)
}
// After 100ms, allow the first call to unblock. That should allow the
// second RPC to run and finish.
select {
case <-started:
blockCalls.Fire()
t.Fatalf("RPC started unexpectedly.")
case <-time.After(100 * time.Millisecond):
blockCalls.Fire()
}
select {
case <-started:
case <-ctx.Done():
t.Fatalf("Timed out waiting for second RPC to start on server.")
}
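	// Once unblocked, the second RPC's handler returns nil, so the client
	// should see a clean end of stream (io.EOF) from Recv.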
if _, err := s.Recv(); err != io.EOF {
t.Fatal("Received unexpected RPC error:", err)
}
}

// Tests the case where the stream worker goroutine option is enabled, and a
// number of RPCs are initiated around the same time that Stop() is called. This
// used to result in a write to a closed channel. This test verifies that there
// is no panic.
func (s) TestStreamWorkers_RPCsAndStop(t *testing.T) {
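	// One stream worker per CPU dispatches incoming streams to a fixed
	// worker pool instead of spawning a new goroutine per stream.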
ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
	// This deferred stop takes care of stopping the server if one of the
	// grpc.NewClient calls below fails and the test exits early.
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
const numChannels = 20
const numRPCLoops = 20
// Create a bunch of clientconns and ensure that they are READY by making an
// RPC on them.
ccs := make([]*grpc.ClientConn, numChannels)
for i := 0; i < numChannels; i++ {
var err error
ccs[i], err = grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("[iteration: %d] grpc.NewClient(%s) failed: %v", i, ss.Address, err)
}
defer ccs[i].Close()
client := testgrpc.NewTestServiceClient(ccs[i])
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
}
// Make a bunch of concurrent RPCs on the above clientconns. These will
// eventually race with Stop(), and will start to fail.
var wg sync.WaitGroup
for i := 0; i < numChannels; i++ {
client := testgrpc.NewTestServiceClient(ccs[i])
for j := 0; j < numRPCLoops; j++ {
wg.Add(1)
go func(client testgrpc.TestServiceClient) {
defer wg.Done()
for {
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if err == nil {
continue
}
if code := status.Code(err); code == codes.Unavailable {
// Once Stop() has been called on the server, we expect
// subsequent calls to fail with Unavailable.
return
}
t.Errorf("EmptyCall() failed: %v", err)
return
}
}(client)
}
}
// Call Stop() concurrently with the above RPC attempts.
ss.Stop()
wg.Wait()
}

// Tests the case where the stream worker goroutine option is enabled, and both
// Stop() and GracefulStop() are called. This used to result in a close of a
// closed channel. This test verifies that there is no panic.
func (s) TestStreamWorkers_GracefulStopAndStop(t *testing.T) {
ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
defer ss.Stop()
if err := ss.StartClient(grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil {
t.Fatalf("Failed to create client to stub server: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(ss.CC)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
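	// GracefulStop() runs first here; the deferred ss.Stop() above then runs
	// afterwards, exercising the Stop-after-GracefulStop sequence that used
	// to close an already-closed channel.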
ss.S.GracefulStop()
}

// Tests the WaitForHandlers ServerOption by leaving an RPC running while Stop
// is called, and ensures Stop doesn't return until the handler returns.
func (s) TestServer_WaitForHandlers(t *testing.T) {
started := grpcsync.NewEvent()
blockCalls := grpcsync.NewEvent()
// This stub server does not properly respect the stream context, so it will
// not exit when the context is canceled.
ss := stubserver.StubServer{
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
started.Fire()
<-blockCalls.Done()
return nil
},
}
if err := ss.Start([]grpc.ServerOption{grpc.WaitForHandlers(true)}); err != nil {
t.Fatal("Error starting server:", err)
}
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Start one RPC to the server.
ctx1, cancel1 := context.WithCancel(ctx)
_, err := ss.Client.FullDuplexCall(ctx1)
if err != nil {
t.Fatal("Error staring call:", err)
}
// Wait for the handler to be invoked.
select {
case <-started.Done():
case <-ctx.Done():
t.Fatalf("Timed out waiting for RPC to start on server.")
}
// Cancel it on the client. The server handler will still be running.
cancel1()
// Close the connection. This might be sufficient to allow the server to
// return if it doesn't properly wait for outstanding method handlers to
// return.
ss.CC.Close()
	// Try to Stop() the server, which should block until blockCalls is
	// fired because the method handler is still running.
stopped := grpcsync.NewEvent()
go func() {
ss.S.Stop()
stopped.Fire()
}()
// Wait 100ms and ensure stopped does not fire.
select {
case <-stopped.Done():
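		// Capture all goroutine stacks to help debug why Stop()
		// returned early.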
trace := make([]byte, 4096)
trace = trace[0:runtime.Stack(trace, true)]
blockCalls.Fire()
t.Fatalf("Server returned from Stop() illegally. Stack trace:\n%v", string(trace))
case <-time.After(100 * time.Millisecond):
// Success; unblock the call and wait for stopped.
blockCalls.Fire()
}
select {
case <-stopped.Done():
case <-ctx.Done():
t.Fatalf("Timed out waiting for second RPC to start on server.")
}
}

// Tests that GracefulStop will wait for all method handlers to return by
// blocking a handler and ensuring GracefulStop doesn't return until after it is
// unblocked.
func (s) TestServer_GracefulStopWaits(t *testing.T) {
started := grpcsync.NewEvent()
blockCalls := grpcsync.NewEvent()
// This stub server does not properly respect the stream context, so it will
// not exit when the context is canceled.
ss := stubserver.StubServer{
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
started.Fire()
<-blockCalls.Done()
return nil
},
}
if err := ss.Start(nil); err != nil {
t.Fatal("Error starting server:", err)
}
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Start one RPC to the server.
ctx1, cancel1 := context.WithCancel(ctx)
_, err := ss.Client.FullDuplexCall(ctx1)
if err != nil {
t.Fatal("Error staring call:", err)
}
// Wait for the handler to be invoked.
select {
case <-started.Done():
case <-ctx.Done():
t.Fatalf("Timed out waiting for RPC to start on server.")
}
// Cancel it on the client. The server handler will still be running.
cancel1()
// Close the connection. This might be sufficient to allow the server to
// return if it doesn't properly wait for outstanding method handlers to
// return.
ss.CC.Close()
	// Try to GracefulStop() the server, which should block until blockCalls
	// is fired because the method handler is still running.
stopped := grpcsync.NewEvent()
go func() {
ss.S.GracefulStop()
stopped.Fire()
}()
// Wait 100ms and ensure stopped does not fire.
select {
case <-stopped.Done():
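		// Capture all goroutine stacks to help debug why GracefulStop()
		// returned early.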
trace := make([]byte, 4096)
trace = trace[0:runtime.Stack(trace, true)]
blockCalls.Fire()
t.Fatalf("Server returned from Stop() illegally. Stack trace:\n%v", string(trace))
case <-time.After(100 * time.Millisecond):
// Success; unblock the call and wait for stopped.
blockCalls.Fire()
}
select {
case <-stopped.Done():
case <-ctx.Done():
t.Fatalf("Timed out waiting for second RPC to start on server.")
}
}