blob: b70afebd40591aa1d7a4eb9dba18aa8d1a2dfe84 [file] [log] [blame]
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package test
import (
"context"
"fmt"
"testing"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/fakegrpclb"
"google.golang.org/grpc/internal/testutils/pickfirst"
rrutil "google.golang.org/grpc/internal/testutils/roundrobin"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
const (
loadBalancedServiceName = "foo.bar.service"
loadBalancedServicePort = 443
wantGRPCLBTraceDesc = `Channel switches to new LB policy "grpclb"`
wantRoundRobinTraceDesc = `Channel switches to new LB policy "round_robin"`
// This is the number of stub backends set up at the start of each test. The
// first backend is used for the "grpclb" policy and the rest are used for
// other LB policies to test balancer switching.
backendCount = 3
)
// setupBackendsAndFakeGRPCLB sets up backendCount number of stub server
// backends and a fake grpclb server for tests which exercise balancer switch
// scenarios involving grpclb.
//
// The fake grpclb server always returns the first of the configured stub
// backends as backend addresses. So, the tests are free to use the other
// backends with other LB policies to verify balancer switching scenarios.
//
// Returns a cleanup function to be invoked by the caller.
func setupBackendsAndFakeGRPCLB(t *testing.T) ([]*stubserver.StubServer, *fakegrpclb.Server, func()) {
backends, backendsCleanup := startBackendsForBalancerSwitch(t)
lbServer, err := fakegrpclb.NewServer(fakegrpclb.ServerParams{
LoadBalancedServiceName: loadBalancedServiceName,
LoadBalancedServicePort: loadBalancedServicePort,
BackendAddresses: []string{backends[0].Address},
})
if err != nil {
t.Fatalf("failed to create fake grpclb server: %v", err)
}
go func() {
if err := lbServer.Serve(); err != nil {
t.Errorf("fake grpclb Serve() failed: %v", err)
}
}()
return backends, lbServer, func() {
backendsCleanup()
lbServer.Stop()
}
}
// startBackendsForBalancerSwitch spins up a bunch of stub server backends
// exposing the TestService. Returns a cleanup function to be invoked by the
// caller.
func startBackendsForBalancerSwitch(t *testing.T) ([]*stubserver.StubServer, func()) {
t.Helper()
backends := make([]*stubserver.StubServer, backendCount)
for i := 0; i < backendCount; i++ {
backend := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
}
if err := backend.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
t.Logf("Started TestService backend at: %q", backend.Address)
backends[i] = backend
}
return backends, func() {
for _, b := range backends {
b.Stop()
}
}
}
// TestBalancerSwitch_Basic tests the basic scenario of switching from one LB
// policy to another, as specified in the service config.
func (s) TestBalancerSwitch_Basic(t *testing.T) {
backends, cleanup := startBackendsForBalancerSwitch(t)
defer cleanup()
addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
// Push a resolver update without an LB policy in the service config. The
// channel should pick the default LB policy, which is pick_first.
r.UpdateState(resolver.State{Addresses: addrs})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}
// Push a resolver update with the service config specifying "round_robin".
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
})
client := testgrpc.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
t.Fatal(err)
}
// Push a resolver update with the service config specifying "pick_first".
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: parseServiceConfig(t, r, pickFirstServiceConfig),
})
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}
}
// TestBalancerSwitch_grpclbToPickFirst tests the scenario where the channel
// starts off "grpclb", switches to "pick_first" and back.
func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) {
backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
defer cleanup()
addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
// Push a resolver update with a GRPCLB service config and a single address
// pointing to the grpclb server we created above. This will cause the
// channel to switch to the "grpclb" balancer, which returns a single
// backend address.
grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
state := resolver.State{ServiceConfig: grpclbConfig}
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: lbServer.Address()}}}))
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[0:1]); err != nil {
t.Fatal(err)
}
// Push a resolver update containing a non-existent grpclb server address.
// This should not lead to a balancer switch.
const nonExistentServer = "non-existent-grpclb-server-address"
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: nonExistentServer}}}))
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}
// Push a resolver update containing no grpclb server address. This should
// lead to the channel using the default LB policy which is pick_first. The
// list of addresses pushed as part of this update is different from the one
// returned by the "grpclb" balancer. So, we should see RPCs going to the
// newly configured backends, as part of the balancer switch.
emptyConfig := parseServiceConfig(t, r, `{}`)
r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: emptyConfig})
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}
}
// TestBalancerSwitch_pickFirstToGRPCLB tests the scenario where the channel
// starts off with "pick_first", switches to "grpclb" and back.
func (s) TestBalancerSwitch_pickFirstToGRPCLB(t *testing.T) {
backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
defer cleanup()
addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
// Push a resolver update containing no grpclb server address. This should
// lead to the channel using the default LB policy which is pick_first.
r.UpdateState(resolver.State{Addresses: addrs[1:]})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}
// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, which returns a single backend address.
grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
state := resolver.State{ServiceConfig: grpclbConfig}
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: lbServer.Address()}}}))
client := testgrpc.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}
// Push a resolver update containing a non-existent grpclb server address.
// This should not lead to a balancer switch.
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: "nonExistentServer"}}}))
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}
// Switch to "pick_first" again by sending no grpclb server addresses.
emptyConfig := parseServiceConfig(t, r, `{}`)
r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: emptyConfig})
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}
}
// TestBalancerSwitch_RoundRobinToGRPCLB tests the scenario where the channel
// starts off with "round_robin", switches to "grpclb" and back.
//
// Note that this test uses the deprecated `loadBalancingPolicy` field in the
// service config.
func (s) TestBalancerSwitch_RoundRobinToGRPCLB(t *testing.T) {
backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
defer cleanup()
addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
// Note the use of the deprecated `loadBalancingPolicy` field here instead
// of the now recommended `loadBalancingConfig` field. The logic in the
// ClientConn which decides which balancer to switch to looks at the
// following places in the given order of preference:
// - `loadBalancingConfig` field
// - addresses of type grpclb
// - `loadBalancingPolicy` field
// If we use the `loadBalancingPolicy` field, the switch to "grpclb" later on
// in the test will not happen as the ClientConn will continue to use the LB
// policy received in the first update.
scpr := parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`)
// Push a resolver update with the service config specifying "round_robin".
r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: scpr})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}
// Push a resolver update with grpclb and a single balancer address
// pointing to the grpclb server we created above. This will cause the
// channel to switch to the "grpclb" balancer, which returns a single
// backend address.
grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
state := resolver.State{ServiceConfig: grpclbConfig}
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: lbServer.Address()}}}))
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}
// Switch back to "round_robin".
r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: scpr})
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}
}
// TestBalancerSwitch_grpclbNotRegistered tests the scenario where the grpclb
// balancer is not registered. Verifies that the ClientConn falls back to the
// default LB policy or the LB policy specified in the service config, and that
// addresses of type "grpclb" are filtered out.
func (s) TestBalancerSwitch_grpclbNotRegistered(t *testing.T) {
// Unregister the grpclb balancer builder for the duration of this test.
grpclbBuilder := balancer.Get("grpclb")
internal.BalancerUnregister(grpclbBuilder.Name())
defer balancer.Register(grpclbBuilder)
backends, cleanup := startBackendsForBalancerSwitch(t)
defer cleanup()
addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
// Push a resolver update which contains a bunch of stub server backends and a
// grpclb server address. The latter should get the ClientConn to try and
// apply the grpclb policy. But since grpclb is not registered, it should
// fallback to the default LB policy which is pick_first. The ClientConn is
// also expected to filter out the grpclb address when sending the addresses
// list fo pick_first.
grpclbAddr := []resolver.Address{{Addr: "non-existent-grpclb-server-address"}}
grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
state := resolver.State{ServiceConfig: grpclbConfig, Addresses: addrs}
r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: grpclbAddr}))
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}
// Push a resolver update with the same addresses, but with a service config
// specifying "round_robin". The ClientConn is expected to filter out the
// grpclb address when sending the addresses list to round_robin.
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
})
client := testgrpc.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
t.Fatal(err)
}
}
// TestBalancerSwitch_OldBalancerCallsShutdownInClose tests the scenario where
// the balancer being switched out calls Shutdown() in its Close()
// method. Verifies that this sequence of calls doesn't lead to a deadlock.
func (s) TestBalancerSwitch_OldBalancerCallsShutdownInClose(t *testing.T) {
// Register a stub balancer which calls Shutdown() from its Close().
scChan := make(chan balancer.SubConn, 1)
uccsCalled := make(chan struct{}, 1)
stub.Register(t.Name(), stub.BalancerFuncs{
UpdateClientConnState: func(data *stub.BalancerData, ccs balancer.ClientConnState) error {
sc, err := data.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{})
if err != nil {
t.Errorf("failed to create subConn: %v", err)
}
scChan <- sc
close(uccsCalled)
return nil
},
Close: func(data *stub.BalancerData) {
(<-scChan).Shutdown()
},
})
r := manual.NewBuilderWithScheme("whatever")
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
// Push a resolver update specifying our stub balancer as the LB policy.
scpr := parseServiceConfig(t, r, fmt.Sprintf(`{"loadBalancingPolicy": "%v"}`, t.Name()))
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: "dummy-address"}},
ServiceConfig: scpr,
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for UpdateClientConnState to be called: %v", ctx.Err())
case <-uccsCalled:
}
// The following service config update will switch balancer from our stub
// balancer to pick_first. The former will be closed, which will call
// sc.Shutdown() inline.
//
// This is to make sure the sc.Shutdown() from Close() doesn't cause a
// deadlock (e.g. trying to grab a mutex while it's already locked).
//
// Do it in a goroutine so this test will fail with a helpful message
// (though the goroutine will still leak).
done := make(chan struct{})
go func() {
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: "dummy-address"}},
ServiceConfig: parseServiceConfig(t, r, pickFirstServiceConfig),
})
close(done)
}()
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for resolver.UpdateState to finish: %v", ctx.Err())
case <-done:
}
}
// TestBalancerSwitch_Graceful tests the graceful switching of LB policies. It
// starts off by configuring "round_robin" on the channel and ensures that RPCs
// are successful. Then, it switches to a stub balancer which does not report a
// picker until instructed by the test do to so. At this point, the test
// verifies that RPCs are still successful using the old balancer. Then the test
// asks the new balancer to report a healthy picker and the test verifies that
// the RPCs get routed using the picker reported by the new balancer.
func (s) TestBalancerSwitch_Graceful(t *testing.T) {
backends, cleanup := startBackendsForBalancerSwitch(t)
defer cleanup()
addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
// Push a resolver update with the service config specifying "round_robin".
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
r.UpdateState(resolver.State{
Addresses: addrs[1:],
ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
})
client := testgrpc.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}
// Register a stub balancer which uses a "pick_first" balancer underneath and
// signals on a channel when it receives ClientConn updates. But it does not
// forward the ccUpdate to the underlying "pick_first" balancer until the test
// asks it to do so. This allows us to test the graceful switch functionality.
// Until the test asks the stub balancer to forward the ccUpdate, RPCs should
// get routed to the old balancer. And once the test gives the go ahead, RPCs
// should get routed to the new balancer.
ccUpdateCh := make(chan struct{})
waitToProceed := make(chan struct{})
stub.Register(t.Name(), stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
pf := balancer.Get(grpc.PickFirstBalancerName)
bd.Data = pf.Build(bd.ClientConn, bd.BuildOptions)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
bal := bd.Data.(balancer.Balancer)
close(ccUpdateCh)
go func() {
<-waitToProceed
bal.UpdateClientConnState(ccs)
}()
return nil
},
})
// Push a resolver update with the service config specifying our stub
// balancer. We should see a trace event for this balancer switch. But RPCs
// should still be routed to the old balancer since our stub balancer does not
// report a ready picker until we ask it to do so.
r.UpdateState(resolver.State{
Addresses: addrs[:1],
ServiceConfig: r.CC.ParseServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%v": {}}]}`, t.Name())),
})
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for a ClientConnState update on the new balancer")
case <-ccUpdateCh:
}
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}
// Ask our stub balancer to forward the earlier received ccUpdate to the
// underlying "pick_first" balancer which will result in a healthy picker
// being reported to the channel. RPCs should start using the new balancer.
close(waitToProceed)
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}
}