| /* |
| * |
| * Copyright 2020 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| // Package adaptive provides functionality for adaptive client-side throttling. |
| package adaptive |
| |
| import ( |
| "sync" |
| "time" |
| |
| "google.golang.org/grpc/internal/grpcrand" |
| ) |
| |
| // For overriding in unittests. |
| var ( |
| timeNowFunc = func() time.Time { return time.Now() } |
| randFunc = func() float64 { return grpcrand.Float64() } |
| ) |
| |
| const ( |
| defaultDuration = 30 * time.Second |
| defaultBins = 100 |
| defaultRatioForAccepts = 2.0 |
| defaultRequestsPadding = 8.0 |
| ) |
| |
| // Throttler implements a client-side throttling recommendation system. All |
| // methods are safe for concurrent use by multiple goroutines. |
| // |
| // The throttler has the following knobs for which we will use defaults for |
| // now. If there is a need to make them configurable at a later point in time, |
| // support for the same will be added. |
| // * Duration: amount of recent history that will be taken into account for |
| // making client-side throttling decisions. A default of 30 seconds is used. |
| // * Bins: number of bins to be used for bucketing historical data. A default |
| // of 100 is used. |
| // * RatioForAccepts: ratio by which accepts are multiplied, typically a value |
| // slightly larger than 1.0. This is used to make the throttler behave as if |
| // the backend had accepted more requests than it actually has, which lets us |
| // err on the side of sending to the backend more requests than we think it |
| // will accept for the sake of speeding up the propagation of state. A |
| // default of 2.0 is used. |
| // * RequestsPadding: is used to decrease the (client-side) throttling |
| // probability in the low QPS regime (to speed up propagation of state), as |
| // well as to safeguard against hitting a client-side throttling probability |
| // of 100%. The weight of this value decreases as the number of requests in |
| // recent history grows. A default of 8 is used. |
| // |
| // The adaptive throttler attempts to estimate the probability that a request |
| // will be throttled using recent history. Server requests (both throttled and |
| // accepted) are registered with the throttler (via the RegisterBackendResponse |
| // method), which then recommends client-side throttling (via the |
| // ShouldThrottle method) with probability given by: |
| // (requests - RatioForAccepts * accepts) / (requests + RequestsPadding) |
| type Throttler struct { |
| ratioForAccepts float64 |
| requestsPadding float64 |
| |
| // Number of total accepts and throttles in the lookback period. |
| mu sync.Mutex |
| accepts *lookback |
| throttles *lookback |
| } |
| |
| // New initializes a new adaptive throttler with the default values. |
| func New() *Throttler { |
| return newWithArgs(defaultDuration, defaultBins, defaultRatioForAccepts, defaultRequestsPadding) |
| } |
| |
| // newWithArgs initializes a new adaptive throttler with the provided values. |
| // Used only in unittests. |
| func newWithArgs(duration time.Duration, bins int64, ratioForAccepts, requestsPadding float64) *Throttler { |
| return &Throttler{ |
| ratioForAccepts: ratioForAccepts, |
| requestsPadding: requestsPadding, |
| accepts: newLookback(bins, duration), |
| throttles: newLookback(bins, duration), |
| } |
| } |
| |
| // ShouldThrottle returns a probabilistic estimate of whether the server would |
| // throttle the next request. This should be called for every request before |
| // allowing it to hit the network. If the returned value is true, the request |
| // should be aborted immediately (as if it had been throttled by the server). |
| func (t *Throttler) ShouldThrottle() bool { |
| randomProbability := randFunc() |
| now := timeNowFunc() |
| |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| |
| accepts, throttles := float64(t.accepts.sum(now)), float64(t.throttles.sum(now)) |
| requests := accepts + throttles |
| throttleProbability := (requests - t.ratioForAccepts*accepts) / (requests + t.requestsPadding) |
| if throttleProbability <= randomProbability { |
| return false |
| } |
| |
| t.throttles.add(now, 1) |
| return true |
| } |
| |
| // RegisterBackendResponse registers a response received from the backend for a |
| // request allowed by ShouldThrottle. This should be called for every response |
| // received from the backend (i.e., once for each request for which |
| // ShouldThrottle returned false). |
| func (t *Throttler) RegisterBackendResponse(throttled bool) { |
| now := timeNowFunc() |
| |
| t.mu.Lock() |
| if throttled { |
| t.throttles.add(now, 1) |
| } else { |
| t.accepts.add(now, 1) |
| } |
| t.mu.Unlock() |
| } |