| /* |
| * |
| * Copyright 2022 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package orca |
| |
| import ( |
| "fmt" |
| "time" |
| |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/internal" |
| ointernal "google.golang.org/grpc/orca/internal" |
| "google.golang.org/grpc/status" |
| |
| v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3" |
| v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3" |
| ) |
| |
| func init() { |
| ointernal.AllowAnyMinReportingInterval = func(so *ServiceOptions) { |
| so.allowAnyMinReportingInterval = true |
| } |
| internal.ORCAAllowAnyMinReportingInterval = ointernal.AllowAnyMinReportingInterval |
| } |
| |
| // minReportingInterval is the absolute minimum value supported for |
| // out-of-band metrics reporting from the ORCA service implementation |
| // provided by the orca package. |
| const minReportingInterval = 30 * time.Second |
| |
| // Service provides an implementation of the OpenRcaService as defined in the |
| // [ORCA] service protos. Instances of this type must be created via calls to |
| // Register() or NewService(). |
| // |
| // Server applications can use the SetXxx() and DeleteXxx() methods to record |
| // measurements corresponding to backend metrics, which eventually get pushed to |
| // clients who have initiated the SteamCoreMetrics streaming RPC. |
| // |
| // [ORCA]: https://github.com/cncf/xds/blob/main/xds/service/orca/v3/orca.proto |
| type Service struct { |
| v3orcaservicegrpc.UnimplementedOpenRcaServiceServer |
| |
| // Minimum reporting interval, as configured by the user, or the default. |
| minReportingInterval time.Duration |
| |
| smProvider ServerMetricsProvider |
| } |
| |
| // ServiceOptions contains options to configure the ORCA service implementation. |
| type ServiceOptions struct { |
| // ServerMetricsProvider is the provider to be used by the service for |
| // reporting OOB server metrics to clients. Typically obtained via |
| // NewServerMetricsRecorder. This field is required. |
| ServerMetricsProvider ServerMetricsProvider |
| |
| // MinReportingInterval sets the lower bound for how often out-of-band |
| // metrics are reported on the streaming RPC initiated by the client. If |
| // unspecified, negative or less than the default value of 30s, the default |
| // is used. Clients may request a higher value as part of the |
| // StreamCoreMetrics streaming RPC. |
| MinReportingInterval time.Duration |
| |
| // Allow a minReportingInterval which is less than the default of 30s. |
| // Used for testing purposes only. |
| allowAnyMinReportingInterval bool |
| } |
| |
| // A ServerMetricsProvider provides ServerMetrics upon request. |
| type ServerMetricsProvider interface { |
| // ServerMetrics returns the current set of server metrics. It should |
| // return a read-only, immutable copy of the data that is active at the |
| // time of the call. |
| ServerMetrics() *ServerMetrics |
| } |
| |
| // NewService creates a new ORCA service implementation configured using the |
| // provided options. |
| func NewService(opts ServiceOptions) (*Service, error) { |
| // The default minimum supported reporting interval value can be overridden |
| // for testing purposes through the orca internal package. |
| if opts.ServerMetricsProvider == nil { |
| return nil, fmt.Errorf("ServerMetricsProvider not specified") |
| } |
| if !opts.allowAnyMinReportingInterval { |
| if opts.MinReportingInterval < 0 || opts.MinReportingInterval < minReportingInterval { |
| opts.MinReportingInterval = minReportingInterval |
| } |
| } |
| service := &Service{ |
| minReportingInterval: opts.MinReportingInterval, |
| smProvider: opts.ServerMetricsProvider, |
| } |
| return service, nil |
| } |
| |
| // Register creates a new ORCA service implementation configured using the |
| // provided options and registers the same on the provided grpc Server. |
| func Register(s *grpc.Server, opts ServiceOptions) error { |
| // TODO(https://github.com/cncf/xds/issues/41): replace *grpc.Server with |
| // grpc.ServiceRegistrar when possible. |
| service, err := NewService(opts) |
| if err != nil { |
| return err |
| } |
| v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, service) |
| return nil |
| } |
| |
| // determineReportingInterval determines the reporting interval for out-of-band |
| // metrics. If the reporting interval is not specified in the request, or is |
| // negative or is less than the configured minimum (via |
| // ServiceOptions.MinReportingInterval), the latter is used. Else the value from |
| // the incoming request is used. |
| func (s *Service) determineReportingInterval(req *v3orcaservicepb.OrcaLoadReportRequest) time.Duration { |
| if req.GetReportInterval() == nil { |
| return s.minReportingInterval |
| } |
| dur := req.GetReportInterval().AsDuration() |
| if dur < s.minReportingInterval { |
| logger.Warningf("Received reporting interval %q is less than configured minimum: %v. Using minimum", dur, s.minReportingInterval) |
| return s.minReportingInterval |
| } |
| return dur |
| } |
| |
| func (s *Service) sendMetricsResponse(stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error { |
| return stream.Send(s.smProvider.ServerMetrics().toLoadReportProto()) |
| } |
| |
| // StreamCoreMetrics streams custom backend metrics injected by the server |
| // application. |
| func (s *Service) StreamCoreMetrics(req *v3orcaservicepb.OrcaLoadReportRequest, stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error { |
| ticker := time.NewTicker(s.determineReportingInterval(req)) |
| defer ticker.Stop() |
| |
| for { |
| if err := s.sendMetricsResponse(stream); err != nil { |
| return err |
| } |
| // Send a response containing the currently recorded metrics |
| select { |
| case <-stream.Context().Done(): |
| return status.Error(codes.Canceled, "Stream has ended.") |
| case <-ticker.C: |
| } |
| } |
| } |