feat(pubsublite): Message type and message routers (#3077)
pubsublite.Message is similar to pubsub.Message, with the following differences:
- Attributes can have multiple values for the same key.
- Pub/Sub Lite uses []byte for data, attribute values and ordering keys.
Message routers select a partition to route a published message to. SHA256 hash is used for routing messages with ordering keys. Round robin is used for routing messages without ordering keys.
diff --git a/pubsublite/message.go b/pubsublite/message.go
new file mode 100644
index 0000000..79e6a71
--- /dev/null
+++ b/pubsublite/message.go
@@ -0,0 +1,153 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and limitations under the License.
+
+package pubsublite
+
+import (
+ "crypto/sha256"
+ "fmt"
+ "math/big"
+ "math/rand"
+ "time"
+
+ "github.com/golang/protobuf/ptypes"
+
+ pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
+)
+
+// AttributeValues is a slice of byte slices, one per value of an attribute key.
+type AttributeValues [][]byte
+
+// Message represents a Pub/Sub Lite message.
+type Message struct {
+ // Data is the payload of the message.
+ Data []byte
+
+ // Attributes can be used to label the message. A key may have multiple
+ // values.
+ Attributes map[string]AttributeValues
+
+ // EventTime is an optional, user-specified event time. The zero time means unset.
+ EventTime time.Time
+
+ // OrderingKey identifies related messages for which publish order should
+ // be respected. Messages with the same ordering key are published to the
+ // same topic partition and subscribers will receive the messages in order.
+ // If the ordering key is empty, the message will be sent to an arbitrary
+ // partition.
+ OrderingKey []byte
+}
+
+func (m *Message) toProto() (*pb.PubSubMessage, error) { // converts m to a pb.PubSubMessage
+ msgpb := &pb.PubSubMessage{
+ Data: m.Data,
+ Key: m.OrderingKey, // the ordering key is carried in the proto's Key field
+ }
+ // Leave Attributes unset in the proto when the message has none.
+ if len(m.Attributes) > 0 {
+ msgpb.Attributes = make(map[string]*pb.AttributeValues)
+ for key, values := range m.Attributes {
+ msgpb.Attributes[key] = &pb.AttributeValues{Values: values}
+ }
+ }
+ // A zero EventTime is treated as unset and is not converted.
+ if !m.EventTime.IsZero() {
+ ts, err := ptypes.TimestampProto(m.EventTime)
+ if err != nil {
+ return nil, fmt.Errorf("pubsublite: error converting message timestamp: %v", err)
+ }
+ msgpb.EventTime = ts
+ }
+ return msgpb, nil
+}
+
+// messageRouter outputs a partition number, given an ordering key. Results are
+// undefined when:
+// - SetPartitionCount() is called with count <= 0.
+// - Route() is called before SetPartitionCount() to initialize the router.
+//
+// Message routers need to accommodate topic partition resizing.
+type messageRouter interface {
+ SetPartitionCount(count int)
+ Route(orderingKey []byte) int
+}
+
+// roundRobinMsgRouter sequentially cycles through partition numbers, starting
+// from a random partition.
+type roundRobinMsgRouter struct {
+ rng *rand.Rand // source of the random starting partition
+ partitionCount int // number of partitions to cycle through
+ nextPartition int // partition that the next Route call returns
+}
+
+func (r *roundRobinMsgRouter) SetPartitionCount(count int) { // restarts the cycle for count partitions
+ r.partitionCount = count
+ r.nextPartition = int(r.rng.Int63n(int64(count))) // random start in [0, count); Int63n panics if count <= 0
+}
+
+func (r *roundRobinMsgRouter) Route(orderingKey []byte) (partition int) { // orderingKey is not used
+ partition = r.nextPartition
+ r.nextPartition = (partition + 1) % r.partitionCount // wrap around after the last partition
+ return
+}
+
+// hashingMsgRouter hashes an ordering key using SHA256 to obtain a partition
+// number. It should only be used for messages with an ordering key.
+//
+// Matches implementation at:
+// https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/DefaultRoutingPolicy.java
+type hashingMsgRouter struct {
+ partitionCount *big.Int // modulus applied to the hashed key; nil until SetPartitionCount is called
+}
+
+func (r *hashingMsgRouter) SetPartitionCount(count int) { // stores count as the hash modulus
+ r.partitionCount = big.NewInt(int64(count))
+}
+
+func (r *hashingMsgRouter) Route(orderingKey []byte) int { // maps the SHA256 of orderingKey to a partition
+ if len(orderingKey) == 0 {
+ return -1 // no key to hash; -1 is outside the [0, partitionCount) range
+ }
+ h := sha256.Sum256(orderingKey)
+ num := new(big.Int).SetBytes(h[:]) // SetBytes reads the digest as a big-endian unsigned integer
+ partition := new(big.Int).Mod(num, r.partitionCount)
+ return int(partition.Int64())
+}
+
+// compositeMsgRouter delegates to different message routers for messages
+// with/without ordering keys.
+type compositeMsgRouter struct {
+ keyedRouter messageRouter // routes messages with a non-empty ordering key
+ keylessRouter messageRouter // routes messages with a nil or empty ordering key
+}
+
+func (r *compositeMsgRouter) SetPartitionCount(count int) { // forwards count to both routers
+ r.keyedRouter.SetPartitionCount(count)
+ r.keylessRouter.SetPartitionCount(count)
+}
+
+func (r *compositeMsgRouter) Route(orderingKey []byte) int { // picks a router by key presence
+ if len(orderingKey) > 0 {
+ return r.keyedRouter.Route(orderingKey)
+ }
+ return r.keylessRouter.Route(orderingKey) // nil and empty keys both land here
+}
+
+// newDefaultMessageRouter returns a compositeMsgRouter that uses hashingMsgRouter
+// for messages with ordering key and roundRobinMsgRouter for messages without.
+func newDefaultMessageRouter(rng *rand.Rand) messageRouter {
+ return &compositeMsgRouter{
+ keyedRouter: &hashingMsgRouter{},
+ keylessRouter: &roundRobinMsgRouter{rng: rng}, // rng picks the round-robin starting partition
+ }
+}
diff --git a/pubsublite/message_test.go b/pubsublite/message_test.go
new file mode 100644
index 0000000..1d5b3a9
--- /dev/null
+++ b/pubsublite/message_test.go
@@ -0,0 +1,215 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and limitations under the License.
+
+package pubsublite
+
+import (
+ "fmt"
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ tspb "github.com/golang/protobuf/ptypes/timestamp"
+ pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
+)
+
+type fakeSource struct { // fake random source that always yields ret
+ ret int64
+}
+
+func (f *fakeSource) Int63() int64 { return f.ret } // constant, test-controlled value
+func (f *fakeSource) Seed(seed int64) {} // no-op: the fake has no seed state
+
+type fakeMsgRouter struct { // fake messageRouter with a predictable Route result
+ multiplier int
+ partitionCount int
+}
+
+func (f *fakeMsgRouter) SetPartitionCount(count int) { // records count for use by Route
+ f.partitionCount = count
+}
+
+func (f *fakeMsgRouter) Route(orderingKey []byte) int { // orderingKey is not used
+ return f.partitionCount * f.multiplier // reveals which fake ran and the count it was given
+}
+
+func TestMessageToProto(t *testing.T) { // table-driven check of Message.toProto
+ for _, tc := range []struct {
+ desc string
+ msg *Message
+ want *pb.PubSubMessage
+ }{
+ {
+ desc: "valid: minimal", // only Data is set
+ msg: &Message{
+ Data: []byte("Hello world"),
+ },
+ want: &pb.PubSubMessage{
+ Data: []byte("Hello world"),
+ },
+ },
+ {
+ desc: "valid: filled", // every Message field is populated
+ msg: &Message{
+ Data: []byte("foo"),
+ Attributes: map[string]AttributeValues{
+ "attr1": [][]byte{
+ []byte("val1"),
+ []byte("val2"),
+ },
+ },
+ EventTime: time.Unix(1555593697, 154358*1000),
+ OrderingKey: []byte("order"),
+ },
+ want: &pb.PubSubMessage{
+ Data: []byte("foo"),
+ Attributes: map[string]*pb.AttributeValues{
+ "attr1": {
+ Values: [][]byte{
+ []byte("val1"),
+ []byte("val2"),
+ },
+ },
+ },
+ EventTime: &tspb.Timestamp{
+ Seconds: 1555593697,
+ Nanos: 154358 * 1000,
+ },
+ Key: []byte("order"), // OrderingKey is expected in the Key field
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ got, err := tc.msg.toProto()
+ if err != nil {
+ t.Errorf("toProto() err = %v", err)
+ } else if !proto.Equal(got, tc.want) {
+ t.Errorf("toProto() got = %v\nwant = %v", got, tc.want)
+ }
+ })
+ }
+}
+
+func TestRoundRobinMsgRouter(t *testing.T) {
+ // Using the same msgRouter for each test run ensures that it reinitializes
+ // when the partition count changes.
+ source := &fakeSource{}
+ msgRouter := &roundRobinMsgRouter{rng: rand.New(source)}
+
+ for _, tc := range []struct {
+ partitionCount int
+ source int64 // value the fake random source returns
+ want []int // expected partitions for consecutive Route calls
+ }{
+ {
+ partitionCount: 8,
+ source: 9,
+ want: []int{1, 2, 3, 4, 5, 6, 7, 0, 1}, // starts at 1, wraps from 7 back to 0
+ },
+ {
+ partitionCount: 5,
+ source: 2,
+ want: []int{2, 3, 4, 0, 1, 2}, // starts at 2, wraps from 4 back to 0
+ },
+ } {
+ t.Run(fmt.Sprintf("partitionCount=%d", tc.partitionCount), func(t *testing.T) {
+ source.ret = tc.source // set before SetPartitionCount draws the starting partition
+ msgRouter.SetPartitionCount(tc.partitionCount)
+ for i, want := range tc.want {
+ got := msgRouter.Route([]byte("IGNORED"))
+ if got != want {
+ t.Errorf("i=%d: Route() = %d, want = %d", i, got, want)
+ }
+ }
+ })
+ }
+}
+
+func TestHashingMsgRouter(t *testing.T) {
+ // Using the same msgRouter for each test run ensures that it reinitializes
+ // when the partition count changes.
+ msgRouter := &hashingMsgRouter{}
+
+ keys := [][]byte{
+ []byte("foo1"),
+ []byte("foo2"),
+ []byte("foo3"),
+ []byte("foo4"),
+ []byte("foo5"),
+ }
+
+ for _, tc := range []struct {
+ partitionCount int
+ }{
+ {partitionCount: 10},
+ {partitionCount: 5},
+ } {
+ t.Run(fmt.Sprintf("partitionCount=%d", tc.partitionCount), func(t *testing.T) {
+ msgRouter.SetPartitionCount(tc.partitionCount)
+ for _, key := range keys {
+ p1 := msgRouter.Route(key)
+ p2 := msgRouter.Route(key)
+ if p1 != p2 { // the same key must always map to the same partition
+ t.Errorf("Route() returned different partitions for same key %v", key)
+ }
+ if p1 < 0 || p1 >= tc.partitionCount { // valid partitions are [0, partitionCount)
+ t.Errorf("Route() returned partition out of range: %v", p1)
+ }
+ }
+ })
+ }
+}
+
+func TestCompositeMsgRouter(t *testing.T) {
+ keyedRouter := &fakeMsgRouter{multiplier: 10} // Route returns 10 * partitionCount
+ keylessRouter := &fakeMsgRouter{multiplier: 100} // Route returns 100 * partitionCount
+ msgRouter := &compositeMsgRouter{
+ keyedRouter: keyedRouter,
+ keylessRouter: keylessRouter,
+ }
+
+ for _, tc := range []struct {
+ desc string
+ partitionCount int
+ key []byte
+ want int
+ }{
+ {
+ desc: "key",
+ partitionCount: 2,
+ key: []byte("foo"),
+ want: 20, // 2 * 10: handled by keyedRouter
+ },
+ {
+ desc: "nil key",
+ partitionCount: 8,
+ key: nil,
+ want: 800, // 8 * 100: handled by keylessRouter
+ },
+ {
+ desc: "empty key",
+ partitionCount: 5,
+ key: []byte{},
+ want: 500, // 5 * 100: handled by keylessRouter
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ msgRouter.SetPartitionCount(tc.partitionCount)
+ if got := msgRouter.Route(tc.key); got != tc.want {
+ t.Errorf("Route() = %d, want = %d", got, tc.want)
+ }
+ })
+ }
+}