| // Copyright 2016 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package pubsub |
| |
| import ( |
| "context" |
| "io" |
| "strings" |
| "sync" |
| "time" |
| |
| vkit "cloud.google.com/go/pubsub/apiv1" |
| "cloud.google.com/go/pubsub/internal/distribution" |
| "github.com/golang/protobuf/proto" |
| gax "github.com/googleapis/gax-go/v2" |
| pb "google.golang.org/genproto/googleapis/pubsub/v1" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| ) |
| |
| // Between message receipt and ack (that is, the time spent processing a message) we want to extend the message |
| // deadline by way of modack. However, we don't want to wait to extend the deadline until it is just about to |
| // expire; instead, we extend it a little bit of time ahead. gracePeriod is that amount of time ahead |
| // of the actual deadline. |
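| // For example, if the current ack deadline is, say, 10 seconds, the sender |
| // schedules the next keep-alive tick at 10s - gracePeriod = 5s, so extensions |
| // are requested well before messages would otherwise expire. |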
| const gracePeriod = 5 * time.Second |
| |
| type messageIterator struct { |
| ctx context.Context |
| cancel func() // the function that will cancel ctx; called in stop |
| po *pullOptions |
| ps *pullStream |
| subc *vkit.SubscriberClient |
| subName string |
| maxExtensionPeriod *time.Duration |
| kaTick <-chan time.Time // keep-alive (deadline extensions) |
| ackTicker *time.Ticker // message acks |
| nackTicker *time.Ticker // message nacks (more frequent than acks) |
| pingTicker *time.Ticker // sends to the stream to keep it open |
| failed chan struct{} // closed on stream error |
| drained chan struct{} // closed when stopped && no more pending messages |
| wg sync.WaitGroup |
| |
| mu sync.Mutex |
| ackTimeDist *distribution.D // dist uses seconds |
| |
| // keepAliveDeadlines is a map of id to expiration time. This map is used in conjunction with |
| // subscription.ReceiveSettings.MaxExtension to record the maximum amount of time (the |
| // deadline, more specifically) we're willing to extend a message's ack deadline. As each |
| // message arrives, we'll record now+MaxExtension in this table; whenever we have a chance |
| // to update ack deadlines (via modack), we'll consult this table and only include IDs |
| // that are not beyond their deadline. |
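| // For example, with a hypothetical MaxExtension of 60 minutes, a message |
| // received at 10:00 gets a keep-alive deadline of 11:00; modacks sent after |
| // 11:00 no longer include its ack ID, so the message may then be redelivered. |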
| keepAliveDeadlines map[string]time.Time |
| pendingAcks map[string]bool |
| pendingNacks map[string]bool |
| pendingModAcks map[string]bool // ack IDs whose ack deadline is to be modified |
| err error // error from stream failure |
| } |
| |
| // newMessageIterator starts and returns a new messageIterator. |
| // subName is the full name of the subscription to pull messages from. |
| // Stop must be called on the messageIterator when it is no longer needed. |
| // The iterator always uses the background context for acking messages and extending message deadlines. |
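| // A minimal usage sketch (illustrative only; the subscription name, extension |
| // period, and batch size below are assumptions): |
| // |
| //	maxExt := 10 * time.Minute |
| //	it := newMessageIterator(subc, "projects/p/subscriptions/s", &maxExt, po) |
| //	defer it.stop() |
| //	msgs, err := it.receive(100) |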
| func newMessageIterator(subc *vkit.SubscriberClient, subName string, maxExtensionPeriod *time.Duration, po *pullOptions) *messageIterator { |
| var ps *pullStream |
| if !po.synchronous { |
| maxMessages := po.maxOutstandingMessages |
| maxBytes := po.maxOutstandingBytes |
| if po.useLegacyFlowControl { |
| // With legacy flow control, zero values are sent so the server does not |
| // enforce flow control; it is enforced only on the client side. |
| maxMessages = 0 |
| maxBytes = 0 |
| } |
| ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes) |
| } |
| // The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending |
| // the first keepAlive halfway towards the minimum ack deadline. |
| keepAlivePeriod := minAckDeadline / 2 |
| |
| // Ack promptly so users don't lose work if the client crashes. |
| ackTicker := time.NewTicker(100 * time.Millisecond) |
| nackTicker := time.NewTicker(100 * time.Millisecond) |
| pingTicker := time.NewTicker(30 * time.Second) |
| cctx, cancel := context.WithCancel(context.Background()) |
| it := &messageIterator{ |
| ctx: cctx, |
| cancel: cancel, |
| ps: ps, |
| po: po, |
| subc: subc, |
| subName: subName, |
| maxExtensionPeriod: maxExtensionPeriod, |
| kaTick: time.After(keepAlivePeriod), |
| ackTicker: ackTicker, |
| nackTicker: nackTicker, |
| pingTicker: pingTicker, |
| failed: make(chan struct{}), |
| drained: make(chan struct{}), |
| ackTimeDist: distribution.New(int(maxAckDeadline/time.Second) + 1), |
| keepAliveDeadlines: map[string]time.Time{}, |
| pendingAcks: map[string]bool{}, |
| pendingNacks: map[string]bool{}, |
| pendingModAcks: map[string]bool{}, |
| } |
| it.wg.Add(1) |
| go it.sender() |
| return it |
| } |
| |
| // Subscription.receive will call stop on its messageIterator when finished with it. |
| // stop cancels the iterator's context and blocks until all outstanding |
| // messages have been acked or nacked, or have expired, or until the |
| // underlying stream fails. |
| func (it *messageIterator) stop() { |
| it.cancel() |
| it.mu.Lock() |
| it.checkDrained() |
| it.mu.Unlock() |
| it.wg.Wait() |
| } |
| |
| // checkDrained closes the drained channel if the iterator has been stopped and all |
| // pending messages have either been n/acked or expired. |
| // |
| // Called with the lock held. |
| func (it *messageIterator) checkDrained() { |
| select { |
| case <-it.drained: |
| return |
| default: |
| } |
| select { |
| case <-it.ctx.Done(): |
| if len(it.keepAliveDeadlines) == 0 { |
| close(it.drained) |
| } |
| default: |
| } |
| } |
| |
| // Called when a message is acked/nacked. |
| func (it *messageIterator) done(ackID string, ack bool, receiveTime time.Time) { |
| it.ackTimeDist.Record(int(time.Since(receiveTime) / time.Second)) |
| it.mu.Lock() |
| defer it.mu.Unlock() |
| delete(it.keepAliveDeadlines, ackID) |
| if ack { |
| it.pendingAcks[ackID] = true |
| } else { |
| it.pendingNacks[ackID] = true |
| } |
| it.checkDrained() |
| } |
| |
| // fail is called when a stream method returns a permanent error. |
| // fail returns it.err. This may be err, or it may be the error |
| // set by an earlier call to fail. |
| func (it *messageIterator) fail(err error) error { |
| it.mu.Lock() |
| defer it.mu.Unlock() |
| if it.err == nil { |
| it.err = err |
| close(it.failed) |
| } |
| return it.err |
| } |
| |
| // receive makes a call to the stream's Recv method, or the Pull RPC, and returns |
| // its messages. |
| // maxToPull is the maximum number of messages for the Pull RPC. |
| func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { |
| it.mu.Lock() |
| ierr := it.err |
| it.mu.Unlock() |
| if ierr != nil { |
| return nil, ierr |
| } |
| |
| // Stop retrieving messages if the iterator's Stop method was called. |
| select { |
| case <-it.ctx.Done(): |
| it.wg.Wait() |
| return nil, io.EOF |
| default: |
| } |
| |
| var rmsgs []*pb.ReceivedMessage |
| var err error |
| if it.po.synchronous { |
| rmsgs, err = it.pullMessages(maxToPull) |
| } else { |
| rmsgs, err = it.recvMessages() |
| } |
| // Any error here is fatal. |
| if err != nil { |
| return nil, it.fail(err) |
| } |
| recordStat(it.ctx, PullCount, int64(len(rmsgs))) |
| msgs, err := convertMessages(rmsgs) |
| if err != nil { |
| return nil, it.fail(err) |
| } |
| // We received some messages. Remember them so we can keep them alive. Also, |
| // do a receipt mod-ack when streaming. |
| maxExt := time.Now().Add(it.po.maxExtension) |
| ackIDs := map[string]bool{} |
| it.mu.Lock() |
| now := time.Now() |
| for _, m := range msgs { |
| ackh, _ := m.ackHandler() |
| ackh.receiveTime = now |
| addRecv(m.ID, ackh.ackID, now) |
| ackh.doneFunc = it.done |
| it.keepAliveDeadlines[ackh.ackID] = maxExt |
| // Don't change the mod-ack if the message is going to be nacked. This is |
| // possible if there are retries. |
| if !it.pendingNacks[ackh.ackID] { |
| ackIDs[ackh.ackID] = true |
| } |
| } |
| deadline := it.ackDeadline() |
| it.mu.Unlock() |
| if len(ackIDs) > 0 { |
| if !it.sendModAck(ackIDs, deadline) { |
| return nil, it.err |
| } |
| } |
| return msgs, nil |
| } |
| |
| // Get messages using the Pull RPC. |
| // This may block indefinitely. It may also return zero messages after waiting for some time. |
| func (it *messageIterator) pullMessages(maxToPull int32) ([]*pb.ReceivedMessage, error) { |
| // Use it.ctx as the RPC context, so that if the iterator is stopped, the call |
| // will return immediately. |
| res, err := it.subc.Pull(it.ctx, &pb.PullRequest{ |
| Subscription: it.subName, |
| MaxMessages: maxToPull, |
| }, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes))) |
| switch { |
| case err == context.Canceled: |
| return nil, nil |
| case err != nil: |
| return nil, err |
| default: |
| return res.ReceivedMessages, nil |
| } |
| } |
| |
| func (it *messageIterator) recvMessages() ([]*pb.ReceivedMessage, error) { |
| res, err := it.ps.Recv() |
| if err != nil { |
| return nil, err |
| } |
| return res.ReceivedMessages, nil |
| } |
| |
| // sender runs in a goroutine and handles all sends to the stream. |
| func (it *messageIterator) sender() { |
| defer it.wg.Done() |
| defer it.ackTicker.Stop() |
| defer it.nackTicker.Stop() |
| defer it.pingTicker.Stop() |
| defer func() { |
| if it.ps != nil { |
| it.ps.CloseSend() |
| } |
| }() |
| |
| done := false |
| for !done { |
| sendAcks := false |
| sendNacks := false |
| sendModAcks := false |
| sendPing := false |
| |
| dl := it.ackDeadline() |
| |
| select { |
| case <-it.failed: |
| // Stream failed: nothing to do, so stop immediately. |
| return |
| |
| case <-it.drained: |
| // All outstanding messages have been marked done: |
| // nothing left to do except make the final calls. |
| it.mu.Lock() |
| sendAcks = (len(it.pendingAcks) > 0) |
| sendNacks = (len(it.pendingNacks) > 0) |
| // No point in sending modacks. |
| done = true |
| |
| case <-it.kaTick: |
| it.mu.Lock() |
| it.handleKeepAlives() |
| sendModAcks = (len(it.pendingModAcks) > 0) |
| |
| nextTick := dl - gracePeriod |
| if nextTick <= 0 { |
| // If the deadline is <= gracePeriod, let's tick again halfway to |
| // the deadline. |
| nextTick = dl / 2 |
| } |
| it.kaTick = time.After(nextTick) |
| |
| case <-it.nackTicker.C: |
| it.mu.Lock() |
| sendNacks = (len(it.pendingNacks) > 0) |
| |
| case <-it.ackTicker.C: |
| it.mu.Lock() |
| sendAcks = (len(it.pendingAcks) > 0) |
| |
| case <-it.pingTicker.C: |
| it.mu.Lock() |
| // Ping only if we are processing messages via streaming. |
| sendPing = !it.po.synchronous |
| } |
| // Lock is held here. |
| var acks, nacks, modAcks map[string]bool |
| if sendAcks { |
| acks = it.pendingAcks |
| it.pendingAcks = map[string]bool{} |
| } |
| if sendNacks { |
| nacks = it.pendingNacks |
| it.pendingNacks = map[string]bool{} |
| } |
| if sendModAcks { |
| modAcks = it.pendingModAcks |
| it.pendingModAcks = map[string]bool{} |
| } |
| it.mu.Unlock() |
| // Make Ack and ModAck RPCs. |
| if sendAcks { |
| if !it.sendAck(acks) { |
| return |
| } |
| } |
| if sendNacks { |
| // A nack is indicated by modifying the ack deadline to zero. |
| if !it.sendModAck(nacks, 0) { |
| return |
| } |
| } |
| if sendModAcks { |
| if !it.sendModAck(modAcks, dl) { |
| return |
| } |
| } |
| if sendPing { |
| it.pingStream() |
| } |
| } |
| } |
| |
| // handleKeepAlives modifies the pending request to include deadline extensions |
| // for live messages. It also purges expired messages. |
| // |
| // Called with the lock held. |
| func (it *messageIterator) handleKeepAlives() { |
| now := time.Now() |
| for id, expiry := range it.keepAliveDeadlines { |
| if expiry.Before(now) { |
| // This delete will not result in skipping any map items, as implied by |
| // the spec at https://golang.org/ref/spec#For_statements, "For |
| // statements with range clause", note 3, and stated explicitly at |
| // https://groups.google.com/forum/#!msg/golang-nuts/UciASUb03Js/pzSq5iVFAQAJ. |
| delete(it.keepAliveDeadlines, id) |
| } else { |
| // This will not conflict with a nack, because nacking removes the ID from keepAliveDeadlines. |
| it.pendingModAcks[id] = true |
| } |
| } |
| it.checkDrained() |
| } |
| |
| func (it *messageIterator) sendAck(m map[string]bool) bool { |
| // Account for the Subscription field. |
| overhead := calcFieldSizeString(it.subName) |
| return it.sendAckIDRPC(m, maxPayload-overhead, func(ids []string) error { |
| recordStat(it.ctx, AckCount, int64(len(ids))) |
| addAcks(ids) |
| bo := gax.Backoff{ |
| Initial: 100 * time.Millisecond, |
| Max: time.Second, |
| Multiplier: 2, |
| } |
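| // With this configuration each Pause returns a jittered delay whose upper |
| // bound starts near 100ms, roughly doubles on every retry, and is capped |
| // at one second. |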
| cctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) |
| defer cancel() |
| for { |
| // Use context.Background() as the call's context, not it.ctx. We don't |
| // want to cancel this RPC when the iterator is stopped. |
| cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) |
| defer cancel2() |
| err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{ |
| Subscription: it.subName, |
| AckIds: ids, |
| }) |
| // Retry DeadlineExceeded errors a few times before giving up and |
| // allowing the message to expire and be redelivered. |
| // The underlying library handles other retries, currently only |
| // codes.Unavailable. |
| switch status.Code(err) { |
| case codes.DeadlineExceeded: |
| // Use the outer context with timeout here. Errors from gax, including |
| // context deadline exceeded, should be transparent, as unacked messages |
| // will be redelivered. |
| if err := gax.Sleep(cctx, bo.Pause()); err != nil { |
| return nil |
| } |
| default: |
| if err == nil { |
| return nil |
| } |
| // This addresses an error where `context deadline exceeded` errors |
| // not captured by the previous case cause fatal errors. |
| // See https://github.com/googleapis/google-cloud-go/issues/3060 |
| if strings.Contains(err.Error(), "context deadline exceeded") { |
| // Context deadline exceeded errors here should be transparent |
| // to prevent the iterator from shutting down. |
| if err := gax.Sleep(cctx, bo.Pause()); err != nil { |
| return nil |
| } |
| continue |
| } |
| // Any other error is fatal. |
| return err |
| } |
| } |
| }) |
| } |
| |
| // The receipt mod-ack amount is derived from a percentile distribution based |
| // on the time it takes to process messages. The 99th percentile is chosen |
| // in order to capture the highest amount of time necessary while ignoring |
| // the top 1% of outliers. |
| func (it *messageIterator) sendModAck(m map[string]bool, deadline time.Duration) bool { |
| deadlineSec := int32(deadline / time.Second) |
| // Account for the Subscription and AckDeadlineSeconds fields. |
| overhead := calcFieldSizeString(it.subName) + calcFieldSizeInt(int(deadlineSec)) |
| return it.sendAckIDRPC(m, maxPayload-overhead, func(ids []string) error { |
| if deadline == 0 { |
| recordStat(it.ctx, NackCount, int64(len(ids))) |
| } else { |
| recordStat(it.ctx, ModAckCount, int64(len(ids))) |
| } |
| addModAcks(ids, deadlineSec) |
| // Retry this RPC on Unavailable for a short amount of time, then give up |
| // without returning a fatal error. The utility of this RPC is by nature |
| // transient (since the deadline is relative to the current time) and it |
| // isn't crucial for correctness (since expired messages will just be |
| // resent). |
| cctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) |
| defer cancel() |
| bo := gax.Backoff{ |
| Initial: 100 * time.Millisecond, |
| Max: time.Second, |
| Multiplier: 2, |
| } |
| for { |
| err := it.subc.ModifyAckDeadline(cctx, &pb.ModifyAckDeadlineRequest{ |
| Subscription: it.subName, |
| AckDeadlineSeconds: deadlineSec, |
| AckIds: ids, |
| }) |
| switch status.Code(err) { |
| case codes.Unavailable: |
| if err := gax.Sleep(cctx, bo.Pause()); err == nil { |
| continue |
| } |
| // Treat sleep timeout like RPC timeout. |
| fallthrough |
| case codes.DeadlineExceeded: |
| // Timeout. Not a fatal error, but note that it happened. |
| recordStat(it.ctx, ModAckTimeoutCount, 1) |
| return nil |
| default: |
| if err == nil { |
| return nil |
| } |
| // This addresses an error where `context deadline exceeded` errors |
| // not captured by the previous case cause fatal errors. |
| // See https://github.com/googleapis/google-cloud-go/issues/3060 |
| if strings.Contains(err.Error(), "context deadline exceeded") { |
| recordStat(it.ctx, ModAckTimeoutCount, 1) |
| return nil |
| } |
| // Any other error is fatal. |
| return err |
| } |
| } |
| }) |
| } |
| |
| func (it *messageIterator) sendAckIDRPC(ackIDSet map[string]bool, maxSize int, call func([]string) error) bool { |
| ackIDs := make([]string, 0, len(ackIDSet)) |
| for k := range ackIDSet { |
| ackIDs = append(ackIDs, k) |
| } |
| var toSend []string |
| for len(ackIDs) > 0 { |
| toSend, ackIDs = splitRequestIDs(ackIDs, maxSize) |
| if err := call(toSend); err != nil { |
| // The underlying client handles retries, so any error is fatal to the |
| // iterator. |
| it.fail(err) |
| return false |
| } |
| } |
| return true |
| } |
| |
| // Send a message to the stream to keep it open. The stream will close if there's no |
| // traffic on it for a while. By keeping it open, we delay the start of the |
| // expiration timer on messages that are buffered by gRPC or elsewhere in the |
| // network. This matters if it takes a long time to process messages relative to the |
| // default ack deadline, and if the messages are small enough so that many can fit |
| // into the buffer. |
| func (it *messageIterator) pingStream() { |
| // Ignore error; if the stream is broken, this doesn't matter anyway. |
| _ = it.ps.Send(&pb.StreamingPullRequest{}) |
| } |
| |
| // calcFieldSizeString returns the number of bytes string fields |
| // will take up in an encoded proto message. |
| func calcFieldSizeString(fields ...string) int { |
| overhead := 0 |
| for _, field := range fields { |
| overhead += 1 + len(field) + proto.SizeVarint(uint64(len(field))) |
| } |
| return overhead |
| } |
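| // For example, a 26-byte subscription name such as |
| // "projects/p/subscriptions/s" adds 1 (field tag) + 1 (length varint) + 26 = |
| // 28 bytes of overhead to the encoded request. |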
| |
| // calcFieldSizeInt returns the number of bytes int fields |
| // will take up in an encoded proto message. |
| func calcFieldSizeInt(fields ...int) int { |
| overhead := 0 |
| for _, field := range fields { |
| overhead += 1 + proto.SizeVarint(uint64(field)) |
| } |
| return overhead |
| } |
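| // For example, an AckDeadlineSeconds value of 600 adds 1 (field tag) + |
| // 2 (varint encoding of 600) = 3 bytes to the encoded request. |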
| |
| // splitRequestIDs takes a slice of ackIDs and returns two slices such that the first |
| // ackID slice can be used in a request where the payload does not exceed maxSize. |
| func splitRequestIDs(ids []string, maxSize int) (prefix, remainder []string) { |
| size := 0 |
| i := 0 |
| // TODO(hongalex): Use binary search to find split index, since ackIDs are |
| // fairly constant. |
| for size < maxSize && i < len(ids) { |
| size += calcFieldSizeString(ids[i]) |
| i++ |
| } |
| if size > maxSize { |
| i-- |
| } |
| // Always make progress: if a single encoded ID exceeds maxSize, send it on |
| // its own rather than returning an empty prefix, which would leave the |
| // caller retrying the same split with no IDs removed. |
| if i == 0 && len(ids) > 0 { |
| i = 1 |
| } |
| return ids[:i], ids[i:] |
| } |
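| // For example (illustrative sizes): given three single-byte ack IDs, each |
| // costing 3 encoded bytes, and a maxSize of 6, splitRequestIDs returns the |
| // first two IDs as the prefix and the remaining one as the remainder. |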
| |
| // The deadline to ack is derived from a percentile distribution based |
| // on the time it takes to process messages. The 99th percentile is chosen - |
| // that is, processing times up to the 99th percentile of observed processing |
| // times should be safe, while the longest 1% may expire. This number was |
| // chosen as a way to cover most users' use cases without losing the value of |
| // expiration. |
| func (it *messageIterator) ackDeadline() time.Duration { |
| pt := time.Duration(it.ackTimeDist.Percentile(.99)) * time.Second |
| |
| if it.maxExtensionPeriod != nil && *it.maxExtensionPeriod > 0 && pt > *it.maxExtensionPeriod { |
| return *it.maxExtensionPeriod |
| } |
| if pt > maxAckDeadline { |
| return maxAckDeadline |
| } |
| if pt < minAckDeadline { |
| return minAckDeadline |
| } |
| return pt |
| } |
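| // For example, if the observed 99th-percentile processing time is below |
| // minAckDeadline, the returned deadline is raised to minAckDeadline; if it |
| // exceeds maxAckDeadline (or MaxExtensionPeriod, when that is set), it is |
| // capped accordingly. |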