blob: 9c3e6dc277d16d53c82c28f212b260d6cfb31b86 [file] [log] [blame] [edit]
package sync
import (
"errors"
"sync"
"github.com/pkg/sftp/v2/internal/pragma"
)
// SlicePool is a set of temporary slices that may be individually saved and retrieved.
// It is intended to mirror [sync.Pool], except it has been specifically designed to meet the needs of pkg/sftp.
//
// Any slice stored in the SlicePool will be held onto indefinitely,
// and slices are returned for reuse in a round-robin order.
//
// A SlicePool is safe for use by multiple goroutines simultaneously.
//
// SlicePool's purpose is to cache allocated but unused slices for later reuse,
// relieving pressure on the garbage collector and amortizing allocation overhead.
//
// Unlike the standard library Pool, it is suitable to act as a free list of short-lived slices,
// since the free list is maintained as a channel, and thus has fairly low overhead.
type SlicePool[S []T, T any] struct {
	noCopy pragma.DoNotCopy // presumably flags by-value copies to `go vet` — see internal/pragma to confirm.

	metrics // embedded hit/miss accounting; Get records hit() or miss().

	ch     chan S // free list: buffered channel of reusable slices.
	length int    // cull length: Put discards slices whose capacity exceeds this.
}
// NewSlicePool returns a [SlicePool] set to hold onto depth number of items,
// and discard any slice with a capacity greater than the cull length.
//
// It will panic if given a negative depth, the same as making a negative-buffer channel.
// It will also panic if given a zero or negative cull length.
func NewSlicePool[S []T, T any](depth, cullLength int) *SlicePool[S, T] {
	if cullLength <= 0 {
		// Name this constructor in the message; the previous text referenced
		// a "bufPool" identifier that does not exist in this package.
		panic("sftp: NewSlicePool: cull length must be greater than zero")
	}

	return &SlicePool[S, T]{
		ch:     make(chan S, depth), // panics here if depth < 0, as documented.
		length: cullLength,
	}
}
// Get retrieves a slice from the pool, sets the length to the capacity, and then returns it to the caller.
// If the pool is empty, it will return a nil slice.
//
// A nil SlicePool is treated as an empty pool,
// that is, it returns only nil slices.
func (p *SlicePool[S, T]) Get() S {
	if p == nil {
		// Nil pool: nothing cached, nothing to hand out.
		return nil
	}

	var s S
	select {
	case s = <-p.ch:
		p.hit()
	default:
		p.miss()
		// Return nil rather than allocating here, so the caller
		// (e.g. a ReadFrom) can allocate exactly the size it needs.
		return nil
	}

	// Hand the slice back re-extended to its full capacity.
	return s[:cap(s)]
}
// Put adds the slice to the pool, if there is capacity in the pool,
// and if the capacity of the slice is less than the culling length.
//
// A nil SlicePool is treated as a pool with no capacity.
func (p *SlicePool[S, T]) Put(s S) {
	if p == nil {
		// No pool, no reuse.
		return
	}

	if cap(s) > p.length {
		// Never retain oversized buffers: holding onto them
		// indefinitely would pin excessive memory.
		return
	}

	// Non-blocking send: if the pool is already full, drop the slice.
	select {
	case p.ch <- s:
	default:
	}
}
// Pool is a set of temporary items that may be individually saved and retrieved.
// It is intended to mirror [sync.Pool], except it has been specifically designed to meet the needs of pkg/sftp.
//
// Any item stored in the Pool will be held onto indefinitely,
// and items are returned for reuse in a round-robin order.
//
// A Pool is safe for use by multiple goroutines simultaneously.
//
// Pool's purpose is to cache allocated but unused items for later reuse,
// relieving pressure on the garbage collector and amortizing allocation overhead.
//
// Unlike the standard library Pool, it is suitable to act as a free list of short-lived items,
// since the free list is maintained as a channel, and thus has fairly low overhead.
type Pool[T any] struct {
	noCopy pragma.DoNotCopy // presumably flags by-value copies to `go vet` — see internal/pragma to confirm.

	metrics // embedded hit/miss accounting; Get records hit() or miss().

	ch chan *T // free list: buffered channel of reusable items.
}
// NewPool returns a [Pool] set to hold onto depth number of pointers to the given type.
//
// It will panic if given a negative depth, the same as making a negative-buffer channel.
func NewPool[T any](depth int) *Pool[T] {
	p := new(Pool[T])
	p.ch = make(chan *T, depth) // panics here if depth < 0, as documented.
	return p
}
// Get retrieves an item from the pool, and then returns it to the caller.
// If the pool is empty, it will return a pointer to a newly allocated item.
//
// A nil Pool is treated as an empty pool,
// that is, it always returns a pointer to a newly allocated item.
func (p *Pool[T]) Get() *T {
	if p == nil {
		// Nil pool: always allocate fresh.
		return new(T)
	}

	var v *T
	select {
	case v = <-p.ch:
		p.hit()
	default:
		p.miss()
		v = new(T)
	}
	return v
}
// Put adds the given pointer to item to the pool, if there is capacity in the pool.
// The pointed-to item is shallowly zeroed before being pooled, so stale state
// never leaks to a later Get caller.
//
// A nil Pool is treated as a pool with no capacity.
// A nil v is ignored: it cannot be zeroed, and a nil pointer in the pool
// would be useless (or hazardous) to a later Get caller.
func (p *Pool[T]) Put(v *T) {
	if p == nil || v == nil {
		// functional default: no reuse
		return
	}

	var zero T
	*v = zero // shallow zero.

	// Non-blocking send: if the pool is already full, drop the item.
	select {
	case p.ch <- v:
	default:
	}
}
// WorkPool is a set of temporary work channels that can co-ordinate returns of work done among goroutines.
// It is intended to mimic [sync.Pool], except it has been specifically designed to meet the needs of pkg/sftp.
//
// A WorkPool will be filled to capacity at creation with work channels of the given type and a buffer of 1.
// It will track channels that have been handed out through Get,
// blocking on Close until all of them have been returned.
//
// WorkPool's purpose is also to block allocate work channels for reuse during concurrent transfers,
// relieving pressure on the garbage collector and amortizing allocation overhead.
// While also co-ordinating outstanding work, so the caller can wait for all work to be complete.
type WorkPool[T any] struct {
	wg sync.WaitGroup // counts checked-out work channels; Close waits on it.
	ch chan chan T    // pool of work channels, each buffered to hold a single item.
}
// NewWorkPool returns a [WorkPool] set to hold onto depth number of channels of the given type.
//
// It will panic if given a negative depth, the same as making a negative-buffer channel.
func NewWorkPool[T any](depth int) *WorkPool[T] {
	p := &WorkPool[T]{
		ch: make(chan chan T, depth), // panics here if depth < 0, as documented.
	}

	// Pre-fill the pool to capacity: exactly depth work channels,
	// each buffered to hold a single item.
	for i := 0; i < depth; i++ {
		p.ch <- make(chan T, 1)
	}

	return p
}
// Close closes the [WorkPool] to all further Get request.
// Close then waits for all outstanding channels to be returned to the pool.
//
// After calling Close, all calls to Get will return a nil work channel and false.
//
// After Close returns, the pool will be empty,
// and all work channels will have been discarded and ready for the garbage collector.
//
// It is an error not a panic to close a nil WorkPool.
// However, Close will panic if called more than once.
//
// NOTE(review): once p.ch is closed, a concurrent Put attempting to return a
// checked-out channel would send on a closed channel, which panics — and then
// wg.Done is never called, so the wg.Wait below could hang. This appears to rely
// on callers completing all Puts before Close is invoked; confirm the intended
// Close/Put ordering contract.
func (p *WorkPool[T]) Close() error {
	if p == nil {
		return errors.New("cannot close nil work pool")
	}

	// Closing the channel makes all subsequent Get calls observe ok == false.
	close(p.ch)

	// Wait for every checked-out work channel to be handed back via Put.
	p.wg.Wait()

	for range p.ch {
		// drain the pool and drop them on all on the ground for GC.
	}

	return nil
}
// Get retrieves a work channel from the pool, and then returns it to the caller,
// or it returns a nil channel, and false if the [WorkPool] has been closed.
//
// If no work channels are available, it will block until a work channel has been returned to the pool.
//
// A nil WorkPool will simply always return a new work channel and true.
func (p *WorkPool[T]) Get() (chan T, bool) {
	if p == nil {
		// Nil pool: hand out fresh, untracked work channels.
		return make(chan T, 1), true
	}

	work, ok := <-p.ch
	if !ok {
		// The pool has been closed; nothing to hand out.
		return nil, false
	}

	// Track the checked-out channel so Close can wait for its return.
	p.wg.Add(1)
	return work, true
}
// Put returns the given work channel to the pool.
//
// Put panics if an attempt is made to return more work channels to the pool than the capacity of the pool.
//
// A nil WorkPool will simply discard work channels.
//
// NOTE(review): if the pool has already been closed, the send below panics with
// "send on closed channel" — confirm Put is never raced with Close.
func (p *WorkPool[T]) Put(v chan T) {
	if p == nil {
		// functional default: no reuse
		return
	}

	select {
	case p.ch <- v:
		p.wg.Done()
	default:
		// This is an overfill, which shouldn't happen, but just in case...
		panic("worker pool overfill")
	}
}