blob: a20cb0dc1ce4aeaf1c805f6b0a3013b4adcc994f [file] [log] [blame]
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package weightedtarget
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/balancergroup"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
const (
defaultTestTimeout = 5 * time.Second
)
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
type testConfigBalancerBuilder struct {
balancer.Builder
}
func newTestConfigBalancerBuilder() *testConfigBalancerBuilder {
return &testConfigBalancerBuilder{
Builder: balancer.Get(roundrobin.Name),
}
}
// pickAndCheckError returns a function which takes a picker, invokes the Pick() method
// multiple times and ensures that the error returned by the picker matches the provided error.
func pickAndCheckError(want error) func(balancer.Picker) error {
const rpcCount = 5
return func(p balancer.Picker) error {
for i := 0; i < rpcCount; i++ {
if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), want.Error()) {
return fmt.Errorf("picker.Pick() returned error: %v, want: %v", err, want)
}
}
return nil
}
}
func (t *testConfigBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
rr := t.Builder.Build(cc, opts)
return &testConfigBalancer{
Balancer: rr,
}
}
const testConfigBalancerName = "test_config_balancer"
func (t *testConfigBalancerBuilder) Name() string {
return testConfigBalancerName
}
type stringBalancerConfig struct {
serviceconfig.LoadBalancingConfig
configStr string
}
func (t *testConfigBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var cfg string
if err := json.Unmarshal(c, &cfg); err != nil {
return nil, fmt.Errorf("failed to unmarshal config in %q: %v", testConfigBalancerName, err)
}
return stringBalancerConfig{configStr: cfg}, nil
}
// testConfigBalancer is a roundrobin balancer, but it takes the balancer config
// string and adds it as an address attribute to the backend addresses.
type testConfigBalancer struct {
balancer.Balancer
}
// configKey is the type used as the key to store balancer config in the
// Attributes field of resolver.Address.
type configKey struct{}
func setConfigKey(addr resolver.Address, config string) resolver.Address {
addr.Attributes = addr.Attributes.WithValue(configKey{}, config)
return addr
}
func getConfigKey(attr *attributes.Attributes) (string, bool) {
v := attr.Value(configKey{})
name, ok := v.(string)
return name, ok
}
func (b *testConfigBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
c, ok := s.BalancerConfig.(stringBalancerConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type %T", s.BalancerConfig)
}
addrsWithAttr := make([]resolver.Address, len(s.ResolverState.Addresses))
for i, addr := range s.ResolverState.Addresses {
addrsWithAttr[i] = setConfigKey(addr, c.configStr)
}
s.BalancerConfig = nil
s.ResolverState.Addresses = addrsWithAttr
return b.Balancer.UpdateClientConnState(s)
}
func (b *testConfigBalancer) Close() {
b.Balancer.Close()
}
var (
wtbBuilder balancer.Builder
wtbParser balancer.ConfigParser
testBackendAddrStrs []string
)
const testBackendAddrsCount = 12
func init() {
balancer.Register(newTestConfigBalancerBuilder())
for i := 0; i < testBackendAddrsCount; i++ {
testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
}
wtbBuilder = balancer.Get(Name)
wtbParser = wtbBuilder.(balancer.ConfigParser)
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
NewRandomWRR = testutils.NewTestWRR
}
// TestWeightedTarget covers the cases that a sub-balancer is added and a
// sub-balancer is removed. It verifies that the addresses and balancer configs
// are forwarded to the right sub-balancer. This test is intended to test the
// glue code in weighted_target.
func (s) TestWeightedTarget(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
// Start with "cluster_1: round_robin".
config1, err := wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_1": {
"weight":1,
"childPolicy": [{"round_robin": ""}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and an address with hierarchy path ["cluster_1"].
addr1 := resolver.Address{Addr: testBackendAddrStrs[1], Attributes: nil}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr1, []string{"cluster_1"})}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
verifyAddressInNewSubConn(t, cc, addr1)
// Send subconn state change.
sc1 := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p := <-cc.NewPickerCh
// Test pick with one backend.
for i := 0; i < 5; i++ {
gotSCSt, _ := p.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
// Remove cluster_1, and add "cluster_2: test_config_balancer". The
// test_config_balancer adds an address attribute whose value is set to the
// config that is passed to it.
config2, err := wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_2": {
"weight":1,
"childPolicy": [{"test_config_balancer": "cluster_2"}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and one address with hierarchy path "cluster_2".
addr2 := resolver.Address{Addr: testBackendAddrStrs[2], Attributes: nil}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr2, []string{"cluster_2"})}},
BalancerConfig: config2,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Expect a new subConn from the test_config_balancer which has an address
// attribute set to the config that was passed to it.
verifyAddressInNewSubConn(t, cc, setConfigKey(addr2, "cluster_2"))
// The subconn for cluster_1 should be removed.
scRemoved := <-cc.RemoveSubConnCh
if !cmp.Equal(scRemoved, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
}
wtb.UpdateSubConnState(scRemoved, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
sc2 := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p = <-cc.NewPickerCh
// Test pick with one backend.
for i := 0; i < 5; i++ {
gotSCSt, _ := p.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2)
}
}
// Replace child policy of "cluster_1" to "round_robin".
config3, err := wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_2": {
"weight":1,
"childPolicy": [{"round_robin": ""}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and an address with hierarchy path ["cluster_2"].
addr3 := resolver.Address{Addr: testBackendAddrStrs[3], Attributes: nil}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr3, []string{"cluster_2"})}},
BalancerConfig: config3,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
verifyAddressInNewSubConn(t, cc, addr3)
// The subconn from the test_config_balancer should be removed.
scRemoved = <-cc.RemoveSubConnCh
if !cmp.Equal(scRemoved, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
}
wtb.UpdateSubConnState(scRemoved, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
// Send subconn state change.
sc3 := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p = <-cc.NewPickerCh
// Test pick with one backend.
for i := 0; i < 5; i++ {
gotSCSt, _ := p.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc3, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3)
}
}
}
// TestWeightedTarget_OneSubBalancer_AddRemoveBackend tests the case where we
// have a weighted target balancer will one sub-balancer, and we add and remove
// backends from the subBalancer.
func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
// Start with "cluster_1: round_robin".
config, err := wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_1": {
"weight":1,
"childPolicy": [{"round_robin": ""}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and an address with hierarchy path ["cluster_1"].
addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr1, []string{"cluster_1"})}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
verifyAddressInNewSubConn(t, cc, addr1)
// Expect one SubConn, and move it to READY.
sc1 := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p := <-cc.NewPickerCh
// Test pick with one backend.
for i := 0; i < 5; i++ {
gotSCSt, _ := p.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
// Send two addresses.
addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(addr1, []string{"cluster_1"}),
hierarchy.Set(addr2, []string{"cluster_1"}),
}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
verifyAddressInNewSubConn(t, cc, addr2)
// Expect one new SubConn, and move it to READY.
sc2 := <-cc.NewSubConnCh
// Update the SubConn to become READY.
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p = <-cc.NewPickerCh
// Test round robin pick.
want := []balancer.SubConn{sc1, sc2}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Remove the first address.
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr2, []string{"cluster_1"})}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Expect one SubConn to be removed.
scRemoved := <-cc.RemoveSubConnCh
if !cmp.Equal(scRemoved, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
}
wtb.UpdateSubConnState(scRemoved, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
p = <-cc.NewPickerCh
// Test pick with only the second SubConn.
for i := 0; i < 5; i++ {
gotSC, _ := p.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSC.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSC, sc2)
}
}
}
// TestWeightedTarget_TwoSubBalancers_OneBackend tests the case where we have a
// weighted target balancer with two sub-balancers, each with one backend.
func (s) TestWeightedTarget_TwoSubBalancers_OneBackend(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
// Start with "cluster_1: test_config_balancer, cluster_2: test_config_balancer".
config, err := wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_1": {
"weight":1,
"childPolicy": [{"test_config_balancer": "cluster_1"}]
},
"cluster_2": {
"weight":1,
"childPolicy": [{"test_config_balancer": "cluster_2"}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config with one address for each cluster.
addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(addr1, []string{"cluster_1"}),
hierarchy.Set(addr2, []string{"cluster_2"}),
}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
scs := waitForNewSubConns(t, cc, 2)
verifySubConnAddrs(t, scs, map[string][]resolver.Address{
"cluster_1": {addr1},
"cluster_2": {addr2},
})
// We expect a single subConn on each subBalancer.
sc1 := scs["cluster_1"][0].sc
sc2 := scs["cluster_2"][0].sc
// Send state changes for both SubConns, and wait for the picker.
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p := <-cc.NewPickerCh
// Test roundrobin on the last picker.
want := []balancer.SubConn{sc1, sc2}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
// TestWeightedTarget_TwoSubBalancers_MoreBackends tests the case where we have
// a weighted target balancer with two sub-balancers, each with more than one
// backend.
func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
// Start with "cluster_1: round_robin, cluster_2: round_robin".
config, err := wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_1": {
"weight":1,
"childPolicy": [{"test_config_balancer": "cluster_1"}]
},
"cluster_2": {
"weight":1,
"childPolicy": [{"test_config_balancer": "cluster_2"}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config with two backends for each cluster.
addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
addr3 := resolver.Address{Addr: testBackendAddrStrs[3]}
addr4 := resolver.Address{Addr: testBackendAddrStrs[4]}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(addr1, []string{"cluster_1"}),
hierarchy.Set(addr2, []string{"cluster_1"}),
hierarchy.Set(addr3, []string{"cluster_2"}),
hierarchy.Set(addr4, []string{"cluster_2"}),
}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
scs := waitForNewSubConns(t, cc, 4)
verifySubConnAddrs(t, scs, map[string][]resolver.Address{
"cluster_1": {addr1, addr2},
"cluster_2": {addr3, addr4},
})
// We expect two subConns on each subBalancer.
sc1 := scs["cluster_1"][0].sc
sc2 := scs["cluster_1"][1].sc
sc3 := scs["cluster_2"][0].sc
sc4 := scs["cluster_2"][1].sc
// Send state changes for all SubConns, and wait for the picker.
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p := <-cc.NewPickerCh
// Test roundrobin on the last picker. RPCs should be sent equally to all
// backends.
want := []balancer.SubConn{sc1, sc2, sc3, sc4}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Turn sc2's connection down, should be RR between balancers.
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
p = <-cc.NewPickerCh
want = []balancer.SubConn{sc1, sc1, sc3, sc4}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Remove subConn corresponding to addr3.
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(addr1, []string{"cluster_1"}),
hierarchy.Set(addr2, []string{"cluster_1"}),
hierarchy.Set(addr4, []string{"cluster_2"}),
}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
scRemoved := <-cc.RemoveSubConnCh
if !cmp.Equal(scRemoved, sc3, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc3, scRemoved)
}
wtb.UpdateSubConnState(scRemoved, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
p = <-cc.NewPickerCh
want = []balancer.SubConn{sc1, sc4}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Turn sc1's connection down.
wantSubConnErr := errors.New("subConn connection error")
wtb.UpdateSubConnState(sc1, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: wantSubConnErr,
})
p = <-cc.NewPickerCh
want = []balancer.SubConn{sc4}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Turn last connection to connecting.
wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
p = <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := p.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err)
}
}
// Turn all connections down.
wtb.UpdateSubConnState(sc4, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: wantSubConnErr,
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := cc.WaitForPicker(ctx, pickAndCheckError(wantSubConnErr)); err != nil {
t.Fatal(err)
}
}
// TestWeightedTarget_TwoSubBalancers_DifferentWeight_MoreBackends tests the
// case where we have a weighted target balancer with two sub-balancers of
// differing weights.
func (s) TestWeightedTarget_TwoSubBalancers_DifferentWeight_MoreBackends(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
// Start with two subBalancers, one with twice the weight of the other.
config, err := wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_1": {
"weight": 2,
"childPolicy": [{"test_config_balancer": "cluster_1"}]
},
"cluster_2": {
"weight": 1,
"childPolicy": [{"test_config_balancer": "cluster_2"}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config with two backends for each cluster.
addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
addr3 := resolver.Address{Addr: testBackendAddrStrs[3]}
addr4 := resolver.Address{Addr: testBackendAddrStrs[4]}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(addr1, []string{"cluster_1"}),
hierarchy.Set(addr2, []string{"cluster_1"}),
hierarchy.Set(addr3, []string{"cluster_2"}),
hierarchy.Set(addr4, []string{"cluster_2"}),
}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
scs := waitForNewSubConns(t, cc, 4)
verifySubConnAddrs(t, scs, map[string][]resolver.Address{
"cluster_1": {addr1, addr2},
"cluster_2": {addr3, addr4},
})
// We expect two subConns on each subBalancer.
sc1 := scs["cluster_1"][0].sc
sc2 := scs["cluster_1"][1].sc
sc3 := scs["cluster_2"][0].sc
sc4 := scs["cluster_2"][1].sc
// Send state changes for all SubConns, and wait for the picker.
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p := <-cc.NewPickerCh
// Test roundrobin on the last picker. Twice the number of RPCs should be
// sent to cluster_1 when compared to cluster_2.
want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
// TestWeightedTarget_ThreeSubBalancers_RemoveBalancer tests the case where we
// have a weighted target balancer with three sub-balancers and we remove one of
// the subBalancers.
func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
// Start with two subBalancers, one with twice the weight of the other.
config, err := wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_1": {
"weight": 1,
"childPolicy": [{"test_config_balancer": "cluster_1"}]
},
"cluster_2": {
"weight": 1,
"childPolicy": [{"test_config_balancer": "cluster_2"}]
},
"cluster_3": {
"weight": 1,
"childPolicy": [{"test_config_balancer": "cluster_3"}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config with one backend for each cluster.
addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
addr3 := resolver.Address{Addr: testBackendAddrStrs[3]}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(addr1, []string{"cluster_1"}),
hierarchy.Set(addr2, []string{"cluster_2"}),
hierarchy.Set(addr3, []string{"cluster_3"}),
}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
scs := waitForNewSubConns(t, cc, 3)
verifySubConnAddrs(t, scs, map[string][]resolver.Address{
"cluster_1": {addr1},
"cluster_2": {addr2},
"cluster_3": {addr3},
})
// We expect one subConn on each subBalancer.
sc1 := scs["cluster_1"][0].sc
sc2 := scs["cluster_2"][0].sc
sc3 := scs["cluster_3"][0].sc
// Send state changes for all SubConns, and wait for the picker.
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc2, sc3}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Remove the second balancer, while the others two are ready.
config, err = wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_1": {
"weight": 1,
"childPolicy": [{"test_config_balancer": "cluster_1"}]
},
"cluster_3": {
"weight": 1,
"childPolicy": [{"test_config_balancer": "cluster_3"}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(addr1, []string{"cluster_1"}),
hierarchy.Set(addr3, []string{"cluster_3"}),
}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Removing a subBalancer causes the weighted target LB policy to push a new
// picker which ensures that the removed subBalancer is not picked for RPCs.
p = <-cc.NewPickerCh
scRemoved := <-cc.RemoveSubConnCh
if !cmp.Equal(scRemoved, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scRemoved)
}
want = []balancer.SubConn{sc1, sc3}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Move balancer 3 into transient failure.
wantSubConnErr := errors.New("subConn connection error")
wtb.UpdateSubConnState(sc3, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: wantSubConnErr,
})
<-cc.NewPickerCh
// Remove the first balancer, while the third is transient failure.
config, err = wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_3": {
"weight": 1,
"childPolicy": [{"test_config_balancer": "cluster_3"}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(addr3, []string{"cluster_3"}),
}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Removing a subBalancer causes the weighted target LB policy to push a new
// picker which ensures that the removed subBalancer is not picked for RPCs.
scRemoved = <-cc.RemoveSubConnCh
if !cmp.Equal(scRemoved, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := cc.WaitForPicker(ctx, pickAndCheckError(wantSubConnErr)); err != nil {
t.Fatal(err)
}
}
// TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends tests the case
// where we have a weighted target balancer with two sub-balancers, and we
// change the weight of these subBalancers.
func (s) TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
// Start with two subBalancers, one with twice the weight of the other.
config, err := wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_1": {
"weight": 2,
"childPolicy": [{"test_config_balancer": "cluster_1"}]
},
"cluster_2": {
"weight": 1,
"childPolicy": [{"test_config_balancer": "cluster_2"}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config with two backends for each cluster.
addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
addr3 := resolver.Address{Addr: testBackendAddrStrs[3]}
addr4 := resolver.Address{Addr: testBackendAddrStrs[4]}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(addr1, []string{"cluster_1"}),
hierarchy.Set(addr2, []string{"cluster_1"}),
hierarchy.Set(addr3, []string{"cluster_2"}),
hierarchy.Set(addr4, []string{"cluster_2"}),
}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
scs := waitForNewSubConns(t, cc, 4)
verifySubConnAddrs(t, scs, map[string][]resolver.Address{
"cluster_1": {addr1, addr2},
"cluster_2": {addr3, addr4},
})
// We expect two subConns on each subBalancer.
sc1 := scs["cluster_1"][0].sc
sc2 := scs["cluster_1"][1].sc
sc3 := scs["cluster_2"][0].sc
sc4 := scs["cluster_2"][1].sc
// Send state changes for all SubConns, and wait for the picker.
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p := <-cc.NewPickerCh
// Test roundrobin on the last picker. Twice the number of RPCs should be
// sent to cluster_1 when compared to cluster_2.
want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Change the weight of cluster_1.
config, err = wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_1": {
"weight": 3,
"childPolicy": [{"test_config_balancer": "cluster_1"}]
},
"cluster_2": {
"weight": 1,
"childPolicy": [{"test_config_balancer": "cluster_2"}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(addr1, []string{"cluster_1"}),
hierarchy.Set(addr2, []string{"cluster_1"}),
hierarchy.Set(addr3, []string{"cluster_2"}),
hierarchy.Set(addr4, []string{"cluster_2"}),
}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Weight change causes a new picker to be pushed to the channel.
p = <-cc.NewPickerCh
want = []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc2, sc3, sc4}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
// TestWeightedTarget_InitOneSubBalancerTransientFailure tests that at init
// time, with two sub-balancers, if one sub-balancer reports transient_failure,
// the picks won't fail with transient_failure, and should instead wait for the
// other sub-balancer.
func (s) TestWeightedTarget_InitOneSubBalancerTransientFailure(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
// Start with "cluster_1: test_config_balancer, cluster_2: test_config_balancer".
config, err := wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_1": {
"weight":1,
"childPolicy": [{"test_config_balancer": "cluster_1"}]
},
"cluster_2": {
"weight":1,
"childPolicy": [{"test_config_balancer": "cluster_2"}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config with one address for each cluster.
addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(addr1, []string{"cluster_1"}),
hierarchy.Set(addr2, []string{"cluster_2"}),
}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
scs := waitForNewSubConns(t, cc, 2)
verifySubConnAddrs(t, scs, map[string][]resolver.Address{
"cluster_1": {addr1},
"cluster_2": {addr2},
})
// We expect a single subConn on each subBalancer.
sc1 := scs["cluster_1"][0].sc
_ = scs["cluster_2"][0].sc
// Set one subconn to TransientFailure, this will trigger one sub-balancer
// to report transient failure.
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
p := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
r, err := p.Pick(balancer.PickInfo{})
if err != balancer.ErrNoSubConnAvailable {
t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrNoSubConnAvailable, r, err)
}
}
}
// Test that with two sub-balancers, both in transient_failure, if one turns
// connecting, the overall state stays in transient_failure, and all picks
// return transient failure error.
func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
// Start with "cluster_1: test_config_balancer, cluster_2: test_config_balancer".
config, err := wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_1": {
"weight":1,
"childPolicy": [{"test_config_balancer": "cluster_1"}]
},
"cluster_2": {
"weight":1,
"childPolicy": [{"test_config_balancer": "cluster_2"}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config with one address for each cluster.
addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(addr1, []string{"cluster_1"}),
hierarchy.Set(addr2, []string{"cluster_2"}),
}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
scs := waitForNewSubConns(t, cc, 2)
verifySubConnAddrs(t, scs, map[string][]resolver.Address{
"cluster_1": {addr1},
"cluster_2": {addr2},
})
// We expect a single subConn on each subBalancer.
sc1 := scs["cluster_1"][0].sc
sc2 := scs["cluster_2"][0].sc
// Set both subconn to TransientFailure, this will put both sub-balancers in
// transient failure.
wantSubConnErr := errors.New("subConn connection error")
wtb.UpdateSubConnState(sc1, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: wantSubConnErr,
})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: wantSubConnErr,
})
p := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), wantSubConnErr.Error()) {
t.Fatalf("picker.Pick() returned error: %v, want: %v", err, wantSubConnErr)
}
}
// Set one subconn to Connecting, it shouldn't change the overall state.
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
select {
case <-time.After(100 * time.Millisecond):
case <-cc.NewPickerCh:
t.Fatal("received new picker from the LB policy when expecting none")
}
for i := 0; i < 5; i++ {
if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), wantSubConnErr.Error()) {
t.Fatalf("picker.Pick() returned error: %v, want: %v", err, wantSubConnErr)
}
}
}
// Verify that a SubConn is created with the expected address and hierarchy
// path cleared.
func verifyAddressInNewSubConn(t *testing.T, cc *testutils.TestClientConn, addr resolver.Address) {
t.Helper()
gotAddr := <-cc.NewSubConnAddrsCh
wantAddr := []resolver.Address{hierarchy.Set(addr, []string{})}
if diff := cmp.Diff(gotAddr, wantAddr, cmp.AllowUnexported(attributes.Attributes{})); diff != "" {
t.Fatalf("got unexpected new subconn addrs: %v", diff)
}
}
// subConnWithAddr wraps a subConn and the address for which it was created.
type subConnWithAddr struct {
sc balancer.SubConn
addr resolver.Address
}
// waitForNewSubConns waits for `num` number of subConns to be created. This is
// expected to be used from tests using the "test_config_balancer" LB policy,
// which adds an address attribute with value set to the balancer config.
//
// Returned value is a map from subBalancer (identified by its config) to
// subConns created by it.
func waitForNewSubConns(t *testing.T, cc *testutils.TestClientConn, num int) map[string][]subConnWithAddr {
t.Helper()
scs := make(map[string][]subConnWithAddr)
for i := 0; i < num; i++ {
addrs := <-cc.NewSubConnAddrsCh
if len(addrs) != 1 {
t.Fatalf("received subConns with %d addresses, want 1", len(addrs))
}
cfg, ok := getConfigKey(addrs[0].Attributes)
if !ok {
t.Fatalf("received subConn address %v contains no attribute for balancer config", addrs[0])
}
sc := <-cc.NewSubConnCh
scWithAddr := subConnWithAddr{sc: sc, addr: addrs[0]}
scs[cfg] = append(scs[cfg], scWithAddr)
}
return scs
}
func verifySubConnAddrs(t *testing.T, scs map[string][]subConnWithAddr, wantSubConnAddrs map[string][]resolver.Address) {
t.Helper()
if len(scs) != len(wantSubConnAddrs) {
t.Fatalf("got new subConns %+v, want %v", scs, wantSubConnAddrs)
}
for cfg, scsWithAddr := range scs {
if len(scsWithAddr) != len(wantSubConnAddrs[cfg]) {
t.Fatalf("got new subConns %+v, want %v", scs, wantSubConnAddrs)
}
wantAddrs := wantSubConnAddrs[cfg]
for i, scWithAddr := range scsWithAddr {
if diff := cmp.Diff(wantAddrs[i].Addr, scWithAddr.addr.Addr); diff != "" {
t.Fatalf("got unexpected new subconn addrs: %v", diff)
}
}
}
}
const initIdleBalancerName = "test-init-Idle-balancer"
var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
func init() {
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
err := fmt.Errorf("wrong picker error")
if state.ConnectivityState == connectivity.Idle {
err = errTestInitIdle
}
bd.ClientConn.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &testutils.TestConstPicker{Err: err},
})
},
})
}
// TestInitialIdle covers the case that if the child reports Idle, the overall
// state will be Idle.
func (s) TestInitialIdle(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
config, err := wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_1": {
"weight":1,
"childPolicy": [{"test-init-Idle-balancer": ""}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and an address with hierarchy path ["cluster_1"].
addrs := []resolver.Address{{Addr: testBackendAddrStrs[0], Attributes: nil}}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addrs[0], []string{"cds:cluster_1"})}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Verify that a subconn is created with the address, and the hierarchy path
// in the address is cleared.
for range addrs {
sc := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle})
}
if state := <-cc.NewStateCh; state != connectivity.Idle {
t.Fatalf("Received aggregated state: %v, want Idle", state)
}
}
// TestIgnoreSubBalancerStateTransitions covers the case that if the child reports a
// transition from TF to Connecting, the overall state will still be TF.
func (s) TestIgnoreSubBalancerStateTransitions(t *testing.T) {
cc := &tcc{TestClientConn: testutils.NewTestClientConn(t)}
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
config, err := wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_1": {
"weight":1,
"childPolicy": [{"round_robin": ""}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and an address with hierarchy path ["cluster_1"].
addr := resolver.Address{Addr: testBackendAddrStrs[0], Attributes: nil}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr, []string{"cluster_1"})}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
sc := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
wtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// Verify that the SubConnState update from TF to Connecting is ignored.
if len(cc.states) != 2 || cc.states[0].ConnectivityState != connectivity.Connecting || cc.states[1].ConnectivityState != connectivity.TransientFailure {
t.Fatalf("cc.states = %v; want [Connecting, TransientFailure]", cc.states)
}
}
// tcc wraps a testutils.TestClientConn but stores all state transitions in a
// slice.
type tcc struct {
*testutils.TestClientConn
states []balancer.State
}
func (t *tcc) UpdateState(bs balancer.State) {
t.states = append(t.states, bs)
t.TestClientConn.UpdateState(bs)
}
func (s) TestUpdateStatePauses(t *testing.T) {
cc := &tcc{TestClientConn: testutils.NewTestClientConn(t)}
balFuncs := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error {
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: nil})
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: nil})
return nil
},
}
stub.Register("update_state_balancer", balFuncs)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
config, err := wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_1": {
"weight":1,
"childPolicy": [{"update_state_balancer": ""}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and an address with hierarchy path ["cluster_1"].
addrs := []resolver.Address{{Addr: testBackendAddrStrs[0], Attributes: nil}}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addrs[0], []string{"cds:cluster_1"})}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Verify that the only state update is the second one called by the child.
if len(cc.states) != 1 || cc.states[0].ConnectivityState != connectivity.Ready {
t.Fatalf("cc.states = %v; want [connectivity.Ready]", cc.states)
}
}