blob: be42a6e0c383141bfacd6b3603c7d0cd6614bc8f [file] [log] [blame]
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package client
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/xds/internal/client/load"
)
// ReportLoad starts a load reporting stream to the given server, or reuses the
// existing one if a stream to that server was already requested. References to
// the per-server lrsClient are counted; the stream is started on the first
// reference and stopped when the last reference is released.
//
// It returns the Store obtained from the lrsClient for the user to report
// loads, and a function to cancel the load reporting stream. The cancel
// function is idempotent: only its first invocation releases the reference,
// so calling it more than once cannot tear down a stream that other callers
// of ReportLoad are still using.
func (c *clientImpl) ReportLoad(server string) (*load.Store, func()) {
	c.lrsMu.Lock()
	defer c.lrsMu.Unlock()

	// If there's already a client to this server, use it. Otherwise, create
	// one.
	lrsC, ok := c.lrsClients[server]
	if !ok {
		lrsC = newLRSClient(c, server)
		c.lrsClients[server] = lrsC
	}

	store := lrsC.ref()

	// released records whether the reference taken above was already given
	// back. It is only read and written while holding lrsMu.
	released := false
	return store, func() {
		// This is a callback, need to hold lrsMu.
		c.lrsMu.Lock()
		defer c.lrsMu.Unlock()
		if released {
			// A repeated cancel must not decrement refCount a second time.
			return
		}
		released = true
		if lrsC.unRef() {
			// Delete the lrsClient from map if this is the last reference.
			delete(c.lrsClients, server)
		}
	}
}
// lrsClient maps to one lrsServer. It contains:
// - a ClientConn to this server (only if it's different from the management
// server)
// - a load.Store that contains loads only for this server
//
// Its methods do no locking of their own; ref and unRef are documented as
// requiring the caller to synchronize.
type lrsClient struct {
	// parent is the clientImpl this lrsClient was created for. startStream
	// reads its logger, config, cc and apiClient.
	parent *clientImpl
	// server is the LRS server name. An empty string, or a value equal to
	// parent.config.BalancerName, makes startStream reuse parent.cc.
	server string
	cc *grpc.ClientConn // nil if the server is same as the management server
	// refCount is the number of outstanding references, incremented by ref
	// and decremented by unRef.
	refCount int
	// cancelStream cancels the context passed to reportLoad. It is assigned
	// by startStream.
	cancelStream func()
	// loadStore is the store handed out by ref. It is assigned by startStream.
	loadStore *load.Store
}
// newLRSClient returns an lrsClient for the given server, bound to parent.
//
// No stream is started here and the reference count begins at zero; the
// stream is started by the first call to ref.
func newLRSClient(parent *clientImpl, server string) *lrsClient {
	lrsC := new(lrsClient) // refCount relies on the int zero value.
	lrsC.parent = parent
	lrsC.server = server
	return lrsC
}
// ref takes one reference on the lrsClient and returns its load store. The
// call that moves refCount from 0 to 1 also starts the LRS stream.
//
// The returned value is whatever lrsC.loadStore holds after that, so it is nil
// if startStream did not create a store.
//
// Not thread-safe, caller needs to synchronize.
func (lrsC *lrsClient) ref() *load.Store {
	isFirst := lrsC.refCount == 0
	lrsC.refCount++
	if isFirst {
		lrsC.startStream()
	}
	return lrsC.loadStore
}
// unRef decrements the refCount, and closes the stream if refCount reaches 0
// (and closes the cc if this lrsClient owns a dedicated one). It returns
// whether refCount reached 0 after this call.
//
// It tolerates a stream that was never started: cancelStream is only invoked
// when it has been set.
//
// Not thread-safe, caller needs to synchronize.
func (lrsC *lrsClient) unRef() (closed bool) {
	lrsC.refCount--
	if lrsC.refCount != 0 {
		return false
	}

	lrsC.parent.logger.Infof("Stopping load report to server: %s", lrsC.server)
	// cancelStream is still nil when startStream returned early because
	// dialing the LRS server failed; calling it unconditionally would panic.
	if lrsC.cancelStream != nil {
		lrsC.cancelStream()
	}
	// cc is only set when a dedicated ClientConn was dialed for this server;
	// the parent's shared cc is never stored here and so is never closed.
	if lrsC.cc != nil {
		lrsC.cc.Close()
	}
	return true
}
// startStream starts the LRS stream to the server. If server is empty or equal
// to the parent's config.BalancerName, the parent's ClientConn is reused.
// Otherwise a new ClientConn is dialed with the parent's config.Creds and
// stored in lrsC.cc.
//
// On success it sets lrsC.cancelStream and lrsC.loadStore and launches
// reportLoad in a new goroutine; the goroutine's context is cancelled by
// cancelStream.
//
// If dialing fails, the error is logged and no stream is started. loadStore is
// left untouched, and cancelStream is set to a no-op so that it can still be
// called safely.
//
// Like ref, it mutates lrsC without locking; the caller needs to synchronize.
func (lrsC *lrsClient) startStream() {
	var cc *grpc.ClientConn

	lrsC.parent.logger.Infof("Starting load report to server: %s", lrsC.server)
	if lrsC.server == "" || lrsC.server == lrsC.parent.config.BalancerName {
		// Reuse the xDS client if server is the same.
		cc = lrsC.parent.cc
	} else {
		lrsC.parent.logger.Infof("LRS server is different from management server, starting a new ClientConn")
		ccNew, err := grpc.Dial(lrsC.server, lrsC.parent.config.Creds)
		if err != nil {
			// An error from a non-blocking dial indicates something serious.
			lrsC.parent.logger.Infof("xds: failed to dial load report server {%s}: %v", lrsC.server, err)
			// Leave a callable cancel function behind so that releasing the
			// last reference does not invoke a nil func.
			lrsC.cancelStream = func() {}
			return
		}
		cc = ccNew
		lrsC.cc = ccNew
	}

	var ctx context.Context
	ctx, lrsC.cancelStream = context.WithCancel(context.Background())

	// Create the store and stream.
	lrsC.loadStore = load.NewStore()
	go lrsC.parent.apiClient.reportLoad(ctx, cc, loadReportingOptions{loadStore: lrsC.loadStore})
}