blob: 3177ecfa38f399ec3475c77637226142600894eb [file] [log] [blame]
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedwriter
import (
"context"
"sync/atomic"
"golang.org/x/sync/semaphore"
)
// Flow controller for write API. Adapted from pubsub.
type flowController struct {
// The max number of pending write requests.
maxInsertCount int
// The max pending request bytes.
maxInsertBytes int
// Semaphores for governing pending inserts.
semInsertCount, semInsertBytes *semaphore.Weighted
countRemaining int64 // Atomic.
}
func newFlowController(maxInserts, maxInsertBytes int) *flowController {
fc := &flowController{
maxInsertCount: maxInserts,
maxInsertBytes: maxInsertBytes,
semInsertCount: nil,
semInsertBytes: nil,
}
if maxInserts > 0 {
fc.semInsertCount = semaphore.NewWeighted(int64(maxInserts))
}
if maxInsertBytes > 0 {
fc.semInsertBytes = semaphore.NewWeighted(int64(maxInsertBytes))
}
return fc
}
// acquire blocks until one insert of size bytes can proceed or ctx is done.
// It returns nil in the first case, or ctx.Err() in the second.
//
// acquire allows large messages to proceed by treating a size greater than maxSize
// as if it were equal to maxSize.
func (fc *flowController) acquire(ctx context.Context, sizeBytes int) error {
if fc.semInsertCount != nil {
if err := fc.semInsertCount.Acquire(ctx, 1); err != nil {
return err
}
}
if fc.semInsertBytes != nil {
if err := fc.semInsertBytes.Acquire(ctx, fc.bound(sizeBytes)); err != nil {
if fc.semInsertCount != nil {
fc.semInsertCount.Release(1)
}
return err
}
}
atomic.AddInt64(&fc.countRemaining, 1)
return nil
}
// tryAcquire returns false if acquire would block. Otherwise, it behaves like
// acquire and returns true.
//
// tryAcquire allows large inserts to proceed by treating a size greater than
// maxSize as if it were equal to maxSize.
func (fc *flowController) tryAcquire(sizeBytes int) bool {
if fc.semInsertCount != nil {
if !fc.semInsertCount.TryAcquire(1) {
return false
}
}
if fc.semInsertBytes != nil {
if !fc.semInsertBytes.TryAcquire(fc.bound(sizeBytes)) {
if fc.semInsertCount != nil {
fc.semInsertCount.Release(1)
}
return false
}
}
atomic.AddInt64(&fc.countRemaining, 1)
return true
}
func (fc *flowController) release(sizeBytes int) {
atomic.AddInt64(&fc.countRemaining, -1)
if fc.semInsertCount != nil {
fc.semInsertCount.Release(1)
}
if fc.semInsertBytes != nil {
fc.semInsertBytes.Release(fc.bound(sizeBytes))
}
}
// bound normalizes input size to maxInsertBytes if it exceeds the limit.
func (fc *flowController) bound(sizeBytes int) int64 {
if sizeBytes > fc.maxInsertBytes {
return int64(fc.maxInsertBytes)
}
return int64(sizeBytes)
}
func (fc *flowController) count() int {
return int(atomic.LoadInt64(&fc.countRemaining))
}