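// Package control implements the BuildKit control API server: it exposes
// DiskUsage, Prune, Solve, Status, Session and ListWorkers over gRPC and
// wires them to the configured workers, solver and session manager.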
package control
import (
"context"
"sync"
"time"
"github.com/docker/distribution/reference"
controlapi "github.com/moby/buildkit/api/services/control"
apitypes "github.com/moby/buildkit/api/types"
"github.com/moby/buildkit/cache/remotecache"
"github.com/moby/buildkit/client"
controlgateway "github.com/moby/buildkit/control/gateway"
"github.com/moby/buildkit/exporter"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/grpchijack"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/llbsolver"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/throttle"
"github.com/moby/buildkit/worker"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)
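// ResolveCacheExporterFunc returns a remote cache exporter for the given
// cache type and target reference.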
type ResolveCacheExporterFunc func(ctx context.Context, typ, target string) (remotecache.Exporter, error)
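// Opt carries the dependencies needed to construct a Controller.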
type Opt struct {
SessionManager *session.Manager
WorkerController *worker.Controller
Frontends map[string]frontend.Frontend
CacheKeyStorage solver.CacheKeyStorage
ResolveCacheExporterFunc remotecache.ResolveCacheExporterFunc
ResolveCacheImporterFunc remotecache.ResolveCacheImporterFunc
}
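// Controller implements controlapi.ControlServer on top of the configured
// workers and the LLB solver.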
type Controller struct { // TODO: ControlService
opt Opt
solver *llbsolver.Solver
cache solver.CacheManager
gatewayForwarder *controlgateway.GatewayForwarder
throttledGC func()
gcmu sync.Mutex
}
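// NewController wires up the cache manager, gateway forwarder and LLB solver,
// and schedules a throttled garbage-collection run shortly after startup.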
func NewController(opt Opt) (*Controller, error) {
cache := solver.NewCacheManager("local", opt.CacheKeyStorage, worker.NewCacheResultStorage(opt.WorkerController))
gatewayForwarder := controlgateway.NewGatewayForwarder()
solver, err := llbsolver.New(opt.WorkerController, opt.Frontends, cache, opt.ResolveCacheImporterFunc, gatewayForwarder)
if err != nil {
return nil, errors.Wrap(err, "failed to create solver")
}
c := &Controller{
opt: opt,
solver: solver,
cache: cache,
gatewayForwarder: gatewayForwarder,
}
c.throttledGC = throttle.ThrottleAfter(time.Minute, c.gc)
defer func() {
time.AfterFunc(time.Second, c.throttledGC)
}()
return c, nil
}
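// Register attaches the control service and the gateway forwarder to the
// given gRPC server. A minimal sketch of the server-side wiring (the
// surrounding daemon setup and the Opt contents are assumptions, not part of
// this file):
//
//	server := grpc.NewServer()
//	c, err := control.NewController(control.Opt{ /* dependencies */ })
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := c.Register(server); err != nil {
//		log.Fatal(err)
//	}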
func (c *Controller) Register(server *grpc.Server) error {
controlapi.RegisterControlServer(server, c)
c.gatewayForwarder.Register(server)
return nil
}
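// DiskUsage aggregates cache usage records from every worker into a single
// response.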
func (c *Controller) DiskUsage(ctx context.Context, r *controlapi.DiskUsageRequest) (*controlapi.DiskUsageResponse, error) {
resp := &controlapi.DiskUsageResponse{}
workers, err := c.opt.WorkerController.List()
if err != nil {
return nil, err
}
for _, w := range workers {
du, err := w.DiskUsage(ctx, client.DiskUsageInfo{
Filter: r.Filter,
})
if err != nil {
return nil, err
}
for _, r := range du {
resp.Record = append(resp.Record, &controlapi.UsageRecord{
// TODO: add worker info
ID: r.ID,
Mutable: r.Mutable,
InUse: r.InUse,
Size_: r.Size,
Parent: r.Parent,
UsageCount: int64(r.UsageCount),
Description: r.Description,
CreatedAt: r.CreatedAt,
LastUsedAt: r.LastUsedAt,
RecordType: string(r.RecordType),
Shared: r.Shared,
})
}
}
return resp, nil
}
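// Prune asks every worker to prune its cache according to the request and
// streams the released records back to the client. If anything was pruned,
// unreferenced cache metadata is released afterwards as well.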
func (c *Controller) Prune(req *controlapi.PruneRequest, stream controlapi.Control_PruneServer) error {
ch := make(chan client.UsageInfo)
eg, ctx := errgroup.WithContext(stream.Context())
workers, err := c.opt.WorkerController.List()
if err != nil {
return errors.Wrap(err, "failed to list workers for prune")
}
didPrune := false
defer func() {
if didPrune {
if c, ok := c.cache.(interface {
ReleaseUnreferenced() error
}); ok {
if err := c.ReleaseUnreferenced(); err != nil {
logrus.Errorf("failed to release cache metadata: %+v", err)
}
}
}
}()
for _, w := range workers {
func(w worker.Worker) {
eg.Go(func() error {
return w.Prune(ctx, ch, client.PruneInfo{
Filter: req.Filter,
All: req.All,
KeepDuration: time.Duration(req.KeepDuration),
KeepBytes: req.KeepBytes,
})
})
}(w)
}
eg2, _ := errgroup.WithContext(stream.Context())
eg2.Go(func() error {
defer close(ch)
return eg.Wait()
})
eg2.Go(func() error {
for r := range ch {
didPrune = true
if err := stream.Send(&controlapi.UsageRecord{
// TODO: add worker info
ID: r.ID,
Mutable: r.Mutable,
InUse: r.InUse,
Size_: r.Size,
Parent: r.Parent,
UsageCount: int64(r.UsageCount),
Description: r.Description,
CreatedAt: r.CreatedAt,
LastUsedAt: r.LastUsedAt,
RecordType: string(r.RecordType),
Shared: r.Shared,
}); err != nil {
return err
}
}
return nil
})
return eg2.Wait()
}
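// Solve runs a single build: it resolves the requested exporter and any
// remote cache exporter/importers, delegates to the LLB solver and returns
// the exporter response.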
func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*controlapi.SolveResponse, error) {
ctx = session.NewContext(ctx, req.Session)
defer func() {
time.AfterFunc(time.Second, c.throttledGC)
}()
var expi exporter.ExporterInstance
// TODO: multiworker
// This is actually tricky, as the exporter should come from the worker that has the returned reference. We may need to delay this so that the solver loads this.
w, err := c.opt.WorkerController.GetDefault()
if err != nil {
return nil, err
}
if req.Exporter != "" {
exp, err := w.Exporter(req.Exporter)
if err != nil {
return nil, err
}
expi, err = exp.Resolve(ctx, req.ExporterAttrs)
if err != nil {
return nil, err
}
}
var cacheExporter remotecache.Exporter
if ref := req.Cache.ExportRef; ref != "" && c.opt.ResolveCacheExporterFunc != nil {
parsed, err := reference.ParseNormalizedNamed(ref)
if err != nil {
return nil, err
}
exportCacheRef := reference.TagNameOnly(parsed).String()
typ := "" // unimplemented yet (typically registry)
cacheExporter, err = c.opt.ResolveCacheExporterFunc(ctx, typ, exportCacheRef)
if err != nil {
return nil, err
}
}
var importCacheRefs []string
for _, ref := range req.Cache.ImportRefs {
parsed, err := reference.ParseNormalizedNamed(ref)
if err != nil {
return nil, err
}
importCacheRefs = append(importCacheRefs, reference.TagNameOnly(parsed).String())
}
resp, err := c.solver.Solve(ctx, req.Ref, frontend.SolveRequest{
Frontend: req.Frontend,
Definition: req.Definition,
FrontendOpt: req.FrontendAttrs,
ImportCacheRefs: importCacheRefs,
}, llbsolver.ExporterRequest{
Exporter: expi,
CacheExporter: cacheExporter,
CacheExportMode: parseCacheExporterOpt(req.Cache.ExportAttrs),
}, req.Entitlements)
if err != nil {
return nil, err
}
return &controlapi.SolveResponse{
ExporterResponse: resp.ExporterResponse,
}, nil
}
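// Status streams solve progress (vertexes, statuses and logs) for the build
// identified by req.Ref until the solver closes the channel.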
func (c *Controller) Status(req *controlapi.StatusRequest, stream controlapi.Control_StatusServer) error {
ch := make(chan *client.SolveStatus, 8)
eg, ctx := errgroup.WithContext(stream.Context())
eg.Go(func() error {
return c.solver.Status(ctx, req.Ref, ch)
})
eg.Go(func() error {
for {
ss, ok := <-ch
if !ok {
return nil
}
sr := controlapi.StatusResponse{}
for _, v := range ss.Vertexes {
sr.Vertexes = append(sr.Vertexes, &controlapi.Vertex{
Digest: v.Digest,
Inputs: v.Inputs,
Name: v.Name,
Started: v.Started,
Completed: v.Completed,
Error: v.Error,
Cached: v.Cached,
})
}
for _, v := range ss.Statuses {
sr.Statuses = append(sr.Statuses, &controlapi.VertexStatus{
ID: v.ID,
Vertex: v.Vertex,
Name: v.Name,
Current: v.Current,
Total: v.Total,
Timestamp: v.Timestamp,
Started: v.Started,
Completed: v.Completed,
})
}
for _, v := range ss.Logs {
sr.Logs = append(sr.Logs, &controlapi.VertexLog{
Vertex: v.Vertex,
Stream: int64(v.Stream),
Msg: v.Data,
Timestamp: v.Timestamp,
})
}
if err := stream.SendMsg(&sr); err != nil {
return err
}
}
})
return eg.Wait()
}
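// Session hijacks the gRPC stream into a raw connection and hands it to the
// session manager for the lifetime of the stream.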
func (c *Controller) Session(stream controlapi.Control_SessionServer) error {
logrus.Debugf("session started")
conn, closeCh, opts := grpchijack.Hijack(stream)
defer conn.Close()
ctx, cancel := context.WithCancel(stream.Context())
go func() {
<-closeCh
cancel()
}()
err := c.opt.SessionManager.HandleConn(ctx, conn, opts)
logrus.Debugf("session finished: %v", err)
return err
}
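// ListWorkers returns the workers matching the request filters along with
// their labels, platforms and GC policies.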
func (c *Controller) ListWorkers(ctx context.Context, r *controlapi.ListWorkersRequest) (*controlapi.ListWorkersResponse, error) {
resp := &controlapi.ListWorkersResponse{}
workers, err := c.opt.WorkerController.List(r.Filter...)
if err != nil {
return nil, err
}
for _, w := range workers {
resp.Record = append(resp.Record, &apitypes.WorkerRecord{
ID: w.ID(),
Labels: w.Labels(),
Platforms: pb.PlatformsFromSpec(w.Platforms()),
GCPolicy: toPBGCPolicy(w.GCPolicy()),
})
}
return resp, nil
}
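// gc runs each worker's GC policy, serialized by gcmu, and logs the total
// amount of space reclaimed.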
func (c *Controller) gc() {
c.gcmu.Lock()
defer c.gcmu.Unlock()
workers, err := c.opt.WorkerController.List()
if err != nil {
return
}
eg, ctx := errgroup.WithContext(context.TODO())
var size int64
ch := make(chan client.UsageInfo)
done := make(chan struct{})
go func() {
for ui := range ch {
size += ui.Size
}
close(done)
}()
for _, w := range workers {
func(w worker.Worker) {
eg.Go(func() error {
if policy := w.GCPolicy(); len(policy) > 0 {
return w.Prune(ctx, ch, policy...)
}
return nil
})
}(w)
}
err = eg.Wait()
close(ch)
if err != nil {
logrus.Errorf("gc error: %+v", err)
}
<-done
if size > 0 {
logrus.Debugf("gc cleaned up %d bytes", size)
}
}
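// parseCacheExporterOpt reads the "mode" attribute from the cache export
// options, defaulting to the minimal export mode.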
func parseCacheExporterOpt(opt map[string]string) solver.CacheExportMode {
for k, v := range opt {
switch k {
case "mode":
switch v {
case "min":
return solver.CacheExportModeMin
case "max":
return solver.CacheExportModeMax
default:
logrus.Debugf("skipping incalid cache export mode: %s", v)
}
default:
logrus.Warnf("skipping invalid cache export opt: %s", v)
}
}
return solver.CacheExportModeMin
}
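// toPBGCPolicy converts worker prune policies into their API representation.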
func toPBGCPolicy(in []client.PruneInfo) []*apitypes.GCPolicy {
policy := make([]*apitypes.GCPolicy, 0, len(in))
for _, p := range in {
policy = append(policy, &apitypes.GCPolicy{
All: p.All,
KeepBytes: p.KeepBytes,
KeepDuration: int64(p.KeepDuration),
Filters: p.Filter,
})
}
return policy
}