xds: make xds client a singleton (#4015)
- xdsclient.New() no longer takes any input, all configs are from bootstrap file
- added a NewForTesting()
- The returned *Client is a wrapper of the underlying client implementation, for ref-couting
- xds-resolver and xds-server no longer calls bootstrap.NewConfig. It only calls xdsclient.New()
diff --git a/xds/internal/client/bootstrap/bootstrap.go b/xds/internal/client/bootstrap/bootstrap.go
index 789d1d0..ca7c96c 100644
--- a/xds/internal/client/bootstrap/bootstrap.go
+++ b/xds/internal/client/bootstrap/bootstrap.go
@@ -57,10 +57,10 @@
var bootstrapFileReadFunc = ioutil.ReadFile
// Config provides the xDS client with several key bits of information that it
-// requires in its interaction with an xDS server. The Config is initialized
-// from the bootstrap file.
+// requires in its interaction with the management server. The Config is
+// initialized from the bootstrap file.
type Config struct {
- // BalancerName is the name of the xDS server to connect to.
+ // BalancerName is the name of the management server to connect to.
//
// The bootstrap file contains a list of servers (with name+creds), but we
// pick the first one.
@@ -96,7 +96,7 @@
// The format of the bootstrap file will be as follows:
// {
// "xds_server": {
-// "server_uri": <string containing URI of xds server>,
+// "server_uri": <string containing URI of management server>,
// "channel_creds": [
// {
// "type": <string containing channel cred type>,
@@ -168,7 +168,7 @@
return nil, fmt.Errorf("xds: json.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err)
}
if len(servers) < 1 {
- return nil, fmt.Errorf("xds: bootstrap file parsing failed during bootstrap: file doesn't contain any xds server to connect to")
+ return nil, fmt.Errorf("xds: bootstrap file parsing failed during bootstrap: file doesn't contain any management server to connect to")
}
xs := servers[0]
config.BalancerName = xs.ServerURI
diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go
index 5497fbd..aa715f5 100644
--- a/xds/internal/client/client.go
+++ b/xds/internal/client/client.go
@@ -277,28 +277,6 @@
Localities []Locality
}
-// Options provides all parameters required for the creation of an xDS client.
-type Options struct {
- // Config contains a fully populated bootstrap config. It is the
- // responsibility of the caller to use some sane defaults here if the
- // bootstrap process returned with certain fields left unspecified.
- Config bootstrap.Config
- // DialOpts contains dial options to be used when dialing the xDS server.
- DialOpts []grpc.DialOption
- // TargetName is the target of the parent ClientConn.
- TargetName string
- // WatchExpiryTimeout is the amount of time the client is willing to wait
- // for the first response from the server for any resource being watched.
- // Expiry will not cause cancellation of the watch. It will only trigger the
- // invocation of the registered callback and it is left up to the caller to
- // decide whether or not they want to cancel the watch.
- //
- // If this field is left unspecified, a default value of 15 seconds will be
- // used. This is based on the default value of the initial_fetch_timeout
- // field in corepb.ConfigSource proto.
- WatchExpiryTimeout time.Duration
-}
-
// Function to be overridden in tests.
var newAPIClient = func(apiVersion version.TransportAPI, cc *grpc.ClientConn, opts BuildOptions) (APIClient, error) {
cb := getAPIClientBuilder(apiVersion)
@@ -308,23 +286,19 @@
return cb.Build(cc, opts)
}
-// Client is a full fledged gRPC client which queries a set of discovery APIs
-// (collectively termed as xDS) on a remote management server, to discover
-// various dynamic resources.
-//
-// A single client object will be shared by the xds resolver and balancer
-// implementations. But the same client can only be shared by the same parent
-// ClientConn.
+// clientImpl is the real implementation of the xds client. The exported Client
+// is a wrapper of this struct with a ref count.
//
// Implements UpdateHandler interface.
// TODO(easwars): Make a wrapper struct which implements this interface in the
// style of ccBalancerWrapper so that the Client type does not implement these
// exported methods.
-type Client struct {
- done *grpcsync.Event
- opts Options
- cc *grpc.ClientConn // Connection to the xDS server
- apiClient APIClient
+type clientImpl struct {
+ done *grpcsync.Event
+ config *bootstrap.Config
+ cc *grpc.ClientConn // Connection to the management server.
+ apiClient APIClient
+ watchExpiryTimeout time.Duration
logger *grpclog.PrefixLogger
@@ -345,46 +319,40 @@
lrsClients map[string]*lrsClient
}
-// New returns a new xdsClient configured with opts.
-func New(opts Options) (*Client, error) {
+// newWithConfig returns a new xdsClient with the given config.
+func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration) (*clientImpl, error) {
switch {
- case opts.Config.BalancerName == "":
+ case config.BalancerName == "":
return nil, errors.New("xds: no xds_server name provided in options")
- case opts.Config.Creds == nil:
+ case config.Creds == nil:
return nil, errors.New("xds: no credentials provided in options")
- case opts.Config.NodeProto == nil:
+ case config.NodeProto == nil:
return nil, errors.New("xds: no node_proto provided in options")
}
- switch opts.Config.TransportAPI {
+ switch config.TransportAPI {
case version.TransportV2:
- if _, ok := opts.Config.NodeProto.(*v2corepb.Node); !ok {
- return nil, fmt.Errorf("xds: Node proto type (%T) does not match API version: %v", opts.Config.NodeProto, opts.Config.TransportAPI)
+ if _, ok := config.NodeProto.(*v2corepb.Node); !ok {
+ return nil, fmt.Errorf("xds: Node proto type (%T) does not match API version: %v", config.NodeProto, config.TransportAPI)
}
case version.TransportV3:
- if _, ok := opts.Config.NodeProto.(*v3corepb.Node); !ok {
- return nil, fmt.Errorf("xds: Node proto type (%T) does not match API version: %v", opts.Config.NodeProto, opts.Config.TransportAPI)
+ if _, ok := config.NodeProto.(*v3corepb.Node); !ok {
+ return nil, fmt.Errorf("xds: Node proto type (%T) does not match API version: %v", config.NodeProto, config.TransportAPI)
}
}
dopts := []grpc.DialOption{
- opts.Config.Creds,
+ config.Creds,
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 5 * time.Minute,
Timeout: 20 * time.Second,
}),
}
- dopts = append(dopts, opts.DialOpts...)
- if opts.WatchExpiryTimeout == 0 {
- // This is based on the default value of the initial_fetch_timeout field
- // in corepb.ConfigSource proto.
- opts.WatchExpiryTimeout = 15 * time.Second
- }
-
- c := &Client{
- done: grpcsync.NewEvent(),
- opts: opts,
+ c := &clientImpl{
+ done: grpcsync.NewEvent(),
+ config: config,
+ watchExpiryTimeout: watchExpiryTimeout,
updateCh: buffer.NewUnbounded(),
ldsWatchers: make(map[string]map[*watchInfo]bool),
@@ -398,18 +366,18 @@
lrsClients: make(map[string]*lrsClient),
}
- cc, err := grpc.Dial(opts.Config.BalancerName, dopts...)
+ cc, err := grpc.Dial(config.BalancerName, dopts...)
if err != nil {
// An error from a non-blocking dial indicates something serious.
- return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", opts.Config.BalancerName, err)
+ return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", config.BalancerName, err)
}
c.cc = cc
c.logger = prefixLogger((c))
- c.logger.Infof("Created ClientConn to xDS server: %s", opts.Config.BalancerName)
+ c.logger.Infof("Created ClientConn to xDS management server: %s", config.BalancerName)
- apiClient, err := newAPIClient(opts.Config.TransportAPI, cc, BuildOptions{
+ apiClient, err := newAPIClient(config.TransportAPI, cc, BuildOptions{
Parent: c,
- NodeProto: opts.Config.NodeProto,
+ NodeProto: config.NodeProto,
Backoff: backoff.DefaultExponential.Backoff,
Logger: c.logger,
})
@@ -426,7 +394,7 @@
// "certificate_providers" field of the bootstrap file. The key in the returned
// map is the plugin_instance_name. Callers must not modify the returned map.
func (c *Client) CertProviderConfigs() map[string]*certprovider.BuildableConfig {
- return c.opts.Config.CertProviderConfigs
+ return c.config.CertProviderConfigs
}
// run is a goroutine for all the callbacks.
@@ -435,7 +403,7 @@
// goroutine, the callback will be called inline, which might cause a deadlock
// in user's code. Callbacks also cannot be simple `go callback()` because the
// order matters.
-func (c *Client) run() {
+func (c *clientImpl) run() {
for {
select {
case t := <-c.updateCh.Get():
@@ -450,8 +418,8 @@
}
}
-// Close closes the gRPC connection to the xDS server.
-func (c *Client) Close() {
+// Close closes the gRPC connection to the management server.
+func (c *clientImpl) Close() {
if c.done.HasFired() {
return
}
diff --git a/xds/internal/client/client_callback.go b/xds/internal/client/client_callback.go
index a135dae..90ffcde 100644
--- a/xds/internal/client/client_callback.go
+++ b/xds/internal/client/client_callback.go
@@ -26,7 +26,7 @@
// scheduleCallback should only be called by methods of watchInfo, which checks
// for watcher states and maintain consistency.
-func (c *Client) scheduleCallback(wi *watchInfo, update interface{}, err error) {
+func (c *clientImpl) scheduleCallback(wi *watchInfo, update interface{}, err error) {
c.updateCh.Put(&watcherInfoWithUpdate{
wi: wi,
update: update,
@@ -34,7 +34,7 @@
})
}
-func (c *Client) callCallback(wiu *watcherInfoWithUpdate) {
+func (c *clientImpl) callCallback(wiu *watcherInfoWithUpdate) {
c.mu.Lock()
// Use a closure to capture the callback and type assertion, to save one
// more switch case.
@@ -74,7 +74,7 @@
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
-func (c *Client) NewListeners(updates map[string]ListenerUpdate) {
+func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -109,7 +109,7 @@
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
-func (c *Client) NewRouteConfigs(updates map[string]RouteConfigUpdate) {
+func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -130,7 +130,7 @@
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
-func (c *Client) NewClusters(updates map[string]ClusterUpdate) {
+func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -165,7 +165,7 @@
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
-func (c *Client) NewEndpoints(updates map[string]EndpointsUpdate) {
+func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate) {
c.mu.Lock()
defer c.mu.Unlock()
diff --git a/xds/internal/client/client_loadreport.go b/xds/internal/client/client_loadreport.go
index e91316b..be42a6e 100644
--- a/xds/internal/client/client_loadreport.go
+++ b/xds/internal/client/client_loadreport.go
@@ -25,7 +25,7 @@
)
// ReportLoad starts an load reporting stream to the given server. If the server
-// is not an empty string, and is different from the xds server, a new
+// is not an empty string, and is different from the management server, a new
// ClientConn will be created.
//
// The same options used for creating the Client will be used (including
@@ -33,7 +33,7 @@
//
// It returns a Store for the user to report loads, a function to cancel the
// load reporting stream.
-func (c *Client) ReportLoad(server string) (*load.Store, func()) {
+func (c *clientImpl) ReportLoad(server string) (*load.Store, func()) {
c.lrsMu.Lock()
defer c.lrsMu.Unlock()
@@ -58,20 +58,21 @@
}
// lrsClient maps to one lrsServer. It contains:
-// - a ClientConn to this server (only if it's different from the xds server)
+// - a ClientConn to this server (only if it's different from the management
+// server)
// - a load.Store that contains loads only for this server
type lrsClient struct {
- parent *Client
+ parent *clientImpl
server string
- cc *grpc.ClientConn // nil if the server is same as the xds server
+ cc *grpc.ClientConn // nil if the server is same as the management server
refCount int
cancelStream func()
loadStore *load.Store
}
// newLRSClient creates a new LRS stream to the server.
-func newLRSClient(parent *Client, server string) *lrsClient {
+func newLRSClient(parent *clientImpl, server string) *lrsClient {
return &lrsClient{
parent: parent,
server: server,
@@ -109,18 +110,17 @@
}
// startStream starts the LRS stream to the server. If server is not the same
-// xDS server from the parent, it also creates a ClientConn.
+// management server from the parent, it also creates a ClientConn.
func (lrsC *lrsClient) startStream() {
var cc *grpc.ClientConn
lrsC.parent.logger.Infof("Starting load report to server: %s", lrsC.server)
- if lrsC.server == "" || lrsC.server == lrsC.parent.opts.Config.BalancerName {
+ if lrsC.server == "" || lrsC.server == lrsC.parent.config.BalancerName {
// Reuse the xDS client if server is the same.
cc = lrsC.parent.cc
} else {
- lrsC.parent.logger.Infof("LRS server is different from xDS server, starting a new ClientConn")
- dopts := append([]grpc.DialOption{lrsC.parent.opts.Config.Creds}, lrsC.parent.opts.DialOpts...)
- ccNew, err := grpc.Dial(lrsC.server, dopts...)
+ lrsC.parent.logger.Infof("LRS server is different from management server, starting a new ClientConn")
+ ccNew, err := grpc.Dial(lrsC.server, lrsC.parent.config.Creds)
if err != nil {
// An error from a non-blocking dial indicates something serious.
lrsC.parent.logger.Infof("xds: failed to dial load report server {%s}: %v", lrsC.server, err)
diff --git a/xds/internal/client/client_loadreport_test.go b/xds/internal/client/client_loadreport_test.go
index d426247..c9a2709 100644
--- a/xds/internal/client/client_loadreport_test.go
+++ b/xds/internal/client/client_loadreport_test.go
@@ -44,6 +44,8 @@
const (
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
+
+ defaultClientWatchExpiryTimeout = 15 * time.Second
)
type s struct {
@@ -61,14 +63,12 @@
}
defer sCleanup()
- xdsC, err := client.New(client.Options{
- Config: bootstrap.Config{
- BalancerName: fs.Address,
- Creds: grpc.WithInsecure(),
- NodeProto: &v2corepb.Node{},
- TransportAPI: version.TransportV2,
- },
- })
+ xdsC, err := client.NewWithConfigForTesting(&bootstrap.Config{
+ BalancerName: fs.Address,
+ Creds: grpc.WithInsecure(),
+ NodeProto: &v2corepb.Node{},
+ TransportAPI: version.TransportV2,
+ }, defaultClientWatchExpiryTimeout)
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
diff --git a/xds/internal/client/client_logging.go b/xds/internal/client/client_logging.go
index a47e524..bff3fb1 100644
--- a/xds/internal/client/client_logging.go
+++ b/xds/internal/client/client_logging.go
@@ -29,6 +29,6 @@
var logger = grpclog.Component("xds")
-func prefixLogger(p *Client) *internalgrpclog.PrefixLogger {
+func prefixLogger(p *clientImpl) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
}
diff --git a/xds/internal/client/client_singleton.go b/xds/internal/client/client_singleton.go
new file mode 100644
index 0000000..5d92b41
--- /dev/null
+++ b/xds/internal/client/client_singleton.go
@@ -0,0 +1,101 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package client
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "google.golang.org/grpc/xds/internal/client/bootstrap"
+)
+
+const defaultWatchExpiryTimeout = 15 * time.Second
+
+// This is the Client returned by New(). It contains one client implementation,
+// and maintains the refcount.
+var singletonClient = &Client{}
+
+// To override in tests.
+var bootstrapNewConfig = bootstrap.NewConfig
+
+// Client is a full fledged gRPC client which queries a set of discovery APIs
+// (collectively termed as xDS) on a remote management server, to discover
+// various dynamic resources.
+//
+// The xds client is a singleton. It will be shared by the xds resolver and
+// balancer implementations, across multiple ClientConns and Servers.
+type Client struct {
+ *clientImpl
+
+ // This mu protects all the fields, including the embedded clientImpl above.
+ mu sync.Mutex
+ refCount int
+}
+
+// New returns a new xdsClient configured by the bootstrap file specified in env
+// variable GRPC_XDS_BOOTSTRAP.
+func New() (*Client, error) {
+ singletonClient.mu.Lock()
+ defer singletonClient.mu.Unlock()
+ // If the client implementation was created, increment ref count and return
+ // the client.
+ if singletonClient.clientImpl != nil {
+ singletonClient.refCount++
+ return singletonClient, nil
+ }
+
+ // Create the new client implementation.
+ config, err := bootstrapNewConfig()
+ if err != nil {
+ return nil, fmt.Errorf("xds: failed to read bootstrap file: %v", err)
+ }
+ c, err := newWithConfig(config, defaultWatchExpiryTimeout)
+ if err != nil {
+ return nil, err
+ }
+
+ singletonClient.clientImpl = c
+ singletonClient.refCount++
+ return singletonClient, nil
+}
+
+// Close closes the client. It does ref count of the xds client implementation,
+// and closes the gRPC connection to the management server when ref count
+// reaches 0.
+func (c *Client) Close() {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.refCount--
+ if c.refCount == 0 {
+ c.clientImpl.Close()
+ // Set clientImpl back to nil. So if New() is called after this, a new
+ // implementation will be created.
+ c.clientImpl = nil
+ }
+}
+
+// NewWithConfigForTesting is exported for testing only.
+func NewWithConfigForTesting(config *bootstrap.Config, watchExpiryTimeout time.Duration) (*Client, error) {
+ cl, err := newWithConfig(config, watchExpiryTimeout)
+ if err != nil {
+ return nil, err
+ }
+ return &Client{clientImpl: cl, refCount: 1}, nil
+}
diff --git a/xds/internal/client/client_test.go b/xds/internal/client/client_test.go
index 932975c..bb17169 100644
--- a/xds/internal/client/client_test.go
+++ b/xds/internal/client/client_test.go
@@ -26,6 +26,7 @@
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
+ "google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/grpctest"
@@ -44,8 +45,7 @@
}
const (
- testXDSServer = "xds-server"
- chanRecvTimeout = 100 * time.Millisecond
+ testXDSServer = "xds-server"
testLDSName = "test-lds"
testRDSName = "test-rds"
@@ -57,22 +57,20 @@
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)
-func clientOpts(balancerName string, overrideWatchExpiryTImeout bool) Options {
- watchExpiryTimeout := time.Duration(0)
- if overrideWatchExpiryTImeout {
+func clientOpts(balancerName string, overrideWatchExpiryTimeout bool) (*bootstrap.Config, time.Duration) {
+ watchExpiryTimeout := defaultWatchExpiryTimeout
+ if overrideWatchExpiryTimeout {
watchExpiryTimeout = defaultTestWatchExpiryTimeout
}
- return Options{
- Config: bootstrap.Config{
- BalancerName: balancerName,
- Creds: grpc.WithInsecure(),
- NodeProto: xdstestutils.EmptyNodeProtoV2,
- },
- WatchExpiryTimeout: watchExpiryTimeout,
- }
+ return &bootstrap.Config{
+ BalancerName: balancerName,
+ Creds: grpc.WithInsecure(),
+ NodeProto: xdstestutils.EmptyNodeProtoV2,
+ }, watchExpiryTimeout
}
type testAPIClient struct {
+ done *grpcsync.Event
addWatches map[ResourceType]*testutils.Channel
removeWatches map[ResourceType]*testutils.Channel
}
@@ -102,6 +100,7 @@
EndpointsResource: testutils.NewChannel(),
}
return &testAPIClient{
+ done: grpcsync.NewEvent(),
addWatches: addWatches,
removeWatches: removeWatches,
}
@@ -118,7 +117,9 @@
func (c *testAPIClient) reportLoad(context.Context, *grpc.ClientConn, loadReportingOptions) {
}
-func (c *testAPIClient) Close() {}
+func (c *testAPIClient) Close() {
+ c.done.Fire()
+}
// TestWatchCallAnotherWatch covers the case where watch() is called inline by a
// callback. It makes sure it doesn't cause a deadlock.
@@ -126,7 +127,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -216,3 +217,104 @@
}
return nil
}
+
+// Test that multiple New() returns the same Client. And only when the last
+// client is closed, the underlying client is closed.
+func (s) TestClientNewSingleton(t *testing.T) {
+ oldBootstrapNewConfig := bootstrapNewConfig
+ bootstrapNewConfig = func() (*bootstrap.Config, error) {
+ return &bootstrap.Config{
+ BalancerName: testXDSServer,
+ Creds: grpc.WithInsecure(),
+ NodeProto: xdstestutils.EmptyNodeProtoV2,
+ }, nil
+ }
+ defer func() { bootstrapNewConfig = oldBootstrapNewConfig }()
+
+ apiClientCh, cleanup := overrideNewAPIClient()
+ defer cleanup()
+
+ // The first New(). Should create a Client and a new APIClient.
+ client, err := New()
+ if err != nil {
+ t.Fatalf("failed to create client: %v", err)
+ }
+ clientImpl := client.clientImpl
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ // Call New() again. They should all return the same client implementation,
+ // and should not create new API client.
+ const count = 9
+ for i := 0; i < count; i++ {
+ tc, terr := New()
+ if terr != nil {
+ client.Close()
+ t.Fatalf("%d-th call to New() failed with error: %v", i, terr)
+ }
+ if tc.clientImpl != clientImpl {
+ client.Close()
+ tc.Close()
+ t.Fatalf("%d-th call to New() got a different client %p, want %p", i, tc.clientImpl, clientImpl)
+ }
+
+ sctx, scancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
+ defer scancel()
+ _, err := apiClientCh.Receive(sctx)
+ if err == nil {
+ client.Close()
+ t.Fatalf("%d-th call to New() created a new API client", i)
+ }
+ }
+
+ // Call Close(). Nothing should be actually closed until the last ref calls
+ // Close().
+ for i := 0; i < count; i++ {
+ client.Close()
+ if clientImpl.done.HasFired() {
+ t.Fatalf("%d-th call to Close(), unexpected done in the client implemenation", i)
+ }
+ if apiClient.done.HasFired() {
+ t.Fatalf("%d-th call to Close(), unexpected done in the API client", i)
+ }
+ }
+
+ // Call the last Close(). The underlying implementation and API Client
+ // should all be closed.
+ client.Close()
+ if !clientImpl.done.HasFired() {
+ t.Fatalf("want client implementation to be closed, got not done")
+ }
+ if !apiClient.done.HasFired() {
+ t.Fatalf("want API client to be closed, got not done")
+ }
+
+ // Call New() again after the previous Client is actually closed. Should
+ // create a Client and a new APIClient.
+ client2, err2 := New()
+ if err2 != nil {
+ t.Fatalf("failed to create client: %v", err)
+ }
+ defer client2.Close()
+ c2, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient2 := c2.(*testAPIClient)
+
+ // The client wrapper with ref count should be the same.
+ if client2 != client {
+ t.Fatalf("New() after Close() should return the same client wrapper, got different %p, %p", client2, client)
+ }
+ if client2.clientImpl == clientImpl {
+ t.Fatalf("New() after Close() should return different client implementation, got the same %p", client2.clientImpl)
+ }
+ if apiClient2 == apiClient {
+ t.Fatalf("New() after Close() should return different API client, got the same %p", apiClient2)
+ }
+}
diff --git a/xds/internal/client/client_watchers.go b/xds/internal/client/client_watchers.go
index a3524fd..31704af 100644
--- a/xds/internal/client/client_watchers.go
+++ b/xds/internal/client/client_watchers.go
@@ -35,7 +35,7 @@
// watchInfo holds all the information from a watch() call.
type watchInfo struct {
- c *Client
+ c *clientImpl
rType ResourceType
target string
@@ -113,7 +113,7 @@
wi.state = watchInfoStateCanceled
}
-func (c *Client) watch(wi *watchInfo) (cancel func()) {
+func (c *clientImpl) watch(wi *watchInfo) (cancel func()) {
c.mu.Lock()
defer c.mu.Unlock()
c.logger.Debugf("new watch for type %v, resource name %v", wi.rType, wi.target)
@@ -208,7 +208,7 @@
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
-func (c *Client) WatchListener(serviceName string, cb func(ListenerUpdate, error)) (cancel func()) {
+func (c *clientImpl) WatchListener(serviceName string, cb func(ListenerUpdate, error)) (cancel func()) {
wi := &watchInfo{
c: c,
rType: ListenerResource,
@@ -216,7 +216,7 @@
ldsCallback: cb,
}
- wi.expiryTimer = time.AfterFunc(c.opts.WatchExpiryTimeout, func() {
+ wi.expiryTimer = time.AfterFunc(c.watchExpiryTimeout, func() {
wi.timeout()
})
return c.watch(wi)
@@ -227,7 +227,7 @@
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
-func (c *Client) WatchRouteConfig(routeName string, cb func(RouteConfigUpdate, error)) (cancel func()) {
+func (c *clientImpl) WatchRouteConfig(routeName string, cb func(RouteConfigUpdate, error)) (cancel func()) {
wi := &watchInfo{
c: c,
rType: RouteConfigResource,
@@ -235,7 +235,7 @@
rdsCallback: cb,
}
- wi.expiryTimer = time.AfterFunc(c.opts.WatchExpiryTimeout, func() {
+ wi.expiryTimer = time.AfterFunc(c.watchExpiryTimeout, func() {
wi.timeout()
})
return c.watch(wi)
@@ -250,7 +250,7 @@
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
-func (c *Client) WatchCluster(clusterName string, cb func(ClusterUpdate, error)) (cancel func()) {
+func (c *clientImpl) WatchCluster(clusterName string, cb func(ClusterUpdate, error)) (cancel func()) {
wi := &watchInfo{
c: c,
rType: ClusterResource,
@@ -258,7 +258,7 @@
cdsCallback: cb,
}
- wi.expiryTimer = time.AfterFunc(c.opts.WatchExpiryTimeout, func() {
+ wi.expiryTimer = time.AfterFunc(c.watchExpiryTimeout, func() {
wi.timeout()
})
return c.watch(wi)
@@ -272,7 +272,7 @@
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
-func (c *Client) WatchEndpoints(clusterName string, cb func(EndpointsUpdate, error)) (cancel func()) {
+func (c *clientImpl) WatchEndpoints(clusterName string, cb func(EndpointsUpdate, error)) (cancel func()) {
wi := &watchInfo{
c: c,
rType: EndpointsResource,
@@ -280,7 +280,7 @@
edsCallback: cb,
}
- wi.expiryTimer = time.AfterFunc(c.opts.WatchExpiryTimeout, func() {
+ wi.expiryTimer = time.AfterFunc(c.watchExpiryTimeout, func() {
wi.timeout()
})
return c.watch(wi)
diff --git a/xds/internal/client/client_watchers_cluster_test.go b/xds/internal/client/client_watchers_cluster_test.go
index c512675..b6b8d61 100644
--- a/xds/internal/client/client_watchers_cluster_test.go
+++ b/xds/internal/client/client_watchers_cluster_test.go
@@ -40,7 +40,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -93,7 +93,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -156,7 +156,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -221,7 +221,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -282,7 +282,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, true))
+ client, err := newWithConfig(clientOpts(testXDSServer, true))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -321,7 +321,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, true))
+ client, err := newWithConfig(clientOpts(testXDSServer, true))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -369,7 +369,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
diff --git a/xds/internal/client/client_watchers_endpoints_test.go b/xds/internal/client/client_watchers_endpoints_test.go
index 822ee59..2bf6aba 100644
--- a/xds/internal/client/client_watchers_endpoints_test.go
+++ b/xds/internal/client/client_watchers_endpoints_test.go
@@ -58,7 +58,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -110,7 +110,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -175,7 +175,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -240,7 +240,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -299,7 +299,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, true))
+ client, err := newWithConfig(clientOpts(testXDSServer, true))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
diff --git a/xds/internal/client/client_watchers_listener_test.go b/xds/internal/client/client_watchers_listener_test.go
index 62a0fe4..a233cd3 100644
--- a/xds/internal/client/client_watchers_listener_test.go
+++ b/xds/internal/client/client_watchers_listener_test.go
@@ -38,7 +38,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -91,7 +91,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -157,7 +157,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -223,7 +223,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -285,7 +285,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
diff --git a/xds/internal/client/client_watchers_route_test.go b/xds/internal/client/client_watchers_route_test.go
index 02425d6..ec04383 100644
--- a/xds/internal/client/client_watchers_route_test.go
+++ b/xds/internal/client/client_watchers_route_test.go
@@ -40,7 +40,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -99,7 +99,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -171,7 +171,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
@@ -250,7 +250,7 @@
apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- client, err := New(clientOpts(testXDSServer, false))
+ client, err := newWithConfig(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
diff --git a/xds/internal/client/tests/client_test.go b/xds/internal/client/tests/client_test.go
index 7887835..ae1209a 100644
--- a/xds/internal/client/tests/client_test.go
+++ b/xds/internal/client/tests/client_test.go
@@ -20,6 +20,7 @@
import (
"testing"
+ "time"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/grpctest"
@@ -42,77 +43,67 @@
testXDSServer = "xds-server"
)
-func clientOpts(balancerName string) xdsclient.Options {
- return xdsclient.Options{
- Config: bootstrap.Config{
- BalancerName: balancerName,
- Creds: grpc.WithInsecure(),
- NodeProto: testutils.EmptyNodeProtoV2,
- },
- }
-}
-
func (s) TestNew(t *testing.T) {
tests := []struct {
name string
- opts xdsclient.Options
+ config *bootstrap.Config
wantErr bool
}{
- {name: "empty-opts", opts: xdsclient.Options{}, wantErr: true},
+ {
+ name: "empty-opts",
+ config: &bootstrap.Config{},
+ wantErr: true,
+ },
{
name: "empty-balancer-name",
- opts: xdsclient.Options{
- Config: bootstrap.Config{
- Creds: grpc.WithInsecure(),
- NodeProto: testutils.EmptyNodeProtoV2,
- },
+ config: &bootstrap.Config{
+ Creds: grpc.WithInsecure(),
+ NodeProto: testutils.EmptyNodeProtoV2,
},
wantErr: true,
},
{
name: "empty-dial-creds",
- opts: xdsclient.Options{
- Config: bootstrap.Config{
- BalancerName: testXDSServer,
- NodeProto: testutils.EmptyNodeProtoV2,
- },
+ config: &bootstrap.Config{
+ BalancerName: testXDSServer,
+ NodeProto: testutils.EmptyNodeProtoV2,
},
wantErr: true,
},
{
name: "empty-node-proto",
- opts: xdsclient.Options{
- Config: bootstrap.Config{
- BalancerName: testXDSServer,
- Creds: grpc.WithInsecure(),
- },
+ config: &bootstrap.Config{
+ BalancerName: testXDSServer,
+ Creds: grpc.WithInsecure(),
},
wantErr: true,
},
{
name: "node-proto-version-mismatch",
- opts: xdsclient.Options{
- Config: bootstrap.Config{
- BalancerName: testXDSServer,
- Creds: grpc.WithInsecure(),
- NodeProto: testutils.EmptyNodeProtoV3,
- TransportAPI: version.TransportV2,
- },
+ config: &bootstrap.Config{
+ BalancerName: testXDSServer,
+ Creds: grpc.WithInsecure(),
+ NodeProto: testutils.EmptyNodeProtoV3,
+ TransportAPI: version.TransportV2,
},
wantErr: true,
},
// TODO(easwars): Add cases for v3 API client.
{
name: "happy-case",
- opts: clientOpts(testXDSServer),
+ config: &bootstrap.Config{
+ BalancerName: testXDSServer,
+ Creds: grpc.WithInsecure(),
+ NodeProto: testutils.EmptyNodeProtoV2,
+ },
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- c, err := xdsclient.New(test.opts)
+ c, err := xdsclient.NewWithConfigForTesting(test.config, 15*time.Second)
if (err != nil) != test.wantErr {
- t.Fatalf("New(%+v) = %v, wantErr: %v", test.opts, err, test.wantErr)
+ t.Fatalf("New(%+v) = %v, wantErr: %v", test.config, err, test.wantErr)
}
if c != nil {
c.Close()
diff --git a/xds/internal/client/v2/client.go b/xds/internal/client/v2/client.go
index 433e690..faf9c6e 100644
--- a/xds/internal/client/v2/client.go
+++ b/xds/internal/client/v2/client.go
@@ -181,8 +181,9 @@
return rType, resp.GetVersionInfo(), resp.GetNonce(), err
}
-// handleLDSResponse processes an LDS response received from the xDS server. On
-// receipt of a good response, it also invokes the registered watcher callback.
+// handleLDSResponse processes an LDS response received from the management
+// server. On receipt of a good response, it also invokes the registered watcher
+// callback.
func (v2c *client) handleLDSResponse(resp *v2xdspb.DiscoveryResponse) error {
update, err := xdsclient.UnmarshalListener(resp.GetResources(), v2c.logger)
if err != nil {
@@ -192,9 +193,9 @@
return nil
}
-// handleRDSResponse processes an RDS response received from the xDS server. On
-// receipt of a good response, it caches validated resources and also invokes
-// the registered watcher callback.
+// handleRDSResponse processes an RDS response received from the management
+// server. On receipt of a good response, it caches validated resources and also
+// invokes the registered watcher callback.
func (v2c *client) handleRDSResponse(resp *v2xdspb.DiscoveryResponse) error {
update, err := xdsclient.UnmarshalRouteConfig(resp.GetResources(), v2c.logger)
if err != nil {
@@ -204,8 +205,9 @@
return nil
}
-// handleCDSResponse processes an CDS response received from the xDS server. On
-// receipt of a good response, it also invokes the registered watcher callback.
+// handleCDSResponse processes an CDS response received from the management
+// server. On receipt of a good response, it also invokes the registered watcher
+// callback.
func (v2c *client) handleCDSResponse(resp *v2xdspb.DiscoveryResponse) error {
update, err := xdsclient.UnmarshalCluster(resp.GetResources(), v2c.logger)
if err != nil {
diff --git a/xds/internal/client/v3/client.go b/xds/internal/client/v3/client.go
index 00e74e5..de34da8 100644
--- a/xds/internal/client/v3/client.go
+++ b/xds/internal/client/v3/client.go
@@ -181,8 +181,9 @@
return rType, resp.GetVersionInfo(), resp.GetNonce(), err
}
-// handleLDSResponse processes an LDS response received from the xDS server. On
-// receipt of a good response, it also invokes the registered watcher callback.
+// handleLDSResponse processes an LDS response received from the management
+// server. On receipt of a good response, it also invokes the registered watcher
+// callback.
func (v3c *client) handleLDSResponse(resp *v3discoverypb.DiscoveryResponse) error {
update, err := xdsclient.UnmarshalListener(resp.GetResources(), v3c.logger)
if err != nil {
@@ -192,9 +193,9 @@
return nil
}
-// handleRDSResponse processes an RDS response received from the xDS server. On
-// receipt of a good response, it caches validated resources and also invokes
-// the registered watcher callback.
+// handleRDSResponse processes an RDS response received from the management
+// server. On receipt of a good response, it caches validated resources and also
+// invokes the registered watcher callback.
func (v3c *client) handleRDSResponse(resp *v3discoverypb.DiscoveryResponse) error {
update, err := xdsclient.UnmarshalRouteConfig(resp.GetResources(), v3c.logger)
if err != nil {
@@ -204,8 +205,9 @@
return nil
}
-// handleCDSResponse processes an CDS response received from the xDS server. On
-// receipt of a good response, it also invokes the registered watcher callback.
+// handleCDSResponse processes an CDS response received from the management
+// server. On receipt of a good response, it also invokes the registered watcher
+// callback.
func (v3c *client) handleCDSResponse(resp *v3discoverypb.DiscoveryResponse) error {
update, err := xdsclient.UnmarshalCluster(resp.GetResources(), v3c.logger)
if err != nil {
diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go
index c3e8323..667e7ae 100644
--- a/xds/internal/resolver/xds_resolver.go
+++ b/xds/internal/resolver/xds_resolver.go
@@ -22,7 +22,6 @@
import (
"fmt"
- "google.golang.org/grpc"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
@@ -30,17 +29,15 @@
xdsinternal "google.golang.org/grpc/xds/internal"
xdsclient "google.golang.org/grpc/xds/internal/client"
- "google.golang.org/grpc/xds/internal/client/bootstrap"
)
const xdsScheme = "xds"
// For overriding in unittests.
var (
- newXDSClient = func(opts xdsclient.Options) (xdsClientInterface, error) {
- return xdsclient.New(opts)
+ newXDSClient = func() (xdsClientInterface, error) {
+ return xdsclient.New()
}
- newXDSConfig = bootstrap.NewConfig
)
func init() {
@@ -53,12 +50,7 @@
//
// The xds bootstrap process is performed (and a new xds client is built) every
// time an xds resolver is built.
-func (b *xdsResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, rbo resolver.BuildOptions) (resolver.Resolver, error) {
- config, err := newXDSConfig()
- if err != nil {
- return nil, fmt.Errorf("xds: failed to read bootstrap file: %v", err)
- }
-
+func (b *xdsResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
r := &xdsResolver{
target: t,
cc: cc,
@@ -68,12 +60,7 @@
r.logger = prefixLogger((r))
r.logger.Infof("Creating resolver for target: %+v", t)
- var dopts []grpc.DialOption
- if rbo.Dialer != nil {
- dopts = []grpc.DialOption{grpc.WithContextDialer(rbo.Dialer)}
- }
-
- client, err := newXDSClient(xdsclient.Options{Config: *config, DialOpts: dopts, TargetName: t.Endpoint})
+ client, err := newXDSClient()
if err != nil {
return nil, fmt.Errorf("xds: failed to create xds-client: %v", err)
}
diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go
index 776ac24..21714b4 100644
--- a/xds/internal/resolver/xds_resolver_test.go
+++ b/xds/internal/resolver/xds_resolver_test.go
@@ -21,13 +21,10 @@
import (
"context"
"errors"
- "fmt"
- "net"
"testing"
"time"
"github.com/google/go-cmp/cmp"
- "google.golang.org/grpc"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpctest"
@@ -38,8 +35,6 @@
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config
"google.golang.org/grpc/xds/internal/client"
xdsclient "google.golang.org/grpc/xds/internal/client"
- "google.golang.org/grpc/xds/internal/client/bootstrap"
- xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
)
@@ -53,11 +48,6 @@
)
var (
- validConfig = bootstrap.Config{
- BalancerName: balancerName,
- Creds: grpc.WithInsecure(),
- NodeProto: xdstestutils.EmptyNodeProtoV2,
- }
target = resolver.Target{Endpoint: targetStr}
)
@@ -104,87 +94,24 @@
}
}
-func getXDSClientMakerFunc(wantOpts xdsclient.Options) func(xdsclient.Options) (xdsClientInterface, error) {
- return func(gotOpts xdsclient.Options) (xdsClientInterface, error) {
- if gotOpts.Config.BalancerName != wantOpts.Config.BalancerName {
- return nil, fmt.Errorf("got balancerName: %s, want: %s", gotOpts.Config.BalancerName, wantOpts.Config.BalancerName)
- }
- // We cannot compare two DialOption objects to see if they are equal
- // because each of these is a function pointer. So, the only thing we
- // can do here is to check if the got option is nil or not based on
- // what the want option is. We should be able to do extensive
- // credential testing in e2e tests.
- if (gotOpts.Config.Creds != nil) != (wantOpts.Config.Creds != nil) {
- return nil, fmt.Errorf("got len(creds): %v, want: %v", gotOpts.Config.Creds, wantOpts.Config.Creds)
- }
- if len(gotOpts.DialOpts) != len(wantOpts.DialOpts) {
- return nil, fmt.Errorf("got len(DialOpts): %v, want: %v", len(gotOpts.DialOpts), len(wantOpts.DialOpts))
- }
- return fakeclient.NewClient(), nil
- }
-}
-
-func errorDialer(_ context.Context, _ string) (net.Conn, error) {
- return nil, errors.New("dial error")
-}
-
// TestResolverBuilder tests the xdsResolverBuilder's Build method with
// different parameters.
func (s) TestResolverBuilder(t *testing.T) {
tests := []struct {
name string
- rbo resolver.BuildOptions
- config bootstrap.Config
- xdsClientFunc func(xdsclient.Options) (xdsClientInterface, error)
+ xdsClientFunc func() (xdsClientInterface, error)
wantErr bool
}{
{
- name: "empty-config",
- rbo: resolver.BuildOptions{},
- config: bootstrap.Config{},
- wantErr: true,
- },
- {
- name: "no-balancer-name-in-config",
- rbo: resolver.BuildOptions{},
- config: bootstrap.Config{
- Creds: grpc.WithInsecure(),
- NodeProto: xdstestutils.EmptyNodeProtoV2,
+ name: "simple-good",
+ xdsClientFunc: func() (xdsClientInterface, error) {
+ return fakeclient.NewClient(), nil
},
- wantErr: true,
- },
- {
- name: "no-creds-in-config",
- rbo: resolver.BuildOptions{},
- config: bootstrap.Config{
- BalancerName: balancerName,
- NodeProto: xdstestutils.EmptyNodeProtoV2,
- },
- xdsClientFunc: getXDSClientMakerFunc(xdsclient.Options{Config: validConfig}),
- wantErr: true,
- },
- {
- name: "error-dialer-in-rbo",
- rbo: resolver.BuildOptions{Dialer: errorDialer},
- config: validConfig,
- xdsClientFunc: getXDSClientMakerFunc(xdsclient.Options{
- Config: validConfig,
- DialOpts: []grpc.DialOption{grpc.WithContextDialer(errorDialer)},
- }),
wantErr: false,
},
{
- name: "simple-good",
- rbo: resolver.BuildOptions{},
- config: validConfig,
- xdsClientFunc: getXDSClientMakerFunc(xdsclient.Options{Config: validConfig}),
- wantErr: false,
- },
- {
- name: "newXDSClient-throws-error",
- rbo: resolver.BuildOptions{},
- config: validConfig,
- xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) {
+ name: "newXDSClient-throws-error",
+ xdsClientFunc: func() (xdsClientInterface, error) {
return nil, errors.New("newXDSClient-throws-error")
},
wantErr: true,
@@ -192,19 +119,10 @@
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- // Fake out the bootstrap process by providing our own config.
- oldConfigMaker := newXDSConfig
- newXDSConfig = func() (*bootstrap.Config, error) {
- if test.config.BalancerName == "" {
- return nil, fmt.Errorf("no balancer name found in config")
- }
- return &test.config, nil
- }
// Fake out the xdsClient creation process by providing a fake.
oldClientMaker := newXDSClient
newXDSClient = test.xdsClientFunc
defer func() {
- newXDSConfig = oldConfigMaker
newXDSClient = oldClientMaker
}()
@@ -213,7 +131,7 @@
t.Fatalf("resolver.Get(%v) returned nil", xdsScheme)
}
- r, err := builder.Build(target, newTestClientConn(), test.rbo)
+ r, err := builder.Build(target, newTestClientConn(), resolver.BuildOptions{})
if (err != nil) != test.wantErr {
t.Fatalf("builder.Build(%v) returned err: %v, wantErr: %v", target, err, test.wantErr)
}
@@ -227,19 +145,15 @@
}
type setupOpts struct {
- config *bootstrap.Config
- xdsClientFunc func(xdsclient.Options) (xdsClientInterface, error)
+ xdsClientFunc func() (xdsClientInterface, error)
}
func testSetup(t *testing.T, opts setupOpts) (*xdsResolver, *testClientConn, func()) {
t.Helper()
- oldConfigMaker := newXDSConfig
- newXDSConfig = func() (*bootstrap.Config, error) { return opts.config, nil }
oldClientMaker := newXDSClient
newXDSClient = opts.xdsClientFunc
cancel := func() {
- newXDSConfig = oldConfigMaker
newXDSClient = oldClientMaker
}
@@ -291,8 +205,7 @@
func (s) TestXDSResolverWatchCallbackAfterClose(t *testing.T) {
xdsC := fakeclient.NewClient()
xdsR, tcc, cancel := testSetup(t, setupOpts{
- config: &validConfig,
- xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil },
+ xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer cancel()
@@ -324,8 +237,7 @@
func (s) TestXDSResolverBadServiceUpdate(t *testing.T) {
xdsC := fakeclient.NewClient()
xdsR, tcc, cancel := testSetup(t, setupOpts{
- config: &validConfig,
- xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil },
+ xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
@@ -353,8 +265,7 @@
func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
xdsC := fakeclient.NewClient()
xdsR, tcc, cancel := testSetup(t, setupOpts{
- config: &validConfig,
- xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil },
+ xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
@@ -423,8 +334,7 @@
func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) {
xdsC := fakeclient.NewClient()
xdsR, tcc, cancel := testSetup(t, setupOpts{
- config: &validConfig,
- xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil },
+ xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
@@ -483,8 +393,7 @@
func (s) TestXDSResolverResourceNotFoundError(t *testing.T) {
xdsC := fakeclient.NewClient()
xdsR, tcc, cancel := testSetup(t, setupOpts{
- config: &validConfig,
- xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil },
+ xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
diff --git a/xds/internal/testutils/fakeserver/server.go b/xds/internal/testutils/fakeserver/server.go
index 994f530..6dd5436 100644
--- a/xds/internal/testutils/fakeserver/server.go
+++ b/xds/internal/testutils/fakeserver/server.go
@@ -16,7 +16,7 @@
*
*/
-// Package fakeserver provides a fake implementation of an xDS server.
+// Package fakeserver provides a fake implementation of the management server.
package fakeserver
import (
diff --git a/xds/server.go b/xds/server.go
index 449c27a..51acf08 100644
--- a/xds/server.go
+++ b/xds/server.go
@@ -44,8 +44,8 @@
var (
// These new functions will be overridden in unit tests.
- newXDSClient = func(opts xdsclient.Options) (xdsClientInterface, error) {
- return xdsclient.New(opts)
+ newXDSClient = func() (xdsClientInterface, error) {
+ return xdsclient.New()
}
newXDSConfig = bootstrap.NewConfig
newGRPCServer = func(opts ...grpc.ServerOption) grpcServerInterface {
@@ -163,12 +163,7 @@
return nil
}
- // Read the bootstrap file as part of initializing the xdsClient.
- config, err := newXDSConfig()
- if err != nil {
- return fmt.Errorf("xds: failed to read bootstrap file: %v", err)
- }
- client, err := newXDSClient(xdsclient.Options{Config: *config})
+ client, err := newXDSClient()
if err != nil {
return fmt.Errorf("xds: failed to create xds-client: %v", err)
}
diff --git a/xds/server_test.go b/xds/server_test.go
index 8769cf3..c9b185d 100644
--- a/xds/server_test.go
+++ b/xds/server_test.go
@@ -197,7 +197,7 @@
clientCh := testutils.NewChannel()
origNewXDSClient := newXDSClient
- newXDSClient = func(xdsclient.Options) (xdsClientInterface, error) {
+ newXDSClient = func() (xdsClientInterface, error) {
c := fakeclient.NewClient()
clientCh.Send(c)
return c, nil
@@ -385,7 +385,7 @@
defer func() { newXDSConfig = origNewXDSConfig }()
origNewXDSClient := newXDSClient
- newXDSClient = func(xdsclient.Options) (xdsClientInterface, error) {
+ newXDSClient = func() (xdsClientInterface, error) {
return nil, errors.New("xdsClient creation failed")
}
defer func() { newXDSClient = origNewXDSClient }()