blob: 7461a6b05a1a106f5e7b5b5a5f4108ee151801a6 [file] [log] [blame]
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package orca
import (
"fmt"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal"
ointernal "google.golang.org/grpc/orca/internal"
"google.golang.org/grpc/status"
v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
)
func init() {
ointernal.AllowAnyMinReportingInterval = func(so *ServiceOptions) {
so.allowAnyMinReportingInterval = true
}
internal.ORCAAllowAnyMinReportingInterval = ointernal.AllowAnyMinReportingInterval
}
// minReportingInterval is the absolute minimum value supported for
// out-of-band metrics reporting from the ORCA service implementation
// provided by the orca package.
const minReportingInterval = 30 * time.Second
// Service provides an implementation of the OpenRcaService as defined in the
// [ORCA] service protos. Instances of this type must be created via calls to
// Register() or NewService().
//
// Server applications can use the SetXxx() and DeleteXxx() methods to record
// measurements corresponding to backend metrics, which eventually get pushed to
// clients who have initiated the SteamCoreMetrics streaming RPC.
//
// [ORCA]: https://github.com/cncf/xds/blob/main/xds/service/orca/v3/orca.proto
type Service struct {
v3orcaservicegrpc.UnimplementedOpenRcaServiceServer
// Minimum reporting interval, as configured by the user, or the default.
minReportingInterval time.Duration
smProvider ServerMetricsProvider
}
// ServiceOptions contains options to configure the ORCA service implementation.
type ServiceOptions struct {
// ServerMetricsProvider is the provider to be used by the service for
// reporting OOB server metrics to clients. Typically obtained via
// NewServerMetricsRecorder. This field is required.
ServerMetricsProvider ServerMetricsProvider
// MinReportingInterval sets the lower bound for how often out-of-band
// metrics are reported on the streaming RPC initiated by the client. If
// unspecified, negative or less than the default value of 30s, the default
// is used. Clients may request a higher value as part of the
// StreamCoreMetrics streaming RPC.
MinReportingInterval time.Duration
// Allow a minReportingInterval which is less than the default of 30s.
// Used for testing purposes only.
allowAnyMinReportingInterval bool
}
// A ServerMetricsProvider provides ServerMetrics upon request.
type ServerMetricsProvider interface {
// ServerMetrics returns the current set of server metrics. It should
// return a read-only, immutable copy of the data that is active at the
// time of the call.
ServerMetrics() *ServerMetrics
}
// NewService creates a new ORCA service implementation configured using the
// provided options.
func NewService(opts ServiceOptions) (*Service, error) {
// The default minimum supported reporting interval value can be overridden
// for testing purposes through the orca internal package.
if opts.ServerMetricsProvider == nil {
return nil, fmt.Errorf("ServerMetricsProvider not specified")
}
if !opts.allowAnyMinReportingInterval {
if opts.MinReportingInterval < 0 || opts.MinReportingInterval < minReportingInterval {
opts.MinReportingInterval = minReportingInterval
}
}
service := &Service{
minReportingInterval: opts.MinReportingInterval,
smProvider: opts.ServerMetricsProvider,
}
return service, nil
}
// Register creates a new ORCA service implementation configured using the
// provided options and registers the same on the provided grpc Server.
func Register(s *grpc.Server, opts ServiceOptions) error {
// TODO(https://github.com/cncf/xds/issues/41): replace *grpc.Server with
// grpc.ServiceRegistrar when possible.
service, err := NewService(opts)
if err != nil {
return err
}
v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, service)
return nil
}
// determineReportingInterval determines the reporting interval for out-of-band
// metrics. If the reporting interval is not specified in the request, or is
// negative or is less than the configured minimum (via
// ServiceOptions.MinReportingInterval), the latter is used. Else the value from
// the incoming request is used.
func (s *Service) determineReportingInterval(req *v3orcaservicepb.OrcaLoadReportRequest) time.Duration {
if req.GetReportInterval() == nil {
return s.minReportingInterval
}
dur := req.GetReportInterval().AsDuration()
if dur < s.minReportingInterval {
logger.Warningf("Received reporting interval %q is less than configured minimum: %v. Using minimum", dur, s.minReportingInterval)
return s.minReportingInterval
}
return dur
}
func (s *Service) sendMetricsResponse(stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error {
return stream.Send(s.smProvider.ServerMetrics().toLoadReportProto())
}
// StreamCoreMetrics streams custom backend metrics injected by the server
// application.
func (s *Service) StreamCoreMetrics(req *v3orcaservicepb.OrcaLoadReportRequest, stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error {
ticker := time.NewTicker(s.determineReportingInterval(req))
defer ticker.Stop()
for {
if err := s.sendMetricsResponse(stream); err != nil {
return err
}
// Send a response containing the currently recorded metrics
select {
case <-stream.Context().Done():
return status.Error(codes.Canceled, "Stream has ended.")
case <-ticker.C:
}
}
}