| /* |
| memberlist is a library that manages cluster |
| membership and member failure detection using a gossip based protocol. |
| |
| The use cases for such a library are far-reaching: all distributed systems |
| require membership, and memberlist is a re-usable solution to managing |
| cluster membership and node failure detection. |
| |
| memberlist is eventually consistent but converges quickly on average. |
| The speed at which it converges can be heavily tuned via various knobs |
| on the protocol. Node failures are detected and network partitions are partially |
| tolerated by attempting to communicate to potentially dead nodes through |
| multiple routes. |
| */ |
| package memberlist |
| |
| import ( |
| "fmt" |
| "log" |
| "net" |
| "os" |
| "strconv" |
| "strings" |
| "sync" |
| "sync/atomic" |
| "time" |
| |
| multierror "github.com/hashicorp/go-multierror" |
| sockaddr "github.com/hashicorp/go-sockaddr" |
| "github.com/miekg/dns" |
| ) |
| |
| type Memberlist struct { |
| sequenceNum uint32 // Local sequence number |
| incarnation uint32 // Local incarnation number |
| numNodes uint32 // Number of known nodes (estimate) |
| |
| config *Config |
| shutdown int32 // Used as an atomic boolean value |
| shutdownCh chan struct{} |
| leave int32 // Used as an atomic boolean value |
| leaveBroadcast chan struct{} |
| |
| shutdownLock sync.Mutex // Serializes calls to Shutdown |
| leaveLock sync.Mutex // Serializes calls to Leave |
| |
| transport Transport |
| handoff chan msgHandoff |
| |
| nodeLock sync.RWMutex |
| nodes []*nodeState // Known nodes |
| nodeMap map[string]*nodeState // Maps Addr.String() -> NodeState |
| nodeTimers map[string]*suspicion // Maps Addr.String() -> suspicion timer |
| awareness *awareness |
| |
| tickerLock sync.Mutex |
| tickers []*time.Ticker |
| stopTick chan struct{} |
| probeIndex int |
| |
| ackLock sync.Mutex |
| ackHandlers map[uint32]*ackHandler |
| |
| broadcasts *TransmitLimitedQueue |
| |
| logger *log.Logger |
| } |
| |
| // newMemberlist creates the network listeners. |
| // Does not schedule execution of background maintenance. |
| func newMemberlist(conf *Config) (*Memberlist, error) { |
| if conf.ProtocolVersion < ProtocolVersionMin { |
| return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]", |
| conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax) |
| } else if conf.ProtocolVersion > ProtocolVersionMax { |
| return nil, fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]", |
| conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax) |
| } |
| |
| if len(conf.SecretKey) > 0 { |
| if conf.Keyring == nil { |
| keyring, err := NewKeyring(nil, conf.SecretKey) |
| if err != nil { |
| return nil, err |
| } |
| conf.Keyring = keyring |
| } else { |
| if err := conf.Keyring.AddKey(conf.SecretKey); err != nil { |
| return nil, err |
| } |
| if err := conf.Keyring.UseKey(conf.SecretKey); err != nil { |
| return nil, err |
| } |
| } |
| } |
| |
| if conf.LogOutput != nil && conf.Logger != nil { |
| return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.") |
| } |
| |
| logDest := conf.LogOutput |
| if logDest == nil { |
| logDest = os.Stderr |
| } |
| |
| logger := conf.Logger |
| if logger == nil { |
| logger = log.New(logDest, "", log.LstdFlags) |
| } |
| |
| // Set up a network transport by default if a custom one wasn't given |
| // by the config. |
| transport := conf.Transport |
| if transport == nil { |
| nc := &NetTransportConfig{ |
| BindAddrs: []string{conf.BindAddr}, |
| BindPort: conf.BindPort, |
| Logger: logger, |
| } |
| |
| // See comment below for details about the retry in here. |
| makeNetRetry := func(limit int) (*NetTransport, error) { |
| var err error |
| for try := 0; try < limit; try++ { |
| var nt *NetTransport |
| if nt, err = NewNetTransport(nc); err == nil { |
| return nt, nil |
| } |
| if strings.Contains(err.Error(), "address already in use") { |
| logger.Printf("[DEBUG] memberlist: Got bind error: %v", err) |
| continue |
| } |
| } |
| |
| return nil, fmt.Errorf("failed to obtain an address: %v", err) |
| } |
| |
| // The dynamic bind port operation is inherently racy because |
| // even though we are using the kernel to find a port for us, we |
| // are attempting to bind multiple protocols (and potentially |
| // multiple addresses) with the same port number. We build in a |
| // few retries here since this often gets transient errors in |
| // busy unit tests. |
| limit := 1 |
| if conf.BindPort == 0 { |
| limit = 10 |
| } |
| |
| nt, err := makeNetRetry(limit) |
| if err != nil { |
| return nil, fmt.Errorf("Could not set up network transport: %v", err) |
| } |
| if conf.BindPort == 0 { |
| port := nt.GetAutoBindPort() |
| conf.BindPort = port |
| conf.AdvertisePort = port |
| logger.Printf("[DEBUG] memberlist: Using dynamic bind port %d", port) |
| } |
| transport = nt |
| } |
| |
| m := &Memberlist{ |
| config: conf, |
| shutdownCh: make(chan struct{}), |
| leaveBroadcast: make(chan struct{}, 1), |
| transport: transport, |
| handoff: make(chan msgHandoff, conf.HandoffQueueDepth), |
| nodeMap: make(map[string]*nodeState), |
| nodeTimers: make(map[string]*suspicion), |
| awareness: newAwareness(conf.AwarenessMaxMultiplier), |
| ackHandlers: make(map[uint32]*ackHandler), |
| broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult}, |
| logger: logger, |
| } |
| m.broadcasts.NumNodes = func() int { |
| return m.estNumNodes() |
| } |
| go m.streamListen() |
| go m.packetListen() |
| go m.packetHandler() |
| return m, nil |
| } |
| |
| // Create will create a new Memberlist using the given configuration. |
| // This will not connect to any other node (see Join) yet, but will start |
| // all the listeners to allow other nodes to join this memberlist. |
| // After creating a Memberlist, the configuration given should not be |
| // modified by the user anymore. |
| func Create(conf *Config) (*Memberlist, error) { |
| m, err := newMemberlist(conf) |
| if err != nil { |
| return nil, err |
| } |
| if err := m.setAlive(); err != nil { |
| m.Shutdown() |
| return nil, err |
| } |
| m.schedule() |
| return m, nil |
| } |
| |
| // Join is used to take an existing Memberlist and attempt to join a cluster |
| // by contacting all the given hosts and performing a state sync. Initially, |
| // the Memberlist only contains our own state, so doing this will cause |
| // remote nodes to become aware of the existence of this node, effectively |
| // joining the cluster. |
| // |
| // This returns the number of hosts successfully contacted and an error if |
| // none could be reached. If an error is returned, the node did not successfully |
| // join the cluster. |
| func (m *Memberlist) Join(existing []string) (int, error) { |
| numSuccess := 0 |
| var errs error |
| for _, exist := range existing { |
| addrs, err := m.resolveAddr(exist) |
| if err != nil { |
| err = fmt.Errorf("Failed to resolve %s: %v", exist, err) |
| errs = multierror.Append(errs, err) |
| m.logger.Printf("[WARN] memberlist: %v", err) |
| continue |
| } |
| |
| for _, addr := range addrs { |
| hp := joinHostPort(addr.ip.String(), addr.port) |
| if err := m.pushPullNode(hp, true); err != nil { |
| err = fmt.Errorf("Failed to join %s: %v", addr.ip, err) |
| errs = multierror.Append(errs, err) |
| m.logger.Printf("[DEBUG] memberlist: %v", err) |
| continue |
| } |
| numSuccess++ |
| } |
| |
| } |
| if numSuccess > 0 { |
| errs = nil |
| } |
| return numSuccess, errs |
| } |
| |
// ipPort is one candidate endpoint to try when joining: a resolved IP
// address paired with the port to contact it on.
type ipPort struct {
	ip   net.IP // resolved address
	port uint16 // port to contact
}
| |
| // tcpLookupIP is a helper to initiate a TCP-based DNS lookup for the given host. |
| // The built-in Go resolver will do a UDP lookup first, and will only use TCP if |
| // the response has the truncate bit set, which isn't common on DNS servers like |
| // Consul's. By doing the TCP lookup directly, we get the best chance for the |
| // largest list of hosts to join. Since joins are relatively rare events, it's ok |
| // to do this rather expensive operation. |
| func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, error) { |
| // Don't attempt any TCP lookups against non-fully qualified domain |
| // names, since those will likely come from the resolv.conf file. |
| if !strings.Contains(host, ".") { |
| return nil, nil |
| } |
| |
| // Make sure the domain name is terminated with a dot (we know there's |
| // at least one character at this point). |
| dn := host |
| if dn[len(dn)-1] != '.' { |
| dn = dn + "." |
| } |
| |
| // See if we can find a server to try. |
| cc, err := dns.ClientConfigFromFile(m.config.DNSConfigPath) |
| if err != nil { |
| return nil, err |
| } |
| if len(cc.Servers) > 0 { |
| // We support host:port in the DNS config, but need to add the |
| // default port if one is not supplied. |
| server := cc.Servers[0] |
| if !hasPort(server) { |
| server = net.JoinHostPort(server, cc.Port) |
| } |
| |
| // Do the lookup. |
| c := new(dns.Client) |
| c.Net = "tcp" |
| msg := new(dns.Msg) |
| msg.SetQuestion(dn, dns.TypeANY) |
| in, _, err := c.Exchange(msg, server) |
| if err != nil { |
| return nil, err |
| } |
| |
| // Handle any IPs we get back that we can attempt to join. |
| var ips []ipPort |
| for _, r := range in.Answer { |
| switch rr := r.(type) { |
| case (*dns.A): |
| ips = append(ips, ipPort{rr.A, defaultPort}) |
| case (*dns.AAAA): |
| ips = append(ips, ipPort{rr.AAAA, defaultPort}) |
| case (*dns.CNAME): |
| m.logger.Printf("[DEBUG] memberlist: Ignoring CNAME RR in TCP-first answer for '%s'", host) |
| } |
| } |
| return ips, nil |
| } |
| |
| return nil, nil |
| } |
| |
| // resolveAddr is used to resolve the address into an address, |
| // port, and error. If no port is given, use the default |
| func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) { |
| // This captures the supplied port, or the default one. |
| hostStr = ensurePort(hostStr, m.config.BindPort) |
| host, sport, err := net.SplitHostPort(hostStr) |
| if err != nil { |
| return nil, err |
| } |
| lport, err := strconv.ParseUint(sport, 10, 16) |
| if err != nil { |
| return nil, err |
| } |
| port := uint16(lport) |
| |
| // If it looks like an IP address we are done. The SplitHostPort() above |
| // will make sure the host part is in good shape for parsing, even for |
| // IPv6 addresses. |
| if ip := net.ParseIP(host); ip != nil { |
| return []ipPort{ipPort{ip, port}}, nil |
| } |
| |
| // First try TCP so we have the best chance for the largest list of |
| // hosts to join. If this fails it's not fatal since this isn't a standard |
| // way to query DNS, and we have a fallback below. |
| ips, err := m.tcpLookupIP(host, port) |
| if err != nil { |
| m.logger.Printf("[DEBUG] memberlist: TCP-first lookup failed for '%s', falling back to UDP: %s", hostStr, err) |
| } |
| if len(ips) > 0 { |
| return ips, nil |
| } |
| |
| // If TCP didn't yield anything then use the normal Go resolver which |
| // will try UDP, then might possibly try TCP again if the UDP response |
| // indicates it was truncated. |
| ans, err := net.LookupIP(host) |
| if err != nil { |
| return nil, err |
| } |
| ips = make([]ipPort, 0, len(ans)) |
| for _, ip := range ans { |
| ips = append(ips, ipPort{ip, port}) |
| } |
| return ips, nil |
| } |
| |
| // setAlive is used to mark this node as being alive. This is the same |
| // as if we received an alive notification our own network channel for |
| // ourself. |
| func (m *Memberlist) setAlive() error { |
| // Get the final advertise address from the transport, which may need |
| // to see which address we bound to. |
| addr, port, err := m.transport.FinalAdvertiseAddr( |
| m.config.AdvertiseAddr, m.config.AdvertisePort) |
| if err != nil { |
| return fmt.Errorf("Failed to get final advertise address: %v", err) |
| } |
| |
| // Check if this is a public address without encryption |
| ipAddr, err := sockaddr.NewIPAddr(addr.String()) |
| if err != nil { |
| return fmt.Errorf("Failed to parse interface addresses: %v", err) |
| } |
| ifAddrs := []sockaddr.IfAddr{ |
| sockaddr.IfAddr{ |
| SockAddr: ipAddr, |
| }, |
| } |
| _, publicIfs, err := sockaddr.IfByRFC("6890", ifAddrs) |
| if len(publicIfs) > 0 && !m.config.EncryptionEnabled() { |
| m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!") |
| } |
| |
| // Set any metadata from the delegate. |
| var meta []byte |
| if m.config.Delegate != nil { |
| meta = m.config.Delegate.NodeMeta(MetaMaxSize) |
| if len(meta) > MetaMaxSize { |
| panic("Node meta data provided is longer than the limit") |
| } |
| } |
| |
| a := alive{ |
| Incarnation: m.nextIncarnation(), |
| Node: m.config.Name, |
| Addr: addr, |
| Port: uint16(port), |
| Meta: meta, |
| Vsn: []uint8{ |
| ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion, |
| m.config.DelegateProtocolMin, m.config.DelegateProtocolMax, |
| m.config.DelegateProtocolVersion, |
| }, |
| } |
| m.aliveNode(&a, nil, true) |
| return nil |
| } |
| |
| // LocalNode is used to return the local Node |
| func (m *Memberlist) LocalNode() *Node { |
| m.nodeLock.RLock() |
| defer m.nodeLock.RUnlock() |
| state := m.nodeMap[m.config.Name] |
| return &state.Node |
| } |
| |
| // UpdateNode is used to trigger re-advertising the local node. This is |
| // primarily used with a Delegate to support dynamic updates to the local |
| // meta data. This will block until the update message is successfully |
| // broadcasted to a member of the cluster, if any exist or until a specified |
| // timeout is reached. |
| func (m *Memberlist) UpdateNode(timeout time.Duration) error { |
| // Get the node meta data |
| var meta []byte |
| if m.config.Delegate != nil { |
| meta = m.config.Delegate.NodeMeta(MetaMaxSize) |
| if len(meta) > MetaMaxSize { |
| panic("Node meta data provided is longer than the limit") |
| } |
| } |
| |
| // Get the existing node |
| m.nodeLock.RLock() |
| state := m.nodeMap[m.config.Name] |
| m.nodeLock.RUnlock() |
| |
| // Format a new alive message |
| a := alive{ |
| Incarnation: m.nextIncarnation(), |
| Node: m.config.Name, |
| Addr: state.Addr, |
| Port: state.Port, |
| Meta: meta, |
| Vsn: []uint8{ |
| ProtocolVersionMin, ProtocolVersionMax, m.config.ProtocolVersion, |
| m.config.DelegateProtocolMin, m.config.DelegateProtocolMax, |
| m.config.DelegateProtocolVersion, |
| }, |
| } |
| notifyCh := make(chan struct{}) |
| m.aliveNode(&a, notifyCh, true) |
| |
| // Wait for the broadcast or a timeout |
| if m.anyAlive() { |
| var timeoutCh <-chan time.Time |
| if timeout > 0 { |
| timeoutCh = time.After(timeout) |
| } |
| select { |
| case <-notifyCh: |
| case <-timeoutCh: |
| return fmt.Errorf("timeout waiting for update broadcast") |
| } |
| } |
| return nil |
| } |
| |
| // SendTo is deprecated in favor of SendBestEffort, which requires a node to |
| // target. |
| func (m *Memberlist) SendTo(to net.Addr, msg []byte) error { |
| // Encode as a user message |
| buf := make([]byte, 1, len(msg)+1) |
| buf[0] = byte(userMsg) |
| buf = append(buf, msg...) |
| |
| // Send the message |
| return m.rawSendMsgPacket(to.String(), nil, buf) |
| } |
| |
| // SendToUDP is deprecated in favor of SendBestEffort. |
| func (m *Memberlist) SendToUDP(to *Node, msg []byte) error { |
| return m.SendBestEffort(to, msg) |
| } |
| |
| // SendToTCP is deprecated in favor of SendReliable. |
| func (m *Memberlist) SendToTCP(to *Node, msg []byte) error { |
| return m.SendReliable(to, msg) |
| } |
| |
| // SendBestEffort uses the unreliable packet-oriented interface of the transport |
| // to target a user message at the given node (this does not use the gossip |
| // mechanism). The maximum size of the message depends on the configured |
| // UDPBufferSize for this memberlist instance. |
| func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error { |
| // Encode as a user message |
| buf := make([]byte, 1, len(msg)+1) |
| buf[0] = byte(userMsg) |
| buf = append(buf, msg...) |
| |
| // Send the message |
| return m.rawSendMsgPacket(to.Address(), to, buf) |
| } |
| |
| // SendReliable uses the reliable stream-oriented interface of the transport to |
| // target a user message at the given node (this does not use the gossip |
| // mechanism). Delivery is guaranteed if no error is returned, and there is no |
| // limit on the size of the message. |
| func (m *Memberlist) SendReliable(to *Node, msg []byte) error { |
| return m.sendUserMsg(to.Address(), msg) |
| } |
| |
| // Members returns a list of all known live nodes. The node structures |
| // returned must not be modified. If you wish to modify a Node, make a |
| // copy first. |
| func (m *Memberlist) Members() []*Node { |
| m.nodeLock.RLock() |
| defer m.nodeLock.RUnlock() |
| |
| nodes := make([]*Node, 0, len(m.nodes)) |
| for _, n := range m.nodes { |
| if n.State != stateDead { |
| nodes = append(nodes, &n.Node) |
| } |
| } |
| |
| return nodes |
| } |
| |
| // NumMembers returns the number of alive nodes currently known. Between |
| // the time of calling this and calling Members, the number of alive nodes |
| // may have changed, so this shouldn't be used to determine how many |
| // members will be returned by Members. |
| func (m *Memberlist) NumMembers() (alive int) { |
| m.nodeLock.RLock() |
| defer m.nodeLock.RUnlock() |
| |
| for _, n := range m.nodes { |
| if n.State != stateDead { |
| alive++ |
| } |
| } |
| |
| return |
| } |
| |
| // Leave will broadcast a leave message but will not shutdown the background |
| // listeners, meaning the node will continue participating in gossip and state |
| // updates. |
| // |
| // This will block until the leave message is successfully broadcasted to |
| // a member of the cluster, if any exist or until a specified timeout |
| // is reached. |
| // |
| // This method is safe to call multiple times, but must not be called |
| // after the cluster is already shut down. |
| func (m *Memberlist) Leave(timeout time.Duration) error { |
| m.leaveLock.Lock() |
| defer m.leaveLock.Unlock() |
| |
| if m.hasShutdown() { |
| panic("leave after shutdown") |
| } |
| |
| if !m.hasLeft() { |
| atomic.StoreInt32(&m.leave, 1) |
| |
| m.nodeLock.Lock() |
| state, ok := m.nodeMap[m.config.Name] |
| m.nodeLock.Unlock() |
| if !ok { |
| m.logger.Printf("[WARN] memberlist: Leave but we're not in the node map.") |
| return nil |
| } |
| |
| d := dead{ |
| Incarnation: state.Incarnation, |
| Node: state.Name, |
| } |
| m.deadNode(&d) |
| |
| // Block until the broadcast goes out |
| if m.anyAlive() { |
| var timeoutCh <-chan time.Time |
| if timeout > 0 { |
| timeoutCh = time.After(timeout) |
| } |
| select { |
| case <-m.leaveBroadcast: |
| case <-timeoutCh: |
| return fmt.Errorf("timeout waiting for leave broadcast") |
| } |
| } |
| } |
| |
| return nil |
| } |
| |
| // Check for any other alive node. |
| func (m *Memberlist) anyAlive() bool { |
| m.nodeLock.RLock() |
| defer m.nodeLock.RUnlock() |
| for _, n := range m.nodes { |
| if n.State != stateDead && n.Name != m.config.Name { |
| return true |
| } |
| } |
| return false |
| } |
| |
| // GetHealthScore gives this instance's idea of how well it is meeting the soft |
| // real-time requirements of the protocol. Lower numbers are better, and zero |
| // means "totally healthy". |
| func (m *Memberlist) GetHealthScore() int { |
| return m.awareness.GetHealthScore() |
| } |
| |
| // ProtocolVersion returns the protocol version currently in use by |
| // this memberlist. |
| func (m *Memberlist) ProtocolVersion() uint8 { |
| // NOTE: This method exists so that in the future we can control |
| // any locking if necessary, if we change the protocol version at |
| // runtime, etc. |
| return m.config.ProtocolVersion |
| } |
| |
| // Shutdown will stop any background maintanence of network activity |
| // for this memberlist, causing it to appear "dead". A leave message |
| // will not be broadcasted prior, so the cluster being left will have |
| // to detect this node's shutdown using probing. If you wish to more |
| // gracefully exit the cluster, call Leave prior to shutting down. |
| // |
| // This method is safe to call multiple times. |
| func (m *Memberlist) Shutdown() error { |
| m.shutdownLock.Lock() |
| defer m.shutdownLock.Unlock() |
| |
| if m.hasShutdown() { |
| return nil |
| } |
| |
| // Shut down the transport first, which should block until it's |
| // completely torn down. If we kill the memberlist-side handlers |
| // those I/O handlers might get stuck. |
| if err := m.transport.Shutdown(); err != nil { |
| m.logger.Printf("[ERR] Failed to shutdown transport: %v", err) |
| } |
| |
| // Now tear down everything else. |
| atomic.StoreInt32(&m.shutdown, 1) |
| close(m.shutdownCh) |
| m.deschedule() |
| return nil |
| } |
| |
| func (m *Memberlist) hasShutdown() bool { |
| return atomic.LoadInt32(&m.shutdown) == 1 |
| } |
| |
| func (m *Memberlist) hasLeft() bool { |
| return atomic.LoadInt32(&m.leave) == 1 |
| } |