| package allocator |
| |
| import ( |
| "fmt" |
| "time" |
| |
| "github.com/docker/go-events" |
| "github.com/docker/swarmkit/api" |
| "github.com/docker/swarmkit/log" |
| "github.com/docker/swarmkit/manager/allocator/cnmallocator" |
| "github.com/docker/swarmkit/manager/allocator/networkallocator" |
| "github.com/docker/swarmkit/manager/state" |
| "github.com/docker/swarmkit/manager/state/store" |
| "github.com/docker/swarmkit/protobuf/ptypes" |
| "github.com/pkg/errors" |
| "golang.org/x/net/context" |
| ) |
| |
| const ( |
| // networkVoter is the voter ID used by the network allocator when casting task allocation votes. |
| networkVoter = "network" |
| allocatedStatusMessage = "pending task scheduling" |
| ) |
| |
| var ( |
| // ErrNoIngress is returned when no ingress network is found in the store. |
| ErrNoIngress = errors.New("no ingress network found") |
| errNoChanges = errors.New("task unchanged") |
| |
| retryInterval = 5 * time.Minute |
| ) |
| |
| // networkContext holds the network allocation state that is used throughout |
| // the network allocation code. |
| type networkContext struct { |
| ingressNetwork *api.Network |
| // Instance of the low-level network allocator which performs |
| // the actual network allocation. |
| nwkAllocator networkallocator.NetworkAllocator |
| |
| // A set of tasks which are ready to be allocated as a batch. This is |
| // distinct from "unallocatedTasks", which holds tasks that failed to |
| // allocate on the first try and are held for a future retry. |
| pendingTasks map[string]*api.Task |
| |
| // A set of unallocated tasks which will be revisited if anything |
| // changes in system state that might help task allocation. |
| unallocatedTasks map[string]*api.Task |
| |
| // A set of unallocated services which will be revisited if |
| // anything changes in system state that might help service |
| // allocation. |
| unallocatedServices map[string]*api.Service |
| |
| // A set of unallocated networks which will be revisited if |
| // anything changes in system state that might help network |
| // allocation. |
| unallocatedNetworks map[string]*api.Network |
| |
| // lastRetry is the last timestamp when unallocated |
| // tasks/services/networks were retried. |
| lastRetry time.Time |
| |
| // somethingWasDeallocated indicates that we just deallocated at |
| // least one service/task/network, so we should retry failed |
| // allocations (in case we are experiencing IP exhaustion and an |
| // IP was released). |
| somethingWasDeallocated bool |
| } |
| |
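| // doNetworkInit bootstraps the allocator's network state: it creates the |
| // network context, allocates the ingress network if one exists, allocates |
| // every network already in the store, and then allocates nodes, services, |
| // and tasks, first reserving addresses that are already committed to the |
| // store and then performing fresh allocations. |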
| func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { |
| na, err := cnmallocator.New(a.pluginGetter) |
| if err != nil { |
| return err |
| } |
| |
| nc := &networkContext{ |
| nwkAllocator: na, |
| pendingTasks: make(map[string]*api.Task), |
| unallocatedTasks: make(map[string]*api.Task), |
| unallocatedServices: make(map[string]*api.Service), |
| unallocatedNetworks: make(map[string]*api.Network), |
| lastRetry: time.Now(), |
| } |
| a.netCtx = nc |
| defer func() { |
| // Clear a.netCtx if initialization was unsuccessful. |
| if err != nil { |
| a.netCtx = nil |
| } |
| }() |
| |
| // The ingress network is created when the cluster is first created. |
| // Check if we have the ingress network. If found, make sure it is |
| // allocated before reading all network objects for allocation. |
| // If not found, it was removed by the user and there is nothing to do here. |
| ingressNetwork, err := GetIngressNetwork(a.store) |
| switch err { |
| case nil: |
| // Try to complete ingress network allocation before anything else so |
| // that we can get the preferred subnet for the ingress network. |
| nc.ingressNetwork = ingressNetwork |
| if !na.IsAllocated(nc.ingressNetwork) { |
| if err := a.allocateNetwork(ctx, nc.ingressNetwork); err != nil { |
| log.G(ctx).WithError(err).Error("failed allocating ingress network during init") |
| } else if err := a.store.Batch(func(batch *store.Batch) error { |
| if err := a.commitAllocatedNetwork(ctx, batch, nc.ingressNetwork); err != nil { |
| log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init") |
| } |
| return nil |
| }); err != nil { |
| log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init") |
| } |
| } |
| case ErrNoIngress: |
| // The ingress network is not present in the store; the user removed |
| // it and did not create a new one. Nothing more to do here. |
| default: |
| return errors.Wrap(err, "failure while looking for ingress network during init") |
| } |
| |
| // Allocate the networks that are already in the store before we |
| // start watching for new ones. |
| var networks []*api.Network |
| a.store.View(func(tx store.ReadTx) { |
| networks, err = store.FindNetworks(tx, store.All) |
| }) |
| if err != nil { |
| return errors.Wrap(err, "error listing all networks in store while trying to allocate during init") |
| } |
| |
| var allocatedNetworks []*api.Network |
| for _, n := range networks { |
| if na.IsAllocated(n) { |
| continue |
| } |
| |
| if err := a.allocateNetwork(ctx, n); err != nil { |
| log.G(ctx).WithError(err).Errorf("failed allocating network %s during init", n.ID) |
| continue |
| } |
| allocatedNetworks = append(allocatedNetworks, n) |
| } |
| |
| if err := a.store.Batch(func(batch *store.Batch) error { |
| for _, n := range allocatedNetworks { |
| if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil { |
| log.G(ctx).WithError(err).Errorf("failed committing allocation of network %s during init", n.ID) |
| } |
| } |
| return nil |
| }); err != nil { |
| log.G(ctx).WithError(err).Error("failed committing allocation of networks during init") |
| } |
| |
| // First, allocate objects that already have addresses associated with |
| // them, to reserve these IP addresses in internal state. |
| if err := a.allocateNodes(ctx, true); err != nil { |
| return err |
| } |
| |
| if err := a.allocateServices(ctx, true); err != nil { |
| return err |
| } |
| if err := a.allocateTasks(ctx, true); err != nil { |
| return err |
| } |
| |
| if err := a.allocateNodes(ctx, false); err != nil { |
| return err |
| } |
| |
| if err := a.allocateServices(ctx, false); err != nil { |
| return err |
| } |
| return a.allocateTasks(ctx, false) |
| } |
| |
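| // doNetworkAlloc dispatches store events to the appropriate allocation |
| // handler. On commit events it allocates pending tasks as a batch and |
| // periodically retries networks, services, and tasks that previously |
| // failed to allocate. |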
| func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { |
| nc := a.netCtx |
| |
| switch v := ev.(type) { |
| case api.EventCreateNetwork: |
| n := v.Network.Copy() |
| if nc.nwkAllocator.IsAllocated(n) { |
| break |
| } |
| |
| if IsIngressNetwork(n) && nc.ingressNetwork != nil { |
| log.G(ctx).Errorf("Cannot allocate ingress network %s (%s) because another ingress network is already present: %s (%s)", |
| n.ID, n.Spec.Annotations.Name, nc.ingressNetwork.ID, nc.ingressNetwork.Spec.Annotations) |
| break |
| } |
| |
| if err := a.allocateNetwork(ctx, n); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed allocation for network %s", n.ID) |
| break |
| } |
| |
| if err := a.store.Batch(func(batch *store.Batch) error { |
| return a.commitAllocatedNetwork(ctx, batch, n) |
| }); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed to commit allocation for network %s", n.ID) |
| } |
| if IsIngressNetwork(n) { |
| nc.ingressNetwork = n |
| } |
| if err := a.allocateNodes(ctx, false); err != nil { |
| log.G(ctx).WithError(err).Error("failed to allocate nodes after network creation") |
| } |
| case api.EventDeleteNetwork: |
| n := v.Network.Copy() |
| |
| if IsIngressNetwork(n) && nc.ingressNetwork != nil && nc.ingressNetwork.ID == n.ID { |
| nc.ingressNetwork = nil |
| } |
| |
| if err := a.deallocateNodeAttachments(ctx, n.ID); err != nil { |
| log.G(ctx).WithError(err).Errorf("failed to deallocate node attachments for network %s", n.ID) |
| } |
| |
| // The assumption here is that all dependent objects have already |
| // been cleaned up by the time we get here, so the only thing left |
| // to do is to free the network resources. |
| if err := nc.nwkAllocator.Deallocate(n); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed during network free for network %s", n.ID) |
| } else { |
| nc.somethingWasDeallocated = true |
| } |
| |
| delete(nc.unallocatedNetworks, n.ID) |
| case api.EventCreateService: |
| var s *api.Service |
| a.store.View(func(tx store.ReadTx) { |
| s = store.GetService(tx, v.Service.ID) |
| }) |
| |
| if s == nil { |
| break |
| } |
| |
| if nc.nwkAllocator.IsServiceAllocated(s) { |
| break |
| } |
| |
| if err := a.allocateService(ctx, s); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed allocation for service %s", s.ID) |
| break |
| } |
| |
| if err := a.store.Batch(func(batch *store.Batch) error { |
| return a.commitAllocatedService(ctx, batch, s) |
| }); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed to commit allocation for service %s", s.ID) |
| } |
| case api.EventUpdateService: |
| // We may have already allocated this service. If a create or |
| // update event is older than the current version in the store, |
| // we run the risk of allocating the service a second time. |
| // Only operate on the latest version of the service. |
| var s *api.Service |
| a.store.View(func(tx store.ReadTx) { |
| s = store.GetService(tx, v.Service.ID) |
| }) |
| |
| if s == nil { |
| break |
| } |
| |
| if nc.nwkAllocator.IsServiceAllocated(s) { |
| if !nc.nwkAllocator.HostPublishPortsNeedUpdate(s) { |
| break |
| } |
| updatePortsInHostPublishMode(s) |
| } else { |
| if err := a.allocateService(ctx, s); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID) |
| break |
| } |
| } |
| |
| if err := a.store.Batch(func(batch *store.Batch) error { |
| return a.commitAllocatedService(ctx, batch, s) |
| }); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID) |
| nc.unallocatedServices[s.ID] = s |
| } else { |
| delete(nc.unallocatedServices, s.ID) |
| } |
| case api.EventDeleteService: |
| s := v.Service.Copy() |
| |
| if err := nc.nwkAllocator.DeallocateService(s); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed deallocation during delete of service %s", s.ID) |
| } else { |
| nc.somethingWasDeallocated = true |
| } |
| |
| // Remove it from unallocatedServices just in case |
| // it's still there. |
| delete(nc.unallocatedServices, s.ID) |
| case api.EventCreateNode, api.EventUpdateNode, api.EventDeleteNode: |
| a.doNodeAlloc(ctx, ev) |
| case api.EventCreateTask, api.EventUpdateTask, api.EventDeleteTask: |
| a.doTaskAlloc(ctx, ev) |
| case state.EventCommit: |
| a.procTasksNetwork(ctx, false) |
| |
| if time.Since(nc.lastRetry) > retryInterval || nc.somethingWasDeallocated { |
| a.procUnallocatedNetworks(ctx) |
| a.procUnallocatedServices(ctx) |
| a.procTasksNetwork(ctx, true) |
| nc.lastRetry = time.Now() |
| nc.somethingWasDeallocated = false |
| } |
| |
| // Any left over tasks are moved to the unallocated set |
| for _, t := range nc.pendingTasks { |
| nc.unallocatedTasks[t.ID] = t |
| } |
| nc.pendingTasks = make(map[string]*api.Task) |
| } |
| } |
| |
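| // doNodeAlloc allocates network attachments for a node on create and |
| // update events, and frees the node's network resources on delete events. |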
| func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) { |
| var ( |
| isDelete bool |
| node *api.Node |
| ) |
| |
| // We may have already allocated this node. If a create or update |
| // event is older than the current version in the store, we run the |
| // risk of allocating the node a second time. Only operate on the |
| // latest version of the node. |
| switch v := ev.(type) { |
| case api.EventCreateNode: |
| a.store.View(func(tx store.ReadTx) { |
| node = store.GetNode(tx, v.Node.ID) |
| }) |
| case api.EventUpdateNode: |
| a.store.View(func(tx store.ReadTx) { |
| node = store.GetNode(tx, v.Node.ID) |
| }) |
| case api.EventDeleteNode: |
| isDelete = true |
| node = v.Node.Copy() |
| } |
| |
| if node == nil { |
| return |
| } |
| |
| nc := a.netCtx |
| |
| if isDelete { |
| if err := a.deallocateNode(node); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID) |
| } else { |
| nc.somethingWasDeallocated = true |
| } |
| } else { |
| allocatedNetworks, err := a.getAllocatedNetworks() |
| if err != nil { |
| log.G(ctx).WithError(err).Errorf("Error listing allocated networks in network %s", node.ID) |
| } |
| |
| isAllocated := a.allocateNode(ctx, node, false, allocatedNetworks) |
| |
| if isAllocated { |
| if err := a.store.Batch(func(batch *store.Batch) error { |
| return a.commitAllocatedNode(ctx, batch, node) |
| }); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID) |
| } |
| } |
| } |
| } |
| |
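| // isOverlayNetwork reports whether the network uses the overlay driver, |
| // according to either its allocated driver state or its spec. |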
| func isOverlayNetwork(n *api.Network) bool { |
| if n.DriverState != nil && n.DriverState.Name == "overlay" { |
| return true |
| } |
| |
| if n.Spec.DriverConfig != nil && n.Spec.DriverConfig.Name == "overlay" { |
| return true |
| } |
| |
| return false |
| } |
| |
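| // getAllocatedNetworks returns the overlay networks in the store that |
| // have already been allocated. |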
| func (a *Allocator) getAllocatedNetworks() ([]*api.Network, error) { |
| var ( |
| err error |
| nc = a.netCtx |
| na = nc.nwkAllocator |
| allocatedNetworks []*api.Network |
| ) |
| |
| // Find allocated networks |
| var networks []*api.Network |
| a.store.View(func(tx store.ReadTx) { |
| networks, err = store.FindNetworks(tx, store.All) |
| }) |
| |
| if err != nil { |
| return nil, errors.Wrap(err, "error listing all networks in store while looking for allocated networks") |
| } |
| |
| for _, n := range networks { |
| if isOverlayNetwork(n) && na.IsAllocated(n) { |
| allocatedNetworks = append(allocatedNetworks, n) |
| } |
| } |
| |
| return allocatedNetworks, nil |
| } |
| |
| // allocateNodes allocates network attachments for all nodes in the store |
| // and commits them, before we process watched events. If |
| // existingAddressesOnly is true, only addresses that the attachments |
| // already carry are reserved. |
| func (a *Allocator) allocateNodes(ctx context.Context, existingAddressesOnly bool) error { |
| var ( |
| allocatedNodes []*api.Node |
| nodes []*api.Node |
| err error |
| ) |
| |
| a.store.View(func(tx store.ReadTx) { |
| nodes, err = store.FindNodes(tx, store.All) |
| }) |
| if err != nil { |
| return errors.Wrap(err, "error listing all nodes in store while trying to allocate network resources") |
| } |
| |
| allocatedNetworks, err := a.getAllocatedNetworks() |
| if err != nil { |
| return errors.Wrap(err, "error listing allocated networks while trying to allocate network resources for nodes") |
| } |
| |
| for _, node := range nodes { |
| isAllocated := a.allocateNode(ctx, node, existingAddressesOnly, allocatedNetworks) |
| if isAllocated { |
| allocatedNodes = append(allocatedNodes, node) |
| } |
| } |
| |
| if err := a.store.Batch(func(batch *store.Batch) error { |
| for _, node := range allocatedNodes { |
| if err := a.commitAllocatedNode(ctx, batch, node); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID) |
| } |
| } |
| return nil |
| }); err != nil { |
| log.G(ctx).WithError(err).Error("Failed to commit allocation of network resources for nodes") |
| } |
| |
| return nil |
| } |
| |
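| // deallocateNodes frees the network attachments of every node in the |
| // store and commits the updated nodes. |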
| func (a *Allocator) deallocateNodes(ctx context.Context) error { |
| var ( |
| nodes []*api.Node |
| nc = a.netCtx |
| err error |
| ) |
| |
| a.store.View(func(tx store.ReadTx) { |
| nodes, err = store.FindNodes(tx, store.All) |
| }) |
| if err != nil { |
| return errors.New("error listing all nodes in store while trying to free network resources") |
| } |
| |
| for _, node := range nodes { |
| if err := a.deallocateNode(node); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID) |
| } else { |
| nc.somethingWasDeallocated = true |
| } |
| if err := a.store.Batch(func(batch *store.Batch) error { |
| return a.commitAllocatedNode(ctx, batch, node) |
| }); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed to commit deallocation of network resources for node %s", node.ID) |
| } |
| } |
| |
| return nil |
| } |
| |
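| // deallocateNodeAttachments frees, on every node, the load-balancer |
| // attachment to the network with the given ID and commits the updated node. |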
| func (a *Allocator) deallocateNodeAttachments(ctx context.Context, nid string) error { |
| var ( |
| nodes []*api.Node |
| nc = a.netCtx |
| err error |
| ) |
| |
| a.store.View(func(tx store.ReadTx) { |
| nodes, err = store.FindNodes(tx, store.All) |
| }) |
| if err != nil { |
| return errors.New("error listing all nodes in store while trying to free network resources") |
| } |
| |
| for _, node := range nodes { |
| var networkAttachment *api.NetworkAttachment |
| var naIndex int |
| for index, na := range node.Attachments { |
| if na.Network.ID == nid { |
| networkAttachment = na |
| naIndex = index |
| break |
| } |
| } |
| |
| if networkAttachment == nil { |
| log.G(ctx).Errorf("Failed to find network %s on node %s", nid, node.ID) |
| continue |
| } |
| |
| if nc.nwkAllocator.IsAttachmentAllocated(node, networkAttachment) { |
| if err := nc.nwkAllocator.DeallocateAttachment(node, networkAttachment); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed to deallocate attachment for network %s on node %s", nid, node.ID) |
| } else { |
| // Remove the load-balancer attachment from the node's attachment list. |
| node.Attachments[naIndex] = node.Attachments[len(node.Attachments)-1] |
| node.Attachments[len(node.Attachments)-1] = nil |
| node.Attachments = node.Attachments[:len(node.Attachments)-1] |
| |
| if err := a.store.Batch(func(batch *store.Batch) error { |
| return a.commitAllocatedNode(ctx, batch, node) |
| }); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed to commit deallocation of network resources for node %s", node.ID) |
| } |
| } |
| } |
| } |
| return nil |
| } |
| |
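| // deallocateNode frees every allocated network attachment on the node and |
| // clears the node's attachment list. |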
| func (a *Allocator) deallocateNode(node *api.Node) error { |
| nc := a.netCtx |
| |
| for _, na := range node.Attachments { |
| if nc.nwkAllocator.IsAttachmentAllocated(node, na) { |
| if err := nc.nwkAllocator.DeallocateAttachment(node, na); err != nil { |
| return err |
| } |
| } |
| } |
| |
| node.Attachments = nil |
| |
| return nil |
| } |
| |
| // allocateServices allocates all services that are already in the store |
| // before we start processing watched events, committing the successful ones. |
| func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly bool) error { |
| var ( |
| nc = a.netCtx |
| services []*api.Service |
| err error |
| ) |
| a.store.View(func(tx store.ReadTx) { |
| services, err = store.FindServices(tx, store.All) |
| }) |
| if err != nil { |
| return errors.Wrap(err, "error listing all services in store while trying to allocate during init") |
| } |
| |
| var allocatedServices []*api.Service |
| for _, s := range services { |
| if nc.nwkAllocator.IsServiceAllocated(s, networkallocator.OnInit) { |
| continue |
| } |
| |
| if existingAddressesOnly && |
| (s.Endpoint == nil || |
| len(s.Endpoint.VirtualIPs) == 0) { |
| continue |
| } |
| |
| if err := a.allocateService(ctx, s); err != nil { |
| log.G(ctx).WithError(err).Errorf("failed allocating service %s during init", s.ID) |
| continue |
| } |
| allocatedServices = append(allocatedServices, s) |
| } |
| |
| if err := a.store.Batch(func(batch *store.Batch) error { |
| for _, s := range allocatedServices { |
| if err := a.commitAllocatedService(ctx, batch, s); err != nil { |
| log.G(ctx).WithError(err).Errorf("failed committing allocation of service %s during init", s.ID) |
| } |
| } |
| return nil |
| }); err != nil { |
| log.G(ctx).WithError(err).Error("failed committing allocation of services during init") |
| } |
| |
| return nil |
| } |
| |
| // allocateTasks allocates all tasks that are already in the store before |
| // we start watching events, committing the successful ones. |
| func (a *Allocator) allocateTasks(ctx context.Context, existingAddressesOnly bool) error { |
| var ( |
| nc = a.netCtx |
| tasks []*api.Task |
| allocatedTasks []*api.Task |
| err error |
| ) |
| a.store.View(func(tx store.ReadTx) { |
| tasks, err = store.FindTasks(tx, store.All) |
| }) |
| if err != nil { |
| return errors.Wrap(err, "error listing all tasks in store while trying to allocate during init") |
| } |
| |
| for _, t := range tasks { |
| if t.Status.State > api.TaskStateRunning { |
| continue |
| } |
| |
| if existingAddressesOnly { |
| hasAddresses := false |
| for _, nAttach := range t.Networks { |
| if len(nAttach.Addresses) != 0 { |
| hasAddresses = true |
| break |
| } |
| } |
| if !hasAddresses { |
| continue |
| } |
| } |
| |
| var s *api.Service |
| if t.ServiceID != "" { |
| a.store.View(func(tx store.ReadTx) { |
| s = store.GetService(tx, t.ServiceID) |
| }) |
| } |
| |
| // Populate network attachments in the task |
| // based on service spec. |
| a.taskCreateNetworkAttachments(t, s) |
| |
| if taskReadyForNetworkVote(t, s, nc) { |
| if t.Status.State >= api.TaskStatePending { |
| continue |
| } |
| |
| if a.taskAllocateVote(networkVoter, t.ID) { |
| // If the task is not attached to any network, the network |
| // allocator's job is done. Immediately cast a vote so that |
| // the task can be moved to the PENDING state as soon as |
| // possible. |
| updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage) |
| allocatedTasks = append(allocatedTasks, t) |
| } |
| continue |
| } |
| |
| err := a.allocateTask(ctx, t) |
| if err == nil { |
| allocatedTasks = append(allocatedTasks, t) |
| } else if err != errNoChanges { |
| log.G(ctx).WithError(err).Errorf("failed allocating task %s during init", t.ID) |
| nc.unallocatedTasks[t.ID] = t |
| } |
| } |
| |
| if err := a.store.Batch(func(batch *store.Batch) error { |
| for _, t := range allocatedTasks { |
| if err := a.commitAllocatedTask(ctx, batch, t); err != nil { |
| log.G(ctx).WithError(err).Errorf("failed committing allocation of task %s during init", t.ID) |
| } |
| } |
| |
| return nil |
| }); err != nil { |
| log.G(ctx).WithError(err).Error("failed committing allocation of tasks during init") |
| } |
| |
| return nil |
| } |
| |
| // taskReadyForNetworkVote checks if the task is ready for a network |
| // vote to move it to PENDING state. |
| func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bool { |
| // The task is ready for a vote when both of the following hold: |
| // |
| //   - the task has no networks attached, or all of its attached |
| //     networks have been allocated, and |
| //   - the task belongs to no service, or its service's endpoints |
| //     have been allocated. |
| return (len(t.Networks) == 0 || nc.nwkAllocator.IsTaskAllocated(t)) && |
| (s == nil || nc.nwkAllocator.IsServiceAllocated(s)) |
| } |
| |
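| // taskUpdateNetworks replaces the task's network attachments with a deep |
| // copy of the given attachments. |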
| func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) { |
| networksCopy := make([]*api.NetworkAttachment, 0, len(networks)) |
| for _, n := range networks { |
| networksCopy = append(networksCopy, n.Copy()) |
| } |
| |
| t.Networks = networksCopy |
| } |
| |
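| // taskUpdateEndpoint replaces the task's endpoint with a copy of the |
| // given endpoint. |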
| func taskUpdateEndpoint(t *api.Task, endpoint *api.Endpoint) { |
| t.Endpoint = endpoint.Copy() |
| } |
| |
| // IsIngressNetworkNeeded checks whether the service requires the routing-mesh |
| func IsIngressNetworkNeeded(s *api.Service) bool { |
| return networkallocator.IsIngressNetworkNeeded(s) |
| } |
| |
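| // taskCreateNetworkAttachments populates the task's network attachments |
| // from the task spec (or, if the task spec lists none, the service spec), |
| // prepending the ingress network when the service needs the routing mesh. |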
| func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) { |
| // If the task's network attachments have already been filled in, |
| // there is nothing to do. |
| if len(t.Networks) != 0 { |
| return |
| } |
| |
| var networks []*api.NetworkAttachment |
| if IsIngressNetworkNeeded(s) && a.netCtx.ingressNetwork != nil { |
| networks = append(networks, &api.NetworkAttachment{Network: a.netCtx.ingressNetwork}) |
| } |
| |
| a.store.View(func(tx store.ReadTx) { |
| // Always prefer NetworkAttachmentConfig in the TaskSpec |
| specNetworks := t.Spec.Networks |
| if len(specNetworks) == 0 && s != nil && len(s.Spec.Networks) != 0 { |
| specNetworks = s.Spec.Networks |
| } |
| |
| for _, na := range specNetworks { |
| n := store.GetNetwork(tx, na.Target) |
| if n == nil { |
| continue |
| } |
| |
| attachment := api.NetworkAttachment{Network: n} |
| attachment.Aliases = append(attachment.Aliases, na.Aliases...) |
| attachment.Addresses = append(attachment.Addresses, na.Addresses...) |
| attachment.DriverAttachmentOpts = na.DriverAttachmentOpts |
| networks = append(networks, &attachment) |
| } |
| }) |
| |
| taskUpdateNetworks(t, networks) |
| } |
| |
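| // doTaskAlloc handles task events: it frees network resources for deleted |
| // or terminated tasks, and queues tasks that still need allocation so |
| // they are processed as a batch on the next commit event. |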
| func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) { |
| var ( |
| isDelete bool |
| t *api.Task |
| ) |
| |
| // We may have already allocated this task. If a create or update |
| // event is older than the current version in the store, we run the |
| // risk of allocating the task a second time. Only operate on the |
| // latest version of the task. |
| switch v := ev.(type) { |
| case api.EventCreateTask: |
| a.store.View(func(tx store.ReadTx) { |
| t = store.GetTask(tx, v.Task.ID) |
| }) |
| case api.EventUpdateTask: |
| a.store.View(func(tx store.ReadTx) { |
| t = store.GetTask(tx, v.Task.ID) |
| }) |
| case api.EventDeleteTask: |
| isDelete = true |
| t = v.Task.Copy() |
| } |
| |
| if t == nil { |
| return |
| } |
| |
| nc := a.netCtx |
| |
| // If the task has stopped running then we should free the network |
| // resources associated with the task right away. |
| if t.Status.State > api.TaskStateRunning || isDelete { |
| if nc.nwkAllocator.IsTaskAllocated(t) { |
| if err := nc.nwkAllocator.DeallocateTask(t); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed freeing network resources for task %s", t.ID) |
| } else { |
| nc.somethingWasDeallocated = true |
| } |
| } |
| |
| // Cleanup any task references that might exist |
| delete(nc.pendingTasks, t.ID) |
| delete(nc.unallocatedTasks, t.ID) |
| |
| return |
| } |
| |
| // If we are already in allocated state, there is |
| // absolutely nothing else to do. |
| if t.Status.State >= api.TaskStatePending { |
| delete(nc.pendingTasks, t.ID) |
| delete(nc.unallocatedTasks, t.ID) |
| return |
| } |
| |
| var s *api.Service |
| if t.ServiceID != "" { |
| a.store.View(func(tx store.ReadTx) { |
| s = store.GetService(tx, t.ServiceID) |
| }) |
| if s == nil { |
| // A running task should always have an associated service |
| // in the store. If the task is not running (it is dead or |
| // its desired state is set to dead), the service may have |
| // been removed from the store. But we still need to clean |
| // up the network resources associated with the task. |
| if t.Status.State <= api.TaskStateRunning && !isDelete { |
| log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: could not find service %s", ev, t.ServiceID, t.ID, t.Status.State, t.ServiceID) |
| return |
| } |
| } |
| } |
| |
| // Populate network attachments in the task |
| // based on service spec. |
| a.taskCreateNetworkAttachments(t, s) |
| |
| nc.pendingTasks[t.ID] = t |
| } |
| |
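| // allocateNode ensures the node has an allocated load-balancer attachment |
| // for each of the given networks and reports whether any new allocation |
| // was made. If existingAddressesOnly is true, only attachments that |
| // already carry addresses are allocated. |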
| func (a *Allocator) allocateNode(ctx context.Context, node *api.Node, existingAddressesOnly bool, networks []*api.Network) bool { |
| var allocated bool |
| |
| nc := a.netCtx |
| |
| for _, network := range networks { |
| var lbAttachment *api.NetworkAttachment |
| for _, na := range node.Attachments { |
| if na.Network != nil && na.Network.ID == network.ID { |
| lbAttachment = na |
| break |
| } |
| } |
| |
| if lbAttachment != nil && nc.nwkAllocator.IsAttachmentAllocated(node, lbAttachment) { |
| continue |
| } |
| |
| if lbAttachment == nil { |
| lbAttachment = &api.NetworkAttachment{} |
| node.Attachments = append(node.Attachments, lbAttachment) |
| } |
| |
| if existingAddressesOnly && len(lbAttachment.Addresses) == 0 { |
| continue |
| } |
| |
| lbAttachment.Network = network.Copy() |
| if err := a.netCtx.nwkAllocator.AllocateAttachment(node, lbAttachment); err != nil { |
| log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID) |
| // TODO: Should we add an unallocatedNodes set and retry allocation like we do for networks, tasks, and services? |
| // Right now we only retry allocating network resources for a node when the node is updated. |
| continue |
| } |
| |
| allocated = true |
| } |
| |
| return allocated |
| } |
| |
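| // commitAllocatedNode writes the node's allocation to the store as part |
| // of a batch, resolving a sequence conflict by copying the attachments |
| // onto the latest stored node. On failure the allocation is rolled back. |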
| func (a *Allocator) commitAllocatedNode(ctx context.Context, batch *store.Batch, node *api.Node) error { |
| if err := batch.Update(func(tx store.Tx) error { |
| err := store.UpdateNode(tx, node) |
| |
| if err == store.ErrSequenceConflict { |
| storeNode := store.GetNode(tx, node.ID) |
| // Guard against the node having been deleted concurrently. |
| if storeNode == nil { |
| return errors.Errorf("node %s not found while resolving a sequence conflict", node.ID) |
| } |
| storeNode.Attachments = node.Attachments |
| err = store.UpdateNode(tx, storeNode) |
| } |
| |
| return errors.Wrapf(err, "failed updating state in store transaction for node %s", node.ID) |
| }); err != nil { |
| if err := a.deallocateNode(node); err != nil { |
| log.G(ctx).WithError(err).Errorf("failed rolling back allocation of node %s", node.ID) |
| } |
| |
| return err |
| } |
| |
| return nil |
| } |
| |
| // updatePortsInHostPublishMode prepares the service object for update when |
| // the change concerns host-mode published ports: it resets the runtime |
| // state ports (s.Endpoint.Ports) to the current ingress-mode runtime state |
| // ports plus the newly configured host-mode ports, so that service |
| // allocation on this object deallocates any old host-mode port and |
| // allocates any new one. |
| func updatePortsInHostPublishMode(s *api.Service) { |
| // First, remove all host-mode ports from s.Endpoint.Ports |
| if s.Endpoint != nil { |
| var portConfigs []*api.PortConfig |
| for _, portConfig := range s.Endpoint.Ports { |
| if portConfig.PublishMode != api.PublishModeHost { |
| portConfigs = append(portConfigs, portConfig) |
| } |
| } |
| s.Endpoint.Ports = portConfigs |
| } |
| |
| // Add back all host-mode ports |
| if s.Spec.Endpoint != nil { |
| if s.Endpoint == nil { |
| s.Endpoint = &api.Endpoint{} |
| } |
| for _, portConfig := range s.Spec.Endpoint.Ports { |
| if portConfig.PublishMode == api.PublishModeHost { |
| s.Endpoint.Ports = append(s.Endpoint.Ports, portConfig.Copy()) |
| } |
| } |
| } |
| s.Endpoint.Spec = s.Spec.Endpoint.Copy() |
| } |
| |
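| // allocateService allocates ports and virtual IPs for the service. It |
| // attaches the service to the ingress network when the service publishes |
| // ports through the routing mesh, and removes stale ingress virtual IPs |
| // when it no longer does. |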
| func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error { |
| nc := a.netCtx |
| |
| if s.Spec.Endpoint != nil { |
| // The service has a user-defined endpoint. |
| if s.Endpoint == nil { |
| // The service has no allocated endpoint yet; create one. |
| s.Endpoint = &api.Endpoint{ |
| Spec: s.Spec.Endpoint.Copy(), |
| } |
| } |
| |
| // The service is trying to expose ports to the external |
| // world. Automatically attach the service to the ingress |
| // network only if it is not already done. |
| if IsIngressNetworkNeeded(s) { |
| if nc.ingressNetwork == nil { |
| return fmt.Errorf("ingress network is missing") |
| } |
| var found bool |
| for _, vip := range s.Endpoint.VirtualIPs { |
| if vip.NetworkID == nc.ingressNetwork.ID { |
| found = true |
| break |
| } |
| } |
| |
| if !found { |
| s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs, |
| &api.Endpoint_VirtualIP{NetworkID: nc.ingressNetwork.ID}) |
| } |
| } |
| } else if s.Endpoint != nil { |
| // The service has no user-defined endpoint, but network resources |
| // were allocated previously; deallocate them. |
| if err := nc.nwkAllocator.DeallocateService(s); err != nil { |
| return err |
| } |
| nc.somethingWasDeallocated = true |
| } |
| |
| if err := nc.nwkAllocator.AllocateService(s); err != nil { |
| nc.unallocatedServices[s.ID] = s |
| return err |
| } |
| |
| // If the service no longer exposes ports, clean up any lingering |
| // virtual IP references to the ingress network here. |
| if !IsIngressNetworkNeeded(s) && nc.ingressNetwork != nil { |
| if s.Endpoint != nil { |
| for i, vip := range s.Endpoint.VirtualIPs { |
| if vip.NetworkID == nc.ingressNetwork.ID { |
| n := len(s.Endpoint.VirtualIPs) |
| s.Endpoint.VirtualIPs[i], s.Endpoint.VirtualIPs[n-1] = s.Endpoint.VirtualIPs[n-1], nil |
| s.Endpoint.VirtualIPs = s.Endpoint.VirtualIPs[:n-1] |
| break |
| } |
| } |
| } |
| } |
| return nil |
| } |
| |
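| // commitAllocatedService writes the service's allocation to the store as |
| // part of a batch, resolving a sequence conflict by copying the endpoint |
| // onto the latest stored service. On failure the allocation is rolled back. |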
| func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Batch, s *api.Service) error { |
| if err := batch.Update(func(tx store.Tx) error { |
| err := store.UpdateService(tx, s) |
| |
| if err == store.ErrSequenceConflict { |
| storeService := store.GetService(tx, s.ID) |
| // Guard against the service having been deleted concurrently. |
| if storeService == nil { |
| return errors.Errorf("service %s not found while resolving a sequence conflict", s.ID) |
| } |
| storeService.Endpoint = s.Endpoint |
| err = store.UpdateService(tx, storeService) |
| } |
| |
| return errors.Wrapf(err, "failed updating state in store transaction for service %s", s.ID) |
| }); err != nil { |
| if err := a.netCtx.nwkAllocator.DeallocateService(s); err != nil { |
| log.G(ctx).WithError(err).Errorf("failed rolling back allocation of service %s", s.ID) |
| } |
| |
| return err |
| } |
| |
| return nil |
| } |
| |
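| // allocateNetwork allocates network-level resources for the network, |
| // queueing it for a later retry if allocation fails. |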
| func (a *Allocator) allocateNetwork(ctx context.Context, n *api.Network) error { |
| nc := a.netCtx |
| |
| if err := nc.nwkAllocator.Allocate(n); err != nil { |
| nc.unallocatedNetworks[n.ID] = n |
| return err |
| } |
| |
| return nil |
| } |
| |
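| // commitAllocatedNetwork writes the network's allocation to the store as |
| // part of a batch, rolling back the allocation on failure. |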
| func (a *Allocator) commitAllocatedNetwork(ctx context.Context, batch *store.Batch, n *api.Network) error { |
| if err := batch.Update(func(tx store.Tx) error { |
| if err := store.UpdateNetwork(tx, n); err != nil { |
| return errors.Wrapf(err, "failed updating state in store transaction for network %s", n.ID) |
| } |
| return nil |
| }); err != nil { |
| if err := a.netCtx.nwkAllocator.Deallocate(n); err != nil { |
| log.G(ctx).WithError(err).Errorf("failed rolling back allocation of network %s", n.ID) |
| } |
| |
| return err |
| } |
| |
| return nil |
| } |
| |
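| // allocateTask allocates network resources for the task on top of the |
| // latest store state and casts the network vote that can move the task to |
| // the PENDING state. It returns errNoChanges if nothing needed updating. |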
| func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) { |
| taskUpdated := false |
| nc := a.netCtx |
| |
| // We might be here even if a task allocation has already |
| // happened but wasn't successfully committed to store. In such |
| // cases skip allocation and go straight ahead to updating the |
| // store. |
| if !nc.nwkAllocator.IsTaskAllocated(t) { |
| a.store.View(func(tx store.ReadTx) { |
| if t.ServiceID != "" { |
| s := store.GetService(tx, t.ServiceID) |
| if s == nil { |
| err = fmt.Errorf("could not find service %s", t.ServiceID) |
| return |
| } |
| |
| if !nc.nwkAllocator.IsServiceAllocated(s) { |
| err = fmt.Errorf("service %s to which this task %s belongs has pending allocations", s.ID, t.ID) |
| return |
| } |
| |
| if s.Endpoint != nil { |
| taskUpdateEndpoint(t, s.Endpoint) |
| taskUpdated = true |
| } |
| } |
| |
| for _, na := range t.Networks { |
| n := store.GetNetwork(tx, na.Network.ID) |
| if n == nil { |
| err = fmt.Errorf("failed to retrieve network %s while allocating task %s", na.Network.ID, t.ID) |
| return |
| } |
| |
| if !nc.nwkAllocator.IsAllocated(n) { |
| err = fmt.Errorf("network %s attached to task %s not allocated yet", n.ID, t.ID) |
| return |
| } |
| |
| na.Network = n |
| } |
| |
| if err = nc.nwkAllocator.AllocateTask(t); err != nil { |
| return |
| } |
| if nc.nwkAllocator.IsTaskAllocated(t) { |
| taskUpdated = true |
| } |
| }) |
| |
| if err != nil { |
| return err |
| } |
| } |
| |
| // Update the network allocations and move to the PENDING state on |
| // top of the latest store state. |
| if a.taskAllocateVote(networkVoter, t.ID) { |
| if t.Status.State < api.TaskStatePending { |
| updateTaskStatus(t, api.TaskStatePending, allocatedStatusMessage) |
| taskUpdated = true |
| } |
| } |
| |
| if !taskUpdated { |
| return errNoChanges |
| } |
| |
| return nil |
| } |
| |
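| // commitAllocatedTask writes the task's allocation to the store as part |
| // of a batch, resolving a sequence conflict by merging the networks, |
| // endpoint, and status onto the latest stored task. |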
| func (a *Allocator) commitAllocatedTask(ctx context.Context, batch *store.Batch, t *api.Task) error { |
| return batch.Update(func(tx store.Tx) error { |
| err := store.UpdateTask(tx, t) |
| |
| if err == store.ErrSequenceConflict { |
| storeTask := store.GetTask(tx, t.ID) |
| // Guard against the task having been deleted concurrently. |
| if storeTask == nil { |
| return errors.Errorf("task %s not found while resolving a sequence conflict", t.ID) |
| } |
| taskUpdateNetworks(storeTask, t.Networks) |
| taskUpdateEndpoint(storeTask, t.Endpoint) |
| if storeTask.Status.State < api.TaskStatePending { |
| storeTask.Status = t.Status |
| } |
| err = store.UpdateTask(tx, storeTask) |
| } |
| |
| return errors.Wrapf(err, "failed updating state in store transaction for task %s", t.ID) |
| }) |
| } |
| |
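| // procUnallocatedNetworks retries allocation for networks that previously |
| // failed to allocate and commits the ones that succeed. |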
| func (a *Allocator) procUnallocatedNetworks(ctx context.Context) { |
| nc := a.netCtx |
| var allocatedNetworks []*api.Network |
| for _, n := range nc.unallocatedNetworks { |
| if !nc.nwkAllocator.IsAllocated(n) { |
| if err := a.allocateNetwork(ctx, n); err != nil { |
| log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated network %s", n.ID) |
| continue |
| } |
| allocatedNetworks = append(allocatedNetworks, n) |
| } |
| } |
| |
| if len(allocatedNetworks) == 0 { |
| return |
| } |
| |
| err := a.store.Batch(func(batch *store.Batch) error { |
| for _, n := range allocatedNetworks { |
| if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil { |
| log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated network %s", n.ID) |
| continue |
| } |
| delete(nc.unallocatedNetworks, n.ID) |
| } |
| return nil |
| }) |
| |
| if err != nil { |
| log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated networks") |
| // We optimistically removed these from nc.unallocatedNetworks |
| // above in anticipation of successfully committing the batch, |
| // but since the transaction has failed, we requeue them here. |
| for _, n := range allocatedNetworks { |
| nc.unallocatedNetworks[n.ID] = n |
| } |
| } |
| } |
| |
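| // procUnallocatedServices retries allocation for services that previously |
| // failed to allocate and commits the ones that succeed. |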
| func (a *Allocator) procUnallocatedServices(ctx context.Context) { |
| nc := a.netCtx |
| var allocatedServices []*api.Service |
| for _, s := range nc.unallocatedServices { |
| if !nc.nwkAllocator.IsServiceAllocated(s) { |
| if err := a.allocateService(ctx, s); err != nil { |
| log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated service %s", s.ID) |
| continue |
| } |
| allocatedServices = append(allocatedServices, s) |
| } |
| } |
| |
| if len(allocatedServices) == 0 { |
| return |
| } |
| |
| err := a.store.Batch(func(batch *store.Batch) error { |
| for _, s := range allocatedServices { |
| if err := a.commitAllocatedService(ctx, batch, s); err != nil { |
| log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated service %s", s.ID) |
| continue |
| } |
| delete(nc.unallocatedServices, s.ID) |
| } |
| return nil |
| }) |
| |
| if err != nil { |
| log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated services") |
| // We optimistically removed these from nc.unallocatedServices |
| // above in anticipation of successfully committing the batch, |
| // but since the transaction has failed, we requeue them here. |
| for _, s := range allocatedServices { |
| nc.unallocatedServices[s.ID] = s |
| } |
| } |
| } |
| |
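| // procTasksNetwork allocates the pending tasks (or, when onRetry is true, |
| // the previously unallocated tasks) as a batch, commits the successful |
| // ones, and requeues any whose commit fails. |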
| func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) { |
| nc := a.netCtx |
| quiet := false |
| toAllocate := nc.pendingTasks |
| if onRetry { |
| toAllocate = nc.unallocatedTasks |
| quiet = true |
| } |
| allocatedTasks := make([]*api.Task, 0, len(toAllocate)) |
| |
| for _, t := range toAllocate { |
| if err := a.allocateTask(ctx, t); err == nil { |
| allocatedTasks = append(allocatedTasks, t) |
| } else if err != errNoChanges { |
| if quiet { |
| log.G(ctx).WithError(err).Debug("task allocation failure") |
| } else { |
| log.G(ctx).WithError(err).Error("task allocation failure") |
| } |
| } |
| } |
| |
| if len(allocatedTasks) == 0 { |
| return |
| } |
| |
| err := a.store.Batch(func(batch *store.Batch) error { |
| for _, t := range allocatedTasks { |
| err := a.commitAllocatedTask(ctx, batch, t) |
| if err != nil { |
| log.G(ctx).WithError(err).Error("task allocation commit failure") |
| continue |
| } |
| delete(toAllocate, t.ID) |
| } |
| |
| return nil |
| }) |
| |
| if err != nil { |
| log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks") |
| // We optimistically removed these from toAllocate above in |
| // anticipation of successfully committing the batch, but since |
| // the transaction has failed, we requeue them here. |
| for _, t := range allocatedTasks { |
| toAllocate[t.ID] = t |
| } |
| } |
| } |
| |
| // IsBuiltInNetworkDriver returns whether the passed driver is an internal network driver |
| func IsBuiltInNetworkDriver(name string) bool { |
| return cnmallocator.IsBuiltInDriver(name) |
| } |
| |
| // PredefinedNetworks returns the list of predefined network structures for a given network model |
| func PredefinedNetworks() []networkallocator.PredefinedNetworkData { |
| return cnmallocator.PredefinedNetworks() |
| } |
| |
| // updateTaskStatus sets TaskStatus and updates timestamp. |
| func updateTaskStatus(t *api.Task, newStatus api.TaskState, message string) { |
| t.Status = api.TaskStatus{ |
| State: newStatus, |
| Message: message, |
| Timestamp: ptypes.MustTimestampProto(time.Now()), |
| } |
| } |
| |
| // IsIngressNetwork returns whether the passed network is an ingress network. |
| func IsIngressNetwork(nw *api.Network) bool { |
| return networkallocator.IsIngressNetwork(nw) |
| } |
| |
| // GetIngressNetwork fetches the ingress network from store. |
| // ErrNoIngress will be returned if the ingress network is not present, |
| // nil otherwise. In case of any other failure in accessing the store, |
| // the respective error will be reported as is. |
| func GetIngressNetwork(s *store.MemoryStore) (*api.Network, error) { |
| var ( |
| networks []*api.Network |
| err error |
| ) |
| s.View(func(tx store.ReadTx) { |
| networks, err = store.FindNetworks(tx, store.All) |
| }) |
| if err != nil { |
| return nil, err |
| } |
| for _, n := range networks { |
| if IsIngressNetwork(n) { |
| return n, nil |
| } |
| } |
| return nil, ErrNoIngress |
| } |