| /* |
| * |
| * Copyright 2021 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package clusterresolver |
| |
| import ( |
| "context" |
| "fmt" |
| "testing" |
| |
| "github.com/google/go-cmp/cmp" |
| "google.golang.org/grpc/internal/testutils" |
| "google.golang.org/grpc/resolver" |
| "google.golang.org/grpc/resolver/manual" |
| xdstestutils "google.golang.org/grpc/xds/internal/testutils" |
| "google.golang.org/grpc/xds/internal/testutils/fakeclient" |
| "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" |
| ) |
| |
| const ( |
| testDNSTarget = "dns.com" |
| ) |
| |
| var ( |
| testEDSUpdates []xdsresource.EndpointsUpdate |
| ) |
| |
| func init() { |
| clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) |
| clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) |
| testEDSUpdates = append(testEDSUpdates, parseEDSRespProtoForTesting(clab1.Build())) |
| clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) |
| clab2.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) |
| testEDSUpdates = append(testEDSUpdates, parseEDSRespProtoForTesting(clab2.Build())) |
| } |
| |
| // Test the simple case with one EDS resource to watch. |
| func (s) TestResourceResolverOneEDSResource(t *testing.T) { |
| for _, test := range []struct { |
| name string |
| clusterName, edsName string |
| wantName string |
| edsUpdate xdsresource.EndpointsUpdate |
| want []priorityConfig |
| }{ |
| {name: "watch EDS", |
| clusterName: testClusterName, |
| edsName: testEDSService, |
| wantName: testEDSService, |
| edsUpdate: testEDSUpdates[0], |
| want: []priorityConfig{{ |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterName, |
| EDSServiceName: testEDSService, |
| }, |
| edsResp: testEDSUpdates[0], |
| childNameGen: newNameGenerator(0), |
| }}, |
| }, |
| { |
| name: "watch EDS no EDS name", // Will watch for cluster name. |
| clusterName: testClusterName, |
| wantName: testClusterName, |
| edsUpdate: testEDSUpdates[1], |
| want: []priorityConfig{{ |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterName, |
| }, |
| edsResp: testEDSUpdates[1], |
| childNameGen: newNameGenerator(0), |
| }}, |
| }, |
| } { |
| t.Run(test.name, func(t *testing.T) { |
| fakeClient := fakeclient.NewClient() |
| rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) |
| rr.updateMechanisms([]DiscoveryMechanism{{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: test.clusterName, |
| EDSServiceName: test.edsName, |
| }}) |
| ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| gotEDSName, err := fakeClient.WaitForWatchEDS(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) |
| } |
| if gotEDSName != test.wantName { |
| t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName, test.wantName) |
| } |
| |
| // Invoke callback, should get an update. |
| fakeClient.InvokeWatchEDSCallback("", test.edsUpdate, nil) |
| select { |
| case u := <-rr.updateChannel: |
| if diff := cmp.Diff(u.priorities, test.want, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" { |
| t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for update from update channel.") |
| } |
| // Close the resource resolver. Should stop EDS watch. |
| rr.stop() |
| edsNameCanceled, err := fakeClient.WaitForCancelEDSWatch(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) |
| } |
| if edsNameCanceled != test.wantName { |
| t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled, testEDSService) |
| } |
| }) |
| } |
| } |
| |
| func setupDNS() (chan resolver.Target, chan struct{}, chan resolver.ResolveNowOptions, *manual.Resolver, func()) { |
| dnsTargetCh := make(chan resolver.Target, 1) |
| dnsCloseCh := make(chan struct{}, 1) |
| resolveNowCh := make(chan resolver.ResolveNowOptions, 1) |
| |
| mr := manual.NewBuilderWithScheme("dns") |
| mr.BuildCallback = func(target resolver.Target, _ resolver.ClientConn, _ resolver.BuildOptions) { dnsTargetCh <- target } |
| mr.CloseCallback = func() { dnsCloseCh <- struct{}{} } |
| mr.ResolveNowCallback = func(opts resolver.ResolveNowOptions) { resolveNowCh <- opts } |
| oldNewDNS := newDNS |
| newDNS = func(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { |
| return mr.Build(target, cc, opts) |
| } |
| return dnsTargetCh, dnsCloseCh, resolveNowCh, mr, func() { newDNS = oldNewDNS } |
| } |
| |
| // Test the simple case of one DNS resolver. |
| func (s) TestResourceResolverOneDNSResource(t *testing.T) { |
| for _, test := range []struct { |
| name string |
| target string |
| wantTarget resolver.Target |
| addrs []resolver.Address |
| want []priorityConfig |
| }{ |
| { |
| name: "watch DNS", |
| target: testDNSTarget, |
| wantTarget: resolver.Target{Scheme: "dns", URL: *testutils.MustParseURL("dns:///" + testDNSTarget)}, |
| addrs: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}, |
| want: []priorityConfig{{ |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeLogicalDNS, |
| DNSHostname: testDNSTarget, |
| }, |
| addresses: []string{"1.1.1.1", "2.2.2.2"}, |
| childNameGen: newNameGenerator(0), |
| }}, |
| }, |
| } { |
| t.Run(test.name, func(t *testing.T) { |
| dnsTargetCh, dnsCloseCh, _, dnsR, cleanup := setupDNS() |
| defer cleanup() |
| fakeClient := fakeclient.NewClient() |
| rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) |
| rr.updateMechanisms([]DiscoveryMechanism{{ |
| Type: DiscoveryMechanismTypeLogicalDNS, |
| DNSHostname: test.target, |
| }}) |
| ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| select { |
| case target := <-dnsTargetCh: |
| if diff := cmp.Diff(target, test.wantTarget); diff != "" { |
| t.Fatalf("got unexpected DNS target to watch, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for building DNS resolver") |
| } |
| |
| // Invoke callback, should get an update. |
| dnsR.UpdateState(resolver.State{Addresses: test.addrs}) |
| select { |
| case u := <-rr.updateChannel: |
| if diff := cmp.Diff(u.priorities, test.want, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" { |
| t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for update from update channel.") |
| } |
| // Close the resource resolver. Should close the underlying resolver. |
| rr.stop() |
| select { |
| case <-dnsCloseCh: |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for closing DNS resolver") |
| } |
| }) |
| } |
| } |
| |
| // Test that changing EDS name would cause a cancel and a new watch. |
| // |
| // Also, changes that don't actually change EDS names (e.g. changing cluster |
| // name but not service name, or change circuit breaking count) doesn't do |
| // anything. |
| // |
| // - update DiscoveryMechanism |
| // - same EDS name to watch, but different MaxCurrentCount: no new watch |
| // - different cluster name, but same EDS name: no new watch |
| func (s) TestResourceResolverChangeEDSName(t *testing.T) { |
| fakeClient := fakeclient.NewClient() |
| rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) |
| rr.updateMechanisms([]DiscoveryMechanism{{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterName, |
| EDSServiceName: testEDSService, |
| }}) |
| ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| gotEDSName1, err := fakeClient.WaitForWatchEDS(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) |
| } |
| if gotEDSName1 != testEDSService { |
| t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName1, testEDSService) |
| } |
| |
| // Invoke callback, should get an update. |
| fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil) |
| select { |
| case u := <-rr.updateChannel: |
| if diff := cmp.Diff(u.priorities, []priorityConfig{{ |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterName, |
| EDSServiceName: testEDSService, |
| }, |
| edsResp: testEDSUpdates[0], |
| childNameGen: newNameGenerator(0), |
| }}, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" { |
| t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for update from update channel.") |
| } |
| |
| // Change name to watch. |
| rr.updateMechanisms([]DiscoveryMechanism{{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterName, |
| }}) |
| edsNameCanceled1, err := fakeClient.WaitForCancelEDSWatch(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) |
| } |
| if edsNameCanceled1 != gotEDSName1 { |
| t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled1, testEDSService) |
| } |
| gotEDSName2, err := fakeClient.WaitForWatchEDS(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) |
| } |
| if gotEDSName2 != testClusterName { |
| t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName2, testClusterName) |
| } |
| // Shouldn't get any update, because the new resource hasn't received any |
| // update. |
| shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) |
| defer shortCancel() |
| select { |
| case u := <-rr.updateChannel: |
| t.Fatalf("get unexpected update %+v", u) |
| case <-shortCtx.Done(): |
| } |
| |
| // Invoke callback, should get an update. |
| fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil) |
| select { |
| case u := <-rr.updateChannel: |
| if diff := cmp.Diff(u.priorities, []priorityConfig{{ |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterName, |
| }, |
| edsResp: testEDSUpdates[1], |
| childNameGen: newNameGenerator(1), |
| }}, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" { |
| t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for update from update channel.") |
| } |
| |
| // Change circuit breaking count, should get an update with new circuit |
| // breaking count, but shouldn't trigger new watch. |
| rr.updateMechanisms([]DiscoveryMechanism{{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterName, |
| MaxConcurrentRequests: newUint32(123), |
| }}) |
| shortCtx, shortCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) |
| defer shortCancel() |
| if n, err := fakeClient.WaitForWatchEDS(shortCtx); err == nil { |
| t.Fatalf("unexpected watch started for EDS: %v", n) |
| } |
| select { |
| case u := <-rr.updateChannel: |
| if diff := cmp.Diff(u.priorities, []priorityConfig{{ |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterName, |
| MaxConcurrentRequests: newUint32(123), |
| }, |
| edsResp: testEDSUpdates[1], |
| childNameGen: newNameGenerator(1), |
| }}, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" { |
| t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for update from update channel.") |
| } |
| |
| // Close the resource resolver. Should stop EDS watch. |
| rr.stop() |
| edsNameCanceled, err := fakeClient.WaitForCancelEDSWatch(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) |
| } |
| if edsNameCanceled != gotEDSName2 { |
| t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled, gotEDSName2) |
| } |
| } |
| |
| // Test the case that same resources with the same priority should not add new |
| // EDS watch, and also should not trigger an update. |
| func (s) TestResourceResolverNoChangeNoUpdate(t *testing.T) { |
| fakeClient := fakeclient.NewClient() |
| rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) |
| rr.updateMechanisms([]DiscoveryMechanism{ |
| { |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterNames[0], |
| }, |
| { |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterNames[1], |
| MaxConcurrentRequests: newUint32(100), |
| }, |
| }) |
| ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| gotEDSName1, err := fakeClient.WaitForWatchEDS(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) |
| } |
| if gotEDSName1 != testClusterNames[0] { |
| t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName1, testClusterNames[0]) |
| } |
| gotEDSName2, err := fakeClient.WaitForWatchEDS(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) |
| } |
| if gotEDSName2 != testClusterNames[1] { |
| t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName2, testClusterNames[1]) |
| } |
| |
| // Invoke callback, should get an update. |
| fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil) |
| // Shouldn't send update, because only one resource received an update. |
| shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) |
| defer shortCancel() |
| select { |
| case u := <-rr.updateChannel: |
| t.Fatalf("get unexpected update %+v", u) |
| case <-shortCtx.Done(): |
| } |
| fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil) |
| select { |
| case u := <-rr.updateChannel: |
| if diff := cmp.Diff(u.priorities, []priorityConfig{ |
| { |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterNames[0], |
| }, |
| edsResp: testEDSUpdates[0], |
| childNameGen: newNameGenerator(0), |
| }, |
| { |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterNames[1], |
| MaxConcurrentRequests: newUint32(100), |
| }, |
| edsResp: testEDSUpdates[1], |
| childNameGen: newNameGenerator(1), |
| }, |
| }, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" { |
| t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for update from update channel.") |
| } |
| |
| // Send the same resources with the same priorities, shouldn't any change. |
| rr.updateMechanisms([]DiscoveryMechanism{ |
| { |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterNames[0], |
| }, |
| { |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterNames[1], |
| MaxConcurrentRequests: newUint32(100), |
| }, |
| }) |
| shortCtx, shortCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) |
| defer shortCancel() |
| if n, err := fakeClient.WaitForWatchEDS(shortCtx); err == nil { |
| t.Fatalf("unexpected watch started for EDS: %v", n) |
| } |
| shortCtx, shortCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) |
| defer shortCancel() |
| select { |
| case u := <-rr.updateChannel: |
| t.Fatalf("unexpected update: %+v", u) |
| case <-shortCtx.Done(): |
| } |
| |
| // Close the resource resolver. Should stop EDS watch. |
| rr.stop() |
| edsNameCanceled1, err := fakeClient.WaitForCancelEDSWatch(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) |
| } |
| if edsNameCanceled1 != gotEDSName1 && edsNameCanceled1 != gotEDSName2 { |
| t.Fatalf("xdsClient.CancelEDS called for %v, want: %v or %v", edsNameCanceled1, gotEDSName1, gotEDSName2) |
| } |
| edsNameCanceled2, err := fakeClient.WaitForCancelEDSWatch(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) |
| } |
| if edsNameCanceled2 != gotEDSName2 && edsNameCanceled2 != gotEDSName1 { |
| t.Fatalf("xdsClient.CancelEDS called for %v, want: %v or %v", edsNameCanceled2, gotEDSName1, gotEDSName2) |
| } |
| } |
| |
| // Test the case that same resources are watched, but with different priority. |
| // Should not add new EDS watch, but should trigger an update with the new |
| // priorities. |
| func (s) TestResourceResolverChangePriority(t *testing.T) { |
| fakeClient := fakeclient.NewClient() |
| rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) |
| rr.updateMechanisms([]DiscoveryMechanism{ |
| { |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterNames[0], |
| }, |
| { |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterNames[1], |
| }, |
| }) |
| ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| gotEDSName1, err := fakeClient.WaitForWatchEDS(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) |
| } |
| if gotEDSName1 != testClusterNames[0] { |
| t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName1, testClusterNames[0]) |
| } |
| gotEDSName2, err := fakeClient.WaitForWatchEDS(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) |
| } |
| if gotEDSName2 != testClusterNames[1] { |
| t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName2, testClusterNames[1]) |
| } |
| |
| // Invoke callback, should get an update. |
| fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil) |
| // Shouldn't send update, because only one resource received an update. |
| shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) |
| defer shortCancel() |
| select { |
| case u := <-rr.updateChannel: |
| t.Fatalf("get unexpected update %+v", u) |
| case <-shortCtx.Done(): |
| } |
| fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil) |
| select { |
| case u := <-rr.updateChannel: |
| if diff := cmp.Diff(u.priorities, []priorityConfig{ |
| { |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterNames[0], |
| }, |
| edsResp: testEDSUpdates[0], |
| childNameGen: newNameGenerator(0), |
| }, |
| { |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterNames[1], |
| }, |
| edsResp: testEDSUpdates[1], |
| childNameGen: newNameGenerator(1), |
| }, |
| }, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" { |
| t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for update from update channel.") |
| } |
| |
| // Send the same resources with different priorities, shouldn't trigger |
| // watch, but should trigger an update with the new priorities. |
| rr.updateMechanisms([]DiscoveryMechanism{ |
| { |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterNames[1], |
| }, |
| { |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterNames[0], |
| }, |
| }) |
| shortCtx, shortCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) |
| defer shortCancel() |
| if n, err := fakeClient.WaitForWatchEDS(shortCtx); err == nil { |
| t.Fatalf("unexpected watch started for EDS: %v", n) |
| } |
| select { |
| case u := <-rr.updateChannel: |
| if diff := cmp.Diff(u.priorities, []priorityConfig{ |
| { |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterNames[1], |
| }, |
| edsResp: testEDSUpdates[1], |
| childNameGen: newNameGenerator(1), |
| }, |
| { |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterNames[0], |
| }, |
| edsResp: testEDSUpdates[0], |
| childNameGen: newNameGenerator(0), |
| }, |
| }, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" { |
| t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for update from update channel.") |
| } |
| |
| // Close the resource resolver. Should stop EDS watch. |
| rr.stop() |
| edsNameCanceled1, err := fakeClient.WaitForCancelEDSWatch(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) |
| } |
| if edsNameCanceled1 != gotEDSName1 && edsNameCanceled1 != gotEDSName2 { |
| t.Fatalf("xdsClient.CancelEDS called for %v, want: %v or %v", edsNameCanceled1, gotEDSName1, gotEDSName2) |
| } |
| edsNameCanceled2, err := fakeClient.WaitForCancelEDSWatch(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) |
| } |
| if edsNameCanceled2 != gotEDSName2 && edsNameCanceled2 != gotEDSName1 { |
| t.Fatalf("xdsClient.CancelEDS called for %v, want: %v or %v", edsNameCanceled2, gotEDSName1, gotEDSName2) |
| } |
| } |
| |
| // Test the case that covers resource for both EDS and DNS. |
| func (s) TestResourceResolverEDSAndDNS(t *testing.T) { |
| dnsTargetCh, dnsCloseCh, _, dnsR, cleanup := setupDNS() |
| defer cleanup() |
| fakeClient := fakeclient.NewClient() |
| rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) |
| rr.updateMechanisms([]DiscoveryMechanism{ |
| { |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterName, |
| }, |
| { |
| Type: DiscoveryMechanismTypeLogicalDNS, |
| DNSHostname: testDNSTarget, |
| }, |
| }) |
| ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| gotEDSName1, err := fakeClient.WaitForWatchEDS(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) |
| } |
| if gotEDSName1 != testClusterName { |
| t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName1, testClusterName) |
| } |
| select { |
| case target := <-dnsTargetCh: |
| if diff := cmp.Diff(target, resolver.Target{Scheme: "dns", URL: *testutils.MustParseURL("dns:///" + testDNSTarget)}); diff != "" { |
| t.Fatalf("got unexpected DNS target to watch, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for building DNS resolver") |
| } |
| |
| fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil) |
| // Shouldn't send update, because only one resource received an update. |
| shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) |
| defer shortCancel() |
| select { |
| case u := <-rr.updateChannel: |
| t.Fatalf("get unexpected update %+v", u) |
| case <-shortCtx.Done(): |
| } |
| // Invoke DNS, should get an update. |
| dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}}) |
| select { |
| case u := <-rr.updateChannel: |
| if diff := cmp.Diff(u.priorities, []priorityConfig{ |
| { |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterName, |
| }, |
| edsResp: testEDSUpdates[0], |
| childNameGen: newNameGenerator(0), |
| }, |
| { |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeLogicalDNS, |
| DNSHostname: testDNSTarget, |
| }, |
| addresses: []string{"1.1.1.1", "2.2.2.2"}, |
| childNameGen: newNameGenerator(1), |
| }, |
| }, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" { |
| t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for update from update channel.") |
| } |
| |
| // Close the resource resolver. Should stop EDS watch. |
| rr.stop() |
| edsNameCanceled1, err := fakeClient.WaitForCancelEDSWatch(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) |
| } |
| if edsNameCanceled1 != gotEDSName1 { |
| t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled1, gotEDSName1) |
| } |
| select { |
| case <-dnsCloseCh: |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for closing DNS resolver") |
| } |
| } |
| |
| // Test the case that covers resource changing between EDS and DNS. |
| func (s) TestResourceResolverChangeFromEDSToDNS(t *testing.T) { |
| dnsTargetCh, dnsCloseCh, _, dnsR, cleanup := setupDNS() |
| defer cleanup() |
| fakeClient := fakeclient.NewClient() |
| rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) |
| rr.updateMechanisms([]DiscoveryMechanism{{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterName, |
| }}) |
| ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| gotEDSName1, err := fakeClient.WaitForWatchEDS(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) |
| } |
| if gotEDSName1 != testClusterName { |
| t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName1, testClusterName) |
| } |
| |
| // Invoke callback, should get an update. |
| fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil) |
| select { |
| case u := <-rr.updateChannel: |
| if diff := cmp.Diff(u.priorities, []priorityConfig{{ |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterName, |
| }, |
| edsResp: testEDSUpdates[0], |
| childNameGen: newNameGenerator(0), |
| }}, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" { |
| t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for update from update channel.") |
| } |
| |
| // Update to watch DNS instead. Should cancel EDS, and start DNS. |
| rr.updateMechanisms([]DiscoveryMechanism{{ |
| Type: DiscoveryMechanismTypeLogicalDNS, |
| DNSHostname: testDNSTarget, |
| }}) |
| select { |
| case target := <-dnsTargetCh: |
| if diff := cmp.Diff(target, resolver.Target{Scheme: "dns", URL: *testutils.MustParseURL("dns:///" + testDNSTarget)}); diff != "" { |
| t.Fatalf("got unexpected DNS target to watch, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for building DNS resolver") |
| } |
| edsNameCanceled1, err := fakeClient.WaitForCancelEDSWatch(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) |
| } |
| if edsNameCanceled1 != gotEDSName1 { |
| t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled1, gotEDSName1) |
| } |
| |
| dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}}) |
| select { |
| case u := <-rr.updateChannel: |
| if diff := cmp.Diff(u.priorities, []priorityConfig{{ |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeLogicalDNS, |
| DNSHostname: testDNSTarget, |
| }, |
| addresses: []string{"1.1.1.1", "2.2.2.2"}, |
| childNameGen: newNameGenerator(1), |
| }}, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" { |
| t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for update from update channel.") |
| } |
| |
| // Close the resource resolver. Should stop DNS. |
| rr.stop() |
| select { |
| case <-dnsCloseCh: |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for closing DNS resolver") |
| } |
| } |
| |
| // Test the case that covers errors for both EDS and DNS. |
| func (s) TestResourceResolverError(t *testing.T) { |
| dnsTargetCh, dnsCloseCh, _, dnsR, cleanup := setupDNS() |
| defer cleanup() |
| fakeClient := fakeclient.NewClient() |
| rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) |
| rr.updateMechanisms([]DiscoveryMechanism{ |
| { |
| Type: DiscoveryMechanismTypeEDS, |
| Cluster: testClusterName, |
| }, |
| { |
| Type: DiscoveryMechanismTypeLogicalDNS, |
| DNSHostname: testDNSTarget, |
| }, |
| }) |
| ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| gotEDSName1, err := fakeClient.WaitForWatchEDS(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) |
| } |
| if gotEDSName1 != testClusterName { |
| t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName1, testClusterName) |
| } |
| select { |
| case target := <-dnsTargetCh: |
| if diff := cmp.Diff(target, resolver.Target{Scheme: "dns", URL: *testutils.MustParseURL("dns:///" + testDNSTarget)}); diff != "" { |
| t.Fatalf("got unexpected DNS target to watch, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for building DNS resolver") |
| } |
| |
| // Invoke callback with an error, should get an update. |
| edsErr := fmt.Errorf("EDS error") |
| fakeClient.InvokeWatchEDSCallback(gotEDSName1, xdsresource.EndpointsUpdate{}, edsErr) |
| select { |
| case u := <-rr.updateChannel: |
| if u.err != edsErr { |
| t.Fatalf("got unexpected error from update, want %v, got %v", edsErr, u.err) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for update from update channel.") |
| } |
| |
| // Invoke DNS with an error, should get an update. |
| dnsErr := fmt.Errorf("DNS error") |
| dnsR.ReportError(dnsErr) |
| select { |
| case u := <-rr.updateChannel: |
| if u.err != dnsErr { |
| t.Fatalf("got unexpected error from update, want %v, got %v", dnsErr, u.err) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for update from update channel.") |
| } |
| |
| // Close the resource resolver. Should stop EDS watch. |
| rr.stop() |
| edsNameCanceled1, err := fakeClient.WaitForCancelEDSWatch(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) |
| } |
| if edsNameCanceled1 != gotEDSName1 { |
| t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled1, gotEDSName1) |
| } |
| select { |
| case <-dnsCloseCh: |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for closing DNS resolver") |
| } |
| } |
| |
| // Test re-resolve of the DNS resolver. |
| func (s) TestResourceResolverDNSResolveNow(t *testing.T) { |
| dnsTargetCh, dnsCloseCh, resolveNowCh, dnsR, cleanup := setupDNS() |
| defer cleanup() |
| fakeClient := fakeclient.NewClient() |
| rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) |
| rr.updateMechanisms([]DiscoveryMechanism{{ |
| Type: DiscoveryMechanismTypeLogicalDNS, |
| DNSHostname: testDNSTarget, |
| }}) |
| ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| select { |
| case target := <-dnsTargetCh: |
| if diff := cmp.Diff(target, resolver.Target{Scheme: "dns", URL: *testutils.MustParseURL("dns:///" + testDNSTarget)}); diff != "" { |
| t.Fatalf("got unexpected DNS target to watch, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for building DNS resolver") |
| } |
| |
| // Invoke callback, should get an update. |
| dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}}) |
| select { |
| case u := <-rr.updateChannel: |
| if diff := cmp.Diff(u.priorities, []priorityConfig{{ |
| mechanism: DiscoveryMechanism{ |
| Type: DiscoveryMechanismTypeLogicalDNS, |
| DNSHostname: testDNSTarget, |
| }, |
| addresses: []string{"1.1.1.1", "2.2.2.2"}, |
| childNameGen: newNameGenerator(0), |
| }}, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" { |
| t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) |
| } |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for update from update channel.") |
| } |
| rr.resolveNow() |
| select { |
| case <-resolveNowCh: |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for re-resolve") |
| } |
| // Close the resource resolver. Should close the underlying resolver. |
| rr.stop() |
| select { |
| case <-dnsCloseCh: |
| case <-ctx.Done(): |
| t.Fatal("Timed out waiting for closing DNS resolver") |
| } |
| } |