blob: bacc4a89ab0b0b78ea6bd5f0b843e2f359c1a919 [file] [log] [blame]
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package orca implements Open Request Cost Aggregation, which is an open
// standard for request cost aggregation and reporting by backends and the
// corresponding aggregation of such reports by L7 load balancers (such as
// Envoy) on the data plane. In a proxyless world with gRPC enabled
// applications, aggregation of such reports will be done by the gRPC client.
//
// # Experimental
//
// Notice: All APIs is this package are EXPERIMENTAL and may be changed or
// removed in a later release.
package orca
import (
"context"
"errors"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancerload"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
)
var (
logger = grpclog.Component("orca-backend-metrics")
joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption)
)
const trailerMetadataKey = "endpoint-load-metrics-bin"
// CallMetricsServerOption returns a server option which enables the reporting
// of per-RPC custom backend metrics for unary and streaming RPCs.
//
// Server applications interested in injecting custom backend metrics should
// pass the server option returned from this function as the first argument to
// grpc.NewServer().
//
// Subsequently, server RPC handlers can retrieve a reference to the RPC
// specific custom metrics recorder [CallMetricRecorder] to be used, via a call
// to CallMetricRecorderFromContext(), and inject custom metrics at any time
// during the RPC lifecycle.
//
// The injected custom metrics will be sent as part of trailer metadata, as a
// binary-encoded [ORCA LoadReport] protobuf message, with the metadata key
// being set be "endpoint-load-metrics-bin".
//
// [ORCA LoadReport]: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15
func CallMetricsServerOption() grpc.ServerOption {
return joinServerOptions(grpc.ChainUnaryInterceptor(unaryInt), grpc.ChainStreamInterceptor(streamInt))
}
func unaryInt(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// We don't allocate the metric recorder here. It will be allocated the
// first time the user calls CallMetricRecorderFromContext().
rw := &recorderWrapper{}
ctxWithRecorder := newContextWithRecorderWrapper(ctx, rw)
resp, err := handler(ctxWithRecorder, req)
// It is safe to access the underlying metric recorder inside the wrapper at
// this point, as the user's RPC handler is done executing, and therefore
// there will be no more calls to CallMetricRecorderFromContext(), which is
// where the metric recorder is lazy allocated.
if rw.r == nil {
return resp, err
}
setTrailerMetadata(ctx, rw.r)
return resp, err
}
func streamInt(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// We don't allocate the metric recorder here. It will be allocated the
// first time the user calls CallMetricRecorderFromContext().
rw := &recorderWrapper{}
ws := &wrappedStream{
ServerStream: ss,
ctx: newContextWithRecorderWrapper(ss.Context(), rw),
}
err := handler(srv, ws)
// It is safe to access the underlying metric recorder inside the wrapper at
// this point, as the user's RPC handler is done executing, and therefore
// there will be no more calls to CallMetricRecorderFromContext(), which is
// where the metric recorder is lazy allocated.
if rw.r == nil {
return err
}
setTrailerMetadata(ss.Context(), rw.r)
return err
}
// setTrailerMetadata adds a trailer metadata entry with key being set to
// `trailerMetadataKey` and value being set to the binary-encoded
// orca.OrcaLoadReport protobuf message.
//
// This function is called from the unary and streaming interceptors defined
// above. Any errors encountered here are not propagated to the caller because
// they are ignored there. Hence we simply log any errors encountered here at
// warning level, and return nothing.
func setTrailerMetadata(ctx context.Context, r *CallMetricRecorder) {
b, err := proto.Marshal(r.toLoadReportProto())
if err != nil {
logger.Warningf("failed to marshal load report: %v", err)
return
}
if err := grpc.SetTrailer(ctx, metadata.Pairs(trailerMetadataKey, string(b))); err != nil {
logger.Warningf("failed to set trailer metadata: %v", err)
}
}
// wrappedStream wraps the grpc.ServerStream received by the streaming
// interceptor. Overrides only the Context() method to return a context which
// contains a reference to the CallMetricRecorder corresponding to this stream.
type wrappedStream struct {
grpc.ServerStream
ctx context.Context
}
func (w *wrappedStream) Context() context.Context {
return w.ctx
}
// ErrLoadReportMissing indicates no ORCA load report was found in trailers.
var ErrLoadReportMissing = errors.New("orca load report missing in provided metadata")
// ToLoadReport unmarshals a binary encoded [ORCA LoadReport] protobuf message
// from md and returns the corresponding struct. The load report is expected to
// be stored as the value for key "endpoint-load-metrics-bin".
//
// If no load report was found in the provided metadata, ErrLoadReportMissing is
// returned.
//
// [ORCA LoadReport]: (https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15)
func ToLoadReport(md metadata.MD) (*v3orcapb.OrcaLoadReport, error) {
vs := md.Get(trailerMetadataKey)
if len(vs) == 0 {
return nil, ErrLoadReportMissing
}
ret := new(v3orcapb.OrcaLoadReport)
if err := proto.Unmarshal([]byte(vs[0]), ret); err != nil {
return nil, fmt.Errorf("failed to unmarshal load report found in metadata: %v", err)
}
return ret, nil
}
// loadParser implements the Parser interface defined in `internal/balancerload`
// package. This interface is used by the client stream to parse load reports
// sent by the server in trailer metadata. The parsed loads are then sent to
// balancers via balancer.DoneInfo.
//
// The grpc package cannot directly call orca.ToLoadReport() as that would cause
// an import cycle. Hence this roundabout method is used.
type loadParser struct{}
func (loadParser) Parse(md metadata.MD) interface{} {
lr, err := ToLoadReport(md)
if err != nil {
logger.Errorf("Parse(%v) failed: %v", err)
}
return lr
}
func init() {
balancerload.SetParser(loadParser{})
}