/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edsbalancer
import (
"fmt"
"reflect"
"sort"
"testing"
"time"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/testutils"
)
var (
testClusterNames = []string{"test-cluster-1", "test-cluster-2"}
testSubZones = []string{"I", "II", "III", "IV"}
testEndpointAddrs []string
)
const testBackendAddrsCount = 12
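// init populates testEndpointAddrs with fake backend addresses for the tests
// and shortens balancergroup.DefaultSubBalancerCloseTimeout so that removed
// sub-balancers are closed almost immediately instead of lingering.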
func init() {
for i := 0; i < testBackendAddrsCount; i++ {
testEndpointAddrs = append(testEndpointAddrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
}
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
}
// One locality
// - add backend
// - remove backend
// - replace backend
// - change drop rate
func (s) TestEDS_OneLocality(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
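// Child balancer state updates are applied synchronously in this test by
// pointing enqueueChildBalancerStateUpdate directly at updateState; the real
// implementation enqueues them asynchronously.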
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
// One locality with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
sc1 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc1, connectivity.Connecting)
edsb.handleSubConnStateChange(sc1, connectivity.Ready)
// Pick with only the first backend.
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
// The same locality, add one more backend.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab2.Build()))
sc2 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc2, connectivity.Connecting)
edsb.handleSubConnStateChange(sc2, connectivity.Ready)
// Test roundrobin with two subconns.
p2 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc2}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// The same locality, delete first backend.
clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[1:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab3.Build()))
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
edsb.handleSubConnStateChange(scToRemove, connectivity.Shutdown)
// Test pick with only the second subconn.
p3 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p3.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2)
}
}
// The same locality, replace backend.
clab4 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab4.Build()))
sc3 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc3, connectivity.Connecting)
edsb.handleSubConnStateChange(sc3, connectivity.Ready)
scToRemove = <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove)
}
edsb.handleSubConnStateChange(scToRemove, connectivity.Shutdown)
// Test pick with only the third subconn.
p4 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p4.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc3, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3)
}
}
// The same locality, different drop rate, dropping 50%.
clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50})
clab5.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab5.Build()))
// Picks with drops.
p5 := <-cc.NewPickerCh
for i := 0; i < 100; i++ {
_, err := p5.Pick(balancer.PickInfo{})
// TODO: the dropping algorithm needs a design. When the dropping algorithm
// is fixed, this test will also need to be updated.
if i < 50 && err == nil {
t.Errorf("The first 50%% picks should be drops, got error <nil>")
} else if i > 50 && err != nil {
t.Errorf("The second 50%% picks should be non-drops, got error %v", err)
}
}
// The same locality, remove drops.
clab6 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab6.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab6.Build()))
// Pick without drops.
p6 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p6.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc3, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3)
}
}
}
// Two localities
// - start with two localities
// - add locality
// - remove locality
// - address change for the <not-the-first> locality
// - update locality weight
func (s) TestEDS_TwoLocalities(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
// Two localities, each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
sc1 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc1, connectivity.Connecting)
edsb.handleSubConnStateChange(sc1, connectivity.Ready)
// Add the second locality later to make sure sc2 belongs to the second
// locality. Otherwise the test is flaky because a map is used in EDS to
// keep localities, so the iteration order is not deterministic.
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
sc2 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc2, connectivity.Connecting)
edsb.handleSubConnStateChange(sc2, connectivity.Ready)
// Test roundrobin with two subconns.
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc2}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Add another locality, with one backend.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab2.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab2.Build()))
sc3 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc3, connectivity.Connecting)
edsb.handleSubConnStateChange(sc3, connectivity.Ready)
// Test roundrobin with three subconns.
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{sc1, sc2, sc3}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Remove first locality.
clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab3.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab3.Build()))
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
edsb.handleSubConnStateChange(scToRemove, connectivity.Shutdown)
// Test pick with two subconns (without the first one).
p3 := <-cc.NewPickerCh
want = []balancer.SubConn{sc2, sc3}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p3)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Add a backend to the last locality.
clab4 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab4.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab4.Build()))
sc4 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc4, connectivity.Connecting)
edsb.handleSubConnStateChange(sc4, connectivity.Ready)
// Test round-robin pick with the three subconns.
p4 := <-cc.NewPickerCh
// Locality-1 will be picked twice, and locality-2 will be picked twice.
// Locality-1 contains only sc2, locality-2 contains sc3 and sc4. So expect
// two sc2's and sc3, sc4.
want = []balancer.SubConn{sc2, sc2, sc3, sc4}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p4)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Change the weight of locality[1].
clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab5.AddLocality(testSubZones[1], 2, 0, testEndpointAddrs[1:2], nil)
clab5.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab5.Build()))
// Test pick with the two localities having different weights.
p5 := <-cc.NewPickerCh
// Locality-1 will be picked four times, and locality-2 will be picked twice
// (weight 2 and 1). Locality-1 contains only sc2, locality-2 contains sc3 and
// sc4. So expect four sc2's and sc3, sc4.
want = []balancer.SubConn{sc2, sc2, sc2, sc2, sc3, sc4}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p5)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Change the weight of locality[1] to 0; it should never be picked.
clab6 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab6.AddLocality(testSubZones[1], 0, 0, testEndpointAddrs[1:2], nil)
clab6.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab6.Build()))
// Changing the weight of locality[1] to 0 causes it to be removed. Its
// subconn should also be removed.
//
// NOTE: this is because a locality with weight 0 is treated the same as a
// locality that doesn't exist. If this changes in the future, this
// removeSubConn behavior will also change.
scToRemove2 := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove2, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove2)
}
// Test pick after the weight of locality[1] is changed to 0.
p6 := <-cc.NewPickerCh
// Locality-1 will not be picked, and locality-2 will be picked.
// Locality-2 contains sc3 and sc4. So expect sc3, sc4.
want = []balancer.SubConn{sc3, sc4}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p6)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
// The EDS balancer gets an EDS response with unhealthy endpoints. Test that
// only the healthy ones are used.
func (s) TestEDS_EndpointsHealth(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
// Two localities, each with 6 backends, one for each health status: HEALTHY,
// UNHEALTHY, UNKNOWN, DRAINING, TIMEOUT and DEGRADED.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:6], &testutils.AddLocalityOptions{
Health: []corepb.HealthStatus{
corepb.HealthStatus_HEALTHY,
corepb.HealthStatus_UNHEALTHY,
corepb.HealthStatus_UNKNOWN,
corepb.HealthStatus_DRAINING,
corepb.HealthStatus_TIMEOUT,
corepb.HealthStatus_DEGRADED,
},
})
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[6:12], &testutils.AddLocalityOptions{
Health: []corepb.HealthStatus{
corepb.HealthStatus_HEALTHY,
corepb.HealthStatus_UNHEALTHY,
corepb.HealthStatus_UNKNOWN,
corepb.HealthStatus_DRAINING,
corepb.HealthStatus_TIMEOUT,
corepb.HealthStatus_DEGRADED,
},
})
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
var (
readySCs []balancer.SubConn
newSubConnAddrStrs []string
)
for i := 0; i < 4; i++ {
addr := <-cc.NewSubConnAddrsCh
newSubConnAddrStrs = append(newSubConnAddrStrs, addr[0].Addr)
sc := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc, connectivity.Connecting)
edsb.handleSubConnStateChange(sc, connectivity.Ready)
readySCs = append(readySCs, sc)
}
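// Only endpoints with HEALTHY or UNKNOWN health status should be used, so a
// SubConn is expected only for the first and the third endpoint of each
// locality.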
wantNewSubConnAddrStrs := []string{
testEndpointAddrs[0],
testEndpointAddrs[2],
testEndpointAddrs[6],
testEndpointAddrs[8],
}
sortStrTrans := cmp.Transformer("Sort", func(in []string) []string {
out := append([]string(nil), in...) // Copy input to avoid mutating it.
sort.Strings(out)
return out
})
if !cmp.Equal(newSubConnAddrStrs, wantNewSubConnAddrStrs, sortStrTrans) {
t.Fatalf("want newSubConn with address %v, got %v", wantNewSubConnAddrStrs, newSubConnAddrStrs)
}
// There should be exactly 4 new SubConns. Check to make sure no more
// subconns are being created.
select {
case <-cc.NewSubConnCh:
t.Fatalf("Got unexpected new subconn")
case <-time.After(time.Microsecond * 100):
}
// Test roundrobin with the subconns.
p1 := <-cc.NewPickerCh
want := readySCs
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
func (s) TestClose(t *testing.T) {
edsb := newEDSBalancerImpl(nil, nil, nil, nil)
// This is what could happen when switching between fallback and eds. This
// makes sure it doesn't panic.
edsb.close()
}
// TestEDS_EmptyUpdate covers the case where the eds impl receives an empty update.
//
// It should send an error picker with transient failure to the parent.
func (s) TestEDS_EmptyUpdate(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
// The first update is an empty update.
edsb.handleEDSResponse(xdsclient.EndpointsUpdate{})
// Pick should fail with a transient failure and the all-priorities-removed error.
perr0 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
_, err := perr0.Pick(balancer.PickInfo{})
if !reflect.DeepEqual(err, errAllPrioritiesRemoved) {
t.Fatalf("picker.Pick, got error %v, want error %v", err, errAllPrioritiesRemoved)
}
}
// One locality with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
sc1 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc1, connectivity.Connecting)
edsb.handleSubConnStateChange(sc1, connectivity.Ready)
// Pick with only the first backend.
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !reflect.DeepEqual(gotSCSt.SubConn, sc1) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
edsb.handleEDSResponse(xdsclient.EndpointsUpdate{})
// Pick should fail with a transient failure and the all-priorities-removed error.
perr1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
_, err := perr1.Pick(balancer.PickInfo{})
if !reflect.DeepEqual(err, errAllPrioritiesRemoved) {
t.Fatalf("picker.Pick, got error %v, want error %v", err, errAllPrioritiesRemoved)
}
}
// Handle another update with priorities and localities.
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
sc2 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc2, connectivity.Connecting)
edsb.handleSubConnStateChange(sc2, connectivity.Ready)
// Pick with only the first backend.
p2 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p2.Pick(balancer.PickInfo{})
if !reflect.DeepEqual(gotSCSt.SubConn, sc2) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2)
}
}
}
// Create the xds balancer, and update the sub-balancer before handling any eds
// responses. Then switch between round-robin and test-const-balancer after
// handling the first eds response.
func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
t.Logf("update sub-balancer to test-const-balancer")
edsb.handleChildPolicy("test-const-balancer", nil)
// Two localities, each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
for i := 0; i < 2; i++ {
sc := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc, connectivity.Ready)
}
p0 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
_, err := p0.Pick(balancer.PickInfo{})
if err != testutils.ErrTestConstPicker {
t.Fatalf("picker.Pick, got err %+v, want err %+v", err, testutils.ErrTestConstPicker)
}
}
t.Logf("update sub-balancer to round-robin")
edsb.handleChildPolicy(roundrobin.Name, nil)
for i := 0; i < 2; i++ {
<-cc.RemoveSubConnCh
}
sc1 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc1, connectivity.Connecting)
edsb.handleSubConnStateChange(sc1, connectivity.Ready)
sc2 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc2, connectivity.Connecting)
edsb.handleSubConnStateChange(sc2, connectivity.Ready)
// Test roundrobin with two subconns.
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc2}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
t.Logf("update sub-balancer to test-const-balancer")
edsb.handleChildPolicy("test-const-balancer", nil)
for i := 0; i < 2; i++ {
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) &&
!cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want (%v or %v), got %v", sc1, sc2, scToRemove)
}
edsb.handleSubConnStateChange(scToRemove, connectivity.Shutdown)
}
for i := 0; i < 2; i++ {
sc := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc, connectivity.Ready)
}
p2 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
_, err := p2.Pick(balancer.PickInfo{})
if err != testutils.ErrTestConstPicker {
t.Fatalf("picker.Pick, got err %q, want err %q", err, testutils.ErrTestConstPicker)
}
}
t.Logf("update sub-balancer to round-robin")
edsb.handleChildPolicy(roundrobin.Name, nil)
for i := 0; i < 2; i++ {
<-cc.RemoveSubConnCh
}
sc3 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc3, connectivity.Connecting)
edsb.handleSubConnStateChange(sc3, connectivity.Ready)
sc4 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc4, connectivity.Connecting)
edsb.handleSubConnStateChange(sc4, connectivity.Ready)
p3 := <-cc.NewPickerCh
want = []balancer.SubConn{sc3, sc4}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p3)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
func init() {
balancer.Register(&testInlineUpdateBalancerBuilder{})
}
// A test balancer that updates balancer.State inline when handling ClientConn
// state.
type testInlineUpdateBalancerBuilder struct{}
func (*testInlineUpdateBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &testInlineUpdateBalancer{cc: cc}
}
func (*testInlineUpdateBalancerBuilder) Name() string {
return "test-inline-update-balancer"
}
type testInlineUpdateBalancer struct {
cc balancer.ClientConn
}
func (tb *testInlineUpdateBalancer) ResolverError(error) {
panic("not implemented")
}
func (tb *testInlineUpdateBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
}
var errTestInlineStateUpdate = fmt.Errorf("don't like addresses, empty or not")
func (tb *testInlineUpdateBalancer) UpdateClientConnState(balancer.ClientConnState) error {
tb.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &testutils.TestConstPicker{Err: errTestInlineStateUpdate},
})
return nil
}
func (*testInlineUpdateBalancer) Close() {
}
// When the child policy updates the picker inline in a handleClientUpdate call
// (e.g., roundrobin handling empty addresses), there could be a deadlock caused
// by acquiring a locked mutex.
func (s) TestEDS_ChildPolicyUpdatePickerInline(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = func(p priorityType, state balancer.State) {
// For this test, enqueue needs to happen asynchronously (like in the
// real implementation).
go edsb.updateState(p, state)
}
edsb.handleChildPolicy("test-inline-update-balancer", nil)
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
p0 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
_, err := p0.Pick(balancer.PickInfo{})
if err != errTestInlineStateUpdate {
t.Fatalf("picker.Pick, got err %q, want err %q", err, errTestInlineStateUpdate)
}
}
}
func (s) TestDropPicker(t *testing.T) {
const pickCount = 12
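// constPicker always returns the same test SubConn, so any error returned by
// Pick in this test can only come from the drop logic added by newDropPicker.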
var constPicker = &testutils.TestConstPicker{
SC: testutils.TestSubConns[0],
}
tests := []struct {
name string
drops []*dropper
}{
{
name: "no drop",
drops: nil,
},
{
name: "one drop",
drops: []*dropper{
newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 2}),
},
},
{
name: "two drops",
drops: []*dropper{
newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 3}),
newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 2}),
},
},
{
name: "three drops",
drops: []*dropper{
newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 3}),
newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 4}),
newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 2}),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := newDropPicker(constPicker, tt.drops, nil)
// scCount is the number of SubConns returned by Pick, i.e. the number of
// picks that were not dropped.
var (
scCount int
wantCount = pickCount
)
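// Each dropper drops Numerator/Denominator of the picks it sees, so the
// expected number of successful picks is pickCount scaled by the product of
// the pass-through ratios. E.g. with drops of 1/3 and 1/2, wantCount is
// 12 * 2/3 * 1/2 = 4.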
for _, dp := range tt.drops {
wantCount = wantCount * int(dp.c.Denominator-dp.c.Numerator) / int(dp.c.Denominator)
}
for i := 0; i < pickCount; i++ {
_, err := p.Pick(balancer.PickInfo{})
if err == nil {
scCount++
}
}
if scCount != (wantCount) {
t.Errorf("drops: %+v, scCount %v, wantCount %v", tt.drops, scCount, wantCount)
}
})
}
}
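// loadStoreWrapper is a dummy xdsClientInterface implementation: it embeds the
// interface (leaving every other method unimplemented) and only overrides
// LoadStore to return the test's load.Store.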
type loadStoreWrapper struct {
xdsClientInterface
ls *load.Store
}
func (l *loadStoreWrapper) LoadStore() *load.Store {
return l.ls
}
func (s) TestEDS_LoadReport(t *testing.T) {
// We create an xdsClientWrapper with a dummy xdsClientInterface which only
// implements the LoadStore() method to return the underlying load.Store to
// be used.
loadStore := &load.Store{}
cw := &xdsClientWrapper{
xdsClient: &loadStoreWrapper{ls: loadStore},
}
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, cw, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
backendToBalancerID := make(map[balancer.SubConn]internal.LocalityID)
// Two localities, each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
sc1 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc1, connectivity.Connecting)
edsb.handleSubConnStateChange(sc1, connectivity.Ready)
locality1 := internal.LocalityID{SubZone: testSubZones[0]}
backendToBalancerID[sc1] = locality1
// Add the second locality later to make sure sc2 belongs to the second
// locality. Otherwise the test is flaky because a map is used in EDS to
// keep localities, so the iteration order is not deterministic.
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
sc2 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc2, connectivity.Connecting)
edsb.handleSubConnStateChange(sc2, connectivity.Ready)
locality2 := internal.LocalityID{SubZone: testSubZones[1]}
backendToBalancerID[sc2] = locality2
// Test roundrobin with two subconns.
p1 := <-cc.NewPickerCh
// We expect the 10 picks to be split between the localities since they are
// of equal weight. And since we only mark the picks routed to sc2 as done,
// the picks on sc1 should show up as inProgress.
wantStoreData := &load.Data{
LocalityStats: map[string]load.LocalityData{
locality1.String(): {RequestStats: load.RequestData{InProgress: 5}},
locality2.String(): {RequestStats: load.RequestData{Succeeded: 5}},
},
}
for i := 0; i < 10; i++ {
scst, _ := p1.Pick(balancer.PickInfo{})
if scst.Done != nil && scst.SubConn != sc1 {
scst.Done(balancer.DoneInfo{})
}
}
gotStoreData := loadStore.Stats()
if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
}
}