blob: 559033116667aa191b55757f462818f0a4fb9da7 [file] [log] [blame]
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package orca
import (
"context"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/orca/internal"
"google.golang.org/grpc/status"
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
"google.golang.org/protobuf/types/known/durationpb"
)
type producerBuilder struct{}
// Build constructs and returns a producer and its cleanup function
func (*producerBuilder) Build(cci interface{}) (balancer.Producer, func()) {
ctx, cancel := context.WithCancel(context.Background())
p := &producer{
client: v3orcaservicegrpc.NewOpenRcaServiceClient(cci.(grpc.ClientConnInterface)),
closed: grpcsync.NewEvent(),
intervals: make(map[time.Duration]int),
listeners: make(map[OOBListener]struct{}),
backoff: internal.DefaultBackoffFunc,
}
go p.run(ctx)
return p, func() {
cancel()
<-p.closed.Done() // Block until stream stopped.
}
}
var producerBuilderSingleton = &producerBuilder{}
// OOBListener is used to receive out-of-band load reports as they arrive.
type OOBListener interface {
// OnLoadReport is called when a load report is received.
OnLoadReport(*v3orcapb.OrcaLoadReport)
}
// OOBListenerOptions contains options to control how an OOBListener is called.
type OOBListenerOptions struct {
// ReportInterval specifies how often to request the server to provide a
// load report. May be provided less frequently if the server requires a
// longer interval, or may be provided more frequently if another
// subscriber requests a shorter interval.
ReportInterval time.Duration
}
// RegisterOOBListener registers an out-of-band load report listener on sc.
// Any OOBListener may only be registered once per subchannel at a time. The
// returned stop function must be called when no longer needed. Do not
// register a single OOBListener more than once per SubConn.
func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOptions) (stop func()) {
pr, close := sc.GetOrBuildProducer(producerBuilderSingleton)
p := pr.(*producer)
p.registerListener(l, opts.ReportInterval)
// TODO: When we can register for SubConn state updates, don't call run()
// until READY and automatically call stop() on SHUTDOWN.
// If stop is called multiple times, prevent it from having any effect on
// subsequent calls.
return grpcsync.OnceFunc(func() {
p.unregisterListener(l, opts.ReportInterval)
close()
})
}
type producer struct {
client v3orcaservicegrpc.OpenRcaServiceClient
closed *grpcsync.Event // fired when closure completes
// backoff is called between stream attempts to determine how long to delay
// to avoid overloading a server experiencing problems. The attempt count
// is incremented when stream errors occur and is reset when the stream
// reports a result.
backoff func(int) time.Duration
mu sync.Mutex
intervals map[time.Duration]int // map from interval time to count of listeners requesting that time
listeners map[OOBListener]struct{} // set of registered listeners
}
// registerListener adds the listener and its requested report interval to the
// producer.
func (p *producer) registerListener(l OOBListener, interval time.Duration) {
p.mu.Lock()
defer p.mu.Unlock()
p.listeners[l] = struct{}{}
p.intervals[interval]++
}
// registerListener removes the listener and its requested report interval to
// the producer.
func (p *producer) unregisterListener(l OOBListener, interval time.Duration) {
p.mu.Lock()
defer p.mu.Unlock()
delete(p.listeners, l)
p.intervals[interval]--
if p.intervals[interval] == 0 {
delete(p.intervals, interval)
}
}
// minInterval returns the smallest key in p.intervals.
func (p *producer) minInterval() time.Duration {
p.mu.Lock()
defer p.mu.Unlock()
var min time.Duration
first := true
for t := range p.intervals {
if t < min || first {
min = t
first = false
}
}
return min
}
// run manages the ORCA OOB stream on the subchannel.
func (p *producer) run(ctx context.Context) {
defer p.closed.Fire()
backoffAttempt := 0
backoffTimer := time.NewTimer(0)
for ctx.Err() == nil {
select {
case <-backoffTimer.C:
case <-ctx.Done():
return
}
resetBackoff, err := p.runStream(ctx)
if resetBackoff {
backoffTimer.Reset(0)
backoffAttempt = 0
} else {
backoffTimer.Reset(p.backoff(backoffAttempt))
backoffAttempt++
}
switch {
case err == nil:
// No error was encountered; restart the stream.
case ctx.Err() != nil:
// Producer was stopped; exit immediately and without logging an
// error.
return
case status.Code(err) == codes.Unimplemented:
// Unimplemented; do not retry.
logger.Error("Server doesn't support ORCA OOB load reporting protocol; not listening for load reports.")
return
case status.Code(err) == codes.Unavailable:
// The SubConn is not currently ready; backoff silently.
//
// TODO: don't attempt the stream until the state is READY to
// minimize the chances of this case and to avoid using the
// exponential backoff mechanism, as we should know it's safe to
// retry when the state is READY again.
default:
// Log all other errors.
logger.Error("Received unexpected stream error:", err)
}
}
}
// runStream runs a single stream on the subchannel and returns the resulting
// error, if any, and whether or not the run loop should reset the backoff
// timer to zero or advance it.
func (p *producer) runStream(ctx context.Context) (resetBackoff bool, err error) {
interval := p.minInterval()
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := p.client.StreamCoreMetrics(streamCtx, &v3orcaservicepb.OrcaLoadReportRequest{
ReportInterval: durationpb.New(interval),
})
if err != nil {
return false, err
}
for {
report, err := stream.Recv()
if err != nil {
return resetBackoff, err
}
resetBackoff = true
p.mu.Lock()
for l := range p.listeners {
l.OnLoadReport(report)
}
p.mu.Unlock()
if interval != p.minInterval() {
// restart stream to use new interval
return true, nil
}
}
}