| /* |
| * |
| * Copyright 2020 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package weightedtarget |
| |
| import ( |
| "encoding/json" |
| "fmt" |
| "testing" |
| "time" |
| |
| "github.com/google/go-cmp/cmp" |
| "google.golang.org/grpc/attributes" |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/balancer/roundrobin" |
| "google.golang.org/grpc/connectivity" |
| "google.golang.org/grpc/internal/hierarchy" |
| "google.golang.org/grpc/resolver" |
| "google.golang.org/grpc/serviceconfig" |
| "google.golang.org/grpc/xds/internal/balancer/balancergroup" |
| "google.golang.org/grpc/xds/internal/testutils" |
| ) |
| |
| type testConfigBalancerBuilder struct { |
| balancer.Builder |
| } |
| |
| func newTestConfigBalancerBuilder() *testConfigBalancerBuilder { |
| return &testConfigBalancerBuilder{ |
| Builder: balancer.Get(roundrobin.Name), |
| } |
| } |
| |
| func (t *testConfigBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { |
| rr := t.Builder.Build(cc, opts) |
| return &testConfigBalancer{ |
| Balancer: rr, |
| } |
| } |
| |
| const testConfigBalancerName = "test_config_balancer" |
| |
| func (t *testConfigBalancerBuilder) Name() string { |
| return testConfigBalancerName |
| } |
| |
| type stringBalancerConfig struct { |
| serviceconfig.LoadBalancingConfig |
| s string |
| } |
| |
| func (t *testConfigBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { |
| // Return string without quotes. |
| return stringBalancerConfig{s: string(c[1 : len(c)-1])}, nil |
| } |
| |
| // testConfigBalancer is a roundrobin balancer, but it takes the balancer config |
| // string and append it to the backend addresses. |
| type testConfigBalancer struct { |
| balancer.Balancer |
| } |
| |
| func (b *testConfigBalancer) UpdateClientConnState(s balancer.ClientConnState) error { |
| c, ok := s.BalancerConfig.(stringBalancerConfig) |
| if !ok { |
| return fmt.Errorf("unexpected balancer config with type %T", s.BalancerConfig) |
| } |
| oneMoreAddr := resolver.Address{Addr: c.s} |
| s.BalancerConfig = nil |
| s.ResolverState.Addresses = append(s.ResolverState.Addresses, oneMoreAddr) |
| return b.Balancer.UpdateClientConnState(s) |
| } |
| |
| func (b *testConfigBalancer) Close() { |
| b.Balancer.Close() |
| } |
| |
| var ( |
| wtbBuilder balancer.Builder |
| wtbParser balancer.ConfigParser |
| testBackendAddrStrs []string |
| ) |
| |
| const testBackendAddrsCount = 12 |
| |
| func init() { |
| balancer.Register(newTestConfigBalancerBuilder()) |
| for i := 0; i < testBackendAddrsCount; i++ { |
| testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i)) |
| } |
| wtbBuilder = balancer.Get(weightedTargetName) |
| wtbParser = wtbBuilder.(balancer.ConfigParser) |
| |
| balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond |
| } |
| |
| // TestWeightedTarget covers the cases that a sub-balancer is added and a |
| // sub-balancer is removed. It verifies that the addresses and balancer configs |
| // are forwarded to the right sub-balancer. |
| // |
| // This test is intended to test the glue code in weighted_target. Most of the |
| // functionality tests are covered by the balancer group tests. |
| func TestWeightedTarget(t *testing.T) { |
| cc := testutils.NewTestClientConn(t) |
| wtb := wtbBuilder.Build(cc, balancer.BuildOptions{}) |
| |
| // Start with "cluster_1: round_robin". |
| config1, err := wtbParser.ParseConfig([]byte(`{"targets":{"cluster_1":{"weight":1,"childPolicy":[{"round_robin":""}]}}}`)) |
| if err != nil { |
| t.Fatalf("failed to parse balancer config: %v", err) |
| } |
| |
| // Send the config, and an address with hierarchy path ["cluster_1"]. |
| wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0], Attributes: nil} |
| if err := wtb.UpdateClientConnState(balancer.ClientConnState{ |
| ResolverState: resolver.State{Addresses: []resolver.Address{ |
| hierarchy.Set(wantAddr1, []string{"cluster_1"}), |
| }}, |
| BalancerConfig: config1, |
| }); err != nil { |
| t.Fatalf("failed to update ClientConn state: %v", err) |
| } |
| |
| // Verify that a subconn is created with the address, and the hierarchy path |
| // in the address is cleared. |
| addr1 := <-cc.NewSubConnAddrsCh |
| if want := []resolver.Address{ |
| hierarchy.Set(wantAddr1, []string{}), |
| }; !cmp.Equal(addr1, want, cmp.AllowUnexported(attributes.Attributes{})) { |
| t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr1, want, cmp.AllowUnexported(attributes.Attributes{}))) |
| } |
| |
| // Send subconn state change. |
| sc1 := <-cc.NewSubConnCh |
| wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) |
| wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) |
| |
| // Test pick with one backend. |
| p1 := <-cc.NewPickerCh |
| for i := 0; i < 5; i++ { |
| gotSCSt, _ := p1.Pick(balancer.PickInfo{}) |
| if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { |
| t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) |
| } |
| } |
| |
| // Remove cluster_1, and add "cluster_2: test_config_balancer". |
| wantAddr3Str := testBackendAddrStrs[2] |
| config2, err := wtbParser.ParseConfig([]byte( |
| fmt.Sprintf(`{"targets":{"cluster_2":{"weight":1,"childPolicy":[{%q:%q}]}}}`, testConfigBalancerName, wantAddr3Str), |
| )) |
| if err != nil { |
| t.Fatalf("failed to parse balancer config: %v", err) |
| } |
| |
| // Send the config, and one address with hierarchy path "cluster_2". |
| wantAddr2 := resolver.Address{Addr: testBackendAddrStrs[1], Attributes: nil} |
| if err := wtb.UpdateClientConnState(balancer.ClientConnState{ |
| ResolverState: resolver.State{Addresses: []resolver.Address{ |
| hierarchy.Set(wantAddr2, []string{"cluster_2"}), |
| }}, |
| BalancerConfig: config2, |
| }); err != nil { |
| t.Fatalf("failed to update ClientConn state: %v", err) |
| } |
| |
| // Expect the address sent in the address list. The hierarchy path should be |
| // cleared. |
| addr2 := <-cc.NewSubConnAddrsCh |
| if want := []resolver.Address{ |
| hierarchy.Set(wantAddr2, []string{}), |
| }; !cmp.Equal(addr2, want, cmp.AllowUnexported(attributes.Attributes{})) { |
| t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr2, want, cmp.AllowUnexported(attributes.Attributes{}))) |
| } |
| // Expect the other address sent as balancer config. This address doesn't |
| // have hierarchy path. |
| wantAddr3 := resolver.Address{Addr: wantAddr3Str, Attributes: nil} |
| addr3 := <-cc.NewSubConnAddrsCh |
| if want := []resolver.Address{wantAddr3}; !cmp.Equal(addr3, want, cmp.AllowUnexported(attributes.Attributes{})) { |
| t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr3, want, cmp.AllowUnexported(attributes.Attributes{}))) |
| } |
| |
| // The subconn for cluster_1 should be removed. |
| scToRemove := <-cc.RemoveSubConnCh |
| if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { |
| t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) |
| } |
| wtb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) |
| |
| sc2 := <-cc.NewSubConnCh |
| wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) |
| wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) |
| sc3 := <-cc.NewSubConnCh |
| wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) |
| wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready}) |
| |
| // Test roundrobin pick with backends in cluster_2. |
| p2 := <-cc.NewPickerCh |
| want := []balancer.SubConn{sc2, sc3} |
| if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil { |
| t.Fatalf("want %v, got %v", want, err) |
| } |
| } |
| |
| func subConnFromPicker(p balancer.Picker) func() balancer.SubConn { |
| return func() balancer.SubConn { |
| scst, _ := p.Pick(balancer.PickInfo{}) |
| return scst.SubConn |
| } |
| } |