Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 1 | /* |
| 2 | * |
| 3 | * Copyright 2020 gRPC authors. |
| 4 | * |
| 5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | * you may not use this file except in compliance with the License. |
| 7 | * You may obtain a copy of the License at |
| 8 | * |
| 9 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | * |
| 11 | * Unless required by applicable law or agreed to in writing, software |
| 12 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | * See the License for the specific language governing permissions and |
| 15 | * limitations under the License. |
| 16 | * |
| 17 | */ |
| 18 | |
| 19 | // Package lrs implements load reporting balancer for xds. |
| 20 | package lrs |
| 21 | |
| 22 | import ( |
| 23 | "encoding/json" |
| 24 | "fmt" |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 25 | "sync" |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 26 | |
| 27 | "google.golang.org/grpc/attributes" |
| 28 | "google.golang.org/grpc/balancer" |
| 29 | "google.golang.org/grpc/internal/grpclog" |
| 30 | "google.golang.org/grpc/serviceconfig" |
| 31 | "google.golang.org/grpc/xds/internal" |
| 32 | xdsinternal "google.golang.org/grpc/xds/internal" |
Easwar Swaminathan | b5802b5 | 2020-09-01 16:56:52 -0700 | [diff] [blame] | 33 | "google.golang.org/grpc/xds/internal/client/load" |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 34 | ) |
| 35 | |
| 36 | func init() { |
| 37 | balancer.Register(&lrsBB{}) |
| 38 | } |
| 39 | |
// lrsBalancerName is the name reported by lrsBB.Name for this balancer.
const lrsBalancerName = "lrs_experimental"
| 41 | |
// lrsBB is the balancer builder for the LRS balancer. It carries no state;
// an instance is registered with the balancer package in init.
type lrsBB struct{}
| 43 | |
| 44 | func (l *lrsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { |
| 45 | b := &lrsBalancer{ |
| 46 | cc: cc, |
| 47 | buildOpts: opts, |
| 48 | } |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 49 | b.client = newXDSClientWrapper() |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 50 | b.logger = prefixLogger(b) |
| 51 | b.logger.Infof("Created") |
| 52 | return b |
| 53 | } |
| 54 | |
| 55 | func (l *lrsBB) Name() string { |
| 56 | return lrsBalancerName |
| 57 | } |
| 58 | |
| 59 | func (l *lrsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { |
| 60 | return parseConfig(c) |
| 61 | } |
| 62 | |
// lrsBalancer is a balancer that forwards all balancer calls to a child
// balancer (lb), and gives that child a wrapped ClientConn so that pickers it
// produces are wrapped for load reporting.
type lrsBalancer struct {
	// cc is the parent ClientConn; it is wrapped before being handed to the
	// child balancer.
	cc balancer.ClientConn
	// buildOpts are the options this balancer was built with; they are reused
	// when building the child balancer.
	buildOpts balancer.BuildOptions

	logger *grpclog.PrefixLogger
	// client tracks the xds client and load reporting state.
	client *xdsClientWrapper

	// config is the most recently applied config; nil until the first
	// successful UpdateClientConnState.
	config *lbConfig
	lb     balancer.Balancer // The sub balancer.
}
| 73 | |
| 74 | func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { |
| 75 | newConfig, ok := s.BalancerConfig.(*lbConfig) |
| 76 | if !ok { |
| 77 | return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) |
| 78 | } |
| 79 | |
Easwar Swaminathan | b5802b5 | 2020-09-01 16:56:52 -0700 | [diff] [blame] | 80 | // Update load reporting config or xds client. This needs to be done before |
| 81 | // updating the child policy because we need the loadStore from the updated |
| 82 | // client to be passed to the ccWrapper. |
Menghan Li | a223251 | 2020-10-26 15:47:47 -0700 | [diff] [blame] | 83 | if err := b.client.update(newConfig, s.ResolverState.Attributes); err != nil { |
| 84 | return err |
| 85 | } |
Easwar Swaminathan | b5802b5 | 2020-09-01 16:56:52 -0700 | [diff] [blame] | 86 | |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 87 | // If child policy is a different type, recreate the sub-balancer. |
| 88 | if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name { |
| 89 | bb := balancer.Get(newConfig.ChildPolicy.Name) |
| 90 | if bb == nil { |
| 91 | return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name) |
| 92 | } |
| 93 | if b.lb != nil { |
| 94 | b.lb.Close() |
| 95 | } |
Easwar Swaminathan | b5802b5 | 2020-09-01 16:56:52 -0700 | [diff] [blame] | 96 | b.lb = bb.Build(newCCWrapper(b.cc, b.client.loadStore(), newConfig.Locality), b.buildOpts) |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 97 | } |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 98 | b.config = newConfig |
| 99 | |
| 100 | // Addresses and sub-balancer config are sent to sub-balancer. |
| 101 | return b.lb.UpdateClientConnState(balancer.ClientConnState{ |
| 102 | ResolverState: s.ResolverState, |
| 103 | BalancerConfig: b.config.ChildPolicy.Config, |
| 104 | }) |
| 105 | } |
| 106 | |
| 107 | func (b *lrsBalancer) ResolverError(err error) { |
| 108 | if b.lb != nil { |
| 109 | b.lb.ResolverError(err) |
| 110 | } |
| 111 | } |
| 112 | |
| 113 | func (b *lrsBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { |
| 114 | if b.lb != nil { |
| 115 | b.lb.UpdateSubConnState(sc, s) |
| 116 | } |
| 117 | } |
| 118 | |
| 119 | func (b *lrsBalancer) Close() { |
| 120 | if b.lb != nil { |
| 121 | b.lb.Close() |
| 122 | b.lb = nil |
| 123 | } |
| 124 | b.client.close() |
| 125 | } |
| 126 | |
// ccWrapper wraps a balancer.ClientConn for the child balancer. All
// ClientConn methods are inherited from the embedded value except
// UpdateState, which wraps the picker with a load-reporting picker.
type ccWrapper struct {
	balancer.ClientConn
	// loadStore is the reporter passed to every load-reporting picker.
	loadStore load.PerClusterReporter
	// localityID is dereferenced on every UpdateState call.
	// NOTE(review): presumably never nil (validated by config parsing) — confirm.
	localityID *internal.LocalityID
}
| 132 | |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 133 | func newCCWrapper(cc balancer.ClientConn, loadStore load.PerClusterReporter, localityID *internal.LocalityID) *ccWrapper { |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 134 | return &ccWrapper{ |
| 135 | ClientConn: cc, |
| 136 | loadStore: loadStore, |
| 137 | localityID: localityID, |
| 138 | } |
| 139 | } |
| 140 | |
| 141 | func (ccw *ccWrapper) UpdateState(s balancer.State) { |
| 142 | s.Picker = newLoadReportPicker(s.Picker, *ccw.localityID, ccw.loadStore) |
| 143 | ccw.ClientConn.UpdateState(s) |
| 144 | } |
| 145 | |
// xdsClientInterface contains only the xds_client methods needed by the LRS
// balancer. It is defined so that the xds client can be overridden in tests.
type xdsClientInterface interface {
	// ReportLoad starts load reporting to the given server. The returned
	// store is where loads are recorded, and the returned function is called
	// by this package to cancel the load report.
	ReportLoad(server string) (*load.Store, func())
	// Close is part of the interface but is not called from the code shown
	// here. NOTE(review): presumably closes the client — confirm.
	Close()
}
| 152 | |
// loadStoreWrapper forwards load reports to the per-cluster reporter obtained
// from the current load store for the current cluster and EDS service names.
// The store and the names can be swapped independently at runtime.
type loadStoreWrapper struct {
	// mu guards all fields below: the update methods take the write lock, the
	// Call* reporting methods take the read lock.
	mu         sync.RWMutex
	cluster    string
	edsService string
	// Both store and perCluster will be nil if load reporting is disabled (EDS
	// response doesn't have LRS server name). Note that methods on Store and
	// perCluster all handle nil, so there's no need to check nil before calling
	// them.
	//
	// NOTE(review): perCluster is an interface-typed field; a method call on a
	// nil interface value panics regardless of nil-receiver handling in the
	// implementation. Confirm that it is always assigned before first use.
	store      *load.Store
	perCluster load.PerClusterReporter
}
| 164 | |
Menghan Li | a223251 | 2020-10-26 15:47:47 -0700 | [diff] [blame] | 165 | func (lsw *loadStoreWrapper) updateClusterAndService(cluster, edsService string) { |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 166 | lsw.mu.Lock() |
| 167 | defer lsw.mu.Unlock() |
Menghan Li | a223251 | 2020-10-26 15:47:47 -0700 | [diff] [blame] | 168 | if cluster == lsw.cluster && edsService == lsw.edsService { |
| 169 | return |
| 170 | } |
| 171 | lsw.cluster = cluster |
| 172 | lsw.edsService = edsService |
| 173 | lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService) |
| 174 | } |
| 175 | |
| 176 | func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) { |
| 177 | lsw.mu.Lock() |
| 178 | defer lsw.mu.Unlock() |
| 179 | if store == lsw.store { |
Menghan Li | 5af6040 | 2020-10-07 10:39:58 -0700 | [diff] [blame] | 180 | return |
| 181 | } |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 182 | lsw.store = store |
Menghan Li | a223251 | 2020-10-26 15:47:47 -0700 | [diff] [blame] | 183 | lsw.perCluster = nil |
Menghan Li | 5af6040 | 2020-10-07 10:39:58 -0700 | [diff] [blame] | 184 | lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService) |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 185 | } |
| 186 | |
| 187 | func (lsw *loadStoreWrapper) CallStarted(locality string) { |
| 188 | lsw.mu.RLock() |
| 189 | defer lsw.mu.RUnlock() |
Menghan Li | 5af6040 | 2020-10-07 10:39:58 -0700 | [diff] [blame] | 190 | lsw.perCluster.CallStarted(locality) |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 191 | } |
| 192 | |
| 193 | func (lsw *loadStoreWrapper) CallFinished(locality string, err error) { |
| 194 | lsw.mu.RLock() |
| 195 | defer lsw.mu.RUnlock() |
Menghan Li | 5af6040 | 2020-10-07 10:39:58 -0700 | [diff] [blame] | 196 | lsw.perCluster.CallFinished(locality, err) |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 197 | } |
| 198 | |
| 199 | func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) { |
| 200 | lsw.mu.RLock() |
| 201 | defer lsw.mu.RUnlock() |
Menghan Li | 5af6040 | 2020-10-07 10:39:58 -0700 | [diff] [blame] | 202 | lsw.perCluster.CallServerLoad(locality, name, val) |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 203 | } |
| 204 | |
| 205 | func (lsw *loadStoreWrapper) CallDropped(category string) { |
| 206 | lsw.mu.RLock() |
| 207 | defer lsw.mu.RUnlock() |
Menghan Li | 5af6040 | 2020-10-07 10:39:58 -0700 | [diff] [blame] | 208 | lsw.perCluster.CallDropped(category) |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 209 | } |
| 210 | |
// xdsClientWrapper tracks the xds client currently in use together with the
// load reporting parameters last applied, so that update can tell when the
// load reporting stream has to be restarted.
type xdsClientWrapper struct {
	// c is the xds client most recently taken from the resolver attributes.
	c xdsClientInterface
	// cancelLoadReport cancels the active load report; nil when none is
	// active.
	cancelLoadReport func()
	clusterName      string
	edsServiceName   string
	lrsServerName    string
	// loadWrapper wraps the load store returned by the xds client together
	// with clusterName and edsServiceName. It is handed out via loadStore and
	// used by the child balancer's pickers to report loads.
	loadWrapper *loadStoreWrapper
}
| 221 | |
| 222 | func newXDSClientWrapper() *xdsClientWrapper { |
| 223 | return &xdsClientWrapper{ |
Menghan Li | a223251 | 2020-10-26 15:47:47 -0700 | [diff] [blame] | 224 | loadWrapper: &loadStoreWrapper{}, |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 225 | } |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 226 | } |
| 227 | |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 228 | // update checks the config and xdsclient, and decides whether it needs to |
| 229 | // restart the load reporting stream. |
Menghan Li | a223251 | 2020-10-26 15:47:47 -0700 | [diff] [blame] | 230 | func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) error { |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 231 | var ( |
Menghan Li | a223251 | 2020-10-26 15:47:47 -0700 | [diff] [blame] | 232 | restartLoadReport bool |
| 233 | updateLoadClusterAndService bool |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 234 | ) |
Menghan Li | a223251 | 2020-10-26 15:47:47 -0700 | [diff] [blame] | 235 | |
| 236 | if attr == nil { |
| 237 | return fmt.Errorf("lrs: failed to get xdsClient from attributes: attributes is nil") |
| 238 | } |
| 239 | clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface) |
| 240 | if clientFromAttr == nil { |
| 241 | return fmt.Errorf("lrs: failed to get xdsClient from attributes: xdsClient not found in attributes") |
| 242 | } |
| 243 | |
| 244 | if w.c != clientFromAttr { |
| 245 | // xds client is different, restart. |
| 246 | restartLoadReport = true |
| 247 | w.c = clientFromAttr |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 248 | } |
| 249 | |
| 250 | // ClusterName is different, restart. ClusterName is from ClusterName and |
| 251 | // EdsServiceName. |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 252 | if w.clusterName != newConfig.ClusterName { |
Menghan Li | a223251 | 2020-10-26 15:47:47 -0700 | [diff] [blame] | 253 | updateLoadClusterAndService = true |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 254 | w.clusterName = newConfig.ClusterName |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 255 | } |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 256 | if w.edsServiceName != newConfig.EdsServiceName { |
Menghan Li | a223251 | 2020-10-26 15:47:47 -0700 | [diff] [blame] | 257 | updateLoadClusterAndService = true |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 258 | w.edsServiceName = newConfig.EdsServiceName |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 259 | } |
| 260 | |
Menghan Li | a223251 | 2020-10-26 15:47:47 -0700 | [diff] [blame] | 261 | if updateLoadClusterAndService { |
| 262 | // This updates the clusterName and serviceName that will reported for the |
| 263 | // loads. The update here is too early, the perfect timing is when the |
| 264 | // picker is updated with the new connection. But from this balancer's point |
| 265 | // of view, it's impossible to tell. |
| 266 | // |
| 267 | // On the other hand, this will almost never happen. Each LRS policy |
| 268 | // shouldn't get updated config. The parent should do a graceful switch when |
| 269 | // the clusterName or serviceName is changed. |
| 270 | w.loadWrapper.updateClusterAndService(w.clusterName, w.edsServiceName) |
| 271 | } |
| 272 | |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 273 | if w.lrsServerName != newConfig.LrsLoadReportingServerName { |
| 274 | // LrsLoadReportingServerName is different, load should be report to a |
| 275 | // different server, restart. |
| 276 | restartLoadReport = true |
| 277 | w.lrsServerName = newConfig.LrsLoadReportingServerName |
| 278 | } |
| 279 | |
| 280 | if restartLoadReport { |
| 281 | if w.cancelLoadReport != nil { |
| 282 | w.cancelLoadReport() |
| 283 | w.cancelLoadReport = nil |
| 284 | } |
Menghan Li | a223251 | 2020-10-26 15:47:47 -0700 | [diff] [blame] | 285 | var loadStore *load.Store |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 286 | if w.c != nil { |
Menghan Li | a223251 | 2020-10-26 15:47:47 -0700 | [diff] [blame] | 287 | loadStore, w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName) |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 288 | } |
Menghan Li | a223251 | 2020-10-26 15:47:47 -0700 | [diff] [blame] | 289 | w.loadWrapper.updateLoadStore(loadStore) |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 290 | } |
Menghan Li | a223251 | 2020-10-26 15:47:47 -0700 | [diff] [blame] | 291 | |
| 292 | return nil |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 293 | } |
| 294 | |
Menghan Li | 0dc9986 | 2020-09-22 14:26:20 -0700 | [diff] [blame] | 295 | func (w *xdsClientWrapper) loadStore() load.PerClusterReporter { |
Menghan Li | a223251 | 2020-10-26 15:47:47 -0700 | [diff] [blame] | 296 | return w.loadWrapper |
Easwar Swaminathan | b5802b5 | 2020-09-01 16:56:52 -0700 | [diff] [blame] | 297 | } |
| 298 | |
Menghan Li | 1605756 | 2020-08-13 11:47:23 -0700 | [diff] [blame] | 299 | func (w *xdsClientWrapper) close() { |
| 300 | if w.cancelLoadReport != nil { |
| 301 | w.cancelLoadReport() |
| 302 | w.cancelLoadReport = nil |
| 303 | } |
| 304 | } |