blob: 4dfe7dfc5755a31102529a4942cc624beb3f63f2 [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package pscompat
import (
"context"
"errors"
"testing"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/internal/test"
"cloud.google.com/go/pubsublite/internal/wire"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)
// mockWirePublisher is a mock implementation of the wire.Publisher interface.
// It uses test.RPCVerifier to install fake PublishResults for each Publish
// call.
type mockWirePublisher struct {
Verifier *test.RPCVerifier
Stopped bool
err error
}
func (mp *mockWirePublisher) Publish(msg *pb.PubSubMessage, onResult wire.PublishResultFunc) {
resp, err := mp.Verifier.Pop(msg)
if err != nil {
mp.err = err
onResult(nil, err)
return
}
result := resp.(*wire.MessageMetadata)
onResult(result, nil)
}
func (mp *mockWirePublisher) Start() {}
func (mp *mockWirePublisher) Stop() { mp.Stopped = true }
func (mp *mockWirePublisher) WaitStarted() error { return mp.err }
func (mp *mockWirePublisher) WaitStopped() error { return mp.err }
func (mp *mockWirePublisher) Error() error { return mp.err }
func newTestPublisherClient(verifier *test.RPCVerifier, settings PublishSettings) *PublisherClient {
return &PublisherClient{
settings: settings,
wirePub: &mockWirePublisher{Verifier: verifier},
}
}
func TestPublisherClientTransformMessage(t *testing.T) {
ctx := context.Background()
input := &pubsub.Message{
Data: []byte("data"),
OrderingKey: "ordering_key",
Attributes: map[string]string{"attr": "value"},
}
fakeResponse := &wire.MessageMetadata{
Partition: 2,
Offset: 42,
}
wantResultID := "2:42"
for _, tc := range []struct {
desc string
// mutateSettings is passed a copy of DefaultPublishSettings to mutate.
mutateSettings func(settings *PublishSettings)
wantMsg *pb.PubSubMessage
}{
{
desc: "default settings",
mutateSettings: func(settings *PublishSettings) {},
wantMsg: &pb.PubSubMessage{
Data: []byte("data"),
Key: []byte("ordering_key"),
Attributes: map[string]*pb.AttributeValues{
"attr": {Values: [][]byte{[]byte("value")}},
},
},
},
{
desc: "custom key extractor",
mutateSettings: func(settings *PublishSettings) {
settings.KeyExtractor = func(msg *pubsub.Message) []byte {
return msg.Data
}
},
wantMsg: &pb.PubSubMessage{
Data: []byte("data"),
Key: []byte("data"),
Attributes: map[string]*pb.AttributeValues{
"attr": {Values: [][]byte{[]byte("value")}},
},
},
},
{
desc: "custom message transformer",
mutateSettings: func(settings *PublishSettings) {
settings.KeyExtractor = func(msg *pubsub.Message) []byte {
return msg.Data
}
settings.MessageTransformer = func(from *pubsub.Message, to *pb.PubSubMessage) error {
// Swaps data and key.
to.Data = []byte(from.OrderingKey)
to.Key = from.Data
return nil
}
},
wantMsg: &pb.PubSubMessage{
Data: []byte("ordering_key"),
Key: []byte("data"),
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
settings := DefaultPublishSettings
tc.mutateSettings(&settings)
verifier := test.NewRPCVerifier(t)
verifier.Push(tc.wantMsg, fakeResponse, nil)
defer verifier.Flush()
pubClient := newTestPublisherClient(verifier, settings)
result := pubClient.Publish(ctx, input)
gotID, err := result.Get(ctx)
if err != nil {
t.Errorf("Publish() got err: %v", err)
}
if gotID != wantResultID {
t.Errorf("Publish() got id: %q, want: %q", gotID, wantResultID)
}
})
}
}
func TestPublisherClientTransformMessageError(t *testing.T) {
wantErr := errors.New("message could not be converted")
settings := DefaultPublishSettings
settings.MessageTransformer = func(_ *pubsub.Message, _ *pb.PubSubMessage) error {
return wantErr
}
// No publish calls expected.
verifier := test.NewRPCVerifier(t)
defer verifier.Flush()
ctx := context.Background()
input := &pubsub.Message{
Data: []byte("data"),
}
pubClient := newTestPublisherClient(verifier, settings)
result := pubClient.Publish(ctx, input)
_, gotErr := result.Get(ctx)
if !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Publish() got err: (%v), want err: (%v)", gotErr, wantErr)
}
if !test.ErrorEqual(pubClient.Error(), wantErr) {
t.Errorf("PublisherClient.Error() got: (%v), want: (%v)", pubClient.Error(), wantErr)
}
if got, want := pubClient.wirePub.(*mockWirePublisher).Stopped, true; got != want {
t.Errorf("Publisher.Stopped: got %v, want %v", got, want)
}
}