| package buildkit |
| |
| import ( |
| "context" |
| "encoding/json" |
| "io" |
| "strings" |
| "sync" |
| "time" |
| |
| "github.com/containerd/containerd/content" |
| "github.com/containerd/containerd/platforms" |
| "github.com/docker/docker/api/types" |
| "github.com/docker/docker/api/types/backend" |
| "github.com/docker/docker/builder" |
| "github.com/docker/docker/daemon/images" |
| "github.com/docker/docker/pkg/jsonmessage" |
| controlapi "github.com/moby/buildkit/api/services/control" |
| "github.com/moby/buildkit/control" |
| "github.com/moby/buildkit/identity" |
| "github.com/moby/buildkit/session" |
| "github.com/moby/buildkit/util/tracing" |
| "github.com/pkg/errors" |
| "golang.org/x/sync/errgroup" |
| grpcmetadata "google.golang.org/grpc/metadata" |
| ) |
| |
| // Opt is option struct required for creating the builder |
| type Opt struct { |
| SessionManager *session.Manager |
| Root string |
| Dist images.DistributionServices |
| } |
| |
| // Builder can build using BuildKit backend |
| type Builder struct { |
| controller *control.Controller |
| reqBodyHandler *reqBodyHandler |
| |
| mu sync.Mutex |
| jobs map[string]*buildJob |
| } |
| |
| // New creates a new builder |
| func New(opt Opt) (*Builder, error) { |
| reqHandler := newReqBodyHandler(tracing.DefaultTransport) |
| |
| c, err := newController(reqHandler, opt) |
| if err != nil { |
| return nil, err |
| } |
| b := &Builder{ |
| controller: c, |
| reqBodyHandler: reqHandler, |
| jobs: map[string]*buildJob{}, |
| } |
| return b, nil |
| } |
| |
| // Cancel cancels a build using ID |
| func (b *Builder) Cancel(ctx context.Context, id string) error { |
| b.mu.Lock() |
| if j, ok := b.jobs[id]; ok && j.cancel != nil { |
| j.cancel() |
| } |
| b.mu.Unlock() |
| return nil |
| } |
| |
| // DiskUsage returns a report about space used by build cache |
| func (b *Builder) DiskUsage(ctx context.Context) ([]*types.BuildCache, error) { |
| duResp, err := b.controller.DiskUsage(ctx, &controlapi.DiskUsageRequest{}) |
| if err != nil { |
| return nil, err |
| } |
| |
| var items []*types.BuildCache |
| for _, r := range duResp.Record { |
| items = append(items, &types.BuildCache{ |
| ID: r.ID, |
| Mutable: r.Mutable, |
| InUse: r.InUse, |
| Size: r.Size_, |
| |
| CreatedAt: r.CreatedAt, |
| LastUsedAt: r.LastUsedAt, |
| UsageCount: int(r.UsageCount), |
| Parent: r.Parent, |
| Description: r.Description, |
| }) |
| } |
| return items, nil |
| } |
| |
| // Prune clears all reclaimable build cache |
| func (b *Builder) Prune(ctx context.Context) (int64, error) { |
| ch := make(chan *controlapi.UsageRecord) |
| |
| eg, ctx := errgroup.WithContext(ctx) |
| |
| eg.Go(func() error { |
| defer close(ch) |
| return b.controller.Prune(&controlapi.PruneRequest{}, &pruneProxy{ |
| streamProxy: streamProxy{ctx: ctx}, |
| ch: ch, |
| }) |
| }) |
| |
| var size int64 |
| eg.Go(func() error { |
| for r := range ch { |
| size += r.Size_ |
| } |
| return nil |
| }) |
| |
| if err := eg.Wait(); err != nil { |
| return 0, err |
| } |
| |
| return size, nil |
| } |
| |
| // Build executes a build request |
| func (b *Builder) Build(ctx context.Context, opt backend.BuildConfig) (*builder.Result, error) { |
| var rc = opt.Source |
| |
| if buildID := opt.Options.BuildID; buildID != "" { |
| b.mu.Lock() |
| |
| upload := false |
| if strings.HasPrefix(buildID, "upload-request:") { |
| upload = true |
| buildID = strings.TrimPrefix(buildID, "upload-request:") |
| } |
| |
| if _, ok := b.jobs[buildID]; !ok { |
| b.jobs[buildID] = newBuildJob() |
| } |
| j := b.jobs[buildID] |
| var cancel func() |
| ctx, cancel = context.WithCancel(ctx) |
| j.cancel = cancel |
| b.mu.Unlock() |
| |
| if upload { |
| ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) |
| defer cancel() |
| err := j.SetUpload(ctx2, rc) |
| return nil, err |
| } |
| |
| if remoteContext := opt.Options.RemoteContext; remoteContext == "upload-request" { |
| ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) |
| defer cancel() |
| var err error |
| rc, err = j.WaitUpload(ctx2) |
| if err != nil { |
| return nil, err |
| } |
| opt.Options.RemoteContext = "" |
| } |
| |
| defer func() { |
| delete(b.jobs, buildID) |
| }() |
| } |
| |
| var out builder.Result |
| |
| id := identity.NewID() |
| |
| frontendAttrs := map[string]string{} |
| |
| if opt.Options.Target != "" { |
| frontendAttrs["target"] = opt.Options.Target |
| } |
| |
| if opt.Options.Dockerfile != "" && opt.Options.Dockerfile != "." { |
| frontendAttrs["filename"] = opt.Options.Dockerfile |
| } |
| |
| if opt.Options.RemoteContext != "" { |
| if opt.Options.RemoteContext != "client-session" { |
| frontendAttrs["context"] = opt.Options.RemoteContext |
| } |
| } else { |
| url, cancel := b.reqBodyHandler.newRequest(rc) |
| defer cancel() |
| frontendAttrs["context"] = url |
| } |
| |
| cacheFrom := append([]string{}, opt.Options.CacheFrom...) |
| |
| frontendAttrs["cache-from"] = strings.Join(cacheFrom, ",") |
| |
| for k, v := range opt.Options.BuildArgs { |
| if v == nil { |
| continue |
| } |
| frontendAttrs["build-arg:"+k] = *v |
| } |
| |
| for k, v := range opt.Options.Labels { |
| frontendAttrs["label:"+k] = v |
| } |
| |
| if opt.Options.NoCache { |
| frontendAttrs["no-cache"] = "" |
| } |
| |
| if opt.Options.Platform != nil { |
| frontendAttrs["platform"] = platforms.Format(*opt.Options.Platform) |
| } |
| |
| exporterAttrs := map[string]string{} |
| |
| if len(opt.Options.Tags) > 0 { |
| exporterAttrs["name"] = strings.Join(opt.Options.Tags, ",") |
| } |
| |
| req := &controlapi.SolveRequest{ |
| Ref: id, |
| Exporter: "moby", |
| ExporterAttrs: exporterAttrs, |
| Frontend: "dockerfile.v0", |
| FrontendAttrs: frontendAttrs, |
| Session: opt.Options.SessionID, |
| } |
| |
| eg, ctx := errgroup.WithContext(ctx) |
| |
| eg.Go(func() error { |
| resp, err := b.controller.Solve(ctx, req) |
| if err != nil { |
| return err |
| } |
| id, ok := resp.ExporterResponse["containerimage.digest"] |
| if !ok { |
| return errors.Errorf("missing image id") |
| } |
| out.ImageID = id |
| return nil |
| }) |
| |
| ch := make(chan *controlapi.StatusResponse) |
| |
| eg.Go(func() error { |
| defer close(ch) |
| return b.controller.Status(&controlapi.StatusRequest{ |
| Ref: id, |
| }, &statusProxy{streamProxy: streamProxy{ctx: ctx}, ch: ch}) |
| }) |
| |
| eg.Go(func() error { |
| for sr := range ch { |
| dt, err := sr.Marshal() |
| if err != nil { |
| return err |
| } |
| |
| auxJSONBytes, err := json.Marshal(dt) |
| if err != nil { |
| return err |
| } |
| auxJSON := new(json.RawMessage) |
| *auxJSON = auxJSONBytes |
| msgJSON, err := json.Marshal(&jsonmessage.JSONMessage{ID: "moby.buildkit.trace", Aux: auxJSON}) |
| if err != nil { |
| return err |
| } |
| msgJSON = append(msgJSON, []byte("\r\n")...) |
| n, err := opt.ProgressWriter.Output.Write(msgJSON) |
| if err != nil { |
| return err |
| } |
| if n != len(msgJSON) { |
| return io.ErrShortWrite |
| } |
| } |
| return nil |
| }) |
| |
| if err := eg.Wait(); err != nil { |
| return nil, err |
| } |
| |
| return &out, nil |
| } |
| |
| type streamProxy struct { |
| ctx context.Context |
| } |
| |
| func (sp *streamProxy) SetHeader(_ grpcmetadata.MD) error { |
| return nil |
| } |
| |
| func (sp *streamProxy) SendHeader(_ grpcmetadata.MD) error { |
| return nil |
| } |
| |
| func (sp *streamProxy) SetTrailer(_ grpcmetadata.MD) { |
| } |
| |
| func (sp *streamProxy) Context() context.Context { |
| return sp.ctx |
| } |
| func (sp *streamProxy) RecvMsg(m interface{}) error { |
| return io.EOF |
| } |
| |
| type statusProxy struct { |
| streamProxy |
| ch chan *controlapi.StatusResponse |
| } |
| |
| func (sp *statusProxy) Send(resp *controlapi.StatusResponse) error { |
| return sp.SendMsg(resp) |
| } |
| func (sp *statusProxy) SendMsg(m interface{}) error { |
| if sr, ok := m.(*controlapi.StatusResponse); ok { |
| sp.ch <- sr |
| } |
| return nil |
| } |
| |
| type pruneProxy struct { |
| streamProxy |
| ch chan *controlapi.UsageRecord |
| } |
| |
| func (sp *pruneProxy) Send(resp *controlapi.UsageRecord) error { |
| return sp.SendMsg(resp) |
| } |
| func (sp *pruneProxy) SendMsg(m interface{}) error { |
| if sr, ok := m.(*controlapi.UsageRecord); ok { |
| sp.ch <- sr |
| } |
| return nil |
| } |
| |
| type contentStoreNoLabels struct { |
| content.Store |
| } |
| |
| func (c *contentStoreNoLabels) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { |
| return content.Info{}, nil |
| } |
| |
| type wrapRC struct { |
| io.ReadCloser |
| once sync.Once |
| err error |
| waitCh chan struct{} |
| } |
| |
| func (w *wrapRC) Read(b []byte) (int, error) { |
| n, err := w.ReadCloser.Read(b) |
| if err != nil { |
| e := err |
| if e == io.EOF { |
| e = nil |
| } |
| w.close(e) |
| } |
| return n, err |
| } |
| |
| func (w *wrapRC) Close() error { |
| err := w.ReadCloser.Close() |
| w.close(err) |
| return err |
| } |
| |
| func (w *wrapRC) close(err error) { |
| w.once.Do(func() { |
| w.err = err |
| close(w.waitCh) |
| }) |
| } |
| |
| func (w *wrapRC) wait() error { |
| <-w.waitCh |
| return w.err |
| } |
| |
| type buildJob struct { |
| cancel func() |
| waitCh chan func(io.ReadCloser) error |
| } |
| |
| func newBuildJob() *buildJob { |
| return &buildJob{waitCh: make(chan func(io.ReadCloser) error)} |
| } |
| |
| func (j *buildJob) WaitUpload(ctx context.Context) (io.ReadCloser, error) { |
| done := make(chan struct{}) |
| |
| var upload io.ReadCloser |
| fn := func(rc io.ReadCloser) error { |
| w := &wrapRC{ReadCloser: rc, waitCh: make(chan struct{})} |
| upload = w |
| close(done) |
| return w.wait() |
| } |
| |
| select { |
| case <-ctx.Done(): |
| return nil, ctx.Err() |
| case j.waitCh <- fn: |
| <-done |
| return upload, nil |
| } |
| } |
| |
| func (j *buildJob) SetUpload(ctx context.Context, rc io.ReadCloser) error { |
| select { |
| case <-ctx.Done(): |
| return ctx.Err() |
| case fn := <-j.waitCh: |
| return fn(rc) |
| } |
| } |