blob: d0339bd158f144613572ae0f7b32ca40eccf3686 [file] [log] [blame]
package memberlist
import (
"bytes"
"fmt"
"math"
"math/rand"
"net"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
)
type nodeStateType int
const (
stateAlive nodeStateType = iota
stateSuspect
stateDead
)
// Node represents a node in the cluster.
type Node struct {
Name string
Addr net.IP
Port uint16
Meta []byte // Metadata from the delegate for this node.
PMin uint8 // Minimum protocol version this understands
PMax uint8 // Maximum protocol version this understands
PCur uint8 // Current version node is speaking
DMin uint8 // Min protocol version for the delegate to understand
DMax uint8 // Max protocol version for the delegate to understand
DCur uint8 // Current version delegate is speaking
}
// NodeState is used to manage our state view of another node
type nodeState struct {
Node
Incarnation uint32 // Last known incarnation number
State nodeStateType // Current state
StateChange time.Time // Time last state change happened
}
// ackHandler is used to register handlers for incoming acks
type ackHandler struct {
handler func([]byte, time.Time)
timer *time.Timer
}
// NoPingResponseError is used to indicate a 'ping' packet was
// successfully issued but no response was received
type NoPingResponseError struct {
node string
}
func (f NoPingResponseError) Error() string {
return fmt.Sprintf("No response from node %s", f.node)
}
// Schedule is used to ensure the Tick is performed periodically. This
// function is safe to call multiple times. If the memberlist is already
// scheduled, then it won't do anything.
func (m *Memberlist) schedule() {
m.tickerLock.Lock()
defer m.tickerLock.Unlock()
// If we already have tickers, then don't do anything, since we're
// scheduled
if len(m.tickers) > 0 {
return
}
// Create the stop tick channel, a blocking channel. We close this
// when we should stop the tickers.
stopCh := make(chan struct{})
// Create a new probeTicker
if m.config.ProbeInterval > 0 {
t := time.NewTicker(m.config.ProbeInterval)
go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe)
m.tickers = append(m.tickers, t)
}
// Create a push pull ticker if needed
if m.config.PushPullInterval > 0 {
go m.pushPullTrigger(stopCh)
}
// Create a gossip ticker if needed
if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 {
t := time.NewTicker(m.config.GossipInterval)
go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip)
m.tickers = append(m.tickers, t)
}
// If we made any tickers, then record the stopTick channel for
// later.
if len(m.tickers) > 0 {
m.stopTick = stopCh
}
}
// triggerFunc is used to trigger a function call each time a
// message is received until a stop tick arrives.
func (m *Memberlist) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
// Use a random stagger to avoid syncronizing
randStagger := time.Duration(uint64(rand.Int63()) % uint64(stagger))
select {
case <-time.After(randStagger):
case <-stop:
return
}
for {
select {
case <-C:
f()
case <-stop:
return
}
}
}
// pushPullTrigger is used to periodically trigger a push/pull until
// a stop tick arrives. We don't use triggerFunc since the push/pull
// timer is dynamically scaled based on cluster size to avoid network
// saturation
func (m *Memberlist) pushPullTrigger(stop <-chan struct{}) {
interval := m.config.PushPullInterval
// Use a random stagger to avoid syncronizing
randStagger := time.Duration(uint64(rand.Int63()) % uint64(interval))
select {
case <-time.After(randStagger):
case <-stop:
return
}
// Tick using a dynamic timer
for {
tickTime := pushPullScale(interval, m.estNumNodes())
select {
case <-time.After(tickTime):
m.pushPull()
case <-stop:
return
}
}
}
// Deschedule is used to stop the background maintenence. This is safe
// to call multiple times.
func (m *Memberlist) deschedule() {
m.tickerLock.Lock()
defer m.tickerLock.Unlock()
// If we have no tickers, then we aren't scheduled.
if len(m.tickers) == 0 {
return
}
// Close the stop channel so all the ticker listeners stop.
close(m.stopTick)
// Explicitly stop all the tickers themselves so they don't take
// up any more resources, and get rid of the list.
for _, t := range m.tickers {
t.Stop()
}
m.tickers = nil
}
// Tick is used to perform a single round of failure detection and gossip
func (m *Memberlist) probe() {
// Track the number of indexes we've considered probing
numCheck := 0
START:
m.nodeLock.RLock()
// Make sure we don't wrap around infinitely
if numCheck >= len(m.nodes) {
m.nodeLock.RUnlock()
return
}
// Handle the wrap around case
if m.probeIndex >= len(m.nodes) {
m.nodeLock.RUnlock()
m.resetNodes()
m.probeIndex = 0
numCheck++
goto START
}
// Determine if we should probe this node
skip := false
var node nodeState
node = *m.nodes[m.probeIndex]
if node.Name == m.config.Name {
skip = true
} else if node.State == stateDead {
skip = true
}
// Potentially skip
m.nodeLock.RUnlock()
m.probeIndex++
if skip {
numCheck++
goto START
}
// Probe the specific node
m.probeNode(&node)
}
// probeNode handles a single round of failure checking on a node.
func (m *Memberlist) probeNode(node *nodeState) {
defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())
// Prepare a ping message and setup an ack handler.
ping := ping{SeqNo: m.nextSeqNo(), Node: node.Name}
ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval)
// Send a ping to the node.
deadline := time.Now().Add(m.config.ProbeInterval)
destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
return
}
// Mark the sent time here, which should be after any pre-processing and
// system calls to do the actual send. This probably under-reports a bit,
// but it's the best we can do.
sent := time.Now()
// Wait for response or round-trip-time.
select {
case v := <-ackCh:
if v.Complete == true {
if m.config.Ping != nil {
rtt := v.Timestamp.Sub(sent)
m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload)
}
return
}
// As an edge case, if we get a timeout, we need to re-enqueue it
// here to break out of the select below.
if v.Complete == false {
ackCh <- v
}
case <-time.After(m.config.ProbeTimeout):
m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node.Name)
}
// Get some random live nodes.
m.nodeLock.RLock()
excludes := []string{m.config.Name, node.Name}
kNodes := kRandomNodes(m.config.IndirectChecks, excludes, m.nodes)
m.nodeLock.RUnlock()
// Attempt an indirect ping.
ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name}
for _, peer := range kNodes {
destAddr := &net.UDPAddr{IP: peer.Addr, Port: int(peer.Port)}
if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
}
}
// Also make an attempt to contact the node directly over TCP. This
// helps prevent confused clients who get isolated from UDP traffic
// but can still speak TCP (which also means they can possibly report
// misinformation to other nodes via anti-entropy), avoiding flapping in
// the cluster.
//
// This is a little unusual because we will attempt a TCP ping to any
// member who understands version 3 of the protocol, regardless of
// which protocol version we are speaking. That's why we've included a
// config option to turn this off if desired.
fallbackCh := make(chan bool, 1)
if (!m.config.DisableTcpPings) && (node.PMax >= 3) {
destAddr := &net.TCPAddr{IP: node.Addr, Port: int(node.Port)}
go func() {
defer close(fallbackCh)
didContact, err := m.sendPingAndWaitForAck(destAddr, ping, deadline)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed TCP fallback ping: %s", err)
} else {
fallbackCh <- didContact
}
}()
} else {
close(fallbackCh)
}
// Wait for the acks or timeout. Note that we don't check the fallback
// channel here because we want to issue a warning below if that's the
// *only* way we hear back from the peer, so we have to let this time
// out first to allow the normal UDP-based acks to come in.
select {
case v := <-ackCh:
if v.Complete == true {
return
}
}
// Finally, poll the fallback channel. The timeouts are set such that
// the channel will have something or be closed without having to wait
// any additional time here.
for didContact := range fallbackCh {
if didContact {
m.logger.Printf("[WARN] memberlist: Was able to reach %s via TCP but not UDP, network may be misconfigured and not allowing bidirectional UDP", node.Name)
return
}
}
// No acks received from target, suspect
m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name)
s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
m.suspectNode(&s)
}
// Ping initiates a ping to the node with the specified name.
func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) {
// Prepare a ping message and setup an ack handler.
ping := ping{SeqNo: m.nextSeqNo(), Node: node}
ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval)
// Send a ping to the node.
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
return 0, err
}
// Mark the sent time here, which should be after any pre-processing and
// system calls to do the actual send. This probably under-reports a bit,
// but it's the best we can do.
sent := time.Now()
// Wait for response or timeout.
select {
case v := <-ackCh:
if v.Complete == true {
return v.Timestamp.Sub(sent), nil
}
case <-time.After(m.config.ProbeTimeout):
// Timeout, return an error below.
}
m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node)
return 0, NoPingResponseError{ping.Node}
}
// resetNodes is used when the tick wraps around. It will reap the
// dead nodes and shuffle the node list.
func (m *Memberlist) resetNodes() {
m.nodeLock.Lock()
defer m.nodeLock.Unlock()
// Move the dead nodes
deadIdx := moveDeadNodes(m.nodes)
// Deregister the dead nodes
for i := deadIdx; i < len(m.nodes); i++ {
delete(m.nodeMap, m.nodes[i].Name)
m.nodes[i] = nil
}
// Trim the nodes to exclude the dead nodes
m.nodes = m.nodes[0:deadIdx]
// Update numNodes after we've trimmed the dead nodes
atomic.StoreUint32(&m.numNodes, uint32(deadIdx))
// Shuffle live nodes
shuffleNodes(m.nodes)
}
// gossip is invoked every GossipInterval period to broadcast our gossip
// messages to a few random nodes.
func (m *Memberlist) gossip() {
defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now())
// Get some random live nodes
m.nodeLock.RLock()
excludes := []string{m.config.Name}
kNodes := kRandomNodes(m.config.GossipNodes, excludes, m.nodes)
m.nodeLock.RUnlock()
// Compute the bytes available
bytesAvail := udpSendBuf - compoundHeaderOverhead
if m.config.EncryptionEnabled() {
bytesAvail -= encryptOverhead(m.encryptionVersion())
}
for _, node := range kNodes {
// Get any pending broadcasts
msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
if len(msgs) == 0 {
return
}
// Create a compound message
compound := makeCompoundMessage(msgs)
// Send the compound message
destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
if err := m.rawSendMsgUDP(destAddr, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err)
}
}
}
// pushPull is invoked periodically to randomly perform a complete state
// exchange. Used to ensure a high level of convergence, but is also
// reasonably expensive as the entire state of this node is exchanged
// with the other node.
func (m *Memberlist) pushPull() {
// Get a random live node
m.nodeLock.RLock()
excludes := []string{m.config.Name}
nodes := kRandomNodes(1, excludes, m.nodes)
m.nodeLock.RUnlock()
// If no nodes, bail
if len(nodes) == 0 {
return
}
node := nodes[0]
// Attempt a push pull
if err := m.pushPullNode(node.Addr, node.Port, false); err != nil {
m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err)
}
}
// pushPullNode does a complete state exchange with a specific node.
func (m *Memberlist) pushPullNode(addr []byte, port uint16, join bool) error {
defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())
// Attempt to send and receive with the node
remote, userState, err := m.sendAndReceiveState(addr, port, join)
if err != nil {
return err
}
if err := m.mergeRemoteState(join, remote, userState); err != nil {
return err
}
return nil
}
// verifyProtocol verifies that all the remote nodes can speak with our
// nodes and vice versa on both the core protocol as well as the
// delegate protocol level.
//
// The verification works by finding the maximum minimum and
// minimum maximum understood protocol and delegate versions. In other words,
// it finds the common denominator of protocol and delegate version ranges
// for the entire cluster.
//
// After this, it goes through the entire cluster (local and remote) and
// verifies that everyone's speaking protocol versions satisfy this range.
// If this passes, it means that every node can understand each other.
func (m *Memberlist) verifyProtocol(remote []pushNodeState) error {
m.nodeLock.RLock()
defer m.nodeLock.RUnlock()
// Maximum minimum understood and minimum maximum understood for both
// the protocol and delegate versions. We use this to verify everyone
// can be understood.
var maxpmin, minpmax uint8
var maxdmin, mindmax uint8
minpmax = math.MaxUint8
mindmax = math.MaxUint8
for _, rn := range remote {
// If the node isn't alive, then skip it
if rn.State != stateAlive {
continue
}
// Skip nodes that don't have versions set, it just means
// their version is zero.
if len(rn.Vsn) == 0 {
continue
}
if rn.Vsn[0] > maxpmin {
maxpmin = rn.Vsn[0]
}
if rn.Vsn[1] < minpmax {
minpmax = rn.Vsn[1]
}
if rn.Vsn[3] > maxdmin {
maxdmin = rn.Vsn[3]
}
if rn.Vsn[4] < mindmax {
mindmax = rn.Vsn[4]
}
}
for _, n := range m.nodes {
// Ignore non-alive nodes
if n.State != stateAlive {
continue
}
if n.PMin > maxpmin {
maxpmin = n.PMin
}
if n.PMax < minpmax {
minpmax = n.PMax
}
if n.DMin > maxdmin {
maxdmin = n.DMin
}
if n.DMax < mindmax {
mindmax = n.DMax
}
}
// Now that we definitively know the minimum and maximum understood
// version that satisfies the whole cluster, we verify that every
// node in the cluster satisifies this.
for _, n := range remote {
var nPCur, nDCur uint8
if len(n.Vsn) > 0 {
nPCur = n.Vsn[2]
nDCur = n.Vsn[5]
}
if nPCur < maxpmin || nPCur > minpmax {
return fmt.Errorf(
"Node '%s' protocol version (%d) is incompatible: [%d, %d]",
n.Name, nPCur, maxpmin, minpmax)
}
if nDCur < maxdmin || nDCur > mindmax {
return fmt.Errorf(
"Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]",
n.Name, nDCur, maxdmin, mindmax)
}
}
for _, n := range m.nodes {
nPCur := n.PCur
nDCur := n.DCur
if nPCur < maxpmin || nPCur > minpmax {
return fmt.Errorf(
"Node '%s' protocol version (%d) is incompatible: [%d, %d]",
n.Name, nPCur, maxpmin, minpmax)
}
if nDCur < maxdmin || nDCur > mindmax {
return fmt.Errorf(
"Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]",
n.Name, nDCur, maxdmin, mindmax)
}
}
return nil
}
// nextSeqNo returns a usable sequence number in a thread safe way
func (m *Memberlist) nextSeqNo() uint32 {
return atomic.AddUint32(&m.sequenceNum, 1)
}
// nextIncarnation returns the next incarnation number in a thread safe way
func (m *Memberlist) nextIncarnation() uint32 {
return atomic.AddUint32(&m.incarnation, 1)
}
// estNumNodes is used to get the current estimate of the number of nodes
func (m *Memberlist) estNumNodes() int {
return int(atomic.LoadUint32(&m.numNodes))
}
type ackMessage struct {
Complete bool
Payload []byte
Timestamp time.Time
}
// setAckChannel is used to attach a channel to receive a message when an ack with a given
// sequence number is received. The `complete` field of the message will be false on timeout
func (m *Memberlist) setAckChannel(seqNo uint32, ch chan ackMessage, timeout time.Duration) {
// Create a handler function
handler := func(payload []byte, timestamp time.Time) {
select {
case ch <- ackMessage{true, payload, timestamp}:
default:
}
}
// Add the handler
ah := &ackHandler{handler, nil}
m.ackLock.Lock()
m.ackHandlers[seqNo] = ah
m.ackLock.Unlock()
// Setup a reaping routing
ah.timer = time.AfterFunc(timeout, func() {
m.ackLock.Lock()
delete(m.ackHandlers, seqNo)
m.ackLock.Unlock()
select {
case ch <- ackMessage{false, nil, time.Now()}:
default:
}
})
}
// setAckHandler is used to attach a handler to be invoked when an
// ack with a given sequence number is received. If a timeout is reached,
// the handler is deleted
func (m *Memberlist) setAckHandler(seqNo uint32, handler func([]byte, time.Time), timeout time.Duration) {
// Add the handler
ah := &ackHandler{handler, nil}
m.ackLock.Lock()
m.ackHandlers[seqNo] = ah
m.ackLock.Unlock()
// Setup a reaping routing
ah.timer = time.AfterFunc(timeout, func() {
m.ackLock.Lock()
delete(m.ackHandlers, seqNo)
m.ackLock.Unlock()
})
}
// Invokes an Ack handler if any is associated, and reaps the handler immediately
func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) {
m.ackLock.Lock()
ah, ok := m.ackHandlers[ack.SeqNo]
delete(m.ackHandlers, ack.SeqNo)
m.ackLock.Unlock()
if !ok {
return
}
ah.timer.Stop()
ah.handler(ack.Payload, timestamp)
}
// aliveNode is invoked by the network layer when we get a message about a
// live node.
func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
m.nodeLock.Lock()
defer m.nodeLock.Unlock()
state, ok := m.nodeMap[a.Node]
// It is possible that during a Leave(), there is already an aliveMsg
// in-queue to be processed but blocked by the locks above. If we let
// that aliveMsg process, it'll cause us to re-join the cluster. This
// ensures that we don't.
if m.leave && a.Node == m.config.Name {
return
}
// Invoke the Alive delegate if any. This can be used to filter out
// alive messages based on custom logic. For example, using a cluster name.
// Using a merge delegate is not enough, as it is possible for passive
// cluster merging to still occur.
if m.config.Alive != nil {
node := &Node{
Name: a.Node,
Addr: a.Addr,
Port: a.Port,
Meta: a.Meta,
PMin: a.Vsn[0],
PMax: a.Vsn[1],
PCur: a.Vsn[2],
DMin: a.Vsn[3],
DMax: a.Vsn[4],
DCur: a.Vsn[5],
}
if err := m.config.Alive.NotifyAlive(node); err != nil {
m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s': %s",
a.Node, err)
return
}
}
// Check if we've never seen this node before, and if not, then
// store this node in our node map.
if !ok {
state = &nodeState{
Node: Node{
Name: a.Node,
Addr: a.Addr,
Port: a.Port,
Meta: a.Meta,
},
State: stateDead,
}
// Add to map
m.nodeMap[a.Node] = state
// Get a random offset. This is important to ensure
// the failure detection bound is low on average. If all
// nodes did an append, failure detection bound would be
// very high.
n := len(m.nodes)
offset := randomOffset(n)
// Add at the end and swap with the node at the offset
m.nodes = append(m.nodes, state)
m.nodes[offset], m.nodes[n] = m.nodes[n], m.nodes[offset]
// Update numNodes after we've added a new node
atomic.AddUint32(&m.numNodes, 1)
}
// Check if this address is different than the existing node
if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port {
m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d",
state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port)
// Inform the conflict delegate if provided
if m.config.Conflict != nil {
other := Node{
Name: a.Node,
Addr: a.Addr,
Port: a.Port,
Meta: a.Meta,
}
m.config.Conflict.NotifyConflict(&state.Node, &other)
}
return
}
// Bail if the incarnation number is older, and this is not about us
isLocalNode := state.Name == m.config.Name
if a.Incarnation <= state.Incarnation && !isLocalNode {
return
}
// Bail if strictly less and this is about us
if a.Incarnation < state.Incarnation && isLocalNode {
return
}
// Store the old state and meta data
oldState := state.State
oldMeta := state.Meta
// If this is us we need to refute, otherwise re-broadcast
if !bootstrap && isLocalNode {
// Compute the version vector
versions := []uint8{
state.PMin, state.PMax, state.PCur,
state.DMin, state.DMax, state.DCur,
}
// If the Incarnation is the same, we need special handling, since it
// possible for the following situation to happen:
// 1) Start with configuration C, join cluster
// 2) Hard fail / Kill / Shutdown
// 3) Restart with configuration C', join cluster
//
// In this case, other nodes and the local node see the same incarnation,
// but the values may not be the same. For this reason, we always
// need to do an equality check for this Incarnation. In most cases,
// we just ignore, but we may need to refute.
//
if a.Incarnation == state.Incarnation &&
bytes.Equal(a.Meta, state.Meta) &&
bytes.Equal(a.Vsn, versions) {
return
}
inc := m.nextIncarnation()
for a.Incarnation >= inc {
inc = m.nextIncarnation()
}
state.Incarnation = inc
a := alive{
Incarnation: inc,
Node: state.Name,
Addr: state.Addr,
Port: state.Port,
Meta: state.Meta,
Vsn: versions,
}
m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)
m.logger.Printf("[WARN] memberlist: Refuting an alive message")
} else {
m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)
// Update protocol versions if it arrived
if len(a.Vsn) > 0 {
state.PMin = a.Vsn[0]
state.PMax = a.Vsn[1]
state.PCur = a.Vsn[2]
state.DMin = a.Vsn[3]
state.DMax = a.Vsn[4]
state.DCur = a.Vsn[5]
}
// Update the state and incarnation number
state.Incarnation = a.Incarnation
state.Meta = a.Meta
if state.State != stateAlive {
state.State = stateAlive
state.StateChange = time.Now()
}
}
// Update metrics
metrics.IncrCounter([]string{"memberlist", "msg", "alive"}, 1)
// Notify the delegate of any relevant updates
if m.config.Events != nil {
if oldState == stateDead {
// if Dead -> Alive, notify of join
m.config.Events.NotifyJoin(&state.Node)
} else if !bytes.Equal(oldMeta, state.Meta) {
// if Meta changed, trigger an update notification
m.config.Events.NotifyUpdate(&state.Node)
}
}
}
// suspectNode is invoked by the network layer when we get a message
// about a suspect node
func (m *Memberlist) suspectNode(s *suspect) {
m.nodeLock.Lock()
defer m.nodeLock.Unlock()
state, ok := m.nodeMap[s.Node]
// If we've never heard about this node before, ignore it
if !ok {
return
}
// Ignore old incarnation numbers
if s.Incarnation < state.Incarnation {
return
}
// Ignore non-alive nodes
if state.State != stateAlive {
return
}
// If this is us we need to refute, otherwise re-broadcast
if state.Name == m.config.Name {
inc := m.nextIncarnation()
for s.Incarnation >= inc {
inc = m.nextIncarnation()
}
state.Incarnation = inc
a := alive{
Incarnation: inc,
Node: state.Name,
Addr: state.Addr,
Port: state.Port,
Meta: state.Meta,
Vsn: []uint8{
state.PMin, state.PMax, state.PCur,
state.DMin, state.DMax, state.DCur,
},
}
m.encodeAndBroadcast(s.Node, aliveMsg, a)
m.logger.Printf("[WARN] memberlist: Refuting a suspect message (from: %s)", s.From)
return // Do not mark ourself suspect
} else {
m.encodeAndBroadcast(s.Node, suspectMsg, s)
}
// Update metrics
metrics.IncrCounter([]string{"memberlist", "msg", "suspect"}, 1)
// Update the state
state.Incarnation = s.Incarnation
state.State = stateSuspect
changeTime := time.Now()
state.StateChange = changeTime
// Setup a timeout for this
timeout := suspicionTimeout(m.config.SuspicionMult, m.estNumNodes(), m.config.ProbeInterval)
time.AfterFunc(timeout, func() {
m.nodeLock.Lock()
state, ok := m.nodeMap[s.Node]
timeout := ok && state.State == stateSuspect && state.StateChange == changeTime
m.nodeLock.Unlock()
if timeout {
m.suspectTimeout(state)
}
})
}
// suspectTimeout is invoked when a suspect timeout has occurred
func (m *Memberlist) suspectTimeout(n *nodeState) {
// Construct a dead message
m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached", n.Name)
d := dead{Incarnation: n.Incarnation, Node: n.Name, From: m.config.Name}
m.deadNode(&d)
}
// deadNode is invoked by the network layer when we get a message
// about a dead node
func (m *Memberlist) deadNode(d *dead) {
m.nodeLock.Lock()
defer m.nodeLock.Unlock()
state, ok := m.nodeMap[d.Node]
// If we've never heard about this node before, ignore it
if !ok {
return
}
// Ignore old incarnation numbers
if d.Incarnation < state.Incarnation {
return
}
// Ignore if node is already dead
if state.State == stateDead {
return
}
// Check if this is us
if state.Name == m.config.Name {
// If we are not leaving we need to refute
if !m.leave {
inc := m.nextIncarnation()
for d.Incarnation >= inc {
inc = m.nextIncarnation()
}
state.Incarnation = inc
a := alive{
Incarnation: inc,
Node: state.Name,
Addr: state.Addr,
Port: state.Port,
Meta: state.Meta,
Vsn: []uint8{
state.PMin, state.PMax, state.PCur,
state.DMin, state.DMax, state.DCur,
},
}
m.encodeAndBroadcast(d.Node, aliveMsg, a)
m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From)
return // Do not mark ourself dead
}
// If we are leaving, we broadcast and wait
m.encodeBroadcastNotify(d.Node, deadMsg, d, m.leaveBroadcast)
} else {
m.encodeAndBroadcast(d.Node, deadMsg, d)
}
// Update metrics
metrics.IncrCounter([]string{"memberlist", "msg", "dead"}, 1)
// Update the state
state.Incarnation = d.Incarnation
state.State = stateDead
state.StateChange = time.Now()
// Notify of death
if m.config.Events != nil {
m.config.Events.NotifyLeave(&state.Node)
}
}
// mergeState is invoked by the network layer when we get a Push/Pull
// state transfer
func (m *Memberlist) mergeState(remote []pushNodeState) {
for _, r := range remote {
switch r.State {
case stateAlive:
a := alive{
Incarnation: r.Incarnation,
Node: r.Name,
Addr: r.Addr,
Port: r.Port,
Meta: r.Meta,
Vsn: r.Vsn,
}
m.aliveNode(&a, nil, false)
case stateDead:
// If the remote node belives a node is dead, we prefer to
// suspect that node instead of declaring it dead instantly
fallthrough
case stateSuspect:
s := suspect{Incarnation: r.Incarnation, Node: r.Name, From: m.config.Name}
m.suspectNode(&s)
}
}
}