package global

import (
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/constraint"
"github.com/docker/swarmkit/manager/orchestrator"
"github.com/docker/swarmkit/manager/orchestrator/restart"
"github.com/docker/swarmkit/manager/orchestrator/taskinit"
"github.com/docker/swarmkit/manager/orchestrator/update"
"github.com/docker/swarmkit/manager/state/store"
"golang.org/x/net/context"
)

type globalService struct {
*api.Service
// Compiled constraints
constraints []constraint.Constraint
}

// Orchestrator runs a reconciliation loop to create and destroy tasks as
// necessary for global services.
type Orchestrator struct {
store *store.MemoryStore
// nodes is the set of non-drained nodes in the cluster, indexed by node ID
nodes map[string]*api.Node
// globalServices has all the global services in the cluster, indexed by ServiceID
globalServices map[string]globalService
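	// restartTasks is the set of task IDs queued to be restarted on the next tick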
restartTasks map[string]struct{}
// stopChan signals to the state machine to stop running.
stopChan chan struct{}
// doneChan is closed when the state machine terminates.
doneChan chan struct{}
updater *update.Supervisor
restarts *restart.Supervisor
cluster *api.Cluster // local instance of the cluster
}

// NewGlobalOrchestrator creates a new global Orchestrator
func NewGlobalOrchestrator(store *store.MemoryStore) *Orchestrator {
restartSupervisor := restart.NewSupervisor(store)
updater := update.NewSupervisor(store, restartSupervisor)
return &Orchestrator{
store: store,
nodes: make(map[string]*api.Node),
globalServices: make(map[string]globalService),
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
updater: updater,
restarts: restartSupervisor,
restartTasks: make(map[string]struct{}),
}
}
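
// initTasks fixes up tasks in the store before the reconciliation loop starts.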
func (g *Orchestrator) initTasks(ctx context.Context, readTx store.ReadTx) error {
return taskinit.CheckTasks(ctx, g.store, readTx, g, g.restarts)
}

// Run contains the global orchestrator event loop
func (g *Orchestrator) Run(ctx context.Context) error {
defer close(g.doneChan)
// Watch changes to services and tasks
queue := g.store.WatchQueue()
watcher, cancel := queue.Watch()
defer cancel()
	// look up the cluster
var err error
g.store.View(func(readTx store.ReadTx) {
var clusters []*api.Cluster
clusters, err = store.FindClusters(readTx, store.ByName("default"))
if len(clusters) != 1 {
return // just pick up the cluster when it is created.
}
g.cluster = clusters[0]
})
if err != nil {
return err
}
// Get list of nodes
var nodes []*api.Node
g.store.View(func(readTx store.ReadTx) {
nodes, err = store.FindNodes(readTx, store.All)
})
if err != nil {
return err
}
for _, n := range nodes {
g.updateNode(n)
}
// Lookup global services
var existingServices []*api.Service
g.store.View(func(readTx store.ReadTx) {
existingServices, err = store.FindServices(readTx, store.All)
})
if err != nil {
return err
}
var reconcileServiceIDs []string
for _, s := range existingServices {
if orchestrator.IsGlobalService(s) {
g.updateService(s)
reconcileServiceIDs = append(reconcileServiceIDs, s.ID)
}
}
// fix tasks in store before reconciliation loop
g.store.View(func(readTx store.ReadTx) {
err = g.initTasks(ctx, readTx)
})
if err != nil {
return err
}
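	// Handle restarts queued by initTasks, then reconcile all known global
	// services once before entering the event loop.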
g.tickTasks(ctx)
g.reconcileServices(ctx, reconcileServiceIDs)
for {
select {
case event := <-watcher:
// TODO(stevvooe): Use ctx to limit running time of operation.
switch v := event.(type) {
case api.EventUpdateCluster:
g.cluster = v.Cluster
case api.EventCreateService:
if !orchestrator.IsGlobalService(v.Service) {
continue
}
g.updateService(v.Service)
g.reconcileServices(ctx, []string{v.Service.ID})
case api.EventUpdateService:
if !orchestrator.IsGlobalService(v.Service) {
continue
}
g.updateService(v.Service)
g.reconcileServices(ctx, []string{v.Service.ID})
case api.EventDeleteService:
if !orchestrator.IsGlobalService(v.Service) {
continue
}
orchestrator.DeleteServiceTasks(ctx, g.store, v.Service)
// delete the service from service map
delete(g.globalServices, v.Service.ID)
g.restarts.ClearServiceHistory(v.Service.ID)
case api.EventCreateNode:
g.updateNode(v.Node)
g.reconcileOneNode(ctx, v.Node)
case api.EventUpdateNode:
g.updateNode(v.Node)
g.reconcileOneNode(ctx, v.Node)
case api.EventDeleteNode:
g.foreachTaskFromNode(ctx, v.Node, g.deleteTask)
delete(g.nodes, v.Node.ID)
case api.EventUpdateTask:
g.handleTaskChange(ctx, v.Task)
}
case <-g.stopChan:
return nil
}
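		// Handle any tasks queued for restart while processing this event.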
g.tickTasks(ctx)
}
}

// FixTask validates a task with the current cluster settings, and takes
// action to make it conformant to node state and service constraints.
// It's called at orchestrator initialization.
func (g *Orchestrator) FixTask(ctx context.Context, batch *store.Batch, t *api.Task) {
if _, exists := g.globalServices[t.ServiceID]; !exists {
return
}
	// if a task's DesiredState has passed running, the task has been processed
if t.DesiredState > api.TaskStateRunning {
return
}
var node *api.Node
if t.NodeID != "" {
node = g.nodes[t.NodeID]
}
	// if the node is no longer valid, remove the task
if t.NodeID == "" || orchestrator.InvalidNode(node) {
g.shutdownTask(ctx, batch, t)
return
}
// restart a task if it fails
if t.Status.State > api.TaskStateRunning {
g.restartTasks[t.ID] = struct{}{}
}
}

// handleTaskChange defines what the orchestrator does when a task is updated by an agent
func (g *Orchestrator) handleTaskChange(ctx context.Context, t *api.Task) {
if _, exists := g.globalServices[t.ServiceID]; !exists {
return
}
// if a task's DesiredState has passed running, it
// means the task has been processed
if t.DesiredState > api.TaskStateRunning {
return
}
// if a task has passed running, restart it
if t.Status.State > api.TaskStateRunning {
g.restartTasks[t.ID] = struct{}{}
}
}

// Stop stops the orchestrator.
func (g *Orchestrator) Stop() {
close(g.stopChan)
<-g.doneChan
g.updater.CancelAll()
g.restarts.CancelAll()
}
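
// foreachTaskFromNode runs cb on every task on the given node that belongs to
// a global service.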
func (g *Orchestrator) foreachTaskFromNode(ctx context.Context, node *api.Node, cb func(context.Context, *store.Batch, *api.Task)) {
var (
tasks []*api.Task
err error
)
g.store.View(func(tx store.ReadTx) {
tasks, err = store.FindTasks(tx, store.ByNodeID(node.ID))
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: foreachTaskFromNode failed finding tasks")
return
}
err = g.store.Batch(func(batch *store.Batch) error {
for _, t := range tasks {
			// The global orchestrator only handles tasks that belong to global services
if _, exists := g.globalServices[t.ServiceID]; exists {
cb(ctx, batch, t)
}
}
return nil
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: foreachTaskFromNode failed batching tasks")
}
}
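
// reconcileServices makes sure each of the given global services runs one task
// on every eligible node: it creates missing tasks, queues out-of-date tasks
// for update, and shuts down tasks on nodes that no longer qualify.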
func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []string) {
nodeTasks := make(map[string]map[string][]*api.Task)
g.store.View(func(tx store.ReadTx) {
for _, serviceID := range serviceIDs {
service := g.globalServices[serviceID].Service
if service == nil {
continue
}
tasks, err := store.FindTasks(tx, store.ByServiceID(serviceID))
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServices failed finding tasks for service %s", serviceID)
continue
}
// nodeID -> task list
nodeTasks[serviceID] = make(map[string][]*api.Task)
for _, t := range tasks {
nodeTasks[serviceID][t.NodeID] = append(nodeTasks[serviceID][t.NodeID], t)
}
			// Keep all runnable instances of this service,
			// and instances that were not restarted due
			// to restart policy but may be updated if the
			// service spec changed.
for nodeID, slot := range nodeTasks[serviceID] {
updatable := g.restarts.UpdatableTasksInSlot(ctx, slot, g.globalServices[serviceID].Service)
if len(updatable) != 0 {
nodeTasks[serviceID][nodeID] = updatable
} else {
delete(nodeTasks[serviceID], nodeID)
}
}
}
})
updates := make(map[*api.Service][]orchestrator.Slot)
err := g.store.Batch(func(batch *store.Batch) error {
for _, serviceID := range serviceIDs {
var updateTasks []orchestrator.Slot
if _, exists := nodeTasks[serviceID]; !exists {
continue
}
service := g.globalServices[serviceID]
for nodeID, node := range g.nodes {
meetsConstraints := constraint.NodeMatches(service.constraints, node)
ntasks := nodeTasks[serviceID][nodeID]
delete(nodeTasks[serviceID], nodeID)
if !meetsConstraints {
g.shutdownTasks(ctx, batch, ntasks)
continue
}
if node.Spec.Availability == api.NodeAvailabilityPause {
// the node is paused, so we won't add or update
// any tasks
continue
}
// this node needs to run 1 copy of the task
if len(ntasks) == 0 {
g.addTask(ctx, batch, service.Service, nodeID)
} else {
updateTasks = append(updateTasks, ntasks)
}
}
if len(updateTasks) > 0 {
updates[service.Service] = updateTasks
}
// Remove any tasks assigned to nodes not found in g.nodes.
// These must be associated with nodes that are drained, or
// nodes that no longer exist.
for _, ntasks := range nodeTasks[serviceID] {
g.shutdownTasks(ctx, batch, ntasks)
}
}
return nil
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServices transaction failed")
}
for service, updateTasks := range updates {
g.updater.Update(ctx, g.cluster, service, updateTasks)
}
}

// updateNode updates g.nodes based on the current node value
func (g *Orchestrator) updateNode(node *api.Node) {
if node.Spec.Availability == api.NodeAvailabilityDrain || node.Status.State == api.NodeStatus_DOWN {
delete(g.nodes, node.ID)
} else {
g.nodes[node.ID] = node
}
}

// updateService updates g.globalServices based on the current service value
func (g *Orchestrator) updateService(service *api.Service) {
var constraints []constraint.Constraint
if service.Spec.Task.Placement != nil && len(service.Spec.Task.Placement.Constraints) != 0 {
constraints, _ = constraint.Parse(service.Spec.Task.Placement.Constraints)
}
g.globalServices[service.ID] = globalService{
Service: service,
constraints: constraints,
}
}

// reconcileOneNode checks all global services on one node
func (g *Orchestrator) reconcileOneNode(ctx context.Context, node *api.Node) {
if node.Spec.Availability == api.NodeAvailabilityDrain {
log.G(ctx).Debugf("global orchestrator: node %s in drain state, shutting down its tasks", node.ID)
g.foreachTaskFromNode(ctx, node, g.shutdownTask)
return
}
if node.Status.State == api.NodeStatus_DOWN {
log.G(ctx).Debugf("global orchestrator: node %s is down, shutting down its tasks", node.ID)
g.foreachTaskFromNode(ctx, node, g.shutdownTask)
return
}
if node.Spec.Availability == api.NodeAvailabilityPause {
// the node is paused, so we won't add or update tasks
return
}
node, exists := g.nodes[node.ID]
if !exists {
return
}
// tasks by service
tasks := make(map[string][]*api.Task)
var (
tasksOnNode []*api.Task
err error
)
g.store.View(func(tx store.ReadTx) {
tasksOnNode, err = store.FindTasks(tx, store.ByNodeID(node.ID))
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: reconcile failed finding tasks on node %s", node.ID)
return
}
for serviceID, service := range g.globalServices {
for _, t := range tasksOnNode {
if t.ServiceID != serviceID {
continue
}
tasks[serviceID] = append(tasks[serviceID], t)
}
		// Keep all runnable instances of this service,
		// and instances that were not restarted due
		// to restart policy but may be updated if the
		// service spec changed.
		updatable := g.restarts.UpdatableTasksInSlot(ctx, tasks[serviceID], service.Service)
		if len(updatable) != 0 {
			tasks[serviceID] = updatable
		} else {
			delete(tasks, serviceID)
		}
}
err = g.store.Batch(func(batch *store.Batch) error {
for serviceID, service := range g.globalServices {
if !constraint.NodeMatches(service.constraints, node) {
continue
}
			if len(tasks[serviceID]) == 0 {
g.addTask(ctx, batch, service.Service, node.ID)
} else {
// If task is out of date, update it. This can happen
// on node reconciliation if, for example, we pause a
// node, update the service, and then activate the node
// later.
// We don't use g.updater here for two reasons:
// - This is not a rolling update. Since it was not
// triggered directly by updating the service, it
// should not observe the rolling update parameters
// or show status in UpdateStatus.
// - Calling Update cancels any current rolling updates
// for the service, such as one triggered by service
// reconciliation.
var (
dirtyTasks []*api.Task
cleanTasks []*api.Task
)
for _, t := range tasks[serviceID] {
if orchestrator.IsTaskDirty(service.Service, t) {
dirtyTasks = append(dirtyTasks, t)
} else {
cleanTasks = append(cleanTasks, t)
}
}
if len(cleanTasks) == 0 {
g.addTask(ctx, batch, service.Service, node.ID)
} else {
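					// Keep one clean task and mark the surplus clean
					// tasks dirty so they are shut down below.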
dirtyTasks = append(dirtyTasks, cleanTasks[1:]...)
}
g.shutdownTasks(ctx, batch, dirtyTasks)
}
}
return nil
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServiceOneNode batch failed")
}
}
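
// tickTasks processes the tasks queued in g.restartTasks: each task is
// restarted, or marked for shutdown if its node is paused or no longer
// matches the service constraints.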
func (g *Orchestrator) tickTasks(ctx context.Context) {
if len(g.restartTasks) == 0 {
return
}
err := g.store.Batch(func(batch *store.Batch) error {
for taskID := range g.restartTasks {
err := batch.Update(func(tx store.Tx) error {
t := store.GetTask(tx, taskID)
if t == nil || t.DesiredState > api.TaskStateRunning {
return nil
}
service := store.GetService(tx, t.ServiceID)
if service == nil {
return nil
}
node, nodeExists := g.nodes[t.NodeID]
serviceEntry, serviceExists := g.globalServices[t.ServiceID]
if !nodeExists || !serviceExists {
return nil
}
if node.Spec.Availability == api.NodeAvailabilityPause ||
!constraint.NodeMatches(serviceEntry.constraints, node) {
t.DesiredState = api.TaskStateShutdown
return store.UpdateTask(tx, t)
}
return g.restarts.Restart(ctx, tx, g.cluster, service, *t)
})
if err != nil {
log.G(ctx).WithError(err).Errorf("orchestrator restartTask transaction failed")
}
}
return nil
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: restartTask transaction failed")
}
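	// All queued restarts have been handled; reset the set for the next tick.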
g.restartTasks = make(map[string]struct{})
}
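
// shutdownTask sets a task's DesiredState to Shutdown so the agent stops it.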
func (g *Orchestrator) shutdownTask(ctx context.Context, batch *store.Batch, t *api.Task) {
// set existing task DesiredState to TaskStateShutdown
// TODO(aaronl): optimistic update?
err := batch.Update(func(tx store.Tx) error {
t = store.GetTask(tx, t.ID)
if t != nil && t.DesiredState < api.TaskStateShutdown {
t.DesiredState = api.TaskStateShutdown
return store.UpdateTask(tx, t)
}
return nil
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: shutdownTask failed to shut down %s", t.ID)
}
}
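
// addTask creates a new task for the given service on the given node.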
func (g *Orchestrator) addTask(ctx context.Context, batch *store.Batch, service *api.Service, nodeID string) {
task := orchestrator.NewTask(g.cluster, service, 0, nodeID)
err := batch.Update(func(tx store.Tx) error {
if store.GetService(tx, service.ID) == nil {
return nil
}
return store.CreateTask(tx, task)
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: failed to create task")
}
}
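
// shutdownTasks shuts down each of the given tasks.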
func (g *Orchestrator) shutdownTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) {
for _, t := range tasks {
g.shutdownTask(ctx, batch, t)
}
}
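
// deleteTask removes a task from the store.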
func (g *Orchestrator) deleteTask(ctx context.Context, batch *store.Batch, t *api.Task) {
err := batch.Update(func(tx store.Tx) error {
return store.DeleteTask(tx, t.ID)
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: deleteTask failed to delete %s", t.ID)
}
}

// IsRelatedService returns true if the service should be governed by this orchestrator
func (g *Orchestrator) IsRelatedService(service *api.Service) bool {
return orchestrator.IsGlobalService(service)
}

// SlotTuple returns a slot tuple for the global service task.
func (g *Orchestrator) SlotTuple(t *api.Task) orchestrator.SlotTuple {
return orchestrator.SlotTuple{
ServiceID: t.ServiceID,
NodeID: t.NodeID,
}
}
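
// isTaskCompleted returns true if the task has finished and will not be
// restarted under the given restart policy.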
func isTaskCompleted(t *api.Task, restartPolicy api.RestartPolicy_RestartCondition) bool {
if t == nil || t.DesiredState <= api.TaskStateRunning {
return false
}
return restartPolicy == api.RestartOnNone ||
(restartPolicy == api.RestartOnFailure && t.Status.State == api.TaskStateCompleted)
}