blob: 28e32204f57c0570b692c20d309c342da5e82a16 [file] [log] [blame]
package local
import (
"context"
"io/ioutil"
"os"
"strings"
"time"
"github.com/docker/docker/pkg/idtools"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/exporter"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/progress"
"github.com/tonistiigi/fsutil"
fstypes "github.com/tonistiigi/fsutil/types"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)
type Opt struct {
SessionManager *session.Manager
}
type localExporter struct {
opt Opt
// session manager
}
func New(opt Opt) (exporter.Exporter, error) {
le := &localExporter{opt: opt}
return le, nil
}
func (e *localExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
return &localExporterInstance{localExporter: e}, nil
}
type localExporterInstance struct {
*localExporter
}
func (e *localExporterInstance) Name() string {
return "exporting to client"
}
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, sessionID string) (map[string]string, error) {
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID)
if err != nil {
return nil, err
}
isMap := len(inp.Refs) > 0
export := func(ctx context.Context, k string, ref cache.ImmutableRef) func() error {
return func() error {
var src string
var err error
var idmap *idtools.IdentityMapping
if ref == nil {
src, err = ioutil.TempDir("", "buildkit")
if err != nil {
return err
}
defer os.RemoveAll(src)
} else {
mount, err := ref.Mount(ctx, true)
if err != nil {
return err
}
lm := snapshot.LocalMounter(mount)
src, err = lm.Mount()
if err != nil {
return err
}
idmap = mount.IdentityMapping()
defer lm.Unmount()
}
walkOpt := &fsutil.WalkOpt{}
if idmap != nil {
walkOpt.Map = func(p string, st *fstypes.Stat) bool {
uid, gid, err := idmap.ToContainer(idtools.Identity{
UID: int(st.Uid),
GID: int(st.Gid),
})
if err != nil {
return false
}
st.Uid = uint32(uid)
st.Gid = uint32(gid)
return true
}
}
fs := fsutil.NewFS(src, walkOpt)
lbl := "copying files"
if isMap {
lbl += " " + k
fs, err = fsutil.SubDirFS([]fsutil.Dir{{FS: fs, Stat: fstypes.Stat{
Mode: uint32(os.ModeDir | 0755),
Path: strings.Replace(k, "/", "_", -1),
}}})
if err != nil {
return err
}
}
progress := newProgressHandler(ctx, lbl)
if err := filesync.CopyToCaller(ctx, fs, caller, progress); err != nil {
return err
}
return nil
}
}
eg, ctx := errgroup.WithContext(ctx)
if isMap {
for k, ref := range inp.Refs {
eg.Go(export(ctx, k, ref))
}
} else {
eg.Go(export(ctx, "", inp.Ref))
}
if err := eg.Wait(); err != nil {
return nil, err
}
return nil, nil
}
func newProgressHandler(ctx context.Context, id string) func(int, bool) {
limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
pw, _, _ := progress.FromContext(ctx)
now := time.Now()
st := progress.Status{
Started: &now,
Action: "transferring",
}
pw.Write(id, st)
return func(s int, last bool) {
if last || limiter.Allow() {
st.Current = s
if last {
now := time.Now()
st.Completed = &now
}
pw.Write(id, st)
if last {
pw.Close()
}
}
}
}