| package serf |
| |
| import ( |
| "bytes" |
| "encoding/base64" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "io/ioutil" |
| "log" |
| "math/rand" |
| "net" |
| "strconv" |
| "sync" |
| "time" |
| |
| "github.com/armon/go-metrics" |
| "github.com/hashicorp/go-msgpack/codec" |
| "github.com/hashicorp/memberlist" |
| "github.com/hashicorp/serf/coordinate" |
| ) |
| |
| // These are the protocol versions that Serf can _understand_. These are |
| // Serf-level protocol versions that are passed down as the delegate |
| // version to memberlist below. |
| const ( |
| ProtocolVersionMin uint8 = 2 |
| ProtocolVersionMax = 4 |
| ) |
| |
| const ( |
| // Used to detect if the meta data is tags |
| // or if it is a raw role |
| tagMagicByte uint8 = 255 |
| ) |
| |
| var ( |
| // FeatureNotSupported is returned if a feature cannot be used |
| // due to an older protocol version being used. |
| FeatureNotSupported = fmt.Errorf("Feature not supported") |
| ) |
| |
| func init() { |
| // Seed the random number generator |
| rand.Seed(time.Now().UnixNano()) |
| } |
| |
| // Serf is a single node that is part of a single cluster that gets |
| // events about joins/leaves/failures/etc. It is created with the Create |
| // method. |
| // |
| // All functions on the Serf structure are safe to call concurrently. |
| type Serf struct { |
| // The clocks for different purposes. These MUST be the first things |
| // in this struct due to Golang issue #599. |
| clock LamportClock |
| eventClock LamportClock |
| queryClock LamportClock |
| |
| broadcasts *memberlist.TransmitLimitedQueue |
| config *Config |
| failedMembers []*memberState |
| leftMembers []*memberState |
| memberlist *memberlist.Memberlist |
| memberLock sync.RWMutex |
| members map[string]*memberState |
| |
| // Circular buffers for recent intents, used |
| // in case we get the intent before the relevant event |
| recentLeave []nodeIntent |
| recentLeaveIndex int |
| recentJoin []nodeIntent |
| recentJoinIndex int |
| |
| eventBroadcasts *memberlist.TransmitLimitedQueue |
| eventBuffer []*userEvents |
| eventJoinIgnore bool |
| eventMinTime LamportTime |
| eventLock sync.RWMutex |
| |
| queryBroadcasts *memberlist.TransmitLimitedQueue |
| queryBuffer []*queries |
| queryMinTime LamportTime |
| queryResponse map[LamportTime]*QueryResponse |
| queryLock sync.RWMutex |
| |
| logger *log.Logger |
| joinLock sync.Mutex |
| stateLock sync.Mutex |
| state SerfState |
| shutdownCh chan struct{} |
| |
| snapshotter *Snapshotter |
| keyManager *KeyManager |
| |
| coordClient *coordinate.Client |
| coordCache map[string]*coordinate.Coordinate |
| coordCacheLock sync.RWMutex |
| } |
| |
| // SerfState is the state of the Serf instance. |
| type SerfState int |
| |
| const ( |
| SerfAlive SerfState = iota |
| SerfLeaving |
| SerfLeft |
| SerfShutdown |
| ) |
| |
| func (s SerfState) String() string { |
| switch s { |
| case SerfAlive: |
| return "alive" |
| case SerfLeaving: |
| return "leaving" |
| case SerfLeft: |
| return "left" |
| case SerfShutdown: |
| return "shutdown" |
| default: |
| return "unknown" |
| } |
| } |
| |
| // Member is a single member of the Serf cluster. |
| type Member struct { |
| Name string |
| Addr net.IP |
| Port uint16 |
| Tags map[string]string |
| Status MemberStatus |
| |
| // The minimum, maximum, and current values of the protocol versions |
| // and delegate (Serf) protocol versions that each member can understand |
| // or is speaking. |
| ProtocolMin uint8 |
| ProtocolMax uint8 |
| ProtocolCur uint8 |
| DelegateMin uint8 |
| DelegateMax uint8 |
| DelegateCur uint8 |
| } |
| |
| // MemberStatus is the state that a member is in. |
| type MemberStatus int |
| |
| const ( |
| StatusNone MemberStatus = iota |
| StatusAlive |
| StatusLeaving |
| StatusLeft |
| StatusFailed |
| ) |
| |
| func (s MemberStatus) String() string { |
| switch s { |
| case StatusNone: |
| return "none" |
| case StatusAlive: |
| return "alive" |
| case StatusLeaving: |
| return "leaving" |
| case StatusLeft: |
| return "left" |
| case StatusFailed: |
| return "failed" |
| default: |
| panic(fmt.Sprintf("unknown MemberStatus: %d", s)) |
| } |
| } |
| |
| // memberState is used to track the state of a cluster member. It records |
| // the member itself, the Lamport time of its last status change and, for |
| // members that have left or failed, the wall clock time of departure. |
| type memberState struct { |
| Member |
| statusLTime LamportTime // lamport clock time of last received message |
| leaveTime time.Time // wall clock time of leave |
| } |
| |
| // nodeIntent is used to buffer intents for out-of-order deliveries |
| type nodeIntent struct { |
| LTime LamportTime |
| Node string |
| } |
| |
| // userEvent is used to buffer events to prevent re-delivery |
| type userEvent struct { |
| Name string |
| Payload []byte |
| } |
| |
| func (ue *userEvent) Equals(other *userEvent) bool { |
| if ue.Name != other.Name { |
| return false |
| } |
| if !bytes.Equal(ue.Payload, other.Payload) { |
| return false |
| } |
| return true |
| } |
| |
| // userEvents stores all the user events at a specific time |
| type userEvents struct { |
| LTime LamportTime |
| Events []userEvent |
| } |
| |
| // queries stores all the query ids at a specific time |
| type queries struct { |
| LTime LamportTime |
| QueryIDs []uint32 |
| } |
| |
| const ( |
| UserEventSizeLimit = 512 // Maximum byte size for event name and payload |
| snapshotSizeLimit = 128 * 1024 // Maximum 128 KB snapshot |
| ) |
| |
| // Create creates a new Serf instance, starting all the background tasks |
| // to maintain cluster membership information. |
| // |
| // After calling this function, the configuration should no longer be used |
| // or modified by the caller. |
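| // |
| // A minimal usage sketch (the node name and channel size are illustrative; |
| // error handling is abbreviated): |
| // |
| //    events := make(chan Event, 64) |
| //    conf := DefaultConfig() |
| //    conf.NodeName = "node-a" |
| //    conf.EventCh = events |
| //    s, err := Create(conf) |
| //    if err != nil { |
| //        log.Fatalf("failed to create serf: %v", err) |
| //    } |
| //    defer s.Shutdown() |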
| func Create(conf *Config) (*Serf, error) { |
| conf.Init() |
| if conf.ProtocolVersion < ProtocolVersionMin { |
| return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]", |
| conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax) |
| } else if conf.ProtocolVersion > ProtocolVersionMax { |
| return nil, fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]", |
| conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax) |
| } |
| |
| serf := &Serf{ |
| config: conf, |
| logger: log.New(conf.LogOutput, "", log.LstdFlags), |
| members: make(map[string]*memberState), |
| queryResponse: make(map[LamportTime]*QueryResponse), |
| shutdownCh: make(chan struct{}), |
| state: SerfAlive, |
| } |
| |
| // Check that the meta data length is okay |
| if len(serf.encodeTags(conf.Tags)) > memberlist.MetaMaxSize { |
| return nil, fmt.Errorf("Encoded length of tags exceeds limit of %d bytes", memberlist.MetaMaxSize) |
| } |
| |
| // Check if serf member event coalescing is enabled |
| if conf.CoalescePeriod > 0 && conf.QuiescentPeriod > 0 && conf.EventCh != nil { |
| c := &memberEventCoalescer{ |
| lastEvents: make(map[string]EventType), |
| latestEvents: make(map[string]coalesceEvent), |
| } |
| |
| conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh, |
| conf.CoalescePeriod, conf.QuiescentPeriod, c) |
| } |
| |
| // Check if user event coalescing is enabled |
| if conf.UserCoalescePeriod > 0 && conf.UserQuiescentPeriod > 0 && conf.EventCh != nil { |
| c := &userEventCoalescer{ |
| events: make(map[string]*latestUserEvents), |
| } |
| |
| conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh, |
| conf.UserCoalescePeriod, conf.UserQuiescentPeriod, c) |
| } |
| |
| // Listen for internal Serf queries. This is set up before the snapshotter, since |
| // we want to capture the query-time, but the internal listener does not pass |
| // the queries through |
| outCh, err := newSerfQueries(serf, serf.logger, conf.EventCh, serf.shutdownCh) |
| if err != nil { |
| return nil, fmt.Errorf("Failed to setup serf query handler: %v", err) |
| } |
| conf.EventCh = outCh |
| |
| // Set up network coordinate client. |
| if !conf.DisableCoordinates { |
| serf.coordClient, err = coordinate.NewClient(coordinate.DefaultConfig()) |
| if err != nil { |
| return nil, fmt.Errorf("Failed to create coordinate client: %v", err) |
| } |
| } |
| |
| // Try to access the snapshot |
| var oldClock, oldEventClock, oldQueryClock LamportTime |
| var prev []*PreviousNode |
| if conf.SnapshotPath != "" { |
| eventCh, snap, err := NewSnapshotter( |
| conf.SnapshotPath, |
| snapshotSizeLimit, |
| conf.RejoinAfterLeave, |
| serf.logger, |
| &serf.clock, |
| serf.coordClient, |
| conf.EventCh, |
| serf.shutdownCh) |
| if err != nil { |
| return nil, fmt.Errorf("Failed to setup snapshot: %v", err) |
| } |
| serf.snapshotter = snap |
| conf.EventCh = eventCh |
| prev = snap.AliveNodes() |
| oldClock = snap.LastClock() |
| oldEventClock = snap.LastEventClock() |
| oldQueryClock = snap.LastQueryClock() |
| serf.eventMinTime = oldEventClock + 1 |
| serf.queryMinTime = oldQueryClock + 1 |
| } |
| |
| // Set up the coordinate cache. We do this after we read the snapshot to |
| // make sure we get a good initial value from there, if we got one. |
| if !conf.DisableCoordinates { |
| serf.coordCache = make(map[string]*coordinate.Coordinate) |
| serf.coordCache[conf.NodeName] = serf.coordClient.GetCoordinate() |
| } |
| |
| // Setup the various broadcast queues, which we use to send our own |
| // custom broadcasts along the gossip channel. |
| serf.broadcasts = &memberlist.TransmitLimitedQueue{ |
| NumNodes: func() int { |
| return len(serf.members) |
| }, |
| RetransmitMult: conf.MemberlistConfig.RetransmitMult, |
| } |
| serf.eventBroadcasts = &memberlist.TransmitLimitedQueue{ |
| NumNodes: func() int { |
| return len(serf.members) |
| }, |
| RetransmitMult: conf.MemberlistConfig.RetransmitMult, |
| } |
| serf.queryBroadcasts = &memberlist.TransmitLimitedQueue{ |
| NumNodes: func() int { |
| return len(serf.members) |
| }, |
| RetransmitMult: conf.MemberlistConfig.RetransmitMult, |
| } |
| |
| // Create the buffer for recent intents |
| serf.recentJoin = make([]nodeIntent, conf.RecentIntentBuffer) |
| serf.recentLeave = make([]nodeIntent, conf.RecentIntentBuffer) |
| |
| // Create a buffer for events and queries |
| serf.eventBuffer = make([]*userEvents, conf.EventBuffer) |
| serf.queryBuffer = make([]*queries, conf.QueryBuffer) |
| |
| // Ensure our lamport clock is at least 1, so that the default |
| // join LTime of 0 does not cause issues |
| serf.clock.Increment() |
| serf.eventClock.Increment() |
| serf.queryClock.Increment() |
| |
| // Restore the clock from snap if we have one |
| serf.clock.Witness(oldClock) |
| serf.eventClock.Witness(oldEventClock) |
| serf.queryClock.Witness(oldQueryClock) |
| |
| // Modify the memberlist configuration with keys that we set |
| conf.MemberlistConfig.Events = &eventDelegate{serf: serf} |
| conf.MemberlistConfig.Conflict = &conflictDelegate{serf: serf} |
| conf.MemberlistConfig.Delegate = &delegate{serf: serf} |
| conf.MemberlistConfig.DelegateProtocolVersion = conf.ProtocolVersion |
| conf.MemberlistConfig.DelegateProtocolMin = ProtocolVersionMin |
| conf.MemberlistConfig.DelegateProtocolMax = ProtocolVersionMax |
| conf.MemberlistConfig.Name = conf.NodeName |
| conf.MemberlistConfig.ProtocolVersion = ProtocolVersionMap[conf.ProtocolVersion] |
| if !conf.DisableCoordinates { |
| conf.MemberlistConfig.Ping = &pingDelegate{serf: serf} |
| } |
| |
| // Setup a merge delegate if necessary |
| if conf.Merge != nil { |
| md := &mergeDelegate{serf: serf} |
| conf.MemberlistConfig.Merge = md |
| conf.MemberlistConfig.Alive = md |
| } |
| |
| // Create the underlying memberlist that will manage membership |
| // and failure detection for the Serf instance. |
| memberlist, err := memberlist.Create(conf.MemberlistConfig) |
| if err != nil { |
| return nil, fmt.Errorf("Failed to create memberlist: %v", err) |
| } |
| |
| serf.memberlist = memberlist |
| |
| // Create a key manager for handling all encryption key changes |
| serf.keyManager = &KeyManager{serf: serf} |
| |
| // Start the background tasks. See the documentation above each method |
| // for more information on their role. |
| go serf.handleReap() |
| go serf.handleReconnect() |
| go serf.checkQueueDepth("Intent", serf.broadcasts) |
| go serf.checkQueueDepth("Event", serf.eventBroadcasts) |
| go serf.checkQueueDepth("Query", serf.queryBroadcasts) |
| |
| // Attempt to re-join the cluster if we have known nodes |
| if len(prev) != 0 { |
| go serf.handleRejoin(prev) |
| } |
| |
| return serf, nil |
| } |
| |
| // ProtocolVersion returns the current protocol version in use by Serf. |
| // This is the Serf protocol version, not the memberlist protocol version. |
| func (s *Serf) ProtocolVersion() uint8 { |
| return s.config.ProtocolVersion |
| } |
| |
| // EncryptionEnabled is a predicate that determines whether or not encryption |
| // is enabled, which is possible in one of two cases: |
| // - Single encryption key passed at agent start (no persistence) |
| // - Keyring file provided at agent start |
| func (s *Serf) EncryptionEnabled() bool { |
| return s.config.MemberlistConfig.Keyring != nil |
| } |
| |
| // KeyManager returns the key manager for the current Serf instance. |
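| // |
| // A sketch of installing a new encryption key through the manager (the |
| // base64 key string is illustrative, and InstallKey is assumed from the |
| // key management API): |
| // |
| //    km := s.KeyManager() |
| //    if _, err := km.InstallKey("T9jncgl9mbLus+baTTa7q7nPSUrXwbDi2dhbtqir37s="); err != nil { |
| //        log.Printf("failed to install key: %v", err) |
| //    } |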
| func (s *Serf) KeyManager() *KeyManager { |
| return s.keyManager |
| } |
| |
| // UserEvent is used to broadcast a custom user event with a given |
| // name and payload. The events must be fairly small, and if the |
| // size limit is exceeded an error will be returned. If coalesce is enabled, |
| // nodes are allowed to coalesce this event. Coalescing is only available |
| // starting in v0.2 |
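| // |
| // A short sketch (the event name and payload are illustrative): |
| // |
| //    if err := s.UserEvent("deploy", []byte("release-1.2.3"), false); err != nil { |
| //        log.Printf("failed to send user event: %v", err) |
| //    } |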
| func (s *Serf) UserEvent(name string, payload []byte, coalesce bool) error { |
| // Check the size limit |
| if len(name)+len(payload) > UserEventSizeLimit { |
| return fmt.Errorf("user event exceeds limit of %d bytes", UserEventSizeLimit) |
| } |
| |
| // Create a message |
| msg := messageUserEvent{ |
| LTime: s.eventClock.Time(), |
| Name: name, |
| Payload: payload, |
| CC: coalesce, |
| } |
| s.eventClock.Increment() |
| |
| // Process update locally |
| s.handleUserEvent(&msg) |
| |
| // Start broadcasting the event |
| raw, err := encodeMessage(messageUserEventType, &msg) |
| if err != nil { |
| return err |
| } |
| s.eventBroadcasts.QueueBroadcast(&broadcast{ |
| msg: raw, |
| }) |
| return nil |
| } |
| |
| // Query is used to broadcast a new query. The query must be fairly small, |
| // and an error will be returned if the size limit is exceeded. This is only |
| // available with protocol version 4 and newer. Query parameters are optional, |
| // and if not provided, a sane set of defaults will be used. |
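| // |
| // A sketch of issuing a query and collecting responses (the query name is |
| // illustrative; error handling is abbreviated): |
| // |
| //    resp, err := s.Query("load", nil, nil) |
| //    if err != nil { |
| //        log.Printf("failed to start query: %v", err) |
| //        return |
| //    } |
| //    for r := range resp.ResponseCh() { |
| //        fmt.Printf("response from %s: %s\n", r.From, r.Payload) |
| //    } |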
| func (s *Serf) Query(name string, payload []byte, params *QueryParam) (*QueryResponse, error) { |
| // Check that the latest protocol is in use |
| if s.ProtocolVersion() < 4 { |
| return nil, FeatureNotSupported |
| } |
| |
| // Provide default parameters if none given |
| if params == nil { |
| params = s.DefaultQueryParams() |
| } else if params.Timeout == 0 { |
| params.Timeout = s.DefaultQueryTimeout() |
| } |
| |
| // Get the local node |
| local := s.memberlist.LocalNode() |
| |
| // Encode the filters |
| filters, err := params.encodeFilters() |
| if err != nil { |
| return nil, fmt.Errorf("Failed to format filters: %v", err) |
| } |
| |
| // Setup the flags |
| var flags uint32 |
| if params.RequestAck { |
| flags |= queryFlagAck |
| } |
| |
| // Create a message |
| q := messageQuery{ |
| LTime: s.queryClock.Time(), |
| ID: uint32(rand.Int31()), |
| Addr: local.Addr, |
| Port: local.Port, |
| Filters: filters, |
| Flags: flags, |
| Timeout: params.Timeout, |
| Name: name, |
| Payload: payload, |
| } |
| |
| // Encode the query |
| raw, err := encodeMessage(messageQueryType, &q) |
| if err != nil { |
| return nil, err |
| } |
| |
| // Check the size |
| if len(raw) > s.config.QuerySizeLimit { |
| return nil, fmt.Errorf("query exceeds limit of %d bytes", s.config.QuerySizeLimit) |
| } |
| |
| // Register QueryResponse to track acks and responses |
| resp := newQueryResponse(s.memberlist.NumMembers(), &q) |
| s.registerQueryResponse(params.Timeout, resp) |
| |
| // Process query locally |
| s.handleQuery(&q) |
| |
| // Start broadcasting the event |
| s.queryBroadcasts.QueueBroadcast(&broadcast{ |
| msg: raw, |
| }) |
| return resp, nil |
| } |
| |
| // registerQueryResponse is used to setup the listeners for the query, |
| // and to schedule closing the query after the timeout. |
| func (s *Serf) registerQueryResponse(timeout time.Duration, resp *QueryResponse) { |
| s.queryLock.Lock() |
| defer s.queryLock.Unlock() |
| |
| // Map the LTime to the QueryResponse. This is necessarily 1-to-1, |
| // since we increment the time for each new query. |
| s.queryResponse[resp.lTime] = resp |
| |
| // Setup a timer to close the response and deregister after the timeout |
| time.AfterFunc(timeout, func() { |
| s.queryLock.Lock() |
| delete(s.queryResponse, resp.lTime) |
| resp.Close() |
| s.queryLock.Unlock() |
| }) |
| } |
| |
| // SetTags is used to dynamically update the tags associated with |
| // the local node. This will propagate the change to the rest of |
| // the cluster. Blocks until the message is broadcast out. |
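| // |
| // For example (the tag keys and values are illustrative): |
| // |
| //    err := s.SetTags(map[string]string{ |
| //        "role":       "web", |
| //        "datacenter": "east", |
| //    }) |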
| func (s *Serf) SetTags(tags map[string]string) error { |
| // Check that the meta data length is okay |
| if len(s.encodeTags(tags)) > memberlist.MetaMaxSize { |
| return fmt.Errorf("Encoded length of tags exceeds limit of %d bytes", |
| memberlist.MetaMaxSize) |
| } |
| |
| // Update the config |
| s.config.Tags = tags |
| |
| // Trigger a memberlist update |
| return s.memberlist.UpdateNode(s.config.BroadcastTimeout) |
| } |
| |
| // Join joins an existing Serf cluster. Returns the number of nodes |
| // successfully contacted. The returned error will be non-nil only in the |
| // case that no nodes could be contacted. If ignoreOld is true, then any |
| // user messages sent prior to the join will be ignored. |
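| // |
| // A sketch of joining through a couple of known addresses (the addresses |
| // are illustrative): |
| // |
| //    n, err := s.Join([]string{"10.0.0.10:7946", "10.0.0.11:7946"}, false) |
| //    if err != nil { |
| //        log.Printf("failed to contact any nodes: %v", err) |
| //    } |
| //    log.Printf("contacted %d nodes", n) |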
| func (s *Serf) Join(existing []string, ignoreOld bool) (int, error) { |
| // Do a quick state check |
| if s.State() != SerfAlive { |
| return 0, fmt.Errorf("Serf can't Join after Leave or Shutdown") |
| } |
| |
| // Hold the joinLock, this is to make eventJoinIgnore safe |
| s.joinLock.Lock() |
| defer s.joinLock.Unlock() |
| |
| // Ignore any events from a potential join. This is safe since we hold |
| // the joinLock and nobody else can be doing a Join |
| if ignoreOld { |
| s.eventJoinIgnore = true |
| defer func() { |
| s.eventJoinIgnore = false |
| }() |
| } |
| |
| // Have memberlist attempt to join |
| num, err := s.memberlist.Join(existing) |
| |
| // If we joined any nodes, broadcast the join message |
| if num > 0 { |
| // Start broadcasting the update |
| if err := s.broadcastJoin(s.clock.Time()); err != nil { |
| return num, err |
| } |
| } |
| |
| return num, err |
| } |
| |
| // broadcastJoin broadcasts a new join intent with a |
| // given clock value. It is used on either join, or if |
| // we need to refute an older leave intent. Cannot be called |
| // with the memberLock held. |
| func (s *Serf) broadcastJoin(ltime LamportTime) error { |
| // Construct message to update our lamport clock |
| msg := messageJoin{ |
| LTime: ltime, |
| Node: s.config.NodeName, |
| } |
| s.clock.Witness(ltime) |
| |
| // Process update locally |
| s.handleNodeJoinIntent(&msg) |
| |
| // Start broadcasting the update |
| if err := s.broadcast(messageJoinType, &msg, nil); err != nil { |
| s.logger.Printf("[WARN] serf: Failed to broadcast join intent: %v", err) |
| return err |
| } |
| return nil |
| } |
| |
| // Leave gracefully exits the cluster. It is safe to call this multiple |
| // times. |
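| // |
| // A typical graceful exit pairs Leave with a subsequent Shutdown: |
| // |
| //    if err := s.Leave(); err != nil { |
| //        log.Printf("graceful leave failed: %v", err) |
| //    } |
| //    if err := s.Shutdown(); err != nil { |
| //        log.Printf("shutdown failed: %v", err) |
| //    } |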
| func (s *Serf) Leave() error { |
| // Check the current state |
| s.stateLock.Lock() |
| if s.state == SerfLeft { |
| s.stateLock.Unlock() |
| return nil |
| } else if s.state == SerfLeaving { |
| s.stateLock.Unlock() |
| return fmt.Errorf("Leave already in progress") |
| } else if s.state == SerfShutdown { |
| s.stateLock.Unlock() |
| return fmt.Errorf("Leave called after Shutdown") |
| } |
| s.state = SerfLeaving |
| s.stateLock.Unlock() |
| |
| // If we have a snapshot, mark we are leaving |
| if s.snapshotter != nil { |
| s.snapshotter.Leave() |
| } |
| |
| // Construct the message for the graceful leave |
| msg := messageLeave{ |
| LTime: s.clock.Time(), |
| Node: s.config.NodeName, |
| } |
| s.clock.Increment() |
| |
| // Process the leave locally |
| s.handleNodeLeaveIntent(&msg) |
| |
| // Only broadcast the leave message if there is at least one |
| // other node alive. |
| if s.hasAliveMembers() { |
| notifyCh := make(chan struct{}) |
| if err := s.broadcast(messageLeaveType, &msg, notifyCh); err != nil { |
| return err |
| } |
| |
| select { |
| case <-notifyCh: |
| case <-time.After(s.config.BroadcastTimeout): |
| return errors.New("timeout while waiting for graceful leave") |
| } |
| } |
| |
| // Attempt the memberlist leave |
| err := s.memberlist.Leave(s.config.BroadcastTimeout) |
| if err != nil { |
| return err |
| } |
| |
| // Transition to Left only if we are not already shut down |
| s.stateLock.Lock() |
| if s.state != SerfShutdown { |
| s.state = SerfLeft |
| } |
| s.stateLock.Unlock() |
| return nil |
| } |
| |
| // hasAliveMembers is called to check for any alive members other than |
| // ourself. |
| func (s *Serf) hasAliveMembers() bool { |
| s.memberLock.RLock() |
| defer s.memberLock.RUnlock() |
| |
| hasAlive := false |
| for _, m := range s.members { |
| // Skip ourself, we want to know if OTHER members are alive |
| if m.Name == s.config.NodeName { |
| continue |
| } |
| |
| if m.Status == StatusAlive { |
| hasAlive = true |
| break |
| } |
| } |
| return hasAlive |
| } |
| |
| // LocalMember returns the Member information for the local node |
| func (s *Serf) LocalMember() Member { |
| s.memberLock.RLock() |
| defer s.memberLock.RUnlock() |
| return s.members[s.config.NodeName].Member |
| } |
| |
| // Members returns a point-in-time snapshot of the members of this cluster. |
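| // |
| // For example, to list only the members currently considered alive: |
| // |
| //    for _, m := range s.Members() { |
| //        if m.Status == StatusAlive { |
| //            fmt.Printf("%s %s:%d\n", m.Name, m.Addr, m.Port) |
| //        } |
| //    } |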
| func (s *Serf) Members() []Member { |
| s.memberLock.RLock() |
| defer s.memberLock.RUnlock() |
| |
| members := make([]Member, 0, len(s.members)) |
| for _, m := range s.members { |
| members = append(members, m.Member) |
| } |
| |
| return members |
| } |
| |
| // RemoveFailedNode forcibly removes a failed node from the cluster |
| // immediately, instead of waiting for the reaper to eventually reclaim it. |
| // This also has the effect that Serf will no longer attempt to reconnect |
| // to this node. |
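| // |
| // For example (the node name is illustrative): |
| // |
| //    if err := s.RemoveFailedNode("node-b"); err != nil { |
| //        log.Printf("failed to remove node: %v", err) |
| //    } |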
| func (s *Serf) RemoveFailedNode(node string) error { |
| // Construct the message to broadcast |
| msg := messageLeave{ |
| LTime: s.clock.Time(), |
| Node: node, |
| } |
| s.clock.Increment() |
| |
| // Process our own event |
| s.handleNodeLeaveIntent(&msg) |
| |
| // If we have no members, then we don't need to broadcast |
| if !s.hasAliveMembers() { |
| return nil |
| } |
| |
| // Broadcast the remove |
| notifyCh := make(chan struct{}) |
| if err := s.broadcast(messageLeaveType, &msg, notifyCh); err != nil { |
| return err |
| } |
| |
| // Wait for the broadcast |
| select { |
| case <-notifyCh: |
| case <-time.After(s.config.BroadcastTimeout): |
| return fmt.Errorf("timed out broadcasting node removal") |
| } |
| |
| return nil |
| } |
| |
| // Shutdown forcefully shuts down the Serf instance, stopping all network |
| // activity and background maintenance associated with the instance. |
| // |
| // This is not a graceful shutdown, and should be preceded by a call |
| // to Leave. Otherwise, other nodes in the cluster will detect this node's |
| // exit as a node failure. |
| // |
| // It is safe to call this method multiple times. |
| func (s *Serf) Shutdown() error { |
| s.stateLock.Lock() |
| defer s.stateLock.Unlock() |
| |
| if s.state == SerfShutdown { |
| return nil |
| } |
| |
| if s.state != SerfLeft { |
| s.logger.Printf("[WARN] serf: Shutdown without a Leave") |
| } |
| |
| s.state = SerfShutdown |
| close(s.shutdownCh) |
| |
| err := s.memberlist.Shutdown() |
| if err != nil { |
| return err |
| } |
| |
| // Wait for the snapshotter to finish if we have one |
| if s.snapshotter != nil { |
| s.snapshotter.Wait() |
| } |
| |
| return nil |
| } |
| |
| // ShutdownCh returns a channel that can be used to wait for |
| // Serf to shut down. |
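| // |
| // For example, to block until the instance has fully shut down: |
| // |
| //    <-s.ShutdownCh() |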
| func (s *Serf) ShutdownCh() <-chan struct{} { |
| return s.shutdownCh |
| } |
| |
| // Memberlist is used to get access to the underlying Memberlist instance |
| func (s *Serf) Memberlist() *memberlist.Memberlist { |
| return s.memberlist |
| } |
| |
| // State is the current state of this Serf instance. |
| func (s *Serf) State() SerfState { |
| s.stateLock.Lock() |
| defer s.stateLock.Unlock() |
| return s.state |
| } |
| |
| // broadcast takes a Serf message type, encodes it for the wire, and queues |
| // the broadcast. If a notify channel is given, this channel will be closed |
| // when the broadcast is sent. |
| func (s *Serf) broadcast(t messageType, msg interface{}, notify chan<- struct{}) error { |
| raw, err := encodeMessage(t, msg) |
| if err != nil { |
| return err |
| } |
| |
| s.broadcasts.QueueBroadcast(&broadcast{ |
| msg: raw, |
| notify: notify, |
| }) |
| return nil |
| } |
| |
| // handleNodeJoin is called when a node join event is received |
| // from memberlist. |
| func (s *Serf) handleNodeJoin(n *memberlist.Node) { |
| s.memberLock.Lock() |
| defer s.memberLock.Unlock() |
| |
| var oldStatus MemberStatus |
| member, ok := s.members[n.Name] |
| if !ok { |
| oldStatus = StatusNone |
| member = &memberState{ |
| Member: Member{ |
| Name: n.Name, |
| Addr: net.IP(n.Addr), |
| Port: n.Port, |
| Tags: s.decodeTags(n.Meta), |
| Status: StatusAlive, |
| }, |
| } |
| |
| // Check if we have a join intent and use the LTime |
| if join := recentIntent(s.recentJoin, n.Name); join != nil { |
| member.statusLTime = join.LTime |
| } |
| |
| // Check if we have a leave intent |
| if leave := recentIntent(s.recentLeave, n.Name); leave != nil { |
| if leave.LTime > member.statusLTime { |
| member.Status = StatusLeaving |
| member.statusLTime = leave.LTime |
| } |
| } |
| |
| s.members[n.Name] = member |
| } else { |
| oldStatus = member.Status |
| member.Status = StatusAlive |
| member.leaveTime = time.Time{} |
| member.Addr = net.IP(n.Addr) |
| member.Port = n.Port |
| member.Tags = s.decodeTags(n.Meta) |
| } |
| |
| // Update the protocol versions every time we get an event |
| member.ProtocolMin = n.PMin |
| member.ProtocolMax = n.PMax |
| member.ProtocolCur = n.PCur |
| member.DelegateMin = n.DMin |
| member.DelegateMax = n.DMax |
| member.DelegateCur = n.DCur |
| |
| // If the node was previously in a failed or left state, then clean up some |
| // internal accounting. |
| // TODO(mitchellh): needs tests to verify not reaped |
| if oldStatus == StatusFailed || oldStatus == StatusLeft { |
| s.failedMembers = removeOldMember(s.failedMembers, member.Name) |
| s.leftMembers = removeOldMember(s.leftMembers, member.Name) |
| } |
| |
| // Update some metrics |
| metrics.IncrCounter([]string{"serf", "member", "join"}, 1) |
| |
| // Send an event along |
| s.logger.Printf("[INFO] serf: EventMemberJoin: %s %s", |
| member.Member.Name, member.Member.Addr) |
| if s.config.EventCh != nil { |
| s.config.EventCh <- MemberEvent{ |
| Type: EventMemberJoin, |
| Members: []Member{member.Member}, |
| } |
| } |
| } |
| |
| // handleNodeLeave is called when a node leave event is received |
| // from memberlist. |
| func (s *Serf) handleNodeLeave(n *memberlist.Node) { |
| s.memberLock.Lock() |
| defer s.memberLock.Unlock() |
| |
| member, ok := s.members[n.Name] |
| if !ok { |
| // We've never even heard of this node that is supposedly |
| // leaving. Just ignore it completely. |
| return |
| } |
| |
| switch member.Status { |
| case StatusLeaving: |
| member.Status = StatusLeft |
| member.leaveTime = time.Now() |
| s.leftMembers = append(s.leftMembers, member) |
| case StatusAlive: |
| member.Status = StatusFailed |
| member.leaveTime = time.Now() |
| s.failedMembers = append(s.failedMembers, member) |
| default: |
| // Unknown state that it was in? Just don't do anything |
| s.logger.Printf("[WARN] serf: Bad state when leave: %d", member.Status) |
| return |
| } |
| |
| // Send an event along |
| event := EventMemberLeave |
| eventStr := "EventMemberLeave" |
| if member.Status != StatusLeft { |
| event = EventMemberFailed |
| eventStr = "EventMemberFailed" |
| } |
| |
| // Update some metrics |
| metrics.IncrCounter([]string{"serf", "member", member.Status.String()}, 1) |
| |
| s.logger.Printf("[INFO] serf: %s: %s %s", |
| eventStr, member.Member.Name, member.Member.Addr) |
| if s.config.EventCh != nil { |
| s.config.EventCh <- MemberEvent{ |
| Type: event, |
| Members: []Member{member.Member}, |
| } |
| } |
| } |
| |
| // handleNodeUpdate is called when a node meta data update |
| // has taken place |
| func (s *Serf) handleNodeUpdate(n *memberlist.Node) { |
| s.memberLock.Lock() |
| defer s.memberLock.Unlock() |
| |
| member, ok := s.members[n.Name] |
| if !ok { |
| // We've never even heard of this node that is updating. |
| // Just ignore it completely. |
| return |
| } |
| |
| // Update the member attributes |
| member.Addr = net.IP(n.Addr) |
| member.Port = n.Port |
| member.Tags = s.decodeTags(n.Meta) |
| |
| // Snag the latest versions. NOTE - the current memberlist code will NOT |
| // fire an update event if the metadata (for Serf, tags) stays the same |
| // and only the protocol versions change. If we make any Serf-level |
| // protocol changes where we want to get this event under those |
| // circumstances, we will need to update memberlist to do a check of |
| // versions as well as the metadata. |
| member.ProtocolMin = n.PMin |
| member.ProtocolMax = n.PMax |
| member.ProtocolCur = n.PCur |
| member.DelegateMin = n.DMin |
| member.DelegateMax = n.DMax |
| member.DelegateCur = n.DCur |
| |
| // Update some metrics |
| metrics.IncrCounter([]string{"serf", "member", "update"}, 1) |
| |
| // Send an event along |
| s.logger.Printf("[INFO] serf: EventMemberUpdate: %s", member.Member.Name) |
| if s.config.EventCh != nil { |
| s.config.EventCh <- MemberEvent{ |
| Type: EventMemberUpdate, |
| Members: []Member{member.Member}, |
| } |
| } |
| } |
| |
| // handleNodeLeaveIntent is called when an intent to leave is received. |
| func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool { |
| // Witness a potentially newer time |
| s.clock.Witness(leaveMsg.LTime) |
| |
| s.memberLock.Lock() |
| defer s.memberLock.Unlock() |
| |
| member, ok := s.members[leaveMsg.Node] |
| if !ok { |
| // If we've already seen this message don't rebroadcast |
| if recentIntent(s.recentLeave, leaveMsg.Node) != nil { |
| return false |
| } |
| |
| // We don't know this member so store it in a buffer for now |
| s.recentLeave[s.recentLeaveIndex] = nodeIntent{ |
| LTime: leaveMsg.LTime, |
| Node: leaveMsg.Node, |
| } |
| s.recentLeaveIndex = (s.recentLeaveIndex + 1) % len(s.recentLeave) |
| return true |
| } |
| |
| // If the message is old, then it is irrelevant and we can skip it |
| if leaveMsg.LTime <= member.statusLTime { |
| return false |
| } |
| |
| // Refute us leaving if we are in the alive state |
| // Must be done in another goroutine since we have the memberLock |
| if leaveMsg.Node == s.config.NodeName && s.state == SerfAlive { |
| s.logger.Printf("[DEBUG] serf: Refuting an older leave intent") |
| go s.broadcastJoin(s.clock.Time()) |
| return false |
| } |
| |
| // State transition depends on current state |
| switch member.Status { |
| case StatusAlive: |
| member.Status = StatusLeaving |
| member.statusLTime = leaveMsg.LTime |
| return true |
| case StatusFailed: |
| member.Status = StatusLeft |
| member.statusLTime = leaveMsg.LTime |
| |
| // Remove from the failed list and add to the left list. We add |
| // to the left list so that when we do a sync, other nodes will |
| // remove it from their failed list. |
| s.failedMembers = removeOldMember(s.failedMembers, member.Name) |
| s.leftMembers = append(s.leftMembers, member) |
| |
| // We must push a message indicating the node has now |
| // left to allow higher-level applications to handle the |
| // graceful leave. |
| s.logger.Printf("[INFO] serf: EventMemberLeave (forced): %s %s", |
| member.Member.Name, member.Member.Addr) |
| if s.config.EventCh != nil { |
| s.config.EventCh <- MemberEvent{ |
| Type: EventMemberLeave, |
| Members: []Member{member.Member}, |
| } |
| } |
| return true |
| default: |
| return false |
| } |
| } |
| |
| // handleNodeJoinIntent is called when a node broadcasts a |
| // join message to set the lamport time of its join |
| func (s *Serf) handleNodeJoinIntent(joinMsg *messageJoin) bool { |
| // Witness a potentially newer time |
| s.clock.Witness(joinMsg.LTime) |
| |
| s.memberLock.Lock() |
| defer s.memberLock.Unlock() |
| |
| member, ok := s.members[joinMsg.Node] |
| if !ok { |
| // If we've already seen this message don't rebroadcast |
| if recentIntent(s.recentJoin, joinMsg.Node) != nil { |
| return false |
| } |
| |
| // We don't know this member so store it in a buffer for now |
| s.recentJoin[s.recentJoinIndex] = nodeIntent{LTime: joinMsg.LTime, Node: joinMsg.Node} |
| s.recentJoinIndex = (s.recentJoinIndex + 1) % len(s.recentJoin) |
| return true |
| } |
| |
| // Check if this time is newer than what we have |
| if joinMsg.LTime <= member.statusLTime { |
| return false |
| } |
| |
| // Update the LTime |
| member.statusLTime = joinMsg.LTime |
| |
| // If we are in the leaving state, we should go back to alive, |
| // since the leaving message must have been for an older time |
| if member.Status == StatusLeaving { |
| member.Status = StatusAlive |
| } |
| return true |
| } |
| |
| // handleUserEvent is called when a user event broadcast is |
| // received. Returns whether the message should be rebroadcast. |
| func (s *Serf) handleUserEvent(eventMsg *messageUserEvent) bool { |
| // Witness a potentially newer time |
| s.eventClock.Witness(eventMsg.LTime) |
| |
| s.eventLock.Lock() |
| defer s.eventLock.Unlock() |
| |
| // Ignore if it is before our minimum event time |
| if eventMsg.LTime < s.eventMinTime { |
| return false |
| } |
| |
| // Check if this message is too old |
| curTime := s.eventClock.Time() |
| if curTime > LamportTime(len(s.eventBuffer)) && |
| eventMsg.LTime < curTime-LamportTime(len(s.eventBuffer)) { |
| s.logger.Printf( |
| "[WARN] serf: received old event %s from time %d (current: %d)", |
| eventMsg.Name, |
| eventMsg.LTime, |
| s.eventClock.Time()) |
| return false |
| } |
| |
| // Check if we've already seen this |
| idx := eventMsg.LTime % LamportTime(len(s.eventBuffer)) |
| seen := s.eventBuffer[idx] |
| userEvent := userEvent{Name: eventMsg.Name, Payload: eventMsg.Payload} |
| if seen != nil && seen.LTime == eventMsg.LTime { |
| for _, previous := range seen.Events { |
| if previous.Equals(&userEvent) { |
| return false |
| } |
| } |
| } else { |
| seen = &userEvents{LTime: eventMsg.LTime} |
| s.eventBuffer[idx] = seen |
| } |
| |
| // Add to recent events |
| seen.Events = append(seen.Events, userEvent) |
| |
| // Update some metrics |
| metrics.IncrCounter([]string{"serf", "events"}, 1) |
| metrics.IncrCounter([]string{"serf", "events", eventMsg.Name}, 1) |
| |
| if s.config.EventCh != nil { |
| s.config.EventCh <- UserEvent{ |
| LTime: eventMsg.LTime, |
| Name: eventMsg.Name, |
| Payload: eventMsg.Payload, |
| Coalesce: eventMsg.CC, |
| } |
| } |
| return true |
| } |
| |
| // handleQuery is called when a query broadcast is |
| // received. Returns whether the message should be rebroadcast. |
| func (s *Serf) handleQuery(query *messageQuery) bool { |
| // Witness a potentially newer time |
| s.queryClock.Witness(query.LTime) |
| |
| s.queryLock.Lock() |
| defer s.queryLock.Unlock() |
| |
| // Ignore if it is before our minimum query time |
| if query.LTime < s.queryMinTime { |
| return false |
| } |
| |
| // Check if this message is too old |
| curTime := s.queryClock.Time() |
| if curTime > LamportTime(len(s.queryBuffer)) && |
| query.LTime < curTime-LamportTime(len(s.queryBuffer)) { |
| s.logger.Printf( |
| "[WARN] serf: received old query %s from time %d (current: %d)", |
| query.Name, |
| query.LTime, |
| s.queryClock.Time()) |
| return false |
| } |
| |
| // Check if we've already seen this |
| idx := query.LTime % LamportTime(len(s.queryBuffer)) |
| seen := s.queryBuffer[idx] |
| if seen != nil && seen.LTime == query.LTime { |
| for _, previous := range seen.QueryIDs { |
| if previous == query.ID { |
| // Seen this ID already |
| return false |
| } |
| } |
| } else { |
| seen = &queries{LTime: query.LTime} |
| s.queryBuffer[idx] = seen |
| } |
| |
| // Add to recent queries |
| seen.QueryIDs = append(seen.QueryIDs, query.ID) |
| |
| // Update some metrics |
| metrics.IncrCounter([]string{"serf", "queries"}, 1) |
| metrics.IncrCounter([]string{"serf", "queries", query.Name}, 1) |
| |
| // Check if we should rebroadcast; this may be disabled by a flag |
| rebroadcast := true |
| if query.NoBroadcast() { |
| rebroadcast = false |
| } |
| |
| // Filter the query |
| if !s.shouldProcessQuery(query.Filters) { |
| // Even if we don't process it further, we should rebroadcast, |
| // since it is the first time we've seen this. |
| return rebroadcast |
| } |
| |
| // Send ack if requested, without waiting for client to Respond() |
| if query.Ack() { |
| ack := messageQueryResponse{ |
| LTime: query.LTime, |
| ID: query.ID, |
| From: s.config.NodeName, |
| Flags: queryFlagAck, |
| } |
| raw, err := encodeMessage(messageQueryResponseType, &ack) |
| if err != nil { |
| s.logger.Printf("[ERR] serf: failed to format ack: %v", err) |
| } else { |
| addr := net.UDPAddr{IP: query.Addr, Port: int(query.Port)} |
| if err := s.memberlist.SendTo(&addr, raw); err != nil { |
| s.logger.Printf("[ERR] serf: failed to send ack: %v", err) |
| } |
| } |
| } |
| |
| if s.config.EventCh != nil { |
| s.config.EventCh <- &Query{ |
| LTime: query.LTime, |
| Name: query.Name, |
| Payload: query.Payload, |
| serf: s, |
| id: query.ID, |
| addr: query.Addr, |
| port: query.Port, |
| deadline: time.Now().Add(query.Timeout), |
| } |
| } |
| return rebroadcast |
| } |
| |
| // handleQueryResponse is called when a query response is |
| // received. |
| func (s *Serf) handleQueryResponse(resp *messageQueryResponse) { |
| // Look for a corresponding QueryResponse |
| s.queryLock.RLock() |
| query, ok := s.queryResponse[resp.LTime] |
| s.queryLock.RUnlock() |
| if !ok { |
| s.logger.Printf("[WARN] serf: reply for non-running query (LTime: %d, ID: %d) From: %s", |
| resp.LTime, resp.ID, resp.From) |
| return |
| } |
| |
| // Verify the ID matches |
| if query.id != resp.ID { |
| s.logger.Printf("[WARN] serf: query reply ID mismatch (Local: %d, Response: %d)", |
| query.id, resp.ID) |
| return |
| } |
| |
| // Check if the query is closed |
| if query.Finished() { |
| return |
| } |
| |
| // Process each type of response |
| if resp.Ack() { |
| metrics.IncrCounter([]string{"serf", "query_acks"}, 1) |
| select { |
| case query.ackCh <- resp.From: |
| default: |
| s.logger.Printf("[WARN] serf: Failed to delivery query ack, dropping") |
| } |
| } else { |
| metrics.IncrCounter([]string{"serf", "query_responses"}, 1) |
| select { |
| case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}: |
| default: |
| s.logger.Printf("[WARN] serf: Failed to delivery query response, dropping") |
| } |
| } |
| } |
| |
| // handleNodeConflict is invoked when a join detects a conflict over a name. |
| // This means two different nodes (IP/Port) are claiming the same name. Memberlist |
| // will reject the "new" node mapping, but we can still be notified |
| func (s *Serf) handleNodeConflict(existing, other *memberlist.Node) { |
| // Log a basic warning if the node is not us... |
| if existing.Name != s.config.NodeName { |
| s.logger.Printf("[WARN] serf: Name conflict for '%s' both %s:%d and %s:%d are claiming", |
| existing.Name, existing.Addr, existing.Port, other.Addr, other.Port) |
| return |
| } |
| |
| // The current node is conflicting! This is an error |
| s.logger.Printf("[ERR] serf: Node name conflicts with another node at %s:%d. Names must be unique! (Resolution enabled: %v)", |
| other.Addr, other.Port, s.config.EnableNameConflictResolution) |
| |
| // If automatic resolution is enabled, kick off the resolution |
| if s.config.EnableNameConflictResolution { |
| go s.resolveNodeConflict() |
| } |
| } |
| |
| // resolveNodeConflict is used to determine which node should remain during |
| // a name conflict. This is done by running an internal query. |
| func (s *Serf) resolveNodeConflict() { |
| // Get the local node |
| local := s.memberlist.LocalNode() |
| |
| // Start a name resolution query |
| qName := internalQueryName(conflictQuery) |
| payload := []byte(s.config.NodeName) |
| resp, err := s.Query(qName, payload, nil) |
| if err != nil { |
| s.logger.Printf("[ERR] serf: Failed to start name resolution query: %v", err) |
| return |
| } |
| |
| // Counters to determine the winner |
| var responses, matching int |
| |
| // Gather responses |
| respCh := resp.ResponseCh() |
| for r := range respCh { |
| // Decode the response |
| if len(r.Payload) < 1 || messageType(r.Payload[0]) != messageConflictResponseType { |
| s.logger.Printf("[ERR] serf: Invalid conflict query response type: %v", r.Payload) |
| continue |
| } |
| var member Member |
| if err := decodeMessage(r.Payload[1:], &member); err != nil { |
| s.logger.Printf("[ERR] serf: Failed to decode conflict query response: %v", err) |
| continue |
| } |
| |
| // Update the counters |
| responses++ |
| if bytes.Equal(member.Addr, local.Addr) && member.Port == local.Port { |
| matching++ |
| } |
| } |
| |
| // Query over, determine if we should live |
| majority := (responses / 2) + 1 |
| if matching >= majority { |
| s.logger.Printf("[INFO] serf: majority in name conflict resolution [%d / %d]", |
| matching, responses) |
| return |
| } |
| |
| // Since we lost the vote, we need to exit |
| s.logger.Printf("[WARN] serf: minority in name conflict resolution, quiting [%d / %d]", |
| matching, responses) |
| if err := s.Shutdown(); err != nil { |
| s.logger.Printf("[ERR] serf: Failed to shutdown: %v", err) |
| } |
| } |
| |
| // handleReap periodically reaps the list of failed and left members. |
| func (s *Serf) handleReap() { |
| for { |
| select { |
| case <-time.After(s.config.ReapInterval): |
| s.memberLock.Lock() |
| s.failedMembers = s.reap(s.failedMembers, s.config.ReconnectTimeout) |
| s.leftMembers = s.reap(s.leftMembers, s.config.TombstoneTimeout) |
| s.memberLock.Unlock() |
| case <-s.shutdownCh: |
| return |
| } |
| } |
| } |
| |
| // handleReconnect attempts to reconnect to recently failed nodes |
| // on configured intervals. |
| func (s *Serf) handleReconnect() { |
| for { |
| select { |
| case <-time.After(s.config.ReconnectInterval): |
| s.reconnect() |
| case <-s.shutdownCh: |
| return |
| } |
| } |
| } |
| |
| // reap is called with a list of old members and a timeout, and removes |
| // members that have exceeded the timeout. The members are removed from |
| // both the old list and the members map itself. Locking is left to the caller. |
| func (s *Serf) reap(old []*memberState, timeout time.Duration) []*memberState { |
| now := time.Now() |
| n := len(old) |
| for i := 0; i < n; i++ { |
| m := old[i] |
| |
| // Skip if the timeout is not yet reached |
| if now.Sub(m.leaveTime) <= timeout { |
| continue |
| } |
| |
| // Delete from the list |
| old[i], old[n-1] = old[n-1], nil |
| old = old[:n-1] |
| n-- |
| i-- |
| |
| // Delete from members |
| delete(s.members, m.Name) |
| |
| // Tell the coordinate client the node has gone away and delete |
| // its cached coordinates. |
| if !s.config.DisableCoordinates { |
| s.coordClient.ForgetNode(m.Name) |
| |
| s.coordCacheLock.Lock() |
| delete(s.coordCache, m.Name) |
| s.coordCacheLock.Unlock() |
| } |
| |
| // Send an event along |
| s.logger.Printf("[INFO] serf: EventMemberReap: %s", m.Name) |
| if s.config.EventCh != nil { |
| s.config.EventCh <- MemberEvent{ |
| Type: EventMemberReap, |
| Members: []Member{m.Member}, |
| } |
| } |
| } |
| |
| return old |
| } |
| |
| // reconnect attempts to reconnect to recently failed nodes. |
| func (s *Serf) reconnect() { |
| s.memberLock.RLock() |
| |
| // Nothing to do if there are no failed members |
| n := len(s.failedMembers) |
| if n == 0 { |
| s.memberLock.RUnlock() |
| return |
| } |
| |
| // Probability we should attempt to reconnect is given |
| // by num failed / (num members - num failed - num left) |
| // This means that we probabilistically expect the cluster |
| // to attempt to connect to each failed member once per |
| // reconnect interval |
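| // For example, with 3 failed members and 12 members that are neither |
| // failed nor left, each reconnect interval attempts a reconnect with |
| // probability 3/12 = 0.25. |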
| numFailed := float32(len(s.failedMembers)) |
| numAlive := float32(len(s.members) - len(s.failedMembers) - len(s.leftMembers)) |
| if numAlive == 0 { |
| numAlive = 1 // guard against zero divide |
| } |
| prob := numFailed / numAlive |
| if rand.Float32() > prob { |
| s.memberLock.RUnlock() |
| s.logger.Printf("[DEBUG] serf: forgoing reconnect for random throttling") |
| return |
| } |
| |
| // Select a random member to try and join |
| idx := int(rand.Uint32() % uint32(n)) |
| mem := s.failedMembers[idx] |
| s.memberLock.RUnlock() |
| |
| // Format the addr |
| addr := net.UDPAddr{IP: mem.Addr, Port: int(mem.Port)} |
| s.logger.Printf("[INFO] serf: attempting reconnect to %v %s", mem.Name, addr.String()) |
| |
| // Attempt to join at the memberlist level |
| s.memberlist.Join([]string{addr.String()}) |
| } |
| |
| // checkQueueDepth periodically checks the size of a queue to see if |
| // it is too large |
| func (s *Serf) checkQueueDepth(name string, queue *memberlist.TransmitLimitedQueue) { |
| for { |
| select { |
| case <-time.After(time.Second): |
| numq := queue.NumQueued() |
| metrics.AddSample([]string{"serf", "queue", name}, float32(numq)) |
| if numq >= s.config.QueueDepthWarning { |
| s.logger.Printf("[WARN] serf: %s queue depth: %d", name, numq) |
| } |
| if numq > s.config.MaxQueueDepth { |
| s.logger.Printf("[WARN] serf: %s queue depth (%d) exceeds limit (%d), dropping messages!", |
| name, numq, s.config.MaxQueueDepth) |
| queue.Prune(s.config.MaxQueueDepth) |
| } |
| case <-s.shutdownCh: |
| return |
| } |
| } |
| } |
| |
| // removeOldMember is used to remove an old member from a list of old |
| // members. |
| func removeOldMember(old []*memberState, name string) []*memberState { |
| for i, m := range old { |
| if m.Name == name { |
| n := len(old) |
| old[i], old[n-1] = old[n-1], nil |
| return old[:n-1] |
| } |
| } |
| |
| return old |
| } |
| |
| // recentIntent checks the recent intent buffer for a matching |
| // entry for a given node, and either returns the intent or nil |
| func recentIntent(recent []nodeIntent, node string) (intent *nodeIntent) { |
| for i := 0; i < len(recent); i++ { |
| // Break fast if we hit a zero entry |
| if recent[i].LTime == 0 { |
| break |
| } |
| |
| // Check for a node match |
| if recent[i].Node == node { |
| // Take the most recent entry |
| if intent == nil || recent[i].LTime > intent.LTime { |
| intent = &recent[i] |
| } |
| } |
| } |
| return |
| } |
| |
| // handleRejoin attempts to reconnect to previously known alive nodes |
| func (s *Serf) handleRejoin(previous []*PreviousNode) { |
| for _, prev := range previous { |
| // Do not attempt to join ourself |
| if prev.Name == s.config.NodeName { |
| continue |
| } |
| |
| s.logger.Printf("[INFO] serf: Attempting re-join to previously known node: %s", prev) |
| _, err := s.memberlist.Join([]string{prev.Addr}) |
| if err == nil { |
| s.logger.Printf("[INFO] serf: Re-joined to previously known node: %s", prev) |
| return |
| } |
| } |
| s.logger.Printf("[WARN] serf: Failed to re-join any previously known node") |
| } |
| |
| // encodeTags is used to encode a tag map |
| func (s *Serf) encodeTags(tags map[string]string) []byte { |
| // Support role-only backwards compatibility |
| if s.ProtocolVersion() < 3 { |
| role := tags["role"] |
| return []byte(role) |
| } |
| |
| // Use a magic byte prefix and msgpack encode the tags |
| var buf bytes.Buffer |
| buf.WriteByte(tagMagicByte) |
| enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{}) |
| if err := enc.Encode(tags); err != nil { |
| panic(fmt.Sprintf("Failed to encode tags: %v", err)) |
| } |
| return buf.Bytes() |
| } |
| |
| // decodeTags is used to decode a tag map |
| func (s *Serf) decodeTags(buf []byte) map[string]string { |
| tags := make(map[string]string) |
| |
| // Backwards compatibility mode |
| if len(buf) == 0 || buf[0] != tagMagicByte { |
| tags["role"] = string(buf) |
| return tags |
| } |
| |
| // Decode the tags |
| r := bytes.NewReader(buf[1:]) |
| dec := codec.NewDecoder(r, &codec.MsgpackHandle{}) |
| if err := dec.Decode(&tags); err != nil { |
| s.logger.Printf("[ERR] serf: Failed to decode tags: %v", err) |
| } |
| return tags |
| } |
| |
| // Stats is used to provide operator debugging information |
| func (s *Serf) Stats() map[string]string { |
| toString := func(v uint64) string { |
| return strconv.FormatUint(v, 10) |
| } |
| stats := map[string]string{ |
| "members": toString(uint64(len(s.members))), |
| "failed": toString(uint64(len(s.failedMembers))), |
| "left": toString(uint64(len(s.leftMembers))), |
| "member_time": toString(uint64(s.clock.Time())), |
| "event_time": toString(uint64(s.eventClock.Time())), |
| "query_time": toString(uint64(s.queryClock.Time())), |
| "intent_queue": toString(uint64(s.broadcasts.NumQueued())), |
| "event_queue": toString(uint64(s.eventBroadcasts.NumQueued())), |
| "query_queue": toString(uint64(s.queryBroadcasts.NumQueued())), |
| "encrypted": fmt.Sprintf("%v", s.EncryptionEnabled()), |
| } |
| return stats |
| } |
| |
| // writeKeyringFile will serialize the current keyring and save it to a file. |
| func (s *Serf) writeKeyringFile() error { |
| if len(s.config.KeyringFile) == 0 { |
| return nil |
| } |
| |
| keyring := s.config.MemberlistConfig.Keyring |
| keysRaw := keyring.GetKeys() |
| keysEncoded := make([]string, len(keysRaw)) |
| |
| for i, key := range keysRaw { |
| keysEncoded[i] = base64.StdEncoding.EncodeToString(key) |
| } |
| |
| encodedKeys, err := json.MarshalIndent(keysEncoded, "", " ") |
| if err != nil { |
| return fmt.Errorf("Failed to encode keys: %s", err) |
| } |
| |
| // Use 0600 for permissions because key data is sensitive |
| if err = ioutil.WriteFile(s.config.KeyringFile, encodedKeys, 0600); err != nil { |
| return fmt.Errorf("Failed to write keyring file: %s", err) |
| } |
| |
| // Success! |
| return nil |
| } |
| |
| // GetCoordinate returns the network coordinate of the local node. |
| func (s *Serf) GetCoordinate() (*coordinate.Coordinate, error) { |
| if !s.config.DisableCoordinates { |
| return s.coordClient.GetCoordinate(), nil |
| } |
| |
| return nil, fmt.Errorf("Coordinates are disabled") |
| } |
| |
| // GetCachedCoordinate returns the network coordinate for the node with the given |
| // name. This will only be valid if DisableCoordinates is set to false. |
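| // |
| // A sketch of estimating round-trip time between two nodes from cached |
| // coordinates (node names are illustrative; DistanceTo comes from the |
| // coordinate package): |
| // |
| //    if a, ok := s.GetCachedCoordinate("node-a"); ok { |
| //        if b, ok := s.GetCachedCoordinate("node-b"); ok { |
| //            fmt.Printf("estimated rtt: %v\n", a.DistanceTo(b)) |
| //        } |
| //    } |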
| func (s *Serf) GetCachedCoordinate(name string) (coord *coordinate.Coordinate, ok bool) { |
| if !s.config.DisableCoordinates { |
| s.coordCacheLock.RLock() |
| defer s.coordCacheLock.RUnlock() |
| if coord, ok = s.coordCache[name]; ok { |
| return coord, true |
| } |
| |
| return nil, false |
| } |
| |
| return nil, false |
| } |
| |
| // NumNodes returns the number of nodes in the serf cluster, regardless of |
| // their health or status. |
| func (s *Serf) NumNodes() (numNodes int) { |
| s.memberLock.RLock() |
| numNodes = len(s.members) |
| s.memberLock.RUnlock() |
| |
| return numNodes |
| } |