| package client |
| |
| import ( |
| "context" |
| "io" |
| "os" |
| "path/filepath" |
| "strings" |
| "time" |
| |
| controlapi "github.com/moby/buildkit/api/services/control" |
| "github.com/moby/buildkit/client/llb" |
| "github.com/moby/buildkit/identity" |
| "github.com/moby/buildkit/session" |
| "github.com/moby/buildkit/session/filesync" |
| "github.com/moby/buildkit/session/grpchijack" |
| "github.com/moby/buildkit/solver/pb" |
| "github.com/moby/buildkit/util/entitlements" |
| opentracing "github.com/opentracing/opentracing-go" |
| "github.com/pkg/errors" |
| "github.com/sirupsen/logrus" |
| fstypes "github.com/tonistiigi/fsutil/types" |
| "golang.org/x/sync/errgroup" |
| ) |
| |
| type SolveOpt struct { |
| Exporter string |
| ExporterAttrs map[string]string |
| ExporterOutput io.WriteCloser // for ExporterOCI and ExporterDocker |
| ExporterOutputDir string // for ExporterLocal |
| LocalDirs map[string]string |
| SharedKey string |
| Frontend string |
| FrontendAttrs map[string]string |
| ExportCache string |
| ExportCacheAttrs map[string]string |
| ImportCache []string |
| Session []session.Attachable |
| AllowedEntitlements []entitlements.Entitlement |
| } |
| |
| // Solve calls Solve on the controller. |
| // def must be nil if (and only if) opt.Frontend is set. |
| func (c *Client) Solve(ctx context.Context, def *llb.Definition, opt SolveOpt, statusChan chan *SolveStatus) (*SolveResponse, error) { |
| defer func() { |
| if statusChan != nil { |
| close(statusChan) |
| } |
| }() |
| |
| if opt.Frontend == "" && def == nil { |
| return nil, errors.New("invalid empty definition") |
| } |
| if opt.Frontend != "" && def != nil { |
| return nil, errors.Errorf("invalid definition for frontend %s", opt.Frontend) |
| } |
| |
| return c.solve(ctx, def, nil, opt, statusChan) |
| } |
| |
| type runGatewayCB func(ref string, s *session.Session) error |
| |
| func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runGatewayCB, opt SolveOpt, statusChan chan *SolveStatus) (*SolveResponse, error) { |
| if def != nil && runGateway != nil { |
| return nil, errors.New("invalid with def and cb") |
| } |
| |
| syncedDirs, err := prepareSyncedDirs(def, opt.LocalDirs) |
| if err != nil { |
| return nil, err |
| } |
| |
| ref := identity.NewID() |
| eg, ctx := errgroup.WithContext(ctx) |
| |
| statusContext, cancelStatus := context.WithCancel(context.Background()) |
| defer cancelStatus() |
| |
| if span := opentracing.SpanFromContext(ctx); span != nil { |
| statusContext = opentracing.ContextWithSpan(statusContext, span) |
| } |
| |
| s, err := session.NewSession(statusContext, defaultSessionName(), opt.SharedKey) |
| if err != nil { |
| return nil, errors.Wrap(err, "failed to create session") |
| } |
| |
| if len(syncedDirs) > 0 { |
| s.Allow(filesync.NewFSSyncProvider(syncedDirs)) |
| } |
| |
| for _, a := range opt.Session { |
| s.Allow(a) |
| } |
| |
| switch opt.Exporter { |
| case ExporterLocal: |
| if opt.ExporterOutput != nil { |
| return nil, errors.New("output file writer is not supported by local exporter") |
| } |
| if opt.ExporterOutputDir == "" { |
| return nil, errors.New("output directory is required for local exporter") |
| } |
| s.Allow(filesync.NewFSSyncTargetDir(opt.ExporterOutputDir)) |
| case ExporterOCI, ExporterDocker: |
| if opt.ExporterOutputDir != "" { |
| return nil, errors.Errorf("output directory %s is not supported by %s exporter", opt.ExporterOutputDir, opt.Exporter) |
| } |
| if opt.ExporterOutput == nil { |
| return nil, errors.Errorf("output file writer is required for %s exporter", opt.Exporter) |
| } |
| s.Allow(filesync.NewFSSyncTarget(opt.ExporterOutput)) |
| default: |
| if opt.ExporterOutput != nil { |
| return nil, errors.Errorf("output file writer is not supported by %s exporter", opt.Exporter) |
| } |
| if opt.ExporterOutputDir != "" { |
| return nil, errors.Errorf("output directory %s is not supported by %s exporter", opt.ExporterOutputDir, opt.Exporter) |
| } |
| } |
| |
| eg.Go(func() error { |
| return s.Run(statusContext, grpchijack.Dialer(c.controlClient())) |
| }) |
| |
| solveCtx, cancelSolve := context.WithCancel(ctx) |
| var res *SolveResponse |
| eg.Go(func() error { |
| ctx := solveCtx |
| defer cancelSolve() |
| |
| defer func() { // make sure the Status ends cleanly on build errors |
| go func() { |
| <-time.After(3 * time.Second) |
| cancelStatus() |
| }() |
| logrus.Debugf("stopping session") |
| s.Close() |
| }() |
| var pbd *pb.Definition |
| if def != nil { |
| pbd = def.ToPB() |
| } |
| resp, err := c.controlClient().Solve(ctx, &controlapi.SolveRequest{ |
| Ref: ref, |
| Definition: pbd, |
| Exporter: opt.Exporter, |
| ExporterAttrs: opt.ExporterAttrs, |
| Session: s.ID(), |
| Frontend: opt.Frontend, |
| FrontendAttrs: opt.FrontendAttrs, |
| Cache: controlapi.CacheOptions{ |
| ExportRef: opt.ExportCache, |
| ImportRefs: opt.ImportCache, |
| ExportAttrs: opt.ExportCacheAttrs, |
| }, |
| Entitlements: opt.AllowedEntitlements, |
| }) |
| if err != nil { |
| return errors.Wrap(err, "failed to solve") |
| } |
| res = &SolveResponse{ |
| ExporterResponse: resp.ExporterResponse, |
| } |
| return nil |
| }) |
| |
| if runGateway != nil { |
| eg.Go(func() error { |
| err := runGateway(ref, s) |
| if err == nil { |
| return nil |
| } |
| |
| // If the callback failed then the main |
| // `Solve` (called above) should error as |
| // well. However as a fallback we wait up to |
| // 5s for that to happen before failing this |
| // goroutine. |
| select { |
| case <-solveCtx.Done(): |
| case <-time.After(5 * time.Second): |
| cancelSolve() |
| } |
| |
| return err |
| }) |
| } |
| |
| eg.Go(func() error { |
| stream, err := c.controlClient().Status(statusContext, &controlapi.StatusRequest{ |
| Ref: ref, |
| }) |
| if err != nil { |
| return errors.Wrap(err, "failed to get status") |
| } |
| for { |
| resp, err := stream.Recv() |
| if err != nil { |
| if err == io.EOF { |
| return nil |
| } |
| return errors.Wrap(err, "failed to receive status") |
| } |
| s := SolveStatus{} |
| for _, v := range resp.Vertexes { |
| s.Vertexes = append(s.Vertexes, &Vertex{ |
| Digest: v.Digest, |
| Inputs: v.Inputs, |
| Name: v.Name, |
| Started: v.Started, |
| Completed: v.Completed, |
| Error: v.Error, |
| Cached: v.Cached, |
| }) |
| } |
| for _, v := range resp.Statuses { |
| s.Statuses = append(s.Statuses, &VertexStatus{ |
| ID: v.ID, |
| Vertex: v.Vertex, |
| Name: v.Name, |
| Total: v.Total, |
| Current: v.Current, |
| Timestamp: v.Timestamp, |
| Started: v.Started, |
| Completed: v.Completed, |
| }) |
| } |
| for _, v := range resp.Logs { |
| s.Logs = append(s.Logs, &VertexLog{ |
| Vertex: v.Vertex, |
| Stream: int(v.Stream), |
| Data: v.Msg, |
| Timestamp: v.Timestamp, |
| }) |
| } |
| if statusChan != nil { |
| statusChan <- &s |
| } |
| } |
| }) |
| |
| if err := eg.Wait(); err != nil { |
| return nil, err |
| } |
| return res, nil |
| } |
| |
| func prepareSyncedDirs(def *llb.Definition, localDirs map[string]string) ([]filesync.SyncedDir, error) { |
| for _, d := range localDirs { |
| fi, err := os.Stat(d) |
| if err != nil { |
| return nil, errors.Wrapf(err, "could not find %s", d) |
| } |
| if !fi.IsDir() { |
| return nil, errors.Errorf("%s not a directory", d) |
| } |
| } |
| resetUIDAndGID := func(st *fstypes.Stat) bool { |
| st.Uid = 0 |
| st.Gid = 0 |
| return true |
| } |
| |
| dirs := make([]filesync.SyncedDir, 0, len(localDirs)) |
| if def == nil { |
| for name, d := range localDirs { |
| dirs = append(dirs, filesync.SyncedDir{Name: name, Dir: d, Map: resetUIDAndGID}) |
| } |
| } else { |
| for _, dt := range def.Def { |
| var op pb.Op |
| if err := (&op).Unmarshal(dt); err != nil { |
| return nil, errors.Wrap(err, "failed to parse llb proto op") |
| } |
| if src := op.GetSource(); src != nil { |
| if strings.HasPrefix(src.Identifier, "local://") { // TODO: just make a type property |
| name := strings.TrimPrefix(src.Identifier, "local://") |
| d, ok := localDirs[name] |
| if !ok { |
| return nil, errors.Errorf("local directory %s not enabled", name) |
| } |
| dirs = append(dirs, filesync.SyncedDir{Name: name, Dir: d, Map: resetUIDAndGID}) // TODO: excludes |
| } |
| } |
| } |
| } |
| return dirs, nil |
| } |
| |
| func defaultSessionName() string { |
| wd, err := os.Getwd() |
| if err != nil { |
| return "unknown" |
| } |
| return filepath.Base(wd) |
| } |