feat(pubsub): support publisher compression (#9711)

* feat(pubsub): support publisher compression

* address review comments
diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go
index 735ae46..6c83055 100644
--- a/pubsub/integration_test.go
+++ b/pubsub/integration_test.go
@@ -2177,6 +2177,32 @@
 	}
 }
 
+func TestIntegration_PublishCompression(t *testing.T) {
+	ctx := context.Background()
+	client := integrationTestClient(ctx, t)
+	defer client.Close()
+
+	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer topic.Delete(ctx)
+	defer topic.Stop()
+
+	topic.PublishSettings.EnableCompression = true
+	topic.PublishSettings.CompressionBytesThreshold = 50
+
+	const messageSizeBytes = 1000
+
+	msg := &Message{Data: bytes.Repeat([]byte{'A'}, int(messageSizeBytes))}
+	res := topic.Publish(ctx, msg)
+
+	_, err = res.Get(ctx)
+	if err != nil {
+		t.Errorf("publish result got err: %v", err)
+	}
+}
+
 // createTopicWithRetry creates a topic, wrapped with testutil.Retry and returns the created topic or an error.
 func createTopicWithRetry(ctx context.Context, t *testing.T, c *Client, topicID string, cfg *TopicConfig) (*Topic, error) {
 	var topic *Topic
diff --git a/pubsub/topic.go b/pubsub/topic.go
index b85b4b1..b953cb4 100644
--- a/pubsub/topic.go
+++ b/pubsub/topic.go
@@ -36,6 +36,7 @@
 	"google.golang.org/api/support/bundler"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/encoding/gzip"
 	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/durationpb"
@@ -117,6 +118,17 @@
 
 	// FlowControlSettings defines publisher flow control settings.
 	FlowControlSettings FlowControlSettings
+
+	// EnableCompression enables transport compression for Publish operations
+	EnableCompression bool
+
+	// CompressionBytesThreshold defines the threshold (in bytes) above which messages
+	// are compressed for transport. Only takes effect if EnableCompression is true.
+	CompressionBytesThreshold int
+}
+
+func (ps *PublishSettings) shouldCompress(batchSize int) bool {
+	return ps.EnableCompression && batchSize > ps.CompressionBytesThreshold
 }
 
 // DefaultPublishSettings holds the default values for topics' PublishSettings.
@@ -134,6 +146,10 @@
 		MaxOutstandingBytes:    -1,
 		LimitExceededBehavior:  FlowControlIgnore,
 	},
+	// Publisher compression defaults matches Java's defaults
+	// https://github.com/googleapis/java-pubsub/blob/7d33e7891db1b2e32fd523d7655b6c11ea140a8b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java#L717-L718
+	EnableCompression:         false,
+	CompressionBytesThreshold: 240,
 }
 
 // CreateTopic creates a new topic.
@@ -875,6 +891,7 @@
 	}
 	pbMsgs := make([]*pb.PubsubMessage, len(bms))
 	var orderingKey string
+	batchSize := 0
 	for i, bm := range bms {
 		orderingKey = bm.msg.OrderingKey
 		pbMsgs[i] = &pb.PubsubMessage{
@@ -882,6 +899,7 @@
 			Attributes:  bm.msg.Attributes,
 			OrderingKey: bm.msg.OrderingKey,
 		}
+		batchSize = batchSize + proto.Size(pbMsgs[i])
 		bm.msg = nil // release bm.msg for GC
 	}
 	var res *pb.PublishResponse
@@ -897,11 +915,17 @@
 			opt.Resolve(&settings)
 		}
 		r := &publishRetryer{defaultRetryer: settings.Retry()}
+		gaxOpts := []gax.CallOption{
+			gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)),
+			gax.WithRetry(func() gax.Retryer { return r }),
+		}
+		if t.PublishSettings.shouldCompress(batchSize) {
+			gaxOpts = append(gaxOpts, gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
+		}
 		res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{
 			Topic:    t.name,
 			Messages: pbMsgs,
-		}, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)),
-			gax.WithRetry(func() gax.Retryer { return r }))
+		}, gaxOpts...)
 	}
 	end := time.Now()
 	if err != nil {
diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go
index eb53bef..974e0c4 100644
--- a/pubsub/topic_test.go
+++ b/pubsub/topic_test.go
@@ -747,3 +747,26 @@
 		t.Errorf("got %v, want errTopicOrderingNotEnabled", err)
 	}
 }
+
+func TestPublishCompression(t *testing.T) {
+	ctx := context.Background()
+	client, srv := newFake(t)
+	defer client.Close()
+	defer srv.Close()
+
+	topic := mustCreateTopic(t, client, "topic-compression")
+	defer topic.Stop()
+
+	topic.PublishSettings.EnableCompression = true
+	topic.PublishSettings.CompressionBytesThreshold = 50
+
+	const messageSizeBytes = 1000
+
+	msg := &Message{Data: bytes.Repeat([]byte{'A'}, int(messageSizeBytes))}
+	res := topic.Publish(ctx, msg)
+
+	_, err := res.Get(ctx)
+	if err != nil {
+		t.Errorf("publish result got err: %v", err)
+	}
+}