blob: 18b8e24abaf209b9cdaad107b3d0b470e603772b [file] [log] [blame]
package replicated
import (
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/orchestrator/restart"
"github.com/docker/swarmkit/manager/orchestrator/update"
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"golang.org/x/net/context"
)
// An Orchestrator runs a reconciliation loop to create and destroy
// tasks as necessary for the replicated services.
type Orchestrator struct {
store *store.MemoryStore
reconcileServices map[string]*api.Service
restartTasks map[string]struct{}
// stopChan signals to the state machine to stop running.
stopChan chan struct{}
// doneChan is closed when the state machine terminates.
doneChan chan struct{}
updater *update.Supervisor
restarts *restart.Supervisor
cluster *api.Cluster // local cluster instance
}
// NewReplicatedOrchestrator creates a new replicated Orchestrator.
func NewReplicatedOrchestrator(store *store.MemoryStore) *Orchestrator {
restartSupervisor := restart.NewSupervisor(store)
updater := update.NewSupervisor(store, restartSupervisor)
return &Orchestrator{
store: store,
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
reconcileServices: make(map[string]*api.Service),
restartTasks: make(map[string]struct{}),
updater: updater,
restarts: restartSupervisor,
}
}
// Run contains the orchestrator event loop. It runs until Stop is called.
func (r *Orchestrator) Run(ctx context.Context) error {
defer close(r.doneChan)
// Watch changes to services and tasks
queue := r.store.WatchQueue()
watcher, cancel := queue.Watch()
defer cancel()
// Balance existing services and drain initial tasks attached to invalid
// nodes
var err error
r.store.View(func(readTx store.ReadTx) {
if err = r.initTasks(ctx, readTx); err != nil {
return
}
if err = r.initServices(readTx); err != nil {
return
}
if err = r.initCluster(readTx); err != nil {
return
}
})
if err != nil {
return err
}
r.tick(ctx)
for {
select {
case event := <-watcher:
// TODO(stevvooe): Use ctx to limit running time of operation.
r.handleTaskEvent(ctx, event)
r.handleServiceEvent(ctx, event)
switch v := event.(type) {
case state.EventCommit:
r.tick(ctx)
case api.EventUpdateCluster:
r.cluster = v.Cluster
}
case <-r.stopChan:
return nil
}
}
}
// Stop stops the orchestrator.
func (r *Orchestrator) Stop() {
close(r.stopChan)
<-r.doneChan
r.updater.CancelAll()
r.restarts.CancelAll()
}
func (r *Orchestrator) tick(ctx context.Context) {
// tickTasks must be called first, so we respond to task-level changes
// before performing service reconciliation.
r.tickTasks(ctx)
r.tickServices(ctx)
}