feat(pubsublite): Receive settings (#3195)
These will be used by subscribers.
diff --git a/pubsublite/internal/wire/settings.go b/pubsublite/internal/wire/settings.go
index 64a6665..3486729 100644
--- a/pubsublite/internal/wire/settings.go
+++ b/pubsublite/internal/wire/settings.go
@@ -29,11 +29,13 @@
MaxPublishMessageBytes = 1000000
// MaxPublishRequestBytes is the maximum allowed serialized size of a single
- // publish request (containing a batch of messages) in bytes.
+ // publish request (containing a batch of messages) in bytes. Must be lower
+ // than the gRPC limit of 4 MiB.
MaxPublishRequestBytes = 3500000
)
-// PublishSettings control the batching of published messages.
+// PublishSettings control the batching of published messages. These settings
+// apply per partition.
type PublishSettings struct {
// Publish a non-empty batch after this delay has passed. Must be > 0.
DelayThreshold time.Duration
@@ -71,7 +73,7 @@
DelayThreshold: 10 * time.Millisecond,
CountThreshold: 100,
ByteThreshold: 1e6,
- Timeout: 60 * time.Second,
+ Timeout: 10 * time.Minute,
// By default set to a high limit that is not likely to occur, but prevents
// OOM errors in clients.
BufferedByteLimit: 1 << 30, // 1 GiB
@@ -101,3 +103,61 @@
}
return nil
}
+
+// ReceiveSettings control the receiving of messages. These settings apply
+// per partition.
+type ReceiveSettings struct {
+ // MaxOutstandingMessages is the maximum number of unacknowledged messages.
+ // Must be > 0.
+ MaxOutstandingMessages int
+
+ // MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged
+ // messages. Must be > 0.
+ MaxOutstandingBytes int
+
+ // The maximum time that the client will attempt to establish a subscribe
+ // stream connection to the server. Must be > 0.
+ //
+ // The timeout is exceeded, the subscriber will terminate with the last error
+ // that occurred while trying to reconnect.
+ Timeout time.Duration
+
+ // The topic partition numbers (zero-indexed) to receive messages from.
+ // Values must be less than the number of partitions for the topic. If not
+ // specified, the client will use the partition assignment service to
+ // determine which partitions it should connect to.
+ Partitions []int
+}
+
+// DefaultReceiveSettings holds the default values for ReceiveSettings.
+var DefaultReceiveSettings = ReceiveSettings{
+ MaxOutstandingMessages: 1000,
+ MaxOutstandingBytes: 1e9,
+ Timeout: 10 * time.Minute,
+}
+
+func validateReceiveSettings(settings ReceiveSettings) error {
+ if settings.MaxOutstandingMessages <= 0 {
+ return errors.New("pubsublite: invalid receive settings. MaxOutstandingMessages must be > 0")
+ }
+ if settings.MaxOutstandingBytes <= 0 {
+ return errors.New("pubsublite: invalid receive settings. MaxOutstandingBytes must be > 0")
+ }
+ if settings.Timeout <= 0 {
+ return errors.New("pubsublite: invalid receive settings. Timeout duration must be > 0")
+ }
+ if len(settings.Partitions) > 0 {
+ var void struct{}
+ partitionMap := make(map[int]struct{})
+ for _, p := range settings.Partitions {
+ if p < 0 {
+ return fmt.Errorf("pubsublite: invalid partition number %d in receive settings. Partition numbers are zero-indexed", p)
+ }
+ if _, exists := partitionMap[p]; exists {
+ return fmt.Errorf("pubsublite: invalid receive settings. Duplicate partition number %d", p)
+ }
+ partitionMap[p] = void
+ }
+ }
+ return nil
+}
diff --git a/pubsublite/internal/wire/settings_test.go b/pubsublite/internal/wire/settings_test.go
index 175ca6d..400e79c 100644
--- a/pubsublite/internal/wire/settings_test.go
+++ b/pubsublite/internal/wire/settings_test.go
@@ -21,90 +21,81 @@
func TestValidatePublishSettings(t *testing.T) {
for _, tc := range []struct {
desc string
- // settingsFunc is passed DefaultPublishSettings
- settingsFunc func(settings PublishSettings) PublishSettings
- wantErr bool
+ // mutateSettings is passed a copy of DefaultPublishSettings to mutate.
+ mutateSettings func(settings *PublishSettings)
+ wantErr bool
}{
{
- desc: "valid: default",
- settingsFunc: func(settings PublishSettings) PublishSettings {
- return DefaultPublishSettings
- },
- wantErr: false,
+ desc: "valid: default",
+ mutateSettings: func(settings *PublishSettings) {},
+ wantErr: false,
},
{
desc: "valid: max",
- settingsFunc: func(settings PublishSettings) PublishSettings {
- return PublishSettings{
- CountThreshold: MaxPublishRequestCount,
- ByteThreshold: MaxPublishRequestBytes,
- // These have no max bounds, check large values.
- DelayThreshold: 24 * time.Hour,
- Timeout: 24 * time.Hour,
- BufferedByteLimit: 1e10,
- }
+ mutateSettings: func(settings *PublishSettings) {
+ settings.CountThreshold = MaxPublishRequestCount
+ settings.ByteThreshold = MaxPublishRequestBytes
+ // These have no max bounds, check large values.
+ settings.DelayThreshold = 24 * time.Hour
+ settings.Timeout = 24 * time.Hour
+ settings.BufferedByteLimit = 1e10
},
wantErr: false,
},
{
desc: "invalid: zero CountThreshold",
- settingsFunc: func(settings PublishSettings) PublishSettings {
+ mutateSettings: func(settings *PublishSettings) {
settings.CountThreshold = 0
- return settings
},
wantErr: true,
},
{
desc: "invalid: CountThreshold over MaxPublishRequestCount",
- settingsFunc: func(settings PublishSettings) PublishSettings {
+ mutateSettings: func(settings *PublishSettings) {
settings.CountThreshold = MaxPublishRequestCount + 1
- return settings
},
wantErr: true,
},
{
desc: "invalid: ByteThreshold over MaxPublishRequestBytes",
- settingsFunc: func(settings PublishSettings) PublishSettings {
+ mutateSettings: func(settings *PublishSettings) {
settings.ByteThreshold = MaxPublishRequestBytes + 1
- return settings
},
wantErr: true,
},
{
desc: "invalid: zero ByteThreshold",
- settingsFunc: func(settings PublishSettings) PublishSettings {
+ mutateSettings: func(settings *PublishSettings) {
settings.ByteThreshold = 0
- return settings
},
wantErr: true,
},
{
desc: "invalid: zero DelayThreshold",
- settingsFunc: func(settings PublishSettings) PublishSettings {
+ mutateSettings: func(settings *PublishSettings) {
settings.DelayThreshold = time.Duration(0)
- return settings
},
wantErr: true,
},
{
desc: "invalid: zero Timeout",
- settingsFunc: func(settings PublishSettings) PublishSettings {
+ mutateSettings: func(settings *PublishSettings) {
settings.Timeout = time.Duration(0)
- return settings
},
wantErr: true,
},
{
desc: "invalid: zero BufferedByteLimit",
- settingsFunc: func(settings PublishSettings) PublishSettings {
+ mutateSettings: func(settings *PublishSettings) {
settings.BufferedByteLimit = 0
- return settings
},
wantErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
- settings := tc.settingsFunc(DefaultPublishSettings)
+ settings := DefaultPublishSettings
+ tc.mutateSettings(&settings)
+
gotErr := validatePublishSettings(settings)
if (gotErr != nil) != tc.wantErr {
t.Errorf("validatePublishSettings(%v) = %v, want err=%v", settings, gotErr, tc.wantErr)
@@ -112,3 +103,67 @@
})
}
}
+
+func TestValidateReceiveSettings(t *testing.T) {
+ for _, tc := range []struct {
+ desc string
+ // mutateSettings is passed a copy of DefaultReceiveSettings to mutate.
+ mutateSettings func(settings *ReceiveSettings)
+ wantErr bool
+ }{
+ {
+ desc: "valid: default",
+ mutateSettings: func(settings *ReceiveSettings) {},
+ wantErr: false,
+ },
+ {
+ desc: "valid: max",
+ mutateSettings: func(settings *ReceiveSettings) {
+ settings.Partitions = []int{5, 3, 9, 1, 4, 0}
+ // These have no max bounds, check large values.
+ settings.MaxOutstandingMessages = 100000
+ settings.MaxOutstandingBytes = 1e10
+ settings.Timeout = 24 * time.Hour
+ },
+ wantErr: false,
+ },
+ {
+ desc: "invalid: zero MaxOutstandingMessages",
+ mutateSettings: func(settings *ReceiveSettings) {
+ settings.MaxOutstandingMessages = 0
+ },
+ wantErr: true,
+ },
+ {
+ desc: "invalid: zero MaxOutstandingBytes",
+ mutateSettings: func(settings *ReceiveSettings) {
+ settings.MaxOutstandingBytes = 0
+ },
+ wantErr: true,
+ },
+ {
+ desc: "invalid: negative partition",
+ mutateSettings: func(settings *ReceiveSettings) {
+ settings.Partitions = []int{0, -1}
+ },
+ wantErr: true,
+ },
+ {
+ desc: "invalid: duplicate partition",
+ mutateSettings: func(settings *ReceiveSettings) {
+ settings.Partitions = []int{0, 1, 2, 3, 4, 1}
+ },
+ wantErr: true,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ settings := DefaultReceiveSettings
+ tc.mutateSettings(&settings)
+
+ gotErr := validateReceiveSettings(settings)
+ if (gotErr != nil) != tc.wantErr {
+ t.Errorf("validateReceiveSettings(%v) = %v, want err=%v", settings, gotErr, tc.wantErr)
+ }
+ })
+ }
+}