| package taskinit |
| |
| import ( |
| "sort" |
| "time" |
| |
| "github.com/docker/swarmkit/api" |
| "github.com/docker/swarmkit/api/defaults" |
| "github.com/docker/swarmkit/log" |
| "github.com/docker/swarmkit/manager/orchestrator" |
| "github.com/docker/swarmkit/manager/orchestrator/restart" |
| "github.com/docker/swarmkit/manager/state/store" |
| gogotypes "github.com/gogo/protobuf/types" |
| "golang.org/x/net/context" |
| ) |
| |
| // InitHandler defines orchestrator's action to fix tasks at start. |
| type InitHandler interface { |
| IsRelatedService(service *api.Service) bool |
| FixTask(ctx context.Context, batch *store.Batch, t *api.Task) |
| SlotTuple(t *api.Task) orchestrator.SlotTuple |
| } |
| |
| // CheckTasks fixes tasks in the store before orchestrator runs. The previous leader might |
| // not have finished processing their updates and left them in an inconsistent state. |
| func CheckTasks(ctx context.Context, s *store.MemoryStore, readTx store.ReadTx, initHandler InitHandler, startSupervisor *restart.Supervisor) error { |
| instances := make(map[orchestrator.SlotTuple][]*api.Task) |
| err := s.Batch(func(batch *store.Batch) error { |
| tasks, err := store.FindTasks(readTx, store.All) |
| if err != nil { |
| return err |
| } |
| for _, t := range tasks { |
| if t.ServiceID == "" { |
| continue |
| } |
| |
| // TODO(aluzzardi): We should NOT retrieve the service here. |
| service := store.GetService(readTx, t.ServiceID) |
| if service == nil { |
| // Service was deleted |
| err := batch.Update(func(tx store.Tx) error { |
| return store.DeleteTask(tx, t.ID) |
| }) |
| if err != nil { |
| log.G(ctx).WithError(err).Error("failed to delete task") |
| } |
| continue |
| } |
| if !initHandler.IsRelatedService(service) { |
| continue |
| } |
| |
| tuple := initHandler.SlotTuple(t) |
| instances[tuple] = append(instances[tuple], t) |
| |
| // handle task updates from agent which should have been triggered by task update events |
| initHandler.FixTask(ctx, batch, t) |
| |
| // desired state ready is a transient state that it should be started. |
| // however previous leader may not have started it, retry start here |
| if t.DesiredState != api.TaskStateReady || t.Status.State > api.TaskStateRunning { |
| continue |
| } |
| restartDelay, _ := gogotypes.DurationFromProto(defaults.Service.Task.Restart.Delay) |
| if t.Spec.Restart != nil && t.Spec.Restart.Delay != nil { |
| var err error |
| restartDelay, err = gogotypes.DurationFromProto(t.Spec.Restart.Delay) |
| if err != nil { |
| log.G(ctx).WithError(err).Error("invalid restart delay") |
| restartDelay, _ = gogotypes.DurationFromProto(defaults.Service.Task.Restart.Delay) |
| } |
| } |
| if restartDelay != 0 { |
| var timestamp time.Time |
| if t.Status.AppliedAt != nil { |
| timestamp, err = gogotypes.TimestampFromProto(t.Status.AppliedAt) |
| } else { |
| timestamp, err = gogotypes.TimestampFromProto(t.Status.Timestamp) |
| } |
| if err == nil { |
| restartTime := timestamp.Add(restartDelay) |
| calculatedRestartDelay := restartTime.Sub(time.Now()) |
| if calculatedRestartDelay < restartDelay { |
| restartDelay = calculatedRestartDelay |
| } |
| if restartDelay > 0 { |
| _ = batch.Update(func(tx store.Tx) error { |
| t := store.GetTask(tx, t.ID) |
| // TODO(aluzzardi): This is shady as well. We should have a more generic condition. |
| if t == nil || t.DesiredState != api.TaskStateReady { |
| return nil |
| } |
| startSupervisor.DelayStart(ctx, tx, nil, t.ID, restartDelay, true) |
| return nil |
| }) |
| continue |
| } |
| } else { |
| log.G(ctx).WithError(err).Error("invalid status timestamp") |
| } |
| } |
| |
| // Start now |
| err := batch.Update(func(tx store.Tx) error { |
| return startSupervisor.StartNow(tx, t.ID) |
| }) |
| if err != nil { |
| log.G(ctx).WithError(err).WithField("task.id", t.ID).Error("moving task out of delayed state failed") |
| } |
| } |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| |
| for tuple, instance := range instances { |
| // Find the most current spec version. That's the only one |
| // we care about for the purpose of reconstructing restart |
| // history. |
| maxVersion := uint64(0) |
| for _, t := range instance { |
| if t.SpecVersion != nil && t.SpecVersion.Index > maxVersion { |
| maxVersion = t.SpecVersion.Index |
| } |
| } |
| |
| // Create a new slice with just the current spec version tasks. |
| var upToDate []*api.Task |
| for _, t := range instance { |
| if t.SpecVersion != nil && t.SpecVersion.Index == maxVersion { |
| upToDate = append(upToDate, t) |
| } |
| } |
| |
| // Sort by creation timestamp |
| sort.Sort(tasksByCreationTimestamp(upToDate)) |
| |
| // All up-to-date tasks in this instance except the first one |
| // should be considered restarted. |
| if len(upToDate) < 2 { |
| continue |
| } |
| for _, t := range upToDate[1:] { |
| startSupervisor.RecordRestartHistory(tuple, t) |
| } |
| } |
| return nil |
| } |
| |
| type tasksByCreationTimestamp []*api.Task |
| |
| func (t tasksByCreationTimestamp) Len() int { |
| return len(t) |
| } |
| func (t tasksByCreationTimestamp) Swap(i, j int) { |
| t[i], t[j] = t[j], t[i] |
| } |
| func (t tasksByCreationTimestamp) Less(i, j int) bool { |
| if t[i].Meta.CreatedAt == nil { |
| return true |
| } |
| if t[j].Meta.CreatedAt == nil { |
| return false |
| } |
| if t[i].Meta.CreatedAt.Seconds < t[j].Meta.CreatedAt.Seconds { |
| return true |
| } |
| if t[i].Meta.CreatedAt.Seconds > t[j].Meta.CreatedAt.Seconds { |
| return false |
| } |
| return t[i].Meta.CreatedAt.Nanos < t[j].Meta.CreatedAt.Nanos |
| } |