blob: ad7e0907bfeaadf7997910016efc6d70a2e1c526 [file] [log] [blame]
// Copyright 2017 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"context"
"errors"
"sync/atomic"
"golang.org/x/sync/semaphore"
)
// LimitExceededBehavior selects what a flowController does when a request
// would exceed the configured flow control limits: ignore the limits, block
// until capacity is available, or report an error to the caller.
type LimitExceededBehavior int
// Values of LimitExceededBehavior.
const (
	// FlowControlIgnore disables flow control. It is the zero value of
	// LimitExceededBehavior, so it applies when the setting is left unset.
	FlowControlIgnore LimitExceededBehavior = iota
	// FlowControlBlock signals to wait until the request can be made without
	// exceeding the limit.
	FlowControlBlock
	// FlowControlSignalError signals an error to the caller of acquire
	// instead of waiting when a limit would be exceeded.
	FlowControlSignalError
)
// flowControllerPurpose indicates whether a flowController is for a topic or a
// subscription. The flowController uses it to choose which stats are recorded
// for outstanding messages and bytes.
type flowControllerPurpose int
// Values of flowControllerPurpose.
const (
	// flowControllerPurposeSubscription marks a flowController used for a
	// subscription. It is the zero value, so a flowController whose purpose is
	// never set records subscription stats.
	flowControllerPurposeSubscription flowControllerPurpose = iota
	// flowControllerPurposeTopic marks a flowController used for a topic
	// (publishing); publisher stats are recorded for it.
	flowControllerPurposeTopic
)
// FlowControlSettings controls flow control for messages while publishing or subscribing.
type FlowControlSettings struct {
	// MaxOutstandingMessages is the maximum number of buffered messages to be published.
	// If less than or equal to zero, this is disabled.
	MaxOutstandingMessages int
	// MaxOutstandingBytes is the maximum size of buffered messages to be published.
	// If less than or equal to zero, this is disabled.
	//
	// With FlowControlBlock, a single message larger than this limit is
	// accounted as if its size were equal to the limit; with
	// FlowControlSignalError, the full message size is requested.
	MaxOutstandingBytes int
	// LimitExceededBehavior configures the behavior when trying to publish
	// additional messages while the flow controller is full. The available
	// options are FlowControlIgnore (disable, default), FlowControlBlock, and
	// FlowControlSignalError (publish results will return an error).
	LimitExceededBehavior LimitExceededBehavior
}
// Sentinel errors returned by flowController.acquire when the limit behavior
// is FlowControlSignalError and a limit cannot be satisfied immediately.
var (
	// ErrFlowControllerMaxOutstandingMessages indicates that outstanding messages exceeds MaxOutstandingMessages.
	ErrFlowControllerMaxOutstandingMessages = errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded")
	// ErrFlowControllerMaxOutstandingBytes indicates that outstanding bytes of messages exceeds MaxOutstandingBytes.
	ErrFlowControllerMaxOutstandingBytes = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded")
)
// flowController implements flow control for publishing and subscribing.
//
// It limits the number of outstanding messages and their total size using one
// weighted semaphore per limit. A nil semaphore means the corresponding limit
// is disabled.
type flowController struct {
	// maxCount is the configured MaxOutstandingMessages.
	maxCount int
	maxSize int // max total size of messages
	semCount, semSize *semaphore.Weighted // enforces max number and size of messages
	// Number of calls to acquire - number of calls to release, counting only
	// calls made while semCount != nil (the counter is not updated otherwise).
	// NOTE(review): an earlier version of this comment said the value can go
	// negative when semCount == nil; the code in this file never modifies it
	// in that case — confirm.
	// Atomic.
	countRemaining int64
	// Running total of the bounded sizes (see bound) of messages that were
	// acquired and not yet released. Only updated while semSize != nil.
	// Atomic.
	bytesRemaining int64
	// limitBehavior selects how acquire reacts when a limit is reached.
	limitBehavior LimitExceededBehavior
	// purpose selects which stats are recorded (topic vs. subscription).
	purpose flowControllerPurpose
}
// newFlowController builds a flowController from the given settings.
//
// A semaphore is created for the message count only when
// fc.MaxOutstandingMessages is positive, and for the byte size only when
// fc.MaxOutstandingBytes is positive; a non-positive limit leaves the
// corresponding semaphore nil, which means that dimension is unlimited.
// The purpose field is left at its zero value.
func newFlowController(fc FlowControlSettings) flowController {
	var countSem, sizeSem *semaphore.Weighted
	if limit := fc.MaxOutstandingMessages; limit > 0 {
		countSem = semaphore.NewWeighted(int64(limit))
	}
	if limit := fc.MaxOutstandingBytes; limit > 0 {
		sizeSem = semaphore.NewWeighted(int64(limit))
	}
	return flowController{
		maxCount:      fc.MaxOutstandingMessages,
		maxSize:       fc.MaxOutstandingBytes,
		semCount:      countSem,
		semSize:       sizeSem,
		limitBehavior: fc.LimitExceededBehavior,
	}
}
// newTopicFlowController returns a flowController configured from fc whose
// purpose is set to flowControllerPurposeTopic, so publisher stats are
// recorded for it.
func newTopicFlowController(fc FlowControlSettings) flowController {
	ctrl := newFlowController(fc)
	ctrl.purpose = flowControllerPurposeTopic
	return ctrl
}
// newSubscriptionFlowController returns a flowController configured from fc
// whose purpose is set to flowControllerPurposeSubscription.
func newSubscriptionFlowController(fc FlowControlSettings) flowController {
	ctrl := newFlowController(fc)
	ctrl.purpose = flowControllerPurposeSubscription
	return ctrl
}
// acquire reserves capacity for one message of the given size: one unit of
// the message-count limit and size bytes of the byte limit. Limits whose
// semaphore is nil are skipped.
//
// Behavior depends on limitBehavior:
//   - FlowControlIgnore: nothing is reserved or recorded; nil is returned.
//   - FlowControlBlock: waits on each semaphore using ctx and returns the
//     semaphore's error if the wait fails. A message larger than maxSize is
//     treated as if its size were maxSize.
//   - FlowControlSignalError: never waits. Returns
//     ErrFlowControllerMaxOutstandingMessages or
//     ErrFlowControllerMaxOutstandingBytes when the respective reservation
//     cannot be made immediately. The full message size is requested, so a
//     message larger than maxSize results in an error.
//
// If the byte reservation fails after the count reservation succeeded, the
// count reservation is returned before the error is reported. On success the
// outstanding counters are updated and recorded as stats.
func (f *flowController) acquire(ctx context.Context, size int) error {
	if f.limitBehavior == FlowControlIgnore {
		return nil
	}

	countLimited := f.semCount != nil
	sizeLimited := f.semSize != nil

	switch f.limitBehavior {
	case FlowControlBlock:
		if countLimited {
			if err := f.semCount.Acquire(ctx, 1); err != nil {
				return err
			}
		}
		if sizeLimited {
			if err := f.semSize.Acquire(ctx, f.bound(size)); err != nil {
				// Undo the count reservation so it is not leaked.
				if countLimited {
					f.semCount.Release(1)
				}
				return err
			}
		}
	case FlowControlSignalError:
		if countLimited && !f.semCount.TryAcquire(1) {
			return ErrFlowControllerMaxOutstandingMessages
		}
		// The unbounded size is requested on purpose here.
		if sizeLimited && !f.semSize.TryAcquire(int64(size)) {
			// Undo the count reservation so it is not leaked.
			if countLimited {
				f.semCount.Release(1)
			}
			return ErrFlowControllerMaxOutstandingBytes
		}
	}

	// Reservation succeeded: update the outstanding counters and report them.
	if countLimited {
		f.recordOutstandingMessages(ctx, atomic.AddInt64(&f.countRemaining, 1))
	}
	if sizeLimited {
		f.recordOutstandingBytes(ctx, atomic.AddInt64(&f.bytesRemaining, f.bound(size)))
	}
	return nil
}
// release returns the capacity held by one message of the given size.
//
// It is a no-op when limitBehavior is FlowControlIgnore. Otherwise, for each
// enabled limit it first decrements and records the outstanding counter and
// then releases the semaphore. The byte amount released is the message size
// capped at maxSize.
func (f *flowController) release(ctx context.Context, size int) {
	if f.limitBehavior == FlowControlIgnore {
		return
	}
	if f.semCount != nil {
		f.recordOutstandingMessages(ctx, atomic.AddInt64(&f.countRemaining, -1))
		f.semCount.Release(1)
	}
	if f.semSize != nil {
		freed := f.bound(size)
		f.recordOutstandingBytes(ctx, atomic.AddInt64(&f.bytesRemaining, -freed))
		f.semSize.Release(freed)
	}
}
// bound caps size at f.maxSize and returns the result as an int64. Sizes that
// do not exceed maxSize are returned unchanged.
func (f *flowController) bound(size int) int64 {
	capped := size
	if f.maxSize < capped {
		capped = f.maxSize
	}
	return int64(capped)
}
// count reports the current number of outstanding messages by atomically
// reading countRemaining. The counter is only updated while a message-count
// limit is in effect, so with no such limit this always returns 0.
func (f *flowController) count() int {
	outstanding := atomic.LoadInt64(&f.countRemaining)
	return int(outstanding)
}
// recordOutstandingMessages reports n as the current number of outstanding
// messages. Topic flow controllers record it under
// PublisherOutstandingMessages; all others record it under
// OutstandingMessages.
func (f *flowController) recordOutstandingMessages(ctx context.Context, n int64) {
	switch f.purpose {
	case flowControllerPurposeTopic:
		recordStat(ctx, PublisherOutstandingMessages, n)
	default:
		recordStat(ctx, OutstandingMessages, n)
	}
}
// recordOutstandingBytes reports n as the current number of outstanding
// bytes. Topic flow controllers record it under PublisherOutstandingBytes;
// all others record it under OutstandingBytes.
func (f *flowController) recordOutstandingBytes(ctx context.Context, n int64) {
	switch f.purpose {
	case flowControllerPurposeTopic:
		recordStat(ctx, PublisherOutstandingBytes, n)
	default:
		recordStat(ctx, OutstandingBytes, n)
	}
}