blob: 15e72e55a44b597655ae6e296216ac7ee7c2a002 [file] [log] [blame]
// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
"time"
"cloud.google.com/go/iam"
"cloud.google.com/go/internal/optional"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"cloud.google.com/go/pubsub/internal/scheduler"
gax "github.com/googleapis/gax-go/v2"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
durpb "google.golang.org/protobuf/types/known/durationpb"
fmpb "google.golang.org/protobuf/types/known/fieldmaskpb"
vkit "cloud.google.com/go/pubsub/apiv1"
)
// Subscription is a reference to a PubSub subscription.
type Subscription struct {
	c *Client
	// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
	name string
	// Settings for pulling messages. Configure these before calling Receive.
	ReceiveSettings ReceiveSettings
	// mu guards receiveActive.
	mu sync.Mutex
	// receiveActive is presumably set while a Receive call is in flight to
	// prevent concurrent Receives on the same Subscription — confirm against
	// Receive (not visible in this chunk).
	receiveActive bool
}
// Subscription creates a reference to a subscription in the client's project.
// No RPC is made; the subscription need not exist yet.
func (c *Client) Subscription(id string) *Subscription {
	return c.SubscriptionInProject(id, c.projectID)
}
// SubscriptionInProject creates a reference to a subscription in a given project.
// No RPC is made; the subscription need not exist yet.
func (c *Client) SubscriptionInProject(id, projectID string) *Subscription {
	fullName := fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id)
	return &Subscription{c: c, name: fullName}
}
// String returns the globally unique printable name of the subscription,
// i.e. the fully qualified "projects/<projid>/subscriptions/<name>" form.
func (s *Subscription) String() string {
	return s.name
}
// ID returns the unique identifier of the subscription within its project,
// i.e. the final path segment of its fully qualified name.
// It panics if the subscription name is not fully qualified.
func (s *Subscription) ID() string {
	i := strings.LastIndexByte(s.name, '/')
	if i < 0 {
		// name is not a fully-qualified name.
		panic("bad subscription name")
	}
	return s.name[i+1:]
}
// Subscriptions returns an iterator which returns all of the subscriptions for the client's project.
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator {
	pbIt := c.subc.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{
		Project: c.fullyQualifiedProjectName(),
	})
	// nextName adapts the gapic iterator to yield just the subscription name.
	nextName := func() (string, error) {
		s, err := pbIt.Next()
		if err != nil {
			return "", err
		}
		return s.Name, nil
	}
	return &SubscriptionIterator{
		c:    c,
		it:   pbIt,
		next: nextName,
	}
}
// SubscriptionIterator is an iterator that returns a series of subscriptions.
type SubscriptionIterator struct {
	c *Client
	// it is the underlying gapic iterator, shared by Next and NextConfig.
	it *vkit.SubscriptionIterator
	// next returns the fully qualified name of the next subscription.
	next func() (string, error)
}
// Next returns the next subscription. If there are no more subscriptions,
// iterator.Done will be returned.
func (subs *SubscriptionIterator) Next() (*Subscription, error) {
	name, err := subs.next()
	if err != nil {
		return nil, err
	}
	sub := &Subscription{c: subs.c, name: name}
	return sub, nil
}
// NextConfig returns the next subscription config. If there are no more
// subscriptions, iterator.Done will be returned.
//
// This call shares the underlying iterator with calls to
// `SubscriptionIterator.Next`. If you wish to mix the two calls, create
// separate iterator instances for each.
func (subs *SubscriptionIterator) NextConfig() (*SubscriptionConfig, error) {
	pbSub, err := subs.it.Next()
	if err != nil {
		return nil, err
	}
	cfg, err := protoToSubscriptionConfig(pbSub, subs.c)
	if err != nil {
		return nil, err
	}
	return &cfg, nil
}
// PushConfig contains configuration for subscriptions that operate in push mode.
type PushConfig struct {
	// A URL locating the endpoint to which messages should be pushed.
	Endpoint string
	// Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details.
	Attributes map[string]string
	// AuthenticationMethod is used by push endpoints to verify the source
	// of push requests.
	// It can be used with push endpoints that are private by default to
	// allow requests only from the Cloud Pub/Sub system, for example.
	// This field is optional and should be set only by users interested in
	// authenticated push.
	// Currently only *OIDCToken is supported; see toProto.
	AuthenticationMethod AuthenticationMethod
	// The format of the delivered message to the push endpoint is defined by
	// the chosen wrapper. When unset, `PubsubWrapper` is used.
	Wrapper Wrapper
}
// toProto converts the PushConfig to its protobuf representation.
// A nil receiver yields a nil proto.
func (pc *PushConfig) toProto() *pb.PushConfig {
	if pc == nil {
		return nil
	}
	out := &pb.PushConfig{
		Attributes:   pc.Attributes,
		PushEndpoint: pc.Endpoint,
	}
	// OIDC is the only authentication method currently representable in the
	// proto; other implementations are silently dropped.
	if oidc, ok := pc.AuthenticationMethod.(*OIDCToken); ok {
		out.AuthenticationMethod = oidc.toProto()
	}
	// A nil Wrapper matches no case and leaves the proto field unset.
	switch w := pc.Wrapper.(type) {
	case *PubsubWrapper:
		out.Wrapper = w.toProto()
	case *NoWrapper:
		out.Wrapper = w.toProto()
	}
	return out
}
// AuthenticationMethod is used by push subscriptions to verify the source of push requests.
// It is a closed interface; OIDCToken is the only implementation in this package.
type AuthenticationMethod interface {
	isAuthMethod() bool
}
// OIDCToken allows PushConfigs to be authenticated using
// the OpenID Connect protocol https://openid.net/connect/
type OIDCToken struct {
	// Audience to be used when generating OIDC token. The audience claim
	// identifies the recipients that the JWT is intended for. The audience
	// value is a single case-sensitive string. Having multiple values (array)
	// for the audience field is not supported. More info about the OIDC JWT
	// token audience here: https://tools.ietf.org/html/rfc7519#section-4.1.3
	// Note: if not specified, the Push endpoint URL will be used.
	Audience string
	// The service account email to be used for generating the OpenID Connect token.
	// The caller of:
	// * CreateSubscription
	// * UpdateSubscription
	// * ModifyPushConfig
	// calls must have the iam.serviceAccounts.actAs permission for the service account.
	// See https://cloud.google.com/iam/docs/understanding-roles#service-accounts-roles.
	ServiceAccountEmail string
}

// Compile-time check that *OIDCToken satisfies AuthenticationMethod.
var _ AuthenticationMethod = (*OIDCToken)(nil)

// isAuthMethod marks *OIDCToken as a valid AuthenticationMethod.
func (oidcToken *OIDCToken) isAuthMethod() bool { return true }
// toProto converts the OIDCToken to its protobuf oneof wrapper.
// A nil receiver yields a nil proto.
func (o *OIDCToken) toProto() *pb.PushConfig_OidcToken_ {
	if o == nil {
		return nil
	}
	tok := &pb.PushConfig_OidcToken{
		Audience:            o.Audience,
		ServiceAccountEmail: o.ServiceAccountEmail,
	}
	return &pb.PushConfig_OidcToken_{OidcToken: tok}
}
// Wrapper defines the format of message delivered to push endpoints.
// It is a closed interface implemented by PubsubWrapper and NoWrapper.
type Wrapper interface {
	isWrapper() bool
}

// PubsubWrapper denotes sending the payload to the push endpoint in the form of the JSON
// representation of a PubsubMessage
// (https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage).
type PubsubWrapper struct{}

// Compile-time check that *PubsubWrapper satisfies Wrapper.
var _ Wrapper = (*PubsubWrapper)(nil)

// isWrapper marks *PubsubWrapper as a valid Wrapper.
func (p *PubsubWrapper) isWrapper() bool { return true }
// toProto converts the PubsubWrapper to its protobuf oneof wrapper.
// A nil receiver yields a nil proto.
func (p *PubsubWrapper) toProto() *pb.PushConfig_PubsubWrapper_ {
	if p == nil {
		return nil
	}
	wrapper := &pb.PushConfig_PubsubWrapper{}
	return &pb.PushConfig_PubsubWrapper_{PubsubWrapper: wrapper}
}
// NoWrapper denotes not wrapping the payload sent to the push endpoint.
type NoWrapper struct {
	// WriteMetadata, when true, includes message metadata in the push
	// delivery. See the Pub/Sub NoWrapper documentation for details.
	WriteMetadata bool
}

// Compile-time check that *NoWrapper satisfies Wrapper.
var _ Wrapper = (*NoWrapper)(nil)

// isWrapper marks *NoWrapper as a valid Wrapper.
func (n *NoWrapper) isWrapper() bool { return true }
// toProto converts the NoWrapper to its protobuf oneof wrapper.
// A nil receiver yields a nil proto.
func (n *NoWrapper) toProto() *pb.PushConfig_NoWrapper_ {
	if n == nil {
		return nil
	}
	wrapper := &pb.PushConfig_NoWrapper{WriteMetadata: n.WriteMetadata}
	return &pb.PushConfig_NoWrapper_{NoWrapper: wrapper}
}
// BigQueryConfigState denotes the possible states for a BigQuery Subscription.
// Values mirror pb.BigQueryConfig_State (see BigQueryConfig.toProto).
type BigQueryConfigState int

const (
	// BigQueryConfigStateUnspecified is the default value. This value is unused.
	BigQueryConfigStateUnspecified = iota
	// BigQueryConfigActive means the subscription can actively send messages to BigQuery.
	BigQueryConfigActive
	// BigQueryConfigPermissionDenied means the subscription cannot write to the BigQuery table because of permission denied errors.
	BigQueryConfigPermissionDenied
	// BigQueryConfigNotFound means the subscription cannot write to the BigQuery table because it does not exist.
	BigQueryConfigNotFound
	// BigQueryConfigSchemaMismatch means the subscription cannot write to the BigQuery table due to a schema mismatch.
	BigQueryConfigSchemaMismatch
)
// BigQueryConfig configures the subscription to deliver to a BigQuery table.
type BigQueryConfig struct {
	// The name of the table to which to write data, of the form
	// {projectId}:{datasetId}.{tableId}
	Table string
	// When true, use the topic's schema as the columns to write to in BigQuery,
	// if it exists.
	UseTopicSchema bool
	// When true, write the subscription name, message_id, publish_time,
	// attributes, and ordering_key to additional columns in the table. The
	// subscription name, message_id, and publish_time fields are put in their own
	// columns while all other message properties (other than data) are written to
	// a JSON object in the attributes column.
	WriteMetadata bool
	// When true and use_topic_schema is true, any fields that are a part of the
	// topic schema that are not part of the BigQuery table schema are dropped
	// when writing to BigQuery. Otherwise, the schemas must be kept in sync and
	// any messages with extra fields are not written and remain in the
	// subscription's backlog.
	DropUnknownFields bool
	// This is an output-only field that indicates whether or not the subscription can
	// receive messages. This field is set only in responses from the server;
	// it is ignored if it is set in any requests.
	State BigQueryConfigState
}
// toProto converts the BigQueryConfig to its protobuf representation.
// Both a nil receiver and a zero-valued config yield nil: the zero value is
// the sentinel for clearing the BigQuery config and switching back to pull.
func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig {
	if bc == nil || *bc == (BigQueryConfig{}) {
		return nil
	}
	return &pb.BigQueryConfig{
		Table:             bc.Table,
		UseTopicSchema:    bc.UseTopicSchema,
		WriteMetadata:     bc.WriteMetadata,
		DropUnknownFields: bc.DropUnknownFields,
		State:             pb.BigQueryConfig_State(bc.State),
	}
}
// CloudStorageConfigState denotes the possible states for a Cloud Storage Subscription.
// Values mirror pb.CloudStorageConfig_State (see CloudStorageConfig.toProto).
type CloudStorageConfigState int

const (
	// CloudStorageConfigStateUnspecified is the default value. This value is unused.
	CloudStorageConfigStateUnspecified = iota
	// CloudStorageConfigActive means the subscription can actively send messages to Cloud Storage.
	CloudStorageConfigActive
	// CloudStorageConfigPermissionDenied means the subscription cannot write to the Cloud storage bucket because of permission denied errors.
	CloudStorageConfigPermissionDenied
	// CloudStorageConfigNotFound means the subscription cannot write to the Cloud Storage bucket because it does not exist.
	CloudStorageConfigNotFound
)
// isCloudStorageOutputFormat is a closed interface over the configuration
// options for how to write message data to Cloud Storage. Implemented by
// CloudStorageOutputFormatTextConfig and CloudStorageOutputFormatAvroConfig.
type isCloudStorageOutputFormat interface {
	isCloudStorageOutputFormat()
}

// CloudStorageOutputFormatTextConfig is the configuration for writing
// message data in text format. Message payloads will be written to files
// as raw text, separated by a newline.
type CloudStorageOutputFormatTextConfig struct{}

// CloudStorageOutputFormatAvroConfig is the configuration for writing
// message data in Avro format. Message payloads and metadata will be written
// to the files as an Avro binary.
type CloudStorageOutputFormatAvroConfig struct {
	// When true, write the subscription name, message_id, publish_time,
	// attributes, and ordering_key as additional fields in the output.
	WriteMetadata bool
}

// isCloudStorageOutputFormat marks the two config types as members of the
// isCloudStorageOutputFormat interface.
func (*CloudStorageOutputFormatTextConfig) isCloudStorageOutputFormat() {}
func (*CloudStorageOutputFormatAvroConfig) isCloudStorageOutputFormat() {}
// CloudStorageConfig configures the subscription to deliver to Cloud Storage.
type CloudStorageConfig struct {
	// User-provided name for the Cloud Storage bucket.
	// The bucket must be created by the user. The bucket name must be without
	// any prefix like "gs://". See the [bucket naming
	// requirements] (https://cloud.google.com/storage/docs/buckets#naming).
	Bucket string
	// User-provided prefix for Cloud Storage filename. See the [object naming
	// requirements](https://cloud.google.com/storage/docs/objects#naming).
	FilenamePrefix string
	// User-provided suffix for Cloud Storage filename. See the [object naming
	// requirements](https://cloud.google.com/storage/docs/objects#naming).
	FilenameSuffix string
	// Configuration for how to write message data. Options are
	// CloudStorageOutputFormatTextConfig and CloudStorageOutputFormatAvroConfig.
	// Defaults to text format.
	OutputFormat isCloudStorageOutputFormat
	// The maximum duration that can elapse before a new Cloud Storage file is
	// created. Min 1 minute, max 10 minutes, default 5 minutes. May not exceed
	// the subscription's acknowledgement deadline.
	MaxDuration optional.Duration
	// The maximum bytes that can be written to a Cloud Storage file before a new
	// file is created. Min 1 KB, max 10 GiB. The max_bytes limit may be exceeded
	// in cases where messages are larger than the limit.
	MaxBytes int64
	// Output only. An output-only field that indicates whether or not the
	// subscription can receive messages.
	State CloudStorageConfigState
}
// toProto converts the CloudStorageConfig to its protobuf representation.
// Both a nil receiver and a zero-valued config yield nil: for the purposes of
// the live service, an empty config clears the Cloud Storage setting.
func (cs *CloudStorageConfig) toProto() *pb.CloudStorageConfig {
	if cs == nil || *cs == (CloudStorageConfig{}) {
		return nil
	}
	out := &pb.CloudStorageConfig{
		Bucket:         cs.Bucket,
		FilenamePrefix: cs.FilenamePrefix,
		FilenameSuffix: cs.FilenameSuffix,
		MaxBytes:       cs.MaxBytes,
		State:          pb.CloudStorageConfig_State(cs.State),
	}
	if cs.MaxDuration != nil {
		out.MaxDuration = durationpb.New(optional.ToDuration(cs.MaxDuration))
	}
	// A nil OutputFormat matches no case and leaves the proto field unset.
	switch f := cs.OutputFormat.(type) {
	case *CloudStorageOutputFormatTextConfig:
		out.OutputFormat = &pb.CloudStorageConfig_TextConfig_{}
	case *CloudStorageOutputFormatAvroConfig:
		out.OutputFormat = &pb.CloudStorageConfig_AvroConfig_{
			AvroConfig: &pb.CloudStorageConfig_AvroConfig{
				WriteMetadata: f.WriteMetadata,
			},
		}
	}
	return out
}
// SubscriptionState denotes the possible states for a Subscription.
type SubscriptionState int

const (
	// SubscriptionStateUnspecified is the default value. This value is unused.
	SubscriptionStateUnspecified = iota
	// SubscriptionStateActive means the subscription can actively receive messages.
	SubscriptionStateActive
	// SubscriptionStateResourceError means the subscription cannot receive messages because of an
	// error with the resource to which it pushes messages.
	// See the more detailed error state in the corresponding configuration.
	SubscriptionStateResourceError
)
// SubscriptionConfig describes the configuration of a subscription. If none of
// PushConfig, BigQueryConfig, or CloudStorageConfig is set, then the subscriber will
// pull and ack messages using API methods. At most one of these fields may be set.
type SubscriptionConfig struct {
	// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
	name string
	// The topic from which this subscription is receiving messages.
	Topic *Topic
	// If push delivery is used with this subscription, this field is
	// used to configure it. At most one of `PushConfig`, `BigQueryConfig`,
	// or `CloudStorageConfig` can be set. If all are empty, then the
	// subscriber will pull and ack messages using API methods.
	PushConfig PushConfig
	// If delivery to BigQuery is used with this subscription, this field is
	// used to configure it. At most one of `PushConfig`, `BigQueryConfig`,
	// or `CloudStorageConfig` can be set. If all are empty, then the
	// subscriber will pull and ack messages using API methods.
	BigQueryConfig BigQueryConfig
	// If delivery to Cloud Storage is used with this subscription, this field is
	// used to configure it. At most one of `PushConfig`, `BigQueryConfig`,
	// or `CloudStorageConfig` can be set. If all are empty, then the
	// subscriber will pull and ack messages using API methods.
	CloudStorageConfig CloudStorageConfig
	// The default maximum time after a subscriber receives a message before
	// the subscriber should acknowledge the message. Note: messages which are
	// obtained via Subscription.Receive need not be acknowledged within this
	// deadline, as the deadline will be automatically extended.
	AckDeadline time.Duration
	// Whether to retain acknowledged messages. If true, acknowledged messages
	// will not be expunged until they fall out of the RetentionDuration window.
	RetainAckedMessages bool
	// How long to retain messages in backlog, from the time of publish. If
	// RetainAckedMessages is true, this duration affects the retention of
	// acknowledged messages, otherwise only unacknowledged messages are retained.
	// Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes.
	RetentionDuration time.Duration
	// Expiration policy specifies the conditions for a subscription's expiration.
	// A subscription is considered active as long as any connected subscriber is
	// successfully consuming messages from the subscription or is issuing
	// operations on the subscription. If `expiration_policy` is not set, a
	// *default policy* with `ttl` of 31 days will be used. The minimum allowed
	// value for `expiration_policy.ttl` is 1 day.
	//
	// Use time.Duration(0) to indicate that the subscription should never expire.
	ExpirationPolicy optional.Duration
	// The set of labels for the subscription.
	Labels map[string]string
	// EnableMessageOrdering enables message ordering on this subscription.
	// This value is only used for subscription creation and update, and
	// is not read locally in calls like Subscription.Receive().
	//
	// If set to false, even if messages are published with ordering keys,
	// messages will not be delivered in order.
	//
	// When calling Subscription.Receive(), the client will check this
	// value with a call to Subscription.Config(), which requires the
	// roles/viewer or roles/pubsub.viewer role on your service account.
	// If that call fails, messages with ordering keys will be delivered in order.
	EnableMessageOrdering bool
	// DeadLetterPolicy specifies the conditions for dead lettering messages in
	// a subscription. If not set, dead lettering is disabled.
	DeadLetterPolicy *DeadLetterPolicy
	// Filter is an expression written in the Cloud Pub/Sub filter language. If
	// non-empty, then only `PubsubMessage`s whose `attributes` field matches the
	// filter are delivered on this subscription. If empty, then no messages are
	// filtered out. Cannot be changed after the subscription is created.
	Filter string
	// RetryPolicy specifies how Cloud Pub/Sub retries message delivery.
	RetryPolicy *RetryPolicy
	// Detached indicates whether the subscription is detached from its topic.
	// Detached subscriptions don't receive messages from their topic and don't
	// retain any backlog. `Pull` and `StreamingPull` requests will return
	// FAILED_PRECONDITION. If the subscription is a push subscription, pushes to
	// the endpoint will not be made.
	Detached bool
	// TopicMessageRetentionDuration indicates the minimum duration for which a message is
	// retained after it is published to the subscription's topic. If this field is
	// set, messages published to the subscription's topic in the last
	// `TopicMessageRetentionDuration` are always available to subscribers.
	// You can enable both topic and subscription retention for the same topic.
	// In this situation, the maximum of the retention durations takes effect.
	//
	// This is an output only field, meaning it will only appear in responses from the backend
	// and will be ignored if sent in a request.
	TopicMessageRetentionDuration time.Duration
	// EnableExactlyOnceDelivery configures Pub/Sub to provide the following guarantees
	// for the delivery of a message with a given MessageID on this subscription:
	//
	// The message sent to a subscriber is guaranteed not to be resent
	// before the message's acknowledgement deadline expires.
	// An acknowledged message will not be resent to a subscriber.
	//
	// Note that subscribers may still receive multiple copies of a message
	// when `enable_exactly_once_delivery` is true if the message was published
	// multiple times by a publisher client. These copies are considered distinct
	// by Pub/Sub and have distinct MessageID values.
	//
	// Lastly, to guarantee messages have been acked or nacked properly, you must
	// call Message.AckWithResult() or Message.NackWithResult(). These return an
	// AckResult which will be ready if the message has been acked (or failed to be acked).
	EnableExactlyOnceDelivery bool
	// State indicates whether or not the subscription can receive messages.
	// This is an output-only field that indicates whether or not the subscription can
	// receive messages. This field is set only in responses from the server;
	// it is ignored if it is set in any requests.
	State SubscriptionState
}
// String returns the globally unique printable name of the subscription config.
// This method only works when the subscription config is returned from the server,
// such as when calling `client.Subscription` or `client.Subscriptions`.
// Otherwise, this will return an empty string.
func (s *SubscriptionConfig) String() string {
	return s.name
}
// ID returns the unique identifier of the subscription within its project.
// This method only works when the subscription config is returned from the server,
// such as when calling `client.Subscription` or `client.Subscriptions`.
// Otherwise, this will return an empty string.
func (s *SubscriptionConfig) ID() string {
	i := strings.LastIndexByte(s.name, '/')
	if i < 0 {
		return ""
	}
	return s.name[i+1:]
}
// toProto converts the SubscriptionConfig to its protobuf representation,
// using the given fully qualified name for the subscription.
func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
	// Only send a push config when at least one push field is populated.
	// NOTE(review): a PushConfig with only Wrapper set is not propagated by
	// this condition — confirm that is intended.
	var pbPushConfig *pb.PushConfig
	if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil {
		pbPushConfig = cfg.PushConfig.toProto()
	}
	// BigQuery/Cloud Storage toProto return nil for zero-valued configs,
	// which leaves the corresponding proto fields unset.
	pbBigQueryConfig := cfg.BigQueryConfig.toProto()
	pbCloudStorageConfig := cfg.CloudStorageConfig.toProto()
	// Leave retention nil when zero so the field is treated as unset.
	var retentionDuration *durpb.Duration
	if cfg.RetentionDuration != 0 {
		retentionDuration = durpb.New(cfg.RetentionDuration)
	}
	var pbDeadLetter *pb.DeadLetterPolicy
	if cfg.DeadLetterPolicy != nil {
		pbDeadLetter = cfg.DeadLetterPolicy.toProto()
	}
	var pbRetryPolicy *pb.RetryPolicy
	if cfg.RetryPolicy != nil {
		pbRetryPolicy = cfg.RetryPolicy.toProto()
	}
	return &pb.Subscription{
		Name: name,
		Topic: cfg.Topic.name,
		PushConfig: pbPushConfig,
		BigqueryConfig: pbBigQueryConfig,
		CloudStorageConfig: pbCloudStorageConfig,
		AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())),
		RetainAckedMessages: cfg.RetainAckedMessages,
		MessageRetentionDuration: retentionDuration,
		Labels: cfg.Labels,
		ExpirationPolicy: expirationPolicyToProto(cfg.ExpirationPolicy),
		EnableMessageOrdering: cfg.EnableMessageOrdering,
		DeadLetterPolicy: pbDeadLetter,
		Filter: cfg.Filter,
		RetryPolicy: pbRetryPolicy,
		Detached: cfg.Detached,
		EnableExactlyOnceDelivery: cfg.EnableExactlyOnceDelivery,
	}
}
// protoToSubscriptionConfig converts a protobuf Subscription into a
// SubscriptionConfig. The Client is used to build the Topic reference and is
// retained by the returned config's Topic. The error result is currently
// always nil; the signature allows for future conversion failures.
func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionConfig, error) {
	// Default to 7 days of retention when the server omits the field.
	rd := time.Hour * 24 * 7
	if pbSub.MessageRetentionDuration != nil {
		rd = pbSub.MessageRetentionDuration.AsDuration()
	}
	// Zero means no expiration policy was set (never expires).
	var expirationPolicy time.Duration
	if ttl := pbSub.ExpirationPolicy.GetTtl(); ttl != nil {
		expirationPolicy = ttl.AsDuration()
	}
	dlp := protoToDLP(pbSub.DeadLetterPolicy)
	rp := protoToRetryPolicy(pbSub.RetryPolicy)
	subC := SubscriptionConfig{
		name: pbSub.Name,
		Topic: newTopic(c, pbSub.Topic),
		AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds),
		RetainAckedMessages: pbSub.RetainAckedMessages,
		RetentionDuration: rd,
		Labels: pbSub.Labels,
		ExpirationPolicy: expirationPolicy,
		EnableMessageOrdering: pbSub.EnableMessageOrdering,
		DeadLetterPolicy: dlp,
		Filter: pbSub.Filter,
		RetryPolicy: rp,
		Detached: pbSub.Detached,
		TopicMessageRetentionDuration: pbSub.TopicMessageRetentionDuration.AsDuration(),
		EnableExactlyOnceDelivery: pbSub.EnableExactlyOnceDelivery,
		State: SubscriptionState(pbSub.State),
	}
	// Delivery configs are left zero-valued when absent from the proto.
	if pc := protoToPushConfig(pbSub.PushConfig); pc != nil {
		subC.PushConfig = *pc
	}
	if bq := protoToBQConfig(pbSub.GetBigqueryConfig()); bq != nil {
		subC.BigQueryConfig = *bq
	}
	if cs := protoToStorageConfig(pbSub.GetCloudStorageConfig()); cs != nil {
		subC.CloudStorageConfig = *cs
	}
	return subC, nil
}
// protoToPushConfig converts a protobuf PushConfig into this package's
// PushConfig. A nil proto yields a nil result.
func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig {
	if pbPc == nil {
		return nil
	}
	pc := &PushConfig{
		Endpoint:   pbPc.PushEndpoint,
		Attributes: pbPc.Attributes,
	}
	// Only OIDC authentication is representable; guard against a typed-nil
	// oneof wrapper and a missing inner message.
	if oidc, ok := pbPc.AuthenticationMethod.(*pb.PushConfig_OidcToken_); ok && oidc != nil && oidc.OidcToken != nil {
		pc.AuthenticationMethod = &OIDCToken{
			Audience:            oidc.OidcToken.GetAudience(),
			ServiceAccountEmail: oidc.OidcToken.GetServiceAccountEmail(),
		}
	}
	// An unset wrapper matches no case and leaves pc.Wrapper nil.
	switch w := pbPc.Wrapper.(type) {
	case *pb.PushConfig_PubsubWrapper_:
		pc.Wrapper = &PubsubWrapper{}
	case *pb.PushConfig_NoWrapper_:
		pc.Wrapper = &NoWrapper{WriteMetadata: w.NoWrapper.WriteMetadata}
	}
	return pc
}
// protoToBQConfig converts a protobuf BigQueryConfig into this package's
// BigQueryConfig. A nil proto yields a nil result.
func protoToBQConfig(pbBQ *pb.BigQueryConfig) *BigQueryConfig {
	if pbBQ == nil {
		return nil
	}
	return &BigQueryConfig{
		Table:             pbBQ.GetTable(),
		UseTopicSchema:    pbBQ.GetUseTopicSchema(),
		DropUnknownFields: pbBQ.GetDropUnknownFields(),
		WriteMetadata:     pbBQ.GetWriteMetadata(),
		State:             BigQueryConfigState(pbBQ.State),
	}
}
// protoToStorageConfig converts a protobuf CloudStorageConfig into this
// package's CloudStorageConfig. A nil proto yields a nil result.
func protoToStorageConfig(pbCSC *pb.CloudStorageConfig) *CloudStorageConfig {
	if pbCSC == nil {
		return nil
	}
	csc := &CloudStorageConfig{
		Bucket:         pbCSC.GetBucket(),
		FilenamePrefix: pbCSC.GetFilenamePrefix(),
		FilenameSuffix: pbCSC.GetFilenameSuffix(),
		MaxBytes:       pbCSC.GetMaxBytes(),
		State:          CloudStorageConfigState(pbCSC.GetState()),
	}
	// Leave MaxDuration nil (unset) when the proto duration is zero.
	if d := pbCSC.GetMaxDuration().AsDuration(); d != 0 {
		csc.MaxDuration = d
	}
	// An unset output format matches no case and leaves OutputFormat nil.
	switch f := pbCSC.OutputFormat.(type) {
	case *pb.CloudStorageConfig_TextConfig_:
		csc.OutputFormat = &CloudStorageOutputFormatTextConfig{}
	case *pb.CloudStorageConfig_AvroConfig_:
		csc.OutputFormat = &CloudStorageOutputFormatAvroConfig{
			WriteMetadata: f.AvroConfig.GetWriteMetadata(),
		}
	}
	return csc
}
// DeadLetterPolicy specifies the conditions for dead lettering messages in
// a subscription.
type DeadLetterPolicy struct {
	// DeadLetterTopic is the fully qualified name of the topic to which
	// dead-lettered messages are published. Required; an empty value disables
	// the policy (see toProto).
	DeadLetterTopic string
	// MaxDeliveryAttempts is the number of delivery attempts before a message
	// is forwarded to the dead letter topic.
	MaxDeliveryAttempts int
}
// toProto converts the DeadLetterPolicy to its protobuf representation.
// A nil receiver, or one without a dead letter topic, yields nil.
func (dlp *DeadLetterPolicy) toProto() *pb.DeadLetterPolicy {
	if dlp == nil || dlp.DeadLetterTopic == "" {
		return nil
	}
	out := &pb.DeadLetterPolicy{DeadLetterTopic: dlp.DeadLetterTopic}
	out.MaxDeliveryAttempts = int32(dlp.MaxDeliveryAttempts)
	return out
}
// protoToDLP converts a protobuf DeadLetterPolicy into this package's
// DeadLetterPolicy. A nil proto yields a nil result.
func protoToDLP(pbDLP *pb.DeadLetterPolicy) *DeadLetterPolicy {
	if pbDLP == nil {
		return nil
	}
	dlp := &DeadLetterPolicy{}
	dlp.DeadLetterTopic = pbDLP.GetDeadLetterTopic()
	dlp.MaxDeliveryAttempts = int(pbDLP.GetMaxDeliveryAttempts())
	return dlp
}
// RetryPolicy specifies how Cloud Pub/Sub retries message delivery.
//
// Retry delay will be exponential based on provided minimum and maximum
// backoffs. https://en.wikipedia.org/wiki/Exponential_backoff.
//
// RetryPolicy will be triggered on NACKs or acknowledgement deadline exceeded
// events for a given message.
//
// Retry Policy is implemented on a best effort basis. At times, the delay
// between consecutive deliveries may not match the configuration. That is,
// delay can be more or less than configured backoff.
type RetryPolicy struct {
	// MinimumBackoff is the minimum delay between consecutive deliveries of a
	// given message. Value should be between 0 and 600 seconds. Defaults to 10 seconds.
	MinimumBackoff optional.Duration
	// MaximumBackoff is the maximum delay between consecutive deliveries of a
	// given message. Value should be between 0 and 600 seconds. Defaults to 600 seconds.
	MaximumBackoff optional.Duration
}
// toProto converts the RetryPolicy to its protobuf representation.
// A nil receiver yields nil, and an empty struct is taken as an instruction
// to remove the RetryPolicy from the subscription (also nil).
func (rp *RetryPolicy) toProto() *pb.RetryPolicy {
	if rp == nil {
		return nil
	}
	if rp.MinimumBackoff == nil && rp.MaximumBackoff == nil {
		return nil
	}
	// Each backoff is only sent when it was supplied and converts to a
	// positive duration; otherwise the proto field stays nil (unchanged).
	var minPB, maxPB *durpb.Duration
	if rp.MinimumBackoff != nil {
		if d := optional.ToDuration(rp.MinimumBackoff); d > 0 {
			minPB = durpb.New(d)
		}
	}
	if rp.MaximumBackoff != nil {
		if d := optional.ToDuration(rp.MaximumBackoff); d > 0 {
			maxPB = durpb.New(d)
		}
	}
	return &pb.RetryPolicy{
		MinimumBackoff: minPB,
		MaximumBackoff: maxPB,
	}
}
// protoToRetryPolicy converts a protobuf RetryPolicy into this package's
// RetryPolicy. A nil proto yields a nil result; missing backoffs become
// zero durations.
func protoToRetryPolicy(rp *pb.RetryPolicy) *RetryPolicy {
	if rp == nil {
		return nil
	}
	var minB, maxB time.Duration
	if rp.MinimumBackoff != nil {
		minB = rp.MinimumBackoff.AsDuration()
	}
	if rp.MaximumBackoff != nil {
		maxB = rp.MaximumBackoff.AsDuration()
	}
	return &RetryPolicy{
		MinimumBackoff: minB,
		MaximumBackoff: maxB,
	}
}
// ReceiveSettings configure the Receive method.
// A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
type ReceiveSettings struct {
	// MaxExtension is the maximum period for which the Subscription should
	// automatically extend the ack deadline for each message.
	//
	// The Subscription will automatically extend the ack deadline of all
	// fetched Messages up to the duration specified. Automatic deadline
	// extension beyond the initial receipt may be disabled by specifying a
	// duration less than 0.
	MaxExtension time.Duration
	// MaxExtensionPeriod is the maximum duration by which to extend the ack
	// deadline at a time. The ack deadline will continue to be extended by up
	// to this duration until MaxExtension is reached. Setting MaxExtensionPeriod
	// bounds the maximum amount of time before a message redelivery in the
	// event the subscriber fails to extend the deadline.
	//
	// MaxExtensionPeriod must be between 10s and 600s (inclusive). This configuration
	// can be disabled by specifying a duration less than (or equal to) 0.
	MaxExtensionPeriod time.Duration
	// MinExtensionPeriod is the min duration for a single lease extension attempt.
	// By default the 99th percentile of ack latency is used to determine lease extension
	// periods but this value can be set to minimize the number of extraneous RPCs sent.
	//
	// MinExtensionPeriod must be between 10s and 600s (inclusive). This configuration
	// can be disabled by specifying a duration less than (or equal to) 0.
	// Defaults to off but set to 60 seconds if the subscription has exactly-once delivery enabled,
	// which will be added in a future release.
	MinExtensionPeriod time.Duration
	// MaxOutstandingMessages is the maximum number of unprocessed messages
	// (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it
	// will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages.
	// If the value is negative, then there will be no limit on the number of
	// unprocessed messages.
	MaxOutstandingMessages int
	// MaxOutstandingBytes is the maximum size of unprocessed messages
	// (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will
	// be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If
	// the value is negative, then there will be no limit on the number of bytes
	// for unprocessed messages.
	MaxOutstandingBytes int
	// UseLegacyFlowControl disables enforcing flow control settings at the Cloud
	// PubSub server and the less accurate method of only enforcing flow control
	// at the client side is used.
	// The default is false.
	UseLegacyFlowControl bool
	// NumGoroutines sets the number of StreamingPull streams to pull messages
	// from the subscription.
	//
	// NumGoroutines defaults to DefaultReceiveSettings.NumGoroutines.
	//
	// NumGoroutines does not limit the number of messages that can be processed
	// concurrently. Even with one goroutine, many messages might be processed at
	// once, because that goroutine may continually receive messages and invoke the
	// function passed to Receive on them. To limit the number of messages being
	// processed concurrently, set MaxOutstandingMessages.
	NumGoroutines int
	// Synchronous switches the underlying receiving mechanism to unary Pull.
	// When Synchronous is false, the more performant StreamingPull is used.
	// StreamingPull also has the benefit of subscriber affinity when using
	// ordered delivery.
	// When Synchronous is true, NumGoroutines is set to 1 and only one Pull
	// RPC will be made to poll messages at a time.
	// The default is false.
	//
	// Deprecated.
	// Previously, users might use Synchronous mode since StreamingPull had a limitation
	// where MaxOutstandingMessages was not always respected with large batches of
	// small messages. With server side flow control, this is no longer an issue
	// and we recommend switching to the default StreamingPull mode by setting
	// Synchronous to false.
	// Synchronous mode does not work with exactly once delivery.
	Synchronous bool
}
// synchronousWaitTime is, for synchronous receive, the time to wait if we are
// already processing MaxOutstandingMessages. There is no point calling Pull
// and asking for zero messages, so we pause to allow some message-processing
// callbacks to finish.
//
// The wait time is large enough to avoid consuming significant CPU, but
// small enough to provide decent throughput. Users who want better
// throughput should not be using synchronous mode.
//
// Waiting might seem like polling, so it's natural to think we could do better by
// noticing when a callback is finished and immediately calling Pull. But if
// callbacks finish in quick succession, this will result in frequent Pull RPCs that
// request a single message, which wastes network bandwidth. Better to wait for a few
// callbacks to finish, so we make fewer RPCs fetching more messages.
//
// This value is unexported so the user doesn't have another knob to think about. Note that
// it is the same value as the one used for nackTicker, so it matches this client's
// idea of a duration that is short, but not so short that we perform excessive RPCs.
const synchronousWaitTime = 100 * time.Millisecond
// DefaultReceiveSettings holds the default values for ReceiveSettings.
// These are applied field-by-field in Receive when the corresponding
// ReceiveSettings field is zero.
var DefaultReceiveSettings = ReceiveSettings{
	MaxExtension:           60 * time.Minute,
	MaxExtensionPeriod:     0, // disabled: no per-modack upper bound
	MinExtensionPeriod:     0, // disabled: derived from ack latency instead
	MaxOutstandingMessages: 1000,
	MaxOutstandingBytes:    1e9, // 1G
	NumGoroutines:          10,
}
// Delete deletes the subscription.
func (s *Subscription) Delete(ctx context.Context) error {
	req := &pb.DeleteSubscriptionRequest{Subscription: s.name}
	return s.c.subc.DeleteSubscription(ctx, req)
}
// Exists reports whether the subscription exists on the server.
func (s *Subscription) Exists(ctx context.Context) (bool, error) {
	_, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name})
	switch {
	case err == nil:
		return true, nil
	case status.Code(err) == codes.NotFound:
		// A missing subscription is not an error for this query.
		return false, nil
	default:
		return false, err
	}
}
// Config fetches the current configuration for the subscription.
func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) {
	sub, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name})
	if err != nil {
		return SubscriptionConfig{}, err
	}
	// Convert the wire representation; a conversion failure yields the
	// zero config, never a partially-populated one.
	conf, err := protoToSubscriptionConfig(sub, s.c)
	if err != nil {
		return SubscriptionConfig{}, err
	}
	return conf, nil
}
// SubscriptionConfigToUpdate describes how to update a subscription.
// Only the non-zero/non-nil fields are applied; see Subscription.Update.
type SubscriptionConfigToUpdate struct {
	// If non-nil, the push config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig
	// can be set.
	// If currently in push mode, set this value to the zero value to revert to a Pull based subscription.
	PushConfig *PushConfig

	// If non-nil, the bigquery config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig
	// can be set.
	// If currently in bigquery mode, set this value to the zero value to revert to a Pull based subscription.
	BigQueryConfig *BigQueryConfig

	// If non-nil, the Cloud Storage config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig
	// can be set.
	// If currently in CloudStorage mode, set this value to the zero value to revert to a Pull based subscription.
	CloudStorageConfig *CloudStorageConfig

	// If non-zero, the ack deadline is changed.
	AckDeadline time.Duration

	// If set, RetainAckedMessages is changed.
	RetainAckedMessages optional.Bool

	// If non-zero, RetentionDuration is changed.
	RetentionDuration time.Duration

	// If non-zero, Expiration is changed. Values below the service minimum
	// are rejected by validate; zero means "never expire".
	ExpirationPolicy optional.Duration

	// If non-nil, DeadLetterPolicy is changed. To remove dead lettering from
	// a subscription, use the zero value for this struct.
	DeadLetterPolicy *DeadLetterPolicy

	// If non-nil, the current set of labels is completely
	// replaced by the new set.
	// This field has beta status. It is not subject to the stability guarantee
	// and may change.
	Labels map[string]string

	// If non-nil, RetryPolicy is changed. To remove an existing retry policy
	// (to redeliver messages as soon as possible) use a pointer to the zero value
	// for this struct.
	RetryPolicy *RetryPolicy

	// If set, EnableExactlyOnceDelivery is changed.
	EnableExactlyOnceDelivery optional.Bool
}
// Update changes an existing subscription according to the fields set in cfg.
// It returns the new SubscriptionConfig.
//
// Update returns an error if no fields were modified.
func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error) {
	// Reject invalid settings before doing anything else.
	if err := cfg.validate(); err != nil {
		return SubscriptionConfig{}, fmt.Errorf("pubsub: UpdateSubscription %w", err)
	}
	req := s.updateRequest(&cfg)
	// An empty field mask means the caller set nothing to change.
	if len(req.UpdateMask.Paths) == 0 {
		return SubscriptionConfig{}, errors.New("pubsub: UpdateSubscription call with nothing to update")
	}
	updated, err := s.c.subc.UpdateSubscription(ctx, req)
	if err != nil {
		return SubscriptionConfig{}, err
	}
	return protoToSubscriptionConfig(updated, s.c)
}
// updateRequest translates cfg into an UpdateSubscriptionRequest, recording
// one field-mask path for every field the caller asked to change.
func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.UpdateSubscriptionRequest {
	sub := &pb.Subscription{Name: s.name}
	var mask []string
	add := func(path string) { mask = append(mask, path) }

	if cfg.PushConfig != nil {
		sub.PushConfig = cfg.PushConfig.toProto()
		add("push_config")
	}
	if cfg.BigQueryConfig != nil {
		sub.BigqueryConfig = cfg.BigQueryConfig.toProto()
		add("bigquery_config")
	}
	if cfg.CloudStorageConfig != nil {
		sub.CloudStorageConfig = cfg.CloudStorageConfig.toProto()
		add("cloud_storage_config")
	}
	if cfg.AckDeadline != 0 {
		// The service expects whole seconds; fractions are truncated.
		sub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds()))
		add("ack_deadline_seconds")
	}
	if cfg.RetainAckedMessages != nil {
		sub.RetainAckedMessages = optional.ToBool(cfg.RetainAckedMessages)
		add("retain_acked_messages")
	}
	if cfg.RetentionDuration != 0 {
		sub.MessageRetentionDuration = durpb.New(cfg.RetentionDuration)
		add("message_retention_duration")
	}
	if cfg.ExpirationPolicy != nil {
		sub.ExpirationPolicy = expirationPolicyToProto(cfg.ExpirationPolicy)
		add("expiration_policy")
	}
	if cfg.DeadLetterPolicy != nil {
		sub.DeadLetterPolicy = cfg.DeadLetterPolicy.toProto()
		add("dead_letter_policy")
	}
	if cfg.Labels != nil {
		sub.Labels = cfg.Labels
		add("labels")
	}
	if cfg.RetryPolicy != nil {
		sub.RetryPolicy = cfg.RetryPolicy.toProto()
		add("retry_policy")
	}
	if cfg.EnableExactlyOnceDelivery != nil {
		sub.EnableExactlyOnceDelivery = optional.ToBool(cfg.EnableExactlyOnceDelivery)
		add("enable_exactly_once_delivery")
	}
	return &pb.UpdateSubscriptionRequest{
		Subscription: sub,
		UpdateMask:   &fmpb.FieldMask{Paths: mask},
	}
}
const (
	// minExpirationPolicy is the minimum expiration policy duration (1 day) as per:
	// https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L606-L607
	minExpirationPolicy = 24 * time.Hour
	// defaultExpirationPolicy is used when no expiration policy is specified;
	// the service default is 31 days as per:
	// https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L605-L606
	defaultExpirationPolicy = 31 * 24 * time.Hour
)
// validate checks that the update's expiration policy, if set, is either
// zero (never expire) or at least the service minimum.
func (cfg *SubscriptionConfigToUpdate) validate() error {
	if cfg == nil || cfg.ExpirationPolicy == nil {
		return nil
	}
	if d := optional.ToDuration(cfg.ExpirationPolicy); d != 0 && d < minExpirationPolicy {
		return fmt.Errorf("invalid expiration policy(%q) < minimum(%q)", d, minExpirationPolicy)
	}
	return nil
}
// expirationPolicyToProto converts an optional.Duration to the proto form.
// A nil input yields a nil policy (no change); a zero duration yields a
// policy with a nil Ttl, which the service interprets as "never expire", per:
// https://godoc.org/google.golang.org/genproto/googleapis/pubsub/v1#ExpirationPolicy.Ttl
func expirationPolicyToProto(expirationPolicy optional.Duration) *pb.ExpirationPolicy {
	if expirationPolicy == nil {
		return nil
	}
	policy := &pb.ExpirationPolicy{}
	if d := optional.ToDuration(expirationPolicy); d != 0 {
		policy.Ttl = durpb.New(d)
	}
	return policy
}
// IAM returns the subscription's IAM handle.
func (s *Subscription) IAM() *iam.Handle {
	conn := s.c.subc.Connection()
	return iam.InternalNewHandle(conn, s.name)
}
// CreateSubscription creates a new subscription on a topic.
//
// id is the name of the subscription to create. It must start with a letter,
// and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-),
// underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It
// must be between 3 and 255 characters in length, and must not start with
// "goog".
//
// cfg.Topic is the topic from which the subscription should receive messages. It
// need not belong to the same project as the subscription. This field is required.
//
// cfg.AckDeadline is the maximum time after a subscriber receives a message before
// the subscriber should acknowledge the message. It must be between 10 and 600
// seconds (inclusive), and is rounded down to the nearest second. If the
// provided ackDeadline is 0, then the default value of 10 seconds is used.
// Note: messages which are obtained via Subscription.Receive need not be
// acknowledged within this deadline, as the deadline will be automatically
// extended.
//
// cfg.PushConfig may be set to configure this subscription for push delivery.
//
// If the subscription already exists an error will be returned.
func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error) {
	if cfg.Topic == nil {
		return nil, errors.New("pubsub: require non-nil Topic")
	}
	// Zero means "use the service default" of 10 seconds.
	if cfg.AckDeadline == 0 {
		cfg.AckDeadline = 10 * time.Second
	}
	if d := cfg.AckDeadline; d < 10*time.Second || d > 600*time.Second {
		return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d)
	}
	sub := c.Subscription(id)
	if _, err := c.subc.CreateSubscription(ctx, cfg.toProto(sub.name)); err != nil {
		return nil, err
	}
	return sub, nil
}
var errReceiveInProgress = errors.New("pubsub: Receive already in progress for this subscription")
// Receive calls f with the outstanding messages from the subscription.
// It blocks until ctx is done, or the service returns a non-retryable error.
//
// The standard way to terminate a Receive is to cancel its context:
//
//	cctx, cancel := context.WithCancel(ctx)
//	err := sub.Receive(cctx, callback)
//	// Call cancel from callback, or another goroutine.
//
// If the service returns a non-retryable error, Receive returns that error after
// all of the outstanding calls to f have returned. If ctx is done, Receive
// returns nil after all of the outstanding calls to f have returned and
// all messages have been acknowledged or have expired.
//
// Receive calls f concurrently from multiple goroutines. It is encouraged to
// process messages synchronously in f, even if that processing is relatively
// time-consuming; Receive will spawn new goroutines for incoming messages,
// limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings.
//
// The context passed to f will be canceled when ctx is Done or there is a
// fatal service error.
//
// Receive will send an ack deadline extension on message receipt, then
// automatically extend the ack deadline of all fetched Messages up to the
// period specified by s.ReceiveSettings.MaxExtension.
//
// Each Subscription may have only one invocation of Receive active at a time.
func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error {
	// Enforce the one-active-Receive-per-Subscription invariant.
	s.mu.Lock()
	if s.receiveActive {
		s.mu.Unlock()
		return errReceiveInProgress
	}
	s.receiveActive = true
	s.mu.Unlock()
	defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()

	// Resolve effective settings: zero values fall back to
	// DefaultReceiveSettings; negative values have field-specific meanings.
	// TODO(hongalex): move settings check to a helper function to make it more testable
	maxCount := s.ReceiveSettings.MaxOutstandingMessages
	if maxCount == 0 {
		maxCount = DefaultReceiveSettings.MaxOutstandingMessages
	}
	maxBytes := s.ReceiveSettings.MaxOutstandingBytes
	if maxBytes == 0 {
		maxBytes = DefaultReceiveSettings.MaxOutstandingBytes
	}
	maxExt := s.ReceiveSettings.MaxExtension
	if maxExt == 0 {
		maxExt = DefaultReceiveSettings.MaxExtension
	} else if maxExt < 0 {
		// If MaxExtension is negative, disable automatic extension.
		maxExt = 0
	}
	maxExtPeriod := s.ReceiveSettings.MaxExtensionPeriod
	if maxExtPeriod < 0 {
		maxExtPeriod = DefaultReceiveSettings.MaxExtensionPeriod
	}
	minExtPeriod := s.ReceiveSettings.MinExtensionPeriod
	if minExtPeriod < 0 {
		minExtPeriod = DefaultReceiveSettings.MinExtensionPeriod
	}

	// Synchronous mode forces a single puller goroutine regardless of
	// NumGoroutines.
	var numGoroutines int
	switch {
	case s.ReceiveSettings.Synchronous:
		numGoroutines = 1
	case s.ReceiveSettings.NumGoroutines >= 1:
		numGoroutines = s.ReceiveSettings.NumGoroutines
	default:
		numGoroutines = DefaultReceiveSettings.NumGoroutines
	}
	// TODO(jba): add tests that verify that ReceiveSettings are correctly processed.
	po := &pullOptions{
		maxExtension:           maxExt,
		maxExtensionPeriod:     maxExtPeriod,
		minExtensionPeriod:     minExtPeriod,
		maxPrefetch:            trunc32(int64(maxCount)),
		synchronous:            s.ReceiveSettings.Synchronous,
		maxOutstandingMessages: maxCount,
		maxOutstandingBytes:    maxBytes,
		useLegacyFlowControl:   s.ReceiveSettings.UseLegacyFlowControl,
	}
	// Client-side flow controller: blocks message dispatch once the
	// outstanding-message or outstanding-byte limits are reached.
	fc := newSubscriptionFlowController(FlowControlSettings{
		MaxOutstandingMessages: maxCount,
		MaxOutstandingBytes:    maxBytes,
		LimitExceededBehavior:  FlowControlBlock,
	})

	sched := scheduler.NewReceiveScheduler(maxCount)

	// Wait for all goroutines started by Receive to return, so instead of an
	// obscure goroutine leak we have an obvious blocked call to Receive.
	group, gctx := errgroup.WithContext(ctx)

	type closeablePair struct {
		wg   *sync.WaitGroup
		iter *messageIterator
	}

	var pairs []closeablePair

	// Cancel a sub-context which, when we finish a single receiver, will kick
	// off the context-aware callbacks and the goroutine below (which stops
	// all receivers, iterators, and the scheduler).
	ctx2, cancel2 := context.WithCancel(gctx)
	defer cancel2()

	for i := 0; i < numGoroutines; i++ {
		// The iterator does not use the context passed to Receive. If it did,
		// canceling that context would immediately stop the iterator without
		// waiting for unacked messages.
		iter := newMessageIterator(s.c.subc, s.name, po)

		// We cannot use errgroup from Receive here. Receive might already be
		// calling group.Wait, and group.Wait cannot be called concurrently with
		// group.Go. We give each receive() its own WaitGroup instead.
		//
		// Since wg.Add is only called from the main goroutine, wg.Wait is
		// guaranteed to be called after all Adds.
		var wg sync.WaitGroup
		wg.Add(1)
		pairs = append(pairs, closeablePair{wg: &wg, iter: iter})

		group.Go(func() error {
			defer wg.Wait()
			defer cancel2()
			for {
				var maxToPull int32 // maximum number of messages to pull
				if po.synchronous {
					if po.maxPrefetch < 0 {
						// If there is no limit on the number of messages to
						// pull, use a reasonable default.
						maxToPull = 1000
					} else {
						// Limit the number of messages in memory to MaxOutstandingMessages
						// (here, po.maxPrefetch). For each message currently in memory, we have
						// called fc.acquire but not fc.release: this is fc.count(). The next
						// call to Pull should fetch no more than the difference between these
						// values.
						maxToPull = po.maxPrefetch - int32(fc.count())
						if maxToPull <= 0 {
							// Wait for some callbacks to finish.
							if err := gax.Sleep(ctx, synchronousWaitTime); err != nil {
								// Return nil if the context is done, not err.
								return nil
							}
							continue
						}
					}
				}
				// If the context is done, don't pull more messages.
				select {
				case <-ctx.Done():
					return nil
				default:
				}
				msgs, err := iter.receive(maxToPull)
				if err == io.EOF {
					return nil
				}
				if err != nil {
					return err
				}
				// If context is done and messages have been pulled,
				// nack them.
				select {
				case <-ctx.Done():
					for _, m := range msgs {
						m.Nack()
					}
					return nil
				default:
				}
				for i, msg := range msgs {
					msg := msg
					// TODO(jba): call acquire closer to when the message is allocated.
					if err := fc.acquire(ctx, len(msg.Data)); err != nil {
						// TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done.
						for _, m := range msgs[i:] {
							m.Nack()
						}
						// Return nil if the context is done, not err.
						return nil
					}
					iter.eoMu.RLock()
					msgAckHandler(msg, iter.enableExactlyOnceDelivery)
					iter.eoMu.RUnlock()
					wg.Add(1)
					// Only schedule messages in order if an ordering key is present and the subscriber client
					// received the ordering flag from a Streaming Pull response.
					var key string
					iter.orderingMu.RLock()
					if iter.enableOrdering {
						key = msg.OrderingKey
					}
					iter.orderingMu.RUnlock()
					// Capture the length now: the callback must release the
					// same byte count it acquired, even if msg is mutated.
					msgLen := len(msg.Data)
					if err := sched.Add(key, msg, func(msg interface{}) {
						defer wg.Done()
						defer fc.release(ctx, msgLen)
						f(ctx2, msg.(*Message))
					}); err != nil {
						wg.Done()
						// If there are any errors with scheduling messages,
						// nack them so they can be redelivered.
						msg.Nack()
						// Currently, only this error is returned by the receive scheduler.
						if errors.Is(err, scheduler.ErrReceiveDraining) {
							return nil
						}
						return err
					}
				}
			}
		})
	}

	go func() {
		<-ctx2.Done()

		// Wait for all iterators to stop.
		for _, p := range pairs {
			p.iter.stop()
			p.wg.Done()
		}

		// This _must_ happen after every iterator has stopped, or some
		// iterator will still have undelivered messages but the scheduler will
		// already be shut down.
		sched.Shutdown()
	}()

	return group.Wait()
}
// pullOptions carries the resolved ReceiveSettings that the message iterator
// needs; it is built once per Receive call.
type pullOptions struct {
	maxExtension       time.Duration // the maximum time to extend a message's ack deadline in total
	maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc
	minExtensionPeriod time.Duration // the minimum time to extend a message's lease duration per modack
	maxPrefetch        int32         // the max number of outstanding messages, used to calculate maxToPull

	// If true, use unary Pull instead of StreamingPull, and never pull more
	// than maxPrefetch messages.
	synchronous            bool
	maxOutstandingMessages int
	maxOutstandingBytes    int
	useLegacyFlowControl   bool
}