| package memberlist |
| |
| import ( |
| "sort" |
| "sync" |
| ) |
| |
| // TransmitLimitedQueue is used to queue messages to broadcast to |
| // the cluster (via gossip) but limits the number of transmits per |
| // message. It also prioritizes messages with lower transmit counts |
| // (hence newer messages). |
| type TransmitLimitedQueue struct { |
| // NumNodes returns the number of nodes in the cluster. This is |
| // used to determine the retransmit count, which is calculated |
| // based on the log of this. |
| NumNodes func() int |
| |
| // RetransmitMult is the multiplier used to determine the maximum |
| // number of retransmissions attempted. |
| RetransmitMult int |
| |
| sync.Mutex |
| bcQueue limitedBroadcasts |
| } |
| |
| type limitedBroadcast struct { |
| transmits int // Number of transmissions attempted. |
| b Broadcast |
| } |
| type limitedBroadcasts []*limitedBroadcast |
| |
| // Broadcast is something that can be broadcasted via gossip to |
| // the memberlist cluster. |
| type Broadcast interface { |
| // Invalidates checks if enqueuing the current broadcast |
| // invalidates a previous broadcast |
| Invalidates(b Broadcast) bool |
| |
| // Returns a byte form of the message |
| Message() []byte |
| |
| // Finished is invoked when the message will no longer |
| // be broadcast, either due to invalidation or to the |
| // transmit limit being reached |
| Finished() |
| } |
| |
| // QueueBroadcast is used to enqueue a broadcast |
| func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) { |
| q.Lock() |
| defer q.Unlock() |
| |
| // Check if this message invalidates another |
| n := len(q.bcQueue) |
| for i := 0; i < n; i++ { |
| if b.Invalidates(q.bcQueue[i].b) { |
| q.bcQueue[i].b.Finished() |
| copy(q.bcQueue[i:], q.bcQueue[i+1:]) |
| q.bcQueue[n-1] = nil |
| q.bcQueue = q.bcQueue[:n-1] |
| n-- |
| } |
| } |
| |
| // Append to the queue |
| q.bcQueue = append(q.bcQueue, &limitedBroadcast{0, b}) |
| } |
| |
| // GetBroadcasts is used to get a number of broadcasts, up to a byte limit |
| // and applying a per-message overhead as provided. |
| func (q *TransmitLimitedQueue) GetBroadcasts(overhead, limit int) [][]byte { |
| q.Lock() |
| defer q.Unlock() |
| |
| // Fast path the default case |
| if len(q.bcQueue) == 0 { |
| return nil |
| } |
| |
| transmitLimit := retransmitLimit(q.RetransmitMult, q.NumNodes()) |
| bytesUsed := 0 |
| var toSend [][]byte |
| |
| for i := len(q.bcQueue) - 1; i >= 0; i-- { |
| // Check if this is within our limits |
| b := q.bcQueue[i] |
| msg := b.b.Message() |
| if bytesUsed+overhead+len(msg) > limit { |
| continue |
| } |
| |
| // Add to slice to send |
| bytesUsed += overhead + len(msg) |
| toSend = append(toSend, msg) |
| |
| // Check if we should stop transmission |
| b.transmits++ |
| if b.transmits >= transmitLimit { |
| b.b.Finished() |
| n := len(q.bcQueue) |
| q.bcQueue[i], q.bcQueue[n-1] = q.bcQueue[n-1], nil |
| q.bcQueue = q.bcQueue[:n-1] |
| } |
| } |
| |
| // If we are sending anything, we need to re-sort to deal |
| // with adjusted transmit counts |
| if len(toSend) > 0 { |
| q.bcQueue.Sort() |
| } |
| return toSend |
| } |
| |
| // NumQueued returns the number of queued messages |
| func (q *TransmitLimitedQueue) NumQueued() int { |
| q.Lock() |
| defer q.Unlock() |
| return len(q.bcQueue) |
| } |
| |
| // Reset clears all the queued messages |
| func (q *TransmitLimitedQueue) Reset() { |
| q.Lock() |
| defer q.Unlock() |
| for _, b := range q.bcQueue { |
| b.b.Finished() |
| } |
| q.bcQueue = nil |
| } |
| |
| // Prune will retain the maxRetain latest messages, and the rest |
| // will be discarded. This can be used to prevent unbounded queue sizes |
| func (q *TransmitLimitedQueue) Prune(maxRetain int) { |
| q.Lock() |
| defer q.Unlock() |
| |
| // Do nothing if queue size is less than the limit |
| n := len(q.bcQueue) |
| if n < maxRetain { |
| return |
| } |
| |
| // Invalidate the messages we will be removing |
| for i := 0; i < n-maxRetain; i++ { |
| q.bcQueue[i].b.Finished() |
| } |
| |
| // Move the messages, and retain only the last maxRetain |
| copy(q.bcQueue[0:], q.bcQueue[n-maxRetain:]) |
| q.bcQueue = q.bcQueue[:maxRetain] |
| } |
| |
| func (b limitedBroadcasts) Len() int { |
| return len(b) |
| } |
| |
| func (b limitedBroadcasts) Less(i, j int) bool { |
| return b[i].transmits < b[j].transmits |
| } |
| |
| func (b limitedBroadcasts) Swap(i, j int) { |
| b[i], b[j] = b[j], b[i] |
| } |
| |
| func (b limitedBroadcasts) Sort() { |
| sort.Sort(sort.Reverse(b)) |
| } |