| package cluster // import "github.com/docker/docker/daemon/cluster" |
| |
| import ( |
| "context" |
| "fmt" |
| "net" |
| "strings" |
| "time" |
| |
| apitypes "github.com/docker/docker/api/types" |
| "github.com/docker/docker/api/types/filters" |
| types "github.com/docker/docker/api/types/swarm" |
| "github.com/docker/docker/daemon/cluster/convert" |
| "github.com/docker/docker/errdefs" |
| "github.com/docker/docker/opts" |
| "github.com/docker/docker/pkg/signal" |
| swarmapi "github.com/docker/swarmkit/api" |
| "github.com/docker/swarmkit/manager/encryption" |
| swarmnode "github.com/docker/swarmkit/node" |
| "github.com/pkg/errors" |
| "github.com/sirupsen/logrus" |
| ) |
| |
| // Init initializes a new cluster from the user-provided request.
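| //
| // A minimal usage sketch, given a *Cluster c (the addresses below are illustrative only):
| //
| //	nodeID, err := c.Init(types.InitRequest{
| //		ListenAddr:    "0.0.0.0:2377",
| //		AdvertiseAddr: "192.168.1.10:2377",
| //	})
| //	if err != nil {
| //		// handle the error
| //	}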
| func (c *Cluster) Init(req types.InitRequest) (string, error) { |
| c.controlMutex.Lock() |
| defer c.controlMutex.Unlock() |
| if c.nr != nil { |
| if req.ForceNewCluster { |
| |
| // Take c.mu temporarily to wait for presently running |
| // API handlers to finish before shutting down the node. |
| c.mu.Lock() |
| if !c.nr.nodeState.IsManager() { |
| return "", errSwarmNotManager |
| } |
| c.mu.Unlock() |
| |
| if err := c.nr.Stop(); err != nil { |
| return "", err |
| } |
| } else { |
| return "", errSwarmExists |
| } |
| } |
| |
| if err := validateAndSanitizeInitRequest(&req); err != nil { |
| return "", errdefs.InvalidParameter(err) |
| } |
| |
| listenHost, listenPort, err := resolveListenAddr(req.ListenAddr) |
| if err != nil { |
| return "", err |
| } |
| |
| advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort) |
| if err != nil { |
| return "", err |
| } |
| |
| dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr) |
| if err != nil { |
| return "", err |
| } |
| |
| localAddr := listenHost |
| |
| // If the local address is undetermined, the advertise address
| // will be used as the local address, provided it belongs to this system.
| // If the advertise address is not local, we try to find a system
| // address to use as the local address. If this fails, we give up
| // and ask the user to pass the listen address explicitly.
| if net.ParseIP(localAddr).IsUnspecified() { |
| advertiseIP := net.ParseIP(advertiseHost) |
| |
| found := false |
| for _, systemIP := range listSystemIPs() { |
| if systemIP.Equal(advertiseIP) { |
| localAddr = advertiseIP.String() |
| found = true |
| break |
| } |
| } |
| |
| if !found { |
| ip, err := c.resolveSystemAddr() |
| if err != nil { |
| logrus.Warnf("Could not find a local address: %v", err) |
| return "", errMustSpecifyListenAddr |
| } |
| localAddr = ip.String() |
| } |
| } |
| |
| // Validate the default address pool input.
| if err := validateDefaultAddrPool(req.DefaultAddrPool, req.SubnetSize); err != nil { |
| return "", err |
| } |
| nr, err := c.newNodeRunner(nodeStartConfig{ |
| forceNewCluster: req.ForceNewCluster, |
| autolock: req.AutoLockManagers, |
| LocalAddr: localAddr, |
| ListenAddr: net.JoinHostPort(listenHost, listenPort), |
| AdvertiseAddr: net.JoinHostPort(advertiseHost, advertisePort), |
| DataPathAddr: dataPathAddr, |
| DefaultAddressPool: req.DefaultAddrPool, |
| SubnetSize: req.SubnetSize, |
| availability: req.Availability, |
| }) |
| if err != nil { |
| return "", err |
| } |
| c.mu.Lock() |
| c.nr = nr |
| c.mu.Unlock() |
| |
| if err := <-nr.Ready(); err != nil { |
| c.mu.Lock() |
| c.nr = nil |
| c.mu.Unlock() |
| if !req.ForceNewCluster { // if the initial init attempt fails, don't keep state
| if err := clearPersistentState(c.root); err != nil { |
| return "", err |
| } |
| } |
| return "", err |
| } |
| state := nr.State() |
| if state.swarmNode == nil { // should never happen but protect from panic |
| return "", errors.New("invalid cluster state for spec initialization") |
| } |
| if err := initClusterSpec(state.swarmNode, req.Spec); err != nil { |
| return "", err |
| } |
| return state.NodeID(), nil |
| } |
| |
| // Join makes the current Cluster part of an existing swarm cluster.
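| //
| // A minimal usage sketch, given a *Cluster c (the addresses and token below are illustrative only):
| //
| //	err := c.Join(types.JoinRequest{
| //		ListenAddr:  "0.0.0.0:2377",
| //		RemoteAddrs: []string{"192.168.1.10:2377"},
| //		JoinToken:   "SWMTKN-1-<token>",
| //	})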
| func (c *Cluster) Join(req types.JoinRequest) error { |
| c.controlMutex.Lock() |
| defer c.controlMutex.Unlock() |
| c.mu.Lock() |
| if c.nr != nil { |
| c.mu.Unlock() |
| return errors.WithStack(errSwarmExists) |
| } |
| c.mu.Unlock() |
| |
| if err := validateAndSanitizeJoinRequest(&req); err != nil { |
| return errdefs.InvalidParameter(err) |
| } |
| |
| listenHost, listenPort, err := resolveListenAddr(req.ListenAddr) |
| if err != nil { |
| return err |
| } |
| |
| var advertiseAddr string |
| if req.AdvertiseAddr != "" { |
| advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort) |
| // For joining, an advertise address is not required, since the
| // remote side can detect it; resolution errors are therefore
| // ignored and the advertise address is simply left empty.
| if err == nil { |
| advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort) |
| } |
| } |
| |
| dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr) |
| if err != nil { |
| return err |
| } |
| |
| nr, err := c.newNodeRunner(nodeStartConfig{ |
| RemoteAddr: req.RemoteAddrs[0], |
| ListenAddr: net.JoinHostPort(listenHost, listenPort), |
| AdvertiseAddr: advertiseAddr, |
| DataPathAddr: dataPathAddr, |
| joinAddr: req.RemoteAddrs[0], |
| joinToken: req.JoinToken, |
| availability: req.Availability, |
| }) |
| if err != nil { |
| return err |
| } |
| |
| c.mu.Lock() |
| c.nr = nr |
| c.mu.Unlock() |
| |
| select { |
| case <-time.After(swarmConnectTimeout): |
| return errSwarmJoinTimeoutReached |
| case err := <-nr.Ready(): |
| if err != nil { |
| c.mu.Lock() |
| c.nr = nil |
| c.mu.Unlock() |
| if err := clearPersistentState(c.root); err != nil { |
| return err |
| } |
| } |
| return err |
| } |
| } |
| |
| // Inspect retrieves the configuration properties of a managed swarm cluster. |
| func (c *Cluster) Inspect() (types.Swarm, error) { |
| var swarm types.Swarm |
| if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error { |
| s, err := c.inspect(ctx, state) |
| if err != nil { |
| return err |
| } |
| swarm = s |
| return nil |
| }); err != nil { |
| return types.Swarm{}, err |
| } |
| return swarm, nil |
| } |
| |
| func (c *Cluster) inspect(ctx context.Context, state nodeState) (types.Swarm, error) { |
| s, err := getSwarm(ctx, state.controlClient) |
| if err != nil { |
| return types.Swarm{}, err |
| } |
| return convert.SwarmFromGRPC(*s), nil |
| } |
| |
| // Update updates the configuration of a managed swarm cluster.
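| //
| // A minimal usage sketch, given a *Cluster c (the field being changed is
| // illustrative; the version and spec normally come from a preceding Inspect):
| //
| //	sw, err := c.Inspect()
| //	if err == nil {
| //		sw.Spec.Raft.SnapshotInterval = 5000
| //		err = c.Update(sw.Version.Index, sw.Spec, types.UpdateFlags{})
| //	}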
| func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error { |
| return c.lockedManagerAction(func(ctx context.Context, state nodeState) error { |
| swarm, err := getSwarm(ctx, state.controlClient) |
| if err != nil { |
| return err |
| } |
| |
| // Validate spec name. |
| if spec.Annotations.Name == "" { |
| spec.Annotations.Name = "default" |
| } else if spec.Annotations.Name != "default" { |
| return errdefs.InvalidParameter(errors.New(`swarm spec must be named "default"`)) |
| } |
| |
| // In update, the client should provide the complete spec of the swarm,
| // including Name and Labels. If a field is set to 0 or nil, the
| // default value will be passed on to swarmkit.
| clusterSpec, err := convert.SwarmSpecToGRPC(spec) |
| if err != nil { |
| return errdefs.InvalidParameter(err) |
| } |
| |
| _, err = state.controlClient.UpdateCluster( |
| ctx, |
| &swarmapi.UpdateClusterRequest{ |
| ClusterID: swarm.ID, |
| Spec: &clusterSpec, |
| ClusterVersion: &swarmapi.Version{ |
| Index: version, |
| }, |
| Rotation: swarmapi.KeyRotation{ |
| WorkerJoinToken: flags.RotateWorkerToken, |
| ManagerJoinToken: flags.RotateManagerToken, |
| ManagerUnlockKey: flags.RotateManagerUnlockKey, |
| }, |
| }, |
| ) |
| return err |
| }) |
| } |
| |
| // GetUnlockKey returns the unlock key for the swarm. |
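| // The key is returned in its human-readable form (typically prefixed with
| // "SWMKEY-1-"); an empty string is returned when no unlock key is set.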
| func (c *Cluster) GetUnlockKey() (string, error) { |
| var resp *swarmapi.GetUnlockKeyResponse |
| if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error { |
| client := swarmapi.NewCAClient(state.grpcConn) |
| |
| r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{}) |
| if err != nil { |
| return err |
| } |
| resp = r |
| return nil |
| }); err != nil { |
| return "", err |
| } |
| if len(resp.UnlockKey) == 0 { |
| // no key |
| return "", nil |
| } |
| return encryption.HumanReadableKey(resp.UnlockKey), nil |
| } |
| |
| // UnlockSwarm provides a key to decrypt data that is encrypted at rest. |
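| //
| // A minimal usage sketch, given a *Cluster c (the key value is illustrative only):
| //
| //	err := c.UnlockSwarm(types.UnlockRequest{
| //		UnlockKey: "SWMKEY-1-<key>",
| //	})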
| func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error { |
| c.controlMutex.Lock() |
| defer c.controlMutex.Unlock() |
| |
| c.mu.RLock() |
| state := c.currentNodeState() |
| |
| if !state.IsActiveManager() { |
| // The manager is not active. Proceed only if the swarm is locked;
| // any other error is returned to the caller.
| if err := c.errNoManager(state); err != errSwarmLocked { |
| c.mu.RUnlock() |
| return err |
| } |
| } else { |
| // The manager is active, which means the swarm is not locked; return a "not locked" error.
| c.mu.RUnlock() |
| return notLockedError{} |
| } |
| |
| // Execution only reaches this point when the swarm is locked.
| nr := c.nr |
| c.mu.RUnlock() |
| |
| key, err := encryption.ParseHumanReadableKey(req.UnlockKey) |
| if err != nil { |
| return errdefs.InvalidParameter(err) |
| } |
| |
| config := nr.config |
| config.lockKey = key |
| if err := nr.Stop(); err != nil { |
| return err |
| } |
| nr, err = c.newNodeRunner(config) |
| if err != nil { |
| return err |
| } |
| |
| c.mu.Lock() |
| c.nr = nr |
| c.mu.Unlock() |
| |
| if err := <-nr.Ready(); err != nil { |
| if errors.Cause(err) == errSwarmLocked { |
| return invalidUnlockKey{} |
| } |
| return errors.Errorf("swarm component could not be started: %v", err) |
| } |
| return nil |
| } |
| |
| // Leave shuts down the Cluster and removes the current cluster state.
| func (c *Cluster) Leave(force bool) error { |
| c.controlMutex.Lock() |
| defer c.controlMutex.Unlock() |
| |
| c.mu.Lock() |
| nr := c.nr |
| if nr == nil { |
| c.mu.Unlock() |
| return errors.WithStack(errNoSwarm) |
| } |
| |
| state := c.currentNodeState() |
| |
| c.mu.Unlock() |
| |
| if errors.Cause(state.err) == errSwarmLocked && !force { |
| // Leaving a locked swarm without --force is not allowed.
| return errors.WithStack(notAvailableError("Swarm is encrypted and locked. Please unlock it first or use `--force` to ignore this message.")) |
| } |
| |
| if state.IsManager() && !force { |
| msg := "You are attempting to leave the swarm on a node that is participating as a manager. " |
| if state.IsActiveManager() { |
| active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID()) |
| if err == nil { |
| if active && removingManagerCausesLossOfQuorum(reachable, unreachable) { |
| if isLastManager(reachable, unreachable) { |
| msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. " |
| return errors.WithStack(notAvailableError(msg)) |
| } |
| msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable) |
| } |
| } |
| } else { |
| msg += "Doing so may lose the consensus of your cluster. " |
| } |
| |
| msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message." |
| return errors.WithStack(notAvailableError(msg)) |
| } |
| // release readers in here |
| if err := nr.Stop(); err != nil { |
| logrus.Errorf("failed to shut down cluster node: %v", err) |
| signal.DumpStacks("") |
| return err |
| } |
| |
| c.mu.Lock() |
| c.nr = nil |
| c.mu.Unlock() |
| |
| if nodeID := state.NodeID(); nodeID != "" { |
| nodeContainers, err := c.listContainerForNode(nodeID) |
| if err != nil { |
| return err |
| } |
| for _, id := range nodeContainers { |
| if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil { |
| logrus.Errorf("error removing %v: %v", id, err) |
| } |
| } |
| } |
| |
| // todo: cleanup optional? |
| if err := clearPersistentState(c.root); err != nil { |
| return err |
| } |
| c.config.Backend.DaemonLeavesCluster() |
| return nil |
| } |
| |
| // Info returns information about the current cluster state. |
| func (c *Cluster) Info() types.Info { |
| info := types.Info{ |
| NodeAddr: c.GetAdvertiseAddress(), |
| } |
| c.mu.RLock() |
| defer c.mu.RUnlock() |
| |
| state := c.currentNodeState() |
| info.LocalNodeState = state.status |
| if state.err != nil { |
| info.Error = state.err.Error() |
| } |
| |
| ctx, cancel := c.getRequestContext() |
| defer cancel() |
| |
| if state.IsActiveManager() { |
| info.ControlAvailable = true |
| swarm, err := c.inspect(ctx, state) |
| if err != nil { |
| info.Error = err.Error() |
| } |
| |
| info.Cluster = &swarm.ClusterInfo |
| |
| if r, err := state.controlClient.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err != nil { |
| info.Error = err.Error() |
| } else { |
| info.Nodes = len(r.Nodes) |
| for _, n := range r.Nodes { |
| if n.ManagerStatus != nil { |
| info.Managers = info.Managers + 1 |
| } |
| } |
| } |
| } |
| |
| if state.swarmNode != nil { |
| for _, r := range state.swarmNode.Remotes() { |
| info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr}) |
| } |
| info.NodeID = state.swarmNode.NodeID() |
| } |
| |
| return info |
| } |
| |
| func validateAndSanitizeInitRequest(req *types.InitRequest) error { |
| var err error |
| req.ListenAddr, err = validateAddr(req.ListenAddr) |
| if err != nil { |
| return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err) |
| } |
| |
| if req.Spec.Annotations.Name == "" { |
| req.Spec.Annotations.Name = "default" |
| } else if req.Spec.Annotations.Name != "default" { |
| return errors.New(`swarm spec must be named "default"`) |
| } |
| |
| return nil |
| } |
| |
| func validateAndSanitizeJoinRequest(req *types.JoinRequest) error { |
| var err error |
| req.ListenAddr, err = validateAddr(req.ListenAddr) |
| if err != nil { |
| return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err) |
| } |
| if len(req.RemoteAddrs) == 0 { |
| return errors.New("at least 1 RemoteAddr is required to join") |
| } |
| for i := range req.RemoteAddrs { |
| req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i]) |
| if err != nil { |
| return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err) |
| } |
| } |
| return nil |
| } |
| |
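| // validateAddr checks that addr is non-empty and, when possible, normalizes it
| // to a host:port form with the "tcp://" prefix stripped. An address that cannot
| // be parsed as a TCP address is returned unchanged, leaving any further
| // validation to the later resolve steps.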
| func validateAddr(addr string) (string, error) { |
| if addr == "" { |
| return addr, errors.New("invalid empty address") |
| } |
| newaddr, err := opts.ParseTCPAddr(addr, defaultAddr) |
| if err != nil { |
| return addr, nil |
| } |
| return strings.TrimPrefix(newaddr, "tcp://"), nil |
| } |
| |
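| // initClusterSpec waits for the node's control socket to become available,
| // merges the user-provided spec into the defaults of the newly created
| // cluster, and applies the result via UpdateCluster. Listing the cluster is
| // retried briefly, since the cluster object may not exist yet right after init.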
| func initClusterSpec(node *swarmnode.Node, spec types.Spec) error { |
| ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| defer cancel() |
| for conn := range node.ListenControlSocket(ctx) { |
| if ctx.Err() != nil { |
| return ctx.Err() |
| } |
| if conn != nil { |
| client := swarmapi.NewControlClient(conn) |
| var cluster *swarmapi.Cluster |
| for i := 0; ; i++ { |
| lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{}) |
| if err != nil { |
| return fmt.Errorf("error on listing clusters: %v", err) |
| } |
| if len(lcr.Clusters) == 0 { |
| if i < 10 { |
| time.Sleep(200 * time.Millisecond) |
| continue |
| } |
| return errors.New("empty list of clusters was returned") |
| } |
| cluster = lcr.Clusters[0] |
| break |
| } |
| // In init, we take the initial default values from swarmkit and merge
| // any non-nil, non-zero value from the spec into the GRPC spec, leaving
| // the remaining defaults alone.
| // Note that this is different from Update(), where we expect the user to
| // specify the complete spec of the cluster (as they already know the
| // existing one and know which fields to update).
| clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec) |
| if err != nil { |
| return fmt.Errorf("error updating cluster settings: %v", err) |
| } |
| _, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{ |
| ClusterID: cluster.ID, |
| ClusterVersion: &cluster.Meta.Version, |
| Spec: &clusterSpec, |
| }) |
| if err != nil { |
| return fmt.Errorf("error updating cluster settings: %v", err) |
| } |
| return nil |
| } |
| } |
| return ctx.Err() |
| } |
| |
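| // listContainerForNode returns the IDs of all local containers that carry the
| // swarm label for the given node ID.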
| func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) { |
| var ids []string |
| containerFilters := filters.NewArgs()
| containerFilters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
| containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
| Filters: containerFilters,
| })
| if err != nil { |
| return []string{}, err |
| } |
| for _, c := range containers { |
| ids = append(ids, c.ID) |
| } |
| return ids, nil |
| } |