feat(pubsublite): Message type and message routers (#3077)

pubsublite.Message is similar to pubsub.Message, with the following differences:
- Attributes can have multiple values for the same key.
- Pub/Sub Lite uses []byte for data, attribute values and ordering keys.

Message routers select a partition to route a published message to. SHA256 hash is used for routing messages with ordering keys. Round robin is used for routing messages without ordering keys.
diff --git a/pubsublite/message.go b/pubsublite/message.go
new file mode 100644
index 0000000..79e6a71
--- /dev/null
+++ b/pubsublite/message.go
@@ -0,0 +1,153 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and limitations under the License.
+
+package pubsublite
+
+import (
+	"crypto/sha256"
+	"fmt"
+	"math/big"
+	"math/rand"
+	"time"
+
+	"github.com/golang/protobuf/ptypes"
+
+	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
+)
+
+// AttributeValues is a slice of byte slices: the values for one attribute key.
+type AttributeValues [][]byte
+
+// Message represents a Pub/Sub Lite message.
+type Message struct {
+	// Data is the actual data in the message.
+	Data []byte
+
+	// Attributes can be used to label the message. A key may have multiple
+	// values.
+	Attributes map[string]AttributeValues
+
+	// EventTime is an optional, user-specified event time for this message.
+	EventTime time.Time
+
+	// OrderingKey identifies related messages for which publish order should
+	// be respected. Messages with the same ordering key are published to the
+	// same topic partition and subscribers will receive the messages in order.
+	// If the ordering key is empty, the message will be sent to an arbitrary
+	// partition.
+	OrderingKey []byte
+}
+
+func (m *Message) toProto() (*pb.PubSubMessage, error) {
+	msgpb := &pb.PubSubMessage{
+		Data: m.Data,
+		Key:  m.OrderingKey,
+	}
+
+	if len(m.Attributes) > 0 {
+		msgpb.Attributes = make(map[string]*pb.AttributeValues)
+		for key, values := range m.Attributes {
+			msgpb.Attributes[key] = &pb.AttributeValues{Values: values}
+		}
+	}
+
+	if !m.EventTime.IsZero() {
+		ts, err := ptypes.TimestampProto(m.EventTime)
+		if err != nil {
+			return nil, fmt.Errorf("pubsublite: error converting message timestamp: %v", err)
+		}
+		msgpb.EventTime = ts
+	}
+	return msgpb, nil
+}
+
+// messageRouter outputs a partition number, given an ordering key. Results are
+// undefined when:
+// - SetPartitionCount() is called with count <= 0.
+// - Route() is called before SetPartitionCount() to initialize the router.
+//
+// Message routers need to accommodate topic partition resizing.
+type messageRouter interface {
+	SetPartitionCount(count int)
+	Route(orderingKey []byte) int
+}
+
+// roundRobinMsgRouter sequentially cycles through partition numbers, starting
+// from a random partition.
+type roundRobinMsgRouter struct {
+	rng            *rand.Rand
+	partitionCount int
+	nextPartition  int
+}
+
+func (r *roundRobinMsgRouter) SetPartitionCount(count int) {
+	r.partitionCount = count
+	r.nextPartition = int(r.rng.Int63n(int64(count)))
+}
+
+func (r *roundRobinMsgRouter) Route(orderingKey []byte) (partition int) {
+	partition = r.nextPartition
+	r.nextPartition = (partition + 1) % r.partitionCount
+	return
+}
+
+// hashingMsgRouter maps an ordering key to a partition using its SHA256 hash.
+// Use it only for messages with an ordering key; Route returns -1 for an empty key.
+//
+// Matches implementation at:
+// https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/DefaultRoutingPolicy.java
+type hashingMsgRouter struct {
+	partitionCount *big.Int
+}
+
+func (r *hashingMsgRouter) SetPartitionCount(count int) {
+	r.partitionCount = big.NewInt(int64(count))
+}
+
+func (r *hashingMsgRouter) Route(orderingKey []byte) int {
+	if len(orderingKey) == 0 {
+		return -1
+	}
+	h := sha256.Sum256(orderingKey)
+	num := new(big.Int).SetBytes(h[:])
+	partition := new(big.Int).Mod(num, r.partitionCount)
+	return int(partition.Int64())
+}
+
+// compositeMsgRouter delegates to different message routers for messages
+// with/without ordering keys.
+type compositeMsgRouter struct {
+	keyedRouter   messageRouter
+	keylessRouter messageRouter
+}
+
+func (r *compositeMsgRouter) SetPartitionCount(count int) {
+	r.keyedRouter.SetPartitionCount(count)
+	r.keylessRouter.SetPartitionCount(count)
+}
+
+func (r *compositeMsgRouter) Route(orderingKey []byte) int {
+	if len(orderingKey) > 0 {
+		return r.keyedRouter.Route(orderingKey)
+	}
+	return r.keylessRouter.Route(orderingKey)
+}
+
+// newDefaultMessageRouter returns a compositeMsgRouter that routes messages with
+// an ordering key via hashingMsgRouter and all others via roundRobinMsgRouter.
+func newDefaultMessageRouter(rng *rand.Rand) messageRouter {
+	return &compositeMsgRouter{
+		keyedRouter:   &hashingMsgRouter{},
+		keylessRouter: &roundRobinMsgRouter{rng: rng},
+	}
+}
diff --git a/pubsublite/message_test.go b/pubsublite/message_test.go
new file mode 100644
index 0000000..1d5b3a9
--- /dev/null
+++ b/pubsublite/message_test.go
@@ -0,0 +1,215 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and limitations under the License.
+
+package pubsublite
+
+import (
+	"fmt"
+	"math/rand"
+	"testing"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+
+	tspb "github.com/golang/protobuf/ptypes/timestamp"
+	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
+)
+
+type fakeSource struct {
+	ret int64
+}
+
+func (f *fakeSource) Int63() int64    { return f.ret }
+func (f *fakeSource) Seed(seed int64) {}
+
+type fakeMsgRouter struct {
+	multiplier     int
+	partitionCount int
+}
+
+func (f *fakeMsgRouter) SetPartitionCount(count int) {
+	f.partitionCount = count
+}
+
+func (f *fakeMsgRouter) Route(orderingKey []byte) int {
+	return f.partitionCount * f.multiplier
+}
+
+func TestMessageToProto(t *testing.T) {
+	for _, tc := range []struct {
+		desc string
+		msg  *Message
+		want *pb.PubSubMessage
+	}{
+		{
+			desc: "valid: minimal",
+			msg: &Message{
+				Data: []byte("Hello world"),
+			},
+			want: &pb.PubSubMessage{
+				Data: []byte("Hello world"),
+			},
+		},
+		{
+			desc: "valid: filled",
+			msg: &Message{
+				Data: []byte("foo"),
+				Attributes: map[string]AttributeValues{
+					"attr1": [][]byte{
+						[]byte("val1"),
+						[]byte("val2"),
+					},
+				},
+				EventTime:   time.Unix(1555593697, 154358*1000),
+				OrderingKey: []byte("order"),
+			},
+			want: &pb.PubSubMessage{
+				Data: []byte("foo"),
+				Attributes: map[string]*pb.AttributeValues{
+					"attr1": {
+						Values: [][]byte{
+							[]byte("val1"),
+							[]byte("val2"),
+						},
+					},
+				},
+				EventTime: &tspb.Timestamp{
+					Seconds: 1555593697,
+					Nanos:   154358 * 1000,
+				},
+				Key: []byte("order"),
+			},
+		},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			got, err := tc.msg.toProto()
+			if err != nil {
+				t.Errorf("toProto() err = %v", err)
+			} else if !proto.Equal(got, tc.want) {
+				t.Errorf("toProto() got = %v\nwant = %v", got, tc.want)
+			}
+		})
+	}
+}
+
+func TestRoundRobinMsgRouter(t *testing.T) {
+	// Reusing the same msgRouter across test runs verifies that it reinitializes
+	// when the partition count changes.
+	source := &fakeSource{}
+	msgRouter := &roundRobinMsgRouter{rng: rand.New(source)}
+
+	for _, tc := range []struct {
+		partitionCount int
+		source         int64
+		want           []int
+	}{
+		{
+			partitionCount: 8,
+			source:         9,
+			want:           []int{1, 2, 3, 4, 5, 6, 7, 0, 1},
+		},
+		{
+			partitionCount: 5,
+			source:         2,
+			want:           []int{2, 3, 4, 0, 1, 2},
+		},
+	} {
+		t.Run(fmt.Sprintf("partitionCount=%d", tc.partitionCount), func(t *testing.T) {
+			source.ret = tc.source
+			msgRouter.SetPartitionCount(tc.partitionCount)
+			for i, want := range tc.want {
+				got := msgRouter.Route([]byte("IGNORED"))
+				if got != want {
+					t.Errorf("i=%d: Route() = %d, want = %d", i, got, want)
+				}
+			}
+		})
+	}
+}
+
+func TestHashingMsgRouter(t *testing.T) {
+	// Reusing the same msgRouter across test runs verifies that it reinitializes
+	// when the partition count changes.
+	msgRouter := &hashingMsgRouter{}
+
+	keys := [][]byte{
+		[]byte("foo1"),
+		[]byte("foo2"),
+		[]byte("foo3"),
+		[]byte("foo4"),
+		[]byte("foo5"),
+	}
+
+	for _, tc := range []struct {
+		partitionCount int
+	}{
+		{partitionCount: 10},
+		{partitionCount: 5},
+	} {
+		t.Run(fmt.Sprintf("partitionCount=%d", tc.partitionCount), func(t *testing.T) {
+			msgRouter.SetPartitionCount(tc.partitionCount)
+			for _, key := range keys {
+				p1 := msgRouter.Route(key)
+				p2 := msgRouter.Route(key)
+				if p1 != p2 {
+					t.Errorf("Route() returned different partitions for same key %v", key)
+				}
+				if p1 < 0 || p1 >= tc.partitionCount {
+					t.Errorf("Route() returned partition out of range: %v", p1)
+				}
+			}
+		})
+	}
+}
+
+func TestCompositeMsgRouter(t *testing.T) {
+	keyedRouter := &fakeMsgRouter{multiplier: 10}
+	keylessRouter := &fakeMsgRouter{multiplier: 100}
+	msgRouter := &compositeMsgRouter{
+		keyedRouter:   keyedRouter,
+		keylessRouter: keylessRouter,
+	}
+
+	for _, tc := range []struct {
+		desc           string
+		partitionCount int
+		key            []byte
+		want           int
+	}{
+		{
+			desc:           "key",
+			partitionCount: 2,
+			key:            []byte("foo"),
+			want:           20,
+		},
+		{
+			desc:           "nil key",
+			partitionCount: 8,
+			key:            nil,
+			want:           800,
+		},
+		{
+			desc:           "empty key",
+			partitionCount: 5,
+			key:            []byte{},
+			want:           500,
+		},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			msgRouter.SetPartitionCount(tc.partitionCount)
+			if got := msgRouter.Route(tc.key); got != tc.want {
+				t.Errorf("Route() = %d, want = %d", got, tc.want)
+			}
+		})
+	}
+}