blob: 994b90ff1040a9ddc53ab5512f6a440773a78780 [file] [log] [blame]
package memberlist
import (
"sort"
"sync"
)
// TransmitLimitedQueue is used to queue messages to broadcast to
// the cluster (via gossip) but limits the number of transmits per
// message. It also prioritizes messages with lower transmit counts
// (hence newer messages).
//
// The queue's methods take the embedded mutex before touching the
// pending broadcasts, and they invoke Broadcast callbacks while the
// mutex is held.
type TransmitLimitedQueue struct {
	// NumNodes returns the number of nodes in the cluster. This is
	// used to determine the retransmit count, which is calculated
	// based on the log of this. GetBroadcasts calls it whenever the
	// queue is non-empty, so it must be non-nil by then.
	NumNodes func() int
	// RetransmitMult is the multiplier used to determine the maximum
	// number of retransmissions attempted.
	RetransmitMult int
	// Mutex guards bcQueue. Because it is embedded, Lock and Unlock
	// are also part of the queue's method set.
	sync.Mutex
	// bcQueue holds the pending broadcasts. New broadcasts are appended
	// at the end, and GetBroadcasts re-sorts the slice by descending
	// transmit count after handing out messages.
	bcQueue limitedBroadcasts
}
// limitedBroadcast pairs a queued Broadcast with the number of times
// its message has been handed out for transmission.
type limitedBroadcast struct {
	transmits int       // Number of transmissions attempted.
	b         Broadcast // The broadcast being tracked.
}
// limitedBroadcasts is a list of tracked broadcasts. Its Len, Less and
// Swap methods order it by ascending transmit count, and its Sort
// method applies the reverse of that order.
type limitedBroadcasts []*limitedBroadcast
// Broadcast is something that can be broadcasted via gossip to
// the memberlist cluster.
//
// TransmitLimitedQueue invokes these methods while holding its own
// lock, so an implementation must not call back into the methods of
// the queue it is stored in.
type Broadcast interface {
	// Invalidates checks if enqueuing the current broadcast
	// invalidates a previous broadcast. The queue calls it on the
	// broadcast being enqueued, passing each already-queued broadcast
	// as b; a true result causes b to be finished and removed.
	Invalidates(b Broadcast) bool
	// Message returns a byte form of the message. The queue uses its
	// length, plus a per-message overhead, to enforce the byte limit
	// when selecting broadcasts to send.
	Message() []byte
	// Finished is invoked when the message will no longer
	// be broadcast, either due to invalidation or to the
	// transmit limit being reached. The queue also invokes it for
	// broadcasts discarded when the queue is reset or pruned.
	Finished()
}
// QueueBroadcast is used to enqueue a broadcast. Every broadcast already
// in the queue that b invalidates has Finished called on it and is
// removed; b is then appended with a transmit count of zero.
//
// Invalidates and Finished are called while the queue lock is held.
func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) {
	q.Lock()
	defer q.Unlock()

	// Check if this message invalidates any queued ones. The index only
	// advances when the current element is kept: after a removal the next
	// candidate has slid into position i, and stepping past it would leave
	// an invalidated broadcast in the queue.
	for i := 0; i < len(q.bcQueue); {
		if !b.Invalidates(q.bcQueue[i].b) {
			i++
			continue
		}
		q.bcQueue[i].b.Finished()

		// Close the gap while preserving order, and clear the vacated
		// tail slot so the removed broadcast can be garbage collected.
		last := len(q.bcQueue) - 1
		copy(q.bcQueue[i:], q.bcQueue[i+1:])
		q.bcQueue[last] = nil
		q.bcQueue = q.bcQueue[:last]
	}

	// Append to the queue
	q.bcQueue = append(q.bcQueue, &limitedBroadcast{0, b})
}
// GetBroadcasts selects queued messages to send. Each selected message
// is charged overhead plus its own length, and the running total never
// exceeds limit. Candidates are examined from the end of the queue
// toward the front; one that does not fit is skipped rather than ending
// the scan.
//
// Every selected broadcast has its transmit count incremented. Once the
// count reaches the limit derived from RetransmitMult and NumNodes, the
// broadcast is finished and dropped from the queue. It returns nil when
// nothing is queued or nothing fits.
func (q *TransmitLimitedQueue) GetBroadcasts(overhead, limit int) [][]byte {
	q.Lock()
	defer q.Unlock()

	// Nothing queued: return before consulting NumNodes.
	if len(q.bcQueue) == 0 {
		return nil
	}

	maxTransmits := retransmitLimit(q.RetransmitMult, q.NumNodes())

	var (
		picked [][]byte
		used   int
	)
	for idx := len(q.bcQueue) - 1; idx >= 0; idx-- {
		entry := q.bcQueue[idx]
		payload := entry.b.Message()
		cost := overhead + len(payload)

		// Too large for the remaining budget; an earlier entry may
		// still fit, so keep scanning.
		if used+cost > limit {
			continue
		}
		used += cost
		picked = append(picked, payload)

		entry.transmits++
		if entry.transmits < maxTransmits {
			continue
		}

		// Transmit limit reached: retire the broadcast. The last slot
		// holds an entry this backwards scan has already visited, so
		// moving it into idx cannot cause anything to be skipped.
		entry.b.Finished()
		last := len(q.bcQueue) - 1
		q.bcQueue[idx], q.bcQueue[last] = q.bcQueue[last], nil
		q.bcQueue = q.bcQueue[:last]
	}

	// Transmit counts changed, so restore the queue ordering.
	if len(picked) > 0 {
		q.bcQueue.Sort()
	}
	return picked
}
// NumQueued reports how many broadcasts are currently waiting in the
// queue. The count is read under the queue lock.
func (q *TransmitLimitedQueue) NumQueued() int {
	q.Lock()
	queued := len(q.bcQueue)
	q.Unlock()
	return queued
}
// Reset empties the queue. Finished is called on every pending
// broadcast, in queue order, before the queue is discarded.
func (q *TransmitLimitedQueue) Reset() {
	q.Lock()
	defer q.Unlock()

	pending := q.bcQueue
	for i := 0; i < len(pending); i++ {
		pending[i].b.Finished()
	}
	q.bcQueue = nil
}
// Prune will retain the maxRetain latest messages, and the rest
// will be discarded. This can be used to prevent unbounded queue sizes.
//
// The retained messages are the last maxRetain entries of the queue;
// Finished is called on every entry in front of them. A negative
// maxRetain is treated as zero, so the whole queue is discarded.
func (q *TransmitLimitedQueue) Prune(maxRetain int) {
	q.Lock()
	defer q.Unlock()

	// A negative limit would otherwise index past the end of the queue.
	if maxRetain < 0 {
		maxRetain = 0
	}

	// Do nothing if the queue already fits within the limit
	n := len(q.bcQueue)
	if n <= maxRetain {
		return
	}

	// Invalidate the messages we will be removing
	drop := n - maxRetain
	for i := 0; i < drop; i++ {
		q.bcQueue[i].b.Finished()
	}

	// Move the messages, and retain only the last maxRetain
	copy(q.bcQueue, q.bcQueue[drop:])

	// Clear the vacated tail so the backing array no longer references
	// the discarded broadcasts and they can be garbage collected.
	for i := maxRetain; i < n; i++ {
		q.bcQueue[i] = nil
	}
	q.bcQueue = q.bcQueue[:maxRetain]
}
// Len reports the number of broadcasts in the list. Together with Less
// and Swap it lets the list be sorted with the sort package.
func (b limitedBroadcasts) Len() int {
	count := len(b)
	return count
}
// Less reports whether the broadcast at index i has been transmitted
// fewer times than the one at index j.
func (b limitedBroadcasts) Less(i, j int) bool {
	left, right := b[i], b[j]
	return right.transmits > left.transmits
}
// Swap exchanges the broadcasts at indexes i and j.
func (b limitedBroadcasts) Swap(i, j int) {
	held := b[i]
	b[i] = b[j]
	b[j] = held
}
// Sort orders the list in place by descending transmit count, leaving
// the least-transmitted broadcasts at the end of the list.
func (b limitedBroadcasts) Sort() {
	var descending sort.Interface = sort.Reverse(b)
	sort.Sort(descending)
}