blob: f8e7673f7d8e71fa0aabca84667a9fa94fa5c800 [file] [log] [blame]
Menghan Li16057562020-08-13 11:47:23 -07001/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package lrs implements load reporting balancer for xds.
20package lrs
21
22import (
23 "encoding/json"
24 "fmt"
Menghan Li0dc99862020-09-22 14:26:20 -070025 "sync"
Menghan Li16057562020-08-13 11:47:23 -070026
27 "google.golang.org/grpc/attributes"
28 "google.golang.org/grpc/balancer"
29 "google.golang.org/grpc/internal/grpclog"
30 "google.golang.org/grpc/serviceconfig"
31 "google.golang.org/grpc/xds/internal"
32 xdsinternal "google.golang.org/grpc/xds/internal"
Easwar Swaminathanb5802b52020-09-01 16:56:52 -070033 "google.golang.org/grpc/xds/internal/client/load"
Menghan Li16057562020-08-13 11:47:23 -070034)
35
36func init() {
37 balancer.Register(&lrsBB{})
38}
39
40const lrsBalancerName = "lrs_experimental"
41
42type lrsBB struct{}
43
44func (l *lrsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
45 b := &lrsBalancer{
46 cc: cc,
47 buildOpts: opts,
48 }
Menghan Li0dc99862020-09-22 14:26:20 -070049 b.client = newXDSClientWrapper()
Menghan Li16057562020-08-13 11:47:23 -070050 b.logger = prefixLogger(b)
51 b.logger.Infof("Created")
52 return b
53}
54
55func (l *lrsBB) Name() string {
56 return lrsBalancerName
57}
58
59func (l *lrsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
60 return parseConfig(c)
61}
62
63type lrsBalancer struct {
64 cc balancer.ClientConn
65 buildOpts balancer.BuildOptions
66
Easwar Swaminathanb5802b52020-09-01 16:56:52 -070067 logger *grpclog.PrefixLogger
68 client *xdsClientWrapper
Menghan Li16057562020-08-13 11:47:23 -070069
70 config *lbConfig
71 lb balancer.Balancer // The sub balancer.
72}
73
74func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
75 newConfig, ok := s.BalancerConfig.(*lbConfig)
76 if !ok {
77 return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
78 }
79
Easwar Swaminathanb5802b52020-09-01 16:56:52 -070080 // Update load reporting config or xds client. This needs to be done before
81 // updating the child policy because we need the loadStore from the updated
82 // client to be passed to the ccWrapper.
Menghan Lia2232512020-10-26 15:47:47 -070083 if err := b.client.update(newConfig, s.ResolverState.Attributes); err != nil {
84 return err
85 }
Easwar Swaminathanb5802b52020-09-01 16:56:52 -070086
Menghan Li16057562020-08-13 11:47:23 -070087 // If child policy is a different type, recreate the sub-balancer.
88 if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
89 bb := balancer.Get(newConfig.ChildPolicy.Name)
90 if bb == nil {
91 return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
92 }
93 if b.lb != nil {
94 b.lb.Close()
95 }
Easwar Swaminathanb5802b52020-09-01 16:56:52 -070096 b.lb = bb.Build(newCCWrapper(b.cc, b.client.loadStore(), newConfig.Locality), b.buildOpts)
Menghan Li16057562020-08-13 11:47:23 -070097 }
Menghan Li16057562020-08-13 11:47:23 -070098 b.config = newConfig
99
100 // Addresses and sub-balancer config are sent to sub-balancer.
101 return b.lb.UpdateClientConnState(balancer.ClientConnState{
102 ResolverState: s.ResolverState,
103 BalancerConfig: b.config.ChildPolicy.Config,
104 })
105}
106
107func (b *lrsBalancer) ResolverError(err error) {
108 if b.lb != nil {
109 b.lb.ResolverError(err)
110 }
111}
112
113func (b *lrsBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
114 if b.lb != nil {
115 b.lb.UpdateSubConnState(sc, s)
116 }
117}
118
119func (b *lrsBalancer) Close() {
120 if b.lb != nil {
121 b.lb.Close()
122 b.lb = nil
123 }
124 b.client.close()
125}
126
127type ccWrapper struct {
128 balancer.ClientConn
Menghan Li0dc99862020-09-22 14:26:20 -0700129 loadStore load.PerClusterReporter
Menghan Li16057562020-08-13 11:47:23 -0700130 localityID *internal.LocalityID
131}
132
Menghan Li0dc99862020-09-22 14:26:20 -0700133func newCCWrapper(cc balancer.ClientConn, loadStore load.PerClusterReporter, localityID *internal.LocalityID) *ccWrapper {
Menghan Li16057562020-08-13 11:47:23 -0700134 return &ccWrapper{
135 ClientConn: cc,
136 loadStore: loadStore,
137 localityID: localityID,
138 }
139}
140
141func (ccw *ccWrapper) UpdateState(s balancer.State) {
142 s.Picker = newLoadReportPicker(s.Picker, *ccw.localityID, ccw.loadStore)
143 ccw.ClientConn.UpdateState(s)
144}
145
146// xdsClientInterface contains only the xds_client methods needed by LRS
147// balancer. It's defined so we can override xdsclient in tests.
148type xdsClientInterface interface {
Menghan Lia2232512020-10-26 15:47:47 -0700149 ReportLoad(server string) (*load.Store, func())
Menghan Li16057562020-08-13 11:47:23 -0700150 Close()
151}
152
Menghan Li0dc99862020-09-22 14:26:20 -0700153type loadStoreWrapper struct {
154 mu sync.RWMutex
Menghan Li0dc99862020-09-22 14:26:20 -0700155 cluster string
156 edsService string
Menghan Lia2232512020-10-26 15:47:47 -0700157 // Both store and perCluster will be nil if load reporting is disabled (EDS
158 // response doesn't have LRS server name). Note that methods on Store and
159 // perCluster all handle nil, so there's no need to check nil before calling
160 // them.
161 store *load.Store
Menghan Li5af60402020-10-07 10:39:58 -0700162 perCluster load.PerClusterReporter
Menghan Li0dc99862020-09-22 14:26:20 -0700163}
164
Menghan Lia2232512020-10-26 15:47:47 -0700165func (lsw *loadStoreWrapper) updateClusterAndService(cluster, edsService string) {
Menghan Li0dc99862020-09-22 14:26:20 -0700166 lsw.mu.Lock()
167 defer lsw.mu.Unlock()
Menghan Lia2232512020-10-26 15:47:47 -0700168 if cluster == lsw.cluster && edsService == lsw.edsService {
169 return
170 }
171 lsw.cluster = cluster
172 lsw.edsService = edsService
173 lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
174}
175
176func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
177 lsw.mu.Lock()
178 defer lsw.mu.Unlock()
179 if store == lsw.store {
Menghan Li5af60402020-10-07 10:39:58 -0700180 return
181 }
Menghan Li0dc99862020-09-22 14:26:20 -0700182 lsw.store = store
Menghan Lia2232512020-10-26 15:47:47 -0700183 lsw.perCluster = nil
Menghan Li5af60402020-10-07 10:39:58 -0700184 lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
Menghan Li0dc99862020-09-22 14:26:20 -0700185}
186
187func (lsw *loadStoreWrapper) CallStarted(locality string) {
188 lsw.mu.RLock()
189 defer lsw.mu.RUnlock()
Menghan Li5af60402020-10-07 10:39:58 -0700190 lsw.perCluster.CallStarted(locality)
Menghan Li0dc99862020-09-22 14:26:20 -0700191}
192
193func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
194 lsw.mu.RLock()
195 defer lsw.mu.RUnlock()
Menghan Li5af60402020-10-07 10:39:58 -0700196 lsw.perCluster.CallFinished(locality, err)
Menghan Li0dc99862020-09-22 14:26:20 -0700197}
198
199func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
200 lsw.mu.RLock()
201 defer lsw.mu.RUnlock()
Menghan Li5af60402020-10-07 10:39:58 -0700202 lsw.perCluster.CallServerLoad(locality, name, val)
Menghan Li0dc99862020-09-22 14:26:20 -0700203}
204
205func (lsw *loadStoreWrapper) CallDropped(category string) {
206 lsw.mu.RLock()
207 defer lsw.mu.RUnlock()
Menghan Li5af60402020-10-07 10:39:58 -0700208 lsw.perCluster.CallDropped(category)
Menghan Li0dc99862020-09-22 14:26:20 -0700209}
210
Menghan Li16057562020-08-13 11:47:23 -0700211type xdsClientWrapper struct {
Menghan Li16057562020-08-13 11:47:23 -0700212 c xdsClientInterface
213 cancelLoadReport func()
214 clusterName string
Menghan Li0dc99862020-09-22 14:26:20 -0700215 edsServiceName string
Menghan Li16057562020-08-13 11:47:23 -0700216 lrsServerName string
Menghan Lia2232512020-10-26 15:47:47 -0700217 // loadWrapper is a wrapper with loadOriginal, with clusterName and
218 // edsServiceName. It's used children to report loads.
219 loadWrapper *loadStoreWrapper
Menghan Li0dc99862020-09-22 14:26:20 -0700220}
221
222func newXDSClientWrapper() *xdsClientWrapper {
223 return &xdsClientWrapper{
Menghan Lia2232512020-10-26 15:47:47 -0700224 loadWrapper: &loadStoreWrapper{},
Menghan Li0dc99862020-09-22 14:26:20 -0700225 }
Menghan Li16057562020-08-13 11:47:23 -0700226}
227
Menghan Li16057562020-08-13 11:47:23 -0700228// update checks the config and xdsclient, and decides whether it needs to
229// restart the load reporting stream.
Menghan Lia2232512020-10-26 15:47:47 -0700230func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) error {
Menghan Li0dc99862020-09-22 14:26:20 -0700231 var (
Menghan Lia2232512020-10-26 15:47:47 -0700232 restartLoadReport bool
233 updateLoadClusterAndService bool
Menghan Li0dc99862020-09-22 14:26:20 -0700234 )
Menghan Lia2232512020-10-26 15:47:47 -0700235
236 if attr == nil {
237 return fmt.Errorf("lrs: failed to get xdsClient from attributes: attributes is nil")
238 }
239 clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface)
240 if clientFromAttr == nil {
241 return fmt.Errorf("lrs: failed to get xdsClient from attributes: xdsClient not found in attributes")
242 }
243
244 if w.c != clientFromAttr {
245 // xds client is different, restart.
246 restartLoadReport = true
247 w.c = clientFromAttr
Menghan Li16057562020-08-13 11:47:23 -0700248 }
249
250 // ClusterName is different, restart. ClusterName is from ClusterName and
251 // EdsServiceName.
Menghan Li0dc99862020-09-22 14:26:20 -0700252 if w.clusterName != newConfig.ClusterName {
Menghan Lia2232512020-10-26 15:47:47 -0700253 updateLoadClusterAndService = true
Menghan Li0dc99862020-09-22 14:26:20 -0700254 w.clusterName = newConfig.ClusterName
Menghan Li16057562020-08-13 11:47:23 -0700255 }
Menghan Li0dc99862020-09-22 14:26:20 -0700256 if w.edsServiceName != newConfig.EdsServiceName {
Menghan Lia2232512020-10-26 15:47:47 -0700257 updateLoadClusterAndService = true
Menghan Li0dc99862020-09-22 14:26:20 -0700258 w.edsServiceName = newConfig.EdsServiceName
Menghan Li16057562020-08-13 11:47:23 -0700259 }
260
Menghan Lia2232512020-10-26 15:47:47 -0700261 if updateLoadClusterAndService {
262 // This updates the clusterName and serviceName that will reported for the
263 // loads. The update here is too early, the perfect timing is when the
264 // picker is updated with the new connection. But from this balancer's point
265 // of view, it's impossible to tell.
266 //
267 // On the other hand, this will almost never happen. Each LRS policy
268 // shouldn't get updated config. The parent should do a graceful switch when
269 // the clusterName or serviceName is changed.
270 w.loadWrapper.updateClusterAndService(w.clusterName, w.edsServiceName)
271 }
272
Menghan Li16057562020-08-13 11:47:23 -0700273 if w.lrsServerName != newConfig.LrsLoadReportingServerName {
274 // LrsLoadReportingServerName is different, load should be report to a
275 // different server, restart.
276 restartLoadReport = true
277 w.lrsServerName = newConfig.LrsLoadReportingServerName
278 }
279
280 if restartLoadReport {
281 if w.cancelLoadReport != nil {
282 w.cancelLoadReport()
283 w.cancelLoadReport = nil
284 }
Menghan Lia2232512020-10-26 15:47:47 -0700285 var loadStore *load.Store
Menghan Li16057562020-08-13 11:47:23 -0700286 if w.c != nil {
Menghan Lia2232512020-10-26 15:47:47 -0700287 loadStore, w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName)
Menghan Li16057562020-08-13 11:47:23 -0700288 }
Menghan Lia2232512020-10-26 15:47:47 -0700289 w.loadWrapper.updateLoadStore(loadStore)
Menghan Li16057562020-08-13 11:47:23 -0700290 }
Menghan Lia2232512020-10-26 15:47:47 -0700291
292 return nil
Menghan Li16057562020-08-13 11:47:23 -0700293}
294
Menghan Li0dc99862020-09-22 14:26:20 -0700295func (w *xdsClientWrapper) loadStore() load.PerClusterReporter {
Menghan Lia2232512020-10-26 15:47:47 -0700296 return w.loadWrapper
Easwar Swaminathanb5802b52020-09-01 16:56:52 -0700297}
298
Menghan Li16057562020-08-13 11:47:23 -0700299func (w *xdsClientWrapper) close() {
300 if w.cancelLoadReport != nil {
301 w.cancelLoadReport()
302 w.cancelLoadReport = nil
303 }
304}