| /* |
| * |
| * Copyright 2020 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package rls |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "math" |
| "testing" |
| "time" |
| |
| "github.com/google/go-cmp/cmp" |
| |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/balancer/rls/internal/cache" |
| "google.golang.org/grpc/balancer/rls/internal/keys" |
| rlspb "google.golang.org/grpc/balancer/rls/internal/proto/grpc_lookup_v1" |
| "google.golang.org/grpc/internal/grpcrand" |
| "google.golang.org/grpc/internal/testutils" |
| "google.golang.org/grpc/metadata" |
| ) |
| |
| const defaultTestMaxAge = 5 * time.Second |
| |
| // initKeyBuilderMap initializes a keyBuilderMap of the form: |
| // { |
| // "gFoo": "k1=n1", |
| // "gBar/method1": "k2=n21,n22" |
| // "gFoobar": "k3=n3", |
| // } |
| func initKeyBuilderMap() (keys.BuilderMap, error) { |
| kb1 := &rlspb.GrpcKeyBuilder{ |
| Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "gFoo"}}, |
| Headers: []*rlspb.NameMatcher{{Key: "k1", Names: []string{"n1"}}}, |
| } |
| kb2 := &rlspb.GrpcKeyBuilder{ |
| Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "gBar", Method: "method1"}}, |
| Headers: []*rlspb.NameMatcher{{Key: "k2", Names: []string{"n21", "n22"}}}, |
| } |
| kb3 := &rlspb.GrpcKeyBuilder{ |
| Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "gFoobar"}}, |
| Headers: []*rlspb.NameMatcher{{Key: "k3", Names: []string{"n3"}}}, |
| } |
| return keys.MakeBuilderMap(&rlspb.RouteLookupConfig{ |
| GrpcKeybuilders: []*rlspb.GrpcKeyBuilder{kb1, kb2, kb3}, |
| }) |
| } |
| |
| // fakeSubConn embeds the balancer.SubConn interface and contains an id which |
| // helps verify that the expected subConn was returned by the rlsPicker. |
| type fakeSubConn struct { |
| balancer.SubConn |
| id int |
| } |
| |
| // fakePicker sends a PickResult with a fakeSubConn with the configured id. |
| type fakePicker struct { |
| id int |
| } |
| |
| func (p *fakePicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) { |
| return balancer.PickResult{SubConn: &fakeSubConn{id: p.id}}, nil |
| } |
| |
| // newFakePicker returns a fakePicker configured with a random ID. The subConns |
| // returned by this picker are of type fakefakeSubConn, and contain the same |
| // random ID, which tests can use to verify. |
| func newFakePicker() *fakePicker { |
| return &fakePicker{id: grpcrand.Intn(math.MaxInt32)} |
| } |
| |
| func verifySubConn(sc balancer.SubConn, wantID int) error { |
| fsc, ok := sc.(*fakeSubConn) |
| if !ok { |
| return fmt.Errorf("Pick() returned a SubConn of type %T, want %T", sc, &fakeSubConn{}) |
| } |
| if fsc.id != wantID { |
| return fmt.Errorf("Pick() returned SubConn %d, want %d", fsc.id, wantID) |
| } |
| return nil |
| } |
| |
| // TestPickKeyBuilder verifies the different possible scenarios for forming an |
| // RLS key for an incoming RPC. |
| func TestPickKeyBuilder(t *testing.T) { |
| kbm, err := initKeyBuilderMap() |
| if err != nil { |
| t.Fatalf("Failed to create keyBuilderMap: %v", err) |
| } |
| |
| tests := []struct { |
| desc string |
| rpcPath string |
| md metadata.MD |
| wantKey cache.Key |
| }{ |
| { |
| desc: "non existent service in keyBuilder map", |
| rpcPath: "/gNonExistentService/method", |
| md: metadata.New(map[string]string{"n1": "v1", "n3": "v3"}), |
| wantKey: cache.Key{Path: "/gNonExistentService/method", KeyMap: ""}, |
| }, |
| { |
| desc: "no metadata in incoming context", |
| rpcPath: "/gFoo/method", |
| md: metadata.MD{}, |
| wantKey: cache.Key{Path: "/gFoo/method", KeyMap: ""}, |
| }, |
| { |
| desc: "keyBuilderMatch", |
| rpcPath: "/gFoo/method", |
| md: metadata.New(map[string]string{"n1": "v1", "n3": "v3"}), |
| wantKey: cache.Key{Path: "/gFoo/method", KeyMap: "k1=v1"}, |
| }, |
| } |
| |
| for _, test := range tests { |
| t.Run(test.desc, func(t *testing.T) { |
| randID := grpcrand.Intn(math.MaxInt32) |
| p := rlsPicker{ |
| kbm: kbm, |
| readCache: func(key cache.Key) (*cache.Entry, bool) { |
| if !cmp.Equal(key, test.wantKey) { |
| t.Fatalf("rlsPicker using cacheKey %v, want %v", key, test.wantKey) |
| } |
| |
| now := time.Now() |
| return &cache.Entry{ |
| ExpiryTime: now.Add(defaultTestMaxAge), |
| StaleTime: now.Add(defaultTestMaxAge), |
| // Cache entry is configured with a child policy whose |
| // rlsPicker always returns an empty PickResult and nil |
| // error. |
| ChildPicker: &fakePicker{id: randID}, |
| }, false |
| }, |
| // The other hooks are not set here because they are not expected to be |
| // invoked for these cases and if they get invoked, they will panic. |
| } |
| |
| gotResult, err := p.Pick(balancer.PickInfo{ |
| FullMethodName: test.rpcPath, |
| Ctx: metadata.NewOutgoingContext(context.Background(), test.md), |
| }) |
| if err != nil { |
| t.Fatalf("Pick() failed with error: %v", err) |
| } |
| sc, ok := gotResult.SubConn.(*fakeSubConn) |
| if !ok { |
| t.Fatalf("Pick() returned a SubConn of type %T, want %T", gotResult.SubConn, &fakeSubConn{}) |
| } |
| if sc.id != randID { |
| t.Fatalf("Pick() returned SubConn %d, want %d", sc.id, randID) |
| } |
| }) |
| } |
| } |
| |
| // TestPick_DataCacheMiss_PendingCacheMiss verifies different Pick scenarios |
| // where the entry is neither found in the data cache nor in the pending cache. |
| func TestPick_DataCacheMiss_PendingCacheMiss(t *testing.T) { |
| const ( |
| rpcPath = "/gFoo/method" |
| wantKeyMapStr = "k1=v1" |
| ) |
| kbm, err := initKeyBuilderMap() |
| if err != nil { |
| t.Fatalf("Failed to create keyBuilderMap: %v", err) |
| } |
| md := metadata.New(map[string]string{"n1": "v1", "n3": "v3"}) |
| wantKey := cache.Key{Path: rpcPath, KeyMap: wantKeyMapStr} |
| |
| tests := []struct { |
| desc string |
| // Whether or not a default target is configured. |
| defaultPickExists bool |
| // Whether or not the RLS request should be throttled. |
| throttle bool |
| // Whether or not the test is expected to make a new RLS request. |
| wantRLSRequest bool |
| // Expected error returned by the rlsPicker under test. |
| wantErr error |
| }{ |
| { |
| desc: "rls request throttled with default pick", |
| defaultPickExists: true, |
| throttle: true, |
| }, |
| { |
| desc: "rls request throttled without default pick", |
| throttle: true, |
| wantErr: errRLSThrottled, |
| }, |
| { |
| desc: "rls request not throttled", |
| wantRLSRequest: true, |
| wantErr: balancer.ErrNoSubConnAvailable, |
| }, |
| } |
| |
| for _, test := range tests { |
| t.Run(test.desc, func(t *testing.T) { |
| rlsCh := testutils.NewChannel() |
| defaultPicker := newFakePicker() |
| |
| p := rlsPicker{ |
| kbm: kbm, |
| // Cache lookup fails, no pending entry. |
| readCache: func(key cache.Key) (*cache.Entry, bool) { |
| if !cmp.Equal(key, wantKey) { |
| t.Fatalf("cache lookup using cacheKey %v, want %v", key, wantKey) |
| } |
| return nil, false |
| }, |
| shouldThrottle: func() bool { return test.throttle }, |
| startRLS: func(path string, km keys.KeyMap) { |
| if !test.wantRLSRequest { |
| rlsCh.Send(errors.New("RLS request attempted when none was expected")) |
| return |
| } |
| if path != rpcPath { |
| rlsCh.Send(fmt.Errorf("RLS request initiated for rpcPath %s, want %s", path, rpcPath)) |
| return |
| } |
| if km.Str != wantKeyMapStr { |
| rlsCh.Send(fmt.Errorf("RLS request initiated with keys %v, want %v", km.Str, wantKeyMapStr)) |
| return |
| } |
| rlsCh.Send(nil) |
| }, |
| } |
| if test.defaultPickExists { |
| p.defaultPick = defaultPicker.Pick |
| } |
| |
| gotResult, err := p.Pick(balancer.PickInfo{ |
| FullMethodName: rpcPath, |
| Ctx: metadata.NewOutgoingContext(context.Background(), md), |
| }) |
| if err != test.wantErr { |
| t.Fatalf("Pick() returned error {%v}, want {%v}", err, test.wantErr) |
| } |
| // If the test specified that a new RLS request should be made, |
| // verify it. |
| if test.wantRLSRequest { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if rlsErr, err := rlsCh.Receive(ctx); err != nil || rlsErr != nil { |
| t.Fatalf("startRLS() = %v, error receiving from channel: %v", rlsErr, err) |
| } |
| } |
| if test.wantErr != nil { |
| return |
| } |
| |
| // We get here only for cases where we expect the pick to be |
| // delegated to the default picker. |
| if err := verifySubConn(gotResult.SubConn, defaultPicker.id); err != nil { |
| t.Fatal(err) |
| } |
| }) |
| } |
| } |
| |
| // TestPick_DataCacheMiss_PendingCacheMiss verifies different Pick scenarios |
| // where the entry is not found in the data cache, but there is a entry in the |
| // pending cache. For all of these scenarios, no new RLS request will be sent. |
| func TestPick_DataCacheMiss_PendingCacheHit(t *testing.T) { |
| const ( |
| rpcPath = "/gFoo/method" |
| wantKeyMapStr = "k1=v1" |
| ) |
| kbm, err := initKeyBuilderMap() |
| if err != nil { |
| t.Fatalf("Failed to create keyBuilderMap: %v", err) |
| } |
| md := metadata.New(map[string]string{"n1": "v1", "n3": "v3"}) |
| wantKey := cache.Key{Path: rpcPath, KeyMap: wantKeyMapStr} |
| |
| tests := []struct { |
| desc string |
| defaultPickExists bool |
| }{ |
| { |
| desc: "default pick exists", |
| defaultPickExists: true, |
| }, |
| { |
| desc: "default pick does not exists", |
| }, |
| } |
| for _, test := range tests { |
| t.Run(test.desc, func(t *testing.T) { |
| rlsCh := testutils.NewChannel() |
| p := rlsPicker{ |
| kbm: kbm, |
| // Cache lookup fails, pending entry exists. |
| readCache: func(key cache.Key) (*cache.Entry, bool) { |
| if !cmp.Equal(key, wantKey) { |
| t.Fatalf("cache lookup using cacheKey %v, want %v", key, wantKey) |
| } |
| return nil, true |
| }, |
| // Never throttle. We do not expect an RLS request to be sent out anyways. |
| shouldThrottle: func() bool { return false }, |
| startRLS: func(_ string, _ keys.KeyMap) { |
| rlsCh.Send(nil) |
| }, |
| } |
| if test.defaultPickExists { |
| p.defaultPick = func(info balancer.PickInfo) (balancer.PickResult, error) { |
| // We do not expect the default picker to be invoked at all. |
| // So, if we get here, the test will fail, because it |
| // expects the pick to be queued. |
| return balancer.PickResult{}, nil |
| } |
| } |
| |
| if _, err := p.Pick(balancer.PickInfo{ |
| FullMethodName: rpcPath, |
| Ctx: metadata.NewOutgoingContext(context.Background(), md), |
| }); err != balancer.ErrNoSubConnAvailable { |
| t.Fatalf("Pick() returned error {%v}, want {%v}", err, balancer.ErrNoSubConnAvailable) |
| } |
| |
| // Make sure that no RLS request was sent out. |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if _, err := rlsCh.Receive(ctx); err != context.DeadlineExceeded { |
| t.Fatalf("RLS request sent out when pending entry exists") |
| } |
| }) |
| } |
| } |
| |
| // TestPick_DataCacheHit_PendingCacheMiss verifies different Pick scenarios |
| // where the entry is found in the data cache, and there is no entry in the |
| // pending cache. This includes cases where the entry in the data cache is |
| // stale, expired or in backoff. |
| func TestPick_DataCacheHit_PendingCacheMiss(t *testing.T) { |
| const ( |
| rpcPath = "/gFoo/method" |
| wantKeyMapStr = "k1=v1" |
| ) |
| kbm, err := initKeyBuilderMap() |
| if err != nil { |
| t.Fatalf("Failed to create keyBuilderMap: %v", err) |
| } |
| md := metadata.New(map[string]string{"n1": "v1", "n3": "v3"}) |
| wantKey := cache.Key{Path: rpcPath, KeyMap: wantKeyMapStr} |
| rlsLastErr := errors.New("last RLS request failed") |
| |
| tests := []struct { |
| desc string |
| // The cache entry, as returned by the overridden readCache hook. |
| cacheEntry *cache.Entry |
| // Whether or not a default target is configured. |
| defaultPickExists bool |
| // Whether or not the RLS request should be throttled. |
| throttle bool |
| // Whether or not the test is expected to make a new RLS request. |
| wantRLSRequest bool |
| // Whether or not the rlsPicker should delegate to the child picker. |
| wantChildPick bool |
| // Whether or not the rlsPicker should delegate to the default picker. |
| wantDefaultPick bool |
| // Expected error returned by the rlsPicker under test. |
| wantErr error |
| }{ |
| { |
| desc: "valid entry", |
| cacheEntry: &cache.Entry{ |
| ExpiryTime: time.Now().Add(defaultTestMaxAge), |
| StaleTime: time.Now().Add(defaultTestMaxAge), |
| }, |
| wantChildPick: true, |
| }, |
| { |
| desc: "entryStale_requestThrottled", |
| cacheEntry: &cache.Entry{ExpiryTime: time.Now().Add(defaultTestMaxAge)}, |
| throttle: true, |
| wantChildPick: true, |
| }, |
| { |
| desc: "entryStale_requestNotThrottled", |
| cacheEntry: &cache.Entry{ExpiryTime: time.Now().Add(defaultTestMaxAge)}, |
| wantRLSRequest: true, |
| wantChildPick: true, |
| }, |
| { |
| desc: "entryExpired_requestThrottled_defaultPickExists", |
| cacheEntry: &cache.Entry{}, |
| throttle: true, |
| defaultPickExists: true, |
| wantDefaultPick: true, |
| }, |
| { |
| desc: "entryExpired_requestThrottled_defaultPickNotExists", |
| cacheEntry: &cache.Entry{}, |
| throttle: true, |
| wantErr: errRLSThrottled, |
| }, |
| { |
| desc: "entryExpired_requestNotThrottled", |
| cacheEntry: &cache.Entry{}, |
| wantRLSRequest: true, |
| wantErr: balancer.ErrNoSubConnAvailable, |
| }, |
| { |
| desc: "entryExpired_backoffNotExpired_defaultPickExists", |
| cacheEntry: &cache.Entry{ |
| BackoffTime: time.Now().Add(defaultTestMaxAge), |
| CallStatus: rlsLastErr, |
| }, |
| defaultPickExists: true, |
| }, |
| { |
| desc: "entryExpired_backoffNotExpired_defaultPickNotExists", |
| cacheEntry: &cache.Entry{ |
| BackoffTime: time.Now().Add(defaultTestMaxAge), |
| CallStatus: rlsLastErr, |
| }, |
| wantErr: rlsLastErr, |
| }, |
| } |
| |
| for _, test := range tests { |
| t.Run(test.desc, func(t *testing.T) { |
| rlsCh := testutils.NewChannel() |
| childPicker := newFakePicker() |
| defaultPicker := newFakePicker() |
| |
| p := rlsPicker{ |
| kbm: kbm, |
| readCache: func(key cache.Key) (*cache.Entry, bool) { |
| if !cmp.Equal(key, wantKey) { |
| t.Fatalf("cache lookup using cacheKey %v, want %v", key, wantKey) |
| } |
| test.cacheEntry.ChildPicker = childPicker |
| return test.cacheEntry, false |
| }, |
| shouldThrottle: func() bool { return test.throttle }, |
| startRLS: func(path string, km keys.KeyMap) { |
| if !test.wantRLSRequest { |
| rlsCh.Send(errors.New("RLS request attempted when none was expected")) |
| return |
| } |
| if path != rpcPath { |
| rlsCh.Send(fmt.Errorf("RLS request initiated for rpcPath %s, want %s", path, rpcPath)) |
| return |
| } |
| if km.Str != wantKeyMapStr { |
| rlsCh.Send(fmt.Errorf("RLS request initiated with keys %v, want %v", km.Str, wantKeyMapStr)) |
| return |
| } |
| rlsCh.Send(nil) |
| }, |
| } |
| if test.defaultPickExists { |
| p.defaultPick = defaultPicker.Pick |
| } |
| |
| gotResult, err := p.Pick(balancer.PickInfo{ |
| FullMethodName: rpcPath, |
| Ctx: metadata.NewOutgoingContext(context.Background(), md), |
| }) |
| if err != test.wantErr { |
| t.Fatalf("Pick() returned error {%v}, want {%v}", err, test.wantErr) |
| } |
| // If the test specified that a new RLS request should be made, |
| // verify it. |
| if test.wantRLSRequest { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if rlsErr, err := rlsCh.Receive(ctx); err != nil || rlsErr != nil { |
| t.Fatalf("startRLS() = %v, error receiving from channel: %v", rlsErr, err) |
| } |
| } |
| if test.wantErr != nil { |
| return |
| } |
| |
| // We get here only for cases where we expect the pick to be |
| // delegated to the child picker or the default picker. |
| if test.wantChildPick { |
| if err := verifySubConn(gotResult.SubConn, childPicker.id); err != nil { |
| t.Fatal(err) |
| } |
| |
| } |
| if test.wantDefaultPick { |
| if err := verifySubConn(gotResult.SubConn, defaultPicker.id); err != nil { |
| t.Fatal(err) |
| } |
| } |
| }) |
| } |
| } |
| |
| // TestPick_DataCacheHit_PendingCacheHit verifies different Pick scenarios where |
| // the entry is found both in the data cache and in the pending cache. This |
| // mostly verifies cases where the entry is stale, but there is already a |
| // pending RLS request, so no new request should be sent out. |
| func TestPick_DataCacheHit_PendingCacheHit(t *testing.T) { |
| const ( |
| rpcPath = "/gFoo/method" |
| wantKeyMapStr = "k1=v1" |
| ) |
| kbm, err := initKeyBuilderMap() |
| if err != nil { |
| t.Fatalf("Failed to create keyBuilderMap: %v", err) |
| } |
| md := metadata.New(map[string]string{"n1": "v1", "n3": "v3"}) |
| wantKey := cache.Key{Path: rpcPath, KeyMap: wantKeyMapStr} |
| |
| tests := []struct { |
| desc string |
| // The cache entry, as returned by the overridden readCache hook. |
| cacheEntry *cache.Entry |
| // Whether or not a default target is configured. |
| defaultPickExists bool |
| // Expected error returned by the rlsPicker under test. |
| wantErr error |
| }{ |
| { |
| desc: "stale entry", |
| cacheEntry: &cache.Entry{ExpiryTime: time.Now().Add(defaultTestMaxAge)}, |
| }, |
| { |
| desc: "stale entry with default picker", |
| cacheEntry: &cache.Entry{ExpiryTime: time.Now().Add(defaultTestMaxAge)}, |
| defaultPickExists: true, |
| }, |
| { |
| desc: "entryExpired_defaultPickExists", |
| cacheEntry: &cache.Entry{}, |
| defaultPickExists: true, |
| wantErr: balancer.ErrNoSubConnAvailable, |
| }, |
| { |
| desc: "entryExpired_defaultPickNotExists", |
| cacheEntry: &cache.Entry{}, |
| wantErr: balancer.ErrNoSubConnAvailable, |
| }, |
| } |
| for _, test := range tests { |
| t.Run(test.desc, func(t *testing.T) { |
| rlsCh := testutils.NewChannel() |
| childPicker := newFakePicker() |
| |
| p := rlsPicker{ |
| kbm: kbm, |
| readCache: func(key cache.Key) (*cache.Entry, bool) { |
| if !cmp.Equal(key, wantKey) { |
| t.Fatalf("cache lookup using cacheKey %v, want %v", key, wantKey) |
| } |
| test.cacheEntry.ChildPicker = childPicker |
| return test.cacheEntry, true |
| }, |
| // Never throttle. We do not expect an RLS request to be sent out anyways. |
| shouldThrottle: func() bool { return false }, |
| startRLS: func(path string, km keys.KeyMap) { |
| rlsCh.Send(nil) |
| }, |
| } |
| if test.defaultPickExists { |
| p.defaultPick = func(info balancer.PickInfo) (balancer.PickResult, error) { |
| // We do not expect the default picker to be invoked at all. |
| // So, if we get here, we return an error. |
| return balancer.PickResult{}, errors.New("default picker invoked when expecting a child pick") |
| } |
| } |
| |
| gotResult, err := p.Pick(balancer.PickInfo{ |
| FullMethodName: rpcPath, |
| Ctx: metadata.NewOutgoingContext(context.Background(), md), |
| }) |
| if err != test.wantErr { |
| t.Fatalf("Pick() returned error {%v}, want {%v}", err, test.wantErr) |
| } |
| // Make sure that no RLS request was sent out. |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if _, err := rlsCh.Receive(ctx); err != context.DeadlineExceeded { |
| t.Fatalf("RLS request sent out when pending entry exists") |
| } |
| if test.wantErr != nil { |
| return |
| } |
| |
| // We get here only for cases where we expect the pick to be |
| // delegated to the child picker. |
| if err := verifySubConn(gotResult.SubConn, childPicker.id); err != nil { |
| t.Fatal(err) |
| } |
| }) |
| } |
| } |