feat(pubsublite): Pub/Sub Lite admin client (#3036)

Implements pubsublite.Client, which wraps the Pub/Sub Lite Admin Service. Includes integration tests for admin operations.
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 3a90efe..e139872 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -104,6 +104,7 @@
 - Google Compute Engine Instance Groups API
 - Kubernetes Engine API
 - Cloud Error Reporting API
+- Pub/Sub Lite API
 
 Next, create a Datastore database in the general project, and a Firestore
 database in the Firestore project.
diff --git a/pubsublite/admin.go b/pubsublite/admin.go
new file mode 100644
index 0000000..4195c3f
--- /dev/null
+++ b/pubsublite/admin.go
@@ -0,0 +1,208 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+
+package pubsublite
+
+import (
+	"context"
+
+	"google.golang.org/api/option"
+	"google.golang.org/api/option/internaloption"
+
+	vkit "cloud.google.com/go/pubsublite/apiv1"
+	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
+)
+
+// AdminClient provides admin operations for Google Pub/Sub Lite resources
+// within a Google Cloud region. An AdminClient may be shared by multiple
+// goroutines.
+type AdminClient struct {
+	admin *vkit.AdminClient
+}
+
+// NewAdminClient creates a new Cloud Pub/Sub Lite client to perform admin
+// operations for resources within a given region.
+// See https://cloud.google.com/pubsub/lite/docs/locations for the list of
+// regions and zones where Google Pub/Sub Lite is available.
+func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*AdminClient, error) {
+	if err := validateRegion(region); err != nil {
+		return nil, err
+	}
+	options := []option.ClientOption{internaloption.WithDefaultEndpoint(region + "-pubsublite.googleapis.com:443")}
+	options = append(options, opts...)
+	admin, err := vkit.NewAdminClient(ctx, options...)
+	if err != nil {
+		return nil, err
+	}
+	return &AdminClient{admin: admin}, nil
+}
+
+// CreateTopic creates a new topic from the given config.
+func (ac *AdminClient) CreateTopic(ctx context.Context, config TopicConfig) (*TopicConfig, error) {
+	req := &pb.CreateTopicRequest{
+		Parent:  config.Name.location().String(),
+		Topic:   config.toProto(),
+		TopicId: config.Name.TopicID,
+	}
+	topicpb, err := ac.admin.CreateTopic(ctx, req)
+	if err != nil {
+		return nil, err
+	}
+	return protoToTopicConfig(topicpb)
+}
+
+// UpdateTopic updates an existing topic from the given config and returns the
+// new topic config.
+func (ac *AdminClient) UpdateTopic(ctx context.Context, config TopicConfigToUpdate) (*TopicConfig, error) {
+	topicpb, err := ac.admin.UpdateTopic(ctx, config.toUpdateRequest())
+	if err != nil {
+		return nil, err
+	}
+	return protoToTopicConfig(topicpb)
+}
+
+// DeleteTopic deletes a topic.
+func (ac *AdminClient) DeleteTopic(ctx context.Context, topic TopicPath) error {
+	return ac.admin.DeleteTopic(ctx, &pb.DeleteTopicRequest{Name: topic.String()})
+}
+
+// Topic retrieves the configuration of a topic.
+func (ac *AdminClient) Topic(ctx context.Context, topic TopicPath) (*TopicConfig, error) {
+	topicpb, err := ac.admin.GetTopic(ctx, &pb.GetTopicRequest{Name: topic.String()})
+	if err != nil {
+		return nil, err
+	}
+	return protoToTopicConfig(topicpb)
+}
+
+// TopicPartitions returns the number of partitions for a topic.
+func (ac *AdminClient) TopicPartitions(ctx context.Context, topic TopicPath) (int, error) {
+	partitions, err := ac.admin.GetTopicPartitions(ctx, &pb.GetTopicPartitionsRequest{Name: topic.String()})
+	if err != nil {
+		return 0, err
+	}
+	return int(partitions.GetPartitionCount()), nil
+}
+
+// TopicSubscriptions retrieves the list of subscription paths for a topic.
+func (ac *AdminClient) TopicSubscriptions(ctx context.Context, topic TopicPath) (*SubscriptionPathIterator, error) {
+	subsPathIt := ac.admin.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{Name: topic.String()})
+	return &SubscriptionPathIterator{it: subsPathIt}, nil
+}
+
+// Topics retrieves the list of topic configs for a given project and zone.
+func (ac *AdminClient) Topics(ctx context.Context, location LocationPath) *TopicIterator {
+	return &TopicIterator{
+		it: ac.admin.ListTopics(ctx, &pb.ListTopicsRequest{Parent: location.String()}),
+	}
+}
+
+// CreateSubscription creates a new subscription from the given config.
+func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig) (*SubscriptionConfig, error) {
+	req := &pb.CreateSubscriptionRequest{
+		Parent:         config.Name.location().String(),
+		Subscription:   config.toProto(),
+		SubscriptionId: config.Name.SubscriptionID,
+	}
+	subspb, err := ac.admin.CreateSubscription(ctx, req)
+	if err != nil {
+		return nil, err
+	}
+	return protoToSubscriptionConfig(subspb)
+}
+
+// UpdateSubscription updates an existing subscription from the given config and
+// returns the new subscription config.
+func (ac *AdminClient) UpdateSubscription(ctx context.Context, config SubscriptionConfigToUpdate) (*SubscriptionConfig, error) {
+	subspb, err := ac.admin.UpdateSubscription(ctx, config.toUpdateRequest())
+	if err != nil {
+		return nil, err
+	}
+	return protoToSubscriptionConfig(subspb)
+}
+
+// DeleteSubscription deletes a subscription.
+func (ac *AdminClient) DeleteSubscription(ctx context.Context, subscription SubscriptionPath) error {
+	return ac.admin.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Name: subscription.String()})
+}
+
+// Subscription retrieves the configuration of a subscription.
+func (ac *AdminClient) Subscription(ctx context.Context, subscription SubscriptionPath) (*SubscriptionConfig, error) {
+	subspb, err := ac.admin.GetSubscription(ctx, &pb.GetSubscriptionRequest{Name: subscription.String()})
+	if err != nil {
+		return nil, err
+	}
+	return protoToSubscriptionConfig(subspb)
+}
+
+// Subscriptions retrieves the list of subscription configs for a given project
+// and zone.
+func (ac *AdminClient) Subscriptions(ctx context.Context, location LocationPath) *SubscriptionIterator {
+	return &SubscriptionIterator{
+		it: ac.admin.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{Parent: location.String()}),
+	}
+}
+
+// Close releases any resources held by the client when it is no longer
+// required. If the client is available for the lifetime of the program, then
+// Close need not be called at exit.
+func (ac *AdminClient) Close() error {
+	return ac.admin.Close()
+}
+
+// TopicIterator is an iterator that returns a list of topic configs.
+type TopicIterator struct {
+	it *vkit.TopicIterator
+}
+
+// Next returns the next topic config. The second return value will be
+// iterator.Done if there are no more topic configs.
+func (t *TopicIterator) Next() (*TopicConfig, error) {
+	topicpb, err := t.it.Next()
+	if err != nil {
+		return nil, err
+	}
+	return protoToTopicConfig(topicpb)
+}
+
+// SubscriptionIterator is an iterator that returns a list of subscription
+// configs.
+type SubscriptionIterator struct {
+	it *vkit.SubscriptionIterator
+}
+
+// Next returns the next subscription config. The second return value will be
+// iterator.Done if there are no more subscription configs.
+func (s *SubscriptionIterator) Next() (*SubscriptionConfig, error) {
+	subspb, err := s.it.Next()
+	if err != nil {
+		return nil, err
+	}
+	return protoToSubscriptionConfig(subspb)
+}
+
+// SubscriptionPathIterator is an iterator that returns a list of subscription
+// paths.
+type SubscriptionPathIterator struct {
+	it *vkit.StringIterator
+}
+
+// Next returns the next subscription path. The second return value will be
+// iterator.Done if there are no more subscription paths.
+func (sp *SubscriptionPathIterator) Next() (SubscriptionPath, error) {
+	subsPath, err := sp.it.Next()
+	if err != nil {
+		return SubscriptionPath{}, err
+	}
+	return parseSubscriptionPath(subsPath)
+}
diff --git a/pubsublite/config.go b/pubsublite/config.go
index 4225900..e115c51 100644
--- a/pubsublite/config.go
+++ b/pubsublite/config.go
@@ -24,6 +24,11 @@
 	fmpb "google.golang.org/genproto/protobuf/field_mask"
 )
 
+// InfiniteRetention is a sentinel used in topic configs to denote an infinite
+// retention duration (i.e. retain messages as long as there is available
+// storage).
+const InfiniteRetention = time.Duration(-1)
+
 // TopicConfig describes the properties of a Google Pub/Sub Lite topic.
 // See https://cloud.google.com/pubsub/lite/docs/topics for more information
 // about how topics are configured.
@@ -33,37 +38,37 @@
 
 	// The number of partitions in the topic. Must be at least 1. Cannot be
 	// changed after creation.
-	PartitionCount int64
+	PartitionCount int
 
 	// Publish throughput capacity per partition in MiB/s.
 	// Must be >= 4 and <= 16.
-	PublishCapacityMiBPerSec int32
+	PublishCapacityMiBPerSec int
 
 	// Subscribe throughput capacity per partition in MiB/s.
 	// Must be >= 4 and <= 32.
-	SubscribeCapacityMiBPerSec int32
+	SubscribeCapacityMiBPerSec int
 
 	// The provisioned storage, in bytes, per partition. If the number of bytes
 	// stored in any of the topic's partitions grows beyond this value, older
 	// messages will be dropped to make room for newer ones, regardless of the
-	// value of `RetentionDuration`.
+	// value of `RetentionDuration`. Must be > 0.
 	PerPartitionBytes int64
 
-	// How long a published message is retained. If unset, messages will be
-	// retained as long as the bytes retained for each partition is below
-	// `PerPartitionBytes`.
-	RetentionDuration optional.Duration
+	// How long a published message is retained. If set to `InfiniteRetention`,
+	// messages will be retained as long as the bytes retained for each partition
+	// is below `PerPartitionBytes`. Otherwise, must be > 0.
+	RetentionDuration time.Duration
 }
 
 func (tc *TopicConfig) toProto() *pb.Topic {
 	topicpb := &pb.Topic{
 		Name: tc.Name.String(),
 		PartitionConfig: &pb.Topic_PartitionConfig{
-			Count: tc.PartitionCount,
+			Count: int64(tc.PartitionCount),
 			Dimension: &pb.Topic_PartitionConfig_Capacity_{
 				Capacity: &pb.Topic_PartitionConfig_Capacity{
-					PublishMibPerSec:   tc.PublishCapacityMiBPerSec,
-					SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec,
+					PublishMibPerSec:   int32(tc.PublishCapacityMiBPerSec),
+					SubscribeMibPerSec: int32(tc.SubscribeCapacityMiBPerSec),
 				},
 			},
 		},
@@ -71,17 +76,14 @@
 			PerPartitionBytes: tc.PerPartitionBytes,
 		},
 	}
-	if tc.RetentionDuration != nil {
-		duration := optional.ToDuration(tc.RetentionDuration)
-		if duration >= 0 {
-			topicpb.RetentionConfig.Period = ptypes.DurationProto(duration)
-		}
+	if tc.RetentionDuration >= 0 {
+		topicpb.RetentionConfig.Period = ptypes.DurationProto(tc.RetentionDuration)
 	}
 	return topicpb
 }
 
 func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) {
-	name, err := ParseTopicPath(t.GetName())
+	name, err := parseTopicPath(t.GetName())
 	if err != nil {
 		return nil, fmt.Errorf("pubsublite: invalid topic name %q in topic config", t.GetName())
 	}
@@ -90,10 +92,11 @@
 	retentionCfg := t.GetRetentionConfig()
 	topic := &TopicConfig{
 		Name:                       name,
-		PartitionCount:             partitionCfg.GetCount(),
-		PublishCapacityMiBPerSec:   partitionCfg.GetCapacity().GetPublishMibPerSec(),
-		SubscribeCapacityMiBPerSec: partitionCfg.GetCapacity().GetSubscribeMibPerSec(),
+		PartitionCount:             int(partitionCfg.GetCount()),
+		PublishCapacityMiBPerSec:   int(partitionCfg.GetCapacity().GetPublishMibPerSec()),
+		SubscribeCapacityMiBPerSec: int(partitionCfg.GetCapacity().GetSubscribeMibPerSec()),
 		PerPartitionBytes:          retentionCfg.GetPerPartitionBytes(),
+		RetentionDuration:          InfiniteRetention,
 	}
 	// An unset retention period proto denotes "infinite retention".
 	if retentionCfg.Period != nil {
@@ -106,28 +109,23 @@
 	return topic, nil
 }
 
-// InfiniteRetention is sentinel used when updating topic configs to clear a
-// retention duration (i.e. retain messages as long as there is available
-// storage).
-const InfiniteRetention = time.Duration(-1)
-
 // TopicConfigToUpdate specifies the properties to update for a topic.
 type TopicConfigToUpdate struct {
 	// The full path of the topic to update. Required.
 	Name TopicPath
 
 	// If non-zero, will update the publish throughput capacity per partition.
-	PublishCapacityMiBPerSec int32
+	PublishCapacityMiBPerSec int
 
 	// If non-zero, will update the subscribe throughput capacity per partition.
-	SubscribeCapacityMiBPerSec int32
+	SubscribeCapacityMiBPerSec int
 
 	// If non-zero, will update the provisioned storage per partition.
 	PerPartitionBytes int64
 
 	// If specified, will update how long a published message is retained. To
 	// clear a retention duration (i.e. retain messages as long as there is
-	// available storage), set this to `pubsublite.InfiniteRetention`.
+	// available storage), set this to `InfiniteRetention`.
 	RetentionDuration optional.Duration
 }
 
@@ -137,8 +135,8 @@
 		PartitionConfig: &pb.Topic_PartitionConfig{
 			Dimension: &pb.Topic_PartitionConfig_Capacity_{
 				Capacity: &pb.Topic_PartitionConfig_Capacity{
-					PublishMibPerSec:   tc.PublishCapacityMiBPerSec,
-					SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec,
+					PublishMibPerSec:   int32(tc.PublishCapacityMiBPerSec),
+					SubscribeMibPerSec: int32(tc.SubscribeCapacityMiBPerSec),
 				},
 			},
 		},
@@ -219,11 +217,11 @@
 }
 
 func protoToSubscriptionConfig(s *pb.Subscription) (*SubscriptionConfig, error) {
-	name, err := ParseSubscriptionPath(s.GetName())
+	name, err := parseSubscriptionPath(s.GetName())
 	if err != nil {
 		return nil, fmt.Errorf("pubsublite: invalid subscription name %q in subscription config", s.GetName())
 	}
-	topic, err := ParseTopicPath(s.GetTopic())
+	topic, err := parseTopicPath(s.GetTopic())
 	if err != nil {
 		return nil, fmt.Errorf("pubsublite: invalid topic name %q in subscription config", s.GetTopic())
 	}
diff --git a/pubsublite/config_test.go b/pubsublite/config_test.go
index b9d8ec8..9ec387b 100644
--- a/pubsublite/config_test.go
+++ b/pubsublite/config_test.go
@@ -92,6 +92,7 @@
 				PublishCapacityMiBPerSec:   4,
 				SubscribeCapacityMiBPerSec: 8,
 				PerPartitionBytes:          4294967296,
+				RetentionDuration:          InfiniteRetention,
 			},
 		},
 		{
diff --git a/pubsublite/integration_test.go b/pubsublite/integration_test.go
new file mode 100644
index 0000000..3568f59
--- /dev/null
+++ b/pubsublite/integration_test.go
@@ -0,0 +1,272 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+
+package pubsublite
+
+import (
+	"context"
+	"math/rand"
+	"testing"
+	"time"
+
+	"cloud.google.com/go/internal/testutil"
+	"cloud.google.com/go/internal/uid"
+	"google.golang.org/api/iterator"
+	"google.golang.org/api/option"
+
+	vkit "cloud.google.com/go/pubsublite/apiv1"
+)
+
+const gibi = 1 << 30
+
+var (
+	resourceIDs = uid.NewSpace("go-admin-test", nil)
+	rng         *rand.Rand
+
+	// A random zone is selected for each integration test run.
+	supportedZones = []string{
+		"us-central1-a",
+		"us-central1-b",
+		"us-central1-c",
+		"europe-west1-b",
+		"europe-west1-d",
+	}
+)
+
+func initIntegrationTest(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Integration tests skipped in short mode")
+	}
+	if testutil.ProjID() == "" {
+		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
+	}
+	rng = testutil.NewRand(time.Now())
+}
+
+func withGRPCHeadersAssertion(t *testing.T, opts ...option.ClientOption) []option.ClientOption {
+	grpcHeadersEnforcer := &testutil.HeadersEnforcer{
+		OnFailure: t.Errorf,
+		Checkers: []*testutil.HeaderChecker{
+			testutil.XGoogClientHeaderChecker,
+		},
+	}
+	return append(grpcHeadersEnforcer.CallOptions(), opts...)
+}
+
+func adminClient(ctx context.Context, t *testing.T, region string, opts ...option.ClientOption) *AdminClient {
+	ts := testutil.TokenSource(ctx, vkit.DefaultAuthScopes()...)
+	if ts == nil {
+		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
+	}
+	opts = append(withGRPCHeadersAssertion(t, option.WithTokenSource(ts)), opts...)
+	admin, err := NewAdminClient(ctx, region, opts...)
+	if err != nil {
+		t.Fatalf("Failed to create admin client: %v", err)
+	}
+	return admin
+}
+
+func cleanUpTopic(ctx context.Context, t *testing.T, admin *AdminClient, name TopicPath) {
+	if err := admin.DeleteTopic(ctx, name); err != nil {
+		t.Errorf("Failed to delete topic %s: %v", name, err)
+	}
+}
+
+func cleanUpSubscription(ctx context.Context, t *testing.T, admin *AdminClient, name SubscriptionPath) {
+	if err := admin.DeleteSubscription(ctx, name); err != nil {
+		t.Errorf("Failed to delete subscription %s: %v", name, err)
+	}
+}
+
+func randomLiteZone() string {
+	return supportedZones[rng.Intn(len(supportedZones))]
+}
+
+func TestResourceAdminOperations(t *testing.T) {
+	initIntegrationTest(t)
+
+	ctx := context.Background()
+	proj := testutil.ProjID()
+	zone := randomLiteZone()
+	region, _ := ZoneToRegion(zone)
+	resourceID := resourceIDs.New()
+
+	locationPath := LocationPath{Project: proj, Zone: zone}
+	topicPath := TopicPath{Project: proj, Zone: zone, TopicID: resourceID}
+	subscriptionPath := SubscriptionPath{Project: proj, Zone: zone, SubscriptionID: resourceID}
+	t.Logf("Topic path: %s", topicPath)
+
+	admin := adminClient(ctx, t, region)
+	defer admin.Close()
+
+	// Topic admin operations.
+	newTopicConfig := &TopicConfig{
+		Name:                       topicPath,
+		PartitionCount:             2,
+		PublishCapacityMiBPerSec:   4,
+		SubscribeCapacityMiBPerSec: 4,
+		PerPartitionBytes:          30 * gibi,
+		RetentionDuration:          time.Duration(24 * time.Hour),
+	}
+
+	gotTopicConfig, err := admin.CreateTopic(ctx, *newTopicConfig)
+	if err != nil {
+		t.Fatalf("Failed to create topic: %v", err)
+	}
+	defer cleanUpTopic(ctx, t, admin, topicPath)
+	if diff := testutil.Diff(gotTopicConfig, newTopicConfig); diff != "" {
+		t.Errorf("CreateTopic() got: -, want: +\n%s", diff)
+	}
+
+	if gotTopicConfig, err := admin.Topic(ctx, topicPath); err != nil {
+		t.Errorf("Failed to get topic: %v", err)
+	} else if diff := testutil.Diff(gotTopicConfig, newTopicConfig); diff != "" {
+		t.Errorf("Topic() got: -, want: +\n%s", diff)
+	}
+
+	if gotTopicPartitions, err := admin.TopicPartitions(ctx, topicPath); err != nil {
+		t.Errorf("Failed to get topic partitions: %v", err)
+	} else if gotTopicPartitions != newTopicConfig.PartitionCount {
+		t.Errorf("TopicPartitions() got: %v, want: %v", gotTopicPartitions, newTopicConfig.PartitionCount)
+	}
+
+	topicIt := admin.Topics(ctx, locationPath)
+	var foundTopic *TopicConfig
+	for {
+		topic, err := topicIt.Next()
+		if err == iterator.Done {
+			break
+		}
+		if testutil.Equal(topic.Name, topicPath) {
+			foundTopic = topic
+			break
+		}
+	}
+	if foundTopic == nil {
+		t.Error("Topics() did not return topic config")
+	} else if diff := testutil.Diff(foundTopic, newTopicConfig); diff != "" {
+		t.Errorf("Topics() found config: -, want: +\n%s", diff)
+	}
+
+	topicUpdate1 := TopicConfigToUpdate{
+		Name:                       topicPath,
+		PublishCapacityMiBPerSec:   6,
+		SubscribeCapacityMiBPerSec: 8,
+	}
+	wantUpdatedTopicConfig1 := &TopicConfig{
+		Name:                       topicPath,
+		PartitionCount:             2,
+		PublishCapacityMiBPerSec:   6,
+		SubscribeCapacityMiBPerSec: 8,
+		PerPartitionBytes:          30 * gibi,
+		RetentionDuration:          time.Duration(24 * time.Hour),
+	}
+	if gotTopicConfig, err := admin.UpdateTopic(ctx, topicUpdate1); err != nil {
+		t.Errorf("Failed to update topic: %v", err)
+	} else if diff := testutil.Diff(gotTopicConfig, wantUpdatedTopicConfig1); diff != "" {
+		t.Errorf("UpdateTopic() got: -, want: +\n%s", diff)
+	}
+
+	topicUpdate2 := TopicConfigToUpdate{
+		Name:              topicPath,
+		PerPartitionBytes: 35 * gibi,
+		RetentionDuration: InfiniteRetention,
+	}
+	wantUpdatedTopicConfig2 := &TopicConfig{
+		Name:                       topicPath,
+		PartitionCount:             2,
+		PublishCapacityMiBPerSec:   6,
+		SubscribeCapacityMiBPerSec: 8,
+		PerPartitionBytes:          35 * gibi,
+		RetentionDuration:          InfiniteRetention,
+	}
+	if gotTopicConfig, err := admin.UpdateTopic(ctx, topicUpdate2); err != nil {
+		t.Errorf("Failed to update topic: %v", err)
+	} else if diff := testutil.Diff(gotTopicConfig, wantUpdatedTopicConfig2); diff != "" {
+		t.Errorf("UpdateTopic() got: -, want: +\n%s", diff)
+	}
+
+	// Subscription admin operations.
+	newSubsConfig := &SubscriptionConfig{
+		Name:                subscriptionPath,
+		Topic:               topicPath,
+		DeliveryRequirement: DeliverImmediately,
+	}
+
+	gotSubsConfig, err := admin.CreateSubscription(ctx, *newSubsConfig)
+	if err != nil {
+		t.Fatalf("Failed to create subscription: %v", err)
+	}
+	defer cleanUpSubscription(ctx, t, admin, subscriptionPath)
+	if diff := testutil.Diff(gotSubsConfig, newSubsConfig); diff != "" {
+		t.Errorf("CreateSubscription() got: -, want: +\n%s", diff)
+	}
+
+	if gotSubsConfig, err := admin.Subscription(ctx, subscriptionPath); err != nil {
+		t.Errorf("Failed to get subscription: %v", err)
+	} else if diff := testutil.Diff(gotSubsConfig, newSubsConfig); diff != "" {
+		t.Errorf("Subscription() got: -, want: +\n%s", diff)
+	}
+
+	subsIt := admin.Subscriptions(ctx, locationPath)
+	var foundSubs *SubscriptionConfig
+	for {
+		subs, err := subsIt.Next()
+		if err == iterator.Done {
+			break
+		}
+		if testutil.Equal(subs.Name, subscriptionPath) {
+			foundSubs = subs
+			break
+		}
+	}
+	if foundSubs == nil {
+		t.Error("Subscriptions() did not return subscription config")
+	} else if diff := testutil.Diff(foundSubs, gotSubsConfig); diff != "" {
+		t.Errorf("Subscriptions() found config: -, want: +\n%s", diff)
+	}
+
+	if subsPathIt, err := admin.TopicSubscriptions(ctx, topicPath); err != nil {
+		t.Errorf("Failed to list topic subscriptions: %v", err)
+	} else {
+		foundSubsPath := false
+		for {
+			subsPath, err := subsPathIt.Next()
+			if err == iterator.Done {
+				break
+			}
+			if testutil.Equal(subsPath, subscriptionPath) {
+				foundSubsPath = true
+				break
+			}
+		}
+		if !foundSubsPath {
+			t.Error("TopicSubscriptions() did not return subscription path")
+		}
+	}
+
+	subsUpdate := SubscriptionConfigToUpdate{
+		Name:                subscriptionPath,
+		DeliveryRequirement: DeliverAfterStored,
+	}
+	wantUpdatedSubsConfig := &SubscriptionConfig{
+		Name:                subscriptionPath,
+		Topic:               topicPath,
+		DeliveryRequirement: DeliverAfterStored,
+	}
+	if gotSubsConfig, err := admin.UpdateSubscription(ctx, subsUpdate); err != nil {
+		t.Errorf("Failed to update subscription: %v", err)
+	} else if diff := testutil.Diff(gotSubsConfig, wantUpdatedSubsConfig); diff != "" {
+		t.Errorf("UpdateSubscription() got: -, want: +\n%s", diff)
+	}
+}
diff --git a/pubsublite/types.go b/pubsublite/types.go
index a120c49..40104d2 100644
--- a/pubsublite/types.go
+++ b/pubsublite/types.go
@@ -48,6 +48,8 @@
 	Zone string
 
 	// The ID of the Google Pub/Sub Lite topic, for example "my-topic-name".
+	// See https://cloud.google.com/pubsub/docs/admin#resource_names for more
+	// information.
 	TopicID string
 }
 
@@ -55,16 +57,15 @@
 	return fmt.Sprintf("projects/%s/locations/%s/topics/%s", t.Project, t.Zone, t.TopicID)
 }
 
-// Location returns the path of the parent location.
-func (t TopicPath) Location() LocationPath {
+func (t TopicPath) location() LocationPath {
 	return LocationPath{Project: t.Project, Zone: t.Zone}
 }
 
 var topicPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/topics/([^/]+)$`)
 
-// ParseTopicPath parses the full path of a Google Pub/Sub Lite topic, which
+// parseTopicPath parses the full path of a Google Pub/Sub Lite topic, which
 // should have the format: `projects/{project}/locations/{zone}/topics/{id}`.
-func ParseTopicPath(input string) (TopicPath, error) {
+func parseTopicPath(input string) (TopicPath, error) {
 	parts := topicPathRE.FindStringSubmatch(input)
 	if len(parts) < 4 {
 		return TopicPath{}, fmt.Errorf("pubsublite: invalid topic path %q", input)
@@ -87,6 +88,8 @@
 
 	// The ID of the Google Pub/Sub Lite subscription, for example
 	// "my-subscription-name".
+	// See https://cloud.google.com/pubsub/docs/admin#resource_names for more
+	// information.
 	SubscriptionID string
 }
 
@@ -94,17 +97,16 @@
 	return fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", s.Project, s.Zone, s.SubscriptionID)
 }
 
-// Location returns the path of the parent location.
-func (s SubscriptionPath) Location() LocationPath {
+func (s SubscriptionPath) location() LocationPath {
 	return LocationPath{Project: s.Project, Zone: s.Zone}
 }
 
 var subsPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/subscriptions/([^/]+)$`)
 
-// ParseSubscriptionPath parses the full path of a Google Pub/Sub Lite
+// parseSubscriptionPath parses the full path of a Google Pub/Sub Lite
 // subscription, which should have the format:
 // `projects/{project}/locations/{zone}/subscriptions/{id}`.
-func ParseSubscriptionPath(input string) (SubscriptionPath, error) {
+func parseSubscriptionPath(input string) (SubscriptionPath, error) {
 	parts := subsPathRE.FindStringSubmatch(input)
 	if len(parts) < 4 {
 		return SubscriptionPath{}, fmt.Errorf("pubsublite: invalid subscription path %q", input)
@@ -112,10 +114,10 @@
 	return SubscriptionPath{Project: parts[1], Zone: parts[2], SubscriptionID: parts[3]}, nil
 }
 
-// ValidateZone verifies that the `input` string has the format of a valid
+// validateZone verifies that the `input` string has the format of a valid
 // Google Cloud zone. An example zone is "europe-west1-b".
 // See https://cloud.google.com/compute/docs/regions-zones for more information.
-func ValidateZone(input string) error {
+func validateZone(input string) error {
 	parts := strings.Split(input, "-")
 	if len(parts) != 3 {
 		return fmt.Errorf("pubsublite: invalid zone %q", input)
@@ -123,10 +125,10 @@
 	return nil
 }
 
-// ValidateRegion verifies that the `input` string has the format of a valid
+// validateRegion verifies that the `input` string has the format of a valid
 // Google Cloud region. An example region is "europe-west1".
 // See https://cloud.google.com/compute/docs/regions-zones for more information.
-func ValidateRegion(input string) error {
+func validateRegion(input string) error {
 	parts := strings.Split(input, "-")
 	if len(parts) != 2 {
 		return fmt.Errorf("pubsublite: invalid region %q", input)
@@ -136,7 +138,7 @@
 
 // ZoneToRegion returns the region that the given zone is in.
 func ZoneToRegion(zone string) (string, error) {
-	if err := ValidateZone(zone); err != nil {
+	if err := validateZone(zone); err != nil {
 		return "", err
 	}
 	return zone[0:strings.LastIndex(zone, "-")], nil
diff --git a/pubsublite/types_test.go b/pubsublite/types_test.go
index c9a9871..18f02b9 100644
--- a/pubsublite/types_test.go
+++ b/pubsublite/types_test.go
@@ -64,9 +64,9 @@
 		},
 	} {
 		t.Run(tc.desc, func(t *testing.T) {
-			gotPath, gotErr := ParseTopicPath(tc.input)
+			gotPath, gotErr := parseTopicPath(tc.input)
 			if gotPath != tc.wantPath || (gotErr != nil) != tc.wantErr {
-				t.Errorf("ParseTopicPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr)
+				t.Errorf("parseTopicPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr)
 			}
 		})
 	}
@@ -121,9 +121,9 @@
 		},
 	} {
 		t.Run(tc.desc, func(t *testing.T) {
-			gotPath, gotErr := ParseSubscriptionPath(tc.input)
+			gotPath, gotErr := parseSubscriptionPath(tc.input)
 			if gotPath != tc.wantPath || (gotErr != nil) != tc.wantErr {
-				t.Errorf("ParseSubscriptionPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr)
+				t.Errorf("parseSubscriptionPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr)
 			}
 		})
 	}
@@ -152,9 +152,9 @@
 		},
 	} {
 		t.Run(tc.desc, func(t *testing.T) {
-			err := ValidateZone(tc.input)
+			err := validateZone(tc.input)
 			if (err != nil) != tc.wantErr {
-				t.Errorf("ValidateZone(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
+				t.Errorf("validateZone(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
 			}
 		})
 	}
@@ -183,9 +183,9 @@
 		},
 	} {
 		t.Run(tc.desc, func(t *testing.T) {
-			err := ValidateRegion(tc.input)
+			err := validateRegion(tc.input)
 			if (err != nil) != tc.wantErr {
-				t.Errorf("ValidateRegion(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
+				t.Errorf("validateRegion(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
 			}
 		})
 	}