blob: 7d9f1f5560a8fd73b649a4c1914f969ceff23136 [file] [log] [blame]
* Copyright 2023 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package grpc_test
import (
testgrpc ""
testpb ""
// TestServer_MaxHandlers ensures that no more than MaxConcurrentStreams server
// handlers are active at one time.
func (s) TestServer_MaxHandlers(t *testing.T) {
started := make(chan struct{})
blockCalls := grpcsync.NewEvent()
// This stub server does not properly respect the stream context, so it will
// not exit when the context is canceled.
ss := stubserver.StubServer{
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
started <- struct{}{}
return nil
if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}); err != nil {
t.Fatal("Error starting server:", err)
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Start one RPC to the server.
ctx1, cancel1 := context.WithCancel(ctx)
_, err := ss.Client.FullDuplexCall(ctx1)
if err != nil {
t.Fatal("Error staring call:", err)
// Wait for the handler to be invoked.
select {
case <-started:
case <-ctx.Done():
t.Fatalf("Timed out waiting for RPC to start on server.")
// Cancel it on the client. The server handler will still be running.
ctx2, cancel2 := context.WithCancel(ctx)
defer cancel2()
s, err := ss.Client.FullDuplexCall(ctx2)
if err != nil {
t.Fatal("Error staring call:", err)
// After 100ms, allow the first call to unblock. That should allow the
// second RPC to run and finish.
select {
case <-started:
t.Fatalf("RPC started unexpectedly.")
case <-time.After(100 * time.Millisecond):
select {
case <-started:
case <-ctx.Done():
t.Fatalf("Timed out waiting for second RPC to start on server.")
if _, err := s.Recv(); err != io.EOF {
t.Fatal("Received unexpected RPC error:", err)
// Tests the case where the stream worker goroutine option is enabled, and a
// number of RPCs are initiated around the same time that Stop() is called. This
// used to result in a write to a closed channel. This test verifies that there
// is no panic.
func (s) TestStreamWorkers_RPCsAndStop(t *testing.T) {
ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
// This deferred stop takes care of stopping the server when one of the
// below grpc.Dials fail, and the test exits early.
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
const numChannels = 20
const numRPCLoops = 20
// Create a bunch of clientconns and ensure that they are READY by making an
// RPC on them.
ccs := make([]*grpc.ClientConn, numChannels)
for i := 0; i < numChannels; i++ {
var err error
ccs[i], err = grpc.Dial(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("[iteration: %d] grpc.Dial(%s) failed: %v", i, ss.Address, err)
defer ccs[i].Close()
client := testgrpc.NewTestServiceClient(ccs[i])
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
// Make a bunch of concurrent RPCs on the above clientconns. These will
// eventually race with Stop(), and will start to fail.
var wg sync.WaitGroup
for i := 0; i < numChannels; i++ {
client := testgrpc.NewTestServiceClient(ccs[i])
for j := 0; j < numRPCLoops; j++ {
go func(client testgrpc.TestServiceClient) {
defer wg.Done()
for {
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if err == nil {
if code := status.Code(err); code == codes.Unavailable {
// Once Stop() has been called on the server, we expect
// subsequent calls to fail with Unavailable.
t.Errorf("EmptyCall() failed: %v", err)
// Call Stop() concurrently with the above RPC attempts.
// Tests the case where the stream worker goroutine option is enabled, and both
// Stop() and GracefulStop() care called. This used to result in a close of a
// closed channel. This test verifies that there is no panic.
func (s) TestStreamWorkers_GracefulStopAndStop(t *testing.T) {
ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
defer ss.Stop()
if err := ss.StartClient(grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil {
t.Fatalf("Failed to create client to stub server: %v", err)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(ss.CC)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
// Tests the WaitForHandlers ServerOption by leaving an RPC running while Stop
// is called, and ensures Stop doesn't return until the handler returns.
func (s) TestServer_WaitForHandlers(t *testing.T) {
started := grpcsync.NewEvent()
blockCalls := grpcsync.NewEvent()
// This stub server does not properly respect the stream context, so it will
// not exit when the context is canceled.
ss := stubserver.StubServer{
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
return nil
if err := ss.Start([]grpc.ServerOption{grpc.WaitForHandlers(true)}); err != nil {
t.Fatal("Error starting server:", err)
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Start one RPC to the server.
ctx1, cancel1 := context.WithCancel(ctx)
_, err := ss.Client.FullDuplexCall(ctx1)
if err != nil {
t.Fatal("Error staring call:", err)
// Wait for the handler to be invoked.
select {
case <-started.Done():
case <-ctx.Done():
t.Fatalf("Timed out waiting for RPC to start on server.")
// Cancel it on the client. The server handler will still be running.
// Close the connection. This might be sufficient to allow the server to
// return if it doesn't properly wait for outstanding method handlers to
// return.
// Try to Stop() the server, which should block indefinitely (until
// blockCalls is fired).
stopped := grpcsync.NewEvent()
go func() {
// Wait 100ms and ensure stopped does not fire.
select {
case <-stopped.Done():
trace := make([]byte, 4096)
trace = trace[0:runtime.Stack(trace, true)]
t.Fatalf("Server returned from Stop() illegally. Stack trace:\n%v", string(trace))
case <-time.After(100 * time.Millisecond):
// Success; unblock the call and wait for stopped.
select {
case <-stopped.Done():
case <-ctx.Done():
t.Fatalf("Timed out waiting for second RPC to start on server.")
// Tests that GracefulStop will wait for all method handlers to return by
// blocking a handler and ensuring GracefulStop doesn't return until after it is
// unblocked.
func (s) TestServer_GracefulStopWaits(t *testing.T) {
started := grpcsync.NewEvent()
blockCalls := grpcsync.NewEvent()
// This stub server does not properly respect the stream context, so it will
// not exit when the context is canceled.
ss := stubserver.StubServer{
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
return nil
if err := ss.Start(nil); err != nil {
t.Fatal("Error starting server:", err)
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Start one RPC to the server.
ctx1, cancel1 := context.WithCancel(ctx)
_, err := ss.Client.FullDuplexCall(ctx1)
if err != nil {
t.Fatal("Error staring call:", err)
// Wait for the handler to be invoked.
select {
case <-started.Done():
case <-ctx.Done():
t.Fatalf("Timed out waiting for RPC to start on server.")
// Cancel it on the client. The server handler will still be running.
// Close the connection. This might be sufficient to allow the server to
// return if it doesn't properly wait for outstanding method handlers to
// return.
// Try to Stop() the server, which should block indefinitely (until
// blockCalls is fired).
stopped := grpcsync.NewEvent()
go func() {
// Wait 100ms and ensure stopped does not fire.
select {
case <-stopped.Done():
trace := make([]byte, 4096)
trace = trace[0:runtime.Stack(trace, true)]
t.Fatalf("Server returned from Stop() illegally. Stack trace:\n%v", string(trace))
case <-time.After(100 * time.Millisecond):
// Success; unblock the call and wait for stopped.
select {
case <-stopped.Done():
case <-ctx.Done():
t.Fatalf("Timed out waiting for second RPC to start on server.")