| /* |
| * |
| * Copyright 2022 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package test |
| |
| import ( |
| "context" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "strings" |
| "testing" |
| |
| "github.com/google/go-cmp/cmp" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/credentials/insecure" |
| "google.golang.org/grpc/internal" |
| "google.golang.org/grpc/internal/balancer/stub" |
| "google.golang.org/grpc/internal/stubserver" |
| "google.golang.org/grpc/internal/testutils" |
| "google.golang.org/grpc/resolver" |
| "google.golang.org/grpc/resolver/manual" |
| "google.golang.org/grpc/serviceconfig" |
| "google.golang.org/grpc/status" |
| testpb "google.golang.org/grpc/test/grpc_testing" |
| ) |
| |
| // TestResolverUpdateDuringBuild_ServiceConfigParseError makes the |
| // resolver.Builder call into the ClientConn, during the Build call, with a |
| // service config parsing error. |
| // |
| // We use two separate mutexes in the code which make sure there is no data race |
| // in this code path, and also that there is no deadlock. |
| func (s) TestResolverUpdateDuringBuild_ServiceConfigParseError(t *testing.T) { |
| // Setting InitialState on the manual resolver makes it call into the |
| // ClientConn during the Build call. |
| r := manual.NewBuilderWithScheme("whatever") |
| r.InitialState(resolver.State{ServiceConfig: &serviceconfig.ParseResult{Err: errors.New("resolver build err")}}) |
| |
| cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) |
| if err != nil { |
| t.Fatalf("Dial(_, _) = _, %v; want _, nil", err) |
| } |
| defer cc.Close() |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| client := testpb.NewTestServiceClient(cc) |
| const wantMsg = "error parsing service config" |
| const wantCode = codes.Unavailable |
| if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != wantCode || !strings.Contains(status.Convert(err).Message(), wantMsg) { |
| t.Fatalf("EmptyCall RPC failed: %v; want code: %v, want message: %q", err, wantCode, wantMsg) |
| } |
| } |
| |
| type fakeConfig struct { |
| serviceconfig.Config |
| } |
| |
| // TestResolverUpdateDuringBuild_ServiceConfigInvalidTypeError makes the |
| // resolver.Builder call into the ClientConn, during the Build call, with an |
| // invalid service config type. |
| // |
| // We use two separate mutexes in the code which make sure there is no data race |
| // in this code path, and also that there is no deadlock. |
| func (s) TestResolverUpdateDuringBuild_ServiceConfigInvalidTypeError(t *testing.T) { |
| // Setting InitialState on the manual resolver makes it call into the |
| // ClientConn during the Build call. |
| r := manual.NewBuilderWithScheme("whatever") |
| r.InitialState(resolver.State{ServiceConfig: &serviceconfig.ParseResult{Config: fakeConfig{}}}) |
| |
| cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) |
| if err != nil { |
| t.Fatalf("Dial(_, _) = _, %v; want _, nil", err) |
| } |
| defer cc.Close() |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| client := testpb.NewTestServiceClient(cc) |
| const wantMsg = "illegal service config type" |
| const wantCode = codes.Unavailable |
| if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != wantCode || !strings.Contains(status.Convert(err).Message(), wantMsg) { |
| t.Fatalf("EmptyCall RPC failed: %v; want code: %v, want message: %q", err, wantCode, wantMsg) |
| } |
| } |
| |
| // TestResolverUpdate_InvalidServiceConfigAsFirstUpdate makes the resolver send |
| // an update with an invalid service config as its first update. This should |
| // make the ClientConn apply the failing LB policy, and should result in RPC |
| // errors indicating the failing service config. |
| func (s) TestResolverUpdate_InvalidServiceConfigAsFirstUpdate(t *testing.T) { |
| r := manual.NewBuilderWithScheme("whatever") |
| |
| cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) |
| if err != nil { |
| t.Fatalf("Dial(_, _) = _, %v; want _, nil", err) |
| } |
| defer cc.Close() |
| |
| scpr := r.CC.ParseServiceConfig("bad json service config") |
| r.UpdateState(resolver.State{ServiceConfig: scpr}) |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| client := testpb.NewTestServiceClient(cc) |
| const wantMsg = "error parsing service config" |
| const wantCode = codes.Unavailable |
| if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != wantCode || !strings.Contains(status.Convert(err).Message(), wantMsg) { |
| t.Fatalf("EmptyCall RPC failed: %v; want code: %v, want message: %q", err, wantCode, wantMsg) |
| } |
| } |
| |
| func verifyClientConnStateUpdate(got, want balancer.ClientConnState) error { |
| if got, want := got.ResolverState.Addresses, want.ResolverState.Addresses; !cmp.Equal(got, want) { |
| return fmt.Errorf("update got unexpected addresses: %v, want %v", got, want) |
| } |
| if got, want := got.ResolverState.ServiceConfig.Config, want.ResolverState.ServiceConfig.Config; !internal.EqualServiceConfigForTesting(got, want) { |
| return fmt.Errorf("received unexpected service config: \ngot: %v \nwant: %v", got, want) |
| } |
| if got, want := got.BalancerConfig, want.BalancerConfig; !cmp.Equal(got, want) { |
| return fmt.Errorf("received unexpected balancer config: \ngot: %v \nwant: %v", cmp.Diff(nil, got), cmp.Diff(nil, want)) |
| } |
| return nil |
| } |
| |
| // TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate tests the scenario |
| // where the resolver sends an update with an invalid service config after |
| // having sent a good update. This should result in the ClientConn discarding |
| // the new invalid service config, and continuing to use the old good config. |
| func (s) TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate(t *testing.T) { |
| type wrappingBalancerConfig struct { |
| serviceconfig.LoadBalancingConfig |
| Config string `json:"config,omitempty"` |
| } |
| |
| // Register a stub balancer which uses a "pick_first" balancer underneath and |
| // signals on a channel when it receives ClientConn updates. |
| ccUpdateCh := testutils.NewChannel() |
| stub.Register(t.Name(), stub.BalancerFuncs{ |
| Init: func(bd *stub.BalancerData) { |
| pf := balancer.Get(grpc.PickFirstBalancerName) |
| bd.Data = pf.Build(bd.ClientConn, bd.BuildOptions) |
| }, |
| ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { |
| cfg := &wrappingBalancerConfig{} |
| if err := json.Unmarshal(lbCfg, cfg); err != nil { |
| return nil, err |
| } |
| return cfg, nil |
| }, |
| UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { |
| if _, ok := ccs.BalancerConfig.(*wrappingBalancerConfig); !ok { |
| return fmt.Errorf("received balancer config of unsupported type %T", ccs.BalancerConfig) |
| } |
| bal := bd.Data.(balancer.Balancer) |
| ccUpdateCh.Send(ccs) |
| return bal.UpdateClientConnState(ccs) |
| }, |
| UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) { |
| bal := bd.Data.(balancer.Balancer) |
| bal.UpdateSubConnState(sc, state) |
| }, |
| }) |
| |
| // Start a backend exposing the test service. |
| backend := &stubserver.StubServer{ |
| EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, |
| } |
| if err := backend.StartServer(); err != nil { |
| t.Fatalf("Failed to start backend: %v", err) |
| } |
| t.Logf("Started TestService backend at: %q", backend.Address) |
| defer backend.Stop() |
| |
| r := manual.NewBuilderWithScheme("whatever") |
| |
| cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) |
| if err != nil { |
| t.Fatalf("Dial(_, _) = _, %v; want _, nil", err) |
| } |
| defer cc.Close() |
| |
| // Push a resolver update and verify that our balancer receives the update. |
| addrs := []resolver.Address{{Addr: backend.Address}} |
| const lbCfg = "wrapping balancer LB policy config" |
| goodSC := r.CC.ParseServiceConfig(fmt.Sprintf(` |
| { |
| "loadBalancingConfig": [ |
| { |
| "%v": { |
| "config": "%s" |
| } |
| } |
| ] |
| }`, t.Name(), lbCfg)) |
| r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: goodSC}) |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| wantCCS := balancer.ClientConnState{ |
| ResolverState: resolver.State{ |
| Addresses: addrs, |
| ServiceConfig: goodSC, |
| }, |
| BalancerConfig: &wrappingBalancerConfig{Config: lbCfg}, |
| } |
| ccs, err := ccUpdateCh.Receive(ctx) |
| if err != nil { |
| t.Fatalf("Timeout when waiting for ClientConnState update from grpc") |
| } |
| gotCCS := ccs.(balancer.ClientConnState) |
| if err := verifyClientConnStateUpdate(gotCCS, wantCCS); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Ensure RPCs are successful. |
| client := testpb.NewTestServiceClient(cc) |
| if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { |
| t.Fatalf("EmptyCall RPC failed: %v", err) |
| } |
| |
| // Push a bad resolver update and ensure that the update is propagated to our |
| // stub balancer. But since the pushed update contains an invalid service |
| // config, our balancer should continue to see the old loadBalancingConfig. |
| badSC := r.CC.ParseServiceConfig("bad json service config") |
| wantCCS.ResolverState.ServiceConfig = badSC |
| r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: badSC}) |
| ccs, err = ccUpdateCh.Receive(ctx) |
| if err != nil { |
| t.Fatalf("Timeout when waiting for ClientConnState update from grpc") |
| } |
| gotCCS = ccs.(balancer.ClientConnState) |
| if err := verifyClientConnStateUpdate(gotCCS, wantCCS); err != nil { |
| t.Fatal(err) |
| } |
| |
| // RPCs should continue to be successful since the ClientConn is using the old |
| // good service config. |
| if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { |
| t.Fatalf("EmptyCall RPC failed: %v", err) |
| } |
| } |