blob: 9aee40b072212d2800b7e3f9bcf08939a9d89500 [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package wire
import (
"context"
"errors"
"reflect"
"time"
"google.golang.org/grpc"
vkit "cloud.google.com/go/pubsublite/apiv1"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)
var (
errServerNoMessages = errors.New("pubsublite: server delivered no messages")
errInvalidInitialSubscribeResponse = errors.New("pubsublite: first response from server was not an initial response for subscribe")
errInvalidSubscribeResponse = errors.New("pubsublite: received invalid subscribe response from server")
errNoInFlightSeek = errors.New("pubsublite: received seek response for no in-flight seek")
)
// ReceivedMessage stores a received Pub/Sub message and AckConsumer for
// acknowledging the message.
type ReceivedMessage struct {
Msg *pb.SequencedMessage
Ack AckConsumer
}
// MessageReceiverFunc receives a batch of Pub/Sub messages from a topic
// partition.
type MessageReceiverFunc func([]*ReceivedMessage)
// The frequency of sending batch flow control requests.
const batchFlowControlPeriod = 100 * time.Millisecond
// subscribeStream directly wraps the subscribe client stream. It passes
// messages to the message receiver and manages flow control. Flow control
// tokens are batched and sent to the stream via a periodic background task,
// although it can be expedited if the user is rapidly acking messages.
//
// Client-initiated seek unsupported.
type subscribeStream struct {
// Immutable after creation.
subClient *vkit.SubscriberClient
settings ReceiveSettings
subscription subscriptionPartition
initialReq *pb.SubscribeRequest
receiver MessageReceiverFunc
// Fields below must be guarded with mutex.
stream *retryableStream
acks *ackTracker
offsetTracker subscriberOffsetTracker
flowControl flowControlBatcher
pollFlowControl *periodicTask
seekInFlight bool
abstractService
}
func newSubscribeStream(ctx context.Context, subClient *vkit.SubscriberClient, settings ReceiveSettings,
receiver MessageReceiverFunc, subscription subscriptionPartition, acks *ackTracker, disableTasks bool) *subscribeStream {
s := &subscribeStream{
subClient: subClient,
settings: settings,
subscription: subscription,
initialReq: &pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Initial{
Initial: &pb.InitialSubscribeRequest{
Subscription: subscription.Path,
Partition: int64(subscription.Partition),
},
},
},
receiver: receiver,
acks: acks,
}
s.stream = newRetryableStream(ctx, s, settings.Timeout, reflect.TypeOf(pb.SubscribeResponse{}))
backgroundTask := s.sendBatchFlowControl
if disableTasks {
backgroundTask = func() {}
}
s.pollFlowControl = newPeriodicTask(batchFlowControlPeriod, backgroundTask)
return s
}
// Start establishes a subscribe stream connection and initializes flow control
// tokens from ReceiveSettings.
func (s *subscribeStream) Start() {
s.mu.Lock()
defer s.mu.Unlock()
if s.unsafeUpdateStatus(serviceStarting, nil) {
s.stream.Start()
s.pollFlowControl.Start()
s.flowControl.OnClientFlow(flowControlTokens{
Bytes: int64(s.settings.MaxOutstandingBytes),
Messages: int64(s.settings.MaxOutstandingMessages),
})
}
}
// Stop immediately terminates the subscribe stream.
func (s *subscribeStream) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
s.unsafeInitiateShutdown(serviceTerminating, nil)
}
func (s *subscribeStream) newStream(ctx context.Context) (grpc.ClientStream, error) {
return s.subClient.Subscribe(addSubscriptionRoutingMetadata(ctx, s.subscription))
}
func (s *subscribeStream) initialRequest() (interface{}, bool) {
return s.initialReq, true
}
func (s *subscribeStream) validateInitialResponse(response interface{}) error {
subscribeResponse, _ := response.(*pb.SubscribeResponse)
if subscribeResponse.GetInitial() == nil {
return errInvalidInitialSubscribeResponse
}
return nil
}
func (s *subscribeStream) onStreamStatusChange(status streamStatus) {
s.mu.Lock()
defer s.mu.Unlock()
switch status {
case streamConnected:
s.unsafeUpdateStatus(serviceActive, nil)
// Reinitialize the offset and flow control tokens when a new subscribe
// stream instance is connected.
if seekReq := s.offsetTracker.RequestForRestart(); seekReq != nil {
// Note: If Send() returns false, the subscriber will either terminate or
// the stream will be reconnected.
if s.stream.Send(&pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Seek{Seek: seekReq},
}) {
s.seekInFlight = true
}
}
s.unsafeSendFlowControl(s.flowControl.RequestForRestart())
s.pollFlowControl.Start()
case streamReconnecting:
s.seekInFlight = false
s.pollFlowControl.Stop()
case streamTerminated:
s.unsafeInitiateShutdown(serviceTerminated, s.stream.Error())
}
}
func (s *subscribeStream) onResponse(response interface{}) {
var receivedMsgs []*ReceivedMessage
var err error
s.mu.Lock()
subscribeResponse, _ := response.(*pb.SubscribeResponse)
switch {
case subscribeResponse.GetMessages() != nil:
receivedMsgs, err = s.unsafeOnMessageResponse(subscribeResponse.GetMessages())
case subscribeResponse.GetSeek() != nil:
err = s.unsafeOnSeekResponse(subscribeResponse.GetSeek())
default:
err = errInvalidSubscribeResponse
}
if receivedMsgs != nil {
// Deliver messages without holding the mutex to prevent deadlocks.
s.mu.Unlock()
s.receiver(receivedMsgs)
return
}
if err != nil {
s.unsafeInitiateShutdown(serviceTerminated, err)
}
s.mu.Unlock()
}
func (s *subscribeStream) unsafeOnSeekResponse(response *pb.SeekResponse) error {
if !s.seekInFlight {
return errNoInFlightSeek
}
s.seekInFlight = false
return nil
}
func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse) ([]*ReceivedMessage, error) {
if len(response.Messages) == 0 {
return nil, errServerNoMessages
}
if err := s.offsetTracker.OnMessages(response.Messages); err != nil {
return nil, err
}
if err := s.flowControl.OnMessages(response.Messages); err != nil {
return nil, err
}
var receivedMsgs []*ReceivedMessage
for _, msg := range response.Messages {
// Register outstanding acks, which are primarily handled by the
// `committer`.
ack := newAckConsumer(msg.GetCursor().GetOffset(), msg.GetSizeBytes(), s.onAck)
if err := s.acks.Push(ack); err != nil {
return nil, err
}
receivedMsgs = append(receivedMsgs, &ReceivedMessage{Msg: msg, Ack: ack})
}
return receivedMsgs, nil
}
func (s *subscribeStream) onAck(ac *ackConsumer) {
// Don't block the user's goroutine with potentially expensive ack processing.
go s.onAckAsync(ac.MsgBytes)
}
func (s *subscribeStream) onAckAsync(msgBytes int64) {
s.mu.Lock()
defer s.mu.Unlock()
if s.status == serviceActive {
s.unsafeAllowFlow(flowControlTokens{Bytes: msgBytes, Messages: 1})
}
}
// sendBatchFlowControl is called by the periodic background task.
func (s *subscribeStream) sendBatchFlowControl() {
s.mu.Lock()
defer s.mu.Unlock()
s.unsafeSendFlowControl(s.flowControl.ReleasePendingRequest())
}
func (s *subscribeStream) unsafeAllowFlow(allow flowControlTokens) {
s.flowControl.OnClientFlow(allow)
if s.flowControl.ShouldExpediteBatchRequest() {
s.unsafeSendFlowControl(s.flowControl.ReleasePendingRequest())
}
}
func (s *subscribeStream) unsafeSendFlowControl(req *pb.FlowControlRequest) {
if req == nil {
return
}
// Note: If Send() returns false, the stream will be reconnected and
// flowControlBatcher.RequestForRestart() will be sent when the stream
// reconnects. So its return value is ignored.
s.stream.Send(&pb.SubscribeRequest{
Request: &pb.SubscribeRequest_FlowControl{FlowControl: req},
})
}
func (s *subscribeStream) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
if !s.unsafeUpdateStatus(targetStatus, err) {
return
}
// No data to send. Immediately terminate the stream.
s.pollFlowControl.Stop()
s.stream.Stop()
}
// singlePartitionSubscriber receives messages from a single topic partition.
// It requires 2 child services:
// - subscribeStream to receive messages from the subscribe stream.
// - committer to commit cursor offsets to the streaming commit cursor stream.
type singlePartitionSubscriber struct {
compositeService
}
type singlePartitionSubscriberFactory struct {
ctx context.Context
subClient *vkit.SubscriberClient
cursorClient *vkit.CursorClient
settings ReceiveSettings
subscriptionPath string
receiver MessageReceiverFunc
disableTasks bool
}
func (f *singlePartitionSubscriberFactory) New(partition int) *singlePartitionSubscriber {
subscription := subscriptionPartition{Path: f.subscriptionPath, Partition: partition}
acks := newAckTracker()
commit := newCommitter(f.ctx, f.cursorClient, f.settings, subscription, acks, f.disableTasks)
sub := newSubscribeStream(f.ctx, f.subClient, f.settings, f.receiver, subscription, acks, f.disableTasks)
ps := new(singlePartitionSubscriber)
ps.init()
ps.unsafeAddServices(sub, commit)
return ps
}
// multiPartitionSubscriber receives messages from a fixed set of topic
// partitions.
type multiPartitionSubscriber struct {
compositeService
}
func newMultiPartitionSubscriber(subFactory *singlePartitionSubscriberFactory) *multiPartitionSubscriber {
ms := new(multiPartitionSubscriber)
ms.init()
for _, partition := range subFactory.settings.Partitions {
subscriber := subFactory.New(partition)
ms.unsafeAddServices(subscriber)
}
return ms
}