fix(pubsub): do not propagate context deadline exceeded error (#3055)
diff --git a/pubsub/iterator.go b/pubsub/iterator.go
index 84cc2f0..2675d42 100644
--- a/pubsub/iterator.go
+++ b/pubsub/iterator.go
@@ -17,6 +17,7 @@
import (
"context"
"io"
+ "strings"
"sync"
"time"
@@ -438,6 +439,16 @@
recordStat(it.ctx, ModAckTimeoutCount, 1)
return nil
default:
+ if err == nil {
+ return nil
+ }
+ // This addresses an error where `context deadline exceeded` errors
+ // not captured by the previous case do not cause fatal errors.
+ // See https://github.com/googleapis/google-cloud-go/issues/3060
+ if strings.Contains(err.Error(), "context deadline exceeded") {
+ recordStat(it.ctx, ModAckTimeoutCount, 1)
+ return nil
+ }
// Any other error is fatal.
return err
}
diff --git a/pubsub/iterator_test.go b/pubsub/iterator_test.go
index c90b5fd..68ec8c1 100644
--- a/pubsub/iterator_test.go
+++ b/pubsub/iterator_test.go
@@ -366,3 +366,35 @@
}
return u
}
+
+func TestIterator_ModifyAckContextDeadline(t *testing.T) {
+ // Test that all context deadline exceeded errors in ModAckDeadline
+ // are not propagated to the client.
+ opts := []pstest.ServerReactorOption{
+ pstest.WithErrorInjection("ModifyAckDeadline", codes.Unknown, "context deadline exceeded"),
+ }
+ srv := pstest.NewServer(opts...)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
+ s, client, err := initConn(ctx, srv.Addr)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ srv.Publish(fullyQualifiedTopicName, []byte("some-message"), nil)
+ cctx, cancel := context.WithTimeout(ctx, time.Duration(5*time.Second))
+ defer cancel()
+ err = s.Receive(cctx, func(ctx context.Context, m *Message) {
+ m.Ack()
+ })
+ if err != nil {
+ t.Fatalf("Got error in Receive: %v", err)
+ }
+
+ err = client.Close()
+ if err != nil {
+ t.Fatal(err)
+ }
+}