| /* |
| * |
| * Copyright 2020 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package rls |
| |
| import ( |
| "errors" |
| "time" |
| |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/balancer/rls/internal/cache" |
| "google.golang.org/grpc/balancer/rls/internal/keys" |
| "google.golang.org/grpc/metadata" |
| ) |
| |
| var errRLSThrottled = errors.New("RLS call throttled at client side") |
| |
| // RLS rlsPicker selects the subConn to be used for a particular RPC. It does |
| // not manage subConns directly and usually deletegates to pickers provided by |
| // child policies. |
| // |
| // The RLS LB policy creates a new rlsPicker object whenever its ServiceConfig |
| // is updated and provides a bunch of hooks for the rlsPicker to get the latest |
| // state that it can used to make its decision. |
| type rlsPicker struct { |
| // The keyBuilder map used to generate RLS keys for the RPC. This is built |
| // by the LB policy based on the received ServiceConfig. |
| kbm keys.BuilderMap |
| |
| // The following hooks are setup by the LB policy to enable the rlsPicker to |
| // access state stored in the policy. This approach has the following |
| // advantages: |
| // 1. The rlsPicker is loosely coupled with the LB policy in the sense that |
| // updates happening on the LB policy like the receipt of an RLS |
| // response, or an update to the default rlsPicker etc are not explicitly |
| // pushed to the rlsPicker, but are readily available to the rlsPicker |
| // when it invokes these hooks. And the LB policy takes care of |
| // synchronizing access to these shared state. |
| // 2. It makes unit testing the rlsPicker easy since any number of these |
| // hooks could be overridden. |
| |
| // readCache is used to read from the data cache and the pending request |
| // map in an atomic fashion. The first return parameter is the entry in the |
| // data cache, and the second indicates whether an entry for the same key |
| // is present in the pending cache. |
| readCache func(cache.Key) (*cache.Entry, bool) |
| // shouldThrottle decides if the current RPC should be throttled at the |
| // client side. It uses an adaptive throttling algorithm. |
| shouldThrottle func() bool |
| // startRLS kicks off an RLS request in the background for the provided RPC |
| // path and keyMap. An entry in the pending request map is created before |
| // sending out the request and an entry in the data cache is created or |
| // updated upon receipt of a response. See implementation in the LB policy |
| // for details. |
| startRLS func(string, keys.KeyMap) |
| // defaultPick enables the rlsPicker to delegate the pick decision to the |
| // rlsPicker returned by the child LB policy pointing to the default target |
| // specified in the service config. |
| defaultPick func(balancer.PickInfo) (balancer.PickResult, error) |
| } |
| |
| // Pick makes the routing decision for every outbound RPC. |
| func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { |
| // For every incoming request, we first build the RLS keys using the |
| // keyBuilder we received from the LB policy. If no metadata is present in |
| // the context, we end up using an empty key. |
| km := keys.KeyMap{} |
| md, ok := metadata.FromOutgoingContext(info.Ctx) |
| if ok { |
| km = p.kbm.RLSKey(md, info.FullMethodName) |
| } |
| |
| // We use the LB policy hook to read the data cache and the pending request |
| // map (whether or not an entry exists) for the RPC path and the generated |
| // RLS keys. We will end up kicking off an RLS request only if there is no |
| // pending request for the current RPC path and keys, and either we didn't |
| // find an entry in the data cache or the entry was stale and it wasn't in |
| // backoff. |
| startRequest := false |
| now := time.Now() |
| entry, pending := p.readCache(cache.Key{Path: info.FullMethodName, KeyMap: km.Str}) |
| if entry == nil { |
| startRequest = true |
| } else { |
| entry.Mu.Lock() |
| defer entry.Mu.Unlock() |
| if entry.StaleTime.Before(now) && entry.BackoffTime.Before(now) { |
| // This is the proactive cache refresh. |
| startRequest = true |
| } |
| } |
| |
| if startRequest && !pending { |
| if p.shouldThrottle() { |
| // The entry doesn't exist or has expired and the new RLS request |
| // has been throttled. Treat it as an error and delegate to default |
| // pick, if one exists, or fail the pick. |
| if entry == nil || entry.ExpiryTime.Before(now) { |
| if p.defaultPick != nil { |
| return p.defaultPick(info) |
| } |
| return balancer.PickResult{}, errRLSThrottled |
| } |
| // The proactive refresh has been throttled. Nothing to worry, just |
| // keep using the existing entry. |
| } else { |
| p.startRLS(info.FullMethodName, km) |
| } |
| } |
| |
| if entry != nil { |
| if entry.ExpiryTime.After(now) { |
| // This is the jolly good case where we have found a valid entry in |
| // the data cache. We delegate to the LB policy associated with |
| // this cache entry. |
| return entry.ChildPicker.Pick(info) |
| } else if entry.BackoffTime.After(now) { |
| // The entry has expired, but is in backoff. We delegate to the |
| // default pick, if one exists, or return the error from the last |
| // failed RLS request for this entry. |
| if p.defaultPick != nil { |
| return p.defaultPick(info) |
| } |
| return balancer.PickResult{}, entry.CallStatus |
| } |
| } |
| |
| // We get here only in the following cases: |
| // * No data cache entry or expired entry, RLS request sent out |
| // * No valid data cache entry and Pending cache entry exists |
| // We need to queue to pick which will be handled once the RLS response is |
| // received. |
| return balancer.PickResult{}, balancer.ErrNoSubConnAvailable |
| } |