package agent

import (
"context"
"sync"
"time"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/api/equality"
"github.com/docker/swarmkit/log"
)

// taskManager manages all aspects of task execution and reporting for an agent
// through state management.
type taskManager struct {
	task     *api.Task
	ctlr     exec.Controller
	reporter StatusReporter

	updateq chan *api.Task // serializes task updates into the run loop

	shutdown     chan struct{} // closed by Close to request shutdown
	shutdownOnce sync.Once
	closed       chan struct{} // closed when the run loop has exited
	closeOnce    sync.Once
}
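
// newTaskManager creates a taskManager for the given task and controller and
// starts its run loop in a background goroutine.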
func newTaskManager(ctx context.Context, task *api.Task, ctlr exec.Controller, reporter StatusReporter) *taskManager {
t := &taskManager{
task: task.Copy(),
ctlr: ctlr,
reporter: reporter,
updateq: make(chan *api.Task),
shutdown: make(chan struct{}),
closed: make(chan struct{}),
}
go t.run(ctx)
return t
}

// Update submits new task data to the manager, blocking until the update is
// accepted, the manager is closed, or the context is done.
func (tm *taskManager) Update(ctx context.Context, task *api.Task) error {
select {
case tm.updateq <- task:
return nil
case <-tm.closed:
return ErrClosed
case <-ctx.Done():
return ctx.Err()
}
}

// Close shuts down the task manager, blocking until it is closed.
func (tm *taskManager) Close() error {
tm.shutdownOnce.Do(func() {
close(tm.shutdown)
})
<-tm.closed
return nil
}
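
// Logs streams logs from the controller to the provided publisher. It is a
// no-op if the controller does not implement exec.ControllerLogs.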
func (tm *taskManager) Logs(ctx context.Context, options api.LogSubscriptionOptions, publisher exec.LogPublisher) {
ctx = log.WithModule(ctx, "taskmanager")
logCtlr, ok := tm.ctlr.(exec.ControllerLogs)
if !ok {
return // no logs available
}
if err := logCtlr.Logs(ctx, publisher, options); err != nil {
log.G(ctx).WithError(err).Errorf("logs call failed")
}
}
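
// run is the taskManager's main event loop. It serializes task updates,
// status reports, controller operations and shutdown onto a single
// goroutine, re-dispatching the controller operation as needed until the
// manager is shut down or the context is cancelled.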
func (tm *taskManager) run(ctx context.Context) {
ctx, cancelAll := context.WithCancel(ctx)
defer cancelAll() // cancel all child operations on exit.
ctx = log.WithModule(ctx, "taskmanager")
	var (
		opctx    context.Context    // context for the current controller operation
		cancel   context.CancelFunc // cancels the current operation, if set
		run      = make(chan struct{}, 1)
		statusq  = make(chan *api.TaskStatus)
		errs     = make(chan error)
		shutdown = tm.shutdown
		updated  bool // true if the task was updated.
	)
defer func() {
// closure picks up current value of cancel.
if cancel != nil {
cancel()
}
}()
run <- struct{}{} // prime the pump
for {
select {
case <-run:
// always check for shutdown before running.
select {
case <-tm.shutdown:
				shutdown = tm.shutdown // re-arm the shutdown branch in case it was disabled
				continue               // ignore run request and handle shutdown
case <-tm.closed:
continue
default:
}
opctx, cancel = context.WithCancel(ctx)
// Several variables need to be snapshotted for the closure below.
opcancel := cancel // fork for the closure
running := tm.task.Copy() // clone the task before dispatch
statusqLocal := statusq
updatedLocal := updated // capture state of update for goroutine
updated = false
go runctx(ctx, tm.closed, errs, func(ctx context.Context) error {
defer opcancel()
if updatedLocal {
// before we do anything, update the task for the controller.
// always update the controller before running.
if err := tm.ctlr.Update(opctx, running); err != nil {
log.G(ctx).WithError(err).Error("updating task controller failed")
return err
}
}
status, err := exec.Do(opctx, running, tm.ctlr)
if status != nil {
// always report the status if we get one back. This
// returns to the manager loop, then reports the status
// upstream.
select {
case statusqLocal <- status:
case <-ctx.Done(): // not opctx, since that may have been cancelled.
}
if err := tm.reporter.UpdateTaskStatus(ctx, running.ID, status); err != nil {
log.G(ctx).WithError(err).Error("task manager failed to report status to agent")
}
}
return err
})
case err := <-errs:
			// This branch is always executed when an operation completes. The
// goal is to decide whether or not we re-dispatch the operation.
cancel = nil
select {
case <-tm.shutdown:
shutdown = tm.shutdown // re-enable the shutdown branch
continue // no dispatch if we are in shutdown.
default:
}
switch err {
case exec.ErrTaskNoop:
			if !updated {
				continue // wait for an update before dispatching again.
			}
case exec.ErrTaskRetry:
// TODO(stevvooe): Add exponential backoff with random jitter
// here. For now, this backoff is enough to keep the task
// manager from running away with the CPU.
time.AfterFunc(time.Second, func() {
errs <- nil // repump this branch, with no err
})
continue
case nil, context.Canceled, context.DeadlineExceeded:
// no log in this case
default:
log.G(ctx).WithError(err).Error("task operation failed")
}
select {
case run <- struct{}{}:
default:
}
case status := <-statusq:
tm.task.Status = *status
case task := <-tm.updateq:
if equality.TasksEqualStable(task, tm.task) {
continue // ignore the update
}
if task.ID != tm.task.ID {
log.G(ctx).WithField("task.update.id", task.ID).Error("received update for incorrect task")
continue
}
if task.DesiredState < tm.task.DesiredState {
log.G(ctx).WithField("task.update.desiredstate", task.DesiredState).
Error("ignoring task update with invalid desired state")
continue
}
task = task.Copy()
task.Status = tm.task.Status // overwrite our status, as it is canonical.
tm.task = task
updated = true
// we have accepted the task update
if cancel != nil {
cancel() // cancel outstanding if necessary.
} else {
// If this channel op fails, it means there is already a
// message on the run queue.
select {
case run <- struct{}{}:
default:
}
}
case <-shutdown:
if cancel != nil {
// cancel outstanding operation.
cancel()
				// subtle: after a cancellation, we want to avoid a busy wait
				// here. this gets re-enabled in the errs branch and we'll come
				// back around and try shutdown again.
shutdown = nil // turn off this branch until op proceeds
continue // wait until operation actually exits.
}
// disable everything, and prepare for closing.
statusq = nil
errs = nil
shutdown = nil
tm.closeOnce.Do(func() {
close(tm.closed)
})
case <-tm.closed:
return
case <-ctx.Done():
tm.closeOnce.Do(func() {
close(tm.closed)
})
return
}
}
}