| /* |
| * |
| * Copyright 2021 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package rls |
| |
| import ( |
| "context" |
| "strings" |
| "testing" |
| "time" |
| |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/balancer/rls/internal/test/e2e" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/internal" |
| "google.golang.org/grpc/internal/balancergroup" |
| "google.golang.org/grpc/internal/grpcsync" |
| "google.golang.org/grpc/internal/grpctest" |
| rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" |
| internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" |
| "google.golang.org/grpc/internal/stubserver" |
| "google.golang.org/grpc/resolver" |
| "google.golang.org/grpc/resolver/manual" |
| "google.golang.org/grpc/serviceconfig" |
| "google.golang.org/grpc/status" |
| testgrpc "google.golang.org/grpc/test/grpc_testing" |
| testpb "google.golang.org/grpc/test/grpc_testing" |
| "google.golang.org/protobuf/types/known/durationpb" |
| ) |
| |
| const ( |
| defaultTestTimeout = 5 * time.Second |
| defaultTestShortTimeout = 100 * time.Millisecond |
| ) |
| |
| func init() { |
| balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond |
| } |
| |
| type s struct { |
| grpctest.Tester |
| } |
| |
| func Test(t *testing.T) { |
| grpctest.RunSubTests(t, s{}) |
| } |
| |
| // fakeBackoffStrategy is a fake implementation of the backoff.Strategy |
| // interface, for tests to inject the backoff duration. |
| type fakeBackoffStrategy struct { |
| backoff time.Duration |
| } |
| |
| func (f *fakeBackoffStrategy) Backoff(retries int) time.Duration { |
| return f.backoff |
| } |
| |
| // fakeThrottler is a fake implementation of the adaptiveThrottler interface. |
| type fakeThrottler struct { |
| throttleFunc func() bool // Fake throttler implementation. |
| throttleCh chan struct{} // Invocation of ShouldThrottle signals here. |
| } |
| |
| func (f *fakeThrottler) ShouldThrottle() bool { |
| select { |
| case <-f.throttleCh: |
| default: |
| } |
| f.throttleCh <- struct{}{} |
| |
| return f.throttleFunc() |
| } |
| |
| func (f *fakeThrottler) RegisterBackendResponse(bool) {} |
| |
| // alwaysThrottlingThrottler returns a fake throttler which always throttles. |
| func alwaysThrottlingThrottler() *fakeThrottler { |
| return &fakeThrottler{ |
| throttleFunc: func() bool { return true }, |
| throttleCh: make(chan struct{}, 1), |
| } |
| } |
| |
| // neverThrottlingThrottler returns a fake throttler which never throttles. |
| func neverThrottlingThrottler() *fakeThrottler { |
| return &fakeThrottler{ |
| throttleFunc: func() bool { return false }, |
| throttleCh: make(chan struct{}, 1), |
| } |
| } |
| |
| // oneTimeAllowingThrottler returns a fake throttler which does not throttle |
| // requests until the client RPC succeeds, but throttles everything that comes |
| // after. This is useful for tests which need to set up a valid cache entry |
| // before testing other cases. |
| func oneTimeAllowingThrottler(firstRPCDone *grpcsync.Event) *fakeThrottler { |
| return &fakeThrottler{ |
| throttleFunc: firstRPCDone.HasFired, |
| throttleCh: make(chan struct{}, 1), |
| } |
| } |
| |
| func overrideAdaptiveThrottler(t *testing.T, f *fakeThrottler) { |
| origAdaptiveThrottler := newAdaptiveThrottler |
| newAdaptiveThrottler = func() adaptiveThrottler { return f } |
| t.Cleanup(func() { newAdaptiveThrottler = origAdaptiveThrottler }) |
| } |
| |
| // buildBasicRLSConfig constructs a basic service config for the RLS LB policy |
| // with header matching rules. This expects the passed child policy name to |
| // have been registered by the caller. |
| func buildBasicRLSConfig(childPolicyName, rlsServerAddress string) *e2e.RLSConfig { |
| return &e2e.RLSConfig{ |
| RouteLookupConfig: &rlspb.RouteLookupConfig{ |
| GrpcKeybuilders: []*rlspb.GrpcKeyBuilder{ |
| { |
| Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}}, |
| Headers: []*rlspb.NameMatcher{ |
| {Key: "k1", Names: []string{"n1"}}, |
| {Key: "k2", Names: []string{"n2"}}, |
| }, |
| }, |
| }, |
| LookupService: rlsServerAddress, |
| LookupServiceTimeout: durationpb.New(defaultTestTimeout), |
| CacheSizeBytes: 1024, |
| }, |
| RouteLookupChannelServiceConfig: `{"loadBalancingConfig": [{"pick_first": {}}]}`, |
| ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicyName}, |
| ChildPolicyConfigTargetFieldName: e2e.RLSChildPolicyTargetNameField, |
| } |
| } |
| |
| // buildBasicRLSConfigWithChildPolicy constructs a very basic service config for |
| // the RLS LB policy. It also registers a test LB policy which is capable of |
| // being a child of the RLS LB policy. |
| func buildBasicRLSConfigWithChildPolicy(t *testing.T, childPolicyName, rlsServerAddress string) *e2e.RLSConfig { |
| childPolicyName = "test-child-policy" + childPolicyName |
| e2e.RegisterRLSChildPolicy(childPolicyName, nil) |
| t.Logf("Registered child policy with name %q", childPolicyName) |
| |
| return &e2e.RLSConfig{ |
| RouteLookupConfig: &rlspb.RouteLookupConfig{ |
| GrpcKeybuilders: []*rlspb.GrpcKeyBuilder{{Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}}}}, |
| LookupService: rlsServerAddress, |
| LookupServiceTimeout: durationpb.New(defaultTestTimeout), |
| CacheSizeBytes: 1024, |
| }, |
| RouteLookupChannelServiceConfig: `{"loadBalancingConfig": [{"pick_first": {}}]}`, |
| ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicyName}, |
| ChildPolicyConfigTargetFieldName: e2e.RLSChildPolicyTargetNameField, |
| } |
| } |
| |
| // startBackend starts a backend implementing the TestService on a local port. |
| // It returns a channel for tests to get notified whenever an RPC is invoked on |
| // the backend. This allows tests to ensure that RPCs reach expected backends. |
| // Also returns the address of the backend. |
| func startBackend(t *testing.T, sopts ...grpc.ServerOption) (rpcCh chan struct{}, address string) { |
| t.Helper() |
| |
| rpcCh = make(chan struct{}, 1) |
| backend := &stubserver.StubServer{ |
| EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { |
| select { |
| case rpcCh <- struct{}{}: |
| default: |
| } |
| return &testpb.Empty{}, nil |
| }, |
| } |
| if err := backend.StartServer(sopts...); err != nil { |
| t.Fatalf("Failed to start backend: %v", err) |
| } |
| t.Logf("Started TestService backend at: %q", backend.Address) |
| t.Cleanup(func() { backend.Stop() }) |
| return rpcCh, backend.Address |
| } |
| |
| // startManualResolverWithConfig registers and returns a manual resolver which |
| // pushes the RLS LB policy's service config on the channel. |
| func startManualResolverWithConfig(t *testing.T, rlsConfig *e2e.RLSConfig) *manual.Resolver { |
| t.Helper() |
| |
| scJSON, err := rlsConfig.ServiceConfigJSON() |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON) |
| r := manual.NewBuilderWithScheme("rls-e2e") |
| r.InitialState(resolver.State{ServiceConfig: sc}) |
| t.Cleanup(r.Close) |
| return r |
| } |
| |
| // makeTestRPCAndExpectItToReachBackend is a test helper function which makes |
| // the EmptyCall RPC on the given ClientConn and verifies that it reaches a |
| // backend. The latter is accomplished by listening on the provided channel |
| // which gets pushed to whenever the backend in question gets an RPC. |
| // |
| // There are many instances where it can take a while before the attempted RPC |
| // reaches the expected backend. Examples include, but are not limited to: |
| // - control channel is changed in a config update. The RLS LB policy creates a |
| // new control channel, and sends a new picker to gRPC. But it takes a while |
| // before gRPC actually starts using the new picker. |
| // - test is waiting for a cache entry to expire after which we expect a |
| // different behavior because we have configured the fake RLS server to return |
| // different backends. |
| // |
| // Therefore, we do not return an error when the RPC fails. Instead, we wait for |
| // the context to expire before failing. |
| func makeTestRPCAndExpectItToReachBackend(ctx context.Context, t *testing.T, cc *grpc.ClientConn, ch chan struct{}) { |
| t.Helper() |
| |
| // Drain the backend channel before performing the RPC to remove any |
| // notifications from previous RPCs. |
| select { |
| case <-ch: |
| default: |
| } |
| |
| for { |
| if err := ctx.Err(); err != nil { |
| t.Fatalf("Timeout when waiting for RPCs to be routed to the given target: %v", err) |
| } |
| sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) |
| client := testgrpc.NewTestServiceClient(cc) |
| client.EmptyCall(sCtx, &testpb.Empty{}) |
| |
| select { |
| case <-sCtx.Done(): |
| case <-ch: |
| sCancel() |
| return |
| } |
| } |
| } |
| |
| // makeTestRPCAndVerifyError is a test helper function which makes the EmptyCall |
| // RPC on the given ClientConn and verifies that the RPC fails with the given |
| // status code and error. |
| // |
| // Similar to makeTestRPCAndExpectItToReachBackend, retries until expected |
| // outcome is reached or the provided context has expired. |
| func makeTestRPCAndVerifyError(ctx context.Context, t *testing.T, cc *grpc.ClientConn, wantCode codes.Code, wantErr error) { |
| t.Helper() |
| |
| for { |
| if err := ctx.Err(); err != nil { |
| t.Fatalf("Timeout when waiting for RPCs to fail with given error: %v", err) |
| } |
| sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) |
| client := testgrpc.NewTestServiceClient(cc) |
| _, err := client.EmptyCall(sCtx, &testpb.Empty{}) |
| |
| // If the RPC fails with the expected code and expected error message (if |
| // one was provided), we return. Else we retry after blocking for a little |
| // while to ensure that we don't keep blasting away with RPCs. |
| if code := status.Code(err); code == wantCode { |
| if wantErr == nil || strings.Contains(err.Error(), wantErr.Error()) { |
| sCancel() |
| return |
| } |
| } |
| <-sCtx.Done() |
| } |
| } |
| |
| // verifyRLSRequest is a test helper which listens on a channel to see if an RLS |
| // request was received by the fake RLS server. Based on whether the test |
| // expects a request to be sent out or not, it uses a different timeout. |
| func verifyRLSRequest(t *testing.T, ch chan struct{}, wantRequest bool) { |
| t.Helper() |
| |
| if wantRequest { |
| select { |
| case <-time.After(defaultTestTimeout): |
| t.Fatalf("Timeout when waiting for an RLS request to be sent out") |
| case <-ch: |
| } |
| } else { |
| select { |
| case <-time.After(defaultTestShortTimeout): |
| case <-ch: |
| t.Fatalf("RLS request sent out when not expecting one") |
| } |
| } |
| } |