package events

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/sirupsen/logrus"
)
// RetryingSink retries the write until success or an ErrSinkClosed is
// returned. The underlying sink must have a nonzero probability of
// succeeding, or the sink will block. Retry behavior is configured with a
// RetryStrategy. Concurrent calls to a retrying sink are serialized through
// the sink, meaning that if one is in-flight, another will not proceed.
type RetryingSink struct {
	sink     Sink
	strategy RetryStrategy
	closed   chan struct{}
	once     sync.Once
}

// NewRetryingSink returns a sink that will retry writes to sink, backing
// off on failure according to the provided RetryStrategy.
func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink {
	rs := &RetryingSink{
		sink:     sink,
		strategy: strategy,
		closed:   make(chan struct{}),
	}

	return rs
}
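
// Usage sketch (illustrative; baseSink stands in for any Sink implementation
// and event for any Event value in this package):
//
//	sink := NewRetryingSink(baseSink, NewBreaker(10, time.Second))
//	defer sink.Close()
//	if err := sink.Write(event); err != nil {
//		// ErrSinkClosed is the only error a retrying sink surfaces.
//	}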

// Write attempts to flush the event to the downstream sink until it succeeds
// or the sink is closed.
func (rs *RetryingSink) Write(event Event) error {
	logger := logrus.WithField("event", event)

retry:
	select {
	case <-rs.closed:
		return ErrSinkClosed
	default:
	}

	if backoff := rs.strategy.Proceed(event); backoff > 0 {
		select {
		case <-time.After(backoff):
			// TODO(stevvooe): This branch holds up the next try. Before, we
			// would simply break to the "retry" label and then possibly wait
			// again. However, this requires all retry strategies to have a
			// large probability of probing the sink for success, rather than
			// just backing off and sending the request.
		case <-rs.closed:
			return ErrSinkClosed
		}
	}

	if err := rs.sink.Write(event); err != nil {
		if err == ErrSinkClosed {
			// terminal!
			return err
		}

		logger := logger.WithError(err) // shadow the outer logger to attach the error.

		if rs.strategy.Failure(event, err) {
			logger.Errorf("retryingsink: dropped event")
			return nil
		}

		logger.Errorf("retryingsink: error writing event, retrying")
		goto retry
	}

	rs.strategy.Success(event)
	return nil
}

// Close closes the sink, unblocking any pending or future writes. Note that
// the underlying sink is not closed.
func (rs *RetryingSink) Close() error {
	rs.once.Do(func() {
		close(rs.closed)
	})

	return nil
}

func (rs *RetryingSink) String() string {
	// Serialize a copy of the RetryingSink without the sync.Once, to avoid
	// a data race.
	rs2 := map[string]interface{}{
		"sink":     rs.sink,
		"strategy": rs.strategy,
		"closed":   rs.closed,
	}
	return fmt.Sprint(rs2)
}

// RetryStrategy defines a strategy for retrying event sink writes.
//
// All methods should be goroutine safe.
type RetryStrategy interface {
	// Proceed is called before every event send. If Proceed returns a
	// non-zero, positive duration, the retryer will back off for that
	// duration before writing.
	//
	// An event is provided, but may be ignored.
	Proceed(event Event) time.Duration

	// Failure reports a failure to the strategy. If this method returns true,
	// the event should be dropped.
	Failure(event Event, err error) bool

	// Success should be called when an event is sent successfully.
	Success(event Event)
}
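
// A minimal custom strategy, sketched for illustration (hypothetical; not
// part of this package): retry after a fixed delay and drop the event once
// maxAttempts consecutive failures accumulate. Like ExponentialBackoff below,
// this counts failures globally rather than per event; a production strategy
// would likely track attempts per event.
//
//	type fixedDelay struct {
//		delay       time.Duration
//		maxAttempts int64
//		attempts    int64 // accessed atomically
//	}
//
//	func (f *fixedDelay) Proceed(event Event) time.Duration {
//		if atomic.LoadInt64(&f.attempts) == 0 {
//			return 0
//		}
//		return f.delay
//	}
//
//	func (f *fixedDelay) Failure(event Event, err error) bool {
//		return atomic.AddInt64(&f.attempts, 1) >= f.maxAttempts
//	}
//
//	func (f *fixedDelay) Success(event Event) {
//		atomic.StoreInt64(&f.attempts, 0)
//	}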

// Breaker implements a circuit breaker retry strategy.
//
// The current implementation never drops events.
type Breaker struct {
	threshold int
	recent    int
	last      time.Time
	backoff   time.Duration // time after which we retry after failure.
	mu        sync.Mutex
}

var _ RetryStrategy = &Breaker{}

// NewBreaker returns a breaker that will back off after the threshold has
// been tripped. A Breaker is goroutine safe and may be shared by many
// goroutines.
func NewBreaker(threshold int, backoff time.Duration) *Breaker {
	return &Breaker{
		threshold: threshold,
		backoff:   backoff,
	}
}

// Proceed checks the failures against the threshold.
func (b *Breaker) Proceed(event Event) time.Duration {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.recent < b.threshold {
		return 0
	}

	return time.Until(b.last.Add(b.backoff))
}

// Success resets the breaker.
func (b *Breaker) Success(event Event) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.recent = 0
	b.last = time.Time{}
}

// Failure records the failure and latest failure time.
func (b *Breaker) Failure(event Event, err error) bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.recent++
	b.last = time.Now().UTC()
	return false // never drop events.
}
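
// Illustrative behavior, assuming NewBreaker(3, 5*time.Second): writes
// proceed immediately until three consecutive failures accumulate. From then
// on, Proceed returns whatever remains of the 5s window measured from the
// most recent failure, so callers wait that out before probing the sink
// again. A single success resets the breaker:
//
//	b := NewBreaker(3, 5*time.Second)
//	b.Failure(event, err) // recent=1, writes still proceed immediately
//	b.Failure(event, err) // recent=2, writes still proceed immediately
//	b.Failure(event, err) // recent=3, threshold tripped
//	b.Proceed(event)      // ~5s: wait out the window before retrying
//	b.Success(event)      // breaker reset; Proceed returns 0 again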

var (
	// DefaultExponentialBackoffConfig provides a default configuration for
	// exponential backoff.
	DefaultExponentialBackoffConfig = ExponentialBackoffConfig{
		Base:   time.Second,
		Factor: time.Second,
		Max:    20 * time.Second,
	}
)

// ExponentialBackoffConfig configures backoff parameters.
//
// Note that these parameters operate on the upper bound for choosing a random
// value. For example, at Base=1s, a random value in [0,1s) will be chosen for
// the backoff value.
type ExponentialBackoffConfig struct {
	// Base is the minimum bound for backing off after failure.
	Base time.Duration

	// Factor sets the amount of time by which the backoff grows with each
	// failure.
	Factor time.Duration

	// Max is the absolute maximum bound for a single backoff.
	Max time.Duration
}

// ExponentialBackoff implements random backoff with exponentially increasing
// bounds as the number of consecutive failures increases.
type ExponentialBackoff struct {
	failures uint64 // consecutive failure counter (needs to be 64-bit aligned)
	config   ExponentialBackoffConfig
}

// NewExponentialBackoff returns an exponential backoff strategy with the
// desired config. Zero-valued Factor and Max fields fall back to the values
// in DefaultExponentialBackoffConfig.
func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff {
	return &ExponentialBackoff{
		config: config,
	}
}
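
// Usage sketch (illustrative; baseSink stands in for any Sink
// implementation):
//
//	strategy := NewExponentialBackoff(DefaultExponentialBackoffConfig)
//	sink := NewRetryingSink(baseSink, strategy)
//	defer sink.Close()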

// Proceed returns the next randomly bound exponential backoff time.
func (b *ExponentialBackoff) Proceed(event Event) time.Duration {
	return b.backoff(atomic.LoadUint64(&b.failures))
}

// Success resets the failures counter.
func (b *ExponentialBackoff) Success(event Event) {
	atomic.StoreUint64(&b.failures, 0)
}

// Failure increments the failure counter.
func (b *ExponentialBackoff) Failure(event Event, err error) bool {
	atomic.AddUint64(&b.failures, 1)
	return false
}

// backoff calculates the amount of time to wait based on the number of
// consecutive failures.
func (b *ExponentialBackoff) backoff(failures uint64) time.Duration {
	if failures == 0 {
		// proceed normally when there are no failures.
		return 0
	}

	factor := b.config.Factor
	if factor <= 0 {
		factor = DefaultExponentialBackoffConfig.Factor
	}

	backoff := b.config.Base + factor*time.Duration(1<<(failures-1))

	max := b.config.Max
	if max <= 0 {
		max = DefaultExponentialBackoffConfig.Max
	}

	if backoff > max || backoff < 0 {
		backoff = max
	}

	// Choose a uniformly distributed value from [0, backoff).
	return time.Duration(rand.Int63n(int64(backoff)))
}
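
// Worked example with the default configuration (Base=1s, Factor=1s,
// Max=20s), where bound = Base + Factor<<(failures-1):
//
//	failures=1: 1s + 1s<<0 = 2s
//	failures=2: 1s + 1s<<1 = 3s
//	failures=3: 1s + 1s<<2 = 5s
//	failures=5: 1s + 1s<<4 = 17s
//	failures=6: 1s + 1s<<5 = 33s -> clamped to Max (20s)
//
// The actual wait returned by Proceed is then drawn uniformly from
// [0, bound).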