lrs: add Store.Stats() to report loads for multiple clusters (#3905)

- unexport `perClusterStore` and its `stats()`
- add `Store.Stats(clusterNames)` to report loads for the given clusters (see the usage sketch below)
  - refactor the store's map into a two-layer map
- move `lastLoadReportAt` from the client to the load store, because a client can now have multiple clusters, each with a different `lastLoadReportAt`
  - all tests ignore `ReportInterval` when comparing `Data`
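
Illustrative usage of the new API (a minimal sketch, not part of this change; the
cluster/service/locality names are made up, `fmt` is assumed imported, and `load`
is an internal package, so this only compiles inside the grpc-go module):

    store := load.NewStore()
    // LB policies report loads through the PerClusterReporter returned by
    // PerCluster(cluster, edsService).
    store.PerCluster("cluster-a", "eds-service-a").CallStarted("locality-a")
    store.PerCluster("cluster-a", "eds-service-a").CallServerLoad("locality-a", "cpu", 0.5)
    store.PerCluster("cluster-a", "eds-service-a").CallFinished("locality-a", nil)
    store.PerCluster("cluster-a", "eds-service-a").CallDropped("throttle")

    // The LRS client reads (and resets) the accumulated loads. An empty or nil
    // slice returns data for all known clusters.
    for _, d := range store.Stats([]string{"cluster-a"}) {
        fmt.Printf("cluster=%q service=%q drops=%d interval=%v\n",
            d.Cluster, d.Service, d.TotalDrops, d.ReportInterval)
    }
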
diff --git a/xds/internal/balancer/balancergroup/balancergroup_test.go b/xds/internal/balancer/balancergroup/balancergroup_test.go
index daf8f88..0474e7c 100644
--- a/xds/internal/balancer/balancergroup/balancergroup_test.go
+++ b/xds/internal/balancer/balancergroup/balancergroup_test.go
@@ -70,7 +70,7 @@
 	}
 }
 
-func newTestBalancerGroup(t *testing.T, loadStore *load.PerClusterStore) (*testutils.TestClientConn, *weightedaggregator.Aggregator, *BalancerGroup) {
+func newTestBalancerGroup(t *testing.T, loadStore load.PerClusterReporter) (*testutils.TestClientConn, *weightedaggregator.Aggregator, *BalancerGroup) {
 	cc := testutils.NewTestClientConn(t)
 	gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
 	gator.Start()
@@ -400,8 +400,12 @@
 }
 
 func (s) TestBalancerGroup_LoadReport(t *testing.T) {
-	loadStore := &load.PerClusterStore{}
-	cc, gator, bg := newTestBalancerGroup(t, loadStore)
+	loadStore := load.NewStore()
+	const (
+		testCluster    = "test-cluster"
+		testEDSService = "test-eds-service"
+	)
+	cc, gator, bg := newTestBalancerGroup(t, loadStore.PerCluster(testCluster, testEDSService))
 
 	backendToBalancerID := make(map[balancer.SubConn]string)
 
@@ -440,7 +444,9 @@
 	// subConns in each group, we expect the picks to be equally split between
 	// the subConns. We do not call Done() on picks routed to sc1, so we expect
 	// these to show up as pending rpcs.
-	wantStoreData := &load.Data{
+	wantStoreData := []*load.Data{{
+		Cluster: testCluster,
+		Service: testEDSService,
 		LocalityStats: map[string]load.LocalityData{
 			testBalancerIDs[0]: {
 				RequestStats: load.RequestData{Succeeded: 10, InProgress: 10},
@@ -461,7 +467,7 @@
 				},
 			},
 		},
-	}
+	}}
 	for i := 0; i < 30; i++ {
 		scst, _ := p1.Pick(balancer.PickInfo{})
 		if scst.Done != nil && scst.SubConn != sc1 {
@@ -476,9 +482,9 @@
 		}
 	}
 
-	gotStoreData := loadStore.Stats()
-	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.EquateApprox(0, 0.1)); diff != "" {
-		t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
+	gotStoreData := loadStore.Stats([]string{testCluster})
+	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.EquateApprox(0, 0.1), cmpopts.IgnoreFields(load.Data{}, "ReportInterval")); diff != "" {
+		t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
 	}
 }
 
diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go
index 0febc1b..a56acb7 100644
--- a/xds/internal/balancer/edsbalancer/eds_impl_test.go
+++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go
@@ -687,8 +687,10 @@
 	// implements the LoadStore() method to return the underlying load.Store to
 	// be used.
 	loadStore := load.NewStore()
+	lsWrapper := &loadStoreWrapper{}
+	lsWrapper.update(loadStore, testClusterNames[0])
 	cw := &xdsClientWrapper{
-		load: &loadStoreWrapper{store: loadStore, service: testClusterNames[0]},
+		load: lsWrapper,
 	}
 
 	cc := testutils.NewTestClientConn(t)
@@ -723,12 +725,14 @@
 	// We expect the 10 picks to be split between the localities since they are
 	// of equal weight. And since we only mark the picks routed to sc2 as done,
 	// the picks on sc1 should show up as inProgress.
-	wantStoreData := &load.Data{
+	wantStoreData := []*load.Data{{
+		Cluster: testClusterNames[0],
+		Service: "",
 		LocalityStats: map[string]load.LocalityData{
 			locality1.String(): {RequestStats: load.RequestData{InProgress: 5}},
 			locality2.String(): {RequestStats: load.RequestData{Succeeded: 5}},
 		},
-	}
+	}}
 	for i := 0; i < 10; i++ {
 		scst, _ := p1.Pick(balancer.PickInfo{})
 		if scst.Done != nil && scst.SubConn != sc1 {
@@ -736,8 +740,8 @@
 		}
 	}
 
-	gotStoreData := loadStore.PerCluster(testClusterNames[0], "").Stats()
-	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty()); diff != "" {
-		t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
+	gotStoreData := loadStore.Stats(testClusterNames[0:1])
+	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(load.Data{}, "ReportInterval")); diff != "" {
+		t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
 	}
 }
diff --git a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
index 99d986f..f22c162 100644
--- a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
+++ b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
@@ -48,40 +48,45 @@
 )
 
 type loadStoreWrapper struct {
-	mu      sync.RWMutex
-	store   *load.Store
-	service string
+	mu         sync.RWMutex
+	store      *load.Store
+	service    string
+	perCluster load.PerClusterReporter
 }
 
 func (lsw *loadStoreWrapper) update(store *load.Store, service string) {
 	lsw.mu.Lock()
 	defer lsw.mu.Unlock()
+	if store == lsw.store && service == lsw.service {
+		return
+	}
 	lsw.store = store
 	lsw.service = service
+	lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
 }
 
 func (lsw *loadStoreWrapper) CallStarted(locality string) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
-	lsw.store.PerCluster(lsw.service, "").CallStarted(locality)
+	lsw.perCluster.CallStarted(locality)
 }
 
 func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
-	lsw.store.PerCluster(lsw.service, "").CallFinished(locality, err)
+	lsw.perCluster.CallFinished(locality, err)
 }
 
 func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
-	lsw.store.PerCluster(lsw.service, "").CallServerLoad(locality, name, val)
+	lsw.perCluster.CallServerLoad(locality, name, val)
 }
 
 func (lsw *loadStoreWrapper) CallDropped(category string) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
-	lsw.store.PerCluster(lsw.service, "").CallDropped(category)
+	lsw.perCluster.CallDropped(category)
 }
 
 // xdsclientWrapper is responsible for getting the xds client from attributes or
diff --git a/xds/internal/balancer/lrs/balancer.go b/xds/internal/balancer/lrs/balancer.go
index b5bf300..1361fb1 100644
--- a/xds/internal/balancer/lrs/balancer.go
+++ b/xds/internal/balancer/lrs/balancer.go
@@ -154,38 +154,43 @@
 	store      *load.Store
 	cluster    string
 	edsService string
+	perCluster load.PerClusterReporter
 }
 
 func (lsw *loadStoreWrapper) update(store *load.Store, cluster, edsService string) {
 	lsw.mu.Lock()
 	defer lsw.mu.Unlock()
+	if store == lsw.store && cluster == lsw.cluster && edsService == lsw.edsService {
+		return
+	}
 	lsw.store = store
 	lsw.cluster = cluster
 	lsw.edsService = edsService
+	lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
 }
 
 func (lsw *loadStoreWrapper) CallStarted(locality string) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
-	lsw.store.PerCluster(lsw.cluster, lsw.edsService).CallStarted(locality)
+	lsw.perCluster.CallStarted(locality)
 }
 
 func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
-	lsw.store.PerCluster(lsw.cluster, lsw.edsService).CallFinished(locality, err)
+	lsw.perCluster.CallFinished(locality, err)
 }
 
 func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
-	lsw.store.PerCluster(lsw.cluster, lsw.edsService).CallServerLoad(locality, name, val)
+	lsw.perCluster.CallServerLoad(locality, name, val)
 }
 
 func (lsw *loadStoreWrapper) CallDropped(category string) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
-	lsw.store.PerCluster(lsw.cluster, lsw.edsService).CallDropped(category)
+	lsw.perCluster.CallDropped(category)
 }
 
 type xdsClientWrapper struct {
diff --git a/xds/internal/balancer/lrs/balancer_test.go b/xds/internal/balancer/lrs/balancer_test.go
index 386b143..789cfea 100644
--- a/xds/internal/balancer/lrs/balancer_test.go
+++ b/xds/internal/balancer/lrs/balancer_test.go
@@ -116,7 +116,14 @@
 	if loadStore == nil {
 		t.Fatal("loadStore is nil in xdsClient")
 	}
-	sd := loadStore.PerCluster(testClusterName, testServiceName).Stats()
+	sds := loadStore.Stats([]string{testClusterName})
+	if len(sds) == 0 {
+		t.Fatalf("loads for cluster %v not found in store", testClusterName)
+	}
+	sd := sds[0]
+	if sd.Cluster != testClusterName || sd.Service != testServiceName {
+		t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName)
+	}
 	localityData, ok := sd.LocalityStats[testLocality.String()]
 	if !ok {
 		t.Fatalf("loads for %v not found in store", testLocality)
diff --git a/xds/internal/client/load/store.go b/xds/internal/client/load/store.go
index c0c741f..a6ec1ec 100644
--- a/xds/internal/client/load/store.go
+++ b/xds/internal/client/load/store.go
@@ -20,63 +20,112 @@
 import (
 	"sync"
 	"sync/atomic"
+	"time"
 )
 
 const negativeOneUInt64 = ^uint64(0)
 
-// A pair of cluster and service name. The same cluster can be used by multiple
-// services, and one service can use multiple clusters. So we need a pair with
-// both name to accurately indicate where the load belongs.
-type storeKey struct {
-	cluster string
-	service string
-}
-
 // Store keeps the loads for multiple clusters and services to be reported via
-// LRS. It is safe for concurrent use.
+// LRS. It contains loads to be reported to one LRS server. Create multiple stores
+// for multiple servers.
+//
+// It is safe for concurrent use.
 type Store struct {
-	mu       sync.RWMutex
-	clusters map[storeKey]*PerClusterStore
+	// mu only protects the two-layer map below. Reads and writes to a
+	// *perClusterStore don't need to hold mu.
+	mu sync.Mutex
+	// clusters is a map with cluster name as the key. The second layer is a map
+	// with service name as the key. Each value (perClusterStore) contains data
+	// for a (cluster, service) pair.
+	//
+	// Note that entries are added to this map but never removed, which is
+	// potentially a memory leak. However, memory is allocated only once for
+	// each new (cluster, service) pair, and each entry holds just pointers
+	// and maps, so the growth should stay modest.
+	clusters map[string]map[string]*perClusterStore
 }
 
 // NewStore creates a Store.
 func NewStore() *Store {
 	return &Store{
-		clusters: make(map[storeKey]*PerClusterStore),
+		clusters: make(map[string]map[string]*perClusterStore),
 	}
 }
 
-// PerCluster returns the PerClusterStore for the given clusterName +
+// Stats returns the load data for the given cluster names. Data is returned in
+// a slice with no specific order.
+//
+// If no clusterName is given (an empty slice), all data for all known clusters
+// is returned.
+//
+// If a cluster's Data is empty (no load to report), it's not appended to the
+// returned slice.
+func (s *Store) Stats(clusterNames []string) []*Data {
+	var ret []*Data
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if len(clusterNames) == 0 {
+		for _, c := range s.clusters {
+			ret = appendClusterStats(ret, c)
+		}
+		return ret
+	}
+
+	for _, n := range clusterNames {
+		if c, ok := s.clusters[n]; ok {
+			ret = appendClusterStats(ret, c)
+		}
+	}
+	return ret
+}
+
+// appendClusterStats gets Data for the given cluster, appends it to ret, and
+// returns the new slice.
+//
+// Data is only appended to ret if it's not empty.
+func appendClusterStats(ret []*Data, cluster map[string]*perClusterStore) []*Data {
+	for _, d := range cluster {
+		data := d.stats()
+		if data == nil {
+			// Skip this data if it doesn't contain any information.
+			continue
+		}
+		ret = append(ret, data)
+	}
+	return ret
+}
+
+// PerCluster returns the perClusterStore for the given clusterName +
 // serviceName.
-func (ls *Store) PerCluster(clusterName, serviceName string) *PerClusterStore {
-	if ls == nil {
+func (s *Store) PerCluster(clusterName, serviceName string) PerClusterReporter {
+	if s == nil {
 		return nil
 	}
 
-	k := storeKey{
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	c, ok := s.clusters[clusterName]
+	if !ok {
+		c = make(map[string]*perClusterStore)
+		s.clusters[clusterName] = c
+	}
+
+	if p, ok := c[serviceName]; ok {
+		return p
+	}
+	p := &perClusterStore{
 		cluster: clusterName,
 		service: serviceName,
 	}
-
-	ls.mu.RLock()
-	if p, ok := ls.clusters[k]; ok {
-		ls.mu.RUnlock()
-		return p
-	}
-	ls.mu.RUnlock()
-
-	ls.mu.Lock()
-	defer ls.mu.Unlock()
-	p, ok := ls.clusters[k]
-	if !ok {
-		p = &PerClusterStore{}
-		ls.clusters[k] = p
-	}
+	c[serviceName] = p
 	return p
 }
 
-// PerClusterStore is a repository for LB policy implementations to report store
-// load data. It is safe for concurrent use.
+// perClusterStore is a repository for LB policy implementations to report store
+// load data. It contains load for a (cluster, edsService) pair.
+//
+// It is safe for concurrent use.
 //
 // TODO(easwars): Use regular maps with mutexes instead of sync.Map here. The
 // latter is optimized for two common use cases: (1) when the entry for a given
@@ -87,16 +136,20 @@
 // RWMutex.
 // Neither of these conditions are met here, and we should transition to a
 // regular map with a mutex for better type safety.
-type PerClusterStore struct {
+type perClusterStore struct {
+	cluster, service string
 	drops            sync.Map // map[string]*uint64
 	localityRPCCount sync.Map // map[string]*rpcCountData
+
+	mu               sync.Mutex
+	lastLoadReportAt time.Time
 }
 
 // Update functions are called by picker for each RPC. To avoid contention, all
 // updates are done atomically.
 
 // CallDropped adds one drop record with the given category to store.
-func (ls *PerClusterStore) CallDropped(category string) {
+func (ls *perClusterStore) CallDropped(category string) {
 	if ls == nil {
 		return
 	}
@@ -110,7 +163,7 @@
 }
 
 // CallStarted adds one call started record for the given locality.
-func (ls *PerClusterStore) CallStarted(locality string) {
+func (ls *perClusterStore) CallStarted(locality string) {
 	if ls == nil {
 		return
 	}
@@ -125,7 +178,7 @@
 
 // CallFinished adds one call finished record for the given locality.
 // For successful calls, err needs to be nil.
-func (ls *PerClusterStore) CallFinished(locality string, err error) {
+func (ls *perClusterStore) CallFinished(locality string, err error) {
 	if ls == nil {
 		return
 	}
@@ -146,7 +199,7 @@
 
 // CallServerLoad adds one server load record for the given locality. The
 // load type is specified by desc, and its value by val.
-func (ls *PerClusterStore) CallServerLoad(locality, name string, d float64) {
+func (ls *perClusterStore) CallServerLoad(locality, name string, d float64) {
 	if ls == nil {
 		return
 	}
@@ -154,21 +207,28 @@
 	p, ok := ls.localityRPCCount.Load(locality)
 	if !ok {
 		// The map is never cleared, only values in the map are reset. So the
-		// case where entry for CallServerLoad is not found should never happen.
+		// case where entry for CallServerLoad is not found should never happen.
 		return
 	}
 	p.(*rpcCountData).addServerLoad(name, d)
 }
 
 // Data contains all load data reported to the Store since the most recent call
-// to Stats().
+// to stats().
 type Data struct {
+	// Cluster is the name of the cluster this data is for.
+	Cluster string
+	// Service is the name of the EDS service this data is for.
+	Service string
 	// TotalDrops is the total number of dropped requests.
 	TotalDrops uint64
 	// Drops is the number of dropped requests per category.
 	Drops map[string]uint64
 	// LocalityStats contains load reports per locality.
 	LocalityStats map[string]LocalityData
+	// ReportInterval is the duration since the last time load was reported
+	// (stats() was called).
+	ReportInterval time.Duration
 }
 
 // LocalityData contains load data for a single locality.
@@ -198,21 +258,25 @@
 	Sum float64
 }
 
-func newStoreData() *Data {
+func newData(cluster, service string) *Data {
 	return &Data{
+		Cluster:       cluster,
+		Service:       service,
 		Drops:         make(map[string]uint64),
 		LocalityStats: make(map[string]LocalityData),
 	}
 }
 
-// Stats returns and resets all loads reported to the store, except inProgress
+// stats returns and resets all loads reported to the store, except inProgress
 // rpc counts.
-func (ls *PerClusterStore) Stats() *Data {
+//
+// It returns nil if the store doesn't contain any (new) data.
+func (ls *perClusterStore) stats() *Data {
 	if ls == nil {
 		return nil
 	}
 
-	sd := newStoreData()
+	sd := newData(ls.cluster, ls.service)
 	ls.drops.Range(func(key, val interface{}) bool {
 		d := atomic.SwapUint64(val.(*uint64), 0)
 		if d == 0 {
@@ -253,6 +317,15 @@
 		sd.LocalityStats[key.(string)] = ld
 		return true
 	})
+
+	ls.mu.Lock()
+	sd.ReportInterval = time.Since(ls.lastLoadReportAt)
+	ls.lastLoadReportAt = time.Now()
+	ls.mu.Unlock()
+
+	if sd.TotalDrops == 0 && len(sd.Drops) == 0 && len(sd.LocalityStats) == 0 {
+		return nil
+	}
 	return sd
 }
 
diff --git a/xds/internal/client/load/store_test.go b/xds/internal/client/load/store_test.go
index df9aed7..4d62ebc 100644
--- a/xds/internal/client/load/store_test.go
+++ b/xds/internal/client/load/store_test.go
@@ -19,6 +19,7 @@
 
 import (
 	"fmt"
+	"sort"
 	"sync"
 	"testing"
 
@@ -56,7 +57,7 @@
 		}
 	)
 
-	ls := PerClusterStore{}
+	ls := perClusterStore{}
 	var wg sync.WaitGroup
 	for category, count := range drops {
 		for i := 0; i < count; i++ {
@@ -69,9 +70,9 @@
 	}
 	wg.Wait()
 
-	gotStoreData := ls.Stats()
-	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty()); diff != "" {
-		t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
+	gotStoreData := ls.stats()
+	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" {
+		t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
 	}
 }
 
@@ -118,7 +119,7 @@
 		}
 	)
 
-	ls := PerClusterStore{}
+	ls := perClusterStore{}
 	var wg sync.WaitGroup
 	for locality, data := range localityData {
 		wg.Add(data.start)
@@ -128,7 +129,7 @@
 				wg.Done()
 			}(locality)
 		}
-		// The calls to CallStarted() need to happen before the other calls are
+		// The calls to CallStarted() need to happen before the other calls are
 		// made. Hence the wait here.
 		wg.Wait()
 
@@ -152,9 +153,9 @@
 		wg.Wait()
 	}
 
-	gotStoreData := ls.Stats()
-	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty()); diff != "" {
-		t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
+	gotStoreData := ls.stats()
+	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" {
+		t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
 	}
 }
 
@@ -211,7 +212,7 @@
 		}
 	)
 
-	reportLoad := func(ls *PerClusterStore) {
+	reportLoad := func(ls *perClusterStore) {
 		for category, count := range drops {
 			for i := 0; i < count; i++ {
 				ls.CallDropped(category)
@@ -233,14 +234,14 @@
 		}
 	}
 
-	ls := PerClusterStore{}
+	ls := perClusterStore{}
 	reportLoad(&ls)
-	gotStoreData := ls.Stats()
-	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty()); diff != "" {
-		t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
+	gotStoreData := ls.stats()
+	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" {
+		t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
 	}
 
-	// The above call to Stats() should have reset all load reports except the
+	// The above call to stats() should have reset all load reports except the
 	// inProgress rpc count. We are now going to push the same load data into
 	// the store. So, we should expect to see twice the count for inProgress.
 	for _, l := range localities {
@@ -249,8 +250,196 @@
 		wantStoreData.LocalityStats[l] = ls
 	}
 	reportLoad(&ls)
-	gotStoreData = ls.Stats()
-	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty()); diff != "" {
-		t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
+	gotStoreData = ls.stats()
+	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" {
+		t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
+	}
+}
+
+var sortDataSlice = cmp.Transformer("SortDataSlice", func(in []*Data) []*Data {
+	out := append([]*Data(nil), in...) // Copy input to avoid mutating it
+	sort.Slice(out,
+		func(i, j int) bool {
+			if out[i].Cluster < out[j].Cluster {
+				return true
+			}
+			if out[i].Cluster == out[j].Cluster {
+				return out[i].Service < out[j].Service
+			}
+			return false
+		},
+	)
+	return out
+})
+
+// Tests that all loads are returned for the given clusters, and that all
+// clusters are reported if no cluster is specified.
+func TestStoreStats(t *testing.T) {
+	var (
+		testClusters = []string{"c0", "c1", "c2"}
+		testServices = []string{"s0", "s1"}
+		testLocality = "test-locality"
+	)
+
+	store := NewStore()
+	for _, c := range testClusters {
+		for _, s := range testServices {
+			store.PerCluster(c, s).CallStarted(testLocality)
+			store.PerCluster(c, s).CallServerLoad(testLocality, "abc", 123)
+			store.PerCluster(c, s).CallDropped("dropped")
+			store.PerCluster(c, s).CallFinished(testLocality, nil)
+		}
+	}
+
+	wantC0 := []*Data{
+		{
+			Cluster: "c0", Service: "s0",
+			TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
+			LocalityStats: map[string]LocalityData{
+				"test-locality": {
+					RequestStats: RequestData{Succeeded: 1},
+					LoadStats:    map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
+				},
+			},
+		},
+		{
+			Cluster: "c0", Service: "s1",
+			TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
+			LocalityStats: map[string]LocalityData{
+				"test-locality": {
+					RequestStats: RequestData{Succeeded: 1},
+					LoadStats:    map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
+				},
+			},
+		},
+	}
+	// Call Stats with just "c0"; this should return data for "c0" and not
+	// touch data for other clusters.
+	gotC0 := store.Stats([]string{"c0"})
+	if diff := cmp.Diff(wantC0, gotC0, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" {
+		t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
+	}
+
+	wantOther := []*Data{
+		{
+			Cluster: "c1", Service: "s0",
+			TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
+			LocalityStats: map[string]LocalityData{
+				"test-locality": {
+					RequestStats: RequestData{Succeeded: 1},
+					LoadStats:    map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
+				},
+			},
+		},
+		{
+			Cluster: "c1", Service: "s1",
+			TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
+			LocalityStats: map[string]LocalityData{
+				"test-locality": {
+					RequestStats: RequestData{Succeeded: 1},
+					LoadStats:    map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
+				},
+			},
+		},
+		{
+			Cluster: "c2", Service: "s0",
+			TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
+			LocalityStats: map[string]LocalityData{
+				"test-locality": {
+					RequestStats: RequestData{Succeeded: 1},
+					LoadStats:    map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
+				},
+			},
+		},
+		{
+			Cluster: "c2", Service: "s1",
+			TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
+			LocalityStats: map[string]LocalityData{
+				"test-locality": {
+					RequestStats: RequestData{Succeeded: 1},
+					LoadStats:    map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
+				},
+			},
+		},
+	}
+	// Call Stats with an empty slice; this should return data for all the
+	// remaining clusters, and not include "c0" (because its data was cleared).
+	gotOther := store.Stats(nil)
+	if diff := cmp.Diff(wantOther, gotOther, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" {
+		t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
+	}
+}
+
+// Tests that if a cluster doesn't have any load to report, its data is not
+// appended to the slice returned by Stats().
+func TestStoreStatsEmptyDataNotReported(t *testing.T) {
+	var (
+		testServices = []string{"s0", "s1"}
+		testLocality = "test-locality"
+	)
+
+	store := NewStore()
+	// "c0"'s RPCs all finish with success.
+	for _, s := range testServices {
+		store.PerCluster("c0", s).CallStarted(testLocality)
+		store.PerCluster("c0", s).CallFinished(testLocality, nil)
+	}
+	// "c1"'s RPCs never finish (always in progress).
+	for _, s := range testServices {
+		store.PerCluster("c1", s).CallStarted(testLocality)
+	}
+
+	want0 := []*Data{
+		{
+			Cluster: "c0", Service: "s0",
+			LocalityStats: map[string]LocalityData{
+				"test-locality": {RequestStats: RequestData{Succeeded: 1}},
+			},
+		},
+		{
+			Cluster: "c0", Service: "s1",
+			LocalityStats: map[string]LocalityData{
+				"test-locality": {RequestStats: RequestData{Succeeded: 1}},
+			},
+		},
+		{
+			Cluster: "c1", Service: "s0",
+			LocalityStats: map[string]LocalityData{
+				"test-locality": {RequestStats: RequestData{InProgress: 1}},
+			},
+		},
+		{
+			Cluster: "c1", Service: "s1",
+			LocalityStats: map[string]LocalityData{
+				"test-locality": {RequestStats: RequestData{InProgress: 1}},
+			},
+		},
+	}
+	// Call Stats with an empty slice; this should return data for all the
+	// clusters.
+	got0 := store.Stats(nil)
+	if diff := cmp.Diff(want0, got0, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" {
+		t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
+	}
+
+	want1 := []*Data{
+		{
+			Cluster: "c1", Service: "s0",
+			LocalityStats: map[string]LocalityData{
+				"test-locality": {RequestStats: RequestData{InProgress: 1}},
+			},
+		},
+		{
+			Cluster: "c1", Service: "s1",
+			LocalityStats: map[string]LocalityData{
+				"test-locality": {RequestStats: RequestData{InProgress: 1}},
+			},
+		},
+	}
+	// Call Stats with an empty slice again; this should return data only for
+	// "c1", because "c0"'s data was cleared and "c1" still has in-progress RPCs.
+	got1 := store.Stats(nil)
+	if diff := cmp.Diff(want1, got1, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" {
+		t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
 	}
 }
diff --git a/xds/internal/client/v2/client.go b/xds/internal/client/v2/client.go
index f838b5f..c3de39c 100644
--- a/xds/internal/client/v2/client.go
+++ b/xds/internal/client/v2/client.go
@@ -23,7 +23,6 @@
 	"context"
 	"fmt"
 	"sync"
-	"time"
 
 	"github.com/golang/protobuf/proto"
 	"google.golang.org/grpc"
@@ -103,8 +102,6 @@
 	// processing needs this to do the host matching.
 	ldsResourceName string
 	ldsWatchCount   int
-
-	lastLoadReportAt time.Time
 }
 
 // AddWatch overrides the transport helper's AddWatch to save the LDS
diff --git a/xds/internal/client/v2/loadreport.go b/xds/internal/client/v2/loadreport.go
index 899cb1b..a06dcb8 100644
--- a/xds/internal/client/v2/loadreport.go
+++ b/xds/internal/client/v2/loadreport.go
@@ -116,58 +116,58 @@
 		return errors.New("lrs: LoadStore is not initialized")
 	}
 
-	var (
-		droppedReqs   []*v2endpointpb.ClusterStats_DroppedRequests
-		localityStats []*v2endpointpb.UpstreamLocalityStats
-	)
-
-	sd := v2c.loadStore.PerCluster(clusterName, "").Stats()
-	for category, count := range sd.Drops {
-		droppedReqs = append(droppedReqs, &v2endpointpb.ClusterStats_DroppedRequests{
-			Category:     category,
-			DroppedCount: count,
-		})
-	}
-	for l, localityData := range sd.LocalityStats {
-		lid, err := internal.LocalityIDFromString(l)
-		if err != nil {
-			return err
-		}
-		var loadMetricStats []*v2endpointpb.EndpointLoadMetricStats
-		for name, loadData := range localityData.LoadStats {
-			loadMetricStats = append(loadMetricStats, &v2endpointpb.EndpointLoadMetricStats{
-				MetricName:                    name,
-				NumRequestsFinishedWithMetric: loadData.Count,
-				TotalMetricValue:              loadData.Sum,
+	var clusterStats []*v2endpointpb.ClusterStats
+	sds := v2c.loadStore.Stats([]string{clusterName})
+	for _, sd := range sds {
+		var (
+			droppedReqs   []*v2endpointpb.ClusterStats_DroppedRequests
+			localityStats []*v2endpointpb.UpstreamLocalityStats
+		)
+		for category, count := range sd.Drops {
+			droppedReqs = append(droppedReqs, &v2endpointpb.ClusterStats_DroppedRequests{
+				Category:     category,
+				DroppedCount: count,
 			})
 		}
-		localityStats = append(localityStats, &v2endpointpb.UpstreamLocalityStats{
-			Locality: &v2corepb.Locality{
-				Region:  lid.Region,
-				Zone:    lid.Zone,
-				SubZone: lid.SubZone,
-			},
-			TotalSuccessfulRequests: localityData.RequestStats.Succeeded,
-			TotalRequestsInProgress: localityData.RequestStats.InProgress,
-			TotalErrorRequests:      localityData.RequestStats.Errored,
-			LoadMetricStats:         loadMetricStats,
-			UpstreamEndpointStats:   nil, // TODO: populate for per endpoint loads.
-		})
-	}
+		for l, localityData := range sd.LocalityStats {
+			lid, err := internal.LocalityIDFromString(l)
+			if err != nil {
+				return err
+			}
+			var loadMetricStats []*v2endpointpb.EndpointLoadMetricStats
+			for name, loadData := range localityData.LoadStats {
+				loadMetricStats = append(loadMetricStats, &v2endpointpb.EndpointLoadMetricStats{
+					MetricName:                    name,
+					NumRequestsFinishedWithMetric: loadData.Count,
+					TotalMetricValue:              loadData.Sum,
+				})
+			}
+			localityStats = append(localityStats, &v2endpointpb.UpstreamLocalityStats{
+				Locality: &v2corepb.Locality{
+					Region:  lid.Region,
+					Zone:    lid.Zone,
+					SubZone: lid.SubZone,
+				},
+				TotalSuccessfulRequests: localityData.RequestStats.Succeeded,
+				TotalRequestsInProgress: localityData.RequestStats.InProgress,
+				TotalErrorRequests:      localityData.RequestStats.Errored,
+				LoadMetricStats:         loadMetricStats,
+				UpstreamEndpointStats:   nil, // TODO: populate for per endpoint loads.
+			})
+		}
 
-	dur := time.Since(v2c.lastLoadReportAt)
-	v2c.lastLoadReportAt = time.Now()
-
-	cs := []*v2endpointpb.ClusterStats{
-		{
-			ClusterName:           clusterName,
+		clusterStats = append(clusterStats, &v2endpointpb.ClusterStats{
+			ClusterName:           sd.Cluster,
+			ClusterServiceName:    sd.Service,
 			UpstreamLocalityStats: localityStats,
 			TotalDroppedRequests:  sd.TotalDrops,
 			DroppedRequests:       droppedReqs,
-			LoadReportInterval:    ptypes.DurationProto(dur),
-		},
+			LoadReportInterval:    ptypes.DurationProto(sd.ReportInterval),
+		})
+
 	}
-	req := &lrspb.LoadStatsRequest{ClusterStats: cs}
+
+	req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats}
 	v2c.logger.Infof("lrs: sending LRS loads: %+v", req)
 	return stream.Send(req)
 }
diff --git a/xds/internal/client/v3/client.go b/xds/internal/client/v3/client.go
index ba90861..9894280 100644
--- a/xds/internal/client/v3/client.go
+++ b/xds/internal/client/v3/client.go
@@ -23,7 +23,6 @@
 	"context"
 	"fmt"
 	"sync"
-	"time"
 
 	"github.com/golang/protobuf/proto"
 	"google.golang.org/grpc"
@@ -103,8 +102,6 @@
 	// processing needs this to do the host matching.
 	ldsResourceName string
 	ldsWatchCount   int
-
-	lastLoadReportAt time.Time
 }
 
 // AddWatch overrides the transport helper's AddWatch to save the LDS
diff --git a/xds/internal/client/v3/loadreport.go b/xds/internal/client/v3/loadreport.go
index 1997277..beca34c 100644
--- a/xds/internal/client/v3/loadreport.go
+++ b/xds/internal/client/v3/loadreport.go
@@ -116,58 +116,57 @@
 		return errors.New("lrs: LoadStore is not initialized")
 	}
 
-	var (
-		droppedReqs   []*v3endpointpb.ClusterStats_DroppedRequests
-		localityStats []*v3endpointpb.UpstreamLocalityStats
-	)
-
-	sd := v3c.loadStore.PerCluster(clusterName, "").Stats()
-	for category, count := range sd.Drops {
-		droppedReqs = append(droppedReqs, &v3endpointpb.ClusterStats_DroppedRequests{
-			Category:     category,
-			DroppedCount: count,
-		})
-	}
-	for l, localityData := range sd.LocalityStats {
-		lid, err := internal.LocalityIDFromString(l)
-		if err != nil {
-			return err
-		}
-		var loadMetricStats []*v3endpointpb.EndpointLoadMetricStats
-		for name, loadData := range localityData.LoadStats {
-			loadMetricStats = append(loadMetricStats, &v3endpointpb.EndpointLoadMetricStats{
-				MetricName:                    name,
-				NumRequestsFinishedWithMetric: loadData.Count,
-				TotalMetricValue:              loadData.Sum,
+	var clusterStats []*v3endpointpb.ClusterStats
+	sds := v3c.loadStore.Stats([]string{clusterName})
+	for _, sd := range sds {
+		var (
+			droppedReqs   []*v3endpointpb.ClusterStats_DroppedRequests
+			localityStats []*v3endpointpb.UpstreamLocalityStats
+		)
+		for category, count := range sd.Drops {
+			droppedReqs = append(droppedReqs, &v3endpointpb.ClusterStats_DroppedRequests{
+				Category:     category,
+				DroppedCount: count,
 			})
 		}
-		localityStats = append(localityStats, &v3endpointpb.UpstreamLocalityStats{
-			Locality: &v3corepb.Locality{
-				Region:  lid.Region,
-				Zone:    lid.Zone,
-				SubZone: lid.SubZone,
-			},
-			TotalSuccessfulRequests: localityData.RequestStats.Succeeded,
-			TotalRequestsInProgress: localityData.RequestStats.InProgress,
-			TotalErrorRequests:      localityData.RequestStats.Errored,
-			LoadMetricStats:         loadMetricStats,
-			UpstreamEndpointStats:   nil, // TODO: populate for per endpoint loads.
-		})
-	}
+		for l, localityData := range sd.LocalityStats {
+			lid, err := internal.LocalityIDFromString(l)
+			if err != nil {
+				return err
+			}
+			var loadMetricStats []*v3endpointpb.EndpointLoadMetricStats
+			for name, loadData := range localityData.LoadStats {
+				loadMetricStats = append(loadMetricStats, &v3endpointpb.EndpointLoadMetricStats{
+					MetricName:                    name,
+					NumRequestsFinishedWithMetric: loadData.Count,
+					TotalMetricValue:              loadData.Sum,
+				})
+			}
+			localityStats = append(localityStats, &v3endpointpb.UpstreamLocalityStats{
+				Locality: &v3corepb.Locality{
+					Region:  lid.Region,
+					Zone:    lid.Zone,
+					SubZone: lid.SubZone,
+				},
+				TotalSuccessfulRequests: localityData.RequestStats.Succeeded,
+				TotalRequestsInProgress: localityData.RequestStats.InProgress,
+				TotalErrorRequests:      localityData.RequestStats.Errored,
+				LoadMetricStats:         loadMetricStats,
+				UpstreamEndpointStats:   nil, // TODO: populate for per endpoint loads.
+			})
+		}
 
-	dur := time.Since(v3c.lastLoadReportAt)
-	v3c.lastLoadReportAt = time.Now()
-
-	cs := []*v3endpointpb.ClusterStats{
-		{
-			ClusterName:           clusterName,
+		clusterStats = append(clusterStats, &v3endpointpb.ClusterStats{
+			ClusterName:           sd.Cluster,
+			ClusterServiceName:    sd.Service,
 			UpstreamLocalityStats: localityStats,
 			TotalDroppedRequests:  sd.TotalDrops,
 			DroppedRequests:       droppedReqs,
-			LoadReportInterval:    ptypes.DurationProto(dur),
-		},
+			LoadReportInterval:    ptypes.DurationProto(sd.ReportInterval),
+		})
 	}
-	req := &lrspb.LoadStatsRequest{ClusterStats: cs}
+
+	req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats}
 	v3c.logger.Infof("lrs: sending LRS loads: %+v", req)
 	return stream.Send(req)
 }