package raft

import (
	"fmt"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/docker/go-metrics"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/encryption"
	"github.com/docker/swarmkit/manager/state/raft/membership"
	"github.com/docker/swarmkit/manager/state/raft/storage"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
)

var (
	// Snapshot create latency timer.
	snapshotLatencyTimer metrics.Timer
)

func init() {
	ns := metrics.NewNamespace("swarm", "raft", nil)
	snapshotLatencyTimer = ns.NewTimer("snapshot_latency",
		"Raft snapshot create latency.")
	metrics.Register(ns)
}

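// readFromDisk loads the most recent snapshot and WAL contents from disk. If
// a key rotation was in progress, it tries the pending DEK first and falls
// back to the current DEK when the pending key cannot decrypt the data.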
func (n *Node) readFromDisk(ctx context.Context) (*raftpb.Snapshot, storage.WALData, error) {
	keys := n.keyRotator.GetKeys()

	n.raftLogger = &storage.EncryptedRaftLogger{
		StateDir:      n.opts.StateDir,
		EncryptionKey: keys.CurrentDEK,
	}
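	// If a key rotation is pending, the data on disk may already be encrypted
	// with the new key, so prefer it when bootstrapping.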
	if keys.PendingDEK != nil {
		n.raftLogger.EncryptionKey = keys.PendingDEK
	}

	snap, walData, err := n.raftLogger.BootstrapFromDisk(ctx)
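	// If bootstrapping with the pending key succeeded, mark the rotation as
	// complete; if the pending key could not decrypt the existing data, retry
	// with the current key.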
	if keys.PendingDEK != nil {
		switch errors.Cause(err).(type) {
		case nil:
			if err = n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: keys.PendingDEK}); err != nil {
				err = errors.Wrap(err, "previous key rotation was successful, but unable to mark rotation as complete")
			}
		case encryption.ErrCannotDecrypt:
			snap, walData, err = n.raftLogger.BootstrapFromDisk(ctx, keys.CurrentDEK)
		}
	}

	if err != nil {
		return nil, storage.WALData{}, err
	}

	return snap, walData, nil
}

// loadAndStart bootstraps a node's raft store from the raft logs and snapshots on disk.
func (n *Node) loadAndStart(ctx context.Context, forceNewCluster bool) error {
	snapshot, waldata, err := n.readFromDisk(ctx)
	if err != nil {
		return err
	}

	// Read logs to fully catch up store
	var raftNode api.RaftMember
	if err := raftNode.Unmarshal(waldata.Metadata); err != nil {
		return errors.Wrap(err, "failed to unmarshal WAL metadata")
	}
	n.Config.ID = raftNode.RaftID

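	// If a snapshot exists, restore the cluster membership it records. With
	// forceNewCluster, keep only this node as a bootstrap member and remove
	// all other members.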
	if snapshot != nil {
		snapCluster, err := n.clusterSnapshot(snapshot.Data)
		if err != nil {
			return err
		}
		var bootstrapMembers []*api.RaftMember
		if forceNewCluster {
			for _, m := range snapCluster.Members {
				if m.RaftID != n.Config.ID {
					n.cluster.RemoveMember(m.RaftID)
					continue
				}
				bootstrapMembers = append(bootstrapMembers, m)
			}
		} else {
			bootstrapMembers = snapCluster.Members
		}
		n.bootstrapMembers = bootstrapMembers
		for _, removedMember := range snapCluster.Removed {
			n.cluster.RemoveMember(removedMember)
		}
	}

	ents, st := waldata.Entries, waldata.HardState

	// All members that are no longer part of the cluster must be added to
	// the removed list right away, so that we don't try to connect to them
	// before processing the configuration change entries, which could make
	// us get stuck.
	for _, ent := range ents {
		if ent.Index <= st.Commit && ent.Type == raftpb.EntryConfChange {
			var cc raftpb.ConfChange
			if err := cc.Unmarshal(ent.Data); err != nil {
				return errors.Wrap(err, "failed to unmarshal config change")
			}
			if cc.Type == raftpb.ConfChangeRemoveNode {
				n.cluster.RemoveMember(cc.NodeID)
			}
		}
	}

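	// When forcing a new cluster, discard entries that were never committed
	// and append synthetic config-change entries that remove every other
	// member, so the restarted cluster begins with a single-member
	// configuration.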
	if forceNewCluster {
		// discard the previously uncommitted entries
		for i, ent := range ents {
			if ent.Index > st.Commit {
				log.G(ctx).Infof("discarding %d uncommitted WAL entries", len(ents)-i)
				ents = ents[:i]
				break
			}
		}

		// force append the configuration change entries
		toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), n.Config.ID, st.Term, st.Commit)

		// All members that are being removed as part of the
		// force-new-cluster process must be added to the
		// removed list right away, so that we don't try to
		// connect to them before processing the configuration
		// change entries, which could make us get stuck.
		for _, ccEnt := range toAppEnts {
			if ccEnt.Type == raftpb.EntryConfChange {
				var cc raftpb.ConfChange
				if err := cc.Unmarshal(ccEnt.Data); err != nil {
					return errors.Wrap(err, "error unmarshalling force-new-cluster config change")
				}
				if cc.Type == raftpb.ConfChangeRemoveNode {
					n.cluster.RemoveMember(cc.NodeID)
				}
			}
		}
		ents = append(ents, toAppEnts...)

		// force commit newly appended entries
		err := n.raftLogger.SaveEntries(st, toAppEnts)
		if err != nil {
			log.G(ctx).WithError(err).Fatal("failed to save WAL while forcing new cluster")
		}

		if len(toAppEnts) != 0 {
			st.Commit = toAppEnts[len(toAppEnts)-1].Index
		}
	}

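	// Populate the in-memory raft store: apply the snapshot first, then the
	// hard state, and finally the WAL entries.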
	if snapshot != nil {
		if err := n.raftStore.ApplySnapshot(*snapshot); err != nil {
			return err
		}
	}
	if err := n.raftStore.SetHardState(st); err != nil {
		return err
	}
	return n.raftStore.Append(ents)
}

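// newRaftLogs bootstraps a fresh WAL for this node, using the marshalled
// member information as the WAL metadata, registers the node as a cluster
// member, and returns the raft peer used to start the cluster.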
func (n *Node) newRaftLogs(nodeID string) (raft.Peer, error) {
	raftNode := &api.RaftMember{
		RaftID: n.Config.ID,
		NodeID: nodeID,
		Addr:   n.opts.Addr,
	}
	metadata, err := raftNode.Marshal()
	if err != nil {
		return raft.Peer{}, errors.Wrap(err, "error marshalling raft node")
	}
	if err := n.raftLogger.BootstrapNew(metadata); err != nil {
		return raft.Peer{}, err
	}
	n.cluster.AddMember(&membership.Member{RaftMember: raftNode})
	return raft.Peer{ID: n.Config.ID, Context: metadata}, nil
}

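// triggerSnapshot starts an asynchronous snapshot of the memory store and the
// current cluster membership at the given applied index, saves it through the
// raft logger, and compacts the raft log, keeping LogEntriesForSlowFollowers
// entries behind the applied index. It returns once the background goroutine
// has opened its read transaction, so the snapshot reflects the state at the
// time of the call.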
func (n *Node) triggerSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
	snapshot := api.Snapshot{Version: api.Snapshot_V0}
	for _, member := range n.cluster.Members() {
		snapshot.Membership.Members = append(snapshot.Membership.Members,
			&api.RaftMember{
				NodeID: member.NodeID,
				RaftID: member.RaftID,
				Addr:   member.Addr,
			})
	}
	snapshot.Membership.Removed = n.cluster.Removed()

	viewStarted := make(chan struct{})

	n.asyncTasks.Add(1)
	n.snapshotInProgress = make(chan raftpb.SnapshotMetadata, 1) // buffered in case Shutdown is called during the snapshot
	go func(appliedIndex uint64, snapshotMeta raftpb.SnapshotMetadata) {
		// Record how long snapshot creation takes; the timer stops when this
		// goroutine returns.
		defer metrics.StartTimer(snapshotLatencyTimer)()
		defer func() {
			n.asyncTasks.Done()
			n.snapshotInProgress <- snapshotMeta
		}()
		var err error
		n.memoryStore.View(func(tx store.ReadTx) {
			close(viewStarted)

			var storeSnapshot *api.StoreSnapshot
			storeSnapshot, err = n.memoryStore.Save(tx)
			snapshot.Store = *storeSnapshot
		})
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to read snapshot from store")
			return
		}

		d, err := snapshot.Marshal()
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to marshal snapshot")
			return
		}

		snap, err := n.raftStore.CreateSnapshot(appliedIndex, &n.confState, d)
		if err == nil {
			if err := n.raftLogger.SaveSnapshot(snap); err != nil {
				log.G(ctx).WithError(err).Error("failed to save snapshot")
				return
			}
			snapshotMeta = snap.Metadata

			if appliedIndex > raftConfig.LogEntriesForSlowFollowers {
				err := n.raftStore.Compact(appliedIndex - raftConfig.LogEntriesForSlowFollowers)
				if err != nil && err != raft.ErrCompacted {
					log.G(ctx).WithError(err).Error("failed to compact snapshot")
				}
			}
		} else if err != raft.ErrSnapOutOfDate {
			log.G(ctx).WithError(err).Error("failed to create snapshot")
		}
	}(n.appliedIndex, n.snapshotMeta)

	// Wait for the goroutine to establish a read transaction, to make
	// sure it sees the state as of this moment.
	<-viewStarted
}

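// clusterSnapshot unmarshals a snapshot payload, verifies its version,
// restores the memory store from it, and returns the cluster membership
// information it contains.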
func (n *Node) clusterSnapshot(data []byte) (api.ClusterSnapshot, error) {
	var snapshot api.Snapshot
	if err := snapshot.Unmarshal(data); err != nil {
		return snapshot.Membership, err
	}
	if snapshot.Version != api.Snapshot_V0 {
		return snapshot.Membership, fmt.Errorf("unrecognized snapshot version %d", snapshot.Version)
	}

	if err := n.memoryStore.Restore(&snapshot.Store); err != nil {
		return snapshot.Membership, err
	}

	return snapshot.Membership, nil
}