blob: 5ca50623bb30d00e6238e0bfff686b1c6d3abe37 [file] [log] [blame]
package allocator
import (
"sync"
"github.com/docker/docker/pkg/plugingetter"
"github.com/docker/go-events"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"golang.org/x/net/context"
)
// Allocator controls how the allocation stage in the manager is handled.
type Allocator struct {
// The manager store.
store *store.MemoryStore
// the ballot used to synchronize across all allocators to ensure
// all of them have completed their respective allocations so that the
// task can be moved to ALLOCATED state.
taskBallot *taskBallot
// context for the network allocator that will be needed by
// network allocator.
netCtx *networkContext
// stopChan signals to the allocator to stop running.
stopChan chan struct{}
// doneChan is closed when the allocator is finished running.
doneChan chan struct{}
// pluginGetter provides access to docker's plugin inventory.
pluginGetter plugingetter.PluginGetter
}
// taskBallot controls how the voting for task allocation is
// coordinated b/w different allocators. This the only structure that
// will be written by all allocator goroutines concurrently. Hence the
// mutex.
type taskBallot struct {
sync.Mutex
// List of registered voters who have to cast their vote to
// indicate their allocation complete
voters []string
// List of votes collected for every task so far from different voters.
votes map[string][]string
}
// allocActor controls the various phases in the lifecycle of one kind of allocator.
type allocActor struct {
// Task voter identity of the allocator.
taskVoter string
// Action routine which is called for every event that the
// allocator received.
action func(context.Context, events.Event)
// Init routine which is called during the initialization of
// the allocator.
init func(ctx context.Context) error
}
// New returns a new instance of Allocator for use during allocation
// stage of the manager.
func New(store *store.MemoryStore, pg plugingetter.PluginGetter) (*Allocator, error) {
a := &Allocator{
store: store,
taskBallot: &taskBallot{
votes: make(map[string][]string),
},
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
pluginGetter: pg,
}
return a, nil
}
// Run starts all allocator go-routines and waits for Stop to be called.
func (a *Allocator) Run(ctx context.Context) error {
// Setup cancel context for all goroutines to use.
ctx, cancel := context.WithCancel(ctx)
var (
wg sync.WaitGroup
actors []func() error
)
defer func() {
cancel()
wg.Wait()
close(a.doneChan)
}()
for _, aa := range []allocActor{
{
taskVoter: networkVoter,
init: a.doNetworkInit,
action: a.doNetworkAlloc,
},
} {
if aa.taskVoter != "" {
a.registerToVote(aa.taskVoter)
}
// Assign a pointer for variable capture
aaPtr := &aa
actor := func() error {
wg.Add(1)
defer wg.Done()
// init might return an allocator specific context
// which is a child of the passed in context to hold
// allocator specific state
watch, watchCancel, err := a.init(ctx, aaPtr)
if err != nil {
return err
}
wg.Add(1)
go func(watch <-chan events.Event, watchCancel func()) {
defer func() {
wg.Done()
watchCancel()
}()
a.run(ctx, *aaPtr, watch)
}(watch, watchCancel)
return nil
}
actors = append(actors, actor)
}
for _, actor := range actors {
if err := actor(); err != nil {
return err
}
}
<-a.stopChan
return nil
}
// Stop stops the allocator
func (a *Allocator) Stop() {
close(a.stopChan)
// Wait for all allocator goroutines to truly exit
<-a.doneChan
}
func (a *Allocator) init(ctx context.Context, aa *allocActor) (<-chan events.Event, func(), error) {
watch, watchCancel := state.Watch(a.store.WatchQueue(),
api.EventCreateNetwork{},
api.EventDeleteNetwork{},
api.EventCreateService{},
api.EventUpdateService{},
api.EventDeleteService{},
api.EventCreateTask{},
api.EventUpdateTask{},
api.EventDeleteTask{},
api.EventCreateNode{},
api.EventUpdateNode{},
api.EventDeleteNode{},
state.EventCommit{},
)
if err := aa.init(ctx); err != nil {
watchCancel()
return nil, nil, err
}
return watch, watchCancel, nil
}
func (a *Allocator) run(ctx context.Context, aa allocActor, watch <-chan events.Event) {
for {
select {
case ev, ok := <-watch:
if !ok {
return
}
aa.action(ctx, ev)
case <-ctx.Done():
return
}
}
}
func (a *Allocator) registerToVote(name string) {
a.taskBallot.Lock()
defer a.taskBallot.Unlock()
a.taskBallot.voters = append(a.taskBallot.voters, name)
}
func (a *Allocator) taskAllocateVote(voter string, id string) bool {
a.taskBallot.Lock()
defer a.taskBallot.Unlock()
// If voter has already voted, return false
for _, v := range a.taskBallot.votes[id] {
// check if voter is in x
if v == voter {
return false
}
}
a.taskBallot.votes[id] = append(a.taskBallot.votes[id], voter)
// We haven't gotten enough votes yet
if len(a.taskBallot.voters) > len(a.taskBallot.votes[id]) {
return false
}
nextVoter:
for _, voter := range a.taskBallot.voters {
for _, vote := range a.taskBallot.votes[id] {
if voter == vote {
continue nextVoter
}
}
// Not every registered voter has registered a vote.
return false
}
return true
}