blob: 8e402a7f18c2a8ea9d99f792879a581260fd324b [file] [log] [blame] [edit]
// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"context"
"errors"
"fmt"
"log"
"runtime"
"strings"
"sync"
"time"
"cloud.google.com/go/iam"
"cloud.google.com/go/internal/optional"
ipubsub "cloud.google.com/go/internal/pubsub"
vkit "cloud.google.com/go/pubsub/apiv1"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"cloud.google.com/go/pubsub/internal/scheduler"
gax "github.com/googleapis/gax-go/v2"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/api/support/bundler"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
fmpb "google.golang.org/protobuf/types/known/fieldmaskpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
// Service-imposed limits on the contents of a single publish request.
const (
// MaxPublishRequestCount is the maximum number of messages that can be in
// a single publish request, as defined by the PubSub service.
MaxPublishRequestCount = 1000
// MaxPublishRequestBytes is the maximum size of a single publish request
// in bytes, as defined by the PubSub service.
//
// It is written as the untyped constant 1e7 (10,000,000 bytes). Because the
// value is integral it can be used in integer contexts, e.g.
// DefaultPublishSettings.BufferedByteLimit.
MaxPublishRequestBytes = 1e7
)
// intSize and maxInt describe the native int type of the build platform.
const (
// TODO: math.MaxInt was added in Go 1.17. We should use that once 1.17
// becomes the minimum supported version of Go.
//
// intSize is the width of int in bits. ^uint(0)>>63 evaluates to 1 when
// uint is 64 bits wide and to 0 when it is 32 bits wide, so this yields 64
// or 32 respectively.
intSize = 32 << (^uint(0) >> 63)
// maxInt is the largest value representable by int on this platform. It is
// an untyped constant.
maxInt = 1<<(intSize-1) - 1
)
// ErrOversizedMessage indicates that a message's size exceeds MaxPublishRequestBytes.
//
// It is the very same error value as bundler.ErrOversizedItem, so callers may
// compare against either one (for example with errors.Is).
var ErrOversizedMessage = bundler.ErrOversizedItem
// Topic is a reference to a PubSub topic.
//
// The methods of Topic are safe for use by multiple goroutines.
type Topic struct {
// c is the client this reference was created from (assigned by newTopic).
c *Client
// The fully qualified identifier for the topic, in the format "projects/<projid>/topics/<name>"
name string
// Settings for publishing messages. All changes must be made before the
// first call to Publish. The default is DefaultPublishSettings.
PublishSettings PublishSettings
// mu is a read/write lock; NOTE(review): presumably it guards stopped and the
// publishing machinery below — confirm against the methods that acquire it.
mu sync.RWMutex
// stopped presumably records whether Stop has been called — TODO confirm.
stopped bool
// scheduler is the publish scheduler used by the publish path (not set by
// newTopic; presumably created lazily on first Publish — verify).
scheduler *scheduler.PublishScheduler
// flowController is embedded so its methods are promoted onto Topic.
flowController
// EnableMessageOrdering enables delivery of ordered keys.
EnableMessageOrdering bool
// enableTracing enables OTel tracing of Pub/Sub messages on this topic.
// It is copied from the client when the Topic is created (see newTopic), and
// allows disabling tracing even when a tracer provider is detected.
enableTracing bool
}
// PublishSettings control the bundling of published messages.
type PublishSettings struct {
// Publish a non-empty batch after this delay has passed.
DelayThreshold time.Duration
// Publish a batch when it has this many messages. The maximum is
// MaxPublishRequestCount.
CountThreshold int
// Publish a batch when its size in bytes reaches this value.
ByteThreshold int
// The number of goroutines used in each of the data structures that are
// involved along the Publish path. Adjusting this value adjusts
// concurrency along the publish path.
//
// Defaults to a multiple of GOMAXPROCS.
NumGoroutines int
// The maximum time that the client will attempt to publish a bundle of messages.
Timeout time.Duration
// The maximum number of bytes that the Bundler will keep in memory before
// returning ErrOverflow. This is now superseded by FlowControlSettings.MaxOutstandingBytes.
// If MaxOutstandingBytes is set, that value will override BufferedByteLimit.
//
// Defaults to DefaultPublishSettings.BufferedByteLimit.
//
// Deprecated: Set `Topic.PublishSettings.FlowControlSettings.MaxOutstandingBytes` instead.
BufferedByteLimit int
// FlowControlSettings defines publisher flow control settings.
FlowControlSettings FlowControlSettings
// EnableCompression enables transport compression for Publish operations.
EnableCompression bool
// CompressionBytesThreshold defines the threshold (in bytes) above which messages
// are compressed for transport: a batch is compressed only when its size is
// strictly greater than this value. Only takes effect if EnableCompression is true.
CompressionBytesThreshold int
}
// shouldCompress reports whether a publish batch of batchSize bytes should be
// sent with transport compression. Compression must be enabled, and the batch
// must be strictly larger than CompressionBytesThreshold.
func (ps *PublishSettings) shouldCompress(batchSize int) bool {
	if !ps.EnableCompression {
		return false
	}
	return batchSize > ps.CompressionBytesThreshold
}
// DefaultPublishSettings holds the default values for topics' PublishSettings.
//
// newTopic assigns this value to the PublishSettings of every Topic reference
// it creates.
var DefaultPublishSettings = PublishSettings{
DelayThreshold: 10 * time.Millisecond,
CountThreshold: 100,
ByteThreshold: 1e6,
Timeout: 60 * time.Second,
// By default, limit the bundler to 10 times the max message size. The number 10 is
// chosen as a reasonable amount of messages in the worst case whilst still
// capping the number to a low enough value to not OOM users.
BufferedByteLimit: 10 * MaxPublishRequestBytes,
FlowControlSettings: FlowControlSettings{
MaxOutstandingMessages: 1000,
// NOTE(review): -1 presumably means "no byte limit"; confirm against the
// flow controller implementation.
MaxOutstandingBytes: -1,
LimitExceededBehavior: FlowControlIgnore,
},
// Publisher compression defaults matches Java's defaults
// https://github.com/googleapis/java-pubsub/blob/7d33e7891db1b2e32fd523d7655b6c11ea140a8b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java#L717-L718
EnableCompression: false,
CompressionBytesThreshold: 240,
}
// CreateTopic creates a new topic.
//
// The specified topic ID must start with a letter, and contain only letters
// ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.),
// tildes (~), plus (+) or percent signs (%). It must be between 3 and 255
// characters in length, and must not start with "goog". For more information,
// see: https://cloud.google.com/pubsub/docs/admin#resource_names
//
// If the topic already exists an error will be returned. On success the
// returned *Topic refers to the topic in the client's project; on failure the
// RPC error is returned unchanged together with a nil *Topic.
func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error) {
	topic := c.Topic(topicID)
	req := &pb.Topic{Name: topic.name}
	if _, err := c.pubc.CreateTopic(ctx, req); err != nil {
		return nil, err
	}
	return topic, nil
}
// CreateTopicWithConfig creates a topic from TopicConfig.
//
// The specified topic ID must start with a letter, and contain only letters
// ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.),
// tildes (~), plus (+) or percent signs (%). It must be between 3 and 255
// characters in length, and must not start with "goog". For more information,
// see: https://cloud.google.com/pubsub/docs/admin#resource_names.
//
// A nil tc is treated like an empty &TopicConfig{}, i.e. the topic is created
// with no extra configuration.
//
// If the topic already exists, an error will be returned. On failure the RPC
// error is returned unchanged together with a nil *Topic.
func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error) {
	// toProto reads fields of tc, so a nil config would otherwise panic.
	if tc == nil {
		tc = &TopicConfig{}
	}
	t := c.Topic(topicID)
	topic := tc.toProto()
	// The name always comes from topicID; TopicConfig.name is ignored here.
	topic.Name = t.name
	if _, err := c.pubc.CreateTopic(ctx, topic); err != nil {
		return nil, err
	}
	return t, nil
}
// Topic creates a reference to a topic in the client's project.
//
// Creating the reference makes no RPC; it only builds a *Topic (via
// TopicInProject) whose name uses the client's project ID.
//
// If a Topic's Publish method is called, it has background goroutines
// associated with it. Clean them up by calling Topic.Stop.
//
// Avoid creating many Topic instances if you use them to publish.
func (c *Client) Topic(id string) *Topic {
	project := c.projectID
	return c.TopicInProject(id, project)
}
// TopicInProject creates a reference to a topic in the given project.
//
// The topic's fully qualified name is "projects/<projectID>/topics/<id>".
// No RPC is made.
//
// If a Topic's Publish method is called, it has background goroutines
// associated with it. Clean them up by calling Topic.Stop.
//
// Avoid creating many Topic instances if you use them to publish.
func (c *Client) TopicInProject(id, projectID string) *Topic {
	fullName := fmt.Sprintf("projects/%s/topics/%s", projectID, id)
	return newTopic(c, fullName)
}
// newTopic builds a *Topic named name that belongs to client c. The topic
// starts with DefaultPublishSettings and inherits the client's tracing flag.
// All other fields are left at their zero values.
func newTopic(c *Client, name string) *Topic {
	t := &Topic{c: c, name: name}
	t.PublishSettings = DefaultPublishSettings
	t.enableTracing = c.enableTracing
	return t
}
// TopicState denotes the possible states for a topic.
type TopicState int

// Possible TopicState values. protoToTopicConfig converts the server value
// with a plain numeric conversion (TopicState(pbt.State)), so each number
// here must match its counterpart in the API's topic state enum.
//
// NOTE(review): these are untyped integer constants, not TopicState-typed
// ones; they stay untyped for backward compatibility.
const (
	// TopicStateUnspecified is the default value. This value is unused.
	TopicStateUnspecified = 0
	// TopicStateActive means the topic does not have any persistent errors.
	TopicStateActive = 1
	// TopicStateIngestionResourceError means ingestion from the data source
	// has encountered a permanent error.
	// See the more detailed error state in the corresponding ingestion
	// source configuration.
	TopicStateIngestionResourceError = 2
)
// TopicConfig describes the configuration of a topic.
type TopicConfig struct {
// The fully qualified identifier for the topic, in the format "projects/<projid>/topics/<name>"
// It is populated only by protoToTopicConfig; toProto does not send it.
name string
// The set of labels for the topic.
Labels map[string]string
// The topic's message storage policy.
MessageStoragePolicy MessageStoragePolicy
// The name of the Cloud KMS key to be used to protect access to messages
// published to this topic, in the format
// "projects/P/locations/L/keyRings/R/cryptoKeys/K".
KMSKeyName string
// SchemaSettings defines the schema settings upon topic creation.
SchemaSettings *SchemaSettings
// RetentionDuration configures the minimum duration to retain a message
// after it is published to the topic. If this field is set, messages published
// to the topic in the last `RetentionDuration` are always available to subscribers.
// For instance, it allows any attached subscription to [seek to a
// timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time)
// that is up to `RetentionDuration` in the past. If this field is
// not set, message retention is controlled by settings on individual
// subscriptions. Cannot be more than 31 days or less than 10 minutes.
//
// For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_message_retention.
RetentionDuration optional.Duration
// State is an output-only field indicating the state of the topic.
State TopicState
// IngestionDataSourceSettings are settings for ingestion from a
// data source into this topic.
IngestionDataSourceSettings *IngestionDataSourceSettings
// MessageTransforms are the transforms to be applied to messages published to the topic.
// Transforms are applied in the order specified.
MessageTransforms []MessageTransform
}
// String returns the printable globally unique name for the topic config,
// i.e. the fully qualified "projects/<projid>/topics/<name>" identifier.
// The name is only filled in when the config was built from a server
// response; for a config created locally this returns an empty string.
func (t *TopicConfig) String() string {
	fullName := t.name
	return fullName
}
// ID returns the unique identifier of the topic within its project: the part
// of the fully qualified name after the last '/'.
// The name is only filled in when the config was built from a server
// response; if it contains no '/' (e.g. it is empty), ID returns "".
func (t *TopicConfig) ID() string {
	i := strings.LastIndexByte(t.name, '/')
	if i < 0 {
		return ""
	}
	return t.name[i+1:]
}
// toProto converts the user-facing TopicConfig into the API's pb.Topic.
// The topic name is not copied; callers set it themselves. A nil
// RetentionDuration leaves MessageRetentionDuration unset.
func (tc *TopicConfig) toProto() *pb.Topic {
	var retention *durationpb.Duration
	if d := tc.RetentionDuration; d != nil {
		retention = durationpb.New(optional.ToDuration(d))
	}
	return &pb.Topic{
		Labels:                      tc.Labels,
		MessageStoragePolicy:        messageStoragePolicyToProto(&tc.MessageStoragePolicy),
		KmsKeyName:                  tc.KMSKeyName,
		SchemaSettings:              schemaSettingsToProto(tc.SchemaSettings),
		MessageRetentionDuration:    retention,
		IngestionDataSourceSettings: tc.IngestionDataSourceSettings.toProto(),
		MessageTransforms:           messageTransformsToProto(tc.MessageTransforms),
	}
}
// TopicConfigToUpdate describes how to update a topic.
type TopicConfigToUpdate struct {
// If non-nil, the current set of labels is completely
// replaced by the new set.
Labels map[string]string
// If non-nil, the existing policy (containing the list of regions)
// is completely replaced by the new policy.
//
// Use the zero value &MessageStoragePolicy{} to reset the topic back to
// using the organization's Resource Location Restriction policy.
//
// If nil, the policy remains unchanged.
//
// This field has beta status. It is not subject to the stability guarantee
// and may change.
MessageStoragePolicy *MessageStoragePolicy
// If set to a positive duration between 10 minutes and 31 days, RetentionDuration is changed.
// If set to a negative value, this clears RetentionDuration from the topic.
// If nil, the retention duration remains unchanged.
RetentionDuration optional.Duration
// SchemaSettings defines the topic's schema settings.
//
// Use the zero value &SchemaSettings{} to remove the schema from the topic.
SchemaSettings *SchemaSettings
// IngestionDataSourceSettings are settings for ingestion from a
// data source into this topic.
//
// When changing this value, the entire data source settings object must be applied,
// rather than just the differences. This includes the source and logging settings.
//
// Use the zero value &IngestionDataSourceSettings{} to remove the ingestion settings from the topic.
IngestionDataSourceSettings *IngestionDataSourceSettings
// If non-nil, the entire list of message transforms is replaced with this list.
MessageTransforms []MessageTransform
}
// protoToTopicConfig converts a server-returned pb.Topic into a TopicConfig,
// including the fully qualified name. RetentionDuration is set (to a
// time.Duration) only when the server reports a message retention duration;
// otherwise it stays nil.
func protoToTopicConfig(pbt *pb.Topic) TopicConfig {
	var retention optional.Duration
	if d := pbt.GetMessageRetentionDuration(); d != nil {
		retention = d.AsDuration()
	}
	return TopicConfig{
		name:                        pbt.Name,
		Labels:                      pbt.Labels,
		MessageStoragePolicy:        protoToMessageStoragePolicy(pbt.MessageStoragePolicy),
		KMSKeyName:                  pbt.KmsKeyName,
		SchemaSettings:              protoToSchemaSettings(pbt.SchemaSettings),
		RetentionDuration:           retention,
		State:                       TopicState(pbt.State),
		IngestionDataSourceSettings: protoToIngestionDataSourceSettings(pbt.IngestionDataSourceSettings),
		MessageTransforms:           protoToMessageTransforms(pbt.MessageTransforms),
	}
}
// DetachSubscriptionResult is the response for the DetachSubscription method.
// Reserved for future use.
//
// It currently has no fields; DetachSubscription returns a non-nil, empty
// value on success.
type DetachSubscriptionResult struct{}
// DetachSubscription detaches a subscription from its topic. All messages
// retained in the subscription are dropped. Subsequent `Pull` and `StreamingPull`
// requests will return FAILED_PRECONDITION. If the subscription is a push
// subscription, pushes to the endpoint will stop.
//
// sub is sent verbatim as the request's Subscription field (NOTE(review):
// presumably the fully qualified "projects/P/subscriptions/S" name — confirm).
// On failure the RPC error is returned unchanged with a nil result.
func (c *Client) DetachSubscription(ctx context.Context, sub string) (*DetachSubscriptionResult, error) {
	req := &pb.DetachSubscriptionRequest{Subscription: sub}
	if _, err := c.pubc.DetachSubscription(ctx, req); err != nil {
		return nil, err
	}
	return &DetachSubscriptionResult{}, nil
}
// MessageStoragePolicy constrains how messages published to the topic may be stored. It
// is determined when the topic is created based on the policy configured at
// the project level.
type MessageStoragePolicy struct {
// AllowedPersistenceRegions is the list of GCP regions where messages that are published
// to the topic may be persisted in storage. Messages published by publishers running in
// non-allowed GCP regions (or running outside of GCP altogether) will be
// routed for storage in one of the allowed regions.
//
// If empty, it indicates a misconfiguration at the project or organization level, which
// will result in all Publish operations failing. This field cannot be empty in updates.
//
// If nil, then the policy is not defined on a topic level. When used in updates, it resets
// the regions back to the organization level Resource Location Restriction policy.
// (messageStoragePolicyToProto sends no policy at all when this slice is nil.)
//
// For more information, see
// https://cloud.google.com/pubsub/docs/resource-location-restriction#pubsub-storage-locations.
AllowedPersistenceRegions []string
}
// protoToMessageStoragePolicy converts the API storage policy into the
// client type. A nil msp yields the zero MessageStoragePolicy (nil regions).
func protoToMessageStoragePolicy(msp *pb.MessageStoragePolicy) MessageStoragePolicy {
	var policy MessageStoragePolicy
	if msp != nil {
		policy.AllowedPersistenceRegions = msp.AllowedPersistenceRegions
	}
	return policy
}
// messageStoragePolicyToProto converts the client storage policy into the API
// type. It returns nil, meaning no policy is sent, when msp is nil or its
// region list is nil. An empty but non-nil list is still sent.
func messageStoragePolicyToProto(msp *MessageStoragePolicy) *pb.MessageStoragePolicy {
	if msp != nil && msp.AllowedPersistenceRegions != nil {
		return &pb.MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions}
	}
	return nil
}
// IngestionDataSourceSettings enables ingestion from a data source into this topic.
//
// A zero-valued IngestionDataSourceSettings is converted exactly like nil, i.e.
// to no ingestion settings.
type IngestionDataSourceSettings struct {
// Source is the data source to ingest from, e.g. an
// *IngestionDataSourceAWSKinesis or *IngestionDataSourceCloudStorage.
Source IngestionDataSource
// PlatformLogsSettings configures platform logs for ingestion; nil leaves
// them unset.
PlatformLogsSettings *PlatformLogsSettings
}
// IngestionDataSource is the kind of ingestion source to be used.
//
// Its only method is unexported, so only types declared in this package
// (the IngestionDataSource* structs) can implement it.
type IngestionDataSource interface {
isIngestionDataSource() bool
}
// AWSKinesisState denotes the possible states for ingestion from Amazon Kinesis Data Streams.
type AWSKinesisState int

// Possible AWSKinesisState values. They are converted to and from the API's
// Kinesis state enum by plain numeric conversion, so each number here must
// match its API counterpart.
const (
	// AWSKinesisStateUnspecified is the default value. This value is unused.
	AWSKinesisStateUnspecified = 0
	// AWSKinesisStateActive means ingestion is active.
	AWSKinesisStateActive = 1
	// AWSKinesisStatePermissionDenied means encountering an error while consuming data from Kinesis.
	// This can happen if:
	//   - The provided `aws_role_arn` does not exist or does not have the
	//     appropriate permissions attached.
	//   - The provided `aws_role_arn` is not set up properly for Identity
	//     Federation using `gcp_service_account`.
	//   - The Pub/Sub SA is not granted the
	//     `iam.serviceAccounts.getOpenIdToken` permission on
	//     `gcp_service_account`.
	AWSKinesisStatePermissionDenied = 2
	// AWSKinesisStatePublishPermissionDenied means permission denied encountered while publishing to the topic.
	// This can happen due to Pub/Sub SA has not been granted the appropriate publish
	// permissions https://cloud.google.com/pubsub/docs/access-control#pubsub.publisher
	AWSKinesisStatePublishPermissionDenied = 3
	// AWSKinesisStateStreamNotFound means the Kinesis stream does not exist.
	AWSKinesisStateStreamNotFound = 4
	// AWSKinesisStateConsumerNotFound means the Kinesis consumer does not exist.
	AWSKinesisStateConsumerNotFound = 5
)
// IngestionDataSourceAWSKinesis are ingestion settings for Amazon Kinesis Data Streams.
type IngestionDataSourceAWSKinesis struct {
// State is an output-only field indicating the state of the kinesis connection.
State AWSKinesisState
// StreamARN is the Kinesis stream ARN to ingest data from.
StreamARN string
// ConsumerARN is the Kinesis consumer ARN to be used for ingestion in Enhanced
// Fan-Out mode. The consumer must be already created and ready to be used.
ConsumerARN string
// AWSRoleARN is the AWS role ARN to be used for Federated Identity authentication
// with Kinesis. Check the Pub/Sub docs for how to set up this role and the
// required permissions that need to be attached to it.
AWSRoleARN string
// GCPServiceAccount is the GCP service account to be used for Federated Identity
// authentication with Kinesis (via a `AssumeRoleWithWebIdentity` call for
// the provided role). The `aws_role_arn` must be set up with
// `accounts.google.com:sub` equals to this service account number.
GCPServiceAccount string
}
// Compile-time check that *IngestionDataSourceAWSKinesis is an IngestionDataSource.
var _ IngestionDataSource = (*IngestionDataSourceAWSKinesis)(nil)

// isIngestionDataSource marks the type as an IngestionDataSource; it always reports true.
func (*IngestionDataSourceAWSKinesis) isIngestionDataSource() bool { return true }
// CloudStorageIngestionState denotes the possible states for ingestion from Cloud Storage.
type CloudStorageIngestionState int

// Possible CloudStorageIngestionState values. They are converted to and from
// the API's Cloud Storage state enum by plain numeric conversion, so each
// number here must match its API counterpart.
const (
	// CloudStorageIngestionStateUnspecified is the default value. This value is unused.
	CloudStorageIngestionStateUnspecified = 0
	// CloudStorageIngestionStateActive means ingestion is active.
	CloudStorageIngestionStateActive = 1
	// CloudStorageIngestionPermissionDenied means encountering an error while calling the Cloud Storage API.
	// This can happen if the Pub/Sub SA has not been granted the
	// [appropriate permissions](https://cloud.google.com/storage/docs/access-control/iam-permissions):
	//   - storage.objects.list: to list the objects in a bucket.
	//   - storage.objects.get: to read the objects in a bucket.
	//   - storage.buckets.get: to verify the bucket exists.
	CloudStorageIngestionPermissionDenied = 2
	// CloudStorageIngestionPublishPermissionDenied means encountering an error when publishing to the topic.
	// This can happen if the Pub/Sub SA has not been granted the [appropriate publish
	// permissions](https://cloud.google.com/pubsub/docs/access-control#pubsub.publisher)
	CloudStorageIngestionPublishPermissionDenied = 3
	// CloudStorageIngestionBucketNotFound means the provided bucket doesn't exist.
	CloudStorageIngestionBucketNotFound = 4
	// CloudStorageIngestionTooManyObjects means the bucket has too many objects, ingestion will be paused.
	CloudStorageIngestionTooManyObjects = 5
)
// IngestionDataSourceCloudStorage are ingestion settings for Cloud Storage.
type IngestionDataSourceCloudStorage struct {
// State is an output-only field indicating the state of the Cloud storage ingestion source.
State CloudStorageIngestionState
// Bucket is the Cloud Storage bucket. The bucket name must be without any
// prefix like "gs://". See the bucket naming requirements (https://cloud.google.com/storage/docs/buckets#naming).
Bucket string
// InputFormat is the format of objects in Cloud Storage: one of
// *IngestionDataSourceCloudStorageTextFormat,
// *IngestionDataSourceCloudStorageAvroFormat, or
// *IngestionDataSourceCloudStoragePubSubAvroFormat.
// Defaults to TextFormat.
InputFormat ingestionDataSourceCloudStorageInputFormat
// MinimumObjectCreateTime means objects with a larger or equal creation timestamp will be
// ingested.
MinimumObjectCreateTime time.Time
// MatchGlob is the pattern used to match objects that will be ingested. If
// empty, all objects will be ingested. See the [supported
// patterns](https://cloud.google.com/storage/docs/json_api/v1/objects/list#list-objects-and-prefixes-using-glob).
MatchGlob string
}
// Compile-time check that *IngestionDataSourceCloudStorage is an IngestionDataSource.
var _ IngestionDataSource = (*IngestionDataSourceCloudStorage)(nil)

// isIngestionDataSource marks the type as an IngestionDataSource; it always reports true.
func (*IngestionDataSourceCloudStorage) isIngestionDataSource() bool { return true }
// ingestionDataSourceCloudStorageInputFormat is the closed set of Cloud Storage
// input formats: text, Avro, and Pub/Sub Avro. Its unexported method keeps
// implementations inside this package.
type ingestionDataSourceCloudStorageInputFormat interface {
isCloudStorageIngestionInputFormat() bool
}
// Compile-time checks that every Cloud Storage input format type implements
// ingestionDataSourceCloudStorageInputFormat.
var (
	_ ingestionDataSourceCloudStorageInputFormat = (*IngestionDataSourceCloudStorageTextFormat)(nil)
	_ ingestionDataSourceCloudStorageInputFormat = (*IngestionDataSourceCloudStorageAvroFormat)(nil)
	_ ingestionDataSourceCloudStorageInputFormat = (*IngestionDataSourceCloudStoragePubSubAvroFormat)(nil)
)
// IngestionDataSourceCloudStorageTextFormat means Cloud Storage data will be interpreted as text.
type IngestionDataSourceCloudStorageTextFormat struct {
	// Delimiter separates records in the text data.
	Delimiter string
}

// isCloudStorageIngestionInputFormat marks the type as a Cloud Storage input
// format; it always reports true.
func (*IngestionDataSourceCloudStorageTextFormat) isCloudStorageIngestionInputFormat() bool {
	return true
}
// IngestionDataSourceCloudStorageAvroFormat means Cloud Storage data will be interpreted in Avro format.
type IngestionDataSourceCloudStorageAvroFormat struct{}

// isCloudStorageIngestionInputFormat marks the type as a Cloud Storage input
// format; it always reports true.
func (*IngestionDataSourceCloudStorageAvroFormat) isCloudStorageIngestionInputFormat() bool {
	return true
}
// IngestionDataSourceCloudStoragePubSubAvroFormat is used assuming the data was written using Cloud
// Storage subscriptions https://cloud.google.com/pubsub/docs/cloudstorage.
type IngestionDataSourceCloudStoragePubSubAvroFormat struct{}

// isCloudStorageIngestionInputFormat marks the type as a Cloud Storage input
// format; it always reports true.
func (*IngestionDataSourceCloudStoragePubSubAvroFormat) isCloudStorageIngestionInputFormat() bool {
	return true
}
// EventHubsState denotes the possible states for ingestion from Event Hubs.
type EventHubsState int

// Possible EventHubsState values. protoToIngestionDataSourceSettings converts
// the API value by plain numeric conversion, so each number here must match
// its API counterpart.
const (
	// EventHubsStateUnspecified is the default value. This value is unused.
	EventHubsStateUnspecified = 0
	// EventHubsStateActive means the state is active.
	EventHubsStateActive = 1
	// EventHubsStatePermissionDenied indicates encountered permission denied error
	// while consuming data from Event Hubs.
	// This can happen when `client_id`, or `tenant_id` are invalid. Or the
	// right permissions haven't been granted.
	EventHubsStatePermissionDenied = 2
	// EventHubsStatePublishPermissionDenied indicates permission denied encountered
	// while publishing to the topic.
	EventHubsStatePublishPermissionDenied = 3
	// EventHubsStateNamespaceNotFound indicates the provided Event Hubs namespace couldn't be found.
	EventHubsStateNamespaceNotFound = 4
	// EventHubsStateNotFound indicates the provided Event Hub couldn't be found.
	EventHubsStateNotFound = 5
	// EventHubsStateSubscriptionNotFound indicates the provided Event Hubs subscription couldn't be found.
	EventHubsStateSubscriptionNotFound = 6
	// EventHubsStateResourceGroupNotFound indicates the provided Event Hubs resource group couldn't be found.
	EventHubsStateResourceGroupNotFound = 7
)
// IngestionDataSourceAzureEventHubs are ingestion settings for Azure Event Hubs.
type IngestionDataSourceAzureEventHubs struct {
// Output only field that indicates the state of the Event Hubs ingestion source.
State EventHubsState
// Name of the resource group within the Azure subscription.
ResourceGroup string
// Name of the Event Hubs namespace.
Namespace string
// Name of the Event Hub.
EventHub string
// Client ID of the Azure application that is being used to authenticate Pub/Sub.
ClientID string
// Tenant ID of the Azure application that is being used to authenticate Pub/Sub.
TenantID string
// The Azure subscription ID.
SubscriptionID string
// GCPServiceAccount is the GCP service account to be used for Federated Identity
// authentication.
GCPServiceAccount string
}
// Compile-time check that *IngestionDataSourceAzureEventHubs is an IngestionDataSource.
var _ IngestionDataSource = (*IngestionDataSourceAzureEventHubs)(nil)

// isIngestionDataSource marks the type as an IngestionDataSource; it always reports true.
func (*IngestionDataSourceAzureEventHubs) isIngestionDataSource() bool { return true }
// AmazonMSKState denotes the possible states for ingestion from Amazon MSK.
type AmazonMSKState int

// Possible AmazonMSKState values. protoToIngestionDataSourceSettings converts
// the API value by plain numeric conversion, so each number here must match
// its API counterpart.
const (
	// AmazonMSKStateUnspecified is the default value. This value is unused.
	AmazonMSKStateUnspecified = 0
	// AmazonMSKActive indicates MSK topic is active.
	AmazonMSKActive = 1
	// AmazonMSKPermissionDenied indicates permission denied encountered while consuming data from Amazon MSK.
	AmazonMSKPermissionDenied = 2
	// AmazonMSKPublishPermissionDenied indicates permission denied encountered while publishing to the topic.
	AmazonMSKPublishPermissionDenied = 3
	// AmazonMSKClusterNotFound indicates the provided MSK cluster wasn't found.
	AmazonMSKClusterNotFound = 4
	// AmazonMSKTopicNotFound indicates the provided topic wasn't found.
	AmazonMSKTopicNotFound = 5
)
// IngestionDataSourceAmazonMSK are ingestion settings for Amazon MSK.
type IngestionDataSourceAmazonMSK struct {
// An output-only field that indicates the state of the Amazon
// MSK ingestion source.
State AmazonMSKState
// The Amazon Resource Name (ARN) that uniquely identifies the
// cluster.
ClusterARN string
// The name of the topic in the Amazon MSK cluster that Pub/Sub
// will import from.
Topic string
// AWS role ARN to be used for Federated Identity authentication
// with Amazon MSK. Check the Pub/Sub docs for how to set up this role and
// the required permissions that need to be attached to it.
AWSRoleARN string
// The GCP service account to be used for Federated Identity
// authentication with Amazon MSK (via a `AssumeRoleWithWebIdentity` call
// for the provided role). The `aws_role_arn` must be set up with
// `accounts.google.com:sub` equal to this service account number.
GCPServiceAccount string
}
// Compile-time check that *IngestionDataSourceAmazonMSK is an IngestionDataSource.
var _ IngestionDataSource = (*IngestionDataSourceAmazonMSK)(nil)

// isIngestionDataSource marks the type as an IngestionDataSource; it always reports true.
func (*IngestionDataSourceAmazonMSK) isIngestionDataSource() bool { return true }
// ConfluentCloudState denotes the state of an ingestion topic with Confluent Cloud.
type ConfluentCloudState int

// Possible ConfluentCloudState values, numbered consecutively from 0 with
// iota. protoToIngestionDataSourceSettings converts the API value by plain
// numeric conversion, so each value must match its API counterpart.
const (
	// ConfluentCloudStateUnspecified is the default value. This value is unused.
	ConfluentCloudStateUnspecified = iota
	// ConfluentCloudActive indicates the state is active.
	ConfluentCloudActive
	// ConfluentCloudPermissionDenied indicates permission denied encountered
	// while consuming data from Confluent Cloud.
	ConfluentCloudPermissionDenied
	// ConfluentCloudPublishPermissionDenied indicates permission denied encountered
	// while publishing to the topic.
	ConfluentCloudPublishPermissionDenied
	// ConfluentCloudUnreachableBootstrapServer indicates the provided bootstrap
	// server address is unreachable.
	ConfluentCloudUnreachableBootstrapServer
	// ConfluentCloudClusterNotFound indicates the provided cluster wasn't found.
	ConfluentCloudClusterNotFound
	// ConfluentCloudTopicNotFound indicates the provided topic wasn't found.
	ConfluentCloudTopicNotFound
)
// IngestionDataSourceConfluentCloud are ingestion settings for Confluent Cloud.
type IngestionDataSourceConfluentCloud struct {
// An output-only field that indicates the state of the
// Confluent Cloud ingestion source.
State ConfluentCloudState
// The address of the bootstrap server. The format is url:port.
BootstrapServer string
// The id of the cluster.
ClusterID string
// The name of the topic in the Confluent Cloud cluster that
// Pub/Sub will import from.
Topic string
// The id of the identity pool to be used for Federated Identity
// authentication with Confluent Cloud. See
// https://docs.confluent.io/cloud/current/security/authenticate/workload-identities/identity-providers/oauth/identity-pools.html#add-oauth-identity-pools.
IdentityPoolID string
// The GCP service account to be used for Federated Identity
// authentication with `identity_pool_id`.
GCPServiceAccount string
}
// Compile-time check that *IngestionDataSourceConfluentCloud is an IngestionDataSource.
var _ IngestionDataSource = (*IngestionDataSourceConfluentCloud)(nil)

// isIngestionDataSource marks the type as an IngestionDataSource; it always reports true.
func (*IngestionDataSourceConfluentCloud) isIngestionDataSource() bool { return true }
// protoToIngestionDataSourceSettings converts the API's ingestion settings
// into the client-facing IngestionDataSourceSettings.
//
// It returns nil when pbs is nil. Source is set from the first populated
// source among AWS Kinesis, Cloud Storage, Azure Event Hubs, AWS MSK and
// Confluent Cloud (checked in that order); if none is populated Source stays
// nil. PlatformLogsSettings is copied only when present.
func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *IngestionDataSourceSettings {
	if pbs == nil {
		return nil
	}
	s := &IngestionDataSourceSettings{}
	if k := pbs.GetAwsKinesis(); k != nil {
		s.Source = &IngestionDataSourceAWSKinesis{
			State:             AWSKinesisState(k.GetState()),
			StreamARN:         k.GetStreamArn(),
			ConsumerARN:       k.GetConsumerArn(),
			AWSRoleARN:        k.GetAwsRoleArn(),
			GCPServiceAccount: k.GetGcpServiceAccount(),
		}
	} else if cs := pbs.GetCloudStorage(); cs != nil {
		var format ingestionDataSourceCloudStorageInputFormat
		switch t := cs.InputFormat.(type) {
		case *pb.IngestionDataSourceSettings_CloudStorage_TextFormat_:
			// Delimiter is an optional (pointer) field in the proto message, so it
			// may be nil. The generated getter is nil-safe (it returns "" when the
			// field or the message is unset), whereas dereferencing the field
			// directly would panic.
			format = &IngestionDataSourceCloudStorageTextFormat{
				Delimiter: t.TextFormat.GetDelimiter(),
			}
		case *pb.IngestionDataSourceSettings_CloudStorage_AvroFormat_:
			format = &IngestionDataSourceCloudStorageAvroFormat{}
		case *pb.IngestionDataSourceSettings_CloudStorage_PubsubAvroFormat:
			format = &IngestionDataSourceCloudStoragePubSubAvroFormat{}
		}
		s.Source = &IngestionDataSourceCloudStorage{
			State:       CloudStorageIngestionState(cs.GetState()),
			Bucket:      cs.GetBucket(),
			InputFormat: format,
			// NOTE(review): when the server omits the timestamp, AsTime on the nil
			// message yields the Unix epoch rather than the zero time.Time.
			MinimumObjectCreateTime: cs.GetMinimumObjectCreateTime().AsTime(),
			MatchGlob:               cs.GetMatchGlob(),
		}
	} else if e := pbs.GetAzureEventHubs(); e != nil {
		s.Source = &IngestionDataSourceAzureEventHubs{
			State:             EventHubsState(e.GetState()),
			ResourceGroup:     e.GetResourceGroup(),
			Namespace:         e.GetNamespace(),
			EventHub:          e.GetEventHub(),
			ClientID:          e.GetClientId(),
			TenantID:          e.GetTenantId(),
			SubscriptionID:    e.GetSubscriptionId(),
			GCPServiceAccount: e.GetGcpServiceAccount(),
		}
	} else if m := pbs.GetAwsMsk(); m != nil {
		s.Source = &IngestionDataSourceAmazonMSK{
			State:             AmazonMSKState(m.GetState()),
			ClusterARN:        m.GetClusterArn(),
			Topic:             m.GetTopic(),
			AWSRoleARN:        m.GetAwsRoleArn(),
			GCPServiceAccount: m.GetGcpServiceAccount(),
		}
	} else if c := pbs.GetConfluentCloud(); c != nil {
		s.Source = &IngestionDataSourceConfluentCloud{
			State:             ConfluentCloudState(c.GetState()),
			BootstrapServer:   c.GetBootstrapServer(),
			ClusterID:         c.GetClusterId(),
			Topic:             c.GetTopic(),
			IdentityPoolID:    c.GetIdentityPoolId(),
			GCPServiceAccount: c.GetGcpServiceAccount(),
		}
	}
	if pls := pbs.GetPlatformLogsSettings(); pls != nil {
		s.PlatformLogsSettings = &PlatformLogsSettings{
			Severity: PlatformLogsSeverity(pls.GetSeverity()),
		}
	}
	return s
}
// toProto converts i into the protobuf IngestionDataSourceSettings used in
// CreateTopic/UpdateTopic requests.
//
// A nil receiver, or one equal to the zero value, yields nil, which callers
// use to clear the setting on the topic. If Source is nil or of a type not
// handled below, the returned message has no Source set.
//
// Every source type copies its State field so that the conversion is the
// mirror image of the proto-to-Go conversion, which reads State for all of
// them. A Cloud Storage source is always sent, even when its InputFormat is
// nil or unrecognized, so that the service (rather than this client) decides
// whether the configuration is valid instead of the source being silently
// dropped.
func (i *IngestionDataSourceSettings) toProto() *pb.IngestionDataSourceSettings {
	if i == nil {
		return nil
	}
	// An empty/zero-valued config is treated the same as nil and clearing this setting.
	if (IngestionDataSourceSettings{}) == *i {
		return nil
	}
	pbs := &pb.IngestionDataSourceSettings{}
	if i.PlatformLogsSettings != nil {
		pbs.PlatformLogsSettings = &pb.PlatformLogsSettings{
			Severity: pb.PlatformLogsSettings_Severity(i.PlatformLogsSettings.Severity),
		}
	}
	switch src := i.Source.(type) {
	case *IngestionDataSourceAWSKinesis:
		pbs.Source = &pb.IngestionDataSourceSettings_AwsKinesis_{
			AwsKinesis: &pb.IngestionDataSourceSettings_AwsKinesis{
				State:             pb.IngestionDataSourceSettings_AwsKinesis_State(src.State),
				StreamArn:         src.StreamARN,
				ConsumerArn:       src.ConsumerARN,
				AwsRoleArn:        src.AWSRoleARN,
				GcpServiceAccount: src.GCPServiceAccount,
			},
		}
	case *IngestionDataSourceCloudStorage:
		// Fields shared by every input format are set once; the format
		// switch below only fills in InputFormat.
		cs := &pb.IngestionDataSourceSettings_CloudStorage{
			State:                   pb.IngestionDataSourceSettings_CloudStorage_State(src.State),
			Bucket:                  src.Bucket,
			MinimumObjectCreateTime: timestamppb.New(src.MinimumObjectCreateTime),
			MatchGlob:               src.MatchGlob,
		}
		switch format := src.InputFormat.(type) {
		case *IngestionDataSourceCloudStorageTextFormat:
			cs.InputFormat = &pb.IngestionDataSourceSettings_CloudStorage_TextFormat_{
				TextFormat: &pb.IngestionDataSourceSettings_CloudStorage_TextFormat{
					// Copy the delimiter so the request does not alias the
					// caller's struct field.
					Delimiter: proto.String(format.Delimiter),
				},
			}
		case *IngestionDataSourceCloudStorageAvroFormat:
			cs.InputFormat = &pb.IngestionDataSourceSettings_CloudStorage_AvroFormat_{
				AvroFormat: &pb.IngestionDataSourceSettings_CloudStorage_AvroFormat{},
			}
		case *IngestionDataSourceCloudStoragePubSubAvroFormat:
			cs.InputFormat = &pb.IngestionDataSourceSettings_CloudStorage_PubsubAvroFormat{
				PubsubAvroFormat: &pb.IngestionDataSourceSettings_CloudStorage_PubSubAvroFormat{},
			}
		}
		pbs.Source = &pb.IngestionDataSourceSettings_CloudStorage_{CloudStorage: cs}
	case *IngestionDataSourceAzureEventHubs:
		pbs.Source = &pb.IngestionDataSourceSettings_AzureEventHubs_{
			AzureEventHubs: &pb.IngestionDataSourceSettings_AzureEventHubs{
				State:             pb.IngestionDataSourceSettings_AzureEventHubs_State(src.State),
				ResourceGroup:     src.ResourceGroup,
				Namespace:         src.Namespace,
				EventHub:          src.EventHub,
				ClientId:          src.ClientID,
				TenantId:          src.TenantID,
				SubscriptionId:    src.SubscriptionID,
				GcpServiceAccount: src.GCPServiceAccount,
			},
		}
	case *IngestionDataSourceAmazonMSK:
		pbs.Source = &pb.IngestionDataSourceSettings_AwsMsk_{
			AwsMsk: &pb.IngestionDataSourceSettings_AwsMsk{
				State:             pb.IngestionDataSourceSettings_AwsMsk_State(src.State),
				ClusterArn:        src.ClusterARN,
				Topic:             src.Topic,
				AwsRoleArn:        src.AWSRoleARN,
				GcpServiceAccount: src.GCPServiceAccount,
			},
		}
	case *IngestionDataSourceConfluentCloud:
		pbs.Source = &pb.IngestionDataSourceSettings_ConfluentCloud_{
			ConfluentCloud: &pb.IngestionDataSourceSettings_ConfluentCloud{
				State:             pb.IngestionDataSourceSettings_ConfluentCloud_State(src.State),
				BootstrapServer:   src.BootstrapServer,
				ClusterId:         src.ClusterID,
				Topic:             src.Topic,
				IdentityPoolId:    src.IdentityPoolID,
				GcpServiceAccount: src.GCPServiceAccount,
			},
		}
	}
	return pbs
}
// PlatformLogsSettings controls the platform logs that Pub/Sub writes for a
// topic. At present it only has an effect on Cloud Storage ingestion topics.
type PlatformLogsSettings struct {
	// Severity is the lowest severity that will be written; see the
	// PlatformLogsSeverity constants for the available levels.
	Severity PlatformLogsSeverity
}
// PlatformLogsSeverity is a severity level for Platform Logs. Values are
// converted directly to and from the protobuf severity enum, so their numeric
// values must stay in step with it.
type PlatformLogsSeverity int32

const (
	// PlatformLogsSeverityUnspecified is the default value. Logs level is unspecified. Logs will be disabled.
	PlatformLogsSeverityUnspecified PlatformLogsSeverity = 0
	// PlatformLogsSeverityDisabled means logs will be disabled.
	PlatformLogsSeverityDisabled PlatformLogsSeverity = 1
	// PlatformLogsSeverityDebug means debug logs and higher-severity logs will be written.
	PlatformLogsSeverityDebug PlatformLogsSeverity = 2
	// PlatformLogsSeverityInfo means info logs and higher-severity logs will be written.
	PlatformLogsSeverityInfo PlatformLogsSeverity = 3
	// PlatformLogsSeverityWarning means warning logs and higher-severity logs will be written.
	PlatformLogsSeverityWarning PlatformLogsSeverity = 4
	// PlatformLogsSeverityError means only error logs will be written.
	PlatformLogsSeverityError PlatformLogsSeverity = 5
)
// Config fetches the topic from the service with GetTopic and returns its
// configuration. If the RPC fails, the zero TopicConfig is returned together
// with the error.
func (t *Topic) Config(ctx context.Context) (TopicConfig, error) {
	var cfg TopicConfig
	topicPB, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name})
	if err == nil {
		cfg = protoToTopicConfig(topicPB)
	}
	return cfg, err
}
// Update modifies an existing topic so that it matches the fields set in cfg
// and returns the topic's resulting TopicConfig. It fails without contacting
// the service when cfg sets no fields.
func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error) {
	req := t.updateRequest(cfg)
	if paths := req.UpdateMask.Paths; len(paths) == 0 {
		return TopicConfig{}, errors.New("pubsub: UpdateTopic call with nothing to update")
	}
	updated, err := t.c.pubc.UpdateTopic(ctx, req)
	if err != nil {
		return TopicConfig{}, err
	}
	return protoToTopicConfig(updated), nil
}
// updateRequest translates cfg into an UpdateTopicRequest. Each non-nil field
// of cfg is copied into the request's topic and its path is appended to the
// update mask (in a fixed order), so that the service only modifies the
// fields the caller set. An empty mask means there is nothing to update.
func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
	topic := &pb.Topic{Name: t.name}
	mask := &fmpb.FieldMask{}
	mark := func(path string) { mask.Paths = append(mask.Paths, path) }

	if cfg.Labels != nil {
		topic.Labels = cfg.Labels
		mark("labels")
	}
	if cfg.MessageStoragePolicy != nil {
		topic.MessageStoragePolicy = messageStoragePolicyToProto(cfg.MessageStoragePolicy)
		mark("message_storage_policy")
	}
	if cfg.RetentionDuration != nil {
		// A negative duration is the sentinel for clearing the retention
		// duration: the path is still sent, but with a nil value.
		if d := optional.ToDuration(cfg.RetentionDuration); d >= 0 {
			topic.MessageRetentionDuration = durationpb.New(d)
		}
		mark("message_retention_duration")
	}
	if cfg.SchemaSettings != nil {
		// Each schema subfield gets its own mask path so that it can be
		// edited independently while the others keep their current values.
		// If every subfield is the zero value, the whole schema_settings
		// field is cleared instead, removing the schema from the topic.
		ss := schemaSettingsToProto(cfg.SchemaSettings)
		pathsBefore := len(mask.Paths)
		if ss.Schema != "" {
			mark("schema_settings.schema")
		}
		if ss.Encoding != pb.Encoding_ENCODING_UNSPECIFIED {
			mark("schema_settings.encoding")
		}
		if ss.FirstRevisionId != "" {
			mark("schema_settings.first_revision_id")
		}
		if ss.LastRevisionId != "" {
			mark("schema_settings.last_revision_id")
		}
		if len(mask.Paths) == pathsBefore {
			mark("schema_settings")
			ss = nil
		}
		topic.SchemaSettings = ss
	}
	if cfg.IngestionDataSourceSettings != nil {
		topic.IngestionDataSourceSettings = cfg.IngestionDataSourceSettings.toProto()
		mark("ingestion_data_source_settings")
	}
	if cfg.MessageTransforms != nil {
		topic.MessageTransforms = messageTransformsToProto(cfg.MessageTransforms)
		mark("message_transforms")
	}
	return &pb.UpdateTopicRequest{Topic: topic, UpdateMask: mask}
}
// Topics returns an iterator over every topic in the client's project.
// Next yields Topic handles; NextConfig yields full configurations. Both draw
// from the same underlying ListTopics iterator.
func (c *Client) Topics(ctx context.Context) *TopicIterator {
	req := &pb.ListTopicsRequest{Project: c.fullyQualifiedProjectName()}
	listing := c.pubc.ListTopics(ctx, req)
	nextName := func() (string, error) {
		tp, err := listing.Next()
		if err != nil {
			return "", err
		}
		return tp.Name, nil
	}
	return &TopicIterator{c: c, it: listing, next: nextName}
}
// TopicIterator is an iterator that returns a series of topics.
type TopicIterator struct {
	// c is the client used to build the Topic handles returned by Next.
	c *Client
	// it is the underlying ListTopics iterator; NextConfig reads from it
	// directly.
	it *vkit.TopicIterator
	// next yields the next topic's full resource name (Topics builds it on
	// top of it, so it shares that iterator's position).
	next func() (string, error)
}
// Next returns the next topic. If there are no more topics, iterator.Done will
// be returned. On any error the returned *Topic is nil.
func (tps *TopicIterator) Next() (*Topic, error) {
	var topic *Topic
	name, err := tps.next()
	if err == nil {
		topic = newTopic(tps.c, name)
	}
	return topic, err
}
// NextConfig returns the next topic config. If there are no more topics,
// iterator.Done will be returned.
// This call shares the underlying iterator with calls to `TopicIterator.Next`,
// so mixing the two skips entries. If you wish to mix calls, create separate
// iterator instances for each.
func (tps *TopicIterator) NextConfig() (*TopicConfig, error) {
	topicPB, err := tps.it.Next()
	if err != nil {
		return nil, err
	}
	cfg := protoToTopicConfig(topicPB)
	return &cfg, nil
}
// ID returns the unique identifier of the topic within its project: the last
// '/'-separated segment of its resource name. It panics if the name is not
// fully qualified (contains no '/').
func (t *Topic) ID() string {
	cut := strings.LastIndexByte(t.name, '/')
	if cut < 0 {
		panic("bad topic name")
	}
	return t.name[cut+1:]
}
// String returns the topic's full resource name, which is unique across
// projects and suitable for printing.
func (t *Topic) String() string { return t.name }
// Delete removes the topic from the service using a DeleteTopic RPC and
// returns the RPC's error, if any.
func (t *Topic) Delete(ctx context.Context) error {
	req := &pb.DeleteTopicRequest{Topic: t.name}
	return t.c.pubc.DeleteTopic(ctx, req)
}
// Exists reports whether the topic exists on the server. A NotFound status
// from GetTopic is reported as (false, nil); any other RPC error is returned.
func (t *Topic) Exists(ctx context.Context) (bool, error) {
	// NOTE(review): "_deleted-topic_" is presumably the placeholder name the
	// service uses for a topic that has been deleted; it is answered locally.
	if t.name == "_deleted-topic_" {
		return false, nil
	}
	_, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name})
	switch {
	case err == nil:
		return true, nil
	case status.Code(err) == codes.NotFound:
		return false, nil
	default:
		return false, err
	}
}
// IAM returns a handle for managing the topic's IAM policy. It reuses the
// client's underlying connection.
func (t *Topic) IAM() *iam.Handle {
	conn := t.c.pubc.Connection()
	return iam.InternalNewHandle(conn, t.name)
}
// Subscriptions returns an iterator over the subscriptions attached to this
// topic.
//
// Some of the returned subscriptions may belong to a project other than t.
func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator {
	req := &pb.ListTopicSubscriptionsRequest{Topic: t.name}
	subs := t.c.pubc.ListTopicSubscriptions(ctx, req)
	return &SubscriptionIterator{next: subs.Next, c: t.c}
}
var (
	// ErrTopicStopped is the error set on the PublishResult of every message
	// published after Stop has been called on the topic; such messages are
	// never sent.
	ErrTopicStopped = errors.New("pubsub: Stop has been called for this topic")
)
// PublishResult is the handle returned by Publish; it becomes ready once the
// message has been sent to the server or has failed to be sent.
//
// Call Get to wait for and obtain the outcome, for example:
//
//	// Get blocks until Publish completes or ctx is done.
//	id, err := r.Get(ctx)
//	if err != nil {
//		// TODO: Handle error.
//	}
type PublishResult = ipubsub.PublishResult
var (
	// errTopicOrderingNotEnabled is set on a message's PublishResult when the
	// message carries an OrderingKey but the topic's EnableMessageOrdering
	// flag is false.
	errTopicOrderingNotEnabled = errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering")
)
// Publish publishes msg to the topic asynchronously. Messages are batched and
// sent according to the topic's PublishSettings. Publish never blocks.
//
// Publish returns a non-nil PublishResult which will be ready when the
// message has been sent (or has failed to be sent) to the server.
//
// Publish creates goroutines for batching and sending messages. These goroutines
// need to be stopped by calling t.Stop(). Once stopped, future calls to Publish
// will immediately return a PublishResult with an error.
//
// The result is completed with an error right away, without contacting the
// server, when:
//   - msg has an OrderingKey but t.EnableMessageOrdering is false;
//   - the topic has been stopped (ErrTopicStopped);
//   - flow control rejects the message, or the scheduler refuses it. In these
//     two cases, publishing for msg.OrderingKey is also paused (see
//     ResumePublish).
//
// NOTE(review): flowController.acquire receives ctx, which suggests it may
// wait for capacity depending on PublishSettings.FlowControlSettings; confirm
// whether "never blocks" above still holds when flow control is set to block.
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
	var createSpan trace.Span
	if t.enableTracing {
		// The create span covers this message's whole publish; on the normal
		// path it is ended by the scheduler handler after the RPC resolves.
		opts := getPublishSpanAttributes(t.c.projectID, t.ID(), msg)
		opts = append(opts, trace.WithAttributes(semconv.CodeFunction("Publish")))
		ctx, createSpan = startSpan(ctx, createSpanName, t.ID(), opts...)
	}
	// Tag the context for OpenCensus stats; a tagging failure is only logged.
	ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
	if err != nil {
		log.Printf("pubsub: cannot create context with tag in Publish: %v", err)
	}
	r := ipubsub.NewPublishResult()
	if !t.EnableMessageOrdering && msg.OrderingKey != "" {
		ipubsub.SetPublishResult(r, "", errTopicOrderingNotEnabled)
		spanRecordError(createSpan, errTopicOrderingNotEnabled)
		return r
	}
	// Calculate the size of the encoded proto message by accounting
	// for the length of an individual PubSubMessage and Data/Attributes field.
	msgSize := proto.Size(&pb.PubsubMessage{
		Data:        msg.Data,
		Attributes:  msg.Attributes,
		OrderingKey: msg.OrderingKey,
	})
	if t.enableTracing {
		createSpan.SetAttributes(semconv.MessagingMessageBodySize(len(msg.Data)))
	}
	// Lazily create the scheduler and flow controller on first use.
	t.initBundler()
	// Hold the read lock until return: Stop takes the write lock before
	// setting t.stopped, so it cannot mark the topic stopped while this
	// message is being handed to the scheduler.
	t.mu.RLock()
	defer t.mu.RUnlock()
	if t.stopped {
		ipubsub.SetPublishResult(r, "", ErrTopicStopped)
		spanRecordError(createSpan, ErrTopicStopped)
		return r
	}
	var batcherSpan trace.Span
	var fcSpan trace.Span
	if t.enableTracing {
		_, fcSpan = startSpan(ctx, publishFCSpanName, "")
	}
	// Reserve msgSize bytes of flow-control capacity; a failure pauses the
	// ordering key so later messages with that key are not sent out of order.
	if err := t.flowController.acquire(ctx, msgSize); err != nil {
		t.scheduler.Pause(msg.OrderingKey)
		ipubsub.SetPublishResult(r, "", err)
		spanRecordError(fcSpan, err)
		return r
	}
	if t.enableTracing {
		fcSpan.End()
	}
	bmsg := &bundledMessage{
		msg:        msg,
		res:        r,
		size:       msgSize,
		createSpan: createSpan,
	}
	if t.enableTracing {
		// The batcher span is ended by the scheduler handler when the bundle
		// containing this message is dispatched.
		_, batcherSpan = startSpan(ctx, batcherSpanName, "")
		bmsg.batcherSpan = batcherSpan
		// Inject the context from the first publish span rather than from flow control / batching.
		injectPropagation(ctx, msg)
	}
	if err := t.scheduler.Add(msg.OrderingKey, bmsg, msgSize); err != nil {
		t.scheduler.Pause(msg.OrderingKey)
		ipubsub.SetPublishResult(r, "", err)
		spanRecordError(createSpan, err)
	}
	return r
}
// Stop sends all remaining published messages and stops the goroutines created
// for handling publishing. It returns once all outstanding messages have been
// sent or have failed to be sent. After Stop, Publish fails immediately with
// ErrTopicStopped. Calling Stop again, or on a topic whose publish scheduler
// was never created, does nothing.
func (t *Topic) Stop() {
	t.mu.Lock()
	wasStopped, sched := t.stopped, t.scheduler
	t.stopped = true
	t.mu.Unlock()

	if wasStopped || sched == nil {
		return
	}
	sched.FlushAndStop()
}
// Flush blocks until all remaining messages are sent. It does nothing if the
// topic has been stopped or if no publish scheduler has been created yet.
func (t *Topic) Flush() {
	// t.stopped and t.scheduler are written under t.mu by Stop and
	// initBundler, so they must be read under the lock too. Otherwise this
	// read races with a concurrent Publish or Stop. Snapshot both and use the
	// snapshot after releasing the lock, so Flush does not hold t.mu while it
	// blocks.
	t.mu.RLock()
	stopped, sched := t.stopped, t.scheduler
	t.mu.RUnlock()
	if stopped || sched == nil {
		return
	}
	sched.Flush()
}
// bundledMessage is one published message as it travels through the publish
// scheduler, together with the bookkeeping needed to complete it.
type bundledMessage struct {
	// msg is the user's message; it is cleared once converted to a proto.
	msg *Message
	// res is the PublishResult handed back to the caller of Publish.
	res *PublishResult
	// size is the encoded size reserved from flow control for this message.
	size int
	// createSpan is the entire publish createSpan (from user calling Publish to the publish RPC resolving).
	createSpan trace.Span
	// batcherSpan traces the message batching operation in publish scheduler.
	batcherSpan trace.Span
}
// initBundler lazily creates the topic's publish scheduler and flow controller
// from t.PublishSettings. It returns immediately if the topic is stopped or
// the scheduler already exists, so it is cheap to call on every Publish.
//
// Side effect: when FlowControlSettings.MaxOutstandingBytes > 0, this
// overwrites t.PublishSettings.BufferedByteLimit with maxInt.
func (t *Topic) initBundler() {
	// Fast path under the read lock: the common case is that the scheduler
	// already exists.
	t.mu.RLock()
	noop := t.stopped || t.scheduler != nil
	t.mu.RUnlock()
	if noop {
		return
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	// Must re-check, since we released the lock.
	if t.stopped || t.scheduler != nil {
		return
	}
	timeout := t.PublishSettings.Timeout
	workers := t.PublishSettings.NumGoroutines
	// Unless overridden, allow many goroutines per CPU to call the Publish RPC
	// concurrently. The default value was determined via extensive load
	// testing (see the loadtest subdirectory).
	if t.PublishSettings.NumGoroutines == 0 {
		workers = 25 * runtime.GOMAXPROCS(0)
	}
	// The handler receives one bundle ([]*bundledMessage) per call and sends
	// it with a single publishMessageBundle call. It also ends each message's
	// batcher span and, after the RPC, its create span.
	t.scheduler = scheduler.NewPublishScheduler(workers, func(bundle interface{}) {
		// Use a context detached from the one passed to NewClient.
		// It is bounded by PublishSettings.Timeout when that is non-zero.
		ctx := context.Background()
		if timeout != 0 {
			var cancel func()
			ctx, cancel = context.WithTimeout(ctx, timeout)
			defer cancel()
		}
		bmsgs := bundle.([]*bundledMessage)
		if t.enableTracing {
			for _, m := range bmsgs {
				m.batcherSpan.End()
				m.createSpan.AddEvent(eventPublishStart, trace.WithAttributes(semconv.MessagingBatchMessageCount(len(bmsgs))))
			}
		}
		t.publishMessageBundle(ctx, bmsgs)
		if t.enableTracing {
			for _, m := range bmsgs {
				m.createSpan.AddEvent(eventPublishEnd)
				m.createSpan.End()
			}
		}
	})
	t.scheduler.DelayThreshold = t.PublishSettings.DelayThreshold
	// The count threshold is capped at the service's per-request maximum.
	t.scheduler.BundleCountThreshold = t.PublishSettings.CountThreshold
	if t.scheduler.BundleCountThreshold > MaxPublishRequestCount {
		t.scheduler.BundleCountThreshold = MaxPublishRequestCount
	}
	t.scheduler.BundleByteThreshold = t.PublishSettings.ByteThreshold
	// Flow control starts from the package defaults. LimitExceededBehavior is
	// always taken from the topic's settings; the byte/message limits only
	// when they are positive.
	fcs := DefaultPublishSettings.FlowControlSettings
	fcs.LimitExceededBehavior = t.PublishSettings.FlowControlSettings.LimitExceededBehavior
	if t.PublishSettings.FlowControlSettings.MaxOutstandingBytes > 0 {
		b := t.PublishSettings.FlowControlSettings.MaxOutstandingBytes
		fcs.MaxOutstandingBytes = b
		// If MaxOutstandingBytes is set, disable BufferedByteLimit by setting it to maxint.
		// This is because there's no way to set "unlimited" for BufferedByteLimit,
		// and simply setting it to MaxOutstandingBytes occasionally leads to issues where
		// BufferedByteLimit is reached even though there are resources available.
		t.PublishSettings.BufferedByteLimit = maxInt
	}
	if t.PublishSettings.FlowControlSettings.MaxOutstandingMessages > 0 {
		fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages
	}
	t.flowController = newTopicFlowController(fcs)
	// Fall back to the default buffer limit unless a positive one is set
	// (possibly the maxInt written just above).
	bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit
	if t.PublishSettings.BufferedByteLimit > 0 {
		bufferedByteLimit = t.PublishSettings.BufferedByteLimit
	}
	t.scheduler.BufferedByteLimit = bufferedByteLimit
	// Calculate the max limit of a single bundle. 5 comes from the number of bytes
	// needed to be reserved for encoding the PubsubMessage repeated field.
	t.scheduler.BundleByteLimit = MaxPublishRequestBytes - calcFieldSizeString(t.name) - 5
}
// ErrPublishingPaused is the error reported for messages whose ordering key is
// paused because an earlier publish with that key failed. Publishing for the
// key stays paused until Topic.ResumePublish is called.
type ErrPublishingPaused struct {
	// OrderingKey is the ordering key whose publishing is paused.
	OrderingKey string
}

// Error implements the error interface.
func (e ErrPublishingPaused) Error() string {
	const format = "pubsub: Publishing for ordering key, %s, paused due to previous error. Call topic.ResumePublish(orderingKey) before resuming publishing"
	return fmt.Sprintf(format, e.OrderingKey)
}
// publishMessageBundle sends one batch of messages to the service in a single
// Publish RPC and completes each message's PublishResult.
//
// The batch's ordering key is read from the first message; messages in a
// batch share it. If that key is currently paused, no RPC is made and every
// result fails with ErrPublishingPaused. Any failure pauses the key (see
// ResumePublish). Whatever the outcome, each message's flow-control
// reservation is released here, and latency/count stats are recorded for the
// batch.
func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) {
	// Tag the context for OpenCensus stats; a tagging failure is only logged.
	ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
	if err != nil {
		log.Printf("pubsub: cannot create context with tag in publishMessageBundle: %v", err)
	}
	numMsgs := len(bms)
	pbMsgs := make([]*pb.PubsubMessage, numMsgs)
	var orderingKey string
	if numMsgs != 0 {
		// extract the ordering key for this batch. since
		// messages in the same batch share the same ordering
		// key, it doesn't matter which we read from.
		orderingKey = bms[0].msg.OrderingKey
	}
	if t.enableTracing {
		// Collect links to every sampled create span in the batch.
		links := make([]trace.Link, 0, numMsgs)
		for _, bm := range bms {
			if bm.createSpan.SpanContext().IsSampled() {
				links = append(links, trace.Link{SpanContext: bm.createSpan.SpanContext()})
			}
		}
		projectID, topicID := parseResourceName(t.name)
		var pSpan trace.Span
		opts := getCommonOptions(projectID, topicID)
		// Add link to publish RPC span of createSpan(s).
		opts = append(opts, trace.WithLinks(links...))
		opts = append(
			opts,
			trace.WithAttributes(
				semconv.MessagingBatchMessageCount(numMsgs),
				semconv.CodeFunction("publishMessageBundle"),
			),
		)
		ctx, pSpan = startSpan(ctx, publishRPCSpanName, topicID, opts...)
		defer pSpan.End()
		// Add the reverse link to createSpan(s) of publish RPC span.
		if pSpan.SpanContext().IsSampled() {
			for _, bm := range bms {
				bm.createSpan.AddLink(trace.Link{
					SpanContext: pSpan.SpanContext(),
					Attributes: []attribute.KeyValue{
						semconv.MessagingOperationName(publishRPCSpanName),
					},
				})
			}
		}
	}
	// Convert each message to its proto form and sum the encoded sizes; the
	// total is used to decide whether to compress the request.
	var batchSize int
	for i, bm := range bms {
		pbMsgs[i] = &pb.PubsubMessage{
			Data:        bm.msg.Data,
			Attributes:  bm.msg.Attributes,
			OrderingKey: bm.msg.OrderingKey,
		}
		batchSize = batchSize + proto.Size(pbMsgs[i])
		bm.msg = nil // release bm.msg for GC
	}
	var res *pb.PublishResponse
	start := time.Now()
	if orderingKey != "" && t.scheduler.IsPaused(orderingKey) {
		// An earlier failure paused this key: fail the batch without an RPC
		// so these messages are not published out of order.
		err = ErrPublishingPaused{OrderingKey: orderingKey}
	} else {
		// Apply custom publish retryer on top of user specified retryer and
		// default retryer.
		opts := t.c.pubc.CallOptions.Publish
		var settings gax.CallSettings
		for _, opt := range opts {
			opt.Resolve(&settings)
		}
		r := &publishRetryer{defaultRetryer: settings.Retry()}
		gaxOpts := []gax.CallOption{
			gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)),
			gax.WithRetry(func() gax.Retryer { return r }),
		}
		// Use gzip when PublishSettings.shouldCompress accepts this batch size.
		if t.PublishSettings.shouldCompress(batchSize) {
			gaxOpts = append(gaxOpts, gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
		}
		res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{
			Topic:    t.name,
			Messages: pbMsgs,
		}, gaxOpts...)
	}
	end := time.Now()
	if err != nil {
		t.scheduler.Pause(orderingKey)
		// Update context with error tag for OpenCensus,
		// using same stats.Record() call as success case.
		ctx, _ = tag.New(ctx, tag.Upsert(keyStatus, "ERROR"),
			tag.Upsert(keyError, err.Error()))
	}
	// Latency is recorded in whole milliseconds (Duration division truncates).
	stats.Record(ctx,
		PublishLatency.M(float64(end.Sub(start)/time.Millisecond)),
		PublishedMessages.M(int64(len(bms))))
	for i, bm := range bms {
		t.flowController.release(ctx, bm.size)
		if err != nil {
			ipubsub.SetPublishResult(bm.res, "", err)
			spanRecordError(bm.createSpan, err)
		} else {
			// NOTE(review): assumes the response carries one message ID per
			// request message, in request order; a shorter MessageIds slice
			// would panic here.
			ipubsub.SetPublishResult(bm.res, res.MessageIds[i], nil)
			if t.enableTracing {
				bm.createSpan.SetAttributes(semconv.MessagingMessageIDKey.String(res.MessageIds[i]))
			}
		}
	}
}
// ResumePublish resumes accepting messages for the provided ordering key.
// Publishing with an ordering key is paused when an error is encountered
// while publishing, so that messages are not published out of order. It is a
// no-op if the topic has not created its publish scheduler yet.
func (t *Topic) ResumePublish(orderingKey string) {
	t.mu.RLock()
	sched := t.scheduler
	t.mu.RUnlock()
	if sched != nil {
		sched.Resume(orderingKey)
	}
}