feat(pubsub): support publisher compression (#9711)
* feat(pubsub): support publisher compression
* address review comments
diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go
index 735ae46..6c83055 100644
--- a/pubsub/integration_test.go
+++ b/pubsub/integration_test.go
@@ -2177,6 +2177,32 @@
}
}
+func TestIntegration_PublishCompression(t *testing.T) {
+ ctx := context.Background()
+ client := integrationTestClient(ctx, t)
+ defer client.Close()
+
+ topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer topic.Delete(ctx)
+ defer topic.Stop()
+
+ topic.PublishSettings.EnableCompression = true
+ topic.PublishSettings.CompressionBytesThreshold = 50
+
+ const messageSizeBytes = 1000
+
+ msg := &Message{Data: bytes.Repeat([]byte{'A'}, int(messageSizeBytes))}
+ res := topic.Publish(ctx, msg)
+
+ _, err = res.Get(ctx)
+ if err != nil {
+ t.Errorf("publish result got err: %v", err)
+ }
+}
+
// createTopicWithRetry creates a topic, wrapped with testutil.Retry and returns the created topic or an error.
func createTopicWithRetry(ctx context.Context, t *testing.T, c *Client, topicID string, cfg *TopicConfig) (*Topic, error) {
var topic *Topic
diff --git a/pubsub/topic.go b/pubsub/topic.go
index b85b4b1..b953cb4 100644
--- a/pubsub/topic.go
+++ b/pubsub/topic.go
@@ -36,6 +36,7 @@
"google.golang.org/api/support/bundler"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
@@ -117,6 +118,17 @@
// FlowControlSettings defines publisher flow control settings.
FlowControlSettings FlowControlSettings
+
+ // EnableCompression enables transport compression for Publish operations
+ EnableCompression bool
+
+ // CompressionBytesThreshold defines the threshold (in bytes) above which messages
+ // are compressed for transport. Only takes effect if EnableCompression is true.
+ CompressionBytesThreshold int
+}
+
+func (ps *PublishSettings) shouldCompress(batchSize int) bool {
+ return ps.EnableCompression && batchSize > ps.CompressionBytesThreshold
}
// DefaultPublishSettings holds the default values for topics' PublishSettings.
@@ -134,6 +146,10 @@
MaxOutstandingBytes: -1,
LimitExceededBehavior: FlowControlIgnore,
},
+ // Publisher compression defaults matches Java's defaults
+ // https://github.com/googleapis/java-pubsub/blob/7d33e7891db1b2e32fd523d7655b6c11ea140a8b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java#L717-L718
+ EnableCompression: false,
+ CompressionBytesThreshold: 240,
}
// CreateTopic creates a new topic.
@@ -875,6 +891,7 @@
}
pbMsgs := make([]*pb.PubsubMessage, len(bms))
var orderingKey string
+ batchSize := 0
for i, bm := range bms {
orderingKey = bm.msg.OrderingKey
pbMsgs[i] = &pb.PubsubMessage{
@@ -882,6 +899,7 @@
Attributes: bm.msg.Attributes,
OrderingKey: bm.msg.OrderingKey,
}
+ batchSize = batchSize + proto.Size(pbMsgs[i])
bm.msg = nil // release bm.msg for GC
}
var res *pb.PublishResponse
@@ -897,11 +915,17 @@
opt.Resolve(&settings)
}
r := &publishRetryer{defaultRetryer: settings.Retry()}
+ gaxOpts := []gax.CallOption{
+ gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)),
+ gax.WithRetry(func() gax.Retryer { return r }),
+ }
+ if t.PublishSettings.shouldCompress(batchSize) {
+ gaxOpts = append(gaxOpts, gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
+ }
res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{
Topic: t.name,
Messages: pbMsgs,
- }, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)),
- gax.WithRetry(func() gax.Retryer { return r }))
+ }, gaxOpts...)
}
end := time.Now()
if err != nil {
diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go
index eb53bef..974e0c4 100644
--- a/pubsub/topic_test.go
+++ b/pubsub/topic_test.go
@@ -747,3 +747,26 @@
t.Errorf("got %v, want errTopicOrderingNotEnabled", err)
}
}
+
+func TestPublishCompression(t *testing.T) {
+ ctx := context.Background()
+ client, srv := newFake(t)
+ defer client.Close()
+ defer srv.Close()
+
+ topic := mustCreateTopic(t, client, "topic-compression")
+ defer topic.Stop()
+
+ topic.PublishSettings.EnableCompression = true
+ topic.PublishSettings.CompressionBytesThreshold = 50
+
+ const messageSizeBytes = 1000
+
+ msg := &Message{Data: bytes.Repeat([]byte{'A'}, int(messageSizeBytes))}
+ res := topic.Publish(ctx, msg)
+
+ _, err := res.Get(ctx)
+ if err != nil {
+ t.Errorf("publish result got err: %v", err)
+ }
+}