| package libcontainerd |
| |
| import ( |
| "encoding/json" |
| "fmt" |
| "os" |
| "path/filepath" |
| "strings" |
| "sync" |
| "syscall" |
| "time" |
| |
| "github.com/Sirupsen/logrus" |
| containerd "github.com/docker/containerd/api/grpc/types" |
| "github.com/docker/docker/pkg/idtools" |
| "github.com/docker/docker/pkg/mount" |
| specs "github.com/opencontainers/specs/specs-go" |
| "golang.org/x/net/context" |
| ) |
| |
| type client struct { |
| clientCommon |
| |
| // Platform specific properties below here. |
| remote *remote |
| q queue |
| exitNotifiers map[string]*exitNotifier |
| liveRestore bool |
| } |
| |
| func (clnt *client) AddProcess(containerID, processFriendlyName string, specp Process) error { |
| clnt.lock(containerID) |
| defer clnt.unlock(containerID) |
| container, err := clnt.getContainer(containerID) |
| if err != nil { |
| return err |
| } |
| |
| spec, err := container.spec() |
| if err != nil { |
| return err |
| } |
| sp := spec.Process |
| sp.Args = specp.Args |
| sp.Terminal = specp.Terminal |
| if specp.Env != nil { |
| sp.Env = specp.Env |
| } |
| if specp.Cwd != nil { |
| sp.Cwd = *specp.Cwd |
| } |
| if specp.User != nil { |
| sp.User = specs.User{ |
| UID: specp.User.UID, |
| GID: specp.User.GID, |
| AdditionalGids: specp.User.AdditionalGids, |
| } |
| } |
| if specp.Capabilities != nil { |
| sp.Capabilities = specp.Capabilities |
| } |
| |
| p := container.newProcess(processFriendlyName) |
| |
| r := &containerd.AddProcessRequest{ |
| Args: sp.Args, |
| Cwd: sp.Cwd, |
| Terminal: sp.Terminal, |
| Id: containerID, |
| Env: sp.Env, |
| User: &containerd.User{ |
| Uid: sp.User.UID, |
| Gid: sp.User.GID, |
| AdditionalGids: sp.User.AdditionalGids, |
| }, |
| Pid: processFriendlyName, |
| Stdin: p.fifo(syscall.Stdin), |
| Stdout: p.fifo(syscall.Stdout), |
| Stderr: p.fifo(syscall.Stderr), |
| Capabilities: sp.Capabilities, |
| ApparmorProfile: sp.ApparmorProfile, |
| SelinuxLabel: sp.SelinuxLabel, |
| NoNewPrivileges: sp.NoNewPrivileges, |
| Rlimits: convertRlimits(sp.Rlimits), |
| } |
| |
| iopipe, err := p.openFifos(sp.Terminal) |
| if err != nil { |
| return err |
| } |
| |
| if _, err := clnt.remote.apiClient.AddProcess(context.Background(), r); err != nil { |
| p.closeFifos(iopipe) |
| return err |
| } |
| |
| container.processes[processFriendlyName] = p |
| |
| clnt.unlock(containerID) |
| |
| if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil { |
| return err |
| } |
| clnt.lock(containerID) |
| |
| return nil |
| } |
| |
| func (clnt *client) prepareBundleDir(uid, gid int) (string, error) { |
| root, err := filepath.Abs(clnt.remote.stateDir) |
| if err != nil { |
| return "", err |
| } |
| if uid == 0 && gid == 0 { |
| return root, nil |
| } |
| p := string(filepath.Separator) |
| for _, d := range strings.Split(root, string(filepath.Separator))[1:] { |
| p = filepath.Join(p, d) |
| fi, err := os.Stat(p) |
| if err != nil && !os.IsNotExist(err) { |
| return "", err |
| } |
| if os.IsNotExist(err) || fi.Mode()&1 == 0 { |
| p = fmt.Sprintf("%s.%d.%d", p, uid, gid) |
| if err := idtools.MkdirAs(p, 0700, uid, gid); err != nil && !os.IsExist(err) { |
| return "", err |
| } |
| } |
| } |
| return p, nil |
| } |
| |
| func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) (err error) { |
| clnt.lock(containerID) |
| defer clnt.unlock(containerID) |
| |
| if ctr, err := clnt.getContainer(containerID); err == nil { |
| if ctr.restarting { |
| ctr.restartManager.Cancel() |
| ctr.clean() |
| } else { |
| return fmt.Errorf("Container %s is already active", containerID) |
| } |
| } |
| |
| uid, gid, err := getRootIDs(specs.Spec(spec)) |
| if err != nil { |
| return err |
| } |
| dir, err := clnt.prepareBundleDir(uid, gid) |
| if err != nil { |
| return err |
| } |
| |
| container := clnt.newContainer(filepath.Join(dir, containerID), options...) |
| if err := container.clean(); err != nil { |
| return err |
| } |
| |
| defer func() { |
| if err != nil { |
| container.clean() |
| clnt.deleteContainer(containerID) |
| } |
| }() |
| |
| if err := idtools.MkdirAllAs(container.dir, 0700, uid, gid); err != nil && !os.IsExist(err) { |
| return err |
| } |
| |
| f, err := os.Create(filepath.Join(container.dir, configFilename)) |
| if err != nil { |
| return err |
| } |
| defer f.Close() |
| if err := json.NewEncoder(f).Encode(spec); err != nil { |
| return err |
| } |
| |
| return container.start() |
| } |
| |
| func (clnt *client) Signal(containerID string, sig int) error { |
| clnt.lock(containerID) |
| defer clnt.unlock(containerID) |
| _, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{ |
| Id: containerID, |
| Pid: InitFriendlyName, |
| Signal: uint32(sig), |
| }) |
| return err |
| } |
| |
| func (clnt *client) SignalProcess(containerID string, pid string, sig int) error { |
| clnt.lock(containerID) |
| defer clnt.unlock(containerID) |
| _, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{ |
| Id: containerID, |
| Pid: pid, |
| Signal: uint32(sig), |
| }) |
| return err |
| } |
| |
| func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error { |
| clnt.lock(containerID) |
| defer clnt.unlock(containerID) |
| if _, err := clnt.getContainer(containerID); err != nil { |
| return err |
| } |
| _, err := clnt.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{ |
| Id: containerID, |
| Pid: processFriendlyName, |
| Width: uint32(width), |
| Height: uint32(height), |
| }) |
| return err |
| } |
| |
| func (clnt *client) Pause(containerID string) error { |
| return clnt.setState(containerID, StatePause) |
| } |
| |
| func (clnt *client) setState(containerID, state string) error { |
| clnt.lock(containerID) |
| container, err := clnt.getContainer(containerID) |
| if err != nil { |
| clnt.unlock(containerID) |
| return err |
| } |
| if container.systemPid == 0 { |
| clnt.unlock(containerID) |
| return fmt.Errorf("No active process for container %s", containerID) |
| } |
| st := "running" |
| if state == StatePause { |
| st = "paused" |
| } |
| chstate := make(chan struct{}) |
| _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{ |
| Id: containerID, |
| Pid: InitFriendlyName, |
| Status: st, |
| }) |
| if err != nil { |
| clnt.unlock(containerID) |
| return err |
| } |
| container.pauseMonitor.append(state, chstate) |
| clnt.unlock(containerID) |
| <-chstate |
| return nil |
| } |
| |
| func (clnt *client) Resume(containerID string) error { |
| return clnt.setState(containerID, StateResume) |
| } |
| |
| func (clnt *client) Stats(containerID string) (*Stats, error) { |
| resp, err := clnt.remote.apiClient.Stats(context.Background(), &containerd.StatsRequest{containerID}) |
| if err != nil { |
| return nil, err |
| } |
| return (*Stats)(resp), nil |
| } |
| |
| // Take care of the old 1.11.0 behavior in case the version upgrade |
| // happened without a clean daemon shutdown |
| func (clnt *client) cleanupOldRootfs(containerID string) { |
| // Unmount and delete the bundle folder |
| if mts, err := mount.GetMounts(); err == nil { |
| for _, mts := range mts { |
| if strings.HasSuffix(mts.Mountpoint, containerID+"/rootfs") { |
| if err := syscall.Unmount(mts.Mountpoint, syscall.MNT_DETACH); err == nil { |
| os.RemoveAll(strings.TrimSuffix(mts.Mountpoint, "/rootfs")) |
| } |
| break |
| } |
| } |
| } |
| } |
| |
| func (clnt *client) setExited(containerID string) error { |
| clnt.lock(containerID) |
| defer clnt.unlock(containerID) |
| |
| var exitCode uint32 |
| if event, ok := clnt.remote.pastEvents[containerID]; ok { |
| exitCode = event.Status |
| delete(clnt.remote.pastEvents, containerID) |
| } |
| |
| err := clnt.backend.StateChanged(containerID, StateInfo{ |
| CommonStateInfo: CommonStateInfo{ |
| State: StateExit, |
| ExitCode: exitCode, |
| }}) |
| |
| clnt.cleanupOldRootfs(containerID) |
| |
| return err |
| } |
| |
| func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) { |
| cont, err := clnt.getContainerdContainer(containerID) |
| if err != nil { |
| return nil, err |
| } |
| pids := make([]int, len(cont.Pids)) |
| for i, p := range cont.Pids { |
| pids[i] = int(p) |
| } |
| return pids, nil |
| } |
| |
| // Summary returns a summary of the processes running in a container. |
| // This is a no-op on Linux. |
| func (clnt *client) Summary(containerID string) ([]Summary, error) { |
| return nil, nil |
| } |
| |
| func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) { |
| resp, err := clnt.remote.apiClient.State(context.Background(), &containerd.StateRequest{Id: containerID}) |
| if err != nil { |
| return nil, err |
| } |
| for _, cont := range resp.Containers { |
| if cont.Id == containerID { |
| return cont, nil |
| } |
| } |
| return nil, fmt.Errorf("invalid state response") |
| } |
| |
| func (clnt *client) newContainer(dir string, options ...CreateOption) *container { |
| container := &container{ |
| containerCommon: containerCommon{ |
| process: process{ |
| dir: dir, |
| processCommon: processCommon{ |
| containerID: filepath.Base(dir), |
| client: clnt, |
| friendlyName: InitFriendlyName, |
| }, |
| }, |
| processes: make(map[string]*process), |
| }, |
| } |
| for _, option := range options { |
| if err := option.Apply(container); err != nil { |
| logrus.Error(err) |
| } |
| } |
| return container |
| } |
| |
| func (clnt *client) UpdateResources(containerID string, resources Resources) error { |
| clnt.lock(containerID) |
| defer clnt.unlock(containerID) |
| container, err := clnt.getContainer(containerID) |
| if err != nil { |
| return err |
| } |
| if container.systemPid == 0 { |
| return fmt.Errorf("No active process for container %s", containerID) |
| } |
| _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{ |
| Id: containerID, |
| Pid: InitFriendlyName, |
| Resources: (*containerd.UpdateResource)(&resources), |
| }) |
| if err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| func (clnt *client) getExitNotifier(containerID string) *exitNotifier { |
| clnt.mapMutex.RLock() |
| defer clnt.mapMutex.RUnlock() |
| return clnt.exitNotifiers[containerID] |
| } |
| |
| func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier { |
| clnt.mapMutex.Lock() |
| w, ok := clnt.exitNotifiers[containerID] |
| defer clnt.mapMutex.Unlock() |
| if !ok { |
| w = &exitNotifier{c: make(chan struct{}), client: clnt} |
| clnt.exitNotifiers[containerID] = w |
| } |
| return w |
| } |
| |
| func (clnt *client) restore(cont *containerd.Container, options ...CreateOption) (err error) { |
| clnt.lock(cont.Id) |
| defer clnt.unlock(cont.Id) |
| |
| logrus.Debugf("restore container %s state %s", cont.Id, cont.Status) |
| |
| containerID := cont.Id |
| if _, err := clnt.getContainer(containerID); err == nil { |
| return fmt.Errorf("container %s is already active", containerID) |
| } |
| |
| defer func() { |
| if err != nil { |
| clnt.deleteContainer(cont.Id) |
| } |
| }() |
| |
| container := clnt.newContainer(cont.BundlePath, options...) |
| container.systemPid = systemPid(cont) |
| |
| var terminal bool |
| for _, p := range cont.Processes { |
| if p.Pid == InitFriendlyName { |
| terminal = p.Terminal |
| } |
| } |
| |
| iopipe, err := container.openFifos(terminal) |
| if err != nil { |
| return err |
| } |
| |
| if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil { |
| return err |
| } |
| |
| clnt.appendContainer(container) |
| |
| err = clnt.backend.StateChanged(containerID, StateInfo{ |
| CommonStateInfo: CommonStateInfo{ |
| State: StateRestore, |
| Pid: container.systemPid, |
| }}) |
| |
| if err != nil { |
| return err |
| } |
| |
| if event, ok := clnt.remote.pastEvents[containerID]; ok { |
| // This should only be a pause or resume event |
| if event.Type == StatePause || event.Type == StateResume { |
| return clnt.backend.StateChanged(containerID, StateInfo{ |
| CommonStateInfo: CommonStateInfo{ |
| State: event.Type, |
| Pid: container.systemPid, |
| }}) |
| } |
| |
| logrus.Warnf("unexpected backlog event: %#v", event) |
| } |
| |
| return nil |
| } |
| |
| func (clnt *client) Restore(containerID string, options ...CreateOption) error { |
| if clnt.liveRestore { |
| cont, err := clnt.getContainerdContainer(containerID) |
| if err == nil && cont.Status != "stopped" { |
| if err := clnt.restore(cont, options...); err != nil { |
| logrus.Errorf("error restoring %s: %v", containerID, err) |
| } |
| return nil |
| } |
| return clnt.setExited(containerID) |
| } |
| |
| cont, err := clnt.getContainerdContainer(containerID) |
| if err == nil && cont.Status != "stopped" { |
| w := clnt.getOrCreateExitNotifier(containerID) |
| clnt.lock(cont.Id) |
| container := clnt.newContainer(cont.BundlePath) |
| container.systemPid = systemPid(cont) |
| clnt.appendContainer(container) |
| clnt.unlock(cont.Id) |
| |
| container.discardFifos() |
| |
| if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil { |
| logrus.Errorf("error sending sigterm to %v: %v", containerID, err) |
| } |
| select { |
| case <-time.After(10 * time.Second): |
| if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil { |
| logrus.Errorf("error sending sigkill to %v: %v", containerID, err) |
| } |
| select { |
| case <-time.After(2 * time.Second): |
| case <-w.wait(): |
| return nil |
| } |
| case <-w.wait(): |
| return nil |
| } |
| } |
| |
| clnt.deleteContainer(containerID) |
| |
| return clnt.setExited(containerID) |
| } |
| |
| type exitNotifier struct { |
| id string |
| client *client |
| c chan struct{} |
| once sync.Once |
| } |
| |
| func (en *exitNotifier) close() { |
| en.once.Do(func() { |
| close(en.c) |
| en.client.mapMutex.Lock() |
| if en == en.client.exitNotifiers[en.id] { |
| delete(en.client.exitNotifiers, en.id) |
| } |
| en.client.mapMutex.Unlock() |
| }) |
| } |
| func (en *exitNotifier) wait() <-chan struct{} { |
| return en.c |
| } |