blob: f7e85a119cf09b687457b8646896d2ac3e508a14 [file] [log] [blame]
package memberlist
/*
The broadcast mechanism works by maintaining a sorted list of messages to be
sent out. When a message is to be broadcast, the retransmit count
is set to zero and appended to the queue. The retransmit count serves
as the "priority", ensuring that newer messages get sent first. Once
a message hits the retransmit limit, it is removed from the queue.
Additionally, older entries can be invalidated by new messages that
are contradictory. For example, if we send "{suspect M1 inc: 1},
then a following {alive M1 inc: 2} will invalidate that message
*/
type memberlistBroadcast struct {
node string
msg []byte
notify chan struct{}
}
func (b *memberlistBroadcast) Invalidates(other Broadcast) bool {
// Check if that broadcast is a memberlist type
mb, ok := other.(*memberlistBroadcast)
if !ok {
return false
}
// Invalidates any message about the same node
return b.node == mb.node
}
func (b *memberlistBroadcast) Message() []byte {
return b.msg
}
func (b *memberlistBroadcast) Finished() {
select {
case b.notify <- struct{}{}:
default:
}
}
// encodeAndBroadcast encodes a message and enqueues it for broadcast. Fails
// silently if there is an encoding error.
func (m *Memberlist) encodeAndBroadcast(node string, msgType messageType, msg interface{}) {
m.encodeBroadcastNotify(node, msgType, msg, nil)
}
// encodeBroadcastNotify encodes a message and enqueues it for broadcast
// and notifies the given channel when transmission is finished. Fails
// silently if there is an encoding error.
func (m *Memberlist) encodeBroadcastNotify(node string, msgType messageType, msg interface{}, notify chan struct{}) {
buf, err := encode(msgType, msg)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to encode message for broadcast: %s", err)
} else {
m.queueBroadcast(node, buf.Bytes(), notify)
}
}
// queueBroadcast is used to start dissemination of a message. It will be
// sent up to a configured number of times. The message could potentially
// be invalidated by a future message about the same node
func (m *Memberlist) queueBroadcast(node string, msg []byte, notify chan struct{}) {
b := &memberlistBroadcast{node, msg, notify}
m.broadcasts.QueueBroadcast(b)
}
// getBroadcasts is used to return a slice of broadcasts to send up to
// a maximum byte size, while imposing a per-broadcast overhead. This is used
// to fill a UDP packet with piggybacked data
func (m *Memberlist) getBroadcasts(overhead, limit int) [][]byte {
// Get memberlist messages first
toSend := m.broadcasts.GetBroadcasts(overhead, limit)
// Check if the user has anything to broadcast
d := m.config.Delegate
if d != nil {
// Determine the bytes used already
bytesUsed := 0
for _, msg := range toSend {
bytesUsed += len(msg) + overhead
}
// Check space remaining for user messages
avail := limit - bytesUsed
if avail > overhead+userMsgOverhead {
userMsgs := d.GetBroadcasts(overhead+userMsgOverhead, avail)
// Frame each user message
for _, msg := range userMsgs {
buf := make([]byte, 1, len(msg)+1)
buf[0] = byte(userMsg)
buf = append(buf, msg...)
toSend = append(toSend, buf)
}
}
}
return toSend
}