| // +build !windows |
| |
| package shim |
| |
| import ( |
| "fmt" |
| "os" |
| "sync" |
| |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| |
| "github.com/containerd/console" |
| eventsapi "github.com/containerd/containerd/api/services/events/v1" |
| "github.com/containerd/containerd/api/types/task" |
| "github.com/containerd/containerd/errdefs" |
| "github.com/containerd/containerd/events" |
| shimapi "github.com/containerd/containerd/linux/shim/v1" |
| "github.com/containerd/containerd/log" |
| "github.com/containerd/containerd/namespaces" |
| "github.com/containerd/containerd/reaper" |
| "github.com/containerd/containerd/runtime" |
| runc "github.com/containerd/go-runc" |
| google_protobuf "github.com/golang/protobuf/ptypes/empty" |
| "github.com/pkg/errors" |
| "github.com/sirupsen/logrus" |
| "golang.org/x/net/context" |
| ) |
| |
| var empty = &google_protobuf.Empty{} |
| |
| // RuncRoot is the path to the root runc state directory |
| const RuncRoot = "/run/containerd/runc" |
| |
| // NewService returns a new shim service that can be used via GRPC |
| func NewService(config Config, publisher events.Publisher) (*Service, error) { |
| if config.Namespace == "" { |
| return nil, fmt.Errorf("shim namespace cannot be empty") |
| } |
| context := namespaces.WithNamespace(context.Background(), config.Namespace) |
| context = log.WithLogger(context, logrus.WithFields(logrus.Fields{ |
| "namespace": config.Namespace, |
| "path": config.Path, |
| "pid": os.Getpid(), |
| })) |
| s := &Service{ |
| config: config, |
| context: context, |
| processes: make(map[string]process), |
| events: make(chan interface{}, 128), |
| ec: reaper.Default.Subscribe(), |
| } |
| go s.processExits() |
| if err := s.initPlatform(); err != nil { |
| return nil, errors.Wrap(err, "failed to initialized platform behavior") |
| } |
| go s.forward(publisher) |
| return s, nil |
| } |
| |
| // platform handles platform-specific behavior that may differs across |
| // platform implementations |
| type platform interface { |
| copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) |
| shutdownConsole(ctx context.Context, console console.Console) error |
| close() error |
| } |
| |
| // Service is the shim implementation of a remote shim over GRPC |
| type Service struct { |
| mu sync.Mutex |
| |
| config Config |
| context context.Context |
| processes map[string]process |
| events chan interface{} |
| platform platform |
| ec chan runc.Exit |
| |
| // Filled by Create() |
| id string |
| bundle string |
| } |
| |
| // Create a new initial process and container with the underlying OCI runtime |
| func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| process, err := s.newInitProcess(ctx, r) |
| if err != nil { |
| return nil, errdefs.ToGRPC(err) |
| } |
| // save the main task id and bundle to the shim for additional requests |
| s.id = r.ID |
| s.bundle = r.Bundle |
| pid := process.Pid() |
| s.processes[r.ID] = process |
| s.events <- &eventsapi.TaskCreate{ |
| ContainerID: r.ID, |
| Bundle: r.Bundle, |
| Rootfs: r.Rootfs, |
| IO: &eventsapi.TaskIO{ |
| Stdin: r.Stdin, |
| Stdout: r.Stdout, |
| Stderr: r.Stderr, |
| Terminal: r.Terminal, |
| }, |
| Checkpoint: r.Checkpoint, |
| Pid: uint32(pid), |
| } |
| return &shimapi.CreateTaskResponse{ |
| Pid: uint32(pid), |
| }, nil |
| } |
| |
| // Start a process |
| func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.StartResponse, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| p := s.processes[r.ID] |
| if p == nil { |
| return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", r.ID) |
| } |
| if err := p.Start(ctx); err != nil { |
| return nil, err |
| } |
| if r.ID == s.id { |
| s.events <- &eventsapi.TaskStart{ |
| ContainerID: s.id, |
| Pid: uint32(p.Pid()), |
| } |
| } else { |
| pid := p.Pid() |
| s.events <- &eventsapi.TaskExecStarted{ |
| ContainerID: s.id, |
| ExecID: r.ID, |
| Pid: uint32(pid), |
| } |
| } |
| return &shimapi.StartResponse{ |
| ID: p.ID(), |
| Pid: uint32(p.Pid()), |
| }, nil |
| } |
| |
| // Delete the initial process and container |
| func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimapi.DeleteResponse, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| p := s.processes[s.id] |
| if p == nil { |
| return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") |
| } |
| |
| if err := p.Delete(ctx); err != nil { |
| return nil, err |
| } |
| delete(s.processes, s.id) |
| s.platform.close() |
| s.events <- &eventsapi.TaskDelete{ |
| ContainerID: s.id, |
| ExitStatus: uint32(p.ExitStatus()), |
| ExitedAt: p.ExitedAt(), |
| Pid: uint32(p.Pid()), |
| } |
| return &shimapi.DeleteResponse{ |
| ExitStatus: uint32(p.ExitStatus()), |
| ExitedAt: p.ExitedAt(), |
| Pid: uint32(p.Pid()), |
| }, nil |
| } |
| |
| // DeleteProcess deletes an exec'd process |
| func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessRequest) (*shimapi.DeleteResponse, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| if r.ID == s.id { |
| return nil, grpc.Errorf(codes.InvalidArgument, "cannot delete init process with DeleteProcess") |
| } |
| p := s.processes[r.ID] |
| if p == nil { |
| return nil, errors.Wrapf(errdefs.ErrNotFound, "process %s", r.ID) |
| } |
| if err := p.Delete(ctx); err != nil { |
| return nil, err |
| } |
| delete(s.processes, r.ID) |
| return &shimapi.DeleteResponse{ |
| ExitStatus: uint32(p.ExitStatus()), |
| ExitedAt: p.ExitedAt(), |
| Pid: uint32(p.Pid()), |
| }, nil |
| } |
| |
| // Exec an additional process inside the container |
| func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*google_protobuf.Empty, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| |
| if p := s.processes[r.ID]; p != nil { |
| return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ID) |
| } |
| |
| p := s.processes[s.id] |
| if p == nil { |
| return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") |
| } |
| |
| process, err := newExecProcess(ctx, s.config.Path, r, p.(*initProcess), r.ID) |
| if err != nil { |
| return nil, errdefs.ToGRPC(err) |
| } |
| s.processes[r.ID] = process |
| |
| s.events <- &eventsapi.TaskExecAdded{ |
| ContainerID: s.id, |
| ExecID: r.ID, |
| } |
| return empty, nil |
| } |
| |
| // ResizePty of a process |
| func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*google_protobuf.Empty, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| if r.ID == "" { |
| return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "id not provided") |
| } |
| ws := console.WinSize{ |
| Width: uint16(r.Width), |
| Height: uint16(r.Height), |
| } |
| p := s.processes[r.ID] |
| if p == nil { |
| return nil, errors.Errorf("process does not exist %s", r.ID) |
| } |
| if err := p.Resize(ws); err != nil { |
| return nil, errdefs.ToGRPC(err) |
| } |
| return empty, nil |
| } |
| |
| // State returns runtime state information for a process |
| func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| p := s.processes[r.ID] |
| if p == nil { |
| return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s not found", r.ID) |
| } |
| st, err := p.Status(ctx) |
| if err != nil { |
| return nil, err |
| } |
| status := task.StatusUnknown |
| switch st { |
| case "created": |
| status = task.StatusCreated |
| case "running": |
| status = task.StatusRunning |
| case "stopped": |
| status = task.StatusStopped |
| case "paused": |
| status = task.StatusPaused |
| case "pausing": |
| status = task.StatusPausing |
| } |
| sio := p.Stdio() |
| return &shimapi.StateResponse{ |
| ID: p.ID(), |
| Bundle: s.bundle, |
| Pid: uint32(p.Pid()), |
| Status: status, |
| Stdin: sio.stdin, |
| Stdout: sio.stdout, |
| Stderr: sio.stderr, |
| Terminal: sio.terminal, |
| ExitStatus: uint32(p.ExitStatus()), |
| ExitedAt: p.ExitedAt(), |
| }, nil |
| } |
| |
| // Pause the container |
| func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_protobuf.Empty, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| p := s.processes[s.id] |
| if p == nil { |
| return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") |
| } |
| if err := p.(*initProcess).Pause(ctx); err != nil { |
| return nil, err |
| } |
| s.events <- &eventsapi.TaskPaused{ |
| ContainerID: s.id, |
| } |
| return empty, nil |
| } |
| |
| // Resume the container |
| func (s *Service) Resume(ctx context.Context, r *google_protobuf.Empty) (*google_protobuf.Empty, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| p := s.processes[s.id] |
| if p == nil { |
| return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") |
| } |
| if err := p.(*initProcess).Resume(ctx); err != nil { |
| return nil, err |
| } |
| s.events <- &eventsapi.TaskResumed{ |
| ContainerID: s.id, |
| } |
| return empty, nil |
| } |
| |
| // Kill a process with the provided signal |
| func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*google_protobuf.Empty, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| if r.ID == "" { |
| p := s.processes[s.id] |
| if p == nil { |
| return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") |
| } |
| if err := p.Kill(ctx, r.Signal, r.All); err != nil { |
| return nil, errdefs.ToGRPC(err) |
| } |
| return empty, nil |
| } |
| |
| p := s.processes[r.ID] |
| if p == nil { |
| return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s not found", r.ID) |
| } |
| if err := p.Kill(ctx, r.Signal, r.All); err != nil { |
| return nil, errdefs.ToGRPC(err) |
| } |
| return empty, nil |
| } |
| |
| // ListPids returns all pids inside the container |
| func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*shimapi.ListPidsResponse, error) { |
| pids, err := s.getContainerPids(ctx, r.ID) |
| if err != nil { |
| return nil, errdefs.ToGRPC(err) |
| } |
| var processes []*task.ProcessInfo |
| for _, pid := range pids { |
| processes = append(processes, &task.ProcessInfo{ |
| Pid: pid, |
| }) |
| } |
| return &shimapi.ListPidsResponse{ |
| Processes: processes, |
| }, nil |
| } |
| |
| // CloseIO of a process |
| func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*google_protobuf.Empty, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| p := s.processes[r.ID] |
| if p == nil { |
| return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process does not exist %s", r.ID) |
| } |
| if stdin := p.Stdin(); stdin != nil { |
| if err := stdin.Close(); err != nil { |
| return nil, errors.Wrap(err, "close stdin") |
| } |
| } |
| return empty, nil |
| } |
| |
| // Checkpoint the container |
| func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) (*google_protobuf.Empty, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| p := s.processes[s.id] |
| if p == nil { |
| return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") |
| } |
| if err := p.(*initProcess).Checkpoint(ctx, r); err != nil { |
| return nil, errdefs.ToGRPC(err) |
| } |
| s.events <- &eventsapi.TaskCheckpointed{ |
| ContainerID: s.id, |
| } |
| return empty, nil |
| } |
| |
| // ShimInfo returns shim information such as the shim's pid |
| func (s *Service) ShimInfo(ctx context.Context, r *google_protobuf.Empty) (*shimapi.ShimInfoResponse, error) { |
| return &shimapi.ShimInfoResponse{ |
| ShimPid: uint32(os.Getpid()), |
| }, nil |
| } |
| |
| // Update a running container |
| func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*google_protobuf.Empty, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| p := s.processes[s.id] |
| if p == nil { |
| return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") |
| } |
| if err := p.(*initProcess).Update(ctx, r); err != nil { |
| return nil, errdefs.ToGRPC(err) |
| } |
| return empty, nil |
| } |
| |
| // Wait for a process to exit |
| func (s *Service) Wait(ctx context.Context, r *shimapi.WaitRequest) (*shimapi.WaitResponse, error) { |
| s.mu.Lock() |
| p := s.processes[r.ID] |
| s.mu.Unlock() |
| if p == nil { |
| return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") |
| } |
| p.Wait() |
| |
| return &shimapi.WaitResponse{ |
| ExitStatus: uint32(p.ExitStatus()), |
| ExitedAt: p.ExitedAt(), |
| }, nil |
| } |
| |
| func (s *Service) processExits() { |
| for e := range s.ec { |
| s.checkProcesses(e) |
| } |
| } |
| |
| func (s *Service) checkProcesses(e runc.Exit) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| for _, p := range s.processes { |
| if p.Pid() == e.Pid { |
| if ip, ok := p.(*initProcess); ok { |
| // Ensure all children are killed |
| if err := ip.killAll(s.context); err != nil { |
| log.G(s.context).WithError(err).WithField("id", ip.ID()). |
| Error("failed to kill init's children") |
| } |
| } |
| p.SetExited(e.Status) |
| s.events <- &eventsapi.TaskExit{ |
| ContainerID: s.id, |
| ID: p.ID(), |
| Pid: uint32(e.Pid), |
| ExitStatus: uint32(e.Status), |
| ExitedAt: p.ExitedAt(), |
| } |
| return |
| } |
| } |
| } |
| |
| func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| p := s.processes[s.id] |
| if p == nil { |
| return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created") |
| } |
| |
| ps, err := p.(*initProcess).runtime.Ps(ctx, id) |
| if err != nil { |
| return nil, err |
| } |
| pids := make([]uint32, 0, len(ps)) |
| for _, pid := range ps { |
| pids = append(pids, uint32(pid)) |
| } |
| return pids, nil |
| } |
| |
| func (s *Service) forward(publisher events.Publisher) { |
| for e := range s.events { |
| if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil { |
| logrus.WithError(err).Error("post event") |
| } |
| } |
| } |
| |
| func getTopic(ctx context.Context, e interface{}) string { |
| switch e.(type) { |
| case *eventsapi.TaskCreate: |
| return runtime.TaskCreateEventTopic |
| case *eventsapi.TaskStart: |
| return runtime.TaskStartEventTopic |
| case *eventsapi.TaskOOM: |
| return runtime.TaskOOMEventTopic |
| case *eventsapi.TaskExit: |
| return runtime.TaskExitEventTopic |
| case *eventsapi.TaskDelete: |
| return runtime.TaskDeleteEventTopic |
| case *eventsapi.TaskExecAdded: |
| return runtime.TaskExecAddedEventTopic |
| case *eventsapi.TaskExecStarted: |
| return runtime.TaskExecStartedEventTopic |
| case *eventsapi.TaskPaused: |
| return runtime.TaskPausedEventTopic |
| case *eventsapi.TaskResumed: |
| return runtime.TaskResumedEventTopic |
| case *eventsapi.TaskCheckpointed: |
| return runtime.TaskCheckpointedEventTopic |
| default: |
| logrus.Warnf("no topic for type %#v", e) |
| } |
| return runtime.TaskUnknownTopic |
| } |