pubsub: support retry policy
This adds support for RetryPolicy, a new pub/sub service level feature
that allows adding delays between message delivery retries.
Fixes #1942
Change-Id: Icf4baa8c9ddf5c029ff5aea80616890f4ec3a0e9
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/55510
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Cody Oss <codyoss@google.com>
diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go
index 180f883..7b9257d 100644
--- a/pubsub/integration_test.go
+++ b/pubsub/integration_test.go
@@ -1538,6 +1538,73 @@
opts := withGRPCHeadersAssertion(t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443"))
client := integrationTestClient(ctx, t, opts...)
defer client.Close()
+ topic, err := client.CreateTopic(ctx, topicIDs.New())
+ if err != nil {
+ t.Fatalf("CreateTopic error: %v", err)
+ }
+ defer topic.Delete(ctx)
+ defer topic.Stop()
+ cfg := SubscriptionConfig{
+ Topic: topic,
+ Filter: "attributes.event_type = \"1\"",
+ }
+ var sub *Subscription
+ if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil {
+ t.Fatalf("CreateSub error: %v", err)
+ }
+ defer sub.Delete(ctx)
+ got, err := sub.Config(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ want := SubscriptionConfig{
+ Topic: topic,
+ AckDeadline: 10 * time.Second,
+ RetainAckedMessages: false,
+ RetentionDuration: defaultRetentionDuration,
+ ExpirationPolicy: defaultExpirationPolicy,
+ Filter: "attributes.event_type = \"1\"",
+ }
+ if diff := testutil.Diff(got, want); diff != "" {
+ t.Fatalf("SubsciptionConfig; got: - want: +\n%s", diff)
+ }
+ attrs := make(map[string]string)
+ attrs["event_type"] = "1"
+ res := topic.Publish(ctx, &Message{
+ Data: []byte("hello world"),
+ Attributes: attrs,
+ })
+ if _, err := res.Get(ctx); err != nil {
+ t.Fatalf("Publish message error: %v", err)
+ }
+ // Publish the same message with a different event_type
+ // and check it is filtered out.
+ attrs["event_type"] = "2"
+ res = topic.Publish(ctx, &Message{
+ Data: []byte("hello world"),
+ Attributes: attrs,
+ })
+ if _, err := res.Get(ctx); err != nil {
+ t.Fatalf("Publish message error: %v", err)
+ }
+ ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
+ defer cancel()
+ err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
+ defer m.Ack()
+ if m.Attributes["event_type"] != "1" {
+ t.Fatalf("Got message with attributes that should be filtered out: %v", m.Attributes)
+ }
+ })
+ if err != nil {
+ t.Fatalf("Streaming pull error: %v\n", err)
+ }
+}
+
+func TestIntegration_RetryPolicy(t *testing.T) {
+ t.Parallel()
+ ctx := context.Background()
+ client := integrationTestClient(ctx, t)
+ defer client.Close()
topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
@@ -1547,10 +1614,12 @@
defer topic.Stop()
cfg := SubscriptionConfig{
- Topic: topic,
- Filter: "attributes.event_type = \"1\"",
+ Topic: topic,
+ RetryPolicy: &RetryPolicy{
+ MinimumBackoff: 20 * time.Second,
+ MaximumBackoff: 500 * time.Second,
+ },
}
-
var sub *Subscription
if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil {
t.Fatalf("CreateSub error: %v", err)
@@ -1567,42 +1636,30 @@
RetainAckedMessages: false,
RetentionDuration: defaultRetentionDuration,
ExpirationPolicy: defaultExpirationPolicy,
- Filter: "attributes.event_type = \"1\"",
+ RetryPolicy: &RetryPolicy{
+ MinimumBackoff: 20 * time.Second,
+ MaximumBackoff: 500 * time.Second,
+ },
}
if diff := testutil.Diff(got, want); diff != "" {
- t.Fatalf("SubsciptionConfig; got: - want: +\n%s", diff)
+ t.Fatalf("\ngot: - want: +\n%s", diff)
}
- attrs := make(map[string]string)
- attrs["event_type"] = "1"
- res := topic.Publish(ctx, &Message{
- Data: []byte("hello world"),
- Attributes: attrs,
- })
- if _, err := res.Get(ctx); err != nil {
- t.Fatalf("Publish message error: %v", err)
+ // Test clearing the RetryPolicy
+ cfgToUpdate := SubscriptionConfigToUpdate{
+ RetryPolicy: &RetryPolicy{},
}
-
- // Publish the same message with a different event_type
- // and check it is filtered out.
- attrs["event_type"] = "2"
- res = topic.Publish(ctx, &Message{
- Data: []byte("hello world"),
- Attributes: attrs,
- })
- if _, err := res.Get(ctx); err != nil {
- t.Fatalf("Publish message error: %v", err)
- }
-
- ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
- defer cancel()
- err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
- defer m.Ack()
- if m.Attributes["event_type"] != "1" {
- t.Fatalf("Got message with attributes that should be filtered out: %v", m.Attributes)
- }
- })
+ _, err = sub.Update(ctx, cfgToUpdate)
if err != nil {
- t.Fatalf("Streaming pull error: %v\n", err)
+ t.Fatalf("got error while updating sub: %v", err)
+ }
+
+ got, err = sub.Config(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ want.RetryPolicy = nil
+ if diff := testutil.Diff(got, want); diff != "" {
+ t.Fatalf("\ngot: - want: +\n%s", diff)
}
}
diff --git a/pubsub/subscription.go b/pubsub/subscription.go
index 5cba018..c9ac22f 100644
--- a/pubsub/subscription.go
+++ b/pubsub/subscription.go
@@ -243,6 +243,9 @@
// accessible to all users. This field is subject to change or removal
// without notice.
Filter string
+
+ // RetryPolicy specifies how Cloud Pub/Sub retries message delivery.
+ RetryPolicy *RetryPolicy
}
func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
@@ -258,6 +261,10 @@
if cfg.DeadLetterPolicy != nil {
pbDeadLetter = cfg.DeadLetterPolicy.toProto()
}
+ var pbRetryPolicy *pb.RetryPolicy
+ if cfg.RetryPolicy != nil {
+ pbRetryPolicy = cfg.RetryPolicy.toProto()
+ }
return &pb.Subscription{
Name: name,
Topic: cfg.Topic.name,
@@ -270,6 +277,7 @@
EnableMessageOrdering: cfg.EnableMessageOrdering,
DeadLetterPolicy: pbDeadLetter,
Filter: cfg.Filter,
+ RetryPolicy: pbRetryPolicy,
}
}
@@ -290,6 +298,7 @@
}
}
dlp := protoToDLP(pbSub.DeadLetterPolicy)
+ rp := protoToRetryPolicy(pbSub.RetryPolicy)
subC := SubscriptionConfig{
Topic: newTopic(c, pbSub.Topic),
AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds),
@@ -299,6 +308,7 @@
ExpirationPolicy: expirationPolicy,
DeadLetterPolicy: dlp,
Filter: pbSub.Filter,
+ RetryPolicy: rp,
}
pc := protoToPushConfig(pbSub.PushConfig)
if pc != nil {
@@ -352,6 +362,87 @@
}
}
+// RetryPolicy specifies how Cloud Pub/Sub retries message delivery.
+//
+// Retry delay will be exponential based on provided minimum and maximum
+// backoffs. https://en.wikipedia.org/wiki/Exponential_backoff.
+//
+// RetryPolicy will be triggered on NACKs or acknowledgement deadline exceeded
+// events for a given message.
+//
+// Retry Policy is implemented on a best effort basis. At times, the delay
+// between consecutive deliveries may not match the configuration. That is,
+// delay can be more or less than configured backoff.
+type RetryPolicy struct {
+ // MinimumBackoff is the minimum delay between consecutive deliveries of a
+ // given message. Value should be between 0 and 600 seconds. Defaults to 10 seconds.
+ MinimumBackoff optional.Duration
+ // MaximumBackoff is the maximum delay between consecutive deliveries of a
+ // given message. Value should be between 0 and 600 seconds. Defaults to 10 seconds.
+ MaximumBackoff optional.Duration
+}
+
+func (rp *RetryPolicy) toProto() *pb.RetryPolicy {
+ if rp == nil {
+ return nil
+ }
+ // If RetryPolicy is the empty struct, take this as an instruction
+ // to remove RetryPolicy from the subscription.
+ if rp.MinimumBackoff == nil && rp.MaximumBackoff == nil {
+ return nil
+ }
+
+ // Initialize minDur and maxDur to be negative, such that if the conversion from an
+ // optional fails, RetryPolicy won't be updated in the proto as it will remain nil.
+ var minDur time.Duration = -1
+ var maxDur time.Duration = -1
+ if rp.MinimumBackoff != nil {
+ minDur = optional.ToDuration(rp.MinimumBackoff)
+ }
+ if rp.MaximumBackoff != nil {
+ maxDur = optional.ToDuration(rp.MaximumBackoff)
+ }
+
+ var minDurPB, maxDurPB *durpb.Duration
+ if minDur > 0 {
+ minDurPB = ptypes.DurationProto(minDur)
+ }
+ if maxDur > 0 {
+ maxDurPB = ptypes.DurationProto(maxDur)
+ }
+
+ return &pb.RetryPolicy{
+ MinimumBackoff: minDurPB,
+ MaximumBackoff: maxDurPB,
+ }
+}
+
+func protoToRetryPolicy(rp *pb.RetryPolicy) *RetryPolicy {
+ if rp == nil {
+ return nil
+ }
+ var minBackoff, maxBackoff time.Duration
+ var err error
+ if rp.MinimumBackoff != nil {
+ minBackoff, err = ptypes.Duration(rp.MinimumBackoff)
+ if err != nil {
+ return nil
+ }
+ }
+ if rp.MaximumBackoff != nil {
+ maxBackoff, err = ptypes.Duration(rp.MaximumBackoff)
+ if err != nil {
+ return nil
+ }
+ }
+
+ retryPolicy := &RetryPolicy{
+ MinimumBackoff: minBackoff,
+ MaximumBackoff: maxBackoff,
+ }
+ return retryPolicy
+}
+
// ReceiveSettings configure the Receive method.
// A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
type ReceiveSettings struct {
@@ -497,6 +588,11 @@
// This field has beta status. It is not subject to the stability guarantee
// and may change.
Labels map[string]string
+
+ // If non-nil, RetryPolicy is changed. To remove an existing retry policy
+ // (to redeliver messages as soon as possible) use a pointer to the zero value
+ // for this struct.
+ RetryPolicy *RetryPolicy
}
// Update changes an existing subscription according to the fields set in cfg.
@@ -549,6 +645,10 @@
psub.Labels = cfg.Labels
paths = append(paths, "labels")
}
+ if cfg.RetryPolicy != nil {
+ psub.RetryPolicy = cfg.RetryPolicy.toProto()
+ paths = append(paths, "retry_policy")
+ }
return &pb.UpdateSubscriptionRequest{
Subscription: psub,
UpdateMask: &fmpb.FieldMask{Paths: paths},
@@ -569,11 +669,11 @@
if cfg == nil || cfg.ExpirationPolicy == nil {
return nil
}
- policy, min := optional.ToDuration(cfg.ExpirationPolicy), minExpirationPolicy
- if policy == 0 || policy >= min {
- return nil
+ expPolicy, min := optional.ToDuration(cfg.ExpirationPolicy), minExpirationPolicy
+ if expPolicy != 0 && expPolicy < min {
+ return fmt.Errorf("invalid expiration policy(%q) < minimum(%q)", expPolicy, min)
}
- return fmt.Errorf("invalid expiration policy(%q) < minimum(%q)", policy, min)
+ return nil
}
func expirationPolicyToProto(expirationPolicy optional.Duration) *pb.ExpirationPolicy {
diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go
index 386779e..8fe6710 100644
--- a/pubsub/subscription_test.go
+++ b/pubsub/subscription_test.go
@@ -360,3 +360,18 @@
t.Errorf("toMessage with dead-lettered enabled failed\ngot: %d, want %d", *got.DeliveryAttempt, receivedMsg.DeliveryAttempt)
}
}
+
+func TestRetryPolicy_toProto(t *testing.T) {
+ in := &RetryPolicy{
+ MinimumBackoff: 20 * time.Second,
+ MaximumBackoff: 300 * time.Second,
+ }
+ got := in.toProto()
+ want := &pb.RetryPolicy{
+ MinimumBackoff: ptypes.DurationProto(20 * time.Second),
+ MaximumBackoff: ptypes.DurationProto(300 * time.Second),
+ }
+ if diff := testutil.Diff(got, want); diff != "" {
+ t.Errorf("Roundtrip to Proto failed\ngot: - want: +\n%s", diff)
+ }
+}