| package gateway |
| |
| import ( |
| "context" |
| "fmt" |
| "runtime" |
| "sort" |
| "strings" |
| "sync" |
| |
| "github.com/moby/buildkit/cache" |
| "github.com/moby/buildkit/executor" |
| "github.com/moby/buildkit/frontend/gateway/client" |
| "github.com/moby/buildkit/session" |
| "github.com/moby/buildkit/snapshot" |
| "github.com/moby/buildkit/solver/llbsolver/mounts" |
| opspb "github.com/moby/buildkit/solver/pb" |
| "github.com/moby/buildkit/util/stack" |
| utilsystem "github.com/moby/buildkit/util/system" |
| "github.com/moby/buildkit/worker" |
| "github.com/pkg/errors" |
| "github.com/sirupsen/logrus" |
| "golang.org/x/sync/errgroup" |
| ) |
| |
| type NewContainerRequest struct { |
| ContainerID string |
| NetMode opspb.NetMode |
| Mounts []Mount |
| Platform *opspb.Platform |
| Constraints *opspb.WorkerConstraints |
| } |
| |
| // Mount used for the gateway.Container is nearly identical to the client.Mount |
| // except is has a RefProxy instead of Ref to allow for a common abstraction |
| // between gateway clients. |
| type Mount struct { |
| *opspb.Mount |
| WorkerRef *worker.WorkerRef |
| } |
| |
| func NewContainer(ctx context.Context, w worker.Worker, sm *session.Manager, g session.Group, req NewContainerRequest) (client.Container, error) { |
| ctx, cancel := context.WithCancel(ctx) |
| eg, ctx := errgroup.WithContext(ctx) |
| platform := opspb.Platform{ |
| OS: runtime.GOOS, |
| Architecture: runtime.GOARCH, |
| } |
| if req.Platform != nil { |
| platform = *req.Platform |
| } |
| ctr := &gatewayContainer{ |
| id: req.ContainerID, |
| netMode: req.NetMode, |
| platform: platform, |
| executor: w.Executor(), |
| errGroup: eg, |
| ctx: ctx, |
| cancel: cancel, |
| } |
| |
| var ( |
| mnts []*opspb.Mount |
| refs []*worker.WorkerRef |
| ) |
| for _, m := range req.Mounts { |
| mnts = append(mnts, m.Mount) |
| if m.WorkerRef != nil { |
| refs = append(refs, m.WorkerRef) |
| m.Mount.Input = opspb.InputIndex(len(refs) - 1) |
| } else { |
| m.Mount.Input = opspb.Empty |
| } |
| } |
| |
| name := fmt.Sprintf("container %s", req.ContainerID) |
| mm := mounts.NewMountManager(name, w.CacheManager(), sm, w.MetadataStore()) |
| p, err := PrepareMounts(ctx, mm, w.CacheManager(), g, mnts, refs, func(m *opspb.Mount, ref cache.ImmutableRef) (cache.MutableRef, error) { |
| cm := w.CacheManager() |
| if m.Input != opspb.Empty { |
| cm = refs[m.Input].Worker.CacheManager() |
| } |
| return cm.New(ctx, ref, g) |
| |
| }) |
| if err != nil { |
| for i := len(p.Actives) - 1; i >= 0; i-- { // call in LIFO order |
| p.Actives[i].Ref.Release(context.TODO()) |
| } |
| for _, o := range p.OutputRefs { |
| o.Ref.Release(context.TODO()) |
| } |
| return nil, err |
| } |
| ctr.rootFS = p.Root |
| ctr.mounts = p.Mounts |
| |
| for _, o := range p.OutputRefs { |
| o := o |
| ctr.cleanup = append(ctr.cleanup, func() error { |
| return o.Ref.Release(context.TODO()) |
| }) |
| } |
| for _, active := range p.Actives { |
| active := active |
| ctr.cleanup = append(ctr.cleanup, func() error { |
| return active.Ref.Release(context.TODO()) |
| }) |
| } |
| |
| return ctr, nil |
| } |
| |
| type PreparedMounts struct { |
| Root executor.Mount |
| ReadonlyRootFS bool |
| Mounts []executor.Mount |
| OutputRefs []MountRef |
| Actives []MountMutableRef |
| } |
| |
| type MountRef struct { |
| Ref cache.Ref |
| MountIndex int |
| } |
| |
| type MountMutableRef struct { |
| Ref cache.MutableRef |
| MountIndex int |
| } |
| |
| type MakeMutable func(m *opspb.Mount, ref cache.ImmutableRef) (cache.MutableRef, error) |
| |
| func PrepareMounts(ctx context.Context, mm *mounts.MountManager, cm cache.Manager, g session.Group, mnts []*opspb.Mount, refs []*worker.WorkerRef, makeMutable MakeMutable) (p PreparedMounts, err error) { |
| // loop over all mounts, fill in mounts, root and outputs |
| for i, m := range mnts { |
| var ( |
| mountable cache.Mountable |
| ref cache.ImmutableRef |
| ) |
| |
| if m.Dest == opspb.RootMount && m.MountType != opspb.MountType_BIND { |
| return p, errors.Errorf("invalid mount type %s for %s", m.MountType.String(), m.Dest) |
| } |
| |
| // if mount is based on input validate and load it |
| if m.Input != opspb.Empty { |
| if int(m.Input) >= len(refs) { |
| return p, errors.Errorf("missing input %d", m.Input) |
| } |
| ref = refs[int(m.Input)].ImmutableRef |
| mountable = ref |
| } |
| |
| switch m.MountType { |
| case opspb.MountType_BIND: |
| // if mount creates an output |
| if m.Output != opspb.SkipOutput { |
| // if it is readonly and not root then output is the input |
| if m.Readonly && ref != nil && m.Dest != opspb.RootMount { |
| p.OutputRefs = append(p.OutputRefs, MountRef{ |
| MountIndex: i, |
| Ref: ref.Clone(), |
| }) |
| } else { |
| // otherwise output and mount is the mutable child |
| active, err := makeMutable(m, ref) |
| if err != nil { |
| return p, err |
| } |
| mountable = active |
| p.OutputRefs = append(p.OutputRefs, MountRef{ |
| MountIndex: i, |
| Ref: active, |
| }) |
| } |
| } else if (!m.Readonly || ref == nil) && m.Dest != opspb.RootMount { |
| // this case is empty readonly scratch without output that is not really useful for anything but don't error |
| active, err := makeMutable(m, ref) |
| if err != nil { |
| return p, err |
| } |
| p.Actives = append(p.Actives, MountMutableRef{ |
| MountIndex: i, |
| Ref: active, |
| }) |
| mountable = active |
| } |
| |
| case opspb.MountType_CACHE: |
| active, err := mm.MountableCache(ctx, m, ref, g) |
| if err != nil { |
| return p, err |
| } |
| mountable = active |
| p.Actives = append(p.Actives, MountMutableRef{ |
| MountIndex: i, |
| Ref: active, |
| }) |
| if m.Output != opspb.SkipOutput && ref != nil { |
| p.OutputRefs = append(p.OutputRefs, MountRef{ |
| MountIndex: i, |
| Ref: ref.Clone(), |
| }) |
| } |
| |
| case opspb.MountType_TMPFS: |
| mountable = mm.MountableTmpFS() |
| case opspb.MountType_SECRET: |
| var err error |
| mountable, err = mm.MountableSecret(ctx, m, g) |
| if err != nil { |
| return p, err |
| } |
| if mountable == nil { |
| continue |
| } |
| case opspb.MountType_SSH: |
| var err error |
| mountable, err = mm.MountableSSH(ctx, m, g) |
| if err != nil { |
| return p, err |
| } |
| if mountable == nil { |
| continue |
| } |
| |
| default: |
| return p, errors.Errorf("mount type %s not implemented", m.MountType) |
| } |
| |
| // validate that there is a mount |
| if mountable == nil { |
| return p, errors.Errorf("mount %s has no input", m.Dest) |
| } |
| |
| // if dest is root we need mutable ref even if there is no output |
| if m.Dest == opspb.RootMount { |
| root := mountable |
| p.ReadonlyRootFS = m.Readonly |
| if m.Output == opspb.SkipOutput && p.ReadonlyRootFS { |
| active, err := makeMutable(m, ref) |
| if err != nil { |
| return p, err |
| } |
| p.Actives = append(p.Actives, MountMutableRef{ |
| MountIndex: i, |
| Ref: active, |
| }) |
| root = active |
| } |
| p.Root = mountWithSession(root, g) |
| } else { |
| mws := mountWithSession(mountable, g) |
| mws.Dest = m.Dest |
| mws.Readonly = m.Readonly |
| mws.Selector = m.Selector |
| p.Mounts = append(p.Mounts, mws) |
| } |
| } |
| |
| // sort mounts so parents are mounted first |
| sort.Slice(p.Mounts, func(i, j int) bool { |
| return p.Mounts[i].Dest < p.Mounts[j].Dest |
| }) |
| |
| return p, nil |
| } |
| |
| type gatewayContainer struct { |
| id string |
| netMode opspb.NetMode |
| platform opspb.Platform |
| rootFS executor.Mount |
| mounts []executor.Mount |
| executor executor.Executor |
| started bool |
| errGroup *errgroup.Group |
| mu sync.Mutex |
| cleanup []func() error |
| ctx context.Context |
| cancel func() |
| } |
| |
| func (gwCtr *gatewayContainer) Start(ctx context.Context, req client.StartRequest) (client.ContainerProcess, error) { |
| resize := make(chan executor.WinSize) |
| procInfo := executor.ProcessInfo{ |
| Meta: executor.Meta{ |
| Args: req.Args, |
| Env: req.Env, |
| User: req.User, |
| Cwd: req.Cwd, |
| Tty: req.Tty, |
| NetMode: gwCtr.netMode, |
| SecurityMode: req.SecurityMode, |
| }, |
| Stdin: req.Stdin, |
| Stdout: req.Stdout, |
| Stderr: req.Stderr, |
| Resize: resize, |
| } |
| if procInfo.Meta.Cwd == "" { |
| procInfo.Meta.Cwd = "/" |
| } |
| procInfo.Meta.Env = addDefaultEnvvar(procInfo.Meta.Env, "PATH", utilsystem.DefaultPathEnv(gwCtr.platform.OS)) |
| if req.Tty { |
| procInfo.Meta.Env = addDefaultEnvvar(procInfo.Meta.Env, "TERM", "xterm") |
| } |
| |
| // mark that we have started on the first call to execProcess for this |
| // container, so that future calls will call Exec rather than Run |
| gwCtr.mu.Lock() |
| started := gwCtr.started |
| gwCtr.started = true |
| gwCtr.mu.Unlock() |
| |
| eg, ctx := errgroup.WithContext(gwCtr.ctx) |
| gwProc := &gatewayContainerProcess{ |
| resize: resize, |
| errGroup: eg, |
| groupCtx: ctx, |
| } |
| |
| if !started { |
| startedCh := make(chan struct{}) |
| gwProc.errGroup.Go(func() error { |
| logrus.Debugf("Starting new container for %s with args: %q", gwCtr.id, procInfo.Meta.Args) |
| err := gwCtr.executor.Run(ctx, gwCtr.id, gwCtr.rootFS, gwCtr.mounts, procInfo, startedCh) |
| return stack.Enable(err) |
| }) |
| select { |
| case <-ctx.Done(): |
| case <-startedCh: |
| } |
| } else { |
| gwProc.errGroup.Go(func() error { |
| logrus.Debugf("Execing into container %s with args: %q", gwCtr.id, procInfo.Meta.Args) |
| err := gwCtr.executor.Exec(ctx, gwCtr.id, procInfo) |
| return stack.Enable(err) |
| }) |
| } |
| |
| gwCtr.errGroup.Go(gwProc.errGroup.Wait) |
| |
| return gwProc, nil |
| } |
| |
| func (gwCtr *gatewayContainer) Release(ctx context.Context) error { |
| gwCtr.cancel() |
| err1 := gwCtr.errGroup.Wait() |
| |
| var err2 error |
| for i := len(gwCtr.cleanup) - 1; i >= 0; i-- { // call in LIFO order |
| err := gwCtr.cleanup[i]() |
| if err2 == nil { |
| err2 = err |
| } |
| } |
| |
| if err1 != nil { |
| return stack.Enable(err1) |
| } |
| return stack.Enable(err2) |
| } |
| |
| type gatewayContainerProcess struct { |
| errGroup *errgroup.Group |
| groupCtx context.Context |
| resize chan<- executor.WinSize |
| mu sync.Mutex |
| } |
| |
| func (gwProc *gatewayContainerProcess) Wait() error { |
| err := stack.Enable(gwProc.errGroup.Wait()) |
| gwProc.mu.Lock() |
| defer gwProc.mu.Unlock() |
| close(gwProc.resize) |
| return err |
| } |
| |
| func (gwProc *gatewayContainerProcess) Resize(ctx context.Context, size client.WinSize) error { |
| gwProc.mu.Lock() |
| defer gwProc.mu.Unlock() |
| |
| // is the container done or should we proceed with sending event? |
| select { |
| case <-gwProc.groupCtx.Done(): |
| return nil |
| case <-ctx.Done(): |
| return nil |
| default: |
| } |
| |
| // now we select on contexts again in case p.resize blocks b/c |
| // container no longer reading from it. In that case when |
| // the errgroup finishes we want to unblock on the write |
| // and exit |
| select { |
| case <-gwProc.groupCtx.Done(): |
| case <-ctx.Done(): |
| case gwProc.resize <- executor.WinSize{Cols: size.Cols, Rows: size.Rows}: |
| } |
| return nil |
| } |
| |
// addDefaultEnvvar returns env with "k=v" appended unless env already holds an
// entry for k (an element starting with "k="), in which case env is returned
// unchanged.
//
// The appended result never shares spare capacity with env: the capacity is
// clipped before appending so the caller's backing array is not written to.
func addDefaultEnvvar(env []string, k, v string) []string {
	prefix := k + "="
	for _, e := range env {
		if strings.HasPrefix(e, prefix) {
			return env
		}
	}
	// Full slice expression forces append to allocate instead of reusing any
	// capacity beyond len(env) that the caller may still be using.
	return append(env[:len(env):len(env)], prefix+v)
}
| |
| func mountWithSession(m cache.Mountable, g session.Group) executor.Mount { |
| _, readonly := m.(cache.ImmutableRef) |
| return executor.Mount{ |
| Src: &mountable{m: m, g: g}, |
| Readonly: readonly, |
| } |
| } |
| |
| type mountable struct { |
| m cache.Mountable |
| g session.Group |
| } |
| |
| func (m *mountable) Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) { |
| return m.m.Mount(ctx, readonly, m.g) |
| } |