feat(pubsublite): Test utils for streams (#3153)

Adds the remaining Pub/Sub Lite streaming RPCs to the mock server. Adds utilities for crafting requests and responses.
diff --git a/pubsublite/internal/test/mock.go b/pubsublite/internal/test/mock.go
index 0c8c5a4..d27a437 100644
--- a/pubsublite/internal/test/mock.go
+++ b/pubsublite/internal/test/mock.go
@@ -47,6 +47,13 @@
 	OnTestEnd()
 	// AddPublishStream adds a verifier for a publish stream of a topic partition.
 	AddPublishStream(topic string, partition int, streamVerifier *RPCVerifier)
+	// AddSubscribeStream adds a verifier for a subscribe stream of a partition.
+	AddSubscribeStream(subscription string, partition int, streamVerifier *RPCVerifier)
+	// AddCommitStream adds a verifier for a commit stream of a partition.
+	AddCommitStream(subscription string, partition int, streamVerifier *RPCVerifier)
+	// AddAssignmentStream adds a verifier for a partition assignment stream for a
+	// subscription.
+	AddAssignmentStream(subscription string, streamVerifier *RPCVerifier)
 }
 
 // NewServer creates a new mock Pub/Sub Lite server.
@@ -58,6 +65,9 @@
 	liteServer := newMockLiteServer()
 	pb.RegisterAdminServiceServer(srv.Gsrv, liteServer)
 	pb.RegisterPublisherServiceServer(srv.Gsrv, liteServer)
+	pb.RegisterSubscriberServiceServer(srv.Gsrv, liteServer)
+	pb.RegisterCursorServiceServer(srv.Gsrv, liteServer)
+	pb.RegisterPartitionAssignmentServiceServer(srv.Gsrv, liteServer)
 	srv.Start()
 	return &Server{LiteServer: liteServer, gRPCServer: srv}, nil
 }
@@ -81,6 +91,9 @@
 type mockLiteServer struct {
 	pb.AdminServiceServer
 	pb.PublisherServiceServer
+	pb.SubscriberServiceServer
+	pb.CursorServiceServer
+	pb.PartitionAssignmentServiceServer
 
 	mu sync.Mutex
 
@@ -88,8 +101,11 @@
 	// test begins.
 	globalVerifier *RPCVerifier
 
-	// Publish stream verifiers by topic & partition.
-	publishVerifiers *keyedStreamVerifiers
+	// Stream verifiers by key.
+	publishVerifiers    *keyedStreamVerifiers
+	subscribeVerifiers  *keyedStreamVerifiers
+	commitVerifiers     *keyedStreamVerifiers
+	assignmentVerifiers *keyedStreamVerifiers
 
 	nextStreamID  int
 	activeStreams map[int]*streamHolder
@@ -102,8 +118,11 @@
 
 func newMockLiteServer() *mockLiteServer {
 	return &mockLiteServer{
-		publishVerifiers: newKeyedStreamVerifiers(),
-		activeStreams:    make(map[int]*streamHolder),
+		publishVerifiers:    newKeyedStreamVerifiers(),
+		subscribeVerifiers:  newKeyedStreamVerifiers(),
+		commitVerifiers:     newKeyedStreamVerifiers(),
+		assignmentVerifiers: newKeyedStreamVerifiers(),
+		activeStreams:       make(map[int]*streamHolder),
 	}
 }
 
@@ -188,6 +207,9 @@
 	s.testActive = true
 	s.globalVerifier = globalVerifier
 	s.publishVerifiers.Reset()
+	s.subscribeVerifiers.Reset()
+	s.commitVerifiers.Reset()
+	s.assignmentVerifiers.Reset()
 	s.activeStreams = make(map[int]*streamHolder)
 }
 
@@ -211,6 +233,24 @@
 	s.publishVerifiers.Push(key(topic, partition), streamVerifier)
 }
 
+func (s *mockLiteServer) AddSubscribeStream(subscription string, partition int, streamVerifier *RPCVerifier) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.subscribeVerifiers.Push(key(subscription, partition), streamVerifier)
+}
+
+func (s *mockLiteServer) AddCommitStream(subscription string, partition int, streamVerifier *RPCVerifier) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.commitVerifiers.Push(key(subscription, partition), streamVerifier)
+}
+
+func (s *mockLiteServer) AddAssignmentStream(subscription string, streamVerifier *RPCVerifier) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.assignmentVerifiers.Push(subscription, streamVerifier)
+}
+
 // PublisherService implementation.
 
 func (s *mockLiteServer) Publish(stream pb.PublisherService_PublishServer) error {
@@ -227,6 +267,53 @@
 	return s.handleStream(stream, req, reflect.TypeOf(pb.PublishRequest{}), k, s.publishVerifiers)
 }
 
+// SubscriberService implementation.
+
+func (s *mockLiteServer) Subscribe(stream pb.SubscriberService_SubscribeServer) error {
+	req, err := stream.Recv()
+	if err != nil {
+		return status.Errorf(codes.FailedPrecondition, "mockserver: stream recv error before initial request: %v", err)
+	}
+	if len(req.GetInitial().GetSubscription()) == 0 {
+		return status.Errorf(codes.InvalidArgument, "mockserver: received invalid initial subscribe request: %v", req)
+	}
+
+	initReq := req.GetInitial()
+	k := key(initReq.GetSubscription(), int(initReq.GetPartition()))
+	return s.handleStream(stream, req, reflect.TypeOf(pb.SubscribeRequest{}), k, s.subscribeVerifiers)
+}
+
+// CursorService implementation.
+
+func (s *mockLiteServer) StreamingCommitCursor(stream pb.CursorService_StreamingCommitCursorServer) error {
+	req, err := stream.Recv()
+	if err != nil {
+		return status.Errorf(codes.FailedPrecondition, "mockserver: stream recv error before initial request: %v", err)
+	}
+	if len(req.GetInitial().GetSubscription()) == 0 {
+		return status.Errorf(codes.InvalidArgument, "mockserver: received invalid initial streaming commit cursor request: %v", req)
+	}
+
+	initReq := req.GetInitial()
+	k := key(initReq.GetSubscription(), int(initReq.GetPartition()))
+	return s.handleStream(stream, req, reflect.TypeOf(pb.StreamingCommitCursorRequest{}), k, s.commitVerifiers)
+}
+
+// PartitionAssignmentService implementation.
+
+func (s *mockLiteServer) AssignPartitions(stream pb.PartitionAssignmentService_AssignPartitionsServer) error {
+	req, err := stream.Recv()
+	if err != nil {
+		return status.Errorf(codes.FailedPrecondition, "mockserver: stream recv error before initial request: %v", err)
+	}
+	if len(req.GetInitial().GetSubscription()) == 0 {
+		return status.Errorf(codes.InvalidArgument, "mockserver: received invalid initial partition assignment request: %v", req)
+	}
+
+	k := req.GetInitial().GetSubscription()
+	return s.handleStream(stream, req, reflect.TypeOf(pb.PartitionAssignmentRequest{}), k, s.assignmentVerifiers)
+}
+
 // AdminService implementation.
 
 func (s *mockLiteServer) GetTopicPartitions(ctx context.Context, req *pb.GetTopicPartitionsRequest) (*pb.TopicPartitions, error) {
diff --git a/pubsublite/internal/test/util.go b/pubsublite/internal/test/util.go
index 5486c13..979c8cc 100644
--- a/pubsublite/internal/test/util.go
+++ b/pubsublite/internal/test/util.go
@@ -14,6 +14,8 @@
 package test
 
 import (
+	"strings"
+
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
 	"google.golang.org/grpc/codes"
@@ -36,6 +38,11 @@
 	return false
 }
 
+// ErrorHasMsg returns true if an error message contains the desired substring.
+func ErrorHasMsg(got error, wantStr string) bool {
+	return strings.Index(got.Error(), wantStr) >= 0
+}
+
 // FakeSource is a fake source that returns a configurable constant.
 type FakeSource struct {
 	Ret int64
diff --git a/pubsublite/internal/wire/flow_control_test.go b/pubsublite/internal/wire/flow_control_test.go
index ca6d066..f38255b 100644
--- a/pubsublite/internal/wire/flow_control_test.go
+++ b/pubsublite/internal/wire/flow_control_test.go
@@ -25,25 +25,6 @@
 	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
 )
 
-func flowControlReq(tokens flowControlTokens) *pb.FlowControlRequest {
-	return &pb.FlowControlRequest{
-		AllowedBytes:    tokens.Bytes,
-		AllowedMessages: tokens.Messages,
-	}
-}
-
-func seqMsgWithOffset(offset int64) *pb.SequencedMessage {
-	return &pb.SequencedMessage{
-		Cursor: &pb.Cursor{Offset: offset},
-	}
-}
-
-func seqMsgWithSizeBytes(size int64) *pb.SequencedMessage {
-	return &pb.SequencedMessage{
-		SizeBytes: size,
-	}
-}
-
 func TestTokenCounterAdd(t *testing.T) {
 	// Note: tests are applied to this counter instance in order.
 	counter := tokenCounter{}
diff --git a/pubsublite/internal/wire/requests_test.go b/pubsublite/internal/wire/requests_test.go
new file mode 100644
index 0000000..94944a9
--- /dev/null
+++ b/pubsublite/internal/wire/requests_test.go
@@ -0,0 +1,218 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+
+package wire
+
+import pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
+
+// AdminService
+
+func topicPartitionsReq(topicPath string) *pb.GetTopicPartitionsRequest {
+	return &pb.GetTopicPartitionsRequest{Name: topicPath}
+}
+
+func topicPartitionsResp(count int) *pb.TopicPartitions {
+	return &pb.TopicPartitions{PartitionCount: int64(count)}
+}
+
+// CursorService
+
+func initCommitReq(subscription subscriptionPartition) *pb.StreamingCommitCursorRequest {
+	return &pb.StreamingCommitCursorRequest{
+		Request: &pb.StreamingCommitCursorRequest_Initial{
+			Initial: &pb.InitialCommitCursorRequest{
+				Subscription: subscription.Path,
+				Partition:    int64(subscription.Partition),
+			},
+		},
+	}
+}
+
+func initCommitResp() *pb.StreamingCommitCursorResponse {
+	return &pb.StreamingCommitCursorResponse{
+		Request: &pb.StreamingCommitCursorResponse_Initial{
+			Initial: &pb.InitialCommitCursorResponse{},
+		},
+	}
+}
+
+func commitReq(offset int64) *pb.StreamingCommitCursorRequest {
+	return &pb.StreamingCommitCursorRequest{
+		Request: &pb.StreamingCommitCursorRequest_Commit{
+			Commit: &pb.SequencedCommitCursorRequest{
+				Cursor: &pb.Cursor{Offset: offset},
+			},
+		},
+	}
+}
+
+func commitResp(numAck int) *pb.StreamingCommitCursorResponse {
+	return &pb.StreamingCommitCursorResponse{
+		Request: &pb.StreamingCommitCursorResponse_Commit{
+			Commit: &pb.SequencedCommitCursorResponse{
+				AcknowledgedCommits: int64(numAck),
+			},
+		},
+	}
+}
+
+// PartitionAssignmentService
+
+func initAssignmentReq(subscription string, clientID []byte) *pb.PartitionAssignmentRequest {
+	return &pb.PartitionAssignmentRequest{
+		Request: &pb.PartitionAssignmentRequest_Initial{
+			Initial: &pb.InitialPartitionAssignmentRequest{
+				Subscription: subscription,
+				ClientId:     clientID,
+			},
+		},
+	}
+}
+
+func assignmentAckReq() *pb.PartitionAssignmentRequest {
+	return &pb.PartitionAssignmentRequest{
+		Request: &pb.PartitionAssignmentRequest_Ack{
+			Ack: &pb.PartitionAssignmentAck{},
+		},
+	}
+}
+
+func assignmentResp(partitions []int64) *pb.PartitionAssignment {
+	return &pb.PartitionAssignment{
+		Partitions: partitions,
+	}
+}
+
+// PublisherService
+
+func initPubReq(topic topicPartition) *pb.PublishRequest {
+	return &pb.PublishRequest{
+		RequestType: &pb.PublishRequest_InitialRequest{
+			InitialRequest: &pb.InitialPublishRequest{
+				Topic:     topic.Path,
+				Partition: int64(topic.Partition),
+			},
+		},
+	}
+}
+
+func initPubResp() *pb.PublishResponse {
+	return &pb.PublishResponse{
+		ResponseType: &pb.PublishResponse_InitialResponse{
+			InitialResponse: &pb.InitialPublishResponse{},
+		},
+	}
+}
+
+func msgPubReq(msgs ...*pb.PubSubMessage) *pb.PublishRequest {
+	return &pb.PublishRequest{
+		RequestType: &pb.PublishRequest_MessagePublishRequest{
+			MessagePublishRequest: &pb.MessagePublishRequest{Messages: msgs},
+		},
+	}
+}
+
+func msgPubResp(cursor int64) *pb.PublishResponse {
+	return &pb.PublishResponse{
+		ResponseType: &pb.PublishResponse_MessageResponse{
+			MessageResponse: &pb.MessagePublishResponse{
+				StartCursor: &pb.Cursor{Offset: cursor},
+			},
+		},
+	}
+}
+
+// SubscriberService
+
+func initSubReq(subscription subscriptionPartition) *pb.SubscribeRequest {
+	return &pb.SubscribeRequest{
+		Request: &pb.SubscribeRequest_Initial{
+			Initial: &pb.InitialSubscribeRequest{
+				Subscription: subscription.Path,
+				Partition:    int64(subscription.Partition),
+			},
+		},
+	}
+}
+
+func initSubResp() *pb.SubscribeResponse {
+	return &pb.SubscribeResponse{
+		Response: &pb.SubscribeResponse_Initial{
+			Initial: &pb.InitialSubscribeResponse{},
+		},
+	}
+}
+
+func seekReq(offset int64) *pb.SubscribeRequest {
+	return &pb.SubscribeRequest{
+		Request: &pb.SubscribeRequest_Seek{
+			Seek: &pb.SeekRequest{
+				Target: &pb.SeekRequest_Cursor{
+					Cursor: &pb.Cursor{Offset: offset},
+				},
+			},
+		},
+	}
+}
+
+func seekResp(offset int64) *pb.SubscribeResponse {
+	return &pb.SubscribeResponse{
+		Response: &pb.SubscribeResponse_Seek{
+			Seek: &pb.SeekResponse{
+				Cursor: &pb.Cursor{Offset: offset},
+			},
+		},
+	}
+}
+
+func flowControlReq(tokens flowControlTokens) *pb.FlowControlRequest {
+	return &pb.FlowControlRequest{
+		AllowedBytes:    tokens.Bytes,
+		AllowedMessages: tokens.Messages,
+	}
+}
+
+func flowControlSubReq(tokens flowControlTokens) *pb.SubscribeRequest {
+	return &pb.SubscribeRequest{
+		Request: &pb.SubscribeRequest_FlowControl{
+			FlowControl: flowControlReq(tokens),
+		},
+	}
+}
+
+func seqMsgWithOffset(offset int64) *pb.SequencedMessage {
+	return &pb.SequencedMessage{
+		Cursor: &pb.Cursor{Offset: offset},
+	}
+}
+
+func seqMsgWithSizeBytes(size int64) *pb.SequencedMessage {
+	return &pb.SequencedMessage{
+		SizeBytes: size,
+	}
+}
+
+func seqMsgWithOffsetAndSize(offset, size int64) *pb.SequencedMessage {
+	return &pb.SequencedMessage{
+		Cursor:    &pb.Cursor{Offset: offset},
+		SizeBytes: size,
+	}
+}
+
+func msgSubResp(msgs ...*pb.SequencedMessage) *pb.SubscribeResponse {
+	return &pb.SubscribeResponse{
+		Response: &pb.SubscribeResponse_Messages{
+			Messages: &pb.MessageResponse{Messages: msgs},
+		},
+	}
+}