feat(pubsublite): Receive settings (#3195)

These will be used by subscribers.
diff --git a/pubsublite/internal/wire/settings.go b/pubsublite/internal/wire/settings.go
index 64a6665..3486729 100644
--- a/pubsublite/internal/wire/settings.go
+++ b/pubsublite/internal/wire/settings.go
@@ -29,11 +29,13 @@
 	MaxPublishMessageBytes = 1000000
 
 	// MaxPublishRequestBytes is the maximum allowed serialized size of a single
-	// publish request (containing a batch of messages) in bytes.
+	// publish request (containing a batch of messages) in bytes. Must be lower
+	// than the gRPC limit of 4 MiB.
 	MaxPublishRequestBytes = 3500000
 )
 
-// PublishSettings control the batching of published messages.
+// PublishSettings control the batching of published messages. These settings
+// apply per partition.
 type PublishSettings struct {
 	// Publish a non-empty batch after this delay has passed. Must be > 0.
 	DelayThreshold time.Duration
@@ -71,7 +73,7 @@
 	DelayThreshold: 10 * time.Millisecond,
 	CountThreshold: 100,
 	ByteThreshold:  1e6,
-	Timeout:        60 * time.Second,
+	Timeout:        10 * time.Minute,
 	// By default set to a high limit that is not likely to occur, but prevents
 	// OOM errors in clients.
 	BufferedByteLimit: 1 << 30, // 1 GiB
@@ -101,3 +103,61 @@
 	}
 	return nil
 }
+
+// ReceiveSettings control the receiving of messages. These settings apply
+// per partition.
+type ReceiveSettings struct {
+	// MaxOutstandingMessages is the maximum number of unacknowledged messages.
+	// Must be > 0.
+	MaxOutstandingMessages int
+
+	// MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged
+	// messages. Must be > 0.
+	MaxOutstandingBytes int
+
+	// The maximum time that the client will attempt to establish a subscribe
+	// stream connection to the server. Must be > 0.
+	//
+	// The timeout is exceeded, the subscriber will terminate with the last error
+	// that occurred while trying to reconnect.
+	Timeout time.Duration
+
+	// The topic partition numbers (zero-indexed) to receive messages from.
+	// Values must be less than the number of partitions for the topic. If not
+	// specified, the client will use the partition assignment service to
+	// determine which partitions it should connect to.
+	Partitions []int
+}
+
+// DefaultReceiveSettings holds the default values for ReceiveSettings.
+var DefaultReceiveSettings = ReceiveSettings{
+	MaxOutstandingMessages: 1000,
+	MaxOutstandingBytes:    1e9,
+	Timeout:                10 * time.Minute,
+}
+
+func validateReceiveSettings(settings ReceiveSettings) error {
+	if settings.MaxOutstandingMessages <= 0 {
+		return errors.New("pubsublite: invalid receive settings. MaxOutstandingMessages must be > 0")
+	}
+	if settings.MaxOutstandingBytes <= 0 {
+		return errors.New("pubsublite: invalid receive settings. MaxOutstandingBytes must be > 0")
+	}
+	if settings.Timeout <= 0 {
+		return errors.New("pubsublite: invalid receive settings. Timeout duration must be > 0")
+	}
+	if len(settings.Partitions) > 0 {
+		var void struct{}
+		partitionMap := make(map[int]struct{})
+		for _, p := range settings.Partitions {
+			if p < 0 {
+				return fmt.Errorf("pubsublite: invalid partition number %d in receive settings. Partition numbers are zero-indexed", p)
+			}
+			if _, exists := partitionMap[p]; exists {
+				return fmt.Errorf("pubsublite: invalid receive settings. Duplicate partition number %d", p)
+			}
+			partitionMap[p] = void
+		}
+	}
+	return nil
+}
diff --git a/pubsublite/internal/wire/settings_test.go b/pubsublite/internal/wire/settings_test.go
index 175ca6d..400e79c 100644
--- a/pubsublite/internal/wire/settings_test.go
+++ b/pubsublite/internal/wire/settings_test.go
@@ -21,90 +21,81 @@
 func TestValidatePublishSettings(t *testing.T) {
 	for _, tc := range []struct {
 		desc string
-		// settingsFunc is passed DefaultPublishSettings
-		settingsFunc func(settings PublishSettings) PublishSettings
-		wantErr      bool
+		// mutateSettings is passed a copy of DefaultPublishSettings to mutate.
+		mutateSettings func(settings *PublishSettings)
+		wantErr        bool
 	}{
 		{
-			desc: "valid: default",
-			settingsFunc: func(settings PublishSettings) PublishSettings {
-				return DefaultPublishSettings
-			},
-			wantErr: false,
+			desc:           "valid: default",
+			mutateSettings: func(settings *PublishSettings) {},
+			wantErr:        false,
 		},
 		{
 			desc: "valid: max",
-			settingsFunc: func(settings PublishSettings) PublishSettings {
-				return PublishSettings{
-					CountThreshold: MaxPublishRequestCount,
-					ByteThreshold:  MaxPublishRequestBytes,
-					// These have no max bounds, check large values.
-					DelayThreshold:    24 * time.Hour,
-					Timeout:           24 * time.Hour,
-					BufferedByteLimit: 1e10,
-				}
+			mutateSettings: func(settings *PublishSettings) {
+				settings.CountThreshold = MaxPublishRequestCount
+				settings.ByteThreshold = MaxPublishRequestBytes
+				// These have no max bounds, check large values.
+				settings.DelayThreshold = 24 * time.Hour
+				settings.Timeout = 24 * time.Hour
+				settings.BufferedByteLimit = 1e10
 			},
 			wantErr: false,
 		},
 		{
 			desc: "invalid: zero CountThreshold",
-			settingsFunc: func(settings PublishSettings) PublishSettings {
+			mutateSettings: func(settings *PublishSettings) {
 				settings.CountThreshold = 0
-				return settings
 			},
 			wantErr: true,
 		},
 		{
 			desc: "invalid: CountThreshold over MaxPublishRequestCount",
-			settingsFunc: func(settings PublishSettings) PublishSettings {
+			mutateSettings: func(settings *PublishSettings) {
 				settings.CountThreshold = MaxPublishRequestCount + 1
-				return settings
 			},
 			wantErr: true,
 		},
 		{
 			desc: "invalid: ByteThreshold over MaxPublishRequestBytes",
-			settingsFunc: func(settings PublishSettings) PublishSettings {
+			mutateSettings: func(settings *PublishSettings) {
 				settings.ByteThreshold = MaxPublishRequestBytes + 1
-				return settings
 			},
 			wantErr: true,
 		},
 		{
 			desc: "invalid: zero ByteThreshold",
-			settingsFunc: func(settings PublishSettings) PublishSettings {
+			mutateSettings: func(settings *PublishSettings) {
 				settings.ByteThreshold = 0
-				return settings
 			},
 			wantErr: true,
 		},
 		{
 			desc: "invalid: zero DelayThreshold",
-			settingsFunc: func(settings PublishSettings) PublishSettings {
+			mutateSettings: func(settings *PublishSettings) {
 				settings.DelayThreshold = time.Duration(0)
-				return settings
 			},
 			wantErr: true,
 		},
 		{
 			desc: "invalid: zero Timeout",
-			settingsFunc: func(settings PublishSettings) PublishSettings {
+			mutateSettings: func(settings *PublishSettings) {
 				settings.Timeout = time.Duration(0)
-				return settings
 			},
 			wantErr: true,
 		},
 		{
 			desc: "invalid: zero BufferedByteLimit",
-			settingsFunc: func(settings PublishSettings) PublishSettings {
+			mutateSettings: func(settings *PublishSettings) {
 				settings.BufferedByteLimit = 0
-				return settings
 			},
 			wantErr: true,
 		},
 	} {
 		t.Run(tc.desc, func(t *testing.T) {
-			settings := tc.settingsFunc(DefaultPublishSettings)
+			settings := DefaultPublishSettings
+			tc.mutateSettings(&settings)
+
 			gotErr := validatePublishSettings(settings)
 			if (gotErr != nil) != tc.wantErr {
 				t.Errorf("validatePublishSettings(%v) = %v, want err=%v", settings, gotErr, tc.wantErr)
@@ -112,3 +103,67 @@
 		})
 	}
 }
+
+func TestValidateReceiveSettings(t *testing.T) {
+	for _, tc := range []struct {
+		desc string
+		// mutateSettings is passed a copy of DefaultReceiveSettings to mutate.
+		mutateSettings func(settings *ReceiveSettings)
+		wantErr        bool
+	}{
+		{
+			desc:           "valid: default",
+			mutateSettings: func(settings *ReceiveSettings) {},
+			wantErr:        false,
+		},
+		{
+			desc: "valid: max",
+			mutateSettings: func(settings *ReceiveSettings) {
+				settings.Partitions = []int{5, 3, 9, 1, 4, 0}
+				// These have no max bounds, check large values.
+				settings.MaxOutstandingMessages = 100000
+				settings.MaxOutstandingBytes = 1e10
+				settings.Timeout = 24 * time.Hour
+			},
+			wantErr: false,
+		},
+		{
+			desc: "invalid: zero MaxOutstandingMessages",
+			mutateSettings: func(settings *ReceiveSettings) {
+				settings.MaxOutstandingMessages = 0
+			},
+			wantErr: true,
+		},
+		{
+			desc: "invalid: zero MaxOutstandingBytes",
+			mutateSettings: func(settings *ReceiveSettings) {
+				settings.MaxOutstandingBytes = 0
+			},
+			wantErr: true,
+		},
+		{
+			desc: "invalid: negative partition",
+			mutateSettings: func(settings *ReceiveSettings) {
+				settings.Partitions = []int{0, -1}
+			},
+			wantErr: true,
+		},
+		{
+			desc: "invalid: duplicate partition",
+			mutateSettings: func(settings *ReceiveSettings) {
+				settings.Partitions = []int{0, 1, 2, 3, 4, 1}
+			},
+			wantErr: true,
+		},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			settings := DefaultReceiveSettings
+			tc.mutateSettings(&settings)
+
+			gotErr := validateReceiveSettings(settings)
+			if (gotErr != nil) != tc.wantErr {
+				t.Errorf("validateReceiveSettings(%v) = %v, want err=%v", settings, gotErr, tc.wantErr)
+			}
+		})
+	}
+}