| /* |
| * Copyright 2020 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| // Package load provides functionality to record and maintain load data. |
| package load |
| |
| import ( |
| "sync" |
| "sync/atomic" |
| "time" |
| ) |
| |
// negativeOneUInt64 is the two's complement representation of -1 as a uint64.
// Adding it to a uint64 with atomic.AddUint64 wraps around, decrementing the
// value by one.
const negativeOneUInt64 = ^uint64(0)
| |
// Store keeps the loads for multiple clusters and services to be reported via
// LRS. It contains the loads to be reported to a single LRS server. Create one
// Store per LRS server.
| // |
| // It is safe for concurrent use. |
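//
// A minimal usage sketch (the cluster, service, and drop category names here
// are just placeholders):
//
//	store := load.NewStore()
//	// Record a dropped request for a (cluster, service) pair.
//	store.PerCluster("cluster-a", "eds-service-a").CallDropped("rate_limited")
//	// Later, collect everything accumulated since the last report.
//	reports := store.Stats(nil)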
| type Store struct { |
	// mu only protects the two layers of maps below. Reads and writes to a
	// *perClusterStore do not need to hold mu.
| mu sync.Mutex |
| // clusters is a map with cluster name as the key. The second layer is a map |
| // with service name as the key. Each value (perClusterStore) contains data |
| // for a (cluster, service) pair. |
| // |
	// Note that entries are added to this map but never removed, which is
	// technically a memory leak. However, only a small amount of memory
	// (pointers and map headers) is allocated per new (cluster, service) pair,
	// so the impact should remain small.
| clusters map[string]map[string]*perClusterStore |
| } |
| |
| // NewStore creates a Store. |
| func NewStore() *Store { |
| return &Store{ |
| clusters: make(map[string]map[string]*perClusterStore), |
| } |
| } |
| |
| // Stats returns the load data for the given cluster names. Data is returned in |
| // a slice with no specific order. |
| // |
// If clusterNames is empty (nil or a zero-length slice), data for all known
// clusters is returned.
| // |
| // If a cluster's Data is empty (no load to report), it's not appended to the |
| // returned slice. |
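//
// A small sketch of both modes, assuming store is a *Store created via
// NewStore (cluster names are placeholders):
//
//	// Data for two specific clusters.
//	some := store.Stats([]string{"cluster-a", "cluster-b"})
//	// Data for every cluster known to the store.
//	all := store.Stats(nil)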
| func (s *Store) Stats(clusterNames []string) []*Data { |
| var ret []*Data |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| |
| if len(clusterNames) == 0 { |
| for _, c := range s.clusters { |
| ret = appendClusterStats(ret, c) |
| } |
| return ret |
| } |
| |
| for _, n := range clusterNames { |
| if c, ok := s.clusters[n]; ok { |
| ret = appendClusterStats(ret, c) |
| } |
| } |
| return ret |
| } |
| |
// appendClusterStats retrieves Data for the given cluster, appends it to ret,
// and returns the new slice.
| // |
| // Data is only appended to ret if it's not empty. |
| func appendClusterStats(ret []*Data, cluster map[string]*perClusterStore) []*Data { |
| for _, d := range cluster { |
| data := d.stats() |
| if data == nil { |
| // Skip this data if it doesn't contain any information. |
| continue |
| } |
| ret = append(ret, data) |
| } |
| return ret |
| } |
| |
// PerCluster returns the PerClusterReporter for the given clusterName +
// serviceName, creating one if it doesn't already exist. Repeated calls with
// the same names return the same reporter.
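//
// A minimal sketch, assuming store is a *Store (the names are placeholders):
//
//	reporter := store.PerCluster("cluster-a", "eds-service-a")
//	reporter.CallStarted("locality-a")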
| func (s *Store) PerCluster(clusterName, serviceName string) PerClusterReporter { |
| if s == nil { |
| return nil |
| } |
| |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| c, ok := s.clusters[clusterName] |
| if !ok { |
| c = make(map[string]*perClusterStore) |
| s.clusters[clusterName] = c |
| } |
| |
| if p, ok := c[serviceName]; ok { |
| return p |
| } |
| p := &perClusterStore{ |
| cluster: clusterName, |
| service: serviceName, |
| } |
| c[serviceName] = p |
| return p |
| } |
| |
// perClusterStore is a repository of load data reported by LB policy
// implementations. It contains load for a (cluster, edsService) pair.
| // |
| // It is safe for concurrent use. |
| // |
| // TODO(easwars): Use regular maps with mutexes instead of sync.Map here. The |
| // latter is optimized for two common use cases: (1) when the entry for a given |
| // key is only ever written once but read many times, as in caches that only |
| // grow, or (2) when multiple goroutines read, write, and overwrite entries for |
| // disjoint sets of keys. In these two cases, use of a Map may significantly |
| // reduce lock contention compared to a Go map paired with a separate Mutex or |
| // RWMutex. |
// Neither of these conditions is met here, so we should transition to a
// regular map with a mutex for better type safety.
| type perClusterStore struct { |
| cluster, service string |
| drops sync.Map // map[string]*uint64 |
| localityRPCCount sync.Map // map[string]*rpcCountData |
| |
| mu sync.Mutex |
| lastLoadReportAt time.Time |
| } |
| |
// Update functions are called by the picker for each RPC. To avoid contention,
// all updates are done atomically.
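//
// A sketch of how a picker might record load for a single RPC, assuming
// reporter is a PerClusterReporter obtained from Store.PerCluster (the
// locality name, load name, and error value are hypothetical):
//
//	reporter.CallStarted("locality-a")
//	// ... the RPC runs ...
//	reporter.CallServerLoad("locality-a", "cpu_utilization", 0.3)
//	reporter.CallFinished("locality-a", rpcErr) // rpcErr is nil on success.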
| |
| // CallDropped adds one drop record with the given category to store. |
| func (ls *perClusterStore) CallDropped(category string) { |
| if ls == nil { |
| return |
| } |
| |
| p, ok := ls.drops.Load(category) |
| if !ok { |
| tp := new(uint64) |
| p, _ = ls.drops.LoadOrStore(category, tp) |
| } |
| atomic.AddUint64(p.(*uint64), 1) |
| } |
| |
| // CallStarted adds one call started record for the given locality. |
| func (ls *perClusterStore) CallStarted(locality string) { |
| if ls == nil { |
| return |
| } |
| |
| p, ok := ls.localityRPCCount.Load(locality) |
| if !ok { |
| tp := newRPCCountData() |
| p, _ = ls.localityRPCCount.LoadOrStore(locality, tp) |
| } |
| p.(*rpcCountData).incrInProgress() |
| } |
| |
| // CallFinished adds one call finished record for the given locality. |
| // For successful calls, err needs to be nil. |
| func (ls *perClusterStore) CallFinished(locality string, err error) { |
| if ls == nil { |
| return |
| } |
| |
| p, ok := ls.localityRPCCount.Load(locality) |
| if !ok { |
		// The map is never cleared; only the values in it are reset. So the
		// entry for a locality should always exist when its call finishes, and
		// this case should never happen.
| return |
| } |
| p.(*rpcCountData).decrInProgress() |
| if err == nil { |
| p.(*rpcCountData).incrSucceeded() |
| } else { |
| p.(*rpcCountData).incrErrored() |
| } |
| } |
| |
// CallServerLoad adds one server load record for the given locality. The load
// type is specified by name, and its value by d.
| func (ls *perClusterStore) CallServerLoad(locality, name string, d float64) { |
| if ls == nil { |
| return |
| } |
| |
| p, ok := ls.localityRPCCount.Load(locality) |
| if !ok { |
		// The map is never cleared; only the values in it are reset. So the
		// entry for a locality should always exist when a server load is
		// reported, and this case should never happen.
| return |
| } |
| p.(*rpcCountData).addServerLoad(name, d) |
| } |
| |
| // Data contains all load data reported to the Store since the most recent call |
| // to stats(). |
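//
// A sketch of how a consumer might walk the returned data, assuming store is a
// *Store and using fmt purely for illustration:
//
//	for _, d := range store.Stats(nil) {
//		fmt.Printf("%s/%s: %d drops\n", d.Cluster, d.Service, d.TotalDrops)
//		for locality, ls := range d.LocalityStats {
//			fmt.Printf("  %s: %d in flight\n", locality, ls.RequestStats.InProgress)
//		}
//	}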
| type Data struct { |
| // Cluster is the name of the cluster this data is for. |
| Cluster string |
| // Service is the name of the EDS service this data is for. |
| Service string |
| // TotalDrops is the total number of dropped requests. |
| TotalDrops uint64 |
| // Drops is the number of dropped requests per category. |
| Drops map[string]uint64 |
| // LocalityStats contains load reports per locality. |
| LocalityStats map[string]LocalityData |
	// ReportInterval is the duration since the last time load was reported
	// (i.e. since stats() was last called).
| ReportInterval time.Duration |
| } |
| |
| // LocalityData contains load data for a single locality. |
| type LocalityData struct { |
| // RequestStats contains counts of requests made to the locality. |
| RequestStats RequestData |
| // LoadStats contains server load data for requests made to the locality, |
| // indexed by the load type. |
| LoadStats map[string]ServerLoadData |
| } |
| |
| // RequestData contains request counts. |
| type RequestData struct { |
| // Succeeded is the number of succeeded requests. |
| Succeeded uint64 |
| // Errored is the number of requests which ran into errors. |
| Errored uint64 |
| // InProgress is the number of requests in flight. |
| InProgress uint64 |
| } |
| |
| // ServerLoadData contains server load data. |
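// The average load value over the reporting interval can be computed as
// Sum / Count.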
| type ServerLoadData struct { |
| // Count is the number of load reports. |
| Count uint64 |
| // Sum is the total value of all load reports. |
| Sum float64 |
| } |
| |
| func newData(cluster, service string) *Data { |
| return &Data{ |
| Cluster: cluster, |
| Service: service, |
| Drops: make(map[string]uint64), |
| LocalityStats: make(map[string]LocalityData), |
| } |
| } |
| |
// stats returns and resets all loads reported to the store, except in-progress
// RPC counts, which are reported but not reset.
| // |
| // It returns nil if the store doesn't contain any (new) data. |
| func (ls *perClusterStore) stats() *Data { |
| if ls == nil { |
| return nil |
| } |
| |
| sd := newData(ls.cluster, ls.service) |
| ls.drops.Range(func(key, val interface{}) bool { |
| d := atomic.SwapUint64(val.(*uint64), 0) |
| if d == 0 { |
| return true |
| } |
| sd.TotalDrops += d |
| sd.Drops[key.(string)] = d |
| return true |
| }) |
| ls.localityRPCCount.Range(func(key, val interface{}) bool { |
| countData := val.(*rpcCountData) |
| succeeded := countData.loadAndClearSucceeded() |
| inProgress := countData.loadInProgress() |
| errored := countData.loadAndClearErrored() |
| if succeeded == 0 && inProgress == 0 && errored == 0 { |
| return true |
| } |
| |
| ld := LocalityData{ |
| RequestStats: RequestData{ |
| Succeeded: succeeded, |
| Errored: errored, |
| InProgress: inProgress, |
| }, |
| LoadStats: make(map[string]ServerLoadData), |
| } |
| countData.serverLoads.Range(func(key, val interface{}) bool { |
| sum, count := val.(*rpcLoadData).loadAndClear() |
| if count == 0 { |
| return true |
| } |
| ld.LoadStats[key.(string)] = ServerLoadData{ |
| Count: count, |
| Sum: sum, |
| } |
| return true |
| }) |
| sd.LocalityStats[key.(string)] = ld |
| return true |
| }) |
| |
| ls.mu.Lock() |
| sd.ReportInterval = time.Since(ls.lastLoadReportAt) |
| ls.lastLoadReportAt = time.Now() |
| ls.mu.Unlock() |
| |
| if sd.TotalDrops == 0 && len(sd.Drops) == 0 && len(sd.LocalityStats) == 0 { |
| return nil |
| } |
| return sd |
| } |
| |
| type rpcCountData struct { |
| // Only atomic accesses are allowed for the fields. |
| succeeded *uint64 |
| errored *uint64 |
| inProgress *uint64 |
| |
	// Map from load name to load data (sum+count). Loading an entry from the
	// map is atomic, but updating its data takes a lock, which could cause
	// contention when multiple RPCs try to report loads for the same name.
	//
	// To reduce this contention, the map could be sharded.
| serverLoads sync.Map // map[string]*rpcLoadData |
| } |
| |
| func newRPCCountData() *rpcCountData { |
| return &rpcCountData{ |
| succeeded: new(uint64), |
| errored: new(uint64), |
| inProgress: new(uint64), |
| } |
| } |
| |
| func (rcd *rpcCountData) incrSucceeded() { |
| atomic.AddUint64(rcd.succeeded, 1) |
| } |
| |
| func (rcd *rpcCountData) loadAndClearSucceeded() uint64 { |
| return atomic.SwapUint64(rcd.succeeded, 0) |
| } |
| |
| func (rcd *rpcCountData) incrErrored() { |
| atomic.AddUint64(rcd.errored, 1) |
| } |
| |
| func (rcd *rpcCountData) loadAndClearErrored() uint64 { |
| return atomic.SwapUint64(rcd.errored, 0) |
| } |
| |
| func (rcd *rpcCountData) incrInProgress() { |
| atomic.AddUint64(rcd.inProgress, 1) |
| } |
| |
| func (rcd *rpcCountData) decrInProgress() { |
	atomic.AddUint64(rcd.inProgress, negativeOneUInt64) // Equivalent to atomically adding -1 (unsigned wrap-around).
| } |
| |
| func (rcd *rpcCountData) loadInProgress() uint64 { |
	return atomic.LoadUint64(rcd.inProgress) // The in-progress count is not cleared when read.
| } |
| |
| func (rcd *rpcCountData) addServerLoad(name string, d float64) { |
| loads, ok := rcd.serverLoads.Load(name) |
| if !ok { |
| tl := newRPCLoadData() |
| loads, _ = rcd.serverLoads.LoadOrStore(name, tl) |
| } |
| loads.(*rpcLoadData).add(d) |
| } |
| |
// rpcLoadData holds data for server loads (reported in trailers or
// out-of-band). Fields in this struct must be updated consistently.
//
// The current solution is to hold a lock for every update, which could cause
// contention. To reduce it, the serverLoads map in rpcCountData could be
// sharded.
| type rpcLoadData struct { |
| mu sync.Mutex |
| sum float64 |
| count uint64 |
| } |
| |
| func newRPCLoadData() *rpcLoadData { |
| return &rpcLoadData{} |
| } |
| |
| func (rld *rpcLoadData) add(v float64) { |
| rld.mu.Lock() |
| rld.sum += v |
| rld.count++ |
| rld.mu.Unlock() |
| } |
| |
| func (rld *rpcLoadData) loadAndClear() (s float64, c uint64) { |
| rld.mu.Lock() |
| s = rld.sum |
| rld.sum = 0 |
| c = rld.count |
| rld.count = 0 |
| rld.mu.Unlock() |
| return |
| } |