lrs: handle multiple clusters in LRS stream (#3935)
diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go
index 6b124c2..a148db0 100644
--- a/xds/internal/balancer/edsbalancer/eds.go
+++ b/xds/internal/balancer/edsbalancer/eds.go
@@ -177,7 +177,9 @@
return
}
- x.client.handleUpdate(cfg, u.ResolverState.Attributes)
+ if err := x.client.handleUpdate(cfg, u.ResolverState.Attributes); err != nil {
+ x.logger.Warningf("failed to update xds clients: %v", err)
+ }
if x.config == nil {
x.config = cfg
diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go
index a56acb7..292ea4f 100644
--- a/xds/internal/balancer/edsbalancer/eds_impl_test.go
+++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go
@@ -688,9 +688,10 @@
// be used.
loadStore := load.NewStore()
lsWrapper := &loadStoreWrapper{}
- lsWrapper.update(loadStore, testClusterNames[0])
+ lsWrapper.updateServiceName(testClusterNames[0])
+ lsWrapper.updateLoadStore(loadStore)
cw := &xdsClientWrapper{
- load: lsWrapper,
+ loadWrapper: lsWrapper,
}
cc := testutils.NewTestClientConn(t)
diff --git a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
index f22c162..fe4e996 100644
--- a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
+++ b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
@@ -19,6 +19,7 @@
package edsbalancer
import (
+ "fmt"
"sync"
"google.golang.org/grpc"
@@ -35,8 +36,7 @@
// balancer. It's defined so we can override xdsclientNew function in tests.
type xdsClientInterface interface {
WatchEndpoints(clusterName string, edsCb func(xdsclient.EndpointsUpdate, error)) (cancel func())
- LoadStore() *load.Store
- ReportLoad(server string, clusterName string) (cancel func())
+ ReportLoad(server string) (loadStore *load.Store, cancel func())
Close()
}
@@ -48,20 +48,34 @@
)
type loadStoreWrapper struct {
- mu sync.RWMutex
+ mu sync.RWMutex
+ service string
+ // Both store and perCluster will be nil if load reporting is disabled (EDS
+ // response doesn't have LRS server name). Note that methods on Store and
+ // perCluster all handle nil, so there's no need to check nil before calling
+ // them.
store *load.Store
- service string
perCluster load.PerClusterReporter
}
-func (lsw *loadStoreWrapper) update(store *load.Store, service string) {
+func (lsw *loadStoreWrapper) updateServiceName(service string) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
- if store == lsw.store && service == lsw.service {
+ if lsw.service == service {
+ return
+ }
+ lsw.service = service
+ lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
+}
+
+func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
+ lsw.mu.Lock()
+ defer lsw.mu.Unlock()
+ if store == lsw.store {
return
}
lsw.store = store
- lsw.service = service
+ lsw.perCluster = nil
lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
}
@@ -102,7 +116,9 @@
// xdsClient could come from attributes, or created with balancerName.
xdsClient xdsClientInterface
- load *loadStoreWrapper
+ // loadWrapper is a wrapper with loadOriginal, with clusterName and
+ // edsServiceName. It's used children to report loads.
+ loadWrapper *loadStoreWrapper
// edsServiceName is the edsServiceName currently being watched, not
// necessary the edsServiceName from service config.
//
@@ -127,7 +143,7 @@
logger: logger,
newEDSUpdate: newEDSUpdate,
bbo: bbo,
- load: &loadStoreWrapper{},
+ loadWrapper: &loadStoreWrapper{},
}
}
@@ -168,12 +184,12 @@
// the balancerName (from bootstrap file or from service config) changed.
// - if balancer names are the same, do nothing, and return false
// - if balancer names are different, create new one, and return true
-func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.Attributes) bool {
+func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.Attributes) (bool, error) {
if attr != nil {
if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil {
// This will also clear balancerName, to indicate that client is
// from attributes.
- return c.replaceXDSClient(clientFromAttr, "")
+ return c.replaceXDSClient(clientFromAttr, ""), nil
}
}
@@ -184,7 +200,7 @@
}
if c.balancerName == clientConfig.BalancerName {
- return false
+ return false, nil
}
var dopts []grpc.DialOption
@@ -192,16 +208,20 @@
dopts = []grpc.DialOption{grpc.WithContextDialer(dialer)}
}
+ // TODO: there's no longer a need to read bootstrap file and create a new
+ // xds client. The EDS balancer should always get the xds client from
+ // attributes. Otherwise, this function should just fail. Also, xdsclient
+ // will be shared by multiple clients, so trying to make an xds client is
+ // just the wrong move.
newClient, err := xdsclientNew(xdsclient.Options{Config: *clientConfig, DialOpts: dopts})
if err != nil {
// This should never fail. xdsclientnew does a non-blocking dial, and
// all the config passed in should be validated.
//
// This could leave c.xdsClient as nil if this is the first update.
- c.logger.Warningf("eds: failed to create xdsClient, error: %v", err)
- return false
+ return false, fmt.Errorf("eds: failed to create xdsClient, error: %v", err)
}
- return c.replaceXDSClient(newClient, clientConfig.BalancerName)
+ return c.replaceXDSClient(newClient, clientConfig.BalancerName), nil
}
// startEndpointsWatch starts the EDS watch. Caller can call this when the
@@ -214,10 +234,6 @@
// This usually means load report needs to be restarted, but this function does
// NOT do that. Caller needs to call startLoadReport separately.
func (c *xdsClientWrapper) startEndpointsWatch() {
- if c.xdsClient == nil {
- return
- }
-
if c.cancelEndpointsWatch != nil {
c.cancelEndpointsWatch()
}
@@ -238,31 +254,32 @@
// Caller can cal this when the loadReportServer name changes, but
// edsServiceName doesn't (so we only need to restart load reporting, not EDS
// watch).
-func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) {
- if c.xdsClient == nil {
- c.logger.Warningf("xds: xdsClient is nil when trying to start load reporting. This means xdsClient wasn't passed in from the resolver, and xdsClient.New failed")
- return
- }
+func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) *load.Store {
if c.cancelLoadReport != nil {
c.cancelLoadReport()
}
c.loadReportServer = loadReportServer
+ var loadStore *load.Store
if c.loadReportServer != nil {
- c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer, c.edsServiceName)
+ loadStore, c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer)
}
+ return loadStore
}
func (c *xdsClientWrapper) loadStore() load.PerClusterReporter {
- if c == nil || c.load.store == nil {
+ if c == nil {
return nil
}
- return c.load
+ return c.loadWrapper
}
// handleUpdate applies the service config and attributes updates to the client,
// including updating the xds_client to use, and updating the EDS name to watch.
-func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) {
- clientChanged := c.updateXDSClient(config, attr)
+func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) error {
+ clientChanged, err := c.updateXDSClient(config, attr)
+ if err != nil {
+ return err
+ }
// Need to restart EDS watch when one of the following happens:
// - the xds_client is updated
@@ -277,14 +294,17 @@
//
// This is OK for now, because we don't actually expect edsServiceName
// to change. Fix this (a bigger change) will happen later.
- c.load.update(c.xdsClient.LoadStore(), c.edsServiceName)
+ c.loadWrapper.updateServiceName(c.edsServiceName)
}
// Only need to restart load reporting when:
// - the loadReportServer name changed
if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
- c.startLoadReport(config.LrsLoadReportingServerName)
+ loadStore := c.startLoadReport(config.LrsLoadReportingServerName)
+ c.loadWrapper.updateLoadStore(loadStore)
}
+
+ return nil
}
func (c *xdsClientWrapper) cancelWatch() {
diff --git a/xds/internal/balancer/edsbalancer/xds_lrs_test.go b/xds/internal/balancer/edsbalancer/xds_lrs_test.go
index 8d888ec..955f544 100644
--- a/xds/internal/balancer/edsbalancer/xds_lrs_test.go
+++ b/xds/internal/balancer/edsbalancer/xds_lrs_test.go
@@ -66,7 +66,7 @@
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
- if got.Server != "" || got.Cluster != testEDSClusterName {
- t.Fatalf("xdsClient.ReportLoad called with {%v, %v}: want {\"\", %v}", got.Server, got.Cluster, testEDSClusterName)
+ if got.Server != "" {
+ t.Fatalf("xdsClient.ReportLoad called with {%v}: want {\"\"}", got.Server)
}
}
diff --git a/xds/internal/balancer/lrs/balancer.go b/xds/internal/balancer/lrs/balancer.go
index 1361fb1..f8e7673 100644
--- a/xds/internal/balancer/lrs/balancer.go
+++ b/xds/internal/balancer/lrs/balancer.go
@@ -80,7 +80,9 @@
// Update load reporting config or xds client. This needs to be done before
// updating the child policy because we need the loadStore from the updated
// client to be passed to the ccWrapper.
- b.client.update(newConfig, s.ResolverState.Attributes)
+ if err := b.client.update(newConfig, s.ResolverState.Attributes); err != nil {
+ return err
+ }
// If child policy is a different type, recreate the sub-balancer.
if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
@@ -144,28 +146,41 @@
// xdsClientInterface contains only the xds_client methods needed by LRS
// balancer. It's defined so we can override xdsclient in tests.
type xdsClientInterface interface {
- LoadStore() *load.Store
- ReportLoad(server string, clusterName string) func()
+ ReportLoad(server string) (*load.Store, func())
Close()
}
type loadStoreWrapper struct {
mu sync.RWMutex
- store *load.Store
cluster string
edsService string
+ // Both store and perCluster will be nil if load reporting is disabled (EDS
+ // response doesn't have LRS server name). Note that methods on Store and
+ // perCluster all handle nil, so there's no need to check nil before calling
+ // them.
+ store *load.Store
perCluster load.PerClusterReporter
}
-func (lsw *loadStoreWrapper) update(store *load.Store, cluster, edsService string) {
+func (lsw *loadStoreWrapper) updateClusterAndService(cluster, edsService string) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
- if store == lsw.store && cluster == lsw.cluster && edsService == lsw.edsService {
+ if cluster == lsw.cluster && edsService == lsw.edsService {
+ return
+ }
+ lsw.cluster = cluster
+ lsw.edsService = edsService
+ lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
+}
+
+func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
+ lsw.mu.Lock()
+ defer lsw.mu.Unlock()
+ if store == lsw.store {
return
}
lsw.store = store
- lsw.cluster = cluster
- lsw.edsService = edsService
+ lsw.perCluster = nil
lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
}
@@ -199,44 +214,62 @@
clusterName string
edsServiceName string
lrsServerName string
- load *loadStoreWrapper
+ // loadWrapper is a wrapper with loadOriginal, with clusterName and
+ // edsServiceName. It's used children to report loads.
+ loadWrapper *loadStoreWrapper
}
func newXDSClientWrapper() *xdsClientWrapper {
return &xdsClientWrapper{
- load: &loadStoreWrapper{},
+ loadWrapper: &loadStoreWrapper{},
}
}
// update checks the config and xdsclient, and decides whether it needs to
// restart the load reporting stream.
-func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) {
+func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) error {
var (
- restartLoadReport bool
- updateLoadStore bool
+ restartLoadReport bool
+ updateLoadClusterAndService bool
)
- if attr != nil {
- if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil {
- if w.c != clientFromAttr {
- // xds client is different, restart.
- restartLoadReport = true
- updateLoadStore = true
- w.c = clientFromAttr
- }
- }
+
+ if attr == nil {
+ return fmt.Errorf("lrs: failed to get xdsClient from attributes: attributes is nil")
+ }
+ clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface)
+ if clientFromAttr == nil {
+ return fmt.Errorf("lrs: failed to get xdsClient from attributes: xdsClient not found in attributes")
+ }
+
+ if w.c != clientFromAttr {
+ // xds client is different, restart.
+ restartLoadReport = true
+ w.c = clientFromAttr
}
// ClusterName is different, restart. ClusterName is from ClusterName and
// EdsServiceName.
if w.clusterName != newConfig.ClusterName {
- updateLoadStore = true
+ updateLoadClusterAndService = true
w.clusterName = newConfig.ClusterName
}
if w.edsServiceName != newConfig.EdsServiceName {
- updateLoadStore = true
+ updateLoadClusterAndService = true
w.edsServiceName = newConfig.EdsServiceName
}
+ if updateLoadClusterAndService {
+ // This updates the clusterName and serviceName that will reported for the
+ // loads. The update here is too early, the perfect timing is when the
+ // picker is updated with the new connection. But from this balancer's point
+ // of view, it's impossible to tell.
+ //
+ // On the other hand, this will almost never happen. Each LRS policy
+ // shouldn't get updated config. The parent should do a graceful switch when
+ // the clusterName or serviceName is changed.
+ w.loadWrapper.updateClusterAndService(w.clusterName, w.edsServiceName)
+ }
+
if w.lrsServerName != newConfig.LrsLoadReportingServerName {
// LrsLoadReportingServerName is different, load should be report to a
// different server, restart.
@@ -244,34 +277,23 @@
w.lrsServerName = newConfig.LrsLoadReportingServerName
}
- // This updates the clusterName and serviceName that will reported for the
- // loads. The update here is too early, the perfect timing is when the
- // picker is updated with the new connection. But from this balancer's point
- // of view, it's impossible to tell.
- //
- // On the other hand, this will almost never happen. Each LRS policy
- // shouldn't get updated config. The parent should do a graceful switch when
- // the clusterName or serviceName is changed.
- if updateLoadStore {
- w.load.update(w.c.LoadStore(), w.clusterName, w.edsServiceName)
- }
-
if restartLoadReport {
if w.cancelLoadReport != nil {
w.cancelLoadReport()
w.cancelLoadReport = nil
}
+ var loadStore *load.Store
if w.c != nil {
- w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName, w.clusterName)
+ loadStore, w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName)
}
+ w.loadWrapper.updateLoadStore(loadStore)
}
+
+ return nil
}
func (w *xdsClientWrapper) loadStore() load.PerClusterReporter {
- if w.load.store == nil {
- return nil
- }
- return w.load
+ return w.loadWrapper
}
func (w *xdsClientWrapper) close() {
diff --git a/xds/internal/balancer/lrs/balancer_test.go b/xds/internal/balancer/lrs/balancer_test.go
index 789cfea..38dd573 100644
--- a/xds/internal/balancer/lrs/balancer_test.go
+++ b/xds/internal/balancer/lrs/balancer_test.go
@@ -84,8 +84,8 @@
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
- if got.Server != testLRSServerName || got.Cluster != testClusterName {
- t.Fatalf("xdsClient.ReportLoad called with {%q, %q}: want {%q, %q}", got.Server, got.Cluster, testLRSServerName, testClusterName)
+ if got.Server != testLRSServerName {
+ t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
}
sc1 := <-cc.NewSubConnCh
diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go
index 0d61963..3478569 100644
--- a/xds/internal/client/client.go
+++ b/xds/internal/client/client.go
@@ -30,6 +30,7 @@
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"github.com/golang/protobuf/proto"
+ "google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/backoff"
@@ -39,7 +40,6 @@
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/client/bootstrap"
- "google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/version"
)
@@ -78,9 +78,6 @@
// Backoff returns the amount of time to backoff before retrying broken
// streams.
Backoff func(int) time.Duration
- // LoadStore contains load reports which need to be pushed to the management
- // server.
- LoadStore *load.Store
// Logger provides enhanced logging capabilities.
Logger *grpclog.PrefixLogger
}
@@ -99,6 +96,12 @@
// APIClient represents the functionality provided by transport protocol
// version specific implementations of the xDS client.
+//
+// TODO: unexport this interface and all the methods after the PR to make
+// xdsClient sharable by clients. AddWatch and RemoveWatch are exported for
+// v2/v3 to override because they need to keep track of LDS name for RDS to use.
+// After the share xdsClient change, that's no longer necessary. After that, we
+// will still keep this interface for testing purposes.
type APIClient interface {
// AddWatch adds a watch for an xDS resource given its type and name.
AddWatch(ResourceType, string)
@@ -107,21 +110,18 @@
// given its type and name.
RemoveWatch(ResourceType, string)
- // ReportLoad starts an LRS stream to periodically report load using the
+ // reportLoad starts an LRS stream to periodically report load using the
// provided ClientConn, which represent a connection to the management
// server.
- ReportLoad(ctx context.Context, cc *grpc.ClientConn, opts LoadReportingOptions)
+ reportLoad(ctx context.Context, cc *grpc.ClientConn, opts loadReportingOptions)
// Close cleans up resources allocated by the API client.
Close()
}
-// LoadReportingOptions contains configuration knobs for reporting load data.
-type LoadReportingOptions struct {
- // ClusterName is the cluster name for which load is being reported.
- ClusterName string
- // TargetName is the target of the parent ClientConn.
- TargetName string
+// loadReportingOptions contains configuration knobs for reporting load data.
+type loadReportingOptions struct {
+ loadStore *load.Store
}
// UpdateHandler receives and processes (by taking appropriate actions) xDS
@@ -328,7 +328,6 @@
opts Options
cc *grpc.ClientConn // Connection to the xDS server
apiClient APIClient
- loadStore *load.Store
logger *grpclog.PrefixLogger
@@ -342,6 +341,11 @@
cdsCache map[string]ClusterUpdate
edsWatchers map[string]map[*watchInfo]bool
edsCache map[string]EndpointsUpdate
+
+ // Changes to map lrsClients and the lrsClient inside the map need to be
+ // protected by lrsMu.
+ lrsMu sync.Mutex
+ lrsClients map[string]*lrsClient
}
// New returns a new xdsClient configured with opts.
@@ -382,9 +386,8 @@
}
c := &Client{
- done: grpcsync.NewEvent(),
- opts: opts,
- loadStore: load.NewStore(),
+ done: grpcsync.NewEvent(),
+ opts: opts,
updateCh: buffer.NewUnbounded(),
ldsWatchers: make(map[string]map[*watchInfo]bool),
@@ -395,6 +398,7 @@
cdsCache: make(map[string]ClusterUpdate),
edsWatchers: make(map[string]map[*watchInfo]bool),
edsCache: make(map[string]EndpointsUpdate),
+ lrsClients: make(map[string]*lrsClient),
}
cc, err := grpc.Dial(opts.Config.BalancerName, dopts...)
@@ -410,7 +414,6 @@
Parent: c,
NodeProto: opts.Config.NodeProto,
Backoff: backoff.DefaultExponential.Backoff,
- LoadStore: c.loadStore,
Logger: c.logger,
})
if err != nil {
diff --git a/xds/internal/client/client_loadreport.go b/xds/internal/client/client_loadreport.go
index e52c3b9..e91316b 100644
--- a/xds/internal/client/client_loadreport.go
+++ b/xds/internal/client/client_loadreport.go
@@ -24,53 +24,116 @@
"google.golang.org/grpc/xds/internal/client/load"
)
-// NodeMetadataHostnameKey is the metadata key for specifying the target name in
-// the node proto of an LRS request.
-const NodeMetadataHostnameKey = "PROXYLESS_CLIENT_HOSTNAME"
-
-// LoadStore returns the underlying load data store used by the xDS client.
-func (c *Client) LoadStore() *load.Store {
- return c.loadStore
-}
-
-// ReportLoad sends the load of the given clusterName to the given server. If
-// the server is not an empty string, and is different from the xds server, a
-// new ClientConn will be created.
+// ReportLoad starts an load reporting stream to the given server. If the server
+// is not an empty string, and is different from the xds server, a new
+// ClientConn will be created.
//
// The same options used for creating the Client will be used (including
// NodeProto, and dial options if necessary).
//
-// It returns a function to cancel the load reporting stream. If server is
-// different from xds server, the ClientConn will also be closed.
-func (c *Client) ReportLoad(server string, clusterName string) func() {
- var (
- cc *grpc.ClientConn
- closeCC bool
- )
- c.logger.Infof("Starting load report to server: %s", server)
- if server == "" || server == c.opts.Config.BalancerName {
- cc = c.cc
+// It returns a Store for the user to report loads, a function to cancel the
+// load reporting stream.
+func (c *Client) ReportLoad(server string) (*load.Store, func()) {
+ c.lrsMu.Lock()
+ defer c.lrsMu.Unlock()
+
+ // If there's already a client to this server, use it. Otherwise, create
+ // one.
+ lrsC, ok := c.lrsClients[server]
+ if !ok {
+ lrsC = newLRSClient(c, server)
+ c.lrsClients[server] = lrsC
+ }
+
+ store := lrsC.ref()
+ return store, func() {
+ // This is a callback, need to hold lrsMu.
+ c.lrsMu.Lock()
+ defer c.lrsMu.Unlock()
+ if lrsC.unRef() {
+ // Delete the lrsClient from map if this is the last reference.
+ delete(c.lrsClients, server)
+ }
+ }
+}
+
+// lrsClient maps to one lrsServer. It contains:
+// - a ClientConn to this server (only if it's different from the xds server)
+// - a load.Store that contains loads only for this server
+type lrsClient struct {
+ parent *Client
+ server string
+
+ cc *grpc.ClientConn // nil if the server is same as the xds server
+ refCount int
+ cancelStream func()
+ loadStore *load.Store
+}
+
+// newLRSClient creates a new LRS stream to the server.
+func newLRSClient(parent *Client, server string) *lrsClient {
+ return &lrsClient{
+ parent: parent,
+ server: server,
+ refCount: 0,
+ }
+}
+
+// ref increments the refCount. If this is the first ref, it starts the LRS stream.
+//
+// Not thread-safe, caller needs to synchronize.
+func (lrsC *lrsClient) ref() *load.Store {
+ lrsC.refCount++
+ if lrsC.refCount == 1 {
+ lrsC.startStream()
+ }
+ return lrsC.loadStore
+}
+
+// unRef decrements the refCount, and closes the stream if refCount reaches 0
+// (and close the cc if cc is not xDS cc). It returns whether refCount reached 0
+// after this call.
+//
+// Not thread-safe, caller needs to synchronize.
+func (lrsC *lrsClient) unRef() (closed bool) {
+ lrsC.refCount--
+ if lrsC.refCount != 0 {
+ return false
+ }
+ lrsC.parent.logger.Infof("Stopping load report to server: %s", lrsC.server)
+ lrsC.cancelStream()
+ if lrsC.cc != nil {
+ lrsC.cc.Close()
+ }
+ return true
+}
+
+// startStream starts the LRS stream to the server. If server is not the same
+// xDS server from the parent, it also creates a ClientConn.
+func (lrsC *lrsClient) startStream() {
+ var cc *grpc.ClientConn
+
+ lrsC.parent.logger.Infof("Starting load report to server: %s", lrsC.server)
+ if lrsC.server == "" || lrsC.server == lrsC.parent.opts.Config.BalancerName {
+ // Reuse the xDS client if server is the same.
+ cc = lrsC.parent.cc
} else {
- c.logger.Infof("LRS server is different from xDS server, starting a new ClientConn")
- dopts := append([]grpc.DialOption{c.opts.Config.Creds}, c.opts.DialOpts...)
- ccNew, err := grpc.Dial(server, dopts...)
+ lrsC.parent.logger.Infof("LRS server is different from xDS server, starting a new ClientConn")
+ dopts := append([]grpc.DialOption{lrsC.parent.opts.Config.Creds}, lrsC.parent.opts.DialOpts...)
+ ccNew, err := grpc.Dial(lrsC.server, dopts...)
if err != nil {
// An error from a non-blocking dial indicates something serious.
- c.logger.Infof("xds: failed to dial load report server {%s}: %v", server, err)
- return func() {}
+ lrsC.parent.logger.Infof("xds: failed to dial load report server {%s}: %v", lrsC.server, err)
+ return
}
cc = ccNew
- closeCC = true
+ lrsC.cc = ccNew
}
- ctx, cancel := context.WithCancel(context.Background())
- go c.apiClient.ReportLoad(ctx, c.cc, LoadReportingOptions{
- ClusterName: clusterName,
- TargetName: c.opts.TargetName,
- })
- return func() {
- cancel()
- if closeCC {
- cc.Close()
- }
- }
+
+ var ctx context.Context
+ ctx, lrsC.cancelStream = context.WithCancel(context.Background())
+
+ // Create the store and stream.
+ lrsC.loadStore = load.NewStore()
+ go lrsC.parent.apiClient.reportLoad(ctx, cc, loadReportingOptions{loadStore: lrsC.loadStore})
}
diff --git a/xds/internal/client/client_loadreport_test.go b/xds/internal/client/client_loadreport_test.go
new file mode 100644
index 0000000..d426247
--- /dev/null
+++ b/xds/internal/client/client_loadreport_test.go
@@ -0,0 +1,148 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package client_test
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
+ endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
+ lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
+ durationpb "github.com/golang/protobuf/ptypes/duration"
+ "github.com/google/go-cmp/cmp"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/internal/grpctest"
+ "google.golang.org/grpc/status"
+ "google.golang.org/grpc/xds/internal/client"
+ "google.golang.org/grpc/xds/internal/client/bootstrap"
+ "google.golang.org/grpc/xds/internal/testutils/fakeserver"
+ "google.golang.org/grpc/xds/internal/version"
+ "google.golang.org/protobuf/testing/protocmp"
+
+ _ "google.golang.org/grpc/xds/internal/client/v2" // Register the v2 xDS API client.
+)
+
+const (
+ defaultTestTimeout = 5 * time.Second
+ defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
+)
+
+type s struct {
+ grpctest.Tester
+}
+
+func Test(t *testing.T) {
+ grpctest.RunSubTests(t, s{})
+}
+
+func (s) TestLRSClient(t *testing.T) {
+ fs, sCleanup, err := fakeserver.StartServer()
+ if err != nil {
+ t.Fatalf("failed to start fake xDS server: %v", err)
+ }
+ defer sCleanup()
+
+ xdsC, err := client.New(client.Options{
+ Config: bootstrap.Config{
+ BalancerName: fs.Address,
+ Creds: grpc.WithInsecure(),
+ NodeProto: &v2corepb.Node{},
+ TransportAPI: version.TransportV2,
+ },
+ })
+ if err != nil {
+ t.Fatalf("failed to create xds client: %v", err)
+ }
+ defer xdsC.Close()
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+ if u, err := fs.NewConnChan.Receive(ctx); err != nil {
+ t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
+ }
+
+ // Report to the same address should not create new ClientConn.
+ store1, lrsCancel1 := xdsC.ReportLoad(fs.Address)
+ defer lrsCancel1()
+ sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := fs.NewConnChan.Receive(sCtx); err != context.DeadlineExceeded {
+ t.Errorf("unexpected NewConn: %v, %v, want channel recv timeout", u, err)
+ }
+
+ fs2, sCleanup2, err := fakeserver.StartServer()
+ if err != nil {
+ t.Fatalf("failed to start fake xDS server: %v", err)
+ }
+ defer sCleanup2()
+
+ // Report to a different address should create new ClientConn.
+ store2, lrsCancel2 := xdsC.ReportLoad(fs2.Address)
+ defer lrsCancel2()
+ if u, err := fs2.NewConnChan.Receive(ctx); err != nil {
+ t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
+ }
+
+ if store1 == store2 {
+ t.Fatalf("got same store for different servers, want different")
+ }
+
+ if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil {
+ t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
+ }
+ store2.PerCluster("cluster", "eds").CallDropped("test")
+
+ // Send one resp to the client.
+ fs2.LRSResponseChan <- &fakeserver.Response{
+ Resp: &lrspb.LoadStatsResponse{
+ SendAllClusters: true,
+ LoadReportingInterval: &durationpb.Duration{Nanos: 50000000},
+ },
+ }
+
+ // Server should receive a req with the loads.
+ u, err := fs2.LRSRequestChan.Receive(ctx)
+ if err != nil {
+ t.Fatalf("unexpected LRS request: %v, %v, want error canceled", u, err)
+ }
+ receivedLoad := u.(*fakeserver.Request).Req.(*lrspb.LoadStatsRequest).ClusterStats
+ if len(receivedLoad) <= 0 {
+ t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test")
+ }
+ receivedLoad[0].LoadReportInterval = nil
+ want := (&endpointpb.ClusterStats{
+ ClusterName: "cluster",
+ ClusterServiceName: "eds",
+ TotalDroppedRequests: 1,
+ DroppedRequests: []*endpointpb.ClusterStats_DroppedRequests{{Category: "test", DroppedCount: 1}},
+ })
+ if d := cmp.Diff(want, receivedLoad[0], protocmp.Transform()); d != "" {
+ t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test, diff (-want +got):\n%s", d)
+ }
+
+ // Cancel this load reporting stream, server should see error canceled.
+ lrsCancel2()
+
+ // Server should receive a stream canceled error.
+ if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil || status.Code(u.(*fakeserver.Request).Err) != codes.Canceled {
+ t.Errorf("unexpected LRS request: %v, %v, want error canceled", u, err)
+ }
+}
diff --git a/xds/internal/client/client_test.go b/xds/internal/client/client_test.go
index 4a53138..a2a92a2 100644
--- a/xds/internal/client/client_test.go
+++ b/xds/internal/client/client_test.go
@@ -115,7 +115,7 @@
c.removeWatches[resourceType].Send(resourceName)
}
-func (c *testAPIClient) ReportLoad(ctx context.Context, cc *grpc.ClientConn, opts LoadReportingOptions) {
+func (c *testAPIClient) reportLoad(context.Context, *grpc.ClientConn, loadReportingOptions) {
}
func (c *testAPIClient) Close() {}
diff --git a/xds/internal/client/transport_helper.go b/xds/internal/client/transport_helper.go
index 607f26f..b286a61 100644
--- a/xds/internal/client/transport_helper.go
+++ b/xds/internal/client/transport_helper.go
@@ -24,6 +24,7 @@
"time"
"github.com/golang/protobuf/proto"
+ "google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/buffer"
@@ -71,19 +72,21 @@
NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error)
// SendFirstLoadStatsRequest constructs and sends the first request on the
- // LRS stream. This contains the node proto with appropriate metadata
- // fields.
- SendFirstLoadStatsRequest(s grpc.ClientStream, targetName string) error
+ // LRS stream.
+ SendFirstLoadStatsRequest(s grpc.ClientStream) error
// HandleLoadStatsResponse receives the first response from the server which
// contains the load reporting interval and the clusters for which the
// server asks the client to report load for.
- HandleLoadStatsResponse(s grpc.ClientStream, clusterName string) (time.Duration, error)
+ //
+ // If the response sets SendAllClusters to true, the returned clusters is
+ // nil.
+ HandleLoadStatsResponse(s grpc.ClientStream) (clusters []string, _ time.Duration, _ error)
// SendLoadStatsRequest will be invoked at regular intervals to send load
// report with load data reported since the last time this method was
// invoked.
- SendLoadStatsRequest(s grpc.ClientStream, clusterName string) error
+ SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) error
}
// TransportHelper contains all xDS transport protocol related functionality
@@ -443,9 +446,9 @@
return target, rType, version, nonce, send
}
-// ReportLoad starts an LRS stream to report load data to the management server.
+// reportLoad starts an LRS stream to report load data to the management server.
// It blocks until the context is cancelled.
-func (t *TransportHelper) ReportLoad(ctx context.Context, cc *grpc.ClientConn, opts LoadReportingOptions) {
+func (t *TransportHelper) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts loadReportingOptions) {
retries := 0
for {
if ctx.Err() != nil {
@@ -472,23 +475,23 @@
}
logger.Infof("lrs: created LRS stream")
- if err := t.vClient.SendFirstLoadStatsRequest(stream, opts.TargetName); err != nil {
+ if err := t.vClient.SendFirstLoadStatsRequest(stream); err != nil {
logger.Warningf("lrs: failed to send first request: %v", err)
continue
}
- interval, err := t.vClient.HandleLoadStatsResponse(stream, opts.ClusterName)
+ clusters, interval, err := t.vClient.HandleLoadStatsResponse(stream)
if err != nil {
logger.Warning(err)
continue
}
retries = 0
- t.sendLoads(ctx, stream, opts.ClusterName, interval)
+ t.sendLoads(ctx, stream, opts.loadStore, clusters, interval)
}
}
-func (t *TransportHelper) sendLoads(ctx context.Context, stream grpc.ClientStream, clusterName string, interval time.Duration) {
+func (t *TransportHelper) sendLoads(ctx context.Context, stream grpc.ClientStream, store *load.Store, clusterNames []string, interval time.Duration) {
tick := time.NewTicker(interval)
defer tick.Stop()
for {
@@ -497,7 +500,7 @@
case <-ctx.Done():
return
}
- if err := t.vClient.SendLoadStatsRequest(stream, clusterName); err != nil {
+ if err := t.vClient.SendLoadStatsRequest(stream, store.Stats(clusterNames)); err != nil {
logger.Warning(err)
return
}
diff --git a/xds/internal/client/v2/client.go b/xds/internal/client/v2/client.go
index 7b063ad..433e690 100644
--- a/xds/internal/client/v2/client.go
+++ b/xds/internal/client/v2/client.go
@@ -28,7 +28,6 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpclog"
xdsclient "google.golang.org/grpc/xds/internal/client"
- "google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/version"
v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
@@ -69,7 +68,6 @@
cc: cc,
parent: opts.Parent,
nodeProto: nodeProto,
- loadStore: opts.LoadStore,
logger: opts.Logger,
}
v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background())
@@ -88,7 +86,6 @@
ctx context.Context
cancelCtx context.CancelFunc
parent xdsclient.UpdateHandler
- loadStore *load.Store
logger *grpclog.PrefixLogger
// ClientConn to the xDS gRPC server. Owned by the parent xdsClient.
diff --git a/xds/internal/client/v2/loadreport.go b/xds/internal/client/v2/loadreport.go
index a06dcb8..69405fc 100644
--- a/xds/internal/client/v2/loadreport.go
+++ b/xds/internal/client/v2/loadreport.go
@@ -26,17 +26,18 @@
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
+ "google.golang.org/grpc/xds/internal/client/load"
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
v2endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
- structpb "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/grpc"
"google.golang.org/grpc/xds/internal"
- xdsclient "google.golang.org/grpc/xds/internal/client"
)
+const clientFeatureLRSSendAllClusters = "envoy.lrs.supports_send_all_clusters"
+
type lrsStream lrsgrpc.LoadReportingService_StreamLoadStatsClient
func (v2c *client) NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error) {
@@ -44,7 +45,7 @@
return c.StreamLoadStats(ctx)
}
-func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream, targetName string) error {
+func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error {
stream, ok := s.(lrsStream)
if !ok {
return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s)
@@ -53,72 +54,52 @@
if node == nil {
node = &v2corepb.Node{}
}
- if node.Metadata == nil {
- node.Metadata = &structpb.Struct{}
- }
- if node.Metadata.Fields == nil {
- node.Metadata.Fields = make(map[string]*structpb.Value)
- }
- node.Metadata.Fields[xdsclient.NodeMetadataHostnameKey] = &structpb.Value{
- Kind: &structpb.Value_StringValue{StringValue: targetName},
- }
+ node.ClientFeatures = append(node.ClientFeatures, clientFeatureLRSSendAllClusters)
req := &lrspb.LoadStatsRequest{Node: node}
v2c.logger.Infof("lrs: sending init LoadStatsRequest: %v", req)
return stream.Send(req)
}
-func (v2c *client) HandleLoadStatsResponse(s grpc.ClientStream, clusterName string) (time.Duration, error) {
+func (v2c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) {
stream, ok := s.(lrsStream)
if !ok {
- return 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s)
+ return nil, 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s)
}
resp, err := stream.Recv()
if err != nil {
- return 0, fmt.Errorf("lrs: failed to receive first response: %v", err)
+ return nil, 0, fmt.Errorf("lrs: failed to receive first response: %v", err)
}
v2c.logger.Infof("lrs: received first LoadStatsResponse: %+v", resp)
interval, err := ptypes.Duration(resp.GetLoadReportingInterval())
if err != nil {
- return 0, fmt.Errorf("lrs: failed to convert report interval: %v", err)
+ return nil, 0, fmt.Errorf("lrs: failed to convert report interval: %v", err)
}
- // The LRS client should join the clusters it knows with the cluster
- // list from response, and send loads for them.
- //
- // But the LRS client now only supports one cluster. TODO: extend it to
- // support multiple clusters.
- var clusterFoundInResponse bool
- for _, c := range resp.Clusters {
- if c == clusterName {
- clusterFoundInResponse = true
- }
- }
- if !clusterFoundInResponse {
- return 0, fmt.Errorf("lrs: received clusters %v does not contain expected {%v}", resp.Clusters, clusterName)
- }
if resp.ReportEndpointGranularity {
// TODO: fixme to support per endpoint loads.
- return 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation")
+ return nil, 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation")
}
- return interval, nil
+ clusters := resp.Clusters
+ if resp.SendAllClusters {
+ // Return nil to send stats for all clusters.
+ clusters = nil
+ }
+
+ return clusters, interval, nil
}
-func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, clusterName string) error {
+func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) error {
stream, ok := s.(lrsStream)
if !ok {
return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s)
}
- if v2c.loadStore == nil {
- return errors.New("lrs: LoadStore is not initialized")
- }
var clusterStats []*v2endpointpb.ClusterStats
- sds := v2c.loadStore.Stats([]string{clusterName})
- for _, sd := range sds {
+ for _, sd := range loads {
var (
droppedReqs []*v2endpointpb.ClusterStats_DroppedRequests
localityStats []*v2endpointpb.UpstreamLocalityStats
diff --git a/xds/internal/client/v3/client.go b/xds/internal/client/v3/client.go
index 5d8d719..00e74e5 100644
--- a/xds/internal/client/v3/client.go
+++ b/xds/internal/client/v3/client.go
@@ -29,7 +29,6 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpclog"
xdsclient "google.golang.org/grpc/xds/internal/client"
- "google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/version"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
@@ -69,7 +68,6 @@
cc: cc,
parent: opts.Parent,
nodeProto: nodeProto,
- loadStore: opts.LoadStore,
logger: opts.Logger,
}
v3c.ctx, v3c.cancelCtx = context.WithCancel(context.Background())
@@ -88,7 +86,6 @@
ctx context.Context
cancelCtx context.CancelFunc
parent xdsclient.UpdateHandler
- loadStore *load.Store
logger *grpclog.PrefixLogger
// ClientConn to the xDS gRPC server. Owned by the parent xdsClient.
diff --git a/xds/internal/client/v3/loadreport.go b/xds/internal/client/v3/loadreport.go
index beca34c..74e1863 100644
--- a/xds/internal/client/v3/loadreport.go
+++ b/xds/internal/client/v3/loadreport.go
@@ -26,17 +26,18 @@
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
+ "google.golang.org/grpc/xds/internal/client/load"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
- structpb "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/grpc"
"google.golang.org/grpc/xds/internal"
- xdsclient "google.golang.org/grpc/xds/internal/client"
)
+const clientFeatureLRSSendAllClusters = "envoy.lrs.supports_send_all_clusters"
+
type lrsStream lrsgrpc.LoadReportingService_StreamLoadStatsClient
func (v3c *client) NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error) {
@@ -44,7 +45,7 @@
return c.StreamLoadStats(ctx)
}
-func (v3c *client) SendFirstLoadStatsRequest(s grpc.ClientStream, targetName string) error {
+func (v3c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error {
stream, ok := s.(lrsStream)
if !ok {
return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s)
@@ -53,72 +54,52 @@
if node == nil {
node = &v3corepb.Node{}
}
- if node.Metadata == nil {
- node.Metadata = &structpb.Struct{}
- }
- if node.Metadata.Fields == nil {
- node.Metadata.Fields = make(map[string]*structpb.Value)
- }
- node.Metadata.Fields[xdsclient.NodeMetadataHostnameKey] = &structpb.Value{
- Kind: &structpb.Value_StringValue{StringValue: targetName},
- }
+ node.ClientFeatures = append(node.ClientFeatures, clientFeatureLRSSendAllClusters)
req := &lrspb.LoadStatsRequest{Node: node}
v3c.logger.Infof("lrs: sending init LoadStatsRequest: %v", req)
return stream.Send(req)
}
-func (v3c *client) HandleLoadStatsResponse(s grpc.ClientStream, clusterName string) (time.Duration, error) {
+func (v3c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) {
stream, ok := s.(lrsStream)
if !ok {
- return 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s)
+ return nil, 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s)
}
resp, err := stream.Recv()
if err != nil {
- return 0, fmt.Errorf("lrs: failed to receive first response: %v", err)
+ return nil, 0, fmt.Errorf("lrs: failed to receive first response: %v", err)
}
v3c.logger.Infof("lrs: received first LoadStatsResponse: %+v", resp)
interval, err := ptypes.Duration(resp.GetLoadReportingInterval())
if err != nil {
- return 0, fmt.Errorf("lrs: failed to convert report interval: %v", err)
+ return nil, 0, fmt.Errorf("lrs: failed to convert report interval: %v", err)
}
- // The LRS client should join the clusters it knows with the cluster
- // list from response, and send loads for them.
- //
- // But the LRS client now only supports one cluster. TODO: extend it to
- // support multiple clusters.
- var clusterFoundInResponse bool
- for _, c := range resp.Clusters {
- if c == clusterName {
- clusterFoundInResponse = true
- }
- }
- if !clusterFoundInResponse {
- return 0, fmt.Errorf("lrs: received clusters %v does not contain expected {%v}", resp.Clusters, clusterName)
- }
if resp.ReportEndpointGranularity {
// TODO: fixme to support per endpoint loads.
- return 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation")
+ return nil, 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation")
}
- return interval, nil
+ clusters := resp.Clusters
+ if resp.SendAllClusters {
+ // Return nil to send stats for all clusters.
+ clusters = nil
+ }
+
+ return clusters, interval, nil
}
-func (v3c *client) SendLoadStatsRequest(s grpc.ClientStream, clusterName string) error {
+func (v3c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) error {
stream, ok := s.(lrsStream)
if !ok {
return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s)
}
- if v3c.loadStore == nil {
- return errors.New("lrs: LoadStore is not initialized")
- }
var clusterStats []*v3endpointpb.ClusterStats
- sds := v3c.loadStore.Stats([]string{clusterName})
- for _, sd := range sds {
+ for _, sd := range loads {
var (
droppedReqs []*v3endpointpb.ClusterStats_DroppedRequests
localityStats []*v3endpointpb.UpstreamLocalityStats
diff --git a/xds/internal/testutils/fakeclient/client.go b/xds/internal/testutils/fakeclient/client.go
index cd2710e..408817c 100644
--- a/xds/internal/testutils/fakeclient/client.go
+++ b/xds/internal/testutils/fakeclient/client.go
@@ -156,14 +156,12 @@
type ReportLoadArgs struct {
// Server is the name of the server to which the load is reported.
Server string
- // Cluster is the name of the cluster for which load is reported.
- Cluster string
}
// ReportLoad starts reporting load about clusterName to server.
-func (xdsC *Client) ReportLoad(server string, clusterName string) (cancel func()) {
- xdsC.loadReportCh.Send(ReportLoadArgs{Server: server, Cluster: clusterName})
- return func() {}
+func (xdsC *Client) ReportLoad(server string) (loadStore *load.Store, cancel func()) {
+ xdsC.loadReportCh.Send(ReportLoadArgs{Server: server})
+ return xdsC.loadStore, func() {}
}
// LoadStore returns the underlying load data store.
diff --git a/xds/internal/testutils/fakeserver/server.go b/xds/internal/testutils/fakeserver/server.go
index 4cff720..994f530 100644
--- a/xds/internal/testutils/fakeserver/server.go
+++ b/xds/internal/testutils/fakeserver/server.go
@@ -203,10 +203,10 @@
func (lrsS *lrsServer) StreamLoadStats(s lrsgrpc.LoadReportingService_StreamLoadStatsServer) error {
req, err := s.Recv()
+ lrsS.reqChan.Send(&Request{req, err})
if err != nil {
return err
}
- lrsS.reqChan.Send(&Request{req, err})
select {
case r := <-lrsS.respChan:
@@ -222,12 +222,12 @@
for {
req, err := s.Recv()
+ lrsS.reqChan.Send(&Request{req, err})
if err != nil {
if err == io.EOF {
return nil
}
return err
}
- lrsS.reqChan.Send(&Request{req, err})
}
}