package cluster // import "github.com/docker/docker/daemon/cluster"

import (
	"context"
	"fmt"
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"time"

	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/executor/container"
	lncluster "github.com/docker/libnetwork/cluster"
	swarmapi "github.com/docker/swarmkit/api"
	swarmallocator "github.com/docker/swarmkit/manager/allocator/cnmallocator"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// nodeRunner implements a manager for a continuously running swarmkit node,
// restarting it with backoff delays if needed.
type nodeRunner struct {
	nodeState
	mu             sync.RWMutex
	done           chan struct{} // closed when swarmNode exits
	ready          chan struct{} // closed when swarmNode becomes active
	reconnectDelay time.Duration
	config         nodeStartConfig

	repeatedRun     bool
	cancelReconnect func()
	stopping        bool
	cluster         *Cluster // only for accessing config helpers, never call any methods. TODO: change to config struct
}

// nodeStartConfig holds configuration needed to start a new node. Exported
// fields of this structure are saved to disk in JSON. Unexported fields
// contain data that shouldn't be persisted between daemon reloads.
type nodeStartConfig struct {
	// LocalAddr is this machine's local IP or hostname, if specified.
	LocalAddr string
	// RemoteAddr is the address that was given to "swarm join". It is used
	// to find LocalAddr if necessary.
	RemoteAddr string
	// ListenAddr is the address we bind to, including a port.
	ListenAddr string
	// AdvertiseAddr is the address other nodes should connect to,
	// including a port.
	AdvertiseAddr string
	// DataPathAddr is the address to use for the data path.
	DataPathAddr string
	// DefaultAddressPool contains the list of subnets for the default address pool.
	DefaultAddressPool []string
	// SubnetSize is the subnet size used when allocating from DefaultAddressPool.
	SubnetSize uint32
	// DataPathPort is the data path port (VXLAN UDP port) used for data traffic.
	DataPathPort uint32
	// JoinInProgress is set to true if a join operation has started, but
	// not completed yet.
	JoinInProgress bool

	joinAddr        string
	forceNewCluster bool
	joinToken       string
	lockKey         []byte
	autolock        bool
	availability    types.NodeAvailability
}

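// Ready returns a channel that is closed once the swarmkit node either
// becomes ready or exits. If the node exits before becoming ready, its
// startup error (which may be nil) is sent on the channel first.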
func (n *nodeRunner) Ready() chan error {
	c := make(chan error, 1)
	n.mu.RLock()
	ready, done := n.ready, n.done
	n.mu.RUnlock()
	go func() {
		select {
		case <-ready:
		case <-done:
		}
		select {
		case <-ready:
		default:
			n.mu.RLock()
			c <- n.err
			n.mu.RUnlock()
		}
		close(c)
	}()
	return c
}

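// Start starts a new swarmkit node with the given configuration, resetting
// the reconnect backoff delay to its initial value.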
func (n *nodeRunner) Start(conf nodeStartConfig) error {
	n.mu.Lock()
	defer n.mu.Unlock()

	n.reconnectDelay = initialReconnectDelay

	return n.start(conf)
}

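// start creates and starts the underlying swarmkit node, persists the start
// configuration, and spawns the goroutines that track readiness, control
// socket changes, and node exit. Callers must hold n.mu.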
func (n *nodeRunner) start(conf nodeStartConfig) error {
	var control string
	if runtime.GOOS == "windows" {
		control = `\\.\pipe\` + controlSocket
	} else {
		control = filepath.Join(n.cluster.runtimeRoot, controlSocket)
	}

	joinAddr := conf.joinAddr
	if joinAddr == "" && conf.JoinInProgress {
		// We must have been restarted while trying to join a cluster.
		// Continue trying to join instead of forming our own cluster.
		joinAddr = conf.RemoteAddr
	}

	// Hostname is not set here. Instead, it is obtained from
	// the node description that is reported periodically.
	swarmnodeConfig := swarmnode.Config{
		ForceNewCluster:    conf.forceNewCluster,
		ListenControlAPI:   control,
		ListenRemoteAPI:    conf.ListenAddr,
		AdvertiseRemoteAPI: conf.AdvertiseAddr,
		NetworkConfig: &swarmallocator.NetworkConfig{
			DefaultAddrPool: conf.DefaultAddressPool,
			SubnetSize:      conf.SubnetSize,
			VXLANUDPPort:    conf.DataPathPort,
		},
		JoinAddr:  joinAddr,
		StateDir:  n.cluster.root,
		JoinToken: conf.joinToken,
		Executor: container.NewExecutor(
			n.cluster.config.Backend,
			n.cluster.config.PluginBackend,
			n.cluster.config.ImageBackend,
			n.cluster.config.VolumeBackend,
		),
		HeartbeatTick: n.cluster.config.RaftHeartbeatTick,
		// Recommended value in etcd/raft is 10 x (HeartbeatTick).
		// Lower values were seen to cause instability because of
		// frequent leader elections when running on flaky networks.
		ElectionTick:     n.cluster.config.RaftElectionTick,
		UnlockKey:        conf.lockKey,
		AutoLockManagers: conf.autolock,
		PluginGetter:     n.cluster.config.Backend.PluginGetter(),
	}
	if conf.availability != "" {
		avail, ok := swarmapi.NodeSpec_Availability_value[strings.ToUpper(string(conf.availability))]
		if !ok {
			return fmt.Errorf("invalid Availability: %q", conf.availability)
		}
		swarmnodeConfig.Availability = swarmapi.NodeSpec_Availability(avail)
	}
	node, err := swarmnode.New(&swarmnodeConfig)
	if err != nil {
		return err
	}
	if err := node.Start(context.Background()); err != nil {
		return err
	}

	n.done = make(chan struct{})
	n.ready = make(chan struct{})
	n.swarmNode = node
	if conf.joinAddr != "" {
		conf.JoinInProgress = true
	}
	n.config = conf
	savePersistentState(n.cluster.root, conf)

	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		n.handleNodeExit(node)
		cancel()
	}()

	go n.handleReadyEvent(ctx, node, n.ready)
	go n.handleControlSocketChange(ctx, node)

	return nil
}

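// handleControlSocketChange refreshes the cached gRPC connection and the
// control/logs clients whenever the swarmkit control socket changes, and
// notifies libnetwork that the socket changed.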
func (n *nodeRunner) handleControlSocketChange(ctx context.Context, node *swarmnode.Node) {
	for conn := range node.ListenControlSocket(ctx) {
		n.mu.Lock()
		if n.grpcConn != conn {
			if conn == nil {
				n.controlClient = nil
				n.logsClient = nil
			} else {
				n.controlClient = swarmapi.NewControlClient(conn)
				n.logsClient = swarmapi.NewLogsClient(conn)
				// push store changes to daemon
				go n.watchClusterEvents(ctx, conn)
			}
		}
		n.grpcConn = conn
		n.mu.Unlock()
		n.cluster.SendClusterEvent(lncluster.EventSocketChange)
	}
}

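// watchClusterEvents subscribes to create, update, and remove events for
// nodes, services, networks, secrets, and configs in the swarmkit store, and
// forwards each received message to the cluster's watch stream.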
func (n *nodeRunner) watchClusterEvents(ctx context.Context, conn *grpc.ClientConn) {
	client := swarmapi.NewWatchClient(conn)
	watch, err := client.Watch(ctx, &swarmapi.WatchRequest{
		Entries: []*swarmapi.WatchRequest_WatchEntry{
			{
				Kind:   "node",
				Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
			},
			{
				Kind:   "service",
				Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
			},
			{
				Kind:   "network",
				Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
			},
			{
				Kind:   "secret",
				Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
			},
			{
				Kind:   "config",
				Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
			},
		},
		IncludeOldObject: true,
	})
	if err != nil {
		logrus.WithError(err).Error("failed to watch cluster store")
		return
	}
	for {
		msg, err := watch.Recv()
		if err != nil {
			// store watch is broken
			errStatus, ok := status.FromError(err)
			if !ok || errStatus.Code() != codes.Canceled {
				logrus.WithError(err).Error("failed to receive changes from store watch API")
			}
			return
		}
		select {
		case <-ctx.Done():
			return
		case n.cluster.watchStream <- msg:
		}
	}
}

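// handleReadyEvent waits for the swarmkit node to become ready. When it does,
// it clears any recorded error, persists the completed join, and closes the
// ready channel. A node-ready cluster event is emitted whether the node
// became ready or the context was cancelled.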
func (n *nodeRunner) handleReadyEvent(ctx context.Context, node *swarmnode.Node, ready chan struct{}) {
	select {
	case <-node.Ready():
		n.mu.Lock()
		n.err = nil
		if n.config.JoinInProgress {
			n.config.JoinInProgress = false
			savePersistentState(n.cluster.root, n.config)
		}
		n.mu.Unlock()
		close(ready)
	case <-ctx.Done():
	}
	n.cluster.SendClusterEvent(lncluster.EventNodeReady)
}

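// handleNodeExit records the node's exit error, closes the done channel, and
// schedules a restart with backoff if the node had become ready or had
// already been run more than once.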
func (n *nodeRunner) handleNodeExit(node *swarmnode.Node) {
	err := detectLockedError(node.Err(context.Background()))
	if err != nil {
		logrus.Errorf("cluster exited with error: %v", err)
	}
	n.mu.Lock()
	n.swarmNode = nil
	n.err = err
	close(n.done)
	select {
	case <-n.ready:
		n.enableReconnectWatcher()
	default:
		if n.repeatedRun {
			n.enableReconnectWatcher()
		}
	}
	n.repeatedRun = true
	n.mu.Unlock()
}

// Stop stops the current swarm node if it is running.
func (n *nodeRunner) Stop() error {
	n.mu.Lock()
	if n.cancelReconnect != nil { // between restarts
		n.cancelReconnect()
		n.cancelReconnect = nil
	}
	if n.swarmNode == nil {
		// Even though the swarm node is nil, we may still need to send a
		// node leave event to perform any required cleanup.
		if n.cluster != nil {
			n.cluster.SendClusterEvent(lncluster.EventNodeLeave)
		}
		n.mu.Unlock()
		return nil
	}
	n.stopping = true
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	n.mu.Unlock()
	if err := n.swarmNode.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
		return err
	}
	n.cluster.SendClusterEvent(lncluster.EventNodeLeave)
	<-n.done
	return nil
}

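// State returns a point-in-time snapshot of the node state. It is safe to
// call on a nil receiver, in which case an inactive state is returned.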
func (n *nodeRunner) State() nodeState {
	if n == nil {
		return nodeState{status: types.LocalNodeStateInactive}
	}
	n.mu.RLock()
	defer n.mu.RUnlock()

	ns := n.nodeState

	if ns.err != nil || n.cancelReconnect != nil {
		if errors.Cause(ns.err) == errSwarmLocked {
			ns.status = types.LocalNodeStateLocked
		} else {
			ns.status = types.LocalNodeStateError
		}
	} else {
		select {
		case <-n.ready:
			ns.status = types.LocalNodeStateActive
		default:
			ns.status = types.LocalNodeStatePending
		}
	}

	return ns
}

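// enableReconnectWatcher doubles the reconnect delay (capped at
// maxReconnectDelay) and schedules a restart of the node once the delay
// elapses, unless the runner is being stopped. Callers must hold n.mu.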
func (n *nodeRunner) enableReconnectWatcher() {
	if n.stopping {
		return
	}
	n.reconnectDelay *= 2
	if n.reconnectDelay > maxReconnectDelay {
		n.reconnectDelay = maxReconnectDelay
	}
	logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
	delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
	n.cancelReconnect = cancel

	go func() {
		<-delayCtx.Done()
		if delayCtx.Err() != context.DeadlineExceeded {
			return
		}
		n.mu.Lock()
		defer n.mu.Unlock()
		if n.stopping {
			return
		}

		if err := n.start(n.config); err != nil {
			n.err = err
		}
	}()
}

// nodeState represents information about the current state of the cluster and
// provides access to the gRPC clients.
type nodeState struct {
	swarmNode       *swarmnode.Node
	grpcConn        *grpc.ClientConn
	controlClient   swarmapi.ControlClient
	logsClient      swarmapi.LogsClient
	status          types.LocalNodeState
	actualLocalAddr string
	err             error
}

// IsActiveManager returns true if the node is a manager ready to accept
// control requests. It is safe to access the client properties if this
// returns true.
func (ns nodeState) IsActiveManager() bool {
	return ns.controlClient != nil
}

// IsManager returns true if the node is a manager.
func (ns nodeState) IsManager() bool {
	return ns.swarmNode != nil && ns.swarmNode.Manager() != nil
}

// NodeID returns the node's ID, or an empty string if the node is inactive.
func (ns nodeState) NodeID() string {
	if ns.swarmNode != nil {
		return ns.swarmNode.NodeID()
	}
	return ""
}