feat(pubsublite): Trackers for acks and commit cursor (#3137)

These are similar to their Java counterparts with the following differences:
- ackConsumer is similar to AckReplyConsumer.
- ackTracker is equivalent to AckSetTrackerImpl. It maintains a single ordered queue of unacked offsets and stores the desired commit offset in ackedPrefixOffset.
- commitCursorTracker is equivalent to CommitState. It maintains the last commit offset confirmed by the server and a queue of unacknowledged committed offsets.
diff --git a/pubsublite/internal/wire/acks.go b/pubsublite/internal/wire/acks.go
new file mode 100644
index 0000000..0a7c208
--- /dev/null
+++ b/pubsublite/internal/wire/acks.go
@@ -0,0 +1,240 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+
+package wire
+
+import (
+	"container/list"
+	"fmt"
+	"sync"
+)
+
+// AckConsumer is the interface exported from this package for acking messages.
+type AckConsumer interface {
+	Ack()
+}
+
+// ackedFunc is invoked when a message has been acked by the user. Note: if the
+// ackedFunc implementation calls any ackConsumer methods, it needs to run in a
+// goroutine to avoid a deadlock.
+type ackedFunc func(*ackConsumer)
+
+// ackConsumer is used for handling message acks. It is attached to a Message
+// and also stored within the ackTracker until the message has been acked by the
+// user.
+type ackConsumer struct {
+	// The message offset.
+	Offset int64
+	// Bytes released to the flow controller once the message has been acked.
+	MsgBytes int64
+
+	// Guards access to fields below.
+	mu    sync.Mutex
+	acked bool
+	onAck ackedFunc
+}
+
+func newAckConsumer(offset, msgBytes int64, onAck ackedFunc) *ackConsumer {
+	return &ackConsumer{Offset: offset, MsgBytes: msgBytes, onAck: onAck}
+}
+
+func (ac *ackConsumer) Ack() {
+	ac.mu.Lock()
+	defer ac.mu.Unlock()
+
+	if ac.acked {
+		return
+	}
+	ac.acked = true
+	if ac.onAck != nil {
+		// Not invoked in a goroutine here for ease of testing.
+		ac.onAck(ac)
+	}
+}
+
+func (ac *ackConsumer) IsAcked() bool {
+	ac.mu.Lock()
+	defer ac.mu.Unlock()
+	return ac.acked
+}
+
+// Clear onAck when the ack can no longer be processed. The user's ack would be
+// ignored.
+func (ac *ackConsumer) Clear() {
+	ac.mu.Lock()
+	defer ac.mu.Unlock()
+	ac.onAck = nil
+}
+
+// Represents an uninitialized cursor offset. A sentinel value is used instead
+// if an optional to simplify cursor comparisons (i.e. -1 works without the need
+// to check for nil and then convert to int64).
+const nilCursorOffset int64 = -1
+
+// ackTracker manages outstanding message acks, i.e. messages that have been
+// delivered to the user, but not yet acked. It is used by the committer and
+// wireSubscriber, so requires its own mutex.
+type ackTracker struct {
+	// Guards access to fields below.
+	mu sync.Mutex
+	// All offsets before and including this prefix have been acked by the user.
+	ackedPrefixOffset int64
+	// Outstanding message acks, strictly ordered by increasing message offsets.
+	outstandingAcks *list.List // Value = *ackConsumer
+}
+
+func newAckTracker() *ackTracker {
+	return &ackTracker{
+		ackedPrefixOffset: nilCursorOffset,
+		outstandingAcks:   list.New(),
+	}
+}
+
+// Push adds an outstanding ack to the tracker.
+func (at *ackTracker) Push(ack *ackConsumer) error {
+	at.mu.Lock()
+	defer at.mu.Unlock()
+
+	// These errors should not occur unless there is a bug in the client library
+	// as message ordering should have been validated by subscriberOffsetTracker.
+	if ack.Offset <= at.ackedPrefixOffset {
+		return errOutOfOrderMessages
+	}
+	if elem := at.outstandingAcks.Back(); elem != nil {
+		lastOutstandingAck, _ := elem.Value.(*ackConsumer)
+		if ack.Offset <= lastOutstandingAck.Offset {
+			return errOutOfOrderMessages
+		}
+	}
+
+	at.outstandingAcks.PushBack(ack)
+	return nil
+}
+
+// CommitOffset returns the cursor offset that should be committed. May return
+// nilCursorOffset if no messages have been acked thus far.
+func (at *ackTracker) CommitOffset() int64 {
+	at.mu.Lock()
+	defer at.mu.Unlock()
+
+	// Process outstanding acks and update `ackedPrefixOffset` until an unacked
+	// message is found.
+	for {
+		elem := at.outstandingAcks.Front()
+		if elem == nil {
+			break
+		}
+		ack, _ := elem.Value.(*ackConsumer)
+		if !ack.IsAcked() {
+			break
+		}
+		at.ackedPrefixOffset = ack.Offset
+		at.outstandingAcks.Remove(elem)
+		ack.Clear()
+	}
+
+	if at.ackedPrefixOffset == nilCursorOffset {
+		return nilCursorOffset
+	}
+	// Convert from last acked to first unacked, which is the commit offset.
+	return at.ackedPrefixOffset + 1
+}
+
+// Release clears and invalidates any outstanding acks. This should be called
+// when the subscriber terminates.
+func (at *ackTracker) Release() {
+	at.mu.Lock()
+	defer at.mu.Unlock()
+
+	for elem := at.outstandingAcks.Front(); elem != nil; elem = elem.Next() {
+		ack, _ := elem.Value.(*ackConsumer)
+		ack.Clear()
+	}
+	at.outstandingAcks.Init()
+}
+
+// commitCursorTracker tracks pending and last successful committed offsets.
+// It is only accessed by the committer.
+type commitCursorTracker struct {
+	// Used to obtain the desired commit offset based on messages acked by the
+	// user.
+	acks *ackTracker
+	// Last offset for which the server confirmed (acknowledged) the commit.
+	lastConfirmedOffset int64
+	// Queue of committed offsets awaiting confirmation from the server.
+	pendingOffsets *list.List // Value = int64
+}
+
+func newCommitCursorTracker(acks *ackTracker) *commitCursorTracker {
+	return &commitCursorTracker{
+		acks:                acks,
+		lastConfirmedOffset: nilCursorOffset,
+		pendingOffsets:      list.New(),
+	}
+}
+
+func extractOffsetFromElem(elem *list.Element) int64 {
+	if elem == nil {
+		return nilCursorOffset
+	}
+	offset, _ := elem.Value.(int64)
+	return offset
+}
+
+// NextOffset is the commit offset to be sent to the stream. Returns
+// nilCursorOffset if the commit offset does not need to be updated.
+func (ct *commitCursorTracker) NextOffset() int64 {
+	desiredCommitOffset := ct.acks.CommitOffset()
+	if desiredCommitOffset <= ct.lastConfirmedOffset {
+		// The server has already confirmed the commit offset.
+		return nilCursorOffset
+	}
+	if desiredCommitOffset <= extractOffsetFromElem(ct.pendingOffsets.Back()) {
+		// The commit offset has already been sent to the commit stream and is
+		// awaiting confirmation.
+		return nilCursorOffset
+	}
+	return desiredCommitOffset
+}
+
+// AddPending adds a sent, but not yet confirmed, committed offset.
+func (ct *commitCursorTracker) AddPending(offset int64) {
+	ct.pendingOffsets.PushBack(offset)
+}
+
+// ClearPending discards old pending offsets. Should be called when the commit
+// stream reconnects, as the server acknowledgments for these would not be
+// received.
+func (ct *commitCursorTracker) ClearPending() {
+	ct.pendingOffsets.Init()
+}
+
+// ConfirmOffsets processes the server's acknowledgment of the first
+// `numConfirmed` pending offsets.
+func (ct *commitCursorTracker) ConfirmOffsets(numConfirmed int64) error {
+	if numPending := int64(ct.pendingOffsets.Len()); numPending < numConfirmed {
+		return fmt.Errorf("pubsublite: server acknowledged %d cursor commits, but only %d were sent", numConfirmed, numPending)
+	}
+
+	for i := int64(0); i < numConfirmed; i++ {
+		front := ct.pendingOffsets.Front()
+		ct.lastConfirmedOffset = extractOffsetFromElem(front)
+		ct.pendingOffsets.Remove(front)
+	}
+	return nil
+}
+
+// Done when the server has confirmed the desired commit offset.
+func (ct *commitCursorTracker) Done() bool {
+	return ct.acks.CommitOffset() <= ct.lastConfirmedOffset
+}
diff --git a/pubsublite/internal/wire/acks_test.go b/pubsublite/internal/wire/acks_test.go
new file mode 100644
index 0000000..b7204e3
--- /dev/null
+++ b/pubsublite/internal/wire/acks_test.go
@@ -0,0 +1,311 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+
+package wire
+
+import "testing"
+
+func TestAckConsumerAck(t *testing.T) {
+	numAcks := 0
+	onAck := func(ac *ackConsumer) {
+		numAcks++
+	}
+	ackConsumer := newAckConsumer(0, 0, onAck)
+
+	// Test duplicate acks.
+	for i := 0; i < 3; i++ {
+		ackConsumer.Ack()
+
+		if got, want := ackConsumer.IsAcked(), true; got != want {
+			t.Errorf("ackConsumer.IsAcked() got %v, want %v", got, want)
+		}
+		if got, want := numAcks, 1; got != want {
+			t.Errorf("onAck func called %v times, expected %v call", got, want)
+		}
+	}
+}
+
+func TestAckConsumerClear(t *testing.T) {
+	onAck := func(ac *ackConsumer) {
+		t.Error("onAck func should not have been called")
+	}
+	ackConsumer := newAckConsumer(0, 0, onAck)
+	ackConsumer.Clear()
+	ackConsumer.Ack()
+
+	if got, want := ackConsumer.IsAcked(), true; got != want {
+		t.Errorf("ackConsumer.IsAcked() got %v, want %v", got, want)
+	}
+}
+
+func TestAckTrackerProcessing(t *testing.T) {
+	ackTracker := newAckTracker()
+
+	// No messages received yet.
+	if got, want := ackTracker.CommitOffset(), nilCursorOffset; got != want {
+		t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
+	}
+
+	onAck := func(ac *ackConsumer) {
+		// Nothing to do.
+	}
+	ack1 := newAckConsumer(1, 0, onAck)
+	ack2 := newAckConsumer(2, 0, onAck)
+	ack3 := newAckConsumer(3, 0, onAck)
+	if err := ackTracker.Push(ack1); err != nil {
+		t.Errorf("ackTracker.Push() got err %v", err)
+	}
+	if err := ackTracker.Push(ack2); err != nil {
+		t.Errorf("ackTracker.Push() got err %v", err)
+	}
+	if err := ackTracker.Push(ack3); err != nil {
+		t.Errorf("ackTracker.Push() got err %v", err)
+	}
+
+	// All messages unacked.
+	if got, want := ackTracker.CommitOffset(), nilCursorOffset; got != want {
+		t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
+	}
+
+	ack1.Ack()
+	if got, want := ackTracker.CommitOffset(), int64(2); got != want {
+		t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
+	}
+
+	// Skipped ack2, so the commit offset should not have been updated.
+	ack3.Ack()
+	if got, want := ackTracker.CommitOffset(), int64(2); got != want {
+		t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
+	}
+
+	// Both ack2 and ack3 should be removed from the outstanding acks queue.
+	ack2.Ack()
+	if got, want := ackTracker.CommitOffset(), int64(4); got != want {
+		t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
+	}
+
+	// Newly received message.
+	ack4 := newAckConsumer(4, 0, onAck)
+	if err := ackTracker.Push(ack4); err != nil {
+		t.Errorf("ackTracker.Push() got err %v", err)
+	}
+	ack4.Ack()
+	if got, want := ackTracker.CommitOffset(), int64(5); got != want {
+		t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
+	}
+}
+
+func TestAckTrackerRelease(t *testing.T) {
+	ackTracker := newAckTracker()
+	onAck := func(ac *ackConsumer) {
+		t.Error("onAck should not be called")
+	}
+	ack1 := newAckConsumer(1, 0, onAck)
+	ack2 := newAckConsumer(2, 0, onAck)
+	ack3 := newAckConsumer(3, 0, onAck)
+
+	if err := ackTracker.Push(ack1); err != nil {
+		t.Errorf("ackTracker.Push() got err %v", err)
+	}
+	if err := ackTracker.Push(ack2); err != nil {
+		t.Errorf("ackTracker.Push() got err %v", err)
+	}
+	if err := ackTracker.Push(ack3); err != nil {
+		t.Errorf("ackTracker.Push() got err %v", err)
+	}
+
+	// After clearing outstanding acks, onAck should not be called.
+	ackTracker.Release()
+	ack1.Ack()
+	ack2.Ack()
+	ack3.Ack()
+}
+
+func TestCommitCursorTrackerProcessing(t *testing.T) {
+	ackTracker := newAckTracker()
+	commitTracker := newCommitCursorTracker(ackTracker)
+
+	// No messages received yet.
+	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+
+	onAck := func(ac *ackConsumer) {
+		// Nothing to do.
+	}
+	ack1 := newAckConsumer(1, 0, onAck)
+	ack2 := newAckConsumer(2, 0, onAck)
+	ack3 := newAckConsumer(3, 0, onAck)
+	if err := ackTracker.Push(ack1); err != nil {
+		t.Errorf("ackTracker.Push() got err %v", err)
+	}
+	if err := ackTracker.Push(ack2); err != nil {
+		t.Errorf("ackTracker.Push() got err %v", err)
+	}
+	if err := ackTracker.Push(ack3); err != nil {
+		t.Errorf("ackTracker.Push() got err %v", err)
+	}
+
+	// All messages unacked.
+	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+
+	// Msg1 acked and commit sent to stream.
+	ack1.Ack()
+	if got, want := commitTracker.NextOffset(), int64(2); got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+	commitTracker.AddPending(commitTracker.NextOffset())
+	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+
+	// Msg 2 & 3 acked commit and sent to stream.
+	ack2.Ack()
+	if got, want := commitTracker.NextOffset(), int64(3); got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+	ack3.Ack()
+	if got, want := commitTracker.NextOffset(), int64(4); got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+	commitTracker.AddPending(commitTracker.NextOffset())
+	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+	if got, want := commitTracker.Done(), false; got != want {
+		t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want)
+	}
+
+	// First 2 pending commits acknowledged.
+	if got, want := commitTracker.lastConfirmedOffset, nilCursorOffset; got != want {
+		t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
+	}
+	if err := commitTracker.ConfirmOffsets(2); err != nil {
+		t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err)
+	}
+	if got, want := commitTracker.lastConfirmedOffset, int64(4); got != want {
+		t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
+	}
+	if got, want := commitTracker.Done(), true; got != want {
+		t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want)
+	}
+}
+
+func TestCommitCursorTrackerStreamReconnects(t *testing.T) {
+	ackTracker := newAckTracker()
+	commitTracker := newCommitCursorTracker(ackTracker)
+
+	onAck := func(ac *ackConsumer) {
+		// Nothing to do.
+	}
+	ack1 := newAckConsumer(1, 0, onAck)
+	ack2 := newAckConsumer(2, 0, onAck)
+	ack3 := newAckConsumer(3, 0, onAck)
+	if err := ackTracker.Push(ack1); err != nil {
+		t.Errorf("ackTracker.Push() got err %v", err)
+	}
+	if err := ackTracker.Push(ack2); err != nil {
+		t.Errorf("ackTracker.Push() got err %v", err)
+	}
+	if err := ackTracker.Push(ack3); err != nil {
+		t.Errorf("ackTracker.Push() got err %v", err)
+	}
+
+	// All messages unacked.
+	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+
+	// Msg1 acked and commit sent to stream.
+	ack1.Ack()
+	if got, want := commitTracker.NextOffset(), int64(2); got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+	commitTracker.AddPending(commitTracker.NextOffset())
+	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+
+	// Msg2 acked and commit sent to stream.
+	ack2.Ack()
+	if got, want := commitTracker.NextOffset(), int64(3); got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+	commitTracker.AddPending(commitTracker.NextOffset())
+	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+
+	// Stream breaks and pending offsets are cleared.
+	commitTracker.ClearPending()
+	if got, want := commitTracker.Done(), false; got != want {
+		t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want)
+	}
+	// When the stream reconnects the next offset should be 3 (offset 2 skipped).
+	if got, want := commitTracker.NextOffset(), int64(3); got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+	commitTracker.AddPending(commitTracker.NextOffset())
+	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+
+	// Msg2 acked and commit sent to stream.
+	ack3.Ack()
+	if got, want := commitTracker.NextOffset(), int64(4); got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+	commitTracker.AddPending(commitTracker.NextOffset())
+	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+
+	// Only 1 pending commit confirmed.
+	if got, want := commitTracker.lastConfirmedOffset, nilCursorOffset; got != want {
+		t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
+	}
+	if err := commitTracker.ConfirmOffsets(1); err != nil {
+		t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err)
+	}
+	if got, want := commitTracker.lastConfirmedOffset, int64(3); got != want {
+		t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
+	}
+	if got, want := commitTracker.Done(), false; got != want {
+		t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want)
+	}
+
+	// Final pending commit confirmed.
+	if err := commitTracker.ConfirmOffsets(1); err != nil {
+		t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err)
+	}
+	if got, want := commitTracker.lastConfirmedOffset, int64(4); got != want {
+		t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
+	}
+	if got, want := commitTracker.Done(), true; got != want {
+		t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want)
+	}
+
+	// Note: Done() returns true even though there are unacked messages.
+	ack4 := newAckConsumer(4, 0, onAck)
+	if err := ackTracker.Push(ack4); err != nil {
+		t.Errorf("ackTracker.Push() got err %v", err)
+	}
+	if got, want := commitTracker.Done(), true; got != want {
+		t.Errorf("commitCursorTracker.Done() got %v, want %v", got, want)
+	}
+	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
+		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
+	}
+}