package buildkit

import (
	"context"
	"encoding/json"
	"io"
	"strings"
	"sync"
	"time"

	"github.com/containerd/containerd/content"
	"github.com/containerd/containerd/platforms"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/backend"
	"github.com/docker/docker/builder"
	"github.com/docker/docker/daemon/images"
	"github.com/docker/docker/pkg/jsonmessage"
	controlapi "github.com/moby/buildkit/api/services/control"
	"github.com/moby/buildkit/control"
	"github.com/moby/buildkit/identity"
	"github.com/moby/buildkit/session"
	"github.com/moby/buildkit/util/tracing"
	"github.com/pkg/errors"
	"golang.org/x/sync/errgroup"
	grpcmetadata "google.golang.org/grpc/metadata"
)

// Opt is the option struct required for creating the builder
type Opt struct {
	SessionManager *session.Manager
	Root           string
	Dist           images.DistributionServices
}

// Builder can build using BuildKit backend
type Builder struct {
	controller     *control.Controller
	reqBodyHandler *reqBodyHandler

	mu   sync.Mutex
	jobs map[string]*buildJob
}

// New creates a new builder
func New(opt Opt) (*Builder, error) {
	reqHandler := newReqBodyHandler(tracing.DefaultTransport)

	c, err := newController(reqHandler, opt)
	if err != nil {
		return nil, err
	}
	b := &Builder{
		controller:     c,
		reqBodyHandler: reqHandler,
		jobs:           map[string]*buildJob{},
	}
	return b, nil
}

// Cancel cancels a build by its ID
func (b *Builder) Cancel(ctx context.Context, id string) error {
	b.mu.Lock()
	if j, ok := b.jobs[id]; ok && j.cancel != nil {
		j.cancel()
	}
	b.mu.Unlock()
	return nil
}

// DiskUsage returns a report about space used by build cache
func (b *Builder) DiskUsage(ctx context.Context) ([]*types.BuildCache, error) {
	duResp, err := b.controller.DiskUsage(ctx, &controlapi.DiskUsageRequest{})
	if err != nil {
		return nil, err
	}

	var items []*types.BuildCache
	for _, r := range duResp.Record {
		items = append(items, &types.BuildCache{
			ID:          r.ID,
			Mutable:     r.Mutable,
			InUse:       r.InUse,
			Size:        r.Size_,
			CreatedAt:   r.CreatedAt,
			LastUsedAt:  r.LastUsedAt,
			UsageCount:  int(r.UsageCount),
			Parent:      r.Parent,
			Description: r.Description,
		})
	}
	return items, nil
}

// Prune clears all reclaimable build cache
func (b *Builder) Prune(ctx context.Context) (int64, error) {
	ch := make(chan *controlapi.UsageRecord)

	eg, ctx := errgroup.WithContext(ctx)
	eg.Go(func() error {
		defer close(ch)
		return b.controller.Prune(&controlapi.PruneRequest{}, &pruneProxy{
			streamProxy: streamProxy{ctx: ctx},
			ch:          ch,
		})
	})

	var size int64
	eg.Go(func() error {
		for r := range ch {
			size += r.Size_
		}
		return nil
	})

	if err := eg.Wait(); err != nil {
		return 0, err
	}

	return size, nil
}

// Build executes a build request
func (b *Builder) Build(ctx context.Context, opt backend.BuildConfig) (*builder.Result, error) {
	var rc = opt.Source
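
	// The client may split a build into two requests matched by BuildID: an
	// "upload-request" that carries only the build context, and the build
	// itself, which sets RemoteContext to "upload-request" and waits for that
	// context to arrive. The block below pairs the two through b.jobs.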
	if buildID := opt.Options.BuildID; buildID != "" {
		b.mu.Lock()

		upload := false
		if strings.HasPrefix(buildID, "upload-request:") {
			upload = true
			buildID = strings.TrimPrefix(buildID, "upload-request:")
		}

		if _, ok := b.jobs[buildID]; !ok {
			b.jobs[buildID] = newBuildJob()
		}
		j := b.jobs[buildID]
		var cancel func()
		ctx, cancel = context.WithCancel(ctx)
		j.cancel = cancel
		b.mu.Unlock()

		if upload {
			ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
			defer cancel()
			err := j.SetUpload(ctx2, rc)
			return nil, err
		}

		if remoteContext := opt.Options.RemoteContext; remoteContext == "upload-request" {
			ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
			defer cancel()
			var err error
			rc, err = j.WaitUpload(ctx2)
			if err != nil {
				return nil, err
			}
			opt.Options.RemoteContext = ""
		}

		defer func() {
			b.mu.Lock()
			delete(b.jobs, buildID) // hold b.mu: Cancel and concurrent Build calls access this map
			b.mu.Unlock()
		}()
	}

	var out builder.Result

	id := identity.NewID()
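
	// frontendAttrs translates the API build options into options understood
	// by the dockerfile.v0 frontend that solves the build.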
	frontendAttrs := map[string]string{}

	if opt.Options.Target != "" {
		frontendAttrs["target"] = opt.Options.Target
	}

	if opt.Options.Dockerfile != "" && opt.Options.Dockerfile != "." {
		frontendAttrs["filename"] = opt.Options.Dockerfile
	}

	if opt.Options.RemoteContext != "" {
		if opt.Options.RemoteContext != "client-session" {
			frontendAttrs["context"] = opt.Options.RemoteContext
		}
	} else {
		url, cancel := b.reqBodyHandler.newRequest(rc)
		defer cancel()
		frontendAttrs["context"] = url
	}

	cacheFrom := append([]string{}, opt.Options.CacheFrom...)
	frontendAttrs["cache-from"] = strings.Join(cacheFrom, ",")

	for k, v := range opt.Options.BuildArgs {
		if v == nil {
			continue
		}
		frontendAttrs["build-arg:"+k] = *v
	}

	for k, v := range opt.Options.Labels {
		frontendAttrs["label:"+k] = v
	}

	if opt.Options.NoCache {
		frontendAttrs["no-cache"] = ""
	}

	if opt.Options.Platform != nil {
		frontendAttrs["platform"] = platforms.Format(*opt.Options.Platform)
	}

	exporterAttrs := map[string]string{}

	if len(opt.Options.Tags) > 0 {
		exporterAttrs["name"] = strings.Join(opt.Options.Tags, ",")
	}

	req := &controlapi.SolveRequest{
		Ref:           id,
		Exporter:      "moby",
		ExporterAttrs: exporterAttrs,
		Frontend:      "dockerfile.v0",
		FrontendAttrs: frontendAttrs,
		Session:       opt.Options.SessionID,
	}
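
	// Run the solve, the controller status stream, and the forwarding of
	// status records to the client concurrently; the first error cancels the
	// shared context and fails the build.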
	eg, ctx := errgroup.WithContext(ctx)

	eg.Go(func() error {
		resp, err := b.controller.Solve(ctx, req)
		if err != nil {
			return err
		}
		id, ok := resp.ExporterResponse["containerimage.digest"]
		if !ok {
			return errors.Errorf("missing image id")
		}
		out.ImageID = id
		return nil
	})

	ch := make(chan *controlapi.StatusResponse)

	eg.Go(func() error {
		defer close(ch)
		return b.controller.Status(&controlapi.StatusRequest{
			Ref: id,
		}, &statusProxy{streamProxy: streamProxy{ctx: ctx}, ch: ch})
	})

	eg.Go(func() error {
		for sr := range ch {
			dt, err := sr.Marshal()
			if err != nil {
				return err
			}

			auxJSONBytes, err := json.Marshal(dt)
			if err != nil {
				return err
			}
			auxJSON := new(json.RawMessage)
			*auxJSON = auxJSONBytes
			msgJSON, err := json.Marshal(&jsonmessage.JSONMessage{ID: "moby.buildkit.trace", Aux: auxJSON})
			if err != nil {
				return err
			}
			msgJSON = append(msgJSON, []byte("\r\n")...)
			n, err := opt.ProgressWriter.Output.Write(msgJSON)
			if err != nil {
				return err
			}
			if n != len(msgJSON) {
				return io.ErrShortWrite
			}
		}
		return nil
	})

	if err := eg.Wait(); err != nil {
		return nil, err
	}

	return &out, nil
}
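
// streamProxy supplies the grpc.ServerStream plumbing (headers, trailer,
// context, RecvMsg) shared by the proxies below, so the controller's
// streaming APIs (Status, Prune) can be driven in-process without a real
// gRPC connection.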
type streamProxy struct {
	ctx context.Context
}

func (sp *streamProxy) SetHeader(_ grpcmetadata.MD) error {
	return nil
}

func (sp *streamProxy) SendHeader(_ grpcmetadata.MD) error {
	return nil
}

func (sp *streamProxy) SetTrailer(_ grpcmetadata.MD) {
}

func (sp *streamProxy) Context() context.Context {
	return sp.ctx
}

func (sp *streamProxy) RecvMsg(m interface{}) error {
	return io.EOF
}
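
// statusProxy implements controlapi.Control_StatusServer on top of
// streamProxy, forwarding every status response it is asked to send onto ch.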
type statusProxy struct {
	streamProxy
	ch chan *controlapi.StatusResponse
}

func (sp *statusProxy) Send(resp *controlapi.StatusResponse) error {
	return sp.SendMsg(resp)
}

func (sp *statusProxy) SendMsg(m interface{}) error {
	if sr, ok := m.(*controlapi.StatusResponse); ok {
		sp.ch <- sr
	}
	return nil
}
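
// pruneProxy implements controlapi.Control_PruneServer on top of streamProxy,
// forwarding every usage record it is asked to send onto ch.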
type pruneProxy struct {
	streamProxy
	ch chan *controlapi.UsageRecord
}

func (sp *pruneProxy) Send(resp *controlapi.UsageRecord) error {
	return sp.SendMsg(resp)
}

func (sp *pruneProxy) SendMsg(m interface{}) error {
	if sr, ok := m.(*controlapi.UsageRecord); ok {
		sp.ch <- sr
	}
	return nil
}
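
// contentStoreNoLabels wraps a content.Store and turns Update into a no-op,
// so label updates made through this wrapper are never persisted to the
// underlying store.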
type contentStoreNoLabels struct {
	content.Store
}

func (c *contentStoreNoLabels) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
	return content.Info{}, nil
}
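
// wrapRC wraps an io.ReadCloser and signals on waitCh once the reader has
// been drained to EOF or closed, recording any error for wait().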
type wrapRC struct {
	io.ReadCloser
	once   sync.Once
	err    error
	waitCh chan struct{}
}

func (w *wrapRC) Read(b []byte) (int, error) {
	n, err := w.ReadCloser.Read(b)
	if err != nil {
		e := err
		if e == io.EOF {
			e = nil
		}
		w.close(e)
	}
	return n, err
}

func (w *wrapRC) Close() error {
	err := w.ReadCloser.Close()
	w.close(err)
	return err
}

func (w *wrapRC) close(err error) {
	w.once.Do(func() {
		w.err = err
		close(w.waitCh)
	})
}

func (w *wrapRC) wait() error {
	<-w.waitCh
	return w.err
}
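
// buildJob tracks a single in-flight build. cancel aborts it, and waitCh is
// the rendezvous point that hands the uploaded build context from the
// upload-request to the build waiting for it: WaitUpload blocks until an
// upload arrives, and SetUpload blocks until that upload has been read to
// EOF or closed.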
type buildJob struct {
	cancel func()
	waitCh chan func(io.ReadCloser) error
}

func newBuildJob() *buildJob {
	return &buildJob{waitCh: make(chan func(io.ReadCloser) error)}
}

func (j *buildJob) WaitUpload(ctx context.Context) (io.ReadCloser, error) {
	done := make(chan struct{})

	var upload io.ReadCloser
	fn := func(rc io.ReadCloser) error {
		w := &wrapRC{ReadCloser: rc, waitCh: make(chan struct{})}
		upload = w
		close(done)
		return w.wait()
	}

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case j.waitCh <- fn:
		<-done
		return upload, nil
	}
}

func (j *buildJob) SetUpload(ctx context.Context, rc io.ReadCloser) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case fn := <-j.waitCh:
		return fn(rc)
	}
}