| package control |
| |
| import ( |
| "context" |
| "sync" |
| "sync/atomic" |
| "time" |
| |
| controlapi "github.com/moby/buildkit/api/services/control" |
| apitypes "github.com/moby/buildkit/api/types" |
| "github.com/moby/buildkit/cache/remotecache" |
| "github.com/moby/buildkit/client" |
| controlgateway "github.com/moby/buildkit/control/gateway" |
| "github.com/moby/buildkit/exporter" |
| "github.com/moby/buildkit/frontend" |
| "github.com/moby/buildkit/session" |
| "github.com/moby/buildkit/session/grpchijack" |
| "github.com/moby/buildkit/solver" |
| "github.com/moby/buildkit/solver/llbsolver" |
| "github.com/moby/buildkit/solver/pb" |
| "github.com/moby/buildkit/util/imageutil" |
| "github.com/moby/buildkit/util/throttle" |
| "github.com/moby/buildkit/worker" |
| "github.com/pkg/errors" |
| "github.com/sirupsen/logrus" |
| "golang.org/x/sync/errgroup" |
| "google.golang.org/grpc" |
| ) |
| |
| type Opt struct { |
| SessionManager *session.Manager |
| WorkerController *worker.Controller |
| Frontends map[string]frontend.Frontend |
| CacheKeyStorage solver.CacheKeyStorage |
| ResolveCacheExporterFuncs map[string]remotecache.ResolveCacheExporterFunc |
| ResolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc |
| Entitlements []string |
| } |
| |
| type Controller struct { // TODO: ControlService |
| buildCount int64 |
| opt Opt |
| solver *llbsolver.Solver |
| cache solver.CacheManager |
| gatewayForwarder *controlgateway.GatewayForwarder |
| throttledGC func() |
| gcmu sync.Mutex |
| } |
| |
| func NewController(opt Opt) (*Controller, error) { |
| cache := solver.NewCacheManager("local", opt.CacheKeyStorage, worker.NewCacheResultStorage(opt.WorkerController)) |
| |
| gatewayForwarder := controlgateway.NewGatewayForwarder() |
| |
| solver, err := llbsolver.New(opt.WorkerController, opt.Frontends, cache, opt.ResolveCacheImporterFuncs, gatewayForwarder, opt.SessionManager, opt.Entitlements) |
| if err != nil { |
| return nil, errors.Wrap(err, "failed to create solver") |
| } |
| |
| c := &Controller{ |
| opt: opt, |
| solver: solver, |
| cache: cache, |
| gatewayForwarder: gatewayForwarder, |
| } |
| c.throttledGC = throttle.ThrottleAfter(time.Minute, c.gc) |
| |
| defer func() { |
| time.AfterFunc(time.Second, c.throttledGC) |
| }() |
| |
| return c, nil |
| } |
| |
| func (c *Controller) Register(server *grpc.Server) error { |
| controlapi.RegisterControlServer(server, c) |
| c.gatewayForwarder.Register(server) |
| return nil |
| } |
| |
| func (c *Controller) DiskUsage(ctx context.Context, r *controlapi.DiskUsageRequest) (*controlapi.DiskUsageResponse, error) { |
| resp := &controlapi.DiskUsageResponse{} |
| workers, err := c.opt.WorkerController.List() |
| if err != nil { |
| return nil, err |
| } |
| for _, w := range workers { |
| du, err := w.DiskUsage(ctx, client.DiskUsageInfo{ |
| Filter: r.Filter, |
| }) |
| if err != nil { |
| return nil, err |
| } |
| |
| for _, r := range du { |
| resp.Record = append(resp.Record, &controlapi.UsageRecord{ |
| // TODO: add worker info |
| ID: r.ID, |
| Mutable: r.Mutable, |
| InUse: r.InUse, |
| Size_: r.Size, |
| Parent: r.Parent, |
| UsageCount: int64(r.UsageCount), |
| Description: r.Description, |
| CreatedAt: r.CreatedAt, |
| LastUsedAt: r.LastUsedAt, |
| RecordType: string(r.RecordType), |
| Shared: r.Shared, |
| }) |
| } |
| } |
| return resp, nil |
| } |
| |
| func (c *Controller) Prune(req *controlapi.PruneRequest, stream controlapi.Control_PruneServer) error { |
| if atomic.LoadInt64(&c.buildCount) == 0 { |
| imageutil.CancelCacheLeases() |
| } |
| |
| ch := make(chan client.UsageInfo) |
| |
| eg, ctx := errgroup.WithContext(stream.Context()) |
| workers, err := c.opt.WorkerController.List() |
| if err != nil { |
| return errors.Wrap(err, "failed to list workers for prune") |
| } |
| |
| didPrune := false |
| defer func() { |
| if didPrune { |
| if c, ok := c.cache.(interface { |
| ReleaseUnreferenced() error |
| }); ok { |
| if err := c.ReleaseUnreferenced(); err != nil { |
| logrus.Errorf("failed to release cache metadata: %+v", err) |
| } |
| } |
| } |
| }() |
| |
| for _, w := range workers { |
| func(w worker.Worker) { |
| eg.Go(func() error { |
| return w.Prune(ctx, ch, client.PruneInfo{ |
| Filter: req.Filter, |
| All: req.All, |
| KeepDuration: time.Duration(req.KeepDuration), |
| KeepBytes: req.KeepBytes, |
| }) |
| }) |
| }(w) |
| } |
| |
| eg2, _ := errgroup.WithContext(stream.Context()) |
| |
| eg2.Go(func() error { |
| defer close(ch) |
| return eg.Wait() |
| }) |
| |
| eg2.Go(func() error { |
| for r := range ch { |
| didPrune = true |
| if err := stream.Send(&controlapi.UsageRecord{ |
| // TODO: add worker info |
| ID: r.ID, |
| Mutable: r.Mutable, |
| InUse: r.InUse, |
| Size_: r.Size, |
| Parent: r.Parent, |
| UsageCount: int64(r.UsageCount), |
| Description: r.Description, |
| CreatedAt: r.CreatedAt, |
| LastUsedAt: r.LastUsedAt, |
| RecordType: string(r.RecordType), |
| Shared: r.Shared, |
| }); err != nil { |
| return err |
| } |
| } |
| return nil |
| }) |
| |
| return eg2.Wait() |
| } |
| |
| func translateLegacySolveRequest(req *controlapi.SolveRequest) error { |
| // translates ExportRef and ExportAttrs to new Exports (v0.4.0) |
| if legacyExportRef := req.Cache.ExportRefDeprecated; legacyExportRef != "" { |
| ex := &controlapi.CacheOptionsEntry{ |
| Type: "registry", |
| Attrs: req.Cache.ExportAttrsDeprecated, |
| } |
| if ex.Attrs == nil { |
| ex.Attrs = make(map[string]string) |
| } |
| ex.Attrs["ref"] = legacyExportRef |
| // FIXME(AkihiroSuda): skip append if already exists |
| req.Cache.Exports = append(req.Cache.Exports, ex) |
| req.Cache.ExportRefDeprecated = "" |
| req.Cache.ExportAttrsDeprecated = nil |
| } |
| // translates ImportRefs to new Imports (v0.4.0) |
| for _, legacyImportRef := range req.Cache.ImportRefsDeprecated { |
| im := &controlapi.CacheOptionsEntry{ |
| Type: "registry", |
| Attrs: map[string]string{"ref": legacyImportRef}, |
| } |
| // FIXME(AkihiroSuda): skip append if already exists |
| req.Cache.Imports = append(req.Cache.Imports, im) |
| } |
| req.Cache.ImportRefsDeprecated = nil |
| return nil |
| } |
| |
| func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*controlapi.SolveResponse, error) { |
| atomic.AddInt64(&c.buildCount, 1) |
| defer atomic.AddInt64(&c.buildCount, -1) |
| |
| if err := translateLegacySolveRequest(req); err != nil { |
| return nil, err |
| } |
| |
| defer func() { |
| time.AfterFunc(time.Second, c.throttledGC) |
| }() |
| |
| var expi exporter.ExporterInstance |
| // TODO: multiworker |
| // This is actually tricky, as the exporter should come from the worker that has the returned reference. We may need to delay this so that the solver loads this. |
| w, err := c.opt.WorkerController.GetDefault() |
| if err != nil { |
| return nil, err |
| } |
| if req.Exporter != "" { |
| exp, err := w.Exporter(req.Exporter, c.opt.SessionManager) |
| if err != nil { |
| return nil, err |
| } |
| expi, err = exp.Resolve(ctx, req.ExporterAttrs) |
| if err != nil { |
| return nil, err |
| } |
| } |
| |
| var ( |
| cacheExporter remotecache.Exporter |
| cacheExportMode solver.CacheExportMode |
| cacheImports []frontend.CacheOptionsEntry |
| ) |
| if len(req.Cache.Exports) > 1 { |
| // TODO(AkihiroSuda): this should be fairly easy |
| return nil, errors.New("specifying multiple cache exports is not supported currently") |
| } |
| |
| if len(req.Cache.Exports) == 1 { |
| e := req.Cache.Exports[0] |
| cacheExporterFunc, ok := c.opt.ResolveCacheExporterFuncs[e.Type] |
| if !ok { |
| return nil, errors.Errorf("unknown cache exporter: %q", e.Type) |
| } |
| cacheExporter, err = cacheExporterFunc(ctx, session.NewGroup(req.Session), e.Attrs) |
| if err != nil { |
| return nil, err |
| } |
| cacheExportMode = parseCacheExportMode(e.Attrs["mode"]) |
| } |
| for _, im := range req.Cache.Imports { |
| cacheImports = append(cacheImports, frontend.CacheOptionsEntry{ |
| Type: im.Type, |
| Attrs: im.Attrs, |
| }) |
| } |
| |
| resp, err := c.solver.Solve(ctx, req.Ref, req.Session, frontend.SolveRequest{ |
| Frontend: req.Frontend, |
| Definition: req.Definition, |
| FrontendOpt: req.FrontendAttrs, |
| FrontendInputs: req.FrontendInputs, |
| CacheImports: cacheImports, |
| }, llbsolver.ExporterRequest{ |
| Exporter: expi, |
| CacheExporter: cacheExporter, |
| CacheExportMode: cacheExportMode, |
| }, req.Entitlements) |
| if err != nil { |
| return nil, err |
| } |
| return &controlapi.SolveResponse{ |
| ExporterResponse: resp.ExporterResponse, |
| }, nil |
| } |
| |
| func (c *Controller) Status(req *controlapi.StatusRequest, stream controlapi.Control_StatusServer) error { |
| ch := make(chan *client.SolveStatus, 8) |
| |
| eg, ctx := errgroup.WithContext(stream.Context()) |
| eg.Go(func() error { |
| return c.solver.Status(ctx, req.Ref, ch) |
| }) |
| |
| eg.Go(func() error { |
| for { |
| ss, ok := <-ch |
| if !ok { |
| return nil |
| } |
| sr := controlapi.StatusResponse{} |
| for _, v := range ss.Vertexes { |
| sr.Vertexes = append(sr.Vertexes, &controlapi.Vertex{ |
| Digest: v.Digest, |
| Inputs: v.Inputs, |
| Name: v.Name, |
| Started: v.Started, |
| Completed: v.Completed, |
| Error: v.Error, |
| Cached: v.Cached, |
| }) |
| } |
| for _, v := range ss.Statuses { |
| sr.Statuses = append(sr.Statuses, &controlapi.VertexStatus{ |
| ID: v.ID, |
| Vertex: v.Vertex, |
| Name: v.Name, |
| Current: v.Current, |
| Total: v.Total, |
| Timestamp: v.Timestamp, |
| Started: v.Started, |
| Completed: v.Completed, |
| }) |
| } |
| for _, v := range ss.Logs { |
| sr.Logs = append(sr.Logs, &controlapi.VertexLog{ |
| Vertex: v.Vertex, |
| Stream: int64(v.Stream), |
| Msg: v.Data, |
| Timestamp: v.Timestamp, |
| }) |
| } |
| if err := stream.SendMsg(&sr); err != nil { |
| return err |
| } |
| } |
| }) |
| |
| return eg.Wait() |
| } |
| |
| func (c *Controller) Session(stream controlapi.Control_SessionServer) error { |
| logrus.Debugf("session started") |
| conn, closeCh, opts := grpchijack.Hijack(stream) |
| defer conn.Close() |
| |
| ctx, cancel := context.WithCancel(stream.Context()) |
| go func() { |
| <-closeCh |
| cancel() |
| }() |
| |
| err := c.opt.SessionManager.HandleConn(ctx, conn, opts) |
| logrus.Debugf("session finished: %v", err) |
| return err |
| } |
| |
| func (c *Controller) ListWorkers(ctx context.Context, r *controlapi.ListWorkersRequest) (*controlapi.ListWorkersResponse, error) { |
| resp := &controlapi.ListWorkersResponse{} |
| workers, err := c.opt.WorkerController.List(r.Filter...) |
| if err != nil { |
| return nil, err |
| } |
| for _, w := range workers { |
| resp.Record = append(resp.Record, &apitypes.WorkerRecord{ |
| ID: w.ID(), |
| Labels: w.Labels(), |
| Platforms: pb.PlatformsFromSpec(w.Platforms(true)), |
| GCPolicy: toPBGCPolicy(w.GCPolicy()), |
| }) |
| } |
| return resp, nil |
| } |
| |
| func (c *Controller) gc() { |
| c.gcmu.Lock() |
| defer c.gcmu.Unlock() |
| |
| workers, err := c.opt.WorkerController.List() |
| if err != nil { |
| return |
| } |
| |
| eg, ctx := errgroup.WithContext(context.TODO()) |
| |
| var size int64 |
| ch := make(chan client.UsageInfo) |
| done := make(chan struct{}) |
| go func() { |
| for ui := range ch { |
| size += ui.Size |
| } |
| close(done) |
| }() |
| |
| for _, w := range workers { |
| func(w worker.Worker) { |
| eg.Go(func() error { |
| if policy := w.GCPolicy(); len(policy) > 0 { |
| return w.Prune(ctx, ch, policy...) |
| } |
| return nil |
| }) |
| }(w) |
| } |
| |
| err = eg.Wait() |
| close(ch) |
| if err != nil { |
| logrus.Errorf("gc error: %+v", err) |
| } |
| <-done |
| if size > 0 { |
| logrus.Debugf("gc cleaned up %d bytes", size) |
| } |
| } |
| |
| func parseCacheExportMode(mode string) solver.CacheExportMode { |
| switch mode { |
| case "min": |
| return solver.CacheExportModeMin |
| case "max": |
| return solver.CacheExportModeMax |
| case "": |
| default: |
| logrus.Debugf("skipping invalid cache export mode: %s", mode) |
| } |
| return solver.CacheExportModeMin |
| } |
| |
| func toPBGCPolicy(in []client.PruneInfo) []*apitypes.GCPolicy { |
| policy := make([]*apitypes.GCPolicy, 0, len(in)) |
| for _, p := range in { |
| policy = append(policy, &apitypes.GCPolicy{ |
| All: p.All, |
| KeepBytes: p.KeepBytes, |
| KeepDuration: int64(p.KeepDuration), |
| Filters: p.Filter, |
| }) |
| } |
| return policy |
| } |