| /* |
| * Copyright 2019 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package cdsbalancer |
| |
| import ( |
| "context" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "testing" |
| "time" |
| |
| "github.com/google/go-cmp/cmp" |
| "github.com/google/go-cmp/cmp/cmpopts" |
| "google.golang.org/grpc/attributes" |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/connectivity" |
| "google.golang.org/grpc/internal" |
| "google.golang.org/grpc/internal/grpctest" |
| "google.golang.org/grpc/internal/testutils" |
| "google.golang.org/grpc/resolver" |
| "google.golang.org/grpc/serviceconfig" |
| xdsinternal "google.golang.org/grpc/xds/internal" |
| "google.golang.org/grpc/xds/internal/balancer/edsbalancer" |
| xdsclient "google.golang.org/grpc/xds/internal/client" |
| "google.golang.org/grpc/xds/internal/testutils/fakeclient" |
| ) |
| |
| const ( |
| clusterName = "cluster1" |
| serviceName = "service1" |
| defaultTestTimeout = 1 * time.Second |
| ) |
| |
| type s struct { |
| grpctest.Tester |
| } |
| |
| func Test(t *testing.T) { |
| grpctest.RunSubTests(t, s{}) |
| } |
| |
| type testClientConn struct { |
| balancer.ClientConn |
| |
| newPickerCh *testutils.Channel // The last picker updated. |
| } |
| |
| func newTestClientConn() *testClientConn { |
| return &testClientConn{ |
| newPickerCh: testutils.NewChannelWithSize(1), |
| } |
| } |
| |
| func (tcc *testClientConn) UpdateState(bs balancer.State) { |
| tcc.newPickerCh.Replace(bs) |
| } |
| |
| // cdsWatchInfo wraps the update and the error sent in a CDS watch callback. |
| type cdsWatchInfo struct { |
| update xdsclient.ClusterUpdate |
| err error |
| } |
| |
| // invokeWatchCb invokes the CDS watch callback registered by the cdsBalancer |
| // and waits for appropriate state to be pushed to the provided edsBalancer. |
| func invokeWatchCbAndWait(xdsC *fakeclient.Client, cdsW cdsWatchInfo, wantCCS balancer.ClientConnState, edsB *testEDSBalancer) error { |
| xdsC.InvokeWatchClusterCallback(cdsW.update, cdsW.err) |
| if cdsW.err != nil { |
| return edsB.waitForResolverError(cdsW.err) |
| } |
| return edsB.waitForClientConnUpdate(wantCCS) |
| } |
| |
| // testEDSBalancer is a fake edsBalancer used to verify different actions from |
| // the cdsBalancer. It contains a bunch of channels to signal different events |
| // to the test. |
| type testEDSBalancer struct { |
| // ccsCh is a channel used to signal the receipt of a ClientConn update. |
| ccsCh *testutils.Channel |
| // scStateCh is a channel used to signal the receipt of a SubConn update. |
| scStateCh *testutils.Channel |
| // resolverErrCh is a channel used to signal a resolver error. |
| resolverErrCh *testutils.Channel |
| // closeCh is a channel used to signal the closing of this balancer. |
| closeCh *testutils.Channel |
| } |
| |
| type subConnWithState struct { |
| sc balancer.SubConn |
| state balancer.SubConnState |
| } |
| |
| func newTestEDSBalancer() *testEDSBalancer { |
| return &testEDSBalancer{ |
| ccsCh: testutils.NewChannel(), |
| scStateCh: testutils.NewChannel(), |
| resolverErrCh: testutils.NewChannel(), |
| closeCh: testutils.NewChannel(), |
| } |
| } |
| |
| func (tb *testEDSBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { |
| tb.ccsCh.Send(ccs) |
| return nil |
| } |
| |
| func (tb *testEDSBalancer) ResolverError(err error) { |
| tb.resolverErrCh.Send(err) |
| } |
| |
| func (tb *testEDSBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { |
| tb.scStateCh.Send(subConnWithState{sc: sc, state: state}) |
| } |
| |
| func (tb *testEDSBalancer) Close() { |
| tb.closeCh.Send(struct{}{}) |
| } |
| |
| // waitForClientConnUpdate verifies if the testEDSBalancer receives the |
| // provided ClientConnState within a reasonable amount of time. |
| func (tb *testEDSBalancer) waitForClientConnUpdate(wantCCS balancer.ClientConnState) error { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| ccs, err := tb.ccsCh.Receive(ctx) |
| if err != nil { |
| return err |
| } |
| gotCCS := ccs.(balancer.ClientConnState) |
| if !cmp.Equal(gotCCS, wantCCS, cmpopts.IgnoreUnexported(attributes.Attributes{})) { |
| return fmt.Errorf("received ClientConnState: %+v, want %+v", gotCCS, wantCCS) |
| } |
| return nil |
| } |
| |
| // waitForSubConnUpdate verifies if the testEDSBalancer receives the provided |
| // SubConn update within a reasonable amount of time. |
| func (tb *testEDSBalancer) waitForSubConnUpdate(wantSCS subConnWithState) error { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| scs, err := tb.scStateCh.Receive(ctx) |
| if err != nil { |
| return err |
| } |
| gotSCS := scs.(subConnWithState) |
| if !cmp.Equal(gotSCS, wantSCS, cmp.AllowUnexported(subConnWithState{})) { |
| return fmt.Errorf("received SubConnState: %+v, want %+v", gotSCS, wantSCS) |
| } |
| return nil |
| } |
| |
| // waitForResolverError verifies if the testEDSBalancer receives the |
| // provided resolver error within a reasonable amount of time. |
| func (tb *testEDSBalancer) waitForResolverError(wantErr error) error { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| gotErr, err := tb.resolverErrCh.Receive(ctx) |
| if err != nil { |
| return err |
| } |
| if gotErr != wantErr { |
| return fmt.Errorf("received resolver error: %v, want %v", gotErr, wantErr) |
| } |
| return nil |
| } |
| |
| // waitForClose verifies that the edsBalancer is closed with a reasonable |
| // amount of time. |
| func (tb *testEDSBalancer) waitForClose() error { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if _, err := tb.closeCh.Receive(ctx); err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| // cdsCCS is a helper function to construct a good update passed from the |
| // xdsResolver to the cdsBalancer. |
| func cdsCCS(cluster string, xdsClient interface{}) balancer.ClientConnState { |
| const cdsLBConfig = `{ |
| "loadBalancingConfig":[ |
| { |
| "cds_experimental":{ |
| "Cluster": "%s" |
| } |
| } |
| ] |
| }` |
| jsonSC := fmt.Sprintf(cdsLBConfig, cluster) |
| return balancer.ClientConnState{ |
| ResolverState: resolver.State{ |
| ServiceConfig: internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(jsonSC), |
| Attributes: attributes.New(xdsinternal.XDSClientID, xdsClient), |
| }, |
| BalancerConfig: &lbConfig{ClusterName: clusterName}, |
| } |
| } |
| |
| // edsCCS is a helper function to construct a good update passed from the |
| // cdsBalancer to the edsBalancer. |
| func edsCCS(service string, enableLRS bool, xdsClient interface{}) balancer.ClientConnState { |
| lbCfg := &edsbalancer.EDSConfig{EDSServiceName: service} |
| if enableLRS { |
| lbCfg.LrsLoadReportingServerName = new(string) |
| } |
| return balancer.ClientConnState{ |
| ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsClient)}, |
| BalancerConfig: lbCfg, |
| } |
| } |
| |
| // setup creates a cdsBalancer and an edsBalancer (and overrides the |
| // newEDSBalancer function to return it), and also returns a cleanup function. |
| func setup() (*cdsBalancer, *testEDSBalancer, *testClientConn, func()) { |
| builder := cdsBB{} |
| tcc := newTestClientConn() |
| cdsB := builder.Build(tcc, balancer.BuildOptions{}) |
| |
| edsB := newTestEDSBalancer() |
| oldEDSBalancerBuilder := newEDSBalancer |
| newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) { |
| return edsB, nil |
| } |
| |
| return cdsB.(*cdsBalancer), edsB, tcc, func() { |
| newEDSBalancer = oldEDSBalancerBuilder |
| } |
| } |
| |
| // setupWithWatch does everything that setup does, and also pushes a ClientConn |
| // update to the cdsBalancer and waits for a CDS watch call to be registered. |
| func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *testClientConn, func()) { |
| t.Helper() |
| |
| xdsC := fakeclient.NewClient() |
| cdsB, edsB, tcc, cancel := setup() |
| if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil { |
| t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err) |
| } |
| |
| ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| gotCluster, err := xdsC.WaitForWatchCluster(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) |
| } |
| if gotCluster != clusterName { |
| t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, clusterName) |
| } |
| return xdsC, cdsB, edsB, tcc, cancel |
| } |
| |
| // TestUpdateClientConnState invokes the UpdateClientConnState method on the |
| // cdsBalancer with different inputs and verifies that the CDS watch API on the |
| // provided xdsClient is invoked appropriately. |
| func (s) TestUpdateClientConnState(t *testing.T) { |
| xdsC := fakeclient.NewClient() |
| |
| tests := []struct { |
| name string |
| ccs balancer.ClientConnState |
| wantErr error |
| wantCluster string |
| }{ |
| { |
| name: "bad-lbCfg-type", |
| ccs: balancer.ClientConnState{BalancerConfig: nil}, |
| wantErr: balancer.ErrBadResolverState, |
| }, |
| { |
| name: "empty-cluster-in-lbCfg", |
| ccs: balancer.ClientConnState{BalancerConfig: &lbConfig{ClusterName: ""}}, |
| wantErr: balancer.ErrBadResolverState, |
| }, |
| { |
| name: "no-xdsClient-in-attributes", |
| ccs: balancer.ClientConnState{ |
| ResolverState: resolver.State{ |
| Attributes: attributes.New("key", "value"), |
| }, |
| BalancerConfig: &lbConfig{ClusterName: clusterName}, |
| }, |
| wantErr: balancer.ErrBadResolverState, |
| }, |
| { |
| name: "bad-xdsClient-in-attributes", |
| ccs: balancer.ClientConnState{ |
| ResolverState: resolver.State{ |
| Attributes: attributes.New(xdsinternal.XDSClientID, "value"), |
| }, |
| BalancerConfig: &lbConfig{ClusterName: clusterName}, |
| }, |
| wantErr: balancer.ErrBadResolverState, |
| }, |
| { |
| name: "happy-good-case", |
| ccs: cdsCCS(clusterName, xdsC), |
| wantCluster: clusterName, |
| }, |
| } |
| |
| for _, test := range tests { |
| t.Run(test.name, func(t *testing.T) { |
| cdsB, _, _, cancel := setup() |
| defer func() { |
| cancel() |
| cdsB.Close() |
| }() |
| |
| if err := cdsB.UpdateClientConnState(test.ccs); err != test.wantErr { |
| t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err) |
| } |
| if test.wantErr != nil { |
| // When we wanted an error and got it, we should return early. |
| return |
| } |
| ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| gotCluster, err := xdsC.WaitForWatchCluster(ctx) |
| if err != nil { |
| t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) |
| } |
| if gotCluster != test.wantCluster { |
| t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, test.wantCluster) |
| } |
| }) |
| } |
| } |
| |
| // TestUpdateClientConnStateAfterClose invokes the UpdateClientConnState method |
| // on the cdsBalancer after close and verifies that it returns an error. |
| func (s) TestUpdateClientConnStateAfterClose(t *testing.T) { |
| cdsB, _, _, cancel := setup() |
| defer cancel() |
| cdsB.Close() |
| |
| if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, fakeclient.NewClient())); err != errBalancerClosed { |
| t.Fatalf("UpdateClientConnState() after close returned %v, want %v", err, errBalancerClosed) |
| } |
| } |
| |
| // TestUpdateClientConnStateWithSameState verifies that a ClientConnState |
| // update with the same cluster and xdsClient does not cause the cdsBalancer to |
| // create a new watch. |
| func (s) TestUpdateClientConnStateWithSameState(t *testing.T) { |
| xdsC, cdsB, _, _, cancel := setupWithWatch(t) |
| defer func() { |
| cancel() |
| cdsB.Close() |
| }() |
| |
| if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil { |
| t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err) |
| } |
| ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| if _, err := xdsC.WaitForWatchCluster(ctx); err != context.DeadlineExceeded { |
| t.Fatalf("waiting for WatchCluster() should have timed out, but returned error: %v", err) |
| } |
| } |
| |
| // TestHandleClusterUpdate invokes the registered CDS watch callback with |
| // different updates and verifies that the expect ClientConnState is propagated |
| // to the edsBalancer. |
| func (s) TestHandleClusterUpdate(t *testing.T) { |
| xdsC, cdsB, edsB, _, cancel := setupWithWatch(t) |
| defer func() { |
| cancel() |
| cdsB.Close() |
| }() |
| |
| tests := []struct { |
| name string |
| cdsUpdate xdsclient.ClusterUpdate |
| updateErr error |
| wantCCS balancer.ClientConnState |
| }{ |
| { |
| name: "happy-case-with-lrs", |
| cdsUpdate: xdsclient.ClusterUpdate{ServiceName: serviceName, EnableLRS: true}, |
| wantCCS: edsCCS(serviceName, true, xdsC), |
| }, |
| { |
| name: "happy-case-without-lrs", |
| cdsUpdate: xdsclient.ClusterUpdate{ServiceName: serviceName}, |
| wantCCS: edsCCS(serviceName, false, xdsC), |
| }, |
| } |
| |
| for _, test := range tests { |
| t.Run(test.name, func(t *testing.T) { |
| if err := invokeWatchCbAndWait(xdsC, cdsWatchInfo{test.cdsUpdate, test.updateErr}, test.wantCCS, edsB); err != nil { |
| t.Fatal(err) |
| } |
| }) |
| } |
| } |
| |
| // TestHandleClusterUpdateError covers the cases that an error is returned from |
| // the watcher. |
| // |
| // Includes error with and without a child eds balancer, and whether error is a |
| // resource-not-found error. |
| func (s) TestHandleClusterUpdateError(t *testing.T) { |
| xdsC, cdsB, edsB, tcc, cancel := setupWithWatch(t) |
| defer func() { |
| cancel() |
| cdsB.Close() |
| }() |
| |
| // An error before eds balancer is built. Should result in an error picker. |
| // And this is not a resource not found error, watch shouldn't be canceled. |
| err1 := errors.New("cdsBalancer resolver error 1") |
| xdsC.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{}, err1) |
| ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| if err := xdsC.WaitForCancelClusterWatch(ctx); err == nil { |
| t.Fatal("watch was canceled, want not canceled (timeout error)") |
| } |
| if err := edsB.waitForResolverError(err1); err == nil { |
| t.Fatal("eds balancer shouldn't get error (shouldn't be built yet)") |
| } |
| ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| state, err := tcc.newPickerCh.Receive(ctx) |
| if err != nil { |
| t.Fatalf("failed to get picker, expect an error picker") |
| } |
| picker := state.(balancer.State).Picker |
| if _, perr := picker.Pick(balancer.PickInfo{}); perr == nil { |
| t.Fatalf("want picker to always fail, got nil") |
| } |
| |
| cdsUpdate := xdsclient.ClusterUpdate{ServiceName: serviceName} |
| wantCCS := edsCCS(serviceName, false, xdsC) |
| if err := invokeWatchCbAndWait(xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { |
| t.Fatal(err) |
| } |
| |
| // An error after eds balancer is build, eds should receive the error. This |
| // is not a resource not found error, watch shouldn't be canceled |
| err2 := errors.New("cdsBalancer resolver error 2") |
| xdsC.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{}, err2) |
| ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| if err := xdsC.WaitForCancelClusterWatch(ctx); err == nil { |
| t.Fatal("watch was canceled, want not canceled (timeout error)") |
| } |
| if err := edsB.waitForResolverError(err2); err != nil { |
| t.Fatalf("eds balancer should get error, waitForError failed: %v", err) |
| } |
| |
| // A resource not found error. Watch should not be canceled because this |
| // means CDS resource is removed, and eds should receive the error. |
| resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "cdsBalancer resource not found error") |
| xdsC.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{}, resourceErr) |
| ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| if err := xdsC.WaitForCancelClusterWatch(ctx); err == nil { |
| t.Fatalf("want watch to be not canceled, watchForCancel should timeout") |
| } |
| if err := edsB.waitForResolverError(resourceErr); err != nil { |
| t.Fatalf("eds balancer should get resource-not-found error, waitForError failed: %v", err) |
| } |
| } |
| |
| // TestResolverError verifies that resolvers errors (with type |
| // resource-not-found or others) are handled correctly. |
| func (s) TestResolverError(t *testing.T) { |
| xdsC, cdsB, edsB, tcc, cancel := setupWithWatch(t) |
| defer func() { |
| cancel() |
| cdsB.Close() |
| }() |
| |
| // An error before eds balancer is built. Should result in an error picker. |
| // Not a resource not found error, watch shouldn't be canceled. |
| err1 := errors.New("cdsBalancer resolver error 1") |
| cdsB.ResolverError(err1) |
| ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| if err := xdsC.WaitForCancelClusterWatch(ctx); err == nil { |
| t.Fatal("watch was canceled, want not canceled (timeout error)") |
| } |
| if err := edsB.waitForResolverError(err1); err == nil { |
| t.Fatal("eds balancer shouldn't get error (shouldn't be built yet)") |
| } |
| ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| state, err := tcc.newPickerCh.Receive(ctx) |
| if err != nil { |
| t.Fatalf("failed to get picker, expect an error picker") |
| } |
| picker := state.(balancer.State).Picker |
| if _, perr := picker.Pick(balancer.PickInfo{}); perr == nil { |
| t.Fatalf("want picker to always fail, got nil") |
| } |
| |
| cdsUpdate := xdsclient.ClusterUpdate{ServiceName: serviceName} |
| wantCCS := edsCCS(serviceName, false, xdsC) |
| if err := invokeWatchCbAndWait(xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Not a resource not found error, watch shouldn't be canceled, and eds |
| // should receive the error. |
| err2 := errors.New("cdsBalancer resolver error 2") |
| cdsB.ResolverError(err2) |
| ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| if err := xdsC.WaitForCancelClusterWatch(ctx); err == nil { |
| t.Fatal("watch was canceled, want not canceled (timeout error)") |
| } |
| if err := edsB.waitForResolverError(err2); err != nil { |
| t.Fatalf("eds balancer should get error, waitForError failed: %v", err) |
| } |
| |
| // A resource not found error. Watch should be canceled, and eds should |
| // receive the error. |
| resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "cdsBalancer resource not found error") |
| cdsB.ResolverError(resourceErr) |
| ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| if err := xdsC.WaitForCancelClusterWatch(ctx); err != nil { |
| t.Fatalf("want watch to be canceled, watchForCancel failed: %v", err) |
| } |
| if err := edsB.waitForResolverError(resourceErr); err != nil { |
| t.Fatalf("eds balancer should get resource-not-found error, waitForError failed: %v", err) |
| } |
| } |
| |
| // TestUpdateSubConnState pushes a SubConn update to the cdsBalancer and |
| // verifies that the update is propagated to the edsBalancer. |
| func (s) TestUpdateSubConnState(t *testing.T) { |
| xdsC, cdsB, edsB, _, cancel := setupWithWatch(t) |
| defer func() { |
| cancel() |
| cdsB.Close() |
| }() |
| |
| cdsUpdate := xdsclient.ClusterUpdate{ServiceName: serviceName} |
| wantCCS := edsCCS(serviceName, false, xdsC) |
| if err := invokeWatchCbAndWait(xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { |
| t.Fatal(err) |
| } |
| |
| var sc balancer.SubConn |
| state := balancer.SubConnState{ConnectivityState: connectivity.Ready} |
| cdsB.UpdateSubConnState(sc, state) |
| if err := edsB.waitForSubConnUpdate(subConnWithState{sc: sc, state: state}); err != nil { |
| t.Fatal(err) |
| } |
| } |
| |
| // TestClose calls Close() on the cdsBalancer, and verifies that the underlying |
| // edsBalancer is also closed. |
| func (s) TestClose(t *testing.T) { |
| xdsC, cdsB, edsB, _, cancel := setupWithWatch(t) |
| defer cancel() |
| |
| cdsUpdate := xdsclient.ClusterUpdate{ServiceName: serviceName} |
| wantCCS := edsCCS(serviceName, false, xdsC) |
| if err := invokeWatchCbAndWait(xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { |
| t.Fatal(err) |
| } |
| |
| cdsB.Close() |
| ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer ctxCancel() |
| if err := xdsC.WaitForCancelClusterWatch(ctx); err != nil { |
| t.Fatal(err) |
| } |
| if err := edsB.waitForClose(); err != nil { |
| t.Fatal(err) |
| } |
| } |
| |
| // TestParseConfig exercises the config parsing functionality in the cds |
| // balancer builder. |
| func (s) TestParseConfig(t *testing.T) { |
| bb := cdsBB{} |
| if gotName := bb.Name(); gotName != cdsName { |
| t.Fatalf("cdsBB.Name() = %v, want %v", gotName, cdsName) |
| } |
| |
| tests := []struct { |
| name string |
| input json.RawMessage |
| wantCfg serviceconfig.LoadBalancingConfig |
| wantErr bool |
| }{ |
| { |
| name: "good-lb-config", |
| input: json.RawMessage(`{"Cluster": "cluster1"}`), |
| wantCfg: &lbConfig{ClusterName: clusterName}, |
| }, |
| { |
| name: "unknown-fields-in-lb-config", |
| input: json.RawMessage(`{"Unknown": "foobar"}`), |
| wantCfg: &lbConfig{ClusterName: ""}, |
| }, |
| { |
| name: "empty-lb-config", |
| input: json.RawMessage(""), |
| wantErr: true, |
| }, |
| } |
| |
| for _, test := range tests { |
| t.Run(test.name, func(t *testing.T) { |
| gotCfg, gotErr := bb.ParseConfig(test.input) |
| if (gotErr != nil) != test.wantErr { |
| t.Fatalf("bb.ParseConfig(%v) = %v, wantErr %v", string(test.input), gotErr, test.wantErr) |
| } |
| if !test.wantErr { |
| if !cmp.Equal(gotCfg, test.wantCfg) { |
| t.Fatalf("bb.ParseConfig(%v) = %v, want %v", string(test.input), gotCfg, test.wantCfg) |
| } |
| } |
| }) |
| } |
| } |