package serf
import (
	"bufio"
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"net"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/armon/go-metrics"
	"github.com/hashicorp/serf/coordinate"
)
/*
Serf supports using a "snapshot" file that contains various
transactional data that is used to help Serf recover quickly
and gracefully from a failure. We append member events, as well
as the latest clock values, to the file during normal operation,
and periodically checkpoint and roll over the file. During a restore,
we can replay the various member events to recall a list of known
nodes to re-join, as well as restore our clock values to avoid replaying
old events.
*/
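// The snapshot is a plain-text, line-oriented log. The line formats below
// are reconstructed from the append and replay logic in this file; the
// name, address, and time values are only illustrative:
//
//	alive: <name> <addr>           e.g. alive: node-a 10.0.0.1:8301
//	not-alive: <name>              e.g. not-alive: node-a
//	clock: <lamport-time>          e.g. clock: 123
//	event-clock: <lamport-time>
//	query-clock: <lamport-time>
//	coordinate: <json-encoded-coordinate>
//	leave
//	# <comment>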
const flushInterval = 500 * time.Millisecond
const clockUpdateInterval = 500 * time.Millisecond
const coordinateUpdateInterval = 60 * time.Second
const tmpExt = ".compact"
// Snapshotter is responsible for ingesting events, persisting them
// to disk, and providing a recovery mechanism at start time.
type Snapshotter struct {
aliveNodes map[string]string
clock *LamportClock
coordClient *coordinate.Client
fh *os.File
buffered *bufio.Writer
inCh <-chan Event
lastFlush time.Time
lastClock LamportTime
lastEventClock LamportTime
lastQueryClock LamportTime
leaveCh chan struct{}
leaving bool
logger *log.Logger
maxSize int64
path string
offset int64
outCh chan<- Event
rejoinAfterLeave bool
shutdownCh <-chan struct{}
waitCh chan struct{}
}
// PreviousNode is used to represent a previously known alive node
type PreviousNode struct {
Name string
Addr string
}
func (p PreviousNode) String() string {
return fmt.Sprintf("%s: %s", p.Name, p.Addr)
}
// NewSnapshotter creates a new Snapshotter that records events up to a
// max byte size before rotating the file. It can also be used to
// recover old state. The Snapshotter works by reading events from the
// channel it returns, passing them through to an output channel, and
// persisting relevant events to disk. Setting rejoinAfterLeave prevents
// a leave from clearing the state, and can be used if you intend to
// rejoin the same cluster after a leave.
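//
// A minimal usage sketch (the path, size, logger, and clock wiring here
// are hypothetical; nil is supported for both the coordinate client and
// the output channel):
//
//	shutdownCh := make(chan struct{})
//	eventCh, snap, err := NewSnapshotter("/var/lib/serf/snapshot",
//		128*1024, false, logger, clock, nil, nil, shutdownCh)
//	if err != nil {
//		logger.Fatalf("snapshotter: %v", err)
//	}
//	// Feed Serf events into eventCh during normal operation; on
//	// shutdown, close shutdownCh and wait for the final flush:
//	close(shutdownCh)
//	snap.Wait()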
func NewSnapshotter(path string,
maxSize int,
rejoinAfterLeave bool,
logger *log.Logger,
clock *LamportClock,
coordClient *coordinate.Client,
outCh chan<- Event,
shutdownCh <-chan struct{}) (chan<- Event, *Snapshotter, error) {
inCh := make(chan Event, 1024)
// Try to open the file
fh, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755)
if err != nil {
return nil, nil, fmt.Errorf("failed to open snapshot: %v", err)
}
// Determine the offset
info, err := fh.Stat()
if err != nil {
fh.Close()
return nil, nil, fmt.Errorf("failed to stat snapshot: %v", err)
}
offset := info.Size()
// Create the snapshotter
snap := &Snapshotter{
aliveNodes: make(map[string]string),
clock: clock,
coordClient: coordClient,
fh: fh,
buffered: bufio.NewWriter(fh),
inCh: inCh,
lastClock: 0,
lastEventClock: 0,
lastQueryClock: 0,
leaveCh: make(chan struct{}),
logger: logger,
maxSize: int64(maxSize),
path: path,
offset: offset,
outCh: outCh,
rejoinAfterLeave: rejoinAfterLeave,
shutdownCh: shutdownCh,
waitCh: make(chan struct{}),
}
// Recover the last known state
if err := snap.replay(); err != nil {
fh.Close()
return nil, nil, err
}
// Start handling new commands
go snap.stream()
return inCh, snap, nil
}
// LastClock returns the last known clock time
func (s *Snapshotter) LastClock() LamportTime {
return s.lastClock
}
// LastEventClock returns the last known event clock time
func (s *Snapshotter) LastEventClock() LamportTime {
return s.lastEventClock
}
// LastQueryClock returns the last known query clock time
func (s *Snapshotter) LastQueryClock() LamportTime {
return s.lastQueryClock
}
// AliveNodes returns the last known alive nodes
func (s *Snapshotter) AliveNodes() []*PreviousNode {
// Copy the previously known nodes
previous := make([]*PreviousNode, 0, len(s.aliveNodes))
for name, addr := range s.aliveNodes {
previous = append(previous, &PreviousNode{name, addr})
}
// Randomize the order (a Fisher-Yates shuffle) to prevent hot shards
for i := range previous {
j := rand.Intn(i + 1)
previous[i], previous[j] = previous[j], previous[i]
}
return previous
}
// Wait is used to wait until the snapshotter finishes shutting down
func (s *Snapshotter) Wait() {
<-s.waitCh
}
// Leave is used to remove known nodes to prevent a restart from
// causing a re-join. Otherwise nodes will re-join after leaving!
func (s *Snapshotter) Leave() {
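	// The select ensures Leave cannot block forever if the stream
	// goroutine has already exited due to shutdown.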
select {
case s.leaveCh <- struct{}{}:
case <-s.shutdownCh:
}
}
// stream is a long-running goroutine that handles events
func (s *Snapshotter) stream() {
clockTicker := time.NewTicker(clockUpdateInterval)
defer clockTicker.Stop()
coordinateTicker := time.NewTicker(coordinateUpdateInterval)
defer coordinateTicker.Stop()
for {
select {
case <-s.leaveCh:
s.leaving = true
// If we plan to re-join, keep our state
if !s.rejoinAfterLeave {
s.aliveNodes = make(map[string]string)
}
s.tryAppend("leave\n")
if err := s.buffered.Flush(); err != nil {
s.logger.Printf("[ERR] serf: failed to flush leave to snapshot: %v", err)
}
if err := s.fh.Sync(); err != nil {
s.logger.Printf("[ERR] serf: failed to sync leave to snapshot: %v", err)
}
case e := <-s.inCh:
// Forward the event immediately
if s.outCh != nil {
s.outCh <- e
}
// Stop recording events after a leave is issued
if s.leaving {
continue
}
switch typed := e.(type) {
case MemberEvent:
s.processMemberEvent(typed)
case UserEvent:
s.processUserEvent(typed)
case *Query:
s.processQuery(typed)
default:
s.logger.Printf("[ERR] serf: Unknown event to snapshot: %#v", e)
}
case <-clockTicker.C:
s.updateClock()
case <-coordinateTicker.C:
s.updateCoordinate()
case <-s.shutdownCh:
if err := s.buffered.Flush(); err != nil {
s.logger.Printf("[ERR] serf: failed to flush snapshot: %v", err)
}
if err := s.fh.Sync(); err != nil {
s.logger.Printf("[ERR] serf: failed to sync snapshot: %v", err)
}
s.fh.Close()
close(s.waitCh)
return
}
}
}
// processMemberEvent is used to handle a single member event
func (s *Snapshotter) processMemberEvent(e MemberEvent) {
switch e.Type {
case EventMemberJoin:
for _, mem := range e.Members {
addr := net.TCPAddr{IP: mem.Addr, Port: int(mem.Port)}
s.aliveNodes[mem.Name] = addr.String()
s.tryAppend(fmt.Sprintf("alive: %s %s\n", mem.Name, addr.String()))
}
case EventMemberLeave:
fallthrough
case EventMemberFailed:
for _, mem := range e.Members {
delete(s.aliveNodes, mem.Name)
s.tryAppend(fmt.Sprintf("not-alive: %s\n", mem.Name))
}
}
s.updateClock()
}
// updateClock is called periodically to check if we should update our
// clock value. This is done after member events but should also be done
// periodically due to race conditions with join and leave intents
func (s *Snapshotter) updateClock() {
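	// Note: assuming the LamportClock semantics in this package, where
	// witnessing time v advances the clock to v+1, Time() runs one ahead
	// of the last time actually observed, hence the -1 below.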
lastSeen := s.clock.Time() - 1
if lastSeen > s.lastClock {
s.lastClock = lastSeen
s.tryAppend(fmt.Sprintf("clock: %d\n", s.lastClock))
}
}
// updateCoordinate is called periodically to write out the current local
// coordinate. It's safe to call this if coordinates aren't enabled (nil
// client) and it will be a no-op.
func (s *Snapshotter) updateCoordinate() {
if s.coordClient != nil {
encoded, err := json.Marshal(s.coordClient.GetCoordinate())
if err != nil {
s.logger.Printf("[ERR] serf: Failed to encode coordinate: %v", err)
} else {
s.tryAppend(fmt.Sprintf("coordinate: %s\n", encoded))
}
}
}
// processUserEvent is used to handle a single user event
func (s *Snapshotter) processUserEvent(e UserEvent) {
// Ignore old clocks
if e.LTime <= s.lastEventClock {
return
}
s.lastEventClock = e.LTime
s.tryAppend(fmt.Sprintf("event-clock: %d\n", e.LTime))
}
// processQuery is used to handle a single query event
func (s *Snapshotter) processQuery(q *Query) {
// Ignore old clocks
if q.LTime <= s.lastQueryClock {
return
}
s.lastQueryClock = q.LTime
s.tryAppend(fmt.Sprintf("query-clock: %d\n", q.LTime))
}
// tryAppend will invoke appendLine but will not return an error
func (s *Snapshotter) tryAppend(l string) {
if err := s.appendLine(l); err != nil {
s.logger.Printf("[ERR] serf: Failed to update snapshot: %v", err)
}
}
// appendLine is used to append a line to the existing log
func (s *Snapshotter) appendLine(l string) error {
defer metrics.MeasureSince([]string{"serf", "snapshot", "appendLine"}, time.Now())
n, err := s.buffered.WriteString(l)
if err != nil {
return err
}
// Check if we should flush
now := time.Now()
if now.Sub(s.lastFlush) > flushInterval {
s.lastFlush = now
if err := s.buffered.Flush(); err != nil {
return err
}
}
// Check if a compaction is necessary
s.offset += int64(n)
if s.offset > s.maxSize {
return s.compact()
}
return nil
}
// compact is used to compact the snapshot once it is too large
func (s *Snapshotter) compact() error {
defer metrics.MeasureSince([]string{"serf", "snapshot", "compact"}, time.Now())
// Try to open a new file for the compacted snapshot
newPath := s.path + tmpExt
fh, err := os.OpenFile(newPath, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0755)
if err != nil {
return fmt.Errorf("failed to open new snapshot: %v", err)
}
// Create a buffered writer
buf := bufio.NewWriter(fh)
// Write out the live nodes
var offset int64
for name, addr := range s.aliveNodes {
line := fmt.Sprintf("alive: %s %s\n", name, addr)
n, err := buf.WriteString(line)
if err != nil {
fh.Close()
return err
}
offset += int64(n)
}
// Write out the clocks
line := fmt.Sprintf("clock: %d\n", s.lastClock)
n, err := buf.WriteString(line)
if err != nil {
fh.Close()
return err
}
offset += int64(n)
line = fmt.Sprintf("event-clock: %d\n", s.lastEventClock)
n, err = buf.WriteString(line)
if err != nil {
fh.Close()
return err
}
offset += int64(n)
line = fmt.Sprintf("query-clock: %d\n", s.lastQueryClock)
n, err = buf.WriteString(line)
if err != nil {
fh.Close()
return err
}
offset += int64(n)
// Write out the coordinate.
if s.coordClient != nil {
encoded, err := json.Marshal(s.coordClient.GetCoordinate())
if err != nil {
fh.Close()
return err
}
line = fmt.Sprintf("coordinate: %s\n", encoded)
n, err = buf.WriteString(line)
if err != nil {
fh.Close()
return err
}
offset += int64(n)
}
// Flush the new snapshot
err = buf.Flush()
fh.Close()
if err != nil {
return fmt.Errorf("failed to flush new snapshot: %v", err)
}
// We now need to swap the old snapshot file with the new snapshot.
// Turns out, Windows won't let us rename the files if we have
// open handles to them or if the destination already exists. This
// means we are forced to close the existing handles, delete the
// old file, move the new one in place, and then re-open the file
// handles.
// Flush the existing snapshot, ignoring errors since we will
// delete it momentarily.
s.buffered.Flush()
s.buffered = nil
// Close the file handle to the old snapshot
s.fh.Close()
s.fh = nil
// Delete the old file
if err := os.Remove(s.path); err != nil {
return fmt.Errorf("failed to remove old snapshot: %v", err)
}
// Move the new file into place
if err := os.Rename(newPath, s.path); err != nil {
return fmt.Errorf("failed to install new snapshot: %v", err)
}
// Open the new snapshot
fh, err = os.OpenFile(s.path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755)
if err != nil {
return fmt.Errorf("failed to open snapshot: %v", err)
}
buf = bufio.NewWriter(fh)
// Rotate our handles
s.fh = fh
s.buffered = buf
s.offset = offset
s.lastFlush = time.Now()
return nil
}
// replay is used to reset our internal state by replaying the snapshot
// file. It is called at initialization time to read old state
func (s *Snapshotter) replay() error {
// Seek to the beginning
if _, err := s.fh.Seek(0, os.SEEK_SET); err != nil {
return err
}
// Read each line
reader := bufio.NewReader(s.fh)
for {
line, err := reader.ReadString('\n')
if err != nil {
break
}
// Skip the newline
line = line[:len(line)-1]
// Switch on the prefix
if strings.HasPrefix(line, "alive: ") {
info := strings.TrimPrefix(line, "alive: ")
addrIdx := strings.LastIndex(info, " ")
if addrIdx == -1 {
s.logger.Printf("[WARN] serf: Failed to parse address: %v", line)
continue
}
addr := info[addrIdx+1:]
name := info[:addrIdx]
s.aliveNodes[name] = addr
} else if strings.HasPrefix(line, "not-alive: ") {
name := strings.TrimPrefix(line, "not-alive: ")
delete(s.aliveNodes, name)
} else if strings.HasPrefix(line, "clock: ") {
timeStr := strings.TrimPrefix(line, "clock: ")
timeInt, err := strconv.ParseUint(timeStr, 10, 64)
if err != nil {
s.logger.Printf("[WARN] serf: Failed to convert clock time: %v", err)
continue
}
s.lastClock = LamportTime(timeInt)
} else if strings.HasPrefix(line, "event-clock: ") {
timeStr := strings.TrimPrefix(line, "event-clock: ")
timeInt, err := strconv.ParseUint(timeStr, 10, 64)
if err != nil {
s.logger.Printf("[WARN] serf: Failed to convert event clock time: %v", err)
continue
}
s.lastEventClock = LamportTime(timeInt)
} else if strings.HasPrefix(line, "query-clock: ") {
timeStr := strings.TrimPrefix(line, "query-clock: ")
timeInt, err := strconv.ParseUint(timeStr, 10, 64)
if err != nil {
s.logger.Printf("[WARN] serf: Failed to convert query clock time: %v", err)
continue
}
s.lastQueryClock = LamportTime(timeInt)
} else if strings.HasPrefix(line, "coordinate: ") {
if s.coordClient == nil {
s.logger.Printf("[WARN] serf: Ignoring snapshot coordinates since they are disabled")
continue
}
coordStr := strings.TrimPrefix(line, "coordinate: ")
var coord coordinate.Coordinate
err := json.Unmarshal([]byte(coordStr), &coord)
if err != nil {
s.logger.Printf("[WARN] serf: Failed to decode coordinate: %v", err)
continue
}
s.coordClient.SetCoordinate(&coord)
} else if line == "leave" {
// Ignore a leave if we plan on re-joining
if s.rejoinAfterLeave {
s.logger.Printf("[INFO] serf: Ignoring previous leave in snapshot")
continue
}
s.aliveNodes = make(map[string]string)
s.lastClock = 0
s.lastEventClock = 0
s.lastQueryClock = 0
} else if strings.HasPrefix(line, "#") {
// Skip comment lines
} else {
s.logger.Printf("[WARN] serf: Unrecognized snapshot line: %v", line)
}
}
// Seek to the end
if _, err := s.fh.Seek(0, os.SEEK_END); err != nil {
return err
}
return nil
}