blob: 8e3155dca883bf867ceed8813b94c0d2b1430fb2 [file] [log] [blame]
// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"context"
"errors"
"io"
"log"
"strings"
"sync"
"time"
ipubsub "cloud.google.com/go/internal/pubsub"
vkit "cloud.google.com/go/pubsub/apiv1"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"cloud.google.com/go/pubsub/internal/distribution"
gax "github.com/googleapis/gax-go/v2"
"github.com/googleapis/gax-go/v2/apierror"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protowire"
)
// gracePeriod is how far ahead of the actual ack deadline we send the modack
// that extends it.
//
// Between message receipt and ack (that is, the time spent processing a
// message) we want to extend the message deadline by way of modack. However,
// we don't want to wait until the deadline is about to expire; instead, we
// extend it a little ahead of time. gracePeriod is that amount of time.
const gracePeriod = 5 * time.Second
// ackIDBatchSize is the maximum number of ACK IDs to send in a single Ack/ModAck RPC.
// The backend imposes a maximum request size limit of 524288 bytes (512 KiB) per
// acknowledge / modifyAckDeadline request. ACK IDs have a maximum size of 164
// bytes, thus we cannot send more than 524288/176 ~= 2979 ACK IDs in an Ack/ModAck
// request. Accounting for some overhead, we should thus only send a maximum of 2500
// ACK IDs at a time.
// NOTE(review): the size is quoted as 164 bytes while the division uses 176;
// presumably the difference is per-ID encoding overhead - confirm.
// NOTE: this is declared as a typed constant, so tests cannot modify it.
const ackIDBatchSize int = 2500
// Lease-extension bounds and retry budget. These are package-level variables
// (not constants) so that tests can override them.
var (
	// Upper bound applied to recorded processing times.
	maxDurationPerLeaseExtension = 10 * time.Minute
	// Lower bound applied to recorded processing times and to the ack deadline.
	minDurationPerLeaseExtension = 10 * time.Second
	// Lower bound for the ack deadline when exactly-once delivery is enabled.
	minDurationPerLeaseExtensionExactlyOnce = 1 * time.Minute

	// The total amount of time to retry acks/modacks with exactly once delivery enabled subscriptions.
	exactlyOnceDeliveryRetryDeadline = 10 * time.Minute
)
// messageIterator pulls messages for a single subscription (via StreamingPull
// or the Pull RPC), tracks their leases, and batches the acks, nacks and
// deadline extensions that are sent from its sender goroutine.
type messageIterator struct {
	ctx        context.Context // derived from context.Background() in newMessageIterator; cancelled by stop
	cancel     func()          // the function that will cancel ctx; called in stop
	po         *pullOptions    // pull configuration (synchronous mode, flow control, extension periods)
	ps         *pullStream     // streaming-pull stream; left nil when po.synchronous is set
	subc       *vkit.SubscriberClient
	subName    string
	kaTick     <-chan time.Time // keep-alive (deadline extensions)
	ackTicker  *time.Ticker     // message acks
	nackTicker *time.Ticker     // message nacks
	pingTicker *time.Ticker     //  sends to the stream to keep it open
	failed     chan struct{}    // closed on stream error
	drained    chan struct{}    // closed when stopped && no more pending messages
	wg         sync.WaitGroup   // tracks the sender goroutine

	// mu guards err, keepAliveDeadlines and the pending* maps below.
	mu          sync.Mutex
	ackTimeDist *distribution.D // dist uses seconds

	// keepAliveDeadlines is a map of id to expiration time. This map is used in conjunction with
	// subscription.ReceiveSettings.MaxExtension to record the maximum amount of time (the
	// deadline, more specifically) we're willing to extend a message's ack deadline. As each
	// message arrives, we'll record now+MaxExtension in this table; whenever we have a chance
	// to update ack deadlines (via modack), we'll consult this table and only include IDs
	// that are not beyond their deadline.
	keepAliveDeadlines map[string]time.Time
	// pendingAcks and pendingNacks hold ack IDs (and their results) waiting to
	// be flushed by the sender goroutine.
	pendingAcks  map[string]*AckResult
	pendingNacks map[string]*AckResult
	// ack IDs whose ack deadline is to be modified
	// ModAcks don't have AckResults but allows reuse of the SendModAck function.
	pendingModAcks map[string]*AckResult
	err            error // error from stream failure

	// eoMu guards enableExactlyOnceDelivery and sendNewAckDeadline.
	eoMu sync.RWMutex
	// enableExactlyOnceDelivery mirrors the subscription property reported in
	// StreamingPull responses.
	enableExactlyOnceDelivery bool
	// sendNewAckDeadline is set when the exactly-once setting changes, so that
	// the next stream ping carries an updated stream ack deadline.
	sendNewAckDeadline bool

	orderingMu sync.RWMutex
	// enableOrdering determines if messages should be processed in order. This is populated
	// by the response in StreamingPull and can change mid Receive. Must be accessed
	// with the lock held.
	enableOrdering bool
}
// newMessageIterator builds a messageIterator for the subscription whose full
// name is subName, launches its sender goroutine, and returns it.
//
// When po.synchronous is false a streaming-pull stream is opened; otherwise no
// stream is created. The caller must call stop on the returned iterator once
// it is no longer needed. Acks and deadline extensions always run on a context
// derived from context.Background().
func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOptions) *messageIterator {
	var stream *pullStream
	if !po.synchronous {
		maxMessages, maxBytes := po.maxOutstandingMessages, po.maxOutstandingBytes
		if po.useLegacyFlowControl {
			// Legacy flow control: pass zero limits to the stream.
			maxMessages, maxBytes = 0, 0
		}
		stream = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes, po.maxExtensionPeriod)
	}

	// Acks and nacks are flushed promptly so users don't lose work if the
	// client crashes; the ping keeps the stream open.
	acks := time.NewTicker(100 * time.Millisecond)
	nacks := time.NewTicker(100 * time.Millisecond)
	pings := time.NewTicker(30 * time.Second)

	baseCtx, cancel := context.WithCancel(context.Background())
	iterCtx := withSubscriptionKey(baseCtx, subName)

	// The keep-alive period is recomputed on every tick from the ack-time
	// distribution. The first tick is arbitrarily placed halfway towards the
	// minimum ack deadline.
	firstKeepAlive := minDurationPerLeaseExtension / 2

	it := &messageIterator{
		ctx:                iterCtx,
		cancel:             cancel,
		ps:                 stream,
		po:                 po,
		subc:               subc,
		subName:            subName,
		kaTick:             time.After(firstKeepAlive),
		ackTicker:          acks,
		nackTicker:         nacks,
		pingTicker:         pings,
		failed:             make(chan struct{}),
		drained:            make(chan struct{}),
		ackTimeDist:        distribution.New(int(maxDurationPerLeaseExtension/time.Second) + 1),
		keepAliveDeadlines: map[string]time.Time{},
		pendingAcks:        map[string]*AckResult{},
		pendingNacks:       map[string]*AckResult{},
		pendingModAcks:     map[string]*AckResult{},
	}

	it.wg.Add(1)
	go it.sender()
	return it
}
// stop cancels the iterator's context, re-evaluates whether the iterator is
// drained, and then blocks until the sender goroutine has exited.
//
// Subscription.receive calls stop on its messageIterator when finished with
// it. Since the sender only exits once the iterator is drained or failed, stop
// blocks until every outstanding message has been marked done or has expired.
func (it *messageIterator) stop() {
	it.cancel()

	func() {
		it.mu.Lock()
		defer it.mu.Unlock()
		it.checkDrained()
	}()

	it.wg.Wait()
}
// checkDrained closes it.drained once the iterator's context has been
// cancelled and no message is left under lease management. It is a no-op if
// the channel is already closed.
//
// Called with the lock held.
func (it *messageIterator) checkDrained() {
	// Closing an already-closed channel panics, so bail out early.
	select {
	case <-it.drained:
		return
	default:
	}

	stopped := false
	select {
	case <-it.ctx.Done():
		stopped = true
	default:
	}

	if stopped && len(it.keepAliveDeadlines) == 0 {
		close(it.drained)
	}
}
// addToDistribution records, in whole seconds, how long ago receiveTime was in
// the iterator's ack-time distribution. The elapsed time is first clamped to
// the range [minDurationPerLeaseExtension, maxDurationPerLeaseExtension],
// which are the ModifyAckDeadline limits.
func (it *messageIterator) addToDistribution(receiveTime time.Time) {
	elapsed := time.Since(receiveTime)
	if elapsed < minDurationPerLeaseExtension {
		elapsed = minDurationPerLeaseExtension
	}
	if elapsed > maxDurationPerLeaseExtension {
		elapsed = maxDurationPerLeaseExtension
	}
	seconds := int(elapsed / time.Second)
	it.ackTimeDist.Record(seconds)
}
// done is invoked when a message is acked (ack == true) or nacked. It records
// the processing time, removes the message from lease management, queues the
// ack ID with its AckResult in the matching pending map, and re-checks whether
// the iterator is drained.
func (it *messageIterator) done(ackID string, ack bool, r *AckResult, receiveTime time.Time) {
	it.addToDistribution(receiveTime)

	it.mu.Lock()
	defer it.mu.Unlock()

	delete(it.keepAliveDeadlines, ackID)

	queue := it.pendingNacks
	if ack {
		queue = it.pendingAcks
	}
	queue[ackID] = r

	it.checkDrained()
}
// fail records a permanent stream error. Only the first error is kept: the
// first call stores err and closes it.failed, later calls leave the stored
// error alone. The stored error is returned in every case, so the result may
// be err or the error from an earlier call.
func (it *messageIterator) fail(err error) error {
	it.mu.Lock()
	defer it.mu.Unlock()

	if it.err != nil {
		return it.err
	}
	it.err = err
	close(it.failed)
	return it.err
}
// receive makes a call to the stream's Recv method, or the Pull RPC, and returns
// its messages.
// maxToPull is the maximum number of messages for the Pull RPC.
//
// It returns the stored error if the iterator has already failed, and io.EOF
// (after waiting for the sender goroutine) if the iterator has been stopped.
// Every received message is placed under lease management and, unless it
// already has a pending nack, is given a receipt modack. Without exactly-once
// delivery the modack is sent asynchronously and all messages are returned;
// with exactly-once delivery the call blocks on the modack results and returns
// only the messages whose modack succeeded. If no receipt modack is needed
// (no messages, or all of them already have a pending nack), it returns
// nil, nil.
func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
	// Fail fast if a previous call already recorded a permanent error.
	it.mu.Lock()
	ierr := it.err
	it.mu.Unlock()
	if ierr != nil {
		return nil, ierr
	}
	// Stop retrieving messages if the iterator's Stop method was called.
	select {
	case <-it.ctx.Done():
		// Wait for the sender goroutine to finish before reporting EOF.
		it.wg.Wait()
		return nil, io.EOF
	default:
	}
	var rmsgs []*pb.ReceivedMessage
	var err error
	if it.po.synchronous {
		rmsgs, err = it.pullMessages(maxToPull)
	} else {
		rmsgs, err = it.recvMessages()
	}
	// Any error here is fatal.
	if err != nil {
		return nil, it.fail(err)
	}
	recordStat(it.ctx, PullCount, int64(len(rmsgs)))
	now := time.Now()
	msgs, err := convertMessages(rmsgs, now, it.done)
	if err != nil {
		return nil, it.fail(err)
	}
	// We received some messages. Remember them so we can keep them alive. Also,
	// do a receipt mod-ack when streaming.
	maxExt := time.Now().Add(it.po.maxExtension)
	ackIDs := map[string]*AckResult{}
	// Snapshot the exactly-once setting once so the whole batch is handled
	// consistently even if the setting changes concurrently.
	it.eoMu.RLock()
	exactlyOnceDelivery := it.enableExactlyOnceDelivery
	it.eoMu.RUnlock()
	it.mu.Lock()
	// pendingMessages maps ackID -> message, and is used
	// only when exactly once delivery is enabled.
	// At first, all messages are pending, and they
	// are removed if the modack call fails. All other
	// messages are returned to the client for processing.
	pendingMessages := make(map[string]*ipubsub.Message)
	for _, m := range msgs {
		ackID := msgAckID(m)
		addRecv(m.ID, ackID, now)
		it.keepAliveDeadlines[ackID] = maxExt
		// Don't change the mod-ack if the message is going to be nacked. This is
		// possible if there are retries.
		if _, ok := it.pendingNacks[ackID]; !ok {
			// Don't use the message's AckResult here since these are only for receipt modacks.
			// modack results are transparent to the user so these can automatically succeed unless
			// exactly once is enabled.
			// We can't use an empty AckResult here either since SetAckResult will try to
			// close the channel without checking if it exists.
			if !exactlyOnceDelivery {
				ackIDs[ackID] = newSuccessAckResult()
			} else {
				ackIDs[ackID] = ipubsub.NewAckResult()
				pendingMessages[ackID] = m
			}
		}
	}
	deadline := it.ackDeadline()
	it.mu.Unlock()
	if len(ackIDs) > 0 {
		// When exactly once delivery is not enabled, modacks are fire and forget.
		if !exactlyOnceDelivery {
			go func() {
				it.sendModAck(ackIDs, deadline, false)
			}()
			return msgs, nil
		}
		// If exactly once is enabled, we should wait until modack responses are successes
		// before attempting to process messages.
		it.sendModAck(ackIDs, deadline, false)
		for ackID, ar := range ackIDs {
			// Get blocks until the result for this ack ID has been set.
			ctx := context.Background()
			_, err := ar.Get(ctx)
			if err != nil {
				delete(pendingMessages, ackID)
				it.mu.Lock()
				// Remove the message from lease management if modack fails here.
				delete(it.keepAliveDeadlines, ackID)
				it.mu.Unlock()
			}
		}
		// Only return for processing messages that were successfully modack'ed.
		// Iterate over the original messages slice for ordering.
		v := make([]*ipubsub.Message, 0, len(pendingMessages))
		for _, m := range msgs {
			ackID := msgAckID(m)
			if _, ok := pendingMessages[ackID]; ok {
				v = append(v, m)
			}
		}
		return v, nil
	}
	// Nothing needed a receipt modack.
	return nil, nil
}
// pullMessages fetches up to maxToPull messages with the Pull RPC.
// The call may block indefinitely, and may also come back with zero messages
// after waiting for a while. A cancelled call (either context.Canceled itself
// or a gRPC Canceled status) is reported as no messages and no error.
func (it *messageIterator) pullMessages(maxToPull int32) ([]*pb.ReceivedMessage, error) {
	req := &pb.PullRequest{
		Subscription: it.subName,
		MaxMessages:  maxToPull,
	}
	// it.ctx is the RPC context, so stopping the iterator makes the call
	// return immediately.
	resp, err := it.subc.Pull(it.ctx, req, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes)))
	if err == context.Canceled || status.Code(err) == codes.Canceled {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return resp.ReceivedMessages, nil
}
// recvMessages reads the next response from the streaming-pull stream and
// returns its messages. Before returning, it syncs the iterator's exactly-once
// and ordering flags with the subscription properties carried by the response.
// A change of the exactly-once flag also marks that a new stream ack deadline
// must be sent.
func (it *messageIterator) recvMessages() ([]*pb.ReceivedMessage, error) {
	resp, err := it.ps.Recv()
	if err != nil {
		return nil, err
	}
	props := resp.GetSubscriptionProperties()

	// Exactly-once delivery: compare under the read lock, and take the write
	// lock only when the setting actually changed.
	it.eoMu.RLock()
	currentEOD := it.enableExactlyOnceDelivery
	it.eoMu.RUnlock()
	if reportedEOD := props.GetExactlyOnceDeliveryEnabled(); reportedEOD != currentEOD {
		it.eoMu.Lock()
		it.sendNewAckDeadline = true
		it.enableExactlyOnceDelivery = reportedEOD
		it.eoMu.Unlock()
	}

	// Message ordering: same approach.
	it.orderingMu.RLock()
	currentOrdering := it.enableOrdering
	it.orderingMu.RUnlock()
	if reportedOrdering := props.GetMessageOrderingEnabled(); reportedOrdering != currentOrdering {
		it.orderingMu.Lock()
		it.enableOrdering = reportedOrdering
		it.orderingMu.Unlock()
	}

	return resp.ReceivedMessages, nil
}
// sender runs in a goroutine and handles all sends to the stream.
//
// Each loop iteration waits for one event (stream failure, drain, keep-alive
// tick, nack tick, ack tick or ping tick), decides under it.mu which pending
// maps to flush, swaps those maps for fresh ones, releases the lock, and then
// issues the RPCs. It returns immediately on stream failure, and after one
// final flush of acks and nacks once the iterator is drained. On exit it stops
// the tickers and closes the send side of the stream, if there is one.
func (it *messageIterator) sender() {
	defer it.wg.Done()
	defer it.ackTicker.Stop()
	defer it.nackTicker.Stop()
	defer it.pingTicker.Stop()
	defer func() {
		// ps is nil for synchronous pull.
		if it.ps != nil {
			it.ps.CloseSend()
		}
	}()
	done := false
	for !done {
		sendAcks := false
		sendNacks := false
		sendModAcks := false
		sendPing := false
		// Computed before taking it.mu; used for the keep-alive schedule and
		// for the lease-extension modacks below.
		dl := it.ackDeadline()
		// Every case except <-it.failed acquires it.mu; it is released after
		// the pending maps have been swapped out below.
		select {
		case <-it.failed:
			// Stream failed: nothing to do, so stop immediately.
			return
		case <-it.drained:
			// All outstanding messages have been marked done:
			// nothing left to do except make the final calls.
			it.mu.Lock()
			sendAcks = (len(it.pendingAcks) > 0)
			sendNacks = (len(it.pendingNacks) > 0)
			// No point in sending modacks.
			done = true
		case <-it.kaTick:
			it.mu.Lock()
			it.handleKeepAlives()
			sendModAcks = (len(it.pendingModAcks) > 0)
			nextTick := dl - gracePeriod
			if nextTick <= 0 {
				// If the deadline is <= gracePeriod, let's tick again halfway to
				// the deadline.
				nextTick = dl / 2
			}
			it.kaTick = time.After(nextTick)
		case <-it.nackTicker.C:
			it.mu.Lock()
			sendNacks = (len(it.pendingNacks) > 0)
		case <-it.ackTicker.C:
			it.mu.Lock()
			sendAcks = (len(it.pendingAcks) > 0)
		case <-it.pingTicker.C:
			it.mu.Lock()
			// Ping only if we are processing messages via streaming.
			sendPing = !it.po.synchronous
		}
		// Lock is held here.
		// Take ownership of the maps to flush and leave fresh ones behind, so
		// the RPCs below can run without holding the lock.
		var acks, nacks, modAcks map[string]*AckResult
		if sendAcks {
			acks = it.pendingAcks
			it.pendingAcks = map[string]*AckResult{}
		}
		if sendNacks {
			nacks = it.pendingNacks
			it.pendingNacks = map[string]*AckResult{}
		}
		if sendModAcks {
			modAcks = it.pendingModAcks
			it.pendingModAcks = map[string]*AckResult{}
		}
		it.mu.Unlock()
		// Make Ack and ModAck RPCs.
		if sendAcks {
			it.sendAck(acks)
		}
		if sendNacks {
			// Nack indicated by modifying the deadline to zero.
			it.sendModAck(nacks, 0, false)
		}
		if sendModAcks {
			it.sendModAck(modAcks, dl, true)
		}
		if sendPing {
			it.pingStream()
		}
	}
}
// handleKeepAlives walks the lease table: messages whose maximum extension
// time has passed are dropped from it, and every other message is queued for a
// deadline extension in it.pendingModAcks. It finishes by re-checking whether
// the iterator is drained.
//
// Called with the lock held.
func (it *messageIterator) handleKeepAlives() {
	now := time.Now()
	for ackID, limit := range it.keepAliveDeadlines {
		if !limit.Before(now) {
			// Still live. ModAck outcomes are not propagated back to the
			// user, so a pre-completed success result is used.
			it.pendingModAcks[ackID] = newSuccessAckResult()
			continue
		}
		// Expired. Deleting during range does not skip any map items; see
		// https://golang.org/ref/spec#For_statements ("For statements with
		// range clause", note 3).
		delete(it.keepAliveDeadlines, ackID)
	}
	it.checkDrained()
}
// sendAck acknowledges the ack IDs that are the keys of m, in batches of at
// most ackIDBatchSize per Acknowledge RPC.
//
// When exactly-once delivery is enabled, the RPC outcome is used to complete
// the corresponding AckResults, and IDs that failed transiently are retried
// with backoff in a separate goroutine. Otherwise the RPC error is ignored.
func (it *messageIterator) sendAck(m map[string]*AckResult) {
	ackIDs := make([]string, 0, len(m))
	for k := range m {
		ackIDs = append(ackIDs, k)
	}
	it.eoMu.RLock()
	exactlyOnceDelivery := it.enableExactlyOnceDelivery
	it.eoMu.RUnlock()

	var toSend []string
	for len(ackIDs) > 0 {
		toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize)

		recordStat(it.ctx, AckCount, int64(len(toSend)))
		addAcks(toSend)
		// Use context.Background() as the call's context, not it.ctx. We don't
		// want to cancel this RPC when the iterator is stopped.
		cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
		err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{
			Subscription: it.subName,
			AckIds:       toSend,
		})
		// Release the timeout context as soon as the RPC returns. A defer here
		// would keep one context per batch alive until the function returns.
		cancel2()

		if exactlyOnceDelivery {
			resultsByAckID := make(map[string]*AckResult, len(toSend))
			for _, ackID := range toSend {
				resultsByAckID[ackID] = m[ackID]
			}
			st, md := extractMetadata(err)
			_, toRetry := processResults(st, resultsByAckID, md)
			if len(toRetry) > 0 {
				// Retry acks in a separate goroutine.
				go func() {
					it.retryAcks(toRetry)
				}()
			}
		}
	}
}
// sendModAck sets the ack deadline of the ack IDs that are the keys of m to
// deadline (truncated to whole seconds), in batches of at most ackIDBatchSize
// per ModifyAckDeadline RPC. A deadline of zero is a nack; any other value
// extends the lease.
//
// Callers derive lease-extension deadlines from the 99th percentile of message
// processing times (see ackDeadline), which captures the longest typical
// processing time while ignoring the top 1% of outliers.
//
// When exactly-once delivery is enabled, the RPC outcome is used to complete
// the corresponding AckResults, and IDs that failed transiently are retried in
// a separate goroutine for a short duration; logOnInvalid is forwarded to that
// retry. Otherwise the RPC error is ignored.
func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) {
	deadlineSec := int32(deadline / time.Second)
	ackIDs := make([]string, 0, len(m))
	for k := range m {
		ackIDs = append(ackIDs, k)
	}
	it.eoMu.RLock()
	exactlyOnceDelivery := it.enableExactlyOnceDelivery
	it.eoMu.RUnlock()

	var toSend []string
	for len(ackIDs) > 0 {
		toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize)
		if deadline == 0 {
			recordStat(it.ctx, NackCount, int64(len(toSend)))
		} else {
			recordStat(it.ctx, ModAckCount, int64(len(toSend)))
		}
		addModAcks(toSend, deadlineSec)
		// Use context.Background() as the call's context, not it.ctx. We don't
		// want to cancel this RPC when the iterator is stopped.
		cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
		err := it.subc.ModifyAckDeadline(cctx, &pb.ModifyAckDeadlineRequest{
			Subscription:       it.subName,
			AckDeadlineSeconds: deadlineSec,
			AckIds:             toSend,
		})
		// Release the timeout context as soon as the RPC returns. A defer here
		// would keep one context per batch alive until the function returns.
		cancel2()

		if exactlyOnceDelivery {
			resultsByAckID := make(map[string]*AckResult, len(toSend))
			for _, ackID := range toSend {
				resultsByAckID[ackID] = m[ackID]
			}
			st, md := extractMetadata(err)
			_, toRetry := processResults(st, resultsByAckID, md)
			if len(toRetry) > 0 {
				// Retry modacks/nacks in a separate goroutine.
				go func() {
					it.retryModAcks(toRetry, deadlineSec, logOnInvalid)
				}()
			}
		}
	}
}
// retryAcks retries the Acknowledge RPC with backoff for the ack IDs in m
// until every ID has a final result or exactlyOnceDeliveryRetryDeadline has
// elapsed. When the deadline passes, the remaining AckResults are completed
// with AcknowledgeStatusOther and the context error.
//
// This must be called in a goroutine in it.sendAck(), with a max of 2500 ackIDs.
func (it *messageIterator) retryAcks(m map[string]*AckResult) {
	ctx, cancel := context.WithTimeout(context.Background(), exactlyOnceDeliveryRetryDeadline)
	defer cancel()
	bo := newExactlyOnceBackoff()
	for {
		// If the overall retry budget is spent, fail whatever is left.
		if ctx.Err() != nil {
			for _, r := range m {
				ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err())
			}
			return
		}
		// Don't need to split map since this is the retry function and
		// there is already a max of 2500 ackIDs here.
		ackIDs := make([]string, 0, len(m))
		for k := range m {
			ackIDs = append(ackIDs, k)
		}
		cctx2, cancel2 := context.WithTimeout(ctx, 60*time.Second)
		err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{
			Subscription: it.subName,
			AckIds:       ackIDs,
		})
		// Release the per-attempt context now. A defer inside this loop would
		// accumulate one context per attempt until the function returns.
		cancel2()

		st, md := extractMetadata(err)
		_, toRetry := processResults(st, m, md)
		if len(toRetry) == 0 {
			return
		}
		time.Sleep(bo.Pause())
		m = toRetry
	}
}
// retryModAcks retries the ModifyAckDeadline RPC with backoff for the ack IDs
// in m, using deadlineSec as the new deadline.
//
// Lease extensions (deadlineSec != 0) are only retried a few times, since
// after that the message will have expired; the remaining AckResults are then
// completed with AcknowledgeStatusOther and, if logOnInvalid is set, the IDs
// are logged. Note that the `retryCount > 3` guard below permits up to four
// RPC attempts from this function. Nacks (deadlineSec == 0) are retried until
// exactlyOnceDeliveryRetryDeadline (10 minutes by default) has elapsed.
//
// This must be called in a goroutine in it.sendModAck(), with a max of 2500 ackIDs.
func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32, logOnInvalid bool) {
	bo := newExactlyOnceBackoff()
	retryCount := 0
	ctx, cancel := context.WithTimeout(context.Background(), exactlyOnceDeliveryRetryDeadline)
	defer cancel()
	for {
		// If context is done, complete all AckResults with errors.
		if ctx.Err() != nil {
			for _, r := range m {
				ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err())
			}
			return
		}
		// Only retry modack requests a limited number of times.
		if deadlineSec != 0 && retryCount > 3 {
			ackIDs := make([]string, 0, len(m))
			for k, ar := range m {
				ackIDs = append(ackIDs, k)
				ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New("modack retry failed"))
			}
			if logOnInvalid {
				log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs)
			}
			return
		}
		// Don't need to split map since this is the retry function and
		// there is already a max of 2500 ackIDs here.
		ackIDs := make([]string, 0, len(m))
		for k := range m {
			ackIDs = append(ackIDs, k)
		}
		cctx2, cancel2 := context.WithTimeout(ctx, 60*time.Second)
		err := it.subc.ModifyAckDeadline(cctx2, &pb.ModifyAckDeadlineRequest{
			Subscription:       it.subName,
			AckIds:             ackIDs,
			AckDeadlineSeconds: deadlineSec,
		})
		// Release the per-attempt context now. A defer inside this loop would
		// accumulate one context per attempt until the function returns.
		cancel2()

		st, md := extractMetadata(err)
		_, toRetry := processResults(st, m, md)
		if len(toRetry) == 0 {
			return
		}
		time.Sleep(bo.Pause())
		m = toRetry
		retryCount++
	}
}
// pingStream sends a message to the stream to keep it open. The stream will
// close if there's no traffic on it for a while. By keeping it open, we delay
// the start of the expiration timer on messages that are buffered by gRPC or
// elsewhere in the network. This matters if it takes a long time to process
// messages relative to the default ack deadline, and if the messages are small
// enough so that many can fit into the buffer.
//
// If the exactly-once delivery setting changed since the last ping
// (sendNewAckDeadline), the ping also carries the current ack deadline, in
// whole seconds, as the new stream ack deadline.
func (it *messageIterator) pingStream() {
	spr := &pb.StreamingPullRequest{}

	// Read and clear the flag under the write lock, since it is modified here.
	it.eoMu.Lock()
	sendDeadline := it.sendNewAckDeadline
	it.sendNewAckDeadline = false
	it.eoMu.Unlock()

	if sendDeadline {
		// ackDeadline acquires eoMu itself, so it must be called after the
		// lock above is released: sync.RWMutex forbids recursive read locking.
		// The field is expressed in seconds, so convert from time.Duration
		// (nanoseconds) instead of casting the raw value.
		spr.StreamAckDeadlineSeconds = int32(it.ackDeadline() / time.Second)
	}
	// The send result is intentionally not checked here; the ping is best effort.
	it.ps.Send(spr)
}
// calcFieldSizeString returns the total number of bytes the given string
// fields take up in an encoded proto message. Each field is counted as one
// byte, plus the varint-encoded length, plus the string bytes themselves.
func calcFieldSizeString(fields ...string) int {
	total := 0
	for i := range fields {
		n := len(fields[i])
		total += 1 + n + protowire.SizeVarint(uint64(n))
	}
	return total
}
// calcFieldSizeInt returns the total number of bytes the given int fields
// take up in an encoded proto message. Each field is counted as one byte plus
// the varint encoding of its value.
func calcFieldSizeInt(fields ...int) int {
	total := 0
	for i := range fields {
		total += protowire.SizeVarint(uint64(fields[i])) + 1
	}
	return total
}
// splitRequestIDs cuts ids into a leading batch of at most maxBatchSize
// elements and the rest. When ids has fewer than maxBatchSize elements, it is
// returned unchanged as the prefix together with an empty, non-nil remainder.
// Both results share ids' backing array.
func splitRequestIDs(ids []string, maxBatchSize int) (prefix, remainder []string) {
	if len(ids) >= maxBatchSize {
		prefix = ids[:maxBatchSize]
		remainder = ids[maxBatchSize:]
		return prefix, remainder
	}
	return ids, []string{}
}
// ackDeadline returns the ack deadline to request for messages. It is the
// 99th percentile of recorded processing times (in seconds), bounded by
// boundedDuration using the configured min/max extension periods and the
// current exactly-once setting. Processing times up to the 99th percentile
// should therefore be safe, while the slowest 1% may expire; this was chosen
// to cover most use cases without losing the value of expiration.
func (it *messageIterator) ackDeadline() time.Duration {
	it.eoMu.RLock()
	exactlyOnce := it.enableExactlyOnceDelivery
	it.eoMu.RUnlock()

	p99 := time.Duration(it.ackTimeDist.Percentile(.99)) * time.Second
	return boundedDuration(p99, it.po.minExtensionPeriod, it.po.maxExtensionPeriod, exactlyOnce)
}
// boundedDuration clamps ackDeadline and returns the result.
//
// A positive maxExtension is applied first as an upper bound. Then exactly one
// lower bound applies, in this order of precedence: a positive minExtension;
// otherwise minDurationPerLeaseExtensionExactlyOnce when exactlyOnce is set;
// otherwise minDurationPerLeaseExtension.
func boundedDuration(ackDeadline, minExtension, maxExtension time.Duration, exactlyOnce bool) time.Duration {
	// Respect an explicitly configured maximum extension period.
	if maxExtension > 0 {
		ackDeadline = minDuration(ackDeadline, maxExtension)
	}

	switch {
	case minExtension > 0:
		// Respect an explicitly configured minimum extension period.
		ackDeadline = maxDuration(ackDeadline, minExtension)
	case exactlyOnce:
		// Subscriptions with exactly-once delivery use a higher minimum.
		ackDeadline = maxDuration(ackDeadline, minDurationPerLeaseExtensionExactlyOnce)
	case ackDeadline < minDurationPerLeaseExtension:
		// Default lower bound. Recorded datapoints are normally already
		// bounded, but the first few calls to ackDeadline need this.
		ackDeadline = minDurationPerLeaseExtension
	}
	return ackDeadline
}
// minDuration returns the smaller of x and y.
func minDuration(x, y time.Duration) time.Duration {
	if y <= x {
		return y
	}
	return x
}
// maxDuration returns the larger of x and y.
func maxDuration(x, y time.Duration) time.Duration {
	if y >= x {
		return y
	}
	return x
}
// Per-ack-ID error strings matched by processResults.
const (
	// transientErrStringPrefix prefixes per-ack-ID errors that are retried.
	transientErrStringPrefix = "TRANSIENT_"
	// permanentInvalidAckErrString is the per-ack-ID error that is reported as
	// AcknowledgeStatusInvalidAckID.
	permanentInvalidAckErrString = "PERMANENT_FAILURE_INVALID_ACK_ID"
)
// extractMetadata pulls the gRPC status and the metadata map out of err when
// apierror.FromError can interpret it, and returns (nil, nil) otherwise. It is
// used on exactly-once delivery ack/modack error responses.
func extractMetadata(err error) (*status.Status, map[string]string) {
	if apiErr, ok := apierror.FromError(err); ok {
		return apiErr.GRPCStatus(), apiErr.Metadata()
	}
	return nil, nil
}
// processResults sorts the entries of ackResMap into completed results and
// results to retry, based on the RPC's overall errorStatus and the per-ack-ID
// errors in errorsByAckID.
//
// For each ack ID, in order of precedence:
//   - a per-ID error starting with transientErrStringPrefix is retried; any
//     other per-ID error completes the result with a failure status;
//   - an overall status whose code is in exactlyOnceDeliveryTemporaryRetryErrors
//     is retried; any other overall status completes the result with a
//     failure status;
//   - with no error at all, the result is completed as a success (nil results
//     are recorded as completed without being set).
//
// It returns the completed map first and the retry map second.
// Logic is derived from python-pubsub: https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py#L161-L220
func processResults(errorStatus *status.Status, ackResMap map[string]*AckResult, errorsByAckID map[string]string) (map[string]*AckResult, map[string]*AckResult) {
	completed := make(map[string]*AckResult)
	retry := make(map[string]*AckResult)

	for ackID, res := range ackResMap {
		// Per-ID errors arrive via the ErrorInfo sidecar metadata when
		// exactly-once delivery is enabled; they take precedence.
		if perIDErr, found := errorsByAckID[ackID]; found {
			if strings.HasPrefix(perIDErr, transientErrStringPrefix) {
				retry[ackID] = res
				continue
			}
			if perIDErr == permanentInvalidAckErrString {
				ipubsub.SetAckResult(res, AcknowledgeStatusInvalidAckID, errors.New(perIDErr))
			} else {
				ipubsub.SetAckResult(res, AcknowledgeStatusOther, errors.New(perIDErr))
			}
			completed[ackID] = res
			continue
		}

		// No error of any kind: the request succeeded.
		if errorStatus == nil {
			if res != nil {
				ipubsub.SetAckResult(res, AcknowledgeStatusSuccess, nil)
			}
			completed[ackID] = res
			continue
		}

		// Temporary RPC-level failures are retried.
		if contains(errorStatus.Code(), exactlyOnceDeliveryTemporaryRetryErrors) {
			retry[ackID] = res
			continue
		}

		// Other gRPC errors are not retried.
		switch errorStatus.Code() {
		case codes.PermissionDenied:
			ipubsub.SetAckResult(res, AcknowledgeStatusPermissionDenied, errorStatus.Err())
		case codes.FailedPrecondition:
			ipubsub.SetAckResult(res, AcknowledgeStatusFailedPrecondition, errorStatus.Err())
		default:
			ipubsub.SetAckResult(res, AcknowledgeStatusOther, errorStatus.Err())
		}
		completed[ackID] = res
	}
	return completed, retry
}