blob: bbc1835a78c391066c9dc5735e2113971cdbc35f [file] [log] [blame]
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds
import (
"encoding/json"
"reflect"
"sync"
"testing"
"time"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/balancer"
discoverypb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/discovery"
edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds"
"google.golang.org/grpc/balancer/xds/lrs"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/resolver"
)
var lbABuilder = &balancerABuilder{}
func init() {
balancer.Register(lbABuilder)
balancer.Register(&balancerBBuilder{})
}
type s struct{}
func (s) Teardown(t *testing.T) {
leakcheck.Check(t)
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
const (
fakeBalancerA = "fake_balancer_A"
fakeBalancerB = "fake_balancer_B"
fakeBalancerC = "fake_balancer_C"
)
var (
testBalancerNameFooBar = "foo.bar"
testLBConfigFooBar = &xdsConfig{
BalancerName: testBalancerNameFooBar,
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
}
specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"}
specialAddrForBalancerB = resolver.Address{Addr: "this.is.balancer.B"}
// mu protects the access of latestFakeEdsBalancer
mu sync.Mutex
latestFakeEdsBalancer *fakeEDSBalancer
)
type balancerABuilder struct {
mu sync.Mutex
lastBalancer *balancerA
}
func (b *balancerABuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
b.mu.Lock()
b.lastBalancer = &balancerA{cc: cc, subconnStateChange: make(chan *scStateChange, 10)}
b.mu.Unlock()
return b.lastBalancer
}
func (b *balancerABuilder) Name() string {
return string(fakeBalancerA)
}
func (b *balancerABuilder) getLastBalancer() *balancerA {
b.mu.Lock()
defer b.mu.Unlock()
return b.lastBalancer
}
func (b *balancerABuilder) clearLastBalancer() {
b.mu.Lock()
defer b.mu.Unlock()
b.lastBalancer = nil
}
type balancerBBuilder struct{}
func (b *balancerBBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &balancerB{cc: cc}
}
func (*balancerBBuilder) Name() string {
return string(fakeBalancerB)
}
type balancerA struct {
cc balancer.ClientConn
subconnStateChange chan *scStateChange
}
func (b *balancerA) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
b.subconnStateChange <- &scStateChange{sc: sc, state: state}
}
func (b *balancerA) HandleResolvedAddrs(addrs []resolver.Address, err error) {
_, _ = b.cc.NewSubConn(append(addrs, specialAddrForBalancerA), balancer.NewSubConnOptions{})
}
func (b *balancerA) Close() {}
type balancerB struct {
cc balancer.ClientConn
}
func (balancerB) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
panic("implement me")
}
func (b *balancerB) HandleResolvedAddrs(addrs []resolver.Address, err error) {
_, _ = b.cc.NewSubConn(append(addrs, specialAddrForBalancerB), balancer.NewSubConnOptions{})
}
func (balancerB) Close() {}
func newTestClientConn() *testClientConn {
return &testClientConn{
newSubConns: make(chan []resolver.Address, 10),
}
}
type testClientConn struct {
newSubConns chan []resolver.Address
}
func (t *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
t.newSubConns <- addrs
return nil, nil
}
func (testClientConn) RemoveSubConn(balancer.SubConn) {
}
func (testClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
}
func (testClientConn) ResolveNow(resolver.ResolveNowOption) {}
func (testClientConn) Target() string {
return testServiceName
}
type scStateChange struct {
sc balancer.SubConn
state connectivity.State
}
type fakeEDSBalancer struct {
cc balancer.ClientConn
edsChan chan *edspb.ClusterLoadAssignment
childPolicy chan *loadBalancingConfig
fallbackPolicy chan *loadBalancingConfig
subconnStateChange chan *scStateChange
loadStore lrs.Store
}
func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
f.subconnStateChange <- &scStateChange{sc: sc, state: state}
}
func (f *fakeEDSBalancer) Close() {
mu.Lock()
defer mu.Unlock()
latestFakeEdsBalancer = nil
}
func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *edspb.ClusterLoadAssignment) {
f.edsChan <- edsResp
}
func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) {
f.childPolicy <- &loadBalancingConfig{
Name: name,
Config: config,
}
}
func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface {
lb := &fakeEDSBalancer{
cc: cc,
edsChan: make(chan *edspb.ClusterLoadAssignment, 10),
childPolicy: make(chan *loadBalancingConfig, 10),
fallbackPolicy: make(chan *loadBalancingConfig, 10),
subconnStateChange: make(chan *scStateChange, 10),
loadStore: loadStore,
}
mu.Lock()
latestFakeEdsBalancer = lb
mu.Unlock()
return lb
}
func getLatestEdsBalancer() *fakeEDSBalancer {
mu.Lock()
defer mu.Unlock()
return latestFakeEdsBalancer
}
type fakeSubConn struct{}
func (*fakeSubConn) UpdateAddresses([]resolver.Address) {
panic("implement me")
}
func (*fakeSubConn) Connect() {
panic("implement me")
}
func (s) TestXdsBalanceHandleResolvedAddrs(t *testing.T) {
startupTimeout = 500 * time.Millisecond
defer func() { startupTimeout = defaultTimeout }()
builder := balancer.Get(xdsName)
cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer)
if !ok {
t.Fatalf("unable to type assert to *xdsBalancer")
}
defer lb.Close()
addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
for i := 0; i < 3; i++ {
lb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: testLBConfigFooBar,
})
select {
case nsc := <-cc.newSubConns:
if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) {
t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA))
}
case <-time.After(2 * time.Second):
t.Fatal("timeout when geting new subconn result")
}
addrs = addrs[:2-i]
}
}
func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) {
startupTimeout = 500 * time.Millisecond
originalNewEDSBalancer := newEDSBalancer
newEDSBalancer = newFakeEDSBalancer
defer func() {
startupTimeout = defaultTimeout
newEDSBalancer = originalNewEDSBalancer
}()
builder := balancer.Get(xdsName)
cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer)
if !ok {
t.Fatalf("unable to type assert to *xdsBalancer")
}
defer lb.Close()
addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
lb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: testLBConfigFooBar,
})
// verify fallback takes over
select {
case nsc := <-cc.newSubConns:
if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) {
t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA))
}
case <-time.After(2 * time.Second):
t.Fatalf("timeout when geting new subconn result")
}
var cleanups []func()
defer func() {
for _, cleanup := range cleanups {
cleanup()
}
}()
// In the first iteration, an eds balancer takes over fallback balancer
// In the second iteration, a new xds client takes over previous one.
for i := 0; i < 2; i++ {
addr, td, _, cleanup := setupServer(t)
cleanups = append(cleanups, cleanup)
workingLBConfig := &xdsConfig{
BalancerName: addr,
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
}
lb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: workingLBConfig,
})
td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
var j int
for j = 0; j < 10; j++ {
if edsLB := getLatestEdsBalancer(); edsLB != nil { // edsLB won't change between the two iterations
select {
case gotEDS := <-edsLB.edsChan:
if !proto.Equal(gotEDS, testClusterLoadAssignmentWithoutEndpoints) {
t.Fatalf("edsBalancer got eds: %v, want %v", gotEDS, testClusterLoadAssignmentWithoutEndpoints)
}
case <-time.After(time.Second):
t.Fatal("haven't got EDS update after 1s")
}
break
}
time.Sleep(100 * time.Millisecond)
}
if j == 10 {
t.Fatal("edsBalancer instance has not been created or updated after 1s")
}
}
}
// switch child policy, lb stays the same
// cds->eds or eds -> cds, restart xdsClient, lb stays the same
func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
originalNewEDSBalancer := newEDSBalancer
newEDSBalancer = newFakeEDSBalancer
defer func() {
newEDSBalancer = originalNewEDSBalancer
}()
builder := balancer.Get(xdsName)
cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer)
if !ok {
t.Fatalf("unable to type assert to *xdsBalancer")
}
defer lb.Close()
var cleanups []func()
defer func() {
for _, cleanup := range cleanups {
cleanup()
}
}()
for _, test := range []struct {
cfg *xdsConfig
responseToSend *discoverypb.DiscoveryResponse
expectedChildPolicy *loadBalancingConfig
}{
{
cfg: &xdsConfig{
ChildPolicy: &loadBalancingConfig{
Name: fakeBalancerA,
Config: json.RawMessage("{}"),
},
},
responseToSend: testEDSRespWithoutEndpoints,
expectedChildPolicy: &loadBalancingConfig{
Name: string(fakeBalancerA),
Config: json.RawMessage(`{}`),
},
},
{
cfg: &xdsConfig{
ChildPolicy: &loadBalancingConfig{
Name: fakeBalancerB,
Config: json.RawMessage("{}"),
},
},
expectedChildPolicy: &loadBalancingConfig{
Name: string(fakeBalancerB),
Config: json.RawMessage(`{}`),
},
},
{
cfg: &xdsConfig{},
responseToSend: testCDSResp,
expectedChildPolicy: &loadBalancingConfig{
Name: "ROUND_ROBIN",
},
},
} {
addr, td, _, cleanup := setupServer(t)
cleanups = append(cleanups, cleanup)
test.cfg.BalancerName = addr
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: test.cfg})
if test.responseToSend != nil {
td.sendResp(&response{resp: test.responseToSend})
}
var i int
for i = 0; i < 10; i++ {
if edsLB := getLatestEdsBalancer(); edsLB != nil {
select {
case childPolicy := <-edsLB.childPolicy:
if !reflect.DeepEqual(childPolicy, test.expectedChildPolicy) {
t.Fatalf("got childPolicy %v, want %v", childPolicy, test.expectedChildPolicy)
}
case <-time.After(time.Second):
t.Fatal("haven't got policy update after 1s")
}
break
}
time.Sleep(100 * time.Millisecond)
}
if i == 10 {
t.Fatal("edsBalancer instance has not been created or updated after 1s")
}
}
}
// not in fallback mode, overwrite fallback info.
// in fallback mode, update config or switch balancer.
func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) {
originalNewEDSBalancer := newEDSBalancer
newEDSBalancer = newFakeEDSBalancer
defer func() {
newEDSBalancer = originalNewEDSBalancer
}()
builder := balancer.Get(xdsName)
cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer)
if !ok {
t.Fatalf("unable to type assert to *xdsBalancer")
}
defer lb.Close()
addr, td, _, cleanup := setupServer(t)
cfg := xdsConfig{
BalancerName: addr,
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
}
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: &cfg})
addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
cfg2 := cfg
cfg2.FallBackPolicy = &loadBalancingConfig{Name: fakeBalancerB}
lb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: &cfg2,
})
td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
var i int
for i = 0; i < 10; i++ {
if edsLB := getLatestEdsBalancer(); edsLB != nil {
break
}
time.Sleep(100 * time.Millisecond)
}
if i == 10 {
t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
}
cleanup()
// verify fallback balancer B takes over
select {
case nsc := <-cc.newSubConns:
if !reflect.DeepEqual(append(addrs, specialAddrForBalancerB), nsc) {
t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerB))
}
case <-time.After(5 * time.Second):
t.Fatalf("timeout when geting new subconn result")
}
cfg3 := cfg
cfg3.FallBackPolicy = &loadBalancingConfig{Name: fakeBalancerA}
lb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: &cfg3,
})
// verify fallback balancer A takes over
select {
case nsc := <-cc.newSubConns:
if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) {
t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA))
}
case <-time.After(2 * time.Second):
t.Fatalf("timeout when geting new subconn result")
}
}
func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) {
originalNewEDSBalancer := newEDSBalancer
newEDSBalancer = newFakeEDSBalancer
defer func() {
newEDSBalancer = originalNewEDSBalancer
}()
builder := balancer.Get(xdsName)
cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer)
if !ok {
t.Fatalf("unable to type assert to *xdsBalancer")
}
defer lb.Close()
addr, td, _, cleanup := setupServer(t)
defer cleanup()
cfg := &xdsConfig{
BalancerName: addr,
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
}
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg})
td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
expectedScStateChange := &scStateChange{
sc: &fakeSubConn{},
state: connectivity.Ready,
}
var i int
for i = 0; i < 10; i++ {
if edsLB := getLatestEdsBalancer(); edsLB != nil {
lb.UpdateSubConnState(expectedScStateChange.sc, balancer.SubConnState{ConnectivityState: expectedScStateChange.state})
select {
case scsc := <-edsLB.subconnStateChange:
if !reflect.DeepEqual(scsc, expectedScStateChange) {
t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
}
case <-time.After(time.Second):
t.Fatal("haven't got subconn state change after 1s")
}
break
}
time.Sleep(100 * time.Millisecond)
}
if i == 10 {
t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
}
// lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record
// to make sure there's a new one created and get the pointer to it.
lbABuilder.clearLastBalancer()
cleanup()
// switch to fallback
// fallback balancer A takes over
for i = 0; i < 10; i++ {
if fblb := lbABuilder.getLastBalancer(); fblb != nil {
lb.UpdateSubConnState(expectedScStateChange.sc, balancer.SubConnState{ConnectivityState: expectedScStateChange.state})
select {
case scsc := <-fblb.subconnStateChange:
if !reflect.DeepEqual(scsc, expectedScStateChange) {
t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
}
case <-time.After(time.Second):
t.Fatal("haven't got subconn state change after 1s")
}
break
}
time.Sleep(100 * time.Millisecond)
}
if i == 10 {
t.Fatal("balancerA instance has not been created after 1s")
}
}
func (s) TestXdsBalancerFallBackSignalFromEdsBalancer(t *testing.T) {
originalNewEDSBalancer := newEDSBalancer
newEDSBalancer = newFakeEDSBalancer
defer func() {
newEDSBalancer = originalNewEDSBalancer
}()
builder := balancer.Get(xdsName)
cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer)
if !ok {
t.Fatalf("unable to type assert to *xdsBalancer")
}
defer lb.Close()
addr, td, _, cleanup := setupServer(t)
defer cleanup()
cfg := &xdsConfig{
BalancerName: addr,
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
}
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg})
td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
expectedScStateChange := &scStateChange{
sc: &fakeSubConn{},
state: connectivity.Ready,
}
var i int
for i = 0; i < 10; i++ {
if edsLB := getLatestEdsBalancer(); edsLB != nil {
lb.UpdateSubConnState(expectedScStateChange.sc, balancer.SubConnState{ConnectivityState: expectedScStateChange.state})
select {
case scsc := <-edsLB.subconnStateChange:
if !reflect.DeepEqual(scsc, expectedScStateChange) {
t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
}
case <-time.After(time.Second):
t.Fatal("haven't got subconn state change after 1s")
}
break
}
time.Sleep(100 * time.Millisecond)
}
if i == 10 {
t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
}
// lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record
// to make sure there's a new one created and get the pointer to it.
lbABuilder.clearLastBalancer()
cleanup()
// switch to fallback
// fallback balancer A takes over
for i = 0; i < 10; i++ {
if fblb := lbABuilder.getLastBalancer(); fblb != nil {
lb.UpdateSubConnState(expectedScStateChange.sc, balancer.SubConnState{ConnectivityState: expectedScStateChange.state})
select {
case scsc := <-fblb.subconnStateChange:
if !reflect.DeepEqual(scsc, expectedScStateChange) {
t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
}
case <-time.After(time.Second):
t.Fatal("haven't got subconn state change after 1s")
}
break
}
time.Sleep(100 * time.Millisecond)
}
if i == 10 {
t.Fatal("balancerA instance has not been created after 1s")
}
}
func (s) TestXdsBalancerConfigParsingSelectingLBPolicy(t *testing.T) {
js := json.RawMessage(`{
"balancerName": "fake.foo.bar",
"childPolicy": [{"fake_balancer_C": {}}, {"fake_balancer_A": {}}, {"fake_balancer_B": {}}],
"fallbackPolicy": [{"fake_balancer_C": {}}, {"fake_balancer_B": {}}, {"fake_balancer_A": {}}]
}`)
cfg, err := (&xdsBalancerBuilder{}).ParseConfig(js)
if err != nil {
t.Fatalf("unable to unmarshal balancer config into xds config: %v", err)
}
xdsCfg := cfg.(*xdsConfig)
wantChildPolicy := &loadBalancingConfig{Name: string(fakeBalancerA), Config: json.RawMessage(`{}`)}
if !reflect.DeepEqual(xdsCfg.ChildPolicy, wantChildPolicy) {
t.Fatalf("got child policy %v, want %v", xdsCfg.ChildPolicy, wantChildPolicy)
}
wantFallbackPolicy := &loadBalancingConfig{Name: string(fakeBalancerB), Config: json.RawMessage(`{}`)}
if !reflect.DeepEqual(xdsCfg.FallBackPolicy, wantFallbackPolicy) {
t.Fatalf("got fallback policy %v, want %v", xdsCfg.FallBackPolicy, wantFallbackPolicy)
}
}
func (s) TestXdsLoadbalancingConfigParsing(t *testing.T) {
tests := []struct {
name string
s string
want *xdsConfig
}{
{
name: "empty",
s: "{}",
want: &xdsConfig{},
},
{
name: "success1",
s: `{"childPolicy":[{"pick_first":{}}]}`,
want: &xdsConfig{
ChildPolicy: &loadBalancingConfig{
Name: "pick_first",
Config: json.RawMessage(`{}`),
},
},
},
{
name: "success2",
s: `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`,
want: &xdsConfig{
ChildPolicy: &loadBalancingConfig{
Name: "round_robin",
Config: json.RawMessage(`{}`),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var cfg xdsConfig
if err := json.Unmarshal([]byte(tt.s), &cfg); err != nil || !reflect.DeepEqual(&cfg, tt.want) {
t.Errorf("test name: %s, parseFullServiceConfig() = %+v, err: %v, want %+v, <nil>", tt.name, cfg, err, tt.want)
}
})
}
}