lrs: add Store.Stats() to report loads for multiple clusters (#3905)
- unexport `perClusterStore` and its `stats()`
- add `Store.Stats(clusterNames)` to report loads for the given clusters
- refactor the store's map into a two-layer map (cluster name → service name → per-cluster store)
- move `lastLoadReportAt` from the xDS client to the load store, because a client can now report loads for multiple clusters, each with a different `lastLoadReportAt`
- update all tests to ignore `ReportInterval` when comparing `Data`
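
As a rough illustration of how the reworked API fits together (a hedged sketch only: the `load` package lives under `xds/internal`, so it is not importable outside grpc-go, and the cluster/service/locality names below are made up):

```go
package main

import (
	"fmt"

	// Internal to grpc-go; shown here only to illustrate the API shape.
	"google.golang.org/grpc/xds/internal/client/load"
)

func main() {
	store := load.NewStore() // one Store per LRS server

	// LB policies report loads through the PerClusterReporter for their
	// (cluster, EDS service) pair.
	r := store.PerCluster("cluster-a", "eds-service-a")
	r.CallStarted("locality-1")
	r.CallServerLoad("locality-1", "cpu_utilization", 0.5)
	r.CallFinished("locality-1", nil)
	r.CallDropped("rate_limited")

	// The LRS client reads (and resets) the loads for the clusters it is asked
	// to report on; an empty/nil slice returns data for all known clusters.
	for _, d := range store.Stats([]string{"cluster-a"}) {
		fmt.Printf("cluster=%q service=%q drops=%d interval=%v\n",
			d.Cluster, d.Service, d.TotalDrops, d.ReportInterval)
	}
}
```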
diff --git a/xds/internal/balancer/balancergroup/balancergroup_test.go b/xds/internal/balancer/balancergroup/balancergroup_test.go
index daf8f88..0474e7c 100644
--- a/xds/internal/balancer/balancergroup/balancergroup_test.go
+++ b/xds/internal/balancer/balancergroup/balancergroup_test.go
@@ -70,7 +70,7 @@
}
}
-func newTestBalancerGroup(t *testing.T, loadStore *load.PerClusterStore) (*testutils.TestClientConn, *weightedaggregator.Aggregator, *BalancerGroup) {
+func newTestBalancerGroup(t *testing.T, loadStore load.PerClusterReporter) (*testutils.TestClientConn, *weightedaggregator.Aggregator, *BalancerGroup) {
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
@@ -400,8 +400,12 @@
}
func (s) TestBalancerGroup_LoadReport(t *testing.T) {
- loadStore := &load.PerClusterStore{}
- cc, gator, bg := newTestBalancerGroup(t, loadStore)
+ loadStore := load.NewStore()
+ const (
+ testCluster = "test-cluster"
+ testEDSService = "test-eds-service"
+ )
+ cc, gator, bg := newTestBalancerGroup(t, loadStore.PerCluster(testCluster, testEDSService))
backendToBalancerID := make(map[balancer.SubConn]string)
@@ -440,7 +444,9 @@
// subConns in each group, we expect the picks to be equally split between
// the subConns. We do not call Done() on picks routed to sc1, so we expect
// these to show up as pending rpcs.
- wantStoreData := &load.Data{
+ wantStoreData := []*load.Data{{
+ Cluster: testCluster,
+ Service: testEDSService,
LocalityStats: map[string]load.LocalityData{
testBalancerIDs[0]: {
RequestStats: load.RequestData{Succeeded: 10, InProgress: 10},
@@ -461,7 +467,7 @@
},
},
},
- }
+ }}
for i := 0; i < 30; i++ {
scst, _ := p1.Pick(balancer.PickInfo{})
if scst.Done != nil && scst.SubConn != sc1 {
@@ -476,9 +482,9 @@
}
}
- gotStoreData := loadStore.Stats()
- if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.EquateApprox(0, 0.1)); diff != "" {
- t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
+ gotStoreData := loadStore.Stats([]string{testCluster})
+ if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.EquateApprox(0, 0.1), cmpopts.IgnoreFields(load.Data{}, "ReportInterval")); diff != "" {
+ t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
}
}
diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go
index 0febc1b..a56acb7 100644
--- a/xds/internal/balancer/edsbalancer/eds_impl_test.go
+++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go
@@ -687,8 +687,10 @@
// implements the LoadStore() method to return the underlying load.Store to
// be used.
loadStore := load.NewStore()
+ lsWrapper := &loadStoreWrapper{}
+ lsWrapper.update(loadStore, testClusterNames[0])
cw := &xdsClientWrapper{
- load: &loadStoreWrapper{store: loadStore, service: testClusterNames[0]},
+ load: lsWrapper,
}
cc := testutils.NewTestClientConn(t)
@@ -723,12 +725,14 @@
// We expect the 10 picks to be split between the localities since they are
// of equal weight. And since we only mark the picks routed to sc2 as done,
// the picks on sc1 should show up as inProgress.
- wantStoreData := &load.Data{
+ wantStoreData := []*load.Data{{
+ Cluster: testClusterNames[0],
+ Service: "",
LocalityStats: map[string]load.LocalityData{
locality1.String(): {RequestStats: load.RequestData{InProgress: 5}},
locality2.String(): {RequestStats: load.RequestData{Succeeded: 5}},
},
- }
+ }}
for i := 0; i < 10; i++ {
scst, _ := p1.Pick(balancer.PickInfo{})
if scst.Done != nil && scst.SubConn != sc1 {
@@ -736,8 +740,8 @@
}
}
- gotStoreData := loadStore.PerCluster(testClusterNames[0], "").Stats()
- if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty()); diff != "" {
- t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
+ gotStoreData := loadStore.Stats(testClusterNames[0:1])
+ if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(load.Data{}, "ReportInterval")); diff != "" {
+ t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
}
}
diff --git a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
index 99d986f..f22c162 100644
--- a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
+++ b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
@@ -48,40 +48,45 @@
)
type loadStoreWrapper struct {
- mu sync.RWMutex
- store *load.Store
- service string
+ mu sync.RWMutex
+ store *load.Store
+ service string
+ perCluster load.PerClusterReporter
}
func (lsw *loadStoreWrapper) update(store *load.Store, service string) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
+ if store == lsw.store && service == lsw.service {
+ return
+ }
lsw.store = store
lsw.service = service
+ lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
}
func (lsw *loadStoreWrapper) CallStarted(locality string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
- lsw.store.PerCluster(lsw.service, "").CallStarted(locality)
+ lsw.perCluster.CallStarted(locality)
}
func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
- lsw.store.PerCluster(lsw.service, "").CallFinished(locality, err)
+ lsw.perCluster.CallFinished(locality, err)
}
func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
- lsw.store.PerCluster(lsw.service, "").CallServerLoad(locality, name, val)
+ lsw.perCluster.CallServerLoad(locality, name, val)
}
func (lsw *loadStoreWrapper) CallDropped(category string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
- lsw.store.PerCluster(lsw.service, "").CallDropped(category)
+ lsw.perCluster.CallDropped(category)
}
// xdsclientWrapper is responsible for getting the xds client from attributes or
diff --git a/xds/internal/balancer/lrs/balancer.go b/xds/internal/balancer/lrs/balancer.go
index b5bf300..1361fb1 100644
--- a/xds/internal/balancer/lrs/balancer.go
+++ b/xds/internal/balancer/lrs/balancer.go
@@ -154,38 +154,43 @@
store *load.Store
cluster string
edsService string
+ perCluster load.PerClusterReporter
}
func (lsw *loadStoreWrapper) update(store *load.Store, cluster, edsService string) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
+ if store == lsw.store && cluster == lsw.cluster && edsService == lsw.edsService {
+ return
+ }
lsw.store = store
lsw.cluster = cluster
lsw.edsService = edsService
+ lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
}
func (lsw *loadStoreWrapper) CallStarted(locality string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
- lsw.store.PerCluster(lsw.cluster, lsw.edsService).CallStarted(locality)
+ lsw.perCluster.CallStarted(locality)
}
func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
- lsw.store.PerCluster(lsw.cluster, lsw.edsService).CallFinished(locality, err)
+ lsw.perCluster.CallFinished(locality, err)
}
func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
- lsw.store.PerCluster(lsw.cluster, lsw.edsService).CallServerLoad(locality, name, val)
+ lsw.perCluster.CallServerLoad(locality, name, val)
}
func (lsw *loadStoreWrapper) CallDropped(category string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
- lsw.store.PerCluster(lsw.cluster, lsw.edsService).CallDropped(category)
+ lsw.perCluster.CallDropped(category)
}
type xdsClientWrapper struct {
diff --git a/xds/internal/balancer/lrs/balancer_test.go b/xds/internal/balancer/lrs/balancer_test.go
index 386b143..789cfea 100644
--- a/xds/internal/balancer/lrs/balancer_test.go
+++ b/xds/internal/balancer/lrs/balancer_test.go
@@ -116,7 +116,14 @@
if loadStore == nil {
t.Fatal("loadStore is nil in xdsClient")
}
- sd := loadStore.PerCluster(testClusterName, testServiceName).Stats()
+ sds := loadStore.Stats([]string{testClusterName})
+ if len(sds) == 0 {
+ t.Fatalf("loads for cluster %v not found in store", testClusterName)
+ }
+ sd := sds[0]
+ if sd.Cluster != testClusterName || sd.Service != testServiceName {
+ t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName)
+ }
localityData, ok := sd.LocalityStats[testLocality.String()]
if !ok {
t.Fatalf("loads for %v not found in store", testLocality)
diff --git a/xds/internal/client/load/store.go b/xds/internal/client/load/store.go
index c0c741f..a6ec1ec 100644
--- a/xds/internal/client/load/store.go
+++ b/xds/internal/client/load/store.go
@@ -20,63 +20,112 @@
import (
"sync"
"sync/atomic"
+ "time"
)
const negativeOneUInt64 = ^uint64(0)
-// A pair of cluster and service name. The same cluster can be used by multiple
-// services, and one service can use multiple clusters. So we need a pair with
-// both name to accurately indicate where the load belongs.
-type storeKey struct {
- cluster string
- service string
-}
-
// Store keeps the loads for multiple clusters and services to be reported via
-// LRS. It is safe for concurrent use.
+// LRS. It contains loads to be reported to one LRS server. Create multiple
+// stores for multiple servers.
+//
+// It is safe for concurrent use.
type Store struct {
- mu sync.RWMutex
- clusters map[storeKey]*PerClusterStore
+ // mu only protects the two-layer map. Reads and writes to a *perClusterStore
+ // do not need to hold mu.
+ mu sync.Mutex
+ // clusters is a map with cluster name as the key. The second layer is a map
+ // with service name as the key. Each value (perClusterStore) contains data
+ // for a (cluster, service) pair.
+ //
+ // Note that entries are added to this map but never removed, which is
+ // potentially a memory leak. However, memory is only allocated for each new
+ // (cluster, service) pair, and each entry is just pointers and maps, so this
+ // shouldn't grow too large.
+ clusters map[string]map[string]*perClusterStore
}
// NewStore creates a Store.
func NewStore() *Store {
return &Store{
- clusters: make(map[storeKey]*PerClusterStore),
+ clusters: make(map[string]map[string]*perClusterStore),
}
}
-// PerCluster returns the PerClusterStore for the given clusterName +
+// Stats returns the load data for the given cluster names. Data is returned in
+// a slice with no specific order.
+//
+// If no clusterName is given (an empty slice), all data for all known clusters
+// is returned.
+//
+// If a cluster's Data is empty (no load to report), it's not appended to the
+// returned slice.
+func (s *Store) Stats(clusterNames []string) []*Data {
+ var ret []*Data
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if len(clusterNames) == 0 {
+ for _, c := range s.clusters {
+ ret = appendClusterStats(ret, c)
+ }
+ return ret
+ }
+
+ for _, n := range clusterNames {
+ if c, ok := s.clusters[n]; ok {
+ ret = appendClusterStats(ret, c)
+ }
+ }
+ return ret
+}
+
+// appendClusterStats appends the Data for the given cluster to ret and returns
+// the new slice.
+//
+// Data is only appended to ret if it's not empty.
+func appendClusterStats(ret []*Data, cluster map[string]*perClusterStore) []*Data {
+ for _, d := range cluster {
+ data := d.stats()
+ if data == nil {
+ // Skip this data if it doesn't contain any information.
+ continue
+ }
+ ret = append(ret, data)
+ }
+ return ret
+}
+
+// PerCluster returns the PerClusterReporter for the given clusterName +
// serviceName.
-func (ls *Store) PerCluster(clusterName, serviceName string) *PerClusterStore {
- if ls == nil {
+func (s *Store) PerCluster(clusterName, serviceName string) PerClusterReporter {
+ if s == nil {
return nil
}
- k := storeKey{
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ c, ok := s.clusters[clusterName]
+ if !ok {
+ c = make(map[string]*perClusterStore)
+ s.clusters[clusterName] = c
+ }
+
+ if p, ok := c[serviceName]; ok {
+ return p
+ }
+ p := &perClusterStore{
cluster: clusterName,
service: serviceName,
}
-
- ls.mu.RLock()
- if p, ok := ls.clusters[k]; ok {
- ls.mu.RUnlock()
- return p
- }
- ls.mu.RUnlock()
-
- ls.mu.Lock()
- defer ls.mu.Unlock()
- p, ok := ls.clusters[k]
- if !ok {
- p = &PerClusterStore{}
- ls.clusters[k] = p
- }
+ c[serviceName] = p
return p
}
-// PerClusterStore is a repository for LB policy implementations to report store
-// load data. It is safe for concurrent use.
+// perClusterStore is a repository for LB policy implementations to report
+// load data. It contains load for a (cluster, edsService) pair.
+//
+// It is safe for concurrent use.
//
// TODO(easwars): Use regular maps with mutexes instead of sync.Map here. The
// latter is optimized for two common use cases: (1) when the entry for a given
@@ -87,16 +136,20 @@
// RWMutex.
// Neither of these conditions are met here, and we should transition to a
// regular map with a mutex for better type safety.
-type PerClusterStore struct {
+type perClusterStore struct {
+ cluster, service string
drops sync.Map // map[string]*uint64
localityRPCCount sync.Map // map[string]*rpcCountData
+
+ mu sync.Mutex
+ lastLoadReportAt time.Time
}
// Update functions are called by picker for each RPC. To avoid contention, all
// updates are done atomically.
// CallDropped adds one drop record with the given category to store.
-func (ls *PerClusterStore) CallDropped(category string) {
+func (ls *perClusterStore) CallDropped(category string) {
if ls == nil {
return
}
@@ -110,7 +163,7 @@
}
// CallStarted adds one call started record for the given locality.
-func (ls *PerClusterStore) CallStarted(locality string) {
+func (ls *perClusterStore) CallStarted(locality string) {
if ls == nil {
return
}
@@ -125,7 +178,7 @@
// CallFinished adds one call finished record for the given locality.
// For successful calls, err needs to be nil.
-func (ls *PerClusterStore) CallFinished(locality string, err error) {
+func (ls *perClusterStore) CallFinished(locality string, err error) {
if ls == nil {
return
}
@@ -146,7 +199,7 @@
// CallServerLoad adds one server load record for the given locality. The
// load type is specified by desc, and its value by val.
-func (ls *PerClusterStore) CallServerLoad(locality, name string, d float64) {
+func (ls *perClusterStore) CallServerLoad(locality, name string, d float64) {
if ls == nil {
return
}
@@ -154,21 +207,28 @@
p, ok := ls.localityRPCCount.Load(locality)
if !ok {
// The map is never cleared, only values in the map are reset. So the
// case where entry for CallServerLoad is not found should never happen.
return
}
p.(*rpcCountData).addServerLoad(name, d)
}
// Data contains all load data reported to the Store since the most recent call
-// to Stats().
+// to stats().
type Data struct {
+ // Cluster is the name of the cluster this data is for.
+ Cluster string
+ // Service is the name of the EDS service this data is for.
+ Service string
// TotalDrops is the total number of dropped requests.
TotalDrops uint64
// Drops is the number of dropped requests per category.
Drops map[string]uint64
// LocalityStats contains load reports per locality.
LocalityStats map[string]LocalityData
+ // ReportInterval is the duration since the last time load was reported
+ // (i.e. since stats() was last called).
+ ReportInterval time.Duration
}
// LocalityData contains load data for a single locality.
@@ -198,21 +258,25 @@
Sum float64
}
-func newStoreData() *Data {
+func newData(cluster, service string) *Data {
return &Data{
+ Cluster: cluster,
+ Service: service,
Drops: make(map[string]uint64),
LocalityStats: make(map[string]LocalityData),
}
}
-// Stats returns and resets all loads reported to the store, except inProgress
+// stats returns and resets all loads reported to the store, except inProgress
// rpc counts.
-func (ls *PerClusterStore) Stats() *Data {
+//
+// It returns nil if the store doesn't contain any (new) data.
+func (ls *perClusterStore) stats() *Data {
if ls == nil {
return nil
}
- sd := newStoreData()
+ sd := newData(ls.cluster, ls.service)
ls.drops.Range(func(key, val interface{}) bool {
d := atomic.SwapUint64(val.(*uint64), 0)
if d == 0 {
@@ -253,6 +317,15 @@
sd.LocalityStats[key.(string)] = ld
return true
})
+
+ ls.mu.Lock()
+ sd.ReportInterval = time.Since(ls.lastLoadReportAt)
+ ls.lastLoadReportAt = time.Now()
+ ls.mu.Unlock()
+
+ if sd.TotalDrops == 0 && len(sd.Drops) == 0 && len(sd.LocalityStats) == 0 {
+ return nil
+ }
return sd
}
diff --git a/xds/internal/client/load/store_test.go b/xds/internal/client/load/store_test.go
index df9aed7..4d62ebc 100644
--- a/xds/internal/client/load/store_test.go
+++ b/xds/internal/client/load/store_test.go
@@ -19,6 +19,7 @@
import (
"fmt"
+ "sort"
"sync"
"testing"
@@ -56,7 +57,7 @@
}
)
- ls := PerClusterStore{}
+ ls := perClusterStore{}
var wg sync.WaitGroup
for category, count := range drops {
for i := 0; i < count; i++ {
@@ -69,9 +70,9 @@
}
wg.Wait()
- gotStoreData := ls.Stats()
- if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty()); diff != "" {
- t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
+ gotStoreData := ls.stats()
+ if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" {
+ t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
}
}
@@ -118,7 +119,7 @@
}
)
- ls := PerClusterStore{}
+ ls := perClusterStore{}
var wg sync.WaitGroup
for locality, data := range localityData {
wg.Add(data.start)
@@ -128,7 +129,7 @@
wg.Done()
}(locality)
}
// The calls to CallStarted() need to happen before the other calls are
// made. Hence the wait here.
wg.Wait()
@@ -152,9 +153,9 @@
wg.Wait()
}
- gotStoreData := ls.Stats()
- if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty()); diff != "" {
- t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
+ gotStoreData := ls.stats()
+ if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" {
+ t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
}
}
@@ -211,7 +212,7 @@
}
)
- reportLoad := func(ls *PerClusterStore) {
+ reportLoad := func(ls *perClusterStore) {
for category, count := range drops {
for i := 0; i < count; i++ {
ls.CallDropped(category)
@@ -233,14 +234,14 @@
}
}
- ls := PerClusterStore{}
+ ls := perClusterStore{}
reportLoad(&ls)
- gotStoreData := ls.Stats()
- if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty()); diff != "" {
- t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
+ gotStoreData := ls.stats()
+ if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" {
+ t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
}
- // The above call to Stats() should have reset all load reports except the
+ // The above call to stats() should have reset all load reports except the
// inProgress rpc count. We are now going to push the same load data into
// the store. So, we should expect to see twice the count for inProgress.
for _, l := range localities {
@@ -249,8 +250,196 @@
wantStoreData.LocalityStats[l] = ls
}
reportLoad(&ls)
- gotStoreData = ls.Stats()
- if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty()); diff != "" {
- t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
+ gotStoreData = ls.stats()
+ if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" {
+ t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
+ }
+}
+
+var sortDataSlice = cmp.Transformer("SortDataSlice", func(in []*Data) []*Data {
+ out := append([]*Data(nil), in...) // Copy input to avoid mutating it
+ sort.Slice(out,
+ func(i, j int) bool {
+ if out[i].Cluster < out[j].Cluster {
+ return true
+ }
+ if out[i].Cluster == out[j].Cluster {
+ return out[i].Service < out[j].Service
+ }
+ return false
+ },
+ )
+ return out
+})
+
+// Test that all loads are returned for the given clusters, and that all
+// clusters are reported if no cluster is specified.
+func TestStoreStats(t *testing.T) {
+ var (
+ testClusters = []string{"c0", "c1", "c2"}
+ testServices = []string{"s0", "s1"}
+ testLocality = "test-locality"
+ )
+
+ store := NewStore()
+ for _, c := range testClusters {
+ for _, s := range testServices {
+ store.PerCluster(c, s).CallStarted(testLocality)
+ store.PerCluster(c, s).CallServerLoad(testLocality, "abc", 123)
+ store.PerCluster(c, s).CallDropped("dropped")
+ store.PerCluster(c, s).CallFinished(testLocality, nil)
+ }
+ }
+
+ wantC0 := []*Data{
+ {
+ Cluster: "c0", Service: "s0",
+ TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
+ LocalityStats: map[string]LocalityData{
+ "test-locality": {
+ RequestStats: RequestData{Succeeded: 1},
+ LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
+ },
+ },
+ },
+ {
+ Cluster: "c0", Service: "s1",
+ TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
+ LocalityStats: map[string]LocalityData{
+ "test-locality": {
+ RequestStats: RequestData{Succeeded: 1},
+ LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
+ },
+ },
+ },
+ }
+ // Call Stats with just "c0"; this should return data for "c0" only, and not
+ // touch the data for the other clusters.
+ gotC0 := store.Stats([]string{"c0"})
+ if diff := cmp.Diff(wantC0, gotC0, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" {
+ t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
+ }
+
+ wantOther := []*Data{
+ {
+ Cluster: "c1", Service: "s0",
+ TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
+ LocalityStats: map[string]LocalityData{
+ "test-locality": {
+ RequestStats: RequestData{Succeeded: 1},
+ LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
+ },
+ },
+ },
+ {
+ Cluster: "c1", Service: "s1",
+ TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
+ LocalityStats: map[string]LocalityData{
+ "test-locality": {
+ RequestStats: RequestData{Succeeded: 1},
+ LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
+ },
+ },
+ },
+ {
+ Cluster: "c2", Service: "s0",
+ TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
+ LocalityStats: map[string]LocalityData{
+ "test-locality": {
+ RequestStats: RequestData{Succeeded: 1},
+ LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
+ },
+ },
+ },
+ {
+ Cluster: "c2", Service: "s1",
+ TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
+ LocalityStats: map[string]LocalityData{
+ "test-locality": {
+ RequestStats: RequestData{Succeeded: 1},
+ LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
+ },
+ },
+ },
+ }
+ // Call Stats with an empty slice; this should return data for all the
+ // remaining clusters, and not include c0 (because c0's data was cleared).
+ gotOther := store.Stats(nil)
+ if diff := cmp.Diff(wantOther, gotOther, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" {
+ t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
+ }
+}
+
+// Test that if a cluster has no load to report, its data is not appended to
+// the slice returned by Stats().
+func TestStoreStatsEmptyDataNotReported(t *testing.T) {
+ var (
+ testServices = []string{"s0", "s1"}
+ testLocality = "test-locality"
+ )
+
+ store := NewStore()
+ // "c0"'s RPCs all finish with success.
+ for _, s := range testServices {
+ store.PerCluster("c0", s).CallStarted(testLocality)
+ store.PerCluster("c0", s).CallFinished(testLocality, nil)
+ }
+ // "c1"'s RPCs never finish (always inprocess).
+ for _, s := range testServices {
+ store.PerCluster("c1", s).CallStarted(testLocality)
+ }
+
+ want0 := []*Data{
+ {
+ Cluster: "c0", Service: "s0",
+ LocalityStats: map[string]LocalityData{
+ "test-locality": {RequestStats: RequestData{Succeeded: 1}},
+ },
+ },
+ {
+ Cluster: "c0", Service: "s1",
+ LocalityStats: map[string]LocalityData{
+ "test-locality": {RequestStats: RequestData{Succeeded: 1}},
+ },
+ },
+ {
+ Cluster: "c1", Service: "s0",
+ LocalityStats: map[string]LocalityData{
+ "test-locality": {RequestStats: RequestData{InProgress: 1}},
+ },
+ },
+ {
+ Cluster: "c1", Service: "s1",
+ LocalityStats: map[string]LocalityData{
+ "test-locality": {RequestStats: RequestData{InProgress: 1}},
+ },
+ },
+ }
+ // Call Stats with an empty slice; this should return data for all the
+ // clusters.
+ got0 := store.Stats(nil)
+ if diff := cmp.Diff(want0, got0, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" {
+ t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
+ }
+
+ want1 := []*Data{
+ {
+ Cluster: "c1", Service: "s0",
+ LocalityStats: map[string]LocalityData{
+ "test-locality": {RequestStats: RequestData{InProgress: 1}},
+ },
+ },
+ {
+ Cluster: "c1", Service: "s1",
+ LocalityStats: map[string]LocalityData{
+ "test-locality": {RequestStats: RequestData{InProgress: 1}},
+ },
+ },
+ }
+ // Call Stats with an empty slice again; this should return data only for
+ // "c1", because "c0"'s data was cleared while "c1" still has in-progress RPCs.
+ got1 := store.Stats(nil)
+ if diff := cmp.Diff(want1, got1, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" {
+ t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
}
}
diff --git a/xds/internal/client/v2/client.go b/xds/internal/client/v2/client.go
index f838b5f..c3de39c 100644
--- a/xds/internal/client/v2/client.go
+++ b/xds/internal/client/v2/client.go
@@ -23,7 +23,6 @@
"context"
"fmt"
"sync"
- "time"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
@@ -103,8 +102,6 @@
// processing needs this to do the host matching.
ldsResourceName string
ldsWatchCount int
-
- lastLoadReportAt time.Time
}
// AddWatch overrides the transport helper's AddWatch to save the LDS
diff --git a/xds/internal/client/v2/loadreport.go b/xds/internal/client/v2/loadreport.go
index 899cb1b..a06dcb8 100644
--- a/xds/internal/client/v2/loadreport.go
+++ b/xds/internal/client/v2/loadreport.go
@@ -116,58 +116,58 @@
return errors.New("lrs: LoadStore is not initialized")
}
- var (
- droppedReqs []*v2endpointpb.ClusterStats_DroppedRequests
- localityStats []*v2endpointpb.UpstreamLocalityStats
- )
-
- sd := v2c.loadStore.PerCluster(clusterName, "").Stats()
- for category, count := range sd.Drops {
- droppedReqs = append(droppedReqs, &v2endpointpb.ClusterStats_DroppedRequests{
- Category: category,
- DroppedCount: count,
- })
- }
- for l, localityData := range sd.LocalityStats {
- lid, err := internal.LocalityIDFromString(l)
- if err != nil {
- return err
- }
- var loadMetricStats []*v2endpointpb.EndpointLoadMetricStats
- for name, loadData := range localityData.LoadStats {
- loadMetricStats = append(loadMetricStats, &v2endpointpb.EndpointLoadMetricStats{
- MetricName: name,
- NumRequestsFinishedWithMetric: loadData.Count,
- TotalMetricValue: loadData.Sum,
+ var clusterStats []*v2endpointpb.ClusterStats
+ sds := v2c.loadStore.Stats([]string{clusterName})
+ for _, sd := range sds {
+ var (
+ droppedReqs []*v2endpointpb.ClusterStats_DroppedRequests
+ localityStats []*v2endpointpb.UpstreamLocalityStats
+ )
+ for category, count := range sd.Drops {
+ droppedReqs = append(droppedReqs, &v2endpointpb.ClusterStats_DroppedRequests{
+ Category: category,
+ DroppedCount: count,
})
}
- localityStats = append(localityStats, &v2endpointpb.UpstreamLocalityStats{
- Locality: &v2corepb.Locality{
- Region: lid.Region,
- Zone: lid.Zone,
- SubZone: lid.SubZone,
- },
- TotalSuccessfulRequests: localityData.RequestStats.Succeeded,
- TotalRequestsInProgress: localityData.RequestStats.InProgress,
- TotalErrorRequests: localityData.RequestStats.Errored,
- LoadMetricStats: loadMetricStats,
- UpstreamEndpointStats: nil, // TODO: populate for per endpoint loads.
- })
- }
+ for l, localityData := range sd.LocalityStats {
+ lid, err := internal.LocalityIDFromString(l)
+ if err != nil {
+ return err
+ }
+ var loadMetricStats []*v2endpointpb.EndpointLoadMetricStats
+ for name, loadData := range localityData.LoadStats {
+ loadMetricStats = append(loadMetricStats, &v2endpointpb.EndpointLoadMetricStats{
+ MetricName: name,
+ NumRequestsFinishedWithMetric: loadData.Count,
+ TotalMetricValue: loadData.Sum,
+ })
+ }
+ localityStats = append(localityStats, &v2endpointpb.UpstreamLocalityStats{
+ Locality: &v2corepb.Locality{
+ Region: lid.Region,
+ Zone: lid.Zone,
+ SubZone: lid.SubZone,
+ },
+ TotalSuccessfulRequests: localityData.RequestStats.Succeeded,
+ TotalRequestsInProgress: localityData.RequestStats.InProgress,
+ TotalErrorRequests: localityData.RequestStats.Errored,
+ LoadMetricStats: loadMetricStats,
+ UpstreamEndpointStats: nil, // TODO: populate for per endpoint loads.
+ })
+ }
- dur := time.Since(v2c.lastLoadReportAt)
- v2c.lastLoadReportAt = time.Now()
-
- cs := []*v2endpointpb.ClusterStats{
- {
- ClusterName: clusterName,
+ clusterStats = append(clusterStats, &v2endpointpb.ClusterStats{
+ ClusterName: sd.Cluster,
+ ClusterServiceName: sd.Service,
UpstreamLocalityStats: localityStats,
TotalDroppedRequests: sd.TotalDrops,
DroppedRequests: droppedReqs,
- LoadReportInterval: ptypes.DurationProto(dur),
- },
+ LoadReportInterval: ptypes.DurationProto(sd.ReportInterval),
+ })
+
}
- req := &lrspb.LoadStatsRequest{ClusterStats: cs}
+
+ req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats}
v2c.logger.Infof("lrs: sending LRS loads: %+v", req)
return stream.Send(req)
}
diff --git a/xds/internal/client/v3/client.go b/xds/internal/client/v3/client.go
index ba90861..9894280 100644
--- a/xds/internal/client/v3/client.go
+++ b/xds/internal/client/v3/client.go
@@ -23,7 +23,6 @@
"context"
"fmt"
"sync"
- "time"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
@@ -103,8 +102,6 @@
// processing needs this to do the host matching.
ldsResourceName string
ldsWatchCount int
-
- lastLoadReportAt time.Time
}
// AddWatch overrides the transport helper's AddWatch to save the LDS
diff --git a/xds/internal/client/v3/loadreport.go b/xds/internal/client/v3/loadreport.go
index 1997277..beca34c 100644
--- a/xds/internal/client/v3/loadreport.go
+++ b/xds/internal/client/v3/loadreport.go
@@ -116,58 +116,57 @@
return errors.New("lrs: LoadStore is not initialized")
}
- var (
- droppedReqs []*v3endpointpb.ClusterStats_DroppedRequests
- localityStats []*v3endpointpb.UpstreamLocalityStats
- )
-
- sd := v3c.loadStore.PerCluster(clusterName, "").Stats()
- for category, count := range sd.Drops {
- droppedReqs = append(droppedReqs, &v3endpointpb.ClusterStats_DroppedRequests{
- Category: category,
- DroppedCount: count,
- })
- }
- for l, localityData := range sd.LocalityStats {
- lid, err := internal.LocalityIDFromString(l)
- if err != nil {
- return err
- }
- var loadMetricStats []*v3endpointpb.EndpointLoadMetricStats
- for name, loadData := range localityData.LoadStats {
- loadMetricStats = append(loadMetricStats, &v3endpointpb.EndpointLoadMetricStats{
- MetricName: name,
- NumRequestsFinishedWithMetric: loadData.Count,
- TotalMetricValue: loadData.Sum,
+ var clusterStats []*v3endpointpb.ClusterStats
+ sds := v3c.loadStore.Stats([]string{clusterName})
+ for _, sd := range sds {
+ var (
+ droppedReqs []*v3endpointpb.ClusterStats_DroppedRequests
+ localityStats []*v3endpointpb.UpstreamLocalityStats
+ )
+ for category, count := range sd.Drops {
+ droppedReqs = append(droppedReqs, &v3endpointpb.ClusterStats_DroppedRequests{
+ Category: category,
+ DroppedCount: count,
})
}
- localityStats = append(localityStats, &v3endpointpb.UpstreamLocalityStats{
- Locality: &v3corepb.Locality{
- Region: lid.Region,
- Zone: lid.Zone,
- SubZone: lid.SubZone,
- },
- TotalSuccessfulRequests: localityData.RequestStats.Succeeded,
- TotalRequestsInProgress: localityData.RequestStats.InProgress,
- TotalErrorRequests: localityData.RequestStats.Errored,
- LoadMetricStats: loadMetricStats,
- UpstreamEndpointStats: nil, // TODO: populate for per endpoint loads.
- })
- }
+ for l, localityData := range sd.LocalityStats {
+ lid, err := internal.LocalityIDFromString(l)
+ if err != nil {
+ return err
+ }
+ var loadMetricStats []*v3endpointpb.EndpointLoadMetricStats
+ for name, loadData := range localityData.LoadStats {
+ loadMetricStats = append(loadMetricStats, &v3endpointpb.EndpointLoadMetricStats{
+ MetricName: name,
+ NumRequestsFinishedWithMetric: loadData.Count,
+ TotalMetricValue: loadData.Sum,
+ })
+ }
+ localityStats = append(localityStats, &v3endpointpb.UpstreamLocalityStats{
+ Locality: &v3corepb.Locality{
+ Region: lid.Region,
+ Zone: lid.Zone,
+ SubZone: lid.SubZone,
+ },
+ TotalSuccessfulRequests: localityData.RequestStats.Succeeded,
+ TotalRequestsInProgress: localityData.RequestStats.InProgress,
+ TotalErrorRequests: localityData.RequestStats.Errored,
+ LoadMetricStats: loadMetricStats,
+ UpstreamEndpointStats: nil, // TODO: populate for per endpoint loads.
+ })
+ }
- dur := time.Since(v3c.lastLoadReportAt)
- v3c.lastLoadReportAt = time.Now()
-
- cs := []*v3endpointpb.ClusterStats{
- {
- ClusterName: clusterName,
+ clusterStats = append(clusterStats, &v3endpointpb.ClusterStats{
+ ClusterName: sd.Cluster,
+ ClusterServiceName: sd.Service,
UpstreamLocalityStats: localityStats,
TotalDroppedRequests: sd.TotalDrops,
DroppedRequests: droppedReqs,
- LoadReportInterval: ptypes.DurationProto(dur),
- },
+ LoadReportInterval: ptypes.DurationProto(sd.ReportInterval),
+ })
}
- req := &lrspb.LoadStatsRequest{ClusterStats: cs}
+
+ req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats}
v3c.logger.Infof("lrs: sending LRS loads: %+v", req)
return stream.Send(req)
}