| // Copyright 2017 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package pubsub |
| |
| import ( |
| "bytes" |
| "context" |
| "errors" |
| "fmt" |
| "reflect" |
| "sync" |
| "sync/atomic" |
| "testing" |
| "time" |
| |
| "cloud.google.com/go/internal/testutil" |
| "cloud.google.com/go/pubsub/pstest" |
| "google.golang.org/api/option" |
| pb "google.golang.org/genproto/googleapis/pubsub/v1" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| ) |
| |
| var ( |
| projName = "some-project" |
| topicName = "some-topic" |
| fullyQualifiedTopicName = fmt.Sprintf("projects/%s/topics/%s", projName, topicName) |
| ) |
| |
| func TestSplitRequestIDs(t *testing.T) { |
| t.Parallel() |
| ids := []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"} |
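| // Each 4-byte ID encodes to 6 bytes (1 tag byte + 1 length byte + 4 data bytes; see |
| // calcFieldSizeString), so at most two IDs should fit under the 15-byte limit used below. |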
| for _, test := range []struct { |
| ids []string |
| splitIndex int |
| }{ |
| {[]string{}, 0}, |
| {ids, 2}, |
| {ids[:2], 2}, |
| } { |
| got1, got2 := splitRequestIDs(test.ids, 15) |
| want1, want2 := test.ids[:test.splitIndex], test.ids[test.splitIndex:] |
| if !testutil.Equal(got1, want1) { |
| t.Errorf("%v, 1: got %v, want %v", test, got1, want1) |
| } |
| if !testutil.Equal(got2, want2) { |
| t.Errorf("%v, 2: got %v, want %v", test, got2, want2) |
| } |
| } |
| } |
| |
| func TestCalcFieldSize(t *testing.T) { |
| t.Parallel() |
| // Create a mock ack request to test. |
| req := &pb.AcknowledgeRequest{ |
| Subscription: "sub", |
| AckIds: []string{"aaa", "bbb", "ccc", "ddd", "eee"}, |
| } |
| size := calcFieldSizeString(req.Subscription) + calcFieldSizeString(req.AckIds...) |
| |
| // Proto encoding is calculated from 1 tag byte and 1 size byte for each string. |
| want := (1 + 1) + len(req.Subscription) + // subscription field: 1 tag byte + 1 size byte |
| 5*(1+1+3) // ackID size: 5 * [1 (tag byte) + 1 (size byte) + 3 (length of ackID)] |
| if size != want { |
| t.Errorf("pubsub: calculated ack req size of %d bytes, want %d", size, want) |
| } |
| |
| req.Subscription = string(bytes.Repeat([]byte{'A'}, 300)) |
| size = calcFieldSizeString(req.Subscription) + calcFieldSizeString(req.AckIds...) |
| |
| // With a longer subscription name, we use an extra size byte: a varint length |
| // prefix needs two bytes once the length exceeds 127. |
| want = (1 + 2) + len(req.Subscription) + // subscription field: 1 tag byte + 2 size bytes |
| 5*(1+1+3) // ackID size: 5 * [1 (tag byte) + 1 (size byte) + 3 (length of ackID)] |
| if size != want { |
| t.Errorf("pubsub: calculated ack req size of %d bytes, want %d", size, want) |
| } |
| |
| // Create a mock modack request to test. |
| modAckReq := &pb.ModifyAckDeadlineRequest{ |
| Subscription: "sub", |
| AckIds: []string{"aaa", "bbb", "ccc", "ddd", "eee"}, |
| AckDeadlineSeconds: 300, |
| } |
| |
| size = calcFieldSizeString(modAckReq.Subscription) + |
| calcFieldSizeString(modAckReq.AckIds...) + |
| calcFieldSizeInt(int(modAckReq.AckDeadlineSeconds)) |
| |
| want = (1 + 1) + len(modAckReq.Subscription) + // subscription field: 1 tag byte + 1 size byte |
| 5*(1+1+3) + // ackID size: 5 * [1 (tag byte) + 1 (size byte) + 3 (length of ackID)] |
| (1 + 2) // ackDeadline: 1 tag byte + 2 bytes for the varint encoding of 300 |
| if size != want { |
| t.Errorf("pubsub: calculated modAck req size of %d bytes, want %d", size, want) |
| } |
| } |
| |
| func TestMaxExtensionPeriod(t *testing.T) { |
| srv := pstest.NewServer() |
| defer srv.Close() |
| ctx, cancel := context.WithCancel(context.Background()) |
| defer cancel() |
| |
| srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil) |
| |
| _, client, err := initConn(ctx, srv.Addr) |
| if err != nil { |
| t.Fatal(err) |
| } |
| want := 1 * time.Second |
| iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &want, &pullOptions{}) |
| |
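| // Record a ~3s time-to-ack. Without a cap, the computed deadline would track this |
| // distribution; with maxExtensionPeriod set to 1s, ackDeadline should return exactly 1s. |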
| receiveTime := time.Now().Add(-3 * time.Second) |
| iter.ackTimeDist.Record(int(time.Since(receiveTime) / time.Second)) |
| |
| if got := iter.ackDeadline(); got != want { |
| t.Fatalf("deadline got = %v, want %v", got, want) |
| } |
| } |
| |
| func TestAckDistribution(t *testing.T) { |
| if testing.Short() { |
| t.SkipNow() |
| } |
| t.Skip("broken") |
| |
| ctx, cancel := context.WithCancel(context.Background()) |
| defer cancel() |
| |
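| // Lower the client's and the test server's minimum ack deadline to 1s so modack |
| // deadlines in this test stay small and the test runs quickly. |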
| minAckDeadline = 1 * time.Second |
| pstest.SetMinAckDeadline(minAckDeadline) |
| srv := pstest.NewServer() |
| defer srv.Close() |
| defer pstest.ResetMinAckDeadline() |
| |
| // Create the topic via a Publish. It's convenient to do it here rather than via client.CreateTopic because |
| // the client has not been established yet, and because we want to create the topic once, whereas a client is |
| // created below once per test case. |
| srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil) |
| |
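| // queuedMsgs carries message numbers from startSending to continuouslySend, which |
| // publishes them to the test server. |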
| queuedMsgs := make(chan int32, 1024) |
| go continuouslySend(ctx, srv, queuedMsgs) |
| |
| for _, testcase := range []struct { |
| initialProcessSecs int32 |
| finalProcessSecs int32 |
| }{ |
| {initialProcessSecs: 3, finalProcessSecs: 5}, // Process time goes up |
| {initialProcessSecs: 5, finalProcessSecs: 3}, // Process time goes down |
| } { |
| t.Logf("Testing %d -> %d", testcase.initialProcessSecs, testcase.finalProcessSecs) |
| |
| // processTimeSecs is used by the sender to coordinate with the receiver how long the receiver should |
| // pretend to process for. For example, if we test 3s -> 5s, processTimeSecs starts at 3, causing the |
| // receiver to process messages for 3s while the sender sends the first batch. Then, as the sender begins |
| // to send the next batch, it swaps processTimeSecs to 5, and the receiver processes each message for 5s. |
| // In this way we simulate a client whose time-to-ack (process time) changes. |
| processTimeSecs := testcase.initialProcessSecs |
| |
| s, client, err := initConn(ctx, srv.Addr) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| // recvdWg is incremented for each message sent and decremented for each message received and processed. |
| recvdWg := &sync.WaitGroup{} |
| |
| go startReceiving(ctx, t, s, recvdWg, &processTimeSecs) |
| startSending(t, queuedMsgs, &processTimeSecs, testcase.initialProcessSecs, testcase.finalProcessSecs, recvdWg) |
| |
| recvdWg.Wait() |
| time.Sleep(100 * time.Millisecond) // Wait a bit more for resources to clean up |
| err = client.Close() |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| modacks := modacksByTime(srv.Messages()) |
| u := modackDeadlines(modacks) |
| initialDL := int32(minAckDeadline / time.Second) |
| if !setsAreEqual(u, []int32{initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs}) { |
| t.Fatalf("Expected modack deadlines to contain (exactly, and only) %ds, %ds, %ds. Instead, got %v", |
| initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs, toSet(u)) |
| } |
| } |
| } |
| |
| // modacksByTime buckets modacks by time. |
| func modacksByTime(msgs []*pstest.Message) map[time.Time][]pstest.Modack { |
| modacks := map[time.Time][]pstest.Modack{} |
| |
| for _, msg := range msgs { |
| for _, m := range msg.Modacks { |
| modacks[m.ReceivedAt] = append(modacks[m.ReceivedAt], m) |
| } |
| } |
| return modacks |
| } |
| |
| // setsAreEqual reports whether haystack and needles contain the same values, ignoring duplicates. |
| func setsAreEqual(haystack, needles []int32) bool { |
| hMap := map[int32]bool{} |
| nMap := map[int32]bool{} |
| |
| for _, n := range needles { |
| nMap[n] = true |
| } |
| |
| for _, n := range haystack { |
| hMap[n] = true |
| } |
| |
| return reflect.DeepEqual(nMap, hMap) |
| } |
| |
| // startReceiving pretends to be a client. It calls s.Receive and acks each message after the delay currently |
| // stored in processTimeSecs. It also watches for duplicates: any message that arrives twice causes a failure. |
| func startReceiving(ctx context.Context, t *testing.T, s *Subscription, recvdWg *sync.WaitGroup, processTimeSecs *int32) { |
| t.Log("Receiving..") |
| |
| var recvdMu sync.Mutex |
| recvd := map[string]bool{} |
| |
| err := s.Receive(ctx, func(ctx context.Context, msg *Message) { |
| msgData := string(msg.Data) |
| recvdMu.Lock() |
| if recvd[msgData] { |
| recvdMu.Unlock() |
| // Receive callbacks run on non-test goroutines, where t.Fatal is not allowed, |
| // so report the duplicate with t.Error and bail out of this callback. |
| t.Errorf("already saw %q", msgData) |
| return |
| } |
| recvd[msgData] = true |
| recvdMu.Unlock() |
| |
| select { |
| case <-ctx.Done(): |
| msg.Nack() |
| recvdWg.Done() |
| case <-time.After(time.Duration(atomic.LoadInt32(processTimeSecs)) * time.Second): |
| msg.Ack() |
| recvdWg.Done() |
| } |
| }) |
| if err != nil { |
| if status.Code(err) != codes.Canceled { |
| t.Error(err) |
| } |
| } |
| } |
| |
| // startSending sends four batches of messages broken up by minAckDeadline, initialProcessSecs, and finalProcessSecs. |
| func startSending(t *testing.T, queuedMsgs chan int32, processTimeSecs *int32, initialProcessSecs int32, finalProcessSecs int32, recvdWg *sync.WaitGroup) { |
| var msg int32 |
| |
| // We must send this block to force the receiver to send its initially-configured modack time. The time that |
| // gets sent should be ignorant of the distribution, since there haven't been enough (any, actually) messages |
| // to create a distribution yet. |
| t.Log("minAckDeadlineSecsSending an initial message") |
| recvdWg.Add(1) |
| msg++ |
| queuedMsgs <- msg |
| <-time.After(minAckDeadline) |
| |
| t.Logf("Sending some messages to update distribution to %d. This new distribution will be used "+ |
| "when the next batch of messages go out.", initialProcessSecs) |
| for i := 0; i < 10; i++ { |
| recvdWg.Add(1) |
| msg++ |
| queuedMsgs <- msg |
| } |
| atomic.SwapInt32(processTimeSecs, finalProcessSecs) |
| <-time.After(time.Duration(initialProcessSecs) * time.Second) |
| |
| t.Logf("Sending many messages to update distribution to %d. This new distribution will be used "+ |
| "when the next batch of messages go out.", finalProcessSecs) |
| for i := 0; i < 100; i++ { |
| recvdWg.Add(1) |
| msg++ |
| queuedMsgs <- msg // Send many messages to drastically change distribution |
| } |
| <-time.After(time.Duration(finalProcessSecs) * time.Second) |
| |
| t.Logf("Last message going out, whose deadline should be %d.", finalProcessSecs) |
| recvdWg.Add(1) |
| msg++ |
| queuedMsgs <- msg |
| } |
| |
| // continuouslySend publishes every message that arrives on the queuedMsgs chan until ctx is canceled. |
| func continuouslySend(ctx context.Context, srv *pstest.Server, queuedMsgs chan int32) { |
| for { |
| select { |
| case <-ctx.Done(): |
| return |
| case m := <-queuedMsgs: |
| srv.Publish(fullyQualifiedTopicName, []byte(fmt.Sprintf("message %d", m)), nil) |
| } |
| } |
| } |
| |
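| // toSet returns the distinct values in arr, preserving first-appearance order. |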
| func toSet(arr []int32) []int32 { |
| var s []int32 |
| m := map[int32]bool{} |
| |
| for _, v := range arr { |
| _, ok := m[v] |
| if !ok { |
| s = append(s, v) |
| m[v] = true |
| } |
| } |
| |
| return s |
| } |
| |
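| // initConn dials the given pstest server address, creates a Client backed by that |
| // connection, and creates a uniquely named subscription to the shared test topic. |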
| func initConn(ctx context.Context, addr string) (*Subscription, *Client, error) { |
| conn, err := grpc.Dial(addr, grpc.WithInsecure()) |
| if err != nil { |
| return nil, nil, err |
| } |
| e := testutil.DefaultHeadersEnforcer() |
| opts := append(e.CallOptions(), option.WithGRPCConn(conn)) |
| client, err := NewClient(ctx, projName, opts...) |
| if err != nil { |
| return nil, nil, err |
| } |
| |
| topic := client.Topic(topicName) |
| s, err := client.CreateSubscription(ctx, fmt.Sprintf("sub-%d", time.Now().UnixNano()), SubscriptionConfig{Topic: topic}) |
| if err != nil { |
| return nil, nil, err |
| } |
| |
| exists, err := s.Exists(ctx) |
| if err != nil { |
| return nil, nil, err |
| } |
| if !exists { |
| return nil, nil, errors.New("subscription does not exist") |
| } |
| |
| return s, client, nil |
| } |
| |
| // modackDeadlines takes a map of time => Modack, gathers all the Modack.AckDeadline values, |
| // and returns them as a slice. |
| func modackDeadlines(m map[time.Time][]pstest.Modack) []int32 { |
| var u []int32 |
| for _, vv := range m { |
| for _, v := range vv { |
| u = append(u, v.AckDeadline) |
| } |
| } |
| return u |
| } |