blob: eee99d29d9d98ccb59d47254d6524800507a6a6e [file] [log] [blame] [edit]
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"context"
"fmt"
"slices"
"sync/atomic"
"testing"
"time"
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsub/internal"
"cloud.google.com/go/pubsub/pstest"
"github.com/google/go-cmp/cmp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func TestTrace_MessageCarrier(t *testing.T) {
ctx := context.Background()
e := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(e))
otel.SetTextMapPropagator(propagation.TraceContext{})
defer tp.Shutdown(ctx)
otel.SetTracerProvider(tp)
ctx, _ = tp.Tracer("a").Start(ctx, "fake-span")
msg := &Message{
Data: []byte("asdf"),
OrderingKey: "asdf",
Attributes: map[string]string{},
}
otel.GetTextMapPropagator().Inject(ctx, newMessageCarrier(msg))
if _, ok := msg.Attributes[googclientPrefix+"traceparent"]; !ok {
t.Fatalf("expected traceparent in message attributes, found empty string")
}
newCtx := context.Background()
propagation.TraceContext{}.Extract(newCtx, newMessageCarrier(msg))
if _, ok := msg.Attributes[googclientPrefix+"traceparent"]; !ok {
t.Fatalf("expected traceparent in message attributes, found empty string")
}
}
func TestTrace_PublishSpan(t *testing.T) {
ctx := context.Background()
c, srv := newFakeWithTracing(t)
defer c.Close()
defer srv.Close()
e := tracetest.NewInMemoryExporter()
g := &incrementIDGenerator{}
tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(e), sdktrace.WithIDGenerator(g))
defer tp.Shutdown(ctx)
otel.SetTracerProvider(tp)
m := &Message{
Data: []byte("test"),
OrderingKey: "my-key",
}
topicID := "t"
expectedSpans := tracetest.SpanStubs{
tracetest.SpanStub{
Name: fmt.Sprintf("%s %s", topicID, createSpanName),
SpanKind: trace.SpanKindProducer,
Attributes: []attribute.KeyValue{
semconv.CodeFunction("Publish"),
semconv.MessagingDestinationName(topicID),
semconv.MessagingGCPPubsubMessageOrderingKey(m.OrderingKey),
// Hardcoded since the fake server always returns m0 first.
semconv.MessagingMessageIDKey.String("m0"),
semconv.MessagingSystemGCPPubsub,
semconv.MessagingMessageBodySize(len(m.Data)),
attribute.String(gcpProjectIDAttribute, projName),
},
Events: []sdktrace.Event{
{
Name: eventPublishStart,
Attributes: []attribute.KeyValue{
semconv.MessagingBatchMessageCount(1),
},
},
{
Name: eventPublishEnd,
},
},
Links: []sdktrace.Link{
{
SpanContext: trace.NewSpanContext(trace.SpanContextConfig{
TraceID: [16]byte{(byte(2))},
SpanID: [8]byte{(byte(4))},
TraceFlags: 1,
}),
},
},
ChildSpanCount: 2,
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
},
},
tracetest.SpanStub{
Name: publishFCSpanName,
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
},
},
tracetest.SpanStub{
Name: batcherSpanName,
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
},
},
tracetest.SpanStub{
Name: fmt.Sprintf("%s %s", topicID, publishRPCSpanName),
Attributes: []attribute.KeyValue{
semconv.MessagingSystemGCPPubsub,
semconv.MessagingDestinationName(topicID),
semconv.CodeFunction("publishMessageBundle"),
semconv.MessagingBatchMessageCount(1),
attribute.String(gcpProjectIDAttribute, projName),
},
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
},
Links: []sdktrace.Link{
{
SpanContext: trace.NewSpanContext(trace.SpanContextConfig{
TraceID: [16]byte{(byte(1))},
SpanID: [8]byte{(byte(1))},
TraceFlags: 1,
}),
},
},
},
}
topic, err := c.CreateTopic(ctx, topicID)
if err != nil {
t.Fatalf("failed to create topic: %v", err)
}
if m.OrderingKey != "" {
topic.EnableMessageOrdering = true
}
r := topic.Publish(ctx, m)
_, err = r.Get(ctx)
if err != nil {
t.Fatalf("failed to publish message: %v", err)
}
defer topic.Stop()
got := getSpans(e)
slices.SortFunc(expectedSpans, func(a, b tracetest.SpanStub) int {
return sortSpanStub(a, b)
})
compareSpans(t, got, expectedSpans)
}
func TestTrace_PublishSpanError(t *testing.T) {
ctx := context.Background()
c, srv := newFakeWithTracing(t)
defer c.Close()
defer srv.Close()
e := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(e))
defer tp.Shutdown(ctx)
otel.SetTracerProvider(tp)
m := &Message{
Data: []byte("test"),
OrderingKey: "m",
}
topicID := "t"
topic, err := c.CreateTopic(ctx, topicID)
if err != nil {
t.Fatalf("failed to create topic: %v", err)
}
// Publishing a message with an ordering key without enabling ordering topic ordering
// should fail.
t.Run("no ordering key", func(t *testing.T) {
r := topic.Publish(ctx, m)
_, err = r.Get(ctx)
if err == nil {
t.Fatal("expected err, got nil")
}
want := getPublishSpanStubsWithError(topicID, m, errTopicOrderingNotEnabled)
got := getSpans(e)
opts := []cmp.Option{
cmp.Comparer(spanStubComparer),
}
if diff := testutil.Diff(got, want, opts...); diff != "" {
t.Errorf("diff: -got, +want:\n%s\n", diff)
}
e.Reset()
topic.ResumePublish(m.OrderingKey)
})
t.Run("stopped topic", func(t *testing.T) {
// Publishing a message with a stopped publisher should fail too
topic.ResumePublish(m.OrderingKey)
topic.EnableMessageOrdering = true
topic.Stop()
r := topic.Publish(ctx, m)
_, err = r.Get(ctx)
if err == nil {
t.Fatal("expected err, got nil")
}
got := getSpans(e)
want := getPublishSpanStubsWithError(topicID, m, ErrTopicStopped)
opts := []cmp.Option{
cmp.Comparer(spanStubComparer),
}
if diff := testutil.Diff(got, want, opts...); diff != "" {
t.Errorf("diff: -got, +want:\n%s\n", diff)
}
e.Reset()
topic.ResumePublish(m.OrderingKey)
})
t.Run("flow control error", func(t *testing.T) {
// Use a different topic here than above since
// we need to adjust the flow control settings,
// which are immutable after publish.
topicID := "t2"
topic, err := c.CreateTopic(ctx, topicID)
if err != nil {
t.Fatalf("failed to create topic: %v", err)
}
topic.EnableMessageOrdering = true
topic.PublishSettings.FlowControlSettings = FlowControlSettings{
LimitExceededBehavior: FlowControlSignalError,
MaxOutstandingBytes: 1,
}
r := topic.Publish(ctx, m)
_, err = r.Get(ctx)
if err == nil {
t.Fatal("expected err, got nil")
}
got := getSpans(e)
want := getFlowControlSpanStubs(ErrFlowControllerMaxOutstandingBytes)
opts := []cmp.Option{
cmp.Comparer(spanStubComparer),
}
if diff := testutil.Diff(got, want, opts...); diff != "" {
t.Errorf("diff: -got, +want:\n%s\n", diff)
}
})
}
func TestTrace_SubscribeSpans(t *testing.T) {
ctx := context.Background()
c, srv := newFakeWithTracing(t)
defer c.Close()
defer srv.Close()
// For subscribe spans, we'll publish before setting the tracer provider
// so we don't trace the publish spans. Context propagation is tested separately.
m := &Message{
Data: []byte("test"),
OrderingKey: "my-key",
}
topicID := "t"
topic, err := c.CreateTopic(ctx, topicID)
if err != nil {
t.Fatalf("failed to create topic: %v", err)
}
subID := "s"
enableEOS := false
sub, err := c.CreateSubscription(ctx, subID, SubscriptionConfig{
Topic: topic,
EnableExactlyOnceDelivery: enableEOS,
})
if err != nil {
t.Fatalf("failed to create subscription: %v", err)
}
if m.OrderingKey != "" {
topic.EnableMessageOrdering = true
}
// Call publish before enabling tracer provider to only test subscribe spans.
r := topic.Publish(ctx, m)
_, err = r.Get(ctx)
if err != nil {
t.Fatalf("failed to publish message: %v", err)
}
e := tracetest.NewInMemoryExporter()
g := &incrementIDGenerator{}
tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(e), sdktrace.WithIDGenerator(g))
defer tp.Shutdown(ctx)
otel.SetTracerProvider(tp)
ctx, cancel := context.WithCancel(ctx)
sub.Receive(ctx, func(ctx context.Context, m *Message) {
// Add artificial processsing time so the message isn't acked before the modack can be sent out.
time.Sleep(1 * time.Second)
m.Ack()
cancel()
})
expectedSpans := tracetest.SpanStubs{
tracetest.SpanStub{
Name: fmt.Sprintf("%s %s", subID, processSpanName),
SpanKind: trace.SpanKindInternal,
Attributes: []attribute.KeyValue{
semconv.MessagingOperationTypeDeliver,
},
Events: []sdktrace.Event{
{
Name: eventAckCalled,
},
},
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
},
},
tracetest.SpanStub{
Name: fmt.Sprintf("%s %s", subID, subscribeSpanName),
SpanKind: trace.SpanKindConsumer,
Attributes: []attribute.KeyValue{
semconv.CodeFunction("receive"),
semconv.MessagingBatchMessageCount(1),
semconv.MessagingDestinationName(subID),
attribute.Bool(eosAttribute, enableEOS),
// Hardcoded since the fake server always returns m0 first.
semconv.MessagingMessageIDKey.String("m0"),
// The fake server uses message ID as ackID, this is not the case with live service.
semconv.MessagingGCPPubsubMessageAckID("m0"),
semconv.MessagingGCPPubsubMessageOrderingKey(m.OrderingKey),
attribute.String(resultAttribute, resultAcked),
semconv.MessagingSystemGCPPubsub,
semconv.MessagingMessageBodySize(len(m.Data)),
attribute.String(gcpProjectIDAttribute, projName),
},
Events: []sdktrace.Event{
{
Name: eventModackStart,
Attributes: []attribute.KeyValue{
semconv.MessagingBatchMessageCount(1),
},
},
{
Name: eventModackEnd,
},
{
Name: eventAckStart,
Attributes: []attribute.KeyValue{
semconv.MessagingBatchMessageCount(1),
},
},
{
Name: eventAckEnd,
},
},
// ChildSpanCount: 3,
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
},
},
tracetest.SpanStub{
Name: scheduleSpanName,
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
},
},
tracetest.SpanStub{
Name: ccSpanName,
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
},
},
tracetest.SpanStub{
Name: fmt.Sprintf("%s %s", subID, ackSpanName),
Links: []sdktrace.Link{
{
SpanContext: trace.NewSpanContext(trace.SpanContextConfig{
TraceID: [16]byte{(byte(1))},
SpanID: [8]byte{(byte(1))},
TraceFlags: 1,
}),
},
},
Attributes: []attribute.KeyValue{
semconv.CodeFunction("sendAck"),
semconv.MessagingBatchMessageCount(1),
semconv.MessagingSystemGCPPubsub,
semconv.MessagingDestinationName(subID),
attribute.String(gcpProjectIDAttribute, projName),
},
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
},
},
tracetest.SpanStub{
Name: fmt.Sprintf("%s %s", subID, modackSpanName),
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
},
Links: []sdktrace.Link{
{
SpanContext: trace.NewSpanContext(trace.SpanContextConfig{
TraceID: [16]byte{(byte(1))},
SpanID: [8]byte{(byte(1))},
TraceFlags: 1,
}),
},
},
Attributes: []attribute.KeyValue{
semconv.CodeFunction("sendModAck"),
attribute.Bool(receiptModackAttribute, true),
semconv.MessagingGCPPubsubMessageAckDeadline(10),
semconv.MessagingBatchMessageCount(1),
semconv.MessagingSystemGCPPubsub,
semconv.MessagingDestinationName(subID),
attribute.String(gcpProjectIDAttribute, projName),
},
},
}
got := getSpans(e)
slices.SortFunc(expectedSpans, func(a, b tracetest.SpanStub) int {
return sortSpanStub(a, b)
})
compareSpans(t, got, expectedSpans)
}
func TestTrace_TracingNotEnabled(t *testing.T) {
ctx := context.Background()
c, srv := newFake(t)
defer c.Close()
defer srv.Close()
e := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(e))
defer tp.Shutdown(ctx)
otel.SetTracerProvider(tp)
m := &Message{
Data: []byte("test"),
}
topicID := "t"
subID := "s"
topic, err := c.CreateTopic(ctx, topicID)
if err != nil {
t.Fatalf("failed to create topic: %v", err)
}
sub, err := c.CreateSubscription(ctx, subID, SubscriptionConfig{
Topic: topic,
})
if err != nil {
t.Fatalf("failed to create subscription: %v", err)
}
r := topic.Publish(ctx, m)
_, err = r.Get(ctx)
if err != nil {
t.Fatalf("failed to publish message: %v", err)
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
sub.Receive(ctx, func(ctx context.Context, msg *Message) {
msg.Ack()
cancel()
})
got := getSpans(e)
if len(got) != 0 {
t.Fatalf("expected no spans, got %d", len(got))
}
}
func spanStubComparer(a, b tracetest.SpanStub) bool {
if a.Name != b.Name {
fmt.Printf("a.Name: %s\nb.Name: %s\n", a.Name, b.Name)
return false
}
if a.ChildSpanCount != b.ChildSpanCount {
fmt.Printf("a.ChildSpanCount: %d\nb.ChildSpanCount: %d\n", a.ChildSpanCount, b.ChildSpanCount)
return false
}
// Use attribute.NewSet and set.Equals() once go1.21 is min version
if len(a.Attributes) != len(b.Attributes) {
fmt.Printf("len mismatch: a.Attributes: %d, b.Attributes: %d\n", len(a.Attributes), len(b.Attributes))
return false
}
if a.InstrumentationLibrary != b.InstrumentationLibrary {
fmt.Printf("a.InstrumentationLibrary: %v\nb.InstrumentationLibrary: %v\n", a.InstrumentationLibrary, b.InstrumentationLibrary)
return false
}
if a.Status != b.Status {
fmt.Printf("a.Status: %v\nb.Status: %v\n", a.Status, b.Status)
return false
}
return true
}
func sortSpanStub(a, b tracetest.SpanStub) int {
if a.Name == b.Name {
return 0
} else if a.Name < b.Name {
return -1
} else {
return 1
}
}
func getSpans(e *tracetest.InMemoryExporter) tracetest.SpanStubs {
// Wait a fixed amount for spans to be fully exported.
time.Sleep(100 * time.Millisecond)
spans := e.GetSpans()
// Implement sortable struct, replace with slices.SortFunc once go 1.21 is min version
slices.SortFunc(spans, func(a, b tracetest.SpanStub) int {
return sortSpanStub(a, b)
})
return spans
}
func compareSpans(t *testing.T, got, want tracetest.SpanStubs) {
if len(got) != len(want) {
for _, span := range got {
t.Logf("got: %s\n", span.Name)
}
for _, span := range want {
t.Logf("want: %s\n", span.Name)
}
t.Errorf("got %d spans, want %d", len(got), len(want))
}
opts := []cmp.Option{
cmp.Comparer(spanStubComparer),
}
for i, span := range got {
wanti := want[i]
if diff := testutil.Diff(span, wanti, opts...); diff != "" {
t.Errorf("diff: -got, +want:\n%s\n", diff)
}
}
}
func getPublishSpanStubsWithError(topicID string, m *Message, err error) tracetest.SpanStubs {
return tracetest.SpanStubs{
tracetest.SpanStub{
Name: fmt.Sprintf("%s %s", topicID, createSpanName),
SpanKind: trace.SpanKindProducer,
Attributes: []attribute.KeyValue{
semconv.CodeFunction("Publish"),
semconv.MessagingDestinationName(topicID),
semconv.MessagingMessageIDKey.String(""),
semconv.MessagingMessageBodySize(len(m.Data)),
semconv.MessagingGCPPubsubMessageOrderingKey(m.OrderingKey),
semconv.MessagingSystemGCPPubsub,
attribute.String(gcpProjectIDAttribute, projName),
},
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
},
Status: sdktrace.Status{
Code: codes.Error,
Description: err.Error(),
},
},
}
}
func getFlowControlSpanStubs(err error) tracetest.SpanStubs {
return tracetest.SpanStubs{
tracetest.SpanStub{
Name: publishFCSpanName,
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
},
Status: sdktrace.Status{
Code: codes.Error,
Description: err.Error(),
},
},
}
}
func newFakeWithTracing(t *testing.T) (*Client, *pstest.Server) {
ctx := context.Background()
srv := pstest.NewServer()
client, err := NewClientWithConfig(ctx, projName,
&ClientConfig{EnableOpenTelemetryTracing: true},
option.WithEndpoint(srv.Addr),
option.WithoutAuthentication(),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
option.WithTelemetryDisabled(),
)
if err != nil {
t.Fatal(err)
}
return client, srv
}
var _ sdktrace.IDGenerator = &incrementIDGenerator{}
type incrementIDGenerator struct {
tid int64
sid int64
}
func (i *incrementIDGenerator) NewSpanID(ctx context.Context, traceID trace.TraceID) trace.SpanID {
id := atomic.AddInt64(&i.sid, 1)
sid := [8]byte{byte(id)}
return sid
}
// NewIDs returns a non-zero trace ID and a non-zero span ID from a
// randomly-chosen sequence.
func (i *incrementIDGenerator) NewIDs(ctx context.Context) (trace.TraceID, trace.SpanID) {
id1 := atomic.AddInt64(&i.tid, 1)
id2 := atomic.AddInt64(&i.sid, 1)
tid := [16]byte{byte(id1)}
sid := [8]byte{byte(id2)}
return tid, sid
}
func BenchmarkNoTracingEnabled(b *testing.B) {
ctx := context.Background()
t := &testing.T{}
c, srv := newFake(t)
defer c.Close()
defer srv.Close()
e := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(e))
defer tp.Shutdown(ctx)
otel.SetTracerProvider(tp)
m := &Message{
Data: []byte("test"),
}
topicID := "t"
subID := "s"
topic, err := c.CreateTopic(ctx, topicID)
if err != nil {
b.Fatalf("failed to create topic: %v", err)
}
sub, err := c.CreateSubscription(ctx, subID, SubscriptionConfig{
Topic: topic,
})
if err != nil {
b.Fatalf("failed to create subscription: %v", err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
r := topic.Publish(ctx, m)
_, err = r.Get(ctx)
if err != nil {
b.Fatalf("failed to publish message: %v", err)
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
sub.Receive(ctx, func(ctx context.Context, msg *Message) {
msg.Ack()
cancel()
})
got := getSpans(e)
if len(got) != 0 {
b.Fatalf("expected no spans, got %d", len(got))
}
}
}