// Copyright 2017 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"bytes"
"context"
"errors"
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
ipubsub "cloud.google.com/go/internal/pubsub"
"cloud.google.com/go/internal/testutil"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"cloud.google.com/go/pubsub/pstest"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
)
var (
projName = "P"
topicName = "some-topic"
subName = "some-sub"
fullyQualifiedTopicName = fmt.Sprintf("projects/%s/topics/%s", projName, topicName)
fullyQualifiedSubName = fmt.Sprintf("projects/%s/subscriptions/%s", projName, subName)
)
func TestSplitRequestIDs(t *testing.T) {
t.Parallel()
ids := []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"}
for _, test := range []struct {
ids []string
splitIndex int
}{
{[]string{}, 0}, // empty slice, no split
{ids, 2}, // slice of size 5, split at index 2
		{ids[:2], 2}, // slice of size 2, split at index 2 (empty remainder)
{ids[:1], 1}, // slice of size 1, split at index 1
} {
got1, got2 := splitRequestIDs(test.ids, 2)
want1, want2 := test.ids[:test.splitIndex], test.ids[test.splitIndex:]
if !testutil.Equal(len(got1), len(want1)) {
t.Errorf("%v, 1: got %v, want %v", test, got1, want1)
}
if !testutil.Equal(len(got2), len(want2)) {
t.Errorf("%v, 2: got %v, want %v", test, got2, want2)
}
}
}
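// splitAtMostSketch illustrates the splitting contract exercised above. It is
// a hypothetical stand-in for splitRequestIDs (which lives in iterator.go and
// may also account for request payload size): the prefix holds at most max
// IDs and the remainder holds whatever is left over.
func splitAtMostSketch(ids []string, max int) (prefix, remainder []string) {
	if len(ids) < max {
		max = len(ids)
	}
	return ids[:max], ids[max:]
}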
func TestCalcFieldSize(t *testing.T) {
t.Parallel()
// Create a mock ack request to test.
req := &pb.AcknowledgeRequest{
Subscription: "sub",
AckIds: []string{"aaa", "bbb", "ccc", "ddd", "eee"},
}
size := calcFieldSizeString(req.Subscription) + calcFieldSizeString(req.AckIds...)
	// Proto encoding is calculated from 1 tag byte and 1 size byte for each
	// string (see the sizeOfStringField sketch after this test).
	want := (1 + 1) + len(req.Subscription) + // subscription field: 1 tag byte + 1 size byte
		5*(1+1+3) // ackID size: 5 * [1 (tag byte) + 1 (size byte) + 3 (length of ackID)]
if size != want {
t.Errorf("pubsub: calculated ack req size of %d bytes, want %d", size, want)
}
req.Subscription = string(bytes.Repeat([]byte{'A'}, 300))
size = calcFieldSizeString(req.Subscription) + calcFieldSizeString(req.AckIds...)
// With a longer subscription name, we use an extra size byte.
want = (1 + 2) + len(req.Subscription) + // subscription field: 1 tag byte + 2 size bytes
5*(1+1+3) // ackID size: 5 * [1 (tag byte) + 1 (size byte) + 3 (length of ackID)]
if size != want {
t.Errorf("pubsub: calculated ack req size of %d bytes, want %d", size, want)
}
// Create a mock modack request to test.
modAckReq := &pb.ModifyAckDeadlineRequest{
Subscription: "sub",
AckIds: []string{"aaa", "bbb", "ccc", "ddd", "eee"},
AckDeadlineSeconds: 300,
}
size = calcFieldSizeString(modAckReq.Subscription) +
calcFieldSizeString(modAckReq.AckIds...) +
calcFieldSizeInt(int(modAckReq.AckDeadlineSeconds))
want = (1 + 1) + len(modAckReq.Subscription) + // subscription field: 1 tag byte + 1 size byte
5*(1+1+3) + // ackID size: 5 * [1 (tag byte) + 1 (size byte) + 3 (length of ackID)]
(1 + 2) // ackDeadline: 1 tag byte + 2 size bytes
if size != want {
t.Errorf("pubsub: calculated modAck req size of %d bytes, want %d", size, want)
}
}
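// sizeOfStringField sketches the proto wire-size arithmetic behind the
// expectations above. It is an illustrative assumption, not the package's
// calcFieldSizeString: a string field costs 1 tag byte, a varint length
// prefix (1 byte for payloads up to 127 bytes, 2 bytes up to 16383, and so
// on), plus the payload bytes themselves.
func sizeOfStringField(s string) int {
	lenBytes := 1
	for n := len(s); n >= 128; n >>= 7 {
		lenBytes++
	}
	return 1 + lenBytes + len(s)
}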
func TestMaxExtensionPeriod(t *testing.T) {
srv := pstest.NewServer()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
_, client, err := initConn(ctx, srv.Addr)
if err != nil {
t.Fatal(err)
}
want := 15 * time.Second
iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{
maxExtensionPeriod: want,
})
// Add a datapoint that's greater than maxExtensionPeriod.
receiveTime := time.Now().Add(time.Duration(-20) * time.Second)
iter.ackTimeDist.Record(int(time.Since(receiveTime) / time.Second))
if got := iter.ackDeadline(); got != want {
t.Fatalf("deadline got = %v, want %v", got, want)
}
}
func TestAckDistribution(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
t.Skip("broken")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
minDurationPerLeaseExtension = 1 * time.Second
pstest.SetMinAckDeadline(minDurationPerLeaseExtension)
srv := pstest.NewServer()
defer srv.Close()
defer pstest.ResetMinAckDeadline()
	// Create the topic via a Publish. It's convenient to do it here rather than
	// via client.CreateTopic because the client has not been established yet, and
	// because we want to create the topic once, whereas a client is established
	// (below) once per test case.
srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
queuedMsgs := make(chan int32, 1024)
go continuouslySend(ctx, srv, queuedMsgs)
for _, testcase := range []struct {
initialProcessSecs int32
finalProcessSecs int32
}{
{initialProcessSecs: 3, finalProcessSecs: 5}, // Process time goes up
{initialProcessSecs: 5, finalProcessSecs: 3}, // Process time goes down
} {
t.Logf("Testing %d -> %d", testcase.initialProcessSecs, testcase.finalProcessSecs)
		// processTimeSecs is used by the sender to coordinate with the receiver how
		// long the receiver should pretend to process for. e.g. if we test 3s -> 5s,
		// processTimeSecs will start at 3, causing the receiver to process messages
		// for 3s while the sender sends the first batch. Then, as the sender begins
		// the next batch, it swaps processTimeSecs to 5s and the receiver processes
		// each message for 5s. In this way we simulate a client whose time-to-ack
		// (process time) changes.
processTimeSecs := testcase.initialProcessSecs
s, client, err := initConn(ctx, srv.Addr)
if err != nil {
t.Fatal(err)
}
// recvdWg increments for each message sent, and decrements for each message received.
recvdWg := &sync.WaitGroup{}
go startReceiving(ctx, t, s, recvdWg, &processTimeSecs)
startSending(t, queuedMsgs, &processTimeSecs, testcase.initialProcessSecs, testcase.finalProcessSecs, recvdWg)
recvdWg.Wait()
time.Sleep(100 * time.Millisecond) // Wait a bit more for resources to clean up
err = client.Close()
if err != nil {
t.Fatal(err)
}
modacks := modacksByTime(srv.Messages())
u := modackDeadlines(modacks)
initialDL := int32(minDurationPerLeaseExtension / time.Second)
if !setsAreEqual(u, []int32{initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs}) {
t.Fatalf("Expected modack deadlines to contain (exactly, and only) %ds, %ds, %ds. Instead, got %v",
initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs, toSet(u))
}
}
}
// modacksByTime buckets modacks by time.
func modacksByTime(msgs []*pstest.Message) map[time.Time][]pstest.Modack {
modacks := map[time.Time][]pstest.Modack{}
for _, msg := range msgs {
for _, m := range msg.Modacks {
modacks[m.ReceivedAt] = append(modacks[m.ReceivedAt], m)
}
}
return modacks
}
// setsAreEqual reports whether haystack and needles contain the same values,
// ignoring duplicates.
func setsAreEqual(haystack, needles []int32) bool {
hMap := map[int32]bool{}
nMap := map[int32]bool{}
for _, n := range needles {
nMap[n] = true
}
for _, n := range haystack {
hMap[n] = true
}
return reflect.DeepEqual(nMap, hMap)
}
// startReceiving pretends to be a client. It calls s.Receive and acks each message after the
// configured process time. It also watches for duplicates: any message that arrives twice is
// logged and skipped rather than re-acked.
func startReceiving(ctx context.Context, t *testing.T, s *Subscription, recvdWg *sync.WaitGroup, processTimeSecs *int32) {
t.Log("Receiving..")
var recvdMu sync.Mutex
recvd := map[string]bool{}
err := s.Receive(ctx, func(ctx context.Context, msg *Message) {
msgData := string(msg.Data)
recvdMu.Lock()
_, ok := recvd[msgData]
if ok {
recvdMu.Unlock()
t.Logf("already saw \"%s\"\n", msgData)
return
}
recvd[msgData] = true
recvdMu.Unlock()
select {
case <-ctx.Done():
msg.Nack()
recvdWg.Done()
case <-time.After(time.Duration(atomic.LoadInt32(processTimeSecs)) * time.Second):
msg.Ack()
recvdWg.Done()
}
})
if err != nil {
if status.Code(err) != codes.Canceled {
t.Error(err)
}
}
}
// startSending sends four batches of messages, separated by waits of
// minDurationPerLeaseExtension, initialProcessSecs, and finalProcessSecs.
func startSending(t *testing.T, queuedMsgs chan int32, processTimeSecs *int32, initialProcessSecs int32, finalProcessSecs int32, recvdWg *sync.WaitGroup) {
var msg int32
// We must send this block to force the receiver to send its initially-configured modack time. The time that
// gets sent should be ignorant of the distribution, since there haven't been enough (any, actually) messages
// to create a distribution yet.
t.Log("minAckDeadlineSecsSending an initial message")
recvdWg.Add(1)
msg++
queuedMsgs <- msg
<-time.After(minDurationPerLeaseExtension)
t.Logf("Sending some messages to update distribution to %d. This new distribution will be used "+
"when the next batch of messages go out.", initialProcessSecs)
for i := 0; i < 10; i++ {
recvdWg.Add(1)
msg++
queuedMsgs <- msg
}
atomic.SwapInt32(processTimeSecs, finalProcessSecs)
<-time.After(time.Duration(initialProcessSecs) * time.Second)
t.Logf("Sending many messages to update distribution to %d. This new distribution will be used "+
"when the next batch of messages go out.", finalProcessSecs)
for i := 0; i < 100; i++ {
recvdWg.Add(1)
msg++
queuedMsgs <- msg // Send many messages to drastically change distribution
}
<-time.After(time.Duration(finalProcessSecs) * time.Second)
t.Logf("Last message going out, whose deadline should be %d.", finalProcessSecs)
recvdWg.Add(1)
msg++
queuedMsgs <- msg
}
// continuouslySend publishes each message that arrives on the queuedMsgs chan until ctx is done.
func continuouslySend(ctx context.Context, srv *pstest.Server, queuedMsgs chan int32) {
for {
select {
case <-ctx.Done():
return
case m := <-queuedMsgs:
srv.Publish(fullyQualifiedTopicName, []byte(fmt.Sprintf("message %d", m)), nil)
}
}
}
// toSet returns the distinct values of arr, preserving first-seen order.
func toSet(arr []int32) []int32 {
var s []int32
m := map[int32]bool{}
for _, v := range arr {
_, ok := m[v]
if !ok {
s = append(s, v)
m[v] = true
}
}
return s
}
// initConn dials the given fake-server address, creates a client over that
// connection, and creates a uniquely named subscription on the test topic.
func initConn(ctx context.Context, addr string) (*Subscription, *Client, error) {
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, nil, err
}
e := testutil.DefaultHeadersEnforcer()
opts := append(e.CallOptions(), option.WithGRPCConn(conn))
client, err := NewClient(ctx, projName, opts...)
if err != nil {
return nil, nil, err
}
topic := client.Topic(topicName)
s, err := client.CreateSubscription(ctx, fmt.Sprintf("sub-%d", time.Now().UnixNano()), SubscriptionConfig{Topic: topic})
if err != nil {
return nil, nil, err
}
	exists, err := s.Exists(ctx)
	if err != nil {
		return nil, nil, err
	}
	if !exists {
		return nil, nil, errors.New("subscription does not exist")
	}
return s, client, nil
}
// modackDeadlines takes a map of time => Modack, gathers all the
// Modack.AckDeadlines, and returns them as a slice.
func modackDeadlines(m map[time.Time][]pstest.Modack) []int32 {
var u []int32
for _, vv := range m {
for _, v := range vv {
u = append(u, v.AckDeadline)
}
}
return u
}
func TestIterator_ModifyAckContextDeadline(t *testing.T) {
	// Test that context deadline exceeded errors from ModifyAckDeadline
	// are not propagated to the client.
opts := []pstest.ServerReactorOption{
pstest.WithErrorInjection("ModifyAckDeadline", codes.Unknown, "context deadline exceeded"),
}
srv := pstest.NewServer(opts...)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
s, client, err := initConn(ctx, srv.Addr)
if err != nil {
t.Fatal(err)
}
srv.Publish(fullyQualifiedTopicName, []byte("some-message"), nil)
	cctx, cancel2 := context.WithTimeout(ctx, 5*time.Second)
	defer cancel2()
err = s.Receive(cctx, func(ctx context.Context, m *Message) {
m.Ack()
})
if err != nil {
t.Fatalf("Got error in Receive: %v", err)
}
err = client.Close()
if err != nil {
t.Fatal(err)
}
}
func TestIterator_SynchronousPullCancel(t *testing.T) {
srv := pstest.NewServer()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
_, client, err := initConn(ctx, srv.Addr)
if err != nil {
t.Fatal(err)
}
iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{})
// Cancelling the iterator and pulling should not result in any errors.
iter.cancel()
if _, err := iter.pullMessages(100); err != nil {
t.Fatalf("Got error in pullMessages: %v", err)
}
}
func TestIterator_BoundedDuration(t *testing.T) {
	// Use exported fields for time.Duration fields so they
	// print nicely. Otherwise, they will print as integers.
	//
	// AckDeadline is bounded by the min/max ack deadlines, which are
	// 10 seconds and 600 seconds respectively. This is true for the real
	// distribution data points as well. The clamping behavior these cases
	// encode is sketched in boundedDurationSketch after this test.
	testCases := []struct {
		desc        string
		AckDeadline time.Duration
		MinDuration time.Duration
		MaxDuration time.Duration
		exactlyOnce bool
		Want        time.Duration
	}{
		{
			desc:        "AckDeadline should be updated to the min duration",
			AckDeadline: 10 * time.Second,
			MinDuration: 15 * time.Second,
			MaxDuration: 10 * time.Minute,
			exactlyOnce: false,
			Want:        15 * time.Second,
		},
		{
			desc:        "AckDeadline should be updated to 1 minute when using exactly once",
			AckDeadline: 10 * time.Second,
			MinDuration: 0,
			MaxDuration: 10 * time.Minute,
			exactlyOnce: true,
			Want:        1 * time.Minute,
		},
		{
			desc:        "AckDeadline should not be updated here, even though exactly once is enabled",
			AckDeadline: 10 * time.Second,
			MinDuration: 15 * time.Second,
			MaxDuration: 10 * time.Minute,
			exactlyOnce: true,
			Want:        15 * time.Second,
		},
		{
			desc:        "AckDeadline should not be updated here",
			AckDeadline: 10 * time.Minute,
			MinDuration: 15 * time.Second,
			MaxDuration: 10 * time.Minute,
			exactlyOnce: true,
			Want:        10 * time.Minute,
		},
		{
			desc:        "AckDeadline should not be updated when neither duration is set",
			AckDeadline: 5 * time.Minute,
			MinDuration: 0,
			MaxDuration: 0,
			exactlyOnce: false,
			Want:        5 * time.Minute,
		},
		{
			desc:        "AckDeadline should not be updated here since it is within both boundaries",
			AckDeadline: 5 * time.Minute,
			MinDuration: 1 * time.Minute,
			MaxDuration: 7 * time.Minute,
			exactlyOnce: false,
			Want:        5 * time.Minute,
		},
	}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
got := boundedDuration(tc.AckDeadline, tc.MinDuration, tc.MaxDuration, tc.exactlyOnce)
if got != tc.Want {
t.Errorf("boundedDuration mismatch:\n%+v\ngot: %v, want: %v", tc, got, tc.Want)
}
})
}
}
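// boundedDurationSketch mirrors the clamping behavior the table above
// encodes. It is an illustrative assumption, not the package's actual
// boundedDuration: with exactly-once delivery and no explicit minimum, the
// deadline is raised to at least 1 minute; otherwise the deadline is clamped
// into [minDur, maxDur], where a zero bound means "unset".
func boundedDurationSketch(d, minDur, maxDur time.Duration, exactlyOnce bool) time.Duration {
	// Exactly-once subscriptions default to a 1 minute floor when no
	// minimum is configured.
	if exactlyOnce && minDur == 0 {
		minDur = 1 * time.Minute
	}
	if minDur != 0 && d < minDur {
		d = minDur
	}
	if maxDur != 0 && d > maxDur {
		d = maxDur
	}
	return d
}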
func TestIterator_StreamingPullExactlyOnce(t *testing.T) {
srv := pstest.NewServer()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatal(err)
}
opts := withGRPCHeadersAssertion(t, option.WithGRPCConn(conn))
client, err := NewClient(ctx, projName, opts...)
if err != nil {
t.Fatal(err)
}
topic := client.Topic(topicName)
sc := SubscriptionConfig{
Topic: topic,
EnableMessageOrdering: true,
EnableExactlyOnceDelivery: true,
}
_, err = client.CreateSubscription(ctx, subName, sc)
if err != nil {
t.Fatal(err)
}
// Make sure to call publish before constructing the iterator.
srv.Publish(fullyQualifiedTopicName, []byte("msg"), nil)
iter := newMessageIterator(client.subc, fullyQualifiedSubName, &pullOptions{
synchronous: false,
maxOutstandingMessages: 100,
maxOutstandingBytes: 1e6,
maxPrefetch: 30,
maxExtension: 1 * time.Minute,
maxExtensionPeriod: 10 * time.Second,
})
	if _, err := iter.receive(10); err != nil {
		t.Fatalf("Got error in receive: %v", err)
	}
	if !iter.enableExactlyOnceDelivery {
		t.Fatal("expected iter.enableExactlyOnceDelivery=true")
	}
}
func TestAddToDistribution(t *testing.T) {
c, _ := newFake(t)
iter := newMessageIterator(c.subc, "some-sub", &pullOptions{})
	// Start with a datapoint that's too small; it should be bounded to 10s
	// (see the clampAckSeconds sketch after this test).
receiveTime := time.Now().Add(time.Duration(-1) * time.Second)
iter.addToDistribution(receiveTime)
deadline := iter.ackTimeDist.Percentile(.99)
want := 10
if deadline != want {
t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want)
}
// The next datapoint should not be bounded.
receiveTime = time.Now().Add(time.Duration(-300) * time.Second)
iter.addToDistribution(receiveTime)
deadline = iter.ackTimeDist.Percentile(.99)
want = 300
if deadline != want {
t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want)
}
	// Lastly, add a datapoint that should be bounded to 600s.
receiveTime = time.Now().Add(time.Duration(-1000) * time.Second)
iter.addToDistribution(receiveTime)
deadline = iter.ackTimeDist.Percentile(.99)
want = 600
if deadline != want {
t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want)
}
}
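// clampAckSeconds sketches the bounding the assertions above rely on. It is
// an illustrative assumption, not the package's addToDistribution: the
// elapsed time since receipt is converted to whole seconds and clamped into
// the [10, 600] second range permitted for ack deadlines before being
// recorded in the distribution.
func clampAckSeconds(receiveTime time.Time) int {
	secs := int(time.Since(receiveTime) / time.Second)
	if secs < 10 {
		secs = 10
	}
	if secs > 600 {
		secs = 600
	}
	return secs
}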
func TestPingStreamAckDeadline(t *testing.T) {
c, srv := newFake(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
topic := c.Topic(topicName)
s, err := c.CreateSubscription(ctx, subName, SubscriptionConfig{Topic: topic})
	if err != nil {
		t.Fatalf("failed to create subscription: %v", err)
	}
iter := newMessageIterator(c.subc, fullyQualifiedSubName, &pullOptions{})
defer iter.stop()
iter.eoMu.RLock()
if iter.enableExactlyOnceDelivery {
t.Error("iter.enableExactlyOnceDelivery should be false")
}
iter.eoMu.RUnlock()
_, err = s.Update(ctx, SubscriptionConfigToUpdate{
EnableExactlyOnceDelivery: true,
})
if err != nil {
t.Error(err)
}
	srv.Publish(fullyQualifiedTopicName, []byte("some-message"), nil)
// Receive one message via the stream to trigger the update to enableExactlyOnceDelivery
iter.receive(1)
iter.eoMu.RLock()
if !iter.enableExactlyOnceDelivery {
t.Error("iter.enableExactlyOnceDelivery should be true")
}
iter.eoMu.RUnlock()
}
// compareCompletedRetryLengths asserts the sizes of the completed and retry
// maps returned by processResults.
func compareCompletedRetryLengths(t *testing.T, completed, retry map[string]*AckResult, wantCompleted, wantRetry int) {
	if l := len(completed); l != wantCompleted {
		t.Errorf("completed map length got %d, want %d", l, wantCompleted)
	}
	if l := len(retry); l != wantRetry {
		t.Errorf("retry map length got %d, want %d", l, wantRetry)
	}
}
func TestExactlyOnceProcessRequests(t *testing.T) {
ctx := context.Background()
t.Run("NoResults", func(t *testing.T) {
// If the ackResMap is nil, then the resulting slices should be empty.
// nil maps here behave the same as if they were empty maps.
completed, retry := processResults(nil, nil, nil)
compareCompletedRetryLengths(t, completed, retry, 0, 0)
})
t.Run("NoErrorsNilAckResult", func(t *testing.T) {
// No errors so request should be completed even without an AckResult.
ackReqMap := map[string]*AckResult{
"ackID": nil,
}
completed, retry := processResults(nil, ackReqMap, nil)
compareCompletedRetryLengths(t, completed, retry, 1, 0)
})
t.Run("NoErrors", func(t *testing.T) {
// No errors so AckResult should be completed with success.
r := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r,
}
completed, retry := processResults(nil, ackReqMap, nil)
compareCompletedRetryLengths(t, completed, retry, 1, 0)
// We can obtain the AckStatus from AckResult if results are completed.
s, err := r.Get(ctx)
if err != nil {
t.Errorf("AckResult err: got %v, want nil", err)
}
if s != AcknowledgeStatusSuccess {
t.Errorf("got %v, want AcknowledgeStatusSuccess", s)
}
})
t.Run("PermanentErrorInvalidAckID", func(t *testing.T) {
r := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r,
}
errorsMap := map[string]string{
"ackID1": permanentInvalidAckErrString,
}
completed, retry := processResults(nil, ackReqMap, errorsMap)
compareCompletedRetryLengths(t, completed, retry, 1, 0)
s, err := r.Get(ctx)
if err == nil {
t.Error("AckResult err: got nil, want err")
}
if s != AcknowledgeStatusInvalidAckID {
t.Errorf("got %v, want AcknowledgeStatusSuccess", s)
}
})
t.Run("TransientErrorRetry", func(t *testing.T) {
r := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r,
}
errorsMap := map[string]string{
"ackID1": transientErrStringPrefix + "_FAILURE",
}
completed, retry := processResults(nil, ackReqMap, errorsMap)
compareCompletedRetryLengths(t, completed, retry, 0, 1)
})
t.Run("UnknownError", func(t *testing.T) {
r := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r,
}
errorsMap := map[string]string{
"ackID1": "unknown_error",
}
completed, retry := processResults(nil, ackReqMap, errorsMap)
compareCompletedRetryLengths(t, completed, retry, 1, 0)
s, err := r.Get(ctx)
if s != AcknowledgeStatusOther {
t.Errorf("got %v, want AcknowledgeStatusOther", s)
}
		if err == nil || err.Error() != "unknown_error" {
			t.Errorf("AckResult err: got %v, want unknown_error", err)
}
})
t.Run("PermissionDenied", func(t *testing.T) {
r := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r,
}
st := status.New(codes.PermissionDenied, "permission denied")
completed, retry := processResults(st, ackReqMap, nil)
compareCompletedRetryLengths(t, completed, retry, 1, 0)
s, err := r.Get(ctx)
if err == nil {
t.Error("AckResult err: got nil, want err")
}
if s != AcknowledgeStatusPermissionDenied {
t.Errorf("got %v, want AcknowledgeStatusPermissionDenied", s)
}
})
t.Run("FailedPrecondition", func(t *testing.T) {
r := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r,
}
st := status.New(codes.FailedPrecondition, "failed_precondition")
completed, retry := processResults(st, ackReqMap, nil)
compareCompletedRetryLengths(t, completed, retry, 1, 0)
s, err := r.Get(ctx)
if err == nil {
t.Error("AckResult err: got nil, want err")
}
if s != AcknowledgeStatusFailedPrecondition {
t.Errorf("got %v, want AcknowledgeStatusFailedPrecondition", s)
}
})
t.Run("OtherErrorStatus", func(t *testing.T) {
r := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r,
}
st := status.New(codes.OutOfRange, "out of range")
completed, retry := processResults(st, ackReqMap, nil)
compareCompletedRetryLengths(t, completed, retry, 1, 0)
s, err := r.Get(ctx)
if err == nil {
t.Error("AckResult err: got nil, want err")
}
if s != AcknowledgeStatusOther {
t.Errorf("got %v, want AcknowledgeStatusOther", s)
}
})
t.Run("MixedSuccessFailureAcks", func(t *testing.T) {
r1 := ipubsub.NewAckResult()
r2 := ipubsub.NewAckResult()
r3 := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r1,
"ackID2": r2,
"ackID3": r3,
}
errorsMap := map[string]string{
"ackID1": permanentInvalidAckErrString,
"ackID2": transientErrStringPrefix + "_FAILURE",
}
completed, retry := processResults(nil, ackReqMap, errorsMap)
compareCompletedRetryLengths(t, completed, retry, 2, 1)
// message with ackID "ackID1" fails
s, err := r1.Get(ctx)
if err == nil {
t.Error("r1: AckResult err: got nil, want err")
}
if s != AcknowledgeStatusInvalidAckID {
t.Errorf("r1: got %v, want AcknowledgeInvalidAckID", s)
}
// message with ackID "ackID2" is to be retried
ctx2, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
_, err = r2.Get(ctx2)
if !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("r2: AckResult.Get should timeout, got: %v", err)
}
// message with ackID "ackID3" succeeds
s, err = r3.Get(ctx)
if err != nil {
t.Errorf("r3: AckResult err: got %v, want nil\n", err)
}
if s != AcknowledgeStatusSuccess {
t.Errorf("r3: got %v, want AcknowledgeStatusSuccess", s)
}
})
t.Run("RetriableErrorStatusReturnsRequestForRetrying", func(t *testing.T) {
for c := range exactlyOnceDeliveryTemporaryRetryErrors {
r := ipubsub.NewAckResult()
ackReqMap := map[string]*AckResult{
"ackID1": r,
}
st := status.New(c, "")
completed, retry := processResults(st, ackReqMap, nil)
compareCompletedRetryLengths(t, completed, retry, 0, 1)
}
})
}
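// classifyAckErrorSketch summarizes the per-ackID classification exercised by
// the subtests above. It is an illustrative assumption, not the package's
// processResults: an empty error string completes with success, the permanent
// invalid-ack string completes with AcknowledgeStatusInvalidAckID, a string
// with the transient prefix marks the request for retry (no final status
// yet), and anything else completes with AcknowledgeStatusOther.
func classifyAckErrorSketch(errString string) (s AcknowledgeStatus, retry bool) {
	switch {
	case errString == "":
		return AcknowledgeStatusSuccess, false
	case errString == permanentInvalidAckErrString:
		return AcknowledgeStatusInvalidAckID, false
	case len(errString) >= len(transientErrStringPrefix) &&
		errString[:len(transientErrStringPrefix)] == transientErrStringPrefix:
		// Transient failures are retried; the status is not final.
		return AcknowledgeStatusSuccess, true
	default:
		return AcknowledgeStatusOther, false
	}
}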