blob: 69405fcd9ad3bce2168cb5075900cf60421f19cd [file] [log] [blame]
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package v2
import (
"context"
"errors"
"fmt"
"time"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/xds/internal/client/load"
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
v2endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/xds/internal"
)
const clientFeatureLRSSendAllClusters = "envoy.lrs.supports_send_all_clusters"
type lrsStream lrsgrpc.LoadReportingService_StreamLoadStatsClient
func (v2c *client) NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error) {
c := lrsgrpc.NewLoadReportingServiceClient(cc)
return c.StreamLoadStats(ctx)
}
func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error {
stream, ok := s.(lrsStream)
if !ok {
return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s)
}
node := proto.Clone(v2c.nodeProto).(*v2corepb.Node)
if node == nil {
node = &v2corepb.Node{}
}
node.ClientFeatures = append(node.ClientFeatures, clientFeatureLRSSendAllClusters)
req := &lrspb.LoadStatsRequest{Node: node}
v2c.logger.Infof("lrs: sending init LoadStatsRequest: %v", req)
return stream.Send(req)
}
func (v2c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) {
stream, ok := s.(lrsStream)
if !ok {
return nil, 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s)
}
resp, err := stream.Recv()
if err != nil {
return nil, 0, fmt.Errorf("lrs: failed to receive first response: %v", err)
}
v2c.logger.Infof("lrs: received first LoadStatsResponse: %+v", resp)
interval, err := ptypes.Duration(resp.GetLoadReportingInterval())
if err != nil {
return nil, 0, fmt.Errorf("lrs: failed to convert report interval: %v", err)
}
if resp.ReportEndpointGranularity {
// TODO: fixme to support per endpoint loads.
return nil, 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation")
}
clusters := resp.Clusters
if resp.SendAllClusters {
// Return nil to send stats for all clusters.
clusters = nil
}
return clusters, interval, nil
}
func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) error {
stream, ok := s.(lrsStream)
if !ok {
return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s)
}
var clusterStats []*v2endpointpb.ClusterStats
for _, sd := range loads {
var (
droppedReqs []*v2endpointpb.ClusterStats_DroppedRequests
localityStats []*v2endpointpb.UpstreamLocalityStats
)
for category, count := range sd.Drops {
droppedReqs = append(droppedReqs, &v2endpointpb.ClusterStats_DroppedRequests{
Category: category,
DroppedCount: count,
})
}
for l, localityData := range sd.LocalityStats {
lid, err := internal.LocalityIDFromString(l)
if err != nil {
return err
}
var loadMetricStats []*v2endpointpb.EndpointLoadMetricStats
for name, loadData := range localityData.LoadStats {
loadMetricStats = append(loadMetricStats, &v2endpointpb.EndpointLoadMetricStats{
MetricName: name,
NumRequestsFinishedWithMetric: loadData.Count,
TotalMetricValue: loadData.Sum,
})
}
localityStats = append(localityStats, &v2endpointpb.UpstreamLocalityStats{
Locality: &v2corepb.Locality{
Region: lid.Region,
Zone: lid.Zone,
SubZone: lid.SubZone,
},
TotalSuccessfulRequests: localityData.RequestStats.Succeeded,
TotalRequestsInProgress: localityData.RequestStats.InProgress,
TotalErrorRequests: localityData.RequestStats.Errored,
LoadMetricStats: loadMetricStats,
UpstreamEndpointStats: nil, // TODO: populate for per endpoint loads.
})
}
clusterStats = append(clusterStats, &v2endpointpb.ClusterStats{
ClusterName: sd.Cluster,
ClusterServiceName: sd.Service,
UpstreamLocalityStats: localityStats,
TotalDroppedRequests: sd.TotalDrops,
DroppedRequests: droppedReqs,
LoadReportInterval: ptypes.DurationProto(sd.ReportInterval),
})
}
req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats}
v2c.logger.Infof("lrs: sending LRS loads: %+v", req)
return stream.Send(req)
}