blob: 085310d44b39103b57fcb94c6ddbafe9d56e18a5 [file] [log] [blame]
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package edsbalancer
import (
"bytes"
"context"
"encoding/json"
"fmt"
"reflect"
"testing"
"time"
"github.com/golang/protobuf/jsonpb"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/attributes"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpctest"
scpb "google.golang.org/grpc/internal/proto/grpc_service_config"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
_ "google.golang.org/grpc/xds/internal/client/v2" // V2 client registration.
)
const defaultTestTimeout = 1 * time.Second
func init() {
balancer.Register(&edsBalancerBuilder{})
}
func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
return func() balancer.SubConn {
scst, _ := p.Pick(balancer.PickInfo{})
return scst.SubConn
}
}
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
const testBalancerNameFooBar = "foo.bar"
func newNoopTestClientConn() *noopTestClientConn {
return &noopTestClientConn{}
}
// noopTestClientConn is used in EDS balancer config update tests that only
// cover the config update handling, but not SubConn/load-balancing.
type noopTestClientConn struct {
balancer.ClientConn
}
func (t *noopTestClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
return nil, nil
}
func (noopTestClientConn) Target() string { return testServiceName }
type scStateChange struct {
sc balancer.SubConn
state connectivity.State
}
type fakeEDSBalancer struct {
cc balancer.ClientConn
childPolicy *testutils.Channel
subconnStateChange *testutils.Channel
edsUpdate *testutils.Channel
}
func (f *fakeEDSBalancer) handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
f.subconnStateChange.Send(&scStateChange{sc: sc, state: state})
}
func (f *fakeEDSBalancer) handleChildPolicy(name string, config json.RawMessage) {
f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config})
}
func (f *fakeEDSBalancer) handleEDSResponse(edsResp xdsclient.EndpointsUpdate) {
f.edsUpdate.Send(edsResp)
}
func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {}
func (f *fakeEDSBalancer) close() {}
func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
val, err := f.childPolicy.Receive(ctx)
if err != nil {
return fmt.Errorf("error waiting for childPolicy: %v", err)
}
gotPolicy := val.(*loadBalancingConfig)
if !cmp.Equal(gotPolicy, wantPolicy) {
return fmt.Errorf("got childPolicy %v, want %v", gotPolicy, wantPolicy)
}
return nil
}
func (f *fakeEDSBalancer) waitForSubConnStateChange(wantState *scStateChange) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
val, err := f.subconnStateChange.Receive(ctx)
if err != nil {
return fmt.Errorf("error waiting for subconnStateChange: %v", err)
}
gotState := val.(*scStateChange)
if !cmp.Equal(gotState, wantState, cmp.AllowUnexported(scStateChange{})) {
return fmt.Errorf("got subconnStateChange %v, want %v", gotState, wantState)
}
return nil
}
func (f *fakeEDSBalancer) waitForEDSResponse(wantUpdate xdsclient.EndpointsUpdate) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
val, err := f.edsUpdate.Receive(ctx)
if err != nil {
return fmt.Errorf("error waiting for edsUpdate: %v", err)
}
gotUpdate := val.(xdsclient.EndpointsUpdate)
if !reflect.DeepEqual(gotUpdate, wantUpdate) {
return fmt.Errorf("got edsUpdate %+v, want %+v", gotUpdate, wantUpdate)
}
return nil
}
func newFakeEDSBalancer(cc balancer.ClientConn) edsBalancerImplInterface {
return &fakeEDSBalancer{
cc: cc,
childPolicy: testutils.NewChannelWithSize(10),
subconnStateChange: testutils.NewChannelWithSize(10),
edsUpdate: testutils.NewChannelWithSize(10),
}
}
type fakeSubConn struct{}
func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") }
func (*fakeSubConn) Connect() { panic("implement me") }
// waitForNewEDSLB makes sure that a new edsLB is created by the top-level
// edsBalancer.
func waitForNewEDSLB(t *testing.T, ch *testutils.Channel) *fakeEDSBalancer {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
val, err := ch.Receive(ctx)
if err != nil {
t.Fatalf("error when waiting for a new edsLB: %v", err)
return nil
}
return val.(*fakeEDSBalancer)
}
// setup overrides the functions which are used to create the xdsClient and the
// edsLB, creates fake version of them and makes them available on the provided
// channels. The returned cancel function should be called by the test for
// cleanup.
func setup(edsLBCh *testutils.Channel) func() {
origNewEDSBalancer := newEDSBalancer
newEDSBalancer = func(cc balancer.ClientConn, enqueue func(priorityType, balancer.State), _ *xdsClientWrapper, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
edsLB := newFakeEDSBalancer(cc)
defer func() { edsLBCh.Send(edsLB) }()
return edsLB
}
return func() {
newEDSBalancer = origNewEDSBalancer
}
}
const (
fakeBalancerA = "fake_balancer_A"
fakeBalancerB = "fake_balancer_B"
)
// Install two fake balancers for service config update tests.
//
// ParseConfig only accepts the json if the balancer specified is registered.
func init() {
balancer.Register(&fakeBalancerBuilder{name: fakeBalancerA})
balancer.Register(&fakeBalancerBuilder{name: fakeBalancerB})
}
type fakeBalancerBuilder struct {
name string
}
func (b *fakeBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &fakeBalancer{cc: cc}
}
func (b *fakeBalancerBuilder) Name() string {
return b.name
}
type fakeBalancer struct {
cc balancer.ClientConn
}
func (b *fakeBalancer) ResolverError(error) {
panic("implement me")
}
func (b *fakeBalancer) UpdateClientConnState(balancer.ClientConnState) error {
panic("implement me")
}
func (b *fakeBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
panic("implement me")
}
func (b *fakeBalancer) Close() {}
// TestXDSConnfigChildPolicyUpdate verifies scenarios where the childPolicy
// section of the lbConfig is updated.
//
// The test does the following:
// * Builds a new xds balancer.
// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerA.
// Verifies that a new xdsClient is created. It then pushes a new edsUpdate
// through the fakexds client. Verifies that a new edsLB is created and it
// receives the expected childPolicy.
// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerB.
// This time around, we expect no new xdsClient or edsLB to be created.
// Instead, we expect the existing edsLB to receive the new child policy.
func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) {
xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
edsLBCh := testutils.NewChannel()
cancel := setup(edsLBCh)
defer cancel()
builder := balancer.Get(edsName)
cc := newNoopTestClientConn()
edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer)
if !ok {
t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
}
defer edsB.Close()
edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)},
BalancerConfig: &EDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: fakeBalancerA,
Config: json.RawMessage("{}"),
},
EDSServiceName: testEDSClusterName,
},
})
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)
edsLB.waitForChildPolicy(&loadBalancingConfig{
Name: string(fakeBalancerA),
Config: json.RawMessage(`{}`),
})
edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)},
BalancerConfig: &EDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: fakeBalancerB,
Config: json.RawMessage("{}"),
},
EDSServiceName: testEDSClusterName,
},
})
edsLB.waitForChildPolicy(&loadBalancingConfig{
Name: string(fakeBalancerA),
Config: json.RawMessage(`{}`),
})
}
// TestXDSSubConnStateChange verifies if the top-level edsBalancer passes on
// the subConnStateChange to appropriate child balancers.
func (s) TestXDSSubConnStateChange(t *testing.T) {
xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
edsLBCh := testutils.NewChannel()
cancel := setup(edsLBCh)
defer cancel()
builder := balancer.Get(edsName)
cc := newNoopTestClientConn()
edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
if !ok {
t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
}
defer edsB.Close()
edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)},
BalancerConfig: &EDSConfig{EDSServiceName: testEDSClusterName},
})
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)
fsc := &fakeSubConn{}
state := connectivity.Ready
edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state})
edsLB.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state})
}
// TestErrorFromXDSClientUpdate verifies that errros from xdsClient update are
// handled correctly.
//
// If it's resource-not-found, watch will NOT be canceled, the EDS impl will
// receive an empty EDS update, and new RPCs will fail.
//
// If it's connection error, nothing will happen. This will need to change to
// handle fallback.
func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
edsLBCh := testutils.NewChannel()
cancel := setup(edsLBCh)
defer cancel()
builder := balancer.Get(edsName)
cc := newNoopTestClientConn()
edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
if !ok {
t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
}
defer edsB.Close()
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)},
BalancerConfig: &EDSConfig{EDSServiceName: testEDSClusterName},
}); err != nil {
t.Fatal(err)
}
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil {
t.Fatalf("EDS impl got unexpected EDS response: %v", err)
}
connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, connectionErr)
if err := xdsC.WaitForCancelEDSWatch(ctx); err == nil {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err == nil {
t.Fatal("eds impl got EDS resp, want timeout error")
}
resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error")
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, resourceErr)
// Even if error is resource not found, watch shouldn't be canceled, because
// this is an EDS resource removed (and xds client actually never sends this
// error, but we still handles it).
ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := xdsC.WaitForCancelEDSWatch(ctx); err == nil {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil {
t.Fatalf("eds impl expecting empty update, got %v", err)
}
}
// TestErrorFromResolver verifies that resolver errors are handled correctly.
//
// If it's resource-not-found, watch will be canceled, the EDS impl will receive
// an empty EDS update, and new RPCs will fail.
//
// If it's connection error, nothing will happen. This will need to change to
// handle fallback.
func (s) TestErrorFromResolver(t *testing.T) {
xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
edsLBCh := testutils.NewChannel()
cancel := setup(edsLBCh)
defer cancel()
builder := balancer.Get(edsName)
cc := newNoopTestClientConn()
edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
if !ok {
t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
}
defer edsB.Close()
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)},
BalancerConfig: &EDSConfig{EDSServiceName: testEDSClusterName},
}); err != nil {
t.Fatal(err)
}
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil {
t.Fatalf("EDS impl got unexpected EDS response: %v", err)
}
connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
edsB.ResolverError(connectionErr)
if err := xdsC.WaitForCancelEDSWatch(ctx); err == nil {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err == nil {
t.Fatal("eds impl got EDS resp, want timeout error")
}
resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error")
edsB.ResolverError(resourceErr)
ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := xdsC.WaitForCancelEDSWatch(ctx); err != nil {
t.Fatalf("want watch to be canceled, waitForCancel failed: %v", err)
}
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil {
t.Fatalf("EDS impl got unexpected EDS response: %v", err)
}
}
func (s) TestXDSBalancerConfigParsing(t *testing.T) {
const testEDSName = "eds.service"
var testLRSName = "lrs.server"
b := bytes.NewBuffer(nil)
if err := (&jsonpb.Marshaler{}).Marshal(b, &scpb.XdsConfig{
ChildPolicy: []*scpb.LoadBalancingConfig{
{Policy: &scpb.LoadBalancingConfig_Xds{}},
{Policy: &scpb.LoadBalancingConfig_RoundRobin{
RoundRobin: &scpb.RoundRobinConfig{},
}},
},
FallbackPolicy: []*scpb.LoadBalancingConfig{
{Policy: &scpb.LoadBalancingConfig_Xds{}},
{Policy: &scpb.LoadBalancingConfig_PickFirst{
PickFirst: &scpb.PickFirstConfig{},
}},
},
EdsServiceName: testEDSName,
LrsLoadReportingServerName: &wrapperspb.StringValue{Value: testLRSName},
}); err != nil {
t.Fatalf("%v", err)
}
tests := []struct {
name string
js json.RawMessage
want serviceconfig.LoadBalancingConfig
wantErr bool
}{
{
name: "jsonpb-generated",
js: b.Bytes(),
want: &EDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: "round_robin",
Config: json.RawMessage("{}"),
},
FallBackPolicy: &loadBalancingConfig{
Name: "pick_first",
Config: json.RawMessage("{}"),
},
EDSServiceName: testEDSName,
LrsLoadReportingServerName: &testLRSName,
},
wantErr: false,
},
{
// json with random balancers, and the first is not registered.
name: "manually-generated",
js: json.RawMessage(`
{
"childPolicy": [
{"fake_balancer_C": {}},
{"fake_balancer_A": {}},
{"fake_balancer_B": {}}
],
"fallbackPolicy": [
{"fake_balancer_C": {}},
{"fake_balancer_B": {}},
{"fake_balancer_A": {}}
],
"edsServiceName": "eds.service",
"lrsLoadReportingServerName": "lrs.server"
}`),
want: &EDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: "fake_balancer_A",
Config: json.RawMessage("{}"),
},
FallBackPolicy: &loadBalancingConfig{
Name: "fake_balancer_B",
Config: json.RawMessage("{}"),
},
EDSServiceName: testEDSName,
LrsLoadReportingServerName: &testLRSName,
},
wantErr: false,
},
{
// json with no lrs server name, LrsLoadReportingServerName should
// be nil (not an empty string).
name: "no-lrs-server-name",
js: json.RawMessage(`
{
"edsServiceName": "eds.service"
}`),
want: &EDSConfig{
EDSServiceName: testEDSName,
LrsLoadReportingServerName: nil,
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := &edsBalancerBuilder{}
got, err := b.ParseConfig(tt.js)
if (err != nil) != tt.wantErr {
t.Errorf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !cmp.Equal(got, tt.want) {
t.Errorf(cmp.Diff(got, tt.want))
}
})
}
}
func (s) TestLoadbalancingConfigParsing(t *testing.T) {
tests := []struct {
name string
s string
want *EDSConfig
}{
{
name: "empty",
s: "{}",
want: &EDSConfig{},
},
{
name: "success1",
s: `{"childPolicy":[{"pick_first":{}}]}`,
want: &EDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: "pick_first",
Config: json.RawMessage(`{}`),
},
},
},
{
name: "success2",
s: `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`,
want: &EDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: "round_robin",
Config: json.RawMessage(`{}`),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var cfg EDSConfig
if err := json.Unmarshal([]byte(tt.s), &cfg); err != nil || !cmp.Equal(&cfg, tt.want) {
t.Errorf("test name: %s, parseFullServiceConfig() = %+v, err: %v, want %+v, <nil>", tt.name, cfg, err, tt.want)
}
})
}
}
func (s) TestEqualStringPointers(t *testing.T) {
var (
ta1 = "test-a"
ta2 = "test-a"
tb = "test-b"
)
tests := []struct {
name string
a *string
b *string
want bool
}{
{"both-nil", nil, nil, true},
{"a-non-nil", &ta1, nil, false},
{"b-non-nil", nil, &tb, false},
{"equal", &ta1, &ta2, true},
{"different", &ta1, &tb, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := equalStringPointers(tt.a, tt.b); got != tt.want {
t.Errorf("equalStringPointers() = %v, want %v", got, tt.want)
}
})
}
}