| /* |
| * |
| * Copyright 2019 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package client |
| |
| import ( |
| "context" |
| |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/xds/internal/client/load" |
| ) |
| |
| // ReportLoad starts an load reporting stream to the given server. If the server |
| // is not an empty string, and is different from the management server, a new |
| // ClientConn will be created. |
| // |
| // The same options used for creating the Client will be used (including |
| // NodeProto, and dial options if necessary). |
| // |
| // It returns a Store for the user to report loads, a function to cancel the |
| // load reporting stream. |
| func (c *clientImpl) ReportLoad(server string) (*load.Store, func()) { |
| c.lrsMu.Lock() |
| defer c.lrsMu.Unlock() |
| |
| // If there's already a client to this server, use it. Otherwise, create |
| // one. |
| lrsC, ok := c.lrsClients[server] |
| if !ok { |
| lrsC = newLRSClient(c, server) |
| c.lrsClients[server] = lrsC |
| } |
| |
| store := lrsC.ref() |
| return store, func() { |
| // This is a callback, need to hold lrsMu. |
| c.lrsMu.Lock() |
| defer c.lrsMu.Unlock() |
| if lrsC.unRef() { |
| // Delete the lrsClient from map if this is the last reference. |
| delete(c.lrsClients, server) |
| } |
| } |
| } |
| |
| // lrsClient maps to one lrsServer. It contains: |
| // - a ClientConn to this server (only if it's different from the management |
| // server) |
| // - a load.Store that contains loads only for this server |
| type lrsClient struct { |
| parent *clientImpl |
| server string |
| |
| cc *grpc.ClientConn // nil if the server is same as the management server |
| refCount int |
| cancelStream func() |
| loadStore *load.Store |
| } |
| |
| // newLRSClient creates a new LRS stream to the server. |
| func newLRSClient(parent *clientImpl, server string) *lrsClient { |
| return &lrsClient{ |
| parent: parent, |
| server: server, |
| refCount: 0, |
| } |
| } |
| |
| // ref increments the refCount. If this is the first ref, it starts the LRS stream. |
| // |
| // Not thread-safe, caller needs to synchronize. |
| func (lrsC *lrsClient) ref() *load.Store { |
| lrsC.refCount++ |
| if lrsC.refCount == 1 { |
| lrsC.startStream() |
| } |
| return lrsC.loadStore |
| } |
| |
| // unRef decrements the refCount, and closes the stream if refCount reaches 0 |
| // (and close the cc if cc is not xDS cc). It returns whether refCount reached 0 |
| // after this call. |
| // |
| // Not thread-safe, caller needs to synchronize. |
| func (lrsC *lrsClient) unRef() (closed bool) { |
| lrsC.refCount-- |
| if lrsC.refCount != 0 { |
| return false |
| } |
| lrsC.parent.logger.Infof("Stopping load report to server: %s", lrsC.server) |
| lrsC.cancelStream() |
| if lrsC.cc != nil { |
| lrsC.cc.Close() |
| } |
| return true |
| } |
| |
| // startStream starts the LRS stream to the server. If server is not the same |
| // management server from the parent, it also creates a ClientConn. |
| func (lrsC *lrsClient) startStream() { |
| var cc *grpc.ClientConn |
| |
| lrsC.parent.logger.Infof("Starting load report to server: %s", lrsC.server) |
| if lrsC.server == "" || lrsC.server == lrsC.parent.config.BalancerName { |
| // Reuse the xDS client if server is the same. |
| cc = lrsC.parent.cc |
| } else { |
| lrsC.parent.logger.Infof("LRS server is different from management server, starting a new ClientConn") |
| ccNew, err := grpc.Dial(lrsC.server, lrsC.parent.config.Creds) |
| if err != nil { |
| // An error from a non-blocking dial indicates something serious. |
| lrsC.parent.logger.Infof("xds: failed to dial load report server {%s}: %v", lrsC.server, err) |
| return |
| } |
| cc = ccNew |
| lrsC.cc = ccNew |
| } |
| |
| var ctx context.Context |
| ctx, lrsC.cancelStream = context.WithCancel(context.Background()) |
| |
| // Create the store and stream. |
| lrsC.loadStore = load.NewStore() |
| go lrsC.parent.apiClient.reportLoad(ctx, cc, loadReportingOptions{loadStore: lrsC.loadStore}) |
| } |