xds: remove xdsclient.New from EDS balancer (#4001)
diff --git a/xds/internal/balancer/edsbalancer/config.go b/xds/internal/balancer/edsbalancer/config.go
index 95fa6d5..9b59cfa 100644
--- a/xds/internal/balancer/edsbalancer/config.go
+++ b/xds/internal/balancer/edsbalancer/config.go
@@ -29,8 +29,6 @@
// for EDS balancers.
type EDSConfig struct {
serviceconfig.LoadBalancingConfig
- // BalancerName represents the load balancer to use.
- BalancerName string
// ChildPolicy represents the load balancing config for the child
// policy.
ChildPolicy *loadBalancingConfig
@@ -50,7 +48,6 @@
// and Fallbackspolicy are post-processed, and for each, the first installed
// policy is kept.
type edsConfigJSON struct {
- BalancerName string
ChildPolicy []*loadBalancingConfig
FallbackPolicy []*loadBalancingConfig
EDSServiceName string
@@ -66,7 +63,6 @@
return err
}
- l.BalancerName = configJSON.BalancerName
l.EDSServiceName = configJSON.EDSServiceName
l.LrsLoadReportingServerName = configJSON.LRSLoadReportingServerName
diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go
index 2aa98f9..9312b05 100644
--- a/xds/internal/balancer/edsbalancer/eds.go
+++ b/xds/internal/balancer/edsbalancer/eds.go
@@ -52,17 +52,16 @@
type edsBalancerBuilder struct{}
// Build helps implement the balancer.Builder interface.
-func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
+func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
x := &edsBalancer{
cc: cc,
- buildOpts: opts,
closed: grpcsync.NewEvent(),
grpcUpdate: make(chan interface{}),
xdsClientUpdate: make(chan *edsUpdate),
childPolicyUpdate: buffer.NewUnbounded(),
}
x.logger = prefixLogger((x))
- x.client = newXDSClientWrapper(x.handleEDSUpdate, x.buildOpts, x.logger)
+ x.client = newXDSClientWrapper(x.handleEDSUpdate, x.logger)
x.edsImpl = newEDSBalancer(x.cc, x.enqueueChildBalancerState, x.client, x.logger)
x.logger.Infof("Created")
go x.run()
@@ -103,10 +102,9 @@
//
// It currently has only an edsBalancer. Later, we may add fallback.
type edsBalancer struct {
- cc balancer.ClientConn // *xdsClientConn
- buildOpts balancer.BuildOptions
- closed *grpcsync.Event
- logger *grpclog.PrefixLogger
+ cc balancer.ClientConn // *xdsClientConn
+ closed *grpcsync.Event
+ logger *grpclog.PrefixLogger
// edsBalancer continuously monitor the channels below, and will handle events from them in sync.
grpcUpdate chan interface{}
diff --git a/xds/internal/balancer/edsbalancer/eds_test.go b/xds/internal/balancer/edsbalancer/eds_test.go
index 62075ae..085310d 100644
--- a/xds/internal/balancer/edsbalancer/eds_test.go
+++ b/xds/internal/balancer/edsbalancer/eds_test.go
@@ -30,8 +30,9 @@
"github.com/golang/protobuf/jsonpb"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
"github.com/google/go-cmp/cmp"
+ "google.golang.org/grpc/attributes"
+ xdsinternal "google.golang.org/grpc/xds/internal"
- "google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpclog"
@@ -41,8 +42,6 @@
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
xdsclient "google.golang.org/grpc/xds/internal/client"
- "google.golang.org/grpc/xds/internal/client/bootstrap"
- xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
_ "google.golang.org/grpc/xds/internal/client/v2" // V2 client registration.
@@ -52,14 +51,6 @@
func init() {
balancer.Register(&edsBalancerBuilder{})
-
- bootstrapConfigNew = func() (*bootstrap.Config, error) {
- return &bootstrap.Config{
- BalancerName: testBalancerNameFooBar,
- Creds: grpc.WithInsecure(),
- NodeProto: xdstestutils.EmptyNodeProtoV2,
- }, nil
- }
}
func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
@@ -182,32 +173,6 @@
func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") }
func (*fakeSubConn) Connect() { panic("implement me") }
-// waitForNewXDSClientWithEDSWatch makes sure that a new xdsClient is created
-// with the provided name. It also make sure that the newly created client
-// registers an eds watcher.
-func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantName string) *fakeclient.Client {
- t.Helper()
-
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- val, err := ch.Receive(ctx)
- if err != nil {
- t.Fatalf("error when waiting for a new xds client: %v", err)
- return nil
- }
- xdsC := val.(*fakeclient.Client)
- if xdsC.Name() != wantName {
- t.Fatalf("xdsClient created to balancer: %v, want %v", xdsC.Name(), wantName)
- return nil
- }
- _, err = xdsC.WaitForWatchEDS(ctx)
- if err != nil {
- t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
- return nil
- }
- return xdsC
-}
-
// waitForNewEDSLB makes sure that a new edsLB is created by the top-level
// edsBalancer.
func waitForNewEDSLB(t *testing.T, ch *testutils.Channel) *fakeEDSBalancer {
@@ -227,70 +192,15 @@
// edsLB, creates fake version of them and makes them available on the provided
// channels. The returned cancel function should be called by the test for
// cleanup.
-func setup(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() {
+func setup(edsLBCh *testutils.Channel) func() {
origNewEDSBalancer := newEDSBalancer
newEDSBalancer = func(cc balancer.ClientConn, enqueue func(priorityType, balancer.State), _ *xdsClientWrapper, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
edsLB := newFakeEDSBalancer(cc)
defer func() { edsLBCh.Send(edsLB) }()
return edsLB
}
-
- origXdsClientNew := xdsclientNew
- xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) {
- xdsC := fakeclient.NewClientWithName(opts.Config.BalancerName)
- defer func() { xdsClientCh.Send(xdsC) }()
- return xdsC, nil
- }
return func() {
newEDSBalancer = origNewEDSBalancer
- xdsclientNew = origXdsClientNew
- }
-}
-
-// TestXDSConfigBalancerNameUpdate verifies different scenarios where the
-// balancer name in the lbConfig is updated.
-//
-// The test does the following:
-// * Builds a new xds balancer.
-// * Repeatedly pushes new ClientConnState which specifies different
-// balancerName in the lbConfig. We expect xdsClient objects to created
-// whenever the balancerName changes.
-func (s) TestXDSConfigBalancerNameUpdate(t *testing.T) {
- oldBootstrapConfigNew := bootstrapConfigNew
- bootstrapConfigNew = func() (*bootstrap.Config, error) {
- // Return an error from bootstrap, so the eds balancer will use
- // BalancerName from the config.
- //
- // TODO: remove this when deleting BalancerName from config.
- return nil, fmt.Errorf("no bootstrap available")
- }
- defer func() { bootstrapConfigNew = oldBootstrapConfigNew }()
- edsLBCh := testutils.NewChannel()
- xdsClientCh := testutils.NewChannel()
- cancel := setup(edsLBCh, xdsClientCh)
- defer cancel()
-
- builder := balancer.Get(edsName)
- cc := newNoopTestClientConn()
- edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
- if !ok {
- t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
- }
- defer edsB.Close()
-
- addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
- for i := 0; i < 2; i++ {
- balancerName := fmt.Sprintf("balancer-%d", i)
- edsB.UpdateClientConnState(balancer.ClientConnState{
- ResolverState: resolver.State{Addresses: addrs},
- BalancerConfig: &EDSConfig{
- BalancerName: balancerName,
- EDSServiceName: testEDSClusterName,
- },
- })
-
- xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, balancerName)
- xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
}
}
@@ -351,9 +261,9 @@
// This time around, we expect no new xdsClient or edsLB to be created.
// Instead, we expect the existing edsLB to receive the new child policy.
func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) {
+ xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
edsLBCh := testutils.NewChannel()
- xdsClientCh := testutils.NewChannel()
- cancel := setup(edsLBCh, xdsClientCh)
+ cancel := setup(edsLBCh)
defer cancel()
builder := balancer.Get(edsName)
@@ -365,8 +275,8 @@
defer edsB.Close()
edsB.UpdateClientConnState(balancer.ClientConnState{
+ ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)},
BalancerConfig: &EDSConfig{
- BalancerName: testBalancerNameFooBar,
ChildPolicy: &loadBalancingConfig{
Name: fakeBalancerA,
Config: json.RawMessage("{}"),
@@ -374,7 +284,12 @@
EDSServiceName: testEDSClusterName,
},
})
- xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
+
+ ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer ctxCancel()
+ if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
+ t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
+ }
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)
edsLB.waitForChildPolicy(&loadBalancingConfig{
@@ -383,8 +298,8 @@
})
edsB.UpdateClientConnState(balancer.ClientConnState{
+ ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)},
BalancerConfig: &EDSConfig{
- BalancerName: testBalancerNameFooBar,
ChildPolicy: &loadBalancingConfig{
Name: fakeBalancerB,
Config: json.RawMessage("{}"),
@@ -401,9 +316,9 @@
// TestXDSSubConnStateChange verifies if the top-level edsBalancer passes on
// the subConnStateChange to appropriate child balancers.
func (s) TestXDSSubConnStateChange(t *testing.T) {
+ xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
edsLBCh := testutils.NewChannel()
- xdsClientCh := testutils.NewChannel()
- cancel := setup(edsLBCh, xdsClientCh)
+ cancel := setup(edsLBCh)
defer cancel()
builder := balancer.Get(edsName)
@@ -414,16 +329,16 @@
}
defer edsB.Close()
- addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
edsB.UpdateClientConnState(balancer.ClientConnState{
- ResolverState: resolver.State{Addresses: addrs},
- BalancerConfig: &EDSConfig{
- BalancerName: testBalancerNameFooBar,
- EDSServiceName: testEDSClusterName,
- },
+ ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)},
+ BalancerConfig: &EDSConfig{EDSServiceName: testEDSClusterName},
})
- xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
+ ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer ctxCancel()
+ if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
+ t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
+ }
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)
@@ -442,9 +357,9 @@
// If it's connection error, nothing will happen. This will need to change to
// handle fallback.
func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
+ xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
edsLBCh := testutils.NewChannel()
- xdsClientCh := testutils.NewChannel()
- cancel := setup(edsLBCh, xdsClientCh)
+ cancel := setup(edsLBCh)
defer cancel()
builder := balancer.Get(edsName)
@@ -456,15 +371,17 @@
defer edsB.Close()
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
- BalancerConfig: &EDSConfig{
- BalancerName: testBalancerNameFooBar,
- EDSServiceName: testEDSClusterName,
- },
+ ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)},
+ BalancerConfig: &EDSConfig{EDSServiceName: testEDSClusterName},
}); err != nil {
t.Fatal(err)
}
- xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
+ ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer ctxCancel()
+ if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
+ t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
+ }
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil {
@@ -474,8 +391,6 @@
connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, connectionErr)
- ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer ctxCancel()
if err := xdsC.WaitForCancelEDSWatch(ctx); err == nil {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
@@ -506,9 +421,9 @@
// If it's connection error, nothing will happen. This will need to change to
// handle fallback.
func (s) TestErrorFromResolver(t *testing.T) {
+ xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
edsLBCh := testutils.NewChannel()
- xdsClientCh := testutils.NewChannel()
- cancel := setup(edsLBCh, xdsClientCh)
+ cancel := setup(edsLBCh)
defer cancel()
builder := balancer.Get(edsName)
@@ -520,15 +435,17 @@
defer edsB.Close()
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
- BalancerConfig: &EDSConfig{
- BalancerName: testBalancerNameFooBar,
- EDSServiceName: testEDSClusterName,
- },
+ ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)},
+ BalancerConfig: &EDSConfig{EDSServiceName: testEDSClusterName},
}); err != nil {
t.Fatal(err)
}
- xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
+ ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer ctxCancel()
+ if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
+ t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
+ }
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil {
@@ -538,8 +455,6 @@
connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
edsB.ResolverError(connectionErr)
- ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer ctxCancel()
if err := xdsC.WaitForCancelEDSWatch(ctx); err == nil {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
@@ -610,7 +525,6 @@
name: "manually-generated",
js: json.RawMessage(`
{
- "balancerName": "fake.foo.bar",
"childPolicy": [
{"fake_balancer_C": {}},
{"fake_balancer_A": {}},
@@ -625,7 +539,6 @@
"lrsLoadReportingServerName": "lrs.server"
}`),
want: &EDSConfig{
- BalancerName: "fake.foo.bar",
ChildPolicy: &loadBalancingConfig{
Name: "fake_balancer_A",
Config: json.RawMessage("{}"),
@@ -645,11 +558,9 @@
name: "no-lrs-server-name",
js: json.RawMessage(`
{
- "balancerName": "fake.foo.bar",
"edsServiceName": "eds.service"
}`),
want: &EDSConfig{
- BalancerName: "fake.foo.bar",
EDSServiceName: testEDSName,
LrsLoadReportingServerName: nil,
},
diff --git a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
index fe4e996..1646601 100644
--- a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
+++ b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
@@ -22,13 +22,10 @@
"fmt"
"sync"
- "google.golang.org/grpc"
"google.golang.org/grpc/attributes"
- "google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/grpclog"
xdsinternal "google.golang.org/grpc/xds/internal"
xdsclient "google.golang.org/grpc/xds/internal/client"
- "google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/client/load"
)
@@ -40,13 +37,6 @@
Close()
}
-var (
- xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) {
- return xdsclient.New(opts)
- }
- bootstrapConfigNew = bootstrap.NewConfig
-)
-
type loadStoreWrapper struct {
mu sync.RWMutex
service string
@@ -110,9 +100,7 @@
logger *grpclog.PrefixLogger
newEDSUpdate func(xdsclient.EndpointsUpdate, error)
- bbo balancer.BuildOptions
- balancerName string
// xdsClient could come from attributes, or created with balancerName.
xdsClient xdsClientInterface
@@ -138,38 +126,14 @@
//
// The given callbacks won't be called until the underlying xds_client is
// working and sends updates.
-func newXDSClientWrapper(newEDSUpdate func(xdsclient.EndpointsUpdate, error), bbo balancer.BuildOptions, logger *grpclog.PrefixLogger) *xdsClientWrapper {
+func newXDSClientWrapper(newEDSUpdate func(xdsclient.EndpointsUpdate, error), logger *grpclog.PrefixLogger) *xdsClientWrapper {
return &xdsClientWrapper{
logger: logger,
newEDSUpdate: newEDSUpdate,
- bbo: bbo,
loadWrapper: &loadStoreWrapper{},
}
}
-// replaceXDSClient replaces xdsClient fields to the newClient if they are
-// different. If xdsClient is replaced, the balancerName field will also be
-// updated to newBalancerName.
-//
-// If the old xdsClient is replaced, and was created locally (not from
-// attributes), it will be closed.
-//
-// It returns whether xdsClient is replaced.
-func (c *xdsClientWrapper) replaceXDSClient(newClient xdsClientInterface, newBalancerName string) bool {
- if c.xdsClient == newClient {
- return false
- }
- oldClient := c.xdsClient
- oldBalancerName := c.balancerName
- c.xdsClient = newClient
- c.balancerName = newBalancerName
- if oldBalancerName != "" {
- // OldBalancerName!="" means if the old client was not from attributes.
- oldClient.Close()
- }
- return true
-}
-
// updateXDSClient sets xdsClient in wrapper to the correct one based on the
// attributes and service config.
//
@@ -184,44 +148,26 @@
// the balancerName (from bootstrap file or from service config) changed.
// - if balancer names are the same, do nothing, and return false
// - if balancer names are different, create new one, and return true
-func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.Attributes) (bool, error) {
- if attr != nil {
- if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil {
- // This will also clear balancerName, to indicate that client is
- // from attributes.
- return c.replaceXDSClient(clientFromAttr, ""), nil
- }
+func (c *xdsClientWrapper) updateXDSClient(attr *attributes.Attributes) (bool, error) {
+ if attr == nil {
+ return false, fmt.Errorf("unexported nil attributes, want attributes with xdsClient")
+ }
+ // TODO: change the way xdsClient is retrieved from attributes. One option
+ // is to add helper functions.
+ //
+ // Or, since xdsClient will become a singleton, this can just call
+ // xdsclient.New() instead. And if we decide to do this, do it in Build
+ // instead of when handling updates.
+ clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface)
+ if clientFromAttr == nil {
+ return false, fmt.Errorf("no xdsClient found in attributes")
}
- clientConfig, err := bootstrapConfigNew()
- if err != nil {
- // TODO: propagate this error to ClientConn, and fail RPCs if necessary.
- clientConfig = &bootstrap.Config{BalancerName: config.BalancerName}
- }
-
- if c.balancerName == clientConfig.BalancerName {
+ if c.xdsClient == clientFromAttr {
return false, nil
}
-
- var dopts []grpc.DialOption
- if dialer := c.bbo.Dialer; dialer != nil {
- dopts = []grpc.DialOption{grpc.WithContextDialer(dialer)}
- }
-
- // TODO: there's no longer a need to read bootstrap file and create a new
- // xds client. The EDS balancer should always get the xds client from
- // attributes. Otherwise, this function should just fail. Also, xdsclient
- // will be shared by multiple clients, so trying to make an xds client is
- // just the wrong move.
- newClient, err := xdsclientNew(xdsclient.Options{Config: *clientConfig, DialOpts: dopts})
- if err != nil {
- // This should never fail. xdsclientnew does a non-blocking dial, and
- // all the config passed in should be validated.
- //
- // This could leave c.xdsClient as nil if this is the first update.
- return false, fmt.Errorf("eds: failed to create xdsClient, error: %v", err)
- }
- return c.replaceXDSClient(newClient, clientConfig.BalancerName), nil
+ c.xdsClient = clientFromAttr
+ return true, nil
}
// startEndpointsWatch starts the EDS watch. Caller can call this when the
@@ -276,7 +222,7 @@
// handleUpdate applies the service config and attributes updates to the client,
// including updating the xds_client to use, and updating the EDS name to watch.
func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) error {
- clientChanged, err := c.updateXDSClient(config, attr)
+ clientChanged, err := c.updateXDSClient(attr)
if err != nil {
return err
}
@@ -320,10 +266,6 @@
func (c *xdsClientWrapper) close() {
c.cancelWatch()
- if c.xdsClient != nil && c.balancerName != "" {
- // Only close xdsClient if it's not from attributes.
- c.xdsClient.Close()
- }
}
// equalStringPointers returns true if
diff --git a/xds/internal/balancer/edsbalancer/xds_client_wrapper_test.go b/xds/internal/balancer/edsbalancer/xds_client_wrapper_test.go
index c8dac9c..38162d1 100644
--- a/xds/internal/balancer/edsbalancer/xds_client_wrapper_test.go
+++ b/xds/internal/balancer/edsbalancer/xds_client_wrapper_test.go
@@ -24,22 +24,12 @@
"fmt"
"testing"
- xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
- "github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
-
- "google.golang.org/grpc"
"google.golang.org/grpc/attributes"
- "google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/testutils"
- "google.golang.org/grpc/resolver"
xdsinternal "google.golang.org/grpc/xds/internal"
xdsclient "google.golang.org/grpc/xds/internal/client"
- "google.golang.org/grpc/xds/internal/client/bootstrap"
- xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
- "google.golang.org/grpc/xds/internal/testutils/fakeserver"
- "google.golang.org/grpc/xds/internal/version"
)
var (
@@ -49,28 +39,25 @@
// Given a list of resource names, verifies that EDS requests for the same are
// received at the fake server.
-func verifyExpectedRequests(fs *fakeserver.Server, resourceNames ...string) error {
- wantReq := &xdspb.DiscoveryRequest{
- TypeUrl: version.V2EndpointsURL,
- Node: xdstestutils.EmptyNodeProtoV2,
- }
+func verifyExpectedRequests(fc *fakeclient.Client, resourceNames ...string) error {
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
for _, name := range resourceNames {
- if name != "" {
- wantReq.ResourceNames = []string{name}
+ if name == "" {
+ // ResourceName empty string indicates a cancel.
+ if err := fc.WaitForCancelEDSWatch(ctx); err != nil {
+ return fmt.Errorf("timed out when expecting resource %q", name)
+ }
+ return nil
}
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- req, err := fs.XDSRequestChan.Receive(ctx)
+ resName, err := fc.WaitForWatchEDS(ctx)
if err != nil {
- return fmt.Errorf("timed out when expecting request {%+v} at fake server", wantReq)
+ return fmt.Errorf("timed out when expecting resource %q, %p", name, fc)
}
- edsReq := req.(*fakeserver.Request)
- if edsReq.Err != nil {
- return fmt.Errorf("eds RPC failed with err: %v", edsReq.Err)
- }
- if !proto.Equal(edsReq.Req, wantReq) {
- return fmt.Errorf("got EDS request %v, expected: %v, diff: %s", edsReq.Req, wantReq, cmp.Diff(edsReq.Req, wantReq, cmp.Comparer(proto.Equal)))
+ if resName != name {
+ return fmt.Errorf("got EDS request for resource %q, expected: %q", resName, name)
}
}
return nil
@@ -86,34 +73,16 @@
// * Sends updates with different edsServiceNames and expects new watches to be
// registered.
func (s) TestClientWrapperWatchEDS(t *testing.T) {
- fakeServer, cleanup, err := fakeserver.StartServer()
- if err != nil {
- t.Fatalf("Failed to start fake xDS server: %v", err)
- }
- defer cleanup()
- t.Logf("Started fake xDS server at %s...", fakeServer.Address)
+ xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
- cw := newXDSClientWrapper(nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
+ cw := newXDSClientWrapper(nil, nil)
defer cw.close()
t.Logf("Started xDS client wrapper for endpoint %s...", testServiceName)
- oldBootstrapConfigNew := bootstrapConfigNew
- bootstrapConfigNew = func() (*bootstrap.Config, error) {
- return &bootstrap.Config{
- BalancerName: fakeServer.Address,
- Creds: grpc.WithInsecure(),
- NodeProto: xdstestutils.EmptyNodeProtoV2,
- }, nil
- }
- defer func() { bootstrapConfigNew = oldBootstrapConfigNew }()
-
// Update with an non-empty edsServiceName should trigger an EDS watch for
// the same.
- cw.handleUpdate(&EDSConfig{
- BalancerName: fakeServer.Address,
- EDSServiceName: "foobar-1",
- }, nil)
- if err := verifyExpectedRequests(fakeServer, "foobar-1"); err != nil {
+ cw.handleUpdate(&EDSConfig{EDSServiceName: "foobar-1"}, attributes.New(xdsinternal.XDSClientID, xdsC))
+ if err := verifyExpectedRequests(xdsC, "foobar-1"); err != nil {
t.Fatal(err)
}
@@ -121,11 +90,8 @@
// name to another, and make sure a new watch is registered. The previously
// registered watch will be cancelled, which will result in an EDS request
// with no resource names being sent to the server.
- cw.handleUpdate(&EDSConfig{
- BalancerName: fakeServer.Address,
- EDSServiceName: "foobar-2",
- }, nil)
- if err := verifyExpectedRequests(fakeServer, "", "foobar-2"); err != nil {
+ cw.handleUpdate(&EDSConfig{EDSServiceName: "foobar-2"}, attributes.New(xdsinternal.XDSClientID, xdsC))
+ if err := verifyExpectedRequests(xdsC, "", "foobar-2"); err != nil {
t.Fatal(err)
}
}
@@ -146,7 +112,7 @@
edsRespChan.Send(&edsUpdate{resp: update, err: err})
}
- cw := newXDSClientWrapper(newEDS, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
+ cw := newXDSClientWrapper(newEDS, nil)
defer cw.close()
xdsC := fakeclient.NewClient()
@@ -184,14 +150,7 @@
// clientWrapper receives the xdsClient to use in the attributes section of the
// update.
func (s) TestClientWrapperGetsXDSClientInAttributes(t *testing.T) {
- oldxdsclientNew := xdsclientNew
- xdsclientNew = func(_ xdsclient.Options) (xdsClientInterface, error) {
- t.Fatalf("unexpected call to xdsclientNew when xds_client is set in attributes")
- return nil, nil
- }
- defer func() { xdsclientNew = oldxdsclientNew }()
-
- cw := newXDSClientWrapper(nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
+ cw := newXDSClientWrapper(nil, nil)
defer cw.close()
xdsC1 := fakeclient.NewClient()