/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package lrs implements the load reporting service for the xds balancer.
package lrs
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/golang/protobuf/ptypes"
structpb "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/xds/internal"
basepb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/base"
loadreportpb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/endpoint/load_report"
lrsgrpc "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/load_stats/v2/lrs"
lrspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/load_stats/v2/lrs"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
)
// negativeOneUInt64 is -1 cast to uint64; adding it with atomic.AddUint64
// decrements an unsigned counter by one.
const negativeOneUInt64 = ^uint64(0)
// Store defines the interface for a load store. It keeps loads and can report
// them to a server when requested.
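//
// A rough usage sketch (ctx, lrsServerConn, loc and rpcErr are hypothetical
// names, shown only for illustration): the balancer creates one Store for its
// service, streams reports from a background goroutine, and has its pickers
// record per-RPC events:
//
//	store := NewStore("foo.example.com")
//	go store.ReportTo(ctx, lrsServerConn) // lrsServerConn: *grpc.ClientConn to the LRS server.
//
//	// In the picker, for each RPC routed to locality loc:
//	store.CallStarted(loc)
//	// ... and when the RPC completes:
//	store.CallFinished(loc, rpcErr)
//	// Server-reported loads (e.g. from trailers) can also be recorded:
//	store.CallServerLoad(loc, "cpu_utilization", 0.3)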
type Store interface {
CallDropped(category string)
CallStarted(l internal.Locality)
CallFinished(l internal.Locality, err error)
CallServerLoad(l internal.Locality, name string, d float64)
ReportTo(ctx context.Context, cc *grpc.ClientConn)
}
type rpcCountData struct {
// Only atomic accesses are allowed for the fields.
succeeded *uint64
errored *uint64
inProgress *uint64
// Map from load name to load data (sum + count). Loading an entry from the
// map is cheap, but updating its data takes a lock, which could cause
// contention when multiple RPCs report loads for the same name.
//
// To reduce that contention, this map could be sharded.
serverLoads sync.Map // map[string]*rpcLoadData
}
func newRPCCountData() *rpcCountData {
return &rpcCountData{
succeeded: new(uint64),
errored: new(uint64),
inProgress: new(uint64),
}
}
func (rcd *rpcCountData) incrSucceeded() {
atomic.AddUint64(rcd.succeeded, 1)
}
func (rcd *rpcCountData) loadAndClearSucceeded() uint64 {
return atomic.SwapUint64(rcd.succeeded, 0)
}
func (rcd *rpcCountData) incrErrored() {
atomic.AddUint64(rcd.errored, 1)
}
func (rcd *rpcCountData) loadAndClearErrored() uint64 {
return atomic.SwapUint64(rcd.errored, 0)
}
func (rcd *rpcCountData) incrInProgress() {
atomic.AddUint64(rcd.inProgress, 1)
}
func (rcd *rpcCountData) decrInProgress() {
atomic.AddUint64(rcd.inProgress, negativeOneUInt64) // atomic.Add(x, -1)
}
func (rcd *rpcCountData) loadInProgress() uint64 {
return atomic.LoadUint64(rcd.inProgress) // The in-progress count is not cleared when read.
}
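// addServerLoad adds a single server load report for the metric with the
// given name. It first tries a plain Load for the common case where the
// metric already has an entry, and falls back to LoadOrStore to create the
// entry on first use.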
func (rcd *rpcCountData) addServerLoad(name string, d float64) {
loads, ok := rcd.serverLoads.Load(name)
if !ok {
tl := newRPCLoadData()
loads, _ = rcd.serverLoads.LoadOrStore(name, tl)
}
loads.(*rpcLoadData).add(d)
}
// rpcLoadData holds server load data for one metric (reported in trailers or
// out-of-band). Its fields must be updated consistently with each other.
//
// The current solution is to hold a lock, which could cause contention. To
// fix that, shard the serverLoads map in rpcCountData.
type rpcLoadData struct {
mu sync.Mutex
sum float64
count uint64
}
func newRPCLoadData() *rpcLoadData {
return &rpcLoadData{}
}
func (rld *rpcLoadData) add(v float64) {
rld.mu.Lock()
rld.sum += v
rld.count++
rld.mu.Unlock()
}
func (rld *rpcLoadData) loadAndClear() (s float64, c uint64) {
rld.mu.Lock()
s = rld.sum
rld.sum = 0
c = rld.count
rld.count = 0
rld.mu.Unlock()
return
}
// lrsStore collects loads from the xds balancer and periodically sends them
// to the LRS server.
type lrsStore struct {
node *basepb.Node
backoff backoff.Strategy
lastReported time.Time
drops sync.Map // map[string]*uint64
localityRPCCount sync.Map // map[internal.Locality]*rpcCountData
}
// NewStore creates a store for load reports.
func NewStore(serviceName string) Store {
return &lrsStore{
node: &basepb.Node{
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
internal.GrpcHostname: {
Kind: &structpb.Value_StringValue{StringValue: serviceName},
},
},
},
},
backoff: backoff.Exponential{
MaxDelay: 120 * time.Second,
},
lastReported: time.Now(),
}
}
// Update functions are called by the picker for each RPC. To avoid
// contention, all updates are done atomically.

// CallDropped adds one drop record with the given category to the store.
func (ls *lrsStore) CallDropped(category string) {
p, ok := ls.drops.Load(category)
if !ok {
tp := new(uint64)
p, _ = ls.drops.LoadOrStore(category, tp)
}
atomic.AddUint64(p.(*uint64), 1)
}
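// CallStarted adds one in-progress RPC record to the given locality, creating
// the per-locality counters on first use.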
func (ls *lrsStore) CallStarted(l internal.Locality) {
p, ok := ls.localityRPCCount.Load(l)
if !ok {
tp := newRPCCountData()
p, _ = ls.localityRPCCount.LoadOrStore(l, tp)
}
p.(*rpcCountData).incrInProgress()
}
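// CallFinished removes one in-progress RPC record from the given locality,
// and adds one succeeded or errored record depending on err.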
func (ls *lrsStore) CallFinished(l internal.Locality, err error) {
p, ok := ls.localityRPCCount.Load(l)
if !ok {
// The map is never cleared; only the values are reset. So an entry missing
// at call-finish time should never happen.
return
}
p.(*rpcCountData).decrInProgress()
if err == nil {
p.(*rpcCountData).incrSucceeded()
} else {
p.(*rpcCountData).incrErrored()
}
}
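// CallServerLoad adds one server load record (e.g. a backend utilization
// metric reported in trailers) with the given name to the given locality.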
func (ls *lrsStore) CallServerLoad(l internal.Locality, name string, d float64) {
p, ok := ls.localityRPCCount.Load(l)
if !ok {
// The map is never cleared; only the values are reset. So an entry missing
// when CallServerLoad is called should never happen.
return
}
p.(*rpcCountData).addServerLoad(name, d)
}
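// buildStats reads and resets the accumulated drops, per-locality RPC counts
// and server loads, and converts them into a ClusterStats message for
// clusterName. It is called once per reporting interval by sendLoads.
//
// Roughly, a single report has the following shape (the values shown are made
// up, for illustration only):
//
//	&loadreportpb.ClusterStats{
//		ClusterName:           "cluster_a",
//		TotalDroppedRequests:  2,
//		DroppedRequests:       []*loadreportpb.ClusterStats_DroppedRequests{{Category: "throttle", DroppedCount: 2}},
//		UpstreamLocalityStats: []*loadreportpb.UpstreamLocalityStats{ /* per-locality counts and load metrics */ },
//		LoadReportInterval:    ptypes.DurationProto(dur), // dur: time since the previous report.
//	}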
func (ls *lrsStore) buildStats(clusterName string) []*loadreportpb.ClusterStats {
var (
totalDropped uint64
droppedReqs []*loadreportpb.ClusterStats_DroppedRequests
localityStats []*loadreportpb.UpstreamLocalityStats
)
ls.drops.Range(func(category, countP interface{}) bool {
tempCount := atomic.SwapUint64(countP.(*uint64), 0)
if tempCount == 0 {
return true
}
totalDropped += tempCount
droppedReqs = append(droppedReqs, &loadreportpb.ClusterStats_DroppedRequests{
Category: category.(string),
DroppedCount: tempCount,
})
return true
})
ls.localityRPCCount.Range(func(locality, countP interface{}) bool {
tempLocality := locality.(internal.Locality)
tempCount := countP.(*rpcCountData)
tempSucceeded := tempCount.loadAndClearSucceeded()
tempInProgress := tempCount.loadInProgress()
tempErrored := tempCount.loadAndClearErrored()
if tempSucceeded == 0 && tempInProgress == 0 && tempErrored == 0 {
return true
}
var loadMetricStats []*loadreportpb.EndpointLoadMetricStats
tempCount.serverLoads.Range(func(name, data interface{}) bool {
tempName := name.(string)
tempSum, tempLoadCount := data.(*rpcLoadData).loadAndClear()
if tempLoadCount == 0 {
return true
}
loadMetricStats = append(loadMetricStats,
&loadreportpb.EndpointLoadMetricStats{
MetricName: tempName,
NumRequestsFinishedWithMetric: tempLoadCount,
TotalMetricValue: tempSum,
},
)
return true
})
localityStats = append(localityStats, &loadreportpb.UpstreamLocalityStats{
Locality: &basepb.Locality{
Region: tempLocality.Region,
Zone: tempLocality.Zone,
SubZone: tempLocality.SubZone,
},
TotalSuccessfulRequests: tempSucceeded,
TotalRequestsInProgress: tempInProgress,
TotalErrorRequests: tempErrored,
LoadMetricStats: loadMetricStats,
UpstreamEndpointStats: nil, // TODO: populate for per-endpoint loads.
})
return true
})
dur := time.Since(ls.lastReported)
ls.lastReported = time.Now()
var ret []*loadreportpb.ClusterStats
ret = append(ret, &loadreportpb.ClusterStats{
ClusterName: clusterName,
UpstreamLocalityStats: localityStats,
TotalDroppedRequests: totalDropped,
DroppedRequests: droppedReqs,
LoadReportInterval: ptypes.DurationProto(dur),
})
return ret
}
// ReportTo makes a streaming lrs call to cc and blocks.
//
// It retries the call (with backoff) until ctx is canceled.
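//
// Each attempt follows the LRS stream protocol as implemented below:
//
//	1. Open StreamLoadStats and send an initial LoadStatsRequest carrying only
//	   the Node (which has the service name in its metadata).
//	2. Receive the first LoadStatsResponse, which names the cluster to report
//	   on and the LoadReportingInterval to use.
//	3. Let sendLoads send a LoadStatsRequest with fresh ClusterStats on every
//	   interval tick, until the stream or the context breaks.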
func (ls *lrsStore) ReportTo(ctx context.Context, cc *grpc.ClientConn) {
c := lrsgrpc.NewLoadReportingServiceClient(cc)
var (
retryCount int
doBackoff bool
)
for {
select {
case <-ctx.Done():
return
default:
}
if doBackoff {
backoffTimer := time.NewTimer(ls.backoff.Backoff(retryCount))
select {
case <-backoffTimer.C:
case <-ctx.Done():
backoffTimer.Stop()
return
}
retryCount++
}
doBackoff = true
stream, err := c.StreamLoadStats(ctx)
if err != nil {
grpclog.Infof("lrs: failed to create stream: %v", err)
continue
}
if err := stream.Send(&lrspb.LoadStatsRequest{
Node: ls.node,
}); err != nil {
grpclog.Infof("lrs: failed to send first request: %v", err)
continue
}
first, err := stream.Recv()
if err != nil {
grpclog.Infof("lrs: failed to receive first response: %v", err)
continue
}
interval, err := ptypes.Duration(first.LoadReportingInterval)
if err != nil {
grpclog.Infof("lrs: failed to convert report interval: %v", err)
continue
}
if len(first.Clusters) != 1 {
grpclog.Infof("lrs: received multiple clusters %v, expect one cluster", first.Clusters)
continue
}
if first.ReportEndpointGranularity {
// TODO: support per-endpoint loads.
grpclog.Infof("lrs: per-endpoint loads requested, but not supported by the current implementation")
continue
}
// The stream was established successfully; reset the backoff state. The next
// attempt (after sendLoads returns) retries without delay.
doBackoff = false
retryCount = 0
ls.sendLoads(ctx, stream, first.Clusters[0], interval)
}
}
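// sendLoads sends one load report for clusterName on stream every interval,
// until ctx is done or a send fails.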
func (ls *lrsStore) sendLoads(ctx context.Context, stream lrsgrpc.LoadReportingService_StreamLoadStatsClient, clusterName string, interval time.Duration) {
tick := time.NewTicker(interval)
defer tick.Stop()
for {
select {
case <-tick.C:
case <-ctx.Done():
return
}
if err := stream.Send(&lrspb.LoadStatsRequest{
Node: ls.node,
ClusterStats: ls.buildStats(clusterName),
}); err != nil {
grpclog.Infof("lrs: failed to send report: %v", err)
return
}
}
}