blob: 7f9e566ca5b5063533aba07326ca2097bd944fe9 [file] [log] [blame]
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package weightedtarget
import (
"encoding/json"
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/testutils"
)
type testConfigBalancerBuilder struct {
balancer.Builder
}
func newTestConfigBalancerBuilder() *testConfigBalancerBuilder {
return &testConfigBalancerBuilder{
Builder: balancer.Get(roundrobin.Name),
}
}
func (t *testConfigBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
rr := t.Builder.Build(cc, opts)
return &testConfigBalancer{
Balancer: rr,
}
}
const testConfigBalancerName = "test_config_balancer"
func (t *testConfigBalancerBuilder) Name() string {
return testConfigBalancerName
}
type stringBalancerConfig struct {
serviceconfig.LoadBalancingConfig
s string
}
func (t *testConfigBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
// Return string without quotes.
return stringBalancerConfig{s: string(c[1 : len(c)-1])}, nil
}
// testConfigBalancer is a roundrobin balancer, but it takes the balancer config
// string and append it to the backend addresses.
type testConfigBalancer struct {
balancer.Balancer
}
func (b *testConfigBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
c, ok := s.BalancerConfig.(stringBalancerConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type %T", s.BalancerConfig)
}
oneMoreAddr := resolver.Address{Addr: c.s}
s.BalancerConfig = nil
s.ResolverState.Addresses = append(s.ResolverState.Addresses, oneMoreAddr)
return b.Balancer.UpdateClientConnState(s)
}
func (b *testConfigBalancer) Close() {
b.Balancer.Close()
}
var (
wtbBuilder balancer.Builder
wtbParser balancer.ConfigParser
testBackendAddrStrs []string
)
const testBackendAddrsCount = 12
func init() {
balancer.Register(newTestConfigBalancerBuilder())
for i := 0; i < testBackendAddrsCount; i++ {
testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
}
wtbBuilder = balancer.Get(weightedTargetName)
wtbParser = wtbBuilder.(balancer.ConfigParser)
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
}
// TestWeightedTarget covers the cases that a sub-balancer is added and a
// sub-balancer is removed. It verifies that the addresses and balancer configs
// are forwarded to the right sub-balancer.
//
// This test is intended to test the glue code in weighted_target. Most of the
// functionality tests are covered by the balancer group tests.
func TestWeightedTarget(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
// Start with "cluster_1: round_robin".
config1, err := wtbParser.ParseConfig([]byte(`{"targets":{"cluster_1":{"weight":1,"childPolicy":[{"round_robin":""}]}}}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and an address with hierarchy path ["cluster_1"].
wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0], Attributes: nil}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddr1, []string{"cluster_1"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Verify that a subconn is created with the address, and the hierarchy path
// in the address is cleared.
addr1 := <-cc.NewSubConnAddrsCh
if want := []resolver.Address{
hierarchy.Set(wantAddr1, []string{}),
}; !cmp.Equal(addr1, want, cmp.AllowUnexported(attributes.Attributes{})) {
t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr1, want, cmp.AllowUnexported(attributes.Attributes{})))
}
// Send subconn state change.
sc1 := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
// Remove cluster_1, and add "cluster_2: test_config_balancer".
wantAddr3Str := testBackendAddrStrs[2]
config2, err := wtbParser.ParseConfig([]byte(
fmt.Sprintf(`{"targets":{"cluster_2":{"weight":1,"childPolicy":[{%q:%q}]}}}`, testConfigBalancerName, wantAddr3Str),
))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and one address with hierarchy path "cluster_2".
wantAddr2 := resolver.Address{Addr: testBackendAddrStrs[1], Attributes: nil}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddr2, []string{"cluster_2"}),
}},
BalancerConfig: config2,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Expect the address sent in the address list. The hierarchy path should be
// cleared.
addr2 := <-cc.NewSubConnAddrsCh
if want := []resolver.Address{
hierarchy.Set(wantAddr2, []string{}),
}; !cmp.Equal(addr2, want, cmp.AllowUnexported(attributes.Attributes{})) {
t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr2, want, cmp.AllowUnexported(attributes.Attributes{})))
}
// Expect the other address sent as balancer config. This address doesn't
// have hierarchy path.
wantAddr3 := resolver.Address{Addr: wantAddr3Str, Attributes: nil}
addr3 := <-cc.NewSubConnAddrsCh
if want := []resolver.Address{wantAddr3}; !cmp.Equal(addr3, want, cmp.AllowUnexported(attributes.Attributes{})) {
t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr3, want, cmp.AllowUnexported(attributes.Attributes{})))
}
// The subconn for cluster_1 should be removed.
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
wtb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
sc2 := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc3 := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin pick with backends in cluster_2.
p2 := <-cc.NewPickerCh
want := []balancer.SubConn{sc2, sc3}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
return func() balancer.SubConn {
scst, _ := p.Pick(balancer.PickInfo{})
return scst.SubConn
}
}