pubsub: support retry policy

This adds support for RetryPolicy, a new Pub/Sub service-level feature
that allows adding delays between message delivery retries.

Fixes #1942

Change-Id: Icf4baa8c9ddf5c029ff5aea80616890f4ec3a0e9
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/55510
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Cody Oss <codyoss@google.com>
diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go
index 180f883..7b9257d 100644
--- a/pubsub/integration_test.go
+++ b/pubsub/integration_test.go
@@ -1538,6 +1538,73 @@
 	opts := withGRPCHeadersAssertion(t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443"))
 	client := integrationTestClient(ctx, t, opts...)
 	defer client.Close()
+	topic, err := client.CreateTopic(ctx, topicIDs.New())
+	if err != nil {
+		t.Fatalf("CreateTopic error: %v", err)
+	}
+	defer topic.Delete(ctx)
+	defer topic.Stop()
+	cfg := SubscriptionConfig{
+		Topic:  topic,
+		Filter: "attributes.event_type = \"1\"",
+	}
+	var sub *Subscription
+	if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil {
+		t.Fatalf("CreateSub error: %v", err)
+	}
+	defer sub.Delete(ctx)
+	got, err := sub.Config(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	want := SubscriptionConfig{
+		Topic:               topic,
+		AckDeadline:         10 * time.Second,
+		RetainAckedMessages: false,
+		RetentionDuration:   defaultRetentionDuration,
+		ExpirationPolicy:    defaultExpirationPolicy,
+		Filter:              "attributes.event_type = \"1\"",
+	}
+	if diff := testutil.Diff(got, want); diff != "" {
+		t.Fatalf("SubsciptionConfig; got: - want: +\n%s", diff)
+	}
+	attrs := make(map[string]string)
+	attrs["event_type"] = "1"
+	res := topic.Publish(ctx, &Message{
+		Data:       []byte("hello world"),
+		Attributes: attrs,
+	})
+	if _, err := res.Get(ctx); err != nil {
+		t.Fatalf("Publish message error: %v", err)
+	}
+	// Publish the same message with a different event_type
+	// and check it is filtered out.
+	attrs["event_type"] = "2"
+	res = topic.Publish(ctx, &Message{
+		Data:       []byte("hello world"),
+		Attributes: attrs,
+	})
+	if _, err := res.Get(ctx); err != nil {
+		t.Fatalf("Publish message error: %v", err)
+	}
+	ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
+	defer cancel()
+	err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
+		defer m.Ack()
+		if m.Attributes["event_type"] != "1" {
+			t.Fatalf("Got message with attributes that should be filtered out: %v", m.Attributes)
+		}
+	})
+	if err != nil {
+		t.Fatalf("Streaming pull error: %v\n", err)
+	}
+}
+
+func TestIntegration_RetryPolicy(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	client := integrationTestClient(ctx, t)
+	defer client.Close()
 
 	topic, err := client.CreateTopic(ctx, topicIDs.New())
 	if err != nil {
@@ -1547,10 +1614,12 @@
 	defer topic.Stop()
 
 	cfg := SubscriptionConfig{
-		Topic:  topic,
-		Filter: "attributes.event_type = \"1\"",
+		Topic: topic,
+		RetryPolicy: &RetryPolicy{
+			MinimumBackoff: 20 * time.Second,
+			MaximumBackoff: 500 * time.Second,
+		},
 	}
-
 	var sub *Subscription
 	if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil {
 		t.Fatalf("CreateSub error: %v", err)
@@ -1567,42 +1636,30 @@
 		RetainAckedMessages: false,
 		RetentionDuration:   defaultRetentionDuration,
 		ExpirationPolicy:    defaultExpirationPolicy,
-		Filter:              "attributes.event_type = \"1\"",
+		RetryPolicy: &RetryPolicy{
+			MinimumBackoff: 20 * time.Second,
+			MaximumBackoff: 500 * time.Second,
+		},
 	}
 	if diff := testutil.Diff(got, want); diff != "" {
-		t.Fatalf("SubsciptionConfig; got: - want: +\n%s", diff)
+		t.Fatalf("\ngot: - want: +\n%s", diff)
 	}
 
-	attrs := make(map[string]string)
-	attrs["event_type"] = "1"
-	res := topic.Publish(ctx, &Message{
-		Data:       []byte("hello world"),
-		Attributes: attrs,
-	})
-	if _, err := res.Get(ctx); err != nil {
-		t.Fatalf("Publish message error: %v", err)
+	// Test clearing the RetryPolicy
+	cfgToUpdate := SubscriptionConfigToUpdate{
+		RetryPolicy: &RetryPolicy{},
 	}
-
-	// Publish the same message with a different event_type
-	// and check it is filtered out.
-	attrs["event_type"] = "2"
-	res = topic.Publish(ctx, &Message{
-		Data:       []byte("hello world"),
-		Attributes: attrs,
-	})
-	if _, err := res.Get(ctx); err != nil {
-		t.Fatalf("Publish message error: %v", err)
-	}
-
-	ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
-	defer cancel()
-	err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
-		defer m.Ack()
-		if m.Attributes["event_type"] != "1" {
-			t.Fatalf("Got message with attributes that should be filtered out: %v", m.Attributes)
-		}
-	})
+	_, err = sub.Update(ctx, cfgToUpdate)
 	if err != nil {
-		t.Fatalf("Streaming pull error: %v\n", err)
+		t.Fatalf("got error while updating sub: %v", err)
+	}
+
+	got, err = sub.Config(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	want.RetryPolicy = nil
+	if diff := testutil.Diff(got, want); diff != "" {
+		t.Fatalf("\ngot: - want: +\n%s", diff)
 	}
 }
diff --git a/pubsub/subscription.go b/pubsub/subscription.go
index 5cba018..c9ac22f 100644
--- a/pubsub/subscription.go
+++ b/pubsub/subscription.go
@@ -243,6 +243,9 @@
 	// accessible to all users. This field is subject to change or removal
 	// without notice.
 	Filter string
+
+	// RetryPolicy specifies how Cloud Pub/Sub retries message delivery.
+	RetryPolicy *RetryPolicy
 }
 
 func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
@@ -258,6 +261,10 @@
 	if cfg.DeadLetterPolicy != nil {
 		pbDeadLetter = cfg.DeadLetterPolicy.toProto()
 	}
+	var pbRetryPolicy *pb.RetryPolicy
+	if cfg.RetryPolicy != nil {
+		pbRetryPolicy = cfg.RetryPolicy.toProto()
+	}
 	return &pb.Subscription{
 		Name:                     name,
 		Topic:                    cfg.Topic.name,
@@ -270,6 +277,7 @@
 		EnableMessageOrdering:    cfg.EnableMessageOrdering,
 		DeadLetterPolicy:         pbDeadLetter,
 		Filter:                   cfg.Filter,
+		RetryPolicy:              pbRetryPolicy,
 	}
 }
 
@@ -290,6 +298,7 @@
 		}
 	}
 	dlp := protoToDLP(pbSub.DeadLetterPolicy)
+	rp := protoToRetryPolicy(pbSub.RetryPolicy)
 	subC := SubscriptionConfig{
 		Topic:               newTopic(c, pbSub.Topic),
 		AckDeadline:         time.Second * time.Duration(pbSub.AckDeadlineSeconds),
@@ -299,6 +308,7 @@
 		ExpirationPolicy:    expirationPolicy,
 		DeadLetterPolicy:    dlp,
 		Filter:              pbSub.Filter,
+		RetryPolicy:         rp,
 	}
 	pc := protoToPushConfig(pbSub.PushConfig)
 	if pc != nil {
@@ -352,6 +362,87 @@
 	}
 }
 
+// RetryPolicy specifies how Cloud Pub/Sub retries message delivery.
+//
+// Retry delay will be exponential based on provided minimum and maximum
+// backoffs. https://en.wikipedia.org/wiki/Exponential_backoff.
+//
+// RetryPolicy will be triggered on NACKs or acknowledgement deadline exceeded
+// events for a given message.
+//
+// RetryPolicy is implemented on a best-effort basis. At times, the delay
+// between consecutive deliveries may not match the configuration. That is,
+// the delay can be more or less than the configured backoff.
+type RetryPolicy struct {
+	// MinimumBackoff is the minimum delay between consecutive deliveries of a
+	// given message. Value should be between 0 and 600 seconds. Defaults to 10 seconds.
+	MinimumBackoff optional.Duration
+	// MaximumBackoff is the maximum delay between consecutive deliveries of a
+	// given message. Value should be between 0 and 600 seconds. Defaults to 600 seconds.
+	MaximumBackoff optional.Duration
+}
+
+func (rp *RetryPolicy) toProto() *pb.RetryPolicy {
+	if rp == nil {
+		return nil
+	}
+	// If RetryPolicy is the empty struct, take this as an instruction
+	// to remove RetryPolicy from the subscription.
+	if rp.MinimumBackoff == nil && rp.MaximumBackoff == nil {
+		return nil
+	}
+
+	// Initialize minDur and maxDur to be negative, so that a backoff that was not set
+	// (or is not positive) is left nil in the proto instead of being sent explicitly.
+	var minDur time.Duration = -1
+	var maxDur time.Duration = -1
+	if rp.MinimumBackoff != nil {
+		minDur = optional.ToDuration(rp.MinimumBackoff)
+	}
+	if rp.MaximumBackoff != nil {
+		maxDur = optional.ToDuration(rp.MaximumBackoff)
+	}
+
+	var minDurPB, maxDurPB *durpb.Duration
+	if minDur > 0 {
+		minDurPB = ptypes.DurationProto(minDur)
+	}
+	if maxDur > 0 {
+		maxDurPB = ptypes.DurationProto(maxDur)
+	}
+
+	return &pb.RetryPolicy{
+		MinimumBackoff: minDurPB,
+		MaximumBackoff: maxDurPB,
+	}
+}
+
+func protoToRetryPolicy(rp *pb.RetryPolicy) *RetryPolicy {
+	if rp == nil {
+		return nil
+	}
+	var minBackoff, maxBackoff time.Duration
+	var err error
+	if rp.MinimumBackoff != nil {
+		minBackoff, err = ptypes.Duration(rp.MinimumBackoff)
+		if err != nil {
+			return nil
+		}
+	}
+	if rp.MaximumBackoff != nil {
+		maxBackoff, err = ptypes.Duration(rp.MaximumBackoff)
+		if err != nil {
+			return nil
+		}
+	}
+
+	retryPolicy := &RetryPolicy{
+		MinimumBackoff: minBackoff,
+		MaximumBackoff: maxBackoff,
+	}
+	return retryPolicy
+}
+
 // ReceiveSettings configure the Receive method.
 // A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
 type ReceiveSettings struct {
@@ -497,6 +588,11 @@
 	// This field has beta status. It is not subject to the stability guarantee
 	// and may change.
 	Labels map[string]string
+
+	// If non-nil, RetryPolicy is changed. To remove an existing retry policy
+	// (to redeliver messages as soon as possible) use a pointer to the zero value
+	// for this struct.
+	RetryPolicy *RetryPolicy
 }
 
 // Update changes an existing subscription according to the fields set in cfg.
@@ -549,6 +645,10 @@
 		psub.Labels = cfg.Labels
 		paths = append(paths, "labels")
 	}
+	if cfg.RetryPolicy != nil {
+		psub.RetryPolicy = cfg.RetryPolicy.toProto()
+		paths = append(paths, "retry_policy")
+	}
 	return &pb.UpdateSubscriptionRequest{
 		Subscription: psub,
 		UpdateMask:   &fmpb.FieldMask{Paths: paths},
@@ -569,11 +669,11 @@
 	if cfg == nil || cfg.ExpirationPolicy == nil {
 		return nil
 	}
-	policy, min := optional.ToDuration(cfg.ExpirationPolicy), minExpirationPolicy
-	if policy == 0 || policy >= min {
-		return nil
+	expPolicy, min := optional.ToDuration(cfg.ExpirationPolicy), minExpirationPolicy
+	if expPolicy != 0 && expPolicy < min {
+		return fmt.Errorf("invalid expiration policy(%q) < minimum(%q)", expPolicy, min)
 	}
-	return fmt.Errorf("invalid expiration policy(%q) < minimum(%q)", policy, min)
+	return nil
 }
 
 func expirationPolicyToProto(expirationPolicy optional.Duration) *pb.ExpirationPolicy {
diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go
index 386779e..8fe6710 100644
--- a/pubsub/subscription_test.go
+++ b/pubsub/subscription_test.go
@@ -360,3 +360,18 @@
 		t.Errorf("toMessage with dead-lettered enabled failed\ngot: %d, want %d", *got.DeliveryAttempt, receivedMsg.DeliveryAttempt)
 	}
 }
+
+func TestRetryPolicy_toProto(t *testing.T) {
+	in := &RetryPolicy{
+		MinimumBackoff: 20 * time.Second,
+		MaximumBackoff: 300 * time.Second,
+	}
+	got := in.toProto()
+	want := &pb.RetryPolicy{
+		MinimumBackoff: ptypes.DurationProto(20 * time.Second),
+		MaximumBackoff: ptypes.DurationProto(300 * time.Second),
+	}
+	if diff := testutil.Diff(got, want); diff != "" {
+		t.Errorf("Roundtrip to Proto failed\ngot: - want: +\n%s", diff)
+	}
+}