fix(pubsub): retry deadline exceeded errors in Acknowledge (#3157)
* fix(pubsub): retry deadline exceeded errors in Acknowledge
* add comment about gapic handling unavailable
* handle context deadline exceeded errors as well
* add backoff to context deadline exceeded errors
* fix wording of comments (transparent over transient)
diff --git a/pubsub/iterator.go b/pubsub/iterator.go
index 2675d42..49a3742 100644
--- a/pubsub/iterator.go
+++ b/pubsub/iterator.go
@@ -385,12 +385,53 @@
return it.sendAckIDRPC(m, maxPayload-overhead, func(ids []string) error {
recordStat(it.ctx, AckCount, int64(len(ids)))
addAcks(ids)
- // Use context.Background() as the call's context, not it.ctx. We don't
- // want to cancel this RPC when the iterator is stopped.
- return it.subc.Acknowledge(context.Background(), &pb.AcknowledgeRequest{
- Subscription: it.subName,
- AckIds: ids,
- })
+ bo := gax.Backoff{
+ Initial: 100 * time.Millisecond,
+ Max: time.Second,
+ Multiplier: 2,
+ }
+ cctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+ defer cancel()
+ for {
+ // Derive the call's context from context.Background(), not it.ctx. We
+ // don't want to cancel this RPC when the iterator is stopped.
+ cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
+ defer cancel2()
+ err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{
+ Subscription: it.subName,
+ AckIds: ids,
+ })
+ // Retry DeadlineExceeded errors a few times before giving up and
+ // allowing the message to expire and be redelivered.
+ // The underlying library handles other retries, currently only
+ // codes.Unavailable.
+ switch status.Code(err) {
+ case codes.DeadlineExceeded:
+ // Use the outer context with timeout here. Errors from gax, including
+ // context deadline exceeded, should be transparent, as unacked messages
+ // will be redelivered.
+ if err := gax.Sleep(cctx, bo.Pause()); err != nil {
+ return nil
+ }
+ default:
+ if err == nil {
+ return nil
+ }
+ // This addresses an error where `context deadline exceeded` errors
+ // not captured by the previous case cause fatal errors.
+ // See https://github.com/googleapis/google-cloud-go/issues/3060
+ if strings.Contains(err.Error(), "context deadline exceeded") {
+ // Context deadline exceeded errors here should be transparent
+ // to prevent the iterator from shutting down.
+ if err := gax.Sleep(cctx, bo.Pause()); err != nil {
+ return nil
+ }
+ continue
+ }
+ // Any other error is fatal.
+ return err
+ }
+ }
})
}
@@ -443,7 +484,7 @@
return nil
}
// This addresses an error where `context deadline exceeded` errors
- // not captured by the previous case do not cause fatal errors.
+ // not captured by the previous case cause fatal errors.
// See https://github.com/googleapis/google-cloud-go/issues/3060
if strings.Contains(err.Error(), "context deadline exceeded") {
recordStat(it.ctx, ModAckTimeoutCount, 1)