package raft

import (
	"fmt"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/encryption"
	"github.com/docker/swarmkit/manager/state/raft/membership"
	"github.com/docker/swarmkit/manager/state/raft/storage"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
)

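// readFromDisk loads the most recent snapshot and WAL data from disk,
// decrypting them with the current DEK or, when a key rotation is pending,
// with the pending DEK.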
func (n *Node) readFromDisk(ctx context.Context) (*raftpb.Snapshot, storage.WALData, error) {
	keys := n.keyRotator.GetKeys()

	n.raftLogger = &storage.EncryptedRaftLogger{
		StateDir:      n.opts.StateDir,
		EncryptionKey: keys.CurrentDEK,
	}
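	// If a key rotation is pending, the most recent data on disk may already
	// be encrypted with the pending DEK, so try that key first.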
	if keys.PendingDEK != nil {
		n.raftLogger.EncryptionKey = keys.PendingDEK
	}

	snap, walData, err := n.raftLogger.BootstrapFromDisk(ctx)

	if keys.PendingDEK != nil {
		switch errors.Cause(err).(type) {
		case nil:
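			// Bootstrapping with the pending DEK succeeded, so the rotation
			// can be marked complete.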
			if err = n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: keys.PendingDEK}); err != nil {
				err = errors.Wrap(err, "previous key rotation was successful, but unable to mark rotation as complete")
			}
		case encryption.ErrCannotDecrypt:
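			// The data on disk could not be decrypted with the pending DEK;
			// retry the bootstrap, supplying the current DEK as a fallback key.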
			snap, walData, err = n.raftLogger.BootstrapFromDisk(ctx, keys.CurrentDEK)
		}
	}

	if err != nil {
		return nil, storage.WALData{}, err
	}
	return snap, walData, nil
}

// loadAndStart bootstraps a node's raft store from the raft logs and snapshots on disk.
func (n *Node) loadAndStart(ctx context.Context, forceNewCluster bool) error {
	snapshot, waldata, err := n.readFromDisk(ctx)
	if err != nil {
		return err
	}

	// Read logs to fully catch up store
	var raftNode api.RaftMember
	if err := raftNode.Unmarshal(waldata.Metadata); err != nil {
		return errors.Wrap(err, "failed to unmarshal WAL metadata")
	}
	n.Config.ID = raftNode.RaftID

	if snapshot != nil {
		snapCluster, err := n.clusterSnapshot(snapshot.Data)
		if err != nil {
			return err
		}
		var bootstrapMembers []*api.RaftMember
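		// When forcing a new cluster, keep only this node in the bootstrap
		// member list; every other member is marked as removed.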
		if forceNewCluster {
			for _, m := range snapCluster.Members {
				if m.RaftID != n.Config.ID {
					n.cluster.RemoveMember(m.RaftID)
					continue
				}
				bootstrapMembers = append(bootstrapMembers, m)
			}
		} else {
			bootstrapMembers = snapCluster.Members
		}
		n.bootstrapMembers = bootstrapMembers
		for _, removedMember := range snapCluster.Removed {
			n.cluster.RemoveMember(removedMember)
		}
	}

	ents, st := waldata.Entries, waldata.HardState

	// All members that are no longer part of the cluster must be added to
	// the removed list right away, so that we don't try to connect to them
	// before processing the configuration change entries, which could make
	// us get stuck.
	for _, ent := range ents {
		if ent.Index <= st.Commit && ent.Type == raftpb.EntryConfChange {
			var cc raftpb.ConfChange
			if err := cc.Unmarshal(ent.Data); err != nil {
				return errors.Wrap(err, "failed to unmarshal config change")
			}
			if cc.Type == raftpb.ConfChangeRemoveNode {
				n.cluster.RemoveMember(cc.NodeID)
			}
		}
	}

	if forceNewCluster {
		// discard the previously uncommitted entries
		for i, ent := range ents {
			if ent.Index > st.Commit {
				log.G(ctx).Infof("discarding %d uncommitted WAL entries", len(ents)-i)
				ents = ents[:i]
				break
			}
		}

		// force append the configuration change entries
		toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), n.Config.ID, st.Term, st.Commit)

		// All members that are being removed as part of the
		// force-new-cluster process must be added to the
		// removed list right away, so that we don't try to
		// connect to them before processing the configuration
		// change entries, which could make us get stuck.
		for _, ccEnt := range toAppEnts {
			if ccEnt.Type == raftpb.EntryConfChange {
				var cc raftpb.ConfChange
				if err := cc.Unmarshal(ccEnt.Data); err != nil {
					return errors.Wrap(err, "error unmarshalling force-new-cluster config change")
				}
				if cc.Type == raftpb.ConfChangeRemoveNode {
					n.cluster.RemoveMember(cc.NodeID)
				}
			}
		}
		ents = append(ents, toAppEnts...)

		// force commit newly appended entries
		err := n.raftLogger.SaveEntries(st, toAppEnts)
		if err != nil {
			log.G(ctx).WithError(err).Fatal("failed to save WAL while forcing new cluster")
		}
		if len(toAppEnts) != 0 {
			st.Commit = toAppEnts[len(toAppEnts)-1].Index
		}
	}

	if snapshot != nil {
		if err := n.raftStore.ApplySnapshot(*snapshot); err != nil {
			return err
		}
	}
	if err := n.raftStore.SetHardState(st); err != nil {
		return err
	}
	return n.raftStore.Append(ents)
}

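// newRaftLogs bootstraps a fresh encrypted WAL on disk, using this node's
// raft member information as the WAL metadata, registers the node in the
// cluster membership, and returns it as a raft peer.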
func (n *Node) newRaftLogs(nodeID string) (raft.Peer, error) {
	raftNode := &api.RaftMember{
		RaftID: n.Config.ID,
		NodeID: nodeID,
		Addr:   n.opts.Addr,
	}
	metadata, err := raftNode.Marshal()
	if err != nil {
		return raft.Peer{}, errors.Wrap(err, "error marshalling raft node")
	}
	if err := n.raftLogger.BootstrapNew(metadata); err != nil {
		return raft.Peer{}, err
	}
	n.cluster.AddMember(&membership.Member{RaftMember: raftNode})
	return raft.Peer{ID: n.Config.ID, Context: metadata}, nil
}

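// doSnapshot serializes the current cluster membership and the contents of
// the memory store into a snapshot, persists it to disk asynchronously, and
// compacts raft log entries that slow followers no longer need.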
func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
	snapshot := api.Snapshot{Version: api.Snapshot_V0}
	for _, member := range n.cluster.Members() {
		snapshot.Membership.Members = append(snapshot.Membership.Members,
			&api.RaftMember{
				NodeID: member.NodeID,
				RaftID: member.RaftID,
				Addr:   member.Addr,
			})
	}
	snapshot.Membership.Removed = n.cluster.Removed()

	viewStarted := make(chan struct{})
	n.asyncTasks.Add(1)
	n.snapshotInProgress = make(chan raftpb.SnapshotMetadata, 1) // buffered in case Shutdown is called during the snapshot
	go func(appliedIndex uint64, snapshotMeta raftpb.SnapshotMetadata) {
		defer func() {
			n.asyncTasks.Done()
			n.snapshotInProgress <- snapshotMeta
		}()
		var err error
		n.memoryStore.View(func(tx store.ReadTx) {
			close(viewStarted)

			var storeSnapshot *api.StoreSnapshot
			storeSnapshot, err = n.memoryStore.Save(tx)
			snapshot.Store = *storeSnapshot
		})
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to read snapshot from store")
			return
		}

		d, err := snapshot.Marshal()
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to marshal snapshot")
			return
		}
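		// Store the snapshot in the raft memory store, persist it to disk,
		// and compact log entries that slow followers no longer need.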
		snap, err := n.raftStore.CreateSnapshot(appliedIndex, &n.confState, d)
		if err == nil {
			if err := n.raftLogger.SaveSnapshot(snap); err != nil {
				log.G(ctx).WithError(err).Error("failed to save snapshot")
				return
			}
			snapshotMeta = snap.Metadata

			if appliedIndex > raftConfig.LogEntriesForSlowFollowers {
				err := n.raftStore.Compact(appliedIndex - raftConfig.LogEntriesForSlowFollowers)
				if err != nil && err != raft.ErrCompacted {
					log.G(ctx).WithError(err).Error("failed to compact raft log")
				}
			}
		} else if err != raft.ErrSnapOutOfDate {
			log.G(ctx).WithError(err).Error("failed to create snapshot")
		}
	}(n.appliedIndex, n.snapshotMeta)

	// Wait for the goroutine to establish a read transaction, to make
	// sure it sees the state as of this moment.
	<-viewStarted
}

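// clusterSnapshot restores the memory store from marshalled snapshot data and
// returns the cluster membership information recorded in the snapshot.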
func (n *Node) clusterSnapshot(data []byte) (api.ClusterSnapshot, error) {
	var snapshot api.Snapshot
	if err := snapshot.Unmarshal(data); err != nil {
		return snapshot.Membership, err
	}
	if snapshot.Version != api.Snapshot_V0 {
		return snapshot.Membership, fmt.Errorf("unrecognized snapshot version %d", snapshot.Version)
	}

	if err := n.memoryStore.Restore(&snapshot.Store); err != nil {
		return snapshot.Membership, err
	}

	return snapshot.Membership, nil
}