blob: bcef801f63ebfe613d2fd0f940aae0b09958102f [file] [log] [blame]
package taskreaper
import (
const (
// maxDirty is the size threshold for running a task pruning operation.
maxDirty = 1000
// reaperBatchingInterval is how often to prune old tasks.
reaperBatchingInterval = 250 * time.Millisecond
// A TaskReaper deletes old tasks when more than TaskHistoryRetentionLimit tasks
// exist for the same service/instance or service/nodeid combination.
type TaskReaper struct {
store *store.MemoryStore
// taskHistory is the number of tasks to keep
taskHistory int64
// List of slot tubles to be inspected for task history cleanup.
dirty map[orchestrator.SlotTuple]struct{}
// List of tasks collected for cleanup, which includes two kinds of tasks
// - serviceless orphaned tasks
// - tasks with desired state REMOVE that have already been shut down
cleanup []string
stopChan chan struct{}
doneChan chan struct{}
// New creates a new TaskReaper.
func New(store *store.MemoryStore) *TaskReaper {
return &TaskReaper{
store: store,
dirty: make(map[orchestrator.SlotTuple]struct{}),
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
// Run is the TaskReaper's watch loop which collects candidates for cleanup.
// Task history is mainly used in task restarts but is also available for administrative purposes.
// Note that the task history is stored per-slot-per-service for replicated services
// and per-node-per-service for global services. History does not apply to serviceless
// since they are not attached to a service. In addition, the TaskReaper watch loop is also
// responsible for cleaning up tasks associated with slots that were removed as part of
// service scale down or service removal.
func (tr *TaskReaper) Run(ctx context.Context) {
watcher, watchCancel := state.Watch(, api.EventCreateTask{}, api.EventUpdateTask{}, api.EventUpdateCluster{})
defer func() {
var orphanedTasks []*api.Task
var removeTasks []*api.Task store.ReadTx) {
var err error
clusters, err := store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
if err == nil && len(clusters) == 1 {
tr.taskHistory = clusters[0].Spec.Orchestration.TaskHistoryRetentionLimit
// On startup, scan the entire store and inspect orphaned tasks from previous life.
orphanedTasks, err = store.FindTasks(readTx, store.ByTaskState(api.TaskStateOrphaned))
if err != nil {
log.G(ctx).WithError(err).Error("failed to find Orphaned tasks in task reaper init")
removeTasks, err = store.FindTasks(readTx, store.ByDesiredState(api.TaskStateRemove))
if err != nil {
log.G(ctx).WithError(err).Error("failed to find tasks with desired state REMOVE in task reaper init")
if len(orphanedTasks)+len(removeTasks) > 0 {
for _, t := range orphanedTasks {
// Do not reap service tasks immediately.
// Let them go through the regular history cleanup process
// of checking TaskHistoryRetentionLimit.
if t.ServiceID != "" {
// Serviceless tasks can be cleaned up right away since they are not attached to a service.
tr.cleanup = append(tr.cleanup, t.ID)
// tasks with desired state REMOVE that have progressed beyond SHUTDOWN can be cleaned up
// right away
for _, t := range removeTasks {
if t.Status.State >= api.TaskStateShutdown {
tr.cleanup = append(tr.cleanup, t.ID)
// Clean up tasks in 'cleanup' right away
if len(tr.cleanup) > 0 {
// Clean up when we hit TaskHistoryRetentionLimit or when the timer expires,
// whichever happens first.
timer := time.NewTimer(reaperBatchingInterval)
// Watch for:
// 1. EventCreateTask for cleaning slots, which is the best time to cleanup that node/slot.
// 2. EventUpdateTask for cleaning
// - serviceless orphaned tasks (when orchestrator updates the task status to ORPHANED)
// - tasks which have desired state REMOVE and have been shut down by the agent
// (these are tasks which are associated with slots removed as part of service
// remove or scale down)
// 3. EventUpdateCluster for TaskHistoryRetentionLimit update.
for {
select {
case event := <-watcher:
switch v := event.(type) {
case api.EventCreateTask:
t := v.Task
Slot: t.Slot,
ServiceID: t.ServiceID,
NodeID: t.NodeID,
}] = struct{}{}
case api.EventUpdateTask:
t := v.Task
// add serviceless orphaned tasks
if t.Status.State >= api.TaskStateOrphaned && t.ServiceID == "" {
tr.cleanup = append(tr.cleanup, t.ID)
// add tasks that have progressed beyond SHUTDOWN and have desired state REMOVE. These
// tasks are associated with slots that were removed as part of a service scale down
// or service removal.
if t.DesiredState == api.TaskStateRemove && t.Status.State >= api.TaskStateShutdown {
tr.cleanup = append(tr.cleanup, t.ID)
case api.EventUpdateCluster:
tr.taskHistory = v.Cluster.Spec.Orchestration.TaskHistoryRetentionLimit
if len(tr.dirty)+len(tr.cleanup) > maxDirty {
} else {
case <-timer.C:
case <-tr.stopChan:
// tick performs task history cleanup.
func (tr *TaskReaper) tick() {
if len(tr.dirty) == 0 && len(tr.cleanup) == 0 {
defer func() {
tr.cleanup = nil
deleteTasks := make(map[string]struct{})
for _, tID := range tr.cleanup {
deleteTasks[tID] = struct{}{}
// Check history of dirty tasks for cleanup. store.ReadTx) {
for dirty := range tr.dirty {
service := store.GetService(tx, dirty.ServiceID)
if service == nil {
taskHistory := tr.taskHistory
// If MaxAttempts is set, keep at least one more than
// that number of tasks (this overrides TaskHistoryRetentionLimit).
// This is necessary to reconstruct restart history when the orchestrator starts up.
// TODO(aaronl): Consider hiding tasks beyond the normal
// retention limit in the UI.
// TODO(aaronl): There are some ways to cut down the
// number of retained tasks at the cost of more
// complexity:
// - Don't force retention of tasks with an older spec
// version.
// - Don't force retention of tasks outside of the
// time window configured for restart lookback.
if service.Spec.Task.Restart != nil && service.Spec.Task.Restart.MaxAttempts > 0 {
taskHistory = int64(service.Spec.Task.Restart.MaxAttempts) + 1
// Negative value for TaskHistoryRetentionLimit is an indication to never clean up task history.
if taskHistory < 0 {
var historicTasks []*api.Task
switch service.Spec.GetMode().(type) {
case *api.ServiceSpec_Replicated:
// Clean out the slot for which we received EventCreateTask.
var err error
historicTasks, err = store.FindTasks(tx, store.BySlot(dirty.ServiceID, dirty.Slot))
if err != nil {
case *api.ServiceSpec_Global:
// Clean out the node history in case of global services.
tasksByNode, err := store.FindTasks(tx, store.ByNodeID(dirty.NodeID))
if err != nil {
for _, t := range tasksByNode {
if t.ServiceID == dirty.ServiceID {
historicTasks = append(historicTasks, t)
if int64(len(historicTasks)) <= taskHistory {
// TODO(aaronl): This could filter for non-running tasks and use quickselect
// instead of sorting the whole slice.
// TODO(aaronl): This sort should really use lamport time instead of wall
// clock time. We should store a Version in the Status field.
runningTasks := 0
for _, t := range historicTasks {
if t.DesiredState <= api.TaskStateRunning || t.Status.State <= api.TaskStateRunning {
// Don't delete running tasks
deleteTasks[t.ID] = struct{}{}
if int64(len(historicTasks)) <= taskHistory {
if runningTasks <= 1 {
delete(tr.dirty, dirty)
// Perform cleanup.
if len(deleteTasks) > 0 { *store.Batch) error {
for taskID := range deleteTasks {
batch.Update(func(tx store.Tx) error {
return store.DeleteTask(tx, taskID)
return nil
// Stop stops the TaskReaper and waits for the main loop to exit.
func (tr *TaskReaper) Stop() {