feat(pubsublite): Trackers for acks and commit cursor (#3137)
These are similar to their Java counterparts with the following differences:
- ackConsumer is similar to AckReplyConsumer.
- ackTracker is equivalent to AckSetTrackerImpl. It maintains a single ordered queue of unacked offsets and stores the desired commit offset in ackedPrefixOffset.
- commitCursorTracker is equivalent to CommitState. It maintains the last commit offset confirmed by the server and a queue of unacknowledged committed offsets.
diff --git a/pubsublite/internal/wire/acks.go b/pubsublite/internal/wire/acks.go
new file mode 100644
index 0000000..0a7c208
--- /dev/null
+++ b/pubsublite/internal/wire/acks.go
@@ -0,0 +1,240 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+
+package wire
+
+import (
+ "container/list"
+ "fmt"
+ "sync"
+)
+
+// AckConsumer is the interface exported from this package for acking messages.
+type AckConsumer interface {
+ Ack()
+}
+
+// ackedFunc is invoked when a message has been acked by the user. Note: if the
+// ackedFunc implementation calls any ackConsumer methods, it needs to run in a
+// goroutine to avoid a deadlock.
+type ackedFunc func(*ackConsumer)
+
+// ackConsumer is used for handling message acks. It is attached to a Message
+// and also stored within the ackTracker until the message has been acked by the
+// user.
+type ackConsumer struct {
+ // The message offset.
+ Offset int64
+ // Bytes released to the flow controller once the message has been acked.
+ MsgBytes int64
+
+ // Guards access to fields below.
+ mu sync.Mutex
+ acked bool
+ onAck ackedFunc
+}
+
+func newAckConsumer(offset, msgBytes int64, onAck ackedFunc) *ackConsumer {
+ return &ackConsumer{Offset: offset, MsgBytes: msgBytes, onAck: onAck}
+}
+
+func (ac *ackConsumer) Ack() {
+ ac.mu.Lock()
+ defer ac.mu.Unlock()
+
+ if ac.acked {
+ return
+ }
+ ac.acked = true
+ if ac.onAck != nil {
+ // Not invoked in a goroutine here for ease of testing.
+ ac.onAck(ac)
+ }
+}
+
+func (ac *ackConsumer) IsAcked() bool {
+ ac.mu.Lock()
+ defer ac.mu.Unlock()
+ return ac.acked
+}
+
+// Clear onAck when the ack can no longer be processed. The user's ack would be
+// ignored.
+func (ac *ackConsumer) Clear() {
+ ac.mu.Lock()
+ defer ac.mu.Unlock()
+ ac.onAck = nil
+}
+
+// Represents an uninitialized cursor offset. A sentinel value is used instead
+// if an optional to simplify cursor comparisons (i.e. -1 works without the need
+// to check for nil and then convert to int64).
+const nilCursorOffset int64 = -1
+
+// ackTracker manages outstanding message acks, i.e. messages that have been
+// delivered to the user, but not yet acked. It is used by the committer and
+// wireSubscriber, so requires its own mutex.
+type ackTracker struct {
+ // Guards access to fields below.
+ mu sync.Mutex
+ // All offsets before and including this prefix have been acked by the user.
+ ackedPrefixOffset int64
+ // Outstanding message acks, strictly ordered by increasing message offsets.
+ outstandingAcks *list.List // Value = *ackConsumer
+}
+
+func newAckTracker() *ackTracker {
+ return &ackTracker{
+ ackedPrefixOffset: nilCursorOffset,
+ outstandingAcks: list.New(),
+ }
+}
+
+// Push adds an outstanding ack to the tracker.
+func (at *ackTracker) Push(ack *ackConsumer) error {
+ at.mu.Lock()
+ defer at.mu.Unlock()
+
+ // These errors should not occur unless there is a bug in the client library
+ // as message ordering should have been validated by subscriberOffsetTracker.
+ if ack.Offset <= at.ackedPrefixOffset {
+ return errOutOfOrderMessages
+ }
+ if elem := at.outstandingAcks.Back(); elem != nil {
+ lastOutstandingAck, _ := elem.Value.(*ackConsumer)
+ if ack.Offset <= lastOutstandingAck.Offset {
+ return errOutOfOrderMessages
+ }
+ }
+
+ at.outstandingAcks.PushBack(ack)
+ return nil
+}
+
+// CommitOffset returns the cursor offset that should be committed. May return
+// nilCursorOffset if no messages have been acked thus far.
+func (at *ackTracker) CommitOffset() int64 {
+ at.mu.Lock()
+ defer at.mu.Unlock()
+
+ // Process outstanding acks and update `ackedPrefixOffset` until an unacked
+ // message is found.
+ for {
+ elem := at.outstandingAcks.Front()
+ if elem == nil {
+ break
+ }
+ ack, _ := elem.Value.(*ackConsumer)
+ if !ack.IsAcked() {
+ break
+ }
+ at.ackedPrefixOffset = ack.Offset
+ at.outstandingAcks.Remove(elem)
+ ack.Clear()
+ }
+
+ if at.ackedPrefixOffset == nilCursorOffset {
+ return nilCursorOffset
+ }
+ // Convert from last acked to first unacked, which is the commit offset.
+ return at.ackedPrefixOffset + 1
+}
+
+// Release clears and invalidates any outstanding acks. This should be called
+// when the subscriber terminates.
+func (at *ackTracker) Release() {
+ at.mu.Lock()
+ defer at.mu.Unlock()
+
+ for elem := at.outstandingAcks.Front(); elem != nil; elem = elem.Next() {
+ ack, _ := elem.Value.(*ackConsumer)
+ ack.Clear()
+ }
+ at.outstandingAcks.Init()
+}
+
+// commitCursorTracker tracks pending and last successful committed offsets.
+// It is only accessed by the committer.
+type commitCursorTracker struct {
+ // Used to obtain the desired commit offset based on messages acked by the
+ // user.
+ acks *ackTracker
+ // Last offset for which the server confirmed (acknowledged) the commit.
+ lastConfirmedOffset int64
+ // Queue of committed offsets awaiting confirmation from the server.
+ pendingOffsets *list.List // Value = int64
+}
+
+func newCommitCursorTracker(acks *ackTracker) *commitCursorTracker {
+ return &commitCursorTracker{
+ acks: acks,
+ lastConfirmedOffset: nilCursorOffset,
+ pendingOffsets: list.New(),
+ }
+}
+
+func extractOffsetFromElem(elem *list.Element) int64 {
+ if elem == nil {
+ return nilCursorOffset
+ }
+ offset, _ := elem.Value.(int64)
+ return offset
+}
+
+// NextOffset is the commit offset to be sent to the stream. Returns
+// nilCursorOffset if the commit offset does not need to be updated.
+func (ct *commitCursorTracker) NextOffset() int64 {
+ desiredCommitOffset := ct.acks.CommitOffset()
+ if desiredCommitOffset <= ct.lastConfirmedOffset {
+ // The server has already confirmed the commit offset.
+ return nilCursorOffset
+ }
+ if desiredCommitOffset <= extractOffsetFromElem(ct.pendingOffsets.Back()) {
+ // The commit offset has already been sent to the commit stream and is
+ // awaiting confirmation.
+ return nilCursorOffset
+ }
+ return desiredCommitOffset
+}
+
+// AddPending adds a sent, but not yet confirmed, committed offset.
+func (ct *commitCursorTracker) AddPending(offset int64) {
+ ct.pendingOffsets.PushBack(offset)
+}
+
+// ClearPending discards old pending offsets. Should be called when the commit
+// stream reconnects, as the server acknowledgments for these would not be
+// received.
+func (ct *commitCursorTracker) ClearPending() {
+ ct.pendingOffsets.Init()
+}
+
+// ConfirmOffsets processes the server's acknowledgment of the first
+// `numConfirmed` pending offsets.
+func (ct *commitCursorTracker) ConfirmOffsets(numConfirmed int64) error {
+ if numPending := int64(ct.pendingOffsets.Len()); numPending < numConfirmed {
+ return fmt.Errorf("pubsublite: server acknowledged %d cursor commits, but only %d were sent", numConfirmed, numPending)
+ }
+
+ for i := int64(0); i < numConfirmed; i++ {
+ front := ct.pendingOffsets.Front()
+ ct.lastConfirmedOffset = extractOffsetFromElem(front)
+ ct.pendingOffsets.Remove(front)
+ }
+ return nil
+}
+
+// Done when the server has confirmed the desired commit offset.
+func (ct *commitCursorTracker) Done() bool {
+ return ct.acks.CommitOffset() <= ct.lastConfirmedOffset
+}
diff --git a/pubsublite/internal/wire/acks_test.go b/pubsublite/internal/wire/acks_test.go
new file mode 100644
index 0000000..b7204e3
--- /dev/null
+++ b/pubsublite/internal/wire/acks_test.go
@@ -0,0 +1,311 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+
+package wire
+
+import "testing"
+
+func TestAckConsumerAck(t *testing.T) {
+ numAcks := 0
+ onAck := func(ac *ackConsumer) {
+ numAcks++
+ }
+ ackConsumer := newAckConsumer(0, 0, onAck)
+
+ // Test duplicate acks.
+ for i := 0; i < 3; i++ {
+ ackConsumer.Ack()
+
+ if got, want := ackConsumer.IsAcked(), true; got != want {
+ t.Errorf("ackConsumer.IsAcked() got %v, want %v", got, want)
+ }
+ if got, want := numAcks, 1; got != want {
+ t.Errorf("onAck func called %v times, expected %v call", got, want)
+ }
+ }
+}
+
+func TestAckConsumerClear(t *testing.T) {
+ onAck := func(ac *ackConsumer) {
+ t.Error("onAck func should not have been called")
+ }
+ ackConsumer := newAckConsumer(0, 0, onAck)
+ ackConsumer.Clear()
+ ackConsumer.Ack()
+
+ if got, want := ackConsumer.IsAcked(), true; got != want {
+ t.Errorf("ackConsumer.IsAcked() got %v, want %v", got, want)
+ }
+}
+
+func TestAckTrackerProcessing(t *testing.T) {
+ ackTracker := newAckTracker()
+
+ // No messages received yet.
+ if got, want := ackTracker.CommitOffset(), nilCursorOffset; got != want {
+ t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
+ }
+
+ onAck := func(ac *ackConsumer) {
+ // Nothing to do.
+ }
+ ack1 := newAckConsumer(1, 0, onAck)
+ ack2 := newAckConsumer(2, 0, onAck)
+ ack3 := newAckConsumer(3, 0, onAck)
+ if err := ackTracker.Push(ack1); err != nil {
+ t.Errorf("ackTracker.Push() got err %v", err)
+ }
+ if err := ackTracker.Push(ack2); err != nil {
+ t.Errorf("ackTracker.Push() got err %v", err)
+ }
+ if err := ackTracker.Push(ack3); err != nil {
+ t.Errorf("ackTracker.Push() got err %v", err)
+ }
+
+ // All messages unacked.
+ if got, want := ackTracker.CommitOffset(), nilCursorOffset; got != want {
+ t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
+ }
+
+ ack1.Ack()
+ if got, want := ackTracker.CommitOffset(), int64(2); got != want {
+ t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
+ }
+
+ // Skipped ack2, so the commit offset should not have been updated.
+ ack3.Ack()
+ if got, want := ackTracker.CommitOffset(), int64(2); got != want {
+ t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
+ }
+
+ // Both ack2 and ack3 should be removed from the outstanding acks queue.
+ ack2.Ack()
+ if got, want := ackTracker.CommitOffset(), int64(4); got != want {
+ t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
+ }
+
+ // Newly received message.
+ ack4 := newAckConsumer(4, 0, onAck)
+ if err := ackTracker.Push(ack4); err != nil {
+ t.Errorf("ackTracker.Push() got err %v", err)
+ }
+ ack4.Ack()
+ if got, want := ackTracker.CommitOffset(), int64(5); got != want {
+ t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
+ }
+}
+
+func TestAckTrackerRelease(t *testing.T) {
+ ackTracker := newAckTracker()
+ onAck := func(ac *ackConsumer) {
+ t.Error("onAck should not be called")
+ }
+ ack1 := newAckConsumer(1, 0, onAck)
+ ack2 := newAckConsumer(2, 0, onAck)
+ ack3 := newAckConsumer(3, 0, onAck)
+
+ if err := ackTracker.Push(ack1); err != nil {
+ t.Errorf("ackTracker.Push() got err %v", err)
+ }
+ if err := ackTracker.Push(ack2); err != nil {
+ t.Errorf("ackTracker.Push() got err %v", err)
+ }
+ if err := ackTracker.Push(ack3); err != nil {
+ t.Errorf("ackTracker.Push() got err %v", err)
+ }
+
+ // After clearing outstanding acks, onAck should not be called.
+ ackTracker.Release()
+ ack1.Ack()
+ ack2.Ack()
+ ack3.Ack()
+}
+
+func TestCommitCursorTrackerProcessing(t *testing.T) {
+ ackTracker := newAckTracker()
+ commitTracker := newCommitCursorTracker(ackTracker)
+
+ // No messages received yet.
+ if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+
+ onAck := func(ac *ackConsumer) {
+ // Nothing to do.
+ }
+ ack1 := newAckConsumer(1, 0, onAck)
+ ack2 := newAckConsumer(2, 0, onAck)
+ ack3 := newAckConsumer(3, 0, onAck)
+ if err := ackTracker.Push(ack1); err != nil {
+ t.Errorf("ackTracker.Push() got err %v", err)
+ }
+ if err := ackTracker.Push(ack2); err != nil {
+ t.Errorf("ackTracker.Push() got err %v", err)
+ }
+ if err := ackTracker.Push(ack3); err != nil {
+ t.Errorf("ackTracker.Push() got err %v", err)
+ }
+
+ // All messages unacked.
+ if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+
+ // Msg1 acked and commit sent to stream.
+ ack1.Ack()
+ if got, want := commitTracker.NextOffset(), int64(2); got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+ commitTracker.AddPending(commitTracker.NextOffset())
+ if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+
+ // Msg 2 & 3 acked commit and sent to stream.
+ ack2.Ack()
+ if got, want := commitTracker.NextOffset(), int64(3); got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+ ack3.Ack()
+ if got, want := commitTracker.NextOffset(), int64(4); got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+ commitTracker.AddPending(commitTracker.NextOffset())
+ if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+ if got, want := commitTracker.Done(), false; got != want {
+ t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want)
+ }
+
+ // First 2 pending commits acknowledged.
+ if got, want := commitTracker.lastConfirmedOffset, nilCursorOffset; got != want {
+ t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
+ }
+ if err := commitTracker.ConfirmOffsets(2); err != nil {
+ t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err)
+ }
+ if got, want := commitTracker.lastConfirmedOffset, int64(4); got != want {
+ t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
+ }
+ if got, want := commitTracker.Done(), true; got != want {
+ t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want)
+ }
+}
+
+func TestCommitCursorTrackerStreamReconnects(t *testing.T) {
+ ackTracker := newAckTracker()
+ commitTracker := newCommitCursorTracker(ackTracker)
+
+ onAck := func(ac *ackConsumer) {
+ // Nothing to do.
+ }
+ ack1 := newAckConsumer(1, 0, onAck)
+ ack2 := newAckConsumer(2, 0, onAck)
+ ack3 := newAckConsumer(3, 0, onAck)
+ if err := ackTracker.Push(ack1); err != nil {
+ t.Errorf("ackTracker.Push() got err %v", err)
+ }
+ if err := ackTracker.Push(ack2); err != nil {
+ t.Errorf("ackTracker.Push() got err %v", err)
+ }
+ if err := ackTracker.Push(ack3); err != nil {
+ t.Errorf("ackTracker.Push() got err %v", err)
+ }
+
+ // All messages unacked.
+ if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+
+ // Msg1 acked and commit sent to stream.
+ ack1.Ack()
+ if got, want := commitTracker.NextOffset(), int64(2); got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+ commitTracker.AddPending(commitTracker.NextOffset())
+ if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+
+ // Msg2 acked and commit sent to stream.
+ ack2.Ack()
+ if got, want := commitTracker.NextOffset(), int64(3); got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+ commitTracker.AddPending(commitTracker.NextOffset())
+ if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+
+ // Stream breaks and pending offsets are cleared.
+ commitTracker.ClearPending()
+ if got, want := commitTracker.Done(), false; got != want {
+ t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want)
+ }
+ // When the stream reconnects the next offset should be 3 (offset 2 skipped).
+ if got, want := commitTracker.NextOffset(), int64(3); got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+ commitTracker.AddPending(commitTracker.NextOffset())
+ if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+
+ // Msg2 acked and commit sent to stream.
+ ack3.Ack()
+ if got, want := commitTracker.NextOffset(), int64(4); got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+ commitTracker.AddPending(commitTracker.NextOffset())
+ if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+
+ // Only 1 pending commit confirmed.
+ if got, want := commitTracker.lastConfirmedOffset, nilCursorOffset; got != want {
+ t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
+ }
+ if err := commitTracker.ConfirmOffsets(1); err != nil {
+ t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err)
+ }
+ if got, want := commitTracker.lastConfirmedOffset, int64(3); got != want {
+ t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
+ }
+ if got, want := commitTracker.Done(), false; got != want {
+ t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want)
+ }
+
+ // Final pending commit confirmed.
+ if err := commitTracker.ConfirmOffsets(1); err != nil {
+ t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err)
+ }
+ if got, want := commitTracker.lastConfirmedOffset, int64(4); got != want {
+ t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
+ }
+ if got, want := commitTracker.Done(), true; got != want {
+ t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want)
+ }
+
+ // Note: Done() returns true even though there are unacked messages.
+ ack4 := newAckConsumer(4, 0, onAck)
+ if err := ackTracker.Push(ack4); err != nil {
+ t.Errorf("ackTracker.Push() got err %v", err)
+ }
+ if got, want := commitTracker.Done(), true; got != want {
+ t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want)
+ }
+ if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+ t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+ }
+}