| package ca |
| |
| import ( |
| "bytes" |
| "context" |
| "fmt" |
| "reflect" |
| "sync" |
| "time" |
| |
| "github.com/cloudflare/cfssl/helpers" |
| "github.com/docker/swarmkit/api" |
| "github.com/docker/swarmkit/api/equality" |
| "github.com/docker/swarmkit/log" |
| "github.com/docker/swarmkit/manager/state/store" |
| "github.com/pkg/errors" |
| ) |
| |
| // IssuanceStateRotateMaxBatchSize is the maximum number of nodes we'll tell to rotate their certificates in any given update |
| const IssuanceStateRotateMaxBatchSize = 30 |
| |
| func hasIssuer(n *api.Node, info *IssuerInfo) bool { |
| if n.Description == nil || n.Description.TLSInfo == nil { |
| return false |
| } |
| return bytes.Equal(info.Subject, n.Description.TLSInfo.CertIssuerSubject) && bytes.Equal(info.PublicKey, n.Description.TLSInfo.CertIssuerPublicKey) |
| } |
| |
| var errRootRotationChanged = errors.New("target root rotation has changed") |
| |
| // rootRotationReconciler keeps track of all the nodes in the store so that we can determine which ones need reconciliation when nodes are updated |
| // or the root CA is updated. This is meant to be used with watches on nodes and the cluster, and provides functions to be called when the |
| // cluster's RootCA has changed and when a node is added, updated, or removed. |
| type rootRotationReconciler struct { |
| mu sync.Mutex |
| clusterID string |
| batchUpdateInterval time.Duration |
| ctx context.Context |
| store *store.MemoryStore |
| |
| currentRootCA *api.RootCA |
| currentIssuer IssuerInfo |
| unconvergedNodes map[string]*api.Node |
| |
| wg sync.WaitGroup |
| cancel func() |
| } |
| |
| // IssuerFromAPIRootCA returns the desired issuer given an API root CA object |
| func IssuerFromAPIRootCA(rootCA *api.RootCA) (*IssuerInfo, error) { |
| wantedIssuer := rootCA.CACert |
| if rootCA.RootRotation != nil { |
| wantedIssuer = rootCA.RootRotation.CACert |
| } |
| issuerCerts, err := helpers.ParseCertificatesPEM(wantedIssuer) |
| if err != nil { |
| return nil, errors.Wrap(err, "invalid certificate in cluster root CA object") |
| } |
| if len(issuerCerts) == 0 { |
| return nil, errors.New("invalid certificate in cluster root CA object") |
| } |
| return &IssuerInfo{ |
| Subject: issuerCerts[0].RawSubject, |
| PublicKey: issuerCerts[0].RawSubjectPublicKeyInfo, |
| }, nil |
| } |
| |
| // assumption: UpdateRootCA will never be called with a `nil` root CA because the caller will be acting in response to |
| // a store update event |
| func (r *rootRotationReconciler) UpdateRootCA(newRootCA *api.RootCA) { |
| issuerInfo, err := IssuerFromAPIRootCA(newRootCA) |
| if err != nil { |
| log.G(r.ctx).WithError(err).Error("unable to update process the current root CA") |
| return |
| } |
| |
| var ( |
| shouldStartNewLoop, waitForPrevLoop bool |
| loopCtx context.Context |
| ) |
| r.mu.Lock() |
| defer func() { |
| r.mu.Unlock() |
| if shouldStartNewLoop { |
| if waitForPrevLoop { |
| r.wg.Wait() |
| } |
| r.wg.Add(1) |
| go r.runReconcilerLoop(loopCtx, newRootCA) |
| } |
| }() |
| |
| // check if the issuer has changed, first |
| if reflect.DeepEqual(&r.currentIssuer, issuerInfo) { |
| r.currentRootCA = newRootCA |
| return |
| } |
| // If the issuer has changed, iterate through all the nodes to figure out which ones need rotation |
| if newRootCA.RootRotation != nil { |
| var nodes []*api.Node |
| r.store.View(func(tx store.ReadTx) { |
| nodes, err = store.FindNodes(tx, store.All) |
| }) |
| if err != nil { |
| log.G(r.ctx).WithError(err).Error("unable to list nodes, so unable to process the current root CA") |
| return |
| } |
| |
| // from here on out, there will be no more errors that cause us to have to abandon updating the Root CA, |
| // so we can start making changes to r's fields |
| r.unconvergedNodes = make(map[string]*api.Node) |
| for _, n := range nodes { |
| if !hasIssuer(n, issuerInfo) { |
| r.unconvergedNodes[n.ID] = n |
| } |
| } |
| shouldStartNewLoop = true |
| if r.cancel != nil { // there's already a loop going, so cancel it |
| r.cancel() |
| waitForPrevLoop = true |
| } |
| loopCtx, r.cancel = context.WithCancel(r.ctx) |
| } else { |
| r.unconvergedNodes = nil |
| } |
| r.currentRootCA = newRootCA |
| r.currentIssuer = *issuerInfo |
| } |
| |
| // assumption: UpdateNode will never be called with a `nil` node because the caller will be acting in response to |
| // a store update event |
| func (r *rootRotationReconciler) UpdateNode(node *api.Node) { |
| r.mu.Lock() |
| defer r.mu.Unlock() |
| // if we're not in the middle of a root rotation ignore the update |
| if r.currentRootCA == nil || r.currentRootCA.RootRotation == nil { |
| return |
| } |
| if hasIssuer(node, &r.currentIssuer) { |
| delete(r.unconvergedNodes, node.ID) |
| } else { |
| r.unconvergedNodes[node.ID] = node |
| } |
| } |
| |
| // assumption: DeleteNode will never be called with a `nil` node because the caller will be acting in response to |
| // a store update event |
| func (r *rootRotationReconciler) DeleteNode(node *api.Node) { |
| r.mu.Lock() |
| delete(r.unconvergedNodes, node.ID) |
| r.mu.Unlock() |
| } |
| |
| func (r *rootRotationReconciler) runReconcilerLoop(ctx context.Context, loopRootCA *api.RootCA) { |
| defer r.wg.Done() |
| for { |
| r.mu.Lock() |
| if len(r.unconvergedNodes) == 0 { |
| r.mu.Unlock() |
| |
| err := r.store.Update(func(tx store.Tx) error { |
| return r.finishRootRotation(tx, loopRootCA) |
| }) |
| if err == nil { |
| log.G(r.ctx).Info("completed root rotation") |
| return |
| } |
| log.G(r.ctx).WithError(err).Error("could not complete root rotation") |
| if err == errRootRotationChanged { |
| // if the root rotation has changed, this loop will be cancelled anyway, so may as well abort early |
| return |
| } |
| } else { |
| var toUpdate []*api.Node |
| for _, n := range r.unconvergedNodes { |
| iState := n.Certificate.Status.State |
| if iState != api.IssuanceStateRenew && iState != api.IssuanceStatePending && iState != api.IssuanceStateRotate { |
| n = n.Copy() |
| n.Certificate.Status.State = api.IssuanceStateRotate |
| toUpdate = append(toUpdate, n) |
| if len(toUpdate) >= IssuanceStateRotateMaxBatchSize { |
| break |
| } |
| } |
| } |
| r.mu.Unlock() |
| |
| if err := r.batchUpdateNodes(toUpdate); err != nil { |
| log.G(r.ctx).WithError(err).Errorf("store error when trying to batch update %d nodes to request certificate rotation", len(toUpdate)) |
| } |
| } |
| |
| select { |
| case <-ctx.Done(): |
| return |
| case <-time.After(r.batchUpdateInterval): |
| } |
| } |
| } |
| |
| // This function assumes that the expected root CA has root rotation. This is intended to be used by |
| // `reconcileNodeRootsAndCerts`, which uses the root CA from the `lastSeenClusterRootCA`, and checks |
| // that it has a root rotation before calling this function. |
| func (r *rootRotationReconciler) finishRootRotation(tx store.Tx, expectedRootCA *api.RootCA) error { |
| cluster := store.GetCluster(tx, r.clusterID) |
| if cluster == nil { |
| return fmt.Errorf("unable to get cluster %s", r.clusterID) |
| } |
| |
| // If the RootCA object has changed (because another root rotation was started or because some other node |
| // had finished the root rotation), we cannot finish the root rotation that we were working on. |
| if !equality.RootCAEqualStable(expectedRootCA, &cluster.RootCA) { |
| return errRootRotationChanged |
| } |
| |
| var signerCert []byte |
| if len(cluster.RootCA.RootRotation.CAKey) > 0 { |
| signerCert = cluster.RootCA.RootRotation.CACert |
| } |
| // we don't actually have to parse out the default node expiration from the cluster - we are just using |
| // the ca.RootCA object to generate new tokens and the digest |
| updatedRootCA, err := NewRootCA(cluster.RootCA.RootRotation.CACert, signerCert, cluster.RootCA.RootRotation.CAKey, |
| DefaultNodeCertExpiration, nil) |
| if err != nil { |
| return errors.Wrap(err, "invalid cluster root rotation object") |
| } |
| cluster.RootCA = api.RootCA{ |
| CACert: cluster.RootCA.RootRotation.CACert, |
| CAKey: cluster.RootCA.RootRotation.CAKey, |
| CACertHash: updatedRootCA.Digest.String(), |
| JoinTokens: api.JoinTokens{ |
| Worker: GenerateJoinToken(&updatedRootCA, cluster.FIPS), |
| Manager: GenerateJoinToken(&updatedRootCA, cluster.FIPS), |
| }, |
| LastForcedRotation: cluster.RootCA.LastForcedRotation, |
| } |
| return store.UpdateCluster(tx, cluster) |
| } |
| |
| func (r *rootRotationReconciler) batchUpdateNodes(toUpdate []*api.Node) error { |
| if len(toUpdate) == 0 { |
| return nil |
| } |
| err := r.store.Batch(func(batch *store.Batch) error { |
| // Directly update the nodes rather than get + update, and ignore version errors. Since |
| // `rootRotationReconciler` should be hooked up to all node update/delete/create events, we should have |
| // close to the latest versions of all the nodes. If not, the node will updated later and the |
| // next batch of updates should catch it. |
| for _, n := range toUpdate { |
| if err := batch.Update(func(tx store.Tx) error { |
| return store.UpdateNode(tx, n) |
| }); err != nil && err != store.ErrSequenceConflict { |
| log.G(r.ctx).WithError(err).Errorf("unable to update node %s to request a certificate rotation", n.ID) |
| } |
| } |
| return nil |
| }) |
| return err |
| } |