package libcontainerd

import (
	"fmt"
	"os"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/Sirupsen/logrus"
	containerd "github.com/docker/containerd/api/grpc/types"
	"github.com/docker/docker/pkg/ioutils"
	"github.com/docker/docker/pkg/mount"
	"github.com/golang/protobuf/ptypes"
	"github.com/golang/protobuf/ptypes/timestamp"
	specs "github.com/opencontainers/runtime-spec/specs-go"
	"golang.org/x/net/context"
)

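// client is the Linux implementation of the libcontainerd client. It talks
// to the containerd daemon over gRPC via remote.apiClient and tracks
// per-container exit notifiers on top of the shared clientCommon state.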
type client struct {
	clientCommon

	// Platform specific properties below here.
	remote        *remote
	q             queue
	exitNotifiers map[string]*exitNotifier
	liveRestore   bool
}

// GetServerVersion returns the connected server version information.
func (clnt *client) GetServerVersion(ctx context.Context) (*ServerVersion, error) {
	resp, err := clnt.remote.apiClient.GetServerVersion(ctx, &containerd.GetServerVersionRequest{})
	if err != nil {
		return nil, err
	}

	sv := &ServerVersion{
		GetServerVersionResponse: *resp,
	}

	return sv, nil
}

// AddProcess is the handler for adding a process to an already running
// container. It's called through docker exec. It returns the system pid of the
// exec'd process.
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) (pid int, err error) {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)
	container, err := clnt.getContainer(containerID)
	if err != nil {
		return -1, err
	}

	spec, err := container.spec()
	if err != nil {
		return -1, err
	}
	sp := spec.Process
	sp.Args = specp.Args
	sp.Terminal = specp.Terminal
	if len(specp.Env) > 0 {
		sp.Env = specp.Env
	}
	if specp.Cwd != nil {
		sp.Cwd = *specp.Cwd
	}
	if specp.User != nil {
		sp.User = specs.User{
			UID:            specp.User.UID,
			GID:            specp.User.GID,
			AdditionalGids: specp.User.AdditionalGids,
		}
	}
	if specp.Capabilities != nil {
		sp.Capabilities = specp.Capabilities
	}

	p := container.newProcess(processFriendlyName)

	r := &containerd.AddProcessRequest{
		Args:     sp.Args,
		Cwd:      sp.Cwd,
		Terminal: sp.Terminal,
		Id:       containerID,
		Env:      sp.Env,
		User: &containerd.User{
			Uid:            sp.User.UID,
			Gid:            sp.User.GID,
			AdditionalGids: sp.User.AdditionalGids,
		},
		Pid:             processFriendlyName,
		Stdin:           p.fifo(syscall.Stdin),
		Stdout:          p.fifo(syscall.Stdout),
		Stderr:          p.fifo(syscall.Stderr),
		Capabilities:    sp.Capabilities,
		ApparmorProfile: sp.ApparmorProfile,
		SelinuxLabel:    sp.SelinuxLabel,
		NoNewPrivileges: sp.NoNewPrivileges,
		Rlimits:         convertRlimits(sp.Rlimits),
	}

	fifoCtx, cancel := context.WithCancel(context.Background())
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	iopipe, err := p.openFifos(fifoCtx, sp.Terminal)
	if err != nil {
		return -1, err
	}

	resp, err := clnt.remote.apiClient.AddProcess(ctx, r)
	if err != nil {
		p.closeFifos(iopipe)
		return -1, err
	}

	var stdinOnce sync.Once
	stdin := iopipe.Stdin
	iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
		var err error
		stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
			err = stdin.Close()
			if err2 := p.sendCloseStdin(); err == nil {
				err = err2
			}
		})
		return err
	})

	container.processes[processFriendlyName] = p

	if err := attachStdio(*iopipe); err != nil {
		p.closeFifos(iopipe)
		return -1, err
	}

	return int(resp.SystemPid), nil
}

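// SignalProcess sends the signal sig to the process identified by pid (a
// process friendly name, e.g. InitFriendlyName) inside the given container.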
func (clnt *client) SignalProcess(containerID string, pid string, sig int) error {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)
	_, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
		Id:     containerID,
		Pid:    pid,
		Signal: uint32(sig),
	})
	return err
}

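// Resize changes the terminal size of the given process in the container.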
func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)
	if _, err := clnt.getContainer(containerID); err != nil {
		return err
	}
	_, err := clnt.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
		Id:     containerID,
		Pid:    processFriendlyName,
		Width:  uint32(width),
		Height: uint32(height),
	})
	return err
}

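// Pause pauses the container and blocks until containerd reports the
// corresponding state change.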
func (clnt *client) Pause(containerID string) error {
	return clnt.setState(containerID, StatePause)
}

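// setState asks containerd to pause or resume the container's init process
// and blocks until the matching event is delivered through the pause monitor.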
func (clnt *client) setState(containerID, state string) error {
	clnt.lock(containerID)
	container, err := clnt.getContainer(containerID)
	if err != nil {
		clnt.unlock(containerID)
		return err
	}
	if container.systemPid == 0 {
		clnt.unlock(containerID)
		return fmt.Errorf("No active process for container %s", containerID)
	}
	st := "running"
	if state == StatePause {
		st = "paused"
	}
	chstate := make(chan struct{})
	_, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
		Id:     containerID,
		Pid:    InitFriendlyName,
		Status: st,
	})
	if err != nil {
		clnt.unlock(containerID)
		return err
	}
	container.pauseMonitor.append(state, chstate)
	clnt.unlock(containerID)
	<-chstate
	return nil
}

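// Resume resumes a paused container and blocks until containerd reports the
// corresponding state change.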
func (clnt *client) Resume(containerID string) error {
	return clnt.setState(containerID, StateResume)
}

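// Stats returns a single point-in-time snapshot of the container's resource
// usage as reported by containerd.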
func (clnt *client) Stats(containerID string) (*Stats, error) {
	resp, err := clnt.remote.apiClient.Stats(context.Background(), &containerd.StatsRequest{Id: containerID})
	if err != nil {
		return nil, err
	}
	return (*Stats)(resp), nil
}

// cleanupOldRootfs takes care of the old 1.11.0 bundle layout in case the
// version upgrade happened without a clean daemon shutdown: the leftover
// rootfs mount is unmounted and the bundle folder is removed.
func (clnt *client) cleanupOldRootfs(containerID string) {
	// Unmount and delete the bundle folder
	if mts, err := mount.GetMounts(); err == nil {
		for _, m := range mts {
			if strings.HasSuffix(m.Mountpoint, containerID+"/rootfs") {
				if err := syscall.Unmount(m.Mountpoint, syscall.MNT_DETACH); err == nil {
					os.RemoveAll(strings.TrimSuffix(m.Mountpoint, "/rootfs"))
				}
				break
			}
		}
	}
}

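// setExited notifies the backend that the container exited with the given
// exit code and cleans up any rootfs mount left over from the old 1.11.0
// bundle layout.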
func (clnt *client) setExited(containerID string, exitCode uint32) error {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)

	err := clnt.backend.StateChanged(containerID, StateInfo{
		CommonStateInfo: CommonStateInfo{
			State:    StateExit,
			ExitCode: exitCode,
		}})

	clnt.cleanupOldRootfs(containerID)

	return err
}

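// GetPidsForContainer returns the system pids of all processes running
// inside the given container.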
func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) {
	cont, err := clnt.getContainerdContainer(containerID)
	if err != nil {
		return nil, err
	}
	pids := make([]int, len(cont.Pids))
	for i, p := range cont.Pids {
		pids[i] = int(p)
	}
	return pids, nil
}

// Summary returns a summary of the processes running in a container.
// This is a no-op on Linux.
func (clnt *client) Summary(containerID string) ([]Summary, error) {
	return nil, nil
}

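// getContainerdContainer fetches the state of the given container as known
// to containerd.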
func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) {
	resp, err := clnt.remote.apiClient.State(context.Background(), &containerd.StateRequest{Id: containerID})
	if err != nil {
		return nil, err
	}
	for _, cont := range resp.Containers {
		if cont.Id == containerID {
			return cont, nil
		}
	}
	return nil, fmt.Errorf("invalid state response")
}

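// UpdateResources updates the resource limits (e.g. cgroup settings) of a
// running container through containerd.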
func (clnt *client) UpdateResources(containerID string, resources Resources) error {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)
	container, err := clnt.getContainer(containerID)
	if err != nil {
		return err
	}
	if container.systemPid == 0 {
		return fmt.Errorf("No active process for container %s", containerID)
	}
	_, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
		Id:        containerID,
		Pid:       InitFriendlyName,
		Resources: (*containerd.UpdateResource)(&resources),
	})
	return err
}

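// getExitNotifier returns the exit notifier registered for the container, or
// nil if there is none.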
func (clnt *client) getExitNotifier(containerID string) *exitNotifier {
	clnt.mapMutex.RLock()
	defer clnt.mapMutex.RUnlock()
	return clnt.exitNotifiers[containerID]
}

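// getOrCreateExitNotifier returns the exit notifier for the container,
// creating and registering a new one if needed.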
func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
	clnt.mapMutex.Lock()
	defer clnt.mapMutex.Unlock()
	w, ok := clnt.exitNotifiers[containerID]
	if !ok {
		w = &exitNotifier{c: make(chan struct{}), client: clnt}
		clnt.exitNotifiers[containerID] = w
	}
	return w
}

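// restore reattaches the daemon to a container that is still running in
// containerd: it reopens the stdio fifos, registers the container with the
// client and replays the last backlog event (pause/resume), if any.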
func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, attachStdio StdioCallback, options ...CreateOption) (err error) {
	clnt.lock(cont.Id)
	defer clnt.unlock(cont.Id)

	logrus.Debugf("libcontainerd: restore container %s state %s", cont.Id, cont.Status)

	containerID := cont.Id
	if _, err := clnt.getContainer(containerID); err == nil {
		return fmt.Errorf("container %s is already active", containerID)
	}

	defer func() {
		if err != nil {
			clnt.deleteContainer(cont.Id)
		}
	}()

	container := clnt.newContainer(cont.BundlePath, options...)
	container.systemPid = systemPid(cont)

	var terminal bool
	for _, p := range cont.Processes {
		if p.Pid == InitFriendlyName {
			terminal = p.Terminal
		}
	}

	fifoCtx, cancel := context.WithCancel(context.Background())
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	iopipe, err := container.openFifos(fifoCtx, terminal)
	if err != nil {
		return err
	}
	var stdinOnce sync.Once
	stdin := iopipe.Stdin
	iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
		var err error
		stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
			err = stdin.Close()
		})
		return err
	})

	if err := attachStdio(*iopipe); err != nil {
		container.closeFifos(iopipe)
		return err
	}

	clnt.appendContainer(container)

	err = clnt.backend.StateChanged(containerID, StateInfo{
		CommonStateInfo: CommonStateInfo{
			State: StateRestore,
			Pid:   container.systemPid,
		}})

	if err != nil {
		container.closeFifos(iopipe)
		return err
	}

	if lastEvent != nil {
		// This should only be a pause or resume event
		if lastEvent.Type == StatePause || lastEvent.Type == StateResume {
			return clnt.backend.StateChanged(containerID, StateInfo{
				CommonStateInfo: CommonStateInfo{
					State: lastEvent.Type,
					Pid:   container.systemPid,
				}})
		}

		logrus.Warnf("libcontainerd: unexpected backlog event: %#v", lastEvent)
	}

	return nil
}

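// getContainerLastEventSinceTime replays the events containerd has stored
// for the container since the given timestamp and returns the most recent
// one (nil if there were none).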
func (clnt *client) getContainerLastEventSinceTime(id string, tsp *timestamp.Timestamp) (*containerd.Event, error) {
	er := &containerd.EventsRequest{
		Timestamp:  tsp,
		StoredOnly: true,
		Id:         id,
	}
	events, err := clnt.remote.apiClient.Events(context.Background(), er)
	if err != nil {
		logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err)
		return nil, err
	}

	var ev *containerd.Event
	for {
		e, err := events.Recv()
		if err != nil {
			if err.Error() == "EOF" {
				break
			}
			logrus.Errorf("libcontainerd: failed to get container event for %s: %q", id, err)
			return nil, err
		}
		ev = e
		logrus.Debugf("libcontainerd: received past event %#v", ev)
	}

	return ev, nil
}

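// getContainerLastEvent returns the last event stored by containerd for the
// container since the daemon's restore timestamp, falling back to replaying
// all stored events when none newer than that timestamp is found.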
func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error) {
	ev, err := clnt.getContainerLastEventSinceTime(id, clnt.remote.restoreFromTimestamp)
	if err == nil && ev == nil {
		// If ev is nil and the container is running in containerd,
		// we have already consumed all the events of the
		// container, including the "exit" one.
		// Thus, we request all events containerd has in memory for
		// this container in order to get the last one (which should
		// be an exit event).
		logrus.Warnf("libcontainerd: client is out of sync, restore was called on a fully synced container (%s).", id)
		// Request all events since beginning of time
		t := time.Unix(0, 0)
		tsp, err := ptypes.TimestampProto(t)
		if err != nil {
			logrus.Errorf("libcontainerd: getLastEventSinceTime() failed to convert timestamp: %q", err)
			return nil, err
		}

		return clnt.getContainerLastEventSinceTime(id, tsp)
	}

	return ev, err
}

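// Restore is called on daemon startup for each container known to
// containerd. Containers that already exited are reported to the backend as
// such; live containers are either re-attached when live restore is enabled,
// or shut down.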
func (clnt *client) Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error {
	// Synchronize with live events
	clnt.remote.Lock()
	defer clnt.remote.Unlock()
	// Check that containerd still knows this container.
	//
	// In the unlikely event that Restore for this container processes
	// its past events before the main loop does, those events will be
	// processed twice. However, this is not an issue, as all those
	// events do is set the state of the container to exactly what it
	// already is.
	cont, err := clnt.getContainerdContainer(containerID)
	// Get its last event
	ev, eerr := clnt.getContainerLastEvent(containerID)
	if err != nil || cont.Status == "Stopped" {
		if err != nil {
			logrus.Warnf("libcontainerd: failed to retrieve container %s state: %v", containerID, err)
		}
		if ev != nil && (ev.Pid != InitFriendlyName || ev.Type != StateExit) {
			// Wait a while for the exit event
			timeout := time.NewTimer(10 * time.Second)
			tick := time.NewTicker(100 * time.Millisecond)
		stop:
			for {
				select {
				case <-timeout.C:
					break stop
				case <-tick.C:
					ev, eerr = clnt.getContainerLastEvent(containerID)
					if eerr != nil {
						break stop
					}
					if ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
						break stop
					}
				}
			}
			timeout.Stop()
			tick.Stop()
		}

		// Get the exit status for this container; if we don't have
		// one, indicate an error
		ec := uint32(255)
		if eerr == nil && ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
			ec = ev.Status
		}
		clnt.setExited(containerID, ec)

		return nil
	}

	// container is still alive
	if clnt.liveRestore {
		if err := clnt.restore(cont, ev, attachStdio, options...); err != nil {
			logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
		}
		return nil
	}

	// Kill the container if liveRestore == false
	w := clnt.getOrCreateExitNotifier(containerID)
	clnt.lock(cont.Id)
	container := clnt.newContainer(cont.BundlePath)
	container.systemPid = systemPid(cont)
	clnt.appendContainer(container)
	clnt.unlock(cont.Id)

	container.discardFifos()

	if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil {
		logrus.Errorf("libcontainerd: error sending sigterm to %v: %v", containerID, err)
	}

	// Let the main loop handle the exit event
	clnt.remote.Unlock()

	if ev != nil && ev.Type == StatePause {
		// resume container, it depends on the main loop, so we do it after Unlock()
		logrus.Debugf("libcontainerd: %s was paused, resuming it so it can die", containerID)
		if err := clnt.Resume(containerID); err != nil {
			return fmt.Errorf("failed to resume container: %v", err)
		}
	}

	select {
	case <-time.After(10 * time.Second):
		if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil {
			logrus.Errorf("libcontainerd: error sending sigkill to %v: %v", containerID, err)
		}
		select {
		case <-time.After(2 * time.Second):
		case <-w.wait():
			// relock because of the defer
			clnt.remote.Lock()
			return nil
		}
	case <-w.wait():
		// relock because of the defer
		clnt.remote.Lock()
		return nil
	}
	// relock because of the defer
	clnt.remote.Lock()

	clnt.deleteContainer(containerID)

	return clnt.setExited(containerID, uint32(255))
}

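// CreateCheckpoint creates a checkpoint of the running container, optionally
// letting the container exit once the checkpoint has been written.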
func (clnt *client) CreateCheckpoint(containerID string, checkpointID string, checkpointDir string, exit bool) error {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)
	if _, err := clnt.getContainer(containerID); err != nil {
		return err
	}

	_, err := clnt.remote.apiClient.CreateCheckpoint(context.Background(), &containerd.CreateCheckpointRequest{
		Id: containerID,
		Checkpoint: &containerd.Checkpoint{
			Name:        checkpointID,
			Exit:        exit,
			Tcp:         true,
			UnixSockets: true,
			Shell:       false,
			EmptyNS:     []string{"network"},
		},
		CheckpointDir: checkpointDir,
	})
	return err
}

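// DeleteCheckpoint removes the named checkpoint of the given container.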
func (clnt *client) DeleteCheckpoint(containerID string, checkpointID string, checkpointDir string) error {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)
	if _, err := clnt.getContainer(containerID); err != nil {
		return err
	}

	_, err := clnt.remote.apiClient.DeleteCheckpoint(context.Background(), &containerd.DeleteCheckpointRequest{
		Id:            containerID,
		Name:          checkpointID,
		CheckpointDir: checkpointDir,
	})
	return err
}

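// ListCheckpoints returns the checkpoints stored for the given container in
// checkpointDir.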
func (clnt *client) ListCheckpoints(containerID string, checkpointDir string) (*Checkpoints, error) {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)
	if _, err := clnt.getContainer(containerID); err != nil {
		return nil, err
	}

	resp, err := clnt.remote.apiClient.ListCheckpoint(context.Background(), &containerd.ListCheckpointRequest{
		Id:            containerID,
		CheckpointDir: checkpointDir,
	})
	if err != nil {
		return nil, err
	}
	return (*Checkpoints)(resp), nil
}