| package memberlist |
| |
| /* |
| The broadcast mechanism works by maintaining a sorted list of messages to be |
| sent out. When a message is to be broadcast, the retransmit count |
| is set to zero and appended to the queue. The retransmit count serves |
| as the "priority", ensuring that newer messages get sent first. Once |
| a message hits the retransmit limit, it is removed from the queue. |
| |
| Additionally, older entries can be invalidated by new messages that |
| are contradictory. For example, if we send "{suspect M1 inc: 1}, |
| then a following {alive M1 inc: 2} will invalidate that message |
| */ |
| |
| type memberlistBroadcast struct { |
| node string |
| msg []byte |
| notify chan struct{} |
| } |
| |
| func (b *memberlistBroadcast) Invalidates(other Broadcast) bool { |
| // Check if that broadcast is a memberlist type |
| mb, ok := other.(*memberlistBroadcast) |
| if !ok { |
| return false |
| } |
| |
| // Invalidates any message about the same node |
| return b.node == mb.node |
| } |
| |
| func (b *memberlistBroadcast) Message() []byte { |
| return b.msg |
| } |
| |
| func (b *memberlistBroadcast) Finished() { |
| select { |
| case b.notify <- struct{}{}: |
| default: |
| } |
| } |
| |
| // encodeAndBroadcast encodes a message and enqueues it for broadcast. Fails |
| // silently if there is an encoding error. |
| func (m *Memberlist) encodeAndBroadcast(node string, msgType messageType, msg interface{}) { |
| m.encodeBroadcastNotify(node, msgType, msg, nil) |
| } |
| |
| // encodeBroadcastNotify encodes a message and enqueues it for broadcast |
| // and notifies the given channel when transmission is finished. Fails |
| // silently if there is an encoding error. |
| func (m *Memberlist) encodeBroadcastNotify(node string, msgType messageType, msg interface{}, notify chan struct{}) { |
| buf, err := encode(msgType, msg) |
| if err != nil { |
| m.logger.Printf("[ERR] memberlist: Failed to encode message for broadcast: %s", err) |
| } else { |
| m.queueBroadcast(node, buf.Bytes(), notify) |
| } |
| } |
| |
| // queueBroadcast is used to start dissemination of a message. It will be |
| // sent up to a configured number of times. The message could potentially |
| // be invalidated by a future message about the same node |
| func (m *Memberlist) queueBroadcast(node string, msg []byte, notify chan struct{}) { |
| b := &memberlistBroadcast{node, msg, notify} |
| m.broadcasts.QueueBroadcast(b) |
| } |
| |
| // getBroadcasts is used to return a slice of broadcasts to send up to |
| // a maximum byte size, while imposing a per-broadcast overhead. This is used |
| // to fill a UDP packet with piggybacked data |
| func (m *Memberlist) getBroadcasts(overhead, limit int) [][]byte { |
| // Get memberlist messages first |
| toSend := m.broadcasts.GetBroadcasts(overhead, limit) |
| |
| // Check if the user has anything to broadcast |
| d := m.config.Delegate |
| if d != nil { |
| // Determine the bytes used already |
| bytesUsed := 0 |
| for _, msg := range toSend { |
| bytesUsed += len(msg) + overhead |
| } |
| |
| // Check space remaining for user messages |
| avail := limit - bytesUsed |
| if avail > overhead+userMsgOverhead { |
| userMsgs := d.GetBroadcasts(overhead+userMsgOverhead, avail) |
| |
| // Frame each user message |
| for _, msg := range userMsgs { |
| buf := make([]byte, 1, len(msg)+1) |
| buf[0] = byte(userMsg) |
| buf = append(buf, msg...) |
| toSend = append(toSend, buf) |
| } |
| } |
| } |
| return toSend |
| } |