lrs: handle multiple clusters in LRS stream (#3935)

diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go
index 6b124c2..a148db0 100644
--- a/xds/internal/balancer/edsbalancer/eds.go
+++ b/xds/internal/balancer/edsbalancer/eds.go
@@ -177,7 +177,9 @@
 			return
 		}
 
-		x.client.handleUpdate(cfg, u.ResolverState.Attributes)
+		if err := x.client.handleUpdate(cfg, u.ResolverState.Attributes); err != nil {
+			x.logger.Warningf("failed to update xds clients: %v", err)
+		}
 
 		if x.config == nil {
 			x.config = cfg
diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go
index a56acb7..292ea4f 100644
--- a/xds/internal/balancer/edsbalancer/eds_impl_test.go
+++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go
@@ -688,9 +688,10 @@
 	// be used.
 	loadStore := load.NewStore()
 	lsWrapper := &loadStoreWrapper{}
-	lsWrapper.update(loadStore, testClusterNames[0])
+	lsWrapper.updateServiceName(testClusterNames[0])
+	lsWrapper.updateLoadStore(loadStore)
 	cw := &xdsClientWrapper{
-		load: lsWrapper,
+		loadWrapper: lsWrapper,
 	}
 
 	cc := testutils.NewTestClientConn(t)
diff --git a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
index f22c162..fe4e996 100644
--- a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
+++ b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
@@ -19,6 +19,7 @@
 package edsbalancer
 
 import (
+	"fmt"
 	"sync"
 
 	"google.golang.org/grpc"
@@ -35,8 +36,7 @@
 // balancer. It's defined so we can override xdsclientNew function in tests.
 type xdsClientInterface interface {
 	WatchEndpoints(clusterName string, edsCb func(xdsclient.EndpointsUpdate, error)) (cancel func())
-	LoadStore() *load.Store
-	ReportLoad(server string, clusterName string) (cancel func())
+	ReportLoad(server string) (loadStore *load.Store, cancel func())
 	Close()
 }
 
@@ -48,20 +48,34 @@
 )
 
 type loadStoreWrapper struct {
-	mu         sync.RWMutex
+	mu      sync.RWMutex
+	service string
+	// Both store and perCluster will be nil if load reporting is disabled (EDS
+	// response doesn't have LRS server name). Note that methods on Store and
+	// perCluster all handle nil, so there's no need to check nil before calling
+	// them.
 	store      *load.Store
-	service    string
 	perCluster load.PerClusterReporter
 }
 
-func (lsw *loadStoreWrapper) update(store *load.Store, service string) {
+func (lsw *loadStoreWrapper) updateServiceName(service string) {
 	lsw.mu.Lock()
 	defer lsw.mu.Unlock()
-	if store == lsw.store && service == lsw.service {
+	if lsw.service == service {
+		return
+	}
+	lsw.service = service
+	lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
+}
+
+func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
+	lsw.mu.Lock()
+	defer lsw.mu.Unlock()
+	if store == lsw.store {
 		return
 	}
 	lsw.store = store
-	lsw.service = service
+	lsw.perCluster = nil
 	lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
 }
 
@@ -102,7 +116,9 @@
 	// xdsClient could come from attributes, or created with balancerName.
 	xdsClient xdsClientInterface
 
-	load *loadStoreWrapper
+	// loadWrapper is a wrapper with loadOriginal, with clusterName and
+	// edsServiceName. It's used children to report loads.
+	loadWrapper *loadStoreWrapper
 	// edsServiceName is the edsServiceName currently being watched, not
 	// necessary the edsServiceName from service config.
 	//
@@ -127,7 +143,7 @@
 		logger:       logger,
 		newEDSUpdate: newEDSUpdate,
 		bbo:          bbo,
-		load:         &loadStoreWrapper{},
+		loadWrapper:  &loadStoreWrapper{},
 	}
 }
 
@@ -168,12 +184,12 @@
 // the balancerName (from bootstrap file or from service config) changed.
 // - if balancer names are the same, do nothing, and return false
 // - if balancer names are different, create new one, and return true
-func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.Attributes) bool {
+func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.Attributes) (bool, error) {
 	if attr != nil {
 		if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil {
 			// This will also clear balancerName, to indicate that client is
 			// from attributes.
-			return c.replaceXDSClient(clientFromAttr, "")
+			return c.replaceXDSClient(clientFromAttr, ""), nil
 		}
 	}
 
@@ -184,7 +200,7 @@
 	}
 
 	if c.balancerName == clientConfig.BalancerName {
-		return false
+		return false, nil
 	}
 
 	var dopts []grpc.DialOption
@@ -192,16 +208,20 @@
 		dopts = []grpc.DialOption{grpc.WithContextDialer(dialer)}
 	}
 
+	// TODO: there's no longer a need to read bootstrap file and create a new
+	// xds client. The EDS balancer should always get the xds client from
+	// attributes. Otherwise, this function should just fail. Also, xdsclient
+	// will be shared by multiple clients, so trying to make an xds client is
+	// just the wrong move.
 	newClient, err := xdsclientNew(xdsclient.Options{Config: *clientConfig, DialOpts: dopts})
 	if err != nil {
 		// This should never fail. xdsclientnew does a non-blocking dial, and
 		// all the config passed in should be validated.
 		//
 		// This could leave c.xdsClient as nil if this is the first update.
-		c.logger.Warningf("eds: failed to create xdsClient, error: %v", err)
-		return false
+		return false, fmt.Errorf("eds: failed to create xdsClient, error: %v", err)
 	}
-	return c.replaceXDSClient(newClient, clientConfig.BalancerName)
+	return c.replaceXDSClient(newClient, clientConfig.BalancerName), nil
 }
 
 // startEndpointsWatch starts the EDS watch. Caller can call this when the
@@ -214,10 +234,6 @@
 // This usually means load report needs to be restarted, but this function does
 // NOT do that. Caller needs to call startLoadReport separately.
 func (c *xdsClientWrapper) startEndpointsWatch() {
-	if c.xdsClient == nil {
-		return
-	}
-
 	if c.cancelEndpointsWatch != nil {
 		c.cancelEndpointsWatch()
 	}
@@ -238,31 +254,32 @@
 // Caller can cal this when the loadReportServer name changes, but
 // edsServiceName doesn't (so we only need to restart load reporting, not EDS
 // watch).
-func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) {
-	if c.xdsClient == nil {
-		c.logger.Warningf("xds: xdsClient is nil when trying to start load reporting. This means xdsClient wasn't passed in from the resolver, and xdsClient.New failed")
-		return
-	}
+func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) *load.Store {
 	if c.cancelLoadReport != nil {
 		c.cancelLoadReport()
 	}
 	c.loadReportServer = loadReportServer
+	var loadStore *load.Store
 	if c.loadReportServer != nil {
-		c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer, c.edsServiceName)
+		loadStore, c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer)
 	}
+	return loadStore
 }
 
 func (c *xdsClientWrapper) loadStore() load.PerClusterReporter {
-	if c == nil || c.load.store == nil {
+	if c == nil {
 		return nil
 	}
-	return c.load
+	return c.loadWrapper
 }
 
 // handleUpdate applies the service config and attributes updates to the client,
 // including updating the xds_client to use, and updating the EDS name to watch.
-func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) {
-	clientChanged := c.updateXDSClient(config, attr)
+func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) error {
+	clientChanged, err := c.updateXDSClient(config, attr)
+	if err != nil {
+		return err
+	}
 
 	// Need to restart EDS watch when one of the following happens:
 	// - the xds_client is updated
@@ -277,14 +294,17 @@
 		//
 		// This is OK for now, because we don't actually expect edsServiceName
 		// to change. Fix this (a bigger change) will happen later.
-		c.load.update(c.xdsClient.LoadStore(), c.edsServiceName)
+		c.loadWrapper.updateServiceName(c.edsServiceName)
 	}
 
 	// Only need to restart load reporting when:
 	// - the loadReportServer name changed
 	if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
-		c.startLoadReport(config.LrsLoadReportingServerName)
+		loadStore := c.startLoadReport(config.LrsLoadReportingServerName)
+		c.loadWrapper.updateLoadStore(loadStore)
 	}
+
+	return nil
 }
 
 func (c *xdsClientWrapper) cancelWatch() {
diff --git a/xds/internal/balancer/edsbalancer/xds_lrs_test.go b/xds/internal/balancer/edsbalancer/xds_lrs_test.go
index 8d888ec..955f544 100644
--- a/xds/internal/balancer/edsbalancer/xds_lrs_test.go
+++ b/xds/internal/balancer/edsbalancer/xds_lrs_test.go
@@ -66,7 +66,7 @@
 	if err != nil {
 		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
 	}
-	if got.Server != "" || got.Cluster != testEDSClusterName {
-		t.Fatalf("xdsClient.ReportLoad called with {%v, %v}: want {\"\", %v}", got.Server, got.Cluster, testEDSClusterName)
+	if got.Server != "" {
+		t.Fatalf("xdsClient.ReportLoad called with {%v}: want {\"\"}", got.Server)
 	}
 }
diff --git a/xds/internal/balancer/lrs/balancer.go b/xds/internal/balancer/lrs/balancer.go
index 1361fb1..f8e7673 100644
--- a/xds/internal/balancer/lrs/balancer.go
+++ b/xds/internal/balancer/lrs/balancer.go
@@ -80,7 +80,9 @@
 	// Update load reporting config or xds client. This needs to be done before
 	// updating the child policy because we need the loadStore from the updated
 	// client to be passed to the ccWrapper.
-	b.client.update(newConfig, s.ResolverState.Attributes)
+	if err := b.client.update(newConfig, s.ResolverState.Attributes); err != nil {
+		return err
+	}
 
 	// If child policy is a different type, recreate the sub-balancer.
 	if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
@@ -144,28 +146,41 @@
 // xdsClientInterface contains only the xds_client methods needed by LRS
 // balancer. It's defined so we can override xdsclient in tests.
 type xdsClientInterface interface {
-	LoadStore() *load.Store
-	ReportLoad(server string, clusterName string) func()
+	ReportLoad(server string) (*load.Store, func())
 	Close()
 }
 
 type loadStoreWrapper struct {
 	mu         sync.RWMutex
-	store      *load.Store
 	cluster    string
 	edsService string
+	// Both store and perCluster will be nil if load reporting is disabled (EDS
+	// response doesn't have LRS server name). Note that methods on Store and
+	// perCluster all handle nil, so there's no need to check nil before calling
+	// them.
+	store      *load.Store
 	perCluster load.PerClusterReporter
 }
 
-func (lsw *loadStoreWrapper) update(store *load.Store, cluster, edsService string) {
+func (lsw *loadStoreWrapper) updateClusterAndService(cluster, edsService string) {
 	lsw.mu.Lock()
 	defer lsw.mu.Unlock()
-	if store == lsw.store && cluster == lsw.cluster && edsService == lsw.edsService {
+	if cluster == lsw.cluster && edsService == lsw.edsService {
+		return
+	}
+	lsw.cluster = cluster
+	lsw.edsService = edsService
+	lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
+}
+
+func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
+	lsw.mu.Lock()
+	defer lsw.mu.Unlock()
+	if store == lsw.store {
 		return
 	}
 	lsw.store = store
-	lsw.cluster = cluster
-	lsw.edsService = edsService
+	lsw.perCluster = nil
 	lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
 }
 
@@ -199,44 +214,62 @@
 	clusterName      string
 	edsServiceName   string
 	lrsServerName    string
-	load             *loadStoreWrapper
+	// loadWrapper is a wrapper with loadOriginal, with clusterName and
+	// edsServiceName. It's used children to report loads.
+	loadWrapper *loadStoreWrapper
 }
 
 func newXDSClientWrapper() *xdsClientWrapper {
 	return &xdsClientWrapper{
-		load: &loadStoreWrapper{},
+		loadWrapper: &loadStoreWrapper{},
 	}
 }
 
 // update checks the config and xdsclient, and decides whether it needs to
 // restart the load reporting stream.
-func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) {
+func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) error {
 	var (
-		restartLoadReport bool
-		updateLoadStore   bool
+		restartLoadReport           bool
+		updateLoadClusterAndService bool
 	)
-	if attr != nil {
-		if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil {
-			if w.c != clientFromAttr {
-				// xds client is different, restart.
-				restartLoadReport = true
-				updateLoadStore = true
-				w.c = clientFromAttr
-			}
-		}
+
+	if attr == nil {
+		return fmt.Errorf("lrs: failed to get xdsClient from attributes: attributes is nil")
+	}
+	clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface)
+	if clientFromAttr == nil {
+		return fmt.Errorf("lrs: failed to get xdsClient from attributes: xdsClient not found in attributes")
+	}
+
+	if w.c != clientFromAttr {
+		// xds client is different, restart.
+		restartLoadReport = true
+		w.c = clientFromAttr
 	}
 
 	// ClusterName is different, restart. ClusterName is from ClusterName and
 	// EdsServiceName.
 	if w.clusterName != newConfig.ClusterName {
-		updateLoadStore = true
+		updateLoadClusterAndService = true
 		w.clusterName = newConfig.ClusterName
 	}
 	if w.edsServiceName != newConfig.EdsServiceName {
-		updateLoadStore = true
+		updateLoadClusterAndService = true
 		w.edsServiceName = newConfig.EdsServiceName
 	}
 
+	if updateLoadClusterAndService {
+		// This updates the clusterName and serviceName that will reported for the
+		// loads. The update here is too early, the perfect timing is when the
+		// picker is updated with the new connection. But from this balancer's point
+		// of view, it's impossible to tell.
+		//
+		// On the other hand, this will almost never happen. Each LRS policy
+		// shouldn't get updated config. The parent should do a graceful switch when
+		// the clusterName or serviceName is changed.
+		w.loadWrapper.updateClusterAndService(w.clusterName, w.edsServiceName)
+	}
+
 	if w.lrsServerName != newConfig.LrsLoadReportingServerName {
 		// LrsLoadReportingServerName is different, load should be report to a
 		// different server, restart.
@@ -244,34 +277,23 @@
 		w.lrsServerName = newConfig.LrsLoadReportingServerName
 	}
 
-	// This updates the clusterName and serviceName that will reported for the
-	// loads. The update here is too early, the perfect timing is when the
-	// picker is updated with the new connection. But from this balancer's point
-	// of view, it's impossible to tell.
-	//
-	// On the other hand, this will almost never happen. Each LRS policy
-	// shouldn't get updated config. The parent should do a graceful switch when
-	// the clusterName or serviceName is changed.
-	if updateLoadStore {
-		w.load.update(w.c.LoadStore(), w.clusterName, w.edsServiceName)
-	}
-
 	if restartLoadReport {
 		if w.cancelLoadReport != nil {
 			w.cancelLoadReport()
 			w.cancelLoadReport = nil
 		}
+		var loadStore *load.Store
 		if w.c != nil {
-			w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName, w.clusterName)
+			loadStore, w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName)
 		}
+		w.loadWrapper.updateLoadStore(loadStore)
 	}
+
+	return nil
 }
 
 func (w *xdsClientWrapper) loadStore() load.PerClusterReporter {
-	if w.load.store == nil {
-		return nil
-	}
-	return w.load
+	return w.loadWrapper
 }
 
 func (w *xdsClientWrapper) close() {
diff --git a/xds/internal/balancer/lrs/balancer_test.go b/xds/internal/balancer/lrs/balancer_test.go
index 789cfea..38dd573 100644
--- a/xds/internal/balancer/lrs/balancer_test.go
+++ b/xds/internal/balancer/lrs/balancer_test.go
@@ -84,8 +84,8 @@
 	if err != nil {
 		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
 	}
-	if got.Server != testLRSServerName || got.Cluster != testClusterName {
-		t.Fatalf("xdsClient.ReportLoad called with {%q, %q}: want {%q, %q}", got.Server, got.Cluster, testLRSServerName, testClusterName)
+	if got.Server != testLRSServerName {
+		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
 	}
 
 	sc1 := <-cc.NewSubConnCh
diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go
index 0d61963..3478569 100644
--- a/xds/internal/client/client.go
+++ b/xds/internal/client/client.go
@@ -30,6 +30,7 @@
 	v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
 	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	"github.com/golang/protobuf/proto"
+	"google.golang.org/grpc/xds/internal/client/load"
 
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/internal/backoff"
@@ -39,7 +40,6 @@
 	"google.golang.org/grpc/keepalive"
 	"google.golang.org/grpc/xds/internal"
 	"google.golang.org/grpc/xds/internal/client/bootstrap"
-	"google.golang.org/grpc/xds/internal/client/load"
 	"google.golang.org/grpc/xds/internal/version"
 )
 
@@ -78,9 +78,6 @@
 	// Backoff returns the amount of time to backoff before retrying broken
 	// streams.
 	Backoff func(int) time.Duration
-	// LoadStore contains load reports which need to be pushed to the management
-	// server.
-	LoadStore *load.Store
 	// Logger provides enhanced logging capabilities.
 	Logger *grpclog.PrefixLogger
 }
@@ -99,6 +96,12 @@
 
 // APIClient represents the functionality provided by transport protocol
 // version specific implementations of the xDS client.
+//
+// TODO: unexport this interface and all the methods after the PR to make
+// xdsClient sharable by clients. AddWatch and RemoveWatch are exported for
+// v2/v3 to override because they need to keep track of LDS name for RDS to use.
+// After the share xdsClient change, that's no longer necessary. After that, we
+// will still keep this interface for testing purposes.
 type APIClient interface {
 	// AddWatch adds a watch for an xDS resource given its type and name.
 	AddWatch(ResourceType, string)
@@ -107,21 +110,18 @@
 	// given its type and name.
 	RemoveWatch(ResourceType, string)
 
-	// ReportLoad starts an LRS stream to periodically report load using the
+	// reportLoad starts an LRS stream to periodically report load using the
 	// provided ClientConn, which represent a connection to the management
 	// server.
-	ReportLoad(ctx context.Context, cc *grpc.ClientConn, opts LoadReportingOptions)
+	reportLoad(ctx context.Context, cc *grpc.ClientConn, opts loadReportingOptions)
 
 	// Close cleans up resources allocated by the API client.
 	Close()
 }
 
-// LoadReportingOptions contains configuration knobs for reporting load data.
-type LoadReportingOptions struct {
-	// ClusterName is the cluster name for which load is being reported.
-	ClusterName string
-	// TargetName is the target of the parent ClientConn.
-	TargetName string
+// loadReportingOptions contains configuration knobs for reporting load data.
+type loadReportingOptions struct {
+	loadStore *load.Store
 }
 
 // UpdateHandler receives and processes (by taking appropriate actions) xDS
@@ -328,7 +328,6 @@
 	opts      Options
 	cc        *grpc.ClientConn // Connection to the xDS server
 	apiClient APIClient
-	loadStore *load.Store
 
 	logger *grpclog.PrefixLogger
 
@@ -342,6 +341,11 @@
 	cdsCache    map[string]ClusterUpdate
 	edsWatchers map[string]map[*watchInfo]bool
 	edsCache    map[string]EndpointsUpdate
+
+	// Changes to map lrsClients and the lrsClient inside the map need to be
+	// protected by lrsMu.
+	lrsMu      sync.Mutex
+	lrsClients map[string]*lrsClient
 }
 
 // New returns a new xdsClient configured with opts.
@@ -382,9 +386,8 @@
 	}
 
 	c := &Client{
-		done:      grpcsync.NewEvent(),
-		opts:      opts,
-		loadStore: load.NewStore(),
+		done: grpcsync.NewEvent(),
+		opts: opts,
 
 		updateCh:    buffer.NewUnbounded(),
 		ldsWatchers: make(map[string]map[*watchInfo]bool),
@@ -395,6 +398,7 @@
 		cdsCache:    make(map[string]ClusterUpdate),
 		edsWatchers: make(map[string]map[*watchInfo]bool),
 		edsCache:    make(map[string]EndpointsUpdate),
+		lrsClients:  make(map[string]*lrsClient),
 	}
 
 	cc, err := grpc.Dial(opts.Config.BalancerName, dopts...)
@@ -410,7 +414,6 @@
 		Parent:    c,
 		NodeProto: opts.Config.NodeProto,
 		Backoff:   backoff.DefaultExponential.Backoff,
-		LoadStore: c.loadStore,
 		Logger:    c.logger,
 	})
 	if err != nil {
diff --git a/xds/internal/client/client_loadreport.go b/xds/internal/client/client_loadreport.go
index e52c3b9..e91316b 100644
--- a/xds/internal/client/client_loadreport.go
+++ b/xds/internal/client/client_loadreport.go
@@ -24,53 +24,116 @@
 	"google.golang.org/grpc/xds/internal/client/load"
 )
 
-// NodeMetadataHostnameKey is the metadata key for specifying the target name in
-// the node proto of an LRS request.
-const NodeMetadataHostnameKey = "PROXYLESS_CLIENT_HOSTNAME"
-
-// LoadStore returns the underlying load data store used by the xDS client.
-func (c *Client) LoadStore() *load.Store {
-	return c.loadStore
-}
-
-// ReportLoad sends the load of the given clusterName to the given server. If
-// the server is not an empty string, and is different from the xds server, a
-// new ClientConn will be created.
+// ReportLoad starts an load reporting stream to the given server. If the server
+// is not an empty string, and is different from the xds server, a new
+// ClientConn will be created.
 //
 // The same options used for creating the Client will be used (including
 // NodeProto, and dial options if necessary).
 //
-// It returns a function to cancel the load reporting stream. If server is
-// different from xds server, the ClientConn will also be closed.
-func (c *Client) ReportLoad(server string, clusterName string) func() {
-	var (
-		cc      *grpc.ClientConn
-		closeCC bool
-	)
-	c.logger.Infof("Starting load report to server: %s", server)
-	if server == "" || server == c.opts.Config.BalancerName {
-		cc = c.cc
+// It returns a Store for the user to report loads, a function to cancel the
+// load reporting stream.
+func (c *Client) ReportLoad(server string) (*load.Store, func()) {
+	c.lrsMu.Lock()
+	defer c.lrsMu.Unlock()
+
+	// If there's already a client to this server, use it. Otherwise, create
+	// one.
+	lrsC, ok := c.lrsClients[server]
+	if !ok {
+		lrsC = newLRSClient(c, server)
+		c.lrsClients[server] = lrsC
+	}
+
+	store := lrsC.ref()
+	return store, func() {
+		// This is a callback, need to hold lrsMu.
+		c.lrsMu.Lock()
+		defer c.lrsMu.Unlock()
+		if lrsC.unRef() {
+			// Delete the lrsClient from map if this is the last reference.
+			delete(c.lrsClients, server)
+		}
+	}
+}
+
+// lrsClient maps to one lrsServer. It contains:
+// - a ClientConn to this server (only if it's different from the xds server)
+// - a load.Store that contains loads only for this server
+type lrsClient struct {
+	parent *Client
+	server string
+
+	cc           *grpc.ClientConn // nil if the server is same as the xds server
+	refCount     int
+	cancelStream func()
+	loadStore    *load.Store
+}
+
+// newLRSClient creates a new LRS stream to the server.
+func newLRSClient(parent *Client, server string) *lrsClient {
+	return &lrsClient{
+		parent:   parent,
+		server:   server,
+		refCount: 0,
+	}
+}
+
+// ref increments the refCount. If this is the first ref, it starts the LRS stream.
+//
+// Not thread-safe, caller needs to synchronize.
+func (lrsC *lrsClient) ref() *load.Store {
+	lrsC.refCount++
+	if lrsC.refCount == 1 {
+		lrsC.startStream()
+	}
+	return lrsC.loadStore
+}
+
+// unRef decrements the refCount, and closes the stream if refCount reaches 0
+// (and close the cc if cc is not xDS cc). It returns whether refCount reached 0
+// after this call.
+//
+// Not thread-safe, caller needs to synchronize.
+func (lrsC *lrsClient) unRef() (closed bool) {
+	lrsC.refCount--
+	if lrsC.refCount != 0 {
+		return false
+	}
+	lrsC.parent.logger.Infof("Stopping load report to server: %s", lrsC.server)
+	lrsC.cancelStream()
+	if lrsC.cc != nil {
+		lrsC.cc.Close()
+	}
+	return true
+}
+
+// startStream starts the LRS stream to the server. If server is not the same
+// xDS server from the parent, it also creates a ClientConn.
+func (lrsC *lrsClient) startStream() {
+	var cc *grpc.ClientConn
+
+	lrsC.parent.logger.Infof("Starting load report to server: %s", lrsC.server)
+	if lrsC.server == "" || lrsC.server == lrsC.parent.opts.Config.BalancerName {
+		// Reuse the xDS client if server is the same.
+		cc = lrsC.parent.cc
 	} else {
-		c.logger.Infof("LRS server is different from xDS server, starting a new ClientConn")
-		dopts := append([]grpc.DialOption{c.opts.Config.Creds}, c.opts.DialOpts...)
-		ccNew, err := grpc.Dial(server, dopts...)
+		lrsC.parent.logger.Infof("LRS server is different from xDS server, starting a new ClientConn")
+		dopts := append([]grpc.DialOption{lrsC.parent.opts.Config.Creds}, lrsC.parent.opts.DialOpts...)
+		ccNew, err := grpc.Dial(lrsC.server, dopts...)
 		if err != nil {
 			// An error from a non-blocking dial indicates something serious.
-			c.logger.Infof("xds: failed to dial load report server {%s}: %v", server, err)
-			return func() {}
+			lrsC.parent.logger.Infof("xds: failed to dial load report server {%s}: %v", lrsC.server, err)
+			return
 		}
 		cc = ccNew
-		closeCC = true
+		lrsC.cc = ccNew
 	}
-	ctx, cancel := context.WithCancel(context.Background())
-	go c.apiClient.ReportLoad(ctx, c.cc, LoadReportingOptions{
-		ClusterName: clusterName,
-		TargetName:  c.opts.TargetName,
-	})
-	return func() {
-		cancel()
-		if closeCC {
-			cc.Close()
-		}
-	}
+
+	var ctx context.Context
+	ctx, lrsC.cancelStream = context.WithCancel(context.Background())
+
+	// Create the store and stream.
+	lrsC.loadStore = load.NewStore()
+	go lrsC.parent.apiClient.reportLoad(ctx, cc, loadReportingOptions{loadStore: lrsC.loadStore})
 }
diff --git a/xds/internal/client/client_loadreport_test.go b/xds/internal/client/client_loadreport_test.go
new file mode 100644
index 0000000..d426247
--- /dev/null
+++ b/xds/internal/client/client_loadreport_test.go
@@ -0,0 +1,148 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package client_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
+	endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
+	lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
+	durationpb "github.com/golang/protobuf/ptypes/duration"
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/internal/grpctest"
+	"google.golang.org/grpc/status"
+	"google.golang.org/grpc/xds/internal/client"
+	"google.golang.org/grpc/xds/internal/client/bootstrap"
+	"google.golang.org/grpc/xds/internal/testutils/fakeserver"
+	"google.golang.org/grpc/xds/internal/version"
+	"google.golang.org/protobuf/testing/protocmp"
+
+	_ "google.golang.org/grpc/xds/internal/client/v2" // Register the v2 xDS API client.
+)
+
+const (
+	defaultTestTimeout      = 5 * time.Second
+	defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
+)
+
+type s struct {
+	grpctest.Tester
+}
+
+func Test(t *testing.T) {
+	grpctest.RunSubTests(t, s{})
+}
+
+func (s) TestLRSClient(t *testing.T) {
+	fs, sCleanup, err := fakeserver.StartServer()
+	if err != nil {
+		t.Fatalf("failed to start fake xDS server: %v", err)
+	}
+	defer sCleanup()
+
+	xdsC, err := client.New(client.Options{
+		Config: bootstrap.Config{
+			BalancerName: fs.Address,
+			Creds:        grpc.WithInsecure(),
+			NodeProto:    &v2corepb.Node{},
+			TransportAPI: version.TransportV2,
+		},
+	})
+	if err != nil {
+		t.Fatalf("failed to create xds client: %v", err)
+	}
+	defer xdsC.Close()
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	if u, err := fs.NewConnChan.Receive(ctx); err != nil {
+		t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
+	}
+
+	// Report to the same address should not create new ClientConn.
+	store1, lrsCancel1 := xdsC.ReportLoad(fs.Address)
+	defer lrsCancel1()
+	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := fs.NewConnChan.Receive(sCtx); err != context.DeadlineExceeded {
+		t.Errorf("unexpected NewConn: %v, %v, want channel recv timeout", u, err)
+	}
+
+	fs2, sCleanup2, err := fakeserver.StartServer()
+	if err != nil {
+		t.Fatalf("failed to start fake xDS server: %v", err)
+	}
+	defer sCleanup2()
+
+	// Report to a different address should create new ClientConn.
+	store2, lrsCancel2 := xdsC.ReportLoad(fs2.Address)
+	defer lrsCancel2()
+	if u, err := fs2.NewConnChan.Receive(ctx); err != nil {
+		t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
+	}
+
+	if store1 == store2 {
+		t.Fatalf("got same store for different servers, want different")
+	}
+
+	if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil {
+		t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
+	}
+	store2.PerCluster("cluster", "eds").CallDropped("test")
+
+	// Send one resp to the client.
+	fs2.LRSResponseChan <- &fakeserver.Response{
+		Resp: &lrspb.LoadStatsResponse{
+			SendAllClusters:       true,
+			LoadReportingInterval: &durationpb.Duration{Nanos: 50000000},
+		},
+	}
+
+	// Server should receive a req with the loads.
+	u, err := fs2.LRSRequestChan.Receive(ctx)
+	if err != nil {
+		t.Fatalf("unexpected LRS request: %v, %v, want error canceled", u, err)
+	}
+	receivedLoad := u.(*fakeserver.Request).Req.(*lrspb.LoadStatsRequest).ClusterStats
+	if len(receivedLoad) <= 0 {
+		t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test")
+	}
+	receivedLoad[0].LoadReportInterval = nil
+	want := (&endpointpb.ClusterStats{
+		ClusterName:          "cluster",
+		ClusterServiceName:   "eds",
+		TotalDroppedRequests: 1,
+		DroppedRequests:      []*endpointpb.ClusterStats_DroppedRequests{{Category: "test", DroppedCount: 1}},
+	})
+	if d := cmp.Diff(want, receivedLoad[0], protocmp.Transform()); d != "" {
+		t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test, diff (-want +got):\n%s", d)
+	}
+
+	// Cancel this load reporting stream, server should see error canceled.
+	lrsCancel2()
+
+	// Server should receive a stream canceled error.
+	if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil || status.Code(u.(*fakeserver.Request).Err) != codes.Canceled {
+		t.Errorf("unexpected LRS request: %v, %v, want error canceled", u, err)
+	}
+}
diff --git a/xds/internal/client/client_test.go b/xds/internal/client/client_test.go
index 4a53138..a2a92a2 100644
--- a/xds/internal/client/client_test.go
+++ b/xds/internal/client/client_test.go
@@ -115,7 +115,7 @@
 	c.removeWatches[resourceType].Send(resourceName)
 }
 
-func (c *testAPIClient) ReportLoad(ctx context.Context, cc *grpc.ClientConn, opts LoadReportingOptions) {
+func (c *testAPIClient) reportLoad(context.Context, *grpc.ClientConn, loadReportingOptions) {
 }
 
 func (c *testAPIClient) Close() {}
diff --git a/xds/internal/client/transport_helper.go b/xds/internal/client/transport_helper.go
index 607f26f..b286a61 100644
--- a/xds/internal/client/transport_helper.go
+++ b/xds/internal/client/transport_helper.go
@@ -24,6 +24,7 @@
 	"time"
 
 	"github.com/golang/protobuf/proto"
+	"google.golang.org/grpc/xds/internal/client/load"
 
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/internal/buffer"
@@ -71,19 +72,21 @@
 	NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error)
 
 	// SendFirstLoadStatsRequest constructs and sends the first request on the
-	// LRS stream. This contains the node proto with appropriate metadata
-	// fields.
-	SendFirstLoadStatsRequest(s grpc.ClientStream, targetName string) error
+	// LRS stream.
+	SendFirstLoadStatsRequest(s grpc.ClientStream) error
 
 	// HandleLoadStatsResponse receives the first response from the server which
 	// contains the load reporting interval and the clusters for which the
 	// server asks the client to report load for.
-	HandleLoadStatsResponse(s grpc.ClientStream, clusterName string) (time.Duration, error)
+	//
+	// If the response sets SendAllClusters to true, the returned clusters is
+	// nil.
+	HandleLoadStatsResponse(s grpc.ClientStream) (clusters []string, _ time.Duration, _ error)
 
 	// SendLoadStatsRequest will be invoked at regular intervals to send load
 	// report with load data reported since the last time this method was
 	// invoked.
-	SendLoadStatsRequest(s grpc.ClientStream, clusterName string) error
+	SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) error
 }
 
 // TransportHelper contains all xDS transport protocol related functionality
@@ -443,9 +446,9 @@
 	return target, rType, version, nonce, send
 }
 
-// ReportLoad starts an LRS stream to report load data to the management server.
+// reportLoad starts an LRS stream to report load data to the management server.
 // It blocks until the context is cancelled.
-func (t *TransportHelper) ReportLoad(ctx context.Context, cc *grpc.ClientConn, opts LoadReportingOptions) {
+func (t *TransportHelper) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts loadReportingOptions) {
 	retries := 0
 	for {
 		if ctx.Err() != nil {
@@ -472,23 +475,23 @@
 		}
 		logger.Infof("lrs: created LRS stream")
 
-		if err := t.vClient.SendFirstLoadStatsRequest(stream, opts.TargetName); err != nil {
+		if err := t.vClient.SendFirstLoadStatsRequest(stream); err != nil {
 			logger.Warningf("lrs: failed to send first request: %v", err)
 			continue
 		}
 
-		interval, err := t.vClient.HandleLoadStatsResponse(stream, opts.ClusterName)
+		clusters, interval, err := t.vClient.HandleLoadStatsResponse(stream)
 		if err != nil {
 			logger.Warning(err)
 			continue
 		}
 
 		retries = 0
-		t.sendLoads(ctx, stream, opts.ClusterName, interval)
+		t.sendLoads(ctx, stream, opts.loadStore, clusters, interval)
 	}
 }
 
-func (t *TransportHelper) sendLoads(ctx context.Context, stream grpc.ClientStream, clusterName string, interval time.Duration) {
+func (t *TransportHelper) sendLoads(ctx context.Context, stream grpc.ClientStream, store *load.Store, clusterNames []string, interval time.Duration) {
 	tick := time.NewTicker(interval)
 	defer tick.Stop()
 	for {
@@ -497,7 +500,7 @@
 		case <-ctx.Done():
 			return
 		}
-		if err := t.vClient.SendLoadStatsRequest(stream, clusterName); err != nil {
+		if err := t.vClient.SendLoadStatsRequest(stream, store.Stats(clusterNames)); err != nil {
 			logger.Warning(err)
 			return
 		}
diff --git a/xds/internal/client/v2/client.go b/xds/internal/client/v2/client.go
index 7b063ad..433e690 100644
--- a/xds/internal/client/v2/client.go
+++ b/xds/internal/client/v2/client.go
@@ -28,7 +28,6 @@
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/internal/grpclog"
 	xdsclient "google.golang.org/grpc/xds/internal/client"
-	"google.golang.org/grpc/xds/internal/client/load"
 	"google.golang.org/grpc/xds/internal/version"
 
 	v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
@@ -69,7 +68,6 @@
 		cc:        cc,
 		parent:    opts.Parent,
 		nodeProto: nodeProto,
-		loadStore: opts.LoadStore,
 		logger:    opts.Logger,
 	}
 	v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background())
@@ -88,7 +86,6 @@
 	ctx       context.Context
 	cancelCtx context.CancelFunc
 	parent    xdsclient.UpdateHandler
-	loadStore *load.Store
 	logger    *grpclog.PrefixLogger
 
 	// ClientConn to the xDS gRPC server. Owned by the parent xdsClient.
diff --git a/xds/internal/client/v2/loadreport.go b/xds/internal/client/v2/loadreport.go
index a06dcb8..69405fc 100644
--- a/xds/internal/client/v2/loadreport.go
+++ b/xds/internal/client/v2/loadreport.go
@@ -26,17 +26,18 @@
 
 	"github.com/golang/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
+	"google.golang.org/grpc/xds/internal/client/load"
 
 	v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
 	v2endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
 	lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
 	lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
-	structpb "github.com/golang/protobuf/ptypes/struct"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/xds/internal"
-	xdsclient "google.golang.org/grpc/xds/internal/client"
 )
 
+const clientFeatureLRSSendAllClusters = "envoy.lrs.supports_send_all_clusters"
+
 type lrsStream lrsgrpc.LoadReportingService_StreamLoadStatsClient
 
 func (v2c *client) NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error) {
@@ -44,7 +45,7 @@
 	return c.StreamLoadStats(ctx)
 }
 
-func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream, targetName string) error {
+func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error {
 	stream, ok := s.(lrsStream)
 	if !ok {
 		return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s)
@@ -53,72 +54,52 @@
 	if node == nil {
 		node = &v2corepb.Node{}
 	}
-	if node.Metadata == nil {
-		node.Metadata = &structpb.Struct{}
-	}
-	if node.Metadata.Fields == nil {
-		node.Metadata.Fields = make(map[string]*structpb.Value)
-	}
-	node.Metadata.Fields[xdsclient.NodeMetadataHostnameKey] = &structpb.Value{
-		Kind: &structpb.Value_StringValue{StringValue: targetName},
-	}
+	node.ClientFeatures = append(node.ClientFeatures, clientFeatureLRSSendAllClusters)
 
 	req := &lrspb.LoadStatsRequest{Node: node}
 	v2c.logger.Infof("lrs: sending init LoadStatsRequest: %v", req)
 	return stream.Send(req)
 }
 
-func (v2c *client) HandleLoadStatsResponse(s grpc.ClientStream, clusterName string) (time.Duration, error) {
+func (v2c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) {
 	stream, ok := s.(lrsStream)
 	if !ok {
-		return 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s)
+		return nil, 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s)
 	}
 
 	resp, err := stream.Recv()
 	if err != nil {
-		return 0, fmt.Errorf("lrs: failed to receive first response: %v", err)
+		return nil, 0, fmt.Errorf("lrs: failed to receive first response: %v", err)
 	}
 	v2c.logger.Infof("lrs: received first LoadStatsResponse: %+v", resp)
 
 	interval, err := ptypes.Duration(resp.GetLoadReportingInterval())
 	if err != nil {
-		return 0, fmt.Errorf("lrs: failed to convert report interval: %v", err)
+		return nil, 0, fmt.Errorf("lrs: failed to convert report interval: %v", err)
 	}
 
-	// The LRS client should join the clusters it knows with the cluster
-	// list from response, and send loads for them.
-	//
-	// But the LRS client now only supports one cluster. TODO: extend it to
-	// support multiple clusters.
-	var clusterFoundInResponse bool
-	for _, c := range resp.Clusters {
-		if c == clusterName {
-			clusterFoundInResponse = true
-		}
-	}
-	if !clusterFoundInResponse {
-		return 0, fmt.Errorf("lrs: received clusters %v does not contain expected {%v}", resp.Clusters, clusterName)
-	}
 	if resp.ReportEndpointGranularity {
 		// TODO: fixme to support per endpoint loads.
-		return 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation")
+		return nil, 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation")
 	}
 
-	return interval, nil
+	clusters := resp.Clusters
+	if resp.SendAllClusters {
+		// Return nil to send stats for all clusters.
+		clusters = nil
+	}
+
+	return clusters, interval, nil
 }
 
-func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, clusterName string) error {
+func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) error {
 	stream, ok := s.(lrsStream)
 	if !ok {
 		return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s)
 	}
-	if v2c.loadStore == nil {
-		return errors.New("lrs: LoadStore is not initialized")
-	}
 
 	var clusterStats []*v2endpointpb.ClusterStats
-	sds := v2c.loadStore.Stats([]string{clusterName})
-	for _, sd := range sds {
+	for _, sd := range loads {
 		var (
 			droppedReqs   []*v2endpointpb.ClusterStats_DroppedRequests
 			localityStats []*v2endpointpb.UpstreamLocalityStats
diff --git a/xds/internal/client/v3/client.go b/xds/internal/client/v3/client.go
index 5d8d719..00e74e5 100644
--- a/xds/internal/client/v3/client.go
+++ b/xds/internal/client/v3/client.go
@@ -29,7 +29,6 @@
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/internal/grpclog"
 	xdsclient "google.golang.org/grpc/xds/internal/client"
-	"google.golang.org/grpc/xds/internal/client/load"
 	"google.golang.org/grpc/xds/internal/version"
 
 	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
@@ -69,7 +68,6 @@
 		cc:        cc,
 		parent:    opts.Parent,
 		nodeProto: nodeProto,
-		loadStore: opts.LoadStore,
 		logger:    opts.Logger,
 	}
 	v3c.ctx, v3c.cancelCtx = context.WithCancel(context.Background())
@@ -88,7 +86,6 @@
 	ctx       context.Context
 	cancelCtx context.CancelFunc
 	parent    xdsclient.UpdateHandler
-	loadStore *load.Store
 	logger    *grpclog.PrefixLogger
 
 	// ClientConn to the xDS gRPC server. Owned by the parent xdsClient.
diff --git a/xds/internal/client/v3/loadreport.go b/xds/internal/client/v3/loadreport.go
index beca34c..74e1863 100644
--- a/xds/internal/client/v3/loadreport.go
+++ b/xds/internal/client/v3/loadreport.go
@@ -26,17 +26,18 @@
 
 	"github.com/golang/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
+	"google.golang.org/grpc/xds/internal/client/load"
 
 	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
 	lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
 	lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
-	structpb "github.com/golang/protobuf/ptypes/struct"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/xds/internal"
-	xdsclient "google.golang.org/grpc/xds/internal/client"
 )
 
+const clientFeatureLRSSendAllClusters = "envoy.lrs.supports_send_all_clusters"
+
 type lrsStream lrsgrpc.LoadReportingService_StreamLoadStatsClient
 
 func (v3c *client) NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error) {
@@ -44,7 +45,7 @@
 	return c.StreamLoadStats(ctx)
 }
 
-func (v3c *client) SendFirstLoadStatsRequest(s grpc.ClientStream, targetName string) error {
+func (v3c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error {
 	stream, ok := s.(lrsStream)
 	if !ok {
 		return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s)
@@ -53,72 +54,52 @@
 	if node == nil {
 		node = &v3corepb.Node{}
 	}
-	if node.Metadata == nil {
-		node.Metadata = &structpb.Struct{}
-	}
-	if node.Metadata.Fields == nil {
-		node.Metadata.Fields = make(map[string]*structpb.Value)
-	}
-	node.Metadata.Fields[xdsclient.NodeMetadataHostnameKey] = &structpb.Value{
-		Kind: &structpb.Value_StringValue{StringValue: targetName},
-	}
+	node.ClientFeatures = append(node.ClientFeatures, clientFeatureLRSSendAllClusters)
 
 	req := &lrspb.LoadStatsRequest{Node: node}
 	v3c.logger.Infof("lrs: sending init LoadStatsRequest: %v", req)
 	return stream.Send(req)
 }
 
-func (v3c *client) HandleLoadStatsResponse(s grpc.ClientStream, clusterName string) (time.Duration, error) {
+func (v3c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) {
 	stream, ok := s.(lrsStream)
 	if !ok {
-		return 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s)
+		return nil, 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s)
 	}
 
 	resp, err := stream.Recv()
 	if err != nil {
-		return 0, fmt.Errorf("lrs: failed to receive first response: %v", err)
+		return nil, 0, fmt.Errorf("lrs: failed to receive first response: %v", err)
 	}
 	v3c.logger.Infof("lrs: received first LoadStatsResponse: %+v", resp)
 
 	interval, err := ptypes.Duration(resp.GetLoadReportingInterval())
 	if err != nil {
-		return 0, fmt.Errorf("lrs: failed to convert report interval: %v", err)
+		return nil, 0, fmt.Errorf("lrs: failed to convert report interval: %v", err)
 	}
 
-	// The LRS client should join the clusters it knows with the cluster
-	// list from response, and send loads for them.
-	//
-	// But the LRS client now only supports one cluster. TODO: extend it to
-	// support multiple clusters.
-	var clusterFoundInResponse bool
-	for _, c := range resp.Clusters {
-		if c == clusterName {
-			clusterFoundInResponse = true
-		}
-	}
-	if !clusterFoundInResponse {
-		return 0, fmt.Errorf("lrs: received clusters %v does not contain expected {%v}", resp.Clusters, clusterName)
-	}
 	if resp.ReportEndpointGranularity {
 		// TODO: fixme to support per endpoint loads.
-		return 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation")
+		return nil, 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation")
 	}
 
-	return interval, nil
+	clusters := resp.Clusters
+	if resp.SendAllClusters {
+		// Return nil to send stats for all clusters.
+		clusters = nil
+	}
+
+	return clusters, interval, nil
 }
 
-func (v3c *client) SendLoadStatsRequest(s grpc.ClientStream, clusterName string) error {
+func (v3c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) error {
 	stream, ok := s.(lrsStream)
 	if !ok {
 		return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s)
 	}
-	if v3c.loadStore == nil {
-		return errors.New("lrs: LoadStore is not initialized")
-	}
 
 	var clusterStats []*v3endpointpb.ClusterStats
-	sds := v3c.loadStore.Stats([]string{clusterName})
-	for _, sd := range sds {
+	for _, sd := range loads {
 		var (
 			droppedReqs   []*v3endpointpb.ClusterStats_DroppedRequests
 			localityStats []*v3endpointpb.UpstreamLocalityStats
diff --git a/xds/internal/testutils/fakeclient/client.go b/xds/internal/testutils/fakeclient/client.go
index cd2710e..408817c 100644
--- a/xds/internal/testutils/fakeclient/client.go
+++ b/xds/internal/testutils/fakeclient/client.go
@@ -156,14 +156,12 @@
 type ReportLoadArgs struct {
 	// Server is the name of the server to which the load is reported.
 	Server string
-	// Cluster is the name of the cluster for which load is reported.
-	Cluster string
 }
 
 // ReportLoad starts reporting load about clusterName to server.
-func (xdsC *Client) ReportLoad(server string, clusterName string) (cancel func()) {
-	xdsC.loadReportCh.Send(ReportLoadArgs{Server: server, Cluster: clusterName})
-	return func() {}
+func (xdsC *Client) ReportLoad(server string) (loadStore *load.Store, cancel func()) {
+	xdsC.loadReportCh.Send(ReportLoadArgs{Server: server})
+	return xdsC.loadStore, func() {}
 }
 
 // LoadStore returns the underlying load data store.
diff --git a/xds/internal/testutils/fakeserver/server.go b/xds/internal/testutils/fakeserver/server.go
index 4cff720..994f530 100644
--- a/xds/internal/testutils/fakeserver/server.go
+++ b/xds/internal/testutils/fakeserver/server.go
@@ -203,10 +203,10 @@
 
 func (lrsS *lrsServer) StreamLoadStats(s lrsgrpc.LoadReportingService_StreamLoadStatsServer) error {
 	req, err := s.Recv()
+	lrsS.reqChan.Send(&Request{req, err})
 	if err != nil {
 		return err
 	}
-	lrsS.reqChan.Send(&Request{req, err})
 
 	select {
 	case r := <-lrsS.respChan:
@@ -222,12 +222,12 @@
 
 	for {
 		req, err := s.Recv()
+		lrsS.reqChan.Send(&Request{req, err})
 		if err != nil {
 			if err == io.EOF {
 				return nil
 			}
 			return err
 		}
-		lrsS.reqChan.Send(&Request{req, err})
 	}
 }