feat(pubsublite): Pub/Sub Lite admin client (#3036)
Implements pubsublite.Client, which wraps the Pub/Sub Lite Admin Service. Includes integration tests for admin operations.
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 3a90efe..e139872 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -104,6 +104,7 @@
- Google Compute Engine Instance Groups API
- Kubernetes Engine API
- Cloud Error Reporting API
+- Pub/Sub Lite API
Next, create a Datastore database in the general project, and a Firestore
database in the Firestore project.
diff --git a/pubsublite/admin.go b/pubsublite/admin.go
new file mode 100644
index 0000000..4195c3f
--- /dev/null
+++ b/pubsublite/admin.go
@@ -0,0 +1,208 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+
+package pubsublite
+
+import (
+ "context"
+
+ "google.golang.org/api/option"
+ "google.golang.org/api/option/internaloption"
+
+ vkit "cloud.google.com/go/pubsublite/apiv1"
+ pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
+)
+
+// AdminClient provides admin operations for Google Pub/Sub Lite resources
+// within a Google Cloud region. An AdminClient may be shared by multiple
+// goroutines.
+type AdminClient struct {
+ admin *vkit.AdminClient
+}
+
+// NewAdminClient creates a new Cloud Pub/Sub Lite client to perform admin
+// operations for resources within a given region.
+// See https://cloud.google.com/pubsub/lite/docs/locations for the list of
+// regions and zones where Google Pub/Sub Lite is available.
+func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*AdminClient, error) {
+ if err := validateRegion(region); err != nil {
+ return nil, err
+ }
+ options := []option.ClientOption{internaloption.WithDefaultEndpoint(region + "-pubsublite.googleapis.com:443")}
+ options = append(options, opts...)
+ admin, err := vkit.NewAdminClient(ctx, options...)
+ if err != nil {
+ return nil, err
+ }
+ return &AdminClient{admin: admin}, nil
+}
+
+// CreateTopic creates a new topic from the given config.
+func (ac *AdminClient) CreateTopic(ctx context.Context, config TopicConfig) (*TopicConfig, error) {
+ req := &pb.CreateTopicRequest{
+ Parent: config.Name.location().String(),
+ Topic: config.toProto(),
+ TopicId: config.Name.TopicID,
+ }
+ topicpb, err := ac.admin.CreateTopic(ctx, req)
+ if err != nil {
+ return nil, err
+ }
+ return protoToTopicConfig(topicpb)
+}
+
+// UpdateTopic updates an existing topic from the given config and returns the
+// new topic config.
+func (ac *AdminClient) UpdateTopic(ctx context.Context, config TopicConfigToUpdate) (*TopicConfig, error) {
+ topicpb, err := ac.admin.UpdateTopic(ctx, config.toUpdateRequest())
+ if err != nil {
+ return nil, err
+ }
+ return protoToTopicConfig(topicpb)
+}
+
+// DeleteTopic deletes a topic.
+func (ac *AdminClient) DeleteTopic(ctx context.Context, topic TopicPath) error {
+ return ac.admin.DeleteTopic(ctx, &pb.DeleteTopicRequest{Name: topic.String()})
+}
+
+// Topic retrieves the configuration of a topic.
+func (ac *AdminClient) Topic(ctx context.Context, topic TopicPath) (*TopicConfig, error) {
+ topicpb, err := ac.admin.GetTopic(ctx, &pb.GetTopicRequest{Name: topic.String()})
+ if err != nil {
+ return nil, err
+ }
+ return protoToTopicConfig(topicpb)
+}
+
+// TopicPartitions returns the number of partitions for a topic.
+func (ac *AdminClient) TopicPartitions(ctx context.Context, topic TopicPath) (int, error) {
+ partitions, err := ac.admin.GetTopicPartitions(ctx, &pb.GetTopicPartitionsRequest{Name: topic.String()})
+ if err != nil {
+ return 0, err
+ }
+ return int(partitions.GetPartitionCount()), nil
+}
+
+// TopicSubscriptions retrieves the list of subscription paths for a topic.
+func (ac *AdminClient) TopicSubscriptions(ctx context.Context, topic TopicPath) (*SubscriptionPathIterator, error) {
+ subsPathIt := ac.admin.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{Name: topic.String()})
+ return &SubscriptionPathIterator{it: subsPathIt}, nil
+}
+
+// Topics retrieves the list of topic configs for a given project and zone.
+func (ac *AdminClient) Topics(ctx context.Context, location LocationPath) *TopicIterator {
+ return &TopicIterator{
+ it: ac.admin.ListTopics(ctx, &pb.ListTopicsRequest{Parent: location.String()}),
+ }
+}
+
+// CreateSubscription creates a new subscription from the given config.
+func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig) (*SubscriptionConfig, error) {
+ req := &pb.CreateSubscriptionRequest{
+ Parent: config.Name.location().String(),
+ Subscription: config.toProto(),
+ SubscriptionId: config.Name.SubscriptionID,
+ }
+ subspb, err := ac.admin.CreateSubscription(ctx, req)
+ if err != nil {
+ return nil, err
+ }
+ return protoToSubscriptionConfig(subspb)
+}
+
+// UpdateSubscription updates an existing subscription from the given config and
+// returns the new subscription config.
+func (ac *AdminClient) UpdateSubscription(ctx context.Context, config SubscriptionConfigToUpdate) (*SubscriptionConfig, error) {
+ subspb, err := ac.admin.UpdateSubscription(ctx, config.toUpdateRequest())
+ if err != nil {
+ return nil, err
+ }
+ return protoToSubscriptionConfig(subspb)
+}
+
+// DeleteSubscription deletes a subscription.
+func (ac *AdminClient) DeleteSubscription(ctx context.Context, subscription SubscriptionPath) error {
+ return ac.admin.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Name: subscription.String()})
+}
+
+// Subscription retrieves the configuration of a subscription.
+func (ac *AdminClient) Subscription(ctx context.Context, subscription SubscriptionPath) (*SubscriptionConfig, error) {
+ subspb, err := ac.admin.GetSubscription(ctx, &pb.GetSubscriptionRequest{Name: subscription.String()})
+ if err != nil {
+ return nil, err
+ }
+ return protoToSubscriptionConfig(subspb)
+}
+
+// Subscriptions retrieves the list of subscription configs for a given project
+// and zone.
+func (ac *AdminClient) Subscriptions(ctx context.Context, location LocationPath) *SubscriptionIterator {
+ return &SubscriptionIterator{
+ it: ac.admin.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{Parent: location.String()}),
+ }
+}
+
+// Close releases any resources held by the client when it is no longer
+// required. If the client is available for the lifetime of the program, then
+// Close need not be called at exit.
+func (ac *AdminClient) Close() error {
+ return ac.admin.Close()
+}
+
+// TopicIterator is an iterator that returns a list of topic configs.
+type TopicIterator struct {
+ it *vkit.TopicIterator
+}
+
+// Next returns the next topic config. The second return value will be
+// iterator.Done if there are no more topic configs.
+func (t *TopicIterator) Next() (*TopicConfig, error) {
+ topicpb, err := t.it.Next()
+ if err != nil {
+ return nil, err
+ }
+ return protoToTopicConfig(topicpb)
+}
+
+// SubscriptionIterator is an iterator that returns a list of subscription
+// configs.
+type SubscriptionIterator struct {
+ it *vkit.SubscriptionIterator
+}
+
+// Next returns the next subscription config. The second return value will be
+// iterator.Done if there are no more subscription configs.
+func (s *SubscriptionIterator) Next() (*SubscriptionConfig, error) {
+ subspb, err := s.it.Next()
+ if err != nil {
+ return nil, err
+ }
+ return protoToSubscriptionConfig(subspb)
+}
+
+// SubscriptionPathIterator is an iterator that returns a list of subscription
+// paths.
+type SubscriptionPathIterator struct {
+ it *vkit.StringIterator
+}
+
+// Next returns the next subscription path. The second return value will be
+// iterator.Done if there are no more subscription paths.
+func (sp *SubscriptionPathIterator) Next() (SubscriptionPath, error) {
+ subsPath, err := sp.it.Next()
+ if err != nil {
+ return SubscriptionPath{}, err
+ }
+ return parseSubscriptionPath(subsPath)
+}
diff --git a/pubsublite/config.go b/pubsublite/config.go
index 4225900..e115c51 100644
--- a/pubsublite/config.go
+++ b/pubsublite/config.go
@@ -24,6 +24,11 @@
fmpb "google.golang.org/genproto/protobuf/field_mask"
)
+// InfiniteRetention is a sentinel used in topic configs to denote an infinite
+// retention duration (i.e. retain messages as long as there is available
+// storage).
+const InfiniteRetention = time.Duration(-1)
+
// TopicConfig describes the properties of a Google Pub/Sub Lite topic.
// See https://cloud.google.com/pubsub/lite/docs/topics for more information
// about how topics are configured.
@@ -33,37 +38,37 @@
// The number of partitions in the topic. Must be at least 1. Cannot be
// changed after creation.
- PartitionCount int64
+ PartitionCount int
// Publish throughput capacity per partition in MiB/s.
// Must be >= 4 and <= 16.
- PublishCapacityMiBPerSec int32
+ PublishCapacityMiBPerSec int
// Subscribe throughput capacity per partition in MiB/s.
// Must be >= 4 and <= 32.
- SubscribeCapacityMiBPerSec int32
+ SubscribeCapacityMiBPerSec int
// The provisioned storage, in bytes, per partition. If the number of bytes
// stored in any of the topic's partitions grows beyond this value, older
// messages will be dropped to make room for newer ones, regardless of the
- // value of `RetentionDuration`.
+ // value of `RetentionDuration`. Must be > 0.
PerPartitionBytes int64
- // How long a published message is retained. If unset, messages will be
- // retained as long as the bytes retained for each partition is below
- // `PerPartitionBytes`.
- RetentionDuration optional.Duration
+ // How long a published message is retained. If set to `InfiniteRetention`,
+ // messages will be retained as long as the bytes retained for each partition
+ // is below `PerPartitionBytes`. Otherwise, must be > 0.
+ RetentionDuration time.Duration
}
func (tc *TopicConfig) toProto() *pb.Topic {
topicpb := &pb.Topic{
Name: tc.Name.String(),
PartitionConfig: &pb.Topic_PartitionConfig{
- Count: tc.PartitionCount,
+ Count: int64(tc.PartitionCount),
Dimension: &pb.Topic_PartitionConfig_Capacity_{
Capacity: &pb.Topic_PartitionConfig_Capacity{
- PublishMibPerSec: tc.PublishCapacityMiBPerSec,
- SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec,
+ PublishMibPerSec: int32(tc.PublishCapacityMiBPerSec),
+ SubscribeMibPerSec: int32(tc.SubscribeCapacityMiBPerSec),
},
},
},
@@ -71,17 +76,14 @@
PerPartitionBytes: tc.PerPartitionBytes,
},
}
- if tc.RetentionDuration != nil {
- duration := optional.ToDuration(tc.RetentionDuration)
- if duration >= 0 {
- topicpb.RetentionConfig.Period = ptypes.DurationProto(duration)
- }
+ if tc.RetentionDuration >= 0 {
+ topicpb.RetentionConfig.Period = ptypes.DurationProto(tc.RetentionDuration)
}
return topicpb
}
func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) {
- name, err := ParseTopicPath(t.GetName())
+ name, err := parseTopicPath(t.GetName())
if err != nil {
return nil, fmt.Errorf("pubsublite: invalid topic name %q in topic config", t.GetName())
}
@@ -90,10 +92,11 @@
retentionCfg := t.GetRetentionConfig()
topic := &TopicConfig{
Name: name,
- PartitionCount: partitionCfg.GetCount(),
- PublishCapacityMiBPerSec: partitionCfg.GetCapacity().GetPublishMibPerSec(),
- SubscribeCapacityMiBPerSec: partitionCfg.GetCapacity().GetSubscribeMibPerSec(),
+ PartitionCount: int(partitionCfg.GetCount()),
+ PublishCapacityMiBPerSec: int(partitionCfg.GetCapacity().GetPublishMibPerSec()),
+ SubscribeCapacityMiBPerSec: int(partitionCfg.GetCapacity().GetSubscribeMibPerSec()),
PerPartitionBytes: retentionCfg.GetPerPartitionBytes(),
+ RetentionDuration: InfiniteRetention,
}
// An unset retention period proto denotes "infinite retention".
if retentionCfg.Period != nil {
@@ -106,28 +109,23 @@
return topic, nil
}
-// InfiniteRetention is sentinel used when updating topic configs to clear a
-// retention duration (i.e. retain messages as long as there is available
-// storage).
-const InfiniteRetention = time.Duration(-1)
-
// TopicConfigToUpdate specifies the properties to update for a topic.
type TopicConfigToUpdate struct {
// The full path of the topic to update. Required.
Name TopicPath
// If non-zero, will update the publish throughput capacity per partition.
- PublishCapacityMiBPerSec int32
+ PublishCapacityMiBPerSec int
// If non-zero, will update the subscribe throughput capacity per partition.
- SubscribeCapacityMiBPerSec int32
+ SubscribeCapacityMiBPerSec int
// If non-zero, will update the provisioned storage per partition.
PerPartitionBytes int64
// If specified, will update how long a published message is retained. To
// clear a retention duration (i.e. retain messages as long as there is
- // available storage), set this to `pubsublite.InfiniteRetention`.
+ // available storage), set this to `InfiniteRetention`.
RetentionDuration optional.Duration
}
@@ -137,8 +135,8 @@
PartitionConfig: &pb.Topic_PartitionConfig{
Dimension: &pb.Topic_PartitionConfig_Capacity_{
Capacity: &pb.Topic_PartitionConfig_Capacity{
- PublishMibPerSec: tc.PublishCapacityMiBPerSec,
- SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec,
+ PublishMibPerSec: int32(tc.PublishCapacityMiBPerSec),
+ SubscribeMibPerSec: int32(tc.SubscribeCapacityMiBPerSec),
},
},
},
@@ -219,11 +217,11 @@
}
func protoToSubscriptionConfig(s *pb.Subscription) (*SubscriptionConfig, error) {
- name, err := ParseSubscriptionPath(s.GetName())
+ name, err := parseSubscriptionPath(s.GetName())
if err != nil {
return nil, fmt.Errorf("pubsublite: invalid subscription name %q in subscription config", s.GetName())
}
- topic, err := ParseTopicPath(s.GetTopic())
+ topic, err := parseTopicPath(s.GetTopic())
if err != nil {
return nil, fmt.Errorf("pubsublite: invalid topic name %q in subscription config", s.GetTopic())
}
diff --git a/pubsublite/config_test.go b/pubsublite/config_test.go
index b9d8ec8..9ec387b 100644
--- a/pubsublite/config_test.go
+++ b/pubsublite/config_test.go
@@ -92,6 +92,7 @@
PublishCapacityMiBPerSec: 4,
SubscribeCapacityMiBPerSec: 8,
PerPartitionBytes: 4294967296,
+ RetentionDuration: InfiniteRetention,
},
},
{
diff --git a/pubsublite/integration_test.go b/pubsublite/integration_test.go
new file mode 100644
index 0000000..3568f59
--- /dev/null
+++ b/pubsublite/integration_test.go
@@ -0,0 +1,272 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+
+package pubsublite
+
+import (
+ "context"
+ "math/rand"
+ "testing"
+ "time"
+
+ "cloud.google.com/go/internal/testutil"
+ "cloud.google.com/go/internal/uid"
+ "google.golang.org/api/iterator"
+ "google.golang.org/api/option"
+
+ vkit "cloud.google.com/go/pubsublite/apiv1"
+)
+
+const gibi = 1 << 30
+
+var (
+ resourceIDs = uid.NewSpace("go-admin-test", nil)
+ rng *rand.Rand
+
+ // A random zone is selected for each integration test run.
+ supportedZones = []string{
+ "us-central1-a",
+ "us-central1-b",
+ "us-central1-c",
+ "europe-west1-b",
+ "europe-west1-d",
+ }
+)
+
+func initIntegrationTest(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Integration tests skipped in short mode")
+ }
+ if testutil.ProjID() == "" {
+ t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
+ }
+ rng = testutil.NewRand(time.Now())
+}
+
+func withGRPCHeadersAssertion(t *testing.T, opts ...option.ClientOption) []option.ClientOption {
+ grpcHeadersEnforcer := &testutil.HeadersEnforcer{
+ OnFailure: t.Errorf,
+ Checkers: []*testutil.HeaderChecker{
+ testutil.XGoogClientHeaderChecker,
+ },
+ }
+ return append(grpcHeadersEnforcer.CallOptions(), opts...)
+}
+
+func adminClient(ctx context.Context, t *testing.T, region string, opts ...option.ClientOption) *AdminClient {
+ ts := testutil.TokenSource(ctx, vkit.DefaultAuthScopes()...)
+ if ts == nil {
+ t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
+ }
+ opts = append(withGRPCHeadersAssertion(t, option.WithTokenSource(ts)), opts...)
+ admin, err := NewAdminClient(ctx, region, opts...)
+ if err != nil {
+ t.Fatalf("Failed to create admin client: %v", err)
+ }
+ return admin
+}
+
+func cleanUpTopic(ctx context.Context, t *testing.T, admin *AdminClient, name TopicPath) {
+ if err := admin.DeleteTopic(ctx, name); err != nil {
+ t.Errorf("Failed to delete topic %s: %v", name, err)
+ }
+}
+
+func cleanUpSubscription(ctx context.Context, t *testing.T, admin *AdminClient, name SubscriptionPath) {
+ if err := admin.DeleteSubscription(ctx, name); err != nil {
+ t.Errorf("Failed to delete subscription %s: %v", name, err)
+ }
+}
+
+func randomLiteZone() string {
+ return supportedZones[rng.Intn(len(supportedZones))]
+}
+
+func TestResourceAdminOperations(t *testing.T) {
+ initIntegrationTest(t)
+
+ ctx := context.Background()
+ proj := testutil.ProjID()
+ zone := randomLiteZone()
+ region, _ := ZoneToRegion(zone)
+ resourceID := resourceIDs.New()
+
+ locationPath := LocationPath{Project: proj, Zone: zone}
+ topicPath := TopicPath{Project: proj, Zone: zone, TopicID: resourceID}
+ subscriptionPath := SubscriptionPath{Project: proj, Zone: zone, SubscriptionID: resourceID}
+ t.Logf("Topic path: %s", topicPath)
+
+ admin := adminClient(ctx, t, region)
+ defer admin.Close()
+
+ // Topic admin operations.
+ newTopicConfig := &TopicConfig{
+ Name: topicPath,
+ PartitionCount: 2,
+ PublishCapacityMiBPerSec: 4,
+ SubscribeCapacityMiBPerSec: 4,
+ PerPartitionBytes: 30 * gibi,
+ RetentionDuration: time.Duration(24 * time.Hour),
+ }
+
+ gotTopicConfig, err := admin.CreateTopic(ctx, *newTopicConfig)
+ if err != nil {
+ t.Fatalf("Failed to create topic: %v", err)
+ }
+ defer cleanUpTopic(ctx, t, admin, topicPath)
+ if diff := testutil.Diff(gotTopicConfig, newTopicConfig); diff != "" {
+ t.Errorf("CreateTopic() got: -, want: +\n%s", diff)
+ }
+
+ if gotTopicConfig, err := admin.Topic(ctx, topicPath); err != nil {
+ t.Errorf("Failed to get topic: %v", err)
+ } else if diff := testutil.Diff(gotTopicConfig, newTopicConfig); diff != "" {
+ t.Errorf("Topic() got: -, want: +\n%s", diff)
+ }
+
+ if gotTopicPartitions, err := admin.TopicPartitions(ctx, topicPath); err != nil {
+ t.Errorf("Failed to get topic partitions: %v", err)
+ } else if gotTopicPartitions != newTopicConfig.PartitionCount {
+ t.Errorf("TopicPartitions() got: %v, want: %v", gotTopicPartitions, newTopicConfig.PartitionCount)
+ }
+
+ topicIt := admin.Topics(ctx, locationPath)
+ var foundTopic *TopicConfig
+ for {
+ topic, err := topicIt.Next()
+ if err == iterator.Done {
+ break
+ }
+ if testutil.Equal(topic.Name, topicPath) {
+ foundTopic = topic
+ break
+ }
+ }
+ if foundTopic == nil {
+ t.Error("Topics() did not return topic config")
+ } else if diff := testutil.Diff(foundTopic, newTopicConfig); diff != "" {
+ t.Errorf("Topics() found config: -, want: +\n%s", diff)
+ }
+
+ topicUpdate1 := TopicConfigToUpdate{
+ Name: topicPath,
+ PublishCapacityMiBPerSec: 6,
+ SubscribeCapacityMiBPerSec: 8,
+ }
+ wantUpdatedTopicConfig1 := &TopicConfig{
+ Name: topicPath,
+ PartitionCount: 2,
+ PublishCapacityMiBPerSec: 6,
+ SubscribeCapacityMiBPerSec: 8,
+ PerPartitionBytes: 30 * gibi,
+ RetentionDuration: time.Duration(24 * time.Hour),
+ }
+ if gotTopicConfig, err := admin.UpdateTopic(ctx, topicUpdate1); err != nil {
+ t.Errorf("Failed to update topic: %v", err)
+ } else if diff := testutil.Diff(gotTopicConfig, wantUpdatedTopicConfig1); diff != "" {
+ t.Errorf("UpdateTopic() got: -, want: +\n%s", diff)
+ }
+
+ topicUpdate2 := TopicConfigToUpdate{
+ Name: topicPath,
+ PerPartitionBytes: 35 * gibi,
+ RetentionDuration: InfiniteRetention,
+ }
+ wantUpdatedTopicConfig2 := &TopicConfig{
+ Name: topicPath,
+ PartitionCount: 2,
+ PublishCapacityMiBPerSec: 6,
+ SubscribeCapacityMiBPerSec: 8,
+ PerPartitionBytes: 35 * gibi,
+ RetentionDuration: InfiniteRetention,
+ }
+ if gotTopicConfig, err := admin.UpdateTopic(ctx, topicUpdate2); err != nil {
+ t.Errorf("Failed to update topic: %v", err)
+ } else if diff := testutil.Diff(gotTopicConfig, wantUpdatedTopicConfig2); diff != "" {
+ t.Errorf("UpdateTopic() got: -, want: +\n%s", diff)
+ }
+
+ // Subscription admin operations.
+ newSubsConfig := &SubscriptionConfig{
+ Name: subscriptionPath,
+ Topic: topicPath,
+ DeliveryRequirement: DeliverImmediately,
+ }
+
+ gotSubsConfig, err := admin.CreateSubscription(ctx, *newSubsConfig)
+ if err != nil {
+ t.Fatalf("Failed to create subscription: %v", err)
+ }
+ defer cleanUpSubscription(ctx, t, admin, subscriptionPath)
+ if diff := testutil.Diff(gotSubsConfig, newSubsConfig); diff != "" {
+ t.Errorf("CreateSubscription() got: -, want: +\n%s", diff)
+ }
+
+ if gotSubsConfig, err := admin.Subscription(ctx, subscriptionPath); err != nil {
+ t.Errorf("Failed to get subscription: %v", err)
+ } else if diff := testutil.Diff(gotSubsConfig, newSubsConfig); diff != "" {
+ t.Errorf("Subscription() got: -, want: +\n%s", diff)
+ }
+
+ subsIt := admin.Subscriptions(ctx, locationPath)
+ var foundSubs *SubscriptionConfig
+ for {
+ subs, err := subsIt.Next()
+ if err == iterator.Done {
+ break
+ }
+ if testutil.Equal(subs.Name, subscriptionPath) {
+ foundSubs = subs
+ break
+ }
+ }
+ if foundSubs == nil {
+ t.Error("Subscriptions() did not return subscription config")
+ } else if diff := testutil.Diff(foundSubs, gotSubsConfig); diff != "" {
+ t.Errorf("Subscriptions() found config: -, want: +\n%s", diff)
+ }
+
+ if subsPathIt, err := admin.TopicSubscriptions(ctx, topicPath); err != nil {
+ t.Errorf("Failed to list topic subscriptions: %v", err)
+ } else {
+ foundSubsPath := false
+ for {
+ subsPath, err := subsPathIt.Next()
+ if err == iterator.Done {
+ break
+ }
+ if testutil.Equal(subsPath, subscriptionPath) {
+ foundSubsPath = true
+ break
+ }
+ }
+ if !foundSubsPath {
+ t.Error("TopicSubscriptions() did not return subscription path")
+ }
+ }
+
+ subsUpdate := SubscriptionConfigToUpdate{
+ Name: subscriptionPath,
+ DeliveryRequirement: DeliverAfterStored,
+ }
+ wantUpdatedSubsConfig := &SubscriptionConfig{
+ Name: subscriptionPath,
+ Topic: topicPath,
+ DeliveryRequirement: DeliverAfterStored,
+ }
+ if gotSubsConfig, err := admin.UpdateSubscription(ctx, subsUpdate); err != nil {
+ t.Errorf("Failed to update subscription: %v", err)
+ } else if diff := testutil.Diff(gotSubsConfig, wantUpdatedSubsConfig); diff != "" {
+ t.Errorf("UpdateSubscription() got: -, want: +\n%s", diff)
+ }
+}
diff --git a/pubsublite/types.go b/pubsublite/types.go
index a120c49..40104d2 100644
--- a/pubsublite/types.go
+++ b/pubsublite/types.go
@@ -48,6 +48,8 @@
Zone string
// The ID of the Google Pub/Sub Lite topic, for example "my-topic-name".
+ // See https://cloud.google.com/pubsub/docs/admin#resource_names for more
+ // information.
TopicID string
}
@@ -55,16 +57,15 @@
return fmt.Sprintf("projects/%s/locations/%s/topics/%s", t.Project, t.Zone, t.TopicID)
}
-// Location returns the path of the parent location.
-func (t TopicPath) Location() LocationPath {
+func (t TopicPath) location() LocationPath {
return LocationPath{Project: t.Project, Zone: t.Zone}
}
var topicPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/topics/([^/]+)$`)
-// ParseTopicPath parses the full path of a Google Pub/Sub Lite topic, which
+// parseTopicPath parses the full path of a Google Pub/Sub Lite topic, which
// should have the format: `projects/{project}/locations/{zone}/topics/{id}`.
-func ParseTopicPath(input string) (TopicPath, error) {
+func parseTopicPath(input string) (TopicPath, error) {
parts := topicPathRE.FindStringSubmatch(input)
if len(parts) < 4 {
return TopicPath{}, fmt.Errorf("pubsublite: invalid topic path %q", input)
@@ -87,6 +88,8 @@
// The ID of the Google Pub/Sub Lite subscription, for example
// "my-subscription-name".
+ // See https://cloud.google.com/pubsub/docs/admin#resource_names for more
+ // information.
SubscriptionID string
}
@@ -94,17 +97,16 @@
return fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", s.Project, s.Zone, s.SubscriptionID)
}
-// Location returns the path of the parent location.
-func (s SubscriptionPath) Location() LocationPath {
+func (s SubscriptionPath) location() LocationPath {
return LocationPath{Project: s.Project, Zone: s.Zone}
}
var subsPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/subscriptions/([^/]+)$`)
-// ParseSubscriptionPath parses the full path of a Google Pub/Sub Lite
+// parseSubscriptionPath parses the full path of a Google Pub/Sub Lite
// subscription, which should have the format:
// `projects/{project}/locations/{zone}/subscriptions/{id}`.
-func ParseSubscriptionPath(input string) (SubscriptionPath, error) {
+func parseSubscriptionPath(input string) (SubscriptionPath, error) {
parts := subsPathRE.FindStringSubmatch(input)
if len(parts) < 4 {
return SubscriptionPath{}, fmt.Errorf("pubsublite: invalid subscription path %q", input)
@@ -112,10 +114,10 @@
return SubscriptionPath{Project: parts[1], Zone: parts[2], SubscriptionID: parts[3]}, nil
}
-// ValidateZone verifies that the `input` string has the format of a valid
+// validateZone verifies that the `input` string has the format of a valid
// Google Cloud zone. An example zone is "europe-west1-b".
// See https://cloud.google.com/compute/docs/regions-zones for more information.
-func ValidateZone(input string) error {
+func validateZone(input string) error {
parts := strings.Split(input, "-")
if len(parts) != 3 {
return fmt.Errorf("pubsublite: invalid zone %q", input)
@@ -123,10 +125,10 @@
return nil
}
-// ValidateRegion verifies that the `input` string has the format of a valid
+// validateRegion verifies that the `input` string has the format of a valid
// Google Cloud region. An example region is "europe-west1".
// See https://cloud.google.com/compute/docs/regions-zones for more information.
-func ValidateRegion(input string) error {
+func validateRegion(input string) error {
parts := strings.Split(input, "-")
if len(parts) != 2 {
return fmt.Errorf("pubsublite: invalid region %q", input)
@@ -136,7 +138,7 @@
// ZoneToRegion returns the region that the given zone is in.
func ZoneToRegion(zone string) (string, error) {
- if err := ValidateZone(zone); err != nil {
+ if err := validateZone(zone); err != nil {
return "", err
}
return zone[0:strings.LastIndex(zone, "-")], nil
diff --git a/pubsublite/types_test.go b/pubsublite/types_test.go
index c9a9871..18f02b9 100644
--- a/pubsublite/types_test.go
+++ b/pubsublite/types_test.go
@@ -64,9 +64,9 @@
},
} {
t.Run(tc.desc, func(t *testing.T) {
- gotPath, gotErr := ParseTopicPath(tc.input)
+ gotPath, gotErr := parseTopicPath(tc.input)
if gotPath != tc.wantPath || (gotErr != nil) != tc.wantErr {
- t.Errorf("ParseTopicPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr)
+ t.Errorf("parseTopicPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr)
}
})
}
@@ -121,9 +121,9 @@
},
} {
t.Run(tc.desc, func(t *testing.T) {
- gotPath, gotErr := ParseSubscriptionPath(tc.input)
+ gotPath, gotErr := parseSubscriptionPath(tc.input)
if gotPath != tc.wantPath || (gotErr != nil) != tc.wantErr {
- t.Errorf("ParseSubscriptionPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr)
+ t.Errorf("parseSubscriptionPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr)
}
})
}
@@ -152,9 +152,9 @@
},
} {
t.Run(tc.desc, func(t *testing.T) {
- err := ValidateZone(tc.input)
+ err := validateZone(tc.input)
if (err != nil) != tc.wantErr {
- t.Errorf("ValidateZone(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
+ t.Errorf("validateZone(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
}
})
}
@@ -183,9 +183,9 @@
},
} {
t.Run(tc.desc, func(t *testing.T) {
- err := ValidateRegion(tc.input)
+ err := validateRegion(tc.input)
if (err != nil) != tc.wantErr {
- t.Errorf("ValidateRegion(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
+ t.Errorf("validateRegion(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
}
})
}