| package local |
| |
| import ( |
| "context" |
| "encoding/json" |
| "fmt" |
| "time" |
| |
| "github.com/docker/docker/pkg/idtools" |
| "github.com/moby/buildkit/cache" |
| "github.com/moby/buildkit/cache/contenthash" |
| "github.com/moby/buildkit/cache/metadata" |
| "github.com/moby/buildkit/client" |
| "github.com/moby/buildkit/session" |
| "github.com/moby/buildkit/session/filesync" |
| "github.com/moby/buildkit/snapshot" |
| "github.com/moby/buildkit/source" |
| "github.com/moby/buildkit/util/progress" |
| digest "github.com/opencontainers/go-digest" |
| "github.com/pkg/errors" |
| "github.com/sirupsen/logrus" |
| "github.com/tonistiigi/fsutil" |
| fstypes "github.com/tonistiigi/fsutil/types" |
| bolt "go.etcd.io/bbolt" |
| "golang.org/x/time/rate" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| ) |
| |
| const keySharedKey = "local.sharedKey" |
| |
| type Opt struct { |
| CacheAccessor cache.Accessor |
| MetadataStore *metadata.Store |
| } |
| |
| func NewSource(opt Opt) (source.Source, error) { |
| ls := &localSource{ |
| cm: opt.CacheAccessor, |
| md: opt.MetadataStore, |
| } |
| return ls, nil |
| } |
| |
| type localSource struct { |
| cm cache.Accessor |
| md *metadata.Store |
| } |
| |
| func (ls *localSource) ID() string { |
| return source.LocalScheme |
| } |
| |
| func (ls *localSource) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager) (source.SourceInstance, error) { |
| localIdentifier, ok := id.(*source.LocalIdentifier) |
| if !ok { |
| return nil, errors.Errorf("invalid local identifier %v", id) |
| } |
| |
| return &localSourceHandler{ |
| src: *localIdentifier, |
| sm: sm, |
| localSource: ls, |
| }, nil |
| } |
| |
| type localSourceHandler struct { |
| src source.LocalIdentifier |
| sm *session.Manager |
| *localSource |
| } |
| |
| func (ls *localSourceHandler) CacheKey(ctx context.Context, g session.Group, index int) (string, bool, error) { |
| sessionID := ls.src.SessionID |
| |
| if sessionID == "" { |
| id := g.SessionIterator().NextSession() |
| if id == "" { |
| return "", false, errors.New("could not access local files without session") |
| } |
| sessionID = id |
| } |
| dt, err := json.Marshal(struct { |
| SessionID string |
| IncludePatterns []string |
| ExcludePatterns []string |
| FollowPaths []string |
| }{SessionID: sessionID, IncludePatterns: ls.src.IncludePatterns, ExcludePatterns: ls.src.ExcludePatterns, FollowPaths: ls.src.FollowPaths}) |
| if err != nil { |
| return "", false, err |
| } |
| return "session:" + ls.src.Name + ":" + digest.FromBytes(dt).String(), true, nil |
| } |
| |
| func (ls *localSourceHandler) Snapshot(ctx context.Context, g session.Group) (cache.ImmutableRef, error) { |
| var ref cache.ImmutableRef |
| err := ls.sm.Any(ctx, g, func(ctx context.Context, _ string, c session.Caller) error { |
| r, err := ls.snapshot(ctx, c) |
| if err != nil { |
| return err |
| } |
| ref = r |
| return nil |
| }) |
| if err != nil { |
| return nil, err |
| } |
| return ref, nil |
| } |
| |
| func (ls *localSourceHandler) snapshot(ctx context.Context, caller session.Caller) (out cache.ImmutableRef, retErr error) { |
| sharedKey := keySharedKey + ":" + ls.src.Name + ":" + ls.src.SharedKeyHint + ":" + caller.SharedKey() // TODO: replace caller.SharedKey() with source based hint from client(absolute-path+nodeid) |
| |
| var mutable cache.MutableRef |
| sis, err := ls.md.Search(sharedKey) |
| if err != nil { |
| return nil, err |
| } |
| for _, si := range sis { |
| if m, err := ls.cm.GetMutable(ctx, si.ID()); err == nil { |
| logrus.Debugf("reusing ref for local: %s", m.ID()) |
| mutable = m |
| break |
| } |
| } |
| |
| if mutable == nil { |
| m, err := ls.cm.New(ctx, nil, cache.CachePolicyRetain, cache.WithRecordType(client.UsageRecordTypeLocalSource), cache.WithDescription(fmt.Sprintf("local source for %s", ls.src.Name))) |
| if err != nil { |
| return nil, err |
| } |
| mutable = m |
| logrus.Debugf("new ref for local: %s", mutable.ID()) |
| } |
| |
| defer func() { |
| if retErr != nil && mutable != nil { |
| // on error remove the record as checksum update is in undefined state |
| cache.CachePolicyDefault(mutable) |
| if err := mutable.Metadata().Commit(); err != nil { |
| logrus.Errorf("failed to reset mutable cachepolicy: %v", err) |
| } |
| contenthash.ClearCacheContext(mutable.Metadata()) |
| go mutable.Release(context.TODO()) |
| } |
| }() |
| |
| mount, err := mutable.Mount(ctx, false) |
| if err != nil { |
| return nil, err |
| } |
| |
| lm := snapshot.LocalMounter(mount) |
| |
| dest, err := lm.Mount() |
| if err != nil { |
| return nil, err |
| } |
| |
| defer func() { |
| if retErr != nil && lm != nil { |
| lm.Unmount() |
| } |
| }() |
| |
| cc, err := contenthash.GetCacheContext(ctx, mutable.Metadata(), mount.IdentityMapping()) |
| if err != nil { |
| return nil, err |
| } |
| |
| opt := filesync.FSSendRequestOpt{ |
| Name: ls.src.Name, |
| IncludePatterns: ls.src.IncludePatterns, |
| ExcludePatterns: ls.src.ExcludePatterns, |
| FollowPaths: ls.src.FollowPaths, |
| OverrideExcludes: false, |
| DestDir: dest, |
| CacheUpdater: &cacheUpdater{cc, mount.IdentityMapping()}, |
| ProgressCb: newProgressHandler(ctx, "transferring "+ls.src.Name+":"), |
| } |
| |
| if idmap := mount.IdentityMapping(); idmap != nil { |
| opt.Filter = func(p string, stat *fstypes.Stat) bool { |
| identity, err := idmap.ToHost(idtools.Identity{ |
| UID: int(stat.Uid), |
| GID: int(stat.Gid), |
| }) |
| if err != nil { |
| return false |
| } |
| stat.Uid = uint32(identity.UID) |
| stat.Gid = uint32(identity.GID) |
| return true |
| } |
| } |
| |
| if err := filesync.FSSync(ctx, caller, opt); err != nil { |
| if status.Code(err) == codes.NotFound { |
| return nil, errors.Errorf("local source %s not enabled from the client", ls.src.Name) |
| } |
| return nil, err |
| } |
| |
| if err := lm.Unmount(); err != nil { |
| return nil, err |
| } |
| lm = nil |
| |
| if err := contenthash.SetCacheContext(ctx, mutable.Metadata(), cc); err != nil { |
| return nil, err |
| } |
| |
| // skip storing snapshot by the shared key if it already exists |
| skipStoreSharedKey := false |
| si, _ := ls.md.Get(mutable.ID()) |
| if v := si.Get(keySharedKey); v != nil { |
| var str string |
| if err := v.Unmarshal(&str); err != nil { |
| return nil, err |
| } |
| skipStoreSharedKey = str == sharedKey |
| } |
| if !skipStoreSharedKey { |
| v, err := metadata.NewValue(sharedKey) |
| if err != nil { |
| return nil, err |
| } |
| v.Index = sharedKey |
| if err := si.Update(func(b *bolt.Bucket) error { |
| return si.SetValue(b, sharedKey, v) |
| }); err != nil { |
| return nil, err |
| } |
| logrus.Debugf("saved %s as %s", mutable.ID(), sharedKey) |
| } |
| |
| snap, err := mutable.Commit(ctx) |
| if err != nil { |
| return nil, err |
| } |
| |
| mutable = nil // avoid deferred cleanup |
| |
| return snap, nil |
| } |
| |
| func newProgressHandler(ctx context.Context, id string) func(int, bool) { |
| limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 1) |
| pw, _, _ := progress.FromContext(ctx) |
| now := time.Now() |
| st := progress.Status{ |
| Started: &now, |
| Action: "transferring", |
| } |
| pw.Write(id, st) |
| return func(s int, last bool) { |
| if last || limiter.Allow() { |
| st.Current = s |
| if last { |
| now := time.Now() |
| st.Completed = &now |
| } |
| pw.Write(id, st) |
| if last { |
| pw.Close() |
| } |
| } |
| } |
| } |
| |
| type cacheUpdater struct { |
| contenthash.CacheContext |
| idmap *idtools.IdentityMapping |
| } |
| |
| func (cu *cacheUpdater) MarkSupported(bool) { |
| } |
| |
| func (cu *cacheUpdater) ContentHasher() fsutil.ContentHasher { |
| return contenthash.NewFromStat |
| } |