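// Package constraintenforcer watches for node updates and rejects tasks that
// no longer satisfy the node's scheduling constraints or resource limits.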
package constraintenforcer
import (
	"time"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/api/genericresource"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/constraint"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/protobuf/ptypes"
)
// ConstraintEnforcer watches for updates to nodes and shuts down tasks that no
// longer satisfy scheduling constraints or resource limits.
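//
// A minimal usage sketch (the surrounding wiring, such as where the
// *store.MemoryStore comes from, is assumed):
//
//	ce := constraintenforcer.New(memoryStore)
//	go ce.Run()
//	// ... later, during shutdown:
//	ce.Stop() // blocks until Run has returned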
type ConstraintEnforcer struct {
store *store.MemoryStore
stopChan chan struct{}
doneChan chan struct{}
}
// New creates a new ConstraintEnforcer.
func New(store *store.MemoryStore) *ConstraintEnforcer {
return &ConstraintEnforcer{
store: store,
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
}
}
// Run is the ConstraintEnforcer's main loop.
func (ce *ConstraintEnforcer) Run() {
defer close(ce.doneChan)
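// Subscribe to node update events before the initial pass over existing
// nodes, so that updates arriving during the pass are not missed.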
watcher, cancelWatch := state.Watch(ce.store.WatchQueue(), api.EventUpdateNode{})
defer cancelWatch()
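// Do an initial pass over all existing nodes; after this, a node is only
// re-checked when an update event for it arrives.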
var (
nodes []*api.Node
err error
)
ce.store.View(func(readTx store.ReadTx) {
nodes, err = store.FindNodes(readTx, store.All)
})
if err != nil {
log.L.WithError(err).Error("failed to check nodes for noncompliant tasks")
} else {
for _, node := range nodes {
ce.rejectNoncompliantTasks(node)
}
}
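// React to node updates as they arrive, until Stop is called.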
for {
select {
case event := <-watcher:
node := event.(api.EventUpdateNode).Node
ce.rejectNoncompliantTasks(node)
case <-ce.stopChan:
return
}
}
}
func (ce *ConstraintEnforcer) rejectNoncompliantTasks(node *api.Node) {
// If the availability is "drain", the orchestrator will
// shut down all tasks.
// If the availability is "pause", we shouldn't touch
// the tasks on this node.
if node.Spec.Availability != api.NodeAvailabilityActive {
return
}
var (
tasks []*api.Task
err error
)
ce.store.View(func(tx store.ReadTx) {
tasks, err = store.FindTasks(tx, store.ByNodeID(node.ID))
})
if err != nil {
log.L.WithError(err).Errorf("failed to list tasks for node ID %s", node.ID)
}
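// Track what is still available on the node: start from the node's
// advertised resources and subtract the reservations of every task that is
// kept, so each task is checked against what actually remains.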
available := &api.Resources{}
var fakeStore []*api.GenericResource
if node.Description != nil && node.Description.Resources != nil {
available = node.Description.Resources.Copy()
}
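// Collect the tasks to reject first; the store is updated in a single
// batch below.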
removeTasks := make(map[string]*api.Task)
// TODO(aaronl): The set of tasks removed will be
// nondeterministic because it depends on the order of
// the slice returned from FindTasks. We could do
// a separate pass over the tasks for each type of
// resource, and sort by the size of the reservation
// to remove the most resource-intensive tasks.
loop:
for _, t := range tasks {
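// Ignore tasks that are not meant to be running on this node (not yet
// assigned, or already being shut down).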
if t.DesiredState < api.TaskStateAssigned || t.DesiredState > api.TaskStateRunning {
continue
}
// Ensure that the task still meets scheduling
// constraints.
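// For example, a task constrained with "node.labels.region==east" is
// rejected here if the node's labels have changed and no longer match.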
if t.Spec.Placement != nil && len(t.Spec.Placement.Constraints) != 0 {
constraints, _ := constraint.Parse(t.Spec.Placement.Constraints)
if !constraint.NodeMatches(constraints, node) {
removeTasks[t.ID] = t
continue
}
}
// Ensure that the task assigned to the node
// still satisfies the resource limits.
if t.Spec.Resources != nil && t.Spec.Resources.Reservations != nil {
if t.Spec.Resources.Reservations.MemoryBytes > available.MemoryBytes {
removeTasks[t.ID] = t
continue
}
if t.Spec.Resources.Reservations.NanoCPUs > available.NanoCPUs {
removeTasks[t.ID] = t
continue
}
for _, ta := range t.AssignedGenericResources {
// Type change or no longer available
if !genericresource.HasResource(ta, available.Generic) {
removeTasks[t.ID] = t
continue loop
}
}
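// This task stays on the node, so deduct its reservations from the pool
// available to the tasks checked after it.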
available.MemoryBytes -= t.Spec.Resources.Reservations.MemoryBytes
available.NanoCPUs -= t.Spec.Resources.Reservations.NanoCPUs
genericresource.ClaimResources(&available.Generic,
&fakeStore, t.AssignedGenericResources)
}
}
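// Mark the collected tasks as rejected in a single batched store update.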
if len(removeTasks) != 0 {
err := ce.store.Batch(func(batch *store.Batch) error {
for _, t := range removeTasks {
err := batch.Update(func(tx store.Tx) error {
t = store.GetTask(tx, t.ID)
if t == nil || t.DesiredState > api.TaskStateRunning {
return nil
}
// We set the observed state to
// REJECTED, rather than the desired
// state. Desired state is owned by the
// orchestrator, and setting it directly
// will bypass actions such as
// restarting the task on another node
// (if applicable).
t.Status.State = api.TaskStateRejected
t.Status.Message = "task rejected by constraint enforcer"
t.Status.Err = "assigned node no longer meets constraints"
t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
return store.UpdateTask(tx, t)
})
if err != nil {
log.L.WithError(err).Errorf("failed to shut down task %s", t.ID)
}
}
return nil
})
if err != nil {
log.L.WithError(err).Errorf("failed to shut down tasks")
}
}
}
// Stop stops the ConstraintEnforcer and waits for the main loop to exit.
func (ce *ConstraintEnforcer) Stop() {
close(ce.stopChan)
<-ce.doneChan
}