feat(pubsublite): Test utils for streams (#3153)
Adds the remaining Pub/Sub Lite streaming RPCs to the mock server. Adds utilities for crafting requests and responses.
diff --git a/pubsublite/internal/test/mock.go b/pubsublite/internal/test/mock.go
index 0c8c5a4..d27a437 100644
--- a/pubsublite/internal/test/mock.go
+++ b/pubsublite/internal/test/mock.go
@@ -47,6 +47,13 @@
OnTestEnd()
// AddPublishStream adds a verifier for a publish stream of a topic partition.
AddPublishStream(topic string, partition int, streamVerifier *RPCVerifier)
+ // AddSubscribeStream adds a verifier for a subscribe stream of a partition.
+ AddSubscribeStream(subscription string, partition int, streamVerifier *RPCVerifier)
+ // AddCommitStream adds a verifier for a commit stream of a partition.
+ AddCommitStream(subscription string, partition int, streamVerifier *RPCVerifier)
+ // AddAssignmentStream adds a verifier for a partition assignment stream for a
+ // subscription.
+ AddAssignmentStream(subscription string, streamVerifier *RPCVerifier)
}
// NewServer creates a new mock Pub/Sub Lite server.
@@ -58,6 +65,9 @@
liteServer := newMockLiteServer()
pb.RegisterAdminServiceServer(srv.Gsrv, liteServer)
pb.RegisterPublisherServiceServer(srv.Gsrv, liteServer)
+ pb.RegisterSubscriberServiceServer(srv.Gsrv, liteServer)
+ pb.RegisterCursorServiceServer(srv.Gsrv, liteServer)
+ pb.RegisterPartitionAssignmentServiceServer(srv.Gsrv, liteServer)
srv.Start()
return &Server{LiteServer: liteServer, gRPCServer: srv}, nil
}
@@ -81,6 +91,9 @@
type mockLiteServer struct {
pb.AdminServiceServer
pb.PublisherServiceServer
+ pb.SubscriberServiceServer
+ pb.CursorServiceServer
+ pb.PartitionAssignmentServiceServer
mu sync.Mutex
@@ -88,8 +101,11 @@
// test begins.
globalVerifier *RPCVerifier
- // Publish stream verifiers by topic & partition.
- publishVerifiers *keyedStreamVerifiers
+ // Stream verifiers by key.
+ publishVerifiers *keyedStreamVerifiers
+ subscribeVerifiers *keyedStreamVerifiers
+ commitVerifiers *keyedStreamVerifiers
+ assignmentVerifiers *keyedStreamVerifiers
nextStreamID int
activeStreams map[int]*streamHolder
@@ -102,8 +118,11 @@
func newMockLiteServer() *mockLiteServer {
return &mockLiteServer{
- publishVerifiers: newKeyedStreamVerifiers(),
- activeStreams: make(map[int]*streamHolder),
+ publishVerifiers: newKeyedStreamVerifiers(),
+ subscribeVerifiers: newKeyedStreamVerifiers(),
+ commitVerifiers: newKeyedStreamVerifiers(),
+ assignmentVerifiers: newKeyedStreamVerifiers(),
+ activeStreams: make(map[int]*streamHolder),
}
}
@@ -188,6 +207,9 @@
s.testActive = true
s.globalVerifier = globalVerifier
s.publishVerifiers.Reset()
+ s.subscribeVerifiers.Reset()
+ s.commitVerifiers.Reset()
+ s.assignmentVerifiers.Reset()
s.activeStreams = make(map[int]*streamHolder)
}
@@ -211,6 +233,24 @@
s.publishVerifiers.Push(key(topic, partition), streamVerifier)
}
+func (s *mockLiteServer) AddSubscribeStream(subscription string, partition int, streamVerifier *RPCVerifier) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.subscribeVerifiers.Push(key(subscription, partition), streamVerifier)
+}
+
+func (s *mockLiteServer) AddCommitStream(subscription string, partition int, streamVerifier *RPCVerifier) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.commitVerifiers.Push(key(subscription, partition), streamVerifier)
+}
+
+func (s *mockLiteServer) AddAssignmentStream(subscription string, streamVerifier *RPCVerifier) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.assignmentVerifiers.Push(subscription, streamVerifier)
+}
+
// PublisherService implementation.
func (s *mockLiteServer) Publish(stream pb.PublisherService_PublishServer) error {
@@ -227,6 +267,53 @@
return s.handleStream(stream, req, reflect.TypeOf(pb.PublishRequest{}), k, s.publishVerifiers)
}
+// SubscriberService implementation.
+
+func (s *mockLiteServer) Subscribe(stream pb.SubscriberService_SubscribeServer) error {
+ req, err := stream.Recv()
+ if err != nil {
+ return status.Errorf(codes.FailedPrecondition, "mockserver: stream recv error before initial request: %v", err)
+ }
+ if len(req.GetInitial().GetSubscription()) == 0 {
+ return status.Errorf(codes.InvalidArgument, "mockserver: received invalid initial subscribe request: %v", req)
+ }
+
+ initReq := req.GetInitial()
+ k := key(initReq.GetSubscription(), int(initReq.GetPartition()))
+ return s.handleStream(stream, req, reflect.TypeOf(pb.SubscribeRequest{}), k, s.subscribeVerifiers)
+}
+
+// CursorService implementation.
+
+func (s *mockLiteServer) StreamingCommitCursor(stream pb.CursorService_StreamingCommitCursorServer) error {
+ req, err := stream.Recv()
+ if err != nil {
+ return status.Errorf(codes.FailedPrecondition, "mockserver: stream recv error before initial request: %v", err)
+ }
+ if len(req.GetInitial().GetSubscription()) == 0 {
+ return status.Errorf(codes.InvalidArgument, "mockserver: received invalid initial streaming commit cursor request: %v", req)
+ }
+
+ initReq := req.GetInitial()
+ k := key(initReq.GetSubscription(), int(initReq.GetPartition()))
+ return s.handleStream(stream, req, reflect.TypeOf(pb.StreamingCommitCursorRequest{}), k, s.commitVerifiers)
+}
+
+// PartitionAssignmentService implementation.
+
+func (s *mockLiteServer) AssignPartitions(stream pb.PartitionAssignmentService_AssignPartitionsServer) error {
+ req, err := stream.Recv()
+ if err != nil {
+ return status.Errorf(codes.FailedPrecondition, "mockserver: stream recv error before initial request: %v", err)
+ }
+ if len(req.GetInitial().GetSubscription()) == 0 {
+ return status.Errorf(codes.InvalidArgument, "mockserver: received invalid initial partition assignment request: %v", req)
+ }
+
+ k := req.GetInitial().GetSubscription()
+ return s.handleStream(stream, req, reflect.TypeOf(pb.PartitionAssignmentRequest{}), k, s.assignmentVerifiers)
+}
+
// AdminService implementation.
func (s *mockLiteServer) GetTopicPartitions(ctx context.Context, req *pb.GetTopicPartitionsRequest) (*pb.TopicPartitions, error) {
diff --git a/pubsublite/internal/test/util.go b/pubsublite/internal/test/util.go
index 5486c13..979c8cc 100644
--- a/pubsublite/internal/test/util.go
+++ b/pubsublite/internal/test/util.go
@@ -14,6 +14,8 @@
package test
import (
+ "strings"
+
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/codes"
@@ -36,6 +38,11 @@
return false
}
+// ErrorHasMsg returns true if an error message contains the desired substring.
+func ErrorHasMsg(got error, wantStr string) bool {
+ return strings.Index(got.Error(), wantStr) >= 0
+}
+
// FakeSource is a fake source that returns a configurable constant.
type FakeSource struct {
Ret int64
diff --git a/pubsublite/internal/wire/flow_control_test.go b/pubsublite/internal/wire/flow_control_test.go
index ca6d066..f38255b 100644
--- a/pubsublite/internal/wire/flow_control_test.go
+++ b/pubsublite/internal/wire/flow_control_test.go
@@ -25,25 +25,6 @@
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)
-func flowControlReq(tokens flowControlTokens) *pb.FlowControlRequest {
- return &pb.FlowControlRequest{
- AllowedBytes: tokens.Bytes,
- AllowedMessages: tokens.Messages,
- }
-}
-
-func seqMsgWithOffset(offset int64) *pb.SequencedMessage {
- return &pb.SequencedMessage{
- Cursor: &pb.Cursor{Offset: offset},
- }
-}
-
-func seqMsgWithSizeBytes(size int64) *pb.SequencedMessage {
- return &pb.SequencedMessage{
- SizeBytes: size,
- }
-}
-
func TestTokenCounterAdd(t *testing.T) {
// Note: tests are applied to this counter instance in order.
counter := tokenCounter{}
diff --git a/pubsublite/internal/wire/requests_test.go b/pubsublite/internal/wire/requests_test.go
new file mode 100644
index 0000000..94944a9
--- /dev/null
+++ b/pubsublite/internal/wire/requests_test.go
@@ -0,0 +1,218 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+
+package wire
+
+import pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
+
+// AdminService
+
+func topicPartitionsReq(topicPath string) *pb.GetTopicPartitionsRequest {
+ return &pb.GetTopicPartitionsRequest{Name: topicPath}
+}
+
+func topicPartitionsResp(count int) *pb.TopicPartitions {
+ return &pb.TopicPartitions{PartitionCount: int64(count)}
+}
+
+// CursorService
+
+func initCommitReq(subscription subscriptionPartition) *pb.StreamingCommitCursorRequest {
+ return &pb.StreamingCommitCursorRequest{
+ Request: &pb.StreamingCommitCursorRequest_Initial{
+ Initial: &pb.InitialCommitCursorRequest{
+ Subscription: subscription.Path,
+ Partition: int64(subscription.Partition),
+ },
+ },
+ }
+}
+
+func initCommitResp() *pb.StreamingCommitCursorResponse {
+ return &pb.StreamingCommitCursorResponse{
+ Request: &pb.StreamingCommitCursorResponse_Initial{
+ Initial: &pb.InitialCommitCursorResponse{},
+ },
+ }
+}
+
+func commitReq(offset int64) *pb.StreamingCommitCursorRequest {
+ return &pb.StreamingCommitCursorRequest{
+ Request: &pb.StreamingCommitCursorRequest_Commit{
+ Commit: &pb.SequencedCommitCursorRequest{
+ Cursor: &pb.Cursor{Offset: offset},
+ },
+ },
+ }
+}
+
+func commitResp(numAck int) *pb.StreamingCommitCursorResponse {
+ return &pb.StreamingCommitCursorResponse{
+ Request: &pb.StreamingCommitCursorResponse_Commit{
+ Commit: &pb.SequencedCommitCursorResponse{
+ AcknowledgedCommits: int64(numAck),
+ },
+ },
+ }
+}
+
+// PartitionAssignmentService
+
+func initAssignmentReq(subscription string, clientID []byte) *pb.PartitionAssignmentRequest {
+ return &pb.PartitionAssignmentRequest{
+ Request: &pb.PartitionAssignmentRequest_Initial{
+ Initial: &pb.InitialPartitionAssignmentRequest{
+ Subscription: subscription,
+ ClientId: clientID,
+ },
+ },
+ }
+}
+
+func assignmentAckReq() *pb.PartitionAssignmentRequest {
+ return &pb.PartitionAssignmentRequest{
+ Request: &pb.PartitionAssignmentRequest_Ack{
+ Ack: &pb.PartitionAssignmentAck{},
+ },
+ }
+}
+
+func assignmentResp(partitions []int64) *pb.PartitionAssignment {
+ return &pb.PartitionAssignment{
+ Partitions: partitions,
+ }
+}
+
+// PublisherService
+
+func initPubReq(topic topicPartition) *pb.PublishRequest {
+ return &pb.PublishRequest{
+ RequestType: &pb.PublishRequest_InitialRequest{
+ InitialRequest: &pb.InitialPublishRequest{
+ Topic: topic.Path,
+ Partition: int64(topic.Partition),
+ },
+ },
+ }
+}
+
+func initPubResp() *pb.PublishResponse {
+ return &pb.PublishResponse{
+ ResponseType: &pb.PublishResponse_InitialResponse{
+ InitialResponse: &pb.InitialPublishResponse{},
+ },
+ }
+}
+
+func msgPubReq(msgs ...*pb.PubSubMessage) *pb.PublishRequest {
+ return &pb.PublishRequest{
+ RequestType: &pb.PublishRequest_MessagePublishRequest{
+ MessagePublishRequest: &pb.MessagePublishRequest{Messages: msgs},
+ },
+ }
+}
+
+func msgPubResp(cursor int64) *pb.PublishResponse {
+ return &pb.PublishResponse{
+ ResponseType: &pb.PublishResponse_MessageResponse{
+ MessageResponse: &pb.MessagePublishResponse{
+ StartCursor: &pb.Cursor{Offset: cursor},
+ },
+ },
+ }
+}
+
+// SubscriberService
+
+func initSubReq(subscription subscriptionPartition) *pb.SubscribeRequest {
+ return &pb.SubscribeRequest{
+ Request: &pb.SubscribeRequest_Initial{
+ Initial: &pb.InitialSubscribeRequest{
+ Subscription: subscription.Path,
+ Partition: int64(subscription.Partition),
+ },
+ },
+ }
+}
+
+func initSubResp() *pb.SubscribeResponse {
+ return &pb.SubscribeResponse{
+ Response: &pb.SubscribeResponse_Initial{
+ Initial: &pb.InitialSubscribeResponse{},
+ },
+ }
+}
+
+func seekReq(offset int64) *pb.SubscribeRequest {
+ return &pb.SubscribeRequest{
+ Request: &pb.SubscribeRequest_Seek{
+ Seek: &pb.SeekRequest{
+ Target: &pb.SeekRequest_Cursor{
+ Cursor: &pb.Cursor{Offset: offset},
+ },
+ },
+ },
+ }
+}
+
+func seekResp(offset int64) *pb.SubscribeResponse {
+ return &pb.SubscribeResponse{
+ Response: &pb.SubscribeResponse_Seek{
+ Seek: &pb.SeekResponse{
+ Cursor: &pb.Cursor{Offset: offset},
+ },
+ },
+ }
+}
+
+func flowControlReq(tokens flowControlTokens) *pb.FlowControlRequest {
+ return &pb.FlowControlRequest{
+ AllowedBytes: tokens.Bytes,
+ AllowedMessages: tokens.Messages,
+ }
+}
+
+func flowControlSubReq(tokens flowControlTokens) *pb.SubscribeRequest {
+ return &pb.SubscribeRequest{
+ Request: &pb.SubscribeRequest_FlowControl{
+ FlowControl: flowControlReq(tokens),
+ },
+ }
+}
+
+func seqMsgWithOffset(offset int64) *pb.SequencedMessage {
+ return &pb.SequencedMessage{
+ Cursor: &pb.Cursor{Offset: offset},
+ }
+}
+
+func seqMsgWithSizeBytes(size int64) *pb.SequencedMessage {
+ return &pb.SequencedMessage{
+ SizeBytes: size,
+ }
+}
+
+func seqMsgWithOffsetAndSize(offset, size int64) *pb.SequencedMessage {
+ return &pb.SequencedMessage{
+ Cursor: &pb.Cursor{Offset: offset},
+ SizeBytes: size,
+ }
+}
+
+func msgSubResp(msgs ...*pb.SequencedMessage) *pb.SubscribeResponse {
+ return &pb.SubscribeResponse{
+ Response: &pb.SubscribeResponse_Messages{
+ Messages: &pb.MessageResponse{Messages: msgs},
+ },
+ }
+}