| // Copyright 2016 Google Inc. All Rights Reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package pubsub |
| |
| import ( |
| "errors" |
| "fmt" |
| "strings" |
| "time" |
| |
| "cloud.google.com/go/iam" |
| "golang.org/x/net/context" |
| ) |
| |
| // The default period for which to automatically extend Message acknowledgement deadlines. |
| const DefaultMaxExtension = 10 * time.Minute |
| |
| // The default maximum number of messages that are prefetched from the server. |
| const DefaultMaxPrefetch = 100 |
| |
| // Subscription is a reference to a PubSub subscription. |
| type Subscription struct { |
| s service |
| |
| // The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>" |
| name string |
| } |
| |
| // Subscription creates a reference to a subscription. |
| func (c *Client) Subscription(id string) *Subscription { |
| return &Subscription{ |
| s: c.s, |
| name: fmt.Sprintf("projects/%s/subscriptions/%s", c.projectID, id), |
| } |
| } |
| |
| // String returns the globally unique printable name of the subscription. |
| func (s *Subscription) String() string { |
| return s.name |
| } |
| |
| // ID returns the unique identifier of the subscription within its project. |
| func (s *Subscription) ID() string { |
| slash := strings.LastIndex(s.name, "/") |
| if slash == -1 { |
| // name is not a fully-qualified name. |
| panic("bad subscription name") |
| } |
| return s.name[slash+1:] |
| } |
| |
| // Subscriptions returns an iterator which returns all of the subscriptions for the client's project. |
| func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator { |
| return &SubscriptionIterator{ |
| s: c.s, |
| next: c.s.listProjectSubscriptions(ctx, c.fullyQualifiedProjectName()), |
| } |
| } |
| |
| // SubscriptionIterator is an iterator that returns a series of subscriptions. |
| type SubscriptionIterator struct { |
| s service |
| next nextStringFunc |
| } |
| |
| // Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned. |
| func (subs *SubscriptionIterator) Next() (*Subscription, error) { |
| subName, err := subs.next() |
| if err != nil { |
| return nil, err |
| } |
| return &Subscription{s: subs.s, name: subName}, nil |
| } |
| |
| // PushConfig contains configuration for subscriptions that operate in push mode. |
| type PushConfig struct { |
| // A URL locating the endpoint to which messages should be pushed. |
| Endpoint string |
| |
| // Endpoint configuration attributes. See https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions#PushConfig.FIELDS.attributes for more details. |
| Attributes map[string]string |
| } |
| |
| // Subscription config contains the configuration of a subscription. |
| type SubscriptionConfig struct { |
| Topic *Topic |
| PushConfig PushConfig |
| |
| // The default maximum time after a subscriber receives a message before |
| // the subscriber should acknowledge the message. Note: messages which are |
| // obtained via a MessageIterator need not be acknowledged within this |
| // deadline, as the deadline will be automatically extended. |
| AckDeadline time.Duration |
| } |
| |
| // Delete deletes the subscription. |
| func (s *Subscription) Delete(ctx context.Context) error { |
| return s.s.deleteSubscription(ctx, s.name) |
| } |
| |
| // Exists reports whether the subscription exists on the server. |
| func (s *Subscription) Exists(ctx context.Context) (bool, error) { |
| return s.s.subscriptionExists(ctx, s.name) |
| } |
| |
| // Config fetches the current configuration for the subscription. |
| func (s *Subscription) Config(ctx context.Context) (*SubscriptionConfig, error) { |
| conf, topicName, err := s.s.getSubscriptionConfig(ctx, s.name) |
| if err != nil { |
| return nil, err |
| } |
| conf.Topic = &Topic{ |
| s: s.s, |
| name: topicName, |
| } |
| return conf, nil |
| } |
| |
| // Pull returns a MessageIterator that can be used to fetch Messages. The MessageIterator |
| // will automatically extend the ack deadline of all fetched Messages, for the |
| // period specified by DefaultMaxExtension. This may be overridden by supplying |
| // a MaxExtension pull option. |
| // |
| // If ctx is cancelled or exceeds its deadline, outstanding acks or deadline |
| // extensions will fail. |
| // |
| // The caller must call Stop on the MessageIterator once finished with it. |
| func (s *Subscription) Pull(ctx context.Context, opts ...PullOption) (*MessageIterator, error) { |
| config, err := s.Config(ctx) |
| if err != nil { |
| return nil, err |
| } |
| po := processPullOptions(opts) |
| po.ackDeadline = config.AckDeadline |
| return newMessageIterator(ctx, s.s, s.name, po), nil |
| } |
| |
| // ModifyPushConfig updates the endpoint URL and other attributes of a push subscription. |
| func (s *Subscription) ModifyPushConfig(ctx context.Context, conf *PushConfig) error { |
| if conf == nil { |
| return errors.New("must supply non-nil PushConfig") |
| } |
| |
| return s.s.modifyPushConfig(ctx, s.name, conf) |
| } |
| |
| func (s *Subscription) IAM() *iam.Handle { |
| return s.s.iamHandle(s.name) |
| } |
| |
| // A PullOption is an optional argument to Subscription.Pull. |
| type PullOption interface { |
| setOptions(o *pullOptions) |
| } |
| |
| type pullOptions struct { |
| // maxExtension is the maximum period for which the iterator should |
| // automatically extend the ack deadline for each message. |
| maxExtension time.Duration |
| |
| // maxPrefetch is the maximum number of Messages to have in flight, to |
| // be returned by MessageIterator.Next. |
| maxPrefetch int32 |
| |
| // ackDeadline is the default ack deadline for the subscription. Not |
| // configurable via a PullOption. |
| ackDeadline time.Duration |
| } |
| |
| func processPullOptions(opts []PullOption) *pullOptions { |
| po := &pullOptions{ |
| maxExtension: DefaultMaxExtension, |
| maxPrefetch: DefaultMaxPrefetch, |
| } |
| |
| for _, o := range opts { |
| o.setOptions(po) |
| } |
| |
| return po |
| } |
| |
| type maxPrefetch int32 |
| |
| func (max maxPrefetch) setOptions(o *pullOptions) { |
| if o.maxPrefetch = int32(max); o.maxPrefetch < 1 { |
| o.maxPrefetch = 1 |
| } |
| } |
| |
| // MaxPrefetch returns a PullOption that limits Message prefetching. |
| // |
| // For performance reasons, the pubsub library may prefetch a pool of Messages |
| // to be returned serially from MessageIterator.Next. MaxPrefetch is used to limit the |
| // the size of this pool. |
| // |
| // If num is less than 1, it will be treated as if it were 1. |
| func MaxPrefetch(num int) PullOption { |
| return maxPrefetch(trunc32(int64(num))) |
| } |
| |
| type maxExtension time.Duration |
| |
| func (max maxExtension) setOptions(o *pullOptions) { |
| if o.maxExtension = time.Duration(max); o.maxExtension < 0 { |
| o.maxExtension = 0 |
| } |
| } |
| |
| // MaxExtension returns a PullOption that limits how long acks deadlines are |
| // extended for. |
| // |
| // A MessageIterator will automatically extend the ack deadline of all fetched |
| // Messages for the duration specified. Automatic deadline extension may be |
| // disabled by specifying a duration of 0. |
| func MaxExtension(duration time.Duration) PullOption { |
| return maxExtension(duration) |
| } |
| |
| // CreateSubscription creates a new subscription on a topic. |
| // |
| // name is the name of the subscription to create. It must start with a letter, |
| // and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), |
| // underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It |
| // must be between 3 and 255 characters in length, and must not start with |
| // "goog". |
| // |
| // topic is the topic from which the subscription should receive messages. It |
| // need not belong to the same project as the subscription. |
| // |
| // ackDeadline is the maximum time after a subscriber receives a message before |
| // the subscriber should acknowledge the message. It must be between 10 and 600 |
| // seconds (inclusive), and is rounded down to the nearest second. If the |
| // provided ackDeadline is 0, then the default value of 10 seconds is used. |
| // Note: messages which are obtained via a MessageIterator need not be |
| // acknowledged within this deadline, as the deadline will be automatically |
| // extended. |
| // |
| // pushConfig may be set to configure this subscription for push delivery. |
| // |
| // If the subscription already exists an error will be returned. |
| func (c *Client) CreateSubscription(ctx context.Context, id string, topic *Topic, ackDeadline time.Duration, pushConfig *PushConfig) (*Subscription, error) { |
| if ackDeadline == 0 { |
| ackDeadline = 10 * time.Second |
| } |
| if d := ackDeadline.Seconds(); d < 10 || d > 600 { |
| return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d) |
| } |
| |
| sub := c.Subscription(id) |
| err := c.s.createSubscription(ctx, topic.name, sub.name, ackDeadline, pushConfig) |
| return sub, err |
| } |